Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4c3e877

Browse files
zilderildus
authored andcommitted
Synchronous standbys (#46)
Add set_synchronous_standbys() method
1 parent 95d37e9 commit 4c3e877

File tree

5 files changed

+148
-1
lines changed

5 files changed

+148
-1
lines changed

docs/source/testgres.rst

+8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ testgres.node
6161
.. autoclass:: testgres.node.ProcessProxy
6262
:members:
6363

64+
testgres.standby
65+
----------------
66+
67+
.. automodule:: testgres.standby
68+
:members:
69+
:undoc-members:
70+
:show-inheritance:
71+
6472
testgres.pubsub
6573
---------------
6674

testgres/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@
2626
get_bin_path, \
2727
get_pg_config, \
2828
get_pg_version
29+
30+
from .standby import \
31+
First, \
32+
Any

testgres/node.py

+43-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import subprocess
77
import time
88

9+
from collections import Iterable
910
from shutil import rmtree
1011
from six import raise_from, iteritems, text_type
1112
from tempfile import mkstemp, mkdtemp
@@ -64,6 +65,8 @@
6465

6566
from .pubsub import Publication, Subscription
6667

68+
from .standby import First
69+
6770
from .utils import \
6871
PgVer, \
6972
eprint, \
@@ -699,7 +702,7 @@ def restart(self, params=[]):
699702

700703
def reload(self, params=[]):
701704
"""
702-
Reload config files using pg_ctl.
705+
Asynchronously reload config files using pg_ctl.
703706
704707
Args:
705708
params: additional arguments for pg_ctl.
@@ -1117,6 +1120,45 @@ def replicate(self, name=None, slot=None, **kwargs):
11171120
with clean_on_error(self.backup(**kwargs)) as backup:
11181121
return backup.spawn_replica(name=name, destroy=True, slot=slot)
11191122

1123+
def set_synchronous_standbys(self, standbys):
1124+
"""
1125+
Set standby synchronization options. This corresponds to
1126+
`synchronous_standby_names <https://www.postgresql.org/docs/current/static/runtime-config-replication.html#GUC-SYNCHRONOUS-STANDBY-NAMES>`_
1127+
option. Note that :meth:`~.PostgresNode.reload` or
1128+
:meth:`~.PostgresNode.restart` is needed for changes to take place.
1129+
1130+
Args:
1131+
standbys: either :class:`.First` or :class:`.Any` object specifying
1132+
sychronization parameters or just a plain list of
1133+
:class:`.PostgresNode`s replicas which would be equivalent
1134+
to passing ``First(1, <list>)``. For PostgreSQL 9.5 and below
1135+
it is only possible to specify a plain list of standbys as
1136+
`FIRST` and `ANY` keywords aren't supported.
1137+
1138+
Example::
1139+
1140+
from testgres import get_new_node, First
1141+
1142+
master = get_new_node().init().start()
1143+
with master.replicate().start() as standby:
1144+
master.append_conf("synchronous_commit = remote_apply")
1145+
master.set_synchronous_standbys(First(1, [standby]))
1146+
master.restart()
1147+
1148+
"""
1149+
if self._pg_version >= '9.6':
1150+
if isinstance(standbys, Iterable):
1151+
standbys = First(1, standbys)
1152+
else:
1153+
if isinstance(standbys, Iterable):
1154+
standbys = u", ".join(
1155+
u"\"{}\"".format(r.name) for r in standbys)
1156+
else:
1157+
raise TestgresException("Feature isn't supported in "
1158+
"Postgres 9.5 and below")
1159+
1160+
self.append_conf("synchronous_standby_names = '{}'".format(standbys))
1161+
11201162
def catchup(self, dbname=None, username=None):
11211163
"""
11221164
Wait until async replica catches up with its master.

testgres/standby.py

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# coding: utf-8
2+
3+
import six
4+
5+
6+
@six.python_2_unicode_compatible
7+
class First:
8+
"""
9+
Specifies a priority-based synchronous replication and makes transaction
10+
commits wait until their WAL records are replicated to ``num_sync``
11+
synchronous standbys chosen based on their priorities.
12+
13+
Args:
14+
sync_num (int): the number of standbys that transaction need to wait
15+
for replies from
16+
standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby
17+
nodes
18+
"""
19+
20+
def __init__(self, sync_num, standbys):
21+
self.sync_num = sync_num
22+
self.standbys = standbys
23+
24+
def __str__(self):
25+
return u"{} ({})".format(self.sync_num, u", ".join(
26+
u"\"{}\"".format(r.name) for r in self.standbys))
27+
28+
29+
@six.python_2_unicode_compatible
30+
class Any:
31+
"""
32+
Specifies a quorum-based synchronous replication and makes transaction
33+
commits wait until their WAL records are replicated to at least ``num_sync``
34+
listed standbys. Only available for Postgres 10 and newer.
35+
36+
Args:
37+
sync_num (int): the number of standbys that transaction need to wait
38+
for replies from
39+
standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby
40+
nodes
41+
"""
42+
43+
def __init__(self, sync_num, standbys):
44+
self.sync_num = sync_num
45+
self.standbys = standbys
46+
47+
def __str__(self):
48+
return u"ANY {} ({})".format(self.sync_num, u", ".join(
49+
u"\"{}\"".format(r.name) for r in self.standbys))

tests/test_simple.py

+44
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@
4141
get_pg_config, \
4242
get_pg_version
4343

44+
from testgres import \
45+
First, \
46+
Any
47+
4448
# NOTE: those are ugly imports
4549
from testgres import bound_ports
4650
from testgres.utils import PgVer
@@ -410,6 +414,46 @@ def test_replicate(self):
410414
res = node.execute('select * from test')
411415
self.assertListEqual(res, [])
412416

417+
def test_synchronous_replication(self):
418+
with get_new_node() as master:
419+
old_version = not pg_version_ge('9.6')
420+
421+
master.init(allow_streaming=True).start()
422+
423+
if not old_version:
424+
master.append_conf('synchronous_commit = remote_apply')
425+
426+
# create standby
427+
with master.replicate() as standby1, master.replicate() as standby2:
428+
standby1.start()
429+
standby2.start()
430+
431+
# check formatting
432+
self.assertEqual(
433+
'1 ("{}", "{}")'.format(standby1.name, standby2.name),
434+
str(First(1, (standby1, standby2)))) # yapf: disable
435+
self.assertEqual(
436+
'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name),
437+
str(Any(1, (standby1, standby2)))) # yapf: disable
438+
439+
# set synchronous_standby_names
440+
master.set_synchronous_standbys([standby1, standby2])
441+
master.restart()
442+
443+
# the following part of the test is only applicable to newer
444+
# versions of PostgresQL
445+
if not old_version:
446+
master.safe_psql('create table abc(a int)')
447+
448+
# Create a large transaction that will take some time to apply
449+
# on standby to check that it applies synchronously
450+
# (If set synchronous_commit to 'on' or other lower level then
451+
# standby most likely won't catchup so fast and test will fail)
452+
master.safe_psql(
453+
'insert into abc select generate_series(1, 1000000)')
454+
res = standby1.safe_psql('select count(*) from abc')
455+
self.assertEqual(res, b'1000000\n')
456+
413457
@unittest.skipUnless(pg_version_ge('10'), 'requires 10+')
414458
def test_logical_replication(self):
415459
with get_new_node() as node1, get_new_node() as node2:

0 commit comments

Comments
 (0)