@@ -46,9 +46,9 @@ Oid jsonbd_keys_indoid = InvalidOid;
46
46
Oid jsonbd_id_indoid = InvalidOid ;
47
47
48
48
void jsonbd_worker_main (Datum arg );
49
- void jsonbd_worker_launcher ( void );
49
+ void jsonbd_launcher_main ( Datum arg );
50
50
static Oid jsonbd_get_dictionary_relid (void );
51
- static bool jsonbd_register_worker (int worker_num , Oid dboid );
51
+ static bool jsonbd_register_worker (int , Oid , volatile Latch * );
52
52
53
53
#define JSONBD_DICTIONARY_REL "jsonbd_dictionary"
54
54
@@ -121,16 +121,25 @@ get_cached_compression_options(Oid cmoptoid)
121
121
}
122
122
123
123
static void
124
- init_local_variables ( int worker_num , Oid dboid )
124
+ init_worker ( dsm_segment * seg )
125
125
{
126
126
HASHCTL hash_ctl ;
127
+ jsonbd_worker_args * worker_args ;
127
128
128
129
shm_toc * toc = shm_toc_attach (JSONBD_SHM_MQ_MAGIC , workers_data );
129
130
jsonbd_shm_hdr * hdr = shm_toc_lookup (toc , 0 , false);
131
+
132
+ worker_args = (jsonbd_worker_args * ) dsm_segment_address (seg );
133
+
134
+ /* increase global count of started workers */
130
135
hdr -> workers_ready ++ ;
131
136
132
- worker_state = shm_toc_lookup (toc , worker_num , false);
137
+ /* Connect to our database */
138
+ BackgroundWorkerInitializeConnectionByOid (worker_args -> dboid , InvalidOid );
139
+
140
+ worker_state = shm_toc_lookup (toc , worker_args -> worker_num , false);
133
141
worker_state -> proc = MyProc ;
142
+ worker_state -> dboid = worker_args -> dboid ;
134
143
135
144
/* input mq */
136
145
shm_mq_set_receiver (worker_state -> mqin , MyProc );
@@ -165,7 +174,13 @@ init_local_variables(int worker_num, Oid dboid)
165
174
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT );
166
175
167
176
elog (LOG , "jsonbd dictionary worker %d started with pid: %d" ,
168
- worker_num , MyProcPid );
177
+ worker_args -> worker_num , MyProcPid );
178
+
179
+ /* We don't need this segment anymore */
180
+ dsm_detach (seg );
181
+
182
+ /* Set launcher free */
183
+ SetLatch (& hdr -> launcher_latch );
169
184
}
170
185
171
186
static void
@@ -519,7 +534,7 @@ jsonbd_cmd_get_keys(int nkeys, Oid cmoptoid, uint32 *ids)
519
534
}
520
535
521
536
void
522
- jsonbd_worker_launcher ( void )
537
+ jsonbd_launcher_main ( Datum arg )
523
538
{
524
539
shm_toc * toc ;
525
540
jsonbd_shm_hdr * hdr ;
@@ -534,15 +549,19 @@ jsonbd_worker_launcher(void)
534
549
/* We're now ready to receive signals */
535
550
BackgroundWorkerUnblockSignals ();
536
551
552
+ /* Init this launcher as backend so workers can notify it */
553
+ InitPostgres (NULL , InvalidOid , NULL , InvalidOid , NULL );
554
+
537
555
/* Create resource owner */
538
- CurrentResourceOwner = ResourceOwnerCreate (NULL , "jsonbd_worker_launcher " );
556
+ CurrentResourceOwner = ResourceOwnerCreate (NULL , "jsonbd_launcher_main " );
539
557
540
558
/* Init launcher state */
541
559
Assert (workers_data != NULL );
542
560
toc = shm_toc_attach (JSONBD_SHM_MQ_MAGIC , workers_data );
543
561
hdr = shm_toc_lookup (toc , 0 , false);
544
562
worker_state = & hdr -> launcher ;
545
563
564
+ InitLatch (& hdr -> launcher_latch );
546
565
shm_mq_set_receiver (worker_state -> mqin , MyProc );
547
566
shm_mq_set_sender (worker_state -> mqout , MyProc );
548
567
@@ -575,12 +594,18 @@ jsonbd_worker_launcher(void)
575
594
dboid = * ((Oid * ) data );
576
595
577
596
/* start workers for specified database */
578
- for (i = 0 ; i < jsonbd_nworkers ; i ++ )
597
+ for (i = 0 ; i < jsonbd_nworkers ; i ++ , worker_num ++ )
579
598
{
580
599
bool res ;
581
- res = jsonbd_register_worker (worker_num ++ , dboid );
600
+ jsonbd_shm_worker * wd ;
601
+
602
+ res = jsonbd_register_worker (worker_num , dboid , & hdr -> launcher_latch );
582
603
if (res )
583
604
started ++ ;
605
+
606
+ /* point worker semaphore to database semaphore */
607
+ wd = shm_toc_lookup (toc , worker_num , false);
608
+ wd -> dbsem = & hdr -> workers_sem [database_num ];
584
609
}
585
610
}
586
611
@@ -596,6 +621,8 @@ jsonbd_worker_launcher(void)
596
621
597
622
/* we report ok if only one worker has started */
598
623
resmq = shm_mq_sendv (mqh , & ((shm_mq_iovec ) {"y" , 2 }), 1 , false);
624
+
625
+ database_num += 1 ;
599
626
}
600
627
else
601
628
resmq = shm_mq_sendv (mqh , & ((shm_mq_iovec ) {"n" , 2 }), 1 , false);
@@ -604,7 +631,6 @@ jsonbd_worker_launcher(void)
604
631
elog (NOTICE , "jsonbd: backend detached early" );
605
632
606
633
shm_mq_detach (mqh );
607
- MemoryContextReset (worker_context );
608
634
609
635
/* mark we need new handle */
610
636
mqh = NULL ;
@@ -629,12 +655,7 @@ jsonbd_worker_launcher(void)
629
655
void
630
656
jsonbd_worker_main (Datum arg )
631
657
{
632
- Oid dboid ;
633
- int worker_num ;
634
-
635
658
dsm_segment * seg ;
636
- jsonbd_worker_args * worker_args ;
637
- PGPROC * starter ;
638
659
shm_mq_handle * mqh = NULL ;
639
660
640
661
/* Establish signal handlers before unblocking signals */
@@ -643,31 +664,12 @@ jsonbd_worker_main(Datum arg)
643
664
/* We're now ready to receive signals */
644
665
BackgroundWorkerUnblockSignals ();
645
666
646
- /* Initialize connection */
647
- seg = dsm_attach ((dsm_handle ) DatumGetInt32 (arg ));
648
- worker_args = (jsonbd_worker_args * ) dsm_segment_address (seg );
649
- worker_num = worker_args -> worker_num ;
650
- dboid = worker_args -> dboid ;
651
-
652
- /* Connect to our database */
653
- BackgroundWorkerInitializeConnectionByOid (dboid , InvalidOid );
654
-
655
667
/* Create resource owner */
656
668
CurrentResourceOwner = ResourceOwnerCreate (NULL , "jsonbd_worker" );
657
- init_local_variables (worker_num , dboid );
658
669
659
- /* We don't need this segment anymore */
660
- dsm_detach (seg );
661
-
662
- starter = BackendPidGetProc (MyBgworkerEntry -> bgw_notify_pid );
663
- if (starter == NULL )
664
- {
665
- elog (LOG , "launcher has exited prematurely" );
666
- goto end ;
667
- }
668
-
669
- /* Set launcher free */
670
- SetLatch (& starter -> procLatch );
670
+ /* Initialize connection and local variables */
671
+ seg = dsm_attach ((dsm_handle ) DatumGetInt32 (arg ));
672
+ init_worker (seg );
671
673
672
674
MemoryContextSwitchTo (worker_context );
673
675
@@ -758,13 +760,12 @@ jsonbd_worker_main(Datum arg)
758
760
ResetLatch (& MyProc -> procLatch );
759
761
}
760
762
761
- end :
762
763
elog (LOG , "jsonbd dictionary worker has ended its work" );
763
764
proc_exit (0 );
764
765
}
765
766
766
767
static bool
767
- jsonbd_register_worker (int worker_num , Oid dboid )
768
+ jsonbd_register_worker (int worker_num , Oid dboid , volatile Latch * launcher_latch )
768
769
{
769
770
pid_t pid ;
770
771
dsm_segment * seg ;
@@ -811,13 +812,12 @@ jsonbd_register_worker(int worker_num, Oid dboid)
811
812
812
813
/* Wait to be signalled. */
813
814
#if PG_VERSION_NUM >= 100000
814
- WaitLatch (MyLatch , WL_LATCH_SET , 0 , PG_WAIT_EXTENSION );
815
+ WaitLatch (launcher_latch , WL_LATCH_SET , 0 , PG_WAIT_EXTENSION );
815
816
#else
816
- WaitLatch (MyLatch , WL_LATCH_SET , 0 );
817
+ WaitLatch (launcher_latch , WL_LATCH_SET , 0 );
817
818
#endif
818
819
819
- /* Reset the latch so we don't spin. */
820
- ResetLatch (MyLatch );
820
+ ResetLatch (launcher_latch );
821
821
822
822
/* Remove the segment */
823
823
dsm_detach (seg );
@@ -839,7 +839,7 @@ jsonbd_register_launcher(void)
839
839
worker .bgw_restart_time = 0 ;
840
840
worker .bgw_notify_pid = 0 ;
841
841
memcpy (worker .bgw_library_name , "jsonbd" , BGW_MAXLEN );
842
- memcpy (worker .bgw_function_name , CppAsString (jsonbd_worker_launcher ), BGW_MAXLEN );
842
+ memcpy (worker .bgw_function_name , CppAsString (jsonbd_launcher_main ), BGW_MAXLEN );
843
843
snprintf (worker .bgw_name , BGW_MAXLEN , "jsonbd launcher" );
844
844
worker .bgw_main_arg = (Datum ) Int32GetDatum (0 );
845
845
RegisterBackgroundWorker (& worker );
0 commit comments