diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 2e6d4d5654..2000a2538d 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -152,19 +152,20 @@ type WorkflowExecutionInfo struct { StartTime *timestamppb.Timestamp `protobuf:"bytes,20,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` LastUpdateTime *timestamppb.Timestamp `protobuf:"bytes,21,opt,name=last_update_time,json=lastUpdateTime,proto3" json:"last_update_time,omitempty"` // Workflow task fields. - WorkflowTaskVersion int64 `protobuf:"varint,22,opt,name=workflow_task_version,json=workflowTaskVersion,proto3" json:"workflow_task_version,omitempty"` - WorkflowTaskScheduledEventId int64 `protobuf:"varint,23,opt,name=workflow_task_scheduled_event_id,json=workflowTaskScheduledEventId,proto3" json:"workflow_task_scheduled_event_id,omitempty"` - WorkflowTaskStartedEventId int64 `protobuf:"varint,24,opt,name=workflow_task_started_event_id,json=workflowTaskStartedEventId,proto3" json:"workflow_task_started_event_id,omitempty"` - WorkflowTaskTimeout *durationpb.Duration `protobuf:"bytes,25,opt,name=workflow_task_timeout,json=workflowTaskTimeout,proto3" json:"workflow_task_timeout,omitempty"` - WorkflowTaskAttempt int32 `protobuf:"varint,26,opt,name=workflow_task_attempt,json=workflowTaskAttempt,proto3" json:"workflow_task_attempt,omitempty"` - WorkflowTaskStartedTime *timestamppb.Timestamp `protobuf:"bytes,27,opt,name=workflow_task_started_time,json=workflowTaskStartedTime,proto3" json:"workflow_task_started_time,omitempty"` - WorkflowTaskScheduledTime *timestamppb.Timestamp `protobuf:"bytes,28,opt,name=workflow_task_scheduled_time,json=workflowTaskScheduledTime,proto3" json:"workflow_task_scheduled_time,omitempty"` - WorkflowTaskOriginalScheduledTime *timestamppb.Timestamp `protobuf:"bytes,30,opt,name=workflow_task_original_scheduled_time,json=workflowTaskOriginalScheduledTime,proto3" json:"workflow_task_original_scheduled_time,omitempty"` - WorkflowTaskRequestId string `protobuf:"bytes,31,opt,name=workflow_task_request_id,json=workflowTaskRequestId,proto3" json:"workflow_task_request_id,omitempty"` - WorkflowTaskType v1.WorkflowTaskType `protobuf:"varint,68,opt,name=workflow_task_type,json=workflowTaskType,proto3,enum=temporal.server.api.enums.v1.WorkflowTaskType" json:"workflow_task_type,omitempty"` - WorkflowTaskSuggestContinueAsNew bool `protobuf:"varint,69,opt,name=workflow_task_suggest_continue_as_new,json=workflowTaskSuggestContinueAsNew,proto3" json:"workflow_task_suggest_continue_as_new,omitempty"` - WorkflowTaskSuggestContinueAsNewReasons []v11.SuggestContinueAsNewReason `protobuf:"varint,110,rep,packed,name=workflow_task_suggest_continue_as_new_reasons,json=workflowTaskSuggestContinueAsNewReasons,proto3,enum=temporal.api.enums.v1.SuggestContinueAsNewReason" json:"workflow_task_suggest_continue_as_new_reasons,omitempty"` - WorkflowTaskHistorySizeBytes int64 `protobuf:"varint,70,opt,name=workflow_task_history_size_bytes,json=workflowTaskHistorySizeBytes,proto3" json:"workflow_task_history_size_bytes,omitempty"` + WorkflowTaskVersion int64 `protobuf:"varint,22,opt,name=workflow_task_version,json=workflowTaskVersion,proto3" json:"workflow_task_version,omitempty"` + WorkflowTaskScheduledEventId int64 `protobuf:"varint,23,opt,name=workflow_task_scheduled_event_id,json=workflowTaskScheduledEventId,proto3" json:"workflow_task_scheduled_event_id,omitempty"` + WorkflowTaskStartedEventId int64 `protobuf:"varint,24,opt,name=workflow_task_started_event_id,json=workflowTaskStartedEventId,proto3" json:"workflow_task_started_event_id,omitempty"` + WorkflowTaskTimeout *durationpb.Duration `protobuf:"bytes,25,opt,name=workflow_task_timeout,json=workflowTaskTimeout,proto3" json:"workflow_task_timeout,omitempty"` + WorkflowTaskAttempt int32 `protobuf:"varint,26,opt,name=workflow_task_attempt,json=workflowTaskAttempt,proto3" json:"workflow_task_attempt,omitempty"` + WorkflowTaskStartedTime *timestamppb.Timestamp `protobuf:"bytes,27,opt,name=workflow_task_started_time,json=workflowTaskStartedTime,proto3" json:"workflow_task_started_time,omitempty"` + WorkflowTaskScheduledTime *timestamppb.Timestamp `protobuf:"bytes,28,opt,name=workflow_task_scheduled_time,json=workflowTaskScheduledTime,proto3" json:"workflow_task_scheduled_time,omitempty"` + WorkflowTaskOriginalScheduledTime *timestamppb.Timestamp `protobuf:"bytes,30,opt,name=workflow_task_original_scheduled_time,json=workflowTaskOriginalScheduledTime,proto3" json:"workflow_task_original_scheduled_time,omitempty"` + WorkflowTaskRequestId string `protobuf:"bytes,31,opt,name=workflow_task_request_id,json=workflowTaskRequestId,proto3" json:"workflow_task_request_id,omitempty"` + WorkflowTaskType v1.WorkflowTaskType `protobuf:"varint,68,opt,name=workflow_task_type,json=workflowTaskType,proto3,enum=temporal.server.api.enums.v1.WorkflowTaskType" json:"workflow_task_type,omitempty"` + WorkflowTaskSuggestContinueAsNew bool `protobuf:"varint,69,opt,name=workflow_task_suggest_continue_as_new,json=workflowTaskSuggestContinueAsNew,proto3" json:"workflow_task_suggest_continue_as_new,omitempty"` + WorkflowTaskSuggestContinueAsNewReasons []v11.SuggestContinueAsNewReason `protobuf:"varint,110,rep,packed,name=workflow_task_suggest_continue_as_new_reasons,json=workflowTaskSuggestContinueAsNewReasons,proto3,enum=temporal.api.enums.v1.SuggestContinueAsNewReason" json:"workflow_task_suggest_continue_as_new_reasons,omitempty"` + WorkflowTaskTargetWorkerDeploymentVersionChanged bool `protobuf:"varint,112,opt,name=workflow_task_target_worker_deployment_version_changed,json=workflowTaskTargetWorkerDeploymentVersionChanged,proto3" json:"workflow_task_target_worker_deployment_version_changed,omitempty"` + WorkflowTaskHistorySizeBytes int64 `protobuf:"varint,70,opt,name=workflow_task_history_size_bytes,json=workflowTaskHistorySizeBytes,proto3" json:"workflow_task_history_size_bytes,omitempty"` // tracks the started build ID for transient/speculative WFT. This info is used for two purposes: // - verify WFT completes by the same Build ID that started in the latest attempt // - when persisting transient/speculative WFT, the right Build ID is used in the WFT started event @@ -589,6 +590,13 @@ func (x *WorkflowExecutionInfo) GetWorkflowTaskSuggestContinueAsNewReasons() []v return nil } +func (x *WorkflowExecutionInfo) GetWorkflowTaskTargetWorkerDeploymentVersionChanged() bool { + if x != nil { + return x.WorkflowTaskTargetWorkerDeploymentVersionChanged + } + return false +} + func (x *WorkflowExecutionInfo) GetWorkflowTaskHistorySizeBytes() int64 { if x != nil { return x.WorkflowTaskHistorySizeBytes @@ -4572,7 +4580,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x03key\x18\x01 \x01(\x05R\x03key\x12D\n" + "\x05value\x18\x02 \x01(\v2..temporal.server.api.persistence.v1.QueueStateR\x05value:\x028\x01J\x04\b\x04\x10\x05J\x04\b\x05\x10\x06J\x04\b\b\x10\tJ\x04\b\t\x10\n" + "J\x04\b\n" + - "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xeb?\n" + + "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xdd@\n" + "\x15WorkflowExecutionInfo\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12\x1f\n" + "\vworkflow_id\x18\x02 \x01(\tR\n" + @@ -4606,7 +4614,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x18workflow_task_request_id\x18\x1f \x01(\tR\x15workflowTaskRequestId\x12\\\n" + "\x12workflow_task_type\x18D \x01(\x0e2..temporal.server.api.enums.v1.WorkflowTaskTypeR\x10workflowTaskType\x12O\n" + "%workflow_task_suggest_continue_as_new\x18E \x01(\bR workflowTaskSuggestContinueAsNew\x12\x91\x01\n" + - "-workflow_task_suggest_continue_as_new_reasons\x18n \x03(\x0e21.temporal.api.enums.v1.SuggestContinueAsNewReasonR'workflowTaskSuggestContinueAsNewReasons\x12F\n" + + "-workflow_task_suggest_continue_as_new_reasons\x18n \x03(\x0e21.temporal.api.enums.v1.SuggestContinueAsNewReasonR'workflowTaskSuggestContinueAsNewReasons\x12p\n" + + "6workflow_task_target_worker_deployment_version_changed\x18p \x01(\bR0workflowTaskTargetWorkerDeploymentVersionChanged\x12F\n" + " workflow_task_history_size_bytes\x18F \x01(\x03R\x1cworkflowTaskHistorySizeBytes\x123\n" + "\x16workflow_task_build_id\x18X \x01(\tR\x13workflowTaskBuildId\x12S\n" + "'workflow_task_build_id_redirect_counter\x18Y \x01(\x03R\"workflowTaskBuildIdRedirectCounter\x12.\n" + diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index f492142c55..e69429facb 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -907,6 +907,11 @@ and deployment interaction in matching and history.`, false, `EnableSuggestCaNOnNewTargetVersion lets Pinned workflows receive SuggestContinueAsNew when a new target version is available.`, ) + EnableSendTargetVersionChanged = NewNamespaceBoolSetting( + "system.enableSendTargetVersionChanged", + true, + `EnableSendTargetVersionChanged lets Pinned workflows receive TargetWorkerDeploymentVersionChanged=true when a new target version is available for that workflow.`, + ) EnableNexus = NewGlobalBoolSetting( "system.enableNexus", true, diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 98f8d0cddf..3a351b65a7 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1337,6 +1337,7 @@ var ( StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count") VersioningDataPropagationLatency = NewTimerDef("versioning_data_propagation_latency") SlowVersioningDataPropagationCounter = NewCounterDef("slow_versioning_data_propagation") + WorkflowTargetVersionChangedCount = NewCounterDef("workflow_target_version_changed_count") // Continue-as-new WorkflowContinueAsNewCount = NewCounterDef("workflow_continue_as_new_count") diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 2942e117a4..5e5d723138 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -47,7 +47,6 @@ const ( suggestContinueAsNewReasonTooManyUpdates = "suggest_continue_as_new_reason_too_many_updates" suggestContinueAsNewReasonTooManyHistoryEvents = "suggest_continue_as_new_reason_too_many_history_events" suggestContinueAsNewReasonHistorySizeTooLarge = "suggest_continue_as_new_reason_history_size_too_large" - suggestContinueAsNewReasonTargetVersionChanged = "suggest_continue_as_new_reason_target_version_changed" isFirstAttempt = "first-attempt" workflowStatus = "workflow_status" behaviorBefore = "behavior_before" @@ -414,14 +413,6 @@ func ContinueAsNewVersioningBehaviorTag(canBehavior enumspb.ContinueAsNewVersion return Tag{Key: continueAsNewVersioningBehavior, Value: canBehavior.String()} } -func SuggestContinueAsNewReasonTargetVersionChangedTag(present bool) Tag { - v := falseValue - if present { - v = trueValue - } - return Tag{Key: suggestContinueAsNewReasonTargetVersionChanged, Value: v} -} - func SuggestContinueAsNewReasonTooManyUpdatesTag(present bool) Tag { v := falseValue if present { diff --git a/common/util_test.go b/common/util_test.go index 957720438b..c87dd31a85 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -463,6 +463,7 @@ func TestMergeProtoExcludingFields(t *testing.T) { &info.WorkflowTaskType, &info.WorkflowTaskSuggestContinueAsNew, &info.WorkflowTaskSuggestContinueAsNewReasons, + &info.WorkflowTaskTargetWorkerDeploymentVersionChanged, &info.WorkflowTaskHistorySizeBytes, &info.WorkflowTaskBuildId, &info.WorkflowTaskBuildIdRedirectCounter, diff --git a/go.mod b/go.mod index e4462785fe..68eddc1bff 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.62.1 + go.temporal.io/api v1.62.2-0.20260212001044-b64b7ab17efc go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index d81b84166e..84a7d4252c 100644 --- a/go.sum +++ b/go.sum @@ -375,8 +375,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2-0.20260212001044-b64b7ab17efc h1:hFmYOJKWlLJVG5wfziY8SLv+iXEswyGVnm9c7ebMi7k= +go.temporal.io/api v1.62.2-0.20260212001044-b64b7ab17efc/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 116770bb82..0a025649ae 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -90,6 +90,7 @@ message WorkflowExecutionInfo { temporal.server.api.enums.v1.WorkflowTaskType workflow_task_type = 68; bool workflow_task_suggest_continue_as_new = 69; repeated temporal.api.enums.v1.SuggestContinueAsNewReason workflow_task_suggest_continue_as_new_reasons = 110; + bool workflow_task_target_worker_deployment_version_changed = 112; int64 workflow_task_history_size_bytes = 70; // tracks the started build ID for transient/speculative WFT. This info is used for two purposes: // - verify WFT completes by the same Build ID that started in the latest attempt diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 15456a5ae4..5cdec25db1 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -402,6 +402,7 @@ type Config struct { // Worker-Versioning related settings EnableSuggestCaNOnNewTargetVersion dynamicconfig.BoolPropertyFnWithNamespaceFilter + EnableSendTargetVersionChanged dynamicconfig.BoolPropertyFnWithNamespaceFilter UseRevisionNumberForWorkerVersioning dynamicconfig.BoolPropertyFnWithNamespaceFilter VersionMembershipCacheTTL dynamicconfig.DurationPropertyFn VersionMembershipCacheMaxSize dynamicconfig.IntPropertyFn @@ -770,6 +771,7 @@ func NewConfig( // Worker-Versioning related UseRevisionNumberForWorkerVersioning: dynamicconfig.UseRevisionNumberForWorkerVersioning.Get(dc), EnableSuggestCaNOnNewTargetVersion: dynamicconfig.EnableSuggestCaNOnNewTargetVersion.Get(dc), + EnableSendTargetVersionChanged: dynamicconfig.EnableSendTargetVersionChanged.Get(dc), VersionMembershipCacheTTL: dynamicconfig.VersionMembershipCacheTTL.Get(dc), VersionMembershipCacheMaxSize: dynamicconfig.VersionMembershipCacheMaxSize.Get(dc), RoutingInfoCacheTTL: dynamicconfig.RoutingInfoCacheTTL.Get(dc), diff --git a/service/history/historybuilder/event_factory.go b/service/history/historybuilder/event_factory.go index 33627bbe62..80d4b445e4 100644 --- a/service/history/historybuilder/event_factory.go +++ b/service/history/historybuilder/event_factory.go @@ -125,6 +125,7 @@ func (b *EventFactory) CreateWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, buildIdRedirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) *historypb.HistoryEvent { event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime) event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ @@ -137,6 +138,8 @@ func (b *EventFactory) CreateWorkflowTaskStartedEvent( HistorySizeBytes: historySizeBytes, WorkerVersion: versioningStamp, BuildIdRedirectCounter: buildIdRedirectCounter, + + TargetWorkerDeploymentVersionChanged: targetWorkerDeploymentVersionChanged, }, } return event diff --git a/service/history/historybuilder/history_builder.go b/service/history/historybuilder/history_builder.go index 024078fd09..993659e4c2 100644 --- a/service/history/historybuilder/history_builder.go +++ b/service/history/historybuilder/history_builder.go @@ -198,6 +198,7 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, buildIdRedirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) *historypb.HistoryEvent { event := b.EventFactory.CreateWorkflowTaskStartedEvent( scheduledEventID, @@ -209,6 +210,7 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent( versioningStamp, buildIdRedirectCounter, suggestContinueAsNewReasons, + targetWorkerDeploymentVersionChanged, ) event, _ = b.EventStore.add(event) return event diff --git a/service/history/historybuilder/history_builder_categorization_test.go b/service/history/historybuilder/history_builder_categorization_test.go index 4ea2920673..ee2386a2cc 100644 --- a/service/history/historybuilder/history_builder_categorization_test.go +++ b/service/history/historybuilder/history_builder_categorization_test.go @@ -1238,7 +1238,7 @@ func (s *sutTestingAdapter) AddWorkflowExecutionStartedEvent(_ ...eventConfig) * } func (s *sutTestingAdapter) AddWorkflowTaskStartedEvent(_ ...eventConfig) *historypb.HistoryEvent { - return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0, nil) + return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0, nil, true) } func (s *sutTestingAdapter) AddWorkflowTaskCompletedEvent(_ ...eventConfig) *historypb.HistoryEvent { diff --git a/service/history/historybuilder/history_builder_test.go b/service/history/historybuilder/history_builder_test.go index 34ebdcb9ef..c03f07d4ec 100644 --- a/service/history/historybuilder/history_builder_test.go +++ b/service/history/historybuilder/history_builder_test.go @@ -658,6 +658,7 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() { nil, int64(0), nil, + true, ) s.Equal(event, s.flush()) s.Equal(&historypb.HistoryEvent{ @@ -674,6 +675,8 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() { SuggestContinueAsNew: false, SuggestContinueAsNewReasons: nil, HistorySizeBytes: 123678, + + TargetWorkerDeploymentVersionChanged: true, }, }, }, event) diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index be77e7e16b..68aedfd2ac 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -243,7 +243,7 @@ type ( ApplyWorkflowTaskCompletedEvent(*historypb.HistoryEvent) error ApplyWorkflowTaskFailedEvent() error ApplyWorkflowTaskScheduledEvent(int64, int64, *taskqueuepb.TaskQueue, *durationpb.Duration, int32, *timestamppb.Timestamp, *timestamppb.Timestamp, enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error) - ApplyWorkflowTaskStartedEvent(*WorkflowTaskInfo, int64, int64, int64, string, time.Time, bool, int64, *commonpb.WorkerVersionStamp, int64, []enumspb.SuggestContinueAsNewReason) (*WorkflowTaskInfo, error) + ApplyWorkflowTaskStartedEvent(*WorkflowTaskInfo, int64, int64, int64, string, time.Time, bool, int64, *commonpb.WorkerVersionStamp, int64, []enumspb.SuggestContinueAsNewReason, bool) (*WorkflowTaskInfo, error) ApplyWorkflowTaskTimedOutEvent(enumspb.TimeoutType) error ApplyExternalWorkflowExecutionCancelRequested(*historypb.HistoryEvent) error ApplyExternalWorkflowExecutionSignaled(*historypb.HistoryEvent) error diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 3150e5873c..103a53840e 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -1624,18 +1624,18 @@ func (mr *MockMutableStateMockRecorder) ApplyWorkflowTaskScheduledEvent(arg0, ar } // ApplyWorkflowTaskStartedEvent mocks base method. -func (m *MockMutableState) ApplyWorkflowTaskStartedEvent(arg0 *WorkflowTaskInfo, arg1, arg2, arg3 int64, arg4 string, arg5 time.Time, arg6 bool, arg7 int64, arg8 *common.WorkerVersionStamp, arg9 int64, arg10 []enums.SuggestContinueAsNewReason) (*WorkflowTaskInfo, error) { +func (m *MockMutableState) ApplyWorkflowTaskStartedEvent(arg0 *WorkflowTaskInfo, arg1, arg2, arg3 int64, arg4 string, arg5 time.Time, arg6 bool, arg7 int64, arg8 *common.WorkerVersionStamp, arg9 int64, arg10 []enums.SuggestContinueAsNewReason, arg11 bool) (*WorkflowTaskInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ApplyWorkflowTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10) + ret := m.ctrl.Call(m, "ApplyWorkflowTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11) ret0, _ := ret[0].(*WorkflowTaskInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // ApplyWorkflowTaskStartedEvent indicates an expected call of ApplyWorkflowTaskStartedEvent. -func (mr *MockMutableStateMockRecorder) ApplyWorkflowTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) ApplyWorkflowTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ApplyWorkflowTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).ApplyWorkflowTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ApplyWorkflowTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).ApplyWorkflowTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11) } // ApplyWorkflowTaskTimedOutEvent mocks base method. diff --git a/service/history/interfaces/workflow_task_info.go b/service/history/interfaces/workflow_task_info.go index 2d21e2fc90..1437ec142e 100644 --- a/service/history/interfaces/workflow_task_info.go +++ b/service/history/interfaces/workflow_task_info.go @@ -50,6 +50,8 @@ type WorkflowTaskInfo struct { SuggestContinueAsNew bool SuggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason HistorySizeBytes int64 + + TargetWorkerDeploymentVersionChanged bool // BuildIdRedirectCounter tracks the started build ID redirect counter for transient/speculative WFT. This // info is to make sure the right redirect counter is used in the WFT started event created later // for a transient/speculative WFT. diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index dd59c02102..f742905291 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -3251,10 +3251,11 @@ func (ms *MutableStateImpl) ApplyWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, redirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) (*historyi.WorkflowTaskInfo, error) { return ms.workflowTaskManager.ApplyWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID, startedEventID, requestID, timestamp, suggestContinueAsNew, historySizeBytes, versioningStamp, redirectCounter, - suggestContinueAsNewReasons) + suggestContinueAsNewReasons, targetWorkerDeploymentVersionChanged) } // TODO (alex-update): Transient needs to be renamed to "TransientOrSpeculative" @@ -8696,11 +8697,12 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx OriginalScheduledTime: incoming.WorkflowTaskOriginalScheduledTime.AsTime(), Type: incoming.WorkflowTaskType, - SuggestContinueAsNew: incoming.WorkflowTaskSuggestContinueAsNew, - SuggestContinueAsNewReasons: incoming.WorkflowTaskSuggestContinueAsNewReasons, - HistorySizeBytes: incoming.WorkflowTaskHistorySizeBytes, - BuildId: incoming.WorkflowTaskBuildId, - BuildIdRedirectCounter: incoming.WorkflowTaskBuildIdRedirectCounter, + SuggestContinueAsNew: incoming.WorkflowTaskSuggestContinueAsNew, + SuggestContinueAsNewReasons: incoming.WorkflowTaskSuggestContinueAsNewReasons, + TargetWorkerDeploymentVersionChanged: incoming.WorkflowTaskTargetWorkerDeploymentVersionChanged, + HistorySizeBytes: incoming.WorkflowTaskHistorySizeBytes, + BuildId: incoming.WorkflowTaskBuildId, + BuildIdRedirectCounter: incoming.WorkflowTaskBuildIdRedirectCounter, }) workflowTaskVersionUpdated = true } diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index b167425196..2b302fd7da 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -2229,6 +2229,7 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchAppl nil, int64(0), nil, + false, ) s.Nil(err) s.NotNil(wt) @@ -2287,6 +2288,7 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchAppl nil, int64(0), nil, + false, ) s.Nil(err) s.NotNil(wt) diff --git a/service/history/workflow/mutable_state_rebuilder.go b/service/history/workflow/mutable_state_rebuilder.go index 77bd3c2765..2724af3099 100644 --- a/service/history/workflow/mutable_state_rebuilder.go +++ b/service/history/workflow/mutable_state_rebuilder.go @@ -237,6 +237,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( attributes.GetWorkerVersion(), attributes.GetBuildIdRedirectCounter(), attributes.GetSuggestContinueAsNewReasons(), + attributes.GetTargetWorkerDeploymentVersionChanged(), ) if err != nil { return nil, err diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index 4418f9a842..966b65a5eb 100644 --- a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -1082,7 +1082,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskStarted() { } s.mockMutableState.EXPECT().ApplyWorkflowTaskStartedEvent( (*historyi.WorkflowTaskInfo)(nil), event.GetVersion(), scheduledEventID, event.GetEventId(), workflowTaskRequestID, timestamp.TimeValue(event.GetEventTime()), - false, gomock.Any(), nil, int64(0), nil, + false, gomock.Any(), nil, int64(0), nil, false, ).Return(wt, nil) s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateStartWorkflowTaskTasks( diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index e7d0628d1c..186c0b4b7a 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -178,6 +178,7 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, redirectCounter int64, suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason, + targetWorkerDeploymentVersionChanged bool, ) (*historyi.WorkflowTaskInfo, error) { // When this function is called from ApplyEvents, workflowTask is nil. // It is safe to look up the workflow task as it does not have to deal with transient workflow task case. @@ -219,6 +220,8 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent( HistorySizeBytes: historySizeBytes, BuildIdRedirectCounter: redirectCounter, Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + + TargetWorkerDeploymentVersionChanged: targetWorkerDeploymentVersionChanged, } if buildId := worker_versioning.BuildIdIfUsingVersioning(versioningStamp); buildId != "" { @@ -488,20 +491,26 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( suggestContinueAsNewReasons = append(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES) } - // checking whether targetDeploymentVersion == nil means that we won't send the CaN recommendation to workflows - // that are about to transition to the target version. This is good, because if their transition succeeds, they - // don't need to CaN to start using the new version. - if m.ms.config.EnableSuggestCaNOnNewTargetVersion(m.ms.namespaceEntry.Name().String()) && + // checking whether targetDeploymentVersion == nil means that we won't send the targetDeploymentVersionChanged=true + // to workflows that are about to transition to the target version. This is good because if their transition succeeds, + // they don't need to CaN-with-upgrade to start using the new version. + var targetDeploymentVersionChanged bool + if m.ms.config.EnableSendTargetVersionChanged(m.ms.namespaceEntry.Name().String()) && m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && targetDeploymentVersion != nil { if currentDeploymentVersion := m.ms.GetEffectiveDeployment(); currentDeploymentVersion != nil && (currentDeploymentVersion.BuildId != targetDeploymentVersion.BuildId || currentDeploymentVersion.SeriesName != targetDeploymentVersion.DeploymentName) { - suggestContinueAsNew = cmp.Or(suggestContinueAsNew, true) - suggestContinueAsNewReasons = append(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED) + targetDeploymentVersionChanged = true } } // emit metric + if targetDeploymentVersionChanged { + metrics.WorkflowTargetVersionChangedCount.With(m.metricsHandler.WithTags( + metrics.NamespaceTag(m.ms.namespaceEntry.Name().String()), + metrics.VersioningBehaviorTag(m.ms.GetEffectiveVersioningBehavior()), + )).Record(1) + } if suggestContinueAsNew { metrics.WorkflowSuggestContinueAsNewCount.With(m.metricsHandler.WithTags( metrics.NamespaceTag(m.ms.namespaceEntry.Name().String()), @@ -512,8 +521,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( slices.Contains(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE)), metrics.SuggestContinueAsNewReasonTooManyHistoryEventsTag( slices.Contains(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS)), - metrics.SuggestContinueAsNewReasonTargetVersionChangedTag( - slices.Contains(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED)), )).Record(1) } @@ -558,6 +565,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( versioningStamp, redirectCounter, suggestContinueAsNewReasons, + targetDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() startedEventID = startedEvent.GetEventId() @@ -575,6 +583,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( versioningStamp, redirectCounter, suggestContinueAsNewReasons, + targetDeploymentVersionChanged, ) if err != nil { return nil, nil, err @@ -751,6 +760,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( request.WorkerVersionStamp, workflowTask.BuildIdRedirectCounter, workflowTask.SuggestContinueAsNewReasons, + workflowTask.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() workflowTask.StartedEventID = startedEvent.GetEventId() @@ -841,6 +851,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( versioningStamp, workflowTask.BuildIdRedirectCounter, workflowTask.SuggestContinueAsNewReasons, + workflowTask.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() workflowTask.StartedEventID = startedEvent.GetEventId() @@ -913,6 +924,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent( nil, workflowTask.BuildIdRedirectCounter, workflowTask.SuggestContinueAsNewReasons, + workflowTask.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() workflowTask.StartedEventID = startedEvent.GetEventId() @@ -1081,6 +1093,7 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask( m.ms.executionInfo.WorkflowTaskType = workflowTask.Type m.ms.executionInfo.WorkflowTaskSuggestContinueAsNew = workflowTask.SuggestContinueAsNew m.ms.executionInfo.WorkflowTaskSuggestContinueAsNewReasons = workflowTask.SuggestContinueAsNewReasons + m.ms.executionInfo.WorkflowTaskTargetWorkerDeploymentVersionChanged = workflowTask.TargetWorkerDeploymentVersionChanged m.ms.executionInfo.WorkflowTaskHistorySizeBytes = workflowTask.HistorySizeBytes m.ms.executionInfo.WorkflowTaskBuildId = workflowTask.BuildId m.ms.executionInfo.WorkflowTaskBuildIdRedirectCounter = workflowTask.BuildIdRedirectCounter @@ -1181,6 +1194,8 @@ func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( SuggestContinueAsNewReasons: workflowTask.SuggestContinueAsNewReasons, HistorySizeBytes: workflowTask.HistorySizeBytes, WorkerVersion: versioningStamp, + + TargetWorkerDeploymentVersionChanged: workflowTask.TargetWorkerDeploymentVersionChanged, }, }, } @@ -1212,6 +1227,8 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *historyi.WorkflowTaskI ScheduleToStartTimeoutTask: m.ms.GetWorkflowTaskScheduleToStartTimeoutTask(), StartToCloseTimeoutTask: m.ms.GetWorkflowTaskStartToCloseTimeoutTask(), Stamp: m.ms.executionInfo.WorkflowTaskStamp, + + TargetWorkerDeploymentVersionChanged: m.ms.executionInfo.WorkflowTaskTargetWorkerDeploymentVersionChanged, } return wft @@ -1457,7 +1474,8 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro wt.HistorySizeBytes, nil, wt.BuildIdRedirectCounter, - nil, + wt.SuggestContinueAsNewReasons, + wt.TargetWorkerDeploymentVersionChanged, ) m.ms.hBuilder.FlushAndCreateNewBatch() diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index e2e4fd7f85..b447c07a38 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -2385,9 +2385,9 @@ func (s *Versioning3Suite) makePinnedOverride(tv *testvars.TestVars) *workflowpb // 4. Trigger WFT (mode-dependent: signal (normal task), update (speculative task), or fail+retry(transient task)) // 5. On WFT: confirm ContinueAsNewSuggested=true, issue ContinueAsNew with AUTO_UPGRADE // 6. The new run should start on v2 (current) and be pinned after WFT completion. -func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask, transientTask, pinnedOverride, enableSuggestCaNOnNewTargetVersion bool) { - if enableSuggestCaNOnNewTargetVersion { - s.OverrideDynamicConfig(dynamicconfig.EnableSuggestCaNOnNewTargetVersion, true) +func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask, transientTask, pinnedOverride, enableSendTargetVersionChanged bool) { + if !enableSendTargetVersionChanged { + s.OverrideDynamicConfig(dynamicconfig.EnableSendTargetVersionChanged, false) } s.RunTestWithMatchingBehavior(func() { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) @@ -2456,17 +2456,19 @@ func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask wfTaskStartedEvents = append(wfTaskStartedEvents, event) } } - if enableSuggestCaNOnNewTargetVersion { + if enableSendTargetVersionChanged { // Verify ContinueAsNewSuggested and reasons were sent on the last WFT started event (but not the earlier ones). s.Greater(len(wfTaskStartedEvents), 2) // make sure there are at least 2 WFT started events for i, event := range wfTaskStartedEvents { attr := event.GetWorkflowTaskStartedEventAttributes() if i == len(wfTaskStartedEvents)-1 { // last event - s.True(attr.GetSuggestContinueAsNew()) - s.Equal(enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED, attr.GetSuggestContinueAsNewReasons()[0]) + s.False(attr.GetSuggestContinueAsNew()) + s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) + s.True(attr.GetTargetWorkerDeploymentVersionChanged()) } else { // earlier events s.False(attr.GetSuggestContinueAsNew()) s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) + s.False(attr.GetTargetWorkerDeploymentVersionChanged()) } } } else { @@ -2474,6 +2476,7 @@ func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask attr := event.GetWorkflowTaskStartedEventAttributes() s.False(attr.GetSuggestContinueAsNew()) s.Require().Empty(attr.GetSuggestContinueAsNewReasons()) + s.False(attr.GetTargetWorkerDeploymentVersionChanged()) } }