Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 887227a

Browse files
committed
Add option to modify sync commit per subscription
This also changes default behaviour of subscription workers to synchronous_commit = off. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
1 parent 25371a7 commit 887227a

File tree

13 files changed

+145
-33
lines changed

13 files changed

+145
-33
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6530,6 +6530,16 @@
65306530
<entry>If true, the subscription is enabled and should be replicating.</entry>
65316531
</row>
65326532

6533+
<row>
6534+
<entry><structfield>subsynccommit</structfield></entry>
6535+
<entry><type>text</type></entry>
6536+
<entry></entry>
6537+
<entry>
6538+
Contains the value of the <varname>synchronous_commit</varname>
6539+
setting for the subscription workers.
6540+
</entry>
6541+
</row>
6542+
65336543
<row>
65346544
<entry><structfield>subconninfo</structfield></entry>
65356545
<entry><type>text</type></entry>

doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep
2626
<phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase>
2727

2828
SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
29+
| SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
2930

3031
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION <replaceable class="PARAMETER">publication_name</replaceable> [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH }
3132
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] )
@@ -91,6 +92,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE
9192
<varlistentry>
9293
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
9394
<term><literal>SLOT NAME = <replaceable class="parameter">slot_name</replaceable></literal></term>
95+
<term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
9496
<listitem>
9597
<para>
9698
These clauses alter properties originally set by

doc/src/sgml/ref/create_subscription.sgml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
3232
| CREATE SLOT | NOCREATE SLOT
3333
| SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
3434
| COPY DATA | NOCOPY DATA
35+
| SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
3536
| NOCONNECT
3637
</synopsis>
3738
</refsynopsisdiv>
@@ -147,6 +148,36 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
147148
</listitem>
148149
</varlistentry>
149150

151+
<varlistentry>
152+
<term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
153+
<listitem>
154+
<para>
155+
The value of this parameter overrides the
156+
<xref linkend="guc-synchronous-commit"> setting. The default value is
157+
<literal>off</literal>.
158+
</para>
159+
160+
<para>
161+
It is safe to use <literal>off</literal> for logical replication: If the
162+
subscriber loses transactions because of missing synchronization, the
163+
data will be resent from the publisher.
164+
</para>
165+
166+
<para>
167+
A different setting might be appropriate when doing synchronous logical
168+
replication. The logical replication workers report the positions of
169+
writes and flushes to the publisher, and when using synchronous
170+
replication, the publisher will wait for the actual flush. This means
171+
that setting <literal>SYNCHRONOUS_COMMIT</literal> for the subscriber
172+
to <literal>off</literal> when the subscription is used for synchronous
173+
replication might increase the latency for <command>COMMIT</command> on
174+
the publisher. In this scenario, it can be advantageous to set
175+
<literal>SYNCHRONOUS_COMMIT</literal> to <literal>local</literal> or
176+
higher.
177+
</para>
178+
</listitem>
179+
</varlistentry>
180+
150181
<varlistentry>
151182
<term><literal>NOCONNECT</literal></term>
152183
<listitem>

src/backend/catalog/pg_subscription.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ GetSubscription(Oid subid, bool missing_ok)
8585
Assert(!isnull);
8686
sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
8787

88+
/* Get synccommit */
89+
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
90+
tup,
91+
Anum_pg_subscription_subsynccommit,
92+
&isnull);
93+
Assert(!isnull);
94+
sub->synccommit = TextDatumGetCString(datum);
95+
8896
/* Get publications */
8997
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
9098
tup,

src/backend/commands/subscriptioncmds.c

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "storage/lmgr.h"
4545

4646
#include "utils/builtins.h"
47+
#include "utils/guc.h"
4748
#include "utils/lsyscache.h"
4849
#include "utils/memutils.h"
4950
#include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
6061
static void
6162
parse_subscription_options(List *options, bool *connect, bool *enabled_given,
6263
bool *enabled, bool *create_slot, char **slot_name,
63-
bool *copy_data)
64+
bool *copy_data, char **synchronous_commit)
6465
{
6566
ListCell *lc;
6667
bool connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
8081
*slot_name = NULL;
8182
if (copy_data)
8283
*copy_data = true;
84+
if (synchronous_commit)
85+
*synchronous_commit = NULL;
8386

8487
/* Parse options */
8588
foreach (lc, options)
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
165168
copy_data_given = true;
166169
*copy_data = !defGetBoolean(defel);
167170
}
171+
else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
172+
synchronous_commit)
173+
{
174+
if (*synchronous_commit)
175+
ereport(ERROR,
176+
(errcode(ERRCODE_SYNTAX_ERROR),
177+
errmsg("conflicting or redundant options")));
178+
179+
*synchronous_commit = defGetString(defel);
180+
181+
/* Test if the given value is valid for synchronous_commit GUC. */
182+
(void) set_config_option("synchronous_commit", *synchronous_commit,
183+
PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
184+
false, 0, false);
185+
}
168186
else
169187
elog(ERROR, "unrecognized option: %s", defel->defname);
170188
}
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
269287
bool enabled_given;
270288
bool enabled;
271289
bool copy_data;
290+
char *synchronous_commit;
272291
char *conninfo;
273292
char *slotname;
274293
char originname[NAMEDATALEN];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
280299
* Connection and publication should not be specified here.
281300
*/
282301
parse_subscription_options(stmt->options, &connect, &enabled_given,
283-
&enabled, &create_slot, &slotname, &copy_data);
302+
&enabled, &create_slot, &slotname, &copy_data,
303+
&synchronous_commit);
284304

285305
/*
286306
* Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
311331

312332
if (slotname == NULL)
313333
slotname = stmt->subname;
334+
/* The default for synchronous_commit of subscriptions is off. */
335+
if (synchronous_commit == NULL)
336+
synchronous_commit = "off";
314337

315338
conninfo = stmt->conninfo;
316339
publications = stmt->publication;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
334357
CStringGetTextDatum(conninfo);
335358
values[Anum_pg_subscription_subslotname - 1] =
336359
DirectFunctionCall1(namein, CStringGetDatum(slotname));
360+
values[Anum_pg_subscription_subsynccommit - 1] =
361+
CStringGetTextDatum(synchronous_commit);
337362
values[Anum_pg_subscription_subpublications - 1] =
338363
publicationListToArray(publications);
339364

@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
582607
case ALTER_SUBSCRIPTION_OPTIONS:
583608
{
584609
char *slot_name;
610+
char *synchronous_commit;
585611

586612
parse_subscription_options(stmt->options, NULL, NULL, NULL,
587-
NULL, &slot_name, NULL);
613+
NULL, &slot_name, NULL,
614+
&synchronous_commit);
588615

589-
values[Anum_pg_subscription_subslotname - 1] =
590-
DirectFunctionCall1(namein, CStringGetDatum(slot_name));
591-
replaces[Anum_pg_subscription_subslotname - 1] = true;
616+
if (slot_name)
617+
{
618+
values[Anum_pg_subscription_subslotname - 1] =
619+
DirectFunctionCall1(namein, CStringGetDatum(slot_name));
620+
replaces[Anum_pg_subscription_subslotname - 1] = true;
621+
}
622+
if (synchronous_commit)
623+
{
624+
values[Anum_pg_subscription_subsynccommit - 1] =
625+
CStringGetTextDatum(synchronous_commit);
626+
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
627+
}
592628

593629
update_tuple = true;
594630
break;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
601637

602638
parse_subscription_options(stmt->options, NULL,
603639
&enabled_given, &enabled, NULL,
604-
NULL, NULL);
640+
NULL, NULL, NULL);
605641
Assert(enabled_given);
606642

607643
values[Anum_pg_subscription_subenabled - 1] =
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
626662
Subscription *sub = GetSubscription(subid, false);
627663

628664
parse_subscription_options(stmt->options, NULL, NULL, NULL,
629-
NULL, NULL, &copy_data);
665+
NULL, NULL, &copy_data, NULL);
630666

631667
values[Anum_pg_subscription_subpublications - 1] =
632668
publicationListToArray(stmt->publication);
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
652688
Subscription *sub = GetSubscription(subid, false);
653689

654690
parse_subscription_options(stmt->options, NULL, NULL, NULL,
655-
NULL, NULL, &copy_data);
691+
NULL, NULL, &copy_data, NULL);
656692

657693
AlterSubscription_refresh(sub, copy_data);
658694

src/backend/replication/logical/launcher.c

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,13 @@ get_subscription_list(void)
129129
*/
130130
oldcxt = MemoryContextSwitchTo(resultcxt);
131131

132-
sub = (Subscription *) palloc(sizeof(Subscription));
132+
sub = (Subscription *) palloc0(sizeof(Subscription));
133133
sub->oid = HeapTupleGetOid(tup);
134134
sub->dbid = subform->subdbid;
135135
sub->owner = subform->subowner;
136136
sub->enabled = subform->subenabled;
137137
sub->name = pstrdup(NameStr(subform->subname));
138-
139138
/* We don't fill fields we are not interested in. */
140-
sub->conninfo = NULL;
141-
sub->slotname = NULL;
142-
sub->publications = NIL;
143139

144140
res = lappend(res, sub);
145141
MemoryContextSwitchTo(oldcxt);

src/backend/replication/logical/worker.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,6 +1416,10 @@ reread_subscription(void)
14161416

14171417
MemoryContextSwitchTo(oldctx);
14181418

1419+
/* Change synchronous commit according to the user's wishes */
1420+
SetConfigOption("synchronous_commit", MySubscription->synccommit,
1421+
PGC_BACKEND, PGC_S_OVERRIDE);
1422+
14191423
if (started_tx)
14201424
CommitTransactionCommand();
14211425

@@ -1485,6 +1489,10 @@ ApplyWorkerMain(Datum main_arg)
14851489
MySubscriptionValid = true;
14861490
MemoryContextSwitchTo(oldctx);
14871491

1492+
/* Setup synchronous commit according to the user's wishes */
1493+
SetConfigOption("synchronous_commit", MySubscription->synccommit,
1494+
PGC_BACKEND, PGC_S_OVERRIDE);
1495+
14881496
if (!MySubscription->enabled)
14891497
{
14901498
ereport(LOG,

src/bin/pg_dump/pg_dump.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3683,6 +3683,7 @@ getSubscriptions(Archive *fout)
36833683
int i_rolname;
36843684
int i_subconninfo;
36853685
int i_subslotname;
3686+
int i_subsynccommit;
36863687
int i_subpublications;
36873688
int i,
36883689
ntups;
@@ -3714,7 +3715,8 @@ getSubscriptions(Archive *fout)
37143715
appendPQExpBuffer(query,
37153716
"SELECT s.tableoid, s.oid, s.subname,"
37163717
"(%s s.subowner) AS rolname, "
3717-
" s.subconninfo, s.subslotname, s.subpublications "
3718+
" s.subconninfo, s.subslotname, s.subsynccommit, "
3719+
" s.subpublications "
37183720
"FROM pg_catalog.pg_subscription s "
37193721
"WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
37203722
" WHERE datname = current_database())",
@@ -3729,6 +3731,7 @@ getSubscriptions(Archive *fout)
37293731
i_rolname = PQfnumber(res, "rolname");
37303732
i_subconninfo = PQfnumber(res, "subconninfo");
37313733
i_subslotname = PQfnumber(res, "subslotname");
3734+
i_subsynccommit = PQfnumber(res, "subsynccommit");
37323735
i_subpublications = PQfnumber(res, "subpublications");
37333736

37343737
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -3744,6 +3747,8 @@ getSubscriptions(Archive *fout)
37443747
subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
37453748
subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
37463749
subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
3750+
subinfo[i].subsynccommit =
3751+
pg_strdup(PQgetvalue(res, i, i_subsynccommit));
37473752
subinfo[i].subpublications =
37483753
pg_strdup(PQgetvalue(res, i, i_subpublications));
37493754

@@ -3810,6 +3815,10 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
38103815

38113816
appendPQExpBuffer(query, " PUBLICATION %s WITH (NOCONNECT, SLOT NAME = ", publications->data);
38123817
appendStringLiteralAH(query, subinfo->subslotname, fout);
3818+
3819+
if (strcmp(subinfo->subsynccommit, "off") != 0)
3820+
appendPQExpBuffer(query, ", SYNCHRONOUS_COMMIT = %s", fmtId(subinfo->subsynccommit));
3821+
38133822
appendPQExpBufferStr(query, ");\n");
38143823

38153824
appendPQExpBuffer(labelq, "SUBSCRIPTION %s", fmtId(subinfo->dobj.name));

src/bin/pg_dump/pg_dump.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ typedef struct _SubscriptionInfo
616616
char *rolname;
617617
char *subconninfo;
618618
char *subslotname;
619+
char *subsynccommit;
619620
char *subpublications;
620621
} SubscriptionInfo;
621622

src/bin/psql/describe.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5199,7 +5199,8 @@ describeSubscriptions(const char *pattern, bool verbose)
51995199
PQExpBufferData buf;
52005200
PGresult *res;
52015201
printQueryOpt myopt = pset.popt;
5202-
static const bool translate_columns[] = {false, false, false, false, false};
5202+
static const bool translate_columns[] = {false, false, false, false,
5203+
false, false};
52035204

52045205
if (pset.sversion < 100000)
52055206
{
@@ -5225,7 +5226,9 @@ describeSubscriptions(const char *pattern, bool verbose)
52255226
if (verbose)
52265227
{
52275228
appendPQExpBuffer(&buf,
5229+
", subsynccommit AS \"%s\"\n"
52285230
", subconninfo AS \"%s\"\n",
5231+
gettext_noop("Synchronous commit"),
52295232
gettext_noop("Conninfo"));
52305233
}
52315234

src/include/catalog/pg_subscription.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE
4343
#ifdef CATALOG_VARLEN /* variable-length fields start here */
4444
text subconninfo; /* Connection string to the publisher */
4545
NameData subslotname; /* Slot name on publisher */
46-
46+
text subsynccommit; /* Synchronous commit setting for worker */
4747
text subpublications[1]; /* List of publications subscribed to */
4848
#endif
4949
} FormData_pg_subscription;
@@ -54,14 +54,15 @@ typedef FormData_pg_subscription *Form_pg_subscription;
5454
* compiler constants for pg_subscription
5555
* ----------------
5656
*/
57-
#define Natts_pg_subscription 7
57+
#define Natts_pg_subscription 8
5858
#define Anum_pg_subscription_subdbid 1
5959
#define Anum_pg_subscription_subname 2
6060
#define Anum_pg_subscription_subowner 3
6161
#define Anum_pg_subscription_subenabled 4
6262
#define Anum_pg_subscription_subconninfo 5
6363
#define Anum_pg_subscription_subslotname 6
64-
#define Anum_pg_subscription_subpublications 7
64+
#define Anum_pg_subscription_subsynccommit 7
65+
#define Anum_pg_subscription_subpublications 8
6566

6667

6768
typedef struct Subscription
@@ -73,6 +74,7 @@ typedef struct Subscription
7374
bool enabled; /* Indicates if the subscription is enabled */
7475
char *conninfo; /* Connection string to the publisher */
7576
char *slotname; /* Name of the replication slot */
77+
char *synccommit; /* Synchronous commit setting for worker */
7678
List *publications; /* List of publication names to subscribe to */
7779
} Subscription;
7880

0 commit comments

Comments
 (0)