Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 66d56a9

Browse files
committed
fixup! Fix compilation
1 parent 1fffd85 commit 66d56a9

File tree

3 files changed

+67
-34
lines changed

3 files changed

+67
-34
lines changed

jsonbd.c

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ static uint32 decode_varbyte(unsigned char *ptr);
6060
static size_t
6161
jsonbd_get_queue_size(void)
6262
{
63-
return (Size) (jsonbd_queue_size * 1024);
63+
return (Size) (shm_mq_minimum_size + jsonbd_queue_size * 1024);
6464
}
6565

6666
static size_t
@@ -74,7 +74,9 @@ jsonbd_shmem_size(void)
7474
shm_toc_initialize_estimator(&e);
7575

7676
shm_toc_estimate_chunk(&e, sizeof(jsonbd_shm_hdr));
77-
shm_toc_estimate_chunk(&e, PGSemaphoreShmemSize(1));
77+
78+
/* two queues for launcher */
79+
shm_toc_estimate_chunk(&e, shm_mq_minimum_size * 2);
7880

7981
for (i = 0; i < MAX_JSONBD_WORKERS; i++)
8082
{
@@ -83,8 +85,8 @@ jsonbd_shmem_size(void)
8385
shm_toc_estimate_chunk(&e, jsonbd_get_queue_size());
8486
}
8587

86-
/* 3 keys each worker + 3 for header (header itself, launcher and its two queues) */
87-
shm_toc_estimate_keys(&e, MAX_JSONBD_WORKERS * 3 + 4);
88+
/* 3 keys each worker + 3 for header (header itself and two queues) */
89+
shm_toc_estimate_keys(&e, MAX_JSONBD_WORKERS * 3 + 3);
8890
size = shm_toc_estimate(&e);
8991
return size;
9092
}
@@ -95,14 +97,13 @@ jsonbd_shmem_size(void)
9597
* About keys in toc:
9698
* 0 - for header
9799
* 1..MAX_JSONBD_WORKERS - workers
98-
* MAX_JSONBD_WORKERS + 1 - launcher
99-
* MAX_JSONBD_WORKERS + 2 .. - queues
100+
* MAX_JSONBD_WORKERS + 1 .. - queues
100101
*/
101102
static void
102103
jsonbd_init_worker(shm_toc *toc, jsonbd_shm_worker *wd, int worker_num,
103104
size_t queue_size)
104105
{
105-
static int mqkey = MAX_JSONBD_WORKERS + 2;
106+
static int mqkey = MAX_JSONBD_WORKERS + 1;
106107

107108
/* each worker will have two mq, for input and output */
108109
wd->mqin = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
@@ -146,15 +147,11 @@ jsonbd_shmem_startup_hook(void)
146147

147148
toc = shm_toc_create(JSONBD_SHM_MQ_MAGIC, workers_data, size);
148149

149-
/* initialize header */
150+
/* Initialize header */
150151
hdr = shm_toc_allocate(toc, sizeof(jsonbd_shm_hdr));
151152
hdr->workers_ready = 0;
152-
hdr->launcher_sem = PGSemaphoreCreate();
153-
jsonbd_init_worker(toc, &hdr->launcher, MAX_JSONBD_WORKERS + 1, sizeof(Oid));
154-
155-
for (i = 0; i < MAX_DATABASES; i++)
156-
sem_init(&hdr->workers_sem[i], 1, jsonbd_nworkers);
157-
153+
sem_init(&hdr->launcher_sem, 1, 1);
154+
jsonbd_init_worker(toc, &hdr->launcher, 0, shm_mq_minimum_size);
158155
shm_toc_insert(toc, 0, hdr);
159156

160157
for (i = 0; i < MAX_JSONBD_WORKERS; i++)
@@ -285,6 +282,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
285282
int i,
286283
j;
287284
bool detached = false;
285+
bool launch_failed = false;
288286
bool callback_succeded = false;
289287
shm_mq_result resmq;
290288
shm_mq_handle *mqh;
@@ -321,8 +319,8 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
321319
wd = shm_toc_lookup(toc, j + 1, false);
322320

323321
if (wd->dboid != MyDatabaseId)
324-
/* somehow not all workers started for this database */
325-
break;
322+
/* somehow not all workers started for this database, try next */
323+
continue;
326324

327325
if (pg_atomic_test_set_flag(&wd->busy))
328326
goto comm;
@@ -337,7 +335,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
337335
* there are no workers for our database,
338336
* so we should launch them using our jsonbd workers launcher
339337
*/
340-
PGSemaphoreLock(hdr->launcher_sem);
338+
sem_wait(&hdr->launcher_sem);
341339
shm_mq_set_sender(hdr->launcher.mqin, MyProc);
342340
shm_mq_set_receiver(hdr->launcher.mqout, MyProc);
343341

@@ -355,15 +353,21 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
355353
if (resmq != SHM_MQ_SUCCESS)
356354
detached = true;
357355

356+
if (reslen != 2 || res[0] == 'n')
357+
launch_failed = true;
358+
358359
shm_mq_detach(mqh);
359360
}
360361

361362
shm_mq_clean_sender(hdr->launcher.mqin);
362363
shm_mq_clean_receiver(hdr->launcher.mqout);
363-
PGSemaphoreUnlock(hdr->launcher_sem);
364+
sem_post(&hdr->launcher_sem);
364365

365366
if (detached)
366367
elog(ERROR, "jsonbd: workers launcher was detached");
368+
369+
if (launch_failed)
370+
elog(ERROR, "jsonbd: could not launch dictionary worker, see logs");
367371
}
368372

369373
comm:

jsonbd.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#include "port/atomics.h"
99
#include "storage/proc.h"
1010
#include "storage/shm_mq.h"
11-
#include "storage/pg_sema.h"
1211

1312
#define JSONBD_SHM_MQ_MAGIC 0xAAAA
1413

@@ -34,7 +33,7 @@ typedef struct jsonbd_shm_worker
3433
/* Shared memory structures */
3534
typedef struct jsonbd_shm_hdr
3635
{
37-
PGSemaphore launcher_sem;
36+
sem_t launcher_sem;
3837
sem_t workers_sem[MAX_DATABASES];
3938
volatile int workers_ready;
4039
jsonbd_shm_worker launcher;

jsonbd_worker.c

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "catalog/pg_compression_opt.h"
1717
#include "catalog/pg_extension.h"
1818
#include "commands/extension.h"
19+
#include "commands/dbcommands.h"
1920
#include "executor/spi.h"
2021
#include "port/atomics.h"
2122
#include "storage/ipc.h"
@@ -525,6 +526,7 @@ jsonbd_worker_launcher(void)
525526

526527
shm_mq_handle *mqh = NULL;
527528
int worker_num = 1;
529+
int database_num = 0;
528530

529531
/* Establish signal handlers before unblocking signals */
530532
pqsignal(SIGTERM, handle_sigterm);
@@ -544,6 +546,8 @@ jsonbd_worker_launcher(void)
544546
shm_mq_set_receiver(worker_state->mqin, MyProc);
545547
shm_mq_set_sender(worker_state->mqout, MyProc);
546548

549+
elog(LOG, "jsonbd launcher started with pid: %d", MyProcPid);
550+
547551
while (true)
548552
{
549553
int rc;
@@ -559,28 +563,42 @@ jsonbd_worker_launcher(void)
559563

560564
if (resmq == SHM_MQ_SUCCESS)
561565
{
562-
bool success = true;
566+
int started = 0;
563567
int i;
564568
Oid dboid;
565569

566-
Assert(nbytes == sizeof(Oid));
567-
dboid = *((Oid *) data);
568-
569-
/* start workers for specified database */
570-
for (i=0; i < jsonbd_nworkers; i++)
570+
if (database_num >= MAX_DATABASES)
571+
elog(NOTICE, "jsonbd: reached maximum count of supported databases");
572+
else
571573
{
572-
bool res;
573-
res = jsonbd_register_worker(worker_num++, dboid);
574-
if (!res)
575-
success = false;
574+
Assert(nbytes == sizeof(Oid));
575+
dboid = *((Oid *) data);
576+
577+
/* start workers for specified database */
578+
for (i=0; i < jsonbd_nworkers; i++)
579+
{
580+
bool res;
581+
res = jsonbd_register_worker(worker_num++, dboid);
582+
if (res)
583+
started++;
584+
}
576585
}
577586

578587
shm_mq_detach(mqh);
579588
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
580-
if (success)
581-
resmq = shm_mq_sendv(mqh, &((shm_mq_iovec) {"ok", 3}), 1, false);
589+
if (started)
590+
{
591+
if (started != jsonbd_nworkers)
592+
elog(NOTICE, "jsonbd: not all workers for %d has started", dboid);
593+
594+
/* semaphore value should be equal to workers count for database */
595+
sem_init(&hdr->workers_sem[database_num], 1, started);
596+
597+
/* we report ok if only one worker has started */
598+
resmq = shm_mq_sendv(mqh, &((shm_mq_iovec) {"y", 2}), 1, false);
599+
}
582600
else
583-
resmq = shm_mq_sendv(mqh, &((shm_mq_iovec) {"fail", 5}), 1, false);
601+
resmq = shm_mq_sendv(mqh, &((shm_mq_iovec) {"n", 2}), 1, false);
584602

585603
if (resmq != SHM_MQ_SUCCESS)
586604
elog(NOTICE, "jsonbd: backend detached early");
@@ -754,6 +772,12 @@ jsonbd_register_worker(int worker_num, Oid dboid)
754772
BackgroundWorkerHandle *bgw_handle;
755773
jsonbd_worker_args *worker_args;
756774

775+
if (worker_num > MAX_JSONBD_WORKERS)
776+
{
777+
elog(LOG, "Reached maximum count of jsonbd dictionary workers");
778+
return false;
779+
}
780+
757781
/* Initialize DSM segment */
758782
seg = dsm_create(sizeof(jsonbd_worker_args), 0);
759783
worker_args = (jsonbd_worker_args *) dsm_segment_address(seg);
@@ -767,7 +791,13 @@ jsonbd_register_worker(int worker_num, Oid dboid)
767791
worker.bgw_notify_pid = MyProcPid;
768792
memcpy(worker.bgw_library_name, "jsonbd", BGW_MAXLEN);
769793
memcpy(worker.bgw_function_name, CppAsString(jsonbd_worker_main), BGW_MAXLEN);
770-
snprintf(worker.bgw_name, BGW_MAXLEN, "jsonbd dictionary worker %d", worker_num);
794+
795+
/* we need transaction to access syscache */
796+
start_xact_command();
797+
snprintf(worker.bgw_name, BGW_MAXLEN, "jsonbd, worker %d, db: \"%s\"",
798+
worker_num, get_database_name(dboid));
799+
finish_xact_command();
800+
771801
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
772802

773803
/* Start dynamic worker */

0 commit comments

Comments
 (0)