Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/common/tupdesc.c15
-rw-r--r--src/backend/replication/logical/reorderbuffer.c196
-rw-r--r--src/backend/replication/logical/snapbuild.c12
-rw-r--r--src/include/replication/reorderbuffer.h16
4 files changed, 200 insertions, 39 deletions
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index ffd0c78f905..020d00cd01c 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -142,11 +142,18 @@ void
verify_compact_attribute(TupleDesc tupdesc, int attnum)
{
#ifdef USE_ASSERT_CHECKING
- CompactAttribute *cattr = &tupdesc->compact_attrs[attnum];
+ CompactAttribute cattr;
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum);
CompactAttribute tmp;
/*
+ * Make a temp copy of the TupleDesc's CompactAttribute. This may be a
+ * shared TupleDesc and the attcacheoff might get changed by another
+ * backend.
+ */
+ memcpy(&cattr, &tupdesc->compact_attrs[attnum], sizeof(CompactAttribute));
+
+ /*
* Populate the temporary CompactAttribute from the corresponding
* Form_pg_attribute
*/
@@ -156,11 +163,11 @@ verify_compact_attribute(TupleDesc tupdesc, int attnum)
* Make the attcacheoff match since it's been reset to -1 by
* populate_compact_attribute_internal. Same with attnullability.
*/
- tmp.attcacheoff = cattr->attcacheoff;
- tmp.attnullability = cattr->attnullability;
+ tmp.attcacheoff = cattr.attcacheoff;
+ tmp.attnullability = cattr.attnullability;
/* Check the freshly populated CompactAttribute matches the TupleDesc's */
- Assert(memcmp(&tmp, cattr, sizeof(CompactAttribute)) == 0);
+ Assert(memcmp(&tmp, &cattr, sizeof(CompactAttribute)) == 0);
#endif
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 67655111875..c4299c76fb1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -109,10 +109,22 @@
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+ ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
@@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->invalidations = NULL;
}
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ }
+
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
@@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations,
- txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
* We might have decoded changes for this transaction that could load
* the cache as per the current transaction's view (consider DDL's
* happened in this transaction). We don't want the decoding of future
- * transactions to use those cache entries so execute invalidations.
+ * transactions to use those cache entries so execute only the inval
+ * messages in this transaction.
*/
if (txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
txn->final_lsn = lsn;
/*
- * Process cache invalidation messages if there are any. Even if we're not
- * interested in the transaction's contents, it could have manipulated the
- * catalog and we need to update the caches according to that.
+ * Process only cache invalidation messages in this transaction if there
+ * are any. Even if we're not interested in the transaction's contents, it
+ * could have manipulated the catalog and we need to update the caches
+ * according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3422,6 +3461,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
}
/*
+ * Add new invalidation messages to the reorder buffer queue.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferChange *change;
+
+ change = ReorderBufferAllocChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+ change->data.inval.ninvalidations = nmsgs;
+ change->data.inval.invalidations = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(change->data.inval.invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
+}
+
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
+ * messages to the **invals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+ uint32 *ninvals_out,
+ SharedInvalidationMessage *msgs_new,
+ Size nmsgs_new)
+{
+ if (*ninvals_out == 0)
+ {
+ *ninvals_out = nmsgs_new;
+ *invals_out = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+ memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+ }
+ else
+ {
+ /* Enlarge the array of inval messages */
+ *invals_out = (SharedInvalidationMessage *)
+ repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+ (*ninvals_out + nmsgs_new));
+ memcpy(*invals_out + *ninvals_out, msgs_new,
+ nmsgs_new * sizeof(SharedInvalidationMessage));
+ *ninvals_out += nmsgs_new;
+ }
+}
+
+/*
* Accumulate the invalidations for executing them later.
*
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
@@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferTXN *txn;
MemoryContext oldcontext;
- ReorderBufferChange *change;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
@@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
Assert(nmsgs > 0);
- /* Accumulate invalidations. */
- if (txn->ninvalidations == 0)
- {
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
- }
- else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations,
+ &txn->ninvalidations,
+ msgs, nmsgs);
+
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ /*
+ * Collect all the invalidations under the top transaction, if available,
+ * so that we can execute them all together. See comments
+ * ReorderBufferAddInvalidations.
+ */
+ txn = rbtxn_get_toptxn(txn);
+
+ Assert(nmsgs > 0);
+
+ if (!rbtxn_distr_inval_overflowed(txn))
{
- txn->invalidations = (SharedInvalidationMessage *)
- repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
- (txn->ninvalidations + nmsgs));
+ /*
+ * Check the transaction has enough space for storing distributed
+ * invalidation messages.
+ */
+ if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+ {
+ /*
+ * Mark the invalidation message as overflowed and free up the
+ * messages accumulated so far.
+ */
+ txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
- memcpy(txn->invalidations + txn->ninvalidations, msgs,
- nmsgs * sizeof(SharedInvalidationMessage));
- txn->ninvalidations += nmsgs;
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ txn->ninvalidations_distributed = 0;
+ }
+ }
+ else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+ &txn->ninvalidations_distributed,
+ msgs, nmsgs);
}
- change = ReorderBufferAllocChange(rb);
- change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
- change->data.inval.ninvalidations = nmsgs;
- change->data.inval.invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(change->data.inval.invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
-
- ReorderBufferQueueChange(rb, xid, lsn, change, false);
+ /* Queue the invalidation messages into the transaction */
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
MemoryContextSwitchTo(oldcontext);
}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0d7bddbe4ed..adf18c397db 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
* contents built by the current transaction even after its decoding,
* which should have been invalidated due to concurrent catalog
* changing transaction.
+ *
+ * Distribute only the invalidation messages generated by the current
+ * committed transaction. Invalidation messages received from other
+ * transactions would have already been propagated to the relevant
+ * in-progress transactions. This transaction would have processed
+ * those invalidations, ensuring that subsequent transactions observe
+ * a consistent cache state.
*/
if (txn->xid != xid)
{
@@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
{
Assert(msgs != NULL);
- ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
- ninvalidations, msgs);
+ ReorderBufferAddDistributedInvalidations(builder->reorder,
+ txn->xid, lsn,
+ ninvalidations, msgs);
}
}
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 24e88c409ba..fa0745552f8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -176,6 +176,7 @@ typedef struct ReorderBufferChange
#define RBTXN_SENT_PREPARE 0x0200
#define RBTXN_IS_COMMITTED 0x0400
#define RBTXN_IS_ABORTED 0x0800
+#define RBTXN_DISTR_INVAL_OVERFLOWED 0x1000
#define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
@@ -265,6 +266,12 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
+/* Is the array of distributed inval messages overflowed? */
+#define rbtxn_distr_inval_overflowed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
+)
+
/* Is this a top-level transaction? */
#define rbtxn_is_toptxn(txn) \
( \
@@ -422,6 +429,12 @@ typedef struct ReorderBufferTXN
uint32 ninvalidations;
SharedInvalidationMessage *invalidations;
+ /*
+ * Stores cache invalidation messages distributed by other transactions.
+ */
+ uint32 ninvalidations_distributed;
+ SharedInvalidationMessage *invalidations_distributed;
+
/* ---
* Position in one of two lists:
* * list of subtransactions if we are *known* to be subxact
@@ -738,6 +751,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
CommandId cmin, CommandId cmax, CommandId combocid);
extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
+extern void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs);
extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
SharedInvalidationMessage *invalidations);
extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);