Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 079fc46

Browse files
author
Vladimir Ershov
committed
submit job fix
1 parent 9b5cf96 commit 079fc46

File tree

9 files changed

+115
-25
lines changed

9 files changed

+115
-25
lines changed

src/pgpro_scheduler.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -848,10 +848,15 @@ nodename(PG_FUNCTION_ARGS)
848848
text *text_p;
849849
int len;
850850
char *nname;
851+
bool drop_ctx = false;
851852

852-
init_worker_mem_ctx("get_nodename call");
853+
if(!is_worker_context_initialized())
854+
{
855+
init_worker_mem_ctx("get_nodename call");
856+
drop_ctx = true;
857+
}
853858
nname = get_scheduler_nodename(CurrentMemoryContext);
854-
drop_worker_context();
859+
if(drop_ctx) drop_worker_context();
855860

856861
len = strlen(nname);
857862
text_p = (text *) palloc(sizeof(char)*len + VARHDRSZ);

src/scheduler_executor.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
797797
scheduler_atjob_id_OID = get_atttype(subm_rel_oid, 1);
798798

799799

800+
800801
job = get_at_job_for_process(mem, shared->nodename, &error);
801802
if(!job)
802803
{

src/scheduler_job.c

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit, char **set_error)
568568
char *this_error = NULL;
569569
Datum values[21];
570570
char nulls[21];
571-
Oid argtypes[21] = { scheduler_atjob_id_OID };
571+
Oid argtypes[21];
572572
bool canceled = false;
573573
int i;
574574
char *oldpath;
@@ -577,6 +577,7 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit, char **set_error)
577577
int n = 1;
578578
spi_response_t *r;
579579
spi_response_t *r2;
580+
spi_response_t *r3;
580581

581582
const char *get_sql = "select * from at_jobs_process where id = $1";
582583
const char *insert_sql = "insert into at_jobs_done values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)";
@@ -585,6 +586,7 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit, char **set_error)
585586

586587
oldpath = set_schema(NULL, true);
587588

589+
argtypes[0] = scheduler_atjob_id_OID;
588590
values[0] = scheduler_atjob_id_OID == INT8OID ?
589591
Int64GetDatum(job->cron_id): Int32GetDatum(job->cron_id);
590592

@@ -678,20 +680,22 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit, char **set_error)
678680
destroy_spi_data(r2);
679681
return -1;
680682
}
681-
destroy_spi_data(r);
682-
destroy_spi_data(r2);
683683

684-
r = execute_spi_sql_with_args(CurrentMemoryContext, delete_sql, 1, argtypes, values, NULL);
684+
r3 = execute_spi_sql_with_args(CurrentMemoryContext, delete_sql, 1, argtypes, values, NULL);
685685
set_schema(oldpath, false);
686686
pfree(oldpath);
687687

688-
if(r->retval != SPI_OK_DELETE)
688+
if(r3->retval != SPI_OK_DELETE)
689689
{
690-
*set_error = _mcopy_string(CurrentMemoryContext, r->error);
690+
*set_error = _mcopy_string(CurrentMemoryContext, r3->error);
691691
destroy_spi_data(r);
692+
destroy_spi_data(r2);
693+
destroy_spi_data(r3);
692694
return -1;
693695
}
694696
destroy_spi_data(r);
697+
destroy_spi_data(r2);
698+
destroy_spi_data(r3);
695699

696700
return 1;
697701
}

src/scheduler_mtm.c

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "catalog/namespace.h"
2727
#include "catalog/pg_proc.h"
2828
#include "utils/builtins.h"
29+
#include "dynloader.h"
2930

3031
#include "char_array.h"
3132
#include "pgpro_scheduler.h"
@@ -48,6 +49,14 @@ extern volatile sig_atomic_t got_sigterm;
4849

4950
sched_mtm_node_list_t *MTM_nodes = NULL;
5051

52+
void (*fptr_MtmToggleDML)(void) = NULL;
53+
MtmNodeStatus (*fptr_MtmGetCurrentStatus)(void) = NULL;
54+
nodemask_t (*fptr_MtmGetDisabledNodeMask)(void) = NULL;
55+
56+
MtmNodeStatus call_MtmGetCurrentStatus(void);
57+
nodemask_t call_MtmGetDisabledNodeMask(void);
58+
void call_MtmToggleDML(void);
59+
5160
/* Returns Oid of "mtm_nodes" table
5261
* and stores it in a global var. Subsequent calls will only return
5362
* stored value
@@ -142,6 +151,59 @@ set_state(PG_FUNCTION_ARGS)
142151
PG_RETURN_BOOL(state);
143152
}
144153

154+
nodemask_t call_MtmGetDisabledNodeMask(void)
155+
{
156+
void *ptr;
157+
158+
if(fptr_MtmGetDisabledNodeMask) (*fptr_MtmGetDisabledNodeMask)();
159+
ptr = pg_dlsym(RTLD_DEFAULT, "MtmGetDisabledNodeMask");
160+
if(ptr)
161+
{
162+
fptr_MtmGetDisabledNodeMask = (nodemask_t (*)(void))ptr;
163+
return (*fptr_MtmGetDisabledNodeMask)();
164+
}
165+
else
166+
{
167+
elog(ERROR, "could not find object MtmGetDisabledNodeMask in multimaster object");
168+
}
169+
}
170+
171+
MtmNodeStatus call_MtmGetCurrentStatus(void)
172+
{
173+
void *ptr;
174+
175+
if(fptr_MtmGetCurrentStatus) (*fptr_MtmGetCurrentStatus)();
176+
ptr = pg_dlsym(RTLD_DEFAULT, "MtmGetCurrentStatus");
177+
if(ptr)
178+
{
179+
fptr_MtmGetCurrentStatus = (MtmNodeStatus (*)(void))ptr;
180+
return (*fptr_MtmGetCurrentStatus)();
181+
}
182+
else
183+
{
184+
elog(ERROR, "could not find object MtmGetCurrentStatus in multimaster object");
185+
}
186+
}
187+
188+
/* Function for find and call MtmToggleDML from multimaster.so */
189+
190+
void call_MtmToggleDML(void)
191+
{
192+
void *ptr;
193+
194+
if(fptr_MtmToggleDML) (*fptr_MtmToggleDML)();
195+
ptr = pg_dlsym(RTLD_DEFAULT, "MtmToggleDML");
196+
if(ptr)
197+
{
198+
fptr_MtmToggleDML = (void (*)(void))ptr;
199+
(*fptr_MtmToggleDML)();
200+
}
201+
else
202+
{
203+
elog(ERROR, "could not find object MtmToggleDML in multimaster object");
204+
}
205+
}
206+
145207
/*
146208
*-------------------------------------------------------
147209
* HERE we set status of the MTM node in special table schedule.mtm_nodes
@@ -230,7 +292,7 @@ void set_mtm_node_status(bool status)
230292
index_endscan(indexScan);
231293
index_close(nodes_idx, RowExclusiveLock);
232294
heap_close(nodes_heap, RowExclusiveLock);
233-
MtmToggleDML();
295+
call_MtmToggleDML();
234296
if(use_transaction) CommitTransactionCommand();
235297
}
236298

@@ -354,7 +416,7 @@ void mtm_scheduler_StartTransactionCommand(void)
354416

355417
if(is_under_mtm())
356418
{
357-
if(MtmGetCurrentStatus() != MTM_ONLINE)
419+
if(call_MtmGetCurrentStatus() != MTM_ONLINE)
358420
{
359421
/* TODO try to change ps string */
360422
while(!got_sigterm)
@@ -369,7 +431,7 @@ void mtm_scheduler_StartTransactionCommand(void)
369431
#endif
370432
if(rc & WL_POSTMASTER_DEATH) proc_exit(1);
371433
ResetLatch(MyLatch);
372-
if(MtmGetCurrentStatus() == MTM_ONLINE) break;
434+
if(call_MtmGetCurrentStatus() == MTM_ONLINE) break;
373435
}
374436
if(got_sigterm)
375437
{
@@ -433,7 +495,7 @@ void check_mtm_nodes_status(void)
433495
if(MTM_nodes == NULL) return;
434496

435497
mtm_scheduler_StartTransactionCommand();
436-
mtm_disabled = MtmGetDisabledNodeMask();
498+
mtm_disabled = call_MtmGetDisabledNodeMask();
437499

438500
LWLockAcquire((LWLockId)&MTM_nodes->locks[0], LW_EXCLUSIVE);
439501

@@ -600,7 +662,7 @@ bool is_mtm_node_available(int node_id)
600662
{
601663
nodemask_t mtm_disabled;
602664

603-
mtm_disabled = MtmGetDisabledNodeMask();
665+
mtm_disabled = call_MtmGetDisabledNodeMask();
604666
return !BIT_CHECK(mtm_disabled, node_id-1);
605667
}
606668

src/scheduler_mtm.o

-53.8 KB
Binary file not shown.

src/scheduler_spi_utils.c

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,29 @@ spi_response_t *__copy_spi_data(MemoryContext ctx, int ret, int n)
121121
r->n_rows = 0;
122122
r->n_attrs = 0;
123123

124+
MemoryContextSwitchTo(old);
124125
return r;
125126
}
126127
r->n_rows = n;
127128
r->n_attrs = SPI_tuptable->tupdesc->natts;
128129
r->types = MemoryContextAlloc(ctx, sizeof(Oid) * r->n_attrs);
129-
if(!r->types) return NULL;
130+
if(!r->types)
131+
{
132+
MemoryContextSwitchTo(old);
133+
return NULL;
134+
}
130135
r->rows = MemoryContextAlloc(ctx, sizeof(spi_val_t *) * n);
131-
if(!r->rows) return NULL;
136+
if(!r->rows)
137+
{
138+
MemoryContextSwitchTo(old);
139+
return NULL;
140+
}
132141
r->ref = MemoryContextAlloc(ctx, sizeof(bool) * r->n_attrs);
133-
if(!r->ref) return NULL;
142+
if(!r->ref)
143+
{
144+
MemoryContextSwitchTo(old);
145+
return NULL;
146+
}
134147

135148
for(i=0; i < r->n_attrs; i++)
136149
{
@@ -141,7 +154,11 @@ spi_response_t *__copy_spi_data(MemoryContext ctx, int ret, int n)
141154
for(i=0; i < n; i++)
142155
{
143156
r->rows[i] = MemoryContextAlloc(ctx, sizeof(spi_val_t) * r->n_attrs);
144-
if(!(r->rows[i])) return NULL;
157+
if(!(r->rows[i]))
158+
{
159+
MemoryContextSwitchTo(old);
160+
return NULL;
161+
}
145162
for(j=0; j < r->n_attrs; j++)
146163
{
147164
dat = SPI_getbinval(SPI_tuptable->vals[i],

test/perl/runtest.pl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,11 @@
9898
"ALTER SYSTEM SET schedule.database = '$dbname'",
9999
"ALTER SYSTEM SET schedule.enabled = on",
100100
"SELECT pg_reload_conf();",
101-
"CREATE TABLE test_results( time_mark timestamp, commentary text )",
101+
"CREATE TABLE test_results(id serial primary key, time_mark timestamp, commentary text )",
102102
"DROP ROLE IF EXISTS tester",
103103
"CREATE ROLE tester",
104104
"GRANT INSERT ON test_results TO tester",
105-
"CREATE TABLE task_info (pid integer, name text, vanished timestamp, finished boolean default false)",
105+
"CREATE TABLE task_info (node text, pid integer, name text, vanished timestamp, finished boolean default false, primary key (node, pid))",
106106
"GRANT ALL ON task_info TO tester",
107107
);
108108
map { __do_sql($dbh, $_) } @sql2;
@@ -122,7 +122,7 @@
122122
);
123123
my $harness = TAP::Harness->new( \%args );
124124
my @tests = glob( 't/*.t' );
125-
#@tests = ('t/terminateBackend.t');
125+
## @tests = ('t/simpleSubmitJob.t', 't/submitJobDependsOn.t');
126126
$harness->runtests(@tests);
127127

128128

test/perl/t/resubmitJobWithLimit.t

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ $dbh->do($query);
1313
ok($dbh->err == 0) or (print $DBI::errstr . "\n" and $dbh->disconnect() and BAIL_OUT);
1414

1515
$query = "SELECT * from schedule.submit_job(\'INSERT INTO test_results
16-
(time_mark, commentary) VALUES(now(), ''resubmitJob''); SELECT * from schedule.resubmit();\',
16+
(time_mark, commentary) VALUES(now(), ''resubmitJob'');
17+
SELECT * from schedule.resubmit();\',
1718
resubmit_limit := 1);";
1819
my $sth = $dbh->prepare($query);
1920
ok($sth->execute()) or (print $DBI::errstr . "\n" and $dbh->disconnect() and BAIL_OUT);

test/perl/t/terminateBackend.t

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ sub one_task_do
4646
'name', '$name' ,
4747
'cron', '* * * * *',
4848
'commands', jsonb_build_array(
49-
'insert into task_info values ( pg_backend_pid(), ''$name'')',
49+
'insert into task_info values ( schedule.nodename(), pg_backend_pid(), ''$name'')',
5050
'$sql_part',
51-
'update task_info set finished = true where pid = pg_backend_pid()'
51+
'update task_info set finished = true where pid = pg_backend_pid() and node = schedule.nodename()'
5252
),
5353
'max_instances', 1$add_to_task
5454
)
@@ -69,7 +69,7 @@ sub one_task_do
6969
ok($sth->execute(), "terminate $name job") or BAIL_OUT($DBI::errstr);
7070
$sth->finish();
7171

72-
$sth = $dbh->prepare('UPDATE task_info SET vanished = now() where pid = ?');
72+
$sth = $dbh->prepare('UPDATE task_info SET vanished = now() where pid = ? and node = schedule.nodename()');
7373
ok($sth->execute($pid), "set $name task vanished") or BAIL_OUT(print $DBI::errstr);
7474
$sth->finish;
7575

@@ -87,7 +87,7 @@ sub wait_for_task_to_begin
8787
my $how_long = shift;
8888

8989
my $iter = $how_long;
90-
my $sth1 = $db->prepare('SELECT pid from task_info where name = ? and vanished is null and finished = false limit 1');
90+
my $sth1 = $db->prepare('SELECT pid from task_info where name = ? and vanished is null and finished = false and node = schedule.nodename() limit 1');
9191

9292
while($iter-- > 0)
9393
{

0 commit comments

Comments
 (0)