💥Add worker heartbeat support#2186
Conversation
Implement worker heartbeats that report worker status, slot usage, and metrics to the server via the WorkerHeartbeat RPC. Key changes: - Add HeartbeatMetricsHandler to capture metrics needed for heartbeats - Add internal_worker_heartbeat.go for heartbeat worker management - Add hostmetrics package for CPU/memory usage reporting - Plumb heartbeat data through workers (workflow, activity, nexus, local activity) - Add integration tests for worker heartbeat functionality - Fix nil pointer in shutdownWorker when heartbeating is disabled
… AggregatedWorker.start()
…eTracker more idiomatic, move namespace capabilities to AW, remove Get from func names
# Conflicts: # contrib/sysinfo/go.sum # internal/cmd/build/main.go
…tbeat worker creation and callback registration
| // SysInfoContext provides context for SysInfoProvider calls. | ||
| // | ||
| // Exposed as: [go.temporal.io/sdk/worker.SysInfoContext] | ||
| type SysInfoContext struct { |
There was a problem hiding this comment.
cretz
Consider embedding context.Context in here even if just context.Background() (but not that important)
yuandrew
keeping out for now, localActivityContext, NexusOperationContext and testContext are all *Context structs that are missing context.Context, if we want to add it in later to use, we can
cretz
Then IMO we should consider passing a Go context into the calls alongside where this is passed (even if it's not used for anything yet)
yuandrew
I'm planning on leaving it out for now, and adding in the context into the SysInfoContext struct when we need the context. It seems a little redundant to pass in a Context and SysInfoContext together, any reason we'd need to add it into the calls alongside right now?
There was a problem hiding this comment.
Go API that is invoked that others implement basically all need a context.Context always even if you don't do anything with it yet. At this point it might as well be part of Go API heh. You can wait and embed later if you want, though harmless to do now.
| } | ||
| proto.Merge(aw.capabilities, capabilities) | ||
|
|
||
| if _, err := aw.client.loadNamespaceCapabilities(aw.executionParams.MetricsHandler); err != nil { |
There was a problem hiding this comment.
@jmaeagle99 this is similar to your change in external storage, we can coordinate
cretz
left a comment
There was a problem hiding this comment.
Nothing blocking, just a few minor comments, though worth looking at them and seeing if we want to address
| NumNexusSlots: defaultMaxConcurrentTaskExecutionSize, | ||
| }) | ||
| } | ||
| if params.pollTimeTracker == nil { |
There was a problem hiding this comment.
What is the situation where this is not always nil? Should this be set where params is created and not here?
There was a problem hiding this comment.
It's always nil, but this is the most common place to nil check, mainly for unit tests sake. I had this set where params are created in NewAggregatedWorker, but I'd still like to keep this check here for tests
| stickyTaskQueue = getWorkerTaskQueue(aw.workflowWorker.stickyUUID) | ||
| } | ||
|
|
||
| _, err := aw.client.workflowService.ShutdownWorker(grpcCtx, &workflowservice.ShutdownWorkerRequest{ |
There was a problem hiding this comment.
Can we pass the worker instance key here and in polls?Or if these should be done in separate issue, can we make it and put it up for triage?
There was a problem hiding this comment.
It looks like it's still not on shutdown here, but is on polls?
| } | ||
|
|
||
| if err != nil { | ||
| aw.logger.Debug("ShutdownWorker rpc errored during worker shutdown.", tagError, err) |
There was a problem hiding this comment.
I know this was debug in old code, but we should consider making this a warning, but can be done in a separate issue. This failing has negative user effects, so effectively swallowing this error and not telling user can be rough. But we have to think about situations where this is just noise.
There was a problem hiding this comment.
will address separately
| // SysInfoContext provides context for SysInfoProvider calls. | ||
| // | ||
| // Exposed as: [go.temporal.io/sdk/worker.SysInfoContext] | ||
| type SysInfoContext struct { |
There was a problem hiding this comment.
Go API that is invoked that others implement basically all need a context.Context always even if you don't do anything with it yet. At this point it might as well be part of Go API heh. You can wait and embed later if you want, though harmless to do now.
…w/activity poll requests, make WorkerHeartbeatInterval not a pointer
| TotalFailedTasks: int32(totalFailed), | ||
| LastIntervalProcessedTasks: int32(intervalProcessed), | ||
| LastIntervalFailureTasks: int32(intervalFailed), | ||
| } |
There was a problem hiding this comment.
Silent int64-to-int32 truncation corrupts heartbeat task counters
Medium Severity
buildSlotsInfo truncates int64 counter values to int32 via int32(totalProcessed) and int32(totalFailed) when populating the WorkerSlotsInfo proto. The internal tracking uses atomic.Int64, which can grow beyond math.MaxInt32. For a worker processing ~1000 tasks/second, this overflows in ~25 days, silently producing negative or wrapped values for TotalProcessedTasks and TotalFailedTasks. The same applies to TotalStickyCacheHit/TotalStickyCacheMiss in PopulateHeartbeat. Clamping to MaxInt32 would prevent silent corruption.
Additional Locations (1)
There was a problem hiding this comment.
Alternatively these should just be stored as int32 since that's what we have to send them as eventually anyway
Sushisource
left a comment
There was a problem hiding this comment.
Overall makes sense to me. I didn't do a super detailed review but the high level and tests look reasonable. A few small things.
| stickyTaskQueue = getWorkerTaskQueue(aw.workflowWorker.stickyUUID) | ||
| } | ||
|
|
||
| _, err := aw.client.workflowService.ShutdownWorker(grpcCtx, &workflowservice.ShutdownWorkerRequest{ |
There was a problem hiding this comment.
It looks like it's still not on shutdown here, but is on polls?
| TotalFailedTasks: int32(totalFailed), | ||
| LastIntervalProcessedTasks: int32(intervalProcessed), | ||
| LastIntervalFailureTasks: int32(intervalFailed), | ||
| } |
There was a problem hiding this comment.
Alternatively these should just be stored as int32 since that's what we have to send them as eventually anyway
| } else { | ||
| wtp.pollTimeTracker.recordPollSuccess(metrics.PollerTypeWorkflowTask) | ||
| } | ||
|
|
There was a problem hiding this comment.
Idle pollers report stale heartbeat timestamps
Medium Severity
pollTimeTracker.recordPollSuccess is only called when a poll returns a task token. Successful empty polls are skipped, so LastSuccessfulPollTime in worker heartbeats can remain stale or zero while pollers are healthy and actively polling. This makes heartbeat poller health data inaccurate during idle periods.
Additional Locations (2)
| } | ||
|
|
||
| aw.shutdownWorker() | ||
|
|
There was a problem hiding this comment.
Shutdown sent for never-started workers
Medium Severity
AggregatedWorker.Stop() now always calls shutdownWorker() even when the worker was never started. This can send a ShutdownWorker RPC with a synthesized heartbeat for a worker that never polled, creating misleading worker state and adding avoidable network/retry delay during startup-failure cleanup.


NOTE: This is a re-creation of #2148, making a newer, clean PR as the resolved comment count was getting very high and making it hard to navigate.
What was changed
Key changes:
contrib/resourcetunerlib, resource tuner portion has been moved into the SDK and is exposed through theworkerpackage, while a newcontrib/sysinfopackage was created to handle system resource measurementcontrib/resourcetuner, so this should be okay.ShutdownWorkerRPC to give final heartbeat, even if not using sticky queuesverifyNamespaceExistcalls from each specific worker, and instead call at theAggregatedWorkerlevel and cache for worker heartbeatingWhy?
New feature!
Checklist
Closes Worker Heartbeating #2094
How was this tested:
Added a bunch of tests to
worker_heartbeat_test.goNote
Medium Risk
Touches core worker start/stop and polling paths and adds new background goroutines/RPCs for heartbeating; failures could impact worker shutdown behavior or add load if misconfigured, but the feature is gated by an experimental interval option and server capabilities.
Overview
Adds experimental worker heartbeats: clients can now be configured with
WorkerHeartbeatIntervalto periodically callRecordWorkerHeartbeatand to send a final heartbeat viaShutdownWorkeron stop, including worker/poller/slot metrics, plugin names, deployment version, and optional CPU/memory usage.Refactors tuning/sysinfo: moves the resource-based tuner into the SDK
workerAPI (requiring an injectedSysInfoProvider), introduces newcontrib/sysinfo(gopsutil + cgroup-aware) implementation, and wires poll-success tracking plus heartbeat-specific metrics capturing across workflow/activity/nexus pollers.Updates dev/test configs to enable
WorkerHeartbeatsEnabled/ListWorkersEnabled, adjusts integration tests accordingly, and adds a large newworker_heartbeat_test.gocoverage suite.Written by Cursor Bugbot for commit 1cb7c90. This will update automatically on new commits. Configure here.