1
+ #!/usr/bin/env python3
1
2
# coding: utf-8
3
+
2
4
"""
3
5
concurrent_partitioning_test.py
4
6
Tests concurrent partitioning worker with simultaneous update queries
5
7
6
- Copyright (c) 2015-2016 , Postgres Professional
8
+ Copyright (c) 2015-2017 , Postgres Professional
7
9
"""
8
10
9
11
import unittest
10
12
import math
11
- from testgres import get_new_node , stop_all
12
13
import time
13
14
import os
14
15
import re
15
16
import subprocess
16
17
import threading
17
18
19
+ from testgres import get_new_node , stop_all
20
+
18
21
19
22
# Helper function for json equality
20
23
def ordered (obj ):
@@ -25,6 +28,7 @@ def ordered(obj):
25
28
else :
26
29
return obj
27
30
31
+
28
32
def if_fdw_enabled (func ):
29
33
"""To run tests with FDW support set environment variable TEST_FDW=1"""
30
34
def wrapper (* args , ** kwargs ):
@@ -110,7 +114,7 @@ def test_concurrent(self):
110
114
self .assertEqual (data [0 ][0 ], 300000 )
111
115
112
116
node .stop ()
113
- except Exception , e :
117
+ except Exception as e :
114
118
self .printlog (node .logs_dir + '/postgresql.log' )
115
119
raise e
116
120
@@ -175,7 +179,7 @@ def test_replication(self):
175
179
node .execute ('postgres' , 'select count(*) from abc' )[0 ][0 ],
176
180
0
177
181
)
178
- except Exception , e :
182
+ except Exception as e :
179
183
self .printlog (node .logs_dir + '/postgresql.log' )
180
184
self .printlog (replica .logs_dir + '/postgresql.log' )
181
185
raise e
@@ -199,7 +203,7 @@ def get(self):
199
203
200
204
# There is one flag for each thread which shows if thread have done
201
205
# its work
202
- flags = [Flag (False ) for i in xrange (3 )]
206
+ flags = [Flag (False ) for i in range (3 )]
203
207
204
208
# All threads synchronizes though this lock
205
209
lock = threading .Lock ()
@@ -275,9 +279,9 @@ def add_partition(node, flag, query):
275
279
'postgres' ,
276
280
'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
277
281
),
278
- '6\n '
282
+ b '6\n '
279
283
)
280
- except Exception , e :
284
+ except Exception as e :
281
285
self .printlog (node .logs_dir + '/postgresql.log' )
282
286
raise e
283
287
@@ -422,14 +426,14 @@ def test_foreign_table(self):
422
426
# Check that table attached to partitioned table
423
427
self .assertEqual (
424
428
master .safe_psql ('postgres' , 'select * from ftable' ),
425
- '25|foreign\n '
429
+ b '25|foreign\n '
426
430
)
427
431
428
432
# Check that we can successfully insert new data into foreign partition
429
433
master .safe_psql ('postgres' , 'insert into abc values (26, \' part\' )' )
430
434
self .assertEqual (
431
435
master .safe_psql ('postgres' , 'select * from ftable order by id' ),
432
- '25|foreign\n 26|part\n '
436
+ b '25|foreign\n 26|part\n '
433
437
)
434
438
435
439
# Testing drop partitions (including foreign partitions)
@@ -459,7 +463,7 @@ def test_foreign_table(self):
459
463
460
464
self .assertEqual (
461
465
master .safe_psql ('postgres' , 'select * from hash_test' ),
462
- '1|\n 2|\n 5|\n 6|\n 8|\n 9|\n 3|\n 4|\n 7|\n 10|\n '
466
+ b '1|\n 2|\n 5|\n 6|\n 8|\n 9|\n 3|\n 4|\n 7|\n 10|\n '
463
467
)
464
468
master .safe_psql ('postgres' , 'select drop_partitions(\' hash_test\' )' )
465
469
@@ -851,63 +855,72 @@ def turnon_pathman(node):
851
855
"--dbname=copy" ],
852
856
cmp_full ), # dump in archive format
853
857
]
854
- for preproc , postproc , pg_dump_params , pg_restore_params , cmp_dbs in test_params :
855
858
856
- dump_restore_cmd = " | " .join ((' ' .join (pg_dump_params ), ' ' .join (pg_restore_params )))
857
-
858
- if (preproc != None ):
859
- preproc (node )
860
-
861
- # transfer and restore data
859
+ try :
862
860
FNULL = open (os .devnull , 'w' )
863
- p1 = subprocess .Popen (pg_dump_params , stdout = subprocess .PIPE )
864
- p2 = subprocess .Popen (pg_restore_params , stdin = p1 .stdout , stdout = FNULL , stderr = FNULL )
865
- p1 .stdout .close () # Allow p1 to receive a SIGPIPE if p2 exits.
866
- p2 .communicate ()
867
-
868
- if (postproc != None ):
869
- postproc (node )
870
-
871
- # check validity of data
872
- with node .connect ('initial' ) as con1 , node .connect ('copy' ) as con2 :
873
-
874
- # compare plans and contents of initial and copy
875
- cmp_result = cmp_dbs (con1 , con2 )
876
- self .assertNotEqual (cmp_result , PLANS_MISMATCH ,
877
- "mismatch in plans of select query on partitioned tables under the command: %s" % dump_restore_cmd )
878
- self .assertNotEqual (cmp_result , CONTENTS_MISMATCH ,
879
- "mismatch in contents of partitioned tables under the command: %s" % dump_restore_cmd )
880
-
881
- # compare enable_parent flag and callback function
882
- config_params_query = """
883
- select partrel, enable_parent, init_callback from pathman_config_params
884
- """
885
- config_params_initial , config_params_copy = {}, {}
886
- for row in con1 .execute (config_params_query ):
887
- config_params_initial [row [0 ]] = row [1 :]
888
- for row in con2 .execute (config_params_query ):
889
- config_params_copy [row [0 ]] = row [1 :]
890
- self .assertEqual (config_params_initial , config_params_copy , \
891
- "mismatch in pathman_config_params under the command: %s" % dump_restore_cmd )
892
-
893
- # compare constraints on each partition
894
- constraints_query = """
895
- select r.relname, c.conname, c.consrc from
896
- pg_constraint c join pg_class r on c.conrelid=r.oid
897
- where relname similar to '(range|hash)_partitioned_\d+'
898
- """
899
- constraints_initial , constraints_copy = {}, {}
900
- for row in con1 .execute (constraints_query ):
901
- constraints_initial [row [0 ]] = row [1 :]
902
- for row in con2 .execute (constraints_query ):
903
- constraints_copy [row [0 ]] = row [1 :]
904
- self .assertEqual (constraints_initial , constraints_copy , \
905
- "mismatch in partitions' constraints under the command: %s" % dump_restore_cmd )
906
-
907
- # clear copy database
908
- node .psql ('copy' , 'drop schema public cascade' )
909
- node .psql ('copy' , 'create schema public' )
910
- node .psql ('copy' , 'drop extension pg_pathman cascade' )
861
+
862
+ for preproc , postproc , pg_dump_params , pg_restore_params , cmp_dbs in test_params :
863
+
864
+ dump_restore_cmd = " | " .join ((' ' .join (pg_dump_params ), ' ' .join (pg_restore_params )))
865
+
866
+ if (preproc != None ):
867
+ preproc (node )
868
+
869
+ # transfer and restore data
870
+ p1 = subprocess .Popen (pg_dump_params , stdout = subprocess .PIPE )
871
+ stdoutdata , _ = p1 .communicate ()
872
+ p2 = subprocess .Popen (pg_restore_params , stdin = subprocess .PIPE ,
873
+ stdout = FNULL , stderr = FNULL )
874
+ p2 .communicate (input = stdoutdata )
875
+
876
+ if (postproc != None ):
877
+ postproc (node )
878
+
879
+ # check validity of data
880
+ with node .connect ('initial' ) as con1 , node .connect ('copy' ) as con2 :
881
+
882
+ # compare plans and contents of initial and copy
883
+ cmp_result = cmp_dbs (con1 , con2 )
884
+ self .assertNotEqual (cmp_result , PLANS_MISMATCH ,
885
+ "mismatch in plans of select query on partitioned tables under the command: %s" % dump_restore_cmd )
886
+ self .assertNotEqual (cmp_result , CONTENTS_MISMATCH ,
887
+ "mismatch in contents of partitioned tables under the command: %s" % dump_restore_cmd )
888
+
889
+ # compare enable_parent flag and callback function
890
+ config_params_query = """
891
+ select partrel, enable_parent, init_callback from pathman_config_params
892
+ """
893
+ config_params_initial , config_params_copy = {}, {}
894
+ for row in con1 .execute (config_params_query ):
895
+ config_params_initial [row [0 ]] = row [1 :]
896
+ for row in con2 .execute (config_params_query ):
897
+ config_params_copy [row [0 ]] = row [1 :]
898
+ self .assertEqual (config_params_initial , config_params_copy , \
899
+ "mismatch in pathman_config_params under the command: %s" % dump_restore_cmd )
900
+
901
+ # compare constraints on each partition
902
+ constraints_query = """
903
+ select r.relname, c.conname, c.consrc from
904
+ pg_constraint c join pg_class r on c.conrelid=r.oid
905
+ where relname similar to '(range|hash)_partitioned_\d+'
906
+ """
907
+ constraints_initial , constraints_copy = {}, {}
908
+ for row in con1 .execute (constraints_query ):
909
+ constraints_initial [row [0 ]] = row [1 :]
910
+ for row in con2 .execute (constraints_query ):
911
+ constraints_copy [row [0 ]] = row [1 :]
912
+ self .assertEqual (constraints_initial , constraints_copy , \
913
+ "mismatch in partitions' constraints under the command: %s" % dump_restore_cmd )
914
+
915
+ # clear copy database
916
+ node .psql ('copy' , 'drop schema public cascade' )
917
+ node .psql ('copy' , 'create schema public' )
918
+ node .psql ('copy' , 'drop extension pg_pathman cascade' )
919
+
920
+ except :
921
+ raise
922
+ finally :
923
+ FNULL .close ()
911
924
912
925
# Stop instance and finish work
913
926
node .stop ()
@@ -958,24 +971,24 @@ def test_concurrent_detach(self):
958
971
"-T" , "%i" % (test_interval + inserts_advance )
959
972
])
960
973
time .sleep (inserts_advance )
961
- detachs = node .pgbench (stdout = FNULL , stderr = subprocess . PIPE , options = [
974
+ detachs = node .pgbench (stdout = FNULL , stderr = FNULL , options = [
962
975
"-D" , "timeout=%f" % detach_timeout ,
963
976
"-f" , detach_pgbench_script ,
964
977
"-T" , "%i" % test_interval
965
978
])
966
979
967
980
# Wait for completion of processes
968
- inserts .wait ()
981
+ _ , stderrdata = inserts .communicate ()
969
982
detachs .wait ()
970
983
971
984
# Obtain error log from inserts process
972
- inserts_errors = inserts .stderr .read ()
973
- self .assertIsNone (re .search ("ERROR|FATAL|PANIC" , inserts_errors ),
985
+ self .assertIsNone (re .search ("ERROR|FATAL|PANIC" , str (stderrdata )),
974
986
msg = "Race condition between detach and concurrent inserts with append partition is expired" )
975
987
976
988
# Stop instance and finish work
977
989
node .stop ()
978
990
node .cleanup ()
991
+ FNULL .close ()
979
992
980
993
981
994
if __name__ == "__main__" :
0 commit comments