7
7
*/
8
8
#include "postgres.h"
9
9
#include "libpq-fe.h"
10
+ #include "access/xlogdefs.h"
11
+ #include "utils/pg_lsn.h"
12
+ #include "utils/builtins.h"
10
13
#include "lib/ilist.h"
11
14
12
15
#include <unistd.h>
@@ -40,11 +43,15 @@ typedef enum
40
43
EXECMOVEMPART_DONE /* the work is done, never invoke me again */
41
44
} ExecMoveMPartRes ;
42
45
43
- /* Current step of 1 master partition move */
46
+ /*
47
+ * Current step of 1 master partition move. See comments to corresponding
48
+ * funcs, e.g. start_tablesync.
49
+ */
44
50
typedef enum
45
51
{
46
52
MOVEMPARTSTEP_START_TABLESYNC , /* create table & sub on dst, kick off initial sync */
47
- MOVEMPARTSTEP_WAIT_TABLESYNC
53
+ MOVEMPARTSTEP_START_FINALSYNC , /* once initial sync is done, make src read-only and remember its lsn */
54
+ MOVEMPARTSTEP_FINALIZE /* wait until dst received the remembered lsn, then update metadata */
48
55
} MoveMPartStep ;
49
56
50
57
typedef struct
@@ -72,7 +79,12 @@ typedef struct
72
79
char * src_create_pub_and_rs_sql ; /* create publ and repslot on src */
73
80
char * relation ; /* name of sharded relation */
74
81
char * dst_create_tab_and_sub_sql ; /* create table and sub on dst */
82
+ char * substate_sql ; /* get current state of subscription */
83
+ char * readonly_sql ; /* make src table read-only */
84
+ char * received_lsn_sql ; /* get last received lsn on dst */
85
+ char * update_metadata_sql ;
75
86
87
+ XLogRecPtr sync_point ; /* when dst reached this point, it is synced */
76
88
MoveMPartStep curstep ; /* current step */
77
89
ExecMoveMPartRes exec_res ; /* result of the last iteration */
78
90
MoveMPartRes res ; /* result of the whole move */
@@ -92,6 +104,8 @@ static int calc_timeout(slist_head *timeout_states);
92
104
static void epoll_subscribe (int epfd , MoveMPartState * mmps );
93
105
static void exec_move_mpart (MoveMPartState * mmps );
94
106
static int start_tablesync (MoveMPartState * mmpts );
107
+ static int start_finalsync (MoveMPartState * mmpts );
108
+ static int finalize (MoveMPartState * mmpts );
95
109
static int ensure_pqconn (MoveMPartState * mmpts , int nodes );
96
110
static int ensure_pqconn_intern (PGconn * * conn , const char * connstr ,
97
111
MoveMPartState * mmps );
@@ -229,7 +243,7 @@ create_hash_partitions(Cmd *cmd)
229
243
* - Sleep & check in connection to the dest waiting for completion of the
230
244
* initial sync. Later this should be substituted with listen/notify.
231
245
* - When done, lock writes (better lock reads too) on source and remember
232
- * current wal lsn on it.
246
+ * pg_current_wal_lsn() on it.
233
247
* - Now final sync has started, remember that at least in ram.
234
248
* - Sleep & check in connection to dest waiting for completion of final sync,
235
249
* i.e. when received_lsn is equal to remembered lsn on src.
@@ -346,6 +360,21 @@ init_mmp_state(MoveMPartState *mmps, const char *part_name, int32 dst_node)
346
360
mmps -> part_name , mmps -> relation ,
347
361
mmps -> logname ,
348
362
mmps -> logname , mmps -> src_connstr , mmps -> logname , mmps -> logname );
363
+ mmps -> substate_sql = psprintf (
364
+ "select srsubstate from pg_subscription_rel srel join pg_subscription"
365
+ " s on srel.srsubid = s.oid where subname = '%s';" ,
366
+ mmps -> logname
367
+ );
368
+ mmps -> readonly_sql = psprintf (
369
+ "select shardman.readonly_table_on('%s')" , mmps -> part_name
370
+ );
371
+ mmps -> received_lsn_sql = psprintf (
372
+ "select received_lsn from pg_stat_subscription where subname = '%s'" ,
373
+ mmps -> logname
374
+ );
375
+ mmps -> update_metadata_sql = psprintf (
376
+ "update shardman.partitions set owner = %d where part_name = '%s';" ,
377
+ mmps -> dst_node , mmps -> part_name );
349
378
350
379
mmps -> curstep = MOVEMPARTSTEP_START_TABLESYNC ;
351
380
mmps -> res = MOVEMPART_IN_PROGRESS ;
@@ -405,7 +434,6 @@ move_mparts(MoveMPartState *mmpss, int nparts)
405
434
if ((epfd = epoll_create1 (0 )) == -1 )
406
435
shmn_elog (FATAL , "epoll_create1 failed" );
407
436
408
- /* TODO: check for signals */
409
437
while (unfinished_moves > 0 && !got_sigusr1 && !got_sigterm )
410
438
{
411
439
timeout = calc_timeout (& timeout_states );
@@ -567,13 +595,13 @@ exec_move_mpart(MoveMPartState *mmps)
567
595
{
568
596
if (start_tablesync (mmps ) == -1 )
569
597
return ;
570
- else
571
- mmps -> curstep = MOVEMPARTSTEP_WAIT_TABLESYNC ;
572
598
}
573
-
574
- shmn_elog (DEBUG1 , "Partition %s is moved" , mmps -> part_name );
575
- mmps -> res = MOVEMPART_SUCCESS ;
576
- mmps -> exec_res = EXECMOVEMPART_DONE ;
599
+ if (mmps -> curstep == MOVEMPARTSTEP_START_FINALSYNC )
600
+ {
601
+ if (start_finalsync (mmps ) == -1 )
602
+ return ;
603
+ }
604
+ finalize (mmps );
577
605
}
578
606
579
607
/*
@@ -625,6 +653,136 @@ start_tablesync(MoveMPartState *mmps)
625
653
shmn_elog (DEBUG1 , "mmp %s: table & sub created on dst, tablesync started" ,
626
654
mmps -> part_name );
627
655
656
+ mmps -> curstep = MOVEMPARTSTEP_START_FINALSYNC ;
657
+ return 0 ;
658
+ }
659
+
660
+ /*
661
+ * - wait until initial sync is done;
662
+ * - make src read only and save its pg_current_wal() in mmps;
663
+ * - now we are ready to wait for final sync
664
+ * Returns -1 if anything goes wrong and 0 otherwise. current wal is saved
665
+ * in mmps.
666
+ */
667
+ int
668
+ start_finalsync (MoveMPartState * mmps )
669
+ {
670
+ PGresult * res ;
671
+ int ntups ;
672
+ char substate ;
673
+ char * sync_point ;
674
+
675
+ if (ensure_pqconn (mmps , ENSURE_PQCONN_SRC | ENSURE_PQCONN_DST ) == -1 )
676
+ return -1 ;
677
+
678
+ res = PQexec (mmps -> dst_conn , mmps -> substate_sql );
679
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
680
+ {
681
+ shmn_elog (NOTICE , "Failed to learn sub status on dst: %s" ,
682
+ PQerrorMessage (mmps -> dst_conn ));
683
+ reset_pqconn_and_res (& mmps -> dst_conn , res );
684
+ configure_retry (mmps , shardman_cmd_retry_naptime );
685
+ return -1 ;
686
+ }
687
+ ntups = PQntuples (res );
688
+ if (ntups != 1 )
689
+ {
690
+ shmn_elog (WARNING , "mmp %s: num of subrels != 1" , mmps -> part_name );
691
+ /*
692
+ * Since several or 0 subrels is absolutely wrong situtation, we start
693
+ * from the beginning.
694
+ */
695
+ mmps -> curstep = MOVEMPARTSTEP_START_TABLESYNC ;
696
+ configure_retry (mmps , shardman_cmd_retry_naptime );
697
+ return -1 ;
698
+ }
699
+ substate = PQgetvalue (res , 0 , 0 )[0 ];
700
+ if (substate != 'r' )
701
+ {
702
+ shmn_elog (DEBUG1 , "mmp %s: init sync is not yet finished, its state"
703
+ " is %c" , mmps -> part_name , substate );
704
+ configure_retry (mmps , shardman_poll_interval );
705
+ return -1 ;
706
+ }
707
+ shmn_elog (DEBUG1 , "mmp %s: init sync finished" , mmps -> part_name );
708
+ PQclear (res );
709
+
710
+ res = PQexec (mmps -> src_conn , mmps -> readonly_sql );
711
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
712
+ {
713
+ shmn_elog (NOTICE , "Failed to make src table read only: %s" ,
714
+ PQerrorMessage (mmps -> src_conn ));
715
+ reset_pqconn_and_res (& mmps -> src_conn , res );
716
+ configure_retry (mmps , shardman_cmd_retry_naptime );
717
+ return -1 ;
718
+ }
719
+ shmn_elog (DEBUG1 , "mmp %s: src made read only" , mmps -> part_name );
720
+ PQclear (res );
721
+
722
+ res = PQexec (mmps -> src_conn , "select pg_current_wal_lsn();" );
723
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
724
+ {
725
+ shmn_elog (NOTICE , "Failed to get current lsn on src: %s" ,
726
+ PQerrorMessage (mmps -> src_conn ));
727
+ reset_pqconn_and_res (& mmps -> src_conn , res );
728
+ configure_retry (mmps , shardman_cmd_retry_naptime );
729
+ return -1 ;
730
+ }
731
+ sync_point = PQgetvalue (res , 0 , 0 );
732
+ mmps -> sync_point = DatumGetLSN (DirectFunctionCall1Coll (pg_lsn_in , InvalidOid ,
733
+ CStringGetDatum (sync_point )));
734
+ shmn_elog (DEBUG1 , "mmp %s: sync lsn is %s" , mmps -> part_name , sync_point );
735
+ PQclear (res );
736
+
737
+ mmps -> curstep = MOVEMPARTSTEP_FINALIZE ;
738
+ return 0 ;
739
+ }
740
+
741
+ /*
742
+ * Wait until final sync is done and update metadata. Returns -1 if anything
743
+ * goes wrong and 0 otherwise.
744
+ */
745
+ int
746
+ finalize (MoveMPartState * mmps )
747
+ {
748
+
749
+ PGresult * res ;
750
+ XLogRecPtr received_lsn ;
751
+ char * received_lsn_str ;
752
+
753
+ if (ensure_pqconn (mmps , ENSURE_PQCONN_DST ) == -1 )
754
+ return -1 ;
755
+
756
+ res = PQexec (mmps -> dst_conn , mmps -> received_lsn_sql );
757
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
758
+ {
759
+ shmn_elog (NOTICE , "Failed to learn received_lsn on dst: %s" ,
760
+ PQerrorMessage (mmps -> dst_conn ));
761
+ reset_pqconn_and_res (& mmps -> dst_conn , res );
762
+ configure_retry (mmps , shardman_cmd_retry_naptime );
763
+ return -1 ;
764
+ }
765
+ received_lsn_str = PQgetvalue (res , 0 , 0 );
766
+ shmn_elog (DEBUG1 , "mmp %s: received_lsn is %s" , mmps -> part_name ,
767
+ received_lsn_str );
768
+ received_lsn = DatumGetLSN (DirectFunctionCall1Coll (
769
+ pg_lsn_in , InvalidOid ,
770
+ CStringGetDatum (received_lsn_str )));
771
+ PQclear (res );
772
+ if (received_lsn < mmps -> sync_point )
773
+ {
774
+ shmn_elog (DEBUG1 , "mmp %s: final sync is not yet finished,"
775
+ "received_lsn is %lu, but we wait for %lu" ,
776
+ mmps -> part_name , received_lsn , mmps -> sync_point );
777
+ configure_retry (mmps , shardman_poll_interval );
778
+ return -1 ;
779
+ }
780
+
781
+ void_spi (mmps -> update_metadata_sql );
782
+
783
+ shmn_elog (DEBUG1 , "Partition %s successfully moved" , mmps -> part_name );
784
+ mmps -> res = MOVEMPART_SUCCESS ;
785
+ mmps -> exec_res = EXECMOVEMPART_DONE ;
628
786
return 0 ;
629
787
}
630
788
0 commit comments