Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8abb06b

Browse files
knizhnikkelvich
authored andcommitted
Support update of tables without primary keys in multimaster
1 parent 22cb257 commit 8abb06b

File tree

2 files changed

+169
-70
lines changed

2 files changed

+169
-70
lines changed

multimaster.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5454,11 +5454,9 @@ static void MtmProcessUtility(PlannedStmt *pstmt,
54545454
if (relid != InvalidOid) {
54555455
Oid constraint_oid;
54565456
Bitmapset* pk = get_primary_key_attnos(relid, true, &constraint_oid);
5457-
if (pk == NULL && !MtmVolksWagenMode) {
5457+
if (pk == NULL && !MtmVolksWagenMode && MtmIgnoreTablesWithoutPk) {
54585458
elog(WARNING,
5459-
MtmIgnoreTablesWithoutPk
5460-
? "Table %s.%s without primary will not be replicated"
5461-
: "Updates and deletes of table %s.%s without primary will not be replicated",
5459+
"Table %s.%s without primary will not be replicated",
54625460
create->relation->schemaname ? create->relation->schemaname : "public",
54635461
create->relation->relname);
54645462
}

pglogical_apply.c

Lines changed: 167 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "tcop/utility.h"
4646

4747
#include "utils/array.h"
48+
#include "utils/datum.h"
4849
#include "utils/tqual.h"
4950
#include "utils/builtins.h"
5051
#include "utils/datetime.h"
@@ -72,6 +73,7 @@ static void read_tuple_parts(StringInfo s, Relation rel, TupleData *tup);
7273
static EState* create_rel_estate(Relation rel);
7374
static bool find_pkey_tuple(ScanKey skey, Relation rel, Relation idxrel,
7475
TupleTableSlot *slot, bool lock, LockTupleMode mode);
76+
static bool find_heap_tuple(TupleData *tup, Relation rel, TupleTableSlot *slot, bool lock);
7577
static void build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleData *tup);
7678
static bool build_index_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleData *tup);
7779
static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
@@ -86,6 +88,102 @@ static void process_remote_delete(StringInfo s, Relation rel);
8688

8789
static bool GucAltered; /* transaction is setting some GUC variables */
8890

91+
bool find_heap_tuple(TupleData *tup, Relation rel, TupleTableSlot *slot, bool lock)
92+
{
93+
HeapTuple tuple;
94+
HeapScanDesc scan;
95+
SnapshotData snap;
96+
TransactionId xwait;
97+
TupleDesc tupDesc = RelationGetDescr(rel);
98+
int natts = tupDesc->natts;
99+
Datum* values = (Datum*)palloc(natts*sizeof(Datum));
100+
bool* nulls = (bool*)palloc(natts*sizeof(bool));
101+
bool found = false;
102+
int i;
103+
104+
InitDirtySnapshot(snap);
105+
scan = heap_beginscan(rel,
106+
&snap,
107+
0,
108+
NULL);
109+
retry:
110+
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
111+
{
112+
CHECK_FOR_INTERRUPTS();
113+
heap_deform_tuple(tuple, tupDesc, values, nulls);
114+
115+
for (i = 0; i < natts; i++)
116+
{
117+
Form_pg_attribute att = TupleDescAttr(tupDesc, i);
118+
if (nulls[i] && tup->isnull[i]) /* both nulls */
119+
continue;
120+
else if (nulls[i] ^ tup->isnull[i]) /* one is null and one is not null */
121+
break;
122+
else if (!datumIsEqual(tup->values[i], values[i], att->attbyval, att->attlen))
123+
break;
124+
}
125+
if (i == natts)
126+
{
127+
/* FIXME: Improve TupleSlot to not require copying the whole tuple */
128+
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
129+
ExecMaterializeSlot(slot);
130+
131+
xwait = TransactionIdIsValid(snap.xmin) ?
132+
snap.xmin : snap.xmax;
133+
134+
if (TransactionIdIsValid(xwait))
135+
{
136+
XactLockTableWait(xwait, NULL, NULL, XLTW_None);
137+
heap_rescan(scan, NULL);
138+
goto retry;
139+
}
140+
found = true;
141+
142+
if (lock)
143+
{
144+
Buffer buf;
145+
HeapUpdateFailureData hufd;
146+
HTSU_Result res;
147+
HeapTupleData locktup;
148+
149+
ItemPointerCopy(&slot->tts_tuple->t_self, &locktup.t_self);
150+
151+
PushActiveSnapshot(GetLatestSnapshot());
152+
153+
res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false), LockTupleExclusive,
154+
false /* wait */,
155+
false /* don't follow updates */,
156+
&buf, &hufd);
157+
/* the tuple slot already has the buffer pinned */
158+
ReleaseBuffer(buf);
159+
160+
PopActiveSnapshot();
161+
162+
switch (res)
163+
{
164+
case HeapTupleMayBeUpdated:
165+
break;
166+
case HeapTupleUpdated:
167+
/* XXX: Improve handling here */
168+
ereport(LOG,
169+
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
170+
MTM_ERRMSG("concurrent update, retrying")));
171+
goto retry;
172+
default:
173+
MTM_ELOG(ERROR, "unexpected HTSU_Result after locking: %u", res);
174+
break;
175+
}
176+
}
177+
break;
178+
}
179+
}
180+
heap_endscan(scan);
181+
pfree(values);
182+
pfree(nulls);
183+
184+
return found;
185+
}
186+
89187
/*
90188
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
91189
*
@@ -879,10 +977,10 @@ process_remote_insert(StringInfo s, Relation rel)
879977
{
880978
MtmMakeTableLocal((char*)DatumGetPointer(new_tuple.values[0]), (char*)DatumGetPointer(new_tuple.values[1]));
881979
}
882-
980+
883981
ExecResetTupleTable(estate->es_tupleTable, true);
884982
FreeExecutorState(estate);
885-
983+
886984
CommandCounterIncrement();
887985
}
888986

@@ -893,14 +991,15 @@ process_remote_update(StringInfo s, Relation rel)
893991
EState *estate;
894992
TupleTableSlot *newslot;
895993
TupleTableSlot *oldslot;
896-
bool pkey_sent;
897-
bool found_tuple;
898-
TupleData old_tuple;
899-
TupleData new_tuple;
900-
Oid idxoid;
901-
Relation idxrel;
902-
ScanKeyData skey[INDEX_MAX_KEYS];
903-
HeapTuple remote_tuple = NULL;
994+
bool pkey_sent;
995+
bool found_tuple;
996+
TupleData old_tuple;
997+
TupleData new_tuple;
998+
Oid idxoid;
999+
Relation idxrel = InvalidOid;
1000+
TupleDesc tupDesc = RelationGetDescr(rel);
1001+
ScanKeyData skey[INDEX_MAX_KEYS];
1002+
HeapTuple remote_tuple = NULL;
9041003

9051004
action = pq_getmsgbyte(s);
9061005

@@ -911,9 +1010,9 @@ process_remote_update(StringInfo s, Relation rel)
9111010

9121011
estate = create_rel_estate(rel);
9131012
oldslot = ExecInitExtraTupleSlot(estate);
914-
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
1013+
ExecSetSlotDescriptor(oldslot, tupDesc);
9151014
newslot = ExecInitExtraTupleSlot(estate);
916-
ExecSetSlotDescriptor(newslot, RelationGetDescr(rel));
1015+
ExecSetSlotDescriptor(newslot, tupDesc);
9171016

9181017
if (action == 'K')
9191018
{
@@ -940,32 +1039,33 @@ process_remote_update(StringInfo s, Relation rel)
9401039
if (rel->rd_indexvalid == 0)
9411040
RelationGetIndexList(rel);
9421041
idxoid = rel->rd_replidindex;
1042+
1043+
PushActiveSnapshot(GetTransactionSnapshot());
1044+
9431045
if (!OidIsValid(idxoid))
9441046
{
945-
MTM_ELOG(ERROR, "could not find primary key for table with oid %u",
946-
RelationGetRelid(rel));
947-
return;
1047+
found_tuple = find_heap_tuple(pkey_sent ? &old_tuple : &new_tuple, rel, oldslot, true);
9481048
}
1049+
else
1050+
{
1051+
/* open index, so we can build scan key for row */
1052+
idxrel = index_open(idxoid, RowExclusiveLock);
9491053

950-
/* open index, so we can build scan key for row */
951-
idxrel = index_open(idxoid, RowExclusiveLock);
952-
953-
Assert(idxrel->rd_index->indisunique);
954-
955-
/* Use columns from the new tuple if the key didn't change. */
956-
build_index_scan_key(skey, rel, idxrel,
957-
pkey_sent ? &old_tuple : &new_tuple);
1054+
Assert(idxrel->rd_index->indisunique);
9581055

959-
PushActiveSnapshot(GetTransactionSnapshot());
1056+
/* Use columns from the new tuple if the key didn't change. */
1057+
build_index_scan_key(skey, rel, idxrel,
1058+
pkey_sent ? &old_tuple : &new_tuple);
9601059

961-
/* look for tuple identified by the (old) primary key */
962-
found_tuple = find_pkey_tuple(skey, rel, idxrel, oldslot, true,
963-
pkey_sent ? LockTupleExclusive : LockTupleNoKeyExclusive);
1060+
/* look for tuple identified by the (old) primary key */
1061+
found_tuple = find_pkey_tuple(skey, rel, idxrel, oldslot, true,
1062+
pkey_sent ? LockTupleExclusive : LockTupleNoKeyExclusive);
9641063

1064+
}
9651065
if (found_tuple)
9661066
{
9671067
remote_tuple = heap_modify_tuple(oldslot->tts_tuple,
968-
RelationGetDescr(rel),
1068+
tupDesc,
9691069
new_tuple.values,
9701070
new_tuple.isnull,
9711071
new_tuple.changed);
@@ -976,31 +1076,30 @@ process_remote_update(StringInfo s, Relation rel)
9761076
{
9771077
StringInfoData o;
9781078
initStringInfo(&o);
979-
tuple_to_stringinfo(&o, RelationGetDescr(rel), oldslot->tts_tuple, false);
1079+
tuple_to_stringinfo(&o,tupDesc, oldslot->tts_tuple, false);
9801080
appendStringInfo(&o, " to");
981-
tuple_to_stringinfo(&o, RelationGetDescr(rel), remote_tuple, false);
1081+
tuple_to_stringinfo(&o,tupDesc, remote_tuple, false);
9821082
MTM_LOG1("%lu: UPDATE: %s", GetCurrentTransactionId(), o.data);
9831083
resetStringInfo(&o);
9841084
}
9851085
#endif
9861086

987-
simple_heap_update(rel, &oldslot->tts_tuple->t_self, newslot->tts_tuple);
988-
UserTableUpdateIndexes(estate, newslot);
1087+
simple_heap_update(rel, &oldslot->tts_tuple->t_self, newslot->tts_tuple);
1088+
UserTableUpdateIndexes(estate, newslot);
9891089
}
9901090
else
9911091
{
992-
ereport(ERROR,
993-
(errcode(ERRCODE_NO_DATA_FOUND),
994-
MTM_ERRMSG("Record with specified key can not be located at this node"),
995-
errdetail("Most likely we have DELETE-UPDATE conflict")));
996-
1092+
ereport(ERROR,
1093+
(errcode(ERRCODE_NO_DATA_FOUND),
1094+
MTM_ERRMSG("Record with specified key can not be located at this node"),
1095+
errdetail("Most likely we have DELETE-UPDATE conflict")));
9971096
}
998-
1097+
1098+
/* release locks upon commit */
1099+
if (OidIsValid(idxrel))
1100+
index_close(idxrel, NoLock);
1101+
9991102
PopActiveSnapshot();
1000-
1001-
/* release locks upon commit */
1002-
index_close(idxrel, NoLock);
1003-
10041103
ExecResetTupleTable(estate->es_tupleTable, true);
10051104
FreeExecutorState(estate);
10061105

@@ -1015,50 +1114,51 @@ process_remote_delete(StringInfo s, Relation rel)
10151114
TupleTableSlot *oldslot;
10161115
Oid idxoid;
10171116
Relation idxrel;
1117+
TupleDesc tupDesc = RelationGetDescr(rel);
10181118
ScanKeyData skey[INDEX_MAX_KEYS];
10191119
bool found_old;
10201120

10211121
estate = create_rel_estate(rel);
10221122
oldslot = ExecInitExtraTupleSlot(estate);
1023-
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
1123+
ExecSetSlotDescriptor(oldslot, tupDesc);
10241124

10251125
read_tuple_parts(s, rel, &oldtup);
10261126

10271127
/* lookup index to build scankey */
10281128
if (rel->rd_indexvalid == 0)
10291129
RelationGetIndexList(rel);
10301130
idxoid = rel->rd_replidindex;
1131+
1132+
PushActiveSnapshot(GetTransactionSnapshot());
1133+
10311134
if (!OidIsValid(idxoid))
10321135
{
1033-
MTM_ELOG(ERROR, "could not find primary key for table with oid %u",
1034-
RelationGetRelid(rel));
1035-
return;
1136+
found_old = find_heap_tuple(&oldtup, rel, oldslot, true);
10361137
}
1138+
else
1139+
{
1140+
/* Now open the primary key index */
1141+
idxrel = index_open(idxoid, RowExclusiveLock);
10371142

1038-
/* Now open the primary key index */
1039-
idxrel = index_open(idxoid, RowExclusiveLock);
1040-
1041-
if (rel->rd_rel->relkind != RELKIND_RELATION)
1042-
MTM_ELOG(ERROR, "unexpected relkind '%c' rel \"%s\"",
1043-
rel->rd_rel->relkind, RelationGetRelationName(rel));
1143+
if (rel->rd_rel->relkind != RELKIND_RELATION)
1144+
MTM_ELOG(ERROR, "unexpected relkind '%c' rel \"%s\"",
1145+
rel->rd_rel->relkind, RelationGetRelationName(rel));
10441146

10451147
#ifdef VERBOSE_DELETE
1046-
{
1047-
HeapTuple tup;
1048-
tup = heap_form_tuple(RelationGetDescr(rel),
1049-
oldtup.values, oldtup.isnull);
1050-
ExecStoreTuple(tup, oldslot, InvalidBuffer, true);
1051-
}
1052-
log_tuple("DELETE old-key:%s", RelationGetDescr(rel), oldslot->tts_tuple);
1148+
{
1149+
HeapTuple tup;
1150+
tup = heap_form_tuple(tupDesc,
1151+
oldtup.values, oldtup.isnull);
1152+
ExecStoreTuple(tup, oldslot, InvalidBuffer, true);
1153+
}
1154+
log_tuple("DELETE old-key:%s", tupDesc, oldslot->tts_tuple);
10531155
#endif
10541156

1055-
PushActiveSnapshot(GetTransactionSnapshot());
1056-
1057-
build_index_scan_key(skey, rel, idxrel, &oldtup);
1058-
1059-
/* try to find tuple via a (candidate|primary) key */
1060-
found_old = find_pkey_tuple(skey, rel, idxrel, oldslot, true, LockTupleExclusive);
1157+
build_index_scan_key(skey, rel, idxrel, &oldtup);
10611158

1159+
/* try to find tuple via a (candidate|primary) key */
1160+
found_old = find_pkey_tuple(skey, rel, idxrel, oldslot, true, LockTupleExclusive);
1161+
}
10621162
if (found_old)
10631163
{
10641164
simple_heap_delete(rel, &oldslot->tts_tuple->t_self);
@@ -1073,7 +1173,8 @@ process_remote_delete(StringInfo s, Relation rel)
10731173

10741174
PopActiveSnapshot();
10751175

1076-
index_close(idxrel, NoLock);
1176+
if (OidIsValid(idxoid))
1177+
index_close(idxrel, NoLock);
10771178

10781179
ExecResetTupleTable(estate->es_tupleTable, true);
10791180
FreeExecutorState(estate);

0 commit comments

Comments
 (0)