-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprogress.go
More file actions
144 lines (112 loc) · 2.43 KB
/
progress.go
File metadata and controls
144 lines (112 loc) · 2.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Package progress calculates an average item processing speed over a double sliding window.
package progress
import (
"context"
"errors"
"time"
"golang.org/x/sync/semaphore"
)
// Window counts completed items in fixed-length slots and derives an average
// processing speed from two rings of slots: the raw per-slot counts and the
// running means of those counts.
type Window struct {
	// sem is a weight-1 semaphore used as a lock around the counters below.
	sem *semaphore.Weighted

	// total is the full span covered by the window; interval is the length
	// of a single slot and the period of the background ticker.
	total, interval time.Duration

	// average is the latest value computed on a tick; current is the number
	// of items completed since the previous tick.
	average, current int64

	// data holds the item count recorded for each slot. historical holds, for
	// each slot, the mean of data at the time that slot was written. In both
	// slices -1 marks a slot that has never been written.
	data, historical []int64

	// pos is the index of the slot the next tick will write.
	pos int

	// cancel stops the background ticking goroutine.
	cancel context.CancelFunc
}
// NewWindow returns a double sliding Window that calculates an average item
// processing speed.
//
// total is the span covered by the whole window and interval is the length of
// one slot inside it. Both must be positive, total must be strictly greater
// than interval, and total must be a whole multiple of interval; otherwise an
// error is returned and no Window is created.
//
// A background goroutine is started that advances the window once per
// interval. Call End to stop it.
func NewWindow(total, interval time.Duration) (*Window, error) {
	switch {
	case total == 0:
		return nil, errors.New("progress: total cannot be 0")
	case interval == 0:
		return nil, errors.New("progress: interval cannot be 0")
	case total < 0:
		// A negative duration could otherwise reach make() or
		// time.NewTicker and panic there.
		return nil, errors.New("progress: total cannot be negative")
	case interval < 0:
		return nil, errors.New("progress: interval cannot be negative")
	case total <= interval || total%interval != 0:
		return nil, errors.New("progress: total has to be a multiplier of interval")
	}

	slots := int(total / interval)
	cctx, cancel := context.WithCancel(context.Background())
	w := &Window{
		total:      total,
		interval:   interval,
		data:       make([]int64, slots),
		historical: make([]int64, slots),
		cancel:     cancel,
		sem:        semaphore.NewWeighted(1),
	}

	// -1 marks a slot as not yet written so it is left out of the averages.
	for i := 0; i < slots; i++ {
		w.data[i] = -1
		w.historical[i] = -1
	}

	go w.tick(cctx)
	return w, nil
}
// tick advances the window once per interval until ctx is cancelled. The
// ticker is stopped when the loop exits.
func (w *Window) tick(ctx context.Context) {
	t := time.NewTicker(w.interval)
	defer t.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-t.C:
			w.move(ctx)
		}
	}
}
// move closes the slot that is currently being counted and recomputes the
// average.
//
// The item count gathered since the previous call is stored in data at pos.
// The mean of all written data slots is stored in historical at pos, and the
// mean of all written historical slots, multiplied by the number of slots,
// becomes the new average. pos then advances, wrapping to 0 at the end, and
// the running count is reset.
//
// If the semaphore cannot be acquired because ctx is done, move returns
// without changing anything.
func (w *Window) move(ctx context.Context) {
	// On failure Acquire leaves the semaphore unchanged, so the state must not
	// be touched and Release must not be called.
	if err := w.sem.Acquire(ctx, 1); err != nil {
		return
	}
	defer w.sem.Release(1)

	w.data[w.pos] = w.current
	w.historical[w.pos] = meanOfWritten(w.data)
	w.average = meanOfWritten(w.historical) * int64(len(w.historical))

	w.pos = (w.pos + 1) % len(w.historical)
	w.current = 0
}

// meanOfWritten returns the integer mean of the entries in vals that are not
// the -1 "never written" marker, or 0 when there are no such entries.
func meanOfWritten(vals []int64) int64 {
	var sum, n int64
	for _, v := range vals {
		if v == -1 {
			continue
		}
		sum += v
		n++
	}
	if n == 0 {
		return 0
	}
	return sum / n
}
// ItemCompleted records one finished item in the slot that is currently being
// counted.
func (w *Window) ItemCompleted() {
	w.sem.Acquire(context.Background(), 1)
	defer w.sem.Release(1)

	w.current++
}
// Average returns the most recently computed processing speed: the doubly
// averaged per-interval item count scaled to the full window, i.e. items per
// total duration. It is 0 until the first interval has elapsed.
//
// The value is read under the same semaphore the ticking goroutine holds
// while writing it, so Average is safe to call concurrently with updates.
func (w *Window) Average() int64 {
	// The background context is never done, so this Acquire cannot fail.
	_ = w.sem.Acquire(context.Background(), 1)
	defer w.sem.Release(1)

	return w.average
}
// End stops the Window's ticking by cancelling the context its background
// goroutine runs under; that goroutine then stops its ticker and exits.
func (w *Window) End() {
	stop := w.cancel
	stop()
}