@@ -677,7 +677,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
677
677
XLogRecPtr pagePtr ,
678
678
TimeLineID newTLI );
679
679
static void CheckPointGuts (XLogRecPtr checkPointRedo , int flags );
680
- static void KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo );
680
+ static void KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinLSN ,
681
+ XLogSegNo * logSegNo );
681
682
static XLogRecPtr XLogGetReplicationSlotMinimumLSN (void );
682
683
683
684
static void AdvanceXLInsertBuffer (XLogRecPtr upto , TimeLineID tli ,
@@ -7087,6 +7088,7 @@ CreateCheckPoint(int flags)
7087
7088
VirtualTransactionId * vxids ;
7088
7089
int nvxids ;
7089
7090
int oldXLogAllowed = 0 ;
7091
+ XLogRecPtr slotsMinReqLSN ;
7090
7092
7091
7093
/*
7092
7094
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7315,6 +7317,15 @@ CreateCheckPoint(int flags)
7315
7317
*/
7316
7318
END_CRIT_SECTION ();
7317
7319
7320
+ /*
7321
+ * Get the current minimum LSN to be used later in the WAL segment
7322
+ * cleanup. We may clean up only WAL segments, which are not needed
7323
+ * according to synchronized LSNs of replication slots. The slot's LSN
7324
+ * might be advanced concurrently, so we call this before
7325
+ * CheckPointReplicationSlots() synchronizes replication slots.
7326
+ */
7327
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7328
+
7318
7329
/*
7319
7330
* In some cases there are groups of actions that must all occur on one
7320
7331
* side or the other of a checkpoint record. Before flushing the
@@ -7503,17 +7514,25 @@ CreateCheckPoint(int flags)
7503
7514
* prevent the disk holding the xlog from growing full.
7504
7515
*/
7505
7516
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
7506
- KeepLogSeg (recptr , & _logSegNo );
7517
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
7507
7518
if (InvalidateObsoleteReplicationSlots (RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT ,
7508
7519
_logSegNo , InvalidOid ,
7509
7520
InvalidTransactionId ))
7510
7521
{
7522
+ /*
7523
+ * Recalculate the current minimum LSN to be used in the WAL segment
7524
+ * cleanup. Then, we must synchronize the replication slots again in
7525
+ * order to make this LSN safe to use.
7526
+ */
7527
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7528
+ CheckPointReplicationSlots (shutdown );
7529
+
7511
7530
/*
7512
7531
* Some slots have been invalidated; recalculate the old-segment
7513
7532
* horizon, starting again from RedoRecPtr.
7514
7533
*/
7515
7534
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
7516
- KeepLogSeg (recptr , & _logSegNo );
7535
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
7517
7536
}
7518
7537
_logSegNo -- ;
7519
7538
RemoveOldXlogFiles (_logSegNo , RedoRecPtr , recptr ,
@@ -7788,6 +7807,7 @@ CreateRestartPoint(int flags)
7788
7807
XLogRecPtr endptr ;
7789
7808
XLogSegNo _logSegNo ;
7790
7809
TimestampTz xtime ;
7810
+ XLogRecPtr slotsMinReqLSN ;
7791
7811
7792
7812
/* Concurrent checkpoint/restartpoint cannot happen */
7793
7813
Assert (!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER );
@@ -7870,6 +7890,15 @@ CreateRestartPoint(int flags)
7870
7890
MemSet (& CheckpointStats , 0 , sizeof (CheckpointStats ));
7871
7891
CheckpointStats .ckpt_start_t = GetCurrentTimestamp ();
7872
7892
7893
+ /*
7894
+ * Get the current minimum LSN to be used later in the WAL segment
7895
+ * cleanup. We may clean up only WAL segments, which are not needed
7896
+ * according to synchronized LSNs of replication slots. The slot's LSN
7897
+ * might be advanced concurrently, so we call this before
7898
+ * CheckPointReplicationSlots() synchronizes replication slots.
7899
+ */
7900
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7901
+
7873
7902
if (log_checkpoints )
7874
7903
LogCheckpointStart (flags , true);
7875
7904
@@ -7958,17 +7987,25 @@ CreateRestartPoint(int flags)
7958
7987
receivePtr = GetWalRcvFlushRecPtr (NULL , NULL );
7959
7988
replayPtr = GetXLogReplayRecPtr (& replayTLI );
7960
7989
endptr = (receivePtr < replayPtr ) ? replayPtr : receivePtr ;
7961
- KeepLogSeg (endptr , & _logSegNo );
7990
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
7962
7991
if (InvalidateObsoleteReplicationSlots (RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT ,
7963
7992
_logSegNo , InvalidOid ,
7964
7993
InvalidTransactionId ))
7965
7994
{
7995
+ /*
7996
+ * Recalculate the current minimum LSN to be used in the WAL segment
7997
+ * cleanup. Then, we must synchronize the replication slots again in
7998
+ * order to make this LSN safe to use.
7999
+ */
8000
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
8001
+ CheckPointReplicationSlots (flags & CHECKPOINT_IS_SHUTDOWN );
8002
+
7966
8003
/*
7967
8004
* Some slots have been invalidated; recalculate the old-segment
7968
8005
* horizon, starting again from RedoRecPtr.
7969
8006
*/
7970
8007
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
7971
- KeepLogSeg (endptr , & _logSegNo );
8008
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
7972
8009
}
7973
8010
_logSegNo -- ;
7974
8011
@@ -8063,6 +8100,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
8063
8100
XLogSegNo oldestSegMaxWalSize ; /* oldest segid kept by max_wal_size */
8064
8101
XLogSegNo oldestSlotSeg ; /* oldest segid kept by slot */
8065
8102
uint64 keepSegs ;
8103
+ XLogRecPtr slotsMinReqLSN ;
8066
8104
8067
8105
/*
8068
8106
* slot does not reserve WAL. Either deactivated, or has never been active
@@ -8076,8 +8114,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
8076
8114
* oldestSlotSeg to the current segment.
8077
8115
*/
8078
8116
currpos = GetXLogWriteRecPtr ();
8117
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
8079
8118
XLByteToSeg (currpos , oldestSlotSeg , wal_segment_size );
8080
- KeepLogSeg (currpos , & oldestSlotSeg );
8119
+ KeepLogSeg (currpos , slotsMinReqLSN , & oldestSlotSeg );
8081
8120
8082
8121
/*
8083
8122
* Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -8138,7 +8177,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
8138
8177
* invalidation is optionally done here, instead.
8139
8178
*/
8140
8179
static void
8141
- KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo )
8180
+ KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinReqLSN , XLogSegNo * logSegNo )
8142
8181
{
8143
8182
XLogSegNo currSegNo ;
8144
8183
XLogSegNo segno ;
@@ -8151,7 +8190,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
8151
8190
* Calculate how many segments are kept by slots first, adjusting for
8152
8191
* max_slot_wal_keep_size.
8153
8192
*/
8154
- keep = XLogGetReplicationSlotMinimumLSN () ;
8193
+ keep = slotsMinReqLSN ;
8155
8194
if (keep != InvalidXLogRecPtr && keep < recptr )
8156
8195
{
8157
8196
XLByteToSeg (keep , segno , wal_segment_size );
0 commit comments