@@ -111,6 +111,7 @@ static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
111
111
static void MtmPostPrepareTransaction (MtmCurrentTrans * x );
112
112
static void MtmAbortPreparedTransaction (MtmCurrentTrans * x );
113
113
static void MtmEndTransaction (MtmCurrentTrans * x , bool commit );
114
+ static bool MtmTwoPhaseCommit (MtmCurrentTrans * x );
114
115
static TransactionId MtmGetOldestXmin (Relation rel , bool ignoreVacuum );
115
116
static bool MtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot );
116
117
static TransactionId MtmAdjustOldestXid (TransactionId xid );
@@ -588,6 +589,11 @@ MtmXactCallback(XactEvent event, void *arg)
588
589
case XACT_EVENT_ABORT :
589
590
MtmEndTransaction (& MtmTx , false);
590
591
break ;
592
+ case XACT_EVENT_COMMIT_COMMAND :
593
+ if (!IsTransactionBlock ()) {
594
+ MtmTwoPhaseCommit (& MtmTx );
595
+ }
596
+ break ;
591
597
default :
592
598
break ;
593
599
}
@@ -1922,33 +1928,33 @@ MtmGenerateGid(char* gid)
1922
1928
sprintf (gid , "MTM-%d-%d-%d" , MtmNodeId , MyProcPid , ++ localCount );
1923
1929
}
1924
1930
1925
- static void MtmTwoPhaseCommit (char * completionTag )
1931
+ static bool MtmTwoPhaseCommit (MtmCurrentTrans * x )
1926
1932
{
1927
- MtmGenerateGid (MtmTx .gid );
1928
- if (!IsTransactionBlock ()) {
1929
- elog (WARNING , "Start transaction block for %d" , MtmTx .xid );
1930
- BeginTransactionBlock ();
1931
- CommitTransactionCommand ();
1932
- StartTransactionCommand ();
1933
- }
1934
- if (!PrepareTransactionBlock (MtmTx .gid ))
1935
- {
1936
- elog (WARNING , "Failed to prepare transaction %s" , MtmTx .gid );
1937
- /* report unsuccessful commit in completionTag */
1938
- if (completionTag ) {
1939
- strcpy (completionTag , "ROLLBACK" );
1933
+ if (x -> isDistributed && x -> containsDML ) {
1934
+ MtmGenerateGid (x -> gid );
1935
+ if (!IsTransactionBlock ()) {
1936
+ elog (WARNING , "Start transaction block for %d" , x -> xid );
1937
+ BeginTransactionBlock ();
1938
+ CommitTransactionCommand ();
1939
+ StartTransactionCommand ();
1940
1940
}
1941
- /* ??? Should we do explicit rollback */
1942
- } else {
1943
- CommitTransactionCommand ();
1944
- StartTransactionCommand ();
1945
- if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
1946
- FinishPreparedTransaction (MtmTx .gid , false);
1947
- elog (ERROR , "Transaction %s is aborted by DTM" , MtmTx .gid );
1948
- } else {
1949
- FinishPreparedTransaction (MtmTx .gid , true);
1941
+ if (!PrepareTransactionBlock (x -> gid ))
1942
+ {
1943
+ elog (WARNING , "Failed to prepare transaction %s" , x -> gid );
1944
+ /* ??? Should we do explicit rollback */
1945
+ } else {
1946
+ CommitTransactionCommand ();
1947
+ StartTransactionCommand ();
1948
+ if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
1949
+ FinishPreparedTransaction (x -> gid , false);
1950
+ elog (ERROR , "Transaction %s is aborted by DTM" , x -> gid );
1951
+ } else {
1952
+ FinishPreparedTransaction (x -> gid , true);
1953
+ }
1950
1954
}
1955
+ return true;
1951
1956
}
1957
+ return false;
1952
1958
}
1953
1959
1954
1960
static void MtmProcessUtility (Node * parsetree , const char * queryString ,
@@ -1965,8 +1971,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
1965
1971
switch (stmt -> kind )
1966
1972
{
1967
1973
case TRANS_STMT_COMMIT :
1968
- if (MtmTx .isDistributed && MtmTx .containsDML ) {
1969
- MtmTwoPhaseCommit (completionTag );
1974
+ if (MtmTwoPhaseCommit (& MtmTx )) {
1970
1975
return ;
1971
1976
}
1972
1977
break ;
@@ -2002,9 +2007,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2002
2007
if (MtmProcessDDLCommand (queryString )) {
2003
2008
return ;
2004
2009
}
2005
- if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
2006
- MtmTwoPhaseCommit (completionTag );
2007
- }
2008
2010
}
2009
2011
if (PreviousProcessUtilityHook != NULL )
2010
2012
{
@@ -2035,7 +2037,7 @@ MtmExecutorFinish(QueryDesc *queryDesc)
2035
2037
}
2036
2038
}
2037
2039
if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
2038
- MtmTwoPhaseCommit (NULL );
2040
+ MtmTwoPhaseCommit (& MtmTx );
2039
2041
}
2040
2042
}
2041
2043
if (PreviousExecutorFinishHook != NULL )
0 commit comments