@@ -143,7 +143,6 @@ typedef struct BrinLeader
143
143
*/
144
144
BrinShared * brinshared ;
145
145
Sharedsort * sharedsort ;
146
- Snapshot snapshot ;
147
146
WalUsage * walusage ;
148
147
BufferUsage * bufferusage ;
149
148
} BrinLeader ;
@@ -231,7 +230,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state,
231
230
static void _brin_begin_parallel (BrinBuildState * buildstate , Relation heap , Relation index ,
232
231
bool isconcurrent , int request );
233
232
static void _brin_end_parallel (BrinLeader * brinleader , BrinBuildState * state );
234
- static Size _brin_parallel_estimate_shared (Relation heap , Snapshot snapshot );
233
+ static Size _brin_parallel_estimate_shared (Relation heap );
235
234
static double _brin_parallel_heapscan (BrinBuildState * state );
236
235
static double _brin_parallel_merge (BrinBuildState * state );
237
236
static void _brin_leader_participate_as_worker (BrinBuildState * buildstate ,
@@ -1221,7 +1220,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1221
1220
reltuples = _brin_parallel_merge (state );
1222
1221
1223
1222
_brin_end_parallel (state -> bs_leader , state );
1224
- Assert (!indexInfo -> ii_Concurrent || !TransactionIdIsValid (MyProc -> xmin ));
1225
1223
}
1226
1224
else /* no parallel index build */
1227
1225
{
@@ -1254,7 +1252,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1254
1252
brin_fill_empty_ranges (state ,
1255
1253
state -> bs_currRangeStart ,
1256
1254
state -> bs_maxRangeStart );
1257
- Assert (!indexInfo -> ii_Concurrent || !TransactionIdIsValid (MyProc -> xmin ));
1258
1255
}
1259
1256
1260
1257
/* release resources */
@@ -1269,6 +1266,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1269
1266
1270
1267
result -> heap_tuples = reltuples ;
1271
1268
result -> index_tuples = idxtuples ;
1269
+ Assert (!indexInfo -> ii_Concurrent || !TransactionIdIsValid (MyProc -> xid ));
1272
1270
1273
1271
return result ;
1274
1272
}
@@ -2368,7 +2366,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2368
2366
{
2369
2367
ParallelContext * pcxt ;
2370
2368
int scantuplesortstates ;
2371
- Snapshot snapshot ;
2372
2369
Size estbrinshared ;
2373
2370
Size estsort ;
2374
2371
BrinShared * brinshared ;
@@ -2399,25 +2396,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2399
2396
* Prepare for scan of the base relation. In a normal index build, we use
2400
2397
* SnapshotAny because we must retrieve all tuples and do our own time
2401
2398
* qual checks (because we have to index RECENTLY_DEAD tuples). In a
2402
- * concurrent build, we take a regular MVCC snapshot and index whatever's
2403
- * live according to that.
2399
+ * concurrent build, we take a regular MVCC snapshot and push it as active.
2400
+ * Later we index whatever's live according to that snapshot while that
2401
+ * snapshot is reset periodically.
2404
2402
*/
2405
2403
if (!isconcurrent )
2406
2404
{
2407
2405
Assert (ActiveSnapshotSet ());
2408
- snapshot = SnapshotAny ;
2409
2406
need_pop_active_snapshot = false;
2410
2407
}
2411
2408
else
2412
2409
{
2413
- snapshot = RegisterSnapshot ( GetTransactionSnapshot ());
2410
+ Assert (! ActiveSnapshotSet ());
2414
2411
PushActiveSnapshot (GetTransactionSnapshot ());
2415
2412
}
2416
2413
2417
2414
/*
2418
2415
* Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
2419
2416
*/
2420
- estbrinshared = _brin_parallel_estimate_shared (heap , snapshot );
2417
+ estbrinshared = _brin_parallel_estimate_shared (heap );
2421
2418
shm_toc_estimate_chunk (& pcxt -> estimator , estbrinshared );
2422
2419
estsort = tuplesort_estimate_shared (scantuplesortstates );
2423
2420
shm_toc_estimate_chunk (& pcxt -> estimator , estsort );
@@ -2457,8 +2454,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2457
2454
{
2458
2455
if (need_pop_active_snapshot )
2459
2456
PopActiveSnapshot ();
2460
- if (IsMVCCSnapshot (snapshot ))
2461
- UnregisterSnapshot (snapshot );
2462
2457
DestroyParallelContext (pcxt );
2463
2458
ExitParallelMode ();
2464
2459
return ;
@@ -2483,7 +2478,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2483
2478
2484
2479
table_parallelscan_initialize (heap ,
2485
2480
ParallelTableScanFromBrinShared (brinshared ),
2486
- snapshot );
2481
+ isconcurrent ? InvalidSnapshot : SnapshotAny ,
2482
+ isconcurrent );
2487
2483
2488
2484
/*
2489
2485
* Store shared tuplesort-private state, for which we reserved space.
@@ -2529,7 +2525,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2529
2525
brinleader -> nparticipanttuplesorts ++ ;
2530
2526
brinleader -> brinshared = brinshared ;
2531
2527
brinleader -> sharedsort = sharedsort ;
2532
- brinleader -> snapshot = snapshot ;
2533
2528
brinleader -> walusage = walusage ;
2534
2529
brinleader -> bufferusage = bufferusage ;
2535
2530
@@ -2545,6 +2540,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2545
2540
/* Save leader state now that it's clear build will be parallel */
2546
2541
buildstate -> bs_leader = brinleader ;
2547
2542
2543
+ /*
2544
+ * In case of concurrent build snapshots are going to be reset periodically.
2545
+ * We need to wait until all workers imported initial snapshot.
2546
+ */
2547
+ if (isconcurrent )
2548
+ WaitForParallelWorkersToAttach (pcxt , true);
2549
+
2548
2550
/* Join heap scan ourselves */
2549
2551
if (leaderparticipates )
2550
2552
_brin_leader_participate_as_worker (buildstate , heap , index );
@@ -2553,7 +2555,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2553
2555
* Caller needs to wait for all launched workers when we return. Make
2554
2556
* sure that the failure-to-start case will not hang forever.
2555
2557
*/
2556
- WaitForParallelWorkersToAttach (pcxt );
2558
+ if (!isconcurrent )
2559
+ WaitForParallelWorkersToAttach (pcxt , false);
2557
2560
if (need_pop_active_snapshot )
2558
2561
PopActiveSnapshot ();
2559
2562
}
@@ -2576,9 +2579,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
2576
2579
for (i = 0 ; i < brinleader -> pcxt -> nworkers_launched ; i ++ )
2577
2580
InstrAccumParallelQuery (& brinleader -> bufferusage [i ], & brinleader -> walusage [i ]);
2578
2581
2579
- /* Free last reference to MVCC snapshot, if one was used */
2580
- if (IsMVCCSnapshot (brinleader -> snapshot ))
2581
- UnregisterSnapshot (brinleader -> snapshot );
2582
2582
DestroyParallelContext (brinleader -> pcxt );
2583
2583
ExitParallelMode ();
2584
2584
}
@@ -2778,14 +2778,14 @@ _brin_parallel_merge(BrinBuildState *state)
2778
2778
2779
2779
/*
2780
2780
* Returns size of shared memory required to store state for a parallel
2781
- * brin index build based on the snapshot its parallel scan will use .
2781
+ * brin index build.
2782
2782
*/
2783
2783
static Size
2784
- _brin_parallel_estimate_shared (Relation heap , Snapshot snapshot )
2784
+ _brin_parallel_estimate_shared (Relation heap )
2785
2785
{
2786
2786
/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
2787
2787
return add_size (BUFFERALIGN (sizeof (BrinShared )),
2788
- table_parallelscan_estimate (heap , snapshot ));
2788
+ table_parallelscan_estimate (heap , InvalidSnapshot ));
2789
2789
}
2790
2790
2791
2791
/*
@@ -2807,6 +2807,7 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
2807
2807
/* Perform work common to all participants */
2808
2808
_brin_parallel_scan_and_build (buildstate , brinleader -> brinshared ,
2809
2809
brinleader -> sharedsort , heap , index , sortmem , true);
2810
+ Assert (!brinleader -> brinshared -> isconcurrent || !TransactionIdIsValid (MyProc -> xid ));
2810
2811
}
2811
2812
2812
2813
/*
@@ -2947,6 +2948,13 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
2947
2948
2948
2949
_brin_parallel_scan_and_build (buildstate , brinshared , sharedsort ,
2949
2950
heapRel , indexRel , sortmem , false);
2951
+ if (brinshared -> isconcurrent )
2952
+ {
2953
+ PopActiveSnapshot ();
2954
+ InvalidateCatalogSnapshot ();
2955
+ Assert (!TransactionIdIsValid (MyProc -> xid ));
2956
+ PushActiveSnapshot (GetTransactionSnapshot ());
2957
+ }
2950
2958
2951
2959
/* Report WAL/buffer usage during parallel execution */
2952
2960
bufferusage = shm_toc_lookup (toc , PARALLEL_KEY_BUFFER_USAGE , false);
0 commit comments