@@ -301,23 +301,16 @@ pglogical_receiver_main(Datum main_arg)
301
301
}
302
302
303
303
query = createPQExpBuffer ();
304
- if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm -> nodes [nodeId - 1 ].timeline )
305
- || mode == REPLMODE_CREATE_NEW )
306
- { /* recreate slot */
307
- timestamp_t start = MtmGetSystemTime ();
308
- appendPQExpBuffer (query , "DROP_REPLICATION_SLOT \"%s\"" , slotName );
309
- res = PQexec (conn , query -> data );
310
- elog (LOG , "Drop replication slot %s: %ld milliseconds" , slotName , (long )USEC_TO_MSEC (MtmGetSystemTime () - start ));
311
- PQclear (res );
312
- resetPQExpBuffer (query );
313
- timeline = Mtm -> nodes [nodeId - 1 ].timeline ;
314
- }
315
- /* My original assumption was that we can perfrom recovery only from existed slot,
316
- * but unfortunately looks like slots can "disapear" together with WAL-sender.
317
- * So let's try to recreate slot always. */
318
- /* if (mode != REPLMODE_REPLICATION) */
319
- {
320
- timestamp_t start = MtmGetSystemTime ();
304
+
305
+ /* Start logical replication at specified position */
306
+ originStartPos = replorigin_get_progress (originId , false);
307
+ if (originStartPos == INVALID_LSN ) {
308
+ /*
309
+ * We are just creating new replication slot.
310
+ * It is assumed that state of local and remote nodes is the same at this moment.
311
+ * They are either empty, or the new node is synchronized using base_backup.
312
+ * So we assume that LSNs are the same for local and remote node
313
+ */
321
314
appendPQExpBuffer (query , "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" , slotName , MULTIMASTER_NAME );
322
315
res = PQexec (conn , query -> data );
323
316
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
@@ -331,30 +324,14 @@ pglogical_receiver_main(Datum main_arg)
331
324
goto OnError ;
332
325
}
333
326
}
334
- elog (LOG , "Recreate replication slot %s: %ld milliseconds" , slotName , (long )USEC_TO_MSEC (MtmGetSystemTime () - start ));
335
327
PQclear (res );
336
328
resetPQExpBuffer (query );
337
- }
338
-
339
- /* Start logical replication at specified position */
340
- if (originStartPos == INVALID_LSN ) {
341
- originStartPos = replorigin_get_progress (originId , false);
342
- if (originStartPos == INVALID_LSN ) {
343
- /*
344
- * We are just creating new replication slot.
345
- * It is assumed that state of local and remote nodes is the same at this moment.
346
- * Them are either empty, either new node is synchronized using base_backup.
347
- * So we assume that LSNs are the same for local and remote node
348
- */
349
- originStartPos = INVALID_LSN ;
350
- MTM_LOG1 ("Start logical receiver at position %llx from node %d" , originStartPos , nodeId );
351
- } else {
352
- if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
353
- MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" , nodeId , Mtm -> nodes [nodeId - 1 ].restartLSN , originStartPos );
354
- Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
355
- }
356
- MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" , originStartPos , originId , nodeId );
329
+ } else {
330
+ if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
331
+ MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" , nodeId , Mtm -> nodes [nodeId - 1 ].restartLSN , originStartPos );
332
+ Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
357
333
}
334
+ MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" , originStartPos , originId , nodeId );
358
335
}
359
336
360
337
MTM_LOG1 ("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx" ,
@@ -373,10 +350,21 @@ pglogical_receiver_main(Datum main_arg)
373
350
res = PQexec (conn , query -> data );
374
351
if (PQresultStatus (res ) != PGRES_COPY_BOTH )
375
352
{
376
- PQclear (res );
377
- ereport (WARNING , (MTM_ERRMSG ("%s: Could not start logical replication" ,
378
- worker_proc )));
379
- goto OnError ;
353
+ int i , n_deleted_slots = 0 ;
354
+
355
+ elog (WARNING , "Can't find slot on node%d. Shutting down receiver." , nodeId );
356
+ Mtm -> nodes [nodeId - 1 ].slotDeleted = true;
357
+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
358
+ {
359
+ if (Mtm -> nodes [i ].slotDeleted )
360
+ n_deleted_slots ++ ;
361
+ }
362
+ if (n_deleted_slots == Mtm -> nAllNodes - 1 )
363
+ {
364
+ elog (WARNING , "All neighbour nopes have no replication slot for us. Exiting." );
365
+ kill (PostmasterPid , SIGTERM );
366
+ }
367
+ proc_exit (1 );
380
368
}
381
369
PQclear (res );
382
370
resetPQExpBuffer (query );
0 commit comments