Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 41b9c84

Browse files
committed
Replace libpq's "row processor" API with a "single row" mode.
After taking awhile to digest the row-processor feature that was added to libpq in commit 92785da, we've concluded it is over-complicated and too hard to use. Leave the core infrastructure changes in place (that is, there's still a row processor function inside libpq), but remove the exposed API pieces, and instead provide a "single row" mode switch that causes PQgetResult to return one row at a time in separate PGresult objects. This approach incurs more overhead than proper use of a row processor callback would, since construction of a PGresult per row adds extra cycles. However, it is far easier to use and harder to break. The single-row mode still affords applications the primary benefit that the row processor API was meant to provide, namely not having to accumulate large result sets in memory before processing them. Preliminary testing suggests that we can probably buy back most of the extra cycles by micro-optimizing construction of the extra results, but that task will be left for another day. Marko Kreen
1 parent 7c0fecd commit 41b9c84

File tree

10 files changed

+404
-655
lines changed

10 files changed

+404
-655
lines changed

contrib/dblink/dblink.c

+99-65
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ typedef struct storeInfo
7070
AttInMetadata *attinmeta;
7171
MemoryContext tmpcontext;
7272
char **cstrs;
73+
/* temp storage for results to avoid leaks on exception */
74+
PGresult *last_res;
75+
PGresult *cur_res;
7376
} storeInfo;
7477

7578
/*
@@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
8386
const char *conname,
8487
const char *sql,
8588
bool fail);
86-
static int storeHandler(PGresult *res, const PGdataValue *columns,
87-
const char **errmsgp, void *param);
89+
static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
90+
static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
8891
static remoteConn *getConnectionByName(const char *name);
8992
static HTAB *createConnHash(void);
9093
static void createNewConnection(const char *name, remoteConn *rconn);
@@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
630633
/* async query send */
631634
retval = PQsendQuery(conn, sql);
632635
if (retval != 1)
633-
elog(NOTICE, "%s", PQerrorMessage(conn));
636+
elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
634637

635638
PG_RETURN_INT32(retval);
636639
}
@@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
927930
/*
928931
* Execute the given SQL command and store its results into a tuplestore
929932
* to be returned as the result of the current function.
933+
*
930934
* This is equivalent to PQexec followed by materializeResult, but we make
931-
* use of libpq's "row processor" API to reduce per-row overhead.
935+
* use of libpq's single-row mode to avoid accumulating the whole result
936+
* inside libpq before it gets transferred to the tuplestore.
932937
*/
933938
static void
934939
materializeQueryResult(FunctionCallInfo fcinfo,
@@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo,
944949
/* prepTuplestoreResult must have been called previously */
945950
Assert(rsinfo->returnMode == SFRM_Materialize);
946951

952+
/* initialize storeInfo to empty */
953+
memset(&sinfo, 0, sizeof(sinfo));
954+
sinfo.fcinfo = fcinfo;
955+
947956
PG_TRY();
948957
{
949-
/* initialize storeInfo to empty */
950-
memset(&sinfo, 0, sizeof(sinfo));
951-
sinfo.fcinfo = fcinfo;
952-
953-
/* We'll collect tuples using storeHandler */
954-
PQsetRowProcessor(conn, storeHandler, &sinfo);
955-
956-
res = PQexec(conn, sql);
957-
958-
/* We don't keep the custom row processor installed permanently */
959-
PQsetRowProcessor(conn, NULL, NULL);
958+
/* execute query, collecting any tuples into the tuplestore */
959+
res = storeQueryResult(&sinfo, conn, sql);
960960

961961
if (!res ||
962962
(PQresultStatus(res) != PGRES_COMMAND_OK &&
@@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
975975
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
976976
{
977977
/*
978-
* storeHandler didn't get called, so we need to convert the
979-
* command status string to a tuple manually
978+
* storeRow didn't get called, so we need to convert the command
979+
* status string to a tuple manually
980980
*/
981981
TupleDesc tupdesc;
982982
AttInMetadata *attinmeta;
@@ -1008,49 +1008,103 @@ materializeQueryResult(FunctionCallInfo fcinfo,
10081008
tuplestore_puttuple(tupstore, tuple);
10091009

10101010
PQclear(res);
1011+
res = NULL;
10111012
}
10121013
else
10131014
{
10141015
Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1015-
/* storeHandler should have created a tuplestore */
1016+
/* storeRow should have created a tuplestore */
10161017
Assert(rsinfo->setResult != NULL);
10171018

10181019
PQclear(res);
1020+
res = NULL;
10191021
}
1022+
PQclear(sinfo.last_res);
1023+
sinfo.last_res = NULL;
1024+
PQclear(sinfo.cur_res);
1025+
sinfo.cur_res = NULL;
10201026
}
10211027
PG_CATCH();
10221028
{
1023-
/* be sure to unset the custom row processor */
1024-
PQsetRowProcessor(conn, NULL, NULL);
10251029
/* be sure to release any libpq result we collected */
1026-
if (res)
1027-
PQclear(res);
1030+
PQclear(res);
1031+
PQclear(sinfo.last_res);
1032+
PQclear(sinfo.cur_res);
10281033
/* and clear out any pending data in libpq */
1029-
while ((res = PQskipResult(conn)) != NULL)
1034+
while ((res = PQgetResult(conn)) != NULL)
10301035
PQclear(res);
10311036
PG_RE_THROW();
10321037
}
10331038
PG_END_TRY();
10341039
}
10351040

10361041
/*
1037-
* Custom row processor for materializeQueryResult.
1038-
* Prototype of this function must match PQrowProcessor.
1042+
* Execute query, and send any result rows to sinfo->tuplestore.
10391043
*/
1040-
static int
1041-
storeHandler(PGresult *res, const PGdataValue *columns,
1042-
const char **errmsgp, void *param)
1044+
static PGresult *
1045+
storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
1046+
{
1047+
bool first = true;
1048+
PGresult *res;
1049+
1050+
if (!PQsendQuery(conn, sql))
1051+
elog(ERROR, "could not send query: %s", PQerrorMessage(conn));
1052+
1053+
if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1054+
elog(ERROR, "failed to set single-row mode for dblink query");
1055+
1056+
for (;;)
1057+
{
1058+
CHECK_FOR_INTERRUPTS();
1059+
1060+
sinfo->cur_res = PQgetResult(conn);
1061+
if (!sinfo->cur_res)
1062+
break;
1063+
1064+
if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1065+
{
1066+
/* got one row from possibly-bigger resultset */
1067+
storeRow(sinfo, sinfo->cur_res, first);
1068+
1069+
PQclear(sinfo->cur_res);
1070+
sinfo->cur_res = NULL;
1071+
first = false;
1072+
}
1073+
else
1074+
{
1075+
/* if empty resultset, fill tuplestore header */
1076+
if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1077+
storeRow(sinfo, sinfo->cur_res, first);
1078+
1079+
/* store completed result at last_res */
1080+
PQclear(sinfo->last_res);
1081+
sinfo->last_res = sinfo->cur_res;
1082+
sinfo->cur_res = NULL;
1083+
first = true;
1084+
}
1085+
}
1086+
1087+
/* return last_res */
1088+
res = sinfo->last_res;
1089+
sinfo->last_res = NULL;
1090+
return res;
1091+
}
1092+
1093+
/*
1094+
* Send single row to sinfo->tuplestore.
1095+
*
1096+
* If "first" is true, create the tuplestore using PGresult's metadata
1097+
* (in this case the PGresult might contain either zero or one row).
1098+
*/
1099+
static void
1100+
storeRow(storeInfo *sinfo, PGresult *res, bool first)
10431101
{
1044-
storeInfo *sinfo = (storeInfo *) param;
10451102
int nfields = PQnfields(res);
1046-
char **cstrs = sinfo->cstrs;
10471103
HeapTuple tuple;
1048-
char *pbuf;
1049-
int pbuflen;
10501104
int i;
10511105
MemoryContext oldcontext;
10521106

1053-
if (columns == NULL)
1107+
if (first)
10541108
{
10551109
/* Prepare for new result set */
10561110
ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
@@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns,
10981152
sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
10991153

11001154
/* Create a new, empty tuplestore */
1101-
oldcontext = MemoryContextSwitchTo(
1102-
rsinfo->econtext->ecxt_per_query_memory);
1155+
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
11031156
sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
11041157
rsinfo->setResult = sinfo->tuplestore;
11051158
rsinfo->setDesc = tupdesc;
11061159
MemoryContextSwitchTo(oldcontext);
11071160

1161+
/* Done if empty resultset */
1162+
if (PQntuples(res) == 0)
1163+
return;
1164+
11081165
/*
11091166
* Set up sufficiently-wide string pointers array; this won't change
11101167
* in size so it's easy to preallocate.
@@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11211178
ALLOCSET_DEFAULT_MINSIZE,
11221179
ALLOCSET_DEFAULT_INITSIZE,
11231180
ALLOCSET_DEFAULT_MAXSIZE);
1124-
1125-
return 1;
11261181
}
11271182

1128-
CHECK_FOR_INTERRUPTS();
1183+
/* Should have a single-row result if we get here */
1184+
Assert(PQntuples(res) == 1);
11291185

11301186
/*
11311187
* Do the following work in a temp context that we reset after each tuple.
@@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11351191
oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
11361192

11371193
/*
1138-
* The strings passed to us are not null-terminated, but the datatype
1139-
* input functions we're about to call require null termination. Copy the
1140-
* strings and add null termination. As a micro-optimization, allocate
1141-
* all the strings with one palloc.
1194+
* Fill cstrs with null-terminated strings of column values.
11421195
*/
1143-
pbuflen = nfields; /* count the null terminators themselves */
11441196
for (i = 0; i < nfields; i++)
11451197
{
1146-
int len = columns[i].len;
1147-
1148-
if (len > 0)
1149-
pbuflen += len;
1150-
}
1151-
pbuf = (char *) palloc(pbuflen);
1152-
1153-
for (i = 0; i < nfields; i++)
1154-
{
1155-
int len = columns[i].len;
1156-
1157-
if (len < 0)
1158-
cstrs[i] = NULL;
1198+
if (PQgetisnull(res, 0, i))
1199+
sinfo->cstrs[i] = NULL;
11591200
else
1160-
{
1161-
cstrs[i] = pbuf;
1162-
memcpy(pbuf, columns[i].value, len);
1163-
pbuf += len;
1164-
*pbuf++ = '\0';
1165-
}
1201+
sinfo->cstrs[i] = PQgetvalue(res, 0, i);
11661202
}
11671203

11681204
/* Convert row to a tuple, and add it to the tuplestore */
1169-
tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
1205+
tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
11701206

11711207
tuplestore_puttuple(sinfo->tuplestore, tuple);
11721208

11731209
/* Clean up */
11741210
MemoryContextSwitchTo(oldcontext);
11751211
MemoryContextReset(sinfo->tmpcontext);
1176-
1177-
return 1;
11781212
}
11791213

11801214
/*

0 commit comments

Comments
 (0)