74
74
* specify since which lsn to start replication, tables must be synced anyway
75
75
* during these operations, so what is the point of reusing the old sub?
76
76
*
77
- * About fdws on replicas: we have to keep partition of parent table as fdw,
78
- * because otherwise we would not be able to write anything to it. On the
79
- * other hand, keeping the whole list of replicas is a bit excessive and
80
- * slower in case of primary failure: we need actually only primary and
81
- * ourself.
82
- *
83
77
* Currently we don't save progress of separate tasks (e.g. for copy part
84
78
* whether initial sync started or done, lsn, etc), so we have to start
85
79
* everything from scratch if shardlord reboots. This is arguably fine.
@@ -140,7 +134,7 @@ static XLogRecPtr pg_lsn_in_c(const char *lsn);
140
134
static struct timespec timespec_now_plus_millis (int millis );
141
135
struct timespec timespec_now (void );
142
136
143
- static char *
137
+ static char *
144
138
get_data_lname (char const * part_name , int pub_node , int sub_node )
145
139
{
146
140
return psprintf ("shardman_data_%s_%d_%d" , part_name , pub_node , sub_node );
@@ -229,19 +223,23 @@ init_mp_state(MovePartState *mps, const char *part_name, int32 src_node,
229
223
if (mps -> prev_node != SHMN_INVALID_NODE_ID )
230
224
{
231
225
mps -> prev_sql = psprintf (
232
- "select shardman.part_moved_prev('%s', %d, %d); SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
226
+ "select shardman.part_moved_prev('%s', %d, %d);"
227
+ " select pg_create_logical_replication_slot('%s', 'pgoutput');" ,
233
228
part_name , mps -> cp .src_node , mps -> cp .dst_node ,
234
229
get_data_lname (part_name , shardman_my_id , mps -> cp .dst_node ));
235
230
}
236
231
mps -> dst_sql = psprintf (
237
- "select shardman.part_moved_dst('%s', %d, %d); SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
238
- part_name , mps -> cp .src_node , mps -> cp .dst_node ,
239
- get_data_lname (part_name , mps -> cp .dst_node , get_next_node (part_name , mps -> cp .src_node )));
232
+ "select shardman.part_moved_dst('%s', %d, %d);" ,
233
+ part_name , mps -> cp .src_node , mps -> cp .dst_node );
240
234
if (mps -> next_node != SHMN_INVALID_NODE_ID )
241
235
{
242
236
mps -> next_sql = psprintf (
243
237
"select shardman.part_moved_next('%s', %d, %d);" ,
244
238
part_name , mps -> cp .src_node , mps -> cp .dst_node );
239
+ mps -> dst_sql = psprintf (
240
+ "%s select pg_create_logical_replication_slot('%s', 'pgoutput');" ,
241
+ mps -> dst_sql ,
242
+ get_data_lname (part_name , mps -> cp .dst_node , mps -> next_node ));
245
243
}
246
244
}
247
245
@@ -280,17 +278,12 @@ init_cr_state(CreateReplicaState *crs, const char *part_name, int32 dst_node)
280
278
crs -> drop_cp_sub_sql = psprintf (
281
279
"select shardman.replica_created_drop_cp_sub('%s', %d, %d);" ,
282
280
part_name , crs -> cp .src_node , crs -> cp .dst_node );
283
- /*
284
- * Separate trxn for ensure_sync_standby as in init_mp_state. It is
285
- * interesting that while I got expected behaviour (hanged transaction) in
286
- * move_part if ensure_sync_standby was executed in one trxn with create
287
- * repslot and pub, here I didn't. Probably it is committing so fast that
288
- * settings are not getting reloaded, but not sure why.
289
- */
290
- crs -> create_data_pub_sql = psprintf (
291
- "select shardman.replica_created_create_data_pub('%s', %d, %d); SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
292
- part_name , crs -> cp .src_node , crs -> cp .dst_node ,
293
- get_data_lname (part_name , crs -> cp .src_node , crs -> cp .dst_node ));
281
+
282
+ crs -> create_data_pub_sql =
283
+ psprintf ("select shardman.replica_created_create_data_pub('%s', %d, %d);"
284
+ " select pg_create_logical_replication_slot('%s', 'pgoutput');" ,
285
+ part_name , crs -> cp .src_node , crs -> cp .dst_node ,
286
+ get_data_lname (part_name , crs -> cp .src_node , crs -> cp .dst_node ));
294
287
crs -> create_data_sub_sql = psprintf (
295
288
"select shardman.replica_created_create_data_sub('%s', %d, %d);" ,
296
289
part_name , crs -> cp .src_node , crs -> cp .dst_node );
@@ -350,7 +343,7 @@ init_cp_state(CopyPartState *cps)
350
343
"drop publication if exists %s cascade;"
351
344
"create publication %s for table %s;"
352
345
"select shardman.drop_repslot('%s');"
353
- "SELECT pg_create_logical_replication_slot('%s', 'pgoutput');" ,
346
+ "select pg_create_logical_replication_slot('%s', 'pgoutput');" ,
354
347
cps -> logname , cps -> logname , cps -> part_name , cps -> logname , cps -> logname
355
348
);
356
349
cps -> relation = get_partition_relation (cps -> part_name );
@@ -648,14 +641,21 @@ exec_move_part(MovePartState *mps)
648
641
mps -> cp .exec_res = TASK_DONE ;
649
642
}
650
643
644
+ /*
645
* Execute given statements in separate transactions. In case of any failure
646
+ * return false, destroy connection and configure_retry on given cps.
647
+ * This function is used only for internal SQL, where we guarantee no ';'
648
* inside any statement, i.e. ';' only ever terminates a statement.
649
+ */
651
650
static bool remote_exec (PGconn * * conn , CopyPartState * cps , char * stmts )
652
651
{
653
652
char * sql = stmts , * sep ;
654
653
PGresult * res ;
655
654
while ((sep = strchr (sql , ';' )) != NULL ) {
656
655
* sep = '\0' ;
657
656
res = PQexec (* conn , sql );
658
- if (PQresultStatus (res ) != PGRES_TUPLES_OK && PQresultStatus (res ) != PGRES_COMMAND_OK )
657
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK &&
658
+ PQresultStatus (res ) != PGRES_COMMAND_OK )
659
659
{
660
660
shmn_elog (LOG , "REMOTE_EXEC: execution of query '%s' failed for paritions %s: %s" ,
661
661
sql , cps -> part_name , PQerrorMessage (* conn ));
@@ -686,15 +686,15 @@ mp_rebuild_lr(MovePartState *mps)
686
686
if (ensure_pqconn (& mps -> prev_conn , mps -> prev_connstr ,
687
687
(CopyPartState * ) mps ) == -1 )
688
688
return -1 ;
689
- if (!remote_exec (& mps -> prev_conn , (CopyPartState * )mps , mps -> prev_sql ))
689
+ if (!remote_exec (& mps -> prev_conn , (CopyPartState * ) mps , mps -> prev_sql ))
690
690
return -1 ;
691
691
shmn_elog (DEBUG1 , "mp %s: LR conf on prev done" , mps -> cp .part_name );
692
692
}
693
693
694
694
if (ensure_pqconn_cp ((CopyPartState * ) mps ,
695
695
ENSURE_PQCONN_DST ) == -1 )
696
696
return -1 ;
697
- if (!remote_exec (& mps -> cp .dst_conn , (CopyPartState * )mps , mps -> dst_sql ))
697
+ if (!remote_exec (& mps -> cp .dst_conn , (CopyPartState * ) mps , mps -> dst_sql ))
698
698
return -1 ;
699
699
shmn_elog (DEBUG1 , "mp %s: LR conf on dst done" , mps -> cp .part_name );
700
700
@@ -703,7 +703,7 @@ mp_rebuild_lr(MovePartState *mps)
703
703
if (ensure_pqconn (& mps -> next_conn , mps -> next_connstr ,
704
704
(CopyPartState * ) mps ) == -1 )
705
705
return -1 ;
706
- if (!remote_exec (& mps -> next_conn , (CopyPartState * )mps , mps -> next_sql ))
706
+ if (!remote_exec (& mps -> next_conn , (CopyPartState * ) mps , mps -> next_sql ))
707
707
return -1 ;
708
708
shmn_elog (DEBUG1 , "mp %s: LR conf on next done" , mps -> cp .part_name );
709
709
}
@@ -850,12 +850,12 @@ cp_start_tablesync(CopyPartState *cps)
850
850
if (check_sub_sync ("shardman_meta_sub" , & cps -> src_conn , lord_lsn ,
851
851
"meta sub" ) == -1 )
852
852
{
853
- goto fail ;
853
+ goto configure_retry_and_fail ;
854
854
}
855
855
if (check_sub_sync ("shardman_meta_sub" , & cps -> dst_conn , lord_lsn ,
856
856
"meta sub" ) == -1 )
857
857
{
858
- goto fail ;
858
+ goto configure_retry_and_fail ;
859
859
}
860
860
861
861
if (!remote_exec (& cps -> dst_conn , cps , cps -> dst_drop_sub_sql ))
@@ -866,16 +866,17 @@ cp_start_tablesync(CopyPartState *cps)
866
866
goto fail ;
867
867
shmn_elog (DEBUG1 , "cp %s: pub and rs recreated on src" , cps -> part_name );
868
868
869
- if (!remote_exec (& cps -> dst_conn , cps , cps -> dst_create_tab_and_sub_sql ))
869
+ if (!remote_exec (& cps -> dst_conn , cps , cps -> dst_create_tab_and_sub_sql ))
870
870
goto fail ;
871
871
shmn_elog (DEBUG1 , "cp %s: table & sub created on dst, tablesync started" ,
872
872
cps -> part_name );
873
873
874
874
cps -> curstep = COPYPART_START_FINALSYNC ;
875
875
return 0 ;
876
876
877
- fail :
877
+ configure_retry_and_fail :
878
878
configure_retry (cps , shardman_cmd_retry_naptime );
879
+ fail :
879
880
return -1 ;
880
881
}
881
882
@@ -1078,8 +1079,8 @@ ensure_pqconn(PGconn **conn, const char *connstr,
1078
1079
* conn = PQconnectdb (connstr );
1079
1080
if (PQstatus (* conn ) != CONNECTION_OK )
1080
1081
{
1081
- shmn_elog (NOTICE , "Connection to node failed: %s" ,
1082
- PQerrorMessage (* conn ));
1082
+ shmn_elog (NOTICE , "Connection to node %s failed: %s" ,
1083
+ connstr , PQerrorMessage (* conn ));
1083
1084
reset_pqconn (conn );
1084
1085
configure_retry (cps , shardman_cmd_retry_naptime );
1085
1086
return -1 ;
0 commit comments