14
14
* Copyright (c) 1998-2010, PostgreSQL Global Development Group
15
15
*
16
16
* IDENTIFICATION
17
- * $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.124 2010/07/30 04:30:23 rhaas Exp $
17
+ * $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.125 2010/08/03 23:09:29 rhaas Exp $
18
18
*
19
19
*-------------------------------------------------------------------------
20
20
*/
35
35
#include "utils/int8.h"
36
36
#include "utils/numeric.h"
37
37
38
- /*
39
- * Sign values and macros to deal with packing/unpacking n_sign_dscale
40
- */
41
- #define NUMERIC_SIGN_MASK 0xC000
42
- #define NUMERIC_POS 0x0000
43
- #define NUMERIC_NEG 0x4000
44
- #define NUMERIC_NAN 0xC000
45
- #define NUMERIC_DSCALE_MASK 0x3FFF
46
- #define NUMERIC_SIGN (n ) ((n)->n_sign_dscale & NUMERIC_SIGN_MASK)
47
- #define NUMERIC_DSCALE (n ) ((n)->n_sign_dscale & NUMERIC_DSCALE_MASK)
48
- #define NUMERIC_IS_NAN (n ) (NUMERIC_SIGN(n) != NUMERIC_POS && \
49
- NUMERIC_SIGN(n) != NUMERIC_NEG)
50
- #define NUMERIC_HDRSZ (VARHDRSZ + sizeof(uint16) + sizeof(int16))
51
-
52
-
53
- /*
54
- * The Numeric data type stored in the database
55
- *
56
- * NOTE: by convention, values in the packed form have been stripped of
57
- * all leading and trailing zero digits (where a "digit" is of base NBASE).
58
- * In particular, if the value is zero, there will be no digits at all!
59
- * The weight is arbitrary in that case, but we normally set it to zero.
60
- */
61
- struct NumericData
62
- {
63
- int32 vl_len_ ; /* varlena header (do not touch directly!) */
64
- uint16 n_sign_dscale ; /* Sign + display scale */
65
- int16 n_weight ; /* Weight of 1st digit */
66
- char n_data [1 ]; /* Digits (really array of NumericDigit) */
67
- };
68
-
69
-
70
38
/* ----------
71
39
* Uncomment the following to enable compilation of dump_numeric()
72
40
* and dump_var() and to get a dump of any result produced by make_result().
@@ -120,6 +88,122 @@ typedef signed char NumericDigit;
120
88
typedef int16 NumericDigit ;
121
89
#endif
122
90
91
+ /*
92
+ * The Numeric type as stored on disk.
93
+ *
94
+ * If the high bits of the first word of a NumericChoice (n_header, or
95
+ * n_short.n_header, or n_long.n_sign_dscale) are NUMERIC_SHORT, then the
96
+ * numeric follows the NumericShort format; if they are NUMERIC_POS or
97
+ * NUMERIC_NEG, it follows the NumericLong format. If they are NUMERIC_NAN,
98
+ * it is a NaN. We currently always store a NaN using just two bytes (i.e.
99
+ * only n_header), but previous releases used only the NumericLong format,
100
+ * so we might find 4-byte NaNs on disk if a database has been migrated using
101
+ * pg_upgrade. In either case, when the high bits indicate a NaN, the
102
+ * remaining bits are never examined. Currently, we always initialize these
103
+ * to zero, but it might be possible to use them for some other purpose in
104
+ * the future.
105
+ *
106
+ * In the NumericShort format, the remaining 14 bits of the header word
107
+ * (n_short.n_header) are allocated as follows: 1 for sign (positive or
108
+ * negative), 6 for dynamic scale, and 7 for weight. In practice, most
109
+ * commonly-encountered values can be represented this way.
110
+ *
111
+ * In the NumericLong format, the remaining 14 bits of the header word
112
+ * (n_long.n_sign_dscale) represent the display scale; and the weight is
113
+ * stored separately in n_weight.
114
+ *
115
+ * NOTE: by convention, values in the packed form have been stripped of
116
+ * all leading and trailing zero digits (where a "digit" is of base NBASE).
117
+ * In particular, if the value is zero, there will be no digits at all!
118
+ * The weight is arbitrary in that case, but we normally set it to zero.
119
+ */
120
+
121
+ struct NumericShort
122
+ {
123
+ uint16 n_header ; /* Sign + display scale + weight */
124
+ NumericDigit n_data [1 ]; /* Digits */
125
+ };
126
+
127
+ struct NumericLong
128
+ {
129
+ uint16 n_sign_dscale ; /* Sign + display scale */
130
+ int16 n_weight ; /* Weight of 1st digit */
131
+ NumericDigit n_data [1 ]; /* Digits */
132
+ };
133
+
134
+ union NumericChoice
135
+ {
136
+ uint16 n_header ; /* Header word */
137
+ struct NumericLong n_long ; /* Long form (4-byte header) */
138
+ struct NumericShort n_short ; /* Short form (2-byte header) */
139
+ };
140
+
141
+ struct NumericData
142
+ {
143
+ int32 vl_len_ ; /* varlena header (do not touch directly!) */
144
+ union NumericChoice choice ; /* choice of format */
145
+ };
146
+
147
+
148
+ /*
149
+ * Interpretation of high bits.
150
+ */
151
+
152
+ #define NUMERIC_SIGN_MASK 0xC000
153
+ #define NUMERIC_POS 0x0000
154
+ #define NUMERIC_NEG 0x4000
155
+ #define NUMERIC_SHORT 0x8000
156
+ #define NUMERIC_NAN 0xC000
157
+
158
+ #define NUMERIC_FLAGBITS (n ) ((n)->choice.n_header & NUMERIC_SIGN_MASK)
159
+ #define NUMERIC_IS_NAN (n ) (NUMERIC_FLAGBITS(n) == NUMERIC_NAN)
160
+ #define NUMERIC_IS_SHORT (n ) (NUMERIC_FLAGBITS(n) == NUMERIC_SHORT)
161
+
162
+ #define NUMERIC_HDRSZ (VARHDRSZ + sizeof(uint16) + sizeof(int16))
163
+ #define NUMERIC_HDRSZ_SHORT (VARHDRSZ + sizeof(uint16))
164
+
165
+ /*
166
+ * If the flag bits are NUMERIC_SHORT or NUMERIC_NAN, we want the short header;
167
+ * otherwise, we want the long one. Instead of testing against each value, we
168
+ * can just look at the high bit, for a slight efficiency gain.
169
+ */
170
+ #define NUMERIC_HEADER_SIZE (n ) \
171
+ (VARHDRSZ + sizeof(uint16) + \
172
+ (((NUMERIC_FLAGBITS(n) & 0x8000) == 0) ? sizeof(int16) : 0))
173
+
174
+ /*
175
+ * Short format definitions.
176
+ */
177
+
178
+ #define NUMERIC_SHORT_SIGN_MASK 0x2000
179
+ #define NUMERIC_SHORT_DSCALE_MASK 0x1F80
180
+ #define NUMERIC_SHORT_DSCALE_SHIFT 7
181
+ #define NUMERIC_SHORT_DSCALE_MAX \
182
+ (NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT)
183
+ #define NUMERIC_SHORT_WEIGHT_SIGN_MASK 0x0040
184
+ #define NUMERIC_SHORT_WEIGHT_MASK 0x003F
185
+ #define NUMERIC_SHORT_WEIGHT_MAX NUMERIC_SHORT_WEIGHT_MASK
186
+ #define NUMERIC_SHORT_WEIGHT_MIN (-(NUMERIC_SHORT_WEIGHT_MASK+1))
187
+
188
+ /*
189
+ * Extract sign, display scale, weight.
190
+ */
191
+
192
+ #define NUMERIC_DSCALE_MASK 0x3FFF
193
+
194
+ #define NUMERIC_SIGN (n ) \
195
+ (NUMERIC_IS_SHORT(n) ? \
196
+ (((n)->choice.n_short.n_header & NUMERIC_SHORT_SIGN_MASK) ? \
197
+ NUMERIC_NEG : NUMERIC_POS) : NUMERIC_FLAGBITS(n))
198
+ #define NUMERIC_DSCALE (n ) (NUMERIC_IS_SHORT((n)) ? \
199
+ ((n)->choice.n_short.n_header & NUMERIC_SHORT_DSCALE_MASK) \
200
+ >> NUMERIC_SHORT_DSCALE_SHIFT \
201
+ : ((n)->choice.n_long.n_sign_dscale & NUMERIC_DSCALE_MASK))
202
+ #define NUMERIC_WEIGHT (n ) (NUMERIC_IS_SHORT((n)) ? \
203
+ (((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \
204
+ ~NUMERIC_SHORT_WEIGHT_MASK : 0) \
205
+ | ((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_MASK)) \
206
+ : ((n)->choice.n_long.n_weight))
123
207
124
208
/* ----------
125
209
* NumericVar is the format we use for arithmetic. The digit-array part
@@ -266,9 +350,14 @@ static void dump_var(const char *str, NumericVar *var);
266
350
267
351
#define init_var (v ) MemSetAligned(v, 0, sizeof(NumericVar))
268
352
269
- #define NUMERIC_DIGITS (num ) ((NumericDigit *)(num)->n_data)
353
+ #define NUMERIC_DIGITS (num ) (NUMERIC_IS_SHORT(num) ? \
354
+ (num)->choice.n_short.n_data : (num)->choice.n_long.n_data)
270
355
#define NUMERIC_NDIGITS (num ) \
271
- ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
356
+ ((VARSIZE(num) - NUMERIC_HEADER_SIZE(num)) / sizeof(NumericDigit))
357
+ #define NUMERIC_CAN_BE_SHORT (scale ,weight ) \
358
+ ((scale) <= NUMERIC_SHORT_DSCALE_MAX && \
359
+ (weight) <= NUMERIC_SHORT_WEIGHT_MAX && \
360
+ (weight) >= NUMERIC_SHORT_WEIGHT_MIN)
272
361
273
362
static void alloc_var (NumericVar * var , int ndigits );
274
363
static void free_var (NumericVar * var );
@@ -652,15 +741,23 @@ numeric (PG_FUNCTION_ARGS)
652
741
/*
653
742
* If the number is certainly in bounds and due to the target scale no
654
743
* rounding could be necessary, just make a copy of the input and modify
655
- * its scale fields. (Note we assume the existing dscale is honest...)
744
+ * its scale fields, unless the larger scale forces us to abandon the
745
+ * short representation. (Note we assume the existing dscale is honest...)
656
746
*/
657
- ddigits = (num -> n_weight + 1 ) * DEC_DIGITS ;
658
- if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE (num ))
747
+ ddigits = (NUMERIC_WEIGHT (num ) + 1 ) * DEC_DIGITS ;
748
+ if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE (num )
749
+ && (NUMERIC_CAN_BE_SHORT (scale , NUMERIC_WEIGHT (num ))
750
+ || !NUMERIC_IS_SHORT (num )))
659
751
{
660
752
new = (Numeric ) palloc (VARSIZE (num ));
661
753
memcpy (new , num , VARSIZE (num ));
662
- new -> n_sign_dscale = NUMERIC_SIGN (new ) |
663
- ((uint16 ) scale & NUMERIC_DSCALE_MASK );
754
+ if (NUMERIC_IS_SHORT (num ))
755
+ new -> choice .n_short .n_header =
756
+ (num -> choice .n_short .n_header & ~NUMERIC_SHORT_DSCALE_MASK )
757
+ | (scale << NUMERIC_SHORT_DSCALE_SHIFT );
758
+ else
759
+ new -> choice .n_long .n_sign_dscale = NUMERIC_SIGN (new ) |
760
+ ((uint16 ) scale & NUMERIC_DSCALE_MASK );
664
761
PG_RETURN_NUMERIC (new );
665
762
}
666
763
@@ -766,7 +863,11 @@ numeric_abs(PG_FUNCTION_ARGS)
766
863
res = (Numeric ) palloc (VARSIZE (num ));
767
864
memcpy (res , num , VARSIZE (num ));
768
865
769
- res -> n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE (num );
866
+ if (NUMERIC_IS_SHORT (num ))
867
+ res -> choice .n_short .n_header =
868
+ num -> choice .n_short .n_header & ~NUMERIC_SHORT_SIGN_MASK ;
869
+ else
870
+ res -> choice .n_long .n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE (num );
770
871
771
872
PG_RETURN_NUMERIC (res );
772
873
}
@@ -795,13 +896,18 @@ numeric_uminus(PG_FUNCTION_ARGS)
795
896
* we can identify a ZERO by the fact that there are no digits at all. Do
796
897
* nothing to a zero.
797
898
*/
798
- if (VARSIZE (num ) != NUMERIC_HDRSZ )
899
+ if (NUMERIC_NDIGITS (num ) != 0 )
799
900
{
800
901
/* Else, flip the sign */
801
- if (NUMERIC_SIGN (num ) == NUMERIC_POS )
802
- res -> n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE (num );
902
+ if (NUMERIC_IS_SHORT (num ))
903
+ res -> choice .n_short .n_header =
904
+ num -> choice .n_short .n_header ^ NUMERIC_SHORT_SIGN_MASK ;
905
+ else if (NUMERIC_SIGN (num ) == NUMERIC_POS )
906
+ res -> choice .n_long .n_sign_dscale =
907
+ NUMERIC_NEG | NUMERIC_DSCALE (num );
803
908
else
804
- res -> n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE (num );
909
+ res -> choice .n_long .n_sign_dscale =
910
+ NUMERIC_POS | NUMERIC_DSCALE (num );
805
911
}
806
912
807
913
PG_RETURN_NUMERIC (res );
@@ -845,7 +951,7 @@ numeric_sign(PG_FUNCTION_ARGS)
845
951
* The packed format is known to be totally zero digit trimmed always. So
846
952
* we can identify a ZERO by the fact that there are no digits at all.
847
953
*/
848
- if (VARSIZE (num ) == NUMERIC_HDRSZ )
954
+ if (NUMERIC_NDIGITS (num ) == 0 )
849
955
set_var_from_var (& const_zero , & result );
850
956
else
851
957
{
@@ -1283,9 +1389,9 @@ cmp_numerics(Numeric num1, Numeric num2)
1283
1389
else
1284
1390
{
1285
1391
result = cmp_var_common (NUMERIC_DIGITS (num1 ), NUMERIC_NDIGITS (num1 ),
1286
- num1 -> n_weight , NUMERIC_SIGN (num1 ),
1392
+ NUMERIC_WEIGHT ( num1 ) , NUMERIC_SIGN (num1 ),
1287
1393
NUMERIC_DIGITS (num2 ), NUMERIC_NDIGITS (num2 ),
1288
- num2 -> n_weight , NUMERIC_SIGN (num2 ));
1394
+ NUMERIC_WEIGHT ( num2 ) , NUMERIC_SIGN (num2 ));
1289
1395
}
1290
1396
1291
1397
return result ;
@@ -1302,12 +1408,13 @@ hash_numeric(PG_FUNCTION_ARGS)
1302
1408
int end_offset ;
1303
1409
int i ;
1304
1410
int hash_len ;
1411
+ NumericDigit * digits ;
1305
1412
1306
1413
/* If it's NaN, don't try to hash the rest of the fields */
1307
1414
if (NUMERIC_IS_NAN (key ))
1308
1415
PG_RETURN_UINT32 (0 );
1309
1416
1310
- weight = key -> n_weight ;
1417
+ weight = NUMERIC_WEIGHT ( key ) ;
1311
1418
start_offset = 0 ;
1312
1419
end_offset = 0 ;
1313
1420
@@ -1317,9 +1424,10 @@ hash_numeric(PG_FUNCTION_ARGS)
1317
1424
* zeros are suppressed, but we're paranoid. Note that we measure the
1318
1425
* starting and ending offsets in units of NumericDigits, not bytes.
1319
1426
*/
1427
+ digits = NUMERIC_DIGITS (key );
1320
1428
for (i = 0 ; i < NUMERIC_NDIGITS (key ); i ++ )
1321
1429
{
1322
- if (NUMERIC_DIGITS ( key ) [i ] != (NumericDigit ) 0 )
1430
+ if (digits [i ] != (NumericDigit ) 0 )
1323
1431
break ;
1324
1432
1325
1433
start_offset ++ ;
@@ -1340,7 +1448,7 @@ hash_numeric(PG_FUNCTION_ARGS)
1340
1448
1341
1449
for (i = NUMERIC_NDIGITS (key ) - 1 ; i >= 0 ; i -- )
1342
1450
{
1343
- if (NUMERIC_DIGITS ( key ) [i ] != (NumericDigit ) 0 )
1451
+ if (digits [i ] != (NumericDigit ) 0 )
1344
1452
break ;
1345
1453
1346
1454
end_offset ++ ;
@@ -2536,7 +2644,7 @@ numeric_avg(PG_FUNCTION_ARGS)
2536
2644
2537
2645
/* SQL92 defines AVG of no values to be NULL */
2538
2646
/* N is zero iff no digits (cf. numeric_uminus) */
2539
- if (VARSIZE (N ) == NUMERIC_HDRSZ )
2647
+ if (NUMERIC_NDIGITS (N ) == 0 )
2540
2648
PG_RETURN_NULL ();
2541
2649
2542
2650
PG_RETURN_DATUM (DirectFunctionCall2 (numeric_div ,
@@ -2974,7 +3082,8 @@ dump_numeric(const char *str, Numeric num)
2974
3082
2975
3083
ndigits = NUMERIC_NDIGITS (num );
2976
3084
2977
- printf ("%s: NUMERIC w=%d d=%d " , str , num -> n_weight , NUMERIC_DSCALE (num ));
3085
+ printf ("%s: NUMERIC w=%d d=%d " , str ,
3086
+ NUMERIC_WEIGHT (num ), NUMERIC_DSCALE (num ));
2978
3087
switch (NUMERIC_SIGN (num ))
2979
3088
{
2980
3089
case NUMERIC_POS :
@@ -3265,11 +3374,11 @@ set_var_from_num(Numeric num, NumericVar *dest)
3265
3374
3266
3375
alloc_var (dest , ndigits );
3267
3376
3268
- dest -> weight = num -> n_weight ;
3377
+ dest -> weight = NUMERIC_WEIGHT ( num ) ;
3269
3378
dest -> sign = NUMERIC_SIGN (num );
3270
3379
dest -> dscale = NUMERIC_DSCALE (num );
3271
3380
3272
- memcpy (dest -> digits , num -> n_data , ndigits * sizeof (NumericDigit ));
3381
+ memcpy (dest -> digits , NUMERIC_DIGITS ( num ) , ndigits * sizeof (NumericDigit ));
3273
3382
}
3274
3383
3275
3384
@@ -3561,11 +3670,11 @@ make_result(NumericVar *var)
3561
3670
3562
3671
if (sign == NUMERIC_NAN )
3563
3672
{
3564
- result = (Numeric ) palloc (NUMERIC_HDRSZ );
3673
+ result = (Numeric ) palloc (NUMERIC_HDRSZ_SHORT );
3565
3674
3566
- SET_VARSIZE (result , NUMERIC_HDRSZ );
3567
- result -> n_weight = 0 ;
3568
- result -> n_sign_dscale = NUMERIC_NAN ;
3675
+ SET_VARSIZE (result , NUMERIC_HDRSZ_SHORT );
3676
+ result -> choice . n_header = NUMERIC_NAN ;
3677
+ /* the header word is all we need */
3569
3678
3570
3679
dump_numeric ("make_result()" , result );
3571
3680
return result ;
@@ -3592,16 +3701,33 @@ make_result(NumericVar *var)
3592
3701
}
3593
3702
3594
3703
/* Build the result */
3595
- len = NUMERIC_HDRSZ + n * sizeof (NumericDigit );
3596
- result = (Numeric ) palloc (len );
3597
- SET_VARSIZE (result , len );
3598
- result -> n_weight = weight ;
3599
- result -> n_sign_dscale = sign | (var -> dscale & NUMERIC_DSCALE_MASK );
3704
+ if (NUMERIC_CAN_BE_SHORT (var -> dscale , weight ))
3705
+ {
3706
+ len = NUMERIC_HDRSZ_SHORT + n * sizeof (NumericDigit );
3707
+ result = (Numeric ) palloc (len );
3708
+ SET_VARSIZE (result , len );
3709
+ result -> choice .n_short .n_header =
3710
+ (sign == NUMERIC_NEG ? (NUMERIC_SHORT | NUMERIC_SHORT_SIGN_MASK )
3711
+ : NUMERIC_SHORT )
3712
+ | (var -> dscale << NUMERIC_SHORT_DSCALE_SHIFT )
3713
+ | (weight < 0 ? NUMERIC_SHORT_WEIGHT_SIGN_MASK : 0 )
3714
+ | (weight & NUMERIC_SHORT_WEIGHT_MASK );
3715
+ }
3716
+ else
3717
+ {
3718
+ len = NUMERIC_HDRSZ + n * sizeof (NumericDigit );
3719
+ result = (Numeric ) palloc (len );
3720
+ SET_VARSIZE (result , len );
3721
+ result -> choice .n_long .n_sign_dscale =
3722
+ sign | (var -> dscale & NUMERIC_DSCALE_MASK );
3723
+ result -> choice .n_long .n_weight = weight ;
3724
+ }
3600
3725
3601
- memcpy (result -> n_data , digits , n * sizeof (NumericDigit ));
3726
+ memcpy (NUMERIC_DIGITS (result ), digits , n * sizeof (NumericDigit ));
3727
+ Assert (NUMERIC_NDIGITS (result ) == n );
3602
3728
3603
3729
/* Check for overflow of int16 fields */
3604
- if (result -> n_weight != weight ||
3730
+ if (NUMERIC_WEIGHT ( result ) != weight ||
3605
3731
NUMERIC_DSCALE (result ) != var -> dscale )
3606
3732
ereport (ERROR ,
3607
3733
(errcode (ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE ),
0 commit comments