Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f42f375

Browse files
committed
Do not swich ourselves to offline
1 parent 383d5ab commit f42f375

File tree

3 files changed

+33
-21
lines changed

3 files changed

+33
-21
lines changed

contrib/mmts/multimaster.c

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,13 +1317,15 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
13171317

13181318
static void MtmDisableNode(int nodeId)
13191319
{
1320+
timestamp_t now = MtmGetSystemTime();
1321+
elog(WARNING, "Disable node %d at xlog position %lx, last status change time %d msec ago", nodeId, GetXLogInsertRecPtr(),
1322+
(int)USEC_TO_MSEC(now - Mtm->nodes[nodeId-1].lastStatusChangeTime));
13201323
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1321-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1324+
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
13221325
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
13231326
if (nodeId != MtmNodeId) {
13241327
Mtm->nLiveNodes -= 1;
13251328
}
1326-
elog(WARNING, "Disable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
13271329
MtmPollStatusOfPreparedTransactions(nodeId);
13281330
}
13291331

@@ -1346,8 +1348,8 @@ void MtmRecoveryCompleted(void)
13461348
MtmNodeId, Mtm->disabledNodeMask, Mtm->reconnectMask, Mtm->nLiveNodes);
13471349
MtmLock(LW_EXCLUSIVE);
13481350
Mtm->recoverySlot = 0;
1349-
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
13501351
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
1352+
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
13511353
for (i = 0; i < Mtm->nAllNodes; i++) {
13521354
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
13531355
}
@@ -1601,10 +1603,11 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16011603
if (disabled) {
16021604
timestamp_t now = MtmGetSystemTime();
16031605
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
1604-
if (mask & 1) {
1605-
if (Mtm->nodes[i].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < now) {
1606-
MtmDisableNode(i+1);
1607-
}
1606+
if (i+1 != MtmNodeId
1607+
&& (mask & 1) != 0
1608+
&& Mtm->nodes[i].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < now)
1609+
{
1610+
MtmDisableNode(i+1);
16081611
}
16091612
}
16101613
}
@@ -1616,15 +1619,16 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16161619

16171620
if (disabled|enabled) {
16181621
MtmCheckQuorum();
1619-
}
1620-
/* Interrupt voting for active transaction and abort them */
1621-
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1622-
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1623-
ts->gid, ts->gtid.nхode, ts->xid, ts->status, ts->gtid.xid);
1624-
if (MtmIsCoordinator(ts)) {
1625-
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
1626-
MtmAbortTransaction(ts);
1627-
MtmWakeUpBackend(ts);
1622+
1623+
/* Interrupt voting for active transaction and abort them */
1624+
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1625+
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1626+
ts->gid, ts->gtid.nхode, ts->xid, ts->status, ts->gtid.xid);
1627+
if (MtmIsCoordinator(ts)) {
1628+
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
1629+
MtmAbortTransaction(ts);
1630+
MtmWakeUpBackend(ts);
1631+
}
16281632
}
16291633
}
16301634
}
@@ -2243,7 +2247,7 @@ _PG_init(void)
22432247
"Minimal amount of time (msec) between node status change",
22442248
"This delay is used to avoid false detection of node failure and to prevent blinking of node status node",
22452249
&MtmNodeDisableDelay,
2246-
1000,
2250+
2000,
22472251
1,
22482252
INT_MAX,
22492253
PGC_BACKEND,

contrib/mmts/pglogical_apply.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ process_remote_begin(StringInfo s)
341341
Assert(gtid.node > 0);
342342

343343
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
344-
#if 0
344+
#if 1
345345
if (BIT_CHECK(Mtm->disabledNodeMask, gtid.node-1)) {
346346
elog(WARNING, "Ignore transaction %d from disabled node %d", gtid.xid, gtid.node);
347347
MtmResetTransaction();
@@ -633,9 +633,9 @@ process_remote_commit(StringInfo in)
633633
{
634634
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
635635
gid = pq_getmsgstring(in);
636-
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s", gid);
636+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s", gid);
637637
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
638-
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
638+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
639639
StartTransactionCommand();
640640
MtmBeginSession();
641641
MtmSetCurrentTransactionGID(gid);

contrib/mmts/pglogical_proto.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,22 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
161161
return;
162162
}
163163
} else {
164-
//csn_t csn = MtmTransactionSnapshot(txn->xid);
164+
csn_t csn = MtmTransactionSnapshot(txn->xid);
165165
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
166166

167+
167168
if (!isRecovery && txn->origin_id != InvalidRepOriginId)
168169
{
170+
if (flags == PGLOGICAL_ABORT_PREPARED) {
171+
MTM_LOG1("Skip ABORT_PREPARED for transaction %s to node %d", txn->gid, MtmReplicationNodeId);
172+
}
169173
Assert(MtmTransactionRecords == 0);
170174
return;
171175
}
176+
if (flags == PGLOGICAL_ABORT_PREPARED) {
177+
MTM_LOG1("Send ABORT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
178+
txn->xid, txn->gid, txn->end_lsn, MtmReplicationNodeId, isRecovery, txn->origin_id, csn);
179+
}
172180
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {
173181
MTM_LOG1("wal-sender complete recovery of node %d at LSN(commit %lx, end %lx, log %lx) in transaction %s event %d", MtmReplicationNodeId, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr(), txn->gid, flags);
174182
flags |= PGLOGICAL_CAUGHT_UP;

0 commit comments

Comments
 (0)