@@ -42,6 +42,8 @@ typedef struct
42
42
}
43
43
SessionPoolKey ;
44
44
45
+ #define NULLSTR (s ) ((s) ? (s) : "?")
46
+
45
47
/*
46
48
* Channels represent both clients and backends
47
49
*/
@@ -84,7 +86,7 @@ Channel;
84
86
#define REMOVED_CHANNEL_MAGIC 0xDEADDEEDU
85
87
86
88
/*
87
- * Control structure for connection proxies (several proxy workers can be launched and each has it sown proxy instance).
89
+ * Control structure for connection proxies (several proxy workers can be launched and each has its own proxy instance).
88
90
* Proxy contains hash of session pools for reach role/dbname combination.
89
91
*/
90
92
typedef struct Proxy
@@ -115,6 +117,8 @@ typedef struct SessionPool
115
117
int n_connected_clients ; /* Total number of connected clients */
116
118
int n_idle_clients ; /* Number of clients in idle state */
117
119
int n_pending_clients ; /* Number of clients waiting for free backend */
120
+ List * startup_gucs ; /* List of startup options specified in startup packet */
121
+ char * cmdline_options ; /* Command line options passed to backend */
118
122
}
119
123
SessionPool ;
120
124
@@ -208,6 +212,71 @@ backend_reschedule(Channel* chan, bool is_new)
208
212
return true;
209
213
}
210
214
215
+ static size_t
216
+ string_length (char const * str )
217
+ {
218
+ size_t length ;
219
+ if (str == NULL )
220
+ return 0 ;
221
+ while (* str != '\0' )
222
+ length += (* str ++ == ' ' ) ? 2 : 1 ;
223
+ return length ;
224
+ }
225
+
226
+ static size_t
227
+ string_list_length (List * list )
228
+ {
229
+ ListCell * cell ;
230
+ size_t length = 0 ;
231
+ foreach (cell , list )
232
+ {
233
+ length += strlen ((char * )lfirst (cell ));
234
+ }
235
+ return length ;
236
+ }
237
+
238
+ static List *
239
+ string_list_copy (List * orig )
240
+ {
241
+ List * copy = list_copy (orig );
242
+ ListCell * cell ;
243
+ foreach (cell , copy )
244
+ {
245
+ lfirst (cell ) = pstrdup ((char * )lfirst (cell ));
246
+ }
247
+ return copy ;
248
+ }
249
+
250
+ static bool
251
+ string_list_equal (List * a , List * b )
252
+ {
253
+ const ListCell * ca , * cb ;
254
+ if (list_length (a ) != list_length (b ))
255
+ return false;
256
+ forboth (ca , a , cb , b )
257
+ if (strcmp (lfirst (ca ), lfirst (cb )) != 0 )
258
+ return false;
259
+ return true;
260
+ }
261
+
262
+ static char *
263
+ string_append (char * dst , char const * src )
264
+ {
265
+ while (* src )
266
+ {
267
+ if (* src == ' ' )
268
+ * dst ++ = '\\' ;
269
+ * dst ++ = * src ++ ;
270
+ }
271
+ return dst ;
272
+ }
273
+
274
+ static bool
275
+ string_equal (char const * a , char const * b )
276
+ {
277
+ return a == b ? true : a == NULL || b == NULL ? false : strcmp (a , b ) == 0 ;
278
+ }
279
+
211
280
/**
212
281
* Parse client's startup packet and assign client to proper connection pool based on dbname/role
213
282
*/
@@ -253,6 +322,17 @@ client_connect(Channel* chan, int startup_packet_size)
253
322
else
254
323
strlcpy (key .username , chan -> client_port -> user_name , NAMEDATALEN );
255
324
325
+ ELOG (LOG , "Client %p connects to %s/%s" , chan , key .database , key .username );
326
+
327
+ chan -> pool = (SessionPool * )hash_search (chan -> proxy -> pools , & key , HASH_ENTER , & found );
328
+ if (!found )
329
+ {
330
+ /* First connection to this role/dbname */
331
+ chan -> proxy -> state -> n_pools += 1 ;
332
+ chan -> pool -> startup_gucs = NULL ;
333
+ chan -> pool -> cmdline_options = NULL ;
334
+ memset ((char * )chan -> pool + sizeof (SessionPoolKey ), 0 , sizeof (SessionPool ) - sizeof (SessionPoolKey ));
335
+ }
256
336
if (ProxyingGUCs )
257
337
{
258
338
ListCell * gucopts = list_head (chan -> client_port -> guc_options );
@@ -270,15 +350,31 @@ client_connect(Channel* chan, int startup_packet_size)
270
350
chan -> gucs = psprintf ("%sset local %s='%s';" , chan -> gucs ? chan -> gucs : "" , name , value );
271
351
}
272
352
}
273
-
274
- ELOG (LOG , "Client %p connects to %s/%s" , chan , key .database , key .username );
275
-
276
- chan -> pool = (SessionPool * )hash_search (chan -> proxy -> pools , & key , HASH_ENTER , & found );
277
- if (!found )
353
+ else
278
354
{
279
- /* First connection to this role/dbname */
280
- chan -> proxy -> state -> n_pools += 1 ;
281
- memset ((char * )chan -> pool + sizeof (SessionPoolKey ), 0 , sizeof (SessionPool ) - sizeof (SessionPoolKey ));
355
+ /* Assume that all clients are using the same set of GUCs.
356
+ * Use then for launching pooler worker backends and report error
357
+ * if GUCs in startup packets are different.
358
+ */
359
+ if (chan -> pool -> n_launched_backends == 0 )
360
+ {
361
+ list_free (chan -> pool -> startup_gucs );
362
+ if (chan -> pool -> cmdline_options )
363
+ pfree (chan -> pool -> cmdline_options );
364
+
365
+ chan -> pool -> startup_gucs = string_list_copy (chan -> client_port -> guc_options );
366
+ if (chan -> client_port -> cmdline_options )
367
+ chan -> pool -> cmdline_options = pstrdup (chan -> client_port -> cmdline_options );
368
+ }
369
+ else
370
+ {
371
+ if (!string_list_equal (chan -> pool -> startup_gucs , chan -> client_port -> guc_options ) ||
372
+ !string_equal (chan -> pool -> cmdline_options , chan -> client_port -> cmdline_options ))
373
+ {
374
+ elog (LOG , "Ignoring GUCs of client %s:%s" ,
375
+ NULLSTR (chan -> client_port -> remote_host ), NULLSTR (chan -> client_port -> remote_port ));
376
+ }
377
+ }
282
378
}
283
379
chan -> pool -> proxy = chan -> proxy ;
284
380
chan -> pool -> n_connected_clients += 1 ;
@@ -845,13 +941,16 @@ backend_start(SessionPool* pool, char** error)
845
941
{
846
942
Channel * chan ;
847
943
char postmaster_port [8 ];
848
- char const * keywords [] = {"port" ,"dbname" ,"user" ,"sslmode" ,"application_name" ,NULL };
849
- char const * values [] = {postmaster_port ,pool -> key .database ,pool -> key .username ,"disable" ,"pool_worker" ,NULL };
944
+ char * options = (char * )palloc (string_length (pool -> cmdline_options ) + string_list_length (pool -> startup_gucs ) + list_length (pool -> startup_gucs )* 5 + 1 );
945
+ char const * keywords [] = {"port" ,"dbname" ,"user" ,"sslmode" ,"application_name" ,"options" ,NULL };
946
+ char const * values [] = {postmaster_port ,pool -> key .database ,pool -> key .username ,"disable" ,"pool_worker" ,options ,NULL };
850
947
PGconn * conn ;
851
948
char * msg ;
852
949
int int32_buf ;
853
950
int msg_len ;
854
951
static bool libpqconn_loaded ;
952
+ ListCell * gucopts ;
953
+ char * dst = options ;
855
954
856
955
if (!libpqconn_loaded )
857
956
{
@@ -861,7 +960,31 @@ backend_start(SessionPool* pool, char** error)
861
960
libpqconn_loaded = true;
862
961
}
863
962
pg_itoa (PostPortNumber , postmaster_port );
963
+
964
+ gucopts = list_head (pool -> startup_gucs );
965
+ if (pool -> cmdline_options )
966
+ dst += sprintf (dst , "%s" , pool -> cmdline_options );
967
+ while (gucopts )
968
+ {
969
+ char * name ;
970
+ char * value ;
971
+
972
+ name = lfirst (gucopts );
973
+ gucopts = lnext (pool -> startup_gucs , gucopts );
974
+
975
+ value = lfirst (gucopts );
976
+ gucopts = lnext (pool -> startup_gucs , gucopts );
977
+
978
+ if (strcmp (name , "application_name" ) != 0 )
979
+ {
980
+ dst += sprintf (dst , " -c %s=" , name );
981
+ dst = string_append (dst , value );
982
+ }
983
+ }
984
+ * dst = '\0' ;
985
+ elog (LOG , "Spawn backend with parameters \"%s\"" , options );
864
986
conn = LibpqConnectdbParams (keywords , values , error );
987
+ pfree (options );
865
988
if (!conn )
866
989
return NULL ;
867
990
0 commit comments