Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 267ed06

Browse files
author
Vladimir Ershov
committed
fix sleepy
1 parent 4f3de81 commit 267ed06

File tree

3 files changed

+11
-40
lines changed

3 files changed

+11
-40
lines changed

src/scheduler_executor.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -671,12 +671,12 @@ void at_executor_worker_main(Datum arg)
671671

672672
if(lets_sleep)
673673
{
674-
elog(LOG, "sleeping");
675674
pgstat_report_activity(STATE_IDLE, "waiting for a job");
676675
rc = WaitLatch(MyLatch,
677676
WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, 1000L);
678677
ResetLatch(MyLatch);
679678
if(rc && rc & WL_POSTMASTER_DEATH) break;
679+
lets_sleep = false;
680680
}
681681
}
682682

@@ -711,15 +711,17 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
711711

712712
if(!job)
713713
{
714-
STOP_SPI_SNAP();
715714
if(error)
716715
{
717716
shared->status = SchdExecutorIdling;
718717
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
719718
"Cannot get job: %s", error);
719+
elog(LOG, "AT EXECUTOR: ERROR: %s", error);
720720
pfree(error);
721+
ABORT_SPI_SNAP();
721722
return -1;
722723
}
724+
STOP_SPI_SNAP();
723725
shared->status = SchdExecutorIdling;
724726
return 0;
725727
}
@@ -784,7 +786,6 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
784786
sprintf(buff, "error in command: code: %d", ret);
785787
set_at_job_done(job, buff, resubmit_current_job);
786788
}
787-
788789
}
789790
else
790791
{

src/scheduler_manager.c

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -681,33 +681,19 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
681681
schd_executor_share_t *sdata;
682682
PGPROC *worker;
683683
bool started = false;
684-
struct timeval tv;
685-
double begin, elapsed;
686684

687685
p = job->type == CronJob ? &(ctx->cron) : &(ctx->at);
688686
if(p->free == 0)
689687
{
690688
return -1;
691689
}
692690

693-
SetCurrentStatementStartTimestamp();
694-
695-
gettimeofday(&tv, NULL);
696-
begin = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000;
691+
START_SPI_SNAP();
697692

698-
StartTransactionCommand();
699-
PushActiveSnapshot(GetTransactionSnapshot());
700-
SPI_connect();
701693
ret = job->type == CronJob ?
702694
set_cron_job_started(job): set_at_job_started(job);
703-
SPI_finish();
704-
PopActiveSnapshot();
705-
706-
gettimeofday(&tv, NULL);
707-
elapsed = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000 - begin;
708-
elog(LOG, "move job %f", elapsed);
709-
CommitTransactionCommand();
710695

696+
START_SPI_SNAP();
711697

712698
if(ret)
713699
{
@@ -878,8 +864,6 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
878864
char *ts;
879865
scheduler_manager_pool_t *p;
880866
TimestampTz *check_time;
881-
struct timeval tv;
882-
double begin, elapsed;
883867

884868
if(type == CronJob)
885869
{
@@ -900,16 +884,9 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
900884
return 1;
901885
}
902886

903-
gettimeofday(&tv, NULL);
904-
begin = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000;
905887

906888
jobs = get_jobs_to_do(ctx->nodename, type, &njobs, &is_error, p->free);
907889

908-
gettimeofday(&tv, NULL);
909-
elapsed = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000 - begin;
910-
elog(LOG, "get todo %d = %f", type, elapsed);
911-
912-
913890
nwaiting = njobs;
914891
if(is_error)
915892
{
@@ -929,19 +906,16 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
929906
N = p->free;
930907
if(N > nwaiting) N = nwaiting;
931908

932-
/* START_SPI_SNAP(); */
933909

934910
for(i = start_i; i < N + start_i; i++)
935911
{
936912
ni = type == CronJob ?
937913
how_many_instances_on_work(ctx, &(jobs[i])): 100000;
938914
if(type == CronJob && ni >= jobs[i].max_instances)
939915
{
940-
/* START_SPI_SNAP(); */
941916
set_job_error(&jobs[i], "max instances limit reached");
942917
move_job_to_log(&jobs[i], false, false);
943918
destroy_job(&jobs[i], 0);
944-
/* STOP_SPI_SNAP(); */
945919
jobs[i].cron_id = -1;
946920
}
947921
else
@@ -964,21 +938,13 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
964938
elog(ERROR, "Cannot set job to free slot type=%d, id=%d",
965939
jobs[i].type, jobs[i].cron_id);
966940
}
967-
/* START_SPI_SNAP(); */
968941
move_job_to_log(&jobs[i], false, false);
969942
destroy_job(&jobs[i], 0);
970943
jobs[i].cron_id = -1;
971-
/* STOP_SPI_SNAP(); */
972944
}
973-
gettimeofday(&tv, NULL);
974-
elapsed = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000 - begin;
975-
elog(LOG, "get todo %d set one job = %f", type, elapsed);
976945
}
977946
}
978947

979-
/* STOP_SPI_SNAP(); */
980-
981-
982948
if(N < nwaiting)
983949
{
984950
start_i += N;
@@ -1597,7 +1563,7 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
15971563
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
15981564
sprintf(worker.bgw_library_name, "pgpro_scheduler");
15991565
sprintf(worker.bgw_function_name, "at_executor_worker_main");
1600-
snprintf(worker.bgw_name, BGW_MAXLEN, "scheduler executor %s", shm_data->database);
1566+
snprintf(worker.bgw_name, BGW_MAXLEN, "scheduler at-executor %s", shm_data->database);
16011567
worker.bgw_notify_pid = MyProcPid;
16021568

16031569
if(!RegisterDynamicBackgroundWorker(&worker, &(item->handler)))

src/scheduler_spi_utils.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ Datum select_onedatumvalue_sql(const char *sql, bool *is_null)
246246
int ret;
247247
Datum datum = 0;
248248

249+
SetCurrentStatementStartTimestamp();
249250
ret = SPI_execute(sql, true, 0);
250251
if(ret == SPI_OK_SELECT)
251252
{
@@ -266,6 +267,7 @@ int select_count_with_args(const char *sql, int n, Oid *argtypes, Datum *values,
266267
Datum datum;
267268
bool is_null;
268269

270+
SetCurrentStatementStartTimestamp();
269271
ret = SPI_execute_with_args(sql, n, argtypes, values, nulls, true, 0);
270272
if(ret == SPI_OK_SELECT)
271273
{
@@ -303,6 +305,7 @@ int execute_spi_sql_with_args(const char *sql, int n, Oid *argtypes, Datum *valu
303305

304306
PG_TRY();
305307
{
308+
SetCurrentStatementStartTimestamp();
306309
ret = SPI_execute_with_args(sql, n, argtypes, values, nulls, false, 0);
307310
}
308311
PG_CATCH();
@@ -389,6 +392,7 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
389392
plan = SPI_prepare(sql, nparams, paramtypes);
390393
if(plan)
391394
{
395+
SetCurrentStatementStartTimestamp();
392396
ret = SPI_execute_plan(plan, values, NULL, false, 0);
393397
}
394398
}

0 commit comments

Comments
 (0)