5
5
import time
6
6
import sys
7
7
from event_history import *
8
+ import select
9
+ import signal
8
10
9
11
class ClientCollection (object ):
10
12
def __init__ (self , connstrs ):
@@ -38,8 +40,6 @@ def print_agg(self):
38
40
39
41
columns = ['running' , 'running_latency' , 'max_latency' , 'finish' ]
40
42
41
- #rows = [ k+str(i+1) for k in agg.keys() for i, agg in enumerate(aggs)]
42
-
43
43
print ("\t \t " , end = "" )
44
44
for col in columns :
45
45
print (col , end = "\t " )
@@ -61,17 +61,29 @@ def print_agg(self):
61
61
62
62
print ("" )
63
63
64
+ def set_acc_to_tx (self , max_acc ):
65
+ for client in self ._clients :
66
+ client .set_acc_to_tx (max_acc )
67
+
64
68
65
69
class BankClient (object ):
66
70
67
- def __init__ (self , connstr , node_id ):
71
+ def __init__ (self , connstr , node_id , accounts = 10000 ):
68
72
self .connstr = connstr
69
73
self .node_id = node_id
70
74
self .run = Value ('b' , True )
71
75
self ._history = EventHistory ()
72
- self .accounts = 10000
76
+ self .accounts = accounts
77
+ self .accounts_to_tx = accounts
73
78
self .show_errors = True
74
79
80
+ #x = self
81
+ #def on_sigint(sig, frame):
82
+ # x.stop()
83
+ #
84
+ #signal.signal(signal.SIGINT, on_sigint)
85
+
86
+
75
87
def initialize (self ):
76
88
conn = psycopg2 .connect (self .connstr )
77
89
cur = conn .cursor ()
@@ -88,6 +100,22 @@ def initialize(self):
88
100
cur .close ()
89
101
conn .close ()
90
102
103
+ def aconn (self ):
104
+ return psycopg2 .connect (self .connstr , async = 1 )
105
+
106
+ @classmethod
107
+ def wait (cls , conn ):
108
+ while 1 :
109
+ state = conn .poll ()
110
+ if state == psycopg2 .extensions .POLL_OK :
111
+ break
112
+ elif state == psycopg2 .extensions .POLL_WRITE :
113
+ select .select ([], [conn .fileno ()], [])
114
+ elif state == psycopg2 .extensions .POLL_READ :
115
+ select .select ([conn .fileno ()], [], [])
116
+ else :
117
+ raise psycopg2 .OperationalError ("poll() returned %s" % state )
118
+
91
119
@property
92
120
def history (self ):
93
121
return self ._history
@@ -105,25 +133,16 @@ def exec_tx(self, name, tx_block):
105
133
106
134
if conn .closed :
107
135
self .history .register_finish (event_id , 'ReConnect' )
108
- try :
109
- conn = psycopg2 .connect (self .connstr )
110
- cur = conn .cursor ()
111
- except :
112
- continue
113
- else :
114
- continue
136
+ conn = psycopg2 .connect (self .connstr )
137
+ cur = conn .cursor ()
115
138
116
139
try :
117
- tx_block (conn , cur )
140
+ tx_block (conn , cur )
141
+ self .history .register_finish (event_id , 'Commit' )
118
142
except psycopg2 .InterfaceError :
119
143
self .history .register_finish (event_id , 'InterfaceError' )
120
144
except psycopg2 .Error :
121
145
self .history .register_finish (event_id , 'PsycopgError' )
122
- except :
123
- print (sys .exc_info ())
124
- self .history .register_finish (event_id , 'OtherError' )
125
- else :
126
- self .history .register_finish (event_id , 'Commit' )
127
146
128
147
cur .close ()
129
148
conn .close ()
@@ -135,17 +154,20 @@ def tx(conn, cur):
135
154
res = cur .fetchone ()
136
155
conn .commit ()
137
156
if res [0 ] != 0 :
138
- print ("Isolation error, total = %d" % (res [0 ],))
157
+ print ("Isolation error, total = %d, node = %d " % (res [0 ],self . node_id ))
139
158
raise BaseException
140
159
141
160
self .exec_tx ('total' , tx )
142
161
162
+ def set_acc_to_tx (self , max_acc ):
163
+ self .accounts_to_tx = max_acc
164
+
143
165
def transfer_money (self ):
144
166
145
167
def tx (conn , cur ):
146
168
amount = 1
147
- from_uid = random .randrange (1 , self .accounts - 10 )
148
- to_uid = from_uid + 1 # random.randrange(1, self.accounts + 1)
169
+ from_uid = random .randrange (1 , self .accounts_to_tx - 1 )
170
+ to_uid = random .randrange (1 , self .accounts_to_tx - 1 )
149
171
150
172
conn .commit ()
151
173
cur .execute ('''update bank_test
@@ -161,7 +183,10 @@ def tx(conn, cur):
161
183
self .exec_tx ('transfer' , tx )
162
184
163
185
def start (self ):
164
- self .transfer_process = Process (target = self .transfer_money , args = ())
186
+ print ('Starting client' );
187
+ self .run .value = True
188
+
189
+ self .transfer_process = Process (target = self .transfer_money , name = "txor" , args = ())
165
190
self .transfer_process .start ()
166
191
167
192
self .total_process = Process (target = self .check_total , args = ())
@@ -170,7 +195,7 @@ def start(self):
170
195
return
171
196
172
197
def stop (self ):
173
- print ('Stopping! ' );
198
+ print ('Stopping client ' );
174
199
self .run .value = False
175
200
self .total_process .terminate ()
176
201
self .transfer_process .terminate ()
0 commit comments