
Commit 778e78a

Fix accumulation of parallel worker instrumentation.
When a Gather or Gather Merge node is started and stopped multiple times, the old code wouldn't reset the shared state between executions, potentially resulting in dramatically inflated instrumentation data for nodes beneath it. (The per-worker instrumentation ended up OK, I think, but the overall totals were inflated.)

Report by hubert depesz lubaczewski. Analysis and fix by Amit Kapila, reviewed and tweaked a bit by me.

Discussion: http://postgr.es/m/20171127175631.GA405@depesz.com
1 parent ff14730 commit 778e78a
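
To make the failure mode concrete, here is a stand-alone C toy, not PostgreSQL code: ToyInstr, toy_init, and toy_agg are invented stand-ins for Instrumentation, InstrInit, and InstrAggNode. It only illustrates why skipping the reset between executions inflates the totals.

#include <stdio.h>
#include <string.h>

/* Invented stand-in for PostgreSQL's per-node Instrumentation. */
typedef struct
{
    double      ntuples;
} ToyInstr;

/* Invented stand-ins for InstrInit and InstrAggNode. */
static void
toy_init(ToyInstr *instr)
{
    memset(instr, 0, sizeof(*instr));
}

static void
toy_agg(ToyInstr *dst, const ToyInstr *add)
{
    dst->ntuples += add->ntuples;
}

int
main(void)
{
    ToyInstr    shared = {0};   /* shared-memory slot a worker writes */
    ToyInstr    total = {0};    /* leader's accumulated totals */
    int         exec;

    for (exec = 0; exec < 2; exec++)
    {
        /*
         * The fix: clear the shared slot before each execution.  Comment
         * this out to reproduce the bug: the first round's 100 tuples
         * remain in shared memory and get aggregated a second time.
         */
        toy_init(&shared);

        shared.ntuples += 100;      /* worker processes 100 tuples */
        toy_agg(&total, &shared);   /* leader aggregates at shutdown */
    }

    /* Prints 200 with the reset; 300 (inflated) without it. */
    printf("total ntuples = %.0f\n", total.ntuples);
    return 0;
}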

File tree

3 files changed: 66 additions, 13 deletions


src/backend/executor/execParallel.c

Lines changed: 38 additions & 13 deletions
@@ -612,6 +612,19 @@ ExecParallelReinitialize(PlanState *planstate,
 	/* Old workers must already be shut down */
 	Assert(pei->finished);
 
+	/* Clear the instrumentation space from the last round. */
+	if (pei->instrumentation)
+	{
+		Instrumentation *instrument;
+		SharedExecutorInstrumentation *sh_instr;
+		int			i;
+
+		sh_instr = pei->instrumentation;
+		instrument = GetInstrumentationArray(sh_instr);
+		for (i = 0; i < sh_instr->num_workers * sh_instr->num_plan_nodes; ++i)
+			InstrInit(&instrument[i], pei->planstate->state->es_instrument);
+	}
+
 	ReinitializeParallelDSM(pei->pcxt);
 	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
 	pei->reader = NULL;
@@ -699,21 +712,33 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 	for (n = 0; n < instrumentation->num_workers; ++n)
 		InstrAggNode(planstate->instrument, &instrument[n]);
 
-	/*
-	 * Also store the per-worker detail.
-	 *
-	 * Worker instrumentation should be allocated in the same context as the
-	 * regular instrumentation information, which is the per-query context.
-	 * Switch into per-query memory context.
-	 */
-	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
-	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
-	planstate->worker_instrument =
-		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
-	MemoryContextSwitchTo(oldcontext);
+	if (!planstate->worker_instrument)
+	{
+		/*
+		 * Allocate space for the per-worker detail.
+		 *
+		 * Worker instrumentation should be allocated in the same context as
+		 * the regular instrumentation information, which is the per-query
+		 * context. Switch into per-query memory context.
+		 */
+		oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
+		ibytes =
+			mul_size(instrumentation->num_workers, sizeof(Instrumentation));
+		planstate->worker_instrument =
+			palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
+		MemoryContextSwitchTo(oldcontext);
+
+		for (n = 0; n < instrumentation->num_workers; ++n)
+			InstrInit(&planstate->worker_instrument->instrument[n],
+					  planstate->state->es_instrument);
+	}
 
 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
-	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
+
+	/* Accumulate the per-worker detail. */
+	for (n = 0; n < instrumentation->num_workers; ++n)
+		InstrAggNode(&planstate->worker_instrument->instrument[n],
+					 &instrument[n]);
 
 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
 								 instrumentation);
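
Why the retrieval side had to change too: once ExecParallelReinitialize clears the shared instrumentation between executions, the old memcpy would leave only the last execution's counts in the per-worker detail. The new code therefore allocates worker_instrument once, zeroes it with InstrInit, and folds each execution in with InstrAggNode. A simplified stand-alone sketch of that overwrite-vs-accumulate distinction (ToyInstr and toy_agg are invented stand-ins, as above):

typedef struct
{
    double      ntuples;
} ToyInstr;                     /* invented stand-in for Instrumentation */

static void
toy_agg(ToyInstr *dst, const ToyInstr *add)
{
    dst->ntuples += add->ntuples;   /* cf. InstrAggNode */
}

/* Called once per execution of the Gather node. */
static void
retrieve_round(ToyInstr *worker_detail, const ToyInstr *shared_round)
{
    /*
     * Old behavior: overwrite.  With the shared slot now reset between
     * rounds, this would keep only the last round's counts:
     *
     *      memcpy(worker_detail, shared_round, sizeof(*shared_round));
     *
     * New behavior: accumulate every round into the per-worker detail.
     */
    toy_agg(worker_detail, shared_round);
}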

src/test/regress/expected/select_parallel.out

Lines changed: 21 additions & 0 deletions
@@ -300,7 +300,28 @@ select count(*) from bmscantest where a>1;
  99999
 (1 row)
 
+-- test accumulation of stats for parallel node
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+   select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+                                QUERY PLAN
+--------------------------------------------------------------------------
+ Aggregate (actual rows=1 loops=1)
+   ->  Nested Loop (actual rows=98000 loops=1)
+         ->  Seq Scan on tenk2 (actual rows=10 loops=1)
+               Filter: (thousand = 0)
+               Rows Removed by Filter: 9990
+         ->  Gather (actual rows=9800 loops=10)
+               Workers Planned: 4
+               Workers Launched: 4
+               ->  Parallel Seq Scan on tenk1 (actual rows=1960 loops=50)
+                     Filter: (hundred > 1)
+                     Rows Removed by Filter: 40
+(11 rows)
+
+alter table tenk2 reset (parallel_workers);
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
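
A sanity check on the expected plan, assuming the standard regression-test contents of tenk1 and tenk2 (10,000 rows each): tenk2 has 10 rows with thousand = 0, so the inner Gather is started and stopped 10 times (loops=10), exactly the rescan pattern this commit fixes. With 4 launched workers plus the leader, the Parallel Seq Scan reports loops = 10 × 5 = 50, and its per-loop 1960 rows give 5 × 1960 = 9800 rows per Gather execution and 10 × 9800 = 98000 rows at the Nested Loop. Setting parallel_workers = 0 on tenk2 keeps that scan serial, which encourages the planner to put the Gather on the inner, rescanned side of the join.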

src/test/regress/sql/select_parallel.sql

Lines changed: 7 additions & 0 deletions
@@ -116,7 +116,14 @@ insert into bmscantest select r, 'fooooooooooooooooooooooooooooooooooooooooooooo
 create index i_bmtest ON bmscantest(a);
 select count(*) from bmscantest where a>1;
 
+-- test accumulation of stats for parallel node
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+   select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+alter table tenk2 reset (parallel_workers);
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
