Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 02f0b9e

Browse files
author
Mikhail Rutman
committed
Disable timeouts for multimaster
To avoid disconnection between multimaster components the following timeouts were disabled for the multimaster: - statement_timeout; - lock_timeout; - idle_in_transaction_session_timeout; - idle_session_timeout.
1 parent 524f915 commit 02f0b9e

11 files changed

+189
-9
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ OBJS = src/multimaster.o src/dmq.o src/commit.o src/bytebuf.o src/bgwpool.o \
44
src/pglogical_output.o src/pglogical_proto.o src/pglogical_receiver.o \
55
src/pglogical_apply.o src/pglogical_hooks.o src/pglogical_config.o \
66
src/pglogical_relid_map.o src/ddd.o src/bkb.o src/spill.o src/state.o \
7-
src/resolver.o src/ddl.o src/syncpoint.o src/global_tx.o
7+
src/resolver.o src/ddl.o src/syncpoint.o src/global_tx.o src/mtm_utils.o
88
MODULE_big = multimaster
99

1010
ifndef USE_PGXS # hmm, user didn't requested to use pgxs

src/bgwpool.c

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "multimaster.h"
3232
#include "state.h"
3333
#include "logger.h"
34+
#include "mtm_utils.h"
3435

3536
/*
3637
* Store the size of tx body, position of it in the tx list and transaction
@@ -324,6 +325,7 @@ BgwPoolMainLoop(BgwPool *poolDesc)
324325
void
325326
BgwPoolDynamicWorkerMainLoop(Datum arg)
326327
{
328+
MtmDisableTimeouts();
327329
BgwPoolMainLoop((BgwPool *) DatumGetPointer(arg));
328330
}
329331

src/ddl.c

-1
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,6 @@ MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
666666
QueryEnvironment *queryEnv, DestReceiver *dest,
667667
QueryCompletion *qc)
668668
{
669-
670669
/*
671670
* Quick exit if multimaster is not enabled.
672671
* XXX it's better to do MtmIsEnabled here, but this needs cache access

src/dmq.c

+9-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "dmq.h"
3232
#include "logger.h"
3333
#include "compat.h"
34+
#include "mtm_utils.h"
3435

3536
#include "access/transam.h"
3637
#include "libpq/libpq.h"
@@ -527,6 +528,8 @@ dmq_sender_main(Datum main_arg)
527528
pqsignal(SIGTERM, die);
528529
BackgroundWorkerUnblockSignals();
529530

531+
MtmDisableTimeouts();
532+
530533
memcpy(&heartbeat_send_timeout, MyBgworkerEntry->bgw_extra, sizeof(int));
531534
memcpy(&connect_timeout, MyBgworkerEntry->bgw_extra + sizeof(int), sizeof(int));
532535

@@ -796,7 +799,7 @@ dmq_sender_main(Datum main_arg)
796799
int pos = event.pos;
797800

798801
pqtime = dmq_now();
799-
status = PQconnectPoll(conns[conn_id].pgconn);
802+
status = MtmPQconnectPoll(conns[conn_id].pgconn);
800803
mtm_log(DmqPqTiming, "[DMQ] [TIMING] pqp = %f ms", dmq_now() - pqtime);
801804

802805
mtm_log(DmqStateIntermediate,
@@ -1386,6 +1389,11 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
13861389
double last_message_at = dmq_now();
13871390
void *extra = NULL;
13881391

1392+
/*
1393+
* We do not call MtmDisableTimeouts() here because the connection to this
1394+
* client is made by MtmPQconnectPoll(), which already disables the timeouts.
1395+
*/
1396+
13891397
sender_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
13901398
recv_timeout = PG_GETARG_INT32(1);
13911399

src/include/mtm_utils.h

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* mtm_utils.h
4+
* Utility functions:
5+
* - disable global timeout settings;
6+
* - libpq connect function wrappers.
7+
*
8+
*
9+
* Copyright (c) 2022, Postgres Professional
10+
*
11+
*-------------------------------------------------------------------------
12+
*/
13+
#ifndef MTM_UTILS_H
14+
#define MTM_UTILS_H
15+
16+
#include "libpq/pqformat.h"
17+
#include "libpq-fe.h"
18+
19+
extern void MtmDisableTimeouts(void);
20+
21+
extern PostgresPollingStatusType MtmPQconnectPoll(PGconn *conn);
22+
extern PGconn* MtmPQconnectdb(const char *conninfo);
23+
24+
#endif

src/mtm_utils.c

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*----------------------------------------------------------------------------
2+
*
3+
* mtm_utils.c
4+
* Utility functions
5+
*
6+
* Copyright (c) 2022, Postgres Professional
7+
*
8+
*----------------------------------------------------------------------------
9+
*/
10+
11+
#include "logger.h"
12+
#include "mtm_utils.h"
13+
14+
#include "utils/timeout.h"
15+
16+
/*
17+
* Disables timeouts on a client side:
18+
* - statement_timeout;
19+
* - lock_timeout;
20+
* - idle_in_transaction_session_timeout;
21+
* - idle_session_timeout.
22+
*
23+
* This timeouts, when set in the postgres config file, affect all process.
24+
* The multimaster needs his sessions not to be interrupted, so we disable
25+
* these timeouts.
26+
*
27+
* This function raises an error on PQExec failed.
28+
*/
29+
static void
30+
disable_client_timeouts(PGconn *conn)
31+
{
32+
PGresult *res;
33+
34+
res = PQexec(conn, "SET statement_timeout = 0");
35+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
36+
{
37+
char *msg = pchomp(PQerrorMessage(conn));
38+
mtm_log(ERROR, "failed to set statement_timeout: %s", msg);
39+
}
40+
41+
res = PQexec(conn, "SET lock_timeout = 0");
42+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
43+
{
44+
char *msg = pchomp(PQerrorMessage(conn));
45+
mtm_log(ERROR, "failed to set lock_timeout: %s", msg);
46+
}
47+
48+
res = PQexec(conn, "SET idle_in_transaction_session_timeout = 0");
49+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
50+
{
51+
char *msg = pchomp(PQerrorMessage(conn));
52+
mtm_log(ERROR, "failed to set idle_in_transaction_session_timeout: %s", msg);
53+
}
54+
55+
res = PQexec(conn, "SET idle_session_timeout = 0");
56+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
57+
{
58+
char *msg = pchomp(PQerrorMessage(conn));
59+
mtm_log(ERROR, "failed to set idle_session_timeout: %s", msg);
60+
}
61+
}
62+
63+
/*
64+
* Disable timeouts for a current process
65+
* - statement_timeout;
66+
* - lock_timeout;
67+
* - idle_in_transaction_session_timeout;
68+
* - idle_session_timeout.
69+
*
70+
* We disable these timeout for the same reason as in the disable_client_timeout()
71+
*/
72+
extern void
73+
MtmDisableTimeouts(void)
74+
{
75+
if (get_timeout_active(STATEMENT_TIMEOUT))
76+
{
77+
disable_timeout(STATEMENT_TIMEOUT, false);
78+
}
79+
80+
if (get_timeout_active(LOCK_TIMEOUT))
81+
{
82+
disable_timeout(LOCK_TIMEOUT, false);
83+
}
84+
85+
if (get_timeout_active(IDLE_IN_TRANSACTION_SESSION_TIMEOUT))
86+
{
87+
disable_timeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT, false);
88+
}
89+
90+
if (get_timeout_active(IDLE_SESSION_TIMEOUT))
91+
{
92+
disable_timeout(IDLE_SESSION_TIMEOUT, false);
93+
}
94+
}
95+
96+
/*
97+
* Wrapper on PQconnectPoll
98+
*
99+
* On connect disables timeouts on a client side
100+
*/
101+
PostgresPollingStatusType
102+
MtmPQconnectPoll(PGconn *conn)
103+
{
104+
PostgresPollingStatusType status;
105+
106+
status = PQconnectPoll(conn);
107+
if (status != PGRES_POLLING_OK)
108+
return status;
109+
110+
disable_client_timeouts(conn);
111+
112+
return status;
113+
}
114+
115+
/*
116+
* Wrapper on PQconnectdb
117+
*
118+
* On connect disables timeouts on a client side
119+
*/
120+
PGconn *
121+
MtmPQconnectdb(const char *conninfo)
122+
{
123+
PGconn *conn;
124+
125+
conn = PQconnectdb(conninfo);
126+
if (PQstatus(conn) != CONNECTION_OK)
127+
return conn;
128+
129+
disable_client_timeouts(conn);
130+
131+
return conn;
132+
}
133+

src/multimaster.c

+6-6
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "commit.h"
4949
#include "messaging.h"
5050
#include "syncpoint.h"
51+
#include "mtm_utils.h"
5152

5253
#include "compat.h"
5354

@@ -333,7 +334,6 @@ MtmSleep(int64 usec)
333334
}
334335
}
335336

336-
337337
/*
338338
* These were once used to setup mtm state in parallel workers, but as long as
339339
* they are read-only we don't really need it (historically it imported csn
@@ -970,7 +970,7 @@ mtm_init_cluster(PG_FUNCTION_ARGS)
970970
int j;
971971

972972
/* connect */
973-
peer_conns[i] = PQconnectdb(conninfo);
973+
peer_conns[i] = MtmPQconnectdb(conninfo);
974974
if (PQstatus(peer_conns[i]) != CONNECTION_OK)
975975
{
976976
char *msg = pchomp(PQerrorMessage(peer_conns[i]));
@@ -1300,7 +1300,7 @@ mtm_join_node(PG_FUNCTION_ARGS)
13001300
if (new_node == NULL)
13011301
mtm_log(ERROR, "new node %d not found", new_node_id);
13021302
conninfo = new_node->conninfo;
1303-
conn = PQconnectdb(conninfo);
1303+
conn = MtmPQconnectdb(conninfo);
13041304
if (PQstatus(conn) != CONNECTION_OK)
13051305
{
13061306
char *msg = pchomp(PQerrorMessage(conn));
@@ -1495,7 +1495,7 @@ mtm_ping(PG_FUNCTION_ARGS)
14951495
if (!BIT_CHECK(curr_gen.members, peer->node_id - 1))
14961496
continue;
14971497

1498-
conn = PQconnectdb(peer->conninfo);
1498+
conn = MtmPQconnectdb(peer->conninfo);
14991499
if (PQstatus(conn) != CONNECTION_OK)
15001500
{
15011501
char *msg = pchomp(PQerrorMessage(conn));
@@ -2554,7 +2554,7 @@ _mtm_get_snapshots(const MtmConfig *mcfg, PGconn **conns, char **snapnames,
25542554
for (i = 0; i < mcfg->n_nodes; i++)
25552555
{
25562556
/* Establish connection to each node */
2557-
conns[i] = PQconnectdb(mcfg->nodes[i].conninfo);
2557+
conns[i] = MtmPQconnectdb(mcfg->nodes[i].conninfo);
25582558

25592559
if (conns[i] == NULL || PQstatus(conns[i]) == CONNECTION_BAD)
25602560
{
@@ -2680,7 +2680,7 @@ mtm_check_query(PG_FUNCTION_ARGS)
26802680
int pos = index[i];
26812681

26822682
/* Establish connection to each online node */
2683-
conn = PQconnectdb(cfg->nodes[pos].conninfo);
2683+
conn = MtmPQconnectdb(cfg->nodes[pos].conninfo);
26842684

26852685
if (conn == NULL || PQstatus(conn) == CONNECTION_BAD)
26862686
{

src/pglogical_output.c

+3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include "multimaster.h"
5858
#include "logger.h"
5959
#include "state.h"
60+
#include "mtm_utils.h"
6061

6162
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
6263

@@ -143,6 +144,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
143144
cb->shutdown_cb = pg_decode_shutdown;
144145
cb->message_cb = pg_decode_message;
145146
cb->caughtup_cb = pg_decode_caughtup;
147+
148+
MtmDisableTimeouts();
146149
}
147150

148151
#if 0

src/pglogical_receiver.c

+3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include "compat.h"
5757
#include "syncpoint.h"
5858
#include "global_tx.h"
59+
#include "mtm_utils.h"
5960

6061
#define ERRCODE_DUPLICATE_OBJECT_STR "42710"
6162

@@ -584,6 +585,8 @@ pglogical_receiver_main(Datum main_arg)
584585
*/
585586
on_shmem_exit(pglogical_receiver_at_exit, PointerGetDatum(rctx));
586587

588+
MtmDisableTimeouts();
589+
587590
MtmIsReceiver = true;
588591
/* Run as replica session replication role. */
589592
SetConfigOption("session_replication_role", "replica",

src/resolver.c

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "commit.h"
3131
#include "global_tx.h"
3232
#include "messaging.h"
33+
#include "mtm_utils.h"
3334

3435
static MtmConfig *mtm_cfg = NULL;
3536
static bool send_requests;
@@ -637,6 +638,8 @@ ResolverMain(Datum main_arg)
637638
Oid db_id,
638639
user_id;
639640

641+
MtmDisableTimeouts();
642+
640643
/* init this worker */
641644
pqsignal(SIGHUP, ResolverSigHupHandler);
642645
pqsignal(SIGTERM, die);

src/state.c

+5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "syncpoint.h"
4646
#include "logger.h"
4747
#include "messaging.h"
48+
#include "mtm_utils.h"
4849

4950
char const *const MtmNeighborEventMnem[] =
5051
{
@@ -1672,6 +1673,8 @@ CampaignerMain(Datum main_arg)
16721673
TimestampTz last_campaign_at = 0;
16731674
int rc = WL_TIMEOUT;
16741675

1676+
MtmDisableTimeouts();
1677+
16751678
MtmBackgroundWorker = true;
16761679
mtm_log(MtmStateMessage, "campaigner started");
16771680
before_shmem_exit(CampaignerOnExit, (Datum) 0);
@@ -3417,6 +3420,8 @@ ReplierMain(Datum main_arg)
34173420
ALLOCSET_DEFAULT_SIZES);
34183421
bool job_pending;
34193422

3423+
MtmDisableTimeouts();
3424+
34203425
MtmBackgroundWorker = true;
34213426
before_shmem_exit(ReplierOnExit, (Datum) 0);
34223427
mtm_log(MtmStateMessage, "replier started");

0 commit comments

Comments
 (0)