@@ -111,20 +111,24 @@ def status(self):
111
111
while self .running :
112
112
msg = yield from self .child_pipe .coro_recv ()
113
113
if msg == 'status' :
114
- # print('evloop: got status request')
115
114
serialized_aggs = {}
116
- for name , aggregate in self .aggregates .items ():
117
- serialized_aggs [name ] = aggregate .as_dict ()
118
- aggregate .clear_values ()
115
+
116
+ for conn_id , conn_aggs in self .aggregates .items ():
117
+ serialized_aggs [conn_id ] = {}
118
+ for aggname , agg in conn_aggs .items ():
119
+ serialized_aggs [conn_id ][aggname ] = agg .as_dict ()
120
+ agg .clear_values ()
121
+
119
122
self .child_pipe .send (serialized_aggs )
120
- # print('evloop: sent status response')
121
123
else :
122
124
print ('evloop: unknown message' )
123
125
124
126
@asyncio .coroutine
125
127
def exec_tx (self , tx_block , aggname_prefix , conn_i ):
126
128
aggname = "%s_%i" % (aggname_prefix , conn_i )
127
- agg = self .aggregates [aggname ] = MtmTxAggregate (aggname )
129
+ if conn_i not in self .aggregates :
130
+ self .aggregates [conn_i ] = {}
131
+ agg = self .aggregates [conn_i ][aggname_prefix ] = MtmTxAggregate (aggname )
128
132
pool = yield from aiopg .create_pool (self .dsns [conn_i ])
129
133
conn = yield from pool .acquire ()
130
134
cur = yield from conn .cursor ()
@@ -167,8 +171,8 @@ def total_tx(self, conn, cur, agg):
167
171
total = yield from cur .fetchone ()
168
172
if total [0 ] != 0 :
169
173
agg .isolation += 1
170
- # print(self.oops)
171
- # print('Isolation error, total = ', total[0])
174
+ print (self .oops )
175
+ print ('Isolation error, total = ' , total [0 ])
172
176
# yield from cur.execute('select * from mtm.get_nodes_state()')
173
177
# nodes_state = yield from cur.fetchall()
174
178
# for i, col in enumerate(self.nodes_state_fields):
@@ -177,7 +181,6 @@ def total_tx(self, conn, cur, agg):
177
181
# print("%19s" % nodes_state[j][i], end="\t")
178
182
# print("\n")
179
183
180
-
181
184
def run (self ):
182
185
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
183
186
self .loop = asyncio .get_event_loop ()
@@ -196,13 +199,9 @@ def bgrun(self):
196
199
self .evloop_process = multiprocessing .Process (target = self .run , args = ())
197
200
self .evloop_process .start ()
198
201
199
- # XXX: introduce periodic report from client?
200
202
def get_aggregates (self , print = True ):
201
- # print('test: sending status request')
202
203
self .parent_pipe .send ('status' )
203
- # print('test: awaitng status response')
204
204
resp = self .parent_pipe .recv ()
205
- # print('test: got status response')
206
205
if print :
207
206
MtmClient .print_aggregates (resp )
208
207
return resp
@@ -216,7 +215,7 @@ def stop(self):
216
215
self .evloop_process .terminate ()
217
216
218
217
@classmethod
219
- def print_aggregates (cls , serialized_agg ):
218
+ def print_aggregates (cls , aggs ):
220
219
columns = ['running_latency' , 'max_latency' , 'isolation' , 'finish' ]
221
220
222
221
# print table header
@@ -225,23 +224,18 @@ def print_aggregates(cls, serialized_agg):
225
224
print (col , end = "\t " )
226
225
print ("\n " , end = "" )
227
226
228
- serialized_agg
229
-
230
- for aggname in sorted (serialized_agg .keys ()):
231
- agg = serialized_agg [aggname ]
232
- print ("%s\t " % aggname , end = "" )
233
- for col in columns :
234
- if col in agg :
227
+ for conn_id in aggs .keys ():
228
+ for aggname in aggs [conn_id ].keys ():
229
+ agg = aggs [conn_id ][aggname ]
230
+ print ("Node %d: %s\t " % (conn_id + 1 , aggname ), end = "" )
231
+ for col in columns :
235
232
if isinstance (agg [col ], float ):
236
233
print ("%.2f\t " % (agg [col ],), end = "\t " )
237
234
else :
238
235
print (agg [col ], end = "\t " )
239
- else :
240
- print ("-\t " , end = "" )
241
- print ("" )
236
+ print ("" )
242
237
print ("" )
243
238
244
-
245
239
if __name__ == "__main__" :
246
240
c = MtmClient (['dbname=postgres user=postgres host=127.0.0.1' ,
247
241
'dbname=postgres user=postgres host=127.0.0.1 port=5433' ,
0 commit comments