Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 45b8468

Browse files
committed
Added partition num, added func for getting current time.
1 parent cc1f5de commit 45b8468

File tree

4 files changed

+43
-31
lines changed

4 files changed

+43
-31
lines changed

shard.sql

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ CREATE TRIGGER new_table_worker_side AFTER INSERT ON shardman.tables
4545
FOR EACH ROW EXECUTE PROCEDURE new_table_worker_side();
4646
-- fire trigger only on worker nodes
4747
ALTER TABLE shardman.tables ENABLE REPLICA TRIGGER new_table_worker_side;
48-
-- On master side, insert partitions
48+
-- On master side, insert partitions.
49+
-- All of them are primary and have no prev or nxt.
4950
CREATE FUNCTION new_table_master_side() RETURNS TRIGGER AS $$
5051
BEGIN
5152
INSERT INTO shardman.partitions
52-
SELECT part_name, NEW.relation AS relation, NEW.initial_node AS owner
53+
SELECT part_name, 0, NULL, NULL, NEW.relation AS relation, NEW.initial_node AS owner
5354
FROM (SELECT part_name FROM shardman.gen_part_names(
5455
NEW.relation, NEW.partitions_count))
5556
AS partnames;
@@ -59,10 +60,21 @@ $$ LANGUAGE plpgsql;
5960
CREATE TRIGGER new_table_master_side AFTER INSERT ON shardman.tables
6061
FOR EACH ROW EXECUTE PROCEDURE new_table_master_side();
6162

63+
-- Primary shard and its replicas compose a doubly-linked list with 0 shard in
64+
-- the beginning.
6265
CREATE TABLE partitions (
63-
part_name text PRIMARY KEY,
66+
part_name text,
67+
-- Shard number. 0 means primary shard.
68+
num int NOT NULL,
69+
nxt int,
70+
prev int,
6471
relation text NOT NULL REFERENCES tables(relation),
65-
owner int REFERENCES nodes(id) -- node on which partition lies
72+
owner int REFERENCES nodes(id), -- node on which partition lies
73+
PRIMARY KEY (part_name, num),
74+
FOREIGN KEY (part_name, nxt) REFERENCES shardman.partitions(part_name, num),
75+
FOREIGN KEY (part_name, prev) REFERENCES shardman.partitions(part_name, num),
76+
-- primary has no prev, replica must have prev
77+
CONSTRAINT prev_existence CHECK (num = 0 OR prev IS NOT NULL)
6678
);
6779

6880
-- We use _fdw suffix for foreign tables to avoid interleaving with real

src/include/pg_shardman.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ extern uint64 void_spi(char *sql);
4343
extern void update_cmd_status(int64 id, const char *new_status);
4444
extern void cmd_canceled(Cmd *cmd);
4545
extern char *get_worker_node_connstr(int node_id);
46-
extern int32 get_partition_owner(const char *part_name);
46+
extern int32 get_primary_owner(const char *part_name);
4747
extern char *get_partition_relation(const char *part_name);
4848

4949
#endif /* PG_SHARDMAN_H */

src/pg_shardman.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -734,19 +734,19 @@ get_worker_node_connstr(int node_id)
734734
}
735735

736736
/*
737-
* Get node id on which given partition is stored. -1 is returned if there is
738-
* no such partition.
737+
* Get node id on which given primary is stored. -1 is returned if there is
738+
* no such primary.
739739
*/
740740
int32
741-
get_partition_owner(const char *part_name)
741+
get_primary_owner(const char *part_name)
742742
{
743743
char *sql;
744744
bool isnull;
745745
int owner;
746746

747747
SPI_PROLOG;
748748
sql = psprintf( /* allocated in SPI ctxt, freed with ctxt release */
749-
"select owner from shardman.partitions where part_name = '%s';",
749+
"select owner from shardman.partitions where part_name = '%s' and num = 0;",
750750
part_name);
751751

752752
if (SPI_execute(sql, true, 0) < 0)

src/shard.c

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ static void reset_pqconn(PGconn **conn);
223223
static void reset_pqconn_and_res(PGconn **conn, PGresult *res);
224224
static void configure_retry(CopyPartState *cpts, int millis);
225225
static struct timespec timespec_now_plus_millis(int millis);
226+
struct timespec timespec_now(void);
226227

227228
/*
228229
* Steps are:
@@ -377,10 +378,8 @@ move_primary(Cmd *cmd)
377378
void
378379
init_cp_state(CopyPartState *cps, const char *part_name, int32 dst_node)
379380
{
380-
int e;
381-
382381
cps->part_name = part_name;
383-
if ((cps->src_node = get_partition_owner(part_name)) == -1)
382+
if ((cps->src_node = get_primary_owner(part_name)) == -1)
384383
{
385384
shmn_elog(WARNING, "Partition %s doesn't exist, not moving it",
386385
part_name);
@@ -402,10 +401,7 @@ init_cp_state(CopyPartState *cps, const char *part_name, int32 dst_node)
402401
}
403402

404403
/* Task is ready to be processed right now */
405-
if ((e = clock_gettime(CLOCK_MONOTONIC, &cps->waketm)) == -1)
406-
{
407-
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
408-
}
404+
cps->waketm = timespec_now();
409405
cps->fd_to_epoll = -1;
410406
cps->fd_in_epoll_set = -1;
411407

@@ -496,7 +492,6 @@ exec_tasks(CopyPartState **tasks, int ntasks)
496492
slist_head timeout_states = SLIST_STATIC_INIT(timeout_states);
497493
slist_mutable_iter iter;
498494
/* at least one task will require our attention at waketm */
499-
struct timespec waketm;
500495
struct timespec curtm;
501496
int timeout;
502497
int unfinished_moves = 0; /* number of not yet failed or succeeded tasks */
@@ -505,12 +500,10 @@ exec_tasks(CopyPartState **tasks, int ntasks)
505500
int epfd;
506501
struct epoll_event evlist[MAX_EVENTS];
507502

508-
/* In the beginning, all tasks are ready for execution, so wake tm is right
509-
* is actually current time. We also need to put all tasks to the
510-
* timeout_states list to invoke them.
503+
/*
504+
* In the beginning, all tasks are ready for execution, so we need to put
505+
* all tasks to the timeout_states list to invoke them.
511506
*/
512-
if ((e = clock_gettime(CLOCK_MONOTONIC, &waketm)) == -1)
513-
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
514507
for (i = 0; i < ntasks; i++)
515508
{
516509
/* TODO: make sure one part is touched only by one task */
@@ -545,8 +538,7 @@ exec_tasks(CopyPartState **tasks, int ntasks)
545538
CopyPartStateNode *cps_node =
546539
slist_container(CopyPartStateNode, list_node, iter.cur);
547540
CopyPartState *cps = cps_node->cps;
548-
if ((e = clock_gettime(CLOCK_MONOTONIC, &curtm)) == -1)
549-
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
541+
curtm = timespec_now();
550542

551543
if (timespeccmp(cps->waketm, curtm) <= 0)
552544
{
@@ -602,7 +594,6 @@ int
602594
calc_timeout(slist_head *timeout_states)
603595
{
604596
slist_iter iter;
605-
int e;
606597
struct timespec curtm;
607598
int timeout;
608599
/* could use timespec field for this, but that's more readable */
@@ -633,8 +624,7 @@ calc_timeout(slist_head *timeout_states)
633624
if (!waketm_set)
634625
return -1;
635626

636-
if ((e = clock_gettime(CLOCK_MONOTONIC, &curtm)) == -1)
637-
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
627+
curtm = timespec_now();
638628
if (timespeccmp(waketm, curtm) <= 0)
639629
{
640630
shmn_elog(DEBUG1, "Non-negative timeout, waking immediately");
@@ -951,16 +941,26 @@ void configure_retry(CopyPartState *cps, int millis)
951941
}
952942

953943
/*
954-
* Get current time + given milliseconds. Fails with PG elog(FATAL) if gettime
955-
* failed. Not very generic, yes, but exactly what we need.
944+
* Get current CLOCK_MONOTONIC time. Fails with PG elog(FATAL) if gettime
945+
* failed.
956946
*/
957-
struct timespec timespec_now_plus_millis(int millis)
947+
struct timespec timespec_now(void)
958948
{
959-
struct timespec t;
960949
int e;
950+
struct timespec t;
961951

962952
if ((e = clock_gettime(CLOCK_MONOTONIC, &t)) == -1)
963953
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
964954

955+
return t;
956+
}
957+
958+
/*
959+
* Get current time + given milliseconds. Fails with PG elog(FATAL) if gettime
960+
* failed.
961+
*/
962+
struct timespec timespec_now_plus_millis(int millis)
963+
{
964+
struct timespec t = timespec_now();
965965
return timespec_add_millis(t, millis);
966966
}

0 commit comments

Comments
 (0)