Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit dd9b26b

Browse files
committed
Finish recovery after we sure that all forwarded transactions are applied up to some lsn
1 parent 6f0d2c9 commit dd9b26b

10 files changed

+102
-42
lines changed

Cluster.pm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ sub configure
159159
multimaster.max_nodes = 6
160160
# XXX try without ignore_tables_without_pk
161161
multimaster.ignore_tables_without_pk = true
162-
multimaster.volkswagen_mode = 1
162+
# multimaster.volkswagen_mode = 1
163163
log_line_prefix = '%t [%p]: '
164164
));
165165

src/include/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ extern bool MtmBackgroundWorker;
162162
extern char* MtmRefereeConnStr;
163163

164164
extern LWLock *MtmCommitBarrier;
165+
extern LWLock *MtmReceiverBarrier;
165166

166167
extern void MtmXactCallback2(XactEvent event, void *arg);
167168
extern bool MtmIsUserTransaction(void);
@@ -194,5 +195,6 @@ extern void MtmUpdateControlFile(void);
194195
extern void MtmCheckSlots(void);
195196

196197
extern TimestampTz MtmGetIncreasingTimestamp(void);
198+
extern bool MtmAllApplyWorkersFinished(void);
197199

198200
#endif

src/include/pglogical_output.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ typedef struct HookFuncName
4343

4444
typedef struct MtmDecoderPrivate
4545
{
46-
int magic; // XXX
4746
bool is_recovery;
47+
bool recovery_done;
4848
int64 session_id;
4949
} MtmDecoderPrivate;
5050

src/multimaster.c

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ static TransactionManager MtmTM =
124124
};
125125

126126
LWLock *MtmCommitBarrier;
127+
LWLock *MtmReceiverBarrier;
127128

128129
bool MtmDoReplication;
129130
char* MtmDatabaseName;
@@ -601,6 +602,7 @@ static void MtmInitialize()
601602
RegisterXactCallback(MtmXactCallback2, NULL);
602603

603604
MtmCommitBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[MtmMaxNodes*2+1].lock);
605+
MtmReceiverBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[MtmMaxNodes*2+2].lock);
604606

605607
MtmDoReplication = true;
606608
TM = &MtmTM;
@@ -1188,7 +1190,7 @@ _PG_init(void)
11881190
* resources in mtm_shmem_startup().
11891191
*/
11901192
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmMaxNodes*MtmQueueSize + sizeof(MtmTime));
1191-
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2 + 1);
1193+
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2 + 2);
11921194

11931195
MtmMonitorInitialize();
11941196

@@ -1837,3 +1839,28 @@ MtmWaitForExtensionCreation(void)
18371839
MtmSleep(USECS_PER_SEC);
18381840
}
18391841
}
1842+
1843+
bool
1844+
MtmAllApplyWorkersFinished()
1845+
{
1846+
int i;
1847+
1848+
for (i = 0; i < Mtm->nAllNodes; i++)
1849+
{
1850+
volatile int ntasks;
1851+
1852+
if (i == MtmNodeId - 1)
1853+
continue;
1854+
1855+
SpinLockAcquire(&Mtm->nodes[i].pool.lock);
1856+
ntasks = Mtm->nodes[i].pool.active + Mtm->nodes[i].pool.pending;
1857+
SpinLockRelease(&Mtm->nodes[i].pool.lock);
1858+
1859+
if (ntasks != 0)
1860+
false;
1861+
}
1862+
1863+
return true;
1864+
}
1865+
1866+

src/pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx)
502502
char const* messageBody = pq_getmsgbytes(s, messageSize);
503503
bool standalone = false;
504504

505-
MtmBeginSession(MtmReplicationNodeId);
505+
MtmBeginSession(receiver_ctx->node_id);
506506

507507
switch (action)
508508
{

src/pglogical_output.c

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -470,16 +470,31 @@ pg_decode_caughtup(LogicalDecodingContext *ctx)
470470
*/
471471
if (data->api && hooks_data->is_recovery)
472472
{
473-
MtmOutputPluginPrepareWrite(ctx, true, true);
474-
data->api->write_caughtup(ctx->out, data, ctx->reader->EndRecPtr);
475-
MtmOutputPluginWrite(ctx, true, true);
476473

477474
/*
478-
* This hook can be called mupltiple times when there is concurrent
479-
* load, so exit right after we wrote recovery message first time during
480-
* current recovery session.
475+
* Create safe point in our WAL so that recovered node will not lose
476+
* any transactions during switch to normal mode.
481477
*/
482-
proc_exit(0);
478+
if (!hooks_data->recovery_done)
479+
{
480+
LWLockAcquire(MtmReceiverBarrier, LW_EXCLUSIVE);
481+
hooks_data->recovery_done = true;
482+
}
483+
else if (MtmAllApplyWorkersFinished())
484+
{
485+
MtmOutputPluginPrepareWrite(ctx, true, true);
486+
data->api->write_caughtup(ctx->out, data, ctx->reader->EndRecPtr);
487+
MtmOutputPluginWrite(ctx, true, true);
488+
489+
LWLockRelease(MtmReceiverBarrier);
490+
491+
/*
492+
* This hook can be called mupltiple times when there is concurrent
493+
* load, so exit right after we wrote recovery message first time during
494+
* current recovery session.
495+
*/
496+
proc_exit(0);
497+
}
483498
}
484499
}
485500

src/pglogical_proto.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
708708
hooks_data = (MtmDecoderPrivate *) palloc0(sizeof(MtmDecoderPrivate));
709709
args->private_data = hooks_data;
710710
hooks_data->session_id = 0;
711+
hooks_data->recovery_done = false;
712+
hooks_data->is_recovery = false;
711713

712714
Mtm->nodes[MtmReplicationNodeId-1].senderPid = MyProcPid;
713715

src/pglogical_receiver.c

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -252,23 +252,24 @@ MtmUpdateLsnMapping(int node_id, lsn_t end_lsn)
252252
lsn_t local_flush = GetFlushRecPtr();
253253
MemoryContext old_context = MemoryContextSwitchTo(TopMemoryContext);
254254

255-
if (end_lsn != INVALID_LSN) {
255+
if (end_lsn != INVALID_LSN)
256+
{
256257
/* Track commit lsn */
257258
flushpos = (MtmFlushPosition *) palloc(sizeof(MtmFlushPosition));
258259
flushpos->node_id = node_id;
259260
flushpos->local_end = XactLastCommitEnd;
260261
flushpos->remote_end = end_lsn;
261262
dlist_push_tail(&MtmLsnMapping, &flushpos->node);
262263
}
264+
263265
MtmLock(LW_EXCLUSIVE);
264266
dlist_foreach_modify(iter, &MtmLsnMapping)
265267
{
266268
flushpos = dlist_container(MtmFlushPosition, node, iter.cur);
267269
if (flushpos->local_end <= local_flush)
268270
{
269-
if (Mtm->nodes[node_id-1].flushPos < flushpos->remote_end) {
271+
if (Mtm->nodes[node_id-1].flushPos < flushpos->remote_end)
270272
Mtm->nodes[node_id-1].flushPos = flushpos->remote_end;
271-
}
272273
dlist_delete(iter.cur);
273274
pfree(flushpos);
274275
} else {
@@ -280,15 +281,23 @@ MtmUpdateLsnMapping(int node_id, lsn_t end_lsn)
280281
}
281282

282283
static void
283-
MtmExecute(void* work, int size, MtmReceiverContext *receiver_ctx)
284+
MtmExecute(void* work, int size, MtmReceiverContext *receiver_ctx, bool cddl)
284285
{
285286
/* parallel_allowed should never be set during recovery */
286287
Assert( !(receiver_ctx->is_recovery && receiver_ctx->parallel_allowed) );
287288

288-
if (receiver_ctx->is_recovery || !receiver_ctx->parallel_allowed)
289+
// end_lsn
290+
291+
LWLockAcquire(MtmReceiverBarrier, LW_SHARED);
292+
293+
if (receiver_ctx->is_recovery || !receiver_ctx->parallel_allowed || cddl)
289294
MtmExecutor(work, size, receiver_ctx);
290295
else
291296
BgwPoolExecute(&Mtm->nodes[MtmReplicationNodeId-1].pool, work, size, receiver_ctx);
297+
298+
/* Our error handler can release lock */
299+
if (LWLockHeldByMe(MtmReceiverBarrier))
300+
LWLockRelease(MtmReceiverBarrier);
292301
}
293302

294303
/*
@@ -766,10 +775,15 @@ pglogical_receiver_main(Datum main_arg)
766775
ByteBufferReset(&buf);
767776
}
768777
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'P' || stmt[1] == 'C'))) {
769-
if (stmt[0] == 'M' && stmt[1] == 'C') { /* concurrent DDL should be executed by parallel workers */
770-
MtmExecute(stmt, msg_len, &receiver_ctx);
771-
} else {
772-
MtmExecutor(stmt, msg_len, &receiver_ctx); /* all other messages can be processed by receiver itself */
778+
if (stmt[0] == 'M' && stmt[1] == 'C')
779+
{
780+
/* concurrent DDL should be executed by parallel workers */
781+
MtmExecute(stmt, msg_len, &receiver_ctx, true);
782+
}
783+
else
784+
{
785+
/* all other messages should be processed by receiver itself */
786+
MtmExecute(stmt, msg_len, &receiver_ctx, false);
773787
}
774788
} else {
775789
ByteBufferAppend(&buf, stmt, msg_len);
@@ -783,23 +797,13 @@ pglogical_receiver_main(Datum main_arg)
783797
pq_sendint(&spill_info, buf.used, 4);
784798
MtmSpillToFile(spill_file, buf.data, buf.used);
785799
MtmCloseSpillFile(spill_file);
786-
MtmExecute(spill_info.data, spill_info.len, &receiver_ctx);
800+
MtmExecute(spill_info.data, spill_info.len, &receiver_ctx, false);
787801
spill_file = -1;
788802
resetStringInfo(&spill_info);
789-
} else {
790-
if (MtmPreserveCommitOrder && buf.used == msg_len) {
791-
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
792-
// timestamp_t stop, start = MtmGetSystemTime();
793-
MtmExecutor(buf.data, buf.used, &receiver_ctx);
794-
// stop = MtmGetSystemTime();
795-
// if (stop - start > USECS_PER_SEC) {
796-
// elog(WARNING, "Commit of prepared transaction takes %lld usec, flags=%x", stop - start, stmt[1]);
797-
// }
798-
} else {
799-
/* all other commits should be applied in place */
800-
// Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT || stmt[1] == PGLOGICAL_PRECOMMIT_PREPARED);
801-
MtmExecute(buf.data, buf.used, &receiver_ctx);
802-
}
803+
}
804+
else
805+
{
806+
MtmExecute(buf.data, buf.used, &receiver_ctx, false);
803807
}
804808
} else if (spill_file >= 0) {
805809
MtmCloseSpillFile(spill_file);

tests/lib/bank_client.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,16 @@ def no_prepared_tx(self):
180180
print("n_prepared = %d" % (n_prepared))
181181
return (n_prepared)
182182

183+
def list_prepared(self, node_id):
184+
con = psycopg2.connect(self.dsns[node_id] + " application_name=mtm_admin")
185+
cur = con.cursor()
186+
cur.execute('select * from pg_prepared_xacts order by prepared;')
187+
for pxact in cur.fetchall():
188+
for i, col in enumerate(["transaction", "gid", "prepared", "owner", "database", "state3pc"]):
189+
print(pxact[i], end="\t")
190+
print("\n")
191+
print("----\n")
192+
183193
def insert_counts(self):
184194
counts = []
185195

@@ -308,13 +318,12 @@ def total_tx(self, conn, cur, agg, conn_i):
308318
print(datetime.datetime.utcnow(), 'Isolation error, total ', self.total, ' -> ', total[0], ', node ', conn_i+1)
309319
self.total = total[0]
310320
print(self.oops)
311-
# yield from cur.execute('select * from mtm.get_nodes_state()')
312-
# nodes_state = yield from cur.fetchall()
313-
# for i, col in enumerate(self.nodes_state_fields):
314-
# print("%17s" % col, end="\t")
315-
# for j in range(3):
316-
# print("%19s" % nodes_state[j][i], end="\t")
317-
# print("\n")
321+
yield from cur.execute('select * from pg_prepared_xacts order by prepared;')
322+
pxacts = yield from cur.fetchall()
323+
for pxact in pxacts:
324+
for i, col in enumerate(["transaction", "gid", "prepared", "owner", "database", "state3pc"]):
325+
print(pxact[i], end="\t")
326+
print("\n")
318327

319328
def run(self):
320329
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

tests/lib/test_helper.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def awaitCommit(self, node_id):
4444
if ('commit' in aggs[node_id]['transfer']['finish'] and
4545
aggs[node_id]['transfer']['finish']['commit'] > 10):
4646
break
47+
self.client.list_prepared(node_id)
4748
time.sleep(5)
4849
total_sleep += 5
4950

0 commit comments

Comments
 (0)