Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila2021-01-04 03:04:50 +0000
committerAmit Kapila2021-01-04 03:04:50 +0000
commita271a1b50e9bec07e2ef3a05e38e7285113e4ce6 (patch)
treea3cd4b3e22169f548a6c92615f8e713f7001e30f /contrib/test_decoding/sql
parentca3b37487be333a1d241dab1bbdd17a211a88f43 (diff)
Allow decoding at prepare time in ReorderBuffer.
This patch allows PREPARE-time decoding of two-phase transactions (if the output plugin supports this capability), in which case the transactions are replayed at PREPARE and then committed later when COMMIT PREPARED arrives. Now that we decode the changes before the commit, the concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We detect such failures with a special sqlerrcode ERRCODE_TRANSACTION_ROLLBACK introduced by commit 7259736a6e and stop decoding the remaining changes. Then we rollback the changes when rollback prepared is encountered. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, Arseny Sher, and Dilip Kumar Tested-by: Takamichi Osumi Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'contrib/test_decoding/sql')
-rw-r--r--contrib/test_decoding/sql/twophase.sql112
-rw-r--r--contrib/test_decoding/sql/twophase_stream.sql45
2 files changed, 157 insertions, 0 deletions
diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql
new file mode 100644
index 00000000000..894e4f5baf1
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase.sql
@@ -0,0 +1,112 @@
+-- Test prepared transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
+-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
+-- call should return within a second.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test savepoints and sub-xacts. Creating savepoints will create
+-- sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql
new file mode 100644
index 00000000000..e9dd44fdb37
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase_stream.sql
@@ -0,0 +1,45 @@
+-- Test streaming of two-phase commits
+
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK TO s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED and the other changes in the transaction
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
+-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');