74
74
75
75
#include "multimaster.h"
76
76
77
- #define MAX_ROUTES 16
78
- #define BUFFER_SIZE 1024
79
- #define HANDSHAKE_MAGIC 0xCAFEDEED
77
+ #define MAX_ROUTES 16
78
+ #define INIT_BUFFER_SIZE 1024
79
+ #define HANDSHAKE_MAGIC 0xCAFEDEED
80
80
81
81
typedef struct
82
82
{
@@ -98,7 +98,8 @@ typedef struct
98
98
typedef struct
99
99
{
100
100
int used ;
101
- MtmArbiterMessage data [BUFFER_SIZE ];
101
+ int size ;
102
+ MtmArbiterMessage * data ;
102
103
} MtmBuffer ;
103
104
104
105
static int * sockets ;
@@ -450,11 +451,14 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
450
451
}
451
452
452
453
/* Some node considered that I am dead, so switch to recovery mode */
454
+ MtmLock (LW_EXCLUSIVE );
453
455
if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
454
456
elog (WARNING , "Node %d thinks that I was dead" , resp .node );
455
457
BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
456
458
MtmSwitchClusterMode (MTM_RECOVERY );
457
459
}
460
+ MtmUnlock ();
461
+
458
462
return sd ;
459
463
}
460
464
@@ -491,7 +495,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
491
495
while (true) {
492
496
if (sockets [node ] >= 0 && BIT_CHECK (Mtm -> reconnectMask , node )) {
493
497
elog (WARNING , "Arbiter is forced to reconnect to node %d" , node + 1 );
498
+ MtmLock (LW_EXCLUSIVE );
494
499
BIT_CLEAR (Mtm -> reconnectMask , node );
500
+ MtmUnlock ();
495
501
close (sockets [node ]);
496
502
sockets [node ] = -1 ;
497
503
}
@@ -506,7 +512,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
506
512
MtmOnNodeDisconnect (node + 1 );
507
513
return false;
508
514
}
515
+ MtmLock (LW_EXCLUSIVE );
509
516
BIT_CLEAR (Mtm -> reconnectMask , node );
517
+ MtmUnlock ();
510
518
MTM_LOG3 ("Arbiter restablished connection with node %d" , node + 1 );
511
519
} else {
512
520
return true;
@@ -552,7 +560,6 @@ static void MtmAcceptOneConnection()
552
560
close (fd );
553
561
} else {
554
562
MTM_LOG1 ("Arbiter established connection with node %d" , req .hdr .node );
555
- BIT_CLEAR (Mtm -> connectivityMask , req .hdr .node - 1 );
556
563
MtmRegisterSocket (fd , req .hdr .node - 1 );
557
564
sockets [req .hdr .node - 1 ] = fd ;
558
565
MtmOnNodeConnect (req .hdr .node );
@@ -598,12 +605,9 @@ static void MtmAcceptIncomingConnections()
598
605
static void MtmAppendBuffer (MtmBuffer * txBuffer , TransactionId xid , int node , MtmTransState * ts )
599
606
{
600
607
MtmBuffer * buf = & txBuffer [node ];
601
- if (buf -> used == BUFFER_SIZE ) {
602
- if (!MtmSendToNode (node , buf -> data , buf -> used * sizeof (MtmArbiterMessage ))) {
603
- buf -> used = 0 ;
604
- return ;
605
- }
606
- buf -> used = 0 ;
608
+ if (buf -> used == buf -> size ) {
609
+ buf -> size = buf -> size ? buf -> size * 2 : INIT_BUFFER_SIZE ;
610
+ buf -> data = repalloc (buf -> data , buf -> size * sizeof (MtmArbiterMessage ));
607
611
}
608
612
buf -> data [buf -> used ].dxid = xid ;
609
613
@@ -640,7 +644,7 @@ static void MtmTransSender(Datum arg)
640
644
int nNodes = MtmMaxNodes ;
641
645
int i ;
642
646
643
- MtmBuffer * txBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
647
+ MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
644
648
645
649
InitializeTimeouts ();
646
650
@@ -655,10 +659,6 @@ static void MtmTransSender(Datum arg)
655
659
656
660
MtmOpenConnections ();
657
661
658
- for (i = 0 ; i < nNodes ; i ++ ) {
659
- txBuffer [i ].used = 0 ;
660
- }
661
-
662
662
while (!stop ) {
663
663
MtmTransState * ts ;
664
664
PGSemaphoreLock (& Mtm -> votingSemaphore );
@@ -723,7 +723,7 @@ static void MtmTransReceiver(Datum arg)
723
723
int nNodes = MtmMaxNodes ;
724
724
int nResponses ;
725
725
int i , j , n , rc ;
726
- MtmBuffer * rxBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
726
+ MtmBuffer * rxBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
727
727
timestamp_t lastHeartbeatCheck = MtmGetSystemTime ();
728
728
timestamp_t now ;
729
729
@@ -744,7 +744,8 @@ static void MtmTransReceiver(Datum arg)
744
744
MtmAcceptIncomingConnections ();
745
745
746
746
for (i = 0 ; i < nNodes ; i ++ ) {
747
- rxBuffer [i ].used = 0 ;
747
+ rxBuffer [i ].size = INIT_BUFFER_SIZE ;
748
+ rxBuffer [i ].data = palloc (INIT_BUFFER_SIZE * sizeof (MtmArbiterMessage ));
748
749
}
749
750
750
751
while (!stop ) {
@@ -788,7 +789,7 @@ static void MtmTransReceiver(Datum arg)
788
789
continue ;
789
790
}
790
791
791
- rc = MtmReadFromNode (i , (char * )rxBuffer [i ].data + rxBuffer [i ].used , BUFFER_SIZE - rxBuffer [i ].used );
792
+ rc = MtmReadFromNode (i , (char * )rxBuffer [i ].data + rxBuffer [i ].used , rxBuffer [ i ]. size - rxBuffer [i ].used );
792
793
if (rc <= 0 ) {
793
794
continue ;
794
795
}
0 commit comments