Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a79da22

Browse files
committed
check references before partition DROP
1 parent 75fdbdf commit a79da22

File tree

2 files changed

+128
-22
lines changed

2 files changed

+128
-22
lines changed

src/include/ref_integrity.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ void createPartitionForeignKeyTriggers(Oid partition,
1313
HeapTuple get_index_for_key(Relation rel, AttrNumber attnum, Oid *index_id);
1414
List *get_ri_triggers_list(Oid relid, Oid constr);
1515
void ri_removePartitionDependencies(Oid parent, Relation partition);
16+
void ri_checkReferences(Relation partition, Oid constraintOid);

src/ref_integrity.c

Lines changed: 127 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,10 @@ static bool ri_AttributesEqual(Oid eq_opr, Oid typeid,
158158
static CompareHashEntry *ri_HashCompareOp(Oid eq_opr, Oid typeid);
159159

160160
static void quoteOneName(char *buffer, const char *name);
161-
static void quoteRelationName(char *buffer, Relation rel);
161+
static void quoteRelationName(char *buffer, Oid relid);
162162
static void ri_ReportViolation(const ConstraintInfo *riinfo,
163-
Relation rel,
164-
Relation refRel,
163+
Oid rel,
164+
Oid refRel,
165165
Datum key,
166166
Oid keytype,
167167
bool onfk);
@@ -420,8 +420,8 @@ ri_fkey_check(TriggerData *trigdata)
420420
/* Is there partition for the key? */
421421
partid = find_partition_for_value(value, fk_type, prel);
422422
if (partid == InvalidOid)
423-
/* TODO: Write a decent error message as in ri_triggers.c */
424-
elog(ERROR, "can't find suitable partition for foreign key");
423+
ri_ReportViolation(riinfo, fk_rel->rd_id, pk_rel->rd_id,
424+
value, fk_type, true);
425425

426426
part_rel = heap_open(partid, RowShareLock);
427427

@@ -445,7 +445,7 @@ ri_fkey_check(TriggerData *trigdata)
445445
* ----------
446446
*/
447447
initStringInfo(&querybuf);
448-
quoteRelationName(pkrelname, part_rel);
448+
quoteRelationName(pkrelname, part_rel->rd_id);
449449
appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
450450

451451
quoteOneName(attname, RIAttName(pk_rel, riinfo->pk_attnum));
@@ -538,7 +538,7 @@ ri_restrict(TriggerData *trigdata, bool is_upd)
538538
* ----------
539539
*/
540540
initStringInfo(&querybuf);
541-
quoteRelationName(fkrelname, fk_rel);
541+
quoteRelationName(fkrelname, fk_rel->rd_id);
542542
appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", fkrelname);
543543

544544
quoteOneName(attname, RIAttName(fk_rel, riinfo->fk_attnum));
@@ -882,10 +882,6 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
882882
/* Do some easy cross-checks against the trigger call data */
883883
if (rel_is_pk)
884884
{
885-
// if (riinfo->fk_relid != trigger->tgconstrrelid ||
886-
// riinfo->pk_relid != RelationGetRelid(trig_rel))
887-
// elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
888-
// trigger->tgname, RelationGetRelationName(trig_rel));
889885
/* XXX Probably we should check that trig_rel is a partition */
890886
if (riinfo->fk_relid != trigger->tgconstrrelid)
891887
elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
@@ -1175,7 +1171,7 @@ ri_PerformCheck(const ConstraintInfo *riinfo,
11751171

11761172
/* XXX wouldn't it be clearer to do this part at the caller? */
11771173
if ((SPI_processed == 0) != source_is_pk)
1178-
ri_ReportViolation(riinfo, source_rel, query_rel,
1174+
ri_ReportViolation(riinfo, source_rel->rd_id, query_rel->rd_id,
11791175
key[0], keytype, !source_is_pk);
11801176

11811177
return SPI_processed != 0;
@@ -1208,18 +1204,18 @@ quoteOneName(char *buffer, const char *name)
12081204
* buffer must be MAX_QUOTED_REL_NAME_LEN long (includes room for \0)
12091205
*/
12101206
static void
1211-
quoteRelationName(char *buffer, Relation rel)
1207+
quoteRelationName(char *buffer, Oid relid)
12121208
{
1213-
quoteOneName(buffer, get_namespace_name(RelationGetNamespace(rel)));
1209+
quoteOneName(buffer, get_namespace_name(get_rel_namespace(relid)));
12141210
buffer += strlen(buffer);
12151211
*buffer++ = '.';
1216-
quoteOneName(buffer, RelationGetRelationName(rel));
1212+
quoteOneName(buffer, get_rel_name(relid));
12171213
}
12181214

12191215
static void
12201216
ri_ReportViolation(const ConstraintInfo *riinfo,
1221-
Relation rel,
1222-
Relation refRel,
1217+
Oid rel,
1218+
Oid refRel,
12231219
Datum key,
12241220
Oid keytype,
12251221
bool onfk)
@@ -1228,23 +1224,23 @@ ri_ReportViolation(const ConstraintInfo *riinfo,
12281224
ereport(ERROR,
12291225
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
12301226
errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
1231-
RelationGetRelationName(rel),
1227+
get_rel_name(rel),
12321228
NameStr(riinfo->conname)),
12331229
errdetail("Key (%s)=(%s) is not present in table \"%s\".",
12341230
get_attname(riinfo->pk_relid, riinfo->pk_attnum),
12351231
datum_to_cstring(key, keytype),
1236-
RelationGetRelationName(refRel))));
1232+
get_rel_name(refRel))));
12371233
else
12381234
ereport(ERROR,
12391235
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
12401236
errmsg("update or delete on table \"%s\" violates foreign key constraint \"%s\" on table \"%s\"",
1241-
RelationGetRelationName(rel),
1237+
get_rel_name(rel),
12421238
NameStr(riinfo->conname),
1243-
RelationGetRelationName(refRel)),
1239+
get_rel_name(refRel)),
12441240
errdetail("Key (%s)=(%s) is still referenced from table \"%s\".",
12451241
get_attname(riinfo->fk_relid, riinfo->fk_attnum),
12461242
datum_to_cstring(key, keytype),
1247-
RelationGetRelationName(refRel))));
1243+
get_rel_name(refRel))));
12481244
}
12491245

12501246
static void
@@ -1752,6 +1748,112 @@ validateForeignKeyConstraint(char *conname,
17521748
UnregisterSnapshot(snapshot);
17531749
}
17541750

1751+
/*
1752+
* On DROP partition we must ensure that no FK row references this partition
1753+
*/
1754+
void
1755+
ri_checkReferences(Relation partition, Oid constraintOid)
1756+
{
1757+
const ConstraintInfo *riinfo;
1758+
StringInfoData querybuf;
1759+
AttrNumber pkattnum;
1760+
char *attname;
1761+
char pkrelname[MAX_QUOTED_REL_NAME_LEN];
1762+
char fkrelname[MAX_QUOTED_REL_NAME_LEN];
1763+
char pkattname[MAX_QUOTED_NAME_LEN + 3];
1764+
char fkattname[MAX_QUOTED_NAME_LEN + 3];
1765+
Oid pkattype,
1766+
fkattype;
1767+
int spi_result;
1768+
SPIPlanPtr qplan;
1769+
1770+
Assert(OidIsValid(constraintOid));
1771+
riinfo = ri_LoadConstraintInfo(constraintOid);
1772+
1773+
/* Get partition attribute number */
1774+
pkattnum = get_attnum(partition->rd_id,
1775+
get_attname(riinfo->pk_relid, riinfo->pk_attnum));
1776+
1777+
quoteRelationName(pkrelname, partition->rd_id);
1778+
quoteRelationName(fkrelname, riinfo->fk_relid);
1779+
pkattype = get_atttype(partition->rd_id, pkattnum);
1780+
fkattype = get_atttype(riinfo->fk_relid, riinfo->fk_attnum);
1781+
1782+
strcpy(pkattname, "pk.");
1783+
strcpy(fkattname, "fk.");
1784+
quoteOneName(pkattname + 3,
1785+
get_attname(partition->rd_id, pkattnum));
1786+
quoteOneName(fkattname + 3,
1787+
get_attname(riinfo->fk_relid, riinfo->fk_attnum));
1788+
1789+
/*----------
1790+
* The query string built is:
1791+
* SELECT pk.keycol FROM ONLY partition pk
1792+
* INNER JOIN ONLY fkrelname fk
1793+
* ON pk.keycol = fk.keycol;
1794+
*----------
1795+
*/
1796+
initStringInfo(&querybuf);
1797+
appendStringInfo(&querybuf, "SELECT %s", pkattname);
1798+
appendStringInfo(&querybuf,
1799+
" FROM ONLY %s pk INNER JOIN ONLY %s fk ON ",
1800+
pkrelname, fkrelname);
1801+
ri_GenerateQual(&querybuf, "",
1802+
pkattname, pkattype,
1803+
riinfo->pf_eq_opr,
1804+
fkattname, fkattype);
1805+
1806+
if (SPI_connect() != SPI_OK_CONNECT)
1807+
elog(ERROR, "SPI_connect failed");
1808+
1809+
/*
1810+
* Generate the plan. We don't need to cache it, and there are no
1811+
* arguments to the plan.
1812+
*/
1813+
qplan = SPI_prepare(querybuf.data, 0, NULL);
1814+
1815+
if (qplan == NULL)
1816+
elog(ERROR, "SPI_prepare returned %d for %s",
1817+
SPI_result, querybuf.data);
1818+
1819+
/*
1820+
* Run the plan. For safety we force a current snapshot to be used. (In
1821+
* transaction-snapshot mode, this arguably violates transaction isolation
1822+
* rules, but we really haven't got much choice.) We don't need to
1823+
* register the snapshot, because SPI_execute_snapshot will see to it. We
1824+
* need at most one tuple returned, so pass limit = 1.
1825+
*/
1826+
spi_result = SPI_execute_snapshot(qplan,
1827+
NULL, NULL,
1828+
GetLatestSnapshot(),
1829+
InvalidSnapshot,
1830+
true, false, 1);
1831+
1832+
/* Check result */
1833+
if (spi_result != SPI_OK_SELECT)
1834+
elog(ERROR, "SPI_execute_snapshot returned %d", spi_result);
1835+
1836+
/* Did we find a tuple violating the constraint? */
1837+
if (SPI_processed > 0)
1838+
{
1839+
Datum value;
1840+
HeapTuple tuple = SPI_tuptable->vals[0];
1841+
TupleDesc tupdesc = SPI_tuptable->tupdesc;
1842+
bool isnull;
1843+
1844+
value = heap_getattr(tuple, 1, tupdesc, &isnull);
1845+
Assert(!isnull);
1846+
1847+
ri_ReportViolation(riinfo, partition->rd_id, riinfo->fk_relid,
1848+
value, pkattype, false);
1849+
}
1850+
1851+
if (SPI_finish() != SPI_OK_FINISH)
1852+
elog(ERROR, "SPI_finish failed");
1853+
1854+
elog(ERROR, "test");
1855+
}
1856+
17551857
/*
17561858
* Returns oids list of RI triggers for specified partition and FK constraint
17571859
*/
@@ -1827,6 +1929,9 @@ ri_removePartitionDependencies(Oid parent, Relation partition)
18271929
indexTuple = get_index_for_key(partition, attnum, &index);
18281930
if (HeapTupleIsValid(indexTuple))
18291931
{
1932+
/* Check if there are references in FK table */
1933+
ri_checkReferences(partition, constr);
1934+
18301935
deleteDependencyRecords(ConstraintRelationId, constr,
18311936
RelationRelationId, index);
18321937
// deleteDependencyRecordsRef(RelationRelationId, indexOid);

0 commit comments

Comments
 (0)