Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila2024-11-07 03:28:49 +0000
committerAmit Kapila2024-11-07 03:28:49 +0000
commit7054186c4ebe24e63254651e2ae9b36efae90d4e (patch)
tree56f43479b5f7c127128b91697da9db84242bc67e /src/backend
parent70291a3c66eca599fd9f59f7f6051432b2020f4b (diff)
Replicate generated columns when 'publish_generated_columns' is set.
This patch builds on the work done in commit 745217a051 by enabling the replication of generated columns alongside regular column changes through a new publication parameter: publish_generated_columns. Example usage: CREATE PUBLICATION pub1 FOR TABLE tab_gencol WITH (publish_generated_columns = true); The column list takes precedence. If the generated columns are specified in the column list, they will be replicated even if 'publish_generated_columns' is set to false. Conversely, if generated columns are not included in the column list (assuming the user specifies a column list), they will not be replicated even if 'publish_generated_columns' is true. Author: Vignesh C, Shubham Khanna Reviewed-by: Peter Smith, Amit Kapila, Hayato Kuroda, Shlok Kyal, Ajin Cherian, Hou Zhijie, Masahiko Sawada Discussion: https://postgr.es/m/B80D17B2-2C8E-4C7D-87F2-E5B4BE3C069E@gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_publication.c73
-rw-r--r--src/backend/commands/publicationcmds.c33
-rw-r--r--src/backend/replication/logical/proto.c69
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c201
4 files changed, 254 insertions, 122 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 17a6093d069..09e2dbdd10a 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -257,6 +257,52 @@ is_schema_publication(Oid pubid)
}
/*
+ * Returns true if the relation has column list associated with the
+ * publication, false otherwise.
+ *
+ * If a column list is found, the corresponding bitmap is returned through the
+ * cols parameter, if provided. The bitmap is constructed within the given
+ * memory context (mcxt).
+ */
+bool
+check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
+ Bitmapset **cols)
+{
+ HeapTuple cftuple;
+ bool found = false;
+
+ if (pub->alltables)
+ return false;
+
+ cftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pub->oid));
+ if (HeapTupleIsValid(cftuple))
+ {
+ Datum cfdatum;
+ bool isnull;
+
+ /* Lookup the column list attribute. */
+ cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
+ Anum_pg_publication_rel_prattrs, &isnull);
+
+ /* Was a column list found? */
+ if (!isnull)
+ {
+ /* Build the column list bitmap in the given memory context. */
+ if (cols)
+ *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
+
+ found = true;
+ }
+
+ ReleaseSysCache(cftuple);
+ }
+
+ return found;
+}
+
+/*
* Gets the relations based on the publication partition option for a specified
* relation.
*/
@@ -574,6 +620,30 @@ pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
}
/*
+ * Returns a bitmap representing the columns of the specified table.
+ *
+ * Generated columns are included if include_gencols is true.
+ */
+Bitmapset *
+pub_form_cols_map(Relation relation, bool include_gencols)
+{
+ Bitmapset *result = NULL;
+ TupleDesc desc = RelationGetDescr(relation);
+
+ for (int i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || (att->attgenerated && !include_gencols))
+ continue;
+
+ result = bms_add_member(result, att->attnum);
+ }
+
+ return result;
+}
+
+/*
* Insert new publication / schema mapping.
*/
ObjectAddress
@@ -998,6 +1068,7 @@ GetPublication(Oid pubid)
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
pub->pubviaroot = pubform->pubviaroot;
+ pub->pubgencols = pubform->pubgencols;
ReleaseSysCache(tup);
@@ -1205,7 +1276,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
+ if (att->attisdropped || (att->attgenerated && !pub->pubgencols))
continue;
attnums[nattnums++] = att->attnum;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index d6ffef374ea..0129db18c6e 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -78,12 +78,15 @@ parse_publication_options(ParseState *pstate,
bool *publish_given,
PublicationActions *pubactions,
bool *publish_via_partition_root_given,
- bool *publish_via_partition_root)
+ bool *publish_via_partition_root,
+ bool *publish_generated_columns_given,
+ bool *publish_generated_columns)
{
ListCell *lc;
*publish_given = false;
*publish_via_partition_root_given = false;
+ *publish_generated_columns_given = false;
/* defaults */
pubactions->pubinsert = true;
@@ -91,6 +94,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubdelete = true;
pubactions->pubtruncate = true;
*publish_via_partition_root = false;
+ *publish_generated_columns = false;
/* Parse options */
foreach(lc, options)
@@ -151,6 +155,13 @@ parse_publication_options(ParseState *pstate,
*publish_via_partition_root_given = true;
*publish_via_partition_root = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "publish_generated_columns") == 0)
+ {
+ if (*publish_generated_columns_given)
+ errorConflictingDefElem(defel, pstate);
+ *publish_generated_columns_given = true;
+ *publish_generated_columns = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -737,6 +748,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
PublicationActions pubactions;
bool publish_via_partition_root_given;
bool publish_via_partition_root;
+ bool publish_generated_columns_given;
+ bool publish_generated_columns;
AclResult aclresult;
List *relations = NIL;
List *schemaidlist = NIL;
@@ -776,7 +789,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
stmt->options,
&publish_given, &pubactions,
&publish_via_partition_root_given,
- &publish_via_partition_root);
+ &publish_via_partition_root,
+ &publish_generated_columns_given,
+ &publish_generated_columns);
puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
Anum_pg_publication_oid);
@@ -793,6 +808,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
BoolGetDatum(pubactions.pubtruncate);
values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
+ values[Anum_pg_publication_pubgencols - 1] =
+ BoolGetDatum(publish_generated_columns);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -878,6 +895,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
PublicationActions pubactions;
bool publish_via_partition_root_given;
bool publish_via_partition_root;
+ bool publish_generated_columns_given;
+ bool publish_generated_columns;
ObjectAddress obj;
Form_pg_publication pubform;
List *root_relids = NIL;
@@ -887,7 +906,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
stmt->options,
&publish_given, &pubactions,
&publish_via_partition_root_given,
- &publish_via_partition_root);
+ &publish_via_partition_root,
+ &publish_generated_columns_given,
+ &publish_generated_columns);
pubform = (Form_pg_publication) GETSTRUCT(tup);
@@ -997,6 +1018,12 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
replaces[Anum_pg_publication_pubviaroot - 1] = true;
}
+ if (publish_generated_columns_given)
+ {
+ values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns);
+ replaces[Anum_pg_publication_pubgencols - 1] = true;
+ }
+
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ac4af53feba..2c2085b2f98 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -30,10 +30,11 @@
#define TRUNCATE_RESTART_SEQS (1<<1)
static void logicalrep_write_attrs(StringInfo out, Relation rel,
- Bitmapset *columns);
+ Bitmapset *columns, bool include_gencols);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
TupleTableSlot *slot,
- bool binary, Bitmapset *columns);
+ bool binary, Bitmapset *columns,
+ bool include_gencols);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -399,7 +400,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- TupleTableSlot *newslot, bool binary, Bitmapset *columns)
+ TupleTableSlot *newslot, bool binary,
+ Bitmapset *columns, bool include_gencols)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@@ -411,7 +413,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary, columns);
+ logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
}
/*
@@ -444,7 +446,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
- bool binary, Bitmapset *columns)
+ bool binary, Bitmapset *columns, bool include_gencols)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@@ -465,11 +467,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+ logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ include_gencols);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary, columns);
+ logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
}
/*
@@ -519,7 +522,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, bool binary,
- Bitmapset *columns)
+ Bitmapset *columns, bool include_gencols)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -539,7 +542,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+ logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
}
/*
@@ -655,7 +658,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
*/
void
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
- Bitmapset *columns)
+ Bitmapset *columns, bool include_gencols)
{
char *relname;
@@ -677,7 +680,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
- logicalrep_write_attrs(out, rel, columns);
+ logicalrep_write_attrs(out, rel, columns, include_gencols);
}
/*
@@ -754,7 +757,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
- bool binary, Bitmapset *columns)
+ bool binary, Bitmapset *columns, bool include_gencols)
{
TupleDesc desc;
Datum *values;
@@ -768,7 +771,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (!logicalrep_should_publish_column(att, columns))
+ if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
nliveatts++;
@@ -786,7 +789,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
Form_pg_type typclass;
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (!logicalrep_should_publish_column(att, columns))
+ if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
if (isnull[i])
@@ -904,7 +907,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
* Write relation attribute metadata to the stream.
*/
static void
-logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
+ bool include_gencols)
{
TupleDesc desc;
int i;
@@ -919,7 +923,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (!logicalrep_should_publish_column(att, columns))
+ if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
nliveatts++;
@@ -937,7 +941,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
Form_pg_attribute att = TupleDescAttr(desc, i);
uint8 flags = 0;
- if (!logicalrep_should_publish_column(att, columns))
+ if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
@@ -1248,29 +1252,26 @@ logicalrep_message_type(LogicalRepMsgType action)
/*
* Check if the column 'att' of a table should be published.
*
- * 'columns' represents the column list specified for that table in the
- * publication.
+ * 'columns' represents the publication column list (if any) for that table.
*
- * Note that generated columns can be present only in 'columns' list.
+ * 'include_gencols' flag indicates whether generated columns should be
+ * published when there is no column list. Typically, this will have the same
+ * value as the 'publish_generated_columns' publication parameter.
+ *
+ * Note that generated columns can be published only when present in a
+ * publication column list, or when include_gencols is true.
*/
bool
-logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns)
+logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
+ bool include_gencols)
{
if (att->attisdropped)
return false;
- /*
- * Skip publishing generated columns if they are not included in the
- * column list.
- */
- if (!columns && att->attgenerated)
- return false;
-
- /*
- * Check if a column is covered by a column list.
- */
- if (columns && !bms_is_member(att->attnum, columns))
- return false;
+ /* If a column list is provided, publish only the cols in that list. */
+ if (columns)
+ return bms_is_member(att->attnum, columns);
- return true;
+ /* All non-generated columns are always published. */
+ return att->attgenerated ? include_gencols : true;
}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 12c17359063..a6002b223df 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -84,9 +84,6 @@ static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
-static void send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx,
- Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
@@ -129,6 +126,12 @@ typedef struct RelationSyncEntry
bool replicate_valid; /* overall validity flag for entry */
bool schema_sent;
+
+ /*
+ * This is set if the 'publish_generated_columns' parameter is true, and
+ * the relation contains generated columns.
+ */
+ bool include_gencols;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
@@ -213,6 +216,9 @@ static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx,
+ RelationSyncEntry *relentry);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -731,11 +737,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
+ send_relation_and_attrs(ancestor, xid, ctx, relentry);
RelationClose(ancestor);
}
- send_relation_and_attrs(relation, xid, ctx, relentry->columns);
+ send_relation_and_attrs(relation, xid, ctx, relentry);
if (data->in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -749,9 +755,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
- Bitmapset *columns)
+ RelationSyncEntry *relentry)
{
TupleDesc desc = RelationGetDescr(relation);
+ Bitmapset *columns = relentry->columns;
+ bool include_gencols = relentry->include_gencols;
int i;
/*
@@ -766,7 +774,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (!logicalrep_should_publish_column(att, columns))
+ if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
if (att->atttypid < FirstGenbkiObjectId)
@@ -778,7 +786,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
}
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, xid, relation, columns);
+ logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
OutputPluginWrite(ctx, false);
}
@@ -1005,6 +1013,66 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
}
/*
+ * If the table contains a generated column, check for any conflicting
+ * values of 'publish_generated_columns' parameter in the publications.
+ */
+static void
+check_and_init_gencol(PGOutputData *data, List *publications,
+ RelationSyncEntry *entry)
+{
+ Relation relation = RelationIdGetRelation(entry->publish_as_relid);
+ TupleDesc desc = RelationGetDescr(relation);
+ bool gencolpresent = false;
+ bool first = true;
+
+ /* Check if there is any generated column present. */
+ for (int i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attgenerated)
+ {
+ gencolpresent = true;
+ break;
+ }
+ }
+
+ /* There are no generated columns to be published. */
+ if (!gencolpresent)
+ {
+ entry->include_gencols = false;
+ return;
+ }
+
+ /*
+ * There may be a conflicting value for 'publish_generated_columns'
+ * parameter in the publications.
+ */
+ foreach_ptr(Publication, pub, publications)
+ {
+ /*
+ * The column list takes precedence over the
+ * 'publish_generated_columns' parameter. Those will be checked later,
+ * see pgoutput_column_list_init.
+ */
+ if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
+ continue;
+
+ if (first)
+ {
+ entry->include_gencols = pub->pubgencols;
+ first = false;
+ }
+ else if (entry->include_gencols != pub->pubgencols)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
+ get_namespace_name(RelationGetNamespace(relation)),
+ RelationGetRelationName(relation)));
+ }
+}
+
+/*
* Initialize the column list.
*/
static void
@@ -1014,6 +1082,10 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
ListCell *lc;
bool first = true;
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
+ bool found_pub_collist = false;
+ Bitmapset *relcols = NULL;
+
+ pgoutput_ensure_entry_cxt(data, entry);
/*
* Find if there are any column lists for this relation. If there are,
@@ -1027,93 +1099,39 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
* fetch_table_list. But one can later change the publication so we still
* need to check all the given publication-table mappings and report an
* error if any publications have a different column list.
- *
- * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
- HeapTuple cftuple = NULL;
- Datum cfdatum = 0;
Bitmapset *cols = NULL;
+ /* Retrieve the bitmap of columns for a column list publication. */
+ found_pub_collist |= check_and_fetch_column_list(pub,
+ entry->publish_as_relid,
+ entry->entry_cxt, &cols);
+
/*
- * If the publication is FOR ALL TABLES then it is treated the same as
- * if there are no column lists (even if other publications have a
- * list).
+ * For non-column list publications — e.g. TABLE (without a column
+ * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
+ * of the table (including generated columns when
+ * 'publish_generated_columns' parameter is true).
*/
- if (!pub->alltables)
+ if (!cols)
{
- bool pub_no_list = true;
-
/*
- * Check for the presence of a column list in this publication.
- *
- * Note: If we find no pg_publication_rel row, it's a publication
- * defined for a whole schema, so it can't have a column list,
- * just like a FOR ALL TABLES publication.
+ * Cache the table columns for the first publication with no
+ * specified column list to detect publication with a different
+ * column list.
*/
- cftuple = SearchSysCache2(PUBLICATIONRELMAP,
- ObjectIdGetDatum(entry->publish_as_relid),
- ObjectIdGetDatum(pub->oid));
-
- if (HeapTupleIsValid(cftuple))
+ if (!relcols && (list_length(publications) > 1))
{
- /* Lookup the column list attribute. */
- cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
- Anum_pg_publication_rel_prattrs,
- &pub_no_list);
-
- /* Build the column list bitmap in the per-entry context. */
- if (!pub_no_list) /* when not null */
- {
- int i;
- int nliveatts = 0;
- TupleDesc desc = RelationGetDescr(relation);
- bool att_gen_present = false;
-
- pgoutput_ensure_entry_cxt(data, entry);
-
- cols = pub_collist_to_bitmapset(cols, cfdatum,
- entry->entry_cxt);
+ MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
- /* Get the number of live attributes. */
- for (i = 0; i < desc->natts; i++)
- {
- Form_pg_attribute att = TupleDescAttr(desc, i);
-
- if (att->attisdropped)
- continue;
-
- if (att->attgenerated)
- {
- /*
- * Generated cols are skipped unless they are
- * present in a column list.
- */
- if (!bms_is_member(att->attnum, cols))
- continue;
-
- att_gen_present = true;
- }
-
- nliveatts++;
- }
-
- /*
- * Generated attributes are published only when they are
- * present in the column list. Otherwise, a NULL column
- * list means publish all columns.
- */
- if (!att_gen_present && bms_num_members(cols) == nliveatts)
- {
- bms_free(cols);
- cols = NULL;
- }
- }
-
- ReleaseSysCache(cftuple);
+ relcols = pub_form_cols_map(relation, entry->include_gencols);
+ MemoryContextSwitchTo(oldcxt);
}
+
+ cols = relcols;
}
if (first)
@@ -1129,6 +1147,13 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
RelationGetRelationName(relation)));
} /* loop all subscribed publications */
+ /*
+ * If no column list publications exist, columns to be published will be
+ * computed later according to the 'publish_generated_columns' parameter.
+ */
+ if (!found_pub_collist)
+ entry->columns = NULL;
+
RelationClose(relation);
}
@@ -1541,15 +1566,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
- data->binary, relentry->columns);
+ data->binary, relentry->columns,
+ relentry->include_gencols);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
- new_slot, data->binary, relentry->columns);
+ new_slot, data->binary, relentry->columns,
+ relentry->include_gencols);
break;
case REORDER_BUFFER_CHANGE_DELETE:
logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
- data->binary, relentry->columns);
+ data->binary, relentry->columns,
+ relentry->include_gencols);
break;
default:
Assert(false);
@@ -2000,6 +2028,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
entry->replicate_valid = false;
entry->schema_sent = false;
+ entry->include_gencols = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
@@ -2052,6 +2081,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* earlier definition.
*/
entry->schema_sent = false;
+ entry->include_gencols = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
bms_free(entry->columns);
@@ -2223,6 +2253,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
+ /* Check whether to publish generated columns. */
+ check_and_init_gencol(data, rel_publications, entry);
+
/* Initialize the column list */
pgoutput_column_list_init(data, rel_publications, entry);
}