Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 54849ca

Browse files
committed
wip. handle exceptions. #6
1 parent d37213c commit 54849ca

File tree

2 files changed

+85
-38
lines changed

2 files changed

+85
-38
lines changed

tests2/lib/bank_client.py

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import random
33
from multiprocessing import Process, Value, Queue
44
import time
5+
import sys
56
from event_history import *
67

78
class ClientCollection(object):
@@ -59,26 +60,36 @@ def history(self):
5960
return self._history
6061

6162
def check_total(self):
62-
conn = psycopg2.connect(self.connstr)
63-
cur = conn.cursor();
63+
conn, cur = self.connect()
64+
6465
while self.run.value:
6566
event_id = self.history.register_start('total')
66-
cur.execute('select sum(amount) from bank_test')
67-
self.history.register_finish(event_id, 'commit')
6867

69-
res = cur.fetchone()
70-
if res[0] != 0:
71-
print("Isolation error, total = %d" % (res[0],))
72-
raise BaseException
68+
try:
69+
cur.execute('select sum(amount) from bank_test')
70+
res = cur.fetchone()
71+
if res[0] != 0:
72+
print("Isolation error, total = %d" % (res[0],))
73+
raise BaseException
74+
except psycopg2.InterfaceError:
75+
print("Got error: ", sys.exc_info())
76+
print("Reconnecting")
77+
conn, cur = self.connect(reconnect=True)
78+
except:
79+
print("Got error: ", sys.exc_info())
80+
self.history.register_finish(event_id, 'rollback')
81+
else:
82+
self.history.register_finish(event_id, 'commit')
83+
7384

7485
cur.close()
7586
conn.close()
7687

7788
def transfer_money(self):
78-
print(self.connstr)
79-
conn = psycopg2.connect(self.connstr)
80-
cur = conn.cursor()
81-
89+
#conn = psycopg2.connect(self.connstr)
90+
#cur = conn.cursor()
91+
conn, cur = self.connect()
92+
8293
i = 0
8394
while self.run.value:
8495
i += 1
@@ -88,39 +99,59 @@ def transfer_money(self):
8899

89100
event_id = self.history.register_start('transfer')
90101

91-
cur.execute('''update bank_test
102+
try:
103+
cur.execute('''update bank_test
92104
set amount = amount - %s
93105
where uid = %s''',
94106
(amount, from_uid))
95-
cur.execute('''update bank_test
107+
cur.execute('''update bank_test
96108
set amount = amount + %s
97109
where uid = %s''',
98110
(amount, to_uid))
99111

100-
try:
101112
conn.commit()
113+
except psycopg2.DatabaseError:
114+
print("Got error: ", sys.exc_info())
115+
print("Reconnecting")
116+
117+
self.history.register_finish(event_id, 'rollback')
118+
conn, cur = self.connect(reconnect=True)
102119
except:
120+
print("Got error: ", sys.exc_info())
103121
self.history.register_finish(event_id, 'rollback')
104122
else:
105123
self.history.register_finish(event_id, 'commit')
106-
124+
107125
cur.close()
108126
conn.close()
109127

110-
def watchdog(self):
128+
def connect(self, reconnect=False):
129+
111130
while self.run.value:
112-
time.sleep(1)
113-
print('watchdog: ', self.history.aggregate())
131+
try:
132+
conn = psycopg2.connect(self.connstr)
133+
cur = conn.cursor()
134+
except:
135+
print("Got error: ", sys.exc_info())
136+
if not reconnect:
137+
raise
138+
else:
139+
return conn, cur
140+
141+
# def watchdog(self):
142+
# while self.run.value:
143+
# time.sleep(1)
144+
# print('watchdog: ', self.history.aggregate())
114145

115146
def start(self):
116147
self.transfer_process = Process(target=self.transfer_money, args=())
117148
self.transfer_process.start()
118-
149+
119150
self.total_process = Process(target=self.check_total, args=())
120151
self.total_process.start()
121152

122-
self.total_process = Process(target=self.watchdog, args=())
123-
self.total_process.start()
153+
#self.total_process = Process(target=self.watchdog, args=())
154+
#self.total_process.start()
124155

125156
return
126157

@@ -136,5 +167,3 @@ def cleanup(self):
136167
cur.close()
137168
conn.close()
138169

139-
140-

tests2/test_recovery.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,44 @@ def tearDown(self):
1919
self.clients[0].cleanup()
2020
subprocess.check_call(['blockade','join'])
2121

22-
# def test_0_normal_operation(self):
23-
# print('normalOpsTest')
24-
# time.sleep(5)
25-
#
26-
# for client in self.clients:
27-
# agg = client.history.aggregate()
28-
# print(agg)
29-
# self.assertTrue(agg['tx']['commit'] > 0)
22+
def test_0_normal_operation(self):
23+
print('### normalOpsTest ###')
24+
print('Waiting 5s to check operability')
25+
time.sleep(5)
26+
27+
for client in self.clients:
28+
agg = client.history.aggregate()
29+
print(agg)
30+
self.assertTrue(agg['transfer']['commit'] > 0)
3031

3132
def test_1_node_disconnect(self):
32-
print('disconnectTest')
33-
time.sleep(3)
33+
print('### disconnectTest ###')
3434

3535
subprocess.check_call(['blockade','partition','node3'])
36-
print('---node3 out---')
37-
time.sleep(15)
36+
print('Node3 disconnected')
37+
38+
print('Waiting 12s to discover failure')
39+
time.sleep(12)
40+
for client in self.clients:
41+
agg = client.history.aggregate()
42+
print(agg)
43+
44+
print('Waiting 3s to check operability')
45+
time.sleep(3)
46+
for client in self.clients:
47+
agg = client.history.aggregate()
48+
print(agg)
3849

3950
subprocess.check_call(['blockade','join'])
40-
print('---node2 and node3 are back---')
41-
time.sleep(5)
51+
print('Node3 connected back')
52+
53+
print('Waiting 12s for catch-up')
54+
time.sleep(12)
55+
56+
for client in self.clients:
57+
agg = client.history.aggregate()
58+
print(agg)
59+
4260

4361
if __name__ == '__main__':
4462
unittest.main()

0 commit comments

Comments
 (0)