Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8920f0b

Browse files
knizhnikkelvich
authored andcommitted
Support users 2PC
1 parent b093bb1 commit 8920f0b

File tree

4 files changed

+95
-13
lines changed

4 files changed

+95
-13
lines changed

arbiter.c

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -997,15 +997,18 @@ static void MtmReceiver(Datum arg)
997997
/* All nodes are finished their transactions */
998998
if (ts->status == TRANSACTION_STATUS_ABORTED) {
999999
MtmWakeUpBackend(ts);
1000-
} else if (MtmUseDtm) {
1001-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1002-
ts->nVotes = 1; /* I voted myself */
1003-
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PREPARE");
1004-
MtmSend2PCMessage(ts, MSG_PREPARE);
10051000
} else {
10061001
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1007-
ts->status = TRANSACTION_STATUS_UNKNOWN;
1008-
MtmWakeUpBackend(ts);
1002+
if (ts->isTwoPhase) {
1003+
MtmWakeUpBackend(ts);
1004+
} else if (MtmUseDtm) {
1005+
ts->nVotes = 1; /* I voted myself */
1006+
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PREPARE");
1007+
MtmSend2PCMessage(ts, MSG_PREPARE);
1008+
} else {
1009+
ts->status = TRANSACTION_STATUS_UNKNOWN;
1010+
MtmWakeUpBackend(ts);
1011+
}
10091012
}
10101013
}
10111014
}

multimaster.c

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
129129
static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
130130
static void MtmPostPrepareTransaction(MtmCurrentTrans* x);
131131
static void MtmAbortPreparedTransaction(MtmCurrentTrans* x);
132+
static void MtmCommitPreparedTransaction(MtmCurrentTrans* x);
132133
static void MtmEndTransaction(MtmCurrentTrans* x, bool commit);
133134
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x);
134135
static TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum);
@@ -684,6 +685,9 @@ MtmXactCallback(XactEvent event, void *arg)
684685
case XACT_EVENT_ABORT_PREPARED:
685686
MtmAbortPreparedTransaction(&MtmTx);
686687
break;
688+
case XACT_EVENT_COMMIT_PREPARED:
689+
MtmCommitPreparedTransaction(&MtmTx);
690+
break;
687691
case XACT_EVENT_COMMIT:
688692
MtmEndTransaction(&MtmTx, true);
689693
break;
@@ -792,6 +796,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
792796
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
793797
ts->snapshot = x->snapshot;
794798
ts->isLocal = true;
799+
ts->isTwoPhase = x->isTwoPhase;
795800
if (!found) {
796801
ts->isEnqueued = false;
797802
ts->isActive = false;
@@ -969,6 +974,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
969974
x->status = ts->status;
970975
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
971976
MtmUnlock();
977+
if (x->isTwoPhase) {
978+
MtmResetTransaction();
979+
}
972980
}
973981
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
974982
if (Mtm->inject2PCError == 3) {
@@ -979,6 +987,74 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
979987
MTM_TXTRACE(x, "PostPrepareTransaction Finish");
980988
}
981989

990+
static void
991+
MtmCommitPreparedTransaction(MtmCurrentTrans* x)
992+
{
993+
MtmTransMap* tm;
994+
MtmTransState* ts;
995+
996+
if (Mtm->status == MTM_RECOVERY || x->isReplicated || x->isPrepared) { /* Ignore auto-2PC originated by multimaster */
997+
return;
998+
}
999+
MtmLock(LW_EXCLUSIVE);
1000+
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_FIND, NULL);
1001+
if (tm == NULL) {
1002+
elog(WARNING, "Global transaciton ID '%s' is not found", x->gid);
1003+
} else {
1004+
time_t transTimeout = MSEC_TO_USEC(Mtm2PCMinTimeout);
1005+
int nConfigChanges = Mtm->nConfigChanges;
1006+
timestamp_t start = MtmGetSystemTime();
1007+
int result = 0;
1008+
1009+
Assert(tm->state != NULL);
1010+
MTM_LOG1("Commit prepared transaction %d with gid='%s'", x->xid, x->gid);
1011+
ts = tm->state;
1012+
1013+
Assert(MtmIsCoordinator(ts));
1014+
1015+
ts->votingCompleted = false;
1016+
ts->nVotes = 1; /* I voted myself */
1017+
ts->procno = MyProc->pgprocno;
1018+
MTM_TXTRACE(ts, "Coordinator sends MSG_PREPARE");
1019+
MtmSend2PCMessage(ts, MSG_PREPARE);
1020+
1021+
/* Wait votes from all nodes until: */
1022+
while (!ts->votingCompleted /* all nodes voted */
1023+
&& nConfigChanges == Mtm->nConfigChanges /* configarion is changed */
1024+
&& Mtm->status == MTM_ONLINE /* node is not online */
1025+
&& ts->status != TRANSACTION_STATUS_ABORTED /* transaction is aborted */
1026+
&& start + transTimeout >= MtmGetSystemTime()) /* timeout is expired */
1027+
{
1028+
MtmUnlock();
1029+
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Start");
1030+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout);
1031+
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Finish");
1032+
/* Emergency bailout if postmaster has died */
1033+
if (result & WL_POSTMASTER_DEATH) {
1034+
proc_exit(1);
1035+
}
1036+
if (result & WL_LATCH_SET) {
1037+
MTM_LOG3("Latch signaled at %ld", MtmGetSystemTime());
1038+
ResetLatch(&MyProc->procLatch);
1039+
}
1040+
MtmLock(LW_EXCLUSIVE);
1041+
}
1042+
if (ts->status != TRANSACTION_STATUS_ABORTED && (!ts->votingCompleted || nConfigChanges != Mtm->nConfigChanges)) {
1043+
if (nConfigChanges != Mtm->nConfigChanges) {
1044+
elog(WARNING, "Transaction %d (%s) is aborted because cluster configuration is changed during commit", x->xid, x->gid);
1045+
} else {
1046+
elog(WARNING, "Transaction %d (%s) is aborted because of %d msec timeout expiration, prepare time %d msec",
1047+
x->xid, x->gid, (int)USEC_TO_MSEC(transTimeout), (int)USEC_TO_MSEC(ts->csn - x->snapshot));
1048+
}
1049+
MtmAbortTransaction(ts);
1050+
}
1051+
x->status = ts->status;
1052+
x->xid = ts->xid;
1053+
x->isPrepared = true;
1054+
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
1055+
}
1056+
MtmUnlock();
1057+
}
9821058

9831059
static void
9841060
MtmAbortPreparedTransaction(MtmCurrentTrans* x)
@@ -1008,9 +1084,9 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
10081084
static void
10091085
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
10101086
{
1011-
MTM_LOG3("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, gid=%s -> %s",
1012-
MyProcPid, x->xid, x->isPrepared, x->isReplicated, x->isDistributed, x->gid, commit ? "commit" : "abort");
1013-
if (x->status != TRANSACTION_STATUS_ABORTED && x->isDistributed && (x->isPrepared || x->isReplicated)) {
1087+
MTM_LOG1("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
1088+
MyProcPid, x->xid, x->isPrepared, x->isReplicated, x->isDistributed, x->isTwoPhase, x->gid, commit ? "commit" : "abort");
1089+
if (x->status != TRANSACTION_STATUS_ABORTED && x->isDistributed && (x->isPrepared || x->isReplicated) && !x->isTwoPhase) {
10141090
MtmTransState* ts = NULL;
10151091
MtmLock(LW_EXCLUSIVE);
10161092
if (x->isPrepared) {
@@ -3819,9 +3895,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38193895
}
38203896
break;
38213897
case TRANS_STMT_PREPARE:
3898+
MtmTx.isTwoPhase = true;
3899+
strcpy(MtmTx.gid, stmt->gid);
3900+
break;
3901+
/* nobreak */
38223902
case TRANS_STMT_COMMIT_PREPARED:
38233903
case TRANS_STMT_ROLLBACK_PREPARED:
3824-
MtmTx.isTwoPhase = true;
3904+
Assert(!MtmTx.isTwoPhase);
38253905
strcpy(MtmTx.gid, stmt->gid);
38263906
break;
38273907
default:

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ typedef struct MtmTransState
213213
bool isLocal; /* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
214214
bool isEnqueued; /* Transaction is inserted in queue */
215215
bool isActive; /* Transaction is active */
216+
bool isTwoPhase; /* user level 2PC */
216217
nodemask_t participantsMask; /* Mask of nodes involved in transaction */
217218
nodemask_t votedMask; /* Mask of voted nodes */
218219
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */

pglogical_proto.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186186
else
187187
Assert(false);
188188

189-
Assert(flags != PGLOGICAL_COMMIT_PREPARED || txn->xid < 1000 || MtmTransactionRecords != 1);
190-
191189
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
192190
// if (MtmIsFilteredTxn) {
193191
// Assert(MtmTransactionRecords == 0);

0 commit comments

Comments
 (0)