@@ -217,15 +217,14 @@ pglogical_receiver_main(Datum main_arg)
217
217
MtmReplicationMode mode ;
218
218
219
219
ByteBuffer buf ;
220
- RepOriginId originId ;
221
- char * originName ;
222
220
/* Buffer for COPY data */
223
221
char * copybuf = NULL ;
224
222
int spill_file = -1 ;
225
223
StringInfoData spill_info ;
226
224
char * slotName ;
227
225
char * connString = psprintf ("replication=database %s" , Mtm -> nodes [nodeId - 1 ].con .connStr );
228
226
static PortalData fakePortal ;
227
+ int i ;
229
228
230
229
MtmBackgroundWorker = true;
231
230
@@ -258,16 +257,27 @@ pglogical_receiver_main(Datum main_arg)
258
257
ActivePortal -> status = PORTAL_ACTIVE ;
259
258
ActivePortal -> sourceText = "" ;
260
259
261
- /* Create originid */
262
- StartTransactionCommand ();
263
- originName = psprintf (MULTIMASTER_SLOT_PATTERN , nodeId );
264
- originId = replorigin_by_name (originName , true);
265
- if (originId == InvalidRepOriginId ) {
266
- originId = replorigin_create (originName );
260
+ /*
261
+ * Set proper restartLsn for all origins
262
+ */
263
+ MtmLock (LW_EXCLUSIVE );
264
+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
265
+ {
266
+ char * originName ;
267
+ RepOriginId originId ;
268
+
269
+ StartTransactionCommand ();
270
+ originName = psprintf (MULTIMASTER_SLOT_PATTERN , i + 1 );
271
+ originId = replorigin_by_name (originName , true);
272
+ if (originId == InvalidRepOriginId ) {
273
+ originId = replorigin_create (originName );
274
+ }
275
+ CommitTransactionCommand ();
276
+ if (Mtm -> nodes [i ].restartLSN == INVALID_LSN )
277
+ Mtm -> nodes [i ].restartLSN = replorigin_get_progress (originId , true);
278
+ Mtm -> nodes [i ].originId = originId ;
267
279
}
268
- CommitTransactionCommand ();
269
- Mtm -> nodes [nodeId - 1 ].originId = originId ;
270
- Mtm -> nodes [nodeId - 1 ].restartLSN = INVALID_LSN ;
280
+ MtmUnlock ();
271
281
272
282
/* This is main loop of logical replication.
273
283
* In case of errors we will try to reestablish connection.
@@ -277,7 +287,7 @@ pglogical_receiver_main(Datum main_arg)
277
287
{
278
288
int count ;
279
289
ConnStatusType status ;
280
- lsn_t originStartPos = Mtm -> nodes [ nodeId - 1 ]. restartLSN ;
290
+ lsn_t originStartPos ;
281
291
int timeline ;
282
292
283
293
/*
@@ -308,7 +318,7 @@ pglogical_receiver_main(Datum main_arg)
308
318
query = createPQExpBuffer ();
309
319
310
320
/* Start logical replication at specified position */
311
- originStartPos = replorigin_get_progress (originId , false);
321
+ originStartPos = replorigin_get_progress (Mtm -> nodes [ nodeId - 1 ]. originId , false);
312
322
if (originStartPos == INVALID_LSN || Mtm -> nodes [nodeId - 1 ].manualRecovery ) {
313
323
/*
314
324
* We are just creating new replication slot.
@@ -337,7 +347,7 @@ pglogical_receiver_main(Datum main_arg)
337
347
MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" , nodeId , Mtm -> nodes [nodeId - 1 ].restartLSN , originStartPos );
338
348
Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
339
349
}
340
- MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" , originStartPos , originId , nodeId );
350
+ MTM_LOG1 ("Restart logical receiver at position %llx from node %d" , originStartPos , nodeId );
341
351
}
342
352
343
353
MTM_LOG1 ("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx" ,
0 commit comments