@@ -104,9 +104,13 @@ static int* sockets;
104
104
static int gateway ;
105
105
static bool send_heartbeat ;
106
106
static TimeoutId heartbeat_timer ;
107
+ static int busy_socket ;
107
108
108
109
static void MtmTransSender (Datum arg );
109
110
static void MtmTransReceiver (Datum arg );
111
+ static void MtmSendHeartbeat (void );
112
+ static void MtmCheckHeartbeat (void );
113
+
110
114
111
115
112
116
static char const * const messageText [] =
@@ -218,17 +222,41 @@ static void MtmDisconnect(int node)
218
222
MtmOnNodeDisconnect (node + 1 );
219
223
}
220
224
225
+ static int MtmWaitWriteSocket (int sd , time_t timeoutMsec )
226
+ {
227
+ struct timeval tv ;
228
+ fd_set out_set ;
229
+ int rc ;
230
+ tv .tv_sec = timeoutMsec /1000 ;
231
+ tv .tv_usec = timeoutMsec %1000 * 1000 ;
232
+ FD_ZERO (& out_set );
233
+ FD_SET (sd , & out_set );
234
+ do {
235
+ MtmCheckHeartbeat ();
236
+ } while ((rc = select (sd + 1 , NULL , & out_set , NULL , & tv )) < 0 && errno == EINTR );
237
+ return rc ;
238
+ }
239
+
221
240
static bool MtmWriteSocket (int sd , void const * buf , int size )
222
241
{
223
242
char * src = (char * )buf ;
243
+ busy_socket = sd ;
224
244
while (size != 0 ) {
225
- int n = send (sd , src , size , 0 );
226
- if (n <= 0 ) {
245
+ int rc = MtmWaitWriteSocket (sd , MtmHeartbeatSendTimeout );
246
+ if (rc == 1 ) {
247
+ int n = send (sd , src , size , 0 );
248
+ if (n < 0 ) {
249
+ busy_socket = -1 ;
250
+ return false;
251
+ }
252
+ size -= n ;
253
+ src += n ;
254
+ } else if (rc < 0 ) {
255
+ busy_socket = -1 ;
227
256
return false;
228
- }
229
- size -= n ;
230
- src += n ;
257
+ }
231
258
}
259
+ busy_socket = -1 ;
232
260
return true;
233
261
}
234
262
@@ -311,9 +339,10 @@ static void MtmSendHeartbeat()
311
339
312
340
for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
313
341
{
314
- if (sockets [i ] >= 0 && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask , i ))
342
+ if (sockets [i ] >= 0 && sockets [ i ] != busy_socket && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask , i ))
315
343
{
316
- MtmWriteSocket (sockets [i ], & msg , sizeof (msg ));
344
+ int rc = send (sockets [i ], & msg , sizeof (msg ), 0 );
345
+ Assert (rc <= 0 || (size_t )rc == sizeof (msg ));
317
346
}
318
347
}
319
348
@@ -327,13 +356,15 @@ static void MtmCheckHeartbeat()
327
356
MtmSendHeartbeat ();
328
357
}
329
358
}
330
-
359
+
331
360
332
361
static int MtmConnectSocket (char const * host , int port , int max_attempts )
333
362
{
334
363
struct sockaddr_in sock_inet ;
335
364
unsigned addrs [MAX_ROUTES ];
336
365
unsigned i , n_addrs = sizeof (addrs ) / sizeof (addrs [0 ]);
366
+ MtmHandshakeMessage req ;
367
+ MtmArbiterMessage resp ;
337
368
int sd ;
338
369
339
370
sock_inet .sin_family = AF_INET ;
@@ -347,67 +378,80 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
347
378
while (1 ) {
348
379
int rc = -1 ;
349
380
350
- sd = socket (AF_INET , SOCK_STREAM , 0 );
381
+ sd = socket (AF_INET , SOCK_STREAM | SOCK_NONBLOCK , 0 );
351
382
if (sd < 0 ) {
352
383
elog (ERROR , "Arbiter failed to create socket: %d" , errno );
353
384
}
385
+ busy_socket = sd ;
354
386
for (i = 0 ; i < n_addrs ; ++ i ) {
355
387
memcpy (& sock_inet .sin_addr , & addrs [i ], sizeof sock_inet .sin_addr );
356
388
do {
357
389
rc = connect (sd , (struct sockaddr * )& sock_inet , sizeof (sock_inet ));
358
- MtmCheckHeartbeat ();
359
390
} while (rc < 0 && errno == EINTR );
360
391
361
392
if (rc >= 0 || errno == EINPROGRESS ) {
362
393
break ;
363
394
}
364
395
}
365
- if (rc < 0 ) {
366
- if ((errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS ) || max_attempts == 0 ) {
367
- elog (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
368
- return -1 ;
369
- } else {
370
- max_attempts -= 1 ;
371
- elog (WARNING , "Arbiter trying to connect to %s:%d: error=%d" , host , port , errno );
372
- MtmSleep (MtmConnectTimeout );
373
- }
374
- continue ;
396
+ if (rc == 0 ) {
397
+ break ;
398
+ }
399
+ if (errno != EINPROGRESS || max_attempts == 0 ) {
400
+ elog (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
401
+ busy_socket = -1 ;
402
+ return -1 ;
375
403
} else {
376
- MtmHandshakeMessage req ;
377
- MtmArbiterMessage resp ;
378
- MtmSetSocketOptions (sd );
379
- req .hdr .code = MSG_HANDSHAKE ;
380
- req .hdr .node = MtmNodeId ;
381
- req .hdr .dxid = HANDSHAKE_MAGIC ;
382
- req .hdr .sxid = ShmemVariableCache -> nextXid ;
383
- req .hdr .csn = MtmGetCurrentTime ();
384
- req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
385
- strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
386
- if (!MtmWriteSocket (sd , & req , sizeof req )) {
387
- elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
388
- close (sd );
389
- goto Retry ;
390
- }
391
- if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
392
- elog (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d" , host , port , errno );
393
- close (sd );
394
- goto Retry ;
395
- }
396
- if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
397
- elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , resp .code , host , port );
398
- close (sd );
399
- goto Retry ;
400
- }
401
-
402
- /* Some node considered that I am dead, so switch to recovery mode */
403
- if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
404
- elog (WARNING , "Node %d thinks that I was dead" , resp .node );
405
- BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
406
- MtmSwitchClusterMode (MTM_RECOVERY );
404
+ rc = MtmWaitWriteSocket (sd , MtmConnectTimeout );
405
+ if (rc == 1 ) {
406
+ socklen_t optlen = sizeof (int );
407
+ if (getsockopt (sd , SOL_SOCKET , SO_ERROR , (void * )& rc , & optlen ) < 0 ) {
408
+ elog (WARNING , "Arbiter failed to getsockopt for %s:%d: error=%d" , host , port , errno );
409
+ busy_socket = -1 ;
410
+ return -1 ;
411
+ }
412
+ if (rc == 0 ) {
413
+ break ;
414
+ } else {
415
+ elog (WARNING , "Arbiter trying to connect to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
416
+ }
417
+ } else {
418
+ elog (WARNING , "Arbiter waiting socket to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
407
419
}
408
- return sd ;
420
+ max_attempts -= 1 ;
421
+ MtmSleep (MSEC_TO_USEC (MtmConnectTimeout ));
409
422
}
410
- }
423
+ }
424
+ MtmSetSocketOptions (sd );
425
+ req .hdr .code = MSG_HANDSHAKE ;
426
+ req .hdr .node = MtmNodeId ;
427
+ req .hdr .dxid = HANDSHAKE_MAGIC ;
428
+ req .hdr .sxid = ShmemVariableCache -> nextXid ;
429
+ req .hdr .csn = MtmGetCurrentTime ();
430
+ req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
431
+ strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
432
+ if (!MtmWriteSocket (sd , & req , sizeof req )) {
433
+ elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
434
+ close (sd );
435
+ goto Retry ;
436
+ }
437
+ if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
438
+ elog (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d" , host , port , errno );
439
+ close (sd );
440
+ goto Retry ;
441
+ }
442
+ if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
443
+ elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , resp .code , host , port );
444
+ close (sd );
445
+ goto Retry ;
446
+ }
447
+
448
+ /* Some node considered that I am dead, so switch to recovery mode */
449
+ if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
450
+ elog (WARNING , "Node %d thinks that I was dead" , resp .node );
451
+ BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
452
+ MtmSwitchClusterMode (MTM_RECOVERY );
453
+ }
454
+ return sd ;
411
455
}
412
456
413
457
0 commit comments