@@ -330,6 +330,16 @@ pglogical_receiver_main(Datum main_arg)
330
330
timeline = Mtm -> nodes [nodeId - 1 ].timeline ;
331
331
count = Mtm -> recoveryCount ;
332
332
333
+
334
+ /* Set proper restartLSN for filtering */
335
+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
336
+ {
337
+ if (i == MtmNodeId - 1 )
338
+ continue ;
339
+ Mtm -> nodes [i ].restartLSN = replorigin_get_progress (Mtm -> nodes [i ].originId , false);
340
+ MTM_LOG1 ("Node %d restartLSN -> %llx" , i + 1 , Mtm -> nodes [i ].restartLSN );
341
+ }
342
+
333
343
/* Establish connection to remote server */
334
344
conn = PQconnectdb_safe (connString , 0 );
335
345
status = PQstatus (conn );
@@ -343,7 +353,7 @@ pglogical_receiver_main(Datum main_arg)
343
353
query = createPQExpBuffer ();
344
354
345
355
/* Start logical replication at specified position */
346
- originStartPos = replorigin_get_progress ( Mtm -> nodes [nodeId - 1 ].originId , false) ;
356
+ originStartPos = Mtm -> nodes [nodeId - 1 ].restartLSN ;
347
357
if (originStartPos == INVALID_LSN || Mtm -> nodes [nodeId - 1 ].manualRecovery ) {
348
358
/*
349
359
* We are just creating new replication slot.
@@ -367,12 +377,6 @@ pglogical_receiver_main(Datum main_arg)
367
377
PQclear (res );
368
378
resetPQExpBuffer (query );
369
379
Mtm -> nodes [nodeId - 1 ].manualRecovery = false;
370
- } else {
371
- if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
372
- MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" , nodeId , Mtm -> nodes [nodeId - 1 ].restartLSN , originStartPos );
373
- Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
374
- }
375
- MTM_LOG1 ("Restart logical receiver at position %llx from node %d" , originStartPos , nodeId );
376
380
}
377
381
378
382
MTM_LOG1 ("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx" ,
0 commit comments