Diffstat (limited to 'src')
-rw-r--r--  src/backend/access/nbtree/nbtsearch.c               |  11
-rw-r--r--  src/backend/access/transam/xlog.c                   |   4
-rw-r--r--  src/backend/executor/execGrouping.c                 |   4
-rw-r--r--  src/backend/executor/nodeTidrangescan.c             |   6
-rw-r--r--  src/backend/replication/logical/logical.c           |  18
-rw-r--r--  src/backend/replication/slot.c                      |  57
-rw-r--r--  src/backend/storage/aio/method_io_uring.c           |   6
-rw-r--r--  src/bin/pg_dump/pg_restore.c                        |   6
-rw-r--r--  src/bin/psql/command.c                              |   5
-rw-r--r--  src/bin/psql/common.c                               |  30
-rw-r--r--  src/bin/psql/help.c                                 |   5
-rw-r--r--  src/bin/psql/t/001_basic.pl                         |  46
-rw-r--r--  src/include/replication/slot.h                      |   8
-rw-r--r--  src/test/recovery/meson.build                       |   2
-rw-r--r--  src/test/recovery/t/046_checkpoint_logical_slot.pl  | 139
-rw-r--r--  src/test/recovery/t/047_checkpoint_physical_slot.pl | 133
-rw-r--r--  src/test/regress/expected/psql_pipeline.out         | 188
-rw-r--r--  src/test/regress/sql/psql_pipeline.sql              | 100
18 files changed, 426 insertions(+), 342 deletions(-)
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 070f14c8b91..36544ecfd58 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -2282,9 +2282,12 @@ _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum, ScanDirection dir)
* previously-saved right link or left link. lastcurrblkno is the page that
* was current at the point where the blkno link was saved, which we use to
* reason about concurrent page splits/page deletions during backwards scans.
+ * In the common case where seized=false, blkno is either so->currPos.nextPage
+ * or so->currPos.prevPage, and lastcurrblkno is so->currPos.currPage.
*
- * On entry, caller shouldn't hold any locks or pins on any page (we work
- * directly off of blkno and lastcurrblkno instead). Parallel scan callers
+ * On entry, so->currPos shouldn't be locked by caller. so->currPos.buf must
+ * be InvalidBuffer/unpinned as needed by caller (note that lastcurrblkno
+ * won't need to be read again in almost all cases). Parallel scan callers
* that seized the scan before calling here should pass seized=true; such a
* caller's blkno and lastcurrblkno arguments come from the seized scan.
* seized=false callers just pass us the blkno/lastcurrblkno taken from their
@@ -2301,8 +2304,8 @@ _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum, ScanDirection dir)
* success exit (except during so->dropPin index scans, when we drop the pin
* eagerly to avoid blocking VACUUM).
*
- * If there are no more matching records in the given direction, we drop all
- * locks and pins, invalidate so->currPos, and return false.
+ * If there are no more matching records in the given direction, we invalidate
+ * so->currPos (while ensuring it retains no locks or pins), and return false.
*
* We always release the scan for a parallel scan caller, regardless of
* success or failure; we'll call _bt_parallel_release as soon as possible.
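
A minimal sketch of the seized=false convention described above (illustrative only; the names come from the comment itself, and _bt_readnextpage's signature is assumed from the surrounding code):

    /* Sketch: a seized=false caller passes the links saved in so->currPos.
     * so->currPos must already be unlocked, and its buffer unpinned as
     * needed, before the call. */
    BlockNumber blkno = ScanDirectionIsForward(dir) ?
        so->currPos.nextPage : so->currPos.prevPage;
    BlockNumber lastcurrblkno = so->currPos.currPage;

    if (!_bt_readnextpage(scan, blkno, lastcurrblkno, dir, false))
        return false;       /* no more matches; so->currPos invalidated */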
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1914859b2ee..47ffc0a2307 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7498,6 +7498,10 @@ CreateCheckPoint(int flags)
if (PriorRedoPtr != InvalidXLogRecPtr)
UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
+#ifdef USE_INJECTION_POINTS
+ INJECTION_POINT("checkpoint-before-old-wal-removal", NULL);
+#endif
+
/*
* Delete old log files, those no longer needed for last checkpoint to
* prevent the disk holding the xlog from growing full.
diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c
index 255bd795361..b5400749353 100644
--- a/src/backend/executor/execGrouping.c
+++ b/src/backend/executor/execGrouping.c
@@ -144,7 +144,7 @@ execTuplesHashPrepare(int numCols,
* hashfunctions: FmgrInfos of datatype-specific hashing functions to use
* collations: collations to use in comparisons
* nbuckets: initial estimate of hashtable size
- * additionalsize: size of data stored in ->additional
+ * additionalsize: size of data that may be stored along with the hash entry
* metacxt: memory context for long-lived allocation, but not per-entry data
* tablecxt: memory context in which to store table entries
* tempcxt: short-lived context for evaluation hash and comparison functions
@@ -288,7 +288,7 @@ ResetTupleHashTable(TupleHashTable hashtable)
*
* If isnew isn't NULL, then a new entry is created if no existing entry
* matches. On return, *isnew is true if the entry is newly created,
- * false if it existed already. ->additional_data in the new entry has
+ * false if it existed already. The additional data in the new entry has
* been zeroed.
*/
TupleHashEntry
diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c
index ab2eab9596e..26f7420b64b 100644
--- a/src/backend/executor/nodeTidrangescan.c
+++ b/src/backend/executor/nodeTidrangescan.c
@@ -128,9 +128,11 @@ TidExprListCreate(TidRangeScanState *tidrangestate)
* TidRangeEval
*
* Compute and set node's block and offset range to scan by evaluating
- * the trss_tidexprs. Returns false if we detect the range cannot
+ * node->trss_tidexprs. Returns false if we detect the range cannot
* contain any tuples. Returns true if it's possible for the range to
- * contain tuples.
+ * contain tuples. We don't bother validating that trss_mintid is less
+ * than or equal to trss_maxtid, as the scan_set_tidrange() table AM
+ * function will handle that.
* ----------------------------------------------------------------
*/
static bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d56d0c4ef3..f1eb798f3e9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/memutils.h"
@@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
bool updated_xmin = false;
bool updated_restart = false;
+ XLogRecPtr restart_lsn pg_attribute_unused();
SpinLockAcquire(&MyReplicationSlot->mutex);
+ /* remember the old restart lsn */
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+
/*
* Prevent moving the confirmed_flush backwards, as this could lead to
* data duplication issues caused by replicating already replicated
@@ -1881,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
/* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart)
{
+#ifdef USE_INJECTION_POINTS
+ XLogSegNo seg1,
+ seg2;
+
+ XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+ XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+ /* trigger injection point, but only if segment changes */
+ if (seg1 != seg2)
+ INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
+#endif
+
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
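
The segment-change test above relies on XLByteToSeg(), which maps an LSN to a WAL segment number by integer division. A standalone sketch of the arithmetic (wal_segment_size is the segment size in bytes, 16MB by default):

    #include <stdint.h>

    typedef uint64_t XLogRecPtr;
    typedef uint64_t XLogSegNo;

    /* Equivalent to XLByteToSeg(lsn, seg, wal_segment_size):
     * segment number = LSN / segment size. */
    static inline XLogSegNo
    lsn_to_seg(XLogRecPtr lsn, uint64_t wal_segment_size)
    {
        return lsn / wal_segment_size;
    }

    /* With 16MB segments, LSN 0/1000000 is in segment 1 and 0/2000000 is
     * in segment 2, so advancing across that boundary fires the point. */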
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 600b87fa9cb..c64f020742f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+ slot->last_saved_restart_lsn = InvalidXLogRecPtr;
slot->inactive_since = 0;
/*
@@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
+ XLogRecPtr last_saved_restart_lsn;
bool invalidated;
+ ReplicationSlotPersistency persistency;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
+ persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
+ last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
+ /*
+ * For a persistent slot, use last_saved_restart_lsn to compute the
+ * oldest LSN for removal of WAL segments. The segments between
+ * last_saved_restart_lsn and restart_lsn might be needed by a
+ * persistent slot in the case of a database crash. Non-persistent
+ * slots can't survive a crash, so we don't care about
+ * last_saved_restart_lsn for them.
+ */
+ if (persistency == RS_PERSISTENT)
+ {
+ if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ restart_lsn > last_saved_restart_lsn)
+ {
+ restart_lsn = last_saved_restart_lsn;
+ }
+ }
+
if (restart_lsn != InvalidXLogRecPtr &&
(min_required == InvalidXLogRecPtr ||
restart_lsn < min_required))
@@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
{
ReplicationSlot *s;
XLogRecPtr restart_lsn;
+ XLogRecPtr last_saved_restart_lsn;
bool invalidated;
+ ReplicationSlotPersistency persistency;
s = &ReplicationSlotCtl->replication_slots[i];
@@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
/* read once, it's ok if it increases while we're checking */
SpinLockAcquire(&s->mutex);
+ persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
+ last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
+ /*
+ * For a persistent slot, use last_saved_restart_lsn to compute the
+ * oldest LSN for removal of WAL segments. The segments between
+ * last_saved_restart_lsn and restart_lsn might be needed by a
+ * persistent slot in the case of a database crash. Non-persistent
+ * slots can't survive a crash, so we don't care about
+ * last_saved_restart_lsn for them.
+ */
+ if (persistency == RS_PERSISTENT)
+ {
+ if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ restart_lsn > last_saved_restart_lsn)
+ {
+ restart_lsn = last_saved_restart_lsn;
+ }
+ }
+
if (restart_lsn == InvalidXLogRecPtr)
continue;
@@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void)
Assert(slot != NULL);
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+ Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
/*
* The replication slot mechanism is used to prevent removal of required
@@ -1766,6 +1810,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
*/
SpinLockAcquire(&s->mutex);
+ Assert(s->data.restart_lsn >= s->last_saved_restart_lsn);
+
restart_lsn = s->data.restart_lsn;
/* we do nothing if the slot is already invalid */
@@ -1835,7 +1881,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
* just rely on .invalidated.
*/
if (invalidation_cause == RS_INVAL_WAL_REMOVED)
+ {
s->data.restart_lsn = InvalidXLogRecPtr;
+ s->last_saved_restart_lsn = InvalidXLogRecPtr;
+ }
/* Let caller know */
*invalidated = true;
@@ -2079,6 +2128,12 @@ CheckPointReplicationSlots(bool is_shutdown)
SaveSlotToPath(s, path, LOG);
}
LWLockRelease(ReplicationSlotAllocationLock);
+
+ /*
+ * Recompute the required LSN as SaveSlotToPath() updated
+ * last_saved_restart_lsn for slots.
+ */
+ ReplicationSlotsComputeRequiredLSN();
}
/*
@@ -2354,6 +2409,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
if (!slot->just_dirtied)
slot->dirty = false;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+ slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
SpinLockRelease(&slot->mutex);
LWLockRelease(&slot->io_in_progress_lock);
@@ -2569,6 +2625,7 @@ RestoreSlotFromDisk(const char *name)
slot->effective_xmin = cp.slotdata.xmin;
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+ slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
slot->candidate_catalog_xmin = InvalidTransactionId;
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
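
Both ReplicationSlotsComputeRequiredLSN() and ReplicationSlotsComputeLogicalRestartLSN() now apply the same rule. A condensed sketch of the LSN a slot effectively contributes (distilled from the hunks above; no such helper exists in the tree):

    /* Persistent slots are held back to the restart_lsn last flushed to
     * disk, since the in-memory value would not survive a crash. */
    static XLogRecPtr
    slot_effective_restart_lsn(ReplicationSlot *s)
    {
        XLogRecPtr restart_lsn = s->data.restart_lsn;

        if (s->data.persistency == RS_PERSISTENT &&
            s->last_saved_restart_lsn != InvalidXLogRecPtr &&
            restart_lsn > s->last_saved_restart_lsn)
            restart_lsn = s->last_saved_restart_lsn;

        return restart_lsn;
    }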
diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c
index cc312b641ca..b78048328e1 100644
--- a/src/backend/storage/aio/method_io_uring.c
+++ b/src/backend/storage/aio/method_io_uring.c
@@ -400,9 +400,9 @@ pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
while (true)
{
pgaio_debug_io(DEBUG3, ioh,
- "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
- (long long unsigned) ioh->generation,
- (long long unsigned) ref_generation,
+ "wait_one io_gen: %" PRIu64 ", ref_gen: %" PRIu64 ", cycle %d",
+ ioh->generation,
+ ref_generation,
waited);
if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
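
The two forms print the same digits; the PRIu64 macro from <inttypes.h> just avoids assuming that uint64 is unsigned long long. A self-contained example:

    #include <inttypes.h>
    #include <stdio.h>

    int
    main(void)
    {
        uint64_t generation = 42;

        /* old style: printf("gen: %llu\n", (long long unsigned) generation); */
        printf("gen: %" PRIu64 "\n", generation);
        return 0;
    }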
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index f2182e91825..c4b6214d618 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -712,9 +712,9 @@ usage(const char *progname)
printf(_(" --use-set-session-authorization\n"
" use SET SESSION AUTHORIZATION commands instead of\n"
" ALTER OWNER commands to set ownership\n"));
- printf(_(" --with-data dump the data\n"));
- printf(_(" --with-schema dump the schema\n"));
- printf(_(" --with-statistics dump the statistics\n"));
+ printf(_(" --with-data restore the data\n"));
+ printf(_(" --with-schema restore the schema\n"));
+ printf(_(" --with-statistics restore the statistics\n"));
printf(_("\nConnection options:\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index 81a5ba844ba..1f7635d0c23 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -778,6 +778,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch)
int ssl_in_use,
password_used,
gssapi_used;
+ int version_num;
char *paramval;
if (!active_branch)
@@ -793,7 +794,9 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch)
/* Get values for the parameters */
host = PQhost(pset.db);
hostaddr = PQhostaddr(pset.db);
- protocol_version = psprintf("%d", PQprotocolVersion(pset.db));
+ version_num = PQfullProtocolVersion(pset.db);
+ protocol_version = psprintf("%d.%d", version_num / 10000,
+ version_num % 10000);
ssl_in_use = PQsslInUse(pset.db);
password_used = PQconnectionUsedPassword(pset.db);
gssapi_used = PQconnectionUsedGSSAPI(pset.db);
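
PQfullProtocolVersion() encodes the protocol version as major * 10000 + minor, e.g. 30002 for protocol 3.2, so the division and modulo above render "3.2". A standalone sketch of the arithmetic:

    #include <stdio.h>

    int
    main(void)
    {
        int version_num = 30002;    /* as returned by PQfullProtocolVersion() */

        /* 30002 / 10000 = 3, 30002 % 10000 = 2 -> "3.2" */
        printf("%d.%d\n", version_num / 10000, version_num % 10000);
        return 0;
    }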
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 47352b7faed..b53cd8ab698 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -1867,18 +1867,30 @@ ExecQueryAndProcessResults(const char *query,
{
FILE *copy_stream = NULL;
- if (pset.piped_syncs > 1)
+ if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
/*
- * When reading COPY data, the backend ignores sync messages
- * and will not send a matching ReadyForQuery response. Even
- * if we adjust piped_syncs and requested_results, it is not
- * possible to salvage this as the sync message would still be
- * in libpq's command queue and we would be stuck in a busy
- * pipeline state. Thus, we abort the connection to avoid
- * this state.
+ * Running COPY within a pipeline can break the protocol
+ * synchronisation in multiple ways, and psql shows its limits
+ * when it comes to tracking this information.
+ *
+ * While in COPY mode, the backend process ignores additional
+ * Sync messages and will not send the matching ReadyForQuery
+ * expected by the frontend.
+ *
+ * Additionally, libpq automatically sends a Sync with the
+ * Copy message, creating an unexpected synchronisation point.
+ * A failure during COPY would leave the pipeline in an
+ * aborted state while the backend would be in a clean state,
+ * ready to process commands.
+ *
+ * Improving those issues would require modifications in how
+ * libpq handles pipelines and COPY. Hence, for the time
+ * being, we forbid the use of COPY within a pipeline,
+ * aborting the connection to avoid an inconsistent state on
+ * psql side if trying to use a COPY command.
*/
- pg_log_info("\\syncpipeline after COPY is not supported, aborting connection");
+ pg_log_info("COPY in a pipeline is not supported, aborting connection");
exit(EXIT_BADCONN);
}
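
For illustration, a minimal libpq sequence that hits the first problem described in the comment (sketch only; conn is an established connection and error handling is omitted):

    /* The backend switches to COPY-in mode and ignores the Sync, so the
     * ReadyForQuery expected for PQpipelineSync() does not arrive until
     * the COPY ends, leaving the result queue out of step. */
    PQenterPipelineMode(conn);
    PQsendQueryParams(conn, "COPY psql_pipeline FROM STDIN",
                      0, NULL, NULL, NULL, NULL, 0);
    PQpipelineSync(conn);

    PGresult *res = PQgetResult(conn); /* PGRES_COPY_IN, not a sync result */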
diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c
index 403b51325a7..ce05b3a5132 100644
--- a/src/bin/psql/help.c
+++ b/src/bin/psql/help.c
@@ -463,8 +463,9 @@ helpVariables(unsigned short int pager)
" VERSION_NAME\n"
" VERSION_NUM\n"
" psql's version (in verbose string, short string, or numeric format)\n");
- HELP0(" WATCH_INTERVAL\n"
- " if set to a number, overrides the default two second \\watch interval\n");
+ HELPN(" WATCH_INTERVAL\n"
+ " number of seconds \\watch waits between executions (default %s)\n",
+ DEFAULT_WATCH_INTERVAL);
HELP0("\nDisplay settings:\n");
HELP0("Usage:\n");
diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl
index ae5c1d66405..f42c3961e09 100644
--- a/src/bin/psql/t/001_basic.pl
+++ b/src/bin/psql/t/001_basic.pl
@@ -483,8 +483,8 @@ psql_like($node, "copy (values ('foo'),('bar')) to stdout \\g | $pipe_cmd",
my $c4 = slurp_file($g_file);
like($c4, qr/foo.*bar/s);
-# Tests with pipelines. These trigger FATAL failures in the backend,
-# so they cannot be tested via SQL.
+# Test COPY within pipelines. These abort the connection from
+# the frontend so they cannot be tested via SQL.
$node->safe_psql('postgres', 'CREATE TABLE psql_pipeline()');
my $log_location = -s $node->logfile;
psql_fails_like(
@@ -493,53 +493,41 @@ psql_fails_like(
COPY psql_pipeline FROM STDIN;
SELECT 'val1';
\\syncpipeline
-\\getresults
\\endpipeline},
- qr/server closed the connection unexpectedly/,
- 'protocol sync loss in pipeline: direct COPY, SELECT, sync and getresult'
-);
+ qr/COPY in a pipeline is not supported, aborting connection/,
+ 'COPY FROM in pipeline: fails');
$node->wait_for_log(
qr/FATAL: .*terminating connection because protocol synchronization was lost/,
$log_location);
+# Remove \syncpipeline here.
psql_fails_like(
$node,
qq{\\startpipeline
-COPY psql_pipeline FROM STDIN \\bind \\sendpipeline
-SELECT 'val1' \\bind \\sendpipeline
-\\syncpipeline
-\\getresults
-\\endpipeline},
- qr/server closed the connection unexpectedly/,
- 'protocol sync loss in pipeline: bind COPY, SELECT, sync and getresult');
-
-# This time, test without the \getresults and \syncpipeline.
-psql_fails_like(
- $node,
- qq{\\startpipeline
-COPY psql_pipeline FROM STDIN;
+COPY psql_pipeline TO STDOUT;
SELECT 'val1';
\\endpipeline},
- qr/server closed the connection unexpectedly/,
- 'protocol sync loss in pipeline: COPY, SELECT and sync');
+ qr/COPY in a pipeline is not supported, aborting connection/,
+ 'COPY TO in pipeline: fails');
-# Tests sending a sync after a COPY TO/FROM. These abort the connection
-# from the frontend.
psql_fails_like(
$node,
qq{\\startpipeline
-COPY psql_pipeline FROM STDIN;
+\\copy psql_pipeline from stdin;
+SELECT 'val1';
\\syncpipeline
\\endpipeline},
- qr/\\syncpipeline after COPY is not supported, aborting connection/,
- 'sending sync after COPY FROM');
+ qr/COPY in a pipeline is not supported, aborting connection/,
+ '\copy from in pipeline: fails');
+
+# Sync attempt after a COPY TO/FROM.
psql_fails_like(
$node,
qq{\\startpipeline
-COPY psql_pipeline TO STDOUT;
+\\copy psql_pipeline to stdout;
\\syncpipeline
\\endpipeline},
- qr/\\syncpipeline after COPY is not supported, aborting connection/,
- 'sending sync after COPY TO');
+ qr/COPY in a pipeline is not supported, aborting connection/,
+ '\copy to in pipeline: fails');
done_testing();
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index eb0b93b1114..ffacba9d2ae 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -215,6 +215,14 @@ typedef struct ReplicationSlot
* recently stopped.
*/
TimestampTz inactive_since;
+
+ /*
+ * Latest restart_lsn that has been flushed to disk. For persistent slots
+ * the flushed LSN should be taken into account when calculating the
+ * oldest LSN for WAL segments removal.
+ */
+ XLogRecPtr last_saved_restart_lsn;
+
} ReplicationSlot;
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index cb983766c67..92429d28402 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -54,6 +54,8 @@ tests += {
't/043_no_contrecord_switch.pl',
't/044_invalidate_inactive_slots.pl',
't/045_archive_restartpoint.pl',
+ 't/046_checkpoint_logical_slot.pl',
+ 't/047_checkpoint_physical_slot.pl'
],
},
}
diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl
new file mode 100644
index 00000000000..b4265c4a6a5
--- /dev/null
+++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# This test verifies the case when the logical slot is advanced during
+# checkpoint. The test checks that the logical slot's restart_lsn still refers
+# to an existing WAL segment after an immediate restart.
+#
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+ plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+
+$node = PostgreSQL::Test::Cluster->new('mike');
+$node->init;
+$node->append_conf('postgresql.conf',
+ "shared_preload_libraries = 'injection_points'");
+$node->append_conf('postgresql.conf', "wal_level = 'logical'");
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+
+# Create a simple table to generate data into.
+$node->safe_psql('postgres',
+ q{create table t (id serial primary key, b text)});
+
+# Create the two slots we'll need.
+$node->safe_psql('postgres',
+ q{select pg_create_logical_replication_slot('slot_logical', 'test_decoding')}
+);
+$node->safe_psql('postgres',
+ q{select pg_create_physical_replication_slot('slot_physical', true)});
+
+# Advance both slots to the current position just to have everything "valid".
+$node->safe_psql('postgres',
+ q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null)}
+);
+$node->safe_psql('postgres',
+ q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run checkpoint to flush current state to disk and set a baseline.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Generate some transactions to get RUNNING_XACTS.
+my $xacts = $node->background_psql('postgres');
+$xacts->query_until(
+ qr/run_xacts/,
+ q(\echo run_xacts
+SELECT 1 \watch 0.1
+\q
+));
+
+# Insert 1M rows; that's about 130MB (~8 segments) worth of WAL.
+$node->safe_psql('postgres',
+ q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+# Run another checkpoint to set a new restore LSN.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Another 1M rows; that's about 130MB (~8 segments) worth of WAL.
+$node->safe_psql('postgres',
+ q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+# Run another checkpoint, this time in the background, and make it wait
+# on the injection point so that the checkpoint stops right before
+# removing old WAL segments.
+note('starting checkpoint');
+
+my $checkpoint = $node->background_psql('postgres');
+$checkpoint->query_safe(
+ q(select injection_points_attach('checkpoint-before-old-wal-removal','wait'))
+);
+$checkpoint->query_until(
+ qr/starting_checkpoint/,
+ q(\echo starting_checkpoint
+checkpoint;
+\q
+));
+
+# Wait until the checkpoint stops right before removing WAL segments.
+note('waiting for injection_point');
+$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');
+note('injection_point is reached');
+
+# Try to advance the logical slot, but make it stop when it moves to the next
+# WAL segment (this has to happen in the background, too).
+my $logical = $node->background_psql('postgres');
+$logical->query_safe(
+ q{select injection_points_attach('logical-replication-slot-advance-segment','wait');}
+);
+$logical->query_until(
+ qr/get_changes/,
+ q(
+\echo get_changes
+select count(*) from pg_logical_slot_get_changes('slot_logical', null, null) \watch 1
+\q
+));
+
+# Wait until the slot's restart_lsn points to the next WAL segment.
+note('waiting for injection_point');
+$node->wait_for_event('client backend',
+ 'logical-replication-slot-advance-segment');
+note('injection_point is reached');
+
+# OK, we're in the right situation: time to advance the physical slot, which
+# recalculates the required LSN, and then unblock the checkpoint, which
+# removes the WAL still needed by the logical slot.
+$node->safe_psql('postgres',
+ q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Continue the checkpoint.
+$node->safe_psql('postgres',
+ q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});
+
+# Abruptly stop the server (1 second should be enough for the checkpoint
+# to finish; explicitly waiting for it to complete would be better).
+$node->stop('immediate');
+
+$node->start;
+
+eval {
+ $node->safe_psql('postgres',
+ q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null);}
+ );
+};
+is($@, '', "Logical slot still valid");
+
+done_testing();
diff --git a/src/test/recovery/t/047_checkpoint_physical_slot.pl b/src/test/recovery/t/047_checkpoint_physical_slot.pl
new file mode 100644
index 00000000000..454e56b9bd2
--- /dev/null
+++ b/src/test/recovery/t/047_checkpoint_physical_slot.pl
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# This test verifies the case when the physical slot is advanced during
+# checkpoint. The test checks that the physical slot's restart_lsn still refers
+# to an existing WAL segment after an immediate restart.
+#
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+ plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+
+$node = PostgreSQL::Test::Cluster->new('mike');
+$node->init;
+$node->append_conf('postgresql.conf',
+ "shared_preload_libraries = 'injection_points'");
+$node->append_conf('postgresql.conf', "wal_level = 'replica'");
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+
+# Create a simple table to generate data into.
+$node->safe_psql('postgres',
+ q{create table t (id serial primary key, b text)});
+
+# Create a physical replication slot.
+$node->safe_psql('postgres',
+ q{select pg_create_physical_replication_slot('slot_physical', true)});
+
+# Advance slot to the current position, just to have everything "valid".
+$node->safe_psql('postgres',
+ q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run checkpoint to flush current state to disk and set a baseline.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Insert 100k rows; that's about 13MB (~1 segment) worth of WAL.
+$node->safe_psql('postgres',
+ q{insert into t (b) select md5(i::text) from generate_series(1,100000) s(i)}
+);
+
+# Advance slot to the current position, just to have everything "valid".
+$node->safe_psql('postgres',
+ q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run another checkpoint to set a new restore LSN.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Another 1M rows; that's about 130MB (~8 segments) worth of WAL.
+$node->safe_psql('postgres',
+ q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+my $restart_lsn_init = $node->safe_psql('postgres',
+ q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn_init);
+note("restart lsn before checkpoint: $restart_lsn_init");
+
+# Run another checkpoint, this time in the background, and make it wait
+# on the injection point so that the checkpoint stops right before
+# removing old WAL segments.
+note('starting checkpoint');
+
+my $checkpoint = $node->background_psql('postgres');
+$checkpoint->query_safe(
+ q{select injection_points_attach('checkpoint-before-old-wal-removal','wait')}
+);
+$checkpoint->query_until(
+ qr/starting_checkpoint/,
+ q(\echo starting_checkpoint
+checkpoint;
+\q
+));
+
+# Wait until the checkpoint stops right before removing WAL segments.
+note('waiting for injection_point');
+$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');
+note('injection_point is reached');
+
+# OK, we're in the right situation: time to advance the physical slot, which
+# recalculates the required LSN and then unblock the checkpoint, which
+# removes the WAL still needed by the physical slot.
+$node->safe_psql('postgres',
+ q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Continue the checkpoint.
+$node->safe_psql('postgres',
+ q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});
+
+my $restart_lsn_old = $node->safe_psql('postgres',
+ q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn_old);
+note("restart lsn before stop: $restart_lsn_old");
+
+# Abruptly stop the server (1 second should be enough for the checkpoint
+# to finish; explicitly waiting for it to complete would be better).
+$node->stop('immediate');
+
+$node->start;
+
+# Get the restart_lsn of the slot right after restarting.
+my $restart_lsn = $node->safe_psql('postgres',
+ q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn);
+note("restart lsn: $restart_lsn");
+
+# Get the WAL segment name for the slot's restart_lsn.
+my $restart_lsn_segment = $node->safe_psql('postgres',
+ "SELECT pg_walfile_name('$restart_lsn'::pg_lsn)");
+chomp($restart_lsn_segment);
+
+# Check that the required WAL segment exists.
+note("required by slot segment name: $restart_lsn_segment");
+my $datadir = $node->data_dir;
+ok( -f "$datadir/pg_wal/$restart_lsn_segment",
+ "WAL segment $restart_lsn_segment for physical slot's restart_lsn $restart_lsn exists"
+);
+
+done_testing();
diff --git a/src/test/regress/expected/psql_pipeline.out b/src/test/regress/expected/psql_pipeline.out
index a30dec088b9..e78e6bfa0ad 100644
--- a/src/test/regress/expected/psql_pipeline.out
+++ b/src/test/regress/expected/psql_pipeline.out
@@ -228,192 +228,6 @@ BEGIN \bind \sendpipeline
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \sendpipeline
COMMIT \bind \sendpipeline
\endpipeline
--- COPY FROM STDIN
--- with \sendpipeline and \bind
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-COPY psql_pipeline FROM STDIN \bind \sendpipeline
-\endpipeline
- ?column?
-----------
- val1
-(1 row)
-
--- with semicolon
-\startpipeline
-SELECT 'val1';
-COPY psql_pipeline FROM STDIN;
-\endpipeline
- ?column?
-----------
- val1
-(1 row)
-
--- COPY FROM STDIN with \flushrequest + \getresults
--- with \sendpipeline and \bind
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-COPY psql_pipeline FROM STDIN \bind \sendpipeline
-\flushrequest
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-message type 0x5a arrived from server while idle
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-COPY psql_pipeline FROM STDIN;
-\flushrequest
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-message type 0x5a arrived from server while idle
-\endpipeline
--- COPY FROM STDIN with \syncpipeline + \getresults
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-COPY psql_pipeline FROM STDIN \bind \sendpipeline
-\syncpipeline
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-COPY psql_pipeline FROM STDIN;
-\syncpipeline
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-\endpipeline
--- COPY TO STDOUT
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-copy psql_pipeline TO STDOUT \bind \sendpipeline
-\endpipeline
- ?column?
-----------
- val1
-(1 row)
-
-1 \N
-2 test2
-20 test2
-3 test3
-30 test3
-4 test4
-40 test4
--- with semicolon
-\startpipeline
-SELECT 'val1';
-copy psql_pipeline TO STDOUT;
-\endpipeline
- ?column?
-----------
- val1
-(1 row)
-
-1 \N
-2 test2
-20 test2
-3 test3
-30 test3
-4 test4
-40 test4
--- COPY TO STDOUT with \flushrequest + \getresults
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-copy psql_pipeline TO STDOUT \bind \sendpipeline
-\flushrequest
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-1 \N
-2 test2
-20 test2
-3 test3
-30 test3
-4 test4
-40 test4
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-copy psql_pipeline TO STDOUT;
-\flushrequest
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-1 \N
-2 test2
-20 test2
-3 test3
-30 test3
-4 test4
-40 test4
-\endpipeline
--- COPY TO STDOUT with \syncpipeline + \getresults
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-copy psql_pipeline TO STDOUT \bind \sendpipeline
-\syncpipeline
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-1 \N
-2 test2
-20 test2
-3 test3
-30 test3
-4 test4
-40 test4
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-copy psql_pipeline TO STDOUT;
-\syncpipeline
-\getresults
- ?column?
-----------
- val1
-(1 row)
-
-1 \N
-2 test2
-20 test2
-3 test3
-30 test3
-4 test4
-40 test4
-\endpipeline
-- Use \parse and \bind_named
\startpipeline
SELECT $1 \parse ''
@@ -740,7 +554,7 @@ SELECT COUNT(*) FROM psql_pipeline \bind \sendpipeline
count
-------
- 7
+ 1
(1 row)
-- After an error, pipeline is aborted and requires \syncpipeline to be
diff --git a/src/test/regress/sql/psql_pipeline.sql b/src/test/regress/sql/psql_pipeline.sql
index 16e1e1e84cd..5945eca1ef7 100644
--- a/src/test/regress/sql/psql_pipeline.sql
+++ b/src/test/regress/sql/psql_pipeline.sql
@@ -105,106 +105,6 @@ INSERT INTO psql_pipeline VALUES ($1) \bind 1 \sendpipeline
COMMIT \bind \sendpipeline
\endpipeline
--- COPY FROM STDIN
--- with \sendpipeline and \bind
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-COPY psql_pipeline FROM STDIN \bind \sendpipeline
-\endpipeline
-2 test2
-\.
--- with semicolon
-\startpipeline
-SELECT 'val1';
-COPY psql_pipeline FROM STDIN;
-\endpipeline
-20 test2
-\.
-
--- COPY FROM STDIN with \flushrequest + \getresults
--- with \sendpipeline and \bind
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-COPY psql_pipeline FROM STDIN \bind \sendpipeline
-\flushrequest
-\getresults
-3 test3
-\.
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-COPY psql_pipeline FROM STDIN;
-\flushrequest
-\getresults
-30 test3
-\.
-\endpipeline
-
--- COPY FROM STDIN with \syncpipeline + \getresults
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-COPY psql_pipeline FROM STDIN \bind \sendpipeline
-\syncpipeline
-\getresults
-4 test4
-\.
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-COPY psql_pipeline FROM STDIN;
-\syncpipeline
-\getresults
-40 test4
-\.
-\endpipeline
-
--- COPY TO STDOUT
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-copy psql_pipeline TO STDOUT \bind \sendpipeline
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-copy psql_pipeline TO STDOUT;
-\endpipeline
-
--- COPY TO STDOUT with \flushrequest + \getresults
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-copy psql_pipeline TO STDOUT \bind \sendpipeline
-\flushrequest
-\getresults
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-copy psql_pipeline TO STDOUT;
-\flushrequest
-\getresults
-\endpipeline
-
--- COPY TO STDOUT with \syncpipeline + \getresults
--- with \bind and \sendpipeline
-\startpipeline
-SELECT $1 \bind 'val1' \sendpipeline
-copy psql_pipeline TO STDOUT \bind \sendpipeline
-\syncpipeline
-\getresults
-\endpipeline
--- with semicolon
-\startpipeline
-SELECT 'val1';
-copy psql_pipeline TO STDOUT;
-\syncpipeline
-\getresults
-\endpipeline
-
-- Use \parse and \bind_named
\startpipeline
SELECT $1 \parse ''