Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Refactor PgFdwModifyState creation/destruction into separate functions.
author Robert Haas <rhaas@postgresql.org>
Fri, 6 Apr 2018 15:29:43 +0000 (11:29 -0400)
committer Robert Haas <rhaas@postgresql.org>
Fri, 6 Apr 2018 15:29:43 +0000 (11:29 -0400)
Etsuro Fujita.  The larger patch series of which this is a part has
been reviewed by Amit Langote, David Fetter, Maksim Milyutin,
Álvaro Herrera, Stephen Frost, and me.

Discussion: http://postgr.es/m/5A95487E.9050808@lab.ntt.co.jp

contrib/postgres_fdw/postgres_fdw.c

index a15ce28a48b3b575b9a6166229f83502d12198b1..e7441c759ba0d1a6b4773f9ecff2532d4751626b 100644 (file)
@@ -376,12 +376,21 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void create_cursor(ForeignScanState *node);
 static void fetch_more_data(ForeignScanState *node);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static PgFdwModifyState *create_foreign_modify(EState *estate,
+                     ResultRelInfo *resultRelInfo,
+                     CmdType operation,
+                     Plan *subplan,
+                     char *query,
+                     List *target_attrs,
+                     bool has_returning,
+                     List *retrieved_attrs);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
                         ItemPointer tupleid,
                         TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
                       TupleTableSlot *slot, PGresult *res);
+static void finish_foreign_modify(PgFdwModifyState *fmstate);
 static List *build_remote_returning(Index rtindex, Relation rel,
                       List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -1681,18 +1690,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
                           int eflags)
 {
    PgFdwModifyState *fmstate;
-   EState     *estate = mtstate->ps.state;
-   CmdType     operation = mtstate->operation;
-   Relation    rel = resultRelInfo->ri_RelationDesc;
-   RangeTblEntry *rte;
-   Oid         userid;
-   ForeignTable *table;
-   UserMapping *user;
-   AttrNumber  n_params;
-   Oid         typefnoid;
-   bool        isvarlena;
-   ListCell   *lc;
-   TupleDesc   tupdesc = RelationGetDescr(rel);
+   char       *query;
+   List       *target_attrs;
+   bool        has_returning;
+   List       *retrieved_attrs;
 
    /*
     * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
@@ -1701,82 +1702,25 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
    if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
        return;
 
-   /* Begin constructing PgFdwModifyState. */
-   fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
-   fmstate->rel = rel;
-
-   /*
-    * Identify which user to do the remote access as.  This should match what
-    * ExecCheckRTEPerms() does.
-    */
-   rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
-   userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
-
-   /* Get info about foreign table. */
-   table = GetForeignTable(RelationGetRelid(rel));
-   user = GetUserMapping(userid, table->serverid);
-
-   /* Open connection; report that we'll create a prepared statement. */
-   fmstate->conn = GetConnection(user, true);
-   fmstate->p_name = NULL;     /* prepared statement not made yet */
-
    /* Deconstruct fdw_private data. */
-   fmstate->query = strVal(list_nth(fdw_private,
-                                    FdwModifyPrivateUpdateSql));
-   fmstate->target_attrs = (List *) list_nth(fdw_private,
-                                             FdwModifyPrivateTargetAttnums);
-   fmstate->has_returning = intVal(list_nth(fdw_private,
-                                            FdwModifyPrivateHasReturning));
-   fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
-                                                FdwModifyPrivateRetrievedAttrs);
-
-   /* Create context for per-tuple temp workspace. */
-   fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
-                                             "postgres_fdw temporary data",
-                                             ALLOCSET_SMALL_SIZES);
-
-   /* Prepare for input conversion of RETURNING results. */
-   if (fmstate->has_returning)
-       fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-   /* Prepare for output conversion of parameters used in prepared stmt. */
-   n_params = list_length(fmstate->target_attrs) + 1;
-   fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
-   fmstate->p_nums = 0;
-
-   if (operation == CMD_UPDATE || operation == CMD_DELETE)
-   {
-       /* Find the ctid resjunk column in the subplan's result */
-       Plan       *subplan = mtstate->mt_plans[subplan_index]->plan;
-
-       fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
-                                                         "ctid");
-       if (!AttributeNumberIsValid(fmstate->ctidAttno))
-           elog(ERROR, "could not find junk ctid column");
-
-       /* First transmittable parameter will be ctid */
-       getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
-       fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
-       fmstate->p_nums++;
-   }
-
-   if (operation == CMD_INSERT || operation == CMD_UPDATE)
-   {
-       /* Set up for remaining transmittable parameters */
-       foreach(lc, fmstate->target_attrs)
-       {
-           int         attnum = lfirst_int(lc);
-           Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
-
-           Assert(!attr->attisdropped);
-
-           getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
-           fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
-           fmstate->p_nums++;
-       }
-   }
-
-   Assert(fmstate->p_nums <= n_params);
+   query = strVal(list_nth(fdw_private,
+                           FdwModifyPrivateUpdateSql));
+   target_attrs = (List *) list_nth(fdw_private,
+                                    FdwModifyPrivateTargetAttnums);
+   has_returning = intVal(list_nth(fdw_private,
+                                   FdwModifyPrivateHasReturning));
+   retrieved_attrs = (List *) list_nth(fdw_private,
+                                       FdwModifyPrivateRetrievedAttrs);
+
+   /* Construct an execution state. */
+   fmstate = create_foreign_modify(mtstate->ps.state,
+                                   resultRelInfo,
+                                   mtstate->operation,
+                                   mtstate->mt_plans[subplan_index]->plan,
+                                   query,
+                                   target_attrs,
+                                   has_returning,
+                                   retrieved_attrs);
 
    resultRelInfo->ri_FdwState = fmstate;
 }
@@ -2011,28 +1955,8 @@ postgresEndForeignModify(EState *estate,
    if (fmstate == NULL)
        return;
 
-   /* If we created a prepared statement, destroy it */
-   if (fmstate->p_name)
-   {
-       char        sql[64];
-       PGresult   *res;
-
-       snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
-       /*
-        * We don't use a PG_TRY block here, so be careful not to throw error
-        * without releasing the PGresult.
-        */
-       res = pgfdw_exec_query(fmstate->conn, sql);
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-           pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
-       PQclear(res);
-       fmstate->p_name = NULL;
-   }
-
-   /* Release remote connection */
-   ReleaseConnection(fmstate->conn);
-   fmstate->conn = NULL;
+   /* Destroy the execution state */
+   finish_foreign_modify(fmstate);
 }
 
 /*
@@ -3228,6 +3152,109 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
    PQclear(res);
 }
 
+/*
+ * create_foreign_modify
+ *     Construct an execution state of a foreign insert/update/delete
+ *     operation
+ */
+static PgFdwModifyState *
+create_foreign_modify(EState *estate,
+                     ResultRelInfo *resultRelInfo,
+                     CmdType operation,
+                     Plan *subplan,
+                     char *query,
+                     List *target_attrs,
+                     bool has_returning,
+                     List *retrieved_attrs)
+{
+   PgFdwModifyState *fmstate;
+   Relation    rel = resultRelInfo->ri_RelationDesc;
+   TupleDesc   tupdesc = RelationGetDescr(rel);
+   RangeTblEntry *rte;
+   Oid         userid;
+   ForeignTable *table;
+   UserMapping *user;
+   AttrNumber  n_params;
+   Oid         typefnoid;
+   bool        isvarlena;
+   ListCell   *lc;
+
+   /* Begin constructing PgFdwModifyState. */
+   fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
+   fmstate->rel = rel;
+
+   /*
+    * Identify which user to do the remote access as.  This should match what
+    * ExecCheckRTEPerms() does.
+    */
+   rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
+   userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+   /* Get info about foreign table. */
+   table = GetForeignTable(RelationGetRelid(rel));
+   user = GetUserMapping(userid, table->serverid);
+
+   /* Open connection; report that we'll create a prepared statement. */
+   fmstate->conn = GetConnection(user, true);
+   fmstate->p_name = NULL;     /* prepared statement not made yet */
+
+   /* Set up remote query information. */
+   fmstate->query = query;
+   fmstate->target_attrs = target_attrs;
+   fmstate->has_returning = has_returning;
+   fmstate->retrieved_attrs = retrieved_attrs;
+
+   /* Create context for per-tuple temp workspace. */
+   fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+                                             "postgres_fdw temporary data",
+                                             ALLOCSET_SMALL_SIZES);
+
+   /* Prepare for input conversion of RETURNING results. */
+   if (fmstate->has_returning)
+       fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+   /* Prepare for output conversion of parameters used in prepared stmt. */
+   n_params = list_length(fmstate->target_attrs) + 1;
+   fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
+   fmstate->p_nums = 0;
+
+   if (operation == CMD_UPDATE || operation == CMD_DELETE)
+   {
+       Assert(subplan != NULL);
+
+       /* Find the ctid resjunk column in the subplan's result */
+       fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
+                                                         "ctid");
+       if (!AttributeNumberIsValid(fmstate->ctidAttno))
+           elog(ERROR, "could not find junk ctid column");
+
+       /* First transmittable parameter will be ctid */
+       getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+       fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+       fmstate->p_nums++;
+   }
+
+   if (operation == CMD_INSERT || operation == CMD_UPDATE)
+   {
+       /* Set up for remaining transmittable parameters */
+       foreach(lc, fmstate->target_attrs)
+       {
+           int         attnum = lfirst_int(lc);
+           Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+           Assert(!attr->attisdropped);
+
+           getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+           fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+           fmstate->p_nums++;
+       }
+   }
+
+   Assert(fmstate->p_nums <= n_params);
+
+   return fmstate;
+}
+
 /*
  * prepare_foreign_modify
  *     Establish a prepared statement for execution of INSERT/UPDATE/DELETE
@@ -3370,6 +3397,39 @@ store_returning_result(PgFdwModifyState *fmstate,
    PG_END_TRY();
 }
 
+/*
+ * finish_foreign_modify
+ *     Release resources for a foreign insert/update/delete operation
+ */
+static void
+finish_foreign_modify(PgFdwModifyState *fmstate)
+{
+   Assert(fmstate != NULL);
+
+   /* If we created a prepared statement, destroy it */
+   if (fmstate->p_name)
+   {
+       char        sql[64];
+       PGresult   *res;
+
+       snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+       /*
+        * We don't use a PG_TRY block here, so be careful not to throw error
+        * without releasing the PGresult.
+        */
+       res = pgfdw_exec_query(fmstate->conn, sql);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+           pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+       PQclear(res);
+       fmstate->p_name = NULL;
+   }
+
+   /* Release remote connection */
+   ReleaseConnection(fmstate->conn);
+   fmstate->conn = NULL;
+}
+
 /*
  * build_remote_returning
  *     Build a RETURNING targetlist of a remote query for performing an