Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 67de224

Browse files
committed
reconnectMask maks modifications to state.c; disable MTM_RECOVERY_FINISH1 event
1 parent a310215 commit 67de224

File tree

5 files changed

+109
-132
lines changed

5 files changed

+109
-132
lines changed

multimaster.c

Lines changed: 17 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,7 +1147,7 @@ bool MtmWatchdog(timestamp_t now)
11471147
{
11481148
MTM_ELOG(WARNING, "Heartbeat is not received from node %d during %d msec",
11491149
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
1150-
MtmOnNodeDisconnect(i+1);
1150+
MtmStateProcessNeighborEvent(i+1, MTM_NEIGHBOR_HEARTBEAT_TIMEOUT);
11511151
allAlive = false;
11521152
}
11531153
}
@@ -1758,16 +1758,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot, nodemas
17581758
} else {
17591759
globalSnapshot = MtmTx.snapshot;
17601760
}
1761-
if (!TransactionIdIsValid(gtid->xid) && Mtm->status != MTM_RECOVERY)
1762-
{
1763-
/* In case of recovery InvalidTransactionId is passed */
1764-
MtmStateProcessEvent(MTM_RECOVERY_START1);
1765-
}
1766-
else if (Mtm->status == MTM_RECOVERY)
1767-
{
1768-
/* When recovery is completed we get normal transaction ID and switch to normal mode */
1769-
MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1770-
}
1761+
1762+
1763+
// if (!TransactionIdIsValid(gtid->xid) && Mtm->status != MTM_RECOVERY)
1764+
// {
1765+
// /* In case of recovery InvalidTransactionId is passed */
1766+
// MtmStateProcessEvent(MTM_RECOVERY_START1);
1767+
// }
1768+
// else if (Mtm->status == MTM_RECOVERY)
1769+
// {
1770+
// /* When recovery is completed we get normal transaction ID and switch to normal mode */
1771+
// MtmStateProcessEvent(MTM_RECOVERY_FINISH1);
1772+
// }
17711773
}
17721774

17731775
void MtmSetCurrentTransactionGID(char const* gid)
@@ -2074,7 +2076,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20742076
MtmLock(LW_EXCLUSIVE);
20752077
if (MtmIsRecoveredNode(nodeId) && Mtm->nActiveTransactions == 0) {
20762078
if (BIT_CHECK(Mtm->originLockNodeMask, nodeId-1)) {
2077-
MtmStateProcessNeighborEvent(nodeId-1, MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
2079+
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20782080
} else {
20792081
MTM_LOG1("Node %d is caught-up at WAL position %llx without locking cluster", nodeId, walEndPtr);
20802082
/* We are lucky: caught-up without locking cluster! */
@@ -2275,7 +2277,6 @@ void MtmRefreshClusterStatus()
22752277
disabled = ~newClique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
22762278

22772279
if (disabled) {
2278-
// timestamp_t now = MtmGetSystemTime();
22792280
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
22802281
if (mask & 1) {
22812282
if ( i+1 == MtmNodeId )
@@ -2296,58 +2297,6 @@ void MtmRefreshClusterStatus()
22962297
}
22972298
}
22982299

2299-
2300-
/*
2301-
* This function is called in case of non-recoverable connection failure with this node.
2302-
* Non-recoverable means that connections can not be reestablish using specified number of attempts.
2303-
* It sets bit in connectivity mask and register delayed refresh of cluster status which build connectivity matrix
2304-
* and determine clique of connected nodes. Timeout here is needed to allow all nodes to exchanges their connectivity masks (them
2305-
* are sent together with any arbiter message, including heartbeats.
2306-
*/
2307-
void MtmOnNodeDisconnect(int nodeId)
2308-
{
2309-
timestamp_t now = MtmGetSystemTime();
2310-
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
2311-
{
2312-
/* Node is already disabled */
2313-
return;
2314-
}
2315-
if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) > now)
2316-
{
2317-
/* Avoid false detection of node failure and prevent node status blinking */
2318-
return;
2319-
}
2320-
MtmLock(LW_EXCLUSIVE);
2321-
BIT_SET(SELF_CONNECTIVITY_MASK, nodeId-1);
2322-
BIT_SET(Mtm->reconnectMask, nodeId-1);
2323-
MTM_ELOG(LOG, "Disconnect node %d connectivity mask %llx",
2324-
nodeId, SELF_CONNECTIVITY_MASK);
2325-
MtmUnlock();
2326-
}
2327-
2328-
/*
2329-
* This method is called when connection with node is reestablished
2330-
*/
2331-
void MtmOnNodeConnect(int nodeId)
2332-
{
2333-
MtmLock(LW_EXCLUSIVE);
2334-
MTM_ELOG(LOG, "Connect node %d connectivity mask %llx", nodeId, SELF_CONNECTIVITY_MASK);
2335-
BIT_CLEAR(SELF_CONNECTIVITY_MASK, nodeId-1);
2336-
BIT_SET(Mtm->reconnectMask, nodeId-1); /* force sender to reestablish connection and send heartbeat */
2337-
MtmUnlock();
2338-
}
2339-
2340-
/*
2341-
* Set reconnect mask to force reconnection attempt to the node
2342-
*/
2343-
void MtmReconnectNode(int nodeId)
2344-
{
2345-
MtmLock(LW_EXCLUSIVE);
2346-
MTM_ELOG(LOG, "Reconnect node %d connectivity mask %llx", nodeId, SELF_CONNECTIVITY_MASK);
2347-
BIT_SET(Mtm->reconnectMask, nodeId-1);
2348-
MtmUnlock();
2349-
}
2350-
23512300
/*
23522301
* -------------------------------------------
23532302
* Node initialization
@@ -3673,11 +3622,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
36733622
} else {
36743623
MTM_LOG1("Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
36753624
}
3676-
if (!BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
3677-
MTM_ELOG(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
3678-
BIT_SET(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
3679-
MtmStateProcessEvent(MTM_WAL_SENDER_START);
3680-
}
3625+
3626+
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START);
3627+
36813628
BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1); /* arbiter should try to reestablish connection with this node */
36823629
MtmUnlock();
36833630
on_shmem_exit(MtmOnProcExit, 0);

multimaster.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,11 +402,8 @@ extern void MtmLockNode(int nodeId, LWLockMode mode);
402402
extern bool MtmTryLockNode(int nodeId, LWLockMode mode);
403403
extern void MtmUnlockNode(int nodeId);
404404
extern void MtmStopNode(int nodeId, bool dropSlot);
405-
extern void MtmReconnectNode(int nodeId);
406405
extern void MtmRecoverNode(int nodeId);
407406
extern void MtmResumeNode(int nodeId);
408-
extern void MtmOnNodeDisconnect(int nodeId);
409-
extern void MtmOnNodeConnect(int nodeId);
410407
extern void MtmWakeUpBackend(MtmTransState* ts);
411408
extern void MtmSleep(timestamp_t interval);
412409
extern void MtmAbortTransaction(MtmTransState* ts);

pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ pglogical_receiver_main(Datum main_arg)
370370
PQclear(res);
371371
resetPQExpBuffer(query);
372372

373-
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_RECEIVER_START);
373+
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_WAL_RECEIVER_START);
374374

375375
while (!got_sigterm)
376376
{

state.c

Lines changed: 84 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
char const* const MtmNeighborEventMnem[] =
77
{
88
"MTM_NEIGHBOR_CLIQUE_DISABLE",
9-
"MTM_NEIGHBOR_RECEIVER_START",
9+
"MTM_NEIGHBOR_WAL_RECEIVER_START",
10+
"MTM_NEIGHBOR_WAL_SENDER_START",
11+
"MTM_NEIGHBOR_HEARTBEAT_TIMEOUT",
1012
"MTM_NEIGHBOR_RECOVERY_CAUGHTUP"
1113
};
1214

@@ -23,9 +25,6 @@ char const* const MtmEventMnem[] =
2325
"MTM_RECOVERY_FINISH1",
2426
"MTM_RECOVERY_FINISH2",
2527

26-
"MTM_WAL_RECEIVER_START",
27-
"MTM_WAL_SENDER_START",
28-
2928
"MTM_NONRECOVERABLE_ERROR"
3029
};
3130

@@ -37,39 +36,61 @@ static void MtmSwitchClusterMode(MtmNodeStatus mode);
3736
void
3837
MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev)
3938
{
40-
MTM_LOG1("[STATE] Processing %s", MtmNeighborEventMnem[ev]);
39+
MTM_LOG1("[STATE] Processing %s for node %u", MtmNeighborEventMnem[ev], node_id);
4140

4241
switch(ev)
4342
{
4443
case MTM_NEIGHBOR_CLIQUE_DISABLE:
4544
MtmDisableNode(node_id);
4645
break;
4746

48-
case MTM_NEIGHBOR_RECEIVER_START:
49-
/*
50-
* This functions is called by pglogical receiver main function when receiver background worker is started.
51-
* We switch to ONLINE mode when all receivers are connected.
52-
* As far as background worker can be restarted multiple times, use node bitmask.
53-
*/
47+
case MTM_NEIGHBOR_HEARTBEAT_TIMEOUT:
48+
MtmLock(LW_EXCLUSIVE);
49+
BIT_SET(SELF_CONNECTIVITY_MASK, node_id - 1);
50+
BIT_SET(Mtm->reconnectMask, node_id - 1);
51+
MtmUnlock();
52+
break;
53+
54+
case MTM_NEIGHBOR_WAL_RECEIVER_START:
5455
MtmLock(LW_EXCLUSIVE);
55-
if (!BIT_CHECK(Mtm->pglogicalReceiverMask, node_id)) {
56-
BIT_SET(Mtm->pglogicalReceiverMask, node_id);
57-
if (BIT_CHECK(Mtm->disabledNodeMask, node_id)) {
56+
if (!BIT_CHECK(Mtm->pglogicalReceiverMask, node_id - 1)) {
57+
BIT_SET(Mtm->pglogicalReceiverMask, node_id - 1);
58+
if (BIT_CHECK(Mtm->disabledNodeMask, node_id - 1)) {
5859
MtmEnableNode(node_id);
5960
}
60-
MtmStateProcessEvent(MTM_WAL_RECEIVER_START);
61+
62+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1
63+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
64+
{
65+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
66+
MtmSwitchClusterMode(MTM_ONLINE);
67+
}
6168
}
6269
MtmUnlock();
6370
break;
6471

72+
case MTM_NEIGHBOR_WAL_SENDER_START:
73+
if (!BIT_CHECK(Mtm->pglogicalSenderMask, node_id - 1)) {
74+
BIT_SET(Mtm->pglogicalSenderMask, node_id - 1);
75+
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1
76+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
77+
{
78+
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
79+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
80+
MtmSwitchClusterMode(MTM_ONLINE);
81+
}
82+
}
83+
break;
84+
6585
case MTM_NEIGHBOR_RECOVERY_CAUGHTUP:
66-
Assert(BIT_CHECK(Mtm->disabledNodeMask, node_id));
67-
BIT_CLEAR(Mtm->originLockNodeMask, node_id);
68-
BIT_CLEAR(Mtm->disabledNodeMask, node_id);
69-
BIT_SET(Mtm->recoveredNodeMask, node_id);
86+
Assert(BIT_CHECK(Mtm->disabledNodeMask, node_id - 1));
87+
BIT_CLEAR(Mtm->originLockNodeMask, node_id - 1);
88+
BIT_CLEAR(Mtm->disabledNodeMask, node_id - 1);
89+
BIT_SET(Mtm->recoveredNodeMask, node_id - 1);
7090
Mtm->nLiveNodes += 1;
7191
MtmCheckQuorum();
7292
break;
93+
7394
}
7495
}
7596

@@ -114,7 +135,7 @@ MtmStateProcessEvent(MtmEvent ev)
114135
if (Mtm->nLiveNodes < Mtm->nAllNodes/2+1)
115136
{
116137
/* no quorum */
117-
MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
138+
// MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
118139
MtmSwitchClusterMode(MTM_IN_MINORITY);
119140
}
120141
else if (Mtm->status == MTM_INITIALIZATION)
@@ -185,28 +206,6 @@ MtmStateProcessEvent(MtmEvent ev)
185206
break;
186207

187208

188-
case MTM_WAL_RECEIVER_START:
189-
MTM_ELOG(LOG, "[STATE] Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
190-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1
191-
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
192-
{
193-
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
194-
MtmSwitchClusterMode(MTM_ONLINE);
195-
}
196-
break;
197-
198-
199-
case MTM_WAL_SENDER_START:
200-
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1
201-
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
202-
{
203-
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
204-
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
205-
MtmSwitchClusterMode(MTM_ONLINE);
206-
}
207-
break;
208-
209-
210209
case MTM_NONRECOVERABLE_ERROR:
211210
// MTM_ELOG(WARNING, "Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u",
212211
// edata->sqlerrcode, edata->message, getpid());
@@ -234,13 +233,10 @@ MtmSwitchClusterMode(MtmNodeStatus mode)
234233
*/
235234
void MtmDisableNode(int nodeId)
236235
{
237-
timestamp_t now = MtmGetSystemTime();
238-
MTM_ELOG(WARNING, "Disable node %d at xlog position %llx, last status change time %d msec ago", nodeId, (long64)GetXLogInsertRecPtr(),
239-
(int)USEC_TO_MSEC(now - Mtm->nodes[nodeId-1].lastStatusChangeTime));
240236
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
241237
Mtm->nConfigChanges += 1;
242238
Mtm->nodes[nodeId-1].timeline += 1;
243-
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
239+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
244240
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
245241
if (nodeId != MtmNodeId) {
246242
Mtm->nLiveNodes -= 1;
@@ -269,7 +265,6 @@ void MtmEnableNode(int nodeId)
269265
if (nodeId != MtmNodeId) {
270266
Mtm->nLiveNodes += 1;
271267
}
272-
// MTM_ELOG(WARNING, "Enable node %d at xlog position %llx", nodeId, (long64)GetXLogInsertRecPtr());
273268
}
274269
MtmCheckQuorum();
275270
}
@@ -282,15 +277,49 @@ static void MtmCheckQuorum(void)
282277
{
283278
int nVotingNodes = MtmGetNumberOfVotingNodes();
284279

285-
if (Mtm->nLiveNodes >= nVotingNodes/2+1 || (Mtm->nLiveNodes == (nVotingNodes+1)/2 && MtmMajorNode)) { /* have quorum */
286-
if (Mtm->status == MTM_IN_MINORITY) {
287-
// MTM_LOG1("Node is in majority: disabled mask %llx", Mtm->disabledNodeMask);
280+
if (Mtm->nLiveNodes >= nVotingNodes/2+1 || (Mtm->nLiveNodes == (nVotingNodes+1)/2 && MtmMajorNode))
281+
{
282+
if (Mtm->status == MTM_IN_MINORITY)
288283
MtmSwitchClusterMode(MTM_ONLINE);
289-
}
290-
} else {
291-
if (Mtm->status == MTM_ONLINE) { /* out of quorum */
292-
// MTM_ELOG(WARNING, "Node is in minority: disabled mask %llx", Mtm->disabledNodeMask);
284+
}
285+
else
286+
{
287+
if (Mtm->status == MTM_ONLINE)
293288
MtmSwitchClusterMode(MTM_IN_MINORITY);
294-
}
295289
}
296290
}
291+
292+
void MtmOnNodeDisconnect(int nodeId)
293+
{
294+
if (BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId-1))
295+
return;
296+
297+
MTM_LOG1("[STATE] NodeDisconnect for node %u", nodeId);
298+
299+
MtmLock(LW_EXCLUSIVE);
300+
BIT_SET(SELF_CONNECTIVITY_MASK, nodeId-1);
301+
BIT_SET(Mtm->reconnectMask, nodeId-1);
302+
MtmUnlock();
303+
}
304+
305+
void MtmOnNodeConnect(int nodeId)
306+
{
307+
if (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId-1))
308+
return;
309+
310+
MTM_LOG1("[STATE] NodeConnect for node %u", nodeId);
311+
312+
MtmLock(LW_EXCLUSIVE);
313+
BIT_CLEAR(SELF_CONNECTIVITY_MASK, nodeId-1);
314+
BIT_SET(Mtm->reconnectMask, nodeId-1);
315+
MtmUnlock();
316+
}
317+
318+
void MtmReconnectNode(int nodeId)
319+
{
320+
MTM_LOG1("[STATE] ReconnectNode for node %u", nodeId);
321+
MtmLock(LW_EXCLUSIVE);
322+
BIT_SET(Mtm->reconnectMask, nodeId-1);
323+
MtmUnlock();
324+
}
325+

state.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
typedef enum
33
{
44
MTM_NEIGHBOR_CLIQUE_DISABLE,
5-
MTM_NEIGHBOR_RECEIVER_START,
5+
MTM_NEIGHBOR_WAL_RECEIVER_START,
6+
MTM_NEIGHBOR_WAL_SENDER_START,
7+
MTM_NEIGHBOR_HEARTBEAT_TIMEOUT,
68
MTM_NEIGHBOR_RECOVERY_CAUGHTUP
79
} MtmNeighborEvent;
810

@@ -16,8 +18,6 @@ typedef enum
1618
MTM_RECOVERY_START2,
1719
MTM_RECOVERY_FINISH1,
1820
MTM_RECOVERY_FINISH2,
19-
MTM_WAL_RECEIVER_START,
20-
MTM_WAL_SENDER_START,
2121
MTM_NONRECOVERABLE_ERROR
2222
} MtmEvent;
2323

@@ -26,3 +26,7 @@ extern void MtmStateProcessEvent(MtmEvent ev);
2626
extern void MtmDisableNode(int nodeId);
2727
extern void MtmEnableNode(int nodeId);
2828

29+
extern void MtmOnNodeDisconnect(int nodeId);
30+
extern void MtmOnNodeConnect(int nodeId);
31+
extern void MtmReconnectNode(int nodeId);
32+

0 commit comments

Comments
 (0)