diff options
author | Robert Haas | 2016-06-30 22:35:32 +0000 |
---|---|---|
committer | Robert Haas | 2016-06-30 22:35:32 +0000 |
commit | 10c0558ffefcd12bf1d3dc35587eba41d1ce4571 (patch) | |
tree | 5b6344c420e2822f7c66f626294218adbd54343f /src/backend | |
parent | f8c58554db48fe004938a8a34a42afb78157b66c (diff) |
Fix several mistakes around parallel workers and client_encoding.
Previously, workers sent data to the leader using the client encoding.
That mostly worked, but the leader the converted the data back to the
server encoding. Since not all encoding conversions are reversible,
that could provoke failures. Fix by using the database encoding for
all communication between worker and leader.
Also, while temporary changes to GUC settings, as from the SET clause
of a function, are in general OK for parallel query, changing
client_encoding this way inside of a parallel worker is not OK.
Previously, that would have confused the leader; with these changes,
it would not confuse the leader, but it wouldn't do anything either.
So refuse such changes in parallel workers.
Also, the previous code naively assumed that when it received a
NotifyResonse from the worker, it could pass that directly back to the
user. But now that worker-to-leader communication always uses the
database encoding, that's clearly no longer correct - though,
actually, the old way was always broken for V2 clients. So
disassemble and reconstitute the message instead.
Issues reported by Peter Eisentraut. Patch by me, reviewed by
Peter Eisentraut.
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/parallel.c | 18 | ||||
-rw-r--r-- | src/backend/commands/async.c | 5 | ||||
-rw-r--r-- | src/backend/commands/variable.c | 24 | ||||
-rw-r--r-- | src/backend/libpq/pqformat.c | 30 | ||||
-rw-r--r-- | src/backend/libpq/pqmq.c | 2 |
5 files changed, 73 insertions, 6 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 088700e17cb..eef1dc2b184 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) case 'A': /* NotifyResponse */ { /* Propagate NotifyResponse. */ - pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + int32 pid; + const char *channel; + const char *payload; + + pid = pq_getmsgint(msg, 4); + channel = pq_getmsgrawstring(msg); + payload = pq_getmsgrawstring(msg); + pq_endmessage(msg); + + NotifyMyFrontEnd(channel, payload, pid); + break; } @@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg) BackgroundWorkerInitializeConnectionByOid(fps->database_id, fps->authenticated_user_id); + /* + * Set the client encoding to the database encoding, since that is what + * the leader will expect. + */ + SetClientEncoding(GetDatabaseEncoding()); + /* Restore GUC values from launching backend. */ gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); Assert(gucspace != NULL); diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index c39ac3aeef0..716f1c33183 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, char *page_buffer); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); -static void NotifyMyFrontEnd(const char *channel, - const char *payload, - int32 srcPid); static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); @@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void) /* * Send NOTIFY message to my front end. */ -static void +void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index 962d75db6e4..4ad8266a51c 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra) { int encoding = *((int *) extra); + /* + * Parallel workers send data to the leader, not the client. They always + * send data using the database encoding. + */ + if (IsParallelWorker()) + { + /* + * During parallel worker startup, we want to accept the leader's + * client_encoding setting so that anyone who looks at the value in + * the worker sees the same value that they would see in the leader. + */ + if (InitializingParallelWorker) + return; + + /* + * A change other than during startup, for example due to a SET clause + * attached to a function definition, should be rejected, as there is + * nothing we can do inside the worker to make it take effect. + */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot change client_encoding in a parallel worker"))); + } + /* We do not expect an error if PrepareClientEncoding succeeded */ if (SetClientEncoding(encoding) < 0) elog(LOG, "SetClientEncoding(%d) failed", encoding); diff --git a/src/backend/libpq/pqformat.c b/src/backend/libpq/pqformat.c index 4ddea8285fc..b5d9d64e547 100644 --- a/src/backend/libpq/pqformat.c +++ b/src/backend/libpq/pqformat.c @@ -65,6 +65,7 @@ * pq_copymsgbytes - copy raw data from a message buffer * pq_getmsgtext - get a counted text string (with conversion) * pq_getmsgstring - get a null-terminated text string (with conversion) + * pq_getmsgrawstring - get a null-terminated text string - NO conversion * pq_getmsgend - verify message fully consumed */ @@ -640,6 +641,35 @@ pq_getmsgstring(StringInfo msg) } /* -------------------------------- + * pq_getmsgrawstring - get a null-terminated text string - NO conversion + * + * Returns a pointer directly into the message buffer. + * -------------------------------- + */ +const char * +pq_getmsgrawstring(StringInfo msg) +{ + char *str; + int slen; + + str = &msg->data[msg->cursor]; + + /* + * It's safe to use strlen() here because a StringInfo is guaranteed to + * have a trailing null byte. But check we found a null inside the + * message. + */ + slen = strlen(str); + if (msg->cursor + slen >= msg->len) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid string in message"))); + msg->cursor += slen + 1; + + return str; +} + +/* -------------------------------- * pq_getmsgend - verify message fully consumed * -------------------------------- */ diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 3225c1fa0e7..0dcdee03db5 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata) pq_getmsgend(msg); break; } - value = pq_getmsgstring(msg); + value = pq_getmsgrawstring(msg); switch (code) { |