@@ -764,15 +764,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
764
764
/* make sure we have enough WAL available */
765
765
flushptr = WalSndWaitForWal (targetPagePtr + reqLen );
766
766
767
- /* more than one block available */
768
- if (targetPagePtr + XLOG_BLCKSZ <= flushptr )
769
- count = XLOG_BLCKSZ ;
770
- /* not enough WAL synced, that can happen during shutdown */
771
- else if (targetPagePtr + reqLen > flushptr )
767
+ /* fail if not (implies we are going to shut down) */
768
+ if (flushptr < targetPagePtr + reqLen )
772
769
return -1 ;
773
- /* part of the page available */
770
+
771
+ if (targetPagePtr + XLOG_BLCKSZ <= flushptr )
772
+ count = XLOG_BLCKSZ ; /* more than one block available */
774
773
else
775
- count = flushptr - targetPagePtr ;
774
+ count = flushptr - targetPagePtr ; /* part of the page available */
776
775
777
776
/* now actually read the data, we know it's there */
778
777
XLogRead (cur_page , targetPagePtr , XLOG_BLCKSZ );
@@ -1158,7 +1157,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1158
1157
}
1159
1158
1160
1159
/*
1161
- * Wait till WAL < loc is flushed to disk so it can be safely read.
1160
+ * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1161
+ *
1162
+ * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1163
+ * if we detect a shutdown request (either from postmaster or client)
1164
+ * we will return early, so caller must always check.
1162
1165
*/
1163
1166
static XLogRecPtr
1164
1167
WalSndWaitForWal (XLogRecPtr loc )
@@ -1225,9 +1228,7 @@ WalSndWaitForWal(XLogRecPtr loc)
1225
1228
RecentFlushPtr = GetXLogReplayRecPtr (NULL );
1226
1229
1227
1230
/*
1228
- * If postmaster asked us to stop, don't wait here anymore. This will
1229
- * cause the xlogreader to return without reading a full record, which
1230
- * is the fastest way to reach the mainloop which then can quit.
1231
+ * If postmaster asked us to stop, don't wait anymore.
1231
1232
*
1232
1233
* It's important to do this check after the recomputation of
1233
1234
* RecentFlushPtr, so we can send all remaining data before shutting
@@ -1258,14 +1259,20 @@ WalSndWaitForWal(XLogRecPtr loc)
1258
1259
WalSndCaughtUp = true;
1259
1260
1260
1261
/*
1261
- * Try to flush pending output to the client. Also wait for the socket
1262
- * becoming writable, if there's still pending output after an attempt
1263
- * to flush. Otherwise we might just sit on output data while waiting
1264
- * for new WAL being generated.
1262
+ * Try to flush any pending output to the client.
1265
1263
*/
1266
1264
if (pq_flush_if_writable () != 0 )
1267
1265
WalSndShutdown ();
1268
1266
1267
+ /*
1268
+ * If we have received CopyDone from the client, sent CopyDone
1269
+ * ourselves, and the output buffer is empty, it's time to exit
1270
+ * streaming, so fail the current WAL fetch request.
1271
+ */
1272
+ if (streamingDoneReceiving && streamingDoneSending &&
1273
+ !pq_is_send_pending ())
1274
+ break ;
1275
+
1269
1276
now = GetCurrentTimestamp ();
1270
1277
1271
1278
/* die if timeout was reached */
@@ -1274,6 +1281,13 @@ WalSndWaitForWal(XLogRecPtr loc)
1274
1281
/* Send keepalive if the time has come */
1275
1282
WalSndKeepaliveIfNecessary (now );
1276
1283
1284
+ /*
1285
+ * Sleep until something happens or we time out. Also wait for the
1286
+ * socket becoming writable, if there's still pending output.
1287
+ * Otherwise we might sit on sendable output data while waiting for
1288
+ * new WAL to be generated. (But if we have nothing to send, we don't
1289
+ * want to wake on socket-writable.)
1290
+ */
1277
1291
sleeptime = WalSndComputeSleeptime (now );
1278
1292
1279
1293
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1282,7 +1296,6 @@ WalSndWaitForWal(XLogRecPtr loc)
1282
1296
if (pq_is_send_pending ())
1283
1297
wakeEvents |= WL_SOCKET_WRITEABLE ;
1284
1298
1285
- /* Sleep until something happens or we time out */
1286
1299
WaitLatchOrSocket (MyLatch , wakeEvents ,
1287
1300
MyProcPort -> sock , sleeptime );
1288
1301
}
@@ -1870,7 +1883,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
1870
1883
* ourselves, and the output buffer is empty, it's time to exit
1871
1884
* streaming.
1872
1885
*/
1873
- if (!pq_is_send_pending () && streamingDoneSending && streamingDoneReceiving )
1886
+ if (streamingDoneReceiving && streamingDoneSending &&
1887
+ !pq_is_send_pending ())
1874
1888
break ;
1875
1889
1876
1890
/*
0 commit comments