Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 73569c8

Browse files
author
Vladimir Ershov
committed
next time
new custom var bug fixes
1 parent 1340976 commit 73569c8

File tree

6 files changed

+188
-96
lines changed

6 files changed

+188
-96
lines changed

pgpro_scheduler--1.0.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ CREATE TABLE schedule.cron(
2424
max_instances integer default 1,
2525
start_date timestamp with time zone,
2626
end_date timestamp with time zone,
27-
reason text
27+
reason text,
28+
_next_exec_time timestamp with time zone
2829
);
2930

3031
CREATE TABLE schedule.at(

src/pgpro_scheduler.c

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ volatile sig_atomic_t got_sigterm = false;
3939

4040
static char *sched_databases = "";
4141
static char *sched_nodename = "master";
42+
static char *sched_transaction_state = "undefined";
4243
static int sched_max_workers = 2;
4344

4445
extern void
@@ -91,7 +92,7 @@ char *make_date_from_timestamp(TimestampTz ts)
9192
fsec_t fsec;
9293
const char *tzn;
9394

94-
timestamp2tm(ts, &tz, &dt, &fsec, &tzn, NULL ); /* TODO ERROR */
95+
timestamp2tm(ts, &tz, &dt, &fsec, &tzn, NULL );
9596
sprintf(str, "%04d-%02d-%02d %02d:%02d", dt.tm_year , dt.tm_mon,
9697
dt.tm_mday, dt.tm_hour, dt.tm_min);
9798
return str;
@@ -180,13 +181,11 @@ char_array_t *readBasesToCheck(void)
180181
pfree(clean_value);
181182
return result;
182183
}
183-
elog(LOG, "clean value: %s [%d,%d]", clean_value, cv_len, nnames);
184184
names = makeCharArray();
185185
for(i=0; i < cv_len + 1; i++)
186186
{
187187
if(clean_value[i] == 0)
188188
{
189-
elog(LOG, "start position: %d", start_pos);
190189
ptr = clean_value + start_pos;
191190
if(strlen(ptr)) pushCharArray(names, ptr);
192191
start_pos = i + 1;
@@ -259,7 +258,6 @@ void parent_scheduler_main(Datum arg)
259258

260259
BackgroundWorkerInitializeConnection("postgres", NULL);
261260
names = readBasesToCheck();
262-
elog(LOG, "GOT NAMES");
263261
poll = initSchedulerManagerPool(names);
264262
destroyCharArray(names);
265263

@@ -374,6 +372,18 @@ void _PG_init(void)
374372
NULL,
375373
NULL
376374
);
375+
DefineCustomStringVariable(
376+
"schedule.transaction_state",
377+
"State of scheduler executor transaction",
378+
"If not under scheduler executor process the variable has no mean and has a value = 'undefined', possible values: progress, success, failure",
379+
&sched_transaction_state ,
380+
"undefined",
381+
PGC_INTERNAL,
382+
0,
383+
NULL,
384+
NULL,
385+
NULL
386+
);
377387
DefineCustomIntVariable(
378388
"schedule.max_workers",
379389
"How much workers can serve scheduler on one database",

src/scheduler_executor.c

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,12 @@ void executor_worker_main(Datum arg)
5858
dsm_segment *seg;
5959
job_t *job;
6060
int i;
61-
bool success = true;
6261
executor_error_t EE;
6362
int ret;
6463
char *error = NULL;
65-
bool use_pg_vars = true;
64+
/* bool use_pg_vars = true; */
65+
/* bool success = true; */
6666
schd_executor_status_t status;
67-
/* int rc = 0; */
6867

6968
EE.n = 0;
7069
EE.errors = NULL;
@@ -127,6 +126,7 @@ void executor_worker_main(Datum arg)
127126
CHECK_FOR_INTERRUPTS();
128127
/* rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 0);
129128
ResetLatch(MyLatch); */
129+
SetConfigOption("schedule.transaction_state", "running", PGC_INTERNAL, PGC_S_SESSION);
130130

131131
if(job->same_transaction)
132132
{
@@ -143,8 +143,9 @@ void executor_worker_main(Datum arg)
143143
ret = execute_spi(job->dosql[i], &error);
144144
if(ret < 0)
145145
{
146-
success = false;
146+
/* success = false; */
147147
status = SchdExecutorError;
148+
SetConfigOption("schedule.transaction_state", "failure", PGC_INTERNAL, PGC_S_SESSION);
148149
if(error)
149150
{
150151
push_executor_error(&EE, "error on %d: %s", i+1, error);
@@ -174,13 +175,15 @@ void executor_worker_main(Datum arg)
174175
STOP_SPI_SNAP();
175176
}
176177
status = SchdExecutorDone;
178+
SetConfigOption("schedule.transaction_state", "success", PGC_INTERNAL, PGC_S_SESSION);
177179
}
178180
if(job->next_time_statement)
179181
{
180-
if(use_pg_vars) /* may be to define custom var is better */
182+
/* if(use_pg_vars)
181183
{
182184
set_pg_var(success, &EE);
183185
}
186+
*/
184187
shared->next_time = get_next_excution_time(job->next_time_statement, &EE);
185188
}
186189
pgstat_report_activity(STATE_RUNNING, "finish job processing");
@@ -261,7 +264,7 @@ void set_shared_message(schd_executor_share_t *shared, executor_error_t *ee)
261264
}
262265
else
263266
{
264-
memcpy(ptr, ", ", 2);
267+
memcpy(ptr, "; ", 2);
265268
left -= 2;
266269
ptr += 2;
267270
}
@@ -277,7 +280,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
277280
bool isnull;
278281

279282
START_SPI_SNAP();
280-
pgstat_report_activity(STATE_RUNNING, "finish job processing");
283+
pgstat_report_activity(STATE_RUNNING, "culc next time execution time");
281284
ret = execute_spi(sql, &error);
282285
if(ret < 0)
283286
{
@@ -333,7 +336,6 @@ int executor_onrollback(job_t *job, executor_error_t *ee)
333336
{
334337
if(error)
335338
{
336-
elog(LOG, "EXECUTOR: onrollback error: %s", error);
337339
push_executor_error(ee, "onrollback error: %s", error);
338340
pfree(error);
339341
}
@@ -371,7 +373,7 @@ void set_pg_var(bool result, executor_error_t *ee)
371373
}
372374
else
373375
{
374-
push_executor_error(ee, "set variable: code: %d", ret);
376+
push_executor_error(ee, "set variable error code: %d", ret);
375377
}
376378
}
377379
}

src/scheduler_job.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
2525
int ret, got, i;
2626
Oid argtypes[1] = { TEXTOID };
2727
Datum values[1];
28-
const char *get_job_sql = "select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor from schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
28+
const char *get_job_sql = "select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor, cron.next_time_statement from schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
2929

3030
*is_error = *n = 0;
3131
START_SPI_SNAP();
@@ -48,6 +48,7 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
4848
jobs[i].max_instances = get_int_from_spi(i, 5, 1);
4949
jobs[i].node = _copy_string(nodename);
5050
jobs[i].executor = get_text_from_spi(i, 6);
51+
jobs[i].next_time_statement = get_text_from_spi(i, 7);
5152
}
5253
}
5354
}

0 commit comments

Comments
 (0)