Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3723471

Browse files
skkyalCommitfest Bot
authored and
Commitfest Bot
committed
Fix data loss in logical replication
Previously, logical replication could lose data if one user modified a publication to add a table while another user concurrently modified that table and commit later than the publication modification transaction. The issue arised during the decoding of transactions modifying the table: if the initial catalog cache was built using a snapshot taken before the publication DDL execution, all subsequent changes to that table were decoded with outdated catalog cache, which caused them to be filtered from replication. This happened because invalidation messages were only present in the publication modification transaction, which was decoded before these subsequent changes. This issue is not limited to publication DDLs; similar problems can occur with ALTER TYPE statements executed concurrently with DMLs, leading to incorrect decoding under outdated type contexts. To address this, the commit improves logical decoding by ensuring that invalidation messages from catalog-modifying transactions are distributed to all concurrent in-progress transactions. This allows the necessary rebuild of the catalog cache when decoding new changes, similar to handling historic catalog snapshots (see SnapBuildDistributeNewCatalogSnapshot()). Following this change, some performance regression is observed, primarily during frequent execution of publication DDL statements that modify published tables. This is an expected trade-off due to cache rebuild and distribution overhead. The regression is minor or nearly nonexistent when DDLs do not affect published tables or occur infrequently, making this a worthwhile cost to resolve a longstanding data loss issue. An alternative approach considered was to take a strong lock on each affected table during publication modification. However, this would only address issues related to publication DDLs and require locking every relation in the database for publications created as FOR ALL TABLES, which is impractical. Thus, this commit chooses to distribute invalidation messages as outlined above.
1 parent 91f1fe9 commit 3723471

File tree

7 files changed

+134
-15
lines changed

7 files changed

+134
-15
lines changed

contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
1111
twophase_snapshot slot_creation_error catalog_change_snapshot \
12-
skip_snapshot_restore
12+
skip_snapshot_restore invalidation_distrubution
1313

1414
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1515
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
4+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
5+
step s1_begin: BEGIN;
6+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
7+
step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
8+
step s1_commit: COMMIT;
9+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
10+
step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
11+
count
12+
-----
13+
1
14+
(1 row)
15+
16+
?column?
17+
--------
18+
stop
19+
(1 row)
20+

contrib/test_decoding/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ tests += {
6363
'twophase_snapshot',
6464
'slot_creation_error',
6565
'skip_snapshot_restore',
66+
'invalidation_distrubution',
6667
],
6768
'regress_args': [
6869
'--temp-config', files('logical.conf'),
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Test that catalog cache invalidation messages are distributed to ongoing
2+
# transactions, ensuring they can access the updated catalog content after
3+
# processing these messages.
4+
setup
5+
{
6+
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
7+
CREATE TABLE tbl1(val1 integer, val2 integer);
8+
CREATE PUBLICATION pub;
9+
}
10+
11+
teardown
12+
{
13+
DROP TABLE tbl1;
14+
DROP PUBLICATION pub;
15+
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
16+
}
17+
18+
session "s1"
19+
setup { SET synchronous_commit=on; }
20+
21+
step "s1_begin" { BEGIN; }
22+
step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
23+
step "s1_commit" { COMMIT; }
24+
25+
session "s2"
26+
setup { SET synchronous_commit=on; }
27+
28+
step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
29+
step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
30+
31+
# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
32+
permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"

src/backend/replication/logical/reorderbuffer.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5460,3 +5460,26 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
54605460
*cmax = ent->cmax;
54615461
return true;
54625462
}
5463+
5464+
/*
5465+
* Count invalidation messages of specified transaction.
5466+
*
5467+
* Returns number of messages, and msgs is set to the pointer of the linked
5468+
* list for the messages.
5469+
*/
5470+
uint32
5471+
ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
5472+
SharedInvalidationMessage **msgs)
5473+
{
5474+
ReorderBufferTXN *txn;
5475+
5476+
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5477+
false);
5478+
5479+
if (txn == NULL)
5480+
return 0;
5481+
5482+
*msgs = txn->invalidations;
5483+
5484+
return txn->ninvalidations;
5485+
}

src/backend/replication/logical/snapbuild.c

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
161161

162162
static void SnapBuildSnapIncRefcount(Snapshot snap);
163163

164-
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
164+
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
165165

166166
static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
167167
uint32 xinfo);
@@ -720,23 +720,24 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
720720
}
721721

722722
/*
723-
* Add a new Snapshot to all transactions we're decoding that currently are
724-
* in-progress so they can see new catalog contents made by the transaction
725-
* that just committed. This is necessary because those in-progress
726-
* transactions will use the new catalog's contents from here on (at the very
727-
* least everything they do needs to be compatible with newer catalog
728-
* contents).
723+
* Add a new Snapshot and invalidation messages to all transactions we're
724+
* decoding that currently are in-progress so they can see new catalog contents
725+
* made by the transaction that just committed. This is necessary because those
726+
* in-progress transactions will use the new catalog's contents from here on
727+
* (at the very least everything they do needs to be compatible with newer
728+
* catalog contents).
729729
*/
730730
static void
731-
SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
731+
SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
732732
{
733733
dlist_iter txn_i;
734734
ReorderBufferTXN *txn;
735735

736736
/*
737737
* Iterate through all toplevel transactions. This can include
738738
* subtransactions which we just don't yet know to be that, but that's
739-
* fine, they will just get an unnecessary snapshot queued.
739+
* fine, they will just get an unnecessary snapshot and invalidations
740+
* queued.
740741
*/
741742
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
742743
{
@@ -749,6 +750,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
749750
* transaction which in turn implies we don't yet need a snapshot at
750751
* all. We'll add a snapshot when the first change gets queued.
751752
*
753+
* Similarly, we don't need to add invalidations to a transaction whose
754+
* base snapshot is not yet set. Once a base snapshot is built, it will
755+
* include the xids of committed transactions that have modified the
756+
* catalog, thus reflecting the new catalog contents. The existing
757+
* catalog cache will have already been invalidated after processing
758+
* the invalidations in the transaction that modified catalogs,
759+
* ensuring that a fresh cache is constructed during decoding.
760+
*
752761
* NB: This works correctly even for subtransactions because
753762
* ReorderBufferAssignChild() takes care to transfer the base snapshot
754763
* to the top-level transaction, and while iterating the changequeue
@@ -758,13 +767,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
758767
continue;
759768

760769
/*
761-
* We don't need to add snapshot to prepared transactions as they
762-
* should not see the new catalog contents.
770+
* We don't need to add snapshot or invalidations to prepared
771+
* transactions as they should not see the new catalog contents.
763772
*/
764773
if (rbtxn_is_prepared(txn))
765774
continue;
766775

767-
elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
776+
elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
768777
txn->xid, LSN_FORMAT_ARGS(lsn));
769778

770779
/*
@@ -774,6 +783,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
774783
SnapBuildSnapIncRefcount(builder->snapshot);
775784
ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
776785
builder->snapshot);
786+
787+
/*
788+
* Add invalidation messages to the reorder buffer of in-progress
789+
* transactions except the current committed transaction, for which we
790+
* will execute invalidations at the end.
791+
*
792+
* It is required, otherwise, we will end up using the stale catcache
793+
* contents built by the current transaction even after its decoding,
794+
* which should have been invalidated due to concurrent catalog
795+
* changing transaction.
796+
*/
797+
if (txn->xid != xid)
798+
{
799+
uint32 ninvalidations;
800+
SharedInvalidationMessage *msgs = NULL;
801+
802+
ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
803+
xid, &msgs);
804+
805+
if (ninvalidations > 0)
806+
{
807+
Assert(msgs != NULL);
808+
809+
ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
810+
ninvalidations, msgs);
811+
}
812+
}
777813
}
778814
}
779815

@@ -1045,8 +1081,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
10451081
/* refcount of the snapshot builder for the new snapshot */
10461082
SnapBuildSnapIncRefcount(builder->snapshot);
10471083

1048-
/* add a new catalog snapshot to all currently running transactions */
1049-
SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1084+
/*
1085+
* Add a new catalog snapshot and invalidations messages to all
1086+
* currently running transactions.
1087+
*/
1088+
SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
10501089
}
10511090
}
10521091

src/include/replication/reorderbuffer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,10 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
758758

759759
extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
760760

761+
extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
762+
TransactionId xid,
763+
SharedInvalidationMessage **msgs);
764+
761765
extern void StartupReorderBuffer(void);
762766

763767
#endif

0 commit comments

Comments
 (0)