Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e0896c1

Browse files
knizhnikkelvich
authored andcommitted
Ressurect support of concurrent DDL statements
1 parent 1f46538 commit e0896c1

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

multimaster.c

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
133133
static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
134134
static void MtmPostPrepareTransaction(MtmCurrentTrans* x);
135135
static void MtmAbortPreparedTransaction(MtmCurrentTrans* x);
136-
static void MtmCommitPreparedTransaction(MtmCurrentTrans* x);
136+
static void MtmPreCommitPreparedTransaction(MtmCurrentTrans* x);
137137
static void MtmEndTransaction(MtmCurrentTrans* x, bool commit);
138138
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x);
139139
static TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum);
@@ -705,8 +705,8 @@ MtmXactCallback(XactEvent event, void *arg)
705705
case XACT_EVENT_ABORT_PREPARED:
706706
MtmAbortPreparedTransaction(&MtmTx);
707707
break;
708-
case XACT_EVENT_COMMIT_PREPARED:
709-
MtmCommitPreparedTransaction(&MtmTx);
708+
case XACT_EVENT_PRE_COMMIT_PREPARED:
709+
MtmPreCommitPreparedTransaction(&MtmTx);
710710
break;
711711
case XACT_EVENT_COMMIT:
712712
MtmEndTransaction(&MtmTx, true);
@@ -1149,7 +1149,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
11491149
}
11501150

11511151
static void
1152-
MtmCommitPreparedTransaction(MtmCurrentTrans* x)
1152+
MtmPreCommitPreparedTransaction(MtmCurrentTrans* x)
11531153
{
11541154
MtmTransMap* tm;
11551155
MtmTransState* ts;
@@ -1181,6 +1181,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
11811181
Mtm2PCVoting(x, ts);
11821182

11831183
x->xid = ts->xid;
1184+
x->csn = ts->csn;
11841185
x->isPrepared = true;
11851186
}
11861187
MtmUnlock();
@@ -3529,6 +3530,7 @@ bool MtmFilterTransaction(char* record, int size)
35293530
}
35303531
restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
35313532
if (Mtm->nodes[origin_node-1].restartLSN < restart_lsn) {
3533+
Assert(restart_lsn != InvalidXLogRecPtr);
35323534
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, restart_lsn);
35333535
Mtm->nodes[origin_node-1].restartLSN = restart_lsn;
35343536
} else {
@@ -4442,19 +4444,14 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
44424444
}
44434445
MTM_LOG3("Sending utility: %s", queryString);
44444446
if (transactional) {
4445-
/* DDL */
4447+
/* Transactional DDL */
44464448
LogLogicalMessage("D", queryString, strlen(queryString) + 1, true);
44474449
MtmTx.containsDML = true;
4448-
} else {
4449-
char* gucCtx = MtmGucSerialize();
4450-
return;
4451-
if (*gucCtx) {
4452-
queryString = psprintf("%s; %s", gucCtx, queryString);
4453-
}
4454-
/* CONCURRENT DDL */
4450+
} else {
4451+
MTM_LOG1("Execute concurrent DDL: %s", queryString);
4452+
/* Concurrent DDL */
44554453
XLogFlush(LogLogicalMessage("C", queryString, strlen(queryString) + 1, false));
44564454
}
4457-
return;
44584455
}
44594456

44604457
static void MtmFinishDDLCommand()
@@ -4542,7 +4539,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
45424539

45434540
case T_VacuumStmt:
45444541
skipCommand = true;
4545-
#if 0
45464542
if (context == PROCESS_UTILITY_TOPLEVEL) {
45474543
MtmProcessDDLCommand(queryString, false);
45484544
MtmTx.isDistributed = false;
@@ -4553,7 +4549,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
45534549
MemoryContextSwitchTo(oldContext);
45544550
return;
45554551
}
4556-
#endif
45574552
break;
45584553

45594554
case T_CreateDomainStmt:

pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ process_remote_message(StringInfo s)
384384
{
385385
MTM_LOG1("%d: Executing non-tx utility statement %s", MyProcPid, messageBody);
386386
SetCurrentStatementStartTimestamp();
387+
MtmResetTransaction();
387388
StartTransactionCommand();
388389
standalone = true;
389390
/* intentional falldown to the next case */
@@ -413,7 +414,6 @@ process_remote_message(StringInfo s)
413414
false, false,
414415
NULL,
415416
NULL);
416-
417417
/* Run parse analysis ... */
418418
MtmIndexStmt = transformIndexStmt(relid, MtmIndexStmt, messageBody);
419419

pglogical_receiver.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,9 +543,13 @@ pglogical_receiver_main(Datum main_arg)
543543
MtmSpillToFile(spill_file, buf.data, buf.used);
544544
ByteBufferReset(&buf);
545545
}
546-
if (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A')) {
546+
if (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A' || stmt[1] == 'C')) {
547547
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
548-
MtmExecutor(stmt, rc - hdr_len);
548+
if ( stmt[1] == 'C') { /* concurrent DDL */
549+
MtmExecute(stmt, rc - hdr_len);
550+
} else {
551+
MtmExecutor(stmt, rc - hdr_len);
552+
}
549553
} else {
550554
ByteBufferAppend(&buf, stmt, rc - hdr_len);
551555
if (stmt[0] == 'C') /* commit */

0 commit comments

Comments
 (0)