Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 062a844

Browse files
author
Amit Kapila
committed
Avoid syncing data twice for the 'publish_via_partition_root' option.
When there are multiple publications for a subscription and one of those publishes via the parent table by using publish_via_partition_root and the other one directly publishes the child table, we end up copying the same data twice during initial synchronization. The reason for this was that we get both the parent and child tables from the publisher and try to copy the data for both of them. This patch extends the function pg_get_publication_tables() to take a publication list as its input parameter. This allows us to exclude a partition table whose ancestor is published by the same publication list. This problem does exist in back-branches but we decide to fix it there in a separate commit if required. The fix for back-branches requires quite complicated changes to fetch the required table information from the publisher as we can't update the function pg_get_publication_tables() in back-branches. We are not sure whether we want to deviate and complicate the code in back-branches for this problem as there are no field reports yet. Author: Wang wei Reviewed-by: Peter Smith, Jacob Champion, Kuroda Hayato, Vignesh C, Osumi Takamichi, Amit Kapila Discussion: https://postgr.es/m/OS0PR01MB57167F45D481F78CDC5986F794B99@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent de5a47a commit 062a844

File tree

10 files changed

+274
-99
lines changed

10 files changed

+274
-99
lines changed

doc/src/sgml/ref/create_publication.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,16 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
201201
consisting of a different set of partitions.
202202
</para>
203203

204+
<para>
205+
There can be a case where a subscription combines multiple
206+
publications. If a partitioned table is published by any
207+
subscribed publications which set
208+
<literal>publish_via_partition_root</literal> = true, changes on this
209+
partitioned table (or on its partitions) will be published using
210+
the identity and schema of this partitioned table rather than
211+
that of the individual partitions.
212+
</para>
213+
204214
<para>
205215
This parameter also affects how row filters and column lists are
206216
chosen for partitions; see below for details.

src/backend/catalog/pg_publication.c

Lines changed: 140 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@
4545
#include "utils/rel.h"
4646
#include "utils/syscache.h"
4747

48+
/* Records association between publication and published table */
49+
typedef struct
50+
{
51+
Oid relid; /* OID of published table */
52+
Oid pubid; /* OID of publication that publishes this
53+
* table. */
54+
} published_rel;
55+
4856
static void publication_translate_columns(Relation targetrel, List *columns,
4957
int *natts, AttrNumber **attrs);
5058

@@ -172,42 +180,57 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
172180
}
173181

174182
/*
175-
* Filter out the partitions whose parent tables were also specified in
176-
* the publication.
183+
* Returns true if the ancestor is in the list of published relations.
184+
* Otherwise, returns false.
177185
*/
178-
static List *
179-
filter_partitions(List *relids)
186+
static bool
187+
is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
188+
{
189+
ListCell *lc;
190+
191+
foreach(lc, table_infos)
192+
{
193+
Oid relid = ((published_rel *) lfirst(lc))->relid;
194+
195+
if (relid == ancestor)
196+
return true;
197+
}
198+
199+
return false;
200+
}
201+
202+
/*
203+
* Filter out the partitions whose parent tables are also present in the list.
204+
*/
205+
static void
206+
filter_partitions(List *table_infos)
180207
{
181-
List *result = NIL;
182208
ListCell *lc;
183-
ListCell *lc2;
184209

185-
foreach(lc, relids)
210+
foreach(lc, table_infos)
186211
{
187212
bool skip = false;
188213
List *ancestors = NIL;
189-
Oid relid = lfirst_oid(lc);
214+
ListCell *lc2;
215+
published_rel *table_info = (published_rel *) lfirst(lc);
190216

191-
if (get_rel_relispartition(relid))
192-
ancestors = get_partition_ancestors(relid);
217+
if (get_rel_relispartition(table_info->relid))
218+
ancestors = get_partition_ancestors(table_info->relid);
193219

194220
foreach(lc2, ancestors)
195221
{
196222
Oid ancestor = lfirst_oid(lc2);
197223

198-
/* Check if the parent table exists in the published table list. */
199-
if (list_member_oid(relids, ancestor))
224+
if (is_ancestor_member_tableinfos(ancestor, table_infos))
200225
{
201226
skip = true;
202227
break;
203228
}
204229
}
205230

206-
if (!skip)
207-
result = lappend_oid(result, relid);
231+
if (skip)
232+
table_infos = foreach_delete_current(table_infos, lc);
208233
}
209-
210-
return result;
211234
}
212235

213236
/*
@@ -1026,91 +1049,136 @@ GetPublicationByName(const char *pubname, bool missing_ok)
10261049
}
10271050

10281051
/*
1029-
* Returns information of tables in a publication.
1052+
* Get information of the tables in the given publication array.
1053+
*
1054+
* Returns pubid, relid, column list, row filter for each table.
10301055
*/
10311056
Datum
10321057
pg_get_publication_tables(PG_FUNCTION_ARGS)
10331058
{
1034-
#define NUM_PUBLICATION_TABLES_ELEM 3
1059+
#define NUM_PUBLICATION_TABLES_ELEM 4
10351060
FuncCallContext *funcctx;
1036-
char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1037-
Publication *publication;
1038-
List *tables;
1061+
List *table_infos = NIL;
10391062

10401063
/* stuff done only on the first call of the function */
10411064
if (SRF_IS_FIRSTCALL())
10421065
{
10431066
TupleDesc tupdesc;
10441067
MemoryContext oldcontext;
1068+
ArrayType *arr;
1069+
Datum *elems;
1070+
int nelems,
1071+
i;
1072+
bool viaroot = false;
10451073

10461074
/* create a function context for cross-call persistence */
10471075
funcctx = SRF_FIRSTCALL_INIT();
10481076

10491077
/* switch to memory context appropriate for multiple function calls */
10501078
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
10511079

1052-
publication = GetPublicationByName(pubname, false);
1053-
10541080
/*
1055-
* Publications support partitioned tables, although all changes are
1056-
* replicated using leaf partition identity and schema, so we only
1057-
* need those.
1081+
* Deconstruct the parameter into elements where each element is a
1082+
* publication name.
10581083
*/
1059-
if (publication->alltables)
1060-
{
1061-
tables = GetAllTablesPublicationRelations(publication->pubviaroot);
1062-
}
1063-
else
1084+
arr = PG_GETARG_ARRAYTYPE_P(0);
1085+
deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
1086+
&elems, NULL, &nelems);
1087+
1088+
/* Get Oids of tables from each publication. */
1089+
for (i = 0; i < nelems; i++)
10641090
{
1065-
List *relids,
1066-
*schemarelids;
1067-
1068-
relids = GetPublicationRelations(publication->oid,
1069-
publication->pubviaroot ?
1070-
PUBLICATION_PART_ROOT :
1071-
PUBLICATION_PART_LEAF);
1072-
schemarelids = GetAllSchemaPublicationRelations(publication->oid,
1073-
publication->pubviaroot ?
1074-
PUBLICATION_PART_ROOT :
1075-
PUBLICATION_PART_LEAF);
1076-
tables = list_concat_unique_oid(relids, schemarelids);
1091+
Publication *pub_elem;
1092+
List *pub_elem_tables = NIL;
1093+
ListCell *lc;
1094+
1095+
pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
10771096

10781097
/*
1079-
* If the publication publishes partition changes via their
1080-
* respective root partitioned tables, we must exclude partitions
1081-
* in favor of including the root partitioned tables. Otherwise,
1082-
* the function could return both the child and parent tables
1083-
* which could cause data of the child table to be
1084-
* double-published on the subscriber side.
1098+
* Publications support partitioned tables. If
1099+
* publish_via_partition_root is false, all changes are replicated
1100+
* using leaf partition identity and schema, so we only need
1101+
* those. Otherwise, get the partitioned table itself.
10851102
*/
1086-
if (publication->pubviaroot)
1087-
tables = filter_partitions(tables);
1103+
if (pub_elem->alltables)
1104+
pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1105+
else
1106+
{
1107+
List *relids,
1108+
*schemarelids;
1109+
1110+
relids = GetPublicationRelations(pub_elem->oid,
1111+
pub_elem->pubviaroot ?
1112+
PUBLICATION_PART_ROOT :
1113+
PUBLICATION_PART_LEAF);
1114+
schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1115+
pub_elem->pubviaroot ?
1116+
PUBLICATION_PART_ROOT :
1117+
PUBLICATION_PART_LEAF);
1118+
pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1119+
}
1120+
1121+
/*
1122+
* Record the published table and the corresponding publication so
1123+
* that we can get row filters and column lists later.
1124+
*
1125+
* When a table is published by multiple publications, to obtain
1126+
* all row filters and column lists, the structure related to this
1127+
* table will be recorded multiple times.
1128+
*/
1129+
foreach(lc, pub_elem_tables)
1130+
{
1131+
published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1132+
1133+
table_info->relid = lfirst_oid(lc);
1134+
table_info->pubid = pub_elem->oid;
1135+
table_infos = lappend(table_infos, table_info);
1136+
}
1137+
1138+
/* At least one publication is using publish_via_partition_root. */
1139+
if (pub_elem->pubviaroot)
1140+
viaroot = true;
10881141
}
10891142

1143+
/*
1144+
* If the publication publishes partition changes via their respective
1145+
* root partitioned tables, we must exclude partitions in favor of
1146+
* including the root partitioned tables. Otherwise, the function
1147+
* could return both the child and parent tables which could cause
1148+
* data of the child table to be double-published on the subscriber
1149+
* side.
1150+
*/
1151+
if (viaroot)
1152+
filter_partitions(table_infos);
1153+
10901154
/* Construct a tuple descriptor for the result rows. */
10911155
tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1092-
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid",
1156+
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
10931157
OIDOID, -1, 0);
1094-
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs",
1158+
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1159+
OIDOID, -1, 0);
1160+
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
10951161
INT2VECTOROID, -1, 0);
1096-
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual",
1162+
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
10971163
PG_NODE_TREEOID, -1, 0);
10981164

10991165
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1100-
funcctx->user_fctx = (void *) tables;
1166+
funcctx->user_fctx = (void *) table_infos;
11011167

11021168
MemoryContextSwitchTo(oldcontext);
11031169
}
11041170

11051171
/* stuff done on every call of the function */
11061172
funcctx = SRF_PERCALL_SETUP();
1107-
tables = (List *) funcctx->user_fctx;
1173+
table_infos = (List *) funcctx->user_fctx;
11081174

1109-
if (funcctx->call_cntr < list_length(tables))
1175+
if (funcctx->call_cntr < list_length(table_infos))
11101176
{
11111177
HeapTuple pubtuple = NULL;
11121178
HeapTuple rettuple;
1113-
Oid relid = list_nth_oid(tables, funcctx->call_cntr);
1179+
Publication *pub;
1180+
published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1181+
Oid relid = table_info->relid;
11141182
Oid schemaid = get_rel_namespace(relid);
11151183
Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
11161184
bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
@@ -1119,42 +1187,43 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
11191187
* Form tuple with appropriate data.
11201188
*/
11211189

1122-
publication = GetPublicationByName(pubname, false);
1190+
pub = GetPublication(table_info->pubid);
11231191

1124-
values[0] = ObjectIdGetDatum(relid);
1192+
values[0] = ObjectIdGetDatum(pub->oid);
1193+
values[1] = ObjectIdGetDatum(relid);
11251194

11261195
/*
11271196
* We don't consider row filters or column lists for FOR ALL TABLES or
11281197
* FOR TABLES IN SCHEMA publications.
11291198
*/
1130-
if (!publication->alltables &&
1199+
if (!pub->alltables &&
11311200
!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
11321201
ObjectIdGetDatum(schemaid),
1133-
ObjectIdGetDatum(publication->oid)))
1202+
ObjectIdGetDatum(pub->oid)))
11341203
pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
11351204
ObjectIdGetDatum(relid),
1136-
ObjectIdGetDatum(publication->oid));
1205+
ObjectIdGetDatum(pub->oid));
11371206

11381207
if (HeapTupleIsValid(pubtuple))
11391208
{
11401209
/* Lookup the column list attribute. */
1141-
values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1210+
values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
11421211
Anum_pg_publication_rel_prattrs,
1143-
&(nulls[1]));
1212+
&(nulls[2]));
11441213

11451214
/* Null indicates no filter. */
1146-
values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1215+
values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
11471216
Anum_pg_publication_rel_prqual,
1148-
&(nulls[2]));
1217+
&(nulls[3]));
11491218
}
11501219
else
11511220
{
1152-
nulls[1] = true;
11531221
nulls[2] = true;
1222+
nulls[3] = true;
11541223
}
11551224

11561225
/* Show all columns when the column list is not specified. */
1157-
if (nulls[1] == true)
1226+
if (nulls[2])
11581227
{
11591228
Relation rel = table_open(relid, AccessShareLock);
11601229
int nattnums = 0;
@@ -1176,8 +1245,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
11761245

11771246
if (nattnums > 0)
11781247
{
1179-
values[1] = PointerGetDatum(buildint2vector(attnums, nattnums));
1180-
nulls[1] = false;
1248+
values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1249+
nulls[2] = false;
11811250
}
11821251

11831252
table_close(rel, AccessShareLock);

0 commit comments

Comments
 (0)