Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila2024-07-24 04:43:36 +0000
committerAmit Kapila2024-07-24 04:43:36 +0000
commit1462aad2e4474ab61174f8ab00992cd3d6d57c7b (patch)
tree9649b47a93c4b7257db1295f65f8d0213c5b3537 /src/backend/commands/subscriptioncmds.c
parent774d47b6c01a8b8111ae390b97343f25ebdf9267 (diff)
Allow altering of two_phase option of a SUBSCRIPTION.
The two_phase option is controlled by both the publisher (as a slot option) and the subscriber (as a subscription option), so the slot option must also be modified. Changing the 'two_phase' option for a subscription from 'true' to 'false' is permitted only when there are no pending prepared transactions corresponding to that subscription. Otherwise, the changes of already prepared transactions can be replicated again along with their corresponding commit leading to duplicate data or errors. To avoid data loss, the 'two_phase' option for a subscription can only be changed from 'false' to 'true' once the initial data synchronization is completed. Therefore this is performed later by the logical replication worker. Author: Hayato Kuroda, Ajin Cherian, Amit Kapila Reviewed-by: Peter Smith, Hou Zhijie, Amit Kapila, Vitaly Davydov, Vignesh C Discussion: https://postgr.es/m/8fab8-65d74c80-1-2f28e880@39088166
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c169
1 files changed, 134 insertions, 35 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 16d83b32539..d124bfe55ca 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -16,6 +16,7 @@
#include "access/htup_details.h"
#include "access/table.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
@@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn,
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static void CheckAlterSubOption(Subscription *sub, const char *option,
+ bool slot_needs_update, bool isTopLevel);
/*
@@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetStreamingMode(defel);
}
- else if (strcmp(defel->defname, "two_phase") == 0)
+ else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
+ strcmp(defel->defname, "two_phase") == 0)
{
- /*
- * Do not allow toggling of two_phase option. Doing so could cause
- * missing of transactions and lead to an inconsistent replica.
- * See comments atop worker.c
- *
- * Note: Unsupported twophase indicates that this call originated
- * from AlterSubscription.
- */
- if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
-
if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
errorConflictingDefElem(defel, pstate);
@@ -1080,6 +1071,60 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
+ * Common checks for altering failover and two_phase options.
+ */
+static void
+CheckAlterSubOption(Subscription *sub, const char *option,
+ bool slot_needs_update, bool isTopLevel)
+{
+ /*
+ * The checks in this function are required only for failover and
+ * two_phase options.
+ */
+ Assert(strcmp(option, "failover") == 0 ||
+ strcmp(option, "two_phase") == 0);
+
+ /*
+ * Do not allow changing the option if the subscription is enabled. This
+ * is because both failover and two_phase options of the slot on the
+ * publisher cannot be modified if the slot is currently acquired by the
+ * existing walsender.
+ *
+ * Note that two_phase is enabled (aka changed from 'false' to 'true') on
+ * the publisher by the existing walsender, so we could have allowed that
+ * even when the subscription is enabled. But we kept this restriction for
+ * the sake of consistency and simplicity.
+ */
+ if (sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for enabled subscription",
+ option)));
+
+ if (slot_needs_update)
+ {
+ StringInfoData cmd;
+
+ /*
+ * A valid slot must be associated with the subscription for us to
+ * modify any of the slot's properties.
+ */
+ if (!sub->slotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for a subscription that does not have a slot name",
+ option)));
+
+ /* The changed option of the slot can't be rolled back. */
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
+
+ PreventInTransactionBlock(isTopLevel, cmd.data);
+ pfree(cmd.data);
+ }
+}
+
+/*
* Alter the existing subscription.
*/
ObjectAddress
@@ -1094,6 +1139,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
HeapTuple tup;
Oid subid;
bool update_tuple = false;
+ bool update_failover = false;
+ bool update_two_phase = false;
Subscription *sub;
Form_pg_subscription form;
bits32 supported_opts;
@@ -1145,7 +1192,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_ORIGIN);
@@ -1227,31 +1275,81 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subrunasowner - 1] = true;
}
- if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+ if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
{
- if (!sub->slotname)
+ /*
+ * We need to update both the slot and the subscription
+ * for the two_phase option. We can enable the two_phase
+ * option for a slot only once the initial data
+ * synchronization is done. This is to avoid missing some
+ * data as explained in comments atop worker.c.
+ */
+ update_two_phase = !opts.twophase;
+
+ CheckAlterSubOption(sub, "two_phase", update_two_phase,
+ isTopLevel);
+
+ /*
+ * Modifying the two_phase slot option requires a slot
+ * lookup by slot name, so changing the slot name at the
+ * same time is not allowed.
+ */
+ if (update_two_phase &&
+ IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("slot_name and two_phase cannot be altered at the same time")));
+
+ /*
+ * Note that workers may still survive even if the
+ * subscription has been disabled.
+ *
+ * Ensure workers have already been exited to avoid
+ * getting prepared transactions while we are disabling
+ * the two_phase option. Otherwise, the changes of an
+ * already prepared transaction can be replicated again
+ * along with its corresponding commit, leading to
+ * duplicate data or errors.
+ */
+ if (logicalrep_workers_find(subid, true, true))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot set %s for a subscription that does not have a slot name",
- "failover")));
+ errmsg("cannot alter two_phase when logical replication worker is still running"),
+ errhint("Try again after some time.")));
/*
- * Do not allow changing the failover state if the
- * subscription is enabled. This is because the failover
- * state of the slot on the publisher cannot be modified
- * if the slot is currently acquired by the apply worker.
+ * two_phase cannot be disabled if there are any
+ * uncommitted prepared transactions present otherwise it
+ * can lead to duplicate data or errors as explained in
+ * the comment above.
*/
- if (sub->enabled)
+ if (update_two_phase &&
+ sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+ LookupGXactBySubid(subid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot set %s for enabled subscription",
- "failover")));
+ errmsg("cannot disable two_phase when prepared transactions are present"),
+ errhint("Resolve these transactions and try again.")));
+
+ /* Change system catalog accordingly */
+ values[Anum_pg_subscription_subtwophasestate - 1] =
+ CharGetDatum(opts.twophase ?
+ LOGICALREP_TWOPHASE_STATE_PENDING :
+ LOGICALREP_TWOPHASE_STATE_DISABLED);
+ replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+ }
+ if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+ {
/*
- * The changed failover option of the slot can't be rolled
- * back.
+ * Similar to the two_phase case above, we need to update
+ * the failover option for both the slot and the
+ * subscription.
*/
- PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (failover)");
+ update_failover = true;
+
+ CheckAlterSubOption(sub, "failover", update_failover,
+ isTopLevel);
values[Anum_pg_subscription_subfailover - 1] =
BoolGetDatum(opts.failover);
@@ -1501,13 +1599,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
- * Try to acquire the connection necessary for altering slot.
+ * Try to acquire the connection necessary for altering the slot, if
+ * needed.
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won't be able to rollback altered
* slot.
*/
- if (replaces[Anum_pg_subscription_subfailover - 1])
+ if (update_failover || update_two_phase)
{
bool must_use_password;
char *err;
@@ -1528,7 +1627,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY();
{
- walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+ walrcv_alter_slot(wrconn, sub->slotname,
+ update_failover ? &opts.failover : NULL,
+ update_two_phase ? &opts.twophase : NULL);
}
PG_FINALLY();
{
@@ -1675,9 +1776,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* New workers won't be started because we hold an exclusive lock on the
* subscription till the end of the transaction.
*/
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- subworkers = logicalrep_workers_find(subid, false);
- LWLockRelease(LogicalRepWorkerLock);
+ subworkers = logicalrep_workers_find(subid, false, true);
foreach(lc, subworkers)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);