Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6d4565a

Browse files
committed
libpq: Move cancellation related functions to fe-cancel.c
In follow up commits we'll add more functions related to query cancellations. This groups those all together instead of mixing them with the other functions in fe-connect.c. The formerly static parse_int_param() function had to be exported to other libpq users, so it's been renamed pqParseIntParam() and moved to a more reasonable place within fe-connect.c (rather than randomly between various keepalive-related routines). Author: Jelte Fennema-Nio <jelte.fennema@microsoft.com> Discussion: https://postgr.es/m/AM5PR83MB0178D3B31CA1B6EC4A8ECC42F7529@AM5PR83MB0178.EURPRD83.prod.outlook.com
1 parent cf765ff commit 6d4565a

File tree

5 files changed

+451
-426
lines changed

5 files changed

+451
-426
lines changed

src/interfaces/libpq/Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ endif
3030
OBJS = \
3131
$(WIN32RES) \
3232
fe-auth-scram.o \
33+
fe-cancel.o \
3334
fe-connect.o \
3435
fe-exec.o \
3536
fe-lobj.o \

src/interfaces/libpq/fe-cancel.c

+387
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* fe-cancel.c
4+
* functions related to setting up a connection to the backend
5+
*
6+
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
*
10+
* IDENTIFICATION
11+
* src/interfaces/libpq/fe-cancel.c
12+
*
13+
*-------------------------------------------------------------------------
14+
*/
15+
16+
#include "postgres_fe.h"
17+
18+
#include <unistd.h>
19+
20+
#include "libpq-fe.h"
21+
#include "libpq-int.h"
22+
#include "port/pg_bswap.h"
23+
24+
/*
25+
* PQgetCancel: get a PGcancel structure corresponding to a connection.
26+
*
27+
* A copy is needed to be able to cancel a running query from a different
28+
* thread. If the same structure is used all structure members would have
29+
* to be individually locked (if the entire structure was locked, it would
30+
* be impossible to cancel a synchronous query because the structure would
31+
* have to stay locked for the duration of the query).
32+
*/
33+
PGcancel *
34+
PQgetCancel(PGconn *conn)
35+
{
36+
PGcancel *cancel;
37+
38+
if (!conn)
39+
return NULL;
40+
41+
if (conn->sock == PGINVALID_SOCKET)
42+
return NULL;
43+
44+
cancel = malloc(sizeof(PGcancel));
45+
if (cancel == NULL)
46+
return NULL;
47+
48+
memcpy(&cancel->raddr, &conn->raddr, sizeof(SockAddr));
49+
cancel->be_pid = conn->be_pid;
50+
cancel->be_key = conn->be_key;
51+
/* We use -1 to indicate an unset connection option */
52+
cancel->pgtcp_user_timeout = -1;
53+
cancel->keepalives = -1;
54+
cancel->keepalives_idle = -1;
55+
cancel->keepalives_interval = -1;
56+
cancel->keepalives_count = -1;
57+
if (conn->pgtcp_user_timeout != NULL)
58+
{
59+
if (!pqParseIntParam(conn->pgtcp_user_timeout,
60+
&cancel->pgtcp_user_timeout,
61+
conn, "tcp_user_timeout"))
62+
goto fail;
63+
}
64+
if (conn->keepalives != NULL)
65+
{
66+
if (!pqParseIntParam(conn->keepalives,
67+
&cancel->keepalives,
68+
conn, "keepalives"))
69+
goto fail;
70+
}
71+
if (conn->keepalives_idle != NULL)
72+
{
73+
if (!pqParseIntParam(conn->keepalives_idle,
74+
&cancel->keepalives_idle,
75+
conn, "keepalives_idle"))
76+
goto fail;
77+
}
78+
if (conn->keepalives_interval != NULL)
79+
{
80+
if (!pqParseIntParam(conn->keepalives_interval,
81+
&cancel->keepalives_interval,
82+
conn, "keepalives_interval"))
83+
goto fail;
84+
}
85+
if (conn->keepalives_count != NULL)
86+
{
87+
if (!pqParseIntParam(conn->keepalives_count,
88+
&cancel->keepalives_count,
89+
conn, "keepalives_count"))
90+
goto fail;
91+
}
92+
93+
return cancel;
94+
95+
fail:
96+
free(cancel);
97+
return NULL;
98+
}
99+
100+
/* PQfreeCancel: free a cancel structure */
101+
void
102+
PQfreeCancel(PGcancel *cancel)
103+
{
104+
free(cancel);
105+
}
106+
107+
108+
/*
109+
* Sets an integer socket option on a TCP socket, if the provided value is
110+
* not negative. Returns false if setsockopt fails for some reason.
111+
*
112+
* CAUTION: This needs to be signal safe, since it's used by PQcancel.
113+
*/
114+
#if defined(TCP_USER_TIMEOUT) || !defined(WIN32)
115+
static bool
116+
optional_setsockopt(int fd, int protoid, int optid, int value)
117+
{
118+
if (value < 0)
119+
return true;
120+
if (setsockopt(fd, protoid, optid, (char *) &value, sizeof(value)) < 0)
121+
return false;
122+
return true;
123+
}
124+
#endif
125+
126+
127+
/*
128+
* PQcancel: request query cancel
129+
*
130+
* The return value is true if the cancel request was successfully
131+
* dispatched, false if not (in which case an error message is available).
132+
* Note: successful dispatch is no guarantee that there will be any effect at
133+
* the backend. The application must read the operation result as usual.
134+
*
135+
* On failure, an error message is stored in *errbuf, which must be of size
136+
* errbufsize (recommended size is 256 bytes). *errbuf is not changed on
137+
* success return.
138+
*
139+
* CAUTION: we want this routine to be safely callable from a signal handler
140+
* (for example, an application might want to call it in a SIGINT handler).
141+
* This means we cannot use any C library routine that might be non-reentrant.
142+
* malloc/free are often non-reentrant, and anything that might call them is
143+
* just as dangerous. We avoid sprintf here for that reason. Building up
144+
* error messages with strcpy/strcat is tedious but should be quite safe.
145+
* We also save/restore errno in case the signal handler support doesn't.
146+
*/
147+
int
148+
PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
149+
{
150+
int save_errno = SOCK_ERRNO;
151+
pgsocket tmpsock = PGINVALID_SOCKET;
152+
int maxlen;
153+
struct
154+
{
155+
uint32 packetlen;
156+
CancelRequestPacket cp;
157+
} crp;
158+
159+
if (!cancel)
160+
{
161+
strlcpy(errbuf, "PQcancel() -- no cancel object supplied", errbufsize);
162+
/* strlcpy probably doesn't change errno, but be paranoid */
163+
SOCK_ERRNO_SET(save_errno);
164+
return false;
165+
}
166+
167+
/*
168+
* We need to open a temporary connection to the postmaster. Do this with
169+
* only kernel calls.
170+
*/
171+
if ((tmpsock = socket(cancel->raddr.addr.ss_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
172+
{
173+
strlcpy(errbuf, "PQcancel() -- socket() failed: ", errbufsize);
174+
goto cancel_errReturn;
175+
}
176+
177+
/*
178+
* Since this connection will only be used to send a single packet of
179+
* data, we don't need NODELAY. We also don't set the socket to
180+
* nonblocking mode, because the API definition of PQcancel requires the
181+
* cancel to be sent in a blocking way.
182+
*
183+
* We do set socket options related to keepalives and other TCP timeouts.
184+
* This ensures that this function does not block indefinitely when
185+
* reasonable keepalive and timeout settings have been provided.
186+
*/
187+
if (cancel->raddr.addr.ss_family != AF_UNIX &&
188+
cancel->keepalives != 0)
189+
{
190+
#ifndef WIN32
191+
if (!optional_setsockopt(tmpsock, SOL_SOCKET, SO_KEEPALIVE, 1))
192+
{
193+
strlcpy(errbuf, "PQcancel() -- setsockopt(SO_KEEPALIVE) failed: ", errbufsize);
194+
goto cancel_errReturn;
195+
}
196+
197+
#ifdef PG_TCP_KEEPALIVE_IDLE
198+
if (!optional_setsockopt(tmpsock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
199+
cancel->keepalives_idle))
200+
{
201+
strlcpy(errbuf, "PQcancel() -- setsockopt(" PG_TCP_KEEPALIVE_IDLE_STR ") failed: ", errbufsize);
202+
goto cancel_errReturn;
203+
}
204+
#endif
205+
206+
#ifdef TCP_KEEPINTVL
207+
if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_KEEPINTVL,
208+
cancel->keepalives_interval))
209+
{
210+
strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_KEEPINTVL) failed: ", errbufsize);
211+
goto cancel_errReturn;
212+
}
213+
#endif
214+
215+
#ifdef TCP_KEEPCNT
216+
if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_KEEPCNT,
217+
cancel->keepalives_count))
218+
{
219+
strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_KEEPCNT) failed: ", errbufsize);
220+
goto cancel_errReturn;
221+
}
222+
#endif
223+
224+
#else /* WIN32 */
225+
226+
#ifdef SIO_KEEPALIVE_VALS
227+
if (!pqSetKeepalivesWin32(tmpsock,
228+
cancel->keepalives_idle,
229+
cancel->keepalives_interval))
230+
{
231+
strlcpy(errbuf, "PQcancel() -- WSAIoctl(SIO_KEEPALIVE_VALS) failed: ", errbufsize);
232+
goto cancel_errReturn;
233+
}
234+
#endif /* SIO_KEEPALIVE_VALS */
235+
#endif /* WIN32 */
236+
237+
/* TCP_USER_TIMEOUT works the same way on Unix and Windows */
238+
#ifdef TCP_USER_TIMEOUT
239+
if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_USER_TIMEOUT,
240+
cancel->pgtcp_user_timeout))
241+
{
242+
strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_USER_TIMEOUT) failed: ", errbufsize);
243+
goto cancel_errReturn;
244+
}
245+
#endif
246+
}
247+
248+
retry3:
249+
if (connect(tmpsock, (struct sockaddr *) &cancel->raddr.addr,
250+
cancel->raddr.salen) < 0)
251+
{
252+
if (SOCK_ERRNO == EINTR)
253+
/* Interrupted system call - we'll just try again */
254+
goto retry3;
255+
strlcpy(errbuf, "PQcancel() -- connect() failed: ", errbufsize);
256+
goto cancel_errReturn;
257+
}
258+
259+
/* Create and send the cancel request packet. */
260+
261+
crp.packetlen = pg_hton32((uint32) sizeof(crp));
262+
crp.cp.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE);
263+
crp.cp.backendPID = pg_hton32(cancel->be_pid);
264+
crp.cp.cancelAuthCode = pg_hton32(cancel->be_key);
265+
266+
retry4:
267+
if (send(tmpsock, (char *) &crp, sizeof(crp), 0) != (int) sizeof(crp))
268+
{
269+
if (SOCK_ERRNO == EINTR)
270+
/* Interrupted system call - we'll just try again */
271+
goto retry4;
272+
strlcpy(errbuf, "PQcancel() -- send() failed: ", errbufsize);
273+
goto cancel_errReturn;
274+
}
275+
276+
/*
277+
* Wait for the postmaster to close the connection, which indicates that
278+
* it's processed the request. Without this delay, we might issue another
279+
* command only to find that our cancel zaps that command instead of the
280+
* one we thought we were canceling. Note we don't actually expect this
281+
* read to obtain any data, we are just waiting for EOF to be signaled.
282+
*/
283+
retry5:
284+
if (recv(tmpsock, (char *) &crp, 1, 0) < 0)
285+
{
286+
if (SOCK_ERRNO == EINTR)
287+
/* Interrupted system call - we'll just try again */
288+
goto retry5;
289+
/* we ignore other error conditions */
290+
}
291+
292+
/* All done */
293+
closesocket(tmpsock);
294+
SOCK_ERRNO_SET(save_errno);
295+
return true;
296+
297+
cancel_errReturn:
298+
299+
/*
300+
* Make sure we don't overflow the error buffer. Leave space for the \n at
301+
* the end, and for the terminating zero.
302+
*/
303+
maxlen = errbufsize - strlen(errbuf) - 2;
304+
if (maxlen >= 0)
305+
{
306+
/*
307+
* We can't invoke strerror here, since it's not signal-safe. Settle
308+
* for printing the decimal value of errno. Even that has to be done
309+
* the hard way.
310+
*/
311+
int val = SOCK_ERRNO;
312+
char buf[32];
313+
char *bufp;
314+
315+
bufp = buf + sizeof(buf) - 1;
316+
*bufp = '\0';
317+
do
318+
{
319+
*(--bufp) = (val % 10) + '0';
320+
val /= 10;
321+
} while (val > 0);
322+
bufp -= 6;
323+
memcpy(bufp, "error ", 6);
324+
strncat(errbuf, bufp, maxlen);
325+
strcat(errbuf, "\n");
326+
}
327+
if (tmpsock != PGINVALID_SOCKET)
328+
closesocket(tmpsock);
329+
SOCK_ERRNO_SET(save_errno);
330+
return false;
331+
}
332+
333+
/*
334+
* PQrequestCancel: old, not thread-safe function for requesting query cancel
335+
*
336+
* Returns true if able to send the cancel request, false if not.
337+
*
338+
* On failure, the error message is saved in conn->errorMessage; this means
339+
* that this can't be used when there might be other active operations on
340+
* the connection object.
341+
*
342+
* NOTE: error messages will be cut off at the current size of the
343+
* error message buffer, since we dare not try to expand conn->errorMessage!
344+
*/
345+
int
346+
PQrequestCancel(PGconn *conn)
347+
{
348+
int r;
349+
PGcancel *cancel;
350+
351+
/* Check we have an open connection */
352+
if (!conn)
353+
return false;
354+
355+
if (conn->sock == PGINVALID_SOCKET)
356+
{
357+
strlcpy(conn->errorMessage.data,
358+
"PQrequestCancel() -- connection is not open\n",
359+
conn->errorMessage.maxlen);
360+
conn->errorMessage.len = strlen(conn->errorMessage.data);
361+
conn->errorReported = 0;
362+
363+
return false;
364+
}
365+
366+
cancel = PQgetCancel(conn);
367+
if (cancel)
368+
{
369+
r = PQcancel(cancel, conn->errorMessage.data,
370+
conn->errorMessage.maxlen);
371+
PQfreeCancel(cancel);
372+
}
373+
else
374+
{
375+
strlcpy(conn->errorMessage.data, "out of memory",
376+
conn->errorMessage.maxlen);
377+
r = false;
378+
}
379+
380+
if (!r)
381+
{
382+
conn->errorMessage.len = strlen(conn->errorMessage.data);
383+
conn->errorReported = 0;
384+
}
385+
386+
return r;
387+
}

0 commit comments

Comments
 (0)