@@ -46,7 +46,10 @@ static PGconn *listen_cmd_log_inserts(void);
46
46
static void wait_notify (PGconn * conn );
47
47
static void shardmaster_sigterm (SIGNAL_ARGS );
48
48
static void shardmaster_sigusr1 (SIGNAL_ARGS );
49
- static void pg_shardman_installed (void );
49
+ static void check_for_sigterm (void );
50
+ static void pg_shardman_installed_local (void );
51
+ static void add_node (Cmd * cmd );
52
+ static bool node_in_cluster (int id );
50
53
51
54
/* flags set by signal handlers */
52
55
static volatile sig_atomic_t got_sigterm = false;
@@ -55,6 +58,13 @@ static volatile sig_atomic_t got_sigusr1 = false;
55
58
/* GUC variables */
56
59
static bool shardman_master = false;
57
60
static char * shardman_master_dbname = "postgres" ;
61
+ static int shardman_cmd_retry_naptime = 10000 ;
62
+
63
+ /* just global vars */
64
+ /* Connection to local server for LISTEN notifications. Is is global for easy
65
+ * cleanup after receiving SIGTERM.
66
+ */
67
+ static PGconn * conn ;
58
68
59
69
/*
60
70
* Entrypoint of the module. Define variables and register background worker.
@@ -91,6 +101,17 @@ _PG_init()
91
101
NULL , NULL , NULL
92
102
);
93
103
104
+ DefineCustomIntVariable ("shardman.cmd_retry_naptime" ,
105
+ "Sleep time in millisec between retrying to execute failing command" ,
106
+ NULL ,
107
+ & shardman_cmd_retry_naptime ,
108
+ 10000 ,
109
+ 0 ,
110
+ INT_MAX ,
111
+ PGC_SIGHUP ,
112
+ 0 ,
113
+ NULL , NULL , NULL );
114
+
94
115
if (shardman_master )
95
116
{
96
117
/* register shardmaster */
@@ -113,13 +134,12 @@ void
113
134
shardmaster_main (Datum main_arg )
114
135
{
115
136
Cmd * cmd ;
116
- PGconn * conn ;
117
137
elog (LOG , "Shardmaster started" );
118
138
119
139
/* Connect to the database to use SPI*/
120
140
BackgroundWorkerInitializeConnection (shardman_master_dbname , NULL );
121
141
/* sanity check */
122
- pg_shardman_installed ();
142
+ pg_shardman_installed_local ();
123
143
124
144
/* Establish signal handlers before unblocking signals. */
125
145
pqsignal (SIGTERM , shardmaster_sigterm );
@@ -130,7 +150,7 @@ shardmaster_main(Datum main_arg)
130
150
conn = listen_cmd_log_inserts ();
131
151
132
152
/* main loop */
133
- while (! got_sigterm )
153
+ while (1948 )
134
154
{
135
155
/* TODO: new mem ctxt for every command */
136
156
while ((cmd = next_cmd ()) != NULL )
@@ -139,20 +159,15 @@ shardmaster_main(Datum main_arg)
139
159
elog (LOG , "Working on command %ld, %s, opts are" , cmd -> id , cmd -> cmd_type );
140
160
for (char * * opts = cmd -> opts ; * opts ; opts ++ )
141
161
elog (LOG , "%s" , * opts );
142
- update_cmd_status (cmd -> id , "success" );
162
+ if (strcmp (cmd -> cmd_type , "add_node" ) == 0 )
163
+ add_node (cmd );
164
+ else
165
+ elog (FATAL , "Unknown cmd type %s" , cmd -> cmd_type );
143
166
}
144
167
wait_notify (conn );
145
- if (got_sigusr1 )
146
- {
147
- elog (LOG , "SIGUSR1 arrived, aborting current command" );
148
- update_cmd_status (cmd -> id , "canceled" );
149
- got_sigusr1 = false;
150
- }
168
+ check_for_sigterm ();
151
169
}
152
170
153
- elog (LOG , "Shardmaster received SIGTERM, exiting" );
154
- PQfinish (conn );
155
- proc_exit (0 );
156
171
}
157
172
158
173
/*
@@ -321,7 +336,7 @@ update_cmd_status(int64 id, const char *new_status)
321
336
* this point
322
337
*/
323
338
static void
324
- pg_shardman_installed (void )
339
+ pg_shardman_installed_local (void )
325
340
{
326
341
bool installed = true;
327
342
@@ -359,3 +374,141 @@ shardmaster_sigusr1(SIGNAL_ARGS)
359
374
{
360
375
got_sigusr1 = true;
361
376
}
377
+
378
+ /*
379
+ * Cleanup and exit in case of SIGTERM
380
+ */
381
+ static void
382
+ check_for_sigterm (void )
383
+ {
384
+ if (got_sigterm )
385
+ {
386
+ elog (LOG , "Shardmaster received SIGTERM, exiting" );
387
+ PQfinish (conn );
388
+ proc_exit (0 );
389
+ }
390
+ }
391
+
392
+ /*
393
+ * Adding node consists of
394
+ * - verifying that the node is not present in the cluster at the moment
395
+ * - subscription creation
396
+ * - setting node id
397
+ * - adding node to 'nodes' table
398
+ */
399
+ static void add_node (Cmd * cmd )
400
+ {
401
+ PGconn * conn = NULL ;
402
+ const char * conninfo = cmd -> opts [0 ];
403
+ PGresult * res = NULL ;
404
+ bool pg_shardman_installed ;
405
+
406
+ elog (LOG , "Adding node %s" , conninfo );
407
+ /* Try to execute command indefinitely until it succeeded or canceled */
408
+ while (!got_sigusr1 && !got_sigterm )
409
+ {
410
+ conn = PQconnectdb (conninfo );
411
+ if (PQstatus (conn ) != CONNECTION_OK )
412
+ {
413
+ elog (NOTICE , "Connection to add_node node failed: %s" ,
414
+ PQerrorMessage (conn ));
415
+ goto attempt_failed ;
416
+ }
417
+
418
+ /* Check if our extension is installed on the node */
419
+ res = PQexec (conn ,
420
+ "select installed_version from pg_available_extensions"
421
+ " where name = 'pg_shardman';" );
422
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
423
+ {
424
+ elog (NOTICE , "Failed to check whether pg_shardman is installed on"
425
+ " node to add%s" , PQerrorMessage (conn ));
426
+ goto attempt_failed ;
427
+ }
428
+ pg_shardman_installed = PQntuples (res ) == 1 && !PQgetisnull (res , 0 , 0 );
429
+ PQclear (res );
430
+
431
+ if (pg_shardman_installed )
432
+ {
433
+ /* extension is installed, so we have to check whether this node
434
+ * is already in the cluster */
435
+ res = PQexec (conn , "select shardman.get_node_id();" );
436
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
437
+ {
438
+ elog (NOTICE , "Failed to get node id, %s" , PQerrorMessage (conn ));
439
+ goto attempt_failed ;
440
+ }
441
+ int node_id = atoi (PQgetvalue (res , 0 , 0 ));
442
+ if (node_in_cluster (node_id ))
443
+ {
444
+ elog (WARNING , "node %d with connstring %s is already in cluster,"
445
+ " won't add it." , node_id , conninfo );
446
+ PQclear (res );
447
+ PQfinish (conn );
448
+ update_cmd_status (cmd -> id , "failed" );
449
+ return ;
450
+ }
451
+ }
452
+
453
+ PQclear (res );
454
+ PQfinish (conn );
455
+ update_cmd_status (cmd -> id , "success" );
456
+ return ;
457
+
458
+ attempt_failed : /* clean resources, sleep, check sigusr1 and try again */
459
+ if (res != NULL )
460
+ PQclear (res );
461
+ if (conn != NULL )
462
+ PQfinish (conn );
463
+
464
+ elog (LOG , "Attempt to execute add_node failed, sleeping and retrying" );
465
+ pg_usleep (shardman_cmd_retry_naptime * 1000 );
466
+ }
467
+
468
+ check_for_sigterm ();
469
+
470
+ /* Command canceled via sigusr1 */
471
+ elog (LOG , "Command %ld canceled" , cmd -> id );
472
+ update_cmd_status (cmd -> id , "canceled" );
473
+ got_sigusr1 = false;
474
+ return ;
475
+ }
476
+
477
+ /*
478
+ * Returns true, if node 'id' is in the cluster, false otherwise.
479
+ */
480
+ static bool
481
+ node_in_cluster (int id )
482
+ {
483
+ int e ;
484
+ char * sql = "select id from shardman.nodes;" ;
485
+ bool res = false;
486
+ HeapTuple tuple ;
487
+ TupleDesc rowdesc ;
488
+ uint64 i ;
489
+ bool isnull ;
490
+
491
+ StartTransactionCommand ();
492
+ SPI_connect ();
493
+ PushActiveSnapshot (GetTransactionSnapshot ());
494
+
495
+ e = SPI_exec (sql , 0 );
496
+ if (e < 0 )
497
+ elog (FATAL , "Stmt failed: %s" , sql );
498
+
499
+ rowdesc = SPI_tuptable -> tupdesc ;
500
+ for (i = 0 ; i < SPI_processed ; i ++ )
501
+ {
502
+ tuple = SPI_tuptable -> vals [i ];
503
+ if (id == DatumGetInt32 (SPI_getbinval (tuple , rowdesc ,
504
+ SPI_fnumber (rowdesc , "id" ),
505
+ & isnull )))
506
+ res = true;
507
+ }
508
+
509
+ PopActiveSnapshot ();
510
+ SPI_finish ();
511
+ CommitTransactionCommand ();
512
+
513
+ return res ;
514
+ }
0 commit comments