@@ -95,8 +95,10 @@ struct ReadStream
 	int16		ios_in_progress;
 	int16		queue_size;
 	int16		max_pinned_buffers;
+	int16		forwarded_buffers;
 	int16		pinned_buffers;
 	int16		distance;
+	int16		initialized_buffers;
 	bool		advice_enabled;
 	bool		temporary;
 
@@ -223,8 +225,10 @@ static bool
 read_stream_start_pending_read(ReadStream *stream)
 {
 	bool		need_wait;
+	int			requested_nblocks;
 	int			nblocks;
 	int			flags = 0;
+	int			forwarded;
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
@@ -275,11 +279,19 @@ read_stream_start_pending_read(ReadStream *stream)
 		}
 	}
 
-	/* Compute the remaining portion of the per-backend buffer limit. */
+	/*
+	 * Compute the remaining portion of the per-backend buffer limit.  If we
+	 * already have some forwarded buffers, we can certainly use those.  They
+	 * are already pinned, and are mapped to the starting blocks of the pending
+	 * read, they just don't have any I/O started yet and are not counted in
+	 * stream->pinned_buffers.
+	 */
 	if (stream->temporary)
 		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
 	else
 		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+	buffer_limit += stream->forwarded_buffers;
 	if (buffer_limit == 0 && stream->pinned_buffers == 0)
 		buffer_limit = 1;		/* guarantee progress */
 
@@ -306,8 +318,31 @@ read_stream_start_pending_read(ReadStream *stream)
 	 * We say how many blocks we want to read, but it may be smaller on return
 	 * if the buffer manager decides it needs a short read at its level.
 	 */
+	requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+	nblocks = requested_nblocks;
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
+
+	/*
+	 * The first time around the queue we initialize it as we go, including
+	 * the overflow zone, because otherwise the entries would appear as
+	 * forwarded buffers.  This avoids initializing the whole queue up front
+	 * in cases where it is large but we don't ever use it due to the
+	 * all-cached fast path or small scans.
+	 */
+	while (stream->initialized_buffers < buffer_index + nblocks)
+		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
+
+	/*
+	 * Start the I/O.  Any buffers that are not InvalidBuffer will be
+	 * interpreted as already pinned, forwarded by an earlier call to
+	 * StartReadBuffers(), and must map to the expected blocks.  The nblocks
+	 * value may be smaller on return indicating the size of the I/O that
+	 * could be started.  Buffers beyond the output nblocks number may also
+	 * have been pinned without starting I/O due to various edge cases.  In
+	 * that case we'll just leave them in the queue ahead of us, "forwarded"
+	 * to the next call, avoiding the need to unpin/repin.
+	 */
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -336,16 +371,35 @@ read_stream_start_pending_read(ReadStream *stream)
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
 	}
 
+	/*
+	 * How many pins were acquired but forwarded to the next call?  These need
+	 * to be passed to the next StartReadBuffers() call, or released if the
+	 * stream ends early.  We need the number for accounting purposes, since
+	 * they are not counted in stream->pinned_buffers but we already hold
+	 * them.
+	 */
+	forwarded = 0;
+	while (nblocks + forwarded < requested_nblocks &&
+		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
+		forwarded++;
+	stream->forwarded_buffers = forwarded;
+
 	/*
 	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
-	 * we want it to wrap around at queue_size.  Slide overflowing buffers to
-	 * the front of the array.
+	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
+	 * the front of the array where they'll be consumed, but also leave a copy
+	 * in the overflow zone which the I/O operation has a pointer to (it needs
+	 * a contiguous array).  Both copies will be cleared when the buffers are
+	 * handed to the consumer.
 	 */
-	overflow = (buffer_index + nblocks) - stream->queue_size;
+	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
 	if (overflow > 0)
-		memmove(&stream->buffers[0],
-				&stream->buffers[stream->queue_size],
-				sizeof(stream->buffers[0]) * overflow);
+	{
+		Assert(overflow < stream->queue_size);	/* can't overlap */
+		memcpy(&stream->buffers[0],
+			   &stream->buffers[stream->queue_size],
+			   sizeof(stream->buffers[0]) * overflow);
+	}
 
 	/* Compute location of start of next read, without using % operator. */
 	buffer_index += nblocks;
@@ -730,10 +784,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* Fast path assumptions. */
 		Assert(stream->ios_in_progress == 0);
+		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
 		Assert(stream->distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
+		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
 		oldest_buffer_index = stream->oldest_buffer_index;
@@ -782,6 +838,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
+			stream->buffers[oldest_buffer_index] = InvalidBuffer;
 		}
 
 		stream->fast_path = false;
@@ -858,10 +915,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->seq_until_processed = InvalidBlockNumber;
 	}
 
-#ifdef CLOBBER_FREED_MEMORY
-	/* Clobber old buffer for debugging purposes. */
+	/*
+	 * We must zap this queue entry, or else it would appear as a forwarded
+	 * buffer.  If it's potentially in the overflow zone (ie it wrapped around
+	 * the queue), also zap that copy.
+	 */
 	stream->buffers[oldest_buffer_index] = InvalidBuffer;
-#endif
+	if (oldest_buffer_index < io_combine_limit - 1)
+		stream->buffers[stream->queue_size + oldest_buffer_index] =
+			InvalidBuffer;
 
 #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
 
@@ -906,6 +968,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
+		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
 		stream->distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
@@ -941,6 +1004,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 void
 read_stream_reset(ReadStream *stream)
 {
+	int16		index;
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
@@ -957,6 +1021,24 @@ read_stream_reset(ReadStream *stream)
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
 
+	/* Unpin any unused forwarded buffers. */
+	index = stream->next_buffer_index;
+	while (index < stream->initialized_buffers &&
+		   (buffer = stream->buffers[index]) != InvalidBuffer)
+	{
+		Assert(stream->forwarded_buffers > 0);
+		stream->forwarded_buffers--;
+		ReleaseBuffer(buffer);
+
+		stream->buffers[index] = InvalidBuffer;
+		if (index < io_combine_limit - 1)
+			stream->buffers[stream->queue_size + index] = InvalidBuffer;
+
+		if (++index == stream->queue_size)
+			index = 0;
+	}
+
+	Assert(stream->forwarded_buffers == 0);
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
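
The queue-wrapping logic above can be hard to picture from the diff alone. The following standalone sketch is not part of the patch: it only illustrates why the buffer queue keeps an extra "overflow zone" of io_combine_limit - 1 slots past queue_size, and why a read that wraps is started in one contiguous span and then copied back to the front. All names and sizes here are invented stand-ins.

/*
 * Illustrative sketch only (not PostgreSQL code): a circular queue with an
 * overflow zone, mirroring the overflow/memcpy handling in
 * read_stream_start_pending_read().
 */
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define QUEUE_SIZE        8		/* stand-in for stream->queue_size */
#define IO_COMBINE_LIMIT  4		/* stand-in for io_combine_limit */

typedef int Buffer;				/* stand-in for PostgreSQL's Buffer */
#define InvalidBuffer 0

int
main(void)
{
	/* queue plus overflow zone, as in ReadStream->buffers */
	Buffer		buffers[QUEUE_SIZE + IO_COMBINE_LIMIT - 1] = {InvalidBuffer};
	int			buffer_index = 6;	/* next free slot, near the end */
	int			nblocks = 3;		/* this read wraps past QUEUE_SIZE */
	int			overflow;

	/*
	 * The I/O layer needs one contiguous array, so the read is started at
	 * &buffers[buffer_index] and spills into the overflow zone.
	 */
	for (int i = 0; i < nblocks; i++)
		buffers[buffer_index + i] = 100 + i;	/* pretend pins */

	/*
	 * Consumers index the queue modulo QUEUE_SIZE, so the overflowing tail
	 * is copied to the front as well.  Both copies exist until the entries
	 * are handed to the consumer and zapped back to InvalidBuffer.
	 */
	overflow = (buffer_index + nblocks) - QUEUE_SIZE;
	if (overflow > 0)
	{
		assert(overflow < QUEUE_SIZE);	/* the two regions cannot overlap */
		memcpy(&buffers[0], &buffers[QUEUE_SIZE],
			   sizeof(buffers[0]) * overflow);
	}

	printf("overflow copied to front: %d entries, buffers[0]=%d\n",
		   overflow, buffers[0]);
	return 0;
}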
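A second standalone sketch, again not part of the patch, shows the forwarded-pin accounting that the new forwarded_buffers field tracks: after a short read, any trailing entries the buffer manager left pinned in the array (but did not start I/O on) are counted as "forwarded" and carried over to the next call, or released if the stream ends early. start_read() here is an invented stand-in for StartReadBuffers(), simplified to keep the example self-contained.

/*
 * Illustrative sketch only (not PostgreSQL code): detecting pins that were
 * acquired but not included in the started I/O.
 */
#include <stdio.h>

typedef int Buffer;
#define InvalidBuffer 0

/*
 * Pretend buffer manager: asked for *nblocks, it pins three blocks but only
 * starts I/O on the first two, leaving the third in the caller's array.
 */
static void
start_read(Buffer *bufs, int *nblocks)
{
	int			requested = *nblocks;

	for (int i = 0; i < requested && i < 3; i++)
		bufs[i] = 200 + i;		/* pretend pins */
	*nblocks = requested < 2 ? requested : 2;	/* short read */
}

int
main(void)
{
	Buffer		buffers[8] = {InvalidBuffer};
	int			requested_nblocks = 4;
	int			nblocks = requested_nblocks;
	int			forwarded = 0;

	start_read(buffers, &nblocks);

	/*
	 * Anything pinned beyond the returned nblocks is a forwarded buffer:
	 * already pinned and mapped to the next blocks of the pending read, but
	 * with no I/O started.  It is added to the next call's pin budget and
	 * must be released explicitly if the stream is reset early.
	 */
	while (nblocks + forwarded < requested_nblocks &&
		   buffers[nblocks + forwarded] != InvalidBuffer)
		forwarded++;

	printf("started=%d forwarded=%d\n", nblocks, forwarded);
	return 0;
}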