diff options
author | Amit Kapila | 2024-01-30 11:01:09 +0000 |
---|---|---|
committer | Amit Kapila | 2024-01-30 11:19:28 +0000 |
commit | 776621a5e4796fa214b6b29a7ca134f6c138572a (patch) | |
tree | ba58dcb69a247bc073eca865879e730c5790ce3a /src/backend/commands/subscriptioncmds.c | |
parent | b527ebc1d37aa82b771dc9c76111bed1bce35a05 (diff) |
Add a failover option to subscriptions.
This commit introduces a new subscription option named 'failover', which
provides users with the ability to set the failover property of the
replication slot on the publisher when creating or altering a
subscription.
This uses the replication commands introduced by commit 7329240437 to
enable the failover option for a logical replication slot.
If the failover option is set to true, the associated replication slots
(i.e. the main slot and the table sync slots) in the upstream database are
enabled to be synchronized to the standbys. Note that the capability to
sync the replication slots will be added in subsequent commits.
Thanks to Masahiko Sawada for the design inputs.
Author: Shveta Malik, Hou Zhijie, Ajin Cherian
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 116 |
1 files changed, 111 insertions, 5 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index eaf2ec3b362..b647a81fc86 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -69,8 +69,10 @@ #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 -#define SUBOPT_LSN 0x00002000 -#define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_FAILOVER 0x00002000 +#define SUBOPT_LSN 0x00004000 +#define SUBOPT_ORIGIN 0x00008000 + /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -95,6 +97,7 @@ typedef struct SubOpts bool disableonerr; bool passwordrequired; bool runasowner; + bool failover; char *origin; XLogRecPtr lsn; } SubOpts; @@ -155,6 +158,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->passwordrequired = true; if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER)) opts->runasowner = false; + if (IsSet(supported_opts, SUBOPT_FAILOVER)) + opts->failover = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -303,6 +308,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_RUN_AS_OWNER; opts->runasowner = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_FAILOVER) && + strcmp(defel->defname, "failover") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_FAILOVER; + opts->failover = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -388,6 +402,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); + if (opts->failover && + IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "connect = false", "failover = true"))); + /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; @@ -591,7 +612,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -697,6 +718,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); + values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -807,7 +829,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - false, CRS_NOEXPORT_SNAPSHOT, NULL); + opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL); if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); @@ -816,6 +838,24 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, (errmsg("created replication slot \"%s\" on publisher", opts.slot_name))); } + + /* + * If the slot_name is specified without the create_slot option, + * it is possible that the user intends to use an existing slot on + * the publisher, so here we alter the failover property of the + * slot to match the failover value in subscription. + * + * We do not need to change the failover to false if the server + * does not support failover (e.g. pre-PG17). + */ + else if (opts.slot_name && + (opts.failover || walrcv_server_version(wrconn) >= 170000)) + { + walrcv_alter_slot(wrconn, opts.slot_name, opts.failover); + ereport(NOTICE, + (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s", + opts.slot_name, opts.failover ? "true" : "false"))); + } } PG_FINALLY(); { @@ -1132,7 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1211,6 +1252,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subrunasowner - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) + { + if (!sub->slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for a subscription that does not have a slot name", + "failover"))); + + /* + * Do not allow changing the failover state if the + * subscription is enabled. This is because the failover + * state of the slot on the publisher cannot be modified + * if the slot is currently acquired by the apply worker. + */ + if (sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for enabled subscription", + "failover"))); + + values[Anum_pg_subscription_subfailover - 1] = + BoolGetDatum(opts.failover); + replaces[Anum_pg_subscription_subfailover - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -1453,6 +1519,46 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, heap_freetuple(tup); } + /* + * Try to acquire the connection necessary for altering slot. + * + * This has to be at the end because otherwise if there is an error while + * doing the database operations we won't be able to rollback altered + * slot. + */ + if (replaces[Anum_pg_subscription_subfailover - 1]) + { + bool must_use_password; + char *err; + WalReceiverConn *wrconn; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + must_use_password = sub->passwordrequired && !sub->ownersuperuser; + wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + PG_TRY(); + { + walrcv_alter_slot(wrconn, sub->slotname, opts.failover); + + ereport(NOTICE, + (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s", + sub->slotname, opts.failover ? "true" : "false"))); + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + } + table_close(rel, RowExclusiveLock); ObjectAddressSet(myself, SubscriptionRelationId, subid); |