Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4f38b03

Browse files
committed
Use semaphores in backends, fix mq bug
1 parent 8685f7d commit 4f38b03

File tree

3 files changed

+18
-12
lines changed

3 files changed

+18
-12
lines changed

jsonbc.c

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ jsonbc_shmem_startup_hook(void)
150150
toc = shm_toc_create(JSONBC_SHM_MQ_MAGIC, workers_data, size);
151151
hdr = shm_toc_allocate(toc, sizeof(jsonbc_shm_hdr));
152152
hdr->workers_ready = 0;
153+
sem_init(&hdr->workers_sem, 1, jsonbc_nworkers);
153154
shm_toc_insert(toc, 0, hdr);
154155
mqkey = jsonbc_nworkers + 1;
155156

@@ -322,6 +323,7 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
322323
bool callback_succeded = false;
323324
shm_mq_result resmq;
324325
shm_mq_handle *mqh;
326+
jsonbc_shm_hdr *hdr;
325327

326328
char *res;
327329
Size reslen;
@@ -330,7 +332,9 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
330332
/* TODO: maybe add support of multiple databases for dictionaries */
331333
elog(ERROR, "jsonbc workers are not available");
332334

333-
again:
335+
hdr = shm_toc_lookup(toc, 0, false);
336+
sem_wait(&hdr->workers_sem);
337+
334338
for (i = 0; i < jsonbc_nworkers; i++)
335339
{
336340
jsonbc_shm_worker *wd = shm_toc_lookup(toc, i + 1, false);
@@ -339,6 +343,8 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
339343

340344
/* send data */
341345
shm_mq_set_sender(wd->mqin, MyProc);
346+
shm_mq_set_receiver(wd->mqout, MyProc);
347+
342348
mqh = shm_mq_attach(wd->mqin, NULL, NULL);
343349
resmq = shm_mq_sendv(mqh, iov, iov_len, false);
344350
if (resmq != SHM_MQ_SUCCESS)
@@ -348,7 +354,6 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
348354
/* get data */
349355
if (!detached)
350356
{
351-
shm_mq_set_receiver(wd->mqout, MyProc);
352357
mqh = shm_mq_attach(wd->mqout, NULL, NULL);
353358
resmq = shm_mq_receive(mqh, &reslen, (void **) &res, false);
354359
if (resmq != SHM_MQ_SUCCESS)
@@ -361,6 +366,7 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
361366
}
362367

363368
/* clean self as receiver and unlock mq */
369+
shm_mq_clean_sender(wd->mqin);
364370
shm_mq_clean_receiver(wd->mqout);
365371
pg_atomic_clear_flag(&wd->busy);
366372

@@ -371,14 +377,10 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
371377
elog(ERROR, "jsonbc: communication error");
372378

373379
/* we're done here */
374-
return;
380+
break;
375381
}
376382

377-
CHECK_FOR_INTERRUPTS();
378-
pg_usleep(100);
379-
380-
/* TODO: add attempts count check */
381-
goto again;
383+
sem_post(&hdr->workers_sem);
382384
}
383385

384386
/* Get key IDs using workers */

jsonbc.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define JSONBC_H
33

44
#include <postgres.h>
5+
#include <semaphore.h>
56

67
#include "port/atomics.h"
78
#include "storage/proc.h"
@@ -17,7 +18,8 @@ typedef enum {
1718

1819
typedef struct jsonbc_shm_hdr
1920
{
20-
volatile int workers_ready;
21+
sem_t workers_sem;
22+
volatile int workers_ready;
2123
} jsonbc_shm_hdr;
2224

2325
typedef struct jsonbc_shm_worker

jsonbc_worker.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
250250
if (SPI_exec(sql, 0) != SPI_OK_SELECT)
251251
elog(ERROR, "SPI_exec failed");
252252

253+
pfree(sql);
254+
253255
if (SPI_processed == 0)
254256
{
255257
char *sql2 = psprintf("with t as (select (coalesce(max(id), 0) + 1) new_id from "
@@ -259,6 +261,8 @@ jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
259261

260262
if (SPI_exec(sql2, 0) != SPI_OK_INSERT_RETURNING)
261263
elog(ERROR, "SPI_exec failed");
264+
265+
pfree(sql2);
262266
}
263267

264268
datum = SPI_getbinval(SPI_tuptable->vals[0],
@@ -275,7 +279,6 @@ jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
275279
buf++;
276280

277281
buf++;
278-
pfree(sql);
279282
}
280283
SPI_finish();
281284
relation_close(rel, ShareLock);
@@ -382,7 +385,7 @@ worker_main(Datum arg)
382385
JsonbcCommand cmd;
383386
Oid cmoptoid;
384387
shm_mq_iovec iov;
385-
char *ptr = data;
388+
char *ptr = data;
386389
int nkeys = *((int *) ptr);
387390

388391
ptr += sizeof(int);
@@ -409,7 +412,6 @@ worker_main(Datum arg)
409412
}
410413

411414
shm_mq_detach(mqh);
412-
shm_mq_clean_sender(worker_state->mqin);
413415

414416
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
415417
resmq = shm_mq_sendv(mqh, &iov, 1, false);

0 commit comments

Comments
 (0)