73
73
typedef struct {
74
74
TransactionId xid ; /* local transaction ID */
75
75
GlobalTransactionId gtid ; /* global transaction ID assigned by coordinator of transaction */
76
+ bool isTwoPhase ; /* user level 2PC */
76
77
bool isReplicated ; /* transaction on replica */
77
78
bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
78
79
bool isPrepared ; /* transaction is perpared at first stage of 2PC */
@@ -719,6 +720,7 @@ MtmResetTransaction()
719
720
x -> gtid .xid = InvalidTransactionId ;
720
721
x -> isDistributed = false;
721
722
x -> isPrepared = false;
723
+ x -> isTwoPhase = false;
722
724
x -> status = TRANSACTION_STATUS_UNKNOWN ;
723
725
}
724
726
@@ -746,6 +748,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
746
748
x -> isReplicated = MtmIsLogicalReceiver ;
747
749
x -> isDistributed = MtmIsUserTransaction ();
748
750
x -> isPrepared = false;
751
+ x -> isTwoPhase = false;
749
752
x -> isTransactionBlock = IsTransactionBlock ();
750
753
/* Application name can be changed usnig PGAPPNAME environment variable */
751
754
if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0 ) {
@@ -906,8 +909,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
906
909
Assert (ts != NULL );
907
910
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
908
911
if (!MtmIsCoordinator (ts ) || Mtm -> status == MTM_RECOVERY ) {
909
- bool found ;
910
- MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , & found );
912
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
911
913
Assert (x -> gid [0 ]);
912
914
tm -> state = ts ;
913
915
ts -> votingCompleted = true;
@@ -925,8 +927,13 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
925
927
time_t transTimeout = Max (MSEC_TO_USEC (Mtm2PCMinTimeout ), (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100 );
926
928
int result = 0 ;
927
929
int nConfigChanges = Mtm -> nConfigChanges ;
928
-
929
930
timestamp_t start = MtmGetSystemTime ();
931
+
932
+ if (x -> isTwoPhase ) {
933
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
934
+ tm -> state = ts ;
935
+ }
936
+
930
937
/* Wait votes from all nodes until: */
931
938
while (!ts -> votingCompleted /* all nodes voted */
932
939
&& nConfigChanges == Mtm -> nConfigChanges /* configarion is changed */
@@ -982,7 +989,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
982
989
MtmLock (LW_EXCLUSIVE );
983
990
tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_REMOVE , NULL );
984
991
if (tm == NULL ) {
985
- elog (WARNING , "Global transaciton ID %s is not found" , x -> gid );
992
+ elog (WARNING , "Global transaciton ID '%s' is not found" , x -> gid );
986
993
} else {
987
994
Assert (tm -> state != NULL );
988
995
MTM_LOG1 ("Abort prepared transaction %d with gid='%s'" , x -> xid , x -> gid );
@@ -1265,7 +1272,7 @@ void MtmAbortTransaction(MtmTransState* ts)
1265
1272
Assert (MtmLockCount != 0 ); /* should be invoked with exclsuive lock */
1266
1273
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1267
1274
if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
1268
- elog (WARNING , "Attempt to rollback already committed transaction %d (%s)" , ts -> xid , ts -> gid );
1275
+ elog (LOG , "Attempt to rollback already committed transaction %d (%s)" , ts -> xid , ts -> gid );
1269
1276
} else {
1270
1277
MTM_LOG1 ("Rollback active transaction %d:%d (local xid %d) status %d" , ts -> gtid .node , ts -> gtid .xid , ts -> xid , ts -> status );
1271
1278
ts -> status = TRANSACTION_STATUS_ABORTED ;
@@ -3798,11 +3805,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3798
3805
}
3799
3806
break ;
3800
3807
case TRANS_STMT_PREPARE :
3801
- elog (ERROR , "Two phase commit is not supported by multimaster" );
3802
- break ;
3803
3808
case TRANS_STMT_COMMIT_PREPARED :
3804
3809
case TRANS_STMT_ROLLBACK_PREPARED :
3805
- skipCommand = true;
3810
+ MtmTx .isTwoPhase = true;
3811
+ strcpy (MtmTx .gid , stmt -> gid );
3806
3812
break ;
3807
3813
default :
3808
3814
break ;
@@ -3960,8 +3966,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3960
3966
standard_ProcessUtility (parsetree , queryString , context ,
3961
3967
params , dest , completionTag );
3962
3968
}
3963
-
3964
- if (MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ && ! MtmVolksWagenMode ) {
3969
+
3970
+ if (! MtmVolksWagenMode && MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ ) {
3965
3971
elog (ERROR , "Isolation level %s is not supported by multimaster" , isoLevelStr [XactIsoLevel ]);
3966
3972
}
3967
3973
@@ -4147,7 +4153,7 @@ MtmDetectGlobalDeadLockFortXid(TransactionId xid)
4147
4153
}
4148
4154
MtmGetGtid (xid , & gtid );
4149
4155
hasDeadlock = MtmGraphFindLoop (& graph , & gtid );
4150
- elog (WARNING , "Distributed deadlock check by backend %d for %u:%u = %d" , MyProcPid , gtid .node , gtid .xid , hasDeadlock );
4156
+ elog (LOG , "Distributed deadlock check by backend %d for %u:%u = %d" , MyProcPid , gtid .node , gtid .xid , hasDeadlock );
4151
4157
if (!hasDeadlock ) {
4152
4158
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
4153
4159
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
0 commit comments