Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7a7811e

Browse files
committed
Skeleton for move_mpart
1 parent f6d6e71 commit 7a7811e

File tree

8 files changed

+314
-6
lines changed

8 files changed

+314
-6
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ EXTENSION = pg_shardman # the extension name
33
DATA = pg_shardman--0.0.1.sql
44

55
MODULE_big = pg_shardman
6-
OBJS = src/pg_shardman.o src/udf.o src/shard.o
6+
OBJS = src/pg_shardman.o src/udf.o src/shard.o src/timeutils.o
77

88
PG_CPPFLAGS += -Isrc/include
99

pg_shardman--0.0.1.sql

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ DECLARE
111111
fdw_part_name text;
112112
BEGIN
113113
IF NEW.owner != (SELECT shardman.get_node_id()) THEN
114-
raise info 'creating foreign table';
115114
SELECT nodes.connstring FROM shardman.nodes WHERE id = NEW.owner
116115
INTO connstring;
117116
EXECUTE format('DROP SERVER IF EXISTS %I CASCADE;', NEW.part_name);
@@ -208,7 +207,8 @@ CREATE TABLE local_meta (
208207
INSERT INTO @extschema@.local_meta VALUES ('node_id', NULL);
209208

210209
-- available commands
211-
CREATE TYPE cmd AS ENUM ('add_node', 'rm_node', 'create_hash_partitions');
210+
CREATE TYPE cmd AS ENUM ('add_node', 'rm_node', 'create_hash_partitions',
211+
'move_mpart');
212212
-- command status
213213
CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'failed', 'in progress', 'success');
214214

@@ -466,6 +466,21 @@ BEGIN
466466
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, partitions_count);
467467
END
468468
$$ LANGUAGE plpgsql;
469+
470+
-- Queue a 'move_mpart' command: move a master partition to another node.
-- Arguments:
--   part_name  name of the partition to move
--   dest       id of the destination node
-- A row is added to cmd_log and its id is returned; the two arguments are
-- then stored in cmd_opts for that command, partition name first and
-- destination node id second.
CREATE FUNCTION move_mpart(part_name text, dest int) RETURNS int AS $$
DECLARE
	queued_id int;
BEGIN
	INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'move_mpart')
		RETURNING id INTO queued_id;

	INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, queued_id, part_name);
	INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, queued_id, dest);

	RETURN queued_id;
END
$$ LANGUAGE plpgsql;
483+
469484
-- Otherwise partitioned tables on worker nodes will not be dropped properly,
470485
-- see pathman's docs.
471486
ALTER EVENT TRIGGER pathman_ddl_trigger ENABLE ALWAYS;

postgresql.conf.common.template

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
shared_preload_libraries = 'pg_pathman, pg_shardman'
22

3-
log_min_messages = INFO
3+
# just to suppress logging
4+
autovacuum = off
5+
6+
log_min_messages = DEBUG1
47
# client_min_messages = NOTICE
58
client_min_messages = INFO
69

src/include/pg_shardman.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,6 @@ extern uint64 void_spi(char *sql);
4242
extern void update_cmd_status(int64 id, const char *new_status);
4343
extern void cmd_canceled(Cmd *cmd);
4444
extern char *get_worker_node_connstring(int node_id);
45+
extern int32 get_partition_owner(const char *part_name);
4546

4647
#endif /* PG_SHARDMAN_H */

src/include/timeutils.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#ifndef TIMEUTILS_H
2+
#define TIMEUTILS_H
3+
4+
#include <time.h>
5+
6+
extern int timespeccmp(struct timespec t1, struct timespec t2);
7+
extern struct timespec timespec_add_millis(struct timespec t, long millis);
8+
extern int timespec_diff_millis(struct timespec t1, struct timespec t2);
9+
10+
#endif /* TIMEUTILS_H */

src/pg_shardman.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,3 +714,39 @@ get_worker_node_connstring(int node_id)
714714
SPI_EPILOG;
715715
return res;
716716
}
717+
718+
/*
719+
* Get node id on which given partition is stored. -1 is returned if there is
720+
* no such partition.
721+
*/
722+
int32
723+
get_partition_owner(const char *part_name)
724+
{
725+
char *sql;
726+
bool isnull;
727+
int owner;
728+
729+
SPI_PROLOG;
730+
sql = psprintf( /* allocated in SPI ctxt, freed with ctxt release */
731+
"select owner from shardman.partitions where part_name = '%s';",
732+
part_name);
733+
734+
if (SPI_execute(sql, true, 0) < 0)
735+
{
736+
shmn_elog(FATAL, "Stmt failed : %s", sql);
737+
}
738+
739+
if (SPI_processed == 0)
740+
{
741+
owner = -1;
742+
}
743+
else
744+
{
745+
owner = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
746+
SPI_tuptable->tupdesc,
747+
1, &isnull));
748+
}
749+
750+
SPI_EPILOG;
751+
return owner;
752+
}

src/shard.c

Lines changed: 192 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,56 @@
77
*/
88
#include "postgres.h"
#include "libpq-fe.h"
#include "lib/ilist.h"

#include <errno.h>
#include <time.h>
#include <limits.h>
#include <unistd.h>
#include <sys/epoll.h>

#include "shard.h"
#include "timeutils.h"
18+
19+
/* Outcome of a single "move master partition" task */
typedef enum
{
	MOVEMPART_IN_PROGRESS = 0,	/* task is initialized and not finished yet */
	MOVEMPART_FAILED = 1,		/* task can't be (or wasn't) completed */
	MOVEMPART_SUCCESS = 2		/* partition has been moved */
} MoveMPartResult;
25+
26+
/*
 * What a task asks for after one iteration of processing, i.e. when (if
 * ever) it wants to be run again.
 */
typedef enum
{
	/* wait for EPOLLIN on the task's fd_to_epoll, then run me again */
	EXECMOVEMPART_EPOLL = 0,
	/* run me again when the task's waketm is reached */
	EXECMOVEMPART_WAKEMEUP = 1,
	/* the work is done, never invoke me again */
	EXECMOVEMPART_DONE = 2
} ExecMoveMPartRes;
33+
34+
typedef struct
35+
{
36+
const char *part_name; /* partition name */
37+
int32 src_node; /* node we are moving partition from */
38+
int32 dst_node; /* node we are moving partition to */
39+
char *src_connstr;
40+
char *dst_connstr;
41+
struct timespec waketm; /* wake me up at waketm to do the job */
42+
/* We need to epoll only on socket with dst to wait for copy */
43+
/* exec_move_mpart sets fd here when it wants to be wakened by epoll */
44+
int fd_to_epoll;
45+
int fd_in_epoll_set; /* socket *currently* in epoll set. -1 of none */
46+
MoveMPartResult result;
47+
} MoveMPartState;
48+
49+
typedef struct
50+
{
51+
slist_node list_node;
52+
MoveMPartState *mmps;
53+
} MoveMPartStateNode;
54+
55+
static void init_mmp_state(MoveMPartState *mmps, const char *part_name,
56+
int32 dst_node);
57+
static void move_mparts(MoveMPartState *mmpss, int nparts);
58+
static int calc_timeout(slist_head *timeout_states);
59+
static ExecMoveMPartRes exec_move_mpart(MoveMPartState *mmps);
1260

1361
/*
1462
* Steps are:
@@ -124,13 +172,14 @@ create_hash_partitions(Cmd *cmd)
124172
cmd_canceled(cmd);
125173
}
126174

127-
128175
/*
129176
* Move master partition to specified node. We
130177
* - Disable subscription on destination, otherwise we can't drop rep slot on
131178
source.
132179
* - Idempotently create publication and repl slot on source.
133-
* - Idempotently create table and subscription on destination.
180+
* - Idempotently create table and async subscription on destination.
181+
* We use async subscription, because sync would block table while copy is
182+
* in progress. But with async, we have to lock the table after initial sync.
134183
* - Now initial copy has started, remember that at least in RAM to retry
135184
* from this point if network fails.
136185
* - Sleep & check in connection to the dest waiting for completion of the
@@ -145,9 +194,150 @@ create_hash_partitions(Cmd *cmd)
145194
* If we don't save progress (whether initial sync started or done, lsn,
146195
* etc), we have to start everything from the ground if master reboots. This
147196
* is arguably fine.
197+
*
148198
*/
149199
void
150200
move_mpart(Cmd *cmd)
201+
{
202+
char *part_name = cmd->opts[0];
203+
int32 dst_node = atoi(cmd->opts[1]);
204+
205+
MoveMPartState *mmps = palloc(sizeof(MoveMPartState));
206+
init_mmp_state(mmps, part_name, dst_node);
207+
208+
move_mparts(mmps, 1);
209+
update_cmd_status(cmd->id, "success");
210+
}
211+
212+
213+
/*
214+
* Fill MoveMPartState, retrieving needed data. If something goes wrong, we
215+
* don't bother to fill the rest of fields.
216+
*/
217+
void
218+
init_mmp_state(MoveMPartState *mmps, const char *part_name, int32 dst_node)
219+
{
220+
int e;
221+
222+
mmps->part_name = part_name;
223+
if ((mmps->src_node = get_partition_owner(part_name)) == -1)
224+
{
225+
shmn_elog(WARNING, "Partition %s doesn't exist, not moving it",
226+
part_name);
227+
mmps->result = MOVEMPART_FAILED;
228+
return;
229+
}
230+
mmps->dst_node = dst_node;
231+
232+
/* src_connstr is surely not NULL since src_node is referenced by
233+
part_name */
234+
mmps->src_connstr = get_worker_node_connstr(mmps->src_node);
235+
mmps->dst_connstr = get_worker_node_connstr(mmps->dst_node);
236+
if (mmps->dst_connstr == NULL)
237+
{
238+
shmn_elog(WARNING, "Node %d doesn't exist, not moving %s to it",
239+
mmps->dst_node, part_name);
240+
mmps->result = MOVEMPART_FAILED;
241+
return;
242+
}
243+
244+
/* Task is ready to be processed right now */
245+
if ((e = clock_gettime(CLOCK_MONOTONIC, &mmps->waketm)) == -1)
246+
{
247+
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
248+
}
249+
mmps->fd_in_epoll_set = -1;
250+
251+
mmps->result = MOVEMPART_IN_PROGRESS;
252+
}
253+
254+
/*
255+
* Move partitions as specified in move_mpart_states list
256+
*/
257+
void
258+
move_mparts(MoveMPartState *mmpss, int nparts)
259+
{
260+
/* list of sleeping mmp states we need to wake after specified timeout */
261+
slist_head timeout_states = SLIST_STATIC_INIT(timeout_states);
262+
slist_iter iter;
263+
264+
int timeout; /* at least one task will be ready after timeout millis */
265+
int unfinished_moves = 0; /* number of not yet failed or succeeded tasks */
266+
int i;
267+
int e;
268+
int epfd;
269+
270+
for (i = 0; i < nparts; i++)
271+
{
272+
if (mmpss[i].result != MOVEMPART_FAILED)
273+
{
274+
/* In the beginning, all tasks are ready immediately */
275+
MoveMPartStateNode *mmps_node = palloc(sizeof(MoveMPartStateNode));
276+
elog(DEBUG4, "Adding task %s to timeout list", mmpss[i].part_name);
277+
mmps_node->mmps = &mmpss[i];
278+
slist_push_head(&timeout_states, &mmps_node->list_node);
279+
unfinished_moves++;
280+
}
281+
}
282+
283+
if ((epfd = epoll_create1(0)) == -1)
284+
{
285+
shmn_elog(FATAL, "epoll_create1 failed");
286+
}
287+
288+
while (unfinished_moves > 0)
289+
{
290+
timeout = calc_timeout(&timeout_states);
291+
unfinished_moves--;
292+
}
293+
}
294+
295+
/* Calculate when we need to wake if no epoll events are happening */
296+
int
297+
calc_timeout(slist_head *timeout_states)
298+
{
299+
slist_iter iter;
300+
struct timespec curtm;
301+
int e;
302+
int timeout = -1; /* If no tasks wait for us, don't wake */
303+
304+
slist_foreach(iter, timeout_states)
305+
{
306+
MoveMPartStateNode *mmps_node =
307+
slist_container(MoveMPartStateNode, list_node, iter.cur);
308+
MoveMPartState *mmps = mmps_node->mmps;
309+
shmn_elog(DEBUG1, "Peeking into %s task wake time", mmps->part_name);
310+
if ((e = clock_gettime(CLOCK_MONOTONIC, &curtm)) == -1)
311+
{
312+
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
313+
}
314+
if (timespeccmp(curtm, mmps->waketm) >= 0)
315+
{
316+
shmn_elog(DEBUG1, "Task %s is already ready", mmps->part_name);
317+
timeout = 0;
318+
return timeout;
319+
}
320+
else
321+
{
322+
int diff = Max(0, timespec_diff_millis(mmps->waketm, curtm));
323+
if (timeout == -1)
324+
timeout = diff;
325+
else
326+
timeout = Min(timeout, diff);
327+
shmn_elog(DEBUG1, "Timeout set to %d due to task %s ",
328+
timeout, mmps->part_name);
329+
}
330+
}
331+
332+
return timeout;
333+
}
334+
335+
/*
336+
* Actually run MoveMPart state machine. Return value says when (if ever)
337+
* we want to be executed again.
338+
*/
339+
ExecMoveMPartRes
340+
exec_move_mpart(MoveMPartState *mmps)
151341
{
152342

153343
}

src/timeutils.c

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#include "timeutils.h"
2+
3+
#define MILLION 1000000L
4+
#define BILLION 1000000000L
5+
6+
/*
 * Three-way comparison of two timespecs, in the manner of strcmp: the result
 * is -1 if t1 is earlier than t2, 1 if it is later and 0 if they are equal.
 * Seconds are compared first, nanoseconds break the tie.
 */
int
timespeccmp(struct timespec t1, struct timespec t2)
{
	if (t1.tv_sec != t2.tv_sec)
		return (t1.tv_sec < t2.tv_sec) ? -1 : 1;

	if (t1.tv_nsec != t2.tv_nsec)
		return (t1.tv_nsec < t2.tv_nsec) ? -1 : 1;

	return 0;
}
24+
25+
/*
 * Add milliseconds to timespec and return the result. 'millis' may be
 * negative. The returned tv_nsec is always normalized into
 * [0, 1000000000), provided that t.tv_nsec was.
 */
struct timespec
timespec_add_millis(struct timespec t, long millis)
{
	const long	nsec_per_msec = 1000000L;
	const long	nsec_per_sec = 1000000000L;
	time_t		tv_sec;
	long		tv_nsec;

	/* '/' and '%' truncate toward zero, so both parts carry millis' sign */
	tv_sec = t.tv_sec + millis / 1000;
	tv_nsec = t.tv_nsec + (millis % 1000) * nsec_per_msec;

	if (tv_nsec >= nsec_per_sec)
	{
		/* carry into seconds */
		tv_nsec -= nsec_per_sec;
		tv_sec++;
	}
	else if (tv_nsec < 0)
	{
		/* borrow from seconds, happens only when millis is negative */
		tv_nsec += nsec_per_sec;
		tv_sec--;
	}

	return (struct timespec) { .tv_sec = tv_sec, .tv_nsec = tv_nsec };
}
43+
44+
/*
45+
* Get t1 - t2 difference in milliseconds. Not reliable if time_t is unsigned
46+
*/
47+
int
48+
timespec_diff_millis(struct timespec t1, struct timespec t2)
49+
{
50+
int sec_diff = t1.tv_sec - t2.tv_sec;
51+
long nsec_diff = t1.tv_nsec - t2.tv_nsec;
52+
return sec_diff * 1000 + nsec_diff / 1000;
53+
}

0 commit comments

Comments
 (0)