Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit cb33add

Browse files
committed
be slightly more clever about guc variables
1 parent b3bbebf commit cb33add

File tree

3 files changed

+210
-73
lines changed

3 files changed

+210
-73
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS 'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16-
CREATE FUNCTION mtm.add_node(conn_str cstring) RETURNS void
17-
AS 'MODULE_PATHNAME','mtm_add_node'
18-
LANGUAGE C;
16+
-- -- XXX: cstring as an argument breaks sanity check
17+
-- CREATE FUNCTION mtm.add_node(conn_str cstring) RETURNS void
18+
-- AS 'MODULE_PATHNAME','mtm_add_node'
19+
-- LANGUAGE C;
1920

2021
-- Create replication slot for the node which was previously dropped together with it's slot
2122
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
@@ -69,7 +70,7 @@ CREATE FUNCTION mtm.inject_2pc_error(stage integer) RETURNS void
6970
AS 'MODULE_PATHNAME','mtm_inject_2pc_error'
7071
LANGUAGE C;
7172

72-
CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
73+
-- CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
7374

74-
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
75+
-- CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
7576

contrib/mmts/multimaster.c

Lines changed: 203 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
235235
ProcessUtilityContext context, ParamListInfo params,
236236
DestReceiver *dest, char *completionTag);
237237

238-
static StringInfo MtmGUCBuffer;
239-
static bool MtmGUCBufferAllocated = false;
238+
// static StringInfo MtmGUCBuffer;
239+
// static bool MtmGUCBufferAllocated = false;
240240

241241
/*
242242
* -------------------------------------------
@@ -2983,63 +2983,37 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
29832983
}
29842984
}
29852985

2986-
static void MtmGUCBufferAppend(const char *gucQueryString){
2986+
// static void MtmGUCBufferAppend(const char *gucQueryString){
29872987

2988-
if (!MtmGUCBufferAllocated)
2989-
{
2990-
MemoryContext oldcontext;
2991-
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
2992-
MtmGUCBuffer = makeStringInfo();
2993-
MemoryContextSwitchTo(oldcontext);
2994-
MtmGUCBufferAllocated = true;
2995-
appendStringInfoString(MtmGUCBuffer, "RESET SESSION AUTHORIZATION; reset all;");
2996-
}
2997-
2998-
appendStringInfoString(MtmGUCBuffer, gucQueryString);
2999-
/* sometimes there is no ';' char at the end. */
3000-
// appendStringInfoString(MtmGUCBuffer, ";");
3001-
}
3002-
3003-
static char * MtmGUCBufferGet(void){
3004-
if (!MtmGUCBufferAllocated)
3005-
MtmGUCBufferAppend("");
3006-
return MtmGUCBuffer->data;
3007-
}
3008-
3009-
static void MtmGUCBufferClear(void)
3010-
{
3011-
if (MtmGUCBufferAllocated)
3012-
{
3013-
resetStringInfo(MtmGUCBuffer);
3014-
MtmGUCBufferAppend("");
3015-
}
3016-
}
3017-
3018-
static bool MtmProcessDDLCommand(char const* queryString)
3019-
{
3020-
char *queryWithContext;
3021-
char *gucContext;
2988+
// if (!MtmGUCBufferAllocated)
2989+
// {
2990+
// MemoryContext oldcontext;
2991+
// oldcontext = MemoryContextSwitchTo(TopMemoryContext);
2992+
// MtmGUCBuffer = makeStringInfo();
2993+
// MemoryContextSwitchTo(oldcontext);
2994+
// MtmGUCBufferAllocated = true;
2995+
// appendStringInfoString(MtmGUCBuffer, "RESET SESSION AUTHORIZATION; reset all;");
2996+
// }
30222997

3023-
/* Append global GUC to utility stmt. */
3024-
gucContext = MtmGUCBufferGet();
3025-
if (gucContext)
3026-
{
3027-
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3028-
strcpy(queryWithContext, gucContext);
3029-
strcat(queryWithContext, queryString);
3030-
}
3031-
else
3032-
{
3033-
queryWithContext = (char *) queryString;
3034-
}
3035-
3036-
MTM_LOG1("Sending utility: %s", queryWithContext);
3037-
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
2998+
// appendStringInfoString(MtmGUCBuffer, gucQueryString);
2999+
// /* sometimes there is no ';' char at the end. */
3000+
// // appendStringInfoString(MtmGUCBuffer, ";");
3001+
// }
30383002

3039-
MtmTx.containsDML = true;
3040-
return false;
3041-
}
3003+
// static char * MtmGUCBufferGet(void){
3004+
// if (!MtmGUCBufferAllocated)
3005+
// MtmGUCBufferAppend("");
3006+
// return MtmGUCBuffer->data;
3007+
// }
30423008

3009+
// static void MtmGUCBufferClear(void)
3010+
// {
3011+
// if (MtmGUCBufferAllocated)
3012+
// {
3013+
// resetStringInfo(MtmGUCBuffer);
3014+
// MtmGUCBufferAppend("");
3015+
// }
3016+
// }
30433017

30443018
/*
30453019
* Genenerate global transaction identifier for two-pahse commit.
@@ -3093,6 +3067,167 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
30933067
return false;
30943068
}
30953069

3070+
3071+
/*
3072+
* -------------------------------------------
3073+
* GUC Context Handling
3074+
* -------------------------------------------
3075+
*/
3076+
3077+
// XXX: is it defined somewhere?
3078+
#define GUC_KEY_MAXLEN 255
3079+
3080+
#define MTM_GUC_HASHSIZE 20
3081+
3082+
typedef struct MtmGucHashEntry
3083+
{
3084+
char key[GUC_KEY_MAXLEN];
3085+
char *value;
3086+
} MtmGucHashEntry;
3087+
3088+
static HTAB *MtmGucHash = NULL;
3089+
3090+
static void MtmGucHashInit(void)
3091+
{
3092+
HASHCTL hash_ctl;
3093+
3094+
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
3095+
hash_ctl.keysize = GUC_KEY_MAXLEN;
3096+
hash_ctl.entrysize = sizeof(MtmGucHashEntry);
3097+
hash_ctl.hcxt = TopMemoryContext;
3098+
MtmGucHash = hash_create("MtmGucHash",
3099+
MTM_GUC_HASHSIZE,
3100+
&hash_ctl,
3101+
HASH_ELEM | HASH_CONTEXT);
3102+
}
3103+
3104+
static void MtmGucSet(VariableSetStmt *stmt, const char *queryStr)
3105+
{
3106+
MemoryContext oldcontext;
3107+
MtmGucHashEntry *hentry;
3108+
bool found;
3109+
char *key;
3110+
3111+
if (!MtmGucHash)
3112+
MtmGucHashInit();
3113+
3114+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3115+
3116+
switch (stmt->kind)
3117+
{
3118+
case VAR_SET_VALUE:
3119+
case VAR_SET_DEFAULT:
3120+
case VAR_SET_CURRENT:
3121+
{
3122+
char *value;
3123+
3124+
key = pstrdup(stmt->name);
3125+
hash_search(MtmGucHash, key, HASH_FIND, &found);
3126+
value = ExtractSetVariableArgs(stmt);
3127+
3128+
fprintf(stderr, ":MtmGucSet: %s -> %s\n", key, value);
3129+
3130+
if (value)
3131+
{
3132+
hentry = (MtmGucHashEntry *) hash_search(MtmGucHash, key,
3133+
HASH_ENTER, &found);
3134+
3135+
// if (found)
3136+
// pfree(hentry->value);
3137+
3138+
hentry->value = palloc(strlen(value) + 1);
3139+
strcpy(hentry->value, value);
3140+
}
3141+
else if (found)
3142+
{
3143+
/* That was SET TO DEFAULT and we already had some value */
3144+
hash_search(MtmGucHash, key, HASH_REMOVE, NULL);
3145+
}
3146+
}
3147+
break;
3148+
3149+
case VAR_RESET:
3150+
{
3151+
key = pstrdup(stmt->name);
3152+
hash_search(MtmGucHash, key, HASH_REMOVE, NULL);
3153+
}
3154+
break;
3155+
case VAR_RESET_ALL:
3156+
break;
3157+
3158+
case VAR_SET_MULTI:
3159+
break;
3160+
}
3161+
3162+
MemoryContextSwitchTo(oldcontext);
3163+
}
3164+
3165+
static void MtmGucDiscard(DiscardStmt *stmt)
3166+
{
3167+
3168+
}
3169+
3170+
static void MtmGucClear(void)
3171+
{
3172+
3173+
}
3174+
3175+
static char * MtmGucSerialize(void)
3176+
{
3177+
HASH_SEQ_STATUS status;
3178+
MtmGucHashEntry *hentry;
3179+
StringInfo serialized_gucs;
3180+
3181+
serialized_gucs = makeStringInfo();
3182+
appendStringInfoString(serialized_gucs, "RESET SESSION AUTHORIZATION; reset all; ");
3183+
3184+
if (MtmGucHash)
3185+
{
3186+
hash_seq_init(&status, MtmGucHash);
3187+
while ((hentry = (MtmGucHashEntry *) hash_seq_search(&status)) != NULL)
3188+
{
3189+
appendStringInfoString(serialized_gucs, "SET ");
3190+
appendStringInfoString(serialized_gucs, hentry->key);
3191+
appendStringInfoString(serialized_gucs, " TO ");
3192+
appendStringInfoString(serialized_gucs, hentry->value);
3193+
appendStringInfoString(serialized_gucs, "; ");
3194+
}
3195+
}
3196+
3197+
return serialized_gucs->data;
3198+
}
3199+
3200+
/*
3201+
* -------------------------------------------
3202+
* DDL Handling
3203+
* -------------------------------------------
3204+
*/
3205+
3206+
static bool MtmProcessDDLCommand(char const* queryString)
3207+
{
3208+
char *queryWithContext;
3209+
char *gucContext;
3210+
3211+
/* Append global GUC to utility stmt. */
3212+
gucContext = MtmGucSerialize();
3213+
if (gucContext)
3214+
{
3215+
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3216+
strcpy(queryWithContext, gucContext);
3217+
strcat(queryWithContext, queryString);
3218+
}
3219+
else
3220+
{
3221+
queryWithContext = (char *) queryString;
3222+
}
3223+
3224+
MTM_LOG1("Sending utility: %s", queryWithContext);
3225+
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
3226+
3227+
MtmTx.containsDML = true;
3228+
return false;
3229+
}
3230+
30963231
static void MtmProcessUtility(Node *parsetree, const char *queryString,
30973232
ProcessUtilityContext context, ParamListInfo params,
30983233
DestReceiver *dest, char *completionTag)
@@ -3155,24 +3290,24 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31553290
skipCommand = true;
31563291
break;
31573292

3158-
/* Do not skip following unless temp object was accessed */
3159-
case T_CreateTableAsStmt:
3160-
case T_CreateStmt:
3161-
case T_ViewStmt:
3162-
case T_IndexStmt:
3163-
case T_DropStmt:
3164-
break;
3293+
// /* Do not skip following unless temp object was accessed */
3294+
// case T_CreateTableAsStmt:
3295+
// case T_CreateStmt:
3296+
// case T_ViewStmt:
3297+
// case T_IndexStmt:
3298+
// case T_DropStmt:
3299+
// break;
31653300

31663301
/* Save GUC context for consequent DDL execution */
31673302
case T_DiscardStmt:
31683303
{
31693304
DiscardStmt *stmt = (DiscardStmt *) parsetree;
3170-
skipCommand = stmt->target == DISCARD_TEMP;
3305+
skipCommand = stmt->target == DISCARD_TEMP; // XXX
31713306

31723307
if (!IsTransactionBlock())
31733308
{
31743309
skipCommand = true;
3175-
MtmGUCBufferAppend(queryString);
3310+
MtmGucDiscard(stmt);
31763311
}
31773312
}
31783313
break;
@@ -3185,12 +3320,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31853320
skipCommand = true;
31863321

31873322
if (stmt->kind == VAR_RESET && strcmp(stmt->name, "session_authorization") == 0)
3188-
MtmGUCBufferClear();
3323+
MtmGucClear();
31893324

31903325
if (!IsTransactionBlock())
31913326
{
31923327
skipCommand = true;
3193-
MtmGUCBufferAppend(queryString);
3328+
MtmGucSet(stmt, queryString);
31943329
}
31953330
}
31963331
break;
@@ -3223,7 +3358,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32233358
skipCommand = false;
32243359
break;
32253360
}
3226-
if (context == PROCESS_UTILITY_TOPLEVEL)
3361+
3362+
if (context == PROCESS_UTILITY_TOPLEVEL) // || context == PROCESS_UTILITY_QUERY)
32273363
{
32283364
if (!skipCommand && !MtmTx.isReplicated) {
32293365
if (MtmProcessDDLCommand(queryString)) {

src/test/regress/serial_schedule

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ test: collate
111111
test: matview
112112
test: lock
113113
test: replica_identity
114-
test: rowsecurity
114+
# test: rowsecurity
115115
test: object_address
116116
test: tablesample
117117
test: groupingsets

0 commit comments

Comments
 (0)