Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit be4020d

Browse files
committed
Signal handling revisited.
Macros for easy cmd canceled & sigterm check.
1 parent 545a858 commit be4020d

File tree

5 files changed

+46
-38
lines changed

5 files changed

+46
-38
lines changed

shard.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ BEGIN
220220
END IF;
221221
-- Drop old table anyway;
222222
EXECUTE format('DROP TABLE IF EXISTS %I', NEW.part_name);
223-
224223
ELSEIF me = NEW.owner THEN -- dst node
225224
RAISE DEBUG '[SHARDMAN %] part_moved trigger on dst node',
226225
me;

src/copypart.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ exec_tasks(CopyPartState **tasks, int ntasks)
440440
if ((epfd = epoll_create1(0)) == -1)
441441
shmn_elog(FATAL, "epoll_create1 failed");
442442

443-
while (unfinished_tasks > 0 && !got_sigusr1 && !got_sigterm)
443+
while (unfinished_tasks > 0 && !signal_pending())
444444
{
445445
timeout = calc_timeout(&timeout_states);
446446
e = epoll_wait(epfd, evlist, MAX_EVENTS, timeout);

src/include/pg_shardman.h

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,48 @@
33

44
#include <signal.h>
55

6+
#include "miscadmin.h"
7+
68
#define shmn_elog(level,fmt,...) elog(level, "[SHARDMAN] " fmt, ## __VA_ARGS__)
79

810
#define SPI_PROLOG do { \
9-
StartTransactionCommand(); \
10-
SPI_connect(); \
11-
PushActiveSnapshot(GetTransactionSnapshot()); \
12-
} while (0);
11+
StartTransactionCommand(); \
12+
SPI_connect(); \
13+
PushActiveSnapshot(GetTransactionSnapshot()); \
14+
} while (0);
1315

1416
#define SPI_EPILOG do { \
15-
PopActiveSnapshot(); \
16-
SPI_finish(); \
17-
CommitTransactionCommand(); \
18-
} while (0);
17+
PopActiveSnapshot(); \
18+
SPI_finish(); \
19+
CommitTransactionCommand(); \
20+
} while (0);
1921

2022
/* flags set by signal handlers */
2123
extern volatile sig_atomic_t got_sigterm;
2224
extern volatile sig_atomic_t got_sigusr1;
25+
/*
26+
* Most probably CHECK_FOR_INTERRTUPS here is useless since we handle
27+
* SIGTERM ourselves (to get adequate log message) and don't need anything
28+
* else, but just to be sure...
29+
*/
30+
#define SHMN_CHECK_FOR_INTERRUPTS() \
31+
do { \
32+
check_for_sigterm(); \
33+
CHECK_FOR_INTERRUPTS(); \
34+
} while (0)
35+
/*
36+
* Additionally check for SIGUSR1; if it has arrived, mark cmd as canceled and
37+
* return from current function.
38+
*/
39+
#define SHMN_CHECK_FOR_INTERRUPTS_CMD(cmd) \
40+
do { \
41+
SHMN_CHECK_FOR_INTERRUPTS(); \
42+
if (got_sigusr1) \
43+
{ \
44+
cmd_canceled((cmd)); \
45+
return; \
46+
} \
47+
} while(0)
2348

2449
/* GUC variables */
2550
extern bool shardman_shardlord;
@@ -53,10 +78,11 @@ typedef struct RepCount
5378

5479
extern void _PG_init(void);
5580
extern void shardlord_main(Datum main_arg);
81+
extern bool signal_pending(void);
5682
extern void check_for_sigterm(void);
83+
extern void cmd_canceled(Cmd *cmd);
5784
extern uint64 void_spi(char *sql);
5885
extern void update_cmd_status(int64 id, const char *new_status);
59-
extern void cmd_canceled(Cmd *cmd);
6086
extern char *get_worker_node_connstr(int32 node_id);
6187
extern int32 *get_workers(uint64 *num_workers);
6288
extern int32 get_primary_owner(const char *part_name);

src/pg_shardman.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -431,14 +431,16 @@ shardlord_sigterm(SIGNAL_ARGS)
431431

432432
/*
433433
* Signal handler for SIGUSR1
434-
* Set a flag to let the main loop to terminate.
434+
* Set a flag to let cancel a command
435435
*/
436436
void
437437
shardlord_sigusr1(SIGNAL_ARGS)
438438
{
439439
got_sigusr1 = true;
440440
}
441441

442+
bool signal_pending(void) { return got_sigterm || got_sigusr1; }
443+
442444
/*
443445
* Cleanup and exit in case of SIGTERM
444446
*/
@@ -488,7 +490,7 @@ add_node(Cmd *cmd)
488490

489491
shmn_elog(INFO, "Adding node %s", connstr);
490492
/* Try to execute command indefinitely until it succeeded or canceled */
491-
while (!got_sigusr1 && !got_sigterm)
493+
while (1948)
492494
{
493495
conn = PQconnectdb(connstr);
494496
if (PQstatus(conn) != CONNECTION_OK)
@@ -610,10 +612,8 @@ add_node(Cmd *cmd)
610612
shmn_elog(LOG, "Attempt to execute add_node failed, sleeping and retrying");
611613
/* TODO: sleep using waitlatch? */
612614
pg_usleep(shardman_cmd_retry_naptime * 1000L);
615+
SHMN_CHECK_FOR_INTERRUPTS_CMD(cmd);
613616
}
614-
check_for_sigterm();
615-
616-
cmd_canceled(cmd);
617617
}
618618

619619
/* See sql func */

src/shard.c

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ create_hash_partitions(Cmd *cmd)
7171
relation, connstr);
7272

7373
/* Try to execute command indefinitely until it succeeded or canceled */
74-
while (!got_sigusr1 && !got_sigterm)
74+
while (1948)
7575
{
7676
conn = PQconnectdb(connstr);
7777
if (PQstatus(conn) != CONNECTION_OK)
@@ -127,22 +127,15 @@ create_hash_partitions(Cmd *cmd)
127127
" sleeping and retrying");
128128
/* TODO: sleep using waitlatch? */
129129
pg_usleep(shardman_cmd_retry_naptime * 1000L);
130+
SHMN_CHECK_FOR_INTERRUPTS_CMD(cmd);
130131
}
131-
check_for_sigterm();
132-
133-
cmd_canceled(cmd);
134132
}
135133

136134
/* Update status of cmd consisting of single task after exec_tasks finishes */
137135
void
138136
cmd_single_task_exec_finished(Cmd *cmd, CopyPartState *cps)
139137
{
140-
check_for_sigterm();
141-
if (got_sigusr1)
142-
{
143-
cmd_canceled(cmd);
144-
return;
145-
}
138+
SHMN_CHECK_FOR_INTERRUPTS_CMD(cmd);
146139

147140
Assert(cps->res != TASK_IN_PROGRESS);
148141
if (cps->res == TASK_FAILED)
@@ -235,12 +228,7 @@ rebalance(Cmd *cmd)
235228
}
236229

237230
exec_tasks(tasks, num_parts);
238-
check_for_sigterm();
239-
if (got_sigusr1)
240-
{
241-
cmd_canceled(cmd);
242-
return;
243-
}
231+
SHMN_CHECK_FOR_INTERRUPTS_CMD(cmd);
244232

245233
shmn_elog(INFO, "Relation %s rebalanced:", relation);
246234
update_cmd_status(cmd->id, "done");
@@ -307,12 +295,7 @@ set_replevel(Cmd *cmd)
307295
break;
308296

309297
exec_tasks(tasks, ntasks);
310-
check_for_sigterm();
311-
if (got_sigusr1)
312-
{
313-
cmd_canceled(cmd);
314-
return;
315-
}
298+
SHMN_CHECK_FOR_INTERRUPTS_CMD(cmd);
316299

317300
pfree(repcounts);
318301
for (i = 0; i < ntasks; i++)

0 commit comments

Comments
 (0)