Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f140ee8

Browse files
committed
Background worker knows how to retrieve new commands.
Besides, * It can update their status; * Extension linked with libpq (to LISTEN later); * cmd id type changed to int64; * Insertions into cmd_log now NOTIFY automatically; * We should review the question how to use SPI from bgw carefully;
1 parent 013e68c commit f140ee8

File tree

6 files changed

+241
-13
lines changed

6 files changed

+241
-13
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
*.o
2+
*.so
3+
.dir-locals.el

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
EXTENSION = pg_shardman # the extension name
22
DATA = pg_shardman--0.0.1.sql # script files to install with CREATE EXTENSION
33

4+
MODULE_big = pg_shardman
5+
OBJS = src/pg_shardman.o
6+
47
# You can specify path to pg_config in PG_CONFIG var
58
ifndef PG_CONFIG
69
PG_CONFIG := pg_config
710
endif
811
PGXS := $(shell $(PG_CONFIG) --pgxs)
12+
13+
INCLUDEDIR := $(shell $(PG_CONFIG) --includedir)
14+
PG_CPPFLAGS = -I$(INCLUDEDIR) # add server's include directory for libpq-fe.h
15+
SHLIB_LINK += -lpq # add libpq
16+
917
include $(PGXS)

pg_shardman--0.0.1.sql

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,46 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION pg_shardman" to load this file. \quit
33

4-
-- types & tables
5-
64
-- available commands
75
CREATE TYPE cmd AS ENUM ('add_node', 'remove_node');
86
-- command status
9-
CREATE TYPE cmd_status AS ENUM ('waiting', 'failed', 'success');
7+
CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'in progress', 'success');
108

119
CREATE TABLE cmd_log (
12-
id serial PRIMARY KEY,
10+
id bigserial PRIMARY KEY,
1311
cmd_type cmd NOT NULL,
14-
cmd_status cmd_status DEFAULT 'waiting' NOT NULL
12+
status cmd_status DEFAULT 'waiting' NOT NULL
1513
);
1614

15+
-- Notify shardman master bgw about new commands
16+
CREATE FUNCTION notify_shardmaster() RETURNS trigger AS $$
17+
BEGIN
18+
NOTIFY shardman_cmd_log_update;
19+
RETURN NULL;
20+
END
21+
$$ LANGUAGE plpgsql;
22+
CREATE TRIGGER cmd_log_update
23+
AFTER INSERT ON cmd_log
24+
FOR EACH STATEMENT
25+
EXECUTE PROCEDURE notify_shardmaster();
26+
27+
1728
-- probably better to keep opts in an array field, but working with arrays from
1829
-- libpq is not very handy
1930
CREATE TABLE cmd_opts (
20-
cmd_id int REFERENCES cmd_log(id),
31+
cmd_id bigint REFERENCES cmd_log(id),
2132
opt text NOT NULL
2233
);
2334

24-
-- Probably later we should provide as flexible interface as libpq does for
25-
-- specifying connstrings
35+
-- list of nodes present in the cluster
2636
CREATE TABLE nodes (
2737
id serial PRIMARY KEY,
2838
connstring text
2939
);
3040

31-
-- functions
41+
-- Interface functions
3242

33-
-- TODO: try to connect immediately and ensure that its id (if any) is not
43+
-- TODO: during the initial connection, ensure that nodes id (if any) is not
3444
-- present in the cluster
3545
CREATE FUNCTION add_node(connstring text) RETURNS void AS $$
3646
DECLARE

pg_shardman.control

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
comment = 'Postgresql sharding via pg_pathman, postgres_fdw, LR and others'
22
default_version = '0.0.1'
3-
relocatable = false
3+
# TODO: make it relocatable; for that, we need
4+
# * learn the scheme when connecting from another node
5+
# * somehow avoid @extschema@ at init sql at all
6+
relocatable = false
7+
schema = shardman

readme.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,5 @@ add to postgresql.conf
1111
shared_preload_libraries = '$libdir/pg_shardman'
1212

1313
restart postgres server and run
14-
create schema if not exists shardman;
1514
drop extension if exists pg_shardman;
16-
create extension pg_shardman with schema shardman;
15+
create extension pg_shardman;

src/pg_shardman.c

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/* -------------------------------------------------------------------------
2+
*
3+
* shardmaster.c
4+
* Background worker executing sharding tasks.
5+
*
6+
* -------------------------------------------------------------------------
7+
*/
8+
#include "postgres.h"
9+
10+
#include <sys/time.h> /* For 'select' portability */
11+
#include <sys/select.h>
12+
13+
/* These are always necessary for a bgworker */
14+
#include "miscadmin.h"
15+
#include "postmaster/bgworker.h"
16+
#include "storage/ipc.h"
17+
#include "storage/latch.h"
18+
#include "storage/lwlock.h"
19+
#include "storage/proc.h"
20+
#include "storage/shmem.h"
21+
22+
#include "utils/guc.h"
23+
#include "utils/snapmgr.h"
24+
#include "executor/spi.h"
25+
#include "access/xact.h"
26+
#include "libpq-fe.h"
27+
28+
29+
30+
31+
/* ensure that extension won't load against incompatible version of Postgres */
32+
PG_MODULE_MAGIC;
33+
34+
typedef struct Cmd
35+
{
36+
int64 id;
37+
char *cmd_type;
38+
char *status;
39+
} Cmd;
40+
41+
extern void _PG_init(void);
42+
extern void shardmaster_main(Datum main_arg);
43+
44+
static Cmd *next_cmd(void);
45+
static void update_cmd_status(int64 id, const char *new_status);
46+
47+
/* GUC variables */
48+
static bool shardman_master = false;
49+
static char *shardman_master_dbname = "postgres";
50+
51+
/*
52+
* Entrypoint of the module. Define variables and register background worker.
53+
*/
54+
void
55+
_PG_init()
56+
{
57+
BackgroundWorker shardmaster_worker;
58+
DefineCustomBoolVariable("shardman.master",
59+
"This node is the master?",
60+
NULL,
61+
&shardman_master,
62+
false,
63+
PGC_POSTMASTER,
64+
0,
65+
NULL, NULL, NULL);
66+
67+
DefineCustomStringVariable(
68+
"shardman.master_dbname",
69+
"Name of the database with extension on master node, shardmaster bgw"
70+
"will connect to it",
71+
NULL,
72+
&shardman_master_dbname,
73+
"postgres",
74+
PGC_POSTMASTER,
75+
0,
76+
NULL, NULL, NULL
77+
);
78+
79+
/* register shardmaster */
80+
sprintf(shardmaster_worker.bgw_name, "shardmaster");
81+
shardmaster_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
82+
BGWORKER_BACKEND_DATABASE_CONNECTION;
83+
shardmaster_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
84+
shardmaster_worker.bgw_restart_time = 1;
85+
sprintf(shardmaster_worker.bgw_library_name, "pg_shardman");
86+
sprintf(shardmaster_worker.bgw_function_name, "shardmaster_main");
87+
shardmaster_worker.bgw_notify_pid = 0;
88+
RegisterBackgroundWorker(&shardmaster_worker);
89+
}
90+
91+
/*
92+
* shardmaster bgw starts here
93+
*/
94+
void
95+
shardmaster_main(Datum main_arg)
96+
{
97+
Cmd *cmd;
98+
PGconn *conn;
99+
const char *conninfo;
100+
elog(LOG, "Shardmaster started");
101+
102+
/* Connect to the database to use SPI*/
103+
BackgroundWorkerInitializeConnection(shardman_master_dbname, NULL);
104+
105+
conninfo = psprintf("dbname = %s", shardman_master_dbname);
106+
conn = PQconnectdb(conninfo);
107+
/* Check to see that the backend connection was successfully made */
108+
if (PQstatus(conn) != CONNECTION_OK)
109+
elog(FATAL, "Connection to database failed: %s",
110+
PQerrorMessage(conn));
111+
112+
113+
/* pg_usleep(10000000L); */
114+
while ((cmd = next_cmd()) != NULL)
115+
{
116+
elog(LOG, "Working on command %ld, %s", cmd->id, cmd->cmd_type);
117+
update_cmd_status(cmd->id, "success");
118+
}
119+
120+
121+
/* while (1948) */
122+
/* { */
123+
124+
/* } */
125+
PQfinish(conn);
126+
proc_exit(0);
127+
}
128+
129+
/*
130+
* Retrieve next cmd to work on -- uncompleted command with min id.
131+
* Returns NULL if queue is empty. Memory is allocated in the current cxt.
132+
*/
133+
Cmd *
134+
next_cmd(void)
135+
{
136+
const char *cmd_sql;
137+
Cmd *cmd = NULL;
138+
MemoryContext oldcxt = CurrentMemoryContext;
139+
int e;
140+
141+
SetCurrentStatementStartTimestamp();
142+
StartTransactionCommand();
143+
SPI_connect();
144+
PushActiveSnapshot(GetTransactionSnapshot());
145+
146+
cmd_sql = "select * from shardman.cmd_log t1 join"
147+
" (select MIN(id) id from shardman.cmd_log where status = 'waiting' OR"
148+
" status = 'in progress') t2 using (id);";
149+
e = SPI_execute(cmd_sql, true, 0);
150+
if (e < 0)
151+
{
152+
elog(FATAL, "Stmt failed: %s", cmd_sql);
153+
}
154+
155+
if (SPI_processed > 0)
156+
{
157+
HeapTuple tuple = SPI_tuptable->vals[0];
158+
TupleDesc rowdesc = SPI_tuptable->tupdesc;
159+
bool isnull;
160+
const char *cmd_type = (SPI_getvalue(tuple, rowdesc,
161+
SPI_fnumber(rowdesc, "cmd_type")));
162+
163+
MemoryContext spicxt = MemoryContextSwitchTo(oldcxt);
164+
cmd = palloc(sizeof(Cmd));
165+
cmd->id = DatumGetInt64(SPI_getbinval(tuple, rowdesc,
166+
SPI_fnumber(rowdesc, "id"),
167+
&isnull));
168+
cmd->cmd_type = pstrdup(cmd_type);
169+
MemoryContextSwitchTo(spicxt);
170+
}
171+
172+
PopActiveSnapshot();
173+
SPI_finish();
174+
CommitTransactionCommand();
175+
176+
return cmd;
177+
}
178+
179+
/*
180+
* Update command status
181+
*/
182+
void
183+
update_cmd_status(int64 id, const char *new_status)
184+
{
185+
char *sql;
186+
int e;
187+
SetCurrentStatementStartTimestamp();
188+
StartTransactionCommand();
189+
SPI_connect();
190+
PushActiveSnapshot(GetTransactionSnapshot());
191+
192+
sql = psprintf("update shardman.cmd_log set status = '%s' where id = %ld;",
193+
new_status, id);
194+
e = SPI_exec(sql, 0);
195+
if (e < 0)
196+
{
197+
elog(FATAL, "Stmt failed: %s", sql);
198+
}
199+
pfree(sql);
200+
201+
PopActiveSnapshot();
202+
SPI_finish();
203+
CommitTransactionCommand();
204+
}

0 commit comments

Comments
 (0)