Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1381ad6

Browse files
committed
Function for generating CREATE TABLE for existing table.
Also create_hash_partitions in progress.
1 parent 605a856 commit 1381ad6

File tree

7 files changed

+154
-56
lines changed

7 files changed

+154
-56
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ EXTENSION = pg_shardman # the extension name
33
DATA = pg_shardman--0.0.1.sql
44

55
MODULE_big = pg_shardman
6-
OBJS = src/pg_shardman.o src/pg_shardman_cleanup.o
6+
OBJS = src/pg_shardman.o src/udf.o
77

88
PG_CPPFLAGS += -Isrc/include
99

pg_shardman--0.0.1.sql

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ CREATE TABLE local_meta (
4949
INSERT INTO @extschema@.local_meta VALUES ('node_id', NULL);
5050

5151
-- available commands
52-
CREATE TYPE cmd AS ENUM ('add_node', 'rm_node');
52+
CREATE TYPE cmd AS ENUM ('add_node', 'rm_node', 'create_hash_partitions');
5353
-- command status
5454
CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'failed', 'in progress', 'success');
5555

@@ -213,6 +213,8 @@ CREATE FUNCTION set_node_id(node_id int) RETURNS void AS $$
213213
UPDATE @extschema@.local_meta SET v = node_id WHERE k = 'node_id';
214214
$$ LANGUAGE sql;
215215

216+
CREATE FUNCTION gen_create_table_sql(relation text) RETURNS text
217+
AS 'pg_shardman' LANGUAGE C;
216218

217219
-- Interface functions
218220

@@ -237,3 +239,20 @@ BEGIN
237239
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, node_id);
238240
END
239241
$$ LANGUAGE plpgsql;
242+
243+
-- Shard table with hash partitions. Params as in pathman, except for relation
244+
-- (master doesn't know oid of the table)
245+
CREATE FUNCTION create_hash_partitions(
246+
node_id int, expr text, relation text, partitions_count int)
247+
RETURNS void AS $$
248+
DECLARE
249+
c_id int;
250+
BEGIN
251+
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'create_hash_partitions')
252+
RETURNING id INTO c_id;
253+
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, node_id);
254+
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, expr);
255+
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, relation);
256+
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, partitions_count);
257+
END
258+
$$ LANGUAGE plpgsql;

postgresql.conf.master.template

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
log_min_messages = INFO
2-
client_min_messages = LOG
2+
client_min_messages = NOTICE
33

44
# for logical repl
55
wal_level = logical

postgresql.conf.worker.template

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
shared_preload_libraries = 'pg_pathman, pg_shardman'
22

33
log_min_messages = INFO
4-
client_min_messages = LOG
4+
client_min_messages = NOTICE
55

66
wal_level = logical
77

src/pg_shardman.c

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ static int insert_node(const char *connstring, int64 cmd_id);
5555
static bool node_in_cluster(int id);
5656

5757
static void rm_node(Cmd *cmd);
58-
static bool is_node_active(int node_id);
58+
59+
static void create_hash_partitions(Cmd *cmd);
5960

6061
/* flags set by signal handlers */
6162
static volatile sig_atomic_t got_sigterm = false;
@@ -444,7 +445,7 @@ add_node(Cmd *cmd)
444445
int node_id;
445446
char *sql;
446447

447-
shmn_elog(LOG, "Adding node %s", connstring);
448+
shmn_elog(INFO, "Adding node %s", connstring);
448449
/* Try to execute command indefinitely until it succeeded or canceled */
449450
while (!got_sigusr1 && !got_sigterm)
450451
{
@@ -638,6 +639,7 @@ rm_node(Cmd *cmd)
638639
int node_id = atoi(cmd->opts[0]);
639640
char *sql;
640641

642+
elog(INFO, "Removing node %d ", node_id);
641643
if (!node_in_cluster(node_id))
642644
{
643645
shmn_elog(WARNING, "node %d not in cluster, won't rm it.", node_id);
@@ -672,3 +674,21 @@ rm_node(Cmd *cmd)
672674
pfree(sql);
673675
elog(INFO, "Node %d successfully removed", node_id);
674676
}
677+
678+
679+
/*
680+
* Partition table and get sql to create it;
681+
* Add records about new table and partitions
682+
*/
683+
static void create_hash_partitions(Cmd *cmd)
684+
{
685+
int node_id = atoi(cmd->opts[0]);
686+
const char *expr = cmd->opts[1];
687+
const char *relation = cmd->opts[2];
688+
const char *partitions_count = atoi(cmd->opts[3]);
689+
PGconn *conn = NULL;
690+
PGresult *res = NULL;
691+
char *sql;
692+
693+
shmn_elog(INFO, "Sharding table %s on node %d", relation, node_id);
694+
}

src/pg_shardman_cleanup.c

Lines changed: 0 additions & 50 deletions
This file was deleted.

src/udf.c

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#include "postgres.h"
2+
#include "commands/event_trigger.h"
3+
#include "executor/spi.h"
4+
#include "utils/builtins.h"
5+
#include "commands/dbcommands.h"
6+
#include "miscadmin.h"
7+
8+
/*
9+
* Must be called iff we are dropping extension. Checks that we are dropping
10+
* pg_shardman extension and calls pg_shardman_cleanup to perform the actual
11+
* cleanup.
12+
*/
13+
PG_FUNCTION_INFO_V1(pg_shardman_cleanup_c);
14+
Datum
15+
pg_shardman_cleanup_c(PG_FUNCTION_ARGS)
16+
{
17+
EventTriggerData *trigdata;
18+
DropStmt *stmt;
19+
Value *value;
20+
char *ext_name;
21+
22+
if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) /* internal error */
23+
elog(ERROR, "not fired by event trigger manager");
24+
25+
trigdata = (EventTriggerData *) fcinfo->context;
26+
Assert(trigdata->parsetree->type == T_DropStmt);
27+
stmt = (DropStmt *) trigdata->parsetree;
28+
Assert(stmt->removeType == OBJECT_EXTENSION);
29+
/* So it is list of pointers to Nodes */
30+
Assert(stmt->objects->type == T_List);
31+
/* To Value nodes, actually */
32+
Assert(((Node *) (linitial(stmt->objects)))->type == T_String);
33+
/* Seems like no way to use smth like linitial_node, because Value node
34+
* can have different types */
35+
36+
value = (Value *) linitial(stmt->objects);
37+
ext_name = strVal(value);
38+
if (strcmp(ext_name, "pg_shardman") == 0)
39+
{
40+
/* So we are dropping pg_shardman */
41+
const char *cmd_sql = "select shardman.pg_shardman_cleanup();";
42+
43+
SPI_connect();
44+
if (SPI_execute(cmd_sql, true, 0) < 0)
45+
elog(FATAL, "Stmt failed: %s", cmd_sql);
46+
SPI_finish();
47+
}
48+
49+
PG_RETURN_NULL();
50+
}
51+
52+
/*
53+
* Generate CREATE TABLE sql for relation via pg_dump. Parameter is not
54+
* REGCLASS because pg_dump can't handle oids anyway.
55+
*/
56+
PG_FUNCTION_INFO_V1(gen_create_table_sql);
57+
Datum
58+
gen_create_table_sql(PG_FUNCTION_ARGS)
59+
{
60+
char pg_dump_path[MAXPGPATH];
61+
Name db; /* current db name */
62+
/* let the mmgr free that */
63+
char *relation = text_to_cstring(PG_GETARG_TEXT_PP(0));
64+
const size_t chunksize = 5; /* read max that bytes at time */
65+
/* how much already allocated *including header* */
66+
size_t pallocated = VARHDRSZ + chunksize;
67+
text *sql = (text *) palloc(pallocated);
68+
char *ptr = VARDATA(sql); /* ptr to first free byte */
69+
char *cmd;
70+
FILE *fp;
71+
size_t bytes_read;
72+
SET_VARSIZE(sql, VARHDRSZ);
73+
74+
/* find pg_dump location */
75+
if (find_my_exec("pg_dump", pg_dump_path) != 0)
76+
{
77+
elog(ERROR, "Failed to find pg_dump location");
78+
}
79+
/* find current db */
80+
db = (Name) palloc(NAMEDATALEN);
81+
namestrcpy(db, get_database_name(MyDatabaseId));
82+
83+
cmd = psprintf("%s -t '%s' --schema-only '%s' 2>&1",
84+
pg_dump_path, relation, (NameStr(*db)));
85+
86+
if ((fp = popen(cmd, "r")) == NULL)
87+
{
88+
elog(ERROR, "Failed to run pg_dump, cmd %s", cmd);
89+
}
90+
91+
while ((bytes_read = fread(ptr, sizeof(char), chunksize, fp)) != 0)
92+
{
93+
SET_VARSIZE(sql, VARSIZE_ANY(sql) + bytes_read);
94+
if (pallocated - VARSIZE_ANY(sql) < chunksize)
95+
{
96+
pallocated *= 2;
97+
sql = (text *) repalloc(sql, pallocated);
98+
}
99+
/* since we realloc, can't just += bytes_read here */
100+
ptr = VARDATA(sql) + VARSIZE_ANY_EXHDR(sql);
101+
}
102+
103+
if (pclose(fp)) {
104+
elog(ERROR, "pg_dump exited with error status, output was\n%scmd was \n%s",
105+
text_to_cstring(sql), cmd);
106+
}
107+
108+
PG_RETURN_TEXT_P(sql);
109+
}

0 commit comments

Comments
 (0)