@@ -366,14 +366,16 @@ static void MtmCheckHeartbeat()
366
366
}
367
367
368
368
369
- static int MtmConnectSocket (char const * host , int port , int max_attempts )
369
+ static int MtmConnectSocket (char const * host , int port , int timeout )
370
370
{
371
371
struct sockaddr_in sock_inet ;
372
372
unsigned addrs [MAX_ROUTES ];
373
373
unsigned i , n_addrs = sizeof (addrs ) / sizeof (addrs [0 ]);
374
374
MtmHandshakeMessage req ;
375
375
MtmArbiterMessage resp ;
376
376
int sd ;
377
+ timestamp_t start = MtmGetSystemTime ();
378
+
377
379
378
380
sock_inet .sin_family = AF_INET ;
379
381
sock_inet .sin_port = htons (port );
@@ -390,7 +392,10 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
390
392
if (sd < 0 ) {
391
393
elog (ERROR , "Arbiter failed to create socket: %d" , errno );
392
394
}
393
- fcntl (sd , F_SETFL , O_NONBLOCK );
395
+ rc = fcntl (sd , F_SETFL , O_NONBLOCK );
396
+ if (rc < 0 ) {
397
+ elog (ERROR , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
398
+ }
394
399
busy_socket = sd ;
395
400
for (i = 0 ; i < n_addrs ; ++ i ) {
396
401
memcpy (& sock_inet .sin_addr , & addrs [i ], sizeof sock_inet .sin_addr );
@@ -405,17 +410,19 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
405
410
if (rc == 0 ) {
406
411
break ;
407
412
}
408
- if (errno != EINPROGRESS || max_attempts == 0 ) {
413
+ if (errno != EINPROGRESS || start + MSEC_TO_USEC ( timeout ) < MtmGetSystemTime () ) {
409
414
elog (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
410
415
busy_socket = -1 ;
416
+ close (sd );
411
417
return -1 ;
412
418
} else {
413
- rc = MtmWaitSocket (sd , true, MtmConnectTimeout );
419
+ rc = MtmWaitSocket (sd , true, MtmHeartbeatSendTimeout );
414
420
if (rc == 1 ) {
415
421
socklen_t optlen = sizeof (int );
416
422
if (getsockopt (sd , SOL_SOCKET , SO_ERROR , (void * )& rc , & optlen ) < 0 ) {
417
423
elog (WARNING , "Arbiter failed to getsockopt for %s:%d: error=%d" , host , port , errno );
418
424
busy_socket = -1 ;
425
+ close (sd );
419
426
return -1 ;
420
427
}
421
428
if (rc == 0 ) {
@@ -426,8 +433,8 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
426
433
} else {
427
434
elog (WARNING , "Arbiter waiting socket to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
428
435
}
429
- max_attempts -= 1 ;
430
- MtmSleep (MSEC_TO_USEC (MtmConnectTimeout ));
436
+ close ( sd ) ;
437
+ MtmSleep (MSEC_TO_USEC (MtmHeartbeatSendTimeout ));
431
438
}
432
439
}
433
440
MtmSetSocketOptions (sd );
@@ -479,7 +486,7 @@ static void MtmOpenConnections()
479
486
}
480
487
for (i = 0 ; i < nNodes ; i ++ ) {
481
488
if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
482
- sockets [i ] = MtmConnectSocket (Mtm -> nodes [i ].con .hostName , MtmArbiterPort + i + 1 , MtmConnectAttempts );
489
+ sockets [i ] = MtmConnectSocket (Mtm -> nodes [i ].con .hostName , MtmArbiterPort + i + 1 , MtmConnectTimeout );
483
490
if (sockets [i ] < 0 ) {
484
491
MtmOnNodeDisconnect (i + 1 );
485
492
}
@@ -511,7 +518,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
511
518
close (sockets [node ]);
512
519
sockets [node ] = -1 ;
513
520
}
514
- sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ].con .hostName , MtmArbiterPort + node + 1 , MtmReconnectAttempts );
521
+ sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ].con .hostName , MtmArbiterPort + node + 1 , MtmReconnectTimeout );
515
522
if (sockets [node ] < 0 ) {
516
523
MtmOnNodeDisconnect (node + 1 );
517
524
return false;
0 commit comments