Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4d5cff3

Browse files
committed
Add buffers
1 parent c0160e2 commit 4d5cff3

File tree

1 file changed

+154
-63
lines changed

1 file changed

+154
-63
lines changed

jsonbc.c

Lines changed: 154 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,74 @@ PG_MODULE_MAGIC;
1616
PG_FUNCTION_INFO_V1(jsonbc_compression_handler);
1717
PG_FUNCTION_INFO_V1(int4_to_char);
1818

19-
static MemoryContext compression_mcxt = NULL;
20-
21-
static uint32
22-
get_key_id(Oid cmoptoid, char *key, int keylen)
19+
/* we use one buffer for whole transaction to avoid extra allocations */
20+
typedef struct
2321
{
24-
uint32 key_id = 0;
25-
char *s,
26-
*sql;
27-
bool isnull;
28-
Datum datum;
22+
char *buf; /* keys */
23+
int buflen;
24+
uint32 *idsbuf; /* key ids */
25+
int idslen;
26+
} CompressionThroughBuffers;
2927

30-
s = palloc(keylen + 1);
31-
memcpy(s, key, keylen);
32-
s[keylen] = '\0';
28+
static MemoryContext compression_mcxt = NULL;
29+
static CompressionThroughBuffers *compression_buffers = NULL;
3330

34-
sql = psprintf("SELECT id FROM jsonbc_dictionary WHERE cmopt = %d"
35-
" AND key = '%s'", cmoptoid, s);
31+
static void init_memory_context(void);
32+
static void memory_reset_callback(void *arg);
33+
static void get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys);
34+
static char *get_key_by_id(Oid cmoptoid, int32 key_id);
35+
static void encode_varbyte(uint32 val, unsigned char *ptr, int *len);
36+
static uint32 decode_varbyte(unsigned char *ptr);
37+
static char *packJsonbValue(JsonbValue *val, int header_size, int *len);
38+
39+
/* TODO: change to worker, add caches and other stuff */
40+
static void
41+
get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
42+
{
43+
int i;
3644

3745
if (SPI_connect() != SPI_OK_CONNECT)
3846
elog(ERROR, "SPI_connect failed");
3947

40-
if (SPI_exec(sql, 0) != SPI_OK_SELECT)
41-
elog(ERROR, "SPI_exec failed");
42-
43-
if (SPI_processed == 0)
48+
for (i = 0; i < nkeys; i++)
4449
{
45-
char *sql2 = psprintf("with t as (select (coalesce(max(id), 0) + 1) new_id from "
46-
"jsonbc_dictionary where cmopt = %d) insert into jsonbc_dictionary"
47-
" select %d, t.new_id, '%s' from t returning id", cmoptoid, cmoptoid, s);
50+
Datum datum;
51+
bool isnull;
52+
char *sql;
53+
54+
sql = psprintf("SELECT id FROM jsonbc_dictionary WHERE cmopt = %d"
55+
" AND key = '%s'", cmoptoid, buf);
4856

49-
if (SPI_exec(sql2, 0) != SPI_OK_INSERT_RETURNING)
57+
if (SPI_exec(sql, 0) != SPI_OK_SELECT)
5058
elog(ERROR, "SPI_exec failed");
51-
}
5259

53-
datum = SPI_getbinval(SPI_tuptable->vals[0],
54-
SPI_tuptable->tupdesc,
55-
1,
56-
&isnull);
57-
if (isnull)
58-
elog(ERROR, "id is NULL");
60+
if (SPI_processed == 0)
61+
{
62+
char *sql2 = psprintf("with t as (select (coalesce(max(id), 0) + 1) new_id from "
63+
"jsonbc_dictionary where cmopt = %d) insert into jsonbc_dictionary"
64+
" select %d, t.new_id, '%s' from t returning id", cmoptoid, cmoptoid, buf);
5965

60-
key_id = DatumGetInt32(datum);
66+
if (SPI_exec(sql2, 0) != SPI_OK_INSERT_RETURNING)
67+
elog(ERROR, "SPI_exec failed");
68+
}
6169

62-
pfree(s);
63-
pfree(sql);
64-
SPI_finish();
70+
datum = SPI_getbinval(SPI_tuptable->vals[0],
71+
SPI_tuptable->tupdesc,
72+
1,
73+
&isnull);
74+
if (isnull)
75+
elog(ERROR, "id is NULL");
6576

66-
return key_id;
77+
idsbuf[i] = DatumGetInt32(datum);
78+
79+
/* move to next key */
80+
while (*buf != '\0')
81+
buf++;
82+
83+
buf++;
84+
pfree(sql);
85+
}
86+
SPI_finish();
6787
}
6888

6989
static char *
@@ -158,6 +178,41 @@ decode_varbyte(unsigned char *ptr)
158178
return val;
159179
}
160180

181+
static void
182+
init_memory_context(void)
183+
{
184+
MemoryContext old_mcxt;
185+
MemoryContextCallback *cb;
186+
187+
if (compression_mcxt)
188+
return;
189+
190+
compression_mcxt = AllocSetContextCreate(TopTransactionContext,
191+
"jsonbc compression context",
192+
ALLOCSET_DEFAULT_SIZES);
193+
cb = MemoryContextAlloc(TopTransactionContext,
194+
sizeof(MemoryContextCallback));
195+
cb->func = memory_reset_callback;
196+
cb->arg = NULL;
197+
198+
MemoryContextRegisterResetCallback(TopTransactionContext, cb);
199+
200+
old_mcxt = MemoryContextSwitchTo(compression_mcxt);
201+
compression_buffers = palloc(sizeof(CompressionThroughBuffers));
202+
compression_buffers->buflen = 1024;
203+
compression_buffers->idslen = 256;
204+
compression_buffers->buf = palloc(compression_buffers->buflen);
205+
compression_buffers->idsbuf =
206+
(uint32 *) palloc(compression_buffers->idslen * sizeof(uint32));
207+
MemoryContextSwitchTo(old_mcxt);
208+
}
209+
210+
static void
211+
memory_reset_callback(void *arg)
212+
{
213+
compression_mcxt = NULL;
214+
}
215+
161216
/*
162217
* Given a JsonbValue, convert to Jsonb but with different header part.
163218
* The result is palloc'd.
@@ -198,42 +253,84 @@ jsonbc_compress(AttributeCompression *ac, const struct varlena *data)
198253
{
199254
int size;
200255
JsonbIteratorToken r;
201-
JsonbValue v;
256+
JsonbValue jv;
202257
JsonbIterator *it;
203258
JsonbValue *jbv = NULL;
204259
JsonbParseState *state = NULL;
205260
struct varlena *res;
206261

262+
init_memory_context();
263+
207264
it = JsonbIteratorInit(&((Jsonb *) data)->root);
208-
while ((r = JsonbIteratorNext(&it, &v, false)) != 0)
265+
while ((r = JsonbIteratorNext(&it, &jv, false)) != 0)
209266
{
210-
switch (r)
267+
/* we assume that jsonb has already been sorted and uniquefied */
268+
jbv = pushJsonbValue(&state, r, r < WJB_BEGIN_ARRAY ? &jv : NULL);
269+
270+
if (r == WJB_END_OBJECT && jbv->type == jbvObject)
211271
{
212-
case WJB_BEGIN_OBJECT:
213-
break;
214-
case WJB_KEY:
272+
int i,
273+
len,
274+
nkeys = jbv->val.object.nPairs,
275+
offset = 0;
276+
277+
/* maximum length of encoded uint32 is 5 */
278+
char *keyptr = MemoryContextAlloc(compression_mcxt, nkeys * 5),
279+
*buf;
280+
uint32 *idsbuf;
281+
282+
/* increase the size of buffer for key ids if we need to */
283+
if (nkeys > compression_buffers->idslen)
215284
{
216-
int len;
217-
int32 key_id;
218-
unsigned char *ptr = palloc0(6);
285+
compression_buffers->idsbuf =
286+
(uint32 *) repalloc(compression_buffers->idsbuf, nkeys * sizeof(uint32));
287+
compression_buffers->idslen = nkeys;
288+
}
219289

220-
Assert(v.type == jbvString);
221-
key_id = get_key_id(ac->cmoptoid, v.val.string.val, v.val.string.len);
290+
/* calculate length of keys */
291+
len = 0;
292+
for (i = 0; i < nkeys; i++)
293+
{
294+
JsonbValue *v = &jbv->val.object.pairs[i].key;
295+
len += v->val.string.len + 1 /* \0 */;
296+
}
297+
298+
/* increase the buffer if we need to */
299+
if (len > compression_buffers->buflen)
300+
{
301+
compression_buffers->buf =
302+
(char *) repalloc(compression_buffers->buf, len);
303+
compression_buffers->buflen = len;
304+
}
222305

223-
encode_varbyte(key_id, ptr, &len);
224-
Assert(len <= 5);
306+
/* copy all keys to buffer */
307+
buf = compression_buffers->buf;
308+
idsbuf = compression_buffers->idsbuf;
225309

226-
v.type = jbvString;
227-
v.val.string.val = (char *) ptr;
228-
v.val.string.len = len;
229-
break;
310+
for (i = 0; i < nkeys; i++)
311+
{
312+
JsonbValue *v = &jbv->val.object.pairs[i].key;
313+
memcpy(buf + offset, v->val.string.val, v->val.string.len);
314+
offset += v->val.string.len;
315+
buf[offset++] = '\0';
316+
}
317+
318+
Assert(offset == len);
319+
320+
/* retrieve or generate ids */
321+
get_key_ids(ac->cmoptoid, buf, idsbuf, nkeys);
322+
323+
/* replace the old keys with encoded ids */
324+
for (i = 0; i < nkeys; i++)
325+
{
326+
JsonbValue *v = &jbv->val.object.pairs[i].key;
327+
328+
encode_varbyte(idsbuf[i], (unsigned char *) keyptr, &len);
329+
v->val.string.val = keyptr;
330+
v->val.string.len = len;
331+
keyptr += len;
230332
}
231-
case WJB_END_OBJECT:
232-
break;
233-
default:
234-
break;
235333
}
236-
jbv = pushJsonbValue(&state, r, r < WJB_BEGIN_ARRAY ? &v : NULL);
237334
}
238335

239336
/* don't compress scalar values */
@@ -263,7 +360,7 @@ jsonbc_decompress(AttributeCompression *ac, const struct varlena *data)
263360
JsonbParseState *state = NULL;
264361
struct varlena *res;
265362

266-
Assert(compression_mcxt != NULL);
363+
init_memory_context();
267364
Assert(VARATT_IS_CUSTOM_COMPRESSED(data));
268365

269366
jb = (Jsonb *) ((char *) data + VARHDRSZ_CUSTOM_COMPRESSED - offsetof(Jsonb, root));
@@ -286,7 +383,6 @@ jsonbc_decompress(AttributeCompression *ac, const struct varlena *data)
286383
}
287384

288385
res = (struct varlena *) JsonbValueToJsonb(jbv);
289-
MemoryContextReset(compression_mcxt);
290386
return res;
291387
}
292388

@@ -306,10 +402,5 @@ jsonbc_compression_handler(PG_FUNCTION_ARGS)
306402
cmr->compress = jsonbc_compress;
307403
cmr->decompress = jsonbc_decompress;
308404

309-
if (compression_mcxt == NULL)
310-
compression_mcxt = AllocSetContextCreate(TopTransactionContext,
311-
"jsonbc compression context",
312-
ALLOCSET_DEFAULT_SIZES);
313-
314405
PG_RETURN_POINTER(cmr);
315406
}

0 commit comments

Comments
 (0)