-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.go
More file actions
131 lines (109 loc) · 3.08 KB
/
worker.go
File metadata and controls
131 lines (109 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package tq
import (
"context"
"fmt"
"log"
"sync"
)
// pool coordinates a fixed-size set of worker goroutines that consume
// tasks from one distributed stream and publish processed tasks onto
// another. Construct with NewWorkerPool, register handlers with
// Register, then run with Serve.
type pool struct {
	// Number of workers to spin up.
	count int
	// Pull tasks from distributed stream.
	tasks Stream
	// Push results onto distributed stream.
	results Stream
	// Multiplex a set of task handlers on startup.
	handler *serveMux
}
// NewWorkerPool builds a pool that will run count workers, dequeuing
// from the tasks stream and enqueueing processed tasks onto results.
// Handlers must still be registered via Register before calling Serve.
func NewWorkerPool(tasks, results Stream, count int) pool {
	p := pool{
		count:   count,
		tasks:   tasks,
		results: results,
	}
	p.handler = NewServeMux()
	return p
}
// Register associates the handler with key on the pool's internal mux;
// workers dispatch each dequeued task to the handler matching its key.
func (s *pool) Register(key string, handler func(context.Context, *Task) *Result) {
	mux := s.handler
	mux.Register(key, handler)
}
// Serve launches the pool's worker pipeline and blocks until every
// spawned goroutine has finished (which happens once ctx is cancelled).
func (s *pool) Serve(ctx context.Context) {
	log.Println("Serving workers")

	var workers sync.WaitGroup
	s.process(ctx, &workers)
	workers.Wait()
}
// process wires up the pipeline: for each of s.count workers it starts
// a dequeue goroutine (broker -> local channel), a publish goroutine
// (local channel -> broker), and the worker itself. Every goroutine is
// registered on wg and exits when ctx is cancelled.
//
// NOTE(review): dequeue/enqueue failures call log.Fatal*, which kills
// the whole process on a possibly transient broker error — confirm that
// is the intended policy.
func (s *pool) process(ctx context.Context, wg *sync.WaitGroup) {
	// Pull events from broker into local channels.
	// Defining a function creates a smaller lexical scope for confinement.
	taskStream := func() <-chan *Task {
		wg.Add(1)
		stream := make(chan *Task)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				default:
					msgs, err := s.tasks.Dequeue(ctx)
					if err != nil {
						log.Fatalln(err)
					}
					for _, t := range msgs {
						// Guard the send with ctx so this goroutine cannot
						// block forever once the workers have shut down.
						select {
						case stream <- t:
						case <-ctx.Done():
							return
						}
					}
				}
			}
		}()
		return stream
	}
	// Push results from channel onto distributed stream.
	// Defining a function creates a smaller lexical scope for confinement.
	resultStream := func() chan<- *Task {
		wg.Add(1)
		stream := make(chan *Task)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case t := <-stream:
					// Enqueue the processed task onto the result stream.
					err := s.results.Enqueue(ctx, t)
					if err != nil {
						log.Fatalf("enqueue result: %v", err)
					}
					// Send ack to task stream that process results are on the result stream.
					//err := s.tasks.Ack(ctx, r.Id)
					//if err != nil {
					//	log.Fatalf("unable to ack task: %v", err)
					//}
				}
			}
		}()
		return stream
	}
	// Each worker gets its own dedicated dequeue and publish goroutines.
	for i := 0; i < s.count; i++ {
		wg.Add(1)
		go worker(ctx, wg, s.handler, taskStream(), resultStream())
	}
}
// Ensure confinement by keeping the concurrent scope small.
//
// worker processes tasks from the tasks channel until it closes or ctx
// is cancelled, dispatching each task through handler and forwarding
// the processed task onto results. wg.Done is signalled on exit.
func worker(ctx context.Context, wg *sync.WaitGroup, handler *serveMux, tasks <-chan *Task, results chan<- *Task) {
	defer wg.Done()
	for {
		select {
		case task, ok := <-tasks:
			if !ok {
				return
			}
			// Fan-In job execution multiplexing results into the results channel.
			task.Result = *handler.ProcessTask(ctx, task)
			// Guard the send with ctx so a worker cannot block forever on a
			// results channel whose consumer has already exited.
			select {
			case results <- task:
			case <-ctx.Done():
				fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
				return
			}
		case <-ctx.Done():
			fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
			return
		}
	}
}