@@ -328,7 +328,8 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
328
328
LOCKMODE lockmode);
329
329
static ObjectAddress ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
330
330
bool recurse, bool recursing, LOCKMODE lockmode);
331
- static ObjectAddress ATExecValidateConstraint(Relation rel, char *constrName,
331
+ static ObjectAddress ATExecValidateConstraint(List **wqueue,
332
+ Relation rel, char *constrName,
332
333
bool recurse, bool recursing, LOCKMODE lockmode);
333
334
static int transformColumnNameList(Oid relId, List *colList,
334
335
int16 *attnums, Oid *atttypids);
@@ -342,7 +343,6 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
342
343
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
343
344
static CoercionPathType findFkeyCast(Oid targetTypeId, Oid sourceTypeId,
344
345
Oid *funcid);
345
- static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
346
346
static void validateForeignKeyConstraint(char *conname,
347
347
Relation rel, Relation pkrel,
348
348
Oid pkindOid, Oid constraintOid);
@@ -4500,13 +4500,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
4500
4500
address = ATExecAlterConstraint(rel, cmd, false, false, lockmode);
4501
4501
break;
4502
4502
case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
4503
- address = ATExecValidateConstraint(rel, cmd->name, false , false,
4504
- lockmode);
4503
+ address = ATExecValidateConstraint(wqueue, rel, cmd->name, false,
4504
+ false, lockmode);
4505
4505
break;
4506
4506
case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with
4507
4507
* recursion */
4508
- address = ATExecValidateConstraint(rel, cmd->name, true, false ,
4509
- lockmode);
4508
+ address = ATExecValidateConstraint(wqueue, rel, cmd->name, true,
4509
+ false, lockmode);
4510
4510
break;
4511
4511
case AT_DropConstraint: /* DROP CONSTRAINT */
4512
4512
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
@@ -9729,8 +9729,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
9729
9729
* was already validated, InvalidObjectAddress is returned.
9730
9730
*/
9731
9731
static ObjectAddress
9732
- ATExecValidateConstraint(Relation rel, char *constrName, bool recurse ,
9733
- bool recursing, LOCKMODE lockmode)
9732
+ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
9733
+ bool recurse, bool recursing, LOCKMODE lockmode)
9734
9734
{
9735
9735
Relation conrel;
9736
9736
SysScanDesc scan;
@@ -9776,27 +9776,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
9776
9776
9777
9777
if (!con->convalidated)
9778
9778
{
9779
+ AlteredTableInfo *tab;
9779
9780
HeapTuple copyTuple;
9780
9781
Form_pg_constraint copy_con;
9781
9782
9782
9783
if (con->contype == CONSTRAINT_FOREIGN)
9783
9784
{
9784
- Relation refrel;
9785
+ NewConstraint *newcon;
9786
+ Constraint *fkconstraint;
9785
9787
9786
- /*
9787
- * Triggers are already in place on both tables, so a concurrent
9788
- * write that alters the result here is not possible. Normally we
9789
- * can run a query here to do the validation, which would only
9790
- * require AccessShareLock. In some cases, it is possible that we
9791
- * might need to fire triggers to perform the check, so we take a
9792
- * lock at RowShareLock level just in case.
9793
- */
9794
- refrel = table_open(con->confrelid, RowShareLock);
9788
+ /* Queue validation for phase 3 */
9789
+ fkconstraint = makeNode(Constraint);
9790
+ /* for now this is all we need */
9791
+ fkconstraint->conname = constrName;
9795
9792
9796
- validateForeignKeyConstraint(constrName, rel, refrel,
9797
- con->conindid,
9798
- con->oid);
9799
- table_close(refrel, NoLock);
9793
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
9794
+ newcon->name = constrName;
9795
+ newcon->contype = CONSTR_FOREIGN;
9796
+ newcon->refrelid = con->confrelid;
9797
+ newcon->refindid = con->conindid;
9798
+ newcon->conid = con->oid;
9799
+ newcon->qual = (Node *) fkconstraint;
9800
+
9801
+ /* Find or create work queue entry for this table */
9802
+ tab = ATGetQueueEntry(wqueue, rel);
9803
+ tab->constraints = lappend(tab->constraints, newcon);
9800
9804
9801
9805
/*
9802
9806
* We disallow creating invalid foreign keys to or from
@@ -9807,6 +9811,10 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
9807
9811
{
9808
9812
List *children = NIL;
9809
9813
ListCell *child;
9814
+ NewConstraint *newcon;
9815
+ bool isnull;
9816
+ Datum val;
9817
+ char *conbin;
9810
9818
9811
9819
/*
9812
9820
* If we're recursing, the parent has already done this, so skip
@@ -9846,12 +9854,30 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
9846
9854
/* find_all_inheritors already got lock */
9847
9855
childrel = table_open(childoid, NoLock);
9848
9856
9849
- ATExecValidateConstraint(childrel, constrName, false,
9857
+ ATExecValidateConstraint(wqueue, childrel, constrName, false,
9850
9858
true, lockmode);
9851
9859
table_close(childrel, NoLock);
9852
9860
}
9853
9861
9854
- validateCheckConstraint(rel, tuple);
9862
+ /* Queue validation for phase 3 */
9863
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
9864
+ newcon->name = constrName;
9865
+ newcon->contype = CONSTR_CHECK;
9866
+ newcon->refrelid = InvalidOid;
9867
+ newcon->refindid = InvalidOid;
9868
+ newcon->conid = con->oid;
9869
+
9870
+ val = SysCacheGetAttr(CONSTROID, tuple,
9871
+ Anum_pg_constraint_conbin, &isnull);
9872
+ if (isnull)
9873
+ elog(ERROR, "null conbin for constraint %u", con->oid);
9874
+
9875
+ conbin = TextDatumGetCString(val);
9876
+ newcon->qual = (Node *) stringToNode(conbin);
9877
+
9878
+ /* Find or create work queue entry for this table */
9879
+ tab = ATGetQueueEntry(wqueue, rel);
9880
+ tab->constraints = lappend(tab->constraints, newcon);
9855
9881
9856
9882
/*
9857
9883
* Invalidate relcache so that others see the new validated
@@ -10225,87 +10251,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
10225
10251
}
10226
10252
}
10227
10253
10228
- /*
10229
- * Scan the existing rows in a table to verify they meet a proposed
10230
- * CHECK constraint.
10231
- *
10232
- * The caller must have opened and locked the relation appropriately.
10233
- */
10234
- static void
10235
- validateCheckConstraint(Relation rel, HeapTuple constrtup)
10236
- {
10237
- EState *estate;
10238
- Datum val;
10239
- char *conbin;
10240
- Expr *origexpr;
10241
- ExprState *exprstate;
10242
- TableScanDesc scan;
10243
- ExprContext *econtext;
10244
- MemoryContext oldcxt;
10245
- TupleTableSlot *slot;
10246
- Form_pg_constraint constrForm;
10247
- bool isnull;
10248
- Snapshot snapshot;
10249
-
10250
- /*
10251
- * VALIDATE CONSTRAINT is a no-op for foreign tables and partitioned
10252
- * tables.
10253
- */
10254
- if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
10255
- rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
10256
- return;
10257
-
10258
- constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
10259
-
10260
- estate = CreateExecutorState();
10261
-
10262
- /*
10263
- * XXX this tuple doesn't really come from a syscache, but this doesn't
10264
- * matter to SysCacheGetAttr, because it only wants to be able to fetch
10265
- * the tupdesc
10266
- */
10267
- val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
10268
- &isnull);
10269
- if (isnull)
10270
- elog(ERROR, "null conbin for constraint %u",
10271
- constrForm->oid);
10272
- conbin = TextDatumGetCString(val);
10273
- origexpr = (Expr *) stringToNode(conbin);
10274
- exprstate = ExecPrepareExpr(origexpr, estate);
10275
-
10276
- econtext = GetPerTupleExprContext(estate);
10277
- slot = table_slot_create(rel, NULL);
10278
- econtext->ecxt_scantuple = slot;
10279
-
10280
- snapshot = RegisterSnapshot(GetLatestSnapshot());
10281
- scan = table_beginscan(rel, snapshot, 0, NULL);
10282
-
10283
- /*
10284
- * Switch to per-tuple memory context and reset it for each tuple
10285
- * produced, so we don't leak memory.
10286
- */
10287
- oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
10288
-
10289
- while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
10290
- {
10291
- if (!ExecCheck(exprstate, econtext))
10292
- ereport(ERROR,
10293
- (errcode(ERRCODE_CHECK_VIOLATION),
10294
- errmsg("check constraint \"%s\" of relation \"%s\" is violated by some row",
10295
- NameStr(constrForm->conname),
10296
- RelationGetRelationName(rel)),
10297
- errtableconstraint(rel, NameStr(constrForm->conname))));
10298
-
10299
- ResetExprContext(econtext);
10300
- }
10301
-
10302
- MemoryContextSwitchTo(oldcxt);
10303
- table_endscan(scan);
10304
- UnregisterSnapshot(snapshot);
10305
- ExecDropSingleTupleTableSlot(slot);
10306
- FreeExecutorState(estate);
10307
- }
10308
-
10309
10254
/*
10310
10255
* Scan the existing rows in a table to verify they meet a proposed FK
10311
10256
* constraint.
0 commit comments