Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d2097bb

Browse files
knizhnikkelvich
authored andcommitted
Arbitrator fixes
1 parent 8091c2c commit d2097bb

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

arbitrator/arbitrator.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ int main (int argc, char* argv[])
103103
queries[i] = pipes[i]->insert(sql);
104104
}
105105
sleep(cfg.timeout);
106-
enabledMask = disabledMask;
106+
enabledMask = 0;
107107
for (size_t i = 0; i < nConns; i++) {
108108
if (!BIT_CHECK(didsabledMask, i)) {
109109
if (!pipes[i]->is_finished(queries[i]))
@@ -113,8 +113,14 @@ int main (int argc, char* argv[])
113113
delete conns[i];
114114
conns[i] = NULL;
115115
} else {
116-
result r = pipes[i]->retrieve(results[i]);
117-
enabledMask &= ~r[0][0].as(nodemask_t());
116+
try {
117+
result r = pipes[i]->retrieve(results[i]);
118+
enabledMask |= r[0][0].as(nodemask_t());
119+
} catch (pqxx_exception const& x) {
120+
delete conns[i];
121+
conns[i] = NULL;
122+
fprintf(stderr, "Failed to retrieve result from node %d: %s\n", (int)i+1, x.base().what());
123+
}
118124
}
119125
}
120126
}

multimaster.c

+34-16
Original file line numberDiff line numberDiff line change
@@ -1965,6 +1965,7 @@ static void MtmEnableNode(int nodeId)
19651965
if (BIT_SET(Mtm->disabledNodeMask, nodeId-1)) {
19661966
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
19671967
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
1968+
BIT_SET(Mtm->recoveredNodeMask, nodeId-1);
19681969
Mtm->nConfigChanges += 1;
19691970
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
19701971
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
@@ -2133,6 +2134,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
21332134
Assert(BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
21342135
BIT_CLEAR(Mtm->originLockNodeMask, nodeId-1);
21352136
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
2137+
BIT_SET(Mtm->recoveredNodeMask, nodeId-1);
21362138
Mtm->nLiveNodes += 1;
21372139
MtmCheckQuorum();
21382140
} else {
@@ -2272,6 +2274,22 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
22722274
}
22732275

22742276

2277+
static int MtmGetNumberOfVotingNodes()
2278+
{
2279+
int i;
2280+
int nVotingNodes = Mtm->nAllNodes;
2281+
notebask_t deadNodeMask = Mtm->deadNodeMask;
2282+
for (i = 0; deadNodeMask != 0; i++) {
2283+
if (BIT_CHECK(deadNodeMask, i)) {
2284+
if (!BIT_CHECK(newClique, i)) {
2285+
nVotingNodes -= 1;
2286+
}
2287+
BIT_CLEAR(deadNodeMask, i);
2288+
}
2289+
}
2290+
return nVotingNodes;
2291+
}
2292+
22752293
/**
22762294
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
22772295
* This function is called by arbiter monitor process with period MtmHeartbeatSendTimeout
@@ -2282,9 +2300,7 @@ void MtmRefreshClusterStatus()
22822300
nodemask_t matrix[MAX_NODES];
22832301
int cliqueSize;
22842302
nodemask_t oldClique = ~Mtm->disabledNodeMask & (((nodemask_t)1 << Mtm->nAllNodes)-1);
2285-
nodemask_t arbitratorDisabledMask;
22862303
int nVotingNodes;
2287-
int i;
22882304

22892305
MtmBuildConnectivityMatrix(matrix);
22902306
newClique = MtmFindMaxClique(matrix, Mtm->nAllNodes, &cliqueSize);
@@ -2305,16 +2321,7 @@ void MtmRefreshClusterStatus()
23052321
newClique = MtmFindMaxClique(matrix, Mtm->nAllNodes, &cliqueSize);
23062322
} while (newClique != oldClique);
23072323

2308-
nVotingNodes = Mtm->nAllNodes;
2309-
arbitratorDisabledMask = Mtm->arbitratorDisabledMask;
2310-
for (i = 0; arbitratorDisabledMask != 0; i++) {
2311-
if (BIT_CHECK(arbitratorDisabledMask, i)) {
2312-
if (!BIT_CHECK(newClique, i)) {
2313-
nVotingNodes -= 1;
2314-
}
2315-
BIT_CLEAR(arbitratorDisabledMask, i);
2316-
}
2317-
}
2324+
nVotingNodes = MtmGetNumberOfVotingNodes();
23182325
if (cliqueSize >= nVotingNodes/2+1 || (cliqueSize == (nVotingNodes+1)/2 && MtmMajorNode)) { /* have quorum */
23192326
fprintf(stderr, "Old mask: ");
23202327
for (i = 0; i < Mtm->nAllNodes; i++) {
@@ -2379,7 +2386,9 @@ void MtmRefreshClusterStatus()
23792386
*/
23802387
void MtmCheckQuorum(void)
23812388
{
2382-
if (Mtm->nLiveNodes >= Mtm->nAllNodes/2+1 || (Mtm->nLiveNodes == (Mtm->nAllNodes+1)/2 && MtmMajorNode)) { /* have quorum */
2389+
int nVotingNodes = MtmGetNumberOfVotingNodes();
2390+
2391+
if (Mtm->nLiveNodes >= nVotingNodes/2+1 || (Mtm->nLiveNodes == (nVotingNodes+1)/2 && MtmMajorNode)) { /* have quorum */
23832392
if (Mtm->status == MTM_IN_MINORITY) {
23842393
MTM_LOG1("Node is in majority: disabled mask %llx", Mtm->disabledNodeMask);
23852394
MtmSwitchClusterMode(MTM_ONLINE);
@@ -2627,7 +2636,8 @@ static void MtmInitialize()
26272636
Mtm->disabledNodeMask = 0;
26282637
Mtm->stalledNodeMask = 0;
26292638
Mtm->stoppedNodeMask = 0;
2630-
Mtm->arbitratorDisabledMask = 0;
2639+
Mtm->deadNodeMask = 0;
2640+
Mtm->recoveredNodeMask = 0;
26312641
Mtm->pglogicalReceiverMask = 0;
26322642
Mtm->pglogicalSenderMask = 0;
26332643
Mtm->inducedLockNodeMask = 0;
@@ -5458,6 +5468,14 @@ Datum mtm_check_deadlock(PG_FUNCTION_ARGS)
54585468

54595469
Datum mtm_arbitrator_poll(PG_FUNCTION_ARGS)
54605470
{
5461-
Mtm->arbitratorDisabledMask = PG_GETARG_INT64(0);
5462-
PG_RETURN_INT64(Mtm->disabledNodeMask);
5471+
nodemask_t recoveredNodeMask;
5472+
5473+
MtmLock(LW_EXCLUSIVE);
5474+
recoveredNodeMask = Mtm->recoveredNodeMask;
5475+
Mtm->deadNodeMask = PG_GETARG_INT64(0);
5476+
Mtm->recoveredNodeMask &= ~Mtm->deadNodeMask;
5477+
MtmCheckQuorum();
5478+
MtmUnlock();
5479+
5480+
PG_RETURN_INT64(recoveredNodeMask);
54635481
}

multimaster.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ typedef struct
278278
LWLockPadded *locks; /* multimaster lock tranche */
279279
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
280280
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes */
281-
nodemask_t arbitratorDisabledMask; /* Bitmask of node disabled by arbitrator */
281+
nodemask_t deadNodeMask; /* Bitmask of nodes considered as dead by arbitrator */
282+
nodemask_t recoveredNodeMask; /* Bitmask of nodes recoverd after been reported as dead by arbitrator */
282283
nodemask_t stalledNodeMask; /* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
283284
nodemask_t stoppedNodeMask; /* Bitmask of stopped (permanently disabled nodes) */
284285
nodemask_t pglogicalReceiverMask; /* bitmask of started pglogic receivers */

0 commit comments

Comments
 (0)