
Commit 2c09a5c

Fix accumulation of parallel worker instrumentation.
When a Gather or Gather Merge node is started and stopped multiple times, the old code wouldn't reset the shared state between executions, potentially resulting in dramatically inflated instrumentation data for nodes beneath it. (The per-worker instrumentation ended up OK, I think, but the overall totals were inflated.)

Report by hubert depesz lubaczewski. Analysis and fix by Amit Kapila, reviewed and tweaked a bit by me.

Discussion: http://postgr.es/m/20171127175631.GA405@depesz.com
1 parent 5bcf389 commit 2c09a5c
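
In plan terms, "started and stopped multiple times" means the Gather (Merge) is rescanned within a single query, most commonly when it sits on the inner side of a nested loop and is restarted once per outer row. Below is a minimal sketch of that shape. It is not from this commit: the table name, sizes, and cost settings are illustrative, and the planner is not guaranteed to pick a nested loop with the Gather inside, which is why the regression test added below pins the shape down by disabling parallel workers on one table instead.

-- Illustrative sketch (not from this commit): coax a Gather onto the
-- inner side of a nested loop so it is shut down and restarted once
-- per outer row.
create table inner_tab as select g as x from generate_series(1, 100000) g;
analyze inner_tab;
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set min_parallel_table_scan_size = 0;
set enable_material = off;
-- Before this fix, EXPLAIN ANALYZE row counts for nodes under the
-- Gather could grow with each restart, because the shared
-- instrumentation space was never reset between rounds of workers.
explain (analyze, timing off, costs off)
select count(*)
from generate_series(1, 10) outer_row, inner_tab
where inner_tab.x > outer_row;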

3 files changed, +66 -13 lines changed


src/backend/executor/execParallel.c (+38 -13)
@@ -808,6 +808,19 @@ ExecParallelReinitialize(PlanState *planstate,
 	/* Old workers must already be shut down */
 	Assert(pei->finished);
 
+	/* Clear the instrumentation space from the last round. */
+	if (pei->instrumentation)
+	{
+		Instrumentation *instrument;
+		SharedExecutorInstrumentation *sh_instr;
+		int			i;
+
+		sh_instr = pei->instrumentation;
+		instrument = GetInstrumentationArray(sh_instr);
+		for (i = 0; i < sh_instr->num_workers * sh_instr->num_plan_nodes; ++i)
+			InstrInit(&instrument[i], pei->planstate->state->es_instrument);
+	}
+
 	/* Force parameters we're going to pass to workers to be evaluated. */
 	ExecEvalParamExecParams(sendParams, estate);
 
@@ -925,21 +938,33 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 	for (n = 0; n < instrumentation->num_workers; ++n)
 		InstrAggNode(planstate->instrument, &instrument[n]);
 
-	/*
-	 * Also store the per-worker detail.
-	 *
-	 * Worker instrumentation should be allocated in the same context as the
-	 * regular instrumentation information, which is the per-query context.
-	 * Switch into per-query memory context.
-	 */
-	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
-	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
-	planstate->worker_instrument =
-		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
-	MemoryContextSwitchTo(oldcontext);
+	if (!planstate->worker_instrument)
+	{
+		/*
+		 * Allocate space for the per-worker detail.
+		 *
+		 * Worker instrumentation should be allocated in the same context as
+		 * the regular instrumentation information, which is the per-query
+		 * context. Switch into per-query memory context.
+		 */
+		oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
+		ibytes =
+			mul_size(instrumentation->num_workers, sizeof(Instrumentation));
+		planstate->worker_instrument =
+			palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
+		MemoryContextSwitchTo(oldcontext);
+
+		for (n = 0; n < instrumentation->num_workers; ++n)
+			InstrInit(&planstate->worker_instrument->instrument[n],
+					  planstate->state->es_instrument);
+	}
 
 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
-	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
+
+	/* Accumulate the per-worker detail. */
+	for (n = 0; n < instrumentation->num_workers; ++n)
+		InstrAggNode(&planstate->worker_instrument->instrument[n],
+					 &instrument[n]);
 
 	/* Perform any node-type-specific work that needs to be done. */
 	switch (nodeTag(planstate))
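
Taken together, the fix has two halves: ExecParallelReinitialize now re-zeroes every per-node, per-worker slot in the shared instrumentation space (via InstrInit) before each new round of workers, and ExecParallelRetrieveInstrumentation allocates and initializes planstate->worker_instrument only on the first round, then folds each round's results in with InstrAggNode rather than overwriting them with memcpy. That way both the per-node totals and the per-worker detail reflect the sum over all executions of the Gather instead of stale or last-round-only data.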

src/test/regress/expected/select_parallel.out (+21)
@@ -378,7 +378,28 @@ select count(*) from bmscantest where a>1;
  99999
 (1 row)
 
+-- test accumulation of stats for parallel node
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+	select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+		and tenk2.thousand=0;
+                                QUERY PLAN                                
+--------------------------------------------------------------------------
+ Aggregate (actual rows=1 loops=1)
+   ->  Nested Loop (actual rows=98000 loops=1)
+         ->  Seq Scan on tenk2 (actual rows=10 loops=1)
+               Filter: (thousand = 0)
+               Rows Removed by Filter: 9990
+         ->  Gather (actual rows=9800 loops=10)
+               Workers Planned: 4
+               Workers Launched: 4
+               ->  Parallel Seq Scan on tenk1 (actual rows=1960 loops=50)
+                     Filter: (hundred > 1)
+                     Rows Removed by Filter: 40
+(11 rows)
+
+alter table tenk2 reset (parallel_workers);
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
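
The expected output doubles as a sanity check on the arithmetic: tenk2 has 10 rows with thousand = 0, so the Gather is executed 10 times (loops=10); each execution uses 4 workers plus the leader, 5 participants in all, hence loops=50 on the Parallel Seq Scan. tenk1 has 10,000 rows, of which 9,800 pass hundred > 1, about 1,960 per participant per execution, and 1,960 × 50 = 98,000 rows arriving at the Nested Loop. With the unpatched code, each restart of the Gather would have re-added the previous rounds' counts, inflating these totals.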

src/test/regress/sql/select_parallel.sql (+7)
@@ -149,7 +149,14 @@ insert into bmscantest select r, 'fooooooooooooooooooooooooooooooooooooooooooooo
 create index i_bmtest ON bmscantest(a);
 select count(*) from bmscantest where a>1;
 
+-- test accumulation of stats for parallel node
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+	select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+		and tenk2.thousand=0;
+alter table tenk2 reset (parallel_workers);
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
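
Setting parallel_workers = 0 on tenk2 keeps its scan serial, so the only parallelism available to the planner is on the tenk1 side; that places the Gather beneath the join, where it is rescanned for every qualifying tenk2 row, which is exactly the start/stop pattern the fix targets. Resetting the option afterwards restores the default for the remaining tests.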
