Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 85f7198

Browse files
author
Commitfest Bot
committed
[CF 5629] v12 - Selectively invalidate caches in pgoutput module
This branch was automatically generated by a robot using patches from an email thread registered at: https://commitfest.postgresql.org/patch/5629 The branch will be overwritten each time a new patch version is posted to the thread, and also periodically to check for bitrot caused by changes on the master branch. Patch(es): https://www.postgresql.org/message-id/OSCPR01MB1496634BCDF6475BF0AC6658BF5D02@OSCPR01MB14966.jpnprd01.prod.outlook.com Author(s): Hayato Kuroda
2 parents f554a95 + c4f7b28 commit 85f7198

File tree

10 files changed

+297
-16
lines changed

10 files changed

+297
-16
lines changed

src/backend/access/rmgrdesc/standbydesc.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
132132
appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
133133
else if (msg->id == SHAREDINVALSNAPSHOT_ID)
134134
appendStringInfo(buf, " snapshot %u", msg->sn.relId);
135+
else if (msg->id == SHAREDINVALRELSYNC_ID)
136+
appendStringInfo(buf, " relsync %u", msg->rs.relid);
135137
else
136138
appendStringInfo(buf, " unrecognized id %d", msg->id);
137139
}

src/backend/commands/alter.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
338338

339339
InvokeObjectPostAlterHook(classId, objectId, 0);
340340

341+
/* Do post catalog-update tasks */
342+
if (classId == PublicationRelationId)
343+
{
344+
Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
345+
346+
/*
347+
* Invalidate relsynccache entries.
348+
*
349+
* Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
350+
* publication does not impact the publication status of tables. So, we
351+
* don't need to invalidate relcache to rebuild the rd_pubdesc.
352+
* Instead, we invalidate only the relsyncache.
353+
*/
354+
InvalidatePubRelSyncCache(pub->oid, pub->puballtables);
355+
}
356+
341357
/* Release memory */
342358
pfree(values);
343359
pfree(nulls);

src/backend/commands/publicationcmds.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
491491
return *invalid_column_list || *invalid_gen_col;
492492
}
493493

494+
/*
495+
* Invalidate entries in the RelationSyncCache for relations included in the
496+
* specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
497+
*
498+
* If 'puballtables' is true, invalidate all cache entries.
499+
*/
500+
void
501+
InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
502+
{
503+
if (puballtables)
504+
{
505+
CacheInvalidateRelSyncAll();
506+
}
507+
else
508+
{
509+
List *relids = NIL;
510+
List *schemarelids = NIL;
511+
512+
/*
513+
* For partitioned tables, we must invalidate all partitions and
514+
* itself. WAL records for INSERT/UPDATE/DELETE specify leaf
515+
* tables as a target. However, WAL records for TRUNCATE specify
516+
* both a root and its leaves.
517+
*/
518+
relids = GetPublicationRelations(pubid,
519+
PUBLICATION_PART_ALL);
520+
schemarelids = GetAllSchemaPublicationRelations(pubid,
521+
PUBLICATION_PART_ALL);
522+
523+
relids = list_concat_unique_oid(relids, schemarelids);
524+
525+
/* Invalidate the relsyncache */
526+
foreach_oid(relid, relids)
527+
CacheInvalidateRelSync(relid);
528+
}
529+
530+
return;
531+
}
532+
494533
/* check_functions_in_node callback */
495534
static bool
496535
contain_mutable_or_user_functions_checker(Oid func_id, void *context)

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
531531
CacheRegisterSyscacheCallback(PUBLICATIONOID,
532532
publication_invalidation_cb,
533533
(Datum) 0);
534+
CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
535+
(Datum) 0);
534536
publication_callback_registered = true;
535537
}
536538

@@ -1789,12 +1791,6 @@ static void
17891791
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
17901792
{
17911793
publications_valid = false;
1792-
1793-
/*
1794-
* Also invalidate per-relation cache so that next time the filtering info
1795-
* is checked it will be updated with the new publication settings.
1796-
*/
1797-
rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
17981794
}
17991795

18001796
/*

src/backend/utils/cache/inval.c

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ int debug_discard_caches = 0;
271271

272272
#define MAX_SYSCACHE_CALLBACKS 64
273273
#define MAX_RELCACHE_CALLBACKS 10
274+
#define MAX_RELSYNC_CALLBACKS 10
274275

275276
static struct SYSCACHECALLBACK
276277
{
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
292293

293294
static int relcache_callback_count = 0;
294295

296+
static struct RELSYNCCALLBACK
297+
{
298+
RelSyncCallbackFunction function;
299+
Datum arg;
300+
} relsync_callback_list[MAX_RELSYNC_CALLBACKS];
301+
302+
static int relsync_callback_count = 0;
303+
304+
295305
/* ----------------------------------------------------------------
296306
* Invalidation subgroup support functions
297307
* ----------------------------------------------------------------
@@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
484494
AddInvalidationMessage(group, RelCacheMsgs, &msg);
485495
}
486496

497+
/*
498+
* Add a relsync inval entry
499+
*
500+
* We put these into the relcache subgroup for simplicity. This message is the
501+
* same as AddRelcacheInvalidationMessage() except that it is for
502+
* RelationSyncCache maintained by decoding plugin pgoutput.
503+
*/
504+
static void
505+
AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
506+
Oid dbId, Oid relId)
507+
{
508+
SharedInvalidationMessage msg;
509+
510+
/* Don't add a duplicate item. */
511+
ProcessMessageSubGroup(group, RelCacheMsgs,
512+
if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
513+
(msg->rc.relId == relId ||
514+
msg->rc.relId == InvalidOid))
515+
return);
516+
517+
/* OK, add the item */
518+
msg.rc.id = SHAREDINVALRELSYNC_ID;
519+
msg.rc.dbId = dbId;
520+
msg.rc.relId = relId;
521+
/* check AddCatcacheInvalidationMessage() for an explanation */
522+
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
523+
524+
AddInvalidationMessage(group, RelCacheMsgs, &msg);
525+
}
526+
487527
/*
488528
* Add a snapshot inval entry
489529
*
@@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
611651
info->RelcacheInitFileInval = true;
612652
}
613653

654+
/*
655+
* RegisterRelsyncInvalidation
656+
*
657+
* As above, but register a relsynccache invalidation event.
658+
*/
659+
static void
660+
RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
661+
{
662+
AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
663+
}
664+
614665
/*
615666
* RegisterSnapshotInvalidation
616667
*
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
751802

752803
ccitem->function(ccitem->arg, InvalidOid);
753804
}
805+
806+
for (i = 0; i < relsync_callback_count; i++)
807+
{
808+
struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
809+
810+
ccitem->function(ccitem->arg, InvalidOid);
811+
}
754812
}
755813

756814
/*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
832890
else if (msg->sn.dbId == MyDatabaseId)
833891
InvalidateCatalogSnapshot();
834892
}
893+
else if (msg->id == SHAREDINVALRELSYNC_ID)
894+
{
895+
/* We only care about our own database */
896+
if (msg->rs.dbId == MyDatabaseId)
897+
CallRelSyncCallbacks(msg->rs.relid);
898+
}
835899
else
836900
elog(FATAL, "unrecognized SI message ID: %d", msg->id);
837901
}
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
16211685
ReleaseSysCache(tup);
16221686
}
16231687

1688+
/*
1689+
* CacheInvalidateRelSync
1690+
* Register invalidation of the cache in logical decoding output plugin
1691+
* for a database.
1692+
*
1693+
* This type of invalidation message is used for the specific purpose of output
1694+
* plugins. Processes which do not decode WALs would do nothing even when it
1695+
* receives the message.
1696+
*/
1697+
void
1698+
CacheInvalidateRelSync(Oid relid)
1699+
{
1700+
RegisterRelsyncInvalidation(PrepareInvalidationState(),
1701+
MyDatabaseId, relid);
1702+
}
1703+
1704+
/*
1705+
* CacheInvalidateRelSyncAll
1706+
* Register invalidation of the whole cache in logical decoding output
1707+
* plugin.
1708+
*/
1709+
void
1710+
CacheInvalidateRelSyncAll(void)
1711+
{
1712+
CacheInvalidateRelSync(InvalidOid);
1713+
}
16241714

16251715
/*
16261716
* CacheInvalidateSmgr
@@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
17631853
++relcache_callback_count;
17641854
}
17651855

1856+
/*
1857+
* CacheRegisterRelSyncCallback
1858+
* Register the specified function to be called for all future
1859+
* relsynccache invalidation events.
1860+
*
1861+
* This function is intended to be call from the logical decoding output
1862+
* plugins.
1863+
*/
1864+
void
1865+
CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
1866+
Datum arg)
1867+
{
1868+
if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
1869+
elog(FATAL, "out of relsync_callback_list slots");
1870+
1871+
relsync_callback_list[relsync_callback_count].function = func;
1872+
relsync_callback_list[relsync_callback_count].arg = arg;
1873+
1874+
++relsync_callback_count;
1875+
}
1876+
17661877
/*
17671878
* CallSyscacheCallbacks
17681879
*
@@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
17881899
}
17891900
}
17901901

1902+
/*
1903+
* CallSyscacheCallbacks
1904+
*/
1905+
void
1906+
CallRelSyncCallbacks(Oid relid)
1907+
{
1908+
for (int i = 0; i < relsync_callback_count; i++)
1909+
{
1910+
struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
1911+
1912+
ccitem->function(ccitem->arg, relid);
1913+
}
1914+
}
1915+
17911916
/*
17921917
* LogLogicalInvalidations
17931918
*

src/include/commands/publicationcmds.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
3838
char pubgencols_type,
3939
bool *invalid_column_list,
4040
bool *invalid_gen_col);
41+
extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables);
4142

4243
#endif /* PUBLICATIONCMDS_H */

src/include/pg_config_manual.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,10 @@
282282

283283
/*
284284
* For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
285-
* use of the debug_discard_caches GUC to aggressively flush syscache/relcache
286-
* entries whenever it's possible to deliver invalidations. See
287-
* AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
288-
* details.
285+
* use of the debug_discard_caches GUC to aggressively flush
286+
* syscache/relcache/relsynccache entries whenever it's possible to deliver
287+
* invalidations. See AcceptInvalidationMessages() in
288+
* src/backend/utils/cache/inval.c for details.
289289
*
290290
* USE_ASSERT_CHECKING builds default to enabling this. It's possible to use
291291
* DISCARD_CACHES_ENABLED without a cassert build and the implied

src/include/storage/sinval.h

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* * invalidate an smgr cache entry for a specific physical relation
2828
* * invalidate the mapped-relation mapping for a given database
2929
* * invalidate any saved snapshot that might be used to scan a given relation
30+
* * invalidate a RelationSyncCache entry for a specific relation
3031
* More types could be added if needed. The message type is identified by
3132
* the first "int8" field of the message struct. Zero or positive means a
3233
* specific-catcache inval message (and also serves as the catcache ID field).
@@ -46,12 +47,12 @@
4647
* catcache inval messages must be generated for each of its caches, since
4748
* the hash keys will generally be different.
4849
*
49-
* Catcache, relcache, and snapshot invalidations are transactional, and so
50-
* are sent to other backends upon commit. Internally to the generating
51-
* backend, they are also processed at CommandCounterIncrement so that later
52-
* commands in the same transaction see the new state. The generating backend
53-
* also has to process them at abort, to flush out any cache state it's loaded
54-
* from no-longer-valid entries.
50+
* Catcache, relcache, relsynccache, and snapshot invalidations are
51+
* transactional, and so are sent to other backends upon commit. Internally
52+
* to the generating backend, they are also processed at
53+
* CommandCounterIncrement so that later commands in the same transaction see
54+
* the new state. The generating backend also has to process them at abort,
55+
* to flush out any cache state it's loaded from no-longer-valid entries.
5556
*
5657
* smgr and relation mapping invalidations are non-transactional: they are
5758
* sent immediately when the underlying file change is made.
@@ -110,6 +111,15 @@ typedef struct
110111
Oid relId; /* relation ID */
111112
} SharedInvalSnapshotMsg;
112113

114+
#define SHAREDINVALRELSYNC_ID (-6)
115+
116+
typedef struct
117+
{
118+
int8 id; /* type field --- must be first */
119+
Oid dbId; /* database ID */
120+
Oid relid; /* relation ID, or 0 if whole RelationSyncCache */
121+
} SharedInvalRelSyncMsg;
122+
113123
typedef union
114124
{
115125
int8 id; /* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
119129
SharedInvalSmgrMsg sm;
120130
SharedInvalRelmapMsg rm;
121131
SharedInvalSnapshotMsg sn;
132+
SharedInvalRelSyncMsg rs;
122133
} SharedInvalidationMessage;
123134

124135

src/include/utils/inval.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
2222

2323
typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
2424
typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
25+
typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
2526

2627

2728
extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
5556

5657
extern void CacheInvalidateRelcacheByRelid(Oid relid);
5758

59+
extern void CacheInvalidateRelSync(Oid relid);
60+
61+
extern void CacheInvalidateRelSyncAll(void);
62+
5863
extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
5964

6065
extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
6671
extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
6772
Datum arg);
6873

74+
extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
75+
Datum arg);
76+
6977
extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
7078

79+
extern void CallRelSyncCallbacks(Oid relid);
80+
7181
extern void InvalidateSystemCaches(void);
7282
extern void InvalidateSystemCachesExtended(bool debug_discard);
7383

0 commit comments

Comments
 (0)