Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMasahiko Sawada2024-07-11 13:48:23 +0000
committerMasahiko Sawada2024-07-11 13:48:23 +0000
commitbb19b70081e2248f242cd00227abff5b1e105eb6 (patch)
tree9f3eb7f5c0cf592eee3eee43e22db3efd4b39bf1 /contrib/test_decoding
parentc194de0713ebe71aaeeb5ebed4af2390cc1b521c (diff)
Fix possibility of logical decoding partial transaction changes.
When creating and initializing a logical slot, the restart_lsn is set to the latest WAL insertion point (or the latest replay point on standbys). Subsequently, WAL records are decoded from that point to find the start point for extracting changes in the DecodingContextFindStartpoint() function. Since the initial restart_lsn could be in the middle of a transaction, the start point must be a consistent point where we won't see the data for partial transactions. Previously, when not building a full snapshot, serialized snapshots were restored, and the SnapBuild jumps to the consistent state even while finding the start point. Consequently, the slot's restart_lsn and confirmed_flush could be set to the middle of a transaction. This could lead to various unexpected consequences. Specifically, there were reports of logical decoding decoding partial transactions, and assertion failures occurred because only subtransactions were decoded without decoding their top-level transaction until decoding the commit record. To resolve this issue, the changes prevent restoring the serialized snapshot and jumping to the consistent state while finding the start point. On v17 and HEAD, a flag indicating whether snapshot restores should be skipped has been added to the SnapBuild struct, and SNAPBUILD_VERSION has been bumpded. On backbranches, the flag is stored in the LogicalDecodingContext instead, preserving on-disk compatibility. Backpatch to all supported versions. Reported-by: Drew Callahan Reviewed-by: Amit Kapila, Hayato Kuroda Discussion: https://postgr.es/m/2444AA15-D21B-4CCE-8052-52C7C2DAFE5C%40amazon.com Backpatch-through: 12
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r--contrib/test_decoding/Makefile3
-rw-r--r--contrib/test_decoding/expected/skip_snapshot_restore.out45
-rw-r--r--contrib/test_decoding/meson.build1
-rw-r--r--contrib/test_decoding/specs/skip_snapshot_restore.spec46
4 files changed, 94 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c7ce6037064..a4ba1a509ae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
spill slot truncate stream stats twophase twophase_stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
- twophase_snapshot slot_creation_error catalog_change_snapshot
+ twophase_snapshot slot_creation_error catalog_change_snapshot \
+ skip_snapshot_restore
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out
new file mode 100644
index 00000000000..c64dbd9c4e0
--- /dev/null
+++ b/contrib/test_decoding/expected/skip_snapshot_restore.out
@@ -0,0 +1,45 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_insert1: INSERT INTO tbl VALUES (1);
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); <waiting ...>
+step s2_checkpoint: CHECKPOINT;
+step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert2: INSERT INTO tbl VALUES (2);
+step s0_commit: COMMIT;
+step s1_init: <... completed>
+?column?
+--------
+init
+(1 row)
+
+step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-----------------------------------------
+BEGIN
+table public.tbl: INSERT: val1[integer]:1
+table public.tbl: INSERT: val1[integer]:2
+COMMIT
+(4 rows)
+
+step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index f1548c0fafc..f643dc81a2c 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -62,6 +62,7 @@ tests += {
'concurrent_stream',
'twophase_snapshot',
'slot_creation_error',
+ 'skip_snapshot_restore',
],
'regress_args': [
'--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec
new file mode 100644
index 00000000000..3f1fb6f02c7
--- /dev/null
+++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec
@@ -0,0 +1,46 @@
+# Test that a slot creation skips to restore serialized snapshot to reach
+# the consistent state.
+
+setup
+{
+ DROP TABLE IF EXISTS tbl;
+ CREATE TABLE tbl (val1 integer);
+}
+
+teardown
+{
+ DROP TABLE tbl;
+ SELECT 'stop' FROM pg_drop_replication_slot('slot0');
+ SELECT 'stop' FROM pg_drop_replication_slot('slot1');
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_insert1" { INSERT INTO tbl VALUES (1); }
+step "s0_insert2" { INSERT INTO tbl VALUES (2); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); }
+step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+session "s2"
+setup { SET synchronous_commit = on ;}
+step "s2_checkpoint" { CHECKPOINT; }
+step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+
+# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the
+# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot1"
+# serializes consistent snapshots to the disk at LSNs where are before
+# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but
+# must not restore any serialized snapshots and will reach the consistent state
+# when decoding a RUNNING_XACT record generated after s0-transaction's commit.
+# We check if the get_changes on 'slot1' will not return any s0-transaction's
+# changes as its confirmed_flush_lsn will be after the s0-transaction's commit
+# record.
+permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1"