Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f85fa95

Browse files
committed
Cleanup on DROP EXTENSION, shmn_elog, master publicates metadata
1 parent 3048d2e commit f85fa95

File tree

7 files changed

+179
-33
lines changed

7 files changed

+179
-33
lines changed

Makefile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
EXTENSION = pg_shardman # the extension name
2-
DATA = pg_shardman--0.0.1.sql # script files to install with CREATE EXTENSION
2+
# This file will be executed by CREATE EXTENSION, so let pgxs install it.
3+
DATA = pg_shardman--0.0.1.sql
34

45
MODULE_big = pg_shardman
5-
OBJS = src/pg_shardman.o
6+
OBJS = src/pg_shardman.o src/pg_shardman_cleanup.o
67

78
PG_CPPFLAGS += -Isrc/include
89

bin/shardman_start.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,7 @@ for port in $master_port "${worker_ports[@]}"; do
1212
psql -p $port -c "drop extension if exists pg_shardman;"
1313
psql -p $port -c "create extension pg_shardman cascade;"
1414
done
15+
# to restart master bgw
16+
restart_nodes
1517

1618
psql

pg_shardman--0.0.1.sql

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,53 @@ INSERT INTO @extschema@.local_meta VALUES ('node_id', NULL);
5050

5151
-- Internal functions
5252

53+
-- These tables will be replicated to worker nodes, notifying them about changes.
54+
-- Called on master.
55+
-- Create the publication 'shardman_meta_pub' covering the nodes metadata
-- table, unless a publication with that name already exists. The existence
-- check makes the function safe to call repeatedly (the master background
-- worker calls it on every start).
-- The table is referenced through the extension-schema substitution macro,
-- like the other objects in this script, rather than a hard-coded schema name.
CREATE FUNCTION create_meta_pub() RETURNS void AS $$
BEGIN
	IF NOT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'shardman_meta_pub') THEN
		CREATE PUBLICATION shardman_meta_pub FOR TABLE @extschema@.nodes;
	END IF;
END;
$$ LANGUAGE plpgsql;
62+
63+
-- These tables will be replicated to worker nodes, notifying them about changes.
64+
-- Called on worker nodes.
65+
-- NOTE(review): despite its name, the body below is identical to
-- create_meta_pub(): it checks pg_publication and runs CREATE PUBLICATION.
-- Nothing here creates a subscription. Presumably this is an unfinished
-- copy-paste and should eventually create a subscription to the master's
-- shardman_meta_pub -- TODO confirm and implement once the master connection
-- string is available to this function.
CREATE FUNCTION create_meta_sub() RETURNS void AS $$
66+
BEGIN
67+
IF NOT EXISTS (SELECT * FROM pg_publication WHERE pubname = 'shardman_meta_pub') THEN
68+
CREATE PUBLICATION shardman_meta_pub FOR TABLE shardman.nodes;
69+
END IF;
70+
END;
71+
$$ LANGUAGE plpgsql;
72+
73+
-- Remove all our logical replication stuff in case of drop extension.
74+
-- Dropping extension cleanup is not that easy:
75+
-- - pg offers event triggers sql_drop, ddl_command_end and ddl_command_start
76+
-- - sql_drop looks like what we need, but we can't do it from deleting
77+
-- extension itself -- the trigger will be already deleted at the moment we
78+
-- need it.
79+
-- - same with ddl_command_end
80+
-- - ddl_command_start apparently doesn't provide us with info what exactly
81+
-- is happening, I mean it's impossible to learn with plpgsql what extension
82+
-- is being dropped.
83+
-- - because of that I resort to C function which examines parse tree and if
84+
-- it is our extension that is being dropped, it calls the plpgsql cleanup func
85+
-- Drop every publication whose name starts with the literal prefix
-- 'shardman_'. Takes no arguments and returns nothing; each matching
-- publication is dropped with a dynamically built DROP PUBLICATION statement.
CREATE OR REPLACE FUNCTION pg_shardman_cleanup() RETURNS void AS $$
DECLARE
	pub RECORD;
BEGIN
	-- '_' is a single-character wildcard in LIKE, so it is escaped here;
	-- otherwise unrelated names such as 'shardmanager_pub' would match too.
	FOR pub IN SELECT pubname FROM pg_publication
			   WHERE pubname LIKE 'shardman!_%' ESCAPE '!' LOOP
		-- quote_ident guards against names that need quoting.
		EXECUTE 'DROP PUBLICATION ' || quote_ident(pub.pubname);
	END LOOP;
END;
$$ LANGUAGE plpgsql;
94+
-- Event trigger function implemented in C, loaded from the pg_shardman
-- shared library.
CREATE FUNCTION pg_shardman_cleanup_c()
	RETURNS event_trigger
	LANGUAGE C
	AS 'pg_shardman';

-- Run the C function at the start of every DROP EXTENSION command.
CREATE EVENT TRIGGER cleanup_lr_trigger
	ON ddl_command_start
	WHEN TAG IN ('DROP EXTENSION')
	EXECUTE PROCEDURE pg_shardman_cleanup_c();
99+
53100
-- Get local node id. NULL means node is not in the cluster yet.
54101
CREATE FUNCTION get_node_id() RETURNS int AS $$
55102
SELECT v::int FROM @extschema@.local_meta WHERE k = 'node_id';

pg_shardman.control

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
comment = 'Postgresql sharding via pg_pathman, postgres_fdw, LR and others'
2+
# CREATE EXTENSION will run extname--version.sql file
23
default_version = '0.0.1'
34
# TODO: make it relocatable; for that, we need
45
# * learn the scheme when connecting from another node

src/include/pg_shardman.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef PG_SHARDMAN_H
22
#define PG_SHARDMAN_H
33

4+
/*
 * elog() wrapper that prepends "[SHARDMAN] " to the format string.
 * fmt must be a string literal, because it is concatenated with the prefix
 * at compile time. The ## before __VA_ARGS__ allows calls with no extra
 * arguments.
 */
#define shmn_elog(level, fmt, ...) \
	elog(level, "[SHARDMAN] " fmt, ##__VA_ARGS__)
5+
46
extern void _PG_init(void);
57
extern void shardmaster_main(Datum main_arg);
68

src/pg_shardman.c

Lines changed: 74 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ typedef struct Cmd
4343
static Cmd *next_cmd(void);
4444
static void update_cmd_status(int64 id, const char *new_status);
4545
static PGconn *listen_cmd_log_inserts(void);
46+
static void publicate_metadata(void);
4647
static void wait_notify(PGconn *conn);
4748
static void shardmaster_sigterm(SIGNAL_ARGS);
4849
static void shardmaster_sigusr1(SIGNAL_ARGS);
@@ -60,11 +61,11 @@ static bool shardman_master = false;
6061
static char *shardman_master_dbname = "postgres";
6162
static int shardman_cmd_retry_naptime = 10000;
6263

63-
/* just global vars */
64+
/* Just global vars. */
6465
/* Connection to local server for LISTEN notifications. It is global for easy
6566
* cleanup after receiving SIGTERM.
6667
*/
67-
static PGconn *conn;
68+
static PGconn *conn = NULL;
6869

6970
/*
7071
* Entrypoint of the module. Define variables and register background worker.
@@ -125,6 +126,7 @@ _PG_init()
125126
shardmaster_worker.bgw_notify_pid = 0;
126127
RegisterBackgroundWorker(&shardmaster_worker);
127128
}
129+
/* TODO: clean up publications if we were master before */
128130
}
129131

130132
/*
@@ -134,7 +136,7 @@ void
134136
shardmaster_main(Datum main_arg)
135137
{
136138
Cmd *cmd;
137-
elog(LOG, "Shardmaster started");
139+
shmn_elog(LOG, "Shardmaster started");
138140

139141
/* Connect to the database to use SPI*/
140142
BackgroundWorkerInitializeConnection(shardman_master_dbname, NULL);
@@ -147,6 +149,7 @@ shardmaster_main(Datum main_arg)
147149
/* We're now ready to receive signals */
148150
BackgroundWorkerUnblockSignals();
149151

152+
publicate_metadata();
150153
conn = listen_cmd_log_inserts();
151154

152155
/* main loop */
@@ -156,28 +159,50 @@ shardmaster_main(Datum main_arg)
156159
while ((cmd = next_cmd()) != NULL)
157160
{
158161
update_cmd_status(cmd->id, "in progress");
159-
elog(LOG, "Working on command %ld, %s, opts are", cmd->id, cmd->cmd_type);
162+
shmn_elog(LOG, "Working on command %ld, %s, opts are",
163+
cmd->id, cmd->cmd_type);
160164
for (char **opts = cmd->opts; *opts; opts++)
161-
elog(LOG, "%s", *opts);
165+
shmn_elog(LOG, "%s", *opts);
162166
if (strcmp(cmd->cmd_type, "add_node") == 0)
163167
add_node(cmd);
164168
else
165-
elog(FATAL, "Unknown cmd type %s", cmd->cmd_type);
169+
shmn_elog(FATAL, "Unknown cmd type %s", cmd->cmd_type);
166170
}
167171
wait_notify(conn);
168172
check_for_sigterm();
169173
}
170174

171175
}
172176

177+
/*
178+
* Create publication on tables with metadata.
179+
*/
180+
void
181+
publicate_metadata(void)
182+
{
183+
const char *cmd_sql = "select shardman.create_meta_pub();";
184+
int e;
185+
186+
StartTransactionCommand();
187+
SPI_connect();
188+
PushActiveSnapshot(GetTransactionSnapshot());
189+
190+
e = SPI_execute(cmd_sql, true, 0);
191+
if (e < 0)
192+
shmn_elog(FATAL, "Stmt failed: %s", cmd_sql);
193+
194+
PopActiveSnapshot();
195+
SPI_finish();
196+
CommitTransactionCommand();
197+
}
198+
173199
/*
174200
* Open libpq connection to our server and start listening to cmd_log inserts
175201
* notifications.
176202
*/
177203
PGconn *
178204
listen_cmd_log_inserts(void)
179205
{
180-
PGconn *conn;
181206
char *conninfo;
182207
PGresult *res;
183208

@@ -186,13 +211,13 @@ listen_cmd_log_inserts(void)
186211
pfree(conninfo);
187212
/* Check to see that the backend connection was successfully made */
188213
if (PQstatus(conn) != CONNECTION_OK)
189-
elog(FATAL, "Connection to database failed: %s",
214+
shmn_elog(FATAL, "Connection to local database failed: %s",
190215
PQerrorMessage(conn));
191216

192217
res = PQexec(conn, "LISTEN shardman_cmd_log_update");
193218
if (PQresultStatus(res) != PGRES_COMMAND_OK)
194219
{
195-
elog(FATAL, "LISTEN command failed: %s", PQerrorMessage(conn));
220+
shmn_elog(FATAL, "LISTEN command failed: %s", PQerrorMessage(conn));
196221
}
197222
PQclear(res);
198223

@@ -212,7 +237,7 @@ wait_notify(PGconn *conn)
212237

213238
sock = PQsocket(conn);
214239
if (sock < 0)
215-
elog(FATAL, "Couldn't get sock from pgconn");
240+
shmn_elog(FATAL, "Couldn't get sock from pgconn");
216241

217242
FD_ZERO(&input_mask);
218243
FD_SET(sock, &input_mask);
@@ -221,14 +246,14 @@ wait_notify(PGconn *conn)
221246
{
222247
if (errno == EINTR)
223248
return; /* signal has arrived */
224-
elog(FATAL, "select() failed: %s", strerror(errno));
249+
shmn_elog(FATAL, "select() failed: %s", strerror(errno));
225250
}
226251

227252
PQconsumeInput(conn);
228253
/* eat all notifications at once */
229254
while ((notify = PQnotifies(conn)) != NULL)
230255
{
231-
elog(LOG, "NOTIFY %s received from backend PID %d",
256+
shmn_elog(LOG, "NOTIFY %s received from backend PID %d",
232257
notify->relname, notify->be_pid);
233258
PQfreemem(notify);
234259
}
@@ -257,7 +282,7 @@ next_cmd(void)
257282
" status = 'in progress') t2 using (id);";
258283
e = SPI_execute(cmd_sql, true, 0);
259284
if (e < 0)
260-
elog(FATAL, "Stmt failed: %s", cmd_sql);
285+
shmn_elog(FATAL, "Stmt failed: %s", cmd_sql);
261286

262287
if (SPI_processed > 0)
263288
{
@@ -281,7 +306,7 @@ next_cmd(void)
281306
" cmd_id = %ld order by id;", cmd->id);
282307
e = SPI_execute(cmd_sql, true, 0);
283308
if (e < 0)
284-
elog(FATAL, "Stmt failed: %s", cmd_sql);
309+
shmn_elog(FATAL, "Stmt failed: %s", cmd_sql);
285310

286311
MemoryContextSwitchTo(oldcxt);
287312
/* +1 for NULL in the end */
@@ -323,7 +348,7 @@ update_cmd_status(int64 id, const char *new_status)
323348
pfree(sql);
324349
if (e < 0)
325350
{
326-
elog(FATAL, "Stmt failed: %s", sql);
351+
shmn_elog(FATAL, "Stmt failed: %s", sql);
327352
}
328353

329354
PopActiveSnapshot();
@@ -345,12 +370,13 @@ pg_shardman_installed_local(void)
345370
if (get_extension_oid("pg_shardman", true) == InvalidOid)
346371
{
347372
installed = false;
348-
elog(WARNING, "pg_shardman library is preloaded, but extenstion is not created");
373+
shmn_elog(WARNING, "pg_shardman library is preloaded, but extenstion is not created");
349374
}
350375
PopActiveSnapshot();
351376
CommitTransactionCommand();
352377

353378
/* shardmaster won't run without extension */
379+
/* TODO: unregister bgw? */
354380
if (!installed)
355381
proc_exit(1);
356382
}
@@ -383,8 +409,9 @@ check_for_sigterm(void)
383409
{
384410
if (got_sigterm)
385411
{
386-
elog(LOG, "Shardmaster received SIGTERM, exiting");
387-
PQfinish(conn);
412+
shmn_elog(LOG, "Shardmaster received SIGTERM, exiting");
413+
if (conn != NULL)
414+
PQfinish(conn);
388415
proc_exit(0);
389416
}
390417
}
@@ -402,15 +429,16 @@ static void add_node(Cmd *cmd)
402429
const char *conninfo = cmd->opts[0];
403430
PGresult *res = NULL;
404431
bool pg_shardman_installed;
432+
int node_id;
405433

406-
elog(LOG, "Adding node %s", conninfo);
434+
shmn_elog(LOG, "Adding node %s", conninfo);
407435
/* Try to execute command indefinitely until it succeeded or canceled */
408436
while (!got_sigusr1 && !got_sigterm)
409437
{
410438
conn = PQconnectdb(conninfo);
411439
if (PQstatus(conn) != CONNECTION_OK)
412440
{
413-
elog(NOTICE, "Connection to add_node node failed: %s",
441+
shmn_elog(NOTICE, "Connection to add_node node failed: %s",
414442
PQerrorMessage(conn));
415443
goto attempt_failed;
416444
}
@@ -421,7 +449,7 @@ static void add_node(Cmd *cmd)
421449
" where name = 'pg_shardman';");
422450
if (PQresultStatus(res) != PGRES_TUPLES_OK)
423451
{
424-
elog(NOTICE, "Failed to check whether pg_shardman is installed on"
452+
shmn_elog(NOTICE, "Failed to check whether pg_shardman is installed on"
425453
" node to add%s", PQerrorMessage(conn));
426454
goto attempt_failed;
427455
}
@@ -435,22 +463,37 @@ static void add_node(Cmd *cmd)
435463
res = PQexec(conn, "select shardman.get_node_id();");
436464
if (PQresultStatus(res) != PGRES_TUPLES_OK)
437465
{
438-
elog(NOTICE, "Failed to get node id, %s", PQerrorMessage(conn));
466+
shmn_elog(NOTICE, "Failed to get node id, %s", PQerrorMessage(conn));
439467
goto attempt_failed;
440468
}
441-
int node_id = atoi(PQgetvalue(res, 0, 0));
469+
node_id = atoi(PQgetvalue(res, 0, 0));
470+
PQclear(res);
442471
if (node_in_cluster(node_id))
443472
{
444-
elog(WARNING, "node %d with connstring %s is already in cluster,"
473+
shmn_elog(WARNING, "node %d with connstring %s is already in cluster,"
445474
" won't add it.", node_id, conninfo);
446-
PQclear(res);
447475
PQfinish(conn);
448476
update_cmd_status(cmd->id, "failed");
449477
return;
450478
}
451479
}
452480

481+
/* Now, when we are sure that node is not in the cluster, we reinstall
482+
* the extension to reset its state, whether it was installed before
483+
* or not.
484+
*/
485+
res = PQexec(conn, "drop extension if exists pg_shardman; "
486+
" create extension pg_shardman;");
487+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
488+
{
489+
shmn_elog(NOTICE, "Failed to reinstall pg_shardman, %s", PQerrorMessage(conn));
490+
goto attempt_failed;
491+
}
453492
PQclear(res);
493+
494+
/* TODO */
495+
496+
/* done */
454497
PQfinish(conn);
455498
update_cmd_status(cmd->id, "success");
456499
return;
@@ -461,14 +504,14 @@ static void add_node(Cmd *cmd)
461504
if (conn != NULL)
462505
PQfinish(conn);
463506

464-
elog(LOG, "Attempt to execute add_node failed, sleeping and retrying");
465-
pg_usleep(shardman_cmd_retry_naptime * 1000);
507+
shmn_elog(LOG, "Attempt to execute add_node failed, sleeping and retrying");
508+
pg_usleep(shardman_cmd_retry_naptime * 1000L);
466509
}
467510

468511
check_for_sigterm();
469512

470513
/* Command canceled via sigusr1 */
471-
elog(LOG, "Command %ld canceled", cmd->id);
514+
shmn_elog(LOG, "Command %ld canceled", cmd->id);
472515
update_cmd_status(cmd->id, "canceled");
473516
got_sigusr1 = false;
474517
return;
@@ -481,7 +524,7 @@ static bool
481524
node_in_cluster(int id)
482525
{
483526
int e;
484-
char *sql = "select id from shardman.nodes;";
527+
const char *sql = "select id from shardman.nodes;";
485528
bool res = false;
486529
HeapTuple tuple;
487530
TupleDesc rowdesc;
@@ -492,9 +535,9 @@ node_in_cluster(int id)
492535
SPI_connect();
493536
PushActiveSnapshot(GetTransactionSnapshot());
494537

495-
e = SPI_exec(sql, 0);
538+
e = SPI_execute(sql, true, 0);
496539
if (e < 0)
497-
elog(FATAL, "Stmt failed: %s", sql);
540+
shmn_elog(FATAL, "Stmt failed: %s", sql);
498541

499542
rowdesc = SPI_tuptable->tupdesc;
500543
for (i = 0; i < SPI_processed; i++)

0 commit comments

Comments
 (0)