Buffer childbuf, GinStatsData *buildStats);
static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
bool freestack, GinStatsData *buildStats);
+static void ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack,
+ GinStatsData *buildStats, int access);
/*
* Lock buffer by needed method for search.
* encounter on the way.
*/
if (!searchMode && GinPageIsIncompleteSplit(page))
- ginFinishSplit(btree, stack, false, NULL);
+ ginFinishOldSplit(btree, stack, NULL, access);
/*
* ok, page is correctly locked, we should check to move right ..,
TestForOldSnapshot(snapshot, btree->index, page);
if (!searchMode && GinPageIsIncompleteSplit(page))
- ginFinishSplit(btree, stack, false, NULL);
+ ginFinishOldSplit(btree, stack, NULL, access);
}
if (GinPageIsLeaf(page)) /* we found, return locked page */
* Step right from current page.
*
* The next page is locked first, before releasing the current page. This is
- * crucial to protect from concurrent page deletion (see comment in
- * ginDeletePage).
+ * crucial to prevent concurrent VACUUM from deleting a page that we are about
+ * to step to. (The lock-coupling isn't strictly necessary when we are
+ * traversing the tree to find an insert location, because page deletion grabs
+ * a cleanup lock on the root to prevent any concurrent inserts. See Page
+ * deletion section in the README. But there's no harm in doing it always.)
*/
Buffer
ginStepRight(Buffer buffer, Relation index, int lockmode)
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
- ginFinishSplit(btree, ptr, false, NULL);
+ ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
}
leftmostBlkno = btree->getLeftMostChild(btree, page);
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
- ginFinishSplit(btree, ptr, false, NULL);
+ ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
}
}
bool done;
bool first = true;
- /*
- * freestack == false when we encounter an incompletely split page during
- * a scan, while freestack == true is used in the normal scenario that a
- * split is finished right after the initial insert.
- */
- if (!freestack)
- elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
- stack->blkno, RelationGetRelationName(btree->index));
-
/* this loop crawls up the stack until the insertion is complete */
do
{
* would fail.
*/
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
- ginFinishSplit(btree, parent, false, buildStats);
+ ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
/* move right if it's needed */
page = BufferGetPage(parent->buffer);
page = BufferGetPage(parent->buffer);
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
- ginFinishSplit(btree, parent, false, buildStats);
+ ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
}
/* insert the downlink */
freeGinBtreeStack(stack);
}
+/*
+ * An entry point to ginFinishSplit() that is used when we stumble upon an
+ * existing incompletely split page in the tree, as opposed to completing a
+ * split that we just made ourselves. The difference is that stack->buffer may
+ * be merely share-locked on entry, and will be upgraded to exclusive mode.
+ *
+ * 'access' tells how stack->buffer is locked on entry. Only GIN_SHARE is
+ * treated specially: the lock is dropped and re-taken as GIN_EXCLUSIVE, and
+ * the incomplete-split flag is then rechecked. For any other value the lock
+ * on stack->buffer is not touched here. 'buildStats' is handed to
+ * ginFinishSplit() unchanged, and ginFinishSplit() is called with
+ * freestack = false.
+ *
+ * If the recheck shows that the split has already been completed, we return
+ * without calling ginFinishSplit(); stack->buffer is then left locked in
+ * exclusive mode. The DEBUG1 message is emitted before the recheck, so it
+ * can appear even when this call ends up doing no work.
+ *
+ * Note: Upgrading the lock momentarily releases it. Doing that in a scan
+ * would not be OK, because a concurrent VACUUM might delete the page while
+ * we're not holding the lock. It's OK in an insert, though, because VACUUM
+ * has a different mechanism that prevents it from running concurrently with
+ * inserts. (Namely, it holds a cleanup lock on the root.)
+ */
+static void
+ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats, int access)
+{
+ elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
+ stack->blkno, RelationGetRelationName(btree->index));
+
+ if (access == GIN_SHARE) /* caller holds only a share lock */
+ {
+ LockBuffer(stack->buffer, GIN_UNLOCK); /* the lock is not held between these two calls */
+ LockBuffer(stack->buffer, GIN_EXCLUSIVE);
+
+ if (!GinPageIsIncompleteSplit(BufferGetPage(stack->buffer))) /* recheck under the new lock */
+ {
+ /*
+ * Someone else already completed the split while we were not
+ * holding the lock. Nothing is left to do; the exclusive lock
+ * taken above is kept.
+ */
+ return;
+ }
+ }
+
+ ginFinishSplit(btree, stack, false, buildStats);
+}
+
+
/*
* Insert a value to tree described by stack.
*
/* If the leaf page was incompletely split, finish the split first */
if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
- ginFinishSplit(btree, stack, false, buildStats);
+ ginFinishOldSplit(btree, stack, buildStats, GIN_EXCLUSIVE);
done = ginPlaceToPage(btree, stack,
insertdata, InvalidBlockNumber,