
Commit 33d9b49

Commitfest Bot committed
[CF 5382] Adding compression of temporary files
This branch was automatically generated by a robot using patches from an email thread registered at: https://commitfest.postgresql.org/patch/5382
The branch will be overwritten each time a new patch version is posted to the thread, and also periodically to check for bitrot caused by changes on the master branch.
Patch(es): https://www.postgresql.org/message-id/CAFjYY+LJ5_j82PrSxm2RjM0OjwfN9smJ1VKKMOka-25Jg-0ofg@mail.gmail.com
Author(s): Filip Januš
2 parents 47d90b7 + e2334c1 commit 33d9b49

16 files changed (+3842 -16 lines)

src/Makefile.global.in

Lines changed: 1 addition & 0 deletions
@@ -201,6 +201,7 @@ with_liburing = @with_liburing@
 with_libxml = @with_libxml@
 with_libxslt = @with_libxslt@
 with_llvm = @with_llvm@
+with_lz4 = @with_lz4@
 with_system_tzdata = @with_system_tzdata@
 with_uuid = @with_uuid@
 with_zlib = @with_zlib@

src/backend/access/gist/gistbuildbuffers.c

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
 	 * Create a temporary file to hold buffer pages that are swapped out of
 	 * memory.
 	 */
-	gfbb->pfile = BufFileCreateTemp(false);
+	gfbb->pfile = BufFileCreateTemp(false, false);
 	gfbb->nFileBlocks = 0;
 
 	/* Initialize free page management. */

src/backend/backup/backup_manifest.c

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest,
 		manifest->buffile = NULL;
 	else
 	{
-		manifest->buffile = BufFileCreateTemp(false);
+		manifest->buffile = BufFileCreateTemp(false, false);
 		manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
 		if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
 			elog(ERROR, "failed to initialize checksum of backup manifest: %s",

src/backend/executor/nodeHashjoin.c

Lines changed: 1 addition & 1 deletion
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 	{
 		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		file = BufFileCreateTemp(false);
+		file = BufFileCreateCompressTemp(false);
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
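For orientation, here is a minimal, hypothetical caller-side sketch (not part of this commit) of how code like the hash-join spill path above is expected to drive the new API: create the file with BufFileCreateCompressTemp(), write sequentially, rewind once, and read back sequentially, matching the patch's note that the compression path supports no random access. The function name spill_and_reload_example and its error handling are illustrative; BufFileWrite, BufFileSeek, BufFileRead and BufFileClose are the pre-existing buffile routines.

/* Hypothetical caller-side sketch; not part of the patch. */
#include "postgres.h"
#include "storage/buffile.h"

static void
spill_and_reload_example(const char *data, Size len)
{
	/* Compressed only when the temp_file_compression GUC enables it. */
	BufFile    *f = BufFileCreateCompressTemp(false);
	char		chunk[BLCKSZ];
	size_t		nread;

	/* 1. Sequential writes only; the compression path cannot seek forward. */
	BufFileWrite(f, data, len);

	/* 2. A single rewind to the start is the only supported seek. */
	if (BufFileSeek(f, 0, 0, SEEK_SET) != 0)
		elog(ERROR, "could not rewind temporary file");

	/* 3. Sequential reads; decompression happens inside BufFileLoadBuffer(). */
	while ((nread = BufFileRead(f, chunk, sizeof(chunk))) > 0)
	{
		/* consume nread decompressed bytes from chunk */
	}

	BufFileClose(f);
}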

src/backend/storage/file/buffile.c

Lines changed: 208 additions & 9 deletions
@@ -53,6 +53,18 @@
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "utils/resowner.h"
+#include "utils/memutils.h"
+
+#include "common/pg_lzcompress.h"
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#define NO_LZ4_SUPPORT() \
+	ereport(ERROR, \
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+			 errmsg("compression method lz4 not supported"), \
+			 errdetail("This functionality requires the server to be built with lz4 support.")))
 
 /*
  * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
@@ -62,6 +74,8 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+int			temp_file_compression = TEMP_NONE_COMPRESSION;
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -95,7 +109,8 @@ struct BufFile
 	off_t		curOffset;		/* offset part of current pos */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
-
+	bool		compress;		/* State of usege file compression */
+	char	   *cBuffer;
 	/*
 	 * XXX Should ideally use PGIOAlignedBlock, but might need a way to avoid
 	 * wasting per-file alignment padding when some users create many files.
@@ -127,6 +142,8 @@ makeBufFileCommon(int nfiles)
 	file->curOffset = 0;
 	file->pos = 0;
 	file->nbytes = 0;
+	file->compress = false;
+	file->cBuffer = NULL;
 
 	return file;
 }
@@ -188,9 +205,17 @@ extendBufFile(BufFile *file)
  * Note: if interXact is true, the caller had better be calling us in a
  * memory context, and with a resource owner, that will survive across
  * transaction boundaries.
+ *
+ * If compress is true the temporary files will be compressed before
+ * writing on disk.
+ *
+ * Note: The compression does not support random access. Only the hash joins
+ * use it for now. The seek operation other than seek to the beginning of the
+ * buffile will corrupt temporary data offsets.
+ *
  */
 BufFile *
-BufFileCreateTemp(bool interXact)
+BufFileCreateTemp(bool interXact, bool compress)
 {
 	BufFile    *file;
 	File		pfile;
@@ -212,9 +237,47 @@ BufFileCreateTemp(bool interXact)
 	file = makeBufFile(pfile);
 	file->isInterXact = interXact;
 
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+		file->compress = compress;
+	}
+
 	return file;
+
 }
+/*
+ * Wrapper for BuffileCreateTemp
+ * We want to limit the number of memory allocations for the compression buffer,
+ * only one buffer for all compression operations is enough
+ */
+BufFile *
+BufFileCreateCompressTemp(bool interXact){
+	static char * buff = NULL;
+	BufFile *tmpBufFile = BufFileCreateTemp(interXact, true);
 
+	if (buff == NULL && temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+		int size = 0;
+
+		switch (temp_file_compression)
+		{
+			case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+				size = LZ4_compressBound(BLCKSZ)+sizeof(int);
+#endif
+				break;
+			case TEMP_PGLZ_COMPRESSION:
+				size = pglz_maximum_compressed_size(BLCKSZ, BLCKSZ)+sizeof(int);
+				break;
+		}
+		/*
+		 * Persistent buffer for all temporary file compressions
+		 */
+		buff = MemoryContextAlloc(TopMemoryContext, size);
+	}
+	tmpBufFile->cBuffer = buff;
+	return tmpBufFile;
+}
 /*
  * Build the name for a given segment of a given BufFile.
  */
@@ -275,6 +338,7 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
 	file->files[0] = MakeNewFileSetSegment(file, 0);
 	file->readOnly = false;
 
+
 	return file;
 }
 
@@ -457,11 +521,75 @@ BufFileLoadBuffer(BufFile *file)
 	/*
 	 * Read whatever we can get, up to a full bufferload.
 	 */
-	file->nbytes = FileRead(thisfile,
+	if (!file->compress)
+	{
+
+		/*
+		 * Read whatever we can get, up to a full bufferload.
+		 */
+		file->nbytes = FileRead(thisfile,
 							file->buffer.data,
-							sizeof(file->buffer.data),
+							sizeof(file->buffer),
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+	/*
+	 * Read and decompress data from the temporary file
+	 * The first reading loads size of the compressed block
+	 * Second reading loads compressed data
+	 */
+	} else {
+		int nread;
+		int nbytes;
+
+		nread = FileRead(thisfile,
+						 &nbytes,
+						 sizeof(nbytes),
+						 file->curOffset,
+						 WAIT_EVENT_BUFFILE_READ);
+		/* if not EOF let's continue */
+		if (nread > 0)
+		{
+			/* A long life buffer limits number of memory allocations */
+			char * buff = file->cBuffer;
+
+			Assert(file->cBuffer != NULL);
+			/*
+			 * Read compressed data, curOffset differs with pos
+			 * It reads less data than it returns to caller
+			 * So the curOffset must be advanced here based on compressed size
+			 */
+			file->curOffset+=sizeof(nbytes);
+
+			nread = FileRead(thisfile,
+							 buff,
+							 nbytes,
 							file->curOffset,
 							WAIT_EVENT_BUFFILE_READ);
+
+			switch (temp_file_compression)
+			{
+				case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+					file->nbytes = LZ4_decompress_safe(buff,
+						file->buffer.data,nbytes,sizeof(file->buffer));
+#endif
+					break;
+
+				case TEMP_PGLZ_COMPRESSION:
+					file->nbytes = pglz_decompress(buff,nbytes,
+						file->buffer.data,sizeof(file->buffer),false);
+					break;
+			}
+			file->curOffset += nread;
+
+			if (file->nbytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed lz4 data is corrupt")));
+		}
+
+	}
+
 	if (file->nbytes < 0)
 	{
 		file->nbytes = 0;
@@ -494,9 +622,61 @@ static void
 BufFileDumpBuffer(BufFile *file)
 {
 	int			wpos = 0;
-	int			bytestowrite;
+	int			bytestowrite = 0;
 	File		thisfile;
 
+
+	/* Save nbytes value because the size changes due to compression */
+	int nbytesOriginal = file->nbytes;
+
+	char * DataToWrite;
+	DataToWrite = file->buffer.data;
+
+	/*
+	 * Prepare compressed data to write
+	 * size of compressed block needs to be added at the beggining of the
+	 * compressed data
+	 */
+
+
+	if (file->compress) {
+		char * cData;
+		int cSize = 0;
+
+		Assert(file->cBuffer != NULL);
+		cData = file->cBuffer;
+
+		switch (temp_file_compression)
+		{
+			case TEMP_LZ4_COMPRESSION:
+			{
+#ifdef USE_LZ4
+				int cBufferSize = LZ4_compressBound(file->nbytes);
+				/*
+				 * Using stream compression would lead to the slight improvement in
+				 * compression ratio
+				 */
+				cSize = LZ4_compress_default(file->buffer.data,
+					cData + sizeof(int),file->nbytes, cBufferSize);
+#endif
+				break;
+			}
+			case TEMP_PGLZ_COMPRESSION:
+				cSize = pglz_compress(file->buffer.data,file->nbytes,
+					cData + sizeof(int),PGLZ_strategy_always);
+				break;
+		}
+
+
+		/* Write size of compressed block in front of compressed data
+		 * It's used to determine amount of data to read within
+		 * decompression process
+		 */
+		memcpy(cData,&cSize,sizeof(int));
+		file->nbytes=cSize + sizeof(int);
+		DataToWrite = cData;
+	}
+
 	/*
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
 	 * crosses a component-file boundary; so we need a loop.
@@ -535,7 +715,7 @@ BufFileDumpBuffer(BufFile *file)
 		INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 DataToWrite + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +744,19 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
+
+
+	if (!file->compress)
+		file->curOffset -= (file->nbytes - file->pos);
+	else
+		if (nbytesOriginal - file->pos != 0)
+			/* curOffset must be corrected also if compression is
+			 * enabled, nbytes was changed by compression but we
+			 * have to use the original value of nbytes
+			 */
+			file->curOffset-=bytestowrite;
+
+
 	if (file->curOffset < 0)	/* handle possible segment crossing */
 	{
 		file->curFile--;
@@ -577,6 +769,7 @@ BufFileDumpBuffer(BufFile *file)
 	 */
 	file->pos = 0;
 	file->nbytes = 0;
+
 }
 
 /*
@@ -602,8 +795,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 {
 	if (file->pos >= file->nbytes)
 	{
-		/* Try to load more data into buffer. */
-		file->curOffset += file->pos;
+		/* Try to load more data into buffer.
+		 *
+		 * curOffset is moved within BufFileLoadBuffer
+		 * because stored data size differs from loaded/
+		 * decompressed size
+		 * */
+		if (!file->compress)
+			file->curOffset += file->pos;
 		file->pos = 0;
 		file->nbytes = 0;
 		BufFileLoadBuffer(file);
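The read and write paths above agree on a simple on-disk framing: each dumped buffer is stored as a 4-byte int holding the compressed length, immediately followed by the compressed payload, which is why BufFileLoadBuffer() issues two FileRead() calls and advances curOffset by the stored compressed size rather than by the decompressed amount. The following standalone sketch (not taken from the patch; plain liblz4 with no PostgreSQL headers, assuming LZ4 compression) illustrates that framing.

/* Standalone illustration of the [int length][compressed payload] framing. */
#include <string.h>
#include <lz4.h>

/* Compress 'srclen' bytes from 'src' into 'out'; returns total framed size or -1. */
static int
frame_block(const char *src, int srclen, char *out, int outcap)
{
	int		bound = LZ4_compressBound(srclen);
	int		csize;

	if (outcap < (int) sizeof(int) + bound)
		return -1;
	csize = LZ4_compress_default(src, out + sizeof(int), srclen, bound);
	if (csize <= 0)
		return -1;
	memcpy(out, &csize, sizeof(int));	/* length prefix written first, as in BufFileDumpBuffer() */
	return (int) sizeof(int) + csize;
}

/* Decode one framed block from 'in'; returns decompressed size or a negative LZ4 error. */
static int
unframe_block(const char *in, char *dst, int dstcap)
{
	int		csize;

	memcpy(&csize, in, sizeof(int));	/* first "read": the compressed size */
	return LZ4_decompress_safe(in + sizeof(int), dst, csize, dstcap);	/* second "read": the payload */
}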

src/backend/utils/misc/guc_tables.c

Lines changed: 24 additions & 0 deletions
@@ -78,6 +78,7 @@
 #include "replication/syncrep.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
+#include "storage/buffile.h"
 #include "storage/bufpage.h"
 #include "storage/copydir.h"
 #include "storage/io_worker.h"
@@ -463,6 +464,18 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 #endif
 	{NULL, 0, false}
 };
+/*
+ * pglz and zstd support should be added as future enhancement
+ *
+ */
+static const struct config_enum_entry temp_file_compression_options[] = {
+	{"no", TEMP_NONE_COMPRESSION, false},
+	{"pglz", TEMP_PGLZ_COMPRESSION, false},
+#ifdef USE_LZ4
+	{"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+	{NULL, 0, false}
+};
 
 static const struct config_enum_entry wal_compression_options[] = {
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
@@ -5058,6 +5071,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"temp_file_compression", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the default compression method for compressible values."),
+			NULL
+		},
+		&temp_file_compression,
+		TEMP_NONE_COMPRESSION,
+		temp_file_compression_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"default_transaction_isolation", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Sets the transaction isolation level of each new transaction."),
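The hunks above reference TEMP_NONE_COMPRESSION, TEMP_PGLZ_COMPRESSION, TEMP_LZ4_COMPRESSION, the temp_file_compression variable, and the new BufFileCreateTemp/BufFileCreateCompressTemp prototypes, all of which must be declared in src/include/storage/buffile.h; that hunk is not among the files shown in this excerpt. Below is a hedged reconstruction of what those declarations presumably look like, based only on the identifiers used above; the enum type name and member ordering are guesses.

/* Hypothetical sketch of the buffile.h additions (header hunk not shown in this excerpt). */
typedef enum
{
	TEMP_NONE_COMPRESSION,
	TEMP_PGLZ_COMPRESSION,
	TEMP_LZ4_COMPRESSION
} TempCompression;				/* type name is a guess */

extern PGDLLIMPORT int temp_file_compression;

extern BufFile *BufFileCreateTemp(bool interXact, bool compress);
extern BufFile *BufFileCreateCompressTemp(bool interXact);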

0 commit comments