Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 765d063

Browse files
committed
Add walsender messages compression
1 parent a2b6520 commit 765d063

File tree

4 files changed

+100
-17
lines changed

4 files changed

+100
-17
lines changed

src/backend/replication/walsender.c

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,15 @@
6060
#include "catalog/pg_type.h"
6161
#include "commands/dbcommands.h"
6262
#include "commands/defrem.h"
63+
#include "common/pg_lzcompress.h"
64+
#include "common/fe_memutils.h"
6365
#include "funcapi.h"
6466
#include "libpq/libpq.h"
6567
#include "libpq/pqformat.h"
6668
#include "miscadmin.h"
6769
#include "nodes/replnodes.h"
6870
#include "pgstat.h"
71+
#include "port/pg_bswap.h"
6972
#include "replication/basebackup.h"
7073
#include "replication/decode.h"
7174
#include "replication/logical.h"
@@ -95,7 +98,9 @@
9598
#include "utils/timestamp.h"
9699

97100
#define ERROR_MESSAGE_SIZE 1024
98-
101+
#define MIN_COMPRESSION_RATIO 2
102+
#define MIN_COMPRESSION_SIZE 32
103+
#define MSG_COMPRESS_HDR_SIZE 4
99104
/*
100105
* Shared structure to repair a corrupted block.
101106
*
@@ -160,6 +165,9 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
160165
* data message */
161166
bool log_replication_commands = false;
162167

168+
169+
bool wal_sender_compression = false;
170+
163171
/*
164172
* State for WalSndWakeupRequest
165173
*/
@@ -1192,7 +1200,29 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11921200
TimestampTz now;
11931201

11941202
/* output previously gathered data in a CopyData packet */
1195-
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1203+
if (wal_sender_compression && ctx->out->len > MIN_COMPRESSION_SIZE)
1204+
{
1205+
static char* lzbuf;
1206+
static int lzbuf_size;
1207+
int zip_len;
1208+
int raw_len = ctx->out->len;
1209+
int max_zip_len = PGLZ_MAX_OUTPUT(raw_len) + MSG_COMPRESS_HDR_SIZE;
1210+
if (max_zip_len > lzbuf_size) {
1211+
lzbuf_size = lzbuf_size*2 > max_zip_len ? lzbuf_size*2 : max_zip_len;
1212+
lzbuf = realloc(lzbuf, lzbuf_size);
1213+
}
1214+
zip_len = pglz_compress(ctx->out->data, raw_len, lzbuf + MSG_COMPRESS_HDR_SIZE, PGLZ_strategy_default);
1215+
if (zip_len > 0 && zip_len < raw_len/MIN_COMPRESSION_RATIO)
1216+
{
1217+
uint32 net_size = htonl(raw_len);
1218+
memcpy(lzbuf, &net_size, sizeof(net_size));
1219+
pq_putmessage_noblock('z', lzbuf, zip_len + MSG_COMPRESS_HDR_SIZE);
1220+
elog(LOG, "WRITE compressed message with length %d raw_len=%d", zip_len, raw_len);
1221+
}
1222+
else
1223+
pq_putmessage_noblock('d', ctx->out->data, raw_len);
1224+
} else
1225+
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
11961226

11971227
/*
11981228
* Fill the send timestamp last, so that it is taken as late as possible.

src/backend/utils/misc/guc.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,6 +1076,16 @@ static struct config_bool ConfigureNamesBool[] =
10761076
NULL, NULL, NULL
10771077
},
10781078

1079+
{
1080+
{"wal_sender_compression", PGC_SUSET, WAL_SETTINGS,
1081+
gettext_noop("Compress logical replicastion messages."),
1082+
NULL
1083+
},
1084+
&wal_sender_compression,
1085+
false,
1086+
NULL, NULL, NULL
1087+
},
1088+
10791089
{
10801090
{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
10811091
gettext_noop("Logs each checkpoint."),

src/include/replication/walsender.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ extern bool wake_wal_senders;
4141
extern PGDLLIMPORT int max_wal_senders;
4242
extern int wal_sender_timeout;
4343
extern bool log_replication_commands;
44+
extern bool wal_sender_compression;
4445

4546
extern void InitWalSender(void);
4647
extern bool exec_replication_command(const char *query_string);

src/interfaces/libpq/fe-protocol3.c

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
#include <arpa/inet.h>
3434
#endif
3535

36+
#include "common/pg_lzcompress.h"
37+
#include "utils/elog.h"
3638

3739
/*
3840
* This macro lists the backend message types that could be "long" (more
3941
* than a couple of kilobytes).
4042
*/
4143
#define VALID_LONG_MESSAGE_TYPE(id) \
42-
((id) == 'T' || (id) == 'D' || (id) == 'd' || (id) == 'V' || \
44+
((id) == 'T' || (id) == 'D' || (id) == 'd' || (id) == 'z' || (id) == 'V' || \
4345
(id) == 'E' || (id) == 'N' || (id) == 'A')
4446

4547

@@ -389,6 +391,7 @@ pqParseInput3(PGconn *conn)
389391
conn->copy_already_done = 0;
390392
break;
391393
case 'd': /* Copy Data */
394+
case 'z': /* Copy Compressed Data */
392395

393396
/*
394397
* If we see Copy Data, just silently drop it. This would
@@ -1533,7 +1536,7 @@ getReadyForQuery(PGconn *conn)
15331536
* message available, -1 if end of copy, -2 if error.
15341537
*/
15351538
static int
1536-
getCopyDataMessage(PGconn *conn)
1539+
getCopyDataMessage(PGconn *conn, char* kind)
15371540
{
15381541
char id;
15391542
int msgLength;
@@ -1584,6 +1587,7 @@ getCopyDataMessage(PGconn *conn)
15841587
* completeness.) Otherwise, if it's anything except Copy Data,
15851588
* report end-of-copy.
15861589
*/
1590+
*kind = id;
15871591
switch (id)
15881592
{
15891593
case 'A': /* NOTIFY */
@@ -1599,6 +1603,7 @@ getCopyDataMessage(PGconn *conn)
15991603
return 0;
16001604
break;
16011605
case 'd': /* Copy Data, pass it back to caller */
1606+
case 'z': /* Copy Compressed Data, pass it back to caller */
16021607
return msgLength;
16031608
case 'c':
16041609

@@ -1641,6 +1646,7 @@ int
16411646
pqGetCopyData3(PGconn *conn, char **buffer, int async)
16421647
{
16431648
int msgLength;
1649+
char id;
16441650

16451651
for (;;)
16461652
{
@@ -1649,7 +1655,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
16491655
* callers, we keep returning 0 until the next message is fully
16501656
* available, even if it is not Copy Data.
16511657
*/
1652-
msgLength = getCopyDataMessage(conn);
1658+
msgLength = getCopyDataMessage(conn, &id);
16531659
if (msgLength < 0)
16541660
return msgLength; /* end-of-copy or error */
16551661
if (msgLength == 0)
@@ -1671,20 +1677,55 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
16711677
msgLength -= 4;
16721678
if (msgLength > 0)
16731679
{
1674-
*buffer = (char *) malloc(msgLength + 1);
1675-
if (*buffer == NULL)
1676-
{
1677-
printfPQExpBuffer(&conn->errorMessage,
1678-
libpq_gettext("out of memory\n"));
1679-
return -2;
1680+
if (id == 'z') { /* compressed data */
1681+
int zip_len = msgLength -= 4;
1682+
int raw_len;
1683+
int len;
1684+
1685+
Assert(zip_len > 0);
1686+
1687+
if (pqGetInt(&raw_len, 4, conn))
1688+
return -2;
1689+
1690+
*buffer = (char *) malloc(raw_len + 1);
1691+
if (*buffer == NULL)
1692+
{
1693+
printfPQExpBuffer(&conn->errorMessage,
1694+
libpq_gettext("out of memory\n"));
1695+
return -2;
1696+
}
1697+
len = pglz_decompress(&conn->inBuffer[conn->inCursor], zip_len, *buffer, raw_len);
1698+
if (len != raw_len)
1699+
{
1700+
printfPQExpBuffer(&conn->errorMessage,
1701+
libpq_gettext("decompress error\n"));
1702+
return -2;
1703+
}
1704+
(*buffer)[raw_len] = '\0'; /* Add terminating null */
1705+
1706+
1707+
/* Mark message consumed */
1708+
conn->inStart = conn->inCursor + msgLength;
1709+
1710+
return raw_len;
16801711
}
1681-
memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
1682-
(*buffer)[msgLength] = '\0'; /* Add terminating null */
1712+
else
1713+
{
1714+
*buffer = (char *) malloc(msgLength + 1);
1715+
if (*buffer == NULL)
1716+
{
1717+
printfPQExpBuffer(&conn->errorMessage,
1718+
libpq_gettext("out of memory\n"));
1719+
return -2;
1720+
}
1721+
memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
1722+
(*buffer)[msgLength] = '\0'; /* Add terminating null */
16831723

1684-
/* Mark message consumed */
1685-
conn->inStart = conn->inCursor + msgLength;
1724+
/* Mark message consumed */
1725+
conn->inStart = conn->inCursor + msgLength;
16861726

1687-
return msgLength;
1727+
return msgLength;
1728+
}
16881729
}
16891730

16901731
/* Empty, so drop it and loop around for another */
@@ -1754,6 +1795,7 @@ pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize)
17541795
{
17551796
int msgLength;
17561797
int avail;
1798+
char id;
17571799

17581800
if (conn->asyncStatus != PGASYNC_COPY_OUT
17591801
&& conn->asyncStatus != PGASYNC_COPY_BOTH)
@@ -1765,7 +1807,7 @@ pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize)
17651807
* even if it is not Copy Data. This should keep PQendcopy from blocking.
17661808
* (Note: unlike pqGetCopyData3, we do not change asyncStatus here.)
17671809
*/
1768-
msgLength = getCopyDataMessage(conn);
1810+
msgLength = getCopyDataMessage(conn, &id);
17691811
if (msgLength < 0)
17701812
return -1; /* end-of-copy or error */
17711813
if (msgLength == 0)

0 commit comments

Comments
 (0)