Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit bcb5972

Browse files
committed
Refactor shm handlers
1 parent 4dc5801 commit bcb5972

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

jsonbc.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,12 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
342342
detached = true;
343343
shm_mq_detach(mqh);
344344

345+
/*
346+
* we need to clean sender before receiving, because we need to block
347+
* worker after we received data
348+
*/
349+
shm_mq_clean_sender(wd->mqin);
350+
345351
/* get data */
346352
if (!detached)
347353
{
@@ -357,8 +363,7 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
357363
shm_mq_detach(mqh);
358364
}
359365

360-
/* clean and unlock mq */
361-
shm_mq_clean_sender(wd->mqin);
366+
/* clean self as receiver and unlock mq */
362367
shm_mq_clean_receiver(wd->mqout);
363368
pg_atomic_clear_flag(&wd->busy);
364369

jsonbc_worker.c

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "postgres.h"
44
#include "fmgr.h"
5+
#include "miscadmin.h"
56
#include "pgstat.h"
67

78
#include "access/genam.h"
@@ -30,8 +31,6 @@
3031
static bool xact_started = false;
3132
static bool shutdown_requested = false;
3233
static jsonbc_shm_worker *worker_state;
33-
static shm_mq_handle *worker_mq_handle_in;
34-
static shm_mq_handle *worker_mq_handle_out;
3534

3635
Oid jsonbc_dictionary_reloid = InvalidOid;
3736
Oid jsonbc_keys_indoid = InvalidOid;
@@ -82,16 +81,15 @@ init_local_variables(int worker_num)
8281

8382
/* input mq */
8483
shm_mq_set_receiver(worker_state->mqin, MyProc);
85-
worker_mq_handle_in = shm_mq_attach(worker_state->mqin, NULL, NULL);
8684

8785
/* output mq */
8886
shm_mq_set_sender(worker_state->mqout, MyProc);
89-
worker_mq_handle_out = shm_mq_attach(worker_state->mqout, NULL, NULL);
9087

9188
/* not busy at start */
9289
pg_atomic_clear_flag(&worker_state->busy);
9390

94-
elog(LOG, "jsonbc dictionary worker %d started", worker_num + 1);
91+
elog(LOG, "jsonbc dictionary worker %d started with pid: %d",
92+
worker_num + 1, MyProcPid);
9593
}
9694

9795
static void
@@ -224,9 +222,14 @@ jsonbc_get_keys_slow(Oid cmoptoid, uint32 *ids, int nkeys, size_t *reslen)
224222
static void
225223
jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
226224
{
225+
Relation rel;
226+
227227
int i;
228+
Oid relid = jsonbc_get_dictionary_relid();
228229
char *nspc = get_namespace_name(get_extension_schema());
229230

231+
rel = relation_open(relid, ShareLock);
232+
230233
if (SPI_connect() != SPI_OK_CONNECT)
231234
elog(ERROR, "SPI_connect failed");
232235

@@ -270,6 +273,7 @@ jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
270273
pfree(sql);
271274
}
272275
SPI_finish();
276+
relation_close(rel, ShareLock);
273277
}
274278

275279
static char *
@@ -361,8 +365,12 @@ worker_main(Datum arg)
361365
Size nbytes;
362366
void *data;
363367

364-
shm_mq_result resmq = shm_mq_receive(worker_mq_handle_in, &nbytes,
365-
&data, true);
368+
shm_mq_handle *mqh;
369+
shm_mq_result resmq;
370+
371+
mqh = shm_mq_attach(worker_state->mqin, NULL, NULL);
372+
resmq = shm_mq_receive(mqh, &nbytes, &data, true);
373+
366374
if (resmq == SHM_MQ_SUCCESS)
367375
{
368376
JsonbcCommand cmd;
@@ -393,10 +401,14 @@ worker_main(Datum arg)
393401
elog(NOTICE, "jsonbc: got unknown command");
394402
}
395403

396-
resmq = shm_mq_sendv(worker_mq_handle_out, &iov, 1, false);
404+
shm_mq_detach(mqh);
405+
406+
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
407+
resmq = shm_mq_sendv(mqh, &iov, 1, false);
397408
if (resmq != SHM_MQ_SUCCESS)
398409
elog(NOTICE, "jsonbc: backend detached early");
399410

411+
shm_mq_detach(mqh);
400412
pfree((void *) iov.data);
401413
}
402414

@@ -412,8 +424,6 @@ worker_main(Datum arg)
412424
ResetLatch(&MyProc->procLatch);
413425
}
414426

415-
shm_mq_detach(worker_mq_handle_in);
416-
shm_mq_detach(worker_mq_handle_out);
417427
elog(LOG, "jsonbc dictionary worker has ended its work");
418428
proc_exit(0);
419429
}

0 commit comments

Comments
 (0)