Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f637cd7

Browse files
funbringerkelvich
authored andcommitted
some improvements to mm_cluster.py
1 parent 9aab136 commit f637cd7

File tree

2 files changed

+321
-5
lines changed

2 files changed

+321
-5
lines changed

testgres_tests/mm_cluster.py

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
#!/usr/bin/env python
2+
#
3+
# Multimaster testing framework based on testgres.
4+
# Copyright (c) 2017, Postgres Professional
5+
#
6+
# Execute this file in order to run 3-node mm cluster
7+
8+
9+
from testgres import PostgresNode, NodeStatus, NodeBackup
10+
from testgres import reserve_port, release_port
11+
from testgres import default_username
12+
from testgres import DEFAULT_XLOG_METHOD
13+
14+
15+
# track important changes
16+
__version__ = 0.1
17+
18+
19+
class Cluster(object):
20+
def __init__(self,
21+
nodes,
22+
max_nodes=None,
23+
dbname='postgres',
24+
username=default_username(),
25+
max_connections=100):
26+
27+
max_nodes = max_nodes or nodes
28+
assert(max_nodes >= nodes)
29+
assert(nodes >= 1)
30+
31+
# maximum amount of nodes
32+
self.max_nodes = max_nodes
33+
34+
# list of ClusterNodes
35+
self.nodes = []
36+
37+
# connection settings
38+
self.dbname = dbname
39+
self.username = username
40+
41+
# generate pairs of ports for multimaster
42+
self.ports = [(reserve_port(), reserve_port()) for _ in range(nodes)]
43+
44+
# generate connection string
45+
conn_strings = self._build_mm_conn_strings(self.ports,
46+
dbname,
47+
username)
48+
49+
for i in range(nodes):
50+
pg_port, mm_port = self.ports[i]
51+
52+
node_id = i + 1
53+
node = ClusterNode(name=''.join(['node_', str(node_id)]),
54+
pg_port=pg_port,
55+
mm_port=mm_port)
56+
57+
node.init().mm_init(node_id,
58+
max_nodes,
59+
conn_strings,
60+
max_connections)
61+
62+
self.nodes.append(node)
63+
64+
@staticmethod
65+
def _get_mm_conn_string():
66+
return (
67+
"host=127.0.0.1 "
68+
"dbname={} "
69+
"user={} "
70+
"port={} "
71+
"arbiter_port={}"
72+
)
73+
74+
@staticmethod
75+
def _build_mm_conn_strings(ports, dbname, username):
76+
return ','.join([
77+
Cluster
78+
._get_mm_conn_string()
79+
.format(dbname,
80+
username,
81+
pg_port,
82+
mm_port) for pg_port, mm_port in ports
83+
])
84+
85+
def __enter__(self):
86+
return self
87+
88+
def __exit__(self, type, value, traceback):
89+
self.cleanup()
90+
91+
def free_ports(self):
92+
for p1, p2 in self.ports:
93+
release_port(p1)
94+
release_port(p2)
95+
96+
def free_nodes(self):
97+
for node in self.nodes:
98+
node.cleanup()
99+
100+
def cleanup(self):
101+
self.free_nodes()
102+
self.free_ports()
103+
104+
return self
105+
106+
def start(self):
107+
for node in self.nodes:
108+
node.start()
109+
110+
return self
111+
112+
def stop(self):
113+
for node in self.nodes:
114+
node.stop()
115+
116+
return self
117+
118+
def restart(self):
119+
for node in self.nodes:
120+
node.restart()
121+
122+
return self
123+
124+
def reload(self):
125+
for node in self.nodes:
126+
node.reload()
127+
128+
return self
129+
130+
def install(self):
131+
self.node_any().poll_query_until(dbname=self.dbname,
132+
username=self.username,
133+
query="create extension multimaster",
134+
raise_programming_error=False,
135+
expected=None)
136+
137+
return self
138+
139+
def add_node(self):
140+
if len(self.nodes) == self.max_nodes:
141+
raise Exception("max amount of nodes reached ({})"
142+
.format(self.max_nodes))
143+
144+
pg_port, mm_port = reserve_port(), reserve_port()
145+
node_id = len(self.nodes) + 1
146+
147+
# request multimaster config changes
148+
conn_string = self._get_mm_conn_string().format(self.dbname,
149+
self.username,
150+
pg_port,
151+
mm_port)
152+
add_node_cmd = "select mtm.add_node('{}')".format(conn_string)
153+
self.execute_any(dbname=self.dbname,
154+
username=self.username,
155+
query=add_node_cmd,
156+
commit=True)
157+
158+
# copy node with new ports
159+
backup = self.node_any().backup()
160+
node = backup.spawn_primary(name=''.join(['node_', str(node_id)]),
161+
node_id=node_id,
162+
pg_port=pg_port,
163+
mm_port=mm_port)
164+
165+
# register node and its ports
166+
self.nodes.append(node)
167+
self.ports.append((pg_port, mm_port))
168+
169+
# build new connection strings
170+
conn_strings = self._build_mm_conn_strings(self.ports,
171+
self.dbname,
172+
self.username)
173+
174+
# patch connection strings
175+
for node in self.nodes:
176+
node.append_conf("postgresql.conf", "\n")
177+
node.append_conf("postgresql.conf",
178+
"multimaster.conn_strings = '{}'"
179+
.format(conn_strings))
180+
181+
# finally start it
182+
node.start()
183+
184+
return self
185+
186+
def execute_any(self, dbname, query, username=None, commit=False):
187+
return self.node_any().execute(dbname=dbname,
188+
username=username,
189+
query=query,
190+
commit=commit)
191+
192+
def node_any(self, status=NodeStatus.Running):
193+
for node in self.nodes:
194+
if node.status():
195+
return node
196+
197+
raise Exception("at least one node must be running")
198+
199+
200+
class ClusterNodeBackup(NodeBackup):
201+
def __init__(self,
202+
node,
203+
base_dir=None,
204+
username=None,
205+
xlog_method=DEFAULT_XLOG_METHOD):
206+
207+
super(ClusterNodeBackup, self).__init__(node,
208+
base_dir=base_dir,
209+
username=username,
210+
xlog_method=xlog_method)
211+
212+
def spawn_primary(self, name, node_id, pg_port, mm_port, destroy=True):
213+
base_dir = self._prepare_dir(destroy)
214+
215+
# build a new PostgresNode
216+
node = ClusterNode(name=name,
217+
base_dir=base_dir,
218+
master=self.original_node,
219+
pg_port=pg_port,
220+
mm_port=mm_port)
221+
222+
node.append_conf("postgresql.conf", "\n")
223+
node.append_conf("postgresql.conf",
224+
"port = {}".format(pg_port))
225+
node.append_conf("postgresql.conf",
226+
"multimaster.arbiter_port = {}".format(mm_port))
227+
node.append_conf("postgresql.conf",
228+
"multimaster.node_id = {}".format(node_id))
229+
230+
return node
231+
232+
def spawn_replica(self, name):
233+
raise Exception("not implemented yet")
234+
235+
236+
class ClusterNode(PostgresNode):
237+
def __init__(self,
238+
name,
239+
pg_port,
240+
mm_port,
241+
base_dir=None,
242+
use_logging=False,
243+
master=None):
244+
245+
super(ClusterNode, self).__init__(name=name,
246+
port=pg_port,
247+
base_dir=base_dir,
248+
use_logging=use_logging,
249+
master=master)
250+
251+
self.mm_port = mm_port
252+
253+
def teardown(self):
254+
self.stop(['-c', 'immediate'])
255+
256+
def mm_init(self, node_id, max_nodes, conn_strings, max_connections):
257+
mm_port = self.mm_port
258+
259+
conf_lines = (
260+
"shared_preload_libraries='multimaster'\n"
261+
262+
"max_connections = {0}\n"
263+
"max_prepared_transactions = {1}\n"
264+
"max_worker_processes = {2}\n"
265+
266+
"wal_level = logical\n"
267+
"max_wal_senders = {5}\n"
268+
"max_replication_slots = {5}\n"
269+
270+
"multimaster.conn_strings = '{3}'\n"
271+
"multimaster.arbiter_port = {4}\n"
272+
"multimaster.max_nodes = {5}\n"
273+
"multimaster.node_id = {6}\n"
274+
275+
).format(max_connections,
276+
max_connections * max_nodes,
277+
(max_connections + 3) * max_nodes,
278+
conn_strings,
279+
mm_port,
280+
max_nodes,
281+
node_id)
282+
283+
self.append_conf("postgresql.conf", conf_lines)
284+
285+
return self
286+
287+
def backup(self, username=None, xlog_method=DEFAULT_XLOG_METHOD):
288+
return ClusterNodeBackup(node=self,
289+
username=username,
290+
xlog_method=xlog_method)
291+
292+
293+
if __name__ == "__main__":
294+
import os
295+
296+
if os.environ.get('PG_CONFIG') is None and \
297+
os.environ.get('PG_BIN') is None:
298+
299+
# Do you rely on system PostgreSQL?
300+
print("WARNING: both PG_CONFIG and PG_BIN are not set")
301+
302+
# Start mm cluster
303+
with Cluster(3).start().install() as cluster:
304+
print("Cluster is working")
305+
306+
node_id = 0
307+
for node in cluster.nodes:
308+
node_id += 1
309+
310+
print("Node #{}".format(node_id))
311+
print("\t-> port: {}".format(node.port))
312+
print("\t-> arbiter port: {}".format(node.mm_port))
313+
print("\t-> dir: {}".format(node.base_dir))
314+
print()
315+
316+
print("Press ctrl+C to exit")
317+
318+
# Sleep forever
319+
while True:
320+
import time
321+
time.sleep(1)

testgres_tests/run.sh renamed to testgres_tests/run_tests.sh

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
#!/usr/bin/bash
22

3-
if [ -z "$PG_CONFIG" ]; then
4-
>&2 echo ERROR: you must set PG_CONFIG
5-
exit 1
6-
fi
7-
83
if [ -z "$VIRTUAL_ENV" ]; then
94
>&2 echo WARNING: not in virtualenv
105
fi

0 commit comments

Comments
 (0)