Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit be87200

Browse files
committed
Support invalidating replication slots due to horizon and wal_level
Needed for logical decoding on a standby. Slots need to be invalidated because of the horizon if rows required for logical decoding are removed. If the primary's wal_level is lowered from 'logical', logical slots on the standby need to be invalidated. The new invalidation methods will be used in a subsequent commit. Logical slots that have been invalidated can be identified via the new pg_replication_slots.conflicting column. See 6af1793 for an overall design of logical decoding on a standby. Bumps catversion for the addition of the new pg_replication_slots column. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Andres Freund <andres@anarazel.de> Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
1 parent 2ed16aa commit be87200

File tree

10 files changed

+176
-37
lines changed

10 files changed

+176
-37
lines changed

doc/src/sgml/system-views.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
25172517
false for physical slots.
25182518
</para></entry>
25192519
</row>
2520+
2521+
<row>
2522+
<entry role="catalog_table_entry"><para role="column_definition">
2523+
<structfield>conflicting</structfield> <type>bool</type>
2524+
</para>
2525+
<para>
2526+
True if this logical slot conflicted with recovery (and so is now
2527+
invalidated). Always NULL for physical slots.
2528+
</para></entry>
2529+
</row>
25202530
</tbody>
25212531
</tgroup>
25222532
</table>

src/backend/access/transam/xlog.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6809,7 +6809,9 @@ CreateCheckPoint(int flags)
68096809
*/
68106810
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
68116811
KeepLogSeg(recptr, &_logSegNo);
6812-
if (InvalidateObsoleteReplicationSlots(_logSegNo))
6812+
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
6813+
_logSegNo, InvalidOid,
6814+
InvalidTransactionId))
68136815
{
68146816
/*
68156817
* Some slots have been invalidated; recalculate the old-segment
@@ -7253,7 +7255,9 @@ CreateRestartPoint(int flags)
72537255
replayPtr = GetXLogReplayRecPtr(&replayTLI);
72547256
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
72557257
KeepLogSeg(endptr, &_logSegNo);
7256-
if (InvalidateObsoleteReplicationSlots(_logSegNo))
7258+
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
7259+
_logSegNo, InvalidOid,
7260+
InvalidTransactionId))
72577261
{
72587262
/*
72597263
* Some slots have been invalidated; recalculate the old-segment

src/backend/catalog/system_views.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,8 @@ CREATE VIEW pg_replication_slots AS
10001000
L.confirmed_flush_lsn,
10011001
L.wal_status,
10021002
L.safe_wal_size,
1003-
L.two_phase
1003+
L.two_phase,
1004+
L.conflicting
10041005
FROM pg_get_replication_slots() AS L
10051006
LEFT JOIN pg_database D ON (L.datoid = D.oid);
10061007

src/backend/replication/logical/logical.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
531531
NameStr(MyReplicationSlot->data.name)),
532532
errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
533533

534+
if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
535+
ereport(ERROR,
536+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
537+
errmsg("can no longer get changes from replication slot \"%s\"",
538+
NameStr(MyReplicationSlot->data.name)),
539+
errdetail("This slot has been invalidated because it was conflicting with recovery.")));
540+
534541
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
535542
Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
536543

src/backend/replication/slot.c

Lines changed: 125 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
12411241
}
12421242

12431243
/*
1244-
* Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
1245-
* and mark it invalid, if necessary and possible.
1244+
* Report that replication slot needs to be invalidated
1245+
*/
1246+
static void
1247+
ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1248+
bool terminating,
1249+
int pid,
1250+
NameData slotname,
1251+
XLogRecPtr restart_lsn,
1252+
XLogRecPtr oldestLSN,
1253+
TransactionId snapshotConflictHorizon)
1254+
{
1255+
StringInfoData err_detail;
1256+
bool hint = false;
1257+
1258+
initStringInfo(&err_detail);
1259+
1260+
switch (cause)
1261+
{
1262+
case RS_INVAL_WAL_REMOVED:
1263+
hint = true;
1264+
appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
1265+
LSN_FORMAT_ARGS(restart_lsn),
1266+
(unsigned long long) (oldestLSN - restart_lsn));
1267+
break;
1268+
case RS_INVAL_HORIZON:
1269+
appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1270+
snapshotConflictHorizon);
1271+
break;
1272+
1273+
case RS_INVAL_WAL_LEVEL:
1274+
appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
1275+
break;
1276+
case RS_INVAL_NONE:
1277+
pg_unreachable();
1278+
}
1279+
1280+
ereport(LOG,
1281+
terminating ?
1282+
errmsg("terminating process %d to release replication slot \"%s\"",
1283+
pid, NameStr(slotname)) :
1284+
errmsg("invalidating obsolete replication slot \"%s\"",
1285+
NameStr(slotname)),
1286+
errdetail_internal("%s", err_detail.data),
1287+
hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
1288+
1289+
pfree(err_detail.data);
1290+
}
1291+
1292+
/*
1293+
* Helper for InvalidateObsoleteReplicationSlots
1294+
*
1295+
* Acquires the given slot and mark it invalid, if necessary and possible.
12461296
*
12471297
* Returns whether ReplicationSlotControlLock was released in the interim (and
12481298
* in that case we're not holding the lock at return, otherwise we are).
@@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
12531303
* for syscalls, so caller must restart if we return true.
12541304
*/
12551305
static bool
1256-
InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
1306+
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
1307+
ReplicationSlot *s,
1308+
XLogRecPtr oldestLSN,
1309+
Oid dboid, TransactionId snapshotConflictHorizon,
12571310
bool *invalidated)
12581311
{
12591312
int last_signaled_pid = 0;
@@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
12641317
XLogRecPtr restart_lsn;
12651318
NameData slotname;
12661319
int active_pid = 0;
1320+
ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
12671321

12681322
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
12691323

@@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
12861340
restart_lsn = s->data.restart_lsn;
12871341

12881342
/*
1289-
* If the slot is already invalid or is fresh enough, we don't need to
1290-
* do anything.
1343+
* If the slot is already invalid or is a non conflicting slot, we
1344+
* don't need to do anything.
12911345
*/
1292-
if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
1346+
if (s->data.invalidated == RS_INVAL_NONE)
1347+
{
1348+
switch (cause)
1349+
{
1350+
case RS_INVAL_WAL_REMOVED:
1351+
if (s->data.restart_lsn != InvalidXLogRecPtr &&
1352+
s->data.restart_lsn < oldestLSN)
1353+
conflict = cause;
1354+
break;
1355+
case RS_INVAL_HORIZON:
1356+
if (!SlotIsLogical(s))
1357+
break;
1358+
/* invalid DB oid signals a shared relation */
1359+
if (dboid != InvalidOid && dboid != s->data.database)
1360+
break;
1361+
if (TransactionIdIsValid(s->effective_xmin) &&
1362+
TransactionIdPrecedesOrEquals(s->effective_xmin,
1363+
snapshotConflictHorizon))
1364+
conflict = cause;
1365+
else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
1366+
TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
1367+
snapshotConflictHorizon))
1368+
conflict = cause;
1369+
break;
1370+
case RS_INVAL_WAL_LEVEL:
1371+
if (SlotIsLogical(s))
1372+
conflict = cause;
1373+
break;
1374+
case RS_INVAL_NONE:
1375+
pg_unreachable();
1376+
}
1377+
}
1378+
1379+
/* if there's no conflict, we're done */
1380+
if (conflict == RS_INVAL_NONE)
12931381
{
12941382
SpinLockRelease(&s->mutex);
12951383
if (released_lock)
@@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13091397
{
13101398
MyReplicationSlot = s;
13111399
s->active_pid = MyProcPid;
1312-
s->data.invalidated = RS_INVAL_WAL_REMOVED;
1400+
s->data.invalidated = conflict;
13131401

13141402
/*
13151403
* XXX: We should consider not overwriting restart_lsn and instead
13161404
* just rely on .invalidated.
13171405
*/
1318-
s->data.restart_lsn = InvalidXLogRecPtr;
1406+
if (conflict == RS_INVAL_WAL_REMOVED)
1407+
s->data.restart_lsn = InvalidXLogRecPtr;
13191408

13201409
/* Let caller know */
13211410
*invalidated = true;
@@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13491438
*/
13501439
if (last_signaled_pid != active_pid)
13511440
{
1352-
ereport(LOG,
1353-
errmsg("terminating process %d to release replication slot \"%s\"",
1354-
active_pid, NameStr(slotname)),
1355-
errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1356-
LSN_FORMAT_ARGS(restart_lsn),
1357-
(unsigned long long) (oldestLSN - restart_lsn)),
1358-
errhint("You might need to increase max_slot_wal_keep_size."));
1441+
ReportSlotInvalidation(conflict, true, active_pid,
1442+
slotname, restart_lsn,
1443+
oldestLSN, snapshotConflictHorizon);
13591444

13601445
(void) kill(active_pid, SIGTERM);
13611446
last_signaled_pid = active_pid;
@@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13901475
ReplicationSlotMarkDirty();
13911476
ReplicationSlotSave();
13921477
ReplicationSlotRelease();
1478+
pgstat_drop_replslot(s);
13931479

1394-
ereport(LOG,
1395-
errmsg("invalidating obsolete replication slot \"%s\"",
1396-
NameStr(slotname)),
1397-
errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1398-
LSN_FORMAT_ARGS(restart_lsn),
1399-
(unsigned long long) (oldestLSN - restart_lsn)),
1400-
errhint("You might need to increase max_slot_wal_keep_size."));
1480+
ReportSlotInvalidation(conflict, false, active_pid,
1481+
slotname, restart_lsn,
1482+
oldestLSN, snapshotConflictHorizon);
14011483

14021484
/* done with this slot for now */
14031485
break;
@@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
14101492
}
14111493

14121494
/*
1413-
* Mark any slot that points to an LSN older than the given segment
1414-
* as invalid; it requires WAL that's about to be removed.
1495+
* Invalidate slots that require resources about to be removed.
14151496
*
14161497
* Returns true when any slot have got invalidated.
14171498
*
1499+
* Whether a slot needs to be invalidated depends on the cause. A slot is
1500+
* removed if it:
1501+
* - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1502+
* - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1503+
* db; dboid may be InvalidOid for shared relations
1504+
* - RS_INVAL_WAL_LEVEL: is logical
1505+
*
14181506
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
14191507
*/
14201508
bool
1421-
InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
1509+
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
1510+
XLogSegNo oldestSegno, Oid dboid,
1511+
TransactionId snapshotConflictHorizon)
14221512
{
14231513
XLogRecPtr oldestLSN;
14241514
bool invalidated = false;
14251515

1516+
Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1517+
Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1518+
Assert(cause != RS_INVAL_NONE);
1519+
1520+
if (max_replication_slots == 0)
1521+
return invalidated;
1522+
14261523
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
14271524

14281525
restart:
@@ -1434,7 +1531,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
14341531
if (!s->in_use)
14351532
continue;
14361533

1437-
if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
1534+
if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1535+
snapshotConflictHorizon,
1536+
&invalidated))
14381537
{
14391538
/* if the lock was released, start from scratch */
14401539
goto restart;

src/backend/replication/slotfuncs.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
232232
Datum
233233
pg_get_replication_slots(PG_FUNCTION_ARGS)
234234
{
235-
#define PG_GET_REPLICATION_SLOTS_COLS 14
235+
#define PG_GET_REPLICATION_SLOTS_COLS 15
236236
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
237237
XLogRecPtr currlsn;
238238
int slotno;
@@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
402402

403403
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
404404

405+
if (slot_contents.data.database == InvalidOid)
406+
nulls[i++] = true;
407+
else
408+
{
409+
if (slot_contents.data.invalidated != RS_INVAL_NONE)
410+
values[i++] = BoolGetDatum(true);
411+
else
412+
values[i++] = BoolGetDatum(false);
413+
}
414+
405415
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
406416

407417
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,

src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/* yyyymmddN */
60-
#define CATALOG_VERSION_NO 202304072
60+
#define CATALOG_VERSION_NO 202304073
6161

6262
#endif

src/include/catalog/pg_proc.dat

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11077,9 +11077,9 @@
1107711077
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
1107811078
proretset => 't', provolatile => 's', prorettype => 'record',
1107911079
proargtypes => '',
11080-
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
11081-
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11082-
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
11080+
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
11081+
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11082+
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
1108311083
prosrc => 'pg_get_replication_slots' },
1108411084
{ oid => '3786', descr => 'set up a logical replication slot',
1108511085
proname => 'pg_create_logical_replication_slot', provolatile => 'v',

src/include/replication/slot.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
4646
RS_INVAL_NONE,
4747
/* required WAL has been removed */
4848
RS_INVAL_WAL_REMOVED,
49+
/* required rows have been removed */
50+
RS_INVAL_HORIZON,
51+
/* wal_level insufficient for slot */
52+
RS_INVAL_WAL_LEVEL,
4953
} ReplicationSlotInvalidationCause;
5054

5155
/*
@@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
226230
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
227231
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
228232
extern void ReplicationSlotsDropDBSlots(Oid dboid);
229-
extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
233+
extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
234+
XLogSegNo oldestSegno,
235+
Oid dboid,
236+
TransactionId snapshotConflictHorizon);
230237
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
231238
extern int ReplicationSlotIndex(ReplicationSlot *slot);
232239
extern bool ReplicationSlotName(int index, Name name);

src/test/regress/expected/rules.out

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
14721472
l.confirmed_flush_lsn,
14731473
l.wal_status,
14741474
l.safe_wal_size,
1475-
l.two_phase
1476-
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
1475+
l.two_phase,
1476+
l.conflicting
1477+
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
14771478
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
14781479
pg_roles| SELECT pg_authid.rolname,
14791480
pg_authid.rolsuper,

0 commit comments

Comments
 (0)