@@ -87,7 +87,6 @@ typedef struct
87
87
csn_t csn ; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
88
88
nodemask_t disabledNodeMask ; /* Bitmask of disabled nodes at the sender of message */
89
89
csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
90
- uint64 seqno ;/* Message sequence number (used to eliminate duplicated messages) */
91
90
} MtmArbiterMessage ;
92
91
93
92
typedef struct
@@ -446,7 +445,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
446
445
req .hdr .sxid = ShmemVariableCache -> nextXid ;
447
446
req .hdr .csn = MtmGetCurrentTime ();
448
447
req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
449
- req .hdr .seqno = Mtm -> nodes [node ].recvSeqNo ;
450
448
strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
451
449
if (!MtmWriteSocket (sd , & req , sizeof req )) {
452
450
elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
@@ -465,9 +463,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
465
463
}
466
464
467
465
MtmLock (LW_EXCLUSIVE );
468
- if (Mtm -> nodes [resp .node - 1 ].sendSeqNo < resp .seqno ) {
469
- Mtm -> nodes [resp .node - 1 ].sendSeqNo = resp .seqno ;
470
- }
471
466
472
467
/* Some node considered that I am dead, so switch to recovery mode */
473
468
if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
@@ -582,10 +577,6 @@ static void MtmAcceptOneConnection()
582
577
resp .sxid = ShmemVariableCache -> nextXid ;
583
578
resp .csn = MtmGetCurrentTime ();
584
579
resp .node = MtmNodeId ;
585
- resp .seqno = Mtm -> nodes [req .hdr .node - 1 ].recvSeqNo ;
586
- if (Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo < req .hdr .seqno ) {
587
- Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo = req .hdr .seqno ;
588
- }
589
580
MtmUpdateNodeConnectionInfo (& Mtm -> nodes [req .hdr .node - 1 ].con , req .connStr );
590
581
if (!MtmWriteSocket (fd , & resp , sizeof resp )) {
591
582
elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , resp .node );
@@ -651,7 +642,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
651
642
MTM_LOG3 ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d" ,
652
643
messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
653
644
Assert (ts -> cmd != MSG_INVALID );
654
- buf -> data [buf -> used ].seqno = ++ Mtm -> nodes [node ].sendSeqNo ;
655
645
buf -> data [buf -> used ].code = ts -> cmd ;
656
646
buf -> data [buf -> used ].sxid = ts -> xid ;
657
647
buf -> data [buf -> used ].csn = ts -> csn ;
@@ -868,12 +858,6 @@ static void MtmTransReceiver(Datum arg)
868
858
elog (WARNING , "Ignore message from dead node %d\n" , msg -> node );
869
859
continue ;
870
860
}
871
- if (msg -> seqno <= Mtm -> nodes [msg -> node - 1 ].recvSeqNo ) {
872
- elog (WARNING , "Ignore duplicated message %ld (<=%ld) from node %d" , msg -> seqno , Mtm -> nodes [msg -> node - 1 ].recvSeqNo , msg -> node );
873
- continue ;
874
- }
875
- Mtm -> nodes [msg -> node - 1 ].recvSeqNo = msg -> seqno ;
876
-
877
861
ts = (MtmTransState * )hash_search (MtmXid2State , & msg -> dxid , HASH_FIND , NULL );
878
862
if (ts == NULL ) {
879
863
elog (WARNING , "Ignore response for unexisted transaction %d from node %d" , msg -> dxid , msg -> node );
0 commit comments