Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fb67c0b

Browse files
knizhnikkelvich
authored andcommitted
Do not drop slot on end of recovery and try to explicitly specify restart position for it
1 parent 4b71181 commit fb67c0b

File tree

5 files changed

+25
-5
lines changed

5 files changed

+25
-5
lines changed

multimaster.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,6 +1801,8 @@ static void MtmInitialize()
18011801
Mtm->nodes[i].con = MtmConnections[i];
18021802
Mtm->nodes[i].flushPos = 0;
18031803
Mtm->nodes[i].lastHeartbeat = 0;
1804+
Mtm->nodes[i].restartLsn = 0;
1805+
Mtm->nodes[i].originId = InvalidRepOriginId;
18041806
}
18051807
PGSemaphoreCreate(&Mtm->votingSemaphore);
18061808
PGSemaphoreReset(&Mtm->votingSemaphore);

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ typedef struct
146146
XLogRecPtr flushPos;
147147
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
148148
XLogRecPtr restartLsn;
149+
RepOriginId originId;
149150
} MtmNodeInfo;
150151

151152
typedef struct MtmTransState

pglogical_apply.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -539,11 +539,13 @@ MtmEndSession(void)
539539
static void
540540
process_remote_commit(StringInfo in)
541541
{
542+
int i;
542543
uint8 flags;
543544
csn_t csn;
544545
const char *gid = NULL;
545546
XLogRecPtr end_lsn;
546547
XLogRecPtr origin_lsn;
548+
RepOriginId originId;
547549
int n_records;
548550
/* read flags */
549551
flags = pq_getmsgbyte(in);
@@ -558,9 +560,21 @@ process_remote_commit(StringInfo in)
558560
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
559561
end_lsn = pq_getmsgint64(in); /* end_lsn */
560562
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
561-
origin_lsn = pq_getmsgint64(in);
562-
Mtm->nodes[MtmReplicationNodeId-1].restartLsn = origin_lsn;
563-
563+
564+
originId = (RepOriginId)pq_getmsgint(in, 2);
565+
origin_lsn = pq_getmsgint64(in);
566+
567+
if (originId != InvalidRepOriginId) {
568+
for (i = 0; i < Mtm->nAllNodes; i++) {
569+
if (Mtm->nodes[i].originId == originId) {
570+
Mtm->nodes[i].restartLsn = origin_lsn;
571+
break;
572+
}
573+
}
574+
if (i == Mtm->nAllNodes) {
575+
elog(WARNING, "Failed to map origin %d", originId);
576+
}
577+
}
564578
Assert(replorigin_session_origin == InvalidRepOriginId);
565579

566580
switch(PGLOGICAL_XACT_EVENT(flags))

pglogical_proto.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
192192
pq_sendint64(out, commit_lsn);
193193
pq_sendint64(out, txn->end_lsn);
194194
pq_sendint64(out, txn->commit_time);
195+
196+
pq_sendint(out, txn->origin_id, 2);
195197
pq_sendint64(out, txn->origin_lsn);
196198

197199
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {

pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ pglogical_receiver_main(Datum main_arg)
272272
}
273273

274274
query = createPQExpBuffer();
275-
#if 1 /* Do we need to recreate slot ? */
275+
#if 0 /* Do we need to recreate slot ? */
276276
if (mode == REPLMODE_RECOVERED) { /* recreate slot */
277277
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
278278
res = PQexec(conn, query->data);
@@ -305,7 +305,7 @@ pglogical_receiver_main(Datum main_arg)
305305

306306
/* Start logical replication at specified position */
307307
if (mode == REPLMODE_RECOVERED) {
308-
originStartPos = Mtm->nodes[nodeId].restartLsn;
308+
originStartPos = Mtm->nodes[nodeId-1].restartLsn;
309309
}
310310
if (originStartPos == 0) {
311311
StartTransactionCommand();
@@ -325,6 +325,7 @@ pglogical_receiver_main(Datum main_arg)
325325
originStartPos = replorigin_get_progress(originId, false);
326326
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
327327
}
328+
Mtm->nodes[nodeId-1].originId = originId;
328329
CommitTransactionCommand();
329330
}
330331

0 commit comments

Comments
 (0)