20
20
* per-transaction state information.
21
21
*
22
22
* Replication is either synchronous or not synchronous (async). If it is
23
- * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
24
- * for the flush location on the standby before releasing the waiting backend.
23
+ * async, we just fastpath out of here. If it is sync, then we wait for
24
+ * the write or flush location on the standby before releasing the waiting backend.
25
25
* Further complexity in that interaction is expected in later releases.
26
26
*
27
27
* The best performing way to manage the waiting backends is to have a
@@ -67,13 +67,15 @@ char *SyncRepStandbyNames;
67
67
68
68
static bool announce_next_takeover = true;
69
69
70
- static void SyncRepQueueInsert (void );
70
+ static int SyncRepWaitMode = SYNC_REP_NO_WAIT ;
71
+
72
+ static void SyncRepQueueInsert (int mode );
71
73
static void SyncRepCancelWait (void );
72
74
73
75
static int SyncRepGetStandbyPriority (void );
74
76
75
77
#ifdef USE_ASSERT_CHECKING
76
- static bool SyncRepQueueIsOrderedByLSN (void );
78
+ static bool SyncRepQueueIsOrderedByLSN (int mode );
77
79
#endif
78
80
79
81
/*
@@ -120,7 +122,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
120
122
* be a low cost check.
121
123
*/
122
124
if (!WalSndCtl -> sync_standbys_defined ||
123
- XLByteLE (XactCommitLSN , WalSndCtl -> lsn ))
125
+ XLByteLE (XactCommitLSN , WalSndCtl -> lsn [ SyncRepWaitMode ] ))
124
126
{
125
127
LWLockRelease (SyncRepLock );
126
128
return ;
@@ -132,8 +134,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
132
134
*/
133
135
MyProc -> waitLSN = XactCommitLSN ;
134
136
MyProc -> syncRepState = SYNC_REP_WAITING ;
135
- SyncRepQueueInsert ();
136
- Assert (SyncRepQueueIsOrderedByLSN ());
137
+ SyncRepQueueInsert (SyncRepWaitMode );
138
+ Assert (SyncRepQueueIsOrderedByLSN (SyncRepWaitMode ));
137
139
LWLockRelease (SyncRepLock );
138
140
139
141
/* Alter ps display to show waiting for sync rep. */
@@ -267,18 +269,19 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
267
269
}
268
270
269
271
/*
270
- * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
272
+ * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
271
273
*
272
274
* Usually we will go at tail of queue, though it's possible that we arrive
273
275
* here out of order, so start at tail and work back to insertion point.
274
276
*/
275
277
static void
276
- SyncRepQueueInsert (void )
278
+ SyncRepQueueInsert (int mode )
277
279
{
278
280
PGPROC * proc ;
279
281
280
- proc = (PGPROC * ) SHMQueuePrev (& (WalSndCtl -> SyncRepQueue ),
281
- & (WalSndCtl -> SyncRepQueue ),
282
+ Assert (mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE );
283
+ proc = (PGPROC * ) SHMQueuePrev (& (WalSndCtl -> SyncRepQueue [mode ]),
284
+ & (WalSndCtl -> SyncRepQueue [mode ]),
282
285
offsetof(PGPROC , syncRepLinks ));
283
286
284
287
while (proc )
@@ -290,15 +293,15 @@ SyncRepQueueInsert(void)
290
293
if (XLByteLT (proc -> waitLSN , MyProc -> waitLSN ))
291
294
break ;
292
295
293
- proc = (PGPROC * ) SHMQueuePrev (& (WalSndCtl -> SyncRepQueue ),
296
+ proc = (PGPROC * ) SHMQueuePrev (& (WalSndCtl -> SyncRepQueue [ mode ] ),
294
297
& (proc -> syncRepLinks ),
295
298
offsetof(PGPROC , syncRepLinks ));
296
299
}
297
300
298
301
if (proc )
299
302
SHMQueueInsertAfter (& (proc -> syncRepLinks ), & (MyProc -> syncRepLinks ));
300
303
else
301
- SHMQueueInsertAfter (& (WalSndCtl -> SyncRepQueue ), & (MyProc -> syncRepLinks ));
304
+ SHMQueueInsertAfter (& (WalSndCtl -> SyncRepQueue [ mode ] ), & (MyProc -> syncRepLinks ));
302
305
}
303
306
304
307
/*
@@ -368,7 +371,8 @@ SyncRepReleaseWaiters(void)
368
371
{
369
372
volatile WalSndCtlData * walsndctl = WalSndCtl ;
370
373
volatile WalSnd * syncWalSnd = NULL ;
371
- int numprocs = 0 ;
374
+ int numwrite = 0 ;
375
+ int numflush = 0 ;
372
376
int priority = 0 ;
373
377
int i ;
374
378
@@ -419,20 +423,28 @@ SyncRepReleaseWaiters(void)
419
423
return ;
420
424
}
421
425
422
- if (XLByteLT (walsndctl -> lsn , MyWalSnd -> flush ))
426
+ /*
427
+ * Set the lsn first so that when we wake backends they will release
428
+ * up to this location.
429
+ */
430
+ if (XLByteLT (walsndctl -> lsn [SYNC_REP_WAIT_WRITE ], MyWalSnd -> write ))
423
431
{
424
- /*
425
- * Set the lsn first so that when we wake backends they will release
426
- * up to this location.
427
- */
428
- walsndctl -> lsn = MyWalSnd -> flush ;
429
- numprocs = SyncRepWakeQueue (false);
432
+ walsndctl -> lsn [SYNC_REP_WAIT_WRITE ] = MyWalSnd -> write ;
433
+ numwrite = SyncRepWakeQueue (false, SYNC_REP_WAIT_WRITE );
434
+ }
435
+ if (XLByteLT (walsndctl -> lsn [SYNC_REP_WAIT_FLUSH ], MyWalSnd -> flush ))
436
+ {
437
+ walsndctl -> lsn [SYNC_REP_WAIT_FLUSH ] = MyWalSnd -> flush ;
438
+ numflush = SyncRepWakeQueue (false, SYNC_REP_WAIT_FLUSH );
430
439
}
431
440
432
441
LWLockRelease (SyncRepLock );
433
442
434
- elog (DEBUG3 , "released %d procs up to %X/%X" ,
435
- numprocs ,
443
+ elog (DEBUG3 , "released %d procs up to write %X/%X, %d procs up to flush %X/%X" ,
444
+ numwrite ,
445
+ MyWalSnd -> write .xlogid ,
446
+ MyWalSnd -> write .xrecoff ,
447
+ numflush ,
436
448
MyWalSnd -> flush .xlogid ,
437
449
MyWalSnd -> flush .xrecoff );
438
450
@@ -507,40 +519,42 @@ SyncRepGetStandbyPriority(void)
507
519
}
508
520
509
521
/*
510
- * Walk queue from head. Set the state of any backends that need to be woken,
511
- * remove them from the queue, and then wake them. Pass all = true to wake
512
- * whole queue; otherwise, just wake up to the walsender's LSN.
522
+ * Walk the specified queue from head. Set the state of any backends that
523
+ * need to be woken, remove them from the queue, and then wake them.
524
+ * Pass all = true to wake whole queue; otherwise, just wake up to
525
+ * the walsender's LSN.
513
526
*
514
527
* Must hold SyncRepLock.
515
528
*/
516
529
int
517
- SyncRepWakeQueue (bool all )
530
+ SyncRepWakeQueue (bool all , int mode )
518
531
{
519
532
volatile WalSndCtlData * walsndctl = WalSndCtl ;
520
533
PGPROC * proc = NULL ;
521
534
PGPROC * thisproc = NULL ;
522
535
int numprocs = 0 ;
523
536
524
- Assert (SyncRepQueueIsOrderedByLSN ());
537
+ Assert (mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE );
538
+ Assert (SyncRepQueueIsOrderedByLSN (mode ));
525
539
526
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue ),
527
- & (WalSndCtl -> SyncRepQueue ),
540
+ proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [ mode ] ),
541
+ & (WalSndCtl -> SyncRepQueue [ mode ] ),
528
542
offsetof(PGPROC , syncRepLinks ));
529
543
530
544
while (proc )
531
545
{
532
546
/*
533
547
* Assume the queue is ordered by LSN
534
548
*/
535
- if (!all && XLByteLT (walsndctl -> lsn , proc -> waitLSN ))
549
+ if (!all && XLByteLT (walsndctl -> lsn [ mode ] , proc -> waitLSN ))
536
550
return numprocs ;
537
551
538
552
/*
539
553
* Move to next proc, so we can delete thisproc from the queue.
540
554
* thisproc is valid, proc may be NULL after this.
541
555
*/
542
556
thisproc = proc ;
543
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue ),
557
+ proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [ mode ] ),
544
558
& (proc -> syncRepLinks ),
545
559
offsetof(PGPROC , syncRepLinks ));
546
560
@@ -588,7 +602,12 @@ SyncRepUpdateSyncStandbysDefined(void)
588
602
* wants synchronous replication, we'd better wake them up.
589
603
*/
590
604
if (!sync_standbys_defined )
591
- SyncRepWakeQueue (true);
605
+ {
606
+ int i ;
607
+
608
+ for (i = 0 ; i < NUM_SYNC_REP_WAIT_MODE ; i ++ )
609
+ SyncRepWakeQueue (true, i );
610
+ }
592
611
593
612
/*
594
613
* Only allow people to join the queue when there are synchronous
@@ -605,16 +624,18 @@ SyncRepUpdateSyncStandbysDefined(void)
605
624
606
625
#ifdef USE_ASSERT_CHECKING
607
626
static bool
608
- SyncRepQueueIsOrderedByLSN (void )
627
+ SyncRepQueueIsOrderedByLSN (int mode )
609
628
{
610
629
PGPROC * proc = NULL ;
611
630
XLogRecPtr lastLSN ;
612
631
632
+ Assert (mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE );
633
+
613
634
lastLSN .xlogid = 0 ;
614
635
lastLSN .xrecoff = 0 ;
615
636
616
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue ),
617
- & (WalSndCtl -> SyncRepQueue ),
637
+ proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [ mode ] ),
638
+ & (WalSndCtl -> SyncRepQueue [ mode ] ),
618
639
offsetof(PGPROC , syncRepLinks ));
619
640
620
641
while (proc )
@@ -628,7 +649,7 @@ SyncRepQueueIsOrderedByLSN(void)
628
649
629
650
lastLSN = proc -> waitLSN ;
630
651
631
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue ),
652
+ proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [ mode ] ),
632
653
& (proc -> syncRepLinks ),
633
654
offsetof(PGPROC , syncRepLinks ));
634
655
}
@@ -675,3 +696,20 @@ check_synchronous_standby_names(char **newval, void **extra, GucSource source)
675
696
676
697
return true;
677
698
}
699
+
700
+ void
701
+ assign_synchronous_commit (int newval , void * extra )
702
+ {
703
+ switch (newval )
704
+ {
705
+ case SYNCHRONOUS_COMMIT_REMOTE_WRITE :
706
+ SyncRepWaitMode = SYNC_REP_WAIT_WRITE ;
707
+ break ;
708
+ case SYNCHRONOUS_COMMIT_REMOTE_FLUSH :
709
+ SyncRepWaitMode = SYNC_REP_WAIT_FLUSH ;
710
+ break ;
711
+ default :
712
+ SyncRepWaitMode = SYNC_REP_NO_WAIT ;
713
+ break ;
714
+ }
715
+ }
0 commit comments