Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4be7f76

Browse files
committed
Clean referee decision when all nodes are online; do not send requests to referee continuously from a disabled node
1 parent 5135a59 commit 4be7f76

8 files changed

+150
-52
lines changed

multimaster--1.0.sql

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,11 @@ END
170170
$$
171171
LANGUAGE plpgsql;
172172

173-
CREATE OR REPLACE FUNCTION mtm.referee_clean() RETURNS void AS
173+
CREATE OR REPLACE FUNCTION mtm.referee_clean() RETURNS bool AS
174174
$$
175+
BEGIN
175176
delete from mtm.referee_decision where key = 'winner';
177+
return 'true';
178+
END
176179
$$
177-
LANGUAGE sql;
180+
LANGUAGE plpgsql;

multimaster.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2376,6 +2376,7 @@ static void MtmInitialize()
23762376
Mtm->disabledNodeMask = (((nodemask_t)1 << MtmNodes) - 1);
23772377
Mtm->clique = 0;
23782378
Mtm->refereeGrant = false;
2379+
Mtm->refereeWinnerId = 0;
23792380
Mtm->stalledNodeMask = 0;
23802381
Mtm->stoppedNodeMask = 0;
23812382
Mtm->deadNodeMask = 0;
@@ -4158,15 +4159,42 @@ static void erase_option_from_connstr(const char *option, char *connstr)
41584159
pfree(needle);
41594160
}
41604161

4161-
PGconn *PQconnectdb_safe(const char *conninfo)
4162+
PGconn *
4163+
PQconnectdb_safe(const char *conninfo, int timeout)
41624164
{
41634165
PGconn *conn;
4166+
struct timeval tv = { timeout, 0 };
41644167
char *safe_connstr = pstrdup(conninfo);
4165-
erase_option_from_connstr("arbiter_port", safe_connstr);
41664168

4167-
conn = PQconnectdb(safe_connstr);
4169+
/* XXXX add timeout to connstring if set */
41684170

4171+
erase_option_from_connstr("arbiter_port", safe_connstr);
4172+
conn = PQconnectdb(safe_connstr);
41694173
pfree(safe_connstr);
4174+
4175+
if (PQstatus(conn) != CONNECTION_OK)
4176+
{
4177+
MTM_ELOG(WARNING, "Could not connect to '%s': %s",
4178+
safe_connstr, PQerrorMessage(conn));
4179+
}
4180+
4181+
if (timeout != 0)
4182+
{
4183+
int socket_fd = PQsocket(conn);
4184+
4185+
if (socket_fd < 0)
4186+
{
4187+
MTM_ELOG(WARNING, "Referee socket is invalid");
4188+
}
4189+
4190+
if (setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO,
4191+
(char *)&tv, sizeof(tv)) < 0)
4192+
{
4193+
MTM_ELOG(WARNING, "Could not set referee socket timeout: %s",
4194+
strerror(errno));
4195+
}
4196+
}
4197+
41704198
return conn;
41714199
}
41724200

@@ -4202,7 +4230,7 @@ mtm_collect_cluster_info(PG_FUNCTION_ARGS)
42024230
SRF_RETURN_DONE(funcctx);
42034231
}
42044232

4205-
conn = PQconnectdb_safe(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
4233+
conn = PQconnectdb_safe(Mtm->nodes[usrfctx->nodeId-1].con.connStr, 0);
42064234
if (PQstatus(conn) != CONNECTION_OK)
42074235
{
42084236
MTM_ELOG(WARNING, "Failed to establish connection '%s' to node %d: error = %s", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId, PQerrorMessage(conn));
@@ -4411,7 +4439,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError, int force
44114439
{
44124440
if (!BIT_CHECK(disabledNodeMask, i) || (i + 1 == forceOnNode))
44134441
{
4414-
conns[i] = PQconnectdb_safe(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
4442+
conns[i] = PQconnectdb_safe(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE), 0);
44154443
if (PQstatus(conns[i]) != CONNECTION_OK)
44164444
{
44174445
if (ignoreError)

multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ typedef struct
289289
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
290290
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes */
291291
nodemask_t clique; /* Bitmask of nodes that are connected and we allowed to connect/send wal/receive wal with them */
292-
bool refereeGrant; /* Referee allowed us to work with half of the nodes */
292+
bool refereeGrant; /* Referee allowed us to work with half of the nodes */
293+
int refereeWinnerId; /* Node that won referee contest */
293294
nodemask_t deadNodeMask; /* Bitmask of nodes considered as dead by referee */
294295
nodemask_t recoveredNodeMask; /* Bitmask of nodes recoverd after been reported as dead by referee */
295296
nodemask_t stalledNodeMask; /* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
@@ -430,7 +431,7 @@ extern void MtmCheckHeartbeat(void);
430431
extern void MtmResetTransaction(void);
431432
extern void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize);
432433
extern void MtmReleaseRecoverySlot(int nodeId);
433-
extern PGconn *PQconnectdb_safe(const char *conninfo);
434+
extern PGconn *PQconnectdb_safe(const char *conninfo, int timeout);
434435
extern void MtmBeginSession(int nodeId);
435436
extern void MtmEndSession(int nodeId, bool unlock);
436437
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);

pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ pglogical_receiver_main(Datum main_arg)
296296
count = Mtm->recoveryCount;
297297

298298
/* Establish connection to remote server */
299-
conn = PQconnectdb_safe(connString);
299+
conn = PQconnectdb_safe(connString, 0);
300300
status = PQstatus(conn);
301301
if (status != CONNECTION_OK)
302302
{

referee.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static void MtmRefereeLoop(char const** connections, int nConns)
5050
int result;
5151

5252
for (i = 0; i < nConns; i++) {
53-
conns[i] = PQconnectdb_safe(connections[i]);
53+
conns[i] = PQconnectdb_safe(connections[i], 0);
5454
status = PQstatus(conns[i]);
5555
if (status != CONNECTION_OK)
5656
{
@@ -74,7 +74,7 @@ static void MtmRefereeLoop(char const** connections, int nConns)
7474
/* Some live node has reestablished its connection with a dead node, so the referee should also try to connect to that node */
7575
if (conns[i] == NULL) {
7676
if (BIT_CHECK(newEnabledMask, i)) {
77-
conns[i] = PQconnectdb_safe(connections[i]);
77+
conns[i] = PQconnectdb_safe(connections[i], 0);
7878
status = PQstatus(conns[i]);
7979
if (status == CONNECTION_OK) {
8080
BIT_CLEAR(disabledMask, i);

state.c

Lines changed: 82 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ char const* const MtmEventMnem[] =
2525
"MTM_NONRECOVERABLE_ERROR"
2626
};
2727

28-
static int MtmGetRefereeWinner(void);
28+
static int MtmRefereeGetWinner(void);
29+
static bool MtmRefereeClearWinner(void);
2930

3031
// XXXX: allocate in context and clean it
3132
static char *
@@ -81,7 +82,7 @@ MtmCheckState(void)
8182
// int nVotingNodes = MtmGetNumberOfVotingNodes();
8283
bool isEnabledState;
8384
int nEnabled = countZeroBits(Mtm->disabledNodeMask, Mtm->nAllNodes);
84-
int nConnected = countZeroBits(SELF_CONNECTIVITY_MASK, Mtm->nAllNodes);
85+
int nConnected = countZeroBits(EFFECTIVE_CONNECTIVITY_MASK, Mtm->nAllNodes);
8586
int nReceivers = Mtm->nAllNodes - countZeroBits(Mtm->pglogicalReceiverMask, Mtm->nAllNodes);
8687
int nSenders = Mtm->nAllNodes - countZeroBits(Mtm->pglogicalSenderMask, Mtm->nAllNodes);
8788

@@ -390,26 +391,46 @@ MtmRefreshClusterStatus()
390391
MtmCheckState();
391392

392393
/*
393-
* Check for referee decidion when pnly half of nodes are visible.
394+
* Check for referee decision when only half of nodes are visible.
394395
*/
395-
if (MtmRefereeConnStr && *MtmRefereeConnStr && !Mtm->refereeGrant &&
396+
if (MtmRefereeConnStr && *MtmRefereeConnStr && !Mtm->refereeWinnerId &&
396397
countZeroBits(EFFECTIVE_CONNECTIVITY_MASK, Mtm->nAllNodes) == Mtm->nAllNodes/2)
397398
{
398-
int winner_node_id = MtmGetRefereeWinner();
399-
if (winner_node_id != -1 &&
400-
!BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK, winner_node_id - 1))
399+
int winner_node_id = MtmRefereeGetWinner();
400+
401+
if (winner_node_id > 0)
401402
{
402-
MTM_LOG1("[STATE] Referee allowed to proceed with half of the nodes (winner_id = %d)",
403-
winner_node_id);
404-
Mtm->refereeGrant = true;
405-
406-
MtmLock(LW_EXCLUSIVE);
407-
MtmEnableNode(MtmNodeId);
408-
MtmCheckState();
409-
MtmUnlock();
403+
Mtm->refereeWinnerId = winner_node_id;
404+
if (!BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK, winner_node_id - 1))
405+
{
406+
MTM_LOG1("[STATE] Referee allowed to proceed with half of the nodes (winner_id = %d)",
407+
winner_node_id);
408+
Mtm->refereeGrant = true;
409+
MtmLock(LW_EXCLUSIVE);
410+
MtmEnableNode(MtmNodeId);
411+
MtmCheckState();
412+
MtmUnlock();
413+
}
410414
}
411415
}
412416

417+
/*
418+
* Clear the winner once all nodes are online again.
419+
*/
420+
if (MtmRefereeConnStr && *MtmRefereeConnStr && Mtm->refereeWinnerId &&
421+
countZeroBits(EFFECTIVE_CONNECTIVITY_MASK, Mtm->nAllNodes) == Mtm->nAllNodes)
422+
{
423+
if (MtmRefereeClearWinner())
424+
{
425+
Mtm->refereeWinnerId = 0;
426+
Mtm->refereeGrant = false;
427+
MTM_LOG1("[STATE] Cleaning old referee decision");
428+
}
429+
}
430+
431+
/* Do not check clique with referee grant */
432+
if (Mtm->refereeGrant)
433+
return;
413434

414435
/*
415436
* Check for clique.
@@ -473,37 +494,17 @@ MtmRefreshClusterStatus()
473494
}
474495

475496
static int
476-
MtmGetRefereeWinner(void)
497+
MtmRefereeGetWinner(void)
477498
{
478-
int socket_fd;
479499
PGconn* conn;
480500
PGresult *res;
481-
struct timeval timeout = { 5, 0 };
482501
char sql[128];
483502
int winner_node_id;
484503

485-
conn = PQconnectdb_safe(MtmRefereeConnStr);
504+
conn = PQconnectdb_safe(MtmRefereeConnStr, 5);
486505
if (PQstatus(conn) != CONNECTION_OK)
487506
{
488-
MTM_ELOG(WARNING, "Could not connect to referee (%s): %s",
489-
MtmRefereeConnStr, PQerrorMessage(conn));
490-
PQfinish(conn);
491-
return -1;
492-
}
493-
494-
socket_fd = PQsocket(conn);
495-
if (socket_fd < 0)
496-
{
497-
MTM_ELOG(WARNING, "Referee socket is invalid");
498-
PQfinish(conn);
499-
return -1;
500-
}
501-
502-
if (setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO,
503-
(char *)&timeout, sizeof(timeout)) < 0)
504-
{
505-
MTM_ELOG(WARNING, "Could not set referee socket timeout: %s",
506-
strerror(errno));
507+
MTM_ELOG(WARNING, "Could not connect to referee");
507508
PQfinish(conn);
508509
return -1;
509510
}
@@ -540,3 +541,46 @@ MtmGetRefereeWinner(void)
540541
return winner_node_id;
541542
}
542543

544+
static bool
545+
MtmRefereeClearWinner(void)
546+
{
547+
PGconn* conn;
548+
PGresult *res;
549+
char *response;
550+
551+
conn = PQconnectdb_safe(MtmRefereeConnStr, 5);
552+
if (PQstatus(conn) != CONNECTION_OK)
553+
{
554+
MTM_ELOG(WARNING, "Could not connect to referee");
555+
PQfinish(conn);
556+
return false;
557+
}
558+
559+
res = PQexec(conn, "select mtm.referee_clean()");
560+
if (PQresultStatus(res) != PGRES_TUPLES_OK ||
561+
PQntuples(res) != 1 ||
562+
PQnfields(res) != 1)
563+
{
564+
MTM_ELOG(WARNING, "Refusing unexpected result (r=%d, n=%d, w=%d, k=%s) from referee_clean().",
565+
PQresultStatus(res), PQntuples(res), PQnfields(res), PQgetvalue(res, 0, 0));
566+
PQclear(res);
567+
PQfinish(conn);
568+
return false;
569+
}
570+
571+
response = PQgetvalue(res, 0, 0);
572+
573+
if (false)
574+
{
575+
MTM_ELOG(WARNING, "Wrong response from referee");
576+
PQclear(res);
577+
PQfinish(conn);
578+
return false;
579+
}
580+
581+
/* Ok, we finally got it! */
582+
MTM_LOG1("Got referee clear response %s", response);
583+
PQclear(res);
584+
PQfinish(conn);
585+
return true;
586+
}

tests2/lib/test_helper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def assertNoCommits(self, aggs):
3434
if commits:
3535
raise AssertionError('There are commits during aggregation interval')
3636

37-
def performFailure(self, failure):
37+
def performFailure(self, failure, wait=0):
3838

3939
time.sleep(TEST_WARMING_TIME)
4040

@@ -51,7 +51,7 @@ def performFailure(self, failure):
5151
aggs_failure = self.client.get_aggregates()
5252

5353

54-
# time.sleep(10000)
54+
time.sleep(wait)
5555
failure.stop()
5656

5757
print('Eliminate failure at ',datetime.datetime.utcnow())

tests2/test_major.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,28 @@ def test_partition_referee(self):
9595
self.assertCommits(aggs)
9696
self.assertIsolation(aggs)
9797

98+
def test_double_failure_referee(self):
99+
print('### test_double_failure_referee ###')
100+
101+
aggs_failure, aggs = self.performFailure(SingleNodePartition('node2'))
102+
103+
self.assertCommits(aggs_failure[:1])
104+
self.assertNoCommits(aggs_failure[1:])
105+
self.assertIsolation(aggs_failure)
106+
107+
self.assertCommits(aggs)
108+
self.assertIsolation(aggs)
109+
110+
aggs_failure, aggs = self.performFailure(SingleNodePartition('node1'))
111+
112+
self.assertNoCommits(aggs_failure[:1])
113+
self.assertCommits(aggs_failure[1:])
114+
self.assertIsolation(aggs_failure)
115+
116+
self.assertCommits(aggs)
117+
self.assertIsolation(aggs)
118+
119+
98120
if __name__ == '__main__':
99121
unittest.main()
100122

0 commit comments

Comments
 (0)