-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgroup.go
More file actions
81 lines (69 loc) · 2.68 KB
/
group.go
File metadata and controls
81 lines (69 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package lifecycle
import (
"context"
"fmt"
"time"
"github.com/aretw0/lifecycle/pkg/core/metrics"
"github.com/aretw0/lifecycle/pkg/core/runtime"
"golang.org/x/sync/errgroup"
)
// Group wraps errgroup.Group and layers lifecycle safety features on top of it:
//   - Panic Recovery: a panic inside a goroutine is captured and returned as an error.
//   - Observability: started/finished goroutines are tracked via pkg/metrics.
//   - Context Propagation: the group's context is inherited from the parent.
type Group struct {
	// g is the wrapped errgroup that runs the goroutines and collects their errors.
	g *errgroup.Group
	// ctx is the context derived in NewGroup; Go passes it to runtime.Do
	// together with the user function.
	ctx context.Context
}
// NewGroup builds a Group whose context is derived from ctx, mirroring
// errgroup.WithContext. It returns the Group together with that derived
// context, which is the same context the Group keeps for its goroutines.
func NewGroup(ctx context.Context) (*Group, context.Context) {
	eg, derived := errgroup.WithContext(ctx)
	grp := &Group{
		g:   eg,
		ctx: derived,
	}
	return grp, derived
}
// SetLimit caps the number of goroutines that may be active in this group at
// the same time by forwarding n to the wrapped errgroup.Group.
// Pass a negative n to remove the cap.
func (g *Group) SetLimit(n int) {
	inner := g.g
	inner.SetLimit(n)
}
// Go runs fn in a new goroutine managed by the group, executing it through
// runtime.Do with the group's context.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// If fn panics, the panic is recovered and turned into an error returned to
// the underlying errgroup, which will cancel the Group's context. When the
// panic value is itself an error it is wrapped with %w, so callers can still
// match the original cause with errors.Is / errors.As; any other value is
// formatted with %v. The error message is the same in both cases.
//
// Metrics reported through metrics.GetProvider():
//   - waiting is incremented before scheduling and decremented once the
//     goroutine starts running;
//   - started / finished are recorded around the execution of fn;
//   - panicked is recorded when a panic is recovered;
//   - the block duration is observed when scheduling took longer than 100µs.
func (g *Group) Go(fn func(ctx context.Context) error) {
	// Scheduling delays at or below this threshold are treated as noise and
	// are not reported as backpressure.
	const blockThreshold = 100 * time.Microsecond

	// Taken before scheduling so the elapsed time covers any wait on the limit.
	start := time.Now()

	// Signal that we are attempting to schedule (entering the wait queue).
	metrics.GetProvider().IncGoroutineWaiting()

	// Goroutine semantics are tracked explicitly here because runtime.Do
	// tracks "Critical Operations", which is slightly different.
	g.g.Go(func() (err error) {
		metrics.GetProvider().DecGoroutineWaiting()
		metrics.GetProvider().IncGoroutineStarted()
		// Deferred first so it runs last, after the panic handler below.
		defer metrics.GetProvider().IncGoroutineFinished()

		defer func() {
			r := recover()
			if r == nil {
				return
			}
			// NOTE(review): the original comment states runtime.Do() has
			// already logged the panic at this point — confirm against
			// runtime.Do. Here the panic is only converted into an error for
			// the errgroup, which will cancel the context.
			metrics.GetProvider().IncGoroutinePanicked()
			if cause, ok := r.(error); ok {
				// Keep the error chain intact for errors.Is / errors.As.
				err = fmt.Errorf("panic in lifecycle.Group: %w", cause)
				return
			}
			err = fmt.Errorf("panic in lifecycle.Group: %v", r)
		}()

		// Use runtime.Do for execution safety.
		return runtime.Do(g.ctx, fn)
	})

	// Measure backpressure (time spent blocked by SetLimit).
	// Note: errgroup.Go returns when the goroutine is scheduled.
	if d := time.Since(start); d > blockThreshold {
		metrics.GetProvider().ObserveGoroutineBlockDuration(d)
	}
}
// Wait blocks until every function started through the Go method has
// returned. It then yields the first non-nil error reported by any of them,
// or nil when none of them failed.
func (g *Group) Wait() error {
	err := g.g.Wait()
	return err
}