81
81
typedef struct
82
82
{
83
83
MtmMessageCode code ; /* Message code: MSG_READY, MSG_PREPARE, MSG_COMMIT, MSG_ABORT */
84
- int node ; /* Sender node ID */
84
+ int node ; /* Sender node ID */
85
85
TransactionId dxid ; /* Transaction ID at destination node */
86
86
TransactionId sxid ; /* Transaction ID at sender node */
87
87
csn_t csn ; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
88
88
nodemask_t disabledNodeMask ; /* Bitmask of disabled nodes at the sender of message */
89
89
csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
90
+ uint64 seqno ;/* Message sequence number (used to eliminate duplicated messages) */
90
91
} MtmArbiterMessage ;
91
92
92
93
typedef struct
@@ -112,6 +113,7 @@ static int busy_socket;
112
113
static void MtmTransSender (Datum arg );
113
114
static void MtmTransReceiver (Datum arg );
114
115
static void MtmSendHeartbeat (void );
116
+ static bool MtmSendToNode (int node , void const * buf , int size );
115
117
116
118
117
119
static char const * const messageText [] =
@@ -248,6 +250,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
248
250
if (rc == 1 ) {
249
251
int n = send (sd , src , size , 0 );
250
252
if (n < 0 ) {
253
+ Assert (errno != EINTR ); /* should not happen in non-blocking call */
251
254
busy_socket = -1 ;
252
255
return false;
253
256
}
@@ -266,6 +269,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
266
269
{
267
270
int rc = recv (sd , buf , buf_size , 0 );
268
271
if (rc <= 0 ) {
272
+ Assert (errno != EINTR ); /* should not happen in non-blocking call */
269
273
return -1 ;
270
274
}
271
275
return rc ;
@@ -346,9 +350,8 @@ static void MtmSendHeartbeat()
346
350
{
347
351
if (sockets [i ] >= 0 && sockets [i ] != busy_socket && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask , i ))
348
352
{
349
- size_t rc = send (sockets [i ], & msg , sizeof (msg ), 0 );
350
- if ((size_t )rc != sizeof (msg )) {
351
- elog (LOG , "Failed to send heartbeat to node %d: %d" , i + 1 , errno );
353
+ if (!MtmSendToNode (i , & msg , sizeof (msg ))) {
354
+ elog (LOG , "Arbiter failed to send heartbeat to node %d" , i + 1 );
352
355
}
353
356
}
354
357
}
@@ -629,6 +632,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
629
632
MTM_LOG3 ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d" ,
630
633
messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
631
634
Assert (ts -> cmd != MSG_INVALID );
635
+ buf -> data [buf -> used ].seqno = ++ Mtm -> nodes [node ].sendSeqNo ;
632
636
buf -> data [buf -> used ].code = ts -> cmd ;
633
637
buf -> data [buf -> used ].sxid = ts -> xid ;
634
638
buf -> data [buf -> used ].csn = ts -> csn ;
@@ -845,10 +849,17 @@ static void MtmTransReceiver(Datum arg)
845
849
elog (WARNING , "Ignore message from dead node %d\n" , msg -> node );
846
850
continue ;
847
851
}
852
+ if (msg -> seqno <= Mtm -> nodes [msg -> node - 1 ].recvSeqNo ) {
853
+ elog (WARNING , "Ignore duplicated message %ld from node %d" , msg -> seqno , msg -> node );
854
+ continue ;
855
+ }
856
+ Mtm -> nodes [msg -> node - 1 ].recvSeqNo = msg -> seqno ;
848
857
849
858
ts = (MtmTransState * )hash_search (MtmXid2State , & msg -> dxid , HASH_FIND , NULL );
850
- Assert (ts != NULL );
851
-
859
+ if (ts == NULL ) {
860
+ elog (WARNING , "Ignore response for unexisted transaction %d from node %d" , msg -> dxid , msg -> node );
861
+ continue ;
862
+ }
852
863
if (BIT_CHECK (msg -> disabledNodeMask , MtmNodeId - 1 ) && Mtm -> status != MTM_RECOVERY ) {
853
864
elog (PANIC , "Node %d thinks that I was dead: perform hara-kiri not to be a zombie" , msg -> node );
854
865
}
0 commit comments