Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 752aaff

Browse files
vigneshwaran-cCommitfest Bot
authored and
Commitfest Bot
committed
Add support for two-phase commit in pg_createsubscriber
This patch introduces the '--enable-two-phase' option to the 'pg_createsubscriber' utility, allowing users to enable two-phase commit for all subscriptions during their creation. By default, two-phase commit is disabled if the option is not provided. When two-phase commit is enabled, prepared transactions are sent to the subscriber at the time of 'PREPARE TRANSACTION', and they are processed as two-phase transactions on the subscriber as well. If disabled, prepared transactions are sent only when committed and are processed immediately by the subscriber.
1 parent 2421e9a commit 752aaff

File tree

3 files changed

+78
-30
lines changed

3 files changed

+78
-30
lines changed

doc/src/sgml/ref/pg_createsubscriber.sgml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,19 @@ PostgreSQL documentation
165165
</listitem>
166166
</varlistentry>
167167

168+
<varlistentry>
169+
<term><option>-T</option></term>
170+
<term><option>--enable-two-phase</option></term>
171+
<listitem>
172+
<para>
173+
Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
174+
commit for the subscription. When multiple databases are specified, this
175+
option applies uniformly to all subscriptions created on those databases.
176+
The default is <literal>false</literal>.
177+
</para>
178+
</listitem>
179+
</varlistentry>
180+
168181
<varlistentry>
169182
<term><option>-U <replaceable class="parameter">username</replaceable></option></term>
170183
<term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
@@ -300,7 +313,9 @@ PostgreSQL documentation
300313
greater than or equal to the number of specified databases. The target
301314
server must have <xref linkend="guc-max-worker-processes"/> configured to a
302315
value greater than the number of specified databases. The target server
303-
must accept local connections.
316+
must accept local connections. If you are planning to use the
317+
<option>--enable-two-phase</option> switch then you will also need to set
318+
the <xref linkend="guc-max-prepared-transactions"/> appropriately.
304319
</para>
305320

306321
<para>
@@ -360,6 +375,7 @@ PostgreSQL documentation
360375
</para>
361376

362377
<para>
378+
Unless the <option>--enable-two-phase</option> switch is specified,
363379
<application>pg_createsubscriber</application> sets up logical
364380
replication with two-phase commit disabled. This means that any
365381
prepared transactions will be replicated at the time

src/bin/pg_basebackup/pg_createsubscriber.c

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ struct CreateSubscriberOptions
3838
char *socket_dir; /* directory for Unix-domain socket, if any */
3939
char *sub_port; /* subscriber port number */
4040
const char *sub_username; /* subscriber username */
41+
bool two_phase; /* enable-two-phase option */
4142
SimpleStringList database_names; /* list of database names */
4243
SimpleStringList pub_names; /* list of publication names */
4344
SimpleStringList sub_names; /* list of subscription names */
@@ -58,6 +59,12 @@ struct LogicalRepInfo
5859
bool made_publication; /* publication was created */
5960
};
6061

62+
struct LogicalRepInfos
63+
{
64+
struct LogicalRepInfo *dbinfo;
65+
bool two_phase; /* enable-two-phase option */
66+
};
67+
6168
static void cleanup_objects_atexit(void);
6269
static void usage();
6370
static char *get_base_conninfo(const char *conninfo, char **dbname);
@@ -117,7 +124,7 @@ static bool dry_run = false;
117124

118125
static bool success = false;
119126

120-
static struct LogicalRepInfo *dbinfo;
127+
static struct LogicalRepInfos dbinfos;
121128
static int num_dbs = 0; /* number of specified databases */
122129
static int num_pubs = 0; /* number of specified publications */
123130
static int num_subs = 0; /* number of specified subscriptions */
@@ -172,17 +179,17 @@ cleanup_objects_atexit(void)
172179

173180
for (int i = 0; i < num_dbs; i++)
174181
{
175-
if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
182+
if (dbinfos.dbinfo[i].made_publication || dbinfos.dbinfo[i].made_replslot)
176183
{
177184
PGconn *conn;
178185

179-
conn = connect_database(dbinfo[i].pubconninfo, false);
186+
conn = connect_database(dbinfos.dbinfo[i].pubconninfo, false);
180187
if (conn != NULL)
181188
{
182-
if (dbinfo[i].made_publication)
183-
drop_publication(conn, &dbinfo[i]);
184-
if (dbinfo[i].made_replslot)
185-
drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
189+
if (dbinfos.dbinfo[i].made_publication)
190+
drop_publication(conn, &dbinfos.dbinfo[i]);
191+
if (dbinfos.dbinfo[i].made_replslot)
192+
drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname);
186193
disconnect_database(conn, false);
187194
}
188195
else
@@ -192,16 +199,18 @@ cleanup_objects_atexit(void)
192199
* that some objects were left on primary and should be
193200
* removed before trying again.
194201
*/
195-
if (dbinfo[i].made_publication)
202+
if (dbinfos.dbinfo[i].made_publication)
196203
{
197204
pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
198-
dbinfo[i].pubname, dbinfo[i].dbname);
205+
dbinfos.dbinfo[i].pubname,
206+
dbinfos.dbinfo[i].dbname);
199207
pg_log_warning_hint("Drop this publication before trying again.");
200208
}
201-
if (dbinfo[i].made_replslot)
209+
if (dbinfos.dbinfo[i].made_replslot)
202210
{
203211
pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
204-
dbinfo[i].replslotname, dbinfo[i].dbname);
212+
dbinfos.dbinfo[i].replslotname,
213+
dbinfos.dbinfo[i].dbname);
205214
pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
206215
}
207216
}
@@ -227,6 +236,7 @@ usage(void)
227236
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
228237
printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
229238
printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
239+
printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
230240
printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
231241
printf(_(" -v, --verbose output verbose messages\n"));
232242
printf(_(" --config-file=FILENAME use specified main server configuration\n"
@@ -479,9 +489,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
479489
dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
480490
dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
481491
dbinfo[i].pubconninfo);
482-
pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
492+
pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
483493
dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
484-
dbinfo[i].subconninfo);
494+
dbinfo[i].subconninfo,
495+
dbinfos.two_phase ? "true" : "false");
485496

486497
if (num_pubs > 0)
487498
pubcell = pubcell->next;
@@ -938,11 +949,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
938949
failed = true;
939950
}
940951

941-
if (max_prepared_transactions != 0)
952+
if (max_prepared_transactions != 0 && dbinfos.two_phase)
942953
{
943954
pg_log_warning("two_phase option will not be enabled for replication slots");
944955
pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
945956
"Prepared transactions will be replicated at COMMIT PREPARED.");
957+
pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
946958
}
947959

948960
/*
@@ -1345,8 +1357,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
13451357
slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
13461358

13471359
appendPQExpBuffer(str,
1348-
"SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1349-
slot_name_esc);
1360+
"SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1361+
slot_name_esc,
1362+
dbinfos.two_phase ? "true" : "false");
13501363

13511364
PQfreemem(slot_name_esc);
13521365

@@ -1722,8 +1735,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
17221735
appendPQExpBuffer(str,
17231736
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
17241737
"WITH (create_slot = false, enabled = false, "
1725-
"slot_name = %s, copy_data = false)",
1726-
subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1738+
"slot_name = %s, copy_data = false, two_phase = %s)",
1739+
subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1740+
dbinfos.two_phase ? "true" : "false");
17271741

17281742
PQfreemem(pubname_esc);
17291743
PQfreemem(subname_esc);
@@ -1895,6 +1909,7 @@ main(int argc, char **argv)
18951909
{"publisher-server", required_argument, NULL, 'P'},
18961910
{"socketdir", required_argument, NULL, 's'},
18971911
{"recovery-timeout", required_argument, NULL, 't'},
1912+
{"enable-two-phase", no_argument, NULL, 'T'},
18981913
{"subscriber-username", required_argument, NULL, 'U'},
18991914
{"verbose", no_argument, NULL, 'v'},
19001915
{"version", no_argument, NULL, 'V'},
@@ -1950,6 +1965,7 @@ main(int argc, char **argv)
19501965
opt.socket_dir = NULL;
19511966
opt.sub_port = DEFAULT_SUB_PORT;
19521967
opt.sub_username = NULL;
1968+
opt.two_phase = false;
19531969
opt.database_names = (SimpleStringList)
19541970
{
19551971
0
@@ -1972,7 +1988,7 @@ main(int argc, char **argv)
19721988

19731989
get_restricted_token();
19741990

1975-
while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1991+
while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
19761992
long_options, &option_index)) != -1)
19771993
{
19781994
switch (c)
@@ -2009,6 +2025,9 @@ main(int argc, char **argv)
20092025
case 't':
20102026
opt.recovery_timeout = atoi(optarg);
20112027
break;
2028+
case 'T':
2029+
opt.two_phase = true;
2030+
break;
20122031
case 'U':
20132032
opt.sub_username = pg_strdup(optarg);
20142033
break;
@@ -2170,12 +2189,14 @@ main(int argc, char **argv)
21702189
/* Rudimentary check for a data directory */
21712190
check_data_directory(subscriber_dir);
21722191

2192+
dbinfos.two_phase = opt.two_phase;
2193+
21732194
/*
21742195
* Store database information for publisher and subscriber. It should be
21752196
* called before atexit() because its return is used in the
21762197
* cleanup_objects_atexit().
21772198
*/
2178-
dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2199+
dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
21792200

21802201
/* Register a function to clean up objects in case of failure */
21812202
atexit(cleanup_objects_atexit);
@@ -2184,7 +2205,7 @@ main(int argc, char **argv)
21842205
* Check if the subscriber data directory has the same system identifier
21852206
* than the publisher data directory.
21862207
*/
2187-
pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2208+
pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
21882209
sub_sysid = get_standby_sysid(subscriber_dir);
21892210
if (pub_sysid != sub_sysid)
21902211
pg_fatal("subscriber data directory is not a copy of the source database cluster");
@@ -2214,10 +2235,10 @@ main(int argc, char **argv)
22142235
start_standby_server(&opt, true, false);
22152236

22162237
/* Check if the standby server is ready for logical replication */
2217-
check_subscriber(dbinfo);
2238+
check_subscriber(dbinfos.dbinfo);
22182239

22192240
/* Check if the primary server is ready for logical replication */
2220-
check_publisher(dbinfo);
2241+
check_publisher(dbinfos.dbinfo);
22212242

22222243
/*
22232244
* Stop the target server. The recovery process requires that the server
@@ -2230,10 +2251,10 @@ main(int argc, char **argv)
22302251
stop_standby_server(subscriber_dir);
22312252

22322253
/* Create the required objects for each database on publisher */
2233-
consistent_lsn = setup_publisher(dbinfo);
2254+
consistent_lsn = setup_publisher(dbinfos.dbinfo);
22342255

22352256
/* Write the required recovery parameters */
2236-
setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2257+
setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
22372258

22382259
/*
22392260
* Start subscriber so the recovery parameters will take effect. Wait
@@ -2244,21 +2265,21 @@ main(int argc, char **argv)
22442265
start_standby_server(&opt, true, true);
22452266

22462267
/* Waiting the subscriber to be promoted */
2247-
wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2268+
wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
22482269

22492270
/*
22502271
* Create the subscription for each database on subscriber. It does not
22512272
* enable it immediately because it needs to adjust the replication start
22522273
* point to the LSN reported by setup_publisher(). It also cleans up
22532274
* publications created by this tool and replication to the standby.
22542275
*/
2255-
setup_subscriber(dbinfo, consistent_lsn);
2276+
setup_subscriber(dbinfos.dbinfo, consistent_lsn);
22562277

22572278
/* Remove primary_slot_name if it exists on primary */
2258-
drop_primary_replication_slot(dbinfo, primary_slot_name);
2279+
drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
22592280

22602281
/* Remove failover replication slots if they exist on subscriber */
2261-
drop_failover_replication_slots(dbinfo);
2282+
drop_failover_replication_slots(dbinfos.dbinfo);
22622283

22632284
/* Stop the subscriber */
22642285
pg_log_info("stopping the subscriber");

src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ sub generate_db
373373

374374
# Run pg_createsubscriber on node S. --verbose is used twice
375375
# to show more information.
376+
# In passing, also test the --enable-two-phase option
376377
command_ok(
377378
[
378379
'pg_createsubscriber',
@@ -388,6 +389,7 @@ sub generate_db
388389
'--replication-slot' => 'replslot2',
389390
'--database' => $db1,
390391
'--database' => $db2,
392+
'--enable-two-phase'
391393
],
392394
'run pg_createsubscriber on node S');
393395

@@ -406,6 +408,15 @@ sub generate_db
406408
# Start subscriber
407409
$node_s->start;
408410

411+
# Verify that all subtwophase states are pending or enabled,
412+
# e.g. there are no subscriptions where subtwophase is disabled ('d')
413+
is( $node_s->safe_psql(
414+
'postgres',
415+
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
416+
),
417+
't',
418+
'subscriptions are created with the two-phase option enabled');
419+
409420
# Confirm the pre-existing subscription has been removed
410421
$result = $node_s->safe_psql(
411422
'postgres', qq(

0 commit comments

Comments
 (0)