Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f437110

Browse files
committed
Abbort prepared transaction of disabled nodes
1 parent df7106d commit f437110

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

contrib/mmts/multimaster.c

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -702,10 +702,12 @@ MtmCreateTransState(MtmCurrentTrans* x)
702702
if (TransactionIdIsValid(x->gtid.xid)) {
703703
Assert(x->gtid.node != MtmNodeId);
704704
ts->gtid = x->gtid;
705+
strcpy(ts->gid, x->gid);
705706
} else {
706707
/* I am coordinator of transaction */
707708
ts->gtid.xid = x->xid;
708709
ts->gtid.node = MtmNodeId;
710+
ts->gid[0] = '\0';
709711
}
710712
}
711713
return ts;
@@ -1062,6 +1064,7 @@ void MtmWakeUpBackend(MtmTransState* ts)
10621064
void MtmAbortTransaction(MtmTransState* ts)
10631065
{
10641066
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1067+
MTM_LOG1("Rollback active transaction %d:%d", ts->gtid.node, ts->gtid.xid);
10651068
ts->status = TRANSACTION_STATUS_ABORTED;
10661069
MtmAdjustSubtransactions(ts);
10671070
Mtm->nActiveTransactions -= 1;
@@ -1337,7 +1340,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
13371340
*/
13381341
bool MtmRefreshClusterStatus(bool nowait)
13391342
{
1340-
nodemask_t mask, clique;
1343+
nodemask_t mask, clique, disabled;
13411344
nodemask_t matrix[MAX_NODES];
13421345
MtmTransState *ts;
13431346
int clique_size;
@@ -1363,7 +1366,9 @@ bool MtmRefreshClusterStatus(bool nowait)
13631366

13641367
MTM_LOG1("Find clique %lx, disabledNodeMask %lx", (long) clique, (long) Mtm->disabledNodeMask);
13651368
MtmLock(LW_EXCLUSIVE);
1366-
mask = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
1369+
disabled = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
1370+
1371+
mask = disabled;
13671372
for (i = 0; mask != 0; i++, mask >>= 1) {
13681373
if (mask & 1) {
13691374
MtmDisableNode(i+1);
@@ -1378,12 +1383,16 @@ bool MtmRefreshClusterStatus(bool nowait)
13781383
MtmCheckQuorum();
13791384
/* Interrupt voting for active transaction and abort them */
13801385
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1381-
if (!ts->votingCompleted && MtmIsCoordinator(ts)) {
1386+
if (MtmIsCoordinator(ts)) {
1387+
if (!ts->votingCompleted && ts->status != TRANSACTION_STATUS_ABORTED) {
1388+
MtmAbortTransaction(ts);
1389+
MtmWakeUpBackend(ts);
1390+
}
1391+
} else if (BIT_CHECK(disabled, ts->gtid.node-1)) { // coordinator of transaction is on disabled node
13821392
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1383-
MTM_LOG1("Rollback active transaction %d:%d", ts->gtid.node, ts->gtid.xid);
13841393
MtmAbortTransaction(ts);
1385-
}
1386-
MtmWakeUpBackend(ts);
1394+
FinishPreparedTransaction(ts->gid, false);
1395+
}
13871396
}
13881397
}
13891398
MtmUnlock();
@@ -1444,12 +1453,16 @@ void MtmOnNodeDisconnect(int nodeId)
14441453
MtmCheckQuorum();
14451454
/* Interrupt voting for active transaction and abort them */
14461455
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1447-
if (!ts->votingCompleted && MtmIsCoordinator(ts)) {
1456+
if (MtmIsCoordinator(ts)) {
1457+
if (!ts->votingCompleted && ts->status != TRANSACTION_STATUS_ABORTED) {
1458+
MtmAbortTransaction(ts);
1459+
MtmWakeUpBackend(ts);
1460+
}
1461+
} else if (ts->gtid.node == nodeId) { //coordinator of transaction is on disabled node
14481462
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1449-
MTM_LOG1("Rollback active transaction %d:%d", ts->gtid.node, ts->gtid.xid);
14501463
MtmAbortTransaction(ts);
1451-
}
1452-
MtmWakeUpBackend(ts);
1464+
FinishPreparedTransaction(ts->gid, false);
1465+
}
14531466
}
14541467
}
14551468
}

contrib/mmts/multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ typedef struct MtmTransState
140140
{
141141
TransactionId xid;
142142
XidStatus status;
143-
GlobalTransactionId gtid;
143+
char gid[MULTIMASTER_MAX_GID_SIZE]; /* Global transaction ID (used for 2PC) */
144+
GlobalTransactionId gtid; /* Transaction id at coordinator */
144145
csn_t csn; /* commit serial number */
145146
csn_t snapshot; /* transaction snapshot, or INVALID_CSN for local transactions */
146147
int nVotes; /* number of votes received from replcas for this transaction:
@@ -153,7 +154,7 @@ typedef struct MtmTransState
153154
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */
154155
bool votingCompleted; /* 2PC voting is completed */
155156
bool isLocal; /* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
156-
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
157+
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
157158
} MtmTransState;
158159

159160
typedef struct

0 commit comments

Comments
 (0)