diff options
author | Peter Eisentraut | 2020-03-10 07:42:59 +0000 |
---|---|---|
committer | Peter Eisentraut | 2020-03-10 08:09:32 +0000 |
commit | 17b9e7f9fe238eeb5f6b40061b444ebf28d9e06f (patch) | |
tree | 7c6f8d87b72708aeeb5f800ec6384cc19c927b63 /src/backend | |
parent | 61d7c7bce3686ec02bd64abac742dd35ed9b9b01 (diff) |
Support adding partitioned tables to publication
When a partitioned table is added to a publication, changes of all of
its partitions (current or future) are published via that publication.
This change only affects which tables a publication considers as its
members. The receiving side still sees the data coming from the
individual leaf partitions. So existing restrictions that partition
hierarchies can only be replicated one-to-one are not changed by this.
Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_publication.c | 85 | ||||
-rw-r--r-- | src/backend/commands/publicationcmds.c | 23 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 1 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 19 |
4 files changed, 106 insertions, 22 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index c5eea7af3fb..500a5ae1ee0 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -24,8 +24,10 @@ #include "catalog/index.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/partition.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" @@ -40,6 +42,8 @@ #include "utils/rel.h" #include "utils/syscache.h" +static List *get_rel_publications(Oid relid); + /* * Check if relation can be in given publication and throws appropriate * error if not. @@ -47,17 +51,9 @@ static void check_publication_add_relation(Relation targetrel) { - /* Give more specific error for partitioned tables */ - if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("\"%s\" is a partitioned table", - RelationGetRelationName(targetrel)), - errdetail("Adding partitioned tables to publications is not supported."), - errhint("You can add the table partitions individually."))); - - /* Must be table */ - if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) + /* Must be a regular or partitioned table */ + if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("\"%s\" is not a table", @@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel) static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { - return reltuple->relkind == RELKIND_RELATION && + return (reltuple->relkind == RELKIND_RELATION || + reltuple->relkind == RELKIND_PARTITIONED_TABLE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -221,12 +218,37 @@ publication_add_relation(Oid pubid, Relation targetrel, /* - * Gets list of publication oids for a relation oid. + * Gets list of publication oids for a relation, plus those of ancestors, + * if any, if the relation is a partition. */ List * GetRelationPublications(Oid relid) { List *result = NIL; + + result = get_rel_publications(relid); + if (get_rel_relispartition(relid)) + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc; + + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + List *ancestor_pubs = get_rel_publications(ancestor); + + result = list_concat(result, ancestor_pubs); + } + } + + return result; +} + +/* Workhorse of GetRelationPublications() */ +static List * +get_rel_publications(Oid relid) +{ + List *result = NIL; CatCList *pubrellist; int i; @@ -253,7 +275,7 @@ GetRelationPublications(Oid relid) * should use GetAllTablesPublicationRelations(). */ List * -GetPublicationRelations(Oid pubid) +GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) { List *result; Relation pubrelsrel; @@ -279,7 +301,31 @@ GetPublicationRelations(Oid pubid) pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); - result = lappend_oid(result, pubrel->prrelid); + if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE && + pub_partopt != PUBLICATION_PART_ROOT) + { + List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock, + NULL); + + if (pub_partopt == PUBLICATION_PART_ALL) + result = list_concat(result, all_parts); + else if (pub_partopt == PUBLICATION_PART_LEAF) + { + ListCell *lc; + + foreach(lc, all_parts) + { + Oid partOid = lfirst_oid(lc); + + if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE) + result = lappend_oid(result, partOid); + } + } + else + Assert(false); + } + else + result = lappend_oid(result, pubrel->prrelid); } systable_endscan(scan); @@ -480,10 +526,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); publication = GetPublicationByName(pubname, false); + + /* + * Publications support partitioned tables, although all changes are + * replicated using leaf partition identity and schema, so we only + * need those. + */ if (publication->alltables) tables = GetAllTablesPublicationRelations(); else - tables = GetPublicationRelations(publication->oid); + tables = GetPublicationRelations(publication->oid, + PUBLICATION_PART_LEAF); funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index eb4d22cc2a8..768c2184e13 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -299,7 +299,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, } else { - List *relids = GetPublicationRelations(pubform->oid); + /* + * For any partitioned tables contained in the publication, we must + * invalidate all partitions contained in the respective partition + * trees, not just those explicitly mentioned in the publication. + */ + List *relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); /* * We don't want to send too many individual messages, at some point @@ -356,7 +362,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { - List *oldrelids = GetPublicationRelations(pubid); + List *oldrelids = GetPublicationRelations(pubid, + PUBLICATION_PART_ROOT); List *delrels = NIL; ListCell *oldlc; @@ -498,7 +505,8 @@ RemovePublicationRelById(Oid proid) /* * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -539,8 +547,13 @@ OpenTableList(List *tables) rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); - /* Add children of this rel, if requested */ - if (recurse) + /* + * Add children of this rel, if requested, so that they too are added + * to the publication. A partitioned table can't have any inheritance + * children other than its partitions, which need not be explicitly + * added to the publication. + */ + if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { List *children; ListCell *child; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f8183cd488c..98825f01e98 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -761,6 +761,7 @@ copy_table(Relation rel) /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION); /* Start copy on the publisher. */ initStringInfo(&cmd); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 752508213af..552a70cffa5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -/* Entry in the map used to remember which relation schemas we sent. */ +/* + * Entry in the map used to remember which relation schemas we sent. + * + * For partitions, 'pubactions' considers not only the table's own + * publications, but also those of all of its ancestors. + */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ @@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!relentry->pubactions.pubtruncate) continue; + /* + * Don't send partitioned tables, because partitions should be sent + * instead. + */ + if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + continue; + relids[nrelids++] = relid; maybe_send_schema(ctx, relation, relentry); } @@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx) /* * Find or create entry in the relation schema cache. + * + * This looks up publications that the given relation is directly or + * indirectly part of (the latter if it's really the relation's ancestor that + * is part of a publication) and fills up the found entry with the information + * about which operations to publish. */ static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid) |