Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Davis2023-01-13 23:32:37 +0000
committerJeff Davis2023-01-14 08:16:23 +0000
commitff9618e82a466fc9c635f9f087776e57b21e4f14 (patch)
tree667ee5aece95c24ff098be9e39f67aa9df461d2d /src/backend/commands
parentff23b592ad6621563d3128b26860bcb41daf9542 (diff)
Fix MAINTAIN privileges for toast tables and partitions.
Commit 60684dd8 left loose ends when it came to maintaining toast tables or partitions. For toast tables, simply skip the privilege check if the toast table is an indirect target of the maintenance command, because the main table privileges have already been checked. For partitions, allow the maintenance command if the user has the MAINTAIN privilege on the partition or any parent. Also make CLUSTER emit "skipping" messages when the user doesn't have privileges, similar to VACUUM. Author: Nathan Bossart Reported-by: Pavel Luzanov Reviewed-by: Pavel Luzanov, Ted Yu Discussion: https://postgr.es/m/20230113231339.GA2422750@nathanxps13
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/cluster.c35
-rw-r--r--src/backend/commands/indexcmds.c22
-rw-r--r--src/backend/commands/lockcmds.c8
-rw-r--r--src/backend/commands/tablecmds.c30
-rw-r--r--src/backend/commands/vacuum.c21
5 files changed, 85 insertions, 31 deletions
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index f11691aff79..369fea7c046 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -80,6 +80,7 @@ static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
static List *get_tables_to_cluster(MemoryContext cluster_context);
static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context,
Oid indexOid);
+static bool cluster_is_permitted_for_relation(Oid relid, Oid userid);
/*---------------------------------------------------------------------------
@@ -366,8 +367,7 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
if (recheck)
{
/* Check that the user still has privileges for the relation */
- if (!object_ownercheck(RelationRelationId, tableOid, save_userid) &&
- pg_class_aclcheck(tableOid, save_userid, ACL_MAINTAIN) != ACLCHECK_OK)
+ if (!cluster_is_permitted_for_relation(tableOid, save_userid))
{
relation_close(OldHeap, AccessExclusiveLock);
goto out;
@@ -1645,8 +1645,7 @@ get_tables_to_cluster(MemoryContext cluster_context)
index = (Form_pg_index) GETSTRUCT(indexTuple);
- if (!object_ownercheck(RelationRelationId, index->indrelid, GetUserId()) &&
- pg_class_aclcheck(index->indrelid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+ if (!cluster_is_permitted_for_relation(index->indrelid, GetUserId()))
continue;
/* Use a permanent memory context for the result list */
@@ -1694,12 +1693,11 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
if (get_rel_relkind(indexrelid) != RELKIND_INDEX)
continue;
- /* Silently skip partitions which the user has no access to. */
- if (!object_ownercheck(RelationRelationId, relid, GetUserId()) &&
- pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
- (!object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) ||
- IsSharedRelation(relid)))
- continue;
+ /*
+ * We already checked that the user has privileges to CLUSTER the
+ * partitioned table when we locked it earlier, so there's no need to
+ * check the privileges again here.
+ */
/* Use a permanent memory context for the result list */
old_context = MemoryContextSwitchTo(cluster_context);
@@ -1714,3 +1712,20 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
return rtcs;
}
+
+/*
+ * Return whether userid has privileges to CLUSTER relid. If not, this
+ * function emits a WARNING.
+ */
+static bool
+cluster_is_permitted_for_relation(Oid relid, Oid userid)
+{
+ if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK ||
+ has_partition_ancestor_privs(relid, userid, ACL_MAINTAIN))
+ return true;
+
+ ereport(WARNING,
+ (errmsg("permission denied to cluster \"%s\", skipping it",
+ get_rel_name(relid))));
+ return false;
+}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 8afc006f891..16ec0b114e6 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -2796,9 +2796,9 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation,
/* Check permissions */
table_oid = IndexGetRelation(relId, true);
- if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
- OidIsValid(table_oid) &&
- pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+ if (OidIsValid(table_oid) &&
+ pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
+ !has_partition_ancestor_privs(table_oid, GetUserId(), ACL_MAINTAIN))
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
relation->relname);
@@ -3008,16 +3008,16 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
/*
* The table can be reindexed if the user has been granted MAINTAIN on
- * the table or the user is a superuser, the table owner, or the
- * database/schema owner (but in the latter case, only if it's not a
- * shared relation). object_ownercheck includes the superuser case,
- * and depending on objectKind we already know that the user has
- * permission to run REINDEX on this database or schema per the
- * permission checks at the beginning of this routine.
+ * the table or one of its partition ancestors or the user is a
+ * superuser, the table owner, or the database/schema owner (but in the
+ * latter case, only if it's not a shared relation). pg_class_aclcheck
+ * includes the superuser case, and depending on objectKind we already
+ * know that the user has permission to run REINDEX on this database or
+ * schema per the permission checks at the beginning of this routine.
*/
if (classtuple->relisshared &&
- !object_ownercheck(RelationRelationId, relid, GetUserId()) &&
- pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+ pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
+ !has_partition_ancestor_privs(relid, GetUserId(), ACL_MAINTAIN))
continue;
/*
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 410d78b040f..6bf1b815f01 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -19,6 +19,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_inherits.h"
#include "commands/lockcmds.h"
+#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_clause.h"
@@ -305,5 +306,12 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
aclresult = pg_class_aclcheck(reloid, userid, aclmask);
+ /*
+ * If this is a partition, check permissions of its ancestors if needed.
+ */
+ if (aclresult != ACLCHECK_OK &&
+ has_partition_ancestor_privs(reloid, userid, ACL_MAINTAIN))
+ aclresult = ACLCHECK_OK;
+
return aclresult;
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 1fbdad4b649..7c697a285bd 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -16886,13 +16886,39 @@ RangeVarCallbackMaintainsTable(const RangeVar *relation,
errmsg("\"%s\" is not a table or materialized view", relation->relname)));
/* Check permissions */
- if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
- pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+ if (pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
+ !has_partition_ancestor_privs(relId, GetUserId(), ACL_MAINTAIN))
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLE,
relation->relname);
}
/*
+ * If relid is a partition, returns whether userid has any of the privileges
+ * specified in acl on any of its ancestors. Otherwise, returns false.
+ */
+bool
+has_partition_ancestor_privs(Oid relid, Oid userid, AclMode acl)
+{
+ List *ancestors;
+ ListCell *lc;
+
+ if (!get_rel_relispartition(relid))
+ return false;
+
+ ancestors = get_partition_ancestors(relid);
+ foreach(lc, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc);
+
+ if (OidIsValid(ancestor) &&
+ pg_class_aclcheck(ancestor, userid, acl) == ACLCHECK_OK)
+ return true;
+ }
+
+ return false;
+}
+
+/*
* Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
*/
static void
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ea1428dc8c0..7b1a4b127eb 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -41,6 +41,7 @@
#include "catalog/pg_namespace.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
+#include "commands/tablecmds.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
@@ -91,7 +92,8 @@ static void vac_truncate_clog(TransactionId frozenXID,
MultiXactId minMulti,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
+ bool skip_privs);
static double compute_parallel_delay(void);
static VacOptValue get_vacoptval_from_boolean(DefElem *def);
static bool vac_tid_reaped(ItemPointer itemptr, void *state);
@@ -501,7 +503,7 @@ vacuum(List *relations, VacuumParams *params,
if (params->options & VACOPT_VACUUM)
{
- if (!vacuum_rel(vrel->oid, vrel->relation, params))
+ if (!vacuum_rel(vrel->oid, vrel->relation, params, false))
continue;
}
@@ -598,11 +600,13 @@ vacuum_is_permitted_for_relation(Oid relid, Form_pg_class reltuple,
* - the role owns the relation
* - the role owns the current database and the relation is not shared
* - the role has been granted the MAINTAIN privilege on the relation
+ * - the role has privileges to vacuum/analyze any of the relation's
+ * partition ancestors
*----------
*/
- if (object_ownercheck(RelationRelationId, relid, GetUserId()) ||
- (object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
- pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK)
+ if ((object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
+ pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK ||
+ has_partition_ancestor_privs(relid, GetUserId(), ACL_MAINTAIN))
return true;
relname = NameStr(reltuple->relname);
@@ -1828,7 +1832,7 @@ vac_truncate_clog(TransactionId frozenXID,
* At entry and exit, we are not inside a transaction.
*/
static bool
-vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
+vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, bool skip_privs)
{
LOCKMODE lmode;
Relation rel;
@@ -1915,7 +1919,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
* happen across multiple transactions where privileges could have changed
* in-between. Make sure to only generate logs for VACUUM in this case.
*/
- if (!vacuum_is_permitted_for_relation(RelationGetRelid(rel),
+ if (!skip_privs &&
+ !vacuum_is_permitted_for_relation(RelationGetRelid(rel),
rel->rd_rel,
params->options & VACOPT_VACUUM))
{
@@ -2089,7 +2094,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
* totally unimportant for toast relations.
*/
if (toast_relid != InvalidOid)
- vacuum_rel(toast_relid, NULL, params);
+ vacuum_rel(toast_relid, NULL, params, true);
/*
* Now release the session-level lock on the main table.