
Commit 8c4040e

Allocate hash join files in a separate memory context
Should a hash join exceed memory limit, the hashtable is split up into multiple batches. The number of batches is doubled each time a given batch is determined not to fit in memory. Each batch file is allocated with a block-sized buffer for buffering tuples, and parallel hash join has additional sharedtuplestore accessor buffers.

In some pathological cases requiring a lot of batches, often with skewed data, bad stats, or very large datasets, users can run out of memory solely from the memory overhead of all the batch files' buffers.

Batch files were allocated in the ExecutorState memory context, making it very hard to identify when this batch explosion was the source of an OOM. This commit allocates the batch files in a dedicated memory context, making it easier to identify the cause of an OOM and work to avoid it.

Based on an initial draft by Tomas Vondra, with significant reworks and improvements by Jehan-Guillaume de Rorthais.

Author: Jehan-Guillaume de Rorthais <jgdr@dalibo.com>
Author: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/20190421114618.z3mpgmimc3rmubi4@development
Discussion: https://postgr.es/m/20230504193006.1b5b9622%40karst#273020ff4061fc7a2fbb1ba96b281f17
1 parent 507615f commit 8c4040e
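To see why the buffer overhead alone can exhaust memory, a back-of-the-envelope sketch helps (this is an illustration, not code from the commit; it assumes the default 8 kB BLCKSZ and, per the commit message, one block-sized buffer per open batch file on each of the inner and outer sides):

    #include <stdio.h>

    int
    main(void)
    {
        const long long blcksz = 8192;  /* default PostgreSQL block size */

        for (long long nbatch = 1024; nbatch <= 131072; nbatch *= 2)
        {
            /* one buffer per open inner and one per open outer batch file */
            long long bytes = 2 * nbatch * blcksz;

            printf("nbatch=%6lld -> ~%lld MB of batch-file buffers\n",
                   nbatch, bytes / (1024 * 1024));
        }
        return 0;
    }

At 128k batches the buffers alone reach ~2 GB, independent of work_mem; this is exactly the consumption that previously hid inside ExecutorState.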

File tree

5 files changed: +88 −29 lines changed

src/backend/executor/nodeHash.c (+33 −13)

@@ -484,7 +484,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	 *
 	 * The hashtable control block is just palloc'd from the executor's
 	 * per-query memory context.  Everything else should be kept inside the
-	 * subsidiary hashCxt or batchCxt.
+	 * subsidiary hashCxt, batchCxt or spillCxt.
 	 */
 	hashtable = palloc_object(HashJoinTableData);
 	hashtable->nbuckets = nbuckets;
@@ -538,6 +538,10 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 											 "HashBatchContext",
 											 ALLOCSET_DEFAULT_SIZES);
 
+	hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt,
+												"HashSpillContext",
+												ALLOCSET_DEFAULT_SIZES);
+
 	/* Allocate data that will live for the life of the hashjoin */
 
 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -570,12 +574,19 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
+		MemoryContext oldctx;
+
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
+		oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
+
 		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
 		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+
+		MemoryContextSwitchTo(oldctx);
+
 		/* The files will not be opened until needed... */
 		/* ... but make sure we have temp tablespaces established for them */
 		PrepareTempTablespaces();
@@ -913,7 +924,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	int			oldnbatch = hashtable->nbatch;
 	int			curbatch = hashtable->curbatch;
 	int			nbatch;
-	MemoryContext oldcxt;
 	long		ninmemory;
 	long		nfreed;
 	HashMemoryChunk oldchunks;
@@ -934,13 +944,16 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		   hashtable, nbatch, hashtable->spaceUsed);
 #endif
 
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
-
 	if (hashtable->innerBatchFile == NULL)
 	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
+
 		/* we had no file arrays before */
 		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
 		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+
+		MemoryContextSwitchTo(oldcxt);
+
 		/* time to establish the temp tablespaces, too */
 		PrepareTempTablespaces();
 	}
@@ -951,8 +964,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
 	}
 
-	MemoryContextSwitchTo(oldcxt);
-
 	hashtable->nbatch = nbatch;
 
 	/*
@@ -1024,7 +1035,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 				Assert(batchno > curbatch);
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
-									  &hashtable->innerBatchFile[batchno]);
+									  &hashtable->innerBatchFile[batchno],
+									  hashtable);
 
 				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
@@ -1683,7 +1695,8 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		Assert(batchno > hashtable->curbatch);
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno]);
+							  &hashtable->innerBatchFile[batchno],
+							  hashtable);
 	}
 
 	if (shouldFree)
@@ -2664,7 +2677,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+								  &hashtable->innerBatchFile[batchno],
+								  hashtable);
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
 			hashtable->spaceUsedSkew -= tupleSize;
@@ -3093,8 +3107,11 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
 	pstate->nbatch = nbatch;
 	batches = dsa_get_address(hashtable->area, pstate->batches);
 
-	/* Use hash join memory context. */
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+	/*
+	 * Use hash join spill memory context to allocate accessors, including
+	 * buffers for the temporary files.
+	 */
+	oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
 
 	/* Allocate this backend's accessor array. */
 	hashtable->nbatch = nbatch;
@@ -3196,8 +3213,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 	 */
 	Assert(DsaPointerIsValid(pstate->batches));
 
-	/* Use hash join memory context. */
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+	/*
+	 * Use hash join spill memory context to allocate accessors, including
+	 * buffers for the temporary files.
+	 */
+	oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
 
 	/* Allocate this backend's accessor array. */
 	hashtable->nbatch = pstate->nbatch;
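An illustrative aside on what the dedicated context buys in practice: MemoryContextStats() is PostgreSQL's existing debugging dump (it writes the context tree to the backend's stderr), and with this commit a batch explosion shows up as a distinct "HashSpillContext" line under "HashTableContext" instead of being folded into ExecutorState. The helper below is hypothetical, shown only to make the point:

    #include "postgres.h"
    #include "executor/hashjoin.h"
    #include "utils/memutils.h"

    /* Hypothetical debugging helper, not part of the commit. */
    static void
    dump_hashjoin_memory(HashJoinTable hashtable)
    {
        /*
         * Dumps hashCxt and its children (batchCxt, spillCxt) to stderr;
         * the spill buffers are now attributable at a glance.
         */
        MemoryContextStats(hashtable->hashCxt);
    }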

src/backend/executor/nodeHashjoin.c (+25 −6)

@@ -495,7 +495,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 			Assert(parallel_state == NULL);
 			Assert(batchno > hashtable->curbatch);
 			ExecHashJoinSaveTuple(mintuple, hashvalue,
-								  &hashtable->outerBatchFile[batchno]);
+								  &hashtable->outerBatchFile[batchno],
+								  hashtable);
 
 			if (shouldFree)
 				heap_free_minimal_tuple(mintuple);
@@ -1317,21 +1318,39 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
  * The data recorded in the file for each tuple is its hash value,
  * then the tuple in MinimalTuple format.
  *
- * Note: it is important always to call this in the regular executor
- * context, not in a shorter-lived context; else the temp file buffers
- * will get messed up.
+ * fileptr points to a batch file in one of the hashtable arrays.
+ *
+ * The batch files (and their buffers) are allocated in the spill context
+ * created for the hashtable.
  */
 void
 ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-					  BufFile **fileptr)
+					  BufFile **fileptr, HashJoinTable hashtable)
 {
 	BufFile    *file = *fileptr;
 
+	/*
+	 * The batch file is lazily created. If this is the first tuple written
+	 * to this batch, the batch file is created and its buffer is allocated
+	 * in the spillCxt context, NOT in the batchCxt.
+	 *
+	 * During the build phase, buffered files are created for inner batches.
+	 * Each batch's buffered file is closed (and its buffer freed) after the
+	 * batch is loaded into memory during the outer side scan. Therefore, it
+	 * is necessary to allocate the batch file buffer in a memory context
+	 * which outlives the batch itself.
+	 *
+	 * Also, we use spillCxt instead of hashCxt for better accounting of the
+	 * spilling memory consumption.
+	 */
 	if (file == NULL)
 	{
-		/* First write to this batch file, so open it. */
+		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
+
 		file = BufFileCreateTemp(false);
 		*fileptr = file;
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	BufFileWrite(file, &hashvalue, sizeof(uint32));
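The design point in the rewritten comment is that the allocation site, not the caller, now pins the context: previously every caller had to remember to stay in the long-lived executor context before the first write, or the temp file buffers "got messed up". A minimal sketch of the same lazy-create pattern, assuming hypothetical names around PostgreSQL's real BufFile and memory-context APIs:

    #include "postgres.h"
    #include "storage/buffile.h"
    #include "utils/memutils.h"

    /* Hypothetical helper; get_spill_file() and its arguments are not
     * from the commit, but the APIs used are PostgreSQL's. */
    static BufFile *
    get_spill_file(BufFile **slot, MemoryContext spill_cxt)
    {
        if (*slot == NULL)
        {
            /*
             * Create on first use, charging the file's buffer to the
             * long-lived spill context rather than to whatever
             * short-lived context the caller happens to be in.
             */
            MemoryContext oldcxt = MemoryContextSwitchTo(spill_cxt);

            *slot = BufFileCreateTemp(false);
            MemoryContextSwitchTo(oldcxt);
        }
        return *slot;
    }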

src/backend/utils/sort/sharedtuplestore.c (+8 −0)

@@ -308,11 +308,15 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
 	{
 		SharedTuplestoreParticipant *participant;
 		char		name[MAXPGPATH];
+		MemoryContext oldcxt;
 
 		/* Create one.  Only this backend will write into it. */
 		sts_filename(name, accessor, accessor->participant);
+
+		oldcxt = MemoryContextSwitchTo(accessor->context);
 		accessor->write_file =
 			BufFileCreateFileSet(&accessor->fileset->fs, name);
+		MemoryContextSwitchTo(oldcxt);
 
 		/* Set up the shared state for this backend's file. */
 		participant = &accessor->sts->participants[accessor->participant];
@@ -527,11 +531,15 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 		if (accessor->read_file == NULL)
 		{
 			char		name[MAXPGPATH];
+			MemoryContext oldcxt;
 
 			sts_filename(name, accessor, accessor->read_participant);
+
+			oldcxt = MemoryContextSwitchTo(accessor->context);
 			accessor->read_file =
 				BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
 								   false);
+			MemoryContextSwitchTo(oldcxt);
 		}
 
 		/* Seek and load the chunk header. */

src/include/executor/hashjoin.h (+21 −9)

@@ -23,22 +23,33 @@
 /* ----------------------------------------------------------------
  *				hash-join hash table structures
  *
- * Each active hashjoin has a HashJoinTable control block, which is
- * palloc'd in the executor's per-query context.  All other storage needed
- * for the hashjoin is kept in private memory contexts, two for each hashjoin.
- * This makes it easy and fast to release the storage when we don't need it
- * anymore.  (Exception: data associated with the temp files lives in the
- * per-query context too, since we always call buffile.c in that context.)
+ * Each active hashjoin has a HashJoinTable structure, which is
+ * palloc'd in the executor's per-query context.  Other storage needed for
+ * each hashjoin is kept in child contexts, three for each hashjoin:
+ *   - HashTableContext (hashCxt): the parent hash table storage context
+ *   - HashSpillContext (spillCxt): storage for temp file buffers
+ *   - HashBatchContext (batchCxt): storage for a batch in serial hash join
  *
  * The hashtable contexts are made children of the per-query context, ensuring
  * that they will be discarded at end of statement even if the join is
  * aborted early by an error.  (Likewise, any temporary files we make will
  * be cleaned up by the virtual file manager in event of an error.)
  *
  * Storage that should live through the entire join is allocated from the
- * "hashCxt", while storage that is only wanted for the current batch is
- * allocated in the "batchCxt".  By resetting the batchCxt at the end of
- * each batch, we free all the per-batch storage reliably and without tedium.
+ * "hashCxt" (mainly the hashtable's metadata).  Also, the "hashCxt" context
+ * is the parent of "spillCxt" and "batchCxt".  This makes it easy and fast
+ * to release the storage when we don't need it anymore.
+ *
+ * Data associated with temp files is allocated in the "spillCxt" context,
+ * which lives for the duration of the entire join, as batch files' creation
+ * and usage may span batch execution.  These files are explicitly destroyed
+ * by calling BufFileClose() when the code is done with them.  The aim of
+ * this context is to help account for the memory allocated for temp files
+ * and their buffers.
+ *
+ * Finally, data used only during a single batch's execution is allocated
+ * in the "batchCxt".  By resetting the batchCxt at the end of each batch,
+ * we free all the per-batch storage reliably and without tedium.
 *
 * During first scan of inner relation, we get its tuples from executor.
 * If nbatch > 1 then tuples that don't belong in first batch get saved
@@ -350,6 +361,7 @@ typedef struct HashJoinTableData
 
 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
 	MemoryContext batchCxt;		/* context for this-batch-only storage */
+	MemoryContext spillCxt;		/* context for spilling to temp files */
 
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
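A minimal sketch of the hierarchy the comment above describes (the context names are the real ones used in nodeHash.c; the helper function itself is hypothetical):

    #include "postgres.h"
    #include "utils/memutils.h"

    /* Hypothetical helper mirroring the three-context layout. */
    static void
    create_hashjoin_contexts(MemoryContext per_query_cxt,
                             MemoryContext *hashCxt,
                             MemoryContext *batchCxt,
                             MemoryContext *spillCxt)
    {
        /* Whole-join storage; parent of the other two contexts. */
        *hashCxt = AllocSetContextCreate(per_query_cxt,
                                         "HashTableContext",
                                         ALLOCSET_DEFAULT_SIZES);

        /* Reset at the end of every batch. */
        *batchCxt = AllocSetContextCreate(*hashCxt,
                                          "HashBatchContext",
                                          ALLOCSET_DEFAULT_SIZES);

        /* Lives for the whole join: batch file buffers outlive batches. */
        *spillCxt = AllocSetContextCreate(*hashCxt,
                                          "HashSpillContext",
                                          ALLOCSET_DEFAULT_SIZES);

        /* Deleting hashCxt at end of join releases all three at once. */
    }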

src/include/executor/nodeHashjoin.h (+1 −1)

@@ -29,6 +29,6 @@ extern void ExecHashJoinInitializeWorker(HashJoinState *state,
 										 ParallelWorkerContext *pwcxt);
 
 extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-								  BufFile **fileptr);
+								  BufFile **fileptr, HashJoinTable hashtable);
 
 #endif							/* NODEHASHJOIN_H */
