@@ -171,19 +171,12 @@ def stopped(self):
171
171
return self .stop_event .isSet ()
172
172
173
173
174
- def log_watch ( node_name , pg_logname ):
174
+ class IsolationLevel ( Enum ):
175
175
"""
176
- Starts thread for node that redirects
177
- postgresql logs to python logging system
176
+ Transaction isolation level for NodeConnection
178
177
"""
179
178
180
- reader = TestgresLogger (node_name , open (pg_logname , 'r' ))
181
- reader .start ()
182
-
183
- global util_threads
184
- util_threads .append (reader )
185
-
186
- return reader
179
+ ReadUncommitted , ReadCommitted , RepeatableRead , Serializable = range (4 )
187
180
188
181
189
182
class NodeConnection (object ):
@@ -218,7 +211,7 @@ def __enter__(self):
218
211
def __exit__ (self , type , value , traceback ):
219
212
self .close ()
220
213
221
- def begin (self , isolation_level = 0 ):
214
+ def begin (self , isolation_level = IsolationLevel . ReadCommitted ):
222
215
# yapf: disable
223
216
levels = [
224
217
'read uncommitted' ,
@@ -227,40 +220,51 @@ def begin(self, isolation_level=0):
227
220
'serializable'
228
221
]
229
222
230
- # Check if level is int [0..3]
231
- if (isinstance (isolation_level , int ) and
232
- isolation_level in range (0 , 4 )):
223
+ # Check if level is an IsolationLevel
224
+ if (isinstance (isolation_level , IsolationLevel )):
233
225
234
- # Replace index with isolation level type
235
- isolation_level = levels [isolation_level ]
226
+ # Get index of isolation level
227
+ level_idx = isolation_level .value
228
+ assert (level_idx in range (4 ))
236
229
237
- # Or it might be a string
238
- elif (isinstance (isolation_level , six .text_type ) and
239
- isolation_level .lower () in levels ):
230
+ # Replace isolation level with its name
231
+ isolation_level = levels [level_idx ]
240
232
241
- # Nothing to do here
242
- pass
243
-
244
- # Something is wrong, emit exception
245
233
else :
246
- raise QueryException (
247
- 'Invalid isolation level "{}"' .format (isolation_level ))
234
+ # Get name of isolation level
235
+ level_str = str (isolation_level ).lower ()
236
+
237
+ # Validate level string
238
+ if level_str not in levels :
239
+ error = 'Invalid isolation level "{}"'
240
+ raise QueryException (error .format (level_str ))
241
+
242
+ # Replace isolation level with its name
243
+ isolation_level = level_str
248
244
249
- self .cursor .execute (
250
- 'SET TRANSACTION ISOLATION LEVEL {}' .format (isolation_level ))
245
+ # Set isolation level
246
+ cmd = 'SET TRANSACTION ISOLATION LEVEL {}'
247
+ self .cursor .execute (cmd .format (isolation_level ))
248
+
249
+ return self
251
250
252
251
def commit (self ):
253
252
self .connection .commit ()
254
253
254
+ return self
255
+
255
256
def rollback (self ):
256
257
self .connection .rollback ()
257
258
259
+ return self
260
+
258
261
def execute (self , query , * args ):
259
262
self .cursor .execute (query , args )
260
263
261
264
try :
262
265
res = self .cursor .fetchall ()
263
266
267
+ # pg8000 might return tuples
264
268
if isinstance (res , tuple ):
265
269
res = [tuple (t ) for t in res ]
266
270
@@ -1311,3 +1315,18 @@ def configure_testgres(**options):
1311
1315
1312
1316
for key , option in options .items ():
1313
1317
setattr (TestgresConfig , key , option )
1318
+
1319
+
1320
+ def log_watch (node_name , pg_logname ):
1321
+ """
1322
+ Start thread for node that redirects
1323
+ PostgreSQL logs to python logging system.
1324
+ """
1325
+
1326
+ reader = TestgresLogger (node_name , open (pg_logname , 'r' ))
1327
+ reader .start ()
1328
+
1329
+ global util_threads
1330
+ util_threads .append (reader )
1331
+
1332
+ return reader
0 commit comments