diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1914859b2eed..bdfd0a59ab7d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -677,7 +677,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
                                                   XLogRecPtr pagePtr,
                                                   TimeLineID newTLI);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static void KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinReqLSN,
+                       XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli,
@@ -7087,6 +7088,7 @@ CreateCheckPoint(int flags)
     VirtualTransactionId *vxids;
     int         nvxids;
     int         oldXLogAllowed = 0;
+    XLogRecPtr  slotsMinReqLSN;
 
     /*
      * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7315,6 +7317,16 @@ CreateCheckPoint(int flags)
      */
     END_CRIT_SECTION();
 
+    /*
+     * Get the current minimum LSN to be used later in the WAL segment
+     * cleanup.  We may remove only those WAL segments that are not needed
+     * according to the synchronized LSNs of the replication slots.  A slot's
+     * LSN might be advanced concurrently, so we capture this value before
+     * CheckPointReplicationSlots() synchronizes the slots to disk.
+     */
+    ReplicationSlotsComputeRequiredLSN();
+    slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
     /*
      * In some cases there are groups of actions that must all occur on one
      * side or the other of a checkpoint record.  Before flushing the
@@ -7498,22 +7510,35 @@ CreateCheckPoint(int flags)
     if (PriorRedoPtr != InvalidXLogRecPtr)
         UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+#ifdef USE_INJECTION_POINTS
+    INJECTION_POINT("checkpoint-before-old-wal-removal", NULL);
+#endif
+
     /*
      * Delete old log files, those no longer needed for last checkpoint to
      * prevent the disk holding the xlog from growing full.
      */
     XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-    KeepLogSeg(recptr, &_logSegNo);
+    KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
     if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
                                            _logSegNo, InvalidOid,
                                            InvalidTransactionId))
     {
+        /*
+         * Recalculate the current minimum LSN to be used in the WAL segment
+         * cleanup.  Then we must synchronize the replication slots again in
+         * order to make this LSN safe to use.
+         */
+        ReplicationSlotsComputeRequiredLSN();
+        slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+        CheckPointReplicationSlots(shutdown);
+
         /*
          * Some slots have been invalidated; recalculate the old-segment
          * horizon, starting again from RedoRecPtr.
          */
         XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-        KeepLogSeg(recptr, &_logSegNo);
+        KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
     }
     _logSegNo--;
     RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr,
@@ -7788,6 +7813,7 @@ CreateRestartPoint(int flags)
     XLogRecPtr  endptr;
     XLogSegNo   _logSegNo;
     TimestampTz xtime;
+    XLogRecPtr  slotsMinReqLSN;
 
     /* Concurrent checkpoint/restartpoint cannot happen */
     Assert(!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER);
@@ -7870,6 +7896,16 @@ CreateRestartPoint(int flags)
     MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
     CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
 
+    /*
+     * Get the current minimum LSN to be used later in the WAL segment
+     * cleanup.  We may remove only those WAL segments that are not needed
+     * according to the synchronized LSNs of the replication slots.  A slot's
+     * LSN might be advanced concurrently, so we capture this value before
+     * CheckPointReplicationSlots() synchronizes the slots to disk.
+     */
+    ReplicationSlotsComputeRequiredLSN();
+    slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
     if (log_checkpoints)
         LogCheckpointStart(flags, true);
 
@@ -7958,17 +7994,26 @@ CreateRestartPoint(int flags)
     receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
     replayPtr = GetXLogReplayRecPtr(&replayTLI);
     endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
-    KeepLogSeg(endptr, &_logSegNo);
+    KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
     if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
                                            _logSegNo, InvalidOid,
                                            InvalidTransactionId))
     {
+        /*
+         * Recalculate the current minimum LSN to be used in the WAL segment
+         * cleanup.  Then we must synchronize the replication slots again in
+         * order to make this LSN safe to use.
+         */
+        ReplicationSlotsComputeRequiredLSN();
+        slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+        CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
+
         /*
         * Some slots have been invalidated; recalculate the old-segment
         * horizon, starting again from RedoRecPtr.
         */
        XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-       KeepLogSeg(endptr, &_logSegNo);
+       KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
     }
     _logSegNo--;
 
@@ -8063,6 +8108,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
     XLogSegNo   oldestSegMaxWalSize;    /* oldest segid kept by max_wal_size */
     XLogSegNo   oldestSlotSeg;  /* oldest segid kept by slot */
     uint64      keepSegs;
+    XLogRecPtr  slotsMinReqLSN;
 
     /*
      * slot does not reserve WAL. Either deactivated, or has never been active
@@ -8076,8 +8122,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
      * oldestSlotSeg to the current segment.
      */
     currpos = GetXLogWriteRecPtr();
+    slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
     XLByteToSeg(currpos, oldestSlotSeg, wal_segment_size);
-    KeepLogSeg(currpos, &oldestSlotSeg);
+    KeepLogSeg(currpos, slotsMinReqLSN, &oldestSlotSeg);
 
     /*
      * Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -8138,7 +8185,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
  * invalidation is optionally done here, instead.
  */
 static void
-KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
+KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinReqLSN, XLogSegNo *logSegNo)
 {
     XLogSegNo   currSegNo;
     XLogSegNo   segno;
@@ -8151,7 +8198,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
      * Calculate how many segments are kept by slots first, adjusting for
      * max_slot_wal_keep_size.
      */
-    keep = XLogGetReplicationSlotMinimumLSN();
+    keep = slotsMinReqLSN;
     if (keep != InvalidXLogRecPtr && keep < recptr)
     {
        XLByteToSeg(keep, segno, wal_segment_size);
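The comments added above spell out the ordering contract: the checkpointer captures the slots' minimum required LSN, CheckPointReplicationSlots() flushes the slot state to disk, and only then is the captured value handed to KeepLogSeg(). The new checkpoint-before-old-wal-removal injection point exposes the moment just before old segments are removed, which is what makes that window testable. A minimal sketch of the pattern (an illustration only, not part of the patch; it assumes a $node set up with shared_preload_libraries = 'injection_points' and the injection_points extension created, as in the new TAP tests below):

# Sketch: pause the checkpointer right before it removes old WAL segments.
my $checkpoint = $node->background_psql('postgres');
$checkpoint->query_safe(
	q{select injection_points_attach('checkpoint-before-old-wal-removal', 'wait')});
$checkpoint->query_until(
	qr/cp_started/, q(\echo cp_started
checkpoint;
\q
));

# The checkpointer now waits at the injection point; slots can be advanced
# and inspected while old-WAL removal is blocked.
$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');

# ... race whatever needs racing here ...

# Let the checkpoint finish.
$node->safe_psql('postgres',
	q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});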
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d56d0c4ef31..34e973393c22 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "fmgr.h"
 #include "miscadmin.h"
@@ -41,6 +42,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
 
@@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
     {
         bool        updated_xmin = false;
         bool        updated_restart = false;
+        XLogRecPtr  restart_lsn pg_attribute_unused();
 
         SpinLockAcquire(&MyReplicationSlot->mutex);
 
+        /* remember the old restart lsn */
+        restart_lsn = MyReplicationSlot->data.restart_lsn;
+
         /*
          * Prevent moving the confirmed_flush backwards, as this could lead to
          * data duplication issues caused by replicating already replicated
@@ -1878,9 +1884,29 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
         SpinLockRelease(&MyReplicationSlot->mutex);
 
-        /* first write new xmin to disk, so we know what's up after a crash */
+        /*
+         * First, write the new xmin and restart_lsn to disk so we know what's
+         * up after a crash.  Even so, the checkpointer can see the updated
+         * restart_lsn in shared memory before we write it to disk, and a
+         * crash can happen in between.  Thus, the checkpointer still needs to
+         * make a special effort to keep the WAL segments required by the
+         * restart_lsn that has been written to disk.  See CreateCheckPoint()
+         * and CreateRestartPoint() for details.
+         */
         if (updated_xmin || updated_restart)
         {
+#ifdef USE_INJECTION_POINTS
+            XLogSegNo   seg1,
+                        seg2;
+
+            XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+            XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+            /* trigger injection point, but only if the segment changes */
+            if (seg1 != seg2)
+                INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
+#endif
+
             ReplicationSlotMarkDirty();
             ReplicationSlotSave();
             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
@@ -1899,7 +1925,6 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
             SpinLockRelease(&MyReplicationSlot->mutex);
 
             ReplicationSlotsComputeRequiredXmin(false);
-            ReplicationSlotsComputeRequiredLSN();
         }
     }
     else
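The logical-replication-slot-advance-segment injection point above fires only when the restart_lsn that is about to be saved has crossed into a new WAL segment, i.e. exactly when the in-memory value is ahead of the on-disk value by at least one segment. A short sketch of catching that moment from a test (again an illustration under the same assumptions, plus an existing logical slot named slot_logical as in the 046 test below):

# Sketch: stop the backend at the moment the slot's restart_lsn moves into a
# new segment, after shared memory is updated but before the slot is saved.
$node->safe_psql('postgres',
	q{select injection_points_attach('logical-replication-slot-advance-segment', 'wait')});

my $consumer = $node->background_psql('postgres');
$consumer->query_until(
	qr/consuming/, q(\echo consuming
select count(*) from pg_logical_slot_get_changes('slot_logical', null, null) \watch 1
\q
));

# The consuming backend now sleeps with the advanced restart_lsn visible in
# shared memory but not yet flushed to the slot's state file.
$node->wait_for_event('client backend', 'logical-replication-slot-advance-segment');

# ... exercise the concurrent behaviour under test, then release the backend ...
$node->safe_psql('postgres',
	q{select injection_points_wakeup('logical-replication-slot-advance-segment')});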
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e0ae0a..30662c09275b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -335,7 +335,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
         SpinLockRelease(&slot->mutex);
 
         ReplicationSlotsComputeRequiredXmin(false);
-        ReplicationSlotsComputeRequiredLSN();
     }
 
     return updated_config || updated_xmin_or_lsn;
@@ -502,9 +501,6 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
         slot->data.restart_lsn = restart_lsn;
         SpinLockRelease(&slot->mutex);
 
-        /* Prevent WAL removal as fast as possible */
-        ReplicationSlotsComputeRequiredLSN();
-
         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
 
         /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 600b87fa9cb6..dd18fe10f7d2 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1008,7 +1008,6 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
      * limits.
      */
     ReplicationSlotsComputeRequiredXmin(false);
-    ReplicationSlotsComputeRequiredLSN();
 
     /*
      * If removing the directory fails, the worst thing that will happen is
@@ -1494,9 +1493,6 @@ ReplicationSlotReserveWal(void)
         slot->data.restart_lsn = restart_lsn;
         SpinLockRelease(&slot->mutex);
 
-        /* prevent WAL removal as fast as possible */
-        ReplicationSlotsComputeRequiredLSN();
-
         /*
          * If all required WAL is still there, great, otherwise retry. The
          * slot should prevent further removal of WAL, unless there's a
@@ -2014,7 +2010,6 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
     if (invalidated)
     {
         ReplicationSlotsComputeRequiredXmin(false);
-        ReplicationSlotsComputeRequiredLSN();
     }
 
     return invalidated;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 36cc2ed4e440..3300fb9b1c98 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -583,7 +583,6 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
      * advancing potentially done.
      */
     ReplicationSlotsComputeRequiredXmin(false);
-    ReplicationSlotsComputeRequiredLSN();
 
     ReplicationSlotRelease();
 
@@ -819,7 +818,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
     ReplicationSlotMarkDirty();
 
     ReplicationSlotsComputeRequiredXmin(false);
-    ReplicationSlotsComputeRequiredLSN();
     ReplicationSlotSave();
 
 #ifdef USE_ASSERT_CHECKING
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103d..dff749f00a8f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2384,7 +2384,6 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
     if (changed)
     {
         ReplicationSlotMarkDirty();
-        ReplicationSlotsComputeRequiredLSN();
         PhysicalWakeupLogicalWalSnd();
     }
 
@@ -2393,6 +2392,10 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
      * be energy wasted - the worst thing lost information could cause here is
      * to give wrong information in a statistics view - we'll just potentially
      * be more conservative in removing files.
+     *
+     * The checkpointer makes a special effort to keep the WAL segments
+     * required by the restart_lsn that has been written to disk.  See
+     * CreateCheckPoint() and CreateRestartPoint() for details.
      */
 }
 
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index cb983766c679..92429d284025 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -54,6 +54,8 @@ tests += {
       't/043_no_contrecord_switch.pl',
       't/044_invalidate_inactive_slots.pl',
       't/045_archive_restartpoint.pl',
+      't/046_checkpoint_logical_slot.pl',
+      't/047_checkpoint_physical_slot.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl
new file mode 100644
index 000000000000..b4265c4a6a53
--- /dev/null
+++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# This test verifies the case when the logical slot is advanced during a
+# checkpoint. The test checks that the logical slot's restart_lsn still refers
+# to an existing WAL segment after an immediate restart.
+#
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+
+$node = PostgreSQL::Test::Cluster->new('mike');
+$node->init;
+$node->append_conf('postgresql.conf',
+	"shared_preload_libraries = 'injection_points'");
+$node->append_conf('postgresql.conf', "wal_level = 'logical'");
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+
+# Create a simple table to generate data into.
+$node->safe_psql('postgres',
+	q{create table t (id serial primary key, b text)});
+
+# Create the two slots we'll need.
+$node->safe_psql('postgres',
+	q{select pg_create_logical_replication_slot('slot_logical', 'test_decoding')}
+);
+$node->safe_psql('postgres',
+	q{select pg_create_physical_replication_slot('slot_physical', true)});
+
+# Advance both slots to the current position just to have everything "valid".
+$node->safe_psql('postgres',
+	q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null)}
+);
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run checkpoint to flush current state to disk and set a baseline.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Generate some transactions to get RUNNING_XACTS.
+my $xacts = $node->background_psql('postgres');
+$xacts->query_until(
+	qr/run_xacts/,
+	q(\echo run_xacts
+SELECT 1 \watch 0.1
+\q
+));
+
+# Insert 1M rows; that's tens of MB (multiple WAL segments) worth of WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+# Run another checkpoint to set a new redo LSN.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Another 1M rows; again multiple WAL segments worth of WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+# Run another checkpoint, this time in the background, and make it wait
+# on the injection point so that the checkpoint stops right before
+# removing old WAL segments.
+note('starting checkpoint');
+
+my $checkpoint = $node->background_psql('postgres');
+$checkpoint->query_safe(
+	q(select injection_points_attach('checkpoint-before-old-wal-removal','wait'))
+);
+$checkpoint->query_until(
+	qr/starting_checkpoint/,
+	q(\echo starting_checkpoint
+checkpoint;
+\q
+));
+
+# Wait until the checkpoint stops right before removing WAL segments.
+note('waiting for injection_point');
+$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');
+note('injection_point is reached');
+
+# Try to advance the logical slot, but make it stop when it moves to the next
+# WAL segment (this has to happen in the background, too).
+my $logical = $node->background_psql('postgres');
+$logical->query_safe(
+	q{select injection_points_attach('logical-replication-slot-advance-segment','wait');}
+);
+$logical->query_until(
+	qr/get_changes/,
+	q(
+\echo get_changes
+select count(*) from pg_logical_slot_get_changes('slot_logical', null, null) \watch 1
+\q
+));
+
+# Wait until the slot's restart_lsn points to the next WAL segment.
+note('waiting for injection_point');
+$node->wait_for_event('client backend',
+	'logical-replication-slot-advance-segment');
+note('injection_point is reached');
+
+# OK, we're in the right situation: time to advance the physical slot, which
+# recalculates the required LSN, and then unblock the checkpoint, which
+# removes the WAL still needed by the logical slot.
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Continue the checkpoint.
+$node->safe_psql('postgres',
+	q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});
+
+# Abruptly stop the server to simulate a crash right after the checkpoint
+# has been unblocked.
+$node->stop('immediate');
+
+$node->start;
+
+eval {
+	$node->safe_psql('postgres',
+		q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null);}
+	);
+};
+is($@, '', "Logical slot still valid");
+
+done_testing();
diff --git a/src/test/recovery/t/047_checkpoint_physical_slot.pl b/src/test/recovery/t/047_checkpoint_physical_slot.pl
new file mode 100644
index 000000000000..454e56b9bd2d
--- /dev/null
+++ b/src/test/recovery/t/047_checkpoint_physical_slot.pl
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# This test verifies the case when the physical slot is advanced during a
+# checkpoint. The test checks that the physical slot's restart_lsn still refers
+# to an existing WAL segment after an immediate restart.
+#
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+
+$node = PostgreSQL::Test::Cluster->new('mike');
+$node->init;
+$node->append_conf('postgresql.conf',
+	"shared_preload_libraries = 'injection_points'");
+$node->append_conf('postgresql.conf', "wal_level = 'replica'");
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+
+# Create a simple table to generate data into.
+$node->safe_psql('postgres',
+	q{create table t (id serial primary key, b text)});
+
+# Create a physical replication slot.
+$node->safe_psql('postgres',
+	q{select pg_create_physical_replication_slot('slot_physical', true)});
+
+# Advance slot to the current position, just to have everything "valid".
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run checkpoint to flush current state to disk and set a baseline.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Insert 100k rows to generate some WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,100000) s(i)}
+);
+
+# Advance slot to the current position, just to have everything "valid".
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run another checkpoint to set a new redo LSN.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Another 1M rows; that's multiple WAL segments worth of WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+my $restart_lsn_init = $node->safe_psql('postgres',
+	q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn_init);
+note("restart lsn before checkpoint: $restart_lsn_init");
+
+# Run another checkpoint, this time in the background, and make it wait
+# on the injection point so that the checkpoint stops right before
+# removing old WAL segments.
+note('starting checkpoint');
+
+my $checkpoint = $node->background_psql('postgres');
+$checkpoint->query_safe(
+	q{select injection_points_attach('checkpoint-before-old-wal-removal','wait')}
+);
+$checkpoint->query_until(
+	qr/starting_checkpoint/,
+	q(\echo starting_checkpoint
+checkpoint;
+\q
+));
+
+# Wait until the checkpoint stops right before removing WAL segments.
+note('waiting for injection_point');
+$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');
+note('injection_point is reached');
+
+# OK, we're in the right situation: time to advance the physical slot, which
+# recalculates the required LSN, and then unblock the checkpoint, which
+# removes the WAL still needed by the physical slot.
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Continue the checkpoint.
+$node->safe_psql('postgres',
+	q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});
+
+my $restart_lsn_old = $node->safe_psql('postgres',
+	q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn_old);
+note("restart lsn before stop: $restart_lsn_old");
+
+# Abruptly stop the server to simulate a crash right after the checkpoint
+# has been unblocked.
+$node->stop('immediate');
+
+$node->start;
+
+# Get the restart_lsn of the slot right after restarting.
+my $restart_lsn = $node->safe_psql('postgres',
+	q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn);
+note("restart lsn: $restart_lsn");
+
+# Get the WAL segment name for the slot's restart_lsn.
+my $restart_lsn_segment = $node->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn'::pg_lsn)");
+chomp($restart_lsn_segment);
+
+# Check that the required WAL segment still exists.
+note("required by slot segment name: $restart_lsn_segment");
+my $datadir = $node->data_dir;
+ok( -f "$datadir/pg_wal/$restart_lsn_segment",
+	"WAL segment $restart_lsn_segment for physical slot's restart_lsn $restart_lsn exists"
+);
+
+done_testing();
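Both new tests assert slot health indirectly: 046 by consuming changes after the restart and 047 by checking that the file for the restart_lsn's segment still exists. A complementary assertion (a suggestion only, not part of the patch) could query pg_replication_slots.wal_status, which reports 'lost' once WAL required by a slot has been removed:

# Suggested extra check after $node->start: the surviving slot should not
# report its required WAL as lost.
my $wal_status = $node->safe_psql('postgres',
	q{select wal_status from pg_replication_slots where slot_name = 'slot_physical'});
chomp($wal_status);
isnt($wal_status, 'lost', 'slot still has its required WAL');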