56
56
#include "catalog/indexing.h"
57
57
#include "catalog/namespace.h"
58
58
#include "pglogical_output/hooks.h"
59
+ #include "parser/analyze.h"
60
+ #include "parser/parse_relation.h"
59
61
60
62
#include "multimaster.h"
61
63
#include "ddd.h"
@@ -147,6 +149,7 @@ static void MtmShmemStartup(void);
147
149
static BgwPool * MtmPoolConstructor (void );
148
150
static bool MtmRunUtilityStmt (PGconn * conn , char const * sql , char * * errmsg );
149
151
static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError );
152
+ static bool MtmProcessDDLCommand (char const * queryString );
150
153
151
154
MtmState * Mtm ;
152
155
@@ -175,7 +178,8 @@ static TransactionManager MtmTM = {
175
178
MtmGetTransactionStateSize ,
176
179
MtmSerializeTransactionState ,
177
180
MtmDeserializeTransactionState ,
178
- MtmInitializeSequence
181
+ // MtmInitializeSequence
182
+ PgInitializeSequence
179
183
};
180
184
181
185
char const * const MtmNodeStatusMnem [] =
@@ -207,6 +211,8 @@ int MtmHeartbeatRecvTimeout;
207
211
bool MtmUseRaftable ;
208
212
bool MtmUseDtm ;
209
213
214
+ // static int reset_wrokers = 0;
215
+
210
216
static char * MtmConnStrs ;
211
217
static int MtmQueueSize ;
212
218
static int MtmWorkers ;
@@ -228,8 +234,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
228
234
ProcessUtilityContext context , ParamListInfo params ,
229
235
DestReceiver * dest , char * completionTag );
230
236
231
- static StringInfo MtmGUCBuffer ;
232
- static bool MtmGUCBufferAllocated = false;
237
+ // static StringInfo MtmGUCBuffer;
238
+ // static bool MtmGUCBufferAllocated = false;
233
239
234
240
/*
235
241
* -------------------------------------------
@@ -614,7 +620,7 @@ MtmXactCallback(XactEvent event, void *arg)
614
620
{
615
621
switch (event )
616
622
{
617
- case XACT_EVENT_START :
623
+ case XACT_EVENT_START :
618
624
MtmBeginTransaction (& MtmTx );
619
625
break ;
620
626
case XACT_EVENT_PRE_PREPARE :
@@ -1159,8 +1165,8 @@ void MtmHandleApplyError(void)
1159
1165
case ERRCODE_OUT_OF_MEMORY :
1160
1166
elog (WARNING , "Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u" ,
1161
1167
edata -> sqlerrcode , edata -> message , getpid ());
1162
- MtmSwitchClusterMode (MTM_OUT_OF_SERVICE );
1163
- kill (PostmasterPid , SIGQUIT );
1168
+ // MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
1169
+ // kill(PostmasterPid, SIGQUIT);
1164
1170
break ;
1165
1171
}
1166
1172
FreeErrorData (edata );
@@ -2929,13 +2935,13 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2929
2935
{
2930
2936
if (conns [i ])
2931
2937
{
2932
- if (MtmGUCBufferAllocated && !MtmRunUtilityStmt (conns [i ], MtmGUCBuffer -> data , & utility_errmsg ) && !ignoreError )
2933
- {
2934
- errorMsg = "Failed to set GUC variables at node %d" ;
2935
- elog (WARNING , "%s" , utility_errmsg );
2936
- failedNode = i ;
2937
- break ;
2938
- }
2938
+ // if (MtmGUCBufferAllocated && !MtmRunUtilityStmt(conns[i], MtmGUCBuffer->data, &utility_errmsg) && !ignoreError)
2939
+ // {
2940
+ // errorMsg = "Failed to set GUC variables at node %d";
2941
+ // elog(WARNING, "%s", utility_errmsg);
2942
+ // failedNode = i;
2943
+ // break;
2944
+ // }
2939
2945
if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" , & utility_errmsg ) && !ignoreError )
2940
2946
{
2941
2947
errorMsg = "Failed to start transaction at node %d" ;
@@ -2999,7 +3005,7 @@ static bool MtmProcessDDLCommand(char const* queryString)
2999
3005
bool nulls [Natts_mtm_ddl_log ];
3000
3006
TimestampTz ts = GetCurrentTimestamp ();
3001
3007
3002
- rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME , MULTIMASTER_DDL_TABLE , -1 );
3008
+ rv = makeRangeVar ("public" , MULTIMASTER_DDL_TABLE , -1 );
3003
3009
rel = heap_openrv_extended (rv , RowExclusiveLock , true);
3004
3010
3005
3011
if (rel == NULL ) {
@@ -3132,18 +3138,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3132
3138
break ;
3133
3139
case T_DiscardStmt :
3134
3140
{
3135
- // DiscardStmt *stmt = (DiscardStmt *) parsetree;
3136
- // skipCommand = stmt->target == DISCARD_TEMP;
3141
+ DiscardStmt * stmt = (DiscardStmt * ) parsetree ;
3142
+ skipCommand = stmt -> target == DISCARD_TEMP ;
3137
3143
3138
- skipCommand = true;
3144
+ // skipCommand = true;
3139
3145
3140
- if (MtmGUCBufferAllocated )
3141
- {
3142
- // XXX: move allocation somewhere to backend startup and check
3143
- // where buffer is empty in send routines.
3144
- MtmGUCBufferAllocated = false;
3145
- pfree (MtmGUCBuffer );
3146
- }
3146
+ // if (MtmGUCBufferAllocated)
3147
+ // {
3148
+ // // XXX: move allocation somewhere to backend startup and check
3149
+ // // where buffer is empty in send routines.
3150
+ // MtmGUCBufferAllocated = false;
3151
+ // pfree(MtmGUCBuffer);
3152
+ // }
3147
3153
3148
3154
}
3149
3155
break ;
@@ -3155,22 +3161,31 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3155
3161
3156
3162
/* Prevent SET TRANSACTION from replication */
3157
3163
if (stmt -> kind == VAR_SET_MULTI )
3158
- break ;
3164
+ // break;
3165
+ skipCommand = true;
3159
3166
3160
- if (!MtmGUCBufferAllocated )
3161
- {
3162
- MemoryContext oldcontext ;
3167
+ // if (!MtmGUCBufferAllocated)
3168
+ // {
3169
+ // MemoryContext oldcontext;
3163
3170
3164
- oldcontext = MemoryContextSwitchTo (TopMemoryContext );
3165
- MtmGUCBuffer = makeStringInfo ();
3166
- MemoryContextSwitchTo (oldcontext );
3167
- MtmGUCBufferAllocated = true;
3168
- }
3171
+ // oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3172
+ // MtmGUCBuffer = makeStringInfo();
3173
+ // MemoryContextSwitchTo(oldcontext);
3174
+ // MtmGUCBufferAllocated = true;
3175
+ // }
3169
3176
3170
- appendStringInfoString (MtmGUCBuffer , queryString );
3177
+ // appendStringInfoString(MtmGUCBuffer, queryString);
3171
3178
3172
3179
// sometimes there is no ';' char at the end.
3173
- appendStringInfoString (MtmGUCBuffer , ";" );
3180
+ // appendStringInfoString(MtmGUCBuffer, ";");
3181
+ }
3182
+ break ;
3183
+ case T_CreateTableAsStmt :
3184
+ {
3185
+ /* Do not replicate temp tables */
3186
+ CreateTableAsStmt * stmt = (CreateTableAsStmt * ) parsetree ;
3187
+ skipCommand = stmt -> into -> rel -> relpersistence == RELPERSISTENCE_TEMP ||
3188
+ (stmt -> into -> rel -> schemaname && strcmp (stmt -> into -> rel -> schemaname , "pg_temp" ) == 0 );
3174
3189
}
3175
3190
break ;
3176
3191
case T_CreateStmt :
@@ -3181,6 +3196,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3181
3196
(stmt -> relation -> schemaname && strcmp (stmt -> relation -> schemaname , "pg_temp" ) == 0 );
3182
3197
}
3183
3198
break ;
3199
+ case T_ViewStmt :
3200
+ {
3201
+ ViewStmt * stmt = (ViewStmt * ) parsetree ;
3202
+ Query * viewParse ;
3203
+
3204
+ viewParse = parse_analyze ((Node * ) copyObject (stmt -> query ),
3205
+ queryString , NULL , 0 );
3206
+ skipCommand = isQueryUsingTempRelation (viewParse );
3207
+ // ||
3208
+ // (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
3209
+ }
3210
+ break ;
3184
3211
case T_IndexStmt :
3185
3212
{
3186
3213
Oid relid ;
@@ -3219,6 +3246,19 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3219
3246
heap_close (rel , ShareLock );
3220
3247
}
3221
3248
}
3249
+ else if (stmt -> removeType == OBJECT_INDEX )
3250
+ {
3251
+ RangeVar * rv = makeRangeVarFromNameList (
3252
+ (List * ) lfirst (list_head (stmt -> objects )));
3253
+ Oid relid = RelnameGetRelid (rv -> relname );
3254
+
3255
+ if (OidIsValid (relid ))
3256
+ {
3257
+ Relation irel = index_open (relid , ShareLock );
3258
+ skipCommand = irel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
3259
+ index_close (irel , ShareLock );
3260
+ }
3261
+ }
3222
3262
}
3223
3263
break ;
3224
3264
case T_CopyStmt :
0 commit comments