Diffstat (limited to 'src/backend/access')
 -rw-r--r--  src/backend/access/gist/gistget.c         |   4
 -rw-r--r--  src/backend/access/hash/hashsearch.c      |   4
 -rw-r--r--  src/backend/access/heap/heapam.c          | 650
 -rw-r--r--  src/backend/access/heap/heapam_handler.c  | 166
 -rw-r--r--  src/backend/access/index/genam.c          | 110
 -rw-r--r--  src/backend/access/index/indexam.c        | 164
 -rw-r--r--  src/backend/access/nbtree/nbtree.c        |   2
 -rw-r--r--  src/backend/access/nbtree/nbtsearch.c     |   6
 -rw-r--r--  src/backend/access/nbtree/nbtsort.c       |  20
 -rw-r--r--  src/backend/access/spgist/spgscan.c       |   2
 -rw-r--r--  src/backend/access/table/tableam.c        | 293
 -rw-r--r--  src/backend/access/table/tableamapi.c     |  26
 -rw-r--r--  src/backend/access/tablesample/system.c   |   7
 13 files changed, 836 insertions(+), 618 deletions(-)
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 156b9d699f7..8108fbb7d8e 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -561,7 +561,7 @@ getNextNearest(IndexScanDesc scan)
if (GISTSearchItemIsHeap(*item))
{
/* found a heap item at currently minimal distance */
- scan->xs_ctup.t_self = item->data.heap.heapPtr;
+ scan->xs_heaptid = item->data.heap.heapPtr;
scan->xs_recheck = item->data.heap.recheck;
index_store_float8_orderby_distances(scan, so->orderByTypes,
@@ -650,7 +650,7 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
so->pageData[so->curPageData - 1].offnum;
}
/* continuing to return tuples from a leaf page */
- scan->xs_ctup.t_self = so->pageData[so->curPageData].heapPtr;
+ scan->xs_heaptid = so->pageData[so->curPageData].heapPtr;
scan->xs_recheck = so->pageData[so->curPageData].recheck;
/* in an index-only scan, also return the reconstructed tuple */
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index ccd3fdceac0..61c90e6bb78 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -119,7 +119,7 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
- scan->xs_ctup.t_self = currItem->heapTid;
+ scan->xs_heaptid = currItem->heapTid;
return true;
}
@@ -432,7 +432,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
- scan->xs_ctup.t_self = currItem->heapTid;
+ scan->xs_heaptid = currItem->heapTid;
/* if we're here, _hash_readpage found a valid tuple */
return true;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index dc3499349b6..3c8a5da0bc8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -41,6 +41,7 @@
#include "access/parallel.h"
#include "access/relscan.h"
#include "access/sysattr.h"
+#include "access/tableam.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
@@ -68,22 +69,6 @@
#include "utils/snapmgr.h"
-/* GUC variable */
-bool synchronize_seqscans = true;
-
-
-static HeapScanDesc heap_beginscan_internal(Relation relation,
- Snapshot snapshot,
- int nkeys, ScanKey key,
- ParallelHeapScanDesc parallel_scan,
- bool allow_strat,
- bool allow_sync,
- bool allow_pagemode,
- bool is_bitmapscan,
- bool is_samplescan,
- bool temp_snap);
-static void heap_parallelscan_startblock_init(HeapScanDesc scan);
-static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -207,6 +192,7 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
static void
initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
{
+ ParallelBlockTableScanDesc bpscan = NULL;
bool allow_strat;
bool allow_sync;
@@ -221,10 +207,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* results for a non-MVCC snapshot, the caller must hold some higher-level
* lock that ensures the interesting tuple(s) won't change.)
*/
- if (scan->rs_parallel != NULL)
- scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+ if (scan->rs_base.rs_parallel != NULL)
+ {
+ bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ scan->rs_nblocks = bpscan->phs_nblocks;
+ }
else
- scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+ scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_base.rs_rd);
/*
* If the table is large relative to NBuffers, use a bulk-read access
@@ -238,11 +227,11 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* Note that heap_parallelscan_initialize has a very similar test; if you
* change this, consider changing that one, too.
*/
- if (!RelationUsesLocalBuffers(scan->rs_rd) &&
+ if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) &&
scan->rs_nblocks > NBuffers / 4)
{
- allow_strat = scan->rs_allow_strat;
- allow_sync = scan->rs_allow_sync;
+ allow_strat = scan->rs_base.rs_allow_strat;
+ allow_sync = scan->rs_base.rs_allow_sync;
}
else
allow_strat = allow_sync = false;
@@ -260,10 +249,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
scan->rs_strategy = NULL;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
- scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+ /* For parallel scan, believe whatever ParallelTableScanDesc says. */
+ scan->rs_base.rs_syncscan = scan->rs_base.rs_parallel->phs_syncscan;
}
else if (keep_startblock)
{
@@ -272,16 +261,16 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* so that rewinding a cursor doesn't generate surprising results.
* Reset the active syncscan setting, though.
*/
- scan->rs_syncscan = (allow_sync && synchronize_seqscans);
+ scan->rs_base.rs_syncscan = (allow_sync && synchronize_seqscans);
}
else if (allow_sync && synchronize_seqscans)
{
- scan->rs_syncscan = true;
- scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+ scan->rs_base.rs_syncscan = true;
+ scan->rs_startblock = ss_get_location(scan->rs_base.rs_rd, scan->rs_nblocks);
}
else
{
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
scan->rs_startblock = 0;
}
@@ -298,15 +287,15 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* copy the scan key, if appropriate
*/
if (key != NULL)
- memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
+ memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData));
/*
* Currently, we don't have a stats counter for bitmap heap scans (but the
* underlying bitmap index scans will be counted) or sample scans (we only
* update stats for tuple fetches there)
*/
- if (!scan->rs_bitmapscan && !scan->rs_samplescan)
- pgstat_count_heap_scan(scan->rs_rd);
+ if (!scan->rs_base.rs_bitmapscan && !scan->rs_base.rs_samplescan)
+ pgstat_count_heap_scan(scan->rs_base.rs_rd);
}
/*
@@ -316,10 +305,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* numBlks is number of pages to scan (InvalidBlockNumber means "all")
*/
void
-heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
+heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
Assert(!scan->rs_inited); /* else too late to change */
- Assert(!scan->rs_syncscan); /* else rs_startblock is significant */
+ Assert(!scan->rs_base.rs_syncscan); /* else rs_startblock is significant */
/* Check startBlk is valid (but allow case of zero blocks...) */
Assert(startBlk == 0 || startBlk < scan->rs_nblocks);
@@ -336,8 +327,9 @@ heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
* which tuples on the page are visible.
*/
void
-heapgetpage(HeapScanDesc scan, BlockNumber page)
+heapgetpage(TableScanDesc sscan, BlockNumber page)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
Buffer buffer;
Snapshot snapshot;
Page dp;
@@ -364,20 +356,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
CHECK_FOR_INTERRUPTS();
/* read page using selected strategy */
- scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
+ scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page,
RBM_NORMAL, scan->rs_strategy);
scan->rs_cblock = page;
- if (!scan->rs_pageatatime)
+ if (!scan->rs_base.rs_pageatatime)
return;
buffer = scan->rs_cbuf;
- snapshot = scan->rs_snapshot;
+ snapshot = scan->rs_base.rs_snapshot;
/*
* Prune and repair fragmentation for the whole page, if possible.
*/
- heap_page_prune_opt(scan->rs_rd, buffer);
+ heap_page_prune_opt(scan->rs_base.rs_rd, buffer);
/*
* We must hold share lock on the buffer content while examining tuple
@@ -387,7 +379,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
LockBuffer(buffer, BUFFER_LOCK_SHARE);
dp = BufferGetPage(buffer);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
ntup = 0;
@@ -422,7 +414,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
HeapTupleData loctup;
bool valid;
- loctup.t_tableOid = RelationGetRelid(scan->rs_rd);
+ loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
loctup.t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(loctup.t_self), page, lineoff);
@@ -432,8 +424,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
else
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
- CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
- buffer, snapshot);
+ CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
+ &loctup, buffer, snapshot);
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
@@ -476,7 +468,7 @@ heapgettup(HeapScanDesc scan,
ScanKey key)
{
HeapTuple tuple = &(scan->rs_ctup);
- Snapshot snapshot = scan->rs_snapshot;
+ Snapshot snapshot = scan->rs_base.rs_snapshot;
bool backward = ScanDirectionIsBackward(dir);
BlockNumber page;
bool finished;
@@ -502,11 +494,16 @@ heapgettup(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- heap_parallelscan_startblock_init(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
- page = heap_parallelscan_nextpage(scan);
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+ pbscan);
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -518,7 +515,7 @@ heapgettup(HeapScanDesc scan,
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
}
@@ -533,7 +530,7 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
/* page and lineoff now reference the physically next tid */
@@ -542,7 +539,7 @@ heapgettup(HeapScanDesc scan,
else if (backward)
{
/* backward parallel scan not supported */
- Assert(scan->rs_parallel == NULL);
+ Assert(scan->rs_base.rs_parallel == NULL);
if (!scan->rs_inited)
{
@@ -562,13 +559,13 @@ heapgettup(HeapScanDesc scan,
* time, and much more likely that we'll just bollix things for
* forward scanners.
*/
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
/* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
}
else
{
@@ -579,7 +576,7 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
if (!scan->rs_inited)
@@ -610,11 +607,11 @@ heapgettup(HeapScanDesc scan,
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
@@ -649,11 +646,12 @@ heapgettup(HeapScanDesc scan,
snapshot,
scan->rs_cbuf);
- CheckForSerializableConflictOut(valid, scan->rs_rd, tuple,
- scan->rs_cbuf, snapshot);
+ CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
+ tuple, scan->rs_cbuf,
+ snapshot);
if (valid && key != NULL)
- HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
nkeys, key, valid);
if (valid)
@@ -696,9 +694,13 @@ heapgettup(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
- else if (scan->rs_parallel != NULL)
+ else if (scan->rs_base.rs_parallel != NULL)
{
- page = heap_parallelscan_nextpage(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -721,8 +723,8 @@ heapgettup(HeapScanDesc scan,
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
- if (scan->rs_syncscan)
- ss_report_location(scan->rs_rd, page);
+ if (scan->rs_base.rs_syncscan)
+ ss_report_location(scan->rs_base.rs_rd, page);
}
/*
@@ -739,12 +741,12 @@ heapgettup(HeapScanDesc scan,
return;
}
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber((Page) dp);
linesleft = lines;
if (backward)
@@ -806,11 +808,16 @@ heapgettup_pagemode(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- heap_parallelscan_startblock_init(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+ pbscan);
- page = heap_parallelscan_nextpage(scan);
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -822,7 +829,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
lineindex = 0;
scan->rs_inited = true;
}
@@ -834,7 +841,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
/* page and lineindex now reference the next visible tid */
@@ -843,7 +850,7 @@ heapgettup_pagemode(HeapScanDesc scan,
else if (backward)
{
/* backward parallel scan not supported */
- Assert(scan->rs_parallel == NULL);
+ Assert(scan->rs_base.rs_parallel == NULL);
if (!scan->rs_inited)
{
@@ -863,13 +870,13 @@ heapgettup_pagemode(HeapScanDesc scan,
* time, and much more likely that we'll just bollix things for
* forward scanners.
*/
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
/* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
}
else
{
@@ -878,7 +885,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
if (!scan->rs_inited)
@@ -908,11 +915,11 @@ heapgettup_pagemode(HeapScanDesc scan,
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
@@ -950,7 +957,7 @@ heapgettup_pagemode(HeapScanDesc scan,
{
bool valid;
- HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
nkeys, key, valid);
if (valid)
{
@@ -986,9 +993,13 @@ heapgettup_pagemode(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
- else if (scan->rs_parallel != NULL)
+ else if (scan->rs_base.rs_parallel != NULL)
{
- page = heap_parallelscan_nextpage(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -1011,8 +1022,8 @@ heapgettup_pagemode(HeapScanDesc scan,
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
- if (scan->rs_syncscan)
- ss_report_location(scan->rs_rd, page);
+ if (scan->rs_base.rs_syncscan)
+ ss_report_location(scan->rs_base.rs_rd, page);
}
/*
@@ -1029,10 +1040,10 @@ heapgettup_pagemode(HeapScanDesc scan,
return;
}
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
linesleft = lines;
if (backward)
@@ -1095,86 +1106,16 @@ fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
*/
-/* ----------------
- * heap_beginscan - begin relation scan
- *
- * heap_beginscan is the "standard" case.
- *
- * heap_beginscan_catalog differs in setting up its own temporary snapshot.
- *
- * heap_beginscan_strat offers an extended API that lets the caller control
- * whether a nondefault buffer access strategy can be used, and whether
- * syncscan can be chosen (possibly resulting in the scan not starting from
- * block zero). Both of these default to true with plain heap_beginscan.
- *
- * heap_beginscan_bm is an alternative entry point for setting up a
- * HeapScanDesc for a bitmap heap scan. Although that scan technology is
- * really quite unlike a standard seqscan, there is just enough commonality
- * to make it worth using the same data structure.
- *
- * heap_beginscan_sampling is an alternative entry point for setting up a
- * HeapScanDesc for a TABLESAMPLE scan. As with bitmap scans, it's worth
- * using the same data structure although the behavior is rather different.
- * In addition to the options offered by heap_beginscan_strat, this call
- * also allows control of whether page-mode visibility checking is used.
- * ----------------
- */
-HeapScanDesc
+TableScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- true, true, true, false, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
-{
- Oid relid = RelationGetRelid(relation);
- Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
-
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- true, true, true, false, false, true);
-}
-
-HeapScanDesc
-heap_beginscan_strat(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- bool allow_strat, bool allow_sync)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- allow_strat, allow_sync, true,
- false, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_bm(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- false, false, true, true, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_sampling(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- bool allow_strat, bool allow_sync, bool allow_pagemode)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- allow_strat, allow_sync, allow_pagemode,
- false, true, false);
-}
-
-static HeapScanDesc
-heap_beginscan_internal(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- ParallelHeapScanDesc parallel_scan,
- bool allow_strat,
- bool allow_sync,
- bool allow_pagemode,
- bool is_bitmapscan,
- bool is_samplescan,
- bool temp_snap)
+ int nkeys, ScanKey key,
+ ParallelTableScanDesc parallel_scan,
+ bool allow_strat,
+ bool allow_sync,
+ bool allow_pagemode,
+ bool is_bitmapscan,
+ bool is_samplescan,
+ bool temp_snap)
{
HeapScanDesc scan;
@@ -1192,21 +1133,22 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
*/
scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
- scan->rs_rd = relation;
- scan->rs_snapshot = snapshot;
- scan->rs_nkeys = nkeys;
- scan->rs_bitmapscan = is_bitmapscan;
- scan->rs_samplescan = is_samplescan;
+ scan->rs_base.rs_rd = relation;
+ scan->rs_base.rs_snapshot = snapshot;
+ scan->rs_base.rs_nkeys = nkeys;
+ scan->rs_base.rs_bitmapscan = is_bitmapscan;
+ scan->rs_base.rs_samplescan = is_samplescan;
scan->rs_strategy = NULL; /* set in initscan */
- scan->rs_allow_strat = allow_strat;
- scan->rs_allow_sync = allow_sync;
- scan->rs_temp_snap = temp_snap;
- scan->rs_parallel = parallel_scan;
+ scan->rs_base.rs_allow_strat = allow_strat;
+ scan->rs_base.rs_allow_sync = allow_sync;
+ scan->rs_base.rs_temp_snap = temp_snap;
+ scan->rs_base.rs_parallel = parallel_scan;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
*/
- scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(snapshot);
+ scan->rs_base.rs_pageatatime =
+ allow_pagemode && snapshot && IsMVCCSnapshot(snapshot);
/*
* For a seqscan in a serializable transaction, acquire a predicate lock
@@ -1230,23 +1172,29 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
* initscan() and we don't want to allocate memory again
*/
if (nkeys > 0)
- scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
+ scan->rs_base.rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
else
- scan->rs_key = NULL;
+ scan->rs_base.rs_key = NULL;
initscan(scan, key, false);
- return scan;
+ return (TableScanDesc) scan;
}
-/* ----------------
- * heap_rescan - restart a relation scan
- * ----------------
- */
void
-heap_rescan(HeapScanDesc scan,
- ScanKey key)
+heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
+ bool allow_strat, bool allow_sync, bool allow_pagemode)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ if (set_params)
+ {
+ scan->rs_base.rs_allow_strat = allow_strat;
+ scan->rs_base.rs_allow_sync = allow_sync;
+ scan->rs_base.rs_pageatatime =
+ allow_pagemode && IsMVCCSnapshot(scan->rs_base.rs_snapshot);
+ }
+
/*
* unpin scan buffers
*/
@@ -1259,37 +1207,11 @@ heap_rescan(HeapScanDesc scan,
initscan(scan, key, true);
}
-/* ----------------
- * heap_rescan_set_params - restart a relation scan after changing params
- *
- * This call allows changing the buffer strategy, syncscan, and pagemode
- * options before starting a fresh scan. Note that although the actual use
- * of syncscan might change (effectively, enabling or disabling reporting),
- * the previously selected startblock will be kept.
- * ----------------
- */
void
-heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
- bool allow_strat, bool allow_sync, bool allow_pagemode)
+heap_endscan(TableScanDesc sscan)
{
- /* adjust parameters */
- scan->rs_allow_strat = allow_strat;
- scan->rs_allow_sync = allow_sync;
- scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(scan->rs_snapshot);
- /* ... and rescan */
- heap_rescan(scan, key);
-}
+ HeapScanDesc scan = (HeapScanDesc) sscan;
-/* ----------------
- * heap_endscan - end relation scan
- *
- * See how to integrate with index scans.
- * Check handling if reldesc caching.
- * ----------------
- */
-void
-heap_endscan(HeapScanDesc scan)
-{
/* Note: no locking manipulations needed */
/*
@@ -1301,246 +1223,20 @@ heap_endscan(HeapScanDesc scan)
/*
* decrement relation reference count and free scan descriptor storage
*/
- RelationDecrementReferenceCount(scan->rs_rd);
+ RelationDecrementReferenceCount(scan->rs_base.rs_rd);
- if (scan->rs_key)
- pfree(scan->rs_key);
+ if (scan->rs_base.rs_key)
+ pfree(scan->rs_base.rs_key);
if (scan->rs_strategy != NULL)
FreeAccessStrategy(scan->rs_strategy);
- if (scan->rs_temp_snap)
- UnregisterSnapshot(scan->rs_snapshot);
+ if (scan->rs_base.rs_temp_snap)
+ UnregisterSnapshot(scan->rs_base.rs_snapshot);
pfree(scan);
}
-/* ----------------
- * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
- *
- * Sadly, this doesn't reduce to a constant, because the size required
- * to serialize the snapshot can vary.
- * ----------------
- */
-Size
-heap_parallelscan_estimate(Snapshot snapshot)
-{
- Size sz = offsetof(ParallelHeapScanDescData, phs_snapshot_data);
-
- if (IsMVCCSnapshot(snapshot))
- sz = add_size(sz, EstimateSnapshotSpace(snapshot));
- else
- Assert(snapshot == SnapshotAny);
-
- return sz;
-}
-
-/* ----------------
- * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
- *
- * Must allow as many bytes of shared memory as returned by
- * heap_parallelscan_estimate. Call this just once in the leader
- * process; then, individual workers attach via heap_beginscan_parallel.
- * ----------------
- */
-void
-heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
- Snapshot snapshot)
-{
- target->phs_relid = RelationGetRelid(relation);
- target->phs_nblocks = RelationGetNumberOfBlocks(relation);
- /* compare phs_syncscan initialization to similar logic in initscan */
- target->phs_syncscan = synchronize_seqscans &&
- !RelationUsesLocalBuffers(relation) &&
- target->phs_nblocks > NBuffers / 4;
- SpinLockInit(&target->phs_mutex);
- target->phs_startblock = InvalidBlockNumber;
- pg_atomic_init_u64(&target->phs_nallocated, 0);
- if (IsMVCCSnapshot(snapshot))
- {
- SerializeSnapshot(snapshot, target->phs_snapshot_data);
- target->phs_snapshot_any = false;
- }
- else
- {
- Assert(snapshot == SnapshotAny);
- target->phs_snapshot_any = true;
- }
-}
-
-/* ----------------
- * heap_parallelscan_reinitialize - reset a parallel scan
- *
- * Call this in the leader process. Caller is responsible for
- * making sure that all workers have finished the scan beforehand.
- * ----------------
- */
-void
-heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
-{
- pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
-}
-
-/* ----------------
- * heap_beginscan_parallel - join a parallel scan
- *
- * Caller must hold a suitable lock on the correct relation.
- * ----------------
- */
-HeapScanDesc
-heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
-{
- Snapshot snapshot;
-
- Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
-
- if (!parallel_scan->phs_snapshot_any)
- {
- /* Snapshot was serialized -- restore it */
- snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
- RegisterSnapshot(snapshot);
- }
- else
- {
- /* SnapshotAny passed by caller (not serialized) */
- snapshot = SnapshotAny;
- }
-
- return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
- true, true, true, false, false,
- !parallel_scan->phs_snapshot_any);
-}
-
-/* ----------------
- * heap_parallelscan_startblock_init - find and set the scan's startblock
- *
- * Determine where the parallel seq scan should start. This function may
- * be called many times, once by each parallel worker. We must be careful
- * only to set the startblock once.
- * ----------------
- */
-static void
-heap_parallelscan_startblock_init(HeapScanDesc scan)
-{
- BlockNumber sync_startpage = InvalidBlockNumber;
- ParallelHeapScanDesc parallel_scan;
-
- Assert(scan->rs_parallel);
- parallel_scan = scan->rs_parallel;
-
-retry:
- /* Grab the spinlock. */
- SpinLockAcquire(&parallel_scan->phs_mutex);
-
- /*
- * If the scan's startblock has not yet been initialized, we must do so
- * now. If this is not a synchronized scan, we just start at block 0, but
- * if it is a synchronized scan, we must get the starting position from
- * the synchronized scan machinery. We can't hold the spinlock while
- * doing that, though, so release the spinlock, get the information we
- * need, and retry. If nobody else has initialized the scan in the
- * meantime, we'll fill in the value we fetched on the second time
- * through.
- */
- if (parallel_scan->phs_startblock == InvalidBlockNumber)
- {
- if (!parallel_scan->phs_syncscan)
- parallel_scan->phs_startblock = 0;
- else if (sync_startpage != InvalidBlockNumber)
- parallel_scan->phs_startblock = sync_startpage;
- else
- {
- SpinLockRelease(&parallel_scan->phs_mutex);
- sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
- goto retry;
- }
- }
- SpinLockRelease(&parallel_scan->phs_mutex);
-}
-
-/* ----------------
- * heap_parallelscan_nextpage - get the next page to scan
- *
- * Get the next page to scan. Even if there are no pages left to scan,
- * another backend could have grabbed a page to scan and not yet finished
- * looking at it, so it doesn't follow that the scan is done when the
- * first backend gets an InvalidBlockNumber return.
- * ----------------
- */
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
-{
- BlockNumber page;
- ParallelHeapScanDesc parallel_scan;
- uint64 nallocated;
-
- Assert(scan->rs_parallel);
- parallel_scan = scan->rs_parallel;
-
- /*
- * phs_nallocated tracks how many pages have been allocated to workers
- * already. When phs_nallocated >= rs_nblocks, all blocks have been
- * allocated.
- *
- * Because we use an atomic fetch-and-add to fetch the current value, the
- * phs_nallocated counter will exceed rs_nblocks, because workers will
- * still increment the value, when they try to allocate the next block but
- * all blocks have been allocated already. The counter must be 64 bits
- * wide because of that, to avoid wrapping around when rs_nblocks is close
- * to 2^32.
- *
- * The actual page to return is calculated by adding the counter to the
- * starting block number, modulo nblocks.
- */
- nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
- if (nallocated >= scan->rs_nblocks)
- page = InvalidBlockNumber; /* all blocks have been allocated */
- else
- page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
-
- /*
- * Report scan location. Normally, we report the current page number.
- * When we reach the end of the scan, though, we report the starting page,
- * not the ending page, just so the starting positions for later scans
- * doesn't slew backwards. We only report the position at the end of the
- * scan once, though: subsequent callers will report nothing.
- */
- if (scan->rs_syncscan)
- {
- if (page != InvalidBlockNumber)
- ss_report_location(scan->rs_rd, page);
- else if (nallocated == scan->rs_nblocks)
- ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
- }
-
- return page;
-}
-
-/* ----------------
- * heap_update_snapshot
- *
- * Update snapshot info in heap scan descriptor.
- * ----------------
- */
-void
-heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
-{
- Assert(IsMVCCSnapshot(snapshot));
-
- RegisterSnapshot(snapshot);
- scan->rs_snapshot = snapshot;
- scan->rs_temp_snap = true;
-}
-
-/* ----------------
- * heap_getnext - retrieve next tuple in scan
- *
- * Fix to work with index relations.
- * We don't return the buffer anymore, but you can get it from the
- * returned HeapTuple.
- * ----------------
- */
-
#ifdef HEAPDEBUGALL
#define HEAPDEBUG_1 \
elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
@@ -1557,17 +1253,32 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
HeapTuple
-heap_getnext(HeapScanDesc scan, ScanDirection direction)
+heap_getnext(TableScanDesc sscan, ScanDirection direction)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ /*
+ * This is still widely used directly, without going through table AM, so
+ * add a safety check. It's possible we should, at a later point,
+ * downgrade this to an assert. The reason for checking the AM routine,
+ * rather than the AM oid, is that this allows writing regression tests
+ * that create another AM reusing the heap handler.
+ */
+ if (unlikely(sscan->rs_rd->rd_tableam != GetHeapamTableAmRoutine()))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("only heap AM is supported")));
+
/* Note: no locking manipulations needed */
HEAPDEBUG_1; /* heap_getnext( info ) */
- if (scan->rs_pageatatime)
+ if (scan->rs_base.rs_pageatatime)
heapgettup_pagemode(scan, direction,
- scan->rs_nkeys, scan->rs_key);
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
else
- heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key);
+ heapgettup(scan, direction,
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
if (scan->rs_ctup.t_data == NULL)
{
@@ -1581,9 +1292,58 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
*/
HEAPDEBUG_3; /* heap_getnext returning tuple */
- pgstat_count_heap_getnext(scan->rs_rd);
+ pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+
+ return &scan->rs_ctup;
+}
+
+#ifdef HEAPAMSLOTDEBUGALL
+#define HEAPAMSLOTDEBUG_1 \
+ elog(DEBUG2, "heapam_getnextslot([%s,nkeys=%d],dir=%d) called", \
+ RelationGetRelationName(scan->rs_base.rs_rd), scan->rs_base.rs_nkeys, (int) direction)
+#define HEAPAMSLOTDEBUG_2 \
+ elog(DEBUG2, "heapam_getnextslot returning EOS")
+#define HEAPAMSLOTDEBUG_3 \
+ elog(DEBUG2, "heapam_getnextslot returning tuple")
+#else
+#define HEAPAMSLOTDEBUG_1
+#define HEAPAMSLOTDEBUG_2
+#define HEAPAMSLOTDEBUG_3
+#endif
+
+bool
+heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
+{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ /* Note: no locking manipulations needed */
+
+ HEAPAMSLOTDEBUG_1; /* heap_getnextslot( info ) */
+
+ if (scan->rs_base.rs_pageatatime)
+ heapgettup_pagemode(scan, direction,
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
+ else
+ heapgettup(scan, direction, scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
- return &(scan->rs_ctup);
+ if (scan->rs_ctup.t_data == NULL)
+ {
+ HEAPAMSLOTDEBUG_2; /* heap_getnextslot returning EOS */
+ ExecClearTuple(slot);
+ return false;
+ }
+
+ /*
+ * if we get here it means we have a new current scan tuple, so point to
+ * the proper return buffer and return the tuple.
+ */
+ HEAPAMSLOTDEBUG_3; /* heap_getnextslot returning tuple */
+
+ pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+
+ ExecStoreBufferHeapTuple(&scan->rs_ctup, slot,
+ scan->rs_cbuf);
+ return true;
}
/*
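
Taken together, the heapam.c changes above replace the heap_beginscan_* family with a single heap_beginscan() registered as the AM's scan_begin callback, and move sequential-scan callers onto the slot-returning heap_getnextslot(). Below is a minimal sketch of the resulting caller-side pattern, using only wrappers that appear elsewhere in this patch (table_slot_create, table_beginscan_strat, table_scan_getnextslot, table_endscan); rel and snapshot are assumed to be set up by the caller, and the usual access/tableam.h and executor/tuptable.h includes are assumed.

    TupleTableSlot *slot = table_slot_create(rel, NULL);
    TableScanDesc   scan = table_beginscan_strat(rel, snapshot,
                                                 0, NULL,    /* no scan keys */
                                                 true, true);    /* allow strategy + syncscan */

    while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
    {
        bool        shouldFree;
        HeapTuple   tup = ExecFetchSlotHeapTuple(slot, false, &shouldFree);

        /* tup points into the slot; it is only valid until the next fetch */
        Assert(!shouldFree);
    }

    table_endscan(scan);
    ExecDropSingleTupleTableSlot(slot);
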
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 518d1df84a1..6a26fcef94c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -19,15 +19,181 @@
*/
#include "postgres.h"
+#include "access/heapam.h"
#include "access/tableam.h"
+#include "storage/bufmgr.h"
#include "utils/builtins.h"
static const TableAmRoutine heapam_methods;
+/* ------------------------------------------------------------------------
+ * Slot related callbacks for heap AM
+ * ------------------------------------------------------------------------
+ */
+
+static const TupleTableSlotOps *
+heapam_slot_callbacks(Relation relation)
+{
+ return &TTSOpsBufferHeapTuple;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Index Scan Callbacks for heap AM
+ * ------------------------------------------------------------------------
+ */
+
+static IndexFetchTableData *
+heapam_index_fetch_begin(Relation rel)
+{
+ IndexFetchHeapData *hscan = palloc0(sizeof(IndexFetchHeapData));
+
+ hscan->xs_base.rel = rel;
+ hscan->xs_cbuf = InvalidBuffer;
+
+ return &hscan->xs_base;
+}
+
+static void
+heapam_index_fetch_reset(IndexFetchTableData *scan)
+{
+ IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
+
+ if (BufferIsValid(hscan->xs_cbuf))
+ {
+ ReleaseBuffer(hscan->xs_cbuf);
+ hscan->xs_cbuf = InvalidBuffer;
+ }
+}
+
+static void
+heapam_index_fetch_end(IndexFetchTableData *scan)
+{
+ IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
+
+ heapam_index_fetch_reset(scan);
+
+ pfree(hscan);
+}
+
+static bool
+heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
+ ItemPointer tid,
+ Snapshot snapshot,
+ TupleTableSlot *slot,
+ bool *call_again, bool *all_dead)
+{
+ IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
+ BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+ bool got_heap_tuple;
+
+ Assert(TTS_IS_BUFFERTUPLE(slot));
+
+ /* We can skip the buffer-switching logic if we're in mid-HOT chain. */
+ if (!*call_again)
+ {
+ /* Switch to correct buffer if we don't have it already */
+ Buffer prev_buf = hscan->xs_cbuf;
+
+ hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
+ hscan->xs_base.rel,
+ ItemPointerGetBlockNumber(tid));
+
+ /*
+ * Prune page, but only if we weren't already on this page
+ */
+ if (prev_buf != hscan->xs_cbuf)
+ heap_page_prune_opt(hscan->xs_base.rel, hscan->xs_cbuf);
+ }
+
+ /* Obtain share-lock on the buffer so we can examine visibility */
+ LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_SHARE);
+ got_heap_tuple = heap_hot_search_buffer(tid,
+ hscan->xs_base.rel,
+ hscan->xs_cbuf,
+ snapshot,
+ &bslot->base.tupdata,
+ all_dead,
+ !*call_again);
+ bslot->base.tupdata.t_self = *tid;
+ LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_UNLOCK);
+
+ if (got_heap_tuple)
+ {
+ /*
+ * Only in a non-MVCC snapshot can more than one member of the HOT
+ * chain be visible.
+ */
+ *call_again = !IsMVCCSnapshot(snapshot);
+
+ slot->tts_tableOid = RelationGetRelid(scan->rel);
+ ExecStoreBufferHeapTuple(&bslot->base.tupdata, slot, hscan->xs_cbuf);
+ }
+ else
+ {
+ /* We've reached the end of the HOT chain. */
+ *call_again = false;
+ }
+
+ return got_heap_tuple;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Callbacks for non-modifying operations on individual tuples for heap AM
+ * ------------------------------------------------------------------------
+ */
+
+static bool
+heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
+ Snapshot snapshot)
+{
+ BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+ bool res;
+
+ Assert(TTS_IS_BUFFERTUPLE(slot));
+ Assert(BufferIsValid(bslot->buffer));
+
+ /*
+ * We need buffer pin and lock to call HeapTupleSatisfiesVisibility.
+ * Caller should be holding pin, but not lock.
+ */
+ LockBuffer(bslot->buffer, BUFFER_LOCK_SHARE);
+ res = HeapTupleSatisfiesVisibility(bslot->base.tuple, snapshot,
+ bslot->buffer);
+ LockBuffer(bslot->buffer, BUFFER_LOCK_UNLOCK);
+
+ return res;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Definition of the heap table access method.
+ * ------------------------------------------------------------------------
+ */
+
static const TableAmRoutine heapam_methods = {
.type = T_TableAmRoutine,
+
+ .slot_callbacks = heapam_slot_callbacks,
+
+ .scan_begin = heap_beginscan,
+ .scan_end = heap_endscan,
+ .scan_rescan = heap_rescan,
+ .scan_getnextslot = heap_getnextslot,
+
+ .parallelscan_estimate = table_block_parallelscan_estimate,
+ .parallelscan_initialize = table_block_parallelscan_initialize,
+ .parallelscan_reinitialize = table_block_parallelscan_reinitialize,
+
+ .index_fetch_begin = heapam_index_fetch_begin,
+ .index_fetch_reset = heapam_index_fetch_reset,
+ .index_fetch_end = heapam_index_fetch_end,
+ .index_fetch_tuple = heapam_index_fetch_tuple,
+
+ .tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
};
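
For orientation, the callbacks in heapam_methods are not called directly by the rest of the backend; they are reached through thin wrappers declared in access/tableam.h that dispatch on the relation's rd_tableam pointer (the same pointer heap_getnext() checks above). A sketch of what such a wrapper presumably looks like, using index_fetch_tuple as the example (the real inline function lives in tableam.h and is not part of this diff):

    static inline bool
    table_index_fetch_tuple(struct IndexFetchTableData *scan,
                            ItemPointer tid,
                            Snapshot snapshot,
                            TupleTableSlot *slot,
                            bool *call_again, bool *all_dead)
    {
        /* scan->rel was filled in by the AM's index_fetch_begin callback */
        return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
                                                        slot, call_again,
                                                        all_dead);
    }
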
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index e0a5ea42d52..5222966e510 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -22,6 +22,7 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "access/relscan.h"
+#include "access/tableam.h"
#include "access/transam.h"
#include "catalog/index.h"
#include "lib/stringinfo.h"
@@ -83,6 +84,7 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
scan = (IndexScanDesc) palloc(sizeof(IndexScanDescData));
scan->heapRelation = NULL; /* may be set later */
+ scan->xs_heapfetch = NULL;
scan->indexRelation = indexRelation;
scan->xs_snapshot = InvalidSnapshot; /* caller must initialize this */
scan->numberOfKeys = nkeys;
@@ -123,11 +125,6 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
scan->xs_hitup = NULL;
scan->xs_hitupdesc = NULL;
- ItemPointerSetInvalid(&scan->xs_ctup.t_self);
- scan->xs_ctup.t_data = NULL;
- scan->xs_cbuf = InvalidBuffer;
- scan->xs_continue_hot = false;
-
return scan;
}
@@ -335,6 +332,7 @@ systable_beginscan(Relation heapRelation,
sysscan->heap_rel = heapRelation;
sysscan->irel = irel;
+ sysscan->slot = table_slot_create(heapRelation, NULL);
if (snapshot == NULL)
{
@@ -384,9 +382,9 @@ systable_beginscan(Relation heapRelation,
* disadvantage; and there are no compensating advantages, because
* it's unlikely that such scans will occur in parallel.
*/
- sysscan->scan = heap_beginscan_strat(heapRelation, snapshot,
- nkeys, key,
- true, false);
+ sysscan->scan = table_beginscan_strat(heapRelation, snapshot,
+ nkeys, key,
+ true, false);
sysscan->iscan = NULL;
}
@@ -401,28 +399,46 @@ systable_beginscan(Relation heapRelation,
* Note that returned tuple is a reference to data in a disk buffer;
* it must not be modified, and should be presumed inaccessible after
* next getnext() or endscan() call.
+ *
+ * XXX: It'd probably make sense to offer a slot based interface, at least
+ * optionally.
*/
HeapTuple
systable_getnext(SysScanDesc sysscan)
{
- HeapTuple htup;
+ HeapTuple htup = NULL;
if (sysscan->irel)
{
- htup = index_getnext(sysscan->iscan, ForwardScanDirection);
+ if (index_getnext_slot(sysscan->iscan, ForwardScanDirection, sysscan->slot))
+ {
+ bool shouldFree;
- /*
- * We currently don't need to support lossy index operators for any
- * system catalog scan. It could be done here, using the scan keys to
- * drive the operator calls, if we arranged to save the heap attnums
- * during systable_beginscan(); this is practical because we still
- * wouldn't need to support indexes on expressions.
- */
- if (htup && sysscan->iscan->xs_recheck)
- elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
+ htup = ExecFetchSlotHeapTuple(sysscan->slot, false, &shouldFree);
+ Assert(!shouldFree);
+
+ /*
+ * We currently don't need to support lossy index operators for
+ * any system catalog scan. It could be done here, using the scan
+ * keys to drive the operator calls, if we arranged to save the
+ * heap attnums during systable_beginscan(); this is practical
+ * because we still wouldn't need to support indexes on
+ * expressions.
+ */
+ if (sysscan->iscan->xs_recheck)
+ elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
+ }
}
else
- htup = heap_getnext(sysscan->scan, ForwardScanDirection);
+ {
+ if (table_scan_getnextslot(sysscan->scan, ForwardScanDirection, sysscan->slot))
+ {
+ bool shouldFree;
+
+ htup = ExecFetchSlotHeapTuple(sysscan->slot, false, &shouldFree);
+ Assert(!shouldFree);
+ }
+ }
return htup;
}
@@ -446,37 +462,20 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
Snapshot freshsnap;
bool result;
+ Assert(tup == ExecFetchSlotHeapTuple(sysscan->slot, false, NULL));
+
/*
- * Trust that LockBuffer() and HeapTupleSatisfiesMVCC() do not themselves
+ * Trust that table_tuple_satisfies_snapshot() and its subsidiaries
+ * (commonly LockBuffer() and HeapTupleSatisfiesMVCC()) do not themselves
* acquire snapshots, so we need not register the snapshot. Those
* facilities are too low-level to have any business scanning tables.
*/
freshsnap = GetCatalogSnapshot(RelationGetRelid(sysscan->heap_rel));
- if (sysscan->irel)
- {
- IndexScanDesc scan = sysscan->iscan;
-
- Assert(IsMVCCSnapshot(scan->xs_snapshot));
- Assert(tup == &scan->xs_ctup);
- Assert(BufferIsValid(scan->xs_cbuf));
- /* must hold a buffer lock to call HeapTupleSatisfiesVisibility */
- LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
- result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->xs_cbuf);
- LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
- }
- else
- {
- HeapScanDesc scan = sysscan->scan;
-
- Assert(IsMVCCSnapshot(scan->rs_snapshot));
- Assert(tup == &scan->rs_ctup);
- Assert(BufferIsValid(scan->rs_cbuf));
- /* must hold a buffer lock to call HeapTupleSatisfiesVisibility */
- LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
- result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->rs_cbuf);
- LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
- }
+ result = table_tuple_satisfies_snapshot(sysscan->heap_rel,
+ sysscan->slot,
+ freshsnap);
+
return result;
}
@@ -488,13 +487,19 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
void
systable_endscan(SysScanDesc sysscan)
{
+ if (sysscan->slot)
+ {
+ ExecDropSingleTupleTableSlot(sysscan->slot);
+ sysscan->slot = NULL;
+ }
+
if (sysscan->irel)
{
index_endscan(sysscan->iscan);
index_close(sysscan->irel, AccessShareLock);
}
else
- heap_endscan(sysscan->scan);
+ table_endscan(sysscan->scan);
if (sysscan->snapshot)
UnregisterSnapshot(sysscan->snapshot);
@@ -541,6 +546,7 @@ systable_beginscan_ordered(Relation heapRelation,
sysscan->heap_rel = heapRelation;
sysscan->irel = indexRelation;
+ sysscan->slot = table_slot_create(heapRelation, NULL);
if (snapshot == NULL)
{
@@ -586,10 +592,12 @@ systable_beginscan_ordered(Relation heapRelation,
HeapTuple
systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
{
- HeapTuple htup;
+ HeapTuple htup = NULL;
Assert(sysscan->irel);
- htup = index_getnext(sysscan->iscan, direction);
+ if (index_getnext_slot(sysscan->iscan, direction, sysscan->slot))
+ htup = ExecFetchSlotHeapTuple(sysscan->slot, false, NULL);
+
/* See notes in systable_getnext */
if (htup && sysscan->iscan->xs_recheck)
elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
@@ -603,6 +611,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
void
systable_endscan_ordered(SysScanDesc sysscan)
{
+ if (sysscan->slot)
+ {
+ ExecDropSingleTupleTableSlot(sysscan->slot);
+ sysscan->slot = NULL;
+ }
+
Assert(sysscan->irel);
index_endscan(sysscan->iscan);
if (sysscan->snapshot)
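
Note that the systable_* interface still hands back HeapTuples, so existing catalog-scan callers compile unchanged; the slot is purely internal to SysScanDesc. An illustrative caller follows, with rel assumed to be pg_class, already opened and locked, the key value a placeholder, and the usual catalog headers included:

    ScanKeyData key;
    SysScanDesc sscan;
    HeapTuple   tup;

    ScanKeyInit(&key, Anum_pg_class_relname,
                BTEqualStrategyNumber, F_NAMEEQ,
                CStringGetDatum("some_table"));

    sscan = systable_beginscan(rel, ClassNameNspIndexId, true,
                               NULL, 1, &key);
    while ((tup = systable_getnext(sscan)) != NULL)
    {
        /* tup is owned by the scan's slot; do not use it after endscan */
    }
    systable_endscan(sscan);
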
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 4ad30186d97..ae1c87ebadd 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -72,6 +72,7 @@
#include "access/amapi.h"
#include "access/heapam.h"
#include "access/relscan.h"
+#include "access/tableam.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "catalog/index.h"
@@ -235,6 +236,9 @@ index_beginscan(Relation heapRelation,
scan->heapRelation = heapRelation;
scan->xs_snapshot = snapshot;
+ /* prepare to fetch index matches from table */
+ scan->xs_heapfetch = table_index_fetch_begin(heapRelation);
+
return scan;
}
@@ -318,16 +322,12 @@ index_rescan(IndexScanDesc scan,
Assert(nkeys == scan->numberOfKeys);
Assert(norderbys == scan->numberOfOrderBys);
- /* Release any held pin on a heap page */
- if (BufferIsValid(scan->xs_cbuf))
- {
- ReleaseBuffer(scan->xs_cbuf);
- scan->xs_cbuf = InvalidBuffer;
- }
-
- scan->xs_continue_hot = false;
+ /* Release resources (like buffer pins) from table accesses */
+ if (scan->xs_heapfetch)
+ table_index_fetch_reset(scan->xs_heapfetch);
scan->kill_prior_tuple = false; /* for safety */
+ scan->xs_heap_continue = false;
scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys,
orderbys, norderbys);
@@ -343,11 +343,11 @@ index_endscan(IndexScanDesc scan)
SCAN_CHECKS;
CHECK_SCAN_PROCEDURE(amendscan);
- /* Release any held pin on a heap page */
- if (BufferIsValid(scan->xs_cbuf))
+ /* Release resources (like buffer pins) from table accesses */
+ if (scan->xs_heapfetch)
{
- ReleaseBuffer(scan->xs_cbuf);
- scan->xs_cbuf = InvalidBuffer;
+ table_index_fetch_end(scan->xs_heapfetch);
+ scan->xs_heapfetch = NULL;
}
/* End the AM's scan */
@@ -379,17 +379,16 @@ index_markpos(IndexScanDesc scan)
/* ----------------
* index_restrpos - restore a scan position
*
- * NOTE: this only restores the internal scan state of the index AM.
- * The current result tuple (scan->xs_ctup) doesn't change. See comments
- * for ExecRestrPos().
- *
- * NOTE: in the presence of HOT chains, mark/restore only works correctly
- * if the scan's snapshot is MVCC-safe; that ensures that there's at most one
- * returnable tuple in each HOT chain, and so restoring the prior state at the
- * granularity of the index AM is sufficient. Since the only current user
- * of mark/restore functionality is nodeMergejoin.c, this effectively means
- * that merge-join plans only work for MVCC snapshots. This could be fixed
- * if necessary, but for now it seems unimportant.
+ * NOTE: this only restores the internal scan state of the index AM. See
+ * comments for ExecRestrPos().
+ *
+ * NOTE: For heap, in the presence of HOT chains, mark/restore only works
+ * correctly if the scan's snapshot is MVCC-safe; that ensures that there's at
+ * most one returnable tuple in each HOT chain, and so restoring the prior
+ * state at the granularity of the index AM is sufficient. Since the only
+ * current user of mark/restore functionality is nodeMergejoin.c, this
+ * effectively means that merge-join plans only work for MVCC snapshots. This
+ * could be fixed if necessary, but for now it seems unimportant.
* ----------------
*/
void
@@ -400,9 +399,12 @@ index_restrpos(IndexScanDesc scan)
SCAN_CHECKS;
CHECK_SCAN_PROCEDURE(amrestrpos);
- scan->xs_continue_hot = false;
+ /* release resources (like buffer pins) from table accesses */
+ if (scan->xs_heapfetch)
+ table_index_fetch_reset(scan->xs_heapfetch);
scan->kill_prior_tuple = false; /* for safety */
+ scan->xs_heap_continue = false;
scan->indexRelation->rd_indam->amrestrpos(scan);
}
@@ -483,6 +485,9 @@ index_parallelrescan(IndexScanDesc scan)
{
SCAN_CHECKS;
+ if (scan->xs_heapfetch)
+ table_index_fetch_reset(scan->xs_heapfetch);
+
/* amparallelrescan is optional; assume no-op if not provided by AM */
if (scan->indexRelation->rd_indam->amparallelrescan != NULL)
scan->indexRelation->rd_indam->amparallelrescan(scan);
@@ -513,6 +518,9 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
scan->heapRelation = heaprel;
scan->xs_snapshot = snapshot;
+ /* prepare to fetch index matches from table */
+ scan->xs_heapfetch = table_index_fetch_begin(heaprel);
+
return scan;
}
@@ -535,7 +543,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
/*
* The AM's amgettuple proc finds the next index entry matching the scan
- * keys, and puts the TID into scan->xs_ctup.t_self. It should also set
+ * keys, and puts the TID into scan->xs_heaptid. It should also set
* scan->xs_recheck and possibly scan->xs_itup/scan->xs_hitup, though we
* pay no attention to those fields here.
*/
@@ -543,23 +551,23 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
/* Reset kill flag immediately for safety */
scan->kill_prior_tuple = false;
+ scan->xs_heap_continue = false;
/* If we're out of index entries, we're done */
if (!found)
{
- /* ... but first, release any held pin on a heap page */
- if (BufferIsValid(scan->xs_cbuf))
- {
- ReleaseBuffer(scan->xs_cbuf);
- scan->xs_cbuf = InvalidBuffer;
- }
+ /* release resources (like buffer pins) from table accesses */
+ if (scan->xs_heapfetch)
+ table_index_fetch_reset(scan->xs_heapfetch);
+
return NULL;
}
+ Assert(ItemPointerIsValid(&scan->xs_heaptid));
pgstat_count_index_tuples(scan->indexRelation, 1);
/* Return the TID of the tuple we found. */
- return &scan->xs_ctup.t_self;
+ return &scan->xs_heaptid;
}
/* ----------------
@@ -580,53 +588,18 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
* enough information to do it efficiently in the general case.
* ----------------
*/
-HeapTuple
-index_fetch_heap(IndexScanDesc scan)
+bool
+index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
{
- ItemPointer tid = &scan->xs_ctup.t_self;
bool all_dead = false;
- bool got_heap_tuple;
-
- /* We can skip the buffer-switching logic if we're in mid-HOT chain. */
- if (!scan->xs_continue_hot)
- {
- /* Switch to correct buffer if we don't have it already */
- Buffer prev_buf = scan->xs_cbuf;
-
- scan->xs_cbuf = ReleaseAndReadBuffer(scan->xs_cbuf,
- scan->heapRelation,
- ItemPointerGetBlockNumber(tid));
+ bool found;
- /*
- * Prune page, but only if we weren't already on this page
- */
- if (prev_buf != scan->xs_cbuf)
- heap_page_prune_opt(scan->heapRelation, scan->xs_cbuf);
- }
+ found = table_index_fetch_tuple(scan->xs_heapfetch, &scan->xs_heaptid,
+ scan->xs_snapshot, slot,
+ &scan->xs_heap_continue, &all_dead);
- /* Obtain share-lock on the buffer so we can examine visibility */
- LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
- got_heap_tuple = heap_hot_search_buffer(tid, scan->heapRelation,
- scan->xs_cbuf,
- scan->xs_snapshot,
- &scan->xs_ctup,
- &all_dead,
- !scan->xs_continue_hot);
- LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
-
- if (got_heap_tuple)
- {
- /*
- * Only in a non-MVCC snapshot can more than one member of the HOT
- * chain be visible.
- */
- scan->xs_continue_hot = !IsMVCCSnapshot(scan->xs_snapshot);
+ if (found)
pgstat_count_heap_fetch(scan->indexRelation);
- return &scan->xs_ctup;
- }
-
- /* We've reached the end of the HOT chain. */
- scan->xs_continue_hot = false;
/*
* If we scanned a whole HOT chain and found only dead tuples, tell index
@@ -638,17 +611,17 @@ index_fetch_heap(IndexScanDesc scan)
if (!scan->xactStartedInRecovery)
scan->kill_prior_tuple = all_dead;
- return NULL;
+ return found;
}
/* ----------------
- * index_getnext - get the next heap tuple from a scan
+ * index_getnext_slot - get the next tuple from a scan
*
- * The result is the next heap tuple satisfying the scan keys and the
- * snapshot, or NULL if no more matching tuples exist.
+ * The result is true if a tuple satisfying the scan keys and the snapshot was
+ * found, false otherwise. The tuple is stored in the specified slot.
*
- * On success, the buffer containing the heap tup is pinned (the pin will be
- * dropped in a future index_getnext_tid, index_fetch_heap or index_endscan
+ * On success, resources (like buffer pins) are likely to be held (they will be
+ * dropped by a future index_getnext_tid, index_fetch_heap or index_endscan
* call).
*
* Note: caller must check scan->xs_recheck, and perform rechecking of the
@@ -656,32 +629,23 @@ index_fetch_heap(IndexScanDesc scan)
* enough information to do it efficiently in the general case.
* ----------------
*/
-HeapTuple
-index_getnext(IndexScanDesc scan, ScanDirection direction)
+bool
+index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
{
- HeapTuple heapTuple;
- ItemPointer tid;
-
for (;;)
{
- if (scan->xs_continue_hot)
- {
- /*
- * We are resuming scan of a HOT chain after having returned an
- * earlier member. Must still hold pin on current heap page.
- */
- Assert(BufferIsValid(scan->xs_cbuf));
- Assert(ItemPointerGetBlockNumber(&scan->xs_ctup.t_self) ==
- BufferGetBlockNumber(scan->xs_cbuf));
- }
- else
+ if (!scan->xs_heap_continue)
{
+ ItemPointer tid;
+
/* Time to fetch the next TID from the index */
tid = index_getnext_tid(scan, direction);
/* If we're out of index entries, we're done */
if (tid == NULL)
break;
+
+ Assert(ItemPointerEquals(tid, &scan->xs_heaptid));
}
/*
@@ -689,12 +653,12 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
* If we don't find anything, loop around and grab the next TID from
* the index.
*/
- heapTuple = index_fetch_heap(scan);
- if (heapTuple != NULL)
- return heapTuple;
+ Assert(ItemPointerIsValid(&scan->xs_heaptid));
+ if (index_fetch_heap(scan, slot))
+ return true;
}
- return NULL; /* failure exit */
+ return false;
}
/* ----------------
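
With index_getnext() gone, callers that need the heap row drive the fetch loop through index_getnext_slot(). A sketch of the pattern under the new API; heapRel, indexRel, snapshot, skeys and nkeys are assumed to be provided by the caller:

    TupleTableSlot *slot = table_slot_create(heapRel, NULL);
    IndexScanDesc   scan = index_beginscan(heapRel, indexRel, snapshot,
                                           nkeys, 0 /* norderbys */);

    index_rescan(scan, skeys, nkeys, NULL, 0);

    while (index_getnext_slot(scan, ForwardScanDirection, slot))
    {
        /*
         * For lossy index quals, xs_recheck is set and the caller must
         * re-evaluate the quals against the tuple now stored in the slot.
         */
        if (scan->xs_recheck)
            ;                   /* recheck quals here */

        /* ... consume the slot ... */
    }

    index_endscan(scan);
    ExecDropSingleTupleTableSlot(slot);
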
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 98917de2efd..60e0b90ccf2 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -310,7 +310,7 @@ btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
if (_bt_first(scan, ForwardScanDirection))
{
/* Save tuple ID, and continue scanning */
- heapTid = &scan->xs_ctup.t_self;
+ heapTid = &scan->xs_heaptid;
tbm_add_tuples(tbm, heapTid, 1, false);
ntids++;
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 92832237a8b..af3da3aa5b6 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -1135,7 +1135,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
readcomplete:
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
- scan->xs_ctup.t_self = currItem->heapTid;
+ scan->xs_heaptid = currItem->heapTid;
if (scan->xs_want_itup)
scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
@@ -1185,7 +1185,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
- scan->xs_ctup.t_self = currItem->heapTid;
+ scan->xs_heaptid = currItem->heapTid;
if (scan->xs_want_itup)
scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
@@ -1964,7 +1964,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
- scan->xs_ctup.t_self = currItem->heapTid;
+ scan->xs_heaptid = currItem->heapTid;
if (scan->xs_want_itup)
scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index dc398e11867..e37cbac7b3c 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -61,6 +61,7 @@
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/relscan.h"
+#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
@@ -158,9 +159,9 @@ typedef struct BTShared
/*
* This variable-sized field must come last.
*
- * See _bt_parallel_estimate_shared() and heap_parallelscan_estimate().
+ * See _bt_parallel_estimate_shared() and table_parallelscan_estimate().
*/
- ParallelHeapScanDescData heapdesc;
+ ParallelTableScanDescData heapdesc;
} BTShared;
/*
@@ -282,7 +283,7 @@ static void _bt_load(BTWriteState *wstate,
static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent,
int request);
static void _bt_end_parallel(BTLeader *btleader);
-static Size _bt_parallel_estimate_shared(Snapshot snapshot);
+static Size _bt_parallel_estimate_shared(Relation heap, Snapshot snapshot);
static double _bt_parallel_heapscan(BTBuildState *buildstate,
bool *brokenhotchain);
static void _bt_leader_participate_as_worker(BTBuildState *buildstate);
@@ -1275,7 +1276,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
* Estimate size for our own PARALLEL_KEY_BTREE_SHARED workspace, and
* PARALLEL_KEY_TUPLESORT tuplesort workspace
*/
- estbtshared = _bt_parallel_estimate_shared(snapshot);
+ estbtshared = _bt_parallel_estimate_shared(btspool->heap, snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, estbtshared);
estsort = tuplesort_estimate_shared(scantuplesortstates);
shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -1316,7 +1317,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
btshared->havedead = false;
btshared->indtuples = 0.0;
btshared->brokenhotchain = false;
- heap_parallelscan_initialize(&btshared->heapdesc, btspool->heap, snapshot);
+ table_parallelscan_initialize(btspool->heap, &btshared->heapdesc,
+ snapshot);
/*
* Store shared tuplesort-private state, for which we reserved space.
@@ -1403,10 +1405,10 @@ _bt_end_parallel(BTLeader *btleader)
* btree index build based on the snapshot its parallel scan will use.
*/
static Size
-_bt_parallel_estimate_shared(Snapshot snapshot)
+_bt_parallel_estimate_shared(Relation heap, Snapshot snapshot)
{
return add_size(offsetof(BTShared, heapdesc),
- heap_parallelscan_estimate(snapshot));
+ table_parallelscan_estimate(heap, snapshot));
}
/*
@@ -1617,7 +1619,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
{
SortCoordinate coordinate;
BTBuildState buildstate;
- HeapScanDesc scan;
+ TableScanDesc scan;
double reltuples;
IndexInfo *indexInfo;
@@ -1670,7 +1672,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
/* Join parallel scan */
indexInfo = BuildIndexInfo(btspool->index);
indexInfo->ii_Concurrent = btshared->isconcurrent;
- scan = heap_beginscan_parallel(btspool->heap, &btshared->heapdesc);
+ scan = table_beginscan_parallel(btspool->heap, &btshared->heapdesc);
reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo,
true, _bt_build_callback,
(void *) &buildstate, scan);
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index dc0d63924db..9365bc57ad5 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -927,7 +927,7 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
if (so->iPtr < so->nPtrs)
{
/* continuing to return reported tuples */
- scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
+ scan->xs_heaptid = so->heapPtrs[so->iPtr];
scan->xs_recheck = so->recheck[so->iPtr];
scan->xs_hitup = so->reconTups[so->iPtr];
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 84851e4ff88..628d930c130 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -6,13 +6,304 @@
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * src/backend/access/table/tableam.c
+ *
+ * IDENTIFICATION
+ * src/backend/access/table/tableam.c
+ *
+ * NOTES
+ * Note that most functions in here are documented in tableam.h, rather than
+ * here.  That's because there are a lot of inline functions in tableam.h and
+ * it'd be harder to understand if one constantly had to switch between files.
+ *
*----------------------------------------------------------------------
*/
#include "postgres.h"
+#include "access/heapam.h" /* for ss_* */
#include "access/tableam.h"
+#include "access/xact.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
/* GUC variables */
char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
+bool synchronize_seqscans = true;
+
+
+/* ----------------------------------------------------------------------------
+ * Slot functions.
+ * ----------------------------------------------------------------------------
+ */
+
+const TupleTableSlotOps *
+table_slot_callbacks(Relation relation)
+{
+ const TupleTableSlotOps *tts_cb;
+
+ if (relation->rd_tableam)
+ tts_cb = relation->rd_tableam->slot_callbacks(relation);
+ else if (relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ {
+ /*
+ * Historically FDWs expect to store heap tuples in slots. Continue
+ * handing them one, to make it less painful to adapt FDWs to new
+ * versions. The cost of a heap slot over a virtual slot is pretty
+ * small.
+ */
+ tts_cb = &TTSOpsHeapTuple;
+ }
+ else
+ {
+ /*
+ * These need to be supported, as some parts of the code (like COPY)
+ * need to create slots for such relations too. It seems better to
+ * centralize the knowledge that a heap slot is the right thing in
+ * that case here.
+ */
+ Assert(relation->rd_rel->relkind == RELKIND_VIEW ||
+ relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+ tts_cb = &TTSOpsVirtual;
+ }
+
+ return tts_cb;
+}
+
+TupleTableSlot *
+table_slot_create(Relation relation, List **reglist)
+{
+ const TupleTableSlotOps *tts_cb;
+ TupleTableSlot *slot;
+
+ tts_cb = table_slot_callbacks(relation);
+ slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), tts_cb);
+
+ if (reglist)
+ *reglist = lappend(*reglist, slot);
+
+ return slot;
+}
+
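A usage note on the two slot helpers above: table_slot_create(rel, NULL) is shorthand for asking the AM for its slot callbacks and building a single slot from them, while a non-NULL reglist (for example an executor node's tuple table) lets the caller free the slot together with others it tracks. A hedged sketch of the equivalence, with an invented function name:

#include "postgres.h"

#include "access/tableam.h"
#include "executor/tuptable.h"
#include "utils/rel.h"

static TupleTableSlot *
example_make_slot(Relation rel)
{
    /*
     * Equivalent to table_slot_create(rel, NULL): the slot implementation is
     * picked by the relation's table AM, or by the FDW/view fallbacks above.
     */
    return MakeSingleTupleTableSlot(RelationGetDescr(rel),
                                    table_slot_callbacks(rel));
}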
+
+/* ----------------------------------------------------------------------------
+ * Table scan functions.
+ * ----------------------------------------------------------------------------
+ */
+
+TableScanDesc
+table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
+{
+ Oid relid = RelationGetRelid(relation);
+ Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
+
+ return relation->rd_tableam->scan_begin(relation, snapshot, nkeys, key, NULL,
+ true, true, true, false, false, true);
+}
+
+void
+table_scan_update_snapshot(TableScanDesc scan, Snapshot snapshot)
+{
+ Assert(IsMVCCSnapshot(snapshot));
+
+ RegisterSnapshot(snapshot);
+ scan->rs_snapshot = snapshot;
+ scan->rs_temp_snap = true;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Parallel table scan related functions.
+ * ----------------------------------------------------------------------------
+ */
+
+Size
+table_parallelscan_estimate(Relation rel, Snapshot snapshot)
+{
+ Size sz = 0;
+
+ if (IsMVCCSnapshot(snapshot))
+ sz = add_size(sz, EstimateSnapshotSpace(snapshot));
+ else
+ Assert(snapshot == SnapshotAny);
+
+ sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));
+
+ return sz;
+}
+
+void
+table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
+ Snapshot snapshot)
+{
+ Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
+
+ pscan->phs_snapshot_off = snapshot_off;
+
+ if (IsMVCCSnapshot(snapshot))
+ {
+ SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
+ pscan->phs_snapshot_any = false;
+ }
+ else
+ {
+ Assert(snapshot == SnapshotAny);
+ pscan->phs_snapshot_any = true;
+ }
+}
+
+TableScanDesc
+table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan)
+{
+ Snapshot snapshot;
+
+ Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+
+ if (!parallel_scan->phs_snapshot_any)
+ {
+ /* Snapshot was serialized -- restore it */
+ snapshot = RestoreSnapshot((char *) parallel_scan +
+ parallel_scan->phs_snapshot_off);
+ RegisterSnapshot(snapshot);
+ }
+ else
+ {
+ /* SnapshotAny passed by caller (not serialized) */
+ snapshot = SnapshotAny;
+ }
+
+ return relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL, parallel_scan,
+ true, true, true, false, false, !parallel_scan->phs_snapshot_any);
+}
+
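Taken together with the nbtsort.c hunk earlier, the parallel-scan functions above are used in a leader/worker pattern. The sketch below assumes the shm_toc space was already reserved with shm_toc_estimate_chunk() during parallel-context setup; EXAMPLE_PSCAN_KEY and the function names are invented for illustration:

#include "postgres.h"

#include "access/parallel.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "executor/tuptable.h"
#include "storage/shm_toc.h"
#include "utils/rel.h"

#define EXAMPLE_PSCAN_KEY	UINT64CONST(0xE000000000000001)

/* Leader: carve out, fill in and publish the shared scan descriptor. */
static ParallelTableScanDesc
example_leader_setup(ParallelContext *pcxt, Relation rel, Snapshot snapshot)
{
    Size        sz = table_parallelscan_estimate(rel, snapshot);
    ParallelTableScanDesc pscan;

    pscan = (ParallelTableScanDesc) shm_toc_allocate(pcxt->toc, sz);
    table_parallelscan_initialize(rel, pscan, snapshot);
    shm_toc_insert(pcxt->toc, EXAMPLE_PSCAN_KEY, pscan);

    return pscan;
}

/* Leader and workers alike: attach to the shared descriptor and scan. */
static void
example_join_scan(shm_toc *toc, Relation rel, TupleTableSlot *slot)
{
    ParallelTableScanDesc pscan;
    TableScanDesc scan;

    pscan = (ParallelTableScanDesc) shm_toc_lookup(toc, EXAMPLE_PSCAN_KEY, false);
    scan = table_beginscan_parallel(rel, pscan);

    while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
    {
        /* ... process one tuple from the slot ... */
    }

    table_endscan(scan);
}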
+
+/* ----------------------------------------------------------------------------
+ * Helper functions to implement parallel scans for block oriented AMs.
+ * ----------------------------------------------------------------------------
+ */
+
+Size
+table_block_parallelscan_estimate(Relation rel)
+{
+ return sizeof(ParallelBlockTableScanDescData);
+}
+
+Size
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+{
+ ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
+
+ bpscan->base.phs_relid = RelationGetRelid(rel);
+ bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
+ /* compare phs_syncscan initialization to similar logic in initscan */
+ bpscan->base.phs_syncscan = synchronize_seqscans &&
+ !RelationUsesLocalBuffers(rel) &&
+ bpscan->phs_nblocks > NBuffers / 4;
+ SpinLockInit(&bpscan->phs_mutex);
+ bpscan->phs_startblock = InvalidBlockNumber;
+ pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
+
+ return sizeof(ParallelBlockTableScanDescData);
+}
+
+void
+table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
+{
+ ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
+
+ pg_atomic_write_u64(&bpscan->phs_nallocated, 0);
+}
+
+/*
+ * find and set the scan's startblock
+ *
+ * Determine where the parallel seq scan should start. This function may be
+ * called many times, once by each parallel worker. We must be careful only
+ * to set the startblock once.
+ */
+void
+table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
+{
+ BlockNumber sync_startpage = InvalidBlockNumber;
+
+retry:
+ /* Grab the spinlock. */
+ SpinLockAcquire(&pbscan->phs_mutex);
+
+ /*
+ * If the scan's startblock has not yet been initialized, we must do so
+ * now. If this is not a synchronized scan, we just start at block 0, but
+ * if it is a synchronized scan, we must get the starting position from
+ * the synchronized scan machinery. We can't hold the spinlock while
+ * doing that, though, so release the spinlock, get the information we
+ * need, and retry. If nobody else has initialized the scan in the
+ * meantime, we'll fill in the value we fetched on the second time
+ * through.
+ */
+ if (pbscan->phs_startblock == InvalidBlockNumber)
+ {
+ if (!pbscan->base.phs_syncscan)
+ pbscan->phs_startblock = 0;
+ else if (sync_startpage != InvalidBlockNumber)
+ pbscan->phs_startblock = sync_startpage;
+ else
+ {
+ SpinLockRelease(&pbscan->phs_mutex);
+ sync_startpage = ss_get_location(rel, pbscan->phs_nblocks);
+ goto retry;
+ }
+ }
+ SpinLockRelease(&pbscan->phs_mutex);
+}
+
+/*
+ * get the next page to scan
+ *
+ * Get the next page to scan. Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the first
+ * backend gets an InvalidBlockNumber return.
+ */
+BlockNumber
+table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
+{
+ BlockNumber page;
+ uint64 nallocated;
+
+ /*
+	 * phs_nallocated tracks how many pages have been allocated to workers
+	 * already. When phs_nallocated >= phs_nblocks, all blocks have been
+	 * allocated.
+	 *
+	 * Because we use an atomic fetch-and-add to fetch the current value, the
+	 * phs_nallocated counter can exceed phs_nblocks: workers keep incrementing
+	 * it when they try to allocate the next block even though all blocks have
+	 * already been allocated. That is why the counter must be 64 bits wide, to
+	 * avoid wrapping around when phs_nblocks is close to 2^32.
+ *
+ * The actual page to return is calculated by adding the counter to the
+ * starting block number, modulo nblocks.
+ */
+ nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
+ if (nallocated >= pbscan->phs_nblocks)
+ page = InvalidBlockNumber; /* all blocks have been allocated */
+ else
+ page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
+
+ /*
+ * Report scan location. Normally, we report the current page number.
+ * When we reach the end of the scan, though, we report the starting page,
+	 * not the ending page, just so the starting positions for later scans
+	 * don't slew backwards. We only report the position at the end of the
+ * scan once, though: subsequent callers will report nothing.
+ */
+ if (pbscan->base.phs_syncscan)
+ {
+ if (page != InvalidBlockNumber)
+ ss_report_location(rel, page);
+ else if (nallocated == pbscan->phs_nblocks)
+ ss_report_location(rel, pbscan->phs_startblock);
+ }
+
+ return page;
+}
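For completeness, a hedged sketch of how a block-oriented AM's scan loop might drive the two helpers above; heapam does essentially this inside its own scan code, and example_process_block() is a placeholder for the AM's per-block tuple reading:

#include "postgres.h"

#include "access/relscan.h"
#include "access/tableam.h"
#include "storage/block.h"
#include "utils/rel.h"

/* placeholder for the AM's per-block work (reading tuples, etc.) */
static void
example_process_block(Relation rel, BlockNumber blkno)
{
    /* ... */
}

static void
example_parallel_block_scan(Relation rel, ParallelBlockTableScanDesc pbscan)
{
    BlockNumber blkno;

    /* the first backend to arrive picks the start block; the rest reuse it */
    table_block_parallelscan_startblock_init(rel, pbscan);

    for (;;)
    {
        blkno = table_block_parallelscan_nextpage(rel, pbscan);
        if (blkno == InvalidBlockNumber)
            break;              /* every block has been handed out */
        example_process_block(rel, blkno);
    }
}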
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 54a078d68aa..3d3b82e1e58 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -44,6 +44,26 @@ GetTableAmRoutine(Oid amhandler)
elog(ERROR, "Table access method handler %u did not return a TableAmRoutine struct",
amhandler);
+ /*
+ * Assert that all required callbacks are present. That makes it a bit
+ * easier to keep AMs up to date, e.g. when forward porting them to a new
+ * major version.
+ */
+ Assert(routine->scan_begin != NULL);
+ Assert(routine->scan_end != NULL);
+ Assert(routine->scan_rescan != NULL);
+
+ Assert(routine->parallelscan_estimate != NULL);
+ Assert(routine->parallelscan_initialize != NULL);
+ Assert(routine->parallelscan_reinitialize != NULL);
+
+ Assert(routine->index_fetch_begin != NULL);
+ Assert(routine->index_fetch_reset != NULL);
+ Assert(routine->index_fetch_end != NULL);
+ Assert(routine->index_fetch_tuple != NULL);
+
+ Assert(routine->tuple_satisfies_snapshot != NULL);
+
return routine;
}
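The assertions above spell out the minimum callback set a table AM handler must provide. Below is a skeleton of what such a handler might look like, with invented myam_* names; the scan, index-fetch and visibility callbacks are omitted from the sketch, so it would not pass the Asserts until they are filled in. Block-oriented AMs can reuse the generic parallel-scan helpers from tableam.c directly:

#include "postgres.h"

#include "access/tableam.h"
#include "fmgr.h"

PG_FUNCTION_INFO_V1(myam_handler);

static const TableAmRoutine myam_methods = {
    .type = T_TableAmRoutine,

    /* block-oriented AMs can simply reuse the generic helpers */
    .parallelscan_estimate = table_block_parallelscan_estimate,
    .parallelscan_initialize = table_block_parallelscan_initialize,
    .parallelscan_reinitialize = table_block_parallelscan_reinitialize,

    /*
     * slot_callbacks, scan_begin/scan_end/scan_rescan, the index_fetch_*
     * callbacks and tuple_satisfies_snapshot must also be supplied (see the
     * Asserts above); they are left out of this sketch.
     */
};

Datum
myam_handler(PG_FUNCTION_ARGS)
{
    PG_RETURN_POINTER(&myam_methods);
}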
@@ -98,7 +118,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
{
Oid result;
Relation rel;
- HeapScanDesc scandesc;
+ TableScanDesc scandesc;
HeapTuple tuple;
ScanKeyData entry[1];
@@ -113,7 +133,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
Anum_pg_am_amname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(tableamname));
- scandesc = heap_beginscan_catalog(rel, 1, entry);
+ scandesc = table_beginscan_catalog(rel, 1, entry);
tuple = heap_getnext(scandesc, ForwardScanDirection);
/* We assume that there can be at most one matching tuple */
@@ -123,7 +143,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
else
result = InvalidOid;
- heap_endscan(scandesc);
+ table_endscan(scandesc);
heap_close(rel, AccessShareLock);
if (!OidIsValid(result) && !missing_ok)
diff --git a/src/backend/access/tablesample/system.c b/src/backend/access/tablesample/system.c
index fb1a5634244..26f7de3e45d 100644
--- a/src/backend/access/tablesample/system.c
+++ b/src/backend/access/tablesample/system.c
@@ -180,7 +180,8 @@ static BlockNumber
system_nextsampleblock(SampleScanState *node)
{
SystemSamplerData *sampler = (SystemSamplerData *) node->tsm_state;
- HeapScanDesc scan = node->ss.ss_currentScanDesc;
+ TableScanDesc scan = node->ss.ss_currentScanDesc;
+ HeapScanDesc hscan = (HeapScanDesc) scan;
BlockNumber nextblock = sampler->nextblock;
uint32 hashinput[2];
@@ -199,7 +200,7 @@ system_nextsampleblock(SampleScanState *node)
* Loop over block numbers until finding suitable block or reaching end of
* relation.
*/
- for (; nextblock < scan->rs_nblocks; nextblock++)
+ for (; nextblock < hscan->rs_nblocks; nextblock++)
{
uint32 hash;
@@ -211,7 +212,7 @@ system_nextsampleblock(SampleScanState *node)
break;
}
- if (nextblock < scan->rs_nblocks)
+ if (nextblock < hscan->rs_nblocks)
{
/* Found a suitable block; remember where we should start next time */
sampler->nextblock = nextblock + 1;