@@ -458,7 +458,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
458
458
MTM_TRACE ("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n" ,
459
459
messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
460
460
Assert (ts -> cmd != MSG_INVALID );
461
- buf -> data [buf -> used ].code = ts -> cmd ;
461
+ buf -> data [buf -> used ].code = ts -> status == TRANSACTION_STATUS_ABORTED ? MSG_ABORTED : MSG_PREPARED ;
462
462
buf -> data [buf -> used ].dxid = xid ;
463
463
buf -> data [buf -> used ].sxid = ts -> xid ;
464
464
buf -> data [buf -> used ].csn = ts -> csn ;
@@ -467,21 +467,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
467
467
buf -> used += 1 ;
468
468
}
469
469
470
- static void MtmBroadcastMessage (MtmBuffer * txBuffer , MtmTransState * ts )
471
- {
472
- int i ;
473
- int n = 1 ;
474
- for (i = 0 ; i < MtmNodes ; i ++ )
475
- {
476
- if (TransactionIdIsValid (ts -> xids [i ])) {
477
- Assert (i + 1 != MtmNodeId );
478
- MtmAppendBuffer (txBuffer , ts -> xids [i ], i , ts );
479
- n += 1 ;
480
- }
481
- }
482
- Assert (n == ds -> nNodes );
483
- }
484
-
485
470
static void MtmTransSender (Datum arg )
486
471
{
487
472
int nNodes = MtmNodes ;
@@ -508,12 +493,7 @@ static void MtmTransSender(Datum arg)
508
493
MtmLock (LW_SHARED );
509
494
510
495
for (ts = ds -> votingTransactions ; ts != NULL ; ts = ts -> nextVoting ) {
511
- if (MtmIsCoordinator (ts )) {
512
- MtmBroadcastMessage (txBuffer , ts );
513
- } else {
514
- MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
515
- }
516
- ts -> cmd = MSG_INVALID ;
496
+ MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
517
497
}
518
498
ds -> votingTransactions = NULL ;
519
499
MtmUnlock ();
@@ -634,109 +614,31 @@ static void MtmTransReceiver(Datum arg)
634
614
MtmArbiterMessage * msg = & rxBuffer [i ].data [j ];
635
615
MtmTransState * ts = (MtmTransState * )hash_search (xid2state , & msg -> dxid , HASH_FIND , NULL );
636
616
Assert (ts != NULL );
637
- Assert (ts -> cmd == MSG_INVALID );
638
617
Assert (msg -> node > 0 && msg -> node <= nNodes && msg -> node != MtmNodeId );
639
- ts -> xids [msg -> node - 1 ] = msg -> sxid ;
640
-
641
- if (MtmIsCoordinator (ts )) {
642
- switch (msg -> code ) {
643
- case MSG_READY :
644
- Assert (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
645
- Assert (ts -> nVotes < ds -> nNodes );
646
- ds -> nodeTransDelay [msg -> node - 1 ] += MtmGetCurrentTime () - ts -> csn ;
647
- if (++ ts -> nVotes == ds -> nNodes ) {
648
- /* All nodes are finished their transactions */
649
- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
650
- ts -> nVotes = 1 ; /* I voted myself */
651
- ts -> cmd = MSG_PREPARE ;
652
- } else {
653
- ts -> status = TRANSACTION_STATUS_ABORTED ;
654
- ts -> cmd = MSG_ABORT ;
655
- MtmAdjustSubtransactions (ts );
656
- MtmWakeUpBackend (ts );
657
- }
658
- MtmSendNotificationMessage (ts );
659
- }
660
- break ;
618
+ Assert (MtmIsCoordinator (ts ));
619
+ switch (msg -> code ) {
661
620
case MSG_PREPARED :
662
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
663
- Assert (ts -> nVotes < ds -> nNodes );
664
- if (msg -> csn > ts -> csn ) {
665
- ts -> csn = msg -> csn ;
666
- MtmSyncClock (ts -> csn );
667
- }
668
- if (++ ts -> nVotes == ds -> nNodes ) {
669
- /* ts->csn is maximum of CSNs at all nodes */
670
- ts -> nVotes = 1 ; /* I voted myself */
671
- ts -> cmd = MSG_COMMIT ;
672
- ts -> csn = MtmAssignCSN ();
673
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
674
- MtmAdjustSubtransactions (ts );
675
- MtmSendNotificationMessage (ts );
676
- }
677
- break ;
678
- case MSG_COMMITTED :
679
- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
680
- Assert (ts -> nVotes < ds -> nNodes );
681
- if (++ ts -> nVotes == ds -> nNodes ) {
682
- /* All nodes have the same CSN */
683
- MtmWakeUpBackend (ts );
621
+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
622
+ if (msg -> csn > ts -> csn ) {
623
+ ts -> csn = msg -> csn ;
624
+ MtmSyncClock (ts -> csn );
625
+ }
626
+ if (++ ts -> nVotes == ds -> nNodes ) {
627
+ MtmWakeUpBackend (ts );
628
+ }
684
629
}
685
630
break ;
686
631
case MSG_ABORTED :
687
- Assert (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
688
- Assert (ts -> nVotes < ds -> nNodes );
689
- ts -> status = TRANSACTION_STATUS_ABORTED ;
690
- if (++ ts -> nVotes == ds -> nNodes ) {
691
- ts -> cmd = MSG_ABORT ;
692
- MtmAdjustSubtransactions (ts );
693
- MtmSendNotificationMessage (ts );
694
- MtmWakeUpBackend (ts );
695
- }
696
- break ;
697
- default :
698
- Assert (false);
699
- }
700
- } else { /* replica */
701
- switch (msg -> code ) {
702
- case MSG_PREPARE :
703
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
704
- if ((msg -> disabledNodeMask & ~ds -> disabledNodeMask ) != 0 ) {
705
- /* Coordinator's disabled mask is wider than my: so reject such transaction to avoid
706
- commit on smaller subset of nodes */
632
+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
707
633
ts -> status = TRANSACTION_STATUS_ABORTED ;
708
- ts -> cmd = MSG_ABORT ;
709
- MtmAdjustSubtransactions (ts );
710
- MtmWakeUpBackend (ts );
711
- } else {
712
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
713
- ts -> csn = MtmAssignCSN ();
714
- ts -> cmd = MSG_PREPARED ;
715
- }
716
- MtmSendNotificationMessage (ts );
717
- break ;
718
- case MSG_COMMIT :
719
- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
720
- Assert (ts -> csn < msg -> csn );
721
- ts -> csn = msg -> csn ;
722
- MtmSyncClock (ts -> csn );
723
- ts -> cmd = MSG_COMMITTED ;
724
- MtmAdjustSubtransactions (ts );
725
- MtmSendNotificationMessage (ts );
726
- MtmWakeUpBackend (ts );
727
- break ;
728
- case MSG_ABORT :
729
- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
730
- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
731
- ts -> status = TRANSACTION_STATUS_ABORTED ;
732
634
MtmAdjustSubtransactions (ts );
733
635
MtmWakeUpBackend (ts );
734
636
}
735
637
break ;
736
638
default :
737
639
Assert (false);
738
640
}
739
- }
641
+ }
740
642
}
741
643
MtmUnlock ();
742
644
0 commit comments