Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b38692b

Browse files
author
Vladimir Ershov
committed
up && running
1 parent 5eae385 commit b38692b

9 files changed

+442
-256
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 301 additions & 216 deletions
Large diffs are not rendered by default.

src/pgpro_scheduler.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ int get_integer_from_string(char *s, int start, int len)
113113
char *make_date_from_timestamp(TimestampTz ts, bool hires)
114114
{
115115
struct pg_tm dt;
116-
char *str = worker_alloc(sizeof(char) * 19);
116+
char *str = worker_alloc(sizeof(char) * 20);
117117
int tz;
118118
fsec_t fsec;
119119
const char *tzn;

src/scheduler_executor.c

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
extern volatile sig_atomic_t got_sighup;
3636
extern volatile sig_atomic_t got_sigterm;
3737

38+
static int64 current_job_id = -1;
39+
static int resubmit_current_job = 0;
40+
3841
static void handle_sigterm(SIGNAL_ARGS);
3942

4043
static void
@@ -85,6 +88,8 @@ void executor_worker_main(Datum arg)
8588
errmsg("executor corrupted dynamic shared memory segment")));
8689
}
8790
status = shared->status = SchdExecutorWork;
91+
shared->message[0] = 0;
92+
8893
SetConfigOption("application_name", "pgp-s executor", PGC_USERSET, PGC_S_SESSION);
8994
pgstat_report_activity(STATE_RUNNING, "initialize");
9095
init_worker_mem_ctx("ExecutorMemoryContext");
@@ -94,13 +99,16 @@ void executor_worker_main(Datum arg)
9499
job = initializeExecutorJob(shared);
95100
if(!job)
96101
{
97-
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
102+
if(shared->message[0] == 0)
103+
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
98104
"Cannot retrive job information");
99105
shared->status = SchdExecutorError;
100106
delete_worker_mem_ctx();
101107
dsm_detach(seg);
102108
proc_exit(0);
103109
}
110+
current_job_id = job->cron_id;
111+
pgstat_report_activity(STATE_RUNNING, "job initialized");
104112

105113
if(set_session_authorization(job->executor, &error) < 0)
106114
{
@@ -195,6 +203,7 @@ void executor_worker_main(Datum arg)
195203
sprintf(shared->set_invalid_reason, "unable to execute next time statement");
196204
}
197205
}
206+
current_job_id = -1;
198207
pgstat_report_activity(STATE_RUNNING, "finish job processing");
199208

200209
if(EE.n > 0)
@@ -391,11 +400,19 @@ job_t *initializeExecutorJob(schd_executor_share_t *data)
391400
{
392401
job_t *J;
393402
char *error = NULL;
403+
const char *schema;
404+
const char *old_path;
405+
406+
old_path = GetConfigOption("search_path", false, true);
407+
schema = GetConfigOption("schedule.schema", false, true);
408+
SetConfigOption("search_path", schema, PGC_USERSET, PGC_S_SESSION);
394409

395410
J = data->type == CronJob ?
396411
get_cron_job(data->cron_id, data->start_at, data->nodename, &error):
397412
get_at_job(data->cron_id, data->nodename, &error);
398413

414+
SetConfigOption("search_path", old_path, PGC_USERSET, PGC_S_SESSION);
415+
399416
if(error)
400417
{
401418
snprintf(data->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
@@ -441,3 +458,25 @@ int push_executor_error(executor_error_t *e, char *fmt, ...)
441458

442459
return e->n;
443460
}
461+
462+
PG_FUNCTION_INFO_V1(get_self_id);
463+
Datum
464+
get_self_id(PG_FUNCTION_ARGS)
465+
{
466+
if(current_job_id == -1)
467+
{
468+
elog(ERROR, "There is no active job in progress");
469+
}
470+
PG_RETURN_INT64(current_job_id);
471+
}
472+
473+
PG_FUNCTION_INFO_V1(resubmit);
474+
Datum
475+
resubmit(PG_FUNCTION_ARGS)
476+
{
477+
if(current_job_id == -1)
478+
{
479+
elog(ERROR, "There is no active job in progress");
480+
}
481+
PG_RETURN_BOOL(true);
482+
}

src/scheduler_executor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ void set_pg_var(bool resulti, executor_error_t *ee);
4848
int push_executor_error(executor_error_t *e, char *fmt, ...) pg_attribute_printf(2, 3);
4949
int set_session_authorization(char *username, char **error);
5050

51+
extern Datum get_self_id(PG_FUNCTION_ARGS);
52+
extern Datum resubmit(PG_FUNCTION_ARGS);
53+
5154

5255
#endif
5356

src/scheduler_job.c

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *init_scheduler_job(job_t *j, unsigned char type)
2525
job_t *get_at_job(int cron_id, char *nodename, char **perror)
2626
{
2727
job_t *j;
28-
const char *sql = "select last_start_available, same_transaction, do_sql, executor, postpone, max_run_time as time_limit, max_instances, onrollback_statement, start_at from at_process where node = $1 and id = $2";
28+
const char *sql = "select last_start_available, array_append('{}'::text[], do_sql)::text[], executor, postpone, max_run_time as time_limit, at, params, depends_on from ONLY at_jobs_process where node = $1 and id = $2";
2929
Oid argtypes[2] = { TEXTOID, INT4OID};
3030
Datum args[2];
3131
int ret;
@@ -60,15 +60,16 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror)
6060
*perror = _copy_string(buffer);
6161
return NULL;
6262
}
63-
STOP_SPI_SNAP();
6463

6564
j = init_scheduler_job(NULL, AtJob);
6665
j->node = _copy_string(nodename);
67-
j->same_transaction = get_boolean_from_spi(0, 2, false);
68-
j->dosql = get_textarray_from_spi(0, 3, &j->dosql_n);
69-
j->executor = get_text_from_spi(0, 4);
70-
j->onrollback = get_text_from_spi(0, 8);
71-
j->start_at = get_timestamp_from_spi(0, 9, 0);
66+
j->dosql = get_textarray_from_spi(0, 2, &j->dosql_n);
67+
j->executor = get_text_from_spi(0, 3);
68+
j->start_at = get_timestamp_from_spi(0, 6, 0);
69+
j->sql_params = get_textarray_from_spi(0, 7, &j->sql_params_n);
70+
j->depends_on = get_int64array_from_spi(0, 8, &j->depends_on_n);
71+
72+
STOP_SPI_SNAP();
7273

7374
*perror = NULL;
7475
return j;
@@ -125,7 +126,6 @@ job_t *get_cron_job(int cron_id, TimestampTz start_at, char *nodename, char **pe
125126
pfree(ts);
126127
return NULL;
127128
}
128-
STOP_SPI_SNAP();
129129

130130
j = init_scheduler_job(NULL, CronJob);
131131
j->start_at = start_at;
@@ -135,6 +135,7 @@ job_t *get_cron_job(int cron_id, TimestampTz start_at, char *nodename, char **pe
135135
j->executor = get_text_from_spi(0, 4);
136136
j->onrollback = get_text_from_spi(0, 8);
137137
j->next_time_statement = get_text_from_spi(0, 9);
138+
STOP_SPI_SNAP();
138139

139140
*perror = NULL;
140141
return j;
@@ -167,7 +168,7 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
167168
*is_error = *n = 0;
168169
START_SPI_SNAP();
169170
values[0] = CStringGetTextDatum(nodename);
170-
values[1] = Int32GetDatum(limit);
171+
values[1] = Int32GetDatum(limit+1);
171172
ret = SPI_execute_with_args(get_job_sql, 2, argtypes, values, NULL, true, 0);
172173
if(ret == SPI_OK_SELECT)
173174
{
@@ -208,7 +209,7 @@ job_t *_cron_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
208209
*is_error = *n = 0;
209210
START_SPI_SNAP();
210211
values[0] = CStringGetTextDatum(nodename);
211-
values[1] = Int32GetDatum(limit);
212+
values[1] = Int32GetDatum(limit + 1);
212213
ret = SPI_execute_with_args(get_job_sql, 2, argtypes, values, NULL, true, 0);
213214
if(ret == SPI_OK_SELECT)
214215
{
@@ -409,6 +410,15 @@ void destroy_job(job_t *j, int selfdestroy)
409410
}
410411
pfree(j->dosql);
411412
}
413+
if(j->sql_params_n && j->sql_params)
414+
{
415+
for(i=0; i < j->sql_params_n; i++)
416+
{
417+
if(j->sql_params[i]) pfree(j->sql_params[i]);
418+
}
419+
pfree(j->sql_params);
420+
}
421+
if(j->depends_on_n && j->depends_on) pfree(j->depends_on);
412422

413423
if(selfdestroy) pfree(j);
414424
}

src/scheduler_job.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ typedef struct {
2424
bool same_transaction;
2525
int dosql_n;
2626
char **dosql;
27+
int sql_params_n;
28+
char **sql_params;
29+
int depends_on_n;
30+
int64 *depends_on;
2731
TimestampTz postpone;
2832
char *executor;
2933
char *owner;

src/scheduler_manager.c

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -810,12 +810,11 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
810810
interval = 2;
811811
}
812812

813-
814-
if(*check_time > GetCurrentTimestamp()) return -1;
813+
if(*check_time > GetCurrentTimestamp()) return 0;
815814
if(p->free == 0)
816815
{
817-
*check_time = timestamp_add_seconds(0, 1);
818-
return -2;
816+
if(type == CronJob) *check_time = timestamp_add_seconds(0, 1);
817+
return 1;
819818
}
820819

821820
jobs = get_jobs_to_do(ctx->nodename, type, &njobs, &is_error, p->free);
@@ -825,7 +824,7 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
825824
{
826825
*check_time = timestamp_add_seconds(0, interval);
827826
elog(LOG, "Error while retrieving jobs");
828-
return -3;
827+
return 0;
829828
}
830829
if(nwaiting == 0)
831830
{
@@ -897,7 +896,7 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
897896

898897
if(nwaiting > 0)
899898
{
900-
interval = 1;
899+
interval = type == CronJob ? 1: 0;
901900
}
902901
else
903902
{
@@ -917,7 +916,7 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
917916
}
918917

919918
*check_time = timestamp_add_seconds(0, interval);
920-
return 1;
919+
return nwaiting;
921920
}
922921

923922
void destroy_slot_item(scheduler_manager_slot_t *item)
@@ -1396,6 +1395,8 @@ void manager_worker_main(Datum arg)
13961395
schd_manager_share_t *shared;
13971396
dsm_segment *seg;
13981397
scheduler_manager_ctx_t *ctx;
1398+
int wait = 0;
1399+
int terminate_main_loop = 0;
13991400

14001401
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
14011402
seg = dsm_attach(DatumGetInt32(arg));
@@ -1469,35 +1470,31 @@ void manager_worker_main(Datum arg)
14691470
}
14701471
if(!got_sighup && !got_sigterm)
14711472
{
1472-
if(rc & WL_LATCH_SET)
1473+
terminate_main_loop = 0;
1474+
while(1)
14731475
{
1474-
_pdebug("got latch from some bgworker");
1475-
if(check_parent_stop_signal(ctx)) break;
1476-
scheduler_start_jobs(ctx, AtJob);
1477-
scheduler_check_slots(ctx, &(ctx->cron));
1478-
scheduler_check_slots(ctx, &(ctx->at));
1479-
set_slots_stat_report(ctx);
1480-
_pdebug("quit got latch");
1481-
}
1482-
else if(rc & WL_TIMEOUT)
1483-
{
1484-
scheduler_make_atcron_record(ctx);
1485-
/* if there are any expired jobs to get rid of */
1486-
scheduler_vanish_expired_jobs(ctx, AtJob);
1487-
scheduler_vanish_expired_jobs(ctx, CronJob);
1488-
/* start jobs */
1489-
scheduler_start_jobs(ctx, AtJob);
1490-
scheduler_start_jobs(ctx, CronJob);
1491-
/* check slots, first "at" 'cause them faster */
1476+
wait = 0;
1477+
if(check_parent_stop_signal(ctx))
1478+
{
1479+
terminate_main_loop = 1;
1480+
break;
1481+
}
1482+
wait += scheduler_start_jobs(ctx, AtJob);
1483+
wait += scheduler_start_jobs(ctx, CronJob);
14921484
scheduler_check_slots(ctx, &(ctx->at));
14931485
scheduler_check_slots(ctx, &(ctx->cron));
1494-
/* set statistics of working slots */
14951486
set_slots_stat_report(ctx);
1487+
if(wait == 0) break;
14961488
}
1489+
if(terminate_main_loop) break;
1490+
scheduler_make_atcron_record(ctx);
1491+
/* if there are any expired jobs to get rid of */
1492+
scheduler_vanish_expired_jobs(ctx, AtJob);
1493+
scheduler_vanish_expired_jobs(ctx, CronJob);
14971494
}
14981495
}
14991496
rc = WaitLatch(MyLatch,
1500-
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L);
1497+
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 500L);
15011498
ResetLatch(MyLatch);
15021499
}
15031500
scheduler_manager_stop(ctx);

src/scheduler_spi_utils.c

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,53 @@ bool get_boolean_from_spi(int row_n, int pos, bool def)
8080
return DatumGetBool(datum);
8181
}
8282

83+
int64 *get_int64array_from_spi(int row_n, int pos, int *N)
84+
{
85+
Datum datum;
86+
bool is_null;
87+
ArrayType *input;
88+
Datum *datums;
89+
bool i_typbyval;
90+
char i_typalign;
91+
int16 i_typlen;
92+
int len, i, arr_len;
93+
bool *nulls;
94+
int64 *result;
95+
96+
*N = 0;
97+
98+
datum = SPI_getbinval(SPI_tuptable->vals[row_n], SPI_tuptable->tupdesc,
99+
pos, &is_null);
100+
if(is_null) return NULL;
101+
102+
input = DatumGetArrayTypeP(datum);
103+
if(ARR_ELEMTYPE(input) != INT8OID)
104+
{
105+
return NULL;
106+
}
107+
get_typlenbyvalalign(INT8OID, &i_typlen, &i_typbyval, &i_typalign);
108+
deconstruct_array(input, INT8OID, i_typlen, i_typbyval, i_typalign, &datums, &nulls, &len);
109+
110+
if(len == 0) return NULL;
111+
arr_len = len;
112+
113+
for(i=0; i < len; i++)
114+
{
115+
if(nulls[i]) arr_len--;
116+
}
117+
result = worker_alloc(sizeof(int64) * arr_len);
118+
for(i=0; i < len; i++)
119+
{
120+
if(!nulls[i])
121+
{
122+
result[*N] = Int64GetDatum(datums[i]);
123+
(*N)++;
124+
}
125+
}
126+
127+
return result;
128+
}
129+
83130
char **get_textarray_from_spi(int row_n, int pos, int *N)
84131
{
85132
Datum datum;

src/scheduler_spi_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Datum select_onedatumvalue_sql(const char *sql, bool *is_null);
2323
int select_count_with_args(const char *sql, int n, Oid *argtypes, Datum *values, char *nulls);
2424
long int get_interval_seconds_from_spi(int row_n, int pos, long def);
2525
char **get_textarray_from_spi(int row_n, int pos, int *N);
26+
int64 *get_int64array_from_spi(int row_n, int pos, int *N);
2627
bool get_boolean_from_spi(int row_n, int pos, bool def);
2728
char *get_text_from_spi(int row_n, int pos);
2829
Oid get_oid_from_spi(int row_n, int pos, Oid def);

0 commit comments

Comments
 (0)