Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d584aab

Browse files
committed
Save persistent winner decision locally and fix highlighted problem with recovery from disabled node
1 parent 4e48f2d commit d584aab

File tree

5 files changed

+222
-3
lines changed

5 files changed

+222
-3
lines changed

multimaster--1.0.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ LANGUAGE C;
114114

115115
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema name, rel_name name, primary key(rel_schema, rel_name));
116116

117+
CREATE TABLE mtm.referee_decision(key text primary key not null, node_id int);
118+
117119
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
118120
$$
119121
DECLARE

multimaster.c

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ bool MtmPreserveCommitOrder;
256256
bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
257257
bool MtmMajorNode;
258258
char* MtmRefereeConnStr;
259+
bool MtmEnforceLocalTx;
259260

260261
static char* MtmConnStrs;
261262
static char* MtmRemoteFunctionsList;
@@ -1110,7 +1111,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
11101111
ts->procno = MyProc->pgprocno;
11111112
ts->votingCompleted = false;
11121113
ts->participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
1113-
ts->isLocal = x->isReplicated || !x->containsDML || (ts->participantsMask == 0);
1114+
ts->isLocal = x->isReplicated || !x->containsDML || (ts->participantsMask == 0) || MtmEnforceLocalTx;
11141115
ts->nConfigChanges = Mtm->nConfigChanges;
11151116
ts->votedMask = 0;
11161117
ts->nSubxids = xactGetCommittedChildren(&subxids);
@@ -3713,8 +3714,14 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
37133714
* unless we are performing recovery of disabled node
37143715
* (in this case all transactions should be sent)
37153716
*/
3716-
bool res = Mtm->status != MTM_RECOVERY
3717-
&& (args->origin_id == InvalidRepOriginId
3717+
/*
3718+
* I removed (Mtm->status != MTM_RECOVERY) here since in major
3719+
* mode we need to recover from offline node too. Also it seems
3720+
* that with amount of nodes >= 3 we also need that. --sk
3721+
*
3722+
* At first glance this works fine.
3723+
*/
3724+
bool res = (args->origin_id == InvalidRepOriginId
37183725
|| MtmIsRecoveredNode(MtmReplicationNodeId));
37193726
if (!res) {
37203727
MTM_LOG2("Filter transaction with origin_id=%d", args->origin_id);

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ extern MtmConnectionInfo* MtmConnections;
392392
extern bool MtmMajorNode;
393393
extern bool MtmBackgroundWorker;
394394
extern char* MtmRefereeConnStr;
395+
extern bool MtmEnforceLocalTx;
395396

396397

397398
extern void MtmArbiterInitialize(void);

state.c

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
#include "miscadmin.h" /* PostmasterPid */
33
#include "multimaster.h"
44
#include "state.h"
5+
#include "executor/spi.h"
6+
#include "utils/snapmgr.h"
7+
#include "nodes/makefuncs.h"
8+
#include "catalog/namespace.h"
59

610
char const* const MtmNeighborEventMnem[] =
711
{
@@ -27,6 +31,9 @@ char const* const MtmEventMnem[] =
2731

2832
static int MtmRefereeGetWinner(void);
2933
static bool MtmRefereeClearWinner(void);
34+
static int MtmRefereeReadSaved(void);
35+
36+
static bool mtm_state_initialized;
3037

3138
// XXXX: allocate in context and clean it
3239
static char *
@@ -434,6 +441,13 @@ MtmRefreshClusterStatus()
434441
{
435442
int winner_node_id = MtmRefereeGetWinner();
436443

444+
/* We may also have an old saved value. Read it only once, from the single mtm-monitor process */
445+
if (winner_node_id <= 0 && !mtm_state_initialized)
446+
{
447+
winner_node_id = MtmRefereeReadSaved();
448+
mtm_state_initialized = true;
449+
}
450+
437451
if (winner_node_id > 0)
438452
{
439453
Mtm->refereeWinnerId = winner_node_id;
@@ -555,13 +569,70 @@ MtmRefreshClusterStatus()
555569
MtmUnlock();
556570
}
557571

572+
/*
573+
* Referee caches decision in mtm.referee_decision
574+
*/
575+
static bool
576+
MtmRefereeHasLocalTable()
577+
{
578+
RangeVar *rv;
579+
Oid rel_oid;
580+
581+
StartTransactionCommand();
582+
rv = makeRangeVar(MULTIMASTER_SCHEMA_NAME, "referee_decision", -1);
583+
rel_oid = RangeVarGetRelid(rv, NoLock, true);
584+
CommitTransactionCommand();
585+
586+
return OidIsValid(rel_oid);
587+
}
588+
589+
static int
590+
MtmRefereeReadSaved(void)
591+
{
592+
int winner = -1;
593+
int rc;
594+
595+
if (!MtmRefereeHasLocalTable())
596+
return -1;
597+
598+
/* Save result locally */
599+
StartTransactionCommand();
600+
SPI_connect();
601+
PushActiveSnapshot(GetTransactionSnapshot());
602+
rc = SPI_execute("select node_id from mtm.referee_decision where key = 'winner';", true, 0);
603+
if (rc != SPI_OK_SELECT)
604+
{
605+
MTM_ELOG(WARNING, "Failed to load referee decision");
606+
}
607+
else if (SPI_processed > 0)
608+
{
609+
bool isnull;
610+
winner = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull));
611+
Assert(SPI_processed == 1);
612+
Assert(!isnull);
613+
}
614+
else
615+
{
616+
/* no saved decision found */
617+
Assert(SPI_processed == 0);
618+
}
619+
SPI_finish();
620+
PopActiveSnapshot();
621+
CommitTransactionCommand();
622+
623+
MTM_LOG1("Read saved referee decision, winner=%d.", winner);
624+
return winner;
625+
}
626+
558627
static int
559628
MtmRefereeGetWinner(void)
560629
{
561630
PGconn* conn;
562631
PGresult *res;
563632
char sql[128];
564633
int winner_node_id;
634+
int old_winner = -1;
635+
int rc;
565636

566637
conn = PQconnectdb_safe(MtmRefereeConnStr, 5);
567638
if (PQstatus(conn) != CONNECTION_OK)
@@ -599,6 +670,48 @@ MtmRefereeGetWinner(void)
599670
/* Ok, we finally got it! */
600671
PQclear(res);
601672
PQfinish(conn);
673+
674+
/* Save result locally */
675+
if (MtmRefereeHasLocalTable())
676+
{
677+
MtmEnforceLocalTx = true;
678+
StartTransactionCommand();
679+
SPI_connect();
680+
PushActiveSnapshot(GetTransactionSnapshot());
681+
/* Check old value if any */
682+
rc = SPI_execute("select node_id from mtm.referee_decision where key = 'winner';", true, 0);
683+
if (rc != SPI_OK_SELECT)
684+
{
685+
MTM_ELOG(WARNING, "Failed to load previous referee decision");
686+
}
687+
else if (SPI_processed > 0)
688+
{
689+
bool isnull;
690+
old_winner = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull));
691+
Assert(SPI_processed == 1);
692+
Assert(!isnull);
693+
}
694+
else
695+
{
696+
/* no saved decision found */
697+
Assert(SPI_processed == 0);
698+
}
699+
/* Update actual key */
700+
sprintf(sql,
701+
"insert into mtm.referee_decision values ('winner', %d) on conflict(key) do nothing;",
702+
winner_node_id);
703+
rc = SPI_execute(sql, false, 0);
704+
SPI_finish();
705+
if (rc < 0)
706+
MTM_ELOG(WARNING, "Failed to save referee decision, but proceeding anyway");
707+
PopActiveSnapshot();
708+
CommitTransactionCommand();
709+
MtmEnforceLocalTx = false;
710+
711+
if (old_winner > 0 && old_winner != winner_node_id)
712+
MTM_LOG1("WARNING Overriding old referee decision (%d) with new one (%d)", old_winner, winner_node_id);
713+
}
714+
602715
MTM_LOG1("Got referee response, winner node_id=%d.", winner_node_id);
603716
return winner_node_id;
604717
}
@@ -609,6 +722,32 @@ MtmRefereeClearWinner(void)
609722
PGconn* conn;
610723
PGresult *res;
611724
char *response;
725+
int rc;
726+
727+
/*
728+
* Delete result locally first.
729+
*
730+
* If we delete the decision from the referee but fail to delete the locally cached copy,
731+
* that will be pretty bad -- on the next reboot we can read
732+
* stale referee decision and on next failure end up with two masters.
733+
* So just delete local cache first.
734+
*/
735+
if (MtmRefereeHasLocalTable())
736+
{
737+
StartTransactionCommand();
738+
SPI_connect();
739+
PushActiveSnapshot(GetTransactionSnapshot());
740+
rc = SPI_execute("delete from mtm.referee_decision where key = 'winner'", false, 0);
741+
SPI_finish();
742+
PopActiveSnapshot();
743+
CommitTransactionCommand();
744+
if (rc < 0)
745+
{
746+
MTM_ELOG(WARNING, "Failed to save referee decision, proceeding anyway");
747+
return false;
748+
}
749+
}
750+
612751

613752
conn = PQconnectdb_safe(MtmRefereeConnStr, 5);
614753
if (PQstatus(conn) != CONNECTION_OK)

tests2/test_referee.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,76 @@ def test_double_failure_referee(self):
102102
self.assertCommits(aggs)
103103
self.assertIsolation(aggs)
104104

def test_saved_referee_decision(self):
    """Check that a node keeps working from its locally saved referee decision.

    The scenario stops node1, then stops the referee and restarts node2,
    then brings nodes and referee back step by step. At the end it verifies
    that no 'winner' row is left in referee.decision.
    """
    print('### test_saved_referee_decision ###')
    docker_api = docker.from_env()

    print('#### down on(winner) || on')
    print('###########################')
    aggs_failure, aggs = self.performFailure(StopNode('node1'))

    self.assertNoCommits(aggs_failure[:1])
    self.assertCommits(aggs_failure[1:])
    self.assertIsolation(aggs_failure)

    self.assertNoCommits(aggs[:1])
    self.assertCommits(aggs[1:])
    self.assertIsolation(aggs)

    print('#### down restart(winner) || down')
    print('###########################')
    docker_api.containers.get('referee').stop()
    aggs_failure, aggs = self.performFailure(RestartNode('node2'), node_wait_for_commit=1)

    # without saved decision node2 will be endlessly disabled here

    self.assertNoCommits(aggs_failure)
    self.assertIsolation(aggs_failure)

    self.assertNoCommits(aggs[:1])
    self.assertCommits(aggs[1:])
    self.assertIsolation(aggs)

    print('#### up down(winner) || down')
    print('###########################')
    docker_api.containers.get('node2').stop()
    docker_api.containers.get('node1').start()
    aggs_failure, aggs = self.performFailure(NoFailure())

    self.assertNoCommits(aggs_failure)
    self.assertIsolation(aggs_failure)
    self.assertNoCommits(aggs)
    self.assertIsolation(aggs)

    print('#### up down(winner) || up')
    print('###########################')
    docker_api.containers.get('referee').start()
    aggs_failure, aggs = self.performFailure(NoFailure())

    self.assertNoCommits(aggs_failure)
    self.assertIsolation(aggs_failure)
    self.assertNoCommits(aggs)
    self.assertIsolation(aggs)

    print('#### up up(winner) || up')
    print('###########################')
    docker_api.containers.get('node2').start()
    self.awaitCommit(0)

    # give it time to clean old decision
    time.sleep(5)

    print('#### check that decision is cleaned')
    print('###########################')
    con = psycopg2.connect("dbname=regression user=postgres host=127.0.0.1 port=15435")
    try:
        con.autocommit = True
        cur = con.cursor()
        try:
            # Use a plain SELECT. In plain SQL, "select ... into winner_id"
            # would create a table named winner_id on the server instead of
            # only counting the matching rows.
            cur.execute("select node_id from referee.decision where key = 'winner'")
            decisions_count = cur.rowcount
        finally:
            cur.close()
    finally:
        # close the connection even if the query above raised
        con.close()

    self.assertEqual(decisions_count, 0)
105175

106176
def test_winner_restart(self):
107177
print('### test_winner_restart ###')

0 commit comments

Comments
 (0)