@@ -32,7 +32,8 @@ CREATE TABLE nodes (
32
32
system_id bigint NOT NULL UNIQUE,
33
33
super_connection_string text UNIQUE NOT NULL ,
34
34
connection_string text UNIQUE NOT NULL ,
35
- replication_group text NOT NULL -- group of nodes within which shard replicas are allocated
35
+ -- group of nodes within which shard replicas are allocated
36
+ replication_group text NOT NULL
36
37
);
37
38
38
39
-- List of sharded tables
@@ -72,7 +73,7 @@ CREATE TABLE replicas (
72
73
-- * It allows to set up pgbouncer, as replication can't go through it.
73
74
-- If conn_string is null, super_conn_string is used everywhere.
74
75
CREATE FUNCTION add_node (super_conn_string text , conn_string text = NULL ,
75
- repl_group text = ' default ' ) RETURNS int AS $$
76
+ repl_group text = NULL ) RETURNS int AS $$
76
77
DECLARE
77
78
new_node_id int ;
78
79
node shardman .nodes ;
@@ -113,18 +114,22 @@ BEGIN
113
114
-- Insert new node in nodes table
114
115
INSERT INTO shardman .nodes (system_id, super_connection_string,
115
116
connection_string, replication_group)
116
- VALUES (0 , super_conn_string, conn_string_effective, repl_group )
117
+ VALUES (0 , super_conn_string, conn_string_effective, ' ' )
117
118
RETURNING id INTO new_node_id;
118
119
119
- -- We have to update system_id after insert, because otherwise broadcast
120
- -- will not work
120
+ -- We have to update system_id along with dependant repl_group after insert,
121
+ -- because otherwise broadcast will not work.
121
122
sys_id := shardman .broadcast (
122
123
format(' %s:SELECT shardman.get_system_identifier();' ,
123
124
new_node_id))::bigint ;
124
125
IF EXISTS(SELECT 1 FROM shardman .nodes where system_id = sys_id) THEN
125
126
RAISE EXCEPTION ' Node with system id % is already in the cluster' , sys_id;
126
127
END IF;
127
- UPDATE shardman .nodes SET system_id= sys_id WHERE id= new_node_id;
128
+ UPDATE shardman .nodes SET system_id = sys_id WHERE id = new_node_id;
129
+ -- By default, use system id as repl group name
130
+ UPDATE shardman .nodes SET replication_group =
131
+ (CASE WHEN repl_group IS NULL THEN sys_id::text ELSE repl_group END)
132
+ WHERE id = new_node_id;
128
133
129
134
-- Adjust replication channels within replication group.
130
135
-- We need all-to-all replication channels between all group members.
@@ -137,10 +142,10 @@ BEGIN
137
142
%s:CREATE PUBLICATION node_%s;
138
143
%s:SELECT pg_create_logical_replication_slot(' ' node_%s' ' , ' ' pgoutput' ' );
139
144
%s:SELECT pg_create_logical_replication_slot(' ' node_%s' ' , ' ' pgoutput' ' );' ,
140
- pubs, node .id , new_node_id,
141
- new_node_id, node .id ,
142
- node .id , new_node_id,
143
- new_node_id, node .id );
145
+ pubs, node .id , new_node_id,
146
+ new_node_id, node .id ,
147
+ node .id , new_node_id,
148
+ new_node_id, node .id );
144
149
-- Add to new node subscriptions to existing nodes and add subscription
145
150
-- to new node to all existing nodes
146
151
-- sub name is sub_$subnodeid_pubnodeid to avoid application_name collision
@@ -202,18 +207,16 @@ BEGIN
202
207
-- Broadcast command for creating user mapping for this servers
203
208
PERFORM shardman .broadcast (usms);
204
209
205
- -- Create FDWs at new node for all existed partitions
210
+ -- Create FDWs at new node for all existing partitions
206
211
FOR t IN SELECT * from shardman .tables WHERE sharding_key IS NOT NULL
207
212
LOOP
208
213
create_tables := format(' %s{%s:%s}' ,
209
214
create_tables, new_node_id, t .create_sql );
210
215
create_partitions := format(' %s%s:SELECT create_hash_partitions(%L,%L,%L);' ,
211
216
create_partitions, new_node_id, t .relation , t .sharding_key , t .partitions_count );
212
217
SELECT shardman .reconstruct_table_attrs (t .relation ) INTO table_attrs;
213
- FOR part IN SELECT * from shardman .partitions WHERE relation= t .relation
218
+ FOR part IN SELECT * FROM shardman .partitions WHERE relation= t .relation
214
219
LOOP
215
- SELECT connection_string INTO conn_string from shardman .nodes WHERE id= part .node_id ;
216
- SELECT * FROM shardman .conninfo_to_postgres_fdw_opts (conn_str) INTO server_opts, um_opts;
217
220
srv_name := format(' node_%s' , part .node_id );
218
221
fdw_part_name := format(' %s_fdw' , part .part_name );
219
222
create_fdws := format(' %s%s:CREATE FOREIGN TABLE %I %s SERVER %s OPTIONS (table_name %L);' ,
@@ -460,14 +463,27 @@ BEGIN
460
463
END
461
464
$$ LANGUAGE plpgsql;
462
465
463
-
466
+ -- Bail out with ERROR if some replication group doesn't have 'redundancy'
467
+ -- replicas
468
+ CREATE FUNCTION check_max_replicas (redundancy int ) RETURNS void AS $$
469
+ DECLARE
470
+ rg record;
471
+ BEGIN
472
+ FOR rg IN SELECT count (* ), replication_group FROM shardman .nodes
473
+ GROUP BY replication_group LOOP
474
+ IF rg .count < redundancy + 1 THEN
475
+ RAISE EXCEPTION ' Requested redundancy % is too high: replication group % has % members' , redundancy, rg .replication_group , rg .count ;
476
+ END IF;
477
+ END LOOP;
478
+ END
479
+ $$ LANGUAGE plpgsql;
464
480
465
481
-- Shard table with hash partitions. Parameters are the same as in pathman.
466
482
-- It also scatter partitions through all nodes.
467
483
-- This function expects that empty table is created at shardlord.
468
484
-- It can be executed only at shardlord and there is no need to redirect this
469
485
-- function to shardlord.
470
- CREATE FUNCTION create_hash_partitions (rel regclass , expr text , part_count int ,
486
+ CREATE FUNCTION create_hash_partitions (rel_name name , expr text , part_count int ,
471
487
redundancy int = 0 )
472
488
RETURNS void AS $$
473
489
DECLARE
@@ -483,7 +499,6 @@ DECLARE
483
499
create_partitions text = ' ' ;
484
500
create_fdws text = ' ' ;
485
501
replace_parts text = ' ' ;
486
- rel_name text = rel::text ;
487
502
i int ;
488
503
n_nodes int ;
489
504
BEGIN
@@ -496,6 +511,9 @@ BEGIN
496
511
RAISE EXCEPTION ' Please add some nodes first' ;
497
512
END IF;
498
513
514
+ -- Check right away to avoid unneccessary recover()
515
+ PERFORM shardman .check_max_replicas (redundancy);
516
+
499
517
-- Generate SQL statement creating this table
500
518
SELECT shardman .gen_create_table_sql (rel_name) INTO create_table;
501
519
@@ -522,7 +540,7 @@ BEGIN
522
540
n_nodes := array_length(node_ids, 1 );
523
541
524
542
-- Reconstruct table attributes from parent table
525
- SELECT shardman .reconstruct_table_attrs (rel_name) INTO table_attrs;
543
+ SELECT shardman .reconstruct_table_attrs (rel_name::regclass ) INTO table_attrs;
526
544
527
545
FOR i IN 0 ..part_count- 1
528
546
LOOP
@@ -553,15 +571,15 @@ BEGIN
553
571
554
572
IF redundancy <> 0
555
573
THEN
556
- PERFORM shardman .set_redundancy (rel , redundancy, copy_data => false);
574
+ PERFORM shardman .set_redundancy (rel_name , redundancy, copy_data => false);
557
575
END IF;
558
576
END
559
577
$$ LANGUAGE plpgsql;
560
578
561
579
-- Provide requested level of redundancy. 0 means no redundancy.
562
580
-- If existing level of redundancy is greater than specified, then right now this
563
581
-- function does nothing.
564
- CREATE FUNCTION set_redundancy (rel regclass , redundancy int , copy_data bool = true)
582
+ CREATE FUNCTION set_redundancy (rel_name name , redundancy int , copy_data bool = true)
565
583
RETURNS void AS $$
566
584
DECLARE
567
585
part shardman .partitions ;
@@ -570,20 +588,21 @@ DECLARE
570
588
repl_group text ;
571
589
pubs text = ' ' ;
572
590
subs text = ' ' ;
573
- rel_name text = rel::text ;
574
591
sub_options text = ' ' ;
575
592
BEGIN
576
593
IF shardman .redirect_to_shardlord (format(' set_redundancy(%L, %L)' , rel_name, redundancy))
577
594
THEN
578
595
RETURN;
579
596
END IF;
580
597
598
+ PERFORM shardman .check_max_replicas (redundancy);
599
+
581
600
IF NOT copy_data THEN
582
601
sub_options := ' WITH (copy_data=false)' ;
583
602
END IF;
584
603
585
604
-- Loop through all partitions of this table
586
- FOR part IN SELECT * from shardman .partitions where relation= rel_name
605
+ FOR part IN SELECT * FROM shardman .partitions WHERE relation= rel_name
587
606
LOOP
588
607
-- Count number of replicas of this partition
589
608
SELECT count (* ) INTO n_replicas FROM shardman .replicas WHERE part_name= part .part_name ;
@@ -665,10 +684,9 @@ $$ LANGUAGE plpgsql;
665
684
666
685
667
686
-- Remove table from all nodes.
668
- CREATE FUNCTION rm_table (rel regclass )
687
+ CREATE FUNCTION rm_table (rel_name name )
669
688
RETURNS void AS $$
670
689
DECLARE
671
- rel_name text = rel::text ;
672
690
node_id int ;
673
691
pname text ;
674
692
drop1 text = ' ' ;
@@ -702,10 +720,11 @@ BEGIN
702
720
END
703
721
$$ LANGUAGE plpgsql;
704
722
705
- -- Move partition to other node. This function is able to move partition only within replication group.
706
- -- It creates temporary logical replication channel to copy partition to new location.
707
- -- Until logical replication almost caught-up access to old partition is now denied.
708
- -- Then we revoke all access to this table until copy is completed and all FDWs are updated.
723
+ -- Move partition to other node. This function can move partition only within
724
+ -- replication group. It creates temporary logical replication channel to copy
725
+ -- partition to new location. Until logical replication almost caught-up access
726
+ -- to old partition is denied. Then we revoke all access to this table until
727
+ -- copy is completed and all FDWs are updated.
709
728
CREATE FUNCTION mv_partition (mv_part_name text , dst_node_id int )
710
729
RETURNS void AS $$
711
730
DECLARE
@@ -750,12 +769,12 @@ BEGIN
750
769
-- Check if destination belongs to the same replication group as source
751
770
IF dst_repl_group<> src_repl_group AND shardman .get_redundancy_of_partition (mv_part_name)> 0
752
771
THEN
753
- RAISE EXCEPTION ' Can not move partition % to different replication group' , mv_part_name;
772
+ RAISE EXCEPTION ' Unable to move partition % to different replication group' , mv_part_name;
754
773
END IF;
755
774
756
775
IF EXISTS(SELECT * FROM shardman .replicas WHERE part_name= mv_part_name AND node_id= dst_node_id)
757
776
THEN
758
- RAISE EXCEPTION ' Can not move partition % to node % with existed replica' , mv_part_name, dst_node_id;
777
+ RAISE EXCEPTION ' Unable to move partition % to node % with existing replica' , mv_part_name, dst_node_id;
759
778
END IF;
760
779
761
780
-- Copy partition data to new location
@@ -845,8 +864,8 @@ $$ LANGUAGE sql;
845
864
846
865
-- Get minimal redundancy of the specified relation.
847
866
-- This command can be executed only at shardlord.
848
- CREATE FUNCTION get_min_redundancy (rel regclass) returns bigint AS $$
849
- SELECT min (redundancy) FROM (SELECT count (* ) redundancy FROM shardman .replicas WHERE relation= rel:: text GROUP BY part_name) s;
867
+ CREATE FUNCTION get_min_redundancy (rel_name name) RETURNS bigint AS $$
868
+ SELECT min (redundancy) FROM (SELECT count (* ) redundancy FROM shardman .replicas WHERE relation= rel_name GROUP BY part_name) s;
850
869
$$ LANGUAGE sql;
851
870
852
871
-- Execute command at all shardman nodes.
@@ -896,7 +915,7 @@ $$ LANGUAGE sql;
896
915
-- It is not able to move partition between replication groups.
897
916
-- This function intentionally moves one partition per time to minimize
898
917
-- influence on system performance.
899
- CREATE FUNCTION rebalance (table_pattern text = ' %' ) RETURNS void AS $$
918
+ CREATE FUNCTION rebalance (part_pattern text = ' %' ) RETURNS void AS $$
900
919
DECLARE
901
920
dst_node int ;
902
921
src_node int ;
@@ -906,7 +925,7 @@ DECLARE
906
925
repl_group text ;
907
926
done bool;
908
927
BEGIN
909
- IF shardman .redirect_to_shardlord (format(' rebalance(%L)' , table_pattern ))
928
+ IF shardman .redirect_to_shardlord (format(' rebalance(%L)' , part_pattern ))
910
929
THEN
911
930
RETURN;
912
931
END IF;
@@ -919,21 +938,21 @@ BEGIN
919
938
-- Select node in this group with minimal number of partitions
920
939
SELECT node_id, count (* ) n_parts INTO dst_node, min_count
921
940
FROM shardman .partitions p JOIN shardman .nodes n ON p .node_id = n .id
922
- WHERE n .replication_group = repl_group AND p .relation LIKE table_pattern
941
+ WHERE n .replication_group = repl_group AND p .relation LIKE part_pattern
923
942
GROUP BY node_id
924
943
ORDER BY n_parts ASC LIMIT 1 ;
925
944
-- Select node in this group with maximal number of partitions
926
945
SELECT node_id, count (* ) n_parts INTO src_node,max_count
927
946
FROM shardman .partitions p JOIN shardman .nodes n ON p .node_id = n .id
928
- WHERE n .replication_group = repl_group AND p .relation LIKE table_pattern
947
+ WHERE n .replication_group = repl_group AND p .relation LIKE part_pattern
929
948
GROUP BY node_id
930
949
ORDER BY n_parts DESC LIMIT 1 ;
931
950
-- If difference of number of partitions on this nodes is greater
932
951
-- than 1, then move random partition
933
952
IF max_count - min_count > 1 THEN
934
953
SELECT p .part_name INTO mv_part_name
935
954
FROM shardman .partitions p
936
- WHERE p .node_id = src_node AND p .relation LIKE table_pattern AND
955
+ WHERE p .node_id = src_node AND p .relation LIKE part_pattern AND
937
956
NOT EXISTS(SELECT * from shardman .replicas r
938
957
WHERE r .node_id = dst_node AND r .part_name = p .part_name )
939
958
ORDER BY random() LIMIT 1 ;
@@ -947,8 +966,8 @@ BEGIN
947
966
END
948
967
$$ LANGUAGE plpgsql;
949
968
950
- -- Share table between all nodes. This function should be executed at shardlord. The empty table should be present at shardlord,
951
- -- but not at nodes.
969
+ -- Share table between all nodes. This function should be executed at
970
+ -- shardlord. The empty table should be present at shardlord, but not at nodes.
952
971
CREATE FUNCTION create_shared_table (rel regclass, master_node_id int = 1 ) RETURNS void AS $$
953
972
DECLARE
954
973
node shardman .nodes ;
@@ -1121,7 +1140,7 @@ $$ LANGUAGE plpgsql;
1121
1140
-- It is not able to move replica between replication groups.
1122
1141
-- This function intentionally moves one replica per time to minimize
1123
1142
-- influence on system performance.
1124
- CREATE FUNCTION rebalance_replicas (table_pattern text = ' %' ) RETURNS void AS $$
1143
+ CREATE FUNCTION rebalance_replicas (replica_pattern text = ' %' ) RETURNS void AS $$
1125
1144
DECLARE
1126
1145
dst_node int ;
1127
1146
src_node int ;
@@ -1131,7 +1150,7 @@ DECLARE
1131
1150
repl_group text ;
1132
1151
done bool;
1133
1152
BEGIN
1134
- IF shardman .redirect_to_shardlord (format(' rebalance_replicas(%L)' , table_pattern ))
1153
+ IF shardman .redirect_to_shardlord (format(' rebalance_replicas(%L)' , replica_pattern ))
1135
1154
THEN
1136
1155
RETURN;
1137
1156
END IF;
@@ -1144,21 +1163,21 @@ BEGIN
1144
1163
-- Select node in this group with minimal number of replicas
1145
1164
SELECT node_id, count (* ) n_parts INTO dst_node, min_count
1146
1165
FROM shardman .replicas r JOIN shardman .nodes n ON r .node_id = n .id
1147
- WHERE n .replication_group = repl_group AND r .relation LIKE table_pattern
1166
+ WHERE n .replication_group = repl_group AND r .relation LIKE replica_pattern
1148
1167
GROUP BY node_id
1149
1168
ORDER BY n_parts ASC LIMIT 1 ;
1150
1169
-- Select node in this group with maximal number of partitions
1151
1170
SELECT node_id, count (* ) n_parts INTO src_node,max_count
1152
1171
FROM shardman .replicas r JOIN shardman .nodes n ON r .node_id = n .id
1153
- WHERE n .replication_group = repl_group AND r .relation LIKE table_pattern
1172
+ WHERE n .replication_group = repl_group AND r .relation LIKE replica_pattern
1154
1173
GROUP BY node_id
1155
1174
ORDER BY n_parts DESC LIMIT 1 ;
1156
1175
-- If difference of number of replicas on this nodes is greater
1157
1176
-- than 1, then move random partition
1158
1177
IF max_count - min_count > 1 THEN
1159
1178
SELECT src .part_name INTO mv_part_name
1160
1179
FROM shardman .replicas src
1161
- WHERE src .node_id = src_node AND src .relation LIKE table_pattern
1180
+ WHERE src .node_id = src_node AND src .relation LIKE replica_pattern
1162
1181
AND NOT EXISTS(SELECT * FROM shardman .replicas dst
1163
1182
WHERE dst .node_id = dst_node AND dst .part_name = src .part_name )
1164
1183
AND NOT EXISTS(SELECT * FROM shardman .partitions p
0 commit comments