Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f947553

Browse files
knizhnikkelvich
authored and committed
Handle heartbeat timeouts in arbiter receiver
1 parent 54cbf4c commit f947553

File tree

3 files changed

+26
-24
lines changed

3 files changed

+26
-24
lines changed

arbiter.c

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -675,6 +675,8 @@ static void MtmTransReceiver(Datum arg)
675675
int nResponses;
676676
int i, j, n, rc;
677677
MtmBuffer* rxBuffer = (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
678+
timestamp_t lastHeartbeatCheck = MtmGetSystemTime();
679+
timestamp_t now;
678680

679681
#if USE_EPOLL
680682
struct epoll_event* events = (struct epoll_event*)palloc(sizeof(struct epoll_event)*nNodes);
@@ -698,7 +700,7 @@ static void MtmTransReceiver(Datum arg)
698700

699701
while (!stop) {
700702
#if USE_EPOLL
701-
n = epoll_wait(epollfd, events, nNodes, USEC_TO_MSEC(MtmKeepaliveTimeout));
703+
n = epoll_wait(epollfd, events, nNodes, MtmHeartbeatRecvTimeout);
702704
if (n < 0) {
703705
if (errno == EINTR) {
704706
continue;
@@ -717,8 +719,8 @@ static void MtmTransReceiver(Datum arg)
717719
do {
718720
struct timeval tv;
719721
events = inset;
720-
tv.tv_sec = MtmKeepaliveTimeout/USECS_PER_SEC;
721-
tv.tv_usec = MtmKeepaliveTimeout%USECS_PER_SEC;
722+
tv.tv_sec = MtmHeartbeatRecvTimeout/1000;
723+
tv.tv_usec = MtmHeartbeatRecvTimeout%1000*1000;
722724
do {
723725
n = select(max_fd+1, &events, NULL, NULL, &tv);
724726
} while (n < 0 && errno == EINTR);
@@ -855,8 +857,13 @@ static void MtmTransReceiver(Datum arg)
855857
}
856858
}
857859
}
860+
now = MtmGetSystemTime();
861+
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
862+
MtmWatchdog();
863+
lastHeartbeatCheck = now;
864+
}
858865
if (n == 0 && Mtm->disabledNodeMask != 0) {
859-
/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
866+
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
860867
MtmRefreshClusterStatus(false);
861868
}
862869
}

multimaster.c

Lines changed: 11 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -186,7 +186,7 @@ int MtmReplicationNodeId;
186186
int MtmArbiterPort;
187187
int MtmConnectAttempts;
188188
int MtmConnectTimeout;
189-
int MtmKeepaliveTimeout;
189+
int MtmRaftPollDelay;
190190
int MtmReconnectAttempts;
191191
int MtmNodeDisableDelay;
192192
int MtmTransSpillThreshold;
@@ -746,7 +746,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
746746
/*
747747
* Check heartbeats
748748
*/
749-
static void MtmWatchdog()
749+
void MtmWatchdog(void)
750750
{
751751
int i, n = Mtm->nAllNodes;
752752
timestamp_t now = MtmGetSystemTime();
@@ -794,33 +794,27 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
794794
MtmResetTransaction(x);
795795
} else {
796796
time_t transTimeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
797-
time_t timeout = Min(transTimeout, MtmHeartbeatRecvTimeout);
798-
timestamp_t deadline = MtmGetSystemTime() + MSEC_TO_USEC(transTimeout);
799797
int result = 0;
800798
int nConfigChanges = Mtm->nConfigChanges;
801799
/* wait votes from all nodes */
802-
while (!ts->votingCompleted) {
800+
while (!ts->votingCompleted && !(result & WL_TIMEOUT))
801+
{
803802
MtmUnlock();
804-
//MtmWatchdog();
803+
MtmWatchdog();
805804
if (ts->status == TRANSACTION_STATUS_ABORTED) {
806805
elog(WARNING, "Transaction %d(%s) is aborted by watchdog", x->xid, x->gid);
807806
x->status = TRANSACTION_STATUS_ABORTED;
808807
return;
809808
}
810-
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, timeout);
809+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, transTimeout);
811810
if (result & WL_LATCH_SET) {
812811
ResetLatch(&MyProc->procLatch);
813-
} else if (result & WL_TIMEOUT) {
814-
if (MtmGetSystemTime() > deadline) {
815-
MtmLock(LW_SHARED);
816-
break;
817-
}
818812
}
819813
MtmLock(LW_SHARED);
820814
}
821815
if (!ts->votingCompleted) {
822816
ts->status = TRANSACTION_STATUS_ABORTED;
823-
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
817+
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
824818
} else if (nConfigChanges != Mtm->nConfigChanges) {
825819
ts->status = TRANSACTION_STATUS_ABORTED;
826820
elog(WARNING, "Transaction is aborted because cluster configuration is changed during commit");
@@ -1368,8 +1362,7 @@ void MtmOnNodeDisconnect(int nodeId)
13681362
BIT_SET(Mtm->reconnectMask, nodeId-1);
13691363
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
13701364

1371-
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1372-
MtmSleep(MtmKeepaliveTimeout);
1365+
MtmSleep(MtmRaftPollDelay);
13731366

13741367
if (!MtmRefreshClusterStatus(false)) {
13751368
MtmLock(LW_EXCLUSIVE);
@@ -1969,10 +1962,10 @@ _PG_init(void)
19691962
);
19701963

19711964
DefineCustomIntVariable(
1972-
"multimaster.keepalive_timeout",
1973-
"Multimaster keepalive interval for sockets",
1965+
"multimaster.raft_poll_delay",
1966+
"Multimaster delay of polling cluster state from Raftable after updating local node status",
19741967
"Timeout in microseconds before polling state of nodes",
1975-
&MtmKeepaliveTimeout,
1968+
&MtmRaftPollDelay,
19761969
1000000,
19771970
1,
19781971
INT_MAX,

multimaster.h

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -217,7 +217,7 @@ extern char* MtmDatabaseName;
217217
extern int MtmConnectAttempts;
218218
extern int MtmConnectTimeout;
219219
extern int MtmReconnectAttempts;
220-
extern int MtmKeepaliveTimeout;
220+
extern int MtmRaftPollDelay;
221221
extern int MtmNodeDisableDelay;
222222
extern int MtmTransSpillThreshold;
223223
extern int MtmHeartbeatSendTimeout;
@@ -266,6 +266,8 @@ extern void MtmRecoveryCompleted(void);
266266
extern void MtmMakeTableLocal(char* schema, char* name);
267267
extern void MtmHandleApplyError(void);
268268
extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
269-
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
269+
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
270+
extern void MtmWatchdog(void);
271+
270272

271273
#endif

0 commit comments

Comments
 (0)