Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0a3c6c0

Browse files
knizhnikkelvich
authored andcommitted
Add 3PC support
1 parent db108a1 commit 0a3c6c0

8 files changed

+157
-42
lines changed

arbiter.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,11 +536,13 @@ static bool MtmSendToNode(int node, void const* buf, int size)
536536
nodemask_t save_mask = busy_mask;
537537
BIT_SET(busy_mask, node);
538538
while (true) {
539+
#if 0
539540
if (sockets[node] >= 0 && BIT_CHECK(Mtm->reconnectMask, node)) {
540541
elog(WARNING, "Arbiter is forced to reconnect to node %d", node+1);
541542
close(sockets[node]);
542543
sockets[node] = -1;
543544
}
545+
#endif
544546
if (BIT_CHECK(Mtm->reconnectMask, node)) {
545547
MtmLock(LW_EXCLUSIVE);
546548
BIT_CLEAR(Mtm->reconnectMask, node);
@@ -872,7 +874,8 @@ static void MtmReceiver(Datum arg)
872874
}
873875

874876
rc = MtmReadFromNode(i, (char*)rxBuffer[i].data + rxBuffer[i].used, rxBuffer[i].size-rxBuffer[i].used);
875-
if (rc <= 0) {
877+
if (rc <= 0) {
878+
MTM_LOG1("Failed to read response from node %d", i+1);
876879
continue;
877880
}
878881

@@ -940,6 +943,8 @@ static void MtmReceiver(Datum arg)
940943
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
941944
elog(LOG, "Commit transaction %s because it is prepared at all live nodes", msg->gid);
942945
MtmFinishPreparedTransaction(ts, true);
946+
} else {
947+
MTM_LOG1("Receive response for transaction %s -> %d, participants=%llx, voted=%llx", msg->gid, msg->status, (long long)ts->participantsMask, (long long)ts->votedMask);
943948
}
944949
} else {
945950
elog(LOG, "Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
@@ -1009,7 +1014,8 @@ static void MtmReceiver(Datum arg)
10091014
} else if (MtmUseDtm) {
10101015
ts->votedMask = 0;
10111016
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PRECOMMIT");
1012-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1017+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1018+
SetPrepareTransactionState(ts->gid, "precommitted");
10131019
} else {
10141020
ts->status = TRANSACTION_STATUS_UNKNOWN;
10151021
MtmWakeUpBackend(ts);
@@ -1056,6 +1062,7 @@ static void MtmReceiver(Datum arg)
10561062
} else {
10571063
switch (msg->code) {
10581064
case MSG_PRECOMMIT:
1065+
Assert(false); // Now send through pglogical
10591066
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
10601067
ts->status = TRANSACTION_STATUS_UNKNOWN;
10611068
ts->csn = MtmAssignCSN();

bgwpool.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
bool MtmIsLogicalReceiver;
1818
int MtmMaxWorkers;
1919

20-
static BgwPool* pool;
20+
static BgwPool* MtmPool;
2121

2222
static void BgwShutdownWorker(int sig)
2323
{
24-
BgwPoolStop(pool);
24+
if (MtmPool) {
25+
BgwPoolStop(MtmPool);
26+
}
2527
}
2628

2729
static void BgwPoolMainLoop(BgwPool* pool)
@@ -32,6 +34,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3234
sigset_t sset;
3335

3436
MtmIsLogicalReceiver = true;
37+
MtmPool = pool;
3538

3639
signal(SIGINT, BgwShutdownWorker);
3740
signal(SIGQUIT, BgwShutdownWorker);
@@ -88,6 +91,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
8891

8992
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers)
9093
{
94+
MtmPool = pool;
9195
pool->queue = (char*)ShmemAlloc(queueSize);
9296
pool->executor = executor;
9397
PGSemaphoreCreate(&pool->available);

multimaster.c

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
416416

417417
MtmLock(LW_SHARED);
418418
if (Mtm->status == MTM_ONLINE) {
419-
MtmTransState* ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
419+
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
420420
if (ts != NULL && !ts->isLocal) {
421421
snapshot = ts->snapshot;
422422
Assert(ts->gtid.node == MtmNodeId || MtmIsRecoverySession);
@@ -806,7 +806,7 @@ static MtmTransState*
806806
MtmCreateTransState(MtmCurrentTrans* x)
807807
{
808808
bool found;
809-
MtmTransState* ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
809+
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
810810
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
811811
ts->snapshot = x->snapshot;
812812
ts->isLocal = true;
@@ -859,6 +859,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
859859
/* Do not take in account bg-workers which are performing recovery */
860860
elog(ERROR, "Abort current transaction because this cluster node is in %s status", MtmNodeStatusMnem[Mtm->status]);
861861
}
862+
if (TransactionIdIsValid(x->gtid.xid) && BIT_CHECK(Mtm->disabledNodeMask, x->gtid.node-1)) {
863+
/* Coordinator of transaction is disabled: just abort transaction without any further steps */
864+
elog(ERROR, "Abort transaction %d because it's coordinator %d was disabled", x->xid, x->gtid.node);
865+
}
862866

863867
MtmLock(LW_EXCLUSIVE);
864868

@@ -920,6 +924,32 @@ bool MtmWatchdog(timestamp_t now)
920924
return allAlive;
921925
}
922926

927+
/*
928+
* Mark transaction as precommitted
929+
*/
930+
void MtmPrecommitTransaction(char const* gid)
931+
{
932+
MtmLock(LW_EXCLUSIVE);
933+
{
934+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_FIND, NULL);
935+
if (tm == NULL) {
936+
elog(WARNING, "MtmPrecommitTransaction: transaciton '%s' is not found", gid);
937+
} else {
938+
MtmTransState* ts = tm->state;
939+
Assert(ts != NULL);
940+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
941+
ts->status = TRANSACTION_STATUS_UNKNOWN;
942+
ts->csn = MtmAssignCSN();
943+
MtmAdjustSubtransactions(ts);
944+
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
945+
}
946+
}
947+
MtmUnlock();
948+
}
949+
950+
951+
952+
923953

924954
static bool
925955
MtmVotingCompleted(MtmTransState* ts)
@@ -944,7 +974,8 @@ MtmVotingCompleted(MtmTransState* ts)
944974
return true;
945975
} else if (MtmUseDtm) {
946976
ts->votedMask = 0;
947-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
977+
SetPrepareTransactionState(ts->gid, "precommitted");
978+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
948979
return false;
949980
} else {
950981
ts->status = TRANSACTION_STATUS_UNKNOWN;
@@ -964,7 +995,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
964995
int nConfigChanges = Mtm->nConfigChanges;
965996
timestamp_t prepareTime = ts->csn - ts->snapshot;
966997
timestamp_t timeout = Max(prepareTime + MSEC_TO_USEC(MtmMin2PCTimeout), prepareTime*MtmMax2PCRatio/100);
967-
timestamp_t deadline = MtmGetSystemTime() + timeout;
998+
timestamp_t start = MtmGetSystemTime();
999+
timestamp_t deadline = start + timeout;
9681000
timestamp_t now;
9691001

9701002
Assert(ts->csn > ts->snapshot);
@@ -989,7 +1021,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
9891021
if (now > deadline) {
9901022
if (ts->isPrepared) {
9911023
/* resend precommit message */
992-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1024+
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1025+
elog(LOG, "Distributes transaction is not committed in %ld msec", USEC_TO_MSEC(now - start));
9931026
} else {
9941027
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(timeout));
9951028
MtmAbortTransaction(ts);
@@ -1032,15 +1065,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10321065
elog(ERROR, "ERROR INJECTION for transaction %d (%s)", x->xid, x->gid);
10331066
}
10341067
MtmLock(LW_EXCLUSIVE);
1035-
ts = hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
1068+
ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
10361069
Assert(ts != NULL);
10371070
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10381071
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
10391072
MTM_TXTRACE(x, "recovery?");
10401073
Assert(x->gid[0]);
10411074
ts->votingCompleted = true;
10421075
MTM_TXTRACE(x, "recovery? 1");
1043-
if (Mtm->status != MTM_RECOVERY || Mtm->recoverySlot != MtmReplicationNodeId) {
1076+
if (Mtm->status != MTM_RECOVERY/* || Mtm->recoverySlot != MtmReplicationNodeId*/) {
10441077
MTM_TXTRACE(x, "recovery? 2");
10451078
MtmSend2PCMessage(ts, MSG_PREPARED); /* send notification to coordinator */
10461079
if (!MtmUseDtm) {
@@ -1097,7 +1130,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10971130
ts->votedMask = 0;
10981131
ts->procno = MyProc->pgprocno;
10991132
MTM_TXTRACE(ts, "Coordinator sends MSG_PRECOMMIT");
1100-
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1133+
SetPrepareTransactionState(ts->gid, "precommitted");
1134+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
11011135

11021136
Mtm2PCVoting(x, ts);
11031137

@@ -1154,7 +1188,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11541188
MtmTransState* ts = NULL;
11551189
MtmLock(LW_EXCLUSIVE);
11561190
if (x->isPrepared) {
1157-
ts = hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
1191+
ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
11581192
Assert(ts != NULL);
11591193
Assert(strcmp(x->gid, ts->gid) == 0);
11601194
} else if (x->gid[0]) {
@@ -1206,7 +1240,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12061240
if (ts == NULL) {
12071241
bool found;
12081242
Assert(TransactionIdIsValid(x->xid));
1209-
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
1243+
ts = (MtmTransState*)hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
12101244
if (!found) {
12111245
ts->isEnqueued = false;
12121246
ts->isActive = false;
@@ -1316,6 +1350,53 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
13161350
}
13171351
}
13181352

1353+
/*
1354+
* Restore state of recovered prepared transaction in memory
1355+
*/
1356+
static void MtmLoadPreparedTransactions(void)
1357+
{
1358+
PreparedTransaction pxacts;
1359+
int n = GetPreparedTransactions(&pxacts);
1360+
int i;
1361+
1362+
for (i = 0; i < n; i++) {
1363+
bool found;
1364+
char const* gid = pxacts[i].gid;
1365+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_ENTER, &found);
1366+
if (!found) {
1367+
TransactionId xid = GetNewTransactionId(false);
1368+
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_ENTER, &found);
1369+
MTM_LOG1("Recover prepared transaction %s xid %d", gid, xid);
1370+
MyPgXact->xid = InvalidTransactionId; /* dirty hack:((( */
1371+
Assert(!found);
1372+
Mtm->nActiveTransactions += 1;
1373+
ts->isEnqueued = false;
1374+
ts->isActive = true;
1375+
ts->status = strcmp(pxacts[i].state_3pc, "precommitted") == 0 ? TRANSACTION_STATUS_UNKNOWN : TRANSACTION_STATUS_IN_PROGRESS;
1376+
ts->isLocal = true;
1377+
ts->isPrepared = false;
1378+
ts->isPinned = false;
1379+
ts->snapshot = INVALID_CSN;
1380+
ts->isTwoPhase = false;
1381+
ts->csn = 0; /* should be replaced with real CSN by poll result */
1382+
ts->gtid.node = MtmNodeId;
1383+
ts->gtid.xid = xid;
1384+
ts->nSubxids = 0;
1385+
ts->votingCompleted = true;
1386+
ts->participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
1387+
ts->votedMask = 0;
1388+
strcpy(ts->gid, gid);
1389+
MtmTransactionListAppend(ts);
1390+
tm->status = ts->status;
1391+
tm->state = ts;
1392+
MtmBroadcastPollMessage(ts);
1393+
}
1394+
}
1395+
MTM_LOG1("Recover %d prepared transactions", n);
1396+
if (pxacts) {
1397+
pfree(pxacts);
1398+
}
1399+
}
13191400

13201401
static void MtmStartRecovery()
13211402
{
@@ -2051,6 +2132,7 @@ static void MtmCheckControlFile(void)
20512132
}
20522133
}
20532134

2135+
20542136
static void MtmInitialize()
20552137
{
20562138
bool found;
@@ -2087,6 +2169,7 @@ static void MtmInitialize()
20872169
Mtm->nConfigChanges = 0;
20882170
Mtm->recoveryCount = 0;
20892171
Mtm->localTablesHashLoaded = false;
2172+
Mtm->preparedTransactionsLoaded = false;
20902173
Mtm->inject2PCError = 0;
20912174
Mtm->sendQueue = NULL;
20922175
Mtm->freeQueue = NULL;
@@ -2851,6 +2934,13 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28512934
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED;
28522935

28532936
MtmLock(LW_EXCLUSIVE);
2937+
2938+
if (!Mtm->preparedTransactionsLoaded)
2939+
{
2940+
MtmLoadPreparedTransactions();
2941+
Mtm->preparedTransactionsLoaded = true;
2942+
}
2943+
28542944
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
28552945
{
28562946
if (*shutdown)
@@ -3330,7 +3420,7 @@ mtm_get_csn(PG_FUNCTION_ARGS)
33303420
csn_t csn = INVALID_CSN;
33313421

33323422
MtmLock(LW_SHARED);
3333-
ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
3423+
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
33343424
if (ts != NULL) {
33353425
csn = ts->csn;
33363426
}
@@ -3859,14 +3949,12 @@ static inline void MtmGucUpdate(const char *key, char *value)
38593949
MtmGucEntry *hentry;
38603950
bool found;
38613951

3862-
hentry = hash_search(MtmGucHash, key, HASH_FIND, &found);
3952+
hentry = (MtmGucEntry*)hash_search(MtmGucHash, key, HASH_ENTER, &found);
38633953
if (found)
38643954
{
38653955
pfree(hentry->value);
38663956
dlist_delete(&hentry->list_node);
38673957
}
3868-
3869-
hentry = hash_search(MtmGucHash, key, HASH_ENTER, NULL);
38703958
hentry->value = value;
38713959
dlist_push_tail(&MtmGucList, &hentry->list_node);
38723960
}
@@ -3876,7 +3964,7 @@ static inline void MtmGucRemove(const char *key)
38763964
MtmGucEntry *hentry;
38773965
bool found;
38783966

3879-
hentry = hash_search(MtmGucHash, key, HASH_FIND, &found);
3967+
hentry = (MtmGucEntry*)hash_search(MtmGucHash, key, HASH_FIND, &found);
38803968
if (found)
38813969
{
38823970
pfree(hentry->value);
@@ -4465,7 +4553,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
44654553
}
44664554

44674555
static bool
4468-
MtmDetectGlobalDeadLockFortXid(TransactionId xid)
4556+
MtmDetectGlobalDeadLockForXid(TransactionId xid)
44694557
{
44704558
bool hasDeadlock = false;
44714559
if (TransactionIdIsValid(xid)) {
@@ -4530,11 +4618,11 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
45304618

45314619
MTM_LOG1("Detect global deadlock for %d by backend %d", pgxact->xid, MyProcPid);
45324620

4533-
return MtmDetectGlobalDeadLockFortXid(pgxact->xid);
4621+
return MtmDetectGlobalDeadLockForXid(pgxact->xid);
45344622
}
45354623

45364624
Datum mtm_check_deadlock(PG_FUNCTION_ARGS)
45374625
{
45384626
TransactionId xid = PG_GETARG_INT32(0);
4539-
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4627+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockForXid(xid));
45404628
}

0 commit comments

Comments
 (0)