7
7
* -------------------------------------------------------------------------
8
8
*/
9
9
#include "postgres.h"
10
- #include "libpq-fe.h"
11
- #include "miscadmin.h"
10
+
11
+ #include "access/htup_details.h"
12
+ #include "access/xact.h"
13
+ #include "access/xlog.h"
14
+ #include "catalog/pg_type.h"
15
+ #include "commands/event_trigger.h"
12
16
#include "executor/spi.h"
13
17
#include "funcapi.h"
18
+ #include "libpq-fe.h"
19
+ #include "miscadmin.h"
14
20
#include "pgstat.h"
21
+ #include "storage/latch.h"
15
22
#include "utils/guc.h"
16
23
#include "utils/rel.h"
17
24
#include "utils/builtins.h"
18
25
#include "utils/lsyscache.h"
19
- #include "catalog/pg_type.h"
20
- #include "access/htup_details.h"
21
- #include "access/xlog.h"
22
- #include "storage/latch.h"
23
26
24
27
/* ensure that extension won't load against incompatible version of Postgres */
25
28
PG_MODULE_MAGIC ;
@@ -31,6 +34,7 @@ PG_FUNCTION_INFO_V1(broadcast);
31
34
PG_FUNCTION_INFO_V1 (reconstruct_table_attrs );
32
35
PG_FUNCTION_INFO_V1 (pq_conninfo_parse );
33
36
PG_FUNCTION_INFO_V1 (get_system_identifier );
37
+ PG_FUNCTION_INFO_V1 (reset_synchronous_standby_names_on_commit );
34
38
35
39
/* GUC variables */
36
40
static bool is_lord ;
@@ -39,6 +43,11 @@ static char *shardlord_connstring;
39
43
40
44
extern void _PG_init (void );
41
45
46
+ static bool reset_ssn_callback_set = false;
47
+ static bool reset_ssn_requested = false;
48
+
49
+ static void reset_ssn_xact_callback (XactEvent event , void * arg );
50
+
42
51
/*
43
52
* Entrypoint of the module. Define GUCs.
44
53
*/
@@ -75,6 +84,15 @@ _PG_init()
75
84
PGC_SUSET ,
76
85
0 ,
77
86
NULL , NULL , NULL );
87
+
88
+ /*
89
+ * Tell pathman that we want it to do shardman-specific COPY FROM: that
90
+ * is, support copy to foreign partitions by copying to foreign parent.
91
+ * For now we just ask to do it always. Better to turn on this in copy
92
+ * hook turn off after, however for that we need metadata on all nodes.
93
+ */
94
+ * find_rendezvous_variable (
95
+ "shardman_pathman_copy_from_rendezvous" ) = DatumGetPointer (1 );
78
96
}
79
97
80
98
Datum
@@ -95,6 +113,10 @@ is_shardlord(PG_FUNCTION_ARGS)
95
113
PG_RETURN_BOOL (is_lord );
96
114
}
97
115
116
+ /*
117
+ * Wait until PQgetResult would certainly be non-blocking. Returns true if
118
+ * everything is ok, false on error.
119
+ */
98
120
static bool
99
121
wait_command_completion (PGconn * conn )
100
122
{
@@ -119,7 +141,7 @@ wait_command_completion(PGconn* conn)
119
141
return false;
120
142
}
121
143
}
122
- return true;
144
+ return true;
123
145
}
124
146
125
147
typedef struct
@@ -139,22 +161,25 @@ broadcast(PG_FUNCTION_ARGS)
139
161
bool sync_commit_on = PG_GETARG_BOOL (3 );
140
162
bool sequential = PG_GETARG_BOOL (4 );
141
163
bool super_connstr = PG_GETARG_BOOL (5 );
164
+ char * iso_level = (PG_GETARG_POINTER (6 ) != NULL ) ?
165
+ text_to_cstring (PG_GETARG_TEXT_PP (6 )) : NULL ;
142
166
char * sep ;
143
167
char * sql ;
144
168
PGresult * res ;
145
169
char * fetch_node_connstr ;
146
170
int rc ;
147
171
int node_id ;
148
- int n ;
172
+ int n ;
149
173
char * conn_str ;
150
174
int n_cmds = 0 ;
151
175
int i ;
152
- int n_cons = 1024 ;
176
+ int n_cons = 1024 ; /* num of channels allocated currently */
153
177
Channel * chan ;
154
178
PGconn * con ;
155
179
StringInfoData resp ;
180
+ StringInfoData fin_sql ;
156
181
157
- char const * errmsg = "" ;
182
+ char const * errstr = "" ;
158
183
159
184
elog (DEBUG1 , "Broadcast commmand '%s'" , cmd );
160
185
@@ -163,6 +188,7 @@ broadcast(PG_FUNCTION_ARGS)
163
188
SPI_connect ();
164
189
chan = (Channel * ) palloc (sizeof (Channel ) * n_cons );
165
190
191
+ /* Open connections and send all queries */
166
192
while ((sep = strchr (cmd , * cmd == '{' ? '}' : ';' )) != NULL )
167
193
{
168
194
* sep = '\0' ;
@@ -171,9 +197,10 @@ broadcast(PG_FUNCTION_ARGS)
171
197
cmd += 1 ;
172
198
rc = sscanf (cmd , "%d:%n" , & node_id , & n );
173
199
if (rc != 1 ) {
174
- elog (ERROR , "SHARDMAN: Invalid command string: '%s' in '%s'" , cmd , sql_full );
200
+ elog (ERROR , "SHARDMAN: Invalid command string: '%s' in '%s'" ,
201
+ cmd , sql_full );
175
202
}
176
- sql = cmd + n ;
203
+ sql = cmd + n ; /* eat node id and colon */
177
204
cmd = sep + 1 ;
178
205
if (node_id != 0 )
179
206
{
@@ -187,7 +214,8 @@ broadcast(PG_FUNCTION_ARGS)
187
214
}
188
215
pfree (fetch_node_connstr );
189
216
190
- conn_str = SPI_getvalue (SPI_tuptable -> vals [0 ], SPI_tuptable -> tupdesc , 1 );
217
+ conn_str = SPI_getvalue (SPI_tuptable -> vals [0 ],
218
+ SPI_tuptable -> tupdesc , 1 );
191
219
}
192
220
else
193
221
{
@@ -212,39 +240,40 @@ broadcast(PG_FUNCTION_ARGS)
212
240
{
213
241
if (ignore_errors )
214
242
{
215
- errmsg = psprintf ("%s<error>%d:Connection failure: %s</error>" ,
216
- errmsg , node_id , PQerrorMessage (con ));
243
+ errstr = psprintf ("%s<error>%d:Connection failure: %s</error>" ,
244
+ errstr , node_id , PQerrorMessage (con ));
217
245
chan [n_cmds - 1 ].sql = NULL ;
218
246
continue ;
219
247
}
220
- errmsg = psprintf ("Failed to connect to node %d: %s" , node_id ,
248
+ errstr = psprintf ("Failed to connect to node %d: %s" , node_id ,
221
249
PQerrorMessage (con ));
222
250
goto cleanup ;
223
251
}
252
+ /* Build the actual sql to send, mem freed with ctxt */
253
+ initStringInfo (& fin_sql );
224
254
if (!sync_commit_on )
225
- {
226
- /* mem freed with context */
227
- if (two_phase )
228
- {
229
- sql = psprintf ("SET SESSION synchronous_commit TO local; BEGIN; %s; PREPARE TRANSACTION 'shardlord';" , sql );
230
- }
231
- else
232
- {
233
- sql = psprintf ("SET SESSION synchronous_commit TO local; %s" , sql );
234
- }
235
- }
236
- elog (DEBUG1 , "Send command '%s' to node %d" , sql , node_id );
237
- if (!PQsendQuery (con , sql )
255
+ appendStringInfoString (& fin_sql , "SET SESSION synchronous_commit TO local; " );
256
+ if (iso_level )
257
+ appendStringInfo (& fin_sql , "BEGIN TRANSACTION ISOLATION LEVEL %s; " , iso_level );
258
+ appendStringInfoString (& fin_sql , sql );
259
+ appendStringInfoChar (& fin_sql , ';' ); /* it was removed after strchr */
260
+ if (two_phase )
261
+ appendStringInfoString (& fin_sql , "PREPARE TRANSACTION 'shardlord';" );
262
+ else if (iso_level )
263
+ appendStringInfoString (& fin_sql , "END;" );
264
+
265
+ elog (DEBUG1 , "Sending command '%s' to node %d" , fin_sql .data , node_id );
266
+ if (!PQsendQuery (con , fin_sql .data )
238
267
|| (sequential && !wait_command_completion (con )))
239
268
{
240
269
if (ignore_errors )
241
270
{
242
- errmsg = psprintf ("%s<error>%d:Failed to send query '%s': %s</error>" ,
243
- errmsg , node_id , sql , PQerrorMessage (con ));
271
+ errstr = psprintf ("%s<error>%d:Failed to send query '%s': %s</error>" ,
272
+ errstr , node_id , fin_sql . data , PQerrorMessage (con ));
244
273
chan [n_cmds - 1 ].sql = NULL ;
245
274
continue ;
246
275
}
247
- errmsg = psprintf ("Failed to send query '%s' to node %d: %s'" , sql ,
276
+ errstr = psprintf ("Failed to send query '%s' to node %d: %s'" , fin_sql . data ,
248
277
node_id , PQerrorMessage (con ));
249
278
goto cleanup ;
250
279
}
@@ -255,6 +284,9 @@ broadcast(PG_FUNCTION_ARGS)
255
284
elog (ERROR , "SHARDMAN: Junk at end of command list: %s" , cmd );
256
285
}
257
286
287
+ /*
288
+ * Now collect results
289
+ */
258
290
for (i = 0 ; i < n_cmds ; i ++ )
259
291
{
260
292
PGresult * next_res ;
@@ -269,6 +301,7 @@ broadcast(PG_FUNCTION_ARGS)
269
301
continue ;
270
302
}
271
303
304
+ /* Skip all but the last result */
272
305
while ((next_res = PQgetResult (con )) != NULL )
273
306
{
274
307
if (res != NULL )
@@ -277,31 +310,32 @@ broadcast(PG_FUNCTION_ARGS)
277
310
}
278
311
res = next_res ;
279
312
}
313
+
280
314
if (res == NULL )
281
315
{
282
316
if (ignore_errors )
283
317
{
284
- errmsg = psprintf ("%s<error>%d:Failed to received response for '%s': %s</error>" ,
285
- errmsg , chan [i ].node , chan [i ].sql , PQerrorMessage (con ));
318
+ errstr = psprintf ("%s<error>%d:Failed to received response for '%s': %s</error>" ,
319
+ errstr , chan [i ].node , chan [i ].sql , PQerrorMessage (con ));
286
320
continue ;
287
321
}
288
- errmsg = psprintf ("Failed to receive response for query %s from node %d: %s" ,
322
+ errstr = psprintf ("Failed to receive response for query %s from node %d: %s" ,
289
323
chan [i ].sql , chan [i ].node , PQerrorMessage (con ));
290
324
goto cleanup ;
291
325
}
292
326
293
- /* Ok, result was successfully fetched */
327
+ /* Ok, result was successfully fetched, add it to resp */
294
328
status = PQresultStatus (res );
295
329
if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK )
296
330
{
297
331
if (ignore_errors )
298
332
{
299
- errmsg = psprintf ("%s<error>%d:Command %s failed: %s</error>" ,
300
- errmsg , chan [i ].node , chan [i ].sql , PQerrorMessage (con ));
333
+ errstr = psprintf ("%s<error>%d:Command %s failed: %s</error>" ,
334
+ errstr , chan [i ].node , chan [i ].sql , PQerrorMessage (con ));
301
335
PQclear (res );
302
336
continue ;
303
337
}
304
- errmsg = psprintf ("Command %s failed at node %d: %s" ,
338
+ errstr = psprintf ("Command %s failed at node %d: %s" ,
305
339
chan [i ].sql , chan [i ].node , PQerrorMessage (con ));
306
340
PQclear (res );
307
341
goto cleanup ;
@@ -322,7 +356,7 @@ broadcast(PG_FUNCTION_ARGS)
322
356
}
323
357
else
324
358
{
325
- errmsg = psprintf ("Query '%s' doesn't return single tuple at node %d" ,
359
+ errstr = psprintf ("Query '%s' doesn't return single tuple at node %d" ,
326
360
chan [i ].sql , chan [i ].node );
327
361
PQclear (res );
328
362
goto cleanup ;
@@ -339,13 +373,14 @@ broadcast(PG_FUNCTION_ARGS)
339
373
}
340
374
PQclear (res );
341
375
}
376
+
342
377
cleanup :
343
378
for (i = 0 ; i < n_cmds ; i ++ )
344
379
{
345
380
con = chan [i ].con ;
346
381
if (two_phase )
347
382
{
348
- if (* errmsg )
383
+ if (* errstr )
349
384
{
350
385
res = PQexec (con , "ROLLBACK PREPARED 'shardlord'" );
351
386
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
@@ -369,17 +404,19 @@ broadcast(PG_FUNCTION_ARGS)
369
404
PQfinish (con );
370
405
}
371
406
372
- if (* errmsg )
407
+ if (* errstr )
373
408
{
374
409
if (ignore_errors )
375
410
{
376
411
resetStringInfo (& resp );
377
- appendStringInfoString (& resp , errmsg );
378
- elog (WARNING , "SHARDMAN: %s" , errmsg );
412
+ appendStringInfoString (& resp , errstr );
413
+ elog (WARNING , "SHARDMAN: %s" , errstr );
379
414
}
380
415
else
381
416
{
382
- elog (ERROR , "SHARDMAN: %s" , errmsg );
417
+ ereport (ERROR ,
418
+ (errcode (ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION ),
419
+ errmsg ("SHARDMAN: %s" , errstr )));
383
420
}
384
421
}
385
422
@@ -605,3 +642,45 @@ get_system_identifier(PG_FUNCTION_ARGS)
605
642
{
606
643
PG_RETURN_INT64 (GetSystemIdentifier ());
607
644
}
645
+
646
+ /*
647
+ * Execute "ALTER SYSTEM SET synchronous_standby_names = '' on commit"
648
+ */
649
+ Datum
650
+ reset_synchronous_standby_names_on_commit (PG_FUNCTION_ARGS )
651
+ {
652
+ if (!reset_ssn_callback_set )
653
+ RegisterXactCallback (reset_ssn_xact_callback , NULL );
654
+ reset_ssn_requested = true;
655
+ PG_RETURN_VOID ();
656
+ }
657
+
658
+ static void
659
+ reset_ssn_xact_callback (XactEvent event , void * arg )
660
+ {
661
+ if (reset_ssn_requested )
662
+ {
663
+ /* I just wanted to practice a bit with PG nodes and lists */
664
+ A_Const * aconst = makeNode (A_Const );
665
+ List * set_stmt_args = list_make1 (aconst );
666
+ VariableSetStmt setstmt ;
667
+ AlterSystemStmt altersysstmt ;
668
+
669
+ aconst -> val .type = T_String ;
670
+ aconst -> val .val .str = "" ; /* set it to empty value */
671
+ aconst -> location = -1 ;
672
+
673
+ setstmt .type = T_VariableSetStmt ;
674
+ setstmt .kind = VAR_SET_VALUE ;
675
+ setstmt .name = "synchronous_standby_names" ;
676
+ setstmt .args = set_stmt_args ;
677
+
678
+ altersysstmt .type = T_AlterSystemStmt ;
679
+ altersysstmt .setstmt = & setstmt ;
680
+ AlterSystemSetConfigFile (& altersysstmt );
681
+ pg_reload_conf (NULL );
682
+
683
+ list_free_deep (setstmt .args );
684
+ reset_ssn_requested = false;
685
+ }
686
+ }
0 commit comments