Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit ce87760

Browse files
knizhnikkelvich
authored andcommitted
Fix asynchronous write in arbiter
1 parent 558a943 commit ce87760

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

arbiter.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,18 +222,18 @@ static void MtmDisconnect(int node)
222222
MtmOnNodeDisconnect(node+1);
223223
}
224224

225-
static int MtmWaitWriteSocket(int sd, time_t timeoutMsec)
225+
static int MtmWaitSocket(int sd, bool forWrite, time_t timeoutMsec)
226226
{
227227
struct timeval tv;
228-
fd_set out_set;
228+
fd_set set;
229229
int rc;
230230
tv.tv_sec = timeoutMsec/1000;
231231
tv.tv_usec = timeoutMsec%1000*1000;
232-
FD_ZERO(&out_set);
233-
FD_SET(sd, &out_set);
232+
FD_ZERO(&set);
233+
FD_SET(sd, &set);
234234
do {
235235
MtmCheckHeartbeat();
236-
} while ((rc = select(sd+1, NULL, &out_set, NULL, &tv)) < 0 && errno == EINTR);
236+
} while ((rc = select(sd+1, forWrite ? NULL : &set, forWrite ? &set : NULL, NULL, &tv)) < 0 && errno == EINTR);
237237
return rc;
238238
}
239239

@@ -242,7 +242,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
242242
char* src = (char*)buf;
243243
busy_socket = sd;
244244
while (size != 0) {
245-
int rc = MtmWaitWriteSocket(sd, MtmHeartbeatSendTimeout);
245+
int rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
246246
if (rc == 1) {
247247
int n = send(sd, src, size, 0);
248248
if (n < 0) {
@@ -401,7 +401,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
401401
busy_socket = -1;
402402
return -1;
403403
} else {
404-
rc = MtmWaitWriteSocket(sd, MtmConnectTimeout);
404+
rc = MtmWaitSocket(sd, true, MtmConnectTimeout);
405405
if (rc == 1) {
406406
socklen_t optlen = sizeof(int);
407407
if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&rc, &optlen) < 0) {
@@ -434,7 +434,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
434434
close(sd);
435435
goto Retry;
436436
}
437-
if (MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
437+
if (MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout) != 1 || MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
438438
elog(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: errno=%d", host, port, errno);
439439
close(sd);
440440
goto Retry;

0 commit comments

Comments
 (0)