This repository was archived by the owner on Feb 22, 2020. It is now read-only.
File tree Expand file tree Collapse file tree 4 files changed +13
-7
lines changed
Expand file tree Collapse file tree 4 files changed +13
-7
lines changed Original file line number Diff line number Diff line change @@ -63,9 +63,11 @@ def get_default_fn(r_type):
6363 fn = self .routes .get (resp_type )
6464 else :
6565 fn = get_default_fn (type (resp ))
66-
67- self .logger .info ('handling response with %s' % fn .__name__ )
68- return fn (self ._context , resp )
66+ self .logger .info ('handling response with %s' % fn .__name__ )
67+ return fn (self ._context , resp )
68+ else :
69+ self .logger .warning ('the received message is not response' )
70+ return None
6971
7072
7173class ZmqClient :
Original file line number Diff line number Diff line change @@ -44,7 +44,7 @@ class StreamingClient(GrpcClient):
4444 def __init__ (self , args ):
4545 super ().__init__ (args )
4646
47- self ._request_queue = queue .Queue (maxsize = 1000 )
47+ self ._request_queue = queue .Queue (maxsize = 10 )
4848 self ._is_streaming = threading .Event ()
4949
5050 self ._dispatch_thread = threading .Thread (target = self ._start )
@@ -63,16 +63,16 @@ def _start(self):
6363 self ._is_streaming .clear ()
6464
6565 def _request_generator (self ):
66- while self . _is_streaming . is_set () :
66+ while True :
6767 try :
6868 request = self ._request_queue .get (block = True , timeout = 5.0 )
6969 if request is None :
7070 break
7171 yield request
7272 except queue .Empty :
73- continue
73+ break
7474 except Exception as e :
75- print ('exception: %s' % str (e ))
75+ self . logger . error ('exception: %s' % str (e ))
7676 break
7777
7878 @handler .register (NotImplementedError )
Original file line number Diff line number Diff line change @@ -46,6 +46,7 @@ def apply(self, doc: 'gnes_pb2.Document') -> None:
4646 else :
4747 idx = np .sort (np .random .choice (len (images ), self .sframes , replace = False ))
4848 chunk .blob .CopyFrom (array2blob (images [idx ]))
49+ del images
4950 else :
5051 self .logger .error (
5152 'bad document: "doc.chunks" is empty!' )
Original file line number Diff line number Diff line change @@ -125,6 +125,9 @@ def get_response(num_recv, blocked=False):
125125 with self .zmq_context as zmq_client :
126126
127127 for request in request_iterator :
128+ num_recv = max (self .pending_request - self .args .max_pending_request , 0 )
129+ yield from get_response (num_recv , num_recv > 0 )
130+
128131 zmq_client .send_message (self .add_envelope (request , zmq_client ), ** self .send_recv_kwargs )
129132 self .pending_request += 1
130133
You can’t perform that action at this time.
0 commit comments