Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 272248a

Browse files
author
Amit Kapila
committed
Fix catalog lookup with the wrong snapshot during logical decoding.
Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION records to know if the transaction has modified the catalog, and that information is not serialized to snapshot. Therefore, after the restart, if the logical decoding decodes only the commit record of the transaction that has actually modified a catalog, we will miss adding its XID to the snapshot. Thus, we will end up looking at catalogs with the wrong snapshot. To fix this problem, this changes the snapshot builder so that it remembers the last-running-xacts list of the decoded RUNNING_XACTS record after restoring the previously serialized snapshot. Then, we mark the transaction as containing catalog changes if it's in the list of initial running transactions and its commit record has XACT_XINFO_HAS_INVALS. To avoid ABI breakage, we store the array of the initial running transactions in the static variables InitialRunningXacts and NInitialRunningXacts, instead of storing those in SnapBuild or ReorderBuffer. This approach has a false positive; we could end up adding the transaction that didn't change catalog to the snapshot since we cannot distinguish whether the transaction has catalog changes only by checking the COMMIT record. It doesn't have the information on which (sub) transaction has catalog changes, and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the transaction has catalog change. But that won't be a problem since we use snapshot built during decoding only to read system catalogs. On the master branch, we took a more future-proof approach by writing catalog modifying transactions to the serialized snapshot which avoids the above false positive. But we cannot backpatch it because of a change in the SnapBuild. Reported-by: Mike Oh Author: Masahiko Sawada Reviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan Hadi Backpatch-through: 10 Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com
1 parent f88798c commit 272248a

File tree

6 files changed

+232
-8
lines changed

6 files changed

+232
-8
lines changed

contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
88
spill slot truncate stream stats twophase twophase_stream
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
11-
twophase_snapshot slot_creation_error
11+
twophase_snapshot slot_creation_error catalog_change_snapshot
1212

1313
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1414
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
4+
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
5+
?column?
6+
--------
7+
init
8+
(1 row)
9+
10+
step s0_begin: BEGIN;
11+
step s0_savepoint: SAVEPOINT sp1;
12+
step s0_truncate: TRUNCATE tbl1;
13+
step s1_checkpoint: CHECKPOINT;
14+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
15+
data
16+
----
17+
(0 rows)
18+
19+
step s0_commit: COMMIT;
20+
step s0_begin: BEGIN;
21+
step s0_insert: INSERT INTO tbl1 VALUES (1);
22+
step s1_checkpoint: CHECKPOINT;
23+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
24+
data
25+
---------------------------------------
26+
BEGIN
27+
table public.tbl1: TRUNCATE: (no-flags)
28+
COMMIT
29+
(3 rows)
30+
31+
step s0_commit: COMMIT;
32+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
33+
data
34+
-------------------------------------------------------------
35+
BEGIN
36+
table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
37+
COMMIT
38+
(3 rows)
39+
40+
?column?
41+
--------
42+
stop
43+
(1 row)
44+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Test decoding only the commit record of the transaction that have
2+
# modified catalogs.
3+
setup
4+
{
5+
DROP TABLE IF EXISTS tbl1;
6+
CREATE TABLE tbl1 (val1 integer, val2 integer);
7+
}
8+
9+
teardown
10+
{
11+
DROP TABLE tbl1;
12+
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
13+
}
14+
15+
session "s0"
16+
setup { SET synchronous_commit=on; }
17+
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
18+
step "s0_begin" { BEGIN; }
19+
step "s0_savepoint" { SAVEPOINT sp1; }
20+
step "s0_truncate" { TRUNCATE tbl1; }
21+
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
22+
step "s0_commit" { COMMIT; }
23+
24+
session "s1"
25+
setup { SET synchronous_commit=on; }
26+
step "s1_checkpoint" { CHECKPOINT; }
27+
step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
28+
29+
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
30+
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
31+
# during the first checkpoint execution. This transaction must be marked as
32+
# containing catalog changes while decoding the COMMIT record and the decoding
33+
# of the INSERT record must read the pg_class with the correct historic snapshot.
34+
#
35+
# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
36+
# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
37+
# record written by bgwriter. One might think we can either stop the bgwriter or
38+
# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
39+
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"

src/backend/replication/logical/decode.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
627627
commit_time = parsed->origin_timestamp;
628628
}
629629

630+
/*
631+
* If the COMMIT record has invalidation messages, it could have catalog
632+
* changes. It is possible that we didn't mark this transaction as
633+
* containing catalog changes when the decoding starts from a commit
634+
* record without decoding the transaction's other changes. So, we ensure
635+
* to mark such transactions as containing catalog change.
636+
*
637+
* This must be done before SnapBuildCommitTxn() so that we can include
638+
* these transactions in the historic snapshot.
639+
*/
640+
if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
641+
SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid,
642+
parsed->nsubxacts, parsed->subxacts,
643+
buf->origptr);
644+
630645
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
631646
parsed->nsubxacts, parsed->subxacts);
632647

src/backend/replication/logical/snapbuild.c

Lines changed: 130 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,38 @@ struct SnapBuild
250250
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
251251
static bool ExportInProgress = false;
252252

253-
/* ->committed manipulation */
254-
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
253+
/*
254+
* Array of transactions and subtransactions that were running when
255+
* the xl_running_xacts record that we decoded was written. The array is
256+
* sorted in xidComparator order. We remove xids from this array when
257+
* they become old enough to matter, and then it eventually becomes empty.
258+
* This array is allocated in builder->context so its lifetime is the same
259+
* as the snapshot builder.
260+
*
261+
* We normally rely on some WAL record types such as HEAP2_NEW_CID to know
262+
* if the transaction has changed the catalog. But it could happen that the
263+
* logical decoding decodes only the commit record of the transaction after
264+
* restoring the previously serialized snapshot in which case we will miss
265+
* adding the xid to the snapshot and end up looking at the catalogs with the
266+
* wrong snapshot.
267+
*
268+
* Now to avoid the above problem, if the COMMIT record of the xid listed in
269+
* InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top
270+
* transaction and its substransactions as containing catalog changes.
271+
*
272+
* We could end up adding the transaction that didn't change catalog
273+
* to the snapshot since we cannot distinguish whether the transaction
274+
* has catalog changes only by checking the COMMIT record. It doesn't
275+
* have the information on which (sub) transaction has catalog changes,
276+
* and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
277+
* transaction has catalog change. But that won't be a problem since we
278+
* use snapshot built during decoding only for reading system catalogs.
279+
*/
280+
static TransactionId *InitialRunningXacts = NULL;
281+
static int NInitialRunningXacts = 0;
282+
283+
/* ->committed and InitailRunningXacts manipulation */
284+
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
255285

256286
/* snapshot building/manipulation/distribution functions */
257287
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
@@ -888,12 +918,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
888918
}
889919

890920
/*
891-
* Remove knowledge about transactions we treat as committed that are smaller
892-
* than ->xmin. Those won't ever get checked via the ->committed array but via
893-
* the clog machinery, so we don't need to waste memory on them.
921+
* Remove knowledge about transactions we treat as committed and the initial
922+
* running transactions that are smaller than ->xmin. Those won't ever get
923+
* checked via the ->committed or InitialRunningXacts array, respectively.
924+
* The committed xids will get checked via the clog machinery.
925+
*
926+
* We can ideally remove the transaction from InitialRunningXacts array
927+
* once it is finished (committed/aborted) but that could be costly as we need
928+
* to maintain the xids order in the array.
894929
*/
895930
static void
896-
SnapBuildPurgeCommittedTxn(SnapBuild *builder)
931+
SnapBuildPurgeOlderTxn(SnapBuild *builder)
897932
{
898933
int off;
899934
TransactionId *workspace;
@@ -928,6 +963,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
928963
builder->committed.xcnt = surviving_xids;
929964

930965
pfree(workspace);
966+
967+
/* Quick exit if there is no initial running transactions */
968+
if (NInitialRunningXacts == 0)
969+
return;
970+
971+
/* bound check if there is at least one transaction to remove */
972+
if (!NormalTransactionIdPrecedes(InitialRunningXacts[0],
973+
builder->xmin))
974+
return;
975+
976+
/*
977+
* purge xids in InitialRunningXacts as well. The purged array must also
978+
* be sorted in xidComparator order.
979+
*/
980+
workspace =
981+
MemoryContextAlloc(builder->context,
982+
NInitialRunningXacts * sizeof(TransactionId));
983+
surviving_xids = 0;
984+
for (off = 0; off < NInitialRunningXacts; off++)
985+
{
986+
if (NormalTransactionIdPrecedes(InitialRunningXacts[off],
987+
builder->xmin))
988+
; /* remove */
989+
else
990+
workspace[surviving_xids++] = InitialRunningXacts[off];
991+
}
992+
993+
if (surviving_xids > 0)
994+
memcpy(InitialRunningXacts, workspace,
995+
sizeof(TransactionId) * surviving_xids);
996+
else
997+
{
998+
pfree(InitialRunningXacts);
999+
InitialRunningXacts = NULL;
1000+
}
1001+
1002+
elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u",
1003+
(uint32) NInitialRunningXacts,
1004+
(uint32) surviving_xids,
1005+
builder->xmin);
1006+
1007+
NInitialRunningXacts = surviving_xids;
1008+
pfree(workspace);
9311009
}
9321010

9331011
/*
@@ -1135,7 +1213,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
11351213
builder->xmin = running->oldestRunningXid;
11361214

11371215
/* Remove transactions we don't need to keep track off anymore */
1138-
SnapBuildPurgeCommittedTxn(builder);
1216+
SnapBuildPurgeOlderTxn(builder);
11391217

11401218
/*
11411219
* Advance the xmin limit for the current replication slot, to allow
@@ -1286,6 +1364,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
12861364
else if (!builder->building_full_snapshot &&
12871365
SnapBuildRestore(builder, lsn))
12881366
{
1367+
int nxacts = running->subxcnt + running->xcnt;
1368+
Size sz = sizeof(TransactionId) * nxacts;
1369+
1370+
/*
1371+
* Remember the transactions and subtransactions that were running
1372+
* when xl_running_xacts record that we decoded was written. We use
1373+
* this later to identify the transactions have performed catalog
1374+
* changes. See SnapBuildXidSetCatalogChanges.
1375+
*/
1376+
NInitialRunningXacts = nxacts;
1377+
InitialRunningXacts = MemoryContextAlloc(builder->context, sz);
1378+
memcpy(InitialRunningXacts, running->xids, sz);
1379+
qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator);
1380+
12891381
/* there won't be any state to cleanup */
12901382
return false;
12911383
}
@@ -2000,3 +2092,34 @@ CheckPointSnapBuild(void)
20002092
}
20012093
FreeDir(snap_dir);
20022094
}
2095+
2096+
/*
2097+
* If the given xid is in the list of the initial running xacts, we mark the
2098+
* transaction and its subtransactions as containing catalog changes. See
2099+
* comments for NInitialRunningXacts and InitialRunningXacts for additional
2100+
* info.
2101+
*/
2102+
void
2103+
SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt,
2104+
TransactionId *subxacts, XLogRecPtr lsn)
2105+
{
2106+
/*
2107+
* Skip if there is no initial running xacts information or the
2108+
* transaction is already marked as containing catalog changes.
2109+
*/
2110+
if (NInitialRunningXacts == 0 ||
2111+
ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
2112+
return;
2113+
2114+
if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts,
2115+
sizeof(TransactionId), xidComparator) != NULL)
2116+
{
2117+
ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
2118+
2119+
for (int i = 0; i < subxcnt; i++)
2120+
{
2121+
ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn);
2122+
ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn);
2123+
}
2124+
}
2125+
}

src/include/replication/snapbuild.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
9191
struct xl_running_xacts *running);
9292
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
9393

94+
extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid,
95+
int subxcnt, TransactionId *subxacts,
96+
XLogRecPtr lsn);
9497
#endif /* SNAPBUILD_H */

0 commit comments

Comments
 (0)