Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 71924e3

Browse files
knizhnikkelvich
authored andcommitted
Detect node disconnection in case of pglogical_receiver failures
1 parent 20cbd9c commit 71924e3

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1289,7 +1289,7 @@ void MtmCheckQuorum(void)
12891289
}
12901290
} else {
12911291
if (Mtm->status == MTM_IN_MINORITY) {
1292-
MTM_LOG1("Node is in majority: dissbled mask %lx", (long) Mtm->disabledNodeMask);
1292+
MTM_LOG1("Node is in majority: disabled mask %lx", (long) Mtm->disabledNodeMask);
12931293
MtmSwitchClusterMode(MTM_ONLINE);
12941294
}
12951295
}

pglogical_receiver.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ static char const* const MtmReplicationModeName[] =
194194
{
195195
"recovered", /* SLOT_CREATE_NEW: recovery of node is completed so drop old slot and restart replication from the current position in WAL */
196196
"recovery", /* SLOT_OPEN_EXISTED: perform recorvery of the node by applying all data from theslot from specified point */
197-
"normal" /* SLOT_OPEN_ALWAYS: normal mode: use existeed slot or create new one and start receiving data from it from the specified position */
197+
"normal" /* SLOT_OPEN_ALWAYS: normal mode: use existed slot or create new one and start receiving data from it from the specified position */
198198
};
199199

200200
static void
@@ -248,6 +248,7 @@ pglogical_receiver_main(Datum main_arg)
248248
PQfinish(conn);
249249
ereport(WARNING, (errmsg("%s: Could not establish connection to remote server",
250250
worker_proc)));
251+
/* Do not make decision about node status here because at startup peer node may just no yet started */
251252
/* MtmOnNodeDisconnect(nodeId); */
252253
proc_exit(1);
253254
}
@@ -271,6 +272,7 @@ pglogical_receiver_main(Datum main_arg)
271272
PQclear(res);
272273
ereport(ERROR, (errmsg("%s: Could not create logical slot",
273274
worker_proc)));
275+
MtmOnNodeDisconnect(nodeId);
274276
proc_exit(1);
275277
}
276278
}
@@ -312,6 +314,7 @@ pglogical_receiver_main(Datum main_arg)
312314
PQclear(res);
313315
ereport(WARNING, (errmsg("%s: Could not start logical replication",
314316
worker_proc)));
317+
MtmOnNodeDisconnect(nodeId);
315318
proc_exit(1);
316319
}
317320
PQclear(res);
@@ -402,6 +405,7 @@ pglogical_receiver_main(Datum main_arg)
402405
{
403406
ereport(LOG, (errmsg("%s: streaming header too small: %d",
404407
worker_proc, rc)));
408+
MtmOnNodeDisconnect(nodeId);
405409
proc_exit(1);
406410
}
407411
replyRequested = copybuf[pos];
@@ -421,15 +425,18 @@ pglogical_receiver_main(Datum main_arg)
421425
int64 now = feGetCurrentTimestamp();
422426

423427
/* Leave is feedback is not sent properly */
424-
if (!sendFeedback(conn, now, nodeId))
428+
if (!sendFeedback(conn, now, nodeId)) {
429+
MtmOnNodeDisconnect(nodeId);
425430
proc_exit(1);
431+
}
426432
}
427433
continue;
428434
}
429435
else if (copybuf[0] != 'w')
430436
{
431437
ereport(LOG, (errmsg("%s: Incorrect streaming header",
432438
worker_proc)));
439+
MtmOnNodeDisconnect(nodeId);
433440
proc_exit(1);
434441
}
435442

@@ -538,6 +545,7 @@ pglogical_receiver_main(Datum main_arg)
538545
{
539546
ereport(LOG, (errmsg("%s: Incorrect status received... Leaving.",
540547
worker_proc)));
548+
MtmOnNodeDisconnect(nodeId);
541549
proc_exit(1);
542550
}
543551

@@ -546,6 +554,7 @@ pglogical_receiver_main(Datum main_arg)
546554
{
547555
ereport(LOG, (errmsg("%s: Data remaining on the socket... Leaving.",
548556
worker_proc)));
557+
MtmOnNodeDisconnect(nodeId);
549558
proc_exit(1);
550559
}
551560
continue;
@@ -564,6 +573,7 @@ pglogical_receiver_main(Datum main_arg)
564573
{
565574
ereport(LOG, (errmsg("%s: Failure while receiving changes...",
566575
worker_proc)));
576+
MtmOnNodeDisconnect(nodeId);
567577
proc_exit(1);
568578
}
569579
}

0 commit comments

Comments
 (0)