@@ -173,8 +173,9 @@ static bool MtmIsRecoverySession;
173
173
static MtmConnectionInfo * MtmConnections ;
174
174
175
175
static MtmCurrentTrans MtmTx ;
176
-
176
+ static TimeoutId MtmRefreshClusterStatusTimer ;
177
177
static dlist_head MtmLsnMapping = DLIST_STATIC_INIT (MtmLsnMapping );
178
+ static bool MtmRefreshClusterStatusTimerCocked ;
178
179
179
180
static TransactionManager MtmTM = {
180
181
PgTransactionIdGetStatus ,
@@ -1656,7 +1657,7 @@ void MtmHandleApplyError(void)
1656
1657
* The reason is that we want to avoid extra polling to obtain maximum CSN from all nodes to assign it to committed transaction.
1657
1658
* Called only from MtmDisableNode in critical section.
1658
1659
*/
1659
- static void MtmPollStatusOfPreparedTransactions (int disabledNodeId )
1660
+ static void MtmPollStatusOfPreparedTransactionsForDisabledNode (int disabledNodeId )
1660
1661
{
1661
1662
MtmTransState * ts ;
1662
1663
for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
@@ -1680,6 +1681,34 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
1680
1681
}
1681
1682
}
1682
1683
1684
+ /*
1685
+ * Poll status of all active prepared transactions.
1686
+ * This function is called before start of recovery to prevent blocking of recovery process by some
1687
+ * prepared transaction which is not recovered
1688
+ */
1689
+ static void MtmPollStatusOfPreparedTransactions ()
1690
+ {
1691
+ MtmTransState * ts ;
1692
+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
1693
+ if (TransactionIdIsValid (ts -> gtid .xid )
1694
+ && ts -> votingCompleted /* If voting is not yet completed, then there is some backend coordinating this transaction */
1695
+ && (ts -> status == TRANSACTION_STATUS_UNKNOWN || ts -> status == TRANSACTION_STATUS_IN_PROGRESS ))
1696
+ {
1697
+ Assert (ts -> gid [0 ]);
1698
+ MTM_LOG1 ("Poll state of transaction %d (%s) from node %d" , ts -> xid , ts -> gid , ts -> gtid .node );
1699
+ MtmBroadcastPollMessage (ts );
1700
+ } else {
1701
+ MTM_LOG2 ("Skip prepared transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx" ,
1702
+ ts -> xid , ts -> gid , MtmTxnStatusMnem [ts -> status ], ts -> gtid .node , ts -> gtid .xid , ts -> votedMask );
1703
+ }
1704
+ }
1705
+ }
1706
+
1707
+ /*
1708
+ * Node is disabled if it is not part of clique built using connectivity masks of all nodes.
1709
+ * There is no guarantee that all nodes will make the same decision about the clique, but since we want to avoid
1710
+ * a global coordinator (which would be a SPOF), we have to rely on the Bron–Kerbosch algorithm, which locates the maximum clique in the graph
1711
+ */
1683
1712
static void MtmDisableNode (int nodeId )
1684
1713
{
1685
1714
timestamp_t now = MtmGetSystemTime ();
@@ -1694,10 +1723,14 @@ static void MtmDisableNode(int nodeId)
1694
1723
}
1695
1724
if (Mtm -> nLiveNodes >= Mtm -> nAllNodes /2 + 1 ) {
1696
1725
/* Make decision about prepared transaction status only in quorum */
1697
- MtmPollStatusOfPreparedTransactions (nodeId );
1726
+ MtmPollStatusOfPreparedTransactionsForDisabledNode (nodeId );
1698
1727
}
1699
1728
}
1700
-
1729
+
1730
+ /*
1731
+ * Node is enabled when its recovery is completed.
1732
+ * This is why a node is mostly marked as recovered when the logical sender/receiver to this node is (re)started.
1733
+ */
1701
1734
static void MtmEnableNode (int nodeId )
1702
1735
{
1703
1736
BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
@@ -1763,6 +1796,9 @@ MtmCheckSlots()
1763
1796
}
1764
1797
}
1765
1798
1799
+ /*
1800
+ * Get lag between replication slot position (data processed by WAL sender) and current position in WAL
1801
+ */
1766
1802
static int64 MtmGetSlotLag (int nodeId )
1767
1803
{
1768
1804
int i ;
@@ -1849,6 +1885,9 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1849
1885
return caughtUp ;
1850
1886
}
1851
1887
1888
+ /*
1889
+ * This function is called inside critical section
1890
+ */
1852
1891
void MtmSwitchClusterMode (MtmNodeStatus mode )
1853
1892
{
1854
1893
Mtm -> status = mode ;
@@ -1918,7 +1957,7 @@ MtmCheckClusterLock()
1918
1957
* Build internode connectivity mask. 1 - means that node is disconnected.
1919
1958
*/
1920
1959
static bool
1921
- MtmBuildConnectivityMatrix (nodemask_t * matrix , bool nowait )
1960
+ MtmBuildConnectivityMatrix (nodemask_t * matrix )
1922
1961
{
1923
1962
int i , j , n = Mtm -> nAllNodes ;
1924
1963
bool changed = false;
@@ -1969,24 +2008,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1969
2008
1970
2009
/**
1971
2010
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
1972
- * This function returns false if current node is excluded from cluster, true otherwise
1973
2011
*/
1974
- bool MtmRefreshClusterStatus (bool nowait )
2012
+ void MtmRefreshClusterStatus ()
1975
2013
{
1976
2014
nodemask_t mask , clique , disabled ;
1977
2015
nodemask_t matrix [MAX_NODES ];
1978
2016
int clique_size ;
1979
2017
int i ;
1980
2018
1981
- if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1982
- return false;
2019
+ MtmRefreshClusterStatusTimerCocked = false;
2020
+
2021
+ if (!MtmBuildConnectivityMatrix (matrix )) {
2022
+ return ;
1983
2023
}
1984
2024
1985
2025
clique = MtmFindMaxClique (matrix , Mtm -> nAllNodes , & clique_size );
1986
2026
1987
2027
if ( clique == (~Mtm -> disabledNodeMask & (((nodemask_t )1 << Mtm -> nAllNodes )- 1 )) )
1988
2028
/* Nothing is changed */
1989
- return false ;
2029
+ return ;
1990
2030
1991
2031
if (clique_size >= Mtm -> nAllNodes /2 + 1 ) { /* have quorum */
1992
2032
fprintf (stderr , "Old mask: " );
@@ -2043,9 +2083,11 @@ bool MtmRefreshClusterStatus(bool nowait)
2043
2083
MTM_LOG1 ("Clique %lx has no quorum" , (long ) clique );
2044
2084
MtmSwitchClusterMode (MTM_IN_MINORITY );
2045
2085
}
2046
- return true;
2047
2086
}
2048
2087
2088
+ /*
2089
+ * Check if there is quorum: current node sees more than half of all nodes
2090
+ */
2049
2091
void MtmCheckQuorum (void )
2050
2092
{
2051
2093
Mtm -> nConfigChanges += 1 ;
@@ -2062,6 +2104,13 @@ void MtmCheckQuorum(void)
2062
2104
}
2063
2105
}
2064
2106
2107
+ /*
2108
+ * This function is called in case of non-recoverable connection failure with this node.
2109
+ * Non-recoverable means that the connection cannot be reestablished within the specified number of attempts.
2110
+ * It sets the bit in the connectivity mask and registers a delayed refresh of cluster status, which builds the connectivity matrix
2111
+ * and determines the clique of connected nodes. The timeout here is needed to allow all nodes to exchange their connectivity masks (they
2112
+ * are sent together with any arbiter message, including heartbeats).
2113
+ */
2065
2114
void MtmOnNodeDisconnect (int nodeId )
2066
2115
{
2067
2116
if (BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 ))
@@ -2078,12 +2127,16 @@ void MtmOnNodeDisconnect(int nodeId)
2078
2127
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
2079
2128
BIT_SET (Mtm -> reconnectMask , nodeId - 1 );
2080
2129
elog (LOG , "Disconnect node %d connectivity mask %llx" , nodeId , (long long ) Mtm -> connectivityMask );
2130
+ if (!MtmRefreshClusterStatusTimerCocked ) {
2131
+ MtmRefreshClusterStatusTimerCocked = true;
2132
+ enable_timeout_after (MtmRefreshClusterStatusTimer , MtmHeartbeatSendTimeout );
2133
+ }
2081
2134
MtmUnlock ();
2082
-
2083
- MtmSleep (MSEC_TO_USEC (MtmHeartbeatSendTimeout ));
2084
- MtmRefreshClusterStatus (false);
2085
2135
}
2086
2136
2137
+ /*
2138
+ * This method is called when connection with node is reestablished
2139
+ */
2087
2140
void MtmOnNodeConnect (int nodeId )
2088
2141
{
2089
2142
MtmLock (LW_EXCLUSIVE );
@@ -2093,6 +2146,9 @@ void MtmOnNodeConnect(int nodeId)
2093
2146
MtmUnlock ();
2094
2147
}
2095
2148
2149
+ /*
2150
+ * Set reconnect mask to force reconnection attempt to the node
2151
+ */
2096
2152
void MtmReconnectNode (int nodeId )
2097
2153
{
2098
2154
MtmLock (LW_EXCLUSIVE );
@@ -2108,7 +2164,11 @@ void MtmReconnectNode(int nodeId)
2108
2164
* -------------------------------------------
2109
2165
*/
2110
2166
2111
-
2167
+ /*
2168
+ * Initialize Xid2State hash table to obtain status of transaction by its local XID.
2169
+ * Size of this hash table should be limited by MtmAdjustOldestXid function which performs cleanup
2170
+ * of the transaction list, removing from the list and from the hash table transactions whose XIDs are not used in any snapshot at any node
2171
+ */
2112
2172
static HTAB *
2113
2173
MtmCreateXidMap (void )
2114
2174
{
@@ -2127,6 +2187,11 @@ MtmCreateXidMap(void)
2127
2187
return htab ;
2128
2188
}
2129
2189
2190
+ /*
2191
+ * Initialize Gid2State hash table to obtain status of transaction by GID.
2192
+ * Size of this hash table should be limited by MtmAdjustOldestXid function which performs cleanup
2193
+ * of the transaction list, removing from the list and from the hash table transactions whose XIDs are not used in any snapshot at any node
2194
+ */
2130
2195
static HTAB *
2131
2196
MtmCreateGidMap (void )
2132
2197
{
@@ -2144,6 +2209,9 @@ MtmCreateGidMap(void)
2144
2209
return htab ;
2145
2210
}
2146
2211
2212
+ /*
2213
+ * Initialize hash table used to mark local (not distributed) tables
2214
+ */
2147
2215
static HTAB *
2148
2216
MtmCreateLocalTableMap (void )
2149
2217
{
@@ -2208,6 +2276,13 @@ static void MtmLoadLocalTables(void)
2208
2276
}
2209
2277
}
2210
2278
2279
+ /*
2280
+ * Multimaster control file is used to prevent erroneous inclusion of node in the cluster.
2281
+ * It contains cluster name (any user defined identifier) and node id.
2282
+ * In case of creating new cluster node using pg_basebackup this file is copied together with
2283
+ * all other PostgreSQL files and so new node will know ID of the cluster node from which it
2284
+ * is cloned. It is necessary to complete synchronization of new node with the rest of the cluster.
2285
+ */
2211
2286
static void MtmCheckControlFile (void )
2212
2287
{
2213
2288
char controlFilePath [MAXPGPATH ];
@@ -2242,7 +2317,10 @@ static void MtmCheckControlFile(void)
2242
2317
}
2243
2318
}
2244
2319
2245
-
2320
+ /*
2321
+ * Perform initialization of multimaster state.
2322
+ * This function is called from shared memory startup hook (after completion of initialization of shared memory)
2323
+ */
2246
2324
static void MtmInitialize ()
2247
2325
{
2248
2326
bool found ;
@@ -2311,6 +2389,7 @@ static void MtmInitialize()
2311
2389
RegisterXactCallback (MtmXactCallback , NULL );
2312
2390
MtmTx .snapshot = INVALID_CSN ;
2313
2391
MtmTx .xid = InvalidTransactionId ;
2392
+ MtmRefreshClusterStatusTimer = RegisterTimeout (USER_TIMEOUT , MtmRefreshClusterStatus );
2314
2393
}
2315
2394
MtmXid2State = MtmCreateXidMap ();
2316
2395
MtmGid2State = MtmCreateGidMap ();
@@ -2331,6 +2410,10 @@ MtmShmemStartup(void)
2331
2410
MtmInitialize ();
2332
2411
}
2333
2412
2413
+ /*
2414
+ * Parse node connection string.
2415
+ * This function is called at cluster startup and while adding new cluster node
2416
+ */
2334
2417
void MtmUpdateNodeConnectionInfo (MtmConnectionInfo * conn , char const * connStr )
2335
2418
{
2336
2419
char const * host ;
@@ -2371,6 +2454,10 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
2371
2454
elog (WARNING , "Using arbiter port: %d" , conn -> arbiterPort );
2372
2455
}
2373
2456
2457
+ /*
2458
+ * Parse "multimaster.conn_strings" configuration parameter and
2459
+ * set connection string for each node using MtmUpdateNodeConnectionInfo
2460
+ */
2374
2461
static void MtmSplitConnStrs (void )
2375
2462
{
2376
2463
int i ;
@@ -2494,6 +2581,9 @@ static void MtmSplitConnStrs(void)
2494
2581
MemoryContextSwitchTo (old_context );
2495
2582
}
2496
2583
2584
+ /*
2585
+ * Check correctness of multimaster configuration
2586
+ */
2497
2587
static bool ConfigIsSane (void )
2498
2588
{
2499
2589
bool ok = true;
@@ -2991,13 +3081,22 @@ void MtmReceiverStarted(int nodeId)
2991
3081
MtmUnlock ();
2992
3082
}
2993
3083
3084
+ /*
3085
+ * Recovery slot is the ID of the node from which a new or crashed node is performing recovery.
3086
+ * This function is called in case of logical receiver error to make it possible to try to perform
3087
+ * recovery from some other node
3088
+ */
2994
3089
void MtmReleaseRecoverySlot (int nodeId )
2995
3090
{
2996
3091
if (Mtm -> recoverySlot == nodeId ) {
2997
3092
Mtm -> recoverySlot = 0 ;
2998
3093
}
2999
3094
}
3000
3095
3096
+ /*
3097
+ * Rollback transaction originated from the specified node.
3098
+ * This function is called either for a commit logical message with the AbortPrepared flag or for an abort prepared logical message.
3099
+ */
3001
3100
void MtmRollbackPreparedTransaction (int nodeId , char const * gid )
3002
3101
{
3003
3102
XidStatus status = MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_ABORTED );
@@ -3021,8 +3120,10 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
3021
3120
}
3022
3121
3023
3122
/*
3024
- * This function is call with MTM mutex locked.
3025
- * It shoudl unlock mutex before calling FinishPreparedTransaction to avoid deadlocks.
3123
+ * Wrapper around FinishPreparedTransaction function.
3124
+ * This function should set up the proper context for invocation of FinishPreparedTransaction.
3125
+ * This function is called with MTM mutex locked.
3126
+ * It should unlock mutex before calling FinishPreparedTransaction to avoid deadlocks.
3026
3127
* ts object is pinned to prevent deallocation while lock is released.
3027
3128
*/
3028
3129
void MtmFinishPreparedTransaction (MtmTransState * ts , bool commit )
@@ -3095,6 +3196,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
3095
3196
Mtm -> recoveryCount += 1 ;
3096
3197
Mtm -> pglogicalReceiverMask = 0 ;
3097
3198
Mtm -> pglogicalSenderMask = 0 ;
3199
+ MtmPollStatusOfPreparedTransactions ();
3098
3200
MtmUnlock ();
3099
3201
return REPLMODE_RECOVERY ;
3100
3202
}
0 commit comments