133
133
#include "access/slru.h"
134
134
#include "access/transam.h"
135
135
#include "access/xact.h"
136
+ #include "catalog/pg_collation.h"
136
137
#include "catalog/pg_database.h"
137
138
#include "commands/async.h"
138
139
#include "common/hashfn.h"
@@ -312,6 +313,12 @@ static SlruCtlData NotifyCtlData;
312
313
313
314
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
314
315
316
+ typedef struct
317
+ {
318
+ bool ispatt ;
319
+ char channel [FLEXIBLE_ARRAY_MEMBER ]; /* nul-terminated string */
320
+ } ListenChannel ;
321
+
315
322
/*
316
323
* listenChannels identifies the channels we are actually listening to
317
324
* (ie, have committed a LISTEN on). It is a simple list of channel names,
@@ -339,6 +346,7 @@ typedef enum
339
346
typedef struct
340
347
{
341
348
ListenActionKind action ;
349
+ bool ispatt ;
342
350
char channel [FLEXIBLE_ARRAY_MEMBER ]; /* nul-terminated string */
343
351
} ListenAction ;
344
352
@@ -430,13 +438,13 @@ int max_notify_queue_pages = 1048576;
430
438
/* local function prototypes */
431
439
static inline int64 asyncQueuePageDiff (int64 p , int64 q );
432
440
static inline bool asyncQueuePagePrecedes (int64 p , int64 q );
433
- static void queue_listen (ListenActionKind action , const char * channel );
441
+ static void queue_listen (ListenActionKind action , const bool ispatt , const char * channel );
434
442
static void Async_UnlistenOnExit (int code , Datum arg );
435
443
static void Exec_ListenPreCommit (void );
436
- static void Exec_ListenCommit (const char * channel );
437
- static void Exec_UnlistenCommit (const char * channel );
444
+ static void Exec_ListenCommit (const bool ispatt , const char * channel );
445
+ static void Exec_UnlistenCommit (const bool ispatt , const char * channel );
438
446
static void Exec_UnlistenAllCommit (void );
439
- static bool IsListeningOn (const char * channel );
447
+ static bool IsListeningOn (const bool trymatch , const bool ispatt , const char * channel );
440
448
static void asyncQueueUnregister (void );
441
449
static bool asyncQueueIsFull (void );
442
450
static bool asyncQueueAdvance (volatile QueuePosition * position , int entryLength );
@@ -687,7 +695,7 @@ Async_Notify(const char *channel, const char *payload)
687
695
* commit.
688
696
*/
689
697
static void
690
- queue_listen (ListenActionKind action , const char * channel )
698
+ queue_listen (ListenActionKind action , const bool ispatt , const char * channel )
691
699
{
692
700
MemoryContext oldcontext ;
693
701
ListenAction * actrec ;
@@ -705,6 +713,7 @@ queue_listen(ListenActionKind action, const char *channel)
705
713
actrec = (ListenAction * ) palloc (offsetof(ListenAction , channel ) +
706
714
strlen (channel ) + 1 );
707
715
actrec -> action = action ;
716
+ actrec -> ispatt = ispatt ;
708
717
strcpy (actrec -> channel , channel );
709
718
710
719
if (pendingActions == NULL || my_level > pendingActions -> nestingLevel )
@@ -735,12 +744,12 @@ queue_listen(ListenActionKind action, const char *channel)
735
744
* This is executed by the SQL listen command.
736
745
*/
737
746
void
738
- Async_Listen (const char * channel )
747
+ Async_Listen (const bool ispatt , const char * channel )
739
748
{
740
749
if (Trace_notify )
741
750
elog (DEBUG1 , "Async_Listen(%s,%d)" , channel , MyProcPid );
742
751
743
- queue_listen (LISTEN_LISTEN , channel );
752
+ queue_listen (LISTEN_LISTEN , ispatt , channel );
744
753
}
745
754
746
755
/*
@@ -749,7 +758,7 @@ Async_Listen(const char *channel)
749
758
* This is executed by the SQL unlisten command.
750
759
*/
751
760
void
752
- Async_Unlisten (const char * channel )
761
+ Async_Unlisten (const bool ispatt , const char * channel )
753
762
{
754
763
if (Trace_notify )
755
764
elog (DEBUG1 , "Async_Unlisten(%s,%d)" , channel , MyProcPid );
@@ -758,7 +767,7 @@ Async_Unlisten(const char *channel)
758
767
if (pendingActions == NULL && !unlistenExitRegistered )
759
768
return ;
760
769
761
- queue_listen (LISTEN_UNLISTEN , channel );
770
+ queue_listen (LISTEN_UNLISTEN , ispatt , channel );
762
771
}
763
772
764
773
/*
@@ -776,7 +785,7 @@ Async_UnlistenAll(void)
776
785
if (pendingActions == NULL && !unlistenExitRegistered )
777
786
return ;
778
787
779
- queue_listen (LISTEN_UNLISTEN_ALL , "" );
788
+ queue_listen (LISTEN_UNLISTEN_ALL , false, "" );
780
789
}
781
790
782
791
/*
@@ -803,10 +812,31 @@ pg_listening_channels(PG_FUNCTION_ARGS)
803
812
804
813
if (funcctx -> call_cntr < list_length (listenChannels ))
805
814
{
806
- char * channel = (char * ) list_nth (listenChannels ,
807
- funcctx -> call_cntr );
815
+ ListenChannel * chnl ;
816
+
817
+ chnl = (ListenChannel * )list_nth (listenChannels , funcctx -> call_cntr );
818
+
819
+ if (chnl -> ispatt )
820
+ {
821
+ Size plen ;
822
+ char * result ;
823
+ MemoryContext oldcontext ;
824
+
825
+ oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
826
+
827
+ plen = strlen (chnl -> channel );
828
+ result = (char * )palloc (plen + 3 );
829
+ result [0 ] = '\'' ;
830
+ memcpy (result + 1 , chnl -> channel , plen );
831
+ result [plen + 1 ] = '\'' ;
832
+ result [plen + 2 ] = '\0' ;
808
833
809
- SRF_RETURN_NEXT (funcctx , CStringGetTextDatum (channel ));
834
+ MemoryContextSwitchTo (oldcontext );
835
+
836
+ SRF_RETURN_NEXT (funcctx , CStringGetTextDatum (result ));
837
+ }
838
+ else
839
+ SRF_RETURN_NEXT (funcctx , CStringGetTextDatum (chnl -> channel ));
810
840
}
811
841
812
842
SRF_RETURN_DONE (funcctx );
@@ -989,10 +1019,10 @@ AtCommit_Notify(void)
989
1019
switch (actrec -> action )
990
1020
{
991
1021
case LISTEN_LISTEN :
992
- Exec_ListenCommit (actrec -> channel );
1022
+ Exec_ListenCommit (actrec -> ispatt , actrec -> channel );
993
1023
break ;
994
1024
case LISTEN_UNLISTEN :
995
- Exec_UnlistenCommit (actrec -> channel );
1025
+ Exec_UnlistenCommit (actrec -> ispatt , actrec -> channel );
996
1026
break ;
997
1027
case LISTEN_UNLISTEN_ALL :
998
1028
Exec_UnlistenAllCommit ();
@@ -1133,12 +1163,13 @@ Exec_ListenPreCommit(void)
1133
1163
* Add the channel to the list of channels we are listening on.
1134
1164
*/
1135
1165
static void
1136
- Exec_ListenCommit (const char * channel )
1166
+ Exec_ListenCommit (const bool ispatt , const char * channel )
1137
1167
{
1138
- MemoryContext oldcontext ;
1168
+ MemoryContext oldcontext ;
1169
+ ListenChannel * chnl ;
1139
1170
1140
1171
/* Do nothing if we are already listening on this channel */
1141
- if (IsListeningOn (channel ))
1172
+ if (IsListeningOn (false, ispatt , channel ))
1142
1173
return ;
1143
1174
1144
1175
/*
@@ -1150,7 +1181,15 @@ Exec_ListenCommit(const char *channel)
1150
1181
* later.
1151
1182
*/
1152
1183
oldcontext = MemoryContextSwitchTo (TopMemoryContext );
1153
- listenChannels = lappend (listenChannels , pstrdup (channel ));
1184
+
1185
+ chnl = (ListenChannel * ) palloc (offsetof(ListenChannel , channel ) +
1186
+ strlen (channel ) + 1 );
1187
+
1188
+ chnl -> ispatt = ispatt ;
1189
+ strcpy (chnl -> channel , channel );
1190
+
1191
+ listenChannels = lappend (listenChannels , chnl );
1192
+
1154
1193
MemoryContextSwitchTo (oldcontext );
1155
1194
}
1156
1195
@@ -1160,7 +1199,7 @@ Exec_ListenCommit(const char *channel)
1160
1199
* Remove the specified channel name from listenChannels.
1161
1200
*/
1162
1201
static void
1163
- Exec_UnlistenCommit (const char * channel )
1202
+ Exec_UnlistenCommit (const bool ispatt , const char * channel )
1164
1203
{
1165
1204
ListCell * q ;
1166
1205
@@ -1169,9 +1208,12 @@ Exec_UnlistenCommit(const char *channel)
1169
1208
1170
1209
foreach (q , listenChannels )
1171
1210
{
1172
- char * lchan = (char * ) lfirst (q );
1211
+ ListenChannel * lchan = (ListenChannel * ) lfirst (q );
1212
+
1213
+ if (lchan -> ispatt != ispatt )
1214
+ continue ;
1173
1215
1174
- if (strcmp (lchan , channel ) == 0 )
1216
+ if (strcmp (lchan -> channel , channel ) == 0 )
1175
1217
{
1176
1218
listenChannels = foreach_delete_current (listenChannels , q );
1177
1219
pfree (lchan );
@@ -1209,16 +1251,37 @@ Exec_UnlistenAllCommit(void)
1209
1251
* fairly short, though.
1210
1252
*/
1211
1253
static bool
1212
- IsListeningOn (const char * channel )
1254
+ IsListeningOn (const bool trymatch , const bool ispatt , const char * channel )
1213
1255
{
1214
1256
ListCell * p ;
1215
1257
1216
1258
foreach (p , listenChannels )
1217
1259
{
1218
- char * lchan = (char * ) lfirst (p );
1260
+ ListenChannel * lchan = (ListenChannel * ) lfirst (p );
1219
1261
1220
- if (strcmp (lchan , channel ) == 0 )
1221
- return true;
1262
+ if (trymatch )
1263
+ {
1264
+ Assert (!ispatt );
1265
+
1266
+ if (lchan -> ispatt )
1267
+ {
1268
+ Datum s = PointerGetDatum (cstring_to_text (channel ));
1269
+ Datum p = PointerGetDatum (cstring_to_text (lchan -> channel ));
1270
+
1271
+ if (DatumGetBool (DirectFunctionCall2Coll (textlike , DEFAULT_COLLATION_OID , s , p )))
1272
+ return true;
1273
+ }
1274
+ else if (strcmp (lchan -> channel , channel ) == 0 )
1275
+ return true;
1276
+ }
1277
+ else
1278
+ {
1279
+ if (ispatt == lchan -> ispatt )
1280
+ {
1281
+ if (strcmp (lchan -> channel , channel ) == 0 )
1282
+ return true;
1283
+ }
1284
+ }
1222
1285
}
1223
1286
return false;
1224
1287
}
@@ -2071,7 +2134,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
2071
2134
/* qe->data is the null-terminated channel name */
2072
2135
char * channel = qe -> data ;
2073
2136
2074
- if (IsListeningOn (channel ))
2137
+ if (IsListeningOn (true, false, channel ))
2075
2138
{
2076
2139
/* payload follows channel name */
2077
2140
char * payload = qe -> data + strlen (channel ) + 1 ;
0 commit comments