Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d37213c

Browse files
committed
fix events aggregation bugs #6
1 parent f40b220 commit d37213c

File tree

2 files changed

+28
-15
lines changed

2 files changed

+28
-15
lines changed

tests2/lib/bank_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ def check_total(self):
6262
conn = psycopg2.connect(self.connstr)
6363
cur = conn.cursor();
6464
while self.run.value:
65+
event_id = self.history.register_start('total')
6566
cur.execute('select sum(amount) from bank_test')
67+
self.history.register_finish(event_id, 'commit')
68+
6669
res = cur.fetchone()
6770
if res[0] != 0:
6871
print("Isolation error, total = %d" % (res[0],))
@@ -83,7 +86,7 @@ def transfer_money(self):
8386
from_uid = random.randrange(1, self.accounts + 1)
8487
to_uid = random.randrange(1, self.accounts + 1)
8588

86-
event_id = self.history.register_start('tx')
89+
event_id = self.history.register_start('transfer')
8790

8891
cur.execute('''update bank_test
8992
set amount = amount - %s
@@ -119,7 +122,6 @@ def start(self):
119122
self.total_process = Process(target=self.watchdog, args=())
120123
self.total_process.start()
121124

122-
123125
return
124126

125127
def stop(self):

tests2/lib/event_history.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,14 @@ def __init__(self):
1010
self.queue = Queue()
1111
self.events = []
1212
self.running_events = {}
13+
self.last_aggregation = datetime.datetime.now()
14+
self.agg_template = {
15+
'commit': 0,
16+
'rollback': 0,
17+
'max_latency': 0.0,
18+
'running': 0,
19+
'running_latency': 0.0
20+
}
1321

1422
def register_start(self, name):
1523
event_id = uuid.uuid4()
@@ -54,22 +62,24 @@ def aggregate(self):
5462

5563
agg = {}
5664
for ev in self.events:
57-
if ev['name'] in agg:
58-
named_agg = agg[ev['name']]
59-
latency = (ev['finished_at'] - ev['started_at']).total_seconds()
60-
if ev['status'] in named_agg:
61-
named_agg[ev['status']] += 1
62-
if named_agg['max_latency'] < latency:
63-
named_agg['max_latency'] = latency
64-
else:
65-
named_agg[ev['status']] = 0
66-
named_agg['max_latency'] = latency
67-
else:
68-
agg[ev['name']] = {}
65+
if ev['finished_at'] < self.last_aggregation:
66+
continue
67+
68+
if ev['name'] not in agg:
69+
agg[ev['name']] = self.agg_template.copy()
70+
71+
named_agg = agg[ev['name']]
72+
latency = (ev['finished_at'] - ev['started_at']).total_seconds()
73+
named_agg[ev['status']] += 1
74+
if named_agg['max_latency'] < latency:
75+
named_agg['max_latency'] = latency
6976

7077
for value in self.running_events.itervalues():
78+
if value['name'] not in agg:
79+
agg[value['name']] = self.agg_template.copy()
80+
7181
named_agg = agg[value['name']]
72-
latency = (datetime.datetime.now() - ev['started_at']).total_seconds()
82+
latency = (datetime.datetime.now() - value['time']).total_seconds()
7383
if 'started' in named_agg:
7484
named_agg['running'] += 1
7585
if latency > named_agg['running_latency']:
@@ -78,6 +88,7 @@ def aggregate(self):
7888
named_agg['running'] = 1
7989
named_agg['running_latency'] = latency
8090

91+
self.last_aggregation = datetime.datetime.now()
8192
return agg
8293

8394
def aggregate_by(self, period):

0 commit comments

Comments
 (0)