@@ -61,7 +61,7 @@ DECLARE
61
61
um_opts text ;
62
62
new_server_opts text ;
63
63
new_um_opts text ;
64
- sync_standbys text ;
64
+ sync_standbys text [] ;
65
65
shardlord_conn_string text ;
66
66
create_table text ;
67
67
create_tables text = ' ' ;
@@ -71,20 +71,17 @@ DECLARE
71
71
fdw_part_name text ;
72
72
table_attrs text ;
73
73
srv_name text ;
74
+ sync_standbyss text = ' haha' ;
74
75
BEGIN
75
76
-- Insert new node in nodes table
76
77
INSERT INTO shardman .nodes (connection_string,replication_group) VALUES (conn_string, repl_group) RETURNING id INTO new_node_id;
77
78
78
- -- Construct list of synchronous standbyes (subscriptions have name node_$ID)
79
- SELECT string_agg(' node_' || id, ' ,' ) INTO sync_standbys from shardman .nodes ;
80
- -- Construct foreign server options from connection string of new node
81
- SELECT * FROM shardman .conninfo_to_postgres_fdw_opts (conn_string) INTO new_server_opts, new_um_opts;
82
-
83
79
-- Adjust replication channels within replication group.
84
80
-- We need all-to-all replication channels between all group members.
85
- FOR node IN SELECT * FROM shardman .nodes WHERE replication_group= repl_group AND id<> new_node_id
81
+ FOR node IN SELECT * FROM shardman .nodes WHERE replication_group = repl_group AND id <> new_node_id
86
82
LOOP
87
- -- Add to new node publications for all existed nodes and add publication for new node to all existed nodes
83
+ -- Add to new node publications for all existing nodes and add
84
+ -- publication for new node to all existing nodes
88
85
pubs := format(' %s%s:CREATE PUBLICATION node_%s;
89
86
%s:CREATE PUBLICATION node_%s;
90
87
%s:SELECT pg_create_logical_replication_slot(' ' node_%s' ' , ' ' pgoutput' ' );
@@ -93,17 +90,14 @@ BEGIN
93
90
new_node_id, node .id ,
94
91
node .id , new_node_id,
95
92
new_node_id, node .id );
96
- -- Add to new node subscriptions to existed nodes and add subscription to new node to all existed nodes
97
- subs := format(' %s%s:CREATE SUBSCRIPTION node_%s CONNECTION %L PUBLICATION node_%s with (create_slot=false, slot_name=' ' node_%s' ' , synchronous_commit=local);
98
- %s:CREATE SUBSCRIPTION node_%s CONNECTION %L PUBLICATION node_%s with (create_slot=false, slot_name=' ' node_%s' ' , synchronous_commit=local);' ,
99
- subs, node .id , new_node_id, conn_string, node .id , node .id ,
100
- new_node_id, node .id , node .connection_string , new_node_id, new_node_id);
101
-
102
- -- Adjust synchronous standby list at all nodes
103
- sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
104
- sync, node .id , sync_standbys);
105
- conf := format(' %s%s:SELECT pg_reload_conf();' ,
106
- conf, node .id );
93
+ -- Add to new node subscriptions to existing nodes and add subscription
94
+ -- to new node to all existing nodes
95
+ -- sub name is node_pubnodeid_subnodeid to avoid application_name collision
96
+ subs := format(' %s%s:CREATE SUBSCRIPTION node_%s_%s CONNECTION %L PUBLICATION node_%s with (create_slot=false, slot_name=' ' node_%s' ' , synchronous_commit=local);
97
+ %s:CREATE SUBSCRIPTION node_%s_%s CONNECTION %L PUBLICATION node_%s with (create_slot=false, slot_name=' ' node_%s' ' , synchronous_commit=local);' ,
98
+ subs,
99
+ node .id , new_node_id, node .id , conn_string, node .id , node .id ,
100
+ new_node_id, node .id , new_node_id, node .connection_string , new_node_id, new_node_id);
107
101
END LOOP;
108
102
109
103
-- Broadcast create publication commands
@@ -112,19 +106,26 @@ BEGIN
112
106
PERFORM shardman .broadcast (subs);
113
107
114
108
-- In case of synchronous replication broadcast update synchronous standby list commands
115
- IF shardman .synchronous_replication ()
109
+ IF shardman .synchronous_replication () AND
110
+ (SELECT COUNT (* ) FROM shardman .nodes WHERE replication_group = repl_group) > 1
116
111
THEN
117
- -- Adjust synchronous standby list at new nodes
118
- sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
119
- sync, new_node_id, sync_stanbys);
120
- -- Reload configuration at new node
121
- conf := format(' %s%s:SELECT pg_reload_conf();' ,
122
- conf, new_node_id);
112
+ -- Take all nodes in replicationg group excluding myself
113
+ FOR node IN SELECT * FROM shardman .nodes WHERE replication_group = repl_group LOOP
114
+ sync_standbys :=
115
+ ARRAY(SELECT string_agg(format(' node_%s_%s' , node .id , id), ' ,' ) FROM shardman .nodes
116
+ WHERE replication_group = repl_group AND id <> node .id );
117
+ sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to ' ' FIRST %s (%s)' ' ;' ,
118
+ sync, node .id , array_length(sync_standbys, 1 ),
119
+ array_to_string(sync_standbys, ' ,' ));
120
+ conf := format(' %s%s:SELECT pg_reload_conf();' , conf, node .id );
121
+ END LOOP;
123
122
PERFORM shardman .broadcast (sync);
124
123
PERFORM shardman .broadcast (conf);
125
124
END IF;
126
125
127
126
-- Add foreign servers for connection to the new node and backward
127
+ -- Construct foreign server options from connection string of new node
128
+ SELECT * FROM shardman .conninfo_to_postgres_fdw_opts (conn_string) INTO new_server_opts, new_um_opts;
128
129
FOR node IN SELECT * FROM shardman .nodes WHERE id<> new_node_id
129
130
LOOP
130
131
-- Construct foreign server options from connection string of this node
181
182
$$ LANGUAGE plpgsql;
182
183
183
184
184
- -- Remove node: try to choose alternative from one of replicas of this nodes, exclude node from replication channels and remove foreign servers.
185
- -- To remove node with existed partitions use force=true parameter
185
+ -- Remove node: try to choose alternative from one of replicas of this nodes,
186
+ -- exclude node from replication channels and remove foreign servers.
187
+ -- To remove node with existing partitions use force=true parameter.
186
188
CREATE FUNCTION rm_node (rm_node_id int , force bool = false) RETURNS void AS $$
187
189
DECLARE
188
190
node shardman .nodes ;
@@ -198,14 +200,14 @@ DECLARE
198
200
BEGIN
199
201
-- If it is not forced remove, check if there are no partitions at this node
200
202
IF NOT force THEN
201
- IF EXISTS (SELECT * FROM shardman .partitions WHERE node= rm_node_id)
203
+ IF EXISTS (SELECT * FROM shardman .partitions parts WHERE parts . node = rm_node_id)
202
204
THEN
203
205
RAISE EXCEPTION ' Use force=true to remove non-empty node' ;
204
206
END IF;
205
207
END IF;
206
208
207
209
-- Construct new synchronous standby list
208
- SELECT string_agg(' node_' || id, ' ,' ) INTO sync_standbys from shardman .nodes WHERE id<> rm_node_id;
210
+ SELECT string_agg(' node_' || id, ' ,' ) INTO sync_standbys from shardman .nodes WHERE id<> rm_node_id;
209
211
210
212
-- Remove all subscriptions and publications of this node
211
213
FOR node IN SELECT * FROM shardman .nodes WHERE replication_group= repl_group AND id<> rm_node_id
@@ -333,7 +335,7 @@ DECLARE
333
335
i int ;
334
336
n_nodes int ;
335
337
BEGIN
336
- IF EXISTS(SELECT relation from shardman .partitions where relation = rel_name)
338
+ IF EXISTS(SELECT relation FROM shardman .partitions WHERE relation = rel_name)
337
339
THEN
338
340
RAISE EXCEPTION ' Table % is already sharded' , rel_name;
339
341
END IF;
@@ -372,7 +374,7 @@ BEGIN
372
374
fdw_part_name := format(' %s_fdw' , part_name);
373
375
-- Insert information about new partition in partitions table
374
376
INSERT INTO shardman .partitions (part_name,node,relation) VALUES (part_name, node_id, rel_name);
375
- -- Construct name of the servers where partition will be located
377
+ -- Construct name of the server where partition will be located
376
378
srv_name := format(' node_%s' , node_id);
377
379
378
380
-- Replace local partition with foreign table at all nodes except owner
394
396
$$ LANGUAGE plpgsql;
395
397
396
398
-- Provide requested level of redundancy. 0 means no redundancy.
397
- -- If existed level of redundancy is greater than specified, then right now this function does nothing.
399
+ -- If existing level of redundancy is greater than specified, then right now this
400
+ -- function does nothing.
398
401
CREATE FUNCTION set_redundancy (rel regclass, redundancy int )
399
402
RETURNS void AS $$
400
403
DECLARE
@@ -406,7 +409,7 @@ DECLARE
406
409
subs text = ' ' ;
407
410
rel_name text = rel::text ;
408
411
BEGIN
409
- -- Loop though all partitions of this table
412
+ -- Loop through all partitions of this table
410
413
FOR part IN SELECT * from shardman .partitions where relation= rel_name
411
414
LOOP
412
415
-- Count number of replicas of this partition
@@ -425,8 +428,8 @@ BEGIN
425
428
-- Establish publications and subscriptions for this partition
426
429
pubs := format(' %s%s:ALTER PUBLICATION node_%s ADD TABLE %I;' ,
427
430
pubs, part .node , repl_node, part .part_name );
428
- subs := format(' %s%s:ALTER SUBSCRIPTION node_%s REFRESH PUBLICATION;' ,
429
- subs, repl_node, part .node );
431
+ subs := format(' %s%s:ALTER SUBSCRIPTION node_%s_% s REFRESH PUBLICATION;' ,
432
+ subs, repl_node, part .node , repl_node );
430
433
END LOOP;
431
434
END IF;
432
435
END LOOP;
@@ -471,13 +474,22 @@ CREATE FUNCTION reconstruct_table_attrs(relation regclass)
471
474
RETURNS text AS ' pg_shardman' LANGUAGE C STRICT;
472
475
473
476
-- Broadcast SQL commands to nodes and wait their completion.
474
- -- cmds is list of SQL commands separated by by semi-columns with node prefix: node-id:sql-statement;
475
- -- By default functions throws error is execution is failed at some of the nodes, with ignore_errors=true errors are ignored
476
- -- and function returns string with "Error:" prefix containing list of errors separated by semi-columns with nodes prefixes.
477
- -- In case o normal completion this function return list with node prefixes separated by semi-columns with single result of select queries
478
- -- or number of affected rows for other commands.
479
- -- If two_phase parameter is true, then each statement is wrapped in blocked and prepared with subsequent commit or rollback of prepared transaction
480
- -- at second phase of two phase commit.
477
+ -- cmds is list of SQL commands separated by semi-columns with node
478
+ -- prefix: node-id:sql-statement;
479
+ -- To run multiple statements on node, wrap them in {}:
480
+ -- {node-id:statement; statement;}
481
+ -- Don't specify them separately with 2pc, we use only one prepared_xact name.
482
+ -- No escaping is performed, so ';', '{' and '}' inside queries are not supported.
483
+ -- By default functions throws error is execution is failed at some of the
484
+ -- nodes, with ignore_errors=true errors are ignored and function returns string
485
+ -- with "Error:" prefix containing list of errors separated by semicolons with
486
+ -- nodes prefixes.
487
+ -- In case of normal completion this function return list with node prefixes
488
+ -- separated by semi-columns with single result for select queries or number of
489
+ -- affected rows for other commands.
490
+ -- If two_phase parameter is true, then each statement is wrapped in blocked and
491
+ -- prepared with subsequent commit or rollback of prepared transaction at second
492
+ -- phase of two phase commit.
481
493
CREATE FUNCTION broadcast (cmds text , ignore_errors bool = false, two_phase bool = false)
482
494
RETURNS text AS ' pg_shardman' LANGUAGE C STRICT;
483
495
@@ -547,4 +559,3 @@ CREATE FUNCTION shardlord_connection_string()
547
559
-- Check from configuration parameters is synchronous replication mode was enabled
548
560
CREATE FUNCTION synchronous_replication ()
549
561
RETURNS bool AS ' pg_shardman' LANGUAGE C STRICT;
550
-
0 commit comments