Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Add prepare API support for streaming transactions in logical replication.
authorAmit Kapila <akapila@postgresql.org>
Wed, 4 Aug 2021 02:17:06 +0000 (07:47 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 4 Aug 2021 02:17:06 +0000 (07:47 +0530)
Commit a8fd13cab0 added support for prepared transactions to built-in
logical replication via a new option "two_phase" for a subscription. The
"two_phase" option was not allowed with the existing streaming option.

This commit permits the combination of "streaming" and "two_phase"
subscription options. It extends the pgoutput plugin and the subscriber
side code to add the prepare API for streaming transactions which will
apply the changes accumulated in the spool-file at prepare time.

Author: Peter Smith and Ajin Cherian
Reviewed-by: Vignesh C, Amit Kapila, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com

12 files changed:
doc/src/sgml/logicaldecoding.sgml
doc/src/sgml/protocol.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/proto.c
src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/include/replication/logicalproto.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/022_twophase_cascade.pl
src/test/subscription/t/023_twophase_stream.pl [new file with mode: 0644]

index 89b8090b79f3a42d71224c09e16a9d4b416b6634..0d0de291f3e0e86960e4b8ee6477636d4e048770 100644 (file)
@@ -1199,6 +1199,9 @@ OutputPluginWrite(ctx, true);
     <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
     and <function>stream_change_cb</function>) and two optional callbacks
     (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+    Also, if streaming of two-phase commands is to be supported, then additional
+    callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
+    for details).
    </para>
 
    <para>
@@ -1237,7 +1240,13 @@ stream_start_cb(...);   &lt;-- start of second block of changes
   stream_change_cb(...);
 stream_stop_cb(...);    &lt;-- end of second block of changes
 
-stream_commit_cb(...);  &lt;-- commit of the streamed transaction
+
+[a. when using normal commit]
+stream_commit_cb(...);    &lt;-- commit of the streamed transaction
+
+[b. when using two-phase commit]
+stream_prepare_cb(...);   &lt;-- prepare the streamed transaction
+commit_prepared_cb(...);  &lt;-- commit of the prepared transaction
 </programlisting>
    </para>
 
index 991994de1df288fa7c3e1da4a79935c4515e49ce..91ec237c210b03a06ea4ec586190c0c03fcc975c 100644 (file)
@@ -7411,7 +7411,7 @@ Stream Abort
 </variablelist>
 
 <para>
-The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
+The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared, Stream Prepare)
 are available since protocol version 3.
 </para>
 
@@ -7714,6 +7714,80 @@ are available since protocol version 3.
 </listitem>
 </varlistentry>
 
+<varlistentry>
+
+<term>Stream Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('p')</term>
+<listitem><para>
+                Identifies the message as a two-phase stream prepare message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int8(0)
+</term>
+<listitem><para>
+                Flags; currently unused.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int64 (XLogRecPtr)
+</term>
+<listitem><para>
+                The LSN of the prepare.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int64 (XLogRecPtr)
+</term>
+<listitem><para>
+                The end LSN of the prepare transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int64 (TimestampTz)
+</term>
+<listitem><para>
+                Prepare timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int32 (TransactionId)
+</term>
+<listitem><para>
+                Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+                The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
 </variablelist>
 
 <para>
index 143390593d0d623cde26571ea9494613be2e2343..702934eba13ccacca1308134d305e8cc56f020d8 100644 (file)
@@ -238,11 +238,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           subscriber as a whole.
          </para>
 
-         <para>
-          The <literal>streaming</literal> option cannot be used with the
-          <literal>two_phase</literal> option.
-         </para>
-
         </listitem>
        </varlistentry>
        <varlistentry>
@@ -269,11 +264,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           to know the actual two-phase state.
          </para>
 
-         <para>
-          The <literal>two_phase</literal> option cannot be used with the
-          <literal>streaming</literal> option.
-         </para>
-
         </listitem>
        </varlistentry>
       </variablelist></para>
index 22ae9823288975be489726815f2e843395760ba3..5157f44058fa074534b34b226694e587b783aeb9 100644 (file)
@@ -335,25 +335,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
                     errmsg("subscription with %s must also set %s",
                            "slot_name = NONE", "create_slot = false")));
    }
-
-   /*
-    * Do additional checking for the disallowed combination of two_phase and
-    * streaming. While streaming and two_phase can theoretically be
-    * supported, it needs more analysis to allow them together.
-    */
-   if (opts->twophase &&
-       IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
-       IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
-   {
-       if (opts->streaming &&
-           IsSet(supported_opts, SUBOPT_STREAMING) &&
-           IsSet(opts->specified_opts, SUBOPT_STREAMING))
-           ereport(ERROR,
-                   (errcode(ERRCODE_SYNTAX_ERROR),
-           /*- translator: both %s are strings of the form "option = value" */
-                    errmsg("%s and %s are mutually exclusive options",
-                           "two_phase = true", "streaming = true")));
-   }
 }
 
 /*
@@ -933,12 +914,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
                if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
                {
-                   if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming)
-                       ereport(ERROR,
-                               (errcode(ERRCODE_SYNTAX_ERROR),
-                                errmsg("cannot set %s for two-phase enabled subscription",
-                                       "streaming = true")));
-
                    values[Anum_pg_subscription_substream - 1] =
                        BoolGetDatum(opts.streaming);
                    replaces[Anum_pg_subscription_substream - 1] = true;
index 2d774567e089ca1386be83d3163c6e078d3f0750..52b65e95721c7869ccf69c4fa7768c29b9b343ff 100644 (file)
@@ -145,7 +145,8 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
 }
 
 /*
- * The core functionality for logicalrep_write_prepare.
+ * The core functionality for logicalrep_write_prepare and
+ * logicalrep_write_stream_prepare.
  */
 static void
 logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
@@ -188,7 +189,8 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
 }
 
 /*
- * The core functionality for logicalrep_read_prepare.
+ * The core functionality for logicalrep_read_prepare and
+ * logicalrep_read_stream_prepare.
  */
 static void
 logicalrep_read_prepare_common(StringInfo in, char *msgtype,
@@ -209,6 +211,8 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype,
        elog(ERROR, "end_lsn is not set in %s message", msgtype);
    prepare_data->prepare_time = pq_getmsgint64(in);
    prepare_data->xid = pq_getmsgint(in, 4);
+   if (prepare_data->xid == InvalidTransactionId)
+       elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
 
    /* read gid (copy it into a pre-allocated buffer) */
    strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
@@ -339,6 +343,27 @@ logicalrep_read_rollback_prepared(StringInfo in,
    strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
 }
 
+/*
+ * Write STREAM PREPARE to the output stream.
+ */
+void
+logicalrep_write_stream_prepare(StringInfo out,
+                               ReorderBufferTXN *txn,
+                               XLogRecPtr prepare_lsn)
+{
+   logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
+                                   txn, prepare_lsn);
+}
+
+/*
+ * Read STREAM PREPARE from the stream.
+ */
+void
+logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+   logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
+}
+
 /*
  * Write ORIGIN to the output stream.
  */
index 249de807984ea0c9d41c241a4e9db8c6f6f3c4b4..ecaed157f29995de4a2a532ee1927a7259f3fc1c 100644 (file)
@@ -1052,6 +1052,56 @@ apply_handle_rollback_prepared(StringInfo s)
    pgstat_report_activity(STATE_IDLE, NULL);
 }
 
+/*
+ * Handle STREAM PREPARE.
+ *
+ * Logic is in two parts:
+ * 1. Replay all the spooled operations
+ * 2. Mark the transaction as prepared
+ */
+static void
+apply_handle_stream_prepare(StringInfo s)
+{
+   LogicalRepPreparedTxnData prepare_data;
+
+   if (in_streamed_transaction)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("STREAM PREPARE message without STREAM STOP")));
+
+   /* Tablesync should never receive prepare. */
+   if (am_tablesync_worker())
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("tablesync worker received a STREAM PREPARE message")));
+
+   logicalrep_read_stream_prepare(s, &prepare_data);
+
+   elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
+
+   /* Replay all the spooled operations. */
+   apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
+
+   /* Mark the transaction as prepared. */
+   apply_handle_prepare_internal(&prepare_data);
+
+   CommitTransactionCommand();
+
+   pgstat_report_stat(false);
+
+   store_flush_position(prepare_data.end_lsn);
+
+   in_remote_transaction = false;
+
+   /* unlink the files with serialized changes and subxact info. */
+   stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+
+   /* Process any tables that are being synchronized in parallel. */
+   process_syncing_tables(prepare_data.end_lsn);
+
+   pgstat_report_activity(STATE_IDLE, NULL);
+}
+
 /*
  * Handle ORIGIN message.
  *
@@ -1291,7 +1341,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
     */
    oldcxt = MemoryContextSwitchTo(TopTransactionContext);
 
-   /* open the spool file for the committed transaction */
+   /* Open the spool file for the committed/prepared transaction */
    changes_filename(path, MyLogicalRepWorker->subid, xid);
    elog(DEBUG1, "replaying changes from file \"%s\"", path);
 
@@ -2357,6 +2407,10 @@ apply_dispatch(StringInfo s)
        case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
            apply_handle_rollback_prepared(s);
            return;
+
+       case LOGICAL_REP_MSG_STREAM_PREPARE:
+           apply_handle_stream_prepare(s);
+           return;
    }
 
    ereport(ERROR,
index e4314af13ae6c22b2647b4cc3493bf4016daa3d4..286119c8c83f68027fe8a4c7ec79f133b7bc4ac8 100644 (file)
@@ -71,6 +71,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn,
                                   XLogRecPtr commit_lsn);
+static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+                                       ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
 static bool in_streaming;
@@ -175,7 +177,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->stream_message_cb = pgoutput_message;
    cb->stream_truncate_cb = pgoutput_truncate;
    /* transaction streaming - two-phase commit */
-   cb->stream_prepare_cb = NULL;
+   cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
 }
 
 static void
@@ -280,17 +282,6 @@ parse_output_parameters(List *options, PGOutputData *data)
        }
        else
            elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
-
-       /*
-        * Do additional checking for the disallowed combination of two_phase
-        * and streaming. While streaming and two_phase can theoretically be
-        * supported, it needs more analysis to allow them together.
-        */
-       if (data->two_phase && data->streaming)
-           ereport(ERROR,
-                   (errcode(ERRCODE_SYNTAX_ERROR),
-                    errmsg("%s and %s are mutually exclusive options",
-                           "two_phase", "streaming")));
    }
 }
 
@@ -1029,6 +1020,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    cleanup_rel_sync_cache(txn->xid, true);
 }
 
+/*
+ * PREPARE callback (for streaming two-phase commit).
+ *
+ * Notify the downstream to prepare the transaction.
+ */
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+                           ReorderBufferTXN *txn,
+                           XLogRecPtr prepare_lsn)
+{
+   Assert(rbtxn_is_streamed(txn));
+
+   OutputPluginUpdateProgress(ctx);
+   OutputPluginPrepareWrite(ctx, true);
+   logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
+   OutputPluginWrite(ctx, true);
+}
+
 /*
  * Initialize the relation schema sync cache for a decoding session.
  *
index 63de90d94a5724ac619b194778c9ba97ff8e7abf..2e2951315128dd0bc99ae80bba396d39418a53c1 100644 (file)
@@ -67,7 +67,8 @@ typedef enum LogicalRepMsgType
    LOGICAL_REP_MSG_STREAM_START = 'S',
    LOGICAL_REP_MSG_STREAM_END = 'E',
    LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
-   LOGICAL_REP_MSG_STREAM_ABORT = 'A'
+   LOGICAL_REP_MSG_STREAM_ABORT = 'A',
+   LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
 } LogicalRepMsgType;
 
 /*
@@ -196,7 +197,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN
                                               TimestampTz prepare_time);
 extern void logicalrep_read_rollback_prepared(StringInfo in,
                                              LogicalRepRollbackPreparedTxnData *rollback_data);
-
+extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn,
+                                           XLogRecPtr prepare_lsn);
+extern void logicalrep_read_stream_prepare(StringInfo in,
+                                          LogicalRepPreparedTxnData *prepare_data);
 
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
                                    XLogRecPtr origin_lsn);
index 67f92b38787622db608d812afe9dd44b2c27123d..77b4437b693d2bcb523123345918ae9ebf0450c6 100644 (file)
@@ -282,27 +282,29 @@ WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ..
 --fail - alter of two_phase option not supported.
 ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
 ERROR:  unrecognized subscription parameter: "two_phase"
---fail - cannot set streaming when two_phase enabled
+-- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
-ERROR:  cannot set streaming = true for two-phase enabled subscription
-ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
                                                                      List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
 (1 row)
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
--- fail - two_phase and streaming are mutually exclusive.
-CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
-ERROR:  two_phase = true and streaming = true are mutually exclusive options
+-- two_phase and streaming are compatible.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                            List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo 
-------+-------+---------+-------------+--------+-----------+------------------+--------------------+----------
-(0 rows)
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+(1 row)
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
index 88743ab33bdf419d3d83fef5720e195fb946b71c..d42104c19106fc63ac8437e997c3b42e24bbd2ed 100644 (file)
@@ -215,20 +215,21 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 --fail - alter of two_phase option not supported.
 ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
 
---fail - cannot set streaming when two_phase enabled
+-- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
-ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
-
 \dRs+
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
--- fail - two_phase and streaming are mutually exclusive.
-CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
+-- two_phase and streaming are compatible.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 
 \dRs+
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
 
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
index d7cc99959f892101dbcd58a04d9f0fef6e0488a7..a47c62d8fde103053a9f4e6174a99bc2109bbc74 100644 (file)
@@ -2,11 +2,14 @@
 # Copyright (c) 2021, PostgreSQL Global Development Group
 
 # Test cascading logical replication of 2PC.
+#
+# Includes tests for options 2PC (not-streaming) and also for 2PC (streaming).
+#
 use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 27;
+use Test::More tests => 41;
 
 ###############################
 # Setup a cascade of pub/sub nodes.
@@ -17,20 +20,26 @@ use Test::More tests => 27;
 # node_A
 my $node_A = PostgresNode->new('node_A');
 $node_A->init(allows_streaming => 'logical');
-$node_A->append_conf('postgresql.conf',
-   qq(max_prepared_transactions = 10));
+$node_A->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
 $node_A->start;
 # node_B
 my $node_B = PostgresNode->new('node_B');
 $node_B->init(allows_streaming => 'logical');
-$node_B->append_conf('postgresql.conf',
-   qq(max_prepared_transactions = 10));
+$node_B->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
 $node_B->start;
 # node_C
 my $node_C = PostgresNode->new('node_C');
 $node_C->init(allows_streaming => 'logical');
-$node_C->append_conf('postgresql.conf',
-   qq(max_prepared_transactions = 10));
+$node_C->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
 $node_C->start;
 
 # Create some pre-existing content on node_A
@@ -45,12 +54,29 @@ $node_B->safe_psql('postgres',
 $node_C->safe_psql('postgres',
    "CREATE TABLE tab_full (a int PRIMARY KEY)");
 
+# Create some pre-existing content on node_A (for streaming tests)
+$node_A->safe_psql('postgres',
+   "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_A->safe_psql('postgres',
+   "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Create the same tables on node_B and node_C
+# columns a and b are compatible with same table name on node_A
+$node_B->safe_psql('postgres',
+   "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+$node_C->safe_psql('postgres',
+   "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
 # Setup logical replication
 
+# -----------------------
+# 2PC NON-STREAMING TESTS
+# -----------------------
+
 # node_A (pub) -> node_B (sub)
 my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
 $node_A->safe_psql('postgres',
-   "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+   "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full, test_tab");
 my $appname_B = 'tap_sub_B';
 $node_B->safe_psql('postgres', "
    CREATE SUBSCRIPTION tap_sub_B
@@ -61,7 +87,7 @@ $node_B->safe_psql('postgres',    "
 # node_B (pub) -> node_C (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
 $node_B->safe_psql('postgres',
-   "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+   "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full, test_tab");
 my $appname_C = 'tap_sub_C';
 $node_C->safe_psql('postgres', "
    CREATE SUBSCRIPTION tap_sub_C
@@ -203,6 +229,141 @@ is($result, qq(21), 'Rows committed are present on subscriber B');
 $result = $node_C->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
 is($result, qq(21), 'Rows committed are present on subscriber C');
 
+# ---------------------
+# 2PC + STREAMING TESTS
+# ---------------------
+
+my $oldpid_B = $node_A->safe_psql('postgres', "
+   SELECT pid FROM pg_stat_replication
+   WHERE application_name = '$appname_B';");
+my $oldpid_C = $node_B->safe_psql('postgres', "
+   SELECT pid FROM pg_stat_replication
+   WHERE application_name = '$appname_C';");
+
+# Setup logical replication (streaming = on)
+
+$node_B->safe_psql('postgres', "
+   ALTER SUBSCRIPTION tap_sub_B
+   SET (streaming = on);");
+$node_C->safe_psql('postgres', "
+   ALTER SUBSCRIPTION tap_sub_C
+   SET (streaming = on)");
+
+# Wait for subscribers to finish initialization
+
+$node_A->poll_query_until('postgres', "
+   SELECT pid != $oldpid_B FROM pg_stat_replication
+   WHERE application_name = '$appname_B';"
+) or die "Timed out while waiting for apply to restart";
+$node_B->poll_query_until('postgres', "
+   SELECT pid != $oldpid_C FROM pg_stat_replication
+   WHERE application_name = '$appname_C';"
+) or die "Timed out while waiting for apply to restart";
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED.
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber(s) after the commit.
+###############################
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+# Then 2PC PREPARE
+$node_A->safe_psql('postgres', q{
+   BEGIN;
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+   DELETE FROM test_tab WHERE mod(a,3) = 0;
+   PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults');
+$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT.
+# 0. Cleanup from previous test leaving only 2 rows.
+# 1. Insert one more row.
+# 2. Record a SAVEPOINT.
+# 3. Data is streamed using 2PC.
+# 4. Do rollback to SAVEPOINT prior to the streamed inserts.
+# 5. Then COMMIT PREPARED.
+#
+# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1).
+###############################
+
+# First, delete the data except for 2 rows (delete will be replicated)
+$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql('postgres', "
+   BEGIN;
+   INSERT INTO test_tab VALUES (9999, 'foobar');
+   SAVEPOINT sp_inner;
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+   DELETE FROM test_tab WHERE mod(a,3) = 0;
+   ROLLBACK TO SAVEPOINT sp_inner;
+   PREPARE TRANSACTION 'outer';
+   ");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is ended on subscriber
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# All the streamed data (prior to the SAVEPOINT) should be rolled back.
+# (9999, 'foobar') should be committed.
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows committed are present on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows committed are present on subscriber C');
+
 ###############################
 # check all the cleanup
 ###############################
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
new file mode 100644 (file)
index 0000000..c72c6b5
--- /dev/null
@@ -0,0 +1,284 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test logical replication of 2PC with streaming.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 18;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgresNode->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgresNode->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
+$node_subscriber->safe_psql('postgres',
+   "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication (streaming = on)
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', "
+   CREATE SUBSCRIPTION tap_sub
+   CONNECTION '$publisher_connstr application_name=$appname'
+   PUBLICATION tap_pub
+   WITH (streaming = on, two_phase = on)");
+
+# Wait for subscriber to finish initialization
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+   "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+   "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+  or die "Timed out while waiting for subscriber to enable twophase";
+
+###############################
+# Check initial data was copied to subscriber
+###############################
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED.
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber side after the commit.
+###############################
+
+# check that 2PC gets replicated to subscriber
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+   BEGIN;
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+   DELETE FROM test_tab WHERE mod(a,3) = 0;
+   PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Test 2PC PREPARE / ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. Do rollback prepared.
+#
+# Expect data rolls back leaving only the original 2 rows.
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres',  "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+   BEGIN;
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+   DELETE FROM test_tab WHERE mod(a,3) = 0;
+   PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then server crashes before the 2PC transaction is committed.
+# 3. After servers are restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# Note: both publisher and subscriber do crash/restart.
+###############################
+
+$node_publisher->safe_psql('postgres', q{
+   BEGIN;
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+   DELETE FROM test_tab WHERE mod(a,3) = 0;
+   PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+
+###############################
+# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a ROLLBACK PREPARED.
+#
+# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
+# (the original 2 + inserted 1).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+   BEGIN;
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+   DELETE FROM test_tab WHERE mod(a,3) = 0;
+   PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber,
+# but the extra INSERT outside of the 2PC still was replicated
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3|3|3), 'check the outside insert was copied to subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Do INSERT after the PREPARE but before COMMIT PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a COMMIT PREPARED.
+#
+# Expect 2PC data + the extra row are on the subscriber
+# (the 3334 + inserted 1 = 3335).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+   BEGIN;
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+   DELETE FROM test_tab WHERE mod(a,3) = 0;
+   PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3335|3335|3335), 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');