Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Herrera2015-05-07 16:02:22 +0000
committerAlvaro Herrera2015-05-07 16:02:22 +0000
commitdb5f98ab4fa44bc563ec62d7b1aada4fc276d9b2 (patch)
tree0b05cf901eed7ffab21935a5f6491e8215bbe670 /src/backend
parent7be47c56af3d3013955c91c2877c08f2a0e3e6a2 (diff)
Improve BRIN infra, minmax opclass and regression test
The minmax opclass was using the wrong support functions when cross-datatypes queries were run. Instead of trying to fix the pg_amproc definitions (which apparently is not possible), use the already correct pg_amop entries instead. This requires jumping through more hoops (read: extra syscache lookups) to obtain the underlying functions to execute, but it is necessary for correctness. Author: Emre Hasegeli, tweaked by Álvaro Review: Andreas Karlsson Also change BrinOpcInfo to record each stored type's typecache entry instead of just the OID. Turns out that the full type cache is necessary in brin_deform_tuple: the original code used the indexed type's byval and typlen properties to extract the stored tuple, which is correct in Minmax; but in other implementations that want to store something different, that's wrong. The realization that this is a bug comes from Emre also, but I did not use his patch. I also adopted Emre's regression test code (with smallish changes), which is more complete.
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/brin/brin_minmax.c154
-rw-r--r--src/backend/access/brin/brin_tuple.c6
2 files changed, 92 insertions, 68 deletions
diff --git a/src/backend/access/brin/brin_minmax.c b/src/backend/access/brin/brin_minmax.c
index 299d6f7bf6e..d9fa9cd5976 100644
--- a/src/backend/access/brin/brin_minmax.c
+++ b/src/backend/access/brin/brin_minmax.c
@@ -15,35 +15,21 @@
#include "access/brin_tuple.h"
#include "access/skey.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_amop.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
+#include "utils/rel.h"
#include "utils/syscache.h"
-/*
- * Procedure numbers must not collide with BRIN_PROCNUM defines in
- * brin_internal.h. Note we only need inequality functions.
- */
-#define MINMAX_NUM_PROCNUMS 4 /* # support procs we need */
-#define PROCNUM_LESS 11
-#define PROCNUM_LESSEQUAL 12
-#define PROCNUM_GREATEREQUAL 13
-#define PROCNUM_GREATER 14
-
-/*
- * Subtract this from procnum to obtain index in MinmaxOpaque arrays
- * (Must be equal to minimum of private procnums)
- */
-#define PROCNUM_BASE 11
-
typedef struct MinmaxOpaque
{
- FmgrInfo operators[MINMAX_NUM_PROCNUMS];
- bool inited[MINMAX_NUM_PROCNUMS];
+ Oid cached_subtype;
+ FmgrInfo strategy_procinfos[BTMaxStrategyNumber];
} MinmaxOpaque;
-static FmgrInfo *minmax_get_procinfo(BrinDesc *bdesc, uint16 attno,
- uint16 procnum);
+static FmgrInfo *minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
+ Oid subtype, uint16 strategynum);
Datum
@@ -53,8 +39,8 @@ brin_minmax_opcinfo(PG_FUNCTION_ARGS)
BrinOpcInfo *result;
/*
- * opaque->operators is initialized lazily, as indicated by 'inited' which
- * is initialized to all false by palloc0.
+ * opaque->strategy_procinfos is initialized lazily; here it is set to
+ * all-uninitialized by palloc0 which sets fn_oid to InvalidOid.
*/
result = palloc0(MAXALIGN(SizeofBrinOpcInfo(2)) +
@@ -62,8 +48,8 @@ brin_minmax_opcinfo(PG_FUNCTION_ARGS)
result->oi_nstored = 2;
result->oi_opaque = (MinmaxOpaque *)
MAXALIGN((char *) result + SizeofBrinOpcInfo(2));
- result->oi_typids[0] = typoid;
- result->oi_typids[1] = typoid;
+ result->oi_typcache[0] = result->oi_typcache[1] =
+ lookup_type_cache(typoid, 0);
PG_RETURN_POINTER(result);
}
@@ -122,7 +108,8 @@ brin_minmax_add_value(PG_FUNCTION_ARGS)
* and update them accordingly. First check if it's less than the
* existing minimum.
*/
- cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_LESS);
+ cmpFn = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
+ BTLessStrategyNumber);
compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[0]);
if (DatumGetBool(compar))
{
@@ -135,7 +122,8 @@ brin_minmax_add_value(PG_FUNCTION_ARGS)
/*
* And now compare it to the existing maximum.
*/
- cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_GREATER);
+ cmpFn = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
+ BTGreaterStrategyNumber);
compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[1]);
if (DatumGetBool(compar))
{
@@ -159,10 +147,12 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
ScanKey key = (ScanKey) PG_GETARG_POINTER(2);
- Oid colloid = PG_GET_COLLATION();
+ Oid colloid = PG_GET_COLLATION(),
+ subtype;
AttrNumber attno;
Datum value;
Datum matches;
+ FmgrInfo *finfo;
Assert(key->sk_attno == column->bv_attno);
@@ -189,18 +179,16 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false);
attno = key->sk_attno;
+ subtype = key->sk_subtype;
value = key->sk_argument;
switch (key->sk_strategy)
{
case BTLessStrategyNumber:
- matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_LESS),
- colloid, column->bv_values[0], value);
- break;
case BTLessEqualStrategyNumber:
- matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_LESSEQUAL),
- colloid, column->bv_values[0], value);
+ finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
+ key->sk_strategy);
+ matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
+ value);
break;
case BTEqualStrategyNumber:
@@ -209,25 +197,24 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
* the current page range if the minimum value in the range <=
* scan key, and the maximum value >= scan key.
*/
- matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_LESSEQUAL),
- colloid, column->bv_values[0], value);
+ finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
+ BTLessEqualStrategyNumber);
+ matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
+ value);
if (!DatumGetBool(matches))
break;
/* max() >= scankey */
- matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_GREATEREQUAL),
- colloid, column->bv_values[1], value);
+ finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
+ BTGreaterEqualStrategyNumber);
+ matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
+ value);
break;
case BTGreaterEqualStrategyNumber:
- matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_GREATEREQUAL),
- colloid, column->bv_values[1], value);
- break;
case BTGreaterStrategyNumber:
- matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_GREATER),
- colloid, column->bv_values[1], value);
+ finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
+ key->sk_strategy);
+ matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
+ value);
break;
default:
/* shouldn't happen */
@@ -252,6 +239,7 @@ brin_minmax_union(PG_FUNCTION_ARGS)
Oid colloid = PG_GET_COLLATION();
AttrNumber attno;
Form_pg_attribute attr;
+ FmgrInfo *finfo;
bool needsadj;
Assert(col_a->bv_attno == col_b->bv_attno);
@@ -284,9 +272,10 @@ brin_minmax_union(PG_FUNCTION_ARGS)
}
/* Adjust minimum, if B's min is less than A's min */
- needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_LESS),
- colloid, col_b->bv_values[0], col_a->bv_values[0]);
+ finfo = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
+ BTLessStrategyNumber);
+ needsadj = FunctionCall2Coll(finfo, colloid, col_b->bv_values[0],
+ col_a->bv_values[0]);
if (needsadj)
{
if (!attr->attbyval)
@@ -296,9 +285,10 @@ brin_minmax_union(PG_FUNCTION_ARGS)
}
/* Adjust maximum, if B's max is greater than A's max */
- needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
- PROCNUM_GREATER),
- colloid, col_b->bv_values[1], col_a->bv_values[1]);
+ finfo = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
+ BTGreaterStrategyNumber);
+ needsadj = FunctionCall2Coll(finfo, colloid, col_b->bv_values[1],
+ col_a->bv_values[1]);
if (needsadj)
{
if (!attr->attbyval)
@@ -311,27 +301,61 @@ brin_minmax_union(PG_FUNCTION_ARGS)
}
/*
- * Return the procedure corresponding to the given function support number.
+ * Cache and return the procedure for the given strategy.
*/
-static FmgrInfo *
-minmax_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
+FmgrInfo *
+minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno, Oid subtype,
+ uint16 strategynum)
{
MinmaxOpaque *opaque;
- uint16 basenum = procnum - PROCNUM_BASE;
+
+ Assert(strategynum >= 1 &&
+ strategynum <= BTMaxStrategyNumber);
opaque = (MinmaxOpaque *) bdesc->bd_info[attno - 1]->oi_opaque;
/*
- * We cache these in the opaque struct, to avoid repetitive syscache
- * lookups.
+ * We cache the procedures for the previous subtype in the opaque struct,
+ * to avoid repetitive syscache lookups. If the subtype changed,
+ * invalidate all the cached entries.
*/
- if (!opaque->inited[basenum])
+ if (opaque->cached_subtype != subtype)
+ {
+ uint16 i;
+
+ for (i = 1; i <= BTMaxStrategyNumber; i++)
+ opaque->strategy_procinfos[i - 1].fn_oid = InvalidOid;
+ opaque->cached_subtype = subtype;
+ }
+
+ if (opaque->strategy_procinfos[strategynum - 1].fn_oid == InvalidOid)
{
- fmgr_info_copy(&opaque->operators[basenum],
- index_getprocinfo(bdesc->bd_index, attno, procnum),
- bdesc->bd_context);
- opaque->inited[basenum] = true;
+ Form_pg_attribute attr;
+ HeapTuple tuple;
+ Oid opfamily,
+ oprid;
+ bool isNull;
+
+ opfamily = bdesc->bd_index->rd_opfamily[attno - 1];
+ attr = bdesc->bd_tupdesc->attrs[attno - 1];
+ tuple = SearchSysCache4(AMOPSTRATEGY, ObjectIdGetDatum(opfamily),
+ ObjectIdGetDatum(attr->atttypid),
+ ObjectIdGetDatum(subtype),
+ Int16GetDatum(strategynum));
+
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
+ strategynum, attr->atttypid, subtype, opfamily);
+
+ oprid = DatumGetObjectId(SysCacheGetAttr(AMOPSTRATEGY, tuple,
+ Anum_pg_amop_amopopr, &isNull));
+ ReleaseSysCache(tuple);
+ Assert(!isNull && RegProcedureIsValid(oprid));
+
+ fmgr_info_cxt(get_opcode(oprid),
+ &opaque->strategy_procinfos[strategynum - 1],
+ bdesc->bd_context);
}
- return &opaque->operators[basenum];
+ return &opaque->strategy_procinfos[strategynum - 1];
}
diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
index 08fa998a525..22ce74a4f43 100644
--- a/src/backend/access/brin/brin_tuple.c
+++ b/src/backend/access/brin/brin_tuple.c
@@ -68,7 +68,7 @@ brtuple_disk_tupdesc(BrinDesc *brdesc)
{
for (j = 0; j < brdesc->bd_info[i]->oi_nstored; j++)
TupleDescInitEntry(tupdesc, attno++, NULL,
- brdesc->bd_info[i]->oi_typids[j],
+ brdesc->bd_info[i]->oi_typcache[j]->type_id,
-1, 0);
}
@@ -444,8 +444,8 @@ brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
for (i = 0; i < brdesc->bd_info[keyno]->oi_nstored; i++)
dtup->bt_columns[keyno].bv_values[i] =
datumCopy(values[valueno++],
- brdesc->bd_tupdesc->attrs[keyno]->attbyval,
- brdesc->bd_tupdesc->attrs[keyno]->attlen);
+ brdesc->bd_info[keyno]->oi_typcache[i]->typbyval,
+ brdesc->bd_info[keyno]->oi_typcache[i]->typlen);
dtup->bt_columns[keyno].bv_hasnulls = hasnulls[keyno];
dtup->bt_columns[keyno].bv_allnulls = false;