@@ -292,6 +292,43 @@ static void MtmSetSocketOptions(int sd)
292
292
#endif
293
293
}
294
294
295
+
296
+
297
+ static void MtmScheduleHeartbeat ()
298
+ {
299
+ send_heartbeat = true;
300
+ PGSemaphoreUnlock (& Mtm -> votingSemaphore );
301
+ }
302
+
303
+ static void MtmSendHeartbeat ()
304
+ {
305
+ int i ;
306
+ MtmArbiterMessage msg ;
307
+ msg .code = MSG_HEARTBEAT ;
308
+ msg .disabledNodeMask = Mtm -> disabledNodeMask ;
309
+ msg .oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
310
+ msg .node = MtmNodeId ;
311
+
312
+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
313
+ {
314
+ if (sockets [i ] >= 0 && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask , i ))
315
+ {
316
+ MtmWriteSocket (sockets [i ], & msg , sizeof (msg ));
317
+ }
318
+ }
319
+
320
+ }
321
+
322
+ static void MtmCheckHeartbeat ()
323
+ {
324
+ if (send_heartbeat ) {
325
+ send_heartbeat = false;
326
+ enable_timeout_after (heartbeat_timer , MtmHeartbeatSendTimeout );
327
+ MtmSendHeartbeat ();
328
+ }
329
+ }
330
+
331
+
295
332
static int MtmConnectSocket (char const * host , int port , int max_attempts )
296
333
{
297
334
struct sockaddr_in sock_inet ;
@@ -318,6 +355,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
318
355
memcpy (& sock_inet .sin_addr , & addrs [i ], sizeof sock_inet .sin_addr );
319
356
do {
320
357
rc = connect (sd , (struct sockaddr * )& sock_inet , sizeof (sock_inet ));
358
+ MtmCheckHeartbeat ();
321
359
} while (rc < 0 && errno == EINTR );
322
360
323
361
if (rc >= 0 || errno == EINPROGRESS ) {
@@ -331,7 +369,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
331
369
} else {
332
370
max_attempts -= 1 ;
333
371
elog (WARNING , "Arbiter trying to connect to %s:%d: error=%d" , host , port , errno );
334
- MtmSleep (5 * MtmConnectTimeout );
372
+ MtmSleep (MtmConnectTimeout );
335
373
}
336
374
continue ;
337
375
} else {
@@ -380,14 +418,15 @@ static void MtmOpenConnections()
380
418
381
419
sockets = (int * )palloc (sizeof (int )* nNodes );
382
420
421
+ for (i = 0 ; i < nNodes ; i ++ ) {
422
+ sockets [i ] = -1 ;
423
+ }
383
424
for (i = 0 ; i < nNodes ; i ++ ) {
384
425
if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
385
426
sockets [i ] = MtmConnectSocket (Mtm -> nodes [i ].con .hostName , MtmArbiterPort + i + 1 , MtmConnectAttempts );
386
427
if (sockets [i ] < 0 ) {
387
428
MtmOnNodeDisconnect (i + 1 );
388
429
}
389
- } else {
390
- sockets [i ] = -1 ;
391
430
}
392
431
}
393
432
if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
@@ -412,6 +451,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
412
451
if (sockets [node ] >= 0 ) {
413
452
elog (WARNING , "Arbiter failed to write to node %d: %d" , node + 1 , errno );
414
453
close (sockets [node ]);
454
+ sockets [node ] = -1 ;
415
455
}
416
456
sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ].con .hostName , MtmArbiterPort + node + 1 , MtmReconnectAttempts );
417
457
if (sockets [node ] < 0 ) {
@@ -518,17 +558,12 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
518
558
}
519
559
buf -> data [buf -> used ].dxid = xid ;
520
560
521
- if (ts != NULL ) {
522
- MTM_LOG3 ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d" ,
523
- messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
524
- Assert (ts -> cmd != MSG_INVALID );
525
- buf -> data [buf -> used ].code = ts -> cmd ;
526
- buf -> data [buf -> used ].sxid = ts -> xid ;
527
- buf -> data [buf -> used ].csn = ts -> csn ;
528
- } else {
529
- buf -> data [buf -> used ].code = MSG_HEARTBEAT ;
530
- MTM_LOG3 ("Send HEARTBEAT to node %d from node %d at %ld\n" , node + 1 , MtmNodeId , USEC_TO_MSEC (MtmGetSystemTime ()));
531
- }
561
+ MTM_LOG3 ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d" ,
562
+ messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
563
+ Assert (ts -> cmd != MSG_INVALID );
564
+ buf -> data [buf -> used ].code = ts -> cmd ;
565
+ buf -> data [buf -> used ].sxid = ts -> xid ;
566
+ buf -> data [buf -> used ].csn = ts -> csn ;
532
567
buf -> data [buf -> used ].node = MtmNodeId ;
533
568
buf -> data [buf -> used ].disabledNodeMask = Mtm -> disabledNodeMask ;
534
569
buf -> data [buf -> used ].oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
@@ -541,24 +576,15 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
541
576
int n = 1 ;
542
577
for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
543
578
{
544
- if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i )
545
- && (ts == NULL || TransactionIdIsValid (ts -> xids [i ])))
579
+ if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i ) && TransactionIdIsValid (ts -> xids [i ]))
546
580
{
547
- MtmAppendBuffer (txBuffer , ts ? ts -> xids [i ] : InvalidTransactionId , i , ts );
581
+ MtmAppendBuffer (txBuffer , ts -> xids [i ], i , ts );
548
582
n += 1 ;
549
583
}
550
584
}
551
585
Assert (n == Mtm -> nLiveNodes );
552
586
}
553
587
554
- static void MtmSendHeartbeat ()
555
- {
556
- send_heartbeat = true;
557
- PGSemaphoreUnlock (& Mtm -> votingSemaphore );
558
- //enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
559
- }
560
-
561
-
562
588
static void MtmTransSender (Datum arg )
563
589
{
564
590
sigset_t sset ;
@@ -575,7 +601,7 @@ static void MtmTransSender(Datum arg)
575
601
sigfillset (& sset );
576
602
sigprocmask (SIG_UNBLOCK , & sset , NULL );
577
603
578
- heartbeat_timer = RegisterTimeout (USER_TIMEOUT , MtmSendHeartbeat );
604
+ heartbeat_timer = RegisterTimeout (USER_TIMEOUT , MtmScheduleHeartbeat );
579
605
enable_timeout_after (heartbeat_timer , MtmHeartbeatSendTimeout );
580
606
581
607
MtmOpenConnections ();
@@ -589,11 +615,7 @@ static void MtmTransSender(Datum arg)
589
615
PGSemaphoreLock (& Mtm -> votingSemaphore );
590
616
CHECK_FOR_INTERRUPTS ();
591
617
592
- if (send_heartbeat ) {
593
- send_heartbeat = false;
594
- enable_timeout_after (heartbeat_timer , MtmHeartbeatSendTimeout );
595
- MtmBroadcastMessage (txBuffer , NULL );
596
- }
618
+ MtmCheckHeartbeat ();
597
619
/*
598
620
* Use shared lock to improve locality,
599
621
* because all other process modifying this list are using exclusive lock
@@ -676,7 +698,7 @@ static void MtmTransReceiver(Datum arg)
676
698
677
699
while (!stop ) {
678
700
#if USE_EPOLL
679
- n = epoll_wait (epollfd , events , nNodes , MtmKeepaliveTimeout / 1000 );
701
+ n = epoll_wait (epollfd , events , nNodes , USEC_TO_MSEC ( MtmKeepaliveTimeout ) );
680
702
if (n < 0 ) {
681
703
if (errno == EINTR ) {
682
704
continue ;
0 commit comments