Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 115358b

Browse files
committed
Merge branch 'PGPROEE9_6_scheduler' into PGPROEE9_6
2 parents 366c9ec + 872a8be commit 115358b

16 files changed

+1772
-479
lines changed

contrib/pgpro_scheduler/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ else
2020
include $(top_builddir)/src/Makefile.global
2121
include $(top_srcdir)/contrib/contrib-global.mk
2222
endif
23+
24+
#check: temp-install
25+
# $(prove_check)

contrib/pgpro_scheduler/pgpro_scheduler--2.0.sql

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ SET search_path TO schedule;
55

66
CREATE TYPE job_status_t AS ENUM ('working', 'done', 'error');
77
CREATE TYPE job_at_status_t AS ENUM ('submitted', 'processing', 'done');
8+
CREATE SEQUENCE schedule.at_jobs_submitted_id_seq;
89

910
CREATE TABLE at_jobs_submitted(
10-
id SERIAL PRIMARY KEY,
11+
id int PRIMARY KEY,
1112
node text,
1213
name text,
1314
comments text,
@@ -37,6 +38,8 @@ ALTER TABLE at_jobs_done ADD status boolean;
3738
ALTER TABLE at_jobs_done ADD reason text;
3839
ALTER TABLE at_jobs_done ADD done_time timestamp with time zone default now();
3940

41+
ALTER TABLE at_jobs_submitted ALTER id SET default nextval('schedule.at_jobs_submitted_id_seq');
42+
4043

4144
CREATE TABLE cron(
4245
id SERIAL PRIMARY KEY,
@@ -281,7 +284,12 @@ DECLARE
281284
cron_id INTEGER;
282285
BEGIN
283286
cron_id := NEW.id;
284-
IF NOT NEW.active OR NEW.broken OR NEW.rule <> OLD.rule OR NEW.postpone <> OLD.postpone THEN
287+
IF NOT NEW.active OR NEW.broken OR
288+
coalesce(NEW.rule <> OLD.rule, true) OR
289+
coalesce(NEW.postpone <> OLD.postpone, true) OR
290+
coalesce(NEW.start_date <> OLD.start_date, true) OR
291+
coalesce(NEW.end_date <> OLD.end_date, true)
292+
THEN
285293
DELETE FROM at WHERE cron = cron_id AND active = false;
286294
END IF;
287295
RETURN OLD;
@@ -1203,9 +1211,9 @@ BEGIN
12031211
END IF;
12041212

12051213
IF usename = '___all___' THEN
1206-
sql_cmd := 'SELECT * FROM log as l , cron as cron WHERE cron.id = l.cron';
1214+
sql_cmd := 'SELECT * FROM log as l LEFT OUTER JOIN cron ON cron.id = l.cron';
12071215
ELSE
1208-
sql_cmd := 'SELECT * FROM log as l , cron as cron WHERE cron.executor = ''' || usename || ''' AND cron.id = l.cron';
1216+
sql_cmd := 'SELECT * FROM log as l LEFT OUTER JOIN cron ON cron.executor = ''' || usename || ''' AND cron.id = l.cron';
12091217
END IF;
12101218

12111219
FOR ii IN EXECUTE sql_cmd LOOP
@@ -1338,7 +1346,7 @@ CREATE VIEW all_job_status AS
13381346
attempt, resubmit_limit, postpone as max_wait_interval,
13391347
max_run_time as max_duration, submit_time,
13401348
start_time, status as is_success, reason as error, done_time,
1341-
'processing'::job_at_status_t status
1349+
'done'::job_at_status_t status
13421350
FROM schedule.at_jobs_done
13431351
UNION
13441352
SELECT

contrib/pgpro_scheduler/src/char_array.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ char_array_t *makeCharArray(void)
2525

2626
char_array_t *sortCharArray(char_array_t *a)
2727
{
28+
if(a->n <= 1) return a;
2829
qsort(a->data, a->n, sizeof(char *), __sort_char_string);
2930

3031
return a;

contrib/pgpro_scheduler/src/memutils.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ void *worker_alloc(Size size)
2929

3030
void delete_worker_mem_ctx(void)
3131
{
32+
MemoryContextSwitchTo(TopMemoryContext);
3233
MemoryContextDelete(SchedulerWorkerContext);
3334
}

contrib/pgpro_scheduler/src/pgpro_scheduler.c

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ char_array_t *readBasesToCheck(void)
212212
pgstat_report_activity(STATE_RUNNING, "read configuration");
213213
result = makeCharArray();
214214

215-
value = GetConfigOption("schedule.database", 1, 0);
215+
value = GetConfigOption("schedule.database", true, false);
216216
if(!value || strlen(value) == 0)
217217
{
218218
return result;
@@ -254,6 +254,7 @@ char_array_t *readBasesToCheck(void)
254254
pfree(clean_value);
255255
if(names->n == 0)
256256
{
257+
destroyCharArray(names);
257258
return result;
258259
}
259260

@@ -264,45 +265,39 @@ char_array_t *readBasesToCheck(void)
264265
appendStringInfo(&sql, "'%s'", names->data[i]);
265266
if(i + 1 != names->n) appendStringInfo(&sql, ",");
266267
}
268+
destroyCharArray(names);
267269
appendStringInfo(&sql, ")");
268-
SetCurrentStatementStartTimestamp();
269-
StartTransactionCommand();
270-
SPI_connect();
271-
PushActiveSnapshot(GetTransactionSnapshot());
270+
271+
START_SPI_SNAP();
272272

273273
ret = SPI_execute(sql.data, true, 0);
274274
if (ret != SPI_OK_SELECT)
275275
{
276-
SPI_finish();
277-
PopActiveSnapshot();
278-
CommitTransactionCommand();
276+
STOP_SPI_SNAP();
277+
elog(ERROR, "cannot select from pg_database");
279278
}
280-
destroyCharArray(names);
281279
processed = SPI_processed;
282280
if(processed == 0)
283281
{
284-
SPI_finish();
285-
PopActiveSnapshot();
286-
CommitTransactionCommand();
282+
STOP_SPI_SNAP();
287283
return result;
288284
}
289285
for(i=0; i < processed; i++)
290286
{
291287
clean_value = SPI_getvalue(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1);
292288
pushCharArray(result, clean_value);
293289
}
294-
SPI_finish();
295-
PopActiveSnapshot();
296-
CommitTransactionCommand();
290+
STOP_SPI_SNAP();
297291
sortCharArray(result);
292+
298293
return result;
299294
}
300295

301296
void parent_scheduler_main(Datum arg)
302297
{
303298
int rc = 0, i;
304299
char_array_t *names = NULL;
305-
schd_managers_poll_t *poll;
300+
schd_managers_poll_t *pool;
306301
schd_manager_share_t *shared;
307302
bool refresh = false;
308303

@@ -319,10 +314,10 @@ void parent_scheduler_main(Datum arg)
319314

320315
BackgroundWorkerInitializeConnection("postgres", NULL);
321316
names = readBasesToCheck();
322-
poll = initSchedulerManagerPool(names);
317+
pool = initSchedulerManagerPool(names);
323318
destroyCharArray(names);
324319

325-
set_supervisor_pgstatus(poll);
320+
set_supervisor_pgstatus(pool);
326321

327322
while(!got_sigterm)
328323
{
@@ -334,62 +329,62 @@ void parent_scheduler_main(Datum arg)
334329
ProcessConfigFile(PGC_SIGHUP);
335330
refresh = false;
336331
names = NULL;
337-
if(is_scheduler_enabled() != poll->enabled)
332+
if(is_scheduler_enabled() != pool->enabled)
338333
{
339-
if(poll->enabled)
334+
if(pool->enabled)
340335
{
341-
poll->enabled = false;
342-
stopAllManagers(poll);
343-
set_supervisor_pgstatus(poll);
336+
pool->enabled = false;
337+
stopAllManagers(pool);
338+
set_supervisor_pgstatus(pool);
344339
}
345340
else
346341
{
347342
refresh = true;
348-
poll->enabled = true;
343+
pool->enabled = true;
349344
names = readBasesToCheck();
350345
}
351346
}
352-
else if(poll->enabled)
347+
else if(pool->enabled)
353348
{
354349
names = readBasesToCheck();
355-
if(isBaseListChanged(names, poll)) refresh = true;
350+
if(isBaseListChanged(names, pool)) refresh = true;
356351
else destroyCharArray(names);
357352
}
358353

359354
if(refresh)
360355
{
361-
refreshManagers(names, poll);
362-
set_supervisor_pgstatus(poll);
356+
refreshManagers(names, pool);
357+
set_supervisor_pgstatus(pool);
363358
destroyCharArray(names);
364359
}
365360
}
366361
else
367362
{
368-
for(i=0; i < poll->n; i++)
363+
for(i=0; i < pool->n; i++)
369364
{
370-
shared = dsm_segment_address(poll->workers[i]->shared);
365+
shared = dsm_segment_address(pool->workers[i]->shared);
371366

372367
if(shared->setbychild)
373368
{
374-
/* elog(LOG, "got status change from: %s", poll->workers[i]->dbname); */
369+
/* elog(LOG, "got status change from: %s", pool->workers[i]->dbname); */
375370
shared->setbychild = false;
376371
if(shared->status == SchdManagerConnected)
377372
{
378-
poll->workers[i]->connected = true;
373+
pool->workers[i]->connected = true;
379374
}
380375
else if(shared->status == SchdManagerQuit)
381376
{
382-
removeManagerFromPoll(poll, poll->workers[i]->dbname, 1, true);
383-
set_supervisor_pgstatus(poll);
377+
removeManagerFromPoll(pool, pool->workers[i]->dbname, 1, true);
378+
set_supervisor_pgstatus(pool);
384379
}
385380
else if(shared->status == SchdManagerDie)
386381
{
387-
removeManagerFromPoll(poll, poll->workers[i]->dbname, 1, false);
388-
set_supervisor_pgstatus(poll);
382+
removeManagerFromPoll(pool, pool->workers[i]->dbname, 1, false);
383+
set_supervisor_pgstatus(pool);
389384
}
390385
else
391386
{
392-
elog(WARNING, "manager: %s set strange status: %d", poll->workers[i]->dbname, shared->status);
387+
elog(WARNING, "manager: %s set strange status: %d", pool->workers[i]->dbname, shared->status);
393388
}
394389
}
395390
}
@@ -399,7 +394,7 @@ void parent_scheduler_main(Datum arg)
399394
CHECK_FOR_INTERRUPTS();
400395
ResetLatch(MyLatch);
401396
}
402-
stopAllManagers(poll);
397+
stopAllManagers(pool);
403398
delete_worker_mem_ctx();
404399

405400
proc_exit(0);
@@ -413,7 +408,7 @@ pg_scheduler_startup(void)
413408
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
414409
BGWORKER_BACKEND_DATABASE_CONNECTION;
415410
worker.bgw_start_time = BgWorkerStart_ConsistentState;
416-
worker.bgw_restart_time = BGW_NEVER_RESTART;
411+
worker.bgw_restart_time = 10;
417412
worker.bgw_main = NULL;
418413
worker.bgw_notify_pid = 0;
419414
worker.bgw_main_arg = Int32GetDatum(0);

contrib/pgpro_scheduler/src/sched_manager_poll.c

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ char *supervisor_state(schd_managers_poll_t *poll)
4343

4444
if(!poll->enabled)
4545
{
46-
status = palloc(sizeof(char) * 9);
46+
status = worker_alloc(sizeof(char) * 9);
4747
memcpy(status, "disabled", 8);
4848
status[8] = 0;
4949
return status;
@@ -52,13 +52,13 @@ char *supervisor_state(schd_managers_poll_t *poll)
5252
len = dbnames ? strlen(dbnames): 0;
5353
if(len == 0)
5454
{
55-
status = palloc(sizeof(char)*26);
55+
status = worker_alloc(sizeof(char)*26);
5656
memcpy(status, "waiting databases to set", 25);
5757
status[25] = 0;
5858
}
5959
else
6060
{
61-
status = palloc(sizeof(char) * (len + 10));
61+
status = worker_alloc(sizeof(char) * (len + 10));
6262
memcpy(status, "work on: ", 9);
6363
memcpy(status+9, dbnames, len);
6464
status[len+9] = 0;
@@ -82,7 +82,7 @@ char *poll_dbnames(schd_managers_poll_t *poll)
8282
if(i < (poll->n - 1))
8383
appendStringInfo(&string, ", ");
8484
}
85-
out = palloc(sizeof(char) * (string.len + 1));
85+
out = worker_alloc(sizeof(char) * (string.len + 1));
8686
memcpy(out, string.data, string.len);
8787
out[string.len] = 0;
8888
pfree(string.data);
@@ -297,29 +297,18 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
297297
schd_manager_t *man;
298298
schd_manager_share_t *shm_data;
299299
Size segsize;
300-
/* shm_toc_estimator e;
301-
shm_toc *toc; */
302300
dsm_segment *seg;
303301

304-
/* shm_toc_initialize_estimator(&e);
305-
shm_toc_estimate_chunk(&e, sizeof(schd_manager_share_t));
306-
shm_toc_estimate_keys(&e, 1);
307-
segsize = shm_toc_estimate(&e); */
308302
segsize = (Size)sizeof(schd_manager_share_t);
309303

310304
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
311305
seg = dsm_create(segsize, 0);
312306

313-
man = palloc(sizeof(schd_manager_t));
314-
man->dbname = palloc(sizeof(char *) * (strlen(name) + 1));
307+
man = worker_alloc(sizeof(schd_manager_t));
308+
man->dbname = worker_alloc(sizeof(char *) * (strlen(name) + 1));
315309
man->connected = false;
316310
memcpy(man->dbname, name, strlen(name) + 1);
317311
man->shared = seg;
318-
/* toc = shm_toc_create(PGPRO_SHM_TOC_MAGIC, dsm_segment_address(man->shared),
319-
segsize);
320-
321-
shm_data = shm_toc_allocate(toc, sizeof(schd_manager_share_t));
322-
shm_toc_insert(toc, 0, shm_data); */
323312
shm_data = dsm_segment_address(man->shared);
324313

325314
shm_data->setbyparent = true;
@@ -331,7 +320,7 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
331320
pos = poll->n++;
332321
poll->workers = poll->workers ?
333322
repalloc(poll->workers, sizeof(schd_manager_t *) * poll->n):
334-
palloc(sizeof(schd_manager_t *));
323+
worker_alloc(sizeof(schd_manager_t *));
335324
poll->workers[pos] = man;
336325
if(sort) _sortPollManagers(poll);
337326

0 commit comments

Comments
 (0)