32
32
RECOVERY_CONF_FILE , \
33
33
PG_LOG_FILE , \
34
34
UTILS_LOG_FILE , \
35
- PG_PID_FILE
35
+ PG_PID_FILE , \
36
+ REPLICATION_SLOTS
36
37
37
38
from .decorators import \
38
39
method_decorator , \
@@ -277,7 +278,7 @@ def _assign_master(self, master):
277
278
# now this node has a master
278
279
self ._master = master
279
280
280
- def _create_recovery_conf (self , username ):
281
+ def _create_recovery_conf (self , username , slot_name = None ):
281
282
"""NOTE: this is a private method!"""
282
283
283
284
# fetch master of this node
@@ -305,6 +306,9 @@ def _create_recovery_conf(self, username):
305
306
"standby_mode=on\n "
306
307
).format (conninfo )
307
308
309
+ if slot_name :
310
+ line += "primary_slot_name={}\n " .format (slot_name )
311
+
308
312
self .append_conf (RECOVERY_CONF_FILE , line )
309
313
310
314
def _maybe_start_logger (self ):
@@ -348,6 +352,28 @@ def _collect_special_files(self):
348
352
349
353
return result
350
354
355
+ def _create_replication_slot (self , slot_name , dbname = None , username = None ):
356
+ """
357
+ Create a physical replication slot.
358
+
359
+ Args:
360
+ slot_name: slot name
361
+ dbname: database name
362
+ username: database user name
363
+ """
364
+ rs = self .execute ("select exists (select * from pg_replication_slots "
365
+ "where slot_name = '{}')" .format (slot_name ),
366
+ dbname = dbname , username = username )
367
+
368
+ if rs [0 ][0 ]:
369
+ raise TestgresException ("Slot '{}' already exists" .format (slot_name ))
370
+
371
+ query = (
372
+ "select pg_create_physical_replication_slot('{}')"
373
+ ).format (slot_name )
374
+
375
+ self .execute (query = query , dbname = dbname , username = username )
376
+
351
377
def init (self , initdb_params = None , ** kwargs ):
352
378
"""
353
379
Perform initdb for this node.
@@ -458,8 +484,10 @@ def get_auth_method(t):
458
484
wal_keep_segments = 20 # for convenience
459
485
conf .write (u"hot_standby = on\n "
460
486
u"max_wal_senders = {}\n "
487
+ u"max_replication_slots = {}\n "
461
488
u"wal_keep_segments = {}\n "
462
489
u"wal_level = {}\n " .format (max_wal_senders ,
490
+ REPLICATION_SLOTS ,
463
491
wal_keep_segments ,
464
492
wal_level ))
465
493
@@ -941,7 +969,7 @@ def backup(self, **kwargs):
941
969
942
970
return NodeBackup (node = self , ** kwargs )
943
971
944
- def replicate (self , name = None , ** kwargs ):
972
+ def replicate (self , name = None , slot_name = None , ** kwargs ):
945
973
"""
946
974
Create a binary replica of this node.
947
975
@@ -952,10 +980,15 @@ def replicate(self, name=None, **kwargs):
952
980
base_dir: the base directory for data files and logs
953
981
"""
954
982
983
+ if slot_name :
984
+ self ._create_replication_slot (slot_name , ** kwargs )
985
+
955
986
backup = self .backup (** kwargs )
956
987
957
988
# transform backup into a replica
958
- return backup .spawn_replica (name = name , destroy = True )
989
+ return backup .spawn_replica (name = name ,
990
+ destroy = True ,
991
+ slot_name = slot_name )
959
992
960
993
def catchup (self , dbname = None , username = None ):
961
994
"""
0 commit comments