Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b13f7e8

Browse files
author
Vladimir Ershov
committed
Merge commit 'c24a589ef9b0164e766e9ce3a4b85855aa6afc98' into PGPROEE9_6_scheduler
2 parents 4f1dd65 + c24a589 commit b13f7e8

File tree

4 files changed

+29
-10
lines changed

4 files changed

+29
-10
lines changed

contrib/pgpro_scheduler/src/scheduler_job.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ job_t *get_at_job_for_process(MemoryContext mem, char *nodename, char **error)
171171
int i;
172172
char *oldpath;
173173

174-
const char *get_job_sql = "select * from at_jobs_submitted s where ((not exists ( select * from at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED";
174+
/* const char *get_job_sql = "select * from at_jobs_submitted s where ((not exists ( select * from at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED"; */
175+
const char *get_job_sql = "select * from schedule.at_jobs_submitted s where ((not exists ( select * from schedule.at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from schedule.at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL AND not exists ( select * from schedule.at_jobs_done d where d.id = any(s.depends_on) and d.status=false)) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED ";
175176
const char *insert_sql = "insert into at_jobs_process values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)";
176177
spi_response_t *r;
177178
spi_response_t *r2;

contrib/pgpro_scheduler/src/scheduler_manager.c

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include "utils/builtins.h"
2626
#include "utils/timestamp.h"
2727
#include <sys/time.h>
28+
#include "utils/lsyscache.h"
29+
#include "catalog/namespace.h"
2830

2931
#include "char_array.h"
3032
#include "sched_manager_poll.h"
@@ -1376,7 +1378,7 @@ int update_cron_texttime(scheduler_manager_ctx_t *ctx, int cron_id, TimestampTz
13761378
return ret;
13771379
}
13781380

1379-
int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
1381+
int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type, Oid at_reloid)
13801382
{
13811383
job_t *expired;
13821384
int nexpired = 0;
@@ -1396,10 +1398,14 @@ int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type
13961398
pgstat_report_activity(STATE_RUNNING, "vanish expired tasks");
13971399

13981400
START_SPI_SNAP();
1401+
if(type == AtJob)
1402+
{
1403+
ts_hires = true;
1404+
scheduler_atjob_id_OID = get_atttype(at_reloid, 1);
1405+
}
13991406
expired = type == CronJob ?
14001407
get_expired_cron_jobs(ctx->nodename, &nexpired, &is_error):
14011408
get_expired_at_jobs(ctx->nodename, &nexpired, &is_error);
1402-
if(type == AtJob) ts_hires = true;
14031409

14041410
if(is_error)
14051411
{
@@ -1760,6 +1766,7 @@ void manager_worker_main(Datum arg)
17601766
schd_manager_share_t *parent_shared;
17611767
MemoryContext old = NULL;
17621768
MemoryContext longTerm;
1769+
Oid reloid;
17631770

17641771

17651772
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
@@ -1814,6 +1821,17 @@ void manager_worker_main(Datum arg)
18141821
longTerm = init_mem_ctx("long term context for slots");
18151822
ctx = initialize_scheduler_manager_context(longTerm, database, seg);
18161823

1824+
START_SPI_SNAP();
1825+
reloid = RangeVarGetRelid(makeRangeVarFromNameList(
1826+
stringToQualifiedNameList("schedule.at_jobs_submitted")), NoLock, true);
1827+
STOP_SPI_SNAP();
1828+
if(reloid == InvalidOid)
1829+
{
1830+
ereport(ERROR,
1831+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1832+
errmsg("scheduler manager cannot find jobs table")));
1833+
}
1834+
18171835
start_at_workers(ctx, shared);
18181836
clean_at_table(ctx);
18191837
set_slots_stat_report(ctx);
@@ -1850,8 +1868,8 @@ void manager_worker_main(Datum arg)
18501868
set_slots_stat_report(ctx);
18511869
/* if there are any expired jobs to get rid of */
18521870

1853-
scheduler_vanish_expired_jobs(ctx, AtJob);
1854-
scheduler_vanish_expired_jobs(ctx, CronJob);
1871+
scheduler_vanish_expired_jobs(ctx, AtJob, reloid);
1872+
scheduler_vanish_expired_jobs(ctx, CronJob, reloid);
18551873
}
18561874
}
18571875

contrib/pgpro_scheduler/src/scheduler_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ void fill_cron_array_from_rule(Jsonb *J, const char *name, bit_array_t *ce, int
104104
bool is_cron_fit_timestamp(bit_array_t *cron, TimestampTz timestamp);
105105
char **get_dates_array_from_rule(scheduler_task_t *task, int *num);
106106
int get_integer_from_jsonbval(JsonbValue *ai, int def);
107-
int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type);
107+
int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type, Oid at_reloid);
108108
int how_many_instances_on_work(scheduler_manager_ctx_t *ctx, job_t *job);
109109
int insert_at_record(char *nodename, int cron_id, TimestampTz start_at, TimestampTz postpone, char **error);
110110
int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job);

contrib/pgpro_scheduler/test/perl/runtest.pl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@
6565
"ALTER SYSTEM SET schedule.enabled = on",
6666
"SELECT pg_reload_conf();",
6767
"CREATE TABLE test_results( time_mark timestamp, commentary text )",
68-
"DROP ROLE IF EXISTS _pgpro_tester",
69-
"CREATE ROLE _pgpro_tester",
70-
"GRANT INSERT ON test_results TO _pgpro_tester",
68+
"DROP ROLE IF EXISTS tester",
69+
"CREATE ROLE tester",
70+
"GRANT INSERT ON test_results TO tester",
7171
);
7272
map { __do_sql($dbh, $_) } @sql2;
7373
$dbh->disconnect();
@@ -87,7 +87,7 @@
8787
my $harness = TAP::Harness->new( \%args );
8888
my @tests = glob( 't/*.t' );
8989
### @tests = ('t/jobMaxRunTime.t');
90-
$harness->runtests(@tests );
90+
$harness->runtests(@tests);
9191

9292

9393
sub __do_sql

0 commit comments

Comments
 (0)