@@ -268,21 +268,23 @@ typedef struct AggStatePerTransData
268
268
*/
269
269
int numInputs ;
270
270
271
- /*
272
- * At each input row, we evaluate all argument expressions needed for all
273
- * the aggregates in this Agg node in a single ExecProject call. inputoff
274
- * is the starting index of this aggregate's argument expressions in the
275
- * resulting tuple (in AggState->evalslot).
276
- */
277
- int inputoff ;
278
-
279
271
/*
280
272
* Number of aggregated input columns to pass to the transfn. This
281
273
* includes the ORDER BY columns for ordered-set aggs, but not for plain
282
274
* aggs. (This doesn't count the transition state value!)
283
275
*/
284
276
int numTransInputs ;
285
277
278
+ /*
279
+ * At each input row, we perform a single ExecProject call to evaluate all
280
+ * argument expressions that will certainly be needed at this row; that
281
+ * includes this aggregate's filter expression if it has one, or its
282
+ * regular argument expressions (including any ORDER BY columns) if it
283
+ * doesn't. inputoff is the starting index of this aggregate's required
284
+ * expressions in the resulting tuple.
285
+ */
286
+ int inputoff ;
287
+
286
288
/* Oid of the state transition or combine function */
287
289
Oid transfn_oid ;
288
290
@@ -295,9 +297,8 @@ typedef struct AggStatePerTransData
295
297
/* Oid of state value's datatype */
296
298
Oid aggtranstype ;
297
299
298
- /* ExprStates of the FILTER and argument expressions. */
299
- ExprState * aggfilter ; /* state of FILTER expression, if any */
300
- List * aggdirectargs ; /* states of direct-argument expressions */
300
+ /* ExprStates for any direct-argument expressions */
301
+ List * aggdirectargs ;
301
302
302
303
/*
303
304
* fmgr lookup data for transition function or combine function. Note in
@@ -353,20 +354,21 @@ typedef struct AggStatePerTransData
353
354
transtypeByVal ;
354
355
355
356
/*
356
- * Stuff for evaluation of aggregate inputs in cases where the aggregate
357
- * requires sorted input. The arguments themselves will be evaluated via
358
- * AggState->evalslot/evalproj for all aggregates at once, but we only
359
- * want to sort the relevant columns for individual aggregates .
357
+ * Stuff for evaluation of aggregate inputs, when they must be evaluated
358
+ * separately because there's a FILTER expression. In such cases we will
359
+ * create a sortslot and the result will be stored there, whether or not
360
+ * we're actually sorting .
360
361
*/
361
- TupleDesc sortdesc ; /* descriptor of input tuples */
362
+ ProjectionInfo * evalproj ; /* projection machinery */
362
363
363
364
/*
364
365
* Slots for holding the evaluated input arguments. These are set up
365
- * during ExecInitAgg() and then used for each input row requiring
366
- * processing besides what's done in AggState->evalproj .
366
+ * during ExecInitAgg() and then used for each input row requiring either
367
+ * FILTER or ORDER BY/DISTINCT processing .
367
368
*/
368
369
TupleTableSlot * sortslot ; /* current input tuple */
369
370
TupleTableSlot * uniqslot ; /* used for multi-column DISTINCT */
371
+ TupleDesc sortdesc ; /* descriptor of input tuples */
370
372
371
373
/*
372
374
* These values are working state that is initialized at the start of an
@@ -973,30 +975,36 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
973
975
int numGroupingSets = Max (aggstate -> phase -> numsets , 1 );
974
976
int numHashes = aggstate -> num_hashes ;
975
977
int numTrans = aggstate -> numtrans ;
976
- TupleTableSlot * slot = aggstate -> evalslot ;
978
+ TupleTableSlot * combinedslot ;
977
979
978
- /* compute input for all aggregates */
979
- if (aggstate -> evalproj )
980
- aggstate -> evalslot = ExecProject (aggstate -> evalproj );
980
+ /* compute required inputs for all aggregates */
981
+ combinedslot = ExecProject (aggstate -> combinedproj );
981
982
982
983
for (transno = 0 ; transno < numTrans ; transno ++ )
983
984
{
984
985
AggStatePerTrans pertrans = & aggstate -> pertrans [transno ];
985
- ExprState * filter = pertrans -> aggfilter ;
986
986
int numTransInputs = pertrans -> numTransInputs ;
987
- int i ;
988
987
int inputoff = pertrans -> inputoff ;
988
+ TupleTableSlot * slot ;
989
+ int i ;
989
990
990
991
/* Skip anything FILTERed out */
991
- if (filter )
992
+ if (pertrans -> aggref -> aggfilter )
992
993
{
993
- Datum res ;
994
- bool isnull ;
995
-
996
- res = ExecEvalExprSwitchContext (filter , aggstate -> tmpcontext ,
997
- & isnull );
998
- if (isnull || !DatumGetBool (res ))
994
+ /* Check the result of the filter expression */
995
+ if (combinedslot -> tts_isnull [inputoff ] ||
996
+ !DatumGetBool (combinedslot -> tts_values [inputoff ]))
999
997
continue ;
998
+
999
+ /* Now it's safe to evaluate this agg's arguments */
1000
+ slot = ExecProject (pertrans -> evalproj );
1001
+ /* There's no offset needed in this slot, of course */
1002
+ inputoff = 0 ;
1003
+ }
1004
+ else
1005
+ {
1006
+ /* arguments are already evaluated into combinedslot @ inputoff */
1007
+ slot = combinedslot ;
1000
1008
}
1001
1009
1002
1010
if (pertrans -> numSortCols > 0 )
@@ -1030,11 +1038,21 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
1030
1038
tuplesort_putdatum (pertrans -> sortstates [setno ],
1031
1039
slot -> tts_values [inputoff ],
1032
1040
slot -> tts_isnull [inputoff ]);
1041
+ else if (pertrans -> aggref -> aggfilter )
1042
+ {
1043
+ /*
1044
+ * When filtering and ordering, we already have a slot
1045
+ * containing just the argument columns.
1046
+ */
1047
+ Assert (slot == pertrans -> sortslot );
1048
+ tuplesort_puttupleslot (pertrans -> sortstates [setno ], slot );
1049
+ }
1033
1050
else
1034
1051
{
1035
1052
/*
1036
- * Copy slot contents, starting from inputoff, into sort
1037
- * slot.
1053
+ * Copy argument columns from combined slot, starting at
1054
+ * inputoff, into sortslot, so that we can store just the
1055
+ * columns we want.
1038
1056
*/
1039
1057
ExecClearTuple (pertrans -> sortslot );
1040
1058
memcpy (pertrans -> sortslot -> tts_values ,
@@ -1043,9 +1061,9 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
1043
1061
memcpy (pertrans -> sortslot -> tts_isnull ,
1044
1062
& slot -> tts_isnull [inputoff ],
1045
1063
pertrans -> numInputs * sizeof (bool ));
1046
- pertrans -> sortslot -> tts_nvalid = pertrans -> numInputs ;
1047
1064
ExecStoreVirtualTuple (pertrans -> sortslot );
1048
- tuplesort_puttupleslot (pertrans -> sortstates [setno ], pertrans -> sortslot );
1065
+ tuplesort_puttupleslot (pertrans -> sortstates [setno ],
1066
+ pertrans -> sortslot );
1049
1067
}
1050
1068
}
1051
1069
}
@@ -1117,7 +1135,7 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
1117
1135
Assert (aggstate -> phase -> numsets <= 1 );
1118
1136
1119
1137
/* compute input for all aggregates */
1120
- slot = ExecProject (aggstate -> evalproj );
1138
+ slot = ExecProject (aggstate -> combinedproj );
1121
1139
1122
1140
for (transno = 0 ; transno < numTrans ; transno ++ )
1123
1141
{
@@ -2681,6 +2699,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
2681
2699
int phase ;
2682
2700
int phaseidx ;
2683
2701
List * combined_inputeval ;
2702
+ TupleDesc combineddesc ;
2703
+ TupleTableSlot * combinedslot ;
2684
2704
ListCell * l ;
2685
2705
Bitmapset * all_grouped_cols = NULL ;
2686
2706
int numGroupingSets = 1 ;
@@ -3345,19 +3365,17 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
3345
3365
aggstate -> numtrans = transno + 1 ;
3346
3366
3347
3367
/*
3348
- * Build a single projection computing the aggregate arguments for all
3368
+ * Build a single projection computing the required arguments for all
3349
3369
* aggregates at once; if there's more than one, that's considerably
3350
3370
* faster than doing it separately for each.
3351
3371
*
3352
- * First create a targetlist combining the targetlists of all the
3353
- * per-trans states.
3372
+ * First create a targetlist representing the values to compute.
3354
3373
*/
3355
3374
combined_inputeval = NIL ;
3356
3375
column_offset = 0 ;
3357
3376
for (transno = 0 ; transno < aggstate -> numtrans ; transno ++ )
3358
3377
{
3359
3378
AggStatePerTrans pertrans = & pertransstates [transno ];
3360
- ListCell * arg ;
3361
3379
3362
3380
/*
3363
3381
* Mark this per-trans state with its starting column in the combined
@@ -3366,38 +3384,70 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
3366
3384
pertrans -> inputoff = column_offset ;
3367
3385
3368
3386
/*
3369
- * Adjust resnos in the copied target entries to match the combined
3370
- * slot.
3387
+ * If the aggregate has a FILTER, we can only evaluate the filter
3388
+ * expression, not the actual input expressions, during the combined
3389
+ * eval step --- unless we're ignoring the filter because this node is
3390
+ * running combinefns not transfns.
3371
3391
*/
3372
- foreach (arg , pertrans -> aggref -> args )
3392
+ if (pertrans -> aggref -> aggfilter &&
3393
+ !DO_AGGSPLIT_COMBINE (aggstate -> aggsplit ))
3373
3394
{
3374
- TargetEntry * source_tle = lfirst_node (TargetEntry , arg );
3375
3395
TargetEntry * tle ;
3376
3396
3377
- tle = flatCopyTargetEntry (source_tle );
3378
- tle -> resno += column_offset ;
3379
-
3397
+ tle = makeTargetEntry (pertrans -> aggref -> aggfilter ,
3398
+ column_offset + 1 , NULL , false);
3380
3399
combined_inputeval = lappend (combined_inputeval , tle );
3400
+ column_offset ++ ;
3401
+
3402
+ /*
3403
+ * We'll need separate projection machinery for the real args.
3404
+ * Arrange to evaluate them into the sortslot previously created.
3405
+ */
3406
+ Assert (pertrans -> sortslot );
3407
+ pertrans -> evalproj = ExecBuildProjectionInfo (pertrans -> aggref -> args ,
3408
+ aggstate -> tmpcontext ,
3409
+ pertrans -> sortslot ,
3410
+ & aggstate -> ss .ps ,
3411
+ NULL );
3381
3412
}
3413
+ else
3414
+ {
3415
+ /*
3416
+ * Add agg's input expressions to combined_inputeval, adjusting
3417
+ * resnos in the copied target entries to match the combined slot.
3418
+ */
3419
+ ListCell * arg ;
3420
+
3421
+ foreach (arg , pertrans -> aggref -> args )
3422
+ {
3423
+ TargetEntry * source_tle = lfirst_node (TargetEntry , arg );
3424
+ TargetEntry * tle ;
3425
+
3426
+ tle = flatCopyTargetEntry (source_tle );
3427
+ tle -> resno += column_offset ;
3382
3428
3383
- column_offset += list_length (pertrans -> aggref -> args );
3429
+ combined_inputeval = lappend (combined_inputeval , tle );
3430
+ }
3431
+
3432
+ column_offset += list_length (pertrans -> aggref -> args );
3433
+ }
3384
3434
}
3385
3435
3386
3436
/* Now create a projection for the combined targetlist */
3387
- aggstate -> evaldesc = ExecTypeFromTL (combined_inputeval , false);
3388
- aggstate -> evalslot = ExecInitExtraTupleSlot (estate );
3389
- aggstate -> evalproj = ExecBuildProjectionInfo ( combined_inputeval ,
3390
- aggstate -> tmpcontext ,
3391
- aggstate -> evalslot ,
3392
- & aggstate -> ss . ps ,
3393
- NULL );
3394
- ExecSetSlotDescriptor ( aggstate -> evalslot , aggstate -> evaldesc );
3437
+ combineddesc = ExecTypeFromTL (combined_inputeval , false);
3438
+ combinedslot = ExecInitExtraTupleSlot (estate );
3439
+ ExecSetSlotDescriptor ( combinedslot , combineddesc );
3440
+ aggstate -> combinedproj = ExecBuildProjectionInfo ( combined_inputeval ,
3441
+ aggstate -> tmpcontext ,
3442
+ combinedslot ,
3443
+ & aggstate -> ss . ps ,
3444
+ NULL );
3395
3445
3396
3446
/*
3397
3447
* Last, check whether any more aggregates got added onto the node while
3398
3448
* we processed the expressions for the aggregate arguments (including not
3399
- * only the regular arguments handled immediately above, but any FILTER
3400
- * expressions and direct arguments we might've handled earlier). If so,
3449
+ * only the regular arguments and FILTER expressions handled immediately
3450
+ * above, but any direct arguments we might've handled earlier). If so,
3401
3451
* we have nested aggregate functions, which is semantically nonsensical,
3402
3452
* so complain. (This should have been caught by the parser, so we don't
3403
3453
* need to work hard on a helpful error message; but we defend against it
@@ -3462,6 +3512,8 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
3462
3512
else
3463
3513
pertrans -> numTransInputs = numArguments ;
3464
3514
3515
+ /* inputoff and evalproj will be set up later, in ExecInitAgg */
3516
+
3465
3517
/*
3466
3518
* When combining states, we have no use at all for the aggregate
3467
3519
* function's transfn. Instead we use the combinefn. In this case, the
@@ -3577,9 +3629,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
3577
3629
3578
3630
}
3579
3631
3580
- /* Initialize the input and FILTER expressions */
3581
- pertrans -> aggfilter = ExecInitExpr (aggref -> aggfilter ,
3582
- (PlanState * ) aggstate );
3632
+ /* Initialize any direct-argument expressions */
3583
3633
pertrans -> aggdirectargs = ExecInitExprList (aggref -> aggdirectargs ,
3584
3634
(PlanState * ) aggstate );
3585
3635
@@ -3613,16 +3663,20 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
3613
3663
pertrans -> numSortCols = numSortCols ;
3614
3664
pertrans -> numDistinctCols = numDistinctCols ;
3615
3665
3616
- if (numSortCols > 0 )
3666
+ /*
3667
+ * If we have either sorting or filtering to do, create a tupledesc and
3668
+ * slot corresponding to the aggregated inputs (including sort
3669
+ * expressions) of the agg.
3670
+ */
3671
+ if (numSortCols > 0 || aggref -> aggfilter )
3617
3672
{
3618
- /*
3619
- * Get a tupledesc and slot corresponding to the aggregated inputs
3620
- * (including sort expressions) of the agg.
3621
- */
3622
3673
pertrans -> sortdesc = ExecTypeFromTL (aggref -> args , false);
3623
3674
pertrans -> sortslot = ExecInitExtraTupleSlot (estate );
3624
3675
ExecSetSlotDescriptor (pertrans -> sortslot , pertrans -> sortdesc );
3676
+ }
3625
3677
3678
+ if (numSortCols > 0 )
3679
+ {
3626
3680
/*
3627
3681
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3628
3682
* (yet)
0 commit comments