Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1340976

Browse files
author
Vladimir Ershov
committed
fix error messages
set session authorization fix transaction handling
1 parent fad2e22 commit 1340976

File tree

6 files changed

+120
-41
lines changed

6 files changed

+120
-41
lines changed

src/scheduler_executor.c

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ void executor_worker_main(Datum arg)
6161
bool success = true;
6262
executor_error_t EE;
6363
int ret;
64-
char *error;
64+
char *error = NULL;
6565
bool use_pg_vars = true;
66+
schd_executor_status_t status;
6667
/* int rc = 0; */
6768

6869
EE.n = 0;
@@ -82,7 +83,7 @@ void executor_worker_main(Datum arg)
8283
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8384
errmsg("executor corrupted dynamic shared memory segment")));
8485
}
85-
shared->status = SchdExecutorWork;
86+
status = shared->status = SchdExecutorWork;
8687
SetConfigOption("application_name", "pgp-s executor", PGC_USERSET, PGC_S_SESSION);
8788
pgstat_report_activity(STATE_RUNNING, "initialize");
8889
init_worker_mem_ctx("ExecutorMemoryContext");
@@ -92,9 +93,28 @@ void executor_worker_main(Datum arg)
9293
job = initializeExecutorJob(shared);
9394
if(!job)
9495
{
95-
shared->status = SchdExecutorError;
9696
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
9797
"Cannot retrive job information");
98+
shared->status = SchdExecutorError;
99+
delete_worker_mem_ctx();
100+
dsm_detach(seg);
101+
proc_exit(0);
102+
}
103+
104+
if(set_session_authorization(job->executor, &error) < 0)
105+
{
106+
if(error)
107+
{
108+
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
109+
"Cannot set session auth: %s", error);
110+
pfree(error);
111+
}
112+
else
113+
{
114+
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
115+
"Cannot set session auth: unknown error");
116+
}
117+
shared->status = SchdExecutorError;
98118
delete_worker_mem_ctx();
99119
dsm_detach(seg);
100120
proc_exit(0);
@@ -116,13 +136,15 @@ void executor_worker_main(Datum arg)
116136
{
117137
pgstat_report_activity(STATE_RUNNING, job->dosql[i]);
118138
CHECK_FOR_INTERRUPTS();
119-
if(!job->same_transaction) START_SPI_SNAP();
120-
139+
if(!job->same_transaction)
140+
{
141+
START_SPI_SNAP();
142+
}
121143
ret = execute_spi(job->dosql[i], &error);
122144
if(ret < 0)
123145
{
124146
success = false;
125-
shared->status = SchdExecutorError;
147+
status = SchdExecutorError;
126148
if(error)
127149
{
128150
push_executor_error(&EE, "error on %d: %s", i+1, error);
@@ -139,13 +161,19 @@ void executor_worker_main(Datum arg)
139161
}
140162
else
141163
{
142-
if(!job->same_transaction) STOP_SPI_SNAP();
164+
if(!job->same_transaction)
165+
{
166+
STOP_SPI_SNAP();
167+
}
143168
}
144169
}
145-
if(shared->status != SchdExecutorError)
170+
if(status != SchdExecutorError)
146171
{
147-
if(job->same_transaction) STOP_SPI_SNAP();
148-
shared->status = SchdExecutorDone;
172+
if(job->same_transaction)
173+
{
174+
STOP_SPI_SNAP();
175+
}
176+
status = SchdExecutorDone;
149177
}
150178
if(job->next_time_statement)
151179
{
@@ -161,12 +189,46 @@ void executor_worker_main(Datum arg)
161189
{
162190
set_shared_message(shared, &EE);
163191
}
192+
shared->status = status;
164193

165194
delete_worker_mem_ctx();
166195
dsm_detach(seg);
167196
proc_exit(0);
168197
}
169198

199+
int set_session_authorization(char *username, char **error)
200+
{
201+
Oid types[1] = { TEXTOID };
202+
Oid useroid;
203+
Datum values[1];
204+
bool is_superuser;
205+
int ret;
206+
char *sql = "select usesysid, usesuper from pg_catalog.pg_user where usename = $1";
207+
char buff[1024];
208+
209+
values[0] = CStringGetTextDatum(username);
210+
START_SPI_SNAP();
211+
ret = execute_spi_sql_with_args(sql, 1, types, values, NULL, error);
212+
213+
if(ret < 0) return ret;
214+
if(SPI_processed == 0)
215+
{
216+
STOP_SPI_SNAP();
217+
sprintf(buff, "Cannot find user with name: %s", username);
218+
*error = _copy_string(buff);
219+
220+
return -200;
221+
}
222+
useroid = get_oid_from_spi(0, 1, 0);
223+
is_superuser = get_boolean_from_spi(0, 2, false);
224+
225+
STOP_SPI_SNAP();
226+
227+
SetSessionAuthorization(useroid, is_superuser);
228+
229+
return 1;
230+
}
231+
170232
void set_shared_message(schd_executor_share_t *shared, executor_error_t *ee)
171233
{
172234
int left = PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX;

src/scheduler_executor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ typedef enum {
1717
typedef struct {
1818
char database[PGPRO_SCHEDULER_DBNAME_MAX];
1919
char nodename[PGPRO_SCHEDULER_NODENAME_MAX];
20+
char user[NAMEDATALEN];
2021

2122
int cron_id;
2223
TimestampTz start_at;
@@ -41,6 +42,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee);
4142
int executor_onrollback(job_t *job, executor_error_t *ee);
4243
void set_pg_var(bool resulti, executor_error_t *ee);
4344
int push_executor_error(executor_error_t *e, char *fmt, ...) __attribute__ ((format (gnu_printf, 2, 3)));
45+
int set_session_authorization(char *username, char **error);
4446

4547

4648
#endif

src/scheduler_job.c

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "lib/stringinfo.h"
88
#include "scheduler_spi_utils.h"
99
#include "utils/timestamp.h"
10+
#include "utils/builtins.h"
1011
#include "memutils.h"
1112

1213
job_t *init_scheduler_job(job_t *j)
@@ -22,12 +23,13 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
2223
{
2324
job_t *jobs = NULL;
2425
int ret, got, i;
25-
Oid argtypes[1] = { CSTRINGOID };
26+
Oid argtypes[1] = { TEXTOID };
2627
Datum values[1];
27-
const char *get_job_sql = "select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances from schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1::text";
28+
const char *get_job_sql = "select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor from schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
2829

2930
*is_error = *n = 0;
30-
values[0] = CStringGetDatum(nodename);
31+
START_SPI_SNAP();
32+
values[0] = CStringGetTextDatum(nodename);
3133
ret = SPI_execute_with_args(get_job_sql, 1, argtypes, values, NULL, true, 0);
3234
if(ret == SPI_OK_SELECT)
3335
{
@@ -45,13 +47,15 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
4547
jobs[i].timelimit = get_interval_seconds_from_spi(i, 4, 0);
4648
jobs[i].max_instances = get_int_from_spi(i, 5, 1);
4749
jobs[i].node = _copy_string(nodename);
50+
jobs[i].executor = get_text_from_spi(i, 6);
4851
}
4952
}
5053
}
5154
else
5255
{
5356
*is_error = 1;
5457
}
58+
STOP_SPI_SNAP();
5559
return jobs;
5660
}
5761

@@ -107,8 +111,8 @@ job_t *set_job_error(job_t *j, const char *fmt, ...)
107111
int move_job_to_log(job_t *j, bool status)
108112
{
109113
Datum values[4];
110-
char nulls[4] = { 0, 0, 0, 0 };
111-
Oid argtypes[4] = { BOOLOID, CSTRINGOID, INT4OID, TIMESTAMPTZOID };
114+
char nulls[4] = { ' ', ' ', ' ', ' ' };
115+
Oid argtypes[4] = { BOOLOID, TEXTOID, INT4OID, TIMESTAMPTZOID };
112116
int ret;
113117
const char *del_sql = "delete from schedule.at where start_at = $1 and cron = $2";
114118
const char *sql = "insert into schedule.log (start_at, last_start_available, retry, cron, node, started, status, finished, message) SELECT start_at, last_start_available, retry, cron, node, started, $1 as status, 'now'::timestamp as finished, $2 as message from schedule.at where cron = $3 and start_at = $4";
@@ -118,11 +122,11 @@ int move_job_to_log(job_t *j, bool status)
118122
values[0] = BoolGetDatum(status);
119123
if(j->error)
120124
{
121-
values[1] = CStringGetDatum(j->error);
125+
values[1] = CStringGetTextDatum(j->error);
122126
}
123127
else
124128
{
125-
nulls[1] = 1;
129+
nulls[1] = 'n';
126130
}
127131
values[2] = Int32GetDatum(j->cron_id);
128132
values[3] = TimestampTzGetDatum(j->start_at);
@@ -134,7 +138,7 @@ int move_job_to_log(job_t *j, bool status)
134138
argtypes[1] = INT4OID;
135139
values[0] = TimestampTzGetDatum(j->start_at);
136140
values[1] = Int32GetDatum(j->cron_id);
137-
ret = SPI_execute_with_args(del_sql, 2, argtypes, values, nulls, false, 0);
141+
ret = SPI_execute_with_args(del_sql, 2, argtypes, values, NULL, false, 0);
138142
if(ret == SPI_OK_DELETE)
139143
{
140144
return 1;

0 commit comments

Comments
 (0)