Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b6e113a

Browse files
knizhnikkelvich
authored and committed
Avoid loops in transaction list
1 parent 1a120b5 commit b6e113a

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

multimaster.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ void MtmLock(LWLockMode mode)
242242
#else
243243
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID], mode);
244244
#endif
245+
Mtm->lastLockHolder = MyProcPid;
245246
}
246247

247248
void MtmUnlock(void)
@@ -251,6 +252,7 @@ void MtmUnlock(void)
251252
#else
252253
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
253254
#endif
255+
Mtm->lastLockHolder = 0;
254256
}
255257

256258
void MtmLockNode(int nodeId)
@@ -549,16 +551,20 @@ MtmAdjustOldestXid(TransactionId xid)
549551

550552
static void MtmTransactionListAppend(MtmTransState* ts)
551553
{
552-
ts->next = NULL;
553-
ts->nSubxids = 0;
554-
*Mtm->transListTail = ts;
555-
Mtm->transListTail = &ts->next;
554+
if (!ts->isEnqueued) {
555+
ts->isEnqueued = true;
556+
ts->next = NULL;
557+
ts->nSubxids = 0;
558+
*Mtm->transListTail = ts;
559+
Mtm->transListTail = &ts->next;
560+
}
556561
}
557562

558563
static void MtmTransactionListInsertAfter(MtmTransState* after, MtmTransState* ts)
559564
{
560565
ts->next = after->next;
561566
after->next = ts;
567+
ts->isEnqueued = true;
562568
if (Mtm->transListTail == &after->next) {
563569
Mtm->transListTail = &ts->next;
564570
}
@@ -699,6 +705,9 @@ MtmCreateTransState(MtmCurrentTrans* x)
699705
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
700706
ts->snapshot = x->snapshot;
701707
ts->isLocal = true;
708+
if (!found) {
709+
ts->isEnqueued = false;
710+
}
702711
if (TransactionIdIsValid(x->gtid.xid)) {
703712
Assert(x->gtid.node != MtmNodeId);
704713
ts->gtid = x->gtid;
@@ -832,6 +841,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
832841
Assert(x->gid[0]);
833842
tm->state = ts;
834843
ts->votingCompleted = true;
844+
if (!found) {
845+
ts->isEnqueued = false;
846+
}
835847
if (Mtm->status != MTM_RECOVERY) {
836848
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
837849
if (!MtmUseDtm) {
@@ -944,8 +956,12 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
944956
*/
945957
MTM_LOG1("%d: send ABORT notification abort transaction %d to coordinator %d", MyProcPid, x->gtid.xid, x->gtid.node);
946958
if (ts == NULL) {
959+
bool found;
947960
Assert(TransactionIdIsValid(x->xid));
948-
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
961+
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
962+
if (!found) {
963+
ts->isEnqueued = false;
964+
}
949965
ts->status = TRANSACTION_STATUS_ABORTED;
950966
ts->isLocal = true;
951967
ts->snapshot = x->snapshot;
@@ -1363,7 +1379,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
13631379
*/
13641380
bool MtmRefreshClusterStatus(bool nowait)
13651381
{
1366-
nodemask_t mask, clique, disabled, enabled;
1382+
nodemask_t mask, clique, disabled;
13671383
nodemask_t matrix[MAX_NODES];
13681384
MtmTransState *ts;
13691385
int clique_size;
@@ -1390,28 +1406,29 @@ bool MtmRefreshClusterStatus(bool nowait)
13901406
MTM_LOG1("Find clique %lx, disabledNodeMask %lx", (long) clique, (long) Mtm->disabledNodeMask);
13911407
MtmLock(LW_EXCLUSIVE);
13921408
disabled = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
1393-
enabled = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
13941409

13951410
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
13961411
if (mask & 1) {
13971412
MtmDisableNode(i+1);
13981413
}
1399-
}
1400-
1414+
}
1415+
#if 0 /* Do not enable nodes here: they will be enabled after completion of recovery */
1416+
enabled = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
14011417
for (i = 0, mask = enabled; mask != 0; i++, mask >>= 1) {
14021418
if (mask & 1) {
14031419
MtmEnableNode(i+1);
14041420
}
14051421
}
1406-
if (disabled|enabled) {
1422+
#endif
1423+
if (disabled) {
14071424
MtmCheckQuorum();
14081425
}
14091426
/* Interrupt voting for active transaction and abort them */
14101427
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
14111428
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
14121429
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
14131430
if (MtmIsCoordinator(ts)) {
1414-
if (!ts->votingCompleted && (disabled|enabled) != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
1431+
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
14151432
MtmAbortTransaction(ts);
14161433
MtmWakeUpBackend(ts);
14171434
}
@@ -2221,6 +2238,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
22212238
{
22222239
if (nodeId <= 0 || nodeId > Mtm->nLiveNodes)
22232240
{
2241+
MtmUnlock();
22242242
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nLiveNodes);
22252243
}
22262244
MtmDisableNode(nodeId);
@@ -2286,6 +2304,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
22862304
MtmEnableNode(MtmReplicationNodeId);
22872305
MtmCheckQuorum();
22882306
} else {
2307+
MtmUnlock();
22892308
elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);
22902309
}
22912310
} else {

multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ typedef struct MtmTransState
163163
struct MtmTransState* next; /* Next element in L1 list of all finished transactions present in xid2state hash */
164164
bool votingCompleted; /* 2PC voting is completed */
165165
bool isLocal; /* Transaction is either replicated or doesn't contain DML statements, so it should be ignored by pglogical replication */
166-
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
166+
bool isEnqueued; /* Transaction is inserted in queue */
167+
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
167168
} MtmTransState;
168169

169170
typedef struct
@@ -180,7 +181,7 @@ typedef struct
180181
nodemask_t walSenderLockerMask; /* Mask of WAL-senders IDs locking the cluster */
181182
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
182183
nodemask_t reconnectMask; /* Mask of nodes connection to which has to be reestablished by sender */
183-
184+
int lastLockHolder; /* PID of process last obtaining the node lock */
184185
bool localTablesHashLoaded; /* Whether data from local_tables table is loaded in shared memory hash table */
185186
int inject2PCError; /* Simulate error during 2PC commit at this node */
186187
int nLiveNodes; /* Number of active nodes */

tests2/lib/bank_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ def exec_tx(self, name, tx_block):
137137
self.history.register_finish(event_id, 'Commit')
138138
except psycopg2.InterfaceError:
139139
self.history.register_finish(event_id, 'InterfaceError')
140-
except psycopg2.Error:
140+
except psycopg2.Error as x:
141+
print(x.pgerror)
141142
self.history.register_finish(event_id, 'PsycopgError')
142143

143144
cur.close()

0 commit comments

Comments
 (0)