diff options
author | Amit Kapila | 2022-03-22 01:41:19 +0000 |
---|---|---|
committer | Amit Kapila | 2022-03-22 01:41:19 +0000 |
commit | 208c5d65bbd60e33e272964578cb74182ac726a8 (patch) | |
tree | 5f3a99783f4c7be35c16237c5b10ebc711b37293 /src/backend | |
parent | 315ae75e9b6da72456eaa44e55ace9ab1b95ef74 (diff) |
Add ALTER SUBSCRIPTION ... SKIP.
This feature allows skipping the transaction on subscriber nodes.
If incoming change violates any constraint, logical replication stops
until it's resolved. Currently, users need to either manually resolve the
conflict by updating a subscriber-side database or by using function
pg_replication_origin_advance() to skip the conflicting transaction. This
commit introduces a simpler way to skip the conflicting transactions.
The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX),
which allows the apply worker to skip the transaction finished at
specified LSN. The apply worker skips all data modification changes within
the transaction.
Author: Masahiko Sawada
Reviewed-by: Takamichi Osumi, Hou Zhijie, Peter Eisentraut, Amit Kapila, Shi Yu, Vignesh C, Greg Nancarrow, Haiying Tang, Euler Taveira
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 1 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 2 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 73 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 233 |
5 files changed, 310 insertions, 8 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a6304f5f81a..0ff0982f7b2 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->skiplsn = subform->subskiplsn; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index bb1ac30cd19..bd48ee7bd25 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, - substream, subtwophasestate, subdisableonerr, subslotname, + substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3922658bbca..e16f04626de 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -45,6 +45,7 @@ #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/syscache.h" /* @@ -62,6 +63,7 @@ #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 +#define SUBOPT_LSN 0x00000800 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -84,6 +86,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + XLogRecPtr lsn; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; opts->disableonerr = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_LSN) && + strcmp(defel->defname, "lsn") == 0) + { + char *lsn_str = defGetString(defel); + XLogRecPtr lsn; + + if (IsSet(opts->specified_opts, SUBOPT_LSN)) + errorConflictingDefElem(defel, pstate); + + /* Setting lsn = NONE is treated as resetting LSN */ + if (strcmp(lsn_str, "none") == 0) + lsn = InvalidXLogRecPtr; + else + { + /* Parse the argument as LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(lsn_str))); + + if (XLogRecPtrIsInvalid(lsn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid WAL location (LSN): %s", lsn_str))); + } + + opts->specified_opts |= SUBOPT_LSN; + opts->lsn = lsn; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SKIP: + { + parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts); + + /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */ + Assert(IsSet(opts.specified_opts, SUBOPT_LSN)); + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to skip transaction"))); + + /* + * If the user sets subskiplsn, we do a sanity check to make + * sure that the specified LSN is a probable value. + */ + if (!XLogRecPtrIsInvalid(opts.lsn)) + { + RepOriginId originid; + char originname[NAMEDATALEN]; + XLogRecPtr remote_lsn; + + snprintf(originname, sizeof(originname), "pg_%u", subid); + originid = replorigin_by_name(originname, false); + remote_lsn = replorigin_get_progress(originid, false); + + /* Check the given LSN is at least a future LSN */ + if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X", + LSN_FORMAT_ARGS(opts.lsn), + LSN_FORMAT_ARGS(remote_lsn)))); + } + + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + update_tuple = true; + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a03b33b53bd..0036c2f9e2d 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9983,6 +9983,15 @@ AlterSubscriptionStmt: (Node *)makeBoolean(false), @1)); $$ = (Node *)n; } + | ALTER SUBSCRIPTION name SKIP definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_SKIP; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 03e069c7cdd..82dcffc2db8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -189,6 +190,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/rls.h" #include "utils/syscache.h" @@ -259,6 +261,21 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for + * the subscription if the remote transaction's finish LSN matches the subskiplsn. + * Once we start skipping changes, we don't stop it until we skip all changes of + * the transaction even if pg_subscription is updated and MySubscription->skiplsn + * gets changed or reset during that. Also, in streaming transaction cases, we + * don't skip receiving and spooling the changes since we decide whether or not + * to skip applying the changes when starting to apply changes. The subskiplsn is + * cleared after successfully skipping the transaction or applying non-empty + * transaction. The latter prevents the mistakenly specified subskiplsn from + * being left. + */ +static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; +#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); +/* Functions for skipping changes */ +static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); +static void stop_skipping_changes(void); +static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); + /* Functions for apply error callback */ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); @@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + maybe_start_skipping_changes(begin_data.final_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + maybe_start_skipping_changes(begin_data.prepare_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s) /* * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. + * change has happened in this transaction or all changes are skipped. It + * is done this way because at commit prepared time, we won't know whether + * we have skipped preparing a transaction because of those reasons. * * XXX, We can optimize such that at commit prepared time, we first check * whether we have prepared the transaction or not but that doesn't seem @@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Since we have already prepared the transaction, in a case where the + * server crashes before clearing the subskiplsn, it will be left but the + * transaction won't be resent. But that's okay because it's a rare case + * and the subskiplsn will be cleared when finishing the next transaction. + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + clear_subscription_skip_lsn(prepare_data.end_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s) FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); + + clear_subscription_skip_lsn(rollback_data.rollback_end_lsn); } pgstat_report_stat(false); @@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Similar to prepare case, the subskiplsn could be left in a case of + * server crash but it's okay. See the comments in apply_handle_prepare(). + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) MemoryContext oldcxt; BufFile *fd; + maybe_start_skipping_changes(lsn); + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1455,9 +1503,27 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(LogicalRepCommitData *commit_data) { + if (is_skipping_changes()) + { + stop_skipping_changes(); + + /* + * Start a new transaction to clear the subskiplsn, if not started + * yet. + */ + if (!IsTransactionState()) + StartTransactionCommand(); + } + if (IsTransactionState()) { /* + * The transaction is either non-empty or skipped, so we clear the + * subskiplsn. + */ + clear_subscription_skip_lsn(commit_data->commit_lsn); + + /* * Update origin state so we can restart streaming from correct * position in case of crash. */ @@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; begin_replication_step(); @@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; begin_replication_step(); @@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; begin_replication_step(); @@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s) ListCell *lc; LOCKMODE lockmode = AccessExclusiveLock; - if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; begin_replication_step(); @@ -3738,6 +3824,139 @@ IsLogicalWorker(void) return MyLogicalRepWorker != NULL; } +/* + * Start skipping changes of the transaction if the given LSN matches the + * LSN specified by subscription's skiplsn. + */ +static void +maybe_start_skipping_changes(XLogRecPtr finish_lsn) +{ + Assert(!is_skipping_changes()); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + /* + * Quick return if it's not requested to skip this transaction. This + * function is called for every remote transaction and we assume that + * skipping the transaction is not used often. + */ + if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) || + MySubscription->skiplsn != finish_lsn)) + return; + + /* Start skipping all changes of this transaction */ + skip_xact_finish_lsn = finish_lsn; + + ereport(LOG, + errmsg("start skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn))); +} + +/* + * Stop skipping changes by resetting skip_xact_finish_lsn if enabled. + */ +static void +stop_skipping_changes(void) +{ + if (!is_skipping_changes()) + return; + + ereport(LOG, + (errmsg("done skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn)))); + + /* Stop skipping changes */ + skip_xact_finish_lsn = InvalidXLogRecPtr; +} + +/* + * Clear subskiplsn of pg_subscription catalog. + * + * finish_lsn is the transaction's finish LSN that is used to check if the + * subskiplsn matches it. If not matched, we raise a warning when clearing the + * subskiplsn in order to inform users for cases e.g., where the user mistakenly + * specified the wrong subskiplsn. + */ +static void +clear_subscription_skip_lsn(XLogRecPtr finish_lsn) +{ + Relation rel; + Form_pg_subscription subform; + HeapTuple tup; + XLogRecPtr myskiplsn = MySubscription->skiplsn; + bool started_tx = false; + + if (likely(XLogRecPtrIsInvalid(myskiplsn))) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Protect subskiplsn of pg_subscription from being concurrently updated + * while clearing it. + */ + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* + * Clear the subskiplsn. If the user has already changed subskiplsn before + * clearing it we don't update the catalog and the replication origin + * state won't get advanced. So in the worst case, if the server crashes + * before sending an acknowledgment of the flush position the transaction + * will be sent again and the user needs to set subskiplsn again. We can + * reduce the possibility by logging a replication origin WAL record to + * advance the origin LSN instead but there is no way to advance the + * origin timestamp and it doesn't seem to be worth doing anything about + * it since it's a very rare case. + */ + if (subform->subskiplsn == myskiplsn) + { + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* reset subskiplsn */ + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + if (myskiplsn != finish_lsn) + ereport(WARNING, + errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name), + errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X", + LSN_FORMAT_ARGS(finish_lsn), + LSN_FORMAT_ARGS(myskiplsn))); + } + + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} + /* Error callback to give more context info about the change being applied */ static void apply_error_callback(void *arg) |