Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0d842a2

Browse files
committed
insertion tests for referee
1 parent 7bfa7fd commit 0d842a2

File tree

3 files changed

+92
-4
lines changed

3 files changed

+92
-4
lines changed

tests2/lib/bank_client.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import logging
1414
import re
1515
import pprint
16+
import uuid
1617

1718
class MtmTxAggregate(object):
1819

@@ -80,7 +81,6 @@ def __init__(self, dsns, n_accounts=100000):
8081
self.total = 0
8182
self.aggregates = [{} for e in dsns]
8283
keep_trying(40, 1, self.initdb, 'self.initdb')
83-
self.running = True
8484
self.nodes_state_fields = ["id", "disabled", "disconnected", "catchUp", "slotLag",
8585
"avgTransDelay", "lastStatusChange", "oldestSnapshot", "SenderPid",
8686
"SenderStartTime ", "ReceiverPid", "ReceiverStartTime", "connStr"]
@@ -117,6 +117,7 @@ def initdb(self):
117117
conn.commit()
118118
cur.execute('drop table if exists bank_test')
119119
cur.execute('create table bank_test(uid int primary key, amount int)')
120+
cur.execute('create table insert_test(id text primary key)')
120121
cur.execute('''
121122
insert into bank_test
122123
select *, 0 from generate_series(0, %s)''',
@@ -166,6 +167,19 @@ def no_prepared_tx(self):
166167
print("n_prepared = %d" % (n_prepared))
167168
return (n_prepared)
168169

170+
def insert_counts(self):
171+
counts = []
172+
173+
for dsn in self.dsns:
174+
con = psycopg2.connect(dsn)
175+
cur = con.cursor()
176+
cur.execute("select count(*) from insert_test;")
177+
counts.append(int(cur.fetchone()[0]))
178+
cur.close()
179+
con.close()
180+
181+
return counts
182+
169183
@asyncio.coroutine
170184
def status(self):
171185
while self.running:
@@ -267,6 +281,11 @@ def transfer_tx(self, conn, cur, agg, conn_i):
267281
assert(cur.rowcount == 1)
268282
yield from cur.execute('commit')
269283

284+
@asyncio.coroutine
285+
def insert_tx(self, conn, cur, agg, conn_i):
286+
query = "insert into insert_test values ('%s')" % (uuid.uuid4())
287+
yield from cur.execute(query)
288+
270289
@asyncio.coroutine
271290
def total_tx(self, conn, cur, agg, conn_i):
272291
yield from cur.execute("select sum(amount), count(*), count(uid), current_setting('multimaster.node_id') from bank_test")
@@ -289,16 +308,21 @@ def run(self):
289308
self.loop = asyncio.get_event_loop()
290309

291310
for i, _ in enumerate(self.dsns):
292-
for j in range(3):
311+
for j in range(1):
293312
asyncio.ensure_future(self.exec_tx(self.transfer_tx, i, 'transfer', j))
294313
asyncio.ensure_future(self.exec_tx(self.total_tx, i, 'sumtotal', 0))
314+
for j in range(2):
315+
asyncio.ensure_future(self.exec_tx(self.insert_tx, i, 'inserter', j))
295316

296317
asyncio.ensure_future(self.status())
297318

298319
self.loop.run_forever()
299320

300321
def bgrun(self):
301322
print('Starting evloop in different process')
323+
324+
self.running = True
325+
302326
self.parent_pipe, self.child_pipe = aioprocessing.AioPipe()
303327
self.evloop_process = multiprocessing.Process(target=self.run, args=())
304328
self.evloop_process.start()

tests2/lib/test_helper.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,33 @@ def awaitCommit(self, node_id):
4040

4141
while total_sleep <= TEST_MAX_RECOVERY_TIME:
4242
aggs = self.client.get_aggregates(clean=False, _print=False)
43-
# print('=== ',aggs[node_id]['transfer']['finish'])
43+
print('=== ',aggs[node_id]['transfer']['finish'])
4444
if ('commit' in aggs[node_id]['transfer']['finish'] and
4545
aggs[node_id]['transfer']['finish']['commit'] > 10):
4646
break
4747
time.sleep(5)
4848
total_sleep += 5
4949

50+
def awaitOnline(self, dsn):
51+
total_sleep = 0
52+
one = 0
5053

51-
def performFailure(self, failure, wait=0, node_wait_for_commit=-1):
54+
while total_sleep <= TEST_MAX_RECOVERY_TIME:
55+
try:
56+
con = psycopg2.connect(dsn + " connect_timeout=1")
57+
cur = con.cursor()
58+
cur.execute("select 1")
59+
one = int(cur.fetchone()[0])
60+
cur.close()
61+
con.close()
62+
print("Online!")
63+
break
64+
except Exception as e:
65+
print('Waiting for online:', str(e))
66+
time.sleep(5)
67+
total_sleep += 5
68+
69+
def performFailure(self, failure, wait=0, node_wait_for_commit=-1, node_wait_for_online=None, stop_load=False):
5270

5371
time.sleep(TEST_WARMING_TIME)
5472

@@ -72,12 +90,27 @@ def performFailure(self, failure, wait=0, node_wait_for_commit=-1):
7290

7391
self.client.clean_aggregates()
7492

93+
if stop_load:
94+
time.sleep(3)
95+
self.client.get_aggregates(clean=False)
96+
self.client.stop()
97+
7598
if node_wait_for_commit >= 0:
7699
self.awaitCommit(node_wait_for_commit)
77100
else:
78101
time.sleep(TEST_RECOVERY_TIME)
79102

103+
if node_wait_for_online != None:
104+
self.awaitOnline(node_wait_for_online)
105+
else:
106+
time.sleep(TEST_RECOVERY_TIME)
107+
108+
if stop_load:
109+
self.client.bgrun()
110+
time.sleep(3)
111+
80112
aggs = self.client.get_aggregates()
113+
81114
return (aggs_failure, aggs)
82115

83116
def nodeExecute(dsn, statements):

tests2/test_referee.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,22 @@ def setUp(self):
5656
def tearDown(self):
5757
print('Finish test at ',datetime.datetime.utcnow())
5858

59+
def test_neighbor_restart(self):
60+
print('### test_neighbor_restart ###')
61+
62+
aggs_failure, aggs = self.performFailure(RestartNode('node2'), node_wait_for_online="dbname=regression user=postgres host=127.0.0.1 port=15433", stop_load=True)
63+
64+
self.assertCommits(aggs_failure[:1])
65+
self.assertNoCommits(aggs_failure[1:])
66+
self.assertIsolation(aggs_failure)
67+
68+
self.assertCommits(aggs)
69+
self.assertIsolation(aggs)
70+
71+
counts = self.client.insert_counts()
72+
self.assertEqual(counts[0], counts[1])
73+
74+
5975
def test_node_crash(self):
6076
print('### test_node_crash ###')
6177

@@ -68,6 +84,9 @@ def test_node_crash(self):
6884
self.assertCommits(aggs)
6985
self.assertIsolation(aggs)
7086

87+
counts = self.client.insert_counts()
88+
self.assertEqual(counts[0], counts[1])
89+
7190

7291
def test_partition_referee(self):
7392
print('### test_partition_referee ###')
@@ -81,6 +100,9 @@ def test_partition_referee(self):
81100
self.assertCommits(aggs)
82101
self.assertIsolation(aggs)
83102

103+
counts = self.client.insert_counts()
104+
self.assertEqual(counts[0], counts[1])
105+
84106
def test_double_failure_referee(self):
85107
print('### test_double_failure_referee ###')
86108

@@ -102,6 +124,9 @@ def test_double_failure_referee(self):
102124
self.assertCommits(aggs)
103125
self.assertIsolation(aggs)
104126

127+
counts = self.client.insert_counts()
128+
self.assertEqual(counts[0], counts[1])
129+
105130
def test_saved_referee_decision(self):
106131
print('### test_saved_referee_decision ###')
107132
docker_api = docker.from_env()
@@ -173,6 +198,9 @@ def test_saved_referee_decision(self):
173198

174199
self.assertEqual(decisions_count, 0)
175200

201+
counts = self.client.insert_counts()
202+
self.assertEqual(counts[0], counts[1])
203+
176204
def test_winner_restart(self):
177205
print('### test_winner_restart ###')
178206

@@ -200,6 +228,9 @@ def test_winner_restart(self):
200228
docker_api.containers.get('node1').start()
201229
self.awaitCommit(0)
202230

231+
counts = self.client.insert_counts()
232+
self.assertEqual(counts[0], counts[1])
233+
203234

204235
if __name__ == '__main__':
205236
unittest.main()

0 commit comments

Comments
 (0)