Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7d24637

Browse files
committed
Do not delete slots upon reconnect
1 parent d4ef72d commit 7d24637

File tree

4 files changed

+45
-51
lines changed

4 files changed

+45
-51
lines changed

contrib/mmts/multimaster.c

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,11 @@ MtmAdjustOldestXid(TransactionId xid)
772772
Mtm->transListHead = prev;
773773
}
774774
}
775+
776+
if (!MyReplicationSlot) {
777+
MtmCheckSlots();
778+
}
779+
775780
return xid;
776781
}
777782

@@ -1551,9 +1556,6 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15511556
MtmUnlock();
15521557

15531558
MtmResetTransaction();
1554-
if (!MyReplicationSlot) {
1555-
MtmCheckSlots();
1556-
}
15571559
if (MtmClusterLocked) {
15581560
MtmUnlockCluster();
15591561
}
@@ -2728,6 +2730,7 @@ static void MtmInitialize()
27282730
Mtm->nodes[i].originId = InvalidRepOriginId;
27292731
Mtm->nodes[i].timeline = 0;
27302732
Mtm->nodes[i].nHeartbeats = 0;
2733+
Mtm->nodes[i].slotDeleted = false;
27312734
}
27322735
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
27332736
/* All transaction originated from the current node should be ignored during recovery */
@@ -3791,8 +3794,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
37913794
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
37923795
MTM_LOG1("Advance restartLSN for node %d from %llx to %llx (MtmReplicationStartupHook)",
37933796
MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
3794-
Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == INVALID_LSN
3795-
|| recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
3797+
// Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == INVALID_LSN
3798+
// || recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
37963799
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
37973800
}
37983801
} else {

contrib/mmts/multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,9 @@ typedef struct
235235
int lockGraphAllocated;
236236
int lockGraphUsed;
237237
uint64 nHeartbeats;
238+
bool slotDeleted; /* Signals that the node has already deleted our slot, so
239+
* recovery from that node isn't possible.
240+
*/
238241
} MtmNodeInfo;
239242

240243
typedef struct MtmL2List

contrib/mmts/pglogical_receiver.c

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -299,23 +299,16 @@ pglogical_receiver_main(Datum main_arg)
299299
}
300300

301301
query = createPQExpBuffer();
302-
if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm->nodes[nodeId-1].timeline)
303-
|| mode == REPLMODE_CREATE_NEW)
304-
{ /* recreate slot */
305-
timestamp_t start = MtmGetSystemTime();
306-
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
307-
res = PQexec(conn, query->data);
308-
elog(LOG, "Drop replication slot %s: %ld milliseconds", slotName, (long)USEC_TO_MSEC(MtmGetSystemTime() - start));
309-
PQclear(res);
310-
resetPQExpBuffer(query);
311-
timeline = Mtm->nodes[nodeId-1].timeline;
312-
}
313-
/* My original assumption was that we can perfrom recovery only from existed slot,
314-
* but unfortunately looks like slots can "disapear" together with WAL-sender.
315-
* So let's try to recreate slot always. */
316-
/* if (mode != REPLMODE_REPLICATION) */
317-
{
318-
timestamp_t start = MtmGetSystemTime();
302+
303+
/* Start logical replication at specified position */
304+
originStartPos = replorigin_get_progress(originId, false);
305+
if (originStartPos == INVALID_LSN) {
306+
/*
307+
* We are just creating a new replication slot.
308+
* It is assumed that the state of the local and remote nodes is the same at this moment.
309+
* They are either empty, or the new node has been synchronized using base_backup.
310+
* So we assume that the LSNs are the same for the local and remote nodes.
311+
*/
319312
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", slotName, MULTIMASTER_NAME);
320313
res = PQexec(conn, query->data);
321314
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -329,30 +322,14 @@ pglogical_receiver_main(Datum main_arg)
329322
goto OnError;
330323
}
331324
}
332-
elog(LOG, "Recreate replication slot %s: %ld milliseconds", slotName, (long)USEC_TO_MSEC(MtmGetSystemTime() - start));
333325
PQclear(res);
334326
resetPQExpBuffer(query);
335-
}
336-
337-
/* Start logical replication at specified position */
338-
if (originStartPos == INVALID_LSN) {
339-
originStartPos = replorigin_get_progress(originId, false);
340-
if (originStartPos == INVALID_LSN) {
341-
/*
342-
* We are just creating new replication slot.
343-
* It is assumed that state of local and remote nodes is the same at this moment.
344-
* Them are either empty, either new node is synchronized using base_backup.
345-
* So we assume that LSNs are the same for local and remote node
346-
*/
347-
originStartPos = INVALID_LSN;
348-
MTM_LOG1("Start logical receiver at position %llx from node %d", originStartPos, nodeId);
349-
} else {
350-
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
351-
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
352-
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
353-
}
354-
MTM_LOG1("Restart logical receiver at position %llx with origin=%d from node %d", originStartPos, originId, nodeId);
327+
} else {
328+
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
329+
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
330+
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
355331
}
332+
MTM_LOG1("Restart logical receiver at position %llx with origin=%d from node %d", originStartPos, originId, nodeId);
356333
}
357334

358335
MTM_LOG1("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx",
@@ -371,10 +348,21 @@ pglogical_receiver_main(Datum main_arg)
371348
res = PQexec(conn, query->data);
372349
if (PQresultStatus(res) != PGRES_COPY_BOTH)
373350
{
374-
PQclear(res);
375-
ereport(WARNING, (MTM_ERRMSG("%s: Could not start logical replication",
376-
worker_proc)));
377-
goto OnError;
351+
int i, n_deleted_slots = 0;
352+
353+
elog(WARNING, "Can't find slot on node%d. Shutting down receiver.", nodeId);
354+
Mtm->nodes[nodeId-1].slotDeleted = true;
355+
for (i = 0; i < Mtm->nAllNodes; i++)
356+
{
357+
if (Mtm->nodes[i].slotDeleted)
358+
n_deleted_slots++;
359+
}
360+
if (n_deleted_slots == Mtm->nAllNodes - 1)
361+
{
362+
elog(WARNING, "All neighbour nopes have no replication slot for us. Exiting.");
363+
kill(PostmasterPid, SIGTERM);
364+
}
365+
proc_exit(1);
378366
}
379367
PQclear(res);
380368
resetPQExpBuffer(query);

contrib/mmts/t/004_recovery.pl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@
1414
$cluster->pgbench(0, ('-i', -s => '10') );
1515

1616
# kill node while neighbour is under load
17-
my $pgb_handle = $cluster->pgbench_async(0, ('-N', -T => '10') );
17+
my $pgb_handle = $cluster->pgbench_async(1, ('-N', -T => '10') );
1818
sleep(5);
1919
$cluster->{nodes}->[2]->stop('fast');
2020
$cluster->pgbench_await($pgb_handle);
2121

2222
# start node while neighbour is under load
23-
$pgb_handle = $cluster->pgbench_async(0, ('-N', -T => '10') );
24-
sleep(5);
23+
$pgb_handle = $cluster->pgbench_async(0, ('-N', -T => '50') );
24+
sleep(10);
2525
$cluster->{nodes}->[2]->start;
2626
$cluster->pgbench_await($pgb_handle);
2727

28-
# give it 10s to recover
28+
# give it extra 10s to recover
2929
sleep(10);
3030

3131
# check data identity

0 commit comments

Comments
 (0)