Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4b3a499

Browse files
committed
better errors handling in bank_client
1 parent e604b24 commit 4b3a499

File tree

3 files changed

+84
-122
lines changed

3 files changed

+84
-122
lines changed

tests2/lib/bank_client.py

Lines changed: 39 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -65,106 +65,68 @@ def print_error(self, arg, comment=''):
6565
if self.show_errors:
6666
print('Node', self.node_id, 'got error', arg, comment)
6767

68-
def check_total(self):
69-
conn, cur = self.connect()
70-
i = 0
68+
def exec_tx(self, name, tx_block):
69+
conn = psycopg2.connect(self.connstr)
70+
cur = conn.cursor()
7171

7272
while self.run.value:
73-
i += 1
74-
75-
event_id = self.history.register_start('total')
73+
event_id = self.history.register_start(name)
7674

77-
if not conn.closed:
75+
if conn.closed:
76+
self.history.register_finish(event_id, 'ReConnect')
7877
try :
7978
conn = psycopg2.connect(self.connstr)
8079
cur = conn.cursor()
8180
except :
82-
self.history.register_finish(event_id, 'CantConnect')
83-
next
84-
85-
amount = 1
86-
from_uid = random.randrange(1, self.accounts + 1)
87-
to_uid = random.randrange(1, self.accounts + 1)
81+
continue
82+
else :
83+
continue
8884

8985
try:
90-
cur.execute('select sum(amount) from bank_test')
91-
res = cur.fetchone()
92-
if res[0] != 0:
93-
print("Isolation error, total = %d" % (res[0],))
94-
raise BaseException
95-
except BaseException:
96-
raise BaseException
86+
tx_block(conn, cur)
9787
except psycopg2.InterfaceError:
9888
self.history.register_finish(event_id, 'InterfaceError')
89+
except psycopg2.Error:
90+
self.history.register_finish(event_id, 'PsycopgError')
9991
except :
92+
print(sys.exc_info())
10093
self.history.register_finish(event_id, 'OtherError')
10194
else :
102-
self.history.register_finish(event_id, 'commit')
95+
self.history.register_finish(event_id, 'Commit')
10396

10497
cur.close()
10598
conn.close()
10699

107-
def transfer_money(self):
108-
conn, cur = self.connect()
109-
110-
i = 0
100+
def check_total(self):
111101

112-
while self.run.value:
113-
i += 1
102+
def tx(conn, cur):
103+
cur.execute('select sum(amount) from bank_test')
104+
res = cur.fetchone()
105+
if res[0] != 0:
106+
print("Isolation error, total = %d" % (res[0],))
107+
raise BaseException
114108

115-
event_id = self.history.register_start('transfer')
109+
self.exec_tx('total', tx)
116110

117-
if not conn.closed:
118-
try :
119-
conn = psycopg2.connect(self.connstr)
120-
cur = conn.cursor()
121-
except :
122-
self.history.register_finish(event_id, 'CantConnect')
123-
next
111+
def transfer_money(self):
124112

113+
def tx(conn, cur):
125114
amount = 1
126-
from_uid = random.randrange(1, self.accounts + 1)
127-
to_uid = random.randrange(1, self.accounts + 1)
128-
129-
try:
130-
cur.execute('''update bank_test
131-
set amount = amount - %s
132-
where uid = %s''',
133-
(amount, from_uid))
134-
cur.execute('''update bank_test
135-
set amount = amount + %s
136-
where uid = %s''',
137-
(amount, to_uid))
138-
conn.commit()
139-
140-
except psycopg2.InterfaceError:
141-
self.history.register_finish(event_id, 'InterfaceError')
142-
except :
143-
self.history.register_finish(event_id, 'OtherError')
144-
else :
145-
self.history.register_finish(event_id, 'commit')
146-
147-
cur.close()
148-
conn.close()
149-
150-
def connect(self, reconnect=False):
151-
152-
while True:
153-
try:
154-
conn = psycopg2.connect(self.connstr)
155-
cur = conn.cursor()
156-
return conn, cur
157-
except:
158-
self.print_error(sys.exc_info(),'2')
159-
if not reconnect:
160-
raise
161-
if not self.run.value:
162-
raise
163-
164-
# def watchdog(self):
165-
# while self.run.value:
166-
# time.sleep(1)
167-
# print('watchdog: ', self.history.aggregate())
115+
from_uid = random.randrange(1, self.accounts - 10)
116+
to_uid = from_uid + 1 #random.randrange(1, self.accounts + 1)
117+
118+
conn.commit()
119+
cur.execute('''update bank_test
120+
set amount = amount - %s
121+
where uid = %s''',
122+
(amount, from_uid))
123+
cur.execute('''update bank_test
124+
set amount = amount + %s
125+
where uid = %s''',
126+
(amount, to_uid))
127+
conn.commit()
128+
129+
self.exec_tx('transfer', tx)
168130

169131
def start(self):
170132
self.transfer_process = Process(target=self.transfer_money, args=())
@@ -173,9 +135,6 @@ def start(self):
173135
self.total_process = Process(target=self.check_total, args=())
174136
self.total_process.start()
175137

176-
#self.total_process = Process(target=self.watchdog, args=())
177-
#self.total_process.start()
178-
179138
return
180139

181140
def stop(self):

tests2/lib/event_history.py

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import time
22
import datetime
33
import uuid
4+
import copy
45
from multiprocessing import Queue
56

67

@@ -12,11 +13,10 @@ def __init__(self):
1213
self.running_events = {}
1314
self.last_aggregation = datetime.datetime.now()
1415
self.agg_template = {
15-
'commit': 0,
16-
'rollback': 0,
1716
'max_latency': 0.0,
1817
'running': 0,
19-
'running_latency': 0.0
18+
'running_latency': 0.0,
19+
'finish': {}
2020
}
2121

2222
def register_start(self, name):
@@ -43,18 +43,21 @@ def load_queue(self):
4343
self.running_events[event['event_id']] = event
4444
else:
4545
# finish mark
46-
if event['event_id'] in self.running_events:
47-
start_ev = self.running_events[event['event_id']]
48-
self.events.append({
49-
'name': start_ev['name'],
50-
'started_at': start_ev['time'],
51-
'finished_at': event['time'],
52-
'status': event['status']
53-
})
54-
self.running_events.pop(event['event_id'], None)
55-
else:
46+
if event['event_id'] not in self.running_events:
5647
# found finish event without corresponding start
48+
print("ololololo!")
5749
raise
50+
51+
start_ev = self.running_events[event['event_id']]
52+
self.events.append({
53+
'name': start_ev['name'],
54+
'started_at': start_ev['time'],
55+
'finished_at': event['time'],
56+
'status': event['status']
57+
})
58+
self.running_events.pop(event['event_id'], None)
59+
60+
#print(self.events)
5861
return
5962

6063
def aggregate(self):
@@ -63,32 +66,38 @@ def aggregate(self):
6366
agg = {}
6467
for ev in self.events:
6568
if ev['finished_at'] < self.last_aggregation:
69+
#print("cont")
6670
continue
6771

6872
if ev['name'] not in agg:
69-
agg[ev['name']] = self.agg_template.copy()
73+
agg[ev['name']] = copy.deepcopy(self.agg_template)
74+
#print('-=-=-', agg)
7075

7176
named_agg = agg[ev['name']]
7277
latency = (ev['finished_at'] - ev['started_at']).total_seconds()
73-
named_agg[ev['status']] += 1
78+
79+
if ev['status'] not in named_agg['finish']:
80+
named_agg['finish'][ev['status']] = 1
81+
else:
82+
named_agg['finish'][ev['status']] += 1
83+
7484
if named_agg['max_latency'] < latency:
7585
named_agg['max_latency'] = latency
7686

7787
for value in self.running_events.itervalues():
88+
7889
if value['name'] not in agg:
79-
agg[value['name']] = self.agg_template.copy()
90+
agg[value['name']] = copy.deepcopy(self.agg_template)
8091

8192
named_agg = agg[value['name']]
8293
latency = (datetime.datetime.now() - value['time']).total_seconds()
83-
if 'started' in named_agg:
84-
named_agg['running'] += 1
85-
if latency > named_agg['running_latency']:
86-
named_agg['running_latency'] = latency
87-
else:
88-
named_agg['running'] = 1
94+
95+
named_agg['running'] += 1
96+
if named_agg['running_latency'] < latency:
8997
named_agg['running_latency'] = latency
9098

9199
self.last_aggregation = datetime.datetime.now()
100+
#print("aggregeted!")
92101
return agg
93102

94103
def aggregate_by(self, period):

tests2/test_recovery.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import subprocess
44
from lib.bank_client import *
55

6-
76
class RecoveryTest(unittest.TestCase):
87
def setUp(self):
98
#subprocess.check_call(['blockade','up'])
@@ -24,41 +23,36 @@ def test_0_normal_operation(self):
2423
print('### normalOpsTest ###')
2524
print('Waiting 5s to check operability')
2625
time.sleep(5)
27-
26+
2827
for client in self.clients:
2928
agg = client.history.aggregate()
3029
print(agg)
31-
self.assertTrue(agg['transfer']['commit'] > 0)
30+
self.assertTrue(agg['transfer']['finish']['Commit'] > 0)
3231

3332
def test_1_node_disconnect(self):
3433
print('### disconnectTest ###')
3534

3635
subprocess.check_call(['blockade','partition','node3'])
3736
print('Node3 disconnected')
3837

39-
print('Waiting 20s to discover failure')
38+
print('Waiting 15s to discover failure')
4039

41-
while True:
40+
for i in range(5):
4241
time.sleep(3)
4342
for client in self.clients:
4443
agg = client.history.aggregate()
4544
print(agg)
45+
print(" ")
46+
47+
subprocess.check_call(['blockade','join'])
4648

47-
# print('Waiting 3s to check operability')
48-
# time.sleep(3)
49-
# for client in self.clients:
50-
# agg = client.history.aggregate()
51-
# print(agg)
52-
#
53-
# subprocess.check_call(['blockade','join'])
54-
# print('Node3 connected back')
55-
#
56-
# print('Waiting 12s for catch-up')
57-
# time.sleep(12)
58-
#
59-
# for client in self.clients:
60-
# agg = client.history.aggregate()
61-
# print(agg)
49+
print('Waiting 15s to join node')
50+
for i in range(1000):
51+
time.sleep(3)
52+
for client in self.clients:
53+
agg = client.history.aggregate()
54+
print(agg)
55+
print(" ")
6256

6357

6458
if __name__ == '__main__':

0 commit comments

Comments
 (0)