From 8989e78a0cb8915579ddbd406e04e0a12fa81d62 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 5 Feb 2026 21:49:19 -0700 Subject: [PATCH 01/16] set TargetVersionChanged instead of SuggestCaN when TargetVersionChanged --- api/persistence/v1/executions.pb.go | 39 ++++--- go.mod | 2 +- go.sum | 4 +- .../api/persistence/v1/executions.proto | 1 + .../history/historybuilder/event_factory.go | 18 +-- .../history/historybuilder/history_builder.go | 2 + service/history/interfaces/mutable_state.go | 2 +- .../history/interfaces/workflow_task_info.go | 7 +- .../history/workflow/mutable_state_impl.go | 3 +- .../workflow/mutable_state_rebuilder.go | 1 + .../workflow/workflow_task_state_machine.go | 108 ++++++++++-------- tests/versioning_3_test.go | 29 ++--- 12 files changed, 116 insertions(+), 100 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 2e6d4d5654..2000a2538d 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -152,19 +152,20 @@ type WorkflowExecutionInfo struct { StartTime *timestamppb.Timestamp `protobuf:"bytes,20,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` LastUpdateTime *timestamppb.Timestamp `protobuf:"bytes,21,opt,name=last_update_time,json=lastUpdateTime,proto3" json:"last_update_time,omitempty"` // Workflow task fields. - WorkflowTaskVersion int64 `protobuf:"varint,22,opt,name=workflow_task_version,json=workflowTaskVersion,proto3" json:"workflow_task_version,omitempty"` - WorkflowTaskScheduledEventId int64 `protobuf:"varint,23,opt,name=workflow_task_scheduled_event_id,json=workflowTaskScheduledEventId,proto3" json:"workflow_task_scheduled_event_id,omitempty"` - WorkflowTaskStartedEventId int64 `protobuf:"varint,24,opt,name=workflow_task_started_event_id,json=workflowTaskStartedEventId,proto3" json:"workflow_task_started_event_id,omitempty"` - WorkflowTaskTimeout *durationpb.Duration `protobuf:"bytes,25,opt,name=workflow_task_timeout,json=workflowTaskTimeout,proto3" json:"workflow_task_timeout,omitempty"` - WorkflowTaskAttempt int32 `protobuf:"varint,26,opt,name=workflow_task_attempt,json=workflowTaskAttempt,proto3" json:"workflow_task_attempt,omitempty"` - WorkflowTaskStartedTime *timestamppb.Timestamp `protobuf:"bytes,27,opt,name=workflow_task_started_time,json=workflowTaskStartedTime,proto3" json:"workflow_task_started_time,omitempty"` - WorkflowTaskScheduledTime *timestamppb.Timestamp `protobuf:"bytes,28,opt,name=workflow_task_scheduled_time,json=workflowTaskScheduledTime,proto3" json:"workflow_task_scheduled_time,omitempty"` - WorkflowTaskOriginalScheduledTime *timestamppb.Timestamp `protobuf:"bytes,30,opt,name=workflow_task_original_scheduled_time,json=workflowTaskOriginalScheduledTime,proto3" json:"workflow_task_original_scheduled_time,omitempty"` - WorkflowTaskRequestId string `protobuf:"bytes,31,opt,name=workflow_task_request_id,json=workflowTaskRequestId,proto3" json:"workflow_task_request_id,omitempty"` - WorkflowTaskType v1.WorkflowTaskType `protobuf:"varint,68,opt,name=workflow_task_type,json=workflowTaskType,proto3,enum=temporal.server.api.enums.v1.WorkflowTaskType" json:"workflow_task_type,omitempty"` - WorkflowTaskSuggestContinueAsNew bool `protobuf:"varint,69,opt,name=workflow_task_suggest_continue_as_new,json=workflowTaskSuggestContinueAsNew,proto3" json:"workflow_task_suggest_continue_as_new,omitempty"` - WorkflowTaskSuggestContinueAsNewReasons []v11.SuggestContinueAsNewReason `protobuf:"varint,110,rep,packed,name=workflow_task_suggest_continue_as_new_reasons,json=workflowTaskSuggestContinueAsNewReasons,proto3,enum=temporal.api.enums.v1.SuggestContinueAsNewReason" json:"workflow_task_suggest_continue_as_new_reasons,omitempty"` - WorkflowTaskHistorySizeBytes int64 `protobuf:"varint,70,opt,name=workflow_task_history_size_bytes,json=workflowTaskHistorySizeBytes,proto3" json:"workflow_task_history_size_bytes,omitempty"` + WorkflowTaskVersion int64 `protobuf:"varint,22,opt,name=workflow_task_version,json=workflowTaskVersion,proto3" json:"workflow_task_version,omitempty"` + WorkflowTaskScheduledEventId int64 `protobuf:"varint,23,opt,name=workflow_task_scheduled_event_id,json=workflowTaskScheduledEventId,proto3" json:"workflow_task_scheduled_event_id,omitempty"` + WorkflowTaskStartedEventId int64 `protobuf:"varint,24,opt,name=workflow_task_started_event_id,json=workflowTaskStartedEventId,proto3" json:"workflow_task_started_event_id,omitempty"` + WorkflowTaskTimeout *durationpb.Duration `protobuf:"bytes,25,opt,name=workflow_task_timeout,json=workflowTaskTimeout,proto3" json:"workflow_task_timeout,omitempty"` + WorkflowTaskAttempt int32 `protobuf:"varint,26,opt,name=workflow_task_attempt,json=workflowTaskAttempt,proto3" json:"workflow_task_attempt,omitempty"` + WorkflowTaskStartedTime *timestamppb.Timestamp `protobuf:"bytes,27,opt,name=workflow_task_started_time,json=workflowTaskStartedTime,proto3" json:"workflow_task_started_time,omitempty"` + WorkflowTaskScheduledTime *timestamppb.Timestamp `protobuf:"bytes,28,opt,name=workflow_task_scheduled_time,json=workflowTaskScheduledTime,proto3" json:"workflow_task_scheduled_time,omitempty"` + WorkflowTaskOriginalScheduledTime *timestamppb.Timestamp `protobuf:"bytes,30,opt,name=workflow_task_original_scheduled_time,json=workflowTaskOriginalScheduledTime,proto3" json:"workflow_task_original_scheduled_time,omitempty"` + WorkflowTaskRequestId string `protobuf:"bytes,31,opt,name=workflow_task_request_id,json=workflowTaskRequestId,proto3" json:"workflow_task_request_id,omitempty"` + WorkflowTaskType v1.WorkflowTaskType `protobuf:"varint,68,opt,name=workflow_task_type,json=workflowTaskType,proto3,enum=temporal.server.api.enums.v1.WorkflowTaskType" json:"workflow_task_type,omitempty"` + WorkflowTaskSuggestContinueAsNew bool `protobuf:"varint,69,opt,name=workflow_task_suggest_continue_as_new,json=workflowTaskSuggestContinueAsNew,proto3" json:"workflow_task_suggest_continue_as_new,omitempty"` + WorkflowTaskSuggestContinueAsNewReasons []v11.SuggestContinueAsNewReason `protobuf:"varint,110,rep,packed,name=workflow_task_suggest_continue_as_new_reasons,json=workflowTaskSuggestContinueAsNewReasons,proto3,enum=temporal.api.enums.v1.SuggestContinueAsNewReason" json:"workflow_task_suggest_continue_as_new_reasons,omitempty"` + WorkflowTaskTargetWorkerDeploymentVersionChanged bool `protobuf:"varint,112,opt,name=workflow_task_target_worker_deployment_version_changed,json=workflowTaskTargetWorkerDeploymentVersionChanged,proto3" json:"workflow_task_target_worker_deployment_version_changed,omitempty"` + WorkflowTaskHistorySizeBytes int64 `protobuf:"varint,70,opt,name=workflow_task_history_size_bytes,json=workflowTaskHistorySizeBytes,proto3" json:"workflow_task_history_size_bytes,omitempty"` // tracks the started build ID for transient/speculative WFT. This info is used for two purposes: // - verify WFT completes by the same Build ID that started in the latest attempt // - when persisting transient/speculative WFT, the right Build ID is used in the WFT started event @@ -589,6 +590,13 @@ func (x *WorkflowExecutionInfo) GetWorkflowTaskSuggestContinueAsNewReasons() []v return nil } +func (x *WorkflowExecutionInfo) GetWorkflowTaskTargetWorkerDeploymentVersionChanged() bool { + if x != nil { + return x.WorkflowTaskTargetWorkerDeploymentVersionChanged + } + return false +} + func (x *WorkflowExecutionInfo) GetWorkflowTaskHistorySizeBytes() int64 { if x != nil { return x.WorkflowTaskHistorySizeBytes @@ -4572,7 +4580,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x03key\x18\x01 \x01(\x05R\x03key\x12D\n" + "\x05value\x18\x02 \x01(\v2..temporal.server.api.persistence.v1.QueueStateR\x05value:\x028\x01J\x04\b\x04\x10\x05J\x04\b\x05\x10\x06J\x04\b\b\x10\tJ\x04\b\t\x10\n" + "J\x04\b\n" + - "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xeb?\n" + + "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xdd@\n" + "\x15WorkflowExecutionInfo\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12\x1f\n" + "\vworkflow_id\x18\x02 \x01(\tR\n" + @@ -4606,7 +4614,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x18workflow_task_request_id\x18\x1f \x01(\tR\x15workflowTaskRequestId\x12\\\n" + "\x12workflow_task_type\x18D \x01(\x0e2..temporal.server.api.enums.v1.WorkflowTaskTypeR\x10workflowTaskType\x12O\n" + "%workflow_task_suggest_continue_as_new\x18E \x01(\bR workflowTaskSuggestContinueAsNew\x12\x91\x01\n" + - "-workflow_task_suggest_continue_as_new_reasons\x18n \x03(\x0e21.temporal.api.enums.v1.SuggestContinueAsNewReasonR'workflowTaskSuggestContinueAsNewReasons\x12F\n" + + "-workflow_task_suggest_continue_as_new_reasons\x18n \x03(\x0e21.temporal.api.enums.v1.SuggestContinueAsNewReasonR'workflowTaskSuggestContinueAsNewReasons\x12p\n" + + "6workflow_task_target_worker_deployment_version_changed\x18p \x01(\bR0workflowTaskTargetWorkerDeploymentVersionChanged\x12F\n" + " workflow_task_history_size_bytes\x18F \x01(\x03R\x1cworkflowTaskHistorySizeBytes\x123\n" + "\x16workflow_task_build_id\x18X \x01(\tR\x13workflowTaskBuildId\x12S\n" + "'workflow_task_build_id_redirect_counter\x18Y \x01(\x03R\"workflowTaskBuildIdRedirectCounter\x12.\n" + diff --git a/go.mod b/go.mod index 50abea769b..c9ddb442af 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed + go.temporal.io/api v1.62.1-0.20260206000006-6754c54899a3 go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index e8cf371965..4b128ae3aa 100644 --- a/go.sum +++ b/go.sum @@ -373,8 +373,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed h1:g3CgsK5BXL2rQy0ZIJVRpNUDdtPM1y4bGv5ZoKsqR74= -go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.1-0.20260206000006-6754c54899a3 h1:OFtCxga70aM+xSpbMGWb+10AJ6TvZ+eH1mmpgSpdEhs= +go.temporal.io/api v1.62.1-0.20260206000006-6754c54899a3/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 116770bb82..0a025649ae 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -90,6 +90,7 @@ message WorkflowExecutionInfo { temporal.server.api.enums.v1.WorkflowTaskType workflow_task_type = 68; bool workflow_task_suggest_continue_as_new = 69; repeated temporal.api.enums.v1.SuggestContinueAsNewReason workflow_task_suggest_continue_as_new_reasons = 110; + bool workflow_task_target_worker_deployment_version_changed = 112; int64 workflow_task_history_size_bytes = 70; // tracks the started build ID for transient/speculative WFT. This info is used for two purposes: // - verify WFT completes by the same Build ID that started in the latest attempt diff --git a/service/history/historybuilder/event_factory.go b/service/history/historybuilder/event_factory.go index 33627bbe62..46456f28f6 100644 --- a/service/history/historybuilder/event_factory.go +++ b/service/history/historybuilder/event_factory.go @@ -125,18 +125,20 @@ func (b *EventFactory) CreateWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, buildIdRedirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) *historypb.HistoryEvent { event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime) event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ - ScheduledEventId: scheduledEventID, - Identity: identity, - RequestId: requestID, - SuggestContinueAsNew: suggestContinueAsNew, - SuggestContinueAsNewReasons: suggestContinueAsNewReasons, - HistorySizeBytes: historySizeBytes, - WorkerVersion: versioningStamp, - BuildIdRedirectCounter: buildIdRedirectCounter, + ScheduledEventId: scheduledEventID, + Identity: identity, + RequestId: requestID, + SuggestContinueAsNew: suggestContinueAsNew, + SuggestContinueAsNewReasons: suggestContinueAsNewReasons, + TargetWorkerDeploymentVersionChanged: targetWorkerDeploymentVersionChanged, + HistorySizeBytes: historySizeBytes, + WorkerVersion: versioningStamp, + BuildIdRedirectCounter: buildIdRedirectCounter, }, } return event diff --git a/service/history/historybuilder/history_builder.go b/service/history/historybuilder/history_builder.go index 024078fd09..993659e4c2 100644 --- a/service/history/historybuilder/history_builder.go +++ b/service/history/historybuilder/history_builder.go @@ -198,6 +198,7 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, buildIdRedirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) *historypb.HistoryEvent { event := b.EventFactory.CreateWorkflowTaskStartedEvent( scheduledEventID, @@ -209,6 +210,7 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent( versioningStamp, buildIdRedirectCounter, suggestContinueAsNewReasons, + targetWorkerDeploymentVersionChanged, ) event, _ = b.EventStore.add(event) return event diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index daec80d476..97586aefe4 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -243,7 +243,7 @@ type ( ApplyWorkflowTaskCompletedEvent(*historypb.HistoryEvent) error ApplyWorkflowTaskFailedEvent() error ApplyWorkflowTaskScheduledEvent(int64, int64, *taskqueuepb.TaskQueue, *durationpb.Duration, int32, *timestamppb.Timestamp, *timestamppb.Timestamp, enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error) - ApplyWorkflowTaskStartedEvent(*WorkflowTaskInfo, int64, int64, int64, string, time.Time, bool, int64, *commonpb.WorkerVersionStamp, int64, []enumspb.SuggestContinueAsNewReason) (*WorkflowTaskInfo, error) + ApplyWorkflowTaskStartedEvent(*WorkflowTaskInfo, int64, int64, int64, string, time.Time, bool, int64, *commonpb.WorkerVersionStamp, int64, []enumspb.SuggestContinueAsNewReason, bool) (*WorkflowTaskInfo, error) ApplyWorkflowTaskTimedOutEvent(enumspb.TimeoutType) error ApplyExternalWorkflowExecutionCancelRequested(*historypb.HistoryEvent) error ApplyExternalWorkflowExecutionSignaled(*historypb.HistoryEvent) error diff --git a/service/history/interfaces/workflow_task_info.go b/service/history/interfaces/workflow_task_info.go index 2d21e2fc90..475db95451 100644 --- a/service/history/interfaces/workflow_task_info.go +++ b/service/history/interfaces/workflow_task_info.go @@ -47,9 +47,10 @@ type WorkflowTaskInfo struct { // transient event), otherwise a dynamic config change of the suggestion threshold could // cause the WorkflowTaskStarted event that the worker used to not match the event we saved // in history. - SuggestContinueAsNew bool - SuggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason - HistorySizeBytes int64 + SuggestContinueAsNew bool + SuggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason + TargetWorkerDeploymentVersionChanged bool + HistorySizeBytes int64 // BuildIdRedirectCounter tracks the started build ID redirect counter for transient/speculative WFT. This // info is to make sure the right redirect counter is used in the WFT started event created later // for a transient/speculative WFT. diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index af50fccc29..73f62246b6 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -3251,10 +3251,11 @@ func (ms *MutableStateImpl) ApplyWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, redirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) (*historyi.WorkflowTaskInfo, error) { return ms.workflowTaskManager.ApplyWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID, startedEventID, requestID, timestamp, suggestContinueAsNew, historySizeBytes, versioningStamp, redirectCounter, - suggestContinueAsNewReasons) + suggestContinueAsNewReasons, targetWorkerDeploymentVersionChanged) } // TODO (alex-update): Transient needs to be renamed to "TransientOrSpeculative" diff --git a/service/history/workflow/mutable_state_rebuilder.go b/service/history/workflow/mutable_state_rebuilder.go index 77bd3c2765..2724af3099 100644 --- a/service/history/workflow/mutable_state_rebuilder.go +++ b/service/history/workflow/mutable_state_rebuilder.go @@ -237,6 +237,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( attributes.GetWorkerVersion(), attributes.GetBuildIdRedirectCounter(), attributes.GetSuggestContinueAsNewReasons(), + attributes.GetTargetWorkerDeploymentVersionChanged(), ) if err != nil { return nil, err diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index e7d0628d1c..98c8b60410 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -178,6 +178,7 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, redirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) (*historyi.WorkflowTaskInfo, error) { // When this function is called from ApplyEvents, workflowTask is nil. // It is safe to look up the workflow task as it does not have to deal with transient workflow task case. @@ -202,23 +203,24 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent( } workflowTask = &historyi.WorkflowTaskInfo{ - Version: version, - ScheduledEventID: scheduledEventID, - StartedEventID: startedEventID, - RequestID: requestID, - WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, - Attempt: workflowTask.Attempt, - AttemptsSinceLastSuccess: workflowTask.AttemptsSinceLastSuccess, - StartedTime: startedTime, - ScheduledTime: workflowTask.ScheduledTime, - TaskQueue: workflowTask.TaskQueue, - OriginalScheduledTime: workflowTask.OriginalScheduledTime, - Type: workflowTask.Type, - SuggestContinueAsNew: suggestContinueAsNew, - SuggestContinueAsNewReasons: suggestContinueAsNewReasons, - HistorySizeBytes: historySizeBytes, - BuildIdRedirectCounter: redirectCounter, - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + Version: version, + ScheduledEventID: scheduledEventID, + StartedEventID: startedEventID, + RequestID: requestID, + WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, + Attempt: workflowTask.Attempt, + AttemptsSinceLastSuccess: workflowTask.AttemptsSinceLastSuccess, + StartedTime: startedTime, + ScheduledTime: workflowTask.ScheduledTime, + TaskQueue: workflowTask.TaskQueue, + OriginalScheduledTime: workflowTask.OriginalScheduledTime, + Type: workflowTask.Type, + SuggestContinueAsNew: suggestContinueAsNew, + SuggestContinueAsNewReasons: suggestContinueAsNewReasons, + TargetWorkerDeploymentVersionChanged: targetWorkerDeploymentVersionChanged, + HistorySizeBytes: historySizeBytes, + BuildIdRedirectCounter: redirectCounter, + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } if buildId := worker_versioning.BuildIdIfUsingVersioning(versioningStamp); buildId != "" { @@ -491,14 +493,13 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( // checking whether targetDeploymentVersion == nil means that we won't send the CaN recommendation to workflows // that are about to transition to the target version. This is good, because if their transition succeeds, they // don't need to CaN to start using the new version. - if m.ms.config.EnableSuggestCaNOnNewTargetVersion(m.ms.namespaceEntry.Name().String()) && - m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && + var targetDeploymentVersionChanged bool + if m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && targetDeploymentVersion != nil { if currentDeploymentVersion := m.ms.GetEffectiveDeployment(); currentDeploymentVersion != nil && (currentDeploymentVersion.BuildId != targetDeploymentVersion.BuildId || currentDeploymentVersion.SeriesName != targetDeploymentVersion.DeploymentName) { - suggestContinueAsNew = cmp.Or(suggestContinueAsNew, true) - suggestContinueAsNewReasons = append(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED) + targetDeploymentVersionChanged = true } } // emit metric @@ -512,8 +513,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( slices.Contains(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE)), metrics.SuggestContinueAsNewReasonTooManyHistoryEventsTag( slices.Contains(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS)), - metrics.SuggestContinueAsNewReasonTargetVersionChangedTag( - slices.Contains(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED)), )).Record(1) } @@ -558,6 +557,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( versioningStamp, redirectCounter, suggestContinueAsNewReasons, + targetDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() startedEventID = startedEvent.GetEventId() @@ -575,6 +575,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( versioningStamp, redirectCounter, suggestContinueAsNewReasons, + targetDeploymentVersionChanged, ) if err != nil { return nil, nil, err @@ -751,6 +752,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( request.WorkerVersionStamp, workflowTask.BuildIdRedirectCounter, workflowTask.SuggestContinueAsNewReasons, + workflowTask.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() workflowTask.StartedEventID = startedEvent.GetEventId() @@ -841,6 +843,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( versioningStamp, workflowTask.BuildIdRedirectCounter, workflowTask.SuggestContinueAsNewReasons, + workflowTask.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() workflowTask.StartedEventID = startedEvent.GetEventId() @@ -913,6 +916,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent( nil, workflowTask.BuildIdRedirectCounter, workflowTask.SuggestContinueAsNewReasons, + workflowTask.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() workflowTask.StartedEventID = startedEvent.GetEventId() @@ -1081,6 +1085,7 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask( m.ms.executionInfo.WorkflowTaskType = workflowTask.Type m.ms.executionInfo.WorkflowTaskSuggestContinueAsNew = workflowTask.SuggestContinueAsNew m.ms.executionInfo.WorkflowTaskSuggestContinueAsNewReasons = workflowTask.SuggestContinueAsNewReasons + m.ms.executionInfo.WorkflowTaskTargetWorkerDeploymentVersionChanged = workflowTask.TargetWorkerDeploymentVersionChanged m.ms.executionInfo.WorkflowTaskHistorySizeBytes = workflowTask.HistorySizeBytes m.ms.executionInfo.WorkflowTaskBuildId = workflowTask.BuildId m.ms.executionInfo.WorkflowTaskBuildIdRedirectCounter = workflowTask.BuildIdRedirectCounter @@ -1174,13 +1179,14 @@ func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( Version: m.ms.currentVersion, Attributes: &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ - ScheduledEventId: workflowTask.ScheduledEventID, - Identity: identity, - RequestId: workflowTask.RequestID, - SuggestContinueAsNew: workflowTask.SuggestContinueAsNew, - SuggestContinueAsNewReasons: workflowTask.SuggestContinueAsNewReasons, - HistorySizeBytes: workflowTask.HistorySizeBytes, - WorkerVersion: versioningStamp, + ScheduledEventId: workflowTask.ScheduledEventID, + Identity: identity, + RequestId: workflowTask.RequestID, + SuggestContinueAsNew: workflowTask.SuggestContinueAsNew, + SuggestContinueAsNewReasons: workflowTask.SuggestContinueAsNewReasons, + TargetWorkerDeploymentVersionChanged: workflowTask.TargetWorkerDeploymentVersionChanged, + HistorySizeBytes: workflowTask.HistorySizeBytes, + WorkerVersion: versioningStamp, }, }, } @@ -1192,26 +1198,27 @@ func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *historyi.WorkflowTaskInfo { wft := &historyi.WorkflowTaskInfo{ - Version: m.ms.executionInfo.WorkflowTaskVersion, - ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId, - StartedEventID: m.ms.executionInfo.WorkflowTaskStartedEventId, - RequestID: m.ms.executionInfo.WorkflowTaskRequestId, - WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(), - Attempt: m.ms.executionInfo.WorkflowTaskAttempt, - AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, - StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(), - ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(), - TaskQueue: m.ms.CurrentTaskQueue(), - OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime.AsTime(), - Type: m.ms.executionInfo.WorkflowTaskType, - SuggestContinueAsNew: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNew, - SuggestContinueAsNewReasons: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNewReasons, - HistorySizeBytes: m.ms.executionInfo.WorkflowTaskHistorySizeBytes, - BuildId: m.ms.executionInfo.WorkflowTaskBuildId, - BuildIdRedirectCounter: m.ms.executionInfo.WorkflowTaskBuildIdRedirectCounter, - ScheduleToStartTimeoutTask: m.ms.GetWorkflowTaskScheduleToStartTimeoutTask(), - StartToCloseTimeoutTask: m.ms.GetWorkflowTaskStartToCloseTimeoutTask(), - Stamp: m.ms.executionInfo.WorkflowTaskStamp, + Version: m.ms.executionInfo.WorkflowTaskVersion, + ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId, + StartedEventID: m.ms.executionInfo.WorkflowTaskStartedEventId, + RequestID: m.ms.executionInfo.WorkflowTaskRequestId, + WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(), + Attempt: m.ms.executionInfo.WorkflowTaskAttempt, + AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, + StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(), + ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(), + TaskQueue: m.ms.CurrentTaskQueue(), + OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime.AsTime(), + Type: m.ms.executionInfo.WorkflowTaskType, + SuggestContinueAsNew: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNew, + SuggestContinueAsNewReasons: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNewReasons, + TargetWorkerDeploymentVersionChanged: m.ms.executionInfo.WorkflowTaskTargetWorkerDeploymentVersionChanged, + HistorySizeBytes: m.ms.executionInfo.WorkflowTaskHistorySizeBytes, + BuildId: m.ms.executionInfo.WorkflowTaskBuildId, + BuildIdRedirectCounter: m.ms.executionInfo.WorkflowTaskBuildIdRedirectCounter, + ScheduleToStartTimeoutTask: m.ms.GetWorkflowTaskScheduleToStartTimeoutTask(), + StartToCloseTimeoutTask: m.ms.GetWorkflowTaskStartToCloseTimeoutTask(), + Stamp: m.ms.executionInfo.WorkflowTaskStamp, } return wft @@ -1458,6 +1465,7 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro nil, wt.BuildIdRedirectCounter, nil, + false, ) m.ms.hBuilder.FlushAndCreateNewBatch() diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 8c3573b815..39a95ed92b 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -2467,27 +2467,18 @@ func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask wfTaskStartedEvents = append(wfTaskStartedEvents, event) } } - if enableSuggestCaNOnNewTargetVersion { - // Verify ContinueAsNewSuggested and reasons were sent on the last WFT started event (but not the earlier ones). - s.Greater(len(wfTaskStartedEvents), 2) // make sure there are at least 2 WFT started events - for i, event := range wfTaskStartedEvents { - attr := event.GetWorkflowTaskStartedEventAttributes() - if i == len(wfTaskStartedEvents)-1 { // last event - s.True(attr.GetSuggestContinueAsNew()) - s.Equal(enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED, attr.GetSuggestContinueAsNewReasons()[0]) - } else { // earlier events - s.False(attr.GetSuggestContinueAsNew()) - s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) - } - } - } else { - for _, event := range wfTaskStartedEvents { - attr := event.GetWorkflowTaskStartedEventAttributes() - s.False(attr.GetSuggestContinueAsNew()) - s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) + // Verify ContinueAsNewSuggested and reasons were sent on the last WFT started event (but not the earlier ones). + s.Greater(len(wfTaskStartedEvents), 2) // make sure there are at least 2 WFT started events + for i, event := range wfTaskStartedEvents { + attr := event.GetWorkflowTaskStartedEventAttributes() + s.False(attr.GetSuggestContinueAsNew()) + s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) + if i == len(wfTaskStartedEvents)-1 { // last event + s.True(attr.GetTargetWorkerDeploymentVersionChanged()) + } else { // earlier events + s.False(attr.GetTargetWorkerDeploymentVersionChanged()) } } - commands := []*commandpb.Command{ { CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, From 86b80cfd983fe9bd7d2669c7618e61955f1ce264 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 5 Feb 2026 21:54:02 -0700 Subject: [PATCH 02/16] add metric --- common/metrics/metric_defs.go | 1 + service/history/workflow/workflow_task_state_machine.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index f5f491f2ff..d206bcfcca 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1334,6 +1334,7 @@ var ( StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count") VersioningDataPropagationLatency = NewTimerDef("versioning_data_propagation_latency") SlowVersioningDataPropagationCounter = NewCounterDef("slow_versioning_data_propagation") + WorkflowTargetVersionChangedCount = NewCounterDef("workflow_target_version_changed_count") // Continue-as-new WorkflowContinueAsNewCount = NewCounterDef("workflow_continue_as_new_count") diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 98c8b60410..a63ac17031 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -503,6 +503,12 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( } } // emit metric + if targetDeploymentVersionChanged { + metrics.WorkflowTargetVersionChangedCount.With(m.metricsHandler.WithTags( + metrics.NamespaceTag(m.ms.namespaceEntry.Name().String()), + metrics.VersioningBehaviorTag(m.ms.GetEffectiveVersioningBehavior()), + )).Record(1) + } if suggestContinueAsNew { metrics.WorkflowSuggestContinueAsNewCount.With(m.metricsHandler.WithTags( metrics.NamespaceTag(m.ms.namespaceEntry.Name().String()), From 5d138a063173ddecf039719cff4c702e22e3a47f Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 5 Feb 2026 22:06:59 -0700 Subject: [PATCH 03/16] fix test; --- .../history_builder_categorization_test.go | 2 +- .../history/historybuilder/history_builder_test.go | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/service/history/historybuilder/history_builder_categorization_test.go b/service/history/historybuilder/history_builder_categorization_test.go index 4ea2920673..ee2386a2cc 100644 --- a/service/history/historybuilder/history_builder_categorization_test.go +++ b/service/history/historybuilder/history_builder_categorization_test.go @@ -1238,7 +1238,7 @@ func (s *sutTestingAdapter) AddWorkflowExecutionStartedEvent(_ ...eventConfig) * } func (s *sutTestingAdapter) AddWorkflowTaskStartedEvent(_ ...eventConfig) *historypb.HistoryEvent { - return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0, nil) + return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0, nil, true) } func (s *sutTestingAdapter) AddWorkflowTaskCompletedEvent(_ ...eventConfig) *historypb.HistoryEvent { diff --git a/service/history/historybuilder/history_builder_test.go b/service/history/historybuilder/history_builder_test.go index 34ebdcb9ef..43ac90a75d 100644 --- a/service/history/historybuilder/history_builder_test.go +++ b/service/history/historybuilder/history_builder_test.go @@ -658,6 +658,7 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() { nil, int64(0), nil, + true, ) s.Equal(event, s.flush()) s.Equal(&historypb.HistoryEvent{ @@ -668,12 +669,13 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() { Version: s.version, Attributes: &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ - ScheduledEventId: scheduledEventID, - Identity: testIdentity, - RequestId: testRequestID, - SuggestContinueAsNew: false, - SuggestContinueAsNewReasons: nil, - HistorySizeBytes: 123678, + ScheduledEventId: scheduledEventID, + Identity: testIdentity, + RequestId: testRequestID, + SuggestContinueAsNew: false, + SuggestContinueAsNewReasons: nil, + HistorySizeBytes: 123678, + TargetWorkerDeploymentVersionChanged: true, }, }, }, event) From 979c68206a4d42d15abeb570f9cfe401f1b41451 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 5 Feb 2026 22:11:57 -0700 Subject: [PATCH 04/16] separate new field with line break for smaller diff --- .../history/historybuilder/event_factory.go | 17 ++-- .../historybuilder/history_builder_test.go | 13 +-- .../history/interfaces/workflow_task_info.go | 7 +- .../workflow/workflow_task_state_machine.go | 91 ++++++++++--------- 4 files changed, 67 insertions(+), 61 deletions(-) diff --git a/service/history/historybuilder/event_factory.go b/service/history/historybuilder/event_factory.go index 46456f28f6..80d4b445e4 100644 --- a/service/history/historybuilder/event_factory.go +++ b/service/history/historybuilder/event_factory.go @@ -130,15 +130,16 @@ func (b *EventFactory) CreateWorkflowTaskStartedEvent( event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime) event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ - ScheduledEventId: scheduledEventID, - Identity: identity, - RequestId: requestID, - SuggestContinueAsNew: suggestContinueAsNew, - SuggestContinueAsNewReasons: suggestContinueAsNewReasons, + ScheduledEventId: scheduledEventID, + Identity: identity, + RequestId: requestID, + SuggestContinueAsNew: suggestContinueAsNew, + SuggestContinueAsNewReasons: suggestContinueAsNewReasons, + HistorySizeBytes: historySizeBytes, + WorkerVersion: versioningStamp, + BuildIdRedirectCounter: buildIdRedirectCounter, + TargetWorkerDeploymentVersionChanged: targetWorkerDeploymentVersionChanged, - HistorySizeBytes: historySizeBytes, - WorkerVersion: versioningStamp, - BuildIdRedirectCounter: buildIdRedirectCounter, }, } return event diff --git a/service/history/historybuilder/history_builder_test.go b/service/history/historybuilder/history_builder_test.go index 43ac90a75d..92b26cffaa 100644 --- a/service/history/historybuilder/history_builder_test.go +++ b/service/history/historybuilder/history_builder_test.go @@ -669,12 +669,13 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() { Version: s.version, Attributes: &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ - ScheduledEventId: scheduledEventID, - Identity: testIdentity, - RequestId: testRequestID, - SuggestContinueAsNew: false, - SuggestContinueAsNewReasons: nil, - HistorySizeBytes: 123678, + ScheduledEventId: scheduledEventID, + Identity: testIdentity, + RequestId: testRequestID, + SuggestContinueAsNew: false, + SuggestContinueAsNewReasons: nil, + HistorySizeBytes: 123678, + TargetWorkerDeploymentVersionChanged: true, }, }, diff --git a/service/history/interfaces/workflow_task_info.go b/service/history/interfaces/workflow_task_info.go index 475db95451..1437ec142e 100644 --- a/service/history/interfaces/workflow_task_info.go +++ b/service/history/interfaces/workflow_task_info.go @@ -47,10 +47,11 @@ type WorkflowTaskInfo struct { // transient event), otherwise a dynamic config change of the suggestion threshold could // cause the WorkflowTaskStarted event that the worker used to not match the event we saved // in history. - SuggestContinueAsNew bool - SuggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason + SuggestContinueAsNew bool + SuggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason + HistorySizeBytes int64 + TargetWorkerDeploymentVersionChanged bool - HistorySizeBytes int64 // BuildIdRedirectCounter tracks the started build ID redirect counter for transient/speculative WFT. This // info is to make sure the right redirect counter is used in the WFT started event created later // for a transient/speculative WFT. diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index a63ac17031..9be65be5aa 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -203,24 +203,25 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent( } workflowTask = &historyi.WorkflowTaskInfo{ - Version: version, - ScheduledEventID: scheduledEventID, - StartedEventID: startedEventID, - RequestID: requestID, - WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, - Attempt: workflowTask.Attempt, - AttemptsSinceLastSuccess: workflowTask.AttemptsSinceLastSuccess, - StartedTime: startedTime, - ScheduledTime: workflowTask.ScheduledTime, - TaskQueue: workflowTask.TaskQueue, - OriginalScheduledTime: workflowTask.OriginalScheduledTime, - Type: workflowTask.Type, - SuggestContinueAsNew: suggestContinueAsNew, - SuggestContinueAsNewReasons: suggestContinueAsNewReasons, + Version: version, + ScheduledEventID: scheduledEventID, + StartedEventID: startedEventID, + RequestID: requestID, + WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, + Attempt: workflowTask.Attempt, + AttemptsSinceLastSuccess: workflowTask.AttemptsSinceLastSuccess, + StartedTime: startedTime, + ScheduledTime: workflowTask.ScheduledTime, + TaskQueue: workflowTask.TaskQueue, + OriginalScheduledTime: workflowTask.OriginalScheduledTime, + Type: workflowTask.Type, + SuggestContinueAsNew: suggestContinueAsNew, + SuggestContinueAsNewReasons: suggestContinueAsNewReasons, + HistorySizeBytes: historySizeBytes, + BuildIdRedirectCounter: redirectCounter, + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + TargetWorkerDeploymentVersionChanged: targetWorkerDeploymentVersionChanged, - HistorySizeBytes: historySizeBytes, - BuildIdRedirectCounter: redirectCounter, - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } if buildId := worker_versioning.BuildIdIfUsingVersioning(versioningStamp); buildId != "" { @@ -1185,14 +1186,15 @@ func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( Version: m.ms.currentVersion, Attributes: &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ - ScheduledEventId: workflowTask.ScheduledEventID, - Identity: identity, - RequestId: workflowTask.RequestID, - SuggestContinueAsNew: workflowTask.SuggestContinueAsNew, - SuggestContinueAsNewReasons: workflowTask.SuggestContinueAsNewReasons, + ScheduledEventId: workflowTask.ScheduledEventID, + Identity: identity, + RequestId: workflowTask.RequestID, + SuggestContinueAsNew: workflowTask.SuggestContinueAsNew, + SuggestContinueAsNewReasons: workflowTask.SuggestContinueAsNewReasons, + HistorySizeBytes: workflowTask.HistorySizeBytes, + WorkerVersion: versioningStamp, + TargetWorkerDeploymentVersionChanged: workflowTask.TargetWorkerDeploymentVersionChanged, - HistorySizeBytes: workflowTask.HistorySizeBytes, - WorkerVersion: versioningStamp, }, }, } @@ -1204,27 +1206,28 @@ func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *historyi.WorkflowTaskInfo { wft := &historyi.WorkflowTaskInfo{ - Version: m.ms.executionInfo.WorkflowTaskVersion, - ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId, - StartedEventID: m.ms.executionInfo.WorkflowTaskStartedEventId, - RequestID: m.ms.executionInfo.WorkflowTaskRequestId, - WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(), - Attempt: m.ms.executionInfo.WorkflowTaskAttempt, - AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, - StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(), - ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(), - TaskQueue: m.ms.CurrentTaskQueue(), - OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime.AsTime(), - Type: m.ms.executionInfo.WorkflowTaskType, - SuggestContinueAsNew: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNew, - SuggestContinueAsNewReasons: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNewReasons, + Version: m.ms.executionInfo.WorkflowTaskVersion, + ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId, + StartedEventID: m.ms.executionInfo.WorkflowTaskStartedEventId, + RequestID: m.ms.executionInfo.WorkflowTaskRequestId, + WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(), + Attempt: m.ms.executionInfo.WorkflowTaskAttempt, + AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, + StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(), + ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(), + TaskQueue: m.ms.CurrentTaskQueue(), + OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime.AsTime(), + Type: m.ms.executionInfo.WorkflowTaskType, + SuggestContinueAsNew: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNew, + SuggestContinueAsNewReasons: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNewReasons, + HistorySizeBytes: m.ms.executionInfo.WorkflowTaskHistorySizeBytes, + BuildId: m.ms.executionInfo.WorkflowTaskBuildId, + BuildIdRedirectCounter: m.ms.executionInfo.WorkflowTaskBuildIdRedirectCounter, + ScheduleToStartTimeoutTask: m.ms.GetWorkflowTaskScheduleToStartTimeoutTask(), + StartToCloseTimeoutTask: m.ms.GetWorkflowTaskStartToCloseTimeoutTask(), + Stamp: m.ms.executionInfo.WorkflowTaskStamp, + TargetWorkerDeploymentVersionChanged: m.ms.executionInfo.WorkflowTaskTargetWorkerDeploymentVersionChanged, - HistorySizeBytes: m.ms.executionInfo.WorkflowTaskHistorySizeBytes, - BuildId: m.ms.executionInfo.WorkflowTaskBuildId, - BuildIdRedirectCounter: m.ms.executionInfo.WorkflowTaskBuildIdRedirectCounter, - ScheduleToStartTimeoutTask: m.ms.GetWorkflowTaskScheduleToStartTimeoutTask(), - StartToCloseTimeoutTask: m.ms.GetWorkflowTaskStartToCloseTimeoutTask(), - Stamp: m.ms.executionInfo.WorkflowTaskStamp, } return wft From 835d97ae247de35e19b70cd2e937ad636594733f Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 5 Feb 2026 22:13:27 -0700 Subject: [PATCH 05/16] fmt imports --- service/history/historybuilder/history_builder_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/historybuilder/history_builder_test.go b/service/history/historybuilder/history_builder_test.go index 92b26cffaa..c03f07d4ec 100644 --- a/service/history/historybuilder/history_builder_test.go +++ b/service/history/historybuilder/history_builder_test.go @@ -675,7 +675,7 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() { SuggestContinueAsNew: false, SuggestContinueAsNewReasons: nil, HistorySizeBytes: 123678, - + TargetWorkerDeploymentVersionChanged: true, }, }, From 41cd7725955502d08bb7343288c683a930db1f28 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 5 Feb 2026 23:08:06 -0700 Subject: [PATCH 06/16] mock --- service/history/interfaces/mutable_state_mock.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index c6ce89b78d..75a08d2112 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -1624,18 +1624,18 @@ func (mr *MockMutableStateMockRecorder) ApplyWorkflowTaskScheduledEvent(arg0, ar } // ApplyWorkflowTaskStartedEvent mocks base method. -func (m *MockMutableState) ApplyWorkflowTaskStartedEvent(arg0 *WorkflowTaskInfo, arg1, arg2, arg3 int64, arg4 string, arg5 time.Time, arg6 bool, arg7 int64, arg8 *common.WorkerVersionStamp, arg9 int64, arg10 []enums.SuggestContinueAsNewReason) (*WorkflowTaskInfo, error) { +func (m *MockMutableState) ApplyWorkflowTaskStartedEvent(arg0 *WorkflowTaskInfo, arg1, arg2, arg3 int64, arg4 string, arg5 time.Time, arg6 bool, arg7 int64, arg8 *common.WorkerVersionStamp, arg9 int64, arg10 []enums.SuggestContinueAsNewReason, arg11 bool) (*WorkflowTaskInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ApplyWorkflowTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10) + ret := m.ctrl.Call(m, "ApplyWorkflowTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11) ret0, _ := ret[0].(*WorkflowTaskInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // ApplyWorkflowTaskStartedEvent indicates an expected call of ApplyWorkflowTaskStartedEvent. -func (mr *MockMutableStateMockRecorder) ApplyWorkflowTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) ApplyWorkflowTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ApplyWorkflowTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).ApplyWorkflowTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ApplyWorkflowTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).ApplyWorkflowTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11) } // ApplyWorkflowTaskTimedOutEvent mocks base method. From 4eb9b95e739424e8e054cb01dc945710bc757a5d Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 5 Feb 2026 23:22:48 -0700 Subject: [PATCH 07/16] fix unit test build --- service/history/workflow/mutable_state_impl_test.go | 2 ++ service/history/workflow/mutable_state_rebuilder_test.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 3af55e0066..4918661f40 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -2228,6 +2228,7 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchAppl nil, int64(0), nil, + false, ) s.Nil(err) s.NotNil(wt) @@ -2286,6 +2287,7 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchAppl nil, int64(0), nil, + false, ) s.Nil(err) s.NotNil(wt) diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index 4418f9a842..966b65a5eb 100644 --- a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -1082,7 +1082,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskStarted() { } s.mockMutableState.EXPECT().ApplyWorkflowTaskStartedEvent( (*historyi.WorkflowTaskInfo)(nil), event.GetVersion(), scheduledEventID, event.GetEventId(), workflowTaskRequestID, timestamp.TimeValue(event.GetEventTime()), - false, gomock.Any(), nil, int64(0), nil, + false, gomock.Any(), nil, int64(0), nil, false, ).Return(wt, nil) s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateStartWorkflowTaskTasks( From 5f9a24f6a1a0920dda6c720549927a50d3b32494 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 14:51:40 -0800 Subject: [PATCH 08/16] add EnableSendTargetVersionChanged dc --- common/dynamicconfig/constants.go | 5 +++++ service/history/configs/config.go | 2 ++ service/history/workflow/workflow_task_state_machine.go | 3 ++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 2ab64dff1b..115d6df13a 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -907,6 +907,11 @@ and deployment interaction in matching and history.`, false, `EnableSuggestCaNOnNewTargetVersion lets Pinned workflows receive SuggestContinueAsNew when a new target version is available.`, ) + EnableSendTargetVersionChanged = NewNamespaceBoolSetting( + "system.EnableSendTargetVersionChanged", + false, + `EnableSendTargetVersionChanged lets Pinned workflows receive TargetWorkerDeploymentVersionChanged=true when a new target version is available for that workflow.`, + ) EnableNexus = NewGlobalBoolSetting( "system.enableNexus", true, diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 15456a5ae4..5cdec25db1 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -402,6 +402,7 @@ type Config struct { // Worker-Versioning related settings EnableSuggestCaNOnNewTargetVersion dynamicconfig.BoolPropertyFnWithNamespaceFilter + EnableSendTargetVersionChanged dynamicconfig.BoolPropertyFnWithNamespaceFilter UseRevisionNumberForWorkerVersioning dynamicconfig.BoolPropertyFnWithNamespaceFilter VersionMembershipCacheTTL dynamicconfig.DurationPropertyFn VersionMembershipCacheMaxSize dynamicconfig.IntPropertyFn @@ -770,6 +771,7 @@ func NewConfig( // Worker-Versioning related UseRevisionNumberForWorkerVersioning: dynamicconfig.UseRevisionNumberForWorkerVersioning.Get(dc), EnableSuggestCaNOnNewTargetVersion: dynamicconfig.EnableSuggestCaNOnNewTargetVersion.Get(dc), + EnableSendTargetVersionChanged: dynamicconfig.EnableSendTargetVersionChanged.Get(dc), VersionMembershipCacheTTL: dynamicconfig.VersionMembershipCacheTTL.Get(dc), VersionMembershipCacheMaxSize: dynamicconfig.VersionMembershipCacheMaxSize.Get(dc), RoutingInfoCacheTTL: dynamicconfig.RoutingInfoCacheTTL.Get(dc), diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 9be65be5aa..c2b6346697 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -495,7 +495,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( // that are about to transition to the target version. This is good, because if their transition succeeds, they // don't need to CaN to start using the new version. var targetDeploymentVersionChanged bool - if m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && + if m.ms.config.EnableSendTargetVersionChanged(m.ms.namespaceEntry.Name().String()) && + m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && targetDeploymentVersion != nil { if currentDeploymentVersion := m.ms.GetEffectiveDeployment(); currentDeploymentVersion != nil && (currentDeploymentVersion.BuildId != targetDeploymentVersion.BuildId || From 46869bafc518d6d007e652770766ecf0c97c9add Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 14:52:00 -0800 Subject: [PATCH 09/16] default EnableSendTargetVersionChanged to true --- common/dynamicconfig/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 115d6df13a..aed3d7adca 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -909,7 +909,7 @@ and deployment interaction in matching and history.`, ) EnableSendTargetVersionChanged = NewNamespaceBoolSetting( "system.EnableSendTargetVersionChanged", - false, + true, `EnableSendTargetVersionChanged lets Pinned workflows receive TargetWorkerDeploymentVersionChanged=true when a new target version is available for that workflow.`, ) EnableNexus = NewGlobalBoolSetting( From 8044ddd98c348d49810fa395032f34ec8170afdb Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 14:57:29 -0800 Subject: [PATCH 10/16] use nil safe getters --- service/history/workflow/workflow_task_state_machine.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index c2b6346697..a867b8663d 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -499,8 +499,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && targetDeploymentVersion != nil { if currentDeploymentVersion := m.ms.GetEffectiveDeployment(); currentDeploymentVersion != nil && - (currentDeploymentVersion.BuildId != targetDeploymentVersion.BuildId || - currentDeploymentVersion.SeriesName != targetDeploymentVersion.DeploymentName) { + (currentDeploymentVersion.GetBuildId() != targetDeploymentVersion.GetBuildId() || + currentDeploymentVersion.GetSeriesName() != targetDeploymentVersion.GetDeploymentName()) { targetDeploymentVersionChanged = true } } From bc05ed6d3a76e78a943f8b0cb2ef1dec14644de7 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 15:20:59 -0800 Subject: [PATCH 11/16] address cursor comments --- common/dynamicconfig/constants.go | 2 +- common/util_test.go | 1 + service/history/workflow/mutable_state_impl.go | 11 ++++++----- .../history/workflow/workflow_task_state_machine.go | 4 ++-- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index aed3d7adca..d4ef04f06d 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -908,7 +908,7 @@ and deployment interaction in matching and history.`, `EnableSuggestCaNOnNewTargetVersion lets Pinned workflows receive SuggestContinueAsNew when a new target version is available.`, ) EnableSendTargetVersionChanged = NewNamespaceBoolSetting( - "system.EnableSendTargetVersionChanged", + "system.enableSendTargetVersionChanged", true, `EnableSendTargetVersionChanged lets Pinned workflows receive TargetWorkerDeploymentVersionChanged=true when a new target version is available for that workflow.`, ) diff --git a/common/util_test.go b/common/util_test.go index 957720438b..c87dd31a85 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -463,6 +463,7 @@ func TestMergeProtoExcludingFields(t *testing.T) { &info.WorkflowTaskType, &info.WorkflowTaskSuggestContinueAsNew, &info.WorkflowTaskSuggestContinueAsNewReasons, + &info.WorkflowTaskTargetWorkerDeploymentVersionChanged, &info.WorkflowTaskHistorySizeBytes, &info.WorkflowTaskBuildId, &info.WorkflowTaskBuildIdRedirectCounter, diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 73f62246b6..4111bf2b3a 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -8697,11 +8697,12 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx OriginalScheduledTime: incoming.WorkflowTaskOriginalScheduledTime.AsTime(), Type: incoming.WorkflowTaskType, - SuggestContinueAsNew: incoming.WorkflowTaskSuggestContinueAsNew, - SuggestContinueAsNewReasons: incoming.WorkflowTaskSuggestContinueAsNewReasons, - HistorySizeBytes: incoming.WorkflowTaskHistorySizeBytes, - BuildId: incoming.WorkflowTaskBuildId, - BuildIdRedirectCounter: incoming.WorkflowTaskBuildIdRedirectCounter, + SuggestContinueAsNew: incoming.WorkflowTaskSuggestContinueAsNew, + SuggestContinueAsNewReasons: incoming.WorkflowTaskSuggestContinueAsNewReasons, + TargetWorkerDeploymentVersionChanged: incoming.WorkflowTaskTargetWorkerDeploymentVersionChanged, + HistorySizeBytes: incoming.WorkflowTaskHistorySizeBytes, + BuildId: incoming.WorkflowTaskBuildId, + BuildIdRedirectCounter: incoming.WorkflowTaskBuildIdRedirectCounter, }) workflowTaskVersionUpdated = true } diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index a867b8663d..efe8890eb5 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -1474,8 +1474,8 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro wt.HistorySizeBytes, nil, wt.BuildIdRedirectCounter, - nil, - false, + wt.SuggestContinueAsNewReasons, + wt.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() From 0e061297937498cbd2e51911ee78afa670a2b0e5 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 15:23:57 -0800 Subject: [PATCH 12/16] minimal changes to test --- tests/versioning_3_test.go | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 39a95ed92b..69beb1cdad 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -2396,9 +2396,9 @@ func (s *Versioning3Suite) makePinnedOverride(tv *testvars.TestVars) *workflowpb // 4. Trigger WFT (mode-dependent: signal (normal task), update (speculative task), or fail+retry(transient task)) // 5. On WFT: confirm ContinueAsNewSuggested=true, issue ContinueAsNew with AUTO_UPGRADE // 6. The new run should start on v2 (current) and be pinned after WFT completion. -func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask, transientTask, pinnedOverride, enableSuggestCaNOnNewTargetVersion bool) { - if enableSuggestCaNOnNewTargetVersion { - s.OverrideDynamicConfig(dynamicconfig.EnableSuggestCaNOnNewTargetVersion, true) +func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask, transientTask, pinnedOverride, enableSendTargetVersionChanged bool) { + if enableSendTargetVersionChanged { + s.OverrideDynamicConfig(dynamicconfig.EnableSendTargetVersionChanged, true) } s.RunTestWithMatchingBehavior(func() { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) @@ -2467,18 +2467,28 @@ func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask wfTaskStartedEvents = append(wfTaskStartedEvents, event) } } - // Verify ContinueAsNewSuggested and reasons were sent on the last WFT started event (but not the earlier ones). - s.Greater(len(wfTaskStartedEvents), 2) // make sure there are at least 2 WFT started events - for i, event := range wfTaskStartedEvents { - attr := event.GetWorkflowTaskStartedEventAttributes() - s.False(attr.GetSuggestContinueAsNew()) - s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) - if i == len(wfTaskStartedEvents)-1 { // last event - s.True(attr.GetTargetWorkerDeploymentVersionChanged()) - } else { // earlier events + if enableSendTargetVersionChanged { + // Verify ContinueAsNewSuggested and reasons were sent on the last WFT started event (but not the earlier ones). + s.Greater(len(wfTaskStartedEvents), 2) // make sure there are at least 2 WFT started events + for i, event := range wfTaskStartedEvents { + attr := event.GetWorkflowTaskStartedEventAttributes() + s.False(attr.GetSuggestContinueAsNew()) + s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) + if i == len(wfTaskStartedEvents)-1 { // last event + s.True(attr.GetTargetWorkerDeploymentVersionChanged()) + } else { // earlier events + s.False(attr.GetTargetWorkerDeploymentVersionChanged()) + } + } + } else { + for _, event := range wfTaskStartedEvents { + attr := event.GetWorkflowTaskStartedEventAttributes() + s.False(attr.GetSuggestContinueAsNew()) + s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) s.False(attr.GetTargetWorkerDeploymentVersionChanged()) } } + commands := []*commandpb.Command{ { CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, From 4421b204064882052212a95e678c896d872ab1da Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 15:25:54 -0800 Subject: [PATCH 13/16] minimize change --- tests/versioning_3_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 69beb1cdad..7c0171484a 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -2472,11 +2472,13 @@ func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask s.Greater(len(wfTaskStartedEvents), 2) // make sure there are at least 2 WFT started events for i, event := range wfTaskStartedEvents { attr := event.GetWorkflowTaskStartedEventAttributes() - s.False(attr.GetSuggestContinueAsNew()) - s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) if i == len(wfTaskStartedEvents)-1 { // last event + s.False(attr.GetSuggestContinueAsNew()) + s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) s.True(attr.GetTargetWorkerDeploymentVersionChanged()) } else { // earlier events + s.False(attr.GetSuggestContinueAsNew()) + s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) s.False(attr.GetTargetWorkerDeploymentVersionChanged()) } } From 1a8fcd8ed7fb44ed9518bc8626ff06f80dba44fd Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 15:51:58 -0800 Subject: [PATCH 14/16] fix tests --- .../history/workflow/workflow_task_state_machine.go | 10 +++++----- tests/versioning_3_test.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index efe8890eb5..186c0b4b7a 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -491,16 +491,16 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( suggestContinueAsNewReasons = append(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES) } - // checking whether targetDeploymentVersion == nil means that we won't send the CaN recommendation to workflows - // that are about to transition to the target version. This is good, because if their transition succeeds, they - // don't need to CaN to start using the new version. + // checking whether targetDeploymentVersion == nil means that we won't send the targetDeploymentVersionChanged=true + // to workflows that are about to transition to the target version. This is good because if their transition succeeds, + // they don't need to CaN-with-upgrade to start using the new version. var targetDeploymentVersionChanged bool if m.ms.config.EnableSendTargetVersionChanged(m.ms.namespaceEntry.Name().String()) && m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && targetDeploymentVersion != nil { if currentDeploymentVersion := m.ms.GetEffectiveDeployment(); currentDeploymentVersion != nil && - (currentDeploymentVersion.GetBuildId() != targetDeploymentVersion.GetBuildId() || - currentDeploymentVersion.GetSeriesName() != targetDeploymentVersion.GetDeploymentName()) { + (currentDeploymentVersion.BuildId != targetDeploymentVersion.BuildId || + currentDeploymentVersion.SeriesName != targetDeploymentVersion.DeploymentName) { targetDeploymentVersionChanged = true } } diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 7c0171484a..61adb0ad59 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -2397,8 +2397,8 @@ func (s *Versioning3Suite) makePinnedOverride(tv *testvars.TestVars) *workflowpb // 5. On WFT: confirm ContinueAsNewSuggested=true, issue ContinueAsNew with AUTO_UPGRADE // 6. The new run should start on v2 (current) and be pinned after WFT completion. func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask, transientTask, pinnedOverride, enableSendTargetVersionChanged bool) { - if enableSendTargetVersionChanged { - s.OverrideDynamicConfig(dynamicconfig.EnableSendTargetVersionChanged, true) + if !enableSendTargetVersionChanged { + s.OverrideDynamicConfig(dynamicconfig.EnableSendTargetVersionChanged, false) } s.RunTestWithMatchingBehavior(func() { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) From d770db0fe2abfeada4505dc8d387a58d2f4781a0 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 16:14:21 -0800 Subject: [PATCH 15/16] depend on latest api-go main --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c9ddb442af..9ede101db0 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.62.1-0.20260206000006-6754c54899a3 + go.temporal.io/api v1.62.2-0.20260212001044-b64b7ab17efc go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 4b128ae3aa..8c256ad316 100644 --- a/go.sum +++ b/go.sum @@ -373,8 +373,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.62.1-0.20260206000006-6754c54899a3 h1:OFtCxga70aM+xSpbMGWb+10AJ6TvZ+eH1mmpgSpdEhs= -go.temporal.io/api v1.62.1-0.20260206000006-6754c54899a3/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2-0.20260212001044-b64b7ab17efc h1:hFmYOJKWlLJVG5wfziY8SLv+iXEswyGVnm9c7ebMi7k= +go.temporal.io/api v1.62.2-0.20260212001044-b64b7ab17efc/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= From 1ab9e8ac65de10ccb590330912b409947ff0f4b1 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 11 Feb 2026 16:15:47 -0800 Subject: [PATCH 16/16] remove dead code --- common/metrics/tags.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 2942e117a4..5e5d723138 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -47,7 +47,6 @@ const ( suggestContinueAsNewReasonTooManyUpdates = "suggest_continue_as_new_reason_too_many_updates" suggestContinueAsNewReasonTooManyHistoryEvents = "suggest_continue_as_new_reason_too_many_history_events" suggestContinueAsNewReasonHistorySizeTooLarge = "suggest_continue_as_new_reason_history_size_too_large" - suggestContinueAsNewReasonTargetVersionChanged = "suggest_continue_as_new_reason_target_version_changed" isFirstAttempt = "first-attempt" workflowStatus = "workflow_status" behaviorBefore = "behavior_before" @@ -414,14 +413,6 @@ func ContinueAsNewVersioningBehaviorTag(canBehavior enumspb.ContinueAsNewVersion return Tag{Key: continueAsNewVersioningBehavior, Value: canBehavior.String()} } -func SuggestContinueAsNewReasonTargetVersionChangedTag(present bool) Tag { - v := falseValue - if present { - v = trueValue - } - return Tag{Key: suggestContinueAsNewReasonTargetVersionChanged, Value: v} -} - func SuggestContinueAsNewReasonTooManyUpdatesTag(present bool) Tag { v := falseValue if present {