@@ -91,21 +91,19 @@ static void MtmMonitor(Datum arg);
91
91
static void MtmSendHeartbeat (void );
92
92
static bool MtmSendToNode (int node , void const * buf , int size );
93
93
94
- /*
95
- static char const* const messageText[] =
94
+ static char const * const messageKindText [] =
96
95
{
97
96
"INVALID" ,
98
97
"HANDSHAKE" ,
99
- "READY",
100
- "PREPARE",
101
98
"PREPARED" ,
99
+ "PRECOMMIT" ,
100
+ "PRECOMMITTED" ,
102
101
"ABORTED" ,
103
102
"STATUS" ,
104
103
"HEARTBEAT" ,
105
104
"POLL_REQUEST" ,
106
105
"POLL_STATUS"
107
106
};
108
- */
109
107
110
108
static BackgroundWorker MtmSenderWorker = {
111
109
"mtm-sender" ,
@@ -364,7 +362,7 @@ static void MtmSendHeartbeat()
364
362
MTM_LOG2 ("Send heartbeat to node %d with timestamp %ld" , i + 1 , now );
365
363
}
366
364
} else {
367
- MTM_LOG1 ("Do not send heartbeat to node %d, busy mask %lld, status %d" , i + 1 , (long long ) busy_mask , Mtm -> status );
365
+ MTM_LOG2 ("Do not send heartbeat to node %d, busy mask %lld, status %d" , i + 1 , (long long ) busy_mask , Mtm -> status );
368
366
}
369
367
}
370
368
}
@@ -898,9 +896,14 @@ static void MtmReceiver(Datum arg)
898
896
msg -> status = TRANSACTION_STATUS_ABORTED ;
899
897
} else {
900
898
msg -> status = tm -> state -> status ;
899
+ msg -> csn = tm -> state -> csn ;
901
900
MTM_LOG1 ("Send response %d for transaction %s to node %d" , msg -> status , msg -> gid , msg -> node );
902
901
}
903
- msg -> code = MSG_POLL_STATUS ;
902
+ msg -> disabledNodeMask = Mtm -> disabledNodeMask ;
903
+ msg -> connectivityMask = Mtm -> connectivityMask ;
904
+ msg -> oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
905
+ msg -> code = MSG_POLL_STATUS ;
906
+ msg -> csn = ts -> csn ;
904
907
MtmSendMessage (msg );
905
908
continue ;
906
909
case MSG_POLL_STATUS :
@@ -911,41 +914,34 @@ static void MtmReceiver(Datum arg)
911
914
} else {
912
915
ts = tm -> state ;
913
916
BIT_SET (ts -> votedMask , node - 1 );
914
- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
915
- if (msg -> status == TRANSACTION_STATUS_UNKNOWN || msg -> status == TRANSACTION_STATUS_COMMITTED ) {
916
- elog (LOG , "Commit transaction %s because it is in state %d at node %d" ,
917
+ if (ts -> status == TRANSACTION_STATUS_UNKNOWN ) {
918
+ if (msg -> status == TRANSACTION_STATUS_IN_PROGRESS || msg -> status == TRANSACTION_STATUS_ABORTED ) {
919
+ elog (LOG , "Abort transaction %s because it is in state %d at node %d" ,
917
920
msg -> gid , ts -> status , node );
918
- Assert (!IsTransactionState ());
919
- StartTransactionCommand ();
920
- MtmSetCurrentTransactionGID (ts -> gid );
921
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
922
- FinishPreparedTransaction (ts -> gid , true);
923
- CommitTransactionCommand ();
924
- Assert (ts -> status == TRANSACTION_STATUS_COMMITTED );
925
- } else if (msg -> status == TRANSACTION_STATUS_ABORTED
926
- || ((ts -> participantsMask & ~Mtm -> disabledNodeMask ) & ~ts -> votedMask ) == 0 )
921
+ MtmFinishPreparedTransaction (node , ts , false);
922
+ }
923
+ else if (msg -> status == TRANSACTION_STATUS_COMMITTED || msg -> status == TRANSACTION_STATUS_UNKNOWN )
927
924
{
928
- if (msg -> status == TRANSACTION_STATUS_ABORTED ) {
929
- elog (LOG , "Abort transaction %s because it is aborted at node %d" , msg -> gid , node );
930
- } else {
931
- elog (LOG , "Abort transaction %s because it is not prepared at any online node" , msg -> gid );
925
+ if (msg -> csn > ts -> csn ) {
926
+ ts -> csn = msg -> csn ;
927
+ MtmSyncClock (ts -> csn );
928
+ }
929
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
930
+ elog (LOG , "Commit transaction %s because it is prepared at all live nodes" , msg -> gid );
931
+ MtmFinishPreparedTransaction (node , ts , true);
932
932
}
933
- Assert (!IsTransactionState ());
934
- StartTransactionCommand ();
935
- MtmSetCurrentTransactionGID (ts -> gid );
936
- FinishPreparedTransaction (ts -> gid , false);
937
- CommitTransactionCommand ();
938
- Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
939
933
} else {
940
934
elog (LOG , "Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx" ,
941
- msg -> status , msg -> gid , node , (long long ) ts -> votedMask ,
942
- (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
935
+ msg -> status , msg -> gid , node , (long long ) ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ));
943
936
continue ;
944
937
}
945
938
} else if (ts -> status == TRANSACTION_STATUS_ABORTED && msg -> status == TRANSACTION_STATUS_COMMITTED ) {
946
939
elog (WARNING , "Transaction %s is aborted at node %d but committed at node %d" , msg -> gid , MtmNodeId , node );
947
940
} else if (msg -> status == TRANSACTION_STATUS_ABORTED && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
948
941
elog (WARNING , "Transaction %s is committed at node %d but aborted at node %d" , msg -> gid , MtmNodeId , node );
942
+ } else {
943
+ elog (LOG , "Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx" ,
944
+ msg -> status , msg -> gid , ts -> status , node , (long long ) ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
949
945
}
950
946
}
951
947
continue ;
@@ -961,50 +957,49 @@ static void MtmReceiver(Datum arg)
961
957
elog (WARNING , "Ignore response for unexisted transaction %d from node %d" , msg -> dxid , node );
962
958
continue ;
963
959
}
960
+ if (BIT_CHECK (ts -> votedMask , node - 1 )) {
961
+ elog (WARNING , "Receive deteriorated %s response for transaction %d (%s) from node %d" ,
962
+ messageKindText [msg -> code ], ts -> xid , ts -> gid , node );
963
+ continue ;
964
+ }
964
965
MtmCheckResponse (msg );
965
-
966
+ BIT_SET (ts -> votedMask , node - 1 );
967
+
966
968
if (MtmIsCoordinator (ts )) {
967
969
switch (msg -> code ) {
968
- case MSG_READY :
969
- MTM_TXTRACE (ts , "MtmTransReceiver got MSG_READY " );
970
+ case MSG_PREPARED :
971
+ MTM_TXTRACE (ts , "MtmTransReceiver got MSG_PREPARED " );
970
972
if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
971
- elog (WARNING , "Receive READY response for already committed transaction %d from node %d" ,
973
+ elog (WARNING , "Receive PREPARED response for already committed transaction %d from node %d" ,
972
974
ts -> xid , node );
973
975
continue ;
974
976
}
975
- if (ts -> nVotes >= Mtm -> nLiveNodes ) {
976
- elog (WARNING , "Receive deteriorated READY response for transaction %d (%s) from node %d" ,
977
- ts -> xid , ts -> gid , node );
977
+ Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime () - ts -> csn ;
978
+ ts -> xids [node - 1 ] = msg -> sxid ;
979
+
980
+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
981
+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
982
+ commit on smaller subset of nodes */
983
+ elog (WARNING , "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
984
+ node , (long ) Mtm -> disabledNodeMask , (long ) msg -> disabledNodeMask );
978
985
MtmAbortTransaction (ts );
979
- MtmWakeUpBackend (ts );
980
- } else {
981
- Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime () - ts -> csn ;
982
- ts -> xids [node - 1 ] = msg -> sxid ;
983
-
984
- if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
985
- /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
986
- commit on smaller subset of nodes */
987
- elog (WARNING , "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
988
- node , (long ) Mtm -> disabledNodeMask , (long ) msg -> disabledNodeMask );
989
- MtmAbortTransaction (ts );
990
- }
991
-
992
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
993
- /* All nodes are finished their transactions */
994
- if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
995
- MtmWakeUpBackend (ts );
986
+ }
987
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
988
+ /* All nodes are finished their transactions */
989
+ if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
990
+ MtmWakeUpBackend (ts );
991
+ } else {
992
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
993
+ ts -> isPrepared = true;
994
+ if (ts -> isTwoPhase ) {
995
+ MtmWakeUpBackend (ts );
996
+ } else if (MtmUseDtm ) {
997
+ ts -> votedMask = 0 ;
998
+ MTM_TXTRACE (ts , "MtmTransReceiver send MSG_PRECOMMIT" );
999
+ MtmSend2PCMessage (ts , MSG_PRECOMMIT );
996
1000
} else {
997
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
998
- if (ts -> isTwoPhase ) {
999
- MtmWakeUpBackend (ts );
1000
- } else if (MtmUseDtm ) {
1001
- ts -> nVotes = 1 ; /* I voted myself */
1002
- MTM_TXTRACE (ts , "MtmTransReceiver send MSG_PREPARE" );
1003
- MtmSend2PCMessage (ts , MSG_PREPARE );
1004
- } else {
1005
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1006
- MtmWakeUpBackend (ts );
1007
- }
1001
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1002
+ MtmWakeUpBackend (ts );
1008
1003
}
1009
1004
}
1010
1005
}
@@ -1019,47 +1014,40 @@ static void MtmReceiver(Datum arg)
1019
1014
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1020
1015
MtmAbortTransaction (ts );
1021
1016
}
1022
- if (++ ts -> nVotes >= Mtm -> nLiveNodes ) {
1017
+ if (( ts -> participantsMask & ~ Mtm -> disabledNodeMask & ~ ts -> votedMask ) == 0 ) {
1023
1018
MtmWakeUpBackend (ts );
1024
1019
}
1025
1020
break ;
1026
- case MSG_PREPARED :
1027
- MTM_TXTRACE (ts , "MtmTransReceiver got MSG_PREPARED" );
1028
- if (ts -> nVotes >= Mtm -> nLiveNodes ) {
1029
- elog (WARNING , "Receive deteriorated PREPARED response for transaction %d (%s) from node %d" ,
1030
- ts -> xid , ts -> gid , node );
1031
- MtmAbortTransaction (ts );
1032
- MtmWakeUpBackend (ts );
1021
+ case MSG_PRECOMMITTED :
1022
+ MTM_TXTRACE (ts , "MtmTransReceiver got MSG_PRECOMMITTED" );
1023
+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1024
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1025
+ if (msg -> csn > ts -> csn ) {
1026
+ ts -> csn = msg -> csn ;
1027
+ MtmSyncClock (ts -> csn );
1028
+ }
1029
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
1030
+ ts -> csn = MtmAssignCSN ();
1031
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1032
+ MtmWakeUpBackend (ts );
1033
+ }
1033
1034
} else {
1034
- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1035
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1036
- if (msg -> csn > ts -> csn ) {
1037
- ts -> csn = msg -> csn ;
1038
- MtmSyncClock (ts -> csn );
1039
- }
1040
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1041
- ts -> csn = MtmAssignCSN ();
1042
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1043
- MtmWakeUpBackend (ts );
1044
- }
1045
- } else {
1046
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1047
- MtmWakeUpBackend (ts );
1048
- }
1049
- }
1050
- }
1035
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
1036
+ MtmWakeUpBackend (ts );
1037
+ }
1038
+ }
1051
1039
break ;
1052
1040
default :
1053
1041
Assert (false);
1054
1042
}
1055
1043
} else {
1056
1044
switch (msg -> code ) {
1057
- case MSG_PREPARE :
1045
+ case MSG_PRECOMMIT :
1058
1046
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
1059
1047
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1060
1048
ts -> csn = MtmAssignCSN ();
1061
1049
MtmAdjustSubtransactions (ts );
1062
- MtmSend2PCMessage (ts , MSG_PREPARED );
1050
+ MtmSend2PCMessage (ts , MSG_PRECOMMITTED );
1063
1051
} else {
1064
1052
Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
1065
1053
MtmSend2PCMessage (ts , MSG_ABORTED );
0 commit comments