Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5571df8

Browse files
committed
brand new reconnection logic
1 parent be1b3f6 commit 5571df8

File tree

3 files changed

+67
-21
lines changed

3 files changed

+67
-21
lines changed

multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3850,7 +3850,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
38503850
StartTransactionCommand();
38513851
if (x->status == TRANSACTION_STATUS_ABORTED) {
38523852
FinishPreparedTransaction(x->gid, false);
3853-
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
3853+
elog(ERROR, "Transaction aborted by DTM");
38543854
} else {
38553855
FinishPreparedTransaction(x->gid, true);
38563856
}

tests2/lib/bank_client.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import aiopg
55
import random
66
import psycopg2
7+
from psycopg2.extensions import *
78
import time
89
import datetime
910
import copy
1011
import aioprocessing
1112
import multiprocessing
13+
import logging
1214

1315
class MtmTxAggregate(object):
1416

@@ -57,6 +59,7 @@ def keep_trying(tries, delay, method, name, *args, **kwargs):
5759
class MtmClient(object):
5860

5961
def __init__(self, dsns, n_accounts=100000):
62+
# logging.basicConfig(level=logging.DEBUG)
6063
self.n_accounts = n_accounts
6164
self.dsns = dsns
6265
self.aggregates = {}
@@ -129,21 +132,41 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
129132
if conn_i not in self.aggregates:
130133
self.aggregates[conn_i] = {}
131134
agg = self.aggregates[conn_i][aggname_prefix] = MtmTxAggregate(aggname)
132-
pool = yield from aiopg.create_pool(self.dsns[conn_i])
133-
conn = yield from pool.acquire()
134-
cur = yield from conn.cursor()
135+
dsn = self.dsns[conn_i]
136+
137+
conn = cur = False
138+
135139
while self.running:
136140
agg.start_tx()
141+
137142
try:
138-
# yield from cur.execute('commit')
143+
if (not conn) or conn.closed:
144+
# enable_hstore tries to run a SELECT against the database;
145+
# if that SELECT fails, it raises an exception
146+
# and leaves a stale connection to the database
147+
conn = yield from aiopg.connect(dsn, enable_hstore=False)
148+
print("reconnected")
149+
150+
if (not cur) or cur.closed:
151+
cur = yield from conn.cursor()
152+
153+
# ROLLBACK tx after previous exception.
154+
# Doing this here instead of except handler to stay inside try
155+
# block.
156+
status = yield from conn.get_transaction_status()
157+
if status != TRANSACTION_STATUS_IDLE:
158+
yield from cur.execute('rollback')
159+
139160
yield from tx_block(conn, cur, agg)
140161
agg.finish_tx('commit')
141-
except psycopg2.OperationalError as e:
142-
if not cur.closed:
143-
yield from cur.execute('rollback')
144-
agg.finish_tx('operational_rollback')
162+
145163
except psycopg2.Error as e:
146-
agg.finish_tx(e.pgerror)
164+
agg.finish_tx(str(e).strip())
165+
# Give evloop some free time.
166+
# In case of continuous exceptions we can loop here without returning
167+
# back to event loop and block it
168+
yield from asyncio.sleep(0.01)
169+
147170
print("We've count to infinity!")
148171

149172
@asyncio.coroutine
@@ -199,14 +222,14 @@ def bgrun(self):
199222
self.evloop_process = multiprocessing.Process(target=self.run, args=())
200223
self.evloop_process.start()
201224

202-
def get_aggregates(self, print=True):
225+
def get_aggregates(self, _print=True):
203226
self.parent_pipe.send('status')
204227
resp = self.parent_pipe.recv()
205-
if print:
228+
if _print:
206229
MtmClient.print_aggregates(resp)
207230
return resp
208231

209-
def clean_aggregates(self, print=True):
232+
def clean_aggregates(self):
210233
self.parent_pipe.send('status')
211234
self.parent_pipe.recv()
212235

tests2/test_recovery.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import subprocess
88
import datetime
99
import docker
10+
import warnings
1011

1112
from lib.bank_client import MtmClient
1213
from lib.failure_injector import *
@@ -63,6 +64,7 @@ def setUpClass(self):
6364

6465
# XXX: add normal wait here
6566
time.sleep(20)
67+
print('started')
6668
self.client = MtmClient([
6769
"dbname=regression user=postgres host=127.0.0.1 port=15432",
6870
"dbname=regression user=postgres host=127.0.0.1 port=15433",
@@ -77,8 +79,11 @@ def tearDownClass(self):
7779
# XXX: check nodes data identity here
7880
subprocess.check_call(['docker-compose','down'])
7981

82+
def setUp(self):
83+
warnings.simplefilter("ignore", ResourceWarning)
84+
8085
def test_normal_operations(self):
81-
print('### normal_operations ###')
86+
print('### test_normal_operations ###')
8287

8388
aggs_failure, aggs = self.performFailure(NoFailure())
8489

@@ -90,7 +95,7 @@ def test_normal_operations(self):
9095

9196

9297
def test_node_partition(self):
93-
print('### nodePartitionTest ###')
98+
print('### test_node_partition ###')
9499

95100
aggs_failure, aggs = self.performFailure(SingleNodePartition('node3'))
96101

@@ -103,7 +108,7 @@ def test_node_partition(self):
103108

104109

105110
def test_edge_partition(self):
106-
print('### edgePartitionTest ###')
111+
print('### test_edge_partition ###')
107112

108113
aggs_failure, aggs = self.performFailure(EdgePartition('node2', 'node3'))
109114

@@ -114,13 +119,31 @@ def test_edge_partition(self):
114119
self.assertCommits(aggs)
115120
self.assertIsolation(aggs)
116121

117-
subprocess.check_call(['blockade','join'])
118-
print("Node3 joined back")
122+
def test_node_restart(self):
123+
print('### test_node_restart ###')
124+
125+
time.sleep(3)
126+
127+
aggs_failure, aggs = self.performFailure(RestartNode('node3'))
119128

120-
for i in range(50):
121-
time.sleep(3)
122-
self.clients.print_agg()
129+
self.assertCommits(aggs_failure[:2])
130+
self.assertNoCommits(aggs_failure[2:])
131+
self.assertIsolation(aggs_failure)
132+
133+
self.assertCommits(aggs)
134+
self.assertIsolation(aggs)
123135

136+
def test_node_crash(self):
137+
print('### test_node_crash ###')
138+
139+
aggs_failure, aggs = self.performFailure(CrashRecoverNode('node3'))
140+
141+
self.assertCommits(aggs_failure[:2])
142+
self.assertNoCommits(aggs_failure[2:])
143+
self.assertIsolation(aggs_failure)
144+
145+
self.assertCommits(aggs)
146+
self.assertIsolation(aggs)
124147

125148
if __name__ == '__main__':
126149
unittest.main()

0 commit comments

Comments
 (0)