22
22
DBNAME = "postgres"
23
23
NODE_NAMES = ["Luke" , "C3PO" , "Palpatine" , "DarthMaul" , "jabba" , "bobafett" ]
24
24
25
+ def common_conn_string (port ):
26
+ return "dbname={} port={}" .format (DBNAME , port )
27
+
25
28
class Shardlord (PostgresNode ):
26
29
def __init__ (self , name , port = None ):
27
30
# worker_id (int) -> PostgresNode
@@ -38,12 +41,8 @@ def __init__(self, name, port=None):
38
41
use_logging = True )
39
42
super (Shardlord , self ).init ()
40
43
41
- @staticmethod
42
- def _common_conn_string (port ):
43
- return "dbname={} port={}" .format (DBNAME , port )
44
-
45
44
def _shardlord_connstring (self ):
46
- return self . _common_conn_string (self .port )
45
+ return common_conn_string (self .port )
47
46
48
47
def _common_conf_lines (self ):
49
48
return (
@@ -82,12 +81,14 @@ def start_lord(self):
82
81
return self
83
82
84
83
# create fresh cluster with given num of repgroups and nodes in each one
85
- def create_cluster (self , num_repgroups , nodes_in_repgroup = 1 ):
84
+ def create_cluster (self , num_repgroups , nodes_in_repgroup = 1 ,
85
+ worker_creation_cbk = None ):
86
86
self .destroy_cluster ()
87
87
self .start_lord ()
88
88
for rgnum in range (num_repgroups ):
89
89
for nodenum in range (nodes_in_repgroup ):
90
- self .add_node (repl_group = "rg_{}" .format (rgnum ))
90
+ self .add_node (repl_group = "rg_{}" .format (rgnum ),
91
+ worker_creation_cbk = worker_creation_cbk )
91
92
92
93
# destroy and shutdown everything, but keep nodes
93
94
def destroy_cluster (self ):
@@ -177,19 +178,26 @@ def pop_worker(self):
177
178
node .append_conf ("postgresql.conf" , config_lines )
178
179
return node ;
179
180
180
- # Add worker using reserved node, returns node instance, node id pair
181
- def add_node (self , repl_group = None , additional_conf = "" ):
181
+ # Add worker using reserved node, returns node instance, node id pair.
182
+ # Callback is called when node is started, but not yet registred. It might
183
+ # return conn_string, uh.
184
+ def add_node (self , repl_group = None , additional_conf = "" , worker_creation_cbk = None ):
182
185
node = self .pop_worker ()
183
186
184
187
# start this node
185
188
node .append_conf ("postgresql.conf" , additional_conf ) \
186
189
.start () \
187
- .safe_psql (DBNAME , "drop extension if exists pg_shardman; create extension pg_shardman cascade;" )
190
+ .safe_psql (DBNAME , "create extension pg_shardman cascade;" )
191
+ # call callback, if needed
192
+ conn_string = None
193
+ if worker_creation_cbk :
194
+ conn_string = worker_creation_cbk (node )
195
+ conn_string = "'{}'" .format (conn_string ) if conn_string else 'NULL' ;
196
+ repl_group = "'{}'" .format (repl_group ) if repl_group else 'NULL' ;
188
197
# and register this node
189
- conn_string = self ._common_conn_string (node .port )
190
- add_node_cmd = "select shardman.add_node('{}' {})" .format (
191
- conn_string , ", repl_group => '{}'" .format (repl_group ) if repl_group
192
- else '' )
198
+ super_conn_string = common_conn_string (node .port )
199
+ add_node_cmd = "select shardman.add_node('{}', conn_string => {}, " "repl_group => {})" \
200
+ .format (super_conn_string , conn_string , repl_group )
193
201
new_node_id = int (self .execute (DBNAME , add_node_cmd )[0 ][0 ])
194
202
self .workers_dict [new_node_id ] = node
195
203
@@ -418,6 +426,57 @@ def test_rebalance(self):
418
426
419
427
self .pt_cleanup ()
420
428
429
+ # perform basic steps: add nodes, shard table, rebalance it and rm table
430
+ # with non-super user.
431
+ def test_non_super_user (self ):
432
+ self .lord .start_lord ()
433
+
434
+ # shard some table, make sure everyone sees it and replicas are good
435
+ os .environ ["PGPASSWORD" ] = "12345"
436
+ self .lord .create_cluster (3 , 2 , worker_creation_cbk = non_super_user_cbk )
437
+ self .lord .safe_psql (
438
+ DBNAME , 'create table pt(id int primary key, payload int);' )
439
+ self .lord .safe_psql (
440
+ DBNAME , "select shardman.create_hash_partitions('pt', 'id', 12, redundancy => 1);" )
441
+ self .lord .workers [0 ].safe_psql (
442
+ DBNAME ,
443
+ "insert into pt select generate_series(1, 1000), (random() * 100)::int" ,
444
+ username = 'joe' )
445
+
446
+ # everyone sees the whole
447
+ luke_sum = int (self .lord .workers [0 ].execute (
448
+ DBNAME , sum_query ("pt" ), username = 'joe' )[0 ][0 ])
449
+ for worker in self .lord .workers [1 :]:
450
+ worker_sum = int (worker .execute (
451
+ DBNAME , sum_query ("pt" ), username = 'joe' )[0 ][0 ])
452
+ self .assertEqual (luke_sum , worker_sum )
453
+
454
+ # replicas integrity
455
+ parts = self .lord .execute (
456
+ DBNAME , "select part_name, node_id from shardman.partitions" )
457
+ for part_name , node_id in parts :
458
+ part_sum = self .lord .workers_dict [int (node_id )].execute (
459
+ DBNAME , sum_query (part_name ), username = 'joe' )
460
+ replicas = self .lord .execute (
461
+ DBNAME ,
462
+ "select node_id from shardman.replicas where part_name = '{}'" \
463
+ .format (part_name ))
464
+ for replica in replicas :
465
+ replica_id = int (replica [0 ])
466
+ replica_sum = self .lord .workers_dict [replica_id ].execute (
467
+ DBNAME , sum_query (part_name ), username = 'joe' )
468
+ self .assertEqual (part_sum , replica_sum )
469
+
470
+ # now rm table
471
+ self .lord .safe_psql (DBNAME , "select shardman.rm_table('pt')" )
472
+ for worker in self .lord .workers :
473
+ ptrels = worker .execute (
474
+ DBNAME , "select relname from pg_class where relname ~ '^pt.*';" )
475
+ self .assertEqual (len (ptrels ), 0 )
476
+
477
+ self .lord .safe_psql (DBNAME , "drop table pt;" )
478
+ self .lord .destroy_cluster ()
479
+
421
480
def test_deadlock_detector (self ):
422
481
self .lord .create_cluster (2 )
423
482
self .lord .safe_psql (
@@ -780,6 +839,35 @@ def test_copy_from(self):
780
839
self .lord .safe_psql (DBNAME , "drop table pt_text;" )
781
840
self .lord .destroy_cluster ()
782
841
842
+ # Create user joe and allow it to use shardman; configure pg_hba accordingly.
843
+ # Unfortunately, we must use password, because postgres_fdw forbids passwordless
844
+ # access for non-superusers
845
+ def non_super_user_cbk (worker ):
846
+ worker .safe_psql (DBNAME ,
847
+ """
848
+ set synchronous_commit to local;
849
+ drop role if exists joe;
850
+ create role joe login password '12345';
851
+ grant usage on foreign data wrapper postgres_fdw to joe;
852
+ grant all privileges on schema shardman to group joe;
853
+ """ )
854
+ worker .stop ()
855
+ hba_conf_path = os .path .join (worker .data_dir , "pg_hba.conf" )
856
+ with open (hba_conf_path , "w" ) as hba_conf_file :
857
+ pg_hba = [
858
+ "local\t all\t joe\t password\n " ,
859
+ "local\t all\t all\t trust\n " ,
860
+ "host\t all\t all\t 127.0.0.1/32\t trust\n " ,
861
+ "local\t replication\t all\t trust\n " ,
862
+ "host\t replication\t all\t 127.0.0.1/32\t trust\n "
863
+ "host\t replication\t all\t ::1/128\t trust\n "
864
+ ]
865
+ hba_conf_file .writelines (pg_hba )
866
+ worker .start ()
867
+ conn_string = common_conn_string (worker .port ) + " user=joe password=12345"
868
+ return conn_string
869
+
870
+
783
871
# We violate good practices and order the tests -- it doesn't make sense to
784
872
# e.g. test copy_from if add_node doesn't work.
785
873
def suite ():
0 commit comments