78
78
* Portions Copyright (c) 1994, Regents of the University of California
79
79
*
80
80
* IDENTIFICATION
81
- * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.16 2001/05/07 00:43:24 tgl Exp $
81
+ * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.17 2001/06/02 19:01:52 tgl Exp $
82
82
*
83
83
*-------------------------------------------------------------------------
84
84
*/
90
90
#include "catalog/catname.h"
91
91
#include "catalog/pg_amop.h"
92
92
#include "catalog/pg_amproc.h"
93
+ #include "catalog/pg_operator.h"
93
94
#include "miscadmin.h"
94
95
#include "utils/fmgroids.h"
95
96
#include "utils/logtape.h"
96
97
#include "utils/lsyscache.h"
98
+ #include "utils/syscache.h"
97
99
#include "utils/tuplesort.h"
98
100
99
101
/*
@@ -1732,53 +1734,25 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
1732
1734
for (nkey = 0 ; nkey < state -> nKeys ; nkey ++ )
1733
1735
{
1734
1736
ScanKey scanKey = state -> scanKeys + nkey ;
1735
- SortFunctionKind fnKind = state -> sortFnKinds [nkey ];
1736
1737
AttrNumber attno = scanKey -> sk_attno ;
1737
- Datum lattr ,
1738
- rattr ;
1738
+ Datum datum1 ,
1739
+ datum2 ;
1739
1740
bool isnull1 ,
1740
1741
isnull2 ;
1742
+ int32 compare ;
1741
1743
1742
- lattr = heap_getattr (ltup , attno , tupDesc , & isnull1 );
1743
- rattr = heap_getattr (rtup , attno , tupDesc , & isnull2 );
1744
+ datum1 = heap_getattr (ltup , attno , tupDesc , & isnull1 );
1745
+ datum2 = heap_getattr (rtup , attno , tupDesc , & isnull2 );
1744
1746
1745
- if (isnull1 )
1746
- {
1747
- if (!isnull2 )
1748
- return 1 ; /* NULL sorts after non-NULL */
1749
- }
1750
- else if (isnull2 )
1751
- return -1 ;
1752
- else
1747
+ compare = ApplySortFunction (& scanKey -> sk_func ,
1748
+ state -> sortFnKinds [nkey ],
1749
+ datum1 , isnull1 , datum2 , isnull2 );
1750
+ if (compare != 0 )
1753
1751
{
1754
- int32 compare ;
1755
-
1756
- if (fnKind == SORTFUNC_LT )
1757
- {
1758
- if (DatumGetBool (FunctionCall2 (& scanKey -> sk_func ,
1759
- lattr , rattr )))
1760
- compare = -1 ; /* a < b */
1761
- else if (DatumGetBool (FunctionCall2 (& scanKey -> sk_func ,
1762
- rattr , lattr )))
1763
- compare = 1 ; /* a > b */
1764
- else
1765
- compare = 0 ;
1766
- }
1767
- else
1768
- {
1769
- /* sort function is CMP or REVCMP */
1770
- compare = DatumGetInt32 (FunctionCall2 (& scanKey -> sk_func ,
1771
- lattr , rattr ));
1772
- if (fnKind == SORTFUNC_REVCMP )
1773
- compare = - compare ;
1774
- }
1775
-
1776
- if (compare != 0 )
1777
- {
1778
- if (scanKey -> sk_flags & SK_COMMUTE )
1779
- compare = - compare ;
1780
- return compare ;
1781
- }
1752
+ /* dead code? SK_COMMUTE can't actually be set here, can it? */
1753
+ if (scanKey -> sk_flags & SK_COMMUTE )
1754
+ compare = - compare ;
1755
+ return compare ;
1782
1756
}
1783
1757
}
1784
1758
@@ -1861,7 +1835,6 @@ tuplesize_heap(Tuplesortstate *state, void *tup)
1861
1835
static int
1862
1836
comparetup_index (Tuplesortstate * state , const void * a , const void * b )
1863
1837
{
1864
-
1865
1838
/*
1866
1839
* This is almost the same as _bt_tuplecompare(), but we need to keep
1867
1840
* track of whether any null fields are present.
@@ -1880,41 +1853,28 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
1880
1853
for (i = 1 ; i <= keysz ; i ++ )
1881
1854
{
1882
1855
ScanKey entry = & scankey [i - 1 ];
1883
- Datum attrDatum1 ,
1884
- attrDatum2 ;
1885
- bool isFirstNull ,
1886
- isSecondNull ;
1856
+ Datum datum1 ,
1857
+ datum2 ;
1858
+ bool isnull1 ,
1859
+ isnull2 ;
1887
1860
int32 compare ;
1888
1861
1889
- attrDatum1 = index_getattr (tuple1 , i , tupDes , & isFirstNull );
1890
- attrDatum2 = index_getattr (tuple2 , i , tupDes , & isSecondNull );
1862
+ datum1 = index_getattr (tuple1 , i , tupDes , & isnull1 );
1863
+ datum2 = index_getattr (tuple2 , i , tupDes , & isnull2 );
1891
1864
1892
1865
/* see comments about NULLs handling in btbuild */
1893
- if (isFirstNull ) /* attr in tuple1 is NULL */
1894
- {
1895
- if (isSecondNull ) /* attr in tuple2 is NULL too */
1896
- {
1897
- compare = 0 ;
1898
- equal_hasnull = true;
1899
- }
1900
- else
1901
- compare = 1 ; /* NULL ">" not-NULL */
1902
- }
1903
- else if (isSecondNull ) /* attr in tuple1 is NOT_NULL and */
1904
- { /* attr in tuple2 is NULL */
1905
- compare = -1 ; /* not-NULL "<" NULL */
1906
- }
1907
- else
1908
- {
1909
- /* the comparison function is always of CMP type */
1910
- compare = DatumGetInt32 (FunctionCall2 (& entry -> sk_func ,
1911
- attrDatum1 ,
1912
- attrDatum2 ));
1913
- }
1866
+
1867
+ /* the comparison function is always of CMP type */
1868
+ compare = ApplySortFunction (& entry -> sk_func , SORTFUNC_CMP ,
1869
+ datum1 , isnull1 , datum2 , isnull2 );
1914
1870
1915
1871
if (compare != 0 )
1916
1872
return (int ) compare ; /* done when we find unequal
1917
1873
* attributes */
1874
+
1875
+ /* they are equal, so we only need to examine one null flag */
1876
+ if (isnull1 )
1877
+ equal_hasnull = true;
1918
1878
}
1919
1879
1920
1880
/*
@@ -2002,35 +1962,9 @@ comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
2002
1962
DatumTuple * ltup = (DatumTuple * ) a ;
2003
1963
DatumTuple * rtup = (DatumTuple * ) b ;
2004
1964
2005
- if (ltup -> isNull )
2006
- {
2007
- if (!rtup -> isNull )
2008
- return 1 ; /* NULL sorts after non-NULL */
2009
- return 0 ;
2010
- }
2011
- else if (rtup -> isNull )
2012
- return -1 ;
2013
- else if (state -> sortFnKind == SORTFUNC_LT )
2014
- {
2015
- if (DatumGetBool (FunctionCall2 (& state -> sortOpFn ,
2016
- ltup -> val , rtup -> val )))
2017
- return -1 ; /* a < b */
2018
- if (DatumGetBool (FunctionCall2 (& state -> sortOpFn ,
2019
- rtup -> val , ltup -> val )))
2020
- return 1 ; /* a > b */
2021
- return 0 ;
2022
- }
2023
- else
2024
- {
2025
- /* sort function is CMP or REVCMP */
2026
- int32 compare ;
2027
-
2028
- compare = DatumGetInt32 (FunctionCall2 (& state -> sortOpFn ,
2029
- ltup -> val , rtup -> val ));
2030
- if (state -> sortFnKind == SORTFUNC_REVCMP )
2031
- compare = - compare ;
2032
- return compare ;
2033
- }
1965
+ return ApplySortFunction (& state -> sortOpFn , state -> sortFnKind ,
1966
+ ltup -> val , ltup -> isNull ,
1967
+ rtup -> val , rtup -> isNull );
2034
1968
}
2035
1969
2036
1970
static void *
@@ -2123,6 +2057,7 @@ SelectSortFunction(Oid sortOperator,
2123
2057
HeapScanDesc scan ;
2124
2058
ScanKeyData skey [3 ];
2125
2059
HeapTuple tuple ;
2060
+ Form_pg_operator optup ;
2126
2061
Oid opclass = InvalidOid ;
2127
2062
2128
2063
/*
@@ -2207,11 +2142,97 @@ SelectSortFunction(Oid sortOperator,
2207
2142
return ;
2208
2143
}
2209
2144
2210
- /* Can't find a comparator, so use the operator as-is */
2211
-
2212
- * kind = SORTFUNC_LT ;
2213
- * sortFunction = get_opcode (sortOperator );
2214
- if (!RegProcedureIsValid (* sortFunction ))
2215
- elog (ERROR , "SelectSortFunction: operator %u has no implementation" ,
2145
+ /*
2146
+ * Can't find a comparator, so use the operator as-is. Decide whether
2147
+ * it is forward or reverse sort by looking at its name (grotty, but
2148
+ * this only matters for deciding which end NULLs should get sorted to).
2149
+ */
2150
+ tuple = SearchSysCache (OPEROID ,
2151
+ ObjectIdGetDatum (sortOperator ),
2152
+ 0 , 0 , 0 );
2153
+ if (!HeapTupleIsValid (tuple ))
2154
+ elog (ERROR , "SelectSortFunction: cache lookup failed for operator %u" ,
2216
2155
sortOperator );
2156
+ optup = (Form_pg_operator ) GETSTRUCT (tuple );
2157
+ if (strcmp (NameStr (optup -> oprname ), ">" ) == 0 )
2158
+ * kind = SORTFUNC_REVLT ;
2159
+ else
2160
+ * kind = SORTFUNC_LT ;
2161
+ * sortFunction = optup -> oprcode ;
2162
+ ReleaseSysCache (tuple );
2163
+
2164
+ Assert (RegProcedureIsValid (* sortFunction ));
2165
+ }
2166
+
2167
+ /*
2168
+ * Apply a sort function (by now converted to fmgr lookup form)
2169
+ * and return a 3-way comparison result. This takes care of handling
2170
+ * NULLs and sort ordering direction properly.
2171
+ */
2172
+ int32
2173
+ ApplySortFunction (FmgrInfo * sortFunction , SortFunctionKind kind ,
2174
+ Datum datum1 , bool isNull1 ,
2175
+ Datum datum2 , bool isNull2 )
2176
+ {
2177
+ switch (kind )
2178
+ {
2179
+ case SORTFUNC_LT :
2180
+ if (isNull1 )
2181
+ {
2182
+ if (isNull2 )
2183
+ return 0 ;
2184
+ return 1 ; /* NULL sorts after non-NULL */
2185
+ }
2186
+ if (isNull2 )
2187
+ return -1 ;
2188
+ if (DatumGetBool (FunctionCall2 (sortFunction , datum1 , datum2 )))
2189
+ return -1 ; /* a < b */
2190
+ if (DatumGetBool (FunctionCall2 (sortFunction , datum2 , datum1 )))
2191
+ return 1 ; /* a > b */
2192
+ return 0 ;
2193
+
2194
+ case SORTFUNC_REVLT :
2195
+ /* We reverse the ordering of NULLs, but not the operator */
2196
+ if (isNull1 )
2197
+ {
2198
+ if (isNull2 )
2199
+ return 0 ;
2200
+ return -1 ; /* NULL sorts before non-NULL */
2201
+ }
2202
+ if (isNull2 )
2203
+ return 1 ;
2204
+ if (DatumGetBool (FunctionCall2 (sortFunction , datum1 , datum2 )))
2205
+ return -1 ; /* a < b */
2206
+ if (DatumGetBool (FunctionCall2 (sortFunction , datum2 , datum1 )))
2207
+ return 1 ; /* a > b */
2208
+ return 0 ;
2209
+
2210
+ case SORTFUNC_CMP :
2211
+ if (isNull1 )
2212
+ {
2213
+ if (isNull2 )
2214
+ return 0 ;
2215
+ return 1 ; /* NULL sorts after non-NULL */
2216
+ }
2217
+ if (isNull2 )
2218
+ return -1 ;
2219
+ return DatumGetInt32 (FunctionCall2 (sortFunction ,
2220
+ datum1 , datum2 ));
2221
+
2222
+ case SORTFUNC_REVCMP :
2223
+ if (isNull1 )
2224
+ {
2225
+ if (isNull2 )
2226
+ return 0 ;
2227
+ return -1 ; /* NULL sorts before non-NULL */
2228
+ }
2229
+ if (isNull2 )
2230
+ return 1 ;
2231
+ return - DatumGetInt32 (FunctionCall2 (sortFunction ,
2232
+ datum1 , datum2 ));
2233
+
2234
+ default :
2235
+ elog (ERROR , "Invalid SortFunctionKind %d" , (int ) kind );
2236
+ return 0 ; /* can't get here, but keep compiler quiet */
2237
+ }
2217
2238
}
0 commit comments