@@ -552,6 +552,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
552
552
Assert (!found );
553
553
sts -> status = ts -> status ;
554
554
sts -> csn = ts -> csn ;
555
+ sts -> votingCompleted = true;
555
556
MtmTransactionListInsertAfter (ts , sts );
556
557
}
557
558
}
@@ -744,7 +745,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
744
745
if (!MtmIsCoordinator (ts ) || Mtm -> status == MTM_RECOVERY ) {
745
746
MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
746
747
Assert (x -> gid [0 ]);
747
- tm -> state = ts ;
748
+ tm -> state = ts ;
749
+ ts -> votingCompleted = true;
748
750
if (Mtm -> status != MTM_RECOVERY ) {
749
751
MtmSendNotificationMessage (ts , MSG_READY ); /* send notification to coordinator */
750
752
} else {
@@ -776,9 +778,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
776
778
MtmLock (LW_EXCLUSIVE );
777
779
tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_REMOVE , NULL );
778
780
Assert (tm != NULL );
779
- tm -> state -> status = TRANSACTION_STATUS_ABORTED ;
780
- MtmAdjustSubtransactions (tm -> state );
781
- Mtm -> nActiveTransactions -= 1 ;
781
+ MtmAbortTransaction (tm -> state );
782
782
MtmUnlock ();
783
783
x -> status = TRANSACTION_STATUS_ABORTED ;
784
784
}
@@ -834,6 +834,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
834
834
ts -> gtid = x -> gtid ;
835
835
ts -> nSubxids = 0 ;
836
836
ts -> cmd = MSG_INVALID ;
837
+ ts -> votingCompleted = true;
837
838
MtmTransactionListAppend (ts );
838
839
}
839
840
MtmSendNotificationMessage (ts , MSG_ABORTED ); /* send notification to coordinator */
@@ -936,6 +937,20 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
936
937
return csn ;
937
938
}
938
939
940
+ void MtmWakeUpBackend (MtmTransState * ts )
941
+ {
942
+ MTM_TRACE ("Wakeup backed procno=%d, pid=%d\n" , ts -> procno , ProcGlobal -> allProcs [ts -> procno ].pid );
943
+ ts -> votingCompleted = true;
944
+ SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
945
+ }
946
+
947
+ void MtmAbortTransaction (MtmTransState * ts )
948
+ {
949
+ ts -> status = TRANSACTION_STATUS_ABORTED ;
950
+ MtmAdjustSubtransactions (ts );
951
+ Mtm -> nActiveTransactions -= 1 ;
952
+ }
953
+
939
954
/*
940
955
* -------------------------------------------
941
956
* HA functions
@@ -1212,9 +1227,10 @@ void MtmCheckQuorum(void)
1212
1227
}
1213
1228
}
1214
1229
1215
-
1216
1230
void MtmOnNodeDisconnect (int nodeId )
1217
- {
1231
+ {
1232
+ MtmTransState * ts ;
1233
+
1218
1234
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1219
1235
BIT_SET (Mtm -> reconnectMask , nodeId - 1 );
1220
1236
RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
@@ -1228,6 +1244,16 @@ void MtmOnNodeDisconnect(int nodeId)
1228
1244
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1229
1245
Mtm -> nNodes -= 1 ;
1230
1246
MtmCheckQuorum ();
1247
+ /* Interrupt voting for active transaction and abort them */
1248
+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
1249
+ if (!ts -> votingCompleted ) {
1250
+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1251
+ elog (WARNING , "Rollback active transaction %d:%d" , ts -> gtid .node , ts -> gtid .xid );
1252
+ MtmAbortTransaction (ts );
1253
+ }
1254
+ MtmWakeUpBackend (ts );
1255
+ }
1256
+ }
1231
1257
}
1232
1258
MtmUnlock ();
1233
1259
}
0 commit comments