Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1f5a0f7

Browse files
vigneshwaran-cCommitfest Bot
authored and
Commitfest Bot
committed
Introduce "ALL SEQUENCES" support for PostgreSQL logical replication
This commit enhances logical replication by enabling the inclusion of all sequences in publications. Furthermore, enhancements to psql commands now display which publications contain the specified sequence (\d command), and if a specified publication includes all sequences (\dRp command). Note: This patch currently supports only the "ALL SEQUENCES" clause. Handling of clauses such as "FOR SEQUENCE" and "FOR SEQUENCES IN SCHEMA" will be addressed in a subsequent patch.
1 parent dc3980a commit 1f5a0f7

File tree

14 files changed

+715
-335
lines changed

14 files changed

+715
-335
lines changed

src/backend/catalog/pg_publication.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ static bool
134134
is_publishable_class(Oid relid, Form_pg_class reltuple)
135135
{
136136
return (reltuple->relkind == RELKIND_RELATION ||
137-
reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
137+
reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
138+
reltuple->relkind == RELKIND_SEQUENCE) &&
138139
!IsCatalogRelationOid(relid) &&
139140
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
140141
relid >= FirstNormalObjectId;
@@ -1061,6 +1062,42 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
10611062
return result;
10621063
}
10631064

1065+
/*
1066+
* Gets list of all relations published by FOR ALL SEQUENCES publication(s).
1067+
*/
1068+
List *
1069+
GetAllSequencesPublicationRelations(void)
1070+
{
1071+
Relation classRel;
1072+
ScanKeyData key[1];
1073+
TableScanDesc scan;
1074+
HeapTuple tuple;
1075+
List *result = NIL;
1076+
1077+
classRel = table_open(RelationRelationId, AccessShareLock);
1078+
1079+
ScanKeyInit(&key[0],
1080+
Anum_pg_class_relkind,
1081+
BTEqualStrategyNumber, F_CHAREQ,
1082+
CharGetDatum(RELKIND_SEQUENCE));
1083+
1084+
scan = table_beginscan_catalog(classRel, 1, key);
1085+
1086+
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1087+
{
1088+
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1089+
Oid relid = relForm->oid;
1090+
1091+
if (is_publishable_class(relid, relForm))
1092+
result = lappend_oid(result, relid);
1093+
}
1094+
1095+
table_endscan(scan);
1096+
1097+
table_close(classRel, AccessShareLock);
1098+
return result;
1099+
}
1100+
10641101
/*
10651102
* Get publication using oid
10661103
*
@@ -1083,6 +1120,7 @@ GetPublication(Oid pubid)
10831120
pub->oid = pubid;
10841121
pub->name = pstrdup(NameStr(pubform->pubname));
10851122
pub->alltables = pubform->puballtables;
1123+
pub->allsequences = pubform->puballsequences;
10861124
pub->pubactions.pubinsert = pubform->pubinsert;
10871125
pub->pubactions.pubupdate = pubform->pubupdate;
10881126
pub->pubactions.pubdelete = pubform->pubdelete;

src/backend/commands/publicationcmds.c

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -848,11 +848,17 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
848848
aclcheck_error(aclresult, OBJECT_DATABASE,
849849
get_database_name(MyDatabaseId));
850850

851-
/* FOR ALL TABLES requires superuser */
852-
if (stmt->for_all_tables && !superuser())
853-
ereport(ERROR,
854-
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
855-
errmsg("must be superuser to create FOR ALL TABLES publication")));
851+
if (!superuser())
852+
{
853+
if (stmt->for_all_tables)
854+
ereport(ERROR,
855+
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
856+
errmsg("must be superuser to create a FOR ALL TABLES publication"));
857+
if (stmt->for_all_sequences)
858+
ereport(ERROR,
859+
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
860+
errmsg("must be superuser to create a FOR ALL SEQUENCES publication"));
861+
}
856862

857863
rel = table_open(PublicationRelationId, RowExclusiveLock);
858864

@@ -886,6 +892,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
886892
values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
887893
values[Anum_pg_publication_puballtables - 1] =
888894
BoolGetDatum(stmt->for_all_tables);
895+
values[Anum_pg_publication_puballsequences - 1] =
896+
BoolGetDatum(stmt->for_all_sequences);
889897
values[Anum_pg_publication_pubinsert - 1] =
890898
BoolGetDatum(pubactions.pubinsert);
891899
values[Anum_pg_publication_pubupdate - 1] =
@@ -2019,19 +2027,27 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
20192027
aclcheck_error(aclresult, OBJECT_DATABASE,
20202028
get_database_name(MyDatabaseId));
20212029

2022-
if (form->puballtables && !superuser_arg(newOwnerId))
2023-
ereport(ERROR,
2024-
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2025-
errmsg("permission denied to change owner of publication \"%s\"",
2026-
NameStr(form->pubname)),
2027-
errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
2028-
2029-
if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
2030-
ereport(ERROR,
2031-
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2032-
errmsg("permission denied to change owner of publication \"%s\"",
2033-
NameStr(form->pubname)),
2034-
errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
2030+
if (!superuser_arg(newOwnerId))
2031+
{
2032+
if (form->puballtables)
2033+
ereport(ERROR,
2034+
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2035+
errmsg("permission denied to change owner of publication \"%s\"",
2036+
NameStr(form->pubname)),
2037+
errhint("The owner of a FOR ALL TABLES publication must be a superuser."));
2038+
if (form->puballsequences)
2039+
ereport(ERROR,
2040+
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2041+
errmsg("permission denied to change owner of publication \"%s\"",
2042+
NameStr(form->pubname)),
2043+
errhint("The owner of a FOR ALL SEQUENCES publication must be a superuser."));
2044+
if (is_schema_publication(form->oid))
2045+
ereport(ERROR,
2046+
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2047+
errmsg("permission denied to change owner of publication \"%s\"",
2048+
NameStr(form->pubname)),
2049+
errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser."));
2050+
}
20352051
}
20362052

20372053
form->pubowner = newOwnerId;

src/backend/parser/gram.y

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,10 @@ static PartitionStrategy parsePartitionStrategy(char *strategy, int location,
204204
core_yyscan_t yyscanner);
205205
static void preprocess_pubobj_list(List *pubobjspec_list,
206206
core_yyscan_t yyscanner);
207+
static void preprocess_pub_all_objtype_list(List *all_objects_list,
208+
bool *all_tables,
209+
bool *all_sequences,
210+
core_yyscan_t yyscanner);
207211
static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
208212

209213
%}
@@ -260,6 +264,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
260264
PartitionBoundSpec *partboundspec;
261265
RoleSpec *rolespec;
262266
PublicationObjSpec *publicationobjectspec;
267+
PublicationAllObjSpec *publicationallobjectspec;
263268
struct SelectLimit *selectlimit;
264269
SetQuantifier setquantifier;
265270
struct GroupClause *groupclause;
@@ -446,7 +451,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
446451
transform_element_list transform_type_list
447452
TriggerTransitions TriggerReferencing
448453
vacuum_relation_list opt_vacuum_relation_list
449-
drop_option_list pub_obj_list
454+
drop_option_list pub_obj_list pub_obj_type_list
450455

451456
%type <retclause> returning_clause
452457
%type <node> returning_option
@@ -585,6 +590,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
585590
%type <node> var_value zone_value
586591
%type <rolespec> auth_ident RoleSpec opt_granted_by
587592
%type <publicationobjectspec> PublicationObjSpec
593+
%type <publicationallobjectspec> PublicationAllObjSpec
588594

589595
%type <keyword> unreserved_keyword type_func_name_keyword
590596
%type <keyword> col_name_keyword reserved_keyword
@@ -10614,7 +10620,12 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
1061410620
*
1061510621
* CREATE PUBLICATION name [WITH options]
1061610622
*
10617-
* CREATE PUBLICATION FOR ALL TABLES [WITH options]
10623+
* CREATE PUBLICATION FOR ALL pub_obj_type [, ...] [WITH options]
10624+
*
10625+
* pub_obj_type is one of:
10626+
*
10627+
* TABLES
10628+
* SEQUENCES
1061810629
*
1061910630
* CREATE PUBLICATION FOR pub_obj [, ...] [WITH options]
1062010631
*
@@ -10634,13 +10645,13 @@ CreatePublicationStmt:
1063410645
n->options = $4;
1063510646
$$ = (Node *) n;
1063610647
}
10637-
| CREATE PUBLICATION name FOR ALL TABLES opt_definition
10648+
| CREATE PUBLICATION name FOR pub_obj_type_list opt_definition
1063810649
{
1063910650
CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
1064010651

1064110652
n->pubname = $3;
10642-
n->options = $7;
10643-
n->for_all_tables = true;
10653+
preprocess_pub_all_objtype_list($5, &n->for_all_tables, &n->for_all_sequences, yyscanner);
10654+
n->options = $6;
1064410655
$$ = (Node *) n;
1064510656
}
1064610657
| CREATE PUBLICATION name FOR pub_obj_list opt_definition
@@ -10752,6 +10763,28 @@ pub_obj_list: PublicationObjSpec
1075210763
{ $$ = lappend($1, $3); }
1075310764
;
1075410765

10766+
PublicationAllObjSpec:
10767+
ALL TABLES
10768+
{
10769+
$$ = makeNode(PublicationAllObjSpec);
10770+
$$->pubobjtype = PUBLICATION_ALL_TABLES;
10771+
$$->location = @1;
10772+
}
10773+
| ALL SEQUENCES
10774+
{
10775+
$$ = makeNode(PublicationAllObjSpec);
10776+
$$->pubobjtype = PUBLICATION_ALL_SEQUENCES;
10777+
$$->location = @1;
10778+
}
10779+
;
10780+
10781+
pub_obj_type_list: PublicationAllObjSpec
10782+
{ $$ = list_make1($1); }
10783+
| pub_obj_type_list ',' PublicationAllObjSpec
10784+
{ $$ = lappend($1, $3); }
10785+
;
10786+
10787+
1075510788
/*****************************************************************************
1075610789
*
1075710790
* ALTER PUBLICATION name SET ( options )
@@ -19638,6 +19671,47 @@ parsePartitionStrategy(char *strategy, int location, core_yyscan_t yyscanner)
1963819671

1963919672
}
1964019673

19674+
/*
19675+
* Process all_objects_list to set all_tables/all_sequences.
19676+
* Also, checks if the pub_object_type has been specified more than once.
19677+
*/
19678+
static void
19679+
preprocess_pub_all_objtype_list(List *all_objects_list, bool *all_tables,
19680+
bool *all_sequences, core_yyscan_t yyscanner)
19681+
{
19682+
if (!all_objects_list)
19683+
return;
19684+
19685+
*all_tables = false;
19686+
*all_sequences = false;
19687+
19688+
foreach_ptr(PublicationAllObjSpec, obj, all_objects_list)
19689+
{
19690+
if (obj->pubobjtype == PUBLICATION_ALL_TABLES)
19691+
{
19692+
if (*all_tables)
19693+
ereport(ERROR,
19694+
errcode(ERRCODE_SYNTAX_ERROR),
19695+
errmsg("invalid publication object list"),
19696+
errdetail("ALL TABLES can be specified only once."),
19697+
parser_errposition(obj->location));
19698+
19699+
*all_tables = true;
19700+
}
19701+
else if (obj->pubobjtype == PUBLICATION_ALL_SEQUENCES)
19702+
{
19703+
if (*all_sequences)
19704+
ereport(ERROR,
19705+
errcode(ERRCODE_SYNTAX_ERROR),
19706+
errmsg("invalid publication object list"),
19707+
errdetail("ALL SEQUENCES can be specified only once."),
19708+
parser_errposition(obj->location));
19709+
19710+
*all_sequences = true;
19711+
}
19712+
}
19713+
}
19714+
1964119715
/*
1964219716
* Process pubobjspec_list to check for errors in any of the objects and
1964319717
* convert PUBLICATIONOBJ_CONTINUATION into appropriate PublicationObjSpecType.

src/bin/pg_dump/pg_dump.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4390,6 +4390,7 @@ getPublications(Archive *fout)
43904390
int i_pubname;
43914391
int i_pubowner;
43924392
int i_puballtables;
4393+
int i_puballsequences;
43934394
int i_pubinsert;
43944395
int i_pubupdate;
43954396
int i_pubdelete;
@@ -4420,9 +4421,9 @@ getPublications(Archive *fout)
44204421
appendPQExpBufferStr(query, "false AS pubviaroot, ");
44214422

44224423
if (fout->remoteVersion >= 180000)
4423-
appendPQExpBufferStr(query, "p.pubgencols ");
4424+
appendPQExpBufferStr(query, "p.pubgencols, p.puballsequences ");
44244425
else
4425-
appendPQExpBuffer(query, "'%c' AS pubgencols ", PUBLISH_GENCOLS_NONE);
4426+
appendPQExpBuffer(query, "'%c' AS pubgencols, false AS puballsequences ", PUBLISH_GENCOLS_NONE);
44264427

44274428
appendPQExpBufferStr(query, "FROM pg_publication p");
44284429

@@ -4438,6 +4439,7 @@ getPublications(Archive *fout)
44384439
i_pubname = PQfnumber(res, "pubname");
44394440
i_pubowner = PQfnumber(res, "pubowner");
44404441
i_puballtables = PQfnumber(res, "puballtables");
4442+
i_puballsequences = PQfnumber(res, "puballsequences");
44414443
i_pubinsert = PQfnumber(res, "pubinsert");
44424444
i_pubupdate = PQfnumber(res, "pubupdate");
44434445
i_pubdelete = PQfnumber(res, "pubdelete");
@@ -4458,6 +4460,8 @@ getPublications(Archive *fout)
44584460
pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
44594461
pubinfo[i].puballtables =
44604462
(strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4463+
pubinfo[i].puballsequences =
4464+
(strcmp(PQgetvalue(res, i, i_puballsequences), "t") == 0);
44614465
pubinfo[i].pubinsert =
44624466
(strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
44634467
pubinfo[i].pubupdate =
@@ -4509,8 +4513,12 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
45094513
appendPQExpBuffer(query, "CREATE PUBLICATION %s",
45104514
qpubname);
45114515

4512-
if (pubinfo->puballtables)
4516+
if (pubinfo->puballtables && pubinfo->puballsequences)
4517+
appendPQExpBufferStr(query, " FOR ALL TABLES, ALL SEQUENCES");
4518+
else if (pubinfo->puballtables)
45134519
appendPQExpBufferStr(query, " FOR ALL TABLES");
4520+
else if (pubinfo->puballsequences)
4521+
appendPQExpBufferStr(query, " FOR ALL SEQUENCES");
45144522

45154523
appendPQExpBufferStr(query, " WITH (publish = '");
45164524
if (pubinfo->pubinsert)

src/bin/pg_dump/pg_dump.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,7 @@ typedef struct _PublicationInfo
661661
DumpableObject dobj;
662662
const char *rolname;
663663
bool puballtables;
664+
bool puballsequences;
664665
bool pubinsert;
665666
bool pubupdate;
666667
bool pubdelete;

src/bin/pg_dump/t/002_pg_dump.pl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3273,6 +3273,28 @@
32733273
like => { %full_runs, section_post_data => 1, },
32743274
},
32753275
3276+
'CREATE PUBLICATION pub5' => {
3277+
create_order => 50,
3278+
create_sql => 'CREATE PUBLICATION pub5
3279+
FOR ALL SEQUENCES
3280+
WITH (publish = \'\');',
3281+
regexp => qr/^
3282+
\QCREATE PUBLICATION pub5 FOR ALL SEQUENCES WITH (publish = '');\E
3283+
/xm,
3284+
like => { %full_runs, section_post_data => 1, },
3285+
},
3286+
3287+
'CREATE PUBLICATION pub6' => {
3288+
create_order => 50,
3289+
create_sql => 'CREATE PUBLICATION pub6
3290+
FOR ALL SEQUENCES, ALL TABLES
3291+
WITH (publish = \'\');',
3292+
regexp => qr/^
3293+
\QCREATE PUBLICATION pub6 FOR ALL TABLES, ALL SEQUENCES WITH (publish = '');\E
3294+
/xm,
3295+
like => { %full_runs, section_post_data => 1, },
3296+
},
3297+
32763298
'CREATE SUBSCRIPTION sub1' => {
32773299
create_order => 50,
32783300
create_sql => 'CREATE SUBSCRIPTION sub1

0 commit comments

Comments
 (0)