Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 85f9100

Browse files
knizhnikkelvich
authored andcommitted
Correctly process concurrent logical messages
1 parent 29ce7df commit 85f9100

File tree

2 files changed

+32
-25
lines changed

2 files changed

+32
-25
lines changed

multimaster.c

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ static void MtmShmemStartup(void);
155155
static BgwPool* MtmPoolConstructor(void);
156156
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
157157
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
158-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional);
158+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bool contextFree);
159159

160160
MtmState* Mtm;
161161

@@ -3824,33 +3824,31 @@ static char * MtmGucSerialize(void)
38243824
* -------------------------------------------
38253825
*/
38263826

3827-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional)
3827+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bool contextFree)
38283828
{
3829-
char *queryWithContext;
3829+
char *queryWithContext = (char *) queryString;
38303830
char *gucContext;
38313831

3832-
/* Append global GUC to utility stmt. */
3833-
gucContext = MtmGucSerialize();
3834-
if (gucContext)
3835-
{
3836-
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3837-
strcpy(queryWithContext, gucContext);
3838-
strcat(queryWithContext, queryString);
3839-
}
3840-
else
3841-
{
3842-
queryWithContext = (char *) queryString;
3832+
if (!contextFree) {
3833+
/* Append global GUC to utility stmt. */
3834+
gucContext = MtmGucSerialize();
3835+
if (gucContext)
3836+
{
3837+
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3838+
strcpy(queryWithContext, gucContext);
3839+
strcat(queryWithContext, queryString);
3840+
}
38433841
}
38443842

38453843
MTM_LOG3("Sending utility: %s", queryWithContext);
3846-
if (transactional)
3844+
if (transactional) {
38473845
/* DDL */
38483846
LogLogicalMessage("D", queryWithContext, strlen(queryWithContext) + 1, true);
3849-
else
3847+
MtmTx.containsDML = true;
3848+
} else {
38503849
/* CONCURRENT DDL */
3851-
LogLogicalMessage("C", queryWithContext, strlen(queryWithContext) + 1, false);
3852-
3853-
MtmTx.containsDML = true;
3850+
XLogFlush(LogLogicalMessage("C", queryWithContext, strlen(queryWithContext) + 1, false));
3851+
}
38543852
return false;
38553853
}
38563854

@@ -3929,7 +3927,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39293927
case T_UnlistenStmt:
39303928
case T_LoadStmt:
39313929
case T_ClusterStmt:
3932-
case T_VacuumStmt:
39333930
case T_VariableShowStmt:
39343931
case T_ReassignOwnedStmt:
39353932
case T_LockStmt:
@@ -3938,6 +3935,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39383935
skipCommand = true;
39393936
break;
39403937

3938+
case T_VacuumStmt:
3939+
context = PROCESS_UTILITY_TOPLEVEL;
3940+
MtmProcessDDLCommand(queryString, false, true);
3941+
MtmTx.isDistributed = false;
3942+
skipCommand = true;
3943+
break;
3944+
39413945
case T_CreateDomainStmt:
39423946
/* Detect temp tables access */
39433947
{
@@ -4030,7 +4034,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40304034
if (indexStmt->concurrent && !IsTransactionBlock() && !MtmTx.isReplicated)
40314035
{
40324036
skipCommand = true;
4033-
MtmProcessDDLCommand(queryString, false);
4037+
MtmProcessDDLCommand(queryString, false, false);
40344038
MtmTx.isDistributed = false;
40354039
}
40364040
}
@@ -4042,7 +4046,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40424046
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent && !IsTransactionBlock() && !MtmTx.isReplicated)
40434047
{
40444048
skipCommand = true;
4045-
MtmProcessDDLCommand(queryString, false);
4049+
MtmProcessDDLCommand(queryString, false, false);
40464050
MtmTx.isDistributed = false;
40474051
}
40484052
}
@@ -4085,9 +4089,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40854089
MtmUtilityProcessedInXid = GetCurrentTransactionId();
40864090

40874091
if (context == PROCESS_UTILITY_TOPLEVEL)
4088-
MtmProcessDDLCommand(queryString, true);
4092+
MtmProcessDDLCommand(queryString, true, false);
40894093
else
4090-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4094+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
40914095

40924096
executed = true;
40934097
}
@@ -4144,7 +4148,7 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
41444148
}
41454149

41464150
if (ddl_generating_call && !MtmTx.isReplicated)
4147-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
4151+
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
41484152

41494153
if (PreviousExecutorStartHook != NULL)
41504154
PreviousExecutorStartHook(queryDesc, eflags);

pglogical_receiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,9 @@ pglogical_receiver_main(Datum main_arg)
528528
if (stmt[0] == 'M' && stmt[1] == 'L') {
529529
MTM_LOG3("Process deadlock message from %d", nodeId);
530530
MtmExecutor(stmt, rc - hdr_len);
531+
} else if (stmt[0] == 'M' && stmt[1] == 'C') {
532+
MTM_LOG3("Process concurrent DDL message from %d", nodeId);
533+
MtmExecutor(stmt, rc - hdr_len);
531534
} else {
532535
ByteBufferAppend(&buf, stmt, rc - hdr_len);
533536
if (stmt[0] == 'C') /* commit */

0 commit comments

Comments
 (0)