diff options
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r-- | contrib/test_decoding/Makefile | 2 | ||||
-rw-r--r-- | contrib/test_decoding/expected/twophase.out | 235 | ||||
-rw-r--r-- | contrib/test_decoding/expected/twophase_stream.out | 147 | ||||
-rw-r--r-- | contrib/test_decoding/sql/twophase.sql | 112 | ||||
-rw-r--r-- | contrib/test_decoding/sql/twophase_stream.sql | 45 |
5 files changed, 540 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 9a4c76f0136..76d4a697726 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats + spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out new file mode 100644 index 00000000000..f9f6bedd1cf --- /dev/null +++ b/contrib/test_decoding/expected/twophase.out @@ -0,0 +1,235 @@ +-- Test prepared transactions. When two-phase-commit is enabled, transactions are +-- decoded at PREPARE time rather than at COMMIT PREPARED time. +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); +-- Test that decoding happens at PREPARE time when two-phase-commit is enabled. +-- Decoding after COMMIT PREPARED must have all the commands in the transaction. +BEGIN; +INSERT INTO test_prepared1 VALUES (1); +INSERT INTO test_prepared1 VALUES (2); +-- should show nothing because the xact has not been prepared yet. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +PREPARE TRANSACTION 'test_prepared#1'; +-- should show both the above inserts and the PREPARE TRANSACTION. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + table public.test_prepared1: INSERT: id[integer]:2 + PREPARE TRANSACTION 'test_prepared#1' +(4 rows) + +COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + table public.test_prepared1: INSERT: id[integer]:2 + PREPARE TRANSACTION 'test_prepared#1' + COMMIT PREPARED 'test_prepared#1' +(5 rows) + +-- Test that rollback of a prepared xact is decoded. +BEGIN; +INSERT INTO test_prepared1 VALUES (3); +PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(3 rows) + +ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------- + ROLLBACK PREPARED 'test_prepared#2' +(1 row) + +-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test. +BEGIN; +ALTER TABLE test_prepared1 ADD COLUMN data text; +INSERT INTO test_prepared1 VALUES (4, 'frakbar'); +PREPARE TRANSACTION 'test_prepared#3'; +-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +-----------------+----------+--------------------- + test_prepared_1 | relation | RowExclusiveLock + test_prepared_1 | relation | AccessExclusiveLock +(2 rows) + +-- The insert should show the newly altered column but not the DDL. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3' +(3 rows) + +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +INSERT INTO test_prepared2 VALUES (5); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:5 + COMMIT +(3 rows) + +COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3' + COMMIT PREPARED 'test_prepared#3' +(4 rows) + +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (6); +INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:6 data[text]:null + COMMIT + BEGIN + table public.test_prepared2: INSERT: id[integer]:7 + COMMIT +(6 rows) + +-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (8, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (9, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; +SELECT 'test_prepared1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +----------------+----------+--------------------- + test_prepared1 | relation | RowExclusiveLock + test_prepared1 | relation | ShareLock + test_prepared1 | relation | AccessExclusiveLock +(3 rows) + +-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The +-- call should return within a second. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2' + PREPARE TRANSACTION 'test_prepared_lock' +(4 rows) + +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2' + PREPARE TRANSACTION 'test_prepared_lock' + COMMIT PREPARED 'test_prepared_lock' +(5 rows) + +-- Test savepoints and sub-xacts. Creating savepoints will create +-- sub-xacts implicitly. +BEGIN; +CREATE TABLE test_prepared_savepoint (a int); +INSERT INTO test_prepared_savepoint VALUES (1); +SAVEPOINT test_savepoint; +INSERT INTO test_prepared_savepoint VALUES (2); +ROLLBACK TO SAVEPOINT test_savepoint; +PREPARE TRANSACTION 'test_prepared_savepoint'; +-- should show only 1, not 2 +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------ + BEGIN + table public.test_prepared_savepoint: INSERT: a[integer]:1 + PREPARE TRANSACTION 'test_prepared_savepoint' +(3 rows) + +COMMIT PREPARED 'test_prepared_savepoint'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------ + BEGIN + table public.test_prepared_savepoint: INSERT: a[integer]:1 + PREPARE TRANSACTION 'test_prepared_savepoint' + COMMIT PREPARED 'test_prepared_savepoint' +(4 rows) + +-- Test that a GID containing "_nodecode" gets decoded at commit prepared time. +BEGIN; +INSERT INTO test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:20 data[text]:null + COMMIT +(3 rows) + +-- Test 8: +-- cleanup and make sure results are also empty +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results. There should be nothing to show +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out new file mode 100644 index 00000000000..3acc4acd365 --- /dev/null +++ b/contrib/test_decoding/expected/twophase_stream.out @@ -0,0 +1,147 @@ +-- Test streaming of two-phase commits +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE stream_test(data text); +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED +BEGIN; +SAVEPOINT s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? +---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +ROLLBACK TO s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1'; +-- should show the inserts after a ROLLBACK +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +---------------------------------------------------------- + streaming message: transactional: 1 prefix: test, sz: 50 + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + preparing streamed transaction 'test1' +(24 rows) + +COMMIT PREPARED 'test1'; +--should show the COMMIT PREPARED and the other changes in the transaction +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +------------------------------------------------------------- + BEGIN + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20' + PREPARE TRANSACTION 'test1' + COMMIT PREPARED 'test1' +(23 rows) + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with +-- filtered gid. gids with '_nodecode' will not be decoded at prepare time. +BEGIN; +SAVEPOINT s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? +---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +ROLLBACK to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1_nodecode'; +-- should NOT show inserts after a ROLLBACK +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +---------------------------------------------------------- + streaming message: transactional: 1 prefix: test, sz: 50 +(1 row) + +COMMIT PREPARED 'test1_nodecode'; +-- should show the inserts but not show a COMMIT PREPARED but a COMMIT +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +------------------------------------------------------------- + BEGIN + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20' + COMMIT +(22 rows) + +DROP TABLE stream_test; +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql new file mode 100644 index 00000000000..894e4f5baf1 --- /dev/null +++ b/contrib/test_decoding/sql/twophase.sql @@ -0,0 +1,112 @@ +-- Test prepared transactions. When two-phase-commit is enabled, transactions are +-- decoded at PREPARE time rather than at COMMIT PREPARED time. +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); + +-- Test that decoding happens at PREPARE time when two-phase-commit is enabled. +-- Decoding after COMMIT PREPARED must have all the commands in the transaction. +BEGIN; +INSERT INTO test_prepared1 VALUES (1); +INSERT INTO test_prepared1 VALUES (2); +-- should show nothing because the xact has not been prepared yet. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +PREPARE TRANSACTION 'test_prepared#1'; +-- should show both the above inserts and the PREPARE TRANSACTION. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test that rollback of a prepared xact is decoded. +BEGIN; +INSERT INTO test_prepared1 VALUES (3); +PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test. +BEGIN; +ALTER TABLE test_prepared1 ADD COLUMN data text; +INSERT INTO test_prepared1 VALUES (4, 'frakbar'); +PREPARE TRANSACTION 'test_prepared#3'; +-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; +-- The insert should show the newly altered column but not the DDL. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +INSERT INTO test_prepared2 VALUES (5); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (6); +INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (8, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (9, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; + +SELECT 'test_prepared1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; +-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The +-- call should return within a second. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test savepoints and sub-xacts. Creating savepoints will create +-- sub-xacts implicitly. +BEGIN; +CREATE TABLE test_prepared_savepoint (a int); +INSERT INTO test_prepared_savepoint VALUES (1); +SAVEPOINT test_savepoint; +INSERT INTO test_prepared_savepoint VALUES (2); +ROLLBACK TO SAVEPOINT test_savepoint; +PREPARE TRANSACTION 'test_prepared_savepoint'; +-- should show only 1, not 2 +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared_savepoint'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test that a GID containing "_nodecode" gets decoded at commit prepared time. +BEGIN; +INSERT INTO test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 8: +-- cleanup and make sure results are also empty +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results. There should be nothing to show +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql new file mode 100644 index 00000000000..e9dd44fdb37 --- /dev/null +++ b/contrib/test_decoding/sql/twophase_stream.sql @@ -0,0 +1,45 @@ +-- Test streaming of two-phase commits + +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE stream_test(data text); + +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED +BEGIN; +SAVEPOINT s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +ROLLBACK TO s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1'; +-- should show the inserts after a ROLLBACK +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +COMMIT PREPARED 'test1'; +--should show the COMMIT PREPARED and the other changes in the transaction +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with +-- filtered gid. gids with '_nodecode' will not be decoded at prepare time. +BEGIN; +SAVEPOINT s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +ROLLBACK to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1_nodecode'; +-- should NOT show inserts after a ROLLBACK +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +COMMIT PREPARED 'test1_nodecode'; +-- should show the inserts but not show a COMMIT PREPARED but a COMMIT +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +DROP TABLE stream_test; +SELECT pg_drop_replication_slot('regression_slot'); |