Skip to content

Commit 43f3db9

Browse files
authored
Add heartbeat API every 5 seconds (#149)
1 parent 9597375 commit 43f3db9

File tree

8 files changed

+599
-4
lines changed

8 files changed

+599
-4
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Package heartbeatjob
2+
package heartbeatjob
3+
4+
import (
5+
"context"
6+
"sync"
7+
8+
"hostlink/app/services/heartbeat"
9+
)
10+
11+
type TriggerFunc func(context.Context, func() error)
12+
13+
type HeartbeatJobConfig struct {
14+
Trigger TriggerFunc
15+
}
16+
17+
type HeartbeatJob struct {
18+
config HeartbeatJobConfig
19+
cancel context.CancelFunc
20+
wg sync.WaitGroup
21+
}
22+
23+
func New() HeartbeatJob {
24+
return NewWithConfig(HeartbeatJobConfig{
25+
Trigger: Trigger,
26+
})
27+
}
28+
29+
func NewWithConfig(cfg HeartbeatJobConfig) HeartbeatJob {
30+
if cfg.Trigger == nil {
31+
cfg.Trigger = Trigger
32+
}
33+
34+
return HeartbeatJob{
35+
config: cfg,
36+
}
37+
}
38+
39+
func (hj *HeartbeatJob) Register(ctx context.Context, svc heartbeat.Service) context.CancelFunc {
40+
ctx, cancel := context.WithCancel(ctx)
41+
hj.cancel = cancel
42+
43+
hj.wg.Add(1)
44+
go func() {
45+
defer hj.wg.Done()
46+
hj.config.Trigger(ctx, func() error {
47+
return svc.Send()
48+
})
49+
}()
50+
51+
return cancel
52+
}
53+
54+
func (hj *HeartbeatJob) Shutdown() {
55+
if hj.cancel != nil {
56+
hj.cancel()
57+
}
58+
hj.wg.Wait()
59+
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package heartbeatjob
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
13+
)
14+
15+
type MockHeartbeatService struct {
16+
mock.Mock
17+
}
18+
19+
func (m *MockHeartbeatService) Send() error {
20+
args := m.Called()
21+
return args.Error(0)
22+
}
23+
24+
func immediateTrigger(callCount int, done chan struct{}) TriggerFunc {
25+
return func(ctx context.Context, fn func() error) {
26+
for i := 0; i < callCount; i++ {
27+
fn()
28+
}
29+
close(done)
30+
<-ctx.Done()
31+
}
32+
}
33+
34+
// TestNew_DefaultsTrigger - New() uses default trigger
35+
func TestNew_DefaultsTrigger(t *testing.T) {
36+
job := New()
37+
38+
assert.NotNil(t, job.config.Trigger)
39+
}
40+
41+
// TestNewWithConfig_UsesCustomTrigger - custom trigger is used when provided
42+
func TestNewWithConfig_UsesCustomTrigger(t *testing.T) {
43+
customTriggerCalled := false
44+
customTrigger := func(ctx context.Context, fn func() error) {
45+
customTriggerCalled = true
46+
<-ctx.Done()
47+
}
48+
49+
job := NewWithConfig(HeartbeatJobConfig{
50+
Trigger: customTrigger,
51+
})
52+
53+
svc := new(MockHeartbeatService)
54+
ctx := context.Background()
55+
cancel := job.Register(ctx, svc)
56+
cancel()
57+
job.Shutdown()
58+
59+
assert.True(t, customTriggerCalled)
60+
}
61+
62+
// TestNewWithConfig_DefaultsNilTrigger - nil trigger defaults to Trigger
63+
func TestNewWithConfig_DefaultsNilTrigger(t *testing.T) {
64+
job := NewWithConfig(HeartbeatJobConfig{
65+
Trigger: nil,
66+
})
67+
68+
assert.NotNil(t, job.config.Trigger)
69+
}
70+
71+
// TestRegister_CallsServiceSend - trigger calls heartbeat.Service.Send()
72+
func TestRegister_CallsServiceSend(t *testing.T) {
73+
svc := new(MockHeartbeatService)
74+
svc.On("Send").Return(nil).Times(3)
75+
76+
done := make(chan struct{})
77+
job := NewWithConfig(HeartbeatJobConfig{
78+
Trigger: immediateTrigger(3, done),
79+
})
80+
81+
ctx := context.Background()
82+
cancel := job.Register(ctx, svc)
83+
<-done
84+
cancel()
85+
job.Shutdown()
86+
87+
svc.AssertNumberOfCalls(t, "Send", 3)
88+
}
89+
90+
// TestRegister_ContinuesOnError - job continues running after Send() error
91+
func TestRegister_ContinuesOnError(t *testing.T) {
92+
svc := new(MockHeartbeatService)
93+
svc.On("Send").Return(errors.New("connection refused")).Times(3)
94+
95+
done := make(chan struct{})
96+
job := NewWithConfig(HeartbeatJobConfig{
97+
Trigger: immediateTrigger(3, done),
98+
})
99+
100+
ctx := context.Background()
101+
cancel := job.Register(ctx, svc)
102+
<-done
103+
cancel()
104+
job.Shutdown()
105+
106+
svc.AssertNumberOfCalls(t, "Send", 3)
107+
}
108+
109+
// TestRegister_ReturnsCancel - returns cancel function
110+
func TestRegister_ReturnsCancel(t *testing.T) {
111+
svc := new(MockHeartbeatService)
112+
113+
job := NewWithConfig(HeartbeatJobConfig{
114+
Trigger: func(ctx context.Context, fn func() error) {
115+
<-ctx.Done()
116+
},
117+
})
118+
119+
ctx := context.Background()
120+
cancel := job.Register(ctx, svc)
121+
122+
assert.NotNil(t, cancel)
123+
cancel()
124+
job.Shutdown()
125+
}
126+
127+
// TestShutdown_StopsJob - Shutdown() stops the job gracefully
128+
func TestShutdown_StopsJob(t *testing.T) {
129+
var callCount atomic.Int32
130+
svc := new(MockHeartbeatService)
131+
svc.On("Send").Return(nil).Run(func(args mock.Arguments) {
132+
callCount.Add(1)
133+
})
134+
135+
job := NewWithConfig(HeartbeatJobConfig{
136+
Trigger: func(ctx context.Context, fn func() error) {
137+
for {
138+
select {
139+
case <-ctx.Done():
140+
return
141+
default:
142+
fn()
143+
time.Sleep(1 * time.Millisecond)
144+
}
145+
}
146+
},
147+
})
148+
149+
ctx := context.Background()
150+
job.Register(ctx, svc)
151+
152+
time.Sleep(10 * time.Millisecond)
153+
countBeforeShutdown := callCount.Load()
154+
job.Shutdown()
155+
156+
time.Sleep(10 * time.Millisecond)
157+
countAfterShutdown := callCount.Load()
158+
159+
assert.Equal(t, countBeforeShutdown, countAfterShutdown)
160+
}
161+
162+
// TestShutdown_WaitsForCompletion - Shutdown() waits for goroutine to finish
163+
func TestShutdown_WaitsForCompletion(t *testing.T) {
164+
var wg sync.WaitGroup
165+
goroutineFinished := false
166+
167+
svc := new(MockHeartbeatService)
168+
169+
job := NewWithConfig(HeartbeatJobConfig{
170+
Trigger: func(ctx context.Context, fn func() error) {
171+
<-ctx.Done()
172+
time.Sleep(20 * time.Millisecond)
173+
goroutineFinished = true
174+
},
175+
})
176+
177+
ctx := context.Background()
178+
job.Register(ctx, svc)
179+
180+
wg.Add(1)
181+
go func() {
182+
defer wg.Done()
183+
job.Shutdown()
184+
}()
185+
186+
wg.Wait()
187+
assert.True(t, goroutineFinished)
188+
}

app/jobs/heartbeatjob/trigger.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package heartbeatjob
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/labstack/gommon/log"
8+
)
9+
10+
type TriggerConfig struct {
11+
Interval time.Duration
12+
}
13+
14+
func DefaultTriggerConfig() TriggerConfig {
15+
return TriggerConfig{
16+
Interval: 5 * time.Second,
17+
}
18+
}
19+
20+
func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfig) {
21+
for {
22+
select {
23+
case <-ctx.Done():
24+
return
25+
case <-time.After(config.Interval):
26+
if err := fn(); err != nil {
27+
log.Errorf("heartbeat failed: %s", err)
28+
}
29+
}
30+
}
31+
}
32+
33+
func Trigger(ctx context.Context, fn func() error) {
34+
TriggerWithConfig(ctx, fn, DefaultTriggerConfig())
35+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package heartbeat
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"hostlink/app/services/agentstate"
8+
"hostlink/config/appconf"
9+
"hostlink/internal/apiserver"
10+
)
11+
12+
type Service interface {
13+
Send() error
14+
}
15+
16+
type heartbeatService struct {
17+
apiserver apiserver.HeartbeatOperations
18+
agentstate agentstate.Operations
19+
}
20+
21+
func New() (*heartbeatService, error) {
22+
return NewWithConf()
23+
}
24+
25+
func NewWithConf() (*heartbeatService, error) {
26+
state := agentstate.New(appconf.AgentStatePath())
27+
if err := state.Load(); err != nil {
28+
return nil, err
29+
}
30+
31+
client, err := apiserver.NewDefaultClient()
32+
if err != nil {
33+
return nil, fmt.Errorf("failed to create api client: %w", err)
34+
}
35+
36+
return &heartbeatService{
37+
apiserver: client,
38+
agentstate: state,
39+
}, nil
40+
}
41+
42+
func NewWithDependencies(
43+
apiserver apiserver.HeartbeatOperations,
44+
agentstate agentstate.Operations,
45+
) *heartbeatService {
46+
return &heartbeatService{
47+
apiserver: apiserver,
48+
agentstate: agentstate,
49+
}
50+
}
51+
52+
func (s *heartbeatService) Send() error {
53+
agentID := s.agentstate.GetAgentID()
54+
if agentID == "" {
55+
return fmt.Errorf("agent not registered: missing agent ID")
56+
}
57+
58+
return s.apiserver.Heartbeat(context.Background(), agentID)
59+
}

0 commit comments

Comments
 (0)