Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c94bbba

Browse files
knizhnikkelvich
authored andcommitted
Correctly handle duplicated PREPARE message
1 parent 8595c4b commit c94bbba

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
258258
{
259259
int rc;
260260
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
261-
if (rc < 0 && errno == EAGAIN) {
261+
if (rc <= 0 && errno == EAGAIN) {
262262
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
263263
if (rc == 1) {
264264
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);

multimaster.c

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
854854
MtmTransState* ts;
855855
MtmTransMap* tm;
856856
TransactionId* subxids;
857+
bool found;
857858
MTM_TXTRACE(x, "PrePrepareTransaction Start");
858859

859860
if (!x->isDistributed) {
@@ -869,7 +870,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
869870

870871
if (!IsBackgroundWorker && Mtm->status != MTM_ONLINE) {
871872
/* Do not take in account bg-workers which are performing recovery */
872-
elog(ERROR, "Abort current transaction because this cluster node is in %s status", MtmNodeStatusMnem[Mtm->status]);
873+
elog(ERROR, "Abort transaction %s (%llu) because this cluster node is in %s status", x->gid, (long64)x->xid, MtmNodeStatusMnem[Mtm->status]);
873874
}
874875
if (TransactionIdIsValid(x->gtid.xid) && BIT_CHECK(Mtm->disabledNodeMask, x->gtid.node-1)) {
875876
/* Coordinator of transaction is disabled: just abort transaction without any further steps */
@@ -878,6 +879,14 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
878879

879880
MtmLock(LW_EXCLUSIVE);
880881

882+
Assert(*x->gid != '\0');
883+
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, &found);
884+
if (found && tm->status != TRANSACTION_STATUS_IN_PROGRESS) {
885+
Assert(tm->status == TRANSACTION_STATUS_ABORTED);
886+
MtmUnlock();
887+
elog(ERROR, "Skip already aborted transaction %s (%llu) from node %d", x->gid, (long64)x->xid, x->gtid.node);
888+
}
889+
881890
ts = MtmCreateTransState(x);
882891
/*
883892
* Invalid CSN prevent replication of transaction by logical replication
@@ -898,8 +907,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
898907
x->isPrepared = true;
899908
x->csn = ts->csn;
900909

901-
Assert(*x->gid != '\0');
902-
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
903910
tm->state = ts;
904911
tm->status = TRANSACTION_STATUS_IN_PROGRESS;
905912
MTM_LOG2("Prepare transaction %s", x->gid);

tests2/test_recovery.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def assertCommits(self, aggs):
3131
for conn_id, agg in enumerate(aggs):
3232
commits = commits and 'commit' in agg['transfer']['finish']
3333
if not commits:
34+
print('No commits during aggregation interval')
35+
time.sleep(100000)
3436
raise AssertionError('No commits during aggregation interval')
3537

3638
def assertNoCommits(self, aggs):

0 commit comments

Comments
 (0)