diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_aggregate.c | 241 | ||||
-rw-r--r-- | src/backend/commands/aggregatecmds.c | 101 | ||||
-rw-r--r-- | src/backend/executor/nodeAgg.c | 13 | ||||
-rw-r--r-- | src/backend/executor/nodeWindowAgg.c | 639 | ||||
-rw-r--r-- | src/backend/optimizer/util/clauses.c | 6 | ||||
-rw-r--r-- | src/backend/parser/parse_agg.c | 32 |
6 files changed, 903 insertions, 129 deletions
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index fe6dc8a9a24..633b8f1d6ac 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -57,10 +57,16 @@ AggregateCreate(const char *aggName, Oid variadicArgType, List *aggtransfnName, List *aggfinalfnName, + List *aggmtransfnName, + List *aggminvtransfnName, + List *aggmfinalfnName, List *aggsortopName, Oid aggTransType, int32 aggTransSpace, - const char *agginitval) + Oid aggmTransType, + int32 aggmTransSpace, + const char *agginitval, + const char *aggminitval) { Relation aggdesc; HeapTuple tup; @@ -69,14 +75,19 @@ AggregateCreate(const char *aggName, Form_pg_proc proc; Oid transfn; Oid finalfn = InvalidOid; /* can be omitted */ + Oid mtransfn = InvalidOid; /* can be omitted */ + Oid minvtransfn = InvalidOid; /* can be omitted */ + Oid mfinalfn = InvalidOid; /* can be omitted */ Oid sortop = InvalidOid; /* can be omitted */ Oid *aggArgTypes = parameterTypes->values; bool hasPolyArg; bool hasInternalArg; + bool mtransIsStrict = false; Oid rettype; Oid finaltype; Oid fnArgs[FUNC_MAX_ARGS]; int nargs_transfn; + int nargs_finalfn; Oid procOid; TupleDesc tupDesc; int i; @@ -129,6 +140,16 @@ AggregateCreate(const char *aggName, errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument."))); /* + * Likewise for moving-aggregate transtype, if any + */ + if (OidIsValid(aggmTransType) && + IsPolymorphicType(aggmTransType) && !hasPolyArg) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("cannot determine transition data type"), + errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument."))); + + /* * An ordered-set aggregate that is VARIADIC must be VARIADIC ANY. 
In * principle we could support regular variadic types, but it would make * things much more complicated because we'd have to assemble the correct @@ -234,32 +255,120 @@ AggregateCreate(const char *aggName, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type"))); } + ReleaseSysCache(tup); - /* handle finalfn, if supplied */ - if (aggfinalfnName) + /* handle moving-aggregate transfn, if supplied */ + if (aggmtransfnName) { - int nargs_finalfn; + /* + * The arguments are the same as for the regular transfn, except that + * the transition data type might be different. So re-use the fnArgs + * values set up above, except for that one. + */ + Assert(OidIsValid(aggmTransType)); + fnArgs[0] = aggmTransType; + + mtransfn = lookup_agg_function(aggmtransfnName, nargs_transfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, return type must exactly match declared mtranstype. */ + if (rettype != aggmTransType) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("return type of transition function %s is not %s", + NameListToString(aggmtransfnName), + format_type_be(aggmTransType)))); + + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(mtransfn)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for function %u", mtransfn); + proc = (Form_pg_proc) GETSTRUCT(tup); /* - * For ordinary aggs, the finalfn just takes the transtype. For - * ordered-set aggs, it takes the transtype plus all args. (The - * aggregated args are useless at runtime, and are actually passed as - * NULLs, but we may need them in the function signature to allow - * resolution of a polymorphic agg's result type.) + * If the mtransfn is strict and the minitval is NULL, check first + * input type and mtranstype are binary-compatible. 
*/ - fnArgs[0] = aggTransType; - if (AGGKIND_IS_ORDERED_SET(aggKind)) + if (proc->proisstrict && aggminitval == NULL) { - nargs_finalfn = numArgs + 1; - memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid)); - } - else - { - nargs_finalfn = 1; - /* variadic-ness of the aggregate doesn't affect finalfn */ - variadicArgType = InvalidOid; + if (numArgs < 1 || + !IsBinaryCoercible(aggArgTypes[0], aggmTransType)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type"))); } + + /* Remember if mtransfn is strict; we may need this below */ + mtransIsStrict = proc->proisstrict; + + ReleaseSysCache(tup); + } + + /* handle minvtransfn, if supplied */ + if (aggminvtransfnName) + { + /* + * This must have the same number of arguments with the same types as + * the forward transition function, so just re-use the fnArgs data. + */ + Assert(aggmtransfnName); + + minvtransfn = lookup_agg_function(aggminvtransfnName, nargs_transfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, return type must exactly match declared mtranstype. */ + if (rettype != aggmTransType) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("return type of inverse transition function %s is not %s", + NameListToString(aggminvtransfnName), + format_type_be(aggmTransType)))); + + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(minvtransfn)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for function %u", minvtransfn); + proc = (Form_pg_proc) GETSTRUCT(tup); + + /* + * We require the strictness settings of the forward and inverse + * transition functions to agree. This saves having to handle + * assorted special cases at execution time. 
+ */ + if (proc->proisstrict != mtransIsStrict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + ReleaseSysCache(tup); + } + + /* + * Set up fnArgs for looking up finalfn(s) + * + * For ordinary aggs, the finalfn just takes the transtype. For + * ordered-set aggs, it takes the transtype plus all args. (The + * aggregated args are useless at runtime, and are actually passed as + * NULLs, but we may need them in the function signature to allow + * resolution of a polymorphic agg's result type.) + */ + fnArgs[0] = aggTransType; + if (AGGKIND_IS_ORDERED_SET(aggKind)) + { + nargs_finalfn = numArgs + 1; + memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid)); + } + else + { + nargs_finalfn = 1; + /* variadic-ness of the aggregate doesn't affect finalfn */ + variadicArgType = InvalidOid; + } + + /* handle finalfn, if supplied */ + if (aggfinalfnName) + { finalfn = lookup_agg_function(aggfinalfnName, nargs_finalfn, fnArgs, variadicArgType, &finaltype); @@ -314,6 +423,49 @@ AggregateCreate(const char *aggName, errmsg("unsafe use of pseudo-type \"internal\""), errdetail("A function returning \"internal\" must have at least one \"internal\" argument."))); + /* + * If a moving-aggregate implementation is supplied, look up its finalfn + * if any, and check that the implied aggregate result type matches the + * plain implementation. + */ + if (OidIsValid(aggmTransType)) + { + /* handle finalfn, if supplied */ + if (aggmfinalfnName) + { + /* + * The arguments are the same as for the regular finalfn, except + * that the transition data type might be different. So re-use + * the fnArgs values set up above, except for that one. 
+ */ + fnArgs[0] = aggmTransType; + + mfinalfn = lookup_agg_function(aggmfinalfnName, nargs_finalfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, check strictness if it's an ordered-set agg */ + if (AGGKIND_IS_ORDERED_SET(aggKind) && func_strict(mfinalfn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("final function of an ordered-set aggregate must not be declared STRICT"))); + } + else + { + /* + * If no finalfn, aggregate result type is type of the state value + */ + rettype = aggmTransType; + } + Assert(OidIsValid(rettype)); + if (rettype != finaltype) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("moving-aggregate implementation returns type %s, but plain implementation returns type %s", + format_type_be(aggmTransType), + format_type_be(aggTransType)))); + } + /* handle sortop, if supplied */ if (aggsortopName) { @@ -340,6 +492,13 @@ AggregateCreate(const char *aggName, if (aclresult != ACLCHECK_OK) aclcheck_error_type(aclresult, aggTransType); + if (OidIsValid(aggmTransType)) + { + aclresult = pg_type_aclcheck(aggmTransType, GetUserId(), ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error_type(aclresult, aggmTransType); + } + aclresult = pg_type_aclcheck(finaltype, GetUserId(), ACL_USAGE); if (aclresult != ACLCHECK_OK) aclcheck_error_type(aclresult, finaltype); @@ -392,13 +551,22 @@ AggregateCreate(const char *aggName, values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs); values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn); values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn); + values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn); + values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn); + values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn); values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop); values[Anum_pg_aggregate_aggtranstype - 1] = 
ObjectIdGetDatum(aggTransType); values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace); + values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType); + values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace); if (agginitval) values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval); else nulls[Anum_pg_aggregate_agginitval - 1] = true; + if (aggminitval) + values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval); + else + nulls[Anum_pg_aggregate_aggminitval - 1] = true; aggdesc = heap_open(AggregateRelationId, RowExclusiveLock); tupDesc = aggdesc->rd_att; @@ -414,6 +582,7 @@ AggregateCreate(const char *aggName, * Create dependencies for the aggregate (above and beyond those already * made by ProcedureCreate). Note: we don't need an explicit dependency * on aggTransType since we depend on it indirectly through transfn. + * Likewise for aggmTransType if any. */ myself.classId = ProcedureRelationId; myself.objectId = procOid; @@ -434,6 +603,33 @@ AggregateCreate(const char *aggName, recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } + /* Depends on forward transition function, if any */ + if (OidIsValid(mtransfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = mtransfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + + /* Depends on inverse transition function, if any */ + if (OidIsValid(minvtransfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = minvtransfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + + /* Depends on final function, if any */ + if (OidIsValid(mfinalfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = mfinalfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + /* Depends on sort operator, if any */ if 
(OidIsValid(sortop)) { @@ -447,7 +643,12 @@ AggregateCreate(const char *aggName, } /* - * lookup_agg_function -- common code for finding both transfn and finalfn + * lookup_agg_function + * common code for finding transfn, invtransfn and finalfn + * + * Returns OID of function, and stores its return type into *rettype + * + * NB: must not scribble on input_types[], as we may re-use those */ static Oid lookup_agg_function(List *fnName, diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c index 640e19cf120..9714112f6d4 100644 --- a/src/backend/commands/aggregatecmds.c +++ b/src/backend/commands/aggregatecmds.c @@ -61,11 +61,17 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, char aggKind = AGGKIND_NORMAL; List *transfuncName = NIL; List *finalfuncName = NIL; + List *mtransfuncName = NIL; + List *minvtransfuncName = NIL; + List *mfinalfuncName = NIL; List *sortoperatorName = NIL; TypeName *baseType = NULL; TypeName *transType = NULL; + TypeName *mtransType = NULL; int32 transSpace = 0; + int32 mtransSpace = 0; char *initval = NULL; + char *minitval = NULL; int numArgs; int numDirectArgs = 0; oidvector *parameterTypes; @@ -75,7 +81,9 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, List *parameterDefaults; Oid variadicArgType; Oid transTypeId; + Oid mtransTypeId = InvalidOid; char transTypeType; + char mtransTypeType = 0; ListCell *pl; /* Convert list of names to a name and namespace */ @@ -114,6 +122,12 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, transfuncName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "finalfunc") == 0) finalfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, "msfunc") == 0) + mtransfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, "minvfunc") == 0) + minvtransfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, 
"mfinalfunc") == 0) + mfinalfuncName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "sortop") == 0) sortoperatorName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "basetype") == 0) @@ -135,10 +149,16 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, transType = defGetTypeName(defel); else if (pg_strcasecmp(defel->defname, "sspace") == 0) transSpace = defGetInt32(defel); + else if (pg_strcasecmp(defel->defname, "mstype") == 0) + mtransType = defGetTypeName(defel); + else if (pg_strcasecmp(defel->defname, "msspace") == 0) + mtransSpace = defGetInt32(defel); else if (pg_strcasecmp(defel->defname, "initcond") == 0) initval = defGetString(defel); else if (pg_strcasecmp(defel->defname, "initcond1") == 0) initval = defGetString(defel); + else if (pg_strcasecmp(defel->defname, "minitcond") == 0) + minitval = defGetString(defel); else ereport(WARNING, (errcode(ERRCODE_SYNTAX_ERROR), @@ -159,6 +179,46 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, errmsg("aggregate sfunc must be specified"))); /* + * if mtransType is given, mtransfuncName and minvtransfuncName must be as + * well; if not, then none of the moving-aggregate options should have + * been given. 
+ */ + if (mtransType != NULL) + { + if (mtransfuncName == NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msfunc must be specified when mstype is specified"))); + if (minvtransfuncName == NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minvfunc must be specified when mstype is specified"))); + } + else + { + if (mtransfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msfunc must not be specified without mstype"))); + if (minvtransfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minvfunc must not be specified without mstype"))); + if (mfinalfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate mfinalfunc must not be specified without mstype"))); + if (mtransSpace != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msspace must not be specified without mstype"))); + if (minitval != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minitcond must not be specified without mstype"))); + } + + /* * look up the aggregate's input datatype(s). */ if (oldstyle) @@ -251,6 +311,27 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, } /* + * If a moving-aggregate transtype is specified, look that up. Same + * restrictions as for transtype. 
+ */ + if (mtransType) + { + mtransTypeId = typenameTypeId(NULL, mtransType); + mtransTypeType = get_typtype(mtransTypeId); + if (mtransTypeType == TYPTYPE_PSEUDO && + !IsPolymorphicType(mtransTypeId)) + { + if (mtransTypeId == INTERNALOID && superuser()) + /* okay */ ; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate transition data type cannot be %s", + format_type_be(mtransTypeId)))); + } + } + + /* * If we have an initval, and it's not for a pseudotype (particularly a * polymorphic type), make sure it's acceptable to the type's input * function. We will store the initval as text, because the input @@ -269,6 +350,18 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, } /* + * Likewise for moving-aggregate initval. + */ + if (minitval && mtransTypeType != TYPTYPE_PSEUDO) + { + Oid typinput, + typioparam; + + getTypeInputInfo(mtransTypeId, &typinput, &typioparam); + (void) OidInputFunctionCall(typinput, minitval, typioparam, -1); + } + + /* * Most of the argument-checking is done inside of AggregateCreate */ return AggregateCreate(aggName, /* aggregate name */ @@ -284,8 +377,14 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, variadicArgType, transfuncName, /* step function name */ finalfuncName, /* final function name */ + mtransfuncName, /* fwd trans function name */ + minvtransfuncName, /* inv trans function name */ + mfinalfuncName, /* final function name */ sortoperatorName, /* sort operator name */ transTypeId, /* transition data type */ transSpace, /* transition space */ - initval); /* initial condition */ + mtransTypeId, /* transition data type */ + mtransSpace, /* transition space */ + initval, /* initial condition */ + minitval); /* initial condition */ } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 7e4bca5b4d8..d60845bcd34 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -1798,8 +1798,10 
@@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggref->aggtype, aggref->inputcollid, transfn_oid, + InvalidOid, /* invtrans is not needed here */ finalfn_oid, &transfnexpr, + NULL, &finalfnexpr); /* set up infrastructure for calling the transfn and finalfn */ @@ -1847,7 +1849,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * type and transtype are the same (or at least binary-compatible), so * that it's OK to use the first aggregated input value as the initial * transValue. This should have been checked at agg definition time, - * but just in case... + * but we must check again in case the transfn's strictness property + * has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -2126,6 +2129,12 @@ ExecReScanAgg(AggState *node) ExecReScan(node->ss.ps.lefttree); } + +/*********************************************************************** + * API exposed to aggregate functions + ***********************************************************************/ + + /* * AggCheckCallContext - test if a SQL function is being called as an aggregate * @@ -2152,7 +2161,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) if (fcinfo->context && IsA(fcinfo->context, WindowAggState)) { if (aggcontext) - *aggcontext = ((WindowAggState *) fcinfo->context)->aggcontext; + *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext; return AGG_CONTEXT_WINDOW; } diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 0b558e59231..046637fb092 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -102,16 +102,18 @@ typedef struct WindowStatePerFuncData */ typedef struct WindowStatePerAggData { - /* Oids of transfer functions */ + /* Oids of transition functions */ Oid transfn_oid; + Oid invtransfn_oid; /* may be InvalidOid */ Oid finalfn_oid; /* may be InvalidOid */ /* - * fmgr lookup data for transfer functions --- only valid when 
+ * fmgr lookup data for transition functions --- only valid when * corresponding oid is not InvalidOid. Note in particular that fn_strict * flags are kept here. */ FmgrInfo transfn; + FmgrInfo invtransfn; FmgrInfo finalfn; /* @@ -139,11 +141,17 @@ typedef struct WindowStatePerAggData int wfuncno; /* index of associated PerFuncData */ + /* Context holding transition value and possibly other subsidiary data */ + MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ + /* Current transition value */ Datum transValue; /* current transition value */ bool transValueIsNull; - bool noTransValue; /* true if transValue not set yet */ + int64 transValueCount; /* number of currently-aggregated rows */ + + /* Data local to eval_windowaggregates() */ + bool restart; /* need to restart this agg in this cycle? */ } WindowStatePerAggData; static void initialize_windowaggregate(WindowAggState *winstate, @@ -152,6 +160,9 @@ static void initialize_windowaggregate(WindowAggState *winstate, static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate); +static bool advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, @@ -193,18 +204,27 @@ initialize_windowaggregate(WindowAggState *winstate, { MemoryContext oldContext; + /* + * If we're using a private aggcontext, we may reset it here. But if the + * context is shared, we don't know which other aggregates may still need + * it, so we must leave it to the caller to reset at an appropriate time. 
+ */ + if (peraggstate->aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); + if (peraggstate->initValueIsNull) peraggstate->transValue = peraggstate->initValue; else { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } peraggstate->transValueIsNull = peraggstate->initValueIsNull; - peraggstate->noTransValue = peraggstate->initValueIsNull; + peraggstate->transValueCount = 0; + peraggstate->resultValue = (Datum) 0; peraggstate->resultValueIsNull = true; } @@ -258,7 +278,8 @@ advance_windowaggregate(WindowAggState *winstate, { /* * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. + * just keep the prior transValue. Note transValueCount doesn't + * change either. */ for (i = 1; i <= numArguments; i++) { @@ -268,41 +289,47 @@ advance_windowaggregate(WindowAggState *winstate, return; } } - if (peraggstate->noTransValue) + + /* + * For strict transition functions with initial value NULL we use the + * first non-NULL input as the initial state. (We already checked + * that the agg's input type is binary-compatible with its transtype, + * so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We do + * not need to pfree the old transValue, since it's NULL. + */ + if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) { - /* - * transValue has not been initialized. This is the first non-NULL - * input value. We use it as the initial value for transValue. (We - * already checked that the agg's input type is binary-compatible - * with its transtype, so straight copy here is OK.) - * - * We must copy the datum into aggcontext if it is pass-by-ref. 
We - * do not need to pfree the old transValue, since it's NULL. - */ - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); peraggstate->transValueIsNull = false; - peraggstate->noTransValue = false; + peraggstate->transValueCount = 1; MemoryContextSwitchTo(oldContext); return; } + if (peraggstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. That can + * only happen if there's no inverse transition function, though, + * since we disallow transitions back to NULL when there is one. */ MemoryContextSwitchTo(oldContext); + Assert(!OidIsValid(peraggstate->invtransfn_oid)); return; } } /* - * OK to call the transition function + * OK to call the transition function. Set winstate->curaggcontext while + * calling it, for possible use by AggCheckCallContext. */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), numArguments + 1, @@ -310,7 +337,26 @@ advance_windowaggregate(WindowAggState *winstate, (void *) winstate, NULL); fcinfo->arg[0] = peraggstate->transValue; fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * Moving-aggregate transition functions must not return NULL, see + * advance_windowaggregate_base(). 
+ */ + if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("moving-aggregate transition function must not return NULL"))); + + /* + * We must track the number of rows included in transValue, since to + * remove the last input, advance_windowaggregate_base() mustn't call the + * inverse transition function, but simply reset transValue back to its + * initial value. + */ + peraggstate->transValueCount++; /* * If pass-by-ref datatype, must copy the new value into aggcontext and @@ -322,7 +368,161 @@ advance_windowaggregate(WindowAggState *winstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!peraggstate->transValueIsNull) + pfree(DatumGetPointer(peraggstate->transValue)); + } + + MemoryContextSwitchTo(oldContext); + peraggstate->transValue = newVal; + peraggstate->transValueIsNull = fcinfo->isnull; +} + +/* + * advance_windowaggregate_base + * Remove the oldest tuple from an aggregation. + * + * This is very much like advance_windowaggregate, except that we will call + * the inverse transition function (which caller must have checked is + * available). + * + * Returns true if we successfully removed the current row from this + * aggregate, false if not (in the latter case, caller is responsible + * for cleaning up by restarting the aggregation). 
+ */ +static bool +advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; + int numArguments = perfuncstate->numArguments; + FunctionCallInfoData fcinfodata; + FunctionCallInfo fcinfo = &fcinfodata; + Datum newVal; + ListCell *arg; + int i; + MemoryContext oldContext; + ExprContext *econtext = winstate->tmpcontext; + ExprState *filter = wfuncstate->aggfilter; + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + /* Skip anything FILTERed out */ + if (filter) + { + bool isnull; + Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL); + + if (isnull || !DatumGetBool(res)) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + + /* We start from 1, since the 0th arg will be the transition value */ + i = 1; + foreach(arg, wfuncstate->args) + { + ExprState *argstate = (ExprState *) lfirst(arg); + + fcinfo->arg[i] = ExecEvalExpr(argstate, econtext, + &fcinfo->argnull[i], NULL); + i++; + } + + if (peraggstate->invtransfn.fn_strict) + { + /* + * For a strict (inv)transfn, nothing happens when there's a NULL + * input; we just keep the prior transValue. Note transValueCount + * doesn't change either. + */ + for (i = 1; i <= numArguments; i++) + { + if (fcinfo->argnull[i]) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + } + + /* There should still be an added but not yet removed value */ + Assert(peraggstate->transValueCount > 0); + + /* + * In moving-aggregate mode, the state must never be NULL, except possibly + * before any rows have been aggregated (which is surely not the case at + * this point). This restriction allows us to interpret a NULL result + * from the inverse function as meaning "sorry, can't do an inverse + * transition in this case". We already checked this in + * advance_windowaggregate, but just for safety, check again. 
+ */ + if (peraggstate->transValueIsNull) + elog(ERROR, "aggregate transition value is NULL before inverse transition"); + + /* + * We mustn't use the inverse transition function to remove the last + * input. Doing so would yield a non-NULL state, whereas we should be in + * the initial state afterwards which may very well be NULL. So instead, + * we simply re-initialize the aggregate in this case. + */ + if (peraggstate->transValueCount == 1) + { + MemoryContextSwitchTo(oldContext); + initialize_windowaggregate(winstate, + &winstate->perfunc[peraggstate->wfuncno], + peraggstate); + return true; + } + + /* + * OK to call the inverse transition function. Set + * winstate->curaggcontext while calling it, for possible use by + * AggCheckCallContext. + */ + InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), + numArguments + 1, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->arg[0] = peraggstate->transValue; + fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; + newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * If the function returns NULL, report failure, forcing a restart. + */ + if (fcinfo->isnull) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* Update number of rows included in transValue */ + peraggstate->transValueCount--; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * pfree the prior transValue. But if invtransfn returned a pointer to + * its first input, we don't need to do anything. + * + * Note: the checks for null values here will never fire, but it seems + * best to have this stanza look just like advance_windowaggregate. 
+ */ + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -334,6 +534,8 @@ advance_windowaggregate(WindowAggState *winstate, MemoryContextSwitchTo(oldContext); peraggstate->transValue = newVal; peraggstate->transValueIsNull = fcinfo->isnull; + + return true; } /* @@ -370,7 +572,9 @@ finalize_windowaggregate(WindowAggState *winstate, } else { + winstate->curaggcontext = peraggstate->aggcontext; *result = FunctionCallInvoke(&fcinfo); + winstate->curaggcontext = NULL; *isnull = fcinfo.isnull; } } @@ -396,7 +600,9 @@ finalize_windowaggregate(WindowAggState *winstate, * eval_windowaggregates * evaluate plain aggregates being used as window functions * - * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be + * This differs from nodeAgg.c in two ways. First, if the window's frame + * start position moves, we use the inverse transition function (if it exists) + * to remove rows from the transition value. And second, we expect to be * able to call aggregate final functions repeatedly after aggregating more * data onto the same transition value. This is not a behavior required by * nodeAgg.c. 
@@ -406,12 +612,15 @@ eval_windowaggregates(WindowAggState *winstate) { WindowStatePerAgg peraggstate; int wfuncno, - numaggs; - int i; + numaggs, + numaggs_restart, + i; + int64 aggregatedupto_nonrestarted; MemoryContext oldContext; ExprContext *econtext; WindowObject agg_winobj; TupleTableSlot *agg_row_slot; + TupleTableSlot *temp_slot; numaggs = winstate->numaggs; if (numaggs == 0) @@ -421,6 +630,7 @@ eval_windowaggregates(WindowAggState *winstate) econtext = winstate->ss.ps.ps_ExprContext; agg_winobj = winstate->agg_winobj; agg_row_slot = winstate->agg_row_slot; + temp_slot = winstate->temp_slot_1; /* * Currently, we support only a subset of the SQL-standard window framing @@ -438,9 +648,17 @@ eval_windowaggregates(WindowAggState *winstate) * damage the running transition value, but we have the same assumption in * nodeAgg.c too (when it rescans an existing hash table). * - * For other frame start rules, we discard the aggregate state and re-run - * the aggregates whenever the frame head row moves. We can still - * optimize as above whenever successive rows share the same frame head. + * If the frame start does sometimes move, we can still optimize as above + * whenever successive rows share the same frame head, but if the frame + * head moves beyond the previous head we try to remove those rows using + * the aggregate's inverse transition function. This function restores + * the aggregate's current state to what it would be if the removed row + * had never been aggregated in the first place. Inverse transition + * functions may optionally return NULL, indicating that the function was + * unable to remove the tuple from aggregation. If this happens, or if + * the aggregate doesn't have an inverse transition function at all, we + * must perform the aggregation all over again for all tuples within the + * new frame boundaries. * * In many common cases, multiple rows share the same frame and hence the * same aggregate value. 
(In particular, if there's no ORDER BY in a RANGE @@ -452,75 +670,195 @@ eval_windowaggregates(WindowAggState *winstate) * 'aggregatedupto' keeps track of the first row that has not yet been * accumulated into the aggregate transition values. Whenever we start a * new peer group, we accumulate forward to the end of the peer group. - * - * TODO: Rerunning aggregates from the frame start can be pretty slow. For - * some aggregates like SUM and COUNT we could avoid that by implementing - * a "negative transition function" that would be called for each row as - * it exits the frame. We'd have to think about avoiding recalculation of - * volatile arguments of aggregate functions, too. */ /* * First, update the frame head position. + * + * The frame head should never move backwards, and the code below wouldn't + * cope if it did, so for safety we complain if it does. */ - update_frameheadpos(agg_winobj, winstate->temp_slot_1); + update_frameheadpos(agg_winobj, temp_slot); + if (winstate->frameheadpos < winstate->aggregatedbase) + elog(ERROR, "window frame head moved backward"); /* - * Initialize aggregates on first call for partition, or if the frame head - * position moved since last time. + * If the frame didn't change compared to the previous row, we can re-use + * the result values that were previously saved at the bottom of this + * function. Since we don't know the current frame's end yet, this is not + * possible to check for fully. But if the frame end mode is UNBOUNDED + * FOLLOWING or CURRENT ROW, and the current row lies within the previous + * row's frame, then the two frames' ends must coincide. Note that on the + * first row aggregatedbase == aggregatedupto, meaning this test must + * fail, so we don't need to check the "there was no previous row" case + * explicitly here. 
*/ - if (winstate->currentpos == 0 || - winstate->frameheadpos != winstate->aggregatedbase) + if (winstate->aggregatedbase == winstate->frameheadpos && + (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | + FRAMEOPTION_END_CURRENT_ROW)) && + winstate->aggregatedbase <= winstate->currentpos && + winstate->aggregatedupto > winstate->currentpos) { - /* - * Discard transient aggregate values - */ - MemoryContextResetAndDeleteChildren(winstate->aggcontext); - for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - initialize_windowaggregate(winstate, - &winstate->perfunc[wfuncno], - peraggstate); + econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; + econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; } + return; + } + /*---------- + * Initialize restart flags. + * + * We restart the aggregation: + * - if we're processing the first row in the partition, or + * - if the frame's head moved and we cannot use an inverse + * transition function, or + * - if the new frame doesn't overlap the old one + * + * Note that we don't strictly need to restart in the last case, but if + * we're going to remove all rows from the aggregation anyway, a restart + * surely is faster. + *---------- + */ + numaggs_restart = 0; + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + if (winstate->currentpos == 0 || + (winstate->aggregatedbase != winstate->frameheadpos && + !OidIsValid(peraggstate->invtransfn_oid)) || + winstate->aggregatedupto <= winstate->frameheadpos) + { + peraggstate->restart = true; + numaggs_restart++; + } + else + peraggstate->restart = false; + } + + /* + * If we have any possibly-moving aggregates, attempt to advance + * aggregatedbase to match the frame's head by removing input rows that + * fell off the top of the frame from the aggregations. This can fail, + * i.e. 
advance_windowaggregate_base() can return false, in which case + * we'll restart that aggregate below. + */ + while (numaggs_restart < numaggs && + winstate->aggregatedbase < winstate->frameheadpos) + { /* - * If we created a mark pointer for aggregates, keep it pushed up to - * frame head, so that tuplestore can discard unnecessary rows. + * Fetch the next tuple of those being removed. This should never fail + * as we should have been here before. */ - if (agg_winobj->markptr >= 0) - WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, + temp_slot)) + elog(ERROR, "could not re-fetch previously fetched frame row"); + + /* Set tuple context for evaluation of aggregate arguments */ + winstate->tmpcontext->ecxt_outertuple = temp_slot; /* - * Initialize for loop below + * Perform the inverse transition for each aggregate function in the + * window, unless it has already been marked as needing a restart. */ - ExecClearTuple(agg_row_slot); - winstate->aggregatedbase = winstate->frameheadpos; - winstate->aggregatedupto = winstate->frameheadpos; + for (i = 0; i < numaggs; i++) + { + bool ok; + + peraggstate = &winstate->peragg[i]; + if (peraggstate->restart) + continue; + + wfuncno = peraggstate->wfuncno; + ok = advance_windowaggregate_base(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + if (!ok) + { + /* Inverse transition function has failed, must restart */ + peraggstate->restart = true; + numaggs_restart++; + } + } + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(winstate->tmpcontext); + + /* And advance the aggregated-row state */ + winstate->aggregatedbase++; + ExecClearTuple(temp_slot); } /* - * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates - * except when the frame head moves. In END_CURRENT_ROW mode, we only - * have to recalculate when the frame head moves or currentpos has - * advanced past the place we'd aggregated up to. 
Check for these cases - * and if so, reuse the saved result values. + * If we successfully advanced the base rows of all the aggregates, + * aggregatedbase now equals frameheadpos; but if we failed for any, we + * must forcibly update aggregatedbase. */ - if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | - FRAMEOPTION_END_CURRENT_ROW)) && - winstate->aggregatedbase <= winstate->currentpos && - winstate->aggregatedupto > winstate->currentpos) + winstate->aggregatedbase = winstate->frameheadpos; + + /* + * If we created a mark pointer for aggregates, keep it pushed up to frame + * head, so that tuplestore can discard unnecessary rows. + */ + if (agg_winobj->markptr >= 0) + WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + + /* + * Now restart the aggregates that require it. + * + * We assume that aggregates using the shared context always restart if + * *any* aggregate restarts, and we may thus clean up the shared + * aggcontext if that is the case. Private aggcontexts are reset by + * initialize_windowaggregate() if their owning aggregate restarts. If we + * aren't restarting an aggregate, we need to free any previously saved + * result for it, else we'll leak memory. 
+ */ + if (numaggs_restart > 0) + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < numaggs; i++) { - for (i = 0; i < numaggs; i++) + peraggstate = &winstate->peragg[i]; + + /* Aggregates using the shared ctx must restart if *any* agg does */ + Assert(peraggstate->aggcontext != winstate->aggcontext || + numaggs_restart == 0 || + peraggstate->restart); + + if (peraggstate->restart) { - peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; - econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; + initialize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + } + else if (!peraggstate->resultValueIsNull) + { + if (!peraggstate->resulttypeByVal) + pfree(DatumGetPointer(peraggstate->resultValue)); + peraggstate->resultValue = (Datum) 0; + peraggstate->resultValueIsNull = true; } - return; + } + + /* + * Non-restarted aggregates now contain the rows between aggregatedbase + * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates + * contain no rows. If there are any restarted aggregates, we must thus + * begin aggregating anew at frameheadpos, otherwise we may simply + * continue at aggregatedupto. We must remember the old value of + * aggregatedupto to know how long to skip advancing non-restarted + * aggregates. If we modify aggregatedupto, we must also clear + * agg_row_slot, per the loop invariant below. 
+ */ + aggregatedupto_nonrestarted = winstate->aggregatedupto; + if (numaggs_restart > 0 && + winstate->aggregatedupto != winstate->frameheadpos) + { + winstate->aggregatedupto = winstate->frameheadpos; + ExecClearTuple(agg_row_slot); } /* @@ -551,6 +889,12 @@ eval_windowaggregates(WindowAggState *winstate) for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; + + /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ + if (!peraggstate->restart && + winstate->aggregatedupto < aggregatedupto_nonrestarted) + continue; + wfuncno = peraggstate->wfuncno; advance_windowaggregate(winstate, &winstate->perfunc[wfuncno], @@ -565,6 +909,9 @@ eval_windowaggregates(WindowAggState *winstate) ExecClearTuple(agg_row_slot); } + /* The frame's end is not supposed to move backwards, ever */ + Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); + /* * finalize aggregates and fill result/isnull fields. */ @@ -589,28 +936,14 @@ eval_windowaggregates(WindowAggState *winstate) * advance that the next row can't possibly share the same frame. Is * it worth detecting that and skipping this code? */ - if (!peraggstate->resulttypeByVal) + if (!peraggstate->resulttypeByVal && !*isnull) { - /* - * clear old resultValue in order not to leak memory. (Note: the - * new result can't possibly be the same datum as old resultValue, - * because we never passed it to the trans function.) - */ - if (!peraggstate->resultValueIsNull) - pfree(DatumGetPointer(peraggstate->resultValue)); - - /* - * If pass-by-ref, copy it into our aggregate context. 
- */ - if (!*isnull) - { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); - peraggstate->resultValue = - datumCopy(*result, - peraggstate->resulttypeByVal, - peraggstate->resulttypeLen); - MemoryContextSwitchTo(oldContext); - } + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->resultValue = + datumCopy(*result, + peraggstate->resulttypeByVal, + peraggstate->resulttypeLen); + MemoryContextSwitchTo(oldContext); } else { @@ -650,6 +983,8 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, (void *) perfuncstate->winobj, NULL); /* Just in case, make all the regular argument slots be null */ memset(fcinfo.argnull, true, perfuncstate->numArguments); + /* Window functions don't have a current aggregate context, either */ + winstate->curaggcontext = NULL; *result = FunctionCallInvoke(&fcinfo); *isnull = fcinfo.isnull; @@ -870,6 +1205,11 @@ release_partition(WindowAggState *winstate) */ MemoryContextResetAndDeleteChildren(winstate->partcontext); MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < winstate->numaggs; i++) + { + if (winstate->peragg[i].aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); + } if (winstate->buffer) tuplestore_end(winstate->buffer); @@ -1450,7 +1790,12 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - /* Create mid-lived context for aggregate trans values etc */ + /* + * Create mid-lived context for aggregate trans values etc. + * + * Note that moving aggregates each use their own private context, not + * this one. 
+ */ winstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, "WindowAgg_Aggregates", @@ -1657,12 +2002,10 @@ void ExecEndWindowAgg(WindowAggState *node) { PlanState *outerPlan; + int i; release_partition(node); - pfree(node->perfunc); - pfree(node->peragg); - ExecClearTuple(node->ss.ss_ScanTupleSlot); ExecClearTuple(node->first_part_slot); ExecClearTuple(node->agg_row_slot); @@ -1676,9 +2019,17 @@ ExecEndWindowAgg(WindowAggState *node) node->ss.ps.ps_ExprContext = node->tmpcontext; ExecFreeExprContext(&node->ss.ps); + for (i = 0; i < node->numaggs; i++) + { + if (node->peragg[i].aggcontext != node->aggcontext) + MemoryContextDelete(node->peragg[i].aggcontext); + } MemoryContextDelete(node->partcontext); MemoryContextDelete(node->aggcontext); + pfree(node->perfunc); + pfree(node->peragg); + outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -1733,10 +2084,13 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; + AttrNumber initvalAttNo; AclResult aclresult; Oid transfn_oid, + invtransfn_oid, finalfn_oid; Expr *transfnexpr, + *invtransfnexpr, *finalfnexpr; Datum textInitVal; int i; @@ -1757,13 +2111,39 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); /* + * Figure out whether we want to use the moving-aggregate implementation, + * and collect the right set of fields from the pg_aggregate entry. + * + * If the frame head can't move, we don't need moving-aggregate code. Even + * if we'd like to use it, don't do so if the aggregate's arguments (and + * FILTER clause if any) contain any calls to volatile functions. + * Otherwise, the difference between restarting and not restarting the + * aggregation would be user-visible. 
+ */ + if (OidIsValid(aggform->aggminvtransfn) && + !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) && + !contain_volatile_functions((Node *) wfunc)) + { + peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; + aggtranstype = aggform->aggmtranstype; + initvalAttNo = Anum_pg_aggregate_aggminitval; + } + else + { + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + aggtranstype = aggform->aggtranstype; + initvalAttNo = Anum_pg_aggregate_agginitval; + } + + /* * ExecInitWindowAgg already checked permission to call aggregate function * ... but we still need to check the component functions */ - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; @@ -1783,6 +2163,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); InvokeFunctionExecuteHook(transfn_oid); + + if (OidIsValid(invtransfn_oid)) + { + aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, + get_func_name(invtransfn_oid)); + InvokeFunctionExecuteHook(invtransfn_oid); + } + if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -1796,7 +2187,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, /* resolve actual type of transition state, if polymorphic */ aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, - aggform->aggtranstype, + aggtranstype, inputTypes, numArguments); @@ -1810,13 +2201,21 @@ 
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->wintype, wfunc->inputcollid, transfn_oid, + invtransfn_oid, finalfn_oid, &transfnexpr, + &invtransfnexpr, &finalfnexpr); fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); + if (OidIsValid(invtransfn_oid)) + { + fmgr_info(invtransfn_oid, &peraggstate->invtransfn); + fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); + } + if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); @@ -1834,8 +2233,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, &peraggstate->initValueIsNull); if (peraggstate->initValueIsNull) @@ -1848,7 +2246,8 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * If the transfn is strict and the initval is NULL, make sure input type * and transtype are the same (or at least binary-compatible), so that * it's OK to use the first input value as the initial transValue. This - * should have been checked at agg definition time, but just in case... + * should have been checked at agg definition time, but we must check + * again in case the transfn's strictness property has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -1860,6 +2259,44 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid))); } + /* + * Insist that forward and inverse transition functions have the same + * strictness setting. Allowing them to differ would require handling + * more special cases in advance_windowaggregate and + * advance_windowaggregate_base, for no discernible benefit. 
This should + * have been checked at agg definition time, but we must check again in + * case either function's strictness property has been changed. + */ + if (OidIsValid(invtransfn_oid) && + peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + /* + * Moving aggregates use their own aggcontext. + * + * This is necessary because they might restart at different times, so we + * might never be able to reset the shared context otherwise. We can't + * make it the aggregates' responsibility to clean up after themselves, + * because strict aggregates must be restarted whenever we remove their + * last non-NULL input, which the aggregate won't be aware is happening. + * Also, just pfree()ing the transValue upon restarting wouldn't help, + * since we'd miss any indirectly referenced data. We could, in theory, + * make the memory allocation rules for moving aggregates different than + * they have historically been for plain aggregates, but that seems grotty + * and likely to lead to memory leaks. + */ + if (OidIsValid(invtransfn_oid)) + peraggstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg_AggregatePrivate", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + else + peraggstate->aggcontext = winstate->aggcontext; + ReleaseSysCache(aggTuple); return peraggstate; diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 201529b885a..3f307e6464c 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -471,7 +471,11 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context) Assert(aggref->agglevelsup == 0); - /* fetch info about aggregate from pg_aggregate */ + /* + * Fetch info about aggregate from pg_aggregate. 
Note it's correct to + * ignore the moving-aggregate variant, since what we're concerned + * with here is aggregates not window functions. + */ aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index 9613e2aec81..272d27f919e 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -1187,11 +1187,13 @@ resolve_aggregate_transtype(Oid aggfuncid, * For an ordered-set aggregate, remember that agg_input_types describes * the direct arguments followed by the aggregated arguments. * - * transfn_oid and finalfn_oid identify the funcs to be called; the latter - * may be InvalidOid. + * transfn_oid, invtransfn_oid and finalfn_oid identify the funcs to be + * called; the latter two may be InvalidOid. * - * Pointers to the constructed trees are returned into *transfnexpr and - * *finalfnexpr. The latter is set to NULL if there's no finalfn. + * Pointers to the constructed trees are returned into *transfnexpr, + * *invtransfnexpr and *finalfnexpr. If there is no invtransfn or finalfn, + * the respective pointers are set to NULL. Since use of the invtransfn is + * optional, NULL may be passed for invtransfnexpr. 
*/ void build_aggregate_fnexprs(Oid *agg_input_types, @@ -1203,8 +1205,10 @@ build_aggregate_fnexprs(Oid *agg_input_types, Oid agg_result_type, Oid agg_input_collation, Oid transfn_oid, + Oid invtransfn_oid, Oid finalfn_oid, Expr **transfnexpr, + Expr **invtransfnexpr, Expr **finalfnexpr) { Param *argp; @@ -1249,6 +1253,26 @@ build_aggregate_fnexprs(Oid *agg_input_types, fexpr->funcvariadic = agg_variadic; *transfnexpr = (Expr *) fexpr; + /* + * Build invtransfn expression if requested, with same args as transfn + */ + if (invtransfnexpr != NULL) + { + if (OidIsValid(invtransfn_oid)) + { + fexpr = makeFuncExpr(invtransfn_oid, + agg_state_type, + args, + InvalidOid, + agg_input_collation, + COERCE_EXPLICIT_CALL); + fexpr->funcvariadic = agg_variadic; + *invtransfnexpr = (Expr *) fexpr; + } + else + *invtransfnexpr = NULL; + } + /* see if we have a final function */ if (!OidIsValid(finalfn_oid)) { |