@@ -749,7 +749,7 @@ RumPageDeletePostingItem(Page page, OffsetNumber offset)
749
749
}
750
750
751
751
/*
752
- * checks space to install new value,
752
+ * checks space to install at least one new value,
753
753
* item pointer never deletes!
754
754
*/
755
755
static bool
@@ -762,8 +762,6 @@ dataIsEnoughSpace(RumBtree btree, Buffer buf, OffsetNumber off)
762
762
763
763
if (RumPageIsLeaf (page ))
764
764
{
765
- int n ,
766
- j ;
767
765
ItemPointerData iptr = {{0 , 0 }, 0 };
768
766
Size size = 0 ;
769
767
@@ -772,24 +770,8 @@ dataIsEnoughSpace(RumBtree btree, Buffer buf, OffsetNumber off)
772
770
* encoding from zero item pointer. Also use worst case assumption
773
771
* about alignment.
774
772
*/
775
- n = RumPageGetOpaque (page )-> maxoff ;
776
-
777
- if (RumPageRightMost (page ) && off > n )
778
- {
779
- for (j = btree -> curitem ; j < btree -> nitem ; j ++ )
780
- {
781
- size = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
782
- & btree -> items [j ],
783
- (j == btree -> curitem ) ? (& iptr ) : & btree -> items [j - 1 ].iptr ,
784
- btree -> rumstate , size );
785
- }
786
- }
787
- else
788
- {
789
- j = btree -> curitem ;
790
- size = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
791
- & btree -> items [j ], & iptr , btree -> rumstate , size );
792
- }
773
+ size = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
774
+ btree -> items + btree -> curitem , & iptr , btree -> rumstate , 0 );
793
775
size += MAXIMUM_ALIGNOF ;
794
776
795
777
if (RumPageGetOpaque (page )-> freespace >= size )
@@ -840,86 +822,127 @@ dataPlaceToPage(RumBtree btree, Page page, OffsetNumber off)
840
822
841
823
if (RumPageIsLeaf (page ))
842
824
{
843
- int i = 0 ,
844
- j ,
845
- max_j ;
846
825
Pointer ptr = RumDataPageGetData (page ),
847
- copy_ptr = NULL ;
826
+ copyPtr = NULL ;
848
827
ItemPointerData iptr = {{0 , 0 }, 0 };
849
- RumKey copy_item ;
828
+ RumKey copyItem ;
829
+ bool copyItemEmpty = true;
850
830
char pageCopy [BLCKSZ ];
851
831
int maxoff = RumPageGetOpaque (page )-> maxoff ;
852
- int freespace ;
832
+ int freespace ,
833
+ insertCount = 0 ;
834
+ bool stopAppend = false;
853
835
854
836
/*
855
837
* We're going to prevent var-byte re-encoding of whole page. Find
856
838
* position in page using page indexes.
857
839
*/
858
840
findInLeafPage (btree , page , & off , & iptr , & ptr );
859
841
860
- freespace = RumDataPageFreeSpacePre (page , ptr );
861
- Assert (freespace >= 0 );
862
-
863
842
if (off <= maxoff )
864
843
{
865
844
/*
866
845
* Read next item-pointer with additional information: we'll have
867
846
* to re-encode it. Copy previous part of page.
868
847
*/
869
- memcpy (pageCopy , page , BLCKSZ );
870
- copy_ptr = pageCopy + (ptr - page );
871
- copy_item .iptr = iptr ;
848
+ memcpy (pageCopy + ( ptr - page ), ptr , BLCKSZ - ( ptr - page ) );
849
+ copyPtr = pageCopy + (ptr - page );
850
+ copyItem .iptr = iptr ;
872
851
}
873
852
874
- /* Check how many items we're going to add */
875
- if (RumPageRightMost (page ) && off > maxoff )
876
- max_j = btree -> nitem ;
877
- else
878
- max_j = btree -> curitem + 1 ;
853
+ freespace = RumPageGetOpaque (page )-> freespace ;
879
854
880
- /* Place items to the page while we have enough of space */
881
- i = 0 ;
882
- for (j = btree -> curitem ; j < max_j ; j ++ )
855
+ /*
856
+ * We are execute a merge join over old page and new items, but we
857
+ * should stop inserting of new items if free space of page is ended
858
+ * to put all old items back
859
+ */
860
+ while (42 )
883
861
{
884
- Pointer ptr2 ;
862
+ int cmp ;
885
863
886
- ptr2 = page + rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
887
- & btree -> items [j ], & iptr , btree -> rumstate , ptr - page );
864
+ /* get next item to copy if we pushed it on previous loop */
865
+ if (copyItemEmpty == true && off <= maxoff )
866
+ {
867
+ copyPtr = rumDataPageLeafRead (copyPtr , btree -> entryAttnum ,
868
+ & copyItem , btree -> rumstate );
869
+ copyItemEmpty = false;
870
+ }
888
871
889
- freespace = RumDataPageFreeSpacePre (page , ptr2 );
890
- if (freespace < 0 )
872
+ if (off <= maxoff && btree -> curitem < btree -> nitem )
873
+ {
874
+ if (stopAppend )
875
+ cmp = -1 ; /* force copy */
876
+ else
877
+ cmp = compareRumKey (btree -> rumstate , btree -> entryAttnum ,
878
+ & copyItem ,
879
+ btree -> items + btree -> curitem );
880
+ }
881
+ else if (btree -> curitem < btree -> nitem )
882
+ {
883
+ /* we copied all old items but we have to add more new items */
884
+ if (stopAppend )
885
+ /* there is no free space on page */
886
+ break ;
887
+ else
888
+ /* force insertion of new item */
889
+ cmp = 1 ;
890
+ }
891
+ else if (off <= maxoff )
892
+ {
893
+ /* force copy, we believe that all old items could be placed */
894
+ cmp = -1 ;
895
+ }
896
+ else
897
+ {
891
898
break ;
899
+ }
892
900
893
- ptr = rumPlaceToDataPageLeaf (ptr , btree -> entryAttnum ,
894
- & btree -> items [j ], & iptr , btree -> rumstate );
895
- Assert (RumDataPageFreeSpacePre (page , ptr ) >= 0 );
896
-
897
- iptr = btree -> items [j ].iptr ;
898
- btree -> curitem ++ ;
899
- i ++ ;
900
- }
901
-
902
- /* Place rest of the page back */
903
- if (off <= maxoff )
904
- {
905
- for (j = off ; j <= maxoff ; j ++ )
901
+ if (cmp <= 0 )
906
902
{
907
- copy_ptr = rumDataPageLeafRead (copy_ptr , btree -> entryAttnum ,
908
- & copy_item , btree -> rumstate );
909
- ptr = rumPlaceToDataPageLeaf (ptr , btree -> entryAttnum , & copy_item ,
903
+ ptr = rumPlaceToDataPageLeaf (ptr , btree -> entryAttnum ,
904
+ & copyItem ,
910
905
& iptr , btree -> rumstate );
911
906
912
- Assert (RumDataPageFreeSpacePre (page , ptr ) >= 0 );
907
+ iptr = copyItem .iptr ;
908
+ off ++ ;
909
+ copyItemEmpty = true;
913
910
914
- iptr = copy_item .iptr ;
911
+ if (cmp == 0 )
912
+ {
913
+ btree -> curitem ++ ;
914
+ insertCount ++ ;
915
+ }
916
+ }
917
+ else /* if (cmp > 0) */
918
+ {
919
+ int newItemSize ;
920
+
921
+ newItemSize = rumCheckPlaceToDataPageLeaf (btree -> entryAttnum ,
922
+ btree -> items + btree -> curitem , & iptr ,
923
+ btree -> rumstate , 0 );
924
+
925
+ if (newItemSize <= freespace )
926
+ {
927
+ ptr = rumPlaceToDataPageLeaf (ptr , btree -> entryAttnum ,
928
+ btree -> items + btree -> curitem ,
929
+ & iptr , btree -> rumstate );
930
+ iptr = btree -> items [btree -> curitem ].iptr ;
931
+ btree -> curitem ++ ;
932
+ freespace -= newItemSize ;
933
+ insertCount ++ ;
934
+ }
935
+ else
936
+ {
937
+ stopAppend = true;
938
+ }
915
939
}
916
- }
917
940
918
- RumPageGetOpaque (page )-> maxoff += i ;
941
+ Assert (RumDataPageFreeSpacePre (page , ptr ) >= 0 );
942
+ }
919
943
920
- freespace = RumDataPageFreeSpacePre (page , ptr );
921
- if (freespace < 0 )
922
- elog (ERROR , "Not enough of space in leaf page!" );
944
+ Assert (insertCount > 0 );
945
+ RumPageGetOpaque (page )-> maxoff += insertCount ;
923
946
924
947
/* Update indexes in the end of page */
925
948
updateItemIndexes (page , btree -> entryAttnum , btree -> rumstate );
0 commit comments