5
5
import time
6
6
import sys
7
7
from event_history import *
8
+ import select
9
+ import signal
8
10
9
11
class ClientCollection (object ):
10
12
def __init__ (self , connstrs ):
@@ -59,17 +61,29 @@ def print_agg(self):
59
61
60
62
print ("" )
61
63
64
+ def set_acc_to_tx (self , max_acc ):
65
+ for client in self ._clients :
66
+ client .set_acc_to_tx (max_acc )
67
+
62
68
63
69
class BankClient (object ):
64
70
65
- def __init__ (self , connstr , node_id ):
71
+ def __init__ (self , connstr , node_id , accounts = 10000 ):
66
72
self .connstr = connstr
67
73
self .node_id = node_id
68
74
self .run = Value ('b' , True )
69
75
self ._history = EventHistory ()
70
- self .accounts = 10000
76
+ self .accounts = accounts
77
+ self .accounts_to_tx = accounts
71
78
self .show_errors = True
72
79
80
+ #x = self
81
+ #def on_sigint(sig, frame):
82
+ # x.stop()
83
+ #
84
+ #signal.signal(signal.SIGINT, on_sigint)
85
+
86
+
73
87
def initialize (self ):
74
88
conn = psycopg2 .connect (self .connstr )
75
89
cur = conn .cursor ()
@@ -86,6 +100,22 @@ def initialize(self):
86
100
cur .close ()
87
101
conn .close ()
88
102
103
+ def aconn (self ):
104
+ return psycopg2 .connect (self .connstr , async = 1 )
105
+
106
+ @classmethod
107
+ def wait (cls , conn ):
108
+ while 1 :
109
+ state = conn .poll ()
110
+ if state == psycopg2 .extensions .POLL_OK :
111
+ break
112
+ elif state == psycopg2 .extensions .POLL_WRITE :
113
+ select .select ([], [conn .fileno ()], [])
114
+ elif state == psycopg2 .extensions .POLL_READ :
115
+ select .select ([conn .fileno ()], [], [])
116
+ else :
117
+ raise psycopg2 .OperationalError ("poll() returned %s" % state )
118
+
89
119
@property
90
120
def history (self ):
91
121
return self ._history
@@ -103,25 +133,16 @@ def exec_tx(self, name, tx_block):
103
133
104
134
if conn .closed :
105
135
self .history .register_finish (event_id , 'ReConnect' )
106
- try :
107
- conn = psycopg2 .connect (self .connstr )
108
- cur = conn .cursor ()
109
- except :
110
- continue
111
- else :
112
- continue
136
+ conn = psycopg2 .connect (self .connstr )
137
+ cur = conn .cursor ()
113
138
114
139
try :
115
- tx_block (conn , cur )
140
+ tx_block (conn , cur )
141
+ self .history .register_finish (event_id , 'Commit' )
116
142
except psycopg2 .InterfaceError :
117
143
self .history .register_finish (event_id , 'InterfaceError' )
118
144
except psycopg2 .Error :
119
145
self .history .register_finish (event_id , 'PsycopgError' )
120
- except :
121
- print (sys .exc_info ())
122
- self .history .register_finish (event_id , 'OtherError' )
123
- else :
124
- self .history .register_finish (event_id , 'Commit' )
125
146
126
147
cur .close ()
127
148
conn .close ()
@@ -133,17 +154,20 @@ def tx(conn, cur):
133
154
res = cur .fetchone ()
134
155
conn .commit ()
135
156
if res [0 ] != 0 :
136
- print ("Isolation error, total = %d" % (res [0 ],))
157
+ print ("Isolation error, total = %d, node = %d " % (res [0 ],self . node_id ))
137
158
raise BaseException
138
159
139
160
self .exec_tx ('total' , tx )
140
161
162
+ def set_acc_to_tx (self , max_acc ):
163
+ self .accounts_to_tx = max_acc
164
+
141
165
def transfer_money (self ):
142
166
143
167
def tx (conn , cur ):
144
168
amount = 1
145
- from_uid = random .randrange (1 , self .accounts - 10 )
146
- to_uid = from_uid + 1 # random.randrange(1, self.accounts + 1)
169
+ from_uid = random .randrange (1 , self .accounts_to_tx - 1 )
170
+ to_uid = random .randrange (1 , self .accounts_to_tx - 1 )
147
171
148
172
conn .commit ()
149
173
cur .execute ('''update bank_test
@@ -159,7 +183,10 @@ def tx(conn, cur):
159
183
self .exec_tx ('transfer' , tx )
160
184
161
185
def start (self ):
162
- self .transfer_process = Process (target = self .transfer_money , args = ())
186
+ print ('Starting client' );
187
+ self .run .value = True
188
+
189
+ self .transfer_process = Process (target = self .transfer_money , name = "txor" , args = ())
163
190
self .transfer_process .start ()
164
191
165
192
self .total_process = Process (target = self .check_total , args = ())
@@ -168,7 +195,7 @@ def start(self):
168
195
return
169
196
170
197
def stop (self ):
171
- print ('Stopping! ' );
198
+ print ('Stopping client ' );
172
199
self .run .value = False
173
200
self .total_process .terminate ()
174
201
self .transfer_process .terminate ()
0 commit comments