51
51
* size limits, but now it doesn't seems to be worth of troubles.
52
52
*/
53
53
54
- #define DMQ_NAME_MAXLEN 32
55
54
#define DMQ_CONNSTR_MAX_LEN 150
56
55
57
56
#define DMQ_MAX_SUBS_PER_BACKEND 10
@@ -83,38 +82,42 @@ typedef struct
83
82
int procno ;
84
83
} DmqStreamSubscription ;
85
84
86
- typedef struct DmqSharedState
85
+
86
+ /* Global state for dmq */
87
+ struct DmqSharedState
87
88
{
88
89
LWLock * lock ;
89
90
dsm_handle out_dsm ;
90
91
DmqDestination destinations [DMQ_MAX_DESTINATIONS ];
91
92
92
93
HTAB * subscriptions ;
94
+
95
+ // XXX: change to nested struct
93
96
char receiver [DMQ_MAX_RECEIVERS ][DMQ_NAME_MAXLEN ];
94
97
dsm_handle receiver_dsm [DMQ_MAX_RECEIVERS ];
95
98
int receiver_procno [DMQ_MAX_RECEIVERS ];
96
99
int nreceivers ;
97
100
98
101
pid_t sender_pid ;
99
- } DmqSharedState ;
102
+ } * dmq_state ;
100
103
101
- static DmqSharedState * dmq_state ;
102
104
103
105
/* Backend-local i/o queues. */
104
- typedef struct DmqBackendState
106
+ struct
105
107
{
106
- shm_mq_handle * mq_outh ;
107
- shm_mq_handle * mq_inh [DMQ_MAX_RECEIVERS ];
108
+ shm_mq_handle * mq_outh ;
108
109
int n_inhandles ;
109
- char receiver_names [DMQ_MAX_RECEIVERS ][DMQ_NAME_MAXLEN ];
110
- int mask_pos [DMQ_MAX_RECEIVERS ];
111
- } DmqBackendState ;
110
+ struct
111
+ {
112
+ shm_mq_handle * mqh ;
113
+ char name [DMQ_NAME_MAXLEN ];
114
+ int mask_pos ;
115
+ } inhandles [DMQ_MAX_RECEIVERS ];
116
+ } dmq_local ;
112
117
113
118
/* Flags set by signal handlers */
114
119
static volatile sig_atomic_t got_SIGHUP = false;
115
120
116
- static DmqBackendState dmq_local ;
117
-
118
121
static shmem_startup_hook_type PreviousShmemStartupHook ;
119
122
120
123
dmq_receiver_hook_type dmq_receiver_start_hook ;
@@ -177,7 +180,7 @@ dmq_shmem_startup_hook(void)
177
180
LWLockAcquire (AddinShmemInitLock , LW_EXCLUSIVE );
178
181
179
182
dmq_state = ShmemInitStruct ("dmq" ,
180
- sizeof (DmqSharedState ),
183
+ sizeof (struct DmqSharedState ),
181
184
& found );
182
185
if (!found )
183
186
{
@@ -211,7 +214,7 @@ dmq_shmem_size(void)
211
214
{
212
215
Size size = 0 ;
213
216
214
- size = add_size (size , sizeof (DmqSharedState ));
217
+ size = add_size (size , sizeof (struct DmqSharedState ));
215
218
size = add_size (size , hash_estimate_size (DMQ_MAX_SUBS_PER_BACKEND * MaxBackends ,
216
219
sizeof (DmqStreamSubscription )));
217
220
return MAXALIGN (size );
@@ -612,7 +615,6 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
612
615
bool found ;
613
616
DmqStreamSubscription * sub ;
614
617
shm_mq_result res ;
615
- shm_mq * mq ;
616
618
617
619
/*
618
620
* Consume stream_name packed as a string and interpret rest of the data
@@ -653,7 +655,7 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
653
655
return ;
654
656
}
655
657
656
- elog (LOG , "got message %s.%s, passing to %d" , stream_name , body , sub -> procno );
658
+ // elog(LOG, "got message %s.%s, passing to %d", stream_name, body, sub->procno);
657
659
658
660
/* and send it */
659
661
res = shm_mq_send (mq_handles [sub -> procno ], body_len , body , false);
@@ -1046,13 +1048,43 @@ dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg)
1046
1048
1047
1049
// elog(LOG, "pushing l=%d '%.*s'", buf.len, buf.len, buf.data);
1048
1050
1051
+ // XXX: use sendv instead
1049
1052
res = shm_mq_send (dmq_local .mq_outh , buf .len , buf .data , false);
1050
1053
if (res != SHM_MQ_SUCCESS )
1051
1054
elog (ERROR , "dmq_push: can't send to queue" );
1052
1055
1053
1056
resetStringInfo (& buf );
1054
1057
}
1055
1058
1059
+
1060
+ void
1061
+ dmq_push_buffer (DmqDestinationId dest_id , char * stream_name , const void * payload , size_t len )
1062
+ {
1063
+ StringInfoData buf ;
1064
+ shm_mq_result res ;
1065
+
1066
+ ensure_outq_handle ();
1067
+
1068
+ initStringInfo (& buf );
1069
+ pq_sendbyte (& buf , dest_id );
1070
+ pq_send_ascii_string (& buf , stream_name );
1071
+ pq_sendbytes (& buf , payload , len );
1072
+
1073
+ // elog(LOG, "pushing l=%d '%.*s'", buf.len, buf.len, buf.data);
1074
+
1075
+ // XXX: use sendv instead
1076
+ res = shm_mq_send (dmq_local .mq_outh , buf .len , buf .data , false);
1077
+ if (res != SHM_MQ_SUCCESS )
1078
+ elog (ERROR , "dmq_push: can't send to queue" );
1079
+ }
1080
+
1081
+
1082
+
1083
+
1084
+
1085
+
1086
+
1087
+
1056
1088
static bool
1057
1089
dmq_reattach_shm_mq (int handle_id )
1058
1090
{
@@ -1065,12 +1097,11 @@ dmq_reattach_shm_mq(int handle_id)
1065
1097
int receiver_procno ;
1066
1098
int i ;
1067
1099
1068
- /* await for sender to connect */
1069
1100
LWLockAcquire (dmq_state -> lock , LW_SHARED );
1070
1101
for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
1071
1102
{
1072
- // XXX: change to hash
1073
- if (strcmp (dmq_state -> receiver [i ], dmq_local .receiver_names [handle_id ]) == 0 )
1103
+ // XXX: change to hash maybe
1104
+ if (strcmp (dmq_state -> receiver [i ], dmq_local .inhandles [handle_id ]. name ) == 0 )
1074
1105
{
1075
1106
receiver_id = i ;
1076
1107
receiver_procno = dmq_state -> receiver_procno [i ];
@@ -1098,9 +1129,9 @@ dmq_reattach_shm_mq(int handle_id)
1098
1129
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1099
1130
errmsg ("bad magic number in dynamic shared memory segment" )));
1100
1131
1101
- if (dmq_local .mq_inh [handle_id ])
1132
+ if (dmq_local .inhandles [handle_id ]. mqh )
1102
1133
{
1103
- shm_mq_detach (dmq_local .mq_inh [handle_id ]);
1134
+ shm_mq_detach (dmq_local .inhandles [handle_id ]. mqh );
1104
1135
}
1105
1136
1106
1137
inq = shm_toc_lookup (toc , MyProc -> pgprocno , false);
@@ -1111,15 +1142,28 @@ dmq_reattach_shm_mq(int handle_id)
1111
1142
shm_mq_set_sender (inq , & ProcGlobal -> allProcs [receiver_procno ]);
1112
1143
1113
1144
oldctx = MemoryContextSwitchTo (TopMemoryContext );
1114
- dmq_local .mq_inh [handle_id ] = shm_mq_attach (inq , seg , NULL );
1145
+ dmq_local .inhandles [handle_id ]. mqh = shm_mq_attach (inq , seg , NULL );
1115
1146
MemoryContextSwitchTo (oldctx );
1116
1147
1117
1148
return true;
1118
1149
}
1119
1150
1151
+ void
1152
+ dmq_attach_receiver (char * sender_name , int mask_pos )
1153
+ {
1154
+ int handle_id = dmq_local .n_inhandles ;
1155
+
1156
+ dmq_local .inhandles [handle_id ].mqh = NULL ;
1157
+ dmq_local .inhandles [handle_id ].mask_pos = mask_pos ;
1158
+ strncpy (dmq_local .inhandles [handle_id ].name , sender_name , DMQ_NAME_MAXLEN );
1159
+
1160
+ dmq_reattach_shm_mq (handle_id );
1161
+
1162
+ dmq_local .n_inhandles ++ ;
1163
+ }
1120
1164
1121
1165
void
1122
- dmq_stream_subscribe (char * sender_name , char * stream_name , int mask_pos )
1166
+ dmq_stream_subscribe (char * stream_name )
1123
1167
{
1124
1168
bool found ;
1125
1169
DmqStreamSubscription * sub ;
@@ -1133,17 +1177,19 @@ dmq_stream_subscribe(char *sender_name, char *stream_name, int mask_pos)
1133
1177
MyProc -> pgprocno , stream_name , sub -> procno , sub -> stream_name );
1134
1178
}
1135
1179
sub -> procno = MyProc -> pgprocno ;
1136
- sub -> mask_pos = mask_pos ;
1137
1180
LWLockRelease (dmq_state -> lock );
1181
+ }
1138
1182
1139
- dmq_local . mq_inh [ dmq_local . n_inhandles ] = NULL ;
1140
- strncpy ( dmq_local . receiver_names [ dmq_local . n_inhandles ], sender_name ,
1141
- DMQ_NAME_MAXLEN );
1142
- dmq_local . mask_pos [ dmq_local . n_inhandles ] = mask_pos ;
1183
+ void
1184
+ dmq_stream_unsubscribe ( char * stream_name )
1185
+ {
1186
+ bool found ;
1143
1187
1144
- dmq_reattach_shm_mq (dmq_local .n_inhandles );
1188
+ LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
1189
+ hash_search (dmq_state -> subscriptions , stream_name , HASH_REMOVE , & found );
1190
+ LWLockRelease (dmq_state -> lock );
1145
1191
1146
- dmq_local . n_inhandles ++ ;
1192
+ Assert ( found ) ;
1147
1193
}
1148
1194
1149
1195
void
@@ -1163,10 +1209,10 @@ dmq_pop(DmqSenderId *sender_id, StringInfo msg, int64 mask)
1163
1209
Size len ;
1164
1210
void * data ;
1165
1211
1166
- if (!BIT_CHECK (mask , dmq_local .mask_pos [i ]))
1212
+ if (!BIT_CHECK (mask , dmq_local .inhandles [i ]. mask_pos ))
1167
1213
continue ;
1168
1214
1169
- res = shm_mq_receive (dmq_local .mq_inh [i ], & len , & data , true);
1215
+ res = shm_mq_receive (dmq_local .inhandles [i ]. mqh , & len , & data , true);
1170
1216
if (res == SHM_MQ_SUCCESS )
1171
1217
{
1172
1218
msg -> data = data ;
@@ -1201,12 +1247,14 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg)
1201
1247
shm_mq_result res ;
1202
1248
int i ;
1203
1249
1250
+ return false;
1251
+
1204
1252
for (i = 0 ; i < dmq_local .n_inhandles ; i ++ )
1205
1253
{
1206
1254
Size len ;
1207
1255
void * data ;
1208
1256
1209
- res = shm_mq_receive (dmq_local .mq_inh [i ], & len , & data , true);
1257
+ res = shm_mq_receive (dmq_local .inhandles [i ]. mqh , & len , & data , true);
1210
1258
if (res == SHM_MQ_SUCCESS )
1211
1259
{
1212
1260
msg -> data = data ;
@@ -1258,23 +1306,3 @@ dmq_destination_add(char *connstr, char *sender_name, int ping_period)
1258
1306
}
1259
1307
1260
1308
1261
- void
1262
- dmq_push_buffer (DmqDestinationId dest_id , char * stream_name , const void * payload , size_t len )
1263
- {
1264
- StringInfoData buf ;
1265
- shm_mq_result res ;
1266
-
1267
- ensure_outq_handle ();
1268
-
1269
- initStringInfo (& buf );
1270
- pq_sendbyte (& buf , dest_id );
1271
- pq_send_ascii_string (& buf , stream_name );
1272
- pq_sendbytes (& buf , payload , len );
1273
-
1274
- // elog(LOG, "pushing l=%d '%.*s'", buf.len, buf.len, buf.data);
1275
-
1276
- res = shm_mq_send (dmq_local .mq_outh , buf .len , buf .data , false);
1277
- if (res != SHM_MQ_SUCCESS )
1278
- elog (ERROR , "dmq_push: can't send to queue" );
1279
- }
1280
-
0 commit comments