Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit af234df

Browse files
committed
COPY FROM to postgres_fdw implementation for pg_shardman.
This is the commit message #2: Tracking on which foreign servers we already started COPY. If foreign server holds several partitions, COPY FROM to local root partition will try to perform several copies at the same time through one connection, obviously without much success. Now we track that and start/end COPY only once. We also allow to pass destination relation name which may be different from foreing table -- so we can copy into foreign root partition in shardman. This is pretty narrow solution. However, keeping several connections to the same foreign server requires significant changes, especially in 2pc handling, so staying here for now. This is the commit message #3: Allow COPY FROM to par8d table even if some FDW parts can't do that. This behaviour was broken in patches allowing COPY FROM to FDW tables. This is the commit message #4: COPY FROM deparse more complete, PG_SHARDMAN macro. Now column names, FORCE NULL and FORCE NOT NULL are deparsed too. PG_SHARDMAN macro ensures this PG contains patches for Postgres. This is the commit message #5: Disable COPY FROM to foreign parts, because no generic impl exists. This is the commit message #6: Fix COPY FROM deparse, forgotten comma for FORCE_NULL etc.
1 parent b79402a commit af234df

File tree

10 files changed

+530
-190
lines changed

10 files changed

+530
-190
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include "access/xact.h"
2020
#include "access/xtm.h"
2121
#include "access/transam.h"
22+
#include "access/xlog.h"
23+
#include "libpq-int.h"
2224
#include "mb/pg_wchar.h"
2325
#include "miscadmin.h"
2426
#include "pgstat.h"
@@ -60,6 +62,7 @@ typedef struct ConnCacheEntry
6062
bool invalidated; /* true if reconnect is pending */
6163
uint32 server_hashvalue; /* hash value of foreign server OID */
6264
uint32 mapping_hashvalue; /* hash value of user mapping OID */
65+
bool copy_from_started; /* COPY FROM in progress on this conn */
6366
} ConnCacheEntry;
6467

6568
/*
@@ -112,7 +115,8 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
112115
* (not even on error), we need this flag to cue manual cleanup.
113116
*/
114117
PGconn *
115-
GetConnection(UserMapping *user, bool will_prep_stmt)
118+
GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
119+
bool **copy_from_started)
116120
{
117121
bool found;
118122
ConnCacheEntry *entry;
@@ -198,6 +202,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
198202
entry->have_error = false;
199203
entry->changing_xact_state = false;
200204
entry->invalidated = false;
205+
entry->copy_from_started = false;
201206
entry->server_hashvalue =
202207
GetSysCacheHashValue1(FOREIGNSERVEROID,
203208
ObjectIdGetDatum(server->serverid));
@@ -220,9 +225,17 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
220225
/* Remember if caller will prepare statements */
221226
entry->have_prep_stmt |= will_prep_stmt;
222227

228+
if (copy_from_started)
229+
*copy_from_started = &(entry->copy_from_started);
223230
return entry->conn;
224231
}
225232

233+
PGconn *
234+
GetConnection(UserMapping *user, bool will_prep_stmt)
235+
{
236+
return GetConnectionCopyFrom(user, will_prep_stmt, NULL);
237+
}
238+
226239
/*
227240
* Connect to remote server using specified server and user mapping properties.
228241
*/
@@ -1231,21 +1244,40 @@ pgfdw_cancel_query(PGconn *conn)
12311244
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
12321245

12331246
/*
1234-
* Issue cancel request. Unfortunately, there's no good way to limit the
1235-
* amount of time that we might block inside PQgetCancel().
1247+
* If COPY IN in progress, send CopyFail. Otherwise send cancel request.
1248+
* TODO: make it less hackish, without libpq-int.h inclusion and handling
1249+
* EAGAIN.
12361250
*/
1237-
if ((cancel = PQgetCancel(conn)))
1251+
if (conn->asyncStatus == PGASYNC_COPY_IN)
12381252
{
1239-
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1253+
if (PQputCopyEnd(conn, "postgres_fdw: transaction abort on source node") != 1)
12401254
{
12411255
ereport(WARNING,
12421256
(errcode(ERRCODE_CONNECTION_FAILURE),
1243-
errmsg("could not send cancel request: %s",
1257+
errmsg("could not send abort copy request: %s",
12441258
errbuf)));
1245-
PQfreeCancel(cancel);
12461259
return false;
12471260
}
1248-
PQfreeCancel(cancel);
1261+
}
1262+
else
1263+
{
1264+
/*
1265+
* Issue cancel request. Unfortunately, there's no good way to limit the
1266+
* amount of time that we might block inside PQgetCancel().
1267+
*/
1268+
if ((cancel = PQgetCancel(conn)))
1269+
{
1270+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1271+
{
1272+
ereport(WARNING,
1273+
(errcode(ERRCODE_CONNECTION_FAILURE),
1274+
errmsg("could not send cancel request: %s",
1275+
errbuf)));
1276+
PQfreeCancel(cancel);
1277+
return false;
1278+
}
1279+
PQfreeCancel(cancel);
1280+
}
12491281
}
12501282

12511283
/* Get and discard the result of the query. */

contrib/postgres_fdw/deparse.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include "catalog/pg_proc.h"
4646
#include "catalog/pg_type.h"
4747
#include "commands/defrem.h"
48+
#include "commands/copy.h"
49+
#include "mb/pg_wchar.h"
4850
#include "nodes/makefuncs.h"
4951
#include "nodes/nodeFuncs.h"
5052
#include "nodes/plannodes.h"
@@ -3172,3 +3174,107 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
31723174
/* Shouldn't get here */
31733175
elog(ERROR, "unexpected expression in subquery output");
31743176
}
3177+
3178+
/*
3179+
* Deparse COPY FROM into given buf.
3180+
*/
3181+
void
3182+
deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate,
3183+
const char *dest_relname)
3184+
{
3185+
ListCell *cur;
3186+
3187+
appendStringInfoString(buf, "COPY ");
3188+
if (dest_relname == NULL)
3189+
deparseRelation(buf, rel);
3190+
else
3191+
appendStringInfoString(buf, dest_relname);
3192+
3193+
if (cstate->binary)
3194+
{
3195+
ereport(ERROR,
3196+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3197+
errmsg("cannot copy to postgres_fdw table \"%s\" in binary format ",
3198+
RelationGetRelationName(rel))));
3199+
}
3200+
3201+
/* deparse column names */
3202+
if (cstate->attnumlist != NIL)
3203+
{
3204+
bool first = true;
3205+
3206+
appendStringInfoString(buf, " (");
3207+
foreach(cur, cstate->attnumlist)
3208+
{
3209+
int attnum = lfirst_int(cur);
3210+
char *attname;
3211+
3212+
if (!first)
3213+
appendStringInfoString(buf, ", ");
3214+
first = false;
3215+
3216+
attname = get_relid_attribute_name(rel->rd_id, attnum);
3217+
appendStringInfoString(buf, quote_identifier(attname));
3218+
}
3219+
appendStringInfoString(buf, " )");
3220+
}
3221+
3222+
appendStringInfoString(buf, " FROM STDIN WITH (");
3223+
if (cstate->csv_mode)
3224+
{
3225+
appendStringInfoString(buf, " FORMAT csv ");
3226+
appendStringInfo(buf, ", QUOTE '%c'", *(cstate->quote));
3227+
appendStringInfo(buf, ", ESCAPE '%c'", *(cstate->escape));
3228+
if (cstate->force_notnull != NIL)
3229+
{
3230+
bool first = true;
3231+
3232+
appendStringInfoString(buf, ", FORCE_NOT_NULL (");
3233+
foreach(cur, cstate->force_notnull)
3234+
{
3235+
char *attname = strVal(lfirst(cur));
3236+
3237+
if (!first)
3238+
appendStringInfoString(buf, ", ");
3239+
first = false;
3240+
3241+
appendStringInfoString(buf, quote_identifier(attname));
3242+
}
3243+
appendStringInfoString(buf, " )");
3244+
}
3245+
if (cstate->force_null != NIL)
3246+
{
3247+
bool first = true;
3248+
3249+
appendStringInfoString(buf, ", FORCE_NULL (");
3250+
foreach(cur, cstate->force_null)
3251+
{
3252+
char *attname = strVal(lfirst(cur));
3253+
3254+
if (!first)
3255+
appendStringInfoString(buf, ", ");
3256+
first = false;
3257+
3258+
appendStringInfoString(buf, quote_identifier(attname));
3259+
}
3260+
appendStringInfoString(buf, " )");
3261+
}
3262+
}
3263+
else
3264+
{
3265+
appendStringInfoString(buf, " FORMAT text ");
3266+
}
3267+
3268+
appendStringInfo(buf, ", OIDS %d", cstate->oids);
3269+
appendStringInfo(buf, ", FREEZE %d", cstate->freeze);
3270+
appendStringInfo(buf, ", DELIMITER '%c'", *(cstate->delim));
3271+
appendStringInfo(buf, ", NULL %s", quote_literal_cstr(cstate->null_print));
3272+
/*
3273+
* cstate->line_buf is passed to us already converted to this server
3274+
* encoding.
3275+
*/
3276+
appendStringInfo(buf, ", ENCODING %s",
3277+
quote_literal_cstr(
3278+
pg_encoding_to_char(GetDatabaseEncoding())));
3279+
appendStringInfoChar(buf, ')');
3280+
}

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,15 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
352352
UpperRelationKind stage,
353353
RelOptInfo *input_rel,
354354
RelOptInfo *output_rel);
355+
static void postgresBeginForeignCopyFrom(EState *estate,
356+
ResultRelInfo *rinfo,
357+
CopyState cstate,
358+
ResultRelInfo *parent_rinfo);
359+
static void postgresForeignNextCopyFrom(EState *estate,
360+
ResultRelInfo *rinfo,
361+
CopyState cstate);
362+
static void postgresEndForeignCopyFrom(EState *estate,
363+
ResultRelInfo *rinfo);
355364

356365
/*
357366
* Helper functions
@@ -476,6 +485,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
476485
/* Support functions for upper relation push-down */
477486
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
478487

488+
/* Functions for COPY FROM */
489+
routine->BeginForeignCopyFrom = postgresBeginForeignCopyFrom;
490+
routine->ForeignNextCopyFrom = postgresForeignNextCopyFrom;
491+
routine->EndForeignCopyFrom = postgresEndForeignCopyFrom;
492+
479493
PG_RETURN_POINTER(routine);
480494
}
481495

@@ -5200,6 +5214,110 @@ postgres_fdw_exec(PG_FUNCTION_ARGS)
52005214
PG_RETURN_VOID();
52015215
}
52025216

5217+
/*
5218+
* Begin COPY FROM to foreign table. Currently we do it in a bit perverted
5219+
* way: we redirect COPY FROM to parent table on foreign server, assuming it
5220+
* exists in public schema (as in shardman), and let it direct tuples to
5221+
* proper partitions. Otherwise we would have to modify logic of managing
5222+
* connections and keep many connections open to one server from one backend.
5223+
* This probably should not be used outside pg_shardman.
5224+
*/
5225+
static void
5226+
postgresBeginForeignCopyFrom(EState *estate, ResultRelInfo *rinfo,
5227+
CopyState cstate, ResultRelInfo *parent_rinfo)
5228+
{
5229+
Relation rel = rinfo->ri_RelationDesc;
5230+
RangeTblEntry *rte;
5231+
Oid userid;
5232+
ForeignTable *table;
5233+
UserMapping *user;
5234+
StringInfoData sql;
5235+
PGconn *conn;
5236+
PGresult *res;
5237+
bool *copy_from_started;
5238+
char *dest_relname;
5239+
5240+
/*
5241+
* Identify which user to do the remote access as. This should match what
5242+
* ExecCheckRTEPerms() does.
5243+
*/
5244+
rte = rt_fetch(rinfo->ri_RangeTableIndex, estate->es_range_table);
5245+
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
5246+
5247+
/* Get info about foreign table. */
5248+
table = GetForeignTable(RelationGetRelid(rel));
5249+
user = GetUserMapping(userid, table->serverid);
5250+
rinfo->ri_FdwState = user;
5251+
5252+
/* Get (open, if not yet) connection */
5253+
conn = GetConnectionCopyFrom(user, false, &copy_from_started);
5254+
/* We already did COPY FROM to this server */
5255+
if (*copy_from_started)
5256+
return;
5257+
5258+
/* deparse COPY stmt */
5259+
dest_relname = psprintf(
5260+
"public.%s", quote_identifier(RelationGetRelationName(
5261+
parent_rinfo == NULL ?
5262+
rinfo->ri_RelationDesc :
5263+
parent_rinfo->ri_RelationDesc)));
5264+
initStringInfo(&sql);
5265+
deparseCopyFromSql(&sql, rel, cstate, dest_relname);
5266+
5267+
res = PQexec(conn, sql.data);
5268+
if (PQresultStatus(res) != PGRES_COPY_IN)
5269+
{
5270+
pgfdw_report_error(ERROR, res, conn, true, sql.data);
5271+
}
5272+
PQclear(res);
5273+
5274+
*copy_from_started = true;
5275+
}
5276+
5277+
/* COPY FROM next row to foreign table */
5278+
static void
5279+
postgresForeignNextCopyFrom(EState *estate, ResultRelInfo *rinfo,
5280+
CopyState cstate)
5281+
{
5282+
bool *copy_from_started;
5283+
UserMapping *user = (UserMapping *) rinfo->ri_FdwState;
5284+
PGconn *conn = GetConnectionCopyFrom(user, false, &copy_from_started);
5285+
5286+
Assert(copy_from_started);
5287+
Assert(!cstate->binary);
5288+
/* TODO: distinuish failure and nonblocking-send EAGAIN */
5289+
if (PQputline(conn, cstate->line_buf.data) || PQputnbytes(conn, "\n", 1))
5290+
{
5291+
pgfdw_report_error(ERROR, NULL, conn, false, cstate->line_buf.data);
5292+
}
5293+
}
5294+
5295+
/* Finish COPY FROM */
5296+
static void
5297+
postgresEndForeignCopyFrom(EState *estate, ResultRelInfo *rinfo)
5298+
{
5299+
bool *copy_from_started;
5300+
UserMapping *user = (UserMapping *) rinfo->ri_FdwState;
5301+
PGconn *conn = GetConnectionCopyFrom(user, false, &copy_from_started);
5302+
PGresult *res;
5303+
5304+
if (*copy_from_started)
5305+
{
5306+
/* TODO: PQgetResult? */
5307+
if (PQendcopy(conn))
5308+
{
5309+
pgfdw_report_error(ERROR, NULL, conn, false, "end postgres_fdw copy from");
5310+
}
5311+
while ((res = PQgetResult(conn)) != NULL)
5312+
{
5313+
/* TODO: get error? */
5314+
PQclear(res);
5315+
}
5316+
*copy_from_started = false;
5317+
ReleaseConnection(conn);
5318+
}
5319+
}
5320+
52035321
void
52045322
_PG_init(void)
52055323
{

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#define POSTGRES_FDW_H
1515

1616
#include "foreign/foreign.h"
17+
#include "commands/copy.h"
1718
#include "lib/stringinfo.h"
1819
#include "nodes/relation.h"
1920
#include "utils/relcache.h"
@@ -116,6 +117,8 @@ extern void reset_transmission_modes(int nestlevel);
116117

117118
/* in connection.c */
118119
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
120+
extern PGconn *GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
121+
bool **copy_from_started);
119122
extern void ReleaseConnection(PGconn *conn);
120123
extern unsigned int GetCursorNumber(PGconn *conn);
121124
extern unsigned int GetPrepStmtNumber(PGconn *conn);
@@ -177,6 +180,8 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
177180
List *remote_conds, List *pathkeys, bool is_subquery,
178181
List **retrieved_attrs, List **params_list);
179182
extern const char *get_jointype_name(JoinType jointype);
183+
extern void deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate,
184+
const char *dest_relname);
180185

181186
/* in shippable.c */
182187
extern bool is_builtin(Oid objectId);

0 commit comments

Comments
 (0)