-
Notifications
You must be signed in to change notification settings - Fork 244
/
Copy pathfetcher.go
114 lines (98 loc) · 2.82 KB
/
fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package refresh
import (
"context"
"time"
)
const (
// DefaultMinInterval is the minimum fetch interval NewFetcher falls back to when the caller
// leaves minInterval unset (zero).
DefaultMinInterval = 4 * time.Second
// DefaultMaxInterval is the maximum fetch interval NewFetcher falls back to when the caller
// leaves maxInterval unset (zero).
DefaultMaxInterval = 1024 * time.Second
)
// Fetcher repeatedly fetches data in the background, waiting an adaptive interval between fetches.
// The interval always stays within [minInterval, maxInterval]. When a fetch returns data equal to
// the cached value, the interval doubles (subject to the maximum interval). When a diff is observed,
// the interval resets to the minimum. The interval can be made unchanging by setting minInterval and
// maxInterval to the same desired value.
type Fetcher[T equaler[T]] struct {
// fetchFunc retrieves the latest data; it is called once when Start begins and once per tick.
fetchFunc func(context.Context) (T, error)
// cache holds the most recently stored fetch result and is what new results are compared against.
cache T
// minInterval is the interval used initially and after every observed diff.
minInterval time.Duration
// maxInterval is the upper bound for the doubling interval.
maxInterval time.Duration
// currentInterval is the interval the ticker is reset to after each tick.
currentInterval time.Duration
// ticker drives the fetch loop. NewFetcher leaves it nil, and Start then creates one with
// NewTimedTickProvider; presumably it can be pre-set from within the package (e.g. by tests).
ticker TickProvider
// consumeFunc, when non-nil, receives fetched data that differs from the cache.
consumeFunc func(T) error
// logger receives informational and error messages from the fetch loop.
logger Logger
}
// NewFetcher creates a new Fetcher that fetches with fetchFunc and passes changed results to
// consumeFunc. The returned Fetcher does nothing until Start is called.
//
// A non-positive minInterval falls back to DefaultMinInterval (4 seconds) and a non-positive
// maxInterval falls back to DefaultMaxInterval (1024 seconds). If maxInterval is smaller than
// minInterval, it is raised to minInterval, which pins the fetch interval to that single value.
// consumeFunc may be nil, in which case fetched data is only cached.
func NewFetcher[T equaler[T]](
	fetchFunc func(context.Context) (T, error),
	minInterval time.Duration,
	maxInterval time.Duration,
	consumeFunc func(T) error,
	logger Logger,
) *Fetcher[T] {
	// Treat negative durations like "unset": they are never a meaningful polling interval, and
	// the standard library's tickers panic when given a non-positive duration.
	if minInterval <= 0 {
		minInterval = DefaultMinInterval
	}
	if maxInterval <= 0 {
		maxInterval = DefaultMaxInterval
	}
	// Guarantee minInterval <= maxInterval so the doubling logic always has a valid cap.
	maxInterval = max(minInterval, maxInterval)

	return &Fetcher[T]{
		fetchFunc:       fetchFunc,
		minInterval:     minInterval,
		maxInterval:     maxInterval,
		currentInterval: minInterval,
		consumeFunc:     consumeFunc,
		logger:          logger,
	}
}
// Start launches a background goroutine that performs an initial fetch and then fetches again on
// every tick until ctx is cancelled. Start itself returns immediately.
//
// Only successful fetches touch the cache or reach consumeFunc. The first successful fetch is
// always cached and delivered; after that, a result is delivered only when it is not Equal to the
// cached value. An unchanged result doubles the fetch interval (up to the maximum), a changed
// result resets it to the minimum, and a failed fetch leaves it as it was. Fetch and consume
// errors are logged and never stop the loop.
func (f *Fetcher[T]) Start(ctx context.Context) {
	go func() {
		// deliver hands data to the optional consumer and logs, but does not propagate, its error.
		deliver := func(data T) {
			if f.consumeFunc == nil {
				return
			}
			if err := f.consumeFunc(data); err != nil {
				f.logger.Errorf("Error consuming data: %v", err)
			}
		}

		// haveBaseline reports whether f.cache holds a value from a successful fetch. Until it
		// does, there is nothing valid to compare against, so the next success is always delivered.
		haveBaseline := false

		// do an initial fetch
		res, err := f.fetchFunc(ctx)
		if err != nil {
			// The result accompanying an error is meaningless, so it is neither cached nor consumed.
			f.logger.Errorf("Error invoking fetch: %v", err)
		} else {
			f.cache = res
			haveBaseline = true
			deliver(res)
		}

		if f.ticker == nil {
			f.ticker = NewTimedTickProvider(f.currentInterval)
		}
		defer f.ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				f.logger.Printf("Fetcher stopped")
				return
			case <-f.ticker.C():
				result, err := f.fetchFunc(ctx)
				switch {
				case err != nil:
					f.logger.Errorf("Error fetching data: %v", err)
				case haveBaseline && result.Equal(f.cache):
					f.updateFetchIntervalForNoObservedDiff()
					f.logger.Printf("No diff observed in fetch, not invoking the consumer")
				default:
					f.cache = result
					haveBaseline = true
					f.updateFetchIntervalForObservedDiff()
					deliver(result)
				}
				// Re-arm the ticker with whatever interval the outcome above selected.
				f.ticker.Reset(f.currentInterval)
			}
		}
	}()
}
// updateFetchIntervalForNoObservedDiff doubles currentInterval, capping it at maxInterval.
//
// The cap is checked before multiplying: doubling a duration larger than half of the int64 range
// would wrap around to a negative value, which would then slip under the cap.
func (f *Fetcher[T]) updateFetchIntervalForNoObservedDiff() {
	const growthFactor = 2

	if f.currentInterval > f.maxInterval/growthFactor {
		// Doubling would exceed (or overflow past) the maximum, so settle on the maximum.
		f.currentInterval = f.maxInterval
		return
	}
	f.currentInterval *= growthFactor
}
// updateFetchIntervalForObservedDiff resets currentInterval to minInterval, discarding any
// doubling accumulated so far.
func (f *Fetcher[T]) updateFetchIntervalForObservedDiff() {
f.currentInterval = f.minInterval
}