36
36
#include "executor/nodeGather.h"
37
37
#include "executor/nodeSubplan.h"
38
38
#include "executor/tqueue.h"
39
+ #include "miscadmin.h"
39
40
#include "utils/memutils.h"
40
41
#include "utils/rel.h"
41
42
42
43
43
44
static TupleTableSlot * gather_getnext (GatherState * gatherstate );
45
+ static HeapTuple gather_readnext (GatherState * gatherstate );
44
46
static void ExecShutdownGatherWorkers (GatherState * node );
45
47
46
48
@@ -125,6 +127,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
125
127
TupleTableSlot *
126
128
ExecGather (GatherState * node )
127
129
{
130
+ TupleTableSlot * fslot = node -> funnel_slot ;
128
131
int i ;
129
132
TupleTableSlot * slot ;
130
133
TupleTableSlot * resultSlot ;
@@ -148,6 +151,7 @@ ExecGather(GatherState *node)
148
151
*/
149
152
if (gather -> num_workers > 0 && IsInParallelMode ())
150
153
{
154
+ ParallelContext * pcxt ;
151
155
bool got_any_worker = false;
152
156
153
157
/* Initialize the workers required to execute Gather node. */
@@ -160,18 +164,26 @@ ExecGather(GatherState *node)
160
164
* Register backend workers. We might not get as many as we
161
165
* requested, or indeed any at all.
162
166
*/
163
- LaunchParallelWorkers (node -> pei -> pcxt );
167
+ pcxt = node -> pei -> pcxt ;
168
+ LaunchParallelWorkers (pcxt );
164
169
165
- /* Set up a tuple queue to collect the results. */
166
- node -> funnel = CreateTupleQueueFunnel ();
167
- for (i = 0 ; i < node -> pei -> pcxt -> nworkers ; ++ i )
170
+ /* Set up tuple queue readers to read the results. */
171
+ if (pcxt -> nworkers > 0 )
168
172
{
169
- if (node -> pei -> pcxt -> worker [i ].bgwhandle )
173
+ node -> nreaders = 0 ;
174
+ node -> reader =
175
+ palloc (pcxt -> nworkers * sizeof (TupleQueueReader * ));
176
+
177
+ for (i = 0 ; i < pcxt -> nworkers ; ++ i )
170
178
{
179
+ if (pcxt -> worker [i ].bgwhandle == NULL )
180
+ continue ;
181
+
171
182
shm_mq_set_handle (node -> pei -> tqueue [i ],
172
- node -> pei -> pcxt -> worker [i ].bgwhandle );
173
- RegisterTupleQueueOnFunnel (node -> funnel ,
174
- node -> pei -> tqueue [i ]);
183
+ pcxt -> worker [i ].bgwhandle );
184
+ node -> reader [node -> nreaders ++ ] =
185
+ CreateTupleQueueReader (node -> pei -> tqueue [i ],
186
+ fslot -> tts_tupleDescriptor );
175
187
got_any_worker = true;
176
188
}
177
189
}
@@ -182,7 +194,7 @@ ExecGather(GatherState *node)
182
194
}
183
195
184
196
/* Run plan locally if no workers or not single-copy. */
185
- node -> need_to_scan_locally = (node -> funnel == NULL )
197
+ node -> need_to_scan_locally = (node -> reader == NULL )
186
198
|| !gather -> single_copy ;
187
199
node -> initialized = true;
188
200
}
@@ -254,13 +266,9 @@ ExecEndGather(GatherState *node)
254
266
}
255
267
256
268
/*
257
- * gather_getnext
258
- *
259
- * Get the next tuple from shared memory queue. This function
260
- * is responsible for fetching tuples from all the queues associated
261
- * with worker backends used in Gather node execution and if there is
262
- * no data available from queues or no worker is available, it does
263
- * fetch the data from local node.
269
+ * Read the next tuple. We might fetch a tuple from one of the tuple queues
270
+ * using gather_readnext, or if no tuple queue contains a tuple and the
271
+ * single_copy flag is not set, we might generate one locally instead.
264
272
*/
265
273
static TupleTableSlot *
266
274
gather_getnext (GatherState * gatherstate )
@@ -270,18 +278,11 @@ gather_getnext(GatherState *gatherstate)
270
278
TupleTableSlot * fslot = gatherstate -> funnel_slot ;
271
279
HeapTuple tup ;
272
280
273
- while (gatherstate -> funnel != NULL || gatherstate -> need_to_scan_locally )
281
+ while (gatherstate -> reader != NULL || gatherstate -> need_to_scan_locally )
274
282
{
275
- if (gatherstate -> funnel != NULL )
283
+ if (gatherstate -> reader != NULL )
276
284
{
277
- bool done = false;
278
-
279
- /* wait only if local scan is done */
280
- tup = TupleQueueFunnelNext (gatherstate -> funnel ,
281
- gatherstate -> need_to_scan_locally ,
282
- & done );
283
- if (done )
284
- ExecShutdownGatherWorkers (gatherstate );
285
+ tup = gather_readnext (gatherstate );
285
286
286
287
if (HeapTupleIsValid (tup ))
287
288
{
@@ -309,6 +310,80 @@ gather_getnext(GatherState *gatherstate)
309
310
return ExecClearTuple (fslot );
310
311
}
311
312
313
+ /*
314
+ * Attempt to read a tuple from one of our parallel workers.
315
+ */
316
+ static HeapTuple
317
+ gather_readnext (GatherState * gatherstate )
318
+ {
319
+ int waitpos = gatherstate -> nextreader ;
320
+
321
+ for (;;)
322
+ {
323
+ TupleQueueReader * reader ;
324
+ HeapTuple tup ;
325
+ bool readerdone ;
326
+
327
+ /* Make sure we've read all messages from workers. */
328
+ HandleParallelMessages ();
329
+
330
+ /* Attempt to read a tuple, but don't block if none is available. */
331
+ reader = gatherstate -> reader [gatherstate -> nextreader ];
332
+ tup = TupleQueueReaderNext (reader , true, & readerdone );
333
+
334
+ /*
335
+ * If this reader is done, remove it. If all readers are done,
336
+ * clean up remaining worker state.
337
+ */
338
+ if (readerdone )
339
+ {
340
+ DestroyTupleQueueReader (reader );
341
+ -- gatherstate -> nreaders ;
342
+ if (gatherstate -> nreaders == 0 )
343
+ {
344
+ ExecShutdownGather (gatherstate );
345
+ return NULL ;
346
+ }
347
+ else
348
+ {
349
+ memmove (& gatherstate -> reader [gatherstate -> nextreader ],
350
+ & gatherstate -> reader [gatherstate -> nextreader + 1 ],
351
+ sizeof (TupleQueueReader * )
352
+ * (gatherstate -> nreaders - gatherstate -> nextreader ));
353
+ if (gatherstate -> nextreader >= gatherstate -> nreaders )
354
+ gatherstate -> nextreader = 0 ;
355
+ if (gatherstate -> nextreader < waitpos )
356
+ -- waitpos ;
357
+ }
358
+ continue ;
359
+ }
360
+
361
+ /* Advance nextreader pointer in round-robin fashion. */
362
+ gatherstate -> nextreader =
363
+ (gatherstate -> nextreader + 1 ) % gatherstate -> nreaders ;
364
+
365
+ /* If we got a tuple, return it. */
366
+ if (tup )
367
+ return tup ;
368
+
369
+ /* Have we visited every TupleQueueReader? */
370
+ if (gatherstate -> nextreader == waitpos )
371
+ {
372
+ /*
373
+ * If (still) running plan locally, return NULL so caller can
374
+ * generate another tuple from the local copy of the plan.
375
+ */
376
+ if (gatherstate -> need_to_scan_locally )
377
+ return NULL ;
378
+
379
+ /* Nothing to do except wait for developments. */
380
+ WaitLatch (MyLatch , WL_LATCH_SET , 0 );
381
+ CHECK_FOR_INTERRUPTS ();
382
+ ResetLatch (MyLatch );
383
+ }
384
+ }
385
+ }
386
+
312
387
/* ----------------------------------------------------------------
313
388
* ExecShutdownGatherWorkers
314
389
*
@@ -320,11 +395,14 @@ gather_getnext(GatherState *gatherstate)
320
395
void
321
396
ExecShutdownGatherWorkers (GatherState * node )
322
397
{
323
- /* Shut down tuple queue funnel before shutting down workers. */
324
- if (node -> funnel != NULL )
398
+ /* Shut down tuple queue readers before shutting down workers. */
399
+ if (node -> reader != NULL )
325
400
{
326
- DestroyTupleQueueFunnel (node -> funnel );
327
- node -> funnel = NULL ;
401
+ int i ;
402
+
403
+ for (i = 0 ; i < node -> nreaders ; ++ i )
404
+ DestroyTupleQueueReader (node -> reader [i ]);
405
+ node -> reader = NULL ;
328
406
}
329
407
330
408
/* Now shut down the workers. */
0 commit comments