Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4f3de81

Browse files
author
Vladimir Ershov
committed
new vision on at workers
1 parent 16b5a14 commit 4f3de81

10 files changed

+472
-39
lines changed

src/pgpro_scheduler.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,29 @@ bool is_scheduler_enabled(void)
164164
return false;
165165
}
166166

167+
char *set_schema(const char *name, bool get_old)
168+
{
169+
char *schema_name = NULL;
170+
char *current = NULL;
171+
bool free_name = false;
172+
173+
if(get_old)
174+
current = _copy_string((char *)GetConfigOption("search_path", true, false));
175+
if(name)
176+
{
177+
schema_name = (char *)name;
178+
}
179+
else
180+
{
181+
schema_name = _copy_string((char *)GetConfigOption("schedule.schema", true, false));
182+
free_name = true;
183+
}
184+
SetConfigOption("search_path", schema_name, PGC_USERSET, PGC_S_SESSION);
185+
if(free_name) pfree(schema_name);
186+
187+
return current;
188+
}
189+
167190

168191
/** END of SOME UTILS **/
169192

src/pgpro_scheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,6 @@ int get_integer_from_string(char *s, int start, int len);
4040
TimestampTz get_timestamp_from_string(char *str);
4141
TimestampTz _round_timestamp_to_minute(TimestampTz ts);
4242
bool is_scheduler_enabled(void);
43+
char *set_schema(const char *name, bool get_old);
4344

4445
#endif

src/scheduler_executor.c

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
#include "storage/procarray.h"
1313
#include "storage/shm_toc.h"
1414
#include "catalog/pg_type.h"
15+
#include "catalog/pg_authid.h"
16+
#include "utils/syscache.h"
17+
#include "access/htup_details.h"
1518

1619
#include "pgstat.h"
1720
#include "fmgr.h"
@@ -54,6 +57,7 @@ handle_sigterm(SIGNAL_ARGS)
5457
}
5558

5659
errno = save_errno;
60+
proc_exit(0);
5761
}
5862

5963
int read_worker_job_limit(void)
@@ -598,3 +602,222 @@ resubmit(PG_FUNCTION_ARGS)
598602

599603
PG_RETURN_INT64(resubmit_current_job);
600604
}
605+
606+
/* main procedure for at command workers */
607+
608+
void at_executor_worker_main(Datum arg)
609+
{
610+
schd_executor_share_t *shared;
611+
dsm_segment *seg;
612+
int result;
613+
int rc = 0;
614+
schd_executor_status_t status;
615+
bool lets_sleep = false;
616+
/* PGPROC *parent; */
617+
double begin, elapsed;
618+
struct timeval tv;
619+
620+
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler_executor");
621+
seg = dsm_attach(DatumGetInt32(arg));
622+
if(seg == NULL)
623+
ereport(ERROR,
624+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
625+
errmsg("executor unable to map dynamic shared memory segment")));
626+
shared = dsm_segment_address(seg);
627+
/* parent = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid); */
628+
629+
if(shared->status != SchdExecutorInit)
630+
{
631+
ereport(ERROR,
632+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
633+
errmsg("executor corrupted dynamic shared memory segment")));
634+
}
635+
636+
SetConfigOption("application_name", "pgp-s at executor", PGC_USERSET, PGC_S_SESSION);
637+
pgstat_report_activity(STATE_RUNNING, "initialize");
638+
init_worker_mem_ctx("ExecutorMemoryContext");
639+
BackgroundWorkerInitializeConnection(shared->database, NULL);
640+
641+
pqsignal(SIGTERM, handle_sigterm);
642+
pqsignal(SIGHUP, worker_spi_sighup);
643+
BackgroundWorkerUnblockSignals();
644+
645+
while(1)
646+
{
647+
if(got_sighup)
648+
{
649+
got_sighup = false;
650+
ProcessConfigFile(PGC_SIGHUP);
651+
}
652+
CHECK_FOR_INTERRUPTS();
653+
gettimeofday(&tv, NULL);
654+
begin = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000;
655+
result = process_one_job(shared, &status);
656+
gettimeofday(&tv, NULL);
657+
elapsed = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000 - begin;
658+
elog(LOG, "job done %d = %f", result, elapsed);
659+
660+
if(result == 0)
661+
{
662+
lets_sleep = true;
663+
}
664+
else if(result < 0)
665+
{
666+
delete_worker_mem_ctx();
667+
dsm_detach(seg);
668+
proc_exit(1);
669+
}
670+
CHECK_FOR_INTERRUPTS();
671+
672+
if(lets_sleep)
673+
{
674+
elog(LOG, "sleeping");
675+
pgstat_report_activity(STATE_IDLE, "waiting for a job");
676+
rc = WaitLatch(MyLatch,
677+
WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, 1000L);
678+
ResetLatch(MyLatch);
679+
if(rc && rc & WL_POSTMASTER_DEATH) break;
680+
}
681+
}
682+
683+
delete_worker_mem_ctx();
684+
dsm_detach(seg);
685+
proc_exit(0);
686+
}
687+
688+
int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
689+
{
690+
char *error = NULL;
691+
job_t *job;
692+
int ret;
693+
char buff[512];
694+
double begin, elapsed;
695+
struct timeval tv;
696+
697+
*status = shared->status = SchdExecutorWork;
698+
shared->message[0] = 0;
699+
700+
pgstat_report_activity(STATE_RUNNING, "initialize job");
701+
START_SPI_SNAP();
702+
703+
gettimeofday(&tv, NULL);
704+
begin = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000;
705+
706+
job = get_next_at_job_with_lock(shared->nodename, &error);
707+
708+
gettimeofday(&tv, NULL);
709+
elapsed = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000 - begin;
710+
elog(LOG, "got jobs = %f", elapsed);
711+
712+
if(!job)
713+
{
714+
STOP_SPI_SNAP();
715+
if(error)
716+
{
717+
shared->status = SchdExecutorIdling;
718+
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
719+
"Cannot get job: %s", error);
720+
pfree(error);
721+
return -1;
722+
}
723+
shared->status = SchdExecutorIdling;
724+
return 0;
725+
}
726+
current_job_id = job->cron_id;
727+
pgstat_report_activity(STATE_RUNNING, "job initialized");
728+
729+
ResetAllOptions();
730+
if(set_session_authorization_by_name(job->executor, &error) == InvalidOid)
731+
{
732+
if(error)
733+
{
734+
set_at_job_done(job, error, 0);
735+
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
736+
"Cannot set session auth: %s", error);
737+
pfree(error);
738+
}
739+
else
740+
{
741+
set_at_job_done(job, "Unknown set session auth error", 0);
742+
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
743+
"Cannot set session auth: unknown error");
744+
}
745+
shared->status = SchdExecutorIdling;
746+
STOP_SPI_SNAP();
747+
return 1;
748+
}
749+
750+
pgstat_report_activity(STATE_RUNNING, "process job");
751+
CHECK_FOR_INTERRUPTS();
752+
SetConfigOption("schedule.transaction_state", "running", PGC_INTERNAL, PGC_S_SESSION);
753+
754+
if(job->timelimit)
755+
{
756+
#ifdef HAVE_LONG_INT_64
757+
sprintf(buff, "%ld", job->timelimit * 1000);
758+
#else
759+
sprintf(buff, "%lld", job->timelimit * 1000);
760+
#endif
761+
SetConfigOption("statement_timeout", buff, PGC_SUSET, PGC_S_OVERRIDE);
762+
}
763+
764+
if(job->sql_params_n > 0)
765+
{
766+
ret = execute_spi_params_prepared(job->dosql[0], job->sql_params_n, job->sql_params, &error);
767+
}
768+
else
769+
{
770+
ret = execute_spi(job->dosql[0], &error);
771+
}
772+
ResetAllOptions();
773+
SetConfigOption("enable_seqscan", "off", PGC_USERSET, PGC_S_SESSION);
774+
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);
775+
if(ret < 0)
776+
{
777+
if(error)
778+
{
779+
set_at_job_done(job, error, resubmit_current_job);
780+
pfree(error);
781+
}
782+
else
783+
{
784+
sprintf(buff, "error in command: code: %d", ret);
785+
set_at_job_done(job, buff, resubmit_current_job);
786+
}
787+
788+
}
789+
else
790+
{
791+
set_at_job_done(job, NULL, resubmit_current_job);
792+
}
793+
STOP_SPI_SNAP();
794+
795+
resubmit_current_job = 0;
796+
current_job_id = -1;
797+
pgstat_report_activity(STATE_RUNNING, "finish job processing");
798+
799+
return 1;
800+
}
801+
802+
Oid set_session_authorization_by_name(char *rolename, char **error)
803+
{
804+
HeapTuple roleTup;
805+
Form_pg_authid rform;
806+
char buffer[512];
807+
Oid roleoid;
808+
809+
roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename));
810+
if(!HeapTupleIsValid(roleTup))
811+
{
812+
snprintf(buffer, 512, "There is no user name: %s", rolename);
813+
*error = _copy_string(buffer);
814+
return InvalidOid;
815+
}
816+
rform = (Form_pg_authid) GETSTRUCT(roleTup);
817+
roleoid = HeapTupleGetOid(roleTup);
818+
SetSessionAuthorization(roleoid, rform->rolsuper);
819+
ReleaseSysCache(roleTup);
820+
821+
return roleoid;
822+
}
823+

src/scheduler_executor.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ typedef enum {
1313
SchdExecutorDone,
1414
SchdExecutorResubmit,
1515
SchdExecutorError,
16-
SchdExecutorLimitReached
16+
SchdExecutorLimitReached,
17+
SchdExecutorIdling
1718
} schd_executor_status_t;
1819

1920
typedef struct {
@@ -55,6 +56,9 @@ int push_executor_error(executor_error_t *e, char *fmt, ...) pg_attribute_print
5556
int set_session_authorization(char *username, char **error);
5657
int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status);
5758
int read_worker_job_limit(void);
59+
void at_executor_worker_main(Datum arg);
60+
int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *status);
61+
Oid set_session_authorization_by_name(char *rolename, char **error);
5862

5963
extern Datum get_self_id(PG_FUNCTION_ARGS);
6064
extern Datum resubmit(PG_FUNCTION_ARGS);

0 commit comments

Comments
 (0)