
Commit 29d0a77

Committed by Amit Kapila
Migrate logical slots to the new node during an upgrade.

While reading information from the old cluster, a list of logical
slots is fetched. At the later part of upgrading, pg_upgrade revisits the
list and restores slots by executing pg_create_logical_replication_slot()
on the new cluster. Migration of logical replication slots is only
supported when the old cluster is version 17.0 or later.

If the old node has invalid slots or slots with unconsumed WAL records,
the pg_upgrade fails. These checks are needed to prevent data loss.

The significant advantage of this commit is that it makes it easy to
continue logical replication even after upgrading the publisher node.
Previously, pg_upgrade allowed copying publications to a new node. With
this patch, adjusting the connection string to the new publisher will
cause the apply worker on the subscriber to connect to the new publisher
automatically. This enables seamless continuation of logical replication,
even after an upgrade.

Author: Hayato Kuroda, Hou Zhijie
Reviewed-by: Peter Smith, Bharath Rupireddy, Dilip Kumar, Vignesh C, Shlok Kyal
Discussion: http://postgr.es/m/TYAPR01MB58664C81887B3AF2EB6B16E3F5939@TYAPR01MB5866.jpnprd01.prod.outlook.com
Discussion: http://postgr.es/m/CAA4eK1+t7xYcfa0rEQw839=b2MzsfvYDPz3xbD+ZqOdP3zpKYg@mail.gmail.com

1 parent bddc2f7 commit 29d0a77
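
The restore step named in the commit message (re-running pg_create_logical_replication_slot() on the new cluster for each slot found in the old one) can be pictured with a small C sketch. This is an illustration only, not code from the commit: `SlotInfo` and `build_create_slot_query` are hypothetical names, the argument order assumes the four-argument form (slot name, plugin, temporary, two-phase) of the SQL function, and the quoting is deliberately naive, so it assumes slot and plugin names contain no quote characters.

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical record of one logical slot read from the old cluster. */
typedef struct SlotInfo
{
    const char *slot_name;  /* assumed to contain no quote characters */
    const char *plugin;     /* output plugin name, e.g. "test_decoding" */
    bool        two_phase;  /* whether two-phase decoding was enabled */
} SlotInfo;

/*
 * Format the statement that would recreate one slot on the new cluster.
 * The slot is created as permanent (temporary = false).
 * Returns the number of characters written, or -1 if buf is too small.
 * Real code must escape the literals properly; this sketch does not.
 */
static int
build_create_slot_query(const SlotInfo *slot, char *buf, size_t buflen)
{
    int n = snprintf(buf, buflen,
                     "SELECT pg_catalog.pg_create_logical_replication_slot("
                     "'%s', '%s', false, %s);",
                     slot->slot_name, slot->plugin,
                     slot->two_phase ? "true" : "false");

    return (n < 0 || (size_t) n >= buflen) ? -1 : n;
}
```

A caller would loop over the slot list gathered from the old cluster and execute each generated statement against the new one once the rest of the upgrade has restored the catalogs.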

18 files changed: +927 -27 lines changed

doc/src/sgml/ref/pgupgrade.sgml

+76 -2
@@ -383,6 +383,79 @@ make prefix=/usr/local/pgsql.new install
     </para>
    </step>

+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> attempts to migrate logical
+     slots. This helps avoid the need for manually defining the same
+     logical slots on the new publisher. Migration of logical slots is
+     only supported when the old cluster is version 17.0 or later.
+     Logical slots on clusters before version 17.0 will silently be
+     ignored.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher cluster, ensure that the
+     subscription is temporarily disabled, by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+     Re-enable the subscription after the upgrade.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the logical slots. If these are not met an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+       <literal>logical</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+       configured to a value greater than or equal to the number of slots
+       present in the old cluster.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The output plugins referenced by the slots on the old cluster must be
+       installed in the new PostgreSQL executable directory.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The old cluster has replicated all the transactions and logical decoding
+       messages to subscribers.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       All slots on the old cluster must be usable, i.e., there are no slots
+       whose
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>conflicting</structfield>
+       is <literal>true</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must not have permanent logical slots, i.e.,
+       there must be no slots where
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>temporary</structfield>
+       is <literal>false</literal>.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+   </step>
+
    <step>
     <title>Stop both servers</title>

@@ -650,8 +723,9 @@ rsync --archive --delete --hard-links --size-only --no-inc-recursive /vol1/pg_tb
        Configure the servers for log shipping. (You do not need to run
        <function>pg_backup_start()</function> and <function>pg_backup_stop()</function>
        or take a file system backup as the standbys are still synchronized
-       with the primary.) Replication slots are not copied and must
-       be recreated.
+       with the primary.) Only logical slots on the primary are copied to the
+       new standby, but other slots on the old standby are not copied so must
+       be recreated manually.
       </para>
      </step>
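
The prerequisite list added to pgupgrade.sgml above amounts to an ordered set of checks. The following C sketch restates that list as a single function, purely as a reading aid. It is not how pg_upgrade implements the checks: `UpgradeInputs`, its fields, and `first_unmet_prerequisite` are hypothetical names, and the inputs are assumed to have been collected beforehand (for example from `pg_replication_slots` and the server settings).

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical summary of both clusters; none of these names come from pg_upgrade. */
typedef struct UpgradeInputs
{
    const char *new_wal_level;               /* wal_level on the new cluster */
    int         new_max_replication_slots;   /* max_replication_slots on the new cluster */
    int         old_slot_count;              /* slots present in the old cluster */
    bool        plugins_installed;           /* all output plugins available to the new binaries */
    int         old_slots_with_pending_wal;  /* old slots that have not consumed all changes */
    int         old_conflicting_slots;       /* old slots with conflicting = true */
    int         new_permanent_logical_slots; /* new-cluster logical slots with temporary = false */
} UpgradeInputs;

/* Return NULL when every documented prerequisite holds, else the first failure. */
static const char *
first_unmet_prerequisite(const UpgradeInputs *in)
{
    if (strcmp(in->new_wal_level, "logical") != 0)
        return "new cluster: wal_level must be logical";
    if (in->new_max_replication_slots < in->old_slot_count)
        return "new cluster: max_replication_slots is too small";
    if (!in->plugins_installed)
        return "new cluster: an output plugin is missing";
    if (in->old_slots_with_pending_wal > 0)
        return "old cluster: a slot has unconsumed changes";
    if (in->old_conflicting_slots > 0)
        return "old cluster: a slot is conflicting";
    if (in->new_permanent_logical_slots > 0)
        return "new cluster: permanent logical slots already exist";
    return NULL;
}
```

The checks run in the same order as the documentation lists them, so the returned message identifies the first list item that is not satisfied.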

src/backend/replication/logical/decode.c

+39 -9
@@ -600,12 +600,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);

-    /*
-     * If we don't have snapshot or we are just fast-forwarding, there is no
-     * point in decoding messages.
-     */
-    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-        ctx->fast_forward)
+    /* If we don't have snapshot, there is no point in decoding messages */
+    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
         return;

     message = (xl_logical_message *) XLogRecGetData(r);
@@ -622,6 +618,26 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
               SnapBuildXactNeedsSkip(builder, buf->origptr)))
         return;

+    /*
+     * We also skip decoding in fast_forward mode. This check must be last
+     * because we don't want to set the processing_required flag unless we
+     * have a decodable message.
+     */
+    if (ctx->fast_forward)
+    {
+        /*
+         * We need to set processing_required flag to notify the message's
+         * existence to the caller. Usually, the flag is set when either the
+         * COMMIT or ABORT records are decoded, but this must be turned on
+         * here because the non-transactional logical message is decoded
+         * without waiting for these records.
+         */
+        if (!message->transactional)
+            ctx->processing_required = true;
+
+        return;
+    }
+
     /*
      * If this is a non-transactional change, get the snapshot we're expected
      * to use. We only get here when the snapshot is consistent, and the
@@ -1286,7 +1302,21 @@ static bool
 DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                   Oid txn_dbid, RepOriginId origin_id)
 {
-    return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-            (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
-            ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+    if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+        (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+        FilterByOrigin(ctx, origin_id))
+        return true;
+
+    /*
+     * We also skip decoding in fast_forward mode. In passing set the
+     * processing_required flag to indicate that if it were not for
+     * fast_forward mode, processing would have been required.
+     */
+    if (ctx->fast_forward)
+    {
+        ctx->processing_required = true;
+        return true;
+    }
+
+    return false;
 }
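
The reordering in DecodeTXNNeedSkip() matters because the fast_forward check now runs only after every other reason to skip has been ruled out. A standalone C model of that decision is sketched below; `DecodeCtx`, `TxnFacts`, and `txn_needs_skip` are hypothetical stand-ins for the PostgreSQL structures and helper calls, and each boolean in `TxnFacts` simply represents the result of the corresponding check in the real function.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the two LogicalDecodingContext fields used here. */
typedef struct DecodeCtx
{
    bool fast_forward;         /* decoding only to advance, not to emit changes */
    bool processing_required;  /* set when a decodable transaction was seen */
} DecodeCtx;

/* Hypothetical results of the checks the real function performs. */
typedef struct TxnFacts
{
    bool before_start_point;   /* models SnapBuildXactNeedsSkip() */
    bool other_database;       /* models the txn_dbid comparison */
    bool filtered_by_origin;   /* models FilterByOrigin() */
} TxnFacts;

/* Return true if the transaction should not be decoded. */
static bool
txn_needs_skip(DecodeCtx *ctx, const TxnFacts *txn)
{
    /* Transactions that would be skipped anyway must not raise the flag. */
    if (txn->before_start_point || txn->other_database || txn->filtered_by_origin)
        return true;

    /* Skipped only because of fast_forward: record that work was pending. */
    if (ctx->fast_forward)
    {
        ctx->processing_required = true;
        return true;
    }

    return false;
}
```

In this model, as in the patched function, a transaction filtered out for any other reason leaves `processing_required` untouched, so the flag only reports changes that a normal decoding session would have had to process.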

src/backend/replication/logical/logical.c

+75
@@ -29,6 +29,7 @@
 #include "postgres.h"

 #include "access/xact.h"
+#include "access/xlogutils.h"
 #include "access/xlog_internal.h"
 #include "fmgr.h"
 #include "miscadmin.h"
@@ -41,6 +42,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"

 /* data for errcontext callback */
@@ -1949,3 +1951,76 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
     rb->totalTxns = 0;
     rb->totalBytes = 0;
 }
+
+/*
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn.
+ * Return true if any meaningful/decodable WAL records are encountered,
+ * otherwise false.
+ */
+bool
+LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
+{
+    bool has_pending_wal = false;
+
+    Assert(MyReplicationSlot);
+
+    PG_TRY();
+    {
+        LogicalDecodingContext *ctx;
+
+        /*
+         * Create our decoding context in fast_forward mode, passing start_lsn
+         * as InvalidXLogRecPtr, so that we start processing from the slot's
+         * confirmed_flush.
+         */
+        ctx = CreateDecodingContext(InvalidXLogRecPtr,
+                                    NIL,
+                                    true, /* fast_forward */
+                                    XL_ROUTINE(.page_read = read_local_xlog_page,
+                                               .segment_open = wal_segment_open,
+                                               .segment_close = wal_segment_close),
+                                    NULL, NULL, NULL);
+
+        /*
+         * Start reading at the slot's restart_lsn, which we know points to a
+         * valid record.
+         */
+        XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+        /* Invalidate non-timetravel entries */
+        InvalidateSystemCaches();
+
+        /* Loop until the end of WAL or some changes are processed */
+        while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
+        {
+            XLogRecord *record;
+            char *errm = NULL;
+
+            record = XLogReadRecord(ctx->reader, &errm);
+
+            if (errm)
+                elog(ERROR, "could not find record for logical decoding: %s", errm);
+
+            if (record != NULL)
+                LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+            has_pending_wal = ctx->processing_required;
+
+            CHECK_FOR_INTERRUPTS();
+        }
+
+        /* Clean up */
+        FreeDecodingContext(ctx);
+        InvalidateSystemCaches();
+    }
+    PG_CATCH();
+    {
+        /* clear all timetravel entries */
+        InvalidateSystemCaches();
+
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+
+    return has_pending_wal;
+}
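
The control flow of LogicalReplicationSlotHasPendingWal() is a scan that stops at the first decodable record or at the end of WAL, whichever comes first. The C sketch below models only that loop over an in-memory array. `WalRecord` and `slot_has_pending_wal` are hypothetical names; the `decodable` field stands in for whatever would cause the real decoder to set `processing_required`, and the scan is assumed to start at position 0 rather than at a slot's restart_lsn.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for one WAL record as seen by the scan. */
typedef struct WalRecord
{
    uint64_t end_lsn;    /* position just past this record */
    bool     decodable;  /* would need decoding outside fast_forward mode */
} WalRecord;

/* Return true if a decodable record is found before end_of_wal is reached. */
static bool
slot_has_pending_wal(const WalRecord *records, size_t nrecords, uint64_t end_of_wal)
{
    bool     has_pending_wal = false;
    uint64_t end_rec_ptr = 0;   /* models ctx->reader->EndRecPtr */
    size_t   next = 0;

    /* Loop until the end of WAL or until some change would need processing. */
    while (!has_pending_wal && end_rec_ptr < end_of_wal && next < nrecords)
    {
        const WalRecord *rec = &records[next++];

        end_rec_ptr = rec->end_lsn;
        has_pending_wal = rec->decodable;
    }

    return has_pending_wal;
}
```

The SQL-callable wrapper in pg_upgrade_support.c returns the negation of this result, so "no pending WAL" is reported to pg_upgrade as "the slot has caught up".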

src/backend/replication/slot.c

+14
@@ -1423,6 +1423,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,

         SpinLockRelease(&s->mutex);

+        /*
+         * The logical replication slots shouldn't be invalidated as
+         * max_slot_wal_keep_size GUC is set to -1 during the upgrade.
+         *
+         * The following is just a sanity check.
+         */
+        if (*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)
+        {
+            ereport(ERROR,
+                    errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                    errmsg("replication slots must not be invalidated during the upgrade"),
+                    errhint("\"max_slot_wal_keep_size\" must be set to -1 during the upgrade"));
+        }
+
         if (active_pid != 0)
         {
             /*

src/backend/utils/adt/pg_upgrade_support.c

+44
@@ -17,6 +17,7 @@
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
+#include "replication/logical.h"
 #include "utils/array.h"
 #include "utils/builtins.h"

@@ -261,3 +262,46 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)

     PG_RETURN_VOID();
 }
+
+/*
+ * Verify the given slot has already consumed all the WAL changes.
+ *
+ * Returns true if there are no decodable WAL records after the
+ * confirmed_flush_lsn. Otherwise false.
+ *
+ * This is a special purpose function to ensure that the given slot can be
+ * upgraded without data loss.
+ */
+Datum
+binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
+{
+    Name slot_name;
+    XLogRecPtr end_of_wal;
+    bool found_pending_wal;
+
+    CHECK_IS_BINARY_UPGRADE;
+
+    /* We must check before dereferencing the argument */
+    if (PG_ARGISNULL(0))
+        elog(ERROR, "null argument to binary_upgrade_validate_wal_records is not allowed");
+
+    CheckSlotPermissions();
+
+    slot_name = PG_GETARG_NAME(0);
+
+    /* Acquire the given slot */
+    ReplicationSlotAcquire(NameStr(*slot_name), true);
+
+    Assert(SlotIsLogical(MyReplicationSlot));
+
+    /* Slots must be valid as otherwise we won't be able to scan the WAL */
+    Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+
+    end_of_wal = GetFlushRecPtr(NULL);
+    found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal);
+
+    /* Clean up */
+    ReplicationSlotRelease();
+
+    PG_RETURN_BOOL(!found_pending_wal);
+}

src/bin/pg_upgrade/Makefile

+3
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32

+# required for 003_upgrade_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
