28
28
#include "storage/procarray.h"
29
29
#include "storage/sinvaladt.h"
30
30
#include "storage/standby.h"
31
+ #include "utils/hsearch.h"
32
+ #include "utils/memutils.h"
31
33
#include "utils/ps_status.h"
32
34
#include "utils/timeout.h"
33
35
#include "utils/timestamp.h"
@@ -37,14 +39,22 @@ int vacuum_defer_cleanup_age;
37
39
int max_standby_archive_delay = 30 * 1000 ;
38
40
int max_standby_streaming_delay = 30 * 1000 ;
39
41
40
- static List * RecoveryLockList ;
42
+ static HTAB * RecoveryLockLists ;
41
43
42
44
static void ResolveRecoveryConflictWithVirtualXIDs (VirtualTransactionId * waitlist ,
43
45
ProcSignalReason reason );
44
46
static void SendRecoveryConflictWithBufferPin (ProcSignalReason reason );
45
47
static XLogRecPtr LogCurrentRunningXacts (RunningTransactions CurrRunningXacts );
46
48
static void LogAccessExclusiveLocks (int nlocks , xl_standby_lock * locks );
47
49
50
+ /*
51
+ * Keep track of all the locks owned by a given transaction.
52
+ */
53
+ typedef struct RecoveryLockListsEntry
54
+ {
55
+ TransactionId xid ;
56
+ List * locks ;
57
+ } RecoveryLockListsEntry ;
48
58
49
59
/*
50
60
* InitRecoveryTransactionEnvironment
62
72
InitRecoveryTransactionEnvironment (void )
63
73
{
64
74
VirtualTransactionId vxid ;
75
+ HASHCTL hash_ctl ;
76
+
77
+ /*
78
+ * Initialize the hash table for tracking the list of locks held by each
79
+ * transaction.
80
+ */
81
+ memset (& hash_ctl , 0 , sizeof (hash_ctl ));
82
+ hash_ctl .keysize = sizeof (TransactionId );
83
+ hash_ctl .entrysize = sizeof (RecoveryLockListsEntry );
84
+ RecoveryLockLists = hash_create ("RecoveryLockLists" ,
85
+ 64 ,
86
+ & hash_ctl ,
87
+ HASH_ELEM | HASH_BLOBS );
65
88
66
89
/*
67
90
* Initialize shared invalidation management for Startup process, being
@@ -106,6 +129,10 @@ ShutdownRecoveryTransactionEnvironment(void)
106
129
/* Release all locks the tracked transactions were holding */
107
130
StandbyReleaseAllLocks ();
108
131
132
+ /* Destroy the hash table of locks. */
133
+ hash_destroy (RecoveryLockLists );
134
+ RecoveryLockLists = NULL ;
135
+
109
136
/* Cleanup our VirtualTransaction */
110
137
VirtualXactLockTableCleanup ();
111
138
}
@@ -585,8 +612,8 @@ StandbyLockTimeoutHandler(void)
585
612
* We only keep track of AccessExclusiveLocks, which are only ever held by
586
613
* one transaction on one relation.
587
614
*
588
- * We keep a single dynamically expandible list of locks in local memory,
589
- * RecoveryLockList , so we can keep track of the various entries made by
615
+ * We keep a hash table of lists of locks in local memory keyed by xid ,
616
+ * RecoveryLockLists , so we can keep track of the various entries made by
590
617
* the Startup process's virtual xid in the shared lock table.
591
618
*
592
619
* We record the lock against the top-level xid, rather than individual
@@ -604,8 +631,10 @@ StandbyLockTimeoutHandler(void)
604
631
void
605
632
StandbyAcquireAccessExclusiveLock (TransactionId xid , Oid dbOid , Oid relOid )
606
633
{
634
+ RecoveryLockListsEntry * entry ;
607
635
xl_standby_lock * newlock ;
608
636
LOCKTAG locktag ;
637
+ bool found ;
609
638
610
639
/* Already processed? */
611
640
if (!TransactionIdIsValid (xid ) ||
@@ -619,58 +648,68 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
619
648
/* dbOid is InvalidOid when we are locking a shared relation. */
620
649
Assert (OidIsValid (relOid ));
621
650
651
+ /* Create a new list for this xid, if we don't have one already. */
652
+ entry = hash_search (RecoveryLockLists , & xid , HASH_ENTER , & found );
653
+ if (!found )
654
+ {
655
+ entry -> xid = xid ;
656
+ entry -> locks = NIL ;
657
+ }
658
+
622
659
newlock = palloc (sizeof (xl_standby_lock ));
623
660
newlock -> xid = xid ;
624
661
newlock -> dbOid = dbOid ;
625
662
newlock -> relOid = relOid ;
626
- RecoveryLockList = lappend (RecoveryLockList , newlock );
663
+ entry -> locks = lappend (entry -> locks , newlock );
627
664
628
665
SET_LOCKTAG_RELATION (locktag , newlock -> dbOid , newlock -> relOid );
629
666
630
667
LockAcquireExtended (& locktag , AccessExclusiveLock , true, false, false);
631
668
}
632
669
633
670
static void
634
- StandbyReleaseLocks ( TransactionId xid )
671
+ StandbyReleaseLockList ( List * locks )
635
672
{
636
- ListCell * cell ,
637
- * prev ,
638
- * next ;
639
-
640
- /*
641
- * Release all matching locks and remove them from list
642
- */
643
- prev = NULL ;
644
- for (cell = list_head (RecoveryLockList ); cell ; cell = next )
673
+ while (locks )
645
674
{
646
- xl_standby_lock * lock = (xl_standby_lock * ) lfirst (cell );
675
+ xl_standby_lock * lock = (xl_standby_lock * ) linitial (locks );
676
+ LOCKTAG locktag ;
677
+ elog (trace_recovery (DEBUG4 ),
678
+ "releasing recovery lock: xid %u db %u rel %u" ,
679
+ lock -> xid , lock -> dbOid , lock -> relOid );
680
+ SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
681
+ if (!LockRelease (& locktag , AccessExclusiveLock , true))
682
+ {
683
+ elog (LOG ,
684
+ "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
685
+ lock -> xid , lock -> dbOid , lock -> relOid );
686
+ Assert (false);
687
+ }
688
+ pfree (lock );
689
+ locks = list_delete_first (locks );
690
+ }
691
+ }
647
692
648
- next = lnext (cell );
693
+ static void
694
+ StandbyReleaseLocks (TransactionId xid )
695
+ {
696
+ RecoveryLockListsEntry * entry ;
649
697
650
- if (!TransactionIdIsValid (xid ) || lock -> xid == xid )
698
+ if (TransactionIdIsValid (xid ))
699
+ {
700
+ if ((entry = hash_search (RecoveryLockLists , & xid , HASH_FIND , NULL )))
651
701
{
652
- LOCKTAG locktag ;
653
-
654
- elog (trace_recovery (DEBUG4 ),
655
- "releasing recovery lock: xid %u db %u rel %u" ,
656
- lock -> xid , lock -> dbOid , lock -> relOid );
657
- SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
658
- if (!LockRelease (& locktag , AccessExclusiveLock , true))
659
- elog (LOG ,
660
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
661
- lock -> xid , lock -> dbOid , lock -> relOid );
662
-
663
- RecoveryLockList = list_delete_cell (RecoveryLockList , cell , prev );
664
- pfree (lock );
702
+ StandbyReleaseLockList (entry -> locks );
703
+ hash_search (RecoveryLockLists , entry , HASH_REMOVE , NULL );
665
704
}
666
- else
667
- prev = cell ;
668
705
}
706
+ else
707
+ StandbyReleaseAllLocks ();
669
708
}
670
709
671
710
/*
672
711
* Release locks for a transaction tree, starting at xid down, from
673
- * RecoveryLockList .
712
+ * RecoveryLockLists .
674
713
*
675
714
* Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
676
715
* to remove any AccessExclusiveLocks requested by a transaction.
@@ -692,30 +731,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
692
731
void
693
732
StandbyReleaseAllLocks (void )
694
733
{
695
- ListCell * cell ,
696
- * prev ,
697
- * next ;
698
- LOCKTAG locktag ;
734
+ HASH_SEQ_STATUS status ;
735
+ RecoveryLockListsEntry * entry ;
699
736
700
737
elog (trace_recovery (DEBUG2 ), "release all standby locks" );
701
738
702
- prev = NULL ;
703
- for ( cell = list_head ( RecoveryLockList ); cell ; cell = next )
739
+ hash_seq_init ( & status , RecoveryLockLists ) ;
740
+ while (( entry = hash_seq_search ( & status )) )
704
741
{
705
- xl_standby_lock * lock = (xl_standby_lock * ) lfirst (cell );
706
-
707
- next = lnext (cell );
708
-
709
- elog (trace_recovery (DEBUG4 ),
710
- "releasing recovery lock: xid %u db %u rel %u" ,
711
- lock -> xid , lock -> dbOid , lock -> relOid );
712
- SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
713
- if (!LockRelease (& locktag , AccessExclusiveLock , true))
714
- elog (LOG ,
715
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
716
- lock -> xid , lock -> dbOid , lock -> relOid );
717
- RecoveryLockList = list_delete_cell (RecoveryLockList , cell , prev );
718
- pfree (lock );
742
+ StandbyReleaseLockList (entry -> locks );
743
+ hash_search (RecoveryLockLists , entry , HASH_REMOVE , NULL );
719
744
}
720
745
}
721
746
@@ -727,22 +752,17 @@ StandbyReleaseAllLocks(void)
727
752
void
728
753
StandbyReleaseOldLocks (int nxids , TransactionId * xids )
729
754
{
730
- ListCell * cell ,
731
- * prev ,
732
- * next ;
733
- LOCKTAG locktag ;
755
+ HASH_SEQ_STATUS status ;
756
+ RecoveryLockListsEntry * entry ;
734
757
735
- prev = NULL ;
736
- for ( cell = list_head ( RecoveryLockList ); cell ; cell = next )
758
+ hash_seq_init ( & status , RecoveryLockLists ) ;
759
+ while (( entry = hash_seq_search ( & status )) )
737
760
{
738
- xl_standby_lock * lock = (xl_standby_lock * ) lfirst (cell );
739
761
bool remove = false;
740
762
741
- next = lnext ( cell );
763
+ Assert ( TransactionIdIsValid ( entry -> xid ) );
742
764
743
- Assert (TransactionIdIsValid (lock -> xid ));
744
-
745
- if (StandbyTransactionIdIsPrepared (lock -> xid ))
765
+ if (StandbyTransactionIdIsPrepared (entry -> xid ))
746
766
remove = false;
747
767
else
748
768
{
@@ -751,7 +771,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
751
771
752
772
for (i = 0 ; i < nxids ; i ++ )
753
773
{
754
- if (lock -> xid == xids [i ])
774
+ if (entry -> xid == xids [i ])
755
775
{
756
776
found = true;
757
777
break ;
@@ -767,19 +787,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
767
787
768
788
if (remove )
769
789
{
770
- elog (trace_recovery (DEBUG4 ),
771
- "releasing recovery lock: xid %u db %u rel %u" ,
772
- lock -> xid , lock -> dbOid , lock -> relOid );
773
- SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
774
- if (!LockRelease (& locktag , AccessExclusiveLock , true))
775
- elog (LOG ,
776
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
777
- lock -> xid , lock -> dbOid , lock -> relOid );
778
- RecoveryLockList = list_delete_cell (RecoveryLockList , cell , prev );
779
- pfree (lock );
790
+ StandbyReleaseLockList (entry -> locks );
791
+ hash_search (RecoveryLockLists , entry , HASH_REMOVE , NULL );
780
792
}
781
- else
782
- prev = cell ;
783
793
}
784
794
}
785
795
0 commit comments