@@ -178,6 +178,7 @@ int MtmConnectAttempts;
178
178
int MtmConnectTimeout ;
179
179
int MtmKeepaliveTimeout ;
180
180
int MtmReconnectAttempts ;
181
+ int MtmNodeDisableDelay ;
181
182
bool MtmUseRaftable ;
182
183
MtmConnectionInfo * MtmConnections ;
183
184
@@ -992,6 +993,7 @@ void MtmRecoveryCompleted(void)
992
993
MtmLock (LW_EXCLUSIVE );
993
994
Mtm -> recoverySlot = 0 ;
994
995
BIT_CLEAR (Mtm -> disabledNodeMask , MtmNodeId - 1 );
996
+ Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = time (NULL );
995
997
/* Mode will be changed to online once all logical receivers are connected */
996
998
MtmSwitchClusterMode (MTM_CONNECTED );
997
999
MtmUnlock ();
@@ -1080,6 +1082,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1080
1082
/* We are lucky: caught up without locking cluster! */
1081
1083
}
1082
1084
BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1085
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time (NULL );
1083
1086
Mtm -> nNodes += 1 ;
1084
1087
caughtUp = true;
1085
1088
} else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
@@ -1222,13 +1225,15 @@ bool MtmRefreshClusterStatus(bool nowait)
1222
1225
if (mask & 1 ) {
1223
1226
Mtm -> nNodes -= 1 ;
1224
1227
BIT_SET (Mtm -> disabledNodeMask , i );
1228
+ Mtm -> nodes [i ].lastStatusChangeTime = time (NULL );
1225
1229
}
1226
1230
}
1227
1231
mask = clique & Mtm -> disabledNodeMask ; /* new enabled nodes mask */
1228
1232
for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1229
1233
if (mask & 1 ) {
1230
1234
Mtm -> nNodes += 1 ;
1231
1235
BIT_CLEAR (Mtm -> disabledNodeMask , i );
1236
+ Mtm -> nodes [i ].lastStatusChangeTime = time (NULL );
1232
1237
}
1233
1238
}
1234
1239
MtmCheckQuorum ();
@@ -1268,6 +1273,11 @@ void MtmOnNodeDisconnect(int nodeId)
1268
1273
{
1269
1274
MtmTransState * ts ;
1270
1275
1276
+ if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MtmNodeDisableDelay > time (NULL )) {
1277
+ /* Avoid false detection of node failure and prevent node status blinking */
1278
+ return ;
1279
+ }
1280
+
1271
1281
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1272
1282
BIT_SET (Mtm -> reconnectMask , nodeId - 1 );
1273
1283
RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
@@ -1278,6 +1288,7 @@ void MtmOnNodeDisconnect(int nodeId)
1278
1288
if (!MtmRefreshClusterStatus (false)) {
1279
1289
MtmLock (LW_EXCLUSIVE );
1280
1290
if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1291
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time (NULL );
1281
1292
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1282
1293
Mtm -> nNodes -= 1 ;
1283
1294
MtmCheckQuorum ();
@@ -1445,6 +1456,7 @@ static void MtmInitialize()
1445
1456
for (i = 0 ; i < MtmNodes ; i ++ ) {
1446
1457
Mtm -> nodes [i ].oldestSnapshot = 0 ;
1447
1458
Mtm -> nodes [i ].transDelay = 0 ;
1459
+ Mtm -> nodes [i ].lastStatusChangeTime = time (NULL );
1448
1460
Mtm -> nodes [i ].con = MtmConnections [i ];
1449
1461
}
1450
1462
PGSemaphoreCreate (& Mtm -> votingSemaphore );
@@ -1565,10 +1577,25 @@ _PG_init(void)
1565
1577
if (!process_shared_preload_libraries_in_progress )
1566
1578
return ;
1567
1579
1580
+ DefineCustomIntVariable (
1581
+ "multimaster.node_disable_delay" ,
1582
+ "Minamal amount of time (sec) between node status change" ,
1583
+ "This delay is used to avoid false detection of node failure and to prevent blinking of node status node" ,
1584
+ & MtmNodeDisableDelay ,
1585
+ 1 ,
1586
+ 1 ,
1587
+ INT_MAX ,
1588
+ PGC_BACKEND ,
1589
+ 0 ,
1590
+ NULL ,
1591
+ NULL ,
1592
+ NULL
1593
+ );
1594
+
1568
1595
DefineCustomIntVariable (
1569
1596
"multimaster.min_recovery_lag" ,
1570
1597
"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed" ,
1571
- "When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile compeition ' and "
1598
+ "When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile competition ' and "
1572
1599
"temporary stop commit of new transactions until node will be completely repared" ,
1573
1600
& MtmMinRecoveryLag ,
1574
1601
100000 ,
@@ -1890,6 +1917,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
1890
1917
{
1891
1918
elog (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nNodes );
1892
1919
}
1920
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time (NULL );
1893
1921
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1894
1922
Mtm -> nNodes -= 1 ;
1895
1923
MtmCheckQuorum ();
@@ -1940,13 +1968,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1940
1968
if (MtmIsRecoverySession ) {
1941
1969
MTM_LOG1 ("%d: Node %d start recovery of node %d" , MyProcPid , MtmNodeId , MtmReplicationNodeId );
1942
1970
if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1971
+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time (NULL );
1943
1972
BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1944
1973
Mtm -> nNodes -= 1 ;
1945
1974
MtmCheckQuorum ();
1946
1975
}
1947
1976
} else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1948
1977
if (recoveryCompleted ) {
1949
1978
MTM_LOG1 ("Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
1979
+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time (NULL );
1950
1980
BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1951
1981
Mtm -> nNodes += 1 ;
1952
1982
MtmCheckQuorum ();
@@ -2057,8 +2087,8 @@ typedef struct
2057
2087
int nodeId ;
2058
2088
char * connStrPtr ;
2059
2089
TupleDesc desc ;
2060
- Datum values [7 ];
2061
- bool nulls [7 ];
2090
+ Datum values [8 ];
2091
+ bool nulls [8 ];
2062
2092
} MtmGetNodeStateCtx ;
2063
2093
2064
2094
Datum
@@ -2095,11 +2125,12 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2095
2125
usrfctx -> values [4 ] = Int64GetDatum (lag );
2096
2126
usrfctx -> nulls [4 ] = lag < 0 ;
2097
2127
usrfctx -> values [5 ] = Int64GetDatum (Mtm -> transCount ? Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount : 0 );
2128
+ usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2098
2129
p = strchr (usrfctx -> connStrPtr , ',' );
2099
2130
if (p != NULL ) {
2100
2131
* p ++ = '\0' ;
2101
2132
}
2102
- usrfctx -> values [6 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
2133
+ usrfctx -> values [7 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
2103
2134
usrfctx -> connStrPtr = p ;
2104
2135
usrfctx -> nodeId += 1 ;
2105
2136
0 commit comments