Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fc033ef

Browse files
knizhnikkelvich
authored andcommitted
Support concurrent messages: VACUUM and CREATE INDEX CONCURRENTLY
1 parent 85f9100 commit fc033ef

File tree

4 files changed

+93
-27
lines changed

4 files changed

+93
-27
lines changed

multimaster.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bo
159159

160160
MtmState* Mtm;
161161

162+
VacuumStmt* MtmVacuumStmt;
163+
IndexStmt* MtmIndexStmt;
164+
MemoryContext MtmApplyContext;
165+
162166
HTAB* MtmXid2State;
163167
HTAB* MtmGid2State;
164168
static HTAB* MtmLocalTables;
@@ -3874,8 +3878,8 @@ void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
38743878
}
38753879

38763880
static void MtmProcessUtility(Node *parsetree, const char *queryString,
3877-
ProcessUtilityContext context, ParamListInfo params,
3878-
DestReceiver *dest, char *completionTag)
3881+
ProcessUtilityContext context, ParamListInfo params,
3882+
DestReceiver *dest, char *completionTag)
38793883
{
38803884
bool skipCommand = false;
38813885
bool executed = false;
@@ -3936,11 +3940,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39363940
break;
39373941

39383942
case T_VacuumStmt:
3939-
context = PROCESS_UTILITY_TOPLEVEL;
3940-
MtmProcessDDLCommand(queryString, false, true);
3941-
MtmTx.isDistributed = false;
3942-
skipCommand = true;
3943-
break;
3943+
if (context == PROCESS_UTILITY_TOPLEVEL) {
3944+
MtmProcessDDLCommand(queryString, false, true);
3945+
MtmTx.isDistributed = false;
3946+
skipCommand = true;
3947+
break;
3948+
} else {
3949+
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
3950+
Assert(oldContext != MtmApplyContext);
3951+
MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
3952+
MemoryContextSwitchTo(oldContext);
3953+
return;
3954+
}
39443955

39453956
case T_CreateDomainStmt:
39463957
/* Detect temp tables access */
@@ -4031,11 +4042,19 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40314042
case T_IndexStmt:
40324043
{
40334044
IndexStmt *indexStmt = (IndexStmt *) parsetree;
4034-
if (indexStmt->concurrent && !IsTransactionBlock() && !MtmTx.isReplicated)
4045+
if (indexStmt->concurrent)
40354046
{
4036-
skipCommand = true;
4037-
MtmProcessDDLCommand(queryString, false, false);
4038-
MtmTx.isDistributed = false;
4047+
if (context == PROCESS_UTILITY_TOPLEVEL) {
4048+
MtmProcessDDLCommand(queryString, false, true);
4049+
MtmTx.isDistributed = false;
4050+
skipCommand = true;
4051+
} else {
4052+
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4053+
Assert(oldContext != MtmApplyContext);
4054+
MtmIndexStmt = (IndexStmt*)copyObject(parsetree);
4055+
MemoryContextSwitchTo(oldContext);
4056+
return;
4057+
}
40394058
}
40404059
}
40414060
break;

multimaster.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include "access/clog.h"
99
#include "pglogical_output/hooks.h"
10+
#include "commands/vacuum.h"
1011
#include "libpq-fe.h"
1112

1213
#define DEBUG_LEVEL 0
@@ -301,6 +302,9 @@ extern bool MtmUseDtm;
301302
extern bool MtmPreserveCommitOrder;
302303
extern HTAB* MtmXid2State;
303304
extern HTAB* MtmGid2State;
305+
extern VacuumStmt* MtmVacuumStmt;
306+
extern IndexStmt* MtmIndexStmt;
307+
extern MemoryContext MtmApplyContext;
304308

305309
extern void MtmArbiterInitialize(void);
306310
extern void MtmStartReceivers(void);

pglogical_apply.c

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
#include "catalog/pg_type.h"
1919

2020
#include "executor/spi.h"
21+
#include "commands/vacuum.h"
22+
#include "commands/defrem.h"
23+
#include "parser/parse_utilcmd.h"
2124

2225
#include "libpq/pqformat.h"
2326

@@ -70,7 +73,7 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7073
static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
7174

7275
static bool process_remote_begin(StringInfo s);
73-
static void process_remote_message(StringInfo s);
76+
static bool process_remote_message(StringInfo s);
7477
static void process_remote_commit(StringInfo s);
7578
static void process_remote_insert(StringInfo s, Relation rel);
7679
static void process_remote_update(StringInfo s, Relation rel);
@@ -353,20 +356,22 @@ process_remote_begin(StringInfo s)
353356
return true;
354357
}
355358

356-
static void
359+
static bool
357360
process_remote_message(StringInfo s)
358361
{
359362
char action = pq_getmsgbyte(s);
360363
int messageSize = pq_getmsgint(s, 4);
361364
char const* messageBody = pq_getmsgbytes(s, messageSize);
362-
365+
bool standalone = false;
366+
363367
switch (action)
364368
{
365369
case 'C':
366370
{
367371
MTM_LOG1("%d: Executing non-tx utility statement %s", MyProcPid, messageBody);
368372
SetCurrentStatementStartTimestamp();
369373
StartTransactionCommand();
374+
standalone = true;
370375
/* intentional falldown to the next case */
371376
}
372377
case 'D':
@@ -376,21 +381,59 @@ process_remote_message(StringInfo s)
376381
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, messageBody);
377382
SPI_connect();
378383
ActivePortal->sourceText = messageBody;
384+
MtmVacuumStmt = NULL;
385+
MtmIndexStmt = NULL;
379386
rc = SPI_execute(messageBody, false, 0);
380387
SPI_finish();
381-
if (rc < 0)
388+
if (rc < 0) {
382389
elog(ERROR, "Failed to execute utility statement %s", messageBody);
390+
} else {
391+
if (MtmVacuumStmt != NULL) {
392+
ExecVacuum(MtmVacuumStmt, 1);
393+
} else if (MtmIndexStmt != NULL) {
394+
MemoryContext saveCtx = TopTransactionContext;
395+
Oid relid;
396+
397+
TopTransactionContext = MtmApplyContext;
398+
relid = RangeVarGetRelidExtended(MtmIndexStmt->relation, ShareUpdateExclusiveLock,
399+
false, false,
400+
NULL,
401+
NULL);
402+
403+
/* Run parse analysis ... */
404+
MtmIndexStmt = transformIndexStmt(relid, MtmIndexStmt, messageBody);
405+
406+
PushActiveSnapshot(GetTransactionSnapshot());
407+
408+
DefineIndex(relid, /* OID of heap relation */
409+
MtmIndexStmt,
410+
InvalidOid, /* no predefined OID */
411+
false, /* is_alter_table */
412+
true, /* check_rights */
413+
false, /* skip_build */
414+
false); /* quiet */
415+
416+
TopTransactionContext = saveCtx;
417+
418+
if (ActiveSnapshotSet())
419+
PopActiveSnapshot();
420+
421+
}
422+
}
423+
if (standalone) {
424+
CommitTransactionCommand();
425+
}
383426
break;
384427
}
385428
case 'L':
386429
{
387430
MTM_LOG3("%ld: Process deadlock message with size %d from %d", MtmGetSystemTime(), messageSize, MtmReplicationNodeId);
388431
MtmUpdateLockGraph(MtmReplicationNodeId, messageBody, messageSize);
432+
standalone = true;
389433
break;
390434
}
391435
}
392-
393-
436+
return standalone;
394437
}
395438

396439
static void
@@ -968,8 +1011,6 @@ process_remote_delete(StringInfo s, Relation rel)
9681011
CommandCounterIncrement();
9691012
}
9701013

971-
static MemoryContext ApplyContext;
972-
9731014
void MtmExecutor(void* work, size_t size)
9741015
{
9751016
StringInfoData s;
@@ -982,14 +1023,14 @@ void MtmExecutor(void* work, size_t size)
9821023
s.maxlen = -1;
9831024
s.cursor = 0;
9841025

985-
if (ApplyContext == NULL) {
986-
ApplyContext = AllocSetContextCreate(TopMemoryContext,
1026+
if (MtmApplyContext == NULL) {
1027+
MtmApplyContext = AllocSetContextCreate(TopMemoryContext,
9871028
"MessageContext",
9881029
ALLOCSET_DEFAULT_MINSIZE,
9891030
ALLOCSET_DEFAULT_INITSIZE,
9901031
ALLOCSET_DEFAULT_MAXSIZE);
9911032
}
992-
MemoryContextSwitchTo(ApplyContext);
1033+
MemoryContextSwitchTo(MtmApplyContext);
9931034
replorigin_session_origin = InvalidRepOriginId;
9941035
PG_TRY();
9951036
{
@@ -1058,7 +1099,9 @@ void MtmExecutor(void* work, size_t size)
10581099
}
10591100
case 'M':
10601101
{
1061-
process_remote_message(&s);
1102+
if (process_remote_message(&s)) {
1103+
break;
1104+
}
10621105
continue;
10631106
}
10641107
default:
@@ -1069,7 +1112,7 @@ void MtmExecutor(void* work, size_t size)
10691112
}
10701113
PG_CATCH();
10711114
{
1072-
MemoryContext oldcontext = MemoryContextSwitchTo(ApplyContext);
1115+
MemoryContext oldcontext = MemoryContextSwitchTo(MtmApplyContext);
10731116
MtmHandleApplyError();
10741117
MemoryContextSwitchTo(oldcontext);
10751118
EmitErrorReport();
@@ -1083,6 +1126,6 @@ void MtmExecutor(void* work, size_t size)
10831126
if (spill_file >= 0) {
10841127
MtmCloseSpillFile(spill_file);
10851128
}
1086-
MemoryContextResetAndDeleteChildren(ApplyContext);
1129+
MemoryContextResetAndDeleteChildren(MtmApplyContext);
10871130
}
10881131

pglogical_receiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,8 @@ pglogical_receiver_main(Datum main_arg)
529529
MTM_LOG3("Process deadlock message from %d", nodeId);
530530
MtmExecutor(stmt, rc - hdr_len);
531531
} else if (stmt[0] == 'M' && stmt[1] == 'C') {
532-
MTM_LOG3("Process concurrent DDL message from %d", nodeId);
533-
MtmExecutor(stmt, rc - hdr_len);
532+
MTM_LOG1("Process concurrent DDL message from %d", nodeId);
533+
MtmExecute(stmt, rc - hdr_len);
534534
} else {
535535
ByteBufferAppend(&buf, stmt, rc - hdr_len);
536536
if (stmt[0] == 'C') /* commit */

0 commit comments

Comments
 (0)