Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4794c2d

Browse files
committed
libpq: Add PQsendPipelineSync()
This new function is equivalent to PQpipelineSync(), except that it does not flush anything to the server except if the size threshold of the output buffer is reached; the user must subsequently call PQflush() instead. Its purpose is to reduce the system call overhead of pipeline mode, by giving to applications more control over the timing of the flushes when manipulating commands in pipeline mode. Author: Anton Kirilov Reviewed-by: Jelte Fennema-Nio, Robert Haas, Álvaro Herrera, Denis Laxalde, Michael Paquier Discussion: https://postgr.es/m/CACV6eE5arHFZEA717=iKEa_OewpVFfWJOmsOdGrqqsr8CJVfWQ@mail.gmail.com
1 parent 83eb244 commit 4794c2d

File tree

6 files changed

+138
-17
lines changed

6 files changed

+138
-17
lines changed

doc/src/sgml/libpq.sgml

+38-7
Original file line numberDiff line numberDiff line change
@@ -3547,8 +3547,9 @@ ExecStatusType PQresultStatus(const PGresult *res);
35473547
<listitem>
35483548
<para>
35493549
The <structname>PGresult</structname> represents a
3550-
synchronization point in pipeline mode, requested by
3551-
<xref linkend="libpq-PQpipelineSync"/>.
3550+
synchronization point in pipeline mode, requested by either
3551+
<xref linkend="libpq-PQpipelineSync"/> or
3552+
<xref linkend="libpq-PQsendPipelineSync"/>.
35523553
This status occurs only when pipeline mode has been selected.
35533554
</para>
35543555
</listitem>
@@ -5122,7 +5123,8 @@ int PQsendClosePortal(PGconn *conn, const char *portalName);
51225123
<xref linkend="libpq-PQsendDescribePrepared"/>,
51235124
<xref linkend="libpq-PQsendDescribePortal"/>,
51245125
<xref linkend="libpq-PQsendClosePrepared"/>,
5125-
<xref linkend="libpq-PQsendClosePortal"/>, or
5126+
<xref linkend="libpq-PQsendClosePortal"/>,
5127+
<xref linkend="libpq-PQsendPipelineSync"/>, or
51265128
<xref linkend="libpq-PQpipelineSync"/>
51275129
call, and returns it.
51285130
A null pointer is returned when the command is complete and there
@@ -5507,8 +5509,9 @@ int PQflush(PGconn *conn);
55075509
client sends them. The server will begin executing the commands in the
55085510
pipeline immediately, not waiting for the end of the pipeline.
55095511
Note that results are buffered on the server side; the server flushes
5510-
that buffer when a synchronization point is established with
5511-
<function>PQpipelineSync</function>, or when
5512+
that buffer when a synchronization point is established with either
5513+
<function>PQpipelineSync</function> or
5514+
<function>PQsendPipelineSync</function>, or when
55125515
<function>PQsendFlushRequest</function> is called.
55135516
If any statement encounters an error, the server aborts the current
55145517
transaction and does not execute any subsequent command in the queue
@@ -5565,7 +5568,8 @@ int PQflush(PGconn *conn);
55655568
<type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
55665569
and <literal>PGRES_PIPELINE_ABORTED</literal>.
55675570
<literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
5568-
<function>PQpipelineSync</function> at the corresponding point
5571+
<function>PQpipelineSync</function> or
5572+
<function>PQsendPipelineSync</function> at the corresponding point
55695573
in the pipeline.
55705574
<literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
55715575
query result for the first error and all subsequent results
@@ -5603,7 +5607,8 @@ int PQflush(PGconn *conn);
56035607
<function>PQresultStatus</function> will report a
56045608
<literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
56055609
operation in an aborted pipeline. The result for
5606-
<function>PQpipelineSync</function> is reported as
5610+
<function>PQpipelineSync</function> or
5611+
<function>PQsendPipelineSync</function> is reported as
56075612
<literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
56085613
and resumption of normal result processing.
56095614
</para>
@@ -5810,6 +5815,32 @@ int PQpipelineSync(PGconn *conn);
58105815
</listitem>
58115816
</varlistentry>
58125817

5818+
<varlistentry id="libpq-PQsendPipelineSync">
5819+
<term><function>PQsendPipelineSync</function><indexterm><primary>PQsendPipelineSync</primary></indexterm></term>
5820+
5821+
<listitem>
5822+
<para>
5823+
Marks a synchronization point in a pipeline by sending a
5824+
<link linkend="protocol-flow-ext-query">sync message</link>
5825+
without flushing the send buffer. This serves as
5826+
the delimiter of an implicit transaction and an error recovery
5827+
point; see <xref linkend="libpq-pipeline-errors"/>.
5828+
5829+
<synopsis>
5830+
int PQsendPipelineSync(PGconn *conn);
5831+
</synopsis>
5832+
</para>
5833+
<para>
5834+
Returns 1 for success. Returns 0 if the connection is not in
5835+
pipeline mode or sending a
5836+
<link linkend="protocol-flow-ext-query">sync message</link>
5837+
failed.
5838+
Note that the message is not itself flushed to the server automatically;
5839+
use <function>PQflush</function> if necessary.
5840+
</para>
5841+
</listitem>
5842+
</varlistentry>
5843+
58135844
<varlistentry id="libpq-PQsendFlushRequest">
58145845
<term><function>PQsendFlushRequest</function><indexterm><primary>PQsendFlushRequest</primary></indexterm></term>
58155846

src/interfaces/libpq/exports.txt

+1
Original file line numberDiff line numberDiff line change
@@ -192,3 +192,4 @@ PQclosePortal 189
192192
PQsendClosePrepared 190
193193
PQsendClosePortal 191
194194
PQchangePassword 192
195+
PQsendPipelineSync 193

src/interfaces/libpq/fe-exec.c

+44-10
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ static int PQsendTypedCommand(PGconn *conn, char command, char type,
8181
const char *target);
8282
static int check_field_number(const PGresult *res, int field_num);
8383
static void pqPipelineProcessQueue(PGconn *conn);
84+
static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush);
8485
static int pqPipelineFlush(PGconn *conn);
8586

8687

@@ -3224,25 +3225,48 @@ pqPipelineProcessQueue(PGconn *conn)
32243225
/*
32253226
* PQpipelineSync
32263227
* Send a Sync message as part of a pipeline, and flush to server
3228+
*/
3229+
int
3230+
PQpipelineSync(PGconn *conn)
3231+
{
3232+
return pqPipelineSyncInternal(conn, true);
3233+
}
3234+
3235+
/*
3236+
* PQsendPipelineSync
3237+
* Send a Sync message as part of a pipeline, without flushing to server
3238+
*/
3239+
int
3240+
PQsendPipelineSync(PGconn *conn)
3241+
{
3242+
return pqPipelineSyncInternal(conn, false);
3243+
}
3244+
3245+
/*
3246+
* Workhorse function for PQpipelineSync and PQsendPipelineSync.
32273247
*
32283248
* It's legal to start submitting more commands in the pipeline immediately,
32293249
* without waiting for the results of the current pipeline. There's no need to
32303250
* end pipeline mode and start it again.
32313251
*
3232-
* If a command in a pipeline fails, every subsequent command up to and including
3233-
* the result to the Sync message sent by PQpipelineSync gets set to
3234-
* PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
3235-
* error, a PGresult with PGRES_PIPELINE_SYNC is produced.
3252+
* If a command in a pipeline fails, every subsequent command up to and
3253+
* including the result to the Sync message sent by pqPipelineSyncInternal
3254+
* gets set to PGRES_PIPELINE_ABORTED state. If the whole pipeline is
3255+
* processed without error, a PGresult with PGRES_PIPELINE_SYNC is produced.
32363256
*
3237-
* Queries can already have been sent before PQpipelineSync is called, but
3238-
* PQpipelineSync needs to be called before retrieving command results.
3257+
* Queries can already have been sent before pqPipelineSyncInternal is called,
3258+
* but pqPipelineSyncInternal needs to be called before retrieving command
3259+
* results.
32393260
*
32403261
* The connection will remain in pipeline mode and unavailable for new
32413262
* synchronous command execution functions until all results from the pipeline
32423263
* are processed by the client.
3264+
*
3265+
* immediate_flush controls if the flush happens immediately after sending the
3266+
* Sync message or not.
32433267
*/
3244-
int
3245-
PQpipelineSync(PGconn *conn)
3268+
static int
3269+
pqPipelineSyncInternal(PGconn *conn, bool immediate_flush)
32463270
{
32473271
PGcmdQueueEntry *entry;
32483272

@@ -3288,9 +3312,19 @@ PQpipelineSync(PGconn *conn)
32883312
/*
32893313
* Give the data a push. In nonblock mode, don't complain if we're unable
32903314
* to send it all; PQgetResult() will do any additional flushing needed.
3315+
* If immediate_flush is disabled, the data is pushed if we are past the
3316+
* size threshold.
32913317
*/
3292-
if (PQflush(conn) < 0)
3293-
goto sendFailed;
3318+
if (immediate_flush)
3319+
{
3320+
if (pqFlush(conn) < 0)
3321+
goto sendFailed;
3322+
}
3323+
else
3324+
{
3325+
if (pqPipelineFlush(conn) < 0)
3326+
goto sendFailed;
3327+
}
32943328

32953329
/* OK, it's launched! */
32963330
pqAppendCmdQueueEntry(conn, entry);

src/interfaces/libpq/libpq-fe.h

+1
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ extern int PQenterPipelineMode(PGconn *conn);
474474
extern int PQexitPipelineMode(PGconn *conn);
475475
extern int PQpipelineSync(PGconn *conn);
476476
extern int PQsendFlushRequest(PGconn *conn);
477+
extern int PQsendPipelineSync(PGconn *conn);
477478

478479
/* LISTEN/NOTIFY support */
479480
extern PGnotify *PQnotifies(PGconn *conn);

src/test/modules/libpq_pipeline/libpq_pipeline.c

+43
Original file line numberDiff line numberDiff line change
@@ -162,21 +162,35 @@ test_multi_pipelines(PGconn *conn)
162162
if (PQenterPipelineMode(conn) != 1)
163163
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
164164

165+
/* first pipeline */
165166
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
166167
dummy_params, NULL, NULL, 0) != 1)
167168
pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
168169

169170
if (PQpipelineSync(conn) != 1)
170171
pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
171172

173+
/* second pipeline */
172174
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
173175
dummy_params, NULL, NULL, 0) != 1)
174176
pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
175177

178+
/* Skip flushing once. */
179+
if (PQsendPipelineSync(conn) != 1)
180+
pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
181+
182+
/* third pipeline */
183+
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
184+
dummy_params, NULL, NULL, 0) != 1)
185+
pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
186+
176187
if (PQpipelineSync(conn) != 1)
177188
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
178189

179190
/* OK, start processing the results */
191+
192+
/* first pipeline */
193+
180194
res = PQgetResult(conn);
181195
if (res == NULL)
182196
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
@@ -214,6 +228,35 @@ test_multi_pipelines(PGconn *conn)
214228
if (PQresultStatus(res) != PGRES_TUPLES_OK)
215229
pg_fatal("Unexpected result code %s from second pipeline item",
216230
PQresStatus(PQresultStatus(res)));
231+
PQclear(res);
232+
res = NULL;
233+
234+
if (PQgetResult(conn) != NULL)
235+
pg_fatal("PQgetResult returned something extra after first result");
236+
237+
if (PQexitPipelineMode(conn) != 0)
238+
pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
239+
240+
res = PQgetResult(conn);
241+
if (res == NULL)
242+
pg_fatal("PQgetResult returned null when sync result expected: %s",
243+
PQerrorMessage(conn));
244+
245+
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
246+
pg_fatal("Unexpected result code %s instead of sync result, error: %s",
247+
PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
248+
PQclear(res);
249+
250+
/* third pipeline */
251+
252+
res = PQgetResult(conn);
253+
if (res == NULL)
254+
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
255+
PQerrorMessage(conn));
256+
257+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
258+
pg_fatal("Unexpected result code %s from third pipeline item",
259+
PQresStatus(PQresultStatus(res)));
217260

218261
res = PQgetResult(conn);
219262
if (res != NULL)

src/test/modules/libpq_pipeline/traces/multi_pipelines.trace

+11
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@ F 19 Bind "" "" 0 1 1 '1' 1 0
88
F 6 Describe P ""
99
F 9 Execute "" 0
1010
F 4 Sync
11+
F 21 Parse "" "SELECT $1" 1 NNNN
12+
F 19 Bind "" "" 0 1 1 '1' 1 0
13+
F 6 Describe P ""
14+
F 9 Execute "" 0
15+
F 4 Sync
16+
B 4 ParseComplete
17+
B 4 BindComplete
18+
B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
19+
B 11 DataRow 1 1 '1'
20+
B 13 CommandComplete "SELECT 1"
21+
B 5 ReadyForQuery I
1122
B 4 ParseComplete
1223
B 4 BindComplete
1324
B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0

0 commit comments

Comments
 (0)