|
24 | 24 | #include "common/connect.h"
|
25 | 25 | #include "funcapi.h"
|
26 | 26 | #include "libpq-fe.h"
|
27 |
| -#include "libpq/libpq-be-fe-helpers.h" |
28 | 27 | #include "mb/pg_wchar.h"
|
29 | 28 | #include "miscadmin.h"
|
30 | 29 | #include "pgstat.h"
|
@@ -133,6 +132,7 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
|
133 | 132 | const char *appname, char **err)
|
134 | 133 | {
|
135 | 134 | WalReceiverConn *conn;
|
| 135 | + PostgresPollingStatusType status; |
136 | 136 | const char *keys[6];
|
137 | 137 | const char *vals[6];
|
138 | 138 | int i = 0;
|
@@ -188,17 +188,56 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
|
188 | 188 | Assert(i < sizeof(keys));
|
189 | 189 |
|
190 | 190 | conn = palloc0(sizeof(WalReceiverConn));
|
191 |
| - conn->streamConn = |
192 |
| - libpqsrv_connect_params(keys, vals, |
193 |
| - /* expand_dbname = */ true, |
194 |
| - WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); |
| 191 | + conn->streamConn = PQconnectStartParams(keys, vals, |
| 192 | + /* expand_dbname = */ true); |
| 193 | + if (PQstatus(conn->streamConn) == CONNECTION_BAD) |
| 194 | + goto bad_connection_errmsg; |
| 195 | + |
| 196 | + /* |
| 197 | + * Poll connection until we have OK or FAILED status. |
| 198 | + * |
| 199 | + * Per spec for PQconnectPoll, first wait till socket is write-ready. |
| 200 | + */ |
| 201 | + status = PGRES_POLLING_WRITING; |
| 202 | + do |
| 203 | + { |
| 204 | + int io_flag; |
| 205 | + int rc; |
| 206 | + |
| 207 | + if (status == PGRES_POLLING_READING) |
| 208 | + io_flag = WL_SOCKET_READABLE; |
| 209 | +#ifdef WIN32 |
| 210 | + /* Windows needs a different test while waiting for connection-made */ |
| 211 | + else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) |
| 212 | + io_flag = WL_SOCKET_CONNECTED; |
| 213 | +#endif |
| 214 | + else |
| 215 | + io_flag = WL_SOCKET_WRITEABLE; |
| 216 | + |
| 217 | + rc = WaitLatchOrSocket(MyLatch, |
| 218 | + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, |
| 219 | + PQsocket(conn->streamConn), |
| 220 | + 0, |
| 221 | + WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); |
| 222 | + |
| 223 | + /* Interrupted? */ |
| 224 | + if (rc & WL_LATCH_SET) |
| 225 | + { |
| 226 | + ResetLatch(MyLatch); |
| 227 | + ProcessWalRcvInterrupts(); |
| 228 | + } |
| 229 | + |
| 230 | + /* If socket is ready, advance the libpq state machine */ |
| 231 | + if (rc & io_flag) |
| 232 | + status = PQconnectPoll(conn->streamConn); |
| 233 | + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); |
195 | 234 |
|
196 | 235 | if (PQstatus(conn->streamConn) != CONNECTION_OK)
|
197 | 236 | goto bad_connection_errmsg;
|
198 | 237 |
|
199 | 238 | if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
|
200 | 239 | {
|
201 |
| - libpqsrv_disconnect(conn->streamConn); |
| 240 | + PQfinish(conn->streamConn); |
202 | 241 | pfree(conn);
|
203 | 242 |
|
204 | 243 | ereport(ERROR,
|
@@ -234,7 +273,7 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
|
234 | 273 |
|
235 | 274 | /* error path, error already set */
|
236 | 275 | bad_connection:
|
237 |
| - libpqsrv_disconnect(conn->streamConn); |
| 276 | + PQfinish(conn->streamConn); |
238 | 277 | pfree(conn);
|
239 | 278 | return NULL;
|
240 | 279 | }
|
@@ -770,7 +809,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
|
770 | 809 | static void
|
771 | 810 | libpqrcv_disconnect(WalReceiverConn *conn)
|
772 | 811 | {
|
773 |
| - libpqsrv_disconnect(conn->streamConn); |
| 812 | + PQfinish(conn->streamConn); |
774 | 813 | PQfreemem(conn->recvBuf);
|
775 | 814 | pfree(conn);
|
776 | 815 | }
|
|
0 commit comments