Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3de241d

Browse files
committed
Foreign keys on partitioned tables
Author: Álvaro Herrera Discussion: https://postgr.es/m/20171231194359.cvojcour423ulha4@alvherre.pgsql Reviewed-by: Peter Eisentraut
1 parent 857f9c3 commit 3de241d

File tree

17 files changed

+895
-109
lines changed

17 files changed

+895
-109
lines changed

doc/src/sgml/ref/alter_table.sgml

+2-1
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
368368
specified check constraints). But the
369369
database will not assume that the constraint holds for all rows in
370370
the table, until it is validated by using the <literal>VALIDATE
371-
CONSTRAINT</literal> option.
371+
CONSTRAINT</literal> option. Foreign key constraints on partitioned
372+
tables may not be declared <literal>NOT VALID</literal> at present.
372373
</para>
373374

374375
<para>

doc/src/sgml/ref/create_table.sgml

+9-4
Original file line numberDiff line numberDiff line change
@@ -546,9 +546,12 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
546546
</para>
547547

548548
<para>
549-
Partitioned tables do not support <literal>EXCLUDE</literal> or
550-
<literal>FOREIGN KEY</literal> constraints; however, you can define
551-
these constraints on individual partitions.
549+
Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
550+
however, you can define these constraints on individual partitions.
551+
Also, while it's possible to define <literal>PRIMARY KEY</literal>
552+
constraints on partitioned tables, it is not supported to create foreign
553+
keys cannot that reference them. This restriction will be lifted in a
554+
future release.
552555
</para>
553556

554557
</listitem>
@@ -907,7 +910,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
907910
must have <literal>REFERENCES</literal> permission on the referenced table
908911
(either the whole table, or the specific referenced columns).
909912
Note that foreign key constraints cannot be defined between temporary
910-
tables and permanent tables.
913+
tables and permanent tables. Also note that while it is possible to
914+
define a foreign key on a partitioned table, it is not possible to
915+
declare a foreign key that references a partitioned table.
911916
</para>
912917

913918
<para>

src/backend/catalog/pg_constraint.c

+237
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "catalog/pg_operator.h"
2727
#include "catalog/pg_type.h"
2828
#include "commands/defrem.h"
29+
#include "commands/tablecmds.h"
2930
#include "utils/array.h"
3031
#include "utils/builtins.h"
3132
#include "utils/fmgroids.h"
@@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
377378
return conOid;
378379
}
379380

381+
/*
382+
* CloneForeignKeyConstraints
383+
* Clone foreign keys from a partitioned table to a newly acquired
384+
* partition.
385+
*
386+
* relationId is a partition of parentId, so we can be certain that it has the
387+
* same columns with the same datatypes. The columns may be in different
388+
* order, though.
389+
*
390+
* The *cloned list is appended ClonedConstraint elements describing what was
391+
* created.
392+
*/
393+
void
394+
CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
395+
{
396+
Relation pg_constraint;
397+
Relation parentRel;
398+
Relation rel;
399+
ScanKeyData key;
400+
SysScanDesc scan;
401+
TupleDesc tupdesc;
402+
HeapTuple tuple;
403+
AttrNumber *attmap;
404+
405+
parentRel = heap_open(parentId, NoLock); /* already got lock */
406+
/* see ATAddForeignKeyConstraint about lock level */
407+
rel = heap_open(relationId, AccessExclusiveLock);
408+
409+
pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
410+
tupdesc = RelationGetDescr(pg_constraint);
411+
412+
/*
413+
* The constraint key may differ, if the columns in the partition are
414+
* different. This map is used to convert them.
415+
*/
416+
attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
417+
RelationGetDescr(parentRel),
418+
gettext_noop("could not convert row type"));
419+
420+
ScanKeyInit(&key,
421+
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
422+
F_OIDEQ, ObjectIdGetDatum(parentId));
423+
scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
424+
NULL, 1, &key);
425+
426+
while ((tuple = systable_getnext(scan)) != NULL)
427+
{
428+
Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
429+
AttrNumber conkey[INDEX_MAX_KEYS];
430+
AttrNumber mapped_conkey[INDEX_MAX_KEYS];
431+
AttrNumber confkey[INDEX_MAX_KEYS];
432+
Oid conpfeqop[INDEX_MAX_KEYS];
433+
Oid conppeqop[INDEX_MAX_KEYS];
434+
Oid conffeqop[INDEX_MAX_KEYS];
435+
Constraint *fkconstraint;
436+
ClonedConstraint *newc;
437+
Oid constrOid;
438+
ObjectAddress parentAddr,
439+
childAddr;
440+
int nelem;
441+
int i;
442+
ArrayType *arr;
443+
Datum datum;
444+
bool isnull;
445+
446+
/* only foreign keys */
447+
if (constrForm->contype != CONSTRAINT_FOREIGN)
448+
continue;
449+
450+
ObjectAddressSet(parentAddr, ConstraintRelationId,
451+
HeapTupleGetOid(tuple));
452+
453+
datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
454+
tupdesc, &isnull);
455+
if (isnull)
456+
elog(ERROR, "null conkey");
457+
arr = DatumGetArrayTypeP(datum);
458+
nelem = ARR_DIMS(arr)[0];
459+
if (ARR_NDIM(arr) != 1 ||
460+
nelem < 1 ||
461+
nelem > INDEX_MAX_KEYS ||
462+
ARR_HASNULL(arr) ||
463+
ARR_ELEMTYPE(arr) != INT2OID)
464+
elog(ERROR, "conkey is not a 1-D smallint array");
465+
memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
466+
467+
for (i = 0; i < nelem; i++)
468+
mapped_conkey[i] = attmap[conkey[i] - 1];
469+
470+
datum = fastgetattr(tuple, Anum_pg_constraint_confkey,
471+
tupdesc, &isnull);
472+
if (isnull)
473+
elog(ERROR, "null confkey");
474+
arr = DatumGetArrayTypeP(datum);
475+
nelem = ARR_DIMS(arr)[0];
476+
if (ARR_NDIM(arr) != 1 ||
477+
nelem < 1 ||
478+
nelem > INDEX_MAX_KEYS ||
479+
ARR_HASNULL(arr) ||
480+
ARR_ELEMTYPE(arr) != INT2OID)
481+
elog(ERROR, "confkey is not a 1-D smallint array");
482+
memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
483+
484+
datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
485+
tupdesc, &isnull);
486+
if (isnull)
487+
elog(ERROR, "null conpfeqop");
488+
arr = DatumGetArrayTypeP(datum);
489+
nelem = ARR_DIMS(arr)[0];
490+
if (ARR_NDIM(arr) != 1 ||
491+
nelem < 1 ||
492+
nelem > INDEX_MAX_KEYS ||
493+
ARR_HASNULL(arr) ||
494+
ARR_ELEMTYPE(arr) != OIDOID)
495+
elog(ERROR, "conpfeqop is not a 1-D OID array");
496+
memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
497+
498+
datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
499+
tupdesc, &isnull);
500+
if (isnull)
501+
elog(ERROR, "null conpfeqop");
502+
arr = DatumGetArrayTypeP(datum);
503+
nelem = ARR_DIMS(arr)[0];
504+
if (ARR_NDIM(arr) != 1 ||
505+
nelem < 1 ||
506+
nelem > INDEX_MAX_KEYS ||
507+
ARR_HASNULL(arr) ||
508+
ARR_ELEMTYPE(arr) != OIDOID)
509+
elog(ERROR, "conpfeqop is not a 1-D OID array");
510+
memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
511+
512+
datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop,
513+
tupdesc, &isnull);
514+
if (isnull)
515+
elog(ERROR, "null conppeqop");
516+
arr = DatumGetArrayTypeP(datum);
517+
nelem = ARR_DIMS(arr)[0];
518+
if (ARR_NDIM(arr) != 1 ||
519+
nelem < 1 ||
520+
nelem > INDEX_MAX_KEYS ||
521+
ARR_HASNULL(arr) ||
522+
ARR_ELEMTYPE(arr) != OIDOID)
523+
elog(ERROR, "conppeqop is not a 1-D OID array");
524+
memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
525+
526+
datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop,
527+
tupdesc, &isnull);
528+
if (isnull)
529+
elog(ERROR, "null conffeqop");
530+
arr = DatumGetArrayTypeP(datum);
531+
nelem = ARR_DIMS(arr)[0];
532+
if (ARR_NDIM(arr) != 1 ||
533+
nelem < 1 ||
534+
nelem > INDEX_MAX_KEYS ||
535+
ARR_HASNULL(arr) ||
536+
ARR_ELEMTYPE(arr) != OIDOID)
537+
elog(ERROR, "conffeqop is not a 1-D OID array");
538+
memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
539+
540+
constrOid =
541+
CreateConstraintEntry(NameStr(constrForm->conname),
542+
constrForm->connamespace,
543+
CONSTRAINT_FOREIGN,
544+
constrForm->condeferrable,
545+
constrForm->condeferred,
546+
constrForm->convalidated,
547+
HeapTupleGetOid(tuple),
548+
relationId,
549+
mapped_conkey,
550+
nelem,
551+
InvalidOid, /* not a domain constraint */
552+
constrForm->conindid, /* same index */
553+
constrForm->confrelid, /* same foreign rel */
554+
confkey,
555+
conpfeqop,
556+
conppeqop,
557+
conffeqop,
558+
nelem,
559+
constrForm->confupdtype,
560+
constrForm->confdeltype,
561+
constrForm->confmatchtype,
562+
NULL,
563+
NULL,
564+
NULL,
565+
NULL,
566+
false,
567+
1, false, true);
568+
569+
ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
570+
recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
571+
572+
fkconstraint = makeNode(Constraint);
573+
/* for now this is all we need */
574+
fkconstraint->fk_upd_action = constrForm->confupdtype;
575+
fkconstraint->fk_del_action = constrForm->confdeltype;
576+
fkconstraint->deferrable = constrForm->condeferrable;
577+
fkconstraint->initdeferred = constrForm->condeferred;
578+
579+
createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
580+
constrOid, constrForm->conindid, false);
581+
582+
if (cloned)
583+
{
584+
/*
585+
* Feed back caller about the constraints we created, so that they can
586+
* set up constraint verification.
587+
*/
588+
newc = palloc(sizeof(ClonedConstraint));
589+
newc->relid = relationId;
590+
newc->refrelid = constrForm->confrelid;
591+
newc->conindid = constrForm->conindid;
592+
newc->conid = constrOid;
593+
newc->constraint = fkconstraint;
594+
595+
*cloned = lappend(*cloned, newc);
596+
}
597+
}
598+
systable_endscan(scan);
599+
600+
pfree(attmap);
601+
602+
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
603+
{
604+
PartitionDesc partdesc = RelationGetPartitionDesc(rel);
605+
int i;
606+
607+
for (i = 0; i < partdesc->nparts; i++)
608+
CloneForeignKeyConstraints(RelationGetRelid(rel),
609+
partdesc->oids[i],
610+
cloned);
611+
}
612+
613+
heap_close(rel, NoLock); /* keep lock till commit */
614+
heap_close(parentRel, NoLock);
615+
heap_close(pg_constraint, RowShareLock);
616+
}
380617

381618
/*
382619
* Test whether given name is currently used as a constraint name

0 commit comments

Comments
 (0)