@@ -5536,7 +5536,15 @@ LockBuffer(Buffer buffer, int mode)
 	else if (mode == BUFFER_LOCK_SHARE)
 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
 	else if (mode == BUFFER_LOCK_EXCLUSIVE)
+	{
+		/*
+		 * FIXME: Wait for AIO writes, otherwise there would be a risk of
+		 * deadlock. This isn't entirely trivial to do in a race-free way; IO
+		 * could be started between us checking whether there is IO and
+		 * enqueueing ourselves for the lock.
+		 */
 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
+	}
 	else
 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
 }
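
One possible shape for the wait this FIXME asks for, sketched with the AIO wait-reference primitives the read side already uses (a sketch, not the eventual fix): snapshot the buffer's in-flight IO reference under the buffer header lock, then wait on it before queueing for the content lock. As the FIXME notes, a single check is racy, since a new write can be started between the check and LWLockAcquire(), so a real fix would need to re-check after waiting or integrate the wait into the lock acquisition itself.

	PgAioWaitRef iow;
	uint32		buf_state;

	/* snapshot the in-flight IO's wait reference, if any */
	buf_state = LockBufHdr(buf);
	iow = buf->io_wref;
	UnlockBufHdr(buf, buf_state);

	/* waiting for IO completion acquires no other locks, so cannot deadlock */
	if (pgaio_wref_valid(&iow))
		pgaio_wref_wait(&iow);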
@@ -5551,6 +5559,19 @@ ConditionalLockBuffer(Buffer buffer)
 {
 	BufferDesc *buf;
 
+	/*
+	 * FIXME: Wait for AIO writes. Some code does not deal well with
+	 * ConditionalLockBuffer() continuously failing, e.g.
+	 * spginsert()->spgdoinsert() ends up busy-looping (easily reproducible by
+	 * just making this function always fail and running the regression
+	 * tests). While that code could be fixed, it'd be hard to find all
+	 * problematic places.
+	 *
+	 * It would be OK to wait for the IO here, as waiting for IO completion
+	 * does not require taking any locks that could lead to an undetected
+	 * deadlock or the like.
+	 */
+
 	Assert(BufferIsPinned(buffer));
 	if (BufferIsLocal(buffer))
 		return true;			/* act as though we got it */
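
The same wait sketched after the LockBuffer() hunk would fit at the top of ConditionalLockBuffer(); as the comment argues, that is safe here because pgaio_wref_wait() only waits for the IO itself to complete and takes no locks that could deadlock. A minimal form, eliding the header-lock snapshot shown above:

	if (pgaio_wref_valid(&buf->io_wref))
		pgaio_wref_wait(&buf->io_wref);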
@@ -5614,10 +5635,8 @@ LockBufferForCleanup(Buffer buffer)
 	CheckBufferIsPinnedOnce(buffer);
 
 	/*
-	 * We do not yet need to be worried about in-progress AIOs holding a pin,
-	 * as we, so far, only support doing reads via AIO and this function can
-	 * only be called once the buffer is valid (i.e. no read can be in
-	 * flight).
+	 * FIXME: See AIO related comments in LockBuffer() and
+	 * ConditionalLockBuffer().
 	 */
 
 	/* Nobody else to wait for */
@@ -5630,6 +5649,11 @@ LockBufferForCleanup(Buffer buffer)
 	{
 		uint32		buf_state;
 
+		/*
+		 * FIXME: LockBuffer()'s handling of in-progress writes (once
+		 * implemented) should suffice to deal with deadlock risk.
+		 */
+
 		/* Try to acquire lock */
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		buf_state = LockBufHdr(bufHdr);
@@ -5777,7 +5801,13 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 
 	Assert(BufferIsValid(buffer));
 
-	/* see AIO related comment in LockBufferForCleanup() */
+	/*
+	 * FIXME: Should wait for IO for the same reason as in
+	 * ConditionalLockBuffer(). Needs to happen before the
+	 * ConditionalLockBuffer() call below, as we'd never reach the
+	 * ConditionalLockBuffer() call due to the buffer pin held for the
+	 * duration of the IO.
+	 */
 
 	if (BufferIsLocal(buffer))
 	{
@@ -5834,7 +5864,10 @@ IsBufferCleanupOK(Buffer buffer)
 
 	Assert(BufferIsValid(buffer));
 
-	/* see AIO related comment in LockBufferForCleanup() */
+	/*
+	 * FIXME: See AIO related comments in LockBuffer() and
+	 * ConditionalLockBuffer().
+	 */
 
 	if (BufferIsLocal(buffer))
 	{
@@ -7140,12 +7173,129 @@ buffer_readv_report(PgAioResult result, const PgAioTargetData *td,
 				affected_count > 1 ? errhint_internal(hint_mult, affected_count - 1) : 0);
 }
 
+/*
+ * Helper for AIO writev completion callbacks, supporting both shared and temp
+ * buffers. Gets called once for each buffer in a multi-page write.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_writev_complete_one(uint8 buf_off, Buffer buffer, uint8 flags,
+						   bool failed, bool is_temp)
+{
+	BufferDesc *buf_hdr = is_temp ?
+		GetLocalBufferDescriptor(-buffer - 1)
+		: GetBufferDescriptor(buffer - 1);
+	PgAioResult result = {.status = PGAIO_RS_OK};
+	bool		clear_dirty;
+	uint32		set_flag_bits;
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+		Assert(buf_state & BM_VALID);
+		Assert(buf_state & BM_TAG_VALID);
+		/* temp buffers don't use BM_IO_IN_PROGRESS */
+		if (!is_temp)
+			Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(buf_state & BM_DIRTY);
+	}
+#endif
+
+	clear_dirty = failed ? false : true;
+	set_flag_bits = failed ? BM_IO_ERROR : 0;
+
+	if (is_temp)
+		TerminateLocalBufferIO(buf_hdr, clear_dirty, set_flag_bits, true);
+	else
+		TerminateBufferIO(buf_hdr, clear_dirty, set_flag_bits, false, true);
+
+	/*
+	 * The initiator of the IO is no longer managing the content lock (it
+	 * called LWLockDisown()); releasing it is up to us.
+	 */
+	if (!is_temp)
+		LWLockReleaseDisowned(BufferDescriptorGetContentLock(buf_hdr),
+							  LW_SHARED);
+
+	/* FIXME: tracepoint */
+
+	return result;
+}
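
The LWLockReleaseDisowned() call above pairs with the staging side, which lies outside this hunk: per the comment, the issuing backend takes the content lock but disowns it, leaving completion responsible for the release. A sketch of the assumed sequence (its exact placement in buffer_stage_common() is an assumption here):

	/* issuing backend, at stage time: take the content lock, then disown it */
	LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_SHARED);
	LWLockDisown(BufferDescriptorGetContentLock(buf_hdr));

	/* completing process, possibly a different backend: release it */
	LWLockReleaseDisowned(BufferDescriptorGetContentLock(buf_hdr), LW_SHARED);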
+
+/*
+ * Perform completion handling of a single AIO write. This write may cover
+ * multiple blocks / buffers.
+ *
+ * Shared between shared and local buffers, to reduce code duplication.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result,
+					   uint8 cb_data, bool is_temp)
+{
+	PgAioResult result = prior_result;
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	uint64	   *io_data;
+	uint8		handle_data_len;
+
+	if (is_temp)
+	{
+		Assert(td->smgr.is_temp);
+		Assert(pgaio_io_get_owner(ioh) == MyProcNumber);
+	}
+	else
+		Assert(!td->smgr.is_temp);
+
+	/*
+	 * Iterate over all the buffers affected by this IO and call the
+	 * appropriate per-buffer completion function for each.
+	 */
+	io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+	for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++)
+	{
+		Buffer		buf = io_data[buf_off];
+		PgAioResult buf_result;
+		bool		failed;
+
+		Assert(BufferIsValid(buf));
+
+		/*
+		 * If the entire IO failed at a lower level, each buffer needs to be
+		 * marked as failed. In case of a partial write, some buffers may be
+		 * ok.
+		 */
+		failed =
+			prior_result.status == PGAIO_RS_ERROR
+			|| prior_result.result <= buf_off;
+
+		buf_result = buffer_writev_complete_one(buf_off, buf, cb_data, failed,
+												is_temp);
+
+		/*
+		 * If there wasn't any prior error and the IO for this page failed in
+		 * some form, set the whole IO's result to the page's result.
+		 */
+		if (result.status != PGAIO_RS_ERROR && buf_result.status != PGAIO_RS_OK)
+		{
+			result = buf_result;
+			pgaio_result_report(result, td, LOG);
+		}
+	}
+
+	return result;
+}
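
A worked example of the failed computation, assuming prior_result.result counts successfully written buffers (matching how the readv completion callbacks interpret it): for a four-buffer write with prior_result.result == 2, buffers at buf_off 0 and 1 terminate cleanly with BM_DIRTY cleared, while those at buf_off 2 and 3 keep BM_DIRTY and get BM_IO_ERROR set, so a later writeback can retry them.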
+
 static void
 shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
 {
 	buffer_stage_common(ioh, false, false);
 }
 
+static void
+shared_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	buffer_stage_common(ioh, true, false);
+}
+
 static PgAioResult
 shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 							 uint8 cb_data)
@@ -7191,6 +7341,13 @@ shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result,
 	return prior_result;
 }
 
+static PgAioResult
+shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result,
+							  uint8 cb_data)
+{
+	return buffer_writev_complete(ioh, prior_result, cb_data, false);
+}
+
 static void
 local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
 {
@@ -7204,6 +7361,17 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 	return buffer_readv_complete(ioh, prior_result, cb_data, true);
 }
 
+static void
+local_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	/*
+	 * Currently this is unreachable as the only write support is for
+	 * checkpointer / bgwriter, which don't deal with local buffers.
+	 */
+	elog(ERROR, "should be unreachable");
+}
+
+
 /* readv callback is passed READ_BUFFERS_* flags as callback data */
 const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
 	.stage = shared_buffer_readv_stage,
@@ -7213,6 +7381,11 @@ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
 	.report = buffer_readv_report,
 };
 
+const PgAioHandleCallbacks aio_shared_buffer_writev_cb = {
+	.stage = shared_buffer_writev_stage,
+	.complete_shared = shared_buffer_writev_complete,
+};
+
 /* readv callback is passed READ_BUFFERS_* flags as callback data */
 const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
 	.stage = local_buffer_readv_stage,
@@ -7226,3 +7399,7 @@ const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
 	.complete_local = local_buffer_readv_complete,
 	.report = buffer_readv_report,
 };
+
+const PgAioHandleCallbacks aio_local_buffer_writev_cb = {
+	.stage = local_buffer_writev_stage,
+};
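
For context, an issue-side sequence exercising these callbacks might look as follows; PGAIO_HCB_SHARED_BUFFER_WRITEV and smgrstartwritev() are assumptions about parts of the patch outside this diff, modeled on their readv counterparts:

	PgAioHandle *ioh = pgaio_io_acquire(CurrentResourceOwner, &ioret);

	/* record the affected buffers, consumed by buffer_writev_complete() */
	pgaio_io_set_handle_data_32(ioh, (uint32 *) buffers, nbuffers);
	pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_WRITEV, 0);

	/* stage (taking and disowning content locks) and issue the write */
	smgrstartwritev(ioh, reln, forknum, blocknum, data_pages, nbuffers, false);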