Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 273382f

Browse files
committed
Fix bug in launcher
1 parent defb236 commit 273382f

File tree

6 files changed

+86
-46
lines changed

6 files changed

+86
-46
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@
66
regression.diffs
77
regression.out
88
*.plist
9+
tests.log

jsonbd.c

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,16 @@ static void
103103
jsonbd_init_worker(shm_toc *toc, jsonbd_shm_worker *wd, int worker_num,
104104
size_t queue_size)
105105
{
106+
LWLockPadded *locks;
106107
static int mqkey = MAX_JSONBD_WORKERS + 1;
107108

108109
/* each worker will have two mq, for input and output */
109110
wd->mqin = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
110111
wd->mqout = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
111112

112113
/* init worker context */
113-
pg_atomic_init_flag(&wd->busy);
114114
wd->proc = NULL;
115115
wd->dboid = InvalidOid;
116-
wd->dbsem = NULL;
117116

118117
shm_mq_clean_receiver(wd->mqin);
119118
shm_mq_clean_receiver(wd->mqout);
@@ -125,6 +124,10 @@ jsonbd_init_worker(shm_toc *toc, jsonbd_shm_worker *wd, int worker_num,
125124

126125
shm_toc_insert(toc, mqkey++, wd->mqin);
127126
shm_toc_insert(toc, mqkey++, wd->mqout);
127+
128+
/* initialize worker's lwlock */
129+
locks = GetNamedLWLockTranche(JSONBD_LWLOCKS_TRANCHE);
130+
wd->lock = &locks[worker_num].lock;
128131
}
129132

130133
static void
@@ -152,7 +155,6 @@ jsonbd_shmem_startup_hook(void)
152155
/* Initialize header */
153156
hdr = shm_toc_allocate(toc, sizeof(jsonbd_shm_hdr));
154157
hdr->workers_ready = 0;
155-
sem_init(&hdr->launcher_sem, 1, 1);
156158
jsonbd_init_worker(toc, &hdr->launcher, 0, shm_mq_minimum_size);
157159
shm_toc_insert(toc, 0, hdr);
158160

@@ -184,6 +186,8 @@ _PG_init(void)
184186

185187
if (jsonbd_nworkers)
186188
{
189+
/* jsonbd workers and one lwlock for launcher */
190+
RequestNamedLWLockTranche(JSONBD_LWLOCKS_TRANCHE, MAX_JSONBD_WORKERS + 1);
187191
RequestAddinShmemSpace(jsonbd_shmem_size());
188192
jsonbd_register_launcher();
189193
}
@@ -289,36 +293,39 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
289293
shm_mq_result resmq;
290294
shm_mq_handle *mqh;
291295
jsonbd_shm_hdr *hdr;
292-
jsonbd_shm_worker *wd;
296+
jsonbd_shm_worker *wd = NULL;
293297

294298
char *res;
295299
Size reslen;
296-
sem_t *cursem = NULL;
297300

298301
if (jsonbd_nworkers <= 0)
299302
elog(ERROR, "jsonbd workers are not available");
300303

301304
hdr = shm_toc_lookup(toc, 0, false);
302305

303-
while (cursem == NULL)
306+
/*
307+
* find some not busy worker,
308+
* the backend can intercept a worker that just started by another
309+
* backend, that's ok
310+
*/
311+
while (true)
304312
{
305-
/*
306-
* find some not busy worker,
307-
* the backend can intercept a worker that just started by another
308-
* backend, that's ok
309-
*/
310313
for (i = 0; i < hdr->workers_ready; i++)
311314
{
315+
bool locked;
316+
312317
wd = shm_toc_lookup(toc, i + 1, false);
313318
if (wd->dboid != MyDatabaseId)
314319
continue;
315320

316321
/*
317322
* we found first worker for our database, next 'jsonbd_nworkers'
318-
* workers should be ours, and one of them should be free after sem_wait
323+
* workers should be ours
319324
*/
320-
cursem = wd->dbsem;
321-
sem_wait(cursem);
325+
locked = LWLockConditionalAcquire(wd->lock, LW_EXCLUSIVE);
326+
if (locked)
327+
goto comm;
328+
322329
for (j = i; j < (i + jsonbd_nworkers); j++)
323330
{
324331
wd = shm_toc_lookup(toc, j + 1, false);
@@ -327,20 +334,27 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
327334
/* somehow not all workers started for this database, try next */
328335
continue;
329336

330-
if (pg_atomic_test_set_flag(&wd->busy))
337+
locked = LWLockConditionalAcquire(wd->lock, LW_EXCLUSIVE);
338+
if (locked)
331339
goto comm;
332340
}
333341

334-
/* should never reach here if all worked correctly */
335-
sem_post(cursem);
336-
elog(ERROR, "jsonbd: could not make a connection with workers");
342+
/* if none of the workers were free, we just wait on the last one */
343+
Assert(!locked);
344+
LWLockAcquire(wd->lock, LW_EXCLUSIVE);
345+
goto comm;
337346
}
338347

339348
/*
340-
* there are no workers for our database,
349+
* There are no workers for our database,
341350
* so we should launch them using our jsonbd workers launcher
351+
*
352+
* But if the launcher is already locked, we should check the workers
353+
* list again
342354
*/
343-
sem_wait(&hdr->launcher_sem);
355+
if (!LWLockAcquireOrWait(hdr->launcher.lock, LW_EXCLUSIVE))
356+
continue;
357+
344358
shm_mq_set_sender(hdr->launcher.mqin, MyProc);
345359
shm_mq_set_receiver(hdr->launcher.mqout, MyProc);
346360

@@ -366,7 +380,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
366380

367381
shm_mq_clean_sender(hdr->launcher.mqin);
368382
shm_mq_clean_receiver(hdr->launcher.mqout);
369-
sem_post(&hdr->launcher_sem);
383+
LWLockRelease(hdr->launcher.lock);
370384

371385
if (detached)
372386
elog(ERROR, "jsonbd: workers launcher was detached");
@@ -376,6 +390,8 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
376390
}
377391

378392
comm:
393+
Assert(wd != NULL);
394+
379395
detached = false;
380396

381397
/* send data */
@@ -405,8 +421,8 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
405421
/* clean self as receiver and unlock mq */
406422
shm_mq_clean_sender(wd->mqin);
407423
shm_mq_clean_receiver(wd->mqout);
408-
pg_atomic_clear_flag(&wd->busy);
409-
sem_post(cursem);
424+
425+
LWLockRelease(wd->lock);
410426

411427
if (detached)
412428
elog(ERROR, "jsonbd: worker has detached");

jsonbd.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#define JSONBD_SHM_MQ_MAGIC 0xAAAA
1313

14+
#define JSONBD_LWLOCKS_TRANCHE "jsonbd lwlocks tranche"
1415
#define MAX_JSONBD_WORKERS_PER_DATABASE 3
1516
#define MAX_DATABASES 10 /* FIXME: need more? */
1617
#define MAX_JSONBD_WORKERS (MAX_DATABASES * MAX_JSONBD_WORKERS_PER_DATABASE)
@@ -22,19 +23,16 @@ typedef enum {
2223

2324
typedef struct jsonbd_shm_worker
2425
{
25-
sem_t *dbsem;
2626
shm_mq *mqin;
2727
shm_mq *mqout;
28-
pg_atomic_flag busy;
2928
PGPROC *proc;
3029
volatile Oid dboid; /* database of the worker */
30+
LWLock *lock;
3131
} jsonbd_shm_worker;
3232

3333
/* Shared memory structures */
3434
typedef struct jsonbd_shm_hdr
3535
{
36-
sem_t launcher_sem;
37-
sem_t workers_sem[MAX_DATABASES];
3836
volatile int workers_ready;
3937
jsonbd_shm_worker launcher;
4038
Latch launcher_latch;

jsonbd_worker.c

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ init_worker(dsm_segment *seg)
136136

137137
worker_state = shm_toc_lookup(toc, worker_args->worker_num, false);
138138
worker_state->proc = MyProc;
139-
worker_state->dbsem = &hdr->workers_sem[worker_args->database_num];
140139
worker_state->dboid = worker_args->dboid;
141140

142141
/* input mq */
@@ -145,9 +144,6 @@ init_worker(dsm_segment *seg)
145144
/* output mq */
146145
shm_mq_set_sender(worker_state->mqout, MyProc);
147146

148-
/* not busy at start */
149-
pg_atomic_clear_flag(&worker_state->busy);
150-
151147
/* this context will be reset after each task */
152148
Assert(worker_context == NULL);
153149
worker_context = AllocSetContextCreate(TopMemoryContext,
@@ -612,9 +608,6 @@ jsonbd_launcher_main(Datum arg)
612608
if (started != jsonbd_nworkers)
613609
elog(NOTICE, "jsonbd: not all workers for %d has started", dboid);
614610

615-
/* semaphore value should be equal to workers count for database */
616-
sem_init(&hdr->workers_sem[database_num], 1, started);
617-
618611
/* we report ok if at least one worker has started */
619612
resmq = shm_mq_sendv(mqh, &((shm_mq_iovec) {"y", 2}), 1, false);
620613
database_num += 1;
@@ -762,7 +755,6 @@ jsonbd_worker_main(Datum arg)
762755
static bool
763756
jsonbd_register_worker(int worker_num, Oid dboid, int database_num)
764757
{
765-
pid_t pid;
766758
BackgroundWorker worker;
767759
BackgroundWorkerHandle *bgw_handle;
768760
jsonbd_worker_args *worker_args;
@@ -804,13 +796,6 @@ jsonbd_register_worker(int worker_num, Oid dboid, int database_num)
804796
return false;
805797
}
806798

807-
/* Wait till the worker starts */
808-
if (WaitForBackgroundWorkerStartup(bgw_handle, &pid) == BGWH_POSTMASTER_DIED)
809-
{
810-
elog(LOG, "jsonbd: postmaster died during bgworker startup");
811-
return false;
812-
}
813-
814799
/* Wait to be signalled. */
815800
#if PG_VERSION_NUM >= 100000
816801
WaitLatch(&hdr->launcher_latch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);

sql/basic.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
CREATE SCHEMA comp;
22
CREATE EXTENSION jsonbd SCHEMA comp;
3-
CREATE TABLE comp.t(a JSONB COMPRESSED jsonbd);
3+
CREATE TABLE comp.t(a JSONB COMPRESSION jsonbd);
44
\d+ comp.t;
55

66
CREATE OR REPLACE FUNCTION comp.add_record()

tests/jsonbd_test.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,45 @@
33
import unittest
44
import random
55
import json
6+
import os.path
7+
import subprocess
68

79
from testgres import get_new_node
810

11+
# set up base logging config, it can be turned on by `use_logging`
12+
# parameter on node setup
13+
14+
import logging
15+
import logging.config
16+
17+
logfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'tests.log')
18+
LOG_CONFIG = {
19+
'version': 1,
20+
'handlers': {
21+
'console': {
22+
'class': 'logging.StreamHandler',
23+
'formatter': 'base_format',
24+
'level': logging.DEBUG,
25+
},
26+
'file': {
27+
'class': 'logging.FileHandler',
28+
'filename': logfile,
29+
'formatter': 'base_format',
30+
'level': logging.DEBUG,
31+
},
32+
},
33+
'formatters': {
34+
'base_format': {
35+
'format': '%(node)-5s: %(message)s',
36+
},
37+
},
38+
'root': {
39+
'handlers': ('file', ),
40+
'level': 'DEBUG',
41+
},
42+
}
43+
44+
logging.config.dictConfig(LOG_CONFIG)
945

1046
insert_cmd = '''
1147
INSERT INTO comp.t
@@ -32,19 +68,23 @@ def generate_dict():
3268

3369

3470
class Tests(unittest.TestCase):
71+
def set_trace(self, con, command="pg_debug"):
72+
pid = con.execute("select pg_backend_pid()")[0][0]
73+
p = subprocess.Popen([command], stdin=subprocess.PIPE)
74+
p.communicate(str(pid).encode())
75+
3576
def test_correctness(self):
3677
with get_new_node('node1') as node:
3778
node.init()
3879
node.append_conf("postgresql.conf", "shared_preload_libraries='jsonbd'\n")
3980
node.start()
4081

4182
node.psql('postgres', 'create extension jsonbd')
42-
node.psql('postgres', 'create table t1(pk serial, a jsonb compressed jsonbd);')
83+
node.psql('postgres', 'create table t1(pk serial, a jsonb compression jsonbd);')
4384

4485
data = []
4586
with node.connect('postgres') as con:
46-
import ipdb; ipdb.set_trace()
47-
for i in range(2):
87+
for i in range(1000):
4888
d = generate_dict()
4989
data.append(d)
5090
con.execute("insert into t1 (a) values ('%s');" % json.dumps(d))

0 commit comments

Comments
 (0)