Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 00dbb8c

Browse files
author
Commitfest Bot
committed
[CF 4748] v5 - Network failure may prevent promotion
This branch was automatically generated by a robot using patches from an email thread registered at: https://commitfest.postgresql.org/patch/4748 The branch will be overwritten each time a new patch version is posted to the thread, and also periodically to check for bitrot caused by changes on the master branch. Patch(es): https://www.postgresql.org/message-id/d0c357e9-0978-466e-9830-148f816d9367@postgrespro.ru Author(s): Kyotaro Horiguchi
2 parents 231064a + 4637600 commit 00dbb8c

File tree

3 files changed

+52
-202
lines changed

3 files changed

+52
-202
lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 42 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "common/connect.h"
2626
#include "funcapi.h"
2727
#include "libpq-fe.h"
28+
#include "libpq/libpq-be-fe-helpers.h"
2829
#include "mb/pg_wchar.h"
2930
#include "miscadmin.h"
3031
#include "pgstat.h"
@@ -113,8 +114,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
113114
};
114115

115116
/* Prototypes for private functions */
116-
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
117-
static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
118117
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
119118

120119
/*
@@ -148,7 +147,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
148147
bool must_use_password, const char *appname, char **err)
149148
{
150149
WalReceiverConn *conn;
151-
PostgresPollingStatusType status;
152150
const char *keys[6];
153151
const char *vals[6];
154152
int i = 0;
@@ -214,56 +212,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
214212
Assert(i < lengthof(keys));
215213

216214
conn = palloc0(sizeof(WalReceiverConn));
217-
conn->streamConn = PQconnectStartParams(keys, vals,
218-
/* expand_dbname = */ true);
219-
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
220-
goto bad_connection_errmsg;
221-
222-
/*
223-
* Poll connection until we have OK or FAILED status.
224-
*
225-
* Per spec for PQconnectPoll, first wait till socket is write-ready.
226-
*/
227-
status = PGRES_POLLING_WRITING;
228-
do
229-
{
230-
int io_flag;
231-
int rc;
232-
233-
if (status == PGRES_POLLING_READING)
234-
io_flag = WL_SOCKET_READABLE;
235-
#ifdef WIN32
236-
/* Windows needs a different test while waiting for connection-made */
237-
else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
238-
io_flag = WL_SOCKET_CONNECTED;
239-
#endif
240-
else
241-
io_flag = WL_SOCKET_WRITEABLE;
242-
243-
rc = WaitLatchOrSocket(MyLatch,
244-
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
245-
PQsocket(conn->streamConn),
246-
0,
247-
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
248-
249-
/* Interrupted? */
250-
if (rc & WL_LATCH_SET)
251-
{
252-
ResetLatch(MyLatch);
253-
ProcessWalRcvInterrupts();
254-
}
255-
256-
/* If socket is ready, advance the libpq state machine */
257-
if (rc & io_flag)
258-
status = PQconnectPoll(conn->streamConn);
259-
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
215+
conn->streamConn =
216+
libpqsrv_connect_params(keys, vals,
217+
/* expand_dbname = */ true,
218+
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
260219

261220
if (PQstatus(conn->streamConn) != CONNECTION_OK)
262221
goto bad_connection_errmsg;
263222

264223
if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
265224
{
266-
PQfinish(conn->streamConn);
225+
libpqsrv_disconnect(conn->streamConn);
267226
pfree(conn);
268227

269228
ereport(ERROR,
@@ -281,8 +240,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
281240
{
282241
PGresult *res;
283242

284-
res = libpqrcv_PQexec(conn->streamConn,
285-
ALWAYS_SECURE_SEARCH_PATH_SQL);
243+
res = libpqsrv_exec(conn->streamConn,
244+
ALWAYS_SECURE_SEARCH_PATH_SQL,
245+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
286246
if (PQresultStatus(res) != PGRES_TUPLES_OK)
287247
{
288248
PQclear(res);
@@ -303,7 +263,7 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
303263

304264
/* error path, error already set */
305265
bad_connection:
306-
PQfinish(conn->streamConn);
266+
libpqsrv_disconnect(conn->streamConn);
307267
pfree(conn);
308268
return NULL;
309269
}
@@ -454,7 +414,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
454414
* Get the system identifier and timeline ID as a DataRow message from the
455415
* primary server.
456416
*/
457-
res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
417+
res = libpqsrv_exec(conn->streamConn,
418+
"IDENTIFY_SYSTEM",
419+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
458420
if (PQresultStatus(res) != PGRES_TUPLES_OK)
459421
{
460422
PQclear(res);
@@ -631,7 +593,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
631593
options->proto.physical.startpointTLI);
632594

633595
/* Start streaming. */
634-
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
596+
res = libpqsrv_exec(conn->streamConn,
597+
cmd.data,
598+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
635599
pfree(cmd.data);
636600

637601
if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -661,7 +625,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
661625
PGresult *res;
662626

663627
/*
664-
* Send copy-end message. As in libpqrcv_PQexec, this could theoretically
628+
* Send copy-end message. As in libpqsrv_exec, this could theoretically
665629
* block, but the risk seems small.
666630
*/
667631
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -681,7 +645,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
681645
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
682646
* also possible in case we aborted the copy in mid-stream.
683647
*/
684-
res = libpqrcv_PQgetResult(conn->streamConn);
648+
res = libpqsrv_get_result(conn->streamConn,
649+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
685650
if (PQresultStatus(res) == PGRES_TUPLES_OK)
686651
{
687652
/*
@@ -696,7 +661,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
696661
PQclear(res);
697662

698663
/* the result set should be followed by CommandComplete */
699-
res = libpqrcv_PQgetResult(conn->streamConn);
664+
res = libpqsrv_get_result(conn->streamConn,
665+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
700666
}
701667
else if (PQresultStatus(res) == PGRES_COPY_OUT)
702668
{
@@ -710,7 +676,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
710676
pchomp(PQerrorMessage(conn->streamConn)))));
711677

712678
/* CommandComplete should follow */
713-
res = libpqrcv_PQgetResult(conn->streamConn);
679+
res = libpqsrv_get_result(conn->streamConn,
680+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
714681
}
715682

716683
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -721,7 +688,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
721688
PQclear(res);
722689

723690
/* Verify that there are no more results */
724-
res = libpqrcv_PQgetResult(conn->streamConn);
691+
res = libpqsrv_get_result(conn->streamConn,
692+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
725693
if (res != NULL)
726694
ereport(ERROR,
727695
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -746,7 +714,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
746714
* Request the primary to send over the history file for given timeline.
747715
*/
748716
snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
749-
res = libpqrcv_PQexec(conn->streamConn, cmd);
717+
res = libpqsrv_exec(conn->streamConn,
718+
cmd,
719+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
750720
if (PQresultStatus(res) != PGRES_TUPLES_OK)
751721
{
752722
PQclear(res);
@@ -776,114 +746,13 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
776746
PQclear(res);
777747
}
778748

779-
/*
780-
* Send a query and wait for the results by using the asynchronous libpq
781-
* functions and socket readiness events.
782-
*
783-
* The function is modeled on libpqsrv_exec(), with the behavior difference
784-
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
785-
* skips try/catch, since all errors terminate the process.
786-
*
787-
* May return NULL, rather than an error result, on failure.
788-
*/
789-
static PGresult *
790-
libpqrcv_PQexec(PGconn *streamConn, const char *query)
791-
{
792-
PGresult *lastResult = NULL;
793-
794-
/*
795-
* PQexec() silently discards any prior query results on the connection.
796-
* This is not required for this function as it's expected that the caller
797-
* (which is this library in all cases) will behave correctly and we don't
798-
* have to be backwards compatible with old libpq.
799-
*/
800-
801-
/*
802-
* Submit the query. Since we don't use non-blocking mode, this could
803-
* theoretically block. In practice, since we don't send very long query
804-
* strings, the risk seems negligible.
805-
*/
806-
if (!PQsendQuery(streamConn, query))
807-
return NULL;
808-
809-
for (;;)
810-
{
811-
/* Wait for, and collect, the next PGresult. */
812-
PGresult *result;
813-
814-
result = libpqrcv_PQgetResult(streamConn);
815-
if (result == NULL)
816-
break; /* query is complete, or failure */
817-
818-
/*
819-
* Emulate PQexec()'s behavior of returning the last result when there
820-
* are many. We are fine with returning just last error message.
821-
*/
822-
PQclear(lastResult);
823-
lastResult = result;
824-
825-
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
826-
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
827-
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
828-
PQstatus(streamConn) == CONNECTION_BAD)
829-
break;
830-
}
831-
832-
return lastResult;
833-
}
834-
835-
/*
836-
* Perform the equivalent of PQgetResult(), but watch for interrupts.
837-
*/
838-
static PGresult *
839-
libpqrcv_PQgetResult(PGconn *streamConn)
840-
{
841-
/*
842-
* Collect data until PQgetResult is ready to get the result without
843-
* blocking.
844-
*/
845-
while (PQisBusy(streamConn))
846-
{
847-
int rc;
848-
849-
/*
850-
* We don't need to break down the sleep into smaller increments,
851-
* since we'll get interrupted by signals and can handle any
852-
* interrupts here.
853-
*/
854-
rc = WaitLatchOrSocket(MyLatch,
855-
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
856-
WL_LATCH_SET,
857-
PQsocket(streamConn),
858-
0,
859-
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
860-
861-
/* Interrupted? */
862-
if (rc & WL_LATCH_SET)
863-
{
864-
ResetLatch(MyLatch);
865-
ProcessWalRcvInterrupts();
866-
}
867-
868-
/* Consume whatever data is available from the socket */
869-
if (PQconsumeInput(streamConn) == 0)
870-
{
871-
/* trouble; return NULL */
872-
return NULL;
873-
}
874-
}
875-
876-
/* Now we can collect and return the next PGresult */
877-
return PQgetResult(streamConn);
878-
}
879-
880749
/*
881750
* Disconnect connection to primary, if any.
882751
*/
883752
static void
884753
libpqrcv_disconnect(WalReceiverConn *conn)
885754
{
886-
PQfinish(conn->streamConn);
755+
libpqsrv_disconnect(conn->streamConn);
887756
PQfreemem(conn->recvBuf);
888757
pfree(conn);
889758
}
@@ -937,13 +806,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
937806
{
938807
PGresult *res;
939808

940-
res = libpqrcv_PQgetResult(conn->streamConn);
809+
res = libpqsrv_get_result(conn->streamConn,
810+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
941811
if (PQresultStatus(res) == PGRES_COMMAND_OK)
942812
{
943813
PQclear(res);
944814

945815
/* Verify that there are no more results. */
946-
res = libpqrcv_PQgetResult(conn->streamConn);
816+
res = libpqsrv_get_result(conn->streamConn,
817+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
947818
if (res != NULL)
948819
{
949820
PQclear(res);
@@ -1094,7 +965,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
1094965
appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
1095966
}
1096967

1097-
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
968+
res = libpqsrv_exec(conn->streamConn,
969+
cmd.data,
970+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1098971
pfree(cmd.data);
1099972

1100973
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1147,7 +1020,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
11471020

11481021
appendStringInfoString(&cmd, " );");
11491022

1150-
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1023+
res = libpqsrv_exec(conn->streamConn, cmd.data,
1024+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
11511025
pfree(cmd.data);
11521026

11531027
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1214,7 +1088,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
12141088
{
12151089
char *cstrs[MaxTupleAttributeNumber];
12161090

1217-
ProcessWalRcvInterrupts();
1091+
CHECK_FOR_INTERRUPTS();
12181092

12191093
/* Do the allocations in temporary context. */
12201094
oldcontext = MemoryContextSwitchTo(rowcontext);
@@ -1260,7 +1134,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
12601134
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
12611135
errmsg("the query interface requires a database connection")));
12621136

1263-
pgres = libpqrcv_PQexec(conn->streamConn, query);
1137+
pgres = libpqsrv_exec(conn->streamConn,
1138+
query,
1139+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
12641140

12651141
switch (PQresultStatus(pgres))
12661142
{

0 commit comments

Comments
 (0)