Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d2b77ad

Browse files
author
Vladimir Ershov
committed
avoid with query
1 parent f7e77f4 commit d2b77ad

8 files changed

+255
-31
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ SET search_path TO schedule;
55

66
CREATE TYPE job_status_t AS ENUM ('working', 'done', 'error');
77
CREATE TYPE job_at_status_t AS ENUM ('submitted', 'processing', 'done');
8+
CREATE SEQUENCE schedule.at_jobs_submitted_id_seq;
89

910
CREATE TABLE at_jobs_submitted(
10-
id SERIAL PRIMARY KEY,
11+
id int PRIMARY KEY,
1112
node text,
1213
name text,
1314
comments text,
@@ -37,6 +38,8 @@ ALTER TABLE at_jobs_done ADD status boolean;
3738
ALTER TABLE at_jobs_done ADD reason text;
3839
ALTER TABLE at_jobs_done ADD done_time timestamp with time zone default now();
3940

41+
ALTER TABLE at_jobs_submitted ALTER id SET default nextval('schedule.at_jobs_submitted_id_seq');
42+
4043

4144
CREATE TABLE cron(
4245
id SERIAL PRIMARY KEY,

src/pgpro_scheduler.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ char_array_t *readBasesToCheck(void)
276276
SPI_finish();
277277
PopActiveSnapshot();
278278
CommitTransactionCommand();
279+
elog(ERROR, "cannot select from pg_database");
279280
}
280281
destroyCharArray(names);
281282
processed = SPI_processed;

src/sched_manager_poll.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ char *supervisor_state(schd_managers_poll_t *poll)
4343

4444
if(!poll->enabled)
4545
{
46-
status = palloc(sizeof(char) * 9);
46+
status = worker_alloc(sizeof(char) * 9);
4747
memcpy(status, "disabled", 8);
4848
status[8] = 0;
4949
return status;
@@ -52,13 +52,13 @@ char *supervisor_state(schd_managers_poll_t *poll)
5252
len = dbnames ? strlen(dbnames): 0;
5353
if(len == 0)
5454
{
55-
status = palloc(sizeof(char)*26);
55+
status = worker_alloc(sizeof(char)*26);
5656
memcpy(status, "waiting databases to set", 25);
5757
status[25] = 0;
5858
}
5959
else
6060
{
61-
status = palloc(sizeof(char) * (len + 10));
61+
status = worker_alloc(sizeof(char) * (len + 10));
6262
memcpy(status, "work on: ", 9);
6363
memcpy(status+9, dbnames, len);
6464
status[len+9] = 0;
@@ -82,7 +82,7 @@ char *poll_dbnames(schd_managers_poll_t *poll)
8282
if(i < (poll->n - 1))
8383
appendStringInfo(&string, ", ");
8484
}
85-
out = palloc(sizeof(char) * (string.len + 1));
85+
out = worker_alloc(sizeof(char) * (string.len + 1));
8686
memcpy(out, string.data, string.len);
8787
out[string.len] = 0;
8888
pfree(string.data);
@@ -310,8 +310,8 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
310310
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
311311
seg = dsm_create(segsize, 0);
312312

313-
man = palloc(sizeof(schd_manager_t));
314-
man->dbname = palloc(sizeof(char *) * (strlen(name) + 1));
313+
man = worker_alloc(sizeof(schd_manager_t));
314+
man->dbname = worker_alloc(sizeof(char *) * (strlen(name) + 1));
315315
man->connected = false;
316316
memcpy(man->dbname, name, strlen(name) + 1);
317317
man->shared = seg;
@@ -331,7 +331,7 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
331331
pos = poll->n++;
332332
poll->workers = poll->workers ?
333333
repalloc(poll->workers, sizeof(schd_manager_t *) * poll->n):
334-
palloc(sizeof(schd_manager_t *));
334+
worker_alloc(sizeof(schd_manager_t *));
335335
poll->workers[pos] = man;
336336
if(sort) _sortPollManagers(poll);
337337

src/scheduler_executor.c

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -696,23 +696,24 @@ void at_executor_worker_main(Datum arg)
696696
int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t *status)
697697
{
698698
char *error = NULL;
699+
char *set_error = NULL;
699700
job_t *job;
700-
int ret;
701+
int ret, set_ret;
701702
char buff[512];
702703

703704
*status = shared->status = SchdExecutorWork;
704705

705706
pgstat_report_activity(STATE_RUNNING, "initialize at job");
706707
START_SPI_SNAP();
707708

708-
job = get_next_at_job_with_lock(shared->nodename, &error);
709-
709+
/* job = get_next_at_job_with_lock(shared->nodename, &error); */
710+
job = get_at_job_for_process(shared->nodename, &error);
710711
if(!job)
711712
{
712713
if(error)
713714
{
714715
shared->status = SchdExecutorIdling;
715-
elog(LOG, "AT EXECUTOR: ERROR: %s", error);
716+
elog(LOG, "AT EXECUTOR ERROR: get job: %s", error);
716717
pfree(error);
717718
ABORT_SPI_SNAP();
718719
return -1;
@@ -722,30 +723,35 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
722723
return 0;
723724
}
724725
current_job_id = job->cron_id;
725-
if(!move_at_job_process(job->cron_id))
726+
/* if(!move_at_job_process(job->cron_id))
726727
{
727728
elog(LOG, "AT EXECUTOR: error move to process");
728729
ABORT_SPI_SNAP();
729730
return -1;
730-
}
731+
} */
731732
STOP_SPI_SNAP(); /* Commit changes */
732-
733-
START_SPI_SNAP();
734733
pgstat_report_activity(STATE_RUNNING, "job initialized");
734+
START_SPI_SNAP();
735735

736736
ResetAllOptions();
737737
if(set_session_authorization_by_name(job->executor, &error) == InvalidOid)
738738
{
739739
if(error)
740740
{
741-
set_at_job_done(job, error, 0);
741+
set_ret = set_at_job_done(job, error, 0, &set_error);
742742
pfree(error);
743743
}
744744
else
745745
{
746-
set_at_job_done(job, "Unknown set session auth error", 0);
746+
set_ret = set_at_job_done(job, "Unknown set session auth error", 0, &set_error);
747747
}
748748
shared->status = SchdExecutorIdling;
749+
if(set_ret < 0)
750+
{
751+
elog(LOG, "AT-EXECUTOR ERROR: move after auth: %s", set_error);
752+
ABORT_SPI_SNAP();
753+
return -1;
754+
}
749755
STOP_SPI_SNAP();
750756
return 1;
751757
}
@@ -778,26 +784,40 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
778784
{
779785
if(error)
780786
{
781-
set_at_job_done(job, error, resubmit_current_job);
787+
set_ret = set_at_job_done(job, error, resubmit_current_job, &set_error);
782788
pfree(error);
783789
}
784790
else
785791
{
786792
sprintf(buff, "error in command: code: %d", ret);
787-
set_at_job_done(job, buff, resubmit_current_job);
793+
set_ret = set_at_job_done(job, buff, resubmit_current_job, &set_error);
788794
}
789795
}
790796
else
791797
{
792-
set_at_job_done(job, NULL, resubmit_current_job);
798+
set_ret = set_at_job_done(job, NULL, resubmit_current_job, &set_error);
793799
}
794-
STOP_SPI_SNAP();
795800

796801
resubmit_current_job = 0;
797802
current_job_id = -1;
798803
pgstat_report_activity(STATE_RUNNING, "finish job processing");
804+
if(set_ret > 0)
805+
{
806+
STOP_SPI_SNAP();
807+
return 1;
808+
}
809+
if(set_error)
810+
{
811+
elog(LOG, "AT_EXECUTOR ERROR: set log: %s", set_error);
812+
pfree(set_error);
813+
}
814+
else
815+
{
816+
elog(LOG, "AT_EXECUTOR ERROR: set log: unknown error");
817+
}
818+
ABORT_SPI_SNAP();
799819

800-
return 1;
820+
return -1;
801821
}
802822

803823
Oid set_session_authorization_by_name(char *rolename, char **error)

0 commit comments

Comments
 (0)