Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/xlog.c55
-rw-r--r--src/backend/replication/logical/logical.c10
-rw-r--r--src/backend/replication/walsender.c4
3 files changed, 60 insertions, 9 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fc104a377ad..f734bb47cfa 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -920,7 +920,8 @@ static void LocalSetXLogInsertAllowed(void);
static void CreateEndOfRecoveryRecord(void);
static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static void KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinLSN,
+ XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
@@ -8921,6 +8922,7 @@ CreateCheckPoint(int flags)
XLogRecPtr last_important_lsn;
VirtualTransactionId *vxids;
int nvxids;
+ XLogRecPtr slotsMinReqLSN;
/*
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -9141,6 +9143,15 @@ CreateCheckPoint(int flags)
END_CRIT_SECTION();
/*
+ * Get the current minimum LSN to be used later in the WAL segment
+ * cleanup. We may clean up only WAL segments, which are not needed
+ * according to synchronized LSNs of replication slots. The slot's LSN
+ * might be advanced concurrently, so we call this before
+ * CheckPointReplicationSlots() synchronizes replication slots.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
+ /*
* In some cases there are groups of actions that must all occur on one
* side or the other of a checkpoint record. Before flushing the
* checkpoint record we must explicitly wait for any backend currently
@@ -9304,15 +9315,23 @@ CreateCheckPoint(int flags)
* prevent the disk holding the xlog from growing full.
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
- KeepLogSeg(recptr, &_logSegNo);
+ KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(_logSegNo))
{
/*
+ * Recalculate the current minimum LSN to be used in the WAL segment
+ * cleanup. Then, we must synchronize the replication slots again in
+ * order to make this LSN safe to use.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ CheckPointReplicationSlots();
+
+ /*
* Some slots have been invalidated; recalculate the old-segment
* horizon, starting again from RedoRecPtr.
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
- KeepLogSeg(recptr, &_logSegNo);
+ KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
}
_logSegNo--;
RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
@@ -9534,6 +9553,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr;
XLogSegNo _logSegNo;
TimestampTz xtime;
+ XLogRecPtr slotsMinReqLSN;
/*
* Acquire CheckpointLock to ensure only one restartpoint or checkpoint
@@ -9623,6 +9643,15 @@ CreateRestartPoint(int flags)
MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
+ /*
+ * Get the current minimum LSN to be used later in the WAL segment
+ * cleanup. We may clean up only WAL segments, which are not needed
+ * according to synchronized LSNs of replication slots. The slot's LSN
+ * might be advanced concurrently, so we call this before
+ * CheckPointReplicationSlots() synchronizes replication slots.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
if (log_checkpoints)
LogCheckpointStart(flags, true);
@@ -9708,15 +9737,23 @@ CreateRestartPoint(int flags)
receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
- KeepLogSeg(endptr, &_logSegNo);
+ KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(_logSegNo))
{
/*
+ * Recalculate the current minimum LSN to be used in the WAL segment
+ * cleanup. Then, we must synchronize the replication slots again in
+ * order to make this LSN safe to use.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ CheckPointReplicationSlots();
+
+ /*
* Some slots have been invalidated; recalculate the old-segment
* horizon, starting again from RedoRecPtr.
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
- KeepLogSeg(endptr, &_logSegNo);
+ KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
}
_logSegNo--;
@@ -9818,6 +9855,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
XLogSegNo oldestSegMaxWalSize; /* oldest segid kept by max_wal_size */
XLogSegNo oldestSlotSeg; /* oldest segid kept by slot */
uint64 keepSegs;
+ XLogRecPtr slotsMinReqLSN;
/*
* slot does not reserve WAL. Either deactivated, or has never been active
@@ -9831,8 +9869,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
* oldestSlotSeg to the current segment.
*/
currpos = GetXLogWriteRecPtr();
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
XLByteToSeg(currpos, oldestSlotSeg, wal_segment_size);
- KeepLogSeg(currpos, &oldestSlotSeg);
+ KeepLogSeg(currpos, slotsMinReqLSN, &oldestSlotSeg);
/*
* Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -9893,7 +9932,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
* invalidation is optionally done here, instead.
*/
static void
-KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
+KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinReqLSN, XLogSegNo *logSegNo)
{
XLogSegNo currSegNo;
XLogSegNo segno;
@@ -9906,7 +9945,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
* Calculate how many segments are kept by slots first, adjusting for
* max_slot_wal_keep_size.
*/
- keep = XLogGetReplicationSlotMinimumLSN();
+ keep = slotsMinReqLSN;
if (keep != InvalidXLogRecPtr && keep < recptr)
{
XLByteToSeg(keep, segno, wal_segment_size);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 08e38c29fa6..25695809037 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1087,7 +1087,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
SpinLockRelease(&MyReplicationSlot->mutex);
- /* first write new xmin to disk, so we know what's up after a crash */
+ /*
+ * First, write new xmin and restart_lsn to disk so we know what's up
+ * after a crash. Even when we do this, the checkpointer can see the
+ * updated restart_lsn value in the shared memory; then, a crash can
+ * happen before we manage to write that value to the disk. Thus,
+ * checkpointer still needs to make special efforts to keep WAL
+ * segments required by the restart_lsn written to the disk. See
+ * CreateCheckPoint() and CreateRestartPoint() for details.
+ */
if (updated_xmin || updated_restart)
{
ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3e024044295..1e9936b4142 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1894,6 +1894,10 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
* be energy wasted - the worst lost information can do here is give us
* wrong information in a statistics view - we'll just potentially be more
* conservative in removing files.
+ *
+ * Checkpointer makes special efforts to keep the WAL segments required by
+ * the restart_lsn written to the disk. See CreateCheckPoint() and
+ * CreateRestartPoint() for details.
*/
}