From 8c2dbe63736f3dcab36e02dd2bf9682cfe668c7b Mon Sep 17 00:00:00 2001 From: Yura Sokolov Date: Mon, 3 Feb 2025 11:58:33 +0300 Subject: [PATCH] sinvaladt.c: use atomic operations on maxMsgNum The msgnumLock spinlock could be contended. The comment states it was used as a memory barrier. Let's use atomic ops with memory barriers directly instead. --- src/backend/storage/ipc/sinvaladt.c | 80 +++++++++++++---------------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index c5748b690f40..1d7dd0c6daf6 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -86,19 +86,7 @@ * has no need to touch anyone's ProcState, except in the infrequent cases * when SICleanupQueue is needed. The only point of overlap is that * the writer wants to change maxMsgNum while readers need to read it. - * We deal with that by having a spinlock that readers must take for just - * long enough to read maxMsgNum, while writers take it for just long enough - * to write maxMsgNum. (The exact rule is that you need the spinlock to - * read maxMsgNum if you are not holding SInvalWriteLock, and you need the - * spinlock to write maxMsgNum unless you are holding both locks.) - * - * Note: since maxMsgNum is an int and hence presumably atomically readable/ - * writable, the spinlock might seem unnecessary. The reason it is needed - * is to provide a memory barrier: we need to be sure that messages written - * to the array are actually there before maxMsgNum is increased, and that - * readers will see that data after fetching maxMsgNum. Multiprocessors - * that have weak memory-ordering guarantees can fail without the memory - * barrier instructions that are included in the spinlock sequences. + * We deal with that by using atomic operations with proper memory barriers. */ @@ -139,7 +127,7 @@ typedef struct ProcState /* procPid is zero in an inactive ProcState array entry. 
*/ pid_t procPid; /* PID of backend, for signaling */ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ - int nextMsgNum; /* next message number to read */ + uint32 nextMsgNum; /* next message number to read */ bool resetState; /* backend needs to reset its state */ bool signaled; /* backend has been sent catchup signal */ bool hasMessages; /* backend has unread messages */ @@ -167,11 +155,9 @@ typedef struct SISeg /* * General state information */ - int minMsgNum; /* oldest message still needed */ - int maxMsgNum; /* next message number to be assigned */ - int nextThreshold; /* # of messages to call SICleanupQueue */ - - slock_t msgnumLock; /* spinlock protecting maxMsgNum */ + uint32 minMsgNum; /* oldest message still needed */ + pg_atomic_uint32 maxMsgNum; /* next message number to be assigned */ + uint32 nextThreshold; /* # of messages to call SICleanupQueue */ /* * Circular buffer holding shared-inval messages @@ -244,9 +230,8 @@ SharedInvalShmemInit(void) - /* Clear message counters, save size of procState array, init spinlock */ + /* Clear message counters and save size of procState array */ shmInvalBuffer->minMsgNum = 0; - shmInvalBuffer->maxMsgNum = 0; + pg_atomic_init_u32(&shmInvalBuffer->maxMsgNum, 0); shmInvalBuffer->nextThreshold = CLEANUP_MIN; - SpinLockInit(&shmInvalBuffer->msgnumLock); /* The buffer[] array is initially all unused, so we need not fill it */ @@ -304,7 +289,7 @@ SharedInvalBackendInit(bool sendOnly) /* mark myself active, with all extant messages already read */ stateP->procPid = MyProcPid; - stateP->nextMsgNum = segP->maxMsgNum; + stateP->nextMsgNum = pg_atomic_read_u32(&segP->maxMsgNum); stateP->resetState = false; stateP->signaled = false; stateP->hasMessages = false; @@ -383,8 +368,8 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) while (n > 0) { int nthistime = Min(n, WRITE_QUANTUM); - int numMsgs; - int max; + uint32 numMsgs; + uint32 max; int i; n -= nthistime; @@ -400,7 +385,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) */ 
for (;;) { - numMsgs = segP->maxMsgNum - segP->minMsgNum; + numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum; if (numMsgs + nthistime > MAXNUMMESSAGES || numMsgs >= segP->nextThreshold) SICleanupQueue(true, nthistime); @@ -411,17 +396,20 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) /* * Insert new message(s) into proper slot of circular buffer */ - max = segP->maxMsgNum; + max = pg_atomic_read_u32(&segP->maxMsgNum); while (nthistime-- > 0) { segP->buffer[max % MAXNUMMESSAGES] = *data++; max++; } - /* Update current value of maxMsgNum using spinlock */ - SpinLockAcquire(&segP->msgnumLock); - segP->maxMsgNum = max; - SpinLockRelease(&segP->msgnumLock); + /* + * We need to write maxMsgNum strictly after the writes of the messages + * and strictly before the write to hasMessages. Two write barriers, + * before and after, could be used; but to keep the code simple, a + * single write_membarrier is used. + */ + pg_atomic_write_membarrier_u32(&segP->maxMsgNum, max); /* * Now that the maxMsgNum change is globally visible, we give everyone @@ -474,7 +462,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) { SISeg *segP; ProcState *stateP; - int max; + uint32 max; int n; segP = shmInvalBuffer; @@ -507,10 +495,14 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) */ stateP->hasMessages = false; - /* Fetch current value of maxMsgNum using spinlock */ - SpinLockAcquire(&segP->msgnumLock); - max = segP->maxMsgNum; - SpinLockRelease(&segP->msgnumLock); + /* + * The full barrier before the read of maxMsgNum synchronizes against + * the write of hasMessages=false. To synchronize the message reads, + * a read barrier afterwards is enough. 
+ */ + pg_memory_barrier(); + max = pg_atomic_read_u32(&segP->maxMsgNum); + pg_read_barrier(); if (stateP->resetState) { @@ -577,11 +569,11 @@ void SICleanupQueue(bool callerHasWriteLock, int minFree) { SISeg *segP = shmInvalBuffer; - int min, + uint32 min, minsig, lowbound, - numMsgs, - i; + numMsgs; + int i; ProcState *needSig = NULL; /* Lock out all writers and readers */ @@ -596,14 +588,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree) * backends here it is possible for them to keep sending messages without * a problem even when they are the only active backend. */ - min = segP->maxMsgNum; - minsig = min - SIG_THRESHOLD; - lowbound = min - MAXNUMMESSAGES + minFree; + min = pg_atomic_read_u32(&segP->maxMsgNum); + minsig = min - Min(min, SIG_THRESHOLD); + lowbound = min - Min(min, MAXNUMMESSAGES - minFree); for (i = 0; i < segP->numProcs; i++) { ProcState *stateP = &segP->procState[segP->pgprocnos[i]]; - int n = stateP->nextMsgNum; + uint32 n = stateP->nextMsgNum; /* Ignore if already in reset state */ Assert(stateP->procPid != 0); @@ -642,7 +634,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree) if (min >= MSGNUMWRAPAROUND) { segP->minMsgNum -= MSGNUMWRAPAROUND; - segP->maxMsgNum -= MSGNUMWRAPAROUND; + pg_atomic_sub_fetch_u32(&segP->maxMsgNum, MSGNUMWRAPAROUND); for (i = 0; i < segP->numProcs; i++) segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND; } @@ -651,7 +643,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree) * Determine how many messages are still in the queue, and set the * threshold at which we should repeat SICleanupQueue(). */ - numMsgs = segP->maxMsgNum - segP->minMsgNum; + numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum; if (numMsgs < CLEANUP_MIN) segP->nextThreshold = CLEANUP_MIN; else