Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 745217a

Browse files
author
Amit Kapila
committed
Replicate generated columns when specified in the column list.
This commit allows logical replication to publish and replicate generated columns when explicitly listed in the column list. We also ensured that the generated columns were copied during the initial tablesync when they were published. We will allow to replicate generated columns even when they are not specified in the column list (via a new publication option) in a separate commit. The motivation of this work is to allow replication for cases where the client doesn't have generated columns. For example, the case where one is trying to replicate data from Postgres to the non-Postgres database. Author: Shubham Khanna, Vignesh C, Hou Zhijie Reviewed-by: Peter Smith, Hayato Kuroda, Shlok Kyal, Amit Kapila Discussion: https://postgr.es/m/B80D17B2-2C8E-4C7D-87F2-E5B4BE3C069E@gmail.com
1 parent f22e436 commit 745217a

File tree

10 files changed

+145
-74
lines changed

10 files changed

+145
-74
lines changed

doc/src/sgml/protocol.sgml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6544,7 +6544,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
65446544

65456545
<para>
65466546
Next, the following message part appears for each column included in
6547-
the publication (except generated columns):
6547+
the publication:
65486548
</para>
65496549

65506550
<variablelist>
@@ -7477,7 +7477,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
74777477
</variablelist>
74787478

74797479
<para>
7480-
Next, one of the following submessages appears for each column (except generated columns):
7480+
Next, one of the following submessages appears for each column:
74817481

74827482
<variablelist>
74837483
<varlistentry>

doc/src/sgml/ref/create_publication.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
8989

9090
<para>
9191
When a column list is specified, only the named columns are replicated.
92-
If no column list is specified, all columns of the table are replicated
92+
The column list can contain generated columns as well. If no column list
93+
is specified, all table columns (except generated columns) are replicated
9394
through this publication, including any columns added later. It has no
9495
effect on <literal>TRUNCATE</literal> commands. See
9596
<xref linkend="logical-replication-col-lists"/> for details about column

src/backend/catalog/pg_publication.c

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
500500
* pub_collist_validate
501501
* Process and validate the 'columns' list and ensure the columns are all
502502
* valid to use for a publication. Checks for and raises an ERROR for
503-
* any; unknown columns, system columns, duplicate columns or generated
504-
* columns.
503+
* any unknown columns, system columns, or duplicate columns.
505504
*
506505
* Looks up each column's attnum and returns a 0-based Bitmapset of the
507506
* corresponding attnums.
@@ -511,7 +510,6 @@ pub_collist_validate(Relation targetrel, List *columns)
511510
{
512511
Bitmapset *set = NULL;
513512
ListCell *lc;
514-
TupleDesc tupdesc = RelationGetDescr(targetrel);
515513

516514
foreach(lc, columns)
517515
{
@@ -530,12 +528,6 @@ pub_collist_validate(Relation targetrel, List *columns)
530528
errmsg("cannot use system column \"%s\" in publication column list",
531529
colname));
532530

533-
if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
534-
ereport(ERROR,
535-
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
536-
errmsg("cannot use generated column \"%s\" in publication column list",
537-
colname));
538-
539531
if (bms_is_member(attnum, set))
540532
ereport(ERROR,
541533
errcode(ERRCODE_DUPLICATE_OBJECT),

src/backend/replication/logical/proto.c

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,6 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
4040
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
4141
static const char *logicalrep_read_namespace(StringInfo in);
4242

43-
/*
44-
* Check if a column is covered by a column list.
45-
*
46-
* Need to be careful about NULL, which is treated as a column list covering
47-
* all columns.
48-
*/
49-
static bool
50-
column_in_column_list(int attnum, Bitmapset *columns)
51-
{
52-
return (columns == NULL || bms_is_member(attnum, columns));
53-
}
54-
55-
5643
/*
5744
* Write BEGIN to the output stream.
5845
*/
@@ -781,10 +768,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
781768
{
782769
Form_pg_attribute att = TupleDescAttr(desc, i);
783770

784-
if (att->attisdropped || att->attgenerated)
785-
continue;
786-
787-
if (!column_in_column_list(att->attnum, columns))
771+
if (!logicalrep_should_publish_column(att, columns))
788772
continue;
789773

790774
nliveatts++;
@@ -802,10 +786,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
802786
Form_pg_type typclass;
803787
Form_pg_attribute att = TupleDescAttr(desc, i);
804788

805-
if (att->attisdropped || att->attgenerated)
806-
continue;
807-
808-
if (!column_in_column_list(att->attnum, columns))
789+
if (!logicalrep_should_publish_column(att, columns))
809790
continue;
810791

811792
if (isnull[i])
@@ -938,10 +919,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
938919
{
939920
Form_pg_attribute att = TupleDescAttr(desc, i);
940921

941-
if (att->attisdropped || att->attgenerated)
942-
continue;
943-
944-
if (!column_in_column_list(att->attnum, columns))
922+
if (!logicalrep_should_publish_column(att, columns))
945923
continue;
946924

947925
nliveatts++;
@@ -959,10 +937,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
959937
Form_pg_attribute att = TupleDescAttr(desc, i);
960938
uint8 flags = 0;
961939

962-
if (att->attisdropped || att->attgenerated)
963-
continue;
964-
965-
if (!column_in_column_list(att->attnum, columns))
940+
if (!logicalrep_should_publish_column(att, columns))
966941
continue;
967942

968943
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
@@ -1269,3 +1244,33 @@ logicalrep_message_type(LogicalRepMsgType action)
12691244

12701245
return err_unknown;
12711246
}
1247+
1248+
/*
1249+
* Check if the column 'att' of a table should be published.
1250+
*
1251+
* 'columns' represents the column list specified for that table in the
1252+
* publication.
1253+
*
1254+
* Note that generated columns can be present only in 'columns' list.
1255+
*/
1256+
bool
1257+
logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns)
1258+
{
1259+
if (att->attisdropped)
1260+
return false;
1261+
1262+
/*
1263+
* Skip publishing generated columns if they are not included in the
1264+
* column list.
1265+
*/
1266+
if (!columns && att->attgenerated)
1267+
return false;
1268+
1269+
/*
1270+
* Check if a column is covered by a column list.
1271+
*/
1272+
if (columns && !bms_is_member(att->attnum, columns))
1273+
return false;
1274+
1275+
return true;
1276+
}

src/backend/replication/logical/tablesync.c

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -787,23 +787,27 @@ copy_read_data(void *outbuf, int minread, int maxread)
787787

788788
/*
789789
* Get information about remote relation in similar fashion the RELATION
790-
* message provides during replication. This function also returns the relation
791-
* qualifications to be used in the COPY command.
790+
* message provides during replication.
791+
*
792+
* This function also returns (a) the relation qualifications to be used in
793+
* the COPY command, and (b) whether the remote relation has published any
794+
* generated column.
792795
*/
793796
static void
794-
fetch_remote_table_info(char *nspname, char *relname,
795-
LogicalRepRelation *lrel, List **qual)
797+
fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
798+
List **qual, bool *gencol_published)
796799
{
797800
WalRcvExecResult *res;
798801
StringInfoData cmd;
799802
TupleTableSlot *slot;
800803
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
801-
Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
804+
Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
802805
Oid qualRow[] = {TEXTOID};
803806
bool isnull;
804807
int natt;
805808
StringInfo pub_names = NULL;
806809
Bitmapset *included_cols = NULL;
810+
int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
807811

808812
lrel->nspname = nspname;
809813
lrel->relname = relname;
@@ -851,7 +855,7 @@ fetch_remote_table_info(char *nspname, char *relname,
851855
* We need to do this before fetching info about column names and types,
852856
* so that we can skip columns that should not be replicated.
853857
*/
854-
if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
858+
if (server_version >= 150000)
855859
{
856860
WalRcvExecResult *pubres;
857861
TupleTableSlot *tslot;
@@ -941,7 +945,13 @@ fetch_remote_table_info(char *nspname, char *relname,
941945
"SELECT a.attnum,"
942946
" a.attname,"
943947
" a.atttypid,"
944-
" a.attnum = ANY(i.indkey)"
948+
" a.attnum = ANY(i.indkey)");
949+
950+
/* Generated columns can be replicated since version 18. */
951+
if (server_version >= 180000)
952+
appendStringInfo(&cmd, ", a.attgenerated != ''");
953+
954+
appendStringInfo(&cmd,
945955
" FROM pg_catalog.pg_attribute a"
946956
" LEFT JOIN pg_catalog.pg_index i"
947957
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
@@ -950,11 +960,11 @@ fetch_remote_table_info(char *nspname, char *relname,
950960
" AND a.attrelid = %u"
951961
" ORDER BY a.attnum",
952962
lrel->remoteid,
953-
(walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
963+
(server_version >= 120000 && server_version < 180000 ?
954964
"AND a.attgenerated = ''" : ""),
955965
lrel->remoteid);
956966
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
957-
lengthof(attrRow), attrRow);
967+
server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
958968

959969
if (res->status != WALRCV_OK_TUPLES)
960970
ereport(ERROR,
@@ -998,6 +1008,13 @@ fetch_remote_table_info(char *nspname, char *relname,
9981008
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
9991009
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
10001010

1011+
/* Remember if the remote table has published any generated column. */
1012+
if (server_version >= 180000 && !(*gencol_published))
1013+
{
1014+
*gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1015+
Assert(!isnull);
1016+
}
1017+
10011018
/* Should never happen. */
10021019
if (++natt >= MaxTupleAttributeNumber)
10031020
elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1030,7 +1047,7 @@ fetch_remote_table_info(char *nspname, char *relname,
10301047
* 3) one of the subscribed publications is declared as TABLES IN SCHEMA
10311048
* that includes this relation
10321049
*/
1033-
if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
1050+
if (server_version >= 150000)
10341051
{
10351052
/* Reuse the already-built pub_names. */
10361053
Assert(pub_names != NULL);
@@ -1106,10 +1123,12 @@ copy_table(Relation rel)
11061123
List *attnamelist;
11071124
ParseState *pstate;
11081125
List *options = NIL;
1126+
bool gencol_published = false;
11091127

11101128
/* Get the publisher relation info. */
11111129
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1112-
RelationGetRelationName(rel), &lrel, &qual);
1130+
RelationGetRelationName(rel), &lrel, &qual,
1131+
&gencol_published);
11131132

11141133
/* Put the relation into relmap. */
11151134
logicalrep_relmap_update(&lrel);
@@ -1121,8 +1140,8 @@ copy_table(Relation rel)
11211140
/* Start copy on the publisher. */
11221141
initStringInfo(&cmd);
11231142

1124-
/* Regular table with no row filter */
1125-
if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1143+
/* Regular table with no row filter or generated columns */
1144+
if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
11261145
{
11271146
appendStringInfo(&cmd, "COPY %s",
11281147
quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1153,9 +1172,14 @@ copy_table(Relation rel)
11531172
{
11541173
/*
11551174
* For non-tables and tables with row filters, we need to do COPY
1156-
* (SELECT ...), but we can't just do SELECT * because we need to not
1157-
* copy generated columns. For tables with any row filters, build a
1158-
* SELECT query with OR'ed row filters for COPY.
1175+
* (SELECT ...), but we can't just do SELECT * because we may need to
1176+
* copy only subset of columns including generated columns. For tables
1177+
* with any row filters, build a SELECT query with OR'ed row filters
1178+
* for COPY.
1179+
*
1180+
* We also need to use this same COPY (SELECT ...) syntax when
1181+
* generated columns are published, because copy of generated columns
1182+
* is not supported by the normal COPY.
11591183
*/
11601184
appendStringInfoString(&cmd, "COPY (SELECT ");
11611185
for (int i = 0; i < lrel.natts; i++)

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -766,16 +766,12 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
766766
{
767767
Form_pg_attribute att = TupleDescAttr(desc, i);
768768

769-
if (att->attisdropped || att->attgenerated)
769+
if (!logicalrep_should_publish_column(att, columns))
770770
continue;
771771

772772
if (att->atttypid < FirstGenbkiObjectId)
773773
continue;
774774

775-
/* Skip this attribute if it's not present in the column list */
776-
if (columns != NULL && !bms_is_member(att->attnum, columns))
777-
continue;
778-
779775
OutputPluginPrepareWrite(ctx, false);
780776
logicalrep_write_typ(ctx->out, xid, att->atttypid);
781777
OutputPluginWrite(ctx, false);
@@ -1074,6 +1070,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
10741070
int i;
10751071
int nliveatts = 0;
10761072
TupleDesc desc = RelationGetDescr(relation);
1073+
bool att_gen_present = false;
10771074

10781075
pgoutput_ensure_entry_cxt(data, entry);
10791076

@@ -1085,17 +1082,30 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
10851082
{
10861083
Form_pg_attribute att = TupleDescAttr(desc, i);
10871084

1088-
if (att->attisdropped || att->attgenerated)
1085+
if (att->attisdropped)
10891086
continue;
10901087

1088+
if (att->attgenerated)
1089+
{
1090+
/*
1091+
* Generated cols are skipped unless they are
1092+
* present in a column list.
1093+
*/
1094+
if (!bms_is_member(att->attnum, cols))
1095+
continue;
1096+
1097+
att_gen_present = true;
1098+
}
1099+
10911100
nliveatts++;
10921101
}
10931102

10941103
/*
1095-
* If column list includes all the columns of the table,
1096-
* set it to NULL.
1104+
* Generated attributes are published only when they are
1105+
* present in the column list. Otherwise, a NULL column
1106+
* list means publish all columns.
10971107
*/
1098-
if (bms_num_members(cols) == nliveatts)
1108+
if (!att_gen_present && bms_num_members(cols) == nliveatts)
10991109
{
11001110
bms_free(cols);
11011111
cols = NULL;

src/include/replication/logicalproto.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,5 +270,7 @@ extern void logicalrep_read_stream_abort(StringInfo in,
270270
LogicalRepStreamAbortData *abort_data,
271271
bool read_abort_info);
272272
extern const char *logicalrep_message_type(LogicalRepMsgType action);
273+
extern bool logicalrep_should_publish_column(Form_pg_attribute att,
274+
Bitmapset *columns);
273275

274276
#endif /* LOGICAL_PROTO_H */

src/test/regress/expected/publication.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -687,9 +687,6 @@ UPDATE testpub_tbl5 SET a = 1;
687687
ERROR: cannot update table "testpub_tbl5"
688688
DETAIL: Column list used by the publication does not cover the replica identity.
689689
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
690-
-- error: generated column "d" can't be in list
691-
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
692-
ERROR: cannot use generated column "d" in publication column list
693690
-- error: system attributes "ctid" not allowed in column list
694691
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
695692
ERROR: cannot use system column "ctid" in publication column list
@@ -717,6 +714,9 @@ UPDATE testpub_tbl5 SET a = 1;
717714
ERROR: cannot update table "testpub_tbl5"
718715
DETAIL: Column list used by the publication does not cover the replica identity.
719716
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
717+
-- ok: generated column "d" can be in the list too
718+
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
719+
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
720720
-- error: change the replica identity to "b", and column list to (a, c)
721721
-- then update fails, because (a, c) does not cover replica identity
722722
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;

0 commit comments

Comments
 (0)