diff options
author | Andres Freund | 2025-03-18 14:52:33 +0000 |
---|---|---|
committer | Andres Freund | 2025-03-18 15:54:01 +0000 |
commit | 55b454d0e14084c841a034073abbf1a0ea937a45 (patch) | |
tree | 4bdd85a6acb02123b35bfeb1c33c1c071be30ba1 /src/backend | |
parent | 549ea06e4217aca10d3a73dc09cf5018c51bc23a (diff) |
aio: Infrastructure for io_method=worker
This commit contains the basic, system-wide, infrastructure for
io_method=worker. It does not yet actually execute IO, this commit just
provides the infrastructure for running IO workers, kept separate for easier
review.
The number of IO workers can be adjusted with a PGC_SIGHUP GUC. Eventually
we'd like to make the number of workers dynamically scale up/down based on the
current "IO load".
To allow the number of IO workers to be increased without a restart, we need
to reserve PGPROC entries for the workers unconditionally. This has been
judged to be worth the cost. If it turns out to be problematic, we can
introduce a PGC_POSTMASTER GUC to control the maximum number.
As io workers might be needed during shutdown, e.g. for AIO during the
shutdown checkpoint, a new PMState phase is added. IO workers are shut down
after the shutdown checkpoint has been performed and walsender/archiver have
shut down, but before the checkpointer itself shuts down. See also
87a6690cc69.
Updates PGSTAT_FILE_FORMAT_ID due to the addition of a new BackendType.
Reviewed-by: Noah Misch <noah@leadboat.com>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
Co-authored-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/postmaster/launch_backend.c | 2 | ||||
-rw-r--r-- | src/backend/postmaster/pmchild.c | 1 | ||||
-rw-r--r-- | src/backend/postmaster/postmaster.c | 174 | ||||
-rw-r--r-- | src/backend/storage/aio/Makefile | 1 | ||||
-rw-r--r-- | src/backend/storage/aio/meson.build | 1 | ||||
-rw-r--r-- | src/backend/storage/aio/method_worker.c | 88 | ||||
-rw-r--r-- | src/backend/tcop/postgres.c | 7 | ||||
-rw-r--r-- | src/backend/utils/activity/pgstat_backend.c | 1 | ||||
-rw-r--r-- | src/backend/utils/activity/pgstat_io.c | 1 | ||||
-rw-r--r-- | src/backend/utils/activity/wait_event_names.txt | 1 | ||||
-rw-r--r-- | src/backend/utils/init/miscinit.c | 3 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 13 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 1 |
13 files changed, 282 insertions, 12 deletions
diff --git a/src/backend/postmaster/launch_backend.c b/src/backend/postmaster/launch_backend.c index 77fb877dbad..bf6b55ee830 100644 --- a/src/backend/postmaster/launch_backend.c +++ b/src/backend/postmaster/launch_backend.c @@ -48,6 +48,7 @@ #include "replication/slotsync.h" #include "replication/walreceiver.h" #include "storage/dsm.h" +#include "storage/io_worker.h" #include "storage/pg_shmem.h" #include "tcop/backend_startup.h" #include "utils/memutils.h" @@ -197,6 +198,7 @@ static child_process_kind child_process_kinds[] = { [B_ARCHIVER] = {"archiver", PgArchiverMain, true}, [B_BG_WRITER] = {"bgwriter", BackgroundWriterMain, true}, [B_CHECKPOINTER] = {"checkpointer", CheckpointerMain, true}, + [B_IO_WORKER] = {"io_worker", IoWorkerMain, true}, [B_STARTUP] = {"startup", StartupProcessMain, true}, [B_WAL_RECEIVER] = {"wal_receiver", WalReceiverMain, true}, [B_WAL_SUMMARIZER] = {"wal_summarizer", WalSummarizerMain, true}, diff --git a/src/backend/postmaster/pmchild.c b/src/backend/postmaster/pmchild.c index 0d473226c3a..cde1d23a4ca 100644 --- a/src/backend/postmaster/pmchild.c +++ b/src/backend/postmaster/pmchild.c @@ -101,6 +101,7 @@ InitPostmasterChildSlots(void) pmchild_pools[B_AUTOVAC_WORKER].size = autovacuum_worker_slots; pmchild_pools[B_BG_WORKER].size = max_worker_processes; + pmchild_pools[B_IO_WORKER].size = MAX_IO_WORKERS; /* * There can be only one of each of these running at a time. They each diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index d13846298bd..a0c37532d2f 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -108,9 +108,12 @@ #include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/walsender.h" +#include "storage/aio_subsys.h" #include "storage/fd.h" +#include "storage/io_worker.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/proc.h" #include "tcop/backend_startup.h" #include "tcop/tcopprot.h" #include "utils/datetime.h" @@ -340,6 +343,7 @@ typedef enum * ckpt */ PM_WAIT_XLOG_ARCHIVAL, /* waiting for archiver and walsenders to * finish */ + PM_WAIT_IO_WORKERS, /* waiting for io workers to exit */ PM_WAIT_CHECKPOINTER, /* waiting for checkpointer to shut down */ PM_WAIT_DEAD_END, /* waiting for dead-end children to exit */ PM_NO_CHILDREN, /* all important children have exited */ @@ -402,6 +406,10 @@ bool LoadedSSL = false; static DNSServiceRef bonjour_sdref = NULL; #endif +/* State for IO worker management. */ +static int io_worker_count = 0; +static PMChild *io_worker_children[MAX_IO_WORKERS]; + /* * postmaster.c - function prototypes */ @@ -436,6 +444,8 @@ static void TerminateChildren(int signal); static int CountChildren(BackendTypeMask targetMask); static void LaunchMissingBackgroundProcesses(void); static void maybe_start_bgworkers(void); +static bool maybe_reap_io_worker(int pid); +static void maybe_adjust_io_workers(void); static bool CreateOptsFile(int argc, char *argv[], char *fullprogname); static PMChild *StartChildProcess(BackendType type); static void StartSysLogger(void); @@ -1365,6 +1375,11 @@ PostmasterMain(int argc, char *argv[]) */ AddToDataDirLockFile(LOCK_FILE_LINE_PM_STATUS, PM_STATUS_STARTING); + UpdatePMState(PM_STARTUP); + + /* Make sure we can perform I/O while starting up. */ + maybe_adjust_io_workers(); + /* Start bgwriter and checkpointer so they can help with recovery */ if (CheckpointerPMChild == NULL) CheckpointerPMChild = StartChildProcess(B_CHECKPOINTER); @@ -1377,7 +1392,6 @@ PostmasterMain(int argc, char *argv[]) StartupPMChild = StartChildProcess(B_STARTUP); Assert(StartupPMChild != NULL); StartupStatus = STARTUP_RUNNING; - UpdatePMState(PM_STARTUP); /* Some workers may be scheduled to start now */ maybe_start_bgworkers(); @@ -2502,6 +2516,16 @@ process_pm_child_exit(void) continue; } + /* Was it an IO worker? */ + if (maybe_reap_io_worker(pid)) + { + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + HandleChildCrash(pid, exitstatus, _("io worker")); + + maybe_adjust_io_workers(); + continue; + } + /* * Was it a backend or a background worker? */ @@ -2723,6 +2747,7 @@ HandleFatalError(QuitSignalReason reason, bool consider_sigabrt) case PM_WAIT_XLOG_SHUTDOWN: case PM_WAIT_XLOG_ARCHIVAL: case PM_WAIT_CHECKPOINTER: + case PM_WAIT_IO_WORKERS: /* * NB: Similar code exists in PostmasterStateMachine()'s handling @@ -2905,20 +2930,21 @@ PostmasterStateMachine(void) /* * If we are doing crash recovery or an immediate shutdown then we - * expect archiver, checkpointer and walsender to exit as well, - * otherwise not. + * expect archiver, checkpointer, io workers and walsender to exit as + * well, otherwise not. */ if (FatalError || Shutdown >= ImmediateShutdown) targetMask = btmask_add(targetMask, B_CHECKPOINTER, B_ARCHIVER, + B_IO_WORKER, B_WAL_SENDER); /* - * Normally walsenders and archiver will continue running; they will - * be terminated later after writing the checkpoint record. We also - * let dead-end children to keep running for now. The syslogger - * process exits last. + * Normally archiver, checkpointer, IO workers and walsenders will + * continue running; they will be terminated later after writing the + * checkpoint record. We also let dead-end children to keep running + * for now. The syslogger process exits last. * * This assertion checks that we have covered all backend types, * either by including them in targetMask, or by noting here that they @@ -2933,12 +2959,13 @@ PostmasterStateMachine(void) B_LOGGER); /* - * Archiver, checkpointer and walsender may or may not be in - * targetMask already. + * Archiver, checkpointer, IO workers, and walsender may or may + * not be in targetMask already. */ remainMask = btmask_add(remainMask, B_ARCHIVER, B_CHECKPOINTER, + B_IO_WORKER, B_WAL_SENDER); /* these are not real postmaster children */ @@ -3039,11 +3066,25 @@ PostmasterStateMachine(void) { /* * PM_WAIT_XLOG_ARCHIVAL state ends when there are no children other - * than checkpointer, dead-end children and logger left. There + * than checkpointer, io workers and dead-end children left. There * shouldn't be any regular backends left by now anyway; what we're * really waiting for is for walsenders and archiver to exit. */ - if (CountChildren(btmask_all_except(B_CHECKPOINTER, B_LOGGER, B_DEAD_END_BACKEND)) == 0) + if (CountChildren(btmask_all_except(B_CHECKPOINTER, B_IO_WORKER, + B_LOGGER, B_DEAD_END_BACKEND)) == 0) + { + UpdatePMState(PM_WAIT_IO_WORKERS); + SignalChildren(SIGUSR2, btmask(B_IO_WORKER)); + } + } + + if (pmState == PM_WAIT_IO_WORKERS) + { + /* + * PM_WAIT_IO_WORKERS state ends when there's only checkpointer and + * dead_end children left. + */ + if (io_worker_count == 0) { UpdatePMState(PM_WAIT_CHECKPOINTER); @@ -3171,10 +3212,14 @@ PostmasterStateMachine(void) /* re-create shared memory and semaphores */ CreateSharedMemoryAndSemaphores(); + UpdatePMState(PM_STARTUP); + + /* Make sure we can perform I/O while starting up. */ + maybe_adjust_io_workers(); + StartupPMChild = StartChildProcess(B_STARTUP); Assert(StartupPMChild != NULL); StartupStatus = STARTUP_RUNNING; - UpdatePMState(PM_STARTUP); /* crash recovery started, reset SIGKILL flag */ AbortStartTime = 0; @@ -3198,6 +3243,7 @@ pmstate_name(PMState state) PM_TOSTR_CASE(PM_WAIT_BACKENDS); PM_TOSTR_CASE(PM_WAIT_XLOG_SHUTDOWN); PM_TOSTR_CASE(PM_WAIT_XLOG_ARCHIVAL); + PM_TOSTR_CASE(PM_WAIT_IO_WORKERS); PM_TOSTR_CASE(PM_WAIT_DEAD_END); PM_TOSTR_CASE(PM_WAIT_CHECKPOINTER); PM_TOSTR_CASE(PM_NO_CHILDREN); @@ -3236,6 +3282,16 @@ LaunchMissingBackgroundProcesses(void) StartSysLogger(); /* + * The number of configured workers might have changed, or a prior start + * of a worker might have failed. Check if we need to start/stop any + * workers. + * + * A config file change will always lead to this function being called, so + * we always will process the config change in a timely manner. + */ + maybe_adjust_io_workers(); + + /* * The checkpointer and the background writer are active from the start, * until shutdown is initiated. * @@ -4120,6 +4176,7 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_WAIT_DEAD_END: case PM_WAIT_XLOG_ARCHIVAL: case PM_WAIT_XLOG_SHUTDOWN: + case PM_WAIT_IO_WORKERS: case PM_WAIT_BACKENDS: case PM_STOP_BACKENDS: break; @@ -4270,6 +4327,99 @@ maybe_start_bgworkers(void) } } +static bool +maybe_reap_io_worker(int pid) +{ + for (int id = 0; id < MAX_IO_WORKERS; ++id) + { + if (io_worker_children[id] && + io_worker_children[id]->pid == pid) + { + ReleasePostmasterChildSlot(io_worker_children[id]); + + --io_worker_count; + io_worker_children[id] = NULL; + return true; + } + } + return false; +} + +/* + * Start or stop IO workers, to close the gap between the number of running + * workers and the number of configured workers. Used to respond to change of + * the io_workers GUC (by increasing and decreasing the number of workers), as + * well as workers terminating in response to errors (by starting + * "replacement" workers). + */ +static void +maybe_adjust_io_workers(void) +{ + if (!pgaio_workers_enabled()) + return; + + /* + * If we're in final shutting down state, then we're just waiting for all + * processes to exit. + */ + if (pmState >= PM_WAIT_IO_WORKERS) + return; + + /* Don't start new workers during an immediate shutdown either. */ + if (Shutdown >= ImmediateShutdown) + return; + + /* + * Don't start new workers if we're in the shutdown phase of a crash + * restart. But we *do* need to start if we're already starting up again. + */ + if (FatalError && pmState >= PM_STOP_BACKENDS) + return; + + Assert(pmState < PM_WAIT_IO_WORKERS); + + /* Not enough running? */ + while (io_worker_count < io_workers) + { + PMChild *child; + int id; + + /* find unused entry in io_worker_children array */ + for (id = 0; id < MAX_IO_WORKERS; ++id) + { + if (io_worker_children[id] == NULL) + break; + } + if (id == MAX_IO_WORKERS) + elog(ERROR, "could not find a free IO worker ID"); + + /* Try to launch one. */ + child = StartChildProcess(B_IO_WORKER); + if (child != NULL) + { + io_worker_children[id] = child; + ++io_worker_count; + } + else + break; /* XXX try again soon? */ + } + + /* Too many running? */ + if (io_worker_count > io_workers) + { + /* ask the IO worker in the highest slot to exit */ + for (int id = MAX_IO_WORKERS - 1; id >= 0; --id) + { + if (io_worker_children[id] != NULL) + { + kill(io_worker_children[id]->pid, SIGUSR2); + break; + } + } + } +} + + /* * When a backend asks to be notified about worker state changes, we * set a flag in its backend entry. The background worker machinery needs diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile index 89f821ea7e1..f51c34a37f8 100644 --- a/src/backend/storage/aio/Makefile +++ b/src/backend/storage/aio/Makefile @@ -15,6 +15,7 @@ OBJS = \ aio_io.o \ aio_target.o \ method_sync.o \ + method_worker.o \ read_stream.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build index 2c26089d52e..74f94c6e40b 100644 --- a/src/backend/storage/aio/meson.build +++ b/src/backend/storage/aio/meson.build @@ -7,5 +7,6 @@ backend_sources += files( 'aio_io.c', 'aio_target.c', 'method_sync.c', + 'method_worker.c', 'read_stream.c', ) diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c new file mode 100644 index 00000000000..0ef9ef93e2b --- /dev/null +++ b/src/backend/storage/aio/method_worker.c @@ -0,0 +1,88 @@ +/*------------------------------------------------------------------------- + * + * method_worker.c + * AIO - perform AIO using worker processes + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/method_worker.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "postmaster/auxprocess.h" +#include "postmaster/interrupt.h" +#include "storage/aio_subsys.h" +#include "storage/io_worker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "tcop/tcopprot.h" +#include "utils/wait_event.h" + + +/* GUCs */ +int io_workers = 3; + + +void +IoWorkerMain(const void *startup_data, size_t startup_data_len) +{ + sigjmp_buf local_sigjmp_buf; + + MyBackendType = B_IO_WORKER; + AuxiliaryProcessMainCommon(); + + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, die); /* to allow manually triggering worker restart */ + + /* + * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the + * shutdown sequence, similar to checkpointer. + */ + pqsignal(SIGTERM, SIG_IGN); + /* SIGQUIT handler was already set up by InitPostmasterChild */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SignalHandlerForShutdownRequest); + + /* see PostgresMain() */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + error_context_stack = NULL; + HOLD_INTERRUPTS(); + + EmitErrorReport(); + + proc_exit(1); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); + + while (!ShutdownRequestPending) + { + WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, + WAIT_EVENT_IO_WORKER_MAIN); + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + proc_exit(0); +} + +bool +pgaio_workers_enabled(void) +{ + /* placeholder for future commit */ + return false; +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 55ab2da299b..0554a4ae3c7 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3316,6 +3316,13 @@ ProcessInterrupts(void) (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating background worker \"%s\" due to administrator command", MyBgworkerEntry->bgw_type))); + else if (AmIoWorkerProcess()) + { + ereport(DEBUG1, + (errmsg_internal("io worker shutting down due to administrator command"))); + + proc_exit(0); + } else ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/backend/utils/activity/pgstat_backend.c b/src/backend/utils/activity/pgstat_backend.c index a8cb54a7732..5518a18e060 100644 --- a/src/backend/utils/activity/pgstat_backend.c +++ b/src/backend/utils/activity/pgstat_backend.c @@ -375,6 +375,7 @@ pgstat_tracks_backend_bktype(BackendType bktype) case B_LOGGER: case B_BG_WRITER: case B_CHECKPOINTER: + case B_IO_WORKER: case B_STARTUP: return false; diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index eb575025596..c8de9c9e2d3 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -376,6 +376,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_BG_WORKER: case B_BG_WRITER: case B_CHECKPOINTER: + case B_IO_WORKER: case B_SLOTSYNC_WORKER: case B_STANDALONE_BACKEND: case B_STARTUP: diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index b44e4908b25..3f6dc3876b4 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -57,6 +57,7 @@ BGWRITER_HIBERNATE "Waiting in background writer process, hibernating." BGWRITER_MAIN "Waiting in main loop of background writer process." CHECKPOINTER_MAIN "Waiting in main loop of checkpointer process." CHECKPOINTER_SHUTDOWN "Waiting for checkpointer process to be terminated." +IO_WORKER_MAIN "Waiting in main loop of IO Worker process." LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index dc3521457c7..43b4dbccc3d 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -293,6 +293,9 @@ GetBackendTypeDesc(BackendType backendType) case B_CHECKPOINTER: backendDesc = gettext_noop("checkpointer"); break; + case B_IO_WORKER: + backendDesc = gettext_noop("io worker"); + break; case B_LOGGER: backendDesc = gettext_noop("logger"); break; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 4984d12606c..c89316ce294 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -75,6 +75,7 @@ #include "storage/aio.h" #include "storage/bufmgr.h" #include "storage/bufpage.h" +#include "storage/io_worker.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" #include "storage/predicate.h" @@ -3268,6 +3269,18 @@ struct config_int ConfigureNamesInt[] = }, { + {"io_workers", + PGC_SIGHUP, + RESOURCES_IO, + gettext_noop("Number of IO worker processes, for io_method=worker."), + NULL, + }, + &io_workers, + 3, 1, MAX_IO_WORKERS, + NULL, NULL, NULL + }, + + { {"backend_flush_after", PGC_USERSET, RESOURCES_IO, gettext_noop("Number of pages after which previously performed writes are flushed to disk."), gettext_noop("0 disables forced writeback."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index db44fa563b5..7d0bf1dc006 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -207,6 +207,7 @@ # can execute simultaneously # -1 sets based on shared_buffers # (change requires restart) +#io_workers = 3 # 1-32; # - Worker Processes - |