1
1
/* -------------------------------------------------------------------------
2
2
*
3
3
* worker_spi.c
4
- * Sample background worker code that demonstrates usage of a database
5
- * connection.
4
+ * Sample background worker code that demonstrates various coding
5
+ * patterns: establishing a database connection; starting and committing
6
+ * transactions; using GUC variables, and heeding SIGHUP to reread
7
+ * the configuration file; reporting to pg_stat_activity; using the
8
+ * process latch to sleep and exit in case of postmaster death.
6
9
*
7
- * This code connects to a database, create a schema and table, and summarizes
10
+ * This code connects to a database, creates a schema and table, and summarizes
8
11
* the numbers contained therein. To see it working, insert an initial value
9
12
* with "total" type and some initial value; then insert some other rows with
10
13
* "delta" type. Delta rows will be deleted by this worker and their values
11
14
* aggregated into the total.
12
15
*
13
- * Copyright (C) 2012 , PostgreSQL Global Development Group
16
+ * Copyright (C) 2013 , PostgreSQL Global Development Group
14
17
*
15
18
* IDENTIFICATION
16
19
* contrib/worker_spi/worker_spi.c
33
36
#include "executor/spi.h"
34
37
#include "fmgr.h"
35
38
#include "lib/stringinfo.h"
39
+ #include "pgstat.h"
36
40
#include "utils/builtins.h"
37
41
#include "utils/snapmgr.h"
42
+ #include "tcop/utility.h"
38
43
39
44
PG_MODULE_MAGIC ;
40
45
41
46
void _PG_init (void );
42
47
43
- static bool got_sigterm = false;
48
+ /* flags set by signal handlers */
49
+ static volatile sig_atomic_t got_sighup = false;
50
+ static volatile sig_atomic_t got_sigterm = false;
51
+
52
+ /* GUC variables */
53
+ static int worker_spi_naptime = 10 ;
54
+ static int worker_spi_total_workers = 2 ;
44
55
45
56
46
57
typedef struct worktable
@@ -49,6 +60,11 @@ typedef struct worktable
49
60
const char * name ;
50
61
} worktable ;
51
62
63
+ /*
64
+ * Signal handler for SIGTERM
65
+ * Set a flag to let the main loop to terminate, and set our latch to wake
66
+ * it up.
67
+ */
52
68
static void
53
69
worker_spi_sigterm (SIGNAL_ARGS )
54
70
{
@@ -61,14 +77,23 @@ worker_spi_sigterm(SIGNAL_ARGS)
61
77
errno = save_errno ;
62
78
}
63
79
80
+ /*
81
+ * Signal handler for SIGHUP
82
+ * Set a flag to let the main loop to reread the config file, and set
83
+ * our latch to wake it up.
84
+ */
64
85
static void
65
86
worker_spi_sighup (SIGNAL_ARGS )
66
87
{
67
- elog ( LOG , "got sighup!" ) ;
88
+ got_sighup = true ;
68
89
if (MyProc )
69
90
SetLatch (& MyProc -> procLatch );
70
91
}
71
92
93
+ /*
94
+ * Initialize workspace for a worker process: create the schema if it doesn't
95
+ * already exist.
96
+ */
72
97
static void
73
98
initialize_worker_spi (worktable * table )
74
99
{
@@ -77,10 +102,13 @@ initialize_worker_spi(worktable *table)
77
102
bool isnull ;
78
103
StringInfoData buf ;
79
104
105
+ SetCurrentStatementStartTimestamp ();
80
106
StartTransactionCommand ();
81
107
SPI_connect ();
82
108
PushActiveSnapshot (GetTransactionSnapshot ());
109
+ pgstat_report_activity (STATE_RUNNING , "initializing spi_worker schema" );
83
110
111
+ /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
84
112
initStringInfo (& buf );
85
113
appendStringInfo (& buf , "select count(*) from pg_namespace where nspname = '%s'" ,
86
114
table -> schema );
@@ -110,6 +138,9 @@ initialize_worker_spi(worktable *table)
110
138
"WHERE type = 'total'" ,
111
139
table -> schema , table -> name , table -> name , table -> name );
112
140
141
+ /* set statement start time */
142
+ SetCurrentStatementStartTimestamp ();
143
+
113
144
ret = SPI_execute (buf .data , false, 0 );
114
145
115
146
if (ret != SPI_OK_UTILITY )
@@ -119,6 +150,7 @@ initialize_worker_spi(worktable *table)
119
150
SPI_finish ();
120
151
PopActiveSnapshot ();
121
152
CommitTransactionCommand ();
153
+ pgstat_report_activity (STATE_IDLE , NULL );
122
154
}
123
155
124
156
static void
@@ -163,6 +195,9 @@ worker_spi_main(void *main_arg)
163
195
table -> name ,
164
196
table -> name );
165
197
198
+ /*
199
+ * Main loop: do this until the SIGTERM handler tells us to terminate
200
+ */
166
201
while (!got_sigterm )
167
202
{
168
203
int ret ;
@@ -176,17 +211,45 @@ worker_spi_main(void *main_arg)
176
211
*/
177
212
rc = WaitLatch (& MyProc -> procLatch ,
178
213
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
179
- 1000L );
214
+ worker_spi_naptime * 1000L );
180
215
ResetLatch (& MyProc -> procLatch );
181
216
182
217
/* emergency bailout if postmaster has died */
183
218
if (rc & WL_POSTMASTER_DEATH )
184
219
proc_exit (1 );
185
220
221
+ /*
222
+ * In case of a SIGHUP, just reload the configuration.
223
+ */
224
+ if (got_sighup )
225
+ {
226
+ got_sighup = false;
227
+ ProcessConfigFile (PGC_SIGHUP );
228
+ }
229
+
230
+ /*
231
+ * Start a transaction on which we can run queries. Note that each
232
+ * StartTransactionCommand() call should be preceded by a
233
+ * SetCurrentStatementStartTimestamp() call, which sets both the time
234
+ * for the statement we're about the run, and also the transaction
235
+ * start time. Also, each other query sent to SPI should probably be
236
+ * preceded by SetCurrentStatementStartTimestamp(), so that statement
237
+ * start time is always up to date.
238
+ *
239
+ * The SPI_connect() call lets us run queries through the SPI manager,
240
+ * and the PushActiveSnapshot() call creates an "active" snapshot which
241
+ * is necessary for queries to have MVCC data to work on.
242
+ *
243
+ * The pgstat_report_activity() call makes our activity visible through
244
+ * the pgstat views.
245
+ */
246
+ SetCurrentStatementStartTimestamp ();
186
247
StartTransactionCommand ();
187
248
SPI_connect ();
188
249
PushActiveSnapshot (GetTransactionSnapshot ());
250
+ pgstat_report_activity (STATE_RUNNING , buf .data );
189
251
252
+ /* We can now execute queries via SPI */
190
253
ret = SPI_execute (buf .data , false, 0 );
191
254
192
255
if (ret != SPI_OK_UPDATE_RETURNING )
@@ -207,9 +270,13 @@ worker_spi_main(void *main_arg)
207
270
table -> schema , table -> name , val );
208
271
}
209
272
273
+ /*
274
+ * And finish our transaction.
275
+ */
210
276
SPI_finish ();
211
277
PopActiveSnapshot ();
212
278
CommitTransactionCommand ();
279
+ pgstat_report_activity (STATE_IDLE , NULL );
213
280
}
214
281
215
282
proc_exit (0 );
@@ -218,46 +285,66 @@ worker_spi_main(void *main_arg)
218
285
/*
219
286
* Entrypoint of this module.
220
287
*
221
- * We register two worker processes here, to demonstrate how that can be done.
288
+ * We register more than one worker process here, to demonstrate how that can
289
+ * be done.
222
290
*/
223
291
void
224
292
_PG_init (void )
225
293
{
226
294
BackgroundWorker worker ;
227
295
worktable * table ;
228
-
229
- /* register the worker processes. These values are common for both */
296
+ unsigned int i ;
297
+ char name [20 ];
298
+
299
+ /* get the configuration */
300
+ DefineCustomIntVariable ("worker_spi.naptime" ,
301
+ "Duration between each check (in seconds)." ,
302
+ NULL ,
303
+ & worker_spi_naptime ,
304
+ 10 ,
305
+ 1 ,
306
+ INT_MAX ,
307
+ PGC_SIGHUP ,
308
+ 0 ,
309
+ NULL ,
310
+ NULL ,
311
+ NULL );
312
+ DefineCustomIntVariable ("worker_spi.total_workers" ,
313
+ "Number of workers." ,
314
+ NULL ,
315
+ & worker_spi_total_workers ,
316
+ 2 ,
317
+ 1 ,
318
+ 100 ,
319
+ PGC_POSTMASTER ,
320
+ 0 ,
321
+ NULL ,
322
+ NULL ,
323
+ NULL );
324
+
325
+ /* set up common data for all our workers */
230
326
worker .bgw_flags = BGWORKER_SHMEM_ACCESS |
231
327
BGWORKER_BACKEND_DATABASE_CONNECTION ;
232
328
worker .bgw_start_time = BgWorkerStart_RecoveryFinished ;
329
+ worker .bgw_restart_time = BGW_NEVER_RESTART ;
233
330
worker .bgw_main = worker_spi_main ;
234
331
worker .bgw_sighup = worker_spi_sighup ;
235
332
worker .bgw_sigterm = worker_spi_sigterm ;
236
333
237
334
/*
238
- * These values are used for the first worker.
239
- *
240
- * Note these are palloc'd. The reason this works after starting a new
241
- * worker process is that if we only fork, they point to valid allocated
242
- * memory in the child process; and if we fork and then exec, the exec'd
243
- * process will run this code again, and so the memory is also valid there.
335
+ * Now fill in worker-specific data, and do the actual registrations.
244
336
*/
245
- table = palloc (sizeof (worktable ));
246
- table -> schema = pstrdup ("schema1" );
247
- table -> name = pstrdup ("counted" );
337
+ for (i = 1 ; i <= worker_spi_total_workers ; i ++ )
338
+ {
339
+ sprintf (name , "worker %d" , i );
340
+ worker .bgw_name = pstrdup (name );
248
341
249
- worker .bgw_name = "SPI worker 1" ;
250
- worker .bgw_restart_time = BGW_NEVER_RESTART ;
251
- worker .bgw_main_arg = (void * ) table ;
252
- RegisterBackgroundWorker (& worker );
253
-
254
- /* Values for the second worker */
255
- table = palloc (sizeof (worktable ));
256
- table -> schema = pstrdup ("our schema2" );
257
- table -> name = pstrdup ("counted rows" );
258
-
259
- worker .bgw_name = "SPI worker 2" ;
260
- worker .bgw_restart_time = 2 ;
261
- worker .bgw_main_arg = (void * ) table ;
262
- RegisterBackgroundWorker (& worker );
342
+ table = palloc (sizeof (worktable ));
343
+ sprintf (name , "schema%d" , i );
344
+ table -> schema = pstrdup (name );
345
+ table -> name = pstrdup ("counted" );
346
+ worker .bgw_main_arg = (void * ) table ;
347
+
348
+ RegisterBackgroundWorker (& worker );
349
+ }
263
350
}
0 commit comments