12
12
After that :meth:`~.PostgresNode.publish()` and :meth:`~.PostgresNode.subscribe()`
13
13
methods may be used to setup replication. Example:
14
14
15
- >>> from .api import get_new_node
15
+ >>> from testgres import get_new_node
16
16
>>> with get_new_node() as nodeA, get_new_node() as nodeB:
17
17
... nodeA.init(allow_logical=True).start()
18
18
... nodeB.init().start()
44
44
45
45
from six import raise_from
46
46
47
+ from .consts import LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS
47
48
from .defaults import default_dbname , default_username
48
49
from .exceptions import CatchUpException
49
50
from .utils import options_string
@@ -56,11 +57,11 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
56
57
constructing publication objects.
57
58
58
59
Args:
59
- name: publication name
60
- node: publisher's node
61
- tables: tables list or None for all tables
62
- dbname: database name used to connect and perform subscription
63
- username: username used to connect to the database
60
+ name: publication name.
61
+ node: publisher's node.
62
+ tables: tables list or None for all tables.
63
+ dbname: database name used to connect and perform subscription.
64
+ username: username used to connect to the database.
64
65
"""
65
66
self .name = name
66
67
self .node = node
@@ -70,7 +71,7 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
70
71
# create publication in database
71
72
t = "table " + ", " .join (tables ) if tables else "all tables"
72
73
query = "create publication {} for {}"
73
- node .safe_psql (query .format (name , t ), dbname = dbname , username = username )
74
+ node .execute (query .format (name , t ), dbname = dbname , username = username )
74
75
75
76
def drop (self , dbname = None , username = None ):
76
77
"""
@@ -87,13 +88,13 @@ def add_tables(self, tables, dbname=None, username=None):
87
88
created with empty tables list.
88
89
89
90
Args:
90
- tables: a list of tables to be added to the publication
91
+ tables: a list of tables to be added to the publication.
91
92
"""
92
93
if not tables :
93
94
raise ValueError ("Tables list is empty" )
94
95
95
96
query = "alter publication {} add table {}"
96
- self .node .safe_psql (
97
+ self .node .execute (
97
98
query .format (self .name , ", " .join (tables )),
98
99
dbname = dbname or self .dbname ,
99
100
username = username or self .username )
@@ -112,15 +113,15 @@ def __init__(self,
112
113
constructing subscription objects.
113
114
114
115
Args:
115
- name: subscription name
116
- node: subscriber's node
116
+ name: subscription name.
117
+ node: subscriber's node.
117
118
publication: :class:`.Publication` object we are subscribing to
118
- (see :meth:`.PostgresNode.publish()`)
119
- dbname: database name used to connect and perform subscription
120
- username: username used to connect to the database
119
+ (see :meth:`.PostgresNode.publish()`).
120
+ dbname: database name used to connect and perform subscription.
121
+ username: username used to connect to the database.
121
122
params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
122
123
<https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
123
- for details)
124
+ for details).
124
125
"""
125
126
self .name = name
126
127
self .node = node
@@ -142,28 +143,29 @@ def __init__(self,
142
143
if params :
143
144
query += " with ({})" .format (options_string (** params ))
144
145
145
- node .safe_psql (query , dbname = dbname , username = username )
146
+ # Note: cannot run 'create subscription' query in transaction mode
147
+ node .execute (query , dbname = dbname , username = username )
146
148
147
149
def disable (self , dbname = None , username = None ):
148
150
"""
149
151
Disables the running subscription.
150
152
"""
151
153
query = "alter subscription {} disable"
152
- self .node .safe_psql (query .format (self .name ), dbname = None , username = None )
154
+ self .node .execute (query .format (self .name ), dbname = None , username = None )
153
155
154
156
def enable (self , dbname = None , username = None ):
155
157
"""
156
158
Enables the previously disabled subscription.
157
159
"""
158
160
query = "alter subscription {} enable"
159
- self .node .safe_psql (query .format (self .name ), dbname = None , username = None )
161
+ self .node .execute (query .format (self .name ), dbname = None , username = None )
160
162
161
163
def refresh (self , copy_data = True , dbname = None , username = None ):
162
164
"""
163
165
Disables the running subscription.
164
166
"""
165
167
query = "alter subscription {} refresh publication with (copy_data={})"
166
- self .node .safe_psql (
168
+ self .node .execute (
167
169
query .format (self .name , copy_data ),
168
170
dbname = dbname ,
169
171
username = username )
@@ -172,7 +174,7 @@ def drop(self, dbname=None, username=None):
172
174
"""
173
175
Drops subscription
174
176
"""
175
- self .node .safe_psql (
177
+ self .node .execute (
176
178
"drop subscription {}" .format (self .name ),
177
179
dbname = dbname ,
178
180
username = username )
@@ -182,19 +184,19 @@ def catchup(self, username=None):
182
184
Wait until subscription catches up with publication.
183
185
184
186
Args:
185
- username: remote node's user name
187
+ username: remote node's user name.
186
188
"""
187
- query = (
188
- " select pg_current_wal_lsn() - replay_lsn = 0 "
189
- " from pg_stat_replication where application_name = '{}'" ). format (
190
- self .name )
189
+ query = """
190
+ select pg_current_wal_lsn() - replay_lsn = 0
191
+ from pg_catalog. pg_stat_replication where application_name = '{}'
192
+ """ . format ( self .name )
191
193
192
194
try :
193
195
# wait until this LSN reaches subscriber
194
196
self .pub .node .poll_query_until (
195
197
query = query ,
196
198
dbname = self .pub .dbname ,
197
199
username = username or self .pub .username ,
198
- max_attempts = 60 )
200
+ max_attempts = LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS )
199
201
except Exception as e :
200
202
raise_from (CatchUpException ("Failed to catch up" , query ), e )
0 commit comments