Commit c28e0b1

Fix walsender timeouts when decoding a large transaction
The logical slots have a fast code path for sending data so as not to impose too high a per message overhead. The fast path skips checks for interrupts and timeouts. However, the existing coding failed to consider the fact that a transaction with a large number of changes may take a very long time to be processed and sent to the client. This causes the walsender to ignore interrupts for potentially a long time and more importantly it will result in the walsender being killed due to timeout at the end of such a transaction.

This commit changes the fast path to also check for interrupts and only allows calling the fast path when the last keepalive check happened less than half the walsender timeout ago. Otherwise the slower code path will be taken.

Backpatched to 9.4

Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, Craig Ringer and Robert Haas.

Discussion: https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
1 parent c26d46f commit c28e0b1
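
The fast-path rule described in the commit message can be illustrated in isolation. The following is a minimal standalone sketch, not the walsender code itself: the type `sketch_timestamp_us` and the function `sketch_can_take_fast_path` are hypothetical names, timestamps are assumed to be plain microsecond counters, and the `send_pending` flag stands in for the result of `pq_is_send_pending()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for a timestamp: microseconds on some clock. */
typedef int64_t sketch_timestamp_us;

/*
 * Returns true when the fast path may be taken: the last reply is less
 * than half the timeout old AND no output is still waiting to be sent.
 * Otherwise the caller is expected to fall through to the slow path.
 */
static bool
sketch_can_take_fast_path(sketch_timestamp_us now,
                          sketch_timestamp_us last_reply,
                          int timeout_ms,
                          bool send_pending)
{
    sketch_timestamp_us half_deadline;

    assert(timeout_ms >= 0);

    /* last reply time plus half the timeout, converted from ms to us */
    half_deadline = last_reply + (int64_t) (timeout_ms / 2) * 1000;

    return now < half_deadline && !send_pending;
}
```

With a 60 second timeout, for example, this sketch allows the fast path only during the first 30 seconds after the last reply. Note that under these assumptions a timeout of 0 never qualifies for the fast path.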

1 file changed: +38 -28 lines changed

src/backend/replication/walsender.c

Lines changed: 38 additions & 28 deletions
@@ -1076,6 +1076,9 @@ static void
 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
                 bool last_write)
 {
+    TimestampTz now;
+    int64       now_int;
+
     /* output previously gathered data in a CopyData packet */
     pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

@@ -1085,23 +1088,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      * several releases by streaming physical replication.
      */
     resetStringInfo(&tmpbuf);
-    pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+    now_int = GetCurrentIntegerTimestamp();
+    now = IntegerTimestampToTimestampTz(now_int);
+    pq_sendint64(&tmpbuf, now_int);
     memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
            tmpbuf.data, sizeof(int64));

-    /* fast path */
+    CHECK_FOR_INTERRUPTS();
+
     /* Try to flush pending output to the client */
     if (pq_flush_if_writable() != 0)
         WalSndShutdown();

-    if (!pq_is_send_pending())
+    /* Try taking fast path unless we get too close to walsender timeout. */
+    if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                          wal_sender_timeout / 2) &&
+        !pq_is_send_pending())
+    {
         return;
+    }

+    /* If we have pending write here, go to slow path */
     for (;;)
     {
         int         wakeEvents;
         long        sleeptime;
-        TimestampTz now;
+
+        /* Check for input from the client */
+        ProcessRepliesIfAny();
+
+        now = GetCurrentTimestamp();
+
+        /* die if timeout was reached */
+        WalSndCheckTimeOut(now);
+
+        /* Send keepalive if the time has come */
+        WalSndKeepaliveIfNecessary(now);
+
+        if (!pq_is_send_pending())
+            break;
+
+        sleeptime = WalSndComputeSleeptime(now);
+
+        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+            WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+        /* Sleep until something happens or we time out */
+        WaitLatchOrSocket(MyLatch, wakeEvents,
+                          MyProcPort->sock, sleeptime);

         /*
          * Emergency bailout if postmaster has died. This is to avoid the
@@ -1123,33 +1157,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
             SyncRepInitConfig();
         }

-        /* Check for input from the client */
-        ProcessRepliesIfAny();
-
         /* Try to flush pending output to the client */
         if (pq_flush_if_writable() != 0)
             WalSndShutdown();
-
-        /* If we finished clearing the buffered data, we're done here. */
-        if (!pq_is_send_pending())
-            break;
-
-        now = GetCurrentTimestamp();
-
-        /* die if timeout was reached */
-        WalSndCheckTimeOut(now);
-
-        /* Send keepalive if the time has come */
-        WalSndKeepaliveIfNecessary(now);
-
-        sleeptime = WalSndComputeSleeptime(now);
-
-        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-            WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
-        /* Sleep until something happens or we time out */
-        WaitLatchOrSocket(MyLatch, wakeEvents,
-                          MyProcPort->sock, sleeptime);
     }

     /* reactivate latch so WalSndLoop knows to continue */
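
To see why the old fast path could end in a timeout, a toy model of the timing may help. This is a rough simulation under stated assumptions, not PostgreSQL code: `sketch_survives_transaction` is a hypothetical function, every change is assumed to cost a fixed `cost_ms` to decode and send, the socket is assumed never to block, and the client is assumed to reply immediately whenever the slow path runs. With `fixed` set to false the model never looks at replies during the transaction, as the old fast path did. With `fixed` set to true it enters the slow path once half the timeout has elapsed since the last reply.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Toy model: returns true if the sender is still within its timeout
 * when the transaction ends, false if it would be killed.
 */
static bool
sketch_survives_transaction(int changes, int cost_ms, int timeout_ms,
                            bool fixed)
{
    int64_t now_ms = 0;        /* simulated clock */
    int64_t last_reply_ms = 0; /* time the last client reply was seen */
    int     i;

    assert(cost_ms > 0 && timeout_ms > 0);

    for (i = 0; i < changes; i++)
    {
        now_ms += cost_ms;     /* decode and send one change */

        /*
         * Fixed behavior: past half the timeout, take the slow path,
         * which processes client replies (assumed to arrive at once).
         */
        if (fixed && now_ms >= last_reply_ms + timeout_ms / 2)
            last_reply_ms = now_ms;
    }

    /* timeout check performed once the transaction has been sent */
    return now_ms < last_reply_ms + timeout_ms;
}
```

In this model, 1000 changes at 100 ms each against a 60000 ms timeout take 100 seconds in total, so the unfixed variant exceeds the timeout, while the fixed variant refreshes its reply timestamp every 30 seconds and stays within it.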
