Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit dc5ae4a

Browse files
knizhnikkelvich
authored andcommitted
Fixes in 2pc
1 parent 5e35877 commit dc5ae4a

File tree

3 files changed

+44
-18
lines changed

3 files changed

+44
-18
lines changed

arbiter.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,7 @@ static void MtmTransReceiver(Datum arg)
677677
ts->status = TRANSACTION_STATUS_UNKNOWN;
678678
MtmWakeUpBackend(ts);
679679
}
680+
break;
680681
default:
681682
Assert(false);
682683
}

multimaster.c

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ static void MtmXactCallback(XactEvent event, void *arg);
110110
static void MtmBeginTransaction(MtmCurrentTrans* x);
111111
static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
112112
static void MtmPostPrepareTransaction(MtmCurrentTrans* x);
113+
static void MtmAbortPreparedTransaction(MtmCurrentTrans* x);
113114
static void MtmEndTransaction(MtmCurrentTrans* x, bool commit);
114115
static TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum);
115116
static bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
@@ -496,6 +497,9 @@ MtmXactCallback(XactEvent event, void *arg)
496497
case XACT_EVENT_POST_PREPARE:
497498
MtmPostPrepareTransaction(&dtmTx);
498499
break;
500+
case XACT_EVENT_ABORT_PREPARED:
501+
MtmAbortPreparedTransaction(&dtmTx);
502+
break;
499503
case XACT_EVENT_COMMIT:
500504
MtmEndTransaction(&dtmTx, true);
501505
break;
@@ -522,6 +526,7 @@ MtmResetTransaction(MtmCurrentTrans* x)
522526
x->snapshot = INVALID_CSN;
523527
x->xid = InvalidTransactionId;
524528
x->gtid.xid = InvalidTransactionId;
529+
x->isDistributed = false;
525530
}
526531

527532
static void
@@ -620,14 +625,16 @@ static void
620625
MtmPostPrepareTransaction(MtmCurrentTrans* x)
621626
{
622627
MtmTransState* ts;
628+
MtmTransMap* tm;
623629

624630
MtmLock(LW_EXCLUSIVE);
625631
ts = hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
626632
Assert(ts != NULL);
633+
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
634+
Assert(x->gid[0]);
635+
tm->state = ts;
636+
627637
if (!MtmIsCoordinator(ts)) {
628-
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
629-
Assert(x->gid[0]);
630-
tm->state = ts;
631638
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
632639
MtmUnlock();
633640
MtmResetTransaction(x);
@@ -646,6 +653,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
646653
}
647654

648655

656+
static void
657+
MtmAbortPreparedTransaction(MtmCurrentTrans* x)
658+
{
659+
MtmTransMap* tm;
660+
661+
MtmLock(LW_EXCLUSIVE);
662+
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
663+
Assert(tm != NULL);
664+
tm->state->status = TRANSACTION_STATUS_ABORTED;
665+
MtmAdjustSubtransactions(tm->state);
666+
MtmUnlock();
667+
MtmResetTransaction(x);
668+
}
669+
649670
static void
650671
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
651672
{
@@ -695,9 +716,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
695716
}
696717
MtmUnlock();
697718
}
698-
x->snapshot = INVALID_CSN;
699-
x->xid = InvalidTransactionId;
700-
x->gtid.xid = InvalidTransactionId;
719+
MtmResetTransaction(x);
701720
MtmCheckSlots();
702721
}
703722

@@ -1735,17 +1754,16 @@ MtmGenerateGid(char* gid)
17351754

17361755
static void MtmTwoPhaseCommit(char *completionTag)
17371756
{
1738-
char gid[MULTIMASTER_MAX_GID_SIZE];
1739-
MtmGenerateGid(gid);
1757+
MtmGenerateGid(dtmTx.gid);
17401758
if (!IsTransactionBlock()) {
17411759
elog(WARNING, "Start transaction block for %d", dtmTx.xid);
17421760
BeginTransactionBlock();
17431761
CommitTransactionCommand();
17441762
StartTransactionCommand();
17451763
}
1746-
if (!PrepareTransactionBlock(gid))
1764+
if (!PrepareTransactionBlock(dtmTx.gid))
17471765
{
1748-
elog(WARNING, "Failed to prepare transaction %s", gid);
1766+
elog(WARNING, "Failed to prepare transaction %s", dtmTx.gid);
17491767
/* report unsuccessful commit in completionTag */
17501768
if (completionTag) {
17511769
strcpy(completionTag, "ROLLBACK");
@@ -1755,10 +1773,10 @@ static void MtmTwoPhaseCommit(char *completionTag)
17551773
CommitTransactionCommand();
17561774
StartTransactionCommand();
17571775
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
1758-
FinishPreparedTransaction(gid, false);
1759-
elog(ERROR, "Transaction %s is aborted by DTM", gid);
1776+
FinishPreparedTransaction(dtmTx.gid, false);
1777+
elog(ERROR, "Transaction %s is aborted by DTM", dtmTx.gid);
17601778
} else {
1761-
FinishPreparedTransaction(gid, true);
1779+
FinishPreparedTransaction(dtmTx.gid, true);
17621780
}
17631781
}
17641782
}

pglogical_apply.c

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -464,13 +464,21 @@ read_rel(StringInfo s, LOCKMODE mode)
464464
return heap_open(relid, NoLock);
465465
}
466466

467+
static void
468+
MtmSetCurrentSession(int nodeId)
469+
{
470+
char slot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
471+
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, nodeId);
472+
replorigin_session_origin = replorigin_by_name(slot_name, false);
473+
replorigin_session_setup(replorigin_session_origin);
474+
}
475+
467476
static void
468477
process_remote_commit(StringInfo in)
469478
{
470479
uint8 flags;
471480
uint8 nodeId;
472481
const char *gid = NULL;
473-
char slot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
474482

475483
/* read flags */
476484
flags = pq_getmsgbyte(in);
@@ -481,17 +489,14 @@ process_remote_commit(StringInfo in)
481489
pq_getmsgint64(in); /* end_lsn */
482490
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
483491

484-
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, nodeId);
485-
replorigin_session_origin = replorigin_by_name(slot_name, false);
486-
replorigin_session_setup(replorigin_session_origin);
487-
488492
switch(PGLOGICAL_XACT_EVENT(flags))
489493
{
490494
case PGLOGICAL_COMMIT:
491495
{
492496
MTM_TRACE("%d: PGLOGICAL_COMMIT commit\n", MyProcPid);
493497
if (IsTransactionState()) {
494498
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
499+
MtmSetCurrentSession(nodeId);
495500
CommitTransactionCommand();
496501
}
497502
break;
@@ -505,6 +510,7 @@ process_remote_commit(StringInfo in)
505510
BeginTransactionBlock();
506511
CommitTransactionCommand();
507512
StartTransactionCommand();
513+
MtmSetCurrentSession(nodeId);
508514
/* PREPARE itself */
509515
MtmSetCurrentTransactionGID(gid);
510516
PrepareTransactionBlock(gid);
@@ -517,6 +523,7 @@ process_remote_commit(StringInfo in)
517523
gid = pq_getmsgstring(in);
518524
MTM_TRACE("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n", MyProcPid, csn, gid);
519525
StartTransactionCommand();
526+
MtmSetCurrentSession(nodeId);
520527
MtmSetCurrentTransactionGID(gid);
521528
FinishPreparedTransaction(gid, true);
522529
CommitTransactionCommand();

0 commit comments

Comments
 (0)