Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b461eca

Browse files
author
Vladimir Ershov
committed
начало документации
остановка и запуск планировщика
1 parent 9fbb5eb commit b461eca

File tree

8 files changed

+575
-26
lines changed

8 files changed

+575
-26
lines changed

README.md

Lines changed: 470 additions & 0 deletions
Large diffs are not rendered by default.

src/pgpro_scheduler.c

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ PG_MODULE_MAGIC;
3636
volatile sig_atomic_t got_sighup = false;
3737
volatile sig_atomic_t got_sigterm = false;
3838

39-
40-
static char *sched_databases = "";
41-
static char *sched_nodename = "master";
42-
static char *sched_transaction_state = "undefined";
43-
static int sched_max_workers = 2;
39+
/* Custom GUC variables */
40+
static char *scheduler_databases = "";
41+
static char *scheduler_nodename = "master";
42+
static char *scheduler_transaction_state = "undefined";
43+
static int scheduler_max_workers = 2;
44+
static bool scheduler_service_enabled = false;
45+
/* Custom GUC done */
4446

4547
extern void
4648
worker_spi_sighup(SIGNAL_ARGS)
@@ -127,6 +129,15 @@ TimestampTz _round_timestamp_to_minute(TimestampTz ts)
127129
#endif
128130
}
129131

132+
bool is_scheduler_enabled(void)
133+
{
134+
const char *opt;
135+
136+
opt = GetConfigOption("schedule.enabled", false, true);
137+
if(memcmp(opt, "on", 2) == 0) return true;
138+
return false;
139+
}
140+
130141

131142
/** END of SOME UTILS **/
132143

@@ -242,9 +253,10 @@ char_array_t *readBasesToCheck(void)
242253
void parent_scheduler_main(Datum arg)
243254
{
244255
int rc = 0, i;
245-
char_array_t *names;
256+
char_array_t *names = NULL;
246257
schd_managers_poll_t *poll;
247258
schd_manager_share_t *shared;
259+
bool refresh = false;
248260

249261
init_worker_mem_ctx("Parent scheduler context");
250262

@@ -270,17 +282,36 @@ void parent_scheduler_main(Datum arg)
270282
{
271283
got_sighup = false;
272284
ProcessConfigFile(PGC_SIGHUP);
273-
names = readBasesToCheck();
274-
if(isBaseListChanged(names, poll))
285+
refresh = false;
286+
names = NULL;
287+
if(is_scheduler_enabled() != poll->enabled)
288+
{
289+
if(poll->enabled)
290+
{
291+
poll->enabled = false;
292+
stopAllManagers(poll);
293+
set_supervisor_pgstatus(poll);
294+
}
295+
else
296+
{
297+
refresh = true;
298+
poll->enabled = true;
299+
names = readBasesToCheck();
300+
}
301+
}
302+
else
303+
{
304+
names = readBasesToCheck();
305+
if(isBaseListChanged(names, poll)) refresh = true;
306+
else destroyCharArray(names);
307+
}
308+
309+
if(refresh)
275310
{
276311
refreshManagers(names, poll);
277312
set_supervisor_pgstatus(poll);
313+
destroyCharArray(names);
278314
}
279-
destroyCharArray(names);
280-
}
281-
else if(got_sigterm)
282-
{
283-
/* TODO STOP ALL MANAGERS */
284315
}
285316
else
286317
{
@@ -341,7 +372,7 @@ pg_scheduler_startup(void)
341372
worker.bgw_main_arg = 0;
342373
strcpy(worker.bgw_name, "pgpro scheduler");
343374

344-
elog(LOG, "Register WORKER");
375+
/* elog(LOG, "Register WORKER"); */
345376

346377

347378
RegisterBackgroundWorker(&worker);
@@ -354,9 +385,9 @@ void _PG_init(void)
354385
"schedule.database",
355386
"On which databases scheduler could be run",
356387
NULL,
357-
&sched_databases,
388+
&scheduler_databases,
358389
"",
359-
PGC_SUSET,
390+
PGC_SIGHUP,
360391
0,
361392
NULL,
362393
NULL,
@@ -366,9 +397,9 @@ void _PG_init(void)
366397
"schedule.nodename",
367398
"The name of scheduler node",
368399
NULL,
369-
&sched_nodename,
400+
&scheduler_nodename,
370401
"master",
371-
PGC_SUSET,
402+
PGC_SIGHUP,
372403
0,
373404
NULL,
374405
NULL,
@@ -378,7 +409,7 @@ void _PG_init(void)
378409
"schedule.transaction_state",
379410
"State of scheduler executor transaction",
380411
"If not under scheduler executor process the variable has no mean and has a value = 'undefined', possible values: progress, success, failure",
381-
&sched_transaction_state ,
412+
&scheduler_transaction_state ,
382413
"undefined",
383414
PGC_INTERNAL,
384415
0,
@@ -390,11 +421,23 @@ void _PG_init(void)
390421
"schedule.max_workers",
391422
"How much workers can serve scheduler on one database",
392423
NULL,
393-
&sched_max_workers,
424+
&scheduler_max_workers,
394425
2,
395426
1,
396427
100,
397-
PGC_SUSET,
428+
PGC_SIGHUP,
429+
0,
430+
NULL,
431+
NULL,
432+
NULL
433+
);
434+
DefineCustomBoolVariable(
435+
"schedule.enabled",
436+
"Enable schedule service",
437+
NULL,
438+
&scheduler_service_enabled,
439+
false,
440+
PGC_SIGHUP,
398441
0,
399442
NULL,
400443
NULL,

src/pgpro_scheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,6 @@ char *make_date_from_timestamp(TimestampTz ts);
3939
int get_integer_from_string(char *s, int start, int len);
4040
TimestampTz get_timestamp_from_string(char *str);
4141
TimestampTz _round_timestamp_to_minute(TimestampTz ts);
42+
bool is_scheduler_enabled(void);
4243

4344
#endif

src/sched_manager_poll.c

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "storage/procarray.h"
1515
#include "pgpro_scheduler.h"
1616
#include "utils/resowner.h"
17+
#include "memutils.h"
1718

1819
#include "postmaster/bgworker.h"
1920

@@ -40,6 +41,14 @@ char *supervisor_state(schd_managers_poll_t *poll)
4041
char *status;
4142
int len;
4243

44+
if(!poll->enabled)
45+
{
46+
status = palloc(sizeof(char) * 9);
47+
memcpy(status, "disabled", 8);
48+
status[8] = 0;
49+
return status;
50+
}
51+
4352
len = dbnames ? strlen(dbnames): 0;
4453
if(len == 0)
4554
{
@@ -130,7 +139,8 @@ int stopAllManagers(schd_managers_poll_t *poll)
130139
destroyManagerRecord(poll->workers[i]);
131140
}
132141
pfree(poll->workers);
133-
pfree(poll);
142+
poll->workers = NULL;
143+
poll->n = 0;
134144

135145
return 1;
136146
}
@@ -145,10 +155,17 @@ void destroyManagerRecord(schd_manager_t *man)
145155
schd_managers_poll_t *initSchedulerManagerPool(char_array_t *names)
146156
{
147157
int i;
148-
schd_managers_poll_t *p = palloc(sizeof(schd_managers_poll_t));
158+
schd_managers_poll_t *p = worker_alloc(sizeof(schd_managers_poll_t));
149159

150160
p->n = 0;
151161
p->workers = NULL;
162+
p->enabled = true;
163+
164+
if(!is_scheduler_enabled())
165+
{
166+
p->enabled = false;
167+
return p;
168+
}
152169

153170
if(names->n > 0)
154171
{
@@ -335,8 +352,6 @@ int refreshManagers(char_array_t *names, schd_managers_poll_t *poll)
335352
char_array_t *same = makeCharArray();
336353
char_array_t *delete = makeCharArray();
337354

338-
elog(LOG, "NEW names: %d", names->n);
339-
340355
for(i=0; i < names->n; i++)
341356
{
342357
found = 0;

src/sched_manager_poll.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ typedef struct {
3232
typedef struct {
3333
int n;
3434
schd_manager_t **workers;
35+
bool enabled;
3536
} schd_managers_poll_t;
3637

3738
void changeChildBgwState(schd_manager_share_t *, schd_manager_status_t);

src/scheduler_executor.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ void executor_worker_main(Datum arg)
145145
{
146146
/* success = false; */
147147
status = SchdExecutorError;
148-
SetConfigOption("schedule.transaction_state", "failure", PGC_INTERNAL, PGC_S_SESSION);
149148
if(error)
150149
{
151150
push_executor_error(&EE, "error on %d: %s", i+1, error);
@@ -156,6 +155,7 @@ void executor_worker_main(Datum arg)
156155
push_executor_error(&EE, "error on %d: code: %d", i+1, ret);
157156
}
158157
ABORT_SPI_SNAP();
158+
SetConfigOption("schedule.transaction_state", "failure", PGC_INTERNAL, PGC_S_SESSION);
159159
executor_onrollback(job, &EE);
160160

161161
break;

src/scheduler_manager.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,23 @@ void clean_at_table(scheduler_manager_ctx_t *ctx)
12231223
STOP_SPI_SNAP();
12241224
}
12251225

1226+
bool check_parent_stop_signal(scheduler_manager_ctx_t *ctx)
1227+
{
1228+
schd_manager_share_t *shared;
1229+
1230+
shared = dsm_segment_address(ctx->seg);
1231+
if(shared->setbyparent)
1232+
{
1233+
shared->setbyparent = false;
1234+
if(shared->status == SchdManagerStop)
1235+
{
1236+
elog(LOG, "Recieve stop signal from parent");
1237+
return true;
1238+
}
1239+
}
1240+
return false;
1241+
}
1242+
12261243
void set_slots_stat_report(scheduler_manager_ctx_t *ctx)
12271244
{
12281245
char state[128];
@@ -1314,6 +1331,7 @@ void manager_worker_main(Datum arg)
13141331
if(rc & WL_LATCH_SET)
13151332
{
13161333
_pdebug("got latch from some bgworker");
1334+
if(check_parent_stop_signal(ctx)) break;
13171335
scheduler_check_slots(ctx);
13181336
set_slots_stat_report(ctx);
13191337
_pdebug("quit got latch");

src/scheduler_manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,6 @@ int update_cron_texttime(scheduler_manager_ctx_t *ctx, int cron_id, TimestampTz
9494
int mark_job_broken(scheduler_manager_ctx_t *ctx, int cron_id, char *reason);
9595
void manager_fatal_error(scheduler_manager_ctx_t *ctx, int ecode, char *message, ...) pg_attribute_printf(3, 4);
9696
void set_slots_stat_report(scheduler_manager_ctx_t *ctx);
97+
bool check_parent_stop_signal(scheduler_manager_ctx_t *ctx);
9798

9899
#endif

0 commit comments

Comments
 (0)