@@ -154,6 +154,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
154
154
ProcessUtilityContext context , ParamListInfo params ,
155
155
DestReceiver * dest , char * completionTag );
156
156
157
+ /*
158
+ * Using LWLock seems to be more efficient (in our benchmarks)
159
+ */
157
160
void MtmLock (LWLockMode mode )
158
161
{
159
162
#ifdef USE_SPINLOCK
@@ -197,6 +200,9 @@ void MtmSleep(timestamp_t interval)
197
200
}
198
201
}
199
202
203
+ /**
204
+ * Return ascending unique timestamp which is used as CSN
205
+ */
200
206
csn_t MtmAssignCSN ()
201
207
{
202
208
csn_t csn = MtmGetCurrentTime ();
@@ -208,6 +214,9 @@ csn_t MtmAssignCSN()
208
214
return csn ;
209
215
}
210
216
217
+ /**
218
+ * "Adjust" system clock if we receive message from future
219
+ */
211
220
csn_t MtmSyncClock (csn_t global_csn )
212
221
{
213
222
csn_t local_csn ;
@@ -471,14 +480,23 @@ MtmXactCallback(XactEvent event, void *arg)
471
480
}
472
481
}
473
482
483
+ /*
484
+ * Check if this is a "normal" user transaction which should be distributed to other nodes.
+ * Returns true only in normal processing mode, when the multimaster status is MTM_ONLINE, MtmDoReplication is set,
+ * and am_walsender, IsBackgroundWorker and IsAutoVacuumWorkerProcess() are all false.
485
+ */
486
+ static bool
487
+ MtmIsUserTransaction ()
488
+ {
489
+ return IsNormalProcessingMode () && dtm -> status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
490
+ }
491
+
474
492
static void
475
493
MtmBeginTransaction (MtmCurrentTrans * x )
476
494
{
477
495
if (x -> snapshot == INVALID_CSN ) {
478
496
MtmLock (LW_EXCLUSIVE );
479
497
x -> xid = GetCurrentTransactionIdIfAny ();
480
498
x -> isReplicated = false;
481
- x -> isDistributed = IsNormalProcessingMode () && dtm -> status == MTM_ONLINE && MtmDoReplication && ! am_walsender && ! IsBackgroundWorker && ! IsAutoVacuumWorkerProcess ();
499
+ x -> isDistributed = MtmIsUserTransaction ();
482
500
x -> containsDML = false;
483
501
x -> snapshot = MtmAssignCSN ();
484
502
x -> gtid .xid = InvalidTransactionId ;
@@ -489,7 +507,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
489
507
}
490
508
491
509
492
- /* This function is called at transaction start with multimaster ock set */
510
+ /*
511
+ * If there are recovering nodes which are catching up on WAL, check the status and prevent new transactions from committing to give
512
+ * WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
513
+ * This function is called at transaction start with multimaster lock set
514
+ */
493
515
static void
494
516
MtmCheckClusterLock ()
495
517
{
@@ -507,6 +529,7 @@ MtmCheckClusterLock()
507
529
break ;
508
530
} else {
509
531
/* recovered replica caught up with master */
532
+ elog (WARNING , "WAL-sender %d complete receovery" , i );
510
533
dtm -> walSenderLockerMask &= ~((nodemask_t )1 << i );
511
534
}
512
535
}
@@ -524,6 +547,7 @@ MtmCheckClusterLock()
524
547
} else {
525
548
/* All lockers have synchronized their logs */
526
549
/* Remove lock and mark them as recovered */
550
+ elog (WARNING , "Complete recovery of %d nodes (node mask %lx)" , dtm -> nLockers , dtm -> nodeLockerMask );
527
551
Assert (dtm -> walSenderLockerMask == 0 );
528
552
Assert ((dtm -> nodeLockerMask & dtm -> disabledNodeMask ) == dtm -> nodeLockerMask );
529
553
dtm -> disabledNodeMask &= ~dtm -> nodeLockerMask ;
@@ -552,6 +576,10 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
552
576
x -> xid = GetCurrentTransactionId ();
553
577
554
578
MtmLock (LW_EXCLUSIVE );
579
+
580
+ /*
581
+ * Check if there is a global multimaster lock preventing new transactions from committing, to give WAL-senders a chance to catch up
582
+ */
555
583
MtmCheckClusterLock ();
556
584
557
585
ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
@@ -580,6 +608,10 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
580
608
MTM_TRACE ("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n" , getpid (), x -> xid , ts -> csn );
581
609
}
582
610
611
+ /**
612
+ * Check state of replication slots. If some of them lag too far behind the WAL, then drop these slots to avoid
613
+ * WAL overflow
614
+ */
583
615
static void MtmCheckSlots ()
584
616
{
585
617
if (MtmMaxRecoveryLag != 0 && dtm -> disabledNodeMask != 0 )
@@ -636,17 +668,23 @@ void MtmSendNotificationMessage(MtmTransState* ts)
636
668
}
637
669
638
670
/*
639
- * This function is called by WAL sender when start sending new transaction
671
+ * This function is called by WAL sender when it starts sending a new transaction.
672
+ * It returns true if specified node is in recovery mode. In this case we should send all transactions from WAL,
673
+ * not only coordinated by self node as in normal mode.
640
674
*/
641
675
bool MtmIsRecoveredNode (int nodeId )
642
676
{
643
677
if (BIT_CHECK (dtm -> disabledNodeMask , nodeId - 1 )) {
644
- Assert (MyWalSnd != NULL );
678
+ Assert (MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
645
679
if (!BIT_CHECK (dtm -> nodeLockerMask , nodeId - 1 )
646
680
&& MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
647
681
{
648
- /* Wal sender almost catched up */
649
- /* Lock cluster preventing new transaction to start until wal is completely replayed */
682
+ /*
683
+ * WAL sender has almost caught up.
684
+ * Lock cluster preventing new transactions from starting until WAL is completely replayed.
685
+ * We have to maintain two bitmasks: one marks the WAL senders, the other marks the corresponding nodes.
686
+ * Is there some better way to establish a mapping between nodes and WAL senders?
687
+ */
650
688
MtmLock (LW_EXCLUSIVE );
651
689
dtm -> nodeLockerMask |= (nodemask_t )1 << (nodeId - 1 );
652
690
dtm -> walSenderLockerMask |= (nodemask_t )1 << (MyWalSnd - WalSndCtl -> walsnds );
@@ -793,8 +831,8 @@ _PG_init(void)
793
831
DefineCustomIntVariable (
794
832
"multimaster.max_recovery_lag" ,
795
833
"Maximal lag of replication slot of failed node after which this slot is dropped to avoid transaction log overflow" ,
796
- "Dropping slog makes it not possible to recover node using logical replication mechanism, it will eb ncessary to completely copy content of some other nodes "
797
- "usimg basebackup or similar tool" ,
834
+ "Dropping slog makes it not possible to recover node using logical replication mechanism, it will be ncessary to completely copy content of some other nodes "
835
+ "usimg basebackup or similar tool. Zero value of parameter disable droipping slot. " ,
798
836
& MtmMaxRecoveryLag ,
799
837
100000000 ,
800
838
0 ,
@@ -990,6 +1028,7 @@ _PG_fini(void)
990
1028
/* Set the multimaster status to MTM_ONLINE and report the switch with a WARNING-level log message. */
static void MtmSwitchFromRecoveryToNormalMode ()
991
1029
{
992
1030
dtm -> status = MTM_ONLINE ;
1031
+ elog (WARNING , "Switch to normal mode" );
993
1032
/* ??? Something else to do here? */
994
1033
}
995
1034
@@ -1008,8 +1047,10 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
1008
1047
}
1009
1048
1010
1049
if (!TransactionIdIsValid (gtid -> xid )) {
1050
+ /* In case of recovery InvalidTransactionId is passed */
1011
1051
Assert (dtm -> status == MTM_RECOVERY );
1012
1052
} else if (dtm -> status == MTM_RECOVERY ) {
1053
+ /* When recovery is completed we get normal transaction ID and switch to normal mode */
1013
1054
MtmSwitchFromRecoveryToNormalMode ();
1014
1055
}
1015
1056
dtmTx .gtid = * gtid ;
@@ -1026,6 +1067,7 @@ void MtmReceiverStarted(int nodeId)
1026
1067
if (!BIT_CHECK (dtm -> pglogicalNodeMask , nodeId - 1 )) {
1027
1068
dtm -> pglogicalNodeMask |= (int64 )1 << (nodeId - 1 );
1028
1069
if (++ dtm -> nReceivers == dtm -> nNodes - 1 ) {
1070
+ elog (WARNING , "All receivers are started, switch to normal mode" );
1029
1071
Assert (dtm -> status == MTM_CONNECTED );
1030
1072
dtm -> status = MTM_ONLINE ;
1031
1073
}
@@ -1048,17 +1090,25 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
1048
1090
return snapshot ;
1049
1091
}
1050
1092
1093
+ /*
1094
+ * Determine when and how we should open replication slot.
1095
+ * During recovery we need to open only one replication slot from which node should receive all transactions.
1096
+ * Slots at other nodes should be removed.
+ * Polls dtm->status (sleeping STATUS_POLL_DELAY between checks) until it is MTM_CONNECTED or MTM_ONLINE.
+ * Returns SLOT_OPEN_EXISTED for the node chosen as recovery slot while status is MTM_RECOVERY; otherwise
+ * SLOT_CREATE_NEW if a recovery slot was chosen (recoverySlot != 0), else SLOT_OPEN_ALWAYS.
1097
+ */
1051
1098
MtmSlotMode MtmReceiverSlotMode (int nodeId )
1052
1099
{
1053
1100
while (dtm -> status != MTM_CONNECTED && dtm -> status != MTM_ONLINE ) {
1054
1101
if (dtm -> status == MTM_RECOVERY ) {
1055
1102
if (dtm -> recoverySlot == 0 || dtm -> recoverySlot == nodeId ) {
1103
+ /* Choose the first available slot for recovery (or keep the one already chosen for this node) */
1056
1104
dtm -> recoverySlot = nodeId ;
1057
1105
return SLOT_OPEN_EXISTED ;
1058
1106
}
1059
1107
}
1108
+ /* Delay opening of other slots until recovery is completed */
1060
1109
MtmSleep (STATUS_POLL_DELAY );
1061
1110
}
1111
+ /* After recovery completion we need to drop all other slots to avoid receiving redundant data */
1062
1112
return dtm -> recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
1063
1113
}
1064
1114
0 commit comments