Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit bd4d9dc

Browse files
knizhnikkelvich
authored andcommitted
Fix setting origin for local commits
1 parent f0d434f commit bd4d9dc

File tree

5 files changed

+61
-61
lines changed

5 files changed

+61
-61
lines changed

arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ static void MtmSetSocketOptions(int sd)
314314
static void MtmCheckResponse(MtmArbiterMessage* resp)
315315
{
316316
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) && !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)) {
317-
elog(WARNING, "Node %d thinks that I was dead, while I am %s", resp->node, MtmNodeStatusMnem[Mtm->status]);
317+
elog(WARNING, "Node %d thinks that I was dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], messageKindText[resp->code]);
318318
if (Mtm->status != MTM_RECOVERY) {
319319
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
320320
MtmSwitchClusterMode(MTM_RECOVERY);
@@ -918,7 +918,7 @@ static void MtmReceiver(Datum arg)
918918
if (msg->status == TRANSACTION_STATUS_IN_PROGRESS || msg->status == TRANSACTION_STATUS_ABORTED) {
919919
elog(LOG, "Abort transaction %s because it is in state %d at node %d",
920920
msg->gid, ts->status, node);
921-
MtmFinishPreparedTransaction(node, ts, false);
921+
MtmFinishPreparedTransaction(ts, false);
922922
}
923923
else if (msg->status == TRANSACTION_STATUS_COMMITTED || msg->status == TRANSACTION_STATUS_UNKNOWN)
924924
{
@@ -928,7 +928,7 @@ static void MtmReceiver(Datum arg)
928928
}
929929
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
930930
elog(LOG, "Commit transaction %s because it is prepared at all live nodes", msg->gid);
931-
MtmFinishPreparedTransaction(node, ts, true);
931+
MtmFinishPreparedTransaction(ts, true);
932932
}
933933
} else {
934934
elog(LOG, "Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",

multimaster.c

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,7 +1103,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
11031103
static void
11041104
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11051105
{
1106-
MTM_LOG1("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
1106+
MTM_LOG2("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
11071107
MyProcPid, x->xid, x->isPrepared, x->isReplicated, x->isDistributed, x->isTwoPhase, x->gid, commit ? "commit" : "abort");
11081108
if (x->status != TRANSACTION_STATUS_ABORTED && x->isDistributed && (x->isPrepared || x->isReplicated) && !x->isTwoPhase) {
11091109
MtmTransState* ts = NULL;
@@ -1121,7 +1121,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11211121
}
11221122
if (ts != NULL) {
11231123
if (*ts->gid)
1124-
MTM_LOG1("TRANSLOG: %s transaction %s status %d", (commit ? "commit" : "rollback"), ts->gid, ts->status);
1124+
MTM_LOG2("TRANSLOG: %s transaction %s status %d", (commit ? "commit" : "rollback"), ts->gid, ts->status);
11251125
if (commit) {
11261126
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
11271127
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
@@ -1176,6 +1176,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11761176
Mtm->nActiveTransactions -= 1;
11771177
}
11781178
MtmTransactionListAppend(ts);
1179+
if (*x->gid) {
1180+
LogLogicalMessage("A", x->gid, strlen(x->gid) + 1, false);
1181+
}
11791182
}
11801183
MtmSend2PCMessage(ts, MSG_ABORTED); /* send notification to coordinator */
11811184
} else if (x->status == TRANSACTION_STATUS_ABORTED && x->isReplicated && !x->isPrepared) {
@@ -1229,7 +1232,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12291232
MtmSendMessage(&msg);
12301233
}
12311234
}
1232-
} else {
1235+
} else if (!BIT_CHECK(Mtm->disabledNodeMask, ts->gtid.node-1)) {
12331236
msg.node = ts->gtid.node;
12341237
msg.dxid = ts->gtid.xid;
12351238
MtmSendMessage(&msg);
@@ -1435,7 +1438,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14351438
Assert(ts->gid[0]);
14361439
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
14371440
elog(LOG, "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d", ts->gid, MtmNodeId);
1438-
MtmFinishPreparedTransaction(disabledNodeId, ts, false);
1441+
MtmFinishPreparedTransaction(ts, false);
14391442
} else {
14401443
MTM_LOG1("Poll state of transaction %d (%s)", ts->xid, ts->gid);
14411444
MtmBroadcastPollMessage(ts);
@@ -1458,7 +1461,9 @@ static void MtmDisableNode(int nodeId)
14581461
if (nodeId != MtmNodeId) {
14591462
Mtm->nLiveNodes -= 1;
14601463
}
1464+
MtmUnlock();
14611465
MtmPollStatusOfPreparedTransactions(nodeId);
1466+
MtmLock(LW_EXCLUSIVE);
14621467
}
14631468

14641469
static void MtmEnableNode(int nodeId)
@@ -2779,34 +2784,41 @@ void MtmReleaseRecoverySlot(int nodeId)
27792784
}
27802785
}
27812786

2782-
void MtmFinishPreparedTransaction(int nodeId, MtmTransState* ts, bool commit)
2787+
void MtmRollbackPreparedTransaction(char const* gid)
27832788
{
2789+
MTM_LOG1("Abort prepared transaction %s", gid);
2790+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
2791+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
2792+
MtmResetTransaction();
2793+
StartTransactionCommand();
2794+
MtmBeginSession(MtmReplicationNodeId);
2795+
MtmSetCurrentTransactionGID(gid);
2796+
FinishPreparedTransaction(gid, false);
2797+
CommitTransactionCommand();
2798+
MtmEndSession(MtmReplicationNodeId, true);
2799+
}
2800+
}
2801+
2802+
2803+
void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
2804+
{
2805+
if (Mtm->nodes[MtmNodeId-1].originId == InvalidRepOriginId) {
2806+
/* This dummy origin is used for local commits/aborts which should not be replicated */
2807+
Mtm->nodes[MtmNodeId-1].originId = replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId));
2808+
}
27842809
Assert(ts->votingCompleted);
27852810
Assert(!IsTransactionState());
27862811
MtmResetTransaction();
27872812
StartTransactionCommand();
2788-
MtmBeginSession(nodeId);
2813+
MtmBeginSession(MtmNodeId);
27892814
MtmSetCurrentTransactionCSN(ts->csn);
27902815
MtmSetCurrentTransactionGID(ts->gid);
27912816
FinishPreparedTransaction(ts->gid, commit);
27922817
CommitTransactionCommand();
2793-
MtmEndSession(nodeId, true);
2818+
MtmEndSession(MtmNodeId, true);
27942819
Assert(ts->status == commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED);
27952820
}
27962821

2797-
#if 0
2798-
static void MtmFinishAllPreparedTransactions(void)
2799-
{
2800-
MtmTransState* ts;
2801-
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
2802-
if (ts->status != TRANSACTION_STATUS_COMMITTED && ts->status != TRANSACTION_STATUS_ABORTED) {
2803-
MtmFinishPreparedTransaction(MtmReplicationNodeId, ts, false);
2804-
}
2805-
}
2806-
}
2807-
#endif
2808-
2809-
28102822
/*
28112823
* Determine when and how we should open replication slot.
28122824
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -2840,11 +2852,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28402852
Mtm->nodes[i].restartLsn = InvalidXLogRecPtr;
28412853
}
28422854
MtmUnlock();
2843-
#if 0
2844-
MtmBeginSession(MtmReplicationNodeId);
2845-
FinishAllPreparedTransactions(false);
2846-
MtmEndSession(MtmReplicationNodeId, true);
2847-
#endif
28482855
return REPLMODE_RECOVERY;
28492856
}
28502857
}

multimaster.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,6 @@ extern void MtmReleaseRecoverySlot(int nodeId);
360360
extern PGconn *PQconnectdb_safe(const char *conninfo);
361361
extern void MtmBeginSession(int nodeId);
362362
extern void MtmEndSession(int nodeId, bool unlock);
363-
extern void MtmFinishPreparedTransaction(int nodeId, MtmTransState* ts, bool commit);
364-
363+
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
364+
extern void MtmRollbackPreparedTransaction(char const* gid);
365365
#endif

pglogical_apply.c

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,12 @@ process_remote_message(StringInfo s)
425425
}
426426
break;
427427
}
428+
case 'A':
429+
{
430+
MtmRollbackPreparedTransaction(messageBody);
431+
standalone = true;
432+
break;
433+
}
428434
case 'L':
429435
{
430436
MTM_LOG3("%ld: Process deadlock message with size %d from %d", MtmGetSystemTime(), messageSize, MtmReplicationNodeId);
@@ -590,8 +596,8 @@ process_remote_commit(StringInfo in)
590596
MtmReplicationNodeId = pq_getmsgbyte(in);
591597

592598
/* read fields */
593-
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
594-
end_lsn = pq_getmsgint64(in); /* end_lsn */
599+
pq_getmsgint64(in); /* commit_lsn */
600+
replorigin_session_origin_lsn = end_lsn = pq_getmsgint64(in); /* end_lsn */
595601
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
596602

597603
origin_node = pq_getmsgbyte(in);
@@ -609,6 +615,7 @@ process_remote_commit(StringInfo in)
609615
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
610616
MtmBeginSession(MtmReplicationNodeId);
611617
CommitTransactionCommand();
618+
MtmEndSession(MtmReplicationNodeId, true);
612619
}
613620
break;
614621
}
@@ -639,6 +646,7 @@ process_remote_commit(StringInfo in)
639646
FinishPreparedTransaction(gid, false);
640647
CommitTransactionCommand();
641648
}
649+
MtmEndSession(MtmReplicationNodeId, true);
642650
}
643651
break;
644652
}
@@ -655,34 +663,19 @@ process_remote_commit(StringInfo in)
655663
MtmSetCurrentTransactionGID(gid);
656664
FinishPreparedTransaction(gid, true);
657665
CommitTransactionCommand();
666+
MtmEndSession(MtmReplicationNodeId, true);
658667
break;
659668
}
660669
case PGLOGICAL_ABORT_PREPARED:
661670
{
662671
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
663672
gid = pq_getmsgstring(in);
664-
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s", gid);
665-
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
666-
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
667-
MtmResetTransaction();
668-
StartTransactionCommand();
669-
MtmBeginSession(MtmReplicationNodeId);
670-
MtmSetCurrentTransactionGID(gid);
671-
FinishPreparedTransaction(gid, false);
672-
CommitTransactionCommand();
673-
}
673+
MtmRollbackPreparedTransaction(gid);
674674
break;
675675
}
676676
default:
677677
Assert(false);
678678
}
679-
#if 0 /* Do ont need to advance slot position here: it will be done by transaction commit */
680-
if (replorigin_session_origin != InvalidRepOriginId) {
681-
replorigin_advance(replorigin_session_origin, end_lsn,
682-
XactLastCommitEnd, false, false);
683-
}
684-
#endif
685-
MtmEndSession(MtmReplicationNodeId, true);
686679
MtmUpdateLsnMapping(MtmReplicationNodeId, end_lsn);
687680
if (flags & PGLOGICAL_CAUGHT_UP) {
688681
MtmRecoveryCompleted();

pglogical_proto.c

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,29 +133,29 @@ static void
133133
pglogical_write_message(StringInfo out,
134134
const char *prefix, Size sz, const char *message)
135135
{
136-
if (*prefix == 'L')
137-
{
138-
MTM_LOG1("Send deadlock message to node %d", MtmReplicationNodeId);
139-
}
140-
else if (*prefix == 'D')
141-
{
142-
if (MtmTransactionSnapshot(MtmCurrentXid) == INVALID_CSN)
143-
{
136+
switch (*prefix) {
137+
case 'L':
138+
if (MtmIsRecoveredNode(MtmReplicationNodeId)) {
139+
return;
140+
} else {
141+
MTM_LOG1("Send deadlock message to node %d", MtmReplicationNodeId);
142+
}
143+
break;
144+
case 'D':
145+
if (MtmTransactionSnapshot(MtmCurrentXid) == INVALID_CSN) {
144146
MTM_LOG2("%d: pglogical_write_message filtered", MyProcPid);
145147
return;
146148
}
147149
DDLInProress = true;
148-
}
149-
else if (*prefix == 'E')
150-
{
150+
break;
151+
case 'E':
151152
DDLInProress = false;
152153
/*
153154
* we use End message only as indicator of DDL transaction finish,
154155
* so no need to send that to replicas.
155156
*/
156157
return;
157158
}
158-
159159
pq_sendbyte(out, 'M');
160160
pq_sendbyte(out, *prefix);
161161
pq_sendint(out, sz, 4);

0 commit comments

Comments
 (0)