Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2d69b46

Browse files
committed
Handle heartbeat timeouts in arbiter receiver
1 parent 1c24395 commit 2d69b46

File tree

4 files changed

+27
-25
lines changed

4 files changed

+27
-25
lines changed

contrib/mmts/arbiter.c

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,8 @@ static void MtmTransReceiver(Datum arg)
675675
int nResponses;
676676
int i, j, n, rc;
677677
MtmBuffer* rxBuffer = (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
678+
timestamp_t lastHeartbeatCheck = MtmGetSystemTime();
679+
timestamp_t now;
678680

679681
#if USE_EPOLL
680682
struct epoll_event* events = (struct epoll_event*)palloc(sizeof(struct epoll_event)*nNodes);
@@ -698,7 +700,7 @@ static void MtmTransReceiver(Datum arg)
698700

699701
while (!stop) {
700702
#if USE_EPOLL
701-
n = epoll_wait(epollfd, events, nNodes, USEC_TO_MSEC(MtmKeepaliveTimeout));
703+
n = epoll_wait(epollfd, events, nNodes, MtmHeartbeatRecvTimeout);
702704
if (n < 0) {
703705
if (errno == EINTR) {
704706
continue;
@@ -717,8 +719,8 @@ static void MtmTransReceiver(Datum arg)
717719
do {
718720
struct timeval tv;
719721
events = inset;
720-
tv.tv_sec = MtmKeepaliveTimeout/USECS_PER_SEC;
721-
tv.tv_usec = MtmKeepaliveTimeout%USECS_PER_SEC;
722+
tv.tv_sec = MtmHeartbeatRecvTimeout/1000;
723+
tv.tv_usec = MtmHeartbeatRecvTimeout%1000*1000;
722724
do {
723725
n = select(max_fd+1, &events, NULL, NULL, &tv);
724726
} while (n < 0 && errno == EINTR);
@@ -855,8 +857,13 @@ static void MtmTransReceiver(Datum arg)
855857
}
856858
}
857859
}
860+
now = MtmGetSystemTime();
861+
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
862+
MtmWatchdog();
863+
lastHeartbeatCheck = now;
864+
}
858865
if (n == 0 && Mtm->disabledNodeMask != 0) {
859-
/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
866+
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
860867
MtmRefreshClusterStatus(false);
861868
}
862869
}

contrib/mmts/multimaster.c

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ int MtmReplicationNodeId;
187187
int MtmArbiterPort;
188188
int MtmConnectAttempts;
189189
int MtmConnectTimeout;
190-
int MtmKeepaliveTimeout;
190+
int MtmRaftPollDelay;
191191
int MtmReconnectAttempts;
192192
int MtmNodeDisableDelay;
193193
int MtmTransSpillThreshold;
@@ -747,7 +747,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
747747
/*
748748
* Check heartbeats
749749
*/
750-
static void MtmWatchdog()
750+
void MtmWatchdog(void)
751751
{
752752
int i, n = Mtm->nAllNodes;
753753
timestamp_t now = MtmGetSystemTime();
@@ -795,33 +795,27 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
795795
MtmResetTransaction(x);
796796
} else {
797797
time_t transTimeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
798-
time_t timeout = Min(transTimeout, MtmHeartbeatRecvTimeout);
799-
timestamp_t deadline = MtmGetSystemTime() + MSEC_TO_USEC(transTimeout);
800798
int result = 0;
801799
int nConfigChanges = Mtm->nConfigChanges;
802800
/* wait votes from all nodes */
803-
while (!ts->votingCompleted) {
801+
while (!ts->votingCompleted && !(result & WL_TIMEOUT))
802+
{
804803
MtmUnlock();
805-
//MtmWatchdog();
804+
MtmWatchdog();
806805
if (ts->status == TRANSACTION_STATUS_ABORTED) {
807806
elog(WARNING, "Transaction %d(%s) is aborted by watchdog", x->xid, x->gid);
808807
x->status = TRANSACTION_STATUS_ABORTED;
809808
return;
810809
}
811-
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, timeout);
810+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, transTimeout);
812811
if (result & WL_LATCH_SET) {
813812
ResetLatch(&MyProc->procLatch);
814-
} else if (result & WL_TIMEOUT) {
815-
if (MtmGetSystemTime() > deadline) {
816-
MtmLock(LW_SHARED);
817-
break;
818-
}
819813
}
820814
MtmLock(LW_SHARED);
821815
}
822816
if (!ts->votingCompleted) {
823817
ts->status = TRANSACTION_STATUS_ABORTED;
824-
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
818+
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
825819
} else if (nConfigChanges != Mtm->nConfigChanges) {
826820
ts->status = TRANSACTION_STATUS_ABORTED;
827821
elog(WARNING, "Transaction is aborted because cluster configuration is changed during commit");
@@ -1369,8 +1363,7 @@ void MtmOnNodeDisconnect(int nodeId)
13691363
BIT_SET(Mtm->reconnectMask, nodeId-1);
13701364
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
13711365

1372-
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1373-
MtmSleep(MtmKeepaliveTimeout);
1366+
MtmSleep(MtmRaftPollDelay);
13741367

13751368
if (!MtmRefreshClusterStatus(false)) {
13761369
MtmLock(LW_EXCLUSIVE);
@@ -1970,10 +1963,10 @@ _PG_init(void)
19701963
);
19711964

19721965
DefineCustomIntVariable(
1973-
"multimaster.keepalive_timeout",
1974-
"Multimaster keepalive interval for sockets",
1966+
"multimaster.raft_poll_delay",
1967+
"Multimaster delay of polling cluster state from Raftable after updating local node status",
19751968
"Timeout in microseconds before polling state of nodes",
1976-
&MtmKeepaliveTimeout,
1969+
&MtmRaftPollDelay,
19771970
1000000,
19781971
1,
19791972
INT_MAX,

contrib/mmts/multimaster.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ extern char* MtmDatabaseName;
217217
extern int MtmConnectAttempts;
218218
extern int MtmConnectTimeout;
219219
extern int MtmReconnectAttempts;
220-
extern int MtmKeepaliveTimeout;
220+
extern int MtmRaftPollDelay;
221221
extern int MtmNodeDisableDelay;
222222
extern int MtmTransSpillThreshold;
223223
extern int MtmHeartbeatSendTimeout;
@@ -266,6 +266,8 @@ extern void MtmRecoveryCompleted(void);
266266
extern void MtmMakeTableLocal(char* schema, char* name);
267267
extern void MtmHandleApplyError(void);
268268
extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
269-
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
269+
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
270+
extern void MtmWatchdog(void);
271+
270272

271273
#endif

contrib/raftable/state.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include "blockmem.h"
77
#include "state.h"
88

9-
#define RAFTABLE_BLOCK_MEM (1024 * 1024)
9+
#define RAFTABLE_BLOCK_MEM (8*1024 * 1024)
1010
#define RAFTABLE_HASH_SIZE (127)
1111

1212
typedef struct State {

0 commit comments

Comments
 (0)