Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f394590

Browse files
committed
Add rebalance method
1 parent 96f3c3f commit f394590

File tree

2 files changed

+86
-24
lines changed

2 files changed

+86
-24
lines changed

pg_shardman--1.0.sql

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ CREATE TABLE replicas (
5252
-- the node as superuser, and 'conn_string' can be some other connstring.
5353
-- The former is used for configuring logical replication, the latter for DDL
5454
-- and for setting up FDW. This separation serves two purposes:
55-
-- * It allows to access data without requiring superuser priviliges;
55+
-- * It allows to access data without requiring superuser privileges;
5656
-- * It allows to set up pgbouncer, as replication can't go through it.
5757
-- If conn_string is null, super_conn_string is used everywhere.
5858
CREATE FUNCTION add_node(super_conn_string text, conn_string text = NULL,
59-
repl_group text = 'all') RETURNS void AS $$
59+
repl_group text = 'default') RETURNS void AS $$
6060
DECLARE
6161
new_node_id int;
6262
node shardman.nodes;
@@ -127,7 +127,7 @@ BEGIN
127127
IF shardman.synchronous_replication() AND
128128
(SELECT COUNT(*) FROM shardman.nodes WHERE replication_group = repl_group) > 1
129129
THEN
130-
-- Take all nodes in replicationg group excluding myself
130+
-- Take all nodes in replication group excluding myself
131131
FOR node IN SELECT * FROM shardman.nodes WHERE replication_group = repl_group LOOP
132132
-- Construct list of synchronous standbyes=subscriptions to this node
133133
sync_standbys :=
@@ -145,7 +145,7 @@ BEGIN
145145

146146
-- Add foreign servers for connection to the new node and backward
147147
-- Construct foreign server options from connection string of new node
148-
SELECT * FROM shardman.conninfo_to_postgres_fdw_opts(conn_string) INTO new_server_opts, new_um_opts;
148+
SELECT * FROM shardman.conninfo_to_postgres_fdw_opts(conn_string_effective) INTO new_server_opts, new_um_opts;
149149
FOR node IN SELECT * FROM shardman.nodes WHERE id<>new_node_id
150150
LOOP
151151
-- Construct foreign server options from connection string of this node
@@ -696,6 +696,65 @@ BEGIN
696696
END
697697
$$ LANGUAGE plpgsql;
698698

699+
-- Count number of partitions at particular node.
700+
-- This command can be executed only at shardlord.
701+
CREATE FUNCTION get_node_partitions_count(node int) returns bigint AS $$
702+
SELECT count(*) from shardman.partitions WHERE node_id=node;
703+
$$ LANGUAGE sql;
704+
705+
-- Rebalance partitions between nodes. This function tries to evenly redistribute partition between all nodes of replication groups.
706+
-- It is not able to move partition between replication groups.
707+
-- This function intentionally move one partition per time to minimize influence on system performance.
708+
CREATE FUNCTION rebalance(table_pattern text = '%') RETURNS void AS $$
709+
DECLARE
710+
dst_node int;
711+
src_node int;
712+
min_count bigint;
713+
max_count bigint;
714+
mv_part_name text;
715+
repl_group text;
716+
done bool;
717+
BEGIN
718+
IF shardman.redirect_to_shardlord(format('rebalance(%L)', table_pattern))
719+
THEN
720+
RETURN;
721+
END IF;
722+
723+
LOOP
724+
done := true;
725+
-- Repeat for all replication groups
726+
FOR repl_group IN SELECT DISTINCT replication_group FROM shardman.nodes
727+
LOOP
728+
-- Select node in this group with minimal number of partitions
729+
SELECT node_id, count(*) n_parts INTO dst_node,min_count
730+
FROM shardman.partitions p JOIN shardman.nodes n ON p.node_id=n.id
731+
WHERE n.replication_group=repl_group AND p.relation LIKE table_pattern
732+
GROUP BY node_id
733+
ORDER BY n_parts ASC LIMIT 1;
734+
-- Select node in this group with maximal number of partitions
735+
SELECT node_id, count(*) n_parts INTO src_node,max_count
736+
FROM shardman.partitions p JOIN shardman.nodes n ON p.node_id=n.id
737+
WHERE n.replication_group=repl_group AND p.relation LIKE table_pattern
738+
GROUP BY node_id
739+
ORDER BY n_parts DESC LIMIT 1;
740+
-- If difference of number of partitions on this nodes is greater than 1, then move random partition
741+
IF max_count - min_count > 1 THEN
742+
SELECT p.part_name INTO mv_part_name
743+
FROM shardman.partitions p
744+
WHERE p.node_id=src_node AND p.relation LIKE table_pattern AND
745+
NOT EXISTS(SELECT * from shardman.replicas r
746+
WHERE r.node_id=dst_node AND r.part_name=p.part_name)
747+
ORDER BY random() LIMIT 1;
748+
PERFORM shardman.mv_partition(mv_part_name, dst_node);
749+
done := false;
750+
END IF;
751+
END LOOP;
752+
753+
EXIT WHEN done;
754+
END LOOP;
755+
END
756+
$$ LANGUAGE plpgsql;
757+
699758
---------------------------------------------------------------------
700759
-- Utility functions
701760
---------------------------------------------------------------------
@@ -723,7 +782,7 @@ CREATE FUNCTION reconstruct_table_attrs(relation regclass)
723782
RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
724783

725784
-- Broadcast SQL commands to nodes and wait their completion.
726-
-- cmds is list of SQL commands separated by semi-columns with node
785+
-- cmds is list of SQL commands terminated by semi-columns with node
727786
-- prefix: node-id:sql-statement;
728787
-- To run multiple statements on node, wrap them in {}:
729788
-- {node-id:statement; statement;}
@@ -732,10 +791,10 @@ RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
732791
-- No escaping is performed, so ';', '{' and '}' inside queries are not supported.
733792
-- By default functions throws error is execution is failed at some of the
734793
-- nodes, with ignore_errors=true errors are ignored and function returns string
735-
-- with "Error:" prefix containing list of errors separated by semicolons with
794+
-- with "Error:" prefix containing list of errors terminated by dots with
736795
-- nodes prefixes.
737796
-- In case of normal completion this function return list with node prefixes
738-
-- separated by semi-columns with single result for select queries or number of
797+
-- separated by columns with single result for select queries or number of
739798
-- affected rows for other commands.
740799
-- If two_phase parameter is true, then each statement is wrapped in blocked and
741800
-- prepared with subsequent commit or rollback of prepared transaction at second
@@ -834,9 +893,9 @@ DECLARE
834893
BEGIN
835894
LOOP
836895
response := shardman.broadcast(format('%s:SELECT confirmed_flush_lsn - pg_current_wal_lsn() FROM pg_replication_slots WHERE slot_name=%L;', src_node_id, slot));
837-
lag := trim(trailing ';' from response)::bigint;
896+
lag := response::bigint;
838897

839-
RAISE NOTICE 'Replication lag %', lag;
898+
RAISE DEBUG 'Replication lag %', lag;
840899
IF locked THEN
841900
IF lag<=0 THEN
842901
RETURN;

pg_shardman.c

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,9 @@ broadcast(PG_FUNCTION_ARGS)
162162
sql += n;
163163
if (node_id != 0)
164164
{
165-
if (super_connstr)
166-
fetch_node_connstr = psprintf(
167-
"select super_connection_string from shardman.nodes where id=%d",
168-
node_id);
169-
else
170-
fetch_node_connstr = psprintf(
171-
"select connection_string from shardman.nodes where id=%d",
172-
node_id);
165+
fetch_node_connstr = psprintf(
166+
"select %sconnection_string from shardman.nodes where id=%d",
167+
super_connstr ? "super_" : "", node_id);
173168
if (SPI_exec(fetch_node_connstr, 0) < 0 || SPI_processed != 1)
174169
{
175170
elog(ERROR, "SHARDMAN: Failed to fetch connection string for node %d",
@@ -181,6 +176,10 @@ broadcast(PG_FUNCTION_ARGS)
181176
}
182177
else
183178
{
179+
if (shardlord_connstring == NULL || *shardlord_connstring == '\0')
180+
{
181+
elog(ERROR, "SHARDMAN: Shardlord connection string was not specified in configuration file");
182+
}
184183
conn_str = shardlord_connstring;
185184
}
186185
if (n_cmds >= n_cons)
@@ -193,7 +192,7 @@ broadcast(PG_FUNCTION_ARGS)
193192
{
194193
if (ignore_errors)
195194
{
196-
errmsg = psprintf("%s%d:Connection failure: %s;",
195+
errmsg = psprintf("%s%d:Connection failure: %s.",
197196
errmsg ? errmsg : "", node_id,
198197
PQerrorMessage(conn[n_cmds - 1]));
199198
continue;
@@ -220,7 +219,7 @@ broadcast(PG_FUNCTION_ARGS)
220219
{
221220
if (ignore_errors)
222221
{
223-
errmsg = psprintf("%s%d:Failed to send query '%s': %s'",
222+
errmsg = psprintf("%s%d:Failed to send query '%s': %s'.",
224223
errmsg ? errmsg : "", node_id, sql,
225224
PQerrorMessage(conn[n_cmds-1]));
226225
continue;
@@ -256,7 +255,7 @@ broadcast(PG_FUNCTION_ARGS)
256255
{
257256
if (ignore_errors)
258257
{
259-
errmsg = psprintf("%s%d:Failed to received response for '%s': %s", errmsg ? errmsg : "", node_id, sql_full, PQerrorMessage(conn[i]));
258+
errmsg = psprintf("%s%d:Failed to received response for '%s': %s.", errmsg ? errmsg : "", node_id, sql_full, PQerrorMessage(conn[i]));
260259
continue;
261260
}
262261
errmsg = psprintf("Failed to receive response for query %s from node %d: %s", sql_full, node_id, PQerrorMessage(conn[i]));
@@ -269,21 +268,25 @@ broadcast(PG_FUNCTION_ARGS)
269268
{
270269
if (ignore_errors)
271270
{
272-
errmsg = psprintf("%s%d:Command %s failed: %s", errmsg ? errmsg : "", node_id, sql_full, PQerrorMessage(conn[i]));
271+
errmsg = psprintf("%s%d:Command %s failed: %s.", errmsg ? errmsg : "", node_id, sql_full, PQerrorMessage(conn[i]));
273272
PQclear(res);
274273
continue;
275274
}
276275
errmsg = psprintf("Command %s failed at node %d: %s", sql_full, node_id, PQerrorMessage(conn[i]));
277276
PQclear(res);
278277
goto cleanup;
279278
}
279+
if (i != 0)
280+
{
281+
appendStringInfoChar(&resp, ',');
282+
}
280283
if (status == PGRES_TUPLES_OK)
281284
{
282285
if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
283286
{
284287
if (ignore_errors)
285288
{
286-
appendStringInfoString(&resp, "?;");
289+
appendStringInfoString(&resp, "?");
287290
elog(WARNING, "SHARDMAN: Query '%s' doesn't return single tuple at node %d", sql_full, node_id);
288291
}
289292
else
@@ -295,12 +298,12 @@ broadcast(PG_FUNCTION_ARGS)
295298
}
296299
else
297300
{
298-
appendStringInfo(&resp, "%s;", PQgetvalue(res, 0, 0));
301+
appendStringInfo(&resp, "%s", PQgetvalue(res, 0, 0));
299302
}
300303
}
301304
else
302305
{
303-
appendStringInfo(&resp, "%d;", PQntuples(res));
306+
appendStringInfo(&resp, "%d", PQntuples(res));
304307
}
305308
PQclear(res);
306309
}

0 commit comments

Comments
 (0)