@@ -213,10 +213,10 @@ static char const* const MtmReplicationModeName[] =
213
213
};
214
214
215
215
static void
216
- MtmExecute (void * work , int size )
216
+ MtmExecute (void * work , int size , MtmReplicationMode mode )
217
217
{
218
218
/* During recovery apply changes sequentially to preserve commit order */
219
- if (Mtm -> status == MTM_RECOVERY )
219
+ if (mode == REPLMODE_RECOVERY || mode == REPLMODE_RECOVERED )
220
220
MtmExecutor (work , size );
221
221
else
222
222
BgwPoolExecute (& Mtm -> nodes [MtmReplicationNodeId - 1 ].pool , work , size );
@@ -309,6 +309,8 @@ pglogical_receiver_main(Datum main_arg)
309
309
lsn_t originStartPos ;
310
310
int timeline ;
311
311
312
+ ByteBufferReset (& buf );
313
+
312
314
/*
313
315
* Determine when and how we should open replication slot.
314
316
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -581,7 +583,7 @@ pglogical_receiver_main(Datum main_arg)
581
583
if (stmt [0 ] == 'Z' || (stmt [0 ] == 'M' && (stmt [1 ] == 'L' || stmt [1 ] == 'A' || stmt [1 ] == 'C' ))) {
582
584
MTM_LOG3 ("Process '%c' message from %d" , stmt [1 ], nodeId );
583
585
if (stmt [0 ] == 'M' && stmt [1 ] == 'C' ) { /* concurrent DDL should be executed by parallel workers */
584
- MtmExecute (stmt , msg_len );
586
+ MtmExecute (stmt , msg_len , mode );
585
587
} else {
586
588
MtmExecutor (stmt , msg_len ); /* all other messages can be processed by receiver itself */
587
589
}
@@ -597,7 +599,7 @@ pglogical_receiver_main(Datum main_arg)
597
599
pq_sendint (& spill_info , buf .used , 4 );
598
600
MtmSpillToFile (spill_file , buf .data , buf .used );
599
601
MtmCloseSpillFile (spill_file );
600
- MtmExecute (spill_info .data , spill_info .len );
602
+ MtmExecute (spill_info .data , spill_info .len , mode );
601
603
spill_file = -1 ;
602
604
resetStringInfo (& spill_info );
603
605
} else {
@@ -612,7 +614,7 @@ pglogical_receiver_main(Datum main_arg)
612
614
} else {
613
615
/* all other commits should be applied in place */
614
616
// Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT || stmt[1] == PGLOGICAL_PRECOMMIT_PREPARED);
615
- MtmExecute (buf .data , buf .used );
617
+ MtmExecute (buf .data , buf .used , mode );
616
618
}
617
619
}
618
620
} else if (spill_file >= 0 ) {
0 commit comments