Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6b70fbe

Browse files
knizhnikkelvich
authored and committed
Handle node disconnect
1 parent 69d8dd9 commit 6b70fbe

File tree

3 files changed

+52
-11
lines changed

3 files changed

+52
-11
lines changed

multimaster.c

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,7 @@ MtmCheckClusterLock()
10471047
Mtm->nNodes += Mtm->nLockers;
10481048
Mtm->nLockers = 0;
10491049
Mtm->nodeLockerMask = 0;
1050+
MtmCheckQuorum();
10501051
}
10511052
}
10521053
break;
@@ -1056,14 +1057,17 @@ MtmCheckClusterLock()
10561057
/**
10571058
* Build internode connectivity mask. 1 - means that node is disconnected.
10581059
*/
1059-
static void
1060+
static bool
10601061
MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
10611062
{
10621063
int i, j, n = MtmNodes;
10631064
for (i = 0; i < n; i++) {
10641065
if (i+1 != MtmNodeId) {
10651066
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
1066-
matrix[i] = data ? *(nodemask_t*)data : 0;
1067+
if (data == NULL) {
1068+
return false;
1069+
}
1070+
matrix[i] = *(nodemask_t*)data;
10671071
} else {
10681072
matrix[i] = Mtm->connectivityMask;
10691073
}
@@ -1074,21 +1078,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
10741078
matrix[i] |= ((matrix[j] >> i) & 1) << j;
10751079
}
10761080
}
1081+
return true;
10771082
}
10781083

10791084

10801085
/**
10811086
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
10821087
* This function returns false if the connectivity matrix could not be built (RAFT is not available), true otherwise
10831088
*/
1084-
void MtmRefreshClusterStatus(bool nowait)
1089+
bool MtmRefreshClusterStatus(bool nowait)
10851090
{
10861091
nodemask_t mask, clique;
10871092
nodemask_t matrix[MAX_NODES];
10881093
int clique_size;
10891094
int i;
10901095

1091-
MtmBuildConnectivityMatrix(matrix, nowait);
1096+
if (!MtmBuildConnectivityMatrix(matrix, nowait)) {
1097+
/* RAFT is not available */
1098+
return false;
1099+
}
10921100

10931101
clique = MtmFindMaxClique(matrix, MtmNodes, &clique_size);
10941102
if (clique_size >= MtmNodes/2+1) { /* have quorum */
@@ -1108,6 +1116,7 @@ void MtmRefreshClusterStatus(bool nowait)
11081116
BIT_CLEAR(Mtm->disabledNodeMask, i);
11091117
}
11101118
}
1119+
MtmCheckQuorum();
11111120
MtmUnlock();
11121121
if (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId-1)) {
11131122
if (Mtm->status == MTM_ONLINE) {
@@ -1120,9 +1129,27 @@ void MtmRefreshClusterStatus(bool nowait)
11201129
}
11211130
} else {
11221131
elog(WARNING, "Clique %lx has no quorum", clique);
1132+
Mtm->status = MTM_IN_MINORITY;
11231133
}
1134+
return true;
11241135
}
11251136

1137+
/**
 * Re-evaluate whether this node still belongs to a majority and adjust Mtm->status.
 *
 * A majority means Mtm->nNodes is at least MtmNodes/2+1.
 *  - Below majority: MTM_ONLINE is switched to MTM_IN_MINORITY.
 *  - At or above majority: MTM_IN_MINORITY is switched back to MTM_ONLINE.
 * Any other status value is left untouched. A WARNING is logged on each transition.
 *
 * This function takes no lock itself.
 * NOTE(review): the callers visible in this change invoke it before MtmUnlock();
 * confirm that every caller holds the Mtm lock.
 */
void MtmCheckQuorum(void)
1138+
{
1139+
if (Mtm->nNodes < MtmNodes/2+1) {
1140+
if (Mtm->status == MTM_ONLINE) { /* out of quorum */
1141+
elog(WARNING, "Node is in minority: disabled mask %lx", Mtm->disabledNodeMask);
1142+
Mtm->status = MTM_IN_MINORITY;
1143+
}
1144+
} else {
1145+
if (Mtm->status == MTM_IN_MINORITY) {
1146+
/* NOTE(review): "dissbled" in the message below is a typo for "disabled" */
elog(WARNING, "Node is in majority: dissbled mask %lx", Mtm->disabledNodeMask);
1147+
Mtm->status = MTM_ONLINE;
1148+
}
1149+
}
1150+
}
1151+
1152+
11261153
void MtmOnNodeDisconnect(int nodeId)
11271154
{
11281155
BIT_SET(Mtm->connectivityMask, nodeId-1);
@@ -1131,7 +1158,15 @@ void MtmOnNodeDisconnect(int nodeId)
11311158
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11321159
MtmSleep(MtmKeepaliveTimeout);
11331160

1134-
MtmRefreshClusterStatus(false);
1161+
if (!MtmRefreshClusterStatus(false)) {
1162+
MtmLock(LW_EXCLUSIVE);
1163+
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1164+
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1165+
Mtm->nNodes -= 1;
1166+
MtmCheckQuorum();
1167+
}
1168+
MtmUnlock();
1169+
}
11351170
}
11361171

11371172
void MtmOnNodeConnect(int nodeId)
@@ -1633,6 +1668,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16331668
}
16341669
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
16351670
Mtm->nNodes -= 1;
1671+
MtmCheckQuorum();
16361672
if (!MtmIsBroadcast())
16371673
{
16381674
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
@@ -1647,6 +1683,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16471683
/**
 * Replication shutdown hook: log that logical replication to the peer node
 * (MtmReplicationNodeId) has stopped, then report that node as disconnected
 * through MtmOnNodeDisconnect().
 *
 * The args parameter is not used by this function.
 * NOTE(review): presumably installed via MtmSetupReplicationHooks(); the
 * registration is not visible here, so confirm.
 */
static void
16481684
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16491685
{
1686+
elog(WARNING, "Logical replication to node %d is stopped", MtmReplicationNodeId);
16501687
MtmOnNodeDisconnect(MtmReplicationNodeId);
16511688
}
16521689

multimaster.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
#include "pglogical_output/hooks.h"
99

1010
#define MTM_TUPLE_TRACE(fmt, ...)
11-
/*
11+
#if 0
1212
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
14-
*/
14+
#else
1515
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1616
#define MTM_TRACE(fmt, ...)
17-
/* */
17+
#endif
1818

1919
#define MULTIMASTER_NAME "multimaster"
2020
#define MULTIMASTER_SCHEMA_NAME "mtm"
@@ -72,7 +72,8 @@ typedef enum
7272
MTM_OFFLINE, /* Node is out of quorum */
7373
MTM_CONNECTED, /* Arbiter is established connections with other nodes */
7474
MTM_ONLINE, /* Ready to receive client's queries */
75-
MTM_RECOVERY /* Node is in recovery process */
75+
MTM_RECOVERY, /* Node is in recovery process */
76+
MTM_IN_MINORITY /* Node is out of quorum */
7677
} MtmNodeStatus;
7778

7879
typedef enum
@@ -193,8 +194,10 @@ extern TransactionId MtmGetCurrentTransactionId(void);
193194
extern XidStatus MtmGetCurrentTransactionStatus(void);
194195
extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
195196
extern bool MtmIsRecoveredNode(int nodeId);
196-
extern void MtmRefreshClusterStatus(bool nowait);
197+
extern bool MtmRefreshClusterStatus(bool nowait);
197198
extern void MtmSwitchClusterMode(MtmNodeStatus mode);
198199
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
199200
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
201+
extern void MtmCheckQuorum(void);
202+
200203
#endif

pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,9 @@ pglogical_receiver_main(Datum main_arg)
240240
if (PQstatus(conn) != CONNECTION_OK)
241241
{
242242
PQfinish(conn);
243-
ereport(ERROR, (errmsg("%s: Could not establish connection to remote server",
243+
ereport(WARNING, (errmsg("%s: Could not establish connection to remote server",
244244
worker_proc)));
245+
MtmOnNodeDisconnect(args->remote_node);
245246
proc_exit(1);
246247
}
247248

0 commit comments

Comments
 (0)