@@ -695,6 +695,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
695
695
ts -> votingCompleted = false;
696
696
ts -> cmd = MSG_INVALID ;
697
697
ts -> nSubxids = xactGetCommittedChildren (& subxids );
698
+ Mtm -> nActiveTransactions += 1 ;
698
699
699
700
x -> isPrepared = true;
700
701
x -> csn = ts -> csn ;
@@ -794,6 +795,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
794
795
ts -> status = TRANSACTION_STATUS_ABORTED ;
795
796
}
796
797
MtmAdjustSubtransactions (ts );
798
+ Assert (Mtm -> nActiveTransactions != 0 );
799
+ Mtm -> nActiveTransactions -= 1 ;
797
800
}
798
801
if (!commit && x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
799
802
/*
@@ -835,6 +838,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
835
838
}
836
839
}
837
840
841
+ void MtmRecoveryCompleted (void )
842
+ {
843
+ elog (WARNING , "Recevoery of node %d is completed" , MtmNodeId );
844
+ Mtm -> recoverySlot = 0 ;
845
+ MtmSwitchClusterMode (MTM_ONLINE );
846
+ }
847
+
838
848
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
839
849
{
840
850
MtmLock (LW_EXCLUSIVE );
@@ -846,8 +856,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
846
856
Assert (Mtm -> status == MTM_RECOVERY );
847
857
} else if (Mtm -> status == MTM_RECOVERY ) {
848
858
/* When recovery is completed we get normal transaction ID and switch to normal mode */
849
- Mtm -> recoverySlot = 0 ;
850
- MtmSwitchClusterMode (MTM_ONLINE );
859
+ MtmRecoveryCompleted ();
851
860
}
852
861
MtmTx .gtid = * gtid ;
853
862
MtmTx .xid = GetCurrentTransactionId ();
@@ -972,35 +981,52 @@ static int64 MtmGetSlotLag(int nodeId)
972
981
*/
973
982
bool MtmIsRecoveredNode (int nodeId )
974
983
{
975
- return BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) ;
984
+ return BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 );
976
985
}
977
986
978
987
979
- void MtmRecoveryPorgress ( XLogRecPtr lsn )
988
+ bool MtmRecoveryCaughtUp ( int nodeId , XLogRecPtr slotLSN )
980
989
{
981
-
982
- Assert (MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
983
- if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
984
- && MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
990
+ bool caughtUp = false;
991
+ if (MtmIsRecoveredNode (nodeId )) {
992
+ XLogRecPtr walLSN = GetXLogInsertRecPtr ();
993
+ MtmLock (LW_EXCLUSIVE );
994
+ if (slotLSN == walLSN ) {
995
+ if (BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )) {
996
+ elog (WARNING ,"Node %d is caught-up" , nodeId );
997
+ BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
998
+ BIT_CLEAR (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
999
+ BIT_CLEAR (Mtm -> nodeLockerMask , nodeId - 1 );
1000
+ Mtm -> nLockers -= 1 ;
1001
+ } else {
1002
+ elog (WARNING ,"Node %d is caugth-up without locking cluster" , nodeId );
1003
+ /* We are lucky: caugth-up without locking cluster! */
1004
+ Mtm -> nNodes += 1 ;
1005
+ BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1006
+ }
1007
+ caughtUp = true;
1008
+ } else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
1009
+ && slotLSN + MtmMinRecoveryLag > walLSN )
985
1010
{
986
1011
/*
987
1012
* Wal sender almost catched up.
988
1013
* Lock cluster preventing new transaction to start until wal is completely replayed.
989
1014
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
990
1015
* Is there some better way to establish mapping between nodes ad WAL-seconder?
991
1016
*/
992
- elog (WARNING ,"Node %d is catching up " , nodeId );
993
- MtmLock ( LW_EXCLUSIVE );
1017
+ elog (WARNING ,"Node %d is almost caught-up: lock cluster " , nodeId );
1018
+ Assert ( MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
994
1019
BIT_SET (Mtm -> nodeLockerMask , nodeId - 1 );
995
1020
BIT_SET (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
996
1021
Mtm -> nLockers += 1 ;
997
- MtmUnlock ();
998
1022
} else {
999
- MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n" , nodeId , MyWalSnd -> sentPtr , GetXLogInsertRecPtr () , Mtm -> nLockers );
1023
+ MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d \n" , nodeId , slotLSN , walLSN , MyWalSnd -> sentPtr , Mtm -> nLockers , Mtm -> nActiveTransactions );
1000
1024
}
1001
- return true;
1025
+ MtmUnlock ();
1026
+ } else {
1027
+ MTM_INFO ("Node %d is not in recovery mode\n" , nodeId );
1002
1028
}
1003
- return false ;
1029
+ return caughtUp ;
1004
1030
}
1005
1031
1006
1032
void MtmSwitchClusterMode (MtmNodeStatus mode )
@@ -1019,22 +1045,24 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
1019
1045
static void
1020
1046
MtmCheckClusterLock ()
1021
1047
{
1048
+ timestamp_t delay = MIN_WAIT_TIMEOUT ;
1022
1049
while (true)
1023
1050
{
1024
1051
nodemask_t mask = Mtm -> walSenderLockerMask ;
1025
1052
if (mask != 0 ) {
1026
- XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1027
- int i ;
1028
- timestamp_t delay = MIN_WAIT_TIMEOUT ;
1029
- for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1030
- if (mask & 1 ) {
1031
- if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1032
- /* recovery is in progress */
1033
- break ;
1034
- } else {
1035
- /* recovered replica catched up with master */
1036
- elog (WARNING , "WAL-sender %d complete recovery" , i );
1037
- BIT_CLEAR (Mtm -> walSenderLockerMask , i );
1053
+ if (Mtm -> nActiveTransactions == 0 ) {
1054
+ XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1055
+ int i ;
1056
+ for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1057
+ if (mask & 1 ) {
1058
+ if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1059
+ /* recovery is in progress */
1060
+ break ;
1061
+ } else {
1062
+ /* recovered replica catched up with master */
1063
+ elog (WARNING , "WAL-sender %d complete recovery" , i );
1064
+ BIT_CLEAR (Mtm -> walSenderLockerMask , i );
1065
+ }
1038
1066
}
1039
1067
}
1040
1068
}
@@ -1294,6 +1322,7 @@ static void MtmInitialize()
1294
1322
Mtm -> walSenderLockerMask = 0 ;
1295
1323
Mtm -> nodeLockerMask = 0 ;
1296
1324
Mtm -> nLockers = 0 ;
1325
+ Mtm -> nActiveTransactions = 0 ;
1297
1326
Mtm -> votingTransactions = NULL ;
1298
1327
Mtm -> transListHead = NULL ;
1299
1328
Mtm -> transListTail = & Mtm -> transListHead ;
@@ -1734,12 +1763,31 @@ void MtmDropNode(int nodeId, bool dropSlot)
1734
1763
static void
1735
1764
MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
1736
1765
{
1766
+ ListCell * param ;
1767
+ bool isRecoverySession = false;
1768
+ foreach (param , args -> in_params )
1769
+ {
1770
+ DefElem * elem = lfirst (param );
1771
+ if (strcmp ("mtm_replication_mode" , elem -> defname ) == 0 ) {
1772
+ isRecoverySession = elem -> arg != NULL && strVal (elem -> arg ) != NULL && strcmp (strVal (elem -> arg ), "recovery" ) == 0 ;
1773
+ break ;
1774
+ }
1775
+ }
1737
1776
MtmLock (LW_EXCLUSIVE );
1738
- if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1739
- elog (WARNING , "Recovery of node %d is completed: start normal replication" , MtmReplicationNodeId );
1777
+ if (isRecoverySession ) {
1778
+ elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
1779
+ if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1780
+ BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1781
+ Mtm -> nNodes -= 1 ;
1782
+ MtmCheckQuorum ();
1783
+ }
1784
+ } else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1785
+ elog (WARNING , "Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
1740
1786
BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1741
1787
Mtm -> nNodes += 1 ;
1742
1788
MtmCheckQuorum ();
1789
+ } else {
1790
+ elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
1743
1791
}
1744
1792
MtmUnlock ();
1745
1793
}
@@ -1757,7 +1805,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1757
1805
bool res = Mtm -> status != MTM_RECOVERY
1758
1806
&& (args -> origin_id == InvalidRepOriginId
1759
1807
|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1760
- MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1808
+ MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1761
1809
return res ;
1762
1810
}
1763
1811
0 commit comments