Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2739552

Browse files
committed
check for temp tables access in the end of MtmProcessUtility()
1 parent f71d360 commit 2739552

File tree

3 files changed

+76
-28
lines changed

3 files changed

+76
-28
lines changed

contrib/mmts/multimaster.c

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ bool MtmUseRaftable;
210210
bool MtmUseDtm;
211211
bool MtmVolksWagenMode;
212212

213+
TransactionId MtmUtilityProcessedInXid;
214+
213215
static char* MtmConnStrs;
214216
static int MtmQueueSize;
215217
static int MtmWorkers;
@@ -688,7 +690,12 @@ MtmXactCallback(XactEvent event, void *arg)
688690
static bool
689691
MtmIsUserTransaction()
690692
{
691-
return !IsAutoVacuumLauncherProcess() && IsNormalProcessingMode() && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess();
693+
return !IsAutoVacuumLauncherProcess() &&
694+
IsNormalProcessingMode() &&
695+
MtmDoReplication &&
696+
!am_walsender &&
697+
!IsBackgroundWorker &&
698+
!IsAutoVacuumWorkerProcess();
692699
}
693700

694701
void
@@ -700,7 +707,6 @@ MtmResetTransaction()
700707
x->gtid.xid = InvalidTransactionId;
701708
x->isDistributed = false;
702709
x->isPrepared = false;
703-
x->isPrepared = false;
704710
x->status = TRANSACTION_STATUS_UNKNOWN;
705711
}
706712

@@ -3335,20 +3341,20 @@ MtmGenerateGid(char* gid)
33353341

33363342
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
33373343
{
3338-
if (MyXactAccessedTempRel)
3339-
{
3340-
/*
3341-
* XXX: this tx anyway goes to subscribers later, but without
3342-
* surrounding begin/commit. Now it will be filtered out on receiver side.
3343-
* Probably there is more clever way to do that.
3344-
*/
3345-
x->isDistributed = false;
3346-
if (!MtmVolksWagenMode)
3347-
elog(NOTICE, "MTM: Transaction was not replicated as it accesed temporary relation");
3348-
return false;
3349-
}
3350-
3351-
if (!x->isReplicated && (x->isDistributed && x->containsDML)) {
3344+
// if (MyXactAccessedTempRel)
3345+
// {
3346+
// /*
3347+
// * XXX: this tx anyway goes to subscribers later, but without
3348+
// * surrounding begin/commit. Now it will be filtered out on receiver side.
3349+
// * Probably there is more clever way to do that.
3350+
// */
3351+
// x->isDistributed = false;
3352+
// if (!MtmVolksWagenMode)
3353+
// elog(NOTICE, "MTM: Transaction was not replicated as it accesed temporary relation");
3354+
// return false;
3355+
// }
3356+
3357+
if (!x->isReplicated && x->isDistributed && x->containsDML) {
33523358
MtmGenerateGid(x->gid);
33533359
if (!x->isTransactionBlock) {
33543360
BeginTransactionBlock();
@@ -3358,7 +3364,8 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
33583364
}
33593365
if (!PrepareTransactionBlock(x->gid))
33603366
{
3361-
elog(WARNING, "Failed to prepare transaction %s", x->gid);
3367+
if (!MtmVolksWagenMode)
3368+
elog(WARNING, "Failed to prepare transaction %s", x->gid);
33623369
/* ??? Should we do explicit rollback */
33633370
} else {
33643371
CommitTransactionCommand();
@@ -3551,6 +3558,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
35513558
switch (stmt->kind)
35523559
{
35533560
case TRANS_STMT_BEGIN:
3561+
case TRANS_STMT_START:
35543562
MtmTx.isTransactionBlock = true;
35553563
break;
35563564
case TRANS_STMT_COMMIT:
@@ -3587,16 +3595,34 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
35873595
case T_LoadStmt:
35883596
case T_ClusterStmt:
35893597
case T_VacuumStmt:
3590-
case T_ExplainStmt:
35913598
case T_VariableShowStmt:
35923599
case T_ReassignOwnedStmt:
35933600
case T_LockStmt:
35943601
case T_CheckPointStmt:
35953602
case T_ReindexStmt:
3596-
case T_RefreshMatViewStmt:
35973603
skipCommand = true;
35983604
break;
35993605

3606+
case T_ExplainStmt:
3607+
/*
3608+
* EXPLAIN ANALYZE can create side-effects.
3609+
* Better to catch that by some general mechanism of detecting
3610+
* catalog and heap writes.
3611+
*/
3612+
{
3613+
ExplainStmt *stmt = (ExplainStmt *) parsetree;
3614+
ListCell *lc;
3615+
3616+
skipCommand = true;
3617+
foreach(lc, stmt->options)
3618+
{
3619+
DefElem *opt = (DefElem *) lfirst(lc);
3620+
if (strcmp(opt->defname, "analyze") == 0)
3621+
skipCommand = false;
3622+
}
3623+
}
3624+
break;
3625+
36003626
/* Save GUC context for consequent DDL execution */
36013627
case T_DiscardStmt:
36023628
{
@@ -3657,12 +3683,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
36573683
break;
36583684
}
36593685

3660-
if (context == PROCESS_UTILITY_TOPLEVEL) // || context == PROCESS_UTILITY_QUERY)
3686+
/* XXX: dirty. Clear on new tx */
3687+
if (!skipCommand && (context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
3688+
MtmUtilityProcessedInXid = InvalidTransactionId;
3689+
3690+
if (context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY)
36613691
{
3662-
if (!skipCommand && !MtmTx.isReplicated) {
3663-
if (MtmProcessDDLCommand(queryString)) {
3664-
return;
3665-
}
3692+
if (!skipCommand && !MtmTx.isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId)) {
3693+
MtmUtilityProcessedInXid = GetCurrentTransactionId();
3694+
MtmProcessDDLCommand(queryString);
36663695
}
36673696
}
36683697

@@ -3676,12 +3705,22 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
36763705
standard_ProcessUtility(parsetree, queryString, context,
36773706
params, dest, completionTag);
36783707
}
3708+
3709+
if (MyXactAccessedTempRel)
3710+
{
3711+
MTM_LOG1("Xact accessed temp table, stopping replication");
3712+
MtmTx.isDistributed = false; /* Skip */
3713+
}
3714+
36793715
}
36803716

36813717

36823718
static void
36833719
MtmExecutorFinish(QueryDesc *queryDesc)
36843720
{
3721+
/*
3722+
* If tx didn't wrote to XLOG then there is nothing to commit on other nodes.
3723+
*/
36853724
if (MtmDoReplication) {
36863725
CmdType operation = queryDesc->operation;
36873726
EState *estate = queryDesc->estate;
@@ -3699,12 +3738,20 @@ MtmExecutorFinish(QueryDesc *queryDesc)
36993738
continue;
37003739
}
37013740
}
3741+
MTM_LOG1("MtmTx.containsDML = true // WAL");
37023742
MtmTx.containsDML = true;
37033743
break;
37043744
}
37053745
}
37063746
}
37073747
}
3748+
3749+
// if (MyXactAccessedRel)
3750+
// {
3751+
// MTM_LOG1("MtmTx.containsDML = true");
3752+
// MtmTx.containsDML = true;
3753+
// }
3754+
37083755
if (PreviousExecutorFinishHook != NULL)
37093756
{
37103757
PreviousExecutorFinishHook(queryDesc);

contrib/mmts/pglogical_apply.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -686,9 +686,9 @@ process_remote_insert(StringInfo s, Relation rel)
686686
ExecStoreTuple(tup, newslot, InvalidBuffer, true);
687687
}
688688

689-
if (rel->rd_rel->relkind != RELKIND_RELATION)
690-
elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
691-
rel->rd_rel->relkind, RelationGetRelationName(rel));
689+
// if (rel->rd_rel->relkind != RELKIND_RELATION) // RELKIND_MATVIEW
690+
// elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
691+
// rel->rd_rel->relkind, RelationGetRelationName(rel));
692692

693693
/* debug output */
694694
#ifdef VERBOSE_INSERT
@@ -978,7 +978,7 @@ void MtmExecutor(int id, void* work, size_t size)
978978
{
979979
while (true) {
980980
char action = pq_getmsgbyte(&s);
981-
MTM_LOG2("%d: REMOTE process action %c", MyProcPid, action);
981+
MTM_LOG1("%d: REMOTE process action %c", MyProcPid, action);
982982
#if 0
983983
if (Mtm->status == MTM_RECOVERY) {
984984
MTM_LOG1("Replay action %c[%x]", action, s.data[s.cursor]);

contrib/mmts/pglogical_proto.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
112112

113113
if (!isRecovery && csn == INVALID_CSN) {
114114
MtmIsFilteredTxn = true;
115+
MTM_LOG1("MtmIsFilteredTxn");
115116
} else {
116117
pq_sendbyte(out, 'B'); /* BEGIN */
117118
pq_sendint(out, MtmNodeId, 4);

0 commit comments

Comments
 (0)