Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0dda2ff

Browse files
committed
Replication slots
1 parent 5bc608e commit 0dda2ff

File tree

3 files changed

+36
-4
lines changed

3 files changed

+36
-4
lines changed

testgres/backup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def spawn_primary(self, name=None, destroy=True):
154154

155155
return node
156156

157-
def spawn_replica(self, name=None, destroy=True):
157+
def spawn_replica(self, name=None, destroy=True, slot_name=None):
158158
"""
159159
Create a replica of the original node from a backup.
160160

testgres/node.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def _assign_master(self, master):
179179
# now this node has a master
180180
self._master = master
181181

182-
def _create_recovery_conf(self, username):
182+
def _create_recovery_conf(self, username, slot_name=None):
183183
"""NOTE: this is a private method!"""
184184

185185
# fetch master of this node
@@ -207,6 +207,9 @@ def _create_recovery_conf(self, username):
207207
"standby_mode=on\n"
208208
).format(conninfo)
209209

210+
if slot_name:
211+
line += "primary_slot_name={}".format()
212+
210213
self.append_conf(RECOVERY_CONF_FILE, line)
211214

212215
def _maybe_start_logger(self):
@@ -856,7 +859,22 @@ def backup(self, **kwargs):
856859

857860
return NodeBackup(node=self, **kwargs)
858861

859-
def replicate(self, name=None, **kwargs):
862+
def create_replication_slot(self, slot_name, dbname=None, username=None):
863+
"""
864+
Create a physical replication slot.
865+
866+
Args:
867+
slot_name: slot name
868+
dbname: database name
869+
username: database user name
870+
"""
871+
query = "select pg_create_physical_replication_slot('{}')".format(slot_name)
872+
873+
self.execute(query=query,
874+
dbname=dbname or default_dbname(),
875+
username=username or default_username())
876+
877+
def replicate(self, name=None, slot_name=None, **kwargs):
860878
"""
861879
Create a binary replica of this node.
862880
@@ -870,7 +888,9 @@ def replicate(self, name=None, **kwargs):
870888
backup = self.backup(**kwargs)
871889

872890
# transform backup into a replica
873-
return backup.spawn_replica(name=name, destroy=True)
891+
return backup.spawn_replica(name=name,
892+
destroy=True,
893+
slot_name=slot_name)
874894

875895
def catchup(self, dbname=None, username=None):
876896
"""

tests/test_simple.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,18 @@ def test_replicate(self):
382382
res = node.execute('select * from test')
383383
self.assertListEqual(res, [])
384384

385+
def test_replication_slots(self):
386+
query_create = 'create table test as select generate_series(1, 2) as val'
387+
388+
with get_new_node() as node:
389+
node.init(allow_streaming=True).start()
390+
node.create_replication_slot('slot1')
391+
node.execute(query_create)
392+
393+
with node.replicate(slot_name='slot1').start() as replica:
394+
res = replica.execute('select * from test')
395+
self.assertListEqual(res, [(1, ), (2, )])
396+
385397
def test_incorrect_catchup(self):
386398
with get_new_node() as node:
387399
node.init(allow_streaming=True).start()

0 commit comments

Comments
 (0)