Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 41625ab

Browse files
committed
psql: Add support for pipelines
With \bind, \parse, \bind_named and \close, it is possible to issue queries from psql using the extended protocol. However, it was not possible to send these queries using libpq's pipeline mode. This feature has two advantages: - Testing. Pipeline tests were only possible with pgbench, using TAP tests. It now becomes possible to have more SQL tests that are able to stress the backend with pipelines and extended queries. More tests will be added in a follow-up commit that were discussed on some other threads. Some external projects in the community had to implement their own facility to work around this limitation. - Emulation of custom workloads, with more control over the actions taken by a client with libpq APIs. It is possible to emulate more workload patterns to bottleneck the backend with the extended query protocol. This patch adds six new meta-commands to be able to control pipelines: * \startpipeline starts a new pipeline. All extended queries are queued until the end of the pipeline are reached or a sync request is sent and processed. * \endpipeline ends an existing pipeline. All queued commands are sent to the server and all responses are processed by psql. * \syncpipeline queues a synchronisation request, without flushing the commands to the server, equivalent of PQsendPipelineSync(). * \flush, equivalent of PQflush(). * \flushrequest, equivalent of PQsendFlushRequest() * \getresults reads the server's results for the queries in a pipeline. Unsent data is automatically pushed when \getresults is called. It is possible to control the number of results read in a single meta-command execution with an optional parameter, 0 means that all the results should be read. Author: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com> Reviewed-by: Jelte Fennema-Nio <postgres@jeltef.nl> Reviewed-by: Kirill Reshke <reshkekirill@gmail.com> Discussion: https://postgr.es/m/CAO6_XqroE7JuMEm1sWz55rp9fAYX2JwmcP_3m_v51vnOFdsLiQ@mail.gmail.com
1 parent 40af897 commit 41625ab

File tree

11 files changed

+1497
-12
lines changed

11 files changed

+1497
-12
lines changed

doc/src/sgml/ref/psql-ref.sgml

+67
Original file line numberDiff line numberDiff line change
@@ -3674,6 +3674,73 @@ testdb=&gt; <userinput>\setenv LESS -imx4F</userinput>
36743674
</listitem>
36753675
</varlistentry>
36763676

3677+
<varlistentry id="app-psql-meta-command-pipeline">
3678+
<term><literal>\startpipeline</literal></term>
3679+
<term><literal>\syncpipeline</literal></term>
3680+
<term><literal>\endpipeline</literal></term>
3681+
<term><literal>\flushrequest</literal></term>
3682+
<term><literal>\flush</literal></term>
3683+
<term><literal>\getresults [ <replaceable class="parameter">number_results</replaceable> ]</literal></term>
3684+
3685+
<listitem>
3686+
<para>
3687+
This group of commands implements pipelining of SQL statements.
3688+
A pipeline must begin with a <command>\startpipeline</command>
3689+
and end with an <command>\endpipeline</command>. In between there
3690+
may be any number of <command>\syncpipeline</command> commands,
3691+
which sends a <link linkend="protocol-flow-ext-query">sync message</link>
3692+
without ending the ongoing pipeline and flushing the send buffer.
3693+
In pipeline mode, statements are sent to the server without waiting
3694+
for the results of previous statements.
3695+
See <xref linkend="libpq-pipeline-mode"/> for more details.
3696+
</para>
3697+
3698+
<para>
3699+
Pipeline mode requires the use of the extended query protocol. All
3700+
queries need to be sent using the meta-commands
3701+
<literal>\bind</literal>, <literal>\bind_named</literal>,
3702+
<literal>\close</literal> or <literal>\parse</literal>. While a
3703+
pipeline is ongoing, <literal>\g</literal> will append the current
3704+
query buffer to the pipeline. Other meta-commands like
3705+
<literal>\gx</literal> or <literal>\gdesc</literal> are not allowed
3706+
in pipeline mode.
3707+
</para>
3708+
3709+
<para>
3710+
<command>\flushrequest</command> appends a flush command to the
3711+
pipeline, allowing to read results with
3712+
<command>\getresults</command> without issuing a sync or ending the
3713+
pipeline. <command>\getresults</command> will automatically push
3714+
unsent data to the server. <command>\flush</command> can be used to
3715+
manually push unsent data.
3716+
</para>
3717+
3718+
<para>
3719+
<command>\getresults</command> accepts an optional
3720+
<replaceable class="parameter">number_results</replaceable> parameter.
3721+
If provided, only the first
3722+
<replaceable class="parameter">number_results</replaceable> pending
3723+
results will be read. If not provided or <literal>0</literal>, all
3724+
pending results are read. The commands <literal>\bind</literal>,
3725+
<literal>\bind_named</literal>, <literal>\close</literal>,
3726+
<literal>\parse</literal> and <literal>\syncpipeline</literal>
3727+
generate one result to get.
3728+
</para>
3729+
3730+
<para>
3731+
Example:
3732+
<programlisting>
3733+
\startpipeline
3734+
SELECT 1 \bind \g
3735+
\flushrequest
3736+
\getresults
3737+
\endpipeline
3738+
</programlisting>
3739+
</para>
3740+
3741+
</listitem>
3742+
</varlistentry>
3743+
36773744

36783745
<varlistentry id="app-psql-meta-command-t-lc">
36793746
<term><literal>\t</literal></term>

src/bin/psql/command.c

+170
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,12 @@ static backslashResult exec_command_else(PsqlScanState scan_state, ConditionalSt
9090
PQExpBuffer query_buf);
9191
static backslashResult exec_command_endif(PsqlScanState scan_state, ConditionalStack cstack,
9292
PQExpBuffer query_buf);
93+
static backslashResult exec_command_endpipeline(PsqlScanState scan_state, bool active_branch);
9394
static backslashResult exec_command_encoding(PsqlScanState scan_state, bool active_branch);
9495
static backslashResult exec_command_errverbose(PsqlScanState scan_state, bool active_branch);
9596
static backslashResult exec_command_f(PsqlScanState scan_state, bool active_branch);
97+
static backslashResult exec_command_flush(PsqlScanState scan_state, bool active_branch);
98+
static backslashResult exec_command_flushrequest(PsqlScanState scan_state, bool active_branch);
9699
static backslashResult exec_command_g(PsqlScanState scan_state, bool active_branch,
97100
const char *cmd);
98101
static backslashResult process_command_g_options(char *first_option,
@@ -103,6 +106,7 @@ static backslashResult exec_command_gdesc(PsqlScanState scan_state, bool active_
103106
static backslashResult exec_command_getenv(PsqlScanState scan_state, bool active_branch,
104107
const char *cmd);
105108
static backslashResult exec_command_gexec(PsqlScanState scan_state, bool active_branch);
109+
static backslashResult exec_command_getresults(PsqlScanState scan_state, bool active_branch);
106110
static backslashResult exec_command_gset(PsqlScanState scan_state, bool active_branch);
107111
static backslashResult exec_command_help(PsqlScanState scan_state, bool active_branch);
108112
static backslashResult exec_command_html(PsqlScanState scan_state, bool active_branch);
@@ -132,6 +136,8 @@ static backslashResult exec_command_setenv(PsqlScanState scan_state, bool active
132136
const char *cmd);
133137
static backslashResult exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
134138
const char *cmd, bool is_func);
139+
static backslashResult exec_command_startpipeline(PsqlScanState scan_state, bool active_branch);
140+
static backslashResult exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch);
135141
static backslashResult exec_command_t(PsqlScanState scan_state, bool active_branch);
136142
static backslashResult exec_command_T(PsqlScanState scan_state, bool active_branch);
137143
static backslashResult exec_command_timing(PsqlScanState scan_state, bool active_branch);
@@ -351,18 +357,26 @@ exec_command(const char *cmd,
351357
status = exec_command_else(scan_state, cstack, query_buf);
352358
else if (strcmp(cmd, "endif") == 0)
353359
status = exec_command_endif(scan_state, cstack, query_buf);
360+
else if (strcmp(cmd, "endpipeline") == 0)
361+
status = exec_command_endpipeline(scan_state, active_branch);
354362
else if (strcmp(cmd, "encoding") == 0)
355363
status = exec_command_encoding(scan_state, active_branch);
356364
else if (strcmp(cmd, "errverbose") == 0)
357365
status = exec_command_errverbose(scan_state, active_branch);
358366
else if (strcmp(cmd, "f") == 0)
359367
status = exec_command_f(scan_state, active_branch);
368+
else if (strcmp(cmd, "flush") == 0)
369+
status = exec_command_flush(scan_state, active_branch);
370+
else if (strcmp(cmd, "flushrequest") == 0)
371+
status = exec_command_flushrequest(scan_state, active_branch);
360372
else if (strcmp(cmd, "g") == 0 || strcmp(cmd, "gx") == 0)
361373
status = exec_command_g(scan_state, active_branch, cmd);
362374
else if (strcmp(cmd, "gdesc") == 0)
363375
status = exec_command_gdesc(scan_state, active_branch);
364376
else if (strcmp(cmd, "getenv") == 0)
365377
status = exec_command_getenv(scan_state, active_branch, cmd);
378+
else if (strcmp(cmd, "getresults") == 0)
379+
status = exec_command_getresults(scan_state, active_branch);
366380
else if (strcmp(cmd, "gexec") == 0)
367381
status = exec_command_gexec(scan_state, active_branch);
368382
else if (strcmp(cmd, "gset") == 0)
@@ -411,6 +425,10 @@ exec_command(const char *cmd,
411425
status = exec_command_sf_sv(scan_state, active_branch, cmd, true);
412426
else if (strcmp(cmd, "sv") == 0 || strcmp(cmd, "sv+") == 0)
413427
status = exec_command_sf_sv(scan_state, active_branch, cmd, false);
428+
else if (strcmp(cmd, "startpipeline") == 0)
429+
status = exec_command_startpipeline(scan_state, active_branch);
430+
else if (strcmp(cmd, "syncpipeline") == 0)
431+
status = exec_command_syncpipeline(scan_state, active_branch);
414432
else if (strcmp(cmd, "t") == 0)
415433
status = exec_command_t(scan_state, active_branch);
416434
else if (strcmp(cmd, "T") == 0)
@@ -1515,6 +1533,44 @@ exec_command_f(PsqlScanState scan_state, bool active_branch)
15151533
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
15161534
}
15171535

1536+
/*
1537+
* \flush -- call PQflush() on the connection
1538+
*/
1539+
static backslashResult
1540+
exec_command_flush(PsqlScanState scan_state, bool active_branch)
1541+
{
1542+
backslashResult status = PSQL_CMD_SKIP_LINE;
1543+
1544+
if (active_branch)
1545+
{
1546+
pset.send_mode = PSQL_SEND_FLUSH;
1547+
status = PSQL_CMD_SEND;
1548+
}
1549+
else
1550+
ignore_slash_options(scan_state);
1551+
1552+
return status;
1553+
}
1554+
1555+
/*
1556+
* \flushrequest -- call PQsendFlushRequest() on the connection
1557+
*/
1558+
static backslashResult
1559+
exec_command_flushrequest(PsqlScanState scan_state, bool active_branch)
1560+
{
1561+
backslashResult status = PSQL_CMD_SKIP_LINE;
1562+
1563+
if (active_branch)
1564+
{
1565+
pset.send_mode = PSQL_SEND_FLUSH_REQUEST;
1566+
status = PSQL_CMD_SEND;
1567+
}
1568+
else
1569+
ignore_slash_options(scan_state);
1570+
1571+
return status;
1572+
}
1573+
15181574
/*
15191575
* \g [(pset-option[=pset-value] ...)] [filename/shell-command]
15201576
* \gx [(pset-option[=pset-value] ...)] [filename/shell-command]
@@ -1550,6 +1606,14 @@ exec_command_g(PsqlScanState scan_state, bool active_branch, const char *cmd)
15501606

15511607
if (status == PSQL_CMD_SKIP_LINE && active_branch)
15521608
{
1609+
if (strcmp(cmd, "gx") == 0 &&
1610+
PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
1611+
{
1612+
pg_log_error("\\gx not allowed in pipeline mode");
1613+
clean_extended_state();
1614+
return PSQL_CMD_ERROR;
1615+
}
1616+
15531617
if (!fname)
15541618
pset.gfname = NULL;
15551619
else
@@ -1703,6 +1767,42 @@ exec_command_getenv(PsqlScanState scan_state, bool active_branch,
17031767
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
17041768
}
17051769

1770+
/*
1771+
* \getresults -- read results
1772+
*/
1773+
static backslashResult
1774+
exec_command_getresults(PsqlScanState scan_state, bool active_branch)
1775+
{
1776+
backslashResult status = PSQL_CMD_SKIP_LINE;
1777+
1778+
if (active_branch)
1779+
{
1780+
char *opt;
1781+
int num_results;
1782+
1783+
pset.send_mode = PSQL_SEND_GET_RESULTS;
1784+
status = PSQL_CMD_SEND;
1785+
opt = psql_scan_slash_option(scan_state, OT_NORMAL, NULL, false);
1786+
1787+
pset.requested_results = 0;
1788+
if (opt != NULL)
1789+
{
1790+
num_results = atoi(opt);
1791+
if (num_results < 0)
1792+
{
1793+
pg_log_error("\\getresults: invalid number of requested results");
1794+
return PSQL_CMD_SKIP_LINE;
1795+
}
1796+
pset.requested_results = num_results;
1797+
}
1798+
}
1799+
else
1800+
ignore_slash_options(scan_state);
1801+
1802+
return status;
1803+
}
1804+
1805+
17061806
/*
17071807
* \gexec -- send query and execute each field of result
17081808
*/
@@ -1713,6 +1813,12 @@ exec_command_gexec(PsqlScanState scan_state, bool active_branch)
17131813

17141814
if (active_branch)
17151815
{
1816+
if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
1817+
{
1818+
pg_log_error("\\gexec not allowed in pipeline mode");
1819+
clean_extended_state();
1820+
return PSQL_CMD_ERROR;
1821+
}
17161822
pset.gexec_flag = true;
17171823
status = PSQL_CMD_SEND;
17181824
}
@@ -1733,6 +1839,13 @@ exec_command_gset(PsqlScanState scan_state, bool active_branch)
17331839
char *prefix = psql_scan_slash_option(scan_state,
17341840
OT_NORMAL, NULL, false);
17351841

1842+
if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
1843+
{
1844+
pg_log_error("\\gset not allowed in pipeline mode");
1845+
clean_extended_state();
1846+
return PSQL_CMD_ERROR;
1847+
}
1848+
17361849
if (prefix)
17371850
pset.gset_prefix = prefix;
17381851
else
@@ -2718,6 +2831,63 @@ exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
27182831
return status;
27192832
}
27202833

2834+
/*
2835+
* \startpipeline -- enter pipeline mode
2836+
*/
2837+
static backslashResult
2838+
exec_command_startpipeline(PsqlScanState scan_state, bool active_branch)
2839+
{
2840+
backslashResult status = PSQL_CMD_SKIP_LINE;
2841+
2842+
if (active_branch)
2843+
{
2844+
pset.send_mode = PSQL_SEND_START_PIPELINE_MODE;
2845+
status = PSQL_CMD_SEND;
2846+
}
2847+
else
2848+
ignore_slash_options(scan_state);
2849+
2850+
return status;
2851+
}
2852+
2853+
/*
2854+
* \syncpipeline -- send a sync message to an active pipeline
2855+
*/
2856+
static backslashResult
2857+
exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch)
2858+
{
2859+
backslashResult status = PSQL_CMD_SKIP_LINE;
2860+
2861+
if (active_branch)
2862+
{
2863+
pset.send_mode = PSQL_SEND_PIPELINE_SYNC;
2864+
status = PSQL_CMD_SEND;
2865+
}
2866+
else
2867+
ignore_slash_options(scan_state);
2868+
2869+
return status;
2870+
}
2871+
2872+
/*
2873+
* \endpipeline -- end pipeline mode
2874+
*/
2875+
static backslashResult
2876+
exec_command_endpipeline(PsqlScanState scan_state, bool active_branch)
2877+
{
2878+
backslashResult status = PSQL_CMD_SKIP_LINE;
2879+
2880+
if (active_branch)
2881+
{
2882+
pset.send_mode = PSQL_SEND_END_PIPELINE_MODE;
2883+
status = PSQL_CMD_SEND;
2884+
}
2885+
else
2886+
ignore_slash_options(scan_state);
2887+
2888+
return status;
2889+
}
2890+
27212891
/*
27222892
* \t -- turn off table headers and row count
27232893
*/

0 commit comments

Comments
 (0)