Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit dc1837c

Browse files
knizhnikkelvich
authored andcommitted
Abbort prepared transaction of disabled nodes
1 parent 1a90ccc commit dc1837c

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

multimaster.c

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -701,10 +701,12 @@ MtmCreateTransState(MtmCurrentTrans* x)
701701
if (TransactionIdIsValid(x->gtid.xid)) {
702702
Assert(x->gtid.node != MtmNodeId);
703703
ts->gtid = x->gtid;
704+
strcpy(ts->gid, x->gid);
704705
} else {
705706
/* I am coordinator of transaction */
706707
ts->gtid.xid = x->xid;
707708
ts->gtid.node = MtmNodeId;
709+
ts->gid[0] = '\0';
708710
}
709711
}
710712
return ts;
@@ -1061,6 +1063,7 @@ void MtmWakeUpBackend(MtmTransState* ts)
10611063
void MtmAbortTransaction(MtmTransState* ts)
10621064
{
10631065
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1066+
MTM_LOG1("Rollback active transaction %d:%d", ts->gtid.node, ts->gtid.xid);
10641067
ts->status = TRANSACTION_STATUS_ABORTED;
10651068
MtmAdjustSubtransactions(ts);
10661069
Mtm->nActiveTransactions -= 1;
@@ -1336,7 +1339,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
13361339
*/
13371340
bool MtmRefreshClusterStatus(bool nowait)
13381341
{
1339-
nodemask_t mask, clique;
1342+
nodemask_t mask, clique, disabled;
13401343
nodemask_t matrix[MAX_NODES];
13411344
MtmTransState *ts;
13421345
int clique_size;
@@ -1362,7 +1365,9 @@ bool MtmRefreshClusterStatus(bool nowait)
13621365

13631366
MTM_LOG1("Find clique %lx, disabledNodeMask %lx", (long) clique, (long) Mtm->disabledNodeMask);
13641367
MtmLock(LW_EXCLUSIVE);
1365-
mask = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
1368+
disabled = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
1369+
1370+
mask = disabled;
13661371
for (i = 0; mask != 0; i++, mask >>= 1) {
13671372
if (mask & 1) {
13681373
MtmDisableNode(i+1);
@@ -1377,12 +1382,16 @@ bool MtmRefreshClusterStatus(bool nowait)
13771382
MtmCheckQuorum();
13781383
/* Interrupt voting for active transaction and abort them */
13791384
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1380-
if (!ts->votingCompleted && MtmIsCoordinator(ts)) {
1385+
if (MtmIsCoordinator(ts)) {
1386+
if (!ts->votingCompleted && ts->status != TRANSACTION_STATUS_ABORTED) {
1387+
MtmAbortTransaction(ts);
1388+
MtmWakeUpBackend(ts);
1389+
}
1390+
} else if (BIT_CHECK(disabled, ts->gtid.node-1)) { // coordinator of transaction is on disabled node
13811391
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1382-
MTM_LOG1("1) Rollback active transaction %d:%d:%d", ts->gtid.node, ts->gtid.xid, ts->xid);
13831392
MtmAbortTransaction(ts);
1384-
}
1385-
MtmWakeUpBackend(ts);
1393+
FinishPreparedTransaction(ts->gid, false);
1394+
}
13861395
}
13871396
}
13881397
MtmUnlock();
@@ -1443,12 +1452,16 @@ void MtmOnNodeDisconnect(int nodeId)
14431452
MtmCheckQuorum();
14441453
/* Interrupt voting for active transaction and abort them */
14451454
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1446-
if (!ts->votingCompleted && MtmIsCoordinator(ts)) {
1455+
if (MtmIsCoordinator(ts)) {
1456+
if (!ts->votingCompleted && ts->status != TRANSACTION_STATUS_ABORTED) {
1457+
MtmAbortTransaction(ts);
1458+
MtmWakeUpBackend(ts);
1459+
}
1460+
} else if (ts->gtid.node == nodeId) { //coordinator of transaction is on disabled node
14471461
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1448-
MTM_LOG1("2) Rollback active transaction %d:%d", ts->gtid.node, ts->gtid.xid);
14491462
MtmAbortTransaction(ts);
1450-
}
1451-
MtmWakeUpBackend(ts);
1463+
FinishPreparedTransaction(ts->gid, false);
1464+
}
14521465
}
14531466
}
14541467
}

multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ typedef struct MtmTransState
140140
{
141141
TransactionId xid;
142142
XidStatus status;
143-
GlobalTransactionId gtid;
143+
char gid[MULTIMASTER_MAX_GID_SIZE]; /* Global transaction ID (used for 2PC) */
144+
GlobalTransactionId gtid; /* Transaction id at coordinator */
144145
csn_t csn; /* commit serial number */
145146
csn_t snapshot; /* transaction snapshot, or INVALID_CSN for local transactions */
146147
int nVotes; /* number of votes received from replcas for this transaction:
@@ -153,7 +154,7 @@ typedef struct MtmTransState
153154
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */
154155
bool votingCompleted; /* 2PC voting is completed */
155156
bool isLocal; /* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
156-
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
157+
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
157158
} MtmTransState;
158159

159160
typedef struct

0 commit comments

Comments
 (0)