Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3c5db1d

Browse files
committed
Implement pg_wal_replay_wait() stored procedure
pg_wal_replay_wait() is to be used on standby and specifies waiting for the specific WAL location to be replayed. This option is useful when the user makes some data changes on primary and needs a guarantee to see these changes are on standby. The queue of waiters is stored in the shared memory as an LSN-ordered pairing heap, where the waiter with the nearest LSN stays on the top. During the replay of WAL, waiters whose LSNs have already been replayed are deleted from the shared memory pairing heap and woken up by setting their latches. pg_wal_replay_wait() needs to wait without any snapshot held. Otherwise, the snapshot could prevent the replay of WAL records, implying a kind of self-deadlock. This is why it is only possible to implement pg_wal_replay_wait() as a procedure working without an active snapshot, not a function. Catversion is bumped. Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru Author: Kartyshov Ivan, Alexander Korotkov Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila Reviewed-by: Alexander Lakhin, Bharath Rupireddy, Euler Taveira Reviewed-by: Heikki Linnakangas, Kyotaro Horiguchi
1 parent a83f308 commit 3c5db1d

File tree

21 files changed

+786
-8
lines changed

21 files changed

+786
-8
lines changed

doc/src/sgml/func.sgml

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28911,6 +28911,123 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2891128911
the pause, the rate of WAL generation and available disk space.
2891228912
</para>
2891328913

28914+
<para>
28915+
The procedure shown in <xref linkend="recovery-synchronization-procedure-table"/>
28916+
can be executed only during recovery.
28917+
</para>
28918+
28919+
<table id="recovery-synchronization-procedure-table">
28920+
<title>Recovery Synchronization Procedure</title>
28921+
<tgroup cols="1">
28922+
<thead>
28923+
<row>
28924+
<entry role="func_table_entry"><para role="func_signature">
28925+
Procedure
28926+
</para>
28927+
<para>
28928+
Description
28929+
</para></entry>
28930+
</row>
28931+
</thead>
28932+
28933+
<tbody>
28934+
<row>
28935+
<entry role="func_table_entry"><para role="func_signature">
28936+
<indexterm>
28937+
<primary>pg_wal_replay_wait</primary>
28938+
</indexterm>
28939+
<function>pg_wal_replay_wait</function> (
28940+
<parameter>target_lsn</parameter> <type>pg_lsn</type>,
28941+
<parameter>timeout</parameter> <type>bigint</type> <literal>DEFAULT</literal> <literal>0</literal>)
28942+
<returnvalue>void</returnvalue>
28943+
</para>
28944+
<para>
28945+
Waits until recovery replays <literal>target_lsn</literal>.
28946+
If no <parameter>timeout</parameter> is specified or it is set to
28947+
zero, this procedure waits indefinitely for the
28948+
<literal>target_lsn</literal>. If the <parameter>timeout</parameter>
28949+
is specified (in milliseconds) and is greater than zero, the
28950+
procedure waits until <literal>target_lsn</literal> is reached or
28951+
the specified <parameter>timeout</parameter> has elapsed.
28952+
On timeout, or if the server is promoted before
28953+
<literal>target_lsn</literal> is reached, an error is emitted.
28954+
</para></entry>
28955+
</row>
28956+
</tbody>
28957+
</tgroup>
28958+
</table>
28959+
28960+
<para>
28961+
<function>pg_wal_replay_wait</function> waits till
28962+
<parameter>target_lsn</parameter> to be replayed on standby.
28963+
That is, after this function execution, the value returned by
28964+
<function>pg_last_wal_replay_lsn</function> should be greater or equal
28965+
to the <parameter>target_lsn</parameter> value. This is useful to achieve
28966+
read-your-writes-consistency, while using async replica for reads and
28967+
primary for writes. In that case <acronym>lsn</acronym> of the last
28968+
modification should be stored on the client application side or the
28969+
connection pooler side.
28970+
</para>
28971+
28972+
<para>
28973+
You can use <function>pg_wal_replay_wait</function> to wait for
28974+
the <type>pg_lsn</type> value. For example, an application could update
28975+
the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
28976+
changes just made. This example uses <function>pg_current_wal_insert_lsn</function>
28977+
on primary server to get the <acronym>lsn</acronym> given that
28978+
<varname>synchronous_commit</varname> could be set to
28979+
<literal>off</literal>.
28980+
28981+
<programlisting>
28982+
postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
28983+
UPDATE 100
28984+
postgres=# SELECT pg_current_wal_insert_lsn();
28985+
pg_current_wal_insert_lsn
28986+
--------------------
28987+
0/306EE20
28988+
(1 row)
28989+
</programlisting>
28990+
28991+
Then an application could run <function>pg_wal_replay_wait</function>
28992+
with the <acronym>lsn</acronym> obtained from primary. After that the
28993+
changes made of primary should be guaranteed to be visible on replica.
28994+
28995+
<programlisting>
28996+
postgres=# CALL pg_wal_replay_wait('0/306EE20');
28997+
CALL
28998+
postgres=# SELECT * FROM movie WHERE genre = 'Drama';
28999+
genre
29000+
-------
29001+
(0 rows)
29002+
</programlisting>
29003+
29004+
It may also happen that target <acronym>lsn</acronym> is not achieved
29005+
within the timeout. In that case the error is thrown.
29006+
29007+
<programlisting>
29008+
postgres=# CALL pg_wal_replay_wait('0/306EE20', 100);
29009+
ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
29010+
</programlisting>
29011+
29012+
</para>
29013+
29014+
<para>
29015+
<function>pg_wal_replay_wait</function> can't be used within
29016+
a transaction with an isolation level higher than
29017+
<literal>READ COMMITTED</literal>, another procedure, or a function.
29018+
All the cases above imply holding a snapshot, which could prevent
29019+
WAL records from replaying (see <xref linkend="hot-standby-conflict"/>)
29020+
and cause an indirect deadlock.
29021+
29022+
<programlisting>
29023+
postgres=# BEGIN;
29024+
BEGIN
29025+
postgres=*# CALL pg_wal_replay_wait('0/306EE20');
29026+
ERROR: pg_wal_replay_wait() must be only called without an active or registered snapshot
29027+
DETAIL: Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.
29028+
</programlisting>
29029+
29030+
</para>
2891429031
</sect2>
2891529032

2891629033
<sect2 id="functions-snapshot-synchronization">

src/backend/access/transam/xact.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "commands/async.h"
3939
#include "commands/tablecmds.h"
4040
#include "commands/trigger.h"
41+
#include "commands/waitlsn.h"
4142
#include "common/pg_prng.h"
4243
#include "executor/spi.h"
4344
#include "libpq/be-fsstubs.h"
@@ -2809,6 +2810,11 @@ AbortTransaction(void)
28092810
*/
28102811
LWLockReleaseAll();
28112812

2813+
/*
2814+
* Cleanup waiting for LSN if any.
2815+
*/
2816+
WaitLSNCleanup();
2817+
28122818
/* Clear wait information and command progress indicator */
28132819
pgstat_report_wait_end();
28142820
pgstat_progress_end_command();

src/backend/access/transam/xlog.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
#include "catalog/catversion.h"
6767
#include "catalog/pg_control.h"
6868
#include "catalog/pg_database.h"
69+
#include "commands/waitlsn.h"
6970
#include "common/controldata_utils.h"
7071
#include "common/file_utils.h"
7172
#include "executor/instrument.h"
@@ -6143,6 +6144,12 @@ StartupXLOG(void)
61436144
UpdateControlFile();
61446145
LWLockRelease(ControlFileLock);
61456146

6147+
/*
6148+
* Wake up all waiters for replay LSN. They need to report an error that
6149+
* recovery was ended before achieving the target LSN.
6150+
*/
6151+
WaitLSNSetLatches(InvalidXLogRecPtr);
6152+
61466153
/*
61476154
* Shutdown the recovery environment. This must occur after
61486155
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())

src/backend/access/transam/xlogrecovery.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "backup/basebackup.h"
4444
#include "catalog/pg_control.h"
4545
#include "commands/tablespace.h"
46+
#include "commands/waitlsn.h"
4647
#include "common/file_utils.h"
4748
#include "miscadmin.h"
4849
#include "pgstat.h"
@@ -1828,6 +1829,16 @@ PerformWalRecovery(void)
18281829
break;
18291830
}
18301831

1832+
/*
1833+
* If we replayed an LSN that someone was waiting for then walk
1834+
* over the shared memory array and set latches to notify the
1835+
* waiters.
1836+
*/
1837+
if (waitLSNState &&
1838+
(XLogRecoveryCtl->lastReplayedEndRecPtr >=
1839+
pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
1840+
WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
1841+
18311842
/* Else, try to fetch the next WAL record */
18321843
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
18331844
} while (record != NULL);

src/backend/catalog/system_functions.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION
414414
json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
415415
RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100 AS 'json_populate_recordset' PARALLEL SAFE;
416416

417+
CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0)
418+
LANGUAGE internal AS 'pg_wal_replay_wait';
419+
417420
CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
418421
IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
419422
OUT lsn pg_lsn, OUT xid xid, OUT data text)

src/backend/commands/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ OBJS = \
6161
vacuum.o \
6262
vacuumparallel.o \
6363
variable.o \
64-
view.o
64+
view.o \
65+
waitlsn.o
6566

6667
include $(top_srcdir)/src/backend/common.mk

src/backend/commands/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,5 @@ backend_sources += files(
5050
'vacuumparallel.c',
5151
'variable.c',
5252
'view.c',
53+
'waitlsn.c',
5354
)

0 commit comments

Comments
 (0)