@@ -140,6 +140,12 @@ static XLogRecPtr pg_lsn_in_c(const char *lsn);
140
140
static struct timespec timespec_now_plus_millis (int millis );
141
141
struct timespec timespec_now (void );
142
142
143
+ static char *
144
+ get_data_lname (char const * part_name , int pub_node , int sub_node )
145
+ {
146
+ return psprintf ("shardman_data_%s_%d_%d" , part_name , pub_node , sub_node );
147
+ }
148
+
143
149
/*
144
150
* Fill MovePartState for moving partition. If src_node is
145
151
* SHMN_INVALID_NODE_ID, assume primary partition must be moved. If something
@@ -223,12 +229,14 @@ init_mp_state(MovePartState *mps, const char *part_name, int32 src_node,
223
229
if (mps -> prev_node != SHMN_INVALID_NODE_ID )
224
230
{
225
231
mps -> prev_sql = psprintf (
226
- "select shardman.part_moved_prev('%s', %d, %d);" ,
227
- part_name , mps -> cp .src_node , mps -> cp .dst_node );
232
+ "select shardman.part_moved_prev('%s', %d, %d); SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
233
+ part_name , mps -> cp .src_node , mps -> cp .dst_node ,
234
+ get_data_lname (part_name , shardman_my_id , mps -> cp .dst_node ));
228
235
}
229
236
mps -> dst_sql = psprintf (
230
- "select shardman.part_moved_dst('%s', %d, %d);" ,
231
- part_name , mps -> cp .src_node , mps -> cp .dst_node );
237
+ "select shardman.part_moved_dst('%s', %d, %d); SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
238
+ part_name , mps -> cp .src_node , mps -> cp .dst_node ,
239
+ get_data_lname (part_name , mps -> cp .dst_node , get_next_node (part_name , mps -> cp .src_node )));
232
240
if (mps -> next_node != SHMN_INVALID_NODE_ID )
233
241
{
234
242
mps -> next_sql = psprintf (
@@ -280,8 +288,9 @@ init_cr_state(CreateReplicaState *crs, const char *part_name, int32 dst_node)
280
288
* settings are not getting reloaded, but not sure why.
281
289
*/
282
290
crs -> create_data_pub_sql = psprintf (
283
- "select shardman.replica_created_create_data_pub('%s', %d, %d);" ,
284
- part_name , crs -> cp .src_node , crs -> cp .dst_node );
291
+ "select shardman.replica_created_create_data_pub('%s', %d, %d); SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
292
+ part_name , crs -> cp .src_node , crs -> cp .dst_node ,
293
+ get_data_lname (part_name , crs -> cp .src_node , crs -> cp .dst_node ));
285
294
crs -> create_data_sub_sql = psprintf (
286
295
"select shardman.replica_created_create_data_sub('%s', %d, %d);" ,
287
296
part_name , crs -> cp .src_node , crs -> cp .dst_node );
@@ -338,10 +347,11 @@ init_cp_state(CopyPartState *cps)
338
347
* transaction that performed writes
339
348
*/
340
349
cps -> src_create_pub_and_rs_sql = psprintf (
341
- "begin; drop publication if exists %s cascade;"
342
- " create publication %s for table %s; end;"
343
- " select shardman.create_repslot('%s');" ,
344
- cps -> logname , cps -> logname , cps -> part_name , cps -> logname
350
+ "drop publication if exists %s cascade;"
351
+ "create publication %s for table %s;"
352
+ "select shardman.drop_repslot('%s');"
353
+ "SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
354
+ cps -> logname , cps -> logname , cps -> part_name , cps -> logname , cps -> logname
345
355
);
346
356
cps -> relation = get_partition_relation (cps -> part_name );
347
357
Assert (cps -> relation != NULL );
@@ -638,6 +648,29 @@ exec_move_part(MovePartState *mps)
638
648
mps -> cp .exec_res = TASK_DONE ;
639
649
}
640
650
651
+ static bool remote_exec (PGconn * * conn , CopyPartState * cps , char * stmts )
652
+ {
653
+ char * sql = stmts , * sep ;
654
+ PGresult * res ;
655
+ while ((sep = strchr (sql , ';' )) != NULL ) {
656
+ * sep = '\0' ;
657
+ res = PQexec (* conn , sql );
658
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK && PQresultStatus (res ) != PGRES_COMMAND_OK )
659
+ {
660
+ shmn_elog (LOG , "REMOTE_EXEC: execution of query '%s' failed for paritions %s: %s" ,
661
+ sql , cps -> part_name , PQerrorMessage (* conn ));
662
+ * sep = ';' ;
663
+ reset_pqconn_and_res (conn , res );
664
+ configure_retry (cps , shardman_cmd_retry_naptime );
665
+ return false;
666
+ }
667
+ PQclear (res );
668
+ * sep = ';' ;
669
+ sql = sep + 1 ;
670
+ }
671
+ return true;
672
+ }
673
+
641
674
/*
642
675
* Reconfigure LR channel for moved primary: prev to moved, moved to next or
643
676
* both, if they exist.
@@ -648,56 +681,30 @@ exec_move_part(MovePartState *mps)
648
681
int
649
682
mp_rebuild_lr (MovePartState * mps )
650
683
{
651
- PGresult * res ;
652
-
653
684
if (mps -> prev_node != SHMN_INVALID_NODE_ID )
654
685
{
655
686
if (ensure_pqconn (& mps -> prev_conn , mps -> prev_connstr ,
656
687
(CopyPartState * ) mps ) == -1 )
657
688
return -1 ;
658
- res = PQexec (mps -> prev_conn , mps -> prev_sql );
659
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
660
- {
661
- shmn_elog (NOTICE , "Moving part %s: failed to configure LR on prev replica: %s" ,
662
- mps -> cp .part_name , PQerrorMessage (mps -> prev_conn ));
663
- reset_pqconn_and_res (& mps -> prev_conn , res );
664
- configure_retry ((CopyPartState * ) mps , shardman_cmd_retry_naptime );
689
+ if (!remote_exec (& mps -> prev_conn , (CopyPartState * )mps , mps -> prev_sql ))
665
690
return -1 ;
666
- }
667
- PQclear (res );
668
691
shmn_elog (DEBUG1 , "mp %s: LR conf on prev done" , mps -> cp .part_name );
669
692
}
670
693
671
694
if (ensure_pqconn_cp ((CopyPartState * ) mps ,
672
695
ENSURE_PQCONN_DST ) == -1 )
673
696
return -1 ;
674
- res = PQexec (mps -> cp .dst_conn , mps -> dst_sql );
675
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
676
- {
677
- shmn_elog (NOTICE , "Moving part %s: failed to configure LR on dst : %s" ,
678
- mps -> cp .part_name , PQerrorMessage (mps -> cp .dst_conn ));
679
- reset_pqconn_and_res (& mps -> cp .dst_conn , res );
680
- configure_retry ((CopyPartState * ) mps , shardman_cmd_retry_naptime );
697
+ if (!remote_exec (& mps -> cp .dst_conn , (CopyPartState * )mps , mps -> dst_sql ))
681
698
return -1 ;
682
- }
683
- PQclear (res );
684
699
shmn_elog (DEBUG1 , "mp %s: LR conf on dst done" , mps -> cp .part_name );
685
700
686
701
if (mps -> next_node != SHMN_INVALID_NODE_ID )
687
702
{
688
703
if (ensure_pqconn (& mps -> next_conn , mps -> next_connstr ,
689
704
(CopyPartState * ) mps ) == -1 )
690
705
return -1 ;
691
- res = PQexec (mps -> next_conn , mps -> next_sql );
692
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
693
- {
694
- shmn_elog (NOTICE , "Moving part %s: failed to configure LR on next replica: %s" ,
695
- mps -> cp .part_name , PQerrorMessage (mps -> next_conn ));
696
- reset_pqconn_and_res (& mps -> next_conn , res );
697
- configure_retry ((CopyPartState * ) mps , shardman_cmd_retry_naptime );
706
+ if (!remote_exec (& mps -> next_conn , (CopyPartState * )mps , mps -> next_sql ))
698
707
return -1 ;
699
- }
700
- PQclear (res );
701
708
shmn_elog (DEBUG1 , "mp %s: LR conf on next done" , mps -> cp .part_name );
702
709
}
703
710
@@ -747,46 +754,20 @@ exec_create_replica(CreateReplicaState *crs)
747
754
int
748
755
cr_rebuild_lr (CreateReplicaState * crs )
749
756
{
750
- PGresult * res ;
751
-
752
757
if (ensure_pqconn_cp ((CopyPartState * ) crs ,
753
758
ENSURE_PQCONN_SRC | ENSURE_PQCONN_DST ) == -1 )
754
759
return -1 ;
755
760
756
- res = PQexec (crs -> cp .dst_conn , crs -> drop_cp_sub_sql );
757
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
758
- {
759
- shmn_elog (NOTICE , "Creating replica %s: failed to configure LR, step 1: %s" ,
760
- crs -> cp .part_name , PQerrorMessage (crs -> cp .dst_conn ));
761
- reset_pqconn_and_res (& crs -> cp .dst_conn , res );
762
- configure_retry ((CopyPartState * ) crs , shardman_cmd_retry_naptime );
761
+ if (!remote_exec (& crs -> cp .dst_conn , (CopyPartState * ) crs , crs -> drop_cp_sub_sql ))
763
762
return -1 ;
764
- }
765
- PQclear (res );
766
763
shmn_elog (DEBUG1 , "cr %s: drop_cp_sub done" , crs -> cp .part_name );
767
764
768
- res = PQexec (crs -> cp .src_conn , crs -> create_data_pub_sql );
769
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
770
- {
771
- shmn_elog (NOTICE , "Creating replica %s: failed to configure LR, step 2: %s" ,
772
- crs -> cp .part_name , PQerrorMessage (crs -> cp .src_conn ));
773
- reset_pqconn_and_res (& crs -> cp .src_conn , res );
774
- configure_retry ((CopyPartState * ) crs , shardman_cmd_retry_naptime );
765
+ if (!remote_exec (& crs -> cp .src_conn , (CopyPartState * ) crs , crs -> create_data_pub_sql ))
775
766
return -1 ;
776
- }
777
- PQclear (res );
778
767
shmn_elog (DEBUG1 , "cr %s: create_data_pub done" , crs -> cp .part_name );
779
768
780
- res = PQexec (crs -> cp .dst_conn , crs -> create_data_sub_sql );
781
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
782
- {
783
- shmn_elog (NOTICE , "Creating replica %s: failed to configure LR, step 3: %s" ,
784
- crs -> cp .part_name , PQerrorMessage (crs -> cp .dst_conn ));
785
- reset_pqconn_and_res (& crs -> cp .dst_conn , res );
786
- configure_retry ((CopyPartState * ) crs , shardman_cmd_retry_naptime );
769
+ if (!remote_exec (& crs -> cp .dst_conn , (CopyPartState * ) crs , crs -> create_data_sub_sql ))
787
770
return -1 ;
788
- }
789
- PQclear (res );
790
771
shmn_elog (DEBUG1 , "cr %s: create_data_sub done" , crs -> cp .part_name );
791
772
792
773
return 0 ;
@@ -847,7 +828,6 @@ exec_cp(CopyPartState *cps)
847
828
int
848
829
cp_start_tablesync (CopyPartState * cps )
849
830
{
850
- PGresult * res ;
851
831
XLogRecPtr lord_lsn = GetXLogWriteRecPtr ();
852
832
853
833
if (ensure_pqconn_cp (cps , ENSURE_PQCONN_SRC | ENSURE_PQCONN_DST ) == -1 )
@@ -878,37 +858,16 @@ cp_start_tablesync(CopyPartState *cps)
878
858
goto fail ;
879
859
}
880
860
881
- res = PQexec (cps -> dst_conn , cps -> dst_drop_sub_sql );
882
- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
883
- {
884
- shmn_elog (NOTICE , "Failed to drop sub on dst: %s" ,
885
- PQerrorMessage (cps -> dst_conn ));
886
- reset_pqconn_and_res (& cps -> dst_conn , res );
861
+ if (!remote_exec (& cps -> dst_conn , cps , cps -> dst_drop_sub_sql ))
887
862
goto fail ;
888
- }
889
- PQclear (res );
890
863
shmn_elog (DEBUG1 , "cp %s: sub on dst dropped, if any" , cps -> part_name );
891
864
892
- res = PQexec (cps -> src_conn , cps -> src_create_pub_and_rs_sql );
893
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
894
- {
895
- shmn_elog (NOTICE , "Failed to create pub and repslot on src: %s" ,
896
- PQerrorMessage (cps -> src_conn ));
897
- reset_pqconn_and_res (& cps -> src_conn , res );
865
+ if (!remote_exec (& cps -> src_conn , cps , cps -> src_create_pub_and_rs_sql ))
898
866
goto fail ;
899
- }
900
- PQclear (res );
901
867
shmn_elog (DEBUG1 , "cp %s: pub and rs recreated on src" , cps -> part_name );
902
868
903
- res = PQexec (cps -> dst_conn , cps -> dst_create_tab_and_sub_sql );
904
- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
905
- {
906
- shmn_elog (NOTICE , "Failed to recreate table & sub on dst: %s" ,
907
- PQerrorMessage (cps -> dst_conn ));
908
- reset_pqconn_and_res (& cps -> dst_conn , res );
869
+ if (!remote_exec (& cps -> dst_conn , cps , cps -> dst_create_tab_and_sub_sql ))
909
870
goto fail ;
910
- }
911
- PQclear (res );
912
871
shmn_elog (DEBUG1 , "cp %s: table & sub created on dst, tablesync started" ,
913
872
cps -> part_name );
914
873
0 commit comments