Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7c17559

Browse files
committed
Handle CREATE/DROP INDEX CONCURRENTLY without 2PC
1 parent 9914ce5 commit 7c17559

File tree

3 files changed

+75
-39
lines changed

3 files changed

+75
-39
lines changed

multimaster.c

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ static void MtmShmemStartup(void);
148148
static BgwPool* MtmPoolConstructor(void);
149149
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
150150
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
151-
static bool MtmProcessDDLCommand(char const* queryString);
151+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional);
152152

153153
MtmState* Mtm;
154154

@@ -3721,7 +3721,7 @@ static char * MtmGucSerialize(void)
37213721
* -------------------------------------------
37223722
*/
37233723

3724-
static bool MtmProcessDDLCommand(char const* queryString)
3724+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional)
37253725
{
37263726
char *queryWithContext;
37273727
char *gucContext;
@@ -3740,7 +3740,12 @@ static bool MtmProcessDDLCommand(char const* queryString)
37403740
}
37413741

37423742
MTM_LOG1("Sending utility: %s", queryWithContext);
3743-
LogLogicalMessage("G", queryWithContext, strlen(queryWithContext)+1, true);
3743+
if (transactional)
3744+
/* DDL */
3745+
LogLogicalMessage("D", queryWithContext, strlen(queryWithContext) + 1, true);
3746+
else
3747+
/* CONCURRENT DDL */
3748+
LogLogicalMessage("C", queryWithContext, strlen(queryWithContext) + 1, false);
37443749

37453750
MtmTx.containsDML = true;
37463751
return false;
@@ -3886,6 +3891,30 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38863891
}
38873892
break;
38883893

3894+
case T_IndexStmt:
3895+
{
3896+
IndexStmt *indexStmt = (IndexStmt *) parsetree;
3897+
if (indexStmt->concurrent && !IsTransactionBlock())
3898+
{
3899+
skipCommand = true;
3900+
MtmProcessDDLCommand(queryString, false);
3901+
MtmTx.isDistributed = false;
3902+
}
3903+
}
3904+
break;
3905+
3906+
case T_DropStmt:
3907+
{
3908+
DropStmt *stmt = (DropStmt *) parsetree;
3909+
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent && !IsTransactionBlock())
3910+
{
3911+
skipCommand = true;
3912+
MtmProcessDDLCommand(queryString, false);
3913+
MtmTx.isDistributed = false;
3914+
}
3915+
}
3916+
break;
3917+
38893918
/* Copy need some special care */
38903919
case T_CopyStmt:
38913920
{
@@ -3923,7 +3952,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39233952
{
39243953
if (!skipCommand && !MtmTx.isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId)) {
39253954
MtmUtilityProcessedInXid = GetCurrentTransactionId();
3926-
MtmProcessDDLCommand(queryString);
3955+
MtmProcessDDLCommand(queryString, true);
39273956
executed = true;
39283957
}
39293958
}
@@ -3950,7 +3979,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39503979
MtmTx.snapshot = INVALID_CSN;
39513980
}
39523981

3953-
if (executed)
3982+
if (executed && !skipCommand)
39543983
{
39553984
MtmFinishDDLCommand();
39563985
}

pglogical_apply.c

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
7171

7272
static bool process_remote_begin(StringInfo s);
73-
static void process_remote_transactional_message(StringInfo s);
7473
static void process_remote_message(StringInfo s);
7574
static void process_remote_commit(StringInfo s);
7675
static void process_remote_insert(StringInfo s, Relation rel);
@@ -355,35 +354,43 @@ process_remote_begin(StringInfo s)
355354
}
356355

357356
static void
358-
process_remote_transactional_message(StringInfo s)
357+
process_remote_message(StringInfo s)
359358
{
360-
int rc;
359+
char action = pq_getmsgbyte(s);
361360
int messageSize = pq_getmsgint(s, 4);
362-
char const* stmt = pq_getmsgbytes(s, messageSize);
361+
char const* messageBody = pq_getmsgbytes(s, messageSize);
363362

364-
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, stmt);
365-
SPI_connect();
366-
ActivePortal->sourceText = stmt;
367-
rc = SPI_execute(stmt, false, 0);
368-
SPI_finish();
369-
if (rc < 0)
370-
elog(ERROR, "Failed to execute utility statement %s", stmt);
363+
switch (action)
364+
{
365+
case 'C':
366+
{
367+
MTM_LOG1("%d: Executing non-tx utility statement %s", MyProcPid, messageBody);
368+
SetCurrentStatementStartTimestamp();
369+
StartTransactionCommand();
370+
/* intentional falldown to the next case */
371+
}
372+
case 'D':
373+
{
374+
int rc;
375+
376+
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, messageBody);
377+
SPI_connect();
378+
ActivePortal->sourceText = messageBody;
379+
rc = SPI_execute(messageBody, false, 0);
380+
SPI_finish();
381+
if (rc < 0)
382+
elog(ERROR, "Failed to execute utility statement %s", messageBody);
383+
break;
384+
}
385+
case 'L':
386+
{
387+
MTM_LOG3("%ld: Process deadlock message with size %d from %d", MtmGetSystemTime(), messageSize, MtmReplicationNodeId);
388+
MtmUpdateLockGraph(MtmReplicationNodeId, messageBody, messageSize);
389+
break;
390+
}
391+
}
371392

372-
//XXX: create messages for tables localization too.
373-
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
374-
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
375-
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
376-
// MtmMakeTableLocal(schema, name);
377-
// }
378-
}
379393

380-
static void
381-
process_remote_message(StringInfo s)
382-
{
383-
int messageSize = pq_getmsgint(s, 4);
384-
char const* messageBody = pq_getmsgbytes(s, messageSize);
385-
MTM_LOG3("%ld: Process deadlock message with size %d from %d", MtmGetSystemTime(), messageSize, MtmReplicationNodeId);
386-
MtmUpdateLockGraph(MtmReplicationNodeId, messageBody, messageSize);
387394
}
388395

389396
static void
@@ -1049,16 +1056,10 @@ void MtmExecutor(void* work, size_t size)
10491056
s.len = save_len;
10501057
continue;
10511058
}
1052-
case 'G':
1053-
case 'E':
1054-
{
1055-
process_remote_transactional_message(&s);
1056-
continue;
1057-
}
1058-
case 'L':
1059+
case 'M':
10591060
{
10601061
process_remote_message(&s);
1061-
break;
1062+
continue;
10621063
}
10631064
default:
10641065
elog(ERROR, "unknown action of type %c", action);

pglogical_proto.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pglogical_write_message(StringInfo out,
137137
{
138138
MTM_LOG1("Send deadlock message to node %d", MtmReplicationNodeId);
139139
}
140-
else if (*prefix == 'G')
140+
else if (*prefix == 'D')
141141
{
142142
if (MtmTransactionSnapshot(MtmCurrentXid) == INVALID_CSN)
143143
{
@@ -149,8 +149,14 @@ pglogical_write_message(StringInfo out,
149149
else if (*prefix == 'E')
150150
{
151151
DDLInProress = false;
152+
/*
153+
* we use End message only as indicator of DDL transaction finish,
154+
* so no need to send that to replicas.
155+
*/
156+
return;
152157
}
153158

159+
pq_sendbyte(out, 'M');
154160
pq_sendbyte(out, *prefix);
155161
pq_sendint(out, sz, 4);
156162
pq_sendbytes(out, message, sz);

0 commit comments

Comments
 (0)