@@ -305,19 +305,42 @@ rumVacuumPostingTreeLeaves(RumVacuumState * gvs, OffsetNumber attnum,
305
305
/*
306
306
* Delete a posting tree page.
307
307
*/
308
- static void
309
- rumDeletePage (RumVacuumState * gvs , BlockNumber deleteBlkno , BlockNumber leftBlkno ,
308
+ static bool
309
+ rumDeletePage (RumVacuumState * gvs , BlockNumber deleteBlkno ,
310
310
BlockNumber parentBlkno , OffsetNumber myoff , bool isParentRoot )
311
311
{
312
+ BlockNumber leftBlkno ,
313
+ rightBlkno ;
312
314
Buffer dBuffer ;
313
- Buffer lBuffer ;
315
+ Buffer lBuffer ,
316
+ rBuffer ;
314
317
Buffer pBuffer ;
315
318
Page lPage ,
316
319
dPage ,
320
+ rPage ,
317
321
parentPage ;
318
- BlockNumber rightlink ;
319
322
GenericXLogState * state ;
320
323
324
+ restart :
325
+
326
+ dBuffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , deleteBlkno ,
327
+ RBM_NORMAL , gvs -> strategy );
328
+
329
+ LockBuffer (dBuffer , RUM_EXCLUSIVE );
330
+
331
+ dPage = BufferGetPage (dBuffer );
332
+ leftBlkno = RumPageGetOpaque (dPage )-> leftlink ;
333
+ rightBlkno = RumPageGetOpaque (dPage )-> rightlink ;
334
+
335
+ /* do not remove left/right most pages */
336
+ if (leftBlkno == InvalidBlockNumber || rightBlkno == InvalidBlockNumber )
337
+ {
338
+ UnlockReleaseBuffer (dBuffer );
339
+ return false;
340
+ }
341
+
342
+ LockBuffer (dBuffer , RUM_UNLOCK );
343
+
321
344
state = GenericXLogStart (gvs -> index );
322
345
323
346
/*
@@ -326,23 +349,44 @@ rumDeletePage(RumVacuumState * gvs, BlockNumber deleteBlkno, BlockNumber leftBlk
326
349
*/
327
350
lBuffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , leftBlkno ,
328
351
RBM_NORMAL , gvs -> strategy );
329
- dBuffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , deleteBlkno ,
352
+ rBuffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , rightBlkno ,
330
353
RBM_NORMAL , gvs -> strategy );
331
354
pBuffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , parentBlkno ,
332
355
RBM_NORMAL , gvs -> strategy );
333
356
334
357
LockBuffer (lBuffer , RUM_EXCLUSIVE );
335
358
LockBuffer (dBuffer , RUM_EXCLUSIVE );
359
+ LockBuffer (rBuffer , RUM_EXCLUSIVE );
336
360
if (!isParentRoot ) /* parent is already locked by
337
361
* LockBufferForCleanup() */
338
362
LockBuffer (pBuffer , RUM_EXCLUSIVE );
339
363
340
- /* Unlink the page by changing left sibling's rightlink */
341
364
dPage = GenericXLogRegisterBuffer (state , dBuffer , 0 );
342
- rightlink = RumPageGetOpaque (dPage )-> rightlink ;
343
-
344
365
lPage = GenericXLogRegisterBuffer (state , lBuffer , 0 );
345
- RumPageGetOpaque (lPage )-> rightlink = rightlink ;
366
+ rPage = GenericXLogRegisterBuffer (state , rBuffer , 0 );
367
+
368
+ /*
369
+ * last chance to check
370
+ */
371
+ if (!(RumPageGetOpaque (lPage )-> rightlink == deleteBlkno &&
372
+ RumPageGetOpaque (rPage )-> leftlink == deleteBlkno &&
373
+ RumPageGetOpaque (dPage )-> maxoff < FirstOffsetNumber ))
374
+ {
375
+ if (!isParentRoot )
376
+ LockBuffer (pBuffer , RUM_UNLOCK );
377
+ ReleaseBuffer (pBuffer );
378
+ UnlockReleaseBuffer (lBuffer );
379
+ UnlockReleaseBuffer (dBuffer );
380
+ UnlockReleaseBuffer (rBuffer );
381
+
382
+ if (RumPageGetOpaque (dPage )-> maxoff >= FirstOffsetNumber )
383
+ return false;
384
+
385
+ goto restart ;
386
+ }
387
+
388
+ RumPageGetOpaque (lPage )-> rightlink = rightBlkno ;
389
+ RumPageGetOpaque (rPage )-> leftlink = leftBlkno ;
346
390
347
391
/* Delete downlink from parent */
348
392
parentPage = GenericXLogRegisterBuffer (state , pBuffer , 0 );
@@ -357,7 +401,7 @@ rumDeletePage(RumVacuumState * gvs, BlockNumber deleteBlkno, BlockNumber leftBlk
357
401
RumPageDeletePostingItem (parentPage , myoff );
358
402
359
403
/*
360
- * we shouldn't change rightlink field to save workability of running
404
+ * we shouldn't change left/right link field to save workability of running
361
405
* search scan
362
406
*/
363
407
RumPageGetOpaque (dPage )-> flags = RUM_DELETED ;
@@ -369,8 +413,11 @@ rumDeletePage(RumVacuumState * gvs, BlockNumber deleteBlkno, BlockNumber leftBlk
369
413
ReleaseBuffer (pBuffer );
370
414
UnlockReleaseBuffer (lBuffer );
371
415
UnlockReleaseBuffer (dBuffer );
416
+ UnlockReleaseBuffer (rBuffer );
372
417
373
418
gvs -> result -> pages_deleted ++ ;
419
+
420
+ return true;
374
421
}
375
422
376
423
typedef struct DataPageDeleteStack
@@ -379,20 +426,20 @@ typedef struct DataPageDeleteStack
379
426
struct DataPageDeleteStack * parent ;
380
427
381
428
BlockNumber blkno ; /* current block number */
382
- BlockNumber leftBlkno ; /* rightest non-deleted page on left */
383
429
bool isRoot ;
384
430
} DataPageDeleteStack ;
385
431
386
432
/*
387
433
* scans posting tree and deletes empty pages
388
434
*/
389
435
static bool
390
- rumScanToDelete (RumVacuumState * gvs , BlockNumber blkno , bool isRoot , DataPageDeleteStack * parent , OffsetNumber myoff )
436
+ rumScanToDelete (RumVacuumState * gvs , BlockNumber blkno , bool isRoot ,
437
+ DataPageDeleteStack * parent , OffsetNumber myoff )
391
438
{
392
439
DataPageDeleteStack * me ;
393
440
Buffer buffer ;
394
441
Page page ;
395
- bool meDelete = FALSE ;
442
+ bool meDelete = false ;
396
443
397
444
if (isRoot )
398
445
{
@@ -405,7 +452,6 @@ rumScanToDelete(RumVacuumState * gvs, BlockNumber blkno, bool isRoot, DataPageDe
405
452
me = (DataPageDeleteStack * ) palloc0 (sizeof (DataPageDeleteStack ));
406
453
me -> parent = parent ;
407
454
parent -> child = me ;
408
- me -> leftBlkno = InvalidBlockNumber ;
409
455
}
410
456
else
411
457
me = parent -> child ;
@@ -431,22 +477,11 @@ rumScanToDelete(RumVacuumState * gvs, BlockNumber blkno, bool isRoot, DataPageDe
431
477
}
432
478
}
433
479
434
- if (RumPageGetOpaque (page )-> maxoff < FirstOffsetNumber )
435
- {
436
- /* we never delete the left- or rightmost branch */
437
- if (me -> leftBlkno != InvalidBlockNumber && !RumPageRightMost (page ))
438
- {
439
- Assert (!isRoot );
440
- rumDeletePage (gvs , blkno , me -> leftBlkno , me -> parent -> blkno , myoff , me -> parent -> isRoot );
441
- meDelete = TRUE;
442
- }
443
- }
480
+ if (RumPageGetOpaque (page )-> maxoff < FirstOffsetNumber && !isRoot )
481
+ meDelete = rumDeletePage (gvs , blkno , me -> parent -> blkno , myoff , me -> parent -> isRoot );
444
482
445
483
ReleaseBuffer (buffer );
446
484
447
- if (!meDelete )
448
- me -> leftBlkno = blkno ;
449
-
450
485
return meDelete ;
451
486
}
452
487
@@ -465,7 +500,6 @@ rumVacuumPostingTree(RumVacuumState * gvs, OffsetNumber attnum, BlockNumber root
465
500
}
466
501
467
502
memset (& root , 0 , sizeof (DataPageDeleteStack ));
468
- root .leftBlkno = InvalidBlockNumber ;
469
503
root .isRoot = TRUE;
470
504
471
505
vacuum_delay_point ();
0 commit comments