Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5740217

Browse files
committed
Force reconnection of arbiter after recovery completion
1 parent f42f375 commit 5740217

File tree

2 files changed, with 21 additions and 14 deletions

contrib/mmts/arbiter.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -350,14 +350,19 @@ static void MtmSendHeartbeat()
350350

351351
for (i = 0; i < Mtm->nAllNodes; i++)
352352
{
353-
if (i+1 != MtmNodeId && !BIT_CHECK(busy_mask, i)
354-
&& (Mtm->status != MTM_ONLINE
355-
|| (sockets[i] >= 0 && !BIT_CHECK(Mtm->disabledNodeMask, i) && !BIT_CHECK(Mtm->reconnectMask, i))))
356-
{
357-
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
358-
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
359-
} else {
360-
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
353+
if (i+1 != MtmNodeId) {
354+
if (!BIT_CHECK(busy_mask, i)
355+
&& (Mtm->status != MTM_ONLINE
356+
|| (sockets[i] >= 0 && !BIT_CHECK(Mtm->disabledNodeMask, i))
357+
|| BIT_CHECK(Mtm->reconnectMask, i)))
358+
{
359+
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
360+
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
361+
} else {
362+
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
363+
}
364+
} else {
365+
MTM_LOG1("Do not send hearbeat to node %d, busy mask %ld, status %d", i+1, busy_mask, Mtm->status);
361366
}
362367
}
363368
}

contrib/mmts/multimaster.c

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,7 @@ void MtmRecoveryCompleted(void)
13491349
MtmLock(LW_EXCLUSIVE);
13501350
Mtm->recoverySlot = 0;
13511351
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
1352+
Mtm->reconnectMask |= Mtm->connectivityMask; /* try to reestablish all connections */
13521353
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
13531354
for (i = 0; i < Mtm->nAllNodes; i++) {
13541355
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
@@ -1469,6 +1470,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
14691470
void MtmSwitchClusterMode(MtmNodeStatus mode)
14701471
{
14711472
Mtm->status = mode;
1473+
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
14721474
MTM_LOG1("Switch to %s mode", MtmNodeStatusMnem[mode]);
14731475
/* ??? Something else to do here? */
14741476
}
@@ -1603,11 +1605,10 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16031605
if (disabled) {
16041606
timestamp_t now = MtmGetSystemTime();
16051607
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
1606-
if (i+1 != MtmNodeId
1607-
&& (mask & 1) != 0
1608-
&& Mtm->nodes[i].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < now)
1609-
{
1610-
MtmDisableNode(i+1);
1608+
if (mask & 1) {
1609+
if (Mtm->nodes[i].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < now) {
1610+
MtmDisableNode(i+1);
1611+
}
16111612
}
16121613
}
16131614
}
@@ -1682,6 +1683,7 @@ void MtmOnNodeDisconnect(int nodeId)
16821683
MtmLock(LW_EXCLUSIVE);
16831684
BIT_SET(Mtm->connectivityMask, nodeId-1);
16841685
BIT_SET(Mtm->reconnectMask, nodeId-1);
1686+
MTM_LOG1("Disconnect node %d connectivity mask %lx", nodeId, Mtm->connectivityMask);
16851687
MtmUnlock();
16861688

16871689
if (!RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false))
@@ -1726,7 +1728,7 @@ void MtmOnNodeConnect(int nodeId)
17261728
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
17271729
MtmUnlock();
17281730

1729-
MTM_LOG1("Reconnect node %d", nodeId);
1731+
MTM_LOG1("Reconnect node %d, connectivityMask=%lx", nodeId, Mtm->connectivityMask);
17301732
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
17311733
}
17321734

0 commit comments

Comments
 (0)