Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1f46538

Browse files
knizhnikkelvich
authored andcommitted
Reset GUC context at transaction begin
1 parent d065889 commit 1f46538

File tree

3 files changed

+36
-30
lines changed

3 files changed

+36
-30
lines changed

multimaster.c

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static void MtmShmemStartup(void);
156156
static BgwPool* MtmPoolConstructor(void);
157157
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
158158
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
159-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional);
159+
static void MtmProcessDDLCommand(char const* queryString, bool transactional);
160160

161161
MtmState* Mtm;
162162

@@ -175,7 +175,8 @@ static MtmConnectionInfo* MtmConnections;
175175
static MtmCurrentTrans MtmTx;
176176
static dlist_head MtmLsnMapping = DLIST_STATIC_INIT(MtmLsnMapping);
177177

178-
static TransactionManager MtmTM = {
178+
static TransactionManager MtmTM =
179+
{
179180
PgTransactionIdGetStatus,
180181
PgTransactionIdSetTreeStatus,
181182
MtmGetSnapshot,
@@ -4431,18 +4432,29 @@ char* MtmGucSerialize(void)
44314432
* -------------------------------------------
44324433
*/
44334434

4434-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional)
4435+
static void MtmProcessDDLCommand(char const* queryString, bool transactional)
44354436
{
4437+
char* gucCtx = MtmGucSerialize();
4438+
if (*gucCtx) {
4439+
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s; %s", gucCtx, queryString);
4440+
} else {
4441+
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s", queryString);
4442+
}
44364443
MTM_LOG3("Sending utility: %s", queryString);
44374444
if (transactional) {
44384445
/* DDL */
44394446
LogLogicalMessage("D", queryString, strlen(queryString) + 1, true);
44404447
MtmTx.containsDML = true;
4441-
} else {
4448+
} else {
4449+
char* gucCtx = MtmGucSerialize();
4450+
return;
4451+
if (*gucCtx) {
4452+
queryString = psprintf("%s; %s", gucCtx, queryString);
4453+
}
44424454
/* CONCURRENT DDL */
44434455
XLogFlush(LogLogicalMessage("C", queryString, strlen(queryString) + 1, false));
44444456
}
4445-
return false;
4457+
return;
44464458
}
44474459

44484460
static void MtmFinishDDLCommand()
@@ -4530,16 +4542,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
45304542

45314543
case T_VacuumStmt:
45324544
skipCommand = true;
4533-
// if (context == PROCESS_UTILITY_TOPLEVEL) {
4534-
// MtmProcessDDLCommand(queryString, false, true);
4535-
// MtmTx.isDistributed = false;
4536-
// } else if (MtmApplyContext != NULL) {
4537-
// MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4538-
// Assert(oldContext != MtmApplyContext);
4539-
// MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4540-
// MemoryContextSwitchTo(oldContext);
4541-
// return;
4542-
// }
4545+
#if 0
4546+
if (context == PROCESS_UTILITY_TOPLEVEL) {
4547+
MtmProcessDDLCommand(queryString, false);
4548+
MtmTx.isDistributed = false;
4549+
} else if (MtmApplyContext != NULL) {
4550+
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4551+
Assert(oldContext != MtmApplyContext);
4552+
MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4553+
MemoryContextSwitchTo(oldContext);
4554+
return;
4555+
}
4556+
#endif
45434557
break;
45444558

45454559
case T_CreateDomainStmt:
@@ -4634,7 +4648,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
46344648
if (indexStmt->concurrent)
46354649
{
46364650
if (context == PROCESS_UTILITY_TOPLEVEL) {
4637-
// MtmProcessDDLCommand(queryString, false, true);
4651+
MtmProcessDDLCommand(queryString, false);
46384652
MtmTx.isDistributed = false;
46394653
skipCommand = true;
46404654
/*
@@ -4661,7 +4675,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
46614675
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)
46624676
{
46634677
if (context == PROCESS_UTILITY_TOPLEVEL) {
4664-
// MtmProcessDDLCommand(queryString, false, true);
4678+
MtmProcessDDLCommand(queryString, false);
46654679
MtmTx.isDistributed = false;
46664680
skipCommand = true;
46674681
} else if (MtmApplyContext != NULL) {

pglogical_apply.c

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,11 @@ process_remote_begin(StringInfo s)
337337
{
338338
GlobalTransactionId gtid;
339339
csn_t snapshot;
340-
char const* gucCtx;
341340
int rc;
342341

343342
gtid.node = pq_getmsgint(s, 4);
344343
gtid.xid = pq_getmsgint(s, 4);
345344
snapshot = pq_getmsgint64(s);
346-
gucCtx = pq_getmsgstring(s);
347345

348346
Assert(gtid.node > 0);
349347

@@ -359,19 +357,13 @@ process_remote_begin(StringInfo s)
359357
StartTransactionCommand();
360358
MtmJoinTransaction(&gtid, snapshot);
361359

362-
if (*gucCtx || GucAltered) {
360+
if (GucAltered) {
363361
SPI_connect();
364-
if (GucAltered) {
365-
GucAltered = *gucCtx != '\0';
366-
gucCtx = psprintf("RESET SESSION AUTHORIZATION; reset all; %s", gucCtx);
367-
} else {
368-
GucAltered = true;
369-
}
370-
ActivePortal->sourceText = gucCtx;
371-
rc = SPI_execute(gucCtx, false, 0);
362+
GucAltered = false;
363+
rc = SPI_execute("RESET SESSION AUTHORIZATION; reset all;", false, 0);
372364
SPI_finish();
373365
if (rc < 0) {
374-
elog(ERROR, "Failed to set GUC context %s: %d", gucCtx, rc);
366+
elog(ERROR, "Failed to set reset context: %d", rc);
375367
}
376368
}
377369

@@ -399,6 +391,7 @@ process_remote_message(StringInfo s)
399391
case 'D':
400392
{
401393
int rc;
394+
GucAltered = true;
402395
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, messageBody);
403396
SPI_connect();
404397
ActivePortal->sourceText = messageBody;

pglogical_proto.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
131131
pq_sendint(out, MtmNodeId, 4);
132132
pq_sendint(out, isRecovery ? InvalidTransactionId : txn->xid, 4);
133133
pq_sendint64(out, csn);
134-
pq_sendstring(out, MtmGucSerialize());
135134

136135
MtmTransactionRecords = 0;
137136
}

0 commit comments

Comments
 (0)