-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworkflow.go
More file actions
545 lines (448 loc) · 20 KB
/
workflow.go
File metadata and controls
545 lines (448 loc) · 20 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
package temporal
import (
"context"
"fmt"
"regexp"
"sort"
"time"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"github.com/jasoet/pkg/v2/otel"
)
// WorkflowManager provides workflow query and management operations on top of
// a Temporal client.
type WorkflowManager struct {
// client is the Temporal client every manager method calls through.
client client.Client
// ownsClient is true when the manager created the client itself; Close only
// closes the client in that case.
ownsClient bool
// namespace is set on the list, count and history requests the manager
// builds for the raw workflow service.
namespace string
}
// WorkflowDetails contains detailed information about a workflow execution.
type WorkflowDetails struct {
// WorkflowID and RunID together identify the execution.
WorkflowID string
RunID string
// WorkflowType is the name of the workflow type.
WorkflowType string
// Status is the execution status reported by the server.
Status enums.WorkflowExecutionStatus
// StartTime is the reported start time of the execution.
StartTime time.Time
// CloseTime is left at its zero value when the server reports no close time.
CloseTime time.Time
// ExecutionTime is CloseTime minus StartTime; it stays zero when there is no
// close time.
ExecutionTime time.Duration
// HistoryLength is the history length reported by the server.
HistoryLength int64
}
// DashboardStats provides aggregated workflow statistics, as filled in by
// GetDashboardStats.
type DashboardStats struct {
// The Total* fields are per-status workflow counts.
TotalRunning int64
TotalCompleted int64
TotalFailed int64
TotalCanceled int64
TotalTerminated int64
// AverageDuration is the mean ExecutionTime over a single page of at most 100
// completed workflows; it stays zero when that page is empty or cannot be
// listed.
AverageDuration time.Duration
}
// safeIdentifier matches non-empty strings made up solely of ASCII letters,
// digits, dots, underscores and hyphens.
var safeIdentifier = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)

// validateQueryParam returns nil when param matches safeIdentifier and a
// descriptive error otherwise. It is used to vet values before they are
// spliced into a quoted visibility query.
func validateQueryParam(param string) error {
	if safeIdentifier.MatchString(param) {
		return nil
	}
	return fmt.Errorf("invalid query parameter %q: must contain only alphanumeric characters, hyphens, underscores, and dots", param)
}
// NewWorkflowManagerWithNamespace creates a WorkflowManager from either an
// existing client.Client or a *Config.
//
// With a client.Client, the manager uses that client as-is, records the given
// namespace, and leaves ownership with the caller (Close will not close it).
// With a *Config, a new client is created through NewClient, the namespace is
// taken from the config (the namespace parameter is ignored), and the manager
// owns the client.
//
// An error is returned when the *Config is nil, when the client cannot be
// created, or when clientOrConfig has any other type (including untyped nil).
func NewWorkflowManagerWithNamespace(clientOrConfig interface{}, namespace string) (*WorkflowManager, error) {
	ctx := context.Background()
	logger := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "temporal.NewWorkflowManagerWithNamespace")

	var temporalClient client.Client
	var ownsClient bool

	switch v := clientOrConfig.(type) {
	case client.Client:
		// If passed a client directly, use it (caller retains ownership)
		temporalClient = v
		ownsClient = false
		logger.Debug("Using provided Temporal client for Workflow Manager", otel.F("namespace", namespace))

	case *Config:
		// A typed nil *Config still matches this case; reject it here rather
		// than panicking on the field accesses below.
		if v == nil {
			nilErr := fmt.Errorf("invalid argument: nil *Config")
			logger.Error(nilErr, "Nil config passed to NewWorkflowManagerWithNamespace")
			return nil, nilErr
		}

		// If passed a config, create a new client (we own it)
		namespace = v.Namespace
		logger.Debug("Creating new Workflow Manager with config",
			otel.F("hostPort", v.HostPort),
			otel.F("namespace", namespace))

		var err error
		temporalClient, err = NewClient(v)
		if err != nil {
			logger.Error(err, "Failed to create Temporal client for Workflow Manager")
			return nil, fmt.Errorf("create temporal client: %w", err)
		}
		ownsClient = true

	default:
		logger.Error(nil, "Invalid argument type for NewWorkflowManagerWithNamespace")
		return nil, fmt.Errorf("invalid argument type: expected client.Client or *Config")
	}

	logger.Debug("Workflow Manager created successfully")
	return &WorkflowManager{
		client:     temporalClient,
		ownsClient: ownsClient,
		namespace:  namespace,
	}, nil
}
// NewWorkflowManager builds a WorkflowManager from either a client.Client or
// a *Config by delegating to NewWorkflowManagerWithNamespace with the
// namespace "default". Call NewWorkflowManagerWithNamespace directly to pair
// an existing client with another namespace.
func NewWorkflowManager(clientOrConfig interface{}) (*WorkflowManager, error) {
	const defaultNamespace = "default"
	return NewWorkflowManagerWithNamespace(clientOrConfig, defaultNamespace)
}
// Close shuts the manager down. The underlying Temporal client is closed only
// when the manager created it; a client supplied by the caller is left open.
func (wm *WorkflowManager) Close() {
	log := otel.NewLogHelper(context.Background(), nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.Close")
	log.Debug("Closing Workflow Manager")

	ownedClient := wm.ownsClient && wm.client != nil
	if ownedClient {
		log.Debug("Closing Temporal client")
		wm.client.Close()
	}

	log.Debug("Workflow Manager closed")
}
// GetClient returns the Temporal client used by the manager.
//
// When the manager created the client itself (from a *Config), Close on the
// manager closes it, so callers must not close it independently. When the
// client was supplied by the caller, the manager never closes it and the
// caller remains responsible for it.
func (wm *WorkflowManager) GetClient() client.Client {
return wm.client
}
// ListWorkflows returns one page of workflow executions in the manager's
// namespace that match the given visibility query.
//
// pageSize is sent to the server as the requested page size; values outside
// the int32 range are clamped to that range instead of wrapping around. query
// is passed through unchanged (an empty string applies no filter). Only a
// single request is made: any next-page token in the response is not
// followed, so at most one page of results is returned.
//
// For each execution, CloseTime and ExecutionTime are set only when the
// server reports a close time. Missing nested messages yield zero values for
// the corresponding fields rather than a panic.
func (wm *WorkflowManager) ListWorkflows(ctx context.Context, pageSize int, query string) ([]*WorkflowDetails, error) {
	logger := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.ListWorkflows")
	logger.Debug("Listing workflows",
		otel.F("pageSize", pageSize),
		otel.F("query", query))

	// The request field is an int32. Clamp so that a large int does not wrap
	// around silently on platforms where int is 64 bits wide.
	const (
		maxPageSize = 1<<31 - 1
		minPageSize = -1 << 31
	)
	size := pageSize
	if size > maxPageSize {
		size = maxPageSize
	} else if size < minPageSize {
		size = minPageSize
	}

	request := &workflowservice.ListWorkflowExecutionsRequest{
		Namespace: wm.namespace,
		PageSize:  int32(size),
		Query:     query,
	}

	response, err := wm.client.WorkflowService().ListWorkflowExecutions(ctx, request)
	if err != nil {
		logger.Error(err, "Failed to list workflow executions")
		return nil, fmt.Errorf("list workflow executions: %w", err)
	}

	executions := response.GetExecutions()
	workflows := make([]*WorkflowDetails, 0, len(executions))
	for _, exec := range executions {
		// Generated getters tolerate nil receivers, so an entry with a missing
		// Execution or Type message produces empty strings instead of a panic.
		details := &WorkflowDetails{
			WorkflowID:    exec.GetExecution().GetWorkflowId(),
			RunID:         exec.GetExecution().GetRunId(),
			WorkflowType:  exec.GetType().GetName(),
			Status:        exec.GetStatus(),
			StartTime:     exec.GetStartTime().AsTime(),
			HistoryLength: exec.GetHistoryLength(),
		}

		if closeTime := exec.GetCloseTime(); closeTime != nil {
			details.CloseTime = closeTime.AsTime()
			details.ExecutionTime = details.CloseTime.Sub(details.StartTime)
		}

		workflows = append(workflows, details)
	}

	logger.Debug("Workflows listed successfully", otel.F("count", len(workflows)))
	return workflows, nil
}
// DescribeWorkflow retrieves detailed information about a specific workflow
// execution identified by workflowID and runID.
//
// It returns an error when the describe call fails or when the response
// carries no execution info. CloseTime and ExecutionTime are set only when
// the server reports a close time. Missing nested messages yield zero values
// for the corresponding fields rather than a panic.
func (wm *WorkflowManager) DescribeWorkflow(ctx context.Context, workflowID, runID string) (*WorkflowDetails, error) {
	logger := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.DescribeWorkflow")
	logger.Debug("Describing workflow",
		otel.F("workflowID", workflowID),
		otel.F("runID", runID))

	response, err := wm.client.DescribeWorkflowExecution(ctx, workflowID, runID)
	if err != nil {
		logger.Error(err, "Failed to describe workflow execution",
			otel.F("workflowID", workflowID),
			otel.F("runID", runID))
		return nil, fmt.Errorf("describe workflow %q: %w", workflowID, err)
	}

	// Guard against a response without execution info; dereferencing it
	// directly would panic.
	info := response.GetWorkflowExecutionInfo()
	if info == nil {
		missingErr := fmt.Errorf("describe workflow %q: response has no execution info", workflowID)
		logger.Error(missingErr, "Failed to describe workflow execution",
			otel.F("workflowID", workflowID),
			otel.F("runID", runID))
		return nil, missingErr
	}

	details := &WorkflowDetails{
		WorkflowID:    info.GetExecution().GetWorkflowId(),
		RunID:         info.GetExecution().GetRunId(),
		WorkflowType:  info.GetType().GetName(),
		Status:        info.GetStatus(),
		StartTime:     info.GetStartTime().AsTime(),
		HistoryLength: info.GetHistoryLength(),
	}

	if closeTime := info.GetCloseTime(); closeTime != nil {
		details.CloseTime = closeTime.AsTime()
		details.ExecutionTime = details.CloseTime.Sub(details.StartTime)
	}

	logger.Debug("Workflow described successfully",
		otel.F("workflowID", workflowID),
		otel.F("status", details.Status))

	return details, nil
}
// GetWorkflowStatus reports the execution status of the workflow identified by
// workflowID and runID. It delegates to DescribeWorkflow; when that fails the
// error is returned unchanged together with
// WORKFLOW_EXECUTION_STATUS_UNSPECIFIED.
func (wm *WorkflowManager) GetWorkflowStatus(ctx context.Context, workflowID, runID string) (enums.WorkflowExecutionStatus, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.GetWorkflowStatus")
	log.Debug("Getting workflow status", otel.F("workflowID", workflowID), otel.F("runID", runID))

	described, err := wm.DescribeWorkflow(ctx, workflowID, runID)
	if err != nil {
		log.Error(err, "Failed to get workflow status", otel.F("workflowID", workflowID))
		return enums.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED, err
	}

	current := described.Status
	log.Debug("Workflow status retrieved", otel.F("workflowID", workflowID), otel.F("status", current))
	return current, nil
}
// GetWorkflowHistory retrieves event history for the workflow execution
// identified by workflowID and runID in the manager's namespace.
//
// A single request is made with no page token set, and the server's response
// is returned unmodified, so the result holds only what the server sends for
// that one request.
func (wm *WorkflowManager) GetWorkflowHistory(ctx context.Context, workflowID, runID string) (*workflowservice.GetWorkflowExecutionHistoryResponse, error) {
	logger := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.GetWorkflowHistory")
	logger.Debug("Getting workflow history",
		otel.F("workflowID", workflowID),
		otel.F("runID", runID))

	request := &workflowservice.GetWorkflowExecutionHistoryRequest{
		Namespace: wm.namespace,
		Execution: &common.WorkflowExecution{
			WorkflowId: workflowID,
			RunId:      runID,
		},
	}

	response, err := wm.client.WorkflowService().GetWorkflowExecutionHistory(ctx, request)
	if err != nil {
		logger.Error(err, "Failed to get workflow history",
			otel.F("workflowID", workflowID))
		return nil, fmt.Errorf("get workflow history %q: %w", workflowID, err)
	}

	// Use the nil-safe getters so a response without a History message cannot
	// panic while computing the log field.
	logger.Debug("Workflow history retrieved successfully",
		otel.F("workflowID", workflowID),
		otel.F("eventCount", len(response.GetHistory().GetEvents())))

	return response, nil
}
// CancelWorkflow asks the Temporal client to cancel the workflow execution
// identified by workflowID and runID. A client failure is logged and returned
// wrapped with the workflow ID.
func (wm *WorkflowManager) CancelWorkflow(ctx context.Context, workflowID, runID string) error {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.CancelWorkflow")
	log.Debug("Canceling workflow", otel.F("workflowID", workflowID), otel.F("runID", runID))

	if err := wm.client.CancelWorkflow(ctx, workflowID, runID); err != nil {
		log.Error(err, "Failed to cancel workflow", otel.F("workflowID", workflowID))
		return fmt.Errorf("cancel workflow %q: %w", workflowID, err)
	}

	log.Debug("Workflow canceled successfully", otel.F("workflowID", workflowID))
	return nil
}
// TerminateWorkflow asks the Temporal client to terminate the workflow
// execution identified by workflowID and runID, passing reason along. A
// client failure is logged and returned wrapped with the workflow ID.
func (wm *WorkflowManager) TerminateWorkflow(ctx context.Context, workflowID, runID, reason string) error {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.TerminateWorkflow")
	log.Debug("Terminating workflow",
		otel.F("workflowID", workflowID), otel.F("runID", runID), otel.F("reason", reason))

	if err := wm.client.TerminateWorkflow(ctx, workflowID, runID, reason); err != nil {
		log.Error(err, "Failed to terminate workflow", otel.F("workflowID", workflowID))
		return fmt.Errorf("terminate workflow %q: %w", workflowID, err)
	}

	log.Debug("Workflow terminated successfully", otel.F("workflowID", workflowID))
	return nil
}
// SignalWorkflow sends the signal signalName, with arg as its payload, to the
// workflow execution identified by workflowID and runID. A client failure is
// logged and returned wrapped with the workflow ID and signal name.
func (wm *WorkflowManager) SignalWorkflow(ctx context.Context, workflowID, runID, signalName string, arg interface{}) error {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.SignalWorkflow")
	log.Debug("Signaling workflow",
		otel.F("workflowID", workflowID), otel.F("runID", runID), otel.F("signalName", signalName))

	if err := wm.client.SignalWorkflow(ctx, workflowID, runID, signalName, arg); err != nil {
		log.Error(err, "Failed to signal workflow",
			otel.F("workflowID", workflowID), otel.F("signalName", signalName))
		return fmt.Errorf("signal workflow %q with %q: %w", workflowID, signalName, err)
	}

	log.Debug("Workflow signaled successfully",
		otel.F("workflowID", workflowID), otel.F("signalName", signalName))
	return nil
}
// QueryWorkflow runs the query queryType, with the optional args, against the
// workflow execution identified by workflowID and runID and returns whatever
// the Temporal client hands back.
// NOTE(review): the result is presumably an encoded value the caller still
// has to decode — confirm against the SDK.
// A client failure is logged and returned wrapped with the workflow ID and
// query type.
func (wm *WorkflowManager) QueryWorkflow(ctx context.Context, workflowID, runID, queryType string, args ...interface{}) (interface{}, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.QueryWorkflow")
	log.Debug("Querying workflow",
		otel.F("workflowID", workflowID), otel.F("runID", runID), otel.F("queryType", queryType))

	answer, err := wm.client.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
	if err != nil {
		log.Error(err, "Failed to query workflow",
			otel.F("workflowID", workflowID), otel.F("queryType", queryType))
		return nil, fmt.Errorf("query workflow %q type %q: %w", workflowID, queryType, err)
	}

	log.Debug("Workflow queried successfully",
		otel.F("workflowID", workflowID), otel.F("queryType", queryType))
	return answer, nil
}
// ListWorkflowsByStatus lists workflows whose ExecutionStatus equals the
// string form of status. The status string is checked with
// validateQueryParam before being placed in the query; listing itself is
// delegated to ListWorkflows with the given pageSize.
func (wm *WorkflowManager) ListWorkflowsByStatus(ctx context.Context, status enums.WorkflowExecutionStatus, pageSize int) ([]*WorkflowDetails, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.ListWorkflowsByStatus")

	statusStr := status.String()
	log.Debug("Listing workflows by status", otel.F("status", statusStr), otel.F("pageSize", pageSize))

	if err := validateQueryParam(statusStr); err != nil {
		return nil, fmt.Errorf("invalid status: %w", err)
	}

	filter := "ExecutionStatus='" + statusStr + "'"
	return wm.ListWorkflows(ctx, pageSize, filter)
}
// ListRunningWorkflows lists workflows in the running state by delegating to
// ListWorkflowsByStatus with WORKFLOW_EXECUTION_STATUS_RUNNING and the given
// pageSize.
func (wm *WorkflowManager) ListRunningWorkflows(ctx context.Context, pageSize int) ([]*WorkflowDetails, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.ListRunningWorkflows")
	log.Debug("Listing running workflows", otel.F("pageSize", pageSize))

	const wanted = enums.WORKFLOW_EXECUTION_STATUS_RUNNING
	return wm.ListWorkflowsByStatus(ctx, wanted, pageSize)
}
// ListCompletedWorkflows lists workflows in the completed state by delegating
// to ListWorkflowsByStatus with WORKFLOW_EXECUTION_STATUS_COMPLETED and the
// given pageSize.
func (wm *WorkflowManager) ListCompletedWorkflows(ctx context.Context, pageSize int) ([]*WorkflowDetails, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.ListCompletedWorkflows")
	log.Debug("Listing completed workflows", otel.F("pageSize", pageSize))

	const wanted = enums.WORKFLOW_EXECUTION_STATUS_COMPLETED
	return wm.ListWorkflowsByStatus(ctx, wanted, pageSize)
}
// ListFailedWorkflows lists workflows in the failed state by delegating to
// ListWorkflowsByStatus with WORKFLOW_EXECUTION_STATUS_FAILED and the given
// pageSize.
func (wm *WorkflowManager) ListFailedWorkflows(ctx context.Context, pageSize int) ([]*WorkflowDetails, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.ListFailedWorkflows")
	log.Debug("Listing failed workflows", otel.F("pageSize", pageSize))

	const wanted = enums.WORKFLOW_EXECUTION_STATUS_FAILED
	return wm.ListWorkflowsByStatus(ctx, wanted, pageSize)
}
// SearchWorkflowsByType lists workflows whose WorkflowType equals
// workflowType. The name is checked with validateQueryParam before being
// placed in the query; listing itself is delegated to ListWorkflows with the
// given pageSize.
func (wm *WorkflowManager) SearchWorkflowsByType(ctx context.Context, workflowType string, pageSize int) ([]*WorkflowDetails, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.SearchWorkflowsByType")
	log.Debug("Searching workflows by type", otel.F("workflowType", workflowType), otel.F("pageSize", pageSize))

	if err := validateQueryParam(workflowType); err != nil {
		return nil, fmt.Errorf("invalid workflow type: %w", err)
	}

	filter := "WorkflowType='" + workflowType + "'"
	return wm.ListWorkflows(ctx, pageSize, filter)
}
// SearchWorkflowsByID lists workflows whose WorkflowId starts with
// workflowIDPrefix. The prefix is checked with validateQueryParam before
// being placed in the query; listing itself is delegated to ListWorkflows
// with the given pageSize.
func (wm *WorkflowManager) SearchWorkflowsByID(ctx context.Context, workflowIDPrefix string, pageSize int) ([]*WorkflowDetails, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.SearchWorkflowsByID")
	log.Debug("Searching workflows by ID", otel.F("workflowIDPrefix", workflowIDPrefix), otel.F("pageSize", pageSize))

	if err := validateQueryParam(workflowIDPrefix); err != nil {
		return nil, fmt.Errorf("invalid workflow ID prefix: %w", err)
	}

	filter := "WorkflowId STARTS_WITH '" + workflowIDPrefix + "'"
	return wm.ListWorkflows(ctx, pageSize, filter)
}
// CountWorkflows returns the count reported by the workflow service for the
// executions in the manager's namespace that match query. The query string is
// passed through unchanged. A service failure is logged and returned wrapped,
// with a count of 0.
func (wm *WorkflowManager) CountWorkflows(ctx context.Context, query string) (int64, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.CountWorkflows")
	log.Debug("Counting workflows", otel.F("query", query))

	response, err := wm.client.WorkflowService().CountWorkflowExecutions(ctx, &workflowservice.CountWorkflowExecutionsRequest{
		Namespace: wm.namespace,
		Query:     query,
	})
	if err != nil {
		log.Error(err, "Failed to count workflow executions")
		return 0, fmt.Errorf("count workflow executions: %w", err)
	}

	total := response.Count
	log.Debug("Workflows counted successfully", otel.F("count", total))
	return total, nil
}
// GetDashboardStats gathers per-status workflow counts and an average
// duration for completed workflows.
//
// Counts are fetched one status at a time (running, completed, failed,
// canceled, terminated) through CountWorkflows; the first failure aborts the
// call and is returned unchanged. AverageDuration is the mean ExecutionTime
// over one page of at most 100 completed workflows. If that listing fails the
// error is only logged and AverageDuration stays zero.
func (wm *WorkflowManager) GetDashboardStats(ctx context.Context) (*DashboardStats, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.GetDashboardStats")
	log.Debug("Getting dashboard statistics")

	stats := &DashboardStats{}

	// One row per counter: the status to count (its String() form goes into
	// the query), where to store the result, and the message logged on failure.
	counters := []struct {
		status  enums.WorkflowExecutionStatus
		dest    *int64
		failMsg string
	}{
		{enums.WORKFLOW_EXECUTION_STATUS_RUNNING, &stats.TotalRunning, "Failed to count running workflows"},
		{enums.WORKFLOW_EXECUTION_STATUS_COMPLETED, &stats.TotalCompleted, "Failed to count completed workflows"},
		{enums.WORKFLOW_EXECUTION_STATUS_FAILED, &stats.TotalFailed, "Failed to count failed workflows"},
		{enums.WORKFLOW_EXECUTION_STATUS_CANCELED, &stats.TotalCanceled, "Failed to count canceled workflows"},
		{enums.WORKFLOW_EXECUTION_STATUS_TERMINATED, &stats.TotalTerminated, "Failed to count terminated workflows"},
	}
	for _, c := range counters {
		total, err := wm.CountWorkflows(ctx, fmt.Sprintf("ExecutionStatus='%s'", c.status.String()))
		if err != nil {
			log.Error(err, c.failMsg)
			return nil, err
		}
		*c.dest = total
	}

	// Average duration is best effort: a listing failure is logged and skipped.
	sample, err := wm.ListCompletedWorkflows(ctx, 100)
	switch {
	case err != nil:
		log.Error(err, "Failed to list completed workflows for duration calculation")
	case len(sample) > 0:
		var sum time.Duration
		for i := range sample {
			sum += sample[i].ExecutionTime
		}
		stats.AverageDuration = sum / time.Duration(len(sample))
	}

	log.Debug("Dashboard statistics retrieved successfully",
		otel.F("running", stats.TotalRunning),
		otel.F("completed", stats.TotalCompleted),
		otel.F("failed", stats.TotalFailed))

	return stats, nil
}
// GetRecentWorkflows lists workflows with an empty query, using limit as the
// page size, and returns them sorted by StartTime with the newest first.
// NOTE(review): the sort only reorders the page the server returned; whether
// that page holds the most recent executions depends on the server's default
// ordering — confirm.
func (wm *WorkflowManager) GetRecentWorkflows(ctx context.Context, limit int) ([]*WorkflowDetails, error) {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.GetRecentWorkflows")
	log.Debug("Getting recent workflows", otel.F("limit", limit))

	recent, err := wm.ListWorkflows(ctx, limit, "")
	if err != nil {
		return nil, err
	}

	// Newest first: a sorts before b when b started earlier than a.
	startedLater := func(a, b int) bool {
		return recent[b].StartTime.Before(recent[a].StartTime)
	}
	sort.Slice(recent, startedLater)

	return recent, nil
}
// GetWorkflowResult obtains a handle for the workflow execution identified by
// workflowID and runID and calls Get on it with valuePtr as the destination
// for the result. A failure is logged and returned wrapped with the workflow
// ID.
func (wm *WorkflowManager) GetWorkflowResult(ctx context.Context, workflowID, runID string, valuePtr interface{}) error {
	log := otel.NewLogHelper(ctx, nil, "github.com/jasoet/pkg/v2/temporal", "WorkflowManager.GetWorkflowResult")
	log.Debug("Getting workflow result", otel.F("workflowID", workflowID), otel.F("runID", runID))

	handle := wm.client.GetWorkflow(ctx, workflowID, runID)
	if err := handle.Get(ctx, valuePtr); err != nil {
		log.Error(err, "Failed to get workflow result", otel.F("workflowID", workflowID))
		return fmt.Errorf("get workflow result %q: %w", workflowID, err)
	}

	log.Debug("Workflow result retrieved successfully", otel.F("workflowID", workflowID))
	return nil
}