diff options
author | Noah Misch | 2024-01-08 19:39:56 +0000 |
---|---|---|
committer | Noah Misch | 2024-01-08 19:39:56 +0000 |
commit | d3c5f37dd543498cc7c678815d3921823beec9e9 (patch) | |
tree | 9c430f23ec95036ded0b8bb16446b30528159c9e /contrib/postgres_fdw | |
parent | 0efc8318477714600567d15812dc8d15841e269e (diff) |
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and
allowing DROP DATABASE (of a database not involved in the query). Apart
from explicit dblink_cancel_query() calls, dblink still doesn't cancel
the remote side. The replacement for the blocking calls consists of
new, general-purpose query execution wrappers in the libpqsrv facility.
Out-of-tree extensions should adopt these. Use them in postgres_fdw,
replacing a local implementation from which the libpqsrv implementation
derives. This is a bug fix for dblink. Code inspection identified the
bug at least thirteen years ago, but user complaints have not appeared.
Hence, no back-patch for now.
Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
Diffstat (limited to 'contrib/postgres_fdw')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 88 | ||||
-rw-r--r-- | contrib/postgres_fdw/deparse.c | 2 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 10 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.h | 2 |
4 files changed, 24 insertions, 78 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index fc69e189d5b..4931ebf5915 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { HASHCTL ctl; + if (pgfdw_we_get_result == 0) + pgfdw_we_get_result = + WaitEventExtensionNew("PostgresFdwGetResult"); + ctl.keysize = sizeof(ConnCacheKey); ctl.entrysize = sizeof(ConnCacheEntry); ConnectionHash = hash_create("postgres_fdw connections", 8, @@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) */ if (consume_input && !PQconsumeInput(conn)) pgfdw_report_error(ERROR, NULL, conn, false, sql); - res = pgfdw_get_result(conn, sql); + res = pgfdw_get_result(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn) /* * Submit a query and wait for the result. * - * This function is interruptible by signals. + * Since we don't use non-blocking mode, this can't process interrupts while + * pushing the query text to the server. That risk is relatively small, so we + * ignore that for now. * * Caller is responsible for the error handling on the result. */ @@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) if (state && state->pendingAreq) process_pending_request(state->pendingAreq); - /* - * Submit a query. Since we don't use non-blocking mode, this also can - * block. But its risk is relatively small, so we ignore that for now. - */ if (!PQsendQuery(conn, query)) - pgfdw_report_error(ERROR, NULL, conn, false, query); - - /* Wait for the result. */ - return pgfdw_get_result(conn, query); + return NULL; + return pgfdw_get_result(conn); } /* - * Wait for the result from a prior asynchronous execution function call. - * - * This function offers quick responsiveness by checking for any interruptions. - * - * This function emulates PQexec()'s behavior of returning the last result - * when there are many. + * Wrap libpqsrv_get_result_last(), adding wait event. * * Caller is responsible for the error handling on the result. */ PGresult * -pgfdw_get_result(PGconn *conn, const char *query) +pgfdw_get_result(PGconn *conn) { - PGresult *volatile last_res = NULL; - - /* In what follows, do not leak any PGresults on an error. */ - PG_TRY(); - { - for (;;) - { - PGresult *res; - - while (PQisBusy(conn)) - { - int wc; - - /* first time, allocate or get the custom wait event */ - if (pgfdw_we_get_result == 0) - pgfdw_we_get_result = WaitEventExtensionNew("PostgresFdwGetResult"); - - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - -1L, pgfdw_we_get_result); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (wc & WL_SOCKET_READABLE) - { - if (!PQconsumeInput(conn)) - pgfdw_report_error(ERROR, NULL, conn, false, query); - } - } - - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ - - PQclear(last_res); - last_res = res; - } - } - PG_CATCH(); - { - PQclear(last_res); - PG_RE_THROW(); - } - PG_END_TRY(); - - return last_res; + return libpqsrv_get_result_last(conn, pgfdw_we_get_result); } /* @@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, /* * If we don't get a message from the PGresult, try the PGconn. This - * is needed because for connection-level failures, PQexec may just - * return NULL, not a PGresult at all. + * is needed because for connection-level failures, PQgetResult may + * just return NULL, not a PGresult at all. */ if (message_primary == NULL) message_primary = pchomp(PQerrorMessage(conn)); @@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn, "DEALLOCATE ALL"); + res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL", + NULL); PQclear(res); } entry->have_prep_stmt = false; diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 8fc0b2c9c4b..8fc66fa11c7 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -3815,7 +3815,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first, * Print the representation of a parameter to be sent to the remote side. * * Note: we always label the Param's type explicitly rather than relying on - * transmitting a numeric type OID in PQexecParams(). This allows us to + * transmitting a numeric type OID in PQsendQueryParams(). This allows us to * avoid assuming that types have the same OIDs on the remote side as they * do locally --- they need only have the same names. */ diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index dc5c585890f..142dcfc9957 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -3760,7 +3760,7 @@ create_cursor(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(conn, buf.data); + res = pgfdw_get_result(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, fsstate->query); PQclear(res); @@ -3810,7 +3810,7 @@ fetch_more_data(ForeignScanState *node) * The query was already sent by an earlier call to * fetch_more_data_begin. So now we just fetch the result. */ - res = pgfdw_get_result(conn, fsstate->query); + res = pgfdw_get_result(conn); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); @@ -4159,7 +4159,7 @@ execute_foreign_modify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->conn); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -4229,7 +4229,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); PQclear(res); @@ -4571,7 +4571,7 @@ execute_dml_stmt(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + dmstate->result = pgfdw_get_result(dmstate->conn); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 32d9356bdb2..37c1575af6c 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -162,7 +162,7 @@ extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern void do_sql_command(PGconn *conn, const char *sql); -extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); +extern PGresult *pgfdw_get_result(PGconn *conn); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, |