Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 55558df

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parent b9d70ef commit 55558df

File tree

4 files changed

+85
-6
lines changed

4 files changed

+85
-6
lines changed

src/backend/replication/logical/logical.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
622622

623623
/* set output state */
624624
ctx->accept_writes = false;
625+
ctx->end_xact = false;
625626

626627
/* do the actual work: call callback */
627628
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -649,6 +650,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
649650

650651
/* set output state */
651652
ctx->accept_writes = false;
653+
ctx->end_xact = false;
652654

653655
/* do the actual work: call callback */
654656
ctx->callbacks.shutdown_cb(ctx);
@@ -684,6 +686,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
684686
ctx->accept_writes = true;
685687
ctx->write_xid = txn->xid;
686688
ctx->write_location = txn->first_lsn;
689+
ctx->end_xact = false;
687690

688691
/* do the actual work: call callback */
689692
ctx->callbacks.begin_cb(ctx, txn);
@@ -715,6 +718,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
715718
ctx->accept_writes = true;
716719
ctx->write_xid = txn->xid;
717720
ctx->write_location = txn->end_lsn; /* points to the end of the record */
721+
ctx->end_xact = true;
718722

719723
/* do the actual work: call callback */
720724
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -754,6 +758,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
754758
*/
755759
ctx->write_location = change->lsn;
756760

761+
ctx->end_xact = false;
762+
757763
ctx->callbacks.change_cb(ctx, txn, relation, change);
758764

759765
/* Pop the error context stack */
@@ -794,6 +800,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
794800
*/
795801
ctx->write_location = change->lsn;
796802

803+
ctx->end_xact = false;
804+
797805
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
798806

799807
/* Pop the error context stack */
@@ -820,6 +828,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
820828

821829
/* set output state */
822830
ctx->accept_writes = false;
831+
ctx->end_xact = false;
823832

824833
/* do the actual work: call callback */
825834
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -857,6 +866,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
857866
ctx->accept_writes = true;
858867
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
859868
ctx->write_location = message_lsn;
869+
ctx->end_xact = false;
860870

861871
/* do the actual work: call callback */
862872
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ static List *LoadPublications(List *pubnames);
5353
static void publication_invalidation_cb(Datum arg, int cacheid,
5454
uint32 hashvalue);
5555
static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
56+
static void update_replication_progress(LogicalDecodingContext *ctx);
5657

5758
/*
5859
* Entry in the map used to remember which relation schemas we sent.
@@ -277,7 +278,7 @@ static void
277278
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
278279
XLogRecPtr commit_lsn)
279280
{
280-
OutputPluginUpdateProgress(ctx);
281+
update_replication_progress(ctx);
281282

282283
OutputPluginPrepareWrite(ctx, true);
283284
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -385,6 +386,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
385386
RelationSyncEntry *relentry;
386387
Relation ancestor = NULL;
387388

389+
update_replication_progress(ctx);
390+
388391
if (!is_publishable_relation(relation))
389392
return;
390393

@@ -514,6 +517,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
514517
int nrelids;
515518
Oid *relids;
516519

520+
update_replication_progress(ctx);
521+
517522
old = MemoryContextSwitchTo(data->context);
518523

519524
relids = palloc0(nrelations * sizeof(Oid));
@@ -912,3 +917,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
912917
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
913918
entry->replicate_valid = false;
914919
}
920+
921+
/*
922+
* Try to update progress and send a keepalive message if too many changes were
923+
* processed.
924+
*
925+
* For a large transaction, if we don't send any change to the downstream for a
926+
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
927+
* This can happen when all or most of the changes are not published.
928+
*/
929+
static void
930+
update_replication_progress(LogicalDecodingContext *ctx)
931+
{
932+
static int changes_count = 0;
933+
934+
/*
935+
* We don't want to try sending a keepalive message after processing each
936+
* change as that can have overhead. Tests revealed that there is no
937+
* noticeable overhead in doing it after continuously processing 100 or so
938+
* changes.
939+
*/
940+
#define CHANGES_THRESHOLD 100
941+
942+
/*
943+
* If we are at the end of transaction LSN, update progress tracking.
944+
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
945+
* try to send a keepalive message if required.
946+
*/
947+
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
948+
{
949+
OutputPluginUpdateProgress(ctx);
950+
changes_count = 0;
951+
}
952+
}

src/backend/replication/walsender.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ static void ProcessStandbyMessage(void);
240240
static void ProcessStandbyReplyMessage(void);
241241
static void ProcessStandbyHSFeedbackMessage(void);
242242
static void ProcessRepliesIfAny(void);
243+
static void ProcessPendingWrites(void);
243244
static void WalSndKeepalive(bool requestReply);
244245
static void WalSndKeepaliveIfNecessary(void);
245246
static void WalSndCheckTimeOut(void);
@@ -1295,6 +1296,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12951296
}
12961297

12971298
/* If we have pending write here, go to slow path */
1299+
ProcessPendingWrites();
1300+
}
1301+
1302+
/*
1303+
* Wait until there is no pending write. Also process replies from the other
1304+
* side and check timeouts during that.
1305+
*/
1306+
static void
1307+
ProcessPendingWrites(void)
1308+
{
12981309
for (;;)
12991310
{
13001311
int wakeEvents;
@@ -1354,18 +1365,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
13541365
{
13551366
static TimestampTz sendTime = 0;
13561367
TimestampTz now = GetCurrentTimestamp();
1368+
bool end_xact = ctx->end_xact;
13571369

13581370
/*
13591371
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
13601372
* avoid flooding the lag tracker when we commit frequently.
1373+
*
1374+
* We don't have a mechanism to get the ack for any LSN other than end
1375+
* xact LSN from the downstream. So, we track lag only for end of
1376+
* transaction LSN.
13611377
*/
13621378
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1363-
if (!TimestampDifferenceExceeds(sendTime, now,
1364-
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1365-
return;
1379+
if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1380+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1381+
{
1382+
LagTrackerWrite(lsn, now);
1383+
sendTime = now;
1384+
}
13661385

1367-
LagTrackerWrite(lsn, now);
1368-
sendTime = now;
1386+
/*
1387+
* Try to send a keepalive if required. We don't need to try sending keep
1388+
* alive messages at the transaction end as that will be done at a later
1389+
* point in time. This is required only for large transactions where we
1390+
* don't send any changes to the downstream and the receiver can timeout
1391+
* due to that.
1392+
*/
1393+
if (!end_xact &&
1394+
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1395+
wal_sender_timeout / 2))
1396+
ProcessPendingWrites();
13691397
}
13701398

13711399
/*

src/include/replication/logical.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ typedef struct LogicalDecodingContext
4949
*/
5050
bool fast_forward;
5151

52+
/* Are we processing the end LSN of a transaction? */
53+
bool end_xact;
54+
5255
OutputPluginCallbacks callbacks;
5356
OutputPluginOptions options;
5457

0 commit comments

Comments
 (0)