Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0da8c7a

Browse files
committed
Shardmaster bgw subscribed to new cmds.
Also, * process_shared_preload_libraries_in_progress check * shardmaster started only on master * checking that extension is installed * handling sigterm on exit and sigusr1 for later cmd canceling
1 parent f140ee8 commit 0da8c7a

File tree

3 files changed

+163
-28
lines changed

3 files changed

+163
-28
lines changed

pg_shardman--0.0.1.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ BEGIN
1919
RETURN NULL;
2020
END
2121
$$ LANGUAGE plpgsql;
22-
CREATE TRIGGER cmd_log_update
22+
CREATE TRIGGER cmd_log_inserts
2323
AFTER INSERT ON cmd_log
2424
FOR EACH STATEMENT
2525
EXECUTE PROCEDURE notify_shardmaster();

pg_shardman.control

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ default_version = '0.0.1'
44
# * learn the scheme when connecting from another node
55
# * somehow avoid @extschema@ at init sql at all
66
relocatable = false
7-
schema = shardman
7+
schema = shardman
8+
requires = 'pg_pathman, postgres_fdw'

src/pg_shardman.c

Lines changed: 160 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@
2323
#include "utils/snapmgr.h"
2424
#include "executor/spi.h"
2525
#include "access/xact.h"
26+
#include "commands/extension.h"
2627
#include "libpq-fe.h"
2728

2829

29-
30-
3130
/* ensure that extension won't load against incompatible version of Postgres */
3231
PG_MODULE_MAGIC;
3332

@@ -43,6 +42,15 @@ extern void shardmaster_main(Datum main_arg);
4342

4443
static Cmd *next_cmd(void);
4544
static void update_cmd_status(int64 id, const char *new_status);
45+
static PGconn *listen_cmd_log_inserts(void);
46+
static void wait_notify(PGconn *conn);
47+
static void shardmaster_sigterm(SIGNAL_ARGS);
48+
static void shardmaster_sigusr1(SIGNAL_ARGS);
49+
static void pg_shardman_installed(void);
50+
51+
/* flags set by signal handlers */
52+
static volatile sig_atomic_t got_sigterm = false;
53+
static volatile sig_atomic_t got_sigusr1 = false;
4654

4755
/* GUC variables */
4856
static bool shardman_master = false;
@@ -55,6 +63,13 @@ void
5563
_PG_init()
5664
{
5765
BackgroundWorker shardmaster_worker;
66+
67+
if (!process_shared_preload_libraries_in_progress)
68+
{
69+
ereport(ERROR, (errmsg("pg_shardman can only be loaded via shared_preload_libraries"),
70+
errhint("Add pg_shardman to shared_preload_libraries.")));
71+
}
72+
5873
DefineCustomBoolVariable("shardman.master",
5974
"This node is the master?",
6075
NULL,
@@ -76,16 +91,19 @@ _PG_init()
7691
NULL, NULL, NULL
7792
);
7893

79-
/* register shardmaster */
80-
sprintf(shardmaster_worker.bgw_name, "shardmaster");
81-
shardmaster_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
82-
BGWORKER_BACKEND_DATABASE_CONNECTION;
83-
shardmaster_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
84-
shardmaster_worker.bgw_restart_time = 1;
85-
sprintf(shardmaster_worker.bgw_library_name, "pg_shardman");
86-
sprintf(shardmaster_worker.bgw_function_name, "shardmaster_main");
87-
shardmaster_worker.bgw_notify_pid = 0;
88-
RegisterBackgroundWorker(&shardmaster_worker);
94+
if (shardman_master)
95+
{
96+
/* register shardmaster */
97+
sprintf(shardmaster_worker.bgw_name, "shardmaster");
98+
shardmaster_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
99+
BGWORKER_BACKEND_DATABASE_CONNECTION;
100+
shardmaster_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
101+
shardmaster_worker.bgw_restart_time = 10;
102+
sprintf(shardmaster_worker.bgw_library_name, "pg_shardman");
103+
sprintf(shardmaster_worker.bgw_function_name, "shardmaster_main");
104+
shardmaster_worker.bgw_notify_pid = 0;
105+
RegisterBackgroundWorker(&shardmaster_worker);
106+
}
89107
}
90108

91109
/*
@@ -96,34 +114,108 @@ shardmaster_main(Datum main_arg)
96114
{
97115
Cmd *cmd;
98116
PGconn *conn;
99-
const char *conninfo;
100117
elog(LOG, "Shardmaster started");
101118

102119
/* Connect to the database to use SPI*/
103120
BackgroundWorkerInitializeConnection(shardman_master_dbname, NULL);
121+
/* sanity check */
122+
pg_shardman_installed();
123+
124+
/* Establish signal handlers before unblocking signals. */
125+
pqsignal(SIGTERM, shardmaster_sigterm);
126+
pqsignal(SIGUSR1, shardmaster_sigusr1);
127+
/* We're now ready to receive signals */
128+
BackgroundWorkerUnblockSignals();
129+
130+
conn = listen_cmd_log_inserts();
131+
132+
/* main loop */
133+
while (!got_sigterm)
134+
{
135+
while ((cmd = next_cmd()) != NULL)
136+
{
137+
update_cmd_status(cmd->id, "in progress");
138+
elog(LOG, "Working on command %ld, %s", cmd->id, cmd->cmd_type);
139+
update_cmd_status(cmd->id, "success");
140+
}
141+
wait_notify(conn);
142+
if (got_sigusr1)
143+
{
144+
elog(LOG, "SIGUSR1 arrived, aborting current command");
145+
update_cmd_status(cmd->id, "canceled");
146+
got_sigusr1 = false;
147+
}
148+
}
149+
150+
elog(LOG, "Shardmaster received SIGTERM, exiting");
151+
PQfinish(conn);
152+
proc_exit(0);
153+
}
154+
155+
/*
156+
* Open libpq connection to our server and start listening to cmd_log inserts
157+
* notifications.
158+
*/
159+
PGconn *
160+
listen_cmd_log_inserts(void)
161+
{
162+
PGconn *conn;
163+
char *conninfo;
164+
PGresult *res;
104165

105166
conninfo = psprintf("dbname = %s", shardman_master_dbname);
106167
conn = PQconnectdb(conninfo);
168+
pfree(conninfo);
107169
/* Check to see that the backend connection was successfully made */
108170
if (PQstatus(conn) != CONNECTION_OK)
109171
elog(FATAL, "Connection to database failed: %s",
110172
PQerrorMessage(conn));
111173

112-
113-
/* pg_usleep(10000000L); */
114-
while ((cmd = next_cmd()) != NULL)
174+
res = PQexec(conn, "LISTEN shardman_cmd_log_update");
175+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
115176
{
116-
elog(LOG, "Working on command %ld, %s", cmd->id, cmd->cmd_type);
117-
update_cmd_status(cmd->id, "success");
177+
elog(FATAL, "LISTEN command failed: %s", PQerrorMessage(conn));
118178
}
179+
PQclear(res);
180+
181+
return conn;
182+
}
183+
184+
/*
185+
* Wait until NOTIFY or signal arrives. If select is alerted, but there are
186+
* no notifcations, we also return.
187+
*/
188+
void
189+
wait_notify(PGconn *conn)
190+
{
191+
int sock;
192+
fd_set input_mask;
193+
PGnotify *notify;
119194

195+
sock = PQsocket(conn);
196+
if (sock < 0)
197+
elog(FATAL, "Couldn't get sock from pgconn");
120198

121-
/* while (1948) */
122-
/* { */
199+
FD_ZERO(&input_mask);
200+
FD_SET(sock, &input_mask);
123201

124-
/* } */
125-
PQfinish(conn);
126-
proc_exit(0);
202+
if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
203+
{
204+
if (errno == EINTR)
205+
return; /* signal has arrived */
206+
elog(FATAL, "select() failed: %s", strerror(errno));
207+
}
208+
209+
PQconsumeInput(conn);
210+
/* eat all notifications at once */
211+
while ((notify = PQnotifies(conn)) != NULL)
212+
{
213+
elog(LOG, "NOTIFY %s received from backend PID %d",
214+
notify->relname, notify->be_pid);
215+
PQfreemem(notify);
216+
}
217+
218+
return;
127219
}
128220

129221
/*
@@ -138,7 +230,6 @@ next_cmd(void)
138230
MemoryContext oldcxt = CurrentMemoryContext;
139231
int e;
140232

141-
SetCurrentStatementStartTimestamp();
142233
StartTransactionCommand();
143234
SPI_connect();
144235
PushActiveSnapshot(GetTransactionSnapshot());
@@ -184,21 +275,64 @@ update_cmd_status(int64 id, const char *new_status)
184275
{
185276
char *sql;
186277
int e;
187-
SetCurrentStatementStartTimestamp();
188278
StartTransactionCommand();
189279
SPI_connect();
190280
PushActiveSnapshot(GetTransactionSnapshot());
191281

192282
sql = psprintf("update shardman.cmd_log set status = '%s' where id = %ld;",
193283
new_status, id);
194284
e = SPI_exec(sql, 0);
285+
pfree(sql);
195286
if (e < 0)
196287
{
197288
elog(FATAL, "Stmt failed: %s", sql);
198289
}
199-
pfree(sql);
200290

201291
PopActiveSnapshot();
202292
SPI_finish();
203293
CommitTransactionCommand();
204294
}
295+
296+
/*
297+
* Verify that extension is installed locally. We must be connected to db at
298+
* this point
299+
*/
300+
static void
301+
pg_shardman_installed(void)
302+
{
303+
bool installed = true;
304+
305+
StartTransactionCommand();
306+
PushActiveSnapshot(GetTransactionSnapshot());
307+
if (get_extension_oid("pg_shardman", true) == InvalidOid)
308+
{
309+
installed = false;
310+
elog(WARNING, "pg_shardman library is preloaded, but extenstion is not created");
311+
}
312+
PopActiveSnapshot();
313+
CommitTransactionCommand();
314+
315+
/* shardmaster won't run without extension */
316+
if (!installed)
317+
proc_exit(1);
318+
}
319+
320+
/*
321+
* Signal handler for SIGTERM
322+
* Set a flag to let the main loop to terminate.
323+
*/
324+
static void
325+
shardmaster_sigterm(SIGNAL_ARGS)
326+
{
327+
got_sigterm = true;
328+
}
329+
330+
/*
331+
* Signal handler for SIGUSR1
332+
* Set a flag to let the main loop to terminate.
333+
*/
334+
static void
335+
shardmaster_sigusr1(SIGNAL_ARGS)
336+
{
337+
got_sigusr1 = true;
338+
}

0 commit comments

Comments
 (0)