@@ -49,10 +49,13 @@ static void shardmaster_sigterm(SIGNAL_ARGS);
 static void shardmaster_sigusr1(SIGNAL_ARGS);
 static void check_for_sigterm(void);
 static void pg_shardman_installed_local(void);
+
 static void add_node(Cmd *cmd);
 static int insert_node(const char *connstring, int64 cmd_id);
 static bool node_in_cluster(int id);
-static void activate_node(int64 cmd_id, int node_id);
+
+static void rm_node(Cmd *cmd);
+static bool is_node_active(int node_id);
 
 /* flags set by signal handlers */
 static volatile sig_atomic_t got_sigterm = false;
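Note that this hunk declares is_node_active but no definition appears anywhere in the diff below; presumably it is (or was meant to be) defined elsewhere in the commit. If it followed the node_in_cluster rewrite later in this diff, it would plausibly look like the following sketch (the body is an assumption, only the prototype comes from the commit):

	/* Sketch (assumption): true iff the node exists and its status is 'active'. */
	static bool
	is_node_active(int node_id)
	{
		char *sql = psprintf(
			"select id from shardman.nodes where id = %d and status = 'active';",
			node_id);
		bool res;

		SPI_PROLOG;
		if (SPI_execute(sql, true, 0) < 0)
			shmn_elog(FATAL, "Stmt failed: %s", sql);
		pfree(sql);
		res = SPI_processed == 1;
		SPI_EPILOG;
		return res;
	}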
@@ -135,8 +138,8 @@ _PG_init()
 	shardmaster_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
 	shardmaster_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
-	/* shardmaster_worker.bgw_restart_time = 10; */
-	shardmaster_worker.bgw_restart_time = BGW_NEVER_RESTART;
+	shardmaster_worker.bgw_restart_time = 10;
+	/* shardmaster_worker.bgw_restart_time = BGW_NEVER_RESTART; */
 	sprintf(shardmaster_worker.bgw_library_name, "pg_shardman");
 	sprintf(shardmaster_worker.bgw_function_name, "shardmaster_main");
 	shardmaster_worker.bgw_notify_pid = 0;
@@ -181,6 +184,8 @@ shardmaster_main(Datum main_arg)
 			shmn_elog(LOG, "%s", *opts);
 		if (strcmp(cmd->cmd_type, "add_node") == 0)
 			add_node(cmd);
+		else if (strcmp(cmd->cmd_type, "rm_node") == 0)
+			rm_node(cmd);
 		else
 			shmn_elog(FATAL, "Unknown cmd type %s", cmd->cmd_type);
 	}
@@ -371,7 +376,7 @@ pg_shardman_installed_local(void)
 	{
 		installed = false;
 		shmn_elog(WARNING, "pg_shardman library is preloaded, but extenstion"
-				  "is not created");
+				  " is not created");
 	}
 	PopActiveSnapshot();
 	CommitTransactionCommand();
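Side note on the one-character fix above: adjacent C string literals are concatenated by the compiler with nothing in between, so the old code logged "...extenstionis not created". A minimal illustration:

	/* Adjacent string literals concatenate with no separator: */
	const char *msg = "pg_shardman library is preloaded, but extenstion"
					  "is not created";	/* "...extenstionis not created" */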
@@ -419,14 +424,18 @@ check_for_sigterm(void)
 
 /*
  * Adding node consists of
- * - verifying that the node is not present in the cluster at the moment
- * - extension recreation
- * - repl slot recreation
- * - subscription creation
- * - setting node id
- * - adding node to 'nodes' table
+ * - verifying the node is not 'active' in the cluster, i.e. in the 'nodes' table
+ * - adding the node to 'nodes' as not active, getting its new id
+ * - reinstalling the extension
+ * - recreating the repslot
+ * - recreating the subscription
+ * - setting the node id on the node itself
+ * - marking the node as active and the cmd as success
+ * We do all this to make every action idempotent, so that we can retry
+ * them in case of any failure.
 */
-static void add_node(Cmd *cmd)
+void
+add_node(Cmd *cmd)
 {
 	PGconn *conn = NULL;
 	const char *connstring = cmd->opts[0];
@@ -473,7 +482,7 @@ static void add_node(Cmd *cmd)
 
 	if (!PQgetisnull(res, 0, 0))
 	{
-		/* Node is in cluster. Is it active in our cluster? */
+		/* Node is in cluster. Was it there before we started adding? */
 		node_id = atoi(PQgetvalue(res, 0, 0));
 		PQclear(res);
 		if (node_in_cluster(node_id))
@@ -529,12 +538,24 @@ static void add_node(Cmd *cmd)
 					  PQerrorMessage(conn));
 		goto attempt_failed;
 	}
-	pg_shardman_installed = PQntuples(res) == 1 && !PQgetisnull(res, 0, 0);
+
 	PQclear(res);
+	PQfinish(conn);
+
+	/*
+	 * Mark add_node cmd as success and node as active; we must do that in
+	 * one txn.
+	 */
+	sql = psprintf(
+		"update shardman.nodes set status = 'active' where id = %d;"
+		"update shardman.cmd_log set status = 'success' where id = %ld;",
+		node_id, cmd->id);
+	void_spi(sql);
+	pfree(sql);
 
 	/* done */
-	PQfinish(conn);
-	activate_node(cmd->id, node_id);
+	elog(INFO, "Node %s successfully added, it is assigned id %d",
+		 connstring, node_id);
 	return;
 
 attempt_failed:	/* clean resources, sleep, check sigusr1 and try again */
@@ -544,6 +565,7 @@ static void add_node(Cmd *cmd)
 	PQfinish(conn);
 
 	shmn_elog(LOG, "Attempt to execute add_node failed, sleeping and retrying");
+	/* TODO: sleep using waitlatch? */
 	pg_usleep(shardman_cmd_retry_naptime * 1000L);
 }
 
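One way to resolve the TODO just added: replace pg_usleep with an interruptible latch wait so the worker wakes promptly on signals instead of sleeping blind. A sketch, assuming PostgreSQL 10's four-argument WaitLatch and that the signal handlers set MyLatch; shardman_cmd_retry_naptime is already in milliseconds, matching the `* 1000L` microsecond conversion above:

	/* Sleep interruptibly: a set latch or postmaster death ends the wait early. */
	int rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   shardman_cmd_retry_naptime, PG_WAIT_EXTENSION);
	if (rc & WL_POSTMASTER_DEATH)
		proc_exit(1);
	ResetLatch(MyLatch);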
@@ -581,53 +603,72 @@ insert_node(const char *connstring, int64 cmd_id)
581
603
}
582
604
583
605
/*
584
- * Returns true, if node 'id' is active in our cluster, false otherwise.
606
+ * Returns true, if node 'id' is in cluster and not in add_in_progress state
585
607
*/
586
608
static bool
587
609
node_in_cluster (int id )
588
610
{
589
- int e ;
590
- const char * sql = "select id from shardman.nodes where active;" ;
591
- bool res = false;
592
- HeapTuple tuple ;
593
- TupleDesc rowdesc ;
594
- uint64 i ;
595
- bool isnull ;
611
+ char * sql = psprintf (
612
+ "select id from shardman.nodes where id = %d and status != 'add_in_progress';" ,
613
+ id );
614
+ bool res ;
596
615
597
616
SPI_PROLOG ;
598
- e = SPI_execute (sql , true, 0 );
599
- if (e < 0 )
617
+ if (SPI_execute (sql , true, 0 ) < 0 )
600
618
shmn_elog (FATAL , "Stmt failed: %s" , sql );
619
+ pfree (sql );
620
+ res = SPI_processed == 1 ;
601
621
602
- rowdesc = SPI_tuptable -> tupdesc ;
603
- for (i = 0 ; i < SPI_processed ; i ++ )
604
- {
605
- tuple = SPI_tuptable -> vals [i ];
606
- if (id == DatumGetInt32 (SPI_getbinval (tuple , rowdesc ,
607
- SPI_fnumber (rowdesc , "id" ),
608
- & isnull )))
609
- res = true;
610
- }
611
622
SPI_EPILOG ;
612
-
613
623
return res ;
614
624
}
615
625
616
626
/*
617
- * Mark add_node cmd as success and node as active, we must do that in one txn
627
+ * Remove node, losing all data on it. We
628
+ * - ensure that there is active node with given id in the cluster
629
+ * - mark node as rm_in_progress and commit so this reaches node via LR
630
+ * - wait a bit to let it unsubscribe
631
+ * - drop replication slot, remove node row and mark cmd as success
632
+ * Everything is idempotent. Note that we are not allowed to remove repl slot
633
+ * when the walsender connection is alive, that's why we sleep here.
618
634
*/
619
- void activate_node (int64 cmd_id , int node_id )
635
+ void
636
+ rm_node (Cmd * cmd )
620
637
{
621
- int e ;
622
- char * sql = psprintf (
623
- "update shardman.nodes set active = true where id = %d;"
624
- "update shardman.cmd_log set status = 'success' where id = %ld;" ,
625
- node_id , cmd_id );
638
+ int node_id = atoi (cmd -> opts [0 ]);
639
+ char * sql ;
626
640
627
- SPI_PROLOG ;
628
- e = SPI_exec (sql , 0 );
641
+ if (!node_in_cluster (node_id ))
642
+ {
643
+ shmn_elog (WARNING , "node %d not in cluster, won't rm it." , node_id );
644
+ update_cmd_status (cmd -> id , "failed" );
645
+ return ;
646
+ }
647
+
648
+ sql = psprintf (
649
+ "update shardman.nodes set status = 'rm_in_progress' where id = %d;" ,
650
+ node_id );
651
+ void_spi (sql );
629
652
pfree (sql );
630
- if (e < 0 )
631
- shmn_elog (FATAL , "Stmt failed: %s" , sql );
632
- SPI_EPILOG ;
653
+
654
+ /* Let node drop the subscription */
655
+ pg_usleep (2 * 1000000L );
656
+
657
+ /*
658
+ * It is extremely unlikely that node still keeps walsender process
659
+ * connected but ignored our node status update, so this should succeed.
660
+ * If not, bgw exits, but postmaster will restart us to try again.
661
+ * TODO: at this stage, user can't cancel command at all, this should be
662
+ * fixed.
663
+ */
664
+ sql = psprintf (
665
+ "select shardman.drop_repslot('shardman_meta_sub_%d');"
666
+ /* keep silent cmd_log fk constraint */
667
+ "update shardman.cmd_log set node_id = null where node_id = %d;"
668
+ "delete from shardman.nodes where id = %d;"
669
+ "update shardman.cmd_log set status = 'success' where id = %ld;" ,
670
+ node_id , node_id , node_id , cmd -> id );
671
+ void_spi (sql );
672
+ pfree (sql );
673
+ elog (INFO , "Node %d successfully removed" , node_id );
633
674
}
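void_spi and update_cmd_status are called above but defined elsewhere in the file. For orientation, a sketch of what void_spi plausibly does, inferred from the removed activate_node body (the exact definition is an assumption, not part of this diff; note the caller pfrees the statement afterwards, matching the usage above):

	/* Sketch (assumption): run a write statement via SPI in its own txn,
	 * dying on failure so the postmaster restarts the worker. */
	static void
	void_spi(char *sql)
	{
		SPI_PROLOG;
		if (SPI_exec(sql, 0) < 0)
			shmn_elog(FATAL, "Stmt failed: %s", sql);
		SPI_EPILOG;
	}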