@@ -392,7 +392,7 @@ void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size)
  */
 int cfs_shmem_size()
 {
-	return sizeof(CfsState);
+	return sizeof(CfsState) + sizeof(pg_atomic_uint32) * MaxBackends;
 }
 
 void cfs_initialize()
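The new sizing accounts for a per-backend lock table appended to the shared CfsState. A minimal standalone sketch of the implied layout, using a C11 flexible array member and atomic_uint as stand-ins for the PostgreSQL types (MockCfsState and mock_state_alloc are hypothetical names, not part of the patch):

    #include <stdatomic.h>
    #include <stdlib.h>

    /* Mock of the layout the sizing formula implies: the lock table sits at
     * the tail of the shared state, one atomic word per backend slot, so the
     * allocation is sizeof(state) + sizeof(word) * max_backends. */
    typedef struct
    {
        int gc_enabled;          /* ... other CfsState fields elided ... */
        atomic_uint locks[];     /* flexible array, max_backends entries */
    } MockCfsState;

    MockCfsState *mock_state_alloc(int max_backends)
    {
        MockCfsState *s = calloc(1, sizeof(MockCfsState)
                                    + sizeof(atomic_uint) * max_backends);
        for (int i = 0; i < max_backends; i++)
            atomic_init(&s->locks[i], 0);
        return s;
    }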
@@ -401,6 +401,8 @@ void cfs_initialize()
 	cfs_state = (CfsState*)ShmemInitStruct("CFS Control", sizeof(CfsState), &found);
 	if (!found)
 	{
+		int i;
+
 		memset(&cfs_state->gc_stat, 0, sizeof cfs_state->gc_stat);
 		pg_atomic_init_flag(&cfs_state->gc_started);
 		pg_atomic_init_u32(&cfs_state->n_active_gc, 0);
@@ -409,6 +411,9 @@ void cfs_initialize()
 		cfs_state->gc_enabled = true;
 		cfs_state->max_iterations = 0;
 
+		for (i = 0; i < MaxBackends; i++)
+			pg_atomic_init_u32(&cfs_state->locks[i], 0);
+
 		if (cfs_encryption)
 			cfs_crypto_init();
 
@@ -449,11 +454,6 @@ FileMap* cfs_mmap(int md)
 #else
 	map = (FileMap*)mmap(NULL, sizeof(FileMap), PROT_WRITE | PROT_READ, MAP_SHARED, md, 0);
 #endif
-	if (map != MAP_FAILED && map->postmasterPid != PostmasterPid)
-	{
-		map->postmasterPid = PostmasterPid;
-		pg_atomic_write_u32(&map->lock, 0);
-	}
 	return map;
 }
 
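The deleted block was how a stale lock word, left behind in the memory-mapped .cfm header by a crashed postmaster, got cleared on the next remap. With the lock words moved into the cfs_state->locks[] array in shared memory, which is presumably reinitialized by cfs_initialize() at every postmaster start, a dead server can no longer leak a lock into the next run, so the postmasterPid bookkeeping becomes unnecessary.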
@@ -606,65 +606,126 @@ static bool cfs_recover(FileMap* map, int md,
 		unlink(file_bck_path);
 		unlink(map_bck_path);
 	}
+	if (ok)
+		pg_atomic_write_u32(&map->gc_active, false); /* clear the GC flag */
 	return ok;
 }
 
 /*
- * Protects file from GC
+ * Get the lock entry for this file.
+ * The size of the lock array equals the maximal number of backends, because there cannot be more than MaxBackends active locks.
  */
-void cfs_lock_file(FileMap* map, int md, char const* file_path)
+static pg_atomic_uint32*
+cfs_get_lock(char const* file_path)
+{
+	uint32 hash = string_hash(file_path, 0);
+	return &cfs_state->locks[hash % MaxBackends];
+}
+
+/*
+ * Set GC exclusive lock preventing all backends from accessing this file
+ */
+static void
+cfs_gc_lock(pg_atomic_uint32* lock)
 {
+	uint32 count = pg_atomic_fetch_or_u32(lock, CFS_GC_LOCK);
 	long delay = CFS_LOCK_MIN_TIMEOUT;
 
-	while (true)
+	while ((count & ~CFS_GC_LOCK) != 1)
 	{
-		uint32 count = pg_atomic_fetch_add_u32(&map->lock, 1);
-
-		if (count < CFS_GC_LOCK)
+		pg_usleep(delay);
+		CHECK_FOR_INTERRUPTS();
+		count = pg_atomic_read_u32(lock);
+		if (delay < CFS_LOCK_MAX_TIMEOUT)
 		{
-			/* No GC is active for this segment */
-			break;
+			delay *= 2;
 		}
+	}
+	pg_memory_barrier();
+}
 
-		if (pg_atomic_read_u32(&cfs_state->n_active_gc) == 0)
-		{
-			/* There is no active GC, so lock is set by crashed GC */
+/*
+ * Release CFS GC lock
+ */
+static void cfs_gc_unlock(pg_atomic_uint32* lock)
+{
+	pg_write_barrier();
+	pg_atomic_fetch_and_u32(lock, ~CFS_GC_LOCK);
+}
 
-			LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent race condition with GC */
+/*
+ * Set shared access lock, preventing GC of this file
+ */
+static void
+cfs_access_lock(char const* file_path)
+{
+	pg_atomic_uint32* lock = cfs_get_lock(file_path);
+	long delay = CFS_LOCK_MIN_TIMEOUT;
 
-			/* Recheck under CfsGcLock that map->lock was not released */
-			if (pg_atomic_read_u32(&map->lock) >= CFS_GC_LOCK)
-			{
-				/* Uhhh... looks like last GC was interrupted.
-				 * Try to recover the file.
-				 */
-				char* map_path = psprintf("%s.cfm", file_path);
-				char* map_bck_path = psprintf("%s.cfm.bck", file_path);
-				char* file_bck_path = psprintf("%s.bck", file_path);
-
-				if (!cfs_recover(map, md, file_path, map_path, file_bck_path, map_bck_path))
-				{
-					pg_atomic_fetch_sub_u32(&map->lock, 1);
-					LWLockRelease(CfsGcLock);
-					elog(ERROR, "CFS found that file %s is completely destroyed", file_path);
-				}
+	/* Increment the number of locks and wait until there is no active GC for this segment */
+	while (true)
+	{
+		uint32 count = pg_atomic_fetch_add_u32(lock, 1);
 
-				count = pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* revoke GC lock */
-				Assert((int)count > 0);
-				pfree(file_bck_path);
-				pfree(map_bck_path);
-				pfree(map_path);
-			}
-			LWLockRelease(CfsGcLock);
+		if (count < CFS_GC_LOCK)
+		{
+			/* No GC is active for this segment */
+			return;
 		}
 		/* Wait until GC of segment is completed */
-		pg_atomic_fetch_sub_u32(&map->lock, 1);
+		pg_atomic_fetch_sub_u32(lock, 1);
 		pg_usleep(delay);
+		CHECK_FOR_INTERRUPTS();
 		if (delay < CFS_LOCK_MAX_TIMEOUT)
 		{
 			delay *= 2;
 		}
 	}
+}
+
+/*
+ * Protects file from GC and checks whether recovery of the file is needed
+ */
+void cfs_lock_file(FileMap* map, int md, char const* file_path)
+{
+	cfs_access_lock(file_path);
+
+	if (pg_atomic_read_u32(&map->gc_active)) /* A non-zero value of map->gc_active indicates that GC was not successfully completed during the previous Postgres session */
+	{
+		LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent race condition with GC */
+
+		/* Recheck under CfsGcLock that map->gc_active was not released */
+		if (pg_atomic_read_u32(&map->gc_active))
+		{
+			/* Uhhh... looks like the last GC was interrupted.
+			 * Try to recover the file.
+			 */
+			char* map_path = psprintf("%s.cfm", file_path);
+			char* map_bck_path = psprintf("%s.cfm.bck", file_path);
+			char* file_bck_path = psprintf("%s.bck", file_path);
+
+			if (!cfs_recover(map, md, file_path, map_path, file_bck_path, map_bck_path))
+			{
+				cfs_unlock_file(map, file_path);
+				LWLockRelease(CfsGcLock);
+				elog(ERROR, "CFS found that file %s is completely destroyed", file_path);
+			}
+
+			pfree(file_bck_path);
+			pfree(map_bck_path);
+			pfree(map_path);
+		}
+		LWLockRelease(CfsGcLock);
+	}
+}
+
+/*
+ * Start background GC workers if not started yet.
+ * It is done lazily on the first data file access.
+ * Is there some better place to start background workers?
+ */
+void cfs_start_background_workers(void)
+{
 	if (IsUnderPostmaster && cfs_gc_workers != 0
 		&& pg_atomic_test_set_flag(&cfs_state->gc_started))
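The resulting protocol is a counter-plus-flag readers/writer spinlock. Below is a standalone model using C11 atomics in place of pg_atomic_*; the GC_LOCK_BIT value and the sched_yield() backoff are stand-ins (the real code uses CFS_GC_LOCK and exponential pg_usleep() with CHECK_FOR_INTERRUPTS()):

    #include <stdatomic.h>
    #include <sched.h>

    #define GC_LOCK_BIT 0x10000000u   /* stand-in for CFS_GC_LOCK */

    /* Shared (reader) side: the low bits count active accessors. */
    void access_lock(atomic_uint *lock)
    {
        for (;;)
        {
            unsigned count = atomic_fetch_add(lock, 1);
            if (count < GC_LOCK_BIT)
                return;                /* no GC running: share acquired */
            atomic_fetch_sub(lock, 1); /* GC holds the bit: back off, retry */
            sched_yield();
        }
    }

    void access_unlock(atomic_uint *lock)
    {
        atomic_fetch_sub(lock, 1);
    }

    /* Exclusive (GC) side: publish the bit first so new readers bounce,
     * then wait for existing readers to drain. */
    void gc_lock(atomic_uint *lock)
    {
        unsigned count = atomic_fetch_or(lock, GC_LOCK_BIT);
        while ((count & ~GC_LOCK_BIT) != 1)
        {
            sched_yield();
            count = atomic_load(lock);
        }
    }

    void gc_unlock(atomic_uint *lock)
    {
        atomic_fetch_and(lock, ~GC_LOCK_BIT);
    }

As in the patch, gc_lock() waits for a residual count of 1 rather than 0, i.e. it assumes the GC path itself is accounted as one accessor.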
@@ -676,9 +737,10 @@ void cfs_lock_file(FileMap* map, int md, char const* file_path)
 /*
  * Release file lock
  */
-void cfs_unlock_file(FileMap* map)
+void cfs_unlock_file(FileMap* map, char const* file_path)
 {
-	pg_atomic_fetch_sub_u32(&map->lock, 1);
+	pg_atomic_uint32* lock = cfs_get_lock(file_path);
+	pg_atomic_fetch_sub_u32(lock, 1);
 }
 
 /*
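Because the lock slot is now derived from the file path rather than stored in the map, the unlock call must receive the same path. A hypothetical call sequence:

    cfs_lock_file(map, md, file_path);   /* shared lock; may trigger recovery */
    /* ... access compressed blocks through map ... */
    cfs_unlock_file(map, file_path);     /* same path, hence the same lock slot */

Note that two distinct files can hash to the same slot (hash % MaxBackends), so a backend may occasionally wait out a GC of an unrelated file; that is false contention, but never a correctness problem.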
@@ -727,7 +789,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 	char* file_path = (char*)palloc(suf + 1);
 	char* map_bck_path = (char*)palloc(suf + 10);
 	char* file_bck_path = (char*)palloc(suf + 5);
-	uint32 count;
 	int rc;
 
 	pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
@@ -752,7 +813,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 			exit(1);
 
 		ResetLatch(MyLatch);
-		CHECK_FOR_INTERRUPTS();
 
 		pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
 	}
@@ -784,8 +844,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 	strcat(strcpy(file_bck_path, file_path), ".bck");
 
 	/* mostly same as for cfs_lock_file */
-	count = pg_atomic_fetch_add_u32(&map->lock, 1);
-	if (count >= CFS_GC_LOCK)
+	if (pg_atomic_read_u32(&map->gc_active)) /* Check if GC was not completed normally at the previous Postgres run */
 	{
 		/* there could not be concurrent GC for this file here, so recover */
 		if (!cfs_recover(map, md, file_path, map_path, file_bck_path, map_bck_path))
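The crash check changes meaning here: instead of inferring an interrupted GC from a leftover CFS_GC_LOCK in the per-map lock word, cfs_gc_file() consults map->gc_active, a flag that lives in the memory-mapped file header and therefore survives a server restart.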
@@ -821,11 +880,14 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 		int usecs, usecs2;
 		int i, size;
 		uint32 offs;
+		pg_atomic_uint32* lock;
 		off_t rc PG_USED_FOR_ASSERTS_ONLY;
 
 		startTime = GetCurrentTimestamp();
 		secondTime = startTime;
 
+		lock = cfs_get_lock(file_path);
+
 		fd2 = open(file_bck_path, O_CREAT|O_RDWR|PG_BINARY|O_TRUNC, 0600);
 		if (fd2 < 0)
 		{
@@ -855,13 +917,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 		}
 
 		/* temporary lock file for fetching map snapshot */
-		count = pg_atomic_fetch_or_u32(&map->lock, CFS_GC_LOCK);
-		while ((count & ~CFS_GC_LOCK) != 1)
-		{
-			pg_usleep(10);
-			count = pg_atomic_read_u32(&map->lock);
-		}
-		pg_memory_barrier();
+		cfs_gc_lock(lock);
+
 		/* Reread variables after locking file */
 		virtSize = pg_atomic_read_u32(&map->hdr.virtSize);
 		n_pages = virtSize / BLCKSZ;
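The open-coded drain loop that used to sit here (a fixed 10 µs spin with no interrupt checks) is folded into cfs_gc_lock(), which backs off exponentially between CFS_LOCK_MIN_TIMEOUT and CFS_LOCK_MAX_TIMEOUT and calls CHECK_FOR_INTERRUPTS() on each iteration.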
@@ -873,7 +930,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 			inodes[i] = &newMap->inodes[i];
 		}
 		/* may unlock until second phase */
-		pg_atomic_fetch_and_u32(&map->lock, ~CFS_GC_LOCK);
+		cfs_gc_unlock(lock);
 
 		/* sort inodes by offset to improve read locality */
 		qsort(inodes, n_pages, sizeof(inode_t*), cfs_cmp_page_offs);
@@ -950,13 +1007,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 
 		secondTime = GetCurrentTimestamp();
 
-		count = pg_atomic_fetch_or_u32(&map->lock, CFS_GC_LOCK);
-		while ((count & ~CFS_GC_LOCK) != 1)
-		{
-			pg_usleep(10);
-			count = pg_atomic_read_u32(&map->lock);
-		}
-		pg_memory_barrier();
+		cfs_gc_lock(lock);
+
 		/* Reread variables after locking file */
 		virtSize = pg_atomic_read_u32(&map->hdr.virtSize);
 		n_pages = virtSize / BLCKSZ;
@@ -1054,6 +1106,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 		pg_atomic_write_u32(&newMap->hdr.physSize, newSize);
 		pg_atomic_write_u32(&newMap->hdr.virtSize, virtSize);
 
+		pg_atomic_write_u32(&newMap->gc_active, true); /* Indicate start of GC */
+
 		/* Persist copy of map file */
 		if (!cfs_write_file(md2, &newMap->hdr, sizeof(newMap->hdr)))
 		{
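Setting gc_active in the new map copy before it is persisted means the on-disk map already records "GC in progress" if the server dies anywhere in the swap that follows; both cfs_recover() and the startup check at the top of cfs_gc_file() key off this flag, and the successful-completion paths clear it again.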
@@ -1116,7 +1170,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 
 				if (res != BLCKSZ)
 				{
-					pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK + 1); /* release lock */
+					pg_atomic_fetch_sub_u32(lock, CFS_GC_LOCK); /* release lock */
 					pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
 					elog(ERROR, "CFS: verification failed for block %u position %u size %u of relation %s: error code %d",
 						 i, (int)CFS_INODE_OFFS(inode), size, file_bck_path, (int)res);
@@ -1146,6 +1200,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 		memcpy(map->inodes, newMap->inodes, n_pages * sizeof(inode_t));
 		pg_atomic_write_u32(&map->hdr.usedSize, newUsed);
 		pg_atomic_write_u32(&map->hdr.physSize, newSize);
+		pg_atomic_write_u32(&map->gc_active, false);
 		map->generation += 1; /* force all backends to reopen the file */
 
 		/* Before removing backup files and releasing locks
@@ -1159,7 +1214,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 		{
 			elog(WARNING, "CFS failed to sync file %s: %m", map_path);
 
-	Cleanup:
+	  Cleanup:
 		if (fd >= 0) close(fd);
 		if (fd2 >= 0) close(fd2);
 		if (md2 >= 0) close(md2);
@@ -1174,8 +1229,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 		else
 			remove_backups = true; /* we don't need backups anymore */
 
-		pg_write_barrier();
-		pg_atomic_fetch_and_u32(&map->lock, ~CFS_GC_LOCK); /* release gc lock */
+		cfs_gc_unlock(lock);
 
 		/* remove map backup file */
 		if (remove_backups && unlink(map_bck_path))
@@ -1210,7 +1264,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 				 MyProcPid, suf, map_path, physSize, virtSize, usedSize, (double)virtSize/physSize);
 
 	FinUnmap:
-		pg_atomic_fetch_sub_u32(&map->lock, 1); /* release read lock */
 		if (cfs_munmap(map) < 0)
 		{
 			elog(WARNING, "CFS failed to unmap file %s: %m", map_path);