Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5c2ee51

Browse files
committed
fix last problem with lost PRECOMMIT confirmation
1 parent 2fe7732 commit 5c2ee51

File tree

6 files changed

+23
-32
lines changed

6 files changed

+23
-32
lines changed

multimaster.c

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,9 +1179,7 @@ void MtmPrecommitTransaction(char const* gid)
11791179
ts->status = TRANSACTION_STATUS_UNKNOWN;
11801180
ts->csn = MtmAssignCSN();
11811181
MtmAdjustSubtransactions(ts);
1182-
if (Mtm->status != MTM_RECOVERY) {
1183-
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
1184-
}
1182+
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
11851183
MtmUnlock();
11861184
Assert(replorigin_session_origin != InvalidRepOriginId);
11871185
if (!IsTransactionState()) {
@@ -1625,12 +1623,10 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
16251623
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
16261624

16271625
Assert(!MtmIsCoordinator(ts)); /* All broadcasts are now done through logical decoding */
1628-
if (!BIT_CHECK(Mtm->disabledNodeMask, ts->gtid.node-1)) {
1629-
MTM_LOG2("Send %s message to node %d xid=%d gid=%s", MtmMessageKindMnem[cmd], ts->gtid.node, ts->gtid.xid, ts->gid);
1630-
msg.node = ts->gtid.node;
1631-
msg.dxid = ts->gtid.xid;
1632-
MtmSendMessage(&msg);
1633-
}
1626+
MTM_TXTRACE(ts, "MtmSend2PCMessage sending %s message to node %d", MtmMessageKindMnem[cmd], ts->gtid.node);
1627+
msg.node = ts->gtid.node;
1628+
msg.dxid = ts->gtid.xid;
1629+
MtmSendMessage(&msg);
16341630
}
16351631

16361632
/*

multimaster.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@
1414
#define DEBUG_LEVEL 0
1515
#endif
1616

17-
#ifndef MTM_TRACE
18-
#define MTM_TRACE 0
19-
#endif
2017

2118
#define MTM_TAG "[MTM] "
2219
#define MTM_ELOG(level,fmt,...) elog(level, MTM_TAG fmt, ## __VA_ARGS__)
@@ -44,12 +41,14 @@
4441
#define MTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4542
#endif
4643

47-
// #if MTM_TRACE == 0
48-
// #define MTM_TXTRACE(tx, event)
49-
// #else
50-
#define MTM_TXTRACE(tx, event) \
51-
fprintf(stderr, MTM_TAG "%s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
52-
// #endif
44+
// #define MTM_TRACE 1
45+
46+
#ifndef MTM_TRACE
47+
#define MTM_TXTRACE(tx, event, ...)
48+
#else
49+
#define MTM_TXTRACE(tx, event, ...) \
50+
fprintf(stderr, MTM_TAG "%s, %lld, %u " event "\n", tx->gid, (long long)MtmGetSystemTime(), MyProcPid, ## __VA_ARGS__)
51+
#endif
5352

5453
#define MULTIMASTER_NAME "multimaster"
5554
#define MULTIMASTER_SCHEMA_NAME "mtm"
@@ -390,7 +389,6 @@ extern csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, no
390389
extern csn_t MtmAssignCSN(void);
391390
extern csn_t MtmSyncClock(csn_t csn);
392391
extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot, nodemask_t participantsMask);
393-
extern void MtmReceiverStarted(int nodeId);
394392
extern MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown);
395393
extern void MtmExecute(void* work, int size);
396394
extern void MtmExecutor(void* work, size_t size);

pglogical_apply.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -684,15 +684,15 @@ process_remote_commit(StringInfo in)
684684
{
685685
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
686686
strncpy(gid, pq_getmsgstring(in), sizeof gid);
687-
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s", MyProcPid, gid);
687+
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
688688
MtmBeginSession(origin_node);
689689
MtmPrecommitTransaction(gid);
690690
MtmEndSession(origin_node, true);
691691
return;
692692
}
693693
case PGLOGICAL_COMMIT:
694694
{
695-
MTM_LOG2("%d: PGLOGICAL_COMMIT commit", MyProcPid);
695+
MTM_LOG2("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
696696
if (IsTransactionState()) {
697697
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
698698
MtmBeginSession(origin_node);
@@ -705,12 +705,12 @@ process_remote_commit(StringInfo in)
705705
{
706706
Assert(IsTransactionState() && TransactionIdIsValid(MtmGetCurrentTransactionId()));
707707
strncpy(gid, pq_getmsgstring(in), sizeof gid);
708+
MTM_LOG2("%d: PGLOGICAL_PREPARE %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
708709
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_IN_PROGRESS) == TRANSACTION_STATUS_ABORTED) {
709710
MTM_LOG1("Avoid prepare of previously aborted global transaction %s", gid);
710711
AbortCurrentTransaction();
711712
} else {
712713
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
713-
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s", gid);
714714
BeginTransactionBlock(false);
715715
CommitTransactionCommand();
716716
StartTransactionCommand();
@@ -745,7 +745,7 @@ process_remote_commit(StringInfo in)
745745
*/
746746
Assert(csn);
747747
strncpy(gid, pq_getmsgstring(in), sizeof gid);
748-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%lld, gid=%s, lsn=%llx", csn, gid, end_lsn);
748+
MTM_LOG2("%d: PGLOGICAL_COMMIT_PREPARED %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
749749
MtmResetTransaction();
750750
StartTransactionCommand();
751751
MtmBeginSession(origin_node);

pglogical_receiver.c

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -519,14 +519,6 @@ pglogical_receiver_main(Datum main_arg)
519519
{
520520
int msg_len = rc - hdr_len;
521521
stmt = copybuf + hdr_len;
522-
if (mode == REPLMODE_RECOVERED) {
523-
/* Ingore all incompleted transactions from recovered node */
524-
if (stmt[0] != 'B') {
525-
output_written_lsn = Max(walEnd, output_written_lsn);
526-
continue;
527-
}
528-
mode = REPLMODE_OPEN_EXISTED;
529-
}
530522
MTM_LOG3("Receive message %c from node %d", stmt[0], nodeId);
531523
if (buf.used + msg_len + 1 >= MtmTransSpillThreshold*MB) {
532524
if (spill_file < 0) {

state.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ MtmCheckState(void)
112112
case MTM_RECOVERY:
113113
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId-1))
114114
{
115-
// BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1); // kk trick
115+
BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1); // kk trick
116116
MtmSetClusterStatus(MTM_RECOVERED);
117117
return;
118118
}
@@ -148,6 +148,7 @@ MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev)
148148
break;
149149

150150
case MTM_NEIGHBOR_WAL_RECEIVER_START:
151+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1);
151152
if (!BIT_CHECK(Mtm->pglogicalReceiverMask, node_id - 1))
152153
BIT_SET(Mtm->pglogicalReceiverMask, node_id - 1);
153154
break;

t/004_recovery.pl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,7 @@
7474
diag("Sums: $sum0, $sum1, $sum2");
7575
is($sum2, $sum0, "Check that sum_2 == sum_0");
7676
is($sum2, $sum1, "Check that sum_2 == sum_1");
77+
78+
$cluster->{nodes}->[0]->stop('fast');
79+
$cluster->{nodes}->[1]->stop('fast');
80+
$cluster->{nodes}->[2]->stop('fast');

0 commit comments

Comments
 (0)