Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9f2213a

Browse files
author
Amit Kapila
committed
Allow the logical_replication_mode to be used on the subscriber.
Extend the existing developer option 'logical_replication_mode' to help test the parallel apply of large transactions on the subscriber. When set to 'buffered', the leader sends changes to parallel apply workers via a shared memory queue. When set to 'immediate', the leader serializes all changes to files and notifies the parallel apply workers to read and apply them at the end of the transaction. This helps in adding tests to cover the serialization code path in parallel streaming mode. Author: Hou Zhijie Reviewed-by: Peter Smith, Kuroda Hayato, Sawada Masahiko, Amit Kapila Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
1 parent fb1a59d commit 9f2213a

File tree

6 files changed

+172
-20
lines changed

6 files changed

+172
-20
lines changed

doc/src/sgml/config.sgml

+24-11
Original file line numberDiff line numberDiff line change
@@ -11701,22 +11701,35 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
1170111701
</term>
1170211702
<listitem>
1170311703
<para>
11704-
Allows streaming or serializing changes immediately in logical decoding.
11705-
The allowed values of <varname>logical_replication_mode</varname> are
11706-
<literal>buffered</literal> and <literal>immediate</literal>. When set
11707-
to <literal>immediate</literal>, stream each change if
11704+
The allowed values are <literal>buffered</literal> and
11705+
<literal>immediate</literal>. The default is <literal>buffered</literal>.
11706+
This parameter is intended to be used to test logical decoding and
11707+
replication of large transactions. The effect of
11708+
<varname>logical_replication_mode</varname> is different for the
11709+
publisher and subscriber:
11710+
</para>
11711+
11712+
<para>
11713+
On the publisher side, <varname>logical_replication_mode</varname>
11714+
allows streaming or serializing changes immediately in logical decoding.
11715+
When set to <literal>immediate</literal>, stream each change if the
1170811716
<literal>streaming</literal> option (see optional parameters set by
1170911717
<link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>)
1171011718
is enabled, otherwise, serialize each change. When set to
11711-
<literal>buffered</literal>, which is the default, decoding will stream
11712-
or serialize changes when <varname>logical_decoding_work_mem</varname>
11713-
is reached.
11719+
<literal>buffered</literal>, the decoding will stream or serialize
11720+
changes when <varname>logical_decoding_work_mem</varname> is reached.
1171411721
</para>
11722+
1171511723
<para>
11716-
This parameter is intended to be used to test logical decoding and
11717-
replication of large transactions for which otherwise we need to
11718-
generate the changes till <varname>logical_decoding_work_mem</varname>
11719-
is reached.
11724+
On the subscriber side, if the <literal>streaming</literal> option is set to
11725+
<literal>parallel</literal>, <varname>logical_replication_mode</varname>
11726+
can be used to direct the leader apply worker to send changes to the
11727+
shared memory queue or to serialize all changes to the file. When set to
11728+
<literal>buffered</literal>, the leader sends changes to parallel apply
11729+
workers via a shared memory queue. When set to
11730+
<literal>immediate</literal>, the leader serializes all changes to files
11731+
and notifies the parallel apply workers to read and apply them at the
11732+
end of the transaction.
1172011733
</para>
1172111734
</listitem>
1172211735
</varlistentry>

src/backend/replication/logical/applyparallelworker.c

+11-5
Original file line numberDiff line numberDiff line change
@@ -1149,6 +1149,13 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
11491149
Assert(!IsTransactionState());
11501150
Assert(!winfo->serialize_changes);
11511151

1152+
/*
1153+
* We don't try to send data to parallel worker for 'immediate' mode. This
1154+
* is primarily used for testing purposes.
1155+
*/
1156+
if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
1157+
return false;
1158+
11521159
/*
11531160
* This timeout is a bit arbitrary but testing revealed that it is sufficient
11541161
* to send the message unless the parallel apply worker is waiting on some
@@ -1187,12 +1194,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
11871194
startTime = GetCurrentTimestamp();
11881195
else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
11891196
SHM_SEND_TIMEOUT_MS))
1190-
{
1191-
ereport(LOG,
1192-
(errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1193-
winfo->shared->xid)));
11941197
return false;
1195-
}
11961198
}
11971199
}
11981200

@@ -1206,6 +1208,10 @@ void
12061208
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
12071209
bool stream_locked)
12081210
{
1211+
ereport(LOG,
1212+
(errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1213+
winfo->shared->xid)));
1214+
12091215
/*
12101216
* The parallel apply worker could be stuck for some reason (say waiting
12111217
* on some lock by other backend), so stop trying to send data directly to

src/backend/utils/misc/guc_tables.c

+4-2
Original file line numberDiff line numberDiff line change
@@ -4920,8 +4920,10 @@ struct config_enum ConfigureNamesEnum[] =
49204920

49214921
{
49224922
{"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS,
4923-
gettext_noop("Controls when to replicate each change."),
4924-
gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding."),
4923+
gettext_noop("Controls when to replicate or apply each change."),
4924+
gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding. "
4925+
"On the subscriber, it allows serialization of all changes to files and notifies the "
4926+
"parallel apply workers to read and apply them at the end of the transaction."),
49254927
GUC_NOT_IN_SAMPLE
49264928
},
49274929
&logical_replication_mode,

src/test/subscription/t/015_stream.pl

+28
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,34 @@ sub test_streaming
312312
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
313313
is($result, qq(10000), 'data replicated to subscriber after dropping index');
314314

315+
# Test serializing changes to files and notify the parallel apply worker to
316+
# apply them at the end of the transaction.
317+
$node_subscriber->append_conf('postgresql.conf',
318+
'logical_replication_mode = immediate');
319+
# Reset the log_min_messages to default.
320+
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
321+
$node_subscriber->reload;
322+
323+
# Run a query to make sure that the reload has taken effect.
324+
$node_subscriber->safe_psql('postgres', q{SELECT 1});
325+
326+
$offset = -s $node_subscriber->logfile;
327+
328+
$node_publisher->safe_psql('postgres',
329+
"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
330+
331+
# Ensure that the changes are serialized.
332+
$node_subscriber->wait_for_log(
333+
qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
334+
$offset);
335+
336+
$node_publisher->wait_for_catchup($appname);
337+
338+
# Check that transaction is committed on subscriber
339+
$result =
340+
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
341+
is($result, qq(15000), 'parallel apply worker replayed all changes from file');
342+
315343
$node_subscriber->stop;
316344
$node_publisher->stop;
317345

src/test/subscription/t/018_stream_subxact_abort.pl

+60-1
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,17 @@ sub test_streaming
143143
"CREATE TABLE test_tab (a int primary key, b varchar)");
144144
$node_publisher->safe_psql('postgres',
145145
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
146+
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
146147

147148
# Setup structure on subscriber
148149
$node_subscriber->safe_psql('postgres',
149150
"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
151+
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
150152

151153
# Setup logical replication
152154
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
153155
$node_publisher->safe_psql('postgres',
154-
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
156+
"CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
155157

156158
my $appname = 'tap_sub';
157159

@@ -198,6 +200,63 @@ sub test_streaming
198200

199201
test_streaming($node_publisher, $node_subscriber, $appname, 1);
200202

203+
# Test serializing changes to files and notify the parallel apply worker to
204+
# apply them at the end of the transaction.
205+
$node_subscriber->append_conf('postgresql.conf',
206+
'logical_replication_mode = immediate');
207+
# Reset the log_min_messages to default.
208+
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
209+
$node_subscriber->reload;
210+
211+
# Run a query to make sure that the reload has taken effect.
212+
$node_subscriber->safe_psql('postgres', q{SELECT 1});
213+
214+
my $offset = -s $node_subscriber->logfile;
215+
216+
$node_publisher->safe_psql(
217+
'postgres', q{
218+
BEGIN;
219+
INSERT INTO test_tab_2 values(1);
220+
ROLLBACK;
221+
});
222+
223+
# Ensure that the changes are serialized.
224+
$node_subscriber->wait_for_log(
225+
qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
226+
$offset);
227+
228+
$node_publisher->wait_for_catchup($appname);
229+
230+
# Check that transaction is aborted on subscriber
231+
$result =
232+
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
233+
is($result, qq(0), 'check rollback was reflected on subscriber');
234+
235+
# Serialize the ABORT sub-transaction.
236+
$offset = -s $node_subscriber->logfile;
237+
238+
$node_publisher->safe_psql(
239+
'postgres', q{
240+
BEGIN;
241+
INSERT INTO test_tab_2 values(1);
242+
SAVEPOINT sp;
243+
INSERT INTO test_tab_2 values(1);
244+
ROLLBACK TO sp;
245+
COMMIT;
246+
});
247+
248+
# Ensure that the changes are serialized.
249+
$node_subscriber->wait_for_log(
250+
qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
251+
$offset);
252+
253+
$node_publisher->wait_for_catchup($appname);
254+
255+
# Check that only sub-transaction is aborted on subscriber.
256+
$result =
257+
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
258+
is($result, qq(1), 'check rollback to savepoint was reflected on subscriber');
259+
201260
$node_subscriber->stop;
202261
$node_publisher->stop;
203262

src/test/subscription/t/023_twophase_stream.pl

+45-1
Original file line numberDiff line numberDiff line change
@@ -319,16 +319,18 @@ sub test_streaming
319319
"CREATE TABLE test_tab (a int primary key, b varchar)");
320320
$node_publisher->safe_psql('postgres',
321321
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
322+
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
322323

323324
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
324325
$node_subscriber->safe_psql('postgres',
325326
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
326327
);
328+
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
327329

328330
# Setup logical replication (streaming = on)
329331
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
330332
$node_publisher->safe_psql('postgres',
331-
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
333+
"CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
332334

333335
my $appname = 'tap_sub';
334336

@@ -384,6 +386,48 @@ sub test_streaming
384386

385387
test_streaming($node_publisher, $node_subscriber, $appname, 1);
386388

389+
# Test serializing changes to files and notify the parallel apply worker to
390+
# apply them at the end of the transaction.
391+
$node_subscriber->append_conf('postgresql.conf',
392+
'logical_replication_mode = immediate');
393+
# Reset the log_min_messages to default.
394+
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
395+
$node_subscriber->reload;
396+
397+
# Run a query to make sure that the reload has taken effect.
398+
$node_subscriber->safe_psql('postgres', q{SELECT 1});
399+
400+
my $offset = -s $node_subscriber->logfile;
401+
402+
$node_publisher->safe_psql(
403+
'postgres', q{
404+
BEGIN;
405+
INSERT INTO test_tab_2 values(1);
406+
PREPARE TRANSACTION 'xact';
407+
});
408+
409+
# Ensure that the changes are serialized.
410+
$node_subscriber->wait_for_log(
411+
qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
412+
$offset);
413+
414+
$node_publisher->wait_for_catchup($appname);
415+
416+
# Check that transaction is in prepared state on subscriber
417+
$result = $node_subscriber->safe_psql('postgres',
418+
"SELECT count(*) FROM pg_prepared_xacts;");
419+
is($result, qq(1), 'transaction is prepared on subscriber');
420+
421+
# Check that 2PC gets committed on subscriber
422+
$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';");
423+
424+
$node_publisher->wait_for_catchup($appname);
425+
426+
# Check that transaction is committed on subscriber
427+
$result =
428+
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
429+
is($result, qq(1), 'transaction is committed on subscriber');
430+
387431
###############################
388432
# check all the cleanup
389433
###############################

0 commit comments

Comments
 (0)