Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Use atomics to avoid locking in InjectionPointRun()
author Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 15 Jul 2024 07:21:16 +0000 (10:21 +0300)
committer Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 15 Jul 2024 07:22:11 +0000 (10:22 +0300)
This allows using injection points without having a PGPROC, like early
at backend startup, or in the postmaster.

The injection points facility is new in v17, so backpatch there.

Reviewed-by: Michael Paquier <michael@paquier.xyz>
Discussion: https://www.postgresql.org/message-id/4317a7f7-8d24-435e-9e49-29b72a3dc418@iki.fi

src/backend/utils/misc/injection_point.c
src/tools/pgindent/typedefs.list

index 48f29e9b60ab79390187921262abec6245c16ff3..84ad5e470d7eb723bf0a8501033ba35b08970c4f 100644 (file)
@@ -21,7 +21,6 @@
 
 #include "fmgr.h"
 #include "miscadmin.h"
-#include "port/pg_bitutils.h"
 #include "storage/fd.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 
 #ifdef USE_INJECTION_POINTS
 
-/*
- * Hash table for storing injection points.
- *
- * InjectionPointHash is used to find an injection point by name.
- */
-static HTAB *InjectionPointHash;   /* find points from names */
-
 /* Field sizes */
 #define INJ_NAME_MAXLEN        64
 #define INJ_LIB_MAXLEN     128
 #define INJ_FUNC_MAXLEN        128
 #define INJ_PRIVATE_MAXLEN 1024
 
-/* Single injection point stored in InjectionPointHash */
+/* Single injection point stored in shared memory */
 typedef struct InjectionPointEntry
 {
+   /*
+    * Because injection points need to be usable without LWLocks, we use a
+    * generation counter on each entry to allow safe, lock-free reading.
+    *
+    * To read an entry, first read the current 'generation' value.  If it's
+    * even, then the slot is currently unused, and odd means it's in use.
+    * When reading the other fields, beware that they may change while
+    * reading them, if the entry is released and reused!  After reading the
+    * other fields, read 'generation' again: if its value hasn't changed, you
+    * can be certain that the other fields you read are valid.  Otherwise,
+    * the slot was concurrently recycled, and you should ignore it.
+    *
+    * When adding an entry, you must store all the other fields first, and
+    * then update the generation number, with an appropriate memory barrier
+    * in between. In addition to that protocol, you must also hold
+    * InjectionPointLock, to prevent two backends from modifying the array at
+    * the same time.
+    */
+   pg_atomic_uint64 generation;
+
    char        name[INJ_NAME_MAXLEN];  /* hash key */
    char        library[INJ_LIB_MAXLEN];    /* library */
    char        function[INJ_FUNC_MAXLEN];  /* function */
@@ -58,8 +70,22 @@ typedef struct InjectionPointEntry
    char        private_data[INJ_PRIVATE_MAXLEN];
 } InjectionPointEntry;
 
-#define INJECTION_POINT_HASH_INIT_SIZE 16
-#define INJECTION_POINT_HASH_MAX_SIZE  128
+#define MAX_INJECTION_POINTS   128
+
+/*
+ * Shared memory array of active injection points.
+ *
+ * 'max_inuse' is the highest index currently in use, plus one.  It's just an
+ * optimization to avoid scanning through the whole array, in the common case
+ * that there are no injection points, or only a few.
+ */
+typedef struct InjectionPointsCtl
+{
+   pg_atomic_uint32 max_inuse;
+   InjectionPointEntry entries[MAX_INJECTION_POINTS];
+} InjectionPointsCtl;
+
+static InjectionPointsCtl *ActiveInjectionPoints;
 
 /*
  * Backend local cache of injection callbacks already loaded, stored in
@@ -70,6 +96,14 @@ typedef struct InjectionPointCacheEntry
    char        name[INJ_NAME_MAXLEN];
    char        private_data[INJ_PRIVATE_MAXLEN];
    InjectionPointCallback callback;
+
+   /*
+    * Shmem slot and copy of its generation number when this cache entry was
+    * created.  They can be used to validate if the cached entry is still
+    * valid.
+    */
+   int         slot_idx;
+   uint64      generation;
 } InjectionPointCacheEntry;
 
 static HTAB *InjectionPointCache = NULL;
@@ -79,8 +113,10 @@ static HTAB *InjectionPointCache = NULL;
  *
  * Add an injection point to the local cache.
  */
-static void
+static InjectionPointCacheEntry *
 injection_point_cache_add(const char *name,
+                         int slot_idx,
+                         uint64 generation,
                          InjectionPointCallback callback,
                          const void *private_data)
 {
@@ -97,7 +133,7 @@ injection_point_cache_add(const char *name,
        hash_ctl.hcxt = TopMemoryContext;
 
        InjectionPointCache = hash_create("InjectionPoint cache hash",
-                                         INJECTION_POINT_HASH_MAX_SIZE,
+                                         MAX_INJECTION_POINTS,
                                          &hash_ctl,
                                          HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
    }
@@ -107,9 +143,12 @@ injection_point_cache_add(const char *name,
 
    Assert(!found);
    strlcpy(entry->name, name, sizeof(entry->name));
+   entry->slot_idx = slot_idx;
+   entry->generation = generation;
    entry->callback = callback;
-   if (private_data != NULL)
-       memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+   memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+
+   return entry;
 }
 
 /*
@@ -122,11 +161,10 @@ injection_point_cache_add(const char *name,
 static void
 injection_point_cache_remove(const char *name)
 {
-   /* leave if no cache */
-   if (InjectionPointCache == NULL)
-       return;
+   bool        found PG_USED_FOR_ASSERTS_ONLY;
 
-   (void) hash_search(InjectionPointCache, name, HASH_REMOVE, NULL);
+   (void) hash_search(InjectionPointCache, name, HASH_REMOVE, &found);
+   Assert(found);
 }
 
 /*
@@ -134,29 +172,32 @@ injection_point_cache_remove(const char *name)
  *
  * Load an injection point into the local cache.
  */
-static void
-injection_point_cache_load(InjectionPointEntry *entry_by_name)
+static InjectionPointCacheEntry *
+injection_point_cache_load(InjectionPointEntry *entry, int slot_idx, uint64 generation)
 {
    char        path[MAXPGPATH];
    void       *injection_callback_local;
 
    snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path,
-            entry_by_name->library, DLSUFFIX);
+            entry->library, DLSUFFIX);
 
    if (!pg_file_exists(path))
        elog(ERROR, "could not find library \"%s\" for injection point \"%s\"",
-            path, entry_by_name->name);
+            path, entry->name);
 
    injection_callback_local = (void *)
-       load_external_function(path, entry_by_name->function, false, NULL);
+       load_external_function(path, entry->function, false, NULL);
 
    if (injection_callback_local == NULL)
        elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"",
-            entry_by_name->function, path, entry_by_name->name);
-
-   /* add it to the local cache when found */
-   injection_point_cache_add(entry_by_name->name, injection_callback_local,
-                             entry_by_name->private_data);
+            entry->function, path, entry->name);
+
+   /* add it to the local cache */
+   return injection_point_cache_add(entry->name,
+                                    slot_idx,
+                                    generation,
+                                    injection_callback_local,
+                                    entry->private_data);
 }
 
 /*
@@ -193,8 +234,7 @@ InjectionPointShmemSize(void)
 #ifdef USE_INJECTION_POINTS
    Size        sz = 0;
 
-   sz = add_size(sz, hash_estimate_size(INJECTION_POINT_HASH_MAX_SIZE,
-                                        sizeof(InjectionPointEntry)));
+   sz = add_size(sz, sizeof(InjectionPointsCtl));
    return sz;
 #else
    return 0;
@@ -208,16 +248,20 @@ void
 InjectionPointShmemInit(void)
 {
 #ifdef USE_INJECTION_POINTS
-   HASHCTL     info;
-
-   /* key is a NULL-terminated string */
-   info.keysize = sizeof(char[INJ_NAME_MAXLEN]);
-   info.entrysize = sizeof(InjectionPointEntry);
-   InjectionPointHash = ShmemInitHash("InjectionPoint hash",
-                                      INJECTION_POINT_HASH_INIT_SIZE,
-                                      INJECTION_POINT_HASH_MAX_SIZE,
-                                      &info,
-                                      HASH_ELEM | HASH_FIXED_SIZE | HASH_STRINGS);
+   bool        found;
+
+   ActiveInjectionPoints = ShmemInitStruct("InjectionPoint hash",
+                                           sizeof(InjectionPointsCtl),
+                                           &found);
+   if (!IsUnderPostmaster)
+   {
+       Assert(!found);
+       pg_atomic_init_u32(&ActiveInjectionPoints->max_inuse, 0);
+       for (int i = 0; i < MAX_INJECTION_POINTS; i++)
+           pg_atomic_init_u64(&ActiveInjectionPoints->entries[i].generation, 0);
+   }
+   else
+       Assert(found);
 #endif
 }
 
@@ -232,8 +276,10 @@ InjectionPointAttach(const char *name,
                     int private_data_size)
 {
 #ifdef USE_INJECTION_POINTS
-   InjectionPointEntry *entry_by_name;
-   bool        found;
+   InjectionPointEntry *entry;
+   uint64      generation;
+   uint32      max_inuse;
+   int         free_idx;
 
    if (strlen(name) >= INJ_NAME_MAXLEN)
        elog(ERROR, "injection point name %s too long (maximum of %u)",
@@ -253,21 +299,51 @@ InjectionPointAttach(const char *name,
     * exist.  For testing purposes this should be fine.
     */
    LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-   entry_by_name = (InjectionPointEntry *)
-       hash_search(InjectionPointHash, name,
-                   HASH_ENTER, &found);
-   if (found)
-       elog(ERROR, "injection point \"%s\" already defined", name);
+   max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+   free_idx = -1;
+
+   for (int idx = 0; idx < max_inuse; idx++)
+   {
+       entry = &ActiveInjectionPoints->entries[idx];
+       generation = pg_atomic_read_u64(&entry->generation);
+       if (generation % 2 == 0)
+       {
+           /*
+            * Found a free slot where we can add the new entry, but keep
+            * going so that we will find out if the entry already exists.
+            */
+           if (free_idx == -1)
+               free_idx = idx;
+       }
+
+       if (strcmp(entry->name, name) == 0)
+           elog(ERROR, "injection point \"%s\" already defined", name);
+   }
+   if (free_idx == -1)
+   {
+       if (max_inuse == MAX_INJECTION_POINTS)
+           elog(ERROR, "too many injection points");
+       free_idx = max_inuse;
+   }
+   entry = &ActiveInjectionPoints->entries[free_idx];
+   generation = pg_atomic_read_u64(&entry->generation);
+   Assert(generation % 2 == 0);
 
    /* Save the entry */
-   strlcpy(entry_by_name->name, name, sizeof(entry_by_name->name));
-   entry_by_name->name[INJ_NAME_MAXLEN - 1] = '\0';
-   strlcpy(entry_by_name->library, library, sizeof(entry_by_name->library));
-   entry_by_name->library[INJ_LIB_MAXLEN - 1] = '\0';
-   strlcpy(entry_by_name->function, function, sizeof(entry_by_name->function));
-   entry_by_name->function[INJ_FUNC_MAXLEN - 1] = '\0';
+   strlcpy(entry->name, name, sizeof(entry->name));
+   entry->name[INJ_NAME_MAXLEN - 1] = '\0';
+   strlcpy(entry->library, library, sizeof(entry->library));
+   entry->library[INJ_LIB_MAXLEN - 1] = '\0';
+   strlcpy(entry->function, function, sizeof(entry->function));
+   entry->function[INJ_FUNC_MAXLEN - 1] = '\0';
    if (private_data != NULL)
-       memcpy(entry_by_name->private_data, private_data, private_data_size);
+       memcpy(entry->private_data, private_data, private_data_size);
+
+   pg_write_barrier();
+   pg_atomic_write_u64(&entry->generation, generation + 1);
+
+   if (free_idx + 1 > max_inuse)
+       pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, free_idx + 1);
 
    LWLockRelease(InjectionPointLock);
 
@@ -285,63 +361,177 @@ bool
 InjectionPointDetach(const char *name)
 {
 #ifdef USE_INJECTION_POINTS
-   bool        found;
+   bool        found = false;
+   int         idx;
+   int         max_inuse;
 
    LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-   hash_search(InjectionPointHash, name, HASH_REMOVE, &found);
-   LWLockRelease(InjectionPointLock);
 
-   if (!found)
-       return false;
+   /* Find it in the shmem array, and mark the slot as unused */
+   max_inuse = (int) pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+   for (idx = max_inuse - 1; idx >= 0; --idx)
+   {
+       InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+       uint64      generation;
+
+       generation = pg_atomic_read_u64(&entry->generation);
+       if (generation % 2 == 0)
+           continue;           /* empty slot */
+
+       if (strcmp(entry->name, name) == 0)
+       {
+           Assert(!found);
+           found = true;
+           pg_atomic_write_u64(&entry->generation, generation + 1);
+           break;
+       }
+   }
+
+   /* If we just removed the highest-numbered entry, update 'max_inuse' */
+   if (found && idx == max_inuse - 1)
+   {
+       for (; idx >= 0; --idx)
+       {
+           InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+           uint64      generation;
+
+           generation = pg_atomic_read_u64(&entry->generation);
+           if (generation % 2 != 0)
+               break;
+       }
+       pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, idx + 1);
+   }
+   LWLockRelease(InjectionPointLock);
 
-   return true;
+   return found;
 #else
    elog(ERROR, "Injection points are not supported by this build");
    return true;                /* silence compiler */
 #endif
 }
 
+#ifdef USE_INJECTION_POINTS
 /*
- * Load an injection point into the local cache.
+ * Common workhorse of InjectionPointRun() and InjectionPointLoad()
  *
- * This is useful to be able to load an injection point before running it,
- * especially if the injection point is called in a code path where memory
- * allocations cannot happen, like critical sections.
+ * Checks if an injection point exists in shared memory, and updates
+ * the local cache entry accordingly.
  */
-void
-InjectionPointLoad(const char *name)
+static InjectionPointCacheEntry *
+InjectionPointCacheRefresh(const char *name)
 {
-#ifdef USE_INJECTION_POINTS
-   InjectionPointEntry *entry_by_name;
-   bool        found;
+   uint32      max_inuse;
+   int         namelen;
+   InjectionPointEntry local_copy;
+   InjectionPointCacheEntry *cached;
 
-   LWLockAcquire(InjectionPointLock, LW_SHARED);
-   entry_by_name = (InjectionPointEntry *)
-       hash_search(InjectionPointHash, name,
-                   HASH_FIND, &found);
+   /*
+    * First read the number of in-use slots.  More entries can be added or
+    * existing ones can be removed while we're reading them.  If the entry
+    * we're looking for is concurrently added or removed, we might or might
+    * not see it.  That's OK.
+    */
+   max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+   if (max_inuse == 0)
+   {
+       if (InjectionPointCache)
+       {
+           hash_destroy(InjectionPointCache);
+           InjectionPointCache = NULL;
+       }
+       return NULL;
+   }
 
    /*
-    * If not found, do nothing and remove it from the local cache if it
-    * existed there.
+    * If we have this entry in the local cache already, check if the cached
+    * entry is still valid.
     */
-   if (!found)
+   cached = injection_point_cache_get(name);
+   if (cached)
    {
+       int         idx = cached->slot_idx;
+       InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+
+       if (pg_atomic_read_u64(&entry->generation) == cached->generation)
+       {
+           /* still good */
+           return cached;
+       }
        injection_point_cache_remove(name);
-       LWLockRelease(InjectionPointLock);
-       return;
+       cached = NULL;
    }
 
-   /* Check first the local cache, and leave if this entry exists. */
-   if (injection_point_cache_get(name) != NULL)
+   /*
+    * Search the shared memory array.
+    *
+    * It's possible that the entry we're looking for is concurrently detached
+    * or attached.  Or detached *and* re-attached, to the same slot or a
+    * different slot.  Detach and re-attach is not an atomic operation, so
+    * it's OK for us to return the old value, NULL, or the new value in such
+    * cases.
+    */
+   namelen = strlen(name);
+   for (int idx = 0; idx < max_inuse; idx++)
    {
-       LWLockRelease(InjectionPointLock);
-       return;
+       InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+       uint64      generation;
+
+       /*
+        * Read the generation number so that we can detect concurrent
+        * modifications.  The read barrier ensures that the generation number
+        * is loaded before any of the other fields.
+        */
+       generation = pg_atomic_read_u64(&entry->generation);
+       if (generation % 2 == 0)
+           continue;           /* empty slot */
+       pg_read_barrier();
+
+       /* Is this the injection point we're looking for? */
+       if (memcmp(entry->name, name, namelen + 1) != 0)
+           continue;
+
+       /*
+        * The entry can change at any time, if the injection point is
+        * concurrently detached.  Copy it to local memory, and re-check the
+        * generation.  If the generation hasn't changed, we know our local
+        * copy is coherent.
+        */
+       memcpy(&local_copy, entry, sizeof(InjectionPointEntry));
+
+       pg_read_barrier();
+       if (pg_atomic_read_u64(&entry->generation) != generation)
+       {
+           /*
+            * The entry was concurrently detached.
+            *
+            * Continue the search, because if the generation number changed,
+            * we cannot trust the result of the name comparison we did above.
+            * It's theoretically possible that it falsely matched a mixed-up
+            * state of the old and new name, if the slot was recycled with a
+            * different name.
+            */
+           continue;
+       }
+
+       /* Success! Load it into the cache and return it */
+       return injection_point_cache_load(&local_copy, idx, generation);
    }
+   return NULL;
+}
+#endif
 
-   /* Nothing?  Then load it and leave */
-   injection_point_cache_load(entry_by_name);
-
-   LWLockRelease(InjectionPointLock);
+/*
+ * Load an injection point into the local cache.
+ *
+ * This is useful to be able to load an injection point before running it,
+ * especially if the injection point is called in a code path where memory
+ * allocations cannot happen, like critical sections.
+ */
+void
+InjectionPointLoad(const char *name)
+{
+#ifdef USE_INJECTION_POINTS
+   InjectionPointCacheRefresh(name);
 #else
    elog(ERROR, "Injection points are not supported by this build");
 #endif
@@ -349,50 +539,16 @@ InjectionPointLoad(const char *name)
 
 /*
  * Execute an injection point, if defined.
- *
- * Check first the shared hash table, and adapt the local cache depending
- * on that as it could be possible that an entry to run has been removed.
  */
 void
 InjectionPointRun(const char *name)
 {
 #ifdef USE_INJECTION_POINTS
-   InjectionPointEntry *entry_by_name;
-   bool        found;
    InjectionPointCacheEntry *cache_entry;
 
-   LWLockAcquire(InjectionPointLock, LW_SHARED);
-   entry_by_name = (InjectionPointEntry *)
-       hash_search(InjectionPointHash, name,
-                   HASH_FIND, &found);
-
-   /*
-    * If not found, do nothing and remove it from the local cache if it
-    * existed there.
-    */
-   if (!found)
-   {
-       injection_point_cache_remove(name);
-       LWLockRelease(InjectionPointLock);
-       return;
-   }
-
-   /*
-    * Check if the callback exists in the local cache, to avoid unnecessary
-    * external loads.
-    */
-   if (injection_point_cache_get(name) == NULL)
-   {
-       /* not found in local cache, so load and register it */
-       injection_point_cache_load(entry_by_name);
-   }
-
-   /* Now loaded, so get it. */
-   cache_entry = injection_point_cache_get(name);
-
-   LWLockRelease(InjectionPointLock);
-
-   cache_entry->callback(name, cache_entry->private_data);
+   cache_entry = InjectionPointCacheRefresh(name);
+   if (cache_entry)
+       cache_entry->callback(name, cache_entry->private_data);
 #else
    elog(ERROR, "Injection points are not supported by this build");
 #endif
index 635e6d6e21545954a9332fa55af26b0b0ab81e3d..b4d7f9217cec458de320d2ba640f9bcbc0e17b51 100644 (file)
@@ -1239,6 +1239,7 @@ InjectionPointCallback
 InjectionPointCondition
 InjectionPointConditionType
 InjectionPointEntry
+InjectionPointsCtl
 InjectionPointSharedState
 InlineCodeBlock
 InsertStmt