diff options
author | Amit Kapila | 2024-03-22 08:22:05 +0000 |
---|---|---|
committer | Amit Kapila | 2024-03-22 08:22:05 +0000 |
commit | 6ae701b4378db2284c77314560e95a93d0ba9484 (patch) | |
tree | 8490c11526b9b13777a5e13fbdab2e1e2b45c8d9 /src/backend | |
parent | b4080fa3dcf6c6359e542169e0e81a0662c53ba8 (diff) |
Track invalidation_reason in pg_replication_slots.
Until now, the reason for replication slot invalidation has not been tracked
directly in pg_replication_slots. The recent commit 007693f2a3 added
'conflict_reason' to show the reasons for slot conflict/invalidation, but
only for logical slots.
This commit adds a new column 'invalidation_reason' to show invalidation
reasons for both physical and logical slots. It also turns the
'conflict_reason' text column into a 'conflicting' boolean column (effectively
reverting commit 007693f2a3). The 'conflicting' column is true for the
invalidation reasons 'rows_removed' and 'wal_level_insufficient' because
those make the slot conflict with recovery. When 'conflicting' is true,
one can now look at the new 'invalidation_reason' column for the reason
for the logical slot's conflict with recovery.
The new 'invalidation_reason' column will also be useful for tracking other
invalidation reasons in future commits.
Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/ZfR7HuzFEswakt/a%40ip-10-97-1-34.eu-west-3.compute.internal
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/system_views.sql | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 2 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 49 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 25 |
4 files changed, 44 insertions, 35 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 04227a72d10..f69b7f55801 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflict_reason, + L.conflicting, + L.invalidation_reason, L.failover, L.synced FROM pg_get_replication_slots() AS L diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 7b180bdb5c8..30480960c55 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -663,7 +663,7 @@ synchronize_slots(WalReceiverConn *wrconn) bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," " restart_lsn, catalog_xmin, two_phase, failover," - " database, conflict_reason" + " database, invalidation_reason" " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 91ca397857a..cdf0c450c59 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1525,14 +1525,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, XLogRecPtr initial_effective_xmin = InvalidXLogRecPtr; XLogRecPtr initial_catalog_effective_xmin = InvalidXLogRecPtr; XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; - ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; + ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; for (;;) { XLogRecPtr restart_lsn; NameData slotname; int active_pid = 0; - ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE; + ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE; Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); @@ -1554,17 +1554,14 @@ 
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, restart_lsn = s->data.restart_lsn; - /* - * If the slot is already invalid or is a non conflicting slot, we - * don't need to do anything. - */ + /* we do nothing if the slot is already invalid */ if (s->data.invalidated == RS_INVAL_NONE) { /* * The slot's mutex will be released soon, and it is possible that * those values change since the process holding the slot has been * terminated (if any), so record them here to ensure that we - * would report the correct conflict cause. + * would report the correct invalidation cause. */ if (!terminated) { @@ -1578,7 +1575,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_REMOVED: if (initial_restart_lsn != InvalidXLogRecPtr && initial_restart_lsn < oldestLSN) - conflict = cause; + invalidation_cause = cause; break; case RS_INVAL_HORIZON: if (!SlotIsLogical(s)) @@ -1589,15 +1586,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, if (TransactionIdIsValid(initial_effective_xmin) && TransactionIdPrecedesOrEquals(initial_effective_xmin, snapshotConflictHorizon)) - conflict = cause; + invalidation_cause = cause; else if (TransactionIdIsValid(initial_catalog_effective_xmin) && TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, snapshotConflictHorizon)) - conflict = cause; + invalidation_cause = cause; break; case RS_INVAL_WAL_LEVEL: if (SlotIsLogical(s)) - conflict = cause; + invalidation_cause = cause; break; case RS_INVAL_NONE: pg_unreachable(); @@ -1605,14 +1602,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, } /* - * The conflict cause recorded previously should not change while the - * process owning the slot (if any) has been terminated. + * The invalidation cause recorded previously should not change while + * the process owning the slot (if any) has been terminated. 
*/ - Assert(!(conflict_prev != RS_INVAL_NONE && terminated && - conflict_prev != conflict)); + Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated && + invalidation_cause_prev != invalidation_cause)); - /* if there's no conflict, we're done */ - if (conflict == RS_INVAL_NONE) + /* if there's no invalidation, we're done */ + if (invalidation_cause == RS_INVAL_NONE) { SpinLockRelease(&s->mutex); if (released_lock) @@ -1632,13 +1629,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, { MyReplicationSlot = s; s->active_pid = MyProcPid; - s->data.invalidated = conflict; + s->data.invalidated = invalidation_cause; /* * XXX: We should consider not overwriting restart_lsn and instead * just rely on .invalidated. */ - if (conflict == RS_INVAL_WAL_REMOVED) + if (invalidation_cause == RS_INVAL_WAL_REMOVED) s->data.restart_lsn = InvalidXLogRecPtr; /* Let caller know */ @@ -1681,7 +1678,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, */ if (last_signaled_pid != active_pid) { - ReportSlotInvalidation(conflict, true, active_pid, + ReportSlotInvalidation(invalidation_cause, true, active_pid, slotname, restart_lsn, oldestLSN, snapshotConflictHorizon); @@ -1694,7 +1691,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, last_signaled_pid = active_pid; terminated = true; - conflict_prev = conflict; + invalidation_cause_prev = invalidation_cause; } /* Wait until the slot is released. 
*/ @@ -1727,7 +1724,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlotSave(); ReplicationSlotRelease(); - ReportSlotInvalidation(conflict, false, active_pid, + ReportSlotInvalidation(invalidation_cause, false, active_pid, slotname, restart_lsn, oldestLSN, snapshotConflictHorizon); @@ -2356,21 +2353,21 @@ RestoreSlotFromDisk(const char *name) } /* - * Maps a conflict reason for a replication slot to + * Maps an invalidation reason for a replication slot to * ReplicationSlotInvalidationCause. */ ReplicationSlotInvalidationCause -GetSlotInvalidationCause(const char *conflict_reason) +GetSlotInvalidationCause(const char *invalidation_reason) { ReplicationSlotInvalidationCause cause; ReplicationSlotInvalidationCause result = RS_INVAL_NONE; bool found PG_USED_FOR_ASSERTS_ONLY = false; - Assert(conflict_reason); + Assert(invalidation_reason); for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++) { - if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0) + if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0) { found = true; result = cause; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ad79e1fccd6..4232c1e52e5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 17 +#define PG_GET_REPLICATION_SLOTS_COLS 18 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -263,6 +263,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; WALAvailability walstate; int i; + ReplicationSlotInvalidationCause cause; if (!slot->in_use) continue; @@ -409,18 +410,28 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); - if (slot_contents.data.database == 
InvalidOid) + cause = slot_contents.data.invalidated; + + if (SlotIsPhysical(&slot_contents)) nulls[i++] = true; else { - ReplicationSlotInvalidationCause cause = slot_contents.data.invalidated; - - if (cause == RS_INVAL_NONE) - nulls[i++] = true; + /* + * rows_removed and wal_level_insufficient are the only two + * reasons for the logical slot's conflict with recovery. + */ + if (cause == RS_INVAL_HORIZON || + cause == RS_INVAL_WAL_LEVEL) + values[i++] = BoolGetDatum(true); else - values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + values[i++] = BoolGetDatum(false); } + if (cause == RS_INVAL_NONE) + nulls[i++] = true; + else + values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + values[i++] = BoolGetDatum(slot_contents.data.failover); values[i++] = BoolGetDatum(slot_contents.data.synced); |