Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila2024-03-22 08:22:05 +0000
committerAmit Kapila2024-03-22 08:22:05 +0000
commit6ae701b4378db2284c77314560e95a93d0ba9484 (patch)
tree8490c11526b9b13777a5e13fbdab2e1e2b45c8d9 /src/backend
parentb4080fa3dcf6c6359e542169e0e81a0662c53ba8 (diff)
Track invalidation_reason in pg_replication_slots.
Till now, the reason for replication slot invalidation is not tracked directly in pg_replication_slots. A recent commit 007693f2a3 added 'conflict_reason' to show the reasons for slot conflict/invalidation, but only for logical slots. This commit adds a new column 'invalidation_reason' to show invalidation reasons for both physical and logical slots. And, this commit also turns 'conflict_reason' text column to 'conflicting' boolean column (effectively reverting commit 007693f2a3). The 'conflicting' column is true for invalidation reasons 'rows_removed' and 'wal_level_insufficient' because those make the slot conflict with recovery. When 'conflicting' is true, one can now look at the new 'invalidation_reason' column for the reason for the logical slot's conflict with recovery. The new 'invalidation_reason' column will also be useful to track other invalidation reasons in the future commit. Author: Bharath Rupireddy Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik Discussion: https://www.postgresql.org/message-id/ZfR7HuzFEswakt/a%40ip-10-97-1-34.eu-west-3.compute.internal Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/replication/logical/slotsync.c2
-rw-r--r--src/backend/replication/slot.c49
-rw-r--r--src/backend/replication/slotfuncs.c25
4 files changed, 44 insertions, 35 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d10..f69b7f55801 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS
L.wal_status,
L.safe_wal_size,
L.two_phase,
- L.conflict_reason,
+ L.conflicting,
+ L.invalidation_reason,
L.failover,
L.synced
FROM pg_get_replication_slots() AS L
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 7b180bdb5c8..30480960c55 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -663,7 +663,7 @@ synchronize_slots(WalReceiverConn *wrconn)
bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, failover,"
- " database, conflict_reason"
+ " database, invalidation_reason"
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 91ca397857a..cdf0c450c59 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1525,14 +1525,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
XLogRecPtr initial_effective_xmin = InvalidXLogRecPtr;
XLogRecPtr initial_catalog_effective_xmin = InvalidXLogRecPtr;
XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
- ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
+ ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
for (;;)
{
XLogRecPtr restart_lsn;
NameData slotname;
int active_pid = 0;
- ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
+ ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
@@ -1554,17 +1554,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
restart_lsn = s->data.restart_lsn;
- /*
- * If the slot is already invalid or is a non conflicting slot, we
- * don't need to do anything.
- */
+ /* we do nothing if the slot is already invalid */
if (s->data.invalidated == RS_INVAL_NONE)
{
/*
* The slot's mutex will be released soon, and it is possible that
* those values change since the process holding the slot has been
* terminated (if any), so record them here to ensure that we
- * would report the correct conflict cause.
+ * would report the correct invalidation cause.
*/
if (!terminated)
{
@@ -1578,7 +1575,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
case RS_INVAL_WAL_REMOVED:
if (initial_restart_lsn != InvalidXLogRecPtr &&
initial_restart_lsn < oldestLSN)
- conflict = cause;
+ invalidation_cause = cause;
break;
case RS_INVAL_HORIZON:
if (!SlotIsLogical(s))
@@ -1589,15 +1586,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
if (TransactionIdIsValid(initial_effective_xmin) &&
TransactionIdPrecedesOrEquals(initial_effective_xmin,
snapshotConflictHorizon))
- conflict = cause;
+ invalidation_cause = cause;
else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
snapshotConflictHorizon))
- conflict = cause;
+ invalidation_cause = cause;
break;
case RS_INVAL_WAL_LEVEL:
if (SlotIsLogical(s))
- conflict = cause;
+ invalidation_cause = cause;
break;
case RS_INVAL_NONE:
pg_unreachable();
@@ -1605,14 +1602,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
}
/*
- * The conflict cause recorded previously should not change while the
- * process owning the slot (if any) has been terminated.
+ * The invalidation cause recorded previously should not change while
+ * the process owning the slot (if any) has been terminated.
*/
- Assert(!(conflict_prev != RS_INVAL_NONE && terminated &&
- conflict_prev != conflict));
+ Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
+ invalidation_cause_prev != invalidation_cause));
- /* if there's no conflict, we're done */
- if (conflict == RS_INVAL_NONE)
+ /* if there's no invalidation, we're done */
+ if (invalidation_cause == RS_INVAL_NONE)
{
SpinLockRelease(&s->mutex);
if (released_lock)
@@ -1632,13 +1629,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
- s->data.invalidated = conflict;
+ s->data.invalidated = invalidation_cause;
/*
* XXX: We should consider not overwriting restart_lsn and instead
* just rely on .invalidated.
*/
- if (conflict == RS_INVAL_WAL_REMOVED)
+ if (invalidation_cause == RS_INVAL_WAL_REMOVED)
s->data.restart_lsn = InvalidXLogRecPtr;
/* Let caller know */
@@ -1681,7 +1678,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
*/
if (last_signaled_pid != active_pid)
{
- ReportSlotInvalidation(conflict, true, active_pid,
+ ReportSlotInvalidation(invalidation_cause, true, active_pid,
slotname, restart_lsn,
oldestLSN, snapshotConflictHorizon);
@@ -1694,7 +1691,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
last_signaled_pid = active_pid;
terminated = true;
- conflict_prev = conflict;
+ invalidation_cause_prev = invalidation_cause;
}
/* Wait until the slot is released. */
@@ -1727,7 +1724,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
ReplicationSlotSave();
ReplicationSlotRelease();
- ReportSlotInvalidation(conflict, false, active_pid,
+ ReportSlotInvalidation(invalidation_cause, false, active_pid,
slotname, restart_lsn,
oldestLSN, snapshotConflictHorizon);
@@ -2356,21 +2353,21 @@ RestoreSlotFromDisk(const char *name)
}
/*
- * Maps a conflict reason for a replication slot to
+ * Maps an invalidation reason for a replication slot to
* ReplicationSlotInvalidationCause.
*/
ReplicationSlotInvalidationCause
-GetSlotInvalidationCause(const char *conflict_reason)
+GetSlotInvalidationCause(const char *invalidation_reason)
{
ReplicationSlotInvalidationCause cause;
ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
bool found PG_USED_FOR_ASSERTS_ONLY = false;
- Assert(conflict_reason);
+ Assert(invalidation_reason);
for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
{
- if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0)
+ if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
{
found = true;
result = cause;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ad79e1fccd6..4232c1e52e5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 17
+#define PG_GET_REPLICATION_SLOTS_COLS 18
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -263,6 +263,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
WALAvailability walstate;
int i;
+ ReplicationSlotInvalidationCause cause;
if (!slot->in_use)
continue;
@@ -409,18 +410,28 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
- if (slot_contents.data.database == InvalidOid)
+ cause = slot_contents.data.invalidated;
+
+ if (SlotIsPhysical(&slot_contents))
nulls[i++] = true;
else
{
- ReplicationSlotInvalidationCause cause = slot_contents.data.invalidated;
-
- if (cause == RS_INVAL_NONE)
- nulls[i++] = true;
+ /*
+ * rows_removed and wal_level_insufficient are the only two
+ * reasons for the logical slot's conflict with recovery.
+ */
+ if (cause == RS_INVAL_HORIZON ||
+ cause == RS_INVAL_WAL_LEVEL)
+ values[i++] = BoolGetDatum(true);
else
- values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
+ values[i++] = BoolGetDatum(false);
}
+ if (cause == RS_INVAL_NONE)
+ nulls[i++] = true;
+ else
+ values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
+
values[i++] = BoolGetDatum(slot_contents.data.failover);
values[i++] = BoolGetDatum(slot_contents.data.synced);