Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d9c2e1a

Browse files
committed
Start work on bulk inserting
1 parent 6865f7c commit d9c2e1a

File tree

2 files changed

+111
-10
lines changed

2 files changed

+111
-10
lines changed

jsonbc.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ typedef struct jsonbc_shm_worker
3030
PGPROC *proc;
3131
} jsonbc_shm_worker;
3232

33+
typedef struct jsonbc_cached_cmopt
34+
{
35+
HTAB *keys;
36+
} cmopt_cached;
37+
38+
typedef struct jsonbc_cached_key
39+
{
40+
HTAB *
41+
}
42+
3343
extern void _PG_init(void);
3444
extern void jsonbc_register_worker(int n);
3545

jsonbc_worker.c

Lines changed: 101 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "access/htup_details.h"
1212
#include "access/sysattr.h"
1313
#include "catalog/pg_extension.h"
14+
#include "catalog/pg_compression_opt.h"
1415
#include "catalog/indexing.h"
1516
#include "catalog/namespace.h"
1617
#include "commands/extension.h"
@@ -44,16 +45,20 @@ static Oid jsonbc_get_dictionary_relid(void);
4445

4546
#define JSONBC_DICTIONARY_REL "jsonbc_dictionary"
4647

47-
static const char *sql_dictionary = "CREATE TABLE public." JSONBC_DICTIONARY_REL
48-
" (cmopt OID NOT NULL,"
49-
" id INT4 NOT NULL,"
50-
" key TEXT NOT NULL);"
51-
" CREATE UNIQUE INDEX jsonbc_dict_on_id ON " JSONBC_DICTIONARY_REL "(cmopt, id);"
52-
" CREATE UNIQUE INDEX jsonbc_dict_on_key ON " JSONBC_DICTIONARY_REL " (cmopt, key);";
53-
54-
#define JSONBC_DICTIONARY_REL_ATT_CMOPT 1
55-
#define JSONBC_DICTIONARY_REL_ATT_ID 2
56-
#define JSONBC_DICTIONARY_REL_ATT_KEY 3
48+
static const char *sql_dictionary = \
49+
"CREATE TABLE public." JSONBC_DICTIONARY_REL
50+
" (cmopt OID NOT NULL,"
51+
" id INT4 NOT NULL,"
52+
" key TEXT NOT NULL);"
53+
"CREATE UNIQUE INDEX jsonbc_dict_on_id ON " JSONBC_DICTIONARY_REL "(cmopt, id);"
54+
"CREATE UNIQUE INDEX jsonbc_dict_on_key ON " JSONBC_DICTIONARY_REL " (cmopt, key);";
55+
56+
enum {
57+
JSONBC_DICTIONARY_REL_ATT_CMOPT = 1,
58+
JSONBC_DICTIONARY_REL_ATT_ID,
59+
JSONBC_DICTIONARY_REL_ATT_KEY,
60+
JSONBC_DICTIONARY_REL_ATT_COUNT
61+
};
5762

5863
/*
5964
* Handle SIGTERM in BGW's process.
@@ -207,6 +212,92 @@ jsonbc_get_keys_slow(Oid cmoptoid, uint32 *ids, int nkeys)
207212
return keys;
208213
}
209214

215+
static void
216+
jsonbc_bulk_insert_keys(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
217+
{
218+
Oid relid = jsonbc_get_dictionary_relid();
219+
Relation rel;
220+
221+
int i,
222+
counter,
223+
hi_options;
224+
HeapTuple *buffered;
225+
Datum values[JSONBC_DICTIONARY_REL_ATT_COUNT];
226+
Datum nulls[JSONBC_DICTIONARY_REL_ATT_COUNT];
227+
BulkInsertState bistate;
228+
TupleTableSlot *myslot;
229+
ResultRelInfo *resultRelInfo;
230+
ExprContext *econtext;
231+
EState *estate = CreateExecutorState();
232+
MemoryContext oldcontext = CurrentMemoryContext;
233+
234+
start_xact_command();
235+
rel = heap_open(relid, RowExclusiveLock);
236+
bistate = GetBulkInsertState();
237+
myslot = MakeTupleTableSlot();
238+
ExecSetSlotDescriptor(myslot, RelationGetDescr(rel));
239+
240+
/* we need resultRelInfo to insert to indexes */
241+
resultRelInfo = makeNode(ResultRelInfo);
242+
InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
243+
244+
ExecOpenIndices(resultRelInfo, false);
245+
MemSet(values, 0, sizeof(values));
246+
MemSet(nulls, 0, sizeof(nulls));
247+
248+
/* only one process can insert to dictionary at same time */
249+
LockDatabaseObject(relid, cmoptoid, 0, ExclusiveLock);
250+
251+
buffered = palloc(sizeof(HeapTuple) * nkeys);
252+
for (i = 0; i < nkeys; i++)
253+
{
254+
HeapTuple tuple;
255+
256+
values[JSONBC_DICTIONARY_REL_ATT_CMOPT - 1] = ObjectIdGetDatum(cmoptoid);
257+
values[JSONBC_DICTIONARY_REL_ATT_ID - 1] = Int32GetDatum(counter++);
258+
values[JSONBC_DICTIONARY_REL_ATT_KEY - 1] = CStringGetTextDatum(buf);
259+
260+
tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
261+
tuple->t_tableOid = relid;
262+
263+
buffered[i] = tuple;
264+
265+
/* move to next key */
266+
while (*buf != '\0')
267+
buf++;
268+
269+
buf++;
270+
}
271+
272+
if (!XLogIsNeeded())
273+
hi_options |= HEAP_INSERT_SKIP_WAL;
274+
275+
heap_multi_insert(rel,
276+
buffered,
277+
nkeys,
278+
mycid,
279+
hi_options,
280+
bistate);
281+
282+
/* update indexes */
283+
for (i = 0; i < nkeys; i++)
284+
{
285+
List *recheckIndexes;
286+
287+
ExecStoreTuple(buffered[i], myslot, InvalidBuffer, false);
288+
recheckIndexes =
289+
ExecInsertIndexTuples(myslot, &(buffered[i]->t_self),
290+
estate, false, NULL, NIL);
291+
list_free(recheckIndexes);
292+
}
293+
294+
UnlockDatabaseObject(relid, cmoptoid, 0, ExclusiveLock);
295+
296+
FreeBulkInsertState(bistate);
297+
heap_close(rel, RowExclusiveLock);
298+
finish_xact_command();
299+
}
300+
210301
/*
211302
* Get key IDs using relation
212303
* TODO: change to direct access

0 commit comments

Comments
 (0)