@@ -540,86 +540,72 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
540
540
void cfs_lock_file (FileMap * map , char const * file_path )
541
541
{
542
542
long delay = CFS_LOCK_MIN_TIMEOUT ;
543
- int n_attempts = 0 ;
544
543
545
544
while (true)
546
545
{
547
- uint64 count = pg_atomic_fetch_add_u32 (& map -> lock , 1 );
548
- bool revokeLock = false;
546
+ uint32 count = pg_atomic_fetch_add_u32 (& map -> lock , 1 );
549
547
550
548
if (count < CFS_GC_LOCK )
551
- break ;
552
-
553
- if (InRecovery )
554
- {
555
- revokeLock = true;
556
- }
557
- else
558
549
{
559
- if (pg_atomic_unlocked_test_flag (& cfs_state -> gc_started ))
560
- {
561
- if (++ n_attempts > MAX_LOCK_ATTEMPTS )
562
- {
563
- /* So there is GC lock, but no active GC process during MAX_LOCK_ATTEMPTS.
564
- * Most likely it means that GC is crashed (may be together with other postgres processes or even OS)
565
- * without releasing lock. And for some reasons recovery was not performed and this page left locked.
566
- * We should revoke the the lock to allow access to this segment.
567
- */
568
- revokeLock = true;
569
- elog (WARNING , "CFS revokes lock on file %s\n" , file_path );
570
- }
571
- }
572
- else
573
- {
574
- n_attempts = 0 ; /* Reset counter of attempts because GC is in progress */
575
- }
550
+ /* No GC is active for this segment */
551
+ break ;
576
552
}
577
- if (revokeLock
578
- /* use gc_started flag to prevent race condition with other backends and GC */
579
- && pg_atomic_test_set_flag (& cfs_state -> gc_started ))
580
- {
581
- /* Ugggh... looks like last GC was interrupted.
582
- * Try to recover the file.
583
- */
584
- char * map_bck_path = psprintf ("%s.cfm.bck" , file_path );
585
- char * file_bck_path = psprintf ("%s.bck" , file_path );
586
553
587
- elog (WARNING , "CFS indicates that GC of %s was interrupted: try to perform recovery" , file_path );
554
+ if (pg_atomic_read_u32 (& cfs_state -> n_active_gc ) == 0 )
555
+ {
556
+ /* There is no active GC, so lock is set by crashed GC */
588
557
589
- if (access (file_bck_path , R_OK ) != 0 )
590
- {
591
- /* There is no backup file: new map should be constructed */
592
- int md2 = open (map_bck_path , O_RDWR |PG_BINARY , 0 );
593
- if (md2 >= 0 )
594
- {
595
- /* Recover map. */
596
- if (!cfs_read_file (md2 , map , sizeof (FileMap )))
597
- elog (WARNING , "CFS failed to read file %s: %m" , map_bck_path );
558
+ LWLockAcquire (CfsGcLock , LW_EXCLUSIVE ); /* Prevent race condition with GC */
598
559
599
- close (md2 );
600
- }
601
- }
602
- else
560
+ /* Recheck under CfsGcLock that map->lock was not released */
561
+ if (pg_atomic_read_u32 (& map -> lock ) >= CFS_GC_LOCK )
603
562
{
604
- /* Presence of backup file means that we still have
605
- * unchanged data and map files. Just remove backup files and
606
- * revoke GC lock.
563
+ /* Uhhh... looks like last GC was interrupted.
564
+ * Try to recover the file.
607
565
*/
608
- unlink (file_bck_path );
609
- unlink (map_bck_path );
566
+ char * map_bck_path = psprintf ("%s.cfm.bck" , file_path );
567
+ char * file_bck_path = psprintf ("%s.bck" , file_path );
568
+
569
+ elog (WARNING , "CFS indicates that GC of %s was interrupted: try to perform recovery" , file_path );
570
+
571
+ if (access (file_bck_path , R_OK ) != 0 )
572
+ {
573
+ /* There is no backup file: new map should be constructed */
574
+ int md2 = open (map_bck_path , O_RDWR |PG_BINARY , 0 );
575
+ if (md2 >= 0 )
576
+ {
577
+ /* Recover map. */
578
+ if (!cfs_read_file (md2 , map , sizeof (FileMap )))
579
+ elog (WARNING , "CFS failed to read file %s: %m" , map_bck_path );
580
+
581
+ close (md2 );
582
+ }
583
+ }
584
+ else
585
+ {
586
+ /* Presence of backup file means that we still have
587
+ * unchanged data and map files. Just remove backup files and
588
+ * revoke GC lock.
589
+ */
590
+ unlink (file_bck_path );
591
+ unlink (map_bck_path );
592
+ }
593
+
594
+ count = pg_atomic_fetch_sub_u32 (& map -> lock , CFS_GC_LOCK ); /* revoke GC lock */
595
+ Assert ((int )count > 0 );
596
+ pfree (file_bck_path );
597
+ pfree (map_bck_path );
610
598
}
611
-
612
- pg_atomic_clear_flag (& cfs_state -> gc_started );
613
- count = pg_atomic_fetch_sub_u32 (& map -> lock , CFS_GC_LOCK ); /* revoke GC lock */
614
- Assert ((int )count > 0 );
615
- pfree (file_bck_path );
616
- pfree (map_bck_path );
599
+ LWLockRelease (CfsGcLock );
617
600
break ;
618
- }
601
+ }
602
+ /* Wait until GC of segment is completed */
619
603
pg_atomic_fetch_sub_u32 (& map -> lock , 1 );
620
604
pg_usleep (delay );
621
605
if (delay < CFS_LOCK_MAX_TIMEOUT )
606
+ {
622
607
delay *= 2 ;
608
+ }
623
609
}
624
610
625
611
if (IsUnderPostmaster && cfs_gc_workers != 0
@@ -649,11 +635,11 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
649
635
/*
650
636
* Perform garbage collection (if required) on the file
651
637
* @param map_path - path to the map file (*.cfm).
652
- * @param noerror - surpress error message (when this function is called by cfs_gc_relation until there are available segments)
638
+ * @param background - GC is performed in background by BGW: suppress error message and set CfsGcLock
653
639
*/
654
- static bool cfs_gc_file (char * map_path , bool noerror )
640
+ static bool cfs_gc_file (char * map_path , bool background )
655
641
{
656
- int md = open ( map_path , O_RDWR | PG_BINARY , 0 ) ;
642
+ int md ;
657
643
FileMap * map ;
658
644
uint32 physSize ;
659
645
uint32 usedSize ;
@@ -663,29 +649,33 @@ static bool cfs_gc_file(char* map_path, bool noerror)
663
649
int fd2 = -1 ;
664
650
int md2 = -1 ;
665
651
bool succeed = false;
652
+ int rc ;
653
+
666
654
667
655
pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc , 1 );
668
656
669
- while (! cfs_state -> gc_enabled )
657
+ if ( background )
670
658
{
671
- int rc ;
672
-
673
- pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
674
-
675
- rc = WaitLatch (MyLatch ,
676
- WL_TIMEOUT | WL_POSTMASTER_DEATH ,
677
- CFS_DISABLE_TIMEOUT /* ms */ );
678
- if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
679
- exit (1 );
659
+ while (!cfs_state -> gc_enabled )
660
+ {
661
+ pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
662
+
663
+ rc = WaitLatch (MyLatch ,
664
+ WL_TIMEOUT | WL_POSTMASTER_DEATH ,
665
+ CFS_DISABLE_TIMEOUT /* ms */ );
666
+ if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
667
+ exit (1 );
668
+
669
+ pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc , 1 );
670
+ }
680
671
681
- pg_atomic_fetch_add_u32 ( & cfs_state -> n_active_gc , 1 );
672
+ LWLockAcquire ( CfsGcLock , LW_SHARED ); /* avoid race condition with cfs_file_lock */
682
673
}
683
674
675
+ md = open (map_path , O_RDWR |PG_BINARY , 0 );
684
676
if (md < 0 )
685
677
{
686
- if (!noerror ) {
687
- elog (WARNING , "CFS failed to open map file %s: %m" , map_path );
688
- }
678
+ elog (DEBUG1 , "CFS failed to open map file %s: %m" , map_path );
689
679
goto FinishGC ;
690
680
}
691
681
@@ -1032,7 +1022,12 @@ static bool cfs_gc_file(char* map_path, bool noerror)
1032
1022
}
1033
1023
1034
1024
FinishGC :
1025
+ if (background )
1026
+ {
1027
+ LWLockRelease (CfsGcLock );
1028
+ }
1035
1029
pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
1030
+
1036
1031
return succeed ;
1037
1032
}
1038
1033
@@ -1066,7 +1061,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
1066
1061
strcmp (file_path + len - 4 , ".cfm" ) == 0 )
1067
1062
{
1068
1063
if (entry -> d_ino % cfs_state -> n_workers == worker_id
1069
- && !cfs_gc_file (file_path , false ))
1064
+ && !cfs_gc_file (file_path , true ))
1070
1065
{
1071
1066
success = false;
1072
1067
break ;
@@ -1395,31 +1390,35 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
1395
1390
1396
1391
Datum cfs_gc_relation (PG_FUNCTION_ARGS )
1397
1392
{
1398
- cfs_gc_processed_segments = 0 ;
1399
-
1400
- if (cfs_gc_workers == 0 && pg_atomic_test_set_flag (& cfs_state -> gc_started ))
1393
+ Oid oid = PG_GETARG_OID (0 );
1394
+ Relation rel = try_relation_open (oid , AccessShareLock );
1395
+ int processed_segments = 0 ;
1396
+
1397
+ if (rel != NULL )
1401
1398
{
1402
- Oid oid = PG_GETARG_OID (0 );
1403
- Relation rel = try_relation_open (oid , AccessShareLock );
1404
-
1405
- if (rel != NULL )
1406
- {
1407
- char * path = relpathbackend (rel -> rd_node , rel -> rd_backend , MAIN_FORKNUM );
1408
- char * map_path = (char * )palloc (strlen (path ) + 16 );
1409
- int i = 0 ;
1410
- sprintf (map_path , "%s.cfm" , path );
1399
+ char * path ;
1400
+ char * map_path ;
1401
+ int i = 0 ;
1402
+
1403
+ LWLockAcquire (CfsGcLock , LW_EXCLUSIVE ); /* Prevent interaction with background GC */
1404
+
1405
+ processed_segments = cfs_gc_processed_segments ;
1406
+
1407
+ path = relpathbackend (rel -> rd_node , rel -> rd_backend , MAIN_FORKNUM );
1408
+ map_path = (char * )palloc (strlen (path ) + 16 );
1409
+ sprintf (map_path , "%s.cfm" , path );
1411
1410
1412
- while (true)
1413
- {
1414
- if (!cfs_gc_file (map_path , true))
1415
- break ;
1416
- sprintf (map_path , "%s.%u.cfm" , path , ++ i );
1417
- }
1418
- pfree (path );
1419
- pfree (map_path );
1420
- relation_close (rel , AccessShareLock );
1411
+ while (cfs_gc_file (map_path , false))
1412
+ {
1413
+ sprintf (map_path , "%s.%u.cfm" , path , ++ i );
1421
1414
}
1422
- pg_atomic_clear_flag (& cfs_state -> gc_started );
1415
+ pfree (path );
1416
+ pfree (map_path );
1417
+ relation_close (rel , AccessShareLock );
1418
+
1419
+ processed_segments -= cfs_gc_processed_segments ;
1420
+
1421
+ LWLockRelease (CfsGcLock );
1423
1422
}
1424
1423
PG_RETURN_INT32 (cfs_gc_processed_segments );
1425
1424
}
0 commit comments