Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1519b41

Browse files
committed
Fix working with indexes
1 parent 2e0f308 commit 1519b41

File tree

2 files changed

+45
-44
lines changed

2 files changed

+45
-44
lines changed

jsonbc.c

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
276276
shm_mq_result resmq;
277277
shm_mq_handle *mqh;
278278
jsonbc_shm_hdr *hdr;
279+
jsonbc_shm_worker *wd;
279280

280281
char *res;
281282
Size reslen;
@@ -289,50 +290,53 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
289290

290291
for (i = 0; i < jsonbc_nworkers; i++)
291292
{
292-
jsonbc_shm_worker *wd = shm_toc_lookup(toc, i + 1, false);
293-
if (!pg_atomic_test_set_flag(&wd->busy))
294-
continue;
293+
wd = shm_toc_lookup(toc, i + 1, false);
294+
if (pg_atomic_test_set_flag(&wd->busy))
295+
break;
296+
}
297+
298+
if (i == jsonbc_nworkers)
299+
{
300+
sem_post(&hdr->workers_sem);
301+
elog(ERROR, "jsonbc: could not make a connection with workers");
302+
}
295303

296-
/* send data */
297-
shm_mq_set_sender(wd->mqin, MyProc);
298-
shm_mq_set_receiver(wd->mqout, MyProc);
304+
/* send data */
305+
shm_mq_set_sender(wd->mqin, MyProc);
306+
shm_mq_set_receiver(wd->mqout, MyProc);
299307

300-
mqh = shm_mq_attach(wd->mqin, NULL, NULL);
301-
resmq = shm_mq_sendv(mqh, iov, iov_len, false);
308+
mqh = shm_mq_attach(wd->mqin, NULL, NULL);
309+
resmq = shm_mq_sendv(mqh, iov, iov_len, false);
310+
if (resmq != SHM_MQ_SUCCESS)
311+
detached = true;
312+
shm_mq_detach(mqh);
313+
314+
/* get data */
315+
if (!detached)
316+
{
317+
mqh = shm_mq_attach(wd->mqout, NULL, NULL);
318+
resmq = shm_mq_receive(mqh, &reslen, (void **) &res, false);
302319
if (resmq != SHM_MQ_SUCCESS)
303320
detached = true;
304-
shm_mq_detach(mqh);
305321

306-
/* get data */
307322
if (!detached)
308-
{
309-
mqh = shm_mq_attach(wd->mqout, NULL, NULL);
310-
resmq = shm_mq_receive(mqh, &reslen, (void **) &res, false);
311-
if (resmq != SHM_MQ_SUCCESS)
312-
detached = true;
323+
callback_succeded = callback(res, reslen, callback_arg);
313324

314-
if (!detached)
315-
callback_succeded = callback(res, reslen, callback_arg);
316-
317-
shm_mq_detach(mqh);
318-
}
319-
320-
/* clean self as receiver and unlock mq */
321-
shm_mq_clean_sender(wd->mqin);
322-
shm_mq_clean_receiver(wd->mqout);
323-
pg_atomic_clear_flag(&wd->busy);
325+
shm_mq_detach(mqh);
326+
}
324327

325-
if (detached)
326-
elog(ERROR, "jsonbc: worker has detached");
328+
/* clean self as receiver and unlock mq */
329+
shm_mq_clean_sender(wd->mqin);
330+
shm_mq_clean_receiver(wd->mqout);
331+
pg_atomic_clear_flag(&wd->busy);
332+
sem_post(&hdr->workers_sem);
327333

328-
if (!callback_succeded)
329-
elog(ERROR, "jsonbc: communication error");
334+
if (detached)
335+
elog(ERROR, "jsonbc: worker has detached");
330336

331-
/* we're done here */
332-
break;
333-
}
337+
if (!callback_succeded)
338+
elog(ERROR, "jsonbc: communication error");
334339

335-
sem_post(&hdr->workers_sem);
336340
}
337341

338342
/* Get key IDs using workers */

jsonbc_worker.c

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,12 @@ jsonbc_get_key(Relation rel, Relation indrel, Oid cmoptoid, uint32 key_id)
225225
char *res;
226226

227227
ScanKeyInit(&skey[0],
228-
JSONBC_DICTIONARY_REL_ATT_CMOPT,
228+
1,
229229
BTEqualStrategyNumber,
230230
F_OIDEQ,
231231
ObjectIdGetDatum(cmoptoid));
232232
ScanKeyInit(&skey[1],
233-
JSONBC_DICTIONARY_REL_ATT_ID,
233+
2,
234234
BTEqualStrategyNumber,
235235
F_INT4EQ,
236236
Int32GetDatum(key_id));
@@ -327,12 +327,12 @@ jsonbc_get_key_id(Relation rel, Relation indrel, Oid cmoptoid, char *key)
327327
uint32 result = 0;
328328

329329
ScanKeyInit(&skey[0],
330-
JSONBC_DICTIONARY_REL_ATT_CMOPT,
330+
1,
331331
BTEqualStrategyNumber,
332332
F_OIDEQ,
333333
ObjectIdGetDatum(cmoptoid));
334334
ScanKeyInit(&skey[1],
335-
JSONBC_DICTIONARY_REL_ATT_KEY,
335+
2,
336336
BTEqualStrategyNumber,
337337
F_TEXTEQ,
338338
CStringGetTextDatum(key));
@@ -360,6 +360,7 @@ static void
360360
jsonbc_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
361361
{
362362
Relation rel = NULL;
363+
Relation indrel;
363364
int i;
364365
Oid relid = jsonbc_get_dictionary_relid();
365366
bool spi_on = false;
@@ -371,7 +372,6 @@ jsonbc_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
371372
{
372373
uint32 hkey;
373374
bool found;
374-
Relation indrel;
375375
MemoryContext oldcontext;
376376
jsonbc_cached_key *ckey;
377377
jsonbc_pair *pair;
@@ -409,9 +409,9 @@ jsonbc_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
409409
{
410410
start_xact_command();
411411
rel = relation_open(relid, AccessShareLock);
412+
indrel = index_open(jsonbc_keys_indoid, AccessShareLock);
412413
}
413414

414-
indrel = index_open(jsonbc_keys_indoid, AccessShareLock);
415415
idsbuf[i] = jsonbc_get_key_id(rel, indrel, cmoptoid, buf);
416416

417417
if (idsbuf[i] == 0)
@@ -452,24 +452,21 @@ jsonbc_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
452452
}
453453
index_close(indrel2, ExclusiveLock);
454454
}
455-
index_close(indrel, AccessShareLock);
456455

457456
pair->id = idsbuf[i];
458457
next:
459458
Assert(idsbuf[i] > 0);
460459

461460
/* move to next key */
462-
while (*buf != '\0')
463-
buf++;
464-
465-
buf++;
461+
while (*buf++ != '\0');
466462
}
467463

468464
if (spi_on)
469465
SPI_finish();
470466

471467
if (rel)
472468
{
469+
index_close(indrel, AccessShareLock);
473470
relation_close(rel, AccessShareLock);
474471
finish_xact_command();
475472
}

0 commit comments

Comments
 (0)