diff --git a/client/frontend/client_gen.go b/client/frontend/client_gen.go index 312a0f0d03..7b3f3e3aa5 100644 --- a/client/frontend/client_gen.go +++ b/client/frontend/client_gen.go @@ -19,6 +19,16 @@ func (c *clientImpl) CountActivityExecutions( return c.client.CountActivityExecutions(ctx, request, opts...) } +func (c *clientImpl) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountSchedulesResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.CountSchedules(ctx, request, opts...) +} + func (c *clientImpl) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, diff --git a/client/frontend/metric_client_gen.go b/client/frontend/metric_client_gen.go index 87ce92a444..ad15ce5dc4 100644 --- a/client/frontend/metric_client_gen.go +++ b/client/frontend/metric_client_gen.go @@ -23,6 +23,20 @@ func (c *metricClient) CountActivityExecutions( return c.client.CountActivityExecutions(ctx, request, opts...) } +func (c *metricClient) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.CountSchedulesResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientCountSchedules") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.CountSchedules(ctx, request, opts...) +} + func (c *metricClient) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, diff --git a/client/frontend/retryable_client_gen.go b/client/frontend/retryable_client_gen.go index 036883dade..a46a3bbfd2 100644 --- a/client/frontend/retryable_client_gen.go +++ b/client/frontend/retryable_client_gen.go @@ -26,6 +26,21 @@ func (c *retryableClient) CountActivityExecutions( return resp, err } +func (c *retryableClient) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountSchedulesResponse, error) { + var resp *workflowservice.CountSchedulesResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.CountSchedules(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, diff --git a/common/api/metadata.go b/common/api/metadata.go index d730c8c609..669f558b29 100644 --- a/common/api/metadata.go +++ b/common/api/metadata.go @@ -129,6 +129,7 @@ var ( "ListScheduleMatchingTimes": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "DeleteSchedule": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "ListSchedules": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, + "CountSchedules": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "UpdateWorkerBuildIdCompatibility": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "GetWorkerBuildIdCompatibility": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "UpdateWorkerVersioningRules": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, diff --git a/common/rpc/interceptor/logtags/workflow_service_server_gen.go b/common/rpc/interceptor/logtags/workflow_service_server_gen.go index 8300fae41d..9857e7341b 100644 --- a/common/rpc/interceptor/logtags/workflow_service_server_gen.go +++ b/common/rpc/interceptor/logtags/workflow_service_server_gen.go @@ -13,6 +13,10 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.CountActivityExecutionsResponse: return nil + case *workflowservice.CountSchedulesRequest: + return nil + case *workflowservice.CountSchedulesResponse: + return nil case *workflowservice.CountWorkflowExecutionsRequest: return nil case *workflowservice.CountWorkflowExecutionsResponse: diff --git a/common/rpc/interceptor/redirection.go b/common/rpc/interceptor/redirection.go index 9cf31f9248..632f884ad5 100644 --- a/common/rpc/interceptor/redirection.go +++ b/common/rpc/interceptor/redirection.go @@ -94,6 +94,7 @@ var ( "PatchSchedule": func() any { return &workflowservice.PatchScheduleResponse{} }, "DeleteSchedule": func() any { return &workflowservice.DeleteScheduleResponse{} }, "ListSchedules": func() any { return &workflowservice.ListSchedulesResponse{} }, + "CountSchedules": func() any { return &workflowservice.CountSchedulesResponse{} }, "ListScheduleMatchingTimes": func() any { return &workflowservice.ListScheduleMatchingTimesResponse{} }, "UpdateWorkerBuildIdCompatibility": func() any { return &workflowservice.UpdateWorkerBuildIdCompatibilityResponse{} }, "GetWorkerBuildIdCompatibility": func() any { return &workflowservice.GetWorkerBuildIdCompatibilityResponse{} }, diff --git a/common/rpc/interceptor/redirection_test.go b/common/rpc/interceptor/redirection_test.go index 86b745305d..7a65e3ad10 100644 --- a/common/rpc/interceptor/redirection_test.go +++ b/common/rpc/interceptor/redirection_test.go @@ -151,6 +151,7 @@ func (s *redirectionInterceptorSuite) TestGlobalAPI() { "PatchSchedule": {}, "DeleteSchedule": {}, "ListSchedules": {}, + "CountSchedules": {}, "ListScheduleMatchingTimes": {}, "UpdateWorkerBuildIdCompatibility": {}, "GetWorkerBuildIdCompatibility": {}, diff --git a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go index 6242b452ba..0ff2f78a3a 100644 --- a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go +++ b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go @@ -62,6 +62,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) CountActivityExecutions(ctx, in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountActivityExecutions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CountActivityExecutions), varargs...) } +// CountSchedules mocks base method. +func (m *MockWorkflowServiceClient) CountSchedules(ctx context.Context, in *workflowservice.CountSchedulesRequest, opts ...grpc.CallOption) (*workflowservice.CountSchedulesResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CountSchedules", varargs...) + ret0, _ := ret[0].(*workflowservice.CountSchedulesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountSchedules indicates an expected call of CountSchedules. +func (mr *MockWorkflowServiceClientMockRecorder) CountSchedules(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountSchedules", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CountSchedules), varargs...) +} + // CountWorkflowExecutions mocks base method. func (m *MockWorkflowServiceClient) CountWorkflowExecutions(ctx context.Context, in *workflowservice.CountWorkflowExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.CountWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() diff --git a/go.mod b/go.mod index a660043654..8115f77262 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388 + go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 9a46dfd672..f77d1fc2ee 100644 --- a/go.sum +++ b/go.sum @@ -371,8 +371,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388 h1:Rahqpgjqalbv28RLoOtnNNZvwtnes/sQP0+cisO70Hw= -go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed h1:g3CgsK5BXL2rQy0ZIJVRpNUDdtPM1y4bGv5ZoKsqR74= +go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index 3bbc45961a..e886890128 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -197,6 +197,7 @@ var ( // APIs that rely on visibility "/temporal.api.workflowservice.v1.WorkflowService/GetWorkerTaskReachability": 1, "/temporal.api.workflowservice.v1.WorkflowService/ListSchedules": 1, + "/temporal.api.workflowservice.v1.WorkflowService/CountSchedules": 1, "/temporal.api.workflowservice.v1.WorkflowService/ListBatchOperations": 1, "/temporal.api.workflowservice.v1.WorkflowService/DescribeTaskQueueWithReachability": 1, // note this isn't a real method name "/temporal.api.workflowservice.v1.WorkflowService/ListDeployments": 1, diff --git a/service/frontend/configs/quotas_test.go b/service/frontend/configs/quotas_test.go index 4d64ab6b53..0565aee95c 100644 --- a/service/frontend/configs/quotas_test.go +++ b/service/frontend/configs/quotas_test.go @@ -97,6 +97,7 @@ func (s *quotasSuite) TestVisibilityAPIs() { "/temporal.api.workflowservice.v1.WorkflowService/GetWorkerTaskReachability": {}, "/temporal.api.workflowservice.v1.WorkflowService/ListSchedules": {}, + "/temporal.api.workflowservice.v1.WorkflowService/CountSchedules": {}, "/temporal.api.workflowservice.v1.WorkflowService/ListBatchOperations": {}, "/temporal.api.workflowservice.v1.WorkflowService/DescribeTaskQueueWithReachability": {}, "/temporal.api.workflowservice.v1.WorkflowService/ListDeployments": {}, diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 9f51ad6bb1..7b96715298 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4437,7 +4437,25 @@ func (wh *WorkflowHandler) ListSchedules( } chasmEnabled := wh.chasmSchedulerEnabled(ctx, namespaceName.String()) + query, err := wh.prepareSchedulerQuery(chasmEnabled, request.Query, namespaceName) + if err != nil { + return nil, err + } + if chasmEnabled { + // CHASM ListSchedules will include schedules created in the V1/workflow stack. + return wh.listSchedulesChasm(ctx, request, namespaceName, namespaceID, query) + } + return wh.listSchedulesWorkflow(ctx, request, namespaceName, namespaceID, query) +} + +// prepareSchedulerQuery validates a scheduler RPC's query argument, and wraps it +// in the appropriate base query. +func (wh *WorkflowHandler) prepareSchedulerQuery( + chasmEnabled bool, + query string, + namespaceName namespace.Name, +) (string, error) { // Use different base queries based on code path: // - CHASM path uses TemporalSystemExecutionStatus (translated via archetype ID) // - V1 path uses ExecutionStatus directly (no archetype ID available) @@ -4446,29 +4464,27 @@ func (wh *WorkflowHandler) ListSchedules( baseQuery = scheduler.VisibilityListQueryChasm } - query := baseQuery - if strings.TrimSpace(request.Query) != "" { + result := baseQuery + if strings.TrimSpace(query) != "" { saNameType, err := wh.saProvider.GetSearchAttributes(wh.visibilityMgr.GetIndexName(), false) if err != nil { - return nil, serviceerror.NewUnavailablef(errUnableToGetSearchAttributesMessage, err) + return "", serviceerror.NewUnavailablef(errUnableToGetSearchAttributesMessage, err) } + if err := scheduler.ValidateVisibilityQuery( namespaceName, saNameType, wh.saMapperProvider, wh.config.VisibilityEnableUnifiedQueryConverter, - request.Query, + query, ); err != nil { - return nil, err + return "", err } - query = fmt.Sprintf("%s AND (%s)", baseQuery, request.Query) - } - if chasmEnabled { - // CHASM ListSchedules will include schedules created in the V1/workflow stack. - return wh.listSchedulesChasm(ctx, request, namespaceName, namespaceID, query) + result = fmt.Sprintf("%s AND (%s)", baseQuery, query) } - return wh.listSchedulesWorkflow(ctx, request, namespaceName, namespaceID, query) + + return result, nil } func (wh *WorkflowHandler) listSchedulesChasm( @@ -4565,6 +4581,103 @@ func (wh *WorkflowHandler) listSchedulesWorkflow( }, nil } +func (wh *WorkflowHandler) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, +) (_ *workflowservice.CountSchedulesResponse, retError error) { + defer log.CapturePanic(wh.logger, &retError) + + if request == nil { + return nil, errRequestNotSet + } + + if !wh.config.EnableSchedules(request.Namespace) { + return nil, errSchedulesNotAllowed + } + + namespaceName := namespace.Name(request.GetNamespace()) + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespaceName) + if err != nil { + return nil, err + } + + if wh.config.DisableListVisibilityByFilter(namespaceName.String()) { + return nil, errListNotAllowed + } + + chasmEnabled := wh.chasmSchedulerEnabled(ctx, namespaceName.String()) + query, err := wh.prepareSchedulerQuery(chasmEnabled, request.Query, namespaceName) + if err != nil { + return nil, err + } + + // Route to CHASM or V1 based on config (same pattern as ListSchedules) + if chasmEnabled { + return wh.countSchedulesChasm(ctx, namespaceID, namespaceName, query) + } + return wh.countSchedulesWorkflow(ctx, namespaceID, namespaceName, query) +} + +// countSchedulesChasm counts schedules using CHASM APIs +func (wh *WorkflowHandler) countSchedulesChasm( + ctx context.Context, + namespaceID namespace.ID, + namespaceName namespace.Name, + query string, +) (*workflowservice.CountSchedulesResponse, error) { + resp, err := chasm.CountExecutions[*chasmscheduler.Scheduler](ctx, &chasm.CountExecutionsRequest{ + NamespaceID: namespaceID.String(), + NamespaceName: namespaceName.String(), + Query: query, + }) + if err != nil { + return nil, err + } + + groups := make([]*workflowservice.CountSchedulesResponse_AggregationGroup, 0, len(resp.Groups)) + for _, g := range resp.Groups { + groups = append(groups, &workflowservice.CountSchedulesResponse_AggregationGroup{ + GroupValues: g.Values, + Count: g.Count, + }) + } + + return &workflowservice.CountSchedulesResponse{ + Count: resp.Count, + Groups: groups, + }, nil +} + +// countSchedulesWorkflow counts schedules using direct visibility query (V1 path) +func (wh *WorkflowHandler) countSchedulesWorkflow( + ctx context.Context, + namespaceID namespace.ID, + namespaceName namespace.Name, + query string, +) (*workflowservice.CountSchedulesResponse, error) { + persistenceResp, err := wh.visibilityMgr.CountWorkflowExecutions(ctx, &manager.CountWorkflowExecutionsRequest{ + NamespaceID: namespaceID, + Namespace: namespaceName, + Query: query, + }) + if err != nil { + return nil, err + } + + groups := make([]*workflowservice.CountSchedulesResponse_AggregationGroup, 0, len(persistenceResp.Groups)) + for _, g := range persistenceResp.Groups { + groups = append(groups, &workflowservice.CountSchedulesResponse_AggregationGroup{ + GroupValues: g.GroupValues, + Count: g.Count, + }) + } + + return &workflowservice.CountSchedulesResponse{ + Count: persistenceResp.Count, + Groups: groups, + }, nil +} + func (wh *WorkflowHandler) UpdateWorkflowExecution( ctx context.Context, request *workflowservice.UpdateWorkflowExecutionRequest, diff --git a/tests/schedule_test.go b/tests/schedule_test.go index b01e5cad19..941ef7d746 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -742,6 +742,13 @@ func (s *ScheduleV1FunctionalSuite) TestCHASMCanListV1Schedules() { }) s.NotNil(v1Entry.GetInfo()) + // Count with V1 handler. + v1CountResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + s.NoError(err) + s.GreaterOrEqual(v1CountResp.Count, int64(1), "Expected at least 1 schedule with V1 handler") + // Flip on CHASM experiment and make sure we can still list. s.newContext = func() context.Context { return metadata.NewOutgoingContext(testcore.NewContext(), metadata.Pairs( @@ -751,6 +758,13 @@ func (s *ScheduleV1FunctionalSuite) TestCHASMCanListV1Schedules() { chasmEntry := s.getScheduleEntryFomVisibility(sid, nil) s.NotNil(chasmEntry.GetInfo()) s.ProtoEqual(chasmEntry.GetInfo(), v1Entry.GetInfo()) + + // Count with CHASM handler and verify it matches V1 count. + chasmCountResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + s.NoError(err) + s.Equal(v1CountResp.Count, chasmCountResp.Count, "CHASM and V1 counts should match") } // TestRefresh applies to V1 scheduler only; V2 does not support/need manual refresh. @@ -1415,3 +1429,83 @@ func (s *ScheduleCHASMFunctionalSuite) TestCreateScheduleAlreadyExists() { s.ErrorAs(err, &alreadyExists) s.Contains(err.Error(), sid) } + +func (s *scheduleFunctionalSuiteBase) TestCountSchedules() { + // Create multiple schedules with different paused states + sidPrefix := "sched-test-count-" + wid := "sched-test-count-wf" + wt := "sched-test-count-wt" + + // Create 3 schedules: 2 active, 1 paused + for i := range 3 { + sid := fmt.Sprintf("%s%d", sidPrefix, i) + paused := i == 2 // Third schedule is paused + + schedule := &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{ + Interval: []*schedulepb.IntervalSpec{ + {Interval: durationpb.New(1 * time.Hour)}, + }, + }, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: fmt.Sprintf("%s-%d", wid, i), + WorkflowType: &commonpb.WorkflowType{Name: wt}, + TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + }, + }, + }, + State: &schedulepb.ScheduleState{ + Paused: paused, + }, + } + + _, err := s.FrontendClient().CreateSchedule(s.newContext(), &workflowservice.CreateScheduleRequest{ + Namespace: s.Namespace().String(), + ScheduleId: sid, + Schedule: schedule, + Identity: "test", + RequestId: uuid.NewString(), + }) + s.NoError(err) + s.cleanup(sid) + } + + // Wait for schedules to appear in visibility + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + if err != nil { + return false + } + return countResp.Count >= 3 + }, 15*time.Second, 1*time.Second) + + // Test basic count (all schedules) + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + return err == nil && countResp.Count >= 3 + }, 15*time.Second, 1*time.Second, "Expected at least 3 schedules") + + // Test count with query filter for paused schedules + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + Query: fmt.Sprintf("%s = true", sadefs.TemporalSchedulePaused), + }) + return err == nil && countResp.Count >= 1 + }, 15*time.Second, 1*time.Second, "Expected at least 1 paused schedule") + + // Test count with query filter for non-paused schedules + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + Query: fmt.Sprintf("%s = false", sadefs.TemporalSchedulePaused), + }) + return err == nil && countResp.Count >= 2 + }, 15*time.Second, 1*time.Second, "Expected at least 2 non-paused schedules") +}