Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 98f044d

Browse files
committed
Revert "apply tx in receiver itself during REPLMODE_RECOVERED"
This reverts commit 407f582.
1 parent ec97513 commit 98f044d

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

pglogical_receiver.c

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ static char const* const MtmReplicationModeName[] =
213213
};
214214

215215
static void
216-
MtmExecute(void* work, int size, MtmReplicationMode mode)
216+
MtmExecute(void* work, int size)
217217
{
218218
/* During recovery apply changes sequentially to preserve commit order */
219-
if (mode == REPLMODE_RECOVERY || mode == REPLMODE_RECOVERED)
219+
if (Mtm->status == MTM_RECOVERY)
220220
MtmExecutor(work, size);
221221
else
222222
BgwPoolExecute(&Mtm->nodes[MtmReplicationNodeId-1].pool, work, size);
@@ -309,8 +309,6 @@ pglogical_receiver_main(Datum main_arg)
309309
lsn_t originStartPos;
310310
int timeline;
311311

312-
ByteBufferReset(&buf);
313-
314312
/*
315313
* Determine when and how we should open replication slot.
316314
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -583,7 +581,7 @@ pglogical_receiver_main(Datum main_arg)
583581
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A' || stmt[1] == 'C'))) {
584582
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
585583
if (stmt[0] == 'M' && stmt[1] == 'C') { /* concurrent DDL should be executed by parallel workers */
586-
MtmExecute(stmt, msg_len, mode);
584+
MtmExecute(stmt, msg_len);
587585
} else {
588586
MtmExecutor(stmt, msg_len); /* all other messages can be processed by receiver itself */
589587
}
@@ -599,7 +597,7 @@ pglogical_receiver_main(Datum main_arg)
599597
pq_sendint(&spill_info, buf.used, 4);
600598
MtmSpillToFile(spill_file, buf.data, buf.used);
601599
MtmCloseSpillFile(spill_file);
602-
MtmExecute(spill_info.data, spill_info.len, mode);
600+
MtmExecute(spill_info.data, spill_info.len);
603601
spill_file = -1;
604602
resetStringInfo(&spill_info);
605603
} else {
@@ -614,7 +612,7 @@ pglogical_receiver_main(Datum main_arg)
614612
} else {
615613
/* all other commits should be applied in place */
616614
// Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT || stmt[1] == PGLOGICAL_PRECOMMIT_PREPARED);
617-
MtmExecute(buf.data, buf.used, mode);
615+
MtmExecute(buf.data, buf.used);
618616
}
619617
}
620618
} else if (spill_file >= 0) {

0 commit comments

Comments
 (0)