20
20
/* epoll max events */
21
21
#define MAX_EVENTS 64
22
22
23
/*
 * Bit flags for the 'nodes' argument of ensure_pqconn: which of the two
 * connections (to src, to dst, or both OR-ed together) must be ensured.
 */
#define ENSURE_PQCONN_SRC	(1 << 0)
#define ENSURE_PQCONN_DST	(1 << 1)
26
+
27
/*
 * Final result of moving one master partition. Stored in
 * MoveMPartState.res.
 */
typedef enum
{
	MOVEMPART_IN_PROGRESS,		/* the move has not finished yet */
	MOVEMPART_FAILED,			/* the move will not be (or was not) done */
	MOVEMPART_SUCCESS			/* the partition has been moved */
} MoveMPartRes;
29
34
30
/*
 * Result of one iteration of master partition moving; tells the main loop
 * what to do with the task next.
 */
typedef enum
{
	/* add me to epoll on fd_to_epoll on EPOLLIN */
	EXECMOVEMPART_EPOLL,
	/* wake me up again on waketm */
	EXECMOVEMPART_WAKEMEUP,
	/* the work is done, never invoke me again */
	EXECMOVEMPART_DONE
} ExecMoveMPartRes;
37
42
43
/*
 * Current step of one master partition move. Drives the dispatch in
 * exec_move_mpart.
 */
typedef enum
{
	MOVEMPARTSTEP_START_TABLESYNC	/* handled by start_tablesync */
} MoveMPartStep;
48
+
38
49
typedef struct
39
50
{
40
51
const char * part_name ; /* partition name */
@@ -47,7 +58,11 @@ typedef struct
47
58
/* exec_move_mpart sets fd here when it wants to be wakened by epoll */
48
59
int fd_to_epoll ;
49
60
int fd_in_epoll_set ; /* socket *currently* in epoll set. -1 of none */
50
- MoveMPartResult result ;
61
+ PGconn * src_conn ; /* connection to src */
62
+ PGconn * dst_conn ; /* connection to dst */
63
+ MoveMPartStep curstep ; /* current step */
64
+ ExecMoveMPartRes exec_res ; /* result of the last iteration */
65
+ MoveMPartRes res ; /* result of the whole move */
51
66
} MoveMPartState ;
52
67
53
68
typedef struct
@@ -58,12 +73,19 @@ typedef struct
58
73
59
74
static void init_mmp_state (MoveMPartState * mmps , const char * part_name ,
60
75
int32 dst_node );
76
+ static void finalize_mmp_state (MoveMPartState * mmps );
61
77
static void move_mparts (MoveMPartState * mmpss , int nparts );
62
78
static int calc_timeout (struct timespec waketm , bool waketm_set );
63
79
static void update_waketm (struct timespec * waketm , bool * waketm_set ,
64
80
MoveMPartState * mmps );
65
81
static void epoll_subscribe (int epfd , MoveMPartState * mmps );
66
- static ExecMoveMPartRes exec_move_mpart (MoveMPartState * mmps );
82
+ static void exec_move_mpart (MoveMPartState * mmps );
83
+ static int start_tablesync (MoveMPartState * mmpts );
84
+ static int ensure_pqconn (MoveMPartState * mmpts , int nodes );
85
+ static int ensure_pqconn_intern (PGconn * * conn , const char * connstr ,
86
+ MoveMPartState * mmps );
87
+ static void configure_retry (MoveMPartState * mmpts , int millis );
88
+ static struct timespec timespec_now_plus_millis (int millis );
67
89
68
90
/*
69
91
* Steps are:
@@ -197,10 +219,13 @@ create_hash_partitions(Cmd *cmd)
197
219
* - Sleep & check in connection to dest waiting for completion of final sync,
198
220
* i.e. when received_lsn is equal to remembered lsn on src.
199
221
* - Now update metadata on master, mark cmd as complete and we are done.
222
+ * src table will be dropped via metadata update
200
223
*
201
224
* If we don't save progress (whether initial sync started or done, lsn,
202
225
* etc), we have to start everything from the ground if master reboots. This
203
- * is arguably fine.
226
+ * is arguably fine. There is also a very small chance that the command will
227
+ * complete but fail before status is set to 'success', and after reboot will
228
+ * fail because the partition was already moved.
204
229
*
205
230
*/
206
231
void
@@ -213,7 +238,18 @@ move_mpart(Cmd *cmd)
213
238
init_mmp_state (mmps , part_name , dst_node );
214
239
215
240
move_mparts (mmps , 1 );
216
- update_cmd_status (cmd -> id , "success" );
241
+ check_for_sigterm ();
242
+ if (got_sigusr1 )
243
+ {
244
+ cmd_canceled (cmd );
245
+ return ;
246
+ }
247
+
248
+ Assert (mmps -> res != MOVEMPART_IN_PROGRESS );
249
+ if (mmps -> res == MOVEMPART_FAILED )
250
+ update_cmd_status (cmd -> id , "failed" );
251
+ else if (mmps -> res == MOVEMPART_SUCCESS )
252
+ update_cmd_status (cmd -> id , "success" );
217
253
}
218
254
219
255
@@ -231,7 +267,7 @@ init_mmp_state(MoveMPartState *mmps, const char *part_name, int32 dst_node)
231
267
{
232
268
shmn_elog (WARNING , "Partition %s doesn't exist, not moving it" ,
233
269
part_name );
234
- mmps -> result = MOVEMPART_FAILED ;
270
+ mmps -> res = MOVEMPART_FAILED ;
235
271
return ;
236
272
}
237
273
mmps -> dst_node = dst_node ;
@@ -244,7 +280,7 @@ init_mmp_state(MoveMPartState *mmps, const char *part_name, int32 dst_node)
244
280
{
245
281
shmn_elog (WARNING , "Node %d doesn't exist, not moving %s to it" ,
246
282
mmps -> dst_node , part_name );
247
- mmps -> result = MOVEMPART_FAILED ;
283
+ mmps -> res = MOVEMPART_FAILED ;
248
284
return ;
249
285
}
250
286
@@ -256,11 +292,35 @@ init_mmp_state(MoveMPartState *mmps, const char *part_name, int32 dst_node)
256
292
mmps -> fd_to_epoll = -1 ;
257
293
mmps -> fd_in_epoll_set = -1 ;
258
294
259
- mmps -> result = MOVEMPART_IN_PROGRESS ;
295
+ mmps -> src_conn = NULL ;
296
+ mmps -> dst_conn = NULL ;
297
+
298
+ mmps -> curstep = MOVEMPARTSTEP_START_TABLESYNC ;
299
+ mmps -> res = MOVEMPART_IN_PROGRESS ;
260
300
}
261
301
262
302
/*
263
- * Move partitions as specified in move_mpart_states list
303
+ * Close pq connections, if any.
304
+ */
305
+ static void finalize_mmp_state (MoveMPartState * mmps )
306
+ {
307
+ if (mmps -> src_conn != NULL )
308
+ {
309
+ PQfinish (mmps -> src_conn );
310
+ mmps -> src_conn = NULL ;
311
+ }
312
+ if (mmps -> dst_conn != NULL )
313
+ {
314
+ PQfinish (mmps -> dst_conn );
315
+ mmps -> dst_conn = NULL ;
316
+ }
317
+ }
318
+
319
+ /*
320
+ * Move partitions as specified in move_mpart_states array. Results (and
321
+ * general state) are saved in this array too. Tries to move all parts until
322
+ * all have failed/succeeded or sigusr1/sigterm is caught.
323
+ *
264
324
*/
265
325
void
266
326
move_mparts (MoveMPartState * mmpss , int nparts )
@@ -289,7 +349,7 @@ move_mparts(MoveMPartState *mmpss, int nparts)
289
349
waketm_set = true;
290
350
for (i = 0 ; i < nparts ; i ++ )
291
351
{
292
- if (mmpss [i ].result != MOVEMPART_FAILED )
352
+ if (mmpss [i ].res != MOVEMPART_FAILED )
293
353
{
294
354
MoveMPartStateNode * mmps_node = palloc (sizeof (MoveMPartStateNode ));
295
355
elog (DEBUG4 , "Adding task %s to timeout list" , mmpss [i ].part_name );
@@ -303,7 +363,7 @@ move_mparts(MoveMPartState *mmpss, int nparts)
303
363
shmn_elog (FATAL , "epoll_create1 failed" );
304
364
305
365
/* TODO: check for signals */
306
- while (unfinished_moves > 0 )
366
+ while (unfinished_moves > 0 && ! got_sigusr1 && ! got_sigterm )
307
367
{
308
368
timeout = calc_timeout (waketm , waketm_set );
309
369
e = epoll_wait (epfd , evlist , MAX_EVENTS , timeout );
@@ -328,7 +388,8 @@ move_mparts(MoveMPartState *mmpss, int nparts)
328
388
if (timespeccmp (mmps -> waketm , curtm ) <= 0 )
329
389
{
330
390
shmn_elog (DEBUG1 , "%s is ready for exec" , mmps -> part_name );
331
- switch (exec_move_mpart (mmps ))
391
+ exec_move_mpart (mmps );
392
+ switch (mmps -> exec_res )
332
393
{
333
394
case EXECMOVEMPART_WAKEMEUP :
334
395
/* We need to wake this task again, update waketm and
@@ -354,6 +415,20 @@ move_mparts(MoveMPartState *mmpss, int nparts)
354
415
}
355
416
}
356
417
418
+ /*
419
+ * Free list. This is not necessary though, we are finishing cmd and
420
+ * everything will be freed soon.
421
+ */
422
+ slist_foreach_modify (iter , & timeout_states )
423
+ {
424
+ MoveMPartStateNode * mmps_node =
425
+ slist_container (MoveMPartStateNode , list_node , iter .cur );
426
+ slist_delete_current (& iter );
427
+ pfree (mmps_node );
428
+ }
429
+ /* But this is important, as libpq manages memory on its own */
430
+ for (i = 0 ; i < nparts ; i ++ )
431
+ finalize_mmp_state (& mmpss [i ]);
357
432
close (epfd );
358
433
}
359
434
@@ -431,9 +506,104 @@ epoll_subscribe(int epfd, MoveMPartState *mmps)
431
506
* Actually run MoveMPart state machine. mmps->exec_res says when (if ever)
432
507
* we want to be executed again.
433
508
*/
434
- ExecMoveMPartRes
509
+ void
435
510
exec_move_mpart (MoveMPartState * mmps )
436
511
{
512
+ switch (mmps -> curstep )
513
+ {
514
+ case MOVEMPARTSTEP_START_TABLESYNC :
515
+ if (start_tablesync (mmps ) == -1 )
516
+ return ;
517
+ break ;
518
+ }
437
519
shmn_elog (DEBUG1 , "Partition %s is moved" , mmps -> part_name );
438
- return EXECMOVEMPART_DONE ;
520
+ mmps -> res = MOVEMPART_SUCCESS ;
521
+ mmps -> exec_res = EXECMOVEMPART_DONE ;
522
+ }
523
+
524
+
525
+ /*
526
+ * Set up logical replication between src and dst. If anything goes wrong,
527
+ * it configures mmps properly and returns -1, otherwise 0.
528
+ */
529
+ int
530
+ start_tablesync (MoveMPartState * mmps )
531
+ {
532
+ if (ensure_pqconn (mmps , ENSURE_PQCONN_SRC | ENSURE_PQCONN_DST ) == -1 )
533
+ return -1 ;
534
+ return 0 ;
535
+ }
536
+
537
+ /*
538
+ * Ensure that pq connection to src and dst node is CONNECTION_OK. nodes
539
+ * is a bitmask specifying with which nodes -- src, dst or both -- connection
540
+ * must be ensured. -1 is returned if we have failed to establish connection;
541
+ * mmps is then configured to sleep retry time. 0 is returned if ok.
542
+ */
543
+ int
544
+ ensure_pqconn (MoveMPartState * mmps , int nodes )
545
+ {
546
+ if ((nodes & ENSURE_PQCONN_SRC ) &&
547
+ (ensure_pqconn_intern (& mmps -> src_conn , mmps -> src_connstr , mmps ) == -1 ))
548
+ return -1 ;
549
+ if ((nodes & ENSURE_PQCONN_DST ) &&
550
+ (ensure_pqconn_intern (& mmps -> dst_conn , mmps -> dst_connstr , mmps ) == -1 ))
551
+ return -1 ;
552
+ return 0 ;
553
+ }
554
+
555
+ /*
556
+ * Working horse of ensure_pqconn
557
+ */
558
+ int
559
+ ensure_pqconn_intern (PGconn * * conn , const char * connstr ,
560
+ MoveMPartState * mmps )
561
+ {
562
+ if (* conn != NULL &&
563
+ PQstatus (* conn ) != CONNECTION_OK )
564
+ {
565
+ PQfinish (* conn );
566
+ * conn = NULL ;
567
+ }
568
+ if (* conn == NULL )
569
+ {
570
+ * conn = PQconnectdb (connstr );
571
+ if (PQstatus (* conn ) != CONNECTION_OK )
572
+ {
573
+ shmn_elog (NOTICE , "Connection to node failed: %s" ,
574
+ PQerrorMessage (* conn ));
575
+ /* not checking for not enough memory error :( */
576
+ PQfinish (* conn );
577
+ * conn = NULL ;
578
+ configure_retry (mmps , shardman_cmd_retry_naptime );
579
+ return -1 ;
580
+ }
581
+ shmn_elog (DEBUG1 , "Connection to %s established" , connstr );
582
+ }
583
+ return 0 ;
584
+ }
585
+
586
+
587
+ /*
588
+ * Configure mmps so that main loop wakes us again after given retry millis.
589
+ */
590
+ static void configure_retry (MoveMPartState * mmps , int millis )
591
+ {
592
+ mmps -> waketm = timespec_now_plus_millis (millis );
593
+ mmps -> exec_res = EXECMOVEMPART_WAKEMEUP ;
594
+ }
595
+
596
+ /*
597
+ * Get current time + given milliseconds. Fails with PG elog(FATAL) if gettime
598
+ * failed. Not very generic, yes, but exactly what we need.
599
+ */
600
+ struct timespec timespec_now_plus_millis (int millis )
601
+ {
602
+ struct timespec t ;
603
+ int e ;
604
+
605
+ if ((e = clock_gettime (CLOCK_MONOTONIC , & t )) == -1 )
606
+ shmn_elog (FATAL , "clock_gettime failed, %s" , strerror (e ));
607
+
608
+ return timespec_add_millis (t , millis );
439
609
}
0 commit comments