@@ -595,6 +595,53 @@ static bool is_vops_type(Oid typeid)
595
595
PG_RETURN_POINTER(state); \
596
596
}
597
597
598
+
599
+ static void
600
+ vops_avg_state_accumulate (vops_avg_state * state , float8 newval )
601
+ {
602
+ float8 N ,
603
+ Sx ,
604
+ Sxx ,
605
+ tmp ;
606
+
607
+ N = state -> N ;
608
+ Sx = state -> Sx ;
609
+ Sxx = state -> Sxx ;
610
+
611
+ /*
612
+ * Use the Youngs-Cramer algorithm to incorporate the new value into the
613
+ * transition values.
614
+ */
615
+ N += 1.0 ;
616
+ Sx += newval ;
617
+ if (state -> N > 0.0 )
618
+ {
619
+ tmp = newval * N - Sx ;
620
+ Sxx += tmp * tmp / (N * state -> N );
621
+
622
+ /*
623
+ * Overflow check. We only report an overflow error when finite
624
+ * inputs lead to infinite results. Note also that Sxx should be NaN
625
+ * if any of the inputs are infinite, so we intentionally prevent Sxx
626
+ * from becoming infinite.
627
+ */
628
+ if (isinf (Sx ) || isinf (Sxx ))
629
+ {
630
+ if (!isinf (state -> Sx ) && !isinf (newval ))
631
+ ereport (ERROR ,
632
+ (errcode (ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE ),
633
+ errmsg ("value out of range: overflow" )));
634
+
635
+ Sxx = get_float8_nan ();
636
+ }
637
+ }
638
+
639
+ state -> N = N ;
640
+ state -> Sx = Sx ;
641
+ state -> Sxx = Sxx ;
642
+ }
643
+
644
+
598
645
#define AVG_AGG (TYPE ) \
599
646
PG_FUNCTION_INFO_V1(vops_##TYPE##_avg_accumulate); \
600
647
Datum vops_##TYPE##_avg_accumulate(PG_FUNCTION_ARGS) \
@@ -611,8 +658,7 @@ static bool is_vops_type(Oid typeid)
611
658
elog(ERROR, "aggregate function called in non-aggregate context"); \
612
659
state = (vops_avg_state*)MemoryContextAllocZero(agg_context, sizeof(vops_avg_state)); \
613
660
} \
614
- state->count += 1; \
615
- state->sum += opd->payload[i]; \
661
+ vops_avg_state_accumulate(state, opd->payload[i]); \
616
662
} \
617
663
} \
618
664
if (state == NULL) { \
@@ -1208,7 +1254,75 @@ PG_FUNCTION_INFO_V1(vops_avg_final);
1208
1254
Datum vops_avg_final (PG_FUNCTION_ARGS )
1209
1255
{
1210
1256
vops_avg_state * state = (vops_avg_state * )PG_GETARG_POINTER (0 );
1211
- PG_RETURN_FLOAT8 (state -> sum / state -> count );
1257
+ if (state -> N == 0.0 )
1258
+ PG_RETURN_NULL ();
1259
+ PG_RETURN_FLOAT8 (state -> Sx / state -> N );
1260
+ }
1261
+
1262
+ static void
1263
+ vops_agg_state_combine (vops_avg_state * state1 , vops_avg_state * state2 )
1264
+ {
1265
+ float8 N1 ,
1266
+ Sx1 ,
1267
+ Sxx1 ,
1268
+ N2 ,
1269
+ Sx2 ,
1270
+ Sxx2 ,
1271
+ tmp ,
1272
+ N ,
1273
+ Sx ,
1274
+ Sxx ;
1275
+
1276
+ N1 = state1 -> N ;
1277
+ Sx1 = state1 -> Sx ;
1278
+ Sxx1 = state1 -> Sxx ;
1279
+
1280
+ N2 = state2 -> N ;
1281
+ Sx2 = state2 -> Sx ;
1282
+ Sxx2 = state2 -> Sxx ;
1283
+
1284
+ /*--------------------
1285
+ * The transition values combine using a generalization of the
1286
+ * Youngs-Cramer algorithm as follows:
1287
+ *
1288
+ * N = N1 + N2
1289
+ * Sx = Sx1 + Sx2
1290
+ * Sxx = Sxx1 + Sxx2 + N1 * N2 * (Sx1/N1 - Sx2/N2)^2 / N;
1291
+ *
1292
+ * It's worth handling the special cases N1 = 0 and N2 = 0 separately
1293
+ * since those cases are trivial, and we then don't need to worry about
1294
+ * division-by-zero errors in the general case.
1295
+ *--------------------
1296
+ */
1297
+ if (N1 == 0.0 )
1298
+ {
1299
+ N = N2 ;
1300
+ Sx = Sx2 ;
1301
+ Sxx = Sxx2 ;
1302
+ }
1303
+ else if (N2 == 0.0 )
1304
+ {
1305
+ N = N1 ;
1306
+ Sx = Sx1 ;
1307
+ Sxx = Sxx1 ;
1308
+ }
1309
+ else
1310
+ {
1311
+ N = N1 + N2 ;
1312
+ Sx = float8_pl (Sx1 , Sx2 );
1313
+ tmp = Sx1 / N1 - Sx2 / N2 ;
1314
+ Sxx = Sxx1 + Sxx2 + N1 * N2 * tmp * tmp / N ;
1315
+ check_float8_val (Sxx , isinf (Sxx1 ) || isinf (Sxx2 ), true);
1316
+ }
1317
+
1318
+ /*
1319
+ * If we're invoked as an aggregate, we can cheat and modify our first
1320
+ * parameter in-place to reduce palloc overhead. Otherwise we construct a
1321
+ * new array with the updated transition data and return it.
1322
+ */
1323
+ state1 -> N = N ;
1324
+ state1 -> Sx = Sx ;
1325
+ state1 -> Sxx = Sxx ;
1212
1326
}
1213
1327
1214
1328
PG_FUNCTION_INFO_V1 (vops_avg_combine );
@@ -1227,8 +1341,7 @@ Datum vops_avg_combine(PG_FUNCTION_ARGS)
1227
1341
* state0 = * state1 ;
1228
1342
}
1229
1343
} else if (state1 != NULL ) {
1230
- state0 -> sum += state1 -> sum ;
1231
- state0 -> count += state1 -> count ;
1344
+ vops_agg_state_combine (state0 , state1 );
1232
1345
}
1233
1346
PG_RETURN_POINTER (state0 );
1234
1347
}
@@ -1240,8 +1353,9 @@ Datum vops_avg_serial(PG_FUNCTION_ARGS)
1240
1353
StringInfoData buf ;
1241
1354
bytea * result ;
1242
1355
pq_begintypsend (& buf );
1243
- pq_sendint64 (& buf , state -> count );
1244
- pq_sendfloat8 (& buf , state -> sum );
1356
+ pq_sendfloat8 (& buf , state -> N );
1357
+ pq_sendfloat8 (& buf , state -> Sx );
1358
+ pq_sendfloat8 (& buf , state -> Sxx );
1245
1359
result = pq_endtypsend (& buf );
1246
1360
PG_RETURN_BYTEA_P (result );
1247
1361
}
@@ -1256,8 +1370,9 @@ Datum vops_avg_deserial(PG_FUNCTION_ARGS)
1256
1370
initStringInfo (& buf );
1257
1371
appendBinaryStringInfo (& buf , VARDATA (sstate ), VARSIZE (sstate ) - VARHDRSZ );
1258
1372
1259
- state -> count = pq_getmsgint64 (& buf );
1260
- state -> sum = pq_getmsgfloat8 (& buf );
1373
+ state -> N = pq_getmsgfloat8 (& buf );
1374
+ state -> Sx = pq_getmsgfloat8 (& buf );
1375
+ state -> Sxx = pq_getmsgfloat8 (& buf );
1261
1376
1262
1377
pq_getmsgend (& buf );
1263
1378
pfree (buf .data );
0 commit comments