@@ -116,6 +116,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	bool		advice_enabled;
+	bool		temporary;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -213,8 +214,9 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data)
 }
 
 /*
- * In order to deal with short reads in StartReadBuffers(), we sometimes need
- * to defer handling of a block until later.
+ * In order to deal with buffer shortages and I/O limits after short reads, we
+ * sometimes need to defer handling of a block we've already consumed from the
+ * registered callback until later.
  */
 static inline void
 read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
@@ -225,7 +227,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
-static void
+/*
+ * Start as much of the current pending read as we can. If we have to split it
+ * because of the per-backend buffer limit, or the buffer manager decides to
+ * split it, then the pending read is adjusted to hold the remaining portion.
+ *
+ * We can always start a read of at least size one if we have no progress yet.
+ * Otherwise it's possible that we can't start a read at all because of a lack
+ * of buffers, and then false is returned. Buffer shortages also reduce the
+ * distance to a level that prevents look-ahead until buffers are released.
+ */
+static bool
 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 {
 	bool		need_wait;
@@ -234,12 +246,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
+	int16		buffer_limit;
 
 	/* This should only be called with a pending read. */
 	Assert(stream->pending_read_nblocks > 0);
 	Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
 
-	/* We had better not exceed the pin limit by starting this read. */
+	/* We had better not exceed the per-stream buffer limit with this read. */
 	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
 		   stream->max_pinned_buffers);
 
@@ -260,10 +273,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		flags = 0;
 
-	/* We say how many blocks we want to read, but may be smaller on return. */
+	/* How many more buffers is this backend allowed? */
+	if (stream->temporary)
+		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+	else
+		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	if (buffer_limit == 0 && stream->pinned_buffers == 0)
+		buffer_limit = 1;		/* guarantee progress */
+
+	/* Does the per-backend limit affect this read? */
+	nblocks = stream->pending_read_nblocks;
+	if (buffer_limit < nblocks)
+	{
+		int16		new_distance;
+
+		/* Shrink distance: no more look-ahead until buffers are released. */
+		new_distance = stream->pinned_buffers + buffer_limit;
+		if (stream->distance > new_distance)
+			stream->distance = new_distance;
+
+		/* Unless we have nothing to give the consumer, stop here. */
+		if (stream->pinned_buffers > 0)
+			return false;
+
+		/* A short read is required to make progress. */
+		nblocks = buffer_limit;
+	}
+
+	/*
+	 * We say how many blocks we want to read, but it may be smaller on return
+	 * if the buffer manager decides to shorten the read.
+	 */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = stream->pending_read_nblocks;
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
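The per-backend clamping added in the hunk above reduces to a small amount of arithmetic. The standalone sketch below restates that decision with illustrative names only (it is not part of the patch): clamp the read to the pins this backend may still acquire, shrink the look-ahead distance accordingly, and only fall back to a one-block short read when the stream has nothing pinned yet.

#include <stdbool.h>
#include <stdint.h>

/* Hypothetical mirror of the decision made in read_stream_start_pending_read(). */
typedef struct clamp_result
{
	bool		start_read;		/* false: wait until pinned buffers are released */
	int16_t		nblocks;		/* how many blocks to ask the buffer manager for */
	int16_t		new_distance;	/* possibly reduced look-ahead distance */
} clamp_result;

static clamp_result
clamp_pending_read(int16_t pending_nblocks,		/* size of the pending read */
				   int16_t pinned,				/* buffers the stream already pins */
				   int16_t distance,			/* current look-ahead distance */
				   int16_t backend_allowance)	/* e.g. GetAdditionalPinLimit(), clamped */
{
	clamp_result r = {true, pending_nblocks, distance};

	/* Always allow one pin so that the stream can make progress. */
	if (backend_allowance == 0 && pinned == 0)
		backend_allowance = 1;

	if (backend_allowance < pending_nblocks)
	{
		/* No more look-ahead until the consumer releases some buffers. */
		if (distance > pinned + backend_allowance)
			r.new_distance = pinned + backend_allowance;

		if (pinned > 0)
			r.start_read = false;	/* consumer can keep draining what's pinned */
		else
			r.nblocks = backend_allowance;	/* short read to guarantee progress */
	}

	return r;
}

For example, with pending_nblocks = 16, pinned = 4, distance = 20 and an allowance of 2, the sketch reports start_read = false and a distance of 6; with pinned = 0 it would instead start a 2-block read.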
@@ -313,6 +355,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	/* Adjust the pending read to cover the remaining portion, if any. */
 	stream->pending_read_blocknum += nblocks;
 	stream->pending_read_nblocks -= nblocks;
+
+	return true;
 }
 
 static void
@@ -361,14 +405,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 	/* We have to start the pending read before we can build another. */
 	while (stream->pending_read_nblocks > 0)
 	{
-		read_stream_start_pending_read(stream, suppress_advice);
-		suppress_advice = false;
-		if (stream->ios_in_progress == stream->max_ios)
+		if (!read_stream_start_pending_read(stream, suppress_advice) ||
+			stream->ios_in_progress == stream->max_ios)
 		{
-			/* And we've hit the limit. Rewind, and stop here. */
+			/* We've hit the buffer or I/O limit. Rewind and stop here. */
 			read_stream_unget_block(stream, blocknum);
 			return;
 		}
+
+		suppress_advice = false;
 	}
 
 	/* This is the start of a new pending read. */
@@ -382,15 +427,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 	 * io_combine_limit size once more buffers have been consumed. However,
 	 * if we've already reached io_combine_limit, or we've reached the
 	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.
+	 * signaled end-of-stream, we start the read immediately. Note that the
+	 * pending read can exceed the distance goal, if the latter was reduced
+	 * after hitting the per-backend buffer limit.
 	 */
 	if (stream->pending_read_nblocks > 0 &&
 		(stream->pending_read_nblocks == stream->io_combine_limit ||
-		 (stream->pending_read_nblocks == stream->distance &&
+		 (stream->pending_read_nblocks >= stream->distance &&
 		  stream->pinned_buffers == 0) ||
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream, suppress_advice);
+
+	/*
+	 * There should always be something pinned when we leave this function,
+	 * whether started by this call or not, unless we've hit the end of the
+	 * stream. In the worst case we can always make progress one buffer at a
+	 * time.
+	 */
+	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
 }
 
 /*
@@ -420,6 +475,7 @@ read_stream_begin_impl(int flags,
 	int			max_ios;
 	int			strategy_pin_limit;
 	uint32		max_pinned_buffers;
+	uint32		max_possible_buffer_limit;
 	Oid			tablespace_id;
 
 	/*
@@ -475,12 +531,23 @@ read_stream_begin_impl(int flags,
 	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
 
-	/* Don't allow this backend to pin more than its share of buffers. */
+	/*
+	 * Also limit our queue to the maximum number of pins we could ever be
+	 * allowed to acquire according to the buffer manager. We may not really
+	 * be able to use them all due to other pins held by this backend, but
+	 * we'll check that later in read_stream_start_pending_read().
+	 */
 	if (SmgrIsTemp(smgr))
-		LimitAdditionalLocalPins(&max_pinned_buffers);
+		max_possible_buffer_limit = GetLocalPinLimit();
 	else
-		LimitAdditionalPins(&max_pinned_buffers);
-	Assert(max_pinned_buffers > 0);
+		max_possible_buffer_limit = GetPinLimit();
+	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+	/*
+	 * The limit might be zero on a system configured with too few buffers for
+	 * the number of connections. We need at least one to make progress.
+	 */
+	max_pinned_buffers = Max(1, max_pinned_buffers);
 
 	/*
 	 * We need one extra entry for buffers and per-buffer data, because users
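The stream-creation clamping in the hunk above can be summarised as a chain of limits. The sketch below (illustrative names, not the patch's code) applies the same order: the caller/strategy limit, then the buffer manager's pin ceiling, then a floor of one so the stream can always make progress.

#include <stdint.h>

/* Hypothetical restatement of the clamps applied in read_stream_begin_impl(). */
static uint32_t
effective_queue_size(uint32_t requested,			/* derived from flags and I/O depth */
					 uint32_t strategy_pin_limit,	/* GetAccessStrategyPinLimit() */
					 uint32_t pin_ceiling)			/* GetPinLimit() or GetLocalPinLimit() */
{
	uint32_t	n = requested;

	if (n > strategy_pin_limit)
		n = strategy_pin_limit;
	if (n > pin_ceiling)
		n = pin_ceiling;

	/* Never size the queue below one buffer, or no progress could be made. */
	return (n > 0) ? n : 1;
}

For instance, a requested queue of 64 with a strategy limit of 32 and a pin ceiling of 0 (a system configured with very few buffers per connection) still yields 1, matching the Max(1, ...) added by the patch.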
@@ -546,6 +613,7 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -674,6 +742,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			 * arbitrary I/O entry (they're all free). We don't have to
 			 * adjust pinned_buffers because we're transferring one to caller
 			 * but pinning one more.
+			 *
+			 * In the fast path we don't need to check the pin limit. We're
+			 * always allowed at least one pin so that progress can be made,
+			 * and that's all we need here. Although two pins are momentarily
+			 * held at the same time, the model used here is that the stream
+			 * holds only one, and the other now belongs to the caller.
 			 */
 			if (likely(!StartReadBuffer(&stream->ios[0].op,
 										&stream->buffers[oldest_buffer_index],
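Finally, a reminder of how these limits surface to callers: each buffer handed out by read_stream_next_buffer() is pinned on the caller's behalf, so the pins counted against GetAdditionalPinLimit() are shared between the stream's look-ahead and whatever the consumer is still holding. A minimal consumer might look like the sketch below, assuming the block_range_read_stream_cb helper from read_stream.h; this is not part of the patch, just an illustration of the API.

#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/read_stream.h"
#include "utils/rel.h"

/* Read every block of a relation's main fork through a read stream. */
static void
scan_whole_fork(Relation rel)
{
	BlockRangeReadStreamPrivate p;
	ReadStream *stream;
	Buffer		buf;

	p.current_blocknum = 0;
	p.last_exclusive = RelationGetNumberOfBlocks(rel);

	stream = read_stream_begin_relation(READ_STREAM_FULL,
										NULL,	/* no buffer access strategy */
										rel,
										MAIN_FORKNUM,
										block_range_read_stream_cb,
										&p,
										0);		/* no per-buffer data */

	while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
	{
		/* ... examine the page; the caller holds this pin, not the stream ... */
		ReleaseBuffer(buf);
	}

	read_stream_end(stream);
}

Releasing each buffer promptly keeps more of the per-backend pin budget available for the stream's look-ahead; in the worst case the stream still makes progress one buffer at a time, as the fast-path comment above notes.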