@@ -47,6 +47,7 @@ SessionPoolKey;
47
47
*/
48
48
typedef struct Channel
49
49
{
50
+ int magic ;
50
51
char * buf ;
51
52
int rx_pos ;
52
53
int tx_pos ;
@@ -79,6 +80,9 @@ typedef struct Channel
79
80
}
80
81
Channel ;
81
82
83
+ #define ACTIVE_CHANNEL_MAGIC 0xDEFA1234U
84
+ #define REMOVED_CHANNEL_MAGIC 0xDEADDEEDU
85
+
82
86
/*
83
87
* Control structure for connection proxies (several proxy workers can be launched and each has it sown proxy instance).
84
88
* Proxy contains hash of session pools for reach role/dbname combination.
@@ -533,7 +537,8 @@ static bool
533
537
is_transactional_statement (char * stmt )
534
538
{
535
539
static char const * const non_tx_stmts [] = {
536
- "create" ,
540
+ "create tablespace" ,
541
+ "create database" ,
537
542
"cluster" ,
538
543
"drop" ,
539
544
"discard" ,
@@ -670,13 +675,25 @@ channel_read(Channel* chan)
670
675
chan -> prev_gucs = NULL ;
671
676
}
672
677
if (ProxyingGUCs
673
- && pg_strncasecmp (stmt , "set" , 3 ) == 0
674
- && pg_strncasecmp (stmt + 3 , " local" , 6 ) != 0 )
678
+ && ((pg_strncasecmp (stmt , "set" , 3 ) == 0
679
+ && pg_strncasecmp (stmt + 3 , " local" , 6 ) != 0 )
680
+ || pg_strncasecmp (stmt , "reset" , 5 ) == 0 ))
675
681
{
676
682
char * new_msg ;
677
683
chan -> prev_gucs = chan -> gucs ? chan -> gucs : pstrdup ("" );
678
- chan -> gucs = psprintf ("%sset local%s%c" , chan -> prev_gucs , stmt + 3 ,
679
- chan -> buf [chan -> rx_pos - 2 ] == ';' ? ' ' : ';' );
684
+ if (pg_strncasecmp (stmt , "reset" , 5 ) == 0 )
685
+ {
686
+ char * semi = strchr (stmt + 5 , ';' );
687
+ if (semi )
688
+ * semi = '\0' ;
689
+ chan -> gucs = psprintf ("%sset local%s=default;" ,
690
+ chan -> prev_gucs , stmt + 5 );
691
+ }
692
+ else
693
+ {
694
+ chan -> gucs = psprintf ("%sset local%s%c" , chan -> prev_gucs , stmt + 3 ,
695
+ chan -> buf [chan -> rx_pos - 2 ] == ';' ? ' ' : ';' );
696
+ }
680
697
new_msg = chan -> gucs + strlen (chan -> prev_gucs );
681
698
Assert (msg_start + strlen (new_msg )* 2 + 6 < chan -> buf_size );
682
699
/*
@@ -786,6 +803,7 @@ static Channel*
786
803
channel_create (Proxy * proxy )
787
804
{
788
805
Channel * chan = (Channel * )palloc0 (sizeof (Channel ));
806
+ chan -> magic = ACTIVE_CHANNEL_MAGIC ;
789
807
chan -> proxy = proxy ;
790
808
chan -> buf = palloc (INIT_BUF_SIZE );
791
809
chan -> buf_size = INIT_BUF_SIZE ;
@@ -877,6 +895,7 @@ backend_start(SessionPool* pool, char** error)
877
895
* error = strdup ("Too much sessios: try to increase 'max_sessions' configuration parameter" );
878
896
/* Too much sessions, error report was already logged */
879
897
closesocket (chan -> backend_socket );
898
+ chan -> magic = REMOVED_CHANNEL_MAGIC ;
880
899
pfree (chan -> buf );
881
900
pfree (chan );
882
901
chan = NULL ;
@@ -908,6 +927,7 @@ proxy_add_client(Proxy* proxy, Port* port)
908
927
#if defined(ENABLE_GSS ) || defined(ENABLE_SSPI )
909
928
pfree (port -> gss );
910
929
#endif
930
+ chan -> magic = REMOVED_CHANNEL_MAGIC ;
911
931
pfree (port );
912
932
pfree (chan -> buf );
913
933
pfree (chan );
@@ -959,6 +979,7 @@ channel_remove(Channel* chan)
959
979
free (error );
960
980
}
961
981
}
982
+ chan -> magic = REMOVED_CHANNEL_MAGIC ;
962
983
pfree (chan -> buf );
963
984
pfree (chan );
964
985
}
@@ -1035,12 +1056,39 @@ proxy_loop(Proxy* proxy)
1035
1056
proxy_add_client (proxy , port );
1036
1057
}
1037
1058
}
1038
- else
1059
+ /*
1060
+ * epoll may return event for already closed session if
1061
+ * socket is still openned. From epoll documentation: Q6
1062
+ * Will closing a file descriptor cause it to be removed
1063
+ * from all epoll sets automatically?
1064
+ *
1065
+ * A6 Yes, but be aware of the following point. A file
1066
+ * descriptor is a reference to an open file description
1067
+ * (see open(2)). Whenever a descriptor is duplicated via
1068
+ * dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2), a new
1069
+ * file descriptor referring to the same open file
1070
+ * description is created. An open file description
1071
+ * continues to exist until all file descriptors
1072
+ * referring to it have been closed. A file descriptor is
1073
+ * removed from an epoll set only after all the file
1074
+ * descriptors referring to the underlying open file
1075
+ * description have been closed (or before if the
1076
+ * descriptor is explicitly removed using epoll_ctl(2)
1077
+ * EPOLL_CTL_DEL). This means that even after a file
1078
+ * descriptor that is part of an epoll set has been
1079
+ * closed, events may be reported for that file
1080
+ * descriptor if other file descriptors referring to
1081
+ * the same underlying file description remain open.
1082
+ *
1083
+ * Using this check for valid magic field we try to ignore
1084
+ * such events.
1085
+ */
1086
+ else if (chan -> magic == ACTIVE_CHANNEL_MAGIC )
1039
1087
{
1040
1088
if (ready [i ].events & WL_SOCKET_WRITEABLE ) {
1041
1089
ELOG (LOG , "Channel %p is writable" , chan );
1042
1090
channel_write (chan , false);
1043
- if (chan -> peer == NULL || chan -> peer -> tx_size == 0 ) /* nothing to write */
1091
+ if (chan -> magic == ACTIVE_CHANNEL_MAGIC && ( chan -> peer == NULL || chan -> peer -> tx_size == 0 ) ) /* nothing to write */
1044
1092
{
1045
1093
/* At systems not supporting epoll edge triggering (Win32, FreeBSD, MacOS), we need to disable writable event to avoid busy loop */
1046
1094
ModifyWaitEvent (chan -> proxy -> wait_events , chan -> event_pos , WL_SOCKET_READABLE | WL_SOCKET_EDGE , NULL );
@@ -1050,7 +1098,7 @@ proxy_loop(Proxy* proxy)
1050
1098
if (ready [i ].events & WL_SOCKET_READABLE ) {
1051
1099
ELOG (LOG , "Channel %p is readable" , chan );
1052
1100
channel_read (chan );
1053
- if (chan -> tx_size != 0 ) /* pending write: read is not prohibited */
1101
+ if (chan -> magic == ACTIVE_CHANNEL_MAGIC && chan -> tx_size != 0 ) /* pending write: read is not prohibited */
1054
1102
{
1055
1103
/* At systems not supporting epoll edge triggering (Win32, FreeBSD, MacOS), we need to disable readable event to avoid busy loop */
1056
1104
ModifyWaitEvent (chan -> proxy -> wait_events , chan -> event_pos , WL_SOCKET_WRITEABLE | WL_SOCKET_EDGE , NULL );
0 commit comments