
Commit 0fdab27

Allow logical decoding on standbys
Unsurprisingly, this requires wal_level = logical to be set on the primary and standby. The infrastructure added in 2666975 ensures that slots are invalidated if the primary's wal_level is lowered.

Creating a slot on a standby waits for a xl_running_xact record to be processed. If the primary is idle (and thus not emitting xl_running_xact records), that can take a while. To make that faster, this commit also introduces the pg_log_standby_snapshot() function. By executing it on the primary, completion of slot creation on the standby can be accelerated.

Note that logical decoding on a standby does not itself enforce that required catalog rows are not removed. The user has to use physical replication slots + hot_standby_feedback or other measures to prevent that. If catalog rows required for a slot are removed, the slot is invalidated.

See 6af1793 for an overall design of logical decoding on a standby.

Bumps catversion, for the addition of the pg_log_standby_snapshot() function.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de> (in an older version)
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
1 parent e101dfa commit 0fdab27

12 files changed: +202 -61 lines changed

doc/src/sgml/func.sgml

+15
@@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write it to WAL, without
+        having to wait bgwriter or checkpointer to log one. This is useful for
+        logical decoding on standby, as logical slot creation has to wait
+        until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>

doc/src/sgml/logicaldecoding.sgml

+27
@@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>

+    <para>
+     A logical replication slot can also be created on a hot standby. To prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. In spite of that, if any required rows get removed, the slot gets
+     invalidated. It's highly recommended to use a physical slot between the primary
+     and the standby. Otherwise, hot_standby_feedback will work, but only while the
+     connection is alive (for example a node restart would break it). Then, the
+     primary may delete system catalog rows that could be needed by the logical
+     decoding on the standby (as it does not know about the catalog_xmin on the
+     standby). Existing logical slots on standby also get invalidated if wal_level
+     on primary is reduced to less than 'logical'. This is done as soon as the
+     standby detects such a change in the WAL stream. It means, that for walsenders
+     that are lagging (if any), some WAL records up to the wal_level parameter change
+     on the primary won't be decoded.
+    </para>
+
+    <para>
+     Creation of a logical slot requires information about all the currently
+     running transactions. On the primary, this information is available
+     directly, but on a standby, this information has to be obtained from
+     primary. Thus, slot creation may need to wait for some activity to happen
+     on the primary. If the primary is idle, creating a logical slot on
+     standby may take noticeable time. This can be sped up by calling the
+     <function>pg_log_standby_snapshot</function> on the primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state

src/backend/access/transam/xlog.c

+11
@@ -4469,6 +4469,17 @@ LocalProcessControlFile(bool reset)
     ReadControlFile();
 }

+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+    return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */

src/backend/access/transam/xlogfuncs.c

+31
@@ -31,6 +31,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/smgr.h"
+#include "storage/standby.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -196,6 +197,36 @@ pg_switch_wal(PG_FUNCTION_ARGS)
     PG_RETURN_LSN(switchpoint);
 }

+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+    XLogRecPtr  recptr;
+
+    if (RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is in progress"),
+                 errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+    if (!XLogStandbyInfoActive())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("pg_log_standby_snapshot() can only be used if wal_level >= replica")));
+
+    recptr = LogStandbySnapshot();
+
+    /*
+     * As a convenience, return the WAL location of the last inserted record
+     */
+    PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
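
The two guards at the top of pg_log_standby_snapshot() can be read as a small decision table. The following is a minimal standalone sketch of that ordering, not the server code: the enum names and `sketch_check_log_standby_snapshot` are hypothetical, and the `wal_level >= replica` condition is taken from the error message in the hunk above rather than from the definition of XLogStandbyInfoActive().

```c
#include <stdbool.h>

/* Hypothetical stand-in for PostgreSQL's WalLevel; ordering mirrors minimal < replica < logical. */
typedef enum SketchWalLevel
{
    SKETCH_WAL_MINIMAL = 0,
    SKETCH_WAL_REPLICA,
    SKETCH_WAL_LOGICAL
} SketchWalLevel;

/* Hypothetical result codes standing in for the two ereport(ERROR, ...) calls. */
typedef enum SketchResult
{
    SKETCH_OK = 0,
    SKETCH_ERR_IN_RECOVERY,     /* "recovery is in progress" */
    SKETCH_ERR_WAL_LEVEL        /* "... can only be used if wal_level >= replica" */
} SketchResult;

/*
 * Mirrors the order of the checks in the hunk: the recovery check comes
 * first, so a standby reports the recovery error regardless of wal_level.
 */
SketchResult
sketch_check_log_standby_snapshot(bool in_recovery, SketchWalLevel wal_level)
{
    if (in_recovery)
        return SKETCH_ERR_IN_RECOVERY;

    if (wal_level < SKETCH_WAL_REPLICA)
        return SKETCH_ERR_WAL_LEVEL;

    return SKETCH_OK;
}
```

Because the recovery check is evaluated first, calling the function on a standby fails with the recovery error even when wal_level would otherwise be sufficient, which matches the commit message's instruction to run it on the primary.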

src/backend/catalog/system_functions.sql

+2
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;

 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;

+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;

 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;

src/backend/replication/logical/decode.c

+29 -1
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
              * can restart from there.
              */
             break;
+        case XLOG_PARAMETER_CHANGE:
+            {
+                xl_parameter_change *xlrec =
+                    (xl_parameter_change *) XLogRecGetData(buf->record);
+
+                /*
+                 * If wal_level on the primary is reduced to less than
+                 * logical, we want to prevent existing logical slots from
+                 * being used.  Existing logical slots on the standby get
+                 * invalidated when this WAL record is replayed; and further,
+                 * slot creation fails when wal_level is not sufficient; but
+                 * all these operations are not synchronized, so a logical
+                 * slot may creep in while the wal_level is being
+                 * reduced. Hence this extra check.
+                 */
+                if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+                {
+                    /*
+                     * This can occur only on a standby, as a primary would
+                     * not allow to restart after changing wal_level < logical
+                     * if there is pre-existing logical slot.
+                     */
+                    Assert(RecoveryInProgress());
+                    ereport(ERROR,
+                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                             errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+                }
+                break;
+            }
         case XLOG_NOOP:
         case XLOG_NEXTOID:
         case XLOG_SWITCH:
         case XLOG_BACKUP_END:
-        case XLOG_PARAMETER_CHANGE:
         case XLOG_RESTORE_POINT:
         case XLOG_FPW_CHANGE:
         case XLOG_FPI_FOR_HINT:

src/backend/replication/logical/logical.c

+20 -16
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                  errmsg("logical decoding requires a database connection")));

-    /* ----
-     * TODO: We got to change that someday soon...
-     *
-     * There's basically three things missing to allow this:
-     * 1) We need to be able to correctly and quickly identify the timeline a
-     *    LSN belongs to
-     * 2) We need to force hot_standby_feedback to be enabled at all times so
-     *    the primary cannot remove rows we need.
-     * 3) support dropping replication slots referring to a database, in
-     *    dbase_redo. There can't be any active ones due to HS recovery
-     *    conflicts, so that should be relatively easy.
-     * ----
-     */
     if (RecoveryInProgress())
-        ereport(ERROR,
-                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                 errmsg("logical decoding cannot be used while in recovery")));
+    {
+        /*
+         * This check may have race conditions, but whenever
+         * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+         * verify that there are no existing logical replication slots. And to
+         * avoid races around creating a new slot,
+         * CheckLogicalDecodingRequirements() is called once before creating
+         * the slot, and once when logical decoding is initially starting up.
+         */
+        if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+    }
 }

 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
     LogicalDecodingContext *ctx;
     MemoryContext old_context;

+    /*
+     * On a standby, this check is also required while creating the
+     * slot. Check the comments in the function.
+     */
+    CheckLogicalDecodingRequirements();
+
     /* shorter lines... */
     slot = MyReplicationSlot;

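
The standby branch above replaces an unconditional error with a check against the wal_level recorded in the control file. As a rough illustration, the decision can be modeled as a pure function. This is a sketch under assumptions: `DemoWalLevel` and `demo_logical_decoding_allowed` are hypothetical names, the local wal_level test is inferred from the commit message ("wal_level = logical to be set on the primary and standby") because that part of the function is not shown in the hunk, and the database-connection check is omitted.

```c
#include <stdbool.h>

/* Hypothetical stand-in for WalLevel; only the ordering matters here. */
typedef enum DemoWalLevel
{
    DEMO_WAL_LEVEL_MINIMAL = 0,
    DEMO_WAL_LEVEL_REPLICA,
    DEMO_WAL_LEVEL_LOGICAL
} DemoWalLevel;

/*
 * local_wal_level:        the server's own configured wal_level (assumed check).
 * control_file_wal_level: what GetActiveWalLevelOnStandby() would return, i.e.
 *                         the level the standby learned from the primary.
 */
bool
demo_logical_decoding_allowed(bool in_recovery,
                              DemoWalLevel local_wal_level,
                              DemoWalLevel control_file_wal_level)
{
    /* Assumed from the commit message: the local setting must be logical. */
    if (local_wal_level < DEMO_WAL_LEVEL_LOGICAL)
        return false;

    /* From the hunk: on a standby, the primary's level must be logical too. */
    if (in_recovery && control_file_wal_level < DEMO_WAL_LEVEL_LOGICAL)
        return false;

    return true;
}
```

The control-file value only matters while in recovery; on a primary the second test is skipped entirely, as in the patched function.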
src/backend/replication/slot.c

+30 -27
@@ -41,6 +41,7 @@

 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
         /*
          * For logical slots log a standby snapshot and start logical decoding
          * at exactly that position. That allows the slot to start up more
-         * quickly.
+         * quickly. But on a standby we cannot do WAL writes, so just use the
+         * replay pointer; effectively, an attempt to create a logical slot on
+         * standby will cause it to wait for an xl_running_xact record to be
+         * logged independently on the primary, so that a snapshot can be
+         * built using the record.
          *
-         * That's not needed (or indeed helpful) for physical slots as they'll
-         * start replay at the last logged checkpoint anyway. Instead return
-         * the location of the last redo LSN. While that slightly increases
-         * the chance that we have to retry, it's where a base backup has to
-         * start replay at.
+         * None of this is needed (or indeed helpful) for physical slots as
+         * they'll start replay at the last logged checkpoint anyway. Instead
+         * return the location of the last redo LSN. While that slightly
+         * increases the chance that we have to retry, it's where a base
+         * backup has to start replay at.
          */
-        if (!RecoveryInProgress() && SlotIsLogical(slot))
-        {
-            XLogRecPtr  flushptr;
-
-            /* start at current insert position */
+        if (SlotIsPhysical(slot))
+            restart_lsn = GetRedoRecPtr();
+        else if (RecoveryInProgress())
+            restart_lsn = GetXLogReplayRecPtr(NULL);
+        else
             restart_lsn = GetXLogInsertRecPtr();
-            SpinLockAcquire(&slot->mutex);
-            slot->data.restart_lsn = restart_lsn;
-            SpinLockRelease(&slot->mutex);
-
-            /* make sure we have enough information to start */
-            flushptr = LogStandbySnapshot();

-            /* and make sure it's fsynced to disk */
-            XLogFlush(flushptr);
-        }
-        else
-        {
-            restart_lsn = GetRedoRecPtr();
-            SpinLockAcquire(&slot->mutex);
-            slot->data.restart_lsn = restart_lsn;
-            SpinLockRelease(&slot->mutex);
-        }
+        SpinLockAcquire(&slot->mutex);
+        slot->data.restart_lsn = restart_lsn;
+        SpinLockRelease(&slot->mutex);

         /* prevent WAL removal as fast as possible */
         ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
         if (XLogGetLastRemovedSegno() < segno)
             break;
     }
+
+    if (!RecoveryInProgress() && SlotIsLogical(slot))
+    {
+        XLogRecPtr  flushptr;
+
+        /* make sure we have enough information to start */
+        flushptr = LogStandbySnapshot();
+
+        /* and make sure it's fsynced to disk */
+        XLogFlush(flushptr);
+    }
 }

 /*
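
The restructured ReplicationSlotReserveWal() now picks the starting LSN from three candidates and, separately, decides whether to log a standby snapshot after the loop. A minimal standalone sketch of those two decisions follows; `SketchLsn`, `SketchWalPositions`, and both function names are hypothetical, and the three positions are passed in as plain values instead of being read via GetRedoRecPtr(), GetXLogReplayRecPtr(), and GetXLogInsertRecPtr().

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr. */
typedef uint64_t SketchLsn;

/* Hypothetical bundle of the three candidate start positions. */
typedef struct SketchWalPositions
{
    SketchLsn   redo_ptr;       /* redo location of the last checkpoint */
    SketchLsn   replay_ptr;     /* last replayed position (standby) */
    SketchLsn   insert_ptr;     /* current WAL insert position (primary) */
} SketchWalPositions;

/* Mirrors the if / else if / else chain added in the second hunk. */
SketchLsn
sketch_choose_restart_lsn(bool slot_is_physical, bool in_recovery,
                          const SketchWalPositions *pos)
{
    if (slot_is_physical)
        return pos->redo_ptr;
    else if (in_recovery)
        return pos->replay_ptr;     /* a standby cannot write WAL */
    else
        return pos->insert_ptr;
}

/*
 * Mirrors the block added after the loop: only a logical slot on a primary
 * logs (and flushes) a standby snapshot itself.
 */
bool
sketch_should_log_standby_snapshot(bool slot_is_physical, bool in_recovery)
{
    return !in_recovery && !slot_is_physical;
}
```

For a logical slot on a standby, the second function returns false, which is the case the commit message describes: slot creation then waits until an xl_running_xact record arrives from the primary.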

src/backend/replication/walsender.c

+32 -16
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
     int         count;
     WALReadError errinfo;
     XLogSegNo   segno;
-    TimeLineID  currTLI = GetWALInsertionTimeLine();
+    TimeLineID  currTLI;
+
+    /*
+     * Make sure we have enough WAL available before retrieving the current
+     * timeline. This is needed to determine am_cascading_walsender accurately
+     * which is needed to determine the current timeline.
+     */
+    flushptr = WalSndWaitForWal(targetPagePtr + reqLen);

     /*
-     * Since logical decoding is only permitted on a primary server, we know
-     * that the current timeline ID can't be changing any more. If we did this
-     * on a standby, we'd have to worry about the values we compute here
-     * becoming invalid due to a promotion or timeline change.
+     * Since logical decoding is also permitted on a standby server, we need
+     * to check if the server is in recovery to decide how to get the current
+     * timeline ID (so that it also cover the promotion or timeline change
+     * cases).
      */
+    am_cascading_walsender = RecoveryInProgress();
+
+    if (am_cascading_walsender)
+        GetXLogReplayRecPtr(&currTLI);
+    else
+        currTLI = GetWALInsertionTimeLine();
+
     XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
     sendTimeLineIsHistoric = (state->currTLI != currTLI);
     sendTimeLine = state->currTLI;
     sendTimeLineValidUpto = state->currTLIValidUntil;
     sendTimeLineNextTLI = state->nextTLI;

-    /* make sure we have enough WAL available */
-    flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
     /* fail if not (implies we are going to shut down) */
     if (flushptr < targetPagePtr + reqLen)
         return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                  cur_page,
                  targetPagePtr,
                  XLOG_BLCKSZ,
-                 state->seg.ws_tli, /* Pass the current TLI because only
-                                     * WalSndSegmentOpen controls whether new
-                                     * TLI is needed. */
+                 currTLI,       /* Pass the current TLI because only
+                                 * WalSndSegmentOpen controls whether new TLI
+                                 * is needed. */
                  &errinfo))
         WALReadRaiseError(&errinfo);

@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
      * If first time through in this session, initialize flushPtr.  Otherwise,
      * we only need to update flushPtr if EndRecPtr is past it.
      */
-    if (flushPtr == InvalidXLogRecPtr)
-        flushPtr = GetFlushRecPtr(NULL);
-    else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-        flushPtr = GetFlushRecPtr(NULL);
+    if (flushPtr == InvalidXLogRecPtr ||
+        logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+    {
+        if (am_cascading_walsender)
+            flushPtr = GetStandbyFlushRecPtr(NULL);
+        else
+            flushPtr = GetFlushRecPtr(NULL);
+    }

     /* If EndRecPtr is still past our flushPtr, it means we caught up. */
     if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
     receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
     replayPtr = GetXLogReplayRecPtr(&replayTLI);

-    *tli = replayTLI;
+    if (tli)
+        *tli = replayTLI;

     result = replayPtr;
     if (receiveTLI == replayTLI && receivePtr > replayPtr)
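
The XLogSendLogical() hunk folds two identical branches into one condition and then chooses the flush source by server role. A minimal standalone sketch of that refresh rule is below; `WalPos`, `WALPOS_INVALID`, and `sketch_refresh_flush_ptr` are hypothetical names, the invalid position is modeled as 0 for this sketch, and the two candidate flush positions are passed in as plain values rather than fetched via GetStandbyFlushRecPtr() or GetFlushRecPtr().

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr. */
typedef uint64_t WalPos;

/* Modeled as 0 in this sketch, standing in for InvalidXLogRecPtr. */
#define WALPOS_INVALID ((WalPos) 0)

/*
 * Returns the flush pointer to use for this iteration.
 *
 * flush_ptr:      cached value from the previous call (WALPOS_INVALID on first use)
 * end_rec_ptr:    end of the last record the reader decoded
 * cascading:      true when the walsender runs on a standby
 * standby_flush:  value a standby would report as flushed/replayed
 * primary_flush:  value a primary would report as flushed
 */
WalPos
sketch_refresh_flush_ptr(WalPos flush_ptr, WalPos end_rec_ptr, bool cascading,
                         WalPos standby_flush, WalPos primary_flush)
{
    /* Refresh only on first use or once decoding has reached the cached value. */
    if (flush_ptr == WALPOS_INVALID || end_rec_ptr >= flush_ptr)
        return cascading ? standby_flush : primary_flush;

    /* Otherwise keep the cached value and avoid the lookup. */
    return flush_ptr;
}
```

Keeping the cached value while the reader is still behind it preserves the behavior described in the unchanged comment: the pointer is only updated when EndRecPtr is past it.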

src/include/access/xlog.h

+1
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
