Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/twophase.out235
-rw-r--r--contrib/test_decoding/expected/twophase_stream.out147
-rw-r--r--contrib/test_decoding/sql/twophase.sql112
-rw-r--r--contrib/test_decoding/sql/twophase_stream.sql45
5 files changed, 540 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a4c76f0136..76d4a697726 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats
+ spill slot truncate stream stats twophase twophase_stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
new file mode 100644
index 00000000000..f9f6bedd1cf
--- /dev/null
+++ b/contrib/test_decoding/expected/twophase.out
@@ -0,0 +1,235 @@
+-- Test prepared transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
+-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:2
+ PREPARE TRANSACTION 'test_prepared#1'
+(4 rows)
+
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:2
+ PREPARE TRANSACTION 'test_prepared#1'
+ COMMIT PREPARED 'test_prepared#1'
+(5 rows)
+
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(3 rows)
+
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+ relation | locktype | mode
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(3 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:5
+ COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+ COMMIT PREPARED 'test_prepared#3'
+(4 rows)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
+ COMMIT
+(6 rows)
+
+-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+ relation | locktype | mode
+----------------+----------+---------------------
+ test_prepared1 | relation | RowExclusiveLock
+ test_prepared1 | relation | ShareLock
+ test_prepared1 | relation | AccessExclusiveLock
+(3 rows)
+
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
+-- call should return within a second.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+(4 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ COMMIT PREPARED 'test_prepared_lock'
+(5 rows)
+
+-- Test savepoints and sub-xacts. Creating savepoints will create
+-- sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------------------------------------------------------------
+ BEGIN
+ table public.test_prepared_savepoint: INSERT: a[integer]:1
+ PREPARE TRANSACTION 'test_prepared_savepoint'
+(3 rows)
+
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------------------------------------------------------------
+ BEGIN
+ table public.test_prepared_savepoint: INSERT: a[integer]:1
+ PREPARE TRANSACTION 'test_prepared_savepoint'
+ COMMIT PREPARED 'test_prepared_savepoint'
+(4 rows)
+
+-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
+ COMMIT
+(3 rows)
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
new file mode 100644
index 00000000000..3acc4acd365
--- /dev/null
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -0,0 +1,147 @@
+-- Test streaming of two-phase commits
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE stream_test(data text);
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column?
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK TO s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+----------------------------------------------------------
+ streaming message: transactional: 1 prefix: test, sz: 50
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ preparing streamed transaction 'test1'
+(24 rows)
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED and the other changes in the transaction
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
+ PREPARE TRANSACTION 'test1'
+ COMMIT PREPARED 'test1'
+(23 rows)
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
+-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column?
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+----------------------------------------------------------
+ streaming message: transactional: 1 prefix: test, sz: 50
+(1 row)
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
+ COMMIT
+(22 rows)
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql
new file mode 100644
index 00000000000..894e4f5baf1
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase.sql
@@ -0,0 +1,112 @@
+-- Test prepared transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
+-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
+-- call should return within a second.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test savepoints and sub-xacts. Creating savepoints will create
+-- sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql
new file mode 100644
index 00000000000..e9dd44fdb37
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase_stream.sql
@@ -0,0 +1,45 @@
+-- Test streaming of two-phase commits
+
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK TO s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED and the other changes in the transaction
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
+-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');