diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 1342 |
1 files changed, 965 insertions, 377 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 654179297cf..978b6bec44a 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -415,10 +415,32 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); -static void CloneForeignKeyConstraints(Oid parentId, Oid relationId, - List **cloned); -static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned); +static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok); +static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok, LOCKMODE lockmode); +static void CloneForeignKeyConstraints(List **wqueue, Relation parentRel, + Relation partitionRel); +static void CloneFkReferenced(Relation parentRel, Relation partitionRel); +static void CloneFkReferencing(List **wqueue, Relation parentRel, + Relation partRel); +static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, + Oid partRelid, + Oid parentConstrOid, int numfks, + AttrNumber *mapped_conkey, AttrNumber *confkey, + Oid *conpfeqop); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -501,6 +523,8 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl); static void update_relispartition(Relation classRel, Oid relationId, bool newval); +static List *GetParentedForeignKeyRefs(Relation partition); +static void ATDetachCheckNoForeignKeyRefs(Relation partition); /* ---------------------------------------------------------------- @@ -1083,7 +1107,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, * And foreign keys too. Note that because we're freshly creating the * table, there is no need to verify these new constraints. */ - CloneForeignKeyConstraints(parentId, relationId, NULL); + CloneForeignKeyConstraints(NULL, parent, rel); table_close(parent, NoLock); } @@ -3563,7 +3587,8 @@ AlterTableGetLockLevel(List *cmds) /* * Removing constraints can affect SELECTs that have been - * optimised assuming the constraint holds true. + * optimised assuming the constraint holds true. See also + * CloneFkReferenced. */ case AT_DropConstraint: /* as DROP INDEX */ case AT_DropNotNull: /* may change some SQL plans */ @@ -7224,9 +7249,6 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, case CONSTR_FOREIGN: /* - * Note that we currently never recurse for FK constraints, so the - * "recurse" flag is silently ignored. - * * Assign or validate constraint name */ if (newConstraint->conname) @@ -7444,6 +7466,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Subroutine for ATExecAddConstraint. Must already hold exclusive * lock on the rel, and have done appropriate validity checks for it. * We do permissions checks here, however. + * + * When the referenced or referencing tables (or both) are partitioned, + * multiple pg_constraint rows are required -- one for each partitioned table + * and each partition on each side (fortunately, not one for every combination + * thereof). We also need action triggers on each leaf partition on the + * referenced side, and check triggers on each leaf partition on the + * referencing side. */ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, @@ -7459,12 +7488,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Oid pfeqoperators[INDEX_MAX_KEYS]; Oid ppeqoperators[INDEX_MAX_KEYS]; Oid ffeqoperators[INDEX_MAX_KEYS]; - bool connoinherit; int i; int numfks, numpks; Oid indexOid; - Oid constrOid; bool old_check_ok; ObjectAddress address; ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop); @@ -7482,12 +7509,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Validity checks (permission checks wait till we have the column * numbers) */ - if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot reference partitioned table \"%s\"", - RelationGetRelationName(pkrel)))); - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { if (!recurse) @@ -7505,7 +7526,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, errdetail("This feature is not yet supported on partitioned tables."))); } - if (pkrel->rd_rel->relkind != RELKIND_RELATION) + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("referenced relation \"%s\" is not a table", @@ -7741,8 +7763,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("foreign key constraint \"%s\" " - "cannot be implemented", + errmsg("foreign key constraint \"%s\" cannot be implemented", fkconstraint->conname), errdetail("Key columns \"%s\" and \"%s\" " "are of incompatible types: %s and %s.", @@ -7830,15 +7851,126 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, } /* - * FKs always inherit for partitioned tables, and never for legacy - * inheritance. + * Create all the constraint and trigger objects, recursing to partitions + * as necessary. First handle the referenced side. + */ + address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel, + indexOid, + InvalidOid, /* no parent constraint */ + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok); + + /* Now handle the referencing side. */ + addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel, + indexOid, + address.objectId, + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok, + lockmode); + + /* + * Done. Close pk table, but keep lock until we've committed. + */ + table_close(pkrel, NoLock); + + return address; +} + +/* + * addFkRecurseReferenced + * subroutine for ATAddForeignKeyConstraint; recurses on the referenced + * side of the constraint + * + * Create pg_constraint rows for the referenced side of the constraint, + * referencing the parent of the referencing side; also create action triggers + * on leaf partitions. If the table is partitioned, recurse to handle each + * partition. + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the root referencing relation. + * pkrel is the referenced relation; might be a partition, if recursing. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstr is the OID of a parent constraint; InvalidOid if this is a + * top-level constraint. + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. + * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + */ +static ObjectAddress +addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstr, + int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok) +{ + ObjectAddress address; + Oid constrOid; + char *conname; + bool conislocal; + int coninhcount; + bool connoinherit; + + /* + * Verify relkind for each referenced partition. At the top level, this + * is redundant with a previous check, but we need it when recursing. */ - connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE; + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("referenced relation \"%s\" is not a table", + RelationGetRelationName(pkrel)))); + + /* + * Caller supplies us with a constraint name; however, it may be used in + * this partition, so come up with a different one in that case. + */ + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(rel), + fkconstraint->conname)) + conname = ChooseConstraintName(RelationGetRelationName(rel), + ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs), + "fkey", + RelationGetNamespace(rel), NIL); + else + conname = fkconstraint->conname; + + if (OidIsValid(parentConstr)) + { + conislocal = false; + coninhcount = 1; + connoinherit = false; + } + else + { + conislocal = true; + coninhcount = 0; + + /* + * always inherit for partitioned tables, never for legacy inheritance + */ + connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE; + } /* * Record the FK constraint in pg_constraint. */ - constrOid = CreateConstraintEntry(fkconstraint->conname, + constrOid = CreateConstraintEntry(conname, RelationGetNamespace(rel), CONSTRAINT_FOREIGN, fkconstraint->deferrable, @@ -7856,108 +7988,317 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, pfeqoperators, ppeqoperators, ffeqoperators, - numpks, + numfks, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, fkconstraint->fk_matchtype, NULL, /* no exclusion constraint */ NULL, /* no check constraint */ NULL, - true, /* islocal */ - 0, /* inhcount */ - connoinherit, /* conNoInherit */ + conislocal, /* islocal */ + coninhcount, /* inhcount */ + connoinherit, /* conNoInherit */ false); /* is_internal */ + ObjectAddressSet(address, ConstraintRelationId, constrOid); /* - * Create the triggers that will enforce the constraint. We only want the - * action triggers to appear for the parent partitioned relation, even - * though the constraints also exist below. + * Mark the child constraint as part of the parent constraint; it must not + * be dropped on its own. (This constraint is deleted when the partition + * is detached, but a special check needs to occur that the partition + * contains no referenced values.) */ - createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, - constrOid, indexOid, !recursing); + if (OidIsValid(parentConstr)) + { + ObjectAddress referenced; + + ObjectAddressSet(referenced, ConstraintRelationId, parentConstr); + recordDependencyOn(&address, &referenced, DEPENDENCY_INTERNAL); + } + + /* make new constraint visible, in case we add more */ + CommandCounterIncrement(); /* - * Tell Phase 3 to check that the constraint is satisfied by existing - * rows. We can skip this during table creation, when requested explicitly - * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're - * recreating a constraint following a SET DATA TYPE operation that did - * not impugn its validity. + * If the referenced table is a plain relation, create the action triggers + * that enforce the constraint. */ - if (!old_check_ok && !fkconstraint->skip_validation) + if (pkrel->rd_rel->relkind == RELKIND_RELATION) { - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = fkconstraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = RelationGetRelid(pkrel); - newcon->refindid = indexOid; - newcon->conid = constrOid; - newcon->qual = (Node *) fkconstraint; - - tab->constraints = lappend(tab->constraints, newcon); + createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel), + fkconstraint, + constrOid, indexOid); } /* - * When called on a partitioned table, recurse to create the constraint on - * the partitions also. + * If the referenced table is partitioned, recurse on ourselves to handle + * each partition. We need one pg_constraint row created for each + * partition in addition to the pg_constraint row for the parent table. */ - if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - PartitionDesc partdesc; - Relation pg_constraint; - List *cloned = NIL; - ListCell *cell; - - pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock); - partdesc = RelationGetPartitionDesc(rel); + PartitionDesc pd = RelationGetPartitionDesc(pkrel); - for (i = 0; i < partdesc->nparts; i++) + for (int i = 0; i < pd->nparts; i++) { - Oid partitionId = partdesc->oids[i]; - Relation partition = table_open(partitionId, lockmode); - - CheckTableNotInUse(partition, "ALTER TABLE"); + Relation partRel; + AttrNumber *map; + AttrNumber *mapped_pkattnum; + Oid partIndexId; - CloneFkReferencing(pg_constraint, rel, partition, - list_make1_oid(constrOid), - &cloned); + partRel = table_open(pd->oids[i], ShareRowExclusiveLock); - table_close(partition, NoLock); + /* + * Map the attribute numbers in the referenced side of the FK + * definition to match the partition's column layout. + */ + map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), + RelationGetDescr(pkrel), + gettext_noop("could not convert row type")); + if (map) + { + mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks); + for (int j = 0; j < numfks; j++) + mapped_pkattnum[j] = map[pkattnum[j] - 1]; + } + else + mapped_pkattnum = pkattnum; + + /* do the deed */ + partIndexId = index_get_partition(partRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partRel)); + addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel, + partIndexId, constrOid, numfks, + mapped_pkattnum, fkattnum, + pfeqoperators, ppeqoperators, ffeqoperators, + old_check_ok); + + /* Done -- clean up (but keep the lock) */ + table_close(partRel, NoLock); + if (map) + { + pfree(mapped_pkattnum); + pfree(map); + } } - table_close(pg_constraint, RowExclusiveLock); + } + + return address; +} + +/* + * addFkRecurseReferencing + * subroutine for ATAddForeignKeyConstraint and CloneFkReferencing + * + * If the referencing relation is a plain relation, create the necessary check + * triggers that implement the constraint, and set up for Phase 3 constraint + * verification. If the referencing relation is a partitioned table, then + * we create a pg_constraint row for it and recurse on this routine for each + * partition. + * + * We assume that the referenced relation is locked against concurrent + * deletions. If it's a partitioned relation, every partition must be so + * locked. + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the referencing relation; might be a partition, if recursing. + * pkrel is the root referenced relation. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstr is the OID of the parent constraint (there is always one). + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. + * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + * lockmode is the lockmode to acquire on partitions when recursing. + */ +static void +addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstr, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok, LOCKMODE lockmode) +{ + AssertArg(OidIsValid(parentConstr)); + + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("foreign keys constraints are not supported on foreign tables"))); + + /* + * If the referencing relation is a plain table, add the check triggers to + * it and, if necessary, schedule it to be checked in Phase 3. + * + * If the relation is partitioned, drill down to do it to its partitions. + */ + if (rel->rd_rel->relkind == RELKIND_RELATION) + { + createForeignKeyCheckTriggers(RelationGetRelid(rel), + RelationGetRelid(pkrel), + fkconstraint, + parentConstr, + indexOid); - foreach(cell, cloned) + /* + * Tell Phase 3 to check that the constraint is satisfied by existing + * rows. We can skip this during table creation, when requested + * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command, + * and when we're recreating a constraint following a SET DATA TYPE + * operation that did not impugn its validity. + */ + if (wqueue && !old_check_ok && !fkconstraint->skip_validation) { - ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell); - Relation partition = table_open(cc->relid, lockmode); - AlteredTableInfo *childtab; NewConstraint *newcon; + AlteredTableInfo *tab; - /* Find or create work queue entry for this partition */ - childtab = ATGetQueueEntry(wqueue, partition); + tab = ATGetQueueEntry(wqueue, rel); newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = cc->constraint->conname; + newcon->name = get_constraint_name(parentConstr); newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = cc->refrelid; - newcon->refindid = cc->conindid; - newcon->conid = cc->conid; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->refindid = indexOid; + newcon->conid = parentConstr; newcon->qual = (Node *) fkconstraint; - childtab->constraints = lappend(childtab->constraints, newcon); - - table_close(partition, lockmode); + tab->constraints = lappend(tab->constraints, newcon); } } + else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc pd = RelationGetPartitionDesc(rel); - /* - * Close pk table, but keep lock until we've committed. - */ - table_close(pkrel, NoLock); + /* + * Recurse to take appropriate action on each partition; either we + * find an existing constraint to reparent to ours, or we create a new + * one. + */ + for (int i = 0; i < pd->nparts; i++) + { + Oid partitionId = pd->oids[i]; + Relation partition = table_open(partitionId, lockmode); + List *partFKs; + AttrNumber *attmap; + AttrNumber mapped_fkattnum[INDEX_MAX_KEYS]; + bool attached; + char *conname; + Oid constrOid; + ObjectAddress address, + referenced; + ListCell *cell; - return address; + CheckTableNotInUse(partition, "ALTER TABLE"); + + attmap = convert_tuples_by_name_map(RelationGetDescr(partition), + RelationGetDescr(rel), + gettext_noop("could not convert row type")); + for (int j = 0; j < numfks; j++) + mapped_fkattnum[j] = attmap[fkattnum[j] - 1]; + + /* Check whether an existing constraint can be repurposed */ + partFKs = copyObject(RelationGetFKeyList(partition)); + attached = false; + foreach(cell, partFKs) + { + ForeignKeyCacheInfo *fk; + + fk = lfirst_node(ForeignKeyCacheInfo, cell); + if (tryAttachPartitionForeignKey(fk, + partitionId, + parentConstr, + numfks, + mapped_fkattnum, + pkattnum, + pfeqoperators)) + { + attached = true; + break; + } + } + if (attached) + { + table_close(partition, NoLock); + continue; + } + + /* + * No luck finding a good constraint to reuse; create our own. + */ + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(partition), + fkconstraint->conname)) + conname = ChooseConstraintName(RelationGetRelationName(partition), + ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs), + "fkey", + RelationGetNamespace(partition), NIL); + else + conname = fkconstraint->conname; + constrOid = + CreateConstraintEntry(conname, + RelationGetNamespace(partition), + CONSTRAINT_FOREIGN, + fkconstraint->deferrable, + fkconstraint->initdeferred, + fkconstraint->initially_valid, + parentConstr, + partitionId, + mapped_fkattnum, + numfks, + numfks, + InvalidOid, + indexOid, + RelationGetRelid(pkrel), + pkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + numfks, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + fkconstraint->fk_matchtype, + NULL, + NULL, + NULL, + false, + 1, + false, + false); + + /* + * Give this constraint partition-type dependencies on the parent + * constraint as well as the table. + */ + ObjectAddressSet(address, ConstraintRelationId, constrOid); + ObjectAddressSet(referenced, ConstraintRelationId, parentConstr); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI); + ObjectAddressSet(referenced, RelationRelationId, partitionId); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC); + + /* Make all this visible before recursing */ + CommandCounterIncrement(); + + /* call ourselves to finalize the creation and we're done */ + addFkRecurseReferencing(wqueue, fkconstraint, partition, pkrel, + indexOid, + constrOid, + numfks, + pkattnum, + mapped_fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok, + lockmode); + + table_close(partition, NoLock); + } + } } /* @@ -7965,77 +8306,219 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Clone foreign keys from a partitioned table to a newly acquired * partition. * - * relationId is a partition of parentId, so we can be certain that it has the - * same columns with the same datatypes. The columns may be in different + * partitionRel is a partition of parentRel, so we can be certain that it has + * the same columns with the same datatypes. The columns may be in different * order, though. * - * The *cloned list is appended ClonedConstraint elements describing what was - * created, for the purposes of validating the constraint in ALTER TABLE's - * Phase 3. + * wqueue must be passed to set up phase 3 constraint checking, unless the + * referencing-side partition is known to be empty (such as in CREATE TABLE / + * PARTITION OF). */ static void -CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) +CloneForeignKeyConstraints(List **wqueue, Relation parentRel, + Relation partitionRel) +{ + /* This only works for declarative partitioning */ + Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + + /* + * Clone constraints for which the parent is on the referenced side. + */ + CloneFkReferenced(parentRel, partitionRel); + + /* + * Now clone constraints where the parent is on the referencing side. + */ + CloneFkReferencing(wqueue, parentRel, partitionRel); +} + +/* + * CloneFkReferenced + * Subroutine for CloneForeignKeyConstraints + * + * Find all the FKs that have the parent relation on the referenced side; + * clone those constraints to the given partition. This is to be called + * when the partition is being created or attached. + * + * This recurses to partitions, if the relation being attached is partitioned. + * Recursion is done by calling addFkRecurseReferenced. + */ +static void +CloneFkReferenced(Relation parentRel, Relation partitionRel) { Relation pg_constraint; - Relation parentRel; - Relation rel; - ScanKeyData key; + AttrNumber *attmap; + ListCell *cell; SysScanDesc scan; + ScanKeyData key[2]; HeapTuple tuple; List *clone = NIL; - parentRel = table_open(parentId, NoLock); /* already got lock */ - /* see ATAddForeignKeyConstraint about lock level */ - rel = table_open(relationId, AccessExclusiveLock); + /* + * Search for any constraints where this partition is in the referenced + * side. However, we must ignore any constraint whose parent constraint + * is also going to be cloned, to avoid duplicates. So do it in two + * steps: first construct the list of constraints to clone, then go over + * that list cloning those whose parents are not in the list. (We must + * not rely on the parent being seen first, since the catalog scan could + * return children first.) + */ pg_constraint = table_open(ConstraintRelationId, RowShareLock); - - /* Obtain the list of constraints to clone or attach */ - ScanKeyInit(&key, - Anum_pg_constraint_conrelid, BTEqualStrategyNumber, - F_OIDEQ, ObjectIdGetDatum(parentId)); - scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &key); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + /* This is a seqscan, as we don't have a usable index ... */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, + NULL, 2, key); while ((tuple = systable_getnext(scan)) != NULL) { - Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - clone = lappend_oid(clone, oid); + /* Only try to clone the top-level constraint; skip child ones. */ + if (constrForm->conparentid != InvalidOid) + continue; + + clone = lappend_oid(clone, constrForm->oid); } systable_endscan(scan); + table_close(pg_constraint, RowShareLock); - /* Do the actual work, recursing to partitions as needed */ - CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); + attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + foreach(cell, clone) + { + Oid constrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + Relation fkRel; + Oid indexOid; + Oid partIndexId; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_confkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; - /* We're done. Clean up */ - table_close(parentRel, NoLock); - table_close(rel, NoLock); /* keep lock till commit */ - table_close(pg_constraint, RowShareLock); + tuple = SearchSysCache1(CONSTROID, constrOid); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* + * Because we're only expanding the key space at the referenced side, + * we don't need to prevent any operation in the referencing table, so + * AccessShareLock suffices (assumes that dropping the constraint + * acquires AEL). + */ + fkRel = table_open(constrForm->conrelid, AccessShareLock); + + indexOid = constrForm->conindid; + DeconstructFkConstraintRow(tuple, + &numfks, + conkey, + confkey, + conpfeqop, + conppeqop, + conffeqop); + for (int i = 0; i < numfks; i++) + mapped_confkey[i] = attmap[confkey[i] - 1]; + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = NameStr(constrForm->conname); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + fkconstraint->initially_valid = true; + fkconstraint->fk_matchtype = constrForm->confmatchtype; + + /* set up colnames that are used to generate the constraint name */ + for (int i = 0; i < numfks; i++) + { + Form_pg_attribute att; + + att = TupleDescAttr(RelationGetDescr(fkRel), + conkey[i] - 1); + fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs, + makeString(NameStr(att->attname))); + } + + /* + * Add the new foreign key constraint pointing to the new partition. + * Because this new partition appears in the referenced side of the + * constraint, we don't need to set up for Phase 3 check. + */ + partIndexId = index_get_partition(partitionRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partitionRel)); + addFkRecurseReferenced(NULL, + fkconstraint, + fkRel, + partitionRel, + partIndexId, + constrOid, + numfks, + mapped_confkey, + conkey, + conpfeqop, + conppeqop, + conffeqop, + true); + + table_close(fkRel, NoLock); + ReleaseSysCache(tuple); + } } /* * CloneFkReferencing - * Recursive subroutine for CloneForeignKeyConstraints, referencing side - * - * Clone the given list of FK constraints when a partition is attached on the - * referencing side of those constraints. + * Subroutine for CloneForeignKeyConstraints * - * When cloning foreign keys to a partition, it may happen that equivalent - * constraints already exist in the partition for some of them. We can skip - * creating a clone in that case, and instead just attach the existing - * constraint to the one in the parent. + * For each FK constraint of the parent relation in the given list, find an + * equivalent constraint in its partition relation that can be reparented; + * if one cannot be found, create a new constraint in the partition as its + * child. * - * This function recurses to partitions, if the new partition is partitioned; - * of course, only do this for FKs that were actually cloned. + * If wqueue is given, it is used to set up phase-3 verification for each + * cloned constraint; if omitted, we assume that such verification is not + * needed (example: the partition is being created anew). */ static void -CloneFkReferencing(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned) +CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel) { AttrNumber *attmap; List *partFKs; - List *subclone = NIL; + List *clone = NIL; ListCell *cell; + /* obtain a list of constraints that we need to clone */ + foreach(cell, RelationGetFKeyList(parentRel)) + { + ForeignKeyCacheInfo *fk = lfirst(cell); + + clone = lappend_oid(clone, fk->conoid); + } + + /* + * Silently do nothing if there's nothing to do. In particular, this + * avoids throwing a spurious error for foreign tables. + */ + if (clone == NIL) + return; + + if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("foreign keys constraints are not supported on foreign tables"))); + /* * The constraint key may differ, if the columns in the partition are * different. This map is used to convert them. @@ -8050,6 +8533,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, { Oid parentConstrOid = lfirst_oid(cell); Form_pg_constraint constrForm; + Relation pkrel; HeapTuple tuple; int numfks; AttrNumber conkey[INDEX_MAX_KEYS]; @@ -8059,13 +8543,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, Oid conppeqop[INDEX_MAX_KEYS]; Oid conffeqop[INDEX_MAX_KEYS]; Constraint *fkconstraint; - bool attach_it; + bool attached; + Oid indexOid; Oid constrOid; - ObjectAddress parentAddr, - childAddr, - childTableAddr; + ObjectAddress address, + referenced; ListCell *cell; - int i; tuple = SearchSysCache1(CONSTROID, parentConstrOid); if (!tuple) @@ -8073,161 +8556,92 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, parentConstrOid); constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - /* only foreign keys */ - if (constrForm->contype != CONSTRAINT_FOREIGN) + /* Don't clone constraints whose parents are being cloned */ + if (list_member_oid(clone, constrForm->conparentid)) { ReleaseSysCache(tuple); continue; } - ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); + /* + * Need to prevent concurrent deletions. If pkrel is a partitioned + * relation, that means to lock all partitions. + */ + pkrel = table_open(constrForm->confrelid, ShareRowExclusiveLock); + if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + (void) find_all_inheritors(RelationGetRelid(pkrel), + ShareRowExclusiveLock, NULL); DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, conpfeqop, conppeqop, conffeqop); - for (i = 0; i < numfks; i++) + for (int i = 0; i < numfks; i++) mapped_conkey[i] = attmap[conkey[i] - 1]; /* * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive - * verification step and don't end up with a duplicate FK. This also - * means we don't consider this constraint when recursing to - * partitions. + * fit for the purpose. If one is, attach the parent constraint to + * it, and don't clone anything. This way we avoid the expensive + * verification step and don't end up with a duplicate FK, and we + * don't need to recurse to partitions for this constraint. */ - attach_it = false; + attached = false; foreach(cell, partFKs) { ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); - Form_pg_constraint partConstr; - HeapTuple partcontup; - Relation trigrel; - HeapTuple trigtup; - SysScanDesc scan; - ScanKeyData key; - - attach_it = true; - - /* - * Do some quick & easy initial checks. If any of these fail, we - * cannot use this constraint, but keep looking. - */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) - { - attach_it = false; - continue; - } - for (i = 0; i < numfks; i++) - { - if (fk->conkey[i] != mapped_conkey[i] || - fk->confkey[i] != confkey[i] || - fk->conpfeqop[i] != conpfeqop[i]) - { - attach_it = false; - break; - } - } - if (!attach_it) - continue; - /* - * Looks good so far; do some more extensive checks. Presumably - * the check for 'convalidated' could be dropped, since we don't - * really care about that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); - if (!partcontup) - elog(ERROR, "cache lookup failed for constraint %u", - fk->conoid); - partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); - if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || - partConstr->condeferrable != constrForm->condeferrable || - partConstr->condeferred != constrForm->condeferred || - partConstr->confupdtype != constrForm->confupdtype || - partConstr->confdeltype != constrForm->confdeltype || - partConstr->confmatchtype != constrForm->confmatchtype) + if (tryAttachPartitionForeignKey(fk, + RelationGetRelid(partRel), + parentConstrOid, + numfks, + mapped_conkey, + confkey, + conpfeqop)) { - ReleaseSysCache(partcontup); - attach_it = false; - continue; - } - - ReleaseSysCache(partcontup); - - /* - * Looks good! Attach this constraint. The action triggers in - * the new partition become redundant -- the parent table already - * has equivalent ones, and those will be able to reach the - * partition. Remove the ones in the partition. We identify them - * because they have our constraint OID, as well as being on the - * referenced rel. - */ - trigrel = heap_open(TriggerRelationId, RowExclusiveLock); - ScanKeyInit(&key, - Anum_pg_trigger_tgconstraint, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(fk->conoid)); - - scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true, - NULL, 1, &key); - while ((trigtup = systable_getnext(scan)) != NULL) - { - Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup); - ObjectAddress trigger; - - if (trgform->tgconstrrelid != fk->conrelid) - continue; - if (trgform->tgrelid != fk->confrelid) - continue; - - /* - * The constraint is originally set up to contain this trigger - * as an implementation object, so there's a dependency record - * that links the two; however, since the trigger is no longer - * needed, we remove the dependency link in order to be able - * to drop the trigger while keeping the constraint intact. - */ - deleteDependencyRecordsFor(TriggerRelationId, - trgform->oid, - false); - /* make dependency deletion visible to performDeletion */ - CommandCounterIncrement(); - ObjectAddressSet(trigger, TriggerRelationId, - trgform->oid); - performDeletion(&trigger, DROP_RESTRICT, 0); - /* make trigger drop visible, in case the loop iterates */ - CommandCounterIncrement(); + attached = true; + table_close(pkrel, NoLock); + break; } - - systable_endscan(scan); - table_close(trigrel, RowExclusiveLock); - - ConstraintSetParentConstraint(fk->conoid, parentConstrOid, - RelationGetRelid(partRel)); - CommandCounterIncrement(); - attach_it = true; - break; } - - /* - * If we attached to an existing constraint, there is no need to - * create a new one. In fact, there's no need to recurse for this - * constraint to partitions, either. - */ - if (attach_it) + if (attached) { ReleaseSysCache(tuple); continue; } + /* No dice. Set up to create our own constraint */ + fkconstraint = makeNode(Constraint); + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(partRel), + NameStr(constrForm->conname))) + fkconstraint->conname = + ChooseConstraintName(RelationGetRelationName(partRel), + ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs), + "fkey", + RelationGetNamespace(partRel), NIL); + else + fkconstraint->conname = NameStr(constrForm->conname); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + fkconstraint->fk_matchtype = constrForm->confmatchtype; + for (int i = 0; i < numfks; i++) + { + Form_pg_attribute att; + + att = TupleDescAttr(RelationGetDescr(partRel), + mapped_conkey[i] - 1); + fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs, + makeString(NameStr(att->attname))); + } + + indexOid = constrForm->conindid; constrOid = - CreateConstraintEntry(NameStr(constrForm->conname), + CreateConstraintEntry(fkconstraint->conname, constrForm->connamespace, CONSTRAINT_FOREIGN, - constrForm->condeferrable, - constrForm->condeferred, + fkconstraint->deferrable, + fkconstraint->initdeferred, constrForm->convalidated, parentConstrOid, RelationGetRelid(partRel), @@ -8235,92 +8649,191 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, numfks, numfks, InvalidOid, /* not a domain constraint */ - constrForm->conindid, /* same index */ + indexOid, constrForm->confrelid, /* same foreign rel */ confkey, conpfeqop, conppeqop, conffeqop, numfks, - constrForm->confupdtype, - constrForm->confdeltype, - constrForm->confmatchtype, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + fkconstraint->fk_matchtype, NULL, NULL, NULL, - false, - 1, false, true); - subclone = lappend_oid(subclone, constrOid); + false, /* islocal */ + 1, /* inhcount */ + false, /* conNoInherit */ + true); /* Set up partition dependencies for the new constraint */ - ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); - recordDependencyOn(&childAddr, &parentAddr, - DEPENDENCY_PARTITION_PRI); - ObjectAddressSet(childTableAddr, RelationRelationId, + ObjectAddressSet(address, ConstraintRelationId, constrOid); + ObjectAddressSet(referenced, ConstraintRelationId, parentConstrOid); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI); + ObjectAddressSet(referenced, RelationRelationId, RelationGetRelid(partRel)); - recordDependencyOn(&childAddr, &childTableAddr, - DEPENDENCY_PARTITION_SEC); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC); - fkconstraint = makeNode(Constraint); - /* for now this is all we need */ - fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); - fkconstraint->fk_upd_action = constrForm->confupdtype; - fkconstraint->fk_del_action = constrForm->confdeltype; - fkconstraint->deferrable = constrForm->condeferrable; - fkconstraint->initdeferred = constrForm->condeferred; + /* Done with the cloned constraint's tuple */ + ReleaseSysCache(tuple); - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); + /* Make all this visible before recursing */ + CommandCounterIncrement(); - if (cloned) - { - ClonedConstraint *newc; + addFkRecurseReferencing(wqueue, + fkconstraint, + partRel, + pkrel, + indexOid, + constrOid, + numfks, + confkey, + mapped_conkey, + conpfeqop, + conppeqop, + conffeqop, + false, /* no old check exists */ + AccessExclusiveLock); + table_close(pkrel, NoLock); + } +} - /* - * Feed back caller about the constraints we created, so that they - * can set up constraint verification. - */ - newc = palloc(sizeof(ClonedConstraint)); - newc->relid = RelationGetRelid(partRel); - newc->refrelid = constrForm->confrelid; - newc->conindid = constrForm->conindid; - newc->conid = constrOid; - newc->constraint = fkconstraint; +/* + * When the parent of a partition receives [the referencing side of] a foreign + * key, we must propagate that foreign key to the partition. However, the + * partition might already have an equivalent foreign key; this routine + * compares the given ForeignKeyCacheInfo (in the partition) to the FK defined + * by the other parameters. If they are equivalent, create the link between + * the two constraints and return true. + * + * If the given FK does not match the one defined by rest of the params, + * return false. + */ +static bool +tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, + Oid partRelid, + Oid parentConstrOid, + int numfks, + AttrNumber *mapped_conkey, + AttrNumber *confkey, + Oid *conpfeqop) +{ + HeapTuple parentConstrTup; + Form_pg_constraint parentConstr; + HeapTuple partcontup; + Form_pg_constraint partConstr; + Relation trigrel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple trigtup; + + parentConstrTup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(parentConstrOid)); + if (!parentConstrTup) + elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid); + parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup); - *cloned = lappend(*cloned, newc); + /* + * Do some quick & easy initial checks. If any of these fail, we cannot + * use this constraint. + */ + if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks) + { + ReleaseSysCache(parentConstrTup); + return false; + } + for (int i = 0; i < numfks; i++) + { + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + { + ReleaseSysCache(parentConstrTup); + return false; } + } - ReleaseSysCache(tuple); + /* + * Looks good so far; do some more extensive checks. Presumably the check + * for 'convalidated' could be dropped, since we don't really care about + * that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", + fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != parentConstr->condeferrable || + partConstr->condeferred != parentConstr->condeferred || + partConstr->confupdtype != parentConstr->confupdtype || + partConstr->confdeltype != parentConstr->confdeltype || + partConstr->confmatchtype != parentConstr->confmatchtype) + { + ReleaseSysCache(parentConstrTup); + ReleaseSysCache(partcontup); + return false; } - pfree(attmap); - list_free_deep(partFKs); + ReleaseSysCache(partcontup); + ReleaseSysCache(parentConstrTup); /* - * If the partition is partitioned, recurse to handle any constraints that - * were cloned. + * Looks good! Attach this constraint. The action triggers in the new + * partition become redundant -- the parent table already has equivalent + * ones, and those will be able to reach the partition. Remove the ones + * in the partition. We identify them because they have our constraint + * OID, as well as being on the referenced rel. */ - if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && - subclone != NIL) + trigrel = table_open(TriggerRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_trigger_tgconstraint, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(fk->conoid)); + + scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true, + NULL, 1, &key); + while ((trigtup = systable_getnext(scan)) != NULL) { - PartitionDesc partdesc = RelationGetPartitionDesc(partRel); - int i; + Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup); + ObjectAddress trigger; - for (i = 0; i < partdesc->nparts; i++) - { - Relation childRel; + if (trgform->tgconstrrelid != fk->conrelid) + continue; + if (trgform->tgrelid != fk->confrelid) + continue; - childRel = table_open(partdesc->oids[i], AccessExclusiveLock); - CloneFkReferencing(pg_constraint, - partRel, - childRel, - subclone, - cloned); - table_close(childRel, NoLock); /* keep lock till commit */ - } + /* + * The constraint is originally set up to contain this trigger as an + * implementation object, so there's a dependency record that links + * the two; however, since the trigger is no longer needed, we remove + * the dependency link in order to be able to drop the trigger while + * keeping the constraint intact. + */ + deleteDependencyRecordsFor(TriggerRelationId, + trgform->oid, + false); + /* make dependency deletion visible to performDeletion */ + CommandCounterIncrement(); + ObjectAddressSet(trigger, TriggerRelationId, + trgform->oid); + performDeletion(&trigger, DROP_RESTRICT, 0); + /* make trigger drop visible, in case the loop iterates */ + CommandCounterIncrement(); } + + systable_endscan(scan); + table_close(trigrel, RowExclusiveLock); + + ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid); + CommandCounterIncrement(); + return true; } + /* * ALTER TABLE ALTER CONSTRAINT * @@ -8440,8 +8953,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd, /* * Update deferrability of RI_FKey_noaction_del, * RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd - * triggers, but not others; see createForeignKeyTriggers and - * CreateFKCheckTrigger. + * triggers, but not others; see createForeignKeyActionTriggers + * and CreateFKCheckTrigger. */ if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL && tgform->tgfoid != F_RI_FKEY_NOACTION_UPD && @@ -9349,37 +9862,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, } /* - * Create the triggers that implement an FK constraint. - * - * NB: if you change any trigger properties here, see also - * ATExecAlterConstraint. - */ -void -createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, - Oid constraintOid, Oid indexOid, bool create_action) -{ - /* - * For the referenced side, create action triggers, if requested. (If the - * referencing side is partitioned, there is still only one trigger, which - * runs on the referenced side and points to the top of the referencing - * hierarchy.) - */ - if (create_action) - createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid, - indexOid); - - /* - * For the referencing side, create the check triggers. We only need - * these on the partitions. - */ - if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid, - fkconstraint, constraintOid, indexOid); - - CommandCounterIncrement(); -} - -/* * ALTER TABLE DROP CONSTRAINT * * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism. @@ -14778,8 +15260,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) bool found_whole_row; Oid defaultPartOid; List *partBoundConstraint; - List *cloned; - ListCell *l; /* * We must lock the default partition if one exists, because attaching a @@ -14960,33 +15440,10 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) CloneRowTriggersToPartition(rel, attachrel); /* - * Clone foreign key constraints, and setup for Phase 3 to verify them. + * Clone foreign key constraints. Callee is responsible for setting up + * for phase 3 constraint verification. */ - cloned = NIL; - CloneForeignKeyConstraints(RelationGetRelid(rel), - RelationGetRelid(attachrel), &cloned); - foreach(l, cloned) - { - ClonedConstraint *clonedcon = lfirst(l); - NewConstraint *newcon; - Relation clonedrel; - AlteredTableInfo *parttab; - - clonedrel = relation_open(clonedcon->relid, NoLock); - parttab = ATGetQueueEntry(wqueue, clonedrel); - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = clonedcon->constraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = clonedcon->refrelid; - newcon->refindid = clonedcon->conindid; - newcon->conid = clonedcon->conid; - newcon->qual = (Node *) clonedcon->constraint; - - parttab->constraints = lappend(parttab->constraints, newcon); - - relation_close(clonedrel, NoLock); - } + CloneForeignKeyConstraints(wqueue, rel, attachrel); /* * Generate partition constraint from the partition bound specification. @@ -15177,6 +15634,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) RelationGetRelid(attachrel)); update_relispartition(NULL, cldIdxId, true); found = true; + + CommandCounterIncrement(); break; } } @@ -15368,6 +15827,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name) partRel = table_openrv(name, ShareUpdateExclusiveLock); + /* Ensure that foreign keys still hold after this detach */ + ATDetachCheckNoForeignKeyRefs(partRel); + /* All inheritance related checks are performed within the function */ RemoveInheritance(partRel, rel); @@ -15487,6 +15949,28 @@ ATExecDetachPartition(Relation rel, RangeVar *name) list_free_deep(fks); /* + * Any sub-constrains that are in the referenced-side of a larger + * constraint have to be removed. This partition is no longer part of the + * key space of the constraint. + */ + foreach(cell, GetParentedForeignKeyRefs(partRel)) + { + Oid constrOid = lfirst_oid(cell); + ObjectAddress constraint; + + ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid); + deleteDependencyRecordsForClass(ConstraintRelationId, + constrOid, + ConstraintRelationId, + DEPENDENCY_INTERNAL); + CommandCounterIncrement(); + + ObjectAddressSet(constraint, ConstraintRelationId, constrOid); + performDeletion(&constraint, DROP_RESTRICT, 0); + } + CommandCounterIncrement(); + + /* * Invalidate the parent's relcache so that the partition is no longer * included in its partition descriptor. */ @@ -15866,3 +16350,107 @@ update_relispartition(Relation classRel, Oid relationId, bool newval) if (opened) table_close(classRel, RowExclusiveLock); } + +/* + * Return an OID list of constraints that reference the given relation + * that are marked as having a parent constraints. + */ +static List * +GetParentedForeignKeyRefs(Relation partition) +{ + Relation pg_constraint; + HeapTuple tuple; + SysScanDesc scan; + ScanKeyData key[2]; + List *constraints = NIL; + + /* + * If no indexes, or no columns are referenceable by FKs, we can avoid the + * scan. + */ + if (RelationGetIndexList(partition) == NIL || + bms_is_empty(RelationGetIndexAttrBitmap(partition, + INDEX_ATTR_BITMAP_KEY))) + return NIL; + + /* Search for constraints referencing this table */ + pg_constraint = table_open(ConstraintRelationId, AccessShareLock); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + + /* XXX This is a seqscan, as we don't have a usable index */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* + * We only need to process constraints that are part of larger ones. + */ + if (!OidIsValid(constrForm->conparentid)) + continue; + + constraints = lappend_oid(constraints, constrForm->oid); + } + + systable_endscan(scan); + table_close(pg_constraint, AccessShareLock); + + return constraints; +} + +/* + * During DETACH PARTITION, verify that any foreign keys pointing to the + * partitioned table would not become invalid. An error is raised if any + * referenced values exist. + */ +static void +ATDetachCheckNoForeignKeyRefs(Relation partition) +{ + List *constraints; + ListCell *cell; + + constraints = GetParentedForeignKeyRefs(partition); + + foreach(cell, constraints) + { + Oid constrOid = lfirst_oid(cell); + HeapTuple tuple; + Form_pg_constraint constrForm; + Relation rel; + Trigger trig; + + tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + Assert(OidIsValid(constrForm->conparentid)); + Assert(constrForm->confrelid == RelationGetRelid(partition)); + + /* prevent data changes into the referencing table until commit */ + rel = table_open(constrForm->conrelid, ShareLock); + + MemSet(&trig, 0, sizeof(trig)); + trig.tgoid = InvalidOid; + trig.tgname = NameStr(constrForm->conname); + trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN; + trig.tgisinternal = true; + trig.tgconstrrelid = RelationGetRelid(partition); + trig.tgconstrindid = constrForm->conindid; + trig.tgconstraint = constrForm->oid; + trig.tgdeferrable = false; + trig.tginitdeferred = false; + /* we needn't fill in remaining fields */ + + RI_PartitionRemove_Check(&trig, rel, partition); + + ReleaseSysCache(tuple); + + table_close(rel, NoLock); + } +} |