26
26
27
27
28
28
#define MSGLEN (INTALIGN(size) + payload + sizeof(int))
29
- #define MinSizeOfPoolState offsetof(PoolState, queue)
30
29
31
- int MtmQueueSize ;
32
30
bool MtmIsPoolWorker ;
33
31
bool MtmIsLogicalReceiver ;
34
32
int MtmMaxWorkers ;
35
33
36
- static PoolState * MtmPool ;
34
+ static BgwPool * MtmPool ;
35
+
36
+ /* DSM Queue shared between receiver and its workers */
37
+ static char * queue = NULL ;
37
38
38
39
void BgwPoolDynamicWorkerMainLoop (Datum arg );
39
40
41
+
42
+ void
43
+ BgwPoolInit (BgwPool * pool )
44
+ {
45
+ SpinLockInit (& pool -> lock );
46
+ pool -> nWorkers = 0 ;
47
+ pool -> shutdown = false;
48
+ pool -> producerBlocked = false;
49
+ pool -> head = 0 ;
50
+ pool -> tail = 0 ;
51
+ pool -> active = 0 ;
52
+ pool -> pending = 0 ;
53
+ pool -> size = 0 ;
54
+ pool -> lastDynamicWorkerStartTime = 0 ;
55
+ ConditionVariableInit (& pool -> syncpoint_cv );
56
+ ConditionVariableInit (& pool -> available_cv );
57
+ ConditionVariableInit (& pool -> overflow_cv );
58
+ pool -> bgwhandles = (BackgroundWorkerHandle * * ) ShmemAlloc (MtmMaxWorkers *
59
+ sizeof (BackgroundWorkerHandle * ));
60
+ }
61
+
62
+ /*
63
+ * Call at the start the multimaster WAL receiver.
64
+ */
65
+ void
66
+ BgwPoolStart (BgwPool * pool , char * poolName , Oid db_id , Oid user_id )
67
+ {
68
+ dsm_segment * seg ;
69
+ size_t size = INTALIGN (MtmTransSpillThreshold * 1024L * 2 );
70
+
71
+ /* ToDo: remember a segment creation failure (and NULL) case. */
72
+ seg = dsm_create (size , 0 );
73
+ Assert (seg != NULL );
74
+ dsm_pin_segment (seg );
75
+ dsm_pin_mapping (seg );
76
+ pool -> dsmhandler = dsm_segment_handle (seg );
77
+ queue = (char * ) dsm_segment_address (seg );
78
+ Assert (queue != NULL );
79
+
80
+ strncpy (pool -> poolName , poolName , MAX_NAME_LEN );
81
+ pool -> db_id = db_id ;
82
+ pool -> user_id = user_id ;
83
+ pool -> size = size ;
84
+ }
85
+
40
86
static void
41
87
BgwShutdownHandler (int sig )
42
88
{
@@ -57,25 +103,22 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
57
103
}
58
104
59
105
static void
60
- BgwPoolMainLoop (dsm_handle pool_handler )
106
+ BgwPoolMainLoop (BgwPool * poolDesc )
61
107
{
62
108
int size ;
63
109
void * work ;
64
110
int payload = INTALIGN (sizeof (MtmReceiverContext ));
65
111
MtmReceiverContext ctx ;
66
112
static PortalData fakePortal ;
67
- Oid db_id ;
68
- Oid user_id ;
69
113
dsm_segment * seg ;
70
- PoolState * pool ;
71
114
72
115
/* Connect to the queue */
73
- Assert (!dsm_find_mapping (pool_handler ));
74
- seg = dsm_attach (pool_handler );
75
- dsm_pin_mapping (seg );
76
- pool = dsm_segment_address ( seg ) ;
77
- MtmPool = pool ;
78
- mtm_log ( BgwPoolEvent , "[%d] Start background worker shutdown=%d" , MyProcPid , pool -> shutdown );
116
+ Assert (!dsm_find_mapping (poolDesc -> dsmhandler ));
117
+ seg = dsm_attach (poolDesc -> dsmhandler );
118
+ queue = dsm_segment_address (seg );
119
+ MtmPool = poolDesc ;
120
+ mtm_log ( BgwPoolEvent , "[%d] Start background worker shutdown=%d" ,
121
+ MyProcPid , poolDesc -> shutdown );
79
122
80
123
MtmIsPoolWorker = true;
81
124
@@ -94,9 +137,7 @@ BgwPoolMainLoop(dsm_handle pool_handler)
94
137
// from on_shem_exit_hook
95
138
96
139
BackgroundWorkerUnblockSignals ();
97
- memcpy (& db_id , MyBgworkerEntry -> bgw_extra , sizeof (Oid ));
98
- memcpy (& user_id , MyBgworkerEntry -> bgw_extra + sizeof (Oid ), sizeof (Oid ));
99
- BackgroundWorkerInitializeConnectionByOid (db_id , user_id , 0 );
140
+ BackgroundWorkerInitializeConnectionByOid (poolDesc -> db_id , poolDesc -> user_id , 0 );
100
141
ActivePortal = & fakePortal ;
101
142
ActivePortal -> status = PORTAL_ACTIVE ;
102
143
ActivePortal -> sourceText = "" ;
@@ -116,17 +157,17 @@ BgwPoolMainLoop(dsm_handle pool_handler)
116
157
}
117
158
118
159
// XXX: change to LWLock
119
- SpinLockAcquire (& pool -> lock );
160
+ SpinLockAcquire (& poolDesc -> lock );
120
161
121
162
/* Worker caught the shutdown signal - release locks and return. */
122
- if (pool -> shutdown )
163
+ if (poolDesc -> shutdown )
123
164
{
124
- SpinLockRelease (& pool -> lock );
165
+ SpinLockRelease (& poolDesc -> lock );
125
166
break ;
126
167
}
127
168
128
169
/* Empty queue */
129
- if (pool -> head == pool -> tail )
170
+ if (poolDesc -> head == poolDesc -> tail )
130
171
{
131
172
/*
132
173
* We need to prepare conditional variable before release of the
@@ -135,74 +176,70 @@ BgwPoolMainLoop(dsm_handle pool_handler)
135
176
* sleep preparation worker will go to a sleep and receiver will
136
177
* remain in opinion, that worker waked up and doing its work.
137
178
*/
138
- ConditionVariablePrepareToSleep (& pool -> available_cv );
139
- SpinLockRelease (& pool -> lock );
140
- /*
141
- * TODO: At this point receiver may have enough time to set shutdown
142
- * sign, call ConditionVariableBroadcast(), and return.
143
- * In this case worker never exit frim the sleep.
144
- */
145
- ConditionVariableSleep (& pool -> available_cv , PG_WAIT_EXTENSION );
179
+ ConditionVariablePrepareToSleep (& poolDesc -> available_cv );
180
+ SpinLockRelease (& poolDesc -> lock );
181
+
182
+ ConditionVariableSleep (& poolDesc -> available_cv , PG_WAIT_EXTENSION );
146
183
continue ;
147
184
}
148
185
149
186
/* Wait for end of the node joining operation */
150
- while (pool -> n_holders > 0 && !pool -> shutdown )
187
+ while (poolDesc -> n_holders > 0 && !poolDesc -> shutdown )
151
188
{
152
- SpinLockRelease (& pool -> lock );
189
+ SpinLockRelease (& poolDesc -> lock );
153
190
ConditionVariableSleep (& Mtm -> receiver_barrier_cv , PG_WAIT_EXTENSION );
154
- SpinLockAcquire (& pool -> lock );
191
+ SpinLockAcquire (& poolDesc -> lock );
155
192
}
156
193
157
- size = * (int * ) & pool -> queue [pool -> head ];
158
- Assert (size < pool -> size );
194
+ size = * (int * ) & queue [poolDesc -> head ];
195
+ Assert (size < poolDesc -> size );
159
196
work = palloc (size );
160
- pool -> pending -= 1 ;
161
- pool -> active += 1 ;
197
+ poolDesc -> pending -= 1 ;
198
+ poolDesc -> active += 1 ;
162
199
163
- if (pool -> head + MSGLEN > pool -> size )
200
+ if (poolDesc -> head + MSGLEN > poolDesc -> size )
164
201
{
165
- ctx = * (MtmReceiverContext * ) & pool -> queue ;
166
- memcpy (work , & pool -> queue [payload ], size );
167
- pool -> head = payload + INTALIGN (size );
202
+ ctx = * (MtmReceiverContext * ) & queue ;
203
+ memcpy (work , & queue [payload ], size );
204
+ poolDesc -> head = payload + INTALIGN (size );
168
205
}
169
206
else
170
207
{
171
- memcpy (& ctx , & pool -> queue [pool -> head + sizeof (int )], payload );
172
- memcpy (work , & pool -> queue [pool -> head + sizeof (int ) + payload ], size );
173
- pool -> head += MSGLEN ;
208
+ memcpy (& ctx , & queue [poolDesc -> head + sizeof (int )], payload );
209
+ memcpy (work , & queue [poolDesc -> head + sizeof (int ) + payload ], size );
210
+ poolDesc -> head += MSGLEN ;
174
211
}
175
212
176
213
/* wrap head */
177
- if (pool -> head == pool -> size )
178
- pool -> head = 0 ;
214
+ if (poolDesc -> head == poolDesc -> size )
215
+ poolDesc -> head = 0 ;
179
216
180
217
/*
181
218
* We should reset head and tail in order to accept messages bigger
182
219
* than half of buffer size.
183
220
*/
184
- if (pool -> head == pool -> tail )
221
+ if (poolDesc -> head == poolDesc -> tail )
185
222
{
186
- pool -> head = 0 ;
187
- pool -> tail = 0 ;
223
+ poolDesc -> head = 0 ;
224
+ poolDesc -> tail = 0 ;
188
225
}
189
226
190
- if (pool -> producerBlocked )
227
+ if (poolDesc -> producerBlocked )
191
228
{
192
- pool -> producerBlocked = false;
193
- ConditionVariableBroadcast (& pool -> overflow_cv );
229
+ poolDesc -> producerBlocked = false;
230
+ ConditionVariableBroadcast (& poolDesc -> overflow_cv );
194
231
}
195
232
196
- SpinLockRelease (& pool -> lock );
233
+ SpinLockRelease (& poolDesc -> lock );
197
234
198
235
MtmExecutor (work , size , & ctx );
199
236
pfree (work );
200
237
201
- SpinLockAcquire (& pool -> lock );
202
- pool -> active -= 1 ;
203
- SpinLockRelease (& pool -> lock );
238
+ SpinLockAcquire (& poolDesc -> lock );
239
+ poolDesc -> active -= 1 ;
240
+ SpinLockRelease (& poolDesc -> lock );
204
241
205
- ConditionVariableBroadcast (& pool -> syncpoint_cv );
242
+ ConditionVariableBroadcast (& poolDesc -> syncpoint_cv );
206
243
}
207
244
208
245
dsm_detach (seg );
@@ -211,56 +248,9 @@ BgwPoolMainLoop(dsm_handle pool_handler)
211
248
212
249
void BgwPoolDynamicWorkerMainLoop (Datum arg )
213
250
{
214
- BgwPoolMainLoop ((dsm_handle ) DatumGetUInt32 (arg ));
251
+ BgwPoolMainLoop ((BgwPool * ) DatumGetPointer (arg ));
215
252
}
216
253
217
- /*
218
- * Call at the start the multimaster WAL receiver.
219
- */
220
- void
221
- BgwPoolStart (BgwPool * pool , char * poolName , Oid db_id , Oid user_id )
222
- {
223
- dsm_segment * seg ;
224
- size_t size = INTALIGN (MtmQueueSize );
225
-
226
- /* ToDo: remember a segment creation failure (and NULL) case. */
227
- seg = dsm_create (MinSizeOfPoolState + size , 0 );
228
- Assert (seg != NULL );
229
- dsm_pin_segment (seg );
230
- dsm_pin_mapping (seg );
231
- pool -> pool_handler = dsm_segment_handle (seg );
232
- pool -> state = (PoolState * ) dsm_segment_address (seg );
233
- Assert (pool -> state != NULL );
234
-
235
- SpinLockInit (& pool -> state -> lock );
236
- ConditionVariableInit (& pool -> state -> available_cv );
237
- ConditionVariableInit (& pool -> state -> overflow_cv );
238
- ConditionVariableInit (& pool -> state -> syncpoint_cv );
239
-
240
- strncpy (pool -> poolName , poolName , MAX_NAME_LEN );
241
- pool -> db_id = db_id ;
242
- pool -> user_id = user_id ;
243
- pool -> nWorkers = 0 ;
244
- pool -> state -> shutdown = false;
245
- pool -> state -> producerBlocked = false;
246
- pool -> state -> head = 0 ;
247
- pool -> state -> tail = 0 ;
248
- pool -> state -> active = 0 ;
249
- pool -> state -> pending = 0 ;
250
- pool -> state -> size = size ;
251
- pool -> lastDynamicWorkerStartTime = 0 ;
252
- }
253
-
254
- size_t PoolStateGetQueueSize (PoolState * pool )
255
- {
256
- size_t used ;
257
- SpinLockAcquire (& pool -> lock );
258
- used = pool -> head <= pool -> tail ? pool -> tail - pool -> head : pool -> size - pool -> head + pool -> tail ;
259
- SpinLockRelease (& pool -> lock );
260
- return used ;
261
- }
262
-
263
-
264
254
static void BgwStartExtraWorker (BgwPool * pool )
265
255
{
266
256
BackgroundWorker worker ;
@@ -274,9 +264,7 @@ static void BgwStartExtraWorker(BgwPool* pool)
274
264
worker .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION ;
275
265
worker .bgw_start_time = BgWorkerStart_ConsistentState ;
276
266
worker .bgw_restart_time = BGW_NEVER_RESTART ;
277
- worker .bgw_main_arg = UInt32GetDatum (pool -> pool_handler );
278
- memcpy (worker .bgw_extra , & pool -> db_id , sizeof (Oid ));
279
- memcpy (worker .bgw_extra + sizeof (Oid ), & pool -> user_id , sizeof (Oid ));
267
+ worker .bgw_main_arg = PointerGetDatum (pool );
280
268
sprintf (worker .bgw_library_name , "multimaster" );
281
269
sprintf (worker .bgw_function_name , "BgwPoolDynamicWorkerMainLoop" );
282
270
snprintf (worker .bgw_name , BGW_MAXLEN , "%s-dynworker-%d" , pool -> poolName , (int ) pool -> nWorkers + 1 );
@@ -299,12 +287,13 @@ static void BgwStartExtraWorker(BgwPool* pool)
299
287
* After return from routine work and ctx buffers can be reused safely.
300
288
*/
301
289
void
302
- BgwPoolExecute (BgwPool * bgwpool , void * work , int size , MtmReceiverContext * ctx )
290
+ BgwPoolExecute (BgwPool * pool , void * work , int size , MtmReceiverContext * ctx )
303
291
{
304
- PoolState * pool = bgwpool -> state ;
305
- int payload = INTALIGN (sizeof (MtmReceiverContext ));
292
+ int payload = INTALIGN (sizeof (MtmReceiverContext ));
306
293
307
294
Assert (pool != NULL );
295
+ Assert (queue != NULL );
296
+
308
297
// XXX: align with spill size and assert that
309
298
if (MSGLEN > pool -> size )
310
299
{
@@ -333,27 +322,27 @@ BgwPoolExecute(BgwPool* bgwpool, void* work, int size, MtmReceiverContext *ctx)
333
322
{
334
323
pool -> pending += 1 ;
335
324
336
- if (pool -> active + pool -> pending > bgwpool -> nWorkers )
337
- BgwStartExtraWorker (bgwpool );
325
+ if (pool -> active + pool -> pending > pool -> nWorkers )
326
+ BgwStartExtraWorker (pool );
338
327
339
328
/*
340
329
* We always have free space for size at tail, as everything is
341
330
* int-aligned and when pool->tail becomes equal to pool->size it
342
331
* is switched to zero.
343
332
*/
344
- * (int * ) & pool -> queue [pool -> tail ] = size ;
333
+ * (int * ) & queue [pool -> tail ] = size ;
345
334
346
335
if (pool -> size - pool -> tail >= MSGLEN )
347
336
{
348
- memcpy (& pool -> queue [pool -> tail + sizeof (int )], ctx , payload );
349
- memcpy (& pool -> queue [pool -> tail + sizeof (int ) + payload ], work , size );
337
+ memcpy (& queue [pool -> tail + sizeof (int )], ctx , payload );
338
+ memcpy (& queue [pool -> tail + sizeof (int ) + payload ], work , size );
350
339
pool -> tail += MSGLEN ;
351
340
}
352
341
else
353
342
{
354
343
/* Message can't fit into the end of queue. */
355
- memcpy (pool -> queue , ctx , payload );
356
- memcpy (& pool -> queue [payload ], work , size );
344
+ memcpy (queue , ctx , payload );
345
+ memcpy (& queue [payload ], work , size );
357
346
pool -> tail = MSGLEN - sizeof (int );
358
347
}
359
348
@@ -377,10 +366,10 @@ BgwPoolExecute(BgwPool* bgwpool, void* work, int size, MtmReceiverContext *ctx)
377
366
}
378
367
379
368
/*
380
- * Initiate shutdown process of the worker : set shutdown sign and wake up all
381
- * another workers.
369
+ * Initiate shutdown process of workers : set shutdown sign and wake up all
370
+ * workers.
382
371
*/
383
- void PoolStateShutdown (PoolState * pool )
372
+ void PoolStateShutdown (BgwPool * pool )
384
373
{
385
374
SpinLockAcquire (& pool -> lock );
386
375
pool -> shutdown = true;
@@ -396,6 +385,7 @@ BgwPoolCancel(BgwPool* pool)
396
385
{
397
386
int i ;
398
387
388
+ SpinLockAcquire (& pool -> lock );
399
389
for (i = 0 ; i < pool -> nWorkers ; i ++ )
400
390
{
401
391
BgwHandleStatus status ;
@@ -408,4 +398,5 @@ BgwPoolCancel(BgwPool* pool)
408
398
kill (pid , SIGINT );
409
399
}
410
400
}
401
+ SpinLockRelease (& pool -> lock );
411
402
}
0 commit comments