Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f7e77f4

Browse files
author
Vladimir Ershov
committed
fix ms windows timeout
1 parent 17e9358 commit f7e77f4

7 files changed

+74
-40
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ else
2020
include $(top_builddir)/src/Makefile.global
2121
include $(top_srcdir)/contrib/contrib-global.mk
2222
endif
23+
24+
#check: temp-install
25+
# $(prove_check)

pgpro_scheduler--2.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1338,7 +1338,7 @@ CREATE VIEW all_job_status AS
13381338
attempt, resubmit_limit, postpone as max_wait_interval,
13391339
max_run_time as max_duration, submit_time,
13401340
start_time, status as is_success, reason as error, done_time,
1341-
'processing'::job_at_status_t status
1341+
'done'::job_at_status_t status
13421342
FROM schedule.at_jobs_done
13431343
UNION
13441344
SELECT

src/scheduler_executor.c

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,16 @@ void executor_worker_main(Datum arg)
101101

102102
SetConfigOption("application_name", "pgp-s executor", PGC_USERSET, PGC_S_SESSION);
103103
pgstat_report_activity(STATE_RUNNING, "initialize");
104-
init_worker_mem_ctx("ExecutorMemoryContext");
105104
BackgroundWorkerInitializeConnection(shared->database, NULL);
106-
worker_jobs_limit = read_worker_job_limit();
107105

108106
pqsignal(SIGTERM, handle_sigterm);
109107
pqsignal(SIGHUP, worker_spi_sighup);
110108
BackgroundWorkerUnblockSignals();
111109

110+
init_worker_mem_ctx("ExecutorMemoryContext");
111+
switch_to_worker_context();
112+
worker_jobs_limit = read_worker_job_limit();
113+
112114
while(1)
113115
{
114116
/* we need it if an idle worker receives SIGHUP and realizes that it done
@@ -619,7 +621,7 @@ void at_executor_worker_main(Datum arg)
619621
bool lets_sleep = false;
620622
/* PGPROC *parent; */
621623

622-
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler_executor");
624+
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler_at_executor");
623625
seg = dsm_attach(DatumGetInt32(arg));
624626
if(seg == NULL)
625627
ereport(ERROR,
@@ -638,13 +640,15 @@ void at_executor_worker_main(Datum arg)
638640

639641
SetConfigOption("application_name", "pgp-s at executor", PGC_USERSET, PGC_S_SESSION);
640642
pgstat_report_activity(STATE_RUNNING, "initialize");
641-
init_worker_mem_ctx("ExecutorMemoryContext");
642643
BackgroundWorkerInitializeConnection(shared->database, NULL);
643644

644645
pqsignal(SIGTERM, handle_sigterm);
645646
pqsignal(SIGHUP, worker_spi_sighup);
646647
BackgroundWorkerUnblockSignals();
647648

649+
init_worker_mem_ctx("ExecutorMemoryContext");
650+
switch_to_worker_context();
651+
648652
while(1)
649653
{
650654
if(shared->stop_worker) break;
@@ -698,7 +702,7 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
698702

699703
*status = shared->status = SchdExecutorWork;
700704

701-
pgstat_report_activity(STATE_RUNNING, "initialize job");
705+
pgstat_report_activity(STATE_RUNNING, "initialize at job");
702706
START_SPI_SNAP();
703707

704708
job = get_next_at_job_with_lock(shared->nodename, &error);
@@ -718,6 +722,15 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
718722
return 0;
719723
}
720724
current_job_id = job->cron_id;
725+
if(!move_at_job_process(job->cron_id))
726+
{
727+
elog(LOG, "AT EXECUTOR: error move to process");
728+
ABORT_SPI_SNAP();
729+
return -1;
730+
}
731+
STOP_SPI_SNAP(); /* Commit changes */
732+
733+
START_SPI_SNAP();
721734
pgstat_report_activity(STATE_RUNNING, "job initialized");
722735

723736
ResetAllOptions();
@@ -743,13 +756,7 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
743756

744757
if(job->timelimit)
745758
{
746-
#ifdef HAVE_LONG_INT_64
747-
sprintf(buff, "%ld", job->timelimit * 1000);
748-
#else
749-
sprintf(buff, "%lld", job->timelimit * 1000);
750-
#endif
751-
SetConfigOption("statement_timeout", buff, PGC_SUSET, PGC_S_OVERRIDE);
752-
enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout);
759+
enable_timeout_after(STATEMENT_TIMEOUT, job->timelimit * 1000);
753760
}
754761

755762
if(job->sql_params_n > 0)

src/scheduler_job.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,16 +168,15 @@ job_t *get_next_at_job_with_lock(char *nodename, char **error)
168168
Datum values[1];
169169
char *oldpath;
170170

171-
const char *get_job_sql = "select id, at, array_append('{}'::text[], do_sql)::text[], params, executor, attempt, resubmit_limit, max_run_time from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED";
172-
oldpath = set_schema(NULL, true);
171+
const char *get_job_sql = "select id, at, array_append('{}'::text[], do_sql)::text[], params, executor, attempt, resubmit_limit, max_run_time from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED";
172+
oldpath = set_schema(NULL, true);
173173
*error = NULL;
174174
values[0] = CStringGetTextDatum(nodename);
175175

176-
ret = execute_spi_sql_with_args(get_job_sql, 1,
176+
ret = execute_spi_sql_with_args(get_job_sql, 1,
177177
argtypes, values, NULL, error);
178-
179178
set_schema(oldpath, false);
180-
pfree(oldpath);
179+
pfree(oldpath);
181180
if(ret == SPI_OK_SELECT)
182181
{
183182
got = SPI_processed;
@@ -401,6 +400,22 @@ int _at_move_job_to_log(job_t *j, bool status, bool process)
401400
return ret > 0 ? 1: ret;
402401
}
403402

403+
int move_at_job_process(int job_id)
404+
{
405+
const char *sql = "WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_process SELECT * FROM moved_rows";
406+
Datum values[1];
407+
Oid argtypes[1] = { INT4OID };
408+
int ret;
409+
char *oldpath;
410+
411+
values[0] = Int32GetDatum(job_id);
412+
oldpath = set_schema(NULL, true);
413+
ret = SPI_execute_with_args(sql, 1, argtypes, values, NULL, false, 0);
414+
set_schema(oldpath, false);
415+
pfree(oldpath);
416+
return ret > 0 ? 1: ret;
417+
}
418+
404419
int set_at_job_done(job_t *job, char *error, int64 resubmit)
405420
{
406421
char *this_error = NULL;
@@ -412,8 +427,9 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit)
412427
const char *sql;
413428
int n = 3;
414429

415-
const char *sql_submitted = "WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, NULL as start_time, $2 as status, $3 as reason FROM moved_rows";
416-
const char *resubmit_sql = "update ONLY at_jobs_submitted SET attempt = attempt + 1, at = $2 WHERE id = $1";
430+
const char *sql_submitted = "WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
431+
/* const char *resubmit_sql = "update ONLY at_jobs_submitted SET attempt = attempt + 1, at = $2 WHERE id = $1"; */
432+
const char *resubmit_sql = "WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_submitted SELECT id, node, name, comments, $2, do_sql, params, depends_on, executor, owner, last_start_available, attempt +1 , resubmit_limit, postpone, max_run_time, canceled, submit_time FROM moved_rows";
417433

418434
values[0] = Int32GetDatum(job->cron_id);
419435

src/scheduler_job.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ int _at_move_job_to_log(job_t *j, bool status, bool processed);
5858
int resubmit_at_job(job_t *j, TimestampTz next);
5959
job_t *get_next_at_job_with_lock(char *nodename, char **error);
6060
int set_at_job_done(job_t *job, char *error, int64 resubmit);
61+
int move_at_job_process(int job_id);
6162

6263
#endif
6364

src/scheduler_manager.c

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,20 +1601,18 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
16011601
MemoryContext old;
16021602
scheduler_manager_slot_t *item;
16031603

1604-
item = worker_alloc(sizeof(scheduler_manager_slot_t));
1605-
item->job = NULL;
1606-
item->started = item->worker_started = GetCurrentTimestamp();
1607-
item->wait_worker_to_die = false;
1608-
item->stop_it = 0;
1604+
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
16091605

16101606
pgstat_report_activity(STATE_RUNNING, "register scheduler at executor");
1611-
16121607
segsize = (Size)sizeof(schd_executor_share_state_t);
1613-
1614-
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
1615-
old = MemoryContextSwitchTo(SchedulerWorkerContext);
16161608
seg = dsm_create(segsize, 0);
16171609

1610+
old = MemoryContextSwitchTo(SchedulerWorkerContext);
1611+
item = worker_alloc(sizeof(scheduler_manager_slot_t));
1612+
item->job = NULL;
1613+
item->started = item->worker_started = GetCurrentTimestamp();
1614+
item->wait_worker_to_die = false;
1615+
item->stop_it = 0;
16181616
item->shared = seg;
16191617
shm_data = dsm_segment_address(item->shared);
16201618

@@ -1640,9 +1638,9 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
16401638
elog(LOG, "Cannot register AT executor worker for db: %s",
16411639
shm_data->database);
16421640
dsm_detach(item->shared);
1643-
MemoryContextSwitchTo(old);
16441641
pfree(item);
16451642
ctx->at.slots[pos] = NULL;
1643+
MemoryContextSwitchTo(old);
16461644
return 0;
16471645
}
16481646
status = WaitForBackgroundWorkerStartup(item->handler, &(item->pid));
@@ -1651,13 +1649,13 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
16511649
elog(LOG, "Cannot start AT executor worker for db: %s, status: %d",
16521650
shm_data->database, status);
16531651
dsm_detach(item->shared);
1654-
MemoryContextSwitchTo(old);
16551652
pfree(item);
16561653
ctx->at.slots[pos] = NULL;
1654+
MemoryContextSwitchTo(old);
16571655
return 0;
16581656
}
1659-
MemoryContextSwitchTo(old);
16601657
ctx->at.slots[pos] = item;
1658+
MemoryContextSwitchTo(old);
16611659

16621660
return item->pid;
16631661
}

src/scheduler_spi_utils.c

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -296,23 +296,27 @@ int execute_spi_sql_with_args(const char *sql, int n, Oid *argtypes, Datum *valu
296296
{
297297
int ret = -100;
298298
ErrorData *edata;
299-
MemoryContext old;
300299
int errorSet = 0;
301300
char other[100];
301+
ResourceOwner oldowner = CurrentResourceOwner;
302302

303303
*error = NULL;
304304

305305
SetCurrentStatementStartTimestamp();
306306
BeginInternalSubTransaction(NULL);
307+
switch_to_worker_context();
308+
307309
PG_TRY();
308310
{
309311
ret = SPI_execute_with_args(sql, n, argtypes, values, nulls, false, 0);
310312
ReleaseCurrentSubTransaction();
311-
SPI_restore_connection();
313+
switch_to_worker_context();
314+
CurrentResourceOwner = oldowner;
315+
SPI_restore_connection();
312316
}
313317
PG_CATCH();
314318
{
315-
old = switch_to_worker_context();
319+
switch_to_worker_context();
316320

317321
edata = CopyErrorData();
318322
if(edata->message)
@@ -329,9 +333,10 @@ int execute_spi_sql_with_args(const char *sql, int n, Oid *argtypes, Datum *valu
329333
}
330334
errorSet = 1;
331335
RollbackAndReleaseCurrentSubTransaction();
332-
SPI_restore_connection();
336+
switch_to_worker_context();
337+
CurrentResourceOwner = oldowner;
338+
SPI_restore_connection();
333339
FreeErrorData(edata);
334-
MemoryContextSwitchTo(old);
335340
FlushErrorState();
336341
}
337342
PG_END_TRY();
@@ -373,13 +378,13 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
373378
{
374379
int ret = -100;
375380
ErrorData *edata;
376-
MemoryContext old;
377381
int errorSet = 0;
378382
char other[100];
379383
SPIPlanPtr plan;
380384
Oid *paramtypes;
381385
Datum *values;
382386
int i;
387+
ResourceOwner oldowner = CurrentResourceOwner;
383388

384389
*error = NULL;
385390

@@ -393,6 +398,7 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
393398

394399
SetCurrentStatementStartTimestamp();
395400
BeginInternalSubTransaction(NULL);
401+
switch_to_worker_context();
396402

397403
PG_TRY();
398404
{
@@ -403,11 +409,13 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
403409
ret = SPI_execute_plan(plan, values, NULL, false, 0);
404410
}
405411
ReleaseCurrentSubTransaction();
412+
switch_to_worker_context();
413+
CurrentResourceOwner = oldowner;
406414
SPI_restore_connection();
407415
}
408416
PG_CATCH();
409417
{
410-
old = switch_to_worker_context();
418+
switch_to_worker_context();
411419

412420
edata = CopyErrorData();
413421
if(edata->message)
@@ -424,9 +432,10 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
424432
}
425433
errorSet = 1;
426434
FreeErrorData(edata);
427-
MemoryContextSwitchTo(old);
428435
FlushErrorState();
429436
RollbackAndReleaseCurrentSubTransaction();
437+
CurrentResourceOwner = oldowner;
438+
switch_to_worker_context();
430439
SPI_restore_connection();
431440
}
432441
PG_END_TRY();

0 commit comments

Comments
 (0)