Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 63c917c

Browse files
committed
[PGPRO-4074] Port LogicalDecodingCaughtUp callback.
tags: multimaster (cherry picked from commit d75ea7f66dfea0644d0bf98011f4bc2fd32bcd63)
1 parent 6a640de commit 63c917c

File tree

4 files changed

+47
-0
lines changed

4 files changed

+47
-0
lines changed

src/backend/replication/logical/logical.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,35 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10401040
error_context_stack = errcallback.previous;
10411041
}
10421042

1043+
void LogicalDecodingCaughtUp(LogicalDecodingContext *ctx)
1044+
{
1045+
LogicalErrorCallbackState state;
1046+
ErrorContextCallback errcallback;
1047+
1048+
if (ctx->callbacks.caughtup_cb == NULL)
1049+
return;
1050+
1051+
/* Push callback + info on the error context stack */
1052+
state.ctx = ctx;
1053+
state.callback_name = "caughtup";
1054+
state.report_location = ctx->reader->EndRecPtr;
1055+
errcallback.callback = output_plugin_error_callback;
1056+
errcallback.arg = (void *) &state;
1057+
errcallback.previous = error_context_stack;
1058+
error_context_stack = &errcallback;
1059+
1060+
/* set output state */
1061+
ctx->accept_writes = true;
1062+
ctx->write_xid = InvalidTransactionId;
1063+
ctx->write_location = ctx->reader->EndRecPtr;
1064+
1065+
/* do the actual work: call callback */
1066+
ctx->callbacks.caughtup_cb(ctx);
1067+
1068+
/* Pop the error context stack */
1069+
error_context_stack = errcallback.previous;
1070+
}
1071+
10431072
/*
10441073
* Set the required catalog xmin horizon for historic snapshots in the current
10451074
* replication slot.

src/backend/replication/walsender.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,7 @@ WalSndWaitForWal(XLogRecPtr loc)
13801380
{
13811381
int wakeEvents;
13821382
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1383+
bool caughtup_cb_called = false;
13831384

13841385
/*
13851386
* Fast path to avoid acquiring the spinlock in case we already know we
@@ -1459,6 +1460,15 @@ WalSndWaitForWal(XLogRecPtr loc)
14591460

14601461
/* Waiting for new WAL. Since we need to wait, we're now caught up. */
14611462
WalSndCaughtUp = true;
1463+
/*
1464+
* Call cb only once: if it writes anyting, it'll probably call
1465+
* WalSndWriteData who sets latch, thus creating busy loop.
1466+
*/
1467+
if (!caughtup_cb_called && logical_decoding_ctx)
1468+
{
1469+
LogicalDecodingCaughtUp(logical_decoding_ctx);
1470+
caughtup_cb_called = true;
1471+
}
14621472

14631473
/*
14641474
* Try to flush any pending output to the client.

src/include/replication/logical.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
120120
XLogRecPtr restart_lsn);
121121
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
122122

123+
extern void LogicalDecodingCaughtUp(LogicalDecodingContext *ctx);
124+
123125
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
124126

125127
#endif

src/include/replication/output_plugin.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
139139
*/
140140
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
141141

142+
/*
143+
* Called when WAL sender caught up.
144+
*/
145+
typedef void (*LogicalDecodeCaughtUpCB) (struct LogicalDecodingContext * ctx);
146+
142147
/*
143148
* Output plugin callbacks
144149
*/
@@ -157,6 +162,7 @@ typedef struct OutputPluginCallbacks
157162
LogicalDecodeAbortPreparedCB abort_prepared_cb;
158163
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
159164
LogicalDecodeShutdownCB shutdown_cb;
165+
LogicalDecodeCaughtUpCB caughtup_cb;
160166
} OutputPluginCallbacks;
161167

162168
/* Functions in replication/logical/logical.c */

0 commit comments

Comments
 (0)