Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2d19bba

Browse files
committed
replication
1 parent 1244cbd commit 2d19bba

File tree

3 files changed

+70
-8
lines changed

3 files changed

+70
-8
lines changed

testgres/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from testgres import PostgresNode, get_new_node, clean_all
1+
from testgres import PostgresNode, get_new_node, clean_all, stop_all

testgres/testgres.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import pwd
2929
import tempfile
3030
import shutil
31+
import time
3132

3233
# Try to use psycopg2 by default. If psycopg2 isn"t available then use
3334
# pg8000 which is slower but much more portable because uses only
@@ -45,13 +46,14 @@
4546
last_assigned_port = int(random.random() * 16384) + 49152;
4647

4748

48-
class ClusterException(Exception):
49-
pass
49+
class ClusterException(Exception): pass
50+
class QueryException(Exception): pass
5051

5152

5253
class PostgresNode:
5354
def __init__(self, name, port):
5455
self.name = name
56+
self.host = '127.0.0.1'
5557
self.port = port
5658
self.base_dir = tempfile.mkdtemp()
5759
os.makedirs(self.logs_dir)
@@ -75,6 +77,11 @@ def output_filename(self):
7577
def error_filename(self):
7678
return "%s/stderr.log" % self.logs_dir
7779

80+
@property
81+
def connstr(self):
82+
return "port=%s" % self.port
83+
# return "port=%s host=%s" % (self.port, self.host)
84+
7885
def load_pg_config(self):
7986
""" Loads pg_config output into dict """
8087
pg_config = os.environ.get("PG_CONFIG") \
@@ -116,6 +123,10 @@ def init(self, allows_streaming=False):
116123
"fsync = off\n"
117124
"log_statement = all\n"
118125
"port = %s\n" % self.port)
126+
conf.write(
127+
# "unix_socket_directories = '%s'\n"
128+
# "listen_addresses = ''\n";)
129+
"listen_addresses = '%s'\n" % self.host)
119130

120131
if allows_streaming:
121132
conf.write(
@@ -129,7 +140,7 @@ def init(self, allows_streaming=False):
129140
"max_connections = 10\n")
130141
self.set_replication_conf()
131142

132-
def init_from_backup(self, root_node, backup_name):
143+
def init_from_backup(self, root_node, backup_name, has_streaming=False, hba_permit_replication=True):
133144
"""Initializes cluster from backup, made by another node"""
134145

135146
# Copy data from backup
@@ -142,11 +153,25 @@ def init_from_backup(self, root_node, backup_name):
142153
"postgresql.conf",
143154
"port = %s" % self.port
144155
)
156+
# Enable streaming
157+
if hba_permit_replication:
158+
self.set_replication_conf()
159+
if has_streaming:
160+
self.enable_streaming(root_node)
145161

146162
def set_replication_conf(self):
147163
hba_conf = "%s/pg_hba.conf" % self.data_dir
148164
with open(hba_conf, "a") as conf:
149165
conf.write("local replication all trust\n")
166+
# conf.write("host replication all 127.0.0.1/32 trust\n")
167+
168+
def enable_streaming(self, root_node):
169+
config_name = "%s/recovery.conf" % self.data_dir
170+
with open(config_name, "a") as conf:
171+
conf.write(
172+
"primary_conninfo='%s application_name=%s'\n"
173+
"standby_mode=on\n"
174+
% (root_node.connstr, self.name))
150175

151176
def append_conf(self, filename, string):
152177
"""Appends line to a config file like "postgresql.conf"
@@ -254,6 +279,22 @@ def safe_psql(self, dbname, query):
254279
raise ClusterException("psql failed:\n" + err)
255280
return out
256281

282+
def poll_query_until(self, dbname, query):
283+
"""Runs a query once a second until it returs True"""
284+
max_attemps = 60
285+
attemps = 0
286+
287+
while attemps < max_attemps:
288+
ret = self.safe_psql(dbname, query)
289+
290+
# TODO: fix psql so that it returns result without newline
291+
if ret == "t\n":
292+
return
293+
294+
time.sleep(1)
295+
attemps += 1
296+
raise QueryException("Timeout while waiting for query to return True")
297+
257298
def execute(self, dbname, query):
258299
"""Executes the query and returns all rows"""
259300
connection = pglib.connect(
@@ -327,3 +368,8 @@ def clean_all():
327368
for node in registered_nodes:
328369
node.cleanup()
329370
registered_nodes = []
371+
372+
def stop_all():
373+
global registered_nodes
374+
for node in registered_nodes:
375+
node.stop()

testgres/tests/test_simple.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import unittest
2-
from testgres import get_new_node, clean_all
2+
import time
3+
from testgres import get_new_node, clean_all, stop_all
34

45
class SimpleTest(unittest.TestCase):
56

67
def teardown(self):
78
# clean_all()
8-
pass
9+
stop_all()
910

1011
@unittest.skip("demo")
1112
def test_start_stop(self):
@@ -17,7 +18,7 @@ def test_start_stop(self):
1718
self.assertEqual(res[0][0], 1)
1819
node.stop()
1920

20-
def test_backup(self):
21+
def test_backup_and_replication(self):
2122
node = get_new_node('test')
2223
replica = get_new_node('repl')
2324

@@ -27,13 +28,28 @@ def test_backup(self):
2728
node.psql('postgres', 'insert into abc values (1, 2)')
2829
node.backup('my_backup')
2930

30-
replica.init_from_backup(node, 'my_backup')
31+
replica.init_from_backup(node, 'my_backup', has_streaming=True)
3132
replica.start()
3233
res = replica.execute('postgres', 'select * from abc')
3334
self.assertEqual(len(res), 1)
3435
self.assertEqual(res[0], (1, 2))
3536

37+
# Insert into master node
38+
node.psql('postgres', 'insert into abc values (3, 4)')
39+
# Wait until data syncronizes
40+
node.poll_query_until(
41+
'postgres',
42+
'SELECT pg_current_xlog_location() <= write_location '
43+
'FROM pg_stat_replication WHERE application_name = \'%s\''
44+
% replica.name)
45+
# time.sleep(0.5)
46+
# Check that this record was exported to replica
47+
res = replica.execute('postgres', 'select * from abc')
48+
self.assertEqual(len(res), 2)
49+
self.assertEqual(res[1], (3, 4))
50+
3651
node.stop()
52+
replica.stop()
3753

3854
if __name__ == '__main__':
3955
unittest.main()

0 commit comments

Comments
 (0)