-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathrejected_test.go
More file actions
108 lines (103 loc) · 3.02 KB
/
rejected_test.go
File metadata and controls
108 lines (103 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package executor
import (
"github.com/stretchr/testify/require"
"sync/atomic"
"testing"
"time"
)
// TestAbortPolicy 测试AbortPolicy 放弃任务执行,直接返回错误信息,默认的拒绝策略
func TestAbortPolicy(t *testing.T) {
executor := NewExecutor(WithCorePoolSize(1), WithTaskBlockingQueue(NewLinkedBlockingRunnableQueue(1)), WithRejectedHandler(AbortPolicy))
number := atomic.Int32{}
executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
f, _ := executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
_, err := executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
require.NotNil(t, err)
f.Get()
//任务应该执行了2次,第三次返回异常
require.Equal(t, int32(2), number.Load())
}
// TestCallerRunsPolicy 测试CallerRunsPolicy 用当前提交的协程去进行执行
func TestCallerRunsPolicy(t *testing.T) {
executor := NewExecutor(WithCorePoolSize(1), WithTaskBlockingQueue(NewLinkedBlockingRunnableQueue(1)), WithRejectedHandler(CallerRunsPolicy))
executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
})
executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
})
var id int64
_, err := executor.Execute(func() {
id = GetGoroutineID()
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
})
time.Sleep(200)
gid := GetGoroutineID()
//协程ID应该相同
require.Equal(t, gid, id)
//不返回错误
require.Nil(t, err)
}
// TestBlockPolicy 测试BlockPolicy 阻塞拒绝处理器,阻塞调用方直到任务队列有空余
func TestBlockPolicy(t *testing.T) {
executor := NewExecutor(WithCorePoolSize(1), WithTaskBlockingQueue(NewLinkedBlockingRunnableQueue(1)), WithRejectedHandler(BlockPolicy))
number := atomic.Int32{}
executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
f, err := executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
require.Nil(t, err)
f.Get()
//任务应该都被执行
require.Equal(t, int32(3), number.Load())
}
// TestDiscardOldestPolicy 测试DiscardOldestPolicy 丢弃最老的任务
func TestDiscardOldestPolicy(t *testing.T) {
executor := NewExecutor(WithCorePoolSize(1), WithTaskBlockingQueue(NewLinkedBlockingRunnableQueue(1)), WithRejectedHandler(DiscardOldestPolicy))
number := atomic.Int32{}
executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(1)
})
f, err := executor.Execute(func() {
//模拟任务耗时
time.Sleep(100 * time.Millisecond)
number.Add(2)
})
require.Nil(t, err)
f.Get()
//如果第二个被丢弃了,应该值是3
require.Equal(t, int32(3), number.Load())
}