Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 86d25e9

Browse files
committed
check for temp tables access in the end of MtmProcessUtility()
1 parent 3754bb9 commit 86d25e9

File tree

3 files changed

+76
-28
lines changed

3 files changed

+76
-28
lines changed

multimaster.c

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ bool MtmUseRaftable;
209209
bool MtmUseDtm;
210210
bool MtmVolksWagenMode;
211211

212+
TransactionId MtmUtilityProcessedInXid;
213+
212214
static char* MtmConnStrs;
213215
static int MtmQueueSize;
214216
static int MtmWorkers;
@@ -687,7 +689,12 @@ MtmXactCallback(XactEvent event, void *arg)
687689
static bool
688690
MtmIsUserTransaction()
689691
{
690-
return !IsAutoVacuumLauncherProcess() && IsNormalProcessingMode() && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess();
692+
return !IsAutoVacuumLauncherProcess() &&
693+
IsNormalProcessingMode() &&
694+
MtmDoReplication &&
695+
!am_walsender &&
696+
!IsBackgroundWorker &&
697+
!IsAutoVacuumWorkerProcess();
691698
}
692699

693700
void
@@ -699,7 +706,6 @@ MtmResetTransaction()
699706
x->gtid.xid = InvalidTransactionId;
700707
x->isDistributed = false;
701708
x->isPrepared = false;
702-
x->isPrepared = false;
703709
x->status = TRANSACTION_STATUS_UNKNOWN;
704710
}
705711

@@ -3334,20 +3340,20 @@ MtmGenerateGid(char* gid)
33343340

33353341
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
33363342
{
3337-
if (MyXactAccessedTempRel)
3338-
{
3339-
/*
3340-
* XXX: this tx anyway goes to subscribers later, but without
3341-
* surrounding begin/commit. Now it will be filtered out on receiver side.
3342-
* Probably there is more clever way to do that.
3343-
*/
3344-
x->isDistributed = false;
3345-
if (!MtmVolksWagenMode)
3346-
elog(NOTICE, "MTM: Transaction was not replicated as it accesed temporary relation");
3347-
return false;
3348-
}
3349-
3350-
if (!x->isReplicated && (x->isDistributed && x->containsDML)) {
3343+
// if (MyXactAccessedTempRel)
3344+
// {
3345+
// /*
3346+
// * XXX: this tx anyway goes to subscribers later, but without
3347+
// * surrounding begin/commit. Now it will be filtered out on receiver side.
3348+
// * Probably there is more clever way to do that.
3349+
// */
3350+
// x->isDistributed = false;
3351+
// if (!MtmVolksWagenMode)
3352+
// elog(NOTICE, "MTM: Transaction was not replicated as it accesed temporary relation");
3353+
// return false;
3354+
// }
3355+
3356+
if (!x->isReplicated && x->isDistributed && x->containsDML) {
33513357
MtmGenerateGid(x->gid);
33523358
if (!x->isTransactionBlock) {
33533359
BeginTransactionBlock();
@@ -3357,7 +3363,8 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
33573363
}
33583364
if (!PrepareTransactionBlock(x->gid))
33593365
{
3360-
elog(WARNING, "Failed to prepare transaction %s", x->gid);
3366+
if (!MtmVolksWagenMode)
3367+
elog(WARNING, "Failed to prepare transaction %s", x->gid);
33613368
/* ??? Should we do explicit rollback */
33623369
} else {
33633370
CommitTransactionCommand();
@@ -3550,6 +3557,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
35503557
switch (stmt->kind)
35513558
{
35523559
case TRANS_STMT_BEGIN:
3560+
case TRANS_STMT_START:
35533561
MtmTx.isTransactionBlock = true;
35543562
break;
35553563
case TRANS_STMT_COMMIT:
@@ -3586,16 +3594,34 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
35863594
case T_LoadStmt:
35873595
case T_ClusterStmt:
35883596
case T_VacuumStmt:
3589-
case T_ExplainStmt:
35903597
case T_VariableShowStmt:
35913598
case T_ReassignOwnedStmt:
35923599
case T_LockStmt:
35933600
case T_CheckPointStmt:
35943601
case T_ReindexStmt:
3595-
case T_RefreshMatViewStmt:
35963602
skipCommand = true;
35973603
break;
35983604

3605+
case T_ExplainStmt:
3606+
/*
3607+
* EXPLAIN ANALYZE can create side-effects.
3608+
* Better to catch that by some general mechanism of detecting
3609+
* catalog and heap writes.
3610+
*/
3611+
{
3612+
ExplainStmt *stmt = (ExplainStmt *) parsetree;
3613+
ListCell *lc;
3614+
3615+
skipCommand = true;
3616+
foreach(lc, stmt->options)
3617+
{
3618+
DefElem *opt = (DefElem *) lfirst(lc);
3619+
if (strcmp(opt->defname, "analyze") == 0)
3620+
skipCommand = false;
3621+
}
3622+
}
3623+
break;
3624+
35993625
/* Save GUC context for consequent DDL execution */
36003626
case T_DiscardStmt:
36013627
{
@@ -3656,12 +3682,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
36563682
break;
36573683
}
36583684

3659-
if (context == PROCESS_UTILITY_TOPLEVEL) // || context == PROCESS_UTILITY_QUERY)
3685+
/* XXX: dirty. Clear on new tx */
3686+
if (!skipCommand && (context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
3687+
MtmUtilityProcessedInXid = InvalidTransactionId;
3688+
3689+
if (context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY)
36603690
{
3661-
if (!skipCommand && !MtmTx.isReplicated) {
3662-
if (MtmProcessDDLCommand(queryString)) {
3663-
return;
3664-
}
3691+
if (!skipCommand && !MtmTx.isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId)) {
3692+
MtmUtilityProcessedInXid = GetCurrentTransactionId();
3693+
MtmProcessDDLCommand(queryString);
36653694
}
36663695
}
36673696

@@ -3675,12 +3704,22 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
36753704
standard_ProcessUtility(parsetree, queryString, context,
36763705
params, dest, completionTag);
36773706
}
3707+
3708+
if (MyXactAccessedTempRel)
3709+
{
3710+
MTM_LOG1("Xact accessed temp table, stopping replication");
3711+
MtmTx.isDistributed = false; /* Skip */
3712+
}
3713+
36783714
}
36793715

36803716

36813717
static void
36823718
MtmExecutorFinish(QueryDesc *queryDesc)
36833719
{
3720+
/*
3721+
* If tx didn't wrote to XLOG then there is nothing to commit on other nodes.
3722+
*/
36843723
if (MtmDoReplication) {
36853724
CmdType operation = queryDesc->operation;
36863725
EState *estate = queryDesc->estate;
@@ -3698,12 +3737,20 @@ MtmExecutorFinish(QueryDesc *queryDesc)
36983737
continue;
36993738
}
37003739
}
3740+
MTM_LOG1("MtmTx.containsDML = true // WAL");
37013741
MtmTx.containsDML = true;
37023742
break;
37033743
}
37043744
}
37053745
}
37063746
}
3747+
3748+
// if (MyXactAccessedRel)
3749+
// {
3750+
// MTM_LOG1("MtmTx.containsDML = true");
3751+
// MtmTx.containsDML = true;
3752+
// }
3753+
37073754
if (PreviousExecutorFinishHook != NULL)
37083755
{
37093756
PreviousExecutorFinishHook(queryDesc);

pglogical_apply.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -686,9 +686,9 @@ process_remote_insert(StringInfo s, Relation rel)
686686
ExecStoreTuple(tup, newslot, InvalidBuffer, true);
687687
}
688688

689-
if (rel->rd_rel->relkind != RELKIND_RELATION)
690-
elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
691-
rel->rd_rel->relkind, RelationGetRelationName(rel));
689+
// if (rel->rd_rel->relkind != RELKIND_RELATION) // RELKIND_MATVIEW
690+
// elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
691+
// rel->rd_rel->relkind, RelationGetRelationName(rel));
692692

693693
/* debug output */
694694
#ifdef VERBOSE_INSERT
@@ -978,7 +978,7 @@ void MtmExecutor(int id, void* work, size_t size)
978978
{
979979
while (true) {
980980
char action = pq_getmsgbyte(&s);
981-
MTM_LOG2("%d: REMOTE process action %c", MyProcPid, action);
981+
MTM_LOG1("%d: REMOTE process action %c", MyProcPid, action);
982982
#if 0
983983
if (Mtm->status == MTM_RECOVERY) {
984984
MTM_LOG1("Replay action %c[%x]", action, s.data[s.cursor]);

pglogical_proto.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
112112

113113
if (!isRecovery && csn == INVALID_CSN) {
114114
MtmIsFilteredTxn = true;
115+
MTM_LOG1("MtmIsFilteredTxn");
115116
} else {
116117
pq_sendbyte(out, 'B'); /* BEGIN */
117118
pq_sendint(out, MtmNodeId, 4);

0 commit comments

Comments
 (0)