Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 32d4d32

Browse files
committed
Correctly process concurrent logical messages
1 parent cfbf8e4 commit 32d4d32

File tree

2 files changed

+32
-25
lines changed

2 files changed

+32
-25
lines changed

contrib/mmts/multimaster.c

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static void MtmShmemStartup(void);
156156
static BgwPool* MtmPoolConstructor(void);
157157
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
158158
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
159-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional);
159+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bool contextFree);
160160

161161
MtmState* Mtm;
162162

@@ -3825,33 +3825,31 @@ static char * MtmGucSerialize(void)
38253825
* -------------------------------------------
38263826
*/
38273827

3828-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional)
3828+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bool contextFree)
38293829
{
3830-
char *queryWithContext;
3830+
char *queryWithContext = (char *) queryString;
38313831
char *gucContext;
38323832

3833-
/* Append global GUC to utility stmt. */
3834-
gucContext = MtmGucSerialize();
3835-
if (gucContext)
3836-
{
3837-
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3838-
strcpy(queryWithContext, gucContext);
3839-
strcat(queryWithContext, queryString);
3840-
}
3841-
else
3842-
{
3843-
queryWithContext = (char *) queryString;
3833+
if (!contextFree) {
3834+
/* Append global GUC to utility stmt. */
3835+
gucContext = MtmGucSerialize();
3836+
if (gucContext)
3837+
{
3838+
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3839+
strcpy(queryWithContext, gucContext);
3840+
strcat(queryWithContext, queryString);
3841+
}
38443842
}
38453843

38463844
MTM_LOG3("Sending utility: %s", queryWithContext);
3847-
if (transactional)
3845+
if (transactional) {
38483846
/* DDL */
38493847
LogLogicalMessage("D", queryWithContext, strlen(queryWithContext) + 1, true);
3850-
else
3848+
MtmTx.containsDML = true;
3849+
} else {
38513850
/* CONCURRENT DDL */
3852-
LogLogicalMessage("C", queryWithContext, strlen(queryWithContext) + 1, false);
3853-
3854-
MtmTx.containsDML = true;
3851+
XLogFlush(LogLogicalMessage("C", queryWithContext, strlen(queryWithContext) + 1, false));
3852+
}
38553853
return false;
38563854
}
38573855

@@ -3930,7 +3928,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39303928
case T_UnlistenStmt:
39313929
case T_LoadStmt:
39323930
case T_ClusterStmt:
3933-
case T_VacuumStmt:
39343931
case T_VariableShowStmt:
39353932
case T_ReassignOwnedStmt:
39363933
case T_LockStmt:
@@ -3939,6 +3936,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39393936
skipCommand = true;
39403937
break;
39413938

3939+
case T_VacuumStmt:
3940+
context = PROCESS_UTILITY_TOPLEVEL;
3941+
MtmProcessDDLCommand(queryString, false, true);
3942+
MtmTx.isDistributed = false;
3943+
skipCommand = true;
3944+
break;
3945+
39423946
case T_CreateDomainStmt:
39433947
{
39443948
CreateDomainStmt *stmt = (CreateDomainStmt *) parsetree;
@@ -4030,7 +4034,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40304034
if (indexStmt->concurrent && !IsTransactionBlock() && !MtmTx.isReplicated)
40314035
{
40324036
skipCommand = true;
4033-
MtmProcessDDLCommand(queryString, false);
4037+
MtmProcessDDLCommand(queryString, false, false);
40344038
MtmTx.isDistributed = false;
40354039
}
40364040
}
@@ -4042,7 +4046,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40424046
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent && !IsTransactionBlock() && !MtmTx.isReplicated)
40434047
{
40444048
skipCommand = true;
4045-
MtmProcessDDLCommand(queryString, false);
4049+
MtmProcessDDLCommand(queryString, false, false);
40464050
MtmTx.isDistributed = false;
40474051
}
40484052
}
@@ -4085,9 +4089,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40854089
MtmUtilityProcessedInXid = GetCurrentTransactionId();
40864090

40874091
if (context == PROCESS_UTILITY_TOPLEVEL)
4088-
MtmProcessDDLCommand(queryString, true);
4092+
MtmProcessDDLCommand(queryString, true, false);
40894093
else
4090-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4094+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
40914095

40924096
executed = true;
40934097
}
@@ -4143,7 +4147,7 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
41434147
}
41444148

41454149
if (ddl_generating_call && !MtmTx.isReplicated)
4146-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4150+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
41474151

41484152
if (PreviousExecutorStartHook != NULL)
41494153
PreviousExecutorStartHook(queryDesc, eflags);

contrib/mmts/pglogical_receiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,9 @@ pglogical_receiver_main(Datum main_arg)
528528
if (stmt[0] == 'M' && stmt[1] == 'L') {
529529
MTM_LOG3("Process deadlock message from %d", nodeId);
530530
MtmExecutor(stmt, rc - hdr_len);
531+
} else if (stmt[0] == 'M' && stmt[1] == 'C') {
532+
MTM_LOG3("Process concurrent DDL message from %d", nodeId);
533+
MtmExecutor(stmt, rc - hdr_len);
531534
} else {
532535
ByteBufferAppend(&buf, stmt, rc - hdr_len);
533536
if (stmt[0] == 'C') /* commit */

0 commit comments

Comments
 (0)