Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit af34ad7

Browse files
committed
Refresh cluster state only in monitor process
1 parent 61ca3a5 commit af34ad7

File tree

5 files changed

+86 -93 lines changed

contrib/mmts/arbiter.c

+24-24
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ static void MtmSender(Datum arg);
9090
static void MtmReceiver(Datum arg);
9191
static void MtmMonitor(Datum arg);
9292
static void MtmSendHeartbeat(void);
93-
static bool MtmSendToNode(int node, void const* buf, int size);
93+
static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectTimeout);
9494

9595
char const* const MtmMessageKindMnem[] =
9696
{
@@ -214,7 +214,7 @@ static void MtmDisconnect(int node)
214214
MtmOnNodeDisconnect(node+1);
215215
}
216216

217-
static int MtmWaitSocket(int sd, bool forWrite, time_t timeoutMsec)
217+
static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
218218
{
219219
struct timeval tv;
220220
fd_set set;
@@ -227,7 +227,7 @@ static int MtmWaitSocket(int sd, bool forWrite, time_t timeoutMsec)
227227
MtmCheckHeartbeat();
228228
now = MtmGetSystemTime();
229229
if (now > deadline) {
230-
return 0;
230+
now = deadline;
231231
}
232232
tv.tv_sec = (deadline - now)/USECS_PER_SEC;
233233
tv.tv_usec = (deadline - now)%USECS_PER_SEC;
@@ -355,7 +355,7 @@ static void MtmSendHeartbeat()
355355
timestamp_t now = MtmGetSystemTime();
356356
msg.code = MSG_HEARTBEAT;
357357
msg.disabledNodeMask = Mtm->disabledNodeMask;
358-
msg.connectivityMask = Mtm->connectivityMask;
358+
msg.connectivityMask = SELF_CONNECTIVITY_MASK;
359359
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
360360
msg.node = MtmNodeId;
361361
msg.csn = now;
@@ -373,15 +373,15 @@ static void MtmSendHeartbeat()
373373
|| !BIT_CHECK(Mtm->disabledNodeMask, i)
374374
|| BIT_CHECK(Mtm->reconnectMask, i)))
375375
{
376-
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
376+
if (!MtmSendToNode(i, &msg, sizeof(msg), MtmHeartbeatSendTimeout)) {
377377
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
378378
} else {
379379
if (last_heartbeat_to_node[i] + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
380380
MTM_LOG1("Last heartbeat to node %d was sent %ld microseconds ago", i+1, now - last_heartbeat_to_node[i]);
381381
}
382382
last_heartbeat_to_node[i] = now;
383383
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
384-
if (BIT_CHECK(Mtm->connectivityMask, i)) {
384+
if (BIT_CHECK(SELF_CONNECTIVITY_MASK, i)) {
385385
MTM_LOG1("Force reconnect to node %d", i+1);
386386
close(sockets[i]);
387387
sockets[i] = -1;
@@ -411,7 +411,7 @@ void MtmCheckHeartbeat()
411411
}
412412

413413

414-
static int MtmConnectSocket(int node, int port, int timeout)
414+
static int MtmConnectSocket(int node, int port, time_t timeout)
415415
{
416416
struct sockaddr_in sock_inet;
417417
unsigned addrs[MAX_ROUTES];
@@ -422,6 +422,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
422422
timestamp_t start = MtmGetSystemTime();
423423
char const* host = Mtm->nodes[node].con.hostName;
424424
nodemask_t save_mask = busy_mask;
425+
timestamp_t afterWait;
426+
timestamp_t beforeWait;
425427

426428
sock_inet.sin_family = AF_INET;
427429
sock_inet.sin_port = htons(port);
@@ -435,7 +437,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
435437
Retry:
436438
while (1) {
437439
int rc = -1;
438-
439440
sd = socket(AF_INET, SOCK_STREAM, 0);
440441
if (sd < 0) {
441442
elog(LOG, "Arbiter failed to create socket: %d", errno);
@@ -461,7 +462,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
461462
if (rc == 0) {
462463
break;
463464
}
464-
if (errno != EINPROGRESS || start + MSEC_TO_USEC(timeout) < MtmGetSystemTime()) {
465+
beforeWait = MtmGetSystemTime();
466+
if (errno != EINPROGRESS || start + MSEC_TO_USEC(timeout) < beforeWait ) {
465467
elog(WARNING, "Arbiter failed to connect to %s:%d: error=%d", host, port, errno);
466468
close(sd);
467469
busy_mask = save_mask;
@@ -485,8 +487,10 @@ static int MtmConnectSocket(int node, int port, int timeout)
485487
elog(WARNING, "Arbiter waiting socket to %s:%d: rc=%d, error=%d", host, port, rc, errno);
486488
}
487489
close(sd);
488-
MtmCheckHeartbeat();
489-
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
490+
afterWait = MtmGetSystemTime();
491+
if (afterWait < beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
492+
MtmSleep(beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout) - afterWait);
493+
}
490494
}
491495
}
492496
MtmSetSocketOptions(sd);
@@ -496,7 +500,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
496500
req.hdr.sxid = ShmemVariableCache->nextXid;
497501
req.hdr.csn = MtmGetCurrentTime();
498502
req.hdr.disabledNodeMask = Mtm->disabledNodeMask;
499-
req.hdr.connectivityMask = Mtm->connectivityMask;
503+
req.hdr.connectivityMask = SELF_CONNECTIVITY_MASK;
500504
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
501505
if (!MtmWriteSocket(sd, &req, sizeof req)) {
502506
elog(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
@@ -553,7 +557,7 @@ static void MtmOpenConnections()
553557
}
554558

555559

556-
static bool MtmSendToNode(int node, void const* buf, int size)
560+
static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectTimeout)
557561
{
558562
bool result = true;
559563
nodemask_t save_mask = busy_mask;
@@ -580,7 +584,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
580584
close(sockets[node]);
581585
sockets[node] = -1;
582586
}
583-
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort, MtmReconnectTimeout);
587+
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort, reconnectTimeout);
584588
if (sockets[node] < 0) {
585589
MtmOnNodeDisconnect(node+1);
586590
result = false;
@@ -634,7 +638,7 @@ static void MtmAcceptOneConnection()
634638

635639
resp.code = MSG_STATUS;
636640
resp.disabledNodeMask = Mtm->disabledNodeMask;
637-
resp.connectivityMask = Mtm->connectivityMask;
641+
resp.connectivityMask = SELF_CONNECTIVITY_MASK;
638642
resp.dxid = HANDSHAKE_MAGIC;
639643
resp.sxid = ShmemVariableCache->nextXid;
640644
resp.csn = MtmGetCurrentTime();
@@ -759,7 +763,7 @@ static void MtmSender(Datum arg)
759763

760764
for (i = 0; i < Mtm->nAllNodes; i++) {
761765
if (txBuffer[i].used != 0) {
762-
MtmSendToNode(i, txBuffer[i].data, txBuffer[i].used*sizeof(MtmArbiterMessage));
766+
MtmSendToNode(i, txBuffer[i].data, txBuffer[i].used*sizeof(MtmArbiterMessage), MtmReconnectTimeout);
763767
txBuffer[i].used = 0;
764768
}
765769
}
@@ -813,7 +817,7 @@ static void MtmMonitor(Datum arg)
813817
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL);
814818

815819
while (!stop) {
816-
int rc = WaitLatch(&MyProc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout);
820+
int rc = WaitLatch(&MyProc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatSendTimeout);
817821
if (rc & WL_POSTMASTER_DEATH) {
818822
break;
819823
}
@@ -951,7 +955,7 @@ static void MtmReceiver(Datum arg)
951955
MTM_LOG1("Send response %s for transaction %s to node %d", MtmTxnStatusMnem[msg->status], msg->gid, msg->node);
952956
}
953957
msg->disabledNodeMask = Mtm->disabledNodeMask;
954-
msg->connectivityMask = Mtm->connectivityMask;
958+
msg->connectivityMask = SELF_CONNECTIVITY_MASK;
955959
msg->oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
956960
msg->code = MSG_POLL_STATUS;
957961
MtmSendMessage(msg);
@@ -1142,10 +1146,10 @@ static void MtmReceiver(Datum arg)
11421146
}
11431147
}
11441148
if (Mtm->status == MTM_ONLINE) {
1145-
/* Check for hearbeat only in case of timeout expiration: it means that we do not have unproceeded events.
1149+
now = MtmGetSystemTime();
1150+
/* Check for heartbeats only in case of timeout expiration: it means that we do not have unproceeded events.
11461151
* It helps to avoid false node failure detection because of blocking receiver.
11471152
*/
1148-
now = MtmGetSystemTime();
11491153
if (n == 0) {
11501154
selectTimeout = MtmHeartbeatRecvTimeout; /* restore select timeout */
11511155
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
@@ -1158,10 +1162,6 @@ static void MtmReceiver(Datum arg)
11581162
}
11591163
lastHeartbeatCheck = now;
11601164
}
1161-
if (Mtm->disabledNodeMask != 0) {
1162-
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1163-
MtmRefreshClusterStatus();
1164-
}
11651165
} else {
11661166
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
11671167
/* Switch to non-blocking mode to proceed all pending requests before doing watchdog check */

contrib/mmts/multimaster--1.0.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ AS 'MODULE_PATHNAME','mtm_get_last_csn'
3636
LANGUAGE C;
3737

3838

39-
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "SenderPid" integer, "SenderStartTime" timestamp, "ReceiverPid" integer, "ReceiverStartTime" timestamp, "connStr" text);
39+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "SenderPid" integer, "SenderStartTime" timestamp, "ReceiverPid" integer, "ReceiverStartTime" timestamp, "connStr" text, "connectivityMask" bigint);
4040

4141
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
4242
AS 'MODULE_PATHNAME','mtm_get_nodes_state'

0 commit comments

Comments
 (0)