Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 34f55fa

Browse files
committed
apply tx in receiver itself during REPLMODE_RECOVERED
1 parent 8bf872e commit 34f55fa

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

pglogical_receiver.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ static char const* const MtmReplicationModeName[] =
213213
};
214214

215215
static void
216-
MtmExecute(void* work, int size)
216+
MtmExecute(void* work, int size, MtmReplicationMode mode)
217217
{
218218
/* During recovery apply changes sequentially to preserve commit order */
219-
if (Mtm->status == MTM_RECOVERY)
219+
if (mode == REPLMODE_RECOVERY || mode == REPLMODE_RECOVERED)
220220
MtmExecutor(work, size);
221221
else
222222
BgwPoolExecute(&Mtm->nodes[MtmReplicationNodeId-1].pool, work, size);
@@ -309,6 +309,8 @@ pglogical_receiver_main(Datum main_arg)
309309
lsn_t originStartPos;
310310
int timeline;
311311

312+
ByteBufferReset(&buf);
313+
312314
/*
313315
* Determine when and how we should open replication slot.
314316
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -581,7 +583,7 @@ pglogical_receiver_main(Datum main_arg)
581583
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A' || stmt[1] == 'C'))) {
582584
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
583585
if (stmt[0] == 'M' && stmt[1] == 'C') { /* concurrent DDL should be executed by parallel workers */
584-
MtmExecute(stmt, msg_len);
586+
MtmExecute(stmt, msg_len, mode);
585587
} else {
586588
MtmExecutor(stmt, msg_len); /* all other messages can be processed by receiver itself */
587589
}
@@ -597,7 +599,7 @@ pglogical_receiver_main(Datum main_arg)
597599
pq_sendint(&spill_info, buf.used, 4);
598600
MtmSpillToFile(spill_file, buf.data, buf.used);
599601
MtmCloseSpillFile(spill_file);
600-
MtmExecute(spill_info.data, spill_info.len);
602+
MtmExecute(spill_info.data, spill_info.len, mode);
601603
spill_file = -1;
602604
resetStringInfo(&spill_info);
603605
} else {
@@ -612,7 +614,7 @@ pglogical_receiver_main(Datum main_arg)
612614
} else {
613615
/* all other commits should be applied in place */
614616
// Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT || stmt[1] == PGLOGICAL_PRECOMMIT_PREPARED);
615-
MtmExecute(buf.data, buf.used);
617+
MtmExecute(buf.data, buf.used, mode);
616618
}
617619
}
618620
} else if (spill_file >= 0) {

0 commit comments

Comments
 (0)