Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d6b44da

Browse files
arssherkelvich
authored andcommitted
Tracking on which foreign servers we already started COPY.
If foreign server holds several partitions, COPY FROM to local root partition will try to perform several copies at the same time through one connection, obviously without much success. Now we track that and start/end COPY only once. We also allow to pass destination relation name which may be different from foreing table -- so we can copy into foreign root partition in shardman. This is pretty narrow solution. However, keeping several connections to the same foreign server requires significant changes, especially in 2pc handling, so staying here for now.
1 parent 9b75dc4 commit d6b44da

File tree

6 files changed

+63
-29
lines changed

6 files changed

+63
-29
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
6262
bool invalidated; /* true if reconnect is pending */
6363
uint32 server_hashvalue; /* hash value of foreign server OID */
6464
uint32 mapping_hashvalue; /* hash value of user mapping OID */
65+
bool copy_from_started; /* COPY FROM in progress on this conn */
6566
} ConnCacheEntry;
6667

6768
/*
@@ -114,7 +115,8 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
114115
* (not even on error), we need this flag to cue manual cleanup.
115116
*/
116117
PGconn *
117-
GetConnection(UserMapping *user, bool will_prep_stmt)
118+
GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
119+
bool **copy_from_started)
118120
{
119121
bool found;
120122
ConnCacheEntry *entry;
@@ -200,6 +202,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
200202
entry->have_error = false;
201203
entry->changing_xact_state = false;
202204
entry->invalidated = false;
205+
entry->copy_from_started = false;
203206
entry->server_hashvalue =
204207
GetSysCacheHashValue1(FOREIGNSERVEROID,
205208
ObjectIdGetDatum(server->serverid));
@@ -222,9 +225,17 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
222225
/* Remember if caller will prepare statements */
223226
entry->have_prep_stmt |= will_prep_stmt;
224227

228+
if (copy_from_started)
229+
*copy_from_started = &(entry->copy_from_started);
225230
return entry->conn;
226231
}
227232

233+
PGconn *
234+
GetConnection(UserMapping *user, bool will_prep_stmt)
235+
{
236+
return GetConnectionCopyFrom(user, will_prep_stmt, NULL);
237+
}
238+
228239
/*
229240
* Connect to remote server using specified server and user mapping properties.
230241
*/

contrib/postgres_fdw/deparse.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3179,10 +3179,14 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
31793179
* Deparse COPY FROM
31803180
*/
31813181
void
3182-
deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate)
3182+
deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate,
3183+
const char *dest_relname)
31833184
{
31843185
appendStringInfoString(buf, "COPY ");
3185-
deparseRelation(buf, rel);
3186+
if (dest_relname == NULL)
3187+
deparseRelation(buf, rel);
3188+
else
3189+
appendStringInfoString(buf, dest_relname);
31863190
appendStringInfoString(buf, " FROM STDIN WITH (");
31873191

31883192
/* TODO: deparse column names */

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,8 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
354354
RelOptInfo *output_rel);
355355
static void postgresBeginForeignCopyFrom(EState *estate,
356356
ResultRelInfo *rinfo,
357-
CopyState cstate);
357+
CopyState cstate,
358+
const char *dest_relname);
358359
static void postgresForeignNextCopyFrom(EState *estate,
359360
ResultRelInfo *rinfo,
360361
CopyState cstate);
@@ -5225,7 +5226,7 @@ _PG_init(void)
52255226
/* Begin COPY FROM to foreign table */
52265227
static void
52275228
postgresBeginForeignCopyFrom(EState *estate, ResultRelInfo *rinfo,
5228-
CopyState cstate)
5229+
CopyState cstate, const char *dest_relname)
52295230
{
52305231
Relation rel = rinfo->ri_RelationDesc;
52315232
RangeTblEntry *rte;
@@ -5235,6 +5236,7 @@ postgresBeginForeignCopyFrom(EState *estate, ResultRelInfo *rinfo,
52355236
StringInfoData sql;
52365237
PGconn *conn;
52375238
PGresult *res;
5239+
bool *copy_from_started;
52385240

52395241
/*
52405242
* Identify which user to do the remote access as. This should match what
@@ -5246,30 +5248,38 @@ postgresBeginForeignCopyFrom(EState *estate, ResultRelInfo *rinfo,
52465248
/* Get info about foreign table. */
52475249
table = GetForeignTable(RelationGetRelid(rel));
52485250
user = GetUserMapping(userid, table->serverid);
5251+
rinfo->ri_FdwState = user;
52495252

5250-
/* Open connection */
5251-
conn = GetConnection(user, false);
5252-
rinfo->ri_FdwState = conn;
5253+
/* Get (open, if not yet) connection */
5254+
conn = GetConnectionCopyFrom(user, false, &copy_from_started);
5255+
/* We already did COPY FROM to this server */
5256+
if (*copy_from_started)
5257+
return;
52535258

52545259
/* deparse COPY stmt */
52555260
initStringInfo(&sql);
5256-
deparseCopyFromSql(&sql, rel, cstate);
5261+
deparseCopyFromSql(&sql, rel, cstate, dest_relname);
52575262

52585263
res = PQexec(conn, sql.data);
52595264
if (PQresultStatus(res) != PGRES_COPY_IN)
52605265
{
52615266
pgfdw_report_error(ERROR, res, conn, true, sql.data);
52625267
}
52635268
PQclear(res);
5269+
5270+
*copy_from_started = true;
52645271
}
52655272

52665273
/* COPY FROM next row to foreign table */
52675274
static void
52685275
postgresForeignNextCopyFrom(EState *estate, ResultRelInfo *rinfo,
52695276
CopyState cstate)
52705277
{
5271-
PGconn *conn = (PGconn *) rinfo->ri_FdwState;
5278+
bool *copy_from_started;
5279+
UserMapping *user = (UserMapping *) rinfo->ri_FdwState;
5280+
PGconn *conn = GetConnectionCopyFrom(user, false, &copy_from_started);
52725281

5282+
Assert(copy_from_started);
52735283
Assert(!cstate->binary);
52745284
/* TODO: distinuish failure and nonblocking-send EAGAIN */
52755285
if (PQputline(conn, cstate->line_buf.data) || PQputnbytes(conn, "\n", 1))
@@ -5282,19 +5292,24 @@ postgresForeignNextCopyFrom(EState *estate, ResultRelInfo *rinfo,
52825292
static void
52835293
postgresEndForeignCopyFrom(EState *estate, ResultRelInfo *rinfo)
52845294
{
5285-
PGconn *conn = (PGconn *) rinfo->ri_FdwState;
5286-
PGresult *res;
5295+
bool *copy_from_started;
5296+
UserMapping *user = (UserMapping *) rinfo->ri_FdwState;
5297+
PGconn *conn = GetConnectionCopyFrom(user, false, &copy_from_started);
5298+
PGresult *res;
52875299

5288-
/* TODO: PQgetResult? */
5289-
if (PQendcopy(conn))
5300+
if (*copy_from_started)
52905301
{
5291-
pgfdw_report_error(ERROR, NULL, conn, false, "end postgres_fdw copy from");
5292-
}
5293-
while ((res = PQgetResult(conn)) != NULL)
5294-
{
5295-
/* TODO: get error? */
5296-
PQclear(res);
5302+
/* TODO: PQgetResult? */
5303+
if (PQendcopy(conn))
5304+
{
5305+
pgfdw_report_error(ERROR, NULL, conn, false, "end postgres_fdw copy from");
5306+
}
5307+
while ((res = PQgetResult(conn)) != NULL)
5308+
{
5309+
/* TODO: get error? */
5310+
PQclear(res);
5311+
}
5312+
*copy_from_started = false;
5313+
ReleaseConnection(conn);
52975314
}
5298-
5299-
ReleaseConnection(conn);
53005315
}

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ extern void reset_transmission_modes(int nestlevel);
117117

118118
/* in connection.c */
119119
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
120+
extern PGconn *GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
121+
bool **copy_from_started);
120122
extern void ReleaseConnection(PGconn *conn);
121123
extern unsigned int GetCursorNumber(PGconn *conn);
122124
extern unsigned int GetPrepStmtNumber(PGconn *conn);
@@ -178,7 +180,8 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
178180
List *remote_conds, List *pathkeys, bool is_subquery,
179181
List **retrieved_attrs, List **params_list);
180182
extern const char *get_jointype_name(JoinType jointype);
181-
extern void deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate);
183+
extern void deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate,
184+
const char *dest_relname);
182185

183186
/* in shippable.c */
184187
extern bool is_builtin(Oid objectId);

src/backend/commands/copy.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ static bool CopyGetInt32(CopyState cstate, int32 *val);
181181
static void CopySendInt16(CopyState cstate, int16 val);
182182
static bool CopyGetInt16(CopyState cstate, int16 *val);
183183
static void InitForeignCopyFrom(EState *estate, ResultRelInfo *resultRelInfo,
184-
CopyState cstate);
184+
CopyState cstate, char *dest_relname);
185185

186186

187187
/*
@@ -2354,12 +2354,12 @@ CopyFrom(CopyState cstate)
23542354
/* If some partitions are foreign tables, init copy on remote end */
23552355
for (i = 0; i < num_partitions; i++)
23562356
{
2357-
InitForeignCopyFrom(estate, partitions + i, cstate);
2357+
InitForeignCopyFrom(estate, partitions + i, cstate, NULL);
23582358
}
23592359
}
23602360

23612361
/* If we are copying to foreign table, init it */
2362-
InitForeignCopyFrom(estate, resultRelInfo, cstate);
2362+
InitForeignCopyFrom(estate, resultRelInfo, cstate, NULL);
23632363

23642364
/*
23652365
* It's more efficient to prepare a bunch of tuples for insertion, and
@@ -4727,7 +4727,7 @@ CreateCopyDestReceiver(void)
47274727
}
47284728

47294729
static void InitForeignCopyFrom(EState *estate, ResultRelInfo *resultRelInfo,
4730-
CopyState cstate)
4730+
CopyState cstate, char *dest_relname)
47314731
{
47324732
if (resultRelInfo->ri_FdwRoutine)
47334733
{
@@ -4739,6 +4739,6 @@ static void InitForeignCopyFrom(EState *estate, ResultRelInfo *resultRelInfo,
47394739
errmsg("FDW adapter for relation \"%s\" doesn't support COPY FROM",
47404740
RelationGetRelationName(resultRelInfo->ri_RelationDesc))));
47414741
resultRelInfo->ri_FdwRoutine->
4742-
BeginForeignCopyFrom(estate, resultRelInfo, cstate);
4742+
BeginForeignCopyFrom(estate, resultRelInfo, cstate, dest_relname);
47434743
}
47444744
}

src/include/foreign/fdwapi.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
162162

163163
typedef void (*BeginForeignCopyFrom_function) (EState *estate,
164164
ResultRelInfo *rinfo,
165-
CopyState cstate);
165+
CopyState cstate,
166+
const char *dest_relname);
166167
/*
167168
* Currently we support only text and csv format and pass each row in
168169
* cstate->line_buf. We should also pass binary data and/or deformed tuple.

0 commit comments

Comments
 (0)