Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 93cf923

Browse files
alvherrehoriguti
andcommitted
libpq: Improve idle state handling in pipeline mode
We were going into IDLE state too soon when executing queries via PQsendQuery in pipeline mode, causing several scenarios to misbehave in different ways -- most notably, as reported by Daniele Varrazzo, that a warning message is produced by libpq: message type 0x33 arrived from server while idle But it is also possible, if queries are sent and results consumed not in lockstep, for the expected mediating NULL result values from PQgetResult to be lost (a problem which has not been reported, but which is more serious). Fix this by introducing two new concepts: one is a command queue element PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server response to the Close message that is sent by PQsendQuery. Because the application is not expecting any PGresult from this, the mechanism to consume it is a bit hackish. The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE state for libpq's state machine to differentiate "really idle" from merely "the idle state that occurs in between reading results from the server for elements in the pipeline". This makes libpq not go fully IDLE when the libpq command queue contains entries; in normal cases, we only go IDLE once at the end of the pipeline, when the server response to the final SYNC message is received. (However, there are corner cases it doesn't fix, such as terminating the query sequence by PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is what requires PGQUERY_CLOSE bit above.) This last bit helps make the libpq state machine clearer; in particular we can get rid of an ugly hack in pqParseInput3 to avoid considering IDLE as such when the command queue contains entries. A new test mode is added to libpq_pipeline.c to tickle some related problematic cases. Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com> Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
1 parent e5085fc commit 93cf923

File tree

6 files changed

+424
-35
lines changed

6 files changed

+424
-35
lines changed

src/interfaces/libpq/fe-exec.c

Lines changed: 96 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,7 +1380,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
13801380
* itself consume commands from the queue; if we're in any other
13811381
* state, we don't have to do anything.
13821382
*/
1383-
if (conn->asyncStatus == PGASYNC_IDLE)
1383+
if (conn->asyncStatus == PGASYNC_IDLE ||
1384+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
13841385
pqPipelineProcessQueue(conn);
13851386
break;
13861387
}
@@ -1436,6 +1437,7 @@ static int
14361437
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
14371438
{
14381439
PGcmdQueueEntry *entry = NULL;
1440+
PGcmdQueueEntry *entry2 = NULL;
14391441

14401442
if (!PQsendQueryStart(conn, newQuery))
14411443
return 0;
@@ -1451,6 +1453,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
14511453
entry = pqAllocCmdQueueEntry(conn);
14521454
if (entry == NULL)
14531455
return 0; /* error msg already set */
1456+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1457+
{
1458+
entry2 = pqAllocCmdQueueEntry(conn);
1459+
if (entry2 == NULL)
1460+
goto sendFailed;
1461+
}
14541462

14551463
/* Send the query message(s) */
14561464
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1520,6 +1528,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
15201528

15211529
/* OK, it's launched! */
15221530
pqAppendCmdQueueEntry(conn, entry);
1531+
1532+
/*
1533+
* When pipeline mode is in use, we need a second entry in the command
1534+
* queue to represent Close Portal message. This allows us later to wait
1535+
* for the CloseComplete message to be received before getting in IDLE
1536+
* state.
1537+
*/
1538+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1539+
{
1540+
entry2->queryclass = PGQUERY_CLOSE;
1541+
entry2->query = NULL;
1542+
pqAppendCmdQueueEntry(conn, entry2);
1543+
}
1544+
15231545
return 1;
15241546

15251547
sendFailed:
@@ -1767,11 +1789,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
17671789
switch (conn->asyncStatus)
17681790
{
17691791
case PGASYNC_IDLE:
1792+
case PGASYNC_PIPELINE_IDLE:
17701793
case PGASYNC_READY:
17711794
case PGASYNC_READY_MORE:
17721795
case PGASYNC_BUSY:
17731796
/* ok to queue */
17741797
break;
1798+
17751799
case PGASYNC_COPY_IN:
17761800
case PGASYNC_COPY_OUT:
17771801
case PGASYNC_COPY_BOTH:
@@ -2144,16 +2168,21 @@ PQgetResult(PGconn *conn)
21442168
{
21452169
case PGASYNC_IDLE:
21462170
res = NULL; /* query is complete */
2147-
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
2148-
{
2149-
/*
2150-
* We're about to return the NULL that terminates the round of
2151-
* results from the current query; prepare to send the results
2152-
* of the next query when we're called next.
2153-
*/
2154-
pqPipelineProcessQueue(conn);
2155-
}
21562171
break;
2172+
case PGASYNC_PIPELINE_IDLE:
2173+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
2174+
2175+
/*
2176+
* We're about to return the NULL that terminates the round of
2177+
* results from the current query; prepare to send the results
2178+
* of the next query, if any, when we're called next. If there's
2179+
* no next element in the command queue, this gets us in IDLE
2180+
* state.
2181+
*/
2182+
pqPipelineProcessQueue(conn);
2183+
res = NULL; /* query is complete */
2184+
break;
2185+
21572186
case PGASYNC_READY:
21582187

21592188
/*
@@ -2174,7 +2203,7 @@ PQgetResult(PGconn *conn)
21742203
* We're about to send the results of the current query. Set
21752204
* us idle now, and ...
21762205
*/
2177-
conn->asyncStatus = PGASYNC_IDLE;
2206+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
21782207

21792208
/*
21802209
* ... in cases when we're sending a pipeline-sync result,
@@ -2220,6 +2249,22 @@ PQgetResult(PGconn *conn)
22202249
break;
22212250
}
22222251

2252+
/* If the next command we expect is CLOSE, read and consume it */
2253+
if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
2254+
conn->cmd_queue_head &&
2255+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
2256+
{
2257+
if (res && res->resultStatus != PGRES_FATAL_ERROR)
2258+
{
2259+
conn->asyncStatus = PGASYNC_BUSY;
2260+
parseInput(conn);
2261+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
2262+
}
2263+
else
2264+
/* we won't ever see the Close */
2265+
pqCommandQueueAdvance(conn);
2266+
}
2267+
22232268
/* Time to fire PGEVT_RESULTCREATE events, if there are any */
22242269
if (res && res->nEvents > 0)
22252270
(void) PQfireResultCreateEvents(conn, res);
@@ -3014,7 +3059,10 @@ PQexitPipelineMode(PGconn *conn)
30143059
if (!conn)
30153060
return 0;
30163061

3017-
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
3062+
if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
3063+
(conn->asyncStatus == PGASYNC_IDLE ||
3064+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
3065+
conn->cmd_queue_head == NULL)
30183066
return 1;
30193067

30203068
switch (conn->asyncStatus)
@@ -3031,9 +3079,16 @@ PQexitPipelineMode(PGconn *conn)
30313079
libpq_gettext("cannot exit pipeline mode while busy\n"));
30323080
return 0;
30333081

3034-
default:
3082+
case PGASYNC_IDLE:
3083+
case PGASYNC_PIPELINE_IDLE:
30353084
/* OK */
30363085
break;
3086+
3087+
case PGASYNC_COPY_IN:
3088+
case PGASYNC_COPY_OUT:
3089+
case PGASYNC_COPY_BOTH:
3090+
appendPQExpBufferStr(&conn->errorMessage,
3091+
libpq_gettext("cannot exit pipeline mode while in COPY\n"));
30373092
}
30383093

30393094
/* still work to process */
@@ -3070,6 +3125,10 @@ pqCommandQueueAdvance(PGconn *conn)
30703125
prevquery = conn->cmd_queue_head;
30713126
conn->cmd_queue_head = conn->cmd_queue_head->next;
30723127

3128+
/* If the queue is now empty, reset the tail too */
3129+
if (conn->cmd_queue_head == NULL)
3130+
conn->cmd_queue_tail = NULL;
3131+
30733132
/* and make it recyclable */
30743133
prevquery->next = NULL;
30753134
pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3092,15 +3151,35 @@ pqPipelineProcessQueue(PGconn *conn)
30923151
case PGASYNC_BUSY:
30933152
/* client still has to process current query or results */
30943153
return;
3154+
30953155
case PGASYNC_IDLE:
3156+
/*
3157+
* If we're in IDLE mode and there's some command in the queue,
3158+
* get us into PIPELINE_IDLE mode and process normally. Otherwise
3159+
* there's nothing for us to do.
3160+
*/
3161+
if (conn->cmd_queue_head != NULL)
3162+
{
3163+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
3164+
break;
3165+
}
3166+
return;
3167+
3168+
case PGASYNC_PIPELINE_IDLE:
3169+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
30963170
/* next query please */
30973171
break;
30983172
}
30993173

3100-
/* Nothing to do if not in pipeline mode, or queue is empty */
3101-
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
3102-
conn->cmd_queue_head == NULL)
3174+
/*
3175+
* If there are no further commands to process in the queue, get us in
3176+
* "real idle" mode now.
3177+
*/
3178+
if (conn->cmd_queue_head == NULL)
3179+
{
3180+
conn->asyncStatus = PGASYNC_IDLE;
31033181
return;
3182+
}
31043183

31053184
/*
31063185
* Reset the error state. This and the next couple of steps correspond to
@@ -3193,6 +3272,7 @@ PQpipelineSync(PGconn *conn)
31933272
case PGASYNC_READY_MORE:
31943273
case PGASYNC_BUSY:
31953274
case PGASYNC_IDLE:
3275+
case PGASYNC_PIPELINE_IDLE:
31963276
/* OK to send sync */
31973277
break;
31983278
}

src/interfaces/libpq/fe-protocol3.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
158158
if (conn->asyncStatus != PGASYNC_IDLE)
159159
return;
160160

161-
/*
162-
* We're also notionally not-IDLE when in pipeline mode the state
163-
* says "idle" (so we have completed receiving the results of one
164-
* query from the server and dispatched them to the application)
165-
* but another query is queued; yield back control to caller so
166-
* that they can initiate processing of the next query in the
167-
* queue.
168-
*/
169-
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
170-
conn->cmd_queue_head != NULL)
171-
return;
172-
173161
/*
174162
* Unexpected message in IDLE state; need to recover somehow.
175163
* ERROR messages are handled using the notice processor;
@@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
296284
}
297285
break;
298286
case '2': /* Bind Complete */
287+
/* Nothing to do for this message type */
288+
break;
299289
case '3': /* Close Complete */
300-
/* Nothing to do for these message types */
290+
/*
291+
* If we get CloseComplete when waiting for it, consume
292+
* the queue element and keep going. A result is not
293+
* expected from this message; it is just there so that
294+
* we know to wait for it when PQsendQuery is used in
295+
* pipeline mode, before going in IDLE state. Failing to
296+
* do this makes us receive CloseComplete when IDLE, which
297+
* creates problems.
298+
*/
299+
if (conn->cmd_queue_head &&
300+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
301+
{
302+
pqCommandQueueAdvance(conn);
303+
}
304+
301305
break;
302306
case 'S': /* parameter status */
303307
if (getParameterStatus(conn))

src/interfaces/libpq/libpq-int.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ typedef enum
225225
* query */
226226
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
227227
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
228-
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
228+
PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
229+
PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
229230
} PGAsyncStatusType;
230231

231232
/* Target server type (decoded value of target_session_attrs) */
@@ -311,7 +312,8 @@ typedef enum
311312
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
312313
PGQUERY_PREPARE, /* Parse only (PQprepare) */
313314
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
314-
PGQUERY_SYNC /* Sync (at end of a pipeline) */
315+
PGQUERY_SYNC, /* Sync (at end of a pipeline) */
316+
PGQUERY_CLOSE
315317
} PGQueryClass;
316318

317319
/*

0 commit comments

Comments
 (0)