@@ -94,18 +94,23 @@ typedef struct
94
94
struct DmqSharedState
95
95
{
96
96
LWLock * lock ;
97
+
98
+ /* sender stuff */
99
+ pid_t sender_pid ;
97
100
dsm_handle out_dsm ;
98
101
DmqDestination destinations [DMQ_MAX_DESTINATIONS ];
99
102
103
+ /* receivers stuff */
100
104
HTAB * subscriptions ;
105
+ int n_receivers ;
106
+ struct
107
+ {
108
+ char name [DMQ_NAME_MAXLEN ];
109
+ dsm_handle dsm_h ;
110
+ int procno ;
111
+ bool active ;
112
+ } receivers [DMQ_MAX_RECEIVERS ];
101
113
102
- // XXX: change to nested struct
103
- char receiver [DMQ_MAX_RECEIVERS ][DMQ_NAME_MAXLEN ];
104
- dsm_handle receiver_dsm [DMQ_MAX_RECEIVERS ];
105
- int receiver_procno [DMQ_MAX_RECEIVERS ];
106
- int nreceivers ;
107
-
108
- pid_t sender_pid ;
109
114
} * dmq_state ;
110
115
111
116
@@ -116,7 +121,8 @@ struct
116
121
int n_inhandles ;
117
122
struct
118
123
{
119
- shm_mq_handle * mqh ;
124
+ dsm_segment * dsm_seg ;
125
+ shm_mq_handle * mqh ;
120
126
char name [DMQ_NAME_MAXLEN ];
121
127
int mask_pos ;
122
128
} inhandles [DMQ_MAX_RECEIVERS ];
@@ -198,11 +204,13 @@ dmq_shmem_startup_hook(void)
198
204
memset (dmq_state -> destinations , '\0' , sizeof (DmqDestination )* DMQ_MAX_DESTINATIONS );
199
205
200
206
dmq_state -> sender_pid = 0 ;
201
- dmq_state -> nreceivers = 0 ;
207
+ dmq_state -> n_receivers = 0 ;
202
208
for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
203
209
{
204
- dmq_state -> receiver [i ][0 ] = '\0' ;
205
- dmq_state -> receiver_dsm [i ] = DSM_HANDLE_INVALID ;
210
+ dmq_state -> receivers [i ].name [0 ] = '\0' ;
211
+ dmq_state -> receivers [i ].dsm_h = DSM_HANDLE_INVALID ;
212
+ dmq_state -> receivers [i ].procno = -1 ;
213
+ dmq_state -> receivers [i ].active = false;
206
214
}
207
215
}
208
216
@@ -913,8 +921,9 @@ dmq_receiver_at_exit(int status, Datum receiver)
913
921
char sender_name [DMQ_NAME_MAXLEN ];
914
922
915
923
LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
916
- strncpy (sender_name , dmq_state -> receiver [receiver_id ], DMQ_NAME_MAXLEN );
917
- dmq_state -> receiver [receiver_id ][0 ] = '\0' ;
924
+ strncpy (sender_name , dmq_state -> receivers [receiver_id ].name ,
925
+ DMQ_NAME_MAXLEN );
926
+ dmq_state -> receivers [receiver_id ].active = false;
918
927
LWLockRelease (dmq_state -> lock );
919
928
920
929
if (dmq_receiver_stop_hook )
@@ -937,7 +946,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
937
946
shm_mq_handle * * mq_handles ;
938
947
char * sender_name ;
939
948
int i ;
940
- int receiver_id ;
949
+ int receiver_id = -1 ;
941
950
double last_message_at = dmq_now ();
942
951
943
952
sender_name = text_to_cstring (PG_GETARG_TEXT_PP (0 ));
@@ -958,17 +967,34 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
958
967
959
968
/* register ourself in dmq_state */
960
969
LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
970
+
961
971
/* check for a conflicting receiver_name */
962
972
for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
963
973
{
964
- if (strcmp (dmq_state -> receiver [i ], sender_name ) == 0 )
965
- mtm_log (ERROR , "[DMQ] sender '%s' already connected" , sender_name );
974
+ if (strcmp (dmq_state -> receivers [i ].name , sender_name ) == 0 )
975
+ {
976
+ if (!dmq_state -> receivers [i ].active )
977
+ {
978
+ receiver_id = i ;
979
+ }
980
+ else
981
+ mtm_log (ERROR , "[DMQ] sender '%s' already connected" , sender_name );
982
+ }
983
+ }
984
+
985
+ if (receiver_id < 0 )
986
+ {
987
+ if (dmq_state -> n_receivers >= DMQ_MAX_RECEIVERS )
988
+ mtm_log (ERROR , "[DMQ] maximum number of dmq-receivers reached" );
989
+
990
+ receiver_id = dmq_state -> n_receivers ;
991
+ dmq_state -> n_receivers ++ ;
992
+ strncpy (dmq_state -> receivers [receiver_id ].name , sender_name , DMQ_NAME_MAXLEN );
966
993
}
967
- receiver_id = dmq_state -> nreceivers ;
968
- dmq_state -> receiver_dsm [receiver_id ] = dsm_segment_handle (seg );
969
- strncpy (dmq_state -> receiver [receiver_id ], sender_name , DMQ_NAME_MAXLEN );
970
- dmq_state -> nreceivers ++ ;
971
- dmq_state -> receiver_procno [receiver_id ] = MyProc -> pgprocno ;
994
+
995
+ dmq_state -> receivers [receiver_id ].dsm_h = dsm_segment_handle (seg );
996
+ dmq_state -> receivers [receiver_id ].procno = MyProc -> pgprocno ;
997
+ dmq_state -> receivers [receiver_id ].active = true;
972
998
LWLockRelease (dmq_state -> lock );
973
999
974
1000
on_shmem_exit (dmq_receiver_at_exit , Int32GetDatum (receiver_id ));
@@ -1152,7 +1178,6 @@ dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *payload
1152
1178
static bool
1153
1179
dmq_reattach_shm_mq (int handle_id )
1154
1180
{
1155
- dsm_segment * seg ;
1156
1181
shm_toc * toc ;
1157
1182
shm_mq * inq ;
1158
1183
MemoryContext oldctx ;
@@ -1166,11 +1191,12 @@ dmq_reattach_shm_mq(int handle_id)
1166
1191
for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
1167
1192
{
1168
1193
// XXX: change to hash maybe
1169
- if (strcmp (dmq_state -> receiver [i ], dmq_local .inhandles [handle_id ].name ) == 0 )
1194
+ if (strcmp (dmq_state -> receivers [i ].name , dmq_local .inhandles [handle_id ].name ) == 0
1195
+ && dmq_state -> receivers [i ].active )
1170
1196
{
1171
1197
receiver_id = i ;
1172
- receiver_procno = dmq_state -> receiver_procno [i ];
1173
- receiver_dsm = dmq_state -> receiver_dsm [receiver_id ];
1198
+ receiver_procno = dmq_state -> receivers [i ]. procno ;
1199
+ receiver_dsm = dmq_state -> receivers [receiver_id ]. dsm_h ;
1174
1200
break ;
1175
1201
}
1176
1202
}
@@ -1197,39 +1223,46 @@ dmq_reattach_shm_mq(int handle_id)
1197
1223
dmq_local .inhandles [handle_id ].name ,
1198
1224
receiver_dsm );
1199
1225
1200
- seg = dsm_attach (receiver_dsm );
1201
- if (seg == NULL )
1226
+ if (dmq_local .inhandles [handle_id ].dsm_seg )
1227
+ {
1228
+ if (dmq_local .inhandles [handle_id ].mqh )
1229
+ {
1230
+ mtm_log (DmqTraceShmMq , "[DMQ] detach shm_mq_handle %p" ,
1231
+ dmq_local .inhandles [handle_id ].mqh );
1232
+ shm_mq_detach (dmq_local .inhandles [handle_id ].mqh );
1233
+ }
1234
+ mtm_log (DmqTraceShmMq , "[DMQ] detach dsm_seg %p" ,
1235
+ dmq_local .inhandles [handle_id ].dsm_seg );
1236
+ dsm_detach (dmq_local .inhandles [handle_id ].dsm_seg );
1237
+ }
1238
+
1239
+ dmq_local .inhandles [handle_id ].dsm_seg = dsm_attach (receiver_dsm );
1240
+ if (dmq_local .inhandles [handle_id ].dsm_seg == NULL )
1202
1241
ereport (ERROR ,
1203
1242
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1204
1243
errmsg ("unable to map dynamic shared memory segment" )));
1205
1244
1206
- dsm_pin_mapping (seg );
1245
+ dsm_pin_mapping (dmq_local . inhandles [ handle_id ]. dsm_seg );
1207
1246
1208
- toc = shm_toc_attach (DMQ_MQ_MAGIC , dsm_segment_address (seg ));
1247
+
1248
+ toc = shm_toc_attach (DMQ_MQ_MAGIC ,
1249
+ dsm_segment_address (dmq_local .inhandles [handle_id ].dsm_seg ));
1209
1250
if (toc == NULL )
1210
1251
ereport (ERROR ,
1211
1252
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1212
1253
errmsg ("bad magic number in dynamic shared memory segment" )));
1213
1254
1214
1255
inq = shm_toc_lookup (toc , MyProc -> pgprocno , false);
1215
1256
1216
- if (dmq_local .inhandles [handle_id ].mqh )
1217
- {
1218
- shm_mq_detach (dmq_local .inhandles [handle_id ].mqh );
1219
- mtm_log (DmqTraceShmMq , "[DMQ] re-creating shm_mq handle %p" , inq );
1220
- }
1221
- else
1222
- {
1223
- mtm_log (DmqTraceShmMq , "[DMQ] creating shm_mq handle %p" , inq );
1224
- }
1225
-
1226
1257
/* re-create */
1227
- inq = shm_mq_create (inq , DMQ_MQ_SIZE );
1258
+ mtm_log (DmqTraceShmMq , "[DMQ] creating shm_mq handle %p" , inq );
1259
+ inq = shm_mq_create (inq , DMQ_MQ_SIZE ); // XXX
1228
1260
shm_mq_set_receiver (inq , MyProc );
1229
- shm_mq_set_sender (inq , & ProcGlobal -> allProcs [receiver_procno ]);
1261
+ shm_mq_set_sender (inq , & ProcGlobal -> allProcs [receiver_procno ]); // XXX
1230
1262
1231
1263
oldctx = MemoryContextSwitchTo (TopMemoryContext );
1232
- dmq_local .inhandles [handle_id ].mqh = shm_mq_attach (inq , seg , NULL );
1264
+ dmq_local .inhandles [handle_id ].mqh = shm_mq_attach (inq ,
1265
+ dmq_local .inhandles [handle_id ].dsm_seg , NULL );
1233
1266
MemoryContextSwitchTo (oldctx );
1234
1267
1235
1268
return true;
0 commit comments