5
5
#include "postmaster/bgworker.h"
6
6
#include "storage/s_lock.h"
7
7
#include "storage/spin.h"
8
+ #include "storage/proc.h"
8
9
#include "storage/pg_sema.h"
9
10
#include "storage/shmem.h"
10
11
#include "datatype/timestamp.h"
16
17
bool MtmIsLogicalReceiver ;
17
18
int MtmMaxWorkers ;
18
19
20
+ static BgwPool * pool ;
21
+
22
+ static void BgwShutdownWorker (int sig )
23
+ {
24
+ BgwPoolStop (pool );
25
+ }
26
+
19
27
static void BgwPoolMainLoop (BgwPool * pool )
20
28
{
21
29
int size ;
22
30
void * work ;
23
31
static PortalData fakePortal ;
32
+ sigset_t sset ;
24
33
25
34
MtmIsLogicalReceiver = true;
26
35
36
+ signal (SIGINT , BgwShutdownWorker );
37
+ signal (SIGQUIT , BgwShutdownWorker );
38
+ signal (SIGTERM , BgwShutdownWorker );
39
+
40
+ sigfillset (& sset );
41
+ sigprocmask (SIG_UNBLOCK , & sset , NULL );
42
+
27
43
BackgroundWorkerUnblockSignals ();
28
44
BackgroundWorkerInitializeConnection (pool -> dbname , pool -> dbuser );
29
45
ActivePortal = & fakePortal ;
30
46
ActivePortal -> status = PORTAL_ACTIVE ;
31
47
ActivePortal -> sourceText = "" ;
32
48
33
- while (true) {
49
+ while (true) {
34
50
PGSemaphoreLock (& pool -> available );
35
51
SpinLockAcquire (& pool -> lock );
52
+ if (pool -> shutdown ) {
53
+ break ;
54
+ }
36
55
size = * (int * )& pool -> queue [pool -> head ];
37
56
Assert (size < pool -> size );
38
57
work = malloc (size );
@@ -64,6 +83,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
64
83
pool -> lastPeakTime = 0 ;
65
84
SpinLockRelease (& pool -> lock );
66
85
}
86
+ SpinLockRelease (& pool -> lock );
67
87
}
68
88
69
89
void BgwPoolInit (BgwPool * pool , BgwPoolExecutor executor , char const * dbname , char const * dbuser , size_t queueSize , size_t nWorkers )
@@ -75,6 +95,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
75
95
PGSemaphoreReset (& pool -> available );
76
96
PGSemaphoreReset (& pool -> overflow );
77
97
SpinLockInit (& pool -> lock );
98
+ pool -> shutdown = false;
78
99
pool -> producerBlocked = false;
79
100
pool -> head = 0 ;
80
101
pool -> tail = 0 ;
@@ -167,7 +188,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
167
188
}
168
189
169
190
SpinLockAcquire (& pool -> lock );
170
- while (true ) {
191
+ while (! pool -> shutdown ) {
171
192
if ((pool -> head <= pool -> tail && pool -> size - pool -> tail < size + 4 && pool -> head < size )
172
193
|| (pool -> head > pool -> tail && pool -> head - pool -> tail < size + 4 ))
173
194
{
@@ -204,3 +225,11 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
204
225
SpinLockRelease (& pool -> lock );
205
226
}
206
227
228
+ void BgwPoolStop (BgwPool * pool )
229
+ {
230
+ SpinLockAcquire (& pool -> lock );
231
+ pool -> shutdown = true;
232
+ SpinLockRelease (& pool -> lock );
233
+ PGSemaphoreUnlock (& pool -> available );
234
+ PGSemaphoreUnlock (& pool -> overflow );
235
+ }
0 commit comments