@@ -320,6 +320,11 @@ static void MtmSetSocketOptions(int sd)
320
320
321
321
static void MtmCheckResponse (MtmArbiterMessage * resp )
322
322
{
323
+ if (resp -> lockReq ) {
324
+ BIT_SET (Mtm -> globalLockerMask , resp -> node - 1 );
325
+ } else {
326
+ BIT_CLEAR (Mtm -> globalLockerMask , resp -> node - 1 );
327
+ }
323
328
if (BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 )
324
329
&& !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )
325
330
&& Mtm -> status != MTM_RECOVERY
@@ -358,6 +363,7 @@ static void MtmSendHeartbeat()
358
363
msg .disabledNodeMask = Mtm -> disabledNodeMask ;
359
364
msg .connectivityMask = SELF_CONNECTIVITY_MASK ;
360
365
msg .oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
366
+ msg .lockReq = Mtm -> nodeLockerMask != 0 ;
361
367
msg .node = MtmNodeId ;
362
368
msg .csn = now ;
363
369
if (last_sent_heartbeat != 0 && last_sent_heartbeat + MSEC_TO_USEC (MtmHeartbeatSendTimeout )* 2 < now ) {
@@ -1073,6 +1079,7 @@ static void MtmReceiver(Datum arg)
1073
1079
StartTransactionCommand ();
1074
1080
SetPreparedTransactionState (ts -> gid , MULTIMASTER_PRECOMMITTED );
1075
1081
CommitTransactionCommand ();
1082
+ Assert (!MtmTransIsActive ());
1076
1083
MtmLock (LW_EXCLUSIVE );
1077
1084
} else {
1078
1085
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
@@ -1088,7 +1095,7 @@ static void MtmReceiver(Datum arg)
1088
1095
continue ;
1089
1096
}
1090
1097
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1091
- MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu)" , ts -> gid , (long64 )ts -> xid );
1098
+ MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d " , ts -> gid , (long64 )ts -> xid , node );
1092
1099
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1093
1100
MtmAbortTransaction (ts );
1094
1101
}
0 commit comments