Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/interfaces/libpq/fe-exec.c19
-rw-r--r--src/test/modules/libpq_pipeline/libpq_pipeline.c90
-rw-r--r--src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl2
-rw-r--r--src/test/modules/libpq_pipeline/traces/nosync.trace92
4 files changed, 188 insertions, 15 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index c1b12696725..b13ddab393b 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1375,8 +1375,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
@@ -1513,8 +1512,7 @@ PQsendPrepare(PGconn *conn,
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
/*
* Give the data a push (in pipeline mode, only if we're past the size
@@ -1817,8 +1815,7 @@ PQsendQueryGuts(PGconn *conn,
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
@@ -2448,8 +2445,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
@@ -3084,12 +3080,7 @@ PQpipelineSync(PGconn *conn)
*/
if (PQflush(conn) < 0)
goto sendFailed;
-
- /*
- * Call pqPipelineProcessQueue so the user can call start calling
- * PQgetResult.
- */
- pqPipelineProcessQueue(conn);
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71eedb6dbb4..249ee22105c 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -231,6 +231,93 @@ test_multi_pipelines(PGconn *conn)
}
/*
+ * Test behavior when a pipeline dispatches a number of commands that are
+ * not flushed by a sync point.
+ */
+static void
+test_nosync(PGconn *conn)
+{
+ int numqueries = 10;
+ int results = 0;
+ int sock = PQsocket(conn);
+
+ fprintf(stderr, "nosync... ");
+
+ if (sock < 0)
+ pg_fatal("invalid socket");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("could not enter pipeline mode");
+ for (int i = 0; i < numqueries; i++)
+ {
+ fd_set input_mask;
+ struct timeval tv;
+
+ if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("error sending select: %s", PQerrorMessage(conn));
+ PQflush(conn);
+
+ /*
+ * If the server has written anything to us, read (some of) it now.
+ */
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+ if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
+ pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
+ }
+
+ /* tell server to flush its output buffer */
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ PQflush(conn);
+
+ /* Now read all results */
+ for (;;)
+ {
+ PGresult *res;
+
+ res = PQgetResult(conn);
+
+ /* NULL results are only expected after TUPLES_OK */
+ if (res == NULL)
+ pg_fatal("got unexpected NULL result after %d results", results);
+
+ /* We expect exactly one TUPLES_OK result for each query we sent */
+ if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ PGresult *res2;
+
+ /* and one NULL result should follow each */
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ PQclear(res);
+ results++;
+
+ /* if we're done, we're done */
+ if (results == numqueries)
+ break;
+
+ continue;
+ }
+
+ /* anything else is unexpected */
+ pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
+ }
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
* When an operation in a pipeline fails the rest of the pipeline is flushed. We
* still have to get results for each pipeline item, but the item will just be
* a PGRES_PIPELINE_ABORTED code.
@@ -1237,6 +1324,7 @@ print_test_list(void)
{
printf("disallowed_in_pipeline\n");
printf("multi_pipelines\n");
+ printf("nosync\n");
printf("pipeline_abort\n");
printf("pipelined_insert\n");
printf("prepared\n");
@@ -1334,6 +1422,8 @@ main(int argc, char **argv)
test_disallowed_in_pipeline(conn);
else if (strcmp(testname, "multi_pipelines") == 0)
test_multi_pipelines(conn);
+ else if (strcmp(testname, "nosync") == 0)
+ test_nosync(conn);
else if (strcmp(testname, "pipeline_abort") == 0)
test_pipeline_abort(conn);
else if (strcmp(testname, "pipelined_insert") == 0)
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index 2bc0e6c2236..4101ef950ee 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -26,7 +26,7 @@ for my $testname (@tests)
{
my @extraargs = ('-r', $numrows);
my $cmptrace = grep(/^$testname$/,
- qw(simple_pipeline multi_pipelines prepared singlerow
+ qw(simple_pipeline nosync multi_pipelines prepared singlerow
pipeline_abort transaction disallowed_in_pipeline)) > 0;
# For a bunch of tests, generate a libpq trace file too.
diff --git a/src/test/modules/libpq_pipeline/traces/nosync.trace b/src/test/modules/libpq_pipeline/traces/nosync.trace
new file mode 100644
index 00000000000..d99aac649db
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/nosync.trace
@@ -0,0 +1,92 @@
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+F 4 Terminate