Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndres Freund2023-04-08 06:11:28 +0000
committerAndres Freund2023-04-08 07:05:44 +0000
commit26669757b6a7665c1069e77e6472bd8550193ca6 (patch)
treeb60b91e8fa8276744b171a7a5714ffb5b8918f66 /src/backend
parentbe87200efd9308ccfe217ce8828f316e93e370da (diff)
Handle logical slot conflicts on standby
During WAL replay on the standby, when a conflict with a logical slot is identified, invalidate such slots. There are two sources of conflicts: 1) Using the information added in 6af1793954e, logical slots are invalidated if required rows are removed 2) wal_level on the primary server is reduced to below logical Uses the infrastructure introduced in the prior commit. FIXME: add commit reference. Change InvalidatePossiblyObsoleteSlot() to use a recovery conflict to interrupt use of a slot, if called in the startup process. The new recovery conflict is added to pg_stat_database_conflicts, as confl_active_logicalslot. See 6af1793954e for an overall design of logical decoding on a standby. Bumps catversion for the addition of the pg_stat_database_conflicts column. Bumps PGSTAT_FILE_FORMAT_ID for the same reason. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Andres Freund <andres@anarazel.de> Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/gist/gistxlog.c2
-rw-r--r--src/backend/access/hash/hash_xlog.c1
-rw-r--r--src/backend/access/heap/heapam.c3
-rw-r--r--src/backend/access/nbtree/nbtxlog.c2
-rw-r--r--src/backend/access/spgist/spgxlog.c1
-rw-r--r--src/backend/access/transam/xlog.c15
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/replication/slot.c8
-rw-r--r--src/backend/storage/ipc/procsignal.c3
-rw-r--r--src/backend/storage/ipc/standby.c20
-rw-r--r--src/backend/tcop/postgres.c9
-rw-r--r--src/backend/utils/activity/pgstat_database.c4
-rw-r--r--src/backend/utils/adt/pgstatfuncs.c3
13 files changed, 71 insertions, 3 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b7678f3c144..9a86fb3feff 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+ xldata->isCatalogRel,
rlocator);
}
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+ xlrec->isCatalogRel,
xlrec->locator);
}
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3f..e8e06c62a95 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+ xldata->isCatalogRel,
rlocator);
}
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8b13e3f8925..f389ceee1ea 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+ xlrec->isCatalogRel,
rlocator);
/*
@@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+ xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
rlocator);
/*
@@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+ xlrec->isCatalogRel,
rlocator);
}
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6deb..c87e46ed66e 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+ xlrec->isCatalogRel,
rlocator);
}
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
if (InHotStandby)
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+ xlrec->isCatalogRel,
xlrec->locator);
}
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index b071b59c8ac..459ac929ba5 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+ xldata->isCatalogRel,
locator);
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 23903445293..13f83dd57d6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7970,6 +7970,21 @@ xlog_redo(XLogReaderState *record)
/* Update our copy of the parameters in pg_control */
memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
+ /*
+ * Invalidate logical slots if we are in hot standby and the primary
+ * does not have a WAL level sufficient for logical decoding. No need
+ * to search for potentially conflicting logically slots if standby is
+ * running with wal_level lower than logical, because in that case, we
+ * would have either disallowed creation of logical slots or
+ * invalidated existing ones.
+ */
+ if (InRecovery && InHotStandby &&
+ xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+ wal_level >= WAL_LEVEL_LOGICAL)
+ InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
+ 0, InvalidOid,
+ InvalidTransactionId);
+
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->MaxConnections = xlrec.MaxConnections;
ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ff69983f2ea..2129c916aa1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1069,7 +1069,8 @@ CREATE VIEW pg_stat_database_conflicts AS
pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
- pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+ pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+ pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot
FROM pg_database D;
CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4d0421c5ed1..cc79d6713b2 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1442,7 +1442,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
slotname, restart_lsn,
oldestLSN, snapshotConflictHorizon);
- (void) kill(active_pid, SIGTERM);
+ if (MyBackendType == B_STARTUP)
+ (void) SendProcSignal(active_pid,
+ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
+ InvalidBackendId);
+ else
+ (void) kill(active_pid, SIGTERM);
+
last_signaled_pid = active_pid;
}
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 395b2cf6909..c85cb5cc18d 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+ if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+ RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 9f56b4e95cf..63a72033f9a 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
*/
void
ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+ bool isCatalogRel,
RelFileLocator locator)
{
VirtualTransactionId *backends;
@@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
true);
+
+ /*
+ * Note that WaitExceedsMaxStandbyDelay() is not taken into account here
+ * (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That
+ * seems OK, given that this kind of conflict should not normally be
+ * reached, e.g. due to using a physical replication slot.
+ */
+ if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+ InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
+ snapshotConflictHorizon);
}
/*
@@ -499,6 +511,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
*/
void
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+ bool isCatalogRel,
RelFileLocator locator)
{
/*
@@ -517,7 +530,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
TransactionId truncated;
truncated = XidFromFullTransactionId(snapshotConflictHorizon);
- ResolveRecoveryConflictWithSnapshot(truncated, locator);
+ ResolveRecoveryConflictWithSnapshot(truncated,
+ isCatalogRel,
+ locator);
}
}
@@ -1478,6 +1493,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
reasonDesc = _("recovery conflict on snapshot");
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ reasonDesc = _("recovery conflict on replication slot");
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
reasonDesc = _("recovery conflict on buffer deadlock");
break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50b..01b6cc1f7d3 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
errdetail("User query might have needed to see row versions that must be removed.");
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ errdetail("User was using a logical slot that must be invalidated.");
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
errdetail("User transaction caused buffer deadlock with recovery.");
break;
@@ -3143,6 +3146,12 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
InterruptPending = true;
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ RecoveryConflictPending = true;
+ QueryCancelPending = true;
+ InterruptPending = true;
+ break;
+
default:
elog(FATAL, "unrecognized conflict mode: %d",
(int) reason);
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 6e650ceaade..7149f22f729 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
dbentry->conflict_bufferpin++;
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ dbentry->conflict_logicalslot++;
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
dbentry->conflict_startup_deadlock++;
break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
PGSTAT_ACCUM_DBCOUNT(conflict_lock);
PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+ PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6b3c464db83..e79b065d214 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1071,6 +1071,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
/* pg_stat_get_db_xact_rollback */
PG_STAT_GET_DBENTRY_INT64(xact_rollback)
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
Datum
pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1104,6 +1106,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
result = (int64) (dbentry->conflict_tablespace +
dbentry->conflict_lock +
dbentry->conflict_snapshot +
+ dbentry->conflict_logicalslot +
dbentry->conflict_bufferpin +
dbentry->conflict_startup_deadlock);