Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit be33043

Browse files
author
Vladimir Ershov
committed
working copy before tests
1 parent 3a09cce commit be33043

9 files changed

+137
-89
lines changed

src/pgpro_scheduler.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
PG_MODULE_MAGIC;
5050
#endif
5151

52-
int _makeTheOnlyNodesToSelect(sheduler_nodes_data_t *dst, char *nodename);
52+
int _makeTheOnlyNodesToSelect(scheduler_nodes_data_t *dst, char *nodename);
5353

5454
static const char *show_scheduler_nodename(void);
5555
static const char *show_scheduler_database(void);
@@ -507,11 +507,11 @@ char_array_t *readBasesToCheck(void)
507507
return result;
508508
}
509509

510-
int _makeTheOnlyNodesToSelect(sheduler_nodes_data_t *dst, char *nodename)
510+
int _makeTheOnlyNodesToSelect(scheduler_nodes_data_t *dst, char *nodename)
511511
{
512512
dst->n = 1;
513513
dst->argtypes = palloc(sizeof(Oid) * 1);
514-
dst->values = palloc(sizeof(Datum) * 1)
514+
dst->values = palloc(sizeof(Datum) * 1);
515515
dst->cond = my_copy_string(" = $1 ");
516516

517517
dst->values[0] = CStringGetTextDatum(nodename);
@@ -520,7 +520,7 @@ int _makeTheOnlyNodesToSelect(sheduler_nodes_data_t *dst, char *nodename)
520520
return dst->n;
521521
}
522522

523-
int getNodesToSelect(sheduler_nodes_data_t *dst, char *nodename)
523+
int getNodesToSelect(scheduler_nodes_data_t *dst, char *nodename)
524524
{
525525
#ifdef _WITH_MTM
526526
StringInfo cond;
@@ -535,21 +535,21 @@ int getNodesToSelect(sheduler_nodes_data_t *dst, char *nodename)
535535
}
536536
dst->n = MTM_nodes->need_help + 1;
537537
dst->argtypes = palloc(sizeof(Oid) * dst->n);
538-
dst->values = palloc(sizeof(Datum) * dst->n)
538+
dst->values = palloc(sizeof(Datum) * dst->n);
539539
dst->values[0] = CStringGetTextDatum(nodename);
540540
dst->argtypes[0] = TEXTOID;
541541

542-
cond = initStringInfo();
543-
appendStringInfo(cond, "($1");
542+
cond = makeStringInfo();
543+
appendStringInfo(cond, "in ($1");
544544
for(i=0; i < MTM_nodes->nodes_num; i++)
545545
{
546546
if(MTM_nodes->nodes[i].need_help)
547547
{
548548
n++;
549549
dst->values[n] =
550-
CStringGetTextDatum(MTM_nodes->nodes[i].nodename);
550+
CStringGetTextDatum(MTM_nodes->nodes[i].name);
551551
dst->argtypes[n] = TEXTOID;
552-
appendStringInfo(cond, ",$%d", n+1)
552+
appendStringInfo(cond, ",$%d", n+1);
553553
}
554554
}
555555
appendStringInfo(cond, ")");
@@ -563,7 +563,7 @@ int getNodesToSelect(sheduler_nodes_data_t *dst, char *nodename)
563563
#endif
564564
}
565565

566-
void destroyNodesToSelect(sheduler_nodes_data_t *nodes)
566+
void destroyNodesToSelect(scheduler_nodes_data_t *nodes)
567567
{
568568
pfree(nodes->values);
569569
pfree(nodes->argtypes);

src/pgpro_scheduler.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ typedef struct {
3434
Datum *values;
3535
Oid *argtypes;
3636
char *cond;
37-
} sheduler_nodes_data_t;
37+
} scheduler_nodes_data_t;
3838

3939
extern void worker_spi_sighup(SIGNAL_ARGS);
4040
extern void worker_spi_sigterm(SIGNAL_ARGS);
@@ -63,7 +63,7 @@ char *get_scheduler_nodename(MemoryContext mem);
6363
char_array_t *_split_string_to_char_array(char *str, bool doclean);
6464
Oid get_scheduler_schema_oid(void);
6565

66-
int getNodesToSelect(sheduler_nodes_data_t *nodes, char *nodename);
67-
void destroyNodesToSelect(sheduler_nodes_data_t *nodes);
66+
int getNodesToSelect(scheduler_nodes_data_t *nodes, char *nodename);
67+
void destroyNodesToSelect(scheduler_nodes_data_t *nodes);
6868

6969
#endif

src/scheduler_executor.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "pgpro_scheduler.h"
3636
#include "scheduler_executor.h"
3737
#include "scheduler_job.h"
38+
#include "scheduler_mtm.h"
3839
#include "scheduler_spi_utils.h"
3940
#include "memutils.h"
4041
#include "utils/elog.h"
@@ -711,6 +712,10 @@ void at_executor_worker_main(Datum arg)
711712
BackgroundWorkerInitializeConnection(shared->database, NULL);
712713
SetConfigOption("application_name", "pgp-s at executor", PGC_USERSET, PGC_S_SESSION);
713714
pgstat_report_activity(STATE_RUNNING, "initialize");
715+
#ifdef _WITH_MTM
716+
/* need to connect to mtm nodes status struct */
717+
init_scheduler_mtm_nodes();
718+
#endif
714719

715720
START_SPI_SNAP();
716721
reloid = RangeVarGetRelid(makeRangeVarFromNameList(

src/scheduler_job.c

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "scheduler_job.h"
88
#include "lib/stringinfo.h"
99
#include "scheduler_spi_utils.h"
10+
#include "scheduler_mtm.h"
1011
#include "utils/timestamp.h"
1112
#include "utils/builtins.h"
1213
#include "memutils.h"
@@ -170,19 +171,35 @@ job_t *get_at_job_for_process(MemoryContext mem, char *nodename, char **error)
170171
bool nulls[17];
171172
int i;
172173
char *oldpath;
173-
174-
/* const char *get_job_sql = "select * from at_jobs_submitted s where ((not exists ( select * from at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED"; */
175-
const char *get_job_sql = "select * from at_jobs_submitted s where ((not exists ( select * from at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL AND not exists ( select * from at_jobs_done d where d.id = any(s.depends_on) and d.status=false)) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED ";
174+
scheduler_nodes_data_t nodes;
175+
StringInfo get_job_sql;
176176
const char *insert_sql = "insert into at_jobs_process values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)";
177177
spi_response_t *r;
178178
spi_response_t *r2;
179179

180+
get_job_sql = makeStringInfo();
181+
getNodesToSelect(&nodes, nodename);
182+
183+
appendStringInfo(get_job_sql, "select * from at_jobs_submitted s where \
184+
((not exists ( select * from at_jobs_submitted s2 where \
185+
s2.id = any(s.depends_on)) AND not exists \
186+
( select * from at_jobs_process p where p.id = any(s.depends_on)) AND \
187+
s.depends_on is NOT NULL and s.at IS NULL AND not exists \
188+
( select * from at_jobs_done d where d.id = any(s.depends_on) and \
189+
d.status=false)) OR ( s.at IS NOT NULL AND at <= 'now' and \
190+
(last_start_available is NULL OR last_start_available > 'now'))) and \
191+
node %s and not canceled order by at, submit_time \
192+
limit 1 FOR UPDATE SKIP LOCKED ", nodes.cond);
193+
180194
oldpath = set_schema(NULL, true);
181195
*error = NULL;
182-
values[0] = CStringGetTextDatum(nodename);
183196

184-
r = execute_spi_sql_with_args(mem, get_job_sql, 1,
185-
argtypes, values, NULL);
197+
r = execute_spi_sql_with_args(mem, get_job_sql->data, nodes.n,
198+
nodes.argtypes, nodes.values, NULL);
199+
pfree(get_job_sql->data);
200+
pfree(get_job_sql);
201+
destroyNodesToSelect(&nodes);
202+
186203
if(r->retval != SPI_OK_SELECT)
187204
{
188205
set_schema(oldpath, false);
@@ -204,6 +221,7 @@ job_t *get_at_job_for_process(MemoryContext mem, char *nodename, char **error)
204221

205222
job->cron_id = scheduler_atjob_id_OID == INT8OID ?
206223
get_int64_from_spi(r, 0, 1, 0): get_int_from_spi(r, 0, 1, 0);
224+
job->node = get_text_from_spi(mem, r, 0, 2);
207225
job->start_at = get_timestamp_from_spi(r, 0, 5, 0);
208226
job->dosql[0] = get_text_from_spi(mem, r, 0, 6);
209227
job->sql_params = get_textarray_from_spi(mem, r, 0, 7, &job->sql_params_n);
@@ -348,18 +366,18 @@ job_t *_cron_get_jobs_to_do(MemoryContext mem, char *nodename, int *n, int *is_e
348366
{
349367
job_t *jobs = NULL;
350368
int ret, got, i;
351-
stringInfo get_job_sql;
352-
sheduler_nodes_data_t nodes;
369+
StringInfo get_job_sql;
370+
scheduler_nodes_data_t nodes;
353371

354372
get_job_sql = makeStringInfo();
355-
appendStringInfo(&get_job_sql,
373+
appendStringInfo(get_job_sql,
356374
"select at.start_at, at.last_start_available, at.cron, max_run_time, \
357375
cron.max_instances, cron.executor, cron.next_time_statement, \
358376
cron.node from at at, cron cron where start_at <= 'now' and \
359377
not at.active and (last_start_available is NULL OR \
360378
last_start_available > 'now') and at.cron = cron.id AND cron.node ");
361379
getNodesToSelect(&nodes, nodename);
362-
appendStringInfo(&get_job_sql, " %s order by at.start_at limit %d", nodes.cond, limit);
380+
appendStringInfo(get_job_sql, " %s order by at.start_at limit %d", nodes.cond, limit);
363381

364382

365383
*is_error = *n = 0;
@@ -390,6 +408,8 @@ job_t *_cron_get_jobs_to_do(MemoryContext mem, char *nodename, int *n, int *is_e
390408
{
391409
*is_error = 1;
392410
}
411+
pfree(get_job_sql->data);
412+
pfree(get_job_sql);
393413
destroyNodesToSelect(&nodes);
394414
STOP_SPI_SNAP();
395415
return jobs;
@@ -400,12 +420,16 @@ job_t *get_expired_at_jobs(char *nodename, int *n, int *is_error)
400420
StringInfoData sql;
401421
job_t *jobs = NULL;
402422
int ret, got, i;
423+
scheduler_nodes_data_t nodes;
403424

404425
*n = *is_error = 0;
426+
getNodesToSelect(&nodes, nodename);
405427
initStringInfo(&sql);
406-
appendStringInfo(&sql, "select at, last_start_available, id from ONLY at_jobs_submitted where last_start_available < 'now' and node = '%s'", nodename);
407-
ret = SPI_execute(sql.data, true, 0);
428+
appendStringInfo(&sql, "select at, last_start_available, id, node from ONLY at_jobs_submitted where last_start_available < 'now' and node %s", nodes.cond);
429+
430+
ret = SPI_execute_with_args(sql.data, nodes.n, nodes.argtypes, nodes.values, NULL, true, 0);
408431
pfree(sql.data);
432+
destroyNodesToSelect(&nodes);
409433

410434
if(ret == SPI_OK_SELECT)
411435
{
@@ -421,7 +445,7 @@ job_t *get_expired_at_jobs(char *nodename, int *n, int *is_error)
421445
jobs[i].last_start_avail = get_timestamp_from_spi(NULL, i, 2, 0);
422446
jobs[i].cron_id = scheduler_atjob_id_OID == INT8OID ?
423447
get_int64_from_spi(NULL, i, 3, 0): get_int_from_spi(NULL, i, 3, 0);
424-
jobs[i].node = my_copy_string(nodename);
448+
jobs[i].node = get_text_from_spi(CurrentMemoryContext, NULL, i, 4);
425449
}
426450
}
427451
}
@@ -437,12 +461,17 @@ job_t *get_expired_cron_jobs(char *nodename, int *n, int *is_error)
437461
StringInfoData sql;
438462
job_t *jobs = NULL;
439463
int ret, got, i;
464+
scheduler_nodes_data_t nodes;
440465

441466
*n = *is_error = 0;
442467
initStringInfo(&sql);
443-
appendStringInfo(&sql, "select start_at, last_start_available, cron, started, active from at where last_start_available < 'now' and not active and node = '%s'", nodename);
444-
ret = SPI_execute(sql.data, true, 0);
468+
appendStringInfo(&sql, "select start_at, last_start_available, cron, started, active, node from at where last_start_available < 'now' and not active and node ");
469+
getNodesToSelect(&nodes, nodename);
470+
appendStringInfoString(&sql, nodes.cond);
471+
472+
ret = SPI_execute_with_args(sql.data, nodes.n, nodes.argtypes, nodes.values, NULL, true, 0);
445473
pfree(sql.data);
474+
destroyNodesToSelect(&nodes);
446475

447476
if(ret == SPI_OK_SELECT)
448477
{
@@ -457,7 +486,7 @@ job_t *get_expired_cron_jobs(char *nodename, int *n, int *is_error)
457486
jobs[i].start_at = get_timestamp_from_spi(NULL, i, 1, 0);
458487
jobs[i].last_start_avail = get_timestamp_from_spi(NULL, i, 2, 0);
459488
jobs[i].cron_id = get_int_from_spi(NULL, i, 3, 0);
460-
jobs[i].node = my_copy_string(nodename);
489+
jobs[i].node = get_text_from_spi(CurrentMemoryContext,NULL, i, 6);
461490
}
462491
}
463492
}

0 commit comments

Comments
 (0)