Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
In pg_dump, avoid doing per-table queries for RLS policies.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 31 Aug 2021 19:04:05 +0000 (15:04 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 31 Aug 2021 19:04:05 +0000 (15:04 -0400)
For no particularly good reason, getPolicies() queried pg_policy
separately for each table.  We can collect all the policies in
a single query instead, and attach them to the correct TableInfo
objects using findTableByOid() lookups.  On the regression
database, this reduces the number of queries substantially, and
provides a visible savings even when running against a local
server.

Per complaint from Hubert Depesz Lubaczewski.  Since this is such
a simple fix and can have a visible performance benefit, back-patch
to all supported branches.

Discussion: https://postgr.es/m/20210826084430.GA26282@depesz.com

src/bin/pg_dump/pg_dump.c

index 9aae8f8f55cf28f0f1a72f24f0dc403c4c15eef6..beee8471bc0e17bc14d94bf64e0ef355eb116213 100644 (file)
@@ -3508,7 +3508,7 @@ dumpBlobs(Archive *fout, void *arg)
 
 /*
  * getPolicies
- *   get information about policies on a dumpable table.
+ *   get information about all RLS policies on dumpable tables.
  */
 void
 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
@@ -3518,6 +3518,7 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
    PolicyInfo *polinfo;
    int         i_oid;
    int         i_tableoid;
+   int         i_polrelid;
    int         i_polname;
    int         i_polcmd;
    int         i_polpermissive;
@@ -3533,6 +3534,10 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
 
    query = createPQExpBuffer();
 
+   /*
+    * First, check which tables have RLS enabled.  We represent RLS being
+    * enabled on a table by creating a PolicyInfo object with null polname.
+    */
    for (i = 0; i < numTables; i++)
    {
        TableInfo  *tbinfo = &tblinfo[i];
@@ -3541,15 +3546,6 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
        if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
            continue;
 
-       pg_log_info("reading row security enabled for table \"%s.%s\"",
-                   tbinfo->dobj.namespace->dobj.name,
-                   tbinfo->dobj.name);
-
-       /*
-        * Get row security enabled information for the table. We represent
-        * RLS being enabled on a table by creating a PolicyInfo object with
-        * null polname.
-        */
        if (tbinfo->rowsec)
        {
            /*
@@ -3571,51 +3567,35 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
            polinfo->polqual = NULL;
            polinfo->polwithcheck = NULL;
        }
+   }
 
-       pg_log_info("reading policies for table \"%s.%s\"",
-                   tbinfo->dobj.namespace->dobj.name,
-                   tbinfo->dobj.name);
-
-       resetPQExpBuffer(query);
-
-       /* Get the policies for the table. */
-       if (fout->remoteVersion >= 100000)
-           appendPQExpBuffer(query,
-                             "SELECT oid, tableoid, pol.polname, pol.polcmd, pol.polpermissive, "
-                             "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
-                             "   pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
-                             "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
-                             "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
-                             "FROM pg_catalog.pg_policy pol "
-                             "WHERE polrelid = '%u'",
-                             tbinfo->dobj.catId.oid);
-       else
-           appendPQExpBuffer(query,
-                             "SELECT oid, tableoid, pol.polname, pol.polcmd, 't' as polpermissive, "
-                             "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
-                             "   pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
-                             "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
-                             "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
-                             "FROM pg_catalog.pg_policy pol "
-                             "WHERE polrelid = '%u'",
-                             tbinfo->dobj.catId.oid);
-       res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+   /*
+    * Now, read all RLS policies, and create PolicyInfo objects for all those
+    * that are of interest.
+    */
+   pg_log_info("reading row-level security policies");
 
-       ntups = PQntuples(res);
+   printfPQExpBuffer(query,
+                     "SELECT oid, tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
+   if (fout->remoteVersion >= 100000)
+       appendPQExpBuffer(query, "pol.polpermissive, ");
+   else
+       appendPQExpBuffer(query, "'t' as polpermissive, ");
+   appendPQExpBuffer(query,
+                     "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
+                     "   pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
+                     "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
+                     "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
+                     "FROM pg_catalog.pg_policy pol");
 
-       if (ntups == 0)
-       {
-           /*
-            * No explicit policies to handle (only the default-deny policy,
-            * which is handled as part of the table definition).  Clean up
-            * and return.
-            */
-           PQclear(res);
-           continue;
-       }
+   res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
+   ntups = PQntuples(res);
+   if (ntups > 0)
+   {
        i_oid = PQfnumber(res, "oid");
        i_tableoid = PQfnumber(res, "tableoid");
+       i_polrelid = PQfnumber(res, "polrelid");
        i_polname = PQfnumber(res, "polname");
        i_polcmd = PQfnumber(res, "polcmd");
        i_polpermissive = PQfnumber(res, "polpermissive");
@@ -3627,6 +3607,16 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
 
        for (j = 0; j < ntups; j++)
        {
+           Oid         polrelid = atooid(PQgetvalue(res, j, i_polrelid));
+           TableInfo  *tbinfo = findTableByOid(polrelid);
+
+           /*
+            * Ignore row security on tables not to be dumped.  (This will
+            * result in some harmless wasted slots in polinfo[].)
+            */
+           if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
+               continue;
+
            polinfo[j].dobj.objType = DO_POLICY;
            polinfo[j].dobj.catId.tableoid =
                atooid(PQgetvalue(res, j, i_tableoid));
@@ -3656,8 +3646,10 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
                polinfo[j].polwithcheck
                    = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
        }
-       PQclear(res);
    }
+
+   PQclear(res);
+
    destroyPQExpBuffer(query);
 }