Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b042dae

Browse files
committed
Stop trying to reconnect in MtmConnectSocket() as heartbeat will try to reconnect anyway; Improve arbiter logging in case of network problems;
1 parent ac81341 commit b042dae

File tree

3 files changed

+66
-110
lines changed

3 files changed

+66
-110
lines changed

arbiter.c

+65-75
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ static void MtmSender(Datum arg);
9797
static void MtmReceiver(Datum arg);
9898
static void MtmMonitor(Datum arg);
9999
static void MtmSendHeartbeat(void);
100-
static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectTimeout);
100+
static bool MtmSendToNode(int node, void const* buf, int size);
101101

102102
char const* const MtmMessageKindMnem[] =
103103
{
@@ -166,7 +166,7 @@ static void MtmRegisterSocket(int fd, int node)
166166
ev.events = EPOLLIN;
167167
ev.data.u32 = node;
168168
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
169-
MTM_ELOG(LOG, "Arbiter failed to add socket to epoll set: %d", errno);
169+
MTM_ELOG(LOG, "Arbiter failed to add socket to epoll set: %s", strerror(errno));
170170
}
171171
#else
172172
FD_SET(fd, &inset);
@@ -180,7 +180,7 @@ static void MtmUnregisterSocket(int fd)
180180
{
181181
#if USE_EPOLL
182182
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
183-
MTM_ELOG(LOG, "Arbiter failed to unregister socket from epoll set: %d", errno);
183+
MTM_ELOG(LOG, "Arbiter failed to unregister socket from epoll set: %s", strerror(errno));
184184
}
185185
#else
186186
FD_CLR(fd, &inset);
@@ -371,7 +371,7 @@ static void MtmSendHeartbeat()
371371
|| !BIT_CHECK(Mtm->disabledNodeMask, i)
372372
|| BIT_CHECK(Mtm->reconnectMask, i)))
373373
{
374-
if (!MtmSendToNode(i, &msg, sizeof(msg), MtmHeartbeatRecvTimeout)) {
374+
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
375375
MTM_ELOG(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
376376
} else {
377377
if (last_heartbeat_to_node[i] + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
@@ -408,7 +408,7 @@ void MtmCheckHeartbeat()
408408
}
409409

410410

411-
static int MtmConnectSocket(int node, int port, time_t timeout)
411+
static int MtmConnectSocket(int node, int port)
412412
{
413413
struct addrinfo *addrs = NULL;
414414
struct addrinfo *addr;
@@ -417,12 +417,9 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
417417
MtmHandshakeMessage req;
418418
MtmArbiterMessage resp;
419419
int sd = -1;
420-
int ret;
421-
timestamp_t start = MtmGetSystemTime();
420+
int rc;
422421
char const* host = Mtm->nodes[node].con.hostName;
423422
nodemask_t save_mask = busy_mask;
424-
timestamp_t afterWait;
425-
timestamp_t beforeWait;
426423

427424
/* Initialize hint structure */
428425
MemSet(&hint, 0, sizeof(hint));
@@ -431,67 +428,60 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
431428

432429
snprintf(portstr, sizeof(portstr), "%d", port);
433430

434-
ret = pg_getaddrinfo_all(host, portstr, &hint, &addrs);
435-
if (ret != 0)
431+
rc = pg_getaddrinfo_all(host, portstr, &hint, &addrs);
432+
if (rc != 0)
436433
{
437-
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name: (%d) %s", host, ret, gai_strerror(ret));
434+
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name: %s", host, gai_strerror(rc));
438435
return -1;
439436
}
440437
BIT_SET(busy_mask, node);
441438

442-
Retry:
443-
while (1) {
444-
int rc = -1;
445-
sd = pg_socket(AF_INET, SOCK_STREAM, 0, MtmUseRDMA);
446-
if (sd < 0) {
447-
MTM_ELOG(LOG, "Arbiter failed to create socket: %d", errno);
448-
goto Error;
449-
}
450-
rc = pg_fcntl(sd, F_SETFL, O_NONBLOCK, MtmUseRDMA);
451-
if (rc < 0) {
452-
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
453-
goto Error;
454-
}
455-
for (addr = addrs; addr != NULL; addr = addr->ai_next)
456-
{
457-
do {
458-
rc = pg_connect(sd, addr->ai_addr, addr->ai_addrlen, MtmUseRDMA);
459-
} while (rc < 0 && errno == EINTR);
439+
Retry:
460440

461-
if (rc >= 0 || errno == EINPROGRESS) {
462-
break;
463-
}
464-
}
465-
if (rc == 0) {
441+
sd = socket(AF_INET, SOCK_STREAM, 0);
442+
if (sd < 0) {
443+
MTM_ELOG(LOG, "Arbiter failed to create socket: %s", strerror(errno));
444+
goto Error;
445+
}
446+
rc = fcntl(sd, F_SETFL, O_NONBLOCK);
447+
if (rc < 0) {
448+
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %s", strerror(errno));
449+
goto Error;
450+
}
451+
for (addr = addrs; addr != NULL; addr = addr->ai_next)
452+
{
453+
do {
454+
rc = connect(sd, addr->ai_addr, addr->ai_addrlen);
455+
} while (rc < 0 && errno == EINTR);
456+
457+
if (rc >= 0 || errno == EINPROGRESS) {
466458
break;
467459
}
468-
beforeWait = MtmGetSystemTime();
469-
if (errno != EINPROGRESS || start + MSEC_TO_USEC(timeout) < beforeWait ) {
470-
MTM_ELOG(WARNING, "Arbiter failed to connect to %s:%d: error=%d", host, port, errno);
471-
goto Error;
472-
} else {
473-
rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
474-
if (rc == 1) {
475-
socklen_t optlen = sizeof(int);
476-
if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&rc, &optlen) < 0) {
477-
MTM_ELOG(WARNING, "Arbiter failed to getsockopt for %s:%d: error=%d", host, port, errno);
478-
goto Error;
479-
}
480-
if (rc == 0) {
481-
break;
482-
} else {
483-
MTM_ELOG(WARNING, "Arbiter trying to connect to %s:%d: rc=%d, error=%d", host, port, rc, errno);
484-
}
485-
} else {
486-
MTM_ELOG(WARNING, "Arbiter waiting socket to %s:%d: rc=%d, error=%d", host, port, rc, errno);
460+
}
461+
462+
if (rc != 0 && errno == EINPROGRESS) {
463+
rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
464+
if (rc == 1) {
465+
socklen_t optlen = sizeof(int);
466+
int errcode;
467+
468+
if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&errcode, &optlen) < 0) {
469+
MTM_ELOG(WARNING, "Arbiter failed to getsockopt for %s:%d: %s", host, port, strerror(errcode));
470+
goto Error;
487471
}
488-
pg_closesocket(sd, MtmUseRDMA);
489-
afterWait = MtmGetSystemTime();
490-
if (afterWait < beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
491-
MtmSleep(beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout) - afterWait);
472+
if (errcode != 0) {
473+
MTM_ELOG(WARNING, "Arbiter trying to connect to %s:%d: %s", host, port, strerror(errcode));
474+
goto Error;
492475
}
476+
} else {
477+
MTM_ELOG(WARNING, "Arbiter waiting socket to %s:%d: %s", host, port, strerror(errno));
493478
}
494479
}
480+
else if (rc != 0) {
481+
MTM_ELOG(WARNING, "Arbiter failed to connect to %s:%d: (%d) %s", host, port, rc, strerror(errno));
482+
goto Error;
483+
}
484+
495485
MtmSetSocketOptions(sd);
496486
MtmInitMessage(&req.hdr, MSG_HANDSHAKE);
497487
req.hdr.node = MtmNodeId;
@@ -500,13 +490,13 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
500490
req.hdr.csn = MtmGetCurrentTime();
501491
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
502492
if (!MtmWriteSocket(sd, &req, sizeof req)) {
503-
MTM_ELOG(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
504-
pg_closesocket(sd, MtmUseRDMA);
493+
MTM_ELOG(WARNING, "Arbiter failed to send handshake message to %s:%d: %s", host, port, strerror(errno));
494+
close(sd);
505495
goto Retry;
506496
}
507497
if (MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
508-
MTM_ELOG(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: errno=%d", host, port, errno);
509-
pg_closesocket(sd, MtmUseRDMA);
498+
MTM_ELOG(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: %s", host, port, strerror(errno));
499+
close(sd);
510500
goto Retry;
511501
}
512502
if (resp.code != MSG_STATUS || resp.dxid != HANDSHAKE_MAGIC) {
@@ -527,7 +517,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
527517

528518
return sd;
529519

530-
Error:
520+
Error:
531521
busy_mask = save_mask;
532522
if (sd >= 0) {
533523
pg_closesocket(sd, MtmUseRDMA);
@@ -551,7 +541,7 @@ static void MtmOpenConnections()
551541
}
552542
for (i = 0; i < nNodes; i++) {
553543
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
554-
sockets[i] = MtmConnectSocket(i, Mtm->nodes[i].con.arbiterPort, MtmConnectTimeout);
544+
sockets[i] = MtmConnectSocket(i, Mtm->nodes[i].con.arbiterPort);
555545
if (sockets[i] < 0) {
556546
MtmOnNodeDisconnect(i+1);
557547
}
@@ -566,7 +556,7 @@ static void MtmOpenConnections()
566556
}
567557

568558

569-
static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectTimeout)
559+
static bool MtmSendToNode(int node, void const* buf, int size)
570560
{
571561
bool result = true;
572562
nodemask_t save_mask = busy_mask;
@@ -589,11 +579,11 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
589579
}
590580
if (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
591581
if (sockets[node] >= 0) {
592-
MTM_ELOG(WARNING, "Arbiter fail to write to node %d: %d", node+1, errno);
593-
pg_closesocket(sockets[node], MtmUseRDMA);
582+
MTM_ELOG(WARNING, "Arbiter fail to write to node %d: %s", node+1, strerror(errno));
583+
close(sockets[node]);
594584
sockets[node] = -1;
595585
}
596-
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort, reconnectTimeout);
586+
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort);
597587
if (sockets[node] < 0) {
598588
MtmOnNodeDisconnect(node+1);
599589
result = false;
@@ -613,7 +603,7 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
613603
{
614604
int rc = MtmReadSocket(sockets[node], buf, buf_size);
615605
if (rc <= 0) {
616-
MTM_ELOG(WARNING, "Arbiter failed to read from node=%d, rc=%d, errno=%d", node+1, rc, errno);
606+
MTM_ELOG(WARNING, "Arbiter failed to read from node=%d: %s", node+1, strerror(errno));
617607
MtmDisconnect(node);
618608
}
619609
return rc;
@@ -623,17 +613,17 @@ static void MtmAcceptOneConnection()
623613
{
624614
int fd = pg_accept(gateway, NULL, NULL, MtmUseRDMA);
625615
if (fd < 0) {
626-
MTM_ELOG(WARNING, "Arbiter failed to accept socket: %d", errno);
616+
MTM_ELOG(WARNING, "Arbiter failed to accept socket: %s", strerror(errno));
627617
} else {
628618
MtmHandshakeMessage req;
629619
MtmArbiterMessage resp;
630620
int rc = pg_fcntl(fd, F_SETFL, O_NONBLOCK, MtmUseRDMA);
631621
if (rc < 0) {
632-
MTM_ELOG(ERROR, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
622+
MTM_ELOG(ERROR, "Arbiter failed to switch socket to non-blocking mode: %s", strerror(errno));
633623
}
634624
rc = MtmReadSocket(fd, &req, sizeof req);
635625
if (rc < sizeof(req)) {
636-
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %d, errno=%d", rc, errno);
626+
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %s", strerror(errno));
637627
pg_closesocket(fd, MtmUseRDMA);
638628
} else if (req.hdr.code != MSG_HANDSHAKE && req.hdr.dxid != HANDSHAKE_MAGIC) {
639629
MTM_ELOG(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
@@ -770,7 +760,7 @@ static void MtmSender(Datum arg)
770760

771761
for (i = 0; i < Mtm->nAllNodes; i++) {
772762
if (txBuffer[i].used != 0) {
773-
MtmSendToNode(i, txBuffer[i].data, txBuffer[i].used*sizeof(MtmArbiterMessage), MtmReconnectTimeout);
763+
MtmSendToNode(i, txBuffer[i].data, txBuffer[i].used*sizeof(MtmArbiterMessage));
774764
txBuffer[i].used = 0;
775765
}
776766
}
@@ -870,7 +860,7 @@ static void MtmReceiver(Datum arg)
870860
if (errno == EINTR) {
871861
continue;
872862
}
873-
MTM_ELOG(ERROR, "Arbiter failed to poll sockets: %d", errno);
863+
MTM_ELOG(ERROR, "Arbiter failed to poll sockets: %s", strerror(errno));
874864
}
875865
for (j = 0; j < n; j++) {
876866
i = events[j].data.u32;
@@ -894,7 +884,7 @@ static void MtmReceiver(Datum arg)
894884
} while (n < 0 && MtmRecovery());
895885

896886
if (n < 0) {
897-
MTM_ELOG(ERROR, "Arbiter failed to select sockets: %d", errno);
887+
MTM_ELOG(ERROR, "Arbiter failed to select sockets: %s", strerror(errno));
898888
}
899889
for (i = 0; i < nNodes; i++) {
900890
if (sockets[i] >= 0 && FD_ISSET(sockets[i], &events))

multimaster.c

+1-33
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ char const* const MtmNodeStatusMnem[] =
204204
{
205205
"Initialization",
206206
"Offline",
207-
"Connected",
207+
"Connecting",
208208
"Online",
209209
"Recovery",
210210
"Recovered",
@@ -229,8 +229,6 @@ int MtmNodes;
229229
int MtmNodeId;
230230
int MtmReplicationNodeId;
231231
int MtmArbiterPort;
232-
int MtmConnectTimeout;
233-
int MtmReconnectTimeout;
234232
int MtmNodeDisableDelay;
235233
int MtmTransSpillThreshold;
236234
int MtmMaxNodes;
@@ -3292,36 +3290,6 @@ _PG_init(void)
32923290
NULL
32933291
);
32943292

3295-
DefineCustomIntVariable(
3296-
"multimaster.connect_timeout",
3297-
"Multimaster nodes connect timeout",
3298-
"Interval in milliseconds for establishing connection with cluster node",
3299-
&MtmConnectTimeout,
3300-
10000, /* 10 seconds */
3301-
1,
3302-
INT_MAX,
3303-
PGC_BACKEND,
3304-
0,
3305-
NULL,
3306-
NULL,
3307-
NULL
3308-
);
3309-
3310-
DefineCustomIntVariable(
3311-
"multimaster.reconnect_timeout",
3312-
"Multimaster nodes reconnect timeout",
3313-
"Interval in milliseconds for reestablishing connection with cluster node",
3314-
&MtmReconnectTimeout,
3315-
5000, /* 5 seconds */
3316-
1,
3317-
INT_MAX,
3318-
PGC_BACKEND,
3319-
0,
3320-
NULL,
3321-
NULL,
3322-
NULL
3323-
);
3324-
33253293
if (!ConfigIsSane()) {
33263294
MTM_ELOG(ERROR, "Multimaster config is insane, refusing to work");
33273295
}

multimaster.h

-2
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,6 @@ extern int MtmNodes;
343343
extern int MtmArbiterPort;
344344
extern char* MtmDatabaseName;
345345
extern char* MtmDatabaseUser;
346-
extern int MtmConnectTimeout;
347-
extern int MtmReconnectTimeout;
348346
extern int MtmNodeDisableDelay;
349347
extern int MtmTransSpillThreshold;
350348
extern int MtmHeartbeatSendTimeout;

0 commit comments

Comments
 (0)