Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 073066b

Browse files
committed
fix interrupts handlers for parallel connections
1 parent 1ff4dde commit 073066b

File tree

4 files changed

+55
-23
lines changed

4 files changed

+55
-23
lines changed

src/backup.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2708,7 +2708,10 @@ pg_ptrack_get_block(backup_files_args *arguments,
27082708
sprintf(params[3], "%u", blknum);
27092709

27102710
if (arguments->thread_backup_conn == NULL)
2711+
{
27112712
arguments->thread_backup_conn = pgut_connect(pgut_dbname);
2713+
}
2714+
arguments->thread_cancel_conn = PQgetCancel(arguments->thread_backup_conn);
27122715

27132716
//elog(LOG, "db %i pg_ptrack_get_block(%i, %i, %u)",dbOid, tblsOid, relOid, blknum);
27142717
res = pgut_execute_parallel(arguments->thread_backup_conn,

src/pg_probackup.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ typedef struct
250250
parray *prev_backup_filelist;
251251
XLogRecPtr prev_backup_start_lsn;
252252
PGconn *thread_backup_conn;
253-
PGconn *thread_cancel_conn;
253+
PGcancel *thread_cancel_conn;
254254
} backup_files_args;
255255

256256
/*

src/utils/pgut.c

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ static bool parse_pair(const char buffer[], char key[], char value[]);
5454

5555
/* Connection routines */
5656
static void init_cancel_handler(void);
57-
static void on_before_exec(PGconn *conn);
58-
static void on_after_exec(void);
57+
static void on_before_exec(PGconn *conn, PGcancel *thread_cancel_conn);
58+
static void on_after_exec(PGcancel *thread_cancel_conn);
5959
static void on_interrupt(void);
6060
static void on_cleanup(void);
6161
static void exit_or_abort(int exitcode);
@@ -1512,7 +1512,7 @@ pgut_set_port(const char *new_port)
15121512

15131513
PGresult *
15141514
pgut_execute_parallel(PGconn* conn,
1515-
PGconn* cancel_conn, const char *query,
1515+
PGcancel* thread_cancel_conn, const char *query,
15161516
int nParams, const char **params,
15171517
bool text_result)
15181518
{
@@ -1540,7 +1540,7 @@ pgut_execute_parallel(PGconn* conn,
15401540
return NULL;
15411541
}
15421542

1543-
//on_before_exec(conn);
1543+
on_before_exec(conn, thread_cancel_conn);
15441544
if (nParams == 0)
15451545
res = PQexec(conn, query);
15461546
else
@@ -1550,7 +1550,7 @@ pgut_execute_parallel(PGconn* conn,
15501550
* or one to obtain results in binary format.
15511551
*/
15521552
(text_result) ? 0 : 1);
1553-
//on_after_exec();
1553+
on_after_exec(thread_cancel_conn);
15541554

15551555
switch (PQresultStatus(res))
15561556
{
@@ -1594,7 +1594,7 @@ pgut_execute(PGconn* conn, const char *query, int nParams, const char **params,
15941594
return NULL;
15951595
}
15961596

1597-
on_before_exec(conn);
1597+
on_before_exec(conn, NULL);
15981598
if (nParams == 0)
15991599
res = PQexec(conn, query);
16001600
else
@@ -1604,7 +1604,7 @@ pgut_execute(PGconn* conn, const char *query, int nParams, const char **params,
16041604
* or one to obtain results in binary format.
16051605
*/
16061606
(text_result) ? 0 : 1);
1607-
on_after_exec();
1607+
on_after_exec(NULL);
16081608

16091609
switch (PQresultStatus(res))
16101610
{
@@ -1745,7 +1745,7 @@ static CRITICAL_SECTION cancelConnLock;
17451745
* Set cancel_conn to point to the current database connection.
17461746
*/
17471747
static void
1748-
on_before_exec(PGconn *conn)
1748+
on_before_exec(PGconn *conn, PGcancel *thread_cancel_conn)
17491749
{
17501750
PGcancel *old;
17511751

@@ -1756,16 +1756,32 @@ on_before_exec(PGconn *conn)
17561756
EnterCriticalSection(&cancelConnLock);
17571757
#endif
17581758

1759-
/* Free the old one if we have one */
1760-
old = cancel_conn;
1759+
if (thread_cancel_conn)
1760+
{
1761+
elog(WARNING, "Handle tread_cancel_conn. on_before_exec");
1762+
old = thread_cancel_conn;
1763+
1764+
/* be sure handle_sigint doesn't use pointer while freeing */
1765+
thread_cancel_conn = NULL;
1766+
1767+
if (old != NULL)
1768+
PQfreeCancel(old);
1769+
1770+
thread_cancel_conn = PQgetCancel(conn);
1771+
}
1772+
else
1773+
{
1774+
/* Free the old one if we have one */
1775+
old = cancel_conn;
17611776

1762-
/* be sure handle_sigint doesn't use pointer while freeing */
1763-
cancel_conn = NULL;
1777+
/* be sure handle_sigint doesn't use pointer while freeing */
1778+
cancel_conn = NULL;
17641779

1765-
if (old != NULL)
1766-
PQfreeCancel(old);
1780+
if (old != NULL)
1781+
PQfreeCancel(old);
17671782

1768-
cancel_conn = PQgetCancel(conn);
1783+
cancel_conn = PQgetCancel(conn);
1784+
}
17691785

17701786
#ifdef WIN32
17711787
LeaveCriticalSection(&cancelConnLock);
@@ -1778,7 +1794,7 @@ on_before_exec(PGconn *conn)
17781794
* Free the current cancel connection, if any, and set to NULL.
17791795
*/
17801796
static void
1781-
on_after_exec(void)
1797+
on_after_exec(PGcancel *thread_cancel_conn)
17821798
{
17831799
PGcancel *old;
17841800

@@ -1789,14 +1805,27 @@ on_after_exec(void)
17891805
EnterCriticalSection(&cancelConnLock);
17901806
#endif
17911807

1792-
old = cancel_conn;
1808+
if (thread_cancel_conn)
1809+
{
1810+
elog(WARNING, "Handle tread_cancel_conn. on_after_exec");
1811+
old = thread_cancel_conn;
17931812

1794-
/* be sure handle_sigint doesn't use pointer while freeing */
1795-
cancel_conn = NULL;
1813+
/* be sure handle_sigint doesn't use pointer while freeing */
1814+
thread_cancel_conn = NULL;
17961815

1797-
if (old != NULL)
1798-
PQfreeCancel(old);
1816+
if (old != NULL)
1817+
PQfreeCancel(old);
1818+
}
1819+
else
1820+
{
1821+
old = cancel_conn;
1822+
1823+
/* be sure handle_sigint doesn't use pointer while freeing */
1824+
cancel_conn = NULL;
17991825

1826+
if (old != NULL)
1827+
PQfreeCancel(old);
1828+
}
18001829
#ifdef WIN32
18011830
LeaveCriticalSection(&cancelConnLock);
18021831
#endif

src/utils/pgut.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ extern PGconn *pgut_connect_replication_extended(const char *pghost, const char
127127
extern void pgut_disconnect(PGconn *conn);
128128
extern PGresult *pgut_execute(PGconn* conn, const char *query, int nParams,
129129
const char **params, bool text_result);
130-
extern PGresult *pgut_execute_parallel(PGconn* conn, PGconn* cancel_conn,
130+
extern PGresult *pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn,
131131
const char *query, int nParams,
132132
const char **params, bool text_result);
133133
extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);

0 commit comments

Comments
 (0)