13
13
import re
14
14
import subprocess
15
15
import threading
16
- import json
17
16
import time
18
17
import unittest
19
18
20
19
from distutils .version import LooseVersion
21
- from testgres import get_new_node , get_bin_path , get_pg_config
20
+ from testgres import get_new_node , get_bin_path , get_pg_version
22
21
23
22
# set setup base logging config, it can be turned on by `use_logging`
24
23
# parameter on node setup
54
53
}
55
54
56
55
logging .config .dictConfig (LOG_CONFIG )
57
- version = LooseVersion (get_pg_config (). get ( "VERSION_NUM" ))
56
+ version = LooseVersion (get_pg_version ( ))
58
57
59
58
60
59
# Helper function for json equality
@@ -106,23 +105,6 @@ def start_new_pathman_cluster(self,
106
105
107
106
return node
108
107
109
- def catchup_replica (self , master , replica ):
110
- """ Wait until replica synchronizes with master """
111
- if version >= LooseVersion ('10' ):
112
- wait_lsn_query = """
113
- SELECT pg_current_wal_lsn() <= replay_lsn
114
- FROM pg_stat_replication
115
- WHERE application_name = '{0}'
116
- """
117
- else :
118
- wait_lsn_query = """
119
- SELECT pg_current_xlog_location() <= replay_location
120
- FROM pg_stat_replication
121
- WHERE application_name = '{0}'
122
- """
123
-
124
- master .poll_query_until ('postgres' , wait_lsn_query .format (replica .name ))
125
-
126
108
def test_concurrent (self ):
127
109
""" Test concurrent partitioning """
128
110
@@ -158,8 +140,7 @@ def test_replication(self):
158
140
with self .start_new_pathman_cluster (allow_streaming = True , test_data = True ) as node :
159
141
with node .replicate ('node2' ) as replica :
160
142
replica .start ()
161
- # wait until replica catches up
162
- self .catchup_replica (node , replica )
143
+ replica .catchup ()
163
144
164
145
# check that results are equal
165
146
self .assertEqual (
@@ -169,7 +150,9 @@ def test_replication(self):
169
150
# enable parent and see if it is enabled in replica
170
151
node .psql ('postgres' , "select enable_parent('abc')" )
171
152
172
- self .catchup_replica (node , replica )
153
+ # wait until replica catches up
154
+ replica .catchup ()
155
+
173
156
self .assertEqual (
174
157
node .psql ('postgres' , 'explain (costs off) select * from abc' ),
175
158
replica .psql ('postgres' , 'explain (costs off) select * from abc' ))
@@ -182,7 +165,10 @@ def test_replication(self):
182
165
# check that UPDATE in pathman_config_params invalidates cache
183
166
node .psql ('postgres' ,
184
167
'update pathman_config_params set enable_parent = false' )
185
- self .catchup_replica (node , replica )
168
+
169
+ # wait until replica catches up
170
+ replica .catchup ()
171
+
186
172
self .assertEqual (
187
173
node .psql ('postgres' , 'explain (costs off) select * from abc' ),
188
174
replica .psql ('postgres' , 'explain (costs off) select * from abc' ))
@@ -688,7 +674,7 @@ def con2_thread():
688
674
explain (analyze, costs off, timing off)
689
675
select * from drop_test
690
676
where val = any (select generate_series(1, 40, 34))
691
- """ ) # query selects from drop_test_1 and drop_test_4
677
+ """ ) # query selects from drop_test_1 and drop_test_4
692
678
693
679
con2 .commit ()
694
680
@@ -712,15 +698,14 @@ def con2_thread():
712
698
# return all values in tuple
713
699
queue .put ((has_runtime_append , has_drop_test_1 , has_drop_test_4 ))
714
700
715
-
716
701
# Step 1: cache partitioned table in con1
717
702
con1 .begin ()
718
- con1 .execute ('select count(*) from drop_test' ) # load pathman's cache
703
+ con1 .execute ('select count(*) from drop_test' ) # load pathman's cache
719
704
con1 .commit ()
720
705
721
706
# Step 2: cache partitioned table in con2
722
707
con2 .begin ()
723
- con2 .execute ('select count(*) from drop_test' ) # load pathman's cache
708
+ con2 .execute ('select count(*) from drop_test' ) # load pathman's cache
724
709
con2 .commit ()
725
710
726
711
# Step 3: drop first partition of 'drop_test'
@@ -786,12 +771,12 @@ def con2_thread():
786
771
787
772
# Step 1: lock partitioned table in con1
788
773
con1 .begin ()
789
- con1 .execute ('select count(*) from ins_test' ) # load pathman's cache
774
+ con1 .execute ('select count(*) from ins_test' ) # load pathman's cache
790
775
con1 .execute ('lock table ins_test in share update exclusive mode' )
791
776
792
777
# Step 2: try inserting new value in con2 (waiting)
793
778
con2 .begin ()
794
- con2 .execute ('select count(*) from ins_test' ) # load pathman's cache
779
+ con2 .execute ('select count(*) from ins_test' ) # load pathman's cache
795
780
t = threading .Thread (target = con2_thread )
796
781
t .start ()
797
782
@@ -853,12 +838,12 @@ def con2_thread():
853
838
854
839
# Step 1: initilize con1
855
840
con1 .begin ()
856
- con1 .execute ('select count(*) from ins_test' ) # load pathman's cache
841
+ con1 .execute ('select count(*) from ins_test' ) # load pathman's cache
857
842
858
843
# Step 2: initilize con2
859
844
con2 .begin ()
860
- con2 .execute ('select count(*) from ins_test' ) # load pathman's cache
861
- con2 .commit () # unlock relations
845
+ con2 .execute ('select count(*) from ins_test' ) # load pathman's cache
846
+ con2 .commit () # unlock relations
862
847
863
848
# Step 3: merge 'ins_test1' + 'ins_test_2' in con1 (success)
864
849
con1 .execute (
@@ -1031,12 +1016,12 @@ def turnon_pathman(node):
1031
1016
get_bin_path ("pg_dump" ), "-p {}" .format (node .port ),
1032
1017
"initial"
1033
1018
], [get_bin_path ("psql" ), "-p {}" .format (node .port ), "copy" ],
1034
- cmp_full ), # dump as plain text and restore via COPY
1019
+ cmp_full ), # dump as plain text and restore via COPY
1035
1020
(turnoff_pathman , turnon_pathman , [
1036
1021
get_bin_path ("pg_dump" ), "-p {}" .format (node .port ),
1037
1022
"--inserts" , "initial"
1038
1023
], [get_bin_path ("psql" ), "-p {}" .format (node .port ), "copy" ],
1039
- cmp_full ), # dump as plain text and restore via INSERTs
1024
+ cmp_full ), # dump as plain text and restore via INSERTs
1040
1025
(None , None , [
1041
1026
get_bin_path ("pg_dump" ), "-p {}" .format (node .port ),
1042
1027
"--format=custom" , "initial"
@@ -1052,7 +1037,7 @@ def turnon_pathman(node):
1052
1037
dump_restore_cmd = " | " .join ((' ' .join (pg_dump_params ),
1053
1038
' ' .join (pg_restore_params )))
1054
1039
1055
- if (preproc != None ):
1040
+ if (preproc is not None ):
1056
1041
preproc (node )
1057
1042
1058
1043
# transfer and restore data
@@ -1065,12 +1050,12 @@ def turnon_pathman(node):
1065
1050
stderr = fnull )
1066
1051
p2 .communicate (input = stdoutdata )
1067
1052
1068
- if (postproc != None ):
1053
+ if (postproc is not None ):
1069
1054
postproc (node )
1070
1055
1071
1056
# validate data
1072
1057
with node .connect ('initial' ) as con1 , \
1073
- node .connect ('copy' ) as con2 :
1058
+ node .connect ('copy' ) as con2 :
1074
1059
1075
1060
# compare plans and contents of initial and copy
1076
1061
cmp_result = cmp_dbs (con1 , con2 )
@@ -1092,8 +1077,8 @@ def turnon_pathman(node):
1092
1077
config_params_initial [row [0 ]] = row [1 :]
1093
1078
for row in con2 .execute (config_params_query ):
1094
1079
config_params_copy [row [0 ]] = row [1 :]
1095
- self .assertEqual (config_params_initial , config_params_copy , \
1096
- "mismatch in pathman_config_params under the command: %s" % dump_restore_cmd )
1080
+ self .assertEqual (config_params_initial , config_params_copy ,
1081
+ "mismatch in pathman_config_params under the command: %s" % dump_restore_cmd )
1097
1082
1098
1083
# compare constraints on each partition
1099
1084
constraints_query = """
@@ -1106,8 +1091,8 @@ def turnon_pathman(node):
1106
1091
constraints_initial [row [0 ]] = row [1 :]
1107
1092
for row in con2 .execute (constraints_query ):
1108
1093
constraints_copy [row [0 ]] = row [1 :]
1109
- self .assertEqual (constraints_initial , constraints_copy , \
1110
- "mismatch in partitions' constraints under the command: %s" % dump_restore_cmd )
1094
+ self .assertEqual (constraints_initial , constraints_copy ,
1095
+ "mismatch in partitions' constraints under the command: %s" % dump_restore_cmd )
1111
1096
1112
1097
# clear copy database
1113
1098
node .psql ('copy' , 'drop schema public cascade' )
@@ -1128,9 +1113,9 @@ def test_concurrent_detach(self):
1128
1113
test_interval = int (math .ceil (detach_timeout * num_detachs ))
1129
1114
1130
1115
insert_pgbench_script = os .path .dirname (os .path .realpath (__file__ )) \
1131
- + "/pgbench_scripts/insert_current_timestamp.pgbench"
1116
+ + "/pgbench_scripts/insert_current_timestamp.pgbench"
1132
1117
detach_pgbench_script = os .path .dirname (os .path .realpath (__file__ )) \
1133
- + "/pgbench_scripts/detachs_in_timeout.pgbench"
1118
+ + "/pgbench_scripts/detachs_in_timeout.pgbench"
1134
1119
1135
1120
# Check pgbench scripts on existance
1136
1121
self .assertTrue (
@@ -1202,16 +1187,14 @@ def test_update_node_plan1(self):
1202
1187
Test scan on all partititions when using update node.
1203
1188
We can't use regression tests here because 9.5 and 9.6 give
1204
1189
different plans
1205
- '''
1190
+ '''
1206
1191
1207
1192
with get_new_node ('test_update_node' ) as node :
1208
1193
node .init ()
1209
- node .append_conf (
1210
- 'postgresql.conf' ,
1211
- """
1212
- shared_preload_libraries=\' pg_pathman\'
1213
- pg_pathman.override_copy=false
1214
- pg_pathman.enable_partitionrouter=on
1194
+ node .append_conf ('postgresql.conf' , """
1195
+ shared_preload_libraries=\' pg_pathman\'
1196
+ pg_pathman.override_copy=false
1197
+ pg_pathman.enable_partitionrouter=on
1215
1198
""" )
1216
1199
node .start ()
1217
1200
@@ -1275,5 +1258,6 @@ def test_update_node_plan1(self):
1275
1258
node .psql ('postgres' , 'DROP SCHEMA test_update_node CASCADE;' )
1276
1259
node .psql ('postgres' , 'DROP EXTENSION pg_pathman CASCADE;' )
1277
1260
1261
+
1278
1262
if __name__ == "__main__" :
1279
1263
unittest .main ()
0 commit comments