@@ -522,16 +522,18 @@ pglogical_receiver_main(Datum main_arg)
522
522
523
523
if (rc > hdr_len )
524
524
{
525
+ int msg_len = rc - hdr_len ;
525
526
stmt = copybuf + hdr_len ;
526
527
if (mode == REPLMODE_RECOVERED ) {
528
+ /* Ingore all incompleted transactions from recovered node */
527
529
if (stmt [0 ] != 'B' ) {
528
530
output_written_lsn = Max (walEnd , output_written_lsn );
529
531
continue ;
530
532
}
531
533
mode = REPLMODE_OPEN_EXISTED ;
532
534
}
533
535
MTM_LOG3 ("Receive message %c from node %d" , stmt [0 ], nodeId );
534
- if (buf .used >= MtmTransSpillThreshold * MB ) {
536
+ if (buf .used + msg_len + 1 >= MtmTransSpillThreshold * MB ) {
535
537
if (spill_file < 0 ) {
536
538
int file_id ;
537
539
spill_file = MtmCreateSpillFile (nodeId , & file_id );
@@ -548,15 +550,15 @@ pglogical_receiver_main(Datum main_arg)
548
550
if (stmt [0 ] == 'Z' || (stmt [0 ] == 'M' && (stmt [1 ] == 'L' || stmt [1 ] == 'A' || stmt [1 ] == 'C' ))) {
549
551
MTM_LOG3 ("Process '%c' message from %d" , stmt [1 ], nodeId );
550
552
if (stmt [0 ] == 'M' && stmt [1 ] == 'C' ) { /* concurrent DDL should be executed by parallel workers */
551
- MtmExecute (stmt , rc - hdr_len );
553
+ MtmExecute (stmt , msg_len );
552
554
} else {
553
- MtmExecutor (stmt , rc - hdr_len ); /* all other messages can be processed by receiver itself */
555
+ MtmExecutor (stmt , msg_len ); /* all other messages can be processed by receiver itself */
554
556
}
555
557
} else {
556
- ByteBufferAppend (& buf , stmt , rc - hdr_len );
558
+ ByteBufferAppend (& buf , stmt , msg_len );
557
559
if (stmt [0 ] == 'C' ) /* commit */
558
560
{
559
- if (!MtmFilterTransaction (stmt , rc - hdr_len ))
561
+ if (!MtmFilterTransaction (stmt , msg_len ))
560
562
{
561
563
if (spill_file >= 0 ) {
562
564
ByteBufferAppend (& buf , ")" , 1 );
@@ -568,7 +570,7 @@ pglogical_receiver_main(Datum main_arg)
568
570
spill_file = -1 ;
569
571
resetStringInfo (& spill_info );
570
572
} else {
571
- if (MtmPreserveCommitOrder && buf .used == rc - hdr_len ) {
573
+ if (MtmPreserveCommitOrder && buf .used == msg_len ) {
572
574
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
573
575
timestamp_t stop , start = MtmGetSystemTime ();
574
576
MtmExecutor (buf .data , buf .used );
0 commit comments