Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4db3744

Browse files
committed
Test code for shared memory message queue facility.
This code is intended as a demonstration of how the dynamic shared memory and dynamic background worker facilities can be used to establish a group of coooperating processes which can coordinate their activities using the shared memory message queue facility. By itself, the code does nothing particularly interesting: it simply allows messages to be passed through a loop of workers and back to the original process. But it's a useful unit test, in addition to its demonstration value.
1 parent ec9037d commit 4db3744

File tree

11 files changed

+932
-0
lines changed

11 files changed

+932
-0
lines changed

contrib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ SUBDIRS = \
5151
tablefunc \
5252
tcn \
5353
test_parser \
54+
test_shm_mq \
5455
tsearch2 \
5556
unaccent \
5657
vacuumlo \

contrib/test_shm_mq/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Generated subdirectories
2+
/log/
3+
/results/
4+
/tmp_check/

contrib/test_shm_mq/Makefile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# contrib/test_shm_mq/Makefile
2+
3+
MODULE_big = test_shm_mq
4+
OBJS = test.o setup.o worker.o
5+
6+
EXTENSION = test_shm_mq
7+
DATA = test_shm_mq--1.0.sql
8+
9+
REGRESS = test_shm_mq
10+
11+
ifdef USE_PGXS
12+
PG_CONFIG = pg_config
13+
PGXS := $(shell $(PG_CONFIG) --pgxs)
14+
include $(PGXS)
15+
else
16+
subdir = contrib/test_shm_mq
17+
top_builddir = ../..
18+
include $(top_builddir)/src/Makefile.global
19+
include $(top_srcdir)/contrib/contrib-global.mk
20+
endif
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
CREATE EXTENSION test_shm_mq;
2+
--
3+
-- These tests don't produce any interesting output. We're checking that
4+
-- the operations complete without crashing or hanging and that none of their
5+
-- internal sanity tests fail.
6+
--
7+
SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,400)), 10000, 1);
8+
test_shm_mq
9+
-------------
10+
11+
(1 row)
12+
13+
SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,270000)), 200, 3);
14+
test_shm_mq_pipelined
15+
-----------------------
16+
17+
(1 row)
18+

contrib/test_shm_mq/setup.c

Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
/*--------------------------------------------------------------------------
2+
*
3+
* setup.c
4+
* Code to set up a dynamic shared memory segments and a specified
5+
* number of background workers for shared memory message queue
6+
* testing.
7+
*
8+
* Copyright (C) 2013, PostgreSQL Global Development Group
9+
*
10+
* IDENTIFICATION
11+
* contrib/test_shm_mq/setup.c
12+
*
13+
* -------------------------------------------------------------------------
14+
*/
15+
16+
#include "postgres.h"
17+
18+
#include "miscadmin.h"
19+
#include "postmaster/bgworker.h"
20+
#include "storage/procsignal.h"
21+
#include "storage/shm_toc.h"
22+
#include "utils/memutils.h"
23+
24+
#include "test_shm_mq.h"
25+
26+
typedef struct
27+
{
28+
int nworkers;
29+
BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
30+
} worker_state;
31+
32+
static void setup_dynamic_shared_memory(uint64 queue_size, int nworkers,
33+
dsm_segment **segp,
34+
test_shm_mq_header **hdrp,
35+
shm_mq **outp, shm_mq **inp);
36+
static worker_state *setup_background_workers(int nworkers,
37+
dsm_segment *seg);
38+
static void cleanup_background_workers(dsm_segment *seg, Datum arg);
39+
static void wait_for_workers_to_become_ready(worker_state *wstate,
40+
volatile test_shm_mq_header *hdr);
41+
static bool check_worker_status(worker_state *wstate);
42+
43+
/*
44+
* Set up a dynamic shared memory segment and zero or more background workers
45+
* for a test run.
46+
*/
47+
void
48+
test_shm_mq_setup(uint64 queue_size, int32 nworkers, dsm_segment **segp,
49+
shm_mq_handle **output, shm_mq_handle **input)
50+
{
51+
dsm_segment *seg;
52+
test_shm_mq_header *hdr;
53+
shm_mq *outq;
54+
shm_mq *inq;
55+
worker_state *wstate;
56+
57+
/* Set up a dynamic shared memory segment. */
58+
setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
59+
*segp = seg;
60+
61+
/* Register background workers. */
62+
wstate = setup_background_workers(nworkers, seg);
63+
64+
/* Attach the queues. */
65+
*output = shm_mq_attach(outq, seg, wstate->handle[0]);
66+
*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
67+
68+
/* Wait for workers to become ready. */
69+
wait_for_workers_to_become_ready(wstate, hdr);
70+
71+
/*
72+
* Once we reach this point, all workers are ready. We no longer need
73+
* to kill them if we die; they'll die on their own as the message queues
74+
* shut down.
75+
*/
76+
cancel_on_dsm_detach(seg, cleanup_background_workers,
77+
PointerGetDatum(wstate));
78+
pfree(wstate);
79+
}
80+
81+
/*
82+
* Set up a dynamic shared memory segment.
83+
*
84+
* We set up a small control region that contains only a test_shm_mq_header,
85+
* plus one region per message queue. There are as many message queues as
86+
* the number of workers, plus one.
87+
*/
88+
static void
89+
setup_dynamic_shared_memory(uint64 queue_size, int nworkers,
90+
dsm_segment **segp, test_shm_mq_header **hdrp,
91+
shm_mq **outp, shm_mq **inp)
92+
{
93+
shm_toc_estimator e;
94+
int i;
95+
uint64 segsize;
96+
dsm_segment *seg;
97+
shm_toc *toc;
98+
test_shm_mq_header *hdr;
99+
100+
/* Ensure a valid queue size. */
101+
if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
102+
ereport(ERROR,
103+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
104+
errmsg("queue size must be at least " UINT64_FORMAT " bytes",
105+
shm_mq_minimum_size)));
106+
107+
/*
108+
* Estimate how much shared memory we need.
109+
*
110+
* Because the TOC machinery may choose to insert padding of oddly-sized
111+
* requests, we must estimate each chunk separately.
112+
*
113+
* We need one key to register the location of the header, and we need
114+
* nworkers + 1 keys to track the locations of the message queues.
115+
*/
116+
shm_toc_initialize_estimator(&e);
117+
shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
118+
for (i = 0; i <= nworkers; ++i)
119+
shm_toc_estimate_chunk(&e, queue_size);
120+
shm_toc_estimate_keys(&e, 2 + nworkers);
121+
segsize = shm_toc_estimate(&e);
122+
123+
/* Create the shared memory segment and establish a table of contents. */
124+
seg = dsm_create(shm_toc_estimate(&e));
125+
toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
126+
segsize);
127+
128+
/* Set up the header region. */
129+
hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
130+
SpinLockInit(&hdr->mutex);
131+
hdr->workers_total = nworkers;
132+
hdr->workers_attached = 0;
133+
hdr->workers_ready = 0;
134+
shm_toc_insert(toc, 0, hdr);
135+
136+
/* Set up one message queue per worker, plus one. */
137+
for (i = 0; i <= nworkers; ++i)
138+
{
139+
shm_mq *mq;
140+
141+
mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
142+
shm_toc_insert(toc, i + 1, mq);
143+
144+
if (i == 0)
145+
{
146+
/* We send messages to the first queue. */
147+
shm_mq_set_sender(mq, MyProc);
148+
*outp = mq;
149+
}
150+
if (i == nworkers)
151+
{
152+
/* We receive messages from the last queue. */
153+
shm_mq_set_receiver(mq, MyProc);
154+
*inp = mq;
155+
}
156+
}
157+
158+
/* Return results to caller. */
159+
*segp = seg;
160+
*hdrp = hdr;
161+
}
162+
163+
/*
164+
* Register background workers.
165+
*/
166+
static worker_state *
167+
setup_background_workers(int nworkers, dsm_segment *seg)
168+
{
169+
MemoryContext oldcontext;
170+
BackgroundWorker worker;
171+
worker_state *wstate;
172+
int i;
173+
174+
/*
175+
* We need the worker_state object and the background worker handles to
176+
* which it points to be allocated in CurTransactionContext rather than
177+
* ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
178+
* hooks run.
179+
*/
180+
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
181+
182+
/* Create worker state object. */
183+
wstate = MemoryContextAlloc(TopTransactionContext,
184+
offsetof(worker_state, handle) +
185+
sizeof(BackgroundWorkerHandle *) * nworkers);
186+
wstate->nworkers = 0;
187+
188+
/*
189+
* Arrange to kill all the workers if we abort before all workers are
190+
* finished hooking themselves up to the dynamic shared memory segment.
191+
*
192+
* If we die after all the workers have finished hooking themselves up
193+
* to the dynamic shared memory segment, we'll mark the two queues to
194+
* which we're directly connected as detached, and the worker(s)
195+
* connected to those queues will exit, marking any other queues to
196+
* which they are connected as detached. This will cause any
197+
* as-yet-unaware workers connected to those queues to exit in their
198+
* turn, and so on, until everybody exits.
199+
*
200+
* But suppose the workers which are supposed to connect to the queues
201+
* to which we're directly attached exit due to some error before they
202+
* actually attach the queues. The remaining workers will have no way of
203+
* knowing this. From their perspective, they're still waiting for those
204+
* workers to start, when in fact they've already died.
205+
*/
206+
on_dsm_detach(seg, cleanup_background_workers,
207+
PointerGetDatum(wstate));
208+
209+
/* Configure a worker. */
210+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
211+
worker.bgw_start_time = BgWorkerStart_ConsistentState;
212+
worker.bgw_restart_time = BGW_NEVER_RESTART;
213+
worker.bgw_main = NULL; /* new worker might not have library loaded */
214+
sprintf(worker.bgw_library_name, "test_shm_mq");
215+
sprintf(worker.bgw_function_name, "test_shm_mq_main");
216+
snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq");
217+
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
218+
/* set bgw_notify_pid, so we can detect if the worker stops */
219+
worker.bgw_notify_pid = MyProcPid;
220+
221+
/* Register the workers. */
222+
for (i = 0; i < nworkers; ++i)
223+
{
224+
if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
225+
ereport(ERROR,
226+
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
227+
errmsg("could not register background process"),
228+
errhint("You may need to increase max_worker_processes.")));
229+
++wstate->nworkers;
230+
}
231+
232+
/* All done. */
233+
MemoryContextSwitchTo(oldcontext);
234+
return wstate;
235+
}
236+
237+
static void
238+
cleanup_background_workers(dsm_segment *seg, Datum arg)
239+
{
240+
worker_state *wstate = (worker_state *) DatumGetPointer(arg);
241+
242+
while (wstate->nworkers > 0)
243+
{
244+
--wstate->nworkers;
245+
TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
246+
}
247+
}
248+
249+
static void
250+
wait_for_workers_to_become_ready(worker_state *wstate,
251+
volatile test_shm_mq_header *hdr)
252+
{
253+
bool save_set_latch_on_sigusr1;
254+
bool result = false;
255+
256+
save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
257+
set_latch_on_sigusr1 = true;
258+
259+
PG_TRY();
260+
{
261+
for (;;)
262+
{
263+
int workers_ready;
264+
265+
/* If all the workers are ready, we have succeeded. */
266+
SpinLockAcquire(&hdr->mutex);
267+
workers_ready = hdr->workers_ready;
268+
SpinLockRelease(&hdr->mutex);
269+
if (workers_ready >= wstate->nworkers)
270+
{
271+
result = true;
272+
break;
273+
}
274+
275+
/* If any workers (or the postmaster) have died, we have failed. */
276+
if (!check_worker_status(wstate))
277+
{
278+
result = false;
279+
break;
280+
}
281+
282+
/* Wait to be signalled. */
283+
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
284+
285+
/* An interrupt may have occurred while we were waiting. */
286+
CHECK_FOR_INTERRUPTS();
287+
288+
/* Reset the latch so we don't spin. */
289+
ResetLatch(&MyProc->procLatch);
290+
}
291+
}
292+
PG_CATCH();
293+
{
294+
set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
295+
PG_RE_THROW();
296+
}
297+
PG_END_TRY();
298+
299+
if (!result)
300+
ereport(ERROR,
301+
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
302+
errmsg("one or more background workers failed to start")));
303+
}
304+
305+
static bool
306+
check_worker_status(worker_state *wstate)
307+
{
308+
int n;
309+
310+
/* If any workers (or the postmaster) have died, we have failed. */
311+
for (n = 0; n < wstate->nworkers; ++n)
312+
{
313+
BgwHandleStatus status;
314+
pid_t pid;
315+
316+
status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
317+
if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
318+
return false;
319+
}
320+
321+
/* Otherwise, things still look OK. */
322+
return true;
323+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
CREATE EXTENSION test_shm_mq;
2+
3+
--
4+
-- These tests don't produce any interesting output. We're checking that
5+
-- the operations complete without crashing or hanging and that none of their
6+
-- internal sanity tests fail.
7+
--
8+
SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,400)), 10000, 1);
9+
SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,270000)), 200, 3);

0 commit comments

Comments
 (0)