Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_constraint.c58
-rw-r--r--src/backend/commands/indexcmds.c31
-rw-r--r--src/backend/commands/tablecmds.c226
-rw-r--r--src/backend/parser/gram.y45
-rw-r--r--src/backend/utils/adt/ri_triggers.c169
-rw-r--r--src/backend/utils/adt/ruleutils.c17
6 files changed, 452 insertions, 94 deletions
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index 9be050ccee8..1e2df031a84 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -15,6 +15,7 @@
#include "postgres.h"
#include "access/genam.h"
+#include "access/gist.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/table.h"
@@ -1350,6 +1351,63 @@ DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
}
/*
+ * FindFkPeriodOpers -
+ *
+ * Looks up the operator oids used for the PERIOD part of a temporal foreign key.
+ * The opclass should be the opclass of that PERIOD element.
+ * Everything else is an output: containedbyoperoid is the ContainedBy operator for
+ * types matching the PERIOD element.
+ * aggedcontainedbyoperoid is also a ContainedBy operator,
+ * but one whose rhs is a multirange.
+ * That way foreign keys can compare fkattr <@ range_agg(pkattr).
+ */
+void
+FindFKPeriodOpers(Oid opclass,
+ Oid *containedbyoperoid,
+ Oid *aggedcontainedbyoperoid)
+{
+ Oid opfamily = InvalidOid;
+ Oid opcintype = InvalidOid;
+ StrategyNumber strat;
+
+ /* Make sure we have a range or multirange. */
+ if (get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
+ {
+ if (opcintype != ANYRANGEOID && opcintype != ANYMULTIRANGEOID)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("invalid type for PERIOD part of foreign key"),
+ errdetail("Only range and multirange are supported."));
+
+ }
+ else
+ elog(ERROR, "cache lookup failed for opclass %u", opclass);
+
+ /*
+ * Look up the ContainedBy operator whose lhs and rhs are the opclass's
+ * type. We use this to optimize RI checks: if the new value includes all
+ * of the old value, then we can treat the attribute as if it didn't
+ * change, and skip the RI check.
+ */
+ strat = RTContainedByStrategyNumber;
+ GetOperatorFromWellKnownStrategy(opclass,
+ InvalidOid,
+ containedbyoperoid,
+ &strat);
+
+ /*
+ * Now look up the ContainedBy operator. Its left arg must be the type of
+ * the column (or rather of the opclass). Its right arg must match the
+ * return type of the support proc.
+ */
+ strat = RTContainedByStrategyNumber;
+ GetOperatorFromWellKnownStrategy(opclass,
+ ANYMULTIRANGEOID,
+ aggedcontainedbyoperoid,
+ &strat);
+}
+
+/*
* Determine whether a relation can be proven functionally dependent on
* a set of grouping columns. If so, return true and add the pg_constraint
* OIDs of the constraints needed for the proof to the *constraintDeps list.
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index d7b71b81d3b..f99c2d2deec 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -2205,7 +2205,7 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
strat = RTOverlapStrategyNumber;
else
strat = RTEqualStrategyNumber;
- GetOperatorFromWellKnownStrategy(opclassOids[attn], atttype,
+ GetOperatorFromWellKnownStrategy(opclassOids[attn], InvalidOid,
&opid, &strat);
indexInfo->ii_ExclusionOps[attn] = opid;
indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
@@ -2445,7 +2445,7 @@ GetDefaultOpClass(Oid type_id, Oid am_id)
* GetOperatorFromWellKnownStrategy
*
* opclass - the opclass to use
- * atttype - the type to ask about
+ * rhstype - the type for the right-hand side, or InvalidOid to use the type of the given opclass.
* opid - holds the operator we found
* strat - holds the input and output strategy number
*
@@ -2458,14 +2458,14 @@ GetDefaultOpClass(Oid type_id, Oid am_id)
* InvalidStrategy.
*/
void
-GetOperatorFromWellKnownStrategy(Oid opclass, Oid atttype,
+GetOperatorFromWellKnownStrategy(Oid opclass, Oid rhstype,
Oid *opid, StrategyNumber *strat)
{
Oid opfamily;
Oid opcintype;
StrategyNumber instrat = *strat;
- Assert(instrat == RTEqualStrategyNumber || instrat == RTOverlapStrategyNumber);
+ Assert(instrat == RTEqualStrategyNumber || instrat == RTOverlapStrategyNumber || instrat == RTContainedByStrategyNumber);
*opid = InvalidOid;
@@ -2488,16 +2488,21 @@ GetOperatorFromWellKnownStrategy(Oid opclass, Oid atttype,
ereport(ERROR,
errcode(ERRCODE_UNDEFINED_OBJECT),
- instrat == RTEqualStrategyNumber ?
- errmsg("could not identify an equality operator for type %s", format_type_be(atttype)) :
- errmsg("could not identify an overlaps operator for type %s", format_type_be(atttype)),
+ instrat == RTEqualStrategyNumber ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
+ instrat == RTOverlapStrategyNumber ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
+ instrat == RTContainedByStrategyNumber ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
errdetail("Could not translate strategy number %d for operator class \"%s\" for access method \"%s\".",
instrat, NameStr(((Form_pg_opclass) GETSTRUCT(tuple))->opcname), "gist"));
-
- ReleaseSysCache(tuple);
}
- *opid = get_opfamily_member(opfamily, opcintype, opcintype, *strat);
+ /*
+ * We parameterize rhstype so foreign keys can ask for a <@ operator
+ * whose rhs matches the aggregate function. For example range_agg
+ * returns anymultirange.
+ */
+ if (!OidIsValid(rhstype))
+ rhstype = opcintype;
+ *opid = get_opfamily_member(opfamily, opcintype, rhstype, *strat);
}
if (!OidIsValid(*opid))
@@ -2510,9 +2515,9 @@ GetOperatorFromWellKnownStrategy(Oid opclass, Oid atttype,
ereport(ERROR,
errcode(ERRCODE_UNDEFINED_OBJECT),
- instrat == RTEqualStrategyNumber ?
- errmsg("could not identify an equality operator for type %s", format_type_be(atttype)) :
- errmsg("could not identify an overlaps operator for type %s", format_type_be(atttype)),
+ instrat == RTEqualStrategyNumber ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
+ instrat == RTOverlapStrategyNumber ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
+ instrat == RTContainedByStrategyNumber ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
errdetail("There is no suitable operator in operator family \"%s\" for access method \"%s\".",
NameStr(((Form_pg_opfamily) GETSTRUCT(tuple))->opfname), "gist"));
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 818ed5702cf..022ddf172a3 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -16,6 +16,7 @@
#include "access/attmap.h"
#include "access/genam.h"
+#include "access/gist.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/multixact.h"
@@ -215,6 +216,7 @@ typedef struct NewConstraint
ConstrType contype; /* CHECK or FOREIGN */
Oid refrelid; /* PK rel, if FOREIGN */
Oid refindid; /* OID of PK's index, if FOREIGN */
+ bool conwithperiod; /* Whether the new FOREIGN KEY uses PERIOD */
Oid conid; /* OID of pg_constraint entry, if FOREIGN */
Node *qual; /* Check expr or CONSTR_FOREIGN Constraint */
ExprState *qualstate; /* Execution state for CHECK expr */
@@ -389,16 +391,17 @@ static int transformColumnNameList(Oid relId, List *colList,
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
List **attnamelist,
int16 *attnums, Oid *atttypids,
- Oid *opclasses);
+ Oid *opclasses, bool *pk_has_without_overlaps);
static Oid transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums,
- Oid *opclasses);
+ bool with_period, Oid *opclasses,
+ bool *pk_has_without_overlaps);
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
static CoercionPathType findFkeyCast(Oid targetTypeId, Oid sourceTypeId,
Oid *funcid);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
- Oid pkindOid, Oid constraintOid);
+ Oid pkindOid, Oid constraintOid, bool hasperiod);
static void CheckAlterTableIsSafe(Relation rel);
static void ATController(AlterTableStmt *parsetree,
Relation rel, List *cmds, bool recurse, LOCKMODE lockmode,
@@ -510,7 +513,8 @@ static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstra
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok,
- Oid parentDelTrigger, Oid parentUpdTrigger);
+ Oid parentDelTrigger, Oid parentUpdTrigger,
+ bool with_period);
static void validateFkOnDeleteSetColumns(int numfks, const int16 *fkattnums,
int numfksetcols, const int16 *fksetcolsattnums,
List *fksetcols);
@@ -520,7 +524,9 @@ static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok, LOCKMODE lockmode,
- Oid parentInsTrigger, Oid parentUpdTrigger);
+ Oid parentInsTrigger, Oid parentUpdTrigger,
+ bool with_period);
+
static void CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
Relation partitionRel);
static void CloneFkReferenced(Relation parentRel, Relation partitionRel);
@@ -5887,7 +5893,8 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
validateForeignKeyConstraint(fkconstraint->conname, rel, refrel,
con->refindid,
- con->conid);
+ con->conid,
+ con->conwithperiod);
/*
* No need to mark the constraint row as validated, we did
@@ -9534,6 +9541,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Oid ppeqoperators[INDEX_MAX_KEYS] = {0};
Oid ffeqoperators[INDEX_MAX_KEYS] = {0};
int16 fkdelsetcols[INDEX_MAX_KEYS] = {0};
+ bool with_period;
+ bool pk_has_without_overlaps;
int i;
int numfks,
numpks,
@@ -9628,6 +9637,11 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
numfks = transformColumnNameList(RelationGetRelid(rel),
fkconstraint->fk_attrs,
fkattnum, fktypoid);
+ with_period = fkconstraint->fk_with_period || fkconstraint->pk_with_period;
+ if (with_period && !fkconstraint->fk_with_period)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_FOREIGN_KEY),
+ errmsg("foreign key uses PERIOD on the referenced table but not the referencing table"));
numfkdelsetcols = transformColumnNameList(RelationGetRelid(rel),
fkconstraint->fk_del_set_cols,
@@ -9647,19 +9661,41 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
&fkconstraint->pk_attrs,
pkattnum, pktypoid,
- opclasses);
+ opclasses, &pk_has_without_overlaps);
+
+ /* If the primary key uses WITHOUT OVERLAPS, the fk must use PERIOD */
+ if (pk_has_without_overlaps && !fkconstraint->fk_with_period)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_FOREIGN_KEY),
+ errmsg("foreign key uses PERIOD on the referenced table but not the referencing table"));
}
else
{
numpks = transformColumnNameList(RelationGetRelid(pkrel),
fkconstraint->pk_attrs,
pkattnum, pktypoid);
+
+ /* Since we got pk_attrs, one should be a period. */
+ if (with_period && !fkconstraint->pk_with_period)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_FOREIGN_KEY),
+ errmsg("foreign key uses PERIOD on the referencing table but not the referenced table"));
+
/* Look for an index matching the column list */
indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
- opclasses);
+ with_period, opclasses, &pk_has_without_overlaps);
}
/*
+ * If the referenced primary key has WITHOUT OVERLAPS, the foreign key
+ * must use PERIOD.
+ */
+ if (pk_has_without_overlaps && !with_period)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_FOREIGN_KEY),
+ errmsg("foreign key must use PERIOD when referencing a primary using WITHOUT OVERLAPS"));
+
+ /*
* Now we can check permissions.
*/
checkFkeyPermissions(pkrel, pkattnum, numpks);
@@ -9693,6 +9729,28 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
}
/*
+ * Some actions are currently unsupported for foreign keys using PERIOD.
+ */
+ if (fkconstraint->fk_with_period)
+ {
+ if (fkconstraint->fk_upd_action == FKCONSTR_ACTION_CASCADE ||
+ fkconstraint->fk_upd_action == FKCONSTR_ACTION_SETNULL ||
+ fkconstraint->fk_upd_action == FKCONSTR_ACTION_SETDEFAULT)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("unsupported %s action for foreign key constraint using PERIOD",
+ "ON UPDATE"));
+
+ if (fkconstraint->fk_del_action == FKCONSTR_ACTION_CASCADE ||
+ fkconstraint->fk_del_action == FKCONSTR_ACTION_SETNULL ||
+ fkconstraint->fk_del_action == FKCONSTR_ACTION_SETDEFAULT)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("unsupported %s action for foreign key constraint using PERIOD",
+ "ON DELETE"));
+ }
+
+ /*
* Look up the equality operators to use in the constraint.
*
* Note that we have to be careful about the difference between the actual
@@ -9738,16 +9796,56 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
opcintype = cla_tup->opcintype;
ReleaseSysCache(cla_ht);
- /*
- * Check it's a btree; currently this can never fail since no other
- * index AMs support unique indexes. If we ever did have other types
- * of unique indexes, we'd need a way to determine which operator
- * strategy number is equality. (Is it reasonable to insist that
- * every such index AM use btree's number for equality?)
- */
- if (amid != BTREE_AM_OID)
- elog(ERROR, "only b-tree indexes are supported for foreign keys");
- eqstrategy = BTEqualStrategyNumber;
+ if (with_period)
+ {
+ StrategyNumber rtstrategy;
+ bool for_overlaps = with_period && i == numpks - 1;
+
+ /*
+ * GiST indexes are required to support temporal foreign keys
+ * because they combine equals and overlaps.
+ */
+ if (amid != GIST_AM_OID)
+ elog(ERROR, "only GiST indexes are supported for temporal foreign keys");
+
+ rtstrategy = for_overlaps ? RTOverlapStrategyNumber : RTEqualStrategyNumber;
+
+ /*
+ * An opclass can use whatever strategy numbers it wants, so we
+ * ask the opclass what number it actually uses instead of our RT*
+ * constants.
+ */
+ eqstrategy = GistTranslateStratnum(opclasses[i], rtstrategy);
+ if (eqstrategy == InvalidStrategy)
+ {
+ HeapTuple tuple;
+
+ tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclasses[i]));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for operator class %u", opclasses[i]);
+
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ for_overlaps
+ ? errmsg("could not identify an overlaps operator for foreign key")
+ : errmsg("could not identify an equality operator for foreign key"),
+ errdetail("Could not translate strategy number %d for operator class \"%s\" for access method \"%s\".",
+ rtstrategy, NameStr(((Form_pg_opclass) GETSTRUCT(tuple))->opcname), "gist"));
+ }
+ }
+ else
+ {
+ /*
+ * Check it's a btree; currently this can never fail since no
+ * other index AMs support unique indexes. If we ever did have
+ * other types of unique indexes, we'd need a way to determine
+ * which operator strategy number is equality. (We could use
+ * something like GistTranslateStratnum.)
+ */
+ if (amid != BTREE_AM_OID)
+ elog(ERROR, "only b-tree indexes are supported for foreign keys");
+ eqstrategy = BTEqualStrategyNumber;
+ }
/*
* There had better be a primary equality operator for the index.
@@ -9898,6 +9996,22 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
}
/*
+ * For FKs with PERIOD we need additional operators to check whether the
+ * referencing row's range is contained by the aggregated ranges of the
+ * referenced row(s). For rangetypes and multirangetypes this is
+ * fk.periodatt <@ range_agg(pk.periodatt). Those are the only types we
+ * support for now. FKs will look these up at "runtime", but we should
+ * make sure the lookup works here, even if we don't use the values.
+ */
+ if (with_period)
+ {
+ Oid periodoperoid;
+ Oid aggedperiodoperoid;
+
+ FindFKPeriodOpers(opclasses[numpks - 1], &periodoperoid, &aggedperiodoperoid);
+ }
+
+ /*
* Create all the constraint and trigger objects, recursing to partitions
* as necessary. First handle the referenced side.
*/
@@ -9913,7 +10027,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
numfkdelsetcols,
fkdelsetcols,
old_check_ok,
- InvalidOid, InvalidOid);
+ InvalidOid, InvalidOid,
+ with_period);
/* Now handle the referencing side. */
addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
@@ -9929,7 +10044,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
fkdelsetcols,
old_check_ok,
lockmode,
- InvalidOid, InvalidOid);
+ InvalidOid, InvalidOid,
+ with_period);
/*
* Done. Close pk table, but keep lock until we've committed.
@@ -10014,7 +10130,8 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok,
- Oid parentDelTrigger, Oid parentUpdTrigger)
+ Oid parentDelTrigger, Oid parentUpdTrigger,
+ bool with_period)
{
ObjectAddress address;
Oid constrOid;
@@ -10100,7 +10217,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
conislocal, /* islocal */
coninhcount, /* inhcount */
connoinherit, /* conNoInherit */
- false, /* conPeriod */
+ with_period, /* conPeriod */
false); /* is_internal */
ObjectAddressSet(address, ConstraintRelationId, constrOid);
@@ -10176,7 +10293,8 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
pfeqoperators, ppeqoperators, ffeqoperators,
numfkdelsetcols, fkdelsetcols,
old_check_ok,
- deleteTriggerOid, updateTriggerOid);
+ deleteTriggerOid, updateTriggerOid,
+ with_period);
/* Done -- clean up (but keep the lock) */
table_close(partRel, NoLock);
@@ -10234,7 +10352,8 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok, LOCKMODE lockmode,
- Oid parentInsTrigger, Oid parentUpdTrigger)
+ Oid parentInsTrigger, Oid parentUpdTrigger,
+ bool with_period)
{
Oid insertTriggerOid,
updateTriggerOid;
@@ -10282,6 +10401,7 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
newcon->refrelid = RelationGetRelid(pkrel);
newcon->refindid = indexOid;
newcon->conid = parentConstr;
+ newcon->conwithperiod = fkconstraint->fk_with_period;
newcon->qual = (Node *) fkconstraint;
tab->constraints = lappend(tab->constraints, newcon);
@@ -10399,7 +10519,7 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
false,
1,
false,
- false, /* conPeriod */
+ with_period, /* conPeriod */
false);
/*
@@ -10430,7 +10550,8 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
old_check_ok,
lockmode,
insertTriggerOid,
- updateTriggerOid);
+ updateTriggerOid,
+ with_period);
table_close(partition, NoLock);
}
@@ -10666,7 +10787,8 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
confdelsetcols,
true,
deleteTriggerOid,
- updateTriggerOid);
+ updateTriggerOid,
+ constrForm->conperiod);
table_close(fkRel, NoLock);
ReleaseSysCache(tuple);
@@ -10776,6 +10898,7 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
ListCell *lc;
Oid insertTriggerOid,
updateTriggerOid;
+ bool with_period;
tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(parentConstrOid));
if (!HeapTupleIsValid(tuple))
@@ -10891,6 +11014,7 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
indexOid = constrForm->conindid;
+ with_period = constrForm->conperiod;
constrOid =
CreateConstraintEntry(fkconstraint->conname,
constrForm->connamespace,
@@ -10922,7 +11046,7 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
false, /* islocal */
1, /* inhcount */
false, /* conNoInherit */
- false, /* conPeriod */
+ with_period, /* conPeriod */
true);
/* Set up partition dependencies for the new constraint */
@@ -10956,7 +11080,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
false, /* no old check exists */
AccessExclusiveLock,
insertTriggerOid,
- updateTriggerOid);
+ updateTriggerOid,
+ with_period);
table_close(pkrel, NoLock);
}
@@ -11767,7 +11892,8 @@ transformColumnNameList(Oid relId, List *colList,
*
* Look up the names, attnums, and types of the primary key attributes
* for the pkrel. Also return the index OID and index opclasses of the
- * index supporting the primary key.
+ * index supporting the primary key. Also return whether the index has
+ * WITHOUT OVERLAPS.
*
* All parameters except pkrel are output parameters. Also, the function
* return value is the number of attributes in the primary key.
@@ -11778,7 +11904,7 @@ static int
transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
List **attnamelist,
int16 *attnums, Oid *atttypids,
- Oid *opclasses)
+ Oid *opclasses, bool *pk_has_without_overlaps)
{
List *indexoidlist;
ListCell *indexoidscan;
@@ -11856,6 +11982,8 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
}
+ *pk_has_without_overlaps = indexStruct->indisexclusion;
+
ReleaseSysCache(indexTuple);
return i;
@@ -11869,14 +11997,16 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
*
* Returns the OID of the unique index supporting the constraint and
* populates the caller-provided 'opclasses' array with the opclasses
- * associated with the index columns.
+ * associated with the index columns. Also sets whether the index
+ * uses WITHOUT OVERLAPS.
*
* Raises an ERROR on validation failure.
*/
static Oid
transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums,
- Oid *opclasses)
+ bool with_period, Oid *opclasses,
+ bool *pk_has_without_overlaps)
{
Oid indexoid = InvalidOid;
bool found = false;
@@ -11923,12 +12053,12 @@ transformFkeyCheckAttrs(Relation pkrel,
indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
/*
- * Must have the right number of columns; must be unique and not a
- * partial index; forget it if there are any expressions, too. Invalid
- * indexes are out as well.
+ * Must have the right number of columns; must be unique (or if
+ * temporal then exclusion instead) and not a partial index; forget it
+ * if there are any expressions, too. Invalid indexes are out as well.
*/
if (indexStruct->indnkeyatts == numattrs &&
- indexStruct->indisunique &&
+ (with_period ? indexStruct->indisexclusion : indexStruct->indisunique) &&
indexStruct->indisvalid &&
heap_attisnull(indexTuple, Anum_pg_index_indpred, NULL) &&
heap_attisnull(indexTuple, Anum_pg_index_indexprs, NULL))
@@ -11966,6 +12096,13 @@ transformFkeyCheckAttrs(Relation pkrel,
if (!found)
break;
}
+ /* The last attribute in the index must be the PERIOD FK part */
+ if (found && with_period)
+ {
+ int16 periodattnum = attnums[numattrs - 1];
+
+ found = (periodattnum == indexStruct->indkey.values[numattrs - 1]);
+ }
/*
* Refuse to use a deferrable unique/primary key. This is per SQL
@@ -11981,6 +12118,10 @@ transformFkeyCheckAttrs(Relation pkrel,
found_deferrable = true;
found = false;
}
+
+ /* We need to know whether the index has WITHOUT OVERLAPS */
+ if (found)
+ *pk_has_without_overlaps = indexStruct->indisexclusion;
}
ReleaseSysCache(indexTuple);
if (found)
@@ -12075,7 +12216,8 @@ validateForeignKeyConstraint(char *conname,
Relation rel,
Relation pkrel,
Oid pkindOid,
- Oid constraintOid)
+ Oid constraintOid,
+ bool hasperiod)
{
TupleTableSlot *slot;
TableScanDesc scan;
@@ -12103,9 +12245,11 @@ validateForeignKeyConstraint(char *conname,
/*
* See if we can do it with a single LEFT JOIN query. A false result
- * indicates we must proceed with the fire-the-trigger method.
+ * indicates we must proceed with the fire-the-trigger method. We can't do
+ * a LEFT JOIN for temporal FKs yet, but we can once we support temporal
+ * left joins.
*/
- if (RI_Initial_Check(&trig, rel, pkrel))
+ if (!hasperiod && RI_Initial_Check(&trig, rel, pkrel))
return;
/*
@@ -12256,6 +12400,7 @@ createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Constraint *fkconstr
fk_trigger->whenClause = NULL;
fk_trigger->transitionRels = NIL;
fk_trigger->constrrel = NULL;
+
switch (fkconstraint->fk_del_action)
{
case FKCONSTR_ACTION_NOACTION:
@@ -12316,6 +12461,7 @@ createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Constraint *fkconstr
fk_trigger->whenClause = NULL;
fk_trigger->transitionRels = NIL;
fk_trigger->constrrel = NULL;
+
switch (fkconstraint->fk_upd_action)
{
case FKCONSTR_ACTION_NOACTION:
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c8b4e8dde4c..d587f6dcd98 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -524,12 +524,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SetResetClause FunctionSetResetClause
%type <node> TableElement TypedTableElement ConstraintElem DomainConstraintElem TableFuncElement
-%type <node> columnDef columnOptions
+%type <node> columnDef columnOptions optionalPeriodName
%type <defelt> def_elem reloption_elem old_aggr_elem operator_def_elem
%type <node> def_arg columnElem where_clause where_or_current_clause
a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound
columnref in_expr having_clause func_table xmltable array_expr
OptWhereClause operator_def_arg
+%type <list> opt_column_and_period_list
%type <list> rowsfrom_item rowsfrom_list opt_col_def_list
%type <boolean> opt_ordinality opt_without_overlaps
%type <list> ExclusionConstraintList ExclusionConstraintElem
@@ -761,7 +762,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
OVER OVERLAPS OVERLAY OVERRIDING OWNED OWNER
PARALLEL PARAMETER PARSER PARTIAL PARTITION PASSING PASSWORD PATH
- PLACING PLAN PLANS POLICY
+ PERIOD PLACING PLAN PLANS POLICY
POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROCEDURES PROGRAM PUBLICATION
@@ -4230,21 +4231,31 @@ ConstraintElem:
NULL, yyscanner);
$$ = (Node *) n;
}
- | FOREIGN KEY '(' columnList ')' REFERENCES qualified_name
- opt_column_list key_match key_actions ConstraintAttributeSpec
+ | FOREIGN KEY '(' columnList optionalPeriodName ')' REFERENCES qualified_name
+ opt_column_and_period_list key_match key_actions ConstraintAttributeSpec
{
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_FOREIGN;
n->location = @1;
- n->pktable = $7;
+ n->pktable = $8;
n->fk_attrs = $4;
- n->pk_attrs = $8;
- n->fk_matchtype = $9;
- n->fk_upd_action = ($10)->updateAction->action;
- n->fk_del_action = ($10)->deleteAction->action;
- n->fk_del_set_cols = ($10)->deleteAction->cols;
- processCASbits($11, @11, "FOREIGN KEY",
+ if ($5)
+ {
+ n->fk_attrs = lappend(n->fk_attrs, $5);
+ n->fk_with_period = true;
+ }
+ n->pk_attrs = linitial($9);
+ if (lsecond($9))
+ {
+ n->pk_attrs = lappend(n->pk_attrs, lsecond($9));
+ n->pk_with_period = true;
+ }
+ n->fk_matchtype = $10;
+ n->fk_upd_action = ($11)->updateAction->action;
+ n->fk_del_action = ($11)->deleteAction->action;
+ n->fk_del_set_cols = ($11)->deleteAction->cols;
+ processCASbits($12, @12, "FOREIGN KEY",
&n->deferrable, &n->initdeferred,
&n->skip_validation, NULL,
yyscanner);
@@ -4326,6 +4337,16 @@ columnList:
| columnList ',' columnElem { $$ = lappend($1, $3); }
;
+optionalPeriodName:
+ ',' PERIOD columnElem { $$ = $3; }
+ | /*EMPTY*/ { $$ = NULL; }
+ ;
+
+opt_column_and_period_list:
+ '(' columnList optionalPeriodName ')' { $$ = list_make2($2, $3); }
+ | /*EMPTY*/ { $$ = list_make2(NIL, NULL); }
+ ;
+
columnElem: ColId
{
$$ = (Node *) makeString($1);
@@ -17701,6 +17722,7 @@ unreserved_keyword:
| PASSING
| PASSWORD
| PATH
+ | PERIOD
| PLAN
| PLANS
| POLICY
@@ -18324,6 +18346,7 @@ bare_label_keyword:
| PASSING
| PASSWORD
| PATH
+ | PERIOD
| PLACING
| PLAN
| PLANS
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 25931f397f7..6896e1ae638 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -30,6 +30,7 @@
#include "access/xact.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
+#include "catalog/pg_proc.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/spi.h"
@@ -45,6 +46,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rangetypes.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/ruleutils.h"
@@ -96,6 +98,9 @@
*
* Information extracted from an FK pg_constraint entry. This is cached in
* ri_constraint_cache.
+ *
+ * Note that pf/pp/ff_eq_oprs may hold the overlaps operator instead of equals
+ * for the PERIOD part of a temporal foreign key.
*/
typedef struct RI_ConstraintInfo
{
@@ -115,12 +120,15 @@ typedef struct RI_ConstraintInfo
int16 confdelsetcols[RI_MAX_NUMKEYS]; /* attnums of cols to set on
* delete */
char confmatchtype; /* foreign key's match type */
+ bool hasperiod; /* if the foreign key uses PERIOD */
int nkeys; /* number of key columns */
int16 pk_attnums[RI_MAX_NUMKEYS]; /* attnums of referenced cols */
int16 fk_attnums[RI_MAX_NUMKEYS]; /* attnums of referencing cols */
Oid pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = FK) */
Oid pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = PK) */
Oid ff_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (FK = FK) */
+ Oid period_contained_by_oper; /* anyrange <@ anyrange */
+ Oid agged_period_contained_by_oper; /* fkattr <@ range_agg(pkattr) */
dlist_node valid_link; /* Link in list of valid entries */
} RI_ConstraintInfo;
@@ -199,8 +207,8 @@ static void ri_BuildQueryKey(RI_QueryKey *key,
int32 constr_queryno);
static bool ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
const RI_ConstraintInfo *riinfo, bool rel_is_pk);
-static bool ri_AttributesEqual(Oid eq_opr, Oid typeid,
- Datum oldvalue, Datum newvalue);
+static bool ri_CompareWithCast(Oid eq_opr, Oid typeid,
+ Datum lhs, Datum rhs);
static void ri_InitHashTables(void);
static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue);
@@ -361,14 +369,41 @@ RI_FKey_check(TriggerData *trigdata)
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* corresponding FK attributes.
+ *
+ * But for temporal FKs we need to make sure
+ * the FK's range is completely covered.
+ * So we use this query instead:
+ * SELECT 1
+ * FROM (
+ * SELECT pkperiodatt AS r
+ * FROM [ONLY] pktable x
+ * WHERE pkatt1 = $1 [AND ...]
+ * AND pkperiodatt && $n
+ * FOR KEY SHARE OF x
+ * ) x1
+ * HAVING $n <@ range_agg(x1.r)
+ * Note if FOR KEY SHARE ever allows GROUP BY and HAVING
+ * we can make this a bit simpler.
* ----------
*/
initStringInfo(&querybuf);
pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
- pk_only, pkrelname);
+ if (riinfo->hasperiod)
+ {
+ quoteOneName(attname,
+ RIAttName(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1]));
+
+ appendStringInfo(&querybuf,
+ "SELECT 1 FROM (SELECT %s AS r FROM %s%s x",
+ attname, pk_only, pkrelname);
+ }
+ else
+ {
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
+ }
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
@@ -386,6 +421,18 @@ RI_FKey_check(TriggerData *trigdata)
queryoids[i] = fk_type;
}
appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+ if (riinfo->hasperiod)
+ {
+ Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]);
+
+ appendStringInfo(&querybuf, ") x1 HAVING ");
+ sprintf(paramname, "$%d", riinfo->nkeys);
+ ri_GenerateQual(&querybuf, "",
+ paramname, fk_type,
+ riinfo->agged_period_contained_by_oper,
+ "pg_catalog.range_agg", ANYMULTIRANGEOID);
+ appendStringInfo(&querybuf, "(x1.r)");
+ }
/* Prepare and save the plan */
qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
@@ -492,14 +539,40 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* PK attributes themselves.
+ *
+ * But for temporal FKs we need to make sure
+ * the old PK's range is completely covered.
+ * So we use this query instead:
+ * SELECT 1
+ * FROM (
+ * SELECT pkperiodatt AS r
+ * FROM [ONLY] pktable x
+ * WHERE pkatt1 = $1 [AND ...]
+ * AND pkperiodatt && $n
+ * FOR KEY SHARE OF x
+ * ) x1
+ * HAVING $n <@ range_agg(x1.r)
+ * Note if FOR KEY SHARE ever allows GROUP BY and HAVING
+ * we can make this a bit simpler.
* ----------
*/
initStringInfo(&querybuf);
pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
- pk_only, pkrelname);
+ if (riinfo->hasperiod)
+ {
+ quoteOneName(attname, RIAttName(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1]));
+
+ appendStringInfo(&querybuf,
+ "SELECT 1 FROM (SELECT %s AS r FROM %s%s x",
+ attname, pk_only, pkrelname);
+ }
+ else
+ {
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
+ }
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
@@ -516,6 +589,18 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
queryoids[i] = pk_type;
}
appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+ if (riinfo->hasperiod)
+ {
+ Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]);
+
+ appendStringInfo(&querybuf, ") x1 HAVING ");
+ sprintf(paramname, "$%d", riinfo->nkeys);
+ ri_GenerateQual(&querybuf, "",
+ paramname, fk_type,
+ riinfo->agged_period_contained_by_oper,
+ "pg_catalog.range_agg", ANYMULTIRANGEOID);
+ appendStringInfo(&querybuf, "(x1.r)");
+ }
/* Prepare and save the plan */
qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
@@ -2154,6 +2239,7 @@ ri_LoadConstraintInfo(Oid constraintOid)
riinfo->confupdtype = conForm->confupdtype;
riinfo->confdeltype = conForm->confdeltype;
riinfo->confmatchtype = conForm->confmatchtype;
+ riinfo->hasperiod = conForm->conperiod;
DeconstructFkConstraintRow(tup,
&riinfo->nkeys,
@@ -2165,6 +2251,20 @@ ri_LoadConstraintInfo(Oid constraintOid)
&riinfo->ndelsetcols,
riinfo->confdelsetcols);
+ /*
+ * For temporal FKs, get the operators and functions we need. We ask the
+ * opclass of the PK element for these. This all gets cached (as does the
+ * generated plan), so there's no performance issue.
+ */
+ if (riinfo->hasperiod)
+ {
+ Oid opclass = get_index_column_opclass(conForm->conindid, riinfo->nkeys);
+
+ FindFKPeriodOpers(opclass,
+ &riinfo->period_contained_by_oper,
+ &riinfo->agged_period_contained_by_oper);
+ }
+
ReleaseSysCache(tup);
/*
@@ -2776,7 +2876,10 @@ ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan)
/*
* ri_KeysEqual -
*
- * Check if all key values in OLD and NEW are equal.
+ * Check if all key values in OLD and NEW are "equivalent":
+ * For normal FKs we check for equality.
+ * For temporal FKs we check that the PK side is a superset of its old value,
+ * or the FK side is a subset of its old value.
*
* Note: at some point we might wish to redefine this as checking for
* "IS NOT DISTINCT" rather than "=", that is, allow two nulls to be
@@ -2832,13 +2935,25 @@ ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
}
else
{
+ Oid eq_opr;
+
+ /*
+ * When comparing the PERIOD columns we can skip the check
+ * whenever the referencing column stayed equal or shrank, so test
+ * with the contained-by operator instead.
+ */
+ if (riinfo->hasperiod && i == riinfo->nkeys - 1)
+ eq_opr = riinfo->period_contained_by_oper;
+ else
+ eq_opr = riinfo->ff_eq_oprs[i];
+
/*
* For the FK table, compare with the appropriate equality
* operator. Changes that compare equal will still satisfy the
* constraint after the update.
*/
- if (!ri_AttributesEqual(riinfo->ff_eq_oprs[i], RIAttType(rel, attnums[i]),
- oldvalue, newvalue))
+ if (!ri_CompareWithCast(eq_opr, RIAttType(rel, attnums[i]),
+ newvalue, oldvalue))
return false;
}
}
@@ -2848,29 +2963,31 @@ ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
/*
- * ri_AttributesEqual -
+ * ri_CompareWithCast -
*
- * Call the appropriate equality comparison operator for two values.
+ * Call the appropriate comparison operator for two values.
+ * Normally this is equality, but for the PERIOD part of foreign keys
+ * it is ContainedBy, so the order of lhs vs rhs is significant.
*
* NB: we have already checked that neither value is null.
*/
static bool
-ri_AttributesEqual(Oid eq_opr, Oid typeid,
- Datum oldvalue, Datum newvalue)
+ri_CompareWithCast(Oid eq_opr, Oid typeid,
+ Datum lhs, Datum rhs)
{
RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
/* Do we need to cast the values? */
if (OidIsValid(entry->cast_func_finfo.fn_oid))
{
- oldvalue = FunctionCall3(&entry->cast_func_finfo,
- oldvalue,
- Int32GetDatum(-1), /* typmod */
- BoolGetDatum(false)); /* implicit coercion */
- newvalue = FunctionCall3(&entry->cast_func_finfo,
- newvalue,
- Int32GetDatum(-1), /* typmod */
- BoolGetDatum(false)); /* implicit coercion */
+ lhs = FunctionCall3(&entry->cast_func_finfo,
+ lhs,
+ Int32GetDatum(-1), /* typmod */
+ BoolGetDatum(false)); /* implicit coercion */
+ rhs = FunctionCall3(&entry->cast_func_finfo,
+ rhs,
+ Int32GetDatum(-1), /* typmod */
+ BoolGetDatum(false)); /* implicit coercion */
}
/*
@@ -2884,10 +3001,16 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
* open), we'll just use the default collation here, which could lead to
* some false negatives. All this would break if we ever allow
* database-wide collations to be nondeterministic.
+ *
+ * With range/multirangetypes, the collation of the base type is stored as
+ * part of the rangetype (pg_range.rngcollation), and always used, so
+ * there is no danger of inconsistency even using a non-equals operator.
+ * But if we support arbitrary types with PERIOD, we should perhaps just
+ * always force a re-check.
*/
return DatumGetBool(FunctionCall2Coll(&entry->eq_opr_finfo,
DEFAULT_COLLATION_OID,
- oldvalue, newvalue));
+ lhs, rhs));
}
/*
@@ -2942,7 +3065,7 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
* the cast function to get to the operator's input type.
*
* XXX eventually it would be good to support array-coercion cases
- * here and in ri_AttributesEqual(). At the moment there is no point
+ * here and in ri_CompareWithCast(). At the moment there is no point
* because cases involving nonidentical array types will be rejected
* at constraint creation time.
*
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index c05d41ce023..2177d17e278 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -352,7 +352,7 @@ static char *pg_get_viewdef_worker(Oid viewoid,
int prettyFlags, int wrapColumn);
static char *pg_get_triggerdef_worker(Oid trigid, bool pretty);
static int decompile_column_index_array(Datum column_index_array, Oid relId,
- StringInfo buf);
+ bool withPeriod, StringInfo buf);
static char *pg_get_ruledef_worker(Oid ruleoid, int prettyFlags);
static char *pg_get_indexdef_worker(Oid indexrelid, int colno,
const Oid *excludeOps,
@@ -2270,7 +2270,8 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
val = SysCacheGetAttrNotNull(CONSTROID, tup,
Anum_pg_constraint_conkey);
- decompile_column_index_array(val, conForm->conrelid, &buf);
+ /* If it is a temporal foreign key then it uses PERIOD. */
+ decompile_column_index_array(val, conForm->conrelid, conForm->conperiod, &buf);
/* add foreign relation name */
appendStringInfo(&buf, ") REFERENCES %s(",
@@ -2281,7 +2282,7 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
val = SysCacheGetAttrNotNull(CONSTROID, tup,
Anum_pg_constraint_confkey);
- decompile_column_index_array(val, conForm->confrelid, &buf);
+ decompile_column_index_array(val, conForm->confrelid, conForm->conperiod, &buf);
appendStringInfoChar(&buf, ')');
@@ -2367,7 +2368,7 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
if (!isnull)
{
appendStringInfoString(&buf, " (");
- decompile_column_index_array(val, conForm->conrelid, &buf);
+ decompile_column_index_array(val, conForm->conrelid, false, &buf);
appendStringInfoChar(&buf, ')');
}
@@ -2402,7 +2403,7 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
val = SysCacheGetAttrNotNull(CONSTROID, tup,
Anum_pg_constraint_conkey);
- keyatts = decompile_column_index_array(val, conForm->conrelid, &buf);
+ keyatts = decompile_column_index_array(val, conForm->conrelid, false, &buf);
if (conForm->conperiod)
appendStringInfoString(&buf, " WITHOUT OVERLAPS");
@@ -2586,7 +2587,7 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
*/
static int
decompile_column_index_array(Datum column_index_array, Oid relId,
- StringInfo buf)
+ bool withPeriod, StringInfo buf)
{
Datum *keys;
int nKeys;
@@ -2605,7 +2606,9 @@ decompile_column_index_array(Datum column_index_array, Oid relId,
if (j == 0)
appendStringInfoString(buf, quote_identifier(colName));
else
- appendStringInfo(buf, ", %s", quote_identifier(colName));
+ appendStringInfo(buf, ", %s%s",
+ (withPeriod && j == nKeys - 1) ? "PERIOD " : "",
+ quote_identifier(colName));
}
return nKeys;