@@ -126,7 +126,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId *subxids, int
126
126
static void MtmShmemStartup (void );
127
127
128
128
static BgwPool * MtmPoolConstructor (void );
129
- static bool MtmRunUtilityStmt (PGconn * conn , char const * sql );
129
+ static bool MtmRunUtilityStmt (PGconn * conn , char const * sql , char * * errmsg );
130
130
static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError );
131
131
132
132
MtmState * Mtm ;
@@ -1791,14 +1791,24 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
1791
1791
/*
1792
1792
* Execute statement with specified parameters and check its result
1793
1793
*/
1794
- static bool MtmRunUtilityStmt (PGconn * conn , char const * sql )
1794
+ static bool MtmRunUtilityStmt (PGconn * conn , char const * sql , char * * errmsg )
1795
1795
{
1796
1796
PGresult * result = PQexec (conn , sql );
1797
1797
int status = PQresultStatus (result );
1798
+ char * errstr ;
1799
+
1798
1800
bool ret = status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK ;
1799
- if (!ret ) {
1800
- elog (WARNING , "Command '%s' failed with status %d" , sql , status );
1801
+
1802
+ if (!ret ) {
1803
+ char * errstr = PQresultErrorMessage (result );
1804
+ int errlen = strlen (errstr );
1805
+
1806
+ * errmsg = palloc0 (errlen );
1807
+
1808
+ /* Strip "ERROR:\t" from beginning and "\n" from end of error string */
1809
+ strncpy (* errmsg , errstr + 7 , errlen - 1 - 7 );
1801
1810
}
1811
+
1802
1812
PQclear (result );
1803
1813
return ret ;
1804
1814
}
@@ -1812,6 +1822,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
1812
1822
int failedNode = -1 ;
1813
1823
char const * errorMsg = NULL ;
1814
1824
PGconn * * conns = palloc0 (sizeof (PGconn * )* MtmNodes );
1825
+ char * utility_errmsg ;
1815
1826
1816
1827
while (conn_str < conn_str_end )
1817
1828
{
@@ -1847,15 +1858,18 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
1847
1858
{
1848
1859
if (conns [i ])
1849
1860
{
1850
- if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" ) && !ignoreError )
1861
+ if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" , & utility_errmsg ) && !ignoreError )
1851
1862
{
1852
1863
errorMsg = "Failed to start transaction at node %d" ;
1853
1864
failedNode = i ;
1854
1865
break ;
1855
1866
}
1856
- if (!MtmRunUtilityStmt (conns [i ], sql ) && !ignoreError )
1867
+ if (!MtmRunUtilityStmt (conns [i ], sql , & utility_errmsg ) && !ignoreError )
1857
1868
{
1858
- errorMsg = "Failed to run command at node %d" ;
1869
+ // errorMsg = "Failed to run command at node %d";
1870
+ // XXX: add check for our node
1871
+ errorMsg = utility_errmsg ;
1872
+
1859
1873
failedNode = i ;
1860
1874
break ;
1861
1875
}
@@ -1867,13 +1881,13 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
1867
1881
{
1868
1882
if (conns [i ])
1869
1883
{
1870
- MtmRunUtilityStmt (conns [i ], "ROLLBACK TRANSACTION" );
1884
+ MtmRunUtilityStmt (conns [i ], "ROLLBACK TRANSACTION" , & utility_errmsg );
1871
1885
}
1872
1886
}
1873
1887
} else {
1874
1888
for (i = 0 ; i < MtmNodes ; i ++ )
1875
1889
{
1876
- if (conns [i ] && !MtmRunUtilityStmt (conns [i ], "COMMIT TRANSACTION" ) && !ignoreError )
1890
+ if (conns [i ] && !MtmRunUtilityStmt (conns [i ], "COMMIT TRANSACTION" , & utility_errmsg ) && !ignoreError )
1877
1891
{
1878
1892
errorMsg = "Commit failed at node %d" ;
1879
1893
failedNode = i ;
@@ -1955,8 +1969,8 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
1955
1969
{
1956
1970
if (x -> isDistributed && x -> containsDML ) {
1957
1971
MtmGenerateGid (x -> gid );
1958
- if (!IsTransactionBlock () ) {
1959
- elog (WARNING , "Start transaction block for %d " , x -> xid );
1972
+ if (!x -> isTransactionBlock ) {
1973
+ /* elog(WARNING, "Start transaction block for %s ", x->gid); */
1960
1974
BeginTransactionBlock ();
1961
1975
CommitTransactionCommand ();
1962
1976
StartTransactionCommand ();
@@ -2048,6 +2062,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2048
2062
case T_ReindexStmt :
2049
2063
skipCommand = true;
2050
2064
break ;
2065
+ case T_CreateStmt :
2066
+ {
2067
+ /* Do not replicate temp tables */
2068
+ CreateStmt * stmt = (CreateStmt * ) parsetree ;
2069
+ skipCommand = stmt -> relation -> relpersistence == RELPERSISTENCE_TEMP ;
2070
+ }
2071
+ break ;
2051
2072
default :
2052
2073
skipCommand = false;
2053
2074
break ;
0 commit comments