@@ -163,47 +163,44 @@ create_partial_tempscan_plan(PlannerInfo *root, RelOptInfo *rel,
 	cscan->custom_plans = custom_plans;
 	cscan->methods = &plan_methods;
 	cscan->flags = best_path->flags;
-	cscan->custom_private = best_path->custom_private;
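+	/* Pass the planned number of workers to the executor via custom_private */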
+	cscan->custom_private = list_make1(makeInteger(best_path->path.parallel_workers));
 
 	return &cscan->scan.plan;
 }
 
 typedef struct SharedTempScanInfo
 {
-	int			nworkers;
+	int			nworkers_launched;
 	dsm_handle	handle;
 } SharedTempScanInfo;
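+
+/*
+ * Note: nworkers_launched is filled in lazily by the leader on first
+ * execution, once the number of actually launched workers is known;
+ * see ExecTempScan().
+ */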
 
-#define SharedTempScanInfoHeaderSize offsetof(SharedTempScanInfo, data)
-
-typedef struct TempScanInfo
-{
-	shm_mq_handle **tqueue;
-	DestReceiver  **receiver;
-} TempScanInfo;
-
 typedef struct ParallelTempScanState
 {
 	CustomScanState node;
 
 	bool		initialized;
+	int			nworkers;		/* workers planned; tells how many resources to free */
 	DestReceiver **receiver;	/* Must be NULL for workers */
-	TempScanInfo ptsi;
+	shm_mq_handle **tqueue;
+	ParallelContext *pcxt;
 	SharedTempScanInfo *shared;
 
 	TupleQueueReader *reader;
+	bool		parallelMode;
 } ParallelTempScanState;
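+
+/*
+ * receiver, tqueue and pcxt are used only by the leader; a parallel worker
+ * reads tuples through reader instead.
+ */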
 
 static Node *
 create_tempscan_state(CustomScan *cscan)
 {
 	ParallelTempScanState *ts = palloc0(sizeof(ParallelTempScanState));
 	CustomScanState *cstate = (CustomScanState *) ts;
+	int			path_workers = linitial_node(Integer, cscan->custom_private)->ival;
 
 	Assert(list_length(cscan->custom_plans) == 1);
 
 	cstate->ss.ps.type = T_CustomScanState;
 	cstate->methods = &exec_methods;
+	ts->parallelMode = (path_workers > 0);
 
 	/*
 	 * Setup slotOps manually. Although we just put the incoming tuple into the result
@@ -213,7 +210,9 @@ create_tempscan_state(CustomScan *cscan)
 	cstate->slotOps = &TTSOpsMinimalTuple;
 
 	ts->receiver = NULL;
+	ts->tqueue = NULL;
 	ts->initialized = false;
+
 	ts->shared = NULL;
 
 	if (!IsParallelWorker())
@@ -270,76 +269,115 @@ ExecTempScan(CustomScanState *node)
 {
 	ParallelTempScanState *ts = (ParallelTempScanState *) node;
 	TupleTableSlot *result = ts->node.ss.ss_ScanTupleSlot;
+	TupleTableSlot *slot;
+	bool		should_free;
+	MinimalTuple tup;
+	int			i;
 
 	/*
 	 * HACK. At this point the custom DSM is already initialised and we can
 	 * switch off this parameter.
 	 */
-	ts->node.ss.ps.plan->parallel_aware = false;
-
-	/* Forbid rescanning */
-	ts->initialized = true;
+	if (ts->pcxt->nworkers_launched == 0)
+		ts->node.ss.ps.plan->parallel_aware = false;
 
-	if (!IsParallelWorker())
+	if (IsParallelWorker())
 	{
-		TupleTableSlot *slot;
-		bool		should_free;
-		MinimalTuple tuple;
-		int			i;
+		MinimalTuple tup;
+		bool		done;
 
-		Assert(list_length(node->custom_ps) == 1);
+		/* A parallel worker should receive something from the tqueue */
+		tup = TupleQueueReaderNext(ts->reader, false, &done);
 
-		slot = ExecProcNode((PlanState *) linitial(node->custom_ps));
-		if (TupIsNull(slot))
+		if (done)
 		{
-			if (ts->ptsi.receiver != NULL)
-			{
-				for (i = 0; i < ts->shared->nworkers; i++)
-				{
-					ts->ptsi.receiver[i]->rDestroy(ts->ptsi.receiver[i]);
-					ts->ptsi.receiver[i] = NULL;
-					ts->ptsi.tqueue[i] = NULL;
-				}
-				pfree(ts->ptsi.receiver);
-				ts->ptsi.receiver = NULL;
-			}
-
-			/* The end of the table is achieved, Return empty tuple to all */
+			Assert(tup == NULL);
 			return NULL;
 		}
 
+		/* TODO: should free? Copy the tuple, since it points into the queue's buffer. */
+		ExecStoreMinimalTuple(tup, result, false);
+		result->tts_ops->copyslot(result, result);
+		return result;
+	}
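+
+	/*
+	 * Leader-side path below: pull tuples from the underlying scan and
+	 * distribute them to the workers' tuple queues.
+	 */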
+
+	Assert(list_length(node->custom_ps) == 1);
+
+	if (!ts->initialized)
+	{
+		/*
+		 * Save the number of launched workers because we will need it in
+		 * later stages of the execution.
+		 */
+		ts->shared->nworkers_launched = ts->pcxt->nworkers_launched;
+		ts->initialized = true;
+	}
+
+	slot = ExecProcNode((PlanState *) linitial(node->custom_ps));
+	if (ts->receiver == NULL)
+		return slot;
+
+	if (TupIsNull(slot))
+	{
+		/* Destroy the receivers to tell the parallel workers no more tuples come */
+		for (i = 0; i < ts->shared->nworkers_launched; i++)
+		{
+			ts->receiver[i]->rDestroy(ts->receiver[i]);
+			ts->receiver[i] = NULL;
+			ts->tqueue[i] = NULL;
+		}
+		pfree(ts->receiver);
+		ts->receiver = NULL;
+		/* The end of the table is reached; return an empty tuple to all */
+		return NULL;
+	}
+
+	if (!ts->parallelMode)
+	{
 		/* Prepare a minimal tuple to send to all workers and upstream locally. */
-		tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
-		ExecStoreMinimalTuple(tuple, result, should_free);
+		tup = ExecFetchSlotMinimalTuple(slot, &should_free);
+		ExecStoreMinimalTuple(tup, result, should_free);
 
-		if (ts->ptsi.receiver != NULL)
+		/* Send the same tuple to each worker. Don't forget myself. */
+		for (i = 0; i < ts->shared->nworkers_launched; ++i)
 		{
-			for (i = 0; i < ts->shared->nworkers; ++i)
-			{
-				ts->ptsi.receiver[i]->receiveSlot(result, ts->ptsi.receiver[i]);
-			}
+			bool	ret;
+
+			ret = ts->receiver[i]->receiveSlot(result, ts->receiver[i]);
+			Assert(ret);
 		}
+		return result;
 	}
 	else
 	{
-		MinimalTuple tup;
-		bool		done;
+		int		nworkers = ts->pcxt->nworkers_launched;
+		/* Otherwise we should send the tuple to only one of the workers */
 
-		/* Parallel worker should receive something from the tqueue */
-		tup = TupleQueueReaderNext(ts->reader, false, &done);
+		/*
+		 * Private part of a tqueue DestReceiver; mirrors tqueue.c, which
+		 * doesn't export this struct.
+		 */
+		typedef struct TQueueDestReceiver
+		{
+			DestReceiver pub;		/* public fields */
+			shm_mq_handle *queue;	/* shm_mq to send to */
+		} TQueueDestReceiver;
 
-		if (done)
+		TQueueDestReceiver *rec;
+
+		/* Prepare a minimal tuple once */
+		tup = ExecFetchSlotMinimalTuple(slot, &should_free);
+		ExecStoreMinimalTuple(tup, result, should_free);
+
+		while (nworkers > 0)
 		{
-			Assert(tup == NULL);
-			return NULL;
-		}
+			/* Try each worker's queue in turn, without blocking */
+			for (i = 0; i < nworkers; i++)
+			{
+				shm_mq_result res;
 
-		/* TODO: should free ? */
-		ExecStoreMinimalTuple(tup, result, false);
-		result->tts_ops->copyslot(result, result);
+				rec = (TQueueDestReceiver *) ts->receiver[i];
+				res = shm_mq_send(rec->queue, tup->t_len, tup, true, false);
+				if (res == SHM_MQ_SUCCESS)
+					return result;
+			}
+
+			/* All queues are full; nap before retrying (arbitrary 10 ms) */
+			(void) WaitLatch(MyLatch,
+							 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+							 10L, WAIT_EVENT_MQ_SEND);
+			ResetLatch(MyLatch);
+		}
+
+		/* No launched workers; just return the tuple upstream */
+		return result;
 	}
-
-	return result;
 }
344
382
345
383
static void
@@ -355,13 +393,13 @@ EndTempScan(CustomScanState *node)
 	ExecEndNode((PlanState *) linitial(node->custom_ps));
 
 	/* Can happen if not all tuples were needed */
-	if (ts->ptsi.receiver != NULL)
+	if (ts->receiver != NULL)
 	{
 		int			i;
 
-		for (i = 0; i < ts->shared->nworkers; ++i)
+		for (i = 0; i < ts->nworkers; ++i)
 		{
-			ts->ptsi.receiver[i]->rDestroy(ts->ptsi.receiver[i]);
+			ts->receiver[i]->rDestroy(ts->receiver[i]);
 		}
 	}
 }
@@ -454,14 +492,29 @@ try_partial_tempscan(PlannerInfo *root, RelOptInfo *rel, Index rti,
 	create_index_paths(root, rel);
 	create_tidscan_paths(root, rel);
 
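+	/*
+	 * Build a partial path so the scan can run under Gather; this block
+	 * follows create_plain_partial_paths().
+	 */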
+	if (rel->consider_parallel && rel->lateral_relids == NULL)
+	{
+		int			parallel_workers;
+
+		parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+												   max_parallel_workers_per_gather);
+
+		/* If any limit was set to zero, the user doesn't want a parallel scan. */
+		if (parallel_workers <= 0)
+			return;
+
+		/* Add an unordered partial path based on a parallel sequential scan. */
+		add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_workers));
+	}
+
 	/*
 	 * Dangerous zone. But we assume it is strictly local. What about an
 	 * extension which could call ours and may want to add some partial paths
 	 * after us?
 	 */
-	list_free(rel->partial_pathlist);
-	rel->partial_pathlist = NIL;
+	/* list_free(rel->partial_pathlist); */
+	/* rel->partial_pathlist = NIL; */
 
 	/*
 	 * Set a guard over each parallel_safe path
@@ -488,8 +541,8 @@ try_partial_tempscan(PlannerInfo *root, RelOptInfo *rel, Index rti,
 	 * lateral references guarantees we don't need to change any parameters
 	 * on a ReScan?
 	 */
-	add_path(rel, (Path *)
-			 create_material_path(cpath->parent, (Path *) cpath));
+	add_path(rel, (Path *) cpath
+			 /* create_material_path(cpath->parent, (Path *) cpath) */ );
 	}
 
 	list_free(parallel_safe_lst);
@@ -607,33 +660,44 @@ InitializeDSMTempScan(CustomScanState *node, ParallelContext *pcxt,
 									 DSM_CREATE_NULL_IF_MAXSEGMENTS);
 	Assert(seg != NULL);	/* Don't process this case so far */
 
+	ts->pcxt = pcxt;
+
 	/* Save shared data for common usage in parallel workers */
 	ts->shared = (SharedTempScanInfo *) coordinate;
 	ts->shared->handle = dsm_segment_handle(seg);
+	ts->nworkers = pcxt->nworkers;
 
 	/*
-	 * Save number of workers because we will need it on later stages of the
-	 * execution.
+	 * We can't initialise the queues to the workers here because we aren't
+	 * sure how many workers will really be launched (it depends on the number
+	 * of free background worker slots - see max_worker_processes).
 	 */
-	ts->shared->nworkers = pcxt->nworkers;
 
-	if (ts->shared->nworkers > 0)
-	{
-		int			i;
-		dsm_segment *seg = dsm_find_mapping(ts->shared->handle);
+	/*
+	 * Initialise the receivers here. We don't do it earlier because the real
+	 * number of launched workers is known only after the Gather node launches
+	 * them. Anyway, in case of any trouble we can initialise them earlier and
+	 * just leave the unused tail of them idle during execution.
+	 */
+	if (ts->shared && ts->nworkers > 0)
+	{
+		int			i;
+		dsm_segment *seg = dsm_find_mapping(ts->shared->handle);
 
-		ts->ptsi.tqueue =
-			ExecParallelSetupTupleQueues(ts->shared->nworkers,
+		ts->tqueue =
+			ExecParallelSetupTupleQueues(ts->nworkers,
 										 (char *) dsm_segment_address(seg),
 										 seg);
 
-		ts->ptsi.receiver = palloc(ts->shared->nworkers * sizeof(DestReceiver *));
-		for (i = 0; i < ts->shared->nworkers; i++)
-		{
-			ts->ptsi.receiver[i] =
-				CreateTupleQueueDestReceiver(ts->ptsi.tqueue[i]);
-		}
+		ts->receiver = palloc(ts->nworkers * sizeof(DestReceiver *));
+		for (i = 0; i < ts->nworkers; i++)
+		{
+			ts->receiver[i] = CreateTupleQueueDestReceiver(ts->tqueue[i]);
 		}
+	}
+	else
+		elog(WARNING, "workers are not needed");
 }
638
702
639
703
static void
@@ -662,6 +726,7 @@ InitializeWorkerTempScan(CustomScanState *node, shm_toc *toc,
 	shm_mq_set_receiver(mq, MyProc);
 
 	ts->reader = CreateTupleQueueReader(shm_mq_attach(mq, seg, NULL));
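+	/* No leader-side lazy initialisation is needed in a worker */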
+	ts->initialized = true;
 }
 
 static void