Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0d6d2d8

Browse files
committed
COPY FROM to postgres_fdw implementation for pg_shardman
Cherry-picked from af234df. This is the commit message #2: Tracking on which foreign servers we already started COPY. If foreign server holds several partitions, COPY FROM to local root partition will try to perform several copies at the same time through one connection, obviously without much success. Now we track that and start/end COPY only once. We also allow to pass destination relation name which may be different from foreign table -- so we can copy into foreign root partition in shardman. This is pretty narrow solution. However, keeping several connections to the same foreign server requires significant changes, especially in 2pc handling, so staying here for now. This is the commit message #3: Allow COPY FROM to partitioned table even if some FDW parts can't do that. This behaviour was broken in patches allowing COPY FROM to FDW tables. This is the commit message #4: COPY FROM deparse more complete, PG_SHARDMAN macro. Now column names, FORCE NULL and FORCE NOT NULL are deparsed too. PG_SHARDMAN macro ensures this PG contains patches for Postgres. This is the commit message #5: Disable COPY FROM to foreign parts, because no generic impl exists. This is the commit message #6: Fix COPY FROM deparse, forgotten comma for FORCE_NULL etc.
1 parent 21f20a4 commit 0d6d2d8

File tree

10 files changed

+499
-167
lines changed

10 files changed

+499
-167
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#include "access/htup_details.h"
1919
#include "catalog/pg_user_mapping.h"
2020
#include "access/xact.h"
21+
#include "access/transam.h"
2122
#include "access/xlog.h" /* GetSystemIdentifier() */
23+
#include "libpq-int.h"
2224
#include "mb/pg_wchar.h"
2325
#include "miscadmin.h"
2426
#include "pgstat.h"
@@ -63,7 +65,8 @@ struct ConnCacheEntry
6365
bool invalidated; /* true if reconnect is pending */
6466
uint32 server_hashvalue; /* hash value of foreign server OID */
6567
uint32 mapping_hashvalue; /* hash value of user mapping OID */
66-
};
68+
bool copy_from_started; /* COPY FROM in progress on this conn */
69+
} ;
6770

6871
/*
6972
* Connection cache (initialized on first use)
@@ -129,7 +132,8 @@ static bool pgfdw_get_cleanup_result(ConnCacheEntry *entry, TimestampTz endtime,
129132
* (not even on error), we need this flag to cue manual cleanup.
130133
*/
131134
ConnCacheEntry *
132-
GetConnection(UserMapping *user, bool will_prep_stmt)
135+
GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
136+
bool **copy_from_started)
133137
{
134138
bool found;
135139
ConnCacheEntry *entry;
@@ -224,6 +228,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
224228
entry->have_error = false;
225229
entry->changing_xact_state = false;
226230
entry->invalidated = false;
231+
entry->copy_from_started = false;
227232
entry->server_hashvalue =
228233
GetSysCacheHashValue1(FOREIGNSERVEROID,
229234
ObjectIdGetDatum(server->serverid));
@@ -246,6 +251,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
246251
/* Remember if caller will prepare statements */
247252
entry->have_prep_stmt |= will_prep_stmt;
248253

254+
if (copy_from_started)
255+
*copy_from_started = &(entry->copy_from_started);
256+
249257
return entry;
250258
}
251259

@@ -255,6 +263,12 @@ ConnectionEntryGetConn(ConnCacheEntry *entry)
255263
return entry->conn;
256264
}
257265

266+
ConnCacheEntry *
267+
GetConnection(UserMapping *user, bool will_prep_stmt)
268+
{
269+
return GetConnectionCopyFrom(user, will_prep_stmt, NULL);
270+
}
271+
258272
/*
259273
* Connect to remote server using specified server and user mapping properties.
260274
*/
@@ -1292,21 +1306,40 @@ pgfdw_cancel_query(ConnCacheEntry *entry)
12921306
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
12931307

12941308
/*
1295-
* Issue cancel request. Unfortunately, there's no good way to limit the
1296-
* amount of time that we might block inside PQgetCancel().
1309+
* If COPY IN in progress, send CopyFail. Otherwise send cancel request.
1310+
* TODO: make it less hackish, without libpq-int.h inclusion and handling
1311+
* EAGAIN.
12971312
*/
1298-
if ((cancel = PQgetCancel(conn)))
1313+
if (conn->asyncStatus == PGASYNC_COPY_IN)
12991314
{
1300-
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1315+
if (PQputCopyEnd(conn, "postgres_fdw: transaction abort on source node") != 1)
13011316
{
13021317
ereport(WARNING,
13031318
(errcode(ERRCODE_CONNECTION_FAILURE),
1304-
errmsg("could not send cancel request: %s",
1319+
errmsg("could not send abort copy request: %s",
13051320
errbuf)));
1306-
PQfreeCancel(cancel);
13071321
return false;
13081322
}
1309-
PQfreeCancel(cancel);
1323+
}
1324+
else
1325+
{
1326+
/*
1327+
* Issue cancel request. Unfortunately, there's no good way to limit the
1328+
* amount of time that we might block inside PQgetCancel().
1329+
*/
1330+
if ((cancel = PQgetCancel(conn)))
1331+
{
1332+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1333+
{
1334+
ereport(WARNING,
1335+
(errcode(ERRCODE_CONNECTION_FAILURE),
1336+
errmsg("could not send cancel request: %s",
1337+
errbuf)));
1338+
PQfreeCancel(cancel);
1339+
return false;
1340+
}
1341+
PQfreeCancel(cancel);
1342+
}
13101343
}
13111344

13121345
/* Get and discard the result of the query. */

contrib/postgres_fdw/deparse.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include "catalog/pg_proc.h"
4646
#include "catalog/pg_type.h"
4747
#include "commands/defrem.h"
48+
#include "commands/copy.h"
49+
#include "mb/pg_wchar.h"
4850
#include "nodes/makefuncs.h"
4951
#include "nodes/nodeFuncs.h"
5052
#include "nodes/plannodes.h"
@@ -3323,3 +3325,107 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
33233325
/* Shouldn't get here */
33243326
elog(ERROR, "unexpected expression in subquery output");
33253327
}
3328+
3329+
/*
3330+
* Deparse COPY FROM into given buf.
3331+
*/
3332+
void
3333+
deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate,
3334+
const char *dest_relname)
3335+
{
3336+
ListCell *cur;
3337+
3338+
appendStringInfoString(buf, "COPY ");
3339+
if (dest_relname == NULL)
3340+
deparseRelation(buf, rel);
3341+
else
3342+
appendStringInfoString(buf, dest_relname);
3343+
3344+
if (cstate->binary)
3345+
{
3346+
ereport(ERROR,
3347+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3348+
errmsg("cannot copy to postgres_fdw table \"%s\" in binary format ",
3349+
RelationGetRelationName(rel))));
3350+
}
3351+
3352+
/* deparse column names */
3353+
if (cstate->attnumlist != NIL)
3354+
{
3355+
bool first = true;
3356+
3357+
appendStringInfoString(buf, " (");
3358+
foreach(cur, cstate->attnumlist)
3359+
{
3360+
int attnum = lfirst_int(cur);
3361+
char *attname;
3362+
3363+
if (!first)
3364+
appendStringInfoString(buf, ", ");
3365+
first = false;
3366+
3367+
attname = get_attname(rel->rd_id, attnum, false);
3368+
appendStringInfoString(buf, quote_identifier(attname));
3369+
}
3370+
appendStringInfoString(buf, " )");
3371+
}
3372+
3373+
appendStringInfoString(buf, " FROM STDIN WITH (");
3374+
if (cstate->csv_mode)
3375+
{
3376+
appendStringInfoString(buf, " FORMAT csv ");
3377+
appendStringInfo(buf, ", QUOTE '%c'", *(cstate->quote));
3378+
appendStringInfo(buf, ", ESCAPE '%c'", *(cstate->escape));
3379+
if (cstate->force_notnull != NIL)
3380+
{
3381+
bool first = true;
3382+
3383+
appendStringInfoString(buf, ", FORCE_NOT_NULL (");
3384+
foreach(cur, cstate->force_notnull)
3385+
{
3386+
char *attname = strVal(lfirst(cur));
3387+
3388+
if (!first)
3389+
appendStringInfoString(buf, ", ");
3390+
first = false;
3391+
3392+
appendStringInfoString(buf, quote_identifier(attname));
3393+
}
3394+
appendStringInfoString(buf, " )");
3395+
}
3396+
if (cstate->force_null != NIL)
3397+
{
3398+
bool first = true;
3399+
3400+
appendStringInfoString(buf, ", FORCE_NULL (");
3401+
foreach(cur, cstate->force_null)
3402+
{
3403+
char *attname = strVal(lfirst(cur));
3404+
3405+
if (!first)
3406+
appendStringInfoString(buf, ", ");
3407+
first = false;
3408+
3409+
appendStringInfoString(buf, quote_identifier(attname));
3410+
}
3411+
appendStringInfoString(buf, " )");
3412+
}
3413+
}
3414+
else
3415+
{
3416+
appendStringInfoString(buf, " FORMAT text ");
3417+
}
3418+
3419+
appendStringInfo(buf, ", OIDS %d", cstate->oids);
3420+
appendStringInfo(buf, ", FREEZE %d", cstate->freeze);
3421+
appendStringInfo(buf, ", DELIMITER '%c'", *(cstate->delim));
3422+
appendStringInfo(buf, ", NULL %s", quote_literal_cstr(cstate->null_print));
3423+
/*
3424+
* cstate->line_buf is passed to us already converted to this server
3425+
* encoding.
3426+
*/
3427+
appendStringInfo(buf, ", ENCODING %s",
3428+
quote_literal_cstr(
3429+
pg_encoding_to_char(GetDatabaseEncoding())));
3430+
appendStringInfoChar(buf, ')');
3431+
}

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,15 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
362362
RelOptInfo *input_rel,
363363
RelOptInfo *output_rel,
364364
void *extra);
365+
static void postgresBeginForeignCopyFrom(EState *estate,
366+
ResultRelInfo *rinfo,
367+
CopyState cstate,
368+
ResultRelInfo *parent_rinfo);
369+
static void postgresForeignNextCopyFrom(EState *estate,
370+
ResultRelInfo *rinfo,
371+
CopyState cstate);
372+
static void postgresEndForeignCopyFrom(EState *estate,
373+
ResultRelInfo *rinfo);
365374

366375
/*
367376
* Helper functions
@@ -509,6 +518,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
509518
/* Support functions for upper relation push-down */
510519
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
511520

521+
/* Functions for COPY FROM */
522+
routine->BeginForeignCopyFrom = postgresBeginForeignCopyFrom;
523+
routine->ForeignNextCopyFrom = postgresForeignNextCopyFrom;
524+
routine->EndForeignCopyFrom = postgresEndForeignCopyFrom;
525+
512526
PG_RETURN_POINTER(routine);
513527
}
514528

@@ -5821,6 +5835,114 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
58215835
return NULL;
58225836
}
58235837

5838+
/*
5839+
* Begin COPY FROM to foreign table. Currently we do it in a bit perverted
5840+
* way: we redirect COPY FROM to parent table on foreign server, assuming it
5841+
* exists in public schema (as in shardman), and let it direct tuples to
5842+
* proper partitions. Otherwise we would have to modify logic of managing
5843+
* connections and keep many connections open to one server from one backend.
5844+
* This probably should not be used outside pg_shardman.
5845+
*/
5846+
static void
5847+
postgresBeginForeignCopyFrom(EState *estate, ResultRelInfo *rinfo,
5848+
CopyState cstate, ResultRelInfo *parent_rinfo)
5849+
{
5850+
Relation rel = rinfo->ri_RelationDesc;
5851+
RangeTblEntry *rte;
5852+
Oid userid;
5853+
ForeignTable *table;
5854+
UserMapping *user;
5855+
StringInfoData sql;
5856+
PGconn *conn;
5857+
PGresult *res;
5858+
bool *copy_from_started;
5859+
char *dest_relname;
5860+
5861+
/*
5862+
* Identify which user to do the remote access as. This should match what
5863+
* ExecCheckRTEPerms() does.
5864+
*/
5865+
rte = rt_fetch(rinfo->ri_RangeTableIndex, estate->es_range_table);
5866+
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
5867+
5868+
/* Get info about foreign table. */
5869+
table = GetForeignTable(RelationGetRelid(rel));
5870+
user = GetUserMapping(userid, table->serverid);
5871+
rinfo->ri_FdwState = user;
5872+
5873+
/* Get (open, if not yet) connection */
5874+
conn = ConnectionEntryGetConn(
5875+
GetConnectionCopyFrom(user, false, &copy_from_started));
5876+
/* We already did COPY FROM to this server */
5877+
if (*copy_from_started)
5878+
return;
5879+
5880+
/* deparse COPY stmt */
5881+
dest_relname = psprintf(
5882+
"public.%s", quote_identifier(RelationGetRelationName(
5883+
parent_rinfo == NULL ?
5884+
rinfo->ri_RelationDesc :
5885+
parent_rinfo->ri_RelationDesc)));
5886+
initStringInfo(&sql);
5887+
deparseCopyFromSql(&sql, rel, cstate, dest_relname);
5888+
5889+
res = PQexec(conn, sql.data);
5890+
if (PQresultStatus(res) != PGRES_COPY_IN)
5891+
{
5892+
pgfdw_report_error(ERROR, res, conn, true, sql.data);
5893+
}
5894+
PQclear(res);
5895+
5896+
*copy_from_started = true;
5897+
}
5898+
5899+
/* COPY FROM next row to foreign table */
5900+
static void
5901+
postgresForeignNextCopyFrom(EState *estate, ResultRelInfo *rinfo,
5902+
CopyState cstate)
5903+
{
5904+
bool *copy_from_started;
5905+
UserMapping *user = (UserMapping *) rinfo->ri_FdwState;
5906+
PGconn *conn = ConnectionEntryGetConn(
5907+
GetConnectionCopyFrom(user, false, &copy_from_started));
5908+
5909+
Assert(copy_from_started);
5910+
Assert(!cstate->binary);
5911+
/* TODO: distinuish failure and nonblocking-send EAGAIN */
5912+
if (PQputline(conn, cstate->line_buf.data) || PQputnbytes(conn, "\n", 1))
5913+
{
5914+
pgfdw_report_error(ERROR, NULL, conn, false, cstate->line_buf.data);
5915+
}
5916+
}
5917+
5918+
/* Finish COPY FROM */
5919+
static void
5920+
postgresEndForeignCopyFrom(EState *estate, ResultRelInfo *rinfo)
5921+
{
5922+
bool *copy_from_started;
5923+
UserMapping *user = (UserMapping *) rinfo->ri_FdwState;
5924+
ConnCacheEntry *conn_entry = GetConnectionCopyFrom(user, false,
5925+
&copy_from_started);
5926+
PGconn *conn = ConnectionEntryGetConn(conn_entry);
5927+
PGresult *res;
5928+
5929+
if (*copy_from_started)
5930+
{
5931+
/* TODO: PQgetResult? */
5932+
if (PQendcopy(conn))
5933+
{
5934+
pgfdw_report_error(ERROR, NULL, conn, false, "end postgres_fdw copy from");
5935+
}
5936+
while ((res = PQgetResult(conn)) != NULL)
5937+
{
5938+
/* TODO: get error? */
5939+
PQclear(res);
5940+
}
5941+
*copy_from_started = false;
5942+
ReleaseConnection(conn_entry);
5943+
}
5944+
}
5945+
58245946
void
58255947
_PG_init(void)
58265948
{

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#define POSTGRES_FDW_H
1515

1616
#include "foreign/foreign.h"
17+
#include "commands/copy.h"
1718
#include "lib/stringinfo.h"
1819
#include "nodes/relation.h"
1920
#include "utils/relcache.h"
@@ -122,6 +123,8 @@ extern void reset_transmission_modes(int nestlevel);
122123

123124
/* in connection.c */
124125
extern ConnCacheEntry *GetConnection(UserMapping *user, bool will_prep_stmt);
126+
extern ConnCacheEntry *GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
127+
bool **copy_from_started);
125128
extern PGconn *ConnectionEntryGetConn(ConnCacheEntry *entry);
126129
extern void ReleaseConnection(ConnCacheEntry *entry);
127130
extern unsigned int GetCursorNumber(ConnCacheEntry *entry);
@@ -186,6 +189,8 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
186189
List *remote_conds, List *pathkeys, bool is_subquery,
187190
List **retrieved_attrs, List **params_list);
188191
extern const char *get_jointype_name(JoinType jointype);
192+
extern void deparseCopyFromSql(StringInfo buf, Relation rel, CopyState cstate,
193+
const char *dest_relname);
189194

190195
/* in shippable.c */
191196
extern bool is_builtin(Oid objectId);

0 commit comments

Comments
 (0)