Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7e7064e

Browse files
committed
set_replevel with random node choosing
1 parent 8e307ee commit 7e7064e

File tree

6 files changed

+165
-8
lines changed

6 files changed

+165
-8
lines changed

init.sql

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ $$;
2424

2525
-- available commands
2626
CREATE TYPE cmd AS ENUM ('add_node', 'rm_node', 'create_hash_partitions',
27-
'move_part', 'create_replica', 'rebalance');
27+
'move_part', 'create_replica', 'rebalance',
28+
'set_replevel');
2829
-- command status
2930
CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'failed', 'in progress',
3031
'success', 'done');
@@ -143,6 +144,20 @@ BEGIN
143144
RETURN c_id;
144145
END $$ LANGUAGE plpgsql;
145146

147+
-- Add replicas to partitions of table 'relation' until we reach replevel
148+
-- replicas for each one. Note that it is pointless to set replevel to more than
149+
-- number of active workers - 1. Replica deletions is not implemented yet.
150+
CREATE FUNCTION set_replevel(relation text, replevel int) RETURNS int AS $$
151+
DECLARE
152+
c_id int;
153+
BEGIN
154+
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'set_replevel')
155+
RETURNING id INTO c_id;
156+
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, relation);
157+
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, replevel);
158+
RETURN c_id;
159+
END $$ LANGUAGE plpgsql STRICT;
160+
146161
-- Internal functions
147162

148163
-- Called on shardmaster bgw start. Add itself to nodes table, set id, create

shard.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,18 @@ CREATE FUNCTION reconstruct_table_attrs(relation regclass)
631631
-- Other funcs
632632
------------------------------------------------------------
633633

634+
-- Drop (locally) all partitions of given table, if they exist
635+
CREATE OR REPLACE FUNCTION drop_parts(relation text, partitions_count int)
636+
RETURNS void as $$
637+
DECLARE
638+
r record;
639+
BEGIN
640+
FOR r IN SELECT part_name
641+
FROM shardman.gen_part_names(relation, partitions_count) LOOP
642+
EXECUTE format('DROP TABLE IF EXISTS %I;', r.part_name);
643+
END LOOP;
644+
END $$ LANGUAGE plpgsql STRICT;
645+
634646
-- generate one-column table with partition names as 'tablename'_'partnum''suffix'
635647
CREATE FUNCTION gen_part_names(relation text, partitions_count int,
636648
suffix text DEFAULT '')

src/include/pg_shardman.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ typedef struct Partition
4545
int32 owner;
4646
} Partition;
4747

48+
typedef struct RepCount
49+
{
50+
char *part_name;
51+
int64 count;
52+
} RepCount;
53+
4854
extern void _PG_init(void);
4955
extern void shardmaster_main(Datum main_arg);
5056
extern void check_for_sigterm(void);
@@ -59,5 +65,6 @@ extern int32 get_next_node(const char *part_name, int32 node_id);
5965
extern int32 get_prev_node(const char *part_name, int32 node_id, bool *part_exists);
6066
extern char *get_partition_relation(const char *part_name);
6167
extern Partition *get_parts(const char *relation, uint64 *num_parts);
68+
extern RepCount *get_repcount(const char *relation, uint64 *num_parts);
6269

6370
#endif /* PG_SHARDMAN_H */

src/include/shard.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ extern void create_hash_partitions(Cmd *cmd);
77
extern void move_part(Cmd *cmd);
88
extern void create_replica(Cmd *cmd);
99
extern void rebalance(Cmd *cmd);
10+
extern void set_replevel(Cmd *cmd);
1011

1112
#endif /* SHARD_H */

src/pg_shardman.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ shardmaster_main(Datum main_arg)
208208
create_replica(cmd);
209209
else if (strcmp(cmd->cmd_type, "rebalance") == 0)
210210
rebalance(cmd);
211+
else if (strcmp(cmd->cmd_type, "set_replevel") == 0)
212+
set_replevel(cmd);
211213
else
212214
shmn_elog(FATAL, "Unknown cmd type %s", cmd->cmd_type);
213215
}
@@ -997,3 +999,44 @@ get_parts(const char *relation, uint64 *num_parts)
997999
SPI_EPILOG;
9981000
return parts;
9991001
}
1002+
1003+
/*
1004+
* Calculate how many replicas has each partitions of given relation
1005+
*/
1006+
RepCount *
1007+
get_repcount(const char *relation, uint64 *num_parts)
1008+
{
1009+
char *sql;
1010+
bool isnull;
1011+
RepCount *repcounts;
1012+
TupleDesc rowdesc;
1013+
MemoryContext spicxt;
1014+
MemoryContext oldcxt = CurrentMemoryContext;
1015+
uint64 i;
1016+
1017+
SPI_PROLOG;
1018+
sql = psprintf( /* allocated in SPI ctxt, freed with ctxt release */
1019+
"select part_name, count(case when prv is not null then 1 end) from"
1020+
" shardman.partitions where relation = '%s' group by part_name;",
1021+
relation);
1022+
1023+
if (SPI_execute(sql, true, 0) < 0)
1024+
shmn_elog(FATAL, "Stmt failed : %s", sql);
1025+
rowdesc = SPI_tuptable->tupdesc;
1026+
1027+
*num_parts = SPI_processed;
1028+
/* We need to allocate in our ctxt, not spi's */
1029+
spicxt = MemoryContextSwitchTo(oldcxt);
1030+
repcounts = palloc(sizeof(RepCount) * (*num_parts));
1031+
for (i = 0; i < *num_parts; i++)
1032+
{
1033+
HeapTuple tuple = SPI_tuptable->vals[i];
1034+
repcounts[i].part_name = SPI_getvalue(tuple, rowdesc, 1);
1035+
repcounts[i].count = DatumGetInt64(SPI_getbinval(tuple, rowdesc, 2,
1036+
&isnull));
1037+
}
1038+
MemoryContextSwitchTo(spicxt);
1039+
1040+
SPI_EPILOG;
1041+
return repcounts;
1042+
}

src/shard.c

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*/
88
#include "postgres.h"
99

10+
#include <time.h>
11+
1012
#include "copypart.h"
1113
#include "pg_shardman.h"
1214
#include "shard.h"
@@ -61,8 +63,10 @@ create_hash_partitions(Cmd *cmd)
6163
* we have a deadlock between pathman and pg_dump. pfree'd with ctxt
6264
*/
6365
sql = psprintf(
64-
"begin; select create_hash_partitions('%s', '%s', %d); end;"
66+
"begin; select shardman.drop_parts('%s', '%d');"
67+
" select create_hash_partitions('%s', '%s', %d); end;"
6568
"select shardman.gen_create_table_sql('%s', '%s');",
69+
relation, partitions_count,
6670
relation, expr, partitions_count,
6771
relation, connstr);
6872

@@ -197,10 +201,9 @@ rebalance(Cmd *cmd)
197201
char *relation = cmd->opts[0];
198202
uint64 num_workers;
199203
int32 *workers = get_workers(&num_workers);
204+
uint64 worker_idx;
200205
uint64 num_parts;
201206
Partition *parts = get_parts(relation, &num_parts);
202-
uint64 i;
203-
uint64 worker_idx;
204207
uint64 part_idx;
205208
CopyPartState **tasks = palloc(sizeof(CopyPartState*) * num_parts);
206209

@@ -220,10 +223,6 @@ rebalance(Cmd *cmd)
220223
return;
221224
}
222225

223-
shmn_elog(DEBUG1, "parts of %s are:", relation);
224-
for (i = 0; i < num_parts; i++)
225-
shmn_elog(DEBUG1, "%s on %d", parts[i].part_name, parts[i].owner);
226-
227226
for (part_idx = 0, worker_idx = 0; part_idx < num_parts; part_idx++)
228227
{
229228
Partition part = parts[part_idx];
@@ -243,5 +242,85 @@ rebalance(Cmd *cmd)
243242
return;
244243
}
245244

245+
shmn_elog(INFO, "Relation %s rebalanced:", relation);
246246
update_cmd_status(cmd->id, "done");
247247
}
248+
249+
/*
250+
* Add replicas to parts of given relation until we reach replevel replicas
251+
* for each one. Worker nodes are choosen in random manner.
252+
*/
253+
void
254+
set_replevel(Cmd *cmd)
255+
{
256+
char *relation = cmd->opts[0];
257+
uint32 replevel = atoi(cmd->opts[1]);
258+
uint64 num_workers;
259+
int32 *workers = get_workers(&num_workers);
260+
uint64 num_parts;
261+
RepCount *repcounts = NULL;
262+
uint64 part_idx;
263+
CopyPartState **tasks = NULL;
264+
int ntasks;
265+
int i;
266+
267+
if (num_workers == 0)
268+
{
269+
elog(WARNING, "Set replevel on table %s failed: no active workers",
270+
relation);
271+
update_cmd_status(cmd->id, "failed");
272+
return;
273+
}
274+
275+
if (replevel > num_workers - 1)
276+
{
277+
elog(WARNING, "Set replevel on table %s: using replevel %ld instead of"
278+
"%d as we have only %ld active workers",
279+
relation, num_workers - 1, replevel, num_workers);
280+
replevel = num_workers - 1;
281+
}
282+
283+
srand(time(NULL));
284+
/*
285+
* We can add only one replica per part in one exec_tasks; loop until
286+
* required number of replicas are reached.
287+
*/
288+
while (1948)
289+
{
290+
repcounts = get_repcount(relation, &num_parts);
291+
tasks = palloc(sizeof(CopyPartState*) * num_parts);
292+
ntasks = 0;
293+
294+
for (part_idx = 0; part_idx < num_parts; part_idx++)
295+
{
296+
RepCount rc = repcounts[part_idx];
297+
if (rc.count < replevel)
298+
{
299+
CreateReplicaState *crs = palloc0(sizeof(CreateReplicaState));
300+
init_cr_state(crs, rc.part_name, workers[rand() % num_workers]);
301+
tasks[ntasks] = (CopyPartState *) crs;
302+
ntasks++;
303+
}
304+
}
305+
306+
if (ntasks == 0)
307+
break;
308+
309+
exec_tasks(tasks, ntasks);
310+
check_for_sigterm();
311+
if (got_sigusr1)
312+
{
313+
cmd_canceled(cmd);
314+
return;
315+
}
316+
317+
pfree(repcounts);
318+
for (i = 0; i < ntasks; i++)
319+
pfree(tasks[i]);
320+
pfree(tasks);
321+
}
322+
323+
shmn_elog(INFO, "Relation %s now has at least %d replicas", relation,
324+
replevel);
325+
update_cmd_status(cmd->id, "success");
326+
}

0 commit comments

Comments
 (0)