@@ -66,7 +66,6 @@ typedef struct rf_context
66
66
Oid parentid ; /* relid of the parent relation */
67
67
} rf_context ;
68
68
69
- static List * OpenRelIdList (List * relids );
70
69
static List * OpenTableList (List * tables );
71
70
static void CloseTableList (List * rels );
72
71
static void LockSchemaList (List * schemalist );
@@ -214,44 +213,6 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
214
213
}
215
214
}
216
215
217
- /*
218
- * Check if any of the given relation's schema is a member of the given schema
219
- * list.
220
- */
221
- static void
222
- CheckObjSchemaNotAlreadyInPublication (List * rels , List * schemaidlist ,
223
- PublicationObjSpecType checkobjtype )
224
- {
225
- ListCell * lc ;
226
-
227
- foreach (lc , rels )
228
- {
229
- PublicationRelInfo * pub_rel = (PublicationRelInfo * ) lfirst (lc );
230
- Relation rel = pub_rel -> relation ;
231
- Oid relSchemaId = RelationGetNamespace (rel );
232
-
233
- if (list_member_oid (schemaidlist , relSchemaId ))
234
- {
235
- if (checkobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA )
236
- ereport (ERROR ,
237
- errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
238
- errmsg ("cannot add schema \"%s\" to publication" ,
239
- get_namespace_name (relSchemaId )),
240
- errdetail ("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported." ,
241
- RelationGetRelationName (rel ),
242
- get_namespace_name (relSchemaId )));
243
- else if (checkobjtype == PUBLICATIONOBJ_TABLE )
244
- ereport (ERROR ,
245
- errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
246
- errmsg ("cannot add relation \"%s.%s\" to publication" ,
247
- get_namespace_name (relSchemaId ),
248
- RelationGetRelationName (rel )),
249
- errdetail ("Table's schema \"%s\" is already part of the publication or part of the specified schema list." ,
250
- get_namespace_name (relSchemaId )));
251
- }
252
- }
253
- }
254
-
255
216
/*
256
217
* Returns true if any of the columns used in the row filter WHERE expression is
257
218
* not part of REPLICA IDENTITY, false otherwise.
@@ -721,7 +682,7 @@ TransformPubWhereClauses(List *tables, const char *queryString,
721
682
*/
722
683
static void
723
684
CheckPubRelationColumnList (List * tables , const char * queryString ,
724
- bool pubviaroot )
685
+ bool publish_schema , bool pubviaroot )
725
686
{
726
687
ListCell * lc ;
727
688
@@ -732,6 +693,24 @@ CheckPubRelationColumnList(List *tables, const char *queryString,
732
693
if (pri -> columns == NIL )
733
694
continue ;
734
695
696
+ /*
697
+ * Disallow specifying column list if any schema is in the
698
+ * publication.
699
+ *
700
+ * XXX We could instead just forbid the case when the publication
701
+ * tries to publish the table with a column list and a schema for that
702
+ * table. However, if we do that then we need a restriction during
703
+ * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
704
+ * seem to be a good idea.
705
+ */
706
+ if (publish_schema )
707
+ ereport (ERROR ,
708
+ errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
709
+ errmsg ("cannot use publication column list for relation \"%s.%s\"" ,
710
+ get_namespace_name (RelationGetNamespace (pri -> relation )),
711
+ RelationGetRelationName (pri -> relation )),
712
+ errdetail ("Column list cannot be specified if any schema is part of the publication or specified in the list." ));
713
+
735
714
/*
736
715
* If the publication doesn't publish changes via the root partitioned
737
716
* table, the partition's column list will be used. So disallow using
@@ -858,13 +837,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
858
837
List * rels ;
859
838
860
839
rels = OpenTableList (relations );
861
- CheckObjSchemaNotAlreadyInPublication (rels , schemaidlist ,
862
- PUBLICATIONOBJ_TABLE );
863
-
864
840
TransformPubWhereClauses (rels , pstate -> p_sourcetext ,
865
841
publish_via_partition_root );
866
842
867
843
CheckPubRelationColumnList (rels , pstate -> p_sourcetext ,
844
+ schemaidlist != NIL ,
868
845
publish_via_partition_root );
869
846
870
847
PublicationAddTables (puboid , rels , true, NULL );
@@ -1110,8 +1087,8 @@ InvalidatePublicationRels(List *relids)
1110
1087
*/
1111
1088
static void
1112
1089
AlterPublicationTables (AlterPublicationStmt * stmt , HeapTuple tup ,
1113
- List * tables , List * schemaidlist ,
1114
- const char * queryString )
1090
+ List * tables , const char * queryString ,
1091
+ bool publish_schema )
1115
1092
{
1116
1093
List * rels = NIL ;
1117
1094
Form_pg_publication pubform = (Form_pg_publication ) GETSTRUCT (tup );
@@ -1129,19 +1106,12 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1129
1106
1130
1107
if (stmt -> action == AP_AddObjects )
1131
1108
{
1132
- List * schemas = NIL ;
1133
-
1134
- /*
1135
- * Check if the relation is member of the existing schema in the
1136
- * publication or member of the schema list specified.
1137
- */
1138
- schemas = list_concat_copy (schemaidlist , GetPublicationSchemas (pubid ));
1139
- CheckObjSchemaNotAlreadyInPublication (rels , schemas ,
1140
- PUBLICATIONOBJ_TABLE );
1141
-
1142
1109
TransformPubWhereClauses (rels , queryString , pubform -> pubviaroot );
1143
1110
1144
- CheckPubRelationColumnList (rels , queryString , pubform -> pubviaroot );
1111
+ publish_schema |= is_schema_publication (pubid );
1112
+
1113
+ CheckPubRelationColumnList (rels , queryString , publish_schema ,
1114
+ pubform -> pubviaroot );
1145
1115
1146
1116
PublicationAddTables (pubid , rels , false, stmt );
1147
1117
}
@@ -1154,12 +1124,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1154
1124
List * delrels = NIL ;
1155
1125
ListCell * oldlc ;
1156
1126
1157
- CheckObjSchemaNotAlreadyInPublication (rels , schemaidlist ,
1158
- PUBLICATIONOBJ_TABLE );
1159
-
1160
1127
TransformPubWhereClauses (rels , queryString , pubform -> pubviaroot );
1161
1128
1162
- CheckPubRelationColumnList (rels , queryString , pubform -> pubviaroot );
1129
+ CheckPubRelationColumnList (rels , queryString , publish_schema ,
1130
+ pubform -> pubviaroot );
1163
1131
1164
1132
/*
1165
1133
* To recreate the relation list for the publication, look for
@@ -1308,16 +1276,35 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
1308
1276
LockSchemaList (schemaidlist );
1309
1277
if (stmt -> action == AP_AddObjects )
1310
1278
{
1311
- List * rels ;
1279
+ ListCell * lc ;
1312
1280
List * reloids ;
1313
1281
1314
1282
reloids = GetPublicationRelations (pubform -> oid , PUBLICATION_PART_ROOT );
1315
- rels = OpenRelIdList (reloids );
1316
1283
1317
- CheckObjSchemaNotAlreadyInPublication (rels , schemaidlist ,
1318
- PUBLICATIONOBJ_TABLES_IN_SCHEMA );
1284
+ foreach (lc , reloids )
1285
+ {
1286
+ HeapTuple coltuple ;
1287
+
1288
+ coltuple = SearchSysCache2 (PUBLICATIONRELMAP ,
1289
+ ObjectIdGetDatum (lfirst_oid (lc )),
1290
+ ObjectIdGetDatum (pubform -> oid ));
1291
+
1292
+ if (!HeapTupleIsValid (coltuple ))
1293
+ continue ;
1294
+
1295
+ /*
1296
+ * Disallow adding schema if column list is already part of the
1297
+ * publication. See CheckPubRelationColumnList.
1298
+ */
1299
+ if (!heap_attisnull (coltuple , Anum_pg_publication_rel_prattrs , NULL ))
1300
+ ereport (ERROR ,
1301
+ errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
1302
+ errmsg ("cannot add schema to the publication" ),
1303
+ errdetail ("Schema cannot be added if any table that specifies column list is already part of the publication." ));
1304
+
1305
+ ReleaseSysCache (coltuple );
1306
+ }
1319
1307
1320
- CloseTableList (rels );
1321
1308
PublicationAddSchemas (pubform -> oid , schemaidlist , false, stmt );
1322
1309
}
1323
1310
else if (stmt -> action == AP_DropObjects )
@@ -1429,14 +1416,7 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1429
1416
1430
1417
heap_freetuple (tup );
1431
1418
1432
- /*
1433
- * Lock the publication so nobody else can do anything with it. This
1434
- * prevents concurrent alter to add table(s) that were already going
1435
- * to become part of the publication by adding corresponding schema(s)
1436
- * via this command and similarly it will prevent the concurrent
1437
- * addition of schema(s) for which there is any corresponding table
1438
- * being added by this command.
1439
- */
1419
+ /* Lock the publication so nobody else can do anything with it. */
1440
1420
LockDatabaseObject (PublicationRelationId , pubid , 0 ,
1441
1421
AccessExclusiveLock );
1442
1422
@@ -1453,8 +1433,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1453
1433
errmsg ("publication \"%s\" does not exist" ,
1454
1434
stmt -> pubname ));
1455
1435
1456
- AlterPublicationTables (stmt , tup , relations , schemaidlist ,
1457
- pstate -> p_sourcetext );
1436
+ AlterPublicationTables (stmt , tup , relations , pstate -> p_sourcetext ,
1437
+ schemaidlist != NIL );
1458
1438
AlterPublicationSchemas (stmt , tup , schemaidlist );
1459
1439
}
1460
1440
@@ -1569,32 +1549,6 @@ RemovePublicationSchemaById(Oid psoid)
1569
1549
table_close (rel , RowExclusiveLock );
1570
1550
}
1571
1551
1572
- /*
1573
- * Open relations specified by a relid list.
1574
- * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1575
- * add them to a publication.
1576
- */
1577
- static List *
1578
- OpenRelIdList (List * relids )
1579
- {
1580
- ListCell * lc ;
1581
- List * rels = NIL ;
1582
-
1583
- foreach (lc , relids )
1584
- {
1585
- PublicationRelInfo * pub_rel ;
1586
- Oid relid = lfirst_oid (lc );
1587
- Relation rel = table_open (relid ,
1588
- ShareUpdateExclusiveLock );
1589
-
1590
- pub_rel = palloc (sizeof (PublicationRelInfo ));
1591
- pub_rel -> relation = rel ;
1592
- rels = lappend (rels , pub_rel );
1593
- }
1594
-
1595
- return rels ;
1596
- }
1597
-
1598
1552
/*
1599
1553
* Open relations specified by a PublicationTable list.
1600
1554
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
0 commit comments