diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/nbtree/nbtsearch.c | 11 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 4 | ||||
-rw-r--r-- | src/backend/executor/execGrouping.c | 4 | ||||
-rw-r--r-- | src/backend/executor/nodeTidrangescan.c | 6 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 18 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 57 | ||||
-rw-r--r-- | src/backend/storage/aio/method_io_uring.c | 6 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_restore.c | 6 | ||||
-rw-r--r-- | src/bin/psql/command.c | 5 | ||||
-rw-r--r-- | src/bin/psql/common.c | 30 | ||||
-rw-r--r-- | src/bin/psql/help.c | 5 | ||||
-rw-r--r-- | src/bin/psql/t/001_basic.pl | 46 | ||||
-rw-r--r-- | src/include/replication/slot.h | 8 | ||||
-rw-r--r-- | src/test/recovery/meson.build | 2 | ||||
-rw-r--r-- | src/test/recovery/t/046_checkpoint_logical_slot.pl | 139 | ||||
-rw-r--r-- | src/test/recovery/t/047_checkpoint_physical_slot.pl | 133 | ||||
-rw-r--r-- | src/test/regress/expected/psql_pipeline.out | 188 | ||||
-rw-r--r-- | src/test/regress/sql/psql_pipeline.sql | 100 |
18 files changed, 426 insertions, 342 deletions
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index 070f14c8b91..36544ecfd58 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -2282,9 +2282,12 @@ _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum, ScanDirection dir) * previously-saved right link or left link. lastcurrblkno is the page that * was current at the point where the blkno link was saved, which we use to * reason about concurrent page splits/page deletions during backwards scans. + * In the common case where seized=false, blkno is either so->currPos.nextPage + * or so->currPos.prevPage, and lastcurrblkno is so->currPos.currPage. * - * On entry, caller shouldn't hold any locks or pins on any page (we work - * directly off of blkno and lastcurrblkno instead). Parallel scan callers + * On entry, so->currPos shouldn't be locked by caller. so->currPos.buf must + * be InvalidBuffer/unpinned as needed by caller (note that lastcurrblkno + * won't need to be read again in almost all cases). Parallel scan callers * that seized the scan before calling here should pass seized=true; such a * caller's blkno and lastcurrblkno arguments come from the seized scan. * seized=false callers just pass us the blkno/lastcurrblkno taken from their @@ -2301,8 +2304,8 @@ _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum, ScanDirection dir) * success exit (except during so->dropPin index scans, when we drop the pin * eagerly to avoid blocking VACUUM). * - * If there are no more matching records in the given direction, we drop all - * locks and pins, invalidate so->currPos, and return false. + * If there are no more matching records in the given direction, we invalidate + * so->currPos (while ensuring it retains no locks or pins), and return false. * * We always release the scan for a parallel scan caller, regardless of * success or failure; we'll call _bt_parallel_release as soon as possible. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1914859b2ee..47ffc0a2307 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7498,6 +7498,10 @@ CreateCheckPoint(int flags) if (PriorRedoPtr != InvalidXLogRecPtr) UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr); +#ifdef USE_INJECTION_POINTS + INJECTION_POINT("checkpoint-before-old-wal-removal", NULL); +#endif + /* * Delete old log files, those no longer needed for last checkpoint to * prevent the disk holding the xlog from growing full. diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c index 255bd795361..b5400749353 100644 --- a/src/backend/executor/execGrouping.c +++ b/src/backend/executor/execGrouping.c @@ -144,7 +144,7 @@ execTuplesHashPrepare(int numCols, * hashfunctions: FmgrInfos of datatype-specific hashing functions to use * collations: collations to use in comparisons * nbuckets: initial estimate of hashtable size - * additionalsize: size of data stored in ->additional + * additionalsize: size of data that may be stored along with the hash entry * metacxt: memory context for long-lived allocation, but not per-entry data * tablecxt: memory context in which to store table entries * tempcxt: short-lived context for evaluation hash and comparison functions @@ -288,7 +288,7 @@ ResetTupleHashTable(TupleHashTable hashtable) * * If isnew isn't NULL, then a new entry is created if no existing entry * matches. On return, *isnew is true if the entry is newly created, - * false if it existed already. ->additional_data in the new entry has + * false if it existed already. The additional data in the new entry has * been zeroed. */ TupleHashEntry diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c index ab2eab9596e..26f7420b64b 100644 --- a/src/backend/executor/nodeTidrangescan.c +++ b/src/backend/executor/nodeTidrangescan.c @@ -128,9 +128,11 @@ TidExprListCreate(TidRangeScanState *tidrangestate) * TidRangeEval * * Compute and set node's block and offset range to scan by evaluating - * the trss_tidexprs. Returns false if we detect the range cannot + * node->trss_tidexprs. Returns false if we detect the range cannot * contain any tuples. Returns true if it's possible for the range to - * contain tuples. + * contain tuples. We don't bother validating that trss_mintid is less + * than or equal to trss_maxtid, as the scan_set_tidrange() table AM + * function will handle that. * ---------------------------------------------------------------- */ static bool diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1d56d0c4ef3..f1eb798f3e9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "access/xact.h" +#include "access/xlog_internal.h" #include "access/xlogutils.h" #include "fmgr.h" #include "miscadmin.h" @@ -41,6 +42,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/inval.h" #include "utils/memutils.h" @@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) { bool updated_xmin = false; bool updated_restart = false; + XLogRecPtr restart_lsn pg_attribute_unused(); SpinLockAcquire(&MyReplicationSlot->mutex); + /* remember the old restart lsn */ + restart_lsn = MyReplicationSlot->data.restart_lsn; + /* * Prevent moving the confirmed_flush backwards, as this could lead to * data duplication issues caused by replicating already replicated @@ -1881,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) /* first write new xmin to disk, so we know what's up after a crash */ if (updated_xmin || updated_restart) { +#ifdef USE_INJECTION_POINTS + XLogSegNo seg1, + seg2; + + XLByteToSeg(restart_lsn, seg1, wal_segment_size); + XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size); + + /* trigger injection point, but only if segment changes */ + if (seg1 != seg2) + INJECTION_POINT("logical-replication-slot-advance-segment", NULL); +#endif + ReplicationSlotMarkDirty(); ReplicationSlotSave(); elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 600b87fa9cb..c64f020742f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->last_saved_confirmed_flush = InvalidXLogRecPtr; + slot->last_saved_restart_lsn = InvalidXLogRecPtr; slot->inactive_since = 0; /* @@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; + XLogRecPtr last_saved_restart_lsn; bool invalidated; + ReplicationSlotPersistency persistency; if (!s->in_use) continue; SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + last_saved_restart_lsn = s->last_saved_restart_lsn; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* + * For persistent slot use last_saved_restart_lsn to compute the + * oldest LSN for removal of WAL segments. The segments between + * last_saved_restart_lsn and restart_lsn might be needed by a + * persistent slot in the case of database crash. Non-persistent + * slots can't survive the database crash, so we don't care about + * last_saved_restart_lsn for them. + */ + if (persistency == RS_PERSISTENT) + { + if (last_saved_restart_lsn != InvalidXLogRecPtr && + restart_lsn > last_saved_restart_lsn) + { + restart_lsn = last_saved_restart_lsn; + } + } + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || restart_lsn < min_required)) @@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void) { ReplicationSlot *s; XLogRecPtr restart_lsn; + XLogRecPtr last_saved_restart_lsn; bool invalidated; + ReplicationSlotPersistency persistency; s = &ReplicationSlotCtl->replication_slots[i]; @@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void) /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + last_saved_restart_lsn = s->last_saved_restart_lsn; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* + * For persistent slot use last_saved_restart_lsn to compute the + * oldest LSN for removal of WAL segments. The segments between + * last_saved_restart_lsn and restart_lsn might be needed by a + * persistent slot in the case of database crash. Non-persistent + * slots can't survive the database crash, so we don't care about + * last_saved_restart_lsn for them. + */ + if (persistency == RS_PERSISTENT) + { + if (last_saved_restart_lsn != InvalidXLogRecPtr && + restart_lsn > last_saved_restart_lsn) + { + restart_lsn = last_saved_restart_lsn; + } + } + if (restart_lsn == InvalidXLogRecPtr) continue; @@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void) Assert(slot != NULL); Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr); /* * The replication slot mechanism is used to prevent removal of required @@ -1766,6 +1810,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, */ SpinLockAcquire(&s->mutex); + Assert(s->data.restart_lsn >= s->last_saved_restart_lsn); + restart_lsn = s->data.restart_lsn; /* we do nothing if the slot is already invalid */ @@ -1835,7 +1881,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * just rely on .invalidated. */ if (invalidation_cause == RS_INVAL_WAL_REMOVED) + { s->data.restart_lsn = InvalidXLogRecPtr; + s->last_saved_restart_lsn = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; @@ -2079,6 +2128,12 @@ CheckPointReplicationSlots(bool is_shutdown) SaveSlotToPath(s, path, LOG); } LWLockRelease(ReplicationSlotAllocationLock); + + /* + * Recompute the required LSN as SaveSlotToPath() updated + * last_saved_restart_lsn for slots. + */ + ReplicationSlotsComputeRequiredLSN(); } /* @@ -2354,6 +2409,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) if (!slot->just_dirtied) slot->dirty = false; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->last_saved_restart_lsn = cp.slotdata.restart_lsn; SpinLockRelease(&slot->mutex); LWLockRelease(&slot->io_in_progress_lock); @@ -2569,6 +2625,7 @@ RestoreSlotFromDisk(const char *name) slot->effective_xmin = cp.slotdata.xmin; slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->last_saved_restart_lsn = cp.slotdata.restart_lsn; slot->candidate_catalog_xmin = InvalidTransactionId; slot->candidate_xmin_lsn = InvalidXLogRecPtr; diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index cc312b641ca..b78048328e1 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -400,9 +400,9 @@ pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation) while (true) { pgaio_debug_io(DEBUG3, ioh, - "wait_one io_gen: %llu, ref_gen: %llu, cycle %d", - (long long unsigned) ioh->generation, - (long long unsigned) ref_generation, + "wait_one io_gen: %" PRIu64 ", ref_gen: %" PRIu64 ", cycle %d", + ioh->generation, + ref_generation, waited); if (pgaio_io_was_recycled(ioh, ref_generation, &state) || diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index f2182e91825..c4b6214d618 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -712,9 +712,9 @@ usage(const char *progname) printf(_(" --use-set-session-authorization\n" " use SET SESSION AUTHORIZATION commands instead of\n" " ALTER OWNER commands to set ownership\n")); - printf(_(" --with-data dump the data\n")); - printf(_(" --with-schema dump the schema\n")); - printf(_(" --with-statistics dump the statistics\n")); + printf(_(" --with-data restore the data\n")); + printf(_(" --with-schema restore the schema\n")); + printf(_(" --with-statistics restore the statistics\n")); printf(_("\nConnection options:\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index 81a5ba844ba..1f7635d0c23 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -778,6 +778,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch) int ssl_in_use, password_used, gssapi_used; + int version_num; char *paramval; if (!active_branch) @@ -793,7 +794,9 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch) /* Get values for the parameters */ host = PQhost(pset.db); hostaddr = PQhostaddr(pset.db); - protocol_version = psprintf("%d", PQprotocolVersion(pset.db)); + version_num = PQfullProtocolVersion(pset.db); + protocol_version = psprintf("%d.%d", version_num / 10000, + version_num % 10000); ssl_in_use = PQsslInUse(pset.db); password_used = PQconnectionUsedPassword(pset.db); gssapi_used = PQconnectionUsedGSSAPI(pset.db); diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 47352b7faed..b53cd8ab698 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -1867,18 +1867,30 @@ ExecQueryAndProcessResults(const char *query, { FILE *copy_stream = NULL; - if (pset.piped_syncs > 1) + if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF) { /* - * When reading COPY data, the backend ignores sync messages - * and will not send a matching ReadyForQuery response. Even - * if we adjust piped_syncs and requested_results, it is not - * possible to salvage this as the sync message would still be - * in libpq's command queue and we would be stuck in a busy - * pipeline state. Thus, we abort the connection to avoid - * this state. + * Running COPY within a pipeline can break the protocol + * synchronisation in multiple ways, and psql shows its limits + * when it comes to tracking this information. + * + * While in COPY mode, the backend process ignores additional + * Sync messages and will not send the matching ReadyForQuery + * expected by the frontend. + * + * Additionally, libpq automatically sends a Sync with the + * Copy message, creating an unexpected synchronisation point. + * A failure during COPY would leave the pipeline in an + * aborted state while the backend would be in a clean state, + * ready to process commands. + * + * Improving those issues would require modifications in how + * libpq handles pipelines and COPY. Hence, for the time + * being, we forbid the use of COPY within a pipeline, + * aborting the connection to avoid an inconsistent state on + * psql side if trying to use a COPY command. */ - pg_log_info("\\syncpipeline after COPY is not supported, aborting connection"); + pg_log_info("COPY in a pipeline is not supported, aborting connection"); exit(EXIT_BADCONN); } diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c index 403b51325a7..ce05b3a5132 100644 --- a/src/bin/psql/help.c +++ b/src/bin/psql/help.c @@ -463,8 +463,9 @@ helpVariables(unsigned short int pager) " VERSION_NAME\n" " VERSION_NUM\n" " psql's version (in verbose string, short string, or numeric format)\n"); - HELP0(" WATCH_INTERVAL\n" - " if set to a number, overrides the default two second \\watch interval\n"); + HELPN(" WATCH_INTERVAL\n" + " number of seconds \\watch waits between executions (default %s)\n", + DEFAULT_WATCH_INTERVAL); HELP0("\nDisplay settings:\n"); HELP0("Usage:\n"); diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl index ae5c1d66405..f42c3961e09 100644 --- a/src/bin/psql/t/001_basic.pl +++ b/src/bin/psql/t/001_basic.pl @@ -483,8 +483,8 @@ psql_like($node, "copy (values ('foo'),('bar')) to stdout \\g | $pipe_cmd", my $c4 = slurp_file($g_file); like($c4, qr/foo.*bar/s); -# Tests with pipelines. These trigger FATAL failures in the backend, -# so they cannot be tested via SQL. +# Test COPY within pipelines. These abort the connection from +# the frontend so they cannot be tested via SQL. $node->safe_psql('postgres', 'CREATE TABLE psql_pipeline()'); my $log_location = -s $node->logfile; psql_fails_like( @@ -493,53 +493,41 @@ psql_fails_like( COPY psql_pipeline FROM STDIN; SELECT 'val1'; \\syncpipeline -\\getresults \\endpipeline}, - qr/server closed the connection unexpectedly/, - 'protocol sync loss in pipeline: direct COPY, SELECT, sync and getresult' -); + qr/COPY in a pipeline is not supported, aborting connection/, + 'COPY FROM in pipeline: fails'); $node->wait_for_log( qr/FATAL: .*terminating connection because protocol synchronization was lost/, $log_location); +# Remove \syncpipeline here. psql_fails_like( $node, qq{\\startpipeline -COPY psql_pipeline FROM STDIN \\bind \\sendpipeline -SELECT 'val1' \\bind \\sendpipeline -\\syncpipeline -\\getresults -\\endpipeline}, - qr/server closed the connection unexpectedly/, - 'protocol sync loss in pipeline: bind COPY, SELECT, sync and getresult'); - -# This time, test without the \getresults and \syncpipeline. -psql_fails_like( - $node, - qq{\\startpipeline -COPY psql_pipeline FROM STDIN; +COPY psql_pipeline TO STDOUT; SELECT 'val1'; \\endpipeline}, - qr/server closed the connection unexpectedly/, - 'protocol sync loss in pipeline: COPY, SELECT and sync'); + qr/COPY in a pipeline is not supported, aborting connection/, + 'COPY TO in pipeline: fails'); -# Tests sending a sync after a COPY TO/FROM. These abort the connection -# from the frontend. psql_fails_like( $node, qq{\\startpipeline -COPY psql_pipeline FROM STDIN; +\\copy psql_pipeline from stdin; +SELECT 'val1'; \\syncpipeline \\endpipeline}, - qr/\\syncpipeline after COPY is not supported, aborting connection/, - 'sending sync after COPY FROM'); + qr/COPY in a pipeline is not supported, aborting connection/, + '\copy from in pipeline: fails'); + +# Sync attempt after a COPY TO/FROM. psql_fails_like( $node, qq{\\startpipeline -COPY psql_pipeline TO STDOUT; +\\copy psql_pipeline to stdout; \\syncpipeline \\endpipeline}, - qr/\\syncpipeline after COPY is not supported, aborting connection/, - 'sending sync after COPY TO'); + qr/COPY in a pipeline is not supported, aborting connection/, + '\copy to in pipeline: fails'); done_testing(); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index eb0b93b1114..ffacba9d2ae 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -215,6 +215,14 @@ typedef struct ReplicationSlot * recently stopped. */ TimestampTz inactive_since; + + /* + * Latest restart_lsn that has been flushed to disk. For persistent slots + * the flushed LSN should be taken into account when calculating the + * oldest LSN for WAL segments removal. + */ + XLogRecPtr last_saved_restart_lsn; + } ReplicationSlot; #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index cb983766c67..92429d28402 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -54,6 +54,8 @@ tests += { 't/043_no_contrecord_switch.pl', 't/044_invalidate_inactive_slots.pl', 't/045_archive_restartpoint.pl', + 't/046_checkpoint_logical_slot.pl', + 't/047_checkpoint_physical_slot.pl' ], }, } diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl new file mode 100644 index 00000000000..b4265c4a6a5 --- /dev/null +++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl @@ -0,0 +1,139 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group +# +# This test verifies the case when the logical slot is advanced during +# checkpoint. The test checks that the logical slot's restart_lsn still refers +# to an existed WAL segment after immediate restart. +# +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; + +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +my ($node, $result); + +$node = PostgreSQL::Test::Cluster->new('mike'); +$node->init; +$node->append_conf('postgresql.conf', + "shared_preload_libraries = 'injection_points'"); +$node->append_conf('postgresql.conf', "wal_level = 'logical'"); +$node->start; +$node->safe_psql('postgres', q(CREATE EXTENSION injection_points)); + +# Create a simple table to generate data into. +$node->safe_psql('postgres', + q{create table t (id serial primary key, b text)}); + +# Create the two slots we'll need. +$node->safe_psql('postgres', + q{select pg_create_logical_replication_slot('slot_logical', 'test_decoding')} +); +$node->safe_psql('postgres', + q{select pg_create_physical_replication_slot('slot_physical', true)}); + +# Advance both slots to the current position just to have everything "valid". +$node->safe_psql('postgres', + q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null)} +); +$node->safe_psql('postgres', + q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())} +); + +# Run checkpoint to flush current state to disk and set a baseline. +$node->safe_psql('postgres', q{checkpoint}); + +# Generate some transactions to get RUNNING_XACTS. +my $xacts = $node->background_psql('postgres'); +$xacts->query_until( + qr/run_xacts/, + q(\echo run_xacts +SELECT 1 \watch 0.1 +\q +)); + +# Insert 2M rows; that's about 260MB (~20 segments) worth of WAL. +$node->safe_psql('postgres', + q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)} +); + +# Run another checkpoint to set a new restore LSN. +$node->safe_psql('postgres', q{checkpoint}); + +# Another 2M rows; that's about 260MB (~20 segments) worth of WAL. +$node->safe_psql('postgres', + q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)} +); + +# Run another checkpoint, this time in the background, and make it wait +# on the injection point) so that the checkpoint stops right before +# removing old WAL segments. +note('starting checkpoint\n'); + +my $checkpoint = $node->background_psql('postgres'); +$checkpoint->query_safe( + q(select injection_points_attach('checkpoint-before-old-wal-removal','wait')) +); +$checkpoint->query_until( + qr/starting_checkpoint/, + q(\echo starting_checkpoint +checkpoint; +\q +)); + +# Wait until the checkpoint stops right before removing WAL segments. +note('waiting for injection_point\n'); +$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal'); +note('injection_point is reached'); + +# Try to advance the logical slot, but make it stop when it moves to the next +# WAL segment (this has to happen in the background, too). +my $logical = $node->background_psql('postgres'); +$logical->query_safe( + q{select injection_points_attach('logical-replication-slot-advance-segment','wait');} +); +$logical->query_until( + qr/get_changes/, + q( +\echo get_changes +select count(*) from pg_logical_slot_get_changes('slot_logical', null, null) \watch 1 +\q +)); + +# Wait until the slot's restart_lsn points to the next WAL segment. +note('waiting for injection_point\n'); +$node->wait_for_event('client backend', + 'logical-replication-slot-advance-segment'); +note('injection_point is reached'); + +# OK, we're in the right situation: time to advance the physical slot, which +# recalculates the required LSN, and then unblock the checkpoint, which +# removes the WAL still needed by the logical slot. +$node->safe_psql('postgres', + q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())} +); + +# Continue the checkpoint. +$node->safe_psql('postgres', + q{select injection_points_wakeup('checkpoint-before-old-wal-removal')}); + +# Abruptly stop the server (1 second should be enough for the checkpoint +# to finish; it would be better). +$node->stop('immediate'); + +$node->start; + +eval { + $node->safe_psql('postgres', + q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null);} + ); +}; +is($@, '', "Logical slot still valid"); + +done_testing(); diff --git a/src/test/recovery/t/047_checkpoint_physical_slot.pl b/src/test/recovery/t/047_checkpoint_physical_slot.pl new file mode 100644 index 00000000000..454e56b9bd2 --- /dev/null +++ b/src/test/recovery/t/047_checkpoint_physical_slot.pl @@ -0,0 +1,133 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group +# +# This test verifies the case when the physical slot is advanced during +# checkpoint. The test checks that the physical slot's restart_lsn still refers +# to an existed WAL segment after immediate restart. +# +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; + +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +my ($node, $result); + +$node = PostgreSQL::Test::Cluster->new('mike'); +$node->init; +$node->append_conf('postgresql.conf', + "shared_preload_libraries = 'injection_points'"); +$node->append_conf('postgresql.conf', "wal_level = 'replica'"); +$node->start; +$node->safe_psql('postgres', q(CREATE EXTENSION injection_points)); + +# Create a simple table to generate data into. +$node->safe_psql('postgres', + q{create table t (id serial primary key, b text)}); + +# Create a physical replication slot. +$node->safe_psql('postgres', + q{select pg_create_physical_replication_slot('slot_physical', true)}); + +# Advance slot to the current position, just to have everything "valid". +$node->safe_psql('postgres', + q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())} +); + +# Run checkpoint to flush current state to disk and set a baseline. +$node->safe_psql('postgres', q{checkpoint}); + +# Insert 2M rows; that's about 260MB (~20 segments) worth of WAL. +$node->safe_psql('postgres', + q{insert into t (b) select md5(i::text) from generate_series(1,100000) s(i)} +); + +# Advance slot to the current position, just to have everything "valid". +$node->safe_psql('postgres', + q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())} +); + +# Run another checkpoint to set a new restore LSN. +$node->safe_psql('postgres', q{checkpoint}); + +# Another 2M rows; that's about 260MB (~20 segments) worth of WAL. +$node->safe_psql('postgres', + q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)} +); + +my $restart_lsn_init = $node->safe_psql('postgres', + q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'} +); +chomp($restart_lsn_init); +note("restart lsn before checkpoint: $restart_lsn_init"); + +# Run another checkpoint, this time in the background, and make it wait +# on the injection point) so that the checkpoint stops right before +# removing old WAL segments. +note('starting checkpoint'); + +my $checkpoint = $node->background_psql('postgres'); +$checkpoint->query_safe( + q{select injection_points_attach('checkpoint-before-old-wal-removal','wait')} +); +$checkpoint->query_until( + qr/starting_checkpoint/, + q(\echo starting_checkpoint +checkpoint; +\q +)); + +# Wait until the checkpoint stops right before removing WAL segments. +note('waiting for injection_point'); +$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal'); +note('injection_point is reached'); + +# OK, we're in the right situation: time to advance the physical slot, which +# recalculates the required LSN and then unblock the checkpoint, which +# removes the WAL still needed by the physical slot. +$node->safe_psql('postgres', + q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())} +); + +# Continue the checkpoint. +$node->safe_psql('postgres', + q{select injection_points_wakeup('checkpoint-before-old-wal-removal')}); + +my $restart_lsn_old = $node->safe_psql('postgres', + q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'} +); +chomp($restart_lsn_old); +note("restart lsn before stop: $restart_lsn_old"); + +# Abruptly stop the server (1 second should be enough for the checkpoint +# to finish; it would be better). +$node->stop('immediate'); + +$node->start; + +# Get the restart_lsn of the slot right after restarting. +my $restart_lsn = $node->safe_psql('postgres', + q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'} +); +chomp($restart_lsn); +note("restart lsn: $restart_lsn"); + +# Get the WAL segment name for the slot's restart_lsn. +my $restart_lsn_segment = $node->safe_psql('postgres', + "SELECT pg_walfile_name('$restart_lsn'::pg_lsn)"); +chomp($restart_lsn_segment); + +# Check if the required wal segment exists. +note("required by slot segment name: $restart_lsn_segment"); +my $datadir = $node->data_dir; +ok( -f "$datadir/pg_wal/$restart_lsn_segment", + "WAL segment $restart_lsn_segment for physical slot's restart_lsn $restart_lsn exists" +); + +done_testing(); diff --git a/src/test/regress/expected/psql_pipeline.out b/src/test/regress/expected/psql_pipeline.out index a30dec088b9..e78e6bfa0ad 100644 --- a/src/test/regress/expected/psql_pipeline.out +++ b/src/test/regress/expected/psql_pipeline.out @@ -228,192 +228,6 @@ BEGIN \bind \sendpipeline INSERT INTO psql_pipeline VALUES ($1) \bind 1 \sendpipeline COMMIT \bind \sendpipeline \endpipeline --- COPY FROM STDIN --- with \sendpipeline and \bind -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -COPY psql_pipeline FROM STDIN \bind \sendpipeline -\endpipeline - ?column? ----------- - val1 -(1 row) - --- with semicolon -\startpipeline -SELECT 'val1'; -COPY psql_pipeline FROM STDIN; -\endpipeline - ?column? ----------- - val1 -(1 row) - --- COPY FROM STDIN with \flushrequest + \getresults --- with \sendpipeline and \bind -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -COPY psql_pipeline FROM STDIN \bind \sendpipeline -\flushrequest -\getresults - ?column? ----------- - val1 -(1 row) - -message type 0x5a arrived from server while idle -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -COPY psql_pipeline FROM STDIN; -\flushrequest -\getresults - ?column? ----------- - val1 -(1 row) - -message type 0x5a arrived from server while idle -\endpipeline --- COPY FROM STDIN with \syncpipeline + \getresults --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -COPY psql_pipeline FROM STDIN \bind \sendpipeline -\syncpipeline -\getresults - ?column? ----------- - val1 -(1 row) - -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -COPY psql_pipeline FROM STDIN; -\syncpipeline -\getresults - ?column? ----------- - val1 -(1 row) - -\endpipeline --- COPY TO STDOUT --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -copy psql_pipeline TO STDOUT \bind \sendpipeline -\endpipeline - ?column? ----------- - val1 -(1 row) - -1 \N -2 test2 -20 test2 -3 test3 -30 test3 -4 test4 -40 test4 --- with semicolon -\startpipeline -SELECT 'val1'; -copy psql_pipeline TO STDOUT; -\endpipeline - ?column? ----------- - val1 -(1 row) - -1 \N -2 test2 -20 test2 -3 test3 -30 test3 -4 test4 -40 test4 --- COPY TO STDOUT with \flushrequest + \getresults --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -copy psql_pipeline TO STDOUT \bind \sendpipeline -\flushrequest -\getresults - ?column? ----------- - val1 -(1 row) - -1 \N -2 test2 -20 test2 -3 test3 -30 test3 -4 test4 -40 test4 -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -copy psql_pipeline TO STDOUT; -\flushrequest -\getresults - ?column? ----------- - val1 -(1 row) - -1 \N -2 test2 -20 test2 -3 test3 -30 test3 -4 test4 -40 test4 -\endpipeline --- COPY TO STDOUT with \syncpipeline + \getresults --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -copy psql_pipeline TO STDOUT \bind \sendpipeline -\syncpipeline -\getresults - ?column? ----------- - val1 -(1 row) - -1 \N -2 test2 -20 test2 -3 test3 -30 test3 -4 test4 -40 test4 -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -copy psql_pipeline TO STDOUT; -\syncpipeline -\getresults - ?column? ----------- - val1 -(1 row) - -1 \N -2 test2 -20 test2 -3 test3 -30 test3 -4 test4 -40 test4 -\endpipeline -- Use \parse and \bind_named \startpipeline SELECT $1 \parse '' @@ -740,7 +554,7 @@ SELECT COUNT(*) FROM psql_pipeline \bind \sendpipeline count ------- - 7 + 1 (1 row) -- After an error, pipeline is aborted and requires \syncpipeline to be diff --git a/src/test/regress/sql/psql_pipeline.sql b/src/test/regress/sql/psql_pipeline.sql index 16e1e1e84cd..5945eca1ef7 100644 --- a/src/test/regress/sql/psql_pipeline.sql +++ b/src/test/regress/sql/psql_pipeline.sql @@ -105,106 +105,6 @@ INSERT INTO psql_pipeline VALUES ($1) \bind 1 \sendpipeline COMMIT \bind \sendpipeline \endpipeline --- COPY FROM STDIN --- with \sendpipeline and \bind -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -COPY psql_pipeline FROM STDIN \bind \sendpipeline -\endpipeline -2 test2 -\. --- with semicolon -\startpipeline -SELECT 'val1'; -COPY psql_pipeline FROM STDIN; -\endpipeline -20 test2 -\. - --- COPY FROM STDIN with \flushrequest + \getresults --- with \sendpipeline and \bind -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -COPY psql_pipeline FROM STDIN \bind \sendpipeline -\flushrequest -\getresults -3 test3 -\. -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -COPY psql_pipeline FROM STDIN; -\flushrequest -\getresults -30 test3 -\. -\endpipeline - --- COPY FROM STDIN with \syncpipeline + \getresults --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -COPY psql_pipeline FROM STDIN \bind \sendpipeline -\syncpipeline -\getresults -4 test4 -\. -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -COPY psql_pipeline FROM STDIN; -\syncpipeline -\getresults -40 test4 -\. -\endpipeline - --- COPY TO STDOUT --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -copy psql_pipeline TO STDOUT \bind \sendpipeline -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -copy psql_pipeline TO STDOUT; -\endpipeline - --- COPY TO STDOUT with \flushrequest + \getresults --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -copy psql_pipeline TO STDOUT \bind \sendpipeline -\flushrequest -\getresults -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -copy psql_pipeline TO STDOUT; -\flushrequest -\getresults -\endpipeline - --- COPY TO STDOUT with \syncpipeline + \getresults --- with \bind and \sendpipeline -\startpipeline -SELECT $1 \bind 'val1' \sendpipeline -copy psql_pipeline TO STDOUT \bind \sendpipeline -\syncpipeline -\getresults -\endpipeline --- with semicolon -\startpipeline -SELECT 'val1'; -copy psql_pipeline TO STDOUT; -\syncpipeline -\getresults -\endpipeline - -- Use \parse and \bind_named \startpipeline SELECT $1 \parse '' |