@@ -3970,6 +3970,63 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr RedoRecPtr, XLogRecPtr endptr)
3970
3970
FreeDir (xldir );
3971
3971
}
3972
3972
3973
+ /*
3974
+ * Find latest WAL LSN
3975
+ */
3976
+ static XLogRecPtr
3977
+ GetLastLSN (void )
3978
+ {
3979
+ DIR * xldir ;
3980
+ struct dirent * xlde ;
3981
+ XLogRecPtr lastLsn ;
3982
+ uint64 logSegNo ;
3983
+ unsigned tli ;
3984
+ uint64 lastSegNo = 0 ;
3985
+ XLogReaderState * xlogreader ;
3986
+ char * errormsg ;
3987
+
3988
+ xldir = AllocateDir (XLOGDIR );
3989
+
3990
+ while ((xlde = ReadDir (xldir , XLOGDIR )) != NULL )
3991
+ {
3992
+ /* Ignore files that are not XLOG segments */
3993
+ if (!IsXLogFileName (xlde -> d_name ))
3994
+ continue ;
3995
+
3996
+ XLogFromFileName (xlde -> d_name , & tli , & logSegNo , wal_segment_size );
3997
+ if (tli == ThisTimeLineID && logSegNo > lastSegNo )
3998
+ lastSegNo = logSegNo ;
3999
+ }
4000
+
4001
+ FreeDir (xldir );
4002
+
4003
+ XLogSegNoOffsetToRecPtr (lastSegNo , SizeOfXLogLongPHD , wal_segment_size , lastLsn );
4004
+
4005
+ xlogreader = XLogReaderAllocate (wal_segment_size , & read_local_xlog_page , NULL );
4006
+
4007
+ while (XLogReadRecord (xlogreader , lastLsn , & errormsg ))
4008
+ lastLsn = xlogreader -> EndRecPtr ;
4009
+
4010
+ XLogReaderFree (xlogreader );
4011
+
4012
+ return lastLsn ;
4013
+ }
4014
+
4015
+ /*
4016
+ * Launch WalReceiver starting from last LSN if not started yet.
4017
+ */
4018
+ static void
4019
+ StartWalRcv (void )
4020
+ {
4021
+ if (!WalRcvStreaming () && PrimaryConnInfo && strcmp (PrimaryConnInfo , "" ) != 0 )
4022
+ {
4023
+ XLogRecPtr lastLSN = GetLastLSN ();
4024
+ curFileTLI = ThisTimeLineID ;
4025
+ RequestXLogStreaming (ThisTimeLineID , lastLSN , PrimaryConnInfo ,
4026
+ PrimarySlotName );
4027
+ }
4028
+ }
4029
+
3973
4030
/*
3974
4031
* Remove WAL files that are not part of the given timeline's history.
3975
4032
*
@@ -6004,6 +6061,12 @@ recoveryApplyDelay(XLogReaderState *record)
6004
6061
if (secs <= 0 && microsecs <= 0 )
6005
6062
return false;
6006
6063
6064
+ /*
6065
+ * Start WAL receiver if not started yet, to avoid WALs overflow at primary node
6066
+ * or large gap between primary and replica when apply delay is specified.
6067
+ */
6068
+ StartWalRcv ();
6069
+
6007
6070
while (true)
6008
6071
{
6009
6072
ResetLatch (& XLogCtl -> recoveryWakeupLatch );
@@ -11821,6 +11884,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
11821
11884
return false;
11822
11885
11823
11886
/*
11887
+ * If WAL receiver was altery started because of apply delay,
11888
+ * thre restart it.
11889
+ */
11890
+ if (WalRcvStreaming ())
11891
+ ShutdownWalRcv ();
11892
+
11893
+ /*
11824
11894
* If primary_conninfo is set, launch walreceiver to try
11825
11895
* to stream the missing WAL.
11826
11896
*
0 commit comments