Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5de94a0

Browse files
author
Amit Kapila
committed
Add 'logical_decoding_mode' GUC.
This enables streaming or serializing changes immediately in logical decoding. This parameter is intended to be used to test logical decoding and replication of large transactions for which otherwise we need to generate the changes till logical_decoding_work_mem is reached. This helps in reducing the timing of existing tests related to logical replication of in-progress transactions and will help in writing tests for for the upcoming feature for parallelly applying large in-progress transactions. Author: Shi yu Reviewed-by: Sawada Masahiko, Shveta Mallik, Amit Kapila, Dilip Kumar, Kuroda Hayato, Kyotaro Horiguchi Discussion: https://postgr.es/m/OSZPR01MB63104E7449DBE41932DB19F1FD1B9@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parent d3c0cc4 commit 5de94a0

File tree

9 files changed

+134
-67
lines changed

9 files changed

+134
-67
lines changed

doc/src/sgml/config.sgml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11597,6 +11597,34 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
1159711597
</listitem>
1159811598
</varlistentry>
1159911599

11600+
<varlistentry id="guc-logical-decoding-mode" xreflabel="logical_decoding_mode">
11601+
<term><varname>logical_decoding_mode</varname> (<type>enum</type>)
11602+
<indexterm>
11603+
<primary><varname>logical_decoding_mode</varname> configuration parameter</primary>
11604+
</indexterm>
11605+
</term>
11606+
<listitem>
11607+
<para>
11608+
Allows streaming or serializing changes immediately in logical decoding.
11609+
The allowed values of <varname>logical_decoding_mode</varname> are
11610+
<literal>buffered</literal> and <literal>immediate</literal>. When set
11611+
to <literal>immediate</literal>, stream each change if
11612+
<literal>streaming</literal> option (see optional parameters set by
11613+
<link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>)
11614+
is enabled, otherwise, serialize each change. When set to
11615+
<literal>buffered</literal>, which is the default, decoding will stream
11616+
or serialize changes when <varname>logical_decoding_work_mem</varname>
11617+
is reached.
11618+
</para>
11619+
<para>
11620+
This parameter is intended to be used to test logical decoding and
11621+
replication of large transactions for which otherwise we need to
11622+
generate the changes till <varname>logical_decoding_work_mem</varname>
11623+
is reached.
11624+
</para>
11625+
</listitem>
11626+
</varlistentry>
11627+
1160011628
</variablelist>
1160111629
</sect1>
1160211630
<sect1 id="runtime-config-short">

src/backend/replication/logical/reorderbuffer.c

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange
209209
int logical_decoding_work_mem;
210210
static const Size max_changes_in_memory = 4096; /* XXX for restore only */
211211

212+
/* GUC variable */
213+
int logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED;
214+
212215
/* ---------------------------------------
213216
* primary reorderbuffer support routines
214217
* ---------------------------------------
@@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
35403543
/*
35413544
* Check whether the logical_decoding_work_mem limit was reached, and if yes
35423545
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
3543-
* disk until we reach under the memory limit.
3546+
* disk or send to the output plugin until we reach under the memory limit.
3547+
*
3548+
* If logical_decoding_mode is set to "immediate", stream or serialize the changes
3549+
* immediately.
35443550
*
35453551
* XXX At this point we select the transactions until we reach under the memory
35463552
* limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
35523558
{
35533559
ReorderBufferTXN *txn;
35543560

3555-
/* bail out if we haven't exceeded the memory limit */
3556-
if (rb->size < logical_decoding_work_mem * 1024L)
3561+
/*
3562+
* Bail out if logical_decoding_mode is buffered and we haven't exceeded
3563+
* the memory limit.
3564+
*/
3565+
if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED &&
3566+
rb->size < logical_decoding_work_mem * 1024L)
35573567
return;
35583568

35593569
/*
3560-
* Loop until we reach under the memory limit. One might think that just
3561-
* by evicting the largest (sub)transaction we will come under the memory
3562-
* limit based on assumption that the selected transaction is at least as
3563-
* large as the most recent change (which caused us to go over the memory
3564-
* limit). However, that is not true because a user can reduce the
3565-
* logical_decoding_work_mem to a smaller value before the most recent
3570+
* If logical_decoding_mode is immediate, loop until there's no change.
3571+
* Otherwise, loop until we reach under the memory limit. One might think
3572+
* that just by evicting the largest (sub)transaction we will come under
3573+
* the memory limit based on assumption that the selected transaction is
3574+
* at least as large as the most recent change (which caused us to go over
3575+
* the memory limit). However, that is not true because a user can reduce
3576+
* the logical_decoding_work_mem to a smaller value before the most recent
35663577
* change.
35673578
*/
3568-
while (rb->size >= logical_decoding_work_mem * 1024L)
3579+
while (rb->size >= logical_decoding_work_mem * 1024L ||
3580+
(logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE &&
3581+
rb->size > 0))
35693582
{
35703583
/*
35713584
* Pick the largest transaction (or subtransaction) and evict it from

src/backend/utils/misc/guc_tables.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,12 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = {
395395
{NULL, 0, false}
396396
};
397397

398+
static const struct config_enum_entry logical_decoding_mode_options[] = {
399+
{"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},
400+
{"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
401+
{NULL, 0, false}
402+
};
403+
398404
StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2),
399405
"array length mismatch");
400406

@@ -4877,6 +4883,17 @@ struct config_enum ConfigureNamesEnum[] =
48774883
NULL, NULL, NULL
48784884
},
48794885

4886+
{
4887+
{"logical_decoding_mode", PGC_USERSET, DEVELOPER_OPTIONS,
4888+
gettext_noop("Allows streaming or serializing each change in logical decoding."),
4889+
NULL,
4890+
GUC_NOT_IN_SAMPLE
4891+
},
4892+
&logical_decoding_mode,
4893+
LOGICAL_DECODING_MODE_BUFFERED, logical_decoding_mode_options,
4894+
NULL, NULL, NULL
4895+
},
4896+
48804897
/* End-of-list marker */
48814898
{
48824899
{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL

src/include/replication/reorderbuffer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@
1818
#include "utils/timestamp.h"
1919

2020
extern PGDLLIMPORT int logical_decoding_work_mem;
21+
extern PGDLLIMPORT int logical_decoding_mode;
22+
23+
/* possible values for logical_decoding_mode */
24+
typedef enum
25+
{
26+
LOGICAL_DECODING_MODE_BUFFERED,
27+
LOGICAL_DECODING_MODE_IMMEDIATE
28+
} LogicalDecodingMode;
2129

2230
/* an individual tuple, stored in one chunk of memory */
2331
typedef struct ReorderBufferTupleBuf

src/test/subscription/t/016_stream_subxact.pl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
33

4-
# Test streaming of large transaction containing large subtransactions
4+
# Test streaming of transaction containing subtransactions
55
use strict;
66
use warnings;
77
use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@
1212
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
1313
$node_publisher->init(allows_streaming => 'logical');
1414
$node_publisher->append_conf('postgresql.conf',
15-
'logical_decoding_work_mem = 64kB');
15+
'logical_decoding_mode = immediate');
1616
$node_publisher->start;
1717

1818
# Create subscriber node
@@ -49,27 +49,27 @@
4949
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
5050
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
5151

52-
# Insert, update and delete enough rows to exceed 64kB limit.
52+
# Insert, update and delete some rows.
5353
$node_publisher->safe_psql(
5454
'postgres', q{
5555
BEGIN;
56-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i);
56+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
5757
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
5858
DELETE FROM test_tab WHERE mod(a,3) = 0;
5959
SAVEPOINT s1;
60-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
60+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
6161
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
6262
DELETE FROM test_tab WHERE mod(a,3) = 0;
6363
SAVEPOINT s2;
64-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
64+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
6565
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
6666
DELETE FROM test_tab WHERE mod(a,3) = 0;
6767
SAVEPOINT s3;
68-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
68+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
6969
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
7070
DELETE FROM test_tab WHERE mod(a,3) = 0;
7171
SAVEPOINT s4;
72-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
72+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
7373
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
7474
DELETE FROM test_tab WHERE mod(a,3) = 0;
7575
COMMIT;
@@ -80,7 +80,7 @@
8080
$result =
8181
$node_subscriber->safe_psql('postgres',
8282
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
83-
is($result, qq(1667|1667|1667),
83+
is($result, qq(12|12|12),
8484
'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
8585
);
8686

src/test/subscription/t/018_stream_subxact_abort.pl

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
33

4-
# Test streaming of large transaction containing multiple subtransactions and rollbacks
4+
# Test streaming of transaction containing multiple subtransactions and rollbacks
55
use strict;
66
use warnings;
77
use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@
1212
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
1313
$node_publisher->init(allows_streaming => 'logical');
1414
$node_publisher->append_conf('postgresql.conf',
15-
'logical_decoding_work_mem = 64kB');
15+
'logical_decoding_mode = immediate');
1616
$node_publisher->start;
1717

1818
# Create subscriber node
@@ -48,25 +48,25 @@
4848
"SELECT count(*), count(c) FROM test_tab");
4949
is($result, qq(2|0), 'check initial data was copied to subscriber');
5050

51-
# large (streamed) transaction with DDL, DML and ROLLBACKs
51+
# streamed transaction with DDL, DML and ROLLBACKs
5252
$node_publisher->safe_psql(
5353
'postgres', q{
5454
BEGIN;
55-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
55+
INSERT INTO test_tab VALUES (3, md5(3::text));
5656
SAVEPOINT s1;
57-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
57+
INSERT INTO test_tab VALUES (4, md5(4::text));
5858
SAVEPOINT s2;
59-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
59+
INSERT INTO test_tab VALUES (5, md5(5::text));
6060
SAVEPOINT s3;
61-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
61+
INSERT INTO test_tab VALUES (6, md5(6::text));
6262
ROLLBACK TO s2;
63-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
63+
INSERT INTO test_tab VALUES (7, md5(7::text));
6464
ROLLBACK TO s1;
65-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
65+
INSERT INTO test_tab VALUES (8, md5(8::text));
6666
SAVEPOINT s4;
67-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
67+
INSERT INTO test_tab VALUES (9, md5(9::text));
6868
SAVEPOINT s5;
69-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
69+
INSERT INTO test_tab VALUES (10, md5(10::text));
7070
COMMIT;
7171
});
7272

@@ -75,24 +75,24 @@
7575
$result =
7676
$node_subscriber->safe_psql('postgres',
7777
"SELECT count(*), count(c) FROM test_tab");
78-
is($result, qq(2000|0),
78+
is($result, qq(6|0),
7979
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
8080
);
8181

82-
# large (streamed) transaction with subscriber receiving out of order
83-
# subtransaction ROLLBACKs
82+
# streamed transaction with subscriber receiving out of order subtransaction
83+
# ROLLBACKs
8484
$node_publisher->safe_psql(
8585
'postgres', q{
8686
BEGIN;
87-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
87+
INSERT INTO test_tab VALUES (11, md5(11::text));
8888
SAVEPOINT s1;
89-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
89+
INSERT INTO test_tab VALUES (12, md5(12::text));
9090
SAVEPOINT s2;
91-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
91+
INSERT INTO test_tab VALUES (13, md5(13::text));
9292
SAVEPOINT s3;
93-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
93+
INSERT INTO test_tab VALUES (14, md5(14::text));
9494
RELEASE s2;
95-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
95+
INSERT INTO test_tab VALUES (15, md5(15::text));
9696
ROLLBACK TO s1;
9797
COMMIT;
9898
});
@@ -102,18 +102,18 @@
102102
$result =
103103
$node_subscriber->safe_psql('postgres',
104104
"SELECT count(*), count(c) FROM test_tab");
105-
is($result, qq(2500|0),
105+
is($result, qq(7|0),
106106
'check rollback to savepoint was reflected on subscriber');
107107

108-
# large (streamed) transaction with subscriber receiving rollback
108+
# streamed transaction with subscriber receiving rollback
109109
$node_publisher->safe_psql(
110110
'postgres', q{
111111
BEGIN;
112-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
112+
INSERT INTO test_tab VALUES (16, md5(16::text));
113113
SAVEPOINT s1;
114-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
114+
INSERT INTO test_tab VALUES (17, md5(17::text));
115115
SAVEPOINT s2;
116-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
116+
INSERT INTO test_tab VALUES (18, md5(18::text));
117117
ROLLBACK;
118118
});
119119

@@ -122,7 +122,7 @@
122122
$result =
123123
$node_subscriber->safe_psql('postgres',
124124
"SELECT count(*), count(c) FROM test_tab");
125-
is($result, qq(2500|0), 'check rollback was reflected on subscriber');
125+
is($result, qq(7|0), 'check rollback was reflected on subscriber');
126126

127127
$node_subscriber->stop;
128128
$node_publisher->stop;

src/test/subscription/t/019_stream_subxact_ddl_abort.pl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
33

4-
# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
4+
# Test streaming of transaction with subtransactions, DDLs, DMLs, and
55
# rollbacks
66
use strict;
77
use warnings;
@@ -13,7 +13,7 @@
1313
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
1414
$node_publisher->init(allows_streaming => 'logical');
1515
$node_publisher->append_conf('postgresql.conf',
16-
'logical_decoding_work_mem = 64kB');
16+
'logical_decoding_mode = immediate');
1717
$node_publisher->start;
1818

1919
# Create subscriber node
@@ -49,23 +49,23 @@
4949
"SELECT count(*), count(c) FROM test_tab");
5050
is($result, qq(2|0), 'check initial data was copied to subscriber');
5151

52-
# large (streamed) transaction with DDL, DML and ROLLBACKs
52+
# streamed transaction with DDL, DML and ROLLBACKs
5353
$node_publisher->safe_psql(
5454
'postgres', q{
5555
BEGIN;
56-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
56+
INSERT INTO test_tab VALUES (3, md5(3::text));
5757
ALTER TABLE test_tab ADD COLUMN c INT;
5858
SAVEPOINT s1;
59-
INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
59+
INSERT INTO test_tab VALUES (4, md5(4::text), -4);
6060
ALTER TABLE test_tab ADD COLUMN d INT;
6161
SAVEPOINT s2;
62-
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
62+
INSERT INTO test_tab VALUES (5, md5(5::text), -5, 5*2);
6363
ALTER TABLE test_tab ADD COLUMN e INT;
6464
SAVEPOINT s3;
65-
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
65+
INSERT INTO test_tab VALUES (6, md5(6::text), -6, 6*2, -6*3);
6666
ALTER TABLE test_tab DROP COLUMN c;
6767
ROLLBACK TO s1;
68-
INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
68+
INSERT INTO test_tab VALUES (4, md5(4::text), 4);
6969
COMMIT;
7070
});
7171

@@ -74,7 +74,7 @@
7474
$result =
7575
$node_subscriber->safe_psql('postgres',
7676
"SELECT count(*), count(c) FROM test_tab");
77-
is($result, qq(1000|500),
77+
is($result, qq(4|1),
7878
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
7979
);
8080

0 commit comments

Comments
 (0)