Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
643a708
Add worker heartbeat support
yuandrew Jan 14, 2026
6af4db8
vendor gopsutil
yuandrew Jan 23, 2026
04ff20f
PR feedback
yuandrew Jan 23, 2026
819080b
Sort plugin names
yuandrew Jan 27, 2026
c156e0d
Create new hostinfo package
yuandrew Jan 27, 2026
ebf1064
make methods/structs private, remove aw.workerHeartbeatManager
yuandrew Jan 28, 2026
b8893b9
tighten lock, consolidate describeNamespace calls to a single call in…
yuandrew Jan 28, 2026
a6135de
simplify heartbeat metrics, decouple poller/worker type from WithTags()
yuandrew Jan 28, 2026
4f43e75
remove unused nexus worker, tighten heartbeat callback and make concu…
yuandrew Jan 28, 2026
54ddf1f
Merge branch 'master' into worker-heartbeat
yuandrew Jan 28, 2026
e7fbc03
Fix tests
yuandrew Jan 29, 2026
edf6e11
Fix cursor discovered bugs, fix integ tests
yuandrew Jan 29, 2026
73f4a10
Rename hostinfo to sysinfo, add interval enforcement, rename mutexes,…
yuandrew Feb 2, 2026
972555a
fix bugs cursor found, sync.oncevalue, separate poll time tracking ou…
yuandrew Feb 2, 2026
f952732
Add back resource tuner tests that got dropped
yuandrew Feb 3, 2026
a25d85d
Fix tests
yuandrew Feb 3, 2026
53da340
Fix tests, disable heartbeating for normal tests, bump dev server ver…
yuandrew Feb 3, 2026
2155206
Finish renames of sysInfoProvider, handle Time.IsZero(), make pollTim…
yuandrew Feb 4, 2026
dd02159
Fix tests
yuandrew Feb 6, 2026
bb556cb
remove extra default logger addition, remove dead code
yuandrew Feb 9, 2026
136d311
Merge branch 'master' into worker-heartbeat
yuandrew Feb 10, 2026
da40521
forgot a change..
yuandrew Feb 10, 2026
04f5d4d
fix unit tests
yuandrew Feb 10, 2026
a5c85d0
Fix eventually expectation for slower CI machines, fix race with hear…
yuandrew Feb 10, 2026
b68132e
Merge branch 'master' into worker-heartbeat
yuandrew Feb 10, 2026
c5b49db
Gate all sticky cache tests behind maxWorkflowCacheSize checks so it …
yuandrew Feb 10, 2026
faeba63
loosen workerInfo.CurrentStickyCacheSize and workerInfo.TotalStickyCa…
yuandrew Feb 10, 2026
004032a
Fix up TestWorkerHeartbeatStickyCacheMiss
yuandrew Feb 11, 2026
257e264
Add comment, minor fix
yuandrew Feb 11, 2026
8d7aa2e
Make SHUTTING_DOWN status atomic, plumb workerInstanceKeys to workflo…
yuandrew Feb 12, 2026
24e102a
Merge branch 'master' into worker-heartbeat1, PR feedback
yuandrew Feb 13, 2026
300c71d
Merge branch 'master' into worker-heartbeat1
yuandrew Feb 13, 2026
41d6afd
bring back listworkers dynamic config, fix identity in heartbeat
yuandrew Feb 13, 2026
e297ae1
Add dynamic config for listWorkers for docker test
yuandrew Feb 13, 2026
1cb7c90
server v1.29.1 still requires dynamic config for heartbeating
yuandrew Feb 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,8 @@ history.enableChasm:
history.enableTransitionHistory:
- value: true
component.nexusoperations.useSystemCallbackURL:
- value: false
- value: false
frontend.WorkerHeartbeatsEnabled:
- value: true
frontend.ListWorkersEnabled:
- value: true
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build linux

package resourcetuner
package sysinfo

import (
"errors"
Expand Down Expand Up @@ -43,11 +43,11 @@ func (p *cGroupInfoImpl) GetLastCPUUsage() float64 {
func (p *cGroupInfoImpl) updateCGroupStats() error {
control, err := cgroup2.Load("/")
if err != nil {
return fmt.Errorf("failed to get cgroup mem stats %w", err)
return fmt.Errorf("failed to load cgroup: %w", err)
}
metrics, err := control.Stat()
if err != nil {
return fmt.Errorf("failed to get cgroup mem stats %w", err)
return fmt.Errorf("failed to get cgroup stats: %w", err)
}
// Only update if a limit has been set
if metrics.Memory.UsageLimit != 0 {
Expand All @@ -56,7 +56,7 @@ func (p *cGroupInfoImpl) updateCGroupStats() error {

err = p.cgroupCpuCalc.updateCpuUsage(metrics)
if err != nil {
return fmt.Errorf("failed to get cgroup cpu usage %w", err)
return fmt.Errorf("failed to get cgroup cpu usage: %w", err)
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package resourcetuner
package sysinfo

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build !linux

package resourcetuner
package sysinfo

import "errors"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package resourcetuner
package sysinfo

import (
"errors"
Expand Down
5 changes: 2 additions & 3 deletions contrib/resourcetuner/go.mod → contrib/sysinfo/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module go.temporal.io/sdk/contrib/resourcetuner
module go.temporal.io/sdk/contrib/sysinfo

go 1.23.0

Expand All @@ -8,7 +8,6 @@ require (
github.com/containerd/cgroups/v3 v3.0.3
github.com/shirou/gopsutil/v4 v4.24.8
github.com/stretchr/testify v1.10.0
go.einride.tech/pid v0.1.3
go.temporal.io/sdk v1.29.1
)

Expand All @@ -31,7 +30,7 @@ require (
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
Expand Down
6 changes: 2 additions & 4 deletions contrib/resourcetuner/go.sum → contrib/sysinfo/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
Expand Down Expand Up @@ -141,5 +141,3 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
111 changes: 111 additions & 0 deletions contrib/sysinfo/sysinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package sysinfo

import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
"go.temporal.io/sdk/worker"
)

// Package-wide singleton state for the gopsutil-backed supplier: built on
// first use and shared by every caller of SysInfoProvider.
var (
	supplierOnce   sync.Once
	sharedSupplier *psUtilSystemInfoSupplier
)

// SysInfoProvider returns a shared SysInfoProvider using gopsutil.
// Supports cgroup metrics in containerized Linux environments.
func SysInfoProvider() worker.SysInfoProvider {
	supplierOnce.Do(func() {
		sharedSupplier = &psUtilSystemInfoSupplier{cGroupInfo: newCGroupInfo()}
	})
	return sharedSupplier
}

// psUtilSystemInfoSupplier reports memory/CPU usage via gopsutil, preferring
// cgroup limits when they are set (Linux containers).
type psUtilSystemInfoSupplier struct {
	// mu guards the cached readings below while they are refreshed and read.
	mu          sync.Mutex
	lastRefresh atomic.Int64 // UnixNano, atomic for lock-free reads in maybeRefresh

	// Most recent host-level readings from gopsutil; refreshed at most
	// every 100ms by maybeRefresh.
	lastMemStat  *mem.VirtualMemoryStat
	lastCpuUsage float64

	// stopTryingToGetCGroupInfo is set once cGroupInfo.Update reports that
	// further updates are useless (not in a cgroup, or unrecoverable error).
	stopTryingToGetCGroupInfo bool
	cGroupInfo                cGroupInfo
}

// cGroupInfo abstracts cgroup resource readings so platform-specific
// implementations (and a no-op stub on non-Linux builds) can be swapped in.
type cGroupInfo interface {
	// Update requests an update of the cgroup stats. This is a no-op if not in a cgroup. Returns
	// true if cgroup stats should continue to be updated, false if not in a cgroup or the returned
	// error is considered unrecoverable.
	Update() (bool, error)
	// GetLastMemUsage returns last known memory usage as a fraction of the cgroup limit. 0 if not
	// in a cgroup or limit is not set.
	GetLastMemUsage() float64
	// GetLastCPUUsage returns last known CPU usage as a fraction of the cgroup limit. 0 if not in a
	// cgroup or limit is not set.
	GetLastCPUUsage() float64
}

// MemoryUsage reports current memory utilization as a 0..1 fraction,
// refreshing cached stats first (rate-limited inside maybeRefresh).
func (p *psUtilSystemInfoSupplier) MemoryUsage(infoContext *worker.SysInfoContext) (float64, error) {
	if err := p.maybeRefresh(infoContext); err != nil {
		return 0, err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	// Prefer the cgroup-reported fraction when one is available (non-zero
	// means a cgroup limit is set — see cGroupInfo.GetLastMemUsage).
	if cgroupMem := p.cGroupInfo.GetLastMemUsage(); cgroupMem != 0 {
		return cgroupMem, nil
	}
	// gopsutil reports a percentage; convert to a fraction.
	return p.lastMemStat.UsedPercent / 100, nil
}

// CpuUsage reports current CPU utilization as a 0..1 fraction, refreshing
// cached stats first (rate-limited inside maybeRefresh).
func (p *psUtilSystemInfoSupplier) CpuUsage(infoContext *worker.SysInfoContext) (float64, error) {
	if err := p.maybeRefresh(infoContext); err != nil {
		return 0, err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	// Prefer the cgroup-reported fraction when one is available (non-zero
	// means a cgroup limit is set — see cGroupInfo.GetLastCPUUsage).
	if cgroupCPU := p.cGroupInfo.GetLastCPUUsage(); cgroupCPU != 0 {
		return cgroupCPU, nil
	}
	// gopsutil reports a percentage; convert to a fraction.
	return p.lastCpuUsage / 100, nil
}

// maybeRefresh re-reads host (and, on Linux, cgroup) memory/CPU statistics,
// rate-limited to at most one refresh per 100ms. Safe for concurrent callers:
// the atomic lastRefresh provides a lock-free fast path, and the mutex
// serializes the actual refresh (classic double-checked locking).
func (p *psUtilSystemInfoSupplier) maybeRefresh(infoContext *worker.SysInfoContext) error {
	// Lock-free fast path: skip entirely if data is fresh.
	if time.Since(time.Unix(0, p.lastRefresh.Load())) < 100*time.Millisecond {
		return nil
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	// Double check refresh is still needed — another goroutine may have
	// refreshed while we waited for the lock.
	if time.Since(time.Unix(0, p.lastRefresh.Load())) < 100*time.Millisecond {
		return nil
	}
	// Bound the gopsutil calls so a stuck /proc or cgroup read cannot block
	// callers indefinitely.
	ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancelFn()
	memStat, err := mem.VirtualMemoryWithContext(ctx)
	if err != nil {
		return err
	}
	// Interval 0 = usage since the previous call; percpu=false yields a
	// single aggregate entry (per gopsutil's cpu.Percent contract).
	cpuUsage, err := cpu.PercentWithContext(ctx, 0, false)
	if err != nil {
		return err
	}

	p.lastMemStat = memStat
	p.lastCpuUsage = cpuUsage[0]

	// cgroup stats are Linux-only; stop polling once Update says further
	// attempts are pointless. Update errors are best-effort: log and fall
	// back to the host-level stats gathered above.
	if runtime.GOOS == "linux" && !p.stopTryingToGetCGroupInfo {
		continueUpdates, err := p.cGroupInfo.Update()
		if err != nil {
			infoContext.Logger.Warn("Failed to get cgroup stats", "error", err)
		}
		p.stopTryingToGetCGroupInfo = !continueUpdates
	}

	// Publish the refresh time only after the new stats are in place, so
	// fast-path readers never skip a refresh that did not complete.
	p.lastRefresh.Store(time.Now().UnixNano())
	return nil
}
42 changes: 42 additions & 0 deletions contrib/sysinfo/sysinfo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package sysinfo

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/internal/log"
"go.temporal.io/sdk/worker"
)

// TestGetMemoryCpuUsage checks both suppliers return a sane 0..1 fraction.
func TestGetMemoryCpuUsage(t *testing.T) {
	supplier := SysInfoProvider()
	ctx := &worker.SysInfoContext{Logger: log.NewNopLogger()}

	readings := map[string]func(*worker.SysInfoContext) (float64, error){
		"memory": supplier.MemoryUsage,
		"cpu":    supplier.CpuUsage,
	}
	for name, read := range readings {
		usage, err := read(ctx)
		require.NoError(t, err, name)
		assert.GreaterOrEqual(t, usage, 0.0, name)
		assert.LessOrEqual(t, usage, 1.0, name)
	}
}

// TestMaybeRefreshRateLimiting verifies that back-to-back reads inside the
// 100ms window reuse the cached stats instead of refreshing again.
func TestMaybeRefreshRateLimiting(t *testing.T) {
	supplier := SysInfoProvider().(*psUtilSystemInfoSupplier)
	ctx := &worker.SysInfoContext{Logger: log.NewNopLogger()}

	// Trigger (or reuse a very recent) refresh and snapshot its timestamp.
	firstUsage, err := supplier.MemoryUsage(ctx)
	require.NoError(t, err)
	refreshedAt := supplier.lastRefresh.Load()

	// An immediate second call lands inside the rate-limit window, so the
	// refresh timestamp and the reported value must be unchanged.
	secondUsage, err := supplier.MemoryUsage(ctx)
	require.NoError(t, err)
	assert.Equal(t, refreshedAt, supplier.lastRefresh.Load())
	assert.Equal(t, firstUsage, secondUsage)
}
32 changes: 31 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/google/uuid"
"sync/atomic"
"time"

Expand Down Expand Up @@ -601,6 +602,14 @@ type (
//
// NOTE: Experimental
Plugins []ClientPlugin

// WorkerHeartbeatInterval is the interval at which the worker will send heartbeats to the server.
// Interval must be between 1s and 60s, inclusive, or a negative value to disable.
//
// default: 0 defaults to 60s interval.
//
// NOTE: Experimental
WorkerHeartbeatInterval time.Duration
}

// HeadersProvider returns a map of gRPC headers that should be used on every request.
Expand Down Expand Up @@ -1198,7 +1207,9 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien

// Collect set of applicable worker plugins and interceptors
var workerPlugins []WorkerPlugin
var clientPluginNames []string
for _, plugin := range options.Plugins {
clientPluginNames = append(clientPluginNames, plugin.Name())
if workerPlugin, _ := plugin.(WorkerPlugin); workerPlugin != nil {
workerPlugins = append(workerPlugins, workerPlugin)
}
Expand All @@ -1210,6 +1221,18 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
}
}

var heartbeatInterval time.Duration
if options.WorkerHeartbeatInterval < 0 {
heartbeatInterval = 0
} else if options.WorkerHeartbeatInterval == 0 {
heartbeatInterval = 60 * time.Second
} else {
if options.WorkerHeartbeatInterval < time.Second || options.WorkerHeartbeatInterval > 60*time.Second {
panic("WorkerHeartbeatInterval must be between 1 second and 60 seconds")
}
heartbeatInterval = options.WorkerHeartbeatInterval
}

client := &WorkflowClient{
workflowService: workflowServiceClient,
conn: conn,
Expand All @@ -1223,11 +1246,18 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
contextPropagators: options.ContextPropagators,
workerPlugins: workerPlugins,
workerInterceptors: workerInterceptors,
clientPluginNames: clientPluginNames,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
eagerDispatcher: &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string]map[eagerWorker]struct{}),
},
getSystemInfoTimeout: options.ConnectionOptions.GetSystemInfoTimeout,
getSystemInfoTimeout: options.ConnectionOptions.GetSystemInfoTimeout,
workerHeartbeatInterval: heartbeatInterval,
workerGroupingKey: uuid.NewString(),
}

if heartbeatInterval > 0 {
client.heartbeatManager = newHeartbeatManager(client, heartbeatInterval, client.logger)
}

// Create outbound interceptor by wrapping backwards through chain
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ func (b *builder) integrationTest() error {
"--dynamic-config-value", "history.enableChasm=true",
"--dynamic-config-value", "history.enableTransitionHistory=true",
"--dynamic-config-value", `component.nexusoperations.useSystemCallbackURL=false`,
"--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`},
"--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`,
"--dynamic-config-value", "frontend.ListWorkersEnabled=true",
},
})
if err != nil {
return fmt.Errorf("failed starting dev server: %w", err)
Expand Down
7 changes: 3 additions & 4 deletions internal/internal_nexus_task_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newNexusTaskPoller(
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.DeploymentOptions.Version,
capabilities: params.capabilities,
pollTimeTracker: params.pollTimeTracker,
},
taskHandler: taskHandler,
service: service,
Expand Down Expand Up @@ -90,11 +91,9 @@ func (ntp *nexusTaskPoller) poll(ctx context.Context) (taskForWorker, error) {
return nil, nil
}

return &nexusTask{task: response}, nil
}
ntp.pollTimeTracker.recordPollSuccess(metrics.PollerTypeNexusTask)

func (ntp *nexusTaskPoller) Cleanup() error {
return nil
return &nexusTask{task: response}, nil
}

// PollTask polls a new task
Expand Down
4 changes: 0 additions & 4 deletions internal/internal_nexus_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ func newNexusWorker(opts nexusWorkerOptions) (*nexusWorker, error) {

// Start the worker.
func (w *nexusWorker) Start() error {
err := verifyNamespaceExist(w.workflowService, w.executionParameters.MetricsHandler, w.executionParameters.Namespace, w.worker.logger)
if err != nil {
return err
}
w.worker.Start()
return nil
}
Expand Down
Loading