Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit cda03cf

Browse files
author
Amit Kapila
committed
Allow enabling two-phase option via replication protocol.
Extend the replication command CREATE_REPLICATION_SLOT to support the TWO_PHASE option. This will allow decoding commands like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED for slots created with this option. The decoding of the transaction happens at prepare command. This patch also adds support of two-phase in pg_recvlogical via a new option --two-phase. This option will also be used by future patches that allow streaming of transactions at prepare time for built-in logical replication. With this, the out-of-core logical replication solutions can enable replication of two-phase transactions via replication protocol. Author: Ajin Cherian Reviewed-By: Jeff Davis, Vignesh C, Amit Kapila Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/64b9f783c6e125f18f88fbc0c0234e34e71d8639.camel@j-davis.com
1 parent 17707c0 commit cda03cf

12 files changed

+143
-13
lines changed

doc/src/sgml/logicaldecoding.sgml

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,19 @@ postgres=# SELECT pg_drop_replication_slot('regression_slot');
144144
</programlisting>
145145

146146
<para>
147-
The following example shows how logical decoding is controlled over the
147+
The following examples shows how logical decoding is controlled over the
148148
streaming replication protocol, using the
149149
program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
150150
distribution. This requires that client authentication is set up to allow
151151
replication connections
152152
(see <xref linkend="streaming-replication-authentication"/>) and
153153
that <varname>max_wal_senders</varname> is set sufficiently high to allow
154-
an additional connection.
154+
an additional connection. The second example shows how to stream two-phase
155+
transactions. Before you use two-phase commands, you must set
156+
<xref linkend="guc-max-prepared-transactions"/> to atleast 1.
155157
</para>
156158
<programlisting>
159+
Example 1:
157160
$ pg_recvlogical -d postgres --slot=test --create-slot
158161
$ pg_recvlogical -d postgres --slot=test --start -f -
159162
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
@@ -164,6 +167,22 @@ table public.data: INSERT: id[integer]:4 data[text]:'4'
164167
COMMIT 693
165168
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
166169
$ pg_recvlogical -d postgres --slot=test --drop-slot
170+
171+
Example 2:
172+
$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
173+
$ pg_recvlogical -d postgres --slot=test --start -f -
174+
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
175+
$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
176+
$ fg
177+
BEGIN 694
178+
table public.data: INSERT: id[integer]:5 data[text]:'5'
179+
PREPARE TRANSACTION 'test', txid 694
180+
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
181+
$ psql -d postgres -c "COMMIT PREPARED 'test';"
182+
$ fg
183+
COMMIT PREPARED 'test', txid 694
184+
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
185+
$ pg_recvlogical -d postgres --slot=test --drop-slot
167186
</programlisting>
168187

169188
<para>

doc/src/sgml/protocol.sgml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
19141914
</varlistentry>
19151915

19161916
<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
1917-
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> ] }
1917+
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
19181918
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
19191919
</term>
19201920
<listitem>
@@ -1955,6 +1955,20 @@ The commands accepted in replication mode are:
19551955
</listitem>
19561956
</varlistentry>
19571957

1958+
<varlistentry>
1959+
<term><literal>TWO_PHASE</literal></term>
1960+
<listitem>
1961+
<para>
1962+
Specify that this logical replication slot supports decoding of two-phase
1963+
transactions. With this option, two-phase commands like
1964+
<literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
1965+
and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
1966+
The transaction will be decoded and transmitted at
1967+
<literal>PREPARE TRANSACTION</literal> time.
1968+
</para>
1969+
</listitem>
1970+
</varlistentry>
1971+
19581972
<varlistentry>
19591973
<term><literal>RESERVE_WAL</literal></term>
19601974
<listitem>

doc/src/sgml/ref/pg_recvlogical.sgml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ PostgreSQL documentation
6565
<option>--plugin</option>, for the database specified
6666
by <option>--dbname</option>.
6767
</para>
68+
69+
<para>
70+
The <option>--two-phase</option> can be specified with
71+
<option>--create-slot</option> to enable two-phase decoding.
72+
</para>
6873
</listitem>
6974
</varlistentry>
7075

@@ -256,6 +261,17 @@ PostgreSQL documentation
256261
</listitem>
257262
</varlistentry>
258263

264+
<varlistentry>
265+
<term><option>-t</option></term>
266+
<term><option>--two-phase</option></term>
267+
<listitem>
268+
<para>
269+
Enables two-phase decoding. This option should only be specified with
270+
<option>--create-slot</option>
271+
</para>
272+
</listitem>
273+
</varlistentry>
274+
259275
<varlistentry>
260276
<term><option>-v</option></term>
261277
<term><option>--verbose</option></term>

src/backend/replication/repl_gram.y

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
8484
%token K_SLOT
8585
%token K_RESERVE_WAL
8686
%token K_TEMPORARY
87+
%token K_TWO_PHASE
8788
%token K_EXPORT_SNAPSHOT
8889
%token K_NOEXPORT_SNAPSHOT
8990
%token K_USE_SNAPSHOT
@@ -283,6 +284,11 @@ create_slot_opt:
283284
$$ = makeDefElem("reserve_wal",
284285
(Node *)makeInteger(true), -1);
285286
}
287+
| K_TWO_PHASE
288+
{
289+
$$ = makeDefElem("two_phase",
290+
(Node *)makeInteger(true), -1);
291+
}
286292
;
287293

288294
/* DROP_REPLICATION_SLOT slot */

src/backend/replication/repl_scanner.l

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; }
103103
LOGICAL { return K_LOGICAL; }
104104
SLOT { return K_SLOT; }
105105
TEMPORARY { return K_TEMPORARY; }
106+
TWO_PHASE { return K_TWO_PHASE; }
106107
EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
107108
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
108109
USE_SNAPSHOT { return K_USE_SNAPSHOT; }

src/backend/replication/walsender.c

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
863863
static void
864864
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
865865
bool *reserve_wal,
866-
CRSSnapshotAction *snapshot_action)
866+
CRSSnapshotAction *snapshot_action,
867+
bool *two_phase)
867868
{
868869
ListCell *lc;
869870
bool snapshot_action_given = false;
870871
bool reserve_wal_given = false;
872+
bool two_phase_given = false;
871873

872874
/* Parse options */
873875
foreach(lc, cmd->options)
@@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
905907
reserve_wal_given = true;
906908
*reserve_wal = true;
907909
}
910+
else if (strcmp(defel->defname, "two_phase") == 0)
911+
{
912+
if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
913+
ereport(ERROR,
914+
(errcode(ERRCODE_SYNTAX_ERROR),
915+
errmsg("conflicting or redundant options")));
916+
two_phase_given = true;
917+
*two_phase = true;
918+
}
908919
else
909920
elog(ERROR, "unrecognized option: %s", defel->defname);
910921
}
@@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
920931
char xloc[MAXFNAMELEN];
921932
char *slot_name;
922933
bool reserve_wal = false;
934+
bool two_phase = false;
923935
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
924936
DestReceiver *dest;
925937
TupOutputState *tstate;
@@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
929941

930942
Assert(!MyReplicationSlot);
931943

932-
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
944+
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
933945

934946
/* setup state for WalSndSegmentOpen */
935947
sendTimeLineIsHistoric = false;
@@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
954966
*/
955967
ReplicationSlotCreate(cmd->slotname, true,
956968
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
957-
false);
969+
two_phase);
958970
}
959971

960972
if (cmd->kind == REPLICATION_KIND_LOGICAL)

src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
646646
if (temp_replication_slot || create_slot)
647647
{
648648
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
649-
temp_replication_slot, true, true, false))
649+
temp_replication_slot, true, true, false, false))
650650
exit(1);
651651

652652
if (verbose)

src/bin/pg_basebackup/pg_receivewal.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,7 +741,7 @@ main(int argc, char **argv)
741741
pg_log_info("creating replication slot \"%s\"", replication_slot);
742742

743743
if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
744-
slot_exists_ok))
744+
slot_exists_ok, false))
745745
exit(1);
746746
exit(0);
747747
}

src/bin/pg_basebackup/pg_recvlogical.c

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
/* Global Options */
3636
static char *outfile = NULL;
3737
static int verbose = 0;
38+
static bool two_phase = false;
3839
static int noloop = 0;
3940
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
4041
static int fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -93,6 +94,7 @@ usage(void)
9394
printf(_(" -s, --status-interval=SECS\n"
9495
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
9596
printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
97+
printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
9698
printf(_(" -v, --verbose output verbose messages\n"));
9799
printf(_(" -V, --version output version information, then exit\n"));
98100
printf(_(" -?, --help show this help, then exit\n"));
@@ -678,6 +680,7 @@ main(int argc, char **argv)
678680
{"fsync-interval", required_argument, NULL, 'F'},
679681
{"no-loop", no_argument, NULL, 'n'},
680682
{"verbose", no_argument, NULL, 'v'},
683+
{"two-phase", no_argument, NULL, 't'},
681684
{"version", no_argument, NULL, 'V'},
682685
{"help", no_argument, NULL, '?'},
683686
/* connection options */
@@ -726,7 +729,7 @@ main(int argc, char **argv)
726729
}
727730
}
728731

729-
while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
732+
while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
730733
long_options, &option_index)) != -1)
731734
{
732735
switch (c)
@@ -749,6 +752,9 @@ main(int argc, char **argv)
749752
case 'v':
750753
verbose++;
751754
break;
755+
case 't':
756+
two_phase = true;
757+
break;
752758
/* connection options */
753759
case 'd':
754760
dbname = pg_strdup(optarg);
@@ -920,6 +926,15 @@ main(int argc, char **argv)
920926
exit(1);
921927
}
922928

929+
if (two_phase && !do_create_slot)
930+
{
931+
pg_log_error("--two-phase may only be specified with --create-slot");
932+
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
933+
progname);
934+
exit(1);
935+
}
936+
937+
923938
#ifndef WIN32
924939
pqsignal(SIGINT, sigint_handler);
925940
pqsignal(SIGHUP, sighup_handler);
@@ -976,7 +991,7 @@ main(int argc, char **argv)
976991
pg_log_info("creating replication slot \"%s\"", replication_slot);
977992

978993
if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
979-
false, false, slot_exists_ok))
994+
false, false, slot_exists_ok, two_phase))
980995
exit(1);
981996
startpos = InvalidXLogRecPtr;
982997
}

src/bin/pg_basebackup/streamutil.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
486486
bool
487487
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
488488
bool is_temporary, bool is_physical, bool reserve_wal,
489-
bool slot_exists_ok)
489+
bool slot_exists_ok, bool two_phase)
490490
{
491491
PQExpBuffer query;
492492
PGresult *res;
@@ -495,6 +495,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
495495

496496
Assert((is_physical && plugin == NULL) ||
497497
(!is_physical && plugin != NULL));
498+
Assert(!(two_phase && is_physical));
498499
Assert(slot_name != NULL);
499500

500501
/* Build query */
@@ -510,6 +511,9 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
510511
else
511512
{
512513
appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
514+
if (two_phase && PQserverVersion(conn) >= 150000)
515+
appendPQExpBufferStr(query, " TWO_PHASE");
516+
513517
if (PQserverVersion(conn) >= 100000)
514518
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
515519
appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");

src/bin/pg_basebackup/streamutil.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ extern PGconn *GetConnection(void);
3434
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
3535
const char *plugin, bool is_temporary,
3636
bool is_physical, bool reserve_wal,
37-
bool slot_exists_ok);
37+
bool slot_exists_ok, bool two_phase);
3838
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
3939
extern bool RunIdentifySystem(PGconn *conn, char **sysid,
4040
TimeLineID *starttli,

src/bin/pg_basebackup/t/030_pg_recvlogical.pl

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use warnings;
66
use TestLib;
77
use PostgresNode;
8-
use Test::More tests => 15;
8+
use Test::More tests => 20;
99

1010
program_help_ok('pg_recvlogical');
1111
program_version_ok('pg_recvlogical');
@@ -22,6 +22,7 @@
2222
max_wal_senders = 4
2323
log_min_messages = 'debug1'
2424
log_error_verbosity = verbose
25+
max_prepared_transactions = 10
2526
});
2627
$node->dump_info;
2728
$node->start;
@@ -63,3 +64,45 @@
6364
'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
6465
],
6566
'replayed a transaction');
67+
68+
$node->command_ok(
69+
[
70+
'pg_recvlogical', '-S',
71+
'test', '-d',
72+
$node->connstr('postgres'), '--drop-slot'
73+
],
74+
'slot dropped');
75+
76+
#test with two-phase option enabled
77+
$node->command_ok(
78+
[
79+
'pg_recvlogical', '-S',
80+
'test', '-d',
81+
$node->connstr('postgres'), '--create-slot', '--two-phase'
82+
],
83+
'slot with two-phase created');
84+
85+
$slot = $node->slot('test');
86+
isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
87+
88+
$node->safe_psql('postgres',
89+
"BEGIN; INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'");
90+
$node->safe_psql('postgres',
91+
"COMMIT PREPARED 'test'");
92+
$nextlsn =
93+
$node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');
94+
chomp($nextlsn);
95+
96+
$node->command_fails(
97+
[
98+
'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
99+
'--start', '--endpos', "$nextlsn", '--two-phase', '--no-loop', '-f', '-'
100+
],
101+
'incorrect usage');
102+
103+
$node->command_ok(
104+
[
105+
'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
106+
'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
107+
],
108+
'replayed a two-phase transaction');

0 commit comments

Comments
 (0)