Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9b89a12

Browse files
committed
Continue work on launcher
1 parent 10668a7 commit 9b89a12

File tree

5 files changed

+188
-83
lines changed

5 files changed

+188
-83
lines changed

jsonbd.c

Lines changed: 118 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,47 @@ jsonbd_shmem_size(void)
7474
shm_toc_initialize_estimator(&e);
7575

7676
shm_toc_estimate_chunk(&e, sizeof(jsonbd_shm_hdr));
77-
for (i = 0; i < jsonbd_nworkers; i++)
77+
shm_toc_estimate_chunk(&e, PGSemaphoreShmemSize(1));
78+
79+
for (i = 0; i < MAX_JSONBD_WORKERS; i++)
7880
{
7981
shm_toc_estimate_chunk(&e, sizeof(jsonbd_shm_worker));
8082
shm_toc_estimate_chunk(&e, jsonbd_get_queue_size());
8183
shm_toc_estimate_chunk(&e, jsonbd_get_queue_size());
8284
}
83-
shm_toc_estimate_keys(&e, jsonbd_nworkers * 3 + 1);
85+
86+
/* 3 keys each worker + 3 for header (header itself and two queues) */
87+
shm_toc_estimate_keys(&e, MAX_JSONBD_WORKERS * 3 + 3);
8488
size = shm_toc_estimate(&e);
8589
return size;
8690
}
8791

92+
static void
93+
jsonbd_init_worker(shm_toc *toc, jsonbd_shm_worker *wd, int worker_num,
94+
size_t queue_size)
95+
{
96+
static int mqkey = MAX_JSONBD_WORKERS + 1;
97+
98+
/* each worker will have two mq, for input and output */
99+
wd->mqin = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
100+
wd->mqout = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
101+
102+
/* init worker context */
103+
pg_atomic_init_flag(&wd->busy);
104+
wd->proc = NULL;
105+
106+
shm_mq_clean_receiver(wd->mqin);
107+
shm_mq_clean_receiver(wd->mqout);
108+
shm_mq_clean_sender(wd->mqin);
109+
shm_mq_clean_sender(wd->mqout);
110+
111+
if (worker_num)
112+
shm_toc_insert(toc, i + 1, wd);
113+
114+
shm_toc_insert(toc, mqkey++, wd->mqin);
115+
shm_toc_insert(toc, mqkey++, wd->mqout);
116+
}
117+
88118
static void
89119
jsonbd_shmem_startup_hook(void)
90120
{
@@ -102,40 +132,29 @@ jsonbd_shmem_startup_hook(void)
102132
if (!found)
103133
{
104134
int i;
105-
int mqkey;
135+
jsonbd_shm_worker *wd;
136+
size_t queue_size = jsonbd_get_queue_size();
106137

107-
toc = shm_toc_create(JSONBC_SHM_MQ_MAGIC, workers_data, size);
138+
toc = shm_toc_create(JSONBD_SHM_MQ_MAGIC, workers_data, size);
139+
140+
/* initialize header */
108141
hdr = shm_toc_allocate(toc, sizeof(jsonbd_shm_hdr));
109142
hdr->workers_ready = 0;
110-
sem_init(&hdr->workers_sem, 1, jsonbd_nworkers);
111-
shm_toc_insert(toc, 0, hdr);
112-
mqkey = jsonbd_nworkers + 1;
143+
hdr->launcher_sem = PGSemaphoreCreate();
144+
jsonbd_init_worker(toc, &hdr->launcher, sizeof(Oid));
113145

114-
for (i = 0; i < jsonbd_nworkers; i++)
115-
{
116-
size_t queue_size = jsonbd_get_queue_size();
117-
118-
jsonbd_shm_worker *wd = shm_toc_allocate(toc, sizeof(jsonbd_shm_worker));
119-
120-
/* each worker will have two mq, for input and output */
121-
wd->mqin = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
122-
wd->mqout = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
146+
for (i = 0; i < MAX_DATABASES; i++)
147+
sem_init(&hdr->workers_sem[i], 1, jsonbd_nworkers);
123148

124-
/* init worker context */
125-
pg_atomic_init_flag(&wd->busy);
126-
wd->proc = NULL;
127-
128-
shm_mq_clean_receiver(wd->mqin);
129-
shm_mq_clean_receiver(wd->mqout);
130-
shm_mq_clean_sender(wd->mqin);
131-
shm_mq_clean_sender(wd->mqout);
149+
shm_toc_insert(toc, 0, hdr);
132150

133-
shm_toc_insert(toc, i + 1, wd);
134-
shm_toc_insert(toc, mqkey++, wd->mqin);
135-
shm_toc_insert(toc, mqkey++, wd->mqout);
151+
for (i = 0; i < MAX_JSONBD_WORKERS; i++)
152+
{
153+
wd = shm_toc_allocate(toc, sizeof(jsonbd_shm_worker));
154+
jsonbd_init_worker(toc, wd, i + 1, queue_size);
136155
}
137156
}
138-
else toc = shm_toc_attach(JSONBC_SHM_MQ_MAGIC, workers_data);
157+
else toc = shm_toc_attach(JSONBD_SHM_MQ_MAGIC, workers_data);
139158

140159
LWLockRelease(AddinShmemInitLock);
141160
}
@@ -159,8 +178,7 @@ _PG_init(void)
159178
{
160179
int i;
161180
RequestAddinShmemSpace(jsonbd_shmem_size());
162-
for (i = 0; i < jsonbd_nworkers; i++)
163-
jsonbd_register_worker(i);
181+
jsonbd_register_worker_launcher();
164182
}
165183
else elog(LOG, "jsonbd: workers are disabled");
166184
}
@@ -174,7 +192,7 @@ setup_guc_variables(void)
174192
&jsonbd_nworkers,
175193
1, /* default */
176194
0, /* if zero then no workers */
177-
MAX_JSONBC_WORKERS,
195+
MAX_JSONBD_WORKERS,
178196
PGC_SUSET,
179197
0,
180198
NULL,
@@ -256,37 +274,90 @@ static void
256274
jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
257275
bool (*callback)(char *, size_t, void *), void *callback_arg)
258276
{
259-
int i;
277+
int i,
278+
j;
260279
bool detached = false;
261280
bool callback_succeded = false;
262281
shm_mq_result resmq;
263282
shm_mq_handle *mqh;
264283
jsonbd_shm_hdr *hdr;
265-
jsonbd_shm_worker *wd;
284+
jsonbd_shm_worker *wd,
285+
*wd_inner;
266286

267287
char *res;
268288
Size reslen;
289+
sem_t *cursem = NULL;
269290

270291
if (jsonbd_nworkers <= 0)
271292
/* TODO: maybe add support of multiple databases for dictionaries */
272293
elog(ERROR, "jsonbd workers are not available");
273294

274295
hdr = shm_toc_lookup(toc, 0, false);
275-
sem_wait(&hdr->workers_sem);
276296

277-
for (i = 0; i < jsonbd_nworkers; i++)
297+
while (cursem == NULL)
278298
{
279-
wd = shm_toc_lookup(toc, i + 1, false);
280-
if (pg_atomic_test_set_flag(&wd->busy))
281-
break;
282-
}
299+
/* find some not busy worker */
300+
for (i = 0; i < hdr->workers_ready; i++)
301+
{
302+
wd = shm_toc_lookup(toc, i + 1, false);
303+
if (wd->dboid != MyDatabaseId)
304+
continue;
305+
306+
/*
307+
* we found first worker for our database, next 'jsonbd_nworkers'
308+
* workers should be ours, and one of them should be free after sem_wait
309+
*/
310+
cursem = wd->dbsem;
311+
sem_wait(cursem);
312+
for (j = i; j < (i + jsonbd_nworkers); j++)
313+
{
314+
wd = shm_toc_lookup(toc, j + 1, false);
315+
Assert(wd->dboid == MyDatabaseId);
316+
if (pg_atomic_test_set_flag(&wd->busy))
317+
goto comm;
318+
}
283319

284-
if (i == jsonbd_nworkers)
285-
{
286-
sem_post(&hdr->workers_sem);
287-
elog(ERROR, "jsonbd: could not make a connection with workers");
320+
/* should never reach here if all worked correctly */
321+
sem_post(cursem);
322+
elog(ERROR, "jsonbd: could not make a connection with workers");
323+
}
324+
325+
/*
326+
* there are no workers for our database,
327+
* so we should launch them using our jsonbd workers launcher
328+
*/
329+
PGSemaphoreLock(hdr->launcher_sem);
330+
shm_mq_set_sender(hdr->launcher.mqin, MyProc);
331+
shm_mq_set_receiver(hdr->launcher.mqout, MyProc);
332+
333+
mqh = shm_mq_attach(hdr->launcher.mqin, NULL, NULL);
334+
resmq = shm_mq_sendv(mqh,
335+
&((shm_mq_iovec) {(char *) &MyDatabaseId, sizeof(MyDatabaseId)}), 1, false);
336+
if (resmq != SHM_MQ_SUCCESS)
337+
detached = true;
338+
shm_mq_detach(mqh);
339+
340+
if (!detached)
341+
{
342+
mqh = shm_mq_attach(hdr->launcher.mqout, NULL, NULL);
343+
resmq = shm_mq_receive(mqh, &reslen, (void **) &res, false);
344+
if (resmq != SHM_MQ_SUCCESS)
345+
detached = true;
346+
347+
shm_mq_detach(mqh);
348+
}
349+
350+
shm_mq_clean_sender(hdr->launcher.mqin);
351+
shm_mq_clean_receiver(hdr->launcher.mqout);
352+
PGSemaphoreUnlock(hdr->launcher_sem);
353+
354+
if (detached)
355+
elog(ERROR, "jsonbd: workers launcher was detached");
288356
}
289357

358+
goto comm:
359+
detached = false;
360+
290361
/* send data */
291362
shm_mq_set_sender(wd->mqin, MyProc);
292363
shm_mq_set_receiver(wd->mqout, MyProc);
@@ -315,7 +386,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
315386
shm_mq_clean_sender(wd->mqin);
316387
shm_mq_clean_receiver(wd->mqout);
317388
pg_atomic_clear_flag(&wd->busy);
318-
sem_post(&hdr->workers_sem);
389+
sem_post(cursem);
319390

320391
if (detached)
321392
elog(ERROR, "jsonbd: worker has detached");
@@ -329,7 +400,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
329400
static void
330401
jsonbd_worker_get_key_ids(Oid cmoptoid, char *buf, int buflen, uint32 *idsbuf, int nkeys)
331402
{
332-
JsonbcCommand cmd = JSONBC_CMD_GET_IDS;
403+
JsonbcCommand cmd = JSONBD_CMD_GET_IDS;
333404
shm_mq_iovec iov[4];
334405
ids_callback_state state;
335406

@@ -354,7 +425,7 @@ jsonbd_worker_get_key_ids(Oid cmoptoid, char *buf, int buflen, uint32 *idsbuf, i
354425
static char *
355426
jsonbd_worker_get_keys(Oid cmoptoid, uint32 *ids, int nkeys, size_t *buflen)
356427
{
357-
JsonbcCommand cmd = JSONBC_CMD_GET_KEYS;
428+
JsonbcCommand cmd = JSONBD_CMD_GET_KEYS;
358429
shm_mq_iovec iov[4];
359430
keys_callback_state state;
360431

@@ -478,7 +549,7 @@ memory_reset_callback(void *arg)
478549
* The result is palloc'd.
479550
* It adds a space for header, that should be filled later.
480551
*/
481-
#ifdef PGPRO_JSONBC
552+
#ifdef PGPRO_JSONBD
482553
static char *
483554
packJsonbValue(JsonbValue *val, int header_size, int *len)
484555
{

jsonbd.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
#include "port/atomics.h"
99
#include "storage/proc.h"
1010
#include "storage/shm_mq.h"
11+
#include "storage/pg_sema.h"
1112

1213
#define JSONBD_SHM_MQ_MAGIC 0xAAAA
13-
#define MAX_JSONBD_WORKERS 10
14-
#define MAX_DATABASES 10
14+
15+
#define MAX_JSONBD_WORKERS_PER_DATABASE 3
16+
#define MAX_DATABASES 10 /* FIXME: need more? */
17+
#define MAX_JSONBD_WORKERS (MAX_DATABASES * MAX_JSONBD_WORKERS_PER_DATABASE)
1518

1619
typedef enum {
1720
JSONBD_CMD_GET_IDS,
@@ -21,8 +24,10 @@ typedef enum {
2124
/* Shared memory structures */
2225
typedef struct jsonbd_shm_hdr
2326
{
24-
sem_t workers_sem[MAX_DATABASES];
25-
volatile int workers_ready;
27+
PGSemaphore launcher_sem;
28+
sem_t workers_sem[MAX_DATABASES];
29+
volatile int workers_ready;
30+
jsonbd_shm_worker launcher;
2631
} jsonbd_shm_hdr;
2732

2833
typedef struct jsonbd_shm_worker
@@ -64,7 +69,7 @@ typedef struct jsonbd_cached_id
6469
/* Worker launch variables */
6570
typedef struct jsonbd_worker_init
6671
{
67-
char dbname[NAMEDATALEN];
72+
Oid dboid;
6873
int shm_key;
6974
} jsonc_worker_init;
7075

jsonbd_utils.c

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,34 +23,6 @@ struct shm_mq_alt
2323
#error "shm_mq struct in jsonbd is copied from PostgreSQL 11, please correct it according to your version"
2424
#endif
2525

26-
RangeTblEntry *
27-
add_range_table_to_estate(EState *estate, Relation rel)
28-
{
29-
RangeTblEntry *rte = makeNode(RangeTblEntry);
30-
char *refname = RelationGetRelationName(rel);
31-
32-
rte->rtekind = RTE_RELATION;
33-
rte->alias = NULL;
34-
rte->relid = RelationGetRelid(rel);
35-
rte->relkind = rel->rd_rel->relkind;
36-
37-
rte->eref = makeAlias(refname, NIL);
38-
39-
rte->lateral = false;
40-
rte->inh = false;
41-
rte->inFromCl = false;
42-
43-
rte->requiredPerms = ACL_SELECT;
44-
rte->checkAsUser = InvalidOid;
45-
rte->selectedCols = NULL;
46-
rte->insertedCols = NULL;
47-
rte->updatedCols = NULL;
48-
49-
estate->es_range_table = lappend(estate->es_range_table, rte);
50-
51-
return rte;
52-
}
53-
5426
/**
5527
* Get 32-bit Murmur3 hash. Ported from qLibc library.
5628
* Added compability with C99, and postgres code style

jsonbd_utils.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
#ifndef JSONBC_UTILS_H
2-
#define JSONBC_UTILS_H
1+
#ifndef JSONBD_UTILS_H
2+
#define JSONBD_UTILS_H
33

44
#include "postgres.h"
55
#include "nodes/execnodes.h"
66
#include "nodes/parsenodes.h"
77

8-
extern RangeTblEntry * add_range_table_to_estate(EState *estate, Relation rel);
98
extern uint32 qhashmurmur3_32(const void *data, size_t nbytes);
109
extern void shm_mq_clean_receiver(shm_mq *mq);
1110
extern void shm_mq_clean_sender(shm_mq *mq);

0 commit comments

Comments
 (0)