Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Restart the apply worker if the privileges have been revoked.
authorAmit Kapila <akapila@postgresql.org>
Tue, 17 Oct 2023 03:00:05 +0000 (08:30 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 17 Oct 2023 03:11:44 +0000 (08:41 +0530)
Restart the apply worker if the subscription owner's superuser privileges
have been revoked. This is required so that the subscription connection
string gets revalidated and use the password option to connect to the
publisher for non-superusers, if required.

Author: Vignesh C
Reviewed-by: Amit Kapila
Discussion: http://postgr.es/m/CALDaNm2Dxmhq08nr4P6G+24QvdBo_GAVyZ_Q1TcGYK+8NHs9xw@mail.gmail.com

src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/catalog/pg_subscription.h
src/test/subscription/t/027_nosuperuser.pl

index d07f88ce2807a3bc5e8f08324872e2f58e95475a..d6a978f136275e4e0f79175a9329e75e023bafe6 100644 (file)
@@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
                                   Anum_pg_subscription_suborigin);
    sub->origin = TextDatumGetCString(datum);
 
+   /* Is the subscription owner a superuser? */
+   sub->ownersuperuser = superuser_arg(sub->owner);
+
    ReleaseSysCache(tup);
 
    return sub;
index 6fe111e98d3afbc71961e522e195d66f8fa44c9f..edc82c11bebb8c56654e8915b238895def6943b5 100644 (file)
@@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
    load_file("libpqwalreceiver", false);
 
    /* Try to connect to the publisher. */
-   must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
+   must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
                            sub->name, &err);
    if (!wrconn)
@@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
            load_file("libpqwalreceiver", false);
            /* Check the connection info string. */
            walrcv_check_conninfo(stmt->conninfo,
-                                 sub->passwordrequired && !superuser_arg(sub->owner));
+                                 sub->passwordrequired && !sub->ownersuperuser);
 
            values[Anum_pg_subscription_subconninfo - 1] =
                CStringGetTextDatum(stmt->conninfo);
index e2cee92cf26bfe6a24c0ca127293cf3aead88857..37a0abe2f4dde70997a0c731a0e29101a0527d1b 100644 (file)
@@ -1275,13 +1275,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                       MyLogicalRepWorker->relid,
                                       &relstate_lsn);
+   CommitTransactionCommand();
 
    /* Is the use of a password mandatory? */
    must_use_password = MySubscription->passwordrequired &&
-       !superuser_arg(MySubscription->owner);
-
-   /* Note that the superuser_arg call can access the DB */
-   CommitTransactionCommand();
+       !MySubscription->ownersuperuser;
 
    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    MyLogicalRepWorker->relstate = relstate;
index 597947410f8ddd16afaad797ecb5480ae06d16f3..54c14495bea5b9367c229241f3a8f4949a957451 100644 (file)
@@ -3966,6 +3966,24 @@ maybe_reread_subscription(void)
        apply_worker_exit();
    }
 
+   /*
+    * Exit if the subscription owner's superuser privileges have been
+    * revoked.
+    */
+   if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
+   {
+       if (am_parallel_apply_worker())
+           ereport(LOG,
+                   errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
+                          MySubscription->name));
+       else
+           ereport(LOG,
+                   errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
+                          MySubscription->name));
+
+       apply_worker_exit();
+   }
+
    /* Check for other changes that should never happen too. */
    if (newsub->dbid != MySubscription->dbid)
    {
@@ -4492,13 +4510,11 @@ run_apply_worker()
    replorigin_session_setup(originid, 0);
    replorigin_session_origin = originid;
    origin_startpos = replorigin_session_get_progress(false);
+   CommitTransactionCommand();
 
    /* Is the use of a password mandatory? */
    must_use_password = MySubscription->passwordrequired &&
-       !superuser_arg(MySubscription->owner);
-
-   /* Note that the superuser_arg call can access the DB */
-   CommitTransactionCommand();
+       !MySubscription->ownersuperuser;
 
    LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
                                            must_use_password,
@@ -4621,11 +4637,18 @@ InitializeLogRepWorker(void)
    SetConfigOption("synchronous_commit", MySubscription->synccommit,
                    PGC_BACKEND, PGC_S_OVERRIDE);
 
-   /* Keep us informed about subscription changes. */
+   /*
+    * Keep us informed about subscription or role changes. Note that the
+    * role's superuser privilege can be revoked.
+    */
    CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
                                  subscription_change_cb,
                                  (Datum) 0);
 
+   CacheRegisterSyscacheCallback(AUTHOID,
+                                 subscription_change_cb,
+                                 (Datum) 0);
+
    if (am_tablesync_worker())
        ereport(LOG,
                (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
index be36c4a8207f4e395b10f987533b8b6b5accaa1b..e0b91eacd2a66d0bbc9f1f8a58775230d93249b4 100644 (file)
@@ -127,6 +127,7 @@ typedef struct Subscription
                                 * skipped */
    char       *name;           /* Name of the subscription */
    Oid         owner;          /* Oid of the subscription owner */
+   bool        ownersuperuser; /* Is the subscription owner a superuser? */
    bool        enabled;        /* Indicates if the subscription is enabled */
    bool        binary;         /* Indicates if the subscription wants data in
                                 * binary format */
index d7a7e3ef5bbef84f5cc2d6aa91b350f2147ff3ce..642baa5d7c9b8fb012b03bf7965dad07cb4f9251 100644 (file)
@@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
   CREATE ROLE regress_admin SUPERUSER LOGIN;
   CREATE ROLE regress_alice NOSUPERUSER LOGIN;
   GRANT CREATE ON DATABASE postgres TO regress_alice;
+  GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
   SET SESSION AUTHORIZATION regress_alice;
   CREATE SCHEMA alice;
   GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,27 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
 expect_replication("alice.unpartitioned", 3, 17, 21,
    "restoring SELECT permission permits replication to continue");
 
+# The apply worker should get restarted after the superuser privileges are
+# revoked for subscription owner alice.
+grant_superuser("regress_alice");
+$node_subscriber->safe_psql(
+   'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher,
+   'regression_sub');
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+revoke_superuser("regress_alice");
+
+# After the user becomes non-superuser the apply worker should be restarted.
+$node_subscriber->wait_for_log(
+   qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner's superuser privileges have been revoked/,
+   $offset);
+
 done_testing();