Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Improve and cleanup ProcArrayAdd(), ProcArrayRemove().
authorAndres Freund <andres@anarazel.de>
Sat, 12 Jun 2021 04:22:21 +0000 (21:22 -0700)
committerAndres Freund <andres@anarazel.de>
Sat, 12 Jun 2021 04:33:07 +0000 (21:33 -0700)
941697c3c1a changed ProcArrayAdd()/Remove() substantially. As reported by
zhanyi, I missed that due to the introduction of the PGPROC->pgxactoff
ProcArrayRemove() does not need to search for the position in
procArray->pgprocnos anymore - that's pgxactoff. Remove the search loop.

The removal of the search loop reduces assertion coverage a bit - but we can
easily do better than before by adding assertions to other loops over
pgprocnos elements.

Also do a bit of cleanup, mostly by reducing line lengths by introducing local
helper variables and adding newlines.

Author: zhanyi <w@hidva.com>
Author: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/tencent_5624AA3B116B3D1C31CA9744@qq.com

src/backend/storage/ipc/procarray.c

index 42a89fc5dc9ff255c7a114a395d819390add9b3e..085bd1e40778ed4a56b812e3838e3ddf193130dd 100644 (file)
@@ -446,6 +446,7 @@ ProcArrayAdd(PGPROC *proc)
 {
    ProcArrayStruct *arrayP = procArray;
    int         index;
+   int         movecount;
 
    /* See ProcGlobal comment explaining why both locks are held */
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -474,33 +475,48 @@ ProcArrayAdd(PGPROC *proc)
     */
    for (index = 0; index < arrayP->numProcs; index++)
    {
-       /*
-        * If we are the first PGPROC or if we have found our right position
-        * in the array, break
-        */
-       if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
+       int         procno PG_USED_FOR_ASSERTS_ONLY = arrayP->pgprocnos[index];
+
+       Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
+       Assert(allProcs[procno].pgxactoff == index);
+
+       /* If we have found our right position in the array, break */
+       if (arrayP->pgprocnos[index] > proc->pgprocno)
            break;
    }
 
-   memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
-           (arrayP->numProcs - index) * sizeof(*arrayP->pgprocnos));
-   memmove(&ProcGlobal->xids[index + 1], &ProcGlobal->xids[index],
-           (arrayP->numProcs - index) * sizeof(*ProcGlobal->xids));
-   memmove(&ProcGlobal->subxidStates[index + 1], &ProcGlobal->subxidStates[index],
-           (arrayP->numProcs - index) * sizeof(*ProcGlobal->subxidStates));
-   memmove(&ProcGlobal->statusFlags[index + 1], &ProcGlobal->statusFlags[index],
-           (arrayP->numProcs - index) * sizeof(*ProcGlobal->statusFlags));
+   movecount = arrayP->numProcs - index;
+   memmove(&arrayP->pgprocnos[index + 1],
+           &arrayP->pgprocnos[index],
+           movecount * sizeof(*arrayP->pgprocnos));
+   memmove(&ProcGlobal->xids[index + 1],
+           &ProcGlobal->xids[index],
+           movecount * sizeof(*ProcGlobal->xids));
+   memmove(&ProcGlobal->subxidStates[index + 1],
+           &ProcGlobal->subxidStates[index],
+           movecount * sizeof(*ProcGlobal->subxidStates));
+   memmove(&ProcGlobal->statusFlags[index + 1],
+           &ProcGlobal->statusFlags[index],
+           movecount * sizeof(*ProcGlobal->statusFlags));
 
    arrayP->pgprocnos[index] = proc->pgprocno;
+   proc->pgxactoff = index;
    ProcGlobal->xids[index] = proc->xid;
    ProcGlobal->subxidStates[index] = proc->subxidStatus;
    ProcGlobal->statusFlags[index] = proc->statusFlags;
 
    arrayP->numProcs++;
 
+   /* adjust pgxactoff for all following PGPROCs */
+   index++;
    for (; index < arrayP->numProcs; index++)
    {
-       allProcs[arrayP->pgprocnos[index]].pgxactoff = index;
+       int         procno = arrayP->pgprocnos[index];
+
+       Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
+       Assert(allProcs[procno].pgxactoff == index - 1);
+
+       allProcs[procno].pgxactoff = index;
    }
 
    /*
@@ -525,7 +541,8 @@ void
 ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 {
    ProcArrayStruct *arrayP = procArray;
-   int         index;
+   int         myoff;
+   int         movecount;
 
 #ifdef XIDCACHE_DEBUG
    /* dump stats at backend shutdown, but not prepared-xact end */
@@ -537,11 +554,14 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
 
-   Assert(ProcGlobal->allProcs[arrayP->pgprocnos[proc->pgxactoff]].pgxactoff == proc->pgxactoff);
+   myoff = proc->pgxactoff;
+
+   Assert(myoff >= 0 && myoff < arrayP->numProcs);
+   Assert(ProcGlobal->allProcs[arrayP->pgprocnos[myoff]].pgxactoff == myoff);
 
    if (TransactionIdIsValid(latestXid))
    {
-       Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
+       Assert(TransactionIdIsValid(ProcGlobal->xids[myoff]));
 
        /* Advance global latestCompletedXid while holding the lock */
        MaintainLatestCompletedXid(latestXid);
@@ -549,57 +569,60 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
        /* Same with xactCompletionCount  */
        ShmemVariableCache->xactCompletionCount++;
 
-       ProcGlobal->xids[proc->pgxactoff] = 0;
-       ProcGlobal->subxidStates[proc->pgxactoff].overflowed = false;
-       ProcGlobal->subxidStates[proc->pgxactoff].count = 0;
+       ProcGlobal->xids[myoff] = InvalidTransactionId;
+       ProcGlobal->subxidStates[myoff].overflowed = false;
+       ProcGlobal->subxidStates[myoff].count = 0;
    }
    else
    {
        /* Shouldn't be trying to remove a live transaction here */
-       Assert(!TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
+       Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
    }
 
-   Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff] == 0));
-   Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].count == 0));
-   Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].overflowed == false));
-   ProcGlobal->statusFlags[proc->pgxactoff] = 0;
+   Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
+   Assert(ProcGlobal->subxidStates[myoff].count == 0);
+   Assert(ProcGlobal->subxidStates[myoff].overflowed == false);
+
+   ProcGlobal->statusFlags[myoff] = 0;
+
+   /* Keep the PGPROC array sorted. See notes above */
+   movecount = arrayP->numProcs - myoff - 1;
+   memmove(&arrayP->pgprocnos[myoff],
+           &arrayP->pgprocnos[myoff + 1],
+           movecount * sizeof(*arrayP->pgprocnos));
+   memmove(&ProcGlobal->xids[myoff],
+           &ProcGlobal->xids[myoff + 1],
+           movecount * sizeof(*ProcGlobal->xids));
+   memmove(&ProcGlobal->subxidStates[myoff],
+           &ProcGlobal->subxidStates[myoff + 1],
+           movecount * sizeof(*ProcGlobal->subxidStates));
+   memmove(&ProcGlobal->statusFlags[myoff],
+           &ProcGlobal->statusFlags[myoff + 1],
+           movecount * sizeof(*ProcGlobal->statusFlags));
+
+   arrayP->pgprocnos[arrayP->numProcs - 1] = -1;   /* for debugging */
+   arrayP->numProcs--;
 
-   for (index = 0; index < arrayP->numProcs; index++)
+   /*
+    * Adjust pgxactoff of following procs for removed PGPROC (note that
+    * numProcs already has been decremented).
+    */
+   for (int index = myoff; index < arrayP->numProcs; index++)
    {
-       if (arrayP->pgprocnos[index] == proc->pgprocno)
-       {
-           /* Keep the PGPROC array sorted. See notes above */
-           memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
-                   (arrayP->numProcs - index - 1) * sizeof(*arrayP->pgprocnos));
-           memmove(&ProcGlobal->xids[index], &ProcGlobal->xids[index + 1],
-                   (arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->xids));
-           memmove(&ProcGlobal->subxidStates[index], &ProcGlobal->subxidStates[index + 1],
-                   (arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->subxidStates));
-           memmove(&ProcGlobal->statusFlags[index], &ProcGlobal->statusFlags[index + 1],
-                   (arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->statusFlags));
-
-           arrayP->pgprocnos[arrayP->numProcs - 1] = -1;   /* for debugging */
-           arrayP->numProcs--;
-
-           /* adjust for removed PGPROC */
-           for (; index < arrayP->numProcs; index++)
-               allProcs[arrayP->pgprocnos[index]].pgxactoff--;
+       int         procno = arrayP->pgprocnos[index];
 
-           /*
-            * Release in reversed acquisition order, to reduce frequency of
-            * having to wait for XidGenLock while holding ProcArrayLock.
-            */
-           LWLockRelease(XidGenLock);
-           LWLockRelease(ProcArrayLock);
-           return;
-       }
+       Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
+       Assert(allProcs[procno].pgxactoff - 1 == index);
+
+       allProcs[procno].pgxactoff = index;
    }
 
-   /* Oops */
+   /*
+    * Release in reversed acquisition order, to reduce frequency of having to
+    * wait for XidGenLock while holding ProcArrayLock.
+    */
    LWLockRelease(XidGenLock);
    LWLockRelease(ProcArrayLock);
-
-   elog(LOG, "failed to find proc %p in ProcArray", proc);
 }