diff options
author | Alvaro Herrera | 2015-05-07 16:02:22 +0000 |
---|---|---|
committer | Alvaro Herrera | 2015-05-07 16:02:22 +0000 |
commit | db5f98ab4fa44bc563ec62d7b1aada4fc276d9b2 (patch) | |
tree | 0b05cf901eed7ffab21935a5f6491e8215bbe670 /src/backend | |
parent | 7be47c56af3d3013955c91c2877c08f2a0e3e6a2 (diff) |
Improve BRIN infra, minmax opclass and regression test
The minmax opclass was using the wrong support functions when
cross-datatypes queries were run. Instead of trying to fix the
pg_amproc definitions (which apparently is not possible), use the
already correct pg_amop entries instead. This requires jumping through
more hoops (read: extra syscache lookups) to obtain the underlying
functions to execute, but it is necessary for correctness.
Author: Emre Hasegeli, tweaked by Álvaro
Review: Andreas Karlsson
Also change BrinOpcInfo to record each stored type's typecache entry
instead of just the OID. Turns out that the full type cache is
necessary in brin_deform_tuple: the original code used the indexed
type's byval and typlen properties to extract the stored tuple, which is
correct in Minmax; but in other implementations that want to store
something different, that's wrong. The realization that this is a bug
comes from Emre also, but I did not use his patch.
I also adopted Emre's regression test code (with smallish changes),
which is more complete.
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/brin/brin_minmax.c | 154 | ||||
-rw-r--r-- | src/backend/access/brin/brin_tuple.c | 6 |
2 files changed, 92 insertions, 68 deletions
diff --git a/src/backend/access/brin/brin_minmax.c b/src/backend/access/brin/brin_minmax.c index 299d6f7bf6e..d9fa9cd5976 100644 --- a/src/backend/access/brin/brin_minmax.c +++ b/src/backend/access/brin/brin_minmax.c @@ -15,35 +15,21 @@ #include "access/brin_tuple.h" #include "access/skey.h" #include "catalog/pg_type.h" +#include "catalog/pg_amop.h" #include "utils/datum.h" #include "utils/lsyscache.h" +#include "utils/rel.h" #include "utils/syscache.h" -/* - * Procedure numbers must not collide with BRIN_PROCNUM defines in - * brin_internal.h. Note we only need inequality functions. - */ -#define MINMAX_NUM_PROCNUMS 4 /* # support procs we need */ -#define PROCNUM_LESS 11 -#define PROCNUM_LESSEQUAL 12 -#define PROCNUM_GREATEREQUAL 13 -#define PROCNUM_GREATER 14 - -/* - * Subtract this from procnum to obtain index in MinmaxOpaque arrays - * (Must be equal to minimum of private procnums) - */ -#define PROCNUM_BASE 11 - typedef struct MinmaxOpaque { - FmgrInfo operators[MINMAX_NUM_PROCNUMS]; - bool inited[MINMAX_NUM_PROCNUMS]; + Oid cached_subtype; + FmgrInfo strategy_procinfos[BTMaxStrategyNumber]; } MinmaxOpaque; -static FmgrInfo *minmax_get_procinfo(BrinDesc *bdesc, uint16 attno, - uint16 procnum); +static FmgrInfo *minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno, + Oid subtype, uint16 strategynum); Datum @@ -53,8 +39,8 @@ brin_minmax_opcinfo(PG_FUNCTION_ARGS) BrinOpcInfo *result; /* - * opaque->operators is initialized lazily, as indicated by 'inited' which - * is initialized to all false by palloc0. + * opaque->strategy_procinfos is initialized lazily; here it is set to + * all-uninitialized by palloc0 which sets fn_oid to InvalidOid. */ result = palloc0(MAXALIGN(SizeofBrinOpcInfo(2)) + @@ -62,8 +48,8 @@ brin_minmax_opcinfo(PG_FUNCTION_ARGS) result->oi_nstored = 2; result->oi_opaque = (MinmaxOpaque *) MAXALIGN((char *) result + SizeofBrinOpcInfo(2)); - result->oi_typids[0] = typoid; - result->oi_typids[1] = typoid; + result->oi_typcache[0] = result->oi_typcache[1] = + lookup_type_cache(typoid, 0); PG_RETURN_POINTER(result); } @@ -122,7 +108,8 @@ brin_minmax_add_value(PG_FUNCTION_ARGS) * and update them accordingly. First check if it's less than the * existing minimum. */ - cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_LESS); + cmpFn = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid, + BTLessStrategyNumber); compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[0]); if (DatumGetBool(compar)) { @@ -135,7 +122,8 @@ brin_minmax_add_value(PG_FUNCTION_ARGS) /* * And now compare it to the existing maximum. */ - cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_GREATER); + cmpFn = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid, + BTGreaterStrategyNumber); compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[1]); if (DatumGetBool(compar)) { @@ -159,10 +147,12 @@ brin_minmax_consistent(PG_FUNCTION_ARGS) BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0); BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1); ScanKey key = (ScanKey) PG_GETARG_POINTER(2); - Oid colloid = PG_GET_COLLATION(); + Oid colloid = PG_GET_COLLATION(), + subtype; AttrNumber attno; Datum value; Datum matches; + FmgrInfo *finfo; Assert(key->sk_attno == column->bv_attno); @@ -189,18 +179,16 @@ brin_minmax_consistent(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); attno = key->sk_attno; + subtype = key->sk_subtype; value = key->sk_argument; switch (key->sk_strategy) { case BTLessStrategyNumber: - matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_LESS), - colloid, column->bv_values[0], value); - break; case BTLessEqualStrategyNumber: - matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_LESSEQUAL), - colloid, column->bv_values[0], value); + finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype, + key->sk_strategy); + matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0], + value); break; case BTEqualStrategyNumber: @@ -209,25 +197,24 @@ brin_minmax_consistent(PG_FUNCTION_ARGS) * the current page range if the minimum value in the range <= * scan key, and the maximum value >= scan key. */ - matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_LESSEQUAL), - colloid, column->bv_values[0], value); + finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype, + BTLessEqualStrategyNumber); + matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0], + value); if (!DatumGetBool(matches)) break; /* max() >= scankey */ - matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_GREATEREQUAL), - colloid, column->bv_values[1], value); + finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype, + BTGreaterEqualStrategyNumber); + matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1], + value); break; case BTGreaterEqualStrategyNumber: - matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_GREATEREQUAL), - colloid, column->bv_values[1], value); - break; case BTGreaterStrategyNumber: - matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_GREATER), - colloid, column->bv_values[1], value); + finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype, + key->sk_strategy); + matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1], + value); break; default: /* shouldn't happen */ @@ -252,6 +239,7 @@ brin_minmax_union(PG_FUNCTION_ARGS) Oid colloid = PG_GET_COLLATION(); AttrNumber attno; Form_pg_attribute attr; + FmgrInfo *finfo; bool needsadj; Assert(col_a->bv_attno == col_b->bv_attno); @@ -284,9 +272,10 @@ brin_minmax_union(PG_FUNCTION_ARGS) } /* Adjust minimum, if B's min is less than A's min */ - needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_LESS), - colloid, col_b->bv_values[0], col_a->bv_values[0]); + finfo = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid, + BTLessStrategyNumber); + needsadj = FunctionCall2Coll(finfo, colloid, col_b->bv_values[0], + col_a->bv_values[0]); if (needsadj) { if (!attr->attbyval) @@ -296,9 +285,10 @@ brin_minmax_union(PG_FUNCTION_ARGS) } /* Adjust maximum, if B's max is greater than A's max */ - needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno, - PROCNUM_GREATER), - colloid, col_b->bv_values[1], col_a->bv_values[1]); + finfo = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid, + BTGreaterStrategyNumber); + needsadj = FunctionCall2Coll(finfo, colloid, col_b->bv_values[1], + col_a->bv_values[1]); if (needsadj) { if (!attr->attbyval) @@ -311,27 +301,61 @@ brin_minmax_union(PG_FUNCTION_ARGS) } /* - * Return the procedure corresponding to the given function support number. + * Cache and return the procedure for the given strategy. */ -static FmgrInfo * -minmax_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum) +FmgrInfo * +minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno, Oid subtype, + uint16 strategynum) { MinmaxOpaque *opaque; - uint16 basenum = procnum - PROCNUM_BASE; + + Assert(strategynum >= 1 && + strategynum <= BTMaxStrategyNumber); opaque = (MinmaxOpaque *) bdesc->bd_info[attno - 1]->oi_opaque; /* - * We cache these in the opaque struct, to avoid repetitive syscache - * lookups. + * We cache the procedures for the previous subtype in the opaque struct, + * to avoid repetitive syscache lookups. If the subtype changed, + * invalidate all the cached entries. */ - if (!opaque->inited[basenum]) + if (opaque->cached_subtype != subtype) + { + uint16 i; + + for (i = 1; i <= BTMaxStrategyNumber; i++) + opaque->strategy_procinfos[i - 1].fn_oid = InvalidOid; + opaque->cached_subtype = subtype; + } + + if (opaque->strategy_procinfos[strategynum - 1].fn_oid == InvalidOid) { - fmgr_info_copy(&opaque->operators[basenum], - index_getprocinfo(bdesc->bd_index, attno, procnum), - bdesc->bd_context); - opaque->inited[basenum] = true; + Form_pg_attribute attr; + HeapTuple tuple; + Oid opfamily, + oprid; + bool isNull; + + opfamily = bdesc->bd_index->rd_opfamily[attno - 1]; + attr = bdesc->bd_tupdesc->attrs[attno - 1]; + tuple = SearchSysCache4(AMOPSTRATEGY, ObjectIdGetDatum(opfamily), + ObjectIdGetDatum(attr->atttypid), + ObjectIdGetDatum(subtype), + Int16GetDatum(strategynum)); + + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", + strategynum, attr->atttypid, subtype, opfamily); + + oprid = DatumGetObjectId(SysCacheGetAttr(AMOPSTRATEGY, tuple, + Anum_pg_amop_amopopr, &isNull)); + ReleaseSysCache(tuple); + Assert(!isNull && RegProcedureIsValid(oprid)); + + fmgr_info_cxt(get_opcode(oprid), + &opaque->strategy_procinfos[strategynum - 1], + bdesc->bd_context); } - return &opaque->operators[basenum]; + return &opaque->strategy_procinfos[strategynum - 1]; } diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c index 08fa998a525..22ce74a4f43 100644 --- a/src/backend/access/brin/brin_tuple.c +++ b/src/backend/access/brin/brin_tuple.c @@ -68,7 +68,7 @@ brtuple_disk_tupdesc(BrinDesc *brdesc) { for (j = 0; j < brdesc->bd_info[i]->oi_nstored; j++) TupleDescInitEntry(tupdesc, attno++, NULL, - brdesc->bd_info[i]->oi_typids[j], + brdesc->bd_info[i]->oi_typcache[j]->type_id, -1, 0); } @@ -444,8 +444,8 @@ brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple) for (i = 0; i < brdesc->bd_info[keyno]->oi_nstored; i++) dtup->bt_columns[keyno].bv_values[i] = datumCopy(values[valueno++], - brdesc->bd_tupdesc->attrs[keyno]->attbyval, - brdesc->bd_tupdesc->attrs[keyno]->attlen); + brdesc->bd_info[keyno]->oi_typcache[i]->typbyval, + brdesc->bd_info[keyno]->oi_typcache[i]->typlen); dtup->bt_columns[keyno].bv_hasnulls = hasnulls[keyno]; dtup->bt_columns[keyno].bv_allnulls = false; |