Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1161aab

Browse files
knizhnikkelvich
authored andcommitted
Support read-committed isolation level
1 parent c1785c0 commit 1161aab

File tree

5 files changed

+49
-5
lines changed

5 files changed

+49
-5
lines changed

multimaster.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -504,10 +504,22 @@ csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, nodemask_
504504
return snapshot;
505505
}
506506

507+
void MtmSetSnapshot(csn_t globalSnapshot)
508+
{
509+
MtmLock(LW_EXCLUSIVE);
510+
MtmSyncClock(globalSnapshot);
511+
MtmTx.snapshot = globalSnapshot;
512+
MtmUnlock();
513+
}
507514

515+
508516
Snapshot MtmGetSnapshot(Snapshot snapshot)
509517
{
510518
snapshot = PgGetSnapshotData(snapshot);
519+
if (XactIsoLevel == XACT_READ_COMMITTED && MtmTx.snapshot != INVALID_CSN && TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
520+
MtmTx.snapshot = MtmGetCurrentTime();
521+
LogLogicalMessage("S", (char*)&MtmTx.snapshot, sizeof(MtmTx.snapshot), true);
522+
}
511523
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;
512524
return snapshot;
513525
}
@@ -540,7 +552,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
540552

541553
Assert(xid != InvalidTransactionId);
542554

543-
if (!MtmUseDtm) {
555+
if (!MtmUseDtm || TransactionIdPrecedes(xid, Mtm->oldestXid)) {
544556
return PgXidInMVCCSnapshot(xid, snapshot);
545557
}
546558
MtmLock(LW_SHARED);
@@ -681,6 +693,10 @@ MtmAdjustOldestXid(TransactionId xid)
681693
hash_search(MtmGid2State, &prev->gid, HASH_REMOVE, NULL);
682694
}
683695
}
696+
if (ts != NULL) {
697+
MTM_LOG2("Adjust(%lld) stop at snashot %lld, xid %lld, pinned=%d, oldestSnaphsot=%lld\n",
698+
(long64)xid, ts->csn, (long64)ts->xid, ts->isPinned, oldestSnapshot);
699+
}
684700
}
685701

686702
if (MtmUseDtm && !MtmVolksWagenMode)
@@ -2827,11 +2843,13 @@ static bool ConfigIsSane(void)
28272843
{
28282844
bool ok = true;
28292845

2846+
#if 0
28302847
if (DefaultXactIsoLevel != XACT_REPEATABLE_READ)
28312848
{
28322849
MTM_ELOG(WARNING, "multimaster requires default_transaction_isolation = 'repeatable read'");
28332850
ok = false;
28342851
}
2852+
#endif
28352853

28362854
if (MtmMaxNodes < 1)
28372855
{
@@ -5123,11 +5141,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
51235141
standard_ProcessUtility(parsetree, queryString, context,
51245142
params, dest, completionTag);
51255143
}
5126-
5144+
#if 0
51275145
if (!MtmVolksWagenMode && MtmTx.isDistributed && XactIsoLevel != XACT_REPEATABLE_READ) {
51285146
MTM_ELOG(ERROR, "Isolation level %s is not supported by multimaster", isoLevelStr[XactIsoLevel]);
51295147
}
5130-
5148+
#endif
51315149
if (MyXactAccessedTempRel)
51325150
{
51335151
MTM_LOG1("Xact accessed temp table, stopping replication");

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,5 +424,6 @@ extern bool MtmTransIsActive(void);
424424
extern MtmTransState* MtmGetActiveTransaction(MtmL2List* list);
425425
extern void MtmReleaseLocks(void);
426426
extern void MtmInitMessage(MtmArbiterMessage* msg, MtmMessageCode code);
427+
extern void MtmSetSnapshot(csn_t snapshot);
427428

428429
#endif

pglogical_apply.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,15 @@ process_remote_message(StringInfo s)
483483
standalone = true;
484484
break;
485485
}
486-
487-
}
486+
case 'S':
487+
{
488+
Assert(messageSize == sizeof(csn_t));
489+
MtmSetSnapshot(*(csn_t*)messageBody);
490+
break;
491+
}
492+
default:
493+
Assert(false);
494+
}
488495
return standalone;
489496
}
490497

pglogical_proto.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ pglogical_write_message(StringInfo out,
180180
MTM_LOG1("Send deadlock message to node %d", MtmReplicationNodeId);
181181
}
182182
break;
183+
case 'S':
184+
if (MtmIsFilteredTxn) {
185+
return;
186+
}
187+
break;
183188
case 'D':
184189
if (MtmIsFilteredTxn) {
185190
MTM_LOG2("%d: pglogical_write_message filtered", MyProcPid);

tests/dtmbench.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ struct config
6969
int updatePercent;
7070
vector<string> connections;
7171
bool scatter;
72+
bool avoidDeadlocks;
7273

7374
config() {
7475
nReaders = 1;
@@ -77,6 +78,7 @@ struct config
7778
nAccounts = 100000;
7879
updatePercent = 100;
7980
scatter = false;
81+
avoidDeadlocks = false;
8082
}
8183
};
8284

@@ -157,6 +159,12 @@ void* writer(void* arg)
157159
if (cfg.scatter) {
158160
srcAcc = srcAcc/cfg.nWriters*cfg.nWriters + t.id;
159161
dstAcc = dstAcc/cfg.nWriters*cfg.nWriters + t.id;
162+
} else if (cfg.avoidDeadlocks) {
163+
if (dstAcc < srcAcc) {
164+
int tmp = srcAcc;
165+
srcAcc = dstAcc;
166+
dstAcc = tmp;
167+
}
160168
}
161169
try {
162170
if (random() % 100 < cfg.updatePercent) {
@@ -240,6 +248,9 @@ int main (int argc, char* argv[])
240248
case 'i':
241249
initialize = true;
242250
continue;
251+
case 'd':
252+
cfg.avoidDeadlocks = true;
253+
continue;
243254
}
244255
}
245256
printf("Options:\n"
@@ -249,6 +260,8 @@ int main (int argc, char* argv[])
249260
"\t-n N\tnumber of iterations (1000)\n"
250261
"\t-p N\tupdate percent (100)\n"
251262
"\t-c STR\tdatabase connection string\n"
263+
"\t-s\tscattern avoid deadlocks\n"
264+
"\t-d\tavoid deadlocks\n"
252265
"\t-i\tinitialize database\n");
253266
return 1;
254267
}

0 commit comments

Comments
 (0)