Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2af2bd8

Browse files
Amit Kapila, pull[bot]
Amit Kapila
authored and committed
Allow users to skip logical replication of data having origin.
This patch adds a new SUBSCRIPTION parameter "origin". It specifies whether the subscription will request the publisher to only send changes that don't have an origin or send changes regardless of origin. Setting it to "none" means that the subscription will request the publisher to only send changes that have no origin associated. Setting it to "any" means that the publisher sends changes regardless of their origin. The default is "any". Usage: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999' PUBLICATION pub1 WITH (origin = none); This can be used to avoid loops (infinite replication of the same data) among replication nodes. This feature allows filtering only the replication data originating from WAL, but for the initial sync (initial copy of table data) we don't have such a facility, as we can only distinguish the data based on origin from WAL. As a follow-up patch, we are planning to forbid the initial sync if the origin is specified as none and we notice that the publication tables were also replicated from other publishers, to avoid duplicate data or loops. We forbid creating origins with the names 'none' and 'any' to avoid confusion with the options of the same name. Author: Vignesh C, Amit Kapila Reviewed-By: Peter Smith, Amit Kapila, Dilip Kumar, Shi yu, Ashutosh Bapat, Hayato Kuroda Discussion: https://postgr.es/m/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com
1 parent d2dceab commit 2af2bd8

File tree

24 files changed

+463
-78
lines changed

24 files changed

+463
-78
lines changed

contrib/test_decoding/expected/replorigin.out

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ SELECT pg_replication_origin_drop('regress_test_decoding: temp');
5656

5757
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
5858
ERROR: replication origin "regress_test_decoding: temp" does not exist
59+
-- specifying reserved origin names is not supported
60+
SELECT pg_replication_origin_create('any');
61+
ERROR: replication origin name "any" is reserved
62+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
63+
SELECT pg_replication_origin_create('none');
64+
ERROR: replication origin name "none" is reserved
65+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
66+
SELECT pg_replication_origin_create('pg_replication_origin');
67+
ERROR: replication origin name "pg_replication_origin" is reserved
68+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
5969
-- various failure checks for undefined slots
6070
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
6171
ERROR: replication origin "regress_test_decoding: temp" does not exist

contrib/test_decoding/sql/replorigin.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ SELECT pg_replication_origin_create('regress_test_decoding: temp');
3131
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
3232
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
3333

34+
-- specifying reserved origin names is not supported
35+
SELECT pg_replication_origin_create('any');
36+
SELECT pg_replication_origin_create('none');
37+
SELECT pg_replication_origin_create('pg_replication_origin');
38+
3439
-- various failure checks for undefined slots
3540
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
3641
select pg_replication_origin_session_setup('regress_test_decoding: temp');

doc/src/sgml/catalogs.sgml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7943,6 +7943,20 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
79437943
see <xref linkend="logical-replication-publication"/>.
79447944
</para></entry>
79457945
</row>
7946+
7947+
<row>
7948+
<entry role="catalog_table_entry"><para role="column_definition">
7949+
<structfield>suborigin</structfield> <type>text</type>
7950+
</para>
7951+
<para>
7952+
The origin value must be either <literal>none</literal> or
7953+
<literal>any</literal>. The default is <literal>any</literal>.
7954+
If <literal>none</literal>, the subscription will request the publisher
7955+
to only send changes that don't have an origin. If
7956+
<literal>any</literal>, the publisher sends changes regardless of their
7957+
origin.
7958+
</para></entry>
7959+
</row>
79467960
</tbody>
79477961
</tgroup>
79487962
</table>

doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
207207
information. The parameters that can be altered
208208
are <literal>slot_name</literal>,
209209
<literal>synchronous_commit</literal>,
210-
<literal>binary</literal>, <literal>streaming</literal>, and
211-
<literal>disable_on_error</literal>.
210+
<literal>binary</literal>, <literal>streaming</literal>,
211+
<literal>disable_on_error</literal>, and
212+
<literal>origin</literal>.
212213
</para>
213214
</listitem>
214215
</varlistentry>

doc/src/sgml/ref/create_subscription.sgml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,21 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
302302
</para>
303303
</listitem>
304304
</varlistentry>
305+
306+
<varlistentry>
307+
<term><literal>origin</literal> (<type>string</type>)</term>
308+
<listitem>
309+
<para>
310+
Specifies whether the subscription will request the publisher to only
311+
send changes that don't have an origin or send changes regardless of
312+
origin. Setting <literal>origin</literal> to <literal>none</literal>
313+
means that the subscription will request the publisher to only send
314+
changes that don't have an origin. Setting <literal>origin</literal>
315+
to <literal>any</literal> means that the publisher sends changes
316+
regardless of their origin. The default is <literal>any</literal>.
317+
</para>
318+
</listitem>
319+
</varlistentry>
305320
</variablelist></para>
306321

307322
</listitem>

src/backend/catalog/pg_subscription.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ GetSubscription(Oid subid, bool missing_ok)
106106
Assert(!isnull);
107107
sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
108108

109+
/* Get origin */
110+
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
111+
tup,
112+
Anum_pg_subscription_suborigin,
113+
&isnull);
114+
Assert(!isnull);
115+
sub->origin = TextDatumGetCString(datum);
116+
109117
ReleaseSysCache(tup);
110118

111119
return sub;

src/backend/catalog/system_views.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
12981298
-- All columns of pg_subscription except subconninfo are publicly readable.
12991299
REVOKE ALL ON pg_subscription FROM public;
13001300
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
1301-
subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
1302-
subsynccommit, subpublications)
1301+
subbinary, substream, subtwophasestate, subdisableonerr,
1302+
subslotname, subsynccommit, subpublications, suborigin)
13031303
ON pg_subscription TO public;
13041304

13051305
CREATE VIEW pg_stat_subscription_stats AS

src/backend/commands/subscriptioncmds.c

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
6565
#define SUBOPT_DISABLE_ON_ERR 0x00000400
6666
#define SUBOPT_LSN 0x00000800
67+
#define SUBOPT_ORIGIN 0x00001000
6768

6869
/* check if the 'val' has 'bits' set */
6970
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
8687
bool streaming;
8788
bool twophase;
8889
bool disableonerr;
90+
char *origin;
8991
XLogRecPtr lsn;
9092
} SubOpts;
9193

@@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
118120
IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
119121
SUBOPT_COPY_DATA));
120122

121-
/* Set default values for the boolean supported options. */
123+
/* Set default values for the supported options. */
122124
if (IsSet(supported_opts, SUBOPT_CONNECT))
123125
opts->connect = true;
124126
if (IsSet(supported_opts, SUBOPT_ENABLED))
@@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
137139
opts->twophase = false;
138140
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
139141
opts->disableonerr = false;
142+
if (IsSet(supported_opts, SUBOPT_ORIGIN))
143+
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
140144

141145
/* Parse options */
142146
foreach(lc, stmt_options)
@@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
265269
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
266270
opts->disableonerr = defGetBoolean(defel);
267271
}
272+
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
273+
strcmp(defel->defname, "origin") == 0)
274+
{
275+
if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
276+
errorConflictingDefElem(defel, pstate);
277+
278+
opts->specified_opts |= SUBOPT_ORIGIN;
279+
pfree(opts->origin);
280+
281+
/*
282+
* Even though the "origin" parameter allows only "none" and "any"
283+
* values, it is implemented as a string type so that the
284+
* parameter can be extended in future versions to support
285+
* filtering using origin names specified by the user.
286+
*/
287+
opts->origin = defGetString(defel);
288+
289+
if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
290+
(pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
291+
ereport(ERROR,
292+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
293+
errmsg("unrecognized origin value: \"%s\"", opts->origin));
294+
}
268295
else if (IsSet(supported_opts, SUBOPT_LSN) &&
269296
strcmp(defel->defname, "lsn") == 0)
270297
{
@@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
530557
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
531558
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
532559
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
533-
SUBOPT_DISABLE_ON_ERR);
560+
SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
534561
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
535562

536563
/*
@@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
617644
CStringGetTextDatum(opts.synchronous_commit);
618645
values[Anum_pg_subscription_subpublications - 1] =
619646
publicationListToArray(publications);
647+
values[Anum_pg_subscription_suborigin - 1] =
648+
CStringGetTextDatum(opts.origin);
620649

621650
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
622651

@@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10141043
{
10151044
supported_opts = (SUBOPT_SLOT_NAME |
10161045
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1017-
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
1046+
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
1047+
SUBOPT_ORIGIN);
10181048

10191049
parse_subscription_options(pstate, stmt->options,
10201050
supported_opts, &opts);
@@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10711101
= true;
10721102
}
10731103

1104+
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1105+
{
1106+
values[Anum_pg_subscription_suborigin - 1] =
1107+
CStringGetTextDatum(opts.origin);
1108+
replaces[Anum_pg_subscription_suborigin - 1] = true;
1109+
}
1110+
10741111
update_tuple = true;
10751112
break;
10761113
}

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
451451
PQserverVersion(conn->streamConn) >= 150000)
452452
appendStringInfoString(&cmd, ", two_phase 'on'");
453453

454+
if (options->proto.logical.origin &&
455+
PQserverVersion(conn->streamConn) >= 160000)
456+
appendStringInfo(&cmd, ", origin '%s'",
457+
options->proto.logical.origin);
458+
454459
pubnames = options->proto.logical.publication_names;
455460
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
456461
if (!pubnames_str)

src/backend/replication/logical/origin.c

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
#include "access/xloginsert.h"
7878
#include "catalog/catalog.h"
7979
#include "catalog/indexing.h"
80+
#include "catalog/pg_subscription.h"
8081
#include "funcapi.h"
8182
#include "miscadmin.h"
8283
#include "nodes/execnodes.h"
@@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
195196
}
196197

197198

199+
/*
200+
* IsReservedOriginName
201+
* True iff name is either "none" or "any" (compared case-insensitively).
202+
*/
203+
static bool
204+
IsReservedOriginName(const char *name)
205+
{
206+
return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
207+
(pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
208+
}
209+
198210
/* ---------------------------------------------------------------------------
199211
* Functions for working with replication origins themselves.
200212
* ---------------------------------------------------------------------------
@@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
12441256

12451257
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
12461258

1247-
/* Replication origins "pg_xxx" are reserved for internal use */
1248-
if (IsReservedName(name))
1259+
/*
1260+
* Replication origins "any" and "none" are reserved for system options.
1261+
* The origins "pg_xxx" are reserved for internal use.
1262+
*/
1263+
if (IsReservedName(name) || IsReservedOriginName(name))
12491264
ereport(ERROR,
12501265
(errcode(ERRCODE_RESERVED_NAME),
12511266
errmsg("replication origin name \"%s\" is reserved",
12521267
name),
1253-
errdetail("Origin names starting with \"pg_\" are reserved.")));
1268+
errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1269+
LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
12541270

12551271
/*
12561272
* If built with appropriate switch, whine when regression-testing

src/backend/replication/logical/worker.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3077,6 +3077,7 @@ maybe_reread_subscription(void)
30773077
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
30783078
newsub->binary != MySubscription->binary ||
30793079
newsub->stream != MySubscription->stream ||
3080+
strcmp(newsub->origin, MySubscription->origin) != 0 ||
30803081
newsub->owner != MySubscription->owner ||
30813082
!equal(newsub->publications, MySubscription->publications))
30823083
{
@@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg)
37583759
options.proto.logical.binary = MySubscription->binary;
37593760
options.proto.logical.streaming = MySubscription->stream;
37603761
options.proto.logical.twophase = false;
3762+
options.proto.logical.origin = pstrdup(MySubscription->origin);
37613763

37623764
if (!am_tablesync_worker())
37633765
{

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "catalog/partition.h"
1717
#include "catalog/pg_publication.h"
1818
#include "catalog/pg_publication_rel.h"
19+
#include "catalog/pg_subscription.h"
1920
#include "commands/defrem.h"
2021
#include "executor/executor.h"
2122
#include "fmgr.h"
@@ -79,6 +80,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
7980

8081
static bool publications_valid;
8182
static bool in_streaming;
83+
static bool publish_no_origin;
8284

8385
static List *LoadPublications(List *pubnames);
8486
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -285,6 +287,7 @@ parse_output_parameters(List *options, PGOutputData *data)
285287
bool messages_option_given = false;
286288
bool streaming_given = false;
287289
bool two_phase_option_given = false;
290+
bool origin_option_given = false;
288291

289292
data->binary = false;
290293
data->streaming = false;
@@ -378,6 +381,24 @@ parse_output_parameters(List *options, PGOutputData *data)
378381

379382
data->two_phase = defGetBoolean(defel);
380383
}
384+
else if (strcmp(defel->defname, "origin") == 0)
385+
{
386+
if (origin_option_given)
387+
ereport(ERROR,
388+
errcode(ERRCODE_SYNTAX_ERROR),
389+
errmsg("conflicting or redundant options"));
390+
origin_option_given = true;
391+
392+
data->origin = defGetString(defel);
393+
if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
394+
publish_no_origin = true;
395+
else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
396+
publish_no_origin = false;
397+
else
398+
ereport(ERROR,
399+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
400+
errmsg("unrecognized origin value: \"%s\"", data->origin));
401+
}
381402
else
382403
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
383404
}
@@ -1696,12 +1717,16 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
16961717
}
16971718

16981719
/*
1699-
* Currently we always forward.
1720+
* Return true if the data is associated with an origin and the user has
1721+
* requested the changes that don't have an origin, false otherwise.
17001722
*/
17011723
static bool
17021724
pgoutput_origin_filter(LogicalDecodingContext *ctx,
17031725
RepOriginId origin_id)
17041726
{
1727+
if (publish_no_origin && origin_id != InvalidRepOriginId)
1728+
return true;
1729+
17051730
return false;
17061731
}
17071732

0 commit comments

Comments
 (0)