Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 17f2cab

Browse files
knizhnikkelvich
authored andcommitted
Restore timeout for 2pc commit
1 parent 1708668 commit 17f2cab

File tree

2 files changed

+35
-21
lines changed

2 files changed

+35
-21
lines changed

multimaster.c

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,9 @@ MtmAdjustOldestXid(TransactionId xid)
583583

584584
for (ts = Mtm->transListHead;
585585
ts != NULL
586+
&& (ts->status == TRANSACTION_STATUS_ABORTED || ts->status == TRANSACTION_STATUS_COMMITTED)
586587
&& ts->csn < oldestSnapshot
588+
&& !ts->isPinned
587589
&& TransactionIdPrecedes(ts->xid, xid);
588590
prev = ts, ts = ts->next)
589591
{
@@ -653,6 +655,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
653655
sts = (MtmTransState*)hash_search(MtmXid2State, &subxids[i], HASH_ENTER, &found);
654656
Assert(!found);
655657
sts->isActive = false;
658+
sts->isPinned = false;
656659
sts->status = ts->status;
657660
sts->csn = ts->csn;
658661
sts->votingCompleted = true;
@@ -814,6 +817,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
814817
ts->isLocal = true;
815818
ts->isPrepared = false;
816819
ts->isTwoPhase = x->isTwoPhase;
820+
ts->isPinned = false;
817821
ts->votingCompleted = false;
818822
if (!found) {
819823
ts->isEnqueued = false;
@@ -963,8 +967,13 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
963967
{
964968
int result = 0;
965969
int nConfigChanges = Mtm->nConfigChanges;
966-
timestamp_t elapsed, start = MtmGetSystemTime();
967-
timestamp_t deadline = 0;
970+
timestamp_t prepareTime = ts->csn - ts->snapshot;
971+
timestamp_t timeout = Max(prepareTime + MSEC_TO_USEC(MtmMin2PCTimeout), prepareTime*MtmMax2PCRatio/100);
972+
timestamp_t deadline = MtmGetSystemTime() + timeout;
973+
timestamp_t now;
974+
975+
Assert(ts->csn > ts->snapshot);
976+
968977
/* Wait votes from all nodes until: */
969978
while (!MtmVotingCompleted(ts)
970979
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
@@ -980,19 +989,16 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
980989
if (result & WL_LATCH_SET) {
981990
ResetLatch(&MyProc->procLatch);
982991
}
983-
elapsed = MtmGetSystemTime() - start;
992+
now = MtmGetSystemTime();
984993
MtmLock(LW_EXCLUSIVE);
985-
if (deadline == 0 && ts->votedMask != 0) {
986-
deadline = Max(MSEC_TO_USEC(MtmMin2PCTimeout), elapsed*MtmMax2PCRatio/100);
987-
} else {
994+
if (now > deadline) {
988995
if (ts->isPrepared) {
989996
/* resend precommit message */
990997
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
991998
} else {
992-
if (elapsed > deadline) {
993-
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(elapsed));
994-
MtmAbortTransaction(ts);
995-
}
999+
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(timeout));
1000+
MtmAbortTransaction(ts);
1001+
break;
9961002
}
9971003
}
9981004
}
@@ -1005,7 +1011,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10051011
} else {
10061012
if (Mtm->status != MTM_ONLINE) {
10071013
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1008-
} else if (nConfigChanges != Mtm->nConfigChanges) {
1014+
} else {
10091015
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
10101016
}
10111017
MtmAbortTransaction(ts);
@@ -1202,6 +1208,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021208
ts->status = TRANSACTION_STATUS_ABORTED;
12031209
ts->isLocal = true;
12041210
ts->isPrepared = false;
1211+
ts->isPinned = false;
12051212
ts->snapshot = x->snapshot;
12061213
ts->isTwoPhase = x->isTwoPhase;
12071214
ts->csn = MtmAssignCSN();
@@ -1280,7 +1287,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12801287
}
12811288
}
12821289

1283-
void MtmBroadcastPollMessage(MtmTransState* ts)
1290+
static void MtmBroadcastPollMessage(MtmTransState* ts)
12841291
{
12851292
int i;
12861293
MtmArbiterMessage msg;
@@ -1293,7 +1300,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
12931300

12941301
for (i = 0; i < Mtm->nAllNodes; i++)
12951302
{
1296-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask, i))
1303+
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i))
12971304
{
12981305
msg.node = i+1;
12991306
MTM_LOG3("Send request for transaction %s to node %d", msg.gid, msg.node);
@@ -1480,15 +1487,17 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14801487
Assert(ts->gid[0]);
14811488
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
14821489
elog(LOG, "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d", ts->gid, MtmNodeId);
1483-
//MtmUnlock();
1490+
ts->isPinned = true;
1491+
MtmUnlock();
14841492
MtmFinishPreparedTransaction(ts, false);
1485-
//MtmLock(LW_EXCLUSIVE);
1493+
MtmLock(LW_EXCLUSIVE);
1494+
ts->isPinned = false;
14861495
} else {
14871496
MTM_LOG1("Poll state of transaction %d (%s)", ts->xid, ts->gid);
14881497
MtmBroadcastPollMessage(ts);
14891498
}
14901499
} else {
1491-
MTM_LOG2("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
1500+
MTM_LOG1("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
14921501
ts->xid, ts->gid, ts->status, ts->gtid.node, ts->gtid.xid, ts->votedMask);
14931502
}
14941503
}
@@ -3216,8 +3225,13 @@ bool MtmFilterTransaction(char* record, int size)
32163225
duplicate = true;
32173226
}
32183227

3219-
MTM_LOG2("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3220-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3228+
if (duplicate) {
3229+
MTM_LOG1("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3230+
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3231+
} else {
3232+
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3233+
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3234+
}
32213235
return duplicate;
32223236
}
32233237

@@ -3831,7 +3845,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
38313845
} else {
38323846
CommitTransactionCommand();
38333847
if (x->isSuspended) {
3834-
elog(WARNING, "Transaction %s is left in prepared state because coordinator onde is not online", x->gid);
3848+
elog(WARNING, "Transaction %s is left in prepared state because coordinator node is not online", x->gid);
38353849
} else {
38363850
StartTransactionCommand();
38373851
if (x->status == TRANSACTION_STATUS_ABORTED) {

multimaster.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ typedef struct MtmTransState
223223
bool isEnqueued; /* Transaction is inserted in queue */
224224
bool isPrepared; /* Transaction is prepared: now it is safe to commit transaction */
225225
bool isActive; /* Transaction is active */
226-
bool isTwoPhase; /* user level 2PC */
226+
bool isTwoPhase; /* User level 2PC */
227+
bool isPinned; /* Transaction oid potected from GC */
227228
nodemask_t participantsMask; /* Mask of nodes involved in transaction */
228229
nodemask_t votedMask; /* Mask of voted nodes */
229230
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
@@ -331,7 +332,6 @@ extern void MtmExecutor(void* work, size_t size);
331332
extern void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd);
332333
extern void MtmSendMessage(MtmArbiterMessage* msg);
333334
extern void MtmAdjustSubtransactions(MtmTransState* ts);
334-
extern void MtmBroadcastPollMessage(MtmTransState* ts);
335335
extern void MtmLock(LWLockMode mode);
336336
extern void MtmUnlock(void);
337337
extern void MtmLockNode(int nodeId, LWLockMode mode);

0 commit comments

Comments
 (0)