Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Herrera2019-04-03 17:38:20 +0000
committerAlvaro Herrera2019-04-03 17:40:21 +0000
commitf56f8f8da6afd8523b4d5284e02a20ed2b33ef8d (patch)
treee5f59afa60601ff9c2e92d7746df6dba57b73c99 /src/backend/commands/tablecmds.c
parent9155580fd5fc2a0cbb23376dfca7cd21f59c2c7b (diff)
Support foreign keys that reference partitioned tables
Previously, while primary keys could be made on partitioned tables, it was not possible to define foreign keys that reference those primary keys. Now it is possible to do that. Author: Álvaro Herrera Reviewed-by: Amit Langote, Jesper Pedersen Discussion: https://postgr.es/m/20181102234158.735b3fevta63msbj@alvherre.pgsql
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c1342
1 files changed, 965 insertions, 377 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 654179297cf..978b6bec44a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -415,10 +415,32 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
Relation rel, Constraint *fkconstraint, Oid parentConstr,
bool recurse, bool recursing,
LOCKMODE lockmode);
-static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
- List **cloned);
-static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
- Relation partRel, List *clone, List **cloned);
+static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint,
+ Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks, int16 *pkattnum, int16 *fkattnum,
+ Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+ bool old_check_ok);
+static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
+ Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks, int16 *pkattnum, int16 *fkattnum,
+ Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+ bool old_check_ok, LOCKMODE lockmode);
+static void CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
+ Relation partitionRel);
+static void CloneFkReferenced(Relation parentRel, Relation partitionRel);
+static void CloneFkReferencing(List **wqueue, Relation parentRel,
+ Relation partRel);
+static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+ Constraint *fkconstraint, Oid constraintOid,
+ Oid indexOid);
+static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
+ Constraint *fkconstraint, Oid constraintOid,
+ Oid indexOid);
+static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+ Oid partRelid,
+ Oid parentConstrOid, int numfks,
+ AttrNumber *mapped_conkey, AttrNumber *confkey,
+ Oid *conpfeqop);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
@@ -501,6 +523,8 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
Relation partitionTbl);
static void update_relispartition(Relation classRel, Oid relationId,
bool newval);
+static List *GetParentedForeignKeyRefs(Relation partition);
+static void ATDetachCheckNoForeignKeyRefs(Relation partition);
/* ----------------------------------------------------------------
@@ -1083,7 +1107,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
* And foreign keys too. Note that because we're freshly creating the
* table, there is no need to verify these new constraints.
*/
- CloneForeignKeyConstraints(parentId, relationId, NULL);
+ CloneForeignKeyConstraints(NULL, parent, rel);
table_close(parent, NoLock);
}
@@ -3563,7 +3587,8 @@ AlterTableGetLockLevel(List *cmds)
/*
* Removing constraints can affect SELECTs that have been
- * optimised assuming the constraint holds true.
+ * optimised assuming the constraint holds true. See also
+ * CloneFkReferenced.
*/
case AT_DropConstraint: /* as DROP INDEX */
case AT_DropNotNull: /* may change some SQL plans */
@@ -7224,9 +7249,6 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
case CONSTR_FOREIGN:
/*
- * Note that we currently never recurse for FK constraints, so the
- * "recurse" flag is silently ignored.
- *
* Assign or validate constraint name
*/
if (newConstraint->conname)
@@ -7444,6 +7466,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Subroutine for ATExecAddConstraint. Must already hold exclusive
* lock on the rel, and have done appropriate validity checks for it.
* We do permissions checks here, however.
+ *
+ * When the referenced or referencing tables (or both) are partitioned,
+ * multiple pg_constraint rows are required -- one for each partitioned table
+ * and each partition on each side (fortunately, not one for every combination
+ * thereof). We also need action triggers on each leaf partition on the
+ * referenced side, and check triggers on each leaf partition on the
+ * referencing side.
*/
static ObjectAddress
ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
@@ -7459,12 +7488,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Oid pfeqoperators[INDEX_MAX_KEYS];
Oid ppeqoperators[INDEX_MAX_KEYS];
Oid ffeqoperators[INDEX_MAX_KEYS];
- bool connoinherit;
int i;
int numfks,
numpks;
Oid indexOid;
- Oid constrOid;
bool old_check_ok;
ObjectAddress address;
ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop);
@@ -7482,12 +7509,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Validity checks (permission checks wait till we have the column
* numbers)
*/
- if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot reference partitioned table \"%s\"",
- RelationGetRelationName(pkrel))));
-
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
if (!recurse)
@@ -7505,7 +7526,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
errdetail("This feature is not yet supported on partitioned tables.")));
}
- if (pkrel->rd_rel->relkind != RELKIND_RELATION)
+ if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+ pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("referenced relation \"%s\" is not a table",
@@ -7741,8 +7763,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("foreign key constraint \"%s\" "
- "cannot be implemented",
+ errmsg("foreign key constraint \"%s\" cannot be implemented",
fkconstraint->conname),
errdetail("Key columns \"%s\" and \"%s\" "
"are of incompatible types: %s and %s.",
@@ -7830,15 +7851,126 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
}
/*
- * FKs always inherit for partitioned tables, and never for legacy
- * inheritance.
+ * Create all the constraint and trigger objects, recursing to partitions
+ * as necessary. First handle the referenced side.
+ */
+ address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel,
+ indexOid,
+ InvalidOid, /* no parent constraint */
+ numfks,
+ pkattnum,
+ fkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ old_check_ok);
+
+ /* Now handle the referencing side. */
+ addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
+ indexOid,
+ address.objectId,
+ numfks,
+ pkattnum,
+ fkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ old_check_ok,
+ lockmode);
+
+ /*
+ * Done. Close pk table, but keep lock until we've committed.
+ */
+ table_close(pkrel, NoLock);
+
+ return address;
+}
+
+/*
+ * addFkRecurseReferenced
+ * subroutine for ATAddForeignKeyConstraint; recurses on the referenced
+ * side of the constraint
+ *
+ * Create pg_constraint rows for the referenced side of the constraint,
+ * referencing the parent of the referencing side; also create action triggers
+ * on leaf partitions. If the table is partitioned, recurse to handle each
+ * partition.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the root referencing relation.
+ * pkrel is the referenced relation; might be a partition, if recursing.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of a parent constraint; InvalidOid if this is a
+ * top-level constraint.
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ */
+static ObjectAddress
+addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
+ Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks,
+ int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+ Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
+{
+ ObjectAddress address;
+ Oid constrOid;
+ char *conname;
+ bool conislocal;
+ int coninhcount;
+ bool connoinherit;
+
+ /*
+ * Verify relkind for each referenced partition. At the top level, this
+ * is redundant with a previous check, but we need it when recursing.
*/
- connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+ if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+ pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("referenced relation \"%s\" is not a table",
+ RelationGetRelationName(pkrel))));
+
+ /*
+ * Caller supplies us with a constraint name; however, it may be used in
+ * this partition, so come up with a different one in that case.
+ */
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(rel),
+ fkconstraint->conname))
+ conname = ChooseConstraintName(RelationGetRelationName(rel),
+ ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+ "fkey",
+ RelationGetNamespace(rel), NIL);
+ else
+ conname = fkconstraint->conname;
+
+ if (OidIsValid(parentConstr))
+ {
+ conislocal = false;
+ coninhcount = 1;
+ connoinherit = false;
+ }
+ else
+ {
+ conislocal = true;
+ coninhcount = 0;
+
+ /*
+ * always inherit for partitioned tables, never for legacy inheritance
+ */
+ connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+ }
/*
* Record the FK constraint in pg_constraint.
*/
- constrOid = CreateConstraintEntry(fkconstraint->conname,
+ constrOid = CreateConstraintEntry(conname,
RelationGetNamespace(rel),
CONSTRAINT_FOREIGN,
fkconstraint->deferrable,
@@ -7856,108 +7988,317 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
pfeqoperators,
ppeqoperators,
ffeqoperators,
- numpks,
+ numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
fkconstraint->fk_matchtype,
NULL, /* no exclusion constraint */
NULL, /* no check constraint */
NULL,
- true, /* islocal */
- 0, /* inhcount */
- connoinherit, /* conNoInherit */
+ conislocal, /* islocal */
+ coninhcount, /* inhcount */
+ connoinherit, /* conNoInherit */
false); /* is_internal */
+
ObjectAddressSet(address, ConstraintRelationId, constrOid);
/*
- * Create the triggers that will enforce the constraint. We only want the
- * action triggers to appear for the parent partitioned relation, even
- * though the constraints also exist below.
+ * Mark the child constraint as part of the parent constraint; it must not
+ * be dropped on its own. (This constraint is deleted when the partition
+ * is detached, but a special check needs to occur that the partition
+ * contains no referenced values.)
*/
- createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
- constrOid, indexOid, !recursing);
+ if (OidIsValid(parentConstr))
+ {
+ ObjectAddress referenced;
+
+ ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_INTERNAL);
+ }
+
+ /* make new constraint visible, in case we add more */
+ CommandCounterIncrement();
/*
- * Tell Phase 3 to check that the constraint is satisfied by existing
- * rows. We can skip this during table creation, when requested explicitly
- * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're
- * recreating a constraint following a SET DATA TYPE operation that did
- * not impugn its validity.
+ * If the referenced table is a plain relation, create the action triggers
+ * that enforce the constraint.
*/
- if (!old_check_ok && !fkconstraint->skip_validation)
+ if (pkrel->rd_rel->relkind == RELKIND_RELATION)
{
- NewConstraint *newcon;
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = fkconstraint->conname;
- newcon->contype = CONSTR_FOREIGN;
- newcon->refrelid = RelationGetRelid(pkrel);
- newcon->refindid = indexOid;
- newcon->conid = constrOid;
- newcon->qual = (Node *) fkconstraint;
-
- tab->constraints = lappend(tab->constraints, newcon);
+ createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel),
+ fkconstraint,
+ constrOid, indexOid);
}
/*
- * When called on a partitioned table, recurse to create the constraint on
- * the partitions also.
+ * If the referenced table is partitioned, recurse on ourselves to handle
+ * each partition. We need one pg_constraint row created for each
+ * partition in addition to the pg_constraint row for the parent table.
*/
- if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
- PartitionDesc partdesc;
- Relation pg_constraint;
- List *cloned = NIL;
- ListCell *cell;
-
- pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
- partdesc = RelationGetPartitionDesc(rel);
+ PartitionDesc pd = RelationGetPartitionDesc(pkrel);
- for (i = 0; i < partdesc->nparts; i++)
+ for (int i = 0; i < pd->nparts; i++)
{
- Oid partitionId = partdesc->oids[i];
- Relation partition = table_open(partitionId, lockmode);
-
- CheckTableNotInUse(partition, "ALTER TABLE");
+ Relation partRel;
+ AttrNumber *map;
+ AttrNumber *mapped_pkattnum;
+ Oid partIndexId;
- CloneFkReferencing(pg_constraint, rel, partition,
- list_make1_oid(constrOid),
- &cloned);
+ partRel = table_open(pd->oids[i], ShareRowExclusiveLock);
- table_close(partition, NoLock);
+ /*
+ * Map the attribute numbers in the referenced side of the FK
+ * definition to match the partition's column layout.
+ */
+ map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+ RelationGetDescr(pkrel),
+ gettext_noop("could not convert row type"));
+ if (map)
+ {
+ mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
+ for (int j = 0; j < numfks; j++)
+ mapped_pkattnum[j] = map[pkattnum[j] - 1];
+ }
+ else
+ mapped_pkattnum = pkattnum;
+
+ /* do the deed */
+ partIndexId = index_get_partition(partRel, indexOid);
+ if (!OidIsValid(partIndexId))
+ elog(ERROR, "index for %u not found in partition %s",
+ indexOid, RelationGetRelationName(partRel));
+ addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel,
+ partIndexId, constrOid, numfks,
+ mapped_pkattnum, fkattnum,
+ pfeqoperators, ppeqoperators, ffeqoperators,
+ old_check_ok);
+
+ /* Done -- clean up (but keep the lock) */
+ table_close(partRel, NoLock);
+ if (map)
+ {
+ pfree(mapped_pkattnum);
+ pfree(map);
+ }
}
- table_close(pg_constraint, RowExclusiveLock);
+ }
+
+ return address;
+}
+
+/*
+ * addFkRecurseReferencing
+ * subroutine for ATAddForeignKeyConstraint and CloneFkReferencing
+ *
+ * If the referencing relation is a plain relation, create the necessary check
+ * triggers that implement the constraint, and set up for Phase 3 constraint
+ * verification. If the referencing relation is a partitioned table, then
+ * we create a pg_constraint row for it and recurse on this routine for each
+ * partition.
+ *
+ * We assume that the referenced relation is locked against concurrent
+ * deletions. If it's a partitioned relation, every partition must be so
+ * locked.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the referencing relation; might be a partition, if recursing.
+ * pkrel is the root referenced relation.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of the parent constraint (there is always one).
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ * lockmode is the lockmode to acquire on partitions when recursing.
+ */
+static void
+addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
+ Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks, int16 *pkattnum, int16 *fkattnum,
+ Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+ bool old_check_ok, LOCKMODE lockmode)
+{
+ AssertArg(OidIsValid(parentConstr));
+
+ if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("foreign keys constraints are not supported on foreign tables")));
+
+ /*
+ * If the referencing relation is a plain table, add the check triggers to
+ * it and, if necessary, schedule it to be checked in Phase 3.
+ *
+ * If the relation is partitioned, drill down to do it to its partitions.
+ */
+ if (rel->rd_rel->relkind == RELKIND_RELATION)
+ {
+ createForeignKeyCheckTriggers(RelationGetRelid(rel),
+ RelationGetRelid(pkrel),
+ fkconstraint,
+ parentConstr,
+ indexOid);
- foreach(cell, cloned)
+ /*
+ * Tell Phase 3 to check that the constraint is satisfied by existing
+ * rows. We can skip this during table creation, when requested
+ * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command,
+ * and when we're recreating a constraint following a SET DATA TYPE
+ * operation that did not impugn its validity.
+ */
+ if (wqueue && !old_check_ok && !fkconstraint->skip_validation)
{
- ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell);
- Relation partition = table_open(cc->relid, lockmode);
- AlteredTableInfo *childtab;
NewConstraint *newcon;
+ AlteredTableInfo *tab;
- /* Find or create work queue entry for this partition */
- childtab = ATGetQueueEntry(wqueue, partition);
+ tab = ATGetQueueEntry(wqueue, rel);
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = cc->constraint->conname;
+ newcon->name = get_constraint_name(parentConstr);
newcon->contype = CONSTR_FOREIGN;
- newcon->refrelid = cc->refrelid;
- newcon->refindid = cc->conindid;
- newcon->conid = cc->conid;
+ newcon->refrelid = RelationGetRelid(pkrel);
+ newcon->refindid = indexOid;
+ newcon->conid = parentConstr;
newcon->qual = (Node *) fkconstraint;
- childtab->constraints = lappend(childtab->constraints, newcon);
-
- table_close(partition, lockmode);
+ tab->constraints = lappend(tab->constraints, newcon);
}
}
+ else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDesc pd = RelationGetPartitionDesc(rel);
- /*
- * Close pk table, but keep lock until we've committed.
- */
- table_close(pkrel, NoLock);
+ /*
+ * Recurse to take appropriate action on each partition; either we
+ * find an existing constraint to reparent to ours, or we create a new
+ * one.
+ */
+ for (int i = 0; i < pd->nparts; i++)
+ {
+ Oid partitionId = pd->oids[i];
+ Relation partition = table_open(partitionId, lockmode);
+ List *partFKs;
+ AttrNumber *attmap;
+ AttrNumber mapped_fkattnum[INDEX_MAX_KEYS];
+ bool attached;
+ char *conname;
+ Oid constrOid;
+ ObjectAddress address,
+ referenced;
+ ListCell *cell;
- return address;
+ CheckTableNotInUse(partition, "ALTER TABLE");
+
+ attmap = convert_tuples_by_name_map(RelationGetDescr(partition),
+ RelationGetDescr(rel),
+ gettext_noop("could not convert row type"));
+ for (int j = 0; j < numfks; j++)
+ mapped_fkattnum[j] = attmap[fkattnum[j] - 1];
+
+ /* Check whether an existing constraint can be repurposed */
+ partFKs = copyObject(RelationGetFKeyList(partition));
+ attached = false;
+ foreach(cell, partFKs)
+ {
+ ForeignKeyCacheInfo *fk;
+
+ fk = lfirst_node(ForeignKeyCacheInfo, cell);
+ if (tryAttachPartitionForeignKey(fk,
+ partitionId,
+ parentConstr,
+ numfks,
+ mapped_fkattnum,
+ pkattnum,
+ pfeqoperators))
+ {
+ attached = true;
+ break;
+ }
+ }
+ if (attached)
+ {
+ table_close(partition, NoLock);
+ continue;
+ }
+
+ /*
+ * No luck finding a good constraint to reuse; create our own.
+ */
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(partition),
+ fkconstraint->conname))
+ conname = ChooseConstraintName(RelationGetRelationName(partition),
+ ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+ "fkey",
+ RelationGetNamespace(partition), NIL);
+ else
+ conname = fkconstraint->conname;
+ constrOid =
+ CreateConstraintEntry(conname,
+ RelationGetNamespace(partition),
+ CONSTRAINT_FOREIGN,
+ fkconstraint->deferrable,
+ fkconstraint->initdeferred,
+ fkconstraint->initially_valid,
+ parentConstr,
+ partitionId,
+ mapped_fkattnum,
+ numfks,
+ numfks,
+ InvalidOid,
+ indexOid,
+ RelationGetRelid(pkrel),
+ pkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ numfks,
+ fkconstraint->fk_upd_action,
+ fkconstraint->fk_del_action,
+ fkconstraint->fk_matchtype,
+ NULL,
+ NULL,
+ NULL,
+ false,
+ 1,
+ false,
+ false);
+
+ /*
+ * Give this constraint partition-type dependencies on the parent
+ * constraint as well as the table.
+ */
+ ObjectAddressSet(address, ConstraintRelationId, constrOid);
+ ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+ ObjectAddressSet(referenced, RelationRelationId, partitionId);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
+
+ /* Make all this visible before recursing */
+ CommandCounterIncrement();
+
+ /* call ourselves to finalize the creation and we're done */
+ addFkRecurseReferencing(wqueue, fkconstraint, partition, pkrel,
+ indexOid,
+ constrOid,
+ numfks,
+ pkattnum,
+ mapped_fkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ old_check_ok,
+ lockmode);
+
+ table_close(partition, NoLock);
+ }
+ }
}
/*
@@ -7965,77 +8306,219 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Clone foreign keys from a partitioned table to a newly acquired
* partition.
*
- * relationId is a partition of parentId, so we can be certain that it has the
- * same columns with the same datatypes. The columns may be in different
+ * partitionRel is a partition of parentRel, so we can be certain that it has
+ * the same columns with the same datatypes. The columns may be in different
* order, though.
*
- * The *cloned list is appended ClonedConstraint elements describing what was
- * created, for the purposes of validating the constraint in ALTER TABLE's
- * Phase 3.
+ * wqueue must be passed to set up phase 3 constraint checking, unless the
+ * referencing-side partition is known to be empty (such as in CREATE TABLE /
+ * PARTITION OF).
*/
static void
-CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
+ Relation partitionRel)
+{
+ /* This only works for declarative partitioning */
+ Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
+ /*
+ * Clone constraints for which the parent is on the referenced side.
+ */
+ CloneFkReferenced(parentRel, partitionRel);
+
+ /*
+ * Now clone constraints where the parent is on the referencing side.
+ */
+ CloneFkReferencing(wqueue, parentRel, partitionRel);
+}
+
+/*
+ * CloneFkReferenced
+ * Subroutine for CloneForeignKeyConstraints
+ *
+ * Find all the FKs that have the parent relation on the referenced side;
+ * clone those constraints to the given partition. This is to be called
+ * when the partition is being created or attached.
+ *
+ * This recurses to partitions, if the relation being attached is partitioned.
+ * Recursion is done by calling addFkRecurseReferenced.
+ */
+static void
+CloneFkReferenced(Relation parentRel, Relation partitionRel)
{
Relation pg_constraint;
- Relation parentRel;
- Relation rel;
- ScanKeyData key;
+ AttrNumber *attmap;
+ ListCell *cell;
SysScanDesc scan;
+ ScanKeyData key[2];
HeapTuple tuple;
List *clone = NIL;
- parentRel = table_open(parentId, NoLock); /* already got lock */
- /* see ATAddForeignKeyConstraint about lock level */
- rel = table_open(relationId, AccessExclusiveLock);
+ /*
+ * Search for any constraints where this partition is in the referenced
+ * side. However, we must ignore any constraint whose parent constraint
+ * is also going to be cloned, to avoid duplicates. So do it in two
+ * steps: first construct the list of constraints to clone, then go over
+ * that list cloning those whose parents are not in the list. (We must
+ * not rely on the parent being seen first, since the catalog scan could
+ * return children first.)
+ */
pg_constraint = table_open(ConstraintRelationId, RowShareLock);
-
- /* Obtain the list of constraints to clone or attach */
- ScanKeyInit(&key,
- Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
- F_OIDEQ, ObjectIdGetDatum(parentId));
- scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
- NULL, 1, &key);
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+ F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel)));
+ ScanKeyInit(&key[1],
+ Anum_pg_constraint_contype, BTEqualStrategyNumber,
+ F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+ /* This is a seqscan, as we don't have a usable index ... */
+ scan = systable_beginscan(pg_constraint, InvalidOid, true,
+ NULL, 2, key);
while ((tuple = systable_getnext(scan)) != NULL)
{
- Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+ Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
- clone = lappend_oid(clone, oid);
+ /* Only try to clone the top-level constraint; skip child ones. */
+ if (constrForm->conparentid != InvalidOid)
+ continue;
+
+ clone = lappend_oid(clone, constrForm->oid);
}
systable_endscan(scan);
+ table_close(pg_constraint, RowShareLock);
- /* Do the actual work, recursing to partitions as needed */
- CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
+ attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
+ RelationGetDescr(parentRel),
+ gettext_noop("could not convert row type"));
+ foreach(cell, clone)
+ {
+ Oid constrOid = lfirst_oid(cell);
+ Form_pg_constraint constrForm;
+ Relation fkRel;
+ Oid indexOid;
+ Oid partIndexId;
+ int numfks;
+ AttrNumber conkey[INDEX_MAX_KEYS];
+ AttrNumber mapped_confkey[INDEX_MAX_KEYS];
+ AttrNumber confkey[INDEX_MAX_KEYS];
+ Oid conpfeqop[INDEX_MAX_KEYS];
+ Oid conppeqop[INDEX_MAX_KEYS];
+ Oid conffeqop[INDEX_MAX_KEYS];
+ Constraint *fkconstraint;
- /* We're done. Clean up */
- table_close(parentRel, NoLock);
- table_close(rel, NoLock); /* keep lock till commit */
- table_close(pg_constraint, RowShareLock);
+ tuple = SearchSysCache1(CONSTROID, constrOid);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+ constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /*
+ * Because we're only expanding the key space at the referenced side,
+ * we don't need to prevent any operation in the referencing table, so
+ * AccessShareLock suffices (assumes that dropping the constraint
+ * acquires AEL).
+ */
+ fkRel = table_open(constrForm->conrelid, AccessShareLock);
+
+ indexOid = constrForm->conindid;
+ DeconstructFkConstraintRow(tuple,
+ &numfks,
+ conkey,
+ confkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop);
+ for (int i = 0; i < numfks; i++)
+ mapped_confkey[i] = attmap[confkey[i] - 1];
+
+ fkconstraint = makeNode(Constraint);
+ /* for now this is all we need */
+ fkconstraint->conname = NameStr(constrForm->conname);
+ fkconstraint->fk_upd_action = constrForm->confupdtype;
+ fkconstraint->fk_del_action = constrForm->confdeltype;
+ fkconstraint->deferrable = constrForm->condeferrable;
+ fkconstraint->initdeferred = constrForm->condeferred;
+ fkconstraint->initially_valid = true;
+ fkconstraint->fk_matchtype = constrForm->confmatchtype;
+
+ /* set up colnames that are used to generate the constraint name */
+ for (int i = 0; i < numfks; i++)
+ {
+ Form_pg_attribute att;
+
+ att = TupleDescAttr(RelationGetDescr(fkRel),
+ conkey[i] - 1);
+ fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+ makeString(NameStr(att->attname)));
+ }
+
+ /*
+ * Add the new foreign key constraint pointing to the new partition.
+ * Because this new partition appears in the referenced side of the
+ * constraint, we don't need to set up for Phase 3 check.
+ */
+ partIndexId = index_get_partition(partitionRel, indexOid);
+ if (!OidIsValid(partIndexId))
+ elog(ERROR, "index for %u not found in partition %s",
+ indexOid, RelationGetRelationName(partitionRel));
+ addFkRecurseReferenced(NULL,
+ fkconstraint,
+ fkRel,
+ partitionRel,
+ partIndexId,
+ constrOid,
+ numfks,
+ mapped_confkey,
+ conkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop,
+ true);
+
+ table_close(fkRel, NoLock);
+ ReleaseSysCache(tuple);
+ }
}
/*
* CloneFkReferencing
- * Recursive subroutine for CloneForeignKeyConstraints, referencing side
- *
- * Clone the given list of FK constraints when a partition is attached on the
- * referencing side of those constraints.
+ * Subroutine for CloneForeignKeyConstraints
*
- * When cloning foreign keys to a partition, it may happen that equivalent
- * constraints already exist in the partition for some of them. We can skip
- * creating a clone in that case, and instead just attach the existing
- * constraint to the one in the parent.
+ * For each FK constraint of the parent relation in the given list, find an
+ * equivalent constraint in its partition relation that can be reparented;
+ * if one cannot be found, create a new constraint in the partition as its
+ * child.
*
- * This function recurses to partitions, if the new partition is partitioned;
- * of course, only do this for FKs that were actually cloned.
+ * If wqueue is given, it is used to set up phase-3 verification for each
+ * cloned constraint; if omitted, we assume that such verification is not
+ * needed (example: the partition is being created anew).
*/
static void
-CloneFkReferencing(Relation pg_constraint, Relation parentRel,
- Relation partRel, List *clone, List **cloned)
+CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
{
AttrNumber *attmap;
List *partFKs;
- List *subclone = NIL;
+ List *clone = NIL;
ListCell *cell;
+ /* obtain a list of constraints that we need to clone */
+ foreach(cell, RelationGetFKeyList(parentRel))
+ {
+ ForeignKeyCacheInfo *fk = lfirst(cell);
+
+ clone = lappend_oid(clone, fk->conoid);
+ }
+
+ /*
+ * Silently do nothing if there's nothing to do. In particular, this
+ * avoids throwing a spurious error for foreign tables.
+ */
+ if (clone == NIL)
+ return;
+
+ if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("foreign keys constraints are not supported on foreign tables")));
+
/*
* The constraint key may differ, if the columns in the partition are
* different. This map is used to convert them.
@@ -8050,6 +8533,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
{
Oid parentConstrOid = lfirst_oid(cell);
Form_pg_constraint constrForm;
+ Relation pkrel;
HeapTuple tuple;
int numfks;
AttrNumber conkey[INDEX_MAX_KEYS];
@@ -8059,13 +8543,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
Constraint *fkconstraint;
- bool attach_it;
+ bool attached;
+ Oid indexOid;
Oid constrOid;
- ObjectAddress parentAddr,
- childAddr,
- childTableAddr;
+ ObjectAddress address,
+ referenced;
ListCell *cell;
- int i;
tuple = SearchSysCache1(CONSTROID, parentConstrOid);
if (!tuple)
@@ -8073,161 +8556,92 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
parentConstrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
- /* only foreign keys */
- if (constrForm->contype != CONSTRAINT_FOREIGN)
+ /* Don't clone constraints whose parents are being cloned */
+ if (list_member_oid(clone, constrForm->conparentid))
{
ReleaseSysCache(tuple);
continue;
}
- ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
+ /*
+ * Need to prevent concurrent deletions. If pkrel is a partitioned
+ * relation, that means to lock all partitions.
+ */
+ pkrel = table_open(constrForm->confrelid, ShareRowExclusiveLock);
+ if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ (void) find_all_inheritors(RelationGetRelid(pkrel),
+ ShareRowExclusiveLock, NULL);
DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
- for (i = 0; i < numfks; i++)
+ for (int i = 0; i < numfks; i++)
mapped_conkey[i] = attmap[conkey[i] - 1];
/*
* Before creating a new constraint, see whether any existing FKs are
- * fit for the purpose. If one is, attach the parent constraint to it,
- * and don't clone anything. This way we avoid the expensive
- * verification step and don't end up with a duplicate FK. This also
- * means we don't consider this constraint when recursing to
- * partitions.
+ * fit for the purpose. If one is, attach the parent constraint to
+ * it, and don't clone anything. This way we avoid the expensive
+ * verification step and don't end up with a duplicate FK, and we
+ * don't need to recurse to partitions for this constraint.
*/
- attach_it = false;
+ attached = false;
foreach(cell, partFKs)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
- Form_pg_constraint partConstr;
- HeapTuple partcontup;
- Relation trigrel;
- HeapTuple trigtup;
- SysScanDesc scan;
- ScanKeyData key;
-
- attach_it = true;
-
- /*
- * Do some quick & easy initial checks. If any of these fail, we
- * cannot use this constraint, but keep looking.
- */
- if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
- {
- attach_it = false;
- continue;
- }
- for (i = 0; i < numfks; i++)
- {
- if (fk->conkey[i] != mapped_conkey[i] ||
- fk->confkey[i] != confkey[i] ||
- fk->conpfeqop[i] != conpfeqop[i])
- {
- attach_it = false;
- break;
- }
- }
- if (!attach_it)
- continue;
- /*
- * Looks good so far; do some more extensive checks. Presumably
- * the check for 'convalidated' could be dropped, since we don't
- * really care about that, but let's be careful for now.
- */
- partcontup = SearchSysCache1(CONSTROID,
- ObjectIdGetDatum(fk->conoid));
- if (!partcontup)
- elog(ERROR, "cache lookup failed for constraint %u",
- fk->conoid);
- partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
- if (OidIsValid(partConstr->conparentid) ||
- !partConstr->convalidated ||
- partConstr->condeferrable != constrForm->condeferrable ||
- partConstr->condeferred != constrForm->condeferred ||
- partConstr->confupdtype != constrForm->confupdtype ||
- partConstr->confdeltype != constrForm->confdeltype ||
- partConstr->confmatchtype != constrForm->confmatchtype)
+ if (tryAttachPartitionForeignKey(fk,
+ RelationGetRelid(partRel),
+ parentConstrOid,
+ numfks,
+ mapped_conkey,
+ confkey,
+ conpfeqop))
{
- ReleaseSysCache(partcontup);
- attach_it = false;
- continue;
- }
-
- ReleaseSysCache(partcontup);
-
- /*
- * Looks good! Attach this constraint. The action triggers in
- * the new partition become redundant -- the parent table already
- * has equivalent ones, and those will be able to reach the
- * partition. Remove the ones in the partition. We identify them
- * because they have our constraint OID, as well as being on the
- * referenced rel.
- */
- trigrel = heap_open(TriggerRelationId, RowExclusiveLock);
- ScanKeyInit(&key,
- Anum_pg_trigger_tgconstraint,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(fk->conoid));
-
- scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
- NULL, 1, &key);
- while ((trigtup = systable_getnext(scan)) != NULL)
- {
- Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
- ObjectAddress trigger;
-
- if (trgform->tgconstrrelid != fk->conrelid)
- continue;
- if (trgform->tgrelid != fk->confrelid)
- continue;
-
- /*
- * The constraint is originally set up to contain this trigger
- * as an implementation object, so there's a dependency record
- * that links the two; however, since the trigger is no longer
- * needed, we remove the dependency link in order to be able
- * to drop the trigger while keeping the constraint intact.
- */
- deleteDependencyRecordsFor(TriggerRelationId,
- trgform->oid,
- false);
- /* make dependency deletion visible to performDeletion */
- CommandCounterIncrement();
- ObjectAddressSet(trigger, TriggerRelationId,
- trgform->oid);
- performDeletion(&trigger, DROP_RESTRICT, 0);
- /* make trigger drop visible, in case the loop iterates */
- CommandCounterIncrement();
+ attached = true;
+ table_close(pkrel, NoLock);
+ break;
}
-
- systable_endscan(scan);
- table_close(trigrel, RowExclusiveLock);
-
- ConstraintSetParentConstraint(fk->conoid, parentConstrOid,
- RelationGetRelid(partRel));
- CommandCounterIncrement();
- attach_it = true;
- break;
}
-
- /*
- * If we attached to an existing constraint, there is no need to
- * create a new one. In fact, there's no need to recurse for this
- * constraint to partitions, either.
- */
- if (attach_it)
+ if (attached)
{
ReleaseSysCache(tuple);
continue;
}
+ /* No dice. Set up to create our own constraint */
+ fkconstraint = makeNode(Constraint);
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(partRel),
+ NameStr(constrForm->conname)))
+ fkconstraint->conname =
+ ChooseConstraintName(RelationGetRelationName(partRel),
+ ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+ "fkey",
+ RelationGetNamespace(partRel), NIL);
+ else
+ fkconstraint->conname = NameStr(constrForm->conname);
+ fkconstraint->fk_upd_action = constrForm->confupdtype;
+ fkconstraint->fk_del_action = constrForm->confdeltype;
+ fkconstraint->deferrable = constrForm->condeferrable;
+ fkconstraint->initdeferred = constrForm->condeferred;
+ fkconstraint->fk_matchtype = constrForm->confmatchtype;
+ for (int i = 0; i < numfks; i++)
+ {
+ Form_pg_attribute att;
+
+ att = TupleDescAttr(RelationGetDescr(partRel),
+ mapped_conkey[i] - 1);
+ fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+ makeString(NameStr(att->attname)));
+ }
+
+ indexOid = constrForm->conindid;
constrOid =
- CreateConstraintEntry(NameStr(constrForm->conname),
+ CreateConstraintEntry(fkconstraint->conname,
constrForm->connamespace,
CONSTRAINT_FOREIGN,
- constrForm->condeferrable,
- constrForm->condeferred,
+ fkconstraint->deferrable,
+ fkconstraint->initdeferred,
constrForm->convalidated,
parentConstrOid,
RelationGetRelid(partRel),
@@ -8235,92 +8649,191 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
numfks,
numfks,
InvalidOid, /* not a domain constraint */
- constrForm->conindid, /* same index */
+ indexOid,
constrForm->confrelid, /* same foreign rel */
confkey,
conpfeqop,
conppeqop,
conffeqop,
numfks,
- constrForm->confupdtype,
- constrForm->confdeltype,
- constrForm->confmatchtype,
+ fkconstraint->fk_upd_action,
+ fkconstraint->fk_del_action,
+ fkconstraint->fk_matchtype,
NULL,
NULL,
NULL,
- false,
- 1, false, true);
- subclone = lappend_oid(subclone, constrOid);
+ false, /* islocal */
+ 1, /* inhcount */
+ false, /* conNoInherit */
+ true);
/* Set up partition dependencies for the new constraint */
- ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
- recordDependencyOn(&childAddr, &parentAddr,
- DEPENDENCY_PARTITION_PRI);
- ObjectAddressSet(childTableAddr, RelationRelationId,
+ ObjectAddressSet(address, ConstraintRelationId, constrOid);
+ ObjectAddressSet(referenced, ConstraintRelationId, parentConstrOid);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+ ObjectAddressSet(referenced, RelationRelationId,
RelationGetRelid(partRel));
- recordDependencyOn(&childAddr, &childTableAddr,
- DEPENDENCY_PARTITION_SEC);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
- fkconstraint = makeNode(Constraint);
- /* for now this is all we need */
- fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
- fkconstraint->fk_upd_action = constrForm->confupdtype;
- fkconstraint->fk_del_action = constrForm->confdeltype;
- fkconstraint->deferrable = constrForm->condeferrable;
- fkconstraint->initdeferred = constrForm->condeferred;
+ /* Done with the cloned constraint's tuple */
+ ReleaseSysCache(tuple);
- createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
- constrOid, constrForm->conindid, false);
+ /* Make all this visible before recursing */
+ CommandCounterIncrement();
- if (cloned)
- {
- ClonedConstraint *newc;
+ addFkRecurseReferencing(wqueue,
+ fkconstraint,
+ partRel,
+ pkrel,
+ indexOid,
+ constrOid,
+ numfks,
+ confkey,
+ mapped_conkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop,
+ false, /* no old check exists */
+ AccessExclusiveLock);
+ table_close(pkrel, NoLock);
+ }
+}
- /*
- * Feed back caller about the constraints we created, so that they
- * can set up constraint verification.
- */
- newc = palloc(sizeof(ClonedConstraint));
- newc->relid = RelationGetRelid(partRel);
- newc->refrelid = constrForm->confrelid;
- newc->conindid = constrForm->conindid;
- newc->conid = constrOid;
- newc->constraint = fkconstraint;
+/*
+ * When the parent of a partition receives [the referencing side of] a foreign
+ * key, we must propagate that foreign key to the partition. However, the
+ * partition might already have an equivalent foreign key; this routine
+ * compares the given ForeignKeyCacheInfo (in the partition) to the FK defined
+ * by the other parameters. If they are equivalent, create the link between
+ * the two constraints and return true.
+ *
+ * If the given FK does not match the one defined by rest of the params,
+ * return false.
+ */
+static bool
+tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+ Oid partRelid,
+ Oid parentConstrOid,
+ int numfks,
+ AttrNumber *mapped_conkey,
+ AttrNumber *confkey,
+ Oid *conpfeqop)
+{
+ HeapTuple parentConstrTup;
+ Form_pg_constraint parentConstr;
+ HeapTuple partcontup;
+ Form_pg_constraint partConstr;
+ Relation trigrel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ HeapTuple trigtup;
+
+ parentConstrTup = SearchSysCache1(CONSTROID,
+ ObjectIdGetDatum(parentConstrOid));
+ if (!parentConstrTup)
+ elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid);
+ parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup);
- *cloned = lappend(*cloned, newc);
+ /*
+ * Do some quick & easy initial checks. If any of these fail, we cannot
+ * use this constraint.
+ */
+ if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks)
+ {
+ ReleaseSysCache(parentConstrTup);
+ return false;
+ }
+ for (int i = 0; i < numfks; i++)
+ {
+ if (fk->conkey[i] != mapped_conkey[i] ||
+ fk->confkey[i] != confkey[i] ||
+ fk->conpfeqop[i] != conpfeqop[i])
+ {
+ ReleaseSysCache(parentConstrTup);
+ return false;
}
+ }
- ReleaseSysCache(tuple);
+ /*
+ * Looks good so far; do some more extensive checks. Presumably the check
+ * for 'convalidated' could be dropped, since we don't really care about
+ * that, but let's be careful for now.
+ */
+ partcontup = SearchSysCache1(CONSTROID,
+ ObjectIdGetDatum(fk->conoid));
+ if (!partcontup)
+ elog(ERROR, "cache lookup failed for constraint %u",
+ fk->conoid);
+ partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+ if (OidIsValid(partConstr->conparentid) ||
+ !partConstr->convalidated ||
+ partConstr->condeferrable != parentConstr->condeferrable ||
+ partConstr->condeferred != parentConstr->condeferred ||
+ partConstr->confupdtype != parentConstr->confupdtype ||
+ partConstr->confdeltype != parentConstr->confdeltype ||
+ partConstr->confmatchtype != parentConstr->confmatchtype)
+ {
+ ReleaseSysCache(parentConstrTup);
+ ReleaseSysCache(partcontup);
+ return false;
}
- pfree(attmap);
- list_free_deep(partFKs);
+ ReleaseSysCache(partcontup);
+ ReleaseSysCache(parentConstrTup);
/*
- * If the partition is partitioned, recurse to handle any constraints that
- * were cloned.
+ * Looks good! Attach this constraint. The action triggers in the new
+ * partition become redundant -- the parent table already has equivalent
+ * ones, and those will be able to reach the partition. Remove the ones
+ * in the partition. We identify them because they have our constraint
+ * OID, as well as being on the referenced rel.
*/
- if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
- subclone != NIL)
+ trigrel = table_open(TriggerRelationId, RowExclusiveLock);
+ ScanKeyInit(&key,
+ Anum_pg_trigger_tgconstraint,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(fk->conoid));
+
+ scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
+ NULL, 1, &key);
+ while ((trigtup = systable_getnext(scan)) != NULL)
{
- PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
- int i;
+ Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
+ ObjectAddress trigger;
- for (i = 0; i < partdesc->nparts; i++)
- {
- Relation childRel;
+ if (trgform->tgconstrrelid != fk->conrelid)
+ continue;
+ if (trgform->tgrelid != fk->confrelid)
+ continue;
- childRel = table_open(partdesc->oids[i], AccessExclusiveLock);
- CloneFkReferencing(pg_constraint,
- partRel,
- childRel,
- subclone,
- cloned);
- table_close(childRel, NoLock); /* keep lock till commit */
- }
+ /*
+ * The constraint is originally set up to contain this trigger as an
+ * implementation object, so there's a dependency record that links
+ * the two; however, since the trigger is no longer needed, we remove
+ * the dependency link in order to be able to drop the trigger while
+ * keeping the constraint intact.
+ */
+ deleteDependencyRecordsFor(TriggerRelationId,
+ trgform->oid,
+ false);
+ /* make dependency deletion visible to performDeletion */
+ CommandCounterIncrement();
+ ObjectAddressSet(trigger, TriggerRelationId,
+ trgform->oid);
+ performDeletion(&trigger, DROP_RESTRICT, 0);
+ /* make trigger drop visible, in case the loop iterates */
+ CommandCounterIncrement();
}
+
+ systable_endscan(scan);
+ table_close(trigrel, RowExclusiveLock);
+
+ ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid);
+ CommandCounterIncrement();
+ return true;
}
+
/*
* ALTER TABLE ALTER CONSTRAINT
*
@@ -8440,8 +8953,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
/*
* Update deferrability of RI_FKey_noaction_del,
* RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd
- * triggers, but not others; see createForeignKeyTriggers and
- * CreateFKCheckTrigger.
+ * triggers, but not others; see createForeignKeyActionTriggers
+ * and CreateFKCheckTrigger.
*/
if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL &&
tgform->tgfoid != F_RI_FKEY_NOACTION_UPD &&
@@ -9349,37 +9862,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
}
/*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
- */
-void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
- Oid constraintOid, Oid indexOid, bool create_action)
-{
- /*
- * For the referenced side, create action triggers, if requested. (If the
- * referencing side is partitioned, there is still only one trigger, which
- * runs on the referenced side and points to the top of the referencing
- * hierarchy.)
- */
- if (create_action)
- createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
- indexOid);
-
- /*
- * For the referencing side, create the check triggers. We only need
- * these on the partitions.
- */
- if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
- createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
- fkconstraint, constraintOid, indexOid);
-
- CommandCounterIncrement();
-}
-
-/*
* ALTER TABLE DROP CONSTRAINT
*
* Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism.
@@ -14778,8 +15260,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
bool found_whole_row;
Oid defaultPartOid;
List *partBoundConstraint;
- List *cloned;
- ListCell *l;
/*
* We must lock the default partition if one exists, because attaching a
@@ -14960,33 +15440,10 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
CloneRowTriggersToPartition(rel, attachrel);
/*
- * Clone foreign key constraints, and setup for Phase 3 to verify them.
+ * Clone foreign key constraints. Callee is responsible for setting up
+ * for phase 3 constraint verification.
*/
- cloned = NIL;
- CloneForeignKeyConstraints(RelationGetRelid(rel),
- RelationGetRelid(attachrel), &cloned);
- foreach(l, cloned)
- {
- ClonedConstraint *clonedcon = lfirst(l);
- NewConstraint *newcon;
- Relation clonedrel;
- AlteredTableInfo *parttab;
-
- clonedrel = relation_open(clonedcon->relid, NoLock);
- parttab = ATGetQueueEntry(wqueue, clonedrel);
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = clonedcon->constraint->conname;
- newcon->contype = CONSTR_FOREIGN;
- newcon->refrelid = clonedcon->refrelid;
- newcon->refindid = clonedcon->conindid;
- newcon->conid = clonedcon->conid;
- newcon->qual = (Node *) clonedcon->constraint;
-
- parttab->constraints = lappend(parttab->constraints, newcon);
-
- relation_close(clonedrel, NoLock);
- }
+ CloneForeignKeyConstraints(wqueue, rel, attachrel);
/*
* Generate partition constraint from the partition bound specification.
@@ -15177,6 +15634,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
RelationGetRelid(attachrel));
update_relispartition(NULL, cldIdxId, true);
found = true;
+
+ CommandCounterIncrement();
break;
}
}
@@ -15368,6 +15827,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
partRel = table_openrv(name, ShareUpdateExclusiveLock);
+ /* Ensure that foreign keys still hold after this detach */
+ ATDetachCheckNoForeignKeyRefs(partRel);
+
/* All inheritance related checks are performed within the function */
RemoveInheritance(partRel, rel);
@@ -15487,6 +15949,28 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
list_free_deep(fks);
/*
+ * Any sub-constrains that are in the referenced-side of a larger
+ * constraint have to be removed. This partition is no longer part of the
+ * key space of the constraint.
+ */
+ foreach(cell, GetParentedForeignKeyRefs(partRel))
+ {
+ Oid constrOid = lfirst_oid(cell);
+ ObjectAddress constraint;
+
+ ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+ deleteDependencyRecordsForClass(ConstraintRelationId,
+ constrOid,
+ ConstraintRelationId,
+ DEPENDENCY_INTERNAL);
+ CommandCounterIncrement();
+
+ ObjectAddressSet(constraint, ConstraintRelationId, constrOid);
+ performDeletion(&constraint, DROP_RESTRICT, 0);
+ }
+ CommandCounterIncrement();
+
+ /*
* Invalidate the parent's relcache so that the partition is no longer
* included in its partition descriptor.
*/
@@ -15866,3 +16350,107 @@ update_relispartition(Relation classRel, Oid relationId, bool newval)
if (opened)
table_close(classRel, RowExclusiveLock);
}
+
+/*
+ * Return an OID list of constraints that reference the given relation
+ * that are marked as having a parent constraints.
+ */
+static List *
+GetParentedForeignKeyRefs(Relation partition)
+{
+ Relation pg_constraint;
+ HeapTuple tuple;
+ SysScanDesc scan;
+ ScanKeyData key[2];
+ List *constraints = NIL;
+
+ /*
+ * If no indexes, or no columns are referenceable by FKs, we can avoid the
+ * scan.
+ */
+ if (RelationGetIndexList(partition) == NIL ||
+ bms_is_empty(RelationGetIndexAttrBitmap(partition,
+ INDEX_ATTR_BITMAP_KEY)))
+ return NIL;
+
+ /* Search for constraints referencing this table */
+ pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+ F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition)));
+ ScanKeyInit(&key[1],
+ Anum_pg_constraint_contype, BTEqualStrategyNumber,
+ F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+
+ /* XXX This is a seqscan, as we don't have a usable index */
+ scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key);
+ while ((tuple = systable_getnext(scan)) != NULL)
+ {
+ Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /*
+ * We only need to process constraints that are part of larger ones.
+ */
+ if (!OidIsValid(constrForm->conparentid))
+ continue;
+
+ constraints = lappend_oid(constraints, constrForm->oid);
+ }
+
+ systable_endscan(scan);
+ table_close(pg_constraint, AccessShareLock);
+
+ return constraints;
+}
+
+/*
+ * During DETACH PARTITION, verify that any foreign keys pointing to the
+ * partitioned table would not become invalid. An error is raised if any
+ * referenced values exist.
+ */
+static void
+ATDetachCheckNoForeignKeyRefs(Relation partition)
+{
+ List *constraints;
+ ListCell *cell;
+
+ constraints = GetParentedForeignKeyRefs(partition);
+
+ foreach(cell, constraints)
+ {
+ Oid constrOid = lfirst_oid(cell);
+ HeapTuple tuple;
+ Form_pg_constraint constrForm;
+ Relation rel;
+ Trigger trig;
+
+ tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+ constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ Assert(OidIsValid(constrForm->conparentid));
+ Assert(constrForm->confrelid == RelationGetRelid(partition));
+
+ /* prevent data changes into the referencing table until commit */
+ rel = table_open(constrForm->conrelid, ShareLock);
+
+ MemSet(&trig, 0, sizeof(trig));
+ trig.tgoid = InvalidOid;
+ trig.tgname = NameStr(constrForm->conname);
+ trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
+ trig.tgisinternal = true;
+ trig.tgconstrrelid = RelationGetRelid(partition);
+ trig.tgconstrindid = constrForm->conindid;
+ trig.tgconstraint = constrForm->oid;
+ trig.tgdeferrable = false;
+ trig.tginitdeferred = false;
+ /* we needn't fill in remaining fields */
+
+ RI_PartitionRemove_Check(&trig, rel, partition);
+
+ ReleaseSysCache(tuple);
+
+ table_close(rel, NoLock);
+ }
+}