Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c795670

Browse files
author
Vladimir Ershov
committed
resubmit && resubmit_limit
1 parent 672905b commit c795670

11 files changed

+118
-13
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ CREATE TABLE at_jobs_submitted(
1717
executor text,
1818
owner text,
1919
last_start_available timestamp with time zone,
20+
attempt bigint default 0,
21+
resubmit_limit bigint default 100,
2022
postpone interval,
2123
max_run_time interval,
2224
submit_time timestamp with time zone default now()
@@ -153,8 +155,8 @@ CREATE FUNCTION get_self_id()
153155
AS 'MODULE_PATHNAME', 'get_self_id'
154156
LANGUAGE C IMMUTABLE;
155157

156-
CREATE FUNCTION resubmit()
157-
RETURNS boolean
158+
CREATE FUNCTION resubmit(run_after interval default NULL)
159+
RETURNS bigint
158160
AS 'MODULE_PATHNAME', 'resubmit'
159161
LANGUAGE C IMMUTABLE;
160162

@@ -168,7 +170,8 @@ CREATE FUNCTION submit_job(
168170
run_as text default NULL,
169171
depends_on bigint[] default NULL,
170172
name text default NULL,
171-
comments text default NULL
173+
comments text default NULL,
174+
resubmit_limit bigint default 100
172175
) RETURNS bigint AS
173176
$BODY$
174177
DECLARE
@@ -215,10 +218,12 @@ BEGIN
215218

216219
INSERT INTO at_jobs_submitted
217220
(node, at, do_sql, owner, executor, name, comments, max_run_time,
218-
postpone, last_start_available, depends_on, params)
221+
postpone, last_start_available, depends_on, params,
222+
attempt, resubmit_limit)
219223
VALUES
220224
(node, run_after, query, session_user, executor, name, comments,
221-
max_duration, max_wait_interval, last_avail, depends_on, params)
225+
max_duration, max_wait_interval, last_avail, depends_on, params,
226+
0, resubmit_limit)
222227
RETURNING id INTO job_id;
223228

224229
RETURN job_id;

src/pgpro_scheduler.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void reload_db_role_config(char *dbname)
9292
CommitTransactionCommand();
9393
}
9494

95-
TimestampTz timestamp_add_seconds(TimestampTz to, int add)
95+
TimestampTz timestamp_add_seconds(TimestampTz to, int64 add)
9696
{
9797
if(to == 0) to = GetCurrentTimestamp();
9898
#ifdef HAVE_INT64_TIMESTAMP

src/pgpro_scheduler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ int checkSchedulerNamespace(void);
3434
pid_t registerManagerWorker(schd_manager_t *man);
3535

3636
void reload_db_role_config(char *dbname);
37-
TimestampTz timestamp_add_seconds(TimestampTz to, int add);
37+
TimestampTz timestamp_add_seconds(TimestampTz to, int64 add);
3838
char *make_date_from_timestamp(TimestampTz ts, bool hires);
3939
int get_integer_from_string(char *s, int start, int len);
4040
TimestampTz get_timestamp_from_string(char *str);

src/scheduler_executor.c

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ extern volatile sig_atomic_t got_sighup;
3636
extern volatile sig_atomic_t got_sigterm;
3737

3838
static int64 current_job_id = -1;
39-
static int resubmit_current_job = 0;
39+
static int64 resubmit_current_job = 0;
4040

4141
static void handle_sigterm(SIGNAL_ARGS);
4242

@@ -193,7 +193,24 @@ void executor_worker_main(Datum arg)
193193
{
194194
STOP_SPI_SNAP();
195195
}
196-
status = SchdExecutorDone;
196+
if(job->type == AtJob && resubmit_current_job > 0)
197+
{
198+
if(job->attempt >= job->resubmit_limit)
199+
{
200+
status = SchdExecutorError;
201+
push_executor_error(&EE, "Cannot resubmit: limit reached (%ld)", job->resubmit_limit);
202+
resubmit_current_job = 0;
203+
}
204+
else
205+
{
206+
status = SchdExecutorResubmit;
207+
}
208+
}
209+
else
210+
{
211+
status = SchdExecutorDone;
212+
}
213+
197214
SetConfigOption("schedule.transaction_state", "success", PGC_INTERNAL, PGC_S_SESSION);
198215
}
199216
if(job->next_time_statement)
@@ -218,6 +235,11 @@ void executor_worker_main(Datum arg)
218235
set_shared_message(shared, &EE);
219236
}
220237
shared->status = status;
238+
if(status == SchdExecutorResubmit)
239+
{
240+
shared->next_time = timestamp_add_seconds(0, resubmit_current_job);
241+
resubmit_current_job = 0;
242+
}
221243

222244
delete_worker_mem_ctx();
223245
dsm_detach(seg);
@@ -481,9 +503,28 @@ PG_FUNCTION_INFO_V1(resubmit);
481503
Datum
482504
resubmit(PG_FUNCTION_ARGS)
483505
{
506+
Interval *interval;
507+
484508
if(current_job_id == -1)
485509
{
486510
elog(ERROR, "There is no active job in progress");
487511
}
488-
PG_RETURN_BOOL(true);
512+
if(PG_ARGISNULL(0))
513+
{
514+
resubmit_current_job = 1;
515+
PG_RETURN_INT64(1);
516+
}
517+
interval = PG_GETARG_INTERVAL_P(0);
518+
#ifdef HAVE_INT64_TIMESTAMP
519+
resubmit_current_job = interval->time / 1000000.0;
520+
#else
521+
resubmit_current_job = interval->time;
522+
#endif
523+
resubmit_current_job +=
524+
(DAYS_PER_YEAR * SECS_PER_DAY) * (interval->month / MONTHS_PER_YEAR);
525+
resubmit_current_job +=
526+
(DAYS_PER_MONTH * SECS_PER_DAY) * (interval->month % MONTHS_PER_YEAR);
527+
resubmit_current_job += SECS_PER_DAY * interval->day;
528+
529+
PG_RETURN_INT64(resubmit_current_job);
489530
}

src/scheduler_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ typedef enum {
1111
SchdExecutorInit,
1212
SchdExecutorWork,
1313
SchdExecutorDone,
14+
SchdExecutorResubmit,
1415
SchdExecutorError
1516
} schd_executor_status_t;
1617

src/scheduler_job.c

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *init_scheduler_job(job_t *j, unsigned char type)
2525
job_t *get_at_job(int cron_id, char *nodename, char **perror)
2626
{
2727
job_t *j;
28-
const char *sql = "select last_start_available, array_append('{}'::text[], do_sql)::text[], executor, postpone, max_run_time as time_limit, at, params, depends_on from ONLY at_jobs_process where node = $1 and id = $2";
28+
const char *sql = "select last_start_available, array_append('{}'::text[], do_sql)::text[], executor, postpone, max_run_time as time_limit, at, params, depends_on, attempt, resubmit_limit from ONLY at_jobs_process where node = $1 and id = $2";
2929
Oid argtypes[2] = { TEXTOID, INT4OID};
3030
Datum args[2];
3131
int ret;
@@ -69,6 +69,8 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror)
6969
j->start_at = get_timestamp_from_spi(0, 6, 0);
7070
j->sql_params = get_textarray_from_spi(0, 7, &j->sql_params_n);
7171
j->depends_on = get_int64array_from_spi(0, 8, &j->depends_on_n);
72+
j->attempt = get_int64_from_spi(0, 9, 0);
73+
j->resubmit_limit = get_int64_from_spi(0, 10, 0);
7274

7375
STOP_SPI_SNAP();
7476

@@ -333,7 +335,7 @@ int _at_move_job_to_log(job_t *j, bool status, bool process)
333335
{
334336
Datum values[3];
335337
char nulls[3] = { ' ', ' ', ' ' };
336-
Oid argtypes[4] = { INT4OID, BOOLOID, TEXTOID };
338+
Oid argtypes[3] = { INT4OID, BOOLOID, TEXTOID };
337339
int ret;
338340
const char *sql_process = "WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
339341
const char *sql_submitted = "WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, NULL as start_time, $2 as status, $3 as reason FROM moved_rows";
@@ -356,6 +358,20 @@ int _at_move_job_to_log(job_t *j, bool status, bool process)
356358
return ret > 0 ? 1: ret;
357359
}
358360

361+
int resubmit_at_job(job_t *j, TimestampTz next)
362+
{
363+
Datum values[2];
364+
Oid argtypes[2] = { INT4OID, TIMESTAMPTZOID };
365+
int ret;
366+
const char *sql = "WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_submitted SELECT id, node, name, comments, $2, do_sql, params, depends_on, executor, owner, last_start_available, attempt +1 , resubmit_limit, postpone, max_run_time, submit_time FROM moved_rows";
367+
368+
values[0] = Int32GetDatum(j->cron_id);
369+
values[1] = TimestampTzGetDatum(next);
370+
ret = SPI_execute_with_args(sql, 2, argtypes, values, NULL, false, 0);
371+
372+
return ret > 0 ? 1: ret;
373+
}
374+
359375
int _cron_move_job_to_log(job_t *j, bool status)
360376
{
361377
Datum values[4];

src/scheduler_job.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ typedef struct {
3636
char *onrollback;
3737
char *next_time_statement;
3838
bool is_active;
39+
int64 attempt;
40+
int64 resubmit_limit;
3941
char *error;
4042
} job_t;
4143

@@ -52,6 +54,7 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror);
5254
job_t *get_cron_job(int cron_id, TimestampTz start_at, char *nodename, char **perror);
5355
int _cron_move_job_to_log(job_t *j, bool status);
5456
int _at_move_job_to_log(job_t *j, bool status, bool processed);
57+
int resubmit_at_job(job_t *j, TimestampTz next);
5558

5659
#endif
5760

src/scheduler_manager.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,12 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
972972
toremove[nremove].reason = shm_data->status == SchdExecutorDone ? RmDone: RmError;
973973
nremove++;
974974
}
975+
else if(shm_data->status == SchdExecutorResubmit)
976+
{
977+
toremove[nremove].pos = i;
978+
toremove[nremove].reason = RmDoneResubmit;
979+
nremove++;
980+
}
975981
}
976982
}
977983
if(nremove)
@@ -996,6 +1002,11 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
9961002
item->wait_worker_to_die = true;
9971003
}
9981004
}
1005+
else if(toremove[i].reason == RmDoneResubmit)
1006+
{
1007+
removeJob = true;
1008+
job_status = true;
1009+
}
9991010
else if(toremove[i].reason == RmWaitWorker) /* wait worker to die */
10001011
{
10011012
if(GetBackgroundWorkerPid(item->handler, &tmppid) == BGWH_STARTED)
@@ -1066,7 +1077,22 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10661077
}
10671078
}
10681079
}
1069-
move_job_to_log(item->job, job_status, true);
1080+
if(toremove[i].reason == RmDoneResubmit)
1081+
{
1082+
if(item->job->type == AtJob)
1083+
{
1084+
resubmit_at_job(item->job, shm_data->next_time);
1085+
}
1086+
else
1087+
{
1088+
set_job_error(item->job, "cannot resubmit Cron job");
1089+
move_job_to_log(item->job, false, true);
1090+
}
1091+
}
1092+
else
1093+
{
1094+
move_job_to_log(item->job, job_status, true);
1095+
}
10701096
STOP_SPI_SNAP();
10711097

10721098
last = p->len - p->free - 1;

src/scheduler_manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ typedef enum {
2828
RmTimeout,
2929
RmWaitWorker,
3030
RmError,
31+
RmDoneResubmit,
3132
RmDone
3233
} schd_remove_reason_t;
3334

src/scheduler_spi_utils.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ int get_int_from_spi(int row_n, int pos, int def)
230230
return (int)DatumGetInt32(datum);
231231
}
232232

233+
int64 get_int64_from_spi(int row_n, int pos, int def)
234+
{
235+
Datum datum;
236+
bool is_null;
237+
238+
datum = SPI_getbinval(SPI_tuptable->vals[row_n], SPI_tuptable->tupdesc,
239+
pos, &is_null);
240+
if(is_null) return def;
241+
return (int64)DatumGetInt64(datum);
242+
}
243+
233244
Datum select_onedatumvalue_sql(const char *sql, bool *is_null)
234245
{
235246
int ret;

src/scheduler_spi_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ void ABORT_SPI_SNAP(void);
1818
char *_copy_string(char *str);
1919
TimestampTz get_timestamp_from_spi(int row_n, int pos, TimestampTz def);
2020
int get_int_from_spi(int row_n, int pos, int def);
21+
int64 get_int64_from_spi(int row_n, int pos, int def);
2122
int select_oneintvalue_sql(const char *sql, int d);
2223
Datum select_onedatumvalue_sql(const char *sql, bool *is_null);
2324
int select_count_with_args(const char *sql, int n, Oid *argtypes, Datum *values, char *nulls);

0 commit comments

Comments
 (0)