42
42
#define DMQ_MQ_SIZE ((Size) 65536)
43
43
#define DMQ_MQ_MAGIC 0x646d71
44
44
45
+ // XXX: move to common
46
+ #define BIT_CHECK (mask , bit ) (((mask) & ((int64)1 << (bit))) != 0)
47
+
45
48
/*
46
49
* Shared data structures to hold current connections topology.
47
50
* All that stuff can be moved to persistent tables to avoid hardcoded
@@ -76,6 +79,7 @@ typedef struct {
76
79
typedef struct
77
80
{
78
81
char stream_name [DMQ_NAME_MAXLEN ];
82
+ int mask_pos ;
79
83
int procno ;
80
84
} DmqStreamSubscription ;
81
85
@@ -88,6 +92,7 @@ typedef struct DmqSharedState
88
92
HTAB * subscriptions ;
89
93
char receiver [DMQ_MAX_RECEIVERS ][DMQ_NAME_MAXLEN ];
90
94
dsm_handle receiver_dsm [DMQ_MAX_RECEIVERS ];
95
+ int receiver_procno [DMQ_MAX_RECEIVERS ];
91
96
int nreceivers ;
92
97
93
98
pid_t sender_pid ;
@@ -99,8 +104,10 @@ static DmqSharedState *dmq_state;
99
104
typedef struct DmqBackendState
100
105
{
101
106
shm_mq_handle * mq_outh ;
102
- shm_mq_handle * mq_inh [DMQ_MAX_DESTINATIONS ];
107
+ shm_mq_handle * mq_inh [DMQ_MAX_RECEIVERS ];
103
108
int n_inhandles ;
109
+ char receiver_names [DMQ_MAX_RECEIVERS ][DMQ_NAME_MAXLEN ];
110
+ int mask_pos [DMQ_MAX_RECEIVERS ];
104
111
} DmqBackendState ;
105
112
106
113
/* Flags set by signal handlers */
@@ -529,8 +536,8 @@ dmq_sender_main(Datum main_arg)
529
536
{
530
537
conns [conn_id ].state = Idle ;
531
538
DeleteWaitEvent (set , event .pos );
532
- elog (LOG , "[DMQ] [B1] failed to connect: %s" ,
533
- PQerrorMessage (conns [conn_id ].pgconn ));
539
+ // elog(LOG, "[DMQ] [B1] failed to connect: %s",
540
+ // PQerrorMessage(conns[conn_id].pgconn));
534
541
}
535
542
else
536
543
Assert (status == PGRES_POLLING_WRITING );
@@ -646,14 +653,6 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
646
653
return ;
647
654
}
648
655
649
- /* select queue and reconnect it if needed */
650
- mq = shm_mq_get_queue (mq_handles [sub -> procno ]);
651
- if (shm_mq_get_sender (mq ) == NULL )
652
- {
653
- shm_mq_set_sender (mq , MyProc );
654
- mq_handles [sub -> procno ] = shm_mq_attach (mq , seg , NULL );
655
- }
656
-
657
656
elog (LOG , "got message %s.%s, passing to %d" , stream_name , body , sub -> procno );
658
657
659
658
/* and send it */
@@ -843,11 +842,12 @@ dmq_receiver_at_exit(int status, Datum sender)
843
842
char sender_name [DMQ_NAME_MAXLEN ];
844
843
845
844
LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
846
- strncmp (sender_name , dmq_state -> receiver [sender_id ], DMQ_NAME_MAXLEN );
845
+ strncpy (sender_name , dmq_state -> receiver [sender_id ], DMQ_NAME_MAXLEN );
847
846
dmq_state -> receiver [sender_id ][0 ] = '\0' ;
848
847
LWLockRelease (dmq_state -> lock );
849
848
850
- dmq_receiver_stop_hook (sender_name );
849
+ if (dmq_receiver_stop_hook )
850
+ dmq_receiver_stop_hook (sender_name );
851
851
}
852
852
853
853
@@ -897,6 +897,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
897
897
dmq_state -> receiver_dsm [receiver_id ] = dsm_segment_handle (seg );
898
898
strncpy (dmq_state -> receiver [receiver_id ], sender_name , DMQ_NAME_MAXLEN );
899
899
dmq_state -> nreceivers ++ ;
900
+ dmq_state -> receiver_procno [receiver_id ] = MyProc -> pgprocno ;
900
901
LWLockRelease (dmq_state -> lock );
901
902
902
903
on_shmem_exit (dmq_receiver_at_exit , Int32GetDatum (receiver_id ));
@@ -1052,53 +1053,34 @@ dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg)
1052
1053
resetStringInfo (& buf );
1053
1054
}
1054
1055
1055
- void
1056
- dmq_stream_subscribe ( char * sender_name , char * stream_name )
1056
+ static bool
1057
+ dmq_reattach_shm_mq ( int handle_id )
1057
1058
{
1058
- bool found ;
1059
- DmqStreamSubscription * sub ;
1060
- int receiver_id = -1 ;
1061
-
1062
1059
dsm_segment * seg ;
1063
1060
shm_toc * toc ;
1064
1061
shm_mq * inq ;
1065
1062
MemoryContext oldctx ;
1066
1063
1067
- LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
1068
- sub = (DmqStreamSubscription * ) hash_search (dmq_state -> subscriptions , stream_name ,
1069
- HASH_ENTER , & found );
1070
- if (found && sub -> procno != MyProc -> pgprocno )
1071
- {
1072
- elog (ERROR , "procno%d: %s: subscription is already active for procno %d / %s" ,
1073
- MyProc -> pgprocno , stream_name , sub -> procno , sub -> stream_name );
1074
- }
1075
- else
1076
- sub -> procno = MyProc -> pgprocno ;
1077
- LWLockRelease (dmq_state -> lock );
1064
+ int receiver_id = -1 ;
1065
+ int receiver_procno ;
1066
+ int i ;
1078
1067
1079
1068
/* await for sender to connect */
1080
1069
LWLockAcquire (dmq_state -> lock , LW_SHARED );
1081
- for (;; )
1070
+ for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
1082
1071
{
1083
- int i ;
1084
-
1085
- for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
1072
+ // XXX: change to hash
1073
+ if (strcmp (dmq_state -> receiver [i ], dmq_local .receiver_names [handle_id ]) == 0 )
1086
1074
{
1087
- if (strcmp (dmq_state -> receiver [i ], sender_name ) == 0 )
1088
- {
1089
- receiver_id = i ;
1090
- break ;
1091
- }
1092
- }
1093
-
1094
- if (receiver_id >= 0 )
1075
+ receiver_id = i ;
1076
+ receiver_procno = dmq_state -> receiver_procno [i ];
1095
1077
break ;
1096
-
1097
- LWLockRelease (dmq_state -> lock );
1098
- pg_usleep (100000L );
1099
- CHECK_FOR_INTERRUPTS ();
1100
- LWLockAcquire (dmq_state -> lock , LW_SHARED );
1078
+ }
1101
1079
}
1080
+ LWLockRelease (dmq_state -> lock );
1081
+
1082
+ if (receiver_id < 0 )
1083
+ return false;
1102
1084
1103
1085
Assert (dmq_state -> receiver_dsm [receiver_id ] != DSM_HANDLE_INVALID );
1104
1086
@@ -1108,8 +1090,6 @@ dmq_stream_subscribe(char *sender_name, char *stream_name)
1108
1090
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1109
1091
errmsg ("unable to map dynamic shared memory segment" )));
1110
1092
1111
- LWLockRelease (dmq_state -> lock );
1112
-
1113
1093
dsm_pin_mapping (seg );
1114
1094
1115
1095
toc = shm_toc_attach (DMQ_MQ_MAGIC , dsm_segment_address (seg ));
@@ -1118,30 +1098,74 @@ dmq_stream_subscribe(char *sender_name, char *stream_name)
1118
1098
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1119
1099
errmsg ("bad magic number in dynamic shared memory segment" )));
1120
1100
1101
+ if (dmq_local .mq_inh [handle_id ])
1102
+ {
1103
+ shm_mq_detach (dmq_local .mq_inh [handle_id ]);
1104
+ }
1105
+
1121
1106
inq = shm_toc_lookup (toc , MyProc -> pgprocno , false);
1122
- // xxx memleak
1107
+
1108
+ /* re-create */
1123
1109
inq = shm_mq_create (inq , DMQ_MQ_SIZE );
1124
1110
shm_mq_set_receiver (inq , MyProc );
1111
+ shm_mq_set_sender (inq , & ProcGlobal -> allProcs [receiver_procno ]);
1125
1112
1126
1113
oldctx = MemoryContextSwitchTo (TopMemoryContext );
1127
- dmq_local .mq_inh [dmq_local . n_inhandles ++ ] = shm_mq_attach (inq , seg , NULL );
1114
+ dmq_local .mq_inh [handle_id ] = shm_mq_attach (inq , seg , NULL );
1128
1115
MemoryContextSwitchTo (oldctx );
1116
+
1117
+ return true;
1129
1118
}
1130
1119
1120
+
1131
1121
void
1132
- dmq_pop (DmqSenderId * sender_id , StringInfo msg )
1122
+ dmq_stream_subscribe (char * sender_name , char * stream_name , int mask_pos )
1123
+ {
1124
+ bool found ;
1125
+ DmqStreamSubscription * sub ;
1126
+
1127
+ LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
1128
+ sub = (DmqStreamSubscription * ) hash_search (dmq_state -> subscriptions , stream_name ,
1129
+ HASH_ENTER , & found );
1130
+ if (found && sub -> procno != MyProc -> pgprocno )
1131
+ {
1132
+ elog (ERROR , "procno%d: %s: subscription is already active for procno %d / %s" ,
1133
+ MyProc -> pgprocno , stream_name , sub -> procno , sub -> stream_name );
1134
+ }
1135
+ sub -> procno = MyProc -> pgprocno ;
1136
+ sub -> mask_pos = mask_pos ;
1137
+ LWLockRelease (dmq_state -> lock );
1138
+
1139
+ dmq_local .mq_inh [dmq_local .n_inhandles ] = NULL ;
1140
+ strncpy (dmq_local .receiver_names [dmq_local .n_inhandles ], sender_name ,
1141
+ DMQ_NAME_MAXLEN );
1142
+ dmq_local .mask_pos [dmq_local .n_inhandles ] = mask_pos ;
1143
+
1144
+ dmq_reattach_shm_mq (dmq_local .n_inhandles );
1145
+
1146
+ dmq_local .n_inhandles ++ ;
1147
+ }
1148
+
1149
+ void
1150
+ dmq_pop (DmqSenderId * sender_id , StringInfo msg , int64 mask )
1133
1151
{
1134
1152
shm_mq_result res ;
1135
1153
1136
1154
for (;;)
1137
1155
{
1138
1156
int i ;
1157
+ bool nowait = false;
1158
+
1159
+ CHECK_FOR_INTERRUPTS ();
1139
1160
1140
1161
for (i = 0 ; i < dmq_local .n_inhandles ; i ++ )
1141
1162
{
1142
1163
Size len ;
1143
1164
void * data ;
1144
1165
1166
+ if (!BIT_CHECK (mask , dmq_local .mask_pos [i ]))
1167
+ continue ;
1168
+
1145
1169
res = shm_mq_receive (dmq_local .mq_inh [i ], & len , & data , true);
1146
1170
if (res == SHM_MQ_SUCCESS )
1147
1171
{
@@ -1155,14 +1179,19 @@ dmq_pop(DmqSenderId *sender_id, StringInfo msg)
1155
1179
}
1156
1180
else if (res == SHM_MQ_DETACHED )
1157
1181
{
1158
- elog (ERROR , "dmq_pop: queue detached" );
1182
+ if (dmq_reattach_shm_mq (i ))
1183
+ nowait = true;
1184
+ else
1185
+ elog (ERROR , "dmq_pop: queue detached" );
1159
1186
}
1160
1187
}
1161
1188
1189
+ if (nowait )
1190
+ continue ;
1191
+
1162
1192
// XXX cache that
1163
1193
WaitLatch (MyLatch , WL_LATCH_SET , 10 , WAIT_EVENT_MQ_RECEIVE );
1164
1194
ResetLatch (MyLatch );
1165
- CHECK_FOR_INTERRUPTS ();
1166
1195
}
1167
1196
}
1168
1197
0 commit comments