Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 30743fe

Browse files
author
Nikita Glukhov
committed
Partial decompression in TOAST iterators
1 parent 6cb2952 commit 30743fe

File tree

6 files changed

+126
-16
lines changed

src/backend/access/common/detoast.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,8 @@ create_detoast_iterator(struct varlena *attr)
365365
/* initialize state for pglz_decompress_iterate() */
366366
iter->ctrl = 0;
367367
iter->ctrlc = INVALID_CTRLC;
368+
iter->len = 0;
369+
iter->off = 0;
368370
}
369371
else
370372
{
@@ -390,7 +392,7 @@ create_detoast_iterator(struct varlena *attr)
390392
return create_detoast_iterator(attr);
391393

392394
}
393-
else if (VARATT_IS_COMPRESSED(attr))
395+
else if (1 && VARATT_IS_COMPRESSED(attr))
394396
{
395397
ToastBuffer *buf;
396398

src/backend/access/common/toast_compression.c

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,82 @@ lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength)
247247
#endif
248248
}
249249

250+
void
251+
toast_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
252+
DetoastIterator iter, const char *destend)
253+
{
254+
const char *sp;
255+
const char *srcend;
256+
char *dp;
257+
int32 dlen;
258+
int32 slen;
259+
bool last_source_chunk;
260+
261+
/*
262+
* In the while loop, sp may be incremented such that it points beyond
263+
* srcend. To guard against reading beyond the end of the current chunk,
264+
* we set srcend such that we exit the loop when we are within four bytes
265+
* of the end of the current chunk. When source->limit reaches
266+
* source->capacity, we are decompressing the last chunk, so we can (and
267+
* need to) read every byte.
268+
*/
269+
last_source_chunk = source->limit == source->capacity;
270+
srcend = last_source_chunk ? source->limit : source->limit - 4;
271+
sp = source->position;
272+
dp = dest->limit;
273+
if (destend > dest->capacity)
274+
destend = dest->capacity;
275+
276+
slen = srcend - source->position;
277+
278+
/*
279+
* Decompress the data using the appropriate decompression routine.
280+
*/
281+
switch (iter->compression_method)
282+
{
283+
case TOAST_PGLZ_COMPRESSION_ID:
284+
dlen = pglz_decompress_state(sp, &slen, dp, destend - dp,
285+
last_source_chunk && destend == dest->capacity,
286+
last_source_chunk,
287+
&iter->decompression_state);
288+
break;
289+
case TOAST_LZ4_COMPRESSION_ID:
290+
if (source->limit < source->capacity)
291+
dlen = 0; /* LZ4 needs need full data to decompress */
292+
else
293+
{
294+
/* decompress the data */
295+
#ifndef USE_LZ4
296+
NO_LZ4_SUPPORT();
297+
dlen = 0;
298+
#else
299+
dlen = LZ4_decompress_safe(source->buf + VARHDRSZ_COMPRESSED,
300+
VARDATA(dest->buf),
301+
VARSIZE(source->buf) - VARHDRSZ_COMPRESSED,
302+
VARDATA_COMPRESSED_GET_EXTSIZE(source->buf));
303+
304+
if (dlen < 0)
305+
ereport(ERROR,
306+
(errcode(ERRCODE_DATA_CORRUPTED),
307+
errmsg_internal("compressed lz4 data is corrupt")));
308+
#endif
309+
slen = 0;
310+
break;
311+
}
312+
default:
313+
elog(ERROR, "invalid compression method id %d", iter->compression_method);
314+
return; /* keep compiler quiet */
315+
}
316+
317+
if (dlen < 0)
318+
ereport(ERROR,
319+
(errcode(ERRCODE_DATA_CORRUPTED),
320+
errmsg_internal("compressed data is corrupt")));
321+
322+
source->position += slen;
323+
dest->limit += dlen;
324+
}
325+
250326
/*
251327
* Extract compression ID from a varlena.
252328
*

src/backend/access/common/toast_internals.c

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -919,9 +919,33 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
919919
(source->limit == source->capacity ? source->limit : (source->limit - 4));
920920
sp = (const unsigned char *) source->position;
921921
dp = (unsigned char *) dest->limit;
922-
if (destend < (unsigned char *) dest->capacity)
922+
if (destend > (unsigned char *) dest->capacity)
923923
destend = (unsigned char *) dest->capacity;
924924

925+
if (iter->len)
926+
{
927+
int32 len = iter->len;
928+
int32 off = iter->off;
929+
int32 copylen = Min(len, destend - dp);
930+
int32 remlen = len - copylen;
931+
932+
while (copylen--)
933+
{
934+
*dp = dp[-off];
935+
dp++;
936+
}
937+
938+
iter->len = remlen;
939+
940+
if (dp >= destend)
941+
{
942+
dest->limit = (char *) dp;
943+
return;
944+
}
945+
946+
Assert(remlen == 0);
947+
}
948+
925949
while (sp < srcend && dp < destend)
926950
{
927951
/*
@@ -942,7 +966,6 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
942966
ctrlc = 0;
943967
}
944968

945-
946969
for (; ctrlc < INVALID_CTRLC && sp < srcend && dp < destend; ctrlc++)
947970
{
948971

@@ -959,6 +982,7 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
959982
*/
960983
int32 len;
961984
int32 off;
985+
int32 copylen;
962986

963987
len = (sp[0] & 0x0f) + 3;
964988
off = ((sp[0] & 0xf0) << 4) | sp[1];
@@ -972,17 +996,21 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
972996
* areas could overlap; to prevent possible uncertainty, we
973997
* copy only non-overlapping regions.
974998
*/
975-
len = Min(len, destend - dp);
976-
while (off < len)
999+
copylen = Min(len, destend - dp);
1000+
iter->len = len - copylen;
1001+
1002+
while (off < copylen)
9771003
{
9781004
/* see comments in common/pg_lzcompress.c */
9791005
memcpy(dp, dp - off, off);
980-
len -= off;
1006+
copylen -= off;
9811007
dp += off;
9821008
off += off;
9831009
}
984-
memcpy(dp, dp - off, len);
985-
dp += len;
1010+
memcpy(dp, dp - off, copylen);
1011+
dp += copylen;
1012+
1013+
iter->off = off;
9861014
}
9871015
else
9881016
{

src/common/pg_lzcompress.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,8 @@ typedef struct pglz_state
700700
*/
701701
int32
702702
pglz_decompress_state(const char *source, int32 slen, char *dest,
703-
int32 rawsize, bool check_complete, void **pstate)
703+
int32 dlen, bool check_complete, bool last_cource_chunk,
704+
void **pstate)
704705
{
705706
pglz_state *state = pstate ? *pstate : NULL;
706707
const unsigned char *sp;
@@ -800,7 +801,7 @@ pglz_decompress_state(const char *source, int32 slen, char *dest,
800801
* must check this, else we risk an infinite loop below in the
801802
* face of corrupt data.)
802803
*/
803-
if (unlikely(sp > srcend || off == 0))
804+
if (unlikely((sp > srcend && last_cource_chunk) || off == 0))
804805
return -1;
805806

806807
/*
@@ -947,5 +948,5 @@ int32
947948
pglz_decompress(const char *source, int32 slen, char *dest, int32 rawsize,
948949
bool check_complete)
949950
{
950-
return pglz_decompress_state(source, slen, dest, rawsize, check_complete, NULL);
951+
return pglz_decompress_state(source, slen, dest, rawsize, check_complete, true, NULL);
951952
}

src/include/access/detoast.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ typedef struct DetoastIteratorData
123123
unsigned char ctrl;
124124
int ctrlc;
125125
int nrefs;
126+
int32 len;
127+
int32 off;
126128
bool compressed; /* toast value is compressed? */
127129
bool done;
128130
} DetoastIteratorData;
@@ -174,16 +176,16 @@ detoast_iterate(DetoastIterator detoast_iter, const char *destend)
174176
if (!detoast_iter->compressed)
175177
destend = NULL;
176178

177-
if (destend)
179+
if (1 && destend)
178180
{
179181
const char *srcend = (const char *)
180182
(fetch_iter->buf->limit == fetch_iter->buf->capacity ?
181183
fetch_iter->buf->limit : fetch_iter->buf->limit - 4);
182184

183-
if (fetch_iter->buf->position >= srcend)
185+
if (fetch_iter->buf->position >= srcend && !fetch_iter->done)
184186
fetch_datum_iterate(fetch_iter);
185187
}
186-
else
188+
else if (!fetch_iter->done)
187189
fetch_datum_iterate(fetch_iter);
188190

189191
if (detoast_iter->compressed)

src/include/common/pg_lzcompress.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ extern int32 pglz_compress(const char *source, int32 slen, char *dest,
8787
const PGLZ_Strategy *strategy);
8888
extern int32 pglz_decompress(const char *source, int32 slen, char *dest,
8989
int32 rawsize, bool check_complete);
90-
extern int32 pglz_decompress_state(const char *source, int32 slen, char *dest,
91-
int32 rawsize, bool check_complete,
90+
extern int32 pglz_decompress_state(const char *source, int32 slen,
91+
char *dest, int32 dlen,
92+
bool check_complete, bool last_source_chunk,
9293
void **state);
9394
extern int32 pglz_maximum_compressed_size(int32 rawsize,
9495
int32 total_compressed_size);

0 commit comments

Comments
 (0)