From 0dda2ffe56278d67f11988623d0e9eb83daca73a Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Tue, 13 Mar 2018 14:50:06 +0300 Subject: [PATCH 1/5] Replication slots --- testgres/backup.py | 2 +- testgres/node.py | 26 +++++++++++++++++++++++--- tests/test_simple.py | 12 ++++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/testgres/backup.py b/testgres/backup.py index 64f021ed..1226d135 100644 --- a/testgres/backup.py +++ b/testgres/backup.py @@ -154,7 +154,7 @@ def spawn_primary(self, name=None, destroy=True): return node - def spawn_replica(self, name=None, destroy=True): + def spawn_replica(self, name=None, destroy=True, slot_name=None): """ Create a replica of the original node from a backup. diff --git a/testgres/node.py b/testgres/node.py index 02cc0f52..8974d493 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -179,7 +179,7 @@ def _assign_master(self, master): # now this node has a master self._master = master - def _create_recovery_conf(self, username): + def _create_recovery_conf(self, username, slot_name=None): """NOTE: this is a private method!""" # fetch master of this node @@ -207,6 +207,9 @@ def _create_recovery_conf(self, username): "standby_mode=on\n" ).format(conninfo) + if slot_name: + line += "primary_slot_name={}".format() + self.append_conf(RECOVERY_CONF_FILE, line) def _maybe_start_logger(self): @@ -856,7 +859,22 @@ def backup(self, **kwargs): return NodeBackup(node=self, **kwargs) - def replicate(self, name=None, **kwargs): + def create_replication_slot(self, slot_name, dbname=None, username=None): + """ + Create a physical replication slot. + + Args: + slot_name: slot name + dbname: database name + username: database user name + """ + query = "select pg_create_physical_replication_slot('{}')".format(slot_name) + + self.execute(query=query, + dbname=dbname or default_dbname(), + username=username or default_username()) + + def replicate(self, name=None, slot_name=None, **kwargs): """ Create a binary replica of this node. @@ -870,7 +888,9 @@ def replicate(self, name=None, **kwargs): backup = self.backup(**kwargs) # transform backup into a replica - return backup.spawn_replica(name=name, destroy=True) + return backup.spawn_replica(name=name, + destroy=True, + slot_name=slot_name) def catchup(self, dbname=None, username=None): """ diff --git a/tests/test_simple.py b/tests/test_simple.py index ab2a5804..fef0ead8 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -382,6 +382,18 @@ def test_replicate(self): res = node.execute('select * from test') self.assertListEqual(res, []) + def test_replication_slots(self): + query_create = 'create table test as select generate_series(1, 2) as val' + + with get_new_node() as node: + node.init(allow_streaming=True).start() + node.create_replication_slot('slot1') + node.execute(query_create) + + with node.replicate(slot_name='slot1').start() as replica: + res = replica.execute('select * from test') + self.assertListEqual(res, [(1, ), (2, )]) + def test_incorrect_catchup(self): with get_new_node() as node: node.init(allow_streaming=True).start() From 3e5dbec70ec20a4756ceb301f6e965e0a2e04e8f Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Tue, 13 Mar 2018 16:37:03 +0300 Subject: [PATCH 2/5] Set default max_replication_slots number --- testgres/consts.py | 3 +++ testgres/node.py | 18 +++++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/testgres/consts.py b/testgres/consts.py index 15400311..5ca5b747 100644 --- a/testgres/consts.py +++ b/testgres/consts.py @@ -24,3 +24,6 @@ PG_LOG_FILE = "postgresql.log" UTILS_LOG_FILE = "utils.log" BACKUP_LOG_FILE = "backup.log" + +# default replication slots number +REPLICATION_SLOTS = 10 diff --git a/testgres/node.py b/testgres/node.py index 8974d493..11687365 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -32,7 +32,8 @@ RECOVERY_CONF_FILE, \ PG_LOG_FILE, \ UTILS_LOG_FILE, \ - PG_PID_FILE + PG_PID_FILE, \ + REPLICATION_SLOTS from .decorators import \ method_decorator, \ @@ -208,7 +209,7 @@ def _create_recovery_conf(self, username, slot_name=None): ).format(conninfo) if slot_name: - line += "primary_slot_name={}".format() + line += "primary_slot_name={}\n".format() self.append_conf(RECOVERY_CONF_FILE, line) @@ -343,11 +344,14 @@ def get_auth_method(t): conf.write(u"fsync = off\n") # yapf: disable - conf.write(u"log_statement = {}\n" - u"listen_addresses = '{}'\n" - u"port = {}\n".format(log_statement, - self.host, - self.port)) + conf.write( + u"log_statement = {}\n" + u"listen_addresses = '{}'\n" + u"port = {}\n" + u"max_replication_slots = {}\n".format(log_statement, + self.host, + self.port, + REPLICATION_SLOTS)) # replication-related settings if allow_streaming: From bc6c3027820017613f49a4d5522f8f5861fc298d Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Tue, 13 Mar 2018 17:01:57 +0300 Subject: [PATCH 3/5] Fix: primary_slot_name never actually was set --- testgres/backup.py | 2 +- testgres/node.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/testgres/backup.py b/testgres/backup.py index 1226d135..6e9aedb0 100644 --- a/testgres/backup.py +++ b/testgres/backup.py @@ -171,7 +171,7 @@ def spawn_replica(self, name=None, destroy=True, slot_name=None): # Assign it a master and a recovery file (private magic) node._assign_master(self.original_node) - node._create_recovery_conf(username=self.username) + node._create_recovery_conf(username=self.username, slot_name=slot_name) return node diff --git a/testgres/node.py b/testgres/node.py index 11687365..d4d01ee0 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -209,7 +209,7 @@ def _create_recovery_conf(self, username, slot_name=None): ).format(conninfo) if slot_name: - line += "primary_slot_name={}\n".format() + line += "primary_slot_name={}\n".format(slot_name) self.append_conf(RECOVERY_CONF_FILE, line) @@ -872,7 +872,9 @@ def create_replication_slot(self, slot_name, dbname=None, username=None): dbname: database name username: database user name """ - query = "select pg_create_physical_replication_slot('{}')".format(slot_name) + query = ( + "select pg_create_physical_replication_slot('{}')" + ).format(slot_name) self.execute(query=query, dbname=dbname or default_dbname(), From e2e5a8d3d2018a6f1329f2567f9877ae801f73da Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Tue, 20 Mar 2018 14:33:04 +0300 Subject: [PATCH 4/5] Make replication slot creation implicit; minor refactoring --- testgres/node.py | 50 ++++++++++++++++++++++---------------------- tests/test_simple.py | 1 - 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index d4d01ee0..15a6f0b4 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -254,6 +254,21 @@ def _collect_special_files(self): return result + def _create_replication_slot(self, slot_name, dbname=None, username=None): + """ + Create a physical replication slot. + + Args: + slot_name: slot name + dbname: database name + username: database user name + """ + query = ( + "select pg_create_physical_replication_slot('{}')" + ).format(slot_name) + + self.execute(query=query, dbname=dbname, username=username) + def init(self, initdb_params=None, **kwargs): """ Perform initdb for this node. @@ -344,14 +359,11 @@ def get_auth_method(t): conf.write(u"fsync = off\n") # yapf: disable - conf.write( - u"log_statement = {}\n" - u"listen_addresses = '{}'\n" - u"port = {}\n" - u"max_replication_slots = {}\n".format(log_statement, - self.host, - self.port, - REPLICATION_SLOTS)) + conf.write(u"log_statement = {}\n" + u"listen_addresses = '{}'\n" + u"port = {}\n".format(log_statement, + self.host, + self.port)) # replication-related settings if allow_streaming: @@ -367,8 +379,10 @@ def get_auth_method(t): wal_keep_segments = 20 # for convenience conf.write(u"hot_standby = on\n" u"max_wal_senders = {}\n" + u"max_replication_slots = {}\n" u"wal_keep_segments = {}\n" u"wal_level = {}\n".format(max_wal_senders, + REPLICATION_SLOTS, wal_keep_segments, wal_level)) @@ -863,23 +877,6 @@ def backup(self, **kwargs): return NodeBackup(node=self, **kwargs) - def create_replication_slot(self, slot_name, dbname=None, username=None): - """ - Create a physical replication slot. - - Args: - slot_name: slot name - dbname: database name - username: database user name - """ - query = ( - "select pg_create_physical_replication_slot('{}')" - ).format(slot_name) - - self.execute(query=query, - dbname=dbname or default_dbname(), - username=username or default_username()) - def replicate(self, name=None, slot_name=None, **kwargs): """ Create a binary replica of this node. @@ -891,6 +888,9 @@ def replicate(self, name=None, slot_name=None, **kwargs): base_dir: the base directory for data files and logs """ + if slot_name: + self._create_replication_slot(slot_name, **kwargs) + backup = self.backup(**kwargs) # transform backup into a replica diff --git a/tests/test_simple.py b/tests/test_simple.py index fef0ead8..66b08a02 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -387,7 +387,6 @@ def test_replication_slots(self): with get_new_node() as node: node.init(allow_streaming=True).start() - node.create_replication_slot('slot1') node.execute(query_create) with node.replicate(slot_name='slot1').start() as replica: From 59dbe426d75883cd09d3be842419826b317204c4 Mon Sep 17 00:00:00 2001 From: Ildar Musin Date: Wed, 21 Mar 2018 16:49:15 +0300 Subject: [PATCH 5/5] When creating new physical replication slot check that it doesn't already exist --- testgres/node.py | 10 +++++++++- tests/test_simple.py | 7 ++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 15a6f0b4..72a0707e 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -49,7 +49,8 @@ ExecUtilException, \ QueryException, \ StartNodeException, \ - TimeoutException + TimeoutException, \ + TestgresException from .logger import TestgresLogger @@ -263,6 +264,13 @@ def _create_replication_slot(self, slot_name, dbname=None, username=None): dbname: database name username: database user name """ + rs = self.execute("select exists (select * from pg_replication_slots " + "where slot_name = '{}')".format(slot_name), + dbname=dbname, username=username) + + if rs[0][0]: + raise TestgresException("Slot '{}' already exists".format(slot_name)) + query = ( "select pg_create_physical_replication_slot('{}')" ).format(slot_name) diff --git a/tests/test_simple.py b/tests/test_simple.py index 66b08a02..8f2c57d2 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -21,7 +21,8 @@ BackupException, \ QueryException, \ CatchUpException, \ - TimeoutException + TimeoutException, \ + TestgresException from testgres import \ TestgresConfig, \ @@ -393,6 +394,10 @@ def test_replication_slots(self): res = replica.execute('select * from test') self.assertListEqual(res, [(1, ), (2, )]) + # cannot create new slot with the same name + with self.assertRaises(TestgresException): + node._create_replication_slot('slot1') + def test_incorrect_catchup(self): with get_new_node() as node: node.init(allow_streaming=True).start()