@@ -243,7 +243,6 @@ static int MtmMaxRecoveryLag;
static int  MtmGcPeriod;
static bool MtmIgnoreTablesWithoutPk;
static int  MtmLockCount;
- static int  MtmSenderStarted;

static ExecutorStart_hook_type PreviousExecutorStartHook;
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -261,7 +260,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
/*
 * -------------------------------------------
 * Synchronize access to MTM structures.
- * Using LWLock seems to be more efficient (at our benchmarks)
+ * Using LWLock seems to be more efficient (in our benchmarks).
+ * Multimaster uses an array of 2N+1 LWLocks, where N is the number of nodes:
+ * locks[0] is used to synchronize access to the multimaster state,
+ * locks[1..N] provide exclusive access to the replication session of each node,
+ * locks[N+1..2*N] synchronize access to the distributed lock graph at each node.
 * -------------------------------------------
 */
void MtmLock(LWLockMode mode)
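For orientation, a minimal sketch of how the 2N+1 lock layout described in the new comment could be indexed; the helper names below are illustrative and not part of this patch:

    /* Hypothetical index helpers for a 2N+1 LWLock layout:
     *   lock 0        - global multimaster state
     *   locks 1..N    - per-node replication session
     *   locks N+1..2N - per-node distributed lock (deadlock) graph
     */
    static int MtmStateLockId(void)                      { return 0; }
    static int MtmSessionLockId(int nodeId)              { return nodeId; }     /* nodeId is 1-based */
    static int MtmDeadlockGraphLockId(int nodeId, int n) { return n + nodeId; } /* n = number of nodes */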
@@ -316,6 +319,9 @@ timestamp_t MtmGetSystemTime(void)
    return (timestamp_t)tv.tv_sec * USECS_PER_SEC + tv.tv_usec;
}

+ /*
+ * Get adjusted system time, taking the time shift into account
+ */
timestamp_t MtmGetCurrentTime(void)
{
    return MtmGetSystemTime() + Mtm->timeShift;
@@ -610,13 +616,16 @@ MtmAdjustOldestXid(TransactionId xid)
    }
    return xid;
}
+
/*
 * -------------------------------------------
- * Transaction list manipulation
+ * Transaction list manipulation.
+ * All distributed transactions are linked into an L1 (singly-linked) list ordered by transaction start time.
+ * This list is inspected by MtmAdjustOldestXid, and transactions that are no longer used in any snapshot at any node
+ * are removed from the list and from the hash.
 * -------------------------------------------
 */

-
static void MtmTransactionListAppend(MtmTransState* ts)
{
    if (!ts->isEnqueued) {
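The new comment describes a singly-linked list of distributed transactions ordered by start time, pruned by MtmAdjustOldestXid; a compact sketch of such an append-only list with a tail pointer (the structure names are illustrative, not the actual MtmTransState layout):

    typedef struct TxNode
    {
        struct TxNode *next;       /* single link: the "L1 list" */
        bool           isEnqueued; /* guards against double insertion */
        /* ... transaction state (xid, csn, ...) ... */
    } TxNode;

    typedef struct TxList
    {
        TxNode *head;              /* oldest transaction (smallest start time) */
        TxNode *tail;              /* most recently started transaction */
    } TxList;

    static void TxListAppend(TxList *list, TxNode *ts)
    {
        if (ts->isEnqueued)
            return;                /* already on the list */
        ts->isEnqueued = true;
        ts->next = NULL;
        if (list->tail)
            list->tail->next = ts;
        else
            list->head = ts;
        list->tail = ts;           /* appending keeps the list ordered by start time */
    }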
@@ -1293,6 +1302,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
    }
}

+ /*
+ * Send an arbiter message
+ */
void MtmSendMessage(MtmArbiterMessage* msg)
{
    SpinLockAcquire(&Mtm->queueSpinlock);
@@ -1315,6 +1327,11 @@ void MtmSendMessage(MtmArbiterMessage* msg)
    SpinLockRelease(&Mtm->queueSpinlock);
}

+ /*
+ * Send an arbiter 2PC message. Right now only responses to the coordinator are
+ * sent through the arbiter; broadcasts from the coordinator to the nodes are done
+ * using logical decoding.
+ */
void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
{
    MtmArbiterMessage msg;
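MtmSendMessage above evidently appends the message to a queue protected by Mtm->queueSpinlock before the arbiter transmits it; a minimal sketch of that enqueue pattern using PostgreSQL spinlocks (the queue layout shown is illustrative, not the actual Mtm structure):

    #include "postgres.h"
    #include "storage/spin.h"

    typedef struct Msg
    {
        struct Msg *next;
        /* ... message payload ... */
    } Msg;

    typedef struct MsgQueue
    {
        slock_t lock;              /* protects head/tail only; keep the critical section short */
        Msg    *head;
        Msg    *tail;
    } MsgQueue;

    static void EnqueueMessage(MsgQueue *q, Msg *msg)
    {
        SpinLockAcquire(&q->lock);
        msg->next = NULL;
        if (q->tail)
            q->tail->next = msg;
        else
            q->head = msg;
        q->tail = msg;
        SpinLockRelease(&q->lock);
    }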
@@ -1347,6 +1364,11 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
    }
}

+ /*
+ * Broadcast a poll-state message to all nodes.
+ * This function is used to gather information about the state of a prepared transaction
+ * at node startup or after a crash of some node.
+ */
static void MtmBroadcastPollMessage(MtmTransState* ts)
{
    int i;
@@ -1370,7 +1392,9 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
}

/*
- * Restore state of recovered prepared transaction in memory
+ * Restore the state of recovered prepared transactions in memory.
+ * This function is called at system startup to make it possible to
+ * handle these prepared transactions in the normal way.
 */
static void MtmLoadPreparedTransactions(void)
{
@@ -1426,6 +1450,10 @@ static void MtmStartRecovery()
    MtmUnlock();
}

+
+ /*
+ * Prepare the context for applying a transaction at a replica
+ */
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
{
    MtmTx.gtid = *gtid;
@@ -1479,6 +1507,13 @@ XidStatus MtmGetCurrentTransactionStatus(void)
    return MtmTx.status;
}

+ /*
+ * Perform an atomic exchange of the global transaction status.
+ * The problem is that, because transactions are applied concurrently at a replica by multiple
+ * workers, we can process an ABORT request before the PREPARE, i.e. when the transaction is not yet
+ * applied at this node and there is no MtmTransState associated with it.
+ * We remember the status of such a transaction in MtmTransMap.
+ */
XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus new_status)
{
    MtmTransMap* tm;
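A sketch of the status exchange described in the new comment, using PostgreSQL's dynahash API; the entry layout (GID key plus status field) is illustrative, and atomicity is assumed to come from holding the exclusive multimaster lock rather than from CPU atomics:

    #include "postgres.h"
    #include "access/clog.h"      /* XidStatus, TRANSACTION_STATUS_* */
    #include "access/xact.h"      /* GIDSIZE */
    #include "utils/hsearch.h"    /* HTAB, hash_search */

    typedef struct GidStatusEntry
    {
        char      gid[GIDSIZE];   /* hash key: global transaction id */
        XidStatus status;         /* last known status of the transaction */
    } GidStatusEntry;

    static XidStatus
    ExchangeGidStatus(HTAB *map, const char *gid, XidStatus new_status)
    {
        bool            found;
        GidStatusEntry *entry;
        XidStatus       old_status = TRANSACTION_STATUS_IN_PROGRESS;

        /* caller is assumed to hold the exclusive multimaster lock */
        entry = (GidStatusEntry *) hash_search(map, gid, HASH_ENTER, &found);
        if (found)
            old_status = entry->status;
        entry->status = new_status;   /* remembered even if PREPARE has not arrived yet */
        return old_status;
    }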
@@ -1526,6 +1561,9 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
    return csn;
}

+ /*
+ * Wake up the coordinator's backend when voting is completed
+ */
void MtmWakeUpBackend(MtmTransState* ts)
{
    if (!ts->votingCompleted) {
@@ -1536,6 +1574,10 @@ void MtmWakeUpBackend(MtmTransState* ts)
    }
}

+
+ /*
+ * Abort the transaction if it is not yet aborted
+ */
void MtmAbortTransaction(MtmTransState* ts)
{
    Assert(MtmLockCount != 0); /* should be invoked with exclsuive lock */
@@ -1561,6 +1603,11 @@ void MtmAbortTransaction(MtmTransState* ts)
 * -------------------------------------------
 */

+ /*
+ * Handle critical errors while applying a transaction at a replica.
+ * Such errors should cause shutdown of this cluster node so that other nodes can continue serving client requests.
+ * Other errors are just reported and ignored.
+ */
void MtmHandleApplyError(void)
{
    ErrorData* edata = CopyErrorData();
@@ -1570,13 +1617,15 @@ void MtmHandleApplyError(void)
      case ERRCODE_IO_ERROR:
      case ERRCODE_DATA_CORRUPTED:
      case ERRCODE_INDEX_CORRUPTED:
+     /* Should we really treat these errors as fatal?
      case ERRCODE_SYSTEM_ERROR:
      case ERRCODE_INTERNAL_ERROR:
      case ERRCODE_OUT_OF_MEMORY:
+     */
        elog(WARNING, "Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u",
             edata->sqlerrcode, edata->message, getpid());
-       // MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
-       // kill(PostmasterPid, SIGQUIT);
+       MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
+       kill(PostmasterPid, SIGQUIT);
        break;
    }
    FreeErrorData(edata);
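Now that non-recoverable apply errors actually take the node out of service and stop the postmaster, the handler is presumably invoked from the receiver's error path; a sketch of how such a handler is typically wired in with PG_TRY/PG_CATCH (ApplyOneTransaction is a hypothetical stand-in for the real apply routine):

    #include "postgres.h"

    extern void MtmHandleApplyError(void);
    extern void ApplyOneTransaction(void);      /* hypothetical apply routine */

    static void
    ApplyWithErrorHandling(void)
    {
        MemoryContext oldcontext = CurrentMemoryContext;

        PG_TRY();
        {
            ApplyOneTransaction();
        }
        PG_CATCH();
        {
            /* restore the caller's context before CopyErrorData() runs inside the handler */
            MemoryContextSwitchTo(oldcontext);
            MtmHandleApplyError();   /* may switch the node to MTM_OUT_OF_SERVICE and SIGQUIT the postmaster */
            PG_RE_THROW();
        }
        PG_END_TRY();
    }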
@@ -1643,6 +1692,9 @@ static void MtmEnableNode(int nodeId)
    elog(WARNING, "Enable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
}

+ /*
+ * Function called when recovery of a node is completed
+ */
void MtmRecoveryCompleted(void)
{
    int i;
@@ -1712,7 +1764,7 @@ static int64 MtmGetSlotLag(int nodeId)

/*
 * This function is called by WAL sender when start sending new transaction.
- * It returns true if specified node is in recovery mode. In this case we should send all transactions from WAL,
+ * It returns true if the specified node is in recovery mode. In this case we should send it all transactions from the WAL,
 * not only coordinated by self node as in normal mode.
 */
bool MtmIsRecoveredNode(int nodeId)
@@ -1728,7 +1780,13 @@ bool MtmIsRecoveredNode(int nodeId)
        }
    }
}
-
+ /*
+ * Check whether the WAL sender has replayed all transactions from the WAL.
+ * This may never happen if there are many active transactions.
+ * In this case we wait until the gap between the sent and the current position in the
+ * WAL becomes smaller than the threshold MtmMinRecoveryLag, and
+ * then prohibit the start of new transactions until the WAL is completely replayed.
+ */
bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
{
    bool caughtUp = false;
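A rough sketch of the lag check the comment describes: compare the slot's confirmed position with the current WAL insert position against the MtmMinRecoveryLag threshold. The helper and its exact decision logic are illustrative; only the GUC name and GetXLogInsertRecPtr() come from the file and core (the variable is assumed visible here for illustration):

    #include "postgres.h"
    #include "access/xlog.h"          /* XLogRecPtr, GetXLogInsertRecPtr */

    extern int MtmMinRecoveryLag;     /* allowed WAL gap in bytes */

    /* Illustrative check: is the sender for this slot close enough to lock out new work? */
    static bool
    SlotNearlyCaughtUp(XLogRecPtr slotLSN)
    {
        XLogRecPtr current = GetXLogInsertRecPtr();

        return current >= slotLSN &&
               current - slotLSN <= (XLogRecPtr) MtmMinRecoveryLag;
    }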
@@ -1822,7 +1880,7 @@ MtmCheckClusterLock()
            MtmLock(LW_EXCLUSIVE);
            continue;
        } else {
-           /* All lockers are synchronized their logs */
+           /* All lockers have synchronized their logs */
            /* Remove lock and mark them as recovered */
            MTM_LOG1("Complete recovery of %d nodes (node mask %lx)", Mtm->nLockers, (long) Mtm->nodeLockerMask);
            Assert(Mtm->walSenderLockerMask == 0);
@@ -2186,7 +2244,8 @@ static void MtmInitialize()
        Mtm->nAllNodes = MtmNodes;
        Mtm->disabledNodeMask = 0;
        Mtm->connectivityMask = 0;
-       Mtm->pglogicalNodeMask = 0;
+       Mtm->pglogicalReceiverMask = 0;
+       Mtm->pglogicalSenderMask = 0;
        Mtm->walSenderLockerMask = 0;
        Mtm->nodeLockerMask = 0;
        Mtm->reconnectMask = 0;
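The patch replaces the single pglogicalNodeMask with separate receiver and sender masks, manipulated through the BIT_CHECK/BIT_SET/BIT_CLEAR macros used throughout the diff; a minimal sketch of how such per-node bitmask macros are conventionally defined (illustrative, not copied from the multimaster header):

    typedef unsigned long long nodemask_t;

    #define BIT_CHECK(mask, bit)  (((mask) & ((nodemask_t)1 << (bit))) != 0)
    #define BIT_SET(mask, bit)    ((mask) |= ((nodemask_t)1 << (bit)))
    #define BIT_CLEAR(mask, bit)  ((mask) &= ~((nodemask_t)1 << (bit)))

    /* Node ids are 1-based, so bit (nodeId - 1) tracks node nodeId, for example:
     *   BIT_SET(Mtm->pglogicalReceiverMask, nodeId - 1);
     */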
@@ -2900,8 +2959,8 @@ _PG_fini(void)
void MtmReceiverStarted(int nodeId)
{
    MtmLock(LW_EXCLUSIVE);
-   if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
-       BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
+   if (!BIT_CHECK(Mtm->pglogicalReceiverMask, nodeId-1)) {
+       BIT_SET(Mtm->pglogicalReceiverMask, nodeId-1);
        if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
            MtmEnableNode(nodeId);
            MtmCheckQuorum();
@@ -3014,7 +3073,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
            Mtm->nReceivers = 0;
            Mtm->nSenders = 0;
            Mtm->recoveryCount += 1;
-           Mtm->pglogicalNodeMask = 0;
+           Mtm->pglogicalReceiverMask = 0;
+           Mtm->pglogicalSenderMask = 0;
            MtmUnlock();
            return REPLMODE_RECOVERY;
        }
@@ -3155,19 +3215,21 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
            MtmEnableNode(MtmReplicationNodeId);
            MtmCheckQuorum();
        } else {
-           /* Force arbiter to reestablish connection with this nodem send heartbeat to inform this node that it was disabled and should perform recovery */
+           /* Force arbiter to reestablish connection with this node, send heartbeat to inform this node that it was disabled and should perform recovery */
            BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1);
            MtmUnlock();
            elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);
        }
    } else {
        MTM_LOG1("Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
    }
-   elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
-   MtmSenderStarted = 1;
-   if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
-       /* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
-       MtmSwitchClusterMode(MTM_ONLINE);
+   if (!BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
+       elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
+       BIT_SET(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
+       if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
+           /* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
+           MtmSwitchClusterMode(MTM_ONLINE);
+       }
    }
    BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1); /* arbiter should try to reestablish connection with this node */
    MtmUnlock();
@@ -3227,14 +3289,15 @@ void MtmUpdateLsnMapping(int node_id, XLogRecPtr end_lsn)
static void
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
{
-   if (MtmReplicationNodeId >= 0) {
-       MtmLock(LW_EXCLUSIVE);
-       Mtm->nSenders -= MtmSenderStarted;
-       MtmUnlock();
+   MtmLock(LW_EXCLUSIVE);
+   if (MtmReplicationNodeId >= 0 && BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
+       BIT_CLEAR(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
+       Mtm->nSenders -= 1;
        MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
        /* MtmOnNodeDisconnect(MtmReplicationNodeId); */
-       MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
+       MtmReplicationNodeId = -1; /* defuse MtmOnProcExit hook */
    }
+   MtmUnlock();
}

/*
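Replacing the per-process MtmSenderStarted flag with the shared pglogicalSenderMask makes sender accounting idempotent: a sender is counted at most once per node and the shutdown hook can no longer double-decrement nSenders. A compact sketch of the register/unregister pair built on that idea (the wrapper names are illustrative; MtmLock, the BIT_* macros and the Mtm fields come from the diff):

    static void RegisterSender(int nodeId)
    {
        MtmLock(LW_EXCLUSIVE);
        if (!BIT_CHECK(Mtm->pglogicalSenderMask, nodeId - 1))
        {
            BIT_SET(Mtm->pglogicalSenderMask, nodeId - 1);
            Mtm->nSenders += 1;            /* counted at most once per node */
        }
        MtmUnlock();
    }

    static void UnregisterSender(int nodeId)
    {
        MtmLock(LW_EXCLUSIVE);
        if (BIT_CHECK(Mtm->pglogicalSenderMask, nodeId - 1))
        {
            BIT_CLEAR(Mtm->pglogicalSenderMask, nodeId - 1);
            Mtm->nSenders -= 1;            /* never decremented for an unregistered sender */
        }
        MtmUnlock();
    }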
@@ -3949,6 +4012,10 @@ MtmGenerateGid(char* gid)
    sprintf(gid, "MTM-%d-%d-%d", MtmNodeId, MyProcPid, ++localCount);
}

+ /*
+ * Replace a normal commit with a two-phase commit.
+ * It is called both for the commit of a standalone command and for the commit of a transaction block.
+ */
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
{
    // if (MyXactAccessedTempRel)
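The context line above shows the GID format "MTM-<nodeId>-<pid>-<counter>"; a small sketch of recovering the originating node from such a GID (the parsing helper is hypothetical, not part of this hunk):

    #include <stdio.h>

    /* Extract the node id from a "MTM-%d-%d-%d" GID; returns -1 if the
     * string does not look like a multimaster GID. */
    static int
    GidParseNodeId(const char *gid)
    {
        int nodeId, pid, count;

        if (sscanf(gid, "MTM-%d-%d-%d", &nodeId, &pid, &count) == 3)
            return nodeId;
        return -1;
    }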