33
33
#include "utils/resowner.h"
34
34
#include "utils/snapmgr.h"
35
35
#include "utils/tqual.h"
36
+ #include "utils/syscache.h"
36
37
37
38
static bool xact_started = false;
38
39
static bool shutdown_requested = false;
@@ -48,24 +49,17 @@ Oid jsonbd_id_indoid = InvalidOid;
48
49
void jsonbd_worker_main (Datum arg );
49
50
void jsonbd_launcher_main (Datum arg );
50
51
static bool jsonbd_register_worker (int , Oid , int );
52
+ static char * jsonbd_get_dictionary_name (Oid relid );
51
53
52
54
#define JSONBD_DICTIONARY_REL "jsonbd_dictionary"
53
55
54
- static const char * sql_dictionary = \
55
- "CREATE TABLE public." JSONBD_DICTIONARY_REL
56
- " (cmopt OID NOT NULL,"
57
- " id INT4 NOT NULL,"
58
- " key TEXT NOT NULL);"
59
- "CREATE UNIQUE INDEX jsonbd_dict_on_id ON " JSONBD_DICTIONARY_REL "(cmopt, id);"
60
- "CREATE UNIQUE INDEX jsonbd_dict_on_key ON " JSONBD_DICTIONARY_REL " (cmopt, key);" ;
61
-
62
56
static const char * sql_insert = \
63
- "WITH t AS (SELECT (COALESCE(MAX(id), 0) + 1) new_id FROM "
64
- JSONBD_DICTIONARY_REL " WHERE cmopt = %d) INSERT INTO " JSONBD_DICTIONARY_REL
57
+ "WITH t AS (SELECT (COALESCE(MAX(id), 0) + 1) new_id FROM %s "
58
+ " WHERE acoid = %d) INSERT INTO %s"
65
59
" SELECT %d, t.new_id, '%s' FROM t RETURNING id" ;
66
60
67
61
enum {
68
- JSONBD_DICTIONARY_REL_ATT_CMOPT = 1 ,
62
+ JSONBD_DICTIONARY_REL_ATT_ACOID = 1 ,
69
63
JSONBD_DICTIONARY_REL_ATT_ID ,
70
64
JSONBD_DICTIONARY_REL_ATT_KEY ,
71
65
JSONBD_DICTIONARY_REL_ATT_COUNT
@@ -353,8 +347,17 @@ jsonbd_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
353
347
Relation indrel ;
354
348
int i ;
355
349
Oid relid = jsonbd_get_dictionary_relid ();
356
- bool spi_on = false;
350
+ bool spi_on = false,
351
+ failed = false;
357
352
jsonbd_cached_cmopt * cmcache ;
353
+ static char * relname = NULL ;
354
+
355
+ if (relname == NULL )
356
+ {
357
+ start_xact_command ();
358
+ relname = jsonbd_get_dictionary_name (relid );
359
+ finish_xact_command ();
360
+ }
358
361
359
362
cmcache = get_cached_compression_options (cmoptoid );
360
363
@@ -418,20 +421,27 @@ jsonbd_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
418
421
/* still need to add */
419
422
Datum datum ;
420
423
bool isnull ;
421
- char * sql2 = psprintf (sql_insert , cmoptoid , cmoptoid , buf );
424
+ char * sql2 = psprintf (sql_insert , relname , cmoptoid ,
425
+ relname , cmoptoid , buf );
422
426
423
427
/* TODO: maybe use bulk inserts instead of SPI */
424
428
if (!spi_on )
425
429
{
426
430
/* lazy SPI initialization */
427
431
if (SPI_connect () != SPI_OK_CONNECT )
428
- elog (ERROR , "SPI_connect failed" );
432
+ {
433
+ failed = true;
434
+ goto finish ;
435
+ }
429
436
430
437
spi_on = true;
431
438
}
432
439
433
440
if (SPI_exec (sql2 , 0 ) != SPI_OK_INSERT_RETURNING )
434
- elog (ERROR , "SPI_exec failed" );
441
+ {
442
+ failed = true;
443
+ goto finish ;
444
+ }
435
445
436
446
pfree (sql2 );
437
447
@@ -454,6 +464,7 @@ jsonbd_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
454
464
while (* buf ++ != '\0' );
455
465
}
456
466
467
+ finish :
457
468
if (spi_on )
458
469
SPI_finish ();
459
470
@@ -463,6 +474,9 @@ jsonbd_get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
463
474
relation_close (rel , AccessShareLock );
464
475
finish_xact_command ();
465
476
}
477
+
478
+ if (failed )
479
+ elog (ERROR , "get key ids error" );
466
480
}
467
481
468
482
static char *
@@ -654,6 +668,9 @@ jsonbd_worker_main(Datum arg)
654
668
655
669
/* Initialize connection and local variables */
656
670
seg = dsm_attach ((dsm_handle ) DatumGetInt32 (arg ));
671
+ if (!seg )
672
+ goto finish ;
673
+
657
674
init_worker (seg );
658
675
659
676
MemoryContextSwitchTo (worker_context );
@@ -746,6 +763,7 @@ jsonbd_worker_main(Datum arg)
746
763
}
747
764
}
748
765
766
+ finish :
749
767
elog (LOG , "jsonbd dictionary worker has ended its work" );
750
768
proc_exit (0 );
751
769
}
@@ -832,35 +850,16 @@ jsonbd_register_launcher(void)
832
850
Oid
833
851
jsonbd_get_dictionary_relid (void )
834
852
{
835
- Oid relid ,
836
- nspoid ;
853
+ Oid relid ;
837
854
838
855
if (OidIsValid (jsonbd_dictionary_reloid ))
839
856
return jsonbd_dictionary_reloid ;
840
857
841
858
start_xact_command ();
842
859
843
- nspoid = get_namespace_oid ("public" , false);
844
- relid = get_relname_relid (JSONBD_DICTIONARY_REL , nspoid );
860
+ relid = get_relname_relid (JSONBD_DICTIONARY_REL , get_jsonbd_schema ());
845
861
if (relid == InvalidOid )
846
- {
847
- if (SPI_connect () != SPI_OK_CONNECT )
848
- elog (ERROR , "SPI_connect failed" );
849
-
850
- if (SPI_execute (sql_dictionary , false, 0 ) != SPI_OK_UTILITY )
851
- elog (ERROR , "could not create \"jsonbd\" dictionary" );
852
-
853
- SPI_finish ();
854
- CommandCounterIncrement ();
855
-
856
- finish_xact_command ();
857
- start_xact_command ();
858
-
859
- /* get just created table Oid */
860
- relid = get_relname_relid (JSONBD_DICTIONARY_REL , nspoid );
861
- jsonbd_id_indoid = InvalidOid ;
862
- jsonbd_keys_indoid = InvalidOid ;
863
- }
862
+ elog (ERROR , "jsonbd dictionary relation does not exist" );
864
863
865
864
/* fill index Oids too */
866
865
if (jsonbd_id_indoid == InvalidOid )
@@ -904,3 +903,33 @@ jsonbd_get_dictionary_relid(void)
904
903
jsonbd_dictionary_reloid = relid ;
905
904
return relid ;
906
905
}
906
+
907
+ static char *
908
+ jsonbd_get_dictionary_name (Oid relid )
909
+ {
910
+ HeapTuple tp ;
911
+ Form_pg_class reltup ;
912
+ char * relname ;
913
+ char * nspname ;
914
+ char * result ;
915
+ MemoryContext old_mcxt ;
916
+
917
+ tp = SearchSysCache1 (RELOID , ObjectIdGetDatum (relid ));
918
+ if (!HeapTupleIsValid (tp ))
919
+ elog (ERROR , "cache lookup failed for relation %u" , relid );
920
+ reltup = (Form_pg_class ) GETSTRUCT (tp );
921
+ relname = NameStr (reltup -> relname );
922
+
923
+ nspname = get_namespace_name (reltup -> relnamespace );
924
+ if (!nspname )
925
+ elog (ERROR , "cache lookup failed for namespace %u" ,
926
+ reltup -> relnamespace );
927
+
928
+ old_mcxt = MemoryContextSwitchTo (TopMemoryContext );
929
+ result = quote_qualified_identifier (nspname , relname );
930
+ MemoryContextSwitchTo (old_mcxt );
931
+
932
+ ReleaseSysCache (tp );
933
+
934
+ return result ;
935
+ }
0 commit comments