Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 12b1491

Browse files
committed
port referee back
1 parent 5282434 commit 12b1491

File tree

9 files changed

+101
-29
lines changed

9 files changed

+101
-29
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ COPY ./ /pg/mmts/
66
RUN export USE_PGXS=1 && \
77
cd /pg/mmts && make clean && make install
88

9-
# RUN export USE_PGXS=1 && \
10-
# cd /pg/src/contrib/referee && make clean && make install
9+
RUN export USE_PGXS=1 && \
10+
cd /pg/src/contrib/referee && make clean && make install
1111

1212
# pg_regress client assumes such dir exists on server
1313
RUN cp /pg/src/src/test/regress/*.so /pg/install/lib/postgresql/

src/dmq.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -772,8 +772,8 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
772772
}
773773

774774
mtm_log(DmqTraceIncoming,
775-
"[DMQ] got message %s.%s, passing to %d", stream_name, body,
776-
sub->procno);
775+
"[DMQ] got message %s.%s (len=%d), passing to %d", stream_name, body,
776+
body_len, sub->procno);
777777

778778
/* and send it */
779779
res = shm_mq_send(mq_handles[sub->procno], body_len, body, false);
@@ -1546,8 +1546,8 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15461546
*sender_id = i;
15471547

15481548
mtm_log(DmqTraceIncoming,
1549-
"[DMQ] dmq_pop_nb: got message %s from %s",
1550-
(char *) data, dmq_local.inhandles[i].name);
1549+
"[DMQ] dmq_pop_nb: got message %s (len=%zu) from %s",
1550+
(char *) data, len, dmq_local.inhandles[i].name);
15511551
return true;
15521552
}
15531553
else if (res == SHM_MQ_DETACHED)

src/include/resolver.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ extern void ResolverInit(void);
2222
extern BackgroundWorkerHandle *ResolverStart(Oid db_id, Oid user_id);
2323
extern void ResolveTransactionsForNode(int node_id, int n_all_nodes);
2424
extern void ResolveAllTransactions(int n_all_nodes);
25+
extern void ResolveForRefereeWinner(int n_all_nodes);
2526
extern char *MtmTxStateMnem(MtmTxState state);
2627

2728
#endif /* RESOLVER_H */

src/pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -999,7 +999,7 @@ MtmStartReceiver(int nodeId, Oid db_id, Oid user_id, pid_t monitor_pid)
999999
MemSet(&worker, 0, sizeof(BackgroundWorker));
10001000
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
10011001
worker.bgw_start_time = BgWorkerStart_ConsistentState;
1002-
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
1002+
worker.bgw_restart_time = 1;
10031003
worker.bgw_main_arg = Int32GetDatum(nodeId);
10041004
worker.bgw_notify_pid = monitor_pid;
10051005

src/resolver.c

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ ResolverStart(Oid db_id, Oid user_id)
132132
MemSet(&worker, 0, sizeof(BackgroundWorker));
133133
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
134134
worker.bgw_start_time = BgWorkerStart_ConsistentState;
135-
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
135+
worker.bgw_restart_time = 1;
136136

137137
memcpy(worker.bgw_extra, &db_id, sizeof(Oid));
138138
memcpy(worker.bgw_extra + sizeof(Oid), &user_id, sizeof(Oid));
@@ -223,6 +223,46 @@ ResolveAllTransactions(int n_all_nodes)
223223
ResolveTransactionsForNode(-1, n_all_nodes);
224224
}
225225

226+
void
227+
ResolveForRefereeWinner(int n_all_nodes)
228+
{
229+
HASH_SEQ_STATUS hash_seq;
230+
resolver_tx *tx;
231+
232+
mtm_log(LOG, "ResolveForRefereeWinner");
233+
234+
LWLockAcquire(resolver_state->lock, LW_EXCLUSIVE);
235+
load_tasks(-1, n_all_nodes);
236+
237+
hash_seq_init(&hash_seq, gid2tx);
238+
while ((tx = hash_seq_search(&hash_seq)) != NULL)
239+
{
240+
MtmTxState state = tx->state[Mtm->my_node_id - 1];
241+
bool found;
242+
243+
if (state == MtmTxPrepared)
244+
{
245+
FinishPreparedTransaction(tx->gid, false, true);
246+
mtm_log(ResolverTxFinish, "TXFINISH: %s aborted", tx->gid);
247+
hash_search(gid2tx, tx->gid, HASH_REMOVE, &found);
248+
Assert(found);
249+
}
250+
else if (state == MtmTxPreCommited)
251+
{
252+
FinishPreparedTransaction(tx->gid, true, true);
253+
mtm_log(ResolverTxFinish, "TXFINISH: %s committed", tx->gid);
254+
hash_search(gid2tx, tx->gid, HASH_REMOVE, &found);
255+
Assert(found);
256+
}
257+
else
258+
{
259+
Assert(false);
260+
}
261+
}
262+
263+
LWLockRelease(resolver_state->lock);
264+
}
265+
226266
char *
227267
MtmTxStateMnem(MtmTxState state)
228268
{

src/state.c

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ MtmCheckState(void)
261261
break;
262262

263263
case MTM_RECOVERY:
264-
if (mtm_state->recovered)
264+
if (mtm_state->recovered || mtm_state->referee_grant)
265265
{
266266
MtmSetClusterStatus(MTM_RECOVERED);
267267

@@ -279,7 +279,8 @@ MtmCheckState(void)
279279
* in MTM_RECOVERED state.
280280
*/
281281
case MTM_RECOVERED:
282-
if (nReceivers == nEnabled && nSenders == nEnabled && nEnabled == nConnected - 1)
282+
if ((nReceivers == nEnabled && nSenders == nEnabled && nEnabled == nConnected - 1)
283+
|| mtm_state->referee_grant)
283284
{
284285
MtmSetClusterStatus(MTM_ONLINE);
285286

@@ -605,8 +606,6 @@ MtmRefreshClusterStatus()
605606
// MtmCheckState();
606607
// MtmUnlock();
607608

608-
return;
609-
610609
/*
611610
* Check for referee decision when only half of nodes are visible.
612611
* Do not hold lock here, but recheck later whether mask changed.
@@ -642,8 +641,9 @@ MtmRefreshClusterStatus()
642641
if (popcount(mtm_state->connected_mask) == 1)
643642
{
644643
// MtmPollStatusOfPreparedTransactions(true);
645-
ResolveAllTransactions(popcount(mtm_state->configured_mask));
644+
ResolveForRefereeWinner(popcount(mtm_state->configured_mask));
646645
}
646+
mtm_state->recovered = true;
647647
MtmEnableNode(Mtm->my_node_id);
648648
MtmCheckState();
649649
}
@@ -659,7 +659,7 @@ MtmRefreshClusterStatus()
659659
* can get refereeGrant before start of walsender, so it starts in recovered mode.
660660
*/
661661
if (MtmRefereeConnStr && *MtmRefereeConnStr && mtm_state->referee_winner_id &&
662-
popcount(mtm_state->connected_mask) == popcount(mtm_state->configured_mask) &&
662+
popcount(mtm_state->enabled_mask) == popcount(mtm_state->configured_mask) &&
663663
MtmGetCurrentStatus() == MTM_ONLINE) /* restrict this actions only to major -> online transition */
664664
{
665665
if (MtmRefereeClearWinner())
@@ -670,6 +670,8 @@ MtmRefreshClusterStatus()
670670
}
671671
}
672672

673+
return;
674+
673675
// Mtm->clique = (((nodemask_t)1 << Mtm->nAllNodes) - 1);
674676
// return;
675677

@@ -1143,7 +1145,7 @@ MtmMonitorStart(Oid db_id, Oid user_id)
11431145
MemSet(&worker, 0, sizeof(BackgroundWorker));
11441146
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
11451147
worker.bgw_start_time = BgWorkerStart_ConsistentState;
1146-
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
1148+
worker.bgw_restart_time = 1;
11471149
worker.bgw_main_arg = Int32GetDatum(0);
11481150

11491151
memcpy(worker.bgw_extra, &db_id, sizeof(Oid));
@@ -1609,6 +1611,8 @@ MtmMonitor(Datum arg)
16091611
// XXX: add tx start/stop to clear mcxt?
16101612
check_status_requests(mtm_cfg);
16111613

1614+
MtmRefreshClusterStatus();
1615+
16121616
rc = WaitLatch(MyLatch,
16131617
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
16141618
1000, PG_WAIT_EXTENSION);

tests/docker-entrypoint.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ if [ "$1" = 'postgres' ]; then
2525
max_worker_processes = 50
2626
max_replication_slots = 10
2727
max_wal_senders = 10
28-
# log_statement = all
28+
log_statement = all
2929
3030
shared_preload_libraries = 'multimaster'
3131
multimaster.volkswagen_mode = on
@@ -71,8 +71,8 @@ if [ "$1" = 'postgres' ]; then
7171
EOSQL
7272
echo
7373

74-
psql -U `whoami` $POSTGRES_DB -c 'CREATE EXTENSION multimaster;';
75-
psql -U `whoami` $POSTGRES_DB -c "select mtm.init_node($NODE_ID, '{$CONNSTRS}');"
74+
# psql -U `whoami` $POSTGRES_DB -c 'CREATE EXTENSION multimaster;';
75+
# psql -U `whoami` $POSTGRES_DB -c "select mtm.init_node($NODE_ID, '{$CONNSTRS}');"
7676

7777
pg_ctl -D "$PGDATA" -m fast -w stop
7878
fi

tests/lib/bank_client.py

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,11 @@ def __init__(self, dsns, n_accounts=100000):
8383

8484
self.total = 0
8585
self.aggregates = [{} for e in dsns]
86-
keep_trying(40, 1, self.initdb, 'self.initdb')
86+
keep_trying(40, 1, self.create_extension, 'self.create_extension')
87+
keep_trying(40, 1, self.await_nodes, 'self.await_nodes')
88+
89+
self.initdb()
90+
8791
self.nodes_state_fields = ["id", "disabled", "disconnected", "catchUp", "slotLag",
8892
"avgTransDelay", "lastStatusChange", "oldestSnapshot", "SenderPid",
8993
"SenderStartTime ", "ReceiverPid", "ReceiverStartTime", "connStr"]
@@ -116,8 +120,6 @@ def __init__(self, dsns, n_accounts=100000):
116120
def initdb(self):
117121
conn = psycopg2.connect(self.dsns[0])
118122
cur = conn.cursor()
119-
# cur.execute('create extension if not exists multimaster')
120-
conn.commit()
121123
cur.execute('drop table if exists bank_test')
122124
cur.execute('create table bank_test(uid int primary key, amount int)')
123125
cur.execute('create table insert_test(id text primary key)')
@@ -138,16 +140,39 @@ def execute(self, node_id, statements):
138140
cur.close()
139141
con.close()
140142

143+
def await_nodes(self):
144+
print("await_nodes")
145+
146+
for dsn in self.dsns:
147+
con = psycopg2.connect(dsn)
148+
con.autocommit = True
149+
cur = con.cursor()
150+
cur.execute('select 1')
151+
cur.close()
152+
con.close()
153+
154+
141155
def create_extension(self):
142156

157+
print("create extension")
158+
143159
for dsn in self.dsns:
144160
con = psycopg2.connect(dsn)
145161
con.autocommit = True
146162
cur = con.cursor()
147-
cur.execute('create extension multimaster;')
163+
cur.execute('create extension if not exists multimaster')
148164
cur.close()
149165
con.close()
150166

167+
conn = psycopg2.connect(self.dsns[0])
168+
cur = conn.cursor()
169+
cur.execute("select mtm.init_cluster($$%s$$, $${%s}$$);" %
170+
("dbname=regression user=pg host=node1",
171+
'"dbname=regression user=pg host=node2"'))
172+
conn.commit()
173+
cur.close()
174+
conn.close()
175+
151176
def is_data_identic(self):
152177
hashes = set()
153178
hashes2 = set()
@@ -324,13 +349,13 @@ def total_tx(self, conn, cur, agg, conn_i):
324349
agg.isolation += 1
325350
print(datetime.datetime.utcnow(), 'Isolation error, total ', self.total, ' -> ', total[0], ', node ', conn_i+1)
326351
self.total = total[0]
327-
print(self.oops)
328-
yield from cur.execute('select * from pg_prepared_xacts order by prepared;')
329-
pxacts = yield from cur.fetchall()
330-
for pxact in pxacts:
331-
for i, col in enumerate(["transaction", "gid", "prepared", "owner", "database", "state3pc"]):
332-
print(pxact[i], end="\t")
333-
print("\n")
352+
# print(self.oops)
353+
# yield from cur.execute('select * from pg_prepared_xacts order by prepared;')
354+
# pxacts = yield from cur.fetchall()
355+
# for pxact in pxacts:
356+
# for i, col in enumerate(["transaction", "gid", "prepared", "owner", "database", "state3pc"]):
357+
# print(pxact[i], end="\t")
358+
# print("\n")
334359

335360
def run(self):
336361
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

tests/test_referee.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ def test_saved_referee_decision(self):
179179
print('###########################')
180180
docker_api.containers.get('node2').start()
181181
self.awaitOnline("dbname=regression user=postgres host=127.0.0.1 port=15433")
182+
self.awaitOnline("dbname=regression user=postgres host=127.0.0.1 port=15432")
182183

183184
self.client.bgrun()
184185
time.sleep(3)
@@ -316,6 +317,7 @@ def test_consequent_shutdown(self):
316317
self.client.stop()
317318
docker_api.containers.get('node2').start()
318319
self.awaitOnline("dbname=regression user=postgres host=127.0.0.1 port=15433")
320+
self.awaitOnline("dbname=regression user=postgres host=127.0.0.1 port=15432")
319321
self.client.bgrun()
320322
time.sleep(3)
321323

0 commit comments

Comments
 (0)