Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b8ba734

Browse files
committed
Fix handling of errors in libpq pipelines
The logic to keep the libpq command queue in sync with queries that have been processed had a bug when errors were returned for reasons other than problems in queries -- for example, when a connection is lost. We incorrectly consumed an element from the command queue every time, but this is wrong and can lead to the queue becoming empty ahead of time, leading to later malfunction: PQgetResult would return nothing, potentially causing the calling application to enter a busy loop. Fix by making the SYNC queue element a barrier that can only be consumed when a SYNC message is received. Backpatch to 14. Reported by: Иван Трофимов (Ivan Trofimov) <i.trofimow@yandex.ru> Discussion: https://postgr.es/m/17948-fcace7557e449957@postgresql.org
1 parent 63c5df1 commit b8ba734

File tree

3 files changed

+40
-25
lines changed

3 files changed

+40
-25
lines changed

src/interfaces/libpq/fe-exec.c

+36-17
Original file line numberDiff line numberDiff line change
@@ -2112,19 +2112,12 @@ PQgetResult(PGconn *conn)
21122112
break;
21132113

21142114
case PGASYNC_READY:
2115-
2116-
/*
2117-
* For any query type other than simple query protocol, we advance
2118-
* the command queue here. This is because for simple query
2119-
* protocol we can get the READY state multiple times before the
2120-
* command is actually complete, since the command string can
2121-
* contain many queries. In simple query protocol, the queue
2122-
* advance is done by fe-protocol3 when it receives ReadyForQuery.
2123-
*/
2124-
if (conn->cmd_queue_head &&
2125-
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
2126-
pqCommandQueueAdvance(conn);
21272115
res = pqPrepareAsyncResult(conn);
2116+
2117+
/* Advance the queue as appropriate */
2118+
pqCommandQueueAdvance(conn, false,
2119+
res->resultStatus == PGRES_PIPELINE_SYNC);
2120+
21282121
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
21292122
{
21302123
/*
@@ -3088,26 +3081,52 @@ PQexitPipelineMode(PGconn *conn)
30883081

30893082
/*
30903083
* pqCommandQueueAdvance
3091-
* Remove one query from the command queue, when we receive
3092-
* all results from the server that pertain to it.
3084+
* Remove one query from the command queue, if appropriate.
3085+
*
3086+
* If we have received all results corresponding to the head element
3087+
* in the command queue, remove it.
3088+
*
3089+
* In simple query protocol we must not advance the command queue until the
3090+
* ReadyForQuery message has been received. This is because in simple mode a
3091+
* command can have multiple queries, and we must process result for all of
3092+
* them before moving on to the next command.
3093+
*
3094+
* Another consideration is synchronization during error processing in
3095+
* extended query protocol: we refuse to advance the queue past a SYNC queue
3096+
* element, unless the result we've received is also a SYNC. In particular
3097+
* this protects us from advancing when an error is received at an
3098+
* inappropriate moment.
30933099
*/
30943100
void
3095-
pqCommandQueueAdvance(PGconn *conn)
3101+
pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, bool gotSync)
30963102
{
30973103
PGcmdQueueEntry *prevquery;
30983104

30993105
if (conn->cmd_queue_head == NULL)
31003106
return;
31013107

3102-
/* delink from queue */
3108+
/*
3109+
* If processing a query of simple query protocol, we only advance the
3110+
* queue when we receive the ReadyForQuery message for it.
3111+
*/
3112+
if (conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE && !isReadyForQuery)
3113+
return;
3114+
3115+
/*
3116+
* If we're waiting for a SYNC, don't advance the queue until we get one.
3117+
*/
3118+
if (conn->cmd_queue_head->queryclass == PGQUERY_SYNC && !gotSync)
3119+
return;
3120+
3121+
/* delink element from queue */
31033122
prevquery = conn->cmd_queue_head;
31043123
conn->cmd_queue_head = conn->cmd_queue_head->next;
31053124

31063125
/* If the queue is now empty, reset the tail too */
31073126
if (conn->cmd_queue_head == NULL)
31083127
conn->cmd_queue_tail = NULL;
31093128

3110-
/* and make it recyclable */
3129+
/* and make the queue element recyclable */
31113130
prevquery->next = NULL;
31123131
pqRecycleCmdQueueEntry(conn, prevquery);
31133132
}

src/interfaces/libpq/fe-protocol3.c

+2-7
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,8 @@ pqParseInput3(PGconn *conn)
240240
}
241241
else
242242
{
243-
/*
244-
* In simple query protocol, advance the command queue
245-
* (see PQgetResult).
246-
*/
247-
if (conn->cmd_queue_head &&
248-
conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
249-
pqCommandQueueAdvance(conn);
243+
/* Advance the command queue and set us idle */
244+
pqCommandQueueAdvance(conn, true, false);
250245
conn->asyncStatus = PGASYNC_IDLE;
251246
}
252247
break;

src/interfaces/libpq/libpq-int.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,8 @@ extern void pqSaveMessageField(PGresult *res, char code,
698698
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
699699
const char *value);
700700
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
701-
extern void pqCommandQueueAdvance(PGconn *conn);
701+
extern void pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery,
702+
bool gotSync);
702703
extern int PQsendQueryContinue(PGconn *conn, const char *query);
703704

704705
/* === in fe-protocol3.c === */

0 commit comments

Comments
 (0)