Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8d58d05

Browse files
knizhnikkelvich
authored andcommitted
Rewrite origin LSN calculation
1 parent b69eae0 commit 8d58d05

File tree

4 files changed

+58
-41
lines changed

4 files changed

+58
-41
lines changed

multimaster.c

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,16 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
11331133
}
11341134
}
11351135

1136+
static void
1137+
MtmLogAbortLogicalMessage(int nodeId, char const* gid)
1138+
{
1139+
MtmAbortLogicalMessage msg;
1140+
strcpy(msg.gid, gid);
1141+
msg.origin_node = nodeId;
1142+
msg.origin_lsn = replorigin_session_origin_lsn;
1143+
XLogFlush(LogLogicalMessage("A", (char*)&msg, sizeof msg, false));
1144+
}
1145+
11361146
static void
11371147
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11381148
{
@@ -1154,7 +1164,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11541164
}
11551165
if (ts != NULL) {
11561166
if (*ts->gid)
1157-
MTM_LOG2("TRANSLOG: %s transaction %s status %d", (commit ? "commit" : "rollback"), ts->gid, ts->status);
1167+
MTM_LOG1("TRANSLOG: %s transaction git=%s xid=%d node=%d dxid=%d status %d",
1168+
(commit ? "commit" : "rollback"), ts->gid, ts->xid, ts->gtid.node, ts->gtid.xid, ts->status);
11581169
if (commit) {
11591170
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
11601171
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
@@ -1213,7 +1224,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12131224
}
12141225
MtmTransactionListAppend(ts);
12151226
if (*x->gid) {
1216-
LogLogicalMessage("A", x->gid, strlen(x->gid) + 1, false);
1227+
replorigin_session_origin_lsn = InvalidXLogRecPtr;
1228+
MtmLogAbortLogicalMessage(MtmNodeId, x->gid);
12171229
}
12181230
}
12191231
MtmSend2PCMessage(ts, MSG_ABORTED); /* send notification to coordinator */
@@ -2826,20 +2838,23 @@ void MtmReleaseRecoverySlot(int nodeId)
28262838
if (Mtm->recoverySlot == nodeId) {
28272839
Mtm->recoverySlot = 0;
28282840
}
2829-
}
2841+
}
28302842

2831-
void MtmRollbackPreparedTransaction(char const* gid)
2843+
void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28322844
{
2833-
MTM_LOG1("Abort prepared transaction %s", gid);
2834-
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
2845+
XidStatus status = MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED);
2846+
MTM_LOG1("Abort prepared transaction %s status %d", gid, status);
2847+
if (status == TRANSACTION_STATUS_UNKNOWN) {
28352848
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
28362849
MtmResetTransaction();
28372850
StartTransactionCommand();
2838-
MtmBeginSession(MtmReplicationNodeId);
2851+
MtmBeginSession(nodeId);
28392852
MtmSetCurrentTransactionGID(gid);
28402853
FinishPreparedTransaction(gid, false);
28412854
CommitTransactionCommand();
2842-
MtmEndSession(MtmReplicationNodeId, true);
2855+
MtmEndSession(nodeId, true);
2856+
} else if (status == TRANSACTION_STATUS_IN_PROGRESS) {
2857+
MtmLogAbortLogicalMessage(nodeId, gid);
28432858
}
28442859
}
28452860

@@ -3159,19 +3174,11 @@ bool MtmFilterTransaction(char* record, int size)
31593174
default:
31603175
break;
31613176
}
3162-
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3177+
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3178+
duplicate = origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
31633179

3164-
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3165-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
3166-
if (Mtm->status == MTM_RECOVERY) {
3167-
if (Mtm->nodes[origin_node-1].restartLSN < origin_lsn) {
3168-
Mtm->nodes[origin_node-1].restartLSN = origin_lsn;
3169-
}
3170-
} else {
3171-
if (Mtm->nodes[replication_node-1].restartLSN < end_lsn) {
3172-
Mtm->nodes[replication_node-1].restartLSN = end_lsn;
3173-
}
3174-
}
3180+
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3181+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
31753182
return duplicate;
31763183
}
31773184

multimaster.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,13 @@ typedef struct
149149
pgid_t gid; /* Global transaction identifier */
150150
} MtmArbiterMessage;
151151

152+
typedef struct MtmAbortLogicalMessage
153+
{
154+
pgid_t gid;
155+
int origin_node;
156+
XLogRecPtr origin_lsn;
157+
} MtmAbortLogicalMessage;
158+
152159
typedef struct MtmMessageQueue
153160
{
154161
MtmArbiterMessage msg;
@@ -364,7 +371,7 @@ extern PGconn *PQconnectdb_safe(const char *conninfo);
364371
extern void MtmBeginSession(int nodeId);
365372
extern void MtmEndSession(int nodeId, bool unlock);
366373
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
367-
extern void MtmRollbackPreparedTransaction(char const* gid);
374+
extern void MtmRollbackPreparedTransaction(int nodeId, char const* gid);
368375
extern bool MtmFilterTransaction(char* record, int size);
369376

370377
#endif

pglogical_apply.c

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,14 @@ process_remote_message(StringInfo s)
427427
}
428428
case 'A':
429429
{
430-
MtmRollbackPreparedTransaction(messageBody);
430+
MtmAbortLogicalMessage* msg = (MtmAbortLogicalMessage*)messageBody;
431+
int origin_node = msg->origin_node;
432+
Assert(messageSize == sizeof(MtmAbortLogicalMessage));
433+
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
434+
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
435+
}
436+
replorigin_session_origin_lsn = msg->origin_lsn;
437+
MtmRollbackPreparedTransaction(origin_node, msg->gid);
431438
standalone = true;
432439
break;
433440
}
@@ -597,17 +604,16 @@ process_remote_commit(StringInfo in)
597604

598605
/* read fields */
599606
pq_getmsgint64(in); /* commit_lsn */
600-
replorigin_session_origin_lsn = end_lsn = pq_getmsgint64(in); /* end_lsn */
607+
end_lsn = pq_getmsgint64(in); /* end_lsn */
601608
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
602609

603610
origin_node = pq_getmsgbyte(in);
604611
origin_lsn = pq_getmsgint64(in);
605612

606-
if (origin_node != MtmReplicationNodeId) {
607-
replorigin_advance(Mtm->nodes[origin_node-1].originId, origin_lsn, GetXLogInsertRecPtr(),
608-
false /* backward */ , false /* WAL */ );
613+
replorigin_session_origin_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
614+
if (Mtm->nodes[origin_node-1].restartLSN < replorigin_session_origin_lsn) {
615+
Mtm->nodes[origin_node-1].restartLSN = replorigin_session_origin_lsn;
609616
}
610-
611617
Assert(replorigin_session_origin == InvalidRepOriginId);
612618

613619
switch(PGLOGICAL_XACT_EVENT(flags))
@@ -617,9 +623,9 @@ process_remote_commit(StringInfo in)
617623
MTM_LOG2("%d: PGLOGICAL_COMMIT commit", MyProcPid);
618624
if (IsTransactionState()) {
619625
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
620-
MtmBeginSession(MtmReplicationNodeId);
626+
MtmBeginSession(origin_node);
621627
CommitTransactionCommand();
622-
MtmEndSession(MtmReplicationNodeId, true);
628+
MtmEndSession(origin_node, true);
623629
}
624630
break;
625631
}
@@ -632,12 +638,12 @@ process_remote_commit(StringInfo in)
632638
AbortCurrentTransaction();
633639
} else {
634640
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
635-
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s", gid);
641+
MTM_LOG1("PGLOGICAL_PREPARE commit: gid=%s", gid);
636642
BeginTransactionBlock();
637643
CommitTransactionCommand();
638644
StartTransactionCommand();
639645

640-
MtmBeginSession(MtmReplicationNodeId);
646+
MtmBeginSession(origin_node);
641647
/* PREPARE itself */
642648
MtmSetCurrentTransactionGID(gid);
643649
PrepareTransactionBlock(gid);
@@ -650,7 +656,7 @@ process_remote_commit(StringInfo in)
650656
FinishPreparedTransaction(gid, false);
651657
CommitTransactionCommand();
652658
}
653-
MtmEndSession(MtmReplicationNodeId, true);
659+
MtmEndSession(origin_node, true);
654660
}
655661
break;
656662
}
@@ -659,22 +665,22 @@ process_remote_commit(StringInfo in)
659665
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
660666
csn = pq_getmsgint64(in);
661667
gid = pq_getmsgstring(in);
662-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
668+
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%lx", csn, gid, end_lsn);
663669
MtmResetTransaction();
664670
StartTransactionCommand();
665-
MtmBeginSession(MtmReplicationNodeId);
671+
MtmBeginSession(origin_node);
666672
MtmSetCurrentTransactionCSN(csn);
667673
MtmSetCurrentTransactionGID(gid);
668674
FinishPreparedTransaction(gid, true);
669675
CommitTransactionCommand();
670-
MtmEndSession(MtmReplicationNodeId, true);
676+
MtmEndSession(origin_node, true);
671677
break;
672678
}
673679
case PGLOGICAL_ABORT_PREPARED:
674680
{
675681
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
676682
gid = pq_getmsgstring(in);
677-
MtmRollbackPreparedTransaction(gid);
683+
MtmRollbackPreparedTransaction(origin_node, gid);
678684
break;
679685
}
680686
default:

pglogical_receiver.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -534,12 +534,9 @@ pglogical_receiver_main(Datum main_arg)
534534
MtmSpillToFile(spill_file, buf.data, buf.used);
535535
ByteBufferReset(&buf);
536536
}
537-
if (stmt[0] == 'M' && stmt[1] == 'L') {
538-
MTM_LOG3("Process deadlock message from %d", nodeId);
537+
if (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'C' || stmt[1] == 'A')) {
538+
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
539539
MtmExecutor(stmt, rc - hdr_len);
540-
} else if (stmt[0] == 'M' && stmt[1] == 'C') {
541-
MTM_LOG1("Process concurrent DDL message from %d", nodeId);
542-
MtmExecute(stmt, rc - hdr_len);
543540
} else {
544541
ByteBufferAppend(&buf, stmt, rc - hdr_len);
545542
if (stmt[0] == 'C') /* commit */

0 commit comments

Comments
 (0)