88
88
*/
89
89
#include "postgres.h"
90
90
#include "libpq-fe.h"
91
+ #include "access/xlog.h"
91
92
#include "access/xlogdefs.h"
93
+ #include "catalog/pg_subscription_rel.h"
92
94
#include "utils/pg_lsn.h"
93
95
#include "utils/builtins.h"
94
96
#include "lib/ilist.h"
@@ -125,12 +127,16 @@ static void exec_create_replica(CreateReplicaState *cps);
125
127
static int mp_rebuild_lr (MovePartState * cps );
126
128
static int cr_rebuild_lr (CreateReplicaState * cps );
127
129
static int cp_start_tablesync (CopyPartState * cpts );
130
+ static int check_sub_sync (const char * subname , PGconn * * conn ,
131
+ XLogRecPtr ref_lsn , const char * log_pref );
128
132
static int cp_start_finalsync (CopyPartState * cpts );
129
133
static int cp_finalize (CopyPartState * cpts );
130
134
static int ensure_pqconn_cp (CopyPartState * cpts , int nodes );
131
135
static int ensure_pqconn (PGconn * * conn , const char * connstr ,
132
136
CopyPartState * cps );
133
137
static void configure_retry (CopyPartState * cpts , int millis );
138
+ static char * received_lsn_sql (const char * subname );
139
+ static XLogRecPtr pg_lsn_in_c (const char * lsn );
134
140
static struct timespec timespec_now_plus_millis (int millis );
135
141
struct timespec timespec_now (void );
136
142
@@ -297,7 +303,7 @@ init_cp_state(CopyPartState *cps)
297
303
Assert (cps -> dst_node != 0 );
298
304
Assert (cps -> part_name != NULL );
299
305
300
- /* Check that table with such name is not already exists on dst node */
306
+ /* Check that table with such name does not already exist on dst node */
301
307
sql = psprintf (
302
308
"select owner from shardman.partitions where part_name = '%s' and owner = %d" ,
303
309
cps -> part_name , cps -> dst_node );
@@ -359,18 +365,11 @@ init_cp_state(CopyPartState *cps)
359
365
cps -> part_name , cps -> relation ,
360
366
cps -> logname ,
361
367
cps -> logname , cps -> src_connstr , cps -> logname , cps -> logname );
362
- cps -> substate_sql = psprintf (
363
- "select srsubstate from pg_subscription_rel srel join pg_subscription"
364
- " s on srel.srsubid = s.oid where subname = '%s';" ,
365
- cps -> logname
366
- );
368
+ cps -> substate_sql = get_substate_sql (cps -> logname );
367
369
cps -> readonly_sql = psprintf (
368
370
"select shardman.readonly_table_on('%s')" , cps -> part_name
369
371
);
370
- cps -> received_lsn_sql = psprintf (
371
- "select received_lsn from pg_stat_subscription where subname = '%s'" ,
372
- cps -> logname
373
- );
372
+ cps -> received_lsn_sql = received_lsn_sql (cps -> logname );
374
373
375
374
cps -> curstep = COPYPART_START_TABLESYNC ;
376
375
cps -> res = TASK_IN_PROGRESS ;
@@ -816,7 +815,7 @@ cr_rebuild_lr(CreateReplicaState *crs)
816
815
* i.e. when received_lsn is equal to remembered lsn on src. This is harder
817
816
* to replace with notify, but we can try that too.
818
817
* - Done. After successfull execution, we are left with two copies of the
819
- * table with src one locked for writes and with LR channel configured
818
+ * table with src locked for writes and with LR channel configured
820
819
* between them. TODO: drop channel here, because we don't reuse it anyway.
821
820
* Currently we drop the channel in metadata update triggers.
822
821
*/
@@ -849,18 +848,42 @@ int
849
848
cp_start_tablesync (CopyPartState * cps )
850
849
{
851
850
PGresult * res ;
851
+ XLogRecPtr lord_lsn = GetXLogWriteRecPtr ();
852
852
853
853
if (ensure_pqconn_cp (cps , ENSURE_PQCONN_SRC | ENSURE_PQCONN_DST ) == -1 )
854
854
return -1 ;
855
855
856
+ /*
857
+ * Make sure that meta sub is up-to-date on src and dst. If not, subtle
858
+ * bugs may arise: imagine we move part from x to y, and then immediately
859
+ * create replica on x from y back again. During repl creation we delete
860
+ * old real partition on x before meta row about part move reaches x. When
861
+ * it finally arrives, we try to replace real partition with fdw one, but
862
+ * the former was dropped.
863
+ *
864
+ * We get current lsn and verify that lsn of src and dst is as big as
865
+ * ours. Obviously, during this check other backends might increase lsn,
866
+ * but we rely on fact that shardlord itself is single-threaded, so
867
+ * external changes are not interesting.
868
+ */
869
+ if (check_sub_sync ("shardman_meta_sub" , & cps -> src_conn , lord_lsn ,
870
+ "meta sub" ) == -1 )
871
+ {
872
+ goto fail ;
873
+ }
874
+ if (check_sub_sync ("shardman_meta_sub" , & cps -> dst_conn , lord_lsn ,
875
+ "meta sub" ) == -1 )
876
+ {
877
+ goto fail ;
878
+ }
879
+
856
880
res = PQexec (cps -> dst_conn , cps -> dst_drop_sub_sql );
857
881
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
858
882
{
859
883
shmn_elog (NOTICE , "Failed to drop sub on dst: %s" ,
860
884
PQerrorMessage (cps -> dst_conn ));
861
885
reset_pqconn_and_res (& cps -> dst_conn , res );
862
- configure_retry (cps , shardman_cmd_retry_naptime );
863
- return -1 ;
886
+ goto fail ;
864
887
}
865
888
PQclear (res );
866
889
shmn_elog (DEBUG1 , "cp %s: sub on dst dropped, if any" , cps -> part_name );
@@ -871,8 +894,7 @@ cp_start_tablesync(CopyPartState *cps)
871
894
shmn_elog (NOTICE , "Failed to create pub and repslot on src: %s" ,
872
895
PQerrorMessage (cps -> src_conn ));
873
896
reset_pqconn_and_res (& cps -> src_conn , res );
874
- configure_retry (cps , shardman_cmd_retry_naptime );
875
- return -1 ;
897
+ goto fail ;
876
898
}
877
899
PQclear (res );
878
900
shmn_elog (DEBUG1 , "cp %s: pub and rs recreated on src" , cps -> part_name );
@@ -883,15 +905,56 @@ cp_start_tablesync(CopyPartState *cps)
883
905
shmn_elog (NOTICE , "Failed to recreate table & sub on dst: %s" ,
884
906
PQerrorMessage (cps -> dst_conn ));
885
907
reset_pqconn_and_res (& cps -> dst_conn , res );
886
- configure_retry (cps , shardman_cmd_retry_naptime );
887
- return -1 ;
908
+ goto fail ;
888
909
}
889
910
PQclear (res );
890
911
shmn_elog (DEBUG1 , "cp %s: table & sub created on dst, tablesync started" ,
891
912
cps -> part_name );
892
913
893
914
cps -> curstep = COPYPART_START_FINALSYNC ;
894
915
return 0 ;
916
+
917
+ fail :
918
+ configure_retry (cps , shardman_cmd_retry_naptime );
919
+ return -1 ;
920
+ }
921
+
922
+ /*
923
+ * Ask node via given PGconn about last received lsn for given sub and compare
924
+ * it to given ref_lsn. If node's lsn lags behind or libpq failed, return -1,
925
+ * otherwise 0. Log messages are prefixed with log_pref. Subscription must
926
+ * exist.
927
+ */
928
+ int
929
+ check_sub_sync (const char * subname , PGconn * * conn , XLogRecPtr ref_lsn ,
930
+ const char * log_pref )
931
+ {
932
+ PGresult * res ;
933
+ char * received_lsn_str ;
934
+ XLogRecPtr received_lsn ;
935
+ char * sql = received_lsn_sql (subname );
936
+
937
+ res = PQexec (* conn , sql );
938
+ pfree (sql );
939
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
940
+ {
941
+ shmn_elog (LOG , "%s: failed to learn sub lsn on src: %s" ,
942
+ log_pref , PQerrorMessage (* conn ));
943
+ reset_pqconn_and_res (conn , res );
944
+ return -1 ;
945
+ }
946
+ Assert (PQntuples (res ) == 1 );
947
+ received_lsn_str = PQgetvalue (res , 0 , 0 );
948
+ shmn_elog (DEBUG1 , "%s: received_lsn is %s" , log_pref , received_lsn_str );
949
+ received_lsn = pg_lsn_in_c (received_lsn_str );
950
+ PQclear (res );
951
+ if (received_lsn < ref_lsn )
952
+ {
953
+ shmn_elog (DEBUG1 , "%s: sub is not yet synced, received_lsn is %lu, "
954
+ " but we wait for %lu" , log_pref , received_lsn , ref_lsn );
955
+ return -1 ;
956
+ }
957
+ return 0 ;
895
958
}
896
959
897
960
/*
905
968
cp_start_finalsync (CopyPartState * cps )
906
969
{
907
970
PGresult * res ;
908
- int ntups ;
909
971
char substate ;
910
972
char * sync_point ;
911
973
@@ -921,20 +983,9 @@ cp_start_finalsync(CopyPartState *cps)
921
983
configure_retry (cps , shardman_cmd_retry_naptime );
922
984
return -1 ;
923
985
}
924
- ntups = PQntuples (res );
925
- if (ntups != 1 )
926
- {
927
- shmn_elog (WARNING , "cp %s: num of subrels != 1" , cps -> part_name );
928
- /*
929
- * Since several or 0 subrels is absolutely wrong situtation, we start
930
- * from the beginning.
931
- */
932
- cps -> curstep = COPYPART_START_TABLESYNC ;
933
- configure_retry (cps , shardman_cmd_retry_naptime );
934
- return -1 ;
935
- }
986
+ Assert (PQntuples (res ) == 1 );
936
987
substate = PQgetvalue (res , 0 , 0 )[0 ];
937
- if (substate != 'r' )
988
+ if (substate != SUBREL_STATE_READY )
938
989
{
939
990
shmn_elog (DEBUG1 , "cp %s: init sync is not yet finished, its state"
940
991
" is %c" , cps -> part_name , substate );
@@ -976,41 +1027,18 @@ cp_start_finalsync(CopyPartState *cps)
976
1027
}
977
1028
978
1029
/*
979
- * Wait until final sync is done and update metadata . Returns -1 if anything
980
- * goes wrong and 0 otherwise.
1030
+ * Check that final sync is done and update curstep . Returns -1 if anything
1031
+ * goes wrong or sync is not finished and 0 otherwise.
981
1032
*/
982
1033
int
983
1034
cp_finalize (CopyPartState * cps )
984
1035
{
985
-
986
- PGresult * res ;
987
- XLogRecPtr received_lsn ;
988
- char * received_lsn_str ;
989
-
990
1036
if (ensure_pqconn_cp (cps , ENSURE_PQCONN_DST ) == -1 )
991
1037
return -1 ;
992
1038
993
- res = PQexec ( cps -> dst_conn , cps -> received_lsn_sql );
994
- if ( PQresultStatus ( res ) != PGRES_TUPLES_OK )
1039
+ if ( check_sub_sync ( cps -> logname , & cps -> dst_conn , cps -> sync_point ,
1040
+ cps -> part_name ) == -1 )
995
1041
{
996
- shmn_elog (NOTICE , "Failed to learn received_lsn on dst: %s" ,
997
- PQerrorMessage (cps -> dst_conn ));
998
- reset_pqconn_and_res (& cps -> dst_conn , res );
999
- configure_retry (cps , shardman_cmd_retry_naptime );
1000
- return -1 ;
1001
- }
1002
- received_lsn_str = PQgetvalue (res , 0 , 0 );
1003
- shmn_elog (DEBUG1 , "cp %s: received_lsn is %s" , cps -> part_name ,
1004
- received_lsn_str );
1005
- received_lsn = DatumGetLSN (DirectFunctionCall1Coll (
1006
- pg_lsn_in , InvalidOid ,
1007
- CStringGetDatum (received_lsn_str )));
1008
- PQclear (res );
1009
- if (received_lsn < cps -> sync_point )
1010
- {
1011
- shmn_elog (DEBUG1 , "cp %s: final sync is not yet finished,"
1012
- "received_lsn is %lu, but we wait for %lu" ,
1013
- cps -> part_name , received_lsn , cps -> sync_point );
1014
1042
configure_retry (cps , shardman_poll_interval );
1015
1043
return -1 ;
1016
1044
}
@@ -1080,6 +1108,26 @@ void configure_retry(CopyPartState *cps, int millis)
1080
1108
cps -> exec_res = TASK_WAKEMEUP ;
1081
1109
}
1082
1110
1111
+ /*
1112
+ * SQL to get last received lsn for given subscription
1113
+ */
1114
+ char *
1115
+ received_lsn_sql (const char * subname )
1116
+ {
1117
+ return psprintf ("select received_lsn from pg_stat_subscription"
1118
+ " where subname = '%s';" , subname );
1119
+ }
1120
+
1121
+ /*
1122
+ * Convert C string lsn in standard form to binary format.
1123
+ */
1124
+ XLogRecPtr
1125
+ pg_lsn_in_c (const char * lsn )
1126
+ {
1127
+ return DatumGetLSN (DirectFunctionCall1Coll (pg_lsn_in , InvalidOid ,
1128
+ CStringGetDatum (lsn )));
1129
+ }
1130
+
1083
1131
/*
1084
1132
* Get current CLOCK_MONOTONIC time. Fails with PG elog(FATAL) if gettime
1085
1133
* failed.
0 commit comments