Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 705e20f

Browse files
author
Amit Kapila
committed
Optionally disable subscriptions on error.
Logical replication apply workers for a subscription can easily get stuck in an infinite loop of attempting to apply a change, triggering an error (such as a constraint violation), exiting with the error written to the subscription server log, and restarting. To partially remedy the situation, this patch adds a new subscription option named 'disable_on_error'. To be consistent with old behavior, this option defaults to false. When true, both the tablesync worker and apply worker catch any errors thrown and disable the subscription in order to break the loop. The error is still also written in the logs. Once the subscription is disabled, users can either manually resolve the conflict/error or skip the conflicting transaction by using pg_replication_origin_advance() function. After resolving the conflict, users need to enable the subscription to allow apply process to proceed. Author: Osumi Takamichi and Mark Dilger Reviewed-by: Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying, Peter Smith, Masahiko Sawada, Shi Yu Discussion : https://postgr.es/m/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
1 parent 369398e commit 705e20f

File tree

17 files changed

+421
-108
lines changed

17 files changed

+421
-108
lines changed

doc/src/sgml/catalogs.sgml

+10
Original file line numberDiff line numberDiff line change
@@ -7769,6 +7769,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
77697769
</para></entry>
77707770
</row>
77717771

7772+
<row>
7773+
<entry role="catalog_table_entry"><para role="column_definition">
7774+
<structfield>subdisableonerr</structfield> <type>bool</type>
7775+
</para>
7776+
<para>
7777+
If true, the subscription will be disabled if one of its workers
7778+
detects an error
7779+
</para></entry>
7780+
</row>
7781+
77727782
<row>
77737783
<entry role="catalog_table_entry"><para role="column_definition">
77747784
<structfield>subconninfo</structfield> <type>text</type>

doc/src/sgml/logical-replication.sgml

+3-2
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
364364
the replication origin name can be found from the server log (LSN 0/14C0378 and
365365
replication origin <literal>pg_16395</literal> in the above case). To skip the
366366
transaction, the subscription needs to be disabled temporarily by
367-
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
368-
can be skipped by calling the
367+
<command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
368+
subscription can be used with the <literal>disable_on_error</literal> option.
369+
Then, the transaction can be skipped by calling the
369370
<link linkend="pg-replication-origin-advance">
370371
<function>pg_replication_origin_advance()</function></link> function with
371372
the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the

doc/src/sgml/ref/alter_subscription.sgml

+2-2
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
204204
information. The parameters that can be altered
205205
are <literal>slot_name</literal>,
206206
<literal>synchronous_commit</literal>,
207-
<literal>binary</literal>, and
208-
<literal>streaming</literal>.
207+
<literal>binary</literal>, <literal>streaming</literal>, and
208+
<literal>disable_on_error</literal>.
209209
</para>
210210
</listitem>
211211
</varlistentry>

doc/src/sgml/ref/create_subscription.sgml

+12
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
290290

291291
</listitem>
292292
</varlistentry>
293+
294+
<varlistentry>
295+
<term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
296+
<listitem>
297+
<para>
298+
Specifies whether the subscription should be automatically disabled
299+
if any errors are detected by subscription workers during data
300+
replication from the publisher. The default is
301+
<literal>false</literal>.
302+
</para>
303+
</listitem>
304+
</varlistentry>
293305
</variablelist>
294306
</para>
295307

src/backend/catalog/pg_subscription.c

+40
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
6969
sub->binary = subform->subbinary;
7070
sub->stream = subform->substream;
7171
sub->twophasestate = subform->subtwophasestate;
72+
sub->disableonerr = subform->subdisableonerr;
7273

7374
/* Get conninfo */
7475
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -156,6 +157,45 @@ FreeSubscription(Subscription *sub)
156157
pfree(sub);
157158
}
158159

160+
/*
161+
* Disable the given subscription.
162+
*/
163+
void
164+
DisableSubscription(Oid subid)
165+
{
166+
Relation rel;
167+
bool nulls[Natts_pg_subscription];
168+
bool replaces[Natts_pg_subscription];
169+
Datum values[Natts_pg_subscription];
170+
HeapTuple tup;
171+
172+
/* Look up the subscription in the catalog */
173+
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
174+
tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
175+
176+
if (!HeapTupleIsValid(tup))
177+
elog(ERROR, "cache lookup failed for subscription %u", subid);
178+
179+
LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
180+
181+
/* Form a new tuple. */
182+
memset(values, 0, sizeof(values));
183+
memset(nulls, false, sizeof(nulls));
184+
memset(replaces, false, sizeof(replaces));
185+
186+
/* Set the subscription to disabled. */
187+
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
188+
replaces[Anum_pg_subscription_subenabled - 1] = true;
189+
190+
/* Update the catalog */
191+
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
192+
replaces);
193+
CatalogTupleUpdate(rel, &tup->t_self, tup);
194+
heap_freetuple(tup);
195+
196+
table_close(rel, NoLock);
197+
}
198+
159199
/*
160200
* get_subscription_oid - given a subscription name, look up the OID
161201
*

src/backend/catalog/system_views.sql

+2-1
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
12611261
-- All columns of pg_subscription except subconninfo are publicly readable.
12621262
REVOKE ALL ON pg_subscription FROM public;
12631263
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
1264-
substream, subtwophasestate, subslotname, subsynccommit, subpublications)
1264+
substream, subtwophasestate, subdisableonerr, subslotname,
1265+
subsynccommit, subpublications)
12651266
ON pg_subscription TO public;
12661267

12671268
CREATE VIEW pg_stat_subscription_stats AS

src/backend/commands/subscriptioncmds.c

+25-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#define SUBOPT_BINARY 0x00000080
6262
#define SUBOPT_STREAMING 0x00000100
6363
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
64+
#define SUBOPT_DISABLE_ON_ERR 0x00000400
6465

6566
/* check if the 'val' has 'bits' set */
6667
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
8283
bool binary;
8384
bool streaming;
8485
bool twophase;
86+
bool disableonerr;
8587
} SubOpts;
8688

8789
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
130132
opts->streaming = false;
131133
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
132134
opts->twophase = false;
135+
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
136+
opts->disableonerr = false;
133137

134138
/* Parse options */
135139
foreach(lc, stmt_options)
@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
249253
opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
250254
opts->twophase = defGetBoolean(defel);
251255
}
256+
else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
257+
strcmp(defel->defname, "disable_on_error") == 0)
258+
{
259+
if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
260+
errorConflictingDefElem(defel, pstate);
261+
262+
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
263+
opts->disableonerr = defGetBoolean(defel);
264+
}
252265
else
253266
ereport(ERROR,
254267
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
390403
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
391404
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
392405
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
393-
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
406+
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
407+
SUBOPT_DISABLE_ON_ERR);
394408
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
395409

396410
/*
@@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
464478
CharGetDatum(opts.twophase ?
465479
LOGICALREP_TWOPHASE_STATE_PENDING :
466480
LOGICALREP_TWOPHASE_STATE_DISABLED);
481+
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
467482
values[Anum_pg_subscription_subconninfo - 1] =
468483
CStringGetTextDatum(conninfo);
469484
if (opts.slot_name)
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
864879
{
865880
supported_opts = (SUBOPT_SLOT_NAME |
866881
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
867-
SUBOPT_STREAMING);
882+
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
868883

869884
parse_subscription_options(pstate, stmt->options,
870885
supported_opts, &opts);
@@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
913928
replaces[Anum_pg_subscription_substream - 1] = true;
914929
}
915930

931+
if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
932+
{
933+
values[Anum_pg_subscription_subdisableonerr - 1]
934+
= BoolGetDatum(opts.disableonerr);
935+
replaces[Anum_pg_subscription_subdisableonerr - 1]
936+
= true;
937+
}
938+
916939
update_tuple = true;
917940
break;
918941
}

0 commit comments

Comments
 (0)