Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7b3d427

Browse files
author
Commitfest Bot
committed
[CF 4766] v20 - data loss bug in initial sync of logical replication
This branch was automatically generated by a robot using patches from an email thread registered at: https://commitfest.postgresql.org/patch/4766 The branch will be overwritten each time a new patch version is posted to the thread, and also periodically to check for bitrot caused by changes on the master branch. Patch(es): https://www.postgresql.org/message-id/OSCPR01MB14966DDB92FA7DA8FA8658216F5DE2@OSCPR01MB14966.jpnprd01.prod.outlook.com Author(s): Tomas Vondra
2 parents 91f1fe9 + 3723471 commit 7b3d427

File tree

7 files changed

+134
-15
lines changed

7 files changed

+134
-15
lines changed

contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
1111
twophase_snapshot slot_creation_error catalog_change_snapshot \
12-
skip_snapshot_restore
12+
skip_snapshot_restore invalidation_distrubution
1313

1414
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1515
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
4+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
5+
step s1_begin: BEGIN;
6+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
7+
step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
8+
step s1_commit: COMMIT;
9+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
10+
step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
11+
count
12+
-----
13+
1
14+
(1 row)
15+
16+
?column?
17+
--------
18+
stop
19+
(1 row)
20+

contrib/test_decoding/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ tests += {
6363
'twophase_snapshot',
6464
'slot_creation_error',
6565
'skip_snapshot_restore',
66+
'invalidation_distrubution',
6667
],
6768
'regress_args': [
6869
'--temp-config', files('logical.conf'),
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Test that catalog cache invalidation messages are distributed to ongoing
2+
# transactions, ensuring they can access the updated catalog content after
3+
# processing these messages.
4+
setup
5+
{
6+
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
7+
CREATE TABLE tbl1(val1 integer, val2 integer);
8+
CREATE PUBLICATION pub;
9+
}
10+
11+
teardown
12+
{
13+
DROP TABLE tbl1;
14+
DROP PUBLICATION pub;
15+
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
16+
}
17+
18+
session "s1"
19+
setup { SET synchronous_commit=on; }
20+
21+
step "s1_begin" { BEGIN; }
22+
step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
23+
step "s1_commit" { COMMIT; }
24+
25+
session "s2"
26+
setup { SET synchronous_commit=on; }
27+
28+
step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
29+
step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
30+
31+
# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
32+
permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"

src/backend/replication/logical/reorderbuffer.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5460,3 +5460,26 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
54605460
*cmax = ent->cmax;
54615461
return true;
54625462
}
5463+
5464+
/*
5465+
* Count invalidation messages of specified transaction.
5466+
*
5467+
* Returns number of messages, and msgs is set to the pointer of the linked
5468+
* list for the messages.
5469+
*/
5470+
uint32
5471+
ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
5472+
SharedInvalidationMessage **msgs)
5473+
{
5474+
ReorderBufferTXN *txn;
5475+
5476+
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5477+
false);
5478+
5479+
if (txn == NULL)
5480+
return 0;
5481+
5482+
*msgs = txn->invalidations;
5483+
5484+
return txn->ninvalidations;
5485+
}

src/backend/replication/logical/snapbuild.c

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
161161

162162
static void SnapBuildSnapIncRefcount(Snapshot snap);
163163

164-
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
164+
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
165165

166166
static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
167167
uint32 xinfo);
@@ -720,23 +720,24 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
720720
}
721721

722722
/*
723-
* Add a new Snapshot to all transactions we're decoding that currently are
724-
* in-progress so they can see new catalog contents made by the transaction
725-
* that just committed. This is necessary because those in-progress
726-
* transactions will use the new catalog's contents from here on (at the very
727-
* least everything they do needs to be compatible with newer catalog
728-
* contents).
723+
* Add a new Snapshot and invalidation messages to all transactions we're
724+
* decoding that currently are in-progress so they can see new catalog contents
725+
* made by the transaction that just committed. This is necessary because those
726+
* in-progress transactions will use the new catalog's contents from here on
727+
* (at the very least everything they do needs to be compatible with newer
728+
* catalog contents).
729729
*/
730730
static void
731-
SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
731+
SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
732732
{
733733
dlist_iter txn_i;
734734
ReorderBufferTXN *txn;
735735

736736
/*
737737
* Iterate through all toplevel transactions. This can include
738738
* subtransactions which we just don't yet know to be that, but that's
739-
* fine, they will just get an unnecessary snapshot queued.
739+
* fine, they will just get an unnecessary snapshot and invalidations
740+
* queued.
740741
*/
741742
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
742743
{
@@ -749,6 +750,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
749750
* transaction which in turn implies we don't yet need a snapshot at
750751
* all. We'll add a snapshot when the first change gets queued.
751752
*
753+
* Similarly, we don't need to add invalidations to a transaction whose
754+
* base snapshot is not yet set. Once a base snapshot is built, it will
755+
* include the xids of committed transactions that have modified the
756+
* catalog, thus reflecting the new catalog contents. The existing
757+
* catalog cache will have already been invalidated after processing
758+
* the invalidations in the transaction that modified catalogs,
759+
* ensuring that a fresh cache is constructed during decoding.
760+
*
752761
* NB: This works correctly even for subtransactions because
753762
* ReorderBufferAssignChild() takes care to transfer the base snapshot
754763
* to the top-level transaction, and while iterating the changequeue
@@ -758,13 +767,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
758767
continue;
759768

760769
/*
761-
* We don't need to add snapshot to prepared transactions as they
762-
* should not see the new catalog contents.
770+
* We don't need to add snapshot or invalidations to prepared
771+
* transactions as they should not see the new catalog contents.
763772
*/
764773
if (rbtxn_is_prepared(txn))
765774
continue;
766775

767-
elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
776+
elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
768777
txn->xid, LSN_FORMAT_ARGS(lsn));
769778

770779
/*
@@ -774,6 +783,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
774783
SnapBuildSnapIncRefcount(builder->snapshot);
775784
ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
776785
builder->snapshot);
786+
787+
/*
788+
* Add invalidation messages to the reorder buffer of in-progress
789+
* transactions except the current committed transaction, for which we
790+
* will execute invalidations at the end.
791+
*
792+
* It is required, otherwise, we will end up using the stale catcache
793+
* contents built by the current transaction even after its decoding,
794+
* which should have been invalidated due to concurrent catalog
795+
* changing transaction.
796+
*/
797+
if (txn->xid != xid)
798+
{
799+
uint32 ninvalidations;
800+
SharedInvalidationMessage *msgs = NULL;
801+
802+
ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
803+
xid, &msgs);
804+
805+
if (ninvalidations > 0)
806+
{
807+
Assert(msgs != NULL);
808+
809+
ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
810+
ninvalidations, msgs);
811+
}
812+
}
777813
}
778814
}
779815

@@ -1045,8 +1081,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
10451081
/* refcount of the snapshot builder for the new snapshot */
10461082
SnapBuildSnapIncRefcount(builder->snapshot);
10471083

1048-
/* add a new catalog snapshot to all currently running transactions */
1049-
SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1084+
/*
1085+
* Add a new catalog snapshot and invalidations messages to all
1086+
* currently running transactions.
1087+
*/
1088+
SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
10501089
}
10511090
}
10521091

src/include/replication/reorderbuffer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,10 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
758758

759759
extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
760760

761+
extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
762+
TransactionId xid,
763+
SharedInvalidationMessage **msgs);
764+
761765
extern void StartupReorderBuffer(void);
762766

763767
#endif

0 commit comments

Comments
 (0)