Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 24ad7e2

Browse files
MasaoFujiiCommitfest Bot
authored and
Commitfest Bot
committed
Add per-subscription wal_receiver_timeout setting.
This commit allows setting wal_receiver_timeout per subscription using the CREATE SUBSCRIPTION and ALTER SUBSCRIPTION commands. The value is stored in the subwalrcvtimeout column of the pg_subscription catalog. When set, this value overrides the global wal_receiver_timeout for the subscription’s apply worker. The default is -1, which means the global setting (from the server configuration, command line, role, or database) remains in effect. This feature is useful for configuring different timeout values for each subscription, especially when connecting to multiple publisher servers, to improve failure detection. Bump catalog version.
1 parent f92ece4 commit 24ad7e2

File tree

12 files changed

+228
-84
lines changed

12 files changed

+228
-84
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8118,6 +8118,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
81188118
</para></entry>
81198119
</row>
81208120

8121+
<row>
8122+
<entry role="catalog_table_entry"><para role="column_definition">
8123+
<structfield>subwalrcvtimeout</structfield> <type>text</type>
8124+
</para>
8125+
<para>
8126+
The <varname>wal_receiver_timeout</varname>
8127+
setting for the subscription's workers to use
8128+
</para></entry>
8129+
</row>
8130+
81218131
<row>
81228132
<entry role="catalog_table_entry"><para role="column_definition">
81238133
<structfield>subpublications</structfield> <type>text[]</type>

doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
235235
<link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
236236
<link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
237237
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
238-
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
239-
<link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
238+
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
239+
<link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and
240+
<link linkend="sql-createsubscription-params-with-wal-receiver-timeout"><literal>wal_receiver_timeout</literal></link>.
240241
Only a superuser can set <literal>password_required = false</literal>.
241242
</para>
242243

doc/src/sgml/ref/create_subscription.sgml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,21 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
435435
</para>
436436
</listitem>
437437
</varlistentry>
438-
</variablelist></para>
439438

439+
<varlistentry id="sql-createsubscription-params-with-wal-receiver-timeout">
440+
<term><literal>wal_receiver_timeout</literal> (<type>text</type>)</term>
441+
<listitem>
442+
<para>
443+
The value of this parameter overrides the
444+
<xref linkend="guc-wal-receiver-timeout"/> setting within this
445+
subscription's apply worker processes. The default value is
446+
<literal>-1</literal>, which means it does not override the global setting,
447+
i.e., the value from the server configuration, command line, role or
448+
database settings will be used instead.
449+
</para>
450+
</listitem>
451+
</varlistentry>
452+
</variablelist></para>
440453
</listitem>
441454
</varlistentry>
442455
</variablelist>

src/backend/catalog/pg_subscription.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ GetSubscription(Oid subid, bool missing_ok)
126126
Anum_pg_subscription_subsynccommit);
127127
sub->synccommit = TextDatumGetCString(datum);
128128

129+
/* Get walrcvtimeout */
130+
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
131+
tup,
132+
Anum_pg_subscription_subwalrcvtimeout);
133+
sub->walrcvtimeout = TextDatumGetCString(datum);
134+
129135
/* Get publications */
130136
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
131137
tup,

src/backend/commands/subscriptioncmds.c

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
#define SUBOPT_FAILOVER 0x00002000
7474
#define SUBOPT_LSN 0x00004000
7575
#define SUBOPT_ORIGIN 0x00008000
76+
#define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
7677

7778
/* check if the 'val' has 'bits' set */
7879
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -100,6 +101,7 @@ typedef struct SubOpts
100101
bool failover;
101102
char *origin;
102103
XLogRecPtr lsn;
104+
char *wal_receiver_timeout;
103105
} SubOpts;
104106

105107
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -357,6 +359,30 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
357359
opts->specified_opts |= SUBOPT_LSN;
358360
opts->lsn = lsn;
359361
}
362+
else if (IsSet(supported_opts, SUBOPT_WAL_RECEIVER_TIMEOUT) &&
363+
strcmp(defel->defname, "wal_receiver_timeout") == 0)
364+
{
365+
bool parsed;
366+
int val;
367+
368+
if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
369+
errorConflictingDefElem(defel, pstate);
370+
371+
opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
372+
opts->wal_receiver_timeout = defGetString(defel);
373+
374+
/*
375+
* Test if the given value is valid for wal_receiver_timeeout GUC.
376+
* Skip this test if the value is -1, since -1 is allowed for the
377+
* wal_receiver_timeout subscription option, but not for the GUC
378+
* itself.
379+
*/
380+
parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
381+
if (!parsed || val != -1)
382+
(void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
383+
PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
384+
false, 0, false);
385+
}
360386
else
361387
ereport(ERROR,
362388
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -563,7 +589,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
563589
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
564590
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
565591
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
566-
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
592+
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN |
593+
SUBOPT_WAL_RECEIVER_TIMEOUT);
567594
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
568595

569596
/*
@@ -638,6 +665,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
638665
if (opts.synchronous_commit == NULL)
639666
opts.synchronous_commit = "off";
640667

668+
/*
669+
* The default for wal_receiver_timeout of subscriptions is -1, which
670+
* means the value is inherited from the server configuration, command
671+
* line, or role/database settings.
672+
*/
673+
if (opts.wal_receiver_timeout == NULL)
674+
opts.wal_receiver_timeout = "-1";
675+
641676
conninfo = stmt->conninfo;
642677
publications = stmt->publication;
643678

@@ -679,6 +714,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
679714
nulls[Anum_pg_subscription_subslotname - 1] = true;
680715
values[Anum_pg_subscription_subsynccommit - 1] =
681716
CStringGetTextDatum(opts.synchronous_commit);
717+
values[Anum_pg_subscription_subwalrcvtimeout - 1] =
718+
CStringGetTextDatum(opts.wal_receiver_timeout);
682719
values[Anum_pg_subscription_subpublications - 1] =
683720
publicationListToArray(publications);
684721
values[Anum_pg_subscription_suborigin - 1] =
@@ -1165,7 +1202,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
11651202
SUBOPT_DISABLE_ON_ERR |
11661203
SUBOPT_PASSWORD_REQUIRED |
11671204
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1168-
SUBOPT_ORIGIN);
1205+
SUBOPT_ORIGIN | SUBOPT_WAL_RECEIVER_TIMEOUT);
11691206

11701207
parse_subscription_options(pstate, stmt->options,
11711208
supported_opts, &opts);
@@ -1332,6 +1369,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
13321369
replaces[Anum_pg_subscription_suborigin - 1] = true;
13331370
}
13341371

1372+
if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1373+
{
1374+
values[Anum_pg_subscription_subwalrcvtimeout - 1] =
1375+
CStringGetTextDatum(opts.wal_receiver_timeout);
1376+
replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
1377+
}
1378+
13351379
update_tuple = true;
13361380
break;
13371381
}

src/backend/replication/logical/worker.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,8 @@ static inline void reset_apply_error_context_info(void);
414414
static TransApplyAction get_transaction_apply_action(TransactionId xid,
415415
ParallelApplyWorkerInfo **winfo);
416416

417+
static void set_wal_receiver_timeout(void);
418+
417419
static void replorigin_reset(int code, Datum arg);
418420

419421
/*
@@ -4084,12 +4086,46 @@ maybe_reread_subscription(void)
40844086
SetConfigOption("synchronous_commit", MySubscription->synccommit,
40854087
PGC_BACKEND, PGC_S_OVERRIDE);
40864088

4089+
/* Change wal_receiver_timeout according to the user's wishes */
4090+
set_wal_receiver_timeout();
4091+
40874092
if (started_tx)
40884093
CommitTransactionCommand();
40894094

40904095
MySubscriptionValid = true;
40914096
}
40924097

4098+
/*
4099+
* Change wal_receiver_timeout to MySubscription->walrcvtimeout.
4100+
*/
4101+
static void
4102+
set_wal_receiver_timeout(void)
4103+
{
4104+
bool parsed;
4105+
int val;
4106+
4107+
/*
4108+
* Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
4109+
* which comes from the subscription's wal_receiver_timeout option. If the
4110+
* value is -1, reset the GUC to its default, meaning it will inherit from
4111+
* the server config, command line, or role/database settings.
4112+
*/
4113+
parsed = parse_int(MySubscription->walrcvtimeout, &val, 0, NULL);
4114+
if (parsed && val == -1)
4115+
SetConfigOption("wal_receiver_timeout", NULL,
4116+
PGC_BACKEND, PGC_S_SESSION);
4117+
else
4118+
SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
4119+
PGC_BACKEND, PGC_S_SESSION);
4120+
4121+
/*
4122+
* Log the current wal_receiver_timeout GUC value (in milliseconds) as a
4123+
* debug message to verify it was set correctly.
4124+
*/
4125+
elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
4126+
MySubscription->name, wal_receiver_timeout);
4127+
}
4128+
40934129
/*
40944130
* Callback from subscription syscache invalidation.
40954131
*/
@@ -4711,6 +4747,9 @@ InitializeLogRepWorker(void)
47114747
SetConfigOption("synchronous_commit", MySubscription->synccommit,
47124748
PGC_BACKEND, PGC_S_OVERRIDE);
47134749

4750+
/* Change wal_receiver_timeout according to the user's wishes */
4751+
set_wal_receiver_timeout();
4752+
47144753
/*
47154754
* Keep us informed about subscription or role changes. Note that the
47164755
* role's superuser privilege can be revoked.

src/bin/pg_dump/pg_dump.c

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4955,6 +4955,7 @@ getSubscriptions(Archive *fout)
49554955
int i_subconninfo;
49564956
int i_subslotname;
49574957
int i_subsynccommit;
4958+
int i_subwalrcvtimeout;
49584959
int i_subpublications;
49594960
int i_suborigin;
49604961
int i_suboriginremotelsn;
@@ -5032,10 +5033,16 @@ getSubscriptions(Archive *fout)
50325033

50335034
if (fout->remoteVersion >= 170000)
50345035
appendPQExpBufferStr(query,
5035-
" s.subfailover\n");
5036+
" s.subfailover,\n");
50365037
else
50375038
appendPQExpBufferStr(query,
5038-
" false AS subfailover\n");
5039+
" false AS subfailover,\n");
5040+
5041+
/* 180000 should be changed to 190000 */
5042+
if (fout->remoteVersion >= 180000)
5043+
appendPQExpBufferStr(query, " s.subwalrcvtimeout\n");
5044+
else
5045+
appendPQExpBufferStr(query, " '-1' AS subwalrcvtimeout\n");
50395046

50405047
appendPQExpBufferStr(query,
50415048
"FROM pg_subscription s\n");
@@ -5072,6 +5079,7 @@ getSubscriptions(Archive *fout)
50725079
i_subconninfo = PQfnumber(res, "subconninfo");
50735080
i_subslotname = PQfnumber(res, "subslotname");
50745081
i_subsynccommit = PQfnumber(res, "subsynccommit");
5082+
i_subwalrcvtimeout = PQfnumber(res, "subwalrcvtimeout");
50755083
i_subpublications = PQfnumber(res, "subpublications");
50765084
i_suborigin = PQfnumber(res, "suborigin");
50775085
i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
@@ -5111,6 +5119,8 @@ getSubscriptions(Archive *fout)
51115119
pg_strdup(PQgetvalue(res, i, i_subslotname));
51125120
subinfo[i].subsynccommit =
51135121
pg_strdup(PQgetvalue(res, i, i_subsynccommit));
5122+
subinfo[i].subwalrcvtimeout =
5123+
pg_strdup(PQgetvalue(res, i, i_subwalrcvtimeout));
51145124
subinfo[i].subpublications =
51155125
pg_strdup(PQgetvalue(res, i, i_subpublications));
51165126
subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
@@ -5363,6 +5373,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
53635373
if (strcmp(subinfo->subsynccommit, "off") != 0)
53645374
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
53655375

5376+
if (strcmp(subinfo->subwalrcvtimeout, "-1") != 0)
5377+
appendPQExpBuffer(query, ", wal_receiver_timeout = %s", fmtId(subinfo->subwalrcvtimeout));
5378+
53665379
if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
53675380
appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
53685381

src/bin/pg_dump/pg_dump.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,7 @@ typedef struct _SubscriptionInfo
711711
char *subconninfo;
712712
char *subslotname;
713713
char *subsynccommit;
714+
char *subwalrcvtimeout;
714715
char *subpublications;
715716
char *suborigin;
716717
char *suboriginremotelsn;

src/bin/psql/describe.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6745,7 +6745,7 @@ describeSubscriptions(const char *pattern, bool verbose)
67456745
printQueryOpt myopt = pset.popt;
67466746
static const bool translate_columns[] = {false, false, false, false,
67476747
false, false, false, false, false, false, false, false, false, false,
6748-
false};
6748+
false, false};
67496749

67506750
if (pset.sversion < 100000)
67516751
{
@@ -6820,6 +6820,12 @@ describeSubscriptions(const char *pattern, bool verbose)
68206820
gettext_noop("Synchronous commit"),
68216821
gettext_noop("Conninfo"));
68226822

6823+
/* 180000 should be changed to 190000 */
6824+
if (pset.sversion >= 180000)
6825+
appendPQExpBuffer(&buf,
6826+
", subwalrcvtimeout AS \"%s\"\n",
6827+
gettext_noop("Receiver timeout"));
6828+
68236829
/* Skip LSN is only supported in v15 and higher */
68246830
if (pset.sversion >= 150000)
68256831
appendPQExpBuffer(&buf,

src/include/catalog/pg_subscription.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
8888
/* Synchronous commit setting for worker */
8989
text subsynccommit BKI_FORCE_NOT_NULL;
9090

91+
/* wal_receiver_timeout setting for worker */
92+
text subwalrcvtimeout BKI_FORCE_NOT_NULL;
93+
9194
/* List of publications subscribed to */
9295
text subpublications[1] BKI_FORCE_NOT_NULL;
9396

@@ -134,6 +137,7 @@ typedef struct Subscription
134137
char *conninfo; /* Connection string to the publisher */
135138
char *slotname; /* Name of the replication slot */
136139
char *synccommit; /* Synchronous commit setting for worker */
140+
char *walrcvtimeout; /* wal_receiver_timeout setting for worker */
137141
List *publications; /* List of publication names to subscribe to */
138142
char *origin; /* Only publish data originating from the
139143
* specified origin */

0 commit comments

Comments
 (0)