Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0f3797c

Browse files
committed
Fixes for shared memqueue in receiver
1 parent fd54999 commit 0f3797c

File tree

4 files changed

+84
-32
lines changed

4 files changed

+84
-32
lines changed

expected/multimaster.out

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,3 +303,13 @@ table twopc_test2;
303303
2
304304
(1 row)
305305

306+
-- check ring buffer in receiver
307+
CREATE TABLE bmscantest (a int, b int, t text);
308+
-- that tx is approx 4mb and move rb tail to the center
309+
INSERT INTO bmscantest
310+
SELECT (r%53), (r%59), 'foooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo'
311+
FROM generate_series(1,40000) r;
312+
-- that tx is approx 9mb and will not fit neither before head nor after tail
313+
INSERT INTO bmscantest
314+
SELECT (r%53), (r%59), 'foooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo'
315+
FROM generate_series(1,70000) r;

sql/multimaster.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,15 @@ table twopc_test;
230230
table twopc_test2;
231231

232232

233+
-- check ring buffer in receiver
234+
CREATE TABLE bmscantest (a int, b int, t text);
235+
236+
-- that tx is approx 4mb and move rb tail to the center
237+
INSERT INTO bmscantest
238+
SELECT (r%53), (r%59), 'foooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo'
239+
FROM generate_series(1,40000) r;
240+
241+
-- that tx is approx 9mb and will not fit neither before head nor after tail
242+
INSERT INTO bmscantest
243+
SELECT (r%53), (r%59), 'foooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo'
244+
FROM generate_series(1,70000) r;

src/bgwpool.c

Lines changed: 61 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ BgwPoolMainLoop(BgwPool* pool)
5454
{
5555
int size;
5656
void* work;
57-
size_t payload = sizeof(MtmReceiverContext) + sizeof(size_t);
57+
int payload = INTALIGN(sizeof(MtmReceiverContext));
5858
MtmReceiverContext ctx;
5959
static PortalData fakePortal;
6060

@@ -102,7 +102,6 @@ BgwPoolMainLoop(BgwPool* pool)
102102
break;
103103
}
104104
size = * (int *) &pool->queue[pool->head];
105-
ctx = * (MtmReceiverContext *) &pool->queue[pool->head + sizeof(size_t)];
106105

107106
Assert(size < pool->size);
108107
work = palloc(size);
@@ -111,20 +110,33 @@ BgwPoolMainLoop(BgwPool* pool)
111110
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0)
112111
pool->lastPeakTime = MtmGetSystemTime();
113112

114-
if (pool->head + size + payload > pool->size)
113+
if (pool->head + size + payload + sizeof(int) > pool->size)
115114
{
116-
memcpy(work, pool->queue, size);
117-
pool->head = INTALIGN(size);
115+
ctx = * (MtmReceiverContext *) &pool->queue;
116+
memcpy(work, &pool->queue[payload], size);
117+
pool->head = payload + INTALIGN(size);
118118
}
119119
else
120120
{
121-
memcpy(work, &pool->queue[pool->head + payload], size);
122-
pool->head += payload + INTALIGN(size);
121+
memcpy(&ctx, &pool->queue[pool->head + sizeof(int)], payload);
122+
memcpy(work, &pool->queue[pool->head + sizeof(int) + payload], size);
123+
pool->head += sizeof(int) + payload + INTALIGN(size);
123124
}
124125

125-
if (pool->size == pool->head)
126+
/* wrap head */
127+
if (pool->head == pool->size)
126128
pool->head = 0;
127129

130+
/*
131+
* We should reset head and tail in order to accept messages bigger
132+
* than half of buffer size.
133+
*/
134+
if (pool->head == pool->tail)
135+
{
136+
pool->head = 0;
137+
pool->tail = 0;
138+
}
139+
128140
if (pool->producerBlocked)
129141
{
130142
pool->producerBlocked = false;
@@ -160,7 +172,7 @@ BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, size_t queueSize, size_t nW
160172
MtmPool = pool;
161173

162174
pool->bgwhandles = (BackgroundWorkerHandle **) ShmemAlloc(MtmMaxWorkers * sizeof(BackgroundWorkerHandle *));
163-
pool->queue = (char*)ShmemAlloc(queueSize);
175+
pool->queue = (char*)ShmemAlloc(INTALIGN(queueSize));
164176
if (pool->queue == NULL) {
165177
elog(PANIC, "Failed to allocate memory for background workers pool: %zd bytes requested", queueSize);
166178
}
@@ -252,11 +264,12 @@ static void BgwStartExtraWorker(BgwPool* pool)
252264
}
253265

254266
void
255-
BgwPoolExecute(BgwPool* pool, void* work, size_t size, MtmReceiverContext *ctx)
267+
BgwPoolExecute(BgwPool* pool, void* work, int size, MtmReceiverContext *ctx)
256268
{
257-
size_t payload = sizeof(MtmReceiverContext) + sizeof(size_t);
269+
int payload = INTALIGN(sizeof(MtmReceiverContext));
258270

259-
if (size + payload > pool->size)
271+
// XXX: align with spill size and assert that
272+
if (size + sizeof(int) + payload > pool->size)
260273
{
261274
/*
262275
* Size of work is larger than size of shared buffer:
@@ -269,18 +282,19 @@ BgwPoolExecute(BgwPool* pool, void* work, size_t size, MtmReceiverContext *ctx)
269282
SpinLockAcquire(&pool->lock);
270283
while (!pool->shutdown)
271284
{
272-
if ((pool->head <= pool->tail && pool->size - pool->tail < size + payload && pool->head < size)
273-
|| (pool->head > pool->tail && pool->head - pool->tail < size + payload))
274-
{
275-
if (pool->lastPeakTime == 0)
276-
pool->lastPeakTime = MtmGetSystemTime();
277-
278-
pool->producerBlocked = true;
279-
SpinLockRelease(&pool->lock);
280-
PGSemaphoreLock(pool->overflow);
281-
SpinLockAcquire(&pool->lock);
282-
}
283-
else
285+
/*
286+
* If queue is not wrapped through the end of buffer (head <= tail) we can
287+
* fit message either to the end (between tail and pool->size) or to the
288+
* beginning (between queue beginning and head). In both cases we can fit
289+
* size word after the tail.
290+
* If queue is wrapped through the end of buffer (tail < head) we can fit
291+
* message only between head and tail.
292+
*/
293+
if ((pool->head <= pool->tail &&
294+
(pool->size - pool->tail >= size + payload + sizeof(int) ||
295+
pool->head >= size + payload))
296+
|| (pool->head > pool->tail &&
297+
pool->head - pool->tail >= size + payload + sizeof(int)))
284298
{
285299
pool->pending += 1;
286300

@@ -290,18 +304,24 @@ BgwPoolExecute(BgwPool* pool, void* work, size_t size, MtmReceiverContext *ctx)
290304
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0)
291305
pool->lastPeakTime = MtmGetSystemTime();
292306

293-
*(int *)&pool->queue[pool->tail] = size;
294-
*(MtmReceiverContext *)&pool->queue[pool->tail + sizeof(size_t)] = *ctx;
307+
/*
308+
* We always have free space for size at tail, as everything is
309+
* int-aligned and when pool->tail becomes equal to pool->size it
310+
* is switched to zero.
311+
*/
312+
*(int *) &pool->queue[pool->tail] = size;
295313

296-
if (pool->size - pool->tail >= size + payload)
314+
if (pool->size - pool->tail >= payload + size + sizeof(int))
297315
{
298-
memcpy(&pool->queue[pool->tail + payload], work, size);
299-
pool->tail += payload + INTALIGN(size);
316+
memcpy(&pool->queue[pool->tail + sizeof(int)], ctx, payload);
317+
memcpy(&pool->queue[pool->tail + sizeof(int) + payload], work, size);
318+
pool->tail += sizeof(int) + payload + INTALIGN(size);
300319
}
301320
else
302321
{
303-
memcpy(pool->queue, work, size);
304-
pool->tail = INTALIGN(size);
322+
memcpy(pool->queue, ctx, payload);
323+
memcpy(&pool->queue[payload], work, size);
324+
pool->tail = payload + INTALIGN(size);
305325
}
306326

307327
if (pool->tail == pool->size)
@@ -310,6 +330,16 @@ BgwPoolExecute(BgwPool* pool, void* work, size_t size, MtmReceiverContext *ctx)
310330
PGSemaphoreUnlock(pool->available);
311331
break;
312332
}
333+
else
334+
{
335+
if (pool->lastPeakTime == 0)
336+
pool->lastPeakTime = MtmGetSystemTime();
337+
338+
pool->producerBlocked = true;
339+
SpinLockRelease(&pool->lock);
340+
PGSemaphoreLock(pool->overflow);
341+
SpinLockAcquire(&pool->lock);
342+
}
313343
}
314344
SpinLockRelease(&pool->lock);
315345
}

src/include/bgwpool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ extern void BgwPoolStart(BgwPool* pool, char *poolName, Oid db_id, Oid user_id);
5252

5353
extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, size_t queueSize, size_t nWorkers);
5454

55-
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size, MtmReceiverContext *ctx);
55+
extern void BgwPoolExecute(BgwPool* pool, void* work, int size, MtmReceiverContext *ctx);
5656

5757
extern size_t BgwPoolGetQueueSize(BgwPool* pool);
5858

0 commit comments

Comments
 (0)