Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7228f87

Browse files
committed
Optimize TwoPhaseGetGXact
1 parent 3bb92c1 commit 7228f87

File tree

5 files changed

+27
-6
lines changed

5 files changed

+27
-6
lines changed

contrib/mmts/arbiter.c

+1
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,7 @@ static void MtmReceiver(Datum arg)
10511051
MtmWakeUpBackend(ts);
10521052
}
10531053
} else {
1054+
elog(WARNING, "Receive PRECOMMITTED response for aborted transaction"); // How it can happen? SHould we use assert here?
10541055
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
10551056
MtmWakeUpBackend(ts);
10561057
}

contrib/mmts/multimaster.c

+10-3
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,7 @@ void MtmPrecommitTransaction(char const* gid)
942942
ts->csn = MtmAssignCSN();
943943
MtmAdjustSubtransactions(ts);
944944
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
945+
SetPrepareTransactionState(ts->gid, "precommitted");
945946
}
946947
}
947948
MtmUnlock();
@@ -1561,7 +1562,10 @@ void MtmHandleApplyError(void)
15611562
}
15621563

15631564
/**
1564-
* Check status of all prepared transactions with coordinator at disabled node
1565+
* Check status of all prepared transactions with coordinator at disabled node.
1566+
* Actually, if node is precommitted (state == UNKNOWN) at any of nodes, then is is prepared at all nodes and so can be committed.
1567+
* But if coordinator of transaction is crashed, we made a decision about transaction commit only if transaction is precommitted at ALL live nodes.
1568+
* The reason is that we want to avoid extra polling to obtain maximum CSN from all nodes to assign it to committed transaction.
15651569
* Called only from MtmDisableNode in critical section.
15661570
*/
15671571
static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
@@ -1602,9 +1606,12 @@ static void MtmDisableNode(int nodeId)
16021606
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
16031607
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16041608
if (nodeId != MtmNodeId) {
1605-
Mtm->nLiveNodes -= 1;
1609+
Mtm->nLiveNodes -= 1;
1610+
}
1611+
if (Mtm->nLiveNodes >= Mtm->nAllNodes/2+1) {
1612+
/* Make decision about prepared transaction status only in quorum */
1613+
MtmPollStatusOfPreparedTransactions(nodeId);
16061614
}
1607-
MtmPollStatusOfPreparedTransactions(nodeId);
16081615
}
16091616

16101617
static void MtmEnableNode(int nodeId)

contrib/mmts/multimaster.h

+6
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,12 @@ typedef struct
154154
pgid_t gid; /* Global transaction identifier */
155155
} MtmArbiterMessage;
156156

157+
/*
158+
* Abort logical message is send by replica when error is happen while applying prepared transaction.
159+
* In this case we do not have prepared transaction and can not do abort-prepared.
160+
* But we have to record the fact of abort to be able to replay it in case of crash of coordinator of this transaction.
161+
* We are using logical abort message with code 'A' for it
162+
*/
157163
typedef struct MtmAbortLogicalMessage
158164
{
159165
pgid_t gid;

contrib/mmts/pglogical_apply.c

+3
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,9 @@ process_remote_commit(StringInfo in)
628628
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
629629
gid = pq_getmsgstring(in);
630630
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s", MyProcPid, gid);
631+
MtmBeginSession(origin_node);
631632
MtmPrecommitTransaction(gid);
633+
MtmEndSession(origin_node, true);
632634
return;
633635
}
634636
case PGLOGICAL_COMMIT:
@@ -693,6 +695,7 @@ process_remote_commit(StringInfo in)
693695
{
694696
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
695697
gid = pq_getmsgstring(in);
698+
/* MtmRollbackPreparedTransaction will set origin session itself */
696699
MtmRollbackPreparedTransaction(origin_node, gid);
697700
break;
698701
}

src/backend/access/transam/twophase.c

+7-3
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ static TwoPhaseStateData *TwoPhaseState;
183183
static GlobalTransaction MyLockedGxact = NULL;
184184

185185
static bool twophaseExitRegistered = false;
186+
static TransactionId cached_xid = InvalidTransactionId;
187+
static GlobalTransaction cached_gxact = NULL;
188+
186189

187190
static void RecordTransactionCommitPrepared(TransactionId xid,
188191
int nchildren,
@@ -438,6 +441,10 @@ MarkAsPreparing(TransactionId xid, const char *gid,
438441
proc->lwWaitMode = 0;
439442
proc->waitLock = NULL;
440443
proc->waitProcLock = NULL;
444+
445+
cached_xid = xid;
446+
cached_gxact = gxact;
447+
441448
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
442449
SHMQueueInit(&(proc->myProcLocks[i]));
443450
/* subxid data must be filled later by GXactLoadSubxactData */
@@ -825,9 +832,6 @@ TwoPhaseGetGXact(TransactionId xid)
825832
GlobalTransaction result = NULL;
826833
int i;
827834

828-
static TransactionId cached_xid = InvalidTransactionId;
829-
static GlobalTransaction cached_gxact = NULL;
830-
831835
/*
832836
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
833837
* repeatedly for the same XID. We can save work with a simple cache.

0 commit comments

Comments
 (0)