@@ -473,6 +473,7 @@ cmd_canceled(Cmd *cmd)
473
473
* - recreating repslot
474
474
* - recreating subscription
475
475
* - setting node id on the node itself
476
+ * - waiting for initial tablesync
476
477
* - marking node as active and cmd as success
477
478
* We do all this stuff to make all actions idempotent, so that we are able to retry
478
479
* them in case of any failure.
@@ -487,6 +488,7 @@ add_node(Cmd *cmd)
487
488
bool pg_shardman_installed ;
488
489
int32 node_id ;
489
490
char * sql ;
491
+ bool tablesync_done = false;
490
492
491
493
shmn_elog (INFO , "Adding node %s" , connstr );
492
494
/* Try to execute command indefinitely until it succeeded or canceled */
@@ -577,15 +579,63 @@ add_node(Cmd *cmd)
577
579
"select shardman.set_node_id(%d);" ,
578
580
shardman_shardlord_connstring , node_id , node_id );
579
581
res = PQexec (conn , sql );
582
+ pfree (sql );
580
583
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
581
584
{
582
585
shmn_elog (NOTICE , "Failed to create subscription and set node id, %s" ,
583
586
PQerrorMessage (conn ));
584
587
goto attempt_failed ;
585
588
}
586
-
587
589
PQclear (res );
588
- PQfinish (conn );
590
+
591
+ /*
592
+ * Wait until initial tablesync is completed. This is necessary as
593
+ * e.g. we might miss UPDATE statements on partitions table, triggers
594
+ * on newly added node won't fire and metadata would be inconsistent.
595
+ */
596
+ sql =
597
+ "select srrelid, srsubstate from pg_subscription_rel srel join"
598
+ " pg_subscription s on srel.srsubid = s.oid where"
599
+ " subname = 'shardman_meta_sub';" ;
600
+ while (!tablesync_done )
601
+ {
602
+ int i ;
603
+
604
+ res = PQexec (conn , sql );
605
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
606
+ {
607
+ shmn_elog (NOTICE , "Adding node %s: failed to learn sub status, %s " ,
608
+ connstr , PQerrorMessage (conn ));
609
+ goto attempt_failed ;
610
+ }
611
+
612
+ tablesync_done = true;
613
+ for (i = 0 ; i < PQntuples (res ); i ++ )
614
+ {
615
+ char * subrelid = PQgetvalue (res , i , 0 );
616
+ char subrelstate = PQgetvalue (res , i , 1 )[0 ];
617
+ if (subrelstate != 'r' )
618
+ {
619
+ tablesync_done = false;
620
+ shmn_elog (DEBUG1 ,
621
+ "adding node %s: init sync is not yet finished"
622
+ " for rel %s, its state is %c" ,
623
+ connstr , subrelid , subrelstate );
624
+ pg_usleep (shardman_poll_interval * 1000L );
625
+ SHMN_CHECK_FOR_INTERRUPTS ();
626
+ if (got_sigusr1 )
627
+ {
628
+ reset_pqconn_and_res (& conn , res );
629
+ cmd_canceled (cmd );
630
+ return ;
631
+ }
632
+ break ;
633
+ }
634
+ }
635
+ PQclear (res );
636
+ }
637
+
638
+ reset_pqconn (& conn );
589
639
590
640
/*
591
641
* Mark add_node cmd as success and node as active, we must do that in
@@ -604,11 +654,7 @@ add_node(Cmd *cmd)
604
654
return ;
605
655
606
656
attempt_failed : /* clean resources, sleep, check sigusr1 and try again */
607
- if (res != NULL )
608
- PQclear (res );
609
- if (conn != NULL )
610
- PQfinish (conn );
611
-
657
+ reset_pqconn_and_res (& conn , res );
612
658
shmn_elog (LOG , "Attempt to execute add_node failed, sleeping and retrying" );
613
659
/* TODO: sleep using waitlatch? */
614
660
pg_usleep (shardman_cmd_retry_naptime * 1000L );
@@ -711,7 +757,21 @@ rm_node(Cmd *cmd)
711
757
elog (INFO , "Node %d successfully removed" , node_id );
712
758
}
713
759
714
-
760
+ /*
761
+ * Finish pq connection and set ptr to NULL. You must be sure that the
762
+ * connection exists!
763
+ */
764
+ void
765
+ reset_pqconn (PGconn * * conn ) { PQfinish (* conn ); * conn = NULL ; }
766
+ /*
767
+ * Same, but also clear res. You must be sure that both connection and res
768
+ * exist.
769
+ */
770
+ void
771
+ reset_pqconn_and_res (PGconn * * conn , PGresult * res )
772
+ {
773
+ PQclear (res ); reset_pqconn (conn );
774
+ }
715
775
716
776
/*
717
777
* Get connstr of worker node with id node_id. Memory is palloc'ed.
0 commit comments