Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Move RecoveryLockList into a hash table.
author Thomas Munro <tmunro@postgresql.org>
Tue, 26 Jun 2018 06:23:17 +0000 (18:23 +1200)
committer Thomas Munro <tmunro@postgresql.org>
Tue, 26 Jun 2018 06:23:17 +0000 (18:23 +1200)
Standbys frequently need to release all locks held by a given xid.
Instead of searching one big list linearly, let's create one list
per xid and put them in a hash table, so we can find what we need
in O(1) time.

Earlier analysis and a prototype were done by David Rowley, though
this isn't his patch.

Back-patch all the way.

Author: Thomas Munro
Diagnosed-by: David Rowley, Andres Freund
Reviewed-by: Andres Freund, Tom Lane, Robert Haas
Discussion: https://postgr.es/m/CAEepm%3D1mL0KiQ2KJ4yuPpLGX94a4Ns_W6TL4EGRouxWibu56pA%40mail.gmail.com
Discussion: https://postgr.es/m/CAKJS1f9vJ841HY%3DwonnLVbfkTWGYWdPN72VMxnArcGCjF3SywA%40mail.gmail.com

src/backend/storage/ipc/standby.c

index 55833238f576fd9efda3b1463adcb5b73412c842..bcaebc060290c00413e955fdb686bdf77063420d 100644 (file)
@@ -27,6 +27,8 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/standby.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -36,7 +38,7 @@ int           vacuum_defer_cleanup_age;
 int            max_standby_archive_delay = 30 * 1000;
 int            max_standby_streaming_delay = 30 * 1000;
 
-static List *RecoveryLockList;
+static HTAB *RecoveryLockLists;
 
 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
                                       ProcSignalReason reason);
@@ -45,6 +47,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
 
+/*
+ * Keep track of all the locks owned by a given transaction.
+ */
+typedef struct RecoveryLockListsEntry
+{
+   TransactionId   xid;
+   List           *locks;
+} RecoveryLockListsEntry;
 
 /*
  * InitRecoveryTransactionEnvironment
@@ -62,6 +72,20 @@ void
 InitRecoveryTransactionEnvironment(void)
 {
    VirtualTransactionId vxid;
+   HASHCTL         hash_ctl;
+
+   /*
+    * Initialize the hash table for tracking the list of locks held by each
+    * transaction.
+    */
+   memset(&hash_ctl, 0, sizeof(hash_ctl));
+   hash_ctl.keysize = sizeof(TransactionId);
+   hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
+   hash_ctl.hash = tag_hash;
+   RecoveryLockLists = hash_create("RecoveryLockLists",
+                                   64,
+                                   &hash_ctl,
+                                   HASH_ELEM | HASH_FUNCTION);
 
    /*
     * Initialize shared invalidation management for Startup process, being
@@ -106,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void)
    /* Release all locks the tracked transactions were holding */
    StandbyReleaseAllLocks();
 
+   /* Destroy the hash table of locks. */
+   hash_destroy(RecoveryLockLists);
+   RecoveryLockLists = NULL;
+
    /* Cleanup our VirtualTransaction */
    VirtualXactLockTableCleanup();
 }
@@ -548,8 +576,8 @@ StandbyTimeoutHandler(void)
  * We only keep track of AccessExclusiveLocks, which are only ever held by
  * one transaction on one relation, and don't worry about lock queuing.
  *
- * We keep a single dynamically expandible list of locks in local memory,
- * RecoveryLockList, so we can keep track of the various entries made by
+ * We keep a hash table of lists of locks in local memory keyed by xid,
+ * RecoveryLockLists, so we can keep track of the various entries made by
  * the Startup process's virtual xid in the shared lock table.
  *
  * We record the lock against the top-level xid, rather than individual
@@ -567,8 +595,10 @@ StandbyTimeoutHandler(void)
 void
 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 {
+   RecoveryLockListsEntry *entry;
    xl_standby_lock *newlock;
    LOCKTAG     locktag;
+   bool        found;
 
    /* Already processed? */
    if (!TransactionIdIsValid(xid) ||
@@ -582,11 +612,19 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
    /* dbOid is InvalidOid when we are locking a shared relation. */
    Assert(OidIsValid(relOid));
 
+   /* Create a new list for this xid, if we don't have one already. */
+   entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
+   if (!found)
+   {
+       entry->xid = xid;
+       entry->locks = NIL;
+   }
+
    newlock = palloc(sizeof(xl_standby_lock));
    newlock->xid = xid;
    newlock->dbOid = dbOid;
    newlock->relOid = relOid;
-   RecoveryLockList = lappend(RecoveryLockList, newlock);
+   entry->locks = lappend(entry->locks, newlock);
 
    /*
     * Attempt to acquire the lock as requested, if not resolve conflict
@@ -599,46 +637,48 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 }
 
 static void
-StandbyReleaseLocks(TransactionId xid)
+StandbyReleaseLockList(List *locks)
 {
-   ListCell   *cell,
-              *prev,
-              *next;
-
-   /*
-    * Release all matching locks and remove them from list
-    */
-   prev = NULL;
-   for (cell = list_head(RecoveryLockList); cell; cell = next)
+   while (locks)
    {
-       xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+       xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
+       LOCKTAG     locktag;
+       elog(trace_recovery(DEBUG4),
+            "releasing recovery lock: xid %u db %u rel %u",
+            lock->xid, lock->dbOid, lock->relOid);
+       SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+       if (!LockRelease(&locktag, AccessExclusiveLock, true))
+       {
+           elog(LOG,
+                "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
+                lock->xid, lock->dbOid, lock->relOid);
+           Assert(false);
+       }
+       pfree(lock);
+       locks = list_delete_first(locks);
+   }
+}
 
-       next = lnext(cell);
+static void
+StandbyReleaseLocks(TransactionId xid)
+{
+   RecoveryLockListsEntry *entry;
 
-       if (!TransactionIdIsValid(xid) || lock->xid == xid)
+   if (TransactionIdIsValid(xid))
+   {
+       if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
        {
-           LOCKTAG     locktag;
-
-           elog(trace_recovery(DEBUG4),
-                "releasing recovery lock: xid %u db %u rel %u",
-                lock->xid, lock->dbOid, lock->relOid);
-           SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-           if (!LockRelease(&locktag, AccessExclusiveLock, true))
-               elog(LOG,
-                    "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-                    lock->xid, lock->dbOid, lock->relOid);
-
-           RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-           pfree(lock);
+           StandbyReleaseLockList(entry->locks);
+           hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
        }
-       else
-           prev = cell;
    }
+   else
+       StandbyReleaseAllLocks();
 }
 
 /*
  * Release locks for a transaction tree, starting at xid down, from
- * RecoveryLockList.
+ * RecoveryLockLists.
  *
  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
  * to remove any AccessExclusiveLocks requested by a transaction.
@@ -660,30 +700,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
 void
 StandbyReleaseAllLocks(void)
 {
-   ListCell   *cell,
-              *prev,
-              *next;
-   LOCKTAG     locktag;
+   HASH_SEQ_STATUS status;
+   RecoveryLockListsEntry *entry;
 
    elog(trace_recovery(DEBUG2), "release all standby locks");
 
-   prev = NULL;
-   for (cell = list_head(RecoveryLockList); cell; cell = next)
+   hash_seq_init(&status, RecoveryLockLists);
+   while ((entry = hash_seq_search(&status)))
    {
-       xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
-
-       next = lnext(cell);
-
-       elog(trace_recovery(DEBUG4),
-            "releasing recovery lock: xid %u db %u rel %u",
-            lock->xid, lock->dbOid, lock->relOid);
-       SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-       if (!LockRelease(&locktag, AccessExclusiveLock, true))
-           elog(LOG,
-                "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-                lock->xid, lock->dbOid, lock->relOid);
-       RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-       pfree(lock);
+       StandbyReleaseLockList(entry->locks);
+       hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
    }
 }
 
@@ -695,22 +721,17 @@ StandbyReleaseAllLocks(void)
 void
 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 {
-   ListCell   *cell,
-              *prev,
-              *next;
-   LOCKTAG     locktag;
+   HASH_SEQ_STATUS status;
+   RecoveryLockListsEntry *entry;
 
-   prev = NULL;
-   for (cell = list_head(RecoveryLockList); cell; cell = next)
+   hash_seq_init(&status, RecoveryLockLists);
+   while ((entry = hash_seq_search(&status)))
    {
-       xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
        bool        remove = false;
 
-       next = lnext(cell);
+       Assert(TransactionIdIsValid(entry->xid));
 
-       Assert(TransactionIdIsValid(lock->xid));
-
-       if (StandbyTransactionIdIsPrepared(lock->xid))
+       if (StandbyTransactionIdIsPrepared(entry->xid))
            remove = false;
        else
        {
@@ -719,7 +740,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 
            for (i = 0; i < nxids; i++)
            {
-               if (lock->xid == xids[i])
+               if (entry->xid == xids[i])
                {
                    found = true;
                    break;
@@ -735,19 +756,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 
        if (remove)
        {
-           elog(trace_recovery(DEBUG4),
-                "releasing recovery lock: xid %u db %u rel %u",
-                lock->xid, lock->dbOid, lock->relOid);
-           SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-           if (!LockRelease(&locktag, AccessExclusiveLock, true))
-               elog(LOG,
-                    "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-                    lock->xid, lock->dbOid, lock->relOid);
-           RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-           pfree(lock);
+           StandbyReleaseLockList(entry->locks);
+           hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
        }
-       else
-           prev = cell;
    }
 }