Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7fa224b

Browse files
committed
replace table cmd_opts with a column in cmd_log
1 parent 1918e1e commit 7fa224b

File tree

2 files changed

+78
-64
lines changed

2 files changed

+78
-64
lines changed

init.sql

Lines changed: 36 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'failed', 'in progress',
3434
CREATE TABLE cmd_log (
3535
id bigserial PRIMARY KEY,
3636
cmd_type cmd NOT NULL,
37+
cmd_opts TEXT[],
3738
status cmd_status DEFAULT 'waiting' NOT NULL
3839
);
3940

@@ -48,25 +49,17 @@ CREATE TRIGGER cmd_log_inserts
4849
AFTER INSERT ON cmd_log
4950
FOR EACH STATEMENT EXECUTE PROCEDURE notify_shardlord();
5051

51-
-- probably better to keep opts in an array field, but working with arrays from
52-
-- libpq is not very handy
53-
-- opts must be inserted sequentially, we order by them by id
54-
CREATE TABLE cmd_opts (
55-
id bigserial PRIMARY KEY,
56-
cmd_id bigint REFERENCES cmd_log(id),
57-
opt text
58-
);
59-
6052
-- Interface functions
6153

6254
-- Add a node. Its state will be reset, all shardman data lost.
6355
CREATE FUNCTION add_node(connstring text) RETURNS int AS $$
6456
DECLARE
6557
c_id int;
6658
BEGIN
67-
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'add_node')
68-
RETURNING id INTO c_id;
69-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, connstring);
59+
INSERT INTO @extschema@.cmd_log
60+
VALUES (DEFAULT, 'add_node', ARRAY[connstring])
61+
RETURNING id INTO c_id;
62+
7063
RETURN c_id;
7164
END
7265
$$ LANGUAGE plpgsql;
@@ -76,12 +69,15 @@ CREATE FUNCTION rm_node(node_id int, force bool default false) RETURNS int AS $$
7669
DECLARE
7770
c_id int;
7871
BEGIN
79-
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'rm_node')
80-
RETURNING id INTO c_id;
81-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, node_id);
82-
IF force THEN
83-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, 'force');
84-
END IF;
72+
INSERT INTO @extschema@.cmd_log
73+
VALUES (DEFAULT,
74+
'rm_node',
75+
CASE force
76+
WHEN true THEN ARRAY[node_id::text, 'force']
77+
ELSE ARRAY[node_id::text]
78+
END)
79+
RETURNING id INTO c_id;
80+
8581
RETURN c_id;
8682
END
8783
$$ LANGUAGE plpgsql;
@@ -95,12 +91,12 @@ CREATE FUNCTION create_hash_partitions(
9591
DECLARE
9692
c_id int;
9793
BEGIN
98-
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'create_hash_partitions')
99-
RETURNING id INTO c_id;
100-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, node_id);
101-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, relation);
102-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, expr);
103-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, partitions_count);
94+
INSERT INTO @extschema@.cmd_log
95+
VALUES (DEFAULT,
96+
'create_hash_partitions',
97+
ARRAY[node_id::text, relation, expr, partitions_count::text])
98+
RETURNING id INTO c_id;
99+
104100
IF rebalance THEN
105101
PERFORM shardman.rebalance(relation);
106102
END IF;
@@ -117,11 +113,10 @@ CREATE FUNCTION move_part(part_name text, dest int, src int DEFAULT NULL)
117113
DECLARE
118114
c_id int;
119115
BEGIN
120-
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'move_part')
121-
RETURNING id INTO c_id;
122-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, part_name);
123-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, src);
124-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, dest);
116+
INSERT INTO @extschema@.cmd_log
117+
VALUES (DEFAULT, 'move_part', ARRAY[part_name, src::text, dest::text])
118+
RETURNING id INTO c_id;
119+
125120
RETURN c_id;
126121
END $$ LANGUAGE plpgsql;
127122

@@ -132,10 +127,10 @@ CREATE FUNCTION create_replica(part_name text, dest int) RETURNS int AS $$
132127
DECLARE
133128
c_id int;
134129
BEGIN
135-
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'create_replica')
136-
RETURNING id INTO c_id;
137-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, part_name);
138-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, dest);
130+
INSERT INTO @extschema@.cmd_log
131+
VALUES (DEFAULT, 'create_replica', ARRAY[part_name, dest::text])
132+
RETURNING id INTO c_id;
133+
139134
RETURN c_id;
140135
END $$ LANGUAGE plpgsql;
141136

@@ -144,9 +139,10 @@ CREATE FUNCTION rebalance(relation text) RETURNS int AS $$
144139
DECLARE
145140
c_id int;
146141
BEGIN
147-
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'rebalance')
148-
RETURNING id INTO c_id;
149-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, relation);
142+
INSERT INTO @extschema@.cmd_log
143+
VALUES (DEFAULT, 'rebalance', ARRAY[relation])
144+
RETURNING id INTO c_id;
145+
150146
RETURN c_id;
151147
END $$ LANGUAGE plpgsql;
152148

@@ -157,10 +153,10 @@ CREATE FUNCTION set_replevel(relation text, replevel int) RETURNS int AS $$
157153
DECLARE
158154
c_id int;
159155
BEGIN
160-
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'set_replevel')
161-
RETURNING id INTO c_id;
162-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, relation);
163-
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, replevel);
156+
INSERT INTO @extschema@.cmd_log
157+
VALUES (DEFAULT, 'set_replevel', ARRAY[relation, replevel::text])
158+
RETURNING id INTO c_id;
159+
164160
RETURN c_id;
165161
END $$ LANGUAGE plpgsql STRICT;
166162

src/pg_shardman.c

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -367,56 +367,74 @@ wait_notify()
367367
Cmd *
368368
next_cmd(void)
369369
{
370-
const char *sql;
371-
Cmd *cmd = NULL;
372-
MemoryContext oldcxt = CurrentMemoryContext;
373-
int e;
370+
const char *sql;
371+
Cmd *cmd = NULL;
372+
MemoryContext oldcxt = CurrentMemoryContext,
373+
spicxt;
374+
int e;
374375

375376
SPI_PROLOG;
376377

377-
sql = "select * from shardman.cmd_log t1 join"
378-
" (select MIN(id) id from shardman.cmd_log where status = 'waiting' OR"
379-
" status = 'in progress') t2 using (id);";
378+
/* Get the oldest pending task (id + cmd_type) */
379+
sql = "select id, cmd_type from shardman.cmd_log"
380+
" where status in ('waiting', 'in progress')"
381+
" order by id asc"
382+
" limit 1;";
380383
e = SPI_execute(sql, true, 0);
381384
if (e < 0)
382385
shmn_elog(FATAL, "Stmt failed: %s", sql);
383386

384387
if (SPI_processed > 0)
385388
{
386-
HeapTuple tuple = SPI_tuptable->vals[0];
387-
TupleDesc rowdesc = SPI_tuptable->tupdesc;
388-
bool isnull;
389-
uint64 i;
389+
TupleDesc rowdesc;
390+
HeapTuple tuple;
391+
bool isnull;
392+
int cmd_id_attr,
393+
cmd_type_attr,
394+
opt_attr;
395+
uint64 i;
396+
397+
/* get tuple and its descriptor */
398+
rowdesc = SPI_tuptable->tupdesc;
399+
tuple = SPI_tuptable->vals[0];
400+
401+
/* get attribute numbers */
402+
cmd_id_attr = SPI_fnumber(rowdesc, "id");
403+
Assert(cmd_id_attr != SPI_ERROR_NOATTRIBUTE);
404+
cmd_type_attr = SPI_fnumber(rowdesc, "cmd_type");
405+
Assert(cmd_type_attr != SPI_ERROR_NOATTRIBUTE);
390406

391407
/* copy the command itself to callee context */
392-
MemoryContext spicxt = MemoryContextSwitchTo(oldcxt);
393-
cmd = palloc(sizeof(Cmd));
408+
spicxt = MemoryContextSwitchTo(oldcxt);
409+
cmd = palloc0(sizeof(Cmd));
394410
cmd->id = DatumGetInt64(SPI_getbinval(tuple, rowdesc,
395-
SPI_fnumber(rowdesc, "id"),
411+
cmd_id_attr,
396412
&isnull));
397-
cmd->cmd_type = SPI_getvalue(tuple, rowdesc,
398-
SPI_fnumber(rowdesc, "cmd_type"));
413+
cmd->cmd_type = SPI_getvalue(tuple, rowdesc, cmd_type_attr);
399414
MemoryContextSwitchTo(spicxt);
400415

401416
/* Now get options. sql will be freed by SPI_finish */
402-
sql = psprintf("select opt from shardman.cmd_opts where"
403-
" cmd_id = %ld order by id;", cmd->id);
417+
sql = psprintf("select unnest(cmd_opts) opt from shardman.cmd_log"
418+
" where id = " INT64_FORMAT, cmd->id);
404419
e = SPI_execute(sql, true, 0);
405420
if (e < 0)
406421
shmn_elog(FATAL, "Stmt failed: %s", sql);
407422

408423
MemoryContextSwitchTo(oldcxt);
409424
/* +1 for NULL in the end */
410-
cmd->opts = palloc((SPI_processed + 1) * sizeof(char*));
425+
cmd->opts = palloc0((SPI_processed + 1) * sizeof(char *));
411426
for (i = 0; i < SPI_processed; i++)
412427
{
413-
tuple = SPI_tuptable->vals[i];
428+
/* get tuple and its descriptor */
414429
rowdesc = SPI_tuptable->tupdesc;
415-
cmd->opts[i] = SPI_getvalue(tuple, rowdesc,
416-
SPI_fnumber(rowdesc, "opt"));
417-
}
418-
cmd->opts[i] = NULL;
430+
tuple = SPI_tuptable->vals[i];
419431

432+
/* get attribute numbers */
433+
opt_attr = SPI_fnumber(rowdesc, "opt");
434+
Assert(opt_attr != SPI_ERROR_NOATTRIBUTE);
435+
436+
cmd->opts[i] = SPI_getvalue(tuple, rowdesc, opt_attr);
437+
}
420438
MemoryContextSwitchTo(spicxt);
421439
}
422440

0 commit comments

Comments
 (0)