11
11
import aioprocessing
12
12
import multiprocessing
13
13
import logging
14
+ import re
14
15
15
16
class MtmTxAggregate (object ):
16
17
@@ -26,16 +27,19 @@ def clear_values(self):
26
27
def start_tx (self ):
27
28
self .start_time = datetime .datetime .now ()
28
29
29
- def finish_tx (self , name ):
30
+ def finish_tx (self , status ):
30
31
latency = (datetime .datetime .now () - self .start_time ).total_seconds ()
31
32
33
+ if "is aborted on node" in status :
34
+ status = re .sub (r'MTM-.+\)' , '<censored>' , status )
35
+
32
36
if latency > self .max_latency :
33
37
self .max_latency = latency
34
38
35
- if name not in self .finish :
36
- self .finish [name ] = 1
39
+ if status not in self .finish :
40
+ self .finish [status ] = 1
37
41
else :
38
- self .finish [name ] += 1
42
+ self .finish [status ] += 1
39
43
40
44
def as_dict (self ):
41
45
return {
@@ -62,6 +66,7 @@ def __init__(self, dsns, n_accounts=100000):
62
66
# logging.basicConfig(level=logging.DEBUG)
63
67
self .n_accounts = n_accounts
64
68
self .dsns = dsns
69
+ self .total = 0
65
70
self .aggregates = {}
66
71
keep_trying (40 , 1 , self .initdb , 'self.initdb' )
67
72
self .running = True
@@ -176,11 +181,11 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
176
181
# enable_hstore tries to perform select from database
177
182
# which in case of select's failure will lead to exception
178
183
# and stale connection to the database
179
- conn = yield from aiopg .connect (dsn , enable_hstore = False , timeout = 3600 )
180
- print ("reconnected" )
184
+ conn = yield from aiopg .connect (dsn , enable_hstore = False , timeout = 1 )
185
+ print ('Connected %s, %d' % ( aggname_prefix , conn_i + 1 ) )
181
186
182
187
if (not cur ) or cur .closed :
183
- cur = yield from conn .cursor ()
188
+ cur = yield from conn .cursor (timeout = 10 )
184
189
185
190
# ROLLBACK tx after previous exception.
186
191
# Doing this here instead of except handler to stay inside try
@@ -198,17 +203,17 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
198
203
# Give evloop some free time.
199
204
# In case of continuous excetions we can loop here without returning
200
205
# back to event loop and block it
201
- if "Multimaster node is not online" in msg :
202
- yield from asyncio .sleep (1.00 )
203
- else :
204
- yield from asyncio .sleep (0.01 )
206
+ yield from asyncio .sleep (0.5 )
207
+
205
208
except BaseException as e :
206
- print ('Catch exception ' , type (e ))
207
- agg .finish_tx (str (e ).strip ())
209
+ msg = str (e ).strip ()
210
+ agg .finish_tx (msg )
211
+ print ('Caught exception %s, %s, %d, %s' % (type (e ), aggname_prefix , conn_i + 1 , msg ) )
212
+
208
213
# Give evloop some free time.
209
214
# In case of continuous excetions we can loop here without returning
210
215
# back to event loop and block it
211
- yield from asyncio .sleep (0.01 )
216
+ yield from asyncio .sleep (0.5 )
212
217
213
218
print ("We've count to infinity!" )
214
219
@@ -235,10 +240,11 @@ def transfer_tx(self, conn, cur, agg):
235
240
def total_tx (self , conn , cur , agg ):
236
241
yield from cur .execute ('select sum(amount) from bank_test' )
237
242
total = yield from cur .fetchone ()
238
- if total [0 ] != 0 :
243
+ if total [0 ] != self . total :
239
244
agg .isolation += 1
240
- # print(self.oops)s
241
- # print('Isolation error, total = ', total[0])
245
+ self .total = total [0 ]
246
+ print (self .oops )
247
+ print ('Isolation error, total = ' , total [0 ])
242
248
# yield from cur.execute('select * from mtm.get_nodes_state()')
243
249
# nodes_state = yield from cur.fetchall()
244
250
# for i, col in enumerate(self.nodes_state_fields):
0 commit comments