Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5eae385

Browse files
author
Vladimir Ershov
committed
starts and wait so far
1 parent a4f01da commit 5eae385

9 files changed

+490
-172
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
CREATE SCHEMA IF NOT EXISTS schedule;
44

55
CREATE TYPE schedule.job_status AS ENUM ('working', 'done', 'error');
6-
CREATE TYPE schedule.at_job_status AS ENUM ('submitted', 'working', 'done', 'error');
76

8-
CREATE TABLE schedule.at_jobs(
7+
CREATE TABLE schedule.at_jobs_submitted(
98
id SERIAL PRIMARY KEY,
109
node text,
1110
name text,
@@ -16,16 +15,27 @@ CREATE TABLE schedule.at_jobs(
1615
onrollback_statement text,
1716
executor text,
1817
owner text,
18+
last_start_available timestamp with time zone,
1919
postpone interval,
2020
max_run_time interval,
2121
max_instances integer default 1,
22-
status at_job_status default 'submited',
23-
submit_time timestamp with time zone default now(),
24-
started timestamp with time zone,
25-
finished timestamp with time zone,
26-
reason text
22+
submit_time timestamp with time zone default now()
2723
);
28-
CREATE INDEX at_jobs_status_node_at_idx on schedule.at (status, node, at);
24+
CREATE INDEX at_jobs_submitted_node_at_idx on schedule.at_jobs_submitted (node, at);
25+
26+
CREATE TABLE schedule.at_jobs_process(
27+
start_time timestamp with time zone default now()
28+
) INHERITS (schedule.at_jobs_submitted);
29+
30+
CREATE INDEX at_jobs_process_node_at_idx on schedule.at_jobs_process (node, at);
31+
32+
CREATE TABLE schedule.at_jobs_done(
33+
status boolean,
34+
reason text,
35+
done_time timestamp with time zone default now()
36+
) INHERITS (schedule.at_jobs_process);
37+
38+
CREATE INDEX at_jobs_done_node_at_idx on schedule.at_jobs_done (node, at);
2939

3040
CREATE TABLE schedule.cron(
3141
id SERIAL PRIMARY KEY,

src/pgpro_scheduler.c

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ char *scheduler_databases = NULL;
4646
char *scheduler_nodename = NULL;
4747
char *scheduler_transaction_state = NULL;
4848
int scheduler_max_workers = 2;
49+
int scheduler_at_max_workers = 2;
4950
bool scheduler_service_enabled = false;
5051
char *scheduler_schema = NULL;
5152
/* Custom GUC done */
@@ -109,17 +110,18 @@ int get_integer_from_string(char *s, int start, int len)
109110
return atoi(buff);
110111
}
111112

112-
char *make_date_from_timestamp(TimestampTz ts)
113+
char *make_date_from_timestamp(TimestampTz ts, bool hires)
113114
{
114115
struct pg_tm dt;
115-
char *str = worker_alloc(sizeof(char) * 17);
116+
char *str = worker_alloc(sizeof(char) * 19);
116117
int tz;
117118
fsec_t fsec;
118119
const char *tzn;
119120

120121
timestamp2tm(ts, &tz, &dt, &fsec, &tzn, NULL );
121-
sprintf(str, "%04d-%02d-%02d %02d:%02d", dt.tm_year , dt.tm_mon,
122-
dt.tm_mday, dt.tm_hour, dt.tm_min);
122+
sprintf(str, "%04d-%02d-%02d %02d:%02d:%02d", dt.tm_year , dt.tm_mon,
123+
dt.tm_mday, dt.tm_hour, dt.tm_min, dt.tm_sec);
124+
if(!hires) str[16] = 0;
123125
return str;
124126
}
125127

@@ -457,7 +459,7 @@ void _PG_init(void)
457459
);
458460
DefineCustomIntVariable(
459461
"schedule.max_workers",
460-
"How much workers can serve scheduler on one database",
462+
"How much workers can serve scheduled jobs on one database",
461463
NULL,
462464
&scheduler_max_workers,
463465
2,
@@ -469,6 +471,20 @@ void _PG_init(void)
469471
NULL,
470472
NULL
471473
);
474+
DefineCustomIntVariable(
475+
"schedule.at_max_workers",
476+
"How much workers can serve at jobs on one database",
477+
NULL,
478+
&scheduler_at_max_workers,
479+
2,
480+
1,
481+
100,
482+
PGC_SUSET,
483+
0,
484+
NULL,
485+
NULL,
486+
NULL
487+
);
472488
DefineCustomBoolVariable(
473489
"schedule.enabled",
474490
"Enable schedule service",

src/pgpro_scheduler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pid_t registerManagerWorker(schd_manager_t *man);
3535

3636
void reload_db_role_config(char *dbname);
3737
TimestampTz timestamp_add_seconds(TimestampTz to, int add);
38-
char *make_date_from_timestamp(TimestampTz ts);
38+
char *make_date_from_timestamp(TimestampTz ts, bool hires);
3939
int get_integer_from_string(char *s, int start, int len);
4040
TimestampTz get_timestamp_from_string(char *str);
4141
TimestampTz _round_timestamp_to_minute(TimestampTz ts);

src/scheduler_executor.c

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -389,72 +389,30 @@ void set_pg_var(bool result, executor_error_t *ee)
389389

390390
job_t *initializeExecutorJob(schd_executor_share_t *data)
391391
{
392-
const char *sql = "select at.last_start_available, cron.same_transaction, cron.do_sql, cron.executor, cron.postpone, cron.max_run_time as time_limit, cron.max_instances, cron.onrollback_statement , cron.next_time_statement from schedule.at at, schedule.cron cron where start_at = $1 and at.active and at.cron = cron.id AND cron.node = $2 AND cron.id = $3";
393-
Oid argtypes[3] = { TIMESTAMPTZOID, TEXTOID, INT4OID};
394-
Datum args[3];
395392
job_t *J;
396-
int ret;
397393
char *error = NULL;
398-
char *ts;
399-
400-
args[0] = TimestampTzGetDatum(data->start_at);
401-
args[1] = PointerGetDatum(cstring_to_text(data->nodename));
402-
args[2] = Int32GetDatum(data->cron_id);
403394

404-
START_SPI_SNAP();
405-
ret = execute_spi_sql_with_args(sql, 3, argtypes, args, NULL, &error);
395+
J = data->type == CronJob ?
396+
get_cron_job(data->cron_id, data->start_at, data->nodename, &error):
397+
get_at_job(data->cron_id, data->nodename, &error);
406398

407399
if(error)
408400
{
409-
snprintf(data->message,
410-
PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
411-
"cannot retrive job: %s", error);
401+
snprintf(data->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
402+
"%s", error);
412403
elog(LOG, "EXECUTOR: %s", data->message);
413404
pfree(error);
414-
PopActiveSnapshot();
415-
AbortCurrentTransaction();
416-
SPI_finish();
417405
return NULL;
418406
}
419-
420-
if(ret == SPI_OK_SELECT)
407+
if(!J)
421408
{
422-
if(SPI_processed == 0)
423-
{
424-
STOP_SPI_SNAP();
425-
ts = make_date_from_timestamp(data->start_at);
426-
snprintf(data->message,
427-
PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
428-
"cannot find job: %d @ %s [%s]",
429-
data->cron_id, ts, data->nodename);
430-
elog(LOG, "EXECUTOR: %s", data->message);
431-
pfree(ts);
432-
return NULL;
433-
}
434-
J = worker_alloc(sizeof(job_t));
435-
436-
J->cron_id = data->cron_id;
437-
J->start_at = data->start_at;
438-
J->node = _copy_string(data->nodename);
439-
J->same_transaction = get_boolean_from_spi(0, 2, false);
440-
J->dosql = get_textarray_from_spi(0, 3, &J->dosql_n);
441-
J->executor = get_text_from_spi(0, 4);
442-
J->onrollback = get_text_from_spi(0, 8);
443-
J->next_time_statement = get_text_from_spi(0, 9);
444-
445-
STOP_SPI_SNAP();
446-
447-
return J;
409+
snprintf(data->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
410+
"unknown error get job");
411+
elog(LOG, "EXECUTOR: %s", data->message);
412+
return NULL;
448413
}
449-
snprintf(data->message,
450-
PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
451-
"error while retrive job information: %d", ret);
452-
elog(LOG, "EXECUTOR: %s", data->message);
453-
454-
PopActiveSnapshot();
455-
AbortCurrentTransaction();
456-
SPI_finish();
457-
return NULL;
414+
415+
return J;
458416
}
459417

460418
int push_executor_error(executor_error_t *e, char *fmt, ...)

src/scheduler_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct {
2020
char user[NAMEDATALEN];
2121

2222
int cron_id;
23+
task_type_t type;
2324
TimestampTz start_at;
2425

2526
schd_executor_status_t status;

0 commit comments

Comments
 (0)