diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 6 | ||||
-rw-r--r-- | src/backend/replication/logical/origin.c | 66 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 12 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 2 |
4 files changed, 48 insertions, 38 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index a3c7adbf1a8..10677da56b2 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -31,7 +31,7 @@ #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" -#include "replication/slot.h" +#include "replication/origin.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -325,10 +325,10 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, subname))); /* Report this after the initial starting message for consistency. */ - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("cannot start logical replication workers when \"max_replication_slots\"=0"))); + errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0"))); /* * We need to do the modification of the shared memory under lock so that diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index c3c1d7a2a51..6583dd497da 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -90,6 +90,7 @@ #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/guc.h" #include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/snapmgr.h" @@ -99,6 +100,9 @@ #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint" #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp" +/* GUC variables */ +int max_active_replication_origins = 10; + /* * Replay progress of a single remote node. */ @@ -151,7 +155,7 @@ typedef struct ReplicationStateCtl { /* Tranche to use for per-origin LWLocks */ int tranche_id; - /* Array of length max_replication_slots */ + /* Array of length max_active_replication_origins */ ReplicationState states[FLEXIBLE_ARRAY_MEMBER]; } ReplicationStateCtl; @@ -162,10 +166,7 @@ TimestampTz replorigin_session_origin_timestamp = 0; /* * Base address into a shared memory array of replication states of size - * max_replication_slots. - * - * XXX: Should we use a separate variable to size this rather than - * max_replication_slots? + * max_active_replication_origins. */ static ReplicationState *replication_states; @@ -186,12 +187,12 @@ static ReplicationState *session_replication_state = NULL; #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE) static void -replorigin_check_prerequisites(bool check_slots, bool recoveryOK) +replorigin_check_prerequisites(bool check_origins, bool recoveryOK) { - if (check_slots && max_replication_slots == 0) + if (check_origins && max_active_replication_origins == 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0"))); + errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0"))); if (!recoveryOK && RecoveryInProgress()) ereport(ERROR, @@ -352,7 +353,7 @@ replorigin_state_clear(RepOriginId roident, bool nowait) restart: LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state = &replication_states[i]; @@ -511,18 +512,13 @@ ReplicationOriginShmemSize(void) { Size size = 0; - /* - * XXX: max_replication_slots is arguably the wrong thing to use, as here - * we keep the replay state of *remote* transactions. But for now it seems - * sufficient to reuse it, rather than introduce a separate GUC. - */ - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return size; size = add_size(size, offsetof(ReplicationStateCtl, states)); size = add_size(size, - mul_size(max_replication_slots, sizeof(ReplicationState))); + mul_size(max_active_replication_origins, sizeof(ReplicationState))); return size; } @@ -531,7 +527,7 @@ ReplicationOriginShmemInit(void) { bool found; - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return; replication_states_ctl = (ReplicationStateCtl *) @@ -548,7 +544,7 @@ ReplicationOriginShmemInit(void) replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE; - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { LWLockInitialize(&replication_states[i].lock, replication_states_ctl->tranche_id); @@ -570,7 +566,7 @@ ReplicationOriginShmemInit(void) * * So its just the magic, followed by the statically sized * ReplicationStateOnDisk structs. Note that the maximum number of - * ReplicationState is determined by max_replication_slots. + * ReplicationState is determined by max_active_replication_origins. * --------------------------------------------------------------------------- */ void @@ -583,7 +579,7 @@ CheckPointReplicationOrigin(void) uint32 magic = REPLICATION_STATE_MAGIC; pg_crc32c crc; - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return; INIT_CRC32C(crc); @@ -625,7 +621,7 @@ CheckPointReplicationOrigin(void) LWLockAcquire(ReplicationOriginLock, LW_SHARED); /* write actual data */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationStateOnDisk disk_state; ReplicationState *curstate = &replication_states[i]; @@ -718,7 +714,7 @@ StartupReplicationOrigin(void) already_started = true; #endif - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return; INIT_CRC32C(crc); @@ -728,8 +724,8 @@ StartupReplicationOrigin(void) fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); /* - * might have had max_replication_slots == 0 last run, or we just brought - * up a standby. + * might have had max_active_replication_origins == 0 last run, or we just + * brought up a standby. */ if (fd < 0 && errno == ENOENT) return; @@ -796,10 +792,10 @@ StartupReplicationOrigin(void) COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); - if (last_state == max_replication_slots) + if (last_state == max_active_replication_origins) ereport(PANIC, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("could not find free replication state, increase \"max_replication_slots\""))); + errmsg("could not find free replication state, increase \"max_active_replication_origins\""))); /* copy data to shared memory */ replication_states[last_state].roident = disk_state.roident; @@ -852,7 +848,7 @@ replorigin_redo(XLogReaderState *record) xlrec = (xl_replorigin_drop *) XLogRecGetData(record); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state = &replication_states[i]; @@ -917,7 +913,7 @@ replorigin_advance(RepOriginId node, * Search for either an existing slot for the origin, or a free one we can * use. */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *curstate = &replication_states[i]; @@ -958,7 +954,7 @@ replorigin_advance(RepOriginId node, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("could not find free replication state slot for replication origin with ID %d", node), - errhint("Increase \"max_replication_slots\" and try again."))); + errhint("Increase \"max_active_replication_origins\" and try again."))); if (replication_state == NULL) { @@ -1024,7 +1020,7 @@ replorigin_get_progress(RepOriginId node, bool flush) /* prevent slots from being concurrently dropped */ LWLockAcquire(ReplicationOriginLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state; @@ -1110,7 +1106,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) registered_cleanup = true; } - Assert(max_replication_slots > 0); + Assert(max_active_replication_origins > 0); if (session_replication_state != NULL) ereport(ERROR, @@ -1124,7 +1120,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) * Search for either an existing slot for the origin, or a free one we can * use. */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *curstate = &replication_states[i]; @@ -1159,7 +1155,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("could not find free replication state slot for replication origin with ID %d", node), - errhint("Increase \"max_replication_slots\" and try again."))); + errhint("Increase \"max_active_replication_origins\" and try again."))); else if (session_replication_state == NULL) { /* initialize new slot */ @@ -1195,7 +1191,7 @@ replorigin_session_reset(void) { ConditionVariable *cv; - Assert(max_replication_slots != 0); + Assert(max_active_replication_origins != 0); if (session_replication_state == NULL) ereport(ERROR, @@ -1536,7 +1532,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS) * filled. Note that we do not take any locks, so slightly corrupted/out * of date values are a possibility. */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state; Datum values[REPLICATION_ORIGIN_PROGRESS_COLS]; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 97cfd6e5a82..17d28f458f2 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3375,6 +3375,18 @@ struct config_int ConfigureNamesInt[] = }, { + {"max_active_replication_origins", + PGC_POSTMASTER, + REPLICATION_SUBSCRIBERS, + gettext_noop("Sets the maximum number of active replication origins."), + NULL + }, + &max_active_replication_origins, + 10, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, + + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Sets the amount of time to wait before forcing " "log file rotation."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9f31e4071c7..0b9e3066bde 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -389,6 +389,8 @@ # These settings are ignored on a publisher. +#max_active_replication_origins = 10 # max number of active replication origins + # (change requires restart) #max_logical_replication_workers = 4 # taken from max_worker_processes # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers |