diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_publication.c | 73 | ||||
-rw-r--r-- | src/backend/commands/publicationcmds.c | 33 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 69 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 201 |
4 files changed, 254 insertions, 122 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 17a6093d069..09e2dbdd10a 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -257,6 +257,52 @@ is_schema_publication(Oid pubid) } /* + * Returns true if the relation has column list associated with the + * publication, false otherwise. + * + * If a column list is found, the corresponding bitmap is returned through the + * cols parameter, if provided. The bitmap is constructed within the given + * memory context (mcxt). + */ +bool +check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, + Bitmapset **cols) +{ + HeapTuple cftuple; + bool found = false; + + if (pub->alltables) + return false; + + cftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(cftuple)) + { + Datum cfdatum; + bool isnull; + + /* Lookup the column list attribute. */ + cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, + Anum_pg_publication_rel_prattrs, &isnull); + + /* Was a column list found? */ + if (!isnull) + { + /* Build the column list bitmap in the given memory context. */ + if (cols) + *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt); + + found = true; + } + + ReleaseSysCache(cftuple); + } + + return found; +} + +/* * Gets the relations based on the publication partition option for a specified * relation. */ @@ -574,6 +620,30 @@ pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt) } /* + * Returns a bitmap representing the columns of the specified table. + * + * Generated columns are included if include_gencols is true. + */ +Bitmapset * +pub_form_cols_map(Relation relation, bool include_gencols) +{ + Bitmapset *result = NULL; + TupleDesc desc = RelationGetDescr(relation); + + for (int i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || (att->attgenerated && !include_gencols)) + continue; + + result = bms_add_member(result, att->attnum); + } + + return result; +} + +/* * Insert new publication / schema mapping. */ ObjectAddress @@ -998,6 +1068,7 @@ GetPublication(Oid pubid) pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubviaroot = pubform->pubviaroot; + pub->pubgencols = pubform->pubgencols; ReleaseSysCache(tup); @@ -1205,7 +1276,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped || (att->attgenerated && !pub->pubgencols)) continue; attnums[nattnums++] = att->attnum; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index d6ffef374ea..0129db18c6e 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -78,12 +78,15 @@ parse_publication_options(ParseState *pstate, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, - bool *publish_via_partition_root) + bool *publish_via_partition_root, + bool *publish_generated_columns_given, + bool *publish_generated_columns) { ListCell *lc; *publish_given = false; *publish_via_partition_root_given = false; + *publish_generated_columns_given = false; /* defaults */ pubactions->pubinsert = true; @@ -91,6 +94,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; pubactions->pubtruncate = true; *publish_via_partition_root = false; + *publish_generated_columns = false; /* Parse options */ foreach(lc, options) @@ -151,6 +155,13 @@ parse_publication_options(ParseState *pstate, *publish_via_partition_root_given = true; *publish_via_partition_root = defGetBoolean(defel); } + else if (strcmp(defel->defname, "publish_generated_columns") == 0) + { + if (*publish_generated_columns_given) + errorConflictingDefElem(defel, pstate); + *publish_generated_columns_given = true; + *publish_generated_columns = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -737,6 +748,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) PublicationActions pubactions; bool publish_via_partition_root_given; bool publish_via_partition_root; + bool publish_generated_columns_given; + bool publish_generated_columns; AclResult aclresult; List *relations = NIL; List *schemaidlist = NIL; @@ -776,7 +789,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) stmt->options, &publish_given, &pubactions, &publish_via_partition_root_given, - &publish_via_partition_root); + &publish_via_partition_root, + &publish_generated_columns_given, + &publish_generated_columns); puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId, Anum_pg_publication_oid); @@ -793,6 +808,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubtruncate); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); + values[Anum_pg_publication_pubgencols - 1] = + BoolGetDatum(publish_generated_columns); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -878,6 +895,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, PublicationActions pubactions; bool publish_via_partition_root_given; bool publish_via_partition_root; + bool publish_generated_columns_given; + bool publish_generated_columns; ObjectAddress obj; Form_pg_publication pubform; List *root_relids = NIL; @@ -887,7 +906,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, stmt->options, &publish_given, &pubactions, &publish_via_partition_root_given, - &publish_via_partition_root); + &publish_via_partition_root, + &publish_generated_columns_given, + &publish_generated_columns); pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -997,6 +1018,12 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, replaces[Anum_pg_publication_pubviaroot - 1] = true; } + if (publish_generated_columns_given) + { + values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns); + replaces[Anum_pg_publication_pubgencols - 1] = true; + } + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index ac4af53feba..2c2085b2f98 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -30,10 +30,11 @@ #define TRUNCATE_RESTART_SEQS (1<<1) static void logicalrep_write_attrs(StringInfo out, Relation rel, - Bitmapset *columns); + Bitmapset *columns, bool include_gencols); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns); + bool binary, Bitmapset *columns, + bool include_gencols); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -399,7 +400,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *newslot, bool binary, Bitmapset *columns) + TupleTableSlot *newslot, bool binary, + Bitmapset *columns, bool include_gencols) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -411,7 +413,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols); } /* @@ -444,7 +446,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary, Bitmapset *columns) + bool binary, Bitmapset *columns, bool include_gencols) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -465,11 +467,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns, + include_gencols); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols); } /* @@ -519,7 +522,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, - Bitmapset *columns) + Bitmapset *columns, bool include_gencols) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -539,7 +542,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols); } /* @@ -655,7 +658,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, */ void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, - Bitmapset *columns) + Bitmapset *columns, bool include_gencols) { char *relname; @@ -677,7 +680,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel, columns); + logicalrep_write_attrs(out, rel, columns, include_gencols); } /* @@ -754,7 +757,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns) + bool binary, Bitmapset *columns, bool include_gencols) { TupleDesc desc; Datum *values; @@ -768,7 +771,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns)) + if (!logicalrep_should_publish_column(att, columns, include_gencols)) continue; nliveatts++; @@ -786,7 +789,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, Form_pg_type typclass; Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns)) + if (!logicalrep_should_publish_column(att, columns, include_gencols)) continue; if (isnull[i]) @@ -904,7 +907,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, + bool include_gencols) { TupleDesc desc; int i; @@ -919,7 +923,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns)) + if (!logicalrep_should_publish_column(att, columns, include_gencols)) continue; nliveatts++; @@ -937,7 +941,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) Form_pg_attribute att = TupleDescAttr(desc, i); uint8 flags = 0; - if (!logicalrep_should_publish_column(att, columns)) + if (!logicalrep_should_publish_column(att, columns, include_gencols)) continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ @@ -1248,29 +1252,26 @@ logicalrep_message_type(LogicalRepMsgType action) /* * Check if the column 'att' of a table should be published. * - * 'columns' represents the column list specified for that table in the - * publication. + * 'columns' represents the publication column list (if any) for that table. * - * Note that generated columns can be present only in 'columns' list. + * 'include_gencols' flag indicates whether generated columns should be + * published when there is no column list. Typically, this will have the same + * value as the 'publish_generated_columns' publication parameter. + * + * Note that generated columns can be published only when present in a + * publication column list, or when include_gencols is true. */ bool -logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns) +logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, + bool include_gencols) { if (att->attisdropped) return false; - /* - * Skip publishing generated columns if they are not included in the - * column list. - */ - if (!columns && att->attgenerated) - return false; - - /* - * Check if a column is covered by a column list. - */ - if (columns && !bms_is_member(att->attnum, columns)) - return false; + /* If a column list is provided, publish only the cols in that list. */ + if (columns) + return bms_is_member(att->attnum, columns); - return true; + /* All non-generated columns are always published. */ + return att->attgenerated ? include_gencols : true; } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 12c17359063..a6002b223df 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -84,9 +84,6 @@ static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx, - Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -129,6 +126,12 @@ typedef struct RelationSyncEntry bool replicate_valid; /* overall validity flag for entry */ bool schema_sent; + + /* + * This is set if the 'publish_generated_columns' parameter is true, and + * the relation contains generated columns. + */ + bool include_gencols; List *streamed_txns; /* streamed toplevel transactions with this * schema */ @@ -213,6 +216,9 @@ static void init_rel_sync_cache(MemoryContext cachectx); static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation relation); +static void send_relation_and_attrs(Relation relation, TransactionId xid, + LogicalDecodingContext *ctx, + RelationSyncEntry *relentry); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -731,11 +737,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); + send_relation_and_attrs(ancestor, xid, ctx, relentry); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx, relentry->columns); + send_relation_and_attrs(relation, xid, ctx, relentry); if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -749,9 +755,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns) + RelationSyncEntry *relentry) { TupleDesc desc = RelationGetDescr(relation); + Bitmapset *columns = relentry->columns; + bool include_gencols = relentry->include_gencols; int i; /* @@ -766,7 +774,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns)) + if (!logicalrep_should_publish_column(att, columns, include_gencols)) continue; if (att->atttypid < FirstGenbkiObjectId) @@ -778,7 +786,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation, columns); + logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols); OutputPluginWrite(ctx, false); } @@ -1005,6 +1013,66 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, } /* + * If the table contains a generated column, check for any conflicting + * values of 'publish_generated_columns' parameter in the publications. + */ +static void +check_and_init_gencol(PGOutputData *data, List *publications, + RelationSyncEntry *entry) +{ + Relation relation = RelationIdGetRelation(entry->publish_as_relid); + TupleDesc desc = RelationGetDescr(relation); + bool gencolpresent = false; + bool first = true; + + /* Check if there is any generated column present. */ + for (int i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attgenerated) + { + gencolpresent = true; + break; + } + } + + /* There are no generated columns to be published. */ + if (!gencolpresent) + { + entry->include_gencols = false; + return; + } + + /* + * There may be a conflicting value for 'publish_generated_columns' + * parameter in the publications. + */ + foreach_ptr(Publication, pub, publications) + { + /* + * The column list takes precedence over the + * 'publish_generated_columns' parameter. Those will be checked later, + * see pgoutput_column_list_init. + */ + if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL)) + continue; + + if (first) + { + entry->include_gencols = pub->pubgencols; + first = false; + } + else if (entry->include_gencols != pub->pubgencols) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications", + get_namespace_name(RelationGetNamespace(relation)), + RelationGetRelationName(relation))); + } +} + +/* * Initialize the column list. */ static void @@ -1014,6 +1082,10 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, ListCell *lc; bool first = true; Relation relation = RelationIdGetRelation(entry->publish_as_relid); + bool found_pub_collist = false; + Bitmapset *relcols = NULL; + + pgoutput_ensure_entry_cxt(data, entry); /* * Find if there are any column lists for this relation. If there are, @@ -1027,93 +1099,39 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, * fetch_table_list. But one can later change the publication so we still * need to check all the given publication-table mappings and report an * error if any publications have a different column list. - * - * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list". */ foreach(lc, publications) { Publication *pub = lfirst(lc); - HeapTuple cftuple = NULL; - Datum cfdatum = 0; Bitmapset *cols = NULL; + /* Retrieve the bitmap of columns for a column list publication. */ + found_pub_collist |= check_and_fetch_column_list(pub, + entry->publish_as_relid, + entry->entry_cxt, &cols); + /* - * If the publication is FOR ALL TABLES then it is treated the same as - * if there are no column lists (even if other publications have a - * list). + * For non-column list publications — e.g. TABLE (without a column + * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns + * of the table (including generated columns when + * 'publish_generated_columns' parameter is true). */ - if (!pub->alltables) + if (!cols) { - bool pub_no_list = true; - /* - * Check for the presence of a column list in this publication. - * - * Note: If we find no pg_publication_rel row, it's a publication - * defined for a whole schema, so it can't have a column list, - * just like a FOR ALL TABLES publication. + * Cache the table columns for the first publication with no + * specified column list to detect publication with a different + * column list. */ - cftuple = SearchSysCache2(PUBLICATIONRELMAP, - ObjectIdGetDatum(entry->publish_as_relid), - ObjectIdGetDatum(pub->oid)); - - if (HeapTupleIsValid(cftuple)) + if (!relcols && (list_length(publications) > 1)) { - /* Lookup the column list attribute. */ - cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, - Anum_pg_publication_rel_prattrs, - &pub_no_list); - - /* Build the column list bitmap in the per-entry context. */ - if (!pub_no_list) /* when not null */ - { - int i; - int nliveatts = 0; - TupleDesc desc = RelationGetDescr(relation); - bool att_gen_present = false; - - pgoutput_ensure_entry_cxt(data, entry); - - cols = pub_collist_to_bitmapset(cols, cfdatum, - entry->entry_cxt); + MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt); - /* Get the number of live attributes. */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); - - if (att->attisdropped) - continue; - - if (att->attgenerated) - { - /* - * Generated cols are skipped unless they are - * present in a column list. - */ - if (!bms_is_member(att->attnum, cols)) - continue; - - att_gen_present = true; - } - - nliveatts++; - } - - /* - * Generated attributes are published only when they are - * present in the column list. Otherwise, a NULL column - * list means publish all columns. - */ - if (!att_gen_present && bms_num_members(cols) == nliveatts) - { - bms_free(cols); - cols = NULL; - } - } - - ReleaseSysCache(cftuple); + relcols = pub_form_cols_map(relation, entry->include_gencols); + MemoryContextSwitchTo(oldcxt); } + + cols = relcols; } if (first) @@ -1129,6 +1147,13 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, RelationGetRelationName(relation))); } /* loop all subscribed publications */ + /* + * If no column list publications exist, columns to be published will be + * computed later according to the 'publish_generated_columns' parameter. + */ + if (!found_pub_collist) + entry->columns = NULL; + RelationClose(relation); } @@ -1541,15 +1566,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + relentry->include_gencols); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, - new_slot, data->binary, relentry->columns); + new_slot, data->binary, relentry->columns, + relentry->include_gencols); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + relentry->include_gencols); break; default: Assert(false); @@ -2000,6 +2028,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { entry->replicate_valid = false; entry->schema_sent = false; + entry->include_gencols = false; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; @@ -2052,6 +2081,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) * earlier definition. */ entry->schema_sent = false; + entry->include_gencols = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; bms_free(entry->columns); @@ -2223,6 +2253,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* Initialize the row filter */ pgoutput_row_filter_init(data, rel_publications, entry); + /* Check whether to publish generated columns. */ + check_and_init_gencol(data, rel_publications, entry); + /* Initialize the column list */ pgoutput_column_list_init(data, rel_publications, entry); } |