Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5c6fae5

Browse files
committed
parse gid to set right node_id
1 parent 2ae23e9 commit 5c6fae5

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

multimaster.c

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
180180
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError, int forceOnNode);
181181
static void MtmProcessDDLCommand(char const* queryString, bool transactional);
182182

183+
static int MtmGidParseNodeId(const char* gid);
184+
183185
// static void MtmLockCluster(void);
184186
// static void MtmUnlockCluster(void);
185187

@@ -1060,6 +1062,8 @@ MtmCreateTransState(MtmCurrentTrans* x)
10601062
}
10611063
strcpy(ts->gid, x->gid);
10621064
x->isActive = true;
1065+
1066+
Assert(ts->gid[0] == '\0' || MtmGidParseNodeId(ts->gid) == ts->gtid.node);
10631067
return ts;
10641068
}
10651069

@@ -1807,7 +1811,7 @@ static void MtmLoadPreparedTransactions(void)
18071811
if (!found || tm->state == NULL) {
18081812
TransactionId xid = GetNewTransactionId(false);
18091813
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_ENTER, &found);
1810-
MTM_LOG1("Recover prepared transaction %s (%llu) state=%s", gid, (long64)xid, pxacts[i].state_3pc);
1814+
MTM_LOG1("Load prepared transaction %s (%llu) state=%s", gid, (long64)xid, pxacts[i].state_3pc);
18111815
MyPgXact->xid = InvalidTransactionId; /* dirty hack:((( */
18121816
Assert(!found);
18131817
ts->isEnqueued = false;
@@ -1820,7 +1824,7 @@ static void MtmLoadPreparedTransactions(void)
18201824
ts->snapshot = INVALID_CSN;
18211825
ts->isTwoPhase = false;
18221826
ts->csn = 0; /* should be replaced with real CSN by poll result */
1823-
ts->gtid.node = MtmNodeId;
1827+
ts->gtid.node = MtmGidParseNodeId(gid);
18241828
ts->gtid.xid = xid;
18251829
ts->nSubxids = 0;
18261830
ts->votingCompleted = true;
@@ -1911,6 +1915,7 @@ void MtmSetCurrentTransactionGID(char const* gid)
19111915
strcpy(MtmTx.gid, gid);
19121916
MtmTx.isDistributed = true;
19131917
MtmTx.isReplicated = true;
1918+
MtmTx.gtid.node = MtmGidParseNodeId(gid);
19141919
}
19151920

19161921
TransactionId MtmGetCurrentTransactionId(void)
@@ -2066,6 +2071,8 @@ void MtmPollStatusOfPreparedTransactionsForDisabledNode(int disabledNodeId, bool
20662071
&& (ts->status == TRANSACTION_STATUS_UNKNOWN || ts->status == TRANSACTION_STATUS_IN_PROGRESS))
20672072
{
20682073
Assert(ts->gid[0]);
2074+
Assert(MtmGidParseNodeId(ts->gid) == ts->gtid.node);
2075+
20692076
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
20702077
MTM_ELOG(LOG, "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d", ts->gid, MtmNodeId);
20712078
TXFINISH("%s ABORT, PollStatusOfPrepared", ts->gid);
@@ -2119,6 +2126,7 @@ MtmPollStatusOfPreparedTransactions(bool majorMode)
21192126
&& (ts->status == TRANSACTION_STATUS_UNKNOWN || ts->status == TRANSACTION_STATUS_IN_PROGRESS))
21202127
{
21212128
Assert(ts->gid[0]);
2129+
Assert(MtmGidParseNodeId(ts->gid) == ts->gtid.node);
21222130

21232131
if (majorMode)
21242132
{
@@ -2130,7 +2138,7 @@ MtmPollStatusOfPreparedTransactions(bool majorMode)
21302138
// MtmBroadcastPollMessage(ts);
21312139
if (ts->gtid.node == MtmNodeId && !ts->isPrepared)
21322140
{
2133-
MTM_LOG1("Abort our in-progress transaction %s", ts->gid);
2141+
MTM_LOG1("Abort our in-progress transaction %s (%d, %d)", ts->gid, ts->gtid.node, MtmNodeId);
21342142
MtmFinishPreparedTransaction(ts, false);
21352143
}
21362144
else
@@ -4756,6 +4764,15 @@ MtmGenerateGid(char* gid)
47564764
{
47574765
static int localCount;
47584766
sprintf(gid, "MTM-%d-%d-%d-" INT64_FORMAT, MtmNodeId, MyProcPid, ++localCount, (int64) GetCurrentTimestamp());
4767+
// MTM_LOG1("MtmGenerateGid: %s", gid);
4768+
}
4769+
4770+
static int
4771+
MtmGidParseNodeId(const char* gid)
4772+
{
4773+
int MtmNodeId;
4774+
sscanf(gid, "MTM-%d-%*d-%*d-%*d", &MtmNodeId);
4775+
return MtmNodeId;
47594776
}
47604777

47614778
/*

pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
160160
MTM_LOG2("%d: pglogical_write_begin XID=%lld sent", MyProcPid, (long64)txn->xid);
161161
pq_sendbyte(out, 'B'); /* BEGIN */
162162
pq_sendint(out, MtmNodeId, 4);
163-
pq_sendint64(out, isRecovery ? InvalidTransactionId : txn->xid);
163+
pq_sendint64(out, txn->xid);
164164
pq_sendint64(out, csn);
165165
pq_sendint64(out, participantsMask);
166166

0 commit comments

Comments
 (0)