Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8685f7d

Browse files
committed
Fix creating of the dictionary
1 parent e8a7579 commit 8685f7d

File tree

3 files changed

+35
-24
lines changed

3 files changed

+35
-24
lines changed

jsonbc.c

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,18 +71,16 @@ static void encode_varbyte(uint32 val, unsigned char *ptr, int *len);
7171
static uint32 decode_varbyte(unsigned char *ptr);
7272
static char *packJsonbValue(JsonbValue *val, int header_size, int *len);
7373
static void setup_guc_variables(void);
74-
static void shm_mq_clean_sender(shm_mq *mq);
75-
static void shm_mq_clean_receiver(shm_mq *mq);
7674
static void jsonbc_get_keys(Oid cmoptoid, uint32 *ids, int nkeys, char **keys);
7775
static void jsonbc_get_key_ids(Oid cmoptoid, char *buf, int buflen, uint32 *idsbuf, int nkeys);
7876

79-
static inline Size
77+
static size_t
8078
jsonbc_get_queue_size(void)
8179
{
8280
return (Size) (jsonbc_queue_size * 1024);
8381
}
8482

85-
static void
83+
void
8684
shm_mq_clean_sender(shm_mq *mq)
8785
{
8886
struct shm_mq_alt *amq = (struct shm_mq_alt *) mq;
@@ -95,7 +93,7 @@ shm_mq_clean_sender(shm_mq *mq)
9593
amq->mq_detached = false;
9694
}
9795

98-
static void
96+
void
9997
shm_mq_clean_receiver(shm_mq *mq)
10098
{
10199
struct shm_mq_alt *amq = (struct shm_mq_alt *) mq;
@@ -347,12 +345,6 @@ jsonbc_communicate(shm_mq_iovec *iov, int iov_len,
347345
detached = true;
348346
shm_mq_detach(mqh);
349347

350-
/*
351-
* we need to clean sender before receiving, because we need to block
352-
* worker after we received data
353-
*/
354-
shm_mq_clean_sender(wd->mqin);
355-
356348
/* get data */
357349
if (!detached)
358350
{

jsonbc.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,7 @@ extern int jsonbc_nworkers;
3636
extern int jsonbc_cache_size;
3737
extern int jsonbc_queue_size;
3838

39+
extern void shm_mq_clean_receiver(shm_mq *mq);
40+
extern void shm_mq_clean_sender(shm_mq *mq);
41+
3942
#endif

jsonbc_worker.c

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "access/sysattr.h"
1313
#include "catalog/pg_extension.h"
1414
#include "catalog/indexing.h"
15+
#include "catalog/namespace.h"
1516
#include "commands/extension.h"
1617
#include "executor/spi.h"
1718
#include "port/atomics.h"
@@ -154,6 +155,8 @@ jsonbc_get_keys_slow(Oid cmoptoid, uint32 *ids, int nkeys, size_t *reslen)
154155
Relation rel,
155156
idxrel;
156157

158+
start_xact_command();
159+
157160
rel = relation_open(relid, AccessShareLock);
158161
idxrel = index_open(jsonbc_id_indoid, AccessShareLock);
159162

@@ -211,6 +214,8 @@ jsonbc_get_keys_slow(Oid cmoptoid, uint32 *ids, int nkeys, size_t *reslen)
211214
index_close(idxrel, AccessShareLock);
212215
relation_close(rel, AccessShareLock);
213216

217+
finish_xact_command();
218+
214219
*reslen = keyptr - keys;
215220
return keys;
216221
}
@@ -226,8 +231,8 @@ jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
226231

227232
int i;
228233
Oid relid = jsonbc_get_dictionary_relid();
229-
char *nspc = get_namespace_name(get_extension_schema());
230234

235+
start_xact_command();
231236
rel = relation_open(relid, ShareLock);
232237

233238
if (SPI_connect() != SPI_OK_CONNECT)
@@ -239,18 +244,18 @@ jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
239244
bool isnull;
240245
char *sql;
241246

242-
sql = psprintf("SELECT id FROM %s.jsonbc_dictionary WHERE cmopt = %d"
243-
" AND key = '%s'", nspc, cmoptoid, buf);
247+
sql = psprintf("SELECT id FROM public.jsonbc_dictionary WHERE cmopt = %d"
248+
" AND key = '%s'", cmoptoid, buf);
244249

245250
if (SPI_exec(sql, 0) != SPI_OK_SELECT)
246251
elog(ERROR, "SPI_exec failed");
247252

248253
if (SPI_processed == 0)
249254
{
250255
char *sql2 = psprintf("with t as (select (coalesce(max(id), 0) + 1) new_id from "
251-
"%s.jsonbc_dictionary where cmopt = %d) insert into %s.jsonbc_dictionary"
256+
"public.jsonbc_dictionary where cmopt = %d) insert into public.jsonbc_dictionary"
252257
" select %d, t.new_id, '%s' from t returning id",
253-
nspc, cmoptoid, nspc, cmoptoid, buf);
258+
cmoptoid, cmoptoid, buf);
254259

255260
if (SPI_exec(sql2, 0) != SPI_OK_INSERT_RETURNING)
256261
elog(ERROR, "SPI_exec failed");
@@ -274,6 +279,7 @@ jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
274279
}
275280
SPI_finish();
276281
relation_close(rel, ShareLock);
282+
finish_xact_command();
277283
}
278284

279285
static char *
@@ -296,7 +302,7 @@ jsonbc_cmd_get_ids(int nkeys, Oid cmoptoid, char *buf, size_t *buflen)
296302
ErrorData *error;
297303
MemoryContextSwitchTo(old_mcxt);
298304
error = CopyErrorData();
299-
elog(LOG, "jsonbc: error occured %s", error->message);
305+
elog(LOG, "jsonbc: error occured: %s", error->message);
300306
FlushErrorState();
301307
pfree(error);
302308

@@ -316,16 +322,14 @@ jsonbc_cmd_get_keys(int nkeys, Oid cmoptoid, uint32 *ids, size_t *reslen)
316322

317323
PG_TRY();
318324
{
319-
start_xact_command();
320325
keys = jsonbc_get_keys_slow(cmoptoid, ids, nkeys, reslen);
321-
finish_xact_command();
322326
}
323327
PG_CATCH();
324328
{
325329
ErrorData *error;
326330
MemoryContextSwitchTo(old_mcxt);
327331
error = CopyErrorData();
328-
elog(LOG, "jsonbc: error occured %s", error->message);
332+
elog(LOG, "jsonbc: error occured: %s", error->message);
329333
FlushErrorState();
330334
pfree(error);
331335
}
@@ -399,11 +403,13 @@ worker_main(Datum arg)
399403
iov.data = "";
400404
iov.len = 1;
401405
}
406+
break;
402407
default:
403408
elog(NOTICE, "jsonbc: got unknown command");
404409
}
405410

406411
shm_mq_detach(mqh);
412+
shm_mq_clean_sender(worker_state->mqin);
407413

408414
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
409415
resmq = shm_mq_sendv(mqh, &iov, 1, false);
@@ -436,24 +442,32 @@ worker_main(Datum arg)
436442
static Oid
437443
jsonbc_get_dictionary_relid(void)
438444
{
439-
Oid relid;
445+
Oid relid,
446+
nspoid;
440447

441448
if (OidIsValid(jsonbc_dictionary_reloid))
442449
return jsonbc_dictionary_reloid;
443450

444-
relid = get_relname_relid(JSONBC_DICTIONARY_REL, InvalidOid);
451+
start_xact_command();
452+
453+
nspoid = get_namespace_oid("public", false);
454+
relid = get_relname_relid(JSONBC_DICTIONARY_REL, nspoid);
445455
if (relid == InvalidOid)
446456
{
447457
if (SPI_connect() != SPI_OK_CONNECT)
448458
elog(ERROR, "SPI_connect failed");
449459

450-
if (SPI_exec(sql_dictionary, 0) != SPI_OK_UTILITY)
460+
if (SPI_execute(sql_dictionary, false, 0) != SPI_OK_UTILITY)
451461
elog(ERROR, "could not create \"jsonbc\" dictionary");
452462

453463
SPI_finish();
464+
CommandCounterIncrement();
465+
466+
finish_xact_command();
467+
start_xact_command();
454468

455469
/* get just created table Oid */
456-
relid = get_relname_relid(JSONBC_DICTIONARY_REL, InvalidOid);
470+
relid = get_relname_relid(JSONBC_DICTIONARY_REL, nspoid);
457471
jsonbc_id_indoid = InvalidOid;
458472
jsonbc_keys_indoid = InvalidOid;
459473
}
@@ -487,6 +501,8 @@ jsonbc_get_dictionary_relid(void)
487501
relation_close(rel, NoLock);
488502
}
489503

504+
finish_xact_command();
505+
490506
/* check we did fill global variables */
491507
Assert(OidIsValid(jsonbc_id_indoid));
492508
Assert(OidIsValid(jsonbc_keys_indoid));

0 commit comments

Comments
 (0)