@@ -95,6 +95,13 @@ static uint32 pgfdw_we_get_result = 0;
95
95
*/
96
96
#define CONNECTION_CLEANUP_TIMEOUT 30000
97
97
98
+ /*
99
+ * Milliseconds to wait before issuing another cancel request. This covers
100
+ * the race condition where the remote session ignored our cancel request
101
+ * because it arrived while idle.
102
+ */
103
+ #define RETRY_CANCEL_TIMEOUT 1000
104
+
98
105
/* Macro for constructing abort command to be sent */
99
106
#define CONSTRUCT_ABORT_COMMAND (sql , entry , toplevel ) \
100
107
do { \
@@ -135,6 +142,7 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
135
142
static bool pgfdw_cancel_query (PGconn * conn );
136
143
static bool pgfdw_cancel_query_begin (PGconn * conn , TimestampTz endtime );
137
144
static bool pgfdw_cancel_query_end (PGconn * conn , TimestampTz endtime ,
145
+ TimestampTz retrycanceltime ,
138
146
bool consume_input );
139
147
static bool pgfdw_exec_cleanup_query (PGconn * conn , const char * query ,
140
148
bool ignore_errors );
@@ -144,6 +152,7 @@ static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
144
152
bool consume_input ,
145
153
bool ignore_errors );
146
154
static bool pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime ,
155
+ TimestampTz retrycanceltime ,
147
156
PGresult * * result , bool * timed_out );
148
157
static void pgfdw_abort_cleanup (ConnCacheEntry * entry , bool toplevel );
149
158
static bool pgfdw_abort_cleanup_begin (ConnCacheEntry * entry , bool toplevel ,
@@ -1308,18 +1317,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1308
1317
static bool
1309
1318
pgfdw_cancel_query (PGconn * conn )
1310
1319
{
1320
+ TimestampTz now = GetCurrentTimestamp ();
1311
1321
TimestampTz endtime ;
1322
+ TimestampTz retrycanceltime ;
1312
1323
1313
1324
/*
1314
1325
* If it takes too long to cancel the query and discard the result, assume
1315
1326
* the connection is dead.
1316
1327
*/
1317
- endtime = TimestampTzPlusMilliseconds (GetCurrentTimestamp (),
1318
- CONNECTION_CLEANUP_TIMEOUT );
1328
+ endtime = TimestampTzPlusMilliseconds (now , CONNECTION_CLEANUP_TIMEOUT );
1329
+
1330
+ /*
1331
+ * Also, lose patience and re-issue the cancel request after a little bit.
1332
+ * (This serves to close some race conditions.)
1333
+ */
1334
+ retrycanceltime = TimestampTzPlusMilliseconds (now , RETRY_CANCEL_TIMEOUT );
1319
1335
1320
1336
if (!pgfdw_cancel_query_begin (conn , endtime ))
1321
1337
return false;
1322
- return pgfdw_cancel_query_end (conn , endtime , false);
1338
+ return pgfdw_cancel_query_end (conn , endtime , retrycanceltime , false);
1323
1339
}
1324
1340
1325
1341
/*
@@ -1345,9 +1361,10 @@ pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1345
1361
}
1346
1362
1347
1363
static bool
1348
- pgfdw_cancel_query_end (PGconn * conn , TimestampTz endtime , bool consume_input )
1364
+ pgfdw_cancel_query_end (PGconn * conn , TimestampTz endtime ,
1365
+ TimestampTz retrycanceltime , bool consume_input )
1349
1366
{
1350
- PGresult * result = NULL ;
1367
+ PGresult * result ;
1351
1368
bool timed_out ;
1352
1369
1353
1370
/*
@@ -1366,7 +1383,8 @@ pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1366
1383
}
1367
1384
1368
1385
/* Get and discard the result of the query. */
1369
- if (pgfdw_get_cleanup_result (conn , endtime , & result , & timed_out ))
1386
+ if (pgfdw_get_cleanup_result (conn , endtime , retrycanceltime ,
1387
+ & result , & timed_out ))
1370
1388
{
1371
1389
if (timed_out )
1372
1390
ereport (WARNING ,
@@ -1439,7 +1457,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1439
1457
TimestampTz endtime , bool consume_input ,
1440
1458
bool ignore_errors )
1441
1459
{
1442
- PGresult * result = NULL ;
1460
+ PGresult * result ;
1443
1461
bool timed_out ;
1444
1462
1445
1463
Assert (query != NULL );
@@ -1457,7 +1475,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1457
1475
}
1458
1476
1459
1477
/* Get the result of the query. */
1460
- if (pgfdw_get_cleanup_result (conn , endtime , & result , & timed_out ))
1478
+ if (pgfdw_get_cleanup_result (conn , endtime , endtime , & result , & timed_out ))
1461
1479
{
1462
1480
if (timed_out )
1463
1481
ereport (WARNING ,
@@ -1481,28 +1499,36 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1481
1499
}
1482
1500
1483
1501
/*
1484
- * Get, during abort cleanup, the result of a query that is in progress. This
1485
- * might be a query that is being interrupted by transaction abort, or it might
1486
- * be a query that was initiated as part of transaction abort to get the remote
1487
- * side back to the appropriate state.
1502
+ * Get, during abort cleanup, the result of a query that is in progress.
1503
+ * This might be a query that is being interrupted by a cancel request or by
1504
+ * transaction abort, or it might be a query that was initiated as part of
1505
+ * transaction abort to get the remote side back to the appropriate state.
1506
+ *
1507
+ * endtime is the time at which we should give up and assume the remote side
1508
+ * is dead. retrycanceltime is the time at which we should issue a fresh
1509
+ * cancel request (pass the same value as endtime if this is not wanted).
1488
1510
*
1489
- * endtime is the time at which we should give up and assume the remote
1490
- * side is dead. Returns true if the timeout expired or connection trouble
1491
- * occurred, false otherwise. Sets *result except in case of a timeout.
1492
- * Sets timed_out to true only when the timeout expired.
1511
+ * Returns true if the timeout expired or connection trouble occurred,
1512
+ * false otherwise. Sets *result except in case of a true result.
1513
+ * Sets *timed_out to true only when the timeout expired.
1493
1514
*/
1494
1515
static bool
1495
- pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime , PGresult * * result ,
1516
+ pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime ,
1517
+ TimestampTz retrycanceltime ,
1518
+ PGresult * * result ,
1496
1519
bool * timed_out )
1497
1520
{
1498
1521
volatile bool failed = false;
1499
1522
PGresult * volatile last_res = NULL ;
1500
1523
1524
+ * result = NULL ;
1501
1525
* timed_out = false;
1502
1526
1503
1527
/* In what follows, do not leak any PGresults on an error. */
1504
1528
PG_TRY ();
1505
1529
{
1530
+ int canceldelta = RETRY_CANCEL_TIMEOUT * 2 ;
1531
+
1506
1532
for (;;)
1507
1533
{
1508
1534
PGresult * res ;
@@ -1513,8 +1539,33 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
1513
1539
TimestampTz now = GetCurrentTimestamp ();
1514
1540
long cur_timeout ;
1515
1541
1542
+ /* If timeout has expired, give up. */
1543
+ if (now >= endtime )
1544
+ {
1545
+ * timed_out = true;
1546
+ failed = true;
1547
+ goto exit ;
1548
+ }
1549
+
1550
+ /* If we need to re-issue the cancel request, do that. */
1551
+ if (now >= retrycanceltime )
1552
+ {
1553
+ /* We ignore failure to issue the repeated request. */
1554
+ (void ) libpqsrv_cancel (conn , endtime );
1555
+
1556
+ /* Recompute "now" in case that took measurable time. */
1557
+ now = GetCurrentTimestamp ();
1558
+
1559
+ /* Adjust re-cancel timeout in increasing steps. */
1560
+ retrycanceltime = TimestampTzPlusMilliseconds (now ,
1561
+ canceldelta );
1562
+ canceldelta += canceldelta ;
1563
+ }
1564
+
1516
1565
/* If timeout has expired, give up, else get sleep time. */
1517
- cur_timeout = TimestampDifferenceMilliseconds (now , endtime );
1566
+ cur_timeout = TimestampDifferenceMilliseconds (now ,
1567
+ Min (endtime ,
1568
+ retrycanceltime ));
1518
1569
if (cur_timeout <= 0 )
1519
1570
{
1520
1571
* timed_out = true;
@@ -1835,7 +1886,9 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1835
1886
foreach (lc , cancel_requested )
1836
1887
{
1837
1888
ConnCacheEntry * entry = (ConnCacheEntry * ) lfirst (lc );
1889
+ TimestampTz now = GetCurrentTimestamp ();
1838
1890
TimestampTz endtime ;
1891
+ TimestampTz retrycanceltime ;
1839
1892
char sql [100 ];
1840
1893
1841
1894
Assert (entry -> changing_xact_state );
@@ -1849,10 +1902,13 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1849
1902
* remaining entries in the list, leading to slamming that entry's
1850
1903
* connection shut.
1851
1904
*/
1852
- endtime = TimestampTzPlusMilliseconds (GetCurrentTimestamp () ,
1905
+ endtime = TimestampTzPlusMilliseconds (now ,
1853
1906
CONNECTION_CLEANUP_TIMEOUT );
1907
+ retrycanceltime = TimestampTzPlusMilliseconds (now ,
1908
+ RETRY_CANCEL_TIMEOUT );
1854
1909
1855
- if (!pgfdw_cancel_query_end (entry -> conn , endtime , true))
1910
+ if (!pgfdw_cancel_query_end (entry -> conn , endtime ,
1911
+ retrycanceltime , true))
1856
1912
{
1857
1913
/* Unable to cancel running query */
1858
1914
pgfdw_reset_xact_state (entry , toplevel );
0 commit comments