Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9b75dc4

Browse files
arssherkelvich
authored andcommitted
1 parent b79402a commit 9b75dc4

File tree

8 files changed

+431
-196
lines changed

8 files changed

+431
-196
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include "access/xact.h"
2020
#include "access/xtm.h"
2121
#include "access/transam.h"
22+
#include "access/xlog.h"
23+
#include "libpq-int.h"
2224
#include "mb/pg_wchar.h"
2325
#include "miscadmin.h"
2426
#include "pgstat.h"
@@ -1231,21 +1233,40 @@ pgfdw_cancel_query(PGconn *conn)
12311233
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
12321234

12331235
/*
1234-
* Issue cancel request. Unfortunately, there's no good way to limit the
1235-
* amount of time that we might block inside PQgetCancel().
1236+
* If COPY IN in progress, send CopyFail. Otherwise send cancel request.
1237+
* TODO: make it less hackish, without libpq-int.h inclusion and handling
1238+
* EAGAIN.
12361239
*/
1237-
if ((cancel = PQgetCancel(conn)))
1240+
if (conn->asyncStatus == PGASYNC_COPY_IN)
12381241
{
1239-
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1242+
if (PQputCopyEnd(conn, "postgres_fdw: transaction abort on source node") != 1)
12401243
{
12411244
ereport(WARNING,
12421245
(errcode(ERRCODE_CONNECTION_FAILURE),
1243-
errmsg("could not send cancel request: %s",
1246+
errmsg("could not send abort copy request: %s",
12441247
errbuf)));
1245-
PQfreeCancel(cancel);
12461248
return false;
12471249
}
1248-
PQfreeCancel(cancel);
1250+
}
1251+
else
1252+
{
1253+
/*
1254+
* Issue cancel request. Unfortunately, there's no good way to limit the
1255+
* amount of time that we might block inside PQgetCancel().
1256+
*/
1257+
if ((cancel = PQgetCancel(conn)))
1258+
{
1259+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1260+
{
1261+
ereport(WARNING,
1262+
(errcode(ERRCODE_CONNECTION_FAILURE),
1263+
errmsg("could not send cancel request: %s",
1264+
errbuf)));
1265+
PQfreeCancel(cancel);
1266+
return false;
1267+
}
1268+
PQfreeCancel(cancel);
1269+
}
12491270
}
12501271

12511272
/* Get and discard the result of the query. */

contrib/postgres_fdw/deparse.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include "catalog/pg_proc.h"
4646
#include "catalog/pg_type.h"
4747
#include "commands/defrem.h"
48+
#include "commands/copy.h"
49+
#include "mb/pg_wchar.h"
4850
#include "nodes/makefuncs.h"
4951
#include "nodes/nodeFuncs.h"
5052
#include "nodes/plannodes.h"
@@ -3172,3 +3174,47 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
31723174
/* Shouldn't get here */
31733175
elog(ERROR, "unexpected expression in subquery output");
31743176
}
3177+
3178+
/*
3179+
* Deparse COPY FROM
3180+
*/
3181+
void
3182+
deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate)
3183+
{
3184+
appendStringInfoString(buf, "COPY ");
3185+
deparseRelation(buf, rel);
3186+
appendStringInfoString(buf, " FROM STDIN WITH (");
3187+
3188+
/* TODO: deparse column names */
3189+
if (cstate->binary)
3190+
{
3191+
ereport(ERROR,
3192+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3193+
errmsg("cannot copy to postgres_fdw table \"%s\" in binary format ",
3194+
RelationGetRelationName(rel))));
3195+
}
3196+
if (cstate->csv_mode)
3197+
{
3198+
appendStringInfoString(buf, " FORMAT csv ");
3199+
appendStringInfo(buf, ", QUOTE '%c'", *(cstate->quote));
3200+
appendStringInfo(buf, ", ESCAPE '%c'", *(cstate->escape));
3201+
/* TODO: force quote, force not null, force null */
3202+
}
3203+
else
3204+
{
3205+
appendStringInfoString(buf, " FORMAT text ");
3206+
}
3207+
3208+
appendStringInfo(buf, ", OIDS %d", cstate->oids);
3209+
appendStringInfo(buf, ", FREEZE %d", cstate->freeze);
3210+
appendStringInfo(buf, ", DELIMITER '%c'", *(cstate->delim));
3211+
appendStringInfo(buf, ", NULL %s", quote_literal_cstr(cstate->null_print));
3212+
/*
3213+
* cstate->line_buf is passed to us already converted to this server
3214+
* encoding.
3215+
*/
3216+
appendStringInfo(buf, ", ENCODING %s",
3217+
quote_literal_cstr(
3218+
pg_encoding_to_char(GetDatabaseEncoding())));
3219+
appendStringInfoChar(buf, ')');
3220+
}

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,14 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
352352
UpperRelationKind stage,
353353
RelOptInfo *input_rel,
354354
RelOptInfo *output_rel);
355+
static void postgresBeginForeignCopyFrom(EState *estate,
356+
ResultRelInfo *rinfo,
357+
CopyState cstate);
358+
static void postgresForeignNextCopyFrom(EState *estate,
359+
ResultRelInfo *rinfo,
360+
CopyState cstate);
361+
static void postgresEndForeignCopyFrom(EState *estate,
362+
ResultRelInfo *rinfo);
355363

356364
/*
357365
* Helper functions
@@ -476,6 +484,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
476484
/* Support functions for upper relation push-down */
477485
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
478486

487+
/* Functions for COPY FROM */
488+
routine->BeginForeignCopyFrom = postgresBeginForeignCopyFrom;
489+
routine->ForeignNextCopyFrom = postgresForeignNextCopyFrom;
490+
routine->EndForeignCopyFrom = postgresEndForeignCopyFrom;
491+
479492
PG_RETURN_POINTER(routine);
480493
}
481494

@@ -5208,3 +5221,80 @@ _PG_init(void)
52085221
&UseTsDtmTransactions, false, PGC_USERSET, 0, NULL,
52095222
NULL, NULL);
52105223
}
5224+
5225+
/* Begin COPY FROM to foreign table */
5226+
static void
5227+
postgresBeginForeignCopyFrom(EState *estate, ResultRelInfo *rinfo,
5228+
CopyState cstate)
5229+
{
5230+
Relation rel = rinfo->ri_RelationDesc;
5231+
RangeTblEntry *rte;
5232+
Oid userid;
5233+
ForeignTable *table;
5234+
UserMapping *user;
5235+
StringInfoData sql;
5236+
PGconn *conn;
5237+
PGresult *res;
5238+
5239+
/*
5240+
* Identify which user to do the remote access as. This should match what
5241+
* ExecCheckRTEPerms() does.
5242+
*/
5243+
rte = rt_fetch(rinfo->ri_RangeTableIndex, estate->es_range_table);
5244+
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
5245+
5246+
/* Get info about foreign table. */
5247+
table = GetForeignTable(RelationGetRelid(rel));
5248+
user = GetUserMapping(userid, table->serverid);
5249+
5250+
/* Open connection */
5251+
conn = GetConnection(user, false);
5252+
rinfo->ri_FdwState = conn;
5253+
5254+
/* deparse COPY stmt */
5255+
initStringInfo(&sql);
5256+
deparseCopyFromSql(&sql, rel, cstate);
5257+
5258+
res = PQexec(conn, sql.data);
5259+
if (PQresultStatus(res) != PGRES_COPY_IN)
5260+
{
5261+
pgfdw_report_error(ERROR, res, conn, true, sql.data);
5262+
}
5263+
PQclear(res);
5264+
}
5265+
5266+
/* COPY FROM next row to foreign table */
5267+
static void
5268+
postgresForeignNextCopyFrom(EState *estate, ResultRelInfo *rinfo,
5269+
CopyState cstate)
5270+
{
5271+
PGconn *conn = (PGconn *) rinfo->ri_FdwState;
5272+
5273+
Assert(!cstate->binary);
5274+
/* TODO: distinuish failure and nonblocking-send EAGAIN */
5275+
if (PQputline(conn, cstate->line_buf.data) || PQputnbytes(conn, "\n", 1))
5276+
{
5277+
pgfdw_report_error(ERROR, NULL, conn, false, cstate->line_buf.data);
5278+
}
5279+
}
5280+
5281+
/* Finish COPY FROM */
5282+
static void
5283+
postgresEndForeignCopyFrom(EState *estate, ResultRelInfo *rinfo)
5284+
{
5285+
PGconn *conn = (PGconn *) rinfo->ri_FdwState;
5286+
PGresult *res;
5287+
5288+
/* TODO: PQgetResult? */
5289+
if (PQendcopy(conn))
5290+
{
5291+
pgfdw_report_error(ERROR, NULL, conn, false, "end postgres_fdw copy from");
5292+
}
5293+
while ((res = PQgetResult(conn)) != NULL)
5294+
{
5295+
/* TODO: get error? */
5296+
PQclear(res);
5297+
}
5298+
5299+
ReleaseConnection(conn);
5300+
}

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#define POSTGRES_FDW_H
1515

1616
#include "foreign/foreign.h"
17+
#include "commands/copy.h"
1718
#include "lib/stringinfo.h"
1819
#include "nodes/relation.h"
1920
#include "utils/relcache.h"
@@ -177,6 +178,7 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
177178
List *remote_conds, List *pathkeys, bool is_subquery,
178179
List **retrieved_attrs, List **params_list);
179180
extern const char *get_jointype_name(JoinType jointype);
181+
extern void deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate);
180182

181183
/* in shippable.c */
182184
extern bool is_builtin(Oid objectId);

0 commit comments

Comments
 (0)