Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Improve sift up/down code in binaryheap.c and logtape.c.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 14 Dec 2021 18:35:22 +0000 (13:35 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 14 Dec 2021 18:35:22 +0000 (13:35 -0500)
Borrow the logic that's long been used in tuplesort.c: instead
of physically swapping the data in two heap entries, keep the
value that's being sifted up or down in a local variable, and
just move the other values as necessary.  This makes the code
shorter as well as faster.  It's not clear that any current
callers are really time-critical enough to notice, but we
might as well code heap maintenance the same way everywhere.

Ma Liangzhu and Tom Lane

Discussion: https://postgr.es/m/17336-fc4e522d26a750fd@postgresql.org

src/backend/lib/binaryheap.c
src/backend/utils/sort/logtape.c

index d54e24529919483e6e7e13dd2ad1f6dd57523ab6..4198d9fcd9d13d4beb460650630e02a96c9212c8 100644 (file)
@@ -19,7 +19,6 @@
 
 static void sift_down(binaryheap *heap, int node_off);
 static void sift_up(binaryheap *heap, int node_off);
-static inline void swap_nodes(binaryheap *heap, int a, int b);
 
 /*
  * binaryheap_allocate
@@ -173,24 +172,28 @@ binaryheap_first(binaryheap *heap)
 Datum
 binaryheap_remove_first(binaryheap *heap)
 {
+   Datum       result;
+
    Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
+   /* extract the root node, which will be the result */
+   result = heap->bh_nodes[0];
+
+   /* easy if heap contains one element */
    if (heap->bh_size == 1)
    {
        heap->bh_size--;
-       return heap->bh_nodes[0];
+       return result;
    }
 
    /*
-    * Swap the root and last nodes, decrease the size of the heap (i.e.
-    * remove the former root node) and sift the new root node down to its
-    * correct position.
+    * Remove the last node, placing it in the vacated root entry, and sift
+    * the new root node down to its correct position.
     */
-   swap_nodes(heap, 0, heap->bh_size - 1);
-   heap->bh_size--;
+   heap->bh_nodes[0] = heap->bh_nodes[--heap->bh_size];
    sift_down(heap, 0);
 
-   return heap->bh_nodes[heap->bh_size];
+   return result;
 }
 
 /*
@@ -211,19 +214,6 @@ binaryheap_replace_first(binaryheap *heap, Datum d)
        sift_down(heap, 0);
 }
 
-/*
- * Swap the contents of two nodes.
- */
-static inline void
-swap_nodes(binaryheap *heap, int a, int b)
-{
-   Datum       swap;
-
-   swap = heap->bh_nodes[a];
-   heap->bh_nodes[a] = heap->bh_nodes[b];
-   heap->bh_nodes[b] = swap;
-}
-
 /*
  * Sift a node up to the highest position it can hold according to the
  * comparator.
@@ -231,29 +221,40 @@ swap_nodes(binaryheap *heap, int a, int b)
 static void
 sift_up(binaryheap *heap, int node_off)
 {
+   Datum       node_val = heap->bh_nodes[node_off];
+
+   /*
+    * Within the loop, the node_off'th array entry is a "hole" that
+    * notionally holds node_val, but we don't actually store node_val there
+    * till the end, saving some unnecessary data copying steps.
+    */
    while (node_off != 0)
    {
        int         cmp;
        int         parent_off;
+       Datum       parent_val;
 
        /*
         * If this node is smaller than its parent, the heap condition is
         * satisfied, and we're done.
         */
        parent_off = parent_offset(node_off);
-       cmp = heap->bh_compare(heap->bh_nodes[node_off],
-                              heap->bh_nodes[parent_off],
+       parent_val = heap->bh_nodes[parent_off];
+       cmp = heap->bh_compare(node_val,
+                              parent_val,
                               heap->bh_arg);
        if (cmp <= 0)
            break;
 
        /*
-        * Otherwise, swap the node and its parent and go on to check the
-        * node's new parent.
+        * Otherwise, swap the parent value with the hole, and go on to check
+        * the node's new parent.
         */
-       swap_nodes(heap, node_off, parent_off);
+       heap->bh_nodes[node_off] = parent_val;
        node_off = parent_off;
    }
+   /* Re-fill the hole */
+   heap->bh_nodes[node_off] = node_val;
 }
 
 /*
@@ -263,6 +264,13 @@ sift_up(binaryheap *heap, int node_off)
 static void
 sift_down(binaryheap *heap, int node_off)
 {
+   Datum       node_val = heap->bh_nodes[node_off];
+
+   /*
+    * Within the loop, the node_off'th array entry is a "hole" that
+    * notionally holds node_val, but we don't actually store node_val there
+    * till the end, saving some unnecessary data copying steps.
+    */
    while (true)
    {
        int         left_off = left_offset(node_off);
@@ -271,14 +279,14 @@ sift_down(binaryheap *heap, int node_off)
 
        /* Is the left child larger than the parent? */
        if (left_off < heap->bh_size &&
-           heap->bh_compare(heap->bh_nodes[node_off],
+           heap->bh_compare(node_val,
                             heap->bh_nodes[left_off],
                             heap->bh_arg) < 0)
            swap_off = left_off;
 
        /* Is the right child larger than the parent? */
        if (right_off < heap->bh_size &&
-           heap->bh_compare(heap->bh_nodes[node_off],
+           heap->bh_compare(node_val,
                             heap->bh_nodes[right_off],
                             heap->bh_arg) < 0)
        {
@@ -298,10 +306,12 @@ sift_down(binaryheap *heap, int node_off)
            break;
 
        /*
-        * Otherwise, swap the node with the child that violates the heap
+        * Otherwise, swap the hole with the child that violates the heap
         * property; then go on to check its children.
         */
-       swap_nodes(heap, swap_off, node_off);
+       heap->bh_nodes[node_off] = heap->bh_nodes[swap_off];
        node_off = swap_off;
    }
+   /* Re-fill the hole */
+   heap->bh_nodes[node_off] = node_val;
 }
index 722708237b1cb504bb67eafba2c7f0d760b3d147..9366cb7e252e51f985a7dc11cc2dcb2f192ffd6b 100644 (file)
@@ -340,16 +340,6 @@ ltsReadFillBuffer(LogicalTape *lt)
    return (lt->nbytes > 0);
 }
 
-static inline void
-swap_nodes(long *heap, unsigned long a, unsigned long b)
-{
-   long        swap;
-
-   swap = heap[a];
-   heap[a] = heap[b];
-   heap[b] = swap;
-}
-
 static inline unsigned long
 left_offset(unsigned long i)
 {
@@ -390,31 +380,33 @@ ltsGetFreeBlock(LogicalTapeSet *lts)
    long       *heap = lts->freeBlocks;
    long        blocknum;
    int         heapsize;
-   unsigned long pos;
+   long        holeval;
+   unsigned long holepos;
 
    /* freelist empty; allocate a new block */
    if (lts->nFreeBlocks == 0)
        return lts->nBlocksAllocated++;
 
+   /* easy if heap contains one element */
    if (lts->nFreeBlocks == 1)
    {
        lts->nFreeBlocks--;
        return lts->freeBlocks[0];
    }
 
-   /* take top of minheap */
+   /* remove top of minheap */
    blocknum = heap[0];
 
-   /* replace with end of minheap array */
-   heap[0] = heap[--lts->nFreeBlocks];
+   /* we'll replace it with end of minheap array */
+   holeval = heap[--lts->nFreeBlocks];
 
    /* sift down */
-   pos = 0;
+   holepos = 0;                /* holepos is where the "hole" is */
    heapsize = lts->nFreeBlocks;
    while (true)
    {
-       unsigned long left = left_offset(pos);
-       unsigned long right = right_offset(pos);
+       unsigned long left = left_offset(holepos);
+       unsigned long right = right_offset(holepos);
        unsigned long min_child;
 
        if (left < heapsize && right < heapsize)
@@ -426,12 +418,13 @@ ltsGetFreeBlock(LogicalTapeSet *lts)
        else
            break;
 
-       if (heap[min_child] >= heap[pos])
+       if (heap[min_child] >= holeval)
            break;
 
-       swap_nodes(heap, min_child, pos);
-       pos = min_child;
+       heap[holepos] = heap[min_child];
+       holepos = min_child;
    }
+   heap[holepos] = holeval;
 
    return blocknum;
 }
@@ -483,7 +476,7 @@ static void
 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 {
    long       *heap;
-   unsigned long pos;
+   unsigned long holepos;
 
    /*
     * Do nothing if we're no longer interested in remembering free space.
@@ -508,24 +501,23 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
                                            lts->freeBlocksLen * sizeof(long));
    }
 
+   /* create a "hole" at end of minheap array */
    heap = lts->freeBlocks;
-   pos = lts->nFreeBlocks;
-
-   /* place entry at end of minheap array */
-   heap[pos] = blocknum;
+   holepos = lts->nFreeBlocks;
    lts->nFreeBlocks++;
 
-   /* sift up */
-   while (pos != 0)
+   /* sift up to insert blocknum */
+   while (holepos != 0)
    {
-       unsigned long parent = parent_offset(pos);
+       unsigned long parent = parent_offset(holepos);
 
-       if (heap[parent] < heap[pos])
+       if (heap[parent] < blocknum)
            break;
 
-       swap_nodes(heap, parent, pos);
-       pos = parent;
+       heap[holepos] = heap[parent];
+       holepos = parent;
    }
+   heap[holepos] = blocknum;
 }
 
 /*