Conversation
Implement worker heartbeats that report worker status, slot usage, and metrics to the server via the WorkerHeartbeat RPC.
Key changes:
- Add HeartbeatMetricsHandler to capture metrics needed for heartbeats
- Add internal_worker_heartbeat.go for heartbeat worker management
- Add hostmetrics package for CPU/memory usage reporting
- Plumb heartbeat data through workers (workflow, activity, nexus, local activity)
- Add integration tests for worker heartbeat functionality
- Fix nil pointer in shutdownWorker when heartbeating is disabled
| s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) | ||
| s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, nil).AnyTimes() | ||
| s.service.EXPECT().RespondWorkflowTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() | ||
| s.service.EXPECT().ShutdownWorker(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.ShutdownWorkerResponse{}, nil).Times(1) |
These ones that went away seem wrong. We should still be calling shutdown worker in cases where we were previously.
The level at which ShutdownWorker is called moved up, so this test, which shuts down a workflow worker, no longer makes the gRPC request; it's now made at the AggregatedWorker level.
cretz
left a comment
Did an early pass on general structure
| // RecordPollSuccess records a successful poll time if the handler supports it. | ||
| // pollerType should be one of PollerTypeWorkflowTask, PollerTypeWorkflowStickyTask, | ||
| // PollerTypeActivityTask, or PollerTypeNexusTask. | ||
| func RecordPollSuccess(h Handler, pollerType string) { |
I wonder if this should be divorced from metrics handler. Specifically, I wonder if the heartbeat metrics stuff has a handler for some metrics, and then *HeartbeatMetricsHandler is set on pollers specifically just for recording poll success. If you must keep this, why even have the interface instead of just type asserting that it is a *HeartbeatMetricsHandler?
Is there a specific concern with divorcing from metrics handler? Today, heartbeat metrics should be a transparent layer, so passing in a specific *HeartbeatMetricsHandler doesn't feel too different from just using the metrics handler itself, except maybe for a little more clarity, at the cost of passing an object (*HeartbeatMetricsHandler) that we already have access to from wtp.metricsHandler.
Thanks for the callout on the interface being unnecessary
The main struggle for me is that if you hide everything behind metrics handler interface as if you were a user, it's hard for code readers in the SDK that may make future changes to recognize metrics handler is not just about user metrics, it's leveraged for internal utilities too. But not a big deal to leave as is.
I still lean towards having this method, primarily for the sake of code simplicity. I've renamed it to recordPollSuccessIfHeartbeat; does that help make it clearer that we're doing this for heartbeat, not user metrics?
I think the name change goes wrong in the other direction. We should be recording poll successes regardless of metrics handler types. What I was trying to say is that heartbeats don't need to use metrics handlers for this data since workers already control where it is called. Extracting poll data out of a metrics handler adds a bunch of confusing logic when you could just call something here that records it more specifically for that use case (and leave metrics handler alone to record it as it always has, no need to intercept).
But if you still want to extract poll metrics from the metrics handler abstraction, ok, but definitely no need to change the name or only do it if it's a certain metrics handler type.
Created a separate pollTimeTracker and plumbed through separately
| // Track the worker type if present in tags | ||
| workerType := h.workerType | ||
| if wt, ok := tags[WorkerTypeTagName]; ok { | ||
| workerType = wt | ||
| } | ||
|
|
||
| // Track the poller type if present in tags | ||
| pollerType := h.pollerType | ||
| if pt, ok := tags[PollerTypeTagName]; ok { | ||
| pollerType = pt | ||
| } |
I wonder if these two values should not be extracted from WithTags but be done more explicitly when they are set which is only once per worker/poller. Not a big deal, but it may clean up some of this abstraction if the worker/poller type specific metrics were in their own struct maybe? And/or a worker or poller type was required to get a handler out here? I can fashion some ideas if needed...
Added an explicit forWorker and forPoller and made WithTags a passthrough
I didn't mean double up calls on the caller side, I meant you can do what forWorker and forPoller do inside of WithTags (i.e. store more explicit state) and/or make these forWorker and forPoller calls do the WithTags themselves. It doesn't make sense to have two always-consecutive calls both independently copy this thing.
oops, I had an additional WithTags call for forWorker. I feel better about this, where we always call WithTags, and if heartbeating, we supplementally call forWorker/forPoller beforehand.
Don't want to change the function signature of WithTags, and not sure how we can be more explicit with WithTags without doing so. And I feel like putting the WithTags call inside of forWorker makes it less clear to the caller that it's still calling WithTags
… AggregatedWorker.start()
cretz
left a comment
Looks great, almost all of my stuff now is pedantic
| err := p.updateCGroupStats() | ||
| // Stop updates if not in a container. No need to return the error and log it. | ||
| if !errors.Is(err, fs.ErrNotExist) { | ||
| if errors.Is(err, fs.ErrNotExist) { |
This check seems inverted, I switched it around
That this wasn't caught before makes me concerned this logic isn't being tested properly (but doesn't have to be part of this PR)
Let's do this in a separate PR, left myself a note to add a test for this
| // WorkerHeartbeatInterval is the interval at which the worker will send heartbeats to the server. | ||
| // Interval must be between 1s and 60s, inclusive. | ||
| // | ||
| // default: 60s. To disable, set to 0. |
Pedantic, but would maybe recommend "negative to disable" and remove the pointer and assume 0/unset means use default. Not a big deal though.
I remember consulting Claude on this design decision; it seemed like this was the more idiomatic choice for Go:
- HashiCorp consul-template: the most extensive example. Nearly every duration config is *time.Duration with a Finalize() method that fills defaults for nil. They even have a TimeDurationPresent() helper that returns true only if non-nil AND non-zero, explicitly treating zero as a distinct state from nil.
https://pkg.go.dev/github.com/hashicorp/consul-template/config
internal/client.go
Outdated
| options.ConnectionOptions.GetSystemInfoTimeout = defaultGetSystemInfoTimeout | ||
| } | ||
|
|
||
| if options.Logger == nil { |
Can you help me understand this change? All user-facing code that calls NewServiceClient should already be setting this default. Is there a new code path that calls NewServiceClient?
No new code path, but sessionEnvironmentImpl.SignalCreationResponse calls GetClient without a logger, this seems like it would fix that missing scenario, but also seems like good practice to have in general?
"this seems like it would fix that missing scenario, but also seems like good practice to have in general?"
They may have left the logger off intentionally; I'm unsure, and it would take some more digging. It's possible/likely you're right, but it seems unrelated to this project and may deserve a separate issue.
yeah that's fair, I'll remove this for now
turns out this was due to newHeartbeatManager using the logger, but I added the check into that function itself, so SignalCreationResponse is unaffected
| @@ -1468,9 +1479,67 @@ func (aw *AggregatedWorker) Stop() { | |||
| WorkerInstanceKey: aw.workerInstanceKey, | |||
Arguably we should go ahead and update the API dependency and set this on the poll and shutdown calls, but we can make an issue to do that in a successive PR if we want
would prefer to do this in a separate PR
internal/internal_nexus_worker.go
Outdated
| var workflowClient *WorkflowClient | ||
| if wc, ok := opts.client.(*WorkflowClient); ok { | ||
| workflowClient = wc | ||
| } |
Should we ever accept a situation where client is not this? But in general, why does nexusWorker need a client? Loading capabilities seems like something the outer/aggregate worker would do.
agree we don't want this requirement on WorkerClient, removed this from all the individual workers, and AggregatedWorker makes this call instead
internal/resource_tuner.go
Outdated
| // SystemInfoContext provides context for SystemInfoSupplier calls. | ||
| // | ||
| // Exposed as: [go.temporal.io/sdk/worker.SystemInfoContext] | ||
| type SystemInfoContext struct { |
Consider embedding context.Context in here even if just context.Background() (but not that important)
keeping out for now, localActivityContext, NexusOperationContext and testContext are all *Context structs that are missing context.Context, if we want to add it in later to use, we can
Then IMO we should consider passing a Go context into the calls alongside where this is passed (even if it's not used for anything yet)
I'm planning on leaving it out for now, and adding in the context into the SysInfoContext struct when we need the context. It seems a little redundant to pass in a Context and SysInfoContext together, any reason we'd need to add it into the calls alongside right now?
…eTracker more idiomatic, move namespace capabilities to AW, remove Get from func names
# Conflicts:
#	contrib/sysinfo/go.sum
#	internal/cmd/build/main.go
| hw.callbacksMutex.Unlock() | ||
|
|
||
| return nil | ||
| } |
Race condition between register and unregister in heartbeat manager
Medium Severity
registerWorker releases workersMutex (after getOrCreateSharedNamespaceWorker returns) before adding the callback to the sharedNamespaceWorker. A concurrent unregisterWorker call for a different worker on the same namespace can acquire workersMutex, find zero remaining callbacks, stop the sharedNamespaceWorker, and remove it from the map — all between the mutex release and the callback addition. The registering worker's callback is then silently added to a stopped worker whose goroutine has already exited, so its heartbeats are never sent.
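One way to close the window described above is to hold the manager-level mutex across both the lookup-or-create step and the callback insertion, so an unregister cannot stop the shared worker in between. The sketch below uses simplified stand-ins: the type and method names echo the report, but the fields, the single-mutex design, and the activeCallbacks helper are assumptions and do not reflect the PR's actual structure or fix.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedNamespaceWorker is a simplified stand-in for the per-namespace worker.
type sharedNamespaceWorker struct {
	callbacks map[string]func()
	stopped   bool
}

// heartbeatManager guards both the worker map and each worker's callbacks
// with one mutex (an assumption made to keep the sketch small).
type heartbeatManager struct {
	mu      sync.Mutex
	workers map[string]*sharedNamespaceWorker
}

func newHeartbeatManager() *heartbeatManager {
	return &heartbeatManager{workers: make(map[string]*sharedNamespaceWorker)}
}

// registerWorker looks up or creates the shared worker and adds the callback
// without releasing the lock in between.
func (m *heartbeatManager) registerWorker(namespace, workerKey string, cb func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	w, ok := m.workers[namespace]
	if !ok {
		w = &sharedNamespaceWorker{callbacks: make(map[string]func())}
		m.workers[namespace] = w
	}
	w.callbacks[workerKey] = cb
}

// unregisterWorker removes the callback and stops the shared worker only if
// no callbacks remain, all under the same lock.
func (m *heartbeatManager) unregisterWorker(namespace, workerKey string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	w, ok := m.workers[namespace]
	if !ok {
		return
	}
	delete(w.callbacks, workerKey)
	if len(w.callbacks) == 0 {
		w.stopped = true
		delete(m.workers, namespace)
	}
}

// activeCallbacks returns how many callbacks are registered for a namespace.
func (m *heartbeatManager) activeCallbacks(namespace string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	if w, ok := m.workers[namespace]; ok {
		return len(w.callbacks)
	}
	return 0
}

func main() {
	m := newHeartbeatManager()
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprintf("worker-%d", i)
			m.registerWorker("default", key, func() {})
			if i%2 == 0 {
				m.unregisterWorker("default", key)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("callbacks still registered:", m.activeCallbacks("default"))
}
```

The trade-off is coarser locking; an alternative is to have the add-callback step detect a stopped worker and retry against a fresh one.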
| TotalFailedTasks: int32(totalFailed), | ||
| LastIntervalProcessedTasks: int32(intervalProcessed), | ||
| LastIntervalFailureTasks: int32(intervalFailed), | ||
| } |
Unchecked int64-to-int32 truncation may wrap task counters
Low Severity
buildSlotsInfo casts totalProcessed and totalFailed (int64 values from atomic.Int64) directly to int32 without bounds checking. A long-running worker that processes more than ~2.1 billion tasks will silently overflow, producing incorrect negative values in TotalProcessedTasks, TotalFailedTasks, and the LastInterval* fields. Clamping to math.MaxInt32 before the cast would prevent this.


What was changed
Key changes:
- contrib/resourcetuner lib: the resource tuner portion has been moved into the SDK and is exposed through the worker package, while a new contrib/sysinfo package was created to handle system resource measurement
- … contrib/resourcetuner, so this should be okay.
- … ShutdownWorker RPC to give final heartbeat, even if not using sticky queues
- … verifyNamespaceExist calls from each specific worker, and instead call at the AggregatedWorker level and cache for worker heartbeating
Why?
New feature!
Checklist
Closes Worker Heartbeating #2094
How was this tested:
Added a bunch of tests to worker_heartbeat_test.go
Note
High Risk
Touches core worker lifecycle, RPCs, and metrics plumbing, adding new background goroutines and shutdown behavior that could affect worker stability and server load if misconfigured.
Overview
Adds worker heartbeat support: the SDK now periodically reports per-worker slot/poller stats, sticky cache counters, plugin names, and host CPU/memory usage to the server via RecordWorkerHeartbeat, and sends a final ShutdownWorker RPC (with optional sticky queue) on worker stop.
Introduces a shared per-namespace heartbeat manager and in-memory metric capture (heartbeatMetricsHandler), plumbs poll-success timestamps from workflow/activity/nexus pollers, and adds client.Options.WorkerHeartbeatInterval with capability gating via DescribeNamespace.
Refactors/migrates resource tuning: removes contrib/resourcetuner by moving the resource-based tuner/controller and PID logic into internal and re-exporting from worker, requiring a SysInfoProvider; adds new contrib/sysinfo (gopsutil + Linux cgroups) as the recommended provider and updates integration/tests accordingly (including new worker_heartbeat_test.go).
Written by Cursor Bugbot for commit da40521.