Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b8b4a86

Browse files
author
Vladimir Ershov
committed
add GUC schema var - what scheme in use now
1 parent 52e3f73 commit b8b4a86

File tree

4 files changed

+63
-50
lines changed

4 files changed

+63
-50
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ pgpro_scheduler это расширение PostgreSQL и не требует н
4141
* **schedule.database** - строковая переменная, указывает с какими базам может
4242
работать планировщик. Что бы указать несколько баз, нужно перечислить их
4343
имена через запятую. По умолчанию - пустая строка.
44+
* **schedule.scheme** - строковая переменная, указывает в какой `scheme`
45+
находятся служебные таблицы планировщика. Для изменения требуется
46+
перезагрузка. Обычно ее не надо менять. Может использоваться для работы
47+
на реплике, если используется foreign data wrapper. По умолчанию -
48+
schedule.
4449
* **schedule.nodename** - строковая переменная, содержит название узла.
4550
По умолчанию - master. Если расширение используется на одной машине,
4651
то переменная не имеет смысла.

src/pgpro_scheduler.c

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ volatile sig_atomic_t got_sighup = false;
4040
volatile sig_atomic_t got_sigterm = false;
4141

4242
/* Custom GUC variables */
43-
static char *scheduler_databases = "";
44-
static char *scheduler_nodename = "master";
45-
static char *scheduler_transaction_state = "undefined";
43+
static char *scheduler_databases = NULL;
44+
static char *scheduler_nodename = NULL;
45+
static char *scheduler_transaction_state = NULL;
4646
static int scheduler_max_workers = 2;
4747
static bool scheduler_service_enabled = false;
48+
static char *scheduler_schema = NULL;
4849
/* Custom GUC done */
4950

5051
extern void
@@ -398,7 +399,18 @@ pg_scheduler_startup(void)
398399

399400
void _PG_init(void)
400401
{
401-
RequestAddinShmemSpace(1000);
402+
DefineCustomStringVariable(
403+
"schedule.schema",
404+
"The name of scheduler schema",
405+
NULL,
406+
&scheduler_schema,
407+
"schedule",
408+
PGC_POSTMASTER,
409+
0,
410+
NULL,
411+
NULL,
412+
NULL
413+
);
402414
DefineCustomStringVariable(
403415
"schedule.database",
404416
"On which databases scheduler could be run",

src/scheduler_job.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
2525
int ret, got, i;
2626
Oid argtypes[1] = { TEXTOID };
2727
Datum values[1];
28-
const char *get_job_sql = "select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor, cron.next_time_statement from schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
28+
const char *get_job_sql = "select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor, cron.next_time_statement from at at, cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
2929

3030
*is_error = *n = 0;
3131
START_SPI_SNAP();
@@ -68,7 +68,7 @@ job_t *get_expired_jobs(char *nodename, int *n, int *is_error)
6868

6969
*n = *is_error = 0;
7070
initStringInfo(&sql);
71-
appendStringInfo(&sql, "select start_at, last_start_available, cron, started, active from schedule.at where last_start_available < 'now' and not active and node = '%s'", nodename);
71+
appendStringInfo(&sql, "select start_at, last_start_available, cron, started, active from at where last_start_available < 'now' and not active and node = '%s'", nodename);
7272
ret = SPI_execute(sql.data, true, 0);
7373
if(ret == SPI_OK_SELECT)
7474
{
@@ -115,8 +115,8 @@ int move_job_to_log(job_t *j, bool status)
115115
char nulls[4] = { ' ', ' ', ' ', ' ' };
116116
Oid argtypes[4] = { BOOLOID, TEXTOID, INT4OID, TIMESTAMPTZOID };
117117
int ret;
118-
const char *del_sql = "delete from schedule.at where start_at = $1 and cron = $2";
119-
const char *sql = "insert into schedule.log (start_at, last_start_available, retry, cron, node, started, status, finished, message) SELECT start_at, last_start_available, retry, cron, node, started, $1 as status, 'now'::timestamp as finished, $2 as message from schedule.at where cron = $3 and start_at = $4";
118+
const char *del_sql = "delete from at where start_at = $1 and cron = $2";
119+
const char *sql = "insert into log (start_at, last_start_available, retry, cron, node, started, status, finished, message) SELECT start_at, last_start_available, retry, cron, node, started, $1 as status, 'now'::timestamp as finished, $2 as message from at where cron = $3 and start_at = $4";
120120

121121
/* in perl was this at first $status = 0 if $job->{spoiled}; skip so far */
122122

src/scheduler_manager.c

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -44,55 +44,51 @@ extern volatile sig_atomic_t got_sigterm;
4444

4545
int checkSchedulerNamespace(void)
4646
{
47-
const char *sql = "select count(*) from pg_namespace where nspname = 'schedule'";
48-
int found = 0;
49-
int ret;
50-
int ntup;
51-
bool isnull;
47+
const char *sql = "select count(*) from pg_catalog.pg_namespace where nspname = $1";
48+
int count = 0;
49+
const char *schema;
50+
Oid argtypes[1] = { TEXTOID };
51+
Datum values[1];
5252

53-
pgstat_report_activity(STATE_RUNNING, "initialize: check namespace");
5453
SetCurrentStatementStartTimestamp();
54+
pgstat_report_activity(STATE_RUNNING, "initialize: check namespace");
55+
56+
schema = GetConfigOption("schedule.schema", false, true);
57+
5558
StartTransactionCommand();
5659
SPI_connect();
5760
PushActiveSnapshot(GetTransactionSnapshot());
5861

59-
ret = SPI_execute(sql, true, 0);
60-
if(ret == SPI_OK_SELECT && SPI_processed == 1)
62+
values[0] = CStringGetTextDatum(schema);
63+
count = select_count_with_args(sql, 1, argtypes, values, NULL);
64+
65+
if(count == -1)
6166
{
62-
ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
63-
SPI_tuptable->tupdesc, 1, &isnull)
64-
);
65-
if(!isnull && ntup == 1)
66-
{
67-
found = 1;
68-
}
69-
else if(isnull)
70-
{
71-
elog(LOG, "Scheduler manager: %s: cannot check namespace: count return null",
72-
MyBgworkerEntry->bgw_name);
73-
}
74-
else if(ntup > 1)
75-
{
76-
elog(LOG, "Scheduler manager: %s: cannot check namespace: found %d namespaces",
77-
MyBgworkerEntry->bgw_name, ntup);
78-
}
67+
elog(ERROR, "Scheduler manager: %s: cannot check namespace: sql error",
68+
MyBgworkerEntry->bgw_name);
7969
}
80-
else if(ret != SPI_OK_SELECT)
70+
else if(count > 1 || count == 0 )
8171
{
82-
elog(LOG, "Scheduler manager: %s: cannot check namespace: error code %d",
83-
MyBgworkerEntry->bgw_name, ret);
72+
elog(LOG, "Scheduler manager: %s: cannot check namespace: found %d namespaces",
73+
MyBgworkerEntry->bgw_name, count);
8474
}
85-
else if(SPI_processed != 1)
75+
else if(count == -2)
8676
{
87-
elog(LOG, "Scheduler manager: %s: cannot check namespace: count return %ud tups",
88-
MyBgworkerEntry->bgw_name,
89-
(unsigned)SPI_processed);
77+
elog(LOG, "Scheduler manager: %s: cannot check namespace: count return null",
78+
MyBgworkerEntry->bgw_name);
9079
}
80+
else if(count != 1)
81+
{
82+
elog(ERROR, "Scheduler manager: %s: cannot check namespace: unknown error %d",
83+
MyBgworkerEntry->bgw_name, count);
84+
}
85+
9186
SPI_finish();
9287
PopActiveSnapshot();
9388
CommitTransactionCommand();
89+
if(count) SetConfigOption("search_path", "schedule", PGC_USERSET, PGC_S_SESSION);
9490

95-
return found;
91+
return count;
9692
}
9793

9894
int get_scheduler_maxworkers(void)
@@ -269,7 +265,7 @@ scheduler_task_t *scheduler_get_active_tasks(scheduler_manager_ctx_t *ctx, int *
269265

270266
*nt = 0;
271267
initStringInfo(&sql);
272-
appendStringInfo(&sql, "select id, rule, postpone, _next_exec_time, next_time_statement from schedule.cron where active and not broken and (start_date <= 'now' or start_date is null) and (end_date <= 'now' or end_date is null) and node = '%s'", ctx->nodename);
268+
appendStringInfo(&sql, "select id, rule, postpone, _next_exec_time, next_time_statement from cron where active and not broken and (start_date <= 'now' or start_date is null) and (end_date <= 'now' or end_date is null) and node = '%s'", ctx->nodename);
273269

274270
pgstat_report_activity(STATE_RUNNING, "select 'at' tasks");
275271

@@ -590,7 +586,7 @@ int how_many_instances_on_work(scheduler_manager_ctx_t *ctx, int cron_id)
590586
int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
591587
{
592588
scheduler_manager_slot_t *item;
593-
const char *sql = "update schedule.at set started = 'now'::timestamp with time zone, active = true where cron = $1 and start_at = $2";
589+
const char *sql = "update at set started = 'now'::timestamp with time zone, active = true where cron = $1 and start_at = $2";
594590
Datum values[2];
595591
Oid argtypes[2] = {INT4OID, TIMESTAMPTZOID};
596592
int ret;
@@ -977,7 +973,7 @@ int mark_job_broken(scheduler_manager_ctx_t *ctx, int cron_id, char *reason)
977973
Oid types[2] = { INT4OID, TEXTOID };
978974
Datum values[2];
979975
char *error;
980-
char *sql = "update schedule.cron set reason = $2, broken = true where id = $1";
976+
char *sql = "update cron set reason = $2, broken = true where id = $1";
981977
int ret;
982978

983979
values[0] = Int32GetDatum(cron_id);
@@ -997,7 +993,7 @@ int update_cron_texttime(scheduler_manager_ctx_t *ctx, int cron_id, TimestampTz
997993
bool nulls[2] = { ' ', ' ' };
998994
char *error;
999995
int ret;
1000-
char *sql = "update schedule.cron set _next_exec_time = $2 where id = $1";
996+
char *sql = "update cron set _next_exec_time = $2 where id = $1";
1001997

1002998
values[0] = Int32GetDatum(cron_id);
1003999
if(next > 0)
@@ -1073,9 +1069,9 @@ int insert_at_record(char *nodename, int cron_id, TimestampTz start_at, Timestam
10731069
Datum values[4];
10741070
char nulls[4] = { ' ', ' ', ' ', ' ' };
10751071
Oid argtypes[4];
1076-
char *insert_sql = "insert into schedule.at (start_at, last_start_available, node, retry, cron, active) values ($1, $2, $3, 0, $4, false)";
1077-
char *at_sql = "select count(start_at) from schedule.at where cron = $1 and start_at = $2";
1078-
char *log_sql = "select count(start_at) from schedule.log where cron = $1 and start_at = $2";
1072+
char *insert_sql = "insert into at (start_at, last_start_available, node, retry, cron, active) values ($1, $2, $3, 0, $4, false)";
1073+
char *at_sql = "select count(start_at) from at where cron = $1 and start_at = $2";
1074+
char *log_sql = "select count(start_at) from log where cron = $1 and start_at = $2";
10791075
int count, ret;
10801076

10811077
argtypes[0] = INT4OID;
@@ -1213,11 +1209,11 @@ void clean_at_table(scheduler_manager_ctx_t *ctx)
12131209
char *error = NULL;
12141210

12151211
START_SPI_SNAP();
1216-
if(execute_spi("truncate schedule.at", &error) < 0)
1212+
if(execute_spi("truncate at", &error) < 0)
12171213
{
12181214
manager_fatal_error(ctx, 0, "Cannot clean 'at' table: %s", error);
12191215
}
1220-
if(execute_spi("update schedule.cron set _next_exec_time = NULL where _next_exec_time is not NULL", &error) < 0)
1216+
if(execute_spi("update cron set _next_exec_time = NULL where _next_exec_time is not NULL", &error) < 0)
12211217
{
12221218
manager_fatal_error(ctx, 0, "Cannot clean cron _next time: %s", error);
12231219
}

0 commit comments

Comments
 (0)