Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4ef69a3

Browse files
knizhnikkelvich
authored andcommitted
Do not swich ourselves to offline
1 parent 337d429 commit 4ef69a3

File tree

3 files changed

+33
-21
lines changed

3 files changed

+33
-21
lines changed

multimaster.c

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,13 +1316,15 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
13161316

13171317
static void MtmDisableNode(int nodeId)
13181318
{
1319+
timestamp_t now = MtmGetSystemTime();
1320+
elog(WARNING, "Disable node %d at xlog position %lx, last status change time %d msec ago", nodeId, GetXLogInsertRecPtr(),
1321+
(int)USEC_TO_MSEC(now - Mtm->nodes[nodeId-1].lastStatusChangeTime));
13191322
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1320-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1323+
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
13211324
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
13221325
if (nodeId != MtmNodeId) {
13231326
Mtm->nLiveNodes -= 1;
13241327
}
1325-
elog(WARNING, "Disable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
13261328
MtmPollStatusOfPreparedTransactions(nodeId);
13271329
}
13281330

@@ -1345,8 +1347,8 @@ void MtmRecoveryCompleted(void)
13451347
MtmNodeId, Mtm->disabledNodeMask, Mtm->reconnectMask, Mtm->nLiveNodes);
13461348
MtmLock(LW_EXCLUSIVE);
13471349
Mtm->recoverySlot = 0;
1348-
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
13491350
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
1351+
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
13501352
for (i = 0; i < Mtm->nAllNodes; i++) {
13511353
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
13521354
}
@@ -1600,10 +1602,11 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16001602
if (disabled) {
16011603
timestamp_t now = MtmGetSystemTime();
16021604
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
1603-
if (mask & 1) {
1604-
if (Mtm->nodes[i].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < now) {
1605-
MtmDisableNode(i+1);
1606-
}
1605+
if (i+1 != MtmNodeId
1606+
&& (mask & 1) != 0
1607+
&& Mtm->nodes[i].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < now)
1608+
{
1609+
MtmDisableNode(i+1);
16071610
}
16081611
}
16091612
}
@@ -1615,15 +1618,16 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16151618

16161619
if (disabled|enabled) {
16171620
MtmCheckQuorum();
1618-
}
1619-
/* Interrupt voting for active transaction and abort them */
1620-
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1621-
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1622-
ts->gid, ts->gtid.nхode, ts->xid, ts->status, ts->gtid.xid);
1623-
if (MtmIsCoordinator(ts)) {
1624-
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
1625-
MtmAbortTransaction(ts);
1626-
MtmWakeUpBackend(ts);
1621+
1622+
/* Interrupt voting for active transaction and abort them */
1623+
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1624+
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1625+
ts->gid, ts->gtid.nхode, ts->xid, ts->status, ts->gtid.xid);
1626+
if (MtmIsCoordinator(ts)) {
1627+
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
1628+
MtmAbortTransaction(ts);
1629+
MtmWakeUpBackend(ts);
1630+
}
16271631
}
16281632
}
16291633
}
@@ -2242,7 +2246,7 @@ _PG_init(void)
22422246
"Minimal amount of time (msec) between node status change",
22432247
"This delay is used to avoid false detection of node failure and to prevent blinking of node status node",
22442248
&MtmNodeDisableDelay,
2245-
1000,
2249+
2000,
22462250
1,
22472251
INT_MAX,
22482252
PGC_BACKEND,

pglogical_apply.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ process_remote_begin(StringInfo s)
341341
Assert(gtid.node > 0);
342342

343343
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
344-
#if 0
344+
#if 1
345345
if (BIT_CHECK(Mtm->disabledNodeMask, gtid.node-1)) {
346346
elog(WARNING, "Ignore transaction %d from disabled node %d", gtid.xid, gtid.node);
347347
MtmResetTransaction();
@@ -633,9 +633,9 @@ process_remote_commit(StringInfo in)
633633
{
634634
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
635635
gid = pq_getmsgstring(in);
636-
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s", gid);
636+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s", gid);
637637
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
638-
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
638+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
639639
StartTransactionCommand();
640640
MtmBeginSession();
641641
MtmSetCurrentTransactionGID(gid);

pglogical_proto.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,22 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
161161
return;
162162
}
163163
} else {
164-
//csn_t csn = MtmTransactionSnapshot(txn->xid);
164+
csn_t csn = MtmTransactionSnapshot(txn->xid);
165165
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
166166

167+
167168
if (!isRecovery && txn->origin_id != InvalidRepOriginId)
168169
{
170+
if (flags == PGLOGICAL_ABORT_PREPARED) {
171+
MTM_LOG1("Skip ABORT_PREPARED for transaction %s to node %d", txn->gid, MtmReplicationNodeId);
172+
}
169173
Assert(MtmTransactionRecords == 0);
170174
return;
171175
}
176+
if (flags == PGLOGICAL_ABORT_PREPARED) {
177+
MTM_LOG1("Send ABORT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
178+
txn->xid, txn->gid, txn->end_lsn, MtmReplicationNodeId, isRecovery, txn->origin_id, csn);
179+
}
172180
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {
173181
MTM_LOG1("wal-sender complete recovery of node %d at LSN(commit %lx, end %lx, log %lx) in transaction %s event %d", MtmReplicationNodeId, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr(), txn->gid, flags);
174182
flags |= PGLOGICAL_CAUGHT_UP;

0 commit comments

Comments
 (0)