Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5aa2350

Browse files
committed
Introduce replication progress tracking infrastructure.
When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986a) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
1 parent c6e96a2 commit 5aa2350

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2766
-89
lines changed

contrib/test_decoding/Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ submake-isolation:
3737
submake-test_decoding:
3838
$(MAKE) -C $(top_builddir)/contrib/test_decoding
3939

40-
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
40+
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
41+
binary prepared replorigin
4142

4243
regresscheck: all | submake-regress submake-test_decoding temp-install
4344
$(MKDIR_P) regression_output
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
CREATE TABLE origin_tbl(id serial primary key, data text);
4+
CREATE TABLE target_tbl(id serial primary key, data text);
5+
SELECT pg_replication_origin_create('test_decoding: regression_slot');
6+
pg_replication_origin_create
7+
------------------------------
8+
1
9+
(1 row)
10+
11+
-- ensure duplicate creations fail
12+
SELECT pg_replication_origin_create('test_decoding: regression_slot');
13+
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
14+
DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
15+
--ensure deletions work (once)
16+
SELECT pg_replication_origin_create('test_decoding: temp');
17+
pg_replication_origin_create
18+
------------------------------
19+
2
20+
(1 row)
21+
22+
SELECT pg_replication_origin_drop('test_decoding: temp');
23+
pg_replication_origin_drop
24+
----------------------------
25+
26+
(1 row)
27+
28+
SELECT pg_replication_origin_drop('test_decoding: temp');
29+
ERROR: cache lookup failed for replication origin 'test_decoding: temp'
30+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
31+
?column?
32+
----------
33+
init
34+
(1 row)
35+
36+
-- origin tx
37+
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
38+
INSERT INTO target_tbl(data)
39+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
40+
-- as is normal, the insert into target_tbl shows up
41+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
42+
data
43+
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
44+
BEGIN
45+
table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
46+
table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
47+
table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
48+
COMMIT
49+
(5 rows)
50+
51+
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
52+
-- mark session as replaying
53+
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
54+
pg_replication_origin_session_setup
55+
-------------------------------------
56+
57+
(1 row)
58+
59+
-- ensure we prevent duplicate setup
60+
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
61+
ERROR: cannot setup replication origin when one is already setup
62+
BEGIN;
63+
-- setup transaction origin
64+
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
65+
pg_replication_origin_xact_setup
66+
----------------------------------
67+
68+
(1 row)
69+
70+
INSERT INTO target_tbl(data)
71+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
72+
COMMIT;
73+
-- check replication progress for the session is correct
74+
SELECT pg_replication_origin_session_progress(false);
75+
pg_replication_origin_session_progress
76+
----------------------------------------
77+
0/AABBCCDD
78+
(1 row)
79+
80+
SELECT pg_replication_origin_session_progress(true);
81+
pg_replication_origin_session_progress
82+
----------------------------------------
83+
0/AABBCCDD
84+
(1 row)
85+
86+
SELECT pg_replication_origin_session_reset();
87+
pg_replication_origin_session_reset
88+
-------------------------------------
89+
90+
(1 row)
91+
92+
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
93+
local_id | external_id | remote_lsn | ?column?
94+
----------+--------------------------------+------------+----------
95+
1 | test_decoding: regression_slot | 0/AABBCCDD | t
96+
(1 row)
97+
98+
-- check replication progress identified by name is correct
99+
SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
100+
pg_replication_origin_progress
101+
--------------------------------
102+
0/AABBCCDD
103+
(1 row)
104+
105+
SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
106+
pg_replication_origin_progress
107+
--------------------------------
108+
0/AABBCCDD
109+
(1 row)
110+
111+
-- ensure reset requires previously setup state
112+
SELECT pg_replication_origin_session_reset();
113+
ERROR: no replication origin is configured
114+
-- and magically the replayed xact will be filtered!
115+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
116+
data
117+
------
118+
(0 rows)
119+
120+
--but new original changes still show up
121+
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
122+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
123+
data
124+
--------------------------------------------------------------------------------
125+
BEGIN
126+
table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
127+
COMMIT
128+
(3 rows)
129+
130+
SELECT pg_drop_replication_slot('regression_slot');
131+
pg_drop_replication_slot
132+
--------------------------
133+
134+
(1 row)
135+
136+
SELECT pg_replication_origin_drop('test_decoding: regression_slot');
137+
pg_replication_origin_drop
138+
----------------------------
139+
140+
(1 row)
141+
+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
4+
CREATE TABLE origin_tbl(id serial primary key, data text);
5+
CREATE TABLE target_tbl(id serial primary key, data text);
6+
7+
SELECT pg_replication_origin_create('test_decoding: regression_slot');
8+
-- ensure duplicate creations fail
9+
SELECT pg_replication_origin_create('test_decoding: regression_slot');
10+
11+
--ensure deletions work (once)
12+
SELECT pg_replication_origin_create('test_decoding: temp');
13+
SELECT pg_replication_origin_drop('test_decoding: temp');
14+
SELECT pg_replication_origin_drop('test_decoding: temp');
15+
16+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
17+
18+
-- origin tx
19+
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
20+
INSERT INTO target_tbl(data)
21+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
22+
23+
-- as is normal, the insert into target_tbl shows up
24+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
25+
26+
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
27+
28+
-- mark session as replaying
29+
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
30+
31+
-- ensure we prevent duplicate setup
32+
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
33+
34+
BEGIN;
35+
-- setup transaction origin
36+
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
37+
INSERT INTO target_tbl(data)
38+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
39+
COMMIT;
40+
41+
-- check replication progress for the session is correct
42+
SELECT pg_replication_origin_session_progress(false);
43+
SELECT pg_replication_origin_session_progress(true);
44+
45+
SELECT pg_replication_origin_session_reset();
46+
47+
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
48+
49+
-- check replication progress identified by name is correct
50+
SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
51+
SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
52+
53+
-- ensure reset requires previously setup state
54+
SELECT pg_replication_origin_session_reset();
55+
56+
-- and magically the replayed xact will be filtered!
57+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
58+
59+
--but new original changes still show up
60+
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
61+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
62+
63+
SELECT pg_drop_replication_slot('regression_slot');
64+
SELECT pg_replication_origin_drop('test_decoding: regression_slot');

contrib/test_decoding/test_decoding.c

+28
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "replication/output_plugin.h"
2323
#include "replication/logical.h"
24+
#include "replication/origin.h"
2425

2526
#include "utils/builtins.h"
2627
#include "utils/lsyscache.h"
@@ -43,6 +44,7 @@ typedef struct
4344
bool include_timestamp;
4445
bool skip_empty_xacts;
4546
bool xact_wrote_changes;
47+
bool only_local;
4648
} TestDecodingData;
4749

4850
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
5961
static void pg_decode_change(LogicalDecodingContext *ctx,
6062
ReorderBufferTXN *txn, Relation rel,
6163
ReorderBufferChange *change);
64+
static bool pg_decode_filter(LogicalDecodingContext *ctx,
65+
RepOriginId origin_id);
6266

6367
void
6468
_PG_init(void)
@@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7680
cb->begin_cb = pg_decode_begin_txn;
7781
cb->change_cb = pg_decode_change;
7882
cb->commit_cb = pg_decode_commit_txn;
83+
cb->filter_by_origin_cb = pg_decode_filter;
7984
cb->shutdown_cb = pg_decode_shutdown;
8085
}
8186

@@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
97102
data->include_xids = true;
98103
data->include_timestamp = false;
99104
data->skip_empty_xacts = false;
105+
data->only_local = false;
100106

101107
ctx->output_plugin_private = data;
102108

@@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
155161
errmsg("could not parse value \"%s\" for parameter \"%s\"",
156162
strVal(elem->arg), elem->defname)));
157163
}
164+
else if (strcmp(elem->defname, "only-local") == 0)
165+
{
166+
167+
if (elem->arg == NULL)
168+
data->only_local = true;
169+
else if (!parse_bool(strVal(elem->arg), &data->only_local))
170+
ereport(ERROR,
171+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
172+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
173+
strVal(elem->arg), elem->defname)));
174+
}
158175
else
159176
{
160177
ereport(ERROR,
@@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
223240
OutputPluginWrite(ctx, true);
224241
}
225242

243+
static bool
244+
pg_decode_filter(LogicalDecodingContext *ctx,
245+
RepOriginId origin_id)
246+
{
247+
TestDecodingData *data = ctx->output_plugin_private;
248+
249+
if (data->only_local && origin_id != InvalidRepOriginId)
250+
return true;
251+
return false;
252+
}
253+
226254
/*
227255
* Print literal `outputstr' already represented as string of type `typid'
228256
* into stringbuf `s'.

0 commit comments

Comments
 (0)