Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c506245

Browse files
knizhnik authored and kelvich committed
Add wait 2pc timeout
1 parent b6abbca commit c506245

File tree

5 files changed

+75
-31
lines changed

5 files changed

+75
-31
lines changed

arbiter.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -720,14 +720,24 @@ static void MtmTransReceiver(Datum arg)
720720
MtmAbortTransaction(ts);
721721
}
722722

723+
if (!MtmUseDtm && msg->csn > ts->csn) {
724+
ts->csn = msg->csn;
725+
MtmSyncClock(ts->csn);
726+
}
727+
723728
if (++ts->nVotes == Mtm->nNodes) {
724729
/* All nodes are finished their transactions */
725-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
730+
if (ts->status == TRANSACTION_STATUS_ABORTED) {
731+
MtmWakeUpBackend(ts);
732+
} else if (MtmUseDtm) {
733+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
726734
ts->nVotes = 1; /* I voted myself */
727735
MtmSendNotificationMessage(ts, MSG_PREPARE);
728736
} else {
729-
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
730-
MtmWakeUpBackend(ts);
737+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
738+
ts->csn = MtmAssignCSN();
739+
ts->status = TRANSACTION_STATUS_UNKNOWN;
740+
MtmWakeUpBackend(ts);
731741
}
732742
}
733743
break;

multimaster.c

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ static int MtmWorkers;
190190
static int MtmVacuumDelay;
191191
static int MtmMinRecoveryLag;
192192
static int MtmMaxRecoveryLag;
193+
static int Mtm2PCPrepareRatio;
194+
static int Mtm2PCMinTimeout;
193195
static bool MtmIgnoreTablesWithoutPk;
194196

195197
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -765,8 +767,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
765767

766768
}
767769

768-
static time_t maxWakeupTime;
769-
770770
static void
771771
MtmPostPrepareTransaction(MtmCurrentTrans* x)
772772
{
@@ -782,25 +782,32 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
782782
tm->state = ts;
783783
ts->votingCompleted = true;
784784
if (Mtm->status != MTM_RECOVERY) {
785-
MtmSendNotificationMessage(ts, MtmUseDtm ? MSG_READY : MSG_PREPARED); /* send notification to coordinator */
785+
if (MtmUseDtm) {
786+
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
787+
} else {
788+
ts->csn = MtmAssignCSN();
789+
MtmSendNotificationMessage(ts, MSG_PREPARED); /* send notification to coordinator */
790+
ts->status = TRANSACTION_STATUS_UNKNOWN;
791+
}
786792
} else {
787793
ts->status = TRANSACTION_STATUS_UNKNOWN;
788794
}
789795
MtmUnlock();
790796
MtmResetTransaction(x);
791797
} else {
792-
time_t wakeupTime;
798+
time_t timeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
799+
int result = 0;
793800
/* wait votes from all nodes */
794-
while (!ts->votingCompleted) {
801+
while (!ts->votingCompleted && !(result & WL_TIMEOUT)) {
795802
MtmUnlock();
796-
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
803+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, timeout);
797804
ResetLatch(&MyProc->procLatch);
798-
wakeupTime = MtmGetCurrentTime() - ts->wakeupTime;
799-
if (wakeupTime > maxWakeupTime) {
800-
maxWakeupTime = wakeupTime;
801-
}
802805
MtmLock(LW_SHARED);
803806
}
807+
if (!ts->votingCompleted) {
808+
ts->status = TRANSACTION_STATUS_ABORTED;
809+
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration", (int)timeout);
810+
}
804811
x->status = ts->status;
805812
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
806813
MtmUnlock();
@@ -988,11 +995,12 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
988995
}
989996

990997
void MtmWakeUpBackend(MtmTransState* ts)
991-
{
992-
MTM_LOG3("Wakeup backed procno=%d, pid=%d", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
993-
ts->votingCompleted = true;
994-
ts->wakeupTime = MtmGetCurrentTime();
995-
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
998+
{
999+
if (!ts->votingCompleted) {
1000+
MTM_LOG3("Wakeup backed procno=%d, pid=%d", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
1001+
ts->votingCompleted = true;
1002+
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
1003+
}
9961004
}
9971005

9981006
void MtmAbortTransaction(MtmTransState* ts)
@@ -1598,6 +1606,38 @@ _PG_init(void)
15981606
if (!process_shared_preload_libraries_in_progress)
15991607
return;
16001608

1609+
DefineCustomIntVariable(
1610+
"multimaster.2pc_min_timeout",
1611+
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
1612+
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
1613+
&Mtm2PCMinTimeout,
1614+
10000,
1615+
0,
1616+
INT_MAX,
1617+
PGC_BACKEND,
1618+
0,
1619+
NULL,
1620+
NULL,
1621+
NULL
1622+
);
1623+
1624+
DefineCustomIntVariable(
1625+
"multimaster.2pc_prepare_ratio",
1626+
"Percent of prepare time for maximal time of second phase of two-pahse commit",
1627+
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
1628+
&Mtm2PCPrepareRatio,
1629+
100,
1630+
0,
1631+
INT_MAX,
1632+
PGC_BACKEND,
1633+
0,
1634+
NULL,
1635+
NULL,
1636+
NULL
1637+
);
1638+
1639+
1640+
16011641
DefineCustomIntVariable(
16021642
"multimaster.node_disable_delay",
16031643
"Minamal amount of time (sec) between node status change",

multimaster.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ typedef struct MtmTransState
135135
int procno; /* pgprocno of transaction coordinator waiting for responses from replicas,
136136
used to notify coordinator by arbiter */
137137
int nSubxids; /* Number of subtransanctions */
138-
time_t wakeupTime;
139138
MtmMessageCode cmd; /* Notification message to be sent */
140139
struct MtmTransState* nextVoting; /* Next element in L1-list of voting transactions. */
141140
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */

pglogical_receiver.c

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,13 @@ static volatile sig_atomic_t got_sighup = false;
5353

5454
/* GUC variables */
5555
static int receiver_idle_time = 0;
56-
static bool receiver_sync_mode = true;
56+
static bool receiver_sync_mode = false;
5757

5858
/* Worker name */
59-
char worker_proc[BGW_MAXLEN];
59+
static char worker_proc[BGW_MAXLEN];
6060

6161
/* Lastly written positions */
6262
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63-
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64-
static XLogRecPtr output_applied_lsn = InvalidXLogRecPtr;
6563

6664
/* Stream functions */
6765
static void fe_sendint64(int64 i, char *buf);
@@ -91,16 +89,17 @@ receiver_raw_sighup(SIGNAL_ARGS)
9189
* Send a Standby Status Update message to server.
9290
*/
9391
static bool
94-
sendFeedback(PGconn *conn, int64 now)
92+
sendFeedback(PGconn *conn, int64 now, RepOriginId originId)
9593
{
9694
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
9795
int len = 0;
96+
XLogRecPtr output_applied_lsn = replorigin_get_progress(originId, true);
9897

9998
replybuf[len] = 'r';
10099
len += 1;
101100
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
102101
len += 8;
103-
fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
102+
fe_sendint64(output_applied_lsn, &replybuf[len]); /* flush */
104103
len += 8;
105104
fe_sendint64(output_applied_lsn, &replybuf[len]); /* apply */
106105
len += 8;
@@ -409,8 +408,6 @@ pglogical_receiver_main(Datum main_arg)
409408

410409
/* Update written position */
411410
output_written_lsn = Max(walEnd, output_written_lsn);
412-
output_fsync_lsn = output_written_lsn;
413-
output_applied_lsn = output_written_lsn;
414411

415412
/*
416413
* If the server requested an immediate reply, send one.
@@ -424,7 +421,7 @@ pglogical_receiver_main(Datum main_arg)
424421
int64 now = feGetCurrentTimestamp();
425422

426423
/* Leave is feedback is not sent properly */
427-
if (!sendFeedback(conn, now))
424+
if (!sendFeedback(conn, now, originId))
428425
proc_exit(1);
429426
}
430427
continue;
@@ -482,8 +479,6 @@ pglogical_receiver_main(Datum main_arg)
482479
}
483480
/* Update written position */
484481
output_written_lsn = Max(walEnd, output_written_lsn);
485-
output_fsync_lsn = output_written_lsn;
486-
output_applied_lsn = output_written_lsn;
487482
}
488483

489484
/* No data, move to next loop */

tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void* reader(void* arg)
129129
result r = txn.exec("select sum(v) from t");
130130
int64_t sum = r[0][0].as(int64_t());
131131
if (sum != prevSum) {
132-
// r = txn.exec("select mtm_get_snapshot()");
132+
r = txn.exec("select mtm.get_snapshot()");
133133
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
134134
prevSum = sum;
135135
}

0 commit comments

Comments
 (0)