Diffstat (limited to 'src/backend')
-rw-r--r--   src/backend/access/common/tupdesc.c              |  15
-rw-r--r--   src/backend/replication/logical/reorderbuffer.c  | 196
-rw-r--r--   src/backend/replication/logical/snapbuild.c      |  12
-rw-r--r--   src/backend/storage/aio/aio.c                    |  17
-rw-r--r--   src/backend/storage/aio/aio_callback.c           |   7
-rw-r--r--   src/backend/storage/aio/method_worker.c          |   7
6 files changed, 214 insertions, 40 deletions
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index ffd0c78f905..020d00cd01c 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -142,11 +142,18 @@ void
 verify_compact_attribute(TupleDesc tupdesc, int attnum)
 {
 #ifdef USE_ASSERT_CHECKING
-    CompactAttribute *cattr = &tupdesc->compact_attrs[attnum];
+    CompactAttribute cattr;
     Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum);
     CompactAttribute tmp;
 
     /*
+     * Make a temp copy of the TupleDesc's CompactAttribute. This may be a
+     * shared TupleDesc and the attcacheoff might get changed by another
+     * backend.
+     */
+    memcpy(&cattr, &tupdesc->compact_attrs[attnum], sizeof(CompactAttribute));
+
+    /*
      * Populate the temporary CompactAttribute from the corresponding
      * Form_pg_attribute
      */
@@ -156,11 +163,11 @@ verify_compact_attribute(TupleDesc tupdesc, int attnum)
      * Make the attcacheoff match since it's been reset to -1 by
      * populate_compact_attribute_internal. Same with attnullability.
      */
-    tmp.attcacheoff = cattr->attcacheoff;
-    tmp.attnullability = cattr->attnullability;
+    tmp.attcacheoff = cattr.attcacheoff;
+    tmp.attnullability = cattr.attnullability;
 
     /* Check the freshly populated CompactAttribute matches the TupleDesc's */
-    Assert(memcmp(&tmp, cattr, sizeof(CompactAttribute)) == 0);
+    Assert(memcmp(&tmp, &cattr, sizeof(CompactAttribute)) == 0);
 #endif
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 67655111875..c4299c76fb1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -109,10 +109,22 @@
 #include "storage/procarray.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+    ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
 {
@@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
         txn->invalidations = NULL;
     }
 
+    if (txn->invalidations_distributed)
+    {
+        pfree(txn->invalidations_distributed);
+        txn->invalidations_distributed = NULL;
+    }
+
     /* Reset the toast hash */
     ReorderBufferToastReset(rb, txn);
 
@@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
             AbortCurrentTransaction();
 
             /* make sure there's no cache pollution */
-            ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+            if (rbtxn_distr_inval_overflowed(txn))
+            {
+                Assert(txn->ninvalidations_distributed == 0);
+                InvalidateSystemCaches();
+            }
+            else
+            {
+                ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+                ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+                                                  txn->invalidations_distributed);
+            }
 
             if (using_subtxn)
                 RollbackAndReleaseCurrentSubTransaction();
@@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
         AbortCurrentTransaction();
 
         /* make sure there's no cache pollution */
-        ReorderBufferExecuteInvalidations(txn->ninvalidations,
-                                          txn->invalidations);
+        if (rbtxn_distr_inval_overflowed(txn))
+        {
+            Assert(txn->ninvalidations_distributed == 0);
+            InvalidateSystemCaches();
+        }
+        else
+        {
+            ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+            ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+                                              txn->invalidations_distributed);
+        }
 
         if (using_subtxn)
             RollbackAndReleaseCurrentSubTransaction();
@@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
      * We might have decoded changes for this transaction that could load
      * the cache as per the current transaction's view (consider DDL's
      * happened in this transaction). We don't want the decoding of future
-     * transactions to use those cache entries so execute invalidations.
+     * transactions to use those cache entries so execute only the inval
+     * messages in this transaction.
      */
     if (txn->ninvalidations > 0)
         ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
         txn->final_lsn = lsn;
 
     /*
-     * Process cache invalidation messages if there are any. Even if we're not
-     * interested in the transaction's contents, it could have manipulated the
-     * catalog and we need to update the caches according to that.
+     * Process only cache invalidation messages in this transaction if there
+     * are any. Even if we're not interested in the transaction's contents, it
+     * could have manipulated the catalog and we need to update the caches
+     * according to that.
      */
     if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
         ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3422,6 +3461,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * Add new invalidation messages to the reorder buffer queue.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+                                XLogRecPtr lsn, Size nmsgs,
+                                SharedInvalidationMessage *msgs)
+{
+    ReorderBufferChange *change;
+
+    change = ReorderBufferAllocChange(rb);
+    change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+    change->data.inval.ninvalidations = nmsgs;
+    change->data.inval.invalidations = (SharedInvalidationMessage *)
+        palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+    memcpy(change->data.inval.invalidations, msgs,
+           sizeof(SharedInvalidationMessage) * nmsgs);
+
+    ReorderBufferQueueChange(rb, xid, lsn, change, false);
+}
+
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
+ * messages to the **invals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+                                     uint32 *ninvals_out,
+                                     SharedInvalidationMessage *msgs_new,
+                                     Size nmsgs_new)
+{
+    if (*ninvals_out == 0)
+    {
+        *ninvals_out = nmsgs_new;
+        *invals_out = (SharedInvalidationMessage *)
+            palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+        memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+    }
+    else
+    {
+        /* Enlarge the array of inval messages */
+        *invals_out = (SharedInvalidationMessage *)
+            repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+                     (*ninvals_out + nmsgs_new));
+        memcpy(*invals_out + *ninvals_out, msgs_new,
+               nmsgs_new * sizeof(SharedInvalidationMessage));
+        *ninvals_out += nmsgs_new;
+    }
+}
+
+/*
  * Accumulate the invalidations for executing them later.
  *
  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
@@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 {
     ReorderBufferTXN *txn;
     MemoryContext oldcontext;
-    ReorderBufferChange *change;
 
     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 
     Assert(nmsgs > 0);
 
-    /* Accumulate invalidations. */
-    if (txn->ninvalidations == 0)
-    {
-        txn->ninvalidations = nmsgs;
-        txn->invalidations = (SharedInvalidationMessage *)
-            palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-        memcpy(txn->invalidations, msgs,
-               sizeof(SharedInvalidationMessage) * nmsgs);
-    }
-    else
+    ReorderBufferAccumulateInvalidations(&txn->invalidations,
+                                         &txn->ninvalidations,
+                                         msgs, nmsgs);
+
+    ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+    MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+                                         XLogRecPtr lsn, Size nmsgs,
+                                         SharedInvalidationMessage *msgs)
+{
+    ReorderBufferTXN *txn;
+    MemoryContext oldcontext;
+
+    txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+    oldcontext = MemoryContextSwitchTo(rb->context);
+
+    /*
+     * Collect all the invalidations under the top transaction, if available,
+     * so that we can execute them all together. See comments
+     * ReorderBufferAddInvalidations.
+     */
+    txn = rbtxn_get_toptxn(txn);
+
+    Assert(nmsgs > 0);
+
+    if (!rbtxn_distr_inval_overflowed(txn))
     {
-        txn->invalidations = (SharedInvalidationMessage *)
-            repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
-                     (txn->ninvalidations + nmsgs));
+        /*
+         * Check the transaction has enough space for storing distributed
+         * invalidation messages.
+         */
+        if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+        {
+            /*
+             * Mark the invalidation message as overflowed and free up the
+             * messages accumulated so far.
+             */
+            txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
 
-        memcpy(txn->invalidations + txn->ninvalidations, msgs,
-               nmsgs * sizeof(SharedInvalidationMessage));
-        txn->ninvalidations += nmsgs;
+            if (txn->invalidations_distributed)
+            {
+                pfree(txn->invalidations_distributed);
+                txn->invalidations_distributed = NULL;
+                txn->ninvalidations_distributed = 0;
+            }
+        }
+        else
+            ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+                                                 &txn->ninvalidations_distributed,
+                                                 msgs, nmsgs);
     }
 
-    change = ReorderBufferAllocChange(rb);
-    change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
-    change->data.inval.ninvalidations = nmsgs;
-    change->data.inval.invalidations = (SharedInvalidationMessage *)
-        palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-    memcpy(change->data.inval.invalidations, msgs,
-           sizeof(SharedInvalidationMessage) * nmsgs);
-
-    ReorderBufferQueueChange(rb, xid, lsn, change, false);
+    /* Queue the invalidation messages into the transaction */
+    ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0d7bddbe4ed..adf18c397db 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
          * contents built by the current transaction even after its decoding,
          * which should have been invalidated due to concurrent catalog
          * changing transaction.
+         *
+         * Distribute only the invalidation messages generated by the current
+         * committed transaction. Invalidation messages received from other
+         * transactions would have already been propagated to the relevant
+         * in-progress transactions. This transaction would have processed
+         * those invalidations, ensuring that subsequent transactions observe
+         * a consistent cache state.
          */
         if (txn->xid != xid)
         {
@@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
         {
             Assert(msgs != NULL);
 
-            ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
-                                          ninvalidations, msgs);
+            ReorderBufferAddDistributedInvalidations(builder->reorder,
+                                                     txn->xid, lsn,
+                                                     ninvalidations, msgs);
         }
     }
 }
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 6c6c0a908e2..3643f27ad6e 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -556,6 +556,13 @@ bool
 pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
 {
     *state = ioh->state;
+
+    /*
+     * Ensure that we don't see an earlier state of the handle than ioh->state
+     * due to compiler or CPU reordering. This protects both ->generation as
+     * directly used here, and other fields in the handle accessed in the
+     * caller if the handle was not reused.
+     */
     pg_read_barrier();
 
     return ioh->generation != ref_generation;
@@ -773,7 +780,12 @@ pgaio_io_wait_for_free(void)
                  * Note that no interrupts are processed between the state check
                  * and the call to reclaim - that's important as otherwise an
                  * interrupt could have already reclaimed the handle.
+                 *
+                 * Need to ensure that there's no reordering, in the more common
+                 * paths, where we wait for IO, that's done by
+                 * pgaio_io_was_recycled().
                  */
+                pg_read_barrier();
                 pgaio_io_reclaim(ioh);
                 reclaimed++;
             }
@@ -852,7 +864,12 @@ pgaio_io_wait_for_free(void)
                      * check and the call to reclaim - that's important as
                      * otherwise an interrupt could have already reclaimed the
                      * handle.
+                     *
+                     * Need to ensure that there's no reordering, in the more
+                     * common paths, where we wait for IO, that's done by
+                     * pgaio_io_was_recycled().
                      */
+                    pg_read_barrier();
                     pgaio_io_reclaim(ioh);
                     break;
                 }
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index 0ad9795bb7e..03c9bba0802 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -256,6 +256,9 @@ pgaio_io_call_complete_shared(PgAioHandle *ioh)
                             pgaio_result_status_string(result.status),
                             result.id, result.error_data, result.result);
         result = ce->cb->complete_shared(ioh, result, cb_data);
+
+        /* the callback should never transition to unknown */
+        Assert(result.status != PGAIO_RS_UNKNOWN);
     }
 
     ioh->distilled_result = result;
@@ -290,6 +293,7 @@ pgaio_io_call_complete_local(PgAioHandle *ioh)
 
     /* start with distilled result from shared callback */
     result = ioh->distilled_result;
+    Assert(result.status != PGAIO_RS_UNKNOWN);
 
     for (int i = ioh->num_callbacks; i > 0; i--)
     {
@@ -306,6 +310,9 @@ pgaio_io_call_complete_local(PgAioHandle *ioh)
                             pgaio_result_status_string(result.status),
                             result.id, result.error_data, result.result);
         result = ce->cb->complete_local(ioh, result, cb_data);
+
+        /* the callback should never transition to unknown */
+        Assert(result.status != PGAIO_RS_UNKNOWN);
     }
 
     /*
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 743cccc2acd..36be179678d 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -461,7 +461,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
         int         nwakeups = 0;
         int         worker;
 
-        /* Try to get a job to do. */
+        /*
+         * Try to get a job to do.
+         *
+         * The lwlock acquisition also provides the necessary memory barrier
+         * to ensure that we don't see an outdated data in the handle.
+         */
         LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
         if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
         {
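
The reorderbuffer.c change above caps the invalidation messages a transaction buffers on behalf of other committed transactions at 8MB, and once the cap is hit it only records the RBTXN_DISTR_INVAL_OVERFLOWED flag so that decoding falls back to InvalidateSystemCaches(). The standalone C sketch below illustrates that accounting outside PostgreSQL; DemoInvalMsg, DemoTxn, and demo_add_distributed_inval are hypothetical stand-ins, and the 16-byte message size is only an assumption for the arithmetic (8MB / 16 bytes = 524288 messages).

/*
 * Standalone sketch (not PostgreSQL code) of the per-transaction overflow
 * accounting for distributed invalidation messages. Compile with any C
 * compiler; names prefixed with Demo/demo_ are illustrative only.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>

typedef struct DemoInvalMsg
{
    char        payload[16];    /* assumed stand-in for SharedInvalidationMessage */
} DemoInvalMsg;

/* 8MB cap, mirroring MAX_DISTR_INVAL_MSG_PER_TXN in the patch */
#define DEMO_MAX_DISTR_INVAL_MSG_PER_TXN \
    ((8 * 1024 * 1024) / sizeof(DemoInvalMsg))

typedef struct DemoTxn
{
    DemoInvalMsg *invals_distributed;
    size_t      ninvals_distributed;
    bool        distr_inval_overflowed;
} DemoTxn;

/* Append messages, or mark the transaction overflowed once the cap is hit. */
static void
demo_add_distributed_inval(DemoTxn *txn, const DemoInvalMsg *msgs, size_t nmsgs)
{
    if (txn->distr_inval_overflowed)
        return;                 /* already overflowed; a full cache reset is needed */

    if (txn->ninvals_distributed + nmsgs >= DEMO_MAX_DISTR_INVAL_MSG_PER_TXN)
    {
        /* Forget the accumulated messages; remember only the overflow. */
        free(txn->invals_distributed);
        txn->invals_distributed = NULL;
        txn->ninvals_distributed = 0;
        txn->distr_inval_overflowed = true;
        return;
    }

    /* Grow the array and append (realloc result unchecked for brevity). */
    txn->invals_distributed = realloc(txn->invals_distributed,
                                      (txn->ninvals_distributed + nmsgs) * sizeof(DemoInvalMsg));
    memcpy(txn->invals_distributed + txn->ninvals_distributed, msgs,
           nmsgs * sizeof(DemoInvalMsg));
    txn->ninvals_distributed += nmsgs;
}

int
main(void)
{
    DemoTxn     txn = {0};
    DemoInvalMsg batch[1000] = {0};

    printf("cap: %zu messages\n", (size_t) DEMO_MAX_DISTR_INVAL_MSG_PER_TXN);

    /* Feed batches until the cap is exceeded. */
    for (int i = 0; i < 600; i++)
        demo_add_distributed_inval(&txn, batch, 1000);

    printf("overflowed: %d, retained: %zu\n",
           txn.distr_inval_overflowed, txn.ninvals_distributed);
    free(txn.invals_distributed);
    return 0;
}

In the actual patch the messages are also always queued as a REORDER_BUFFER_CHANGE_INVALIDATION change via ReorderBufferQueueInvalidations(); the sketch covers only the accumulation in txn->invalidations_distributed and the overflow flag that later triggers InvalidateSystemCaches().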