@@ -180,8 +180,7 @@ static TransactionManager MtmTM = {
180
180
MtmGetTransactionStateSize ,
181
181
MtmSerializeTransactionState ,
182
182
MtmDeserializeTransactionState ,
183
- // MtmInitializeSequence
184
- PgInitializeSequence
183
+ MtmInitializeSequence
185
184
};
186
185
187
186
char const * const MtmNodeStatusMnem [] =
@@ -198,7 +197,6 @@ char const* const MtmNodeStatusMnem[] =
198
197
bool MtmDoReplication ;
199
198
char * MtmDatabaseName ;
200
199
char * MtmDatabaseUser ;
201
- char * MtmUtilityStmt = NULL ;
202
200
203
201
int MtmNodes ;
204
202
int MtmNodeId ;
@@ -214,6 +212,7 @@ int MtmHeartbeatSendTimeout;
214
212
int MtmHeartbeatRecvTimeout ;
215
213
bool MtmUseRaftable ;
216
214
bool MtmUseDtm ;
215
+ bool MtmVolksWagenMode ;
217
216
218
217
static char * MtmConnStrs ;
219
218
static int MtmQueueSize ;
@@ -236,9 +235,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
236
235
ProcessUtilityContext context , ParamListInfo params ,
237
236
DestReceiver * dest , char * completionTag );
238
237
239
- // static StringInfo MtmGUCBuffer;
240
- // static bool MtmGUCBufferAllocated = false;
241
-
242
238
/*
243
239
* -------------------------------------------
244
240
* Synchronize access to MTM structures.
@@ -365,8 +361,16 @@ MtmDeserializeTransactionState(void* ctx)
365
361
static void
366
362
MtmInitializeSequence (int64 * start , int64 * step )
367
363
{
368
- * start = MtmNodeId ;
369
- * step = MtmMaxNodes ;
364
+ if (MtmVolksWagenMode )
365
+ {
366
+ * start = 1 ;
367
+ * step = 1 ;
368
+ }
369
+ else
370
+ {
371
+ * start = MtmNodeId ;
372
+ * step = MtmMaxNodes ;
373
+ }
370
374
}
371
375
372
376
@@ -683,10 +687,6 @@ static const char* const isoLevelStr[] =
683
687
static void
684
688
MtmBeginTransaction (MtmCurrentTrans * x )
685
689
{
686
- if (MtmUtilityStmt )
687
- pfree (MtmUtilityStmt );
688
- MtmUtilityStmt = NULL ;
689
-
690
690
if (x -> snapshot == INVALID_CSN ) {
691
691
TransactionId xmin = (Mtm -> gcCount >= MtmGcPeriod ) ? PgGetOldestXmin (NULL , false) : InvalidTransactionId ; /* Get oldest xmin outside critical section */
692
692
@@ -2070,6 +2070,19 @@ _PG_init(void)
2070
2070
NULL
2071
2071
);
2072
2072
2073
+ DefineCustomBoolVariable (
2074
+ "multimaster.volkswagen_mode" ,
2075
+ "Pretend to be normal postgres. This means skip some NOTICE's and use local sequences. Default false." ,
2076
+ NULL ,
2077
+ & MtmVolksWagenMode ,
2078
+ false,
2079
+ PGC_BACKEND ,
2080
+ 0 ,
2081
+ NULL ,
2082
+ NULL ,
2083
+ NULL
2084
+ );
2085
+
2073
2086
DefineCustomIntVariable (
2074
2087
"multimaster.workers" ,
2075
2088
"Number of multimaster executor workers per node" ,
@@ -2936,13 +2949,6 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2936
2949
{
2937
2950
if (conns [i ])
2938
2951
{
2939
- // if (MtmGUCBufferAllocated && !MtmRunUtilityStmt(conns[i], MtmGUCBuffer->data, &utility_errmsg) && !ignoreError)
2940
- // {
2941
- // errorMsg = "Failed to set GUC variables at node %d";
2942
- // elog(WARNING, "%s", utility_errmsg);
2943
- // failedNode = i;
2944
- // break;
2945
- // }
2946
2952
if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" , & utility_errmsg ) && !ignoreError )
2947
2953
{
2948
2954
errorMsg = "Failed to start transaction at node %d" ;
@@ -2996,38 +3002,6 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2996
3002
}
2997
3003
}
2998
3004
2999
- // static void MtmGUCBufferAppend(const char *gucQueryString){
3000
-
3001
- // if (!MtmGUCBufferAllocated)
3002
- // {
3003
- // MemoryContext oldcontext;
3004
- // oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3005
- // MtmGUCBuffer = makeStringInfo();
3006
- // MemoryContextSwitchTo(oldcontext);
3007
- // MtmGUCBufferAllocated = true;
3008
- // appendStringInfoString(MtmGUCBuffer, "RESET SESSION AUTHORIZATION; reset all;");
3009
- // }
3010
-
3011
- // appendStringInfoString(MtmGUCBuffer, gucQueryString);
3012
- // /* sometimes there is no ';' char at the end. */
3013
- // // appendStringInfoString(MtmGUCBuffer, ";");
3014
- // }
3015
-
3016
- // static char * MtmGUCBufferGet(void){
3017
- // if (!MtmGUCBufferAllocated)
3018
- // MtmGUCBufferAppend("");
3019
- // return MtmGUCBuffer->data;
3020
- // }
3021
-
3022
- // static void MtmGUCBufferClear(void)
3023
- // {
3024
- // if (MtmGUCBufferAllocated)
3025
- // {
3026
- // resetStringInfo(MtmGUCBuffer);
3027
- // MtmGUCBufferAppend("");
3028
- // }
3029
- // }
3030
-
3031
3005
/*
3032
3006
* Genenerate global transaction identifier for two-pahse commit.
3033
3007
* It should be unique for all nodes
@@ -3045,11 +3019,12 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
3045
3019
{
3046
3020
/*
3047
3021
* XXX: this tx anyway goes to subscribers later, but without
3048
- * surrounding begin/commit. Probably there is more clever way
3049
- * to do that.
3022
+ * surrounding begin/commit. Now it will be filtered out on receiver side.
3023
+ * Probably there is more clever way to do that.
3050
3024
*/
3051
3025
x -> isDistributed = false;
3052
- x -> csn = NULL ;
3026
+ if (!MtmVolksWagenMode )
3027
+ elog (NOTICE , "MTM: Transaction was not replicated as it accesed temporary relation" );
3053
3028
return false;
3054
3029
}
3055
3030
@@ -3247,8 +3222,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3247
3222
{
3248
3223
bool skipCommand = false;
3249
3224
3250
- // skipCommand = MyXactAccessedTempRel;
3251
-
3252
3225
MTM_LOG3 ("%d: Process utility statement %s" , MyProcPid , queryString );
3253
3226
switch (nodeTag (parsetree ))
3254
3227
{
@@ -3304,19 +3277,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3304
3277
skipCommand = true;
3305
3278
break ;
3306
3279
3307
- // /* Do not skip following unless temp object was accessed */
3308
- // case T_CreateTableAsStmt:
3309
- // case T_CreateStmt:
3310
- // case T_ViewStmt:
3311
- // case T_IndexStmt:
3312
- // case T_DropStmt:
3313
- // break;
3314
-
3315
3280
/* Save GUC context for consequent DDL execution */
3316
3281
case T_DiscardStmt :
3317
3282
{
3318
3283
DiscardStmt * stmt = (DiscardStmt * ) parsetree ;
3319
- skipCommand = stmt -> target == DISCARD_TEMP ; // XXX
3320
3284
3321
3285
if (!IsTransactionBlock ())
3322
3286
{
0 commit comments