
Commit 1ac27b7

Sending cmds to shardlord.
Cmds are deconstructed from an array and sent to the shardlord in a generic way if we are on a worker node. We are gradually moving from keeping local state in local_meta to GUCs, and refusing to support multiple databases.

* Current local_meta can be stale: what if the lord hasn't yet set its lordship in local_meta, but a backend already executes create_hash_partitions? We have to look at the GUC anyway.
* GUCs are bad because they can be modified by users and because they are per-cluster. We can't do much about the first, but we already have shardman_shardlord, which forbids two shardlords in the cluster. It would be hard to keep several bgws and notifies anyway, so we are far from full support of several databases in any case.

Also:
* SPI macros now can be used inside xacts, but we have to precede them with SPI_XACT_STATUS always.
* We now have get_node_connstr instead of get_worker_node_connstr.
* cmd_type and status are now text + constraints instead of enums, to avoid casting.

Author: Dmitry Ivanov <ivadmi5@gmail.com>
1 parent 065ecc7 commit 1ac27b7
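To make the new routing concrete, here is a minimal sketch of what issuing a shardman command on a worker node looks like after this commit (the connection string is a made-up example, and the extension schema is assumed to be the usual 'shardman'):

    -- On a worker node, @extschema@.me_lord() is false, so register_cmd()
    -- forwards the request to the shardlord via execute_on_lord_c() and
    -- returns the resulting cmd_log id.
    SELECT shardman.add_node('host=worker2 port=5432 dbname=postgres');
    -- On the shardlord itself the same call inserts into shardman.cmd_log
    -- and fires NOTIFY shardman_cmd_log_update so the bgw picks up the job.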

8 files changed: +332 additions, −150 deletions

init.sql

Lines changed: 109 additions & 79 deletions

@@ -23,62 +23,51 @@ BEGIN
 END
 $$;

--- available commands
-CREATE TYPE cmd AS ENUM ('add_node', 'rm_node', 'create_hash_partitions',
-                         'move_part', 'create_replica', 'rebalance',
-                         'set_replevel');
--- command status
-CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'failed', 'in progress',
-                                'success', 'done');
-
 CREATE TABLE cmd_log (
     id bigserial PRIMARY KEY,
-    cmd_type cmd NOT NULL,
+    cmd_type TEXT NOT NULL,
     cmd_opts TEXT[],
-    status cmd_status DEFAULT 'waiting' NOT NULL
+    status TEXT DEFAULT 'waiting' NOT NULL,
+
+    -- available commands
+    CONSTRAINT check_cmd_type
+        CHECK (cmd_type IN ('add_node', 'rm_node', 'create_hash_partitions',
+                            'move_part', 'create_replica', 'rebalance',
+                            'set_replevel')),
+
+    -- command status
+    CONSTRAINT check_cmd_status
+        CHECK (status IN ('waiting', 'canceled', 'failed', 'in progress',
+                          'success', 'done'))
 );

--- Notify shardlord bgw about new commands
-CREATE FUNCTION notify_shardlord() RETURNS trigger AS $$
-BEGIN
-    NOTIFY shardman_cmd_log_update;
-    RETURN NULL;
-END
-$$ LANGUAGE plpgsql;
-CREATE TRIGGER cmd_log_inserts
-    AFTER INSERT ON cmd_log
-    FOR EACH STATEMENT EXECUTE PROCEDURE notify_shardlord();

 -- Interface functions

+
 -- Add a node. Its state will be reset, all shardman data lost.
 CREATE FUNCTION add_node(connstring text) RETURNS int AS $$
 DECLARE
-    c_id int;
+    cmd text;
+    opts text[];
 BEGIN
-    INSERT INTO @extschema@.cmd_log
-        VALUES (DEFAULT, 'add_node', ARRAY[connstring])
-        RETURNING id INTO c_id;
+    cmd = 'add_node';
+    opts = ARRAY[connstring::text];

-    RETURN c_id;
+    RETURN @extschema@.register_cmd(cmd, opts);
 END
 $$ LANGUAGE plpgsql;

 -- Remove node. Its state will be reset, all shardman data lost.
-CREATE FUNCTION rm_node(node_id int, force bool default false) RETURNS int AS $$
+CREATE FUNCTION rm_node(node_id int, force bool DEFAULT false) RETURNS int AS $$
 DECLARE
-    c_id int;
+    cmd text;
+    opts text[];
 BEGIN
-    INSERT INTO @extschema@.cmd_log
-        VALUES (DEFAULT,
-                'rm_node',
-                CASE force
-                    WHEN true THEN ARRAY[node_id::text, 'force']
-                    ELSE ARRAY[node_id::text]
-                END)
-        RETURNING id INTO c_id;
-
-    RETURN c_id;
+    cmd = 'rm_node';
+    opts = ARRAY[node_id::text, force::text];
+
+    RETURN @extschema@.register_cmd(cmd, opts);
 END
 $$ LANGUAGE plpgsql;

@@ -89,79 +78,112 @@ CREATE FUNCTION create_hash_partitions(
     rebalance bool DEFAULT true)
 RETURNS int AS $$
 DECLARE
-    c_id int;
+    cmd_id int;
+    cmd text;
+    opts text[];
 BEGIN
-    INSERT INTO @extschema@.cmd_log
-        VALUES (DEFAULT,
-                'create_hash_partitions',
-                ARRAY[node_id::text, relation, expr, partitions_count::text])
-        RETURNING id INTO c_id;
-
-    IF rebalance THEN
-        PERFORM shardman.rebalance(relation);
+    cmd = 'create_hash_partitions';
+    opts = ARRAY[node_id::text,
+                 relation::text,
+                 expr::text,
+                 partitions_count::text,
+                 rebalance::text];
+
+    cmd_id = @extschema@.register_cmd(cmd, opts);
+
+    -- additional steps must check node's type
+    IF @extschema@.me_lord() AND rebalance THEN
+        cmd_id = @extschema@.rebalance(relation);
     END IF;
-    RETURN c_id;
+
+    -- return last command's id
+    RETURN cmd_id;
 END
 $$ LANGUAGE plpgsql;

 -- Move primary or replica partition to another node. Params:
 -- 'part_name' is name of the partition to move
--- 'dest' is id of the destination node
+-- 'dst' is id of the destination node
 -- 'src' is id of the node with partition. If NULL, primary partition is moved.
-CREATE FUNCTION move_part(part_name text, dest int, src int DEFAULT NULL)
+CREATE FUNCTION move_part(part_name text, dst int, src int DEFAULT NULL)
 RETURNS int AS $$
 DECLARE
-    c_id int;
+    cmd text;
+    opts text[];
 BEGIN
-    INSERT INTO @extschema@.cmd_log
-        VALUES (DEFAULT, 'move_part', ARRAY[part_name, src::text, dest::text])
-        RETURNING id INTO c_id;
+    cmd = 'move_part';
+    opts = ARRAY[part_name::text, dst::text, src::text];

-    RETURN c_id;
-END $$ LANGUAGE plpgsql;
+    RETURN @extschema@.register_cmd(cmd, opts);
+END
+$$ LANGUAGE plpgsql;

 -- Create replica partition. Params:
 -- 'part_name' is name of the partition to replicate
--- 'dest' is id of the node on which part will be created
-CREATE FUNCTION create_replica(part_name text, dest int) RETURNS int AS $$
+-- 'dst' is id of the node on which part will be created
+CREATE FUNCTION create_replica(part_name text, dst int) RETURNS int AS $$
 DECLARE
-    c_id int;
+    cmd text;
+    opts text[];
 BEGIN
-    INSERT INTO @extschema@.cmd_log
-        VALUES (DEFAULT, 'create_replica', ARRAY[part_name, dest::text])
-        RETURNING id INTO c_id;
+    cmd = 'create_replica';
+    opts = ARRAY[part_name::text, dst::text];

-    RETURN c_id;
-END $$ LANGUAGE plpgsql;
+    RETURN @extschema@.register_cmd(cmd, opts);
+END
+$$ LANGUAGE plpgsql;

 -- Evenly distribute partitions of table 'relation' across all nodes.
 CREATE FUNCTION rebalance(relation text) RETURNS int AS $$
 DECLARE
-    c_id int;
+    cmd text;
+    opts text[];
 BEGIN
-    INSERT INTO @extschema@.cmd_log
-        VALUES (DEFAULT, 'rebalance', ARRAY[relation])
-        RETURNING id INTO c_id;
+    cmd = 'rebalance';
+    opts = ARRAY[relation::text];

-    RETURN c_id;
-END $$ LANGUAGE plpgsql;
+    RETURN @extschema@.register_cmd(cmd, opts);
+END
+$$ LANGUAGE plpgsql;

 -- Add replicas to partitions of table 'relation' until we reach replevel
 -- replicas for each one. Note that it is pointless to set replevel to more than
 -- number of active workers - 1. Replica deletions is not implemented yet.
 CREATE FUNCTION set_replevel(relation text, replevel int) RETURNS int AS $$
 DECLARE
-    c_id int;
+    cmd text;
+    opts text[];
 BEGIN
-    INSERT INTO @extschema@.cmd_log
-        VALUES (DEFAULT, 'set_replevel', ARRAY[relation, replevel::text])
-        RETURNING id INTO c_id;
+    cmd = 'set_replevel';
+    opts = ARRAY[relation::text, replevel::text];
+
+    RETURN @extschema@.register_cmd(cmd, opts);
+END
+$$ LANGUAGE plpgsql STRICT;

-    RETURN c_id;
-END $$ LANGUAGE plpgsql STRICT;

 -- Internal functions

+
+-- Register command cmd_type for execution on shardlord.
+CREATE FUNCTION register_cmd(cmd_type text, cmd_opts text[]) RETURNS int AS $$
+DECLARE
+    cmd_id int;
+BEGIN
+    IF NOT @extschema@.me_lord() THEN
+        RETURN @extschema@.execute_on_lord_c(cmd_type, cmd_opts);
+    END IF;
+
+    INSERT INTO @extschema@.cmd_log
+        VALUES (DEFAULT, cmd_type, cmd_opts)
+        RETURNING id INTO cmd_id;
+
+    NOTIFY shardman_cmd_log_update; -- Notify bgw about the job
+
+    RETURN cmd_id;
+END
+$$ LANGUAGE plpgsql STRICT;
+
 -- Called on shardlord bgw start. Add itself to nodes table, set id, create
 -- publication.
 CREATE FUNCTION lord_boot() RETURNS void AS $$

@@ -194,7 +216,8 @@ BEGIN
     IF init_lord THEN
         -- TODO: set up lr channels
     END IF;
-END $$ LANGUAGE plpgsql;
+END
+$$ LANGUAGE plpgsql;

 -- These tables will be replicated to worker nodes, notifying them about changes.
 -- Called on lord.

@@ -255,15 +278,17 @@ CREATE FUNCTION terminate_repslot_walsender(slot_name text) RETURNS void AS $$
 BEGIN
     EXECUTE format('SELECT pg_terminate_backend(active_pid) FROM
                     pg_replication_slots WHERE slot_name = %L', slot_name);
-END $$ LANGUAGE plpgsql STRICT;
+END
+$$ LANGUAGE plpgsql STRICT;

 -- Drop with fire repslot and publication with the same name. Useful for 1-to-1
 -- pub-sub.
 CREATE FUNCTION drop_repslot_and_pub(pub name) RETURNS void AS $$
 BEGIN
     EXECUTE format('DROP PUBLICATION IF EXISTS %I', pub);
     PERFORM shardman.drop_repslot(pub, true);
-END $$ LANGUAGE plpgsql STRICT;
+END
+$$ LANGUAGE plpgsql STRICT;

 -- If sub exists, disable it, detach repslot from it and possibly drop. We
 -- manage repslots ourselves, so it is essential to detach rs before dropping

@@ -283,7 +308,8 @@ BEGIN
             EXECUTE format('DROP SUBSCRIPTION %I', subname);
         END IF;
     END IF;
-END $$ LANGUAGE plpgsql STRICT;
+END
+$$ LANGUAGE plpgsql STRICT;

 -- Remove all our logical replication stuff in case of drop extension.
 -- Dropping extension cleanup is not that easy:

@@ -337,3 +363,7 @@ CREATE EVENT TRIGGER cleanup_lr_trigger ON ddl_command_start

 CREATE FUNCTION alter_system_c(opt text, val text) RETURNS void
 AS 'pg_shardman' LANGUAGE C STRICT;
+
+-- Ask shardlord to perform a command if we're but a worker node.
+CREATE FUNCTION execute_on_lord_c(cmd_type text, cmd_opts text[]) RETURNS text
+AS 'pg_shardman' LANGUAGE C STRICT;
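With cmd_log switched from enums to text, invalid values are now rejected by the CHECK constraints rather than by enum input. A rough illustration of what that means for direct inserts (the connstring is fabricated, and the extension schema is assumed to be 'shardman'):

    INSERT INTO shardman.cmd_log VALUES (DEFAULT, 'add_node', ARRAY['port=5433']);
    -- ok: 'add_node' passes check_cmd_type, status defaults to 'waiting'

    INSERT INTO shardman.cmd_log VALUES (DEFAULT, 'drop_node', NULL);
    -- fails: violates check_cmd_type, since 'drop_node' is not a known command

Normal callers never insert here directly; they go through register_cmd(), which also does the NOTIFY.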

membership.sql

Lines changed: 3 additions & 2 deletions

@@ -72,10 +72,11 @@ BEGIN
     UPDATE shardman.local_meta SET v = my_id WHERE k = 'my_id';
 END $$ LANGUAGE plpgsql STRICT;

--- This node is shardlord?
+-- Is this node a shardlord?
 CREATE FUNCTION me_lord() RETURNS bool AS $$
 BEGIN
-    RETURN shardlord FROM shardman.nodes WHERE id = shardman.my_id();
+    -- We'd like to get rid of local_meta in favor of GUCs.
+    RETURN setting::bool FROM pg_settings WHERE name = '@extschema@.shardlord';
 END $$ LANGUAGE plpgsql STRICT;

 -- This node is worker node?
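Since me_lord() now answers from pg_settings, the lordship check no longer depends on local_meta having been populated by the bgw. A minimal sketch, assuming @extschema@ expands to the usual 'shardman' schema:

    -- On the shardlord, postgresql.conf would carry something like:
    --     shardman.shardlord = on
    SELECT shardman.me_lord();  -- true on the lord, false on workers; read from the GUC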

readme.txt

Lines changed: 10 additions & 10 deletions

@@ -70,7 +70,8 @@ shardlord or not.

 Currently extension scheme is fixed, it is, who would have thought, 'shardman'.

-Now you can issue commands to the shardlord. All shardman commands (cmds) you
+Now you can issue commands to any node. If node is a simple worker, it will
+immediately send the command to the shardlord. All shardman commands (cmds) you
 issue return immediately because they technically just submit the cmd to the
 shardlord; he learns about them and starts the actual execution. At any time you
 can cancel currently executing command, just send SIGUSR1 to the shardlord. This

@@ -91,8 +92,7 @@ difference between 'success' and 'done'. We set the latter when the command is
 not atomic itself, but consists of several atomic steps, some of which were
 probably executed successfully and some failed.

-Currently cmd_log can be seen and commands issued only on the shardlord, but
-that's easy to change.
+Currently cmd_log can be seen only on the shardlord, but that's going to change.

 Let's get to the actual commands, which are implemented as functions in
 the extension's schema.

@@ -163,21 +163,21 @@ CREATE TABLE partitions (
     PRIMARY KEY (part_name, owner)
 );

-move_part(part_name text, dest int, src int DEFAULT NULL)
-Move shard 'part_name' from node 'src' to node 'dest'. If src is NULL, primary
-shard is moved. Cmd fails if there is already copy of this shard on 'dest'.
+move_part(part_name text, dst int, src int DEFAULT NULL)
+Move shard 'part_name' from node 'src' to node 'dst'. If src is NULL, primary
+shard is moved. Cmd fails if there is already copy of this shard on 'dst'.

 rebalance(relation text)
 Evenly distribute all partitions including replicas of table 'relation' across
 all nodes. Currently this is pretty dumb function, it just tries to move each
 shard once to node choosen in round-robin manner, completely ignoring current
-distribution. Since dest node can already have replica of this partition, it is
+distribution. Since dst node can already have replica of this partition, it is
 not uncommon to see warnings about failed moves during execution. After
 completion cmd status is 'done', not 'success'.

-create_replica(part_name text, dest int)
-Create replica of shard 'part_name' on node 'dest'. Cmd fails if there is already
-replica of this shard on 'dest'.
+create_replica(part_name text, dst int)
+Create replica of shard 'part_name' on node 'dst'. Cmd fails if there is already
+replica of this shard on 'dst'.

 set_replevel(relation text, replevel int)
 Add replicas to shards of sharded table 'relation' until we reach replevel
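Putting the renamed 'dst' parameters together, a session on any node might look roughly like this (the partition name, table name and node ids are invented for the example):

    SELECT shardman.create_replica('pt_0', 2);  -- replica of shard pt_0 on node 2
    SELECT shardman.move_part('pt_0', 3);       -- move primary copy of pt_0 to node 3; src defaults to NULL
    SELECT shardman.rebalance('pt');            -- spread partitions of table pt across all nodes

Each call returns the id of the registered cmd_log entry, as described above.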

src/copypart.c

Lines changed: 4 additions & 4 deletions

@@ -189,7 +189,7 @@ init_mp_state(MovePartState *mps, const char *part_name, int32 src_node,
         else
         {
             mps->cp.type = COPYPARTTASK_MOVE_REPLICA;
-            mps->prev_connstr = get_worker_node_connstr(mps->prev_node);
+            mps->prev_connstr = get_node_connstr(mps->prev_node, SNT_WORKER);
         }
     }
     mps->cp.dst_node = dst_node;

@@ -206,7 +206,7 @@ init_mp_state(MovePartState *mps, const char *part_name, int32 src_node,
          * This part has replica, so after moving part we have to
          * reconfigure LR channel properly.
          */
-        mps->next_connstr = get_worker_node_connstr(mps->next_node);
+        mps->next_connstr = get_node_connstr(mps->next_node, SNT_WORKER);
     }

     mps->cp.update_metadata_sql = psprintf(

@@ -323,9 +323,9 @@ init_cp_state(CopyPartState *cps)
     cps->fd_to_epoll = -1;
     cps->fd_in_epoll_set = -1;

-    cps->src_connstr = get_worker_node_connstr(cps->src_node);
+    cps->src_connstr = get_node_connstr(cps->src_node, SNT_WORKER);
     Assert(cps->src_connstr != NULL);
-    cps->dst_connstr = get_worker_node_connstr(cps->dst_node);
+    cps->dst_connstr = get_node_connstr(cps->dst_node, SNT_WORKER);
     Assert(cps->dst_connstr != NULL);

     /* constant strings */
