Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit aa9e56e

Browse files
committed
create_hash_partitions moved to new file.
Also argument of wait_notify removed since it is global var anyway.
1 parent bb4f8c2 commit aa9e56e

File tree

5 files changed

+203
-140
lines changed

5 files changed

+203
-140
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ EXTENSION = pg_shardman # the extension name
33
DATA = pg_shardman--0.0.1.sql
44

55
MODULE_big = pg_shardman
6-
OBJS = src/pg_shardman.o src/udf.o
6+
OBJS = src/pg_shardman.o src/udf.o src/shard.o
77

88
PG_CPPFLAGS += -Isrc/include
99

src/include/pg_shardman.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef PG_SHARDMAN_H
22
#define PG_SHARDMAN_H
33

4+
#include <signal.h>
5+
46
#define shmn_elog(level,fmt,...) elog(level, "[SHARDMAN] " fmt, ## __VA_ARGS__)
57

68
#define SPI_PROLOG do { \
@@ -15,7 +17,30 @@
1517
CommitTransactionCommand(); \
1618
} while (0);
1719

20+
/* flags set by signal handlers */
21+
extern volatile sig_atomic_t got_sigterm;
22+
extern volatile sig_atomic_t got_sigusr1;
23+
24+
/* GUC variables */
25+
extern bool shardman_master;
26+
extern char *shardman_master_dbname;
27+
extern char *shardman_master_connstring;
28+
extern int shardman_cmd_retry_naptime;
29+
30+
typedef struct Cmd
31+
{
32+
int64 id;
33+
char *cmd_type;
34+
char *status;
35+
char **opts; /* array of n options, opts[n] is NULL */
36+
} Cmd;
37+
1838
extern void _PG_init(void);
1939
extern void shardmaster_main(Datum main_arg);
40+
extern void check_for_sigterm(void);
41+
extern uint64 void_spi(char *sql);
42+
extern void update_cmd_status(int64 id, const char *new_status);
43+
extern void cmd_canceled(Cmd *cmd);
44+
extern char *get_worker_node_connstring(int node_id);
2045

2146
#endif /* PG_SHARDMAN_H */

src/include/shard.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#ifndef SHARD_H
2+
#define SHARD_H
3+
4+
#include "pg_shardman.h"
5+
6+
extern void create_hash_partitions(Cmd *cmd);
7+
extern void move_mpart(Cmd *cmd);
8+
9+
#endif /* SHARD_H */

src/pg_shardman.c

Lines changed: 15 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
/* -------------------------------------------------------------------------
22
*
33
* shardmaster.c
4-
* Background worker executing sharding tasks.
4+
* Background worker accepting sharding tasks for execution and common
5+
* routines.
56
*
67
* -------------------------------------------------------------------------
78
*/
@@ -27,48 +28,34 @@
2728
#include "libpq-fe.h"
2829

2930
#include "pg_shardman.h"
31+
#include "shard.h"
3032

3133

3234
/* ensure that extension won't load against incompatible version of Postgres */
3335
PG_MODULE_MAGIC;
3436

35-
typedef struct Cmd
36-
{
37-
int64 id;
38-
char *cmd_type;
39-
char *status;
40-
char **opts; /* array of n options, opts[n] is NULL */
41-
} Cmd;
42-
4337
static Cmd *next_cmd(void);
44-
static void update_cmd_status(int64 id, const char *new_status);
4538
static PGconn *listen_cmd_log_inserts(void);
46-
static uint64 void_spi(char *sql);
47-
static void wait_notify(PGconn *conn);
39+
static void wait_notify(void);
4840
static void shardmaster_sigterm(SIGNAL_ARGS);
4941
static void shardmaster_sigusr1(SIGNAL_ARGS);
50-
static void check_for_sigterm(void);
5142
static void pg_shardman_installed_local(void);
52-
static void cmd_canceled(Cmd *cmd);
53-
static char *get_worker_node_connstring(int node_id);
5443

5544
static void add_node(Cmd *cmd);
5645
static int insert_node(const char *connstring, int64 cmd_id);
5746
static bool node_in_cluster(int id);
5847

5948
static void rm_node(Cmd *cmd);
6049

61-
static void create_hash_partitions(Cmd *cmd);
62-
6350
/* flags set by signal handlers */
64-
static volatile sig_atomic_t got_sigterm = false;
65-
static volatile sig_atomic_t got_sigusr1 = false;
51+
volatile sig_atomic_t got_sigterm = false;
52+
volatile sig_atomic_t got_sigusr1 = false;
6653

6754
/* GUC variables */
68-
static bool shardman_master = false;
69-
static char *shardman_master_dbname = "postgres";
70-
static char *shardman_master_connstring = "";
71-
static int shardman_cmd_retry_naptime = 10000;
55+
bool shardman_master;
56+
char *shardman_master_dbname;
57+
char *shardman_master_connstring;
58+
int shardman_cmd_retry_naptime;
7259

7360
/* Just global vars. */
7461
/* Connection to local server for LISTEN notifications. Is is global for easy
@@ -168,8 +155,8 @@ shardmaster_main(Datum main_arg)
168155
/* Establish signal handlers before unblocking signals. */
169156
pqsignal(SIGTERM, shardmaster_sigterm);
170157
pqsignal(SIGUSR1, shardmaster_sigusr1);
171-
/* We're now ready to receive signals */
172-
BackgroundWorkerUnblockSignals();
158+
/* We're now ready to receive signals */
159+
BackgroundWorkerUnblockSignals();
173160

174161
void_spi("select shardman.master_boot();");
175162
conn = listen_cmd_log_inserts();
@@ -194,7 +181,7 @@ shardmaster_main(Datum main_arg)
194181
else
195182
shmn_elog(FATAL, "Unknown cmd type %s", cmd->cmd_type);
196183
}
197-
wait_notify(conn);
184+
wait_notify();
198185
check_for_sigterm();
199186
}
200187

@@ -250,7 +237,7 @@ listen_cmd_log_inserts(void)
250237
* no notifcations, we also return.
251238
*/
252239
void
253-
wait_notify(PGconn *conn)
240+
wait_notify()
254241
{
255242
int sock;
256243
fd_set input_mask;
@@ -270,6 +257,7 @@ wait_notify(PGconn *conn)
270257
shmn_elog(FATAL, "select() failed: %s", strerror(errno));
271258
}
272259

260+
/* TODO: what if connection broke? */
273261
PQconsumeInput(conn);
274262
/* eat all notifications at once */
275263
while ((notify = PQnotifies(conn)) != NULL)
@@ -688,118 +676,6 @@ rm_node(Cmd *cmd)
688676
}
689677

690678

691-
/*
692-
* Steps are:
693-
* - Ensure table is not partitioned already;
694-
* - Partition table and get sql to create it;
695-
* - Add records about new table and partitions;
696-
*/
697-
static void create_hash_partitions(Cmd *cmd)
698-
{
699-
int node_id = atoi(cmd->opts[0]);
700-
const char *relation = cmd->opts[1];
701-
const char *expr = cmd->opts[2];
702-
int partitions_count = atoi(cmd->opts[3]);
703-
char *connstring;
704-
PGconn *conn = NULL;
705-
PGresult *res = NULL;
706-
char *sql;
707-
uint64 table_exists;
708-
char *create_table_sql;
709-
710-
shmn_elog(INFO, "Sharding table %s on node %d", relation, node_id);
711-
712-
/* Check that table with such name is not already sharded */
713-
sql = psprintf(
714-
"select relation from shardman.tables where relation = '%s'",
715-
relation);
716-
table_exists = void_spi(sql);
717-
if (table_exists)
718-
{
719-
shmn_elog(WARNING, "table %s already sharded, won't partition it.",
720-
relation);
721-
update_cmd_status(cmd->id, "failed");
722-
return;
723-
}
724-
/* connstring mem freed with ctxt */
725-
if ((connstring = get_worker_node_connstring(node_id)) == NULL)
726-
{
727-
shmn_elog(WARNING, "create_hash_partitions failed, no such worker node: %d",
728-
node_id);
729-
update_cmd_status(cmd->id, "failed");
730-
return;
731-
}
732-
733-
/* Note that we have to run statements in separate transactions, otherwise
734-
* we have a deadlock between pathman and pg_dump */
735-
sql = psprintf(
736-
"begin; select create_hash_partitions('%s', '%s', %d); end;"
737-
"select shardman.gen_create_table_sql('%s', '%s');",
738-
relation, expr, partitions_count,
739-
relation, connstring);
740-
741-
/* Try to execute command indefinitely until it succeeded or canceled */
742-
while (!got_sigusr1 && !got_sigterm)
743-
{
744-
conn = PQconnectdb(connstring);
745-
if (PQstatus(conn) != CONNECTION_OK)
746-
{
747-
shmn_elog(NOTICE, "Connection to node failed: %s",
748-
PQerrorMessage(conn));
749-
goto attempt_failed;
750-
}
751-
752-
/* Partition table and get sql to create it */
753-
res = PQexec(conn, sql);
754-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
755-
{
756-
shmn_elog(NOTICE, "Failed to partition table and get sql to create it: %s",
757-
PQerrorMessage(conn));
758-
goto attempt_failed;
759-
}
760-
create_table_sql = PQgetvalue(res, 0, 0);
761-
762-
/* TODO: if master fails at this moment (which is extremely unlikely
763-
* though), after restart it will try to partition table again and
764-
* fail. We should check if the table is already partitioned and don't
765-
* do that again, except for, probably, the case when it was
766-
* partitioned by someone else.
767-
*/
768-
/*
769-
* Insert table to 'tables' table (no pun intended), insert partitions
770-
* and mark partitioning cmd as successfull
771-
*/
772-
sql = psprintf("insert into shardman.tables values"
773-
" ('%s', '%s', %d, $create_table$%s$create_table$, %d);"
774-
" update shardman.cmd_log set status = 'success'"
775-
" where id = %ld;",
776-
relation, expr, partitions_count, create_table_sql,
777-
node_id, cmd->id);
778-
void_spi(sql);
779-
pfree(sql);
780-
781-
PQclear(res); /* can't free any earlier, it stores sql */
782-
PQfinish(conn);
783-
784-
/* done */
785-
elog(INFO, "Table %s successfully partitioned", relation);
786-
return;
787-
788-
attempt_failed: /* clean resources, sleep, check sigusr1 and try again */
789-
if (res != NULL)
790-
PQclear(res);
791-
if (conn != NULL)
792-
PQfinish(conn);
793-
794-
shmn_elog(LOG, "Attempt to execute create_hash_partitions failed,"
795-
" sleeping and retrying");
796-
/* TODO: sleep using waitlatch? */
797-
pg_usleep(shardman_cmd_retry_naptime * 1000L);
798-
}
799-
check_for_sigterm();
800-
801-
cmd_canceled(cmd);
802-
}
803679

804680
/*
805681
* Get connstring of worker node with id node_id. Memory is palloc'ed.

0 commit comments

Comments
 (0)