Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 708d165

Browse files
committed
postgres_fdw: Add function to list cached connections to foreign servers.
This commit adds function postgres_fdw_get_connections() to return the foreign server names of all the open connections that postgres_fdw established from the local session to the foreign servers. This function also returns whether each connection is valid or not. This function is useful when checking all the open foreign server connections. If we found some connection to drop, from the result of function, probably we can explicitly close them by the function that upcoming commit will add. This commit bumps the version of postgres_fdw to 1.1 since it adds new function. Author: Bharath Rupireddy, tweaked by Fujii Masao Reviewed-by: Zhijie Hou, Alexey Kondratov, Zhihong Yu, Fujii Masao Discussion: https://postgr.es/m/2d5cb0b3-a6e8-9bbb-953f-879f47128faa@oss.nttdata.com
1 parent a3dc926 commit 708d165

File tree

7 files changed

+266
-3
lines changed

7 files changed

+266
-3
lines changed

contrib/postgres_fdw/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
1414
SHLIB_LINK_INTERNAL = $(libpq)
1515

1616
EXTENSION = postgres_fdw
17-
DATA = postgres_fdw--1.0.sql
17+
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
1818

1919
REGRESS = postgres_fdw
2020

contrib/postgres_fdw/connection.c

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
#include "access/xact.h"
1717
#include "catalog/pg_user_mapping.h"
1818
#include "commands/defrem.h"
19+
#include "funcapi.h"
1920
#include "mb/pg_wchar.h"
2021
#include "miscadmin.h"
2122
#include "pgstat.h"
2223
#include "postgres_fdw.h"
2324
#include "storage/fd.h"
2425
#include "storage/latch.h"
26+
#include "utils/builtins.h"
2527
#include "utils/datetime.h"
2628
#include "utils/hsearch.h"
2729
#include "utils/inval.h"
@@ -74,6 +76,11 @@ static unsigned int prep_stmt_number = 0;
7476
/* tracks whether any work is needed in callback functions */
7577
static bool xact_got_connection = false;
7678

79+
/*
80+
* SQL functions
81+
*/
82+
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
83+
7784
/* prototypes of private functions */
7885
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
7986
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
@@ -1335,3 +1342,131 @@ exit: ;
13351342
*result = last_res;
13361343
return timed_out;
13371344
}
1345+
1346+
/*
1347+
* List active foreign server connections.
1348+
*
1349+
* This function takes no input parameter and returns setof record made of
1350+
* following values:
1351+
* - server_name - server name of active connection. In case the foreign server
1352+
* is dropped but still the connection is active, then the server name will
1353+
* be NULL in output.
1354+
* - valid - true/false representing whether the connection is valid or not.
1355+
* Note that the connections can get invalidated in pgfdw_inval_callback.
1356+
*
1357+
* No records are returned when there are no cached connections at all.
1358+
*/
1359+
Datum
1360+
postgres_fdw_get_connections(PG_FUNCTION_ARGS)
1361+
{
1362+
#define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1363+
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1364+
TupleDesc tupdesc;
1365+
Tuplestorestate *tupstore;
1366+
MemoryContext per_query_ctx;
1367+
MemoryContext oldcontext;
1368+
HASH_SEQ_STATUS scan;
1369+
ConnCacheEntry *entry;
1370+
1371+
/* check to see if caller supports us returning a tuplestore */
1372+
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1373+
ereport(ERROR,
1374+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1375+
errmsg("set-valued function called in context that cannot accept a set")));
1376+
if (!(rsinfo->allowedModes & SFRM_Materialize))
1377+
ereport(ERROR,
1378+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1379+
errmsg("materialize mode required, but it is not allowed in this context")));
1380+
1381+
/* Build a tuple descriptor for our result type */
1382+
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1383+
elog(ERROR, "return type must be a row type");
1384+
1385+
/* Build tuplestore to hold the result rows */
1386+
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1387+
oldcontext = MemoryContextSwitchTo(per_query_ctx);
1388+
1389+
tupstore = tuplestore_begin_heap(true, false, work_mem);
1390+
rsinfo->returnMode = SFRM_Materialize;
1391+
rsinfo->setResult = tupstore;
1392+
rsinfo->setDesc = tupdesc;
1393+
1394+
MemoryContextSwitchTo(oldcontext);
1395+
1396+
/* If cache doesn't exist, we return no records */
1397+
if (!ConnectionHash)
1398+
{
1399+
/* clean up and return the tuplestore */
1400+
tuplestore_donestoring(tupstore);
1401+
1402+
PG_RETURN_VOID();
1403+
}
1404+
1405+
hash_seq_init(&scan, ConnectionHash);
1406+
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1407+
{
1408+
ForeignServer *server;
1409+
Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1410+
bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1411+
1412+
/* We only look for open remote connections */
1413+
if (!entry->conn)
1414+
continue;
1415+
1416+
server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1417+
1418+
MemSet(values, 0, sizeof(values));
1419+
MemSet(nulls, 0, sizeof(nulls));
1420+
1421+
/*
1422+
* The foreign server may have been dropped in current explicit
1423+
* transaction. It is not possible to drop the server from another
1424+
* session when the connection associated with it is in use in the
1425+
* current transaction, if tried so, the drop query in another session
1426+
* blocks until the current transaction finishes.
1427+
*
1428+
* Even though the server is dropped in the current transaction, the
1429+
* cache can still have associated active connection entry, say we
1430+
* call such connections dangling. Since we can not fetch the server
1431+
* name from system catalogs for dangling connections, instead we
1432+
* show NULL value for server name in output.
1433+
*
1434+
* We could have done better by storing the server name in the cache
1435+
* entry instead of server oid so that it could be used in the output.
1436+
* But the server name in each cache entry requires 64 bytes of
1437+
* memory, which is huge, when there are many cached connections and
1438+
* the use case i.e. dropping the foreign server within the explicit
1439+
* current transaction seems rare. So, we chose to show NULL value for
1440+
* server name in output.
1441+
*
1442+
* Such dangling connections get closed either in next use or at the
1443+
* end of current explicit transaction in pgfdw_xact_callback.
1444+
*/
1445+
if (!server)
1446+
{
1447+
/*
1448+
* If the server has been dropped in the current explicit
1449+
* transaction, then this entry would have been invalidated in
1450+
* pgfdw_inval_callback at the end of drop sever command. Note
1451+
* that this connection would not have been closed in
1452+
* pgfdw_inval_callback because it is still being used in the
1453+
* current explicit transaction. So, assert that here.
1454+
*/
1455+
Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1456+
1457+
/* Show null, if no server name was found */
1458+
nulls[0] = true;
1459+
}
1460+
else
1461+
values[0] = CStringGetTextDatum(server->servername);
1462+
1463+
values[1] = BoolGetDatum(!entry->invalidated);
1464+
1465+
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1466+
}
1467+
1468+
/* clean up and return the tuplestore */
1469+
tuplestore_donestoring(tupstore);
1470+
1471+
PG_RETURN_VOID();
1472+
}

contrib/postgres_fdw/expected/postgres_fdw.out

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,18 @@ DO $d$
1313
OPTIONS (dbname '$$||current_database()||$$',
1414
port '$$||current_setting('port')||$$'
1515
)$$;
16+
EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
17+
OPTIONS (dbname '$$||current_database()||$$',
18+
port '$$||current_setting('port')||$$'
19+
)$$;
20+
1621
END;
1722
$d$;
1823
CREATE USER MAPPING FOR public SERVER testserver1
1924
OPTIONS (user 'value', password 'value');
2025
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
2126
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
27+
CREATE USER MAPPING FOR public SERVER loopback3;
2228
-- ===================================================================
2329
-- create objects used through FDW loopback server
2430
-- ===================================================================
@@ -129,6 +135,11 @@ CREATE FOREIGN TABLE ft6 (
129135
c2 int NOT NULL,
130136
c3 text
131137
) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
138+
CREATE FOREIGN TABLE ft7 (
139+
c1 int NOT NULL,
140+
c2 int NOT NULL,
141+
c3 text
142+
) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
132143
-- ===================================================================
133144
-- tests for validator
134145
-- ===================================================================
@@ -199,7 +210,8 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
199210
public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') |
200211
public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') |
201212
public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') |
202-
(5 rows)
213+
public | ft7 | loopback3 | (schema_name 'S 1', table_name 'T 4') |
214+
(6 rows)
203215

204216
-- Test that alteration of server options causes reconnection
205217
-- Remote's errors might be non-English, so hide them to ensure stable results
@@ -9040,16 +9052,63 @@ DROP PROCEDURE terminate_backend_and_wait(text);
90409052
-- ===================================================================
90419053
-- This test case is for closing the connection in pgfdw_xact_callback
90429054
BEGIN;
9055+
-- List all the existing cached connections. Only loopback2 should be output.
9056+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
9057+
server_name | valid
9058+
-------------+-------
9059+
loopback2 | t
9060+
(1 row)
9061+
90439062
-- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
90449063
SELECT 1 FROM ft1 LIMIT 1;
90459064
?column?
90469065
----------
90479066
1
90489067
(1 row)
90499068

9069+
SELECT 1 FROM ft7 LIMIT 1;
9070+
?column?
9071+
----------
9072+
1
9073+
(1 row)
9074+
9075+
-- List all the existing cached connections. loopback and loopback3
9076+
-- also should be output as valid connections.
9077+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
9078+
server_name | valid
9079+
-------------+-------
9080+
loopback | t
9081+
loopback2 | t
9082+
loopback3 | t
9083+
(3 rows)
9084+
90509085
-- Connection is not closed at the end of the alter statement in
90519086
-- pgfdw_inval_callback. That's because the connection is in midst of this
90529087
-- xact, it is just marked as invalid.
90539088
ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
9089+
DROP SERVER loopback3 CASCADE;
9090+
NOTICE: drop cascades to 2 other objects
9091+
DETAIL: drop cascades to user mapping for public on server loopback3
9092+
drop cascades to foreign table ft7
9093+
-- List all the existing cached connections. loopback and loopback3
9094+
-- should be output as invalid connections. Also the server name for
9095+
-- loopback3 should be NULL because the server was dropped.
9096+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
9097+
server_name | valid
9098+
-------------+-------
9099+
loopback | f
9100+
loopback2 | t
9101+
| f
9102+
(3 rows)
9103+
90549104
-- The invalid connection gets closed in pgfdw_xact_callback during commit.
90559105
COMMIT;
9106+
-- List all the existing cached connections. loopback and loopback3
9107+
-- should not be output because they should be closed at the end of
9108+
-- the above transaction.
9109+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
9110+
server_name | valid
9111+
-------------+-------
9112+
loopback2 | t
9113+
(1 row)
9114+
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/* contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql */
2+
3+
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
4+
\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.1'" to load this file. \quit
5+
6+
CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
7+
OUT valid boolean)
8+
RETURNS SETOF record
9+
AS 'MODULE_PATHNAME'
10+
LANGUAGE C STRICT PARALLEL RESTRICTED;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# postgres_fdw extension
22
comment = 'foreign-data wrapper for remote PostgreSQL servers'
3-
default_version = '1.0'
3+
default_version = '1.1'
44
module_pathname = '$libdir/postgres_fdw'
55
relocatable = true

contrib/postgres_fdw/sql/postgres_fdw.sql

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@ DO $d$
1515
OPTIONS (dbname '$$||current_database()||$$',
1616
port '$$||current_setting('port')||$$'
1717
)$$;
18+
EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
19+
OPTIONS (dbname '$$||current_database()||$$',
20+
port '$$||current_setting('port')||$$'
21+
)$$;
22+
1823
END;
1924
$d$;
2025

2126
CREATE USER MAPPING FOR public SERVER testserver1
2227
OPTIONS (user 'value', password 'value');
2328
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
2429
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
30+
CREATE USER MAPPING FOR public SERVER loopback3;
2531

2632
-- ===================================================================
2733
-- create objects used through FDW loopback server
@@ -142,6 +148,12 @@ CREATE FOREIGN TABLE ft6 (
142148
c3 text
143149
) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
144150

151+
CREATE FOREIGN TABLE ft7 (
152+
c1 int NOT NULL,
153+
c2 int NOT NULL,
154+
c3 text
155+
) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
156+
145157
-- ===================================================================
146158
-- tests for validator
147159
-- ===================================================================
@@ -2703,11 +2715,26 @@ DROP PROCEDURE terminate_backend_and_wait(text);
27032715
-- ===================================================================
27042716
-- This test case is for closing the connection in pgfdw_xact_callback
27052717
BEGIN;
2718+
-- List all the existing cached connections. Only loopback2 should be output.
2719+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
27062720
-- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
27072721
SELECT 1 FROM ft1 LIMIT 1;
2722+
SELECT 1 FROM ft7 LIMIT 1;
2723+
-- List all the existing cached connections. loopback and loopback3
2724+
-- also should be output as valid connections.
2725+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
27082726
-- Connection is not closed at the end of the alter statement in
27092727
-- pgfdw_inval_callback. That's because the connection is in midst of this
27102728
-- xact, it is just marked as invalid.
27112729
ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
2730+
DROP SERVER loopback3 CASCADE;
2731+
-- List all the existing cached connections. loopback and loopback3
2732+
-- should be output as invalid connections. Also the server name for
2733+
-- loopback3 should be NULL because the server was dropped.
2734+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
27122735
-- The invalid connection gets closed in pgfdw_xact_callback during commit.
27132736
COMMIT;
2737+
-- List all the existing cached connections. loopback and loopback3
2738+
-- should not be output because they should be closed at the end of
2739+
-- the above transaction.
2740+
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;

doc/src/sgml/postgres-fdw.sgml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,38 @@ OPTIONS (ADD password_required 'false');
479479
</sect3>
480480
</sect2>
481481

482+
<sect2>
483+
<title>Functions</title>
484+
485+
<variablelist>
486+
<varlistentry>
487+
<term><function>postgres_fdw_get_connections(OUT server_name text, OUT valid boolean) returns setof record</function></term>
488+
<listitem>
489+
<para>
490+
This function returns the foreign server names of all the open
491+
connections that <filename>postgres_fdw</filename> established from
492+
the local session to the foreign servers. It also returns whether
493+
each connection is valid or not. <literal>false</literal> is returned
494+
if the foreign server connection is used in the current local
495+
transaction but its foreign server or user mapping is changed or
496+
dropped, and then such invalid connection will be closed at
497+
the end of that transaction. <literal>true</literal> is returned
498+
otherwise. If there are no open connections, no record is returned.
499+
Example usage of the function:
500+
<screen>
501+
postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
502+
server_name | valid
503+
-------------+-------
504+
loopback1 | t
505+
loopback2 | f
506+
</screen>
507+
</para>
508+
</listitem>
509+
</varlistentry>
510+
</variablelist>
511+
512+
</sect2>
513+
482514
<sect2>
483515
<title>Connection Management</title>
484516

0 commit comments

Comments
 (0)