@@ -429,29 +429,29 @@ static int MtmConnectSocket(int node, int port)
429
429
snprintf (portstr , sizeof (portstr ), "%d" , port );
430
430
431
431
rc = pg_getaddrinfo_all (host , portstr , & hint , & addrs );
432
- if (rc != 0 )
432
+ if (rc != 0 )
433
433
{
434
434
MTM_ELOG (LOG , "Arbiter failed to resolve host '%s' by name: %s" , host , gai_strerror (rc ));
435
435
return -1 ;
436
436
}
437
437
BIT_SET (busy_mask , node );
438
438
439
- Retry :
439
+ Retry :
440
440
441
- sd = socket (AF_INET , SOCK_STREAM , 0 );
441
+ sd = pg_socket (AF_INET , SOCK_STREAM , 0 , MtmUseRDMA );
442
442
if (sd < 0 ) {
443
443
MTM_ELOG (LOG , "Arbiter failed to create socket: %s" , strerror (errno ));
444
444
goto Error ;
445
445
}
446
- rc = fcntl (sd , F_SETFL , O_NONBLOCK );
446
+ rc = pg_fcntl (sd , F_SETFL , O_NONBLOCK , MtmUseRDMA );
447
447
if (rc < 0 ) {
448
448
MTM_ELOG (LOG , "Arbiter failed to switch socket to non-blocking mode: %s" , strerror (errno ));
449
449
goto Error ;
450
450
}
451
451
for (addr = addrs ; addr != NULL ; addr = addr -> ai_next )
452
452
{
453
453
do {
454
- rc = connect (sd , addr -> ai_addr , addr -> ai_addrlen );
454
+ rc = pg_connect (sd , addr -> ai_addr , addr -> ai_addrlen , MtmUseRDMA );
455
455
} while (rc < 0 && errno == EINTR );
456
456
457
457
if (rc >= 0 || errno == EINPROGRESS ) {
@@ -491,12 +491,12 @@ static int MtmConnectSocket(int node, int port)
491
491
strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
492
492
if (!MtmWriteSocket (sd , & req , sizeof req )) {
493
493
MTM_ELOG (WARNING , "Arbiter failed to send handshake message to %s:%d: %s" , host , port , strerror (errno ));
494
- close (sd );
494
+ pg_closesocket (sd , MtmUseRDMA );
495
495
goto Retry ;
496
496
}
497
497
if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
498
498
MTM_ELOG (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: %s" , host , port , strerror (errno ));
499
- close (sd );
499
+ pg_closesocket (sd , MtmUseRDMA );
500
500
goto Retry ;
501
501
}
502
502
if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
@@ -580,7 +580,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
580
580
if (sockets [node ] < 0 || !MtmWriteSocket (sockets [node ], buf , size )) {
581
581
if (sockets [node ] >= 0 ) {
582
582
MTM_ELOG (WARNING , "Arbiter fail to write to node %d: %s" , node + 1 , strerror (errno ));
583
- close (sockets [node ]);
583
+ pg_closesocket (sockets [node ], MtmUseRDMA );
584
584
sockets [node ] = -1 ;
585
585
}
586
586
sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
@@ -626,7 +626,7 @@ static void MtmAcceptOneConnection()
626
626
MTM_ELOG (WARNING , "Arbiter failed to handshake socket: %s" , strerror (errno ));
627
627
pg_closesocket (fd , MtmUseRDMA );
628
628
} else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
629
- MTM_ELOG (WARNING , "Arbiter get unexpected handshake message %d " , req . hdr . code );
629
+ MTM_ELOG (WARNING , "Arbiter failed to handshake socket: %s " , strerror ( errno ) );
630
630
pg_closesocket (fd , MtmUseRDMA );
631
631
} else {
632
632
int node = req .hdr .node - 1 ;
0 commit comments