Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7da0e33

Browse files
committed
Add commit_command hook
1 parent bfa7286 commit 7da0e33

File tree

4 files changed

+39
-31
lines changed

4 files changed

+39
-31
lines changed

contrib/mmts/multimaster.c

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
111111
static void MtmPostPrepareTransaction(MtmCurrentTrans* x);
112112
static void MtmAbortPreparedTransaction(MtmCurrentTrans* x);
113113
static void MtmEndTransaction(MtmCurrentTrans* x, bool commit);
114+
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x);
114115
static TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum);
115116
static bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
116117
static TransactionId MtmAdjustOldestXid(TransactionId xid);
@@ -588,6 +589,11 @@ MtmXactCallback(XactEvent event, void *arg)
588589
case XACT_EVENT_ABORT:
589590
MtmEndTransaction(&MtmTx, false);
590591
break;
592+
case XACT_EVENT_COMMIT_COMMAND:
593+
if (!IsTransactionBlock()) {
594+
MtmTwoPhaseCommit(&MtmTx);
595+
}
596+
break;
591597
default:
592598
break;
593599
}
@@ -1922,33 +1928,33 @@ MtmGenerateGid(char* gid)
19221928
sprintf(gid, "MTM-%d-%d-%d", MtmNodeId, MyProcPid, ++localCount);
19231929
}
19241930

1925-
static void MtmTwoPhaseCommit(char *completionTag)
1931+
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
19261932
{
1927-
MtmGenerateGid(MtmTx.gid);
1928-
if (!IsTransactionBlock()) {
1929-
elog(WARNING, "Start transaction block for %d", MtmTx.xid);
1930-
BeginTransactionBlock();
1931-
CommitTransactionCommand();
1932-
StartTransactionCommand();
1933-
}
1934-
if (!PrepareTransactionBlock(MtmTx.gid))
1935-
{
1936-
elog(WARNING, "Failed to prepare transaction %s", MtmTx.gid);
1937-
/* report unsuccessful commit in completionTag */
1938-
if (completionTag) {
1939-
strcpy(completionTag, "ROLLBACK");
1933+
if (x->isDistributed && x->containsDML) {
1934+
MtmGenerateGid(x->gid);
1935+
if (!IsTransactionBlock()) {
1936+
elog(WARNING, "Start transaction block for %d", x->xid);
1937+
BeginTransactionBlock();
1938+
CommitTransactionCommand();
1939+
StartTransactionCommand();
19401940
}
1941-
/* ??? Should we do explicit rollback */
1942-
} else {
1943-
CommitTransactionCommand();
1944-
StartTransactionCommand();
1945-
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
1946-
FinishPreparedTransaction(MtmTx.gid, false);
1947-
elog(ERROR, "Transaction %s is aborted by DTM", MtmTx.gid);
1948-
} else {
1949-
FinishPreparedTransaction(MtmTx.gid, true);
1941+
if (!PrepareTransactionBlock(x->gid))
1942+
{
1943+
elog(WARNING, "Failed to prepare transaction %s", x->gid);
1944+
/* ??? Should we do explicit rollback */
1945+
} else {
1946+
CommitTransactionCommand();
1947+
StartTransactionCommand();
1948+
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
1949+
FinishPreparedTransaction(x->gid, false);
1950+
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
1951+
} else {
1952+
FinishPreparedTransaction(x->gid, true);
1953+
}
19501954
}
1955+
return true;
19511956
}
1957+
return false;
19521958
}
19531959

19541960
static void MtmProcessUtility(Node *parsetree, const char *queryString,
@@ -1965,8 +1971,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19651971
switch (stmt->kind)
19661972
{
19671973
case TRANS_STMT_COMMIT:
1968-
if (MtmTx.isDistributed && MtmTx.containsDML) {
1969-
MtmTwoPhaseCommit(completionTag);
1974+
if (MtmTwoPhaseCommit(&MtmTx)) {
19701975
return;
19711976
}
19721977
break;
@@ -2002,9 +2007,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
20022007
if (MtmProcessDDLCommand(queryString)) {
20032008
return;
20042009
}
2005-
if (MtmTx.isDistributed && MtmTx.containsDML && !IsTransactionBlock()) {
2006-
MtmTwoPhaseCommit(completionTag);
2007-
}
20082010
}
20092011
if (PreviousProcessUtilityHook != NULL)
20102012
{
@@ -2035,7 +2037,7 @@ MtmExecutorFinish(QueryDesc *queryDesc)
20352037
}
20362038
}
20372039
if (MtmTx.isDistributed && MtmTx.containsDML && !IsTransactionBlock()) {
2038-
MtmTwoPhaseCommit(NULL);
2040+
MtmTwoPhaseCommit(&MtmTx);
20392041
}
20402042
}
20412043
if (PreviousExecutorFinishHook != NULL)

src/backend/access/transam/twophase.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,9 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12481248

12491249
hdr = (TwoPhaseFileHeader *) xlrec;
12501250
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1251+
bufptr += MAXALIGN(hdr->gidlen);
1252+
1253+
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
12511254

12521255
parsed->twophase_xid = hdr->xid;
12531256
parsed->dbId = hdr->database;
@@ -1267,7 +1270,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12671270
parsed->msgs = (SharedInvalidationMessage *) bufptr;
12681271
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
12691272

1270-
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
1273+
// strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
12711274
}
12721275

12731276

src/backend/access/transam/xact.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2728,6 +2728,8 @@ CommitTransactionCommand(void)
27282728
{
27292729
TransactionState s = CurrentTransactionState;
27302730

2731+
CallXactCallbacks(XACT_EVENT_COMMIT_COMMAND);
2732+
27312733
switch (s->blockState)
27322734
{
27332735
/*

src/include/access/xact.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ typedef enum
9292
XACT_EVENT_PRE_PREPARE,
9393
XACT_EVENT_POST_PREPARE,
9494
XACT_EVENT_COMMIT_PREPARED,
95-
XACT_EVENT_ABORT_PREPARED
95+
XACT_EVENT_ABORT_PREPARED,
96+
XACT_EVENT_COMMIT_COMMAND
9697
} XactEvent;
9798

9899
typedef void (*XactCallback) (XactEvent event, void *arg);

0 commit comments

Comments
 (0)