Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d48d20b

Browse files
committed
Merge branch 'broadcast' of https://git.postgrespro.ru/a.sher/pg_shardman into broadcast
2 parents dbdf82d + 03fb53b commit d48d20b

File tree

2 files changed

+61
-32
lines changed

2 files changed

+61
-32
lines changed

pg_shardman--1.0.sql

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
-- List of nodes present in the cluster
1717
CREATE TABLE nodes (
1818
id serial PRIMARY KEY,
19+
super_connection_string text UNIQUE NOT NULL,
1920
connection_string text UNIQUE NOT NULL,
2021
replication_group text NOT NULL -- group of nodes within which shard replicas are allocated
2122
);
@@ -45,8 +46,17 @@ CREATE TABLE replicas (
4546

4647
-- Shardman interface functions
4748

48-
-- Add a node: adjust logical replication channels in replication group and create foreign servers
49-
CREATE FUNCTION add_node(conn_string text, repl_group text = 'all') RETURNS void AS $$
49+
-- Add a node: adjust logical replication channels in replication group and
50+
-- create foreign servers.
51+
-- 'super_conn_string' is connection string to the node which allows to login to
52+
-- the node as superuser, and 'conn_string' can be some other connstring.
53+
-- The former is used for configuring logical replication, the latter for DDL
54+
-- and for setting up FDW. This separation serves two purposes:
55+
-- * It allows to access data without requiring superuser priviliges;
56+
-- * It allows to set up pgbouncer, as replication can't go through it.
57+
-- If conn_string is null, super_conn_string is used everywhere.
58+
CREATE FUNCTION add_node(super_conn_string text, conn_string text = NULL,
59+
repl_group text = 'all') RETURNS void AS $$
5060
DECLARE
5161
new_node_id int;
5262
node shardman.nodes;
@@ -71,14 +81,18 @@ DECLARE
7181
fdw_part_name text;
7282
table_attrs text;
7383
srv_name text;
84+
conn_string_effective text = COALESCE(conn_string, super_conn_string);
7485
BEGIN
75-
IF shardman.redirect_to_shardlord(format('add_node(%L, %L)', conn_string, repl_group))
86+
IF shardman.redirect_to_shardlord(
87+
format('add_node(%L, %L, %L)', super_conn_string, conn_string, repl_group))
7688
THEN
7789
RETURN;
7890
END IF;
7991

8092
-- Insert new node in nodes table
81-
INSERT INTO shardman.nodes (connection_string,replication_group) VALUES (conn_string, repl_group) RETURNING id INTO new_node_id;
93+
INSERT INTO shardman.nodes (super_connection_string, connection_string, replication_group)
94+
VALUES (super_conn_string, conn_string_effective, repl_group)
95+
RETURNING id INTO new_node_id;
8296

8397
-- Adjust replication channels within replication group.
8498
-- We need all-to-all replication channels between all group members.
@@ -100,14 +114,14 @@ BEGIN
100114
subs := format('%s%s:CREATE SUBSCRIPTION sub_%s_%s CONNECTION %L PUBLICATION node_%s with (create_slot=false, slot_name=''node_%s'', synchronous_commit=local);
101115
%s:CREATE SUBSCRIPTION sub_%s_%s CONNECTION %L PUBLICATION node_%s with (create_slot=false, slot_name=''node_%s'', synchronous_commit=local);',
102116
subs,
103-
node.id, node.id, new_node_id, conn_string, node.id, node.id,
104-
new_node_id, new_node_id, node.id, node.connection_string, new_node_id, new_node_id);
117+
node.id, node.id, new_node_id, super_conn_string, node.id, node.id,
118+
new_node_id, new_node_id, node.id, node.super_connection_string, new_node_id, new_node_id);
105119
END LOOP;
106120

107121
-- Broadcast create publication commands
108-
PERFORM shardman.broadcast(pubs);
122+
PERFORM shardman.broadcast(pubs, super_connstr => true);
109123
-- Broadcast create subscription commands
110-
PERFORM shardman.broadcast(subs);
124+
PERFORM shardman.broadcast(subs, super_connstr => true);
111125

112126
-- In case of synchronous replication broadcast update synchronous standby list commands
113127
IF shardman.synchronous_replication() AND
@@ -125,8 +139,8 @@ BEGIN
125139
conf := format('%s%s:SELECT pg_reload_conf();', conf, node.id);
126140
END LOOP;
127141

128-
PERFORM shardman.broadcast(sync, sync_commit => true);
129-
PERFORM shardman.broadcast(conf);
142+
PERFORM shardman.broadcast(sync, sync_commit_on => true, super_connstr => true);
143+
PERFORM shardman.broadcast(conf, super_connstr => true);
130144
END IF;
131145

132146
-- Add foreign servers for connection to the new node and backward
@@ -251,18 +265,20 @@ BEGIN
251265
END LOOP;
252266

253267
-- Broadcast drop subscription commands, ignore errors because removed node may be not available
254-
PERFORM shardman.broadcast(subs, ignore_errors:=true, sync_commit => true);
268+
PERFORM shardman.broadcast(subs, ignore_errors:=true, sync_commit_on => true,
269+
super_connst => true);
255270
-- Broadcast drop replication commands
256-
PERFORM shardman.broadcast(pubs, ignore_errors:=true);
271+
PERFORM shardman.broadcast(pubs, ignore_errors:=true, super_connstr => true);
257272

258273
-- In case of synchronous replication update synchronous standbys list
259274
IF shardman.synchronous_replication()
260275
THEN
261276
-- On removed node, reset synchronous standbys list
262277
sync := format('%s%s:ALTER SYSTEM SET synchronous_standby_names to '''';',
263278
sync, rm_node_id, sync_standbys);
264-
PERFORM shardman.broadcast(sync, ignore_errors => true, sync_commit => true);
265-
PERFORM shardman.broadcast(conf, ignore_errors:=true);
279+
PERFORM shardman.broadcast(sync, ignore_errors => true,
280+
sync_commit_on => true, super_connstr => true);
281+
PERFORM shardman.broadcast(conf, ignore_errors:=true, super_connstr => true);
266282
END IF;
267283

268284
-- Remove foreign servers at all nodes for the removed node
@@ -305,9 +321,9 @@ BEGIN
305321
END LOOP;
306322

307323
-- Broadcast alter publication commands
308-
PERFORM shardman.broadcast(pubs);
324+
PERFORM shardman.broadcast(pubs, super_connstr => true);
309325
-- Broadcast refresh alter subscription commands
310-
PERFORM shardman.broadcast(subs);
326+
PERFORM shardman.broadcast(subs, super_connstr => true);
311327
ELSE -- there is no replica: we have to create new empty partition at random mode and redirect all FDWs to it
312328
SELECT id INTO new_master_id FROM shardman.nodes WHERE id<>rm_node_id ORDER BY random() LIMIT 1;
313329
INSERT INTO shardman.partitions (part_name,node_id,relation) VALUES (part.part.name,new_master_id,part.relation);
@@ -478,15 +494,16 @@ BEGIN
478494
END LOOP;
479495

480496
-- Broadcast alter publication commands
481-
PERFORM shardman.broadcast(pubs);
497+
PERFORM shardman.broadcast(pubs, super_connstr => true);
482498
-- Broadcast alter subscription commands
483-
PERFORM shardman.broadcast(subs, synchronous => copy_data);
499+
PERFORM shardman.broadcast(subs, synchronous => copy_data, super_connstr => true);
484500

485501
-- This function doesn't wait completion of replication sync
486502
END
487503
$$ LANGUAGE plpgsql;
488504

489-
-- Remove table from all nodes. All table partitions are removed.
505+
-- Remove table from all nodes. All table partitions are removed, but replicas
506+
-- and logical stuff not.
490507
CREATE FUNCTION rm_table(rel regclass)
491508
RETURNS void AS $$
492509
DECLARE
@@ -547,7 +564,7 @@ BEGIN
547564
END IF;
548565
src_node_id := part.node_id;
549566

550-
SELECT replication_group,connection_string INTO src_repl_group,conn_string FROM shardman.nodes WHERE id=src_node_id;
567+
SELECT replication_group, super_connection_string INTO src_repl_group, conn_string FROM shardman.nodes WHERE id=src_node_id;
551568
SELECT replication_group INTO dst_repl_group FROM shardman.nodes WHERE id=dst_node_id;
552569

553570
IF src_node_id = dst_node_id THEN
@@ -576,8 +593,8 @@ BEGIN
576593
dst_node_id, mv_part_name, conn_string, mv_part_name, mv_part_name);
577594

578595
-- Create publication and slot for copying
579-
PERFORM shardman.broadcast(pubs);
580-
PERFORM shardman.broadcast(subs);
596+
PERFORM shardman.broadcast(pubs, super_connstr => true);
597+
PERFORM shardman.broadcast(subs, super_connstr => true);
581598

582599
-- Wait completion of partition copy and prohibit access to this partition
583600
PERFORM shardman.wait_copy_completion(src_node_id, mv_part_name);
@@ -603,12 +620,12 @@ BEGIN
603620
END LOOP;
604621

605622
-- Broadcast alter publication commands
606-
PERFORM shardman.broadcast(pubs);
623+
PERFORM shardman.broadcast(pubs, super_connstr => true);
607624
-- Broadcast alter subscription commands
608-
PERFORM shardman.broadcast(subs);
625+
PERFORM shardman.broadcast(subs, super_connstr => true);
609626
-- Drop copy subscription
610627
PERFORM shardman.broadcast(format('%s:DROP SUBSCRIPTION copy_%s;',
611-
dst_node_id, mv_part_name), sync_commit:=true);
628+
dst_node_id, mv_part_name), sync_commit_on => true, super_connstr => true);
612629

613630
-- Update owner of this partition
614631
UPDATE shardman.partitions SET node_id=dst_node_id WHERE part_name=mv_part_name;
@@ -687,6 +704,7 @@ RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
687704
-- prefix: node-id:sql-statement;
688705
-- To run multiple statements on node, wrap them in {}:
689706
-- {node-id:statement; statement;}
707+
-- Node id '0' means shardlord, shardlord_connstring guc is used.
690708
-- Don't specify them separately with 2pc, we use only one prepared_xact name.
691709
-- No escaping is performed, so ';', '{' and '}' inside queries are not supported.
692710
-- By default functions throws error is execution is failed at some of the
@@ -699,12 +717,15 @@ RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
699717
-- If two_phase parameter is true, then each statement is wrapped in blocked and
700718
-- prepared with subsequent commit or rollback of prepared transaction at second
701719
-- phase of two phase commit.
702-
-- If sync_commit is false, we do set session synchronous_commit to local;
720+
-- If sync_commit_on is false, we set session synchronous_commit to local.
721+
-- If super_connstr is true, super connstring is used everywhere, usual
722+
-- connstr otherwise.
703723
CREATE FUNCTION broadcast(cmds text,
704724
ignore_errors bool = false,
705725
two_phase bool = false,
706-
sync_commit bool = false,
707-
synchronous bool = false)
726+
sync_commit_on bool = false,
727+
synchronous bool = false,
728+
super_connstr bool = false)
708729
RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
709730

710731
-- Options to postgres_fdw are specified in two places: user & password in user

pg_shardman.c

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,9 @@ broadcast(PG_FUNCTION_ARGS)
123123
char* sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
124124
bool ignore_errors = PG_GETARG_BOOL(1);
125125
bool two_phase = PG_GETARG_BOOL(2);
126-
bool sync_commit = PG_GETARG_BOOL(3);
126+
bool sync_commit_on = PG_GETARG_BOOL(3);
127127
bool sequential = PG_GETARG_BOOL(4);
128+
bool super_connstr = PG_GETARG_BOOL(5);
128129
char* sep;
129130
PGresult *res;
130131
char* fetch_node_connstr;
@@ -160,7 +161,14 @@ broadcast(PG_FUNCTION_ARGS)
160161
sql += n;
161162
if (node_id != 0)
162163
{
163-
fetch_node_connstr = psprintf("select connection_string from shardman.nodes where id=%d", node_id);
164+
if (super_connstr)
165+
fetch_node_connstr = psprintf(
166+
"select super_connection_string from shardman.nodes where id=%d",
167+
node_id);
168+
else
169+
fetch_node_connstr = psprintf(
170+
"select connection_string from shardman.nodes where id=%d",
171+
node_id);
164172
if (SPI_exec(fetch_node_connstr, 0) < 0 || SPI_processed != 1)
165173
{
166174
elog(ERROR, "SHARDMAN: Failed to fetch connection string for node %d",
@@ -193,7 +201,7 @@ broadcast(PG_FUNCTION_ARGS)
193201
PQerrorMessage(conn[n_cmds-1]));
194202
goto cleanup;
195203
}
196-
if (!sync_commit)
204+
if (!sync_commit_on)
197205
{
198206
if (two_phase)
199207
{
@@ -219,7 +227,7 @@ broadcast(PG_FUNCTION_ARGS)
219227
node_id, PQerrorMessage(conn[n_cmds-1]));
220228
goto cleanup;
221229
}
222-
if (!sync_commit)
230+
if (!sync_commit_on)
223231
pfree(sql);
224232

225233
sql = sep + 1;

0 commit comments

Comments
 (0)