Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Use HTAB for replication slot statistics.
authorAmit Kapila <akapila@postgresql.org>
Tue, 27 Apr 2021 03:39:11 +0000 (09:09 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 27 Apr 2021 03:39:11 +0000 (09:09 +0530)
Previously, we used to use the array of size max_replication_slots to
store stats for replication slots. But that had two problems in the cases
where a message for dropping a slot gets lost: 1) the stats for the new
slot are not recorded if the array is full and 2) writing beyond the end
of the array if the user reduces the max_replication_slots.

This commit uses HTAB for replication slot statistics, resolving both
problems. Now, pgstat_vacuum_stat() search for all the dead replication
slots in stats hashtable and tell the collector to remove them. To avoid
showing the stats for the already-dropped slots, pg_stat_replication_slots
view searches slot stats by the slot name taken from pg_replication_slots.

Also, we send a message for creating a slot at slot creation, initializing
the stats. This reduces the possibility that the stats are accumulated
into the old slot stats when a message for dropping a slot gets lost.

Reported-by: Andres Freund
Author: Sawada Masahiko, test case by Vignesh C
Reviewed-by: Amit Kapila, Vignesh C, Dilip Kumar
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de

12 files changed:
contrib/test_decoding/t/001_repl_stats.pl
src/backend/catalog/system_views.sql
src/backend/postmaster/pgstat.c
src/backend/replication/logical/logical.c
src/backend/replication/slot.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/include/replication/slot.h
src/test/regress/expected/rules.out
src/tools/pgindent/typedefs.list

index 11b6cd9b9c70b2effb0908f8932f3519a8a458a6..3ab0e8072283063ef1715a069da932015829ec00 100644 (file)
@@ -2,9 +2,10 @@
 # drop replication slot and restart.
 use strict;
 use warnings;
+use File::Path qw(rmtree);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 1;
+use Test::More tests => 2;
 
 # Test set-up
 my $node = get_new_node('test');
@@ -12,9 +13,22 @@ $node->init(allows_streaming => 'logical');
 $node->append_conf('postgresql.conf', 'synchronous_commit = on');
 $node->start;
 
+# Check that replication slot stats are expected.
+sub test_slot_stats
+{
+   my ($node, $expected, $msg) = @_;
+
+   my $result = $node->safe_psql(
+       'postgres', qq[
+       SELECT slot_name, total_txns > 0 AS total_txn,
+              total_bytes > 0 AS total_bytes
+              FROM pg_stat_replication_slots
+              ORDER BY slot_name]);
+   is($result, $expected, $msg);
+}
+
 # Create table.
-$node->safe_psql('postgres',
-        "CREATE TABLE test_repl_stat(col1 int)");
+$node->safe_psql('postgres', "CREATE TABLE test_repl_stat(col1 int)");
 
 # Create replication slots.
 $node->safe_psql(
@@ -26,7 +40,8 @@ $node->safe_psql(
 ]);
 
 # Insert some data.
-$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+$node->safe_psql('postgres',
+   "INSERT INTO test_repl_stat values(generate_series(1, 5));");
 
 $node->safe_psql(
    'postgres', qq[
@@ -50,27 +65,51 @@ $node->poll_query_until(
 
 # Test to drop one of the replication slot and verify replication statistics data is
 # fine after restart.
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+$node->safe_psql('postgres',
+   "SELECT pg_drop_replication_slot('regression_slot4')");
 
 $node->stop;
 $node->start;
 
 # Verify statistics data present in pg_stat_replication_slots are sane after
 # restart.
-my $result = $node->safe_psql('postgres',
-   "SELECT slot_name, total_txns > 0 AS total_txn,
-   total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
-   ORDER BY slot_name"
-);
-is($result, qq(regression_slot1|t|t
+test_slot_stats(
+   $node,
+   qq(regression_slot1|t|t
 regression_slot2|t|t
-regression_slot3|t|t), 'check replication statistics are updated');
+regression_slot3|t|t),
+   'check replication statistics are updated');
+
+# Test to remove one of the replication slots and adjust
+# max_replication_slots accordingly to the number of slots. This leads
+# to a mismatch between the number of slots present in the stats file and the
+# number of stats present in the shared memory, simulating the scenario for
+# drop slot message lost by the statistics collector process. We verify
+# replication statistics data is fine after restart.
+
+$node->stop;
+my $datadir           = $node->data_dir;
+my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3";
+
+rmtree($slot3_replslotdir);
+
+$node->append_conf('postgresql.conf', 'max_replication_slots = 2');
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+test_slot_stats(
+   $node,
+   qq(regression_slot1|t|t
+regression_slot2|t|t),
+   'check replication statistics after removing the slot file');
 
 # cleanup
 $node->safe_psql('postgres', "DROP TABLE test_repl_stat");
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+$node->safe_psql('postgres',
+   "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres',
+   "SELECT pg_drop_replication_slot('regression_slot2')");
 
 # shutdown
 $node->stop;
index 70e578894f5a01bcf0e5124e5bbd4f3fd2875ac5..08f95c43cae7732663ffed2c77dab3ea44127204 100644 (file)
@@ -866,20 +866,6 @@ CREATE VIEW pg_stat_replication AS
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
-CREATE VIEW pg_stat_replication_slots AS
-    SELECT
-            s.slot_name,
-            s.spill_txns,
-            s.spill_count,
-            s.spill_bytes,
-            s.stream_txns,
-            s.stream_count,
-            s.stream_bytes,
-            s.total_txns,
-            s.total_bytes,
-            s.stats_reset
-    FROM pg_stat_get_replication_slots() AS s;
-
 CREATE VIEW pg_stat_slru AS
     SELECT
             s.name,
@@ -984,6 +970,22 @@ CREATE VIEW pg_replication_slots AS
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
+CREATE VIEW pg_stat_replication_slots AS
+    SELECT
+            s.slot_name,
+            s.spill_txns,
+            s.spill_count,
+            s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes,
+            s.total_txns,
+            s.total_bytes,
+            s.stats_reset
+    FROM pg_replication_slots as r,
+        LATERAL pg_stat_get_replication_slot(slot_name) as s
+    WHERE r.datoid IS NOT NULL; -- excluding physical slots
+
 CREATE VIEW pg_stat_database AS
     SELECT
             D.oid AS datid,
index 6e8dee97842bf5b48c12f7896451ede8505cbf1e..ba335fd342991d00472d8b1b51d9ae71a69616ee 100644 (file)
 #define PGSTAT_DB_HASH_SIZE        16
 #define PGSTAT_TAB_HASH_SIZE   512
 #define PGSTAT_FUNCTION_HASH_SIZE  512
+#define PGSTAT_REPLSLOT_HASH_SIZE  32
 
 
 /* ----------
@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static PgStat_ReplSlotStats *replSlotStats;
-static int nReplSlotStats;
+static HTAB *replSlotStatHash = NULL;
 static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
 
 /*
@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
 static bool pgstat_write_statsfile_needed(void);
 static bool pgstat_db_requested(Oid databaseid);
 
-static int pgstat_replslot_index(const char *name, bool create_it);
-static void pgstat_reset_replslot(int i, TimestampTz ts);
+static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
+static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
 
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
@@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void)
    /* Clean up */
    hash_destroy(htab);
 
+   /*
+    * Search for all the dead replication slots in stats hashtable and tell
+    * the stats collector to drop them.
+    */
+   if (replSlotStatHash)
+   {
+       PgStat_StatReplSlotEntry *slotentry;
+
+       hash_seq_init(&hstat, replSlotStatHash);
+       while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+       {
+           CHECK_FOR_INTERRUPTS();
+
+           if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
+               pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+       }
+   }
+
    /*
     * Lookup our own database entry; if not found, nothing more to do.
     */
@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
 
    if (name)
    {
-       ReplicationSlot *slot;
-
-       /*
-        * Check if the slot exists with the given name. It is possible that by
-        * the time this message is executed the slot is dropped but at least
-        * this check will ensure that the given name is for a valid slot.
-        */
-       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       slot = SearchNamedReplicationSlot(name);
-       LWLockRelease(ReplicationSlotControlLock);
-
-       if (!slot)
-           ereport(ERROR,
-                   (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                    errmsg("replication slot \"%s\" does not exist",
-                           name)));
-
-       /*
-        * Nothing to do for physical slots as we collect stats only for
-        * logical slots.
-        */
-       if (SlotIsPhysical(slot))
-           return;
-
        namestrcpy(&msg.m_slotname, name);
        msg.clearall = false;
    }
@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
+pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
 {
    PgStat_MsgReplSlot msg;
 
@@ -1822,6 +1816,7 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
     */
    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
    namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
+   msg.m_create = false;
    msg.m_drop = false;
    msg.m_spill_txns = repSlotStat->spill_txns;
    msg.m_spill_count = repSlotStat->spill_count;
@@ -1834,6 +1829,24 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
    pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
+/* ----------
+ * pgstat_report_replslot_create() -
+ *
+ * Tell the collector about creating the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_create(const char *slotname)
+{
+   PgStat_MsgReplSlot msg;
+
+   pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+   namestrcpy(&msg.m_slotname, slotname);
+   msg.m_create = true;
+   msg.m_drop = false;
+   pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
 /* ----------
  * pgstat_report_replslot_drop() -
  *
@@ -1847,6 +1860,7 @@ pgstat_report_replslot_drop(const char *slotname)
 
    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
    namestrcpy(&msg.m_slotname, slotname);
+   msg.m_create = false;
    msg.m_drop = true;
    pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
@@ -2872,17 +2886,15 @@ pgstat_fetch_slru(void)
  * pgstat_fetch_replslot() -
  *
  * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the replication slot statistics struct and sets the
- * number of entries in nslots_p.
+ * a pointer to the replication slot statistics struct.
  * ---------
  */
-PgStat_ReplSlotStats *
-pgstat_fetch_replslot(int *nslots_p)
+PgStat_StatReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
 {
    backend_read_statsfile();
 
-   *nslots_p = nReplSlotStats;
-   return replSlotStats;
+   return pgstat_get_replslot_entry(slotname, false);
 }
 
 /*
@@ -3654,7 +3666,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
    const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
    const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
    int         rc;
-   int         i;
 
    elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -3744,11 +3755,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
    /*
     * Write replication slot stats struct
     */
-   for (i = 0; i < nReplSlotStats; i++)
+   if (replSlotStatHash)
    {
-       fputc('R', fpout);
-       rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
-       (void) rc;              /* we'll check for error with ferror */
+       PgStat_StatReplSlotEntry *slotent;
+
+       hash_seq_init(&hstat, replSlotStatHash);
+       while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+       {
+           fputc('R', fpout);
+           rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
+           (void) rc;              /* we'll check for error with ferror */
+       }
    }
 
    /*
@@ -3975,12 +3992,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
    dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-   /* Allocate the space for replication slot statistics */
-   replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
-                                          max_replication_slots
-                                          * sizeof(PgStat_ReplSlotStats));
-   nReplSlotStats = 0;
-
    /*
     * Clear out global, archiver, WAL and SLRU statistics so they start from
     * zero in case we can't load an existing statsfile.
@@ -4005,12 +4016,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
    for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
        slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
 
-   /*
-    * Set the same reset timestamp for all replication slots too.
-    */
-   for (i = 0; i < max_replication_slots; i++)
-       replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
-
    /*
     * Try to open the stats file. If it doesn't exist, the backends simply
     * return zero for anything and the collector simply starts from scratch
@@ -4197,21 +4202,43 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
                break;
 
                /*
-                * 'R'  A PgStat_ReplSlotStats struct describing a replication
-                * slot follows.
+                * 'R'  A PgStat_StatReplSlotEntry struct describing a
+                * replication slot follows.
                 */
            case 'R':
-               if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
-                   != sizeof(PgStat_ReplSlotStats))
                {
-                   ereport(pgStatRunningInCollector ? LOG : WARNING,
-                           (errmsg("corrupted statistics file \"%s\"",
-                                   statfile)));
-                   memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-                   goto done;
+                   PgStat_StatReplSlotEntry slotbuf;
+                   PgStat_StatReplSlotEntry *slotent;
+
+                   if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+                       != sizeof(PgStat_StatReplSlotEntry))
+                   {
+                       ereport(pgStatRunningInCollector ? LOG : WARNING,
+                               (errmsg("corrupted statistics file \"%s\"",
+                                       statfile)));
+                       goto done;
+                   }
+
+                   /* Create hash table if we don't have it already. */
+                   if (replSlotStatHash == NULL)
+                   {
+                       HASHCTL     hash_ctl;
+
+                       hash_ctl.keysize = sizeof(NameData);
+                       hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
+                       hash_ctl.hcxt = pgStatLocalContext;
+                       replSlotStatHash = hash_create("Replication slots hash",
+                                                      PGSTAT_REPLSLOT_HASH_SIZE,
+                                                      &hash_ctl,
+                                                      HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+                   }
+
+                   slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
+                                                                      (void *) &slotbuf.slotname,
+                                                                      HASH_ENTER, NULL);
+                   memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
+                   break;
                }
-               nReplSlotStats++;
-               break;
 
            case 'E':
                goto done;
@@ -4424,7 +4451,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
    PgStat_ArchiverStats myArchiverStats;
    PgStat_WalStats myWalStats;
    PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
-   PgStat_ReplSlotStats myReplSlotStats;
+   PgStat_StatReplSlotEntry myReplSlotStats;
    PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
    FILE       *fpin;
    int32       format_id;
@@ -4553,12 +4580,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
                break;
 
                /*
-                * 'R'  A PgStat_ReplSlotStats struct describing a replication
-                * slot follows.
+                * 'R'  A PgStat_StatReplSlotEntry struct describing a
+                * replication slot follows.
                 */
            case 'R':
-               if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
-                   != sizeof(PgStat_ReplSlotStats))
+               if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+                   != sizeof(PgStat_StatReplSlotEntry))
                {
                    ereport(pgStatRunningInCollector ? LOG : WARNING,
                            (errmsg("corrupted statistics file \"%s\"",
@@ -4764,8 +4791,7 @@ pgstat_clear_snapshot(void)
    /* Reset variables */
    pgStatLocalContext = NULL;
    pgStatDBHash = NULL;
-   replSlotStats = NULL;
-   nReplSlotStats = 0;
+   replSlotStatHash = NULL;
 
    /*
     * Historically the backend_status.c facilities lived in this file, and
@@ -5189,20 +5215,26 @@ static void
 pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
                                 int len)
 {
-   int         i;
-   int         idx = -1;
+   PgStat_StatReplSlotEntry *slotent;
    TimestampTz ts;
 
+   /* Return if we don't have replication slot statistics */
+   if (replSlotStatHash == NULL)
+       return;
+
    ts = GetCurrentTimestamp();
    if (msg->clearall)
    {
-       for (i = 0; i < nReplSlotStats; i++)
-           pgstat_reset_replslot(i, ts);
+       HASH_SEQ_STATUS sstat;
+
+       hash_seq_init(&sstat, replSlotStatHash);
+       while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
+           pgstat_reset_replslot(slotent, ts);
    }
    else
    {
-       /* Get the index of replication slot statistics to reset */
-       idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
+       /* Get the slot statistics to reset */
+       slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
 
        /*
         * Nothing to do if the given slot entry is not found.  This could
@@ -5210,11 +5242,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
         * corresponding statistics entry is also removed before receiving the
         * reset message.
         */
-       if (idx < 0)
+       if (!slotent)
            return;
 
        /* Reset the stats for the requested replication slot */
-       pgstat_reset_replslot(idx, ts);
+       pgstat_reset_replslot(slotent, ts);
    }
 }
 
@@ -5532,46 +5564,45 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
 static void
 pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 {
-   int         idx;
-
-   /*
-    * Get the index of replication slot statistics.  On dropping, we don't
-    * create the new statistics.
-    */
-   idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
-
-   /*
-    * The slot entry is not found or there is no space to accommodate the new
-    * entry.  This could happen when the message for the creation of a slot
-    * reached before the drop message even though the actual operations
-    * happen in reverse order.  In such a case, the next update of the
-    * statistics for the same slot will create the required entry.
-    */
-   if (idx < 0)
-       return;
-
-   /* it must be a valid replication slot index */
-   Assert(idx < nReplSlotStats);
-
    if (msg->m_drop)
    {
+       Assert(!msg->m_create);
+
        /* Remove the replication slot statistics with the given name */
-       if (idx < nReplSlotStats - 1)
-           memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
-                  sizeof(PgStat_ReplSlotStats));
-       nReplSlotStats--;
+       if (replSlotStatHash != NULL)
+           (void) hash_search(replSlotStatHash,
+                              (void *) &(msg->m_slotname),
+                              HASH_REMOVE,
+                              NULL);
    }
    else
    {
-       /* Update the replication slot statistics */
-       replSlotStats[idx].spill_txns += msg->m_spill_txns;
-       replSlotStats[idx].spill_count += msg->m_spill_count;
-       replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
-       replSlotStats[idx].stream_txns += msg->m_stream_txns;
-       replSlotStats[idx].stream_count += msg->m_stream_count;
-       replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
-       replSlotStats[idx].total_txns += msg->m_total_txns;
-       replSlotStats[idx].total_bytes += msg->m_total_bytes;
+       PgStat_StatReplSlotEntry *slotent;
+
+       slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
+       Assert(slotent);
+
+       if (msg->m_create)
+       {
+           /*
+            * If the message for dropping the slot with the same name gets
+            * lost, slotent has stats for the old slot. So we initialize all
+            * counters at slot creation.
+            */
+           pgstat_reset_replslot(slotent, 0);
+       }
+       else
+       {
+           /* Update the replication slot statistics */
+           slotent->spill_txns += msg->m_spill_txns;
+           slotent->spill_count += msg->m_spill_count;
+           slotent->spill_bytes += msg->m_spill_bytes;
+           slotent->stream_txns += msg->m_stream_txns;
+           slotent->stream_count += msg->m_stream_count;
+           slotent->stream_bytes += msg->m_stream_bytes;
+           slotent->total_txns += msg->m_total_txns;
+           slotent->total_bytes += msg->m_total_bytes;
+       }
    }
 }
 
@@ -5749,59 +5780,80 @@ pgstat_db_requested(Oid databaseid)
 }
 
 /* ----------
- * pgstat_replslot_index
+ * pgstat_replslot_entry
  *
- * Return the index of entry of a replication slot with the given name, or
- * -1 if the slot is not found.
+ * Return the entry of replication slot stats with the given name. Return
+ * NULL if not found and the caller didn't request to create it.
  *
- * create_it tells whether to create the new slot entry if it is not found.
+ * create tells whether to create the new slot entry if it is not found.
  * ----------
  */
-static int
-pgstat_replslot_index(const char *name, bool create_it)
+static PgStat_StatReplSlotEntry *
+pgstat_get_replslot_entry(NameData name, bool create)
 {
-   int         i;
+   PgStat_StatReplSlotEntry *slotent;
+   bool        found;
 
-   Assert(nReplSlotStats <= max_replication_slots);
-   for (i = 0; i < nReplSlotStats; i++)
+   if (replSlotStatHash == NULL)
    {
-       if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
-           return i;           /* found */
+       HASHCTL     hash_ctl;
+
+       /*
+        * Quick return NULL if the hash table is empty and the caller didn't
+        * request to create the entry.
+        */
+       if (!create)
+           return NULL;
+
+       hash_ctl.keysize = sizeof(NameData);
+       hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
+       replSlotStatHash = hash_create("Replication slots hash",
+                                      PGSTAT_REPLSLOT_HASH_SIZE,
+                                      &hash_ctl,
+                                      HASH_ELEM | HASH_BLOBS);
    }
 
-   /*
-    * The slot is not found.  We don't want to register the new statistics if
-    * the list is already full or the caller didn't request.
-    */
-   if (i == max_replication_slots || !create_it)
-       return -1;
+   slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
+                                                      (void *) &name,
+                                                      create ? HASH_ENTER : HASH_FIND,
+                                                      &found);
 
-   /* Register new slot */
-   memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-   namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
+   if (!slotent)
+   {
+       /* not found */
+       Assert(!create && !found);
+       return NULL;
+   }
+
+   /* initialize the entry */
+   if (create && !found)
+   {
+       namestrcpy(&(slotent->slotname), NameStr(name));
+       pgstat_reset_replslot(slotent, 0);
+   }
 
-   return nReplSlotStats++;
+   return slotent;
 }
 
 /* ----------
  * pgstat_reset_replslot
  *
- * Reset the replication slot stats at index 'i'.
+ * Reset the given replication slot stats.
  * ----------
  */
 static void
-pgstat_reset_replslot(int i, TimestampTz ts)
+pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
 {
    /* reset only counters. Don't clear slot name */
-   replSlotStats[i].spill_txns = 0;
-   replSlotStats[i].spill_count = 0;
-   replSlotStats[i].spill_bytes = 0;
-   replSlotStats[i].stream_txns = 0;
-   replSlotStats[i].stream_count = 0;
-   replSlotStats[i].stream_bytes = 0;
-   replSlotStats[i].total_txns = 0;
-   replSlotStats[i].total_bytes = 0;
-   replSlotStats[i].stat_reset_timestamp = ts;
+   slotent->spill_txns = 0;
+   slotent->spill_count = 0;
+   slotent->spill_bytes = 0;
+   slotent->stream_txns = 0;
+   slotent->stream_count = 0;
+   slotent->stream_bytes = 0;
+   slotent->total_txns = 0;
+   slotent->total_bytes = 0;
+   slotent->stat_reset_timestamp = ts;
 }
 
 /*
index 35b0c67641291c7a012e7704079931072f57784d..00543ede45a00b6ca2513da9994d284ea795f520 100644 (file)
@@ -1773,7 +1773,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
    ReorderBuffer *rb = ctx->reorder;
-   PgStat_ReplSlotStats repSlotStat;
+   PgStat_StatReplSlotEntry repSlotStat;
 
    /* Nothing to do if we don't have any replication stats to be sent. */
    if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
index f61b163f78d0e1619914654da4a4217934a1cef3..cf261e200e4bcb908a91dd84fdfeedf1c803396c 100644 (file)
@@ -328,12 +328,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
     * ReplicationSlotAllocationLock.
     */
    if (SlotIsLogical(slot))
-   {
-       PgStat_ReplSlotStats repSlotStat;
-       MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
-       namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
-       pgstat_report_replslot(&repSlotStat);
-   }
+       pgstat_report_replslot_create(NameStr(slot->data.name));
 
    /*
     * Now that the slot has been marked as in_use and active, it's safe to
@@ -349,17 +344,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Search for the named replication slot.
  *
  * Return the replication slot if found, otherwise NULL.
- *
- * The caller must hold ReplicationSlotControlLock in shared mode.
  */
 ReplicationSlot *
-SearchNamedReplicationSlot(const char *name)
+SearchNamedReplicationSlot(const char *name, bool need_lock)
 {
    int         i;
-   ReplicationSlot *slot = NULL;
+   ReplicationSlot *slot = NULL;
 
-   Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
-                               LW_SHARED));
+   if (need_lock)
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
    for (i = 0; i < max_replication_slots; i++)
    {
@@ -372,6 +365,9 @@ SearchNamedReplicationSlot(const char *name)
        }
    }
 
+   if (need_lock)
+       LWLockRelease(ReplicationSlotControlLock);
+
    return slot;
 }
 
@@ -416,7 +412,7 @@ retry:
     * Search for the slot with the specified name if the slot to acquire is
     * not given. If the slot is not found, we either return -1 or error out.
     */
-   s = slot ? slot : SearchNamedReplicationSlot(name);
+   s = slot ? slot : SearchNamedReplicationSlot(name, false);
    if (s == NULL || !s->in_use)
    {
        LWLockRelease(ReplicationSlotControlLock);
@@ -713,6 +709,12 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
     * reduce that possibility. If the messages reached in reverse, we would
     * lose one statistics update message. But the next update message will
     * create the statistics for the replication slot.
+    *
+    * XXX In case, the messages for creation and drop slot of the same name
+    * get lost and create happens before (auto)vacuum cleans up the dead
+    * slot, the stats will be accumulated into the old slot. One can imagine
+    * having OIDs for each slot to avoid the accumulation of stats but that
+    * doesn't seem worth doing as in practice this won't happen frequently.
     */
    if (SlotIsLogical(slot))
        pgstat_report_replslot_drop(NameStr(slot->data.name));
index 87f02d572e65d3e515864561ce2450cc33e86bef..14056f53471d365f9065eb94a93f284214fad346 100644 (file)
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -2207,8 +2208,33 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
    char       *target = NULL;
 
    if (!PG_ARGISNULL(0))
+   {
+       ReplicationSlot *slot;
+
        target = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
+       /*
+        * Check if the slot exists with the given name. It is possible that
+        * by the time this message is executed the slot is dropped but at
+        * least this check will ensure that the given name is for a valid
+        * slot.
+        */
+       slot = SearchNamedReplicationSlot(target, true);
+
+       if (!slot)
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                    errmsg("replication slot \"%s\" does not exist",
+                           target)));
+
+       /*
+        * Nothing to do for physical slots as we collect stats only for
+        * logical slots.
+        */
+       if (SlotIsPhysical(slot))
+           PG_RETURN_VOID();
+   }
+
    pgstat_reset_replslot_counter(target);
 
    PG_RETURN_VOID();
@@ -2280,73 +2306,77 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
    PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
 
-/* Get the statistics for the replication slots */
+/*
+ * Get the statistics for the replication slot. If the slot statistics is not
+ * available, return all-zeroes stats.
+ */
 Datum
-pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
 #define PG_STAT_GET_REPLICATION_SLOT_COLS 10
-   ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+   text       *slotname_text = PG_GETARG_TEXT_P(0);
+   NameData    slotname;
    TupleDesc   tupdesc;
-   Tuplestorestate *tupstore;
-   MemoryContext per_query_ctx;
-   MemoryContext oldcontext;
-   PgStat_ReplSlotStats *slotstats;
-   int         nstats;
-   int         i;
+   Datum       values[10];
+   bool        nulls[10];
+   PgStat_StatReplSlotEntry *slotent;
+   PgStat_StatReplSlotEntry allzero;
 
-   /* check to see if caller supports us returning a tuplestore */
-   if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
-       ereport(ERROR,
-               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                errmsg("set-valued function called in context that cannot accept a set")));
-   if (!(rsinfo->allowedModes & SFRM_Materialize))
-       ereport(ERROR,
-               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                errmsg("materialize mode required, but it is not allowed in this context")));
-
-   /* Build a tuple descriptor for our result type */
-   if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-       elog(ERROR, "return type must be a row type");
-
-   per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
-   oldcontext = MemoryContextSwitchTo(per_query_ctx);
-
-   tupstore = tuplestore_begin_heap(true, false, work_mem);
-   rsinfo->returnMode = SFRM_Materialize;
-   rsinfo->setResult = tupstore;
-   rsinfo->setDesc = tupdesc;
+   /* Initialise values and NULL flags arrays */
+   MemSet(values, 0, sizeof(values));
+   MemSet(nulls, 0, sizeof(nulls));
 
-   MemoryContextSwitchTo(oldcontext);
+   /* Initialise attributes information in the tuple descriptor */
+   tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
+                      TEXTOID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+                      TIMESTAMPTZOID, -1, 0);
+   BlessTupleDesc(tupdesc);
 
-   slotstats = pgstat_fetch_replslot(&nstats);
-   for (i = 0; i < nstats; i++)
+   namestrcpy(&slotname, text_to_cstring(slotname_text));
+   slotent = pgstat_fetch_replslot(slotname);
+   if (!slotent)
    {
-       Datum       values[PG_STAT_GET_REPLICATION_SLOT_COLS];
-       bool        nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
-       PgStat_ReplSlotStats *s = &(slotstats[i]);
-
-       MemSet(values, 0, sizeof(values));
-       MemSet(nulls, 0, sizeof(nulls));
-
-       values[0] = CStringGetTextDatum(NameStr(s->slotname));
-       values[1] = Int64GetDatum(s->spill_txns);
-       values[2] = Int64GetDatum(s->spill_count);
-       values[3] = Int64GetDatum(s->spill_bytes);
-       values[4] = Int64GetDatum(s->stream_txns);
-       values[5] = Int64GetDatum(s->stream_count);
-       values[6] = Int64GetDatum(s->stream_bytes);
-       values[7] = Int64GetDatum(s->total_txns);
-       values[8] = Int64GetDatum(s->total_bytes);
-
-       if (s->stat_reset_timestamp == 0)
-           nulls[9] = true;
-       else
-           values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
-
-       tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       /*
+        * If the slot is not found, initialise its stats. This is possible if
+        * the create slot message is lost.
+        */
+       memset(&allzero, 0, sizeof(PgStat_StatReplSlotEntry));
+       slotent = &allzero;
    }
 
-   tuplestore_donestoring(tupstore);
+   values[0] = CStringGetTextDatum(NameStr(slotname));
+   values[1] = Int64GetDatum(slotent->spill_txns);
+   values[2] = Int64GetDatum(slotent->spill_count);
+   values[3] = Int64GetDatum(slotent->spill_bytes);
+   values[4] = Int64GetDatum(slotent->stream_txns);
+   values[5] = Int64GetDatum(slotent->stream_count);
+   values[6] = Int64GetDatum(slotent->stream_bytes);
+   values[7] = Int64GetDatum(slotent->total_txns);
+   values[8] = Int64GetDatum(slotent->total_bytes);
 
-   return (Datum) 0;
+   if (slotent->stat_reset_timestamp == 0)
+       nulls[9] = true;
+   else
+       values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+
+   /* Returns the record as Datum */
+   PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
index ba1a0d03333f86f3fd37c4c275a0372cff6ddd3c..22dcd0a270c6ffc1e21edbd9c684545f0b75cf21 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202104231
+#define CATALOG_VERSION_NO 202104271
 
 #endif
index db1abc149c6f2db4d4eb3e6b1aaadce8be7e2a19..91f0ea2212c7de20727bc528798a4d0c3535e274 100644 (file)
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
   prosrc => 'pg_stat_get_wal_receiver' },
-{ oid => '8595', descr => 'statistics: information about replication slots',
-  proname => 'pg_stat_get_replication_slots', prorows => '10',
+{ oid => '8595', descr => 'statistics: information about replication slot',
+  proname => 'pg_stat_get_replication_slot', prorows => '1',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
-  prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
-  prosrc => 'pg_stat_get_replication_slots' },
+  prorettype => 'record', proargtypes => 'text',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+  prosrc => 'pg_stat_get_replication_slot' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
index 5c5920b0b5f0dd6471bf4f4ed4524a5c1097f0c9..1ce363e7d181d5b725aa431f5cdff733197d18e9 100644 (file)
@@ -541,6 +541,7 @@ typedef struct PgStat_MsgReplSlot
 {
    PgStat_MsgHdr m_hdr;
    NameData    m_slotname;
+   bool        m_create;
    bool        m_drop;
    PgStat_Counter m_spill_txns;
    PgStat_Counter m_spill_count;
@@ -917,7 +918,7 @@ typedef struct PgStat_SLRUStats
 /*
  * Replication slot statistics kept in the stats collector
  */
-typedef struct PgStat_ReplSlotStats
+typedef struct PgStat_StatReplSlotEntry
 {
    NameData    slotname;
    PgStat_Counter spill_txns;
@@ -929,7 +930,7 @@ typedef struct PgStat_ReplSlotStats
    PgStat_Counter total_txns;
    PgStat_Counter total_bytes;
    TimestampTz stat_reset_timestamp;
-} PgStat_ReplSlotStats;
+} PgStat_StatReplSlotEntry;
 
 
 /*
@@ -1031,7 +1032,8 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
+extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
+extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
@@ -1129,7 +1131,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
-extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
+extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
 extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
index 1ad5e6c50dffd698cbf3160f43a5f8b587c8a2b8..357068403a11e9997089c62d1afdfda04860127f 100644 (file)
@@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
-extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
index 6dff5439e00358127369e07a1b3e21178c297a64..572bc2057cc2749ad521055ce1746fb4ffff741b 100644 (file)
@@ -2071,7 +2071,9 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.total_txns,
     s.total_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
+   FROM pg_replication_slots r,
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+  WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
index c7aff677d4bb3e854237083501617108bb36720c..878b67a276d43bf5830e3cffc4b9a7550890de47 100644 (file)
@@ -1870,12 +1870,12 @@ PgStat_MsgTabstat
 PgStat_MsgTempFile
 PgStat_MsgVacuum
 PgStat_MsgWal
-PgStat_ReplSlotStats
 PgStat_SLRUStats
 PgStat_Shared_Reset_Target
 PgStat_Single_Reset_Type
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
+PgStat_StatReplSlotEntry
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts