diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 36fb9fe152cf..51c15330117c 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -17,30 +17,12 @@ * pending read. When that isn't possible, the existing pending read is sent * to StartReadBuffers() so that a new one can begin to form. * - * The algorithm for controlling the look-ahead distance tries to classify the - * stream into three ideal behaviors: + * The algorithm for controlling the look-ahead distance is based on recent + * cache hits and misses: * - * A) No I/O is necessary, because the requested blocks are fully cached - * already. There is no benefit to looking ahead more than one block, so - * distance is 1. This is the default initial assumption. - * - * B) I/O is necessary, but read-ahead advice is undesirable because the - * access is sequential and we can rely on the kernel's read-ahead heuristics, - * or impossible because direct I/O is enabled, or the system doesn't support - * read-ahead advice. There is no benefit in looking ahead more than - * io_combine_limit, because in this case the only goal is larger read system - * calls. Looking further ahead would pin many buffers and perform - * speculative work for no benefit. - * - * C) I/O is necessary, it appears to be random, and this system supports - * read-ahead advice. We'll look further ahead in order to reach the - * configured level of I/O concurrency. - * - * The distance increases rapidly and decays slowly, so that it moves towards - * those levels as different I/O patterns are discovered. For example, a - * sequential scan of fully cached data doesn't bother looking ahead, but a - * sequential scan that hits a region of uncached blocks will start issuing - * increasingly wide read calls until it plateaus at io_combine_limit. + * When no I/O is necessary, there is no point in looking ahead more than one + * block. This is the default initial assumption. Otherwise rapidly increase + * the distance to try to benefit from I/O combining and I/O concurrency. * * The main data structure is a circular queue of buffers of size * max_pinned_buffers plus some extra space for technical reasons, ready to be @@ -113,9 +95,12 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; + int16 forwarded_buffers; int16 pinned_buffers; int16 distance; + int16 initialized_buffers; bool advice_enabled; + bool temporary; /* * One-block buffer to support 'ungetting' a block number, to resolve flow @@ -132,6 +117,7 @@ struct ReadStream /* Next expected block, for detecting sequential access. */ BlockNumber seq_blocknum; + BlockNumber seq_until_processed; /* The read operation we are currently preparing. */ BlockNumber pending_read_blocknum; @@ -225,21 +211,34 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) stream->buffered_blocknum = blocknum; } -static void -read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) +/* + * Start as much of the current pending read as we can. If we have to split it + * because of the per-backend buffer limit, or the buffer manager decides to + * split it, then the pending read is adjusted to hold the remaining portion. + * + * We can always start a read of at least size one if we have no progress yet. + * Otherwise it's possible that we can't start a read at all because of a lack + * of buffers, and then false is returned. 
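(The caller reacts by rewinding the block it was about to read and waiting for the consumer to unpin buffers; see read_stream_look_ahead().)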
Buffer shortages also reduce the + distance to a level that prevents look-ahead until buffers are released. + */ +static bool +read_stream_start_pending_read(ReadStream *stream) { bool need_wait; + int requested_nblocks; int nblocks; - int flags; + int flags = 0; + int forwarded; int16 io_index; int16 overflow; int16 buffer_index; + int16 buffer_limit; /* This should only be called with a pending read. */ Assert(stream->pending_read_nblocks > 0); Assert(stream->pending_read_nblocks <= stream->io_combine_limit); - /* We had better not exceed the pin limit by starting this read. */ + /* We had better not exceed the per-stream buffer limit with this read. */ Assert(stream->pinned_buffers + stream->pending_read_nblocks <= stream->max_pinned_buffers); @@ -249,21 +248,101 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else Assert(stream->next_buffer_index == stream->oldest_buffer_index); + /* Do we need to issue read-ahead advice? */ + if (stream->advice_enabled) + { + bool no_wait; + + /* + * We only issue advice if we won't immediately have to call + * WaitReadBuffers(). + */ + no_wait = stream->pinned_buffers > 0 || + stream->pending_read_nblocks < stream->distance; + + if (stream->pending_read_blocknum == stream->seq_blocknum) + { + /* + * Sequential: issue advice only until the WaitReadBuffers() calls + * catch up with the first advice issued for this sequential + * region, so the kernel can see sequential access. + */ + if (stream->seq_until_processed != InvalidBlockNumber && no_wait) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + else + { + /* Random jump: start tracking a new region. */ + stream->seq_until_processed = stream->pending_read_blocknum; + if (no_wait) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + } + /* - * If advice hasn't been suppressed, this system supports it, and this - * isn't a strictly sequential pattern, then we'll issue advice. + * Compute the remaining portion of the per-backend buffer limit. If we + * already have some forwarded buffers, we can certainly use those. They + * are already pinned, and are mapped to the starting blocks of the pending + * read; they just don't have any I/O started yet and are not counted in + * stream->pinned_buffers. */ - if (!suppress_advice && - stream->advice_enabled && - stream->pending_read_blocknum != stream->seq_blocknum) - flags = READ_BUFFERS_ISSUE_ADVICE; + if (stream->temporary) + buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); else - flags = 0; + buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + buffer_limit += stream->forwarded_buffers; + if (buffer_limit == 0 && stream->pinned_buffers == 0) + buffer_limit = 1; /* guarantee progress */ + + /* Does the per-backend buffer limit affect this read? */ + nblocks = stream->pending_read_nblocks; + if (buffer_limit < nblocks) + { + int16 new_distance; - /* We say how many blocks we want to read, but may be smaller on return. */ + /* Shrink distance: no more look-ahead until buffers are released. */ + new_distance = stream->pinned_buffers + buffer_limit; + if (stream->distance > new_distance) + stream->distance = new_distance; + + /* If we've already made progress, just give up and wait for buffers. */ + if (stream->pinned_buffers > 0) + return false; + + /* A short read is required to make progress. 
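For example, if the pending read covers 8 blocks but buffer_limit is only 3, we start a read of at most 3 blocks here and leave the remaining blocks in the pending read.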
*/ + nblocks = buffer_limit; + } + + /* + * We say how many blocks we want to read, but it may be smaller on return + * if the buffer manager decides it needs a short read at its level. + */ + requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks); + nblocks = requested_nblocks; buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = stream->pending_read_nblocks; + + /* + * The first time around the queue we initialize it as we go, including + * the overflow zone, because otherwise the entries would appear as + * forwarded buffers. This avoids initializing the whole queue up front + * in cases where it is large but we don't ever use it due to the + * all-cached fast path or small scans. + */ + while (stream->initialized_buffers < buffer_index + nblocks) + stream->buffers[stream->initialized_buffers++] = InvalidBuffer; + + /* + * Start the I/O. Any buffers that are not InvalidBuffer will be + * interpreted as already pinned, forwarded by an earlier call to + * StartReadBuffers(), and must map to the expected blocks. The nblocks + * value may be smaller on return, indicating the size of the I/O that + * could be started. Buffers beyond the output nblocks number may also + * have been pinned without starting I/O due to various edge cases. In + * that case we'll just leave them in the queue ahead of us, "forwarded" + * to the next call, avoiding the need to unpin/repin. + */ need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -274,7 +353,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* Remember whether we need to wait before returning this buffer. */ if (!need_wait) { - /* Look-ahead distance decays, no I/O necessary (behavior A). */ + /* Look-ahead distance decays, no I/O necessary. */ if (stream->distance > 1) stream->distance--; } @@ -292,16 +371,35 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) stream->seq_blocknum = stream->pending_read_blocknum + nblocks; } + /* + * How many pins were acquired but forwarded to the next call? These need + * to be passed to the next StartReadBuffers() call, or released if the + * stream ends early. We need the number for accounting purposes, since + * they are not counted in stream->pinned_buffers but we already hold + * them. + */ + forwarded = 0; + while (nblocks + forwarded < requested_nblocks && + stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) + forwarded++; + stream->forwarded_buffers = forwarded; + /* * We gave a contiguous range of buffer space to StartReadBuffers(), but - * we want it to wrap around at queue_size. Slide overflowing buffers to - * the front of the array. + * we want it to wrap around at queue_size. Copy overflowing buffers to + * the front of the array where they'll be consumed, but also leave a copy + * in the overflow zone, which the I/O operation has a pointer to (it needs + * a contiguous array). Both copies will be cleared when the buffers are + * handed to the consumer. 
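For example, with queue_size = 16, buffer_index = 14 and a 4-block read, the entries at indexes 16 and 17 are copied to indexes 0 and 1.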
*/ - overflow = (buffer_index + nblocks) - stream->queue_size; + overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; if (overflow > 0) - memmove(&stream->buffers[0], - &stream->buffers[stream->queue_size], - sizeof(stream->buffers[0]) * overflow); + { + Assert(overflow < stream->queue_size); /* can't overlap */ + memcpy(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + } /* Compute location of start of next read, without using % operator. */ buffer_index += nblocks; @@ -313,10 +411,12 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* Adjust the pending read to cover the remaining portion, if any. */ stream->pending_read_blocknum += nblocks; stream->pending_read_nblocks -= nblocks; + + return true; } static void -read_stream_look_ahead(ReadStream *stream, bool suppress_advice) +read_stream_look_ahead(ReadStream *stream) { while (stream->ios_in_progress < stream->max_ios && stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) @@ -327,8 +427,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) if (stream->pending_read_nblocks == stream->io_combine_limit) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; + read_stream_start_pending_read(stream); continue; } @@ -361,11 +460,10 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; - if (stream->ios_in_progress == stream->max_ios) + if (!read_stream_start_pending_read(stream) || + stream->ios_in_progress == stream->max_ios) { - /* And we've hit the limit. Rewind, and stop here. */ + /* And we've hit a buffer or I/O limit. Rewind and wait. */ read_stream_unget_block(stream, blocknum); return; } @@ -382,15 +480,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) * io_combine_limit size once more buffers have been consumed. However, * if we've already reached io_combine_limit, or we've reached the * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. + * signaled end-of-stream, we start the read immediately. Note that the + * pending read could even exceed the distance goal, if the latter was + * reduced on buffer limit exhaustion. */ if (stream->pending_read_nblocks > 0 && (stream->pending_read_nblocks == stream->io_combine_limit || - (stream->pending_read_nblocks == stream->distance && + (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) - read_stream_start_pending_read(stream, suppress_advice); + read_stream_start_pending_read(stream); + + /* + * There should always be something pinned when we leave this function, + * whether started by this call or not, unless we've hit the end of the + * stream. In the worst case we can always make progress one buffer at a + * time. 
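(That minimum is guaranteed by the buffer_limit = 1 fallback in read_stream_start_pending_read().)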
+ */ + Assert(stream->pinned_buffers > 0 || stream->distance == 0); } /* @@ -420,6 +528,7 @@ read_stream_begin_impl(int flags, int max_ios; int strategy_pin_limit; uint32 max_pinned_buffers; + uint32 max_possible_buffer_limit; Oid tablespace_id; /* @@ -444,6 +553,15 @@ read_stream_begin_impl(int flags, else max_ios = get_tablespace_io_concurrency(tablespace_id); + /* + * XXX Since we don't have asynchronous I/O yet, if direct I/O is enabled + * then just behave as though I/O concurrency is set to 0. Otherwise we + * would look ahead pinning many buffers for no benefit, for lack of + * advice and AIO. + */ + if (io_direct_flags & IO_DIRECT_DATA) + max_ios = 0; + /* Cap to INT16_MAX to avoid overflowing below */ max_ios = Min(max_ios, PG_INT16_MAX); @@ -475,12 +593,23 @@ read_stream_begin_impl(int flags, strategy_pin_limit = GetAccessStrategyPinLimit(strategy); max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers); - /* Don't allow this backend to pin more than its share of buffers. */ + /* + * Also limit our queue to the maximum number of pins we could possibly + * ever be allowed to acquire according to the buffer manager. We may not + * really be able to use them all due to other pins held by this backend, + * but we'll check that later in read_stream_start_pending_read(). + */ if (SmgrIsTemp(smgr)) - LimitAdditionalLocalPins(&max_pinned_buffers); + max_possible_buffer_limit = GetSoftLocalPinLimit(); else - LimitAdditionalPins(&max_pinned_buffers); - Assert(max_pinned_buffers > 0); + max_possible_buffer_limit = GetSoftPinLimit(); + max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit); + + /* + * The soft limit might be zero on a system configured with more + * connections than buffers. We need at least one to make progress. + */ + max_pinned_buffers = Max(1, max_pinned_buffers); /* * We need one extra entry for buffers and per-buffer data, because users @@ -546,11 +675,14 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->seq_blocknum = InvalidBlockNumber; + stream->seq_until_processed = InvalidBlockNumber; + stream->temporary = SmgrIsTemp(smgr); /* * Skip the initial ramp-up phase if the caller says we're going to be * reading the whole relation. This way we start out assuming we'll be - * doing full io_combine_limit sized reads (behavior B). + * doing full io_combine_limit sized reads. */ if (flags & READ_STREAM_FULL) stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); @@ -641,10 +773,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* - * A fast path for all-cached scans (behavior A). This is the same as the - * usual algorithm, but it is specialized for no I/O and no per-buffer - * data, so we can skip the queue management code, stay in the same buffer - * slot and use singular StartReadBuffer(). + * A fast path for all-cached scans. This is the same as the usual + * algorithm, but it is specialized for no I/O and no per-buffer data, so + * we can skip the queue management code, stay in the same buffer slot and + * use singular StartReadBuffer(). */ if (likely(stream->fast_path)) { @@ -652,10 +784,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. 
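These mirror the conditions checked at the bottom of this function before stream->fast_path is set.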
*/ Assert(stream->ios_in_progress == 0); + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. */ oldest_buffer_index = stream->oldest_buffer_index; @@ -674,6 +808,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * arbitrary I/O entry (they're all free). We don't have to * adjust pinned_buffers because we're transferring one to caller * but pinning one more. + * + * In the fast path we don't need to check the pin limit. We're + * always allowed at least one pin so that progress can be made, + * and that's all we need here. Although two pins are momentarily + * held at the same time, the model used here is that the stream + * holds only one, and the other now belongs to the caller. */ if (likely(!StartReadBuffer(&stream->ios[0].op, &stream->buffers[oldest_buffer_index], @@ -698,6 +838,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; + stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -719,7 +860,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * space for more, but if we're just starting up we'll need to crank * the handle to get started. */ - read_stream_look_ahead(stream, true); + read_stream_look_ahead(stream); /* End of stream reached? */ if (stream->pinned_buffers == 0) @@ -758,34 +899,31 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (++stream->oldest_io_index == stream->max_ios) stream->oldest_io_index = 0; - if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE) - { - /* Distance ramps up fast (behavior C). */ - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; - } - else - { - /* No advice; move towards io_combine_limit (behavior B). */ - if (stream->distance > stream->io_combine_limit) - { - stream->distance--; - } - else - { - distance = stream->distance * 2; - distance = Min(distance, stream->io_combine_limit); - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; - } - } + /* Look-ahead distance ramps up quickly after we do I/O. */ + distance = stream->distance * 2; + distance = Min(distance, stream->max_pinned_buffers); + stream->distance = distance; + + /* + * If we've caught up with the first advice issued for the current + * sequential region, cancel further advice until the next random + * jump. The kernel should be able to see the pattern now that we're + * actually making sequential preadv() calls. + */ + if (stream->advice_enabled && + stream->ios[io_index].op.blocknum == stream->seq_until_processed) + stream->seq_until_processed = InvalidBlockNumber; } -#ifdef CLOBBER_FREED_MEMORY - /* Clobber old buffer for debugging purposes. */ + /* + * We must zap this queue entry, or else it would appear as a forwarded + * buffer. If it's potentially in the overflow zone (ie it wrapped around + * the queue), also zap that copy. 
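For example, with queue_size = 16 and io_combine_limit = 8, entries 0 through 6 could have wrapped copies at indexes 16 through 22.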
+ */ stream->buffers[oldest_buffer_index] = InvalidBuffer; -#endif + if (oldest_buffer_index < io_combine_limit - 1) + stream->buffers[stream->queue_size + oldest_buffer_index] = + InvalidBuffer; #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) @@ -825,11 +963,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->oldest_buffer_index = 0; /* Prepare for the next call. */ - read_stream_look_ahead(stream, false); + read_stream_look_ahead(stream); #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && + stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && @@ -865,6 +1004,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) void read_stream_reset(ReadStream *stream) { + int16 index; Buffer buffer; /* Stop looking ahead. */ @@ -874,10 +1014,31 @@ read_stream_reset(ReadStream *stream) stream->buffered_blocknum = InvalidBlockNumber; stream->fast_path = false; + /* There is no point in reading whatever was pending. */ + stream->pending_read_nblocks = 0; + /* Unpin anything that wasn't consumed. */ while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); + /* Unpin any unused forwarded buffers. */ + index = stream->next_buffer_index; + while (index < stream->initialized_buffers && + (buffer = stream->buffers[index]) != InvalidBuffer) + { + Assert(stream->forwarded_buffers > 0); + stream->forwarded_buffers--; + ReleaseBuffer(buffer); + + stream->buffers[index] = InvalidBuffer; + if (index < io_combine_limit - 1) + stream->buffers[stream->queue_size + index] = InvalidBuffer; + + if (++index == stream->queue_size) + index = 0; + } + + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7915ed624c12..d56bff96cec4 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0; static uint32 PrivateRefCountClock = 0; static PrivateRefCountEntry *ReservedRefCountEntry = NULL; +static uint32 MaxProportionalPins; + static void ReservePrivateRefCountEntry(void); static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer); static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move); @@ -1255,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, - int flags) + int flags, + bool allow_forwarding) { int actual_nblocks = *nblocks; - int io_buffers_len = 0; int maxcombine = 0; Assert(*nblocks > 0); @@ -1268,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, { bool found; - buffers[i] = PinBufferForBlock(operation->rel, - operation->smgr, - operation->persistence, - operation->forknum, - blockNum + i, - operation->strategy, - &found); + if (allow_forwarding && buffers[i] != InvalidBuffer) + { + BufferDesc *bufHdr; + + /* + * This is a buffer that was pinned by an earlier call to + * StartReadBuffers(), but couldn't be handled in one operation at + * that time. The operation was split, and the caller has passed + * an already pinned buffer back to us to handle the rest of the + * operation. It must continue at the expected block number. 
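For example, if an earlier call requested blocks 10-17 but found block 14 already cached, it started I/O for blocks 10-13 only and forwarded the pinned buffer for block 14, so this call must start at block 14.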
*/ + Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i); + + /* + * It might be an already valid buffer (a hit) that followed the + * final contiguous block of an earlier I/O (a miss) marking the + * end of it, or a buffer that some other backend has since made + * valid by performing the I/O for us, in which case we can handle + * it as a hit now. It is safe to check for a BM_VALID flag with + * a relaxed load, because we got a fresh view of it while pinning + * it in the previous call. + * + * On the other hand if we don't see BM_VALID yet, it must be an + * I/O that was split by the previous call and we need to try to + * start a new I/O from this block. We're also racing against any + * other backend that might start the I/O or even manage to mark + * it BM_VALID after this check, but + * StartBufferIO() will handle those cases. + */ + if (BufferIsLocal(buffers[i])) + bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1); + else + bufHdr = GetBufferDescriptor(buffers[i] - 1); + found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID; + } + else + { + buffers[i] = PinBufferForBlock(operation->rel, + operation->smgr, + operation->persistence, + operation->forknum, + blockNum + i, + operation->strategy, + &found); + } if (found) { /* - * Terminate the read as soon as we get a hit. It could be a - * single buffer hit, or it could be a hit that follows a readable - * range. We don't want to create more than one readable range, - * so we stop here. + * We have a hit. If it's the first block in the requested range, + * we can return it immediately and report that WaitReadBuffers() + * does not need to be called. If the initial value of *nblocks + * was larger, the caller will have to call again for the rest. */ - actual_nblocks = i + 1; + if (i == 0) + { + *nblocks = 1; + return false; + } + + /* + * Otherwise we already have an I/O to perform, but this block + * can't be included as it is already valid. Split the I/O here. + * There may or may not be more blocks requiring I/O after this + * one, we haven't checked, but they can't be contiguous with this + * hit in the way. We'll leave this buffer pinned, forwarding it + * to the next call, avoiding the need to unpin it here and re-pin + * it in the next call. + */ + actual_nblocks = i; break; } else { - /* Extend the readable range to cover this block. */ - io_buffers_len++; - /* * Check how many blocks we can cover with the same IO. The smgr * implementation might e.g. be limited due to a segment boundary. @@ -1312,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, } *nblocks = actual_nblocks; - if (likely(io_buffers_len == 0)) - return false; - /* Populate information needed for I/O. */ operation->buffers = buffers; operation->blocknum = blockNum; operation->flags = flags; operation->nblocks = actual_nblocks; - operation->io_buffers_len = io_buffers_len; if (flags & READ_BUFFERS_ISSUE_ADVICE) { @@ -1335,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, smgrprefetch(operation->smgr, operation->forknum, blockNum, - operation->io_buffers_len); + actual_nblocks); } /* Indicate that WaitReadBuffers() should be called. */ @@ -1349,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, * actual number, which may be fewer than requested. Caller sets some of the * members of operation; see struct definition. 
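The read stream code in read_stream.c is the primary caller of this interface.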
* + * The initial contents of the elements of buffers up to *nblocks should + * either be InvalidBuffer or an already-pinned buffer that was left by a + * preceding call to StartReadBuffers() that had to be split. On return, some + * elements of buffers may hold pinned buffers beyond the number indicated by + * the updated value of *nblocks. Operations are split on boundaries known to + * smgr (e.g. md.c segment boundaries that require crossing into a different + * underlying file), or when already cached blocks are found in the buffer + * pool that prevent the formation of a contiguous read. + * * If false is returned, no I/O is necessary. If true is returned, one I/O * has been started, and WaitReadBuffers() must be called with the same * operation object before the buffers are accessed. Along with the operation * object, the caller-supplied array of buffers must remain valid until - * WaitReadBuffers() is called. + * WaitReadBuffers() is called, and any forwarded buffers must also be + * preserved for a future call unless explicitly released. * * Currently the I/O is only started with optional operating system advice if * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O * @@ -1367,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation, int *nblocks, int flags) { - return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags); + return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags, + true /* expect forwarded buffers */ ); } /* * Single block version of the StartReadBuffers(). This might save a few * instructions when called from another translation unit, because it is * specialized for nblocks == 1. + * + * This version does not support "forwarded" buffers: they cannot be created + * by reading only one block, and the current contents of *buffer are ignored + * on entry. */ bool StartReadBuffer(ReadBuffersOperation *operation, @@ -1384,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation, int nblocks = 1; bool result; - result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags); + result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags, + false /* single block, no forwarding */ ); Assert(nblocks == 1); /* single block can't be short */ return result; @@ -1414,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation) IOObject io_object; char persistence; - /* - * Currently operations are only allowed to include a read of some range, - * with an optional extra buffer that is already pinned at the end. So - * nblocks can be at most one more than io_buffers_len. - */ - Assert((operation->nblocks == operation->io_buffers_len) || - (operation->nblocks == operation->io_buffers_len + 1)); - /* Find the range of the physical read we need to perform. */ - nblocks = operation->io_buffers_len; - if (nblocks == 0) - return; /* nothing to do */ - + nblocks = operation->nblocks; buffers = &operation->buffers[0]; blocknum = operation->blocknum; forknum = operation->forknum; persistence = operation->persistence; + Assert(nblocks > 0); + Assert(nblocks <= MAX_IO_COMBINE_LIMIT); + if (persistence == RELPERSISTENCE_TEMP) { io_context = IOCONTEXT_NORMAL; @@ -2097,43 +2153,67 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) return buf; } +/* + * Return the maximum number of buffers that this backend should try to pin at + * once, to avoid pinning more than its fair share. This is the highest value + * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return. 
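For example, with 16384 shared buffers and 100 possible backends and auxiliary processes, each backend's proportional share is 163 pins.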
+ * + * It's called a soft limit because nothing stops a backend from trying to + * acquire more pins than this if it needs them to make progress, but code that + * wants optional extra buffers for optimizations should respect this + * per-backend limit. + */ +uint32 +GetSoftPinLimit(void) +{ + return MaxProportionalPins; +} + +/* + * Return the maximum number of additional buffers that this backend should + * pin if it wants to stay under the per-backend soft limit, considering the + * number of buffers it has already pinned. Unlike LimitAdditionalPins(), the + * result can be zero, so the caller is expected to adjust it if required to + * make progress. + */ +uint32 +GetAdditionalPinLimit(void) +{ + uint32 estimated_pins_held; + + /* + * We get the number of "overflowed" pins for free, but don't know the + * number of pins in PrivateRefCountArray. The cost of calculating that + * exactly doesn't seem worth it, so just assume the max. + */ + estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; + + /* Is this backend already holding more than its fair share? */ + if (estimated_pins_held > MaxProportionalPins) + return 0; + + return MaxProportionalPins - estimated_pins_held; +} + /* * Limit the number of pins a batch operation may additionally acquire, to * avoid running out of pinnable buffers. * - * One additional pin is always allowed, as otherwise the operation likely - * cannot be performed at all. - * - * The number of allowed pins for a backend is computed based on - * shared_buffers and the maximum number of connections possible. That's very - * pessimistic, but outside of toy-sized shared_buffers it should allow - * sufficient pins. + * One additional pin is always allowed, on the assumption that the operation + * requires at least one to make progress. */ void LimitAdditionalPins(uint32 *additional_pins) { - uint32 max_backends; - int max_proportional_pins; + uint32 limit; if (*additional_pins <= 1) return; - max_backends = MaxBackends + NUM_AUXILIARY_PROCS; - max_proportional_pins = NBuffers / max_backends; - - /* - * Subtract the approximate number of buffers already pinned by this - * backend. We get the number of "overflowed" pins for free, but don't - * know the number of pins in PrivateRefCountArray. The cost of - * calculating that exactly doesn't seem worth it, so just assume the max. - */ - max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; - - if (max_proportional_pins <= 0) - max_proportional_pins = 1; - - if (*additional_pins > max_proportional_pins) - *additional_pins = max_proportional_pins; + limit = GetAdditionalPinLimit(); + limit = Max(limit, 1); + if (limit < *additional_pins) + *additional_pins = limit; } /* @@ -3575,6 +3655,15 @@ InitBufferManagerAccess(void) { HASHCTL hash_ctl; + /* + * The soft limit on the number of pins each backend should respect, based + * on shared_buffers and the maximum number of connections possible. + * That's very pessimistic, but outside toy-sized shared_buffers it should + * allow plenty of pins. LimitAdditionalPins() or GetAdditionalPinLimit() + * can be used to check the remaining balance. 
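This is computed once per backend, since NBuffers and MaxBackends cannot change while the server is running.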
+ */ + MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); + memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray)); hash_ctl.keysize = sizeof(int32); diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 80b83444eb2d..5378ba843162 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -286,6 +286,22 @@ GetLocalVictimBuffer(void) return BufferDescriptorGetBuffer(bufHdr); } +/* see GetSoftPinLimit() */ +uint32 +GetSoftLocalPinLimit(void) +{ + /* Every backend has its own temporary buffers, and can pin them all. */ + return num_temp_buffers; +} + +/* see GetAdditionalPinLimit() */ +uint32 +GetAdditionalLocalPinLimit(void) +{ + Assert(NLocalPinnedBuffers <= num_temp_buffers); + return num_temp_buffers - NLocalPinnedBuffers; +} + /* see LimitAdditionalPins() */ void LimitAdditionalLocalPins(uint32 *additional_pins) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index b204e4731c18..307f36af3849 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -130,7 +130,6 @@ struct ReadBuffersOperation BlockNumber blocknum; int flags; int16 nblocks; - int16 io_buffers_len; }; typedef struct ReadBuffersOperation ReadBuffersOperation; @@ -290,6 +289,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern uint32 GetSoftPinLimit(void); +extern uint32 GetSoftLocalPinLimit(void); +extern uint32 GetAdditionalPinLimit(void); +extern uint32 GetAdditionalLocalPinLimit(void); extern void LimitAdditionalPins(uint32 *additional_pins); extern void LimitAdditionalLocalPins(uint32 *additional_pins);