17
17
#include <lz4.h>
18
18
#endif
19
19
20
+ #ifdef USE_ZSTD
21
+ #include <zstd.h>
22
+ #endif
23
+
20
24
#include "access/detoast.h"
21
25
#include "access/toast_compression.h"
22
26
#include "common/pg_lzcompress.h"
26
30
/* GUC */
27
31
int default_toast_compression = TOAST_PGLZ_COMPRESSION ;
28
32
29
- #define NO_LZ4_SUPPORT () \
33
+ #ifdef USE_ZSTD
34
+ #define ZSTD_CHECK_ERROR (zstd_ret , msg ) \
35
+ do { \
36
+ if (ZSTD_isError(zstd_ret)) \
37
+ ereport(ERROR, (errmsg("%s: %s", (msg), ZSTD_getErrorName(zstd_ret)))); \
38
+ } while (0)
39
+ #endif
40
+
41
+ #define COMPRESSION_METHOD_NOT_SUPPORTED (method ) \
30
42
ereport(ERROR, \
31
43
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
32
- errmsg("compression method lz4 not supported"), \
33
- errdetail("This functionality requires the server to be built with lz4 support.")))
44
+ errmsg("compression method %s not supported", method ), \
45
+ errdetail("This functionality requires the server to be built with %s support.", method )))
34
46
35
47
/*
36
48
* Compress a varlena using PGLZ.
@@ -140,7 +152,7 @@ struct varlena *
140
152
lz4_compress_datum (const struct varlena * value )
141
153
{
142
154
#ifndef USE_LZ4
143
- NO_LZ4_SUPPORT ( );
155
+ COMPRESSION_METHOD_NOT_SUPPORTED ( "lz4" );
144
156
return NULL ; /* keep compiler quiet */
145
157
#else
146
158
int32 valsize ;
@@ -183,7 +195,7 @@ struct varlena *
183
195
lz4_decompress_datum (const struct varlena * value )
184
196
{
185
197
#ifndef USE_LZ4
186
- NO_LZ4_SUPPORT ( );
198
+ COMPRESSION_METHOD_NOT_SUPPORTED ( "lz4" );
187
199
return NULL ; /* keep compiler quiet */
188
200
#else
189
201
int32 rawsize ;
@@ -216,7 +228,7 @@ struct varlena *
216
228
lz4_decompress_datum_slice (const struct varlena * value , int32 slicelength )
217
229
{
218
230
#ifndef USE_LZ4
219
- NO_LZ4_SUPPORT ( );
231
+ COMPRESSION_METHOD_NOT_SUPPORTED ( "lz4" );
220
232
return NULL ; /* keep compiler quiet */
221
233
#else
222
234
int32 rawsize ;
@@ -246,6 +258,153 @@ lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength)
246
258
#endif
247
259
}
248
260
261
+ /* Compress datum using ZSTD */
262
+ struct varlena *
263
+ zstd_compress_datum (const struct varlena * value , CompressionInfo cmp )
264
+ {
265
+ #ifdef USE_ZSTD
266
+ uint32 valsize = VARSIZE_ANY_EXHDR (value );
267
+ size_t max_size = ZSTD_compressBound (valsize );
268
+ struct varlena * compressed ;
269
+ size_t cmp_size ;
270
+
271
+ if (!cmp .meta ) /* ZSTD no dictionary */
272
+ {
273
+ /* Allocate space for the compressed varlena (header + data) */
274
+ compressed = (struct varlena * ) palloc (max_size + VARHDRSZ_4BCE );
275
+
276
+ cmp_size = ZSTD_compress (VARDATA_4BCE (compressed ),
277
+ max_size ,
278
+ VARDATA_ANY (value ),
279
+ valsize ,
280
+ cmp .zstd_level );
281
+
282
+ if (ZSTD_isError (cmp_size ))
283
+ {
284
+ pfree (compressed );
285
+ ZSTD_CHECK_ERROR (cmp_size , "ZSTD compression failed" );
286
+ }
287
+
288
+ /*
289
+ * If compression did not reduce size, return NULL so that the
290
+ * uncompressed data is stored
291
+ */
292
+ if (cmp_size > valsize )
293
+ {
294
+ pfree (compressed );
295
+ return NULL ;
296
+ }
297
+
298
+ /* Set the compressed size in the varlena header */
299
+ SET_VARSIZE_COMPRESSED (compressed , cmp_size + VARHDRSZ_4BCE );
300
+ }
301
+ else
302
+ elog (ERROR , "ZSTD metadata(dictionary) based compression not supported yet" );
303
+
304
+ return compressed ;
305
+
306
+ #else
307
+ COMPRESSION_METHOD_NOT_SUPPORTED ("zstd" );
308
+ return NULL ;
309
+ #endif
310
+ }
311
+
312
+ /* Decompression routine */
313
+ struct varlena *
314
+ zstd_decompress_datum (const struct varlena * value )
315
+ {
316
+ #ifdef USE_ZSTD
317
+ /* ZSTD no dictionary compression */
318
+ uint32 actual_size_exhdr = VARDATA_COMPRESSED_GET_EXTSIZE (value );
319
+ uint32 zstd_compressed_len ;
320
+ struct varlena * result ;
321
+ size_t uncmp_size ;
322
+ bool meta = VARATT_4BCE_PTR_HAS_META (value );
323
+
324
+ if (!meta ) /* ZSTD no dictionary */
325
+ {
326
+ zstd_compressed_len = VARSIZE_ANY (value ) - VARHDRSZ_4BCE ;
327
+
328
+ /* Allocate space for the uncompressed data */
329
+ result = (struct varlena * ) palloc (actual_size_exhdr + VARHDRSZ );
330
+
331
+ uncmp_size = ZSTD_decompress (VARDATA (result ),
332
+ actual_size_exhdr ,
333
+ VARDATA_4BCE (value ),
334
+ zstd_compressed_len );
335
+
336
+ if (ZSTD_isError (uncmp_size ))
337
+ {
338
+ pfree (result );
339
+ ZSTD_CHECK_ERROR (uncmp_size , "ZSTD decompression failed" );
340
+ }
341
+
342
+ /* Set final size in the varlena header */
343
+ SET_VARSIZE (result , uncmp_size + VARHDRSZ );
344
+ }
345
+ else
346
+ elog (ERROR , "ZSTD metadata(dictionary) based decompression not supported yet" );
347
+
348
+ return result ;
349
+
350
+ #else
351
+ COMPRESSION_METHOD_NOT_SUPPORTED ("zstd" );
352
+ return NULL ;
353
+ #endif
354
+ }
355
+
356
+ /* Decompress a slice of the datum */
357
+ struct varlena *
358
+ zstd_decompress_datum_slice (const struct varlena * value , int32 slicelength )
359
+ {
360
+ #ifdef USE_ZSTD
361
+ /* ZSTD no dictionary compression */
362
+
363
+ struct varlena * result ;
364
+ ZSTD_inBuffer inBuf ;
365
+ ZSTD_outBuffer outBuf ;
366
+ size_t ret ;
367
+ ZSTD_DCtx * ZstdDecompressionCtx ;
368
+ bool meta = VARATT_4BCE_PTR_HAS_META (value );
369
+
370
+ if (!meta ) /* ZSTD no dictionary */
371
+ {
372
+ ZstdDecompressionCtx = ZSTD_createDCtx ();
373
+ inBuf .src = VARDATA_4BCE (value );
374
+ inBuf .size = VARSIZE_ANY (value ) - VARHDRSZ_4BCE ;
375
+ inBuf .pos = 0 ;
376
+
377
+ result = (struct varlena * ) palloc (slicelength + VARHDRSZ );
378
+ outBuf .dst = (char * ) result + VARHDRSZ ;
379
+ outBuf .size = slicelength ;
380
+ outBuf .pos = 0 ;
381
+
382
+ /* Common decompression loop */
383
+ while (inBuf .pos < inBuf .size && outBuf .pos < outBuf .size )
384
+ {
385
+ ret = ZSTD_decompressStream (ZstdDecompressionCtx , & outBuf , & inBuf );
386
+ if (ZSTD_isError (ret ))
387
+ {
388
+ pfree (result );
389
+ ZSTD_freeDCtx (ZstdDecompressionCtx );
390
+ ZSTD_CHECK_ERROR (ret , "zstd decompression failed" );
391
+ }
392
+ }
393
+
394
+ Assert (outBuf .size == slicelength && outBuf .pos == slicelength );
395
+ SET_VARSIZE (result , outBuf .pos + VARHDRSZ );
396
+ ZSTD_freeDCtx (ZstdDecompressionCtx );
397
+ }
398
+ else
399
+ elog (ERROR , "ZSTD metadata(dictionary) based decompression not supported yet" );
400
+
401
+ return result ;
402
+ #else
403
+ COMPRESSION_METHOD_NOT_SUPPORTED ("zstd" );
404
+ return NULL ;
405
+ #endif
406
+ }
407
+
249
408
/*
250
409
* Extract compression ID from a varlena.
251
410
*
@@ -290,10 +449,17 @@ CompressionNameToMethod(const char *compression)
290
449
else if (strcmp (compression , "lz4" ) == 0 )
291
450
{
292
451
#ifndef USE_LZ4
293
- NO_LZ4_SUPPORT ( );
452
+ COMPRESSION_METHOD_NOT_SUPPORTED ( "lz4" );
294
453
#endif
295
454
return TOAST_LZ4_COMPRESSION ;
296
455
}
456
+ else if (strcmp (compression , "zstd" ) == 0 )
457
+ {
458
+ #ifndef USE_ZSTD
459
+ COMPRESSION_METHOD_NOT_SUPPORTED ("zstd" );
460
+ #endif
461
+ return TOAST_ZSTD_COMPRESSION ;
462
+ }
297
463
298
464
return InvalidCompressionMethod ;
299
465
}
@@ -310,6 +476,8 @@ GetCompressionMethodName(char method)
310
476
return "pglz" ;
311
477
case TOAST_LZ4_COMPRESSION :
312
478
return "lz4" ;
479
+ case TOAST_ZSTD_COMPRESSION :
480
+ return "zstd" ;
313
481
default :
314
482
elog (ERROR , "invalid compression method %c" , method );
315
483
return NULL ; /* keep compiler quiet */
@@ -324,6 +492,7 @@ setup_cmp_info(char cmethod, Form_pg_attribute att)
324
492
/* initialize from the attribute’s default settings */
325
493
info .cmethod = cmethod ;
326
494
info .meta = false;
495
+ info .zstd_level = DEFAULT_ZSTD_LEVEL ;
327
496
328
497
/* If the compression method is not valid, use the current default */
329
498
if (!CompressionMethodIsValid (cmethod ))
@@ -334,6 +503,14 @@ setup_cmp_info(char cmethod, Form_pg_attribute att)
334
503
case TOAST_PGLZ_COMPRESSION :
335
504
case TOAST_LZ4_COMPRESSION :
336
505
break ;
506
+ case TOAST_ZSTD_COMPRESSION :
507
+ {
508
+ AttributeOpts * aopt = get_attribute_options (att -> attrelid , att -> attnum );
509
+
510
+ if (aopt != NULL )
511
+ info .zstd_level = aopt -> zstd_level ;
512
+ }
513
+ break ;
337
514
default :
338
515
elog (ERROR , "invalid compression method %c" , info .cmethod );
339
516
}
0 commit comments