@@ -1257,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1257
1257
Buffer * buffers ,
1258
1258
BlockNumber blockNum ,
1259
1259
int * nblocks ,
1260
- int flags )
1260
+ int flags ,
1261
+ bool allow_forwarding )
1261
1262
{
1262
1263
int actual_nblocks = * nblocks ;
1263
- int io_buffers_len = 0 ;
1264
1264
int maxcombine = 0 ;
1265
1265
1266
1266
Assert (* nblocks > 0 );
@@ -1270,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1270
1270
{
1271
1271
bool found ;
1272
1272
1273
- buffers [i ] = PinBufferForBlock (operation -> rel ,
1274
- operation -> smgr ,
1275
- operation -> persistence ,
1276
- operation -> forknum ,
1277
- blockNum + i ,
1278
- operation -> strategy ,
1279
- & found );
1273
+ if (allow_forwarding && buffers [i ] != InvalidBuffer )
1274
+ {
1275
+ BufferDesc * bufHdr ;
1276
+
1277
+ /*
1278
+ * This is a buffer that was pinned by an earlier call to
1279
+ * StartReadBuffers(), but couldn't be handled in one operation at
1280
+ * that time. The operation was split, and the caller has passed
1281
+ * an already pinned buffer back to us to handle the rest of the
1282
+ * operation. It must continue at the expected block number.
1283
+ */
1284
+ Assert (BufferGetBlockNumber (buffers [i ]) == blockNum + i );
1285
+
1286
+ /*
1287
+ * It might be an already valid buffer (a hit) that followed the
1288
+ * final contiguous block of an earlier I/O (a miss) marking the
1289
+ * end of it, or a buffer that some other backend has since made
1290
+ * valid by performing the I/O for us, in which case we can handle
1291
+ * it as a hit now. It is safe to check for a BM_VALID flag with
1292
+ * a relaxed load, because we got a fresh view of it while pinning
1293
+ * it in the previous call.
1294
+ *
1295
+ * On the other hand if we don't see BM_VALID yet, it must be an
1296
+ * I/O that was split by the previous call and we need to try to
1297
+ * start a new I/O from this block. We're also racing against any
1298
+ * other backend that might start the I/O or even manage to mark
1299
+ * it BM_VALID after this check, but
1300
+ * StartBufferIO() will handle those cases.
1301
+ */
1302
+ if (BufferIsLocal (buffers [i ]))
1303
+ bufHdr = GetLocalBufferDescriptor (- buffers [i ] - 1 );
1304
+ else
1305
+ bufHdr = GetBufferDescriptor (buffers [i ] - 1 );
1306
+ found = pg_atomic_read_u32 (& bufHdr -> state ) & BM_VALID ;
1307
+ }
1308
+ else
1309
+ {
1310
+ buffers [i ] = PinBufferForBlock (operation -> rel ,
1311
+ operation -> smgr ,
1312
+ operation -> persistence ,
1313
+ operation -> forknum ,
1314
+ blockNum + i ,
1315
+ operation -> strategy ,
1316
+ & found );
1317
+ }
1280
1318
1281
1319
if (found )
1282
1320
{
1283
1321
/*
1284
- * Terminate the read as soon as we get a hit. It could be a
1285
- * single buffer hit, or it could be a hit that follows a readable
1286
- * range. We don't want to create more than one readable range,
1287
- * so we stop here .
1322
+ * We have a hit. If it's the first block in the requested range,
1323
+ * we can return it immediately and report that WaitReadBuffers()
1324
+ * does not need to be called. If the initial value of *nblocks
1325
+ * was larger, the caller will have to call again for the rest .
1288
1326
*/
1289
- actual_nblocks = i + 1 ;
1327
+ if (i == 0 )
1328
+ {
1329
+ * nblocks = 1 ;
1330
+ return false;
1331
+ }
1332
+
1333
+ /*
1334
+ * Otherwise we already have an I/O to perform, but this block
1335
+ * can't be included as it is already valid. Split the I/O here.
1336
+ * There may or may not be more blocks requiring I/O after this
1337
+ * one, we haven't checked, but it can't be contiguous with this
1338
+ * hit in the way. We'll leave this buffer pinned, forwarding it
1339
+ * to the next call, avoiding the need to unpin it here and re-pin
1340
+ * it in the next call.
1341
+ */
1342
+ actual_nblocks = i ;
1290
1343
break ;
1291
1344
}
1292
1345
else
1293
1346
{
1294
- /* Extend the readable range to cover this block. */
1295
- io_buffers_len ++ ;
1296
-
1297
1347
/*
1298
1348
* Check how many blocks we can cover with the same IO. The smgr
1299
1349
* implementation might e.g. be limited due to a segment boundary.
@@ -1314,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1314
1364
}
1315
1365
* nblocks = actual_nblocks ;
1316
1366
1317
- if (likely (io_buffers_len == 0 ))
1318
- return false;
1319
-
1320
1367
/* Populate information needed for I/O. */
1321
1368
operation -> buffers = buffers ;
1322
1369
operation -> blocknum = blockNum ;
1323
1370
operation -> flags = flags ;
1324
1371
operation -> nblocks = actual_nblocks ;
1325
- operation -> io_buffers_len = io_buffers_len ;
1326
1372
1327
1373
if (flags & READ_BUFFERS_ISSUE_ADVICE )
1328
1374
{
@@ -1337,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1337
1383
smgrprefetch (operation -> smgr ,
1338
1384
operation -> forknum ,
1339
1385
blockNum ,
1340
- operation -> io_buffers_len );
1386
+ actual_nblocks );
1341
1387
}
1342
1388
1343
1389
/* Indicate that WaitReadBuffers() should be called. */
@@ -1351,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1351
1397
* actual number, which may be fewer than requested. Caller sets some of the
1352
1398
* members of operation; see struct definition.
1353
1399
*
1400
+ * The initial contents of the elements of buffers up to *nblocks should
1401
+ * either be InvalidBuffer or an already-pinned buffer that was left by a
1402
+ * preceding call to StartReadBuffers() that had to be split. On return, some
1403
+ * elements of buffers may hold pinned buffers beyond the number indicated by
1404
+ * the updated value of *nblocks. Operations are split on boundaries known to
1405
+ * smgr (eg md.c segment boundaries that require crossing into a different
1406
+ * underlying file), or when already cached blocks are found in the buffer
1407
+ * pool that prevent the formation of a contiguous read.
1408
+ *
1354
1409
* If false is returned, no I/O is necessary. If true is returned, one I/O
1355
1410
* has been started, and WaitReadBuffers() must be called with the same
1356
1411
* operation object before the buffers are accessed. Along with the operation
1357
1412
* object, the caller-supplied array of buffers must remain valid until
1358
- * WaitReadBuffers() is called.
1413
+ * WaitReadBuffers() is called, and any forwarded buffers must also be
1414
+ * preserved for a future call unless explicitly released.
1359
1415
*
1360
1416
* Currently the I/O is only started with optional operating system advice if
1361
1417
* requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1369,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation,
1369
1425
int * nblocks ,
1370
1426
int flags )
1371
1427
{
1372
- return StartReadBuffersImpl (operation , buffers , blockNum , nblocks , flags );
1428
+ return StartReadBuffersImpl (operation , buffers , blockNum , nblocks , flags ,
1429
+ true /* expect forwarded buffers */ );
1373
1430
}
1374
1431
1375
1432
/*
1376
1433
* Single block version of the StartReadBuffers(). This might save a few
1377
1434
* instructions when called from another translation unit, because it is
1378
1435
* specialized for nblocks == 1.
1436
+ *
1437
+ * This version does not support "forwarded" buffers: they cannot be created
1438
+ * by reading only one block, and the current contents of *buffer are ignored
1439
+ * on entry.
1379
1440
*/
1380
1441
bool
1381
1442
StartReadBuffer (ReadBuffersOperation * operation ,
@@ -1386,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
1386
1447
int nblocks = 1 ;
1387
1448
bool result ;
1388
1449
1389
- result = StartReadBuffersImpl (operation , buffer , blocknum , & nblocks , flags );
1450
+ result = StartReadBuffersImpl (operation , buffer , blocknum , & nblocks , flags ,
1451
+ false /* single block, no forwarding */ );
1390
1452
Assert (nblocks == 1 ); /* single block can't be short */
1391
1453
1392
1454
return result ;
@@ -1416,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
1416
1478
IOObject io_object ;
1417
1479
char persistence ;
1418
1480
1419
- /*
1420
- * Currently operations are only allowed to include a read of some range,
1421
- * with an optional extra buffer that is already pinned at the end. So
1422
- * nblocks can be at most one more than io_buffers_len.
1423
- */
1424
- Assert ((operation -> nblocks == operation -> io_buffers_len ) ||
1425
- (operation -> nblocks == operation -> io_buffers_len + 1 ));
1426
-
1427
1481
/* Find the range of the physical read we need to perform. */
1428
- nblocks = operation -> io_buffers_len ;
1429
- if (nblocks == 0 )
1430
- return ; /* nothing to do */
1431
-
1482
+ nblocks = operation -> nblocks ;
1432
1483
buffers = & operation -> buffers [0 ];
1433
1484
blocknum = operation -> blocknum ;
1434
1485
forknum = operation -> forknum ;
1435
1486
persistence = operation -> persistence ;
1436
1487
1488
+ Assert (nblocks > 0 );
1489
+ Assert (nblocks <= MAX_IO_COMBINE_LIMIT );
1490
+
1437
1491
if (persistence == RELPERSISTENCE_TEMP )
1438
1492
{
1439
1493
io_context = IOCONTEXT_NORMAL ;
0 commit comments