@@ -105,6 +105,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
105
105
bool ignore_errors );
106
106
static bool pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime ,
107
107
PGresult * * result );
108
+ static void pgfdw_abort_cleanup (ConnCacheEntry * entry , const char * sql ,
109
+ bool toplevel );
108
110
static bool UserMappingPasswordRequired (UserMapping * user );
109
111
static bool disconnect_cached_connections (Oid serverid );
110
112
@@ -872,8 +874,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
872
874
/* If it has an open remote transaction, try to close it */
873
875
if (entry -> xact_depth > 0 )
874
876
{
875
- bool abort_cleanup_failure = false;
876
-
877
877
elog (DEBUG3 , "closing remote transaction on connection %p" ,
878
878
entry -> conn );
879
879
@@ -940,67 +940,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
940
940
case XACT_EVENT_PARALLEL_ABORT :
941
941
case XACT_EVENT_ABORT :
942
942
943
- /*
944
- * Don't try to clean up the connection if we're already
945
- * in error recursion trouble.
946
- */
947
- if (in_error_recursion_trouble ())
948
- entry -> changing_xact_state = true;
949
-
950
- /*
951
- * If connection is already unsalvageable, don't touch it
952
- * further.
953
- */
954
- if (entry -> changing_xact_state )
955
- break ;
956
-
957
- /*
958
- * Mark this connection as in the process of changing
959
- * transaction state.
960
- */
961
- entry -> changing_xact_state = true;
962
-
963
- /* Assume we might have lost track of prepared statements */
964
- entry -> have_error = true;
965
-
966
- /*
967
- * If a command has been submitted to the remote server by
968
- * using an asynchronous execution function, the command
969
- * might not have yet completed. Check to see if a
970
- * command is still being processed by the remote server,
971
- * and if so, request cancellation of the command.
972
- */
973
- if (PQtransactionStatus (entry -> conn ) == PQTRANS_ACTIVE &&
974
- !pgfdw_cancel_query (entry -> conn ))
975
- {
976
- /* Unable to cancel running query. */
977
- abort_cleanup_failure = true;
978
- }
979
- else if (!pgfdw_exec_cleanup_query (entry -> conn ,
980
- "ABORT TRANSACTION" ,
981
- false))
982
- {
983
- /* Unable to abort remote transaction. */
984
- abort_cleanup_failure = true;
985
- }
986
- else if (entry -> have_prep_stmt && entry -> have_error &&
987
- !pgfdw_exec_cleanup_query (entry -> conn ,
988
- "DEALLOCATE ALL" ,
989
- true))
990
- {
991
- /* Trouble clearing prepared statements. */
992
- abort_cleanup_failure = true;
993
- }
994
- else
995
- {
996
- entry -> have_prep_stmt = false;
997
- entry -> have_error = false;
998
- /* Also reset per-connection state */
999
- memset (& entry -> state , 0 , sizeof (entry -> state ));
1000
- }
1001
-
1002
- /* Disarm changing_xact_state if it all worked. */
1003
- entry -> changing_xact_state = abort_cleanup_failure ;
943
+ pgfdw_abort_cleanup (entry , "ABORT TRANSACTION" , true);
1004
944
break ;
1005
945
}
1006
946
}
@@ -1091,46 +1031,13 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1091
1031
do_sql_command (entry -> conn , sql );
1092
1032
entry -> changing_xact_state = false;
1093
1033
}
1094
- else if (in_error_recursion_trouble ())
1095
- {
1096
- /*
1097
- * Don't try to clean up the connection if we're already in error
1098
- * recursion trouble.
1099
- */
1100
- entry -> changing_xact_state = true;
1101
- }
1102
- else if (!entry -> changing_xact_state )
1034
+ else
1103
1035
{
1104
- bool abort_cleanup_failure = false;
1105
-
1106
- /* Remember that abort cleanup is in progress. */
1107
- entry -> changing_xact_state = true;
1108
-
1109
- /* Assume we might have lost track of prepared statements */
1110
- entry -> have_error = true;
1111
-
1112
- /*
1113
- * If a command has been submitted to the remote server by using
1114
- * an asynchronous execution function, the command might not have
1115
- * yet completed. Check to see if a command is still being
1116
- * processed by the remote server, and if so, request cancellation
1117
- * of the command.
1118
- */
1119
- if (PQtransactionStatus (entry -> conn ) == PQTRANS_ACTIVE &&
1120
- !pgfdw_cancel_query (entry -> conn ))
1121
- abort_cleanup_failure = true;
1122
- else
1123
- {
1124
- /* Rollback all remote subtransactions during abort */
1125
- snprintf (sql , sizeof (sql ),
1126
- "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d" ,
1127
- curlevel , curlevel );
1128
- if (!pgfdw_exec_cleanup_query (entry -> conn , sql , false))
1129
- abort_cleanup_failure = true;
1130
- }
1131
-
1132
- /* Disarm changing_xact_state if it all worked. */
1133
- entry -> changing_xact_state = abort_cleanup_failure ;
1036
+ /* Rollback all remote subtransactions during abort */
1037
+ snprintf (sql , sizeof (sql ),
1038
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d" ,
1039
+ curlevel , curlevel );
1040
+ pgfdw_abort_cleanup (entry , sql , false);
1134
1041
}
1135
1042
1136
1043
/* OK, we're outta that level of subtransaction */
@@ -1409,6 +1316,72 @@ exit: ;
1409
1316
return timed_out ;
1410
1317
}
1411
1318
1319
+ /*
1320
+ * Abort remote transaction.
1321
+ *
1322
+ * The statement specified in "sql" is sent to the remote server,
1323
+ * in order to rollback the remote transaction.
1324
+ *
1325
+ * "toplevel" should be set to true if toplevel (main) transaction is
1326
+ * rollbacked, false otherwise.
1327
+ *
1328
+ * Set entry->changing_xact_state to false on success, true on failure.
1329
+ */
1330
+ static void
1331
+ pgfdw_abort_cleanup (ConnCacheEntry * entry , const char * sql , bool toplevel )
1332
+ {
1333
+ /*
1334
+ * Don't try to clean up the connection if we're already in error
1335
+ * recursion trouble.
1336
+ */
1337
+ if (in_error_recursion_trouble ())
1338
+ entry -> changing_xact_state = true;
1339
+
1340
+ /*
1341
+ * If connection is already unsalvageable, don't touch it further.
1342
+ */
1343
+ if (entry -> changing_xact_state )
1344
+ return ;
1345
+
1346
+ /*
1347
+ * Mark this connection as in the process of changing transaction state.
1348
+ */
1349
+ entry -> changing_xact_state = true;
1350
+
1351
+ /* Assume we might have lost track of prepared statements */
1352
+ entry -> have_error = true;
1353
+
1354
+ /*
1355
+ * If a command has been submitted to the remote server by using an
1356
+ * asynchronous execution function, the command might not have yet
1357
+ * completed. Check to see if a command is still being processed by the
1358
+ * remote server, and if so, request cancellation of the command.
1359
+ */
1360
+ if (PQtransactionStatus (entry -> conn ) == PQTRANS_ACTIVE &&
1361
+ !pgfdw_cancel_query (entry -> conn ))
1362
+ return ; /* Unable to cancel running query */
1363
+
1364
+ if (!pgfdw_exec_cleanup_query (entry -> conn , sql , false))
1365
+ return ; /* Unable to abort remote transaction */
1366
+
1367
+ if (toplevel )
1368
+ {
1369
+ if (entry -> have_prep_stmt && entry -> have_error &&
1370
+ !pgfdw_exec_cleanup_query (entry -> conn ,
1371
+ "DEALLOCATE ALL" ,
1372
+ true))
1373
+ return ; /* Trouble clearing prepared statements */
1374
+
1375
+ entry -> have_prep_stmt = false;
1376
+ entry -> have_error = false;
1377
+ /* Also reset per-connection state */
1378
+ memset (& entry -> state , 0 , sizeof (entry -> state ));
1379
+ }
1380
+
1381
+ /* Disarm changing_xact_state if it all worked */
1382
+ entry -> changing_xact_state = false;
1383
+ }
1384
+
1412
1385
/*
1413
1386
* List active foreign server connections.
1414
1387
*
0 commit comments