Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit adb4661

Browse files
committed
Fix recovery_prefetch with low maintenance_io_concurrency.
We should process completed IOs *before* trying to start more, so that it is always possible to decode one more record when the decoded record queue is empty, even if maintenance_io_concurrency is set so low that a single earlier WAL record might have saturated the IO queue. That bug was hidden because the effect of maintenance_io_concurrency was arbitrarily clamped to be at least 2. Fix the ordering, and also remove that clamp. We need a special case for 0, which is now treated the same as recovery_prefetch=off, but otherwise the number is used directly. This allows for testing with 1, which would have made the problem obvious in simple test scenarios. Also add an explicit error message for missing contrecords. It was a bit strange that we didn't report an error already, and became a latent bug with prefetching, since the internal state that tracks aborted contrecords would not survive retrying, as revealed by 026_overwrite_contrecord.pl with this adjustment. Reporting an error prevents that. Back-patch to 15. Reported-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/20220831140128.GS31833%40telsasoft.com
1 parent 12d40d4 commit adb4661

File tree

3 files changed

+56
-23
lines changed

3 files changed

+56
-23
lines changed

src/backend/access/transam/xlogprefetcher.c

+36-18
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@
7272
int recovery_prefetch = RECOVERY_PREFETCH_TRY;
7373

7474
#ifdef USE_PREFETCH
75-
#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
75+
#define RecoveryPrefetchEnabled() \
76+
(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
77+
maintenance_io_concurrency > 0)
7678
#else
7779
#define RecoveryPrefetchEnabled() false
7880
#endif
@@ -985,6 +987,7 @@ XLogRecord *
985987
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
986988
{
987989
DecodedXLogRecord *record;
990+
XLogRecPtr replayed_up_to;
988991

989992
/*
990993
* See if it's time to reset the prefetching machinery, because a relevant
@@ -1000,7 +1003,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10001003

10011004
if (RecoveryPrefetchEnabled())
10021005
{
1003-
max_inflight = Max(maintenance_io_concurrency, 2);
1006+
Assert(maintenance_io_concurrency > 0);
1007+
max_inflight = maintenance_io_concurrency;
10041008
max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
10051009
}
10061010
else
@@ -1018,14 +1022,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10181022
}
10191023

10201024
/*
1021-
* Release last returned record, if there is one. We need to do this so
1022-
* that we can check for empty decode queue accurately.
1025+
* Release last returned record, if there is one, as it's now been
1026+
* replayed.
10231027
*/
1024-
XLogReleasePreviousRecord(prefetcher->reader);
1028+
replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
10251029

1026-
/* If there's nothing queued yet, then start prefetching. */
1030+
/*
1031+
* Can we drop any filters yet? If we were waiting for a relation to be
1032+
* created or extended, it is now OK to access blocks in the covered
1033+
* range.
1034+
*/
1035+
XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1036+
1037+
/*
1038+
* All IO initiated by earlier WAL is now completed. This might trigger
1039+
* further prefetching.
1040+
*/
1041+
lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1042+
1043+
/*
1044+
* If there's nothing queued yet, then start prefetching to cause at least
1045+
* one record to be queued.
1046+
*/
10271047
if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1048+
{
1049+
Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1050+
Assert(lrq_completed(prefetcher->streaming_read) == 0);
10281051
lrq_prefetch(prefetcher->streaming_read);
1052+
}
10291053

10301054
/* Read the next record. */
10311055
record = XLogNextRecord(prefetcher->reader, errmsg);
@@ -1039,12 +1063,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10391063
Assert(record == prefetcher->reader->record);
10401064

10411065
/*
1042-
* Can we drop any prefetch filters yet, given the record we're about to
1043-
* return? This assumes that any records with earlier LSNs have been
1044-
* replayed, so if we were waiting for a relation to be created or
1045-
* extended, it is now OK to access blocks in the covered range.
1066+
* If maintenance_io_concurrency is set very low, we might have started
1067+
* prefetching some but not all of the blocks referenced in the record
1068+
* we're about to return. Forget about the rest of the blocks in this
1069+
* record by dropping the prefetcher's reference to it.
10461070
*/
1047-
XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
1071+
if (record == prefetcher->record)
1072+
prefetcher->record = NULL;
10481073

10491074
/*
10501075
* See if it's time to compute some statistics, because enough WAL has
@@ -1053,13 +1078,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10531078
if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
10541079
XLogPrefetcherComputeStats(prefetcher);
10551080

1056-
/*
1057-
* The caller is about to replay this record, so we can now report that
1058-
* all IO initiated because of early WAL must be finished. This may
1059-
* trigger more readahead.
1060-
*/
1061-
lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
1062-
10631081
Assert(record == prefetcher->reader->record);
10641082

10651083
return &record->header;

src/backend/access/transam/xlogreader.c

+19-4
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,24 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
275275
}
276276

277277
/*
278-
* See if we can release the last record that was returned by
279-
* XLogNextRecord(), if any, to free up space.
278+
* Release the last record that was returned by XLogNextRecord(), if any, to
279+
* free up space. Returns the LSN past the end of the record.
280280
*/
281-
void
281+
XLogRecPtr
282282
XLogReleasePreviousRecord(XLogReaderState *state)
283283
{
284284
DecodedXLogRecord *record;
285+
XLogRecPtr next_lsn;
285286

286287
if (!state->record)
287-
return;
288+
return InvalidXLogRecPtr;
288289

289290
/*
290291
* Remove it from the decoded record queue. It must be the oldest item
291292
* decoded, decode_queue_head.
292293
*/
293294
record = state->record;
295+
next_lsn = record->next_lsn;
294296
Assert(record == state->decode_queue_head);
295297
state->record = NULL;
296298
state->decode_queue_head = record->next;
@@ -336,6 +338,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
336338
state->decode_buffer_tail = state->decode_buffer;
337339
}
338340
}
341+
342+
return next_lsn;
339343
}
340344

341345
/*
@@ -907,6 +911,17 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
907911
*/
908912
state->abortedRecPtr = RecPtr;
909913
state->missingContrecPtr = targetPagePtr;
914+
915+
/*
916+
* If we got here without reporting an error, report one now so that
917+
* XLogPrefetcherReadRecord() doesn't bring us back a second time and
918+
* clobber the above state. Otherwise, the existing error takes
919+
* precedence.
920+
*/
921+
if (!state->errormsg_buf[0])
922+
report_invalid_record(state,
923+
"missing contrecord at %X/%X",
924+
LSN_FORMAT_ARGS(RecPtr));
910925
}
911926

912927
if (decoded && decoded->oversized)

src/include/access/xlogreader.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
363363
char **errormsg);
364364

365365
/* Release the previously returned record, if necessary. */
366-
extern void XLogReleasePreviousRecord(XLogReaderState *state);
366+
extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state);
367367

368368
/* Try to read ahead, if there is data and space. */
369369
extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,

0 commit comments

Comments
 (0)