Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c5accda

Browse files
committed
use mm GUCs for timeout in dmq
1 parent 1b9cdfd commit c5accda

File tree

6 files changed

+20
-22
lines changed

6 files changed

+20
-22
lines changed

Cluster.pm

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ sub init
4444
max_wal_senders = 6
4545
max_replication_slots = 12
4646
wal_sender_timeout = 0
47+
48+
multimaster.heartbeat_send_timeout = 100
49+
multimaster.heartbeat_recv_timeout = 5000
4750
});
4851
}
4952
}

multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ END
1111
$$;
1212

1313
-- message queue receiver, for internal use only
14-
CREATE FUNCTION mtm.dmq_receiver_loop(sender_name text) RETURNS void
14+
CREATE FUNCTION mtm.dmq_receiver_loop(sender_name text, recv_timeout int) RETURNS void
1515
AS 'MODULE_PATHNAME','dmq_receiver_loop'
1616
LANGUAGE C;
1717

src/dmq.c

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ typedef struct {
7878
char sender_name[DMQ_NAME_MAXLEN];
7979
char receiver_name[DMQ_NAME_MAXLEN];
8080
char connstr[DMQ_CONNSTR_MAX_LEN];
81-
int ping_period;
81+
int recv_timeout;
8282
PGconn *pgconn;
8383
DmqConnState state;
8484
int pos;
@@ -240,7 +240,7 @@ dmq_shmem_size(void)
240240
}
241241

242242
void
243-
dmq_init(void)
243+
dmq_init(int send_timeout)
244244
{
245245
BackgroundWorker worker;
246246

@@ -258,7 +258,7 @@ dmq_init(void)
258258
worker.bgw_start_time = BgWorkerStart_ConsistentState;
259259
worker.bgw_restart_time = 5;
260260
worker.bgw_notify_pid = 0;
261-
worker.bgw_main_arg = 0;
261+
worker.bgw_main_arg = send_timeout;
262262
sprintf(worker.bgw_library_name, "multimaster");
263263
sprintf(worker.bgw_function_name, "dmq_sender_main");
264264
snprintf(worker.bgw_name, BGW_MAXLEN, "mtm-dmq-sender");
@@ -272,14 +272,6 @@ dmq_init(void)
272272
// on_proc_exit(dmq_at_exit, 0);
273273
}
274274

275-
// void _PG_init(void);
276-
277-
// void
278-
// _PG_init(void)
279-
// {
280-
// dmq_init();
281-
// }
282-
283275
static Size
284276
dmq_toc_size()
285277
{
@@ -347,6 +339,7 @@ dmq_sender_main(Datum main_arg)
347339
shm_mq_handle **mq_handles;
348340
WaitEventSet *set;
349341
DmqDestination conns[DMQ_MAX_DESTINATIONS];
342+
int heartbeat_send_timeout = DatumGetInt32(main_arg);
350343

351344
double prev_timer_at = dmq_now();
352345

@@ -508,7 +501,7 @@ dmq_sender_main(Datum main_arg)
508501
* Otherwise we can stuck with timer_event forever.
509502
*/
510503
now_millisec = dmq_now();
511-
if (now_millisec - prev_timer_at > 250)
504+
if (now_millisec - prev_timer_at > heartbeat_send_timeout)
512505
{
513506
prev_timer_at = now_millisec;
514507
timer_event = true;
@@ -635,8 +628,8 @@ dmq_sender_main(Datum main_arg)
635628
else if (status == PGRES_POLLING_OK)
636629
{
637630
char *sender_name = conns[conn_id].sender_name;
638-
char *query = psprintf("select mtm.dmq_receiver_loop('%s')",
639-
sender_name);
631+
char *query = psprintf("select mtm.dmq_receiver_loop('%s', %d)",
632+
sender_name, conns[conn_id].recv_timeout);
640633

641634
conns[conn_id].state = Negotiating;
642635
ModifyWaitEvent(set, event.pos, WL_SOCKET_READABLE, NULL);
@@ -1001,9 +994,11 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
1001994
char *proc_name;
1002995
int i;
1003996
int receiver_id = -1;
997+
int recv_timeout;
1004998
double last_message_at = dmq_now();
1005999

10061000
sender_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
1001+
recv_timeout = PG_GETARG_INT32(1);
10071002

10081003
proc_name = psprintf("mtm-dmq-receiver %s", sender_name);
10091004
set_ps_display(proc_name, true);
@@ -1142,7 +1137,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
11421137
// XXX: is it enough?
11431138
CHECK_FOR_INTERRUPTS();
11441139

1145-
if (dmq_now() - last_message_at > 2000)
1140+
if (dmq_now() - last_message_at > recv_timeout)
11461141
{
11471142
mtm_log(ERROR, "[DMQ] exit receiver due to heatbeat timeout");
11481143
}
@@ -1573,7 +1568,7 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15731568

15741569
DmqDestinationId
15751570
dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
1576-
int ping_period)
1571+
int recv_timeout)
15771572
{
15781573
DmqDestinationId dest_id;
15791574
pid_t sender_pid;
@@ -1587,7 +1582,7 @@ dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
15871582
strncpy(dest->sender_name, sender_name, DMQ_NAME_MAXLEN);
15881583
strncpy(dest->receiver_name, receiver_name, DMQ_NAME_MAXLEN);
15891584
strncpy(dest->connstr, connstr, DMQ_CONNSTR_MAX_LEN);
1590-
dest->ping_period = ping_period;
1585+
dest->recv_timeout = recv_timeout;
15911586
dest->active = true;
15921587
break;
15931588
}

src/include/dmq.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ typedef int8 DmqSenderId;
99

1010
#define DMQ_NAME_MAXLEN 32
1111

12-
extern void dmq_init(void);
12+
extern void dmq_init(int send_timeout);
1313

1414
extern DmqDestinationId dmq_destination_add(char *connstr, char *sender_name,
1515
char *receiver_name, int ping_period);

src/multimaster.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ _PG_init(void)
341341
"Timeout in milliseconds of receiving heartbeat messages",
342342
"If no heartbeat message is received from node within this period, it assumed to be dead",
343343
&MtmHeartbeatRecvTimeout,
344-
1000,
344+
2000,
345345
1,
346346
INT_MAX,
347347
PGC_BACKEND,
@@ -502,7 +502,7 @@ _PG_init(void)
502502
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmMaxNodes*MtmQueueSize + sizeof(MtmTime));
503503
RequestNamedLWLockTranche(MULTIMASTER_NAME, 4);
504504

505-
dmq_init();
505+
dmq_init(MtmHeartbeatSendTimeout);
506506
dmq_receiver_start_hook = MtmOnNodeConnect;
507507
dmq_receiver_stop_hook = MtmOnNodeDisconnect;
508508
dmq_sender_connect_hook = MtmOnDmqSenderConnect;

src/state.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1351,7 +1351,7 @@ start_node_workers(int node_id, MtmConfig *new_cfg, Datum arg)
13511351

13521352
/* Add dmq destination */
13531353
dest = dmq_destination_add(dmq_connstr, dmq_my_name, dmq_node_name,
1354-
MtmHeartbeatSendTimeout);
1354+
MtmHeartbeatRecvTimeout);
13551355

13561356
LWLockAcquire(Mtm->lock, LW_EXCLUSIVE);
13571357
Mtm->peers[node_id - 1].dmq_dest_id = dest;

0 commit comments

Comments
 (0)