Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d0d17dc

Browse files
knizhnikkelvich
authored andcommitted
Use origin LSN for recovery
1 parent abbad76 commit d0d17dc

File tree

3 files changed

+28
-26
lines changed

3 files changed

+28
-26
lines changed

pglogical_apply.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ process_remote_commit(StringInfo in)
543543
csn_t csn;
544544
const char *gid = NULL;
545545
XLogRecPtr end_lsn;
546+
XLogRecPtr origin_lsn;
546547
int n_records;
547548
/* read flags */
548549
flags = pq_getmsgbyte(in);
@@ -557,6 +558,8 @@ process_remote_commit(StringInfo in)
557558
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
558559
end_lsn = pq_getmsgint64(in); /* end_lsn */
559560
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
561+
origin_lsn = pq_getmsgint64(in);
562+
Mtm->nodes[MtmReplicationNodeId-1].restartLsn = origin_lsn;
560563

561564
Assert(replorigin_session_origin == InvalidRepOriginId);
562565

pglogical_proto.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107107
{
108108
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
109109
csn_t csn = MtmTransactionSnapshot(txn->xid);
110-
MTM_LOG1("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
110+
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
111111
MyProcPid, txn->xid, MtmReplicationNodeId, csn, isRecovery, txn->restart_decoding_lsn, txn->first_lsn, txn->end_lsn, MyReplicationSlot->data.confirmed_flush);
112112

113113
if (csn == INVALID_CSN && !isRecovery) {
@@ -140,7 +140,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
140140
{
141141
uint8 flags = 0;
142142

143-
MTM_LOG1("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
143+
MTM_LOG3("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
144144
MyProcPid, txn->xid, MtmReplicationNodeId, txn->restart_decoding_lsn, txn->first_lsn, txn->end_lsn, MyReplicationSlot->data.confirmed_flush);
145145

146146

@@ -187,11 +187,12 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187187

188188
Assert(txn->xact_action != XLOG_XACT_PREPARE || txn->xid < 1000 || MtmTransactionRecords >= 2);
189189
pq_sendint(out, MtmTransactionRecords, 4);
190-
190+
191191
/* send fixed fields */
192192
pq_sendint64(out, commit_lsn);
193193
pq_sendint64(out, txn->end_lsn);
194194
pq_sendint64(out, txn->commit_time);
195+
pq_sendint64(out, txn->origin_lsn);
195196

196197
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {
197198
Assert(MtmTransactionRecords == 0);
@@ -229,7 +230,7 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
229230
if (!MtmIsFilteredTxn) {
230231
MtmTransactionRecords += 1;
231232

232-
MTM_LOG1("%d: pglogical_write_update confirmed_flush=%lx", MyProcPid, MyReplicationSlot->data.confirmed_flush);
233+
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx", MyProcPid, MyReplicationSlot->data.confirmed_flush);
233234

234235

235236
pq_sendbyte(out, 'U'); /* action UPDATE */

pglogical_receiver.c

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,6 @@ pglogical_receiver_main(Datum main_arg)
210210
MtmReplicationMode mode;
211211

212212
ByteBuffer buf;
213-
XLogRecPtr originStartPos = 0;
214213
RepOriginId originId;
215214
char* originName;
216215
/* Buffer for COPY data */
@@ -248,6 +247,7 @@ pglogical_receiver_main(Datum main_arg)
248247
{
249248
int count;
250249
ConnStatusType status;
250+
XLogRecPtr originStartPos = 0;
251251

252252
/*
253253
* Determine when and how we should open replication slot.
@@ -272,7 +272,7 @@ pglogical_receiver_main(Datum main_arg)
272272
}
273273

274274
query = createPQExpBuffer();
275-
#if 1 /* Do we need to recretate slot ? */
275+
#if 1 /* Do we need to recreate slot ? */
276276
if (mode == REPLMODE_RECOVERED) { /* recreate slot */
277277
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
278278
res = PQexec(conn, query->data);
@@ -305,8 +305,9 @@ pglogical_receiver_main(Datum main_arg)
305305

306306
/* Start logical replication at specified position */
307307
if (mode == REPLMODE_RECOVERED) {
308-
originStartPos = 0;
309-
} else {
308+
originStartPos = Mtm->nodes[nodeId].restartLsn;
309+
}
310+
if (originStartPos == 0) {
310311
StartTransactionCommand();
311312
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
312313
originId = replorigin_by_name(originName, true);
@@ -485,25 +486,22 @@ pglogical_receiver_main(Datum main_arg)
485486
if (rc > hdr_len)
486487
{
487488
stmt = copybuf + hdr_len;
488-
489-
if (buf.used >= MtmTransSpillThreshold*MB) {
490-
if (spill_file < 0) {
491-
int file_id;
492-
spill_file = MtmCreateSpillFile(nodeId, &file_id);
493-
pq_sendbyte(&spill_info, 'F');
494-
pq_sendint(&spill_info, nodeId, 4);
495-
pq_sendint(&spill_info, file_id, 4);
489+
if (mode == REPLMODE_RECOVERED) {
490+
if (stmt[0] != 'B') {
491+
output_written_lsn = Max(walEnd, output_written_lsn);
492+
continue;
493+
}
494+
mode = REPLMODE_NORMAL;
496495
}
497-
ByteBufferAppend(&buf, ")", 1);
498-
pq_sendbyte(&spill_info, '(');
499-
pq_sendint(&spill_info, buf.used, 4);
500-
MtmSpillToFile(spill_file, buf.data, buf.used);
501-
ByteBufferReset(&buf);
502-
}
503-
ByteBufferAppend(&buf, stmt, rc - hdr_len);
504-
if (stmt[0] == 'C') /* commit|prepare */
505-
{
506-
if (spill_file >= 0) {
496+
497+
if (buf.used >= MtmTransSpillThreshold*MB) {
498+
if (spill_file < 0) {
499+
int file_id;
500+
spill_file = MtmCreateSpillFile(nodeId, &file_id);
501+
pq_sendbyte(&spill_info, 'F');
502+
pq_sendint(&spill_info, nodeId, 4);
503+
pq_sendint(&spill_info, file_id, 4);
504+
}
507505
ByteBufferAppend(&buf, ")", 1);
508506
pq_sendbyte(&spill_info, '(');
509507
pq_sendint(&spill_info, buf.used, 4);

0 commit comments

Comments
 (0)