@@ -770,6 +770,65 @@ typedef enum {
770
770
CFS_IMPLICIT
771
771
} GC_CALL_KIND ;
772
772
773
+ static bool cfs_copy_inodes (inode_t * * inodes , int n_nodes , int fd , int fd2 , uint32 * writeback , uint32 * offset , const char * file_path , const char * file_bck_path )
774
+ {
775
+ char block [BLCKSZ ];
776
+ uint32 size , offs ;
777
+ int i ;
778
+ off_t soff = -1 ;
779
+
780
+ /* sort inodes by offset to improve read locality */
781
+ qsort (inodes , n_nodes , sizeof (inode_t * ), cfs_cmp_page_offs );
782
+ for (i = 0 ; i < n_nodes ; i ++ )
783
+ {
784
+ size = CFS_INODE_SIZE (* inodes [i ]);
785
+ if (size != 0 )
786
+ {
787
+ offs = CFS_INODE_OFFS (* inodes [i ]);
788
+ Assert (size <= BLCKSZ );
789
+ if (soff != (off_t )offs )
790
+ {
791
+ soff = lseek (fd , offs , SEEK_SET );
792
+ Assert (soff == offs );
793
+ }
794
+
795
+ if (!cfs_read_file (fd , block , size ))
796
+ {
797
+ elog (WARNING , "CFS GC failed to read block %u of file %s at position %u size %u: %m" ,
798
+ i , file_path , offs , size );
799
+ return false;
800
+ }
801
+ soff += size ;
802
+
803
+ if (!cfs_write_file (fd2 , block , size ))
804
+ {
805
+ elog (WARNING , "CFS failed to write file %s: %m" , file_bck_path );
806
+ return false;
807
+ }
808
+ cfs_state -> gc_stat .processedBytes += size ;
809
+ cfs_state -> gc_stat .processedPages += 1 ;
810
+
811
+ offs = * offset ;
812
+ * offset += size ;
813
+ * inodes [i ] = CFS_INODE (size , offs );
814
+
815
+ /* xfs doesn't like if writeback performed closer than 128k to
816
+ * file end */
817
+ if (* writeback + 16 * 1024 * 1024 < * offset )
818
+ {
819
+ uint32 newwb = (* offset - 128 * 1024 ) & ~(128 * 1024 - 1 );
820
+ pg_flush_data (fd2 , * writeback , newwb - * writeback );
821
+ * writeback = newwb ;
822
+ }
823
+ }
824
+ else
825
+ {
826
+ * inodes [i ] = CFS_INODE (0 , 0 );
827
+ }
828
+ }
829
+ return true;
830
+ }
831
+
773
832
/*
774
833
* Perform garbage collection (if required) on the file
775
834
* @param map_path - path to the map file (*.cfm).
@@ -868,20 +927,20 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
868
927
/* do we need to perform defragmentation? */
869
928
if (physSize > CFS_IMPLICIT_GC_THRESHOLD || (uint64 )(physSize - usedSize )* 100 > (uint64 )physSize * cfs_gc_threshold )
870
929
{
871
- char block [BLCKSZ ];
872
930
FileMap * newMap = (FileMap * )palloc0 (sizeof (FileMap ));
873
931
uint32 newSize = 0 ;
874
932
uint32 writeback = 0 ;
875
933
uint32 newUsed = 0 ;
876
934
uint32 second_pass = 0 ;
935
+ uint32 second_pass_bytes = 0 ;
877
936
inode_t * * inodes = (inode_t * * )palloc (RELSEG_SIZE * sizeof (inode_t * ));
878
937
bool remove_backups = true;
879
- int n_pages ;
938
+ bool second_pass_whole = false;
939
+ int n_pages , n_pages1 ;
880
940
TimestampTz startTime , secondTime , endTime ;
881
941
long secs , secs2 ;
882
942
int usecs , usecs2 ;
883
943
int i , size ;
884
- uint32 offs ;
885
944
pg_atomic_uint32 * lock ;
886
945
off_t rc PG_USED_FOR_ASSERTS_ONLY ;
887
946
@@ -918,6 +977,13 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
918
977
goto Cleanup ;
919
978
}
920
979
980
+ fd = open (file_path , O_RDONLY |PG_BINARY , 0 );
981
+ if (fd < 0 )
982
+ {
983
+ elog (WARNING , "CFS failed to open file %s: %m" , map_bck_path );
984
+ goto Cleanup ;
985
+ }
986
+
921
987
/* temporary lock file for fetching map snapshot */
922
988
cfs_gc_lock (lock );
923
989
@@ -934,62 +1000,12 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
934
1000
/* may unlock until second phase */
935
1001
cfs_gc_unlock (lock );
936
1002
937
- /* sort inodes by offset to improve read locality */
938
- qsort (inodes , n_pages , sizeof (inode_t * ), cfs_cmp_page_offs );
939
-
940
- fd = open (file_path , O_RDONLY |PG_BINARY , 0 );
941
- if (fd < 0 )
942
- {
943
- elog (WARNING , "CFS failed to open file %s: %m" , map_bck_path );
944
- goto Cleanup ;
945
- }
946
-
947
1003
cfs_state -> gc_stat .processedFiles += 1 ;
948
1004
cfs_gc_processed_segments += 1 ;
949
1005
950
- for (i = 0 ; i < n_pages ; i ++ )
951
- {
952
- size = CFS_INODE_SIZE (* inodes [i ]);
953
- if (size != 0 )
954
- {
955
- offs = CFS_INODE_OFFS (* inodes [i ]);
956
- Assert (size <= BLCKSZ );
957
- rc = lseek (fd , offs , SEEK_SET );
958
- Assert (rc == offs );
959
-
960
- if (!cfs_read_file (fd , block , size ))
961
- {
962
- elog (WARNING , "CFS GC failed to read block %u of file %s at position %u size %u: %m" ,
963
- i , file_path , offs , size );
964
- goto Cleanup ;
965
- }
966
-
967
- if (!cfs_write_file (fd2 , block , size ))
968
- {
969
- elog (WARNING , "CFS failed to write file %s: %m" , file_bck_path );
970
- goto Cleanup ;
971
- }
972
- cfs_state -> gc_stat .processedBytes += size ;
973
- cfs_state -> gc_stat .processedPages += 1 ;
974
-
975
- offs = newSize ;
976
- newSize += size ;
977
- * inodes [i ] = CFS_INODE (size , offs );
978
-
979
- /* xfs doesn't like if writeback performed closer than 128k to
980
- * file end */
981
- if (writeback + 16 * 1024 * 1024 < newSize )
982
- {
983
- uint32 newwb = (newSize - 128 * 1024 ) & ~(128 * 1024 - 1 );
984
- pg_flush_data (fd2 , writeback , newwb - writeback );
985
- writeback = newwb ;
986
- }
987
- }
988
- else
989
- {
990
- * inodes [i ] = CFS_INODE (0 , 0 );
991
- }
992
- }
1006
+ if (!cfs_copy_inodes (inodes , n_pages , fd , fd2 , & writeback , & newSize ,
1007
+ file_path , file_bck_path ))
1008
+ goto Cleanup ;
993
1009
newUsed = newSize ;
994
1010
995
1011
/* Persist bigger part of copy to not do it under lock */
@@ -1009,6 +1025,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1009
1025
cfs_gc_lock (lock );
1010
1026
1011
1027
/* Reread variables after locking file */
1028
+ n_pages1 = n_pages ;
1012
1029
virtSize = pg_atomic_read_u32 (& map -> hdr .virtSize );
1013
1030
n_pages = virtSize / BLCKSZ ;
1014
1031
@@ -1025,46 +1042,60 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1025
1042
}
1026
1043
newUsed -= CFS_INODE_SIZE (nnode );
1027
1044
newUsed += size ;
1028
- if (size != 0 )
1045
+ newMap -> inodes [i ] = onode ;
1046
+ inodes [second_pass ] = & newMap -> inodes [i ];
1047
+ second_pass_bytes += size ;
1048
+ second_pass ++ ;
1049
+ }
1050
+
1051
+ if (n_pages1 > n_pages )
1052
+ {
1053
+ /* if file were truncated (vacuum???), clean a bit */
1054
+ for (i = n_pages ; i < n_pages1 ; i ++ )
1029
1055
{
1030
- second_pass ++ ;
1031
- offs = CFS_INODE_OFFS (onode );
1056
+ inode_t nnode = newMap -> inodes [i ];
1057
+ if (CFS_INODE_SIZE (nnode ) != 0 ) {
1058
+ newUsed -= CFS_INODE_SIZE (nnode );
1059
+ newMap -> inodes [i ] = CFS_INODE (0 , 0 );
1060
+ }
1061
+ }
1062
+ }
1032
1063
1033
- rc = lseek (fd , offs , SEEK_SET );
1034
- Assert (rc == (off_t )offs );
1064
+ if ((uint64 )(newSize + second_pass_bytes - newUsed ) * 100 >
1065
+ (uint64 )(newSize + second_pass_bytes ) * cfs_gc_threshold )
1066
+ {
1067
+ /* there were too many modified pages between passes, so it is
1068
+ * better to do whole copy again */
1069
+ newUsed = 0 ;
1070
+ newSize = 0 ;
1071
+ writeback = 0 ;
1072
+ second_pass_whole = true;
1073
+ memset (newMap -> inodes , 0 , sizeof (newMap -> inodes ));
1074
+ for (i = 0 ; i < n_pages ; i ++ )
1075
+ {
1076
+ newMap -> inodes [i ] = map -> inodes [i ];
1077
+ newUsed += CFS_INODE_SIZE (map -> inodes [i ]);
1078
+ inodes [i ] = & newMap -> inodes [i ];
1079
+ }
1080
+ second_pass = n_pages ;
1081
+ second_pass_bytes = newUsed ;
1082
+ }
1035
1083
1036
- if (!cfs_read_file (fd , block , size ))
1037
- {
1038
- elog (WARNING , "CFS GC failed to read block %u of file %s at position %u size %u: %m" ,
1039
- i , file_path , offs , size );
1040
- goto Cleanup ;
1041
- }
1084
+ if (!cfs_copy_inodes (inodes , second_pass , fd , fd2 , & writeback , & newSize ,
1085
+ file_path , file_bck_path ))
1086
+ goto Cleanup ;
1042
1087
1043
- /* copy it without sorting */
1044
- offs = newSize ;
1045
- newSize += size ;
1046
- if (!cfs_write_file (fd2 , block , size ))
1047
- {
1048
- elog (WARNING , "CFS failed to write file %s: %m" , file_bck_path );
1049
- goto Cleanup ;
1050
- }
1051
- newMap -> inodes [i ] = CFS_INODE (size , offs );
1088
+ pg_flush_data (fd2 , writeback , newSize );
1052
1089
1053
- if (writeback + 16 * 1024 * 1024 < newSize )
1054
- {
1055
- uint32 newwb = (newSize - 128 * 1024 ) & ~(128 * 1024 - 1 );
1056
- pg_flush_data (fd2 , writeback , newwb - writeback );
1057
- writeback = newwb ;
1058
- }
1059
- }
1060
- else
1090
+ if (second_pass_whole )
1091
+ {
1092
+ /* truncate file to copied size */
1093
+ if (ftruncate (fd2 , newSize ))
1061
1094
{
1062
- newMap -> inodes [i ] = CFS_INODE (0 , 0 );
1095
+ elog (WARNING , "CFS failed to truncate file %s: %m" , file_bck_path );
1096
+ goto Cleanup ;
1063
1097
}
1064
- cfs_state -> gc_stat .processedBytes += size ;
1065
- cfs_state -> gc_stat .processedPages += 1 ;
1066
1098
}
1067
- pg_flush_data (fd2 , writeback , newSize );
1068
1099
1069
1100
if (close (fd ) < 0 )
1070
1101
{
@@ -1235,10 +1266,10 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1235
1266
1236
1267
if (succeed )
1237
1268
{
1238
- elog (LOG , "CFS GC worker %d: defragment file %s: old size %u, new size %u, logical size %u, used %u, compression ratio %f, time %ld usec; second pass: pages %u, time %ld"
1269
+ elog (LOG , "CFS GC worker %d: defragment file %s: old size %u, new size %u, logical size %u, used %u, compression ratio %f, time %ld usec; second pass: pages %u, bytes %u, time %ld"
1239
1270
,
1240
1271
MyProcPid , file_path , physSize , newSize , virtSize , usedSize , (double )virtSize /newSize ,
1241
- secs * USECS_PER_SEC + usecs , second_pass ,
1272
+ secs * USECS_PER_SEC + usecs , second_pass , second_pass_bytes ,
1242
1273
secs2 * USECS_PER_SEC + usecs2 );
1243
1274
}
1244
1275
0 commit comments