Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Eisentraut2020-03-10 07:42:59 +0000
committerPeter Eisentraut2020-03-10 08:09:32 +0000
commit17b9e7f9fe238eeb5f6b40061b444ebf28d9e06f (patch)
tree7c6f8d87b72708aeeb5f800ec6384cc19c927b63 /src/backend
parent61d7c7bce3686ec02bd64abac742dd35ed9b9b01 (diff)
Support adding partitioned tables to publication
When a partitioned table is added to a publication, changes of all of its partitions (current or future) are published via that publication. This change only affects which tables a publication considers as its members. The receiving side still sees the data coming from the individual leaf partitions. So existing restrictions that partition hierarchies can only be replicated one-to-one are not changed by this. Author: Amit Langote <amitlangote09@gmail.com> Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com> Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com> Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_publication.c85
-rw-r--r--src/backend/commands/publicationcmds.c23
-rw-r--r--src/backend/replication/logical/tablesync.c1
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c19
4 files changed, 106 insertions, 22 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index c5eea7af3fb..500a5ae1ee0 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -24,8 +24,10 @@
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
+#include "catalog/partition.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
+#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
@@ -40,6 +42,8 @@
#include "utils/rel.h"
#include "utils/syscache.h"
+static List *get_rel_publications(Oid relid);
+
/*
* Check if relation can be in given publication and throws appropriate
* error if not.
@@ -47,17 +51,9 @@
static void
check_publication_add_relation(Relation targetrel)
{
- /* Give more specific error for partitioned tables */
- if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("\"%s\" is a partitioned table",
- RelationGetRelationName(targetrel)),
- errdetail("Adding partitioned tables to publications is not supported."),
- errhint("You can add the table partitions individually.")));
-
- /* Must be table */
- if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
+ /* Must be a regular or partitioned table */
+ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
+ RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("\"%s\" is not a table",
@@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel)
static bool
is_publishable_class(Oid relid, Form_pg_class reltuple)
{
- return reltuple->relkind == RELKIND_RELATION &&
+ return (reltuple->relkind == RELKIND_RELATION ||
+ reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
!IsCatalogRelationOid(relid) &&
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
relid >= FirstNormalObjectId;
@@ -221,12 +218,37 @@ publication_add_relation(Oid pubid, Relation targetrel,
/*
- * Gets list of publication oids for a relation oid.
+ * Gets list of publication oids for a relation, plus those of ancestors,
+ * if any, if the relation is a partition.
*/
List *
GetRelationPublications(Oid relid)
{
List *result = NIL;
+
+ result = get_rel_publications(relid);
+ if (get_rel_relispartition(relid))
+ {
+ List *ancestors = get_partition_ancestors(relid);
+ ListCell *lc;
+
+ foreach(lc, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc);
+ List *ancestor_pubs = get_rel_publications(ancestor);
+
+ result = list_concat(result, ancestor_pubs);
+ }
+ }
+
+ return result;
+}
+
+/* Workhorse of GetRelationPublications() */
+static List *
+get_rel_publications(Oid relid)
+{
+ List *result = NIL;
CatCList *pubrellist;
int i;
@@ -253,7 +275,7 @@ GetRelationPublications(Oid relid)
* should use GetAllTablesPublicationRelations().
*/
List *
-GetPublicationRelations(Oid pubid)
+GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
{
List *result;
Relation pubrelsrel;
@@ -279,7 +301,31 @@ GetPublicationRelations(Oid pubid)
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
- result = lappend_oid(result, pubrel->prrelid);
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
+ pub_partopt != PUBLICATION_PART_ROOT)
+ {
+ List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
+ NULL);
+
+ if (pub_partopt == PUBLICATION_PART_ALL)
+ result = list_concat(result, all_parts);
+ else if (pub_partopt == PUBLICATION_PART_LEAF)
+ {
+ ListCell *lc;
+
+ foreach(lc, all_parts)
+ {
+ Oid partOid = lfirst_oid(lc);
+
+ if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
+ result = lappend_oid(result, partOid);
+ }
+ }
+ else
+ Assert(false);
+ }
+ else
+ result = lappend_oid(result, pubrel->prrelid);
}
systable_endscan(scan);
@@ -480,10 +526,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
publication = GetPublicationByName(pubname, false);
+
+ /*
+ * Publications support partitioned tables, although all changes are
+ * replicated using leaf partition identity and schema, so we only
+ * need those.
+ */
if (publication->alltables)
tables = GetAllTablesPublicationRelations();
else
- tables = GetPublicationRelations(publication->oid);
+ tables = GetPublicationRelations(publication->oid,
+ PUBLICATION_PART_LEAF);
funcctx->user_fctx = (void *) tables;
MemoryContextSwitchTo(oldcontext);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index eb4d22cc2a8..768c2184e13 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -299,7 +299,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
}
else
{
- List *relids = GetPublicationRelations(pubform->oid);
+ /*
+ * For any partitioned tables contained in the publication, we must
+ * invalidate all partitions contained in the respective partition
+ * trees, not just those explicitly mentioned in the publication.
+ */
+ List *relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
/*
* We don't want to send too many individual messages, at some point
@@ -356,7 +362,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
PublicationDropTables(pubid, rels, false);
else /* DEFELEM_SET */
{
- List *oldrelids = GetPublicationRelations(pubid);
+ List *oldrelids = GetPublicationRelations(pubid,
+ PUBLICATION_PART_ROOT);
List *delrels = NIL;
ListCell *oldlc;
@@ -498,7 +505,8 @@ RemovePublicationRelById(Oid proid)
/*
* Open relations specified by a RangeVar list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
*/
static List *
OpenTableList(List *tables)
@@ -539,8 +547,13 @@ OpenTableList(List *tables)
rels = lappend(rels, rel);
relids = lappend_oid(relids, myrelid);
- /* Add children of this rel, if requested */
- if (recurse)
+ /*
+ * Add children of this rel, if requested, so that they too are added
+ * to the publication. A partitioned table can't have any inheritance
+ * children other than its partitions, which need not be explicitly
+ * added to the publication.
+ */
+ if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
{
List *children;
ListCell *child;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f8183cd488c..98825f01e98 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -761,6 +761,7 @@ copy_table(Relation rel)
/* Map the publisher relation to local one. */
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
+ Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
/* Start copy on the publisher. */
initStringInfo(&cmd);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 752508213af..552a70cffa5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
-/* Entry in the map used to remember which relation schemas we sent. */
+/*
+ * Entry in the map used to remember which relation schemas we sent.
+ *
+ * For partitions, 'pubactions' considers not only the table's own
+ * publications, but also those of all of its ancestors.
+ */
typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
@@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (!relentry->pubactions.pubtruncate)
continue;
+ /*
+ * Don't send partitioned tables, because partitions should be sent
+ * instead.
+ */
+ if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ continue;
+
relids[nrelids++] = relid;
maybe_send_schema(ctx, relation, relentry);
}
@@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx)
/*
* Find or create entry in the relation schema cache.
+ *
+ * This looks up publications that the given relation is directly or
+ * indirectly part of (the latter if it's really the relation's ancestor that
+ * is part of a publication) and fills up the found entry with the information
+ * about which operations to publish.
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Oid relid)