Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1e73a04

Browse files
committed
Use origin LSN for recovery
1 parent 671f3e1 commit 1e73a04

File tree

4 files changed

+21
-15
lines changed

4 files changed

+21
-15
lines changed

contrib/mmts/pglogical_apply.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ process_remote_commit(StringInfo in)
511511
csn_t csn;
512512
const char *gid = NULL;
513513
XLogRecPtr end_lsn;
514+
XLogRecPtr origin_lsn;
514515
int n_records;
515516
/* read flags */
516517
flags = pq_getmsgbyte(in);
@@ -525,6 +526,8 @@ process_remote_commit(StringInfo in)
525526
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
526527
end_lsn = pq_getmsgint64(in); /* end_lsn */
527528
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
529+
origin_lsn = pq_getmsgint64(in);
530+
Mtm->nodes[MtmReplicationNodeId-1].restartLsn = origin_lsn;
528531

529532
Assert(replorigin_session_origin == InvalidRepOriginId);
530533

contrib/mmts/pglogical_proto.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107107
{
108108
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
109109
csn_t csn = MtmTransactionSnapshot(txn->xid);
110-
MTM_LOG1("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
110+
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
111111
MyProcPid, txn->xid, MtmReplicationNodeId, csn, isRecovery, txn->restart_decoding_lsn, txn->first_lsn, txn->end_lsn, MyReplicationSlot->data.confirmed_flush);
112112

113113
if (csn == INVALID_CSN && !isRecovery) {
@@ -131,7 +131,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
131131
{
132132
uint8 flags = 0;
133133

134-
MTM_LOG1("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
134+
MTM_LOG3("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
135135
MyProcPid, txn->xid, MtmReplicationNodeId, txn->restart_decoding_lsn, txn->first_lsn, txn->end_lsn, MyReplicationSlot->data.confirmed_flush);
136136

137137

@@ -178,11 +178,12 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
178178

179179
Assert(txn->xact_action != XLOG_XACT_PREPARE || txn->xid < 1000 || MtmTransactionRecords >= 2);
180180
pq_sendint(out, MtmTransactionRecords, 4);
181-
181+
182182
/* send fixed fields */
183183
pq_sendint64(out, commit_lsn);
184184
pq_sendint64(out, txn->end_lsn);
185185
pq_sendint64(out, txn->commit_time);
186+
pq_sendint64(out, txn->origin_lsn);
186187

187188
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {
188189
Assert(MtmTransactionRecords == 0);
@@ -220,7 +221,7 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
220221
if (!MtmIsFilteredTxn) {
221222
MtmTransactionRecords += 1;
222223

223-
MTM_LOG1("%d: pglogical_write_update confirmed_flush=%lx", MyProcPid, MyReplicationSlot->data.confirmed_flush);
224+
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx", MyProcPid, MyReplicationSlot->data.confirmed_flush);
224225

225226

226227
pq_sendbyte(out, 'U'); /* action UPDATE */

contrib/mmts/pglogical_receiver.c

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,6 @@ pglogical_receiver_main(Datum main_arg)
210210
MtmReplicationMode mode;
211211

212212
ByteBuffer buf;
213-
XLogRecPtr originStartPos = 0;
214213
RepOriginId originId;
215214
char* originName;
216215
/* Buffer for COPY data */
@@ -248,6 +247,7 @@ pglogical_receiver_main(Datum main_arg)
248247
{
249248
int count;
250249
ConnStatusType status;
250+
XLogRecPtr originStartPos = 0;
251251

252252
/*
253253
* Determine when and how we should open replication slot.
@@ -272,7 +272,7 @@ pglogical_receiver_main(Datum main_arg)
272272
}
273273

274274
query = createPQExpBuffer();
275-
#if 1 /* Do we need to recretate slot ? */
275+
#if 1 /* Do we need to recreate slot ? */
276276
if (mode == REPLMODE_RECOVERED) { /* recreate slot */
277277
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
278278
res = PQexec(conn, query->data);
@@ -305,8 +305,9 @@ pglogical_receiver_main(Datum main_arg)
305305

306306
/* Start logical replication at specified position */
307307
if (mode == REPLMODE_RECOVERED) {
308-
originStartPos = 0;
309-
} else {
308+
originStartPos = Mtm->nodes[nodeId].restartLsn;
309+
}
310+
if (originStartPos == 0) {
310311
StartTransactionCommand();
311312
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
312313
originId = replorigin_by_name(originName, true);
@@ -485,7 +486,14 @@ pglogical_receiver_main(Datum main_arg)
485486
if (rc > hdr_len)
486487
{
487488
stmt = copybuf + hdr_len;
488-
489+
if (mode == REPLMODE_RECOVERED) {
490+
if (stmt[0] != 'B') {
491+
output_written_lsn = Max(walEnd, output_written_lsn);
492+
continue;
493+
}
494+
mode = REPLMODE_NORMAL;
495+
}
496+
489497
if (buf.used >= MtmTransSpillThreshold*MB) {
490498
if (spill_file < 0) {
491499
int file_id;

src/backend/replication/logical/decode.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include <time.h>
2929

3030
#include "postgres.h"
31-
#include "miscadmin.h"
3231

3332
#include "access/heapam.h"
3433
#include "access/heapam_xlog.h"
@@ -424,8 +423,6 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
424423

425424
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
426425

427-
elog(LOG, "%d: DecodeHeapOp XID=%d, info=%d", MyProcPid, xid, info);
428-
429426
/* no point in doing anything yet */
430427
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
431428
return;
@@ -816,8 +813,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
816813
char *data;
817814
RelFileNode target_node;
818815

819-
elog(LOG, "%d: DecodeUpdate XID=%d", MyProcPid);
820-
821816
xlrec = (xl_heap_update *) XLogRecGetData(r);
822817

823818
/* only interested in our database */
@@ -827,7 +822,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
827822

828823
/* output plugin doesn't look for this origin, no need to queue */
829824
if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) {
830-
elog(LOG, "%d: DecodeUpdate XID=%d filtered by origin %lx", MyProcPid, XLogRecGetOrigin(r));
831825
return;
832826
}
833827

0 commit comments

Comments
 (0)