Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/postmaster/launch_backend.c2
-rw-r--r--src/backend/postmaster/pmchild.c1
-rw-r--r--src/backend/postmaster/postmaster.c174
-rw-r--r--src/backend/storage/aio/Makefile1
-rw-r--r--src/backend/storage/aio/meson.build1
-rw-r--r--src/backend/storage/aio/method_worker.c88
-rw-r--r--src/backend/tcop/postgres.c7
-rw-r--r--src/backend/utils/activity/pgstat_backend.c1
-rw-r--r--src/backend/utils/activity/pgstat_io.c1
-rw-r--r--src/backend/utils/activity/wait_event_names.txt1
-rw-r--r--src/backend/utils/init/miscinit.c3
-rw-r--r--src/backend/utils/misc/guc_tables.c13
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample1
13 files changed, 282 insertions, 12 deletions
diff --git a/src/backend/postmaster/launch_backend.c b/src/backend/postmaster/launch_backend.c
index 77fb877dbad..bf6b55ee830 100644
--- a/src/backend/postmaster/launch_backend.c
+++ b/src/backend/postmaster/launch_backend.c
@@ -48,6 +48,7 @@
#include "replication/slotsync.h"
#include "replication/walreceiver.h"
#include "storage/dsm.h"
+#include "storage/io_worker.h"
#include "storage/pg_shmem.h"
#include "tcop/backend_startup.h"
#include "utils/memutils.h"
@@ -197,6 +198,7 @@ static child_process_kind child_process_kinds[] = {
[B_ARCHIVER] = {"archiver", PgArchiverMain, true},
[B_BG_WRITER] = {"bgwriter", BackgroundWriterMain, true},
[B_CHECKPOINTER] = {"checkpointer", CheckpointerMain, true},
+ [B_IO_WORKER] = {"io_worker", IoWorkerMain, true},
[B_STARTUP] = {"startup", StartupProcessMain, true},
[B_WAL_RECEIVER] = {"wal_receiver", WalReceiverMain, true},
[B_WAL_SUMMARIZER] = {"wal_summarizer", WalSummarizerMain, true},
diff --git a/src/backend/postmaster/pmchild.c b/src/backend/postmaster/pmchild.c
index 0d473226c3a..cde1d23a4ca 100644
--- a/src/backend/postmaster/pmchild.c
+++ b/src/backend/postmaster/pmchild.c
@@ -101,6 +101,7 @@ InitPostmasterChildSlots(void)
pmchild_pools[B_AUTOVAC_WORKER].size = autovacuum_worker_slots;
pmchild_pools[B_BG_WORKER].size = max_worker_processes;
+ pmchild_pools[B_IO_WORKER].size = MAX_IO_WORKERS;
/*
* There can be only one of each of these running at a time. They each
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index d13846298bd..a0c37532d2f 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -108,9 +108,12 @@
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/walsender.h"
+#include "storage/aio_subsys.h"
#include "storage/fd.h"
+#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/proc.h"
#include "tcop/backend_startup.h"
#include "tcop/tcopprot.h"
#include "utils/datetime.h"
@@ -340,6 +343,7 @@ typedef enum
* ckpt */
PM_WAIT_XLOG_ARCHIVAL, /* waiting for archiver and walsenders to
* finish */
+ PM_WAIT_IO_WORKERS, /* waiting for io workers to exit */
PM_WAIT_CHECKPOINTER, /* waiting for checkpointer to shut down */
PM_WAIT_DEAD_END, /* waiting for dead-end children to exit */
PM_NO_CHILDREN, /* all important children have exited */
@@ -402,6 +406,10 @@ bool LoadedSSL = false;
static DNSServiceRef bonjour_sdref = NULL;
#endif
+/* State for IO worker management. */
+static int io_worker_count = 0;
+static PMChild *io_worker_children[MAX_IO_WORKERS];
+
/*
* postmaster.c - function prototypes
*/
@@ -436,6 +444,8 @@ static void TerminateChildren(int signal);
static int CountChildren(BackendTypeMask targetMask);
static void LaunchMissingBackgroundProcesses(void);
static void maybe_start_bgworkers(void);
+static bool maybe_reap_io_worker(int pid);
+static void maybe_adjust_io_workers(void);
static bool CreateOptsFile(int argc, char *argv[], char *fullprogname);
static PMChild *StartChildProcess(BackendType type);
static void StartSysLogger(void);
@@ -1365,6 +1375,11 @@ PostmasterMain(int argc, char *argv[])
*/
AddToDataDirLockFile(LOCK_FILE_LINE_PM_STATUS, PM_STATUS_STARTING);
+ UpdatePMState(PM_STARTUP);
+
+ /* Make sure we can perform I/O while starting up. */
+ maybe_adjust_io_workers();
+
/* Start bgwriter and checkpointer so they can help with recovery */
if (CheckpointerPMChild == NULL)
CheckpointerPMChild = StartChildProcess(B_CHECKPOINTER);
@@ -1377,7 +1392,6 @@ PostmasterMain(int argc, char *argv[])
StartupPMChild = StartChildProcess(B_STARTUP);
Assert(StartupPMChild != NULL);
StartupStatus = STARTUP_RUNNING;
- UpdatePMState(PM_STARTUP);
/* Some workers may be scheduled to start now */
maybe_start_bgworkers();
@@ -2502,6 +2516,16 @@ process_pm_child_exit(void)
continue;
}
+ /* Was it an IO worker? */
+ if (maybe_reap_io_worker(pid))
+ {
+ if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
+ HandleChildCrash(pid, exitstatus, _("io worker"));
+
+ maybe_adjust_io_workers();
+ continue;
+ }
+
/*
* Was it a backend or a background worker?
*/
@@ -2723,6 +2747,7 @@ HandleFatalError(QuitSignalReason reason, bool consider_sigabrt)
case PM_WAIT_XLOG_SHUTDOWN:
case PM_WAIT_XLOG_ARCHIVAL:
case PM_WAIT_CHECKPOINTER:
+ case PM_WAIT_IO_WORKERS:
/*
* NB: Similar code exists in PostmasterStateMachine()'s handling
@@ -2905,20 +2930,21 @@ PostmasterStateMachine(void)
/*
* If we are doing crash recovery or an immediate shutdown then we
- * expect archiver, checkpointer and walsender to exit as well,
- * otherwise not.
+ * expect archiver, checkpointer, io workers and walsender to exit as
+ * well, otherwise not.
*/
if (FatalError || Shutdown >= ImmediateShutdown)
targetMask = btmask_add(targetMask,
B_CHECKPOINTER,
B_ARCHIVER,
+ B_IO_WORKER,
B_WAL_SENDER);
/*
- * Normally walsenders and archiver will continue running; they will
- * be terminated later after writing the checkpoint record. We also
- * let dead-end children to keep running for now. The syslogger
- * process exits last.
+ * Normally archiver, checkpointer, IO workers and walsenders will
+ * continue running; they will be terminated later after writing the
+ * checkpoint record. We also let dead-end children keep running
+ * for now. The syslogger process exits last.
*
* This assertion checks that we have covered all backend types,
* either by including them in targetMask, or by noting here that they
@@ -2933,12 +2959,13 @@ PostmasterStateMachine(void)
B_LOGGER);
/*
- * Archiver, checkpointer and walsender may or may not be in
- * targetMask already.
+ * Archiver, checkpointer, IO workers, and walsender may or may
+ * not be in targetMask already.
*/
remainMask = btmask_add(remainMask,
B_ARCHIVER,
B_CHECKPOINTER,
+ B_IO_WORKER,
B_WAL_SENDER);
/* these are not real postmaster children */
@@ -3039,11 +3066,25 @@ PostmasterStateMachine(void)
{
/*
* PM_WAIT_XLOG_ARCHIVAL state ends when there are no children other
- * than checkpointer, dead-end children and logger left. There
+ * than checkpointer, io workers, logger and dead-end children left. There
* shouldn't be any regular backends left by now anyway; what we're
* really waiting for is for walsenders and archiver to exit.
*/
- if (CountChildren(btmask_all_except(B_CHECKPOINTER, B_LOGGER, B_DEAD_END_BACKEND)) == 0)
+ if (CountChildren(btmask_all_except(B_CHECKPOINTER, B_IO_WORKER,
+ B_LOGGER, B_DEAD_END_BACKEND)) == 0)
+ {
+ UpdatePMState(PM_WAIT_IO_WORKERS);
+ SignalChildren(SIGUSR2, btmask(B_IO_WORKER));
+ }
+ }
+
+ if (pmState == PM_WAIT_IO_WORKERS)
+ {
+ /*
+ * PM_WAIT_IO_WORKERS state ends when all IO workers have exited, i.e.
+ * when only checkpointer, logger and dead-end children are left.
+ */
+ if (io_worker_count == 0)
{
UpdatePMState(PM_WAIT_CHECKPOINTER);
@@ -3171,10 +3212,14 @@ PostmasterStateMachine(void)
/* re-create shared memory and semaphores */
CreateSharedMemoryAndSemaphores();
+ UpdatePMState(PM_STARTUP);
+
+ /* Make sure we can perform I/O while starting up. */
+ maybe_adjust_io_workers();
+
StartupPMChild = StartChildProcess(B_STARTUP);
Assert(StartupPMChild != NULL);
StartupStatus = STARTUP_RUNNING;
- UpdatePMState(PM_STARTUP);
/* crash recovery started, reset SIGKILL flag */
AbortStartTime = 0;
@@ -3198,6 +3243,7 @@ pmstate_name(PMState state)
PM_TOSTR_CASE(PM_WAIT_BACKENDS);
PM_TOSTR_CASE(PM_WAIT_XLOG_SHUTDOWN);
PM_TOSTR_CASE(PM_WAIT_XLOG_ARCHIVAL);
+ PM_TOSTR_CASE(PM_WAIT_IO_WORKERS);
PM_TOSTR_CASE(PM_WAIT_DEAD_END);
PM_TOSTR_CASE(PM_WAIT_CHECKPOINTER);
PM_TOSTR_CASE(PM_NO_CHILDREN);
@@ -3236,6 +3282,16 @@ LaunchMissingBackgroundProcesses(void)
StartSysLogger();
/*
+ * The number of configured workers might have changed, or a prior start
+ * of a worker might have failed. Check if we need to start/stop any
+ * workers.
+ *
+ * A config file change will always lead to this function being called, so
+ * we will always process the config change in a timely manner.
+ */
+ maybe_adjust_io_workers();
+
+ /*
* The checkpointer and the background writer are active from the start,
* until shutdown is initiated.
*
@@ -4120,6 +4176,7 @@ bgworker_should_start_now(BgWorkerStartTime start_time)
case PM_WAIT_DEAD_END:
case PM_WAIT_XLOG_ARCHIVAL:
case PM_WAIT_XLOG_SHUTDOWN:
+ case PM_WAIT_IO_WORKERS:
case PM_WAIT_BACKENDS:
case PM_STOP_BACKENDS:
break;
@@ -4270,6 +4327,99 @@ maybe_start_bgworkers(void)
}
}
+static bool
+maybe_reap_io_worker(int pid)
+{
+ for (int id = 0; id < MAX_IO_WORKERS; ++id)
+ {
+ if (io_worker_children[id] &&
+ io_worker_children[id]->pid == pid)
+ {
+ ReleasePostmasterChildSlot(io_worker_children[id]);
+
+ --io_worker_count;
+ io_worker_children[id] = NULL;
+ return true;
+ }
+ }
+ return false;
+}
+
+/*
+ * Start or stop IO workers, to close the gap between the number of running
+ * workers and the number of configured workers. Used to respond to change of
+ * the io_workers GUC (by increasing and decreasing the number of workers), as
+ * well as workers terminating in response to errors (by starting
+ * "replacement" workers).
+ */
+static void
+maybe_adjust_io_workers(void)
+{
+ if (!pgaio_workers_enabled())
+ return;
+
+ /*
+ * If we're in final shutting down state, then we're just waiting for all
+ * processes to exit.
+ */
+ if (pmState >= PM_WAIT_IO_WORKERS)
+ return;
+
+ /* Don't start new workers during an immediate shutdown either. */
+ if (Shutdown >= ImmediateShutdown)
+ return;
+
+ /*
+ * Don't start new workers if we're in the shutdown phase of a crash
+ * restart. But we *do* need to start if we're already starting up again.
+ */
+ if (FatalError && pmState >= PM_STOP_BACKENDS)
+ return;
+
+ Assert(pmState < PM_WAIT_IO_WORKERS);
+
+ /* Not enough running? */
+ while (io_worker_count < io_workers)
+ {
+ PMChild *child;
+ int id;
+
+ /* find unused entry in io_worker_children array */
+ for (id = 0; id < MAX_IO_WORKERS; ++id)
+ {
+ if (io_worker_children[id] == NULL)
+ break;
+ }
+ if (id == MAX_IO_WORKERS)
+ elog(ERROR, "could not find a free IO worker ID");
+
+ /* Try to launch one. */
+ child = StartChildProcess(B_IO_WORKER);
+ if (child != NULL)
+ {
+ io_worker_children[id] = child;
+ ++io_worker_count;
+ }
+ else
+ break; /* XXX try again soon? */
+ }
+
+ /* Too many running? */
+ if (io_worker_count > io_workers)
+ {
+ /* ask the IO worker in the highest slot to exit */
+ for (int id = MAX_IO_WORKERS - 1; id >= 0; --id)
+ {
+ if (io_worker_children[id] != NULL)
+ {
+ kill(io_worker_children[id]->pid, SIGUSR2);
+ break;
+ }
+ }
+ }
+}
+
+
/*
* When a backend asks to be notified about worker state changes, we
* set a flag in its backend entry. The background worker machinery needs
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
index 89f821ea7e1..f51c34a37f8 100644
--- a/src/backend/storage/aio/Makefile
+++ b/src/backend/storage/aio/Makefile
@@ -15,6 +15,7 @@ OBJS = \
aio_io.o \
aio_target.o \
method_sync.o \
+ method_worker.o \
read_stream.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
index 2c26089d52e..74f94c6e40b 100644
--- a/src/backend/storage/aio/meson.build
+++ b/src/backend/storage/aio/meson.build
@@ -7,5 +7,6 @@ backend_sources += files(
'aio_io.c',
'aio_target.c',
'method_sync.c',
+ 'method_worker.c',
'read_stream.c',
)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
new file mode 100644
index 00000000000..0ef9ef93e2b
--- /dev/null
+++ b/src/backend/storage/aio/method_worker.c
@@ -0,0 +1,88 @@
+/*-------------------------------------------------------------------------
+ *
+ * method_worker.c
+ * AIO - perform AIO using worker processes
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/aio/method_worker.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "postmaster/auxprocess.h"
+#include "postmaster/interrupt.h"
+#include "storage/aio_subsys.h"
+#include "storage/io_worker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "tcop/tcopprot.h"
+#include "utils/wait_event.h"
+
+
+/* GUCs */
+int io_workers = 3;
+
+
+void
+IoWorkerMain(const void *startup_data, size_t startup_data_len)
+{
+ sigjmp_buf local_sigjmp_buf;
+
+ MyBackendType = B_IO_WORKER;
+ AuxiliaryProcessMainCommon();
+
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
+
+ /*
+ * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
+ * shutdown sequence, similar to checkpointer.
+ */
+ pqsignal(SIGTERM, SIG_IGN);
+ /* SIGQUIT handler was already set up by InitPostmasterChild */
+ pqsignal(SIGALRM, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
+
+ /* see PostgresMain() */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ error_context_stack = NULL;
+ HOLD_INTERRUPTS();
+
+ EmitErrorReport();
+
+ proc_exit(1);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+ while (!ShutdownRequestPending)
+ {
+ WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+ WAIT_EVENT_IO_WORKER_MAIN);
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ proc_exit(0);
+}
+
+bool
+pgaio_workers_enabled(void)
+{
+ /* placeholder for future commit */
+ return false;
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 55ab2da299b..0554a4ae3c7 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3316,6 +3316,13 @@ ProcessInterrupts(void)
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating background worker \"%s\" due to administrator command",
MyBgworkerEntry->bgw_type)));
+ else if (AmIoWorkerProcess())
+ {
+ ereport(DEBUG1,
+ (errmsg_internal("io worker shutting down due to administrator command")));
+
+ proc_exit(0);
+ }
else
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/backend/utils/activity/pgstat_backend.c b/src/backend/utils/activity/pgstat_backend.c
index a8cb54a7732..5518a18e060 100644
--- a/src/backend/utils/activity/pgstat_backend.c
+++ b/src/backend/utils/activity/pgstat_backend.c
@@ -375,6 +375,7 @@ pgstat_tracks_backend_bktype(BackendType bktype)
case B_LOGGER:
case B_BG_WRITER:
case B_CHECKPOINTER:
+ case B_IO_WORKER:
case B_STARTUP:
return false;
diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c
index eb575025596..c8de9c9e2d3 100644
--- a/src/backend/utils/activity/pgstat_io.c
+++ b/src/backend/utils/activity/pgstat_io.c
@@ -376,6 +376,7 @@ pgstat_tracks_io_bktype(BackendType bktype)
case B_BG_WORKER:
case B_BG_WRITER:
case B_CHECKPOINTER:
+ case B_IO_WORKER:
case B_SLOTSYNC_WORKER:
case B_STANDALONE_BACKEND:
case B_STARTUP:
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index b44e4908b25..3f6dc3876b4 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -57,6 +57,7 @@ BGWRITER_HIBERNATE "Waiting in background writer process, hibernating."
BGWRITER_MAIN "Waiting in main loop of background writer process."
CHECKPOINTER_MAIN "Waiting in main loop of checkpointer process."
CHECKPOINTER_SHUTDOWN "Waiting for checkpointer process to be terminated."
+IO_WORKER_MAIN "Waiting in main loop of IO Worker process."
LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process."
LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process."
LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process."
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index dc3521457c7..43b4dbccc3d 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -293,6 +293,9 @@ GetBackendTypeDesc(BackendType backendType)
case B_CHECKPOINTER:
backendDesc = gettext_noop("checkpointer");
break;
+ case B_IO_WORKER:
+ backendDesc = gettext_noop("io worker");
+ break;
case B_LOGGER:
backendDesc = gettext_noop("logger");
break;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 4984d12606c..c89316ce294 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -75,6 +75,7 @@
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/io_worker.h"
#include "storage/large_object.h"
#include "storage/pg_shmem.h"
#include "storage/predicate.h"
@@ -3268,6 +3269,18 @@ struct config_int ConfigureNamesInt[] =
},
{
+ {"io_workers",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Number of IO worker processes, for io_method=worker."),
+ NULL,
+ },
+ &io_workers,
+ 3, 1, MAX_IO_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
{"backend_flush_after", PGC_USERSET, RESOURCES_IO,
gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
gettext_noop("0 disables forced writeback."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index db44fa563b5..7d0bf1dc006 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -207,6 +207,7 @@
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
+#io_workers = 3 # 1-32
# - Worker Processes -