Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit de99b01

Browse files
knizhnikkelvich
authored and committed
Implement internal heartbeat for multimaster
1 parent 8a26667 commit de99b01

File tree

3 files changed

+147
-52
lines changed

3 files changed

+147
-52
lines changed

arbiter.c

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "utils/array.h"
4545
#include "utils/builtins.h"
4646
#include "utils/memutils.h"
47+
#include "utils/timeout.h"
4748
#include "commands/dbcommands.h"
4849
#include "miscadmin.h"
4950
#include "postmaster/autovacuum.h"
@@ -101,22 +102,23 @@ typedef struct
101102

102103
static int* sockets;
103104
static int gateway;
105+
/*
 * Heartbeat request flag.  Set to true by MtmSendHeartbeat() and cleared by
 * the MtmTransSender() main loop just before it broadcasts a heartbeat.
 * MtmSendHeartbeat() is registered as a timeout handler (RegisterTimeout),
 * so it can run asynchronously with respect to the loop that polls this
 * flag; volatile forces every test of the flag to re-read memory.
 */
static volatile bool send_heartbeat;
104106

105107
static void MtmTransSender(Datum arg);
106108
static void MtmTransReceiver(Datum arg);
107109

108-
/*
109-
* static char const* const messageText[] =
110-
* {
111-
* "INVALID",
112-
* "HANDSHAKE",
113-
* "READY",
114-
* "PREPARE",
115-
* "PREPARED",
116-
* "ABORTED",
117-
* "STATUS"
118-
*};
119-
*/
110+
111+
/*
 * Human-readable names of arbiter message codes, used only for logging
 * (indexed with ts->cmd in MtmAppendBuffer).  The entries are expected to
 * appear in the same order as the members of MtmMessageCode, so a new
 * message code needs a new entry at the matching position.
 */
static const char *const messageText[] = {
    "INVALID",
    "HANDSHAKE",
    "READY",
    "PREPARE",
    "PREPARED",
    "ABORTED",
    "STATUS",
    "HEARTBEAT",
};
120122

121123
static BackgroundWorker MtmSender = {
122124
"mtm-sender",
@@ -513,14 +515,19 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
513515
}
514516
buf->used = 0;
515517
}
516-
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
517-
messageText[ts->cmd], ts->csn, node+1, MtmNodeId, ts->gtid.xid, ts->xid);
518-
519-
Assert(ts->cmd != MSG_INVALID);
520-
buf->data[buf->used].code = ts->cmd;
521518
buf->data[buf->used].dxid = xid;
522-
buf->data[buf->used].sxid = ts->xid;
523-
buf->data[buf->used].csn = ts->csn;
519+
520+
if (ts != NULL) {
521+
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
522+
messageText[ts->cmd], ts->csn, node+1, MtmNodeId, ts->gtid.xid, ts->xid);
523+
Assert(ts->cmd != MSG_INVALID);
524+
buf->data[buf->used].code = ts->cmd;
525+
buf->data[buf->used].sxid = ts->xid;
526+
buf->data[buf->used].csn = ts->csn;
527+
} else {
528+
buf->data[buf->used].code = MSG_HEARTBEAT;
529+
MTM_LOG3("Send HEARTBEAT message to node %d from node %d\n", node+1, MtmNodeId);
530+
}
524531
buf->data[buf->used].node = MtmNodeId;
525532
buf->data[buf->used].disabledNodeMask = Mtm->disabledNodeMask;
526533
buf->data[buf->used].oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
@@ -533,15 +540,21 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
533540
int n = 1;
534541
for (i = 0; i < Mtm->nAllNodes; i++)
535542
{
536-
if (!BIT_CHECK(Mtm->disabledNodeMask, i) && TransactionIdIsValid(ts->xids[i])) {
543+
if (!BIT_CHECK(Mtm->disabledNodeMask, i) && (ts == NULL || TransactionIdIsValid(ts->xids[i]))) {
537544
Assert(i+1 != MtmNodeId);
538-
MtmAppendBuffer(txBuffer, ts->xids[i], i, ts);
545+
MtmAppendBuffer(txBuffer, ts ? ts->xids[i] : InvalidTransactionId, i, ts);
539546
n += 1;
540547
}
541548
}
542549
Assert(n == Mtm->nLiveNodes);
543550
}
544551

552+
static void MtmSendHeartbeat()
553+
{
554+
send_heartbeat = true;
555+
PGSemaphoreUnlock(&Mtm->votingSemaphore);
556+
}
557+
545558

546559
static void MtmTransSender(Datum arg)
547560
{
@@ -556,6 +569,8 @@ static void MtmTransSender(Datum arg)
556569
sigfillset(&sset);
557570
sigprocmask(SIG_UNBLOCK, &sset, NULL);
558571

572+
RegisterTimeout(USER_TIMEOUT, MtmSendHeartbeat);
573+
559574
MtmOpenConnections();
560575

561576
for (i = 0; i < nNodes; i++) {
@@ -567,6 +582,10 @@ static void MtmTransSender(Datum arg)
567582
PGSemaphoreLock(&Mtm->votingSemaphore);
568583
CHECK_FOR_INTERRUPTS();
569584

585+
if (send_heartbeat) {
586+
send_heartbeat = false;
587+
MtmBroadcastMessage(txBuffer, NULL);
588+
}
570589
/*
571590
* Use shared lock to improve locality,
572591
* because all other process modifying this list are using exclusive lock
@@ -700,15 +719,22 @@ static void MtmTransReceiver(Datum arg)
700719

701720
for (j = 0; j < nResponses; j++) {
702721
MtmArbiterMessage* msg = &rxBuffer[i].data[j];
703-
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);
704-
Assert(ts != NULL);
722+
MtmTransState* ts;
723+
705724
Assert(msg->node > 0 && msg->node <= nNodes && msg->node != MtmNodeId);
725+
Mtm->nodes[msg->node-1].oldestSnapshot = msg->oldestSnapshot;
726+
Mtm->nodes[msg->node-1].lastHeartbeat = MtmGetSystemTime();
727+
728+
if (msg->code == MSG_HEARTBEAT) {
729+
continue;
730+
}
731+
ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);
732+
Assert(ts != NULL);
706733

707734
if (BIT_CHECK(msg->disabledNodeMask, MtmNodeId-1) && Mtm->status != MTM_RECOVERY) {
708735
elog(PANIC, "Node %d thinks that I was dead: perform hara-kiri not to be a zombie", msg->node);
709736
}
710-
Mtm->nodes[msg->node-1].oldestSnapshot = msg->oldestSnapshot;
711-
737+
712738
if (MtmIsCoordinator(ts)) {
713739
switch (msg->code) {
714740
case MSG_READY:
@@ -768,7 +794,7 @@ static void MtmTransReceiver(Datum arg)
768794
} else {
769795
switch (msg->code) {
770796
case MSG_PREPARE:
771-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
797+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
772798
ts->status = TRANSACTION_STATUS_UNKNOWN;
773799
ts->csn = MtmAssignCSN();
774800
MtmAdjustSubtransactions(ts);

multimaster.c

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ int MtmReconnectAttempts;
191191
int MtmNodeDisableDelay;
192192
int MtmTransSpillThreshold;
193193
int MtmMaxNodes;
194+
int MtmHeartbeatSendTimeout;
195+
int MtmHeartbeatRecvTimeout;
194196
bool MtmUseRaftable;
195197
bool MtmUseDtm;
196198

@@ -741,6 +743,27 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
741743

742744
}
743745

746+
/*
747+
* Check heartbeats
748+
*/
749+
static void MtmWatchdog()
750+
{
751+
int i, n = Mtm->nAllNodes;
752+
timestamp_t now = MtmGetSystemTime();
753+
for (i = 0; i < n; i++) {
754+
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
755+
if (Mtm->nodes[i].lastHeartbeat != 0
756+
&& now > Mtm->nodes[i].lastHeartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
757+
{
758+
elog(WARNING, "Disable node %d because last heartbeat was received %d msec ago",
759+
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
760+
MtmOnNodeDisconnect(i+1);
761+
}
762+
}
763+
}
764+
}
765+
766+
744767
static void
745768
MtmPostPrepareTransaction(MtmCurrentTrans* x)
746769
{
@@ -770,14 +793,24 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
770793
MtmUnlock();
771794
MtmResetTransaction(x);
772795
} else {
773-
time_t timeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
796+
time_t transTimeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
797+
time_t timeout = transTimeout < MtmHeartbeatRecvTimeout ? transTimeout : MtmHeartbeatRecvTimeout;
798+
timestamp_t deadline = MtmGetSystemTime() + MSEC_TO_USEC(transTimeout);
774799
int result = 0;
775800
int nConfigChanges = Mtm->nConfigChanges;
776801
/* wait votes from all nodes */
777-
while (!ts->votingCompleted && !(result & WL_TIMEOUT)) {
802+
while (!ts->votingCompleted) {
778803
MtmUnlock();
804+
MtmWatchdog();
779805
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, timeout);
780-
ResetLatch(&MyProc->procLatch);
806+
if (result & WL_TIMEOUT) {
807+
if (MtmGetSystemTime() > deadline) {
808+
MtmLock(LW_SHARED);
809+
break;
810+
}
811+
} else {
812+
ResetLatch(&MyProc->procLatch);
813+
}
781814
MtmLock(LW_SHARED);
782815
}
783816
if (!ts->votingCompleted) {
@@ -1022,6 +1055,22 @@ void MtmHandleApplyError(void)
10221055
}
10231056

10241057

1058+
static void MtmDisableNode(int nodeId)
1059+
{
1060+
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1061+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1062+
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1063+
Mtm->nLiveNodes -= 1;
1064+
}
1065+
1066+
static void MtmEnableNode(int nodeId)
1067+
{
1068+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1069+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1070+
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1071+
Mtm->nLiveNodes += 1;
1072+
}
1073+
10251074
void MtmRecoveryCompleted(void)
10261075
{
10271076
MTM_LOG1("Recovery of node %d is completed", MtmNodeId);
@@ -1116,9 +1165,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11161165
MTM_LOG1("%d: node %d is caugth-up without locking cluster", MyProcPid, nodeId);
11171166
/* We are lucky: caugth-up without locking cluster! */
11181167
}
1119-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1120-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1121-
Mtm->nLiveNodes += 1;
1168+
MtmEnableNode(nodeId);
11221169
Mtm->nConfigChanges += 1;
11231170
caughtUp = true;
11241171
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
@@ -1261,17 +1308,13 @@ bool MtmRefreshClusterStatus(bool nowait)
12611308
mask = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
12621309
for (i = 0; mask != 0; i++, mask >>= 1) {
12631310
if (mask & 1) {
1264-
Mtm->nLiveNodes -= 1;
1265-
BIT_SET(Mtm->disabledNodeMask, i);
1266-
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
1311+
MtmDisableNode(i+1);
12671312
}
12681313
}
12691314
mask = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
12701315
for (i = 0; mask != 0; i++, mask >>= 1) {
12711316
if (mask & 1) {
1272-
Mtm->nLiveNodes += 1;
1273-
BIT_CLEAR(Mtm->disabledNodeMask, i);
1274-
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
1317+
MtmEnableNode(i+1);
12751318
}
12761319
}
12771320
MtmCheckQuorum();
@@ -1316,7 +1359,6 @@ void MtmOnNodeDisconnect(int nodeId)
13161359
/* Avoid false detection of node failure and prevent node status blinking */
13171360
return;
13181361
}
1319-
13201362
BIT_SET(Mtm->connectivityMask, nodeId-1);
13211363
BIT_SET(Mtm->reconnectMask, nodeId-1);
13221364
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
@@ -1327,9 +1369,7 @@ void MtmOnNodeDisconnect(int nodeId)
13271369
if (!MtmRefreshClusterStatus(false)) {
13281370
MtmLock(LW_EXCLUSIVE);
13291371
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1330-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1331-
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1332-
Mtm->nLiveNodes -= 1;
1372+
MtmDisableNode(nodeId);
13331373
MtmCheckQuorum();
13341374
/* Interrupt voting for active transaction and abort them */
13351375
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
@@ -1503,6 +1543,7 @@ static void MtmInitialize()
15031543
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
15041544
Mtm->nodes[i].con = MtmConnections[i];
15051545
Mtm->nodes[i].flushPos = 0;
1546+
Mtm->nodes[i].lastHeartbeat = 0;
15061547
}
15071548
PGSemaphoreCreate(&Mtm->votingSemaphore);
15081549
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1627,6 +1668,36 @@ _PG_init(void)
16271668
if (!process_shared_preload_libraries_in_progress)
16281669
return;
16291670

1671+
DefineCustomIntVariable(
1672+
"multimaster.heartbeat_send_timeout",
1673+
"Timeout in milliseconds of sending heartbeat messages",
1674+
"Period of broadcasting heartbeat messages by abiter to all nodes",
1675+
&MtmHeartbeatSendTimeout,
1676+
1000,
1677+
1,
1678+
INT_MAX,
1679+
PGC_BACKEND,
1680+
0,
1681+
NULL,
1682+
NULL,
1683+
NULL
1684+
);
1685+
1686+
DefineCustomIntVariable(
1687+
"multimaster.heartbeat_recv_timeout",
1688+
"Timeout in milliseconds of receiving heartbeat messages",
1689+
"If no heartbeat message is received from node within this period, it assumed to be dead",
1690+
&MtmHeartbeatRecvTimeout,
1691+
2000,
1692+
1,
1693+
INT_MAX,
1694+
PGC_BACKEND,
1695+
0,
1696+
NULL,
1697+
NULL,
1698+
NULL
1699+
);
1700+
16301701
DefineCustomIntVariable(
16311702
"multimaster.gc_period",
16321703
"Number of distributed transactions after which garbage collection is started",
@@ -2056,9 +2127,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20562127
{
20572128
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nLiveNodes);
20582129
}
2059-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
2060-
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
2061-
Mtm->nLiveNodes -= 1;
2130+
MtmDisableNode(nodeId);
20622131
MtmCheckQuorum();
20632132
if (!MtmIsBroadcast())
20642133
{
@@ -2110,17 +2179,13 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
21102179
if (MtmIsRecoverySession) {
21112180
MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
21122181
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
2113-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
2114-
BIT_SET(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
2115-
Mtm->nLiveNodes -= 1;
2182+
MtmDisableNode(MtmReplicationNodeId);
21162183
MtmCheckQuorum();
21172184
}
21182185
} else if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
21192186
if (recoveryCompleted) {
21202187
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
2121-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
2122-
BIT_CLEAR(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
2123-
Mtm->nLiveNodes += 1;
2188+
MtmEnableNode(MtmReplicationNodeId);
21242189
MtmCheckQuorum();
21252190
} else {
21262191
elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);

multimaster.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ typedef enum
9292
MSG_PREPARE,
9393
MSG_PREPARED,
9494
MSG_ABORTED,
95-
MSG_STATUS
95+
MSG_STATUS,
96+
MSG_HEARTBEAT
9697
} MtmMessageCode;
9798

9899
typedef enum
@@ -127,6 +128,7 @@ typedef struct
127128
timestamp_t lastStatusChangeTime;
128129
timestamp_t receiverStartTime;
129130
timestamp_t senderStartTime;
131+
timestamp_t lastHeartbeat;
130132
int senderPid;
131133
int receiverPid;
132134
XLogRecPtr flushPos;
@@ -218,6 +220,8 @@ extern int MtmReconnectAttempts;
218220
extern int MtmKeepaliveTimeout;
219221
extern int MtmNodeDisableDelay;
220222
extern int MtmTransSpillThreshold;
223+
extern int MtmHeartbeatSendTimeout;
224+
extern int MtmHeartbeatRecvTimeout;
221225
extern bool MtmUseDtm;
222226
extern HTAB* MtmXid2State;
223227

0 commit comments

Comments
 (0)