Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 672905b

Browse files
author
Vladimir Ershov
committed
all params checked && execute with params
1 parent b38692b commit 672905b

7 files changed

+120
-18
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ BEGIN
195195
last_avail := NULL;
196196
END IF;
197197

198+
IF node IS NULL THEN
199+
node := 'master';
200+
END IF;
201+
198202
IF run_as IS NOT NULL AND run_as <> session_user THEN
199203
executor := run_as;
200204
BEGIN
@@ -211,10 +215,10 @@ BEGIN
211215

212216
INSERT INTO at_jobs_submitted
213217
(node, at, do_sql, owner, executor, name, comments, max_run_time,
214-
postpone, last_start_available, depends_on)
218+
postpone, last_start_available, depends_on, params)
215219
VALUES
216220
(node, run_after, query, session_user, executor, name, comments,
217-
max_duration, max_wait_interval, last_avail, depends_on)
221+
max_duration, max_wait_interval, last_avail, depends_on, params)
218222
RETURNING id INTO job_id;
219223

220224
RETURN job_id;

src/scheduler_executor.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,14 @@ void executor_worker_main(Datum arg)
150150
{
151151
START_SPI_SNAP();
152152
}
153-
ret = execute_spi(job->dosql[i], &error);
153+
if(job->type == AtJob && i == 0 && job->sql_params_n > 0)
154+
{
155+
ret = execute_spi_params_prepared(job->dosql[i], job->sql_params_n, job->sql_params, &error);
156+
}
157+
else
158+
{
159+
ret = execute_spi(job->dosql[i], &error);
160+
}
154161
if(ret < 0)
155162
{
156163
/* success = false; */

src/scheduler_job.c

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror)
6262
}
6363

6464
j = init_scheduler_job(NULL, AtJob);
65+
j->cron_id = cron_id;
6566
j->node = _copy_string(nodename);
6667
j->dosql = get_textarray_from_spi(0, 2, &j->dosql_n);
6768
j->executor = get_text_from_spi(0, 3);
@@ -163,7 +164,7 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
163164
int ret, got, i;
164165
Oid argtypes[2] = { TEXTOID, INT4OID };
165166
Datum values[2];
166-
const char *get_job_sql = "select id, at, last_start_available, max_run_time, max_instances, executor from ONLY at_jobs_submitted where at <= 'now' and (last_start_available is NULL OR last_start_available > 'now') AND node = $1 order by at, submit_time limit $2";
167+
const char *get_job_sql = "select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted where at <= 'now' and (last_start_available is NULL OR last_start_available > 'now') AND node = $1 order by at, submit_time limit $2";
167168

168169
*is_error = *n = 0;
169170
START_SPI_SNAP();
@@ -184,9 +185,8 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
184185
jobs[i].start_at = get_timestamp_from_spi(i, 2, 0);
185186
jobs[i].last_start_avail = get_timestamp_from_spi(i, 3, 0);
186187
jobs[i].timelimit = get_interval_seconds_from_spi(i, 4, 0);
187-
jobs[i].max_instances = get_int_from_spi(i, 5, 1);
188188
jobs[i].node = _copy_string(nodename);
189-
jobs[i].executor = get_text_from_spi(i, 6);
189+
jobs[i].executor = get_text_from_spi(i, 5);
190190
}
191191
}
192192
}
@@ -323,19 +323,23 @@ job_t *set_job_error(job_t *j, const char *fmt, ...)
323323
return j;
324324
}
325325

326-
int move_job_to_log(job_t *j, bool status)
326+
int move_job_to_log(job_t *j, bool status, bool process)
327327
{
328328
if(j->type == CronJob) _cron_move_job_to_log(j, status);
329-
return _at_move_job_to_log(j, status);
329+
return _at_move_job_to_log(j, status, process);
330330
}
331331

332-
int _at_move_job_to_log(job_t *j, bool status)
332+
int _at_move_job_to_log(job_t *j, bool status, bool process)
333333
{
334334
Datum values[3];
335335
char nulls[3] = { ' ', ' ', ' ' };
336336
Oid argtypes[4] = { INT4OID, BOOLOID, TEXTOID };
337337
int ret;
338-
const char *sql = "WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
338+
const char *sql_process = "WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
339+
const char *sql_submitted = "WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, NULL as start_time, $2 as status, $3 as reason FROM moved_rows";
340+
const char *sql;
341+
342+
sql = process ? sql_process: sql_submitted;
339343

340344
values[0] = Int32GetDatum(j->cron_id);
341345
values[1] = BoolGetDatum(status);

src/scheduler_job.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ job_t *_cron_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit);
4646
job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit);
4747
job_t *get_jobs_to_do(char *nodename, task_type_t type, int *n, int *is_error, int limit);
4848
job_t *set_job_error(job_t *j, const char *fmt, ...) pg_attribute_printf(2, 3);
49-
int move_job_to_log(job_t *j, bool status);
49+
int move_job_to_log(job_t *j, bool status, bool processed);
5050
void destroy_job(job_t *j, int selfdestroy);
5151
job_t *get_at_job(int cron_id, char *nodename, char **perror);
5252
job_t *get_cron_job(int cron_id, TimestampTz start_at, char *nodename, char **perror);
5353
int _cron_move_job_to_log(job_t *j, bool status);
54-
int _at_move_job_to_log(job_t *j, bool status);
54+
int _at_move_job_to_log(job_t *j, bool status, bool processed);
5555

5656
#endif
5757

src/scheduler_manager.c

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -840,12 +840,13 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
840840

841841
for(i = start_i; i < N + start_i; i++)
842842
{
843-
ni = how_many_instances_on_work(ctx, &(jobs[i]));
844-
if(ni >= jobs[i].max_instances)
843+
ni = type == CronJob ?
844+
how_many_instances_on_work(ctx, &(jobs[i])): 100000;
845+
if(type == CronJob && ni >= jobs[i].max_instances)
845846
{
846847
START_SPI_SNAP();
847848
set_job_error(&jobs[i], "max instances limit reached");
848-
move_job_to_log(&jobs[i], false);
849+
move_job_to_log(&jobs[i], false, false);
849850
destroy_job(&jobs[i], 0);
850851
STOP_SPI_SNAP();
851852
jobs[i].cron_id = -1;
@@ -867,9 +868,11 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
867868
set_job_error(&jobs[i],
868869
"Cannot set at job %d to worker",
869870
jobs[i].cron_id);
871+
elog(ERROR, "Cannot set job to free slot type=%d, id=%d",
872+
jobs[i].type, jobs[i].cron_id);
870873
}
871874
START_SPI_SNAP();
872-
move_job_to_log(&jobs[i], false);
875+
move_job_to_log(&jobs[i], false, false);
873876
destroy_job(&jobs[i], 0);
874877
jobs[i].cron_id = -1;
875878
STOP_SPI_SNAP();
@@ -1063,7 +1066,7 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10631066
}
10641067
}
10651068
}
1066-
move_job_to_log(item->job, job_status);
1069+
move_job_to_log(item->job, job_status, true);
10671070
STOP_SPI_SNAP();
10681071

10691072
last = p->len - p->free - 1;
@@ -1179,7 +1182,7 @@ int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type
11791182
set_job_error(&expired[i], "job start time %s expired", ts);
11801183
}
11811184

1182-
move_ret = move_job_to_log(&expired[i], 0);
1185+
move_ret = move_job_to_log(&expired[i], 0, false);
11831186
if(move_ret < 0)
11841187
{
11851188
elog(LOG, "Scheduler manager %s: cannot move %s job %d@%s%s to log",

src/scheduler_spi_utils.c

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,4 +351,87 @@ int execute_spi(const char *sql, char **error)
351351
return execute_spi_sql_with_args(sql, 0, NULL, NULL, NULL, error);
352352
}
353353

354+
int execute_spi_params_prepared(const char *sql, int nparams, char **params, char **error)
355+
{
356+
int ret = -100;
357+
ErrorData *edata;
358+
MemoryContext old;
359+
int errorSet = 0;
360+
char other[100];
361+
SPIPlanPtr plan;
362+
Oid *paramtypes;
363+
Datum *values;
364+
int i;
365+
366+
*error = NULL;
367+
368+
paramtypes = worker_alloc(sizeof(Oid) * nparams);
369+
values = worker_alloc(sizeof(Datum) * nparams);
370+
for(i=0; i < nparams; i++)
371+
{
372+
paramtypes[i] = TEXTOID;
373+
values[i] = CStringGetTextDatum(params[i]);
374+
}
375+
376+
PG_TRY();
377+
{
378+
plan = SPI_prepare(sql, nparams, paramtypes);
379+
if(plan)
380+
{
381+
ret = SPI_execute_plan(plan, values, NULL, false, 0);
382+
}
383+
}
384+
PG_CATCH();
385+
{
386+
old = switch_to_worker_context();
387+
388+
edata = CopyErrorData();
389+
if(edata->message)
390+
{
391+
*error = _copy_string(edata->message);
392+
}
393+
else if(edata->detail)
394+
{
395+
*error = _copy_string(edata->detail);
396+
}
397+
else
398+
{
399+
*error = _copy_string("unknown error");
400+
}
401+
errorSet = 1;
402+
FreeErrorData(edata);
403+
MemoryContextSwitchTo(old);
404+
FlushErrorState();
405+
}
406+
PG_END_TRY();
407+
408+
pfree(values);
409+
pfree(paramtypes);
410+
411+
if(!errorSet && ret < 0)
412+
{
413+
if(ret == SPI_ERROR_CONNECT)
414+
{
415+
*error = _copy_string("Connection error");
416+
}
417+
else if(ret == SPI_ERROR_COPY)
418+
{
419+
*error = _copy_string("COPY error");
420+
}
421+
else if(ret == SPI_ERROR_OPUNKNOWN)
422+
{
423+
*error = _copy_string("SPI_ERROR_OPUNKNOWN");
424+
}
425+
else if(ret == SPI_ERROR_UNCONNECTED)
426+
{
427+
*error = _copy_string("Unconnected call");
428+
}
429+
else
430+
{
431+
sprintf(other, "error number: %d", ret);
432+
*error = _copy_string(other);
433+
}
434+
}
354435

436+
return ret;
437+
}

src/scheduler_spi_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ char *get_text_from_spi(int row_n, int pos);
2929
Oid get_oid_from_spi(int row_n, int pos, Oid def);
3030
int execute_spi_sql_with_args(const char *sql, int n, Oid *argtypes, Datum *values, char *nulls, char **error);
3131
int execute_spi(const char *sql, char **error);
32+
int execute_spi_params_prepared(const char *sql, int nparams, char **params, char **error);
3233

3334
#endif

0 commit comments

Comments
 (0)