Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c3fbf50

Browse files
michail-nikolaevCommitfest Bot
authored and
Commitfest Bot
committed
Reset snapshots periodically in non-unique non-parallel concurrent index builds
Long-lived snapshots used by CREATE INDEX CONCURRENTLY and REINDEX CONCURRENTLY can hold back the global xmin horizon. Commit d9d0762 attempted to allow VACUUM to ignore such snapshots to mitigate this problem. However, this was reverted in commit e28bb88 because it could cause indexes to miss heap tuples that were HOT-updated and HOT-pruned during the index creation, leading to index corruption.

This patch introduces an alternative: periodically resetting the snapshot used during the first phase. Resetting the snapshot every N pages during the heap scan allows the xmin horizon to advance.

Currently, this technique is applied only to:
- the first scan of the heap: the second scan, during index validation, still uses a single snapshot to ensure index correctness;
- non-parallel index builds: parallel index builds are not yet supported and will be addressed in a following commit;
- non-unique indexes: unique index builds still require a consistent snapshot to enforce uniqueness constraints; this will be addressed in a following commit.

A new scan option SO_RESET_SNAPSHOT is introduced. When set, it causes the snapshot to be reset "between" every SO_RESET_SNAPSHOT_EACH_N_PAGE pages during the scan. The heap scan code is adjusted to support this option, and the index build code is modified to use it for applicable concurrent index builds that are not on system catalogs and not using parallel workers.
1 parent fc75dcf commit c3fbf50

File tree

20 files changed

+427
-35
lines changed

20 files changed

+427
-35
lines changed

contrib/amcheck/verify_nbtree.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,8 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
558558
0, /* number of keys */
559559
NULL, /* scan key */
560560
true, /* buffer access strategy OK */
561-
true); /* syncscan OK? */
561+
true, /* syncscan OK? */
562+
false);
562563

563564
/*
564565
* Scan will behave as the first scan of a CREATE INDEX CONCURRENTLY

contrib/pgstattuple/pgstattuple.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ pgstat_heap(Relation rel, FunctionCallInfo fcinfo)
335335
errmsg("only heap AM is supported")));
336336

337337
/* Disable syncscan because we assume we scan from block zero upwards */
338-
scan = table_beginscan_strat(rel, SnapshotAny, 0, NULL, true, false);
338+
scan = table_beginscan_strat(rel, SnapshotAny, 0, NULL, true, false, false);
339339
hscan = (HeapScanDesc) scan;
340340

341341
InitDirtySnapshot(SnapshotDirty);

src/backend/access/brin/brin.c

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1216,11 +1216,12 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
12161216
state->bs_sortstate =
12171217
tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
12181218
TUPLESORT_NONE);
1219-
1219+
InvalidateCatalogSnapshot();
12201220
/* scan the relation and merge per-worker results */
12211221
reltuples = _brin_parallel_merge(state);
12221222

12231223
_brin_end_parallel(state->bs_leader, state);
1224+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
12241225
}
12251226
else /* no parallel index build */
12261227
{
@@ -1233,6 +1234,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
12331234
reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
12341235
brinbuildCallback, state, NULL);
12351236

1237+
InvalidateCatalogSnapshot();
12361238
/*
12371239
* process the final batch
12381240
*
@@ -1252,6 +1254,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
12521254
brin_fill_empty_ranges(state,
12531255
state->bs_currRangeStart,
12541256
state->bs_maxRangeStart);
1257+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
12551258
}
12561259

12571260
/* release resources */
@@ -2374,6 +2377,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
23742377
WalUsage *walusage;
23752378
BufferUsage *bufferusage;
23762379
bool leaderparticipates = true;
2380+
bool need_pop_active_snapshot = true;
23772381
int querylen;
23782382

23792383
#ifdef DISABLE_LEADER_PARTICIPATION
@@ -2399,9 +2403,16 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
23992403
* live according to that.
24002404
*/
24012405
if (!isconcurrent)
2406+
{
2407+
Assert(ActiveSnapshotSet());
24022408
snapshot = SnapshotAny;
2409+
need_pop_active_snapshot = false;
2410+
}
24032411
else
2412+
{
24042413
snapshot = RegisterSnapshot(GetTransactionSnapshot());
2414+
PushActiveSnapshot(GetTransactionSnapshot());
2415+
}
24052416

24062417
/*
24072418
* Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
@@ -2444,6 +2455,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
24442455
/* If no DSM segment was available, back out (do serial build) */
24452456
if (pcxt->seg == NULL)
24462457
{
2458+
if (need_pop_active_snapshot)
2459+
PopActiveSnapshot();
24472460
if (IsMVCCSnapshot(snapshot))
24482461
UnregisterSnapshot(snapshot);
24492462
DestroyParallelContext(pcxt);
@@ -2523,6 +2536,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
25232536
/* If no workers were successfully launched, back out (do serial build) */
25242537
if (pcxt->nworkers_launched == 0)
25252538
{
2539+
if (need_pop_active_snapshot)
2540+
PopActiveSnapshot();
25262541
_brin_end_parallel(brinleader, NULL);
25272542
return;
25282543
}
@@ -2539,6 +2554,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
25392554
* sure that the failure-to-start case will not hang forever.
25402555
*/
25412556
WaitForParallelWorkersToAttach(pcxt);
2557+
if (need_pop_active_snapshot)
2558+
PopActiveSnapshot();
25422559
}
25432560

25442561
/*

src/backend/access/gin/gininsert.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "pgstat.h"
2929
#include "storage/bufmgr.h"
3030
#include "storage/predicate.h"
31+
#include "storage/proc.h"
3132
#include "tcop/tcopprot.h"
3233
#include "utils/datum.h"
3334
#include "utils/memutils.h"
@@ -646,6 +647,8 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
646647
buildstate.accum.ginstate = &buildstate.ginstate;
647648
ginInitBA(&buildstate.accum);
648649

650+
Assert(!indexInfo->ii_Concurrent || indexInfo->ii_ParallelWorkers || !TransactionIdIsValid(MyProc->xid));
651+
649652
/* Report table scan phase started */
650653
pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
651654
PROGRESS_GIN_PHASE_INDEXBUILD_TABLESCAN);
@@ -708,11 +711,13 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
708711
tuplesort_begin_index_gin(heap, index,
709712
maintenance_work_mem, coordinate,
710713
TUPLESORT_NONE);
714+
InvalidateCatalogSnapshot();
711715

712716
/* scan the relation in parallel and merge per-worker results */
713717
reltuples = _gin_parallel_merge(state);
714718

715719
_gin_end_parallel(state->bs_leader, state);
720+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
716721
}
717722
else /* no parallel index build */
718723
{
@@ -722,6 +727,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
722727
*/
723728
reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
724729
ginBuildCallback, &buildstate, NULL);
730+
InvalidateCatalogSnapshot();
725731

726732
/* dump remaining entries to the index */
727733
oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
@@ -735,6 +741,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
735741
list, nlist, &buildstate.buildStats);
736742
}
737743
MemoryContextSwitchTo(oldCtx);
744+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
738745
}
739746

740747
MemoryContextDelete(buildstate.funcCtx);
@@ -907,6 +914,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
907914
WalUsage *walusage;
908915
BufferUsage *bufferusage;
909916
bool leaderparticipates = true;
917+
bool need_pop_active_snapshot = true;
910918
int querylen;
911919

912920
#ifdef DISABLE_LEADER_PARTICIPATION
@@ -931,9 +939,16 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
931939
* live according to that.
932940
*/
933941
if (!isconcurrent)
942+
{
943+
Assert(ActiveSnapshotSet());
934944
snapshot = SnapshotAny;
945+
need_pop_active_snapshot = false;
946+
}
935947
else
948+
{
936949
snapshot = RegisterSnapshot(GetTransactionSnapshot());
950+
PushActiveSnapshot(GetTransactionSnapshot());
951+
}
937952

938953
/*
939954
* Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace.
@@ -976,6 +991,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
976991
/* If no DSM segment was available, back out (do serial build) */
977992
if (pcxt->seg == NULL)
978993
{
994+
if (need_pop_active_snapshot)
995+
PopActiveSnapshot();
979996
if (IsMVCCSnapshot(snapshot))
980997
UnregisterSnapshot(snapshot);
981998
DestroyParallelContext(pcxt);
@@ -1050,6 +1067,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
10501067
/* If no workers were successfully launched, back out (do serial build) */
10511068
if (pcxt->nworkers_launched == 0)
10521069
{
1070+
if (need_pop_active_snapshot)
1071+
PopActiveSnapshot();
10531072
_gin_end_parallel(ginleader, NULL);
10541073
return;
10551074
}
@@ -1066,6 +1085,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
10661085
* sure that the failure-to-start case will not hang forever.
10671086
*/
10681087
WaitForParallelWorkersToAttach(pcxt);
1088+
if (need_pop_active_snapshot)
1089+
PopActiveSnapshot();
10691090
}
10701091

10711092
/*

src/backend/access/gist/gistbuild.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "optimizer/optimizer.h"
4444
#include "storage/bufmgr.h"
4545
#include "storage/bulk_write.h"
46+
#include "storage/proc.h"
4647

4748
#include "utils/memutils.h"
4849
#include "utils/rel.h"
@@ -259,6 +260,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
259260
buildstate.indtuples = 0;
260261
buildstate.indtuplesSize = 0;
261262

263+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
262264
if (buildstate.buildMode == GIST_SORTED_BUILD)
263265
{
264266
/*
@@ -350,6 +352,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
350352

351353
result->heap_tuples = reltuples;
352354
result->index_tuples = (double) buildstate.indtuples;
355+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
353356

354357
return result;
355358
}

src/backend/access/hash/hash.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
197197

198198
result->heap_tuples = reltuples;
199199
result->index_tuples = buildstate.indtuples;
200+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
200201

201202
return result;
202203
}

src/backend/access/heap/heapam.c

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "utils/inval.h"
5454
#include "utils/spccache.h"
5555
#include "utils/syscache.h"
56+
#include "utils/injection_point.h"
5657

5758

5859
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
@@ -612,6 +613,36 @@ heap_prepare_pagescan(TableScanDesc sscan)
612613
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
613614
}
614615

616+
/*
617+
* Reset the active snapshot during a scan.
618+
* This ensures the xmin horizon can advance while maintaining safe tuple visibility.
619+
* Note: No other snapshot should be active during this operation.
620+
*/
621+
static inline void
622+
heap_reset_scan_snapshot(TableScanDesc sscan)
623+
{
624+
/* Make sure no other snapshot was set as active. */
625+
Assert(GetActiveSnapshot() == sscan->rs_snapshot);
626+
/* And make sure active snapshot is not registered. */
627+
Assert(GetActiveSnapshot()->regd_count == 0);
628+
PopActiveSnapshot();
629+
630+
sscan->rs_snapshot = InvalidSnapshot; /* just to be tidy */
631+
Assert(!HaveRegisteredOrActiveSnapshot());
632+
InvalidateCatalogSnapshot();
633+
634+
/* Goal of snapshot reset is to allow horizon to advance. */
635+
Assert(!TransactionIdIsValid(MyProc->xmin));
636+
#if USE_INJECTION_POINTS
637+
/* In some cases this is still not possible because an xid has been assigned. */
638+
if (!TransactionIdIsValid(MyProc->xid))
639+
INJECTION_POINT("heap_reset_scan_snapshot_effective", NULL);
640+
#endif
641+
642+
PushActiveSnapshot(GetLatestSnapshot());
643+
sscan->rs_snapshot = GetActiveSnapshot();
644+
}
645+
615646
/*
616647
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
617648
*
@@ -653,7 +684,12 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
653684

654685
scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
655686
if (BufferIsValid(scan->rs_cbuf))
687+
{
656688
scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
689+
if ((scan->rs_base.rs_flags & SO_RESET_SNAPSHOT) &&
690+
(scan->rs_cblock % SO_RESET_SNAPSHOT_EACH_N_PAGE == 0))
691+
heap_reset_scan_snapshot((TableScanDesc) scan);
692+
}
657693
}
658694

659695
/*
@@ -1304,6 +1340,15 @@ heap_endscan(TableScanDesc sscan)
13041340
if (scan->rs_parallelworkerdata != NULL)
13051341
pfree(scan->rs_parallelworkerdata);
13061342

1343+
if (scan->rs_base.rs_flags & SO_RESET_SNAPSHOT)
1344+
{
1345+
Assert(!(scan->rs_base.rs_flags & SO_TEMP_SNAPSHOT));
1346+
/* Make sure no other snapshot was set as active. */
1347+
Assert(GetActiveSnapshot() == sscan->rs_snapshot);
1348+
/* And make sure snapshot is not registered. */
1349+
Assert(GetActiveSnapshot()->regd_count == 0);
1350+
}
1351+
13071352
if (scan->rs_base.rs_flags & SO_TEMP_SNAPSHOT)
13081353
UnregisterSnapshot(scan->rs_base.rs_snapshot);
13091354

0 commit comments

Comments
 (0)