@@ -222,24 +222,32 @@ init_mp_state(MovePartState *mps, const char *part_name, int32 src_node,
222
222
223
223
if (mps -> prev_node != SHMN_INVALID_NODE_ID )
224
224
{
225
+ char * prev_dst_lname = get_data_lname (part_name , shardman_my_id ,
226
+ mps -> cp .dst_node );
225
227
mps -> prev_sql = psprintf (
226
228
"select shardman.part_moved_prev('%s', %d, %d);"
227
229
" select pg_create_logical_replication_slot('%s', 'pgoutput');" ,
228
- part_name , mps -> cp .src_node , mps -> cp .dst_node ,
229
- get_data_lname (part_name , shardman_my_id , mps -> cp .dst_node ));
230
+ part_name , mps -> cp .src_node , mps -> cp .dst_node , prev_dst_lname );
231
+
232
+ mps -> sync_standby_prev_sql = psprintf (
233
+ "select shardman.ensure_sync_standby('%s');" , prev_dst_lname );
230
234
}
231
235
mps -> dst_sql = psprintf (
232
236
"select shardman.part_moved_dst('%s', %d, %d);" ,
233
237
part_name , mps -> cp .src_node , mps -> cp .dst_node );
234
238
if (mps -> next_node != SHMN_INVALID_NODE_ID )
235
239
{
240
+ char * dst_next_lname = get_data_lname (part_name , mps -> cp .dst_node ,
241
+ mps -> next_node );
236
242
mps -> next_sql = psprintf (
237
243
"select shardman.part_moved_next('%s', %d, %d);" ,
238
244
part_name , mps -> cp .src_node , mps -> cp .dst_node );
239
245
mps -> dst_sql = psprintf (
240
246
"%s select pg_create_logical_replication_slot('%s', 'pgoutput');" ,
241
- mps -> dst_sql ,
242
- get_data_lname (part_name , mps -> cp .dst_node , mps -> next_node ));
247
+ mps -> dst_sql , dst_next_lname );
248
+
249
+ mps -> sync_standby_dst_sql = psprintf (
250
+ "select shardman.ensure_sync_standby('%s');" , dst_next_lname );
243
251
}
244
252
}
245
253
@@ -287,6 +295,11 @@ init_cr_state(CreateReplicaState *crs, const char *part_name, int32 dst_node)
287
295
crs -> create_data_sub_sql = psprintf (
288
296
"select shardman.replica_created_create_data_sub('%s', %d, %d);" ,
289
297
part_name , crs -> cp .src_node , crs -> cp .dst_node );
298
+ crs -> sync_standby_sql = psprintf (
299
+ "select shardman.ensure_sync_standby('%s');"
300
+ " select shardman.readonly_table_off('%s'::regclass);" ,
301
+ get_data_lname (part_name , crs -> cp .src_node , crs -> cp .dst_node ),
302
+ part_name );
290
303
}
291
304
292
305
/*
@@ -386,18 +399,14 @@ static void finalize_cp_state(CopyPartState *cps)
386
399
/* Failed tasks never open pq connections */
387
400
if (cps -> res != TASK_FAILED )
388
401
{
389
- if (cps -> src_conn != NULL )
390
- reset_pqconn (& cps -> src_conn );
391
- if (cps -> dst_conn != NULL )
392
- reset_pqconn (& cps -> dst_conn );
402
+ reset_pqconn (& cps -> src_conn );
403
+ reset_pqconn (& cps -> dst_conn );
393
404
if (cps -> type == COPYPARTTASK_MOVE_PRIMARY ||
394
405
cps -> type == COPYPARTTASK_MOVE_REPLICA )
395
406
{
396
407
MovePartState * mps = (MovePartState * ) cps ;
397
- if (mps -> prev_conn != NULL )
398
- reset_pqconn (& mps -> prev_conn );
399
- if (mps -> next_conn != NULL )
400
- reset_pqconn (& mps -> next_conn );
408
+ reset_pqconn (& mps -> prev_conn );
409
+ reset_pqconn (& mps -> next_conn );
401
410
}
402
411
}
403
412
}
@@ -649,7 +658,8 @@ exec_move_part(MovePartState *mps)
649
658
*/
650
659
static bool remote_exec (PGconn * * conn , CopyPartState * cps , char * stmts )
651
660
{
652
- char * sql = stmts , * sep ;
661
+ char * sql = stmts ;
662
+ char * sep ;
653
663
PGresult * res ;
654
664
while ((sep = strchr (sql , ';' )) != NULL ) {
655
665
* sep = '\0' ;
@@ -698,6 +708,14 @@ mp_rebuild_lr(MovePartState *mps)
698
708
return -1 ;
699
709
shmn_elog (DEBUG1 , "mp %s: LR conf on dst done" , mps -> cp .part_name );
700
710
711
+ if (mps -> prev_node != SHMN_INVALID_NODE_ID )
712
+ {
713
+ if (!remote_exec (& mps -> prev_conn , (CopyPartState * ) mps ,
714
+ mps -> sync_standby_prev_sql ))
715
+ return -1 ;
716
+ shmn_elog (DEBUG1 , "mp %s: make sync standby on prev" , mps -> cp .part_name );
717
+ }
718
+
701
719
if (mps -> next_node != SHMN_INVALID_NODE_ID )
702
720
{
703
721
if (ensure_pqconn (& mps -> next_conn , mps -> next_connstr ,
@@ -706,6 +724,10 @@ mp_rebuild_lr(MovePartState *mps)
706
724
if (!remote_exec (& mps -> next_conn , (CopyPartState * ) mps , mps -> next_sql ))
707
725
return -1 ;
708
726
shmn_elog (DEBUG1 , "mp %s: LR conf on next done" , mps -> cp .part_name );
727
+
728
+ if (!remote_exec (& mps -> cp .dst_conn , (CopyPartState * ) mps ,
729
+ mps -> sync_standby_dst_sql ))
730
+ return -1 ;
709
731
}
710
732
711
733
return 0 ;
@@ -742,14 +764,6 @@ exec_create_replica(CreateReplicaState *crs)
742
764
* Work to do in general is described below. We execute them in steps written
743
765
* in parentheses so that every time we create sub, pub is already exist and
744
766
* every time we drop pub, sub is already dropped.
745
- * On old tail node, i.e. src:
746
- * - Drop repslot & pub used for copy (create_data_pub)
747
- * - Create repslot & pub for new data channel (create_data_pub)
748
- * - Make it synchronous; make table writable. (create_data_pub)
749
- * On new tail node, i.e. dst:
750
- * - Make table read-only for all but apply workers (drop_cp_sub)
751
- * - Drop sub used for copy (drop_cp_sub)
752
- * - Create sub for new data channel. (create_data_sub)
753
767
*/
754
768
int
755
769
cr_rebuild_lr (CreateReplicaState * crs )
@@ -758,18 +772,25 @@ cr_rebuild_lr(CreateReplicaState *crs)
758
772
ENSURE_PQCONN_SRC | ENSURE_PQCONN_DST ) == -1 )
759
773
return -1 ;
760
774
761
- if (!remote_exec (& crs -> cp .dst_conn , (CopyPartState * ) crs , crs -> drop_cp_sub_sql ))
775
+ if (!remote_exec (& crs -> cp .dst_conn , (CopyPartState * ) crs ,
776
+ crs -> drop_cp_sub_sql ))
762
777
return -1 ;
763
778
shmn_elog (DEBUG1 , "cr %s: drop_cp_sub done" , crs -> cp .part_name );
764
779
765
780
if (!remote_exec (& crs -> cp .src_conn , (CopyPartState * ) crs , crs -> create_data_pub_sql ))
766
781
return -1 ;
767
782
shmn_elog (DEBUG1 , "cr %s: create_data_pub done" , crs -> cp .part_name );
768
783
769
- if (!remote_exec (& crs -> cp .dst_conn , (CopyPartState * ) crs , crs -> create_data_sub_sql ))
784
+ if (!remote_exec (& crs -> cp .dst_conn , (CopyPartState * ) crs ,
785
+ crs -> create_data_sub_sql ))
770
786
return -1 ;
771
787
shmn_elog (DEBUG1 , "cr %s: create_data_sub done" , crs -> cp .part_name );
772
788
789
+ if (!remote_exec (& crs -> cp .src_conn , (CopyPartState * ) crs ,
790
+ crs -> sync_standby_sql ))
791
+ return -1 ;
792
+ shmn_elog (DEBUG1 , "cr %s: sync_standby done" , crs -> cp .part_name );
793
+
773
794
return 0 ;
774
795
}
775
796
@@ -1075,17 +1096,24 @@ ensure_pqconn(PGconn **conn, const char *connstr,
1075
1096
}
1076
1097
if (* conn == NULL )
1077
1098
{
1099
+ char s [] = "set session synchronous_commit to local;" ;
1100
+
1078
1101
Assert (connstr != NULL );
1079
1102
* conn = PQconnectdb (connstr );
1080
1103
if (PQstatus (* conn ) != CONNECTION_OK )
1081
1104
{
1082
- shmn_elog (NOTICE , "Connection to node %s failed: %s" ,
1083
- connstr , PQerrorMessage (* conn ));
1105
+ shmn_elog (NOTICE , "Connection to node %s failed: %s" , connstr ,
1106
+ PQerrorMessage (* conn ));
1084
1107
reset_pqconn (conn );
1085
1108
configure_retry (cps , shardman_cmd_retry_naptime );
1086
1109
return -1 ;
1087
1110
}
1088
1111
shmn_elog (DEBUG1 , "Connection to %s established" , connstr );
1112
+
1113
+ /* All our cmds don't need to wait for sync replication */
1114
+ /* remote_exec modifies sql, so it must be writable */
1115
+ if (!remote_exec (conn , cps , s ))
1116
+ return -1 ;
1089
1117
}
1090
1118
return 0 ;
1091
1119
}
0 commit comments