@@ -9,8 +9,8 @@ class ClientCollection(object):
9
9
def __init__ (self , connstrs ):
10
10
self ._clients = []
11
11
12
- for cs in connstrs :
13
- b = BankClient (cs )
12
+ for i , cs in enumerate ( connstrs ) :
13
+ b = BankClient (cs , i )
14
14
self ._clients .append (b )
15
15
16
16
self ._clients [0 ].initialize ()
@@ -27,17 +27,21 @@ def start(self):
27
27
client .start ()
28
28
29
29
def stop (self ):
30
+ print ('collection stop called' , self ._clients )
30
31
for client in self ._clients :
32
+ print ('stop coll' )
31
33
client .stop ()
32
34
33
35
34
36
class BankClient (object ):
35
37
36
- def __init__ (self , connstr ):
38
+ def __init__ (self , connstr , node_id ):
37
39
self .connstr = connstr
40
+ self .node_id = node_id
38
41
self .run = Value ('b' , True )
39
42
self ._history = EventHistory ()
40
43
self .accounts = 10000
44
+ self .show_errors = True
41
45
42
46
def initialize (self ):
43
47
conn = psycopg2 .connect (self .connstr )
@@ -59,6 +63,10 @@ def initialize(self):
59
63
def history (self ):
60
64
return self ._history
61
65
66
+ def print_error (self , arg , comment = '' ):
67
+ if self .show_errors :
68
+ print ('Node' , self .node_id , 'got error' , arg , comment )
69
+
62
70
def check_total (self ):
63
71
conn , cur = self .connect ()
64
72
@@ -72,16 +80,13 @@ def check_total(self):
72
80
print ("Isolation error, total = %d" % (res [0 ],))
73
81
raise BaseException
74
82
except psycopg2 .InterfaceError :
75
- print ("Got error: " , sys .exc_info ())
76
- print ("Reconnecting" )
77
83
conn , cur = self .connect (reconnect = True )
78
84
except :
79
- print ( "Got error: " , sys .exc_info ())
85
+ self . print_error ( sys .exc_info (), '3' )
80
86
self .history .register_finish (event_id , 'rollback' )
81
87
else :
82
88
self .history .register_finish (event_id , 'commit' )
83
89
84
-
85
90
cur .close ()
86
91
conn .close ()
87
92
@@ -110,14 +115,10 @@ def transfer_money(self):
110
115
(amount , to_uid ))
111
116
112
117
conn .commit ()
113
- except psycopg2 .DatabaseError :
114
- print ("Got error: " , sys .exc_info ())
115
- print ("Reconnecting" )
116
-
117
- self .history .register_finish (event_id , 'rollback' )
118
+ except psycopg2 .InterfaceError :
118
119
conn , cur = self .connect (reconnect = True )
119
120
except :
120
- print ( "Got error: " , sys .exc_info ())
121
+ self . print_error ( sys .exc_info (), '1' )
121
122
self .history .register_finish (event_id , 'rollback' )
122
123
else :
123
124
self .history .register_finish (event_id , 'commit' )
@@ -127,16 +128,17 @@ def transfer_money(self):
127
128
128
129
def connect (self , reconnect = False ):
129
130
130
- while self . run . value :
131
+ while True :
131
132
try :
132
133
conn = psycopg2 .connect (self .connstr )
133
134
cur = conn .cursor ()
135
+ return conn , cur
134
136
except :
135
- print ( "Got error: " , sys .exc_info ())
137
+ self . print_error ( sys .exc_info (), '2' )
136
138
if not reconnect :
137
139
raise
138
- else :
139
- return conn , cur
140
+ if not self . run . value :
141
+ raise
140
142
141
143
# def watchdog(self):
142
144
# while self.run.value:
@@ -156,7 +158,10 @@ def start(self):
156
158
return
157
159
158
160
def stop (self ):
161
+ print ('Stopping!' );
159
162
self .run .value = False
163
+ self .total_process .join ()
164
+ self .transfer_process .join ()
160
165
return
161
166
162
167
def cleanup (self ):
0 commit comments