-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoptions.go
More file actions
130 lines (112 loc) · 3.34 KB
/
options.go
File metadata and controls
130 lines (112 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package crank
import (
"time"
"github.com/ogwurujohnson/crank/internal/broker"
)
// Option configures the engine and client created by New.
type Option func(*options)
type options struct {
brokerKind string // "redis", "nats", "pgsql"; must be set explicitly via WithBroker
customBroker broker.Broker // user-provided broker implementation; takes precedence over brokerKind
concurrency int
timeout time.Duration
queues []queueOpt
logger Logger
retryPollInterval time.Duration
reaperInterval time.Duration
redisTimeout time.Duration
useTLS bool
tlsInsecureSkip bool
}
type queueOpt struct {
Name string
Weight int
}
// QueueOption defines a queue name and its polling weight (higher = polled more often).
type QueueOption struct {
Name string
Weight int
}
// WithBroker sets the broker backend explicitly ("redis", "nats", "pgsql").
// Either WithBroker or WithCustomBroker must be provided; there is no default.
func WithBroker(kind string) Option {
return func(o *options) {
o.brokerKind = kind
}
}
// WithCustomBroker injects a user-provided Broker implementation.
// When set, brokerURL and WithBroker are ignored and this broker is used directly.
func WithCustomBroker(b broker.Broker) Option {
return func(o *options) {
o.customBroker = b
}
}
// WithConcurrency sets the number of concurrent workers. Default is 10.
func WithConcurrency(n int) Option {
return func(o *options) {
o.concurrency = n
}
}
// WithTimeout sets the per-job execution timeout. Default is 8 seconds.
func WithTimeout(d time.Duration) Option {
return func(o *options) {
o.timeout = d
}
}
// WithQueues sets the queues to poll. Default is a single queue named "default" with weight 1.
func WithQueues(qs ...QueueOption) Option {
return func(o *options) {
o.queues = make([]queueOpt, len(qs))
for i, q := range qs {
o.queues[i] = queueOpt{Name: q.Name, Weight: q.Weight}
if o.queues[i].Weight <= 0 {
o.queues[i].Weight = 1
}
}
}
}
// WithLogger sets the logger for the engine. If not set, a no-op logger is used.
func WithLogger(l Logger) Option {
return func(o *options) {
o.logger = l
}
}
// WithRetryPollInterval sets how often the retry set is checked. Default is 5 seconds.
func WithRetryPollInterval(d time.Duration) Option {
return func(o *options) {
o.retryPollInterval = d
}
}
// WithReaperInterval sets how often the reaper checks for orphaned jobs. Default is 30 seconds.
func WithReaperInterval(d time.Duration) Option {
return func(o *options) {
o.reaperInterval = d
}
}
// WithRedisTimeout sets the Redis connection/read/write timeout. Default is 5 seconds.
func WithRedisTimeout(d time.Duration) Option {
return func(o *options) {
o.redisTimeout = d
}
}
// WithTLS enables TLS for the Redis connection when the URL does not use rediss://.
func WithTLS(use bool) Option {
return func(o *options) {
o.useTLS = use
}
}
// WithTLSInsecureSkipVerify skips TLS certificate verification (insecure).
func WithTLSInsecureSkipVerify(skip bool) Option {
return func(o *options) {
o.tlsInsecureSkip = skip
}
}
func defaultOptions() options {
return options{
concurrency: 10,
timeout: 8 * time.Second,
queues: []queueOpt{{Name: "default", Weight: 1}},
retryPollInterval: 5 * time.Second,
redisTimeout: 5 * time.Second,
}
}