--- /dev/null
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT
+(3 rows)
+
+?column?
+--------
+stop
+(1 row)
+
--- /dev/null
+# Test decoding only the commit record of a transaction that has
+# modified catalogs.
+setup
+{
+ DROP TABLE IF EXISTS tbl1;
+ CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+ DROP TABLE tbl1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution. This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read the pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
+# record written by bgwriter. One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
-/* ->committed manipulation */
-static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
+/*
+ * Array of transactions and subtransactions that were running when
+ * the xl_running_xacts record that we decoded was written. The array is
+ * sorted in xidComparator order. We remove xids from this array when
+ * they become old enough not to matter, and then it eventually becomes empty.
+ * This array is allocated in builder->context so its lifetime is the same
+ * as the snapshot builder.
+ *
+ * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
+ * if the transaction has changed the catalog. But it could happen that the
+ * logical decoding decodes only the commit record of the transaction after
+ * restoring the previously serialized snapshot in which case we will miss
+ * adding the xid to the snapshot and end up looking at the catalogs with the
+ * wrong snapshot.
+ *
+ * To avoid the above problem, if the COMMIT record of an xid listed in
+ * InitialRunningXacts has the XACT_XINFO_HAS_INVALS flag, we mark both the
+ * top-level transaction and its subtransactions as containing catalog changes.
+ *
+ * We could end up adding a transaction that didn't change catalogs
+ * to the snapshot, since we cannot tell whether a transaction has
+ * catalog changes just by checking its COMMIT record: the record doesn't
+ * say which (sub)transaction has catalog changes, and
+ * XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
+ * transaction changed catalogs. But that won't be a problem since we
+ * use the snapshot built during decoding only for reading system catalogs.
+ */
+static TransactionId *InitialRunningXacts = NULL;
+static int NInitialRunningXacts = 0;
+
+/* ->committed and InitialRunningXacts manipulation */
+static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
}
/*
- * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->committed array but via
- * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed and the initial
+ * running transactions that are smaller than ->xmin. Those won't ever get
+ * checked via the ->committed or InitialRunningXacts array, respectively.
+ * The committed xids will get checked via the clog machinery.
+ *
+ * Ideally we could remove a transaction from the InitialRunningXacts array
+ * as soon as it finishes (commits or aborts), but that could be costly as we
+ * need to maintain the xids order in the array.
*/
static void
-SnapBuildPurgeCommittedTxn(SnapBuild *builder)
+SnapBuildPurgeOlderTxn(SnapBuild *builder)
{
int off;
TransactionId *workspace;
builder->committed.xcnt = surviving_xids;
pfree(workspace);
+
+ /* Quick exit if there are no initial running transactions */
+ if (NInitialRunningXacts == 0)
+ return;
+
+ /* bound check if there is at least one transaction to remove */
+ if (!NormalTransactionIdPrecedes(InitialRunningXacts[0],
+ builder->xmin))
+ return;
+
+ /*
+ * purge xids in InitialRunningXacts as well. The purged array must also
+ * be sorted in xidComparator order.
+ */
+ workspace =
+ MemoryContextAlloc(builder->context,
+ NInitialRunningXacts * sizeof(TransactionId));
+ surviving_xids = 0;
+ for (off = 0; off < NInitialRunningXacts; off++)
+ {
+ if (NormalTransactionIdPrecedes(InitialRunningXacts[off],
+ builder->xmin))
+ ; /* remove */
+ else
+ workspace[surviving_xids++] = InitialRunningXacts[off];
+ }
+
+ if (surviving_xids > 0)
+ memcpy(InitialRunningXacts, workspace,
+ sizeof(TransactionId) * surviving_xids);
+ else
+ {
+ pfree(InitialRunningXacts);
+ InitialRunningXacts = NULL;
+ }
+
+ elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u",
+ (uint32) NInitialRunningXacts,
+ (uint32) surviving_xids,
+ builder->xmin);
+
+ NInitialRunningXacts = surviving_xids;
+ pfree(workspace);
}
/*
builder->xmin = running->oldestRunningXid;
/* Remove transactions we don't need to keep track off anymore */
- SnapBuildPurgeCommittedTxn(builder);
+ SnapBuildPurgeOlderTxn(builder);
/*
* Advance the xmin limit for the current replication slot, to allow
else if (!builder->building_full_snapshot &&
SnapBuildRestore(builder, lsn))
{
+ int nxacts = running->subxcnt + running->xcnt;
+ Size sz = sizeof(TransactionId) * nxacts;
+
+ /*
+ * Remember the transactions and subtransactions that were running
+ * when the xl_running_xacts record that we decoded was written. We use
+ * this later to identify the transactions that have performed catalog
+ * changes. See SnapBuildXidSetCatalogChanges.
+ */
+ NInitialRunningXacts = nxacts;
+ InitialRunningXacts = MemoryContextAlloc(builder->context, sz);
+ memcpy(InitialRunningXacts, running->xids, sz);
+ qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator);
+
/* there won't be any state to cleanup */
return false;
}
}
FreeDir(snap_dir);
}
+
+/*
+ * Flag xid, and every subtransaction listed in subxacts[], as having made
+ * catalog changes -- but only when xid is one of the transactions recorded
+ * in InitialRunningXacts.  Each subtransaction is also attached to xid as a
+ * child in the reorder buffer before being flagged.
+ *
+ * This is a no-op when no initial running xacts were recorded, when xid is
+ * already flagged as containing catalog changes, or when xid is absent from
+ * InitialRunningXacts.  See the comments on NInitialRunningXacts and
+ * InitialRunningXacts for why this is needed.
+ *
+ * builder:  snapshot builder whose reorder buffer is updated
+ * xid:      transaction to look up; treated as the parent of subxacts
+ * subxcnt:  number of entries in subxacts
+ * subxacts: subtransaction xids belonging to xid
+ * lsn:      LSN handed through to the reorder buffer calls
+ */
+void
+SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt,
+							  TransactionId *subxacts, XLogRecPtr lsn)
+{
+	int			idx;
+
+	/* Nothing was remembered at snapshot-restore time, so nothing can match. */
+	if (NInitialRunningXacts == 0)
+		return;
+
+	/* Already flagged; there is nothing more to do. */
+	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+		return;
+
+	/* InitialRunningXacts is kept sorted, so a binary search is enough. */
+	if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts,
+				sizeof(TransactionId), xidComparator) == NULL)
+		return;
+
+	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
+
+	for (idx = 0; idx < subxcnt; idx++)
+	{
+		TransactionId subxid = subxacts[idx];
+
+		ReorderBufferAssignChild(builder->reorder, xid, subxid, lsn);
+		ReorderBufferXidSetCatalogChanges(builder->reorder, subxid, lsn);
+	}
+}