86
86
* has no need to touch anyone's ProcState, except in the infrequent cases
87
87
* when SICleanupQueue is needed. The only point of overlap is that
88
88
* the writer wants to change maxMsgNum while readers need to read it.
89
- * We deal with that by having a spinlock that readers must take for just
90
- * long enough to read maxMsgNum, while writers take it for just long enough
91
- * to write maxMsgNum. (The exact rule is that you need the spinlock to
92
- * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
93
- * spinlock to write maxMsgNum unless you are holding both locks.)
94
- *
95
- * Note: since maxMsgNum is an int and hence presumably atomically readable/
96
- * writable, the spinlock might seem unnecessary. The reason it is needed
97
- * is to provide a memory barrier: we need to be sure that messages written
98
- * to the array are actually there before maxMsgNum is increased, and that
99
- * readers will see that data after fetching maxMsgNum. Multiprocessors
100
- * that have weak memory-ordering guarantees can fail without the memory
101
- * barrier instructions that are included in the spinlock sequences.
89
+ * We deal with that by using atomic operations with proper memory barriers.
102
90
*/
103
91
104
92
@@ -139,7 +127,7 @@ typedef struct ProcState
139
127
/* procPid is zero in an inactive ProcState array entry. */
140
128
pid_t procPid ; /* PID of backend, for signaling */
141
129
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
142
- int nextMsgNum ; /* next message number to read */
130
+ uint32 nextMsgNum ; /* next message number to read */
143
131
bool resetState ; /* backend needs to reset its state */
144
132
bool signaled ; /* backend has been sent catchup signal */
145
133
bool hasMessages ; /* backend has unread messages */
@@ -167,11 +155,9 @@ typedef struct SISeg
167
155
/*
168
156
* General state information
169
157
*/
170
- int minMsgNum ; /* oldest message still needed */
171
- int maxMsgNum ; /* next message number to be assigned */
172
- int nextThreshold ; /* # of messages to call SICleanupQueue */
173
-
174
- slock_t msgnumLock ; /* spinlock protecting maxMsgNum */
158
+ uint32 minMsgNum ; /* oldest message still needed */
159
+ pg_atomic_uint32 maxMsgNum ; /* next message number to be assigned */
160
+ uint32 nextThreshold ; /* # of messages to call SICleanupQueue */
175
161
176
162
/*
177
163
* Circular buffer holding shared-inval messages
@@ -244,9 +230,8 @@ SharedInvalShmemInit(void)
244
230
245
231
/* Clear message counters, save size of procState array, init spinlock */
246
232
shmInvalBuffer -> minMsgNum = 0 ;
247
- shmInvalBuffer -> maxMsgNum = 0 ;
233
+ pg_atomic_init_u32 ( & shmInvalBuffer -> maxMsgNum , 0 ) ;
248
234
shmInvalBuffer -> nextThreshold = CLEANUP_MIN ;
249
- SpinLockInit (& shmInvalBuffer -> msgnumLock );
250
235
251
236
/* The buffer[] array is initially all unused, so we need not fill it */
252
237
@@ -304,7 +289,7 @@ SharedInvalBackendInit(bool sendOnly)
304
289
305
290
/* mark myself active, with all extant messages already read */
306
291
stateP -> procPid = MyProcPid ;
307
- stateP -> nextMsgNum = segP -> maxMsgNum ;
292
+ stateP -> nextMsgNum = pg_atomic_read_u32 ( & segP -> maxMsgNum ) ;
308
293
stateP -> resetState = false;
309
294
stateP -> signaled = false;
310
295
stateP -> hasMessages = false;
@@ -383,8 +368,8 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
383
368
while (n > 0 )
384
369
{
385
370
int nthistime = Min (n , WRITE_QUANTUM );
386
- int numMsgs ;
387
- int max ;
371
+ uint32 numMsgs ;
372
+ uint32 max ;
388
373
int i ;
389
374
390
375
n -= nthistime ;
@@ -400,7 +385,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
400
385
*/
401
386
for (;;)
402
387
{
403
- numMsgs = segP -> maxMsgNum - segP -> minMsgNum ;
388
+ numMsgs = pg_atomic_read_u32 ( & segP -> maxMsgNum ) - segP -> minMsgNum ;
404
389
if (numMsgs + nthistime > MAXNUMMESSAGES ||
405
390
numMsgs >= segP -> nextThreshold )
406
391
SICleanupQueue (true, nthistime );
@@ -411,17 +396,20 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
411
396
/*
412
397
* Insert new message(s) into proper slot of circular buffer
413
398
*/
414
- max = segP -> maxMsgNum ;
399
+ max = pg_atomic_read_u32 ( & segP -> maxMsgNum ) ;
415
400
while (nthistime -- > 0 )
416
401
{
417
402
segP -> buffer [max % MAXNUMMESSAGES ] = * data ++ ;
418
403
max ++ ;
419
404
}
420
405
421
- /* Update current value of maxMsgNum using spinlock */
422
- SpinLockAcquire (& segP -> msgnumLock );
423
- segP -> maxMsgNum = max ;
424
- SpinLockRelease (& segP -> msgnumLock );
406
+ /*
407
+ * We need to write maxMsgNum strictly after write of messages and
408
+ * strictly before write to hasMessages. Two write barriers could be
409
+ * used before and after. But to simplify code, write_membarrier is
410
+ * used.
411
+ */
412
+ pg_atomic_write_membarrier_u32 (& segP -> maxMsgNum , max );
425
413
426
414
/*
427
415
* Now that the maxMsgNum change is globally visible, we give everyone
@@ -474,7 +462,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
474
462
{
475
463
SISeg * segP ;
476
464
ProcState * stateP ;
477
- int max ;
465
+ uint32 max ;
478
466
int n ;
479
467
480
468
segP = shmInvalBuffer ;
@@ -507,10 +495,14 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
507
495
*/
508
496
stateP -> hasMessages = false;
509
497
510
- /* Fetch current value of maxMsgNum using spinlock */
511
- SpinLockAcquire (& segP -> msgnumLock );
512
- max = segP -> maxMsgNum ;
513
- SpinLockRelease (& segP -> msgnumLock );
498
+ /*
499
+ * Full barrier before read of maxMsgNum is to synchronize against
500
+ * hasMessages=false. To synchronize message reads read barrier after is
501
+ * enough.
502
+ */
503
+ pg_memory_barrier ();
504
+ max = pg_atomic_read_u32 (& segP -> maxMsgNum );
505
+ pg_read_barrier ();
514
506
515
507
if (stateP -> resetState )
516
508
{
@@ -577,11 +569,11 @@ void
577
569
SICleanupQueue (bool callerHasWriteLock , int minFree )
578
570
{
579
571
SISeg * segP = shmInvalBuffer ;
580
- int min ,
572
+ uint32 min ,
581
573
minsig ,
582
574
lowbound ,
583
- numMsgs ,
584
- i ;
575
+ numMsgs ;
576
+ int i ;
585
577
ProcState * needSig = NULL ;
586
578
587
579
/* Lock out all writers and readers */
@@ -596,14 +588,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
596
588
* backends here it is possible for them to keep sending messages without
597
589
* a problem even when they are the only active backend.
598
590
*/
599
- min = segP -> maxMsgNum ;
600
- minsig = min - SIG_THRESHOLD ;
601
- lowbound = min - MAXNUMMESSAGES + minFree ;
591
+ min = pg_atomic_read_u32 ( & segP -> maxMsgNum ) ;
592
+ minsig = min - Min ( min , SIG_THRESHOLD ) ;
593
+ lowbound = min - Min ( min , MAXNUMMESSAGES - minFree ) ;
602
594
603
595
for (i = 0 ; i < segP -> numProcs ; i ++ )
604
596
{
605
597
ProcState * stateP = & segP -> procState [segP -> pgprocnos [i ]];
606
- int n = stateP -> nextMsgNum ;
598
+ uint32 n = stateP -> nextMsgNum ;
607
599
608
600
/* Ignore if already in reset state */
609
601
Assert (stateP -> procPid != 0 );
@@ -642,7 +634,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
642
634
if (min >= MSGNUMWRAPAROUND )
643
635
{
644
636
segP -> minMsgNum -= MSGNUMWRAPAROUND ;
645
- segP -> maxMsgNum -= MSGNUMWRAPAROUND ;
637
+ pg_atomic_sub_fetch_u32 ( & segP -> maxMsgNum , MSGNUMWRAPAROUND ) ;
646
638
for (i = 0 ; i < segP -> numProcs ; i ++ )
647
639
segP -> procState [segP -> pgprocnos [i ]].nextMsgNum -= MSGNUMWRAPAROUND ;
648
640
}
@@ -651,7 +643,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
651
643
* Determine how many messages are still in the queue, and set the
652
644
* threshold at which we should repeat SICleanupQueue().
653
645
*/
654
- numMsgs = segP -> maxMsgNum - segP -> minMsgNum ;
646
+ numMsgs = pg_atomic_read_u32 ( & segP -> maxMsgNum ) - segP -> minMsgNum ;
655
647
if (numMsgs < CLEANUP_MIN )
656
648
segP -> nextThreshold = CLEANUP_MIN ;
657
649
else
0 commit comments