@@ -476,7 +476,7 @@ ProcessRepliesIfAny(void)
476
476
{
477
477
unsigned char firstchar ;
478
478
int r ;
479
- int received = false;
479
+ bool received = false;
480
480
481
481
for (;;)
482
482
{
@@ -700,6 +700,9 @@ WalSndLoop(void)
700
700
/* Loop forever, unless we get an error */
701
701
for (;;)
702
702
{
703
+ /* Clear any already-pending wakeups */
704
+ ResetLatch (& MyWalSnd -> latch );
705
+
703
706
/*
704
707
* Emergency bailout if postmaster has died. This is to avoid the
705
708
* necessity for manual cleanup of all postmaster children.
@@ -718,60 +721,81 @@ WalSndLoop(void)
718
721
/* Normal exit from the walsender is here */
719
722
if (walsender_shutdown_requested )
720
723
{
721
- /* Inform the standby that XLOG streaming was done */
724
+ /* Inform the standby that XLOG streaming is done */
722
725
pq_puttextmessage ('C' , "COPY 0" );
723
726
pq_flush ();
724
727
725
728
proc_exit (0 );
726
729
}
727
730
731
+ /* Check for input from the client */
732
+ ProcessRepliesIfAny ();
733
+
728
734
/*
729
735
* If we don't have any pending data in the output buffer, try to send
730
- * some more.
736
+ * some more. If there is some, we don't bother to call XLogSend
737
+ * again until we've flushed it ... but we'd better assume we are not
738
+ * caught up.
731
739
*/
732
740
if (!pq_is_send_pending ())
733
- {
734
741
XLogSend (output_message , & caughtup );
742
+ else
743
+ caughtup = false;
744
+
745
+ /* Try to flush pending output to the client */
746
+ if (pq_flush_if_writable () != 0 )
747
+ break ;
735
748
749
+ /* If nothing remains to be sent right now ... */
750
+ if (caughtup && !pq_is_send_pending ())
751
+ {
736
752
/*
737
- * Even if we wrote all the WAL that was available when we started
738
- * sending, more might have arrived while we were sending this
739
- * batch. We had the latch set while sending, so we have not
740
- * received any signals from that time. Let's arm the latch again,
741
- * and after that check that we're still up-to-date.
753
+ * If we're in catchup state, move to streaming. This is an
754
+ * important state change for users to know about, since before
755
+ * this point data loss might occur if the primary dies and we
756
+ * need to failover to the standby. The state change is also
757
+ * important for synchronous replication, since commits that
758
+ * started to wait at that point might wait for some time.
742
759
*/
743
- if (caughtup && ! pq_is_send_pending () )
760
+ if (MyWalSnd -> state == WALSNDSTATE_CATCHUP )
744
761
{
745
- ResetLatch (& MyWalSnd -> latch );
762
+ ereport (DEBUG1 ,
763
+ (errmsg ("standby \"%s\" has now caught up with primary" ,
764
+ application_name )));
765
+ WalSndSetState (WALSNDSTATE_STREAMING );
766
+ }
746
767
768
+ /*
769
+ * When SIGUSR2 arrives, we send any outstanding logs up to the
770
+ * shutdown checkpoint record (i.e., the latest record) and exit.
771
+ * This may be a normal termination at shutdown, or a promotion;
772
+ * the walsender is not sure which.
773
+ */
774
+ if (walsender_ready_to_stop )
775
+ {
776
+ /* ... let's just be real sure we're caught up ... */
747
777
XLogSend (output_message , & caughtup );
778
+ if (caughtup && !pq_is_send_pending ())
779
+ {
780
+ walsender_shutdown_requested = true;
781
+ continue ; /* don't want to wait more */
782
+ }
748
783
}
749
784
}
750
785
751
- /* Flush pending output to the client */
752
- if (pq_flush_if_writable () != 0 )
753
- break ;
754
-
755
786
/*
756
- * When SIGUSR2 arrives, we send any outstanding logs up to the
757
- * shutdown checkpoint record (i.e., the latest record) and exit.
787
+ * We don't block if not caught up, unless there is unsent data
788
+ * pending in which case we'd better block until the socket is
789
+ * write-ready. This test is only needed for the case where XLogSend
790
+ * loaded a subset of the available data but then pq_flush_if_writable
791
+ * flushed it all --- we should immediately try to send more.
758
792
*/
759
- if (walsender_ready_to_stop && !pq_is_send_pending ())
760
- {
761
- XLogSend (output_message , & caughtup );
762
- ProcessRepliesIfAny ();
763
- if (caughtup && !pq_is_send_pending ())
764
- walsender_shutdown_requested = true;
765
- }
766
-
767
- if ((caughtup || pq_is_send_pending ()) &&
768
- !got_SIGHUP &&
769
- !walsender_shutdown_requested )
793
+ if (caughtup || pq_is_send_pending ())
770
794
{
771
795
TimestampTz finish_time = 0 ;
772
- long sleeptime ;
796
+ long sleeptime = -1 ;
773
797
774
- /* Reschedule replication timeout */
798
+ /* Determine time until replication timeout */
775
799
if (replication_timeout > 0 )
776
800
{
777
801
long secs ;
@@ -795,12 +819,16 @@ WalSndLoop(void)
795
819
sleeptime = WalSndDelay ;
796
820
}
797
821
798
- /* Sleep */
822
+ /* Sleep until something happens or replication timeout */
799
823
WaitLatchOrSocket (& MyWalSnd -> latch , MyProcPort -> sock ,
800
824
true, pq_is_send_pending (),
801
825
sleeptime );
802
826
803
- /* Check for replication timeout */
827
+ /*
828
+ * Check for replication timeout. Note we ignore the corner case
829
+ * possibility that the client replied just as we reached the
830
+ * timeout ... he's supposed to reply *before* that.
831
+ */
804
832
if (replication_timeout > 0 &&
805
833
GetCurrentTimestamp () >= finish_time )
806
834
{
@@ -814,24 +842,6 @@ WalSndLoop(void)
814
842
break ;
815
843
}
816
844
}
817
-
818
- /*
819
- * If we're in catchup state, see if its time to move to streaming.
820
- * This is an important state change for users, since before this
821
- * point data loss might occur if the primary dies and we need to
822
- * failover to the standby. The state change is also important for
823
- * synchronous replication, since commits that started to wait at that
824
- * point might wait for some time.
825
- */
826
- if (MyWalSnd -> state == WALSNDSTATE_CATCHUP && caughtup )
827
- {
828
- ereport (DEBUG1 ,
829
- (errmsg ("standby \"%s\" has now caught up with primary" ,
830
- application_name )));
831
- WalSndSetState (WALSNDSTATE_STREAMING );
832
- }
833
-
834
- ProcessRepliesIfAny ();
835
845
}
836
846
837
847
/*
0 commit comments