Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fa0ee84

Browse files
committed
Move reconnectMask mask modifications to state.c; disable MTM_RECOVERY_FINISH1 event
1 parent 526109f commit fa0ee84

File tree

5 files changed

+109
-132
lines changed

5 files changed

+109
-132
lines changed

contrib/mmts/multimaster.c

Lines changed: 17 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,7 +1146,7 @@ bool MtmWatchdog(timestamp_t now)
11461146
{
11471147
MTM_ELOG(WARNING, "Heartbeat is not received from node %d during %d msec",
11481148
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
1149-
MtmOnNodeDisconnect(i+1);
1149+
MtmStateProcessNeighborEvent(i+1, MTM_NEIGHBOR_HEARTBEAT_TIMEOUT);
11501150
allAlive = false;
11511151
}
11521152
}
@@ -1757,16 +1757,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot, nodemas
17571757
} else {
17581758
globalSnapshot = MtmTx.snapshot;
17591759
}
1760-
if (!TransactionIdIsValid(gtid->xid) && Mtm->status != MTM_RECOVERY)
1761-
{
1762-
/* In case of recovery InvalidTransactionId is passed */
1763-
MtmStateProcessEvent(MTM_RECOVERY_START1);
1764-
}
1765-
else if (Mtm->status == MTM_RECOVERY)
1766-
{
1767-
/* When recovery is completed we get normal transaction ID and switch to normal mode */
1768-
MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1769-
}
1760+
1761+
1762+
// if (!TransactionIdIsValid(gtid->xid) && Mtm->status != MTM_RECOVERY)
1763+
// {
1764+
// /* In case of recovery InvalidTransactionId is passed */
1765+
// MtmStateProcessEvent(MTM_RECOVERY_START1);
1766+
// }
1767+
// else if (Mtm->status == MTM_RECOVERY)
1768+
// {
1769+
// /* When recovery is completed we get normal transaction ID and switch to normal mode */
1770+
// MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1771+
// }
17701772
}
17711773

17721774
void MtmSetCurrentTransactionGID(char const* gid)
@@ -2073,7 +2075,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20732075
MtmLock(LW_EXCLUSIVE);
20742076
if (MtmIsRecoveredNode(nodeId) && Mtm->nActiveTransactions == 0) {
20752077
if (BIT_CHECK(Mtm->originLockNodeMask, nodeId-1)) {
2076-
MtmStateProcessNeighborEvent(nodeId-1, MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
2078+
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20772079
} else {
20782080
MTM_LOG1("Node %d is caught-up at WAL position %llx without locking cluster", nodeId, walEndPtr);
20792081
/* We are lucky: caught-up without locking cluster! */
@@ -2274,7 +2276,6 @@ void MtmRefreshClusterStatus()
22742276
disabled = ~newClique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
22752277

22762278
if (disabled) {
2277-
// timestamp_t now = MtmGetSystemTime();
22782279
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
22792280
if (mask & 1) {
22802281
if ( i+1 == MtmNodeId )
@@ -2295,58 +2296,6 @@ void MtmRefreshClusterStatus()
22952296
}
22962297
}
22972298

2298-
2299-
/*
2300-
* This function is called in case of non-recoverable connection failure with this node.
2301-
* Non-recoverable means that the connection cannot be reestablished within the specified number of attempts.
2302-
* It sets the bit in the connectivity mask and registers a delayed refresh of cluster status, which builds the connectivity matrix
2303-
* and determines the clique of connected nodes. The timeout here is needed to allow all nodes to exchange their connectivity masks (they
2304-
* are sent together with any arbiter message, including heartbeats).
2305-
*/
2306-
void MtmOnNodeDisconnect(int nodeId)
2307-
{
2308-
timestamp_t now = MtmGetSystemTime();
2309-
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
2310-
{
2311-
/* Node is already disabled */
2312-
return;
2313-
}
2314-
if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) > now)
2315-
{
2316-
/* Avoid false detection of node failure and prevent node status blinking */
2317-
return;
2318-
}
2319-
MtmLock(LW_EXCLUSIVE);
2320-
BIT_SET(SELF_CONNECTIVITY_MASK, nodeId-1);
2321-
BIT_SET(Mtm->reconnectMask, nodeId-1);
2322-
MTM_ELOG(LOG, "Disconnect node %d connectivity mask %llx",
2323-
nodeId, SELF_CONNECTIVITY_MASK);
2324-
MtmUnlock();
2325-
}
2326-
2327-
/*
2328-
* This method is called when connection with node is reestablished
2329-
*/
2330-
void MtmOnNodeConnect(int nodeId)
2331-
{
2332-
MtmLock(LW_EXCLUSIVE);
2333-
MTM_ELOG(LOG, "Connect node %d connectivity mask %llx", nodeId, SELF_CONNECTIVITY_MASK);
2334-
BIT_CLEAR(SELF_CONNECTIVITY_MASK, nodeId-1);
2335-
BIT_SET(Mtm->reconnectMask, nodeId-1); /* force sender to reestablish connection and send heartbeat */
2336-
MtmUnlock();
2337-
}
2338-
2339-
/*
2340-
* Set reconnect mask to force reconnection attempt to the node
2341-
*/
2342-
void MtmReconnectNode(int nodeId)
2343-
{
2344-
MtmLock(LW_EXCLUSIVE);
2345-
MTM_ELOG(LOG, "Reconnect node %d connectivity mask %llx", nodeId, SELF_CONNECTIVITY_MASK);
2346-
BIT_SET(Mtm->reconnectMask, nodeId-1);
2347-
MtmUnlock();
2348-
}
2349-
23502299
/*
23512300
* -------------------------------------------
23522301
* Node initialization
@@ -3659,11 +3608,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
36593608
} else {
36603609
MTM_LOG1("Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
36613610
}
3662-
if (!BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
3663-
MTM_ELOG(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
3664-
BIT_SET(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
3665-
MtmStateProcessEvent(MTM_WAL_SENDER_START);
3666-
}
3611+
3612+
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START);
3613+
36673614
BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1); /* arbiter should try to reestablish connection with this node */
36683615
MtmUnlock();
36693616
on_shmem_exit(MtmOnProcExit, 0);

contrib/mmts/multimaster.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,11 +401,8 @@ extern void MtmLockNode(int nodeId, LWLockMode mode);
401401
extern bool MtmTryLockNode(int nodeId, LWLockMode mode);
402402
extern void MtmUnlockNode(int nodeId);
403403
extern void MtmStopNode(int nodeId, bool dropSlot);
404-
extern void MtmReconnectNode(int nodeId);
405404
extern void MtmRecoverNode(int nodeId);
406405
extern void MtmResumeNode(int nodeId);
407-
extern void MtmOnNodeDisconnect(int nodeId);
408-
extern void MtmOnNodeConnect(int nodeId);
409406
extern void MtmWakeUpBackend(MtmTransState* ts);
410407
extern void MtmSleep(timestamp_t interval);
411408
extern void MtmAbortTransaction(MtmTransState* ts);

contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ pglogical_receiver_main(Datum main_arg)
368368
PQclear(res);
369369
resetPQExpBuffer(query);
370370

371-
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_RECEIVER_START);
371+
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_WAL_RECEIVER_START);
372372

373373
while (!got_sigterm)
374374
{

contrib/mmts/state.c

Lines changed: 84 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
char const* const MtmNeighborEventMnem[] =
77
{
88
"MTM_NEIGHBOR_CLIQUE_DISABLE",
9-
"MTM_NEIGHBOR_RECEIVER_START",
9+
"MTM_NEIGHBOR_WAL_RECEIVER_START",
10+
"MTM_NEIGHBOR_WAL_SENDER_START",
11+
"MTM_NEIGHBOR_HEARTBEAT_TIMEOUT",
1012
"MTM_NEIGHBOR_RECOVERY_CAUGHTUP"
1113
};
1214

@@ -23,9 +25,6 @@ char const* const MtmEventMnem[] =
2325
"MTM_RECOVERY_FINISH1",
2426
"MTM_RECOVERY_FINISH2",
2527

26-
"MTM_WAL_RECEIVER_START",
27-
"MTM_WAL_SENDER_START",
28-
2928
"MTM_NONRECOVERABLE_ERROR"
3029
};
3130

@@ -37,39 +36,61 @@ static void MtmSwitchClusterMode(MtmNodeStatus mode);
3736
void
3837
MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev)
3938
{
40-
MTM_LOG1("[STATE] Processing %s", MtmNeighborEventMnem[ev]);
39+
MTM_LOG1("[STATE] Processing %s for node %u", MtmNeighborEventMnem[ev], node_id);
4140

4241
switch(ev)
4342
{
4443
case MTM_NEIGHBOR_CLIQUE_DISABLE:
4544
MtmDisableNode(node_id);
4645
break;
4746

48-
case MTM_NEIGHBOR_RECEIVER_START:
49-
/*
50-
* This function is called by the pglogical receiver main function when the receiver background worker is started.
51-
* We switch to ONLINE mode when all receivers are connected.
52-
* Because the background worker can be restarted multiple times, use a node bitmask.
53-
*/
47+
case MTM_NEIGHBOR_HEARTBEAT_TIMEOUT:
48+
MtmLock(LW_EXCLUSIVE);
49+
BIT_SET(SELF_CONNECTIVITY_MASK, node_id - 1);
50+
BIT_SET(Mtm->reconnectMask, node_id - 1);
51+
MtmUnlock();
52+
break;
53+
54+
case MTM_NEIGHBOR_WAL_RECEIVER_START:
5455
MtmLock(LW_EXCLUSIVE);
55-
if (!BIT_CHECK(Mtm->pglogicalReceiverMask, node_id)) {
56-
BIT_SET(Mtm->pglogicalReceiverMask, node_id);
57-
if (BIT_CHECK(Mtm->disabledNodeMask, node_id)) {
56+
if (!BIT_CHECK(Mtm->pglogicalReceiverMask, node_id - 1)) {
57+
BIT_SET(Mtm->pglogicalReceiverMask, node_id - 1);
58+
if (BIT_CHECK(Mtm->disabledNodeMask, node_id - 1)) {
5859
MtmEnableNode(node_id);
5960
}
60-
MtmStateProcessEvent(MTM_WAL_RECEIVER_START);
61+
62+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1
63+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
64+
{
65+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
66+
MtmSwitchClusterMode(MTM_ONLINE);
67+
}
6168
}
6269
MtmUnlock();
6370
break;
6471

72+
case MTM_NEIGHBOR_WAL_SENDER_START:
73+
if (!BIT_CHECK(Mtm->pglogicalSenderMask, node_id - 1)) {
74+
BIT_SET(Mtm->pglogicalSenderMask, node_id - 1);
75+
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1
76+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
77+
{
78+
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
79+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
80+
MtmSwitchClusterMode(MTM_ONLINE);
81+
}
82+
}
83+
break;
84+
6585
case MTM_NEIGHBOR_RECOVERY_CAUGHTUP:
66-
Assert(BIT_CHECK(Mtm->disabledNodeMask, node_id));
67-
BIT_CLEAR(Mtm->originLockNodeMask, node_id);
68-
BIT_CLEAR(Mtm->disabledNodeMask, node_id);
69-
BIT_SET(Mtm->recoveredNodeMask, node_id);
86+
Assert(BIT_CHECK(Mtm->disabledNodeMask, node_id - 1));
87+
BIT_CLEAR(Mtm->originLockNodeMask, node_id - 1);
88+
BIT_CLEAR(Mtm->disabledNodeMask, node_id - 1);
89+
BIT_SET(Mtm->recoveredNodeMask, node_id - 1);
7090
Mtm->nLiveNodes += 1;
7191
MtmCheckQuorum();
7292
break;
93+
7394
}
7495
}
7596

@@ -114,7 +135,7 @@ MtmStateProcessEvent(MtmEvent ev)
114135
if (Mtm->nLiveNodes < Mtm->nAllNodes/2+1)
115136
{
116137
/* no quorum */
117-
MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
138+
// MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
118139
MtmSwitchClusterMode(MTM_IN_MINORITY);
119140
}
120141
else if (Mtm->status == MTM_INITIALIZATION)
@@ -185,28 +206,6 @@ MtmStateProcessEvent(MtmEvent ev)
185206
break;
186207

187208

188-
case MTM_WAL_RECEIVER_START:
189-
MTM_ELOG(LOG, "[STATE] Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
190-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1
191-
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
192-
{
193-
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
194-
MtmSwitchClusterMode(MTM_ONLINE);
195-
}
196-
break;
197-
198-
199-
case MTM_WAL_SENDER_START:
200-
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1
201-
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
202-
{
203-
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
204-
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
205-
MtmSwitchClusterMode(MTM_ONLINE);
206-
}
207-
break;
208-
209-
210209
case MTM_NONRECOVERABLE_ERROR:
211210
// MTM_ELOG(WARNING, "Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u",
212211
// edata->sqlerrcode, edata->message, getpid());
@@ -234,13 +233,10 @@ MtmSwitchClusterMode(MtmNodeStatus mode)
234233
*/
235234
void MtmDisableNode(int nodeId)
236235
{
237-
timestamp_t now = MtmGetSystemTime();
238-
MTM_ELOG(WARNING, "Disable node %d at xlog position %llx, last status change time %d msec ago", nodeId, (long64)GetXLogInsertRecPtr(),
239-
(int)USEC_TO_MSEC(now - Mtm->nodes[nodeId-1].lastStatusChangeTime));
240236
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
241237
Mtm->nConfigChanges += 1;
242238
Mtm->nodes[nodeId-1].timeline += 1;
243-
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
239+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
244240
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
245241
if (nodeId != MtmNodeId) {
246242
Mtm->nLiveNodes -= 1;
@@ -269,7 +265,6 @@ void MtmEnableNode(int nodeId)
269265
if (nodeId != MtmNodeId) {
270266
Mtm->nLiveNodes += 1;
271267
}
272-
// MTM_ELOG(WARNING, "Enable node %d at xlog position %llx", nodeId, (long64)GetXLogInsertRecPtr());
273268
}
274269
MtmCheckQuorum();
275270
}
@@ -282,15 +277,49 @@ static void MtmCheckQuorum(void)
282277
{
283278
int nVotingNodes = MtmGetNumberOfVotingNodes();
284279

285-
if (Mtm->nLiveNodes >= nVotingNodes/2+1 || (Mtm->nLiveNodes == (nVotingNodes+1)/2 && MtmMajorNode)) { /* have quorum */
286-
if (Mtm->status == MTM_IN_MINORITY) {
287-
// MTM_LOG1("Node is in majority: disabled mask %llx", Mtm->disabledNodeMask);
280+
if (Mtm->nLiveNodes >= nVotingNodes/2+1 || (Mtm->nLiveNodes == (nVotingNodes+1)/2 && MtmMajorNode))
281+
{
282+
if (Mtm->status == MTM_IN_MINORITY)
288283
MtmSwitchClusterMode(MTM_ONLINE);
289-
}
290-
} else {
291-
if (Mtm->status == MTM_ONLINE) { /* out of quorum */
292-
// MTM_ELOG(WARNING, "Node is in minority: disabled mask %llx", Mtm->disabledNodeMask);
284+
}
285+
else
286+
{
287+
if (Mtm->status == MTM_ONLINE)
293288
MtmSwitchClusterMode(MTM_IN_MINORITY);
294-
}
295289
}
296290
}
291+
292+
void MtmOnNodeDisconnect(int nodeId)
293+
{
294+
if (BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId-1))
295+
return;
296+
297+
MTM_LOG1("[STATE] NodeDisconnect for node %u", nodeId);
298+
299+
MtmLock(LW_EXCLUSIVE);
300+
BIT_SET(SELF_CONNECTIVITY_MASK, nodeId-1);
301+
BIT_SET(Mtm->reconnectMask, nodeId-1);
302+
MtmUnlock();
303+
}
304+
305+
void MtmOnNodeConnect(int nodeId)
306+
{
307+
if (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId-1))
308+
return;
309+
310+
MTM_LOG1("[STATE] NodeConnect for node %u", nodeId);
311+
312+
MtmLock(LW_EXCLUSIVE);
313+
BIT_CLEAR(SELF_CONNECTIVITY_MASK, nodeId-1);
314+
BIT_SET(Mtm->reconnectMask, nodeId-1);
315+
MtmUnlock();
316+
}
317+
318+
void MtmReconnectNode(int nodeId)
319+
{
320+
MTM_LOG1("[STATE] ReconnectNode for node %u", nodeId);
321+
MtmLock(LW_EXCLUSIVE);
322+
BIT_SET(Mtm->reconnectMask, nodeId-1);
323+
MtmUnlock();
324+
}
325+

contrib/mmts/state.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
typedef enum
33
{
44
MTM_NEIGHBOR_CLIQUE_DISABLE,
5-
MTM_NEIGHBOR_RECEIVER_START,
5+
MTM_NEIGHBOR_WAL_RECEIVER_START,
6+
MTM_NEIGHBOR_WAL_SENDER_START,
7+
MTM_NEIGHBOR_HEARTBEAT_TIMEOUT,
68
MTM_NEIGHBOR_RECOVERY_CAUGHTUP
79
} MtmNeighborEvent;
810

@@ -16,8 +18,6 @@ typedef enum
1618
MTM_RECOVERY_START2,
1719
MTM_RECOVERY_FINISH1,
1820
MTM_RECOVERY_FINISH2,
19-
MTM_WAL_RECEIVER_START,
20-
MTM_WAL_SENDER_START,
2121
MTM_NONRECOVERABLE_ERROR
2222
} MtmEvent;
2323

@@ -26,3 +26,7 @@ extern void MtmStateProcessEvent(MtmEvent ev);
2626
extern void MtmDisableNode(int nodeId);
2727
extern void MtmEnableNode(int nodeId);
2828

29+
extern void MtmOnNodeDisconnect(int nodeId);
30+
extern void MtmOnNodeConnect(int nodeId);
31+
extern void MtmReconnectNode(int nodeId);
32+

0 commit comments

Comments
 (0)