Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit de66a2b

Browse files
committed
async python client for testing
1 parent 64a5f47 commit de66a2b

File tree

1 file changed

+142
-0
lines changed

1 file changed

+142
-0
lines changed

tests2/client2.py

+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#!/usr/bin/env python3
2+
import asyncio
3+
import uvloop
4+
import aiopg
5+
import random
6+
import psycopg2
7+
import time
8+
import aioprocessing
9+
import multiprocessing
10+
11+
class MtmTxAggregate(object):
12+
13+
def __init__(self, name):
14+
self.name = name
15+
self.clear_values()
16+
17+
def clear_values(self):
18+
self.max_latency = 0.0
19+
self.running_latency = 0.0
20+
self.finish = {}
21+
22+
def add_finish(self, name):
23+
if name not in self.finish:
24+
self.finish[name] = 1
25+
else:
26+
self.finish[name] += 1
27+
28+
class MtmClient(object):
29+
30+
def __init__(self, dsns, n_accounts=100000):
31+
self.n_accounts = n_accounts
32+
self.dsns = dsns
33+
34+
self.aggregates = [MtmTxAggregate('transfer'), MtmTxAggregate('transfer'), MtmTxAggregate('transfer')]
35+
self.initdb()
36+
37+
def initdb(self):
38+
conn = psycopg2.connect(self.dsns[0])
39+
cur = conn.cursor()
40+
cur.execute('create extension if not exists multimaster')
41+
conn.commit()
42+
cur.execute('drop table if exists bank_test')
43+
cur.execute('create table bank_test(uid int primary key, amount int)')
44+
cur.execute('''
45+
insert into bank_test
46+
select *, 0 from generate_series(0, %s)''',
47+
(self.n_accounts,))
48+
conn.commit()
49+
cur.close()
50+
conn.close()
51+
52+
async def status(self):
53+
while True:
54+
msg = await self.child_pipe.coro_recv()
55+
if msg == 'status':
56+
self.child_pipe.send(self.aggregates)
57+
for aggregate in self.aggregates:
58+
aggregate.clear_values()
59+
60+
async def transfer(self, i):
61+
pool = await aiopg.create_pool(self.dsns[i])
62+
async with pool.acquire() as conn:
63+
async with conn.cursor() as cur:
64+
while True:
65+
amount = 1
66+
from_uid = random.randint(1, self.n_accounts - 1)
67+
to_uid = random.randint(1, self.n_accounts - 1)
68+
69+
try:
70+
await cur.execute('begin')
71+
await cur.execute('''update bank_test
72+
set amount = amount - %s
73+
where uid = %s''',
74+
(amount, from_uid))
75+
await cur.execute('''update bank_test
76+
set amount = amount + %s
77+
where uid = %s''',
78+
(amount, to_uid))
79+
await cur.execute('commit')
80+
81+
self.aggregates[i].add_finish('commit')
82+
except psycopg2.Error as e:
83+
await cur.execute('rollback')
84+
self.aggregates[i].add_finish(e.pgerror)
85+
86+
async def total(self, i):
87+
pool = await aiopg.create_pool(self.dsns[i])
88+
async with pool.acquire() as conn:
89+
async with conn.cursor() as cur:
90+
while True:
91+
92+
try:
93+
await cur.execute('select sum(amount) from bank_test')
94+
95+
if 'commit' not in self.tps_vector[i]:
96+
self.tps_vector[i]['commit'] = 1
97+
else:
98+
self.tps_vector[i]['commit'] += 1
99+
except psycopg2.Error as e:
100+
await cur.execute('rollback')
101+
if e.pgerror not in self.tps_vector[i]:
102+
self.tps_vector[i][e.pgerror] = 1
103+
else:
104+
self.tps_vector[i][e.pgerror] += 1
105+
106+
def run(self):
107+
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
108+
self.loop = asyncio.get_event_loop()
109+
110+
for i, _ in enumerate(self.dsns):
111+
asyncio.ensure_future(self.transfer(i))
112+
# asyncio.ensure_future(self.total(i))
113+
114+
asyncio.ensure_future(self.status())
115+
116+
self.loop.run_forever()
117+
118+
def bgrun(self):
119+
print('Starting evloop in different process');
120+
121+
self.parent_pipe, self.child_pipe = aioprocessing.AioPipe()
122+
123+
self.evloop_process = multiprocessing.Process(target=self.run, args=())
124+
self.evloop_process.start()
125+
126+
def get_status(self):
127+
c.parent_pipe.send('status')
128+
return c.parent_pipe.recv()
129+
130+
131+
c = MtmClient(['dbname=postgres user=stas host=127.0.0.1',
132+
'dbname=postgres user=stas host=127.0.0.1 port=5433',
133+
'dbname=postgres user=stas host=127.0.0.1 port=5434'], n_accounts=1000)
134+
# c = MtmClient(['dbname=postgres user=stas host=127.0.0.1'])
135+
c.bgrun()
136+
137+
while True:
138+
time.sleep(1)
139+
aggs = c.get_status()
140+
for agg in aggs:
141+
print(agg.finish)
142+

0 commit comments

Comments
 (0)