Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3f4a139

Browse files
committed
Support concurrent drop
1 parent 5bc9448 commit 3f4a139

File tree

3 files changed

+32
-23
lines changed

3 files changed

+32
-23
lines changed

contrib/mmts/multimaster.c

+14-5
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ MtmState* Mtm;
162162

163163
VacuumStmt* MtmVacuumStmt;
164164
IndexStmt* MtmIndexStmt;
165+
DropStmt* MtmDropStmt;
165166
MemoryContext MtmApplyContext;
166167

167168
HTAB* MtmXid2State;
@@ -4052,7 +4053,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40524053
} else {
40534054
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
40544055
Assert(oldContext != MtmApplyContext);
4055-
MtmIndexStmt = (IndexStmt*)copyObject(parsetree);
4056+
MtmIndexStmt = indexStmt;
40564057
MemoryContextSwitchTo(oldContext);
40574058
return;
40584059
}
@@ -4063,11 +4064,19 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40634064
case T_DropStmt:
40644065
{
40654066
DropStmt *stmt = (DropStmt *) parsetree;
4066-
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent && !IsTransactionBlock() && !MtmTx.isReplicated)
4067+
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)
40674068
{
4068-
skipCommand = true;
4069-
MtmProcessDDLCommand(queryString, false, false);
4070-
MtmTx.isDistributed = false;
4069+
if (context == PROCESS_UTILITY_TOPLEVEL) {
4070+
MtmProcessDDLCommand(queryString, false, true);
4071+
MtmTx.isDistributed = false;
4072+
skipCommand = true;
4073+
} else {
4074+
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4075+
Assert(oldContext != MtmApplyContext);
4076+
MtmDropStmt = stmt;
4077+
MemoryContextSwitchTo(oldContext);
4078+
return;
4079+
}
40714080
}
40724081
}
40734082
break;

contrib/mmts/multimaster.h

+1
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ extern HTAB* MtmXid2State;
304304
extern HTAB* MtmGid2State;
305305
extern VacuumStmt* MtmVacuumStmt;
306306
extern IndexStmt* MtmIndexStmt;
307+
extern DropStmt* MtmDropStmt;
307308
extern MemoryContext MtmApplyContext;
308309

309310
extern void MtmArbiterInitialize(void);

contrib/mmts/pglogical_apply.c

+17-18
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ static void process_remote_insert(StringInfo s, Relation rel);
7979
static void process_remote_update(StringInfo s, Relation rel);
8080
static void process_remote_delete(StringInfo s, Relation rel);
8181

82+
static MemoryContext TopContext;
83+
8284
/*
8385
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
8486
*
@@ -377,34 +379,31 @@ process_remote_message(StringInfo s)
377379
case 'D':
378380
{
379381
int rc;
380-
381382
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, messageBody);
382383
SPI_connect();
383384
ActivePortal->sourceText = messageBody;
384385
MtmVacuumStmt = NULL;
385386
MtmIndexStmt = NULL;
387+
MtmDropStmt = NULL;
386388
rc = SPI_execute(messageBody, false, 0);
387389
SPI_finish();
388390
if (rc < 0) {
389391
elog(ERROR, "Failed to execute utility statement %s", messageBody);
390392
} else {
391393
if (MtmVacuumStmt != NULL) {
392394
ExecVacuum(MtmVacuumStmt, 1);
393-
} else if (MtmIndexStmt != NULL) {
394-
MemoryContext saveCtx = TopTransactionContext;
395-
Oid relid;
396-
397-
TopTransactionContext = MtmApplyContext;
398-
relid = RangeVarGetRelidExtended(MtmIndexStmt->relation, ShareUpdateExclusiveLock,
399-
false, false,
400-
NULL,
401-
NULL);
395+
} else if (MtmIndexStmt != NULL) {
396+
Oid relid = RangeVarGetRelidExtended(MtmIndexStmt->relation, ShareUpdateExclusiveLock,
397+
false, false,
398+
NULL,
399+
NULL);
402400

403401
/* Run parse analysis ... */
404-
MtmIndexStmt = transformIndexStmt(relid, MtmIndexStmt, messageBody);
402+
//MtmIndexStmt = transformIndexStmt(relid, MtmIndexStmt, messageBody);
405403

406404
PushActiveSnapshot(GetTransactionSnapshot());
407405

406+
408407
DefineIndex(relid, /* OID of heap relation */
409408
MtmIndexStmt,
410409
InvalidOid, /* no predefined OID */
@@ -413,11 +412,11 @@ process_remote_message(StringInfo s)
413412
false, /* skip_build */
414413
false); /* quiet */
415414

416-
TopTransactionContext = saveCtx;
417-
418415
if (ActiveSnapshotSet())
419416
PopActiveSnapshot();
420417

418+
} else if (MtmDropStmt != NULL) {
419+
RemoveObjects(MtmDropStmt);
421420
}
422421
}
423422
if (standalone) {
@@ -1025,12 +1024,12 @@ void MtmExecutor(void* work, size_t size)
10251024

10261025
if (MtmApplyContext == NULL) {
10271026
MtmApplyContext = AllocSetContextCreate(TopMemoryContext,
1028-
"MessageContext",
1029-
ALLOCSET_DEFAULT_MINSIZE,
1030-
ALLOCSET_DEFAULT_INITSIZE,
1031-
ALLOCSET_DEFAULT_MAXSIZE);
1027+
"ApplyContext",
1028+
ALLOCSET_DEFAULT_MINSIZE,
1029+
ALLOCSET_DEFAULT_INITSIZE,
1030+
ALLOCSET_DEFAULT_MAXSIZE);
10321031
}
1033-
MemoryContextSwitchTo(MtmApplyContext);
1032+
TopContext = MemoryContextSwitchTo(MtmApplyContext);
10341033
replorigin_session_origin = InvalidRepOriginId;
10351034
PG_TRY();
10361035
{

0 commit comments

Comments
 (0)