Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3abe087

Browse files
knizhnikkelvich
authored andcommitted
Restore timeouts
1 parent 7668c14 commit 3abe087

File tree

1 file changed

+94
-77
lines changed

1 file changed

+94
-77
lines changed

multimaster.c

Lines changed: 94 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ int MtmTransSpillThreshold;
223223
int MtmMaxNodes;
224224
int MtmHeartbeatSendTimeout;
225225
int MtmHeartbeatRecvTimeout;
226+
int MtmMin2PCTimeout;
227+
int MtmMax2PCRatio;
226228
bool MtmUseRaftable;
227229
bool MtmUseDtm;
228230
bool MtmPreserveCommitOrder;
@@ -954,6 +956,62 @@ MtmVotingCompleted(MtmTransState* ts)
954956
|| ts->status == TRANSACTION_STATUS_ABORTED; /* or transaction was aborted */
955957
}
956958

959+
static void
960+
Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
961+
{
962+
int result = 0;
963+
int nConfigChanges = Mtm->nConfigChanges;
964+
timestamp_t elapsed, start = MtmGetSystemTime();
965+
timestamp_t deadline = 0;
966+
/* Wait votes from all nodes until: */
967+
while (!MtmVotingCompleted(ts)
968+
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
969+
{
970+
MtmUnlock();
971+
MTM_TXTRACE(x, "PostPrepareTransaction WaitLatch Start");
972+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout);
973+
MTM_TXTRACE(x, "PostPrepareTransaction WaitLatch Finish");
974+
/* Emergency bailout if postmaster has died */
975+
if (result & WL_POSTMASTER_DEATH) {
976+
proc_exit(1);
977+
}
978+
if (result & WL_LATCH_SET) {
979+
ResetLatch(&MyProc->procLatch);
980+
}
981+
elapsed = MtmGetSystemTime() - start;
982+
MtmLock(LW_EXCLUSIVE);
983+
if (deadline == 0 && ts->votedMask != 0) {
984+
deadline = Max(MSEC_TO_USEC(MtmMin2PCTimeout), elapsed*MtmMax2PCRatio/100);
985+
} else {
986+
if (ts->isPrepared) {
987+
/* reset precommit message */
988+
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
989+
} else {
990+
if (elapsed > deadline) {
991+
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(elapsed));
992+
MtmAbortTransaction(ts);
993+
}
994+
}
995+
}
996+
}
997+
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
998+
if (ts->isPrepared) {
999+
// GetNewTransactionId(false); /* force increment of transaction counter */
1000+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1001+
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1002+
x->isSuspended = true;
1003+
} else {
1004+
if (Mtm->status != MTM_ONLINE) {
1005+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1006+
} else if (nConfigChanges != Mtm->nConfigChanges) {
1007+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1008+
}
1009+
MtmAbortTransaction(ts);
1010+
}
1011+
}
1012+
x->status = ts->status;
1013+
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
1014+
}
9571015

9581016
static void
9591017
MtmPostPrepareTransaction(MtmCurrentTrans* x)
@@ -987,42 +1045,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9871045
MtmUnlock();
9881046
MtmResetTransaction();
9891047
} else {
990-
int result = 0;
991-
int nConfigChanges = Mtm->nConfigChanges;
992-
/* Wait votes from all nodes until: */
993-
while (!MtmVotingCompleted(ts)
994-
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
995-
{
996-
MtmUnlock();
997-
MTM_TXTRACE(x, "PostPrepareTransaction WaitLatch Start");
998-
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout);
999-
MTM_TXTRACE(x, "PostPrepareTransaction WaitLatch Finish");
1000-
/* Emergency bailout if postmaster has died */
1001-
if (result & WL_POSTMASTER_DEATH) {
1002-
proc_exit(1);
1003-
}
1004-
if (result & WL_LATCH_SET) {
1005-
ResetLatch(&MyProc->procLatch);
1006-
}
1007-
MtmLock(LW_EXCLUSIVE);
1008-
}
1009-
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1010-
if (ts->isPrepared) {
1011-
// GetNewTransactionId(false); /* force increment of transaction counter */
1012-
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1013-
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1014-
x->isSuspended = true;
1015-
} else {
1016-
if (Mtm->status != MTM_ONLINE) {
1017-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1018-
} else {
1019-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1020-
}
1021-
MtmAbortTransaction(ts);
1022-
}
1023-
}
1024-
x->status = ts->status;
1025-
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
1048+
Mtm2PCVoting(x, ts);
10261049
MtmUnlock();
10271050
if (x->isTwoPhase) {
10281051
MtmResetTransaction();
@@ -1051,9 +1074,6 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10511074
if (tm == NULL) {
10521075
elog(WARNING, "Global transaciton ID '%s' is not found", x->gid);
10531076
} else {
1054-
int result = 0;
1055-
int nConfigChanges = Mtm->nConfigChanges;
1056-
10571077
Assert(tm->state != NULL);
10581078
MTM_LOG3("Commit prepared transaction %d with gid='%s'", x->xid, x->gid);
10591079
ts = tm->state;
@@ -1065,44 +1085,11 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10651085
ts->procno = MyProc->pgprocno;
10661086
MTM_TXTRACE(ts, "Coordinator sends MSG_PRECOMMIT");
10671087
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1068-
1069-
/* Wait votes from all nodes until: */
1070-
while (!MtmVotingCompleted(ts)
1071-
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
1072-
{
1073-
MtmUnlock();
1074-
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Start");
1075-
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout);
1076-
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Finish");
1077-
/* Emergency bailout if postmaster has died */
1078-
if (result & WL_POSTMASTER_DEATH) {
1079-
proc_exit(1);
1080-
}
1081-
MtmLock(LW_EXCLUSIVE);
1082-
if (result & WL_LATCH_SET) {
1083-
MTM_LOG3("Latch signaled at %ld", MtmGetSystemTime());
1084-
ResetLatch(&MyProc->procLatch);
1085-
}
1086-
}
1087-
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1088-
if (ts->isPrepared) {
1089-
// GetNewTransactionId(false); /* force increment of transaction counter */
1090-
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1091-
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1092-
x->isSuspended = true;
1093-
} else {
1094-
if (Mtm->status != MTM_ONLINE) {
1095-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1096-
} else {
1097-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1098-
}
1099-
MtmAbortTransaction(ts);
1100-
}
1101-
}
1102-
x->status = ts->status;
1088+
1089+
Mtm2PCVoting(x, ts);
1090+
11031091
x->xid = ts->xid;
11041092
x->isPrepared = true;
1105-
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
11061093
}
11071094
MtmUnlock();
11081095
}
@@ -1202,7 +1189,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021189
* Send notification only if ABORT happens during transaction processing at replicas,
12031190
* do not send notification if ABORT is received from master
12041191
*/
1205-
MTM_LOG2("%d: send ABORT notification for transaction %d to coordinator %d", MyProcPid, x->gtid.xid, x->gtid.node);
1192+
MTM_LOG1("%d: send ABORT notification for transaction %d to coordinator %d", MyProcPid, x->gtid.xid, x->gtid.node);
12061193
if (ts == NULL) {
12071194
bool found;
12081195
Assert(TransactionIdIsValid(x->xid));
@@ -1277,7 +1264,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12771264
int i;
12781265
for (i = 0; i < Mtm->nAllNodes; i++)
12791266
{
1280-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i))
1267+
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask, i))
12811268
{
12821269
Assert(TransactionIdIsValid(ts->xids[i]));
12831270
msg.node = i+1;
@@ -2645,6 +2632,36 @@ _PG_init(void)
26452632
NULL
26462633
);
26472634

2635+
DefineCustomIntVariable(
2636+
"multimaster.min_2pc_timeout",
2637+
"Minimal timeout between receiving PREPARED message from nodes participated in transaction to coordinator (milliseconds)",
2638+
NULL,
2639+
&MtmMin2PCTimeout,
2640+
2000, /* 2 seconds */
2641+
1,
2642+
INT_MAX,
2643+
PGC_BACKEND,
2644+
0,
2645+
NULL,
2646+
NULL,
2647+
NULL
2648+
);
2649+
2650+
DefineCustomIntVariable(
2651+
"multimaster.max_2pc_ratio",
2652+
"Maximal ratio (in percents) between prepare time at different nodes: if T is time of preparing transaction at some node, then transaction can be aborted if prepared responce was not received in T*MtmMax2PCRatio/100",
2653+
NULL,
2654+
&MtmMax2PCRatio,
2655+
200, /* 2 times */
2656+
1,
2657+
INT_MAX,
2658+
PGC_BACKEND,
2659+
0,
2660+
NULL,
2661+
NULL,
2662+
NULL
2663+
);
2664+
26482665
DefineCustomIntVariable(
26492666
"multimaster.queue_size",
26502667
"Multimaster queue size",

0 commit comments

Comments
 (0)