@@ -43,7 +43,8 @@ typedef enum
43
43
/* Current step of 1 master partition move */
44
44
typedef enum
45
45
{
46
- MOVEMPARTSTEP_START_TABLESYNC
46
+ MOVEMPARTSTEP_START_TABLESYNC ,
47
+ MOVEMPARTSTEP_WAIT_TABLESYNC
47
48
} MoveMPartStep ;
48
49
49
50
typedef struct
@@ -314,18 +315,37 @@ init_mmp_state(MoveMPartState *mmps, const char *part_name, int32 dst_node)
314
315
mmps -> part_name , mmps -> src_node , mmps -> dst_node );
315
316
mmps -> dst_drop_sub_sql = psprintf (
316
317
"drop subscription if exists %s cascade;" , mmps -> logname );
318
+ /*
319
+ * Note that we run stmts in separate txns: repslot can't be created in in
320
+ * transaction that performed writes
321
+ */
317
322
mmps -> src_create_pub_and_rs_sql = psprintf (
318
- "drop publication if exists %s cascade;"
319
- " create publication %s for table %s;"
323
+ "begin; drop publication if exists %s cascade;"
324
+ " create publication %s for table %s; end; "
320
325
" select shardman.create_repslot('%s');" ,
321
326
mmps -> logname , mmps -> logname , mmps -> part_name , mmps -> logname
322
327
);
323
328
mmps -> relation = get_partition_relation (part_name );
324
329
mmps -> dst_create_tab_and_sub_sql = psprintf (
325
330
"drop table if exists %s cascade;"
331
+ /*
332
+ * TODO: we are mimicking pathman's partition creation here. At least
333
+ * one difference is that we don't copy foreign keys, so this should
334
+ * be fixed. For example, we could directly call pathman's
335
+ * create_single_partition_internal func here, though currently it is
336
+ * static. We could also just use old empty partition and not remove
337
+ * it, but considering (in very far perspective) ALTER TABLE this is
338
+ * wrong approach.
339
+ */
326
340
" create table %s (like %s including defaults including indexes"
327
- " including storage);" ,
328
- mmps -> part_name , mmps -> part_name , mmps -> relation );
341
+ " including storage);"
342
+ " drop subscription if exists %s cascade;"
343
+ " create subscription %s connection '%s' publication %s with"
344
+ " (create_slot = false, slot_name = '%s');" ,
345
+ mmps -> part_name ,
346
+ mmps -> part_name , mmps -> relation ,
347
+ mmps -> logname ,
348
+ mmps -> logname , mmps -> src_connstr , mmps -> logname , mmps -> logname );
329
349
330
350
mmps -> curstep = MOVEMPARTSTEP_START_TABLESYNC ;
331
351
mmps -> res = MOVEMPART_IN_PROGRESS ;
@@ -543,12 +563,14 @@ exec_move_mpart(MoveMPartState *mmps)
543
563
/* Mark waketm as invalid for safety */
544
564
mmps -> waketm = (struct timespec ) {0 };
545
565
546
- switch (mmps -> curstep )
566
+ if (mmps -> curstep == MOVEMPARTSTEP_START_TABLESYNC )
547
567
{
548
- case MOVEMPARTSTEP_START_TABLESYNC :
549
- if (start_tablesync (mmps ) == -1 )
550
- return ;
568
+ if (start_tablesync (mmps ) == -1 )
569
+ return ;
570
+ else
571
+ mmps -> curstep = MOVEMPARTSTEP_WAIT_TABLESYNC ;
551
572
}
573
+
552
574
shmn_elog (DEBUG1 , "Partition %s is moved" , mmps -> part_name );
553
575
mmps -> res = MOVEMPART_SUCCESS ;
554
576
mmps -> exec_res = EXECMOVEMPART_DONE ;
@@ -579,17 +601,31 @@ start_tablesync(MoveMPartState *mmps)
579
601
shmn_elog (DEBUG1 , "mmp %s: sub on dst dropped, if any" , mmps -> part_name );
580
602
581
603
res = PQexec (mmps -> src_conn , mmps -> src_create_pub_and_rs_sql );
582
- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
604
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
583
605
{
584
606
shmn_elog (NOTICE , "Failed to create pub and repslot on src: %s" ,
585
607
PQerrorMessage (mmps -> src_conn ));
586
608
reset_pqconn_and_res (& mmps -> src_conn , res );
587
609
configure_retry (mmps , shardman_cmd_retry_naptime );
588
610
return -1 ;
589
611
}
590
- return 0 ;
591
612
PQclear (res );
592
613
shmn_elog (DEBUG1 , "mmp %s: pub and rs recreated on src" , mmps -> part_name );
614
+
615
+ res = PQexec (mmps -> dst_conn , mmps -> dst_create_tab_and_sub_sql );
616
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
617
+ {
618
+ shmn_elog (NOTICE , "Failed to recreate table & sub on dst: %s" ,
619
+ PQerrorMessage (mmps -> dst_conn ));
620
+ reset_pqconn_and_res (& mmps -> dst_conn , res );
621
+ configure_retry (mmps , shardman_cmd_retry_naptime );
622
+ return -1 ;
623
+ }
624
+ PQclear (res );
625
+ shmn_elog (DEBUG1 , "mmp %s: table & sub created on dst, tablesync started" ,
626
+ mmps -> part_name );
627
+
628
+ return 0 ;
593
629
}
594
630
595
631
/*
0 commit comments