Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 00b588c

Browse files
committed
Revert "Add walsender messages compression"
This reverts commit 765d063.
1 parent e8d1259 commit 00b588c

File tree

4 files changed

+17
-100
lines changed

4 files changed

+17
-100
lines changed

src/backend/replication/walsender.c

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,12 @@
6060
#include "catalog/pg_type.h"
6161
#include "commands/dbcommands.h"
6262
#include "commands/defrem.h"
63-
#include "common/pg_lzcompress.h"
64-
#include "common/fe_memutils.h"
6563
#include "funcapi.h"
6664
#include "libpq/libpq.h"
6765
#include "libpq/pqformat.h"
6866
#include "miscadmin.h"
6967
#include "nodes/replnodes.h"
7068
#include "pgstat.h"
71-
#include "port/pg_bswap.h"
7269
#include "replication/basebackup.h"
7370
#include "replication/decode.h"
7471
#include "replication/logical.h"
@@ -98,9 +95,7 @@
9895
#include "utils/timestamp.h"
9996

10097
#define ERROR_MESSAGE_SIZE 1024
101-
#define MIN_COMPRESSION_RATIO 2
102-
#define MIN_COMPRESSION_SIZE 32
103-
#define MSG_COMPRESS_HDR_SIZE 4
98+
10499
/*
105100
* Shared structure to repair a corrupted block.
106101
*
@@ -165,9 +160,6 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
165160
* data message */
166161
bool log_replication_commands = false;
167162

168-
169-
bool wal_sender_compression = false;
170-
171163
/*
172164
* State for WalSndWakeupRequest
173165
*/
@@ -1200,29 +1192,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12001192
TimestampTz now;
12011193

12021194
/* output previously gathered data in a CopyData packet */
1203-
if (wal_sender_compression && ctx->out->len > MIN_COMPRESSION_SIZE)
1204-
{
1205-
static char* lzbuf;
1206-
static int lzbuf_size;
1207-
int zip_len;
1208-
int raw_len = ctx->out->len;
1209-
int max_zip_len = PGLZ_MAX_OUTPUT(raw_len) + MSG_COMPRESS_HDR_SIZE;
1210-
if (max_zip_len > lzbuf_size) {
1211-
lzbuf_size = lzbuf_size*2 > max_zip_len ? lzbuf_size*2 : max_zip_len;
1212-
lzbuf = realloc(lzbuf, lzbuf_size);
1213-
}
1214-
zip_len = pglz_compress(ctx->out->data, raw_len, lzbuf + MSG_COMPRESS_HDR_SIZE, PGLZ_strategy_default);
1215-
if (zip_len > 0 && zip_len < raw_len/MIN_COMPRESSION_RATIO)
1216-
{
1217-
uint32 net_size = htonl(raw_len);
1218-
memcpy(lzbuf, &net_size, sizeof(net_size));
1219-
pq_putmessage_noblock('z', lzbuf, zip_len + MSG_COMPRESS_HDR_SIZE);
1220-
elog(LOG, "WRITE compressed message with length %d raw_len=%d", zip_len, raw_len);
1221-
}
1222-
else
1223-
pq_putmessage_noblock('d', ctx->out->data, raw_len);
1224-
} else
1225-
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1195+
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
12261196

12271197
/*
12281198
* Fill the send timestamp last, so that it is taken as late as possible.

src/backend/utils/misc/guc.c

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,16 +1076,6 @@ static struct config_bool ConfigureNamesBool[] =
10761076
NULL, NULL, NULL
10771077
},
10781078

1079-
{
1080-
{"wal_sender_compression", PGC_SUSET, WAL_SETTINGS,
1081-
gettext_noop("Compress logical replicastion messages."),
1082-
NULL
1083-
},
1084-
&wal_sender_compression,
1085-
false,
1086-
NULL, NULL, NULL
1087-
},
1088-
10891079
{
10901080
{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
10911081
gettext_noop("Logs each checkpoint."),

src/include/replication/walsender.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ extern bool wake_wal_senders;
4141
extern PGDLLIMPORT int max_wal_senders;
4242
extern int wal_sender_timeout;
4343
extern bool log_replication_commands;
44-
extern bool wal_sender_compression;
4544

4645
extern void InitWalSender(void);
4746
extern bool exec_replication_command(const char *query_string);

src/interfaces/libpq/fe-protocol3.c

Lines changed: 15 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,13 @@
3333
#include <arpa/inet.h>
3434
#endif
3535

36-
#include "common/pg_lzcompress.h"
37-
#include "utils/elog.h"
3836

3937
/*
4038
* This macro lists the backend message types that could be "long" (more
4139
* than a couple of kilobytes).
4240
*/
4341
#define VALID_LONG_MESSAGE_TYPE(id) \
44-
((id) == 'T' || (id) == 'D' || (id) == 'd' || (id) == 'z' || (id) == 'V' || \
42+
((id) == 'T' || (id) == 'D' || (id) == 'd' || (id) == 'V' || \
4543
(id) == 'E' || (id) == 'N' || (id) == 'A')
4644

4745

@@ -391,7 +389,6 @@ pqParseInput3(PGconn *conn)
391389
conn->copy_already_done = 0;
392390
break;
393391
case 'd': /* Copy Data */
394-
case 'z': /* Copy Compressed Data */
395392

396393
/*
397394
* If we see Copy Data, just silently drop it. This would
@@ -1536,7 +1533,7 @@ getReadyForQuery(PGconn *conn)
15361533
* message available, -1 if end of copy, -2 if error.
15371534
*/
15381535
static int
1539-
getCopyDataMessage(PGconn *conn, char* kind)
1536+
getCopyDataMessage(PGconn *conn)
15401537
{
15411538
char id;
15421539
int msgLength;
@@ -1587,7 +1584,6 @@ getCopyDataMessage(PGconn *conn, char* kind)
15871584
* completeness.) Otherwise, if it's anything except Copy Data,
15881585
* report end-of-copy.
15891586
*/
1590-
*kind = id;
15911587
switch (id)
15921588
{
15931589
case 'A': /* NOTIFY */
@@ -1603,7 +1599,6 @@ getCopyDataMessage(PGconn *conn, char* kind)
16031599
return 0;
16041600
break;
16051601
case 'd': /* Copy Data, pass it back to caller */
1606-
case 'z': /* Copy Compressed Data, pass it back to caller */
16071602
return msgLength;
16081603
case 'c':
16091604

@@ -1646,7 +1641,6 @@ int
16461641
pqGetCopyData3(PGconn *conn, char **buffer, int async)
16471642
{
16481643
int msgLength;
1649-
char id;
16501644

16511645
for (;;)
16521646
{
@@ -1655,7 +1649,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
16551649
* callers, we keep returning 0 until the next message is fully
16561650
* available, even if it is not Copy Data.
16571651
*/
1658-
msgLength = getCopyDataMessage(conn, &id);
1652+
msgLength = getCopyDataMessage(conn);
16591653
if (msgLength < 0)
16601654
return msgLength; /* end-of-copy or error */
16611655
if (msgLength == 0)
@@ -1677,55 +1671,20 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
16771671
msgLength -= 4;
16781672
if (msgLength > 0)
16791673
{
1680-
if (id == 'z') { /* compressed data */
1681-
int zip_len = msgLength -= 4;
1682-
int raw_len;
1683-
int len;
1684-
1685-
Assert(zip_len > 0);
1686-
1687-
if (pqGetInt(&raw_len, 4, conn))
1688-
return -2;
1689-
1690-
*buffer = (char *) malloc(raw_len + 1);
1691-
if (*buffer == NULL)
1692-
{
1693-
printfPQExpBuffer(&conn->errorMessage,
1694-
libpq_gettext("out of memory\n"));
1695-
return -2;
1696-
}
1697-
len = pglz_decompress(&conn->inBuffer[conn->inCursor], zip_len, *buffer, raw_len);
1698-
if (len != raw_len)
1699-
{
1700-
printfPQExpBuffer(&conn->errorMessage,
1701-
libpq_gettext("decompress error\n"));
1702-
return -2;
1703-
}
1704-
(*buffer)[raw_len] = '\0'; /* Add terminating null */
1705-
1706-
1707-
/* Mark message consumed */
1708-
conn->inStart = conn->inCursor + msgLength;
1709-
1710-
return raw_len;
1711-
}
1712-
else
1674+
*buffer = (char *) malloc(msgLength + 1);
1675+
if (*buffer == NULL)
17131676
{
1714-
*buffer = (char *) malloc(msgLength + 1);
1715-
if (*buffer == NULL)
1716-
{
1717-
printfPQExpBuffer(&conn->errorMessage,
1718-
libpq_gettext("out of memory\n"));
1719-
return -2;
1720-
}
1721-
memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
1722-
(*buffer)[msgLength] = '\0'; /* Add terminating null */
1677+
printfPQExpBuffer(&conn->errorMessage,
1678+
libpq_gettext("out of memory\n"));
1679+
return -2;
1680+
}
1681+
memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
1682+
(*buffer)[msgLength] = '\0'; /* Add terminating null */
17231683

1724-
/* Mark message consumed */
1725-
conn->inStart = conn->inCursor + msgLength;
1684+
/* Mark message consumed */
1685+
conn->inStart = conn->inCursor + msgLength;
17261686

1727-
return msgLength;
1728-
}
1687+
return msgLength;
17291688
}
17301689

17311690
/* Empty, so drop it and loop around for another */
@@ -1795,7 +1754,6 @@ pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize)
17951754
{
17961755
int msgLength;
17971756
int avail;
1798-
char id;
17991757

18001758
if (conn->asyncStatus != PGASYNC_COPY_OUT
18011759
&& conn->asyncStatus != PGASYNC_COPY_BOTH)
@@ -1807,7 +1765,7 @@ pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize)
18071765
* even if it is not Copy Data. This should keep PQendcopy from blocking.
18081766
* (Note: unlike pqGetCopyData3, we do not change asyncStatus here.)
18091767
*/
1810-
msgLength = getCopyDataMessage(conn, &id);
1768+
msgLength = getCopyDataMessage(conn);
18111769
if (msgLength < 0)
18121770
return -1; /* end-of-copy or error */
18131771
if (msgLength == 0)

0 commit comments

Comments
 (0)