Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d941a6a

Browse files
committed
be slightly more clever about guc variables
1 parent 5ff17df commit d941a6a

File tree

2 files changed

+211
-80
lines changed

2 files changed

+211
-80
lines changed

multimaster--1.0.sql

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS 'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16-
CREATE FUNCTION mtm.add_node(conn_str cstring) RETURNS void
17-
AS 'MODULE_PATHNAME','mtm_add_node'
18-
LANGUAGE C;
16+
-- -- XXX: cstring as an argument breaks sanity check
17+
-- CREATE FUNCTION mtm.add_node(conn_str cstring) RETURNS void
18+
-- AS 'MODULE_PATHNAME','mtm_add_node'
19+
-- LANGUAGE C;
1920

2021
-- Create replication slot for the node which was previously dropped together with it's slot
2122
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
@@ -69,7 +70,7 @@ CREATE FUNCTION mtm.inject_2pc_error(stage integer) RETURNS void
6970
AS 'MODULE_PATHNAME','mtm_inject_2pc_error'
7071
LANGUAGE C;
7172

72-
CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
73+
-- CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
7374

74-
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
75+
-- CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
7576

multimaster.c

Lines changed: 205 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
234234
ProcessUtilityContext context, ParamListInfo params,
235235
DestReceiver *dest, char *completionTag);
236236

237-
static StringInfo MtmGUCBuffer;
238-
static bool MtmGUCBufferAllocated = false;
237+
// static StringInfo MtmGUCBuffer;
238+
// static bool MtmGUCBufferAllocated = false;
239239

240240
/*
241241
* -------------------------------------------
@@ -3171,63 +3171,37 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
31713171
}
31723172
}
31733173

3174-
static void MtmGUCBufferAppend(const char *gucQueryString){
3174+
// static void MtmGUCBufferAppend(const char *gucQueryString){
31753175

3176-
if (!MtmGUCBufferAllocated)
3177-
{
3178-
MemoryContext oldcontext;
3179-
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3180-
MtmGUCBuffer = makeStringInfo();
3181-
MemoryContextSwitchTo(oldcontext);
3182-
MtmGUCBufferAllocated = true;
3183-
appendStringInfoString(MtmGUCBuffer, "RESET SESSION AUTHORIZATION; reset all;");
3184-
}
3176+
// if (!MtmGUCBufferAllocated)
3177+
// {
3178+
// MemoryContext oldcontext;
3179+
// oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3180+
// MtmGUCBuffer = makeStringInfo();
3181+
// MemoryContextSwitchTo(oldcontext);
3182+
// MtmGUCBufferAllocated = true;
3183+
// appendStringInfoString(MtmGUCBuffer, "RESET SESSION AUTHORIZATION; reset all;");
3184+
// }
31853185

3186-
appendStringInfoString(MtmGUCBuffer, gucQueryString);
3187-
/* sometimes there is no ';' char at the end. */
3188-
// appendStringInfoString(MtmGUCBuffer, ";");
3189-
}
3190-
3191-
static char * MtmGUCBufferGet(void){
3192-
if (!MtmGUCBufferAllocated)
3193-
MtmGUCBufferAppend("");
3194-
return MtmGUCBuffer->data;
3195-
}
3186+
// appendStringInfoString(MtmGUCBuffer, gucQueryString);
3187+
// /* sometimes there is no ';' char at the end. */
3188+
// // appendStringInfoString(MtmGUCBuffer, ";");
3189+
// }
31963190

3197-
static void MtmGUCBufferClear(void)
3198-
{
3199-
if (MtmGUCBufferAllocated)
3200-
{
3201-
resetStringInfo(MtmGUCBuffer);
3202-
MtmGUCBufferAppend("");
3203-
}
3204-
}
3205-
3206-
static bool MtmProcessDDLCommand(char const* queryString)
3207-
{
3208-
char *queryWithContext;
3209-
char *gucContext;
3210-
3211-
/* Append global GUC to utility stmt. */
3212-
gucContext = MtmGUCBufferGet();
3213-
if (gucContext)
3214-
{
3215-
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3216-
strcpy(queryWithContext, gucContext);
3217-
strcat(queryWithContext, queryString);
3218-
}
3219-
else
3220-
{
3221-
queryWithContext = (char *) queryString;
3222-
}
3223-
3224-
MTM_LOG1("Sending utility: %s", queryWithContext);
3225-
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
3226-
3227-
MtmTx.containsDML = true;
3228-
return false;
3229-
}
3191+
// static char * MtmGUCBufferGet(void){
3192+
// if (!MtmGUCBufferAllocated)
3193+
// MtmGUCBufferAppend("");
3194+
// return MtmGUCBuffer->data;
3195+
// }
32303196

3197+
// static void MtmGUCBufferClear(void)
3198+
// {
3199+
// if (MtmGUCBufferAllocated)
3200+
// {
3201+
// resetStringInfo(MtmGUCBuffer);
3202+
// MtmGUCBufferAppend("");
3203+
// }
3204+
// }
32313205

32323206
/*
32333207
* Genenerate global transaction identifier for two-pahse commit.
@@ -3280,6 +3254,167 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
32803254
return false;
32813255
}
32823256

3257+
3258+
/*
3259+
* -------------------------------------------
3260+
* GUC Context Handling
3261+
* -------------------------------------------
3262+
*/
3263+
3264+
// XXX: is it defined somewhere?
3265+
#define GUC_KEY_MAXLEN 255
3266+
3267+
#define MTM_GUC_HASHSIZE 20
3268+
3269+
typedef struct MtmGucHashEntry
3270+
{
3271+
char key[GUC_KEY_MAXLEN];
3272+
char *value;
3273+
} MtmGucHashEntry;
3274+
3275+
static HTAB *MtmGucHash = NULL;
3276+
3277+
static void MtmGucHashInit(void)
3278+
{
3279+
HASHCTL hash_ctl;
3280+
3281+
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
3282+
hash_ctl.keysize = GUC_KEY_MAXLEN;
3283+
hash_ctl.entrysize = sizeof(MtmGucHashEntry);
3284+
hash_ctl.hcxt = TopMemoryContext;
3285+
MtmGucHash = hash_create("MtmGucHash",
3286+
MTM_GUC_HASHSIZE,
3287+
&hash_ctl,
3288+
HASH_ELEM | HASH_CONTEXT);
3289+
}
3290+
3291+
static void MtmGucSet(VariableSetStmt *stmt, const char *queryStr)
3292+
{
3293+
MemoryContext oldcontext;
3294+
MtmGucHashEntry *hentry;
3295+
bool found;
3296+
char *key;
3297+
3298+
if (!MtmGucHash)
3299+
MtmGucHashInit();
3300+
3301+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3302+
3303+
switch (stmt->kind)
3304+
{
3305+
case VAR_SET_VALUE:
3306+
case VAR_SET_DEFAULT:
3307+
case VAR_SET_CURRENT:
3308+
{
3309+
char *value;
3310+
3311+
key = pstrdup(stmt->name);
3312+
hash_search(MtmGucHash, key, HASH_FIND, &found);
3313+
value = ExtractSetVariableArgs(stmt);
3314+
3315+
fprintf(stderr, ":MtmGucSet: %s -> %s\n", key, value);
3316+
3317+
if (value)
3318+
{
3319+
hentry = (MtmGucHashEntry *) hash_search(MtmGucHash, key,
3320+
HASH_ENTER, &found);
3321+
3322+
// if (found)
3323+
// pfree(hentry->value);
3324+
3325+
hentry->value = palloc(strlen(value) + 1);
3326+
strcpy(hentry->value, value);
3327+
}
3328+
else if (found)
3329+
{
3330+
/* That was SET TO DEFAULT and we already had some value */
3331+
hash_search(MtmGucHash, key, HASH_REMOVE, NULL);
3332+
}
3333+
}
3334+
break;
3335+
3336+
case VAR_RESET:
3337+
{
3338+
key = pstrdup(stmt->name);
3339+
hash_search(MtmGucHash, key, HASH_REMOVE, NULL);
3340+
}
3341+
break;
3342+
case VAR_RESET_ALL:
3343+
break;
3344+
3345+
case VAR_SET_MULTI:
3346+
break;
3347+
}
3348+
3349+
MemoryContextSwitchTo(oldcontext);
3350+
}
3351+
3352+
static void MtmGucDiscard(DiscardStmt *stmt)
3353+
{
3354+
3355+
}
3356+
3357+
static void MtmGucClear(void)
3358+
{
3359+
3360+
}
3361+
3362+
static char * MtmGucSerialize(void)
3363+
{
3364+
HASH_SEQ_STATUS status;
3365+
MtmGucHashEntry *hentry;
3366+
StringInfo serialized_gucs;
3367+
3368+
serialized_gucs = makeStringInfo();
3369+
appendStringInfoString(serialized_gucs, "RESET SESSION AUTHORIZATION; reset all; ");
3370+
3371+
if (MtmGucHash)
3372+
{
3373+
hash_seq_init(&status, MtmGucHash);
3374+
while ((hentry = (MtmGucHashEntry *) hash_seq_search(&status)) != NULL)
3375+
{
3376+
appendStringInfoString(serialized_gucs, "SET ");
3377+
appendStringInfoString(serialized_gucs, hentry->key);
3378+
appendStringInfoString(serialized_gucs, " TO ");
3379+
appendStringInfoString(serialized_gucs, hentry->value);
3380+
appendStringInfoString(serialized_gucs, "; ");
3381+
}
3382+
}
3383+
3384+
return serialized_gucs->data;
3385+
}
3386+
3387+
/*
3388+
* -------------------------------------------
3389+
* DDL Handling
3390+
* -------------------------------------------
3391+
*/
3392+
3393+
static bool MtmProcessDDLCommand(char const* queryString)
3394+
{
3395+
char *queryWithContext;
3396+
char *gucContext;
3397+
3398+
/* Append global GUC to utility stmt. */
3399+
gucContext = MtmGucSerialize();
3400+
if (gucContext)
3401+
{
3402+
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3403+
strcpy(queryWithContext, gucContext);
3404+
strcat(queryWithContext, queryString);
3405+
}
3406+
else
3407+
{
3408+
queryWithContext = (char *) queryString;
3409+
}
3410+
3411+
MTM_LOG1("Sending utility: %s", queryWithContext);
3412+
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
3413+
3414+
MtmTx.containsDML = true;
3415+
return false;
3416+
}
3417+
32833418
static void MtmProcessUtility(Node *parsetree, const char *queryString,
32843419
ProcessUtilityContext context, ParamListInfo params,
32853420
DestReceiver *dest, char *completionTag)
@@ -3339,30 +3474,24 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
33393474
skipCommand = true;
33403475
break;
33413476

3342-
/* Do not skip following unless temp object was accessed */
3343-
case T_CreateTableAsStmt:
3344-
case T_CreateStmt:
3345-
case T_ViewStmt:
3346-
case T_IndexStmt:
3347-
case T_DropStmt:
3348-
break;
3477+
// /* Do not skip following unless temp object was accessed */
3478+
// case T_CreateTableAsStmt:
3479+
// case T_CreateStmt:
3480+
// case T_ViewStmt:
3481+
// case T_IndexStmt:
3482+
// case T_DropStmt:
3483+
// break;
33493484

33503485
/* Save GUC context for consequent DDL execution */
33513486
case T_DiscardStmt:
33523487
{
3353-
/*
3354-
* DiscardStmt *stmt = (DiscardStmt *) parsetree;
3355-
* skipCommand = stmt->target == DISCARD_TEMP;
3356-
*/
3488+
DiscardStmt *stmt = (DiscardStmt *) parsetree;
3489+
skipCommand = stmt->target == DISCARD_TEMP; // XXX
33573490

33583491
if (!IsTransactionBlock())
33593492
{
3360-
/*
3361-
* XXX: move allocation somewhere to backend startup and check
3362-
* where buffer is empty in send routines.
3363-
*/
3364-
MtmGUCBufferAllocated = false;
3365-
pfree(MtmGUCBuffer);
3493+
skipCommand = true;
3494+
MtmGucDiscard(stmt);
33663495
}
33673496
}
33683497
break;
@@ -3406,12 +3535,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
34063535
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
34073536

34083537
if (stmt->kind == VAR_RESET && strcmp(stmt->name, "session_authorization") == 0)
3409-
MtmGUCBufferClear();
3538+
MtmGucClear();
34103539

34113540
if (!IsTransactionBlock())
34123541
{
34133542
skipCommand = true;
3414-
MtmGUCBufferAppend(queryString);
3543+
MtmGucSet(stmt, queryString);
34153544
}
34163545
}
34173546
break;
@@ -3444,7 +3573,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
34443573
skipCommand = false;
34453574
break;
34463575
}
3447-
if (context == PROCESS_UTILITY_TOPLEVEL)
3576+
3577+
if (context == PROCESS_UTILITY_TOPLEVEL) // || context == PROCESS_UTILITY_QUERY)
34483578
{
34493579
if (!skipCommand && !MtmTx.isReplicated) {
34503580
if (MtmProcessDDLCommand(queryString)) {

0 commit comments

Comments
 (0)