Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 038f586

Browse files
committed
pgbench: Prepare commands in pipelines in advance
Failing to do so results in an error when a pgbench script tries to start a serializable transaction inside a pipeline, because by the time BEGIN ISOLATION LEVEL SERIALIZABLE is executed, we're already in a transaction that has acquired a snapshot, so the server rightfully complains. We can work around that by preparing all commands in the pipeline before actually starting the pipeline. This changes the existing code in two aspects: first, we now prepare each command individually at the point where that command is about to be executed; previously, we would prepare all commands in a script as soon as the first command of that script would be executed. It's hard to see that this would make much of a difference (particularly since it only affects the first time to execute each script in a client), but I didn't actually try to measure it. Secondly, we no longer use PQsendPrepare() in pipeline mode, but only PQprepare. There's no specific reason for this change other than no longer needing to do differently in pipeline mode. (Previously we had no choice, because in pipeline mode PQprepare could not be used.) Backpatch to 14, where pgbench got support for pipeline mode. Reported-by: Yugo NAGATA <nagata@sraoss.co.jp> Discussion: https://postgr.es/m/20210716153013.fc53b1c780b06fccc07a7f0d@sraoss.co.jp
1 parent 8028e29 commit 038f586

File tree

2 files changed

+127
-51
lines changed

2 files changed

+127
-51
lines changed

src/bin/pgbench/pgbench.c

+107-51
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,8 @@ typedef struct
628628
pg_time_usec_t txn_begin; /* used for measuring schedule lag times */
629629
pg_time_usec_t stmt_begin; /* used for measuring statement latencies */
630630

631-
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
631+
/* whether client prepared each command of each script */
632+
bool **prepared;
632633

633634
/*
634635
* For processing failures and repeating transactions with serialization
@@ -733,7 +734,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
733734
* argv Command arguments, the first of which is the command or SQL
734735
* string itself. For SQL commands, after post-processing
735736
* argv[0] is the same as 'lines' with variables substituted.
736-
* varprefix SQL commands terminated with \gset or \aset have this set
737+
* prepname The name that this command is prepared under, in prepare mode
738+
* varprefix SQL commands terminated with \gset or \aset have this set
737739
* to a non NULL value. If nonempty, it's used to prefix the
738740
* variable name that receives the value.
739741
* aset do gset on all possible queries of a combined query (\;).
@@ -751,6 +753,7 @@ typedef struct Command
751753
MetaCommand meta;
752754
int argc;
753755
char *argv[MAX_ARGS];
756+
char *prepname;
754757
char *varprefix;
755758
PgBenchExpr *expr;
756759
SimpleStats stats;
@@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
30063009
return true;
30073010
}
30083011

3009-
#define MAX_PREPARE_NAME 32
3010-
static void
3011-
preparedStatementName(char *buffer, int file, int state)
3012-
{
3013-
sprintf(buffer, "P%d_%d", file, state);
3014-
}
3015-
30163012
/*
30173013
* Report the abortion of the client when processing SQL commands.
30183014
*/
@@ -3053,6 +3049,87 @@ chooseScript(TState *thread)
30533049
return i - 1;
30543050
}
30553051

3052+
/*
3053+
* Prepare the SQL command from st->use_file at command_num.
3054+
*/
3055+
static void
3056+
prepareCommand(CState *st, int command_num)
3057+
{
3058+
Command *command = sql_script[st->use_file].commands[command_num];
3059+
3060+
/* No prepare for non-SQL commands */
3061+
if (command->type != SQL_COMMAND)
3062+
return;
3063+
3064+
/*
3065+
* If not already done, allocate space for 'prepared' flags: one boolean
3066+
* for each command of each script.
3067+
*/
3068+
if (!st->prepared)
3069+
{
3070+
st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
3071+
for (int i = 0; i < num_scripts; i++)
3072+
{
3073+
ParsedScript *script = &sql_script[i];
3074+
int numcmds;
3075+
3076+
for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
3077+
;
3078+
st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
3079+
}
3080+
}
3081+
3082+
if (!st->prepared[st->use_file][command_num])
3083+
{
3084+
PGresult *res;
3085+
3086+
pg_log_debug("client %d preparing %s", st->id, command->prepname);
3087+
res = PQprepare(st->con, command->prepname,
3088+
command->argv[0], command->argc - 1, NULL);
3089+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
3090+
pg_log_error("%s", PQerrorMessage(st->con));
3091+
PQclear(res);
3092+
st->prepared[st->use_file][command_num] = true;
3093+
}
3094+
}
3095+
3096+
/*
3097+
* Prepare all the commands in the script that come after the \startpipeline
3098+
* that's at position st->command, and the first \endpipeline we find.
3099+
*
3100+
* This sets the ->prepared flag for each relevant command as well as the
3101+
* \startpipeline itself, but doesn't move the st->command counter.
3102+
*/
3103+
static void
3104+
prepareCommandsInPipeline(CState *st)
3105+
{
3106+
int j;
3107+
Command **commands = sql_script[st->use_file].commands;
3108+
3109+
Assert(commands[st->command]->type == META_COMMAND &&
3110+
commands[st->command]->meta == META_STARTPIPELINE);
3111+
3112+
/*
3113+
* We set the 'prepared' flag on the \startpipeline itself to flag that we
3114+
* don't need to do this next time without calling prepareCommand(), even
3115+
* though we don't actually prepare this command.
3116+
*/
3117+
if (st->prepared &&
3118+
st->prepared[st->use_file][st->command])
3119+
return;
3120+
3121+
for (j = st->command + 1; commands[j] != NULL; j++)
3122+
{
3123+
if (commands[j]->type == META_COMMAND &&
3124+
commands[j]->meta == META_ENDPIPELINE)
3125+
break;
3126+
3127+
prepareCommand(st, j);
3128+
}
3129+
3130+
st->prepared[st->use_file][st->command] = true;
3131+
}
3132+
30563133
/* Send a SQL command, using the chosen querymode */
30573134
static bool
30583135
sendCommand(CState *st, Command *command)
@@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command)
30833160
}
30843161
else if (querymode == QUERY_PREPARED)
30853162
{
3086-
char name[MAX_PREPARE_NAME];
30873163
const char *params[MAX_ARGS];
30883164

3089-
if (!st->prepared[st->use_file])
3090-
{
3091-
int j;
3092-
Command **commands = sql_script[st->use_file].commands;
3093-
3094-
for (j = 0; commands[j] != NULL; j++)
3095-
{
3096-
PGresult *res;
3097-
3098-
if (commands[j]->type != SQL_COMMAND)
3099-
continue;
3100-
preparedStatementName(name, st->use_file, j);
3101-
if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
3102-
{
3103-
res = PQprepare(st->con, name,
3104-
commands[j]->argv[0], commands[j]->argc - 1, NULL);
3105-
if (PQresultStatus(res) != PGRES_COMMAND_OK)
3106-
pg_log_error("%s", PQerrorMessage(st->con));
3107-
PQclear(res);
3108-
}
3109-
else
3110-
{
3111-
/*
3112-
* In pipeline mode, we use asynchronous functions. If a
3113-
* server-side error occurs, it will be processed later
3114-
* among the other results.
3115-
*/
3116-
if (!PQsendPrepare(st->con, name,
3117-
commands[j]->argv[0], commands[j]->argc - 1, NULL))
3118-
pg_log_error("%s", PQerrorMessage(st->con));
3119-
}
3120-
}
3121-
st->prepared[st->use_file] = true;
3122-
}
3123-
3165+
prepareCommand(st, st->command);
31243166
getQueryParams(&st->variables, command, params);
3125-
preparedStatementName(name, st->use_file, st->command);
31263167

3127-
pg_log_debug("client %d sending %s", st->id, name);
3128-
r = PQsendQueryPrepared(st->con, name, command->argc - 1,
3168+
pg_log_debug("client %d sending %s", st->id, command->prepname);
3169+
r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
31293170
params, NULL, NULL, 0);
31303171
}
31313172
else /* unknown sql mode */
@@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
35973638
thread->conn_duration += now - start;
35983639

35993640
/* Reset session-local state */
3600-
memset(st->prepared, 0, sizeof(st->prepared));
3641+
pg_free(st->prepared);
3642+
st->prepared = NULL;
36013643
}
36023644

36033645
/*
@@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
43604402
return CSTATE_ABORTED;
43614403
}
43624404

4405+
/*
4406+
* If we're in prepared-query mode, we need to prepare all the
4407+
* commands that are inside the pipeline before we actually start the
4408+
* pipeline itself. This solves the problem that running BEGIN
4409+
* ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
4410+
* snapshot having been acquired by the prepare within the pipeline.
4411+
*/
4412+
if (querymode == QUERY_PREPARED)
4413+
prepareCommandsInPipeline(st);
4414+
43634415
if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
43644416
{
43654417
commandFailed(st, "startpipeline", "already in pipeline mode");
@@ -5439,6 +5491,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
54395491
my_command->varprefix = NULL; /* allocated later, if needed */
54405492
my_command->expr = NULL;
54415493
initSimpleStats(&my_command->stats);
5494+
my_command->prepname = NULL; /* set later, if needed */
54425495

54435496
return my_command;
54445497
}
@@ -5468,6 +5521,7 @@ static void
54685521
postprocess_sql_command(Command *my_command)
54695522
{
54705523
char buffer[128];
5524+
static int prepnum = 0;
54715525

54725526
Assert(my_command->type == SQL_COMMAND);
54735527

@@ -5476,15 +5530,17 @@ postprocess_sql_command(Command *my_command)
54765530
buffer[strcspn(buffer, "\n\r")] = '\0';
54775531
my_command->first_line = pg_strdup(buffer);
54785532

5479-
/* parse query if necessary */
5533+
/* Parse query and generate prepared statement name, if necessary */
54805534
switch (querymode)
54815535
{
54825536
case QUERY_SIMPLE:
54835537
my_command->argv[0] = my_command->lines.data;
54845538
my_command->argc++;
54855539
break;
5486-
case QUERY_EXTENDED:
54875540
case QUERY_PREPARED:
5541+
my_command->prepname = psprintf("P_%d", prepnum++);
5542+
/* fall through */
5543+
case QUERY_EXTENDED:
54885544
if (!parseQuery(my_command))
54895545
exit(1);
54905546
break;

src/bin/pgbench/t/001_pgbench_with_server.pl

+20
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,26 @@
839839
}
840840
});
841841

842+
# Working \startpipeline in prepared query mode with serializable
843+
$node->pgbench(
844+
'-c4 -j2 -t 10 -n -M prepared',
845+
0,
846+
[
847+
qr{type: .*/001_pgbench_pipeline_serializable},
848+
qr{actually processed: (\d+)/\1}
849+
],
850+
[],
851+
'working \startpipeline with serializable',
852+
{
853+
'001_pgbench_pipeline_serializable' => q{
854+
-- test startpipeline with serializable
855+
\startpipeline
856+
BEGIN ISOLATION LEVEL SERIALIZABLE;
857+
} . "select 1;\n" x 10 . q{
858+
END;
859+
\endpipeline
860+
}
861+
});
842862

843863
# trigger many expression errors
844864
my @errors = (

0 commit comments

Comments
 (0)