Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_aggregate.c241
-rw-r--r--src/backend/commands/aggregatecmds.c101
-rw-r--r--src/backend/executor/nodeAgg.c13
-rw-r--r--src/backend/executor/nodeWindowAgg.c639
-rw-r--r--src/backend/optimizer/util/clauses.c6
-rw-r--r--src/backend/parser/parse_agg.c32
6 files changed, 903 insertions, 129 deletions
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index fe6dc8a9a24..633b8f1d6ac 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -57,10 +57,16 @@ AggregateCreate(const char *aggName,
Oid variadicArgType,
List *aggtransfnName,
List *aggfinalfnName,
+ List *aggmtransfnName,
+ List *aggminvtransfnName,
+ List *aggmfinalfnName,
List *aggsortopName,
Oid aggTransType,
int32 aggTransSpace,
- const char *agginitval)
+ Oid aggmTransType,
+ int32 aggmTransSpace,
+ const char *agginitval,
+ const char *aggminitval)
{
Relation aggdesc;
HeapTuple tup;
@@ -69,14 +75,19 @@ AggregateCreate(const char *aggName,
Form_pg_proc proc;
Oid transfn;
Oid finalfn = InvalidOid; /* can be omitted */
+ Oid mtransfn = InvalidOid; /* can be omitted */
+ Oid minvtransfn = InvalidOid; /* can be omitted */
+ Oid mfinalfn = InvalidOid; /* can be omitted */
Oid sortop = InvalidOid; /* can be omitted */
Oid *aggArgTypes = parameterTypes->values;
bool hasPolyArg;
bool hasInternalArg;
+ bool mtransIsStrict = false;
Oid rettype;
Oid finaltype;
Oid fnArgs[FUNC_MAX_ARGS];
int nargs_transfn;
+ int nargs_finalfn;
Oid procOid;
TupleDesc tupDesc;
int i;
@@ -129,6 +140,16 @@ AggregateCreate(const char *aggName,
errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument.")));
/*
+ * Likewise for moving-aggregate transtype, if any
+ */
+ if (OidIsValid(aggmTransType) &&
+ IsPolymorphicType(aggmTransType) && !hasPolyArg)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("cannot determine transition data type"),
+ errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument.")));
+
+ /*
* An ordered-set aggregate that is VARIADIC must be VARIADIC ANY. In
* principle we could support regular variadic types, but it would make
* things much more complicated because we'd have to assemble the correct
@@ -234,32 +255,120 @@ AggregateCreate(const char *aggName,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type")));
}
+
ReleaseSysCache(tup);
- /* handle finalfn, if supplied */
- if (aggfinalfnName)
+ /* handle moving-aggregate transfn, if supplied */
+ if (aggmtransfnName)
{
- int nargs_finalfn;
+ /*
+ * The arguments are the same as for the regular transfn, except that
+ * the transition data type might be different. So re-use the fnArgs
+ * values set up above, except for that one.
+ */
+ Assert(OidIsValid(aggmTransType));
+ fnArgs[0] = aggmTransType;
+
+ mtransfn = lookup_agg_function(aggmtransfnName, nargs_transfn,
+ fnArgs, variadicArgType,
+ &rettype);
+
+ /* As above, return type must exactly match declared mtranstype. */
+ if (rettype != aggmTransType)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("return type of transition function %s is not %s",
+ NameListToString(aggmtransfnName),
+ format_type_be(aggmTransType))));
+
+ tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(mtransfn));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for function %u", mtransfn);
+ proc = (Form_pg_proc) GETSTRUCT(tup);
/*
- * For ordinary aggs, the finalfn just takes the transtype. For
- * ordered-set aggs, it takes the transtype plus all args. (The
- * aggregated args are useless at runtime, and are actually passed as
- * NULLs, but we may need them in the function signature to allow
- * resolution of a polymorphic agg's result type.)
+ * If the mtransfn is strict and the minitval is NULL, check first
+ * input type and mtranstype are binary-compatible.
*/
- fnArgs[0] = aggTransType;
- if (AGGKIND_IS_ORDERED_SET(aggKind))
+ if (proc->proisstrict && aggminitval == NULL)
{
- nargs_finalfn = numArgs + 1;
- memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid));
- }
- else
- {
- nargs_finalfn = 1;
- /* variadic-ness of the aggregate doesn't affect finalfn */
- variadicArgType = InvalidOid;
+ if (numArgs < 1 ||
+ !IsBinaryCoercible(aggArgTypes[0], aggmTransType))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type")));
}
+
+ /* Remember if mtransfn is strict; we may need this below */
+ mtransIsStrict = proc->proisstrict;
+
+ ReleaseSysCache(tup);
+ }
+
+ /* handle minvtransfn, if supplied */
+ if (aggminvtransfnName)
+ {
+ /*
+ * This must have the same number of arguments with the same types as
+ * the forward transition function, so just re-use the fnArgs data.
+ */
+ Assert(aggmtransfnName);
+
+ minvtransfn = lookup_agg_function(aggminvtransfnName, nargs_transfn,
+ fnArgs, variadicArgType,
+ &rettype);
+
+ /* As above, return type must exactly match declared mtranstype. */
+ if (rettype != aggmTransType)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("return type of inverse transition function %s is not %s",
+ NameListToString(aggminvtransfnName),
+ format_type_be(aggmTransType))));
+
+ tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(minvtransfn));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for function %u", minvtransfn);
+ proc = (Form_pg_proc) GETSTRUCT(tup);
+
+ /*
+ * We require the strictness settings of the forward and inverse
+ * transition functions to agree. This saves having to handle
+ * assorted special cases at execution time.
+ */
+ if (proc->proisstrict != mtransIsStrict)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("strictness of aggregate's forward and inverse transition functions must match")));
+
+ ReleaseSysCache(tup);
+ }
+
+ /*
+ * Set up fnArgs for looking up finalfn(s)
+ *
+ * For ordinary aggs, the finalfn just takes the transtype. For
+ * ordered-set aggs, it takes the transtype plus all args. (The
+ * aggregated args are useless at runtime, and are actually passed as
+ * NULLs, but we may need them in the function signature to allow
+ * resolution of a polymorphic agg's result type.)
+ */
+ fnArgs[0] = aggTransType;
+ if (AGGKIND_IS_ORDERED_SET(aggKind))
+ {
+ nargs_finalfn = numArgs + 1;
+ memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid));
+ }
+ else
+ {
+ nargs_finalfn = 1;
+ /* variadic-ness of the aggregate doesn't affect finalfn */
+ variadicArgType = InvalidOid;
+ }
+
+ /* handle finalfn, if supplied */
+ if (aggfinalfnName)
+ {
finalfn = lookup_agg_function(aggfinalfnName, nargs_finalfn,
fnArgs, variadicArgType,
&finaltype);
@@ -314,6 +423,49 @@ AggregateCreate(const char *aggName,
errmsg("unsafe use of pseudo-type \"internal\""),
errdetail("A function returning \"internal\" must have at least one \"internal\" argument.")));
+ /*
+ * If a moving-aggregate implementation is supplied, look up its finalfn
+ * if any, and check that the implied aggregate result type matches the
+ * plain implementation.
+ */
+ if (OidIsValid(aggmTransType))
+ {
+ /* handle finalfn, if supplied */
+ if (aggmfinalfnName)
+ {
+ /*
+ * The arguments are the same as for the regular finalfn, except
+ * that the transition data type might be different. So re-use
+ * the fnArgs values set up above, except for that one.
+ */
+ fnArgs[0] = aggmTransType;
+
+ mfinalfn = lookup_agg_function(aggmfinalfnName, nargs_finalfn,
+ fnArgs, variadicArgType,
+ &rettype);
+
+ /* As above, check strictness if it's an ordered-set agg */
+ if (AGGKIND_IS_ORDERED_SET(aggKind) && func_strict(mfinalfn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("final function of an ordered-set aggregate must not be declared STRICT")));
+ }
+ else
+ {
+ /*
+ * If no finalfn, aggregate result type is type of the state value
+ */
+ rettype = aggmTransType;
+ }
+ Assert(OidIsValid(rettype));
+ if (rettype != finaltype)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("moving-aggregate implementation returns type %s, but plain implementation returns type %s",
+ format_type_be(aggmTransType),
+ format_type_be(aggTransType))));
+ }
+
/* handle sortop, if supplied */
if (aggsortopName)
{
@@ -340,6 +492,13 @@ AggregateCreate(const char *aggName,
if (aclresult != ACLCHECK_OK)
aclcheck_error_type(aclresult, aggTransType);
+ if (OidIsValid(aggmTransType))
+ {
+ aclresult = pg_type_aclcheck(aggmTransType, GetUserId(), ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error_type(aclresult, aggmTransType);
+ }
+
aclresult = pg_type_aclcheck(finaltype, GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error_type(aclresult, finaltype);
@@ -392,13 +551,22 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs);
values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn);
values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
+ values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
+ values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
+ values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop);
values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType);
values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
+ values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType);
+ values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace);
if (agginitval)
values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval);
else
nulls[Anum_pg_aggregate_agginitval - 1] = true;
+ if (aggminitval)
+ values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval);
+ else
+ nulls[Anum_pg_aggregate_aggminitval - 1] = true;
aggdesc = heap_open(AggregateRelationId, RowExclusiveLock);
tupDesc = aggdesc->rd_att;
@@ -414,6 +582,7 @@ AggregateCreate(const char *aggName,
* Create dependencies for the aggregate (above and beyond those already
* made by ProcedureCreate). Note: we don't need an explicit dependency
* on aggTransType since we depend on it indirectly through transfn.
+ * Likewise for aggmTransType if any.
*/
myself.classId = ProcedureRelationId;
myself.objectId = procOid;
@@ -434,6 +603,33 @@ AggregateCreate(const char *aggName,
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
+ /* Depends on forward transition function, if any */
+ if (OidIsValid(mtransfn))
+ {
+ referenced.classId = ProcedureRelationId;
+ referenced.objectId = mtransfn;
+ referenced.objectSubId = 0;
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+
+ /* Depends on inverse transition function, if any */
+ if (OidIsValid(minvtransfn))
+ {
+ referenced.classId = ProcedureRelationId;
+ referenced.objectId = minvtransfn;
+ referenced.objectSubId = 0;
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+
+ /* Depends on final function, if any */
+ if (OidIsValid(mfinalfn))
+ {
+ referenced.classId = ProcedureRelationId;
+ referenced.objectId = mfinalfn;
+ referenced.objectSubId = 0;
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+
/* Depends on sort operator, if any */
if (OidIsValid(sortop))
{
@@ -447,7 +643,12 @@ AggregateCreate(const char *aggName,
}
/*
- * lookup_agg_function -- common code for finding both transfn and finalfn
+ * lookup_agg_function
+ * common code for finding transfn, invtransfn and finalfn
+ *
+ * Returns OID of function, and stores its return type into *rettype
+ *
+ * NB: must not scribble on input_types[], as we may re-use those
*/
static Oid
lookup_agg_function(List *fnName,
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index 640e19cf120..9714112f6d4 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -61,11 +61,17 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
char aggKind = AGGKIND_NORMAL;
List *transfuncName = NIL;
List *finalfuncName = NIL;
+ List *mtransfuncName = NIL;
+ List *minvtransfuncName = NIL;
+ List *mfinalfuncName = NIL;
List *sortoperatorName = NIL;
TypeName *baseType = NULL;
TypeName *transType = NULL;
+ TypeName *mtransType = NULL;
int32 transSpace = 0;
+ int32 mtransSpace = 0;
char *initval = NULL;
+ char *minitval = NULL;
int numArgs;
int numDirectArgs = 0;
oidvector *parameterTypes;
@@ -75,7 +81,9 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
List *parameterDefaults;
Oid variadicArgType;
Oid transTypeId;
+ Oid mtransTypeId = InvalidOid;
char transTypeType;
+ char mtransTypeType = 0;
ListCell *pl;
/* Convert list of names to a name and namespace */
@@ -114,6 +122,12 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "finalfunc") == 0)
finalfuncName = defGetQualifiedName(defel);
+ else if (pg_strcasecmp(defel->defname, "msfunc") == 0)
+ mtransfuncName = defGetQualifiedName(defel);
+ else if (pg_strcasecmp(defel->defname, "minvfunc") == 0)
+ minvtransfuncName = defGetQualifiedName(defel);
+ else if (pg_strcasecmp(defel->defname, "mfinalfunc") == 0)
+ mfinalfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "sortop") == 0)
sortoperatorName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "basetype") == 0)
@@ -135,10 +149,16 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transType = defGetTypeName(defel);
else if (pg_strcasecmp(defel->defname, "sspace") == 0)
transSpace = defGetInt32(defel);
+ else if (pg_strcasecmp(defel->defname, "mstype") == 0)
+ mtransType = defGetTypeName(defel);
+ else if (pg_strcasecmp(defel->defname, "msspace") == 0)
+ mtransSpace = defGetInt32(defel);
else if (pg_strcasecmp(defel->defname, "initcond") == 0)
initval = defGetString(defel);
else if (pg_strcasecmp(defel->defname, "initcond1") == 0)
initval = defGetString(defel);
+ else if (pg_strcasecmp(defel->defname, "minitcond") == 0)
+ minitval = defGetString(defel);
else
ereport(WARNING,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -159,6 +179,46 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
errmsg("aggregate sfunc must be specified")));
/*
+ * if mtransType is given, mtransfuncName and minvtransfuncName must be as
+ * well; if not, then none of the moving-aggregate options should have
+ * been given.
+ */
+ if (mtransType != NULL)
+ {
+ if (mtransfuncName == NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate msfunc must be specified when mstype is specified")));
+ if (minvtransfuncName == NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate minvfunc must be specified when mstype is specified")));
+ }
+ else
+ {
+ if (mtransfuncName != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate msfunc must not be specified without mstype")));
+ if (minvtransfuncName != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate minvfunc must not be specified without mstype")));
+ if (mfinalfuncName != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate mfinalfunc must not be specified without mstype")));
+ if (mtransSpace != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate msspace must not be specified without mstype")));
+ if (minitval != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate minitcond must not be specified without mstype")));
+ }
+
+ /*
* look up the aggregate's input datatype(s).
*/
if (oldstyle)
@@ -251,6 +311,27 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
}
/*
+ * If a moving-aggregate transtype is specified, look that up. Same
+ * restrictions as for transtype.
+ */
+ if (mtransType)
+ {
+ mtransTypeId = typenameTypeId(NULL, mtransType);
+ mtransTypeType = get_typtype(mtransTypeId);
+ if (mtransTypeType == TYPTYPE_PSEUDO &&
+ !IsPolymorphicType(mtransTypeId))
+ {
+ if (mtransTypeId == INTERNALOID && superuser())
+ /* okay */ ;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate transition data type cannot be %s",
+ format_type_be(mtransTypeId))));
+ }
+ }
+
+ /*
* If we have an initval, and it's not for a pseudotype (particularly a
* polymorphic type), make sure it's acceptable to the type's input
* function. We will store the initval as text, because the input
@@ -269,6 +350,18 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
}
/*
+ * Likewise for moving-aggregate initval.
+ */
+ if (minitval && mtransTypeType != TYPTYPE_PSEUDO)
+ {
+ Oid typinput,
+ typioparam;
+
+ getTypeInputInfo(mtransTypeId, &typinput, &typioparam);
+ (void) OidInputFunctionCall(typinput, minitval, typioparam, -1);
+ }
+
+ /*
* Most of the argument-checking is done inside of AggregateCreate
*/
return AggregateCreate(aggName, /* aggregate name */
@@ -284,8 +377,14 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
variadicArgType,
transfuncName, /* step function name */
finalfuncName, /* final function name */
+ mtransfuncName, /* fwd trans function name */
+ minvtransfuncName, /* inv trans function name */
+ mfinalfuncName, /* final function name */
sortoperatorName, /* sort operator name */
transTypeId, /* transition data type */
transSpace, /* transition space */
- initval); /* initial condition */
+ mtransTypeId, /* transition data type */
+ mtransSpace, /* transition space */
+ initval, /* initial condition */
+ minitval); /* initial condition */
}
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 7e4bca5b4d8..d60845bcd34 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1798,8 +1798,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggref->aggtype,
aggref->inputcollid,
transfn_oid,
+ InvalidOid, /* invtrans is not needed here */
finalfn_oid,
&transfnexpr,
+ NULL,
&finalfnexpr);
/* set up infrastructure for calling the transfn and finalfn */
@@ -1847,7 +1849,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* type and transtype are the same (or at least binary-compatible), so
* that it's OK to use the first aggregated input value as the initial
* transValue. This should have been checked at agg definition time,
- * but just in case...
+ * but we must check again in case the transfn's strictness property
+ * has been changed.
*/
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
{
@@ -2126,6 +2129,12 @@ ExecReScanAgg(AggState *node)
ExecReScan(node->ss.ps.lefttree);
}
+
+/***********************************************************************
+ * API exposed to aggregate functions
+ ***********************************************************************/
+
+
/*
* AggCheckCallContext - test if a SQL function is being called as an aggregate
*
@@ -2152,7 +2161,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
{
if (aggcontext)
- *aggcontext = ((WindowAggState *) fcinfo->context)->aggcontext;
+ *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
return AGG_CONTEXT_WINDOW;
}
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 0b558e59231..046637fb092 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -102,16 +102,18 @@ typedef struct WindowStatePerFuncData
*/
typedef struct WindowStatePerAggData
{
- /* Oids of transfer functions */
+ /* Oids of transition functions */
Oid transfn_oid;
+ Oid invtransfn_oid; /* may be InvalidOid */
Oid finalfn_oid; /* may be InvalidOid */
/*
- * fmgr lookup data for transfer functions --- only valid when
+ * fmgr lookup data for transition functions --- only valid when
* corresponding oid is not InvalidOid. Note in particular that fn_strict
* flags are kept here.
*/
FmgrInfo transfn;
+ FmgrInfo invtransfn;
FmgrInfo finalfn;
/*
@@ -139,11 +141,17 @@ typedef struct WindowStatePerAggData
int wfuncno; /* index of associated PerFuncData */
+ /* Context holding transition value and possibly other subsidiary data */
+ MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
+
/* Current transition value */
Datum transValue; /* current transition value */
bool transValueIsNull;
- bool noTransValue; /* true if transValue not set yet */
+ int64 transValueCount; /* number of currently-aggregated rows */
+
+ /* Data local to eval_windowaggregates() */
+ bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
static void initialize_windowaggregate(WindowAggState *winstate,
@@ -152,6 +160,9 @@ static void initialize_windowaggregate(WindowAggState *winstate,
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
+static bool advance_windowaggregate_base(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate);
static void finalize_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate,
@@ -193,18 +204,27 @@ initialize_windowaggregate(WindowAggState *winstate,
{
MemoryContext oldContext;
+ /*
+ * If we're using a private aggcontext, we may reset it here. But if the
+ * context is shared, we don't know which other aggregates may still need
+ * it, so we must leave it to the caller to reset at an appropriate time.
+ */
+ if (peraggstate->aggcontext != winstate->aggcontext)
+ MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
+
if (peraggstate->initValueIsNull)
peraggstate->transValue = peraggstate->initValue;
else
{
- oldContext = MemoryContextSwitchTo(winstate->aggcontext);
+ oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
peraggstate->transValue = datumCopy(peraggstate->initValue,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
MemoryContextSwitchTo(oldContext);
}
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
- peraggstate->noTransValue = peraggstate->initValueIsNull;
+ peraggstate->transValueCount = 0;
+ peraggstate->resultValue = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@@ -258,7 +278,8 @@ advance_windowaggregate(WindowAggState *winstate,
{
/*
* For a strict transfn, nothing happens when there's a NULL input; we
- * just keep the prior transValue.
+ * just keep the prior transValue. Note transValueCount doesn't
+ * change either.
*/
for (i = 1; i <= numArguments; i++)
{
@@ -268,41 +289,47 @@ advance_windowaggregate(WindowAggState *winstate,
return;
}
}
- if (peraggstate->noTransValue)
+
+ /*
+ * For strict transition functions with initial value NULL we use the
+ * first non-NULL input as the initial state. (We already checked
+ * that the agg's input type is binary-compatible with its transtype,
+ * so straight copy here is OK.)
+ *
+ * We must copy the datum into aggcontext if it is pass-by-ref. We do
+ * not need to pfree the old transValue, since it's NULL.
+ */
+ if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
{
- /*
- * transValue has not been initialized. This is the first non-NULL
- * input value. We use it as the initial value for transValue. (We
- * already checked that the agg's input type is binary-compatible
- * with its transtype, so straight copy here is OK.)
- *
- * We must copy the datum into aggcontext if it is pass-by-ref. We
- * do not need to pfree the old transValue, since it's NULL.
- */
- MemoryContextSwitchTo(winstate->aggcontext);
+ MemoryContextSwitchTo(peraggstate->aggcontext);
peraggstate->transValue = datumCopy(fcinfo->arg[1],
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
peraggstate->transValueIsNull = false;
- peraggstate->noTransValue = false;
+ peraggstate->transValueCount = 1;
MemoryContextSwitchTo(oldContext);
return;
}
+
if (peraggstate->transValueIsNull)
{
/*
* Don't call a strict function with NULL inputs. Note it is
* possible to get here despite the above tests, if the transfn is
- * strict *and* returned a NULL on a prior cycle. If that happens
- * we will propagate the NULL all the way to the end.
+ * strict *and* returned a NULL on a prior cycle. If that happens
+ * we will propagate the NULL all the way to the end. That can
+ * only happen if there's no inverse transition function, though,
+ * since we disallow transitions back to NULL when there is one.
*/
MemoryContextSwitchTo(oldContext);
+ Assert(!OidIsValid(peraggstate->invtransfn_oid));
return;
}
}
/*
- * OK to call the transition function
+ * OK to call the transition function. Set winstate->curaggcontext while
+ * calling it, for possible use by AggCheckCallContext.
*/
InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
numArguments + 1,
@@ -310,7 +337,26 @@ advance_windowaggregate(WindowAggState *winstate,
(void *) winstate, NULL);
fcinfo->arg[0] = peraggstate->transValue;
fcinfo->argnull[0] = peraggstate->transValueIsNull;
+ winstate->curaggcontext = peraggstate->aggcontext;
newVal = FunctionCallInvoke(fcinfo);
+ winstate->curaggcontext = NULL;
+
+ /*
+ * Moving-aggregate transition functions must not return NULL, see
+ * advance_windowaggregate_base().
+ */
+ if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("moving-aggregate transition function must not return NULL")));
+
+ /*
+ * We must track the number of rows included in transValue, since to
+ * remove the last input, advance_windowaggregate_base() musn't call the
+ * inverse transition function, but simply reset transValue back to its
+ * initial value.
+ */
+ peraggstate->transValueCount++;
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
@@ -322,7 +368,161 @@ advance_windowaggregate(WindowAggState *winstate,
{
if (!fcinfo->isnull)
{
- MemoryContextSwitchTo(winstate->aggcontext);
+ MemoryContextSwitchTo(peraggstate->aggcontext);
+ newVal = datumCopy(newVal,
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ }
+ if (!peraggstate->transValueIsNull)
+ pfree(DatumGetPointer(peraggstate->transValue));
+ }
+
+ MemoryContextSwitchTo(oldContext);
+ peraggstate->transValue = newVal;
+ peraggstate->transValueIsNull = fcinfo->isnull;
+}
+
+/*
+ * advance_windowaggregate_base
+ * Remove the oldest tuple from an aggregation.
+ *
+ * This is very much like advance_windowaggregate, except that we will call
+ * the inverse transition function (which caller must have checked is
+ * available).
+ *
+ * Returns true if we successfully removed the current row from this
+ * aggregate, false if not (in the latter case, caller is responsible
+ * for cleaning up by restarting the aggregation).
+ */
+static bool
+advance_windowaggregate_base(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate)
+{
+ WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
+ int numArguments = perfuncstate->numArguments;
+ FunctionCallInfoData fcinfodata;
+ FunctionCallInfo fcinfo = &fcinfodata;
+ Datum newVal;
+ ListCell *arg;
+ int i;
+ MemoryContext oldContext;
+ ExprContext *econtext = winstate->tmpcontext;
+ ExprState *filter = wfuncstate->aggfilter;
+
+ oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+ /* Skip anything FILTERed out */
+ if (filter)
+ {
+ bool isnull;
+ Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL);
+
+ if (isnull || !DatumGetBool(res))
+ {
+ MemoryContextSwitchTo(oldContext);
+ return true;
+ }
+ }
+
+ /* We start from 1, since the 0th arg will be the transition value */
+ i = 1;
+ foreach(arg, wfuncstate->args)
+ {
+ ExprState *argstate = (ExprState *) lfirst(arg);
+
+ fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
+ &fcinfo->argnull[i], NULL);
+ i++;
+ }
+
+ if (peraggstate->invtransfn.fn_strict)
+ {
+ /*
+ * For a strict (inv)transfn, nothing happens when there's a NULL
+ * input; we just keep the prior transValue. Note transValueCount
+ * doesn't change either.
+ */
+ for (i = 1; i <= numArguments; i++)
+ {
+ if (fcinfo->argnull[i])
+ {
+ MemoryContextSwitchTo(oldContext);
+ return true;
+ }
+ }
+ }
+
+ /* There should still be an added but not yet removed value */
+ Assert(peraggstate->transValueCount > 0);
+
+ /*
+ * In moving-aggregate mode, the state must never be NULL, except possibly
+ * before any rows have been aggregated (which is surely not the case at
+ * this point). This restriction allows us to interpret a NULL result
+ * from the inverse function as meaning "sorry, can't do an inverse
+ * transition in this case". We already checked this in
+ * advance_windowaggregate, but just for safety, check again.
+ */
+ if (peraggstate->transValueIsNull)
+ elog(ERROR, "aggregate transition value is NULL before inverse transition");
+
+ /*
+ * We mustn't use the inverse transition function to remove the last
+ * input. Doing so would yield a non-NULL state, whereas we should be in
+ * the initial state afterwards which may very well be NULL. So instead,
+ * we simply re-initialize the aggregate in this case.
+ */
+ if (peraggstate->transValueCount == 1)
+ {
+ MemoryContextSwitchTo(oldContext);
+ initialize_windowaggregate(winstate,
+ &winstate->perfunc[peraggstate->wfuncno],
+ peraggstate);
+ return true;
+ }
+
+ /*
+ * OK to call the inverse transition function. Set
+ * winstate->curaggcontext while calling it, for possible use by
+ * AggCheckCallContext.
+ */
+ InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
+ numArguments + 1,
+ perfuncstate->winCollation,
+ (void *) winstate, NULL);
+ fcinfo->arg[0] = peraggstate->transValue;
+ fcinfo->argnull[0] = peraggstate->transValueIsNull;
+ winstate->curaggcontext = peraggstate->aggcontext;
+ newVal = FunctionCallInvoke(fcinfo);
+ winstate->curaggcontext = NULL;
+
+ /*
+ * If the function returns NULL, report failure, forcing a restart.
+ */
+ if (fcinfo->isnull)
+ {
+ MemoryContextSwitchTo(oldContext);
+ return false;
+ }
+
+ /* Update number of rows included in transValue */
+ peraggstate->transValueCount--;
+
+ /*
+ * If pass-by-ref datatype, must copy the new value into aggcontext and
+ * pfree the prior transValue. But if invtransfn returned a pointer to
+ * its first input, we don't need to do anything.
+ *
+ * Note: the checks for null values here will never fire, but it seems
+ * best to have this stanza look just like advance_windowaggregate.
+ */
+ if (!peraggstate->transtypeByVal &&
+ DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
+ {
+ if (!fcinfo->isnull)
+ {
+ MemoryContextSwitchTo(peraggstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -334,6 +534,8 @@ advance_windowaggregate(WindowAggState *winstate,
MemoryContextSwitchTo(oldContext);
peraggstate->transValue = newVal;
peraggstate->transValueIsNull = fcinfo->isnull;
+
+ return true;
}
/*
@@ -370,7 +572,9 @@ finalize_windowaggregate(WindowAggState *winstate,
}
else
{
+ winstate->curaggcontext = peraggstate->aggcontext;
*result = FunctionCallInvoke(&fcinfo);
+ winstate->curaggcontext = NULL;
*isnull = fcinfo.isnull;
}
}
@@ -396,7 +600,9 @@ finalize_windowaggregate(WindowAggState *winstate,
* eval_windowaggregates
* evaluate plain aggregates being used as window functions
*
- * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be
+ * This differs from nodeAgg.c in two ways. First, if the window's frame
+ * start position moves, we use the inverse transition function (if it exists)
+ * to remove rows from the transition value. And second, we expect to be
* able to call aggregate final functions repeatedly after aggregating more
* data onto the same transition value. This is not a behavior required by
* nodeAgg.c.
@@ -406,12 +612,15 @@ eval_windowaggregates(WindowAggState *winstate)
{
WindowStatePerAgg peraggstate;
int wfuncno,
- numaggs;
- int i;
+ numaggs,
+ numaggs_restart,
+ i;
+ int64 aggregatedupto_nonrestarted;
MemoryContext oldContext;
ExprContext *econtext;
WindowObject agg_winobj;
TupleTableSlot *agg_row_slot;
+ TupleTableSlot *temp_slot;
numaggs = winstate->numaggs;
if (numaggs == 0)
@@ -421,6 +630,7 @@ eval_windowaggregates(WindowAggState *winstate)
econtext = winstate->ss.ps.ps_ExprContext;
agg_winobj = winstate->agg_winobj;
agg_row_slot = winstate->agg_row_slot;
+ temp_slot = winstate->temp_slot_1;
/*
* Currently, we support only a subset of the SQL-standard window framing
@@ -438,9 +648,17 @@ eval_windowaggregates(WindowAggState *winstate)
* damage the running transition value, but we have the same assumption in
* nodeAgg.c too (when it rescans an existing hash table).
*
- * For other frame start rules, we discard the aggregate state and re-run
- * the aggregates whenever the frame head row moves. We can still
- * optimize as above whenever successive rows share the same frame head.
+ * If the frame start does sometimes move, we can still optimize as above
+ * whenever successive rows share the same frame head, but if the frame
+ * head moves beyond the previous head we try to remove those rows using
+ * the aggregate's inverse transition function. This function restores
+ * the aggregate's current state to what it would be if the removed row
+ * had never been aggregated in the first place. Inverse transition
+ * functions may optionally return NULL, indicating that the function was
+ * unable to remove the tuple from aggregation. If this happens, or if
+ * the aggregate doesn't have an inverse transition function at all, we
+ * must perform the aggregation all over again for all tuples within the
+ * new frame boundaries.
*
* In many common cases, multiple rows share the same frame and hence the
* same aggregate value. (In particular, if there's no ORDER BY in a RANGE
@@ -452,75 +670,195 @@ eval_windowaggregates(WindowAggState *winstate)
* 'aggregatedupto' keeps track of the first row that has not yet been
* accumulated into the aggregate transition values. Whenever we start a
* new peer group, we accumulate forward to the end of the peer group.
- *
- * TODO: Rerunning aggregates from the frame start can be pretty slow. For
- * some aggregates like SUM and COUNT we could avoid that by implementing
- * a "negative transition function" that would be called for each row as
- * it exits the frame. We'd have to think about avoiding recalculation of
- * volatile arguments of aggregate functions, too.
*/
/*
* First, update the frame head position.
+ *
+ * The frame head should never move backwards, and the code below wouldn't
+ * cope if it did, so for safety we complain if it does.
*/
- update_frameheadpos(agg_winobj, winstate->temp_slot_1);
+ update_frameheadpos(agg_winobj, temp_slot);
+ if (winstate->frameheadpos < winstate->aggregatedbase)
+ elog(ERROR, "window frame head moved backward");
/*
- * Initialize aggregates on first call for partition, or if the frame head
- * position moved since last time.
+ * If the frame didn't change compared to the previous row, we can re-use
+ * the result values that were previously saved at the bottom of this
+ * function. Since we don't know the current frame's end yet, this is not
+ * possible to check for fully. But if the frame end mode is UNBOUNDED
+ * FOLLOWING or CURRENT ROW, and the current row lies within the previous
+ * row's frame, then the two frames' ends must coincide. Note that on the
+ * first row aggregatedbase == aggregatedupto, meaning this test must
+ * fail, so we don't need to check the "there was no previous row" case
+ * explicitly here.
*/
- if (winstate->currentpos == 0 ||
- winstate->frameheadpos != winstate->aggregatedbase)
+ if (winstate->aggregatedbase == winstate->frameheadpos &&
+ (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
+ FRAMEOPTION_END_CURRENT_ROW)) &&
+ winstate->aggregatedbase <= winstate->currentpos &&
+ winstate->aggregatedupto > winstate->currentpos)
{
- /*
- * Discard transient aggregate values
- */
- MemoryContextResetAndDeleteChildren(winstate->aggcontext);
-
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
- initialize_windowaggregate(winstate,
- &winstate->perfunc[wfuncno],
- peraggstate);
+ econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
+ econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
}
+ return;
+ }
+ /*----------
+ * Initialize restart flags.
+ *
+ * We restart the aggregation:
+ * - if we're processing the first row in the partition, or
+ * - if the frame's head moved and we cannot use an inverse
+ * transition function, or
+ * - if the new frame doesn't overlap the old one
+ *
+ * Note that we don't strictly need to restart in the last case, but if
+ * we're going to remove all rows from the aggregation anyway, a restart
+ * surely is faster.
+ *----------
+ */
+ numaggs_restart = 0;
+ for (i = 0; i < numaggs; i++)
+ {
+ peraggstate = &winstate->peragg[i];
+ if (winstate->currentpos == 0 ||
+ (winstate->aggregatedbase != winstate->frameheadpos &&
+ !OidIsValid(peraggstate->invtransfn_oid)) ||
+ winstate->aggregatedupto <= winstate->frameheadpos)
+ {
+ peraggstate->restart = true;
+ numaggs_restart++;
+ }
+ else
+ peraggstate->restart = false;
+ }
+
+ /*
+ * If we have any possibly-moving aggregates, attempt to advance
+ * aggregatedbase to match the frame's head by removing input rows that
+ * fell off the top of the frame from the aggregations. This can fail,
+ * i.e. advance_windowaggregate_base() can return false, in which case
+ * we'll restart that aggregate below.
+ */
+ while (numaggs_restart < numaggs &&
+ winstate->aggregatedbase < winstate->frameheadpos)
+ {
/*
- * If we created a mark pointer for aggregates, keep it pushed up to
- * frame head, so that tuplestore can discard unnecessary rows.
+ * Fetch the next tuple of those being removed. This should never fail
+ * as we should have been here before.
*/
- if (agg_winobj->markptr >= 0)
- WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
+ if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
+ temp_slot))
+ elog(ERROR, "could not re-fetch previously fetched frame row");
+
+ /* Set tuple context for evaluation of aggregate arguments */
+ winstate->tmpcontext->ecxt_outertuple = temp_slot;
/*
- * Initialize for loop below
+ * Perform the inverse transition for each aggregate function in the
+ * window, unless it has already been marked as needing a restart.
*/
- ExecClearTuple(agg_row_slot);
- winstate->aggregatedbase = winstate->frameheadpos;
- winstate->aggregatedupto = winstate->frameheadpos;
+ for (i = 0; i < numaggs; i++)
+ {
+ bool ok;
+
+ peraggstate = &winstate->peragg[i];
+ if (peraggstate->restart)
+ continue;
+
+ wfuncno = peraggstate->wfuncno;
+ ok = advance_windowaggregate_base(winstate,
+ &winstate->perfunc[wfuncno],
+ peraggstate);
+ if (!ok)
+ {
+ /* Inverse transition function has failed, must restart */
+ peraggstate->restart = true;
+ numaggs_restart++;
+ }
+ }
+
+ /* Reset per-input-tuple context after each tuple */
+ ResetExprContext(winstate->tmpcontext);
+
+ /* And advance the aggregated-row state */
+ winstate->aggregatedbase++;
+ ExecClearTuple(temp_slot);
}
/*
- * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
- * except when the frame head moves. In END_CURRENT_ROW mode, we only
- * have to recalculate when the frame head moves or currentpos has
- * advanced past the place we'd aggregated up to. Check for these cases
- * and if so, reuse the saved result values.
+ * If we successfully advanced the base rows of all the aggregates,
+ * aggregatedbase now equals frameheadpos; but if we failed for any, we
+ * must forcibly update aggregatedbase.
*/
- if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
- FRAMEOPTION_END_CURRENT_ROW)) &&
- winstate->aggregatedbase <= winstate->currentpos &&
- winstate->aggregatedupto > winstate->currentpos)
+ winstate->aggregatedbase = winstate->frameheadpos;
+
+ /*
+ * If we created a mark pointer for aggregates, keep it pushed up to frame
+ * head, so that tuplestore can discard unnecessary rows.
+ */
+ if (agg_winobj->markptr >= 0)
+ WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
+
+ /*
+ * Now restart the aggregates that require it.
+ *
+ * We assume that aggregates using the shared context always restart if
+ * *any* aggregate restarts, and we may thus clean up the shared
+ * aggcontext if that is the case. Private aggcontexts are reset by
+ * initialize_windowaggregate() if their owning aggregate restarts. If we
+ * aren't restarting an aggregate, we need to free any previously saved
+ * result for it, else we'll leak memory.
+ */
+ if (numaggs_restart > 0)
+ MemoryContextResetAndDeleteChildren(winstate->aggcontext);
+ for (i = 0; i < numaggs; i++)
{
- for (i = 0; i < numaggs; i++)
+ peraggstate = &winstate->peragg[i];
+
+ /* Aggregates using the shared ctx must restart if *any* agg does */
+ Assert(peraggstate->aggcontext != winstate->aggcontext ||
+ numaggs_restart == 0 ||
+ peraggstate->restart);
+
+ if (peraggstate->restart)
{
- peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
- econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
- econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
+ initialize_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ peraggstate);
+ }
+ else if (!peraggstate->resultValueIsNull)
+ {
+ if (!peraggstate->resulttypeByVal)
+ pfree(DatumGetPointer(peraggstate->resultValue));
+ peraggstate->resultValue = (Datum) 0;
+ peraggstate->resultValueIsNull = true;
}
- return;
+ }
+
+ /*
+ * Non-restarted aggregates now contain the rows between aggregatedbase
+ * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
+ * contain no rows. If there are any restarted aggregates, we must thus
+ * begin aggregating anew at frameheadpos, otherwise we may simply
+ * continue at aggregatedupto. We must remember the old value of
+ * aggregatedupto to know how long to skip advancing non-restarted
+ * aggregates. If we modify aggregatedupto, we must also clear
+ * agg_row_slot, per the loop invariant below.
+ */
+ aggregatedupto_nonrestarted = winstate->aggregatedupto;
+ if (numaggs_restart > 0 &&
+ winstate->aggregatedupto != winstate->frameheadpos)
+ {
+ winstate->aggregatedupto = winstate->frameheadpos;
+ ExecClearTuple(agg_row_slot);
}
/*
@@ -551,6 +889,12 @@ eval_windowaggregates(WindowAggState *winstate)
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
+
+ /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
+ if (!peraggstate->restart &&
+ winstate->aggregatedupto < aggregatedupto_nonrestarted)
+ continue;
+
wfuncno = peraggstate->wfuncno;
advance_windowaggregate(winstate,
&winstate->perfunc[wfuncno],
@@ -565,6 +909,9 @@ eval_windowaggregates(WindowAggState *winstate)
ExecClearTuple(agg_row_slot);
}
+ /* The frame's end is not supposed to move backwards, ever */
+ Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
+
/*
* finalize aggregates and fill result/isnull fields.
*/
@@ -589,28 +936,14 @@ eval_windowaggregates(WindowAggState *winstate)
* advance that the next row can't possibly share the same frame. Is
* it worth detecting that and skipping this code?
*/
- if (!peraggstate->resulttypeByVal)
+ if (!peraggstate->resulttypeByVal && !*isnull)
{
- /*
- * clear old resultValue in order not to leak memory. (Note: the
- * new result can't possibly be the same datum as old resultValue,
- * because we never passed it to the trans function.)
- */
- if (!peraggstate->resultValueIsNull)
- pfree(DatumGetPointer(peraggstate->resultValue));
-
- /*
- * If pass-by-ref, copy it into our aggregate context.
- */
- if (!*isnull)
- {
- oldContext = MemoryContextSwitchTo(winstate->aggcontext);
- peraggstate->resultValue =
- datumCopy(*result,
- peraggstate->resulttypeByVal,
- peraggstate->resulttypeLen);
- MemoryContextSwitchTo(oldContext);
- }
+ oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
+ peraggstate->resultValue =
+ datumCopy(*result,
+ peraggstate->resulttypeByVal,
+ peraggstate->resulttypeLen);
+ MemoryContextSwitchTo(oldContext);
}
else
{
@@ -650,6 +983,8 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
(void *) perfuncstate->winobj, NULL);
/* Just in case, make all the regular argument slots be null */
memset(fcinfo.argnull, true, perfuncstate->numArguments);
+ /* Window functions don't have a current aggregate context, either */
+ winstate->curaggcontext = NULL;
*result = FunctionCallInvoke(&fcinfo);
*isnull = fcinfo.isnull;
@@ -870,6 +1205,11 @@ release_partition(WindowAggState *winstate)
*/
MemoryContextResetAndDeleteChildren(winstate->partcontext);
MemoryContextResetAndDeleteChildren(winstate->aggcontext);
+ for (i = 0; i < winstate->numaggs; i++)
+ {
+ if (winstate->peragg[i].aggcontext != winstate->aggcontext)
+ MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
+ }
if (winstate->buffer)
tuplestore_end(winstate->buffer);
@@ -1450,7 +1790,12 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
- /* Create mid-lived context for aggregate trans values etc */
+ /*
+ * Create mid-lived context for aggregate trans values etc.
+ *
+ * Note that moving aggregates each use their own private context, not
+ * this one.
+ */
winstate->aggcontext =
AllocSetContextCreate(CurrentMemoryContext,
"WindowAgg_Aggregates",
@@ -1657,12 +2002,10 @@ void
ExecEndWindowAgg(WindowAggState *node)
{
PlanState *outerPlan;
+ int i;
release_partition(node);
- pfree(node->perfunc);
- pfree(node->peragg);
-
ExecClearTuple(node->ss.ss_ScanTupleSlot);
ExecClearTuple(node->first_part_slot);
ExecClearTuple(node->agg_row_slot);
@@ -1676,9 +2019,17 @@ ExecEndWindowAgg(WindowAggState *node)
node->ss.ps.ps_ExprContext = node->tmpcontext;
ExecFreeExprContext(&node->ss.ps);
+ for (i = 0; i < node->numaggs; i++)
+ {
+ if (node->peragg[i].aggcontext != node->aggcontext)
+ MemoryContextDelete(node->peragg[i].aggcontext);
+ }
MemoryContextDelete(node->partcontext);
MemoryContextDelete(node->aggcontext);
+ pfree(node->perfunc);
+ pfree(node->peragg);
+
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
}
@@ -1733,10 +2084,13 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
HeapTuple aggTuple;
Form_pg_aggregate aggform;
Oid aggtranstype;
+ AttrNumber initvalAttNo;
AclResult aclresult;
Oid transfn_oid,
+ invtransfn_oid,
finalfn_oid;
Expr *transfnexpr,
+ *invtransfnexpr,
*finalfnexpr;
Datum textInitVal;
int i;
@@ -1757,13 +2111,39 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
/*
+ * Figure out whether we want to use the moving-aggregate implementation,
+ * and collect the right set of fields from the pg_attribute entry.
+ *
+ * If the frame head can't move, we don't need moving-aggregate code. Even
+ * if we'd like to use it, don't do so if the aggregate's arguments (and
+ * FILTER clause if any) contain any calls to volatile functions.
+ * Otherwise, the difference between restarting and not restarting the
+ * aggregation would be user-visible.
+ */
+ if (OidIsValid(aggform->aggminvtransfn) &&
+ !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) &&
+ !contain_volatile_functions((Node *) wfunc))
+ {
+ peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
+ peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
+ peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
+ aggtranstype = aggform->aggmtranstype;
+ initvalAttNo = Anum_pg_aggregate_aggminitval;
+ }
+ else
+ {
+ peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
+ peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
+ peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
+ aggtranstype = aggform->aggtranstype;
+ initvalAttNo = Anum_pg_aggregate_agginitval;
+ }
+
+ /*
* ExecInitWindowAgg already checked permission to call aggregate function
* ... but we still need to check the component functions
*/
- peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
- peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
-
/* Check that aggregate owner has permission to call component fns */
{
HeapTuple procTuple;
@@ -1783,6 +2163,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(transfn_oid));
InvokeFunctionExecuteHook(transfn_oid);
+
+ if (OidIsValid(invtransfn_oid))
+ {
+ aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
+ ACL_EXECUTE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_PROC,
+ get_func_name(invtransfn_oid));
+ InvokeFunctionExecuteHook(invtransfn_oid);
+ }
+
if (OidIsValid(finalfn_oid))
{
aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
@@ -1796,7 +2187,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
/* resolve actual type of transition state, if polymorphic */
aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
- aggform->aggtranstype,
+ aggtranstype,
inputTypes,
numArguments);
@@ -1810,13 +2201,21 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
wfunc->wintype,
wfunc->inputcollid,
transfn_oid,
+ invtransfn_oid,
finalfn_oid,
&transfnexpr,
+ &invtransfnexpr,
&finalfnexpr);
fmgr_info(transfn_oid, &peraggstate->transfn);
fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
+ if (OidIsValid(invtransfn_oid))
+ {
+ fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
+ fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
+ }
+
if (OidIsValid(finalfn_oid))
{
fmgr_info(finalfn_oid, &peraggstate->finalfn);
@@ -1834,8 +2233,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
* initval is potentially null, so don't try to access it as a struct
* field. Must do it the hard way with SysCacheGetAttr.
*/
- textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
- Anum_pg_aggregate_agginitval,
+ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
&peraggstate->initValueIsNull);
if (peraggstate->initValueIsNull)
@@ -1848,7 +2246,8 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
* If the transfn is strict and the initval is NULL, make sure input type
* and transtype are the same (or at least binary-compatible), so that
* it's OK to use the first input value as the initial transValue. This
- * should have been checked at agg definition time, but just in case...
+ * should have been checked at agg definition time, but we must check
+ * again in case the transfn's strictness property has been changed.
*/
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
{
@@ -1860,6 +2259,44 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
wfunc->winfnoid)));
}
+ /*
+ * Insist that forward and inverse transition functions have the same
+ * strictness setting. Allowing them to differ would require handling
+ * more special cases in advance_windowaggregate and
+ * advance_windowaggregate_base, for no discernible benefit. This should
+ * have been checked at agg definition time, but we must check again in
+ * case either function's strictness property has been changed.
+ */
+ if (OidIsValid(invtransfn_oid) &&
+ peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("strictness of aggregate's forward and inverse transition functions must match")));
+
+ /*
+ * Moving aggregates use their own aggcontext.
+ *
+ * This is necessary because they might restart at different times, so we
+ * might never be able to reset the shared context otherwise. We can't
+ * make it the aggregates' responsibility to clean up after themselves,
+ * because strict aggregates must be restarted whenever we remove their
+ * last non-NULL input, which the aggregate won't be aware is happening.
+ * Also, just pfree()ing the transValue upon restarting wouldn't help,
+ * since we'd miss any indirectly referenced data. We could, in theory,
+ * make the memory allocation rules for moving aggregates different than
+ * they have historically been for plain aggregates, but that seems grotty
+ * and likely to lead to memory leaks.
+ */
+ if (OidIsValid(invtransfn_oid))
+ peraggstate->aggcontext =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "WindowAgg_AggregatePrivate",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ else
+ peraggstate->aggcontext = winstate->aggcontext;
+
ReleaseSysCache(aggTuple);
return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 201529b885a..3f307e6464c 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -471,7 +471,11 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
Assert(aggref->agglevelsup == 0);
- /* fetch info about aggregate from pg_aggregate */
+ /*
+ * Fetch info about aggregate from pg_aggregate. Note it's correct to
+ * ignore the moving-aggregate variant, since what we're concerned
+ * with here is aggregates not window functions.
+ */
aggTuple = SearchSysCache1(AGGFNOID,
ObjectIdGetDatum(aggref->aggfnoid));
if (!HeapTupleIsValid(aggTuple))
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 9613e2aec81..272d27f919e 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1187,11 +1187,13 @@ resolve_aggregate_transtype(Oid aggfuncid,
* For an ordered-set aggregate, remember that agg_input_types describes
* the direct arguments followed by the aggregated arguments.
*
- * transfn_oid and finalfn_oid identify the funcs to be called; the latter
- * may be InvalidOid.
+ * transfn_oid, invtransfn_oid and finalfn_oid identify the funcs to be
+ * called; the latter two may be InvalidOid.
*
- * Pointers to the constructed trees are returned into *transfnexpr and
- * *finalfnexpr. The latter is set to NULL if there's no finalfn.
+ * Pointers to the constructed trees are returned into *transfnexpr,
+ * *invtransfnexpr and *finalfnexpr. If there is no invtransfn or finalfn,
+ * the respective pointers are set to NULL. Since use of the invtransfn is
+ * optional, NULL may be passed for invtransfnexpr.
*/
void
build_aggregate_fnexprs(Oid *agg_input_types,
@@ -1203,8 +1205,10 @@ build_aggregate_fnexprs(Oid *agg_input_types,
Oid agg_result_type,
Oid agg_input_collation,
Oid transfn_oid,
+ Oid invtransfn_oid,
Oid finalfn_oid,
Expr **transfnexpr,
+ Expr **invtransfnexpr,
Expr **finalfnexpr)
{
Param *argp;
@@ -1249,6 +1253,26 @@ build_aggregate_fnexprs(Oid *agg_input_types,
fexpr->funcvariadic = agg_variadic;
*transfnexpr = (Expr *) fexpr;
+ /*
+ * Build invtransfn expression if requested, with same args as transfn
+ */
+ if (invtransfnexpr != NULL)
+ {
+ if (OidIsValid(invtransfn_oid))
+ {
+ fexpr = makeFuncExpr(invtransfn_oid,
+ agg_state_type,
+ args,
+ InvalidOid,
+ agg_input_collation,
+ COERCE_EXPLICIT_CALL);
+ fexpr->funcvariadic = agg_variadic;
+ *invtransfnexpr = (Expr *) fexpr;
+ }
+ else
+ *invtransfnexpr = NULL;
+ }
+
/* see if we have a final function */
if (!OidIsValid(finalfn_oid))
{