Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 35fcb21

Browse files
committed
Add bench script, fix mq errors
1 parent f80969d commit 35fcb21

File tree

5 files changed

+137
-34
lines changed

5 files changed

+137
-34
lines changed

bench/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/football.json

bench/bench.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#!/usr/bin/env python3
2+
3+
import contextlib
4+
import glob
5+
import json
6+
import os
7+
8+
from testgres import get_new_node
9+
10+
sources = [
11+
('football', './football.json'),
12+
]
13+
14+
conf = '''
15+
shared_preload_libraries='jsonbd'
16+
'''
17+
18+
19+
@contextlib.contextmanager
20+
def cwd(path):
21+
print("cwd: ", path)
22+
curdir = os.getcwd()
23+
os.chdir(path)
24+
25+
try:
26+
yield
27+
finally:
28+
print("cwd:", curdir)
29+
os.chdir(curdir)
30+
31+
32+
with get_new_node('node1') as node:
33+
node.init()
34+
node.append_conf('postgresql.conf', conf)
35+
node.start()
36+
37+
node.safe_psql('create extension jsonbd')
38+
39+
for name, root_dir in sources:
40+
table_name = name
41+
table_name_c = '%s_c' % name
42+
43+
node.safe_psql('create table %s(a jsonb)' % table_name)
44+
node.safe_psql('create table %s(a jsonb compression jsonbd)' % table_name_c)
45+
node.safe_psql('alter table %s alter column a set storage external' % table_name)
46+
47+
with node.connect() as con:
48+
with cwd(os.path.abspath(root_dir)):
49+
for filename in glob.iglob('**/*.json', recursive=True):
50+
if filename == 'package.json':
51+
continue
52+
53+
with open(filename, 'r') as f:
54+
data = json.load(f)
55+
56+
if isinstance(data, dict):
57+
if 'rounds' in data:
58+
for obj in data['rounds']:
59+
sql = 'insert into {} values (%s)'
60+
for i in range(10):
61+
con.execute(sql.format(table_name), (json.dumps(obj), ))
62+
con.execute(sql.format(table_name_c), (json.dumps(obj), ))
63+
64+
print(node.safe_psql("select pg_size_pretty(pg_total_relation_size('%s'))" % table_name))
65+
print(node.safe_psql("select pg_size_pretty(pg_total_relation_size('%s'))" % table_name_c))

jsonbd.c

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -360,15 +360,15 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
360360
mqout = shm_mq_create(hdr->launcher.mqout, shm_mq_minimum_size);
361361

362362
/*
363-
* important that sender on mqout should be set earlier than
364-
* receiver on mqin
365-
*/
366-
shm_mq_set_sender(mqout, hdr->launcher.proc);
363+
* set sender, create handle and wake up launcher and wait until
364+
* it's connected
365+
* */
367366
shm_mq_set_receiver(mqout, MyProc);
368367
shm_mq_set_sender(mqin, MyProc);
369-
shm_mq_set_receiver(mqin, hdr->launcher.proc);
370-
371368
mqh = shm_mq_attach(mqin, NULL, NULL);
369+
SetLatch(&hdr->launcher.proc->procLatch);
370+
shm_mq_wait_for_attach(mqh);
371+
372372
resmq = shm_mq_sendv(mqh,
373373
&((shm_mq_iovec) {(char *) &MyDatabaseId, sizeof(MyDatabaseId)}), 1, false);
374374
if (resmq != SHM_MQ_SUCCESS)
@@ -405,12 +405,17 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
405405
mqin = shm_mq_create(wd->mqin, jsonbd_total_queue_size);
406406
mqout = shm_mq_create(wd->mqout, jsonbd_total_queue_size);
407407

408-
shm_mq_set_sender(mqin, MyProc);
409-
shm_mq_set_receiver(mqin, wd->proc);
410-
shm_mq_set_sender(mqout, wd->proc);
408+
/*
409+
* create handle and wake up worker and wait until
410+
* it's connected
411+
* */
411412
shm_mq_set_receiver(mqout, MyProc);
412-
413+
shm_mq_set_sender(mqin, MyProc);
413414
mqh = shm_mq_attach(mqin, NULL, NULL);
415+
SetLatch(&wd->latch);
416+
shm_mq_wait_for_attach(mqh);
417+
418+
/* send data */
414419
resmq = shm_mq_sendv(mqh, iov, iov_len, false);
415420
if (resmq != SHM_MQ_SUCCESS)
416421
detached = true;

jsonbd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ typedef struct jsonbd_shm_worker
2828
PGPROC *proc;
2929
volatile Oid dboid; /* database of the worker */
3030
LWLock *lock;
31+
Latch latch;
3132
} jsonbd_shm_worker;
3233

3334
/* Shared memory structures */

jsonbd_worker.c

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ init_worker(dsm_segment *seg)
160160

161161
/* Set launcher free */
162162
SetLatch(&hdr->launcher_latch);
163+
InitLatch(&worker_state->latch);
163164

164165
/* make this worker visible in backend cycle */
165166
hdr->workers_ready++;
@@ -534,7 +535,7 @@ jsonbd_launcher_main(Datum arg)
534535
shm_toc *toc;
535536
jsonbd_shm_hdr *hdr;
536537

537-
shm_mq_handle *mqh = NULL;
538+
shm_mq_handle *mqh;
538539
int worker_num = 1;
539540
int database_num = 0;
540541

@@ -572,22 +573,41 @@ jsonbd_launcher_main(Datum arg)
572573
if (shutdown_requested)
573574
break;
574575

575-
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
576-
0, PG_WAIT_EXTENSION);
576+
/* Wait to be signalled. */
577+
rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
578+
0, PG_WAIT_EXTENSION);
577579

578580
if (rc & WL_POSTMASTER_DEATH)
579581
break;
580582

581-
ResetLatch(&MyProc->procLatch);
583+
/* Reset the latch so we don't spin. */
584+
ResetLatch(MyLatch);
582585

583-
if (shm_mq_get_receiver(worker_state->mqin) != MyProc)
586+
if (shm_mq_get_sender(worker_state->mqin) == NULL)
587+
{
588+
CHECK_FOR_INTERRUPTS();
584589
continue;
590+
}
585591

586-
Assert(shm_mq_get_sender(worker_state->mqout) == MyProc);
592+
/*
593+
* set myself as receiver on mqin and sender on mqout,
594+
* and get data from backend
595+
* */
596+
if (!shm_mq_get_sender(worker_state->mqout))
597+
shm_mq_set_sender(worker_state->mqout, MyProc);
598+
599+
if (!shm_mq_get_receiver(worker_state->mqin))
600+
shm_mq_set_receiver(worker_state->mqin, MyProc);
587601

588602
mqh = shm_mq_attach(worker_state->mqin, NULL, NULL);
589603
resmq = shm_mq_receive(mqh, &nbytes, &data, false);
590604

605+
if (resmq == SHM_MQ_DETACHED)
606+
{
607+
shm_mq_detach(mqh);
608+
continue;
609+
}
610+
591611
if (resmq == SHM_MQ_SUCCESS)
592612
{
593613
int started = 0;
@@ -614,9 +634,6 @@ jsonbd_launcher_main(Datum arg)
614634

615635
shm_mq_detach(mqh);
616636

617-
/* we don't need start this cycle again after we send data */
618-
shm_mq_clean_receiver(worker_state->mqin);
619-
620637
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
621638
if (started)
622639
{
@@ -629,15 +646,11 @@ jsonbd_launcher_main(Datum arg)
629646
}
630647
else
631648
resmq = shm_mq_sendv(mqh, &((shm_mq_iovec) {"n", 2}), 1, false);
632-
633-
if (resmq != SHM_MQ_SUCCESS)
634-
elog(NOTICE, "jsonbd: backend detached early");
635-
636-
shm_mq_detach(mqh);
637-
638-
/* mark we need new handle */
639-
mqh = NULL;
640649
}
650+
if (resmq != SHM_MQ_SUCCESS)
651+
elog(NOTICE, "jsonbd: backend detached early");
652+
653+
/* shm_mq_detach(mqh); */
641654
}
642655

643656
elog(LOG, "jsonbd launcher has ended its work");
@@ -679,22 +692,37 @@ jsonbd_worker_main(Datum arg)
679692
if (shutdown_requested)
680693
break;
681694

682-
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
683-
0, PG_WAIT_EXTENSION);
695+
/* Wait to be signalled. */
696+
rc = WaitLatch(&worker_state->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
697+
0, PG_WAIT_EXTENSION);
684698

685699
if (rc & WL_POSTMASTER_DEATH)
686700
break;
687701

688-
ResetLatch(&MyProc->procLatch);
702+
/* Reset the latch so we don't spin. */
703+
ResetLatch(&worker_state->latch);
689704

690-
if (shm_mq_get_receiver(worker_state->mqin) != MyProc)
705+
if (shm_mq_get_sender(worker_state->mqin) == NULL)
706+
{
707+
CHECK_FOR_INTERRUPTS();
691708
continue;
709+
}
710+
711+
if (!shm_mq_get_sender(worker_state->mqout))
712+
shm_mq_set_sender(worker_state->mqout, MyProc);
692713

693-
Assert(shm_mq_get_sender(worker_state->mqout) == MyProc);
714+
if (!shm_mq_get_receiver(worker_state->mqin))
715+
shm_mq_set_receiver(worker_state->mqin, MyProc);
694716

695717
mqh = shm_mq_attach(worker_state->mqin, NULL, NULL);
696718
resmq = shm_mq_receive(mqh, &nbytes, &data, false);
697719

720+
if (resmq == SHM_MQ_DETACHED)
721+
{
722+
shm_mq_detach(mqh);
723+
continue;
724+
}
725+
698726
if (resmq == SHM_MQ_SUCCESS)
699727
{
700728
JsonbcCommand cmd;
@@ -740,7 +768,6 @@ jsonbd_worker_main(Datum arg)
740768
}
741769

742770
shm_mq_detach(mqh);
743-
shm_mq_clean_receiver(worker_state->mqin);
744771
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
745772

746773
if (iov != NULL)
@@ -751,7 +778,11 @@ jsonbd_worker_main(Datum arg)
751778
if (resmq != SHM_MQ_SUCCESS)
752779
elog(NOTICE, "jsonbd: backend detached early");
753780

754-
shm_mq_detach(mqh);
781+
/*
782+
* it is not safe call shm_mq_detach here, since mq can be
783+
* already cleared.
784+
* */
785+
/* shm_mq_detach(mqh); */
755786
MemoryContextReset(worker_context);
756787
}
757788
}

0 commit comments

Comments
 (0)