@@ -134,14 +134,17 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
  * To avoid complexity in the callers, either buffer1 or buffer2 may be
  * InvalidBuffer if only one buffer is involved.  For the same reason, block2
  * may be smaller than block1.
+ *
+ * Returns whether buffer locks were temporarily released.
  */
-static void
+static bool
 GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
 					 BlockNumber block1, BlockNumber block2,
 					 Buffer *vmbuffer1, Buffer *vmbuffer2)
 {
 	bool		need_to_pin_buffer1;
 	bool		need_to_pin_buffer2;
+	bool		released_locks = false;
 
 	/*
 	 * Swap buffers around to handle case of a single block/buffer, and to
@@ -175,9 +178,10 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
 			&& PageIsAllVisible(BufferGetPage(buffer2))
 			&& !visibilitymap_pin_ok(block2, *vmbuffer2);
 		if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
-			return;
+			break;
 
 		/* We must unlock both buffers before doing any I/O. */
+		released_locks = true;
 		LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
 		if (buffer2 != InvalidBuffer && buffer2 != buffer1)
 			LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
@@ -203,6 +207,8 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
 			|| (need_to_pin_buffer1 && need_to_pin_buffer2))
 			break;
 	}
+
+	return released_locks;
 }
 
 /*
@@ -365,6 +371,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	BlockNumber targetBlock,
 				otherBlock;
 	bool		needLock;
+	bool		unlockedTargetBuffer;
+	bool		recheckVmPins;
 
 	len = MAXALIGN(len);		/* be conservative */
 
@@ -645,6 +653,9 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	if (needLock)
 		UnlockRelationForExtension(relation, ExclusiveLock);
 
+	unlockedTargetBuffer = false;
+	targetBlock = BufferGetBlockNumber(buffer);
+
 	/*
 	 * We need to initialize the empty new page.  Double-check that it really
 	 * is empty (this should never happen, but if it does we don't want to
@@ -654,75 +665,108 @@ RelationGetBufferForTuple(Relation relation, Size len,
654
665
655
666
if (!PageIsNew (page ))
656
667
elog (ERROR , "page %u of relation \"%s\" should be empty but is not" ,
657
- BufferGetBlockNumber ( buffer ) ,
668
+ targetBlock ,
658
669
RelationGetRelationName (relation ));
659
670
660
671
PageInit (page , BufferGetPageSize (buffer ), 0 );
661
672
MarkBufferDirty (buffer );
662
673
663
674
/*
664
- * The page is empty, pin vmbuffer to set all_frozen bit.
675
+ * The page is empty, pin vmbuffer to set all_frozen bit. We don't want to
676
+ * do IO while the buffer is locked, so we unlock the page first if IO is
677
+ * needed (necessitating checks below).
665
678
*/
666
679
if (options & HEAP_INSERT_FROZEN )
667
680
{
668
- Assert (PageGetMaxOffsetNumber (BufferGetPage (buffer )) == 0 );
669
- visibilitymap_pin (relation , BufferGetBlockNumber (buffer ), vmbuffer );
681
+ Assert (PageGetMaxOffsetNumber (page ) == 0 );
682
+
683
+ if (!visibilitymap_pin_ok (targetBlock , * vmbuffer ))
684
+ {
685
+ unlockedTargetBuffer = true;
686
+ LockBuffer (buffer , BUFFER_LOCK_UNLOCK );
687
+ visibilitymap_pin (relation , targetBlock , vmbuffer );
688
+ }
670
689
}
671
690
672
691
/*
673
- * Lock the other buffer. It's guaranteed to be of a lower page number
674
- * than the new page. To conform with the deadlock prevent rules, we ought
675
- * to lock otherBuffer first, but that would give other backends a chance
676
- * to put tuples on our page. To reduce the likelihood of that, attempt to
677
- * lock the other buffer conditionally, that's very likely to work.
678
- * Otherwise we need to lock buffers in the correct order, and retry if
679
- * the space has been used in the mean time.
692
+ * Reacquire locks if necessary.
680
693
*
681
- * Alternatively, we could acquire the lock on otherBuffer before
682
- * extending the relation, but that'd require holding the lock while
683
- * performing IO, which seems worse than an unlikely retry.
694
+ * If the target buffer was unlocked above, or is unlocked while
695
+ * reacquiring the lock on otherBuffer below, it's unlikely, but possible,
696
+ * that another backend used space on this page. We check for that below,
697
+ * and retry if necessary.
684
698
*/
685
- if (otherBuffer != InvalidBuffer )
699
+ recheckVmPins = false;
700
+ if (unlockedTargetBuffer )
701
+ {
702
+ /* released lock on target buffer above */
703
+ if (otherBuffer != InvalidBuffer )
704
+ LockBuffer (otherBuffer , BUFFER_LOCK_EXCLUSIVE );
705
+ LockBuffer (buffer , BUFFER_LOCK_EXCLUSIVE );
706
+ recheckVmPins = true;
707
+ }
708
+ else if (otherBuffer != InvalidBuffer )
686
709
{
710
+ /*
711
+ * We did not release the target buffer, and otherBuffer is valid,
712
+ * need to lock the other buffer. It's guaranteed to be of a lower
713
+ * page number than the new page. To conform with the deadlock
714
+ * prevent rules, we ought to lock otherBuffer first, but that would
715
+ * give other backends a chance to put tuples on our page. To reduce
716
+ * the likelihood of that, attempt to lock the other buffer
717
+ * conditionally, that's very likely to work.
718
+ *
719
+ * Alternatively, we could acquire the lock on otherBuffer before
720
+ * extending the relation, but that'd require holding the lock while
721
+ * performing IO, which seems worse than an unlikely retry.
722
+ */
687
723
Assert (otherBuffer != buffer );
688
- targetBlock = BufferGetBlockNumber (buffer );
689
724
Assert (targetBlock > otherBlock );
690
725
691
726
if (unlikely (!ConditionalLockBuffer (otherBuffer )))
692
727
{
728
+ unlockedTargetBuffer = true;
693
729
LockBuffer (buffer , BUFFER_LOCK_UNLOCK );
694
730
LockBuffer (otherBuffer , BUFFER_LOCK_EXCLUSIVE );
695
731
LockBuffer (buffer , BUFFER_LOCK_EXCLUSIVE );
696
732
}
733
+ recheckVmPins = true;
734
+ }
697
735
698
- /*
699
- * Because the buffers were unlocked for a while, it's possible,
700
- * although unlikely, that an all-visible flag became set or that
701
- * somebody used up the available space in the new page. We can use
702
- * GetVisibilityMapPins to deal with the first case. In the second
703
- * case, just retry from start.
704
- */
705
- GetVisibilityMapPins (relation , otherBuffer , buffer ,
706
- otherBlock , targetBlock , vmbuffer_other ,
707
- vmbuffer );
736
+ /*
737
+ * If one of the buffers was unlocked (always the case if otherBuffer is
738
+ * valid), it's possible, although unlikely, that an all-visible flag
739
+ * became set. We can use GetVisibilityMapPins to deal with that. It's
740
+ * possible that GetVisibilityMapPins() might need to temporarily release
741
+ * buffer locks, in which case we'll need to check if there's still enough
742
+ * space on the page below.
743
+ */
744
+ if (recheckVmPins )
745
+ {
746
+ if (GetVisibilityMapPins (relation , otherBuffer , buffer ,
747
+ otherBlock , targetBlock , vmbuffer_other ,
748
+ vmbuffer ))
749
+ unlockedTargetBuffer = true;
750
+ }
708
751
709
- /*
710
- * Note that we have to check the available space even if our
711
- * conditional lock succeeded, because GetVisibilityMapPins might've
712
- * transiently released lock on the target buffer to acquire a VM pin
713
- * for the otherBuffer.
714
- */
715
- if (len > PageGetHeapFreeSpace (page ))
752
+ /*
753
+ * If the target buffer was temporarily unlocked since the relation
754
+ * extension, it's possible, although unlikely, that all the space on the
755
+ * page was already used. If so, we just retry from the start. If we
756
+ * didn't unlock, something has gone wrong if there's not enough space -
757
+ * the test at the top should have prevented reaching this case.
758
+ */
759
+ pageFreeSpace = PageGetHeapFreeSpace (page );
760
+ if (len > pageFreeSpace )
761
+ {
762
+ if (unlockedTargetBuffer )
716
763
{
717
- LockBuffer (otherBuffer , BUFFER_LOCK_UNLOCK );
764
+ if (otherBuffer != InvalidBuffer )
765
+ LockBuffer (otherBuffer , BUFFER_LOCK_UNLOCK );
718
766
UnlockReleaseBuffer (buffer );
719
767
720
768
goto loop ;
721
769
}
722
- }
723
- else if (len > PageGetHeapFreeSpace (page ))
724
- {
725
- /* We should not get here given the test at the top */
726
770
elog (PANIC , "tuple is too big: size %zu" , len );
727
771
}
728
772
@@ -735,7 +779,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	 * current backend to make more insertions or not, which is probably a
 	 * good bet most of the time.  So for now, don't add it to FSM yet.
 	 */
-	RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
+	RelationSetTargetBlock(relation, targetBlock);
 
 	return buffer;
 }
0 commit comments