Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 19ee000

Browse files
author
Vladimir Ershov
committed
proper exit procedure
set job broken fatal manager error pocessing
1 parent 73569c8 commit 19ee000

File tree

7 files changed

+136
-36
lines changed

7 files changed

+136
-36
lines changed

src/pgpro_scheduler.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,6 @@ void parent_scheduler_main(Datum arg)
287287
{
288288
for(i=0; i < poll->n; i++)
289289
{
290-
/* toc = shm_toc_attach(PGPRO_SHM_TOC_MAGIC, dsm_segment_address(poll->workers[i]->shared));
291-
shared = shm_toc_lookup(toc, 0); */
292290
shared = dsm_segment_address(poll->workers[i]->shared);
293291

294292
if(shared->setbychild)
@@ -301,7 +299,12 @@ void parent_scheduler_main(Datum arg)
301299
}
302300
else if(shared->status == SchdManagerQuit)
303301
{
304-
removeManagerFromPoll(poll, poll->workers[i]->dbname, 1);
302+
removeManagerFromPoll(poll, poll->workers[i]->dbname, 1, true);
303+
set_supervisor_pgstatus(poll);
304+
}
305+
else if(shared->status == SchdManagerDie)
306+
{
307+
removeManagerFromPoll(poll, poll->workers[i]->dbname, 1, false);
305308
set_supervisor_pgstatus(poll);
306309
}
307310
else
@@ -320,7 +323,7 @@ void parent_scheduler_main(Datum arg)
320323
stopAllManagers(poll);
321324
delete_worker_mem_ctx();
322325

323-
proc_exit(1);
326+
proc_exit(0);
324327
}
325328

326329
void

src/sched_manager_poll.c

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,15 @@ void changeChildBgwState(schd_manager_share_t *s, schd_manager_status_t status)
8888
s->setbychild = true;
8989

9090
parent = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
91-
if(parent) SetLatch(&parent->procLatch);
92-
elog(LOG, "set LATCH %d - %d" , MyBgworkerEntry->bgw_notify_pid, status);
91+
if(parent)
92+
{
93+
SetLatch(&parent->procLatch);
94+
elog(LOG, "set LATCH to %d - status = %d" , MyBgworkerEntry->bgw_notify_pid, status);
95+
}
96+
else
97+
{
98+
elog(LOG, "unable to set LATCH to %d", MyBgworkerEntry->bgw_notify_pid);
99+
}
93100
}
94101

95102
int stopAllManagers(schd_managers_poll_t *poll)
@@ -98,6 +105,8 @@ int stopAllManagers(schd_managers_poll_t *poll)
98105
PGPROC *child;
99106
schd_manager_share_t *shared;
100107

108+
elog(LOG, "Stop all managers");
109+
101110
for(i=0; i < poll->n; i++)
102111
{
103112
shared = dsm_segment_address(poll->workers[i]->shared);
@@ -108,7 +117,10 @@ int stopAllManagers(schd_managers_poll_t *poll)
108117
{
109118
elog(LOG, "cannot get PGRPOC of %s scheduler", poll->workers[i]->dbname);
110119
}
111-
SetLatch(&child->procLatch);
120+
else
121+
{
122+
SetLatch(&child->procLatch);
123+
}
112124
}
113125

114126
/* MAYBE: WAIT? */
@@ -212,7 +224,7 @@ void _sortPollManagers(schd_managers_poll_t *poll)
212224
qsort(poll->workers, poll->n, sizeof(schd_manager_t *), __cmp_managers);
213225
}
214226

215-
int removeManagerFromPoll(schd_managers_poll_t *poll, char *name, char sort)
227+
int removeManagerFromPoll(schd_managers_poll_t *poll, char *name, char sort, bool stop_worker)
216228
{
217229
int found = 0;
218230
int i;
@@ -230,8 +242,11 @@ int removeManagerFromPoll(schd_managers_poll_t *poll, char *name, char sort)
230242
if(found == 0) return 0;
231243
mng = poll->workers[i];
232244

233-
elog(LOG, "Stop scheduler manager for %s", mng->dbname);
234-
TerminateBackgroundWorker(mng->handler);
245+
if(stop_worker)
246+
{
247+
elog(LOG, "Stop scheduler manager for %s", mng->dbname);
248+
TerminateBackgroundWorker(mng->handler);
249+
}
235250

236251
if(poll->n == 1)
237252
{
@@ -357,7 +372,7 @@ int refreshManagers(char_array_t *names, schd_managers_poll_t *poll)
357372
{
358373
for(i = 0; i < delete->n; i++)
359374
{
360-
removeManagerFromPoll(poll, delete->data[i], 0);
375+
removeManagerFromPoll(poll, delete->data[i], 0, true);
361376
}
362377
}
363378
if(new->n)

src/sched_manager_poll.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ void destroyManagerRecord(schd_manager_t *man);
4141
int stopAllManagers(schd_managers_poll_t *poll);
4242
int isBaseListChanged(char_array_t *names, schd_managers_poll_t *pool);
4343
void _sortPollManagers(schd_managers_poll_t *poll);
44-
int removeManagerFromPoll(schd_managers_poll_t *poll, char *name, char sort);
44+
int removeManagerFromPoll(schd_managers_poll_t *poll, char *name, char sort, bool stop_worker);
4545
int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort);
4646
int refreshManagers(char_array_t *names, schd_managers_poll_t *poll);
4747
char *poll_dbnames(schd_managers_poll_t *poll);

src/scheduler_executor.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ void executor_worker_main(Datum arg)
185185
}
186186
*/
187187
shared->next_time = get_next_excution_time(job->next_time_statement, &EE);
188+
if(shared->next_time == 0)
189+
{
190+
shared->set_invalid = true;
191+
sprintf(shared->set_invalid_reason, "unable to execute next time statement");
192+
}
188193
}
189194
pgstat_report_activity(STATE_RUNNING, "finish job processing");
190195

src/scheduler_executor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ typedef struct {
2828
char message[PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX];
2929

3030
TimestampTz next_time;
31+
32+
bool set_invalid;
33+
char set_invalid_reason[PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX];
3134
} schd_executor_share_t;
3235

3336
typedef struct {
@@ -41,7 +44,7 @@ void set_shared_message(schd_executor_share_t *shared, executor_error_t *ee);
4144
TimestampTz get_next_excution_time(char *sql, executor_error_t *ee);
4245
int executor_onrollback(job_t *job, executor_error_t *ee);
4346
void set_pg_var(bool resulti, executor_error_t *ee);
44-
int push_executor_error(executor_error_t *e, char *fmt, ...) __attribute__ ((format (gnu_printf, 2, 3)));
47+
int push_executor_error(executor_error_t *e, char *fmt, ...) pg_attribute_printf(2, 3);
4548
int set_session_authorization(char *username, char **error);
4649

4750

src/scheduler_manager.c

Lines changed: 89 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ char *get_scheduler_nodename(void)
115115
return _copy_string((char *)(opt == NULL || strlen(opt) == 0 ? "master": opt));
116116
}
117117

118-
scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname)
118+
scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname, dsm_segment *seg)
119119
{
120120
int i;
121121
scheduler_manager_ctx_t *ctx;
@@ -126,6 +126,8 @@ scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname)
126126
ctx->nodename = get_scheduler_nodename();
127127
ctx->database = _copy_string(dbname);
128128

129+
ctx->seg = seg;
130+
129131
ctx->slots = worker_alloc(sizeof(scheduler_manager_slot_t *) * ctx->slots_len);
130132
for(i=0; i < ctx->slots_len; i++)
131133
{
@@ -169,10 +171,22 @@ void destroy_scheduler_manager_context(scheduler_manager_ctx_t *ctx)
169171
pfree(ctx);
170172
}
171173

172-
void scheduler_manager_stop(scheduler_manager_ctx_t *ctx)
174+
int scheduler_manager_stop(scheduler_manager_ctx_t *ctx)
173175
{
176+
int i;
177+
int onwork;
178+
179+
onwork = ctx->slots_len - ctx->free_slots;
180+
if(onwork == 0) return 0;
181+
174182
pgstat_report_activity(STATE_RUNNING, "stop executors");
175-
/* TODO stop worker but before stop all started kid workers */
183+
for(i=0; i < onwork; i++)
184+
{
185+
elog(LOG, "Schedule manager: terminate bgworker %d",
186+
ctx->slots[i]->pid);
187+
TerminateBackgroundWorker(ctx->slots[i]->handler);
188+
}
189+
return onwork;
176190
}
177191

178192
scheduler_task_t *scheduler_get_active_tasks(scheduler_manager_ctx_t *ctx, int *nt)
@@ -588,6 +602,8 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
588602
shm_data->start_at = item->job->start_at;
589603
shm_data->message[0] = 0;
590604
shm_data->next_time = 0;
605+
shm_data->set_invalid = false;
606+
shm_data->set_invalid_reason[0] = 0;
591607

592608
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
593609
BGWORKER_BACKEND_DATABASE_CONNECTION;
@@ -808,12 +824,17 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
808824
}
809825
else if(toremove[i].reason == RmDone)
810826
{
827+
shm_data = dsm_segment_address(item->shared);
811828
job_status = true;
829+
if(shm_data->message[0] != 0)
830+
{
831+
set_job_error(item->job, "%s", shm_data->message);
832+
}
812833
}
813834
else if(toremove[i].reason == RmError)
814835
{
815836
shm_data = dsm_segment_address(item->shared);
816-
if(strlen(shm_data->message) > 0)
837+
if(shm_data->message[0] != 0)
817838
{
818839
set_job_error(item->job, "%s", shm_data->message);
819840
}
@@ -830,19 +851,23 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
830851
if(removeJob)
831852
{
832853
START_SPI_SNAP();
854+
shm_data = dsm_segment_address(item->shared);
855+
856+
if(shm_data->set_invalid)
857+
{
858+
mark_job_broken(ctx, item->job->cron_id, shm_data->set_invalid_reason);
859+
}
833860
if(item->job->next_time_statement)
834861
{
835-
shm_data = dsm_segment_address(item->shared);
836862
if(shm_data->next_time > 0)
837863
{
838864
next_time = _round_timestamp_to_minute(shm_data->next_time);
839865
next_time_str = make_date_from_timestamp(next_time);
840866
if(insert_at_record(ctx->nodename, item->job->cron_id, next_time, 0, &error) < 0)
841867
{
842-
elog(ERROR, "Cannot insert next time at record: %s",
843-
error ? error: "unknown error");
868+
manager_fatal_error(ctx, 0, "Cannot insert next time at record: %s", error ? error: "unknown error");
844869
}
845-
update_cron_texttime(item->job->cron_id, next_time);
870+
update_cron_texttime(ctx,item->job->cron_id, next_time);
846871
if(!item->job->error)
847872
{
848873
set_job_error(item->job, "set next exec time: %s", next_time_str);
@@ -868,7 +893,25 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
868893
return 1;
869894
}
870895

871-
int update_cron_texttime(int cron_id, TimestampTz next)
896+
int mark_job_broken(scheduler_manager_ctx_t *ctx, int cron_id, char *reason)
897+
{
898+
Oid types[2] = { INT4OID, TEXTOID };
899+
Datum values[2];
900+
char *error;
901+
char *sql = "update schedule.cron set reason = $2, broken = true where id = $1";
902+
int ret;
903+
904+
values[0] = Int32GetDatum(cron_id);
905+
values[1] = CStringGetTextDatum(reason);
906+
ret = execute_spi_sql_with_args(sql, 2, types, values, NULL, &error);
907+
if(ret < 0)
908+
{
909+
manager_fatal_error(ctx, 0, "Cannot set cron %d broken: %s", cron_id, error);
910+
}
911+
return ret;
912+
}
913+
914+
int update_cron_texttime(scheduler_manager_ctx_t *ctx, int cron_id, TimestampTz next)
872915
{
873916
Oid types[2] = { INT4OID, TIMESTAMPTZOID };
874917
Datum values[2];
@@ -889,7 +932,7 @@ int update_cron_texttime(int cron_id, TimestampTz next)
889932
ret = execute_spi_sql_with_args(sql, 2, types, values, nulls, &error);
890933
if(ret < 0)
891934
{
892-
elog(ERROR, "Cannot update cron %d next time: %s", cron_id, error);
935+
manager_fatal_error(ctx, 0, "Cannot update cron %d next time: %s", cron_id, error);
893936
}
894937

895938
return ret;
@@ -1025,6 +1068,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10251068
{
10261069
n_exec_dates = 0;
10271070
ntimes = 0;
1071+
realloced = false;
10281072

10291073
next_times = scheduler_calc_next_task_time(&(tasks[i]),
10301074
GetCurrentTimestamp(), timestamp_add_seconds(0, 600),
@@ -1035,12 +1079,13 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10351079
{
10361080
date1 = make_date_from_timestamp(start);
10371081
date2 = make_date_from_timestamp(stop);
1082+
10381083

10391084
for(j=0; j < n_exec_dates; j++)
10401085
{
10411086
r1 = strcmp(date1, exec_dates[j]);
10421087
r2 = strcmp(exec_dates[j], date2);
1043-
if((r1 == 0 || r1 == -1) && (r2 == 0 || r2 == -1))
1088+
if(r1 <= 0 && r2 <= 0)
10441089
{
10451090
if(!realloced)
10461091
{
@@ -1069,7 +1114,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10691114
{
10701115
if(insert_at_record(ctx->nodename, tasks[i].id, next_times[j], tasks[i].postpone, &error) < 0)
10711116
{
1072-
elog(ERROR, "Cannot insert AT task: %s", error ? error: "unknown error");
1117+
manager_fatal_error(ctx, 0, "Cannot insert AT task: %s", error ? error: "unknown error");
10731118
}
10741119
}
10751120
pfree(next_times);
@@ -1083,18 +1128,18 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10831128
return ntasks;
10841129
}
10851130

1086-
void clean_at_table(void)
1131+
void clean_at_table(scheduler_manager_ctx_t *ctx)
10871132
{
10881133
char *error = NULL;
10891134

10901135
START_SPI_SNAP();
10911136
if(execute_spi("truncate schedule.at", &error) < 0)
10921137
{
1093-
elog(ERROR, "Cannot clean 'at' table: %s", error);
1138+
manager_fatal_error(ctx, 0, "Cannot clean 'at' table: %s", error);
10941139
}
10951140
if(execute_spi("update schedule.cron set _next_exec_time = NULL where _next_exec_time is not NULL", &error) < 0)
10961141
{
1097-
elog(ERROR, "Cannot clean cron _next time: %s", error);
1142+
manager_fatal_error(ctx, 0, "Cannot clean cron _next time: %s", error);
10981143
}
10991144
STOP_SPI_SNAP();
11001145
}
@@ -1120,6 +1165,7 @@ void manager_worker_main(Datum arg)
11201165

11211166
if(shared->status != SchdManagerInit && !(shared->setbyparent))
11221167
{
1168+
dsm_detach(seg);
11231169
ereport(ERROR,
11241170
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
11251171
errmsg("corrupted dynamic shared memory segment")));
@@ -1150,6 +1196,7 @@ void manager_worker_main(Datum arg)
11501196
dsm_detach(seg);
11511197
proc_exit(0);
11521198
}
1199+
elog(LOG, "ON");
11531200
SetCurrentStatementStartTimestamp();
11541201
pgstat_report_activity(STATE_RUNNING, "initialize.");
11551202

@@ -1160,9 +1207,9 @@ void manager_worker_main(Datum arg)
11601207
pgstat_report_activity(STATE_RUNNING, "initialize context");
11611208
changeChildBgwState(shared, SchdManagerConnected);
11621209
init_worker_mem_ctx("WorkerMemoryContext");
1163-
ctx = initialize_scheduler_manager_context(database);
1164-
clean_at_table();
1165-
1210+
ctx = initialize_scheduler_manager_context(database, seg);
1211+
clean_at_table(ctx);
1212+
elog(LOG, "Start main loop");
11661213
while(!got_sigterm)
11671214
{
11681215
if(rc)
@@ -1194,12 +1241,35 @@ void manager_worker_main(Datum arg)
11941241
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L);
11951242
ResetLatch(MyLatch);
11961243
}
1244+
scheduler_manager_stop(ctx);
11971245
delete_worker_mem_ctx();
1198-
/* destroy_scheduler_manager_context(ctx); - no need any more */
11991246
changeChildBgwState(shared, SchdManagerDie);
12001247
pfree(database);
12011248
dsm_detach(seg);
12021249
proc_exit(0);
12031250
}
12041251

1252+
void manager_fatal_error(scheduler_manager_ctx_t *ctx, int ecode, char *message, ...)
1253+
{
1254+
va_list arglist;
1255+
char buf[1024];
1256+
1257+
scheduler_manager_stop(ctx);
1258+
changeChildBgwState((schd_manager_share_t *)(dsm_segment_address(ctx->seg)), SchdManagerDie);
1259+
dsm_detach(ctx->seg);
1260+
1261+
va_start(arglist, message);
1262+
vsnprintf(buf, 1024, message, arglist);
1263+
va_end(arglist);
1264+
1265+
1266+
delete_worker_mem_ctx();
1267+
if(ecode == 0)
1268+
{
1269+
ecode = ERRCODE_INTERNAL_ERROR;
1270+
}
1271+
1272+
ereport(ERROR, (errcode(ecode), errmsg("%s", buf)));
1273+
}
1274+
12051275

0 commit comments

Comments
 (0)