@@ -287,6 +287,9 @@ BEGIN
287
287
rm_node_id, node .id ,
288
288
node .id , rm_node_id,
289
289
rm_node_id, node .id );
290
+ -- Subscription with associated slot can not be droped inside block, but if we do not override syncronous_commit policy,
291
+ -- then this command will be blocked waiting for sync replicas. So we need first do unbound slot from subscription.
292
+ -- But it is possible only for disabled subscriptions. So we have to perform three steps: disable subscription, unbound slot, drop subscription.
290
293
alts := format(' %s{%s:ALTER SUBSCRIPTION sub_%s_%s DISABLE;ALTER SUBSCRIPTION sub_%s_%s SET (slot_name=NONE)}{%s:ALTER SUBSCRIPTION sub_%s_%s DISABLE;ALTER SUBSCRIPTION sub_%s_%s SET (slot_name=NONE)}' ,
291
294
alts, rm_node_id, rm_node_id, node .id , rm_node_id, node .id ,
292
295
node .id , node .id , rm_node_id, node .id , rm_node_id);
@@ -642,7 +645,7 @@ BEGIN
642
645
RETURN;
643
646
END IF;
644
647
645
- -- Check if destination belong to the same replication group as source
648
+ -- Check if destination belongs to the same replication group as source
646
649
IF dst_repl_group<> src_repl_group AND shardman .get_redundancy_of_partition (mv_part_name)> 0
647
650
THEN
648
651
RAISE EXCEPTION ' Can not move partition % to different replication group' , mv_part_name;
@@ -940,6 +943,133 @@ END
940
943
$$ LANGUAGE plpgsql;
941
944
942
945
946
+ -- Move replica to other node. This function is able to move replica only within replication group.
947
+ -- It initiates copying data to new replica, disables logical replication to original replica,
948
+ -- waits completion of initial table sync and then removes old replca.
949
+ CREATE FUNCTION mv_replica (mv_part_name text , src_node_id int , dst_node_id int )
950
+ RETURNS void AS $$
951
+ DECLARE
952
+ src_repl_group text ;
953
+ dst_repl_group text ;
954
+ master_node_id int ;
955
+ BEGIN
956
+ IF shardman .redirect_to_shardlord (format(' mv_replica(%L, %L, %L)' , mv_part_name, src_node_id, dst_node_id))
957
+ THEN
958
+ RETURN;
959
+ END IF;
960
+
961
+ IF src_node_id = dst_node_id
962
+ THEN
963
+ -- Nothing to do: replica is already here
964
+ RAISE NOTICE ' Replica % is already located at node %' , mv_part_name,dst_node_id;
965
+ RETURN;
966
+ END IF;
967
+
968
+ -- Check if there is such replica at source node
969
+ IF EXISTS(SELECT * FROM shardman .replicas WHERE part_name= mv_part_name AND node_id= src_node_id)
970
+ THEN
971
+ RAISE EXCEPTION ' Replica of & does not exist on node %' , mv_part_name, src_node_id;
972
+ END IF;
973
+
974
+ -- Check if destination belongs to the same replication group as source
975
+ SELECT replication_group INTO src_repl_group FROM shardman .nodes WHERE id= src_node_id;
976
+ SELECT replication_group INTO dst_repl_group FROM shardman .nodes WHERE id= dst_node_id;
977
+ IF dst_repl_group<> src_repl_group
978
+ THEN
979
+ RAISE EXCEPTION ' Can not move replica % from replication group % to %' , mv_part_name, src_repl_group, dst_repl_group;
980
+ END IF;
981
+
982
+ -- Check if there is no replica of this partition at the destination node
983
+ IF EXISTS(SELECT * FROM shardman .replicas WHERE part_name= mv_part_name AND node_id= dst_node_id)
984
+ THEN
985
+ RAISE EXCEPTION ' Can not move replica % to node % with existed replica' , mv_part_name, dst_node_id;
986
+ END
987
+
988
+ -- Get node ID of primary partition
989
+ SELECT node_id INTO master_node_id FROM shardman .partitions WHERE part_name= mv_part_name;
990
+
991
+ -- Alter publications at master node
992
+ PERFORM shardman .broadcast (format(' %s:ALTER PUBLICATION node_%s ADD TABLE %I;%s:ALTER PUBLICATION node_%s DROP TABLE %I;' ,
993
+ dst_node_id, mv_part_name, src_node_id, mv_part_name));
994
+
995
+ -- Refresh subscriptions
996
+ PERFORM shardman .broadcast (format(' %s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION WITH (copy_data=false);'
997
+ ' %s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION;' ,
998
+ src_node_id, src_node_id, master_node_id,
999
+ dst_node_id, dst_node_id, master_node_id),
1000
+ super_connstr => true);
1001
+
1002
+ -- Wait completion of initial table sync
1003
+ PERFORM shardman .wait_sync_completion (master_node_id, dst_node_id);
1004
+
1005
+ -- Update metadata
1006
+ UPDATE shardman .replicas SET node_id= dst_node_id WHERE node_id= src_node_id AND part_name= mv_part_name;
1007
+
1008
+ -- Truncate original table
1009
+ PERFORM shardman .broadcast (format(' %s:TRUNCATE TABLE %L;' , src_node_id, mv_part_name);
1010
+ END
1011
+ $$ LANGUAGE plpgsql;
1012
+
1013
+
1014
+ -- Rebalance replicas between nodes. This function tries to evenly
1015
+ -- redistribute replicas of partitions of tables which names match LIKE 'pattern'
1016
+ -- between all nodes of replication groups.
1017
+ -- It is not able to move replica between replication groups.
1018
+ -- This function intentionally moves one replica per time to minimize
1019
+ -- influence on system performance.
1020
+ CREATE FUNCTION rebalance_replicas (table_pattern text = ' %' ) RETURNS void AS $$
1021
+ DECLARE
1022
+ dst_node int ;
1023
+ src_node int ;
1024
+ min_count bigint ;
1025
+ max_count bigint ;
1026
+ mv_part_name text ;
1027
+ repl_group text ;
1028
+ done bool;
1029
+ BEGIN
1030
+ IF shardman .redirect_to_shardlord (format(' rebalance_replicas(%L)' , table_pattern))
1031
+ THEN
1032
+ RETURN;
1033
+ END IF;
1034
+
1035
+ LOOP
1036
+ done := true;
1037
+ -- Repeat for all replication groups
1038
+ FOR repl_group IN SELECT DISTINCT replication_group FROM shardman .nodes
1039
+ LOOP
1040
+ -- Select node in this group with minimal number of replicas
1041
+ SELECT node_id, count (* ) n_parts INTO dst_node, min_count
1042
+ FROM shardman .replicas r JOIN shardman .nodes n ON r .node_id = n .id
1043
+ WHERE n .replication_group = repl_group AND r .relation LIKE table_pattern
1044
+ GROUP BY node_id
1045
+ ORDER BY n_parts ASC LIMIT 1 ;
1046
+ -- Select node in this group with maximal number of partitions
1047
+ SELECT node_id, count (* ) n_parts INTO src_node,max_count
1048
+ FROM shardman .replicas r JOIN shardman .nodes n ON r .node_id = n .id
1049
+ WHERE n .replication_group = repl_group AND r .relation LIKE table_pattern
1050
+ GROUP BY node_id
1051
+ ORDER BY n_parts DESC LIMIT 1 ;
1052
+ -- If difference of number of replicas on this nodes is greater
1053
+ -- than 1, then move random partition
1054
+ IF max_count - min_count > 1 THEN
1055
+ SELECT src .part_name INTO mv_part_name
1056
+ FROM shardman .replicas src
1057
+ WHERE src .node_id = src_node AND src .relation LIKE table_pattern AND
1058
+ NOT EXISTS(SELECT * FROM shardman .replicas dst
1059
+ WHERE dst .node_id = dst_node AND dst .part_name = src .part_name )
1060
+ ORDER BY random() LIMIT 1 ;
1061
+ PERFORM shardman .mv_replica (mv_part_name, src_node, dst_node);
1062
+ done := false;
1063
+ END IF;
1064
+ END LOOP;
1065
+
1066
+ EXIT WHEN done;
1067
+ END LOOP;
1068
+ END
1069
+ $$ LANGUAGE plpgsql;
1070
+
1071
+
1072
+
943
1073
-- -------------------------------------------------------------------
944
1074
-- Utility functions
945
1075
-- -------------------------------------------------------------------
@@ -1066,6 +1196,28 @@ CREATE FUNCTION synchronous_replication()
1066
1196
CREATE FUNCTION is_shardlord ()
1067
1197
RETURNS bool AS ' pg_shardman' LANGUAGE C STRICT;
1068
1198
1199
+ -- Get subscription status
1200
+ CREATE FUNCTION get_subscription_status (sname text ) RETURNS char AS $$
1201
+ SELECT srsubstate FROM pg_subscription_rel srel
1202
+ JOIN pg_subscription s on srel .srsubid = s .oid where subname= sname;
1203
+ $$ LANGUAGE sql;
1204
+
1205
+ -- Wait initial sync completion
1206
+ CREATE FUNCTION wait_sync_completion (src_node_id int , dst_node_id int ) RETURNS void AS $$
1207
+ DECLARE
1208
+ timeout_sec int = 1 ;
1209
+ response text ;
1210
+ BEGIN
1211
+ LOOP
1212
+ response := shardman .broadcast (format(' %s:SELECT shardman.get_subscription_status(' ' sub_%s_%s' ' );' ,
1213
+ dst_node_id, dst_node_id, src_node_id));
1214
+ EXIT WHEN response= ' r' ;
1215
+ PERFORM pg_sleep(timeout_sec);
1216
+ END LOOP;
1217
+ END
1218
+ $$ LANGUAGE plpgsql;
1219
+
1220
+
1069
1221
-- Wait completion of partition copy using LR
1070
1222
CREATE FUNCTION wait_copy_completion (src_node_id int , dst_node_id int , part_name text ) RETURNS void AS $$
1071
1223
DECLARE
@@ -1081,9 +1233,7 @@ BEGIN
1081
1233
LOOP
1082
1234
IF NOT synced
1083
1235
THEN
1084
- response := shardman .broadcast (format(
1085
- ' %s:SELECT srsubstate FROM pg_subscription_rel srel
1086
- JOIN pg_subscription s on srel.srsubid = s.oid where subname=%L;' ,
1236
+ response := shardman .broadcast (format(' %s:SELECT shardman.get_subscription_status(%L);' ,
1087
1237
dst_node_id, slot));
1088
1238
IF response= ' r' THEN
1089
1239
synced := true;
0 commit comments