Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a72e2db

Browse files
author
Vladimir Ershov
committed
util functions but some mem alloc problems near ALTER SYSTEM
1 parent 079fc46 commit a72e2db

12 files changed

+300
-22
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ Install extension as follows:
2828
$ sudo make USE_PGXS=1 install
2929
$ psql <DBNAME> -c "CREATE EXTENSION pgpro_scheduler"
3030

31+
Install extension to use it with multimaster(mmts) extension:
32+
33+
$ cd pgpro_scheduler
34+
$ make USE_PGXS=1 WITH_MTM=<path to mmts extension>
35+
$ sudo make USE_PGXS=1 install
36+
$ psql <DBNAME> -c "CREATE EXTENSION pgpro_scheduler"
37+
3138
## Configuration
3239

3340
The extension defines a number of PostgreSQL variables (GUC). This variables

pgpro_scheduler--2.2.sql

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,6 +1450,26 @@ CREATE FUNCTION nodename()
14501450
AS 'MODULE_PATHNAME', 'nodename'
14511451
LANGUAGE C IMMUTABLE;
14521452

1453+
CREATE FUNCTION enable()
1454+
RETURNS boolean
1455+
AS 'MODULE_PATHNAME', 'scheduler_enable'
1456+
LANGUAGE C IMMUTABLE;
1457+
1458+
CREATE FUNCTION disable()
1459+
RETURNS boolean
1460+
AS 'MODULE_PATHNAME', 'scheduler_disable'
1461+
LANGUAGE C IMMUTABLE;
1462+
1463+
CREATE FUNCTION start()
1464+
RETURNS boolean
1465+
AS 'MODULE_PATHNAME', 'scheduler_start'
1466+
LANGUAGE C IMMUTABLE;
1467+
1468+
CREATE FUNCTION stop()
1469+
RETURNS boolean
1470+
AS 'MODULE_PATHNAME', 'scheduler_stop'
1471+
LANGUAGE C IMMUTABLE;
1472+
14531473
--------------
14541474
-- TRIGGERS --
14551475
--------------

src/char_array.c

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,57 @@ char_array_t *pushCharArray(char_array_t *a, const char *str)
4343
return a;
4444
}
4545

46+
char *joinCharArray(char_array_t *a, char *clue)
47+
{
48+
int i;
49+
char *ret;
50+
char *ptr;
51+
int realn = 0;
52+
size_t size = 0;
53+
int clue_len;
54+
55+
if(a->n > 0)
56+
{
57+
for(i=0; i < a->n; i++)
58+
{
59+
if(a->data[i])
60+
{
61+
realn++;
62+
size += strlen(a->data[i]);
63+
}
64+
}
65+
}
66+
if(realn == 0)
67+
{
68+
ret = (char *)palloc(sizeof(char) * 1);
69+
if(!ret) elog(ERROR, "chararray: cannot alloc memory (join)");
70+
ret[0] = 0;
71+
return ret;
72+
}
73+
clue_len = strlen(clue);
74+
size += clue_len * realn + 1;
75+
ret = (char *)palloc(sizeof(char) * realn);
76+
if(!ret) elog(ERROR, "chararray: cannot alloc memory (join)");
77+
78+
ptr = ret;
79+
for(i=0; i < a->n; i++)
80+
{
81+
if(a->data[i])
82+
{
83+
memcpy(ptr, a->data[i], strlen(a->data[i]));
84+
ptr += strlen(a->data[i]);
85+
if(--realn > 0)
86+
{
87+
memcpy(ptr, clue, clue_len);
88+
ptr += clue_len;
89+
}
90+
}
91+
}
92+
ret[size-1] = 0;
93+
94+
return ret;
95+
}
96+
4697
void destroyCharArray(char_array_t *a)
4798
{
4899
int i;

src/char_array.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ int __sort_char_string(const void *a, const void *b);
1010
char_array_t *makeCharArray(void);
1111
char_array_t *pushCharArray(char_array_t *a, const char *str);
1212
char_array_t *sortCharArray(char_array_t *a);
13+
char *joinCharArray(char_array_t *a, char *clue);
1314
void destroyCharArray(char_array_t *data);
1415

1516

src/memutils.c

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "memutils.h"
44

55
MemoryContext SchedulerWorkerContext = NULL;
6+
bool worker_context_faked = false;
67

78
MemoryContext init_mem_ctx(const char *name)
89
{
@@ -17,6 +18,15 @@ bool is_worker_context_initialized(void)
1718
return SchedulerWorkerContext == NULL ? false: true;
1819
}
1920

21+
MemoryContext fake_worker_mem_ctx(MemoryContext ctx)
22+
{
23+
AssertState(SchedulerWorkerContext == NULL);
24+
SchedulerWorkerContext = ctx;
25+
worker_context_faked = true;
26+
27+
return SchedulerWorkerContext;
28+
}
29+
2030
MemoryContext init_worker_mem_ctx(const char *name)
2131
{
2232
AssertState(SchedulerWorkerContext == NULL);
@@ -54,20 +64,23 @@ void *worker_alloc(Size size)
5464

5565
void drop_worker_context(void)
5666
{
57-
if(SchedulerWorkerContext)
67+
if(SchedulerWorkerContext && !worker_context_faked)
5868
{
5969
MemoryContextDelete(SchedulerWorkerContext);
60-
SchedulerWorkerContext = NULL;
6170
}
71+
SchedulerWorkerContext = NULL;
72+
worker_context_faked = false;
73+
6274
}
6375

6476
void delete_worker_mem_ctx(MemoryContext old)
6577
{
6678
if(!old) old = TopMemoryContext;
6779

6880
MemoryContextSwitchTo(old);
69-
MemoryContextDelete(SchedulerWorkerContext);
81+
if(!worker_context_faked) MemoryContextDelete(SchedulerWorkerContext);
7082
SchedulerWorkerContext = NULL;
83+
worker_context_faked = false;
7184
}
7285

7386
char *_mcopy_string(MemoryContext ctx, char *str)

src/memutils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ extern MemoryContext SchedulerWorkerContext;
88

99
MemoryContext init_worker_mem_ctx(const char *name);
1010
MemoryContext init_mem_ctx(const char *name);
11+
MemoryContext fake_worker_mem_ctx(MemoryContext ctx);
1112
MemoryContext switch_to_worker_context(void);
1213
void *worker_alloc(Size size);
1314
void delete_worker_mem_ctx(MemoryContext toswitch);

src/pgpro_scheduler.c

Lines changed: 161 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#define ENABLE_LIST_COMPAT 1
2+
13
#include "postgres.h"
24

35
#include "miscadmin.h"
@@ -33,6 +35,7 @@
3335
#include "access/sysattr.h"
3436
#include "access/htup_details.h"
3537
#include "utils/fmgroids.h"
38+
#include "nodes/pg_list.h"
3639

3740

3841
#include "char_array.h"
@@ -479,17 +482,14 @@ char_array_t *readBasesToCheck(void)
479482
destroyCharArray(names);
480483
appendStringInfo(&sql, ")");
481484

482-
elog(LOG, "SCHEDULER: select from PG_DATABASES %s", sql.data);
483485
START_SPI_SNAP();
484-
elog(LOG, "SCHEDULER: %s", sql.data);
485486

486487
ret = SPI_execute(sql.data, true, 0);
487488
if (ret != SPI_OK_SELECT)
488489
{
489490
STOP_SPI_SNAP();
490491
elog(ERROR, "cannot select from pg_database");
491492
}
492-
elog(LOG, "SCHEDULER: select from PG_DATABASES done");
493493
processed = SPI_processed;
494494
if(processed == 0)
495495
{
@@ -571,6 +571,41 @@ void destroyNodesToSelect(scheduler_nodes_data_t *nodes)
571571
nodes->n = -1;
572572
}
573573

574+
/* Alter system to set some valiable */
575+
576+
void alter_system(char *name, char *value)
577+
{
578+
AlterSystemStmt *stmt;
579+
A_Const *vv;
580+
581+
stmt = makeNode(AlterSystemStmt);
582+
583+
stmt->setstmt = makeNode(VariableSetStmt);
584+
stmt->setstmt->kind = VAR_SET_VALUE;
585+
stmt->setstmt->is_local = false;
586+
stmt->setstmt->name = name;
587+
588+
vv = makeNode(A_Const);
589+
vv->val.type = T_String;
590+
vv->val.val.str = value;
591+
vv->location = -1;
592+
stmt->setstmt->args = makeList1(vv);
593+
594+
AlterSystemSetConfigFile(stmt);
595+
}
596+
597+
/* Sends SIGHUP to postmaster to reload config files */
598+
599+
bool reload_config(void)
600+
{
601+
if(kill(PostmasterPid, SIGHUP))
602+
{
603+
ereport(WARNING, (errmsg("failed to send signal to postmaster: %m")));
604+
return false;
605+
}
606+
return true;
607+
}
608+
574609
void parent_scheduler_main(Datum arg)
575610
{
576611
int rc = 0, i;
@@ -589,19 +624,14 @@ void parent_scheduler_main(Datum arg)
589624
pqsignal(SIGHUP, worker_spi_sighup);
590625
pqsignal(SIGTERM, worker_spi_sigterm);
591626
BackgroundWorkerUnblockSignals();
592-
elog(LOG, "SCHEDULER: going to make connection");
593627

594628
BackgroundWorkerInitializeConnection("postgres", NULL);
595-
elog(LOG, "SCHEDULER: connected");
596629
names = readBasesToCheck();
597-
elog(LOG, "SCHEDULER: init pool");
598630
pool = initSchedulerManagerPool(names);
599-
elog(LOG, "SCHEDULER: init pool initialized");
600631

601632
destroyCharArray(names);
602633

603634
set_supervisor_pgstatus(pool);
604-
elog(LOG, "SCHEDULER: start main cycle");
605635

606636
while(!got_sigterm)
607637
{
@@ -907,4 +937,127 @@ cron_string_to_json_text(PG_FUNCTION_ARGS)
907937
PG_RETURN_NULL();
908938
}
909939

940+
PG_FUNCTION_INFO_V1(scheduler_enable);
941+
Datum
942+
scheduler_enable(PG_FUNCTION_ARGS)
943+
{
944+
if(!superuser())
945+
{
946+
elog(ERROR, "Only superuser allowed to enable pgpro_scheduler");
947+
}
948+
alter_system("schedule.enabled", "on");
949+
950+
PG_RETURN_BOOL(reload_config());
951+
}
952+
953+
PG_FUNCTION_INFO_V1(scheduler_disable);
954+
Datum
955+
scheduler_disable(PG_FUNCTION_ARGS)
956+
{
957+
if(!superuser())
958+
{
959+
elog(ERROR, "Only superuser allowed to disable pgpro_scheduler");
960+
}
961+
alter_system("schedule.enabled", "off");
962+
963+
PG_RETURN_BOOL(reload_config());
964+
}
965+
966+
PG_FUNCTION_INFO_V1(scheduler_start);
967+
Datum
968+
scheduler_start(PG_FUNCTION_ARGS)
969+
{
970+
char *dbname;
971+
char *current;
972+
char *new;
973+
char_array_t *in_use;
974+
int i;
910975

976+
if(!superuser())
977+
{
978+
elog(ERROR, "Only superuser allowed to start pgpro_scheduler");
979+
}
980+
init_worker_mem_ctx("scheduler start");
981+
982+
dbname = get_database_name(MyDatabaseId);
983+
current = (char *)GetConfigOption("schedule.database", false, true);
984+
in_use = _split_string_to_char_array(current, true);
985+
if(in_use == NULL || in_use->n == 0)
986+
{
987+
alter_system("schedule.database", dbname);
988+
}
989+
else
990+
{
991+
for(i=0; i < in_use->n; i++)
992+
{
993+
if(strcmp(dbname, in_use->data[i]) == 0)
994+
{
995+
elog(WARNING, "Database %s already has pgpro_scheduler running", dbname);
996+
drop_worker_context();
997+
PG_RETURN_BOOL(false);
998+
999+
}
1000+
}
1001+
}
1002+
if(!in_use) in_use = makeCharArray();
1003+
pushCharArray(in_use, dbname);
1004+
new = joinCharArray(in_use, ", ");
1005+
1006+
elog(NOTICE, "set: [%d] %s", (int)strlen(new), new);
1007+
alter_system("schedule.database", new);
1008+
1009+
drop_worker_context();
1010+
PG_RETURN_BOOL(reload_config());
1011+
}
1012+
1013+
PG_FUNCTION_INFO_V1(scheduler_stop);
1014+
Datum
1015+
scheduler_stop(PG_FUNCTION_ARGS)
1016+
{
1017+
char *dbname;
1018+
char *current;
1019+
char *new;
1020+
char_array_t *in_use;
1021+
int i;
1022+
bool found = false;
1023+
1024+
if(!superuser())
1025+
{
1026+
elog(ERROR, "Only superuser allowed to stop pgpro_scheduler");
1027+
}
1028+
init_worker_mem_ctx("scheduler stop");
1029+
1030+
dbname = get_database_name(MyDatabaseId);
1031+
current = (char *)GetConfigOption("schedule.database", false, true);
1032+
in_use = _split_string_to_char_array(current, true);
1033+
if(in_use == NULL || in_use->n == 0)
1034+
{
1035+
elog(WARNING, "Database %s is not running pgpro_scheduler", dbname);
1036+
drop_worker_context();
1037+
PG_RETURN_BOOL(false);
1038+
}
1039+
else
1040+
{
1041+
for(i=0; i < in_use->n; i++)
1042+
{
1043+
if(strcmp(dbname, in_use->data[i]) == 0)
1044+
{
1045+
pfree(in_use->data[i]);
1046+
in_use->data[i] = NULL;
1047+
found = true;
1048+
}
1049+
}
1050+
}
1051+
if(found)
1052+
{
1053+
new = joinCharArray(in_use, ", ");
1054+
elog(NOTICE, "set: [%d] %s", (int)strlen(new), new);
1055+
alter_system("schedule.database", new);
1056+
drop_worker_context();
1057+
PG_RETURN_BOOL(reload_config());
1058+
}
1059+
1060+
elog(WARNING, "Database %s is not running pgpro_scheduler", dbname);
1061+
drop_worker_context();
1062+
PG_RETURN_BOOL(false);
1063+
}

0 commit comments

Comments
 (0)