@@ -122,10 +122,17 @@ wait_command_completion(PGconn* conn)
122
122
return true;
123
123
}
124
124
125
+ typedef struct
126
+ {
127
+ PGconn * con ;
128
+ char * sql ;
129
+ int node ;
130
+ } Channel ;
131
+
125
132
Datum
126
133
broadcast (PG_FUNCTION_ARGS )
127
134
{
128
- char * sql_full = text_to_cstring (PG_GETARG_TEXT_PP (0 ));
135
+ char * sql_full = text_to_cstring (PG_GETARG_TEXT_PP (0 ));
129
136
char * cmd = pstrdup (sql_full );
130
137
bool ignore_errors = PG_GETARG_BOOL (1 );
131
138
bool two_phase = PG_GETARG_BOOL (2 );
@@ -143,17 +150,18 @@ broadcast(PG_FUNCTION_ARGS)
143
150
int n_cmds = 0 ;
144
151
int i ;
145
152
int n_cons = 1024 ;
146
- PGconn * * conn ;
153
+ Channel * chan ;
154
+ PGconn * con ;
147
155
StringInfoData resp ;
148
156
149
- char * errmsg = NULL ;
157
+ char const * errmsg = "" ;
150
158
151
159
elog (DEBUG1 , "Broadcast commmand '%s'" , cmd );
152
160
153
161
initStringInfo (& resp );
154
162
155
163
SPI_connect ();
156
- conn = (PGconn * * ) palloc (sizeof (PGconn * ) * n_cons );
164
+ chan = (Channel * ) palloc (sizeof (Channel ) * n_cons );
157
165
158
166
while ((sep = strchr (cmd , * cmd == '{' ? '}' : ';' )) != NULL )
159
167
{
@@ -163,7 +171,7 @@ broadcast(PG_FUNCTION_ARGS)
163
171
cmd += 1 ;
164
172
rc = sscanf (cmd , "%d:%n" , & node_id , & n );
165
173
if (rc != 1 ) {
166
- elog (ERROR , "SHARDMAN: Invalid command string: %s " , cmd );
174
+ elog (ERROR , "SHARDMAN: Invalid command string: '%s' in '%s' " , cmd , sql_full );
167
175
}
168
176
sql = cmd + n ;
169
177
cmd = sep + 1 ;
@@ -191,21 +199,26 @@ broadcast(PG_FUNCTION_ARGS)
191
199
}
192
200
if (n_cmds >= n_cons )
193
201
{
194
- conn = (PGconn * * ) repalloc (conn , sizeof (PGconn * ) * (n_cons *= 2 ));
202
+ chan = (Channel * ) repalloc (chan , sizeof (Channel ) * (n_cons *= 2 ));
195
203
}
196
204
197
- conn [n_cmds ] = PQconnectdb (conn_str );
198
- if (PQstatus (conn [n_cmds ++ ]) != CONNECTION_OK )
205
+ con = PQconnectdb (conn_str );
206
+ chan [n_cmds ].con = con ;
207
+ chan [n_cmds ].node = node_id ;
208
+ chan [n_cmds ].sql = sql ;
209
+ n_cmds += 1 ;
210
+
211
+ if (PQstatus (con ) != CONNECTION_OK )
199
212
{
200
213
if (ignore_errors )
201
214
{
202
- errmsg = psprintf ("%s%d:Connection failure: %s. " ,
203
- errmsg ? errmsg : "" , node_id ,
204
- PQerrorMessage ( conn [n_cmds - 1 ])) ;
215
+ errmsg = psprintf ("%s<error> %d:Connection failure: %s</error> " ,
216
+ errmsg , node_id , PQerrorMessage ( con ));
217
+ chan [n_cmds - 1 ]. sql = NULL ;
205
218
continue ;
206
219
}
207
220
errmsg = psprintf ("Failed to connect to node %d: %s" , node_id ,
208
- PQerrorMessage (conn [ n_cmds - 1 ] ));
221
+ PQerrorMessage (con ));
209
222
goto cleanup ;
210
223
}
211
224
if (!sync_commit_on )
@@ -221,18 +234,18 @@ broadcast(PG_FUNCTION_ARGS)
221
234
}
222
235
}
223
236
elog (DEBUG1 , "Send command '%s' to node %d" , sql , node_id );
224
- if (!PQsendQuery (conn [ n_cmds - 1 ] , sql )
225
- || (sequential && !wait_command_completion (conn [ n_cmds - 1 ] )))
237
+ if (!PQsendQuery (con , sql )
238
+ || (sequential && !wait_command_completion (con )))
226
239
{
227
240
if (ignore_errors )
228
241
{
229
- errmsg = psprintf ("%s%d:Failed to send query '%s': %s'. " ,
230
- errmsg ? errmsg : "" , node_id , sql ,
231
- PQerrorMessage ( conn [n_cmds - 1 ])) ;
242
+ errmsg = psprintf ("%s<error> %d:Failed to send query '%s': %s</error> " ,
243
+ errmsg , node_id , sql , PQerrorMessage ( con ));
244
+ chan [n_cmds - 1 ]. sql = NULL ;
232
245
continue ;
233
246
}
234
247
errmsg = psprintf ("Failed to send query '%s' to node %d: %s'" , sql ,
235
- node_id , PQerrorMessage (conn [ n_cmds - 1 ] ));
248
+ node_id , PQerrorMessage (con ));
236
249
goto cleanup ;
237
250
}
238
251
}
@@ -248,7 +261,15 @@ broadcast(PG_FUNCTION_ARGS)
248
261
PGresult * res = NULL ;
249
262
ExecStatusType status ;
250
263
251
- while ((next_res = PQgetResult (conn [i ])) != NULL )
264
+ con = chan [i ].con ;
265
+
266
+ if (chan [i ].sql == NULL )
267
+ {
268
+ /* Ignore commands which were not sent */
269
+ continue ;
270
+ }
271
+
272
+ while ((next_res = PQgetResult (con )) != NULL )
252
273
{
253
274
if (res != NULL )
254
275
{
@@ -260,10 +281,12 @@ broadcast(PG_FUNCTION_ARGS)
260
281
{
261
282
if (ignore_errors )
262
283
{
263
- errmsg = psprintf ("%s%d:Failed to received response for '%s': %s." , errmsg ? errmsg : "" , node_id , sql_full , PQerrorMessage (conn [i ]));
284
+ errmsg = psprintf ("%s<error>%d:Failed to received response for '%s': %s</error>" ,
285
+ errmsg , chan [i ].node , chan [i ].sql , PQerrorMessage (con ));
264
286
continue ;
265
287
}
266
- errmsg = psprintf ("Failed to receive response for query %s from node %d: %s" , sql_full , node_id , PQerrorMessage (conn [i ]));
288
+ errmsg = psprintf ("Failed to receive response for query %s from node %d: %s" ,
289
+ chan [i ].sql , chan [i ].node , PQerrorMessage (con ));
267
290
goto cleanup ;
268
291
}
269
292
@@ -273,11 +296,13 @@ broadcast(PG_FUNCTION_ARGS)
273
296
{
274
297
if (ignore_errors )
275
298
{
276
- errmsg = psprintf ("%s%d:Command %s failed: %s." , errmsg ? errmsg : "" , node_id , sql_full , PQerrorMessage (conn [i ]));
299
+ errmsg = psprintf ("%s<error>%d:Command %s failed: %s</error>" ,
300
+ errmsg , chan [i ].node , chan [i ].sql , PQerrorMessage (con ));
277
301
PQclear (res );
278
302
continue ;
279
303
}
280
- errmsg = psprintf ("Command %s failed at node %d: %s" , sql_full , node_id , PQerrorMessage (conn [i ]));
304
+ errmsg = psprintf ("Command %s failed at node %d: %s" ,
305
+ chan [i ].sql , chan [i ].node , PQerrorMessage (con ));
281
306
PQclear (res );
282
307
goto cleanup ;
283
308
}
@@ -292,11 +317,13 @@ broadcast(PG_FUNCTION_ARGS)
292
317
if (ignore_errors )
293
318
{
294
319
appendStringInfoString (& resp , "?" );
295
- elog (WARNING , "SHARDMAN: Query '%s' doesn't return single tuple at node %d" , sql_full , node_id );
320
+ elog (WARNING , "SHARDMAN: Query '%s' doesn't return single tuple at node %d" ,
321
+ chan [i ].sql , chan [i ].node );
296
322
}
297
323
else
298
324
{
299
- errmsg = psprintf ("Query '%s' doesn't return single tuple at node %d" , sql_full , node_id );
325
+ errmsg = psprintf ("Query '%s' doesn't return single tuple at node %d" ,
326
+ chan [i ].sql , chan [i ].node );
300
327
PQclear (res );
301
328
goto cleanup ;
302
329
}
@@ -315,37 +342,39 @@ broadcast(PG_FUNCTION_ARGS)
315
342
cleanup :
316
343
for (i = 0 ; i < n_cmds ; i ++ )
317
344
{
345
+ con = chan [i ].con ;
318
346
if (two_phase )
319
347
{
320
- if (errmsg )
348
+ if (* errmsg )
321
349
{
322
- res = PQexec (conn [ i ] , "ROLLBACK PREPARED 'shardlord'" );
350
+ res = PQexec (con , "ROLLBACK PREPARED 'shardlord'" );
323
351
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
324
352
{
325
353
elog (WARNING , "SHARDMAN: Rollback of 2PC failed at node %d: %s" ,
326
- node_id , PQerrorMessage (conn [ i ] ));
354
+ chan [ i ]. node , PQerrorMessage (con ));
327
355
}
328
356
PQclear (res );
329
357
}
330
358
else
331
359
{
332
- res = PQexec (conn [ i ] , "COMMIT PREPARED 'shardlord'" );
360
+ res = PQexec (con , "COMMIT PREPARED 'shardlord'" );
333
361
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
334
362
{
335
363
elog (WARNING , "SHARDMAN: Commit of 2PC failed at node %d: %s" ,
336
- node_id , PQerrorMessage (conn [ i ] ));
364
+ chan [ i ]. node , PQerrorMessage (con ));
337
365
}
338
366
PQclear (res );
339
367
}
340
368
}
341
- PQfinish (conn [ i ] );
369
+ PQfinish (con );
342
370
}
343
371
344
- if (errmsg )
372
+ if (* errmsg )
345
373
{
346
374
if (ignore_errors )
347
375
{
348
- appendStringInfo (& resp , "Error:%s" , errmsg );
376
+ resetStringInfo (& resp );
377
+ appendStringInfoString (& resp , errmsg );
349
378
elog (WARNING , "SHARDMAN: %s" , errmsg );
350
379
}
351
380
else
@@ -354,7 +383,7 @@ broadcast(PG_FUNCTION_ARGS)
354
383
}
355
384
}
356
385
357
- pfree (conn );
386
+ pfree (chan );
358
387
SPI_finish ();
359
388
360
389
PG_RETURN_TEXT_P (cstring_to_text (resp .data ));
@@ -366,7 +395,7 @@ broadcast(PG_FUNCTION_ARGS)
366
395
* defaults, everything. Parameter is not REGCLASS because pg_dump can't
367
396
* handle oids anyway. Connstring must be proper libpq connstring, it is feed
368
397
* to pg_dump.
369
- * TODO: actually we should have much more control on what is dumped, so we
398
+ * TODO: actually we should have muchmore control on what is dumped, so we
370
399
* need to copy-paste parts of messy pg_dump or collect the needed data
371
400
* manually walking over catalogs.
372
401
*/
0 commit comments