45
45
#include "storage/proc.h"
46
46
#include "utils/syscache.h"
47
47
#include "replication/walsender.h"
48
+ #include "replication/walsender_private.h"
48
49
#include "replication/slot.h"
49
50
#include "port/atomics.h"
50
51
#include "tcop/utility.h"
@@ -60,16 +61,14 @@ typedef struct {
60
61
bool isReplicated ; /* transaction on replica */
61
62
bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
62
63
bool containsDML ; /* transaction contains DML statements */
63
- bool isPrepared ; /* transaction was prepared for commit */
64
64
csn_t snapshot ; /* transaction snapshot */
65
65
} MtmCurrentTrans ;
66
66
67
67
/* #define USE_SPINLOCK 1 */
68
68
69
69
typedef enum
70
70
{
71
- HASH_LOCK_ID ,
72
- COMMIT_LOCK_ID ,
71
+ MTM_STATE_LOCK_ID , /* index in dtm->locks of the lock taken by MtmLock() and released by MtmUnlock() when USE_SPINLOCK is not defined */
73
72
N_LOCKS /* presumably the number of lock ids, so it must stay last -- TODO confirm against the code sizing dtm->locks */
74
73
} MtmLockIds ;
75
74
@@ -142,6 +141,7 @@ int MtmReconnectAttempts;
142
141
static int MtmQueueSize ;
143
142
static int MtmWorkers ;
144
143
static int MtmVacuumDelay ;
144
+ static int MtmMinRecoveryLag ;
145
145
146
146
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
147
147
static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -158,7 +158,7 @@ void MtmLock(LWLockMode mode)
158
158
#ifdef USE_SPINLOCK
159
159
SpinLockAcquire (& dtm -> spinlock );
160
160
#else
161
- LWLockAcquire (dtm -> locks [HASH_LOCK_ID ], mode );
161
+ LWLockAcquire (( LWLockId ) & dtm -> locks [MTM_STATE_LOCK_ID ], mode );
162
162
#endif
163
163
}
164
164
@@ -167,7 +167,7 @@ void MtmUnlock(void)
167
167
#ifdef USE_SPINLOCK
168
168
SpinLockRelease (& dtm -> spinlock );
169
169
#else
170
- LWLockRelease (dtm -> locks [HASH_LOCK_ID ]);
170
+ LWLockRelease (( LWLockId ) & dtm -> locks [MTM_STATE_LOCK_ID ]);
171
171
#endif
172
172
}
173
173
@@ -426,12 +426,14 @@ static void MtmInitialize()
426
426
dtm -> nNodes = MtmNodes ;
427
427
dtm -> disabledNodeMask = 0 ;
428
428
dtm -> pglogicalNodeMask = 0 ;
429
+ dtm -> walSenderLockerMask = 0 ;
430
+ dtm -> nodeLockerMask = 0 ;
431
+ dtm -> nLockers = 0 ;
429
432
dtm -> votingTransactions = NULL ;
430
433
dtm -> transListHead = NULL ;
431
434
dtm -> transListTail = & dtm -> transListHead ;
432
435
dtm -> nReceivers = 0 ;
433
436
dtm -> timeShift = 0 ;
434
- pg_atomic_write_u32 (& dtm -> nCommittingTrans , 0 );
435
437
PGSemaphoreCreate (& dtm -> votingSemaphore );
436
438
PGSemaphoreReset (& dtm -> votingSemaphore );
437
439
SpinLockInit (& dtm -> spinlock );
@@ -476,7 +478,6 @@ MtmBeginTransaction(MtmCurrentTrans* x)
476
478
x -> xid = GetCurrentTransactionIdIfAny ();
477
479
x -> isReplicated = false;
478
480
x -> isDistributed = IsNormalProcessingMode () && dtm -> status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
479
- x -> isPrepared = false;
480
481
x -> containsDML = false;
481
482
x -> snapshot = MtmAssignCSN ();
482
483
x -> gtid .xid = InvalidTransactionId ;
@@ -487,6 +488,53 @@ MtmBeginTransaction(MtmCurrentTrans* x)
487
488
}
488
489
489
490
491
+ /* Called with the multimaster lock held exclusively (see MtmPrepareTransaction): waits, dropping the lock while asleep, until every WAL sender in walSenderLockerMask has sent WAL up to the current insert position, then re-enables the locker nodes */
492
+ static void
493
+ MtmCheckClusterLock ()
494
+ {
495
+ timestamp_t delay = MIN_WAIT_TIMEOUT ; /* kept outside the retry loop so the back-off below persists across retries */
496
+ while (true)
497
+ {
498
+ nodemask_t mask = dtm -> walSenderLockerMask ;
499
+ if (mask != 0 ) {
500
+ XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
501
+ int i ;
502
+ for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
503
+ if (mask & 1 ) {
504
+ if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
505
+ /* this WAL sender has not reached the current insert position: recovery is still in progress (mask stays non-zero) */
506
+ break ;
507
+ } else {
508
+ /* the recovering replica has caught up with the master */
509
+ dtm -> walSenderLockerMask &= ~((nodemask_t )1 << i );
510
+ }
511
+ }
512
+ }
513
+ if (mask != 0 ) {
514
+ /* some "almost caught-up" WAL senders are still working */
515
+ /* Do not start new transactions until they complete: release the lock, sleep, double the delay up to MAX_WAIT_TIMEOUT and re-check */
516
+ MtmUnlock ();
517
+ MtmSleep (delay );
518
+ if (delay * 2 <= MAX_WAIT_TIMEOUT ) {
519
+ delay *= 2 ;
520
+ }
521
+ MtmLock (LW_EXCLUSIVE );
522
+ continue ;
523
+ } else {
524
+ /* All lockers have synchronized their logs */
525
+ /* Remove the lock and mark them as recovered: clear them from disabledNodeMask and count them back into nNodes */
526
+ Assert (dtm -> walSenderLockerMask == 0 );
527
+ Assert ((dtm -> nodeLockerMask & dtm -> disabledNodeMask ) == dtm -> nodeLockerMask );
528
+ dtm -> disabledNodeMask &= ~dtm -> nodeLockerMask ;
529
+ dtm -> nNodes += dtm -> nLockers ;
530
+ dtm -> nLockers = 0 ;
531
+ dtm -> nodeLockerMask = 0 ;
532
+ }
533
+ }
534
+ break ;
535
+ }
536
+ }
537
+
490
538
/*
491
539
* We need to pass snapshot to WAL-sender, so create record in transaction status hash table
492
540
* before commit
@@ -499,15 +547,12 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
499
547
if (!x -> isDistributed ) {
500
548
return ;
501
549
}
502
- /* Check that commits are not disabled */
503
- LWLockAcquire (dtm -> locks [COMMIT_LOCK_ID ], LW_SHARED );
504
- LWLockRelease (dtm -> locks [COMMIT_LOCK_ID ]);
505
550
506
- pg_atomic_fetch_add_u32 (dtm -> nCommittingTransactions , 1 );
507
- x -> isPrepared = true;
508
551
x -> xid = GetCurrentTransactionId ();
509
552
510
553
MtmLock (LW_EXCLUSIVE );
554
+ MtmCheckClusterLock ();
555
+
511
556
ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
512
557
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
513
558
ts -> snapshot = x -> isReplicated || !x -> containsDML ? INVALID_CSN : x -> snapshot ;
@@ -546,9 +591,6 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
546
591
MtmAdjustSubtransactions (ts );
547
592
MtmUnlock ();
548
593
}
549
- if (x -> isPrepared ) {
550
- pg_atomic_fetch_add_u32 (dtm -> nCommittingTransactions , -1 );
551
- }
552
594
x -> snapshot = INVALID_CSN ;
553
595
x -> xid = InvalidTransactionId ;
554
596
x -> gtid .xid = InvalidTransactionId ;
@@ -568,39 +610,29 @@ void MtmSendNotificationMessage(MtmTransState* ts)
568
610
}
569
611
}
570
612
571
- void MtmUpdateStatus (bool recovered )
572
- {
573
- if (dtm -> status == MTM_RECOVERY ) {
574
- MtmLock (LW_EXCLUSIVE );
575
- dtm -> status = MTM_ONLINE ; /* Is it all we shoudl do t switch to nortmal state */
576
- MtmUnlock ();
577
- }
578
- }
579
-
580
- void MtmRecoveryCompleted (int nodeId )
613
+ /*
614
+ * This function is called by a WAL sender when it starts sending a new transaction. It returns true if node nodeId is set in disabledNodeMask and false otherwise. As a side effect, when that node is not yet in nodeLockerMask and this sender's sentPtr is within MtmMinRecoveryLag of the current WAL insert position, the node and this WAL sender are registered as cluster lockers.
615
+ */
616
+ bool MtmIsRecoveredNode (int nodeId )
581
617
{
582
- if (BIT_CHECK (dtm -> pglogicalNodeMask , nodeId - 1 )) {
583
- if (MyWalSnd -> sentPtr == GetXLogInsertRecPtr ()) {
584
- /* Ok, now we done with recovery of node */
618
+ if (BIT_CHECK (dtm -> disabledNodeMask , nodeId - 1 )) {
619
+ Assert (MyWalSnd != NULL );
620
+ if (!BIT_CHECK (dtm -> nodeLockerMask , nodeId - 1 )
621
+ && MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
622
+ {
623
+ /* The WAL sender has almost caught up */
624
+ /* Lock the cluster, preventing new transactions from starting until the WAL is completely replayed */
585
625
MtmLock (LW_EXCLUSIVE );
586
- dtm -> pglogicalNodeMask &= (int64 )1 << (nodeId - 1 ); /* now node is assumed as recovered */
587
- dtm -> nNodes += 1 ;
626
+ dtm -> nodeLockerMask |= (nodemask_t )1 << (nodeId - 1 );
627
+ dtm -> walSenderLockerMask |= (nodemask_t )1 << (MyWalSnd - WalSndCtl -> walsnds ); /* bit index is this sender's position in WalSndCtl->walsnds */
628
+ dtm -> nLockers += 1 ;
588
629
MtmUnlock ();
589
-
590
- LWLockRelease (dtm -> locks [COMMIT_LOCK_ID ]); /* enable commits */
591
-
592
- return true;
593
- } else if (MyWalSnd -> sentPtr + MtmSlotDelayThreashold > GetXLogInsertRecPtr ()) {
594
- /* we almost done with recovery of node.. */
595
- LWLockAcquire (dtm -> locks [COMMIT_LOCK_ID ], LW_EXCLUSIVE ); /* disable new commits */
596
630
}
597
- return false;
598
- } else {
599
631
return true;
600
632
}
633
+ return false;
601
634
}
602
635
603
-
604
636
static bool
605
637
MtmCommitTransaction (TransactionId xid , int nsubxids , TransactionId * subxids )
606
638
{
@@ -717,6 +749,21 @@ _PG_init(void)
717
749
if (!process_shared_preload_libraries_in_progress )
718
750
return ;
719
751
752
+ DefineCustomIntVariable (
753
+ "multimaster.min_recovery_lag" ,
754
+ "Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed" ,
755
+ NULL ,
756
+ & MtmMinRecoveryLag ,
757
+ 100000 ,
758
+ 1 ,
759
+ INT_MAX ,
760
+ PGC_BACKEND ,
761
+ 0 ,
762
+ NULL ,
763
+ NULL ,
764
+ NULL
765
+ );
766
+
720
767
DefineCustomIntVariable (
721
768
"multimaster.vacuum_delay" ,
722
769
"Minimal age of records which can be vacuumed (seconds)" ,
@@ -897,6 +944,12 @@ _PG_fini(void)
897
944
* ***************************************************************************
898
945
*/
899
946
947
+ static void MtmSwitchFromRecoveryToNormalMode ()
948
+ {
949
+ dtm -> status = MTM_ONLINE ; /* leave recovery: mark this node as online */
950
+ /* TODO(review): open question from the author -- is anything beyond flipping the status needed to leave recovery? */
951
+ }
952
+
900
953
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
901
954
{
902
955
csn_t localSnapshot ;
@@ -910,6 +963,11 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
910
963
elog (ERROR , "Too old snapshot: requested %ld, current %ld" , globalSnapshot , localSnapshot );
911
964
}
912
965
966
+ if (!TransactionIdIsValid (gtid -> xid )) {
967
+ Assert (dtm -> status == MTM_RECOVERY );
968
+ } else if (dtm -> status == MTM_RECOVERY ) {
969
+ MtmSwitchFromRecoveryToNormalMode ();
970
+ }
913
971
dtmTx .gtid = * gtid ;
914
972
dtmTx .xid = GetCurrentTransactionId ();
915
973
dtmTx .snapshot = globalSnapshot ;
0 commit comments