Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Make EXPLAIN report maximum hashtable usage across multiple rescans.
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 11 Apr 2020 16:39:19 +0000 (12:39 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 11 Apr 2020 16:39:19 +0000 (12:39 -0400)
Before discarding the old hash table in ExecReScanHashJoin, capture
its statistics, ensuring that we report the maximum hashtable size
across repeated rescans of the hash input relation.  We can repurpose
the existing code for reporting hashtable size in parallel workers
to help with this, making the patch pretty small.  This also ensures
that if rescans happen within parallel workers, we get the correct
maximums across all instances.

Konstantin Knizhnik and Tom Lane, per diagnosis by Thomas Munro
of a trouble report from Alvaro Herrera.

Discussion: https://postgr.es/m/20200323165059.GA24950@alvherre.pgsql

src/backend/commands/explain.c
src/backend/executor/nodeHash.c
src/backend/executor/nodeHashjoin.c
src/include/executor/nodeHash.h
src/include/nodes/execnodes.h

index 455f54ef83feb44a47cb95fb16b12ee39b561ea0..f3c8da1e01bccefb7d02a9193194959e329ecd8e 100644 (file)
@@ -2964,22 +2964,25 @@ show_hash_info(HashState *hashstate, ExplainState *es)
    HashInstrumentation hinstrument = {0};
 
    /*
+    * Collect stats from the local process, even when it's a parallel query.
     * In a parallel query, the leader process may or may not have run the
     * hash join, and even if it did it may not have built a hash table due to
     * timing (if it started late it might have seen no tuples in the outer
     * relation and skipped building the hash table).  Therefore we have to be
     * prepared to get instrumentation data from all participants.
     */
-   if (hashstate->hashtable)
-       ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable);
+   if (hashstate->hinstrument)
+       memcpy(&hinstrument, hashstate->hinstrument,
+              sizeof(HashInstrumentation));
 
    /*
     * Merge results from workers.  In the parallel-oblivious case, the
     * results from all participants should be identical, except where
     * participants didn't run the join at all so have no data.  In the
     * parallel-aware case, we need to consider all the results.  Each worker
-    * may have seen a different subset of batches and we want to find the
-    * highest memory usage for any one batch across all batches.
+    * may have seen a different subset of batches and we want to report the
+    * highest memory usage across all batches.  We take the maxima of other
+    * values too, for the same reasons as in ExecHashAccumInstrumentation.
     */
    if (hashstate->shared_info)
    {
@@ -2990,31 +2993,16 @@ show_hash_info(HashState *hashstate, ExplainState *es)
        {
            HashInstrumentation *worker_hi = &shared_info->hinstrument[i];
 
-           if (worker_hi->nbatch > 0)
-           {
-               /*
-                * Every participant should agree on the buckets, so to be
-                * sure we have a value we'll just overwrite each time.
-                */
-               hinstrument.nbuckets = worker_hi->nbuckets;
-               hinstrument.nbuckets_original = worker_hi->nbuckets_original;
-
-               /*
-                * Normally every participant should agree on the number of
-                * batches too, but it's possible for a backend that started
-                * late and missed the whole join not to have the final nbatch
-                * number.  So we'll take the largest number.
-                */
-               hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch);
-               hinstrument.nbatch_original = worker_hi->nbatch_original;
-
-               /*
-                * In a parallel-aware hash join, for now we report the
-                * maximum peak memory reported by any worker.
-                */
-               hinstrument.space_peak =
-                   Max(hinstrument.space_peak, worker_hi->space_peak);
-           }
+           hinstrument.nbuckets = Max(hinstrument.nbuckets,
+                                      worker_hi->nbuckets);
+           hinstrument.nbuckets_original = Max(hinstrument.nbuckets_original,
+                                               worker_hi->nbuckets_original);
+           hinstrument.nbatch = Max(hinstrument.nbatch,
+                                    worker_hi->nbatch);
+           hinstrument.nbatch_original = Max(hinstrument.nbatch_original,
+                                             worker_hi->nbatch_original);
+           hinstrument.space_peak = Max(hinstrument.space_peak,
+                                        worker_hi->space_peak);
        }
    }
 
index c881dc1de81a57405cbbd7049be225c76d21acb5..5da13ada726ad8779348a03c47a9ae568430a062 100644 (file)
@@ -2597,7 +2597,10 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
    size = offsetof(SharedHashInfo, hinstrument) +
        pcxt->nworkers * sizeof(HashInstrumentation);
    node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
+
+   /* Each per-worker area must start out as zeroes. */
    memset(node->shared_info, 0, size);
+
    node->shared_info->num_workers = pcxt->nworkers;
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
                   node->shared_info);
@@ -2616,22 +2619,33 @@ ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
    if (!node->ps.instrument)
        return;
 
+   /*
+    * Find our entry in the shared area, and set up a pointer to it so that
+    * we'll accumulate stats there when shutting down or rebuilding the hash
+    * table.
+    */
    shared_info = (SharedHashInfo *)
        shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
    node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
 }
 
 /*
- * Copy instrumentation data from this worker's hash table (if it built one)
- * to DSM memory so the leader can retrieve it.  This must be done in an
- * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
- * we've detached from the DSM segment.
+ * Collect EXPLAIN stats if needed, saving them into DSM memory if
+ * ExecHashInitializeWorker was called, or local storage if not.  In the
+ * parallel case, this must be done in ExecShutdownHash() rather than
+ * ExecEndHash() because the latter runs after we've detached from the DSM
+ * segment.
  */
 void
 ExecShutdownHash(HashState *node)
 {
+   /* Allocate save space if EXPLAIN'ing and we didn't do so already */
+   if (node->ps.instrument && !node->hinstrument)
+       node->hinstrument = (HashInstrumentation *)
+           palloc0(sizeof(HashInstrumentation));
+   /* Now accumulate data for the current (final) hash table */
    if (node->hinstrument && node->hashtable)
-       ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+       ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
 }
 
 /*
@@ -2655,18 +2669,34 @@ ExecHashRetrieveInstrumentation(HashState *node)
 }
 
 /*
- * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
- * struct.
+ * Accumulate instrumentation data from 'hashtable' into an
+ * initially-zeroed HashInstrumentation struct.
+ *
+ * This is used to merge information across successive hash table instances
+ * within a single plan node.  We take the maximum values of each interesting
+ * number.  The largest nbuckets and largest nbatch values might have occurred
+ * in different instances, so there's some risk of confusion from reporting
+ * unrelated numbers; but there's a bigger risk of misdiagnosing a performance
+ * issue if we don't report the largest values.  Similarly, we want to report
+ * the largest spacePeak regardless of whether it happened in the same
+ * instance as the largest nbuckets or nbatch.  All the instances should have
+ * the same nbuckets_original and nbatch_original; but there's little value
+ * in depending on that here, so handle them the same way.
  */
 void
-ExecHashGetInstrumentation(HashInstrumentation *instrument,
-                          HashJoinTable hashtable)
+ExecHashAccumInstrumentation(HashInstrumentation *instrument,
+                            HashJoinTable hashtable)
 {
-   instrument->nbuckets = hashtable->nbuckets;
-   instrument->nbuckets_original = hashtable->nbuckets_original;
-   instrument->nbatch = hashtable->nbatch;
-   instrument->nbatch_original = hashtable->nbatch_original;
-   instrument->space_peak = hashtable->spacePeak;
+   instrument->nbuckets = Max(instrument->nbuckets,
+                              hashtable->nbuckets);
+   instrument->nbuckets_original = Max(instrument->nbuckets_original,
+                                       hashtable->nbuckets_original);
+   instrument->nbatch = Max(instrument->nbatch,
+                            hashtable->nbatch);
+   instrument->nbatch_original = Max(instrument->nbatch_original,
+                                     hashtable->nbatch_original);
+   instrument->space_peak = Max(instrument->space_peak,
+                                hashtable->spacePeak);
 }
 
 /*
index 9e28ddd8951e0545926572ba4c32c9a58acbdd2a..cc8edacdd01799152203a0a8ac41f442ac395946 100644 (file)
@@ -1338,8 +1338,16 @@ ExecReScanHashJoin(HashJoinState *node)
            /* must destroy and rebuild hash table */
            HashState  *hashNode = castNode(HashState, innerPlanState(node));
 
-           /* for safety, be sure to clear child plan node's pointer too */
            Assert(hashNode->hashtable == node->hj_HashTable);
+           /* accumulate stats from old hash table, if wanted */
+           /* (this should match ExecShutdownHash) */
+           if (hashNode->ps.instrument && !hashNode->hinstrument)
+               hashNode->hinstrument = (HashInstrumentation *)
+                   palloc0(sizeof(HashInstrumentation));
+           if (hashNode->hinstrument)
+               ExecHashAccumInstrumentation(hashNode->hinstrument,
+                                            hashNode->hashtable);
+           /* for safety, be sure to clear child plan node's pointer too */
            hashNode->hashtable = NULL;
 
            ExecHashTableDestroy(node->hj_HashTable);
index 1336fde6b4d5e941282c784da5bb8f59b204e302..64d2ce693ca7d40e4a02e7d20241e3ee16e9b5f7 100644 (file)
@@ -73,7 +73,7 @@ extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
 extern void ExecHashRetrieveInstrumentation(HashState *node);
 extern void ExecShutdownHash(HashState *node);
-extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
-                                      HashJoinTable hashtable);
+extern void ExecHashAccumInstrumentation(HashInstrumentation *instrument,
+                                        HashJoinTable hashtable);
 
 #endif                         /* NODEHASH_H */
index 4c009b1a7c5432901d280e441b743a5c5f3f0505..4fee043bb2be1051407f39a27a96ad9ea1c06e1e 100644 (file)
@@ -2358,7 +2358,7 @@ typedef struct HashInstrumentation
    int         nbuckets_original;  /* planned number of buckets */
    int         nbatch;         /* number of batches at end of execution */
    int         nbatch_original;    /* planned number of batches */
-   size_t      space_peak;     /* peak memory usage in bytes */
+   Size        space_peak;     /* peak memory usage in bytes */
 } HashInstrumentation;
 
 /* ----------------
@@ -2381,8 +2381,20 @@ typedef struct HashState
    HashJoinTable hashtable;    /* hash table for the hashjoin */
    List       *hashkeys;       /* list of ExprState nodes */
 
-   SharedHashInfo *shared_info;    /* one entry per worker */
-   HashInstrumentation *hinstrument;   /* this worker's entry */
+   /*
+    * In a parallelized hash join, the leader retains a pointer to the
+    * shared-memory stats area in its shared_info field, and then copies the
+    * shared-memory info back to local storage before DSM shutdown.  The
+    * shared_info field remains NULL in workers, or in non-parallel joins.
+    */
+   SharedHashInfo *shared_info;
+
+   /*
+    * If we are collecting hash stats, this points to an initially-zeroed
+    * collection area, which could be either local storage or in shared
+    * memory; either way it's for just one process.
+    */
+   HashInstrumentation *hinstrument;
 
    /* Parallel hash state. */
    struct ParallelHashJoinState *parallel_state;