From ca5b5465a07e45f91f6258bfcc73a4dd06ae9e8c Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Fri, 16 Mar 2018 18:47:25 +0300 Subject: [PATCH 01/12] Logical replication --- testgres/node.py | 58 +++++++++++++++- testgres/pubsub.py | 161 +++++++++++++++++++++++++++++++++++++++++++ tests/test_simple.py | 66 ++++++++++++++++++ 3 files changed, 282 insertions(+), 3 deletions(-) create mode 100644 testgres/pubsub.py diff --git a/testgres/node.py b/testgres/node.py index 02cc0f52..c187e469 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -48,10 +48,13 @@ ExecUtilException, \ QueryException, \ StartNodeException, \ - TimeoutException + TimeoutException, \ + InitNodeException from .logger import TestgresLogger +from .pubsub import Publication, Subscription + from .utils import \ eprint, \ get_bin_path, \ @@ -278,6 +281,7 @@ def default_conf(self, fsync=False, unix_sockets=True, allow_streaming=True, + allow_logical=False, log_statement='all'): """ Apply default settings to this node. @@ -286,6 +290,7 @@ def default_conf(self, fsync: should this node use fsync to keep data safe? unix_sockets: should we enable UNIX sockets? allow_streaming: should this node add a hba entry for replication? + allow_logical: can this node be used as a logical replication publisher? log_statement: one of ('all', 'off', 'mod', 'ddl'). Returns: @@ -365,6 +370,12 @@ def get_auth_method(t): wal_keep_segments, wal_level)) + if allow_logical: + if not pg_version_ge('10'): + raise InitNodeException("Logical replication is only " + "available for Postgres 10 and newer") + conf.write(u"wal_level = logical\n") + # disable UNIX sockets if asked to if not unix_sockets: conf.write(u"unix_socket_directories = ''\n") @@ -751,7 +762,8 @@ def poll_query_until(self, expected=True, commit=True, raise_programming_error=True, - raise_internal_error=True): + raise_internal_error=True, + zero_rows_is_ok=False): """ Run a query once per second until it returns 'expected'. 
Query should return a single value (1 row, 1 column). @@ -788,7 +800,12 @@ def poll_query_until(self, raise QueryException('Query returned None', query) if len(res) == 0: - raise QueryException('Query returned 0 rows', query) + if zero_rows_is_ok: + time.sleep(sleep_time) + attempts += 1 + continue + else: + raise QueryException('Query returned 0 rows', query) if len(res[0]) == 0: raise QueryException('Query returned 0 columns', query) @@ -902,6 +919,41 @@ def catchup(self, dbname=None, username=None): except Exception as e: raise_from(CatchUpException("Failed to catch up", poll_lsn), e) + def publish(self, + pubname, + tables=None, + dbname=None, + username=None): + """ + Create publication for logical replication + + Args: + pubname: publication name + tables: tables names list + dbname: database name where objects or interest are located + username: replication username + """ + return Publication(pubname, self, tables, dbname, username) + + def subscribe(self, + publication, + subname, + dbname=None, + username=None, + **kwargs): + """ + Create subscription for logical replication + + Args: + subname: subscription name + publication: publication object obtained from publish() + + """ + return Subscription(subname, self, publication, + dbname=dbname, + username=username, + **kwargs) + def pgbench(self, dbname=None, username=None, diff --git a/testgres/pubsub.py b/testgres/pubsub.py new file mode 100644 index 00000000..35d8f67a --- /dev/null +++ b/testgres/pubsub.py @@ -0,0 +1,161 @@ +# coding: utf-8 + +from six import raise_from + +from .defaults import default_dbname, default_username +from .exceptions import CatchUpException +from .utils import pg_version_ge + + +class Publication(object): + def __init__(self, pubname, node, tables=None, dbname=None, username=None): + """ + Constructor + + Args: + pubname: publication name + node: publisher's node + tables: tables list or None for all tables + dbname: database name used to connect and perform subscription + 
username: username used to connect to the database + """ + self.name = pubname + self.node = node + self.dbname = dbname or default_dbname() + self.username = username or default_username() + + # create publication in database + t = 'table ' + ', '.join(tables) if tables else 'all tables' + query = "create publication {} for {}" + node.safe_psql(query.format(pubname, t), + dbname=dbname, + username=username) + + def close(self, dbname=None, username=None): + """ + Drop publication + """ + self.node.safe_psql("drop publication {}".format(self.name), + dbname=dbname, username=username) + + def add_tables(self, tables, dbname=None, username=None): + """ + Add tables + + Args: + tables: a list of tables to add to the publication + """ + if not tables: + raise ValueError("Tables list is empty") + + query = "alter publication {} add table {}" + self.node.safe_psql(query.format(self.name, ', '.join(tables)), + dbname=dbname or self.dbname, + username=username or self.username) + + +class Subscription(object): + def __init__(self, + subname, + node, + publication, + dbname=None, + username=None, + **kwargs): + """ + Constructor + + Args: + subname: subscription name + node: subscriber's node + publication: Publication object we are subscribing to + dbname: database name used to connect and perform subscription + username: username used to connect to the database + **kwargs: subscription parameters (see CREATE SUBSCRIPTION + in PostgreSQL documentation for more information) + """ + self.name = subname + self.node = node + self.pub = publication + + # connection info + conninfo = ( + u"dbname={} user={} host={} port={}" + ).format(self.pub.dbname, + self.pub.username, + self.pub.node.host, + self.pub.node.port) + + query = ( + "create subscription {} connection '{}' publication {}" + ).format(subname, conninfo, self.pub.name) + + # additional parameters + if kwargs: + params = ','.join('{}={}'.format(k, v) for k, v in kwargs.iteritems()) + query += " with 
({})".format(params) + + node.safe_psql(query, dbname=dbname, username=username) + + def disable(self, dbname=None, username=None): + """ + Disables the running subscription. + """ + query = "alter subscription {} disable" + self.node.safe_psql(query.format(self.name), + dbname=None, + username=None) + + def enable(self, dbname=None, username=None): + """ + Enables the previously disabled subscription. + """ + query = "alter subscription {} enable" + self.node.safe_psql(query.format(self.name), + dbname=None, + username=None) + + def refresh(self, copy_data=True, dbname=None, username=None): + """ + Disables the running subscription. + """ + query = "alter subscription {} refresh publication with (copy_data={})" + self.node.safe_psql(query.format(self.name, copy_data), + dbname=dbname, + username=username) + + def close(self, dbname=None, username=None): + """ + Drops subscription + """ + self.node.safe_psql("drop subscription {}".format(self.name), + dbname=dbname, username=username) + + def catchup(self, username=None): + """ + Wait until subscription catches up with publication. 
+ + Args: + username: remote node's user name + """ + if pg_version_ge('10'): + query = ( + "select pg_current_wal_lsn() - replay_lsn = 0 " + "from pg_stat_replication where application_name = '{}'" + ).format(self.name) + else: + query = ( + "select pg_current_xlog_location() - replay_location = 0 " + "from pg_stat_replication where application_name = '{}'" + ).format(self.name) + + try: + # wait until this LSN reaches subscriber + self.pub.node.poll_query_until( + query=query, + dbname=self.pub.dbname, + username=username or self.pub.username, + max_attempts=60, + zero_rows_is_ok=True) # statistics may have not updated yet + except Exception as e: + raise_from(CatchUpException("Failed to catch up", query), e) diff --git a/tests/test_simple.py b/tests/test_simple.py index ab2a5804..37b10e6d 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -382,6 +382,72 @@ def test_replicate(self): res = node.execute('select * from test') self.assertListEqual(res, []) + def test_logical_replication(self): + with get_new_node() as node1, get_new_node() as node2: + node1.init(allow_logical=True) + node1.start() + node2.init().start() + + create_table = 'create table test (a int, b int)' + node1.safe_psql(create_table) + node2.safe_psql(create_table) + + # create publication / create subscription + pub = node1.publish('mypub') + sub = node2.subscribe(pub, 'mysub') + + node1.safe_psql('insert into test values (1, 1), (2, 2)') + + # wait until changes apply on subscriber and check them + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, [(1, 1), (2, 2)]) + + # disable and put some new data + sub.disable() + node1.safe_psql('insert into test values (3, 3)') + + # enable and ensure that data successfully transfered + sub.enable() + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, [(1, 1), (2, 2), (3, 3)]) + + # Add new tables. 
Since we added "all tables" to publication + # (default behaviour of publish() method) we don't need + # to explicitely perform pub.add_table() + create_table = 'create table test2 (c char)' + node1.safe_psql(create_table) + node2.safe_psql(create_table) + sub.refresh() + + # put new data + node1.safe_psql('insert into test2 values (\'a\'), (\'b\')') + sub.catchup() + res = node2.execute('select * from test2') + self.assertListEqual(res, [('a',), ('b',)]) + + # drop subscription + sub.close() + pub.close() + + # create new publication and subscription for specific table + # (ommitting copying data as it's already done) + pub = node1.publish('newpub', tables=['test']) + sub = node2.subscribe(pub, 'newsub', copy_data=False) + + node1.safe_psql('insert into test values (4, 4)') + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, [(1, 1), (2, 2), (3, 3), (4, 4)]) + + # explicitely add table + pub.add_tables(['test2']) + node1.safe_psql('insert into test2 values (\'c\')') + sub.catchup() + res = node2.execute('select * from test2') + self.assertListEqual(res, [('a',), ('b',)]) + def test_incorrect_catchup(self): with get_new_node() as node: node.init(allow_streaming=True).start() From bb01c7d787f960691539c44ceee73f9d326f2b40 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Fri, 16 Mar 2018 19:05:38 +0300 Subject: [PATCH 02/12] Skip logical replication test on PostgreSQL versions below 10 --- tests/test_simple.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_simple.py b/tests/test_simple.py index 37b10e6d..3489fcf5 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -382,6 +382,7 @@ def test_replicate(self): res = node.execute('select * from test') self.assertListEqual(res, []) + @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') def test_logical_replication(self): with get_new_node() as node1, get_new_node() as node2: node1.init(allow_logical=True) From 782484b971ed5ab94e03dd0ba4bc3224a0636d5d Mon Sep 
17 00:00:00 2001 From: Ildar Musin Date: Fri, 16 Mar 2018 20:28:24 +0300 Subject: [PATCH 03/12] Fix logical replication for python3 --- testgres/pubsub.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testgres/pubsub.py b/testgres/pubsub.py index 35d8f67a..dde2e694 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -1,6 +1,6 @@ # coding: utf-8 -from six import raise_from +from six import raise_from, iteritems from .defaults import default_dbname, default_username from .exceptions import CatchUpException @@ -92,7 +92,7 @@ def __init__(self, # additional parameters if kwargs: - params = ','.join('{}={}'.format(k, v) for k, v in kwargs.iteritems()) + params = ','.join('{}={}'.format(k, v) for k, v in iteritems(kwargs)) query += " with ({})".format(params) node.safe_psql(query, dbname=dbname, username=username) From b1cba73cf385896e7307998f230ec9b4a43f8aa4 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 22 Mar 2018 15:25:53 +0300 Subject: [PATCH 04/12] Some minor refactoring of logical replication --- testgres/node.py | 23 +++++++++-------------- testgres/pubsub.py | 17 +++++------------ tests/test_simple.py | 9 ++++----- 3 files changed, 18 insertions(+), 31 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index c53393c2..4288e863 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -847,8 +847,7 @@ def poll_query_until(self, expected=True, commit=True, raise_programming_error=True, - raise_internal_error=True, - zero_rows_is_ok=False): + raise_internal_error=True): """ Run a query once per second until it returns 'expected'. Query should return a single value (1 row, 1 column). 
@@ -884,18 +883,14 @@ def poll_query_until(self, if res is None: raise QueryException('Query returned None', query) - if len(res) == 0: - if zero_rows_is_ok: - time.sleep(sleep_time) - attempts += 1 - continue - else: - raise QueryException('Query returned 0 rows', query) - - if len(res[0]) == 0: - raise QueryException('Query returned 0 columns', query) - - if res[0][0] == expected: + # result set is not empty + if len(res): + if len(res[0]) == 0: + raise QueryException('Query returned 0 columns', query) + if res[0][0] == expected: + return # done + # empty result set is considered as None + elif expected is None: return # done except ProgrammingError as e: diff --git a/testgres/pubsub.py b/testgres/pubsub.py index dde2e694..3717e495 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -138,16 +138,10 @@ def catchup(self, username=None): Args: username: remote node's user name """ - if pg_version_ge('10'): - query = ( - "select pg_current_wal_lsn() - replay_lsn = 0 " - "from pg_stat_replication where application_name = '{}'" - ).format(self.name) - else: - query = ( - "select pg_current_xlog_location() - replay_location = 0 " - "from pg_stat_replication where application_name = '{}'" - ).format(self.name) + query = ( + "select pg_current_wal_lsn() - replay_lsn = 0 " + "from pg_stat_replication where application_name = '{}'" + ).format(self.name) try: # wait until this LSN reaches subscriber @@ -155,7 +149,6 @@ def catchup(self, username=None): query=query, dbname=self.pub.dbname, username=username or self.pub.username, - max_attempts=60, - zero_rows_is_ok=True) # statistics may have not updated yet + max_attempts=60) except Exception as e: raise_from(CatchUpException("Failed to catch up", query), e) diff --git a/tests/test_simple.py b/tests/test_simple.py index 62c8668b..1b962b1b 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -495,11 +495,6 @@ def test_poll_query_until(self): self.assertTrue(end_time - start_time >= 5) - # check 0 rows - 
with self.assertRaises(QueryException): - node.poll_query_until( - query='select * from pg_class where true = false') - # check 0 columns with self.assertRaises(QueryException): node.poll_query_until(query='select from pg_class limit 1') @@ -512,6 +507,10 @@ def test_poll_query_until(self): node.poll_query_until( query='create table def()', expected=None) # returns nothing + # check 0 rows equivalent to expected=None + node.poll_query_until( + query='select * from pg_class where true = false', expected=None) + # check arbitrary expected value, fail with self.assertRaises(TimeoutException): node.poll_query_until( From 954879aaf155d54bf50e15ef5bf3ef9d6d23e539 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 22 Mar 2018 16:19:09 +0300 Subject: [PATCH 05/12] Added options_string() func --- testgres/node.py | 19 ++++++++++--------- testgres/pubsub.py | 21 ++++++++++----------- testgres/utils.py | 5 +++++ 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 4288e863..d838bf79 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -63,7 +63,8 @@ pg_version_ge, \ reserve_port, \ release_port, \ - execute_utility + execute_utility, \ + options_string from .backup import NodeBackup @@ -288,25 +289,25 @@ def _create_recovery_conf(self, username): assert master is not None # yapf: disable - conninfo = ( - u"application_name={} " - u"port={} " - u"user={} " - ).format(self.name, master.port, username) + conninfo = { + "application_name": self.name, + "port": master.port, + "user": username + } # host is tricky try: import ipaddress ipaddress.ip_address(master.host) - conninfo += u"hostaddr={}".format(master.host) + conninfo["hostaddr"] = master.host except ValueError: - conninfo += u"host={}".format(master.host) + conninfo["host"] = master.host # yapf: disable line = ( "primary_conninfo='{}'\n" "standby_mode=on\n" - ).format(conninfo) + ).format(options_string(**conninfo)) self.append_conf(RECOVERY_CONF_FILE, line) diff 
--git a/testgres/pubsub.py b/testgres/pubsub.py index 3717e495..caef3727 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -1,10 +1,10 @@ # coding: utf-8 -from six import raise_from, iteritems +from six import raise_from from .defaults import default_dbname, default_username from .exceptions import CatchUpException -from .utils import pg_version_ge +from .utils import options_string class Publication(object): @@ -79,21 +79,20 @@ def __init__(self, self.pub = publication # connection info - conninfo = ( - u"dbname={} user={} host={} port={}" - ).format(self.pub.dbname, - self.pub.username, - self.pub.node.host, - self.pub.node.port) + conninfo = { + "dbname": self.pub.dbname, + "user": self.pub.username, + "host": self.pub.node.host, + "port": self.pub.node.port + } query = ( "create subscription {} connection '{}' publication {}" - ).format(subname, conninfo, self.pub.name) + ).format(subname, options_string(**conninfo), self.pub.name) # additional parameters if kwargs: - params = ','.join('{}={}'.format(k, v) for k, v in iteritems(kwargs)) - query += " with ({})".format(params) + query += " with ({})".format(options_string(**kwargs)) node.safe_psql(query, dbname=dbname, username=username) diff --git a/testgres/utils.py b/testgres/utils.py index a4108b23..2e4b5548 100644 --- a/testgres/utils.py +++ b/testgres/utils.py @@ -10,6 +10,7 @@ import sys from distutils.version import LooseVersion +from six import iteritems from .config import testgres_config from .exceptions import ExecUtilException @@ -218,3 +219,7 @@ def file_tail(f, num_lines): def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) + + +def options_string(separator=u" ", **kwargs): + return separator.join(u"{}={}".format(k, v) for k, v in iteritems(kwargs)) From f4e0bd0e1c8e0f84909ceb5d5c0de0d5938ba77a Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 22 Mar 2018 17:45:21 +0300 Subject: [PATCH 06/12] Minor refactoring --- testgres/node.py | 13 ++++++------- testgres/pubsub.py 
| 20 ++++++++++---------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 3556ede4..71e360af 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -1032,7 +1032,7 @@ def catchup(self, dbname=None, username=None): raise_from(CatchUpException("Failed to catch up", poll_lsn), e) def publish(self, - pubname, + name, tables=None, dbname=None, username=None): @@ -1045,11 +1045,12 @@ def publish(self, dbname: database name where objects or interest are located username: replication username """ - return Publication(pubname, self, tables, dbname, username) + return Publication(name=name, node=self, tables=tables, dbname=dbname, + username=username) def subscribe(self, publication, - subname, + name, dbname=None, username=None, **kwargs): @@ -1061,10 +1062,8 @@ def subscribe(self, publication: publication object obtained from publish() """ - return Subscription(subname, self, publication, - dbname=dbname, - username=username, - **kwargs) + return Subscription(name=name, node=self, publication=publication, + dbname=dbname, username=username, **kwargs) def pgbench(self, dbname=None, diff --git a/testgres/pubsub.py b/testgres/pubsub.py index 4b3db9cb..4cc77322 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -8,26 +8,26 @@ class Publication(object): - def __init__(self, pubname, node, tables=None, dbname=None, username=None): + def __init__(self, name, node, tables=None, dbname=None, username=None): """ Constructor Args: - pubname: publication name + name: publication name node: publisher's node tables: tables list or None for all tables dbname: database name used to connect and perform subscription username: username used to connect to the database """ - self.name = pubname + self.name = name self.node = node self.dbname = dbname or default_dbname() self.username = username or default_username() # create publication in database - t = 'table ' + ', '.join(tables) if tables else 'all tables' + t = "table " + 
", ".join(tables) if tables else "all tables" query = "create publication {} for {}" - node.safe_psql(query.format(pubname, t), + node.safe_psql(query.format(name, t), dbname=dbname, username=username) @@ -49,14 +49,14 @@ def add_tables(self, tables, dbname=None, username=None): raise ValueError("Tables list is empty") query = "alter publication {} add table {}" - self.node.safe_psql(query.format(self.name, ', '.join(tables)), + self.node.safe_psql(query.format(self.name, ", ".join(tables)), dbname=dbname or self.dbname, username=username or self.username) class Subscription(object): def __init__(self, - subname, + name, node, publication, dbname=None, @@ -66,7 +66,7 @@ def __init__(self, Constructor Args: - subname: subscription name + name: subscription name node: subscriber's node publication: Publication object we are subscribing to dbname: database name used to connect and perform subscription @@ -74,7 +74,7 @@ def __init__(self, **kwargs: subscription parameters (see CREATE SUBSCRIPTION in PostgreSQL documentation for more information) """ - self.name = subname + self.name = name self.node = node self.pub = publication @@ -88,7 +88,7 @@ def __init__(self, query = ( "create subscription {} connection '{}' publication {}" - ).format(subname, options_string(**conninfo), self.pub.name) + ).format(name, options_string(**conninfo), self.pub.name) # additional parameters if kwargs: From f48623be51dc3c22ab8cca09f764ff5b9a0587c4 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 22 Mar 2018 18:17:05 +0300 Subject: [PATCH 07/12] Add failing logical replication test for 9.6 --- tests/test_simple.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_simple.py b/tests/test_simple.py index e8ac15e7..2de51df3 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -456,6 +456,12 @@ def test_logical_replication(self): res = node2.execute('select * from test2') self.assertListEqual(res, [('a',), ('b',)]) + @unittest.skipIf(pg_version_ge('10'), 
'requires <10') + def test_logical_replication_fail(self): + with get_new_node() as node: + with self.assertRaises(InitNodeException): + node.init(allow_logical=True) + def test_replication_slots(self): with get_new_node() as node: node.init(allow_streaming=True).start() From 138c6ccffccfe5bdf4381a24583463e2416c38c2 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 22 Mar 2018 18:44:27 +0300 Subject: [PATCH 08/12] Added test for Publication.add_tables() --- testgres/pubsub.py | 46 ++++++++++++++++++++++---------------------- tests/test_simple.py | 4 +++- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/testgres/pubsub.py b/testgres/pubsub.py index 4cc77322..f0bedcb8 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -27,16 +27,16 @@ def __init__(self, name, node, tables=None, dbname=None, username=None): # create publication in database t = "table " + ", ".join(tables) if tables else "all tables" query = "create publication {} for {}" - node.safe_psql(query.format(name, t), - dbname=dbname, - username=username) + node.safe_psql(query.format(name, t), dbname=dbname, username=username) def drop(self, dbname=None, username=None): """ Drop publication """ - self.node.safe_psql("drop publication {}".format(self.name), - dbname=dbname, username=username) + self.node.safe_psql( + "drop publication {}".format(self.name), + dbname=dbname, + username=username) def add_tables(self, tables, dbname=None, username=None): """ @@ -49,9 +49,10 @@ def add_tables(self, tables, dbname=None, username=None): raise ValueError("Tables list is empty") query = "alter publication {} add table {}" - self.node.safe_psql(query.format(self.name, ", ".join(tables)), - dbname=dbname or self.dbname, - username=username or self.username) + self.node.safe_psql( + query.format(self.name, ", ".join(tables)), + dbname=dbname or self.dbname, + username=username or self.username) class Subscription(object): @@ -87,8 +88,8 @@ def __init__(self, } query = ( - "create subscription 
{} connection '{}' publication {}" - ).format(name, options_string(**conninfo), self.pub.name) + "create subscription {} connection '{}' publication {}").format( + name, options_string(**conninfo), self.pub.name) # additional parameters if kwargs: @@ -101,34 +102,33 @@ def disable(self, dbname=None, username=None): Disables the running subscription. """ query = "alter subscription {} disable" - self.node.safe_psql(query.format(self.name), - dbname=None, - username=None) + self.node.safe_psql(query.format(self.name), dbname=None, username=None) def enable(self, dbname=None, username=None): """ Enables the previously disabled subscription. """ query = "alter subscription {} enable" - self.node.safe_psql(query.format(self.name), - dbname=None, - username=None) + self.node.safe_psql(query.format(self.name), dbname=None, username=None) def refresh(self, copy_data=True, dbname=None, username=None): """ Disables the running subscription. """ query = "alter subscription {} refresh publication with (copy_data={})" - self.node.safe_psql(query.format(self.name, copy_data), - dbname=dbname, - username=username) + self.node.safe_psql( + query.format(self.name, copy_data), + dbname=dbname, + username=username) def drop(self, dbname=None, username=None): """ Drops subscription """ - self.node.safe_psql("drop subscription {}".format(self.name), - dbname=dbname, username=username) + self.node.safe_psql( + "drop subscription {}".format(self.name), + dbname=dbname, + username=username) def catchup(self, username=None): """ @@ -139,8 +139,8 @@ def catchup(self, username=None): """ query = ( "select pg_current_wal_lsn() - replay_lsn = 0 " - "from pg_stat_replication where application_name = '{}'" - ).format(self.name) + "from pg_stat_replication where application_name = '{}'").format( + self.name) try: # wait until this LSN reaches subscriber diff --git a/tests/test_simple.py b/tests/test_simple.py index 2de51df3..b5735a6f 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ 
-423,7 +423,7 @@ def test_logical_replication(self): # Add new tables. Since we added "all tables" to publication # (default behaviour of publish() method) we don't need - # to explicitely perform pub.add_table() + # to explicitely perform pub.add_tables() create_table = 'create table test2 (c char)' node1.safe_psql(create_table) node2.safe_psql(create_table) @@ -450,6 +450,8 @@ def test_logical_replication(self): self.assertListEqual(res, [(1, 1), (2, 2), (3, 3), (4, 4)]) # explicitely add table + with self.assertRaises(ValueError): + pub.add_tables([]) # fail pub.add_tables(['test2']) node1.safe_psql('insert into test2 values (\'c\')') sub.catchup() From bc1002f7ed82610900dcbdcd8404e8a6d842f01a Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Tue, 27 Mar 2018 15:52:13 +0300 Subject: [PATCH 09/12] Additional subscription catchup test --- tests/test_simple.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/test_simple.py b/tests/test_simple.py index 3ec69b32..bf80b888 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -457,6 +457,29 @@ def test_logical_replication(self): res = node2.execute('select * from test2') self.assertListEqual(res, [('a',), ('b',)]) + @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') + def test_logical_catchup(self): + """ Runs catchup for 100 times to be sure that it is consistent """ + with get_new_node() as node1, get_new_node() as node2: + node1.init(allow_logical=True) + node1.start() + node2.init().start() + + create_table = 'create table test (key int primary key, val int); ' + node1.safe_psql(create_table) + node1.safe_psql('alter table test replica identity default') + node2.safe_psql(create_table) + + # create publication / create subscription + sub = node2.subscribe(node1.publish('mypub'), 'mysub') + + for i in range(0, 100): + node1.execute('insert into test values ({0}, {0})'.format(i)) + sub.catchup() + res = node2.execute('select * from test') + self.assertListEqual(res, 
[(i, i,)]) + node1.execute('delete from test') + @unittest.skipIf(pg_version_ge('10'), 'requires <10') def test_logical_replication_fail(self): with get_new_node() as node: From 08ed6ef9719737daa7a797d741afbfe3c172d477 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Tue, 27 Mar 2018 16:34:08 +0300 Subject: [PATCH 10/12] Some refactoring of logical replication API and formatting --- testgres/node.py | 34 ++++++++++++++++------------------ testgres/pubsub.py | 11 ++++++----- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 8994f605..d50b4c06 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -501,8 +501,9 @@ def get_auth_method(t): if allow_logical: if not pg_version_ge('10'): - raise InitNodeException("Logical replication is only " - "available for Postgres 10 and newer") + raise InitNodeException( + "Logical replication is only available for Postgres 10 " + "and newer") conf.write(u"wal_level = logical\n") # disable UNIX sockets if asked to @@ -1022,11 +1023,7 @@ def catchup(self, dbname=None, username=None): except Exception as e: raise_from(CatchUpException("Failed to catch up", poll_lsn), e) - def publish(self, - name, - tables=None, - dbname=None, - username=None): + def publish(self, name, **kwargs): """ Create publication for logical replication @@ -1036,25 +1033,26 @@ def publish(self, dbname: database name where objects or interest are located username: replication username """ - return Publication(name=name, node=self, tables=tables, dbname=dbname, - username=username) + return Publication(name=name, node=self, **kwargs) - def subscribe(self, - publication, - name, - dbname=None, - username=None, - **kwargs): + def subscribe(self, publication, name, dbname=None, username=None, + **params): """ Create subscription for logical replication Args: - subname: subscription name + name: subscription name publication: publication object obtained from publish() - + dbname: database name + username: 
replication username + params: subscription parameters (see documentation on `CREATE SUBSCRIPTION + `_ + for details) """ + # yapf: disable return Subscription(name=name, node=self, publication=publication, - dbname=dbname, username=username, **kwargs) + dbname=dbname, username=username, **params) + # yapf: enable def pgbench(self, dbname=None, diff --git a/testgres/pubsub.py b/testgres/pubsub.py index e19fc4a1..b22b0068 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -106,7 +106,7 @@ def __init__(self, publication, dbname=None, username=None, - **kwargs): + **params): """ Constructor. Use :meth:`.PostgresNode.subscribe()` instead of direct constructing subscription objects. @@ -118,8 +118,9 @@ def __init__(self, (see :meth:`.PostgresNode.publish()`) dbname: database name used to connect and perform subscription username: username used to connect to the database - **kwargs: subscription parameters (see ``CREATE SUBSCRIPTION`` - in PostgreSQL documentation for more information) + params: subscription parameters (see documentation on `CREATE SUBSCRIPTION + `_ + for details) """ self.name = name self.node = node @@ -138,8 +139,8 @@ def __init__(self, name, options_string(**conninfo), self.pub.name) # additional parameters - if kwargs: - query += " with ({})".format(options_string(**kwargs)) + if params: + query += " with ({})".format(options_string(**params)) node.safe_psql(query, dbname=dbname, username=username) From 4b279efd54b23838b9db3b89bb433b6378c069e9 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Thu, 31 May 2018 16:49:57 +0300 Subject: [PATCH 11/12] minor refactoring --- testgres/connection.py | 8 ++++++- testgres/consts.py | 3 +++ testgres/node.py | 18 +++++++++----- testgres/pubsub.py | 54 ++++++++++++++++++++++-------------------- tests/test_simple.py | 17 ++++++++----- 5 files changed, 61 insertions(+), 39 deletions(-) diff --git a/testgres/connection.py b/testgres/connection.py index 6447f685..3943a4e2 100644 --- a/testgres/connection.py 
+++ b/testgres/connection.py @@ -27,7 +27,12 @@ class NodeConnection(object): Transaction wrapper returned by Node """ - def __init__(self, node, dbname=None, username=None, password=None): + def __init__(self, + node, + dbname=None, + username=None, + password=None, + autocommit=False): # Set default arguments dbname = dbname or default_dbname() @@ -42,6 +47,7 @@ def __init__(self, node, dbname=None, username=None, password=None): host=node.host, port=node.port) + self._connection.autocommit = autocommit self._cursor = self.connection.cursor() @property diff --git a/testgres/consts.py b/testgres/consts.py index 123a034c..f7f01d9d 100644 --- a/testgres/consts.py +++ b/testgres/consts.py @@ -29,3 +29,6 @@ MAX_REPLICATION_SLOTS = 10 MAX_WAL_SENDERS = 10 WAL_KEEP_SEGMENTS = 20 + +# logical replication settings +LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS = 60 diff --git a/testgres/node.py b/testgres/node.py index d50b4c06..dcae9adb 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -953,13 +953,11 @@ def execute(self, with self.connect(dbname=dbname, username=username, - password=password) as node_con: # yapf: disable + password=password, + autocommit=commit) as node_con: # yapf: disable res = node_con.execute(query) - if commit: - node_con.commit() - return res def backup(self, **kwargs): @@ -1152,7 +1150,11 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs): return execute_utility(_params, self.utils_log_file) - def connect(self, dbname=None, username=None, password=None): + def connect(self, + dbname=None, + username=None, + password=None, + autocommit=False): """ Connect to a database. @@ -1160,6 +1162,9 @@ def connect(self, dbname=None, username=None, password=None): dbname: database name to connect to. username: database user name. password: user's password. + autocommit: commit each statement automatically. Also it should be + set to `True` for statements requiring to be run outside + a transaction, such as `VACUUM` or `CREATE DATABASE`.
Returns: An instance of :class:`.NodeConnection`. @@ -1168,4 +1173,5 @@ def connect(self, dbname=None, username=None, password=None): return NodeConnection(node=self, dbname=dbname, username=username, - password=password) # yapf: disable + password=password, + autocommit=autocommit) # yapf: disable diff --git a/testgres/pubsub.py b/testgres/pubsub.py index b22b0068..e257569c 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -12,7 +12,7 @@ After that :meth:`~.PostgresNode.publish()` and :meth:`~.PostgresNode.subscribe()` methods may be used to setup replication. Example: ->>> from .api import get_new_node +>>> from testgres import get_new_node >>> with get_new_node() as nodeA, get_new_node() as nodeB: ... nodeA.init(allow_logical=True).start() ... nodeB.init().start() @@ -44,6 +44,7 @@ from six import raise_from +from .consts import LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS from .defaults import default_dbname, default_username from .exceptions import CatchUpException from .utils import options_string @@ -56,11 +57,11 @@ def __init__(self, name, node, tables=None, dbname=None, username=None): constructing publication objects. Args: - name: publication name - node: publisher's node - tables: tables list or None for all tables - dbname: database name used to connect and perform subscription - username: username used to connect to the database + name: publication name. + node: publisher's node. + tables: tables list or None for all tables. + dbname: database name used to connect and perform subscription. + username: username used to connect to the database. 
""" self.name = name self.node = node @@ -70,7 +71,7 @@ def __init__(self, name, node, tables=None, dbname=None, username=None): # create publication in database t = "table " + ", ".join(tables) if tables else "all tables" query = "create publication {} for {}" - node.safe_psql(query.format(name, t), dbname=dbname, username=username) + node.execute(query.format(name, t), dbname=dbname, username=username) def drop(self, dbname=None, username=None): """ @@ -87,13 +88,13 @@ def add_tables(self, tables, dbname=None, username=None): created with empty tables list. Args: - tables: a list of tables to be added to the publication + tables: a list of tables to be added to the publication. """ if not tables: raise ValueError("Tables list is empty") query = "alter publication {} add table {}" - self.node.safe_psql( + self.node.execute( query.format(self.name, ", ".join(tables)), dbname=dbname or self.dbname, username=username or self.username) @@ -112,15 +113,15 @@ def __init__(self, constructing subscription objects. Args: - name: subscription name - node: subscriber's node + name: subscription name. + node: subscriber's node. publication: :class:`.Publication` object we are subscribing to - (see :meth:`.PostgresNode.publish()`) - dbname: database name used to connect and perform subscription - username: username used to connect to the database + (see :meth:`.PostgresNode.publish()`). + dbname: database name used to connect and perform subscription. + username: username used to connect to the database. params: subscription parameters (see documentation on `CREATE SUBSCRIPTION `_ - for details) + for details). 
""" self.name = name self.node = node @@ -142,28 +143,29 @@ def __init__(self, if params: query += " with ({})".format(options_string(**params)) - node.safe_psql(query, dbname=dbname, username=username) + # Note: cannot run 'create subscription' query in transaction mode + node.execute(query, dbname=dbname, username=username) def disable(self, dbname=None, username=None): """ Disables the running subscription. """ query = "alter subscription {} disable" - self.node.safe_psql(query.format(self.name), dbname=None, username=None) + self.node.execute(query.format(self.name), dbname=None, username=None) def enable(self, dbname=None, username=None): """ Enables the previously disabled subscription. """ query = "alter subscription {} enable" - self.node.safe_psql(query.format(self.name), dbname=None, username=None) + self.node.execute(query.format(self.name), dbname=None, username=None) def refresh(self, copy_data=True, dbname=None, username=None): """ Disables the running subscription. """ query = "alter subscription {} refresh publication with (copy_data={})" - self.node.safe_psql( + self.node.execute( query.format(self.name, copy_data), dbname=dbname, username=username) @@ -172,7 +174,7 @@ def drop(self, dbname=None, username=None): """ Drops subscription """ - self.node.safe_psql( + self.node.execute( "drop subscription {}".format(self.name), dbname=dbname, username=username) @@ -182,12 +184,12 @@ def catchup(self, username=None): Wait until subscription catches up with publication. Args: - username: remote node's user name + username: remote node's user name. 
""" - query = ( - "select pg_current_wal_lsn() - replay_lsn = 0 " - "from pg_stat_replication where application_name = '{}'").format( - self.name) + query = """ + select pg_current_wal_lsn() - replay_lsn = 0 + from pg_catalog.pg_stat_replication where application_name = '{}' + """.format(self.name) try: # wait until this LSN reaches subscriber @@ -195,6 +197,6 @@ def catchup(self, username=None): query=query, dbname=self.pub.dbname, username=username or self.pub.username, - max_attempts=60) + max_attempts=LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS) except Exception as e: raise_from(CatchUpException("Failed to catch up", query), e) diff --git a/tests/test_simple.py b/tests/test_simple.py index bf80b888..c1173267 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -432,7 +432,7 @@ def test_logical_replication(self): node1.safe_psql('insert into test2 values (\'a\'), (\'b\')') sub.catchup() res = node2.execute('select * from test2') - self.assertListEqual(res, [('a',), ('b',)]) + self.assertListEqual(res, [('a', ), ('b', )]) # drop subscription sub.drop() @@ -450,12 +450,12 @@ def test_logical_replication(self): # explicitely add table with self.assertRaises(ValueError): - pub.add_tables([]) # fail + pub.add_tables([]) # fail pub.add_tables(['test2']) node1.safe_psql('insert into test2 values (\'c\')') sub.catchup() res = node2.execute('select * from test2') - self.assertListEqual(res, [('a',), ('b',)]) + self.assertListEqual(res, [('a', ), ('b', )]) @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') def test_logical_catchup(self): @@ -477,7 +477,10 @@ def test_logical_catchup(self): node1.execute('insert into test values ({0}, {0})'.format(i)) sub.catchup() res = node2.execute('select * from test') - self.assertListEqual(res, [(i, i,)]) + self.assertListEqual(res, [( + i, + i, + )]) node1.execute('delete from test') @unittest.skipIf(pg_version_ge('10'), 'requires <10') @@ -544,7 +547,8 @@ def test_poll_query_until(self): # check 0 columns with 
self.assertRaises(QueryException): - node.poll_query_until(query='select from pg_class limit 1') + node.poll_query_until( + query='select from pg_catalog.pg_class limit 1') # check None, fail with self.assertRaises(QueryException): @@ -556,7 +560,8 @@ def test_poll_query_until(self): # check 0 rows equivalent to expected=None node.poll_query_until( - query='select * from pg_class where true = false', expected=None) + query='select * from pg_catalog.pg_class where true = false', + expected=None) # check arbitrary expected value, fail with self.assertRaises(TimeoutException): From 50e02ffe6100e0916fb239fbb55c59684c72fe8a Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Fri, 1 Jun 2018 12:45:07 +0300 Subject: [PATCH 12/12] change safe_psql() call to execute() in pubsub.py --- testgres/api.py | 2 +- testgres/pubsub.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/testgres/api.py b/testgres/api.py index 775d5ef8..e90cf7bd 100644 --- a/testgres/api.py +++ b/testgres/api.py @@ -28,7 +28,7 @@ ... replica.catchup() # wait until changes are visible ... print(replica.execute('postgres', 'select count(*) from test')) PostgresNode(name='...', port=..., base_dir='...') -[(3L,)] +[(3,)] """ from .node import PostgresNode diff --git a/testgres/pubsub.py b/testgres/pubsub.py index f8e5dcfe..bb153913 100644 --- a/testgres/pubsub.py +++ b/testgres/pubsub.py @@ -77,7 +77,7 @@ def drop(self, dbname=None, username=None): """ Drop publication """ - self.node.safe_psql( + self.node.execute( "drop publication {}".format(self.name), dbname=dbname, username=username)