21
21
#include "storage/bufmgr.h"
22
22
#include "storage/proc.h"
23
23
#include "storage/procarray.h"
24
+ #include "storage/read_stream.h"
24
25
#include "storage/smgr.h"
25
26
#include "utils/rel.h"
26
27
#include "utils/snapmgr.h"
@@ -41,6 +42,17 @@ typedef struct corrupt_items
41
42
ItemPointer tids ;
42
43
} corrupt_items ;
43
44
45
+ /* for collect_corrupt_items_read_stream_next_block */
46
+ struct collect_corrupt_items_read_stream_private
47
+ {
48
+ bool all_frozen ;
49
+ bool all_visible ;
50
+ BlockNumber current_blocknum ;
51
+ BlockNumber last_exclusive ;
52
+ Relation rel ;
53
+ Buffer vmbuffer ;
54
+ };
55
+
44
56
PG_FUNCTION_INFO_V1 (pg_visibility_map );
45
57
PG_FUNCTION_INFO_V1 (pg_visibility_map_rel );
46
58
PG_FUNCTION_INFO_V1 (pg_visibility );
@@ -478,6 +490,7 @@ collect_visibility_data(Oid relid, bool include_pd)
478
490
BlockNumber blkno ;
479
491
Buffer vmbuffer = InvalidBuffer ;
480
492
BufferAccessStrategy bstrategy = GetAccessStrategy (BAS_BULKREAD );
493
+ ReadStream * stream = NULL ;
481
494
482
495
rel = relation_open (relid , AccessShareLock );
483
496
@@ -489,6 +502,22 @@ collect_visibility_data(Oid relid, bool include_pd)
489
502
info -> next = 0 ;
490
503
info -> count = nblocks ;
491
504
505
+ /* Create a stream if reading main fork. */
506
+ if (include_pd )
507
+ {
508
+ BlockRangeReadStreamPrivate p ;
509
+
510
+ p .current_blocknum = 0 ;
511
+ p .last_exclusive = nblocks ;
512
+ stream = read_stream_begin_relation (READ_STREAM_FULL ,
513
+ bstrategy ,
514
+ rel ,
515
+ MAIN_FORKNUM ,
516
+ block_range_read_stream_cb ,
517
+ & p ,
518
+ 0 );
519
+ }
520
+
492
521
for (blkno = 0 ; blkno < nblocks ; ++ blkno )
493
522
{
494
523
int32 mapbits ;
@@ -513,8 +542,7 @@ collect_visibility_data(Oid relid, bool include_pd)
513
542
Buffer buffer ;
514
543
Page page ;
515
544
516
- buffer = ReadBufferExtended (rel , MAIN_FORKNUM , blkno , RBM_NORMAL ,
517
- bstrategy );
545
+ buffer = read_stream_next_buffer (stream , NULL );
518
546
LockBuffer (buffer , BUFFER_LOCK_SHARE );
519
547
520
548
page = BufferGetPage (buffer );
@@ -525,6 +553,12 @@ collect_visibility_data(Oid relid, bool include_pd)
525
553
}
526
554
}
527
555
556
+ if (include_pd )
557
+ {
558
+ Assert (read_stream_next_buffer (stream , NULL ) == InvalidBuffer );
559
+ read_stream_end (stream );
560
+ }
561
+
528
562
/* Clean up. */
529
563
if (vmbuffer != InvalidBuffer )
530
564
ReleaseBuffer (vmbuffer );
@@ -610,6 +644,38 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
610
644
}
611
645
}
612
646
647
+ /*
648
+ * Callback function to get next block for read stream object used in
649
+ * collect_corrupt_items() function.
650
+ */
651
+ static BlockNumber
652
+ collect_corrupt_items_read_stream_next_block (ReadStream * stream ,
653
+ void * callback_private_data ,
654
+ void * per_buffer_data )
655
+ {
656
+ struct collect_corrupt_items_read_stream_private * p = callback_private_data ;
657
+
658
+ for (; p -> current_blocknum < p -> last_exclusive ; p -> current_blocknum ++ )
659
+ {
660
+ bool check_frozen = false;
661
+ bool check_visible = false;
662
+
663
+ /* Make sure we are interruptible. */
664
+ CHECK_FOR_INTERRUPTS ();
665
+
666
+ if (p -> all_frozen && VM_ALL_FROZEN (p -> rel , p -> current_blocknum , & p -> vmbuffer ))
667
+ check_frozen = true;
668
+ if (p -> all_visible && VM_ALL_VISIBLE (p -> rel , p -> current_blocknum , & p -> vmbuffer ))
669
+ check_visible = true;
670
+ if (!check_visible && !check_frozen )
671
+ continue ;
672
+
673
+ return p -> current_blocknum ++ ;
674
+ }
675
+
676
+ return InvalidBlockNumber ;
677
+ }
678
+
613
679
/*
614
680
* Returns a list of items whose visibility map information does not match
615
681
* the status of the tuples on the page.
@@ -628,12 +694,13 @@ static corrupt_items *
628
694
collect_corrupt_items (Oid relid , bool all_visible , bool all_frozen )
629
695
{
630
696
Relation rel ;
631
- BlockNumber nblocks ;
632
697
corrupt_items * items ;
633
- BlockNumber blkno ;
634
698
Buffer vmbuffer = InvalidBuffer ;
635
699
BufferAccessStrategy bstrategy = GetAccessStrategy (BAS_BULKREAD );
636
700
TransactionId OldestXmin = InvalidTransactionId ;
701
+ struct collect_corrupt_items_read_stream_private p ;
702
+ ReadStream * stream ;
703
+ Buffer buffer ;
637
704
638
705
rel = relation_open (relid , AccessShareLock );
639
706
@@ -643,8 +710,6 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
643
710
if (all_visible )
644
711
OldestXmin = GetStrictOldestNonRemovableTransactionId (rel );
645
712
646
- nblocks = RelationGetNumberOfBlocks (rel );
647
-
648
713
/*
649
714
* Guess an initial array size. We don't expect many corrupted tuples, so
650
715
* start with a small array. This function uses the "next" field to track
@@ -658,42 +723,46 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
658
723
items -> count = 64 ;
659
724
items -> tids = palloc (items -> count * sizeof (ItemPointerData ));
660
725
726
+ p .current_blocknum = 0 ;
727
+ p .last_exclusive = RelationGetNumberOfBlocks (rel );
728
+ p .rel = rel ;
729
+ p .vmbuffer = InvalidBuffer ;
730
+ p .all_frozen = all_frozen ;
731
+ p .all_visible = all_visible ;
732
+ stream = read_stream_begin_relation (READ_STREAM_FULL ,
733
+ bstrategy ,
734
+ rel ,
735
+ MAIN_FORKNUM ,
736
+ collect_corrupt_items_read_stream_next_block ,
737
+ & p ,
738
+ 0 );
739
+
661
740
/* Loop over every block in the relation. */
662
- for ( blkno = 0 ; blkno < nblocks ; ++ blkno )
741
+ while (( buffer = read_stream_next_buffer ( stream , NULL )) != InvalidBuffer )
663
742
{
664
743
bool check_frozen = false;
665
744
bool check_visible = false;
666
- Buffer buffer ;
667
745
Page page ;
668
746
OffsetNumber offnum ,
669
747
maxoff ;
748
+ BlockNumber blkno ;
670
749
671
750
/* Make sure we are interruptible. */
672
751
CHECK_FOR_INTERRUPTS ();
673
752
674
- /* Use the visibility map to decide whether to check this page. */
675
- if (all_frozen && VM_ALL_FROZEN (rel , blkno , & vmbuffer ))
676
- check_frozen = true;
677
- if (all_visible && VM_ALL_VISIBLE (rel , blkno , & vmbuffer ))
678
- check_visible = true;
679
- if (!check_visible && !check_frozen )
680
- continue ;
681
-
682
- /* Read and lock the page. */
683
- buffer = ReadBufferExtended (rel , MAIN_FORKNUM , blkno , RBM_NORMAL ,
684
- bstrategy );
685
753
LockBuffer (buffer , BUFFER_LOCK_SHARE );
686
754
687
755
page = BufferGetPage (buffer );
688
756
maxoff = PageGetMaxOffsetNumber (page );
757
+ blkno = BufferGetBlockNumber (buffer );
689
758
690
759
/*
691
760
* The visibility map bits might have changed while we were acquiring
692
761
* the page lock. Recheck to avoid returning spurious results.
693
762
*/
694
- if (check_frozen && !VM_ALL_FROZEN (rel , blkno , & vmbuffer ))
763
+ if (all_frozen && !VM_ALL_FROZEN (rel , blkno , & vmbuffer ))
695
764
check_frozen = false;
696
- if (check_visible && !VM_ALL_VISIBLE (rel , blkno , & vmbuffer ))
765
+ if (all_visible && !VM_ALL_VISIBLE (rel , blkno , & vmbuffer ))
697
766
check_visible = false;
698
767
if (!check_visible && !check_frozen )
699
768
{
@@ -778,10 +847,13 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
778
847
779
848
UnlockReleaseBuffer (buffer );
780
849
}
850
+ read_stream_end (stream );
781
851
782
852
/* Clean up. */
783
853
if (vmbuffer != InvalidBuffer )
784
854
ReleaseBuffer (vmbuffer );
855
+ if (p .vmbuffer != InvalidBuffer )
856
+ ReleaseBuffer (p .vmbuffer );
785
857
relation_close (rel , AccessShareLock );
786
858
787
859
/*
0 commit comments