-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworker.go
More file actions
167 lines (135 loc) · 4.34 KB
/
worker.go
File metadata and controls
167 lines (135 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package temporal
import (
"context"
"sync"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"github.com/jasoet/pkg/v2/otel"
)
// WorkerManager owns a single Temporal client and the set of workers created
// through Register, and coordinates their startup (Start/StartAll) and
// shutdown (Close).
type WorkerManager struct {
	// client is the shared Temporal client used by every registered worker;
	// it is created in NewWorkerManager and closed in Close.
	client client.Client
	// mu guards workers against concurrent Register/iteration.
	mu sync.RWMutex
	// workers holds every worker created via Register, in registration order.
	workers []worker.Worker
}
// NewWorkerManager builds a WorkerManager backed by a freshly created
// Temporal client for the given configuration. It returns an error when the
// underlying client cannot be created.
func NewWorkerManager(config *Config) (*WorkerManager, error) {
	logger := otel.NewLogHelper(context.Background(), nil, "github.com/jasoet/pkg/v2/temporal", "temporal.NewWorkerManager")
	logger.Debug("Creating new Worker Manager",
		otel.F("hostPort", config.HostPort),
		otel.F("namespace", config.Namespace))

	c, err := NewClient(config)
	if err != nil {
		logger.Error(err, "Failed to create Temporal client for Worker Manager")
		return nil, err
	}

	logger.Debug("Worker Manager created successfully")
	manager := &WorkerManager{
		client:  c,
		workers: make([]worker.Worker, 0),
	}
	return manager, nil
}
// Close stops every registered worker and then closes the underlying
// Temporal client. It is safe to call when no workers have been registered.
func (wm *WorkerManager) Close() {
	ctx := context.Background()
	logger := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkerManager.Close")

	// Snapshot the worker list under a single lock acquisition. The previous
	// code read the count under one RLock and iterated under a second one;
	// workers registered between the two acquisitions made the logged count
	// and the stop loop disagree. A copy also keeps Stop() calls outside the
	// critical section so Register is never blocked by slow shutdowns.
	wm.mu.RLock()
	workers := make([]worker.Worker, len(wm.workers))
	copy(workers, wm.workers)
	wm.mu.RUnlock()

	logger.Debug("Closing Worker Manager", otel.F("workerCount", len(workers)))
	if len(workers) > 0 {
		logger.Debug("Stopping all workers")
		for i, w := range workers {
			logger.Debug("Stopping worker", otel.F("workerIndex", i))
			w.Stop()
		}
		logger.Debug("All workers stopped")
	} else {
		logger.Debug("No workers to stop")
	}

	if wm.client != nil {
		logger.Debug("Closing Temporal client")
		wm.client.Close()
	}
	logger.Debug("Worker Manager closed")
}
// Register creates a worker bound to taskQueue with the given options,
// records it in the manager, and returns it so the caller can register
// workflows and activities before starting it.
func (wm *WorkerManager) Register(taskQueue string, options worker.Options) worker.Worker {
	logger := otel.NewLogHelper(context.Background(), nil, "github.com/jasoet/pkg/v2/temporal", "WorkerManager.Register")
	logger.Debug("Registering new Temporal worker", otel.F("taskQueue", taskQueue))

	logger.Debug("Creating worker instance")
	newWorker := worker.New(wm.client, taskQueue, options)

	// Track the worker under the write lock; capture the new total while
	// still holding it so the logged value is consistent.
	wm.mu.Lock()
	wm.workers = append(wm.workers, newWorker)
	total := len(wm.workers)
	wm.mu.Unlock()

	logger.Debug("Worker registered successfully",
		otel.F("taskQueue", taskQueue),
		otel.F("totalWorkers", total))
	return newWorker
}
// Start starts the given worker. The ctx parameter is used for logging only;
// the worker's internal lifecycle is managed by the Temporal SDK.
func (wm *WorkerManager) Start(ctx context.Context, w worker.Worker) error {
	logger := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkerManager.Start")

	// Resolve the worker's position in the registered list purely so the
	// log line can carry an index; an unregistered worker is still started.
	idx := -1
	wm.mu.RLock()
	for pos, candidate := range wm.workers {
		if candidate == w {
			idx = pos
			break
		}
	}
	wm.mu.RUnlock()

	if idx < 0 {
		logger.Debug("Starting Temporal worker")
	} else {
		logger.Debug("Starting Temporal worker", otel.F("workerIndex", idx))
	}

	if err := w.Start(); err != nil {
		logger.Error(err, "Failed to start Temporal worker")
		return err
	}
	logger.Debug("Temporal worker started successfully")
	return nil
}
// StartAll starts every registered worker in registration order. It returns
// the first start error encountered; workers that were already started are
// left running — callers should Close() the manager to stop them.
func (wm *WorkerManager) StartAll(ctx context.Context) error {
	logger := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkerManager.StartAll")

	// Snapshot under the lock instead of holding mu.RLock across every
	// worker.Start call: Start can be slow (it dials Temporal), and the
	// previous code blocked Register/Close/GetWorkers for the whole startup
	// and relied on a manual RUnlock on the error path.
	wm.mu.RLock()
	workers := make([]worker.Worker, len(wm.workers))
	copy(workers, wm.workers)
	wm.mu.RUnlock()

	workerCount := len(workers)
	logger.Debug("Starting all Temporal workers", otel.F("workerCount", workerCount))
	if workerCount == 0 {
		logger.Warn("No workers to start")
		return nil
	}

	for i, w := range workers {
		logger.Debug("Starting worker", otel.F("workerIndex", i))
		if err := w.Start(); err != nil {
			logger.Error(err, "Failed to start worker", otel.F("workerIndex", i))
			return err
		}
		logger.Debug("Worker started successfully", otel.F("workerIndex", i))
	}

	logger.Debug("All Temporal workers started successfully", otel.F("workerCount", workerCount))
	return nil
}
// GetClient returns the internal Temporal client. Callers must not close this
// client independently; use Close() on the manager instead.
func (wm *WorkerManager) GetClient() client.Client {
	return wm.client
}
// GetWorkers returns a snapshot of the registered workers. The returned
// slice is a copy, so callers may iterate it freely without racing against
// concurrent Register calls.
func (wm *WorkerManager) GetWorkers() []worker.Worker {
	wm.mu.RLock()
	defer wm.mu.RUnlock()
	snapshot := append([]worker.Worker(nil), wm.workers...)
	return snapshot
}