Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 59e22f8

Browse files
committed
Add 3PC support
1 parent add9eaf commit 59e22f8

File tree

17 files changed

+346
-95
lines changed

17 files changed

+346
-95
lines changed

contrib/mmts/arbiter.c

+9-2
Original file line numberDiff line numberDiff line change
@@ -536,11 +536,13 @@ static bool MtmSendToNode(int node, void const* buf, int size)
536536
nodemask_t save_mask = busy_mask;
537537
BIT_SET(busy_mask, node);
538538
while (true) {
539+
#if 0
539540
if (sockets[node] >= 0 && BIT_CHECK(Mtm->reconnectMask, node)) {
540541
elog(WARNING, "Arbiter is forced to reconnect to node %d", node+1);
541542
close(sockets[node]);
542543
sockets[node] = -1;
543544
}
545+
#endif
544546
if (BIT_CHECK(Mtm->reconnectMask, node)) {
545547
MtmLock(LW_EXCLUSIVE);
546548
BIT_CLEAR(Mtm->reconnectMask, node);
@@ -872,7 +874,8 @@ static void MtmReceiver(Datum arg)
872874
}
873875

874876
rc = MtmReadFromNode(i, (char*)rxBuffer[i].data + rxBuffer[i].used, rxBuffer[i].size-rxBuffer[i].used);
875-
if (rc <= 0) {
877+
if (rc <= 0) {
878+
MTM_LOG1("Failed to read response from node %d", i+1);
876879
continue;
877880
}
878881

@@ -940,6 +943,8 @@ static void MtmReceiver(Datum arg)
940943
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
941944
elog(LOG, "Commit transaction %s because it is prepared at all live nodes", msg->gid);
942945
MtmFinishPreparedTransaction(ts, true);
946+
} else {
947+
MTM_LOG1("Receive response for transaction %s -> %d, participants=%llx, voted=%llx", msg->gid, msg->status, (long long)ts->participantsMask, (long long)ts->votedMask);
943948
}
944949
} else {
945950
elog(LOG, "Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
@@ -1009,7 +1014,8 @@ static void MtmReceiver(Datum arg)
10091014
} else if (MtmUseDtm) {
10101015
ts->votedMask = 0;
10111016
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PRECOMMIT");
1012-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1017+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1018+
SetPrepareTransactionState(ts->gid, "precommitted");
10131019
} else {
10141020
ts->status = TRANSACTION_STATUS_UNKNOWN;
10151021
MtmWakeUpBackend(ts);
@@ -1056,6 +1062,7 @@ static void MtmReceiver(Datum arg)
10561062
} else {
10571063
switch (msg->code) {
10581064
case MSG_PRECOMMIT:
1065+
Assert(false); // Now send through pglogical
10591066
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
10601067
ts->status = TRANSACTION_STATUS_UNKNOWN;
10611068
ts->csn = MtmAssignCSN();

contrib/mmts/bgwpool.c

+6-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
bool MtmIsLogicalReceiver;
1818
int MtmMaxWorkers;
1919

20-
static BgwPool* pool;
20+
static BgwPool* MtmPool;
2121

2222
static void BgwShutdownWorker(int sig)
2323
{
24-
BgwPoolStop(pool);
24+
if (MtmPool) {
25+
BgwPoolStop(MtmPool);
26+
}
2527
}
2628

2729
static void BgwPoolMainLoop(BgwPool* pool)
@@ -32,6 +34,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3234
sigset_t sset;
3335

3436
MtmIsLogicalReceiver = true;
37+
MtmPool = pool;
3538

3639
signal(SIGINT, BgwShutdownWorker);
3740
signal(SIGQUIT, BgwShutdownWorker);
@@ -88,6 +91,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
8891

8992
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers)
9093
{
94+
MtmPool = pool;
9195
pool->queue = (char*)ShmemAlloc(queueSize);
9296
pool->executor = executor;
9397
PGSemaphoreCreate(&pool->available);

contrib/mmts/multimaster.c

+106-18
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
421421

422422
MtmLock(LW_SHARED);
423423
if (Mtm->status == MTM_ONLINE) {
424-
MtmTransState* ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
424+
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
425425
if (ts != NULL && !ts->isLocal) {
426426
snapshot = ts->snapshot;
427427
Assert(ts->gtid.node == MtmNodeId || MtmIsRecoverySession);
@@ -811,7 +811,7 @@ static MtmTransState*
811811
MtmCreateTransState(MtmCurrentTrans* x)
812812
{
813813
bool found;
814-
MtmTransState* ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
814+
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
815815
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
816816
ts->snapshot = x->snapshot;
817817
ts->isLocal = true;
@@ -864,6 +864,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
864864
/* Do not take in account bg-workers which are performing recovery */
865865
elog(ERROR, "Abort current transaction because this cluster node is in %s status", MtmNodeStatusMnem[Mtm->status]);
866866
}
867+
if (TransactionIdIsValid(x->gtid.xid) && BIT_CHECK(Mtm->disabledNodeMask, x->gtid.node-1)) {
868+
/* Coordinator of transaction is disabled: just abort transaction without any further steps */
869+
elog(ERROR, "Abort transaction %d because it's coordinator %d was disabled", x->xid, x->gtid.node);
870+
}
867871

868872
MtmLock(LW_EXCLUSIVE);
869873

@@ -925,6 +929,32 @@ bool MtmWatchdog(timestamp_t now)
925929
return allAlive;
926930
}
927931

932+
/*
933+
* Mark transaction as precommitted
934+
*/
935+
void MtmPrecommitTransaction(char const* gid)
936+
{
937+
MtmLock(LW_EXCLUSIVE);
938+
{
939+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_FIND, NULL);
940+
if (tm == NULL) {
941+
elog(WARNING, "MtmPrecommitTransaction: transaciton '%s' is not found", gid);
942+
} else {
943+
MtmTransState* ts = tm->state;
944+
Assert(ts != NULL);
945+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
946+
ts->status = TRANSACTION_STATUS_UNKNOWN;
947+
ts->csn = MtmAssignCSN();
948+
MtmAdjustSubtransactions(ts);
949+
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
950+
}
951+
}
952+
MtmUnlock();
953+
}
954+
955+
956+
957+
928958

929959
static bool
930960
MtmVotingCompleted(MtmTransState* ts)
@@ -949,7 +979,8 @@ MtmVotingCompleted(MtmTransState* ts)
949979
return true;
950980
} else if (MtmUseDtm) {
951981
ts->votedMask = 0;
952-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
982+
SetPrepareTransactionState(ts->gid, "precommitted");
983+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
953984
return false;
954985
} else {
955986
ts->status = TRANSACTION_STATUS_UNKNOWN;
@@ -969,7 +1000,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
9691000
int nConfigChanges = Mtm->nConfigChanges;
9701001
timestamp_t prepareTime = ts->csn - ts->snapshot;
9711002
timestamp_t timeout = Max(prepareTime + MSEC_TO_USEC(MtmMin2PCTimeout), prepareTime*MtmMax2PCRatio/100);
972-
timestamp_t deadline = MtmGetSystemTime() + timeout;
1003+
timestamp_t start = MtmGetSystemTime();
1004+
timestamp_t deadline = start + timeout;
9731005
timestamp_t now;
9741006

9751007
Assert(ts->csn > ts->snapshot);
@@ -994,7 +1026,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
9941026
if (now > deadline) {
9951027
if (ts->isPrepared) {
9961028
/* resend precommit message */
997-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1029+
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1030+
elog(LOG, "Distributes transaction is not committed in %ld msec", USEC_TO_MSEC(now - start));
9981031
} else {
9991032
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(timeout));
10001033
MtmAbortTransaction(ts);
@@ -1037,15 +1070,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10371070
elog(ERROR, "ERROR INJECTION for transaction %d (%s)", x->xid, x->gid);
10381071
}
10391072
MtmLock(LW_EXCLUSIVE);
1040-
ts = hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
1073+
ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
10411074
Assert(ts != NULL);
10421075
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10431076
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
10441077
MTM_TXTRACE(x, "recovery?");
10451078
Assert(x->gid[0]);
10461079
ts->votingCompleted = true;
10471080
MTM_TXTRACE(x, "recovery? 1");
1048-
if (Mtm->status != MTM_RECOVERY || Mtm->recoverySlot != MtmReplicationNodeId) {
1081+
if (Mtm->status != MTM_RECOVERY/* || Mtm->recoverySlot != MtmReplicationNodeId*/) {
10491082
MTM_TXTRACE(x, "recovery? 2");
10501083
MtmSend2PCMessage(ts, MSG_PREPARED); /* send notification to coordinator */
10511084
if (!MtmUseDtm) {
@@ -1102,7 +1135,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
11021135
ts->votedMask = 0;
11031136
ts->procno = MyProc->pgprocno;
11041137
MTM_TXTRACE(ts, "Coordinator sends MSG_PRECOMMIT");
1105-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1138+
SetPrepareTransactionState(ts->gid, "precommitted");
1139+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
11061140

11071141
Mtm2PCVoting(x, ts);
11081142

@@ -1159,7 +1193,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11591193
MtmTransState* ts = NULL;
11601194
MtmLock(LW_EXCLUSIVE);
11611195
if (x->isPrepared) {
1162-
ts = hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
1196+
ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
11631197
Assert(ts != NULL);
11641198
Assert(strcmp(x->gid, ts->gid) == 0);
11651199
} else if (x->gid[0]) {
@@ -1211,7 +1245,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12111245
if (ts == NULL) {
12121246
bool found;
12131247
Assert(TransactionIdIsValid(x->xid));
1214-
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
1248+
ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
12151249
if (!found) {
12161250
ts->isEnqueued = false;
12171251
ts->isActive = false;
@@ -1321,6 +1355,53 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
13211355
}
13221356
}
13231357

1358+
/*
1359+
* Restore state of recovered prepared transaction in memory
1360+
*/
1361+
static void MtmLoadPreparedTransactions(void)
1362+
{
1363+
PreparedTransaction pxacts;
1364+
int n = GetPreparedTransactions(&pxacts);
1365+
int i;
1366+
1367+
for (i = 0; i < n; i++) {
1368+
bool found;
1369+
char const* gid = pxacts[i].gid;
1370+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_ENTER, &found);
1371+
if (!found) {
1372+
TransactionId xid = GetNewTransactionId(false);
1373+
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_ENTER, &found);
1374+
MTM_LOG1("Recover prepared transaction %s xid %d", gid, xid);
1375+
MyPgXact->xid = InvalidTransactionId; /* dirty hack:((( */
1376+
Assert(!found);
1377+
Mtm->nActiveTransactions += 1;
1378+
ts->isEnqueued = false;
1379+
ts->isActive = true;
1380+
ts->status = strcmp(pxacts[i].state_3pc, "precommitted") == 0 ? TRANSACTION_STATUS_UNKNOWN : TRANSACTION_STATUS_IN_PROGRESS;
1381+
ts->isLocal = true;
1382+
ts->isPrepared = false;
1383+
ts->isPinned = false;
1384+
ts->snapshot = INVALID_CSN;
1385+
ts->isTwoPhase = false;
1386+
ts->csn = 0; /* should be replaced with real CSN by poll result */
1387+
ts->gtid.node = MtmNodeId;
1388+
ts->gtid.xid = xid;
1389+
ts->nSubxids = 0;
1390+
ts->votingCompleted = true;
1391+
ts->participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
1392+
ts->votedMask = 0;
1393+
strcpy(ts->gid, gid);
1394+
MtmTransactionListAppend(ts);
1395+
tm->status = ts->status;
1396+
tm->state = ts;
1397+
MtmBroadcastPollMessage(ts);
1398+
}
1399+
}
1400+
MTM_LOG1("Recover %d prepared transactions", n);
1401+
if (pxacts) {
1402+
pfree(pxacts);
1403+
}
1404+
}
13241405

13251406
static void MtmStartRecovery()
13261407
{
@@ -2084,6 +2165,7 @@ static void MtmCheckControlFile(void)
20842165
}
20852166
}
20862167

2168+
20872169
static void MtmInitialize()
20882170
{
20892171
bool found;
@@ -2120,6 +2202,7 @@ static void MtmInitialize()
21202202
Mtm->nConfigChanges = 0;
21212203
Mtm->recoveryCount = 0;
21222204
Mtm->localTablesHashLoaded = false;
2205+
Mtm->preparedTransactionsLoaded = false;
21232206
Mtm->inject2PCError = 0;
21242207
Mtm->sendQueue = NULL;
21252208
Mtm->freeQueue = NULL;
@@ -2923,6 +3006,13 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29233006
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED;
29243007

29253008
MtmLock(LW_EXCLUSIVE);
3009+
3010+
if (!Mtm->preparedTransactionsLoaded)
3011+
{
3012+
MtmLoadPreparedTransactions();
3013+
Mtm->preparedTransactionsLoaded = true;
3014+
}
3015+
29263016
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
29273017
{
29283018
if (*shutdown)
@@ -3402,7 +3492,7 @@ mtm_get_csn(PG_FUNCTION_ARGS)
34023492
csn_t csn = INVALID_CSN;
34033493

34043494
MtmLock(LW_SHARED);
3405-
ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
3495+
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
34063496
if (ts != NULL) {
34073497
csn = ts->csn;
34083498
}
@@ -3926,14 +4016,12 @@ static inline void MtmGucUpdate(const char *key, char *value)
39264016
MtmGucEntry *hentry;
39274017
bool found;
39284018

3929-
hentry = hash_search(MtmGucHash, key, HASH_FIND, &found);
4019+
hentry = (MtmGucEntry*)hash_search(MtmGucHash, key, HASH_ENTER, &found);
39304020
if (found)
39314021
{
39324022
pfree(hentry->value);
39334023
dlist_delete(&hentry->list_node);
39344024
}
3935-
3936-
hentry = hash_search(MtmGucHash, key, HASH_ENTER, NULL);
39374025
hentry->value = value;
39384026
dlist_push_tail(&MtmGucList, &hentry->list_node);
39394027
}
@@ -3943,7 +4031,7 @@ static inline void MtmGucRemove(const char *key)
39434031
MtmGucEntry *hentry;
39444032
bool found;
39454033

3946-
hentry = hash_search(MtmGucHash, key, HASH_FIND, &found);
4034+
hentry = (MtmGucEntry*)hash_search(MtmGucHash, key, HASH_FIND, &found);
39474035
if (found)
39484036
{
39494037
pfree(hentry->value);
@@ -4531,7 +4619,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
45314619
}
45324620

45334621
static bool
4534-
MtmDetectGlobalDeadLockFortXid(TransactionId xid)
4622+
MtmDetectGlobalDeadLockForXid(TransactionId xid)
45354623
{
45364624
bool hasDeadlock = false;
45374625
if (TransactionIdIsValid(xid)) {
@@ -4587,11 +4675,11 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
45874675

45884676
MTM_LOG1("Detect global deadlock for %d by backend %d", pgxact->xid, MyProcPid);
45894677

4590-
return MtmDetectGlobalDeadLockFortXid(pgxact->xid);
4678+
return MtmDetectGlobalDeadLockForXid(pgxact->xid);
45914679
}
45924680

45934681
Datum mtm_check_deadlock(PG_FUNCTION_ARGS)
45944682
{
45954683
TransactionId xid = PG_GETARG_INT32(0);
4596-
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4684+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockForXid(xid));
45974685
}

0 commit comments

Comments
 (0)