26
26
#include "commands/extension.h"
27
27
#include "libpq-fe.h"
28
28
29
+ #include "pg_shardman.h"
30
+
29
31
30
32
/* ensure that extension won't load against incompatible version of Postgres */
31
33
PG_MODULE_MAGIC ;
@@ -35,11 +37,9 @@ typedef struct Cmd
35
37
int64 id ;
36
38
char * cmd_type ;
37
39
char * status ;
40
+ char * * opts ; /* array of n options, opts[n] is NULL */
38
41
} Cmd ;
39
42
40
- extern void _PG_init (void );
41
- extern void shardmaster_main (Datum main_arg );
42
-
43
43
static Cmd * next_cmd (void );
44
44
static void update_cmd_status (int64 id , const char * new_status );
45
45
static PGconn * listen_cmd_log_inserts (void );
@@ -132,10 +132,13 @@ shardmaster_main(Datum main_arg)
132
132
/* main loop */
133
133
while (!got_sigterm )
134
134
{
135
+ /* TODO: new mem ctxt for every command */
135
136
while ((cmd = next_cmd ()) != NULL )
136
137
{
137
138
update_cmd_status (cmd -> id , "in progress" );
138
- elog (LOG , "Working on command %ld, %s" , cmd -> id , cmd -> cmd_type );
139
+ elog (LOG , "Working on command %ld, %s, opts are" , cmd -> id , cmd -> cmd_type );
140
+ for (char * * opts = cmd -> opts ; * opts ; opts ++ )
141
+ elog (LOG , "%s" , * opts );
139
142
update_cmd_status (cmd -> id , "success" );
140
143
}
141
144
wait_notify (conn );
@@ -239,24 +242,44 @@ next_cmd(void)
239
242
" status = 'in progress') t2 using (id);" ;
240
243
e = SPI_execute (cmd_sql , true, 0 );
241
244
if (e < 0 )
242
- {
243
245
elog (FATAL , "Stmt failed: %s" , cmd_sql );
244
- }
245
246
246
247
if (SPI_processed > 0 )
247
248
{
248
249
HeapTuple tuple = SPI_tuptable -> vals [0 ];
249
250
TupleDesc rowdesc = SPI_tuptable -> tupdesc ;
250
251
bool isnull ;
251
- const char * cmd_type = (SPI_getvalue (tuple , rowdesc ,
252
- SPI_fnumber (rowdesc , "cmd_type" )));
252
+ uint64 i ;
253
253
254
+ /* copy the command itself to callee context */
254
255
MemoryContext spicxt = MemoryContextSwitchTo (oldcxt );
255
256
cmd = palloc (sizeof (Cmd ));
256
257
cmd -> id = DatumGetInt64 (SPI_getbinval (tuple , rowdesc ,
257
258
SPI_fnumber (rowdesc , "id" ),
258
259
& isnull ));
259
- cmd -> cmd_type = pstrdup (cmd_type );
260
+ cmd -> cmd_type = SPI_getvalue (tuple , rowdesc ,
261
+ SPI_fnumber (rowdesc , "cmd_type" ));
262
+ MemoryContextSwitchTo (spicxt );
263
+
264
+ /* Now get options. cmd_sql will be freed by SPI_finish */
265
+ cmd_sql = psprintf ("select opt from shardman.cmd_opts where"
266
+ " cmd_id = %ld order by id;" , cmd -> id );
267
+ e = SPI_execute (cmd_sql , true, 0 );
268
+ if (e < 0 )
269
+ elog (FATAL , "Stmt failed: %s" , cmd_sql );
270
+
271
+ MemoryContextSwitchTo (oldcxt );
272
+ /* +1 for NULL in the end */
273
+ cmd -> opts = palloc ((SPI_processed + 1 ) * sizeof (char * ));
274
+ for (i = 0 ; i < SPI_processed ; i ++ )
275
+ {
276
+ tuple = SPI_tuptable -> vals [i ];
277
+ rowdesc = SPI_tuptable -> tupdesc ;
278
+ cmd -> opts [i ] = SPI_getvalue (tuple , rowdesc ,
279
+ SPI_fnumber (rowdesc , "opt" ));
280
+ }
281
+ cmd -> opts [i ] = NULL ;
282
+
260
283
MemoryContextSwitchTo (spicxt );
261
284
}
262
285
0 commit comments