Diffstat (limited to 'src/backend/access')
-rw-r--r--   src/backend/access/gist/gistget.c         |   4
-rw-r--r--   src/backend/access/hash/hashsearch.c      |   4
-rw-r--r--   src/backend/access/heap/heapam.c          | 650
-rw-r--r--   src/backend/access/heap/heapam_handler.c  | 166
-rw-r--r--   src/backend/access/index/genam.c          | 110
-rw-r--r--   src/backend/access/index/indexam.c        | 164
-rw-r--r--   src/backend/access/nbtree/nbtree.c        |   2
-rw-r--r--   src/backend/access/nbtree/nbtsearch.c     |   6
-rw-r--r--   src/backend/access/nbtree/nbtsort.c       |  20
-rw-r--r--   src/backend/access/spgist/spgscan.c       |   2
-rw-r--r--   src/backend/access/table/tableam.c        | 293
-rw-r--r--   src/backend/access/table/tableamapi.c     |  26
-rw-r--r--   src/backend/access/tablesample/system.c   |   7
13 files changed, 836 insertions(+), 618 deletions(-)
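The hunks below rename the heap-specific sequential-scan entry points into table-AM-generic ones: heap_beginscan now returns a TableScanDesc, heap_getnext gains a slot-based sibling heap_getnextslot, and callers such as genam.c switch to table_beginscan_strat/table_scan_getnextslot with a slot obtained from table_slot_create. As a reading aid, here is a minimal caller-side sketch of that migration, assuming the usual backend headers; the function name and the trivial processing loop are illustrative only and not part of the commit.

#include "postgres.h"

#include "access/tableam.h"
#include "executor/tuptable.h"
#include "utils/rel.h"

/*
 * Sketch only: scan_example() and its loop body are illustrative; the
 * table_* calls mirror the genam.c hunks below.
 */
static void
scan_example(Relation rel, Snapshot snapshot)
{
	TableScanDesc	scan;
	TupleTableSlot *slot;

	/* the table AM now chooses the slot type (TTSOpsBufferHeapTuple for heap) */
	slot = table_slot_create(rel, NULL);

	/* was: heap_beginscan_strat(rel, snapshot, 0, NULL, true, false) */
	scan = table_beginscan_strat(rel, snapshot, 0, NULL, true, false);

	/* was: while ((htup = heap_getnext(scan, ForwardScanDirection)) != NULL) */
	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
	{
		bool		shouldFree;
		HeapTuple	htup;

		/* callers that still need a HeapTuple materialize one from the slot */
		htup = ExecFetchSlotHeapTuple(slot, false, &shouldFree);
		Assert(!shouldFree);
		(void) htup;			/* ... process the tuple ... */
	}

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(slot);
}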
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 156b9d699f7..8108fbb7d8e 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -561,7 +561,7 @@ getNextNearest(IndexScanDesc scan) if (GISTSearchItemIsHeap(*item)) { /* found a heap item at currently minimal distance */ - scan->xs_ctup.t_self = item->data.heap.heapPtr; + scan->xs_heaptid = item->data.heap.heapPtr; scan->xs_recheck = item->data.heap.recheck; index_store_float8_orderby_distances(scan, so->orderByTypes, @@ -650,7 +650,7 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir) so->pageData[so->curPageData - 1].offnum; } /* continuing to return tuples from a leaf page */ - scan->xs_ctup.t_self = so->pageData[so->curPageData].heapPtr; + scan->xs_heaptid = so->pageData[so->curPageData].heapPtr; scan->xs_recheck = so->pageData[so->curPageData].recheck; /* in an index-only scan, also return the reconstructed tuple */ diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c index ccd3fdceac0..61c90e6bb78 100644 --- a/src/backend/access/hash/hashsearch.c +++ b/src/backend/access/hash/hashsearch.c @@ -119,7 +119,7 @@ _hash_next(IndexScanDesc scan, ScanDirection dir) /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; - scan->xs_ctup.t_self = currItem->heapTid; + scan->xs_heaptid = currItem->heapTid; return true; } @@ -432,7 +432,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir) /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; - scan->xs_ctup.t_self = currItem->heapTid; + scan->xs_heaptid = currItem->heapTid; /* if we're here, _hash_readpage found a valid tuples */ return true; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index dc3499349b6..3c8a5da0bc8 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -41,6 +41,7 @@ #include "access/parallel.h" #include "access/relscan.h" #include "access/sysattr.h" +#include "access/tableam.h" #include "access/transam.h" #include "access/tuptoaster.h" #include "access/valid.h" @@ -68,22 +69,6 @@ #include "utils/snapmgr.h" -/* GUC variable */ -bool synchronize_seqscans = true; - - -static HeapScanDesc heap_beginscan_internal(Relation relation, - Snapshot snapshot, - int nkeys, ScanKey key, - ParallelHeapScanDesc parallel_scan, - bool allow_strat, - bool allow_sync, - bool allow_pagemode, - bool is_bitmapscan, - bool is_samplescan, - bool temp_snap); -static void heap_parallelscan_startblock_init(HeapScanDesc scan); -static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -207,6 +192,7 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = static void initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) { + ParallelBlockTableScanDesc bpscan = NULL; bool allow_strat; bool allow_sync; @@ -221,10 +207,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * results for a non-MVCC snapshot, the caller must hold some higher-level * lock that ensures the interesting tuple(s) won't change.) 
*/ - if (scan->rs_parallel != NULL) - scan->rs_nblocks = scan->rs_parallel->phs_nblocks; + if (scan->rs_base.rs_parallel != NULL) + { + bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + scan->rs_nblocks = bpscan->phs_nblocks; + } else - scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); + scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_base.rs_rd); /* * If the table is large relative to NBuffers, use a bulk-read access @@ -238,11 +227,11 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * Note that heap_parallelscan_initialize has a very similar test; if you * change this, consider changing that one, too. */ - if (!RelationUsesLocalBuffers(scan->rs_rd) && + if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) && scan->rs_nblocks > NBuffers / 4) { - allow_strat = scan->rs_allow_strat; - allow_sync = scan->rs_allow_sync; + allow_strat = scan->rs_base.rs_allow_strat; + allow_sync = scan->rs_base.rs_allow_sync; } else allow_strat = allow_sync = false; @@ -260,10 +249,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_strategy = NULL; } - if (scan->rs_parallel != NULL) + if (scan->rs_base.rs_parallel != NULL) { - /* For parallel scan, believe whatever ParallelHeapScanDesc says. */ - scan->rs_syncscan = scan->rs_parallel->phs_syncscan; + /* For parallel scan, believe whatever ParallelTableScanDesc says. */ + scan->rs_base.rs_syncscan = scan->rs_base.rs_parallel->phs_syncscan; } else if (keep_startblock) { @@ -272,16 +261,16 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * so that rewinding a cursor doesn't generate surprising results. * Reset the active syncscan setting, though. */ - scan->rs_syncscan = (allow_sync && synchronize_seqscans); + scan->rs_base.rs_syncscan = (allow_sync && synchronize_seqscans); } else if (allow_sync && synchronize_seqscans) { - scan->rs_syncscan = true; - scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks); + scan->rs_base.rs_syncscan = true; + scan->rs_startblock = ss_get_location(scan->rs_base.rs_rd, scan->rs_nblocks); } else { - scan->rs_syncscan = false; + scan->rs_base.rs_syncscan = false; scan->rs_startblock = 0; } @@ -298,15 +287,15 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * copy the scan key, if appropriate */ if (key != NULL) - memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData)); + memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData)); /* * Currently, we don't have a stats counter for bitmap heap scans (but the * underlying bitmap index scans will be counted) or sample scans (we only * update stats for tuple fetches there) */ - if (!scan->rs_bitmapscan && !scan->rs_samplescan) - pgstat_count_heap_scan(scan->rs_rd); + if (!scan->rs_base.rs_bitmapscan && !scan->rs_base.rs_samplescan) + pgstat_count_heap_scan(scan->rs_base.rs_rd); } /* @@ -316,10 +305,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * numBlks is number of pages to scan (InvalidBlockNumber means "all") */ void -heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks) +heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks) { + HeapScanDesc scan = (HeapScanDesc) sscan; + Assert(!scan->rs_inited); /* else too late to change */ - Assert(!scan->rs_syncscan); /* else rs_startblock is significant */ + Assert(!scan->rs_base.rs_syncscan); /* else rs_startblock is significant */ /* Check startBlk is valid (but allow case of zero blocks...) 
*/ Assert(startBlk == 0 || startBlk < scan->rs_nblocks); @@ -336,8 +327,9 @@ heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks) * which tuples on the page are visible. */ void -heapgetpage(HeapScanDesc scan, BlockNumber page) +heapgetpage(TableScanDesc sscan, BlockNumber page) { + HeapScanDesc scan = (HeapScanDesc) sscan; Buffer buffer; Snapshot snapshot; Page dp; @@ -364,20 +356,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) CHECK_FOR_INTERRUPTS(); /* read page using selected strategy */ - scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page, + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page, RBM_NORMAL, scan->rs_strategy); scan->rs_cblock = page; - if (!scan->rs_pageatatime) + if (!scan->rs_base.rs_pageatatime) return; buffer = scan->rs_cbuf; - snapshot = scan->rs_snapshot; + snapshot = scan->rs_base.rs_snapshot; /* * Prune and repair fragmentation for the whole page, if possible. */ - heap_page_prune_opt(scan->rs_rd, buffer); + heap_page_prune_opt(scan->rs_base.rs_rd, buffer); /* * We must hold share lock on the buffer content while examining tuple @@ -387,7 +379,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) LockBuffer(buffer, BUFFER_LOCK_SHARE); dp = BufferGetPage(buffer); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber(dp); ntup = 0; @@ -422,7 +414,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) HeapTupleData loctup; bool valid; - loctup.t_tableOid = RelationGetRelid(scan->rs_rd); + loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd); loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); loctup.t_len = ItemIdGetLength(lpp); ItemPointerSet(&(loctup.t_self), page, lineoff); @@ -432,8 +424,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) else valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); - CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, - buffer, snapshot); + CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, + &loctup, buffer, snapshot); if (valid) scan->rs_vistuples[ntup++] = lineoff; @@ -476,7 +468,7 @@ heapgettup(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - Snapshot snapshot = scan->rs_snapshot; + Snapshot snapshot = scan->rs_base.rs_snapshot; bool backward = ScanDirectionIsBackward(dir); BlockNumber page; bool finished; @@ -502,11 +494,16 @@ heapgettup(HeapScanDesc scan, tuple->t_data = NULL; return; } - if (scan->rs_parallel != NULL) + if (scan->rs_base.rs_parallel != NULL) { - heap_parallelscan_startblock_init(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - page = heap_parallelscan_nextpage(scan); + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + pbscan); + + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); /* Other processes might have already finished the scan. 
*/ if (page == InvalidBlockNumber) @@ -518,7 +515,7 @@ heapgettup(HeapScanDesc scan, } else page = scan->rs_startblock; /* first page */ - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; } @@ -533,7 +530,7 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber(dp); /* page and lineoff now reference the physically next tid */ @@ -542,7 +539,7 @@ heapgettup(HeapScanDesc scan, else if (backward) { /* backward parallel scan not supported */ - Assert(scan->rs_parallel == NULL); + Assert(scan->rs_base.rs_parallel == NULL); if (!scan->rs_inited) { @@ -562,13 +559,13 @@ heapgettup(HeapScanDesc scan, * time, and much more likely that we'll just bollix things for * forward scanners. */ - scan->rs_syncscan = false; + scan->rs_base.rs_syncscan = false; /* start from last page of the scan */ if (scan->rs_startblock > 0) page = scan->rs_startblock - 1; else page = scan->rs_nblocks - 1; - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); } else { @@ -579,7 +576,7 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber(dp); if (!scan->rs_inited) @@ -610,11 +607,11 @@ heapgettup(HeapScanDesc scan, page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); /* Since the tuple was previously fetched, needn't lock page here */ dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); Assert(ItemIdIsNormal(lpp)); @@ -649,11 +646,12 @@ heapgettup(HeapScanDesc scan, snapshot, scan->rs_cbuf); - CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, - scan->rs_cbuf, snapshot); + CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, + tuple, scan->rs_cbuf, + snapshot); if (valid && key != NULL) - HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), nkeys, key, valid); if (valid) @@ -696,9 +694,13 @@ heapgettup(HeapScanDesc scan, page = scan->rs_nblocks; page--; } - else if (scan->rs_parallel != NULL) + else if (scan->rs_base.rs_parallel != NULL) { - page = heap_parallelscan_nextpage(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); finished = (page == InvalidBlockNumber); } else @@ -721,8 +723,8 @@ heapgettup(HeapScanDesc scan, * a little bit backwards on every invocation, which is confusing. * We don't guarantee any specific ordering in general, though. 
*/ - if (scan->rs_syncscan) - ss_report_location(scan->rs_rd, page); + if (scan->rs_base.rs_syncscan) + ss_report_location(scan->rs_base.rs_rd, page); } /* @@ -739,12 +741,12 @@ heapgettup(HeapScanDesc scan, return; } - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber((Page) dp); linesleft = lines; if (backward) @@ -806,11 +808,16 @@ heapgettup_pagemode(HeapScanDesc scan, tuple->t_data = NULL; return; } - if (scan->rs_parallel != NULL) + if (scan->rs_base.rs_parallel != NULL) { - heap_parallelscan_startblock_init(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + pbscan); - page = heap_parallelscan_nextpage(scan); + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); /* Other processes might have already finished the scan. */ if (page == InvalidBlockNumber) @@ -822,7 +829,7 @@ heapgettup_pagemode(HeapScanDesc scan, } else page = scan->rs_startblock; /* first page */ - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); lineindex = 0; scan->rs_inited = true; } @@ -834,7 +841,7 @@ heapgettup_pagemode(HeapScanDesc scan, } dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lines = scan->rs_ntuples; /* page and lineindex now reference the next visible tid */ @@ -843,7 +850,7 @@ heapgettup_pagemode(HeapScanDesc scan, else if (backward) { /* backward parallel scan not supported */ - Assert(scan->rs_parallel == NULL); + Assert(scan->rs_base.rs_parallel == NULL); if (!scan->rs_inited) { @@ -863,13 +870,13 @@ heapgettup_pagemode(HeapScanDesc scan, * time, and much more likely that we'll just bollix things for * forward scanners. 
*/ - scan->rs_syncscan = false; + scan->rs_base.rs_syncscan = false; /* start from last page of the scan */ if (scan->rs_startblock > 0) page = scan->rs_startblock - 1; else page = scan->rs_nblocks - 1; - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); } else { @@ -878,7 +885,7 @@ heapgettup_pagemode(HeapScanDesc scan, } dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lines = scan->rs_ntuples; if (!scan->rs_inited) @@ -908,11 +915,11 @@ heapgettup_pagemode(HeapScanDesc scan, page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); /* Since the tuple was previously fetched, needn't lock page here */ dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); Assert(ItemIdIsNormal(lpp)); @@ -950,7 +957,7 @@ heapgettup_pagemode(HeapScanDesc scan, { bool valid; - HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), nkeys, key, valid); if (valid) { @@ -986,9 +993,13 @@ heapgettup_pagemode(HeapScanDesc scan, page = scan->rs_nblocks; page--; } - else if (scan->rs_parallel != NULL) + else if (scan->rs_base.rs_parallel != NULL) { - page = heap_parallelscan_nextpage(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); finished = (page == InvalidBlockNumber); } else @@ -1011,8 +1022,8 @@ heapgettup_pagemode(HeapScanDesc scan, * a little bit backwards on every invocation, which is confusing. * We don't guarantee any specific ordering in general, though. */ - if (scan->rs_syncscan) - ss_report_location(scan->rs_rd, page); + if (scan->rs_base.rs_syncscan) + ss_report_location(scan->rs_base.rs_rd, page); } /* @@ -1029,10 +1040,10 @@ heapgettup_pagemode(HeapScanDesc scan, return; } - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lines = scan->rs_ntuples; linesleft = lines; if (backward) @@ -1095,86 +1106,16 @@ fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, */ -/* ---------------- - * heap_beginscan - begin relation scan - * - * heap_beginscan is the "standard" case. - * - * heap_beginscan_catalog differs in setting up its own temporary snapshot. - * - * heap_beginscan_strat offers an extended API that lets the caller control - * whether a nondefault buffer access strategy can be used, and whether - * syncscan can be chosen (possibly resulting in the scan not starting from - * block zero). Both of these default to true with plain heap_beginscan. - * - * heap_beginscan_bm is an alternative entry point for setting up a - * HeapScanDesc for a bitmap heap scan. Although that scan technology is - * really quite unlike a standard seqscan, there is just enough commonality - * to make it worth using the same data structure. - * - * heap_beginscan_sampling is an alternative entry point for setting up a - * HeapScanDesc for a TABLESAMPLE scan. 
As with bitmap scans, it's worth - * using the same data structure although the behavior is rather different. - * In addition to the options offered by heap_beginscan_strat, this call - * also allows control of whether page-mode visibility checking is used. - * ---------------- - */ -HeapScanDesc +TableScanDesc heap_beginscan(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - true, true, true, false, false, false); -} - -HeapScanDesc -heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key) -{ - Oid relid = RelationGetRelid(relation); - Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid)); - - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - true, true, true, false, false, true); -} - -HeapScanDesc -heap_beginscan_strat(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - bool allow_strat, bool allow_sync) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - allow_strat, allow_sync, true, - false, false, false); -} - -HeapScanDesc -heap_beginscan_bm(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - false, false, true, true, false, false); -} - -HeapScanDesc -heap_beginscan_sampling(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - bool allow_strat, bool allow_sync, bool allow_pagemode) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - allow_strat, allow_sync, allow_pagemode, - false, true, false); -} - -static HeapScanDesc -heap_beginscan_internal(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - ParallelHeapScanDesc parallel_scan, - bool allow_strat, - bool allow_sync, - bool allow_pagemode, - bool is_bitmapscan, - bool is_samplescan, - bool temp_snap) + int nkeys, ScanKey key, + ParallelTableScanDesc parallel_scan, + bool allow_strat, + bool allow_sync, + bool allow_pagemode, + bool is_bitmapscan, + bool is_samplescan, + bool temp_snap) { HeapScanDesc scan; @@ -1192,21 +1133,22 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, */ scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData)); - scan->rs_rd = relation; - scan->rs_snapshot = snapshot; - scan->rs_nkeys = nkeys; - scan->rs_bitmapscan = is_bitmapscan; - scan->rs_samplescan = is_samplescan; + scan->rs_base.rs_rd = relation; + scan->rs_base.rs_snapshot = snapshot; + scan->rs_base.rs_nkeys = nkeys; + scan->rs_base.rs_bitmapscan = is_bitmapscan; + scan->rs_base.rs_samplescan = is_samplescan; scan->rs_strategy = NULL; /* set in initscan */ - scan->rs_allow_strat = allow_strat; - scan->rs_allow_sync = allow_sync; - scan->rs_temp_snap = temp_snap; - scan->rs_parallel = parallel_scan; + scan->rs_base.rs_allow_strat = allow_strat; + scan->rs_base.rs_allow_sync = allow_sync; + scan->rs_base.rs_temp_snap = temp_snap; + scan->rs_base.rs_parallel = parallel_scan; /* * we can use page-at-a-time mode if it's an MVCC-safe snapshot */ - scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(snapshot); + scan->rs_base.rs_pageatatime = + allow_pagemode && snapshot && IsMVCCSnapshot(snapshot); /* * For a seqscan in a serializable transaction, acquire a predicate lock @@ -1230,23 +1172,29 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, * initscan() and we don't want to allocate memory again */ if (nkeys > 0) - scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys); + scan->rs_base.rs_key = (ScanKey) 
palloc(sizeof(ScanKeyData) * nkeys); else - scan->rs_key = NULL; + scan->rs_base.rs_key = NULL; initscan(scan, key, false); - return scan; + return (TableScanDesc) scan; } -/* ---------------- - * heap_rescan - restart a relation scan - * ---------------- - */ void -heap_rescan(HeapScanDesc scan, - ScanKey key) +heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, + bool allow_strat, bool allow_sync, bool allow_pagemode) { + HeapScanDesc scan = (HeapScanDesc) sscan; + + if (set_params) + { + scan->rs_base.rs_allow_strat = allow_strat; + scan->rs_base.rs_allow_sync = allow_sync; + scan->rs_base.rs_pageatatime = + allow_pagemode && IsMVCCSnapshot(scan->rs_base.rs_snapshot); + } + /* * unpin scan buffers */ @@ -1259,37 +1207,11 @@ heap_rescan(HeapScanDesc scan, initscan(scan, key, true); } -/* ---------------- - * heap_rescan_set_params - restart a relation scan after changing params - * - * This call allows changing the buffer strategy, syncscan, and pagemode - * options before starting a fresh scan. Note that although the actual use - * of syncscan might change (effectively, enabling or disabling reporting), - * the previously selected startblock will be kept. - * ---------------- - */ void -heap_rescan_set_params(HeapScanDesc scan, ScanKey key, - bool allow_strat, bool allow_sync, bool allow_pagemode) +heap_endscan(TableScanDesc sscan) { - /* adjust parameters */ - scan->rs_allow_strat = allow_strat; - scan->rs_allow_sync = allow_sync; - scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(scan->rs_snapshot); - /* ... and rescan */ - heap_rescan(scan, key); -} + HeapScanDesc scan = (HeapScanDesc) sscan; -/* ---------------- - * heap_endscan - end relation scan - * - * See how to integrate with index scans. - * Check handling if reldesc caching. - * ---------------- - */ -void -heap_endscan(HeapScanDesc scan) -{ /* Note: no locking manipulations needed */ /* @@ -1301,246 +1223,20 @@ heap_endscan(HeapScanDesc scan) /* * decrement relation reference count and free scan descriptor storage */ - RelationDecrementReferenceCount(scan->rs_rd); + RelationDecrementReferenceCount(scan->rs_base.rs_rd); - if (scan->rs_key) - pfree(scan->rs_key); + if (scan->rs_base.rs_key) + pfree(scan->rs_base.rs_key); if (scan->rs_strategy != NULL) FreeAccessStrategy(scan->rs_strategy); - if (scan->rs_temp_snap) - UnregisterSnapshot(scan->rs_snapshot); + if (scan->rs_base.rs_temp_snap) + UnregisterSnapshot(scan->rs_base.rs_snapshot); pfree(scan); } -/* ---------------- - * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc - * - * Sadly, this doesn't reduce to a constant, because the size required - * to serialize the snapshot can vary. - * ---------------- - */ -Size -heap_parallelscan_estimate(Snapshot snapshot) -{ - Size sz = offsetof(ParallelHeapScanDescData, phs_snapshot_data); - - if (IsMVCCSnapshot(snapshot)) - sz = add_size(sz, EstimateSnapshotSpace(snapshot)); - else - Assert(snapshot == SnapshotAny); - - return sz; -} - -/* ---------------- - * heap_parallelscan_initialize - initialize ParallelHeapScanDesc - * - * Must allow as many bytes of shared memory as returned by - * heap_parallelscan_estimate. Call this just once in the leader - * process; then, individual workers attach via heap_beginscan_parallel. 
- * ---------------- - */ -void -heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, - Snapshot snapshot) -{ - target->phs_relid = RelationGetRelid(relation); - target->phs_nblocks = RelationGetNumberOfBlocks(relation); - /* compare phs_syncscan initialization to similar logic in initscan */ - target->phs_syncscan = synchronize_seqscans && - !RelationUsesLocalBuffers(relation) && - target->phs_nblocks > NBuffers / 4; - SpinLockInit(&target->phs_mutex); - target->phs_startblock = InvalidBlockNumber; - pg_atomic_init_u64(&target->phs_nallocated, 0); - if (IsMVCCSnapshot(snapshot)) - { - SerializeSnapshot(snapshot, target->phs_snapshot_data); - target->phs_snapshot_any = false; - } - else - { - Assert(snapshot == SnapshotAny); - target->phs_snapshot_any = true; - } -} - -/* ---------------- - * heap_parallelscan_reinitialize - reset a parallel scan - * - * Call this in the leader process. Caller is responsible for - * making sure that all workers have finished the scan beforehand. - * ---------------- - */ -void -heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan) -{ - pg_atomic_write_u64(¶llel_scan->phs_nallocated, 0); -} - -/* ---------------- - * heap_beginscan_parallel - join a parallel scan - * - * Caller must hold a suitable lock on the correct relation. - * ---------------- - */ -HeapScanDesc -heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) -{ - Snapshot snapshot; - - Assert(RelationGetRelid(relation) == parallel_scan->phs_relid); - - if (!parallel_scan->phs_snapshot_any) - { - /* Snapshot was serialized -- restore it */ - snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); - RegisterSnapshot(snapshot); - } - else - { - /* SnapshotAny passed by caller (not serialized) */ - snapshot = SnapshotAny; - } - - return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan, - true, true, true, false, false, - !parallel_scan->phs_snapshot_any); -} - -/* ---------------- - * heap_parallelscan_startblock_init - find and set the scan's startblock - * - * Determine where the parallel seq scan should start. This function may - * be called many times, once by each parallel worker. We must be careful - * only to set the startblock once. - * ---------------- - */ -static void -heap_parallelscan_startblock_init(HeapScanDesc scan) -{ - BlockNumber sync_startpage = InvalidBlockNumber; - ParallelHeapScanDesc parallel_scan; - - Assert(scan->rs_parallel); - parallel_scan = scan->rs_parallel; - -retry: - /* Grab the spinlock. */ - SpinLockAcquire(¶llel_scan->phs_mutex); - - /* - * If the scan's startblock has not yet been initialized, we must do so - * now. If this is not a synchronized scan, we just start at block 0, but - * if it is a synchronized scan, we must get the starting position from - * the synchronized scan machinery. We can't hold the spinlock while - * doing that, though, so release the spinlock, get the information we - * need, and retry. If nobody else has initialized the scan in the - * meantime, we'll fill in the value we fetched on the second time - * through. 
- */ - if (parallel_scan->phs_startblock == InvalidBlockNumber) - { - if (!parallel_scan->phs_syncscan) - parallel_scan->phs_startblock = 0; - else if (sync_startpage != InvalidBlockNumber) - parallel_scan->phs_startblock = sync_startpage; - else - { - SpinLockRelease(¶llel_scan->phs_mutex); - sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks); - goto retry; - } - } - SpinLockRelease(¶llel_scan->phs_mutex); -} - -/* ---------------- - * heap_parallelscan_nextpage - get the next page to scan - * - * Get the next page to scan. Even if there are no pages left to scan, - * another backend could have grabbed a page to scan and not yet finished - * looking at it, so it doesn't follow that the scan is done when the - * first backend gets an InvalidBlockNumber return. - * ---------------- - */ -static BlockNumber -heap_parallelscan_nextpage(HeapScanDesc scan) -{ - BlockNumber page; - ParallelHeapScanDesc parallel_scan; - uint64 nallocated; - - Assert(scan->rs_parallel); - parallel_scan = scan->rs_parallel; - - /* - * phs_nallocated tracks how many pages have been allocated to workers - * already. When phs_nallocated >= rs_nblocks, all blocks have been - * allocated. - * - * Because we use an atomic fetch-and-add to fetch the current value, the - * phs_nallocated counter will exceed rs_nblocks, because workers will - * still increment the value, when they try to allocate the next block but - * all blocks have been allocated already. The counter must be 64 bits - * wide because of that, to avoid wrapping around when rs_nblocks is close - * to 2^32. - * - * The actual page to return is calculated by adding the counter to the - * starting block number, modulo nblocks. - */ - nallocated = pg_atomic_fetch_add_u64(¶llel_scan->phs_nallocated, 1); - if (nallocated >= scan->rs_nblocks) - page = InvalidBlockNumber; /* all blocks have been allocated */ - else - page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks; - - /* - * Report scan location. Normally, we report the current page number. - * When we reach the end of the scan, though, we report the starting page, - * not the ending page, just so the starting positions for later scans - * doesn't slew backwards. We only report the position at the end of the - * scan once, though: subsequent callers will report nothing. - */ - if (scan->rs_syncscan) - { - if (page != InvalidBlockNumber) - ss_report_location(scan->rs_rd, page); - else if (nallocated == scan->rs_nblocks) - ss_report_location(scan->rs_rd, parallel_scan->phs_startblock); - } - - return page; -} - -/* ---------------- - * heap_update_snapshot - * - * Update snapshot info in heap scan descriptor. - * ---------------- - */ -void -heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot) -{ - Assert(IsMVCCSnapshot(snapshot)); - - RegisterSnapshot(snapshot); - scan->rs_snapshot = snapshot; - scan->rs_temp_snap = true; -} - -/* ---------------- - * heap_getnext - retrieve next tuple in scan - * - * Fix to work with index relations. - * We don't return the buffer anymore, but you can get it from the - * returned HeapTuple. 
- * ---------------- - */ - #ifdef HEAPDEBUGALL #define HEAPDEBUG_1 \ elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \ @@ -1557,17 +1253,32 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot) HeapTuple -heap_getnext(HeapScanDesc scan, ScanDirection direction) +heap_getnext(TableScanDesc sscan, ScanDirection direction) { + HeapScanDesc scan = (HeapScanDesc) sscan; + + /* + * This is still widely used directly, without going through table AM, so + * add a safety check. It's possible we should, at a later point, + * downgrade this to an assert. The reason for checking the AM routine, + * rather than the AM oid, is that this allows to write regression tests + * that create another AM reusing the heap handler. + */ + if (unlikely(sscan->rs_rd->rd_tableam != GetHeapamTableAmRoutine())) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("only heap AM is supported"))); + /* Note: no locking manipulations needed */ HEAPDEBUG_1; /* heap_getnext( info ) */ - if (scan->rs_pageatatime) + if (scan->rs_base.rs_pageatatime) heapgettup_pagemode(scan, direction, - scan->rs_nkeys, scan->rs_key); + scan->rs_base.rs_nkeys, scan->rs_base.rs_key); else - heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key); + heapgettup(scan, direction, + scan->rs_base.rs_nkeys, scan->rs_base.rs_key); if (scan->rs_ctup.t_data == NULL) { @@ -1581,9 +1292,58 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction) */ HEAPDEBUG_3; /* heap_getnext returning tuple */ - pgstat_count_heap_getnext(scan->rs_rd); + pgstat_count_heap_getnext(scan->rs_base.rs_rd); + + return &scan->rs_ctup; +} + +#ifdef HEAPAMSLOTDEBUGALL +#define HEAPAMSLOTDEBUG_1 \ + elog(DEBUG2, "heapam_getnextslot([%s,nkeys=%d],dir=%d) called", \ + RelationGetRelationName(scan->rs_base.rs_rd), scan->rs_base.rs_nkeys, (int) direction) +#define HEAPAMSLOTDEBUG_2 \ + elog(DEBUG2, "heapam_getnextslot returning EOS") +#define HEAPAMSLOTDEBUG_3 \ + elog(DEBUG2, "heapam_getnextslot returning tuple") +#else +#define HEAPAMSLOTDEBUG_1 +#define HEAPAMSLOTDEBUG_2 +#define HEAPAMSLOTDEBUG_3 +#endif + +bool +heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) +{ + HeapScanDesc scan = (HeapScanDesc) sscan; + + /* Note: no locking manipulations needed */ + + HEAPAMSLOTDEBUG_1; /* heap_getnextslot( info ) */ + + if (scan->rs_base.rs_pageatatime) + heapgettup_pagemode(scan, direction, + scan->rs_base.rs_nkeys, scan->rs_base.rs_key); + else + heapgettup(scan, direction, scan->rs_base.rs_nkeys, scan->rs_base.rs_key); - return &(scan->rs_ctup); + if (scan->rs_ctup.t_data == NULL) + { + HEAPAMSLOTDEBUG_2; /* heap_getnextslot returning EOS */ + ExecClearTuple(slot); + return false; + } + + /* + * if we get here it means we have a new current scan tuple, so point to + * the proper return buffer and return the tuple. 
+ */ + HEAPAMSLOTDEBUG_3; /* heap_getnextslot returning tuple */ + + pgstat_count_heap_getnext(scan->rs_base.rs_rd); + + ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, + scan->rs_cbuf); + return true; } /* diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 518d1df84a1..6a26fcef94c 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -19,15 +19,181 @@ */ #include "postgres.h" +#include "access/heapam.h" #include "access/tableam.h" +#include "storage/bufmgr.h" #include "utils/builtins.h" static const TableAmRoutine heapam_methods; +/* ------------------------------------------------------------------------ + * Slot related callbacks for heap AM + * ------------------------------------------------------------------------ + */ + +static const TupleTableSlotOps * +heapam_slot_callbacks(Relation relation) +{ + return &TTSOpsBufferHeapTuple; +} + + +/* ------------------------------------------------------------------------ + * Index Scan Callbacks for heap AM + * ------------------------------------------------------------------------ + */ + +static IndexFetchTableData * +heapam_index_fetch_begin(Relation rel) +{ + IndexFetchHeapData *hscan = palloc0(sizeof(IndexFetchHeapData)); + + hscan->xs_base.rel = rel; + hscan->xs_cbuf = InvalidBuffer; + + return &hscan->xs_base; +} + +static void +heapam_index_fetch_reset(IndexFetchTableData *scan) +{ + IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan; + + if (BufferIsValid(hscan->xs_cbuf)) + { + ReleaseBuffer(hscan->xs_cbuf); + hscan->xs_cbuf = InvalidBuffer; + } +} + +static void +heapam_index_fetch_end(IndexFetchTableData *scan) +{ + IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan; + + heapam_index_fetch_reset(scan); + + pfree(hscan); +} + +static bool +heapam_index_fetch_tuple(struct IndexFetchTableData *scan, + ItemPointer tid, + Snapshot snapshot, + TupleTableSlot *slot, + bool *call_again, bool *all_dead) +{ + IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan; + BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; + bool got_heap_tuple; + + Assert(TTS_IS_BUFFERTUPLE(slot)); + + /* We can skip the buffer-switching logic if we're in mid-HOT chain. */ + if (!*call_again) + { + /* Switch to correct buffer if we don't have it already */ + Buffer prev_buf = hscan->xs_cbuf; + + hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf, + hscan->xs_base.rel, + ItemPointerGetBlockNumber(tid)); + + /* + * Prune page, but only if we weren't already on this page + */ + if (prev_buf != hscan->xs_cbuf) + heap_page_prune_opt(hscan->xs_base.rel, hscan->xs_cbuf); + } + + /* Obtain share-lock on the buffer so we can examine visibility */ + LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_SHARE); + got_heap_tuple = heap_hot_search_buffer(tid, + hscan->xs_base.rel, + hscan->xs_cbuf, + snapshot, + &bslot->base.tupdata, + all_dead, + !*call_again); + bslot->base.tupdata.t_self = *tid; + LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_UNLOCK); + + if (got_heap_tuple) + { + /* + * Only in a non-MVCC snapshot can more than one member of the HOT + * chain be visible. + */ + *call_again = !IsMVCCSnapshot(snapshot); + + slot->tts_tableOid = RelationGetRelid(scan->rel); + ExecStoreBufferHeapTuple(&bslot->base.tupdata, slot, hscan->xs_cbuf); + } + else + { + /* We've reached the end of the HOT chain. 
*/ + *call_again = false; + } + + return got_heap_tuple; +} + + +/* ------------------------------------------------------------------------ + * Callbacks for non-modifying operations on individual tuples for heap AM + * ------------------------------------------------------------------------ + */ + +static bool +heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, + Snapshot snapshot) +{ + BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; + bool res; + + Assert(TTS_IS_BUFFERTUPLE(slot)); + Assert(BufferIsValid(bslot->buffer)); + + /* + * We need buffer pin and lock to call HeapTupleSatisfiesVisibility. + * Caller should be holding pin, but not lock. + */ + LockBuffer(bslot->buffer, BUFFER_LOCK_SHARE); + res = HeapTupleSatisfiesVisibility(bslot->base.tuple, snapshot, + bslot->buffer); + LockBuffer(bslot->buffer, BUFFER_LOCK_UNLOCK); + + return res; +} + + +/* ------------------------------------------------------------------------ + * Definition of the heap table access method. + * ------------------------------------------------------------------------ + */ + static const TableAmRoutine heapam_methods = { .type = T_TableAmRoutine, + + .slot_callbacks = heapam_slot_callbacks, + + .scan_begin = heap_beginscan, + .scan_end = heap_endscan, + .scan_rescan = heap_rescan, + .scan_getnextslot = heap_getnextslot, + + .parallelscan_estimate = table_block_parallelscan_estimate, + .parallelscan_initialize = table_block_parallelscan_initialize, + .parallelscan_reinitialize = table_block_parallelscan_reinitialize, + + .index_fetch_begin = heapam_index_fetch_begin, + .index_fetch_reset = heapam_index_fetch_reset, + .index_fetch_end = heapam_index_fetch_end, + .index_fetch_tuple = heapam_index_fetch_tuple, + + .tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot, }; diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index e0a5ea42d52..5222966e510 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -22,6 +22,7 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/relscan.h" +#include "access/tableam.h" #include "access/transam.h" #include "catalog/index.h" #include "lib/stringinfo.h" @@ -83,6 +84,7 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys) scan = (IndexScanDesc) palloc(sizeof(IndexScanDescData)); scan->heapRelation = NULL; /* may be set later */ + scan->xs_heapfetch = NULL; scan->indexRelation = indexRelation; scan->xs_snapshot = InvalidSnapshot; /* caller must initialize this */ scan->numberOfKeys = nkeys; @@ -123,11 +125,6 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys) scan->xs_hitup = NULL; scan->xs_hitupdesc = NULL; - ItemPointerSetInvalid(&scan->xs_ctup.t_self); - scan->xs_ctup.t_data = NULL; - scan->xs_cbuf = InvalidBuffer; - scan->xs_continue_hot = false; - return scan; } @@ -335,6 +332,7 @@ systable_beginscan(Relation heapRelation, sysscan->heap_rel = heapRelation; sysscan->irel = irel; + sysscan->slot = table_slot_create(heapRelation, NULL); if (snapshot == NULL) { @@ -384,9 +382,9 @@ systable_beginscan(Relation heapRelation, * disadvantage; and there are no compensating advantages, because * it's unlikely that such scans will occur in parallel. 
*/ - sysscan->scan = heap_beginscan_strat(heapRelation, snapshot, - nkeys, key, - true, false); + sysscan->scan = table_beginscan_strat(heapRelation, snapshot, + nkeys, key, + true, false); sysscan->iscan = NULL; } @@ -401,28 +399,46 @@ systable_beginscan(Relation heapRelation, * Note that returned tuple is a reference to data in a disk buffer; * it must not be modified, and should be presumed inaccessible after * next getnext() or endscan() call. + * + * XXX: It'd probably make sense to offer a slot based interface, at least + * optionally. */ HeapTuple systable_getnext(SysScanDesc sysscan) { - HeapTuple htup; + HeapTuple htup = NULL; if (sysscan->irel) { - htup = index_getnext(sysscan->iscan, ForwardScanDirection); + if (index_getnext_slot(sysscan->iscan, ForwardScanDirection, sysscan->slot)) + { + bool shouldFree; - /* - * We currently don't need to support lossy index operators for any - * system catalog scan. It could be done here, using the scan keys to - * drive the operator calls, if we arranged to save the heap attnums - * during systable_beginscan(); this is practical because we still - * wouldn't need to support indexes on expressions. - */ - if (htup && sysscan->iscan->xs_recheck) - elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + htup = ExecFetchSlotHeapTuple(sysscan->slot, false, &shouldFree); + Assert(!shouldFree); + + /* + * We currently don't need to support lossy index operators for + * any system catalog scan. It could be done here, using the scan + * keys to drive the operator calls, if we arranged to save the + * heap attnums during systable_beginscan(); this is practical + * because we still wouldn't need to support indexes on + * expressions. + */ + if (sysscan->iscan->xs_recheck) + elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + } } else - htup = heap_getnext(sysscan->scan, ForwardScanDirection); + { + if (table_scan_getnextslot(sysscan->scan, ForwardScanDirection, sysscan->slot)) + { + bool shouldFree; + + htup = ExecFetchSlotHeapTuple(sysscan->slot, false, &shouldFree); + Assert(!shouldFree); + } + } return htup; } @@ -446,37 +462,20 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) Snapshot freshsnap; bool result; + Assert(tup == ExecFetchSlotHeapTuple(sysscan->slot, false, NULL)); + /* - * Trust that LockBuffer() and HeapTupleSatisfiesMVCC() do not themselves + * Trust that table_tuple_satisfies_snapshot() and its subsidiaries + * (commonly LockBuffer() and HeapTupleSatisfiesMVCC()) do not themselves * acquire snapshots, so we need not register the snapshot. Those * facilities are too low-level to have any business scanning tables. 
*/ freshsnap = GetCatalogSnapshot(RelationGetRelid(sysscan->heap_rel)); - if (sysscan->irel) - { - IndexScanDesc scan = sysscan->iscan; - - Assert(IsMVCCSnapshot(scan->xs_snapshot)); - Assert(tup == &scan->xs_ctup); - Assert(BufferIsValid(scan->xs_cbuf)); - /* must hold a buffer lock to call HeapTupleSatisfiesVisibility */ - LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE); - result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->xs_cbuf); - LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK); - } - else - { - HeapScanDesc scan = sysscan->scan; - - Assert(IsMVCCSnapshot(scan->rs_snapshot)); - Assert(tup == &scan->rs_ctup); - Assert(BufferIsValid(scan->rs_cbuf)); - /* must hold a buffer lock to call HeapTupleSatisfiesVisibility */ - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->rs_cbuf); - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); - } + result = table_tuple_satisfies_snapshot(sysscan->heap_rel, + sysscan->slot, + freshsnap); + return result; } @@ -488,13 +487,19 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) void systable_endscan(SysScanDesc sysscan) { + if (sysscan->slot) + { + ExecDropSingleTupleTableSlot(sysscan->slot); + sysscan->slot = NULL; + } + if (sysscan->irel) { index_endscan(sysscan->iscan); index_close(sysscan->irel, AccessShareLock); } else - heap_endscan(sysscan->scan); + table_endscan(sysscan->scan); if (sysscan->snapshot) UnregisterSnapshot(sysscan->snapshot); @@ -541,6 +546,7 @@ systable_beginscan_ordered(Relation heapRelation, sysscan->heap_rel = heapRelation; sysscan->irel = indexRelation; + sysscan->slot = table_slot_create(heapRelation, NULL); if (snapshot == NULL) { @@ -586,10 +592,12 @@ systable_beginscan_ordered(Relation heapRelation, HeapTuple systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) { - HeapTuple htup; + HeapTuple htup = NULL; Assert(sysscan->irel); - htup = index_getnext(sysscan->iscan, direction); + if (index_getnext_slot(sysscan->iscan, direction, sysscan->slot)) + htup = ExecFetchSlotHeapTuple(sysscan->slot, false, NULL); + /* See notes in systable_getnext */ if (htup && sysscan->iscan->xs_recheck) elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); @@ -603,6 +611,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) void systable_endscan_ordered(SysScanDesc sysscan) { + if (sysscan->slot) + { + ExecDropSingleTupleTableSlot(sysscan->slot); + sysscan->slot = NULL; + } + Assert(sysscan->irel); index_endscan(sysscan->iscan); if (sysscan->snapshot) diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index 4ad30186d97..ae1c87ebadd 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -72,6 +72,7 @@ #include "access/amapi.h" #include "access/heapam.h" #include "access/relscan.h" +#include "access/tableam.h" #include "access/transam.h" #include "access/xlog.h" #include "catalog/index.h" @@ -235,6 +236,9 @@ index_beginscan(Relation heapRelation, scan->heapRelation = heapRelation; scan->xs_snapshot = snapshot; + /* prepare to fetch index matches from table */ + scan->xs_heapfetch = table_index_fetch_begin(heapRelation); + return scan; } @@ -318,16 +322,12 @@ index_rescan(IndexScanDesc scan, Assert(nkeys == scan->numberOfKeys); Assert(norderbys == scan->numberOfOrderBys); - /* Release any held pin on a heap page */ - if (BufferIsValid(scan->xs_cbuf)) - { - ReleaseBuffer(scan->xs_cbuf); - scan->xs_cbuf = InvalidBuffer; - } - - 
scan->xs_continue_hot = false; + /* Release resources (like buffer pins) from table accesses */ + if (scan->xs_heapfetch) + table_index_fetch_reset(scan->xs_heapfetch); scan->kill_prior_tuple = false; /* for safety */ + scan->xs_heap_continue = false; scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys, orderbys, norderbys); @@ -343,11 +343,11 @@ index_endscan(IndexScanDesc scan) SCAN_CHECKS; CHECK_SCAN_PROCEDURE(amendscan); - /* Release any held pin on a heap page */ - if (BufferIsValid(scan->xs_cbuf)) + /* Release resources (like buffer pins) from table accesses */ + if (scan->xs_heapfetch) { - ReleaseBuffer(scan->xs_cbuf); - scan->xs_cbuf = InvalidBuffer; + table_index_fetch_end(scan->xs_heapfetch); + scan->xs_heapfetch = NULL; } /* End the AM's scan */ @@ -379,17 +379,16 @@ index_markpos(IndexScanDesc scan) /* ---------------- * index_restrpos - restore a scan position * - * NOTE: this only restores the internal scan state of the index AM. - * The current result tuple (scan->xs_ctup) doesn't change. See comments - * for ExecRestrPos(). - * - * NOTE: in the presence of HOT chains, mark/restore only works correctly - * if the scan's snapshot is MVCC-safe; that ensures that there's at most one - * returnable tuple in each HOT chain, and so restoring the prior state at the - * granularity of the index AM is sufficient. Since the only current user - * of mark/restore functionality is nodeMergejoin.c, this effectively means - * that merge-join plans only work for MVCC snapshots. This could be fixed - * if necessary, but for now it seems unimportant. + * NOTE: this only restores the internal scan state of the index AM. See + * comments for ExecRestrPos(). + * + * NOTE: For heap, in the presence of HOT chains, mark/restore only works + * correctly if the scan's snapshot is MVCC-safe; that ensures that there's at + * most one returnable tuple in each HOT chain, and so restoring the prior + * state at the granularity of the index AM is sufficient. Since the only + * current user of mark/restore functionality is nodeMergejoin.c, this + * effectively means that merge-join plans only work for MVCC snapshots. This + * could be fixed if necessary, but for now it seems unimportant. * ---------------- */ void @@ -400,9 +399,12 @@ index_restrpos(IndexScanDesc scan) SCAN_CHECKS; CHECK_SCAN_PROCEDURE(amrestrpos); - scan->xs_continue_hot = false; + /* release resources (like buffer pins) from table accesses */ + if (scan->xs_heapfetch) + table_index_fetch_reset(scan->xs_heapfetch); scan->kill_prior_tuple = false; /* for safety */ + scan->xs_heap_continue = false; scan->indexRelation->rd_indam->amrestrpos(scan); } @@ -483,6 +485,9 @@ index_parallelrescan(IndexScanDesc scan) { SCAN_CHECKS; + if (scan->xs_heapfetch) + table_index_fetch_reset(scan->xs_heapfetch); + /* amparallelrescan is optional; assume no-op if not provided by AM */ if (scan->indexRelation->rd_indam->amparallelrescan != NULL) scan->indexRelation->rd_indam->amparallelrescan(scan); @@ -513,6 +518,9 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys, scan->heapRelation = heaprel; scan->xs_snapshot = snapshot; + /* prepare to fetch index matches from table */ + scan->xs_heapfetch = table_index_fetch_begin(heaprel); + return scan; } @@ -535,7 +543,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction) /* * The AM's amgettuple proc finds the next index entry matching the scan - * keys, and puts the TID into scan->xs_ctup.t_self. It should also set + * keys, and puts the TID into scan->xs_heaptid. 
It should also set * scan->xs_recheck and possibly scan->xs_itup/scan->xs_hitup, though we * pay no attention to those fields here. */ @@ -543,23 +551,23 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction) /* Reset kill flag immediately for safety */ scan->kill_prior_tuple = false; + scan->xs_heap_continue = false; /* If we're out of index entries, we're done */ if (!found) { - /* ... but first, release any held pin on a heap page */ - if (BufferIsValid(scan->xs_cbuf)) - { - ReleaseBuffer(scan->xs_cbuf); - scan->xs_cbuf = InvalidBuffer; - } + /* release resources (like buffer pins) from table accesses */ + if (scan->xs_heapfetch) + table_index_fetch_reset(scan->xs_heapfetch); + return NULL; } + Assert(ItemPointerIsValid(&scan->xs_heaptid)); pgstat_count_index_tuples(scan->indexRelation, 1); /* Return the TID of the tuple we found. */ - return &scan->xs_ctup.t_self; + return &scan->xs_heaptid; } /* ---------------- @@ -580,53 +588,18 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction) * enough information to do it efficiently in the general case. * ---------------- */ -HeapTuple -index_fetch_heap(IndexScanDesc scan) +bool +index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot) { - ItemPointer tid = &scan->xs_ctup.t_self; bool all_dead = false; - bool got_heap_tuple; - - /* We can skip the buffer-switching logic if we're in mid-HOT chain. */ - if (!scan->xs_continue_hot) - { - /* Switch to correct buffer if we don't have it already */ - Buffer prev_buf = scan->xs_cbuf; - - scan->xs_cbuf = ReleaseAndReadBuffer(scan->xs_cbuf, - scan->heapRelation, - ItemPointerGetBlockNumber(tid)); + bool found; - /* - * Prune page, but only if we weren't already on this page - */ - if (prev_buf != scan->xs_cbuf) - heap_page_prune_opt(scan->heapRelation, scan->xs_cbuf); - } + found = table_index_fetch_tuple(scan->xs_heapfetch, &scan->xs_heaptid, + scan->xs_snapshot, slot, + &scan->xs_heap_continue, &all_dead); - /* Obtain share-lock on the buffer so we can examine visibility */ - LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE); - got_heap_tuple = heap_hot_search_buffer(tid, scan->heapRelation, - scan->xs_cbuf, - scan->xs_snapshot, - &scan->xs_ctup, - &all_dead, - !scan->xs_continue_hot); - LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK); - - if (got_heap_tuple) - { - /* - * Only in a non-MVCC snapshot can more than one member of the HOT - * chain be visible. - */ - scan->xs_continue_hot = !IsMVCCSnapshot(scan->xs_snapshot); + if (found) pgstat_count_heap_fetch(scan->indexRelation); - return &scan->xs_ctup; - } - - /* We've reached the end of the HOT chain. */ - scan->xs_continue_hot = false; /* * If we scanned a whole HOT chain and found only dead tuples, tell index @@ -638,17 +611,17 @@ index_fetch_heap(IndexScanDesc scan) if (!scan->xactStartedInRecovery) scan->kill_prior_tuple = all_dead; - return NULL; + return found; } /* ---------------- - * index_getnext - get the next heap tuple from a scan + * index_getnext_slot - get the next tuple from a scan * - * The result is the next heap tuple satisfying the scan keys and the - * snapshot, or NULL if no more matching tuples exist. + * The result is true if a tuple satisfying the scan keys and the snapshot was + * found, false otherwise. The tuple is stored in the specified slot. 
* - * On success, the buffer containing the heap tup is pinned (the pin will be - * dropped in a future index_getnext_tid, index_fetch_heap or index_endscan + * On success, resources (like buffer pins) are likely to be held, and will be + * dropped by a future index_getnext_tid, index_fetch_heap or index_endscan * call). * * Note: caller must check scan->xs_recheck, and perform rechecking of the @@ -656,32 +629,23 @@ index_fetch_heap(IndexScanDesc scan) * enough information to do it efficiently in the general case. * ---------------- */ -HeapTuple -index_getnext(IndexScanDesc scan, ScanDirection direction) +bool +index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot) { - HeapTuple heapTuple; - ItemPointer tid; - for (;;) { - if (scan->xs_continue_hot) - { - /* - * We are resuming scan of a HOT chain after having returned an - * earlier member. Must still hold pin on current heap page. - */ - Assert(BufferIsValid(scan->xs_cbuf)); - Assert(ItemPointerGetBlockNumber(&scan->xs_ctup.t_self) == - BufferGetBlockNumber(scan->xs_cbuf)); - } - else + if (!scan->xs_heap_continue) { + ItemPointer tid; + /* Time to fetch the next TID from the index */ tid = index_getnext_tid(scan, direction); /* If we're out of index entries, we're done */ if (tid == NULL) break; + + Assert(ItemPointerEquals(tid, &scan->xs_heaptid)); } /* @@ -689,12 +653,12 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) * If we don't find anything, loop around and grab the next TID from * the index. */ - heapTuple = index_fetch_heap(scan); - if (heapTuple != NULL) - return heapTuple; + Assert(ItemPointerIsValid(&scan->xs_heaptid)); + if (index_fetch_heap(scan, slot)) + return true; } - return NULL; /* failure exit */ + return false; } /* ---------------- diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 98917de2efd..60e0b90ccf2 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -310,7 +310,7 @@ btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm) if (_bt_first(scan, ForwardScanDirection)) { /* Save tuple ID, and continue scanning */ - heapTid = &scan->xs_ctup.t_self; + heapTid = &scan->xs_heaptid; tbm_add_tuples(tbm, heapTid, 1, false); ntids++; diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index 92832237a8b..af3da3aa5b6 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -1135,7 +1135,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) readcomplete: /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; - scan->xs_ctup.t_self = currItem->heapTid; + scan->xs_heaptid = currItem->heapTid; if (scan->xs_want_itup) scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset); @@ -1185,7 +1185,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir) /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; - scan->xs_ctup.t_self = currItem->heapTid; + scan->xs_heaptid = currItem->heapTid; if (scan->xs_want_itup) scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset); @@ -1964,7 +1964,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; - scan->xs_ctup.t_self = currItem->heapTid; + scan->xs_heaptid = currItem->heapTid; if (scan->xs_want_itup) scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset); diff --git 
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index dc398e11867..e37cbac7b3c 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -61,6 +61,7 @@
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/relscan.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
@@ -158,9 +159,9 @@ typedef struct BTShared
 	/*
 	 * This variable-sized field must come last.
 	 *
-	 * See _bt_parallel_estimate_shared() and heap_parallelscan_estimate().
+	 * See _bt_parallel_estimate_shared() and table_parallelscan_estimate().
 	 */
-	ParallelHeapScanDescData heapdesc;
+	ParallelTableScanDescData heapdesc;
 } BTShared;
 
 /*
@@ -282,7 +283,7 @@ static void _bt_load(BTWriteState *wstate,
 static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent,
 				   int request);
 static void _bt_end_parallel(BTLeader *btleader);
-static Size _bt_parallel_estimate_shared(Snapshot snapshot);
+static Size _bt_parallel_estimate_shared(Relation heap, Snapshot snapshot);
 static double _bt_parallel_heapscan(BTBuildState *buildstate,
 					  bool *brokenhotchain);
 static void _bt_leader_participate_as_worker(BTBuildState *buildstate);
@@ -1275,7 +1276,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	 * Estimate size for our own PARALLEL_KEY_BTREE_SHARED workspace, and
 	 * PARALLEL_KEY_TUPLESORT tuplesort workspace
 	 */
-	estbtshared = _bt_parallel_estimate_shared(snapshot);
+	estbtshared = _bt_parallel_estimate_shared(btspool->heap, snapshot);
 	shm_toc_estimate_chunk(&pcxt->estimator, estbtshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -1316,7 +1317,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->havedead = false;
 	btshared->indtuples = 0.0;
 	btshared->brokenhotchain = false;
-	heap_parallelscan_initialize(&btshared->heapdesc, btspool->heap, snapshot);
+	table_parallelscan_initialize(btspool->heap, &btshared->heapdesc,
+								  snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
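The pattern nbtsort.c follows here generalizes: shared state embeds a variable-sized ParallelTableScanDescData as its last member, and the DSM chunk is sized with table_parallelscan_estimate(). A hedged sketch of the leader-side setup for a hypothetical MyShared (names invented for illustration; pcxt, rel, and snapshot are assumed to exist):

    typedef struct MyShared
    {
        /* ... fixed-size coordination fields ... */
        ParallelTableScanDescData scandesc;   /* variable size; must be last */
    } MyShared;

    Size sz = add_size(offsetof(MyShared, scandesc),
                       table_parallelscan_estimate(rel, snapshot));
    shm_toc_estimate_chunk(&pcxt->estimator, sz);

    /* after InitializeParallelDSM(): */
    MyShared *shared = shm_toc_allocate(pcxt->toc, sz);
    table_parallelscan_initialize(rel, &shared->scandesc, snapshot);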
@@ -1403,10 +1405,10 @@ _bt_end_parallel(BTLeader *btleader)
 * btree index build based on the snapshot its parallel scan will use.
 */
static Size
-_bt_parallel_estimate_shared(Snapshot snapshot)
+_bt_parallel_estimate_shared(Relation heap, Snapshot snapshot)
{
 	return add_size(offsetof(BTShared, heapdesc),
-					heap_parallelscan_estimate(snapshot));
+					table_parallelscan_estimate(heap, snapshot));
}
 
/*
@@ -1617,7 +1619,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 {
 	SortCoordinate coordinate;
 	BTBuildState buildstate;
-	HeapScanDesc scan;
+	TableScanDesc scan;
 	double		reltuples;
 	IndexInfo  *indexInfo;
 
@@ -1670,7 +1672,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(btspool->index);
 	indexInfo->ii_Concurrent = btshared->isconcurrent;
-	scan = heap_beginscan_parallel(btspool->heap, &btshared->heapdesc);
+	scan = table_beginscan_parallel(btspool->heap, &btshared->heapdesc);
 	reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo,
 								   true, _bt_build_callback,
 								   (void *) &buildstate, scan);
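On the worker side, each participant joins the scan via table_beginscan_parallel(), as _bt_parallel_scan_and_sort() now does. A hedged sketch, continuing the hypothetical MyShared example above (MY_KEY_SHARED is an invented shm_toc key; toc and rel are assumed to come from the worker's parallel context):

    MyShared *shared = shm_toc_lookup(toc, MY_KEY_SHARED, false);
    TableScanDesc scan = table_beginscan_parallel(rel, &shared->scandesc);

    /* drive the scan, e.g. via heap_getnext() or IndexBuildHeapScan() */

    table_endscan(scan);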
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index dc0d63924db..9365bc57ad5 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -927,7 +927,7 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 		if (so->iPtr < so->nPtrs)
 		{
 			/* continuing to return reported tuples */
-			scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
+			scan->xs_heaptid = so->heapPtrs[so->iPtr];
 			scan->xs_recheck = so->recheck[so->iPtr];
 			scan->xs_hitup = so->reconTups[so->iPtr];
 
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 84851e4ff88..628d930c130 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -6,13 +6,304 @@
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
- *	  src/backend/access/table/tableam.c
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/table/tableam.c
+ *
+ * NOTES
+ *	  Note that most functions in here are documented in tableam.h, rather than
+ *	  here.  That's because there are a lot of inline functions in tableam.h and
+ *	  it'd be harder to understand if one constantly had to switch between files.
+ *
 *----------------------------------------------------------------------
 */
 #include "postgres.h"
 
+#include "access/heapam.h"		/* for ss_* */
 #include "access/tableam.h"
+#include "access/xact.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
 
 /* GUC variables */
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
+bool		synchronize_seqscans = true;
+
+
+/* ----------------------------------------------------------------------------
+ * Slot functions.
+ * ----------------------------------------------------------------------------
+ */
+
+const TupleTableSlotOps *
+table_slot_callbacks(Relation relation)
+{
+	const TupleTableSlotOps *tts_cb;
+
+	if (relation->rd_tableam)
+		tts_cb = relation->rd_tableam->slot_callbacks(relation);
+	else if (relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+	{
+		/*
+		 * Historically FDWs expect to store heap tuples in slots.  Continue
+		 * handing them one, to make it less painful to adapt FDWs to new
+		 * versions.  The cost of a heap slot over a virtual slot is pretty
+		 * small.
+		 */
+		tts_cb = &TTSOpsHeapTuple;
+	}
+	else
+	{
+		/*
+		 * These need to be supported, as some parts of the code (like COPY)
+		 * need to create slots for such relations too.  It seems better to
+		 * centralize the knowledge that a heap slot is the right thing in
+		 * that case here.
+		 */
+		Assert(relation->rd_rel->relkind == RELKIND_VIEW ||
+			   relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+		tts_cb = &TTSOpsVirtual;
+	}
+
+	return tts_cb;
+}
+
+TupleTableSlot *
+table_slot_create(Relation relation, List **reglist)
+{
+	const TupleTableSlotOps *tts_cb;
+	TupleTableSlot *slot;
+
+	tts_cb = table_slot_callbacks(relation);
+	slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), tts_cb);
+
+	if (reglist)
+		*reglist = lappend(*reglist, slot);
+
+	return slot;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Table scan functions.
+ * ----------------------------------------------------------------------------
+ */
+
+TableScanDesc
+table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
+{
+	Oid			relid = RelationGetRelid(relation);
+	Snapshot	snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
+
+	return relation->rd_tableam->scan_begin(relation, snapshot, nkeys, key, NULL,
+											true, true, true, false, false, true);
+}
+
+void
+table_scan_update_snapshot(TableScanDesc scan, Snapshot snapshot)
+{
+	Assert(IsMVCCSnapshot(snapshot));
+
+	RegisterSnapshot(snapshot);
+	scan->rs_snapshot = snapshot;
+	scan->rs_temp_snap = true;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Parallel table scan related functions.
+ * ----------------------------------------------------------------------------
+ */
+
+Size
+table_parallelscan_estimate(Relation rel, Snapshot snapshot)
+{
+	Size		sz = 0;
+
+	if (IsMVCCSnapshot(snapshot))
+		sz = add_size(sz, EstimateSnapshotSpace(snapshot));
+	else
+		Assert(snapshot == SnapshotAny);
+
+	sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));
+
+	return sz;
+}
+
+void
+table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
+							  Snapshot snapshot)
+{
+	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
+
+	pscan->phs_snapshot_off = snapshot_off;
+
+	if (IsMVCCSnapshot(snapshot))
+	{
+		SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
+		pscan->phs_snapshot_any = false;
+	}
+	else
+	{
+		Assert(snapshot == SnapshotAny);
+		pscan->phs_snapshot_any = true;
+	}
+}
+
+TableScanDesc
+table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan)
+{
+	Snapshot	snapshot;
+
+	Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+
+	if (!parallel_scan->phs_snapshot_any)
+	{
+		/* Snapshot was serialized -- restore it */
+		snapshot = RestoreSnapshot((char *) parallel_scan +
+								   parallel_scan->phs_snapshot_off);
+		RegisterSnapshot(snapshot);
+	}
+	else
+	{
+		/* SnapshotAny passed by caller (not serialized) */
+		snapshot = SnapshotAny;
+	}
+
+	return relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL, parallel_scan,
+											true, true, true, false, false,
+											!parallel_scan->phs_snapshot_any);
+}
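table_beginscan_catalog() pairs with heap_getnext() and table_endscan() exactly as the old heap_beginscan_catalog() did; tableamapi.c below is converted this way. A hedged sketch of a generic catalog scan (the relation choice is illustrative):

    Relation rel = heap_open(AccessMethodRelationId, AccessShareLock);
    TableScanDesc scan = table_beginscan_catalog(rel, 0, NULL);
    HeapTuple tup;

    while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* inspect the catalog tuple */
    }
    table_endscan(scan);
    heap_close(rel, AccessShareLock);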
+
+
+/* ----------------------------------------------------------------------------
+ * Helper functions to implement parallel scans for block oriented AMs.
+ * ----------------------------------------------------------------------------
+ */
+
+Size
+table_block_parallelscan_estimate(Relation rel)
+{
+	return sizeof(ParallelBlockTableScanDescData);
+}
+
+Size
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+{
+	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
+
+	bpscan->base.phs_relid = RelationGetRelid(rel);
+	bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
+	/* compare phs_syncscan initialization to similar logic in initscan */
+	bpscan->base.phs_syncscan = synchronize_seqscans &&
+		!RelationUsesLocalBuffers(rel) &&
+		bpscan->phs_nblocks > NBuffers / 4;
+	SpinLockInit(&bpscan->phs_mutex);
+	bpscan->phs_startblock = InvalidBlockNumber;
+	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
+
+	return sizeof(ParallelBlockTableScanDescData);
+}
+
+void
+table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
+{
+	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
+
+	pg_atomic_write_u64(&bpscan->phs_nallocated, 0);
+}
+
+/*
+ * find and set the scan's startblock
+ *
+ * Determine where the parallel seq scan should start.  This function may be
+ * called many times, once by each parallel worker.  We must be careful only
+ * to set the startblock once.
+ */
+void
+table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
+{
+	BlockNumber sync_startpage = InvalidBlockNumber;
+
+retry:
+	/* Grab the spinlock. */
+	SpinLockAcquire(&pbscan->phs_mutex);
+
+	/*
+	 * If the scan's startblock has not yet been initialized, we must do so
+	 * now.  If this is not a synchronized scan, we just start at block 0, but
+	 * if it is a synchronized scan, we must get the starting position from
+	 * the synchronized scan machinery.  We can't hold the spinlock while
+	 * doing that, though, so release the spinlock, get the information we
+	 * need, and retry.  If nobody else has initialized the scan in the
+	 * meantime, we'll fill in the value we fetched on the second time
+	 * through.
+	 */
+	if (pbscan->phs_startblock == InvalidBlockNumber)
+	{
+		if (!pbscan->base.phs_syncscan)
+			pbscan->phs_startblock = 0;
+		else if (sync_startpage != InvalidBlockNumber)
+			pbscan->phs_startblock = sync_startpage;
+		else
+		{
+			SpinLockRelease(&pbscan->phs_mutex);
+			sync_startpage = ss_get_location(rel, pbscan->phs_nblocks);
+			goto retry;
+		}
+	}
+	SpinLockRelease(&pbscan->phs_mutex);
+}
+
+/*
+ * get the next page to scan
+ *
+ * Get the next page to scan.  Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the first
+ * backend gets an InvalidBlockNumber return.
+ */
+BlockNumber
+table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
+{
+	BlockNumber page;
+	uint64		nallocated;
+
+	/*
+	 * phs_nallocated tracks how many pages have been allocated to workers
+	 * already.  When phs_nallocated >= rs_nblocks, all blocks have been
+	 * allocated.
+	 *
+	 * Because we use an atomic fetch-and-add to fetch the current value, the
+	 * phs_nallocated counter will exceed rs_nblocks: workers will still
+	 * increment the value when they try to allocate the next block even
+	 * though all blocks have been allocated already.  The counter must be 64
+	 * bits wide because of that, to avoid wrapping around when rs_nblocks is
+	 * close to 2^32.
+	 *
+	 * The actual page to return is calculated by adding the counter to the
+	 * starting block number, modulo nblocks.
+	 */
+	nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
+	if (nallocated >= pbscan->phs_nblocks)
+		page = InvalidBlockNumber;	/* all blocks have been allocated */
+	else
+		page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
+
+	/*
+	 * Report scan location.  Normally, we report the current page number.
+	 * When we reach the end of the scan, though, we report the starting page,
+	 * not the ending page, just so the starting positions for later scans
+	 * don't slew backwards.  We only report the position at the end of the
+	 * scan once, though: subsequent callers will report nothing.
+	 */
+	if (pbscan->base.phs_syncscan)
+	{
+		if (page != InvalidBlockNumber)
+			ss_report_location(rel, page);
+		else if (nallocated == pbscan->phs_nblocks)
+			ss_report_location(rel, pbscan->phs_startblock);
+	}
+
+	return page;
+}
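A block-based AM would drive these helpers per worker roughly like this (hedged sketch; process_block() is an invented stand-in for the AM's per-page work, and rel/pbscan are assumed to be set up as above):

    BlockNumber blockno;

    table_block_parallelscan_startblock_init(rel, pbscan);
    while ((blockno = table_block_parallelscan_nextpage(rel, pbscan)) !=
           InvalidBlockNumber)
        process_block(rel, blockno);    /* hypothetical per-page processing */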
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 54a078d68aa..3d3b82e1e58 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -44,6 +44,26 @@ GetTableAmRoutine(Oid amhandler)
 		elog(ERROR, "Table access method handler %u did not return a TableAmRoutine struct",
 			 amhandler);
 
+	/*
+	 * Assert that all required callbacks are present.  That makes it a bit
+	 * easier to keep AMs up to date, e.g. when forward porting them to a new
+	 * major version.
+	 */
+	Assert(routine->scan_begin != NULL);
+	Assert(routine->scan_end != NULL);
+	Assert(routine->scan_rescan != NULL);
+
+	Assert(routine->parallelscan_estimate != NULL);
+	Assert(routine->parallelscan_initialize != NULL);
+	Assert(routine->parallelscan_reinitialize != NULL);
+
+	Assert(routine->index_fetch_begin != NULL);
+	Assert(routine->index_fetch_reset != NULL);
+	Assert(routine->index_fetch_end != NULL);
+	Assert(routine->index_fetch_tuple != NULL);
+
+	Assert(routine->tuple_satisfies_snapshot != NULL);
+
 	return routine;
 }
 
@@ -98,7 +118,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
 {
 	Oid			result;
 	Relation	rel;
-	HeapScanDesc scandesc;
+	TableScanDesc scandesc;
 	HeapTuple	tuple;
 	ScanKeyData entry[1];
 
@@ -113,7 +133,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
 				Anum_pg_am_amname,
 				BTEqualStrategyNumber, F_NAMEEQ,
 				CStringGetDatum(tableamname));
-	scandesc = heap_beginscan_catalog(rel, 1, entry);
+	scandesc = table_beginscan_catalog(rel, 1, entry);
 	tuple = heap_getnext(scandesc, ForwardScanDirection);
 
 	/* We assume that there can be at most one matching tuple */
@@ -123,7 +143,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
 	else
 		result = InvalidOid;
 
-	heap_endscan(scandesc);
+	table_endscan(scandesc);
 	heap_close(rel, AccessShareLock);
 
 	if (!OidIsValid(result) && !missing_ok)
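The callback asserts above imply the minimum shape of a table AM handler. A hedged skeleton for a hypothetical my_tableam, showing the asserted callbacks plus slot_callbacks (which table_slot_callbacks() relies on); the my_* implementations are invented placeholders:

    static const TableAmRoutine my_tableam_methods = {
        .type = T_TableAmRoutine,

        .slot_callbacks = my_slot_callbacks,

        .scan_begin = my_scan_begin,
        .scan_end = my_scan_end,
        .scan_rescan = my_scan_rescan,

        .parallelscan_estimate = my_parallelscan_estimate,
        .parallelscan_initialize = my_parallelscan_initialize,
        .parallelscan_reinitialize = my_parallelscan_reinitialize,

        .index_fetch_begin = my_index_fetch_begin,
        .index_fetch_reset = my_index_fetch_reset,
        .index_fetch_end = my_index_fetch_end,
        .index_fetch_tuple = my_index_fetch_tuple,

        .tuple_satisfies_snapshot = my_tuple_satisfies_snapshot,
    };

    Datum
    my_tableam_handler(PG_FUNCTION_ARGS)
    {
        PG_RETURN_POINTER(&my_tableam_methods);
    }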
diff --git a/src/backend/access/tablesample/system.c b/src/backend/access/tablesample/system.c
index fb1a5634244..26f7de3e45d 100644
--- a/src/backend/access/tablesample/system.c
+++ b/src/backend/access/tablesample/system.c
@@ -180,7 +180,8 @@ static BlockNumber
 system_nextsampleblock(SampleScanState *node)
 {
 	SystemSamplerData *sampler = (SystemSamplerData *) node->tsm_state;
-	HeapScanDesc scan = node->ss.ss_currentScanDesc;
+	TableScanDesc scan = node->ss.ss_currentScanDesc;
+	HeapScanDesc hscan = (HeapScanDesc) scan;
 	BlockNumber nextblock = sampler->nextblock;
 	uint32		hashinput[2];
 
@@ -199,7 +200,7 @@ system_nextsampleblock(SampleScanState *node)
 	 * Loop over block numbers until finding suitable block or reaching end of
 	 * relation.
 	 */
-	for (; nextblock < scan->rs_nblocks; nextblock++)
+	for (; nextblock < hscan->rs_nblocks; nextblock++)
 	{
 		uint32		hash;
 
@@ -211,7 +212,7 @@ system_nextsampleblock(SampleScanState *node)
 			break;
 	}
 
-	if (nextblock < scan->rs_nblocks)
+	if (nextblock < hscan->rs_nblocks)
 	{
 		/* Found a suitable block; remember where we should start next time */
 		sampler->nextblock = nextblock + 1;
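The system.c change shows the downcast idiom that AM-specific code uses now that executor nodes carry a generic TableScanDesc: the cast is only safe when the AM stores the shared descriptor as the first member of its private scan struct, as heapam does. A hedged sketch of that layout for a hypothetical AM:

    typedef struct MyScanDescData
    {
        TableScanDescData base;     /* shared descriptor; must be first */
        BlockNumber nblocks;        /* AM-private state follows */
    } MyScanDescData;
    typedef struct MyScanDescData *MyScanDesc;

    /* later, in AM-specific code: */
    MyScanDesc myscan = (MyScanDesc) node->ss.ss_currentScanDesc;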