Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Correct attach/detach logic for FKs in partitions
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 12 Oct 2018 15:36:26 +0000 (12:36 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 12 Oct 2018 15:37:37 +0000 (12:37 -0300)
There was no code to handle foreign key constraints on partitioned
tables in the case of ALTER TABLE DETACH; and if you happened to ATTACH
a partition that already had an equivalent constraint, that one was
ignored and a new constraint was created.  Adding this to the fact that
foreign key cloning reuses the constraint name on the partition instead
of generating a new name (as it probably should, to cater to SQL
standard rules about constraint naming within schemas), the result was a
pretty poor user experience -- the most visible failure was that just
detaching a partition and re-attaching it failed with an error such as

  ERROR:  duplicate key value violates unique constraint "pg_constraint_conrelid_contypid_conname_index"
  DETAIL:  Key (conrelid, contypid, conname)=(26702, 0, test_result_asset_id_fkey) already exists.

because it would try to create an identically-named constraint in the
partition.  To make matters worse, if you tried to drop the constraint
in the now-independent partition, that would fail because the constraint
was still seen as dependent on the constraint in its former parent
partitioned table:
  ERROR:  cannot drop inherited constraint "test_result_asset_id_fkey" of relation "test_result_cbsystem_0001_0050_monthly_2018_09"

This fix attacks the problem from two angles: first, when the partition
is detached, the constraint is also marked as independent, so the drop
now works.  Second, when the partition is re-attached, we scan existing
constraints searching for one matching the FK in the parent, and if one
exists, we link that one to the parent constraint.  So we don't end up
with a duplicate -- and better yet, we don't need to scan the referenced
table to verify that the constraint holds.

To implement this I made a small change to previously planner-only
struct ForeignKeyCacheInfo to contain the constraint OID; also relcache
now maintains the list of FKs for partitioned tables too.

Backpatch to 11.

Reported-by: Michael Vitale (bug #15425)
Discussion: https://postgr.es/m/15425-2dbc9d2aa999f816@postgresql.org

src/backend/catalog/pg_constraint.c
src/backend/commands/tablecmds.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/utils/cache/relcache.c
src/include/utils/rel.h
src/test/regress/expected/foreign_key.out
src/test/regress/sql/foreign_key.sql

index 2063abb8aeb6a4c55e83188f05e6198e02ac9850..f4057a9f15276c1d001966b3df72a1ab170002bb 100644 (file)
@@ -19,6 +19,7 @@
 #include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "access/tupconvert.h"
+#include "access/xact.h"
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/objectaccess.h"
 #include "utils/tqual.h"
 
 
+static void clone_fk_constraints(Relation pg_constraint, Relation parentRel,
+                    Relation partRel, List *clone, List **cloned);
+
+
 /*
  * CreateConstraintEntry
  * Create a constraint table entry.
@@ -400,34 +405,74 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
    Relation    rel;
    ScanKeyData key;
    SysScanDesc scan;
-   TupleDesc   tupdesc;
    HeapTuple   tuple;
-   AttrNumber *attmap;
+   List       *clone = NIL;
 
    parentRel = heap_open(parentId, NoLock);    /* already got lock */
    /* see ATAddForeignKeyConstraint about lock level */
    rel = heap_open(relationId, AccessExclusiveLock);
-
    pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+
+   /* Obtain the list of constraints to clone or attach */
+   ScanKeyInit(&key,
+               Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+               F_OIDEQ, ObjectIdGetDatum(parentId));
+   scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+                             NULL, 1, &key);
+   while ((tuple = systable_getnext(scan)) != NULL)
+       clone = lappend_oid(clone, HeapTupleGetOid(tuple));
+   systable_endscan(scan);
+
+   /* Do the actual work, recursing to partitions as needed */
+   clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned);
+
+   /* We're done.  Clean up */
+   heap_close(parentRel, NoLock);
+   heap_close(rel, NoLock);    /* keep lock till commit */
+   heap_close(pg_constraint, RowShareLock);
+}
+
+/*
+ * clone_fk_constraints
+ *     Recursive subroutine for CloneForeignKeyConstraints
+ *
+ * Clone the given list of FK constraints when a partition is attached.
+ *
+ * When cloning foreign keys to a partition, it may happen that equivalent
+ * constraints already exist in the partition for some of them.  We can skip
+ * creating a clone in that case, and instead just attach the existing
+ * constraint to the one in the parent.
+ *
+ * This function recurses to partitions, if the new partition is partitioned;
+ * of course, only do this for FKs that were actually cloned.
+ */
+static void
+clone_fk_constraints(Relation pg_constraint, Relation parentRel,
+                    Relation partRel, List *clone, List **cloned)
+{
+   TupleDesc   tupdesc;
+   AttrNumber *attmap;
+   List       *partFKs;
+   List       *subclone = NIL;
+   ListCell   *cell;
+
    tupdesc = RelationGetDescr(pg_constraint);
 
    /*
     * The constraint key may differ, if the columns in the partition are
     * different.  This map is used to convert them.
     */
-   attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
+   attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
                                        RelationGetDescr(parentRel),
                                        gettext_noop("could not convert row type"));
 
-   ScanKeyInit(&key,
-               Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
-               F_OIDEQ, ObjectIdGetDatum(parentId));
-   scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
-                             NULL, 1, &key);
+   partFKs = copyObject(RelationGetFKeyList(partRel));
 
-   while ((tuple = systable_getnext(scan)) != NULL)
+   foreach(cell, clone)
    {
-       Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+       Oid         parentConstrOid = lfirst_oid(cell);
+       Form_pg_constraint constrForm;
+       HeapTuple   tuple;
        AttrNumber  conkey[INDEX_MAX_KEYS];
        AttrNumber  mapped_conkey[INDEX_MAX_KEYS];
        AttrNumber  confkey[INDEX_MAX_KEYS];
@@ -435,22 +480,31 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
        Oid         conppeqop[INDEX_MAX_KEYS];
        Oid         conffeqop[INDEX_MAX_KEYS];
        Constraint *fkconstraint;
-       ClonedConstraint *newc;
+       bool        attach_it;
        Oid         constrOid;
        ObjectAddress parentAddr,
                    childAddr;
        int         nelem;
+       ListCell   *cell;
        int         i;
        ArrayType  *arr;
        Datum       datum;
        bool        isnull;
 
+       tuple = SearchSysCache1(CONSTROID, parentConstrOid);
+       if (!tuple)
+           elog(ERROR, "cache lookup failed for constraint %u",
+                parentConstrOid);
+       constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
        /* only foreign keys */
        if (constrForm->contype != CONSTRAINT_FOREIGN)
+       {
+           ReleaseSysCache(tuple);
            continue;
+       }
 
-       ObjectAddressSet(parentAddr, ConstraintRelationId,
-                        HeapTupleGetOid(tuple));
+       ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
 
        datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
                            tupdesc, &isnull);
@@ -539,6 +593,90 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
            elog(ERROR, "conffeqop is not a 1-D OID array");
        memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
 
+       /*
+        * Before creating a new constraint, see whether any existing FKs are
+        * fit for the purpose.  If one is, attach the parent constraint to it,
+        * and don't clone anything.  This way we avoid the expensive
+        * verification step and don't end up with a duplicate FK.  This also
+        * means we don't consider this constraint when recursing to
+        * partitions.
+        */
+       attach_it = false;
+       foreach(cell, partFKs)
+       {
+           ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
+           Form_pg_constraint partConstr;
+           HeapTuple   partcontup;
+
+           attach_it = true;
+
+           /*
+            * Do some quick & easy initial checks.  If any of these fail, we
+            * cannot use this constraint, but keep looking.
+            */
+           if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem)
+           {
+               attach_it = false;
+               continue;
+           }
+           for (i = 0; i < nelem; i++)
+           {
+               if (fk->conkey[i] != mapped_conkey[i] ||
+                   fk->confkey[i] != confkey[i] ||
+                   fk->conpfeqop[i] != conpfeqop[i])
+               {
+                   attach_it = false;
+                   break;
+               }
+           }
+           if (!attach_it)
+               continue;
+
+           /*
+            * Looks good so far; do some more extensive checks.  Presumably
+            * the check for 'convalidated' could be dropped, since we don't
+            * really care about that, but let's be careful for now.
+            */
+           partcontup = SearchSysCache1(CONSTROID,
+                                        ObjectIdGetDatum(fk->conoid));
+           if (!partcontup)
+               elog(ERROR, "cache lookup failed for constraint %u",
+                    fk->conoid);
+           partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+           if (OidIsValid(partConstr->conparentid) ||
+               !partConstr->convalidated ||
+               partConstr->condeferrable != constrForm->condeferrable ||
+               partConstr->condeferred != constrForm->condeferred ||
+               partConstr->confupdtype != constrForm->confupdtype ||
+               partConstr->confdeltype != constrForm->confdeltype ||
+               partConstr->confmatchtype != constrForm->confmatchtype)
+           {
+               ReleaseSysCache(partcontup);
+               attach_it = false;
+               continue;
+           }
+
+           ReleaseSysCache(partcontup);
+
+           /* looks good!  Attach this constraint */
+           ConstraintSetParentConstraint(fk->conoid,
+                                         HeapTupleGetOid(tuple));
+           CommandCounterIncrement();
+           attach_it = true;
+           break;
+       }
+
+       /*
+        * If we attached to an existing constraint, there is no need to
+        * create a new one.  In fact, there's no need to recurse for this
+        * constraint to partitions, either.
+        */
+       if (attach_it)
+       {
+           ReleaseSysCache(tuple);
+           continue;
+       }
+
        constrOid =
            CreateConstraintEntry(NameStr(constrForm->conname),
                                  constrForm->connamespace,
@@ -547,7 +685,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
                                  constrForm->condeferred,
                                  constrForm->convalidated,
                                  HeapTupleGetOid(tuple),
-                                 relationId,
+                                 RelationGetRelid(partRel),
                                  mapped_conkey,
                                  nelem,
                                  nelem,
@@ -568,6 +706,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
                                  NULL,
                                  false,
                                  1, false, true);
+       subclone = lappend_oid(subclone, constrOid);
 
        ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
        recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
@@ -580,17 +719,19 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
        fkconstraint->deferrable = constrForm->condeferrable;
        fkconstraint->initdeferred = constrForm->condeferred;
 
-       createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
+       createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
                                 constrOid, constrForm->conindid, false);
 
        if (cloned)
        {
+           ClonedConstraint *newc;
+
            /*
             * Feed back caller about the constraints we created, so that they
             * can set up constraint verification.
             */
            newc = palloc(sizeof(ClonedConstraint));
-           newc->relid = relationId;
+           newc->relid = RelationGetRelid(partRel);
            newc->refrelid = constrForm->confrelid;
            newc->conindid = constrForm->conindid;
            newc->conid = constrOid;
@@ -598,25 +739,36 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
 
            *cloned = lappend(*cloned, newc);
        }
+
+       ReleaseSysCache(tuple);
    }
-   systable_endscan(scan);
 
    pfree(attmap);
+   list_free_deep(partFKs);
 
-   if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+   /*
+    * If the partition is partitioned, recurse to handle any constraints that
+    * were cloned.
+    */
+   if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+       subclone != NIL)
    {
-       PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+       PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
        int         i;
 
        for (i = 0; i < partdesc->nparts; i++)
-           CloneForeignKeyConstraints(RelationGetRelid(rel),
-                                      partdesc->oids[i],
-                                      cloned);
+       {
+           Relation    childRel;
+
+           childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
+           clone_fk_constraints(pg_constraint,
+                                partRel,
+                                childRel,
+                                subclone,
+                                cloned);
+           heap_close(childRel, NoLock);   /* keep lock till commit */
+       }
    }
-
-   heap_close(rel, NoLock);    /* keep lock till commit */
-   heap_close(parentRel, NoLock);
-   heap_close(pg_constraint, RowShareLock);
 }
 
 /*
@@ -1028,17 +1180,33 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
        elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
    newtup = heap_copytuple(tuple);
    constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
-   constrForm->conislocal = false;
-   constrForm->coninhcount++;
-   constrForm->conparentid = parentConstrId;
-   CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
-   ReleaseSysCache(tuple);
+   if (OidIsValid(parentConstrId))
+   {
+       constrForm->conislocal = false;
+       constrForm->coninhcount++;
+       constrForm->conparentid = parentConstrId;
+
+       CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
 
-   ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
-   ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
+       ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
+       ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
 
-   recordDependencyOn(&depender, &referenced, DEPENDENCY_INTERNAL_AUTO);
+       recordDependencyOn(&depender, &referenced, DEPENDENCY_INTERNAL_AUTO);
+   }
+   else
+   {
+       constrForm->coninhcount--;
+       if (constrForm->coninhcount <= 0)
+           constrForm->conislocal = true;
+       constrForm->conparentid = InvalidOid;
+
+       deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
+                                       ConstraintRelationId,
+                                       DEPENDENCY_INTERNAL_AUTO);
+       CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
+   }
 
+   ReleaseSysCache(tuple);
    heap_close(constrRel, RowExclusiveLock);
 }
 
index e10d3dbf3ddfaac09e0fb1d017656c43cc460b3f..3e112b4ef428d966deca31a4fe485bd44521ddb1 100644 (file)
@@ -14091,6 +14091,11 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
 
    attachrel = heap_openrv(cmd->name, AccessExclusiveLock);
 
+   /*
+    * XXX I think it'd be a good idea to grab locks on all tables referenced
+    * by FKs at this point also.
+    */
+
    /*
     * Must be owner of both parent and source table -- parent was checked by
     * ATSimplePermissions call in ATPrepCmd
@@ -14663,6 +14668,7 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
    ObjectAddress address;
    Oid         defaultPartOid;
    List       *indexes;
+   List       *fks;
    ListCell   *cell;
 
    /*
@@ -14738,6 +14744,23 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
    }
    heap_close(classRel, RowExclusiveLock);
 
+   /* Detach foreign keys */
+   fks = copyObject(RelationGetFKeyList(partRel));
+   foreach(cell, fks)
+   {
+       ForeignKeyCacheInfo *fk = lfirst(cell);
+       HeapTuple   contup;
+
+       contup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid));
+       if (!contup)
+           elog(ERROR, "cache lookup failed for constraint %u", fk->conoid);
+
+       ConstraintSetParentConstraint(fk->conoid, InvalidOid);
+
+       ReleaseSysCache(contup);
+   }
+   list_free_deep(fks);
+
    /*
     * Invalidate the parent's relcache so that the partition is no longer
     * included in its partition descriptor.
index e47641d572159f612b9ce3656ff5a5c77d28897e..e8ea59e34afd135b1730ebf4b41e399d36c51780 100644 (file)
@@ -4745,6 +4745,7 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from)
 {
    ForeignKeyCacheInfo *newnode = makeNode(ForeignKeyCacheInfo);
 
+   COPY_SCALAR_FIELD(conoid);
    COPY_SCALAR_FIELD(conrelid);
    COPY_SCALAR_FIELD(confrelid);
    COPY_SCALAR_FIELD(nkeys);
index 9f4ffac91e6b215f6fb049f1d84dd82105e7a020..69731ccdea2edf34698dfbdee7e568f722b3cf5b 100644 (file)
@@ -3633,6 +3633,7 @@ _outForeignKeyCacheInfo(StringInfo str, const ForeignKeyCacheInfo *node)
 
    WRITE_NODE_TYPE("FOREIGNKEYCACHEINFO");
 
+   WRITE_OID_FIELD(conoid);
    WRITE_OID_FIELD(conrelid);
    WRITE_OID_FIELD(confrelid);
    WRITE_INT_FIELD(nkeys);
index a4fc0011031ec4dabc7e5f58feb1c26663cf5147..fd3d010b7783219141f7f25979fde7f61cc5a47c 100644 (file)
@@ -4108,8 +4108,9 @@ RelationGetFKeyList(Relation relation)
    if (relation->rd_fkeyvalid)
        return relation->rd_fkeylist;
 
-   /* Fast path: if it doesn't have any triggers, it can't have FKs */
-   if (!relation->rd_rel->relhastriggers)
+   /* Fast path: non-partitioned tables without triggers can't have FKs */
+   if (!relation->rd_rel->relhastriggers &&
+       relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
        return NIL;
 
    /*
@@ -4144,6 +4145,7 @@ RelationGetFKeyList(Relation relation)
            continue;
 
        info = makeNode(ForeignKeyCacheInfo);
+       info->conoid = HeapTupleGetOid(htup);
        info->conrelid = constraint->conrelid;
        info->confrelid = constraint->confrelid;
 
index 6ecbdb629443848856ecddcf8198fbd297f61d47..84469f57151976bfe15a884abe606868e874265d 100644 (file)
@@ -202,12 +202,13 @@ typedef struct RelationData
  * The per-FK-column arrays can be fixed-size because we allow at most
  * INDEX_MAX_KEYS columns in a foreign key constraint.
  *
- * Currently, we only cache fields of interest to the planner, but the
- * set of fields could be expanded in future.
+ * Currently, we mostly cache fields of interest to the planner, but the set
+ * of fields has already grown the constraint OID for other uses.
  */
 typedef struct ForeignKeyCacheInfo
 {
    NodeTag     type;
+   Oid         conoid;         /* oid of the constraint itself */
    Oid         conrelid;       /* relation constrained by the foreign key */
    Oid         confrelid;      /* relation referenced by the foreign key */
    int         nkeys;          /* number of columns in the foreign key */
index 4e5cb8901e1c57992c268b1d9ff28d43f2c5012c..52164e89d2917eeee9648666aff8be07b8722bed 100644 (file)
@@ -1648,6 +1648,125 @@ SELECT * FROM fk_partitioned_fk WHERE a = 142857;
 
 -- verify that DROP works
 DROP TABLE fk_partitioned_fk_2;
+-- Test behavior of the constraint together with attaching and detaching
+-- partitions.
+CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_2;
+BEGIN;
+DROP TABLE fk_partitioned_fk;
+-- constraint should still be there
+\d fk_partitioned_fk_2;
+        Table "public.fk_partitioned_fk_2"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 2501
+ b      | integer |           |          | 142857
+Foreign-key constraints:
+    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+ROLLBACK;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+DROP TABLE fk_partitioned_fk_2;
+CREATE TABLE fk_partitioned_fk_2 (b int, c text, a int,
+   FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk ON UPDATE CASCADE ON DELETE CASCADE);
+ALTER TABLE fk_partitioned_fk_2 DROP COLUMN c;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+-- should have only one constraint
+\d fk_partitioned_fk_2
+        Table "public.fk_partitioned_fk_2"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ b      | integer |           |          | 
+ a      | integer |           |          | 
+Partition of: fk_partitioned_fk FOR VALUES IN (1500, 1502)
+Foreign-key constraints:
+    "fk_partitioned_fk_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+DROP TABLE fk_partitioned_fk_2;
+CREATE TABLE fk_partitioned_fk_4 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE) PARTITION BY RANGE (b, a);
+CREATE TABLE fk_partitioned_fk_4_1 PARTITION OF fk_partitioned_fk_4 FOR VALUES FROM (1,1) TO (100,100);
+CREATE TABLE fk_partitioned_fk_4_2 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL);
+ALTER TABLE fk_partitioned_fk_4 ATTACH PARTITION fk_partitioned_fk_4_2 FOR VALUES FROM (100,100) TO (1000,1000);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_4;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+-- should only have one constraint
+\d fk_partitioned_fk_4
+        Table "public.fk_partitioned_fk_4"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk FOR VALUES IN (3500, 3502)
+Partition key: RANGE (b, a)
+Foreign-key constraints:
+    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+Number of partitions: 2 (Use \d+ to list them.)
+
+\d fk_partitioned_fk_4_1
+       Table "public.fk_partitioned_fk_4_1"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk_4 FOR VALUES FROM (1, 1) TO (100, 100)
+Foreign-key constraints:
+    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+-- this one has an FK with mismatched properties
+\d fk_partitioned_fk_4_2
+       Table "public.fk_partitioned_fk_4_2"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk_4 FOR VALUES FROM (100, 100) TO (1000, 1000)
+Foreign-key constraints:
+    "fk_partitioned_fk_4_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL
+    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+CREATE TABLE fk_partitioned_fk_5 (a int, b int,
+   FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE,
+   FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE)
+  PARTITION BY RANGE (a);
+CREATE TABLE fk_partitioned_fk_5_1 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_5;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+-- this one has two constraints, similar but not quite the one in the parent,
+-- so it gets a new one
+\d fk_partitioned_fk_5
+        Table "public.fk_partitioned_fk_5"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk FOR VALUES IN (4500)
+Partition key: RANGE (a)
+Foreign-key constraints:
+    "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE
+    "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE
+    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+Number of partitions: 1 (Use \d+ to list them.)
+
+-- verify that it works to reattaching a child with multiple candidate
+-- constraints
+ALTER TABLE fk_partitioned_fk_5 DETACH PARTITION fk_partitioned_fk_5_1;
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+\d fk_partitioned_fk_5_1
+       Table "public.fk_partitioned_fk_5_1"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk_5 FOR VALUES FROM (0) TO (10)
+Foreign-key constraints:
+    "fk_partitioned_fk_5_1_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b)
+    "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE
+    "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE
+    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
 -- verify that attaching a table checks that the existing data satisfies the
 -- constraint
 CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);
index 6fcb5dfb4eb3b001e3fdb2cafc646a47bd38d994..f3870048551f546a908d6ee74fc196b0d36fb868 100644 (file)
@@ -1226,6 +1226,56 @@ SELECT * FROM fk_partitioned_fk WHERE a = 142857;
 -- verify that DROP works
 DROP TABLE fk_partitioned_fk_2;
 
+-- Test behavior of the constraint together with attaching and detaching
+-- partitions.
+CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_2;
+BEGIN;
+DROP TABLE fk_partitioned_fk;
+-- constraint should still be there
+\d fk_partitioned_fk_2;
+ROLLBACK;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+DROP TABLE fk_partitioned_fk_2;
+CREATE TABLE fk_partitioned_fk_2 (b int, c text, a int,
+   FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk ON UPDATE CASCADE ON DELETE CASCADE);
+ALTER TABLE fk_partitioned_fk_2 DROP COLUMN c;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+-- should have only one constraint
+\d fk_partitioned_fk_2
+DROP TABLE fk_partitioned_fk_2;
+
+CREATE TABLE fk_partitioned_fk_4 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE) PARTITION BY RANGE (b, a);
+CREATE TABLE fk_partitioned_fk_4_1 PARTITION OF fk_partitioned_fk_4 FOR VALUES FROM (1,1) TO (100,100);
+CREATE TABLE fk_partitioned_fk_4_2 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL);
+ALTER TABLE fk_partitioned_fk_4 ATTACH PARTITION fk_partitioned_fk_4_2 FOR VALUES FROM (100,100) TO (1000,1000);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_4;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+-- should only have one constraint
+\d fk_partitioned_fk_4
+\d fk_partitioned_fk_4_1
+-- this one has an FK with mismatched properties
+\d fk_partitioned_fk_4_2
+
+CREATE TABLE fk_partitioned_fk_5 (a int, b int,
+   FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE,
+   FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE)
+  PARTITION BY RANGE (a);
+CREATE TABLE fk_partitioned_fk_5_1 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_5;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+-- this one has two constraints, similar but not quite the one in the parent,
+-- so it gets a new one
+\d fk_partitioned_fk_5
+-- verify that it works to reattaching a child with multiple candidate
+-- constraints
+ALTER TABLE fk_partitioned_fk_5 DETACH PARTITION fk_partitioned_fk_5_1;
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+\d fk_partitioned_fk_5_1
+
 -- verify that attaching a table checks that the existing data satisfies the
 -- constraint
 CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);