From f9ea9c1213b9789ae3773ab0fe84da02d66babb4 Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Thu, 15 Jan 2026 14:26:30 -0800 Subject: [PATCH 1/5] [Scheduler] CountSchedules implementation for V1 and V2 --- client/frontend/client_gen.go | 10 ++ client/frontend/metric_client_gen.go | 14 +++ client/frontend/retryable_client_gen.go | 15 +++ .../logtags/workflow_service_server_gen.go | 4 + common/rpc/interceptor/redirection.go | 1 + .../v1/service_grpc.pb.mock.go | 20 ++++ service/frontend/configs/quotas.go | 1 + service/frontend/workflow_handler.go | 110 ++++++++++++++++++ tests/schedule_test.go | 77 ++++++++++++ 9 files changed, 252 insertions(+) diff --git a/client/frontend/client_gen.go b/client/frontend/client_gen.go index 312a0f0d03..7b3f3e3aa5 100644 --- a/client/frontend/client_gen.go +++ b/client/frontend/client_gen.go @@ -19,6 +19,16 @@ func (c *clientImpl) CountActivityExecutions( return c.client.CountActivityExecutions(ctx, request, opts...) } +func (c *clientImpl) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountSchedulesResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.CountSchedules(ctx, request, opts...) +} + func (c *clientImpl) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, diff --git a/client/frontend/metric_client_gen.go b/client/frontend/metric_client_gen.go index 87ce92a444..ad15ce5dc4 100644 --- a/client/frontend/metric_client_gen.go +++ b/client/frontend/metric_client_gen.go @@ -23,6 +23,20 @@ func (c *metricClient) CountActivityExecutions( return c.client.CountActivityExecutions(ctx, request, opts...) } +func (c *metricClient) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.CountSchedulesResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientCountSchedules") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.CountSchedules(ctx, request, opts...) +} + func (c *metricClient) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, diff --git a/client/frontend/retryable_client_gen.go b/client/frontend/retryable_client_gen.go index 036883dade..a46a3bbfd2 100644 --- a/client/frontend/retryable_client_gen.go +++ b/client/frontend/retryable_client_gen.go @@ -26,6 +26,21 @@ func (c *retryableClient) CountActivityExecutions( return resp, err } +func (c *retryableClient) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountSchedulesResponse, error) { + var resp *workflowservice.CountSchedulesResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.CountSchedules(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, diff --git a/common/rpc/interceptor/logtags/workflow_service_server_gen.go b/common/rpc/interceptor/logtags/workflow_service_server_gen.go index 8300fae41d..9857e7341b 100644 --- a/common/rpc/interceptor/logtags/workflow_service_server_gen.go +++ b/common/rpc/interceptor/logtags/workflow_service_server_gen.go @@ -13,6 +13,10 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.CountActivityExecutionsResponse: return nil + case *workflowservice.CountSchedulesRequest: + return nil + case *workflowservice.CountSchedulesResponse: + return nil case *workflowservice.CountWorkflowExecutionsRequest: return nil case *workflowservice.CountWorkflowExecutionsResponse: diff --git a/common/rpc/interceptor/redirection.go b/common/rpc/interceptor/redirection.go index 9cf31f9248..632f884ad5 100644 --- a/common/rpc/interceptor/redirection.go +++ b/common/rpc/interceptor/redirection.go @@ -94,6 +94,7 @@ var ( "PatchSchedule": func() any { return &workflowservice.PatchScheduleResponse{} }, "DeleteSchedule": func() any { return &workflowservice.DeleteScheduleResponse{} }, "ListSchedules": func() any { return &workflowservice.ListSchedulesResponse{} }, + "CountSchedules": func() any { return &workflowservice.CountSchedulesResponse{} }, "ListScheduleMatchingTimes": func() any { return &workflowservice.ListScheduleMatchingTimesResponse{} }, "UpdateWorkerBuildIdCompatibility": func() any { return &workflowservice.UpdateWorkerBuildIdCompatibilityResponse{} }, "GetWorkerBuildIdCompatibility": func() any { return &workflowservice.GetWorkerBuildIdCompatibilityResponse{} }, diff --git a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go index 6242b452ba..0ff2f78a3a 100644 --- a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go +++ b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go @@ -62,6 +62,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) CountActivityExecutions(ctx, in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountActivityExecutions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CountActivityExecutions), varargs...) } +// CountSchedules mocks base method. +func (m *MockWorkflowServiceClient) CountSchedules(ctx context.Context, in *workflowservice.CountSchedulesRequest, opts ...grpc.CallOption) (*workflowservice.CountSchedulesResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CountSchedules", varargs...) + ret0, _ := ret[0].(*workflowservice.CountSchedulesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountSchedules indicates an expected call of CountSchedules. +func (mr *MockWorkflowServiceClientMockRecorder) CountSchedules(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountSchedules", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CountSchedules), varargs...) +} + // CountWorkflowExecutions mocks base method. func (m *MockWorkflowServiceClient) CountWorkflowExecutions(ctx context.Context, in *workflowservice.CountWorkflowExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.CountWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index 3bbc45961a..e886890128 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -197,6 +197,7 @@ var ( // APIs that rely on visibility "/temporal.api.workflowservice.v1.WorkflowService/GetWorkerTaskReachability": 1, "/temporal.api.workflowservice.v1.WorkflowService/ListSchedules": 1, + "/temporal.api.workflowservice.v1.WorkflowService/CountSchedules": 1, "/temporal.api.workflowservice.v1.WorkflowService/ListBatchOperations": 1, "/temporal.api.workflowservice.v1.WorkflowService/DescribeTaskQueueWithReachability": 1, // note this isn't a real method name "/temporal.api.workflowservice.v1.WorkflowService/ListDeployments": 1, diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index aab61f305b..29a8dc3ee6 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4557,6 +4557,116 @@ func (wh *WorkflowHandler) listSchedulesWorkflow( }, nil } +func (wh *WorkflowHandler) CountSchedules( + ctx context.Context, + request *workflowservice.CountSchedulesRequest, +) (_ *workflowservice.CountSchedulesResponse, retError error) { + defer log.CapturePanic(wh.logger, &retError) + + if request == nil { + return nil, errRequestNotSet + } + + if !wh.config.EnableSchedules(request.Namespace) { + return nil, errSchedulesNotAllowed + } + + namespaceName := namespace.Name(request.GetNamespace()) + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespaceName) + if err != nil { + return nil, err + } + + if wh.config.DisableListVisibilityByFilter(namespaceName.String()) { + return nil, errListNotAllowed + } + + // Build query with base filter (same as ListSchedules) + query := scheduler.VisibilityBaseListQuery + if strings.TrimSpace(request.Query) != "" { + saNameType, err := wh.saProvider.GetSearchAttributes(wh.visibilityMgr.GetIndexName(), false) + if err != nil { + return nil, serviceerror.NewUnavailablef(errUnableToGetSearchAttributesMessage, err) + } + if err := scheduler.ValidateVisibilityQuery( + namespaceName, + saNameType, + wh.saMapperProvider, + wh.config.VisibilityEnableUnifiedQueryConverter, + request.Query, + ); err != nil { + return nil, err + } + query = fmt.Sprintf("%s AND (%s)", scheduler.VisibilityBaseListQuery, request.Query) + } + + // Route to CHASM or V1 based on config (same pattern as ListSchedules) + if wh.chasmSchedulerEnabled(ctx, namespaceName.String()) { + return wh.countSchedulesChasm(ctx, namespaceID, namespaceName, query) + } + return wh.countSchedulesWorkflow(ctx, namespaceID, namespaceName, query) +} + +// countSchedulesChasm counts schedules using CHASM APIs +func (wh *WorkflowHandler) countSchedulesChasm( + ctx context.Context, + namespaceID namespace.ID, + namespaceName namespace.Name, + query string, +) (*workflowservice.CountSchedulesResponse, error) { + resp, err := chasm.CountExecutions[*chasmscheduler.Scheduler](ctx, &chasm.CountExecutionsRequest{ + NamespaceID: namespaceID.String(), + NamespaceName: namespaceName.String(), + Query: query, + }) + if err != nil { + return nil, err + } + + groups := make([]*workflowservice.CountSchedulesResponse_AggregationGroup, 0, len(resp.Groups)) + for _, g := range resp.Groups { + groups = append(groups, &workflowservice.CountSchedulesResponse_AggregationGroup{ + GroupValues: g.Values, + Count: g.Count, + }) + } + + return &workflowservice.CountSchedulesResponse{ + Count: resp.Count, + Groups: groups, + }, nil +} + +// countSchedulesWorkflow counts schedules using direct visibility query (V1 path) +func (wh *WorkflowHandler) countSchedulesWorkflow( + ctx context.Context, + namespaceID namespace.ID, + namespaceName namespace.Name, + query string, +) (*workflowservice.CountSchedulesResponse, error) { + persistenceResp, err := wh.visibilityMgr.CountWorkflowExecutions(ctx, &manager.CountWorkflowExecutionsRequest{ + NamespaceID: namespaceID, + Namespace: namespaceName, + Query: query, + }) + if err != nil { + return nil, err + } + + groups := make([]*workflowservice.CountSchedulesResponse_AggregationGroup, 0, len(persistenceResp.Groups)) + for _, g := range persistenceResp.Groups { + groups = append(groups, &workflowservice.CountSchedulesResponse_AggregationGroup{ + GroupValues: g.GroupValues, + Count: g.Count, + }) + } + + return &workflowservice.CountSchedulesResponse{ + Count: persistenceResp.Count, + Groups: groups, + }, nil +} + func (wh *WorkflowHandler) UpdateWorkflowExecution( ctx context.Context, request *workflowservice.UpdateWorkflowExecutionRequest, diff --git a/tests/schedule_test.go b/tests/schedule_test.go index 20a5c7c9bb..cecae6cf68 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -1370,3 +1370,80 @@ func (s *scheduleFunctionalSuiteBase) cleanup(sid string) { }) }) } + +func (s *scheduleFunctionalSuiteBase) TestCountSchedules() { + // Create multiple schedules with different paused states + sidPrefix := "sched-test-count-" + wid := "sched-test-count-wf" + wt := "sched-test-count-wt" + + // Create 3 schedules: 2 active, 1 paused + for i := range 3 { + sid := fmt.Sprintf("%s%d", sidPrefix, i) + paused := i == 2 // Third schedule is paused + + schedule := &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{ + Interval: []*schedulepb.IntervalSpec{ + {Interval: durationpb.New(1 * time.Hour)}, + }, + }, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: fmt.Sprintf("%s-%d", wid, i), + WorkflowType: &commonpb.WorkflowType{Name: wt}, + TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + }, + }, + }, + State: &schedulepb.ScheduleState{ + Paused: paused, + }, + } + + _, err := s.FrontendClient().CreateSchedule(s.newContext(), &workflowservice.CreateScheduleRequest{ + Namespace: s.Namespace().String(), + ScheduleId: sid, + Schedule: schedule, + Identity: "test", + RequestId: uuid.NewString(), + }) + s.NoError(err) + s.cleanup(sid) + } + + // Wait for schedules to appear in visibility + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + if err != nil { + return false + } + return countResp.Count >= 3 + }, 15*time.Second, 1*time.Second) + + // Test basic count (all schedules) + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + s.NoError(err) + s.GreaterOrEqual(countResp.Count, int64(3), "Expected at least 3 schedules") + + // Test count with query filter for paused schedules + countResp, err = s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + Query: fmt.Sprintf("%s = true", sadefs.TemporalSchedulePaused), + }) + s.NoError(err) + s.GreaterOrEqual(countResp.Count, int64(1), "Expected at least 1 paused schedule") + + // Test count with query filter for non-paused schedules + countResp, err = s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + Query: fmt.Sprintf("%s = false", sadefs.TemporalSchedulePaused), + }) + s.NoError(err) + s.GreaterOrEqual(countResp.Count, int64(2), "Expected at least 2 non-paused schedules") +} From a65cf0c7e8ca7671651ce034e1115d4d68d1e603 Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Wed, 28 Jan 2026 14:47:05 -0800 Subject: [PATCH 2/5] PR feedback --- service/frontend/workflow_handler.go | 61 +++++++++++++++------------- tests/schedule_test.go | 14 +++++++ 2 files changed, 46 insertions(+), 29 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 438c144e0c..7b96715298 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4437,7 +4437,25 @@ func (wh *WorkflowHandler) ListSchedules( } chasmEnabled := wh.chasmSchedulerEnabled(ctx, namespaceName.String()) + query, err := wh.prepareSchedulerQuery(chasmEnabled, request.Query, namespaceName) + if err != nil { + return nil, err + } + + if chasmEnabled { + // CHASM ListSchedules will include schedules created in the V1/workflow stack. + return wh.listSchedulesChasm(ctx, request, namespaceName, namespaceID, query) + } + return wh.listSchedulesWorkflow(ctx, request, namespaceName, namespaceID, query) +} +// prepareSchedulerQuery validates a scheduler RPC's query argument, and wraps it +// in the appropriate base query. +func (wh *WorkflowHandler) prepareSchedulerQuery( + chasmEnabled bool, + query string, + namespaceName namespace.Name, +) (string, error) { // Use different base queries based on code path: // - CHASM path uses TemporalSystemExecutionStatus (translated via archetype ID) // - V1 path uses ExecutionStatus directly (no archetype ID available) @@ -4446,29 +4464,27 @@ func (wh *WorkflowHandler) ListSchedules( baseQuery = scheduler.VisibilityListQueryChasm } - query := baseQuery - if strings.TrimSpace(request.Query) != "" { + result := baseQuery + if strings.TrimSpace(query) != "" { saNameType, err := wh.saProvider.GetSearchAttributes(wh.visibilityMgr.GetIndexName(), false) if err != nil { - return nil, serviceerror.NewUnavailablef(errUnableToGetSearchAttributesMessage, err) + return "", serviceerror.NewUnavailablef(errUnableToGetSearchAttributesMessage, err) } + if err := scheduler.ValidateVisibilityQuery( namespaceName, saNameType, wh.saMapperProvider, wh.config.VisibilityEnableUnifiedQueryConverter, - request.Query, + query, ); err != nil { - return nil, err + return "", err } - query = fmt.Sprintf("%s AND (%s)", baseQuery, request.Query) - } - if chasmEnabled { - // CHASM ListSchedules will include schedules created in the V1/workflow stack. - return wh.listSchedulesChasm(ctx, request, namespaceName, namespaceID, query) + result = fmt.Sprintf("%s AND (%s)", baseQuery, query) } - return wh.listSchedulesWorkflow(ctx, request, namespaceName, namespaceID, query) + + return result, nil } func (wh *WorkflowHandler) listSchedulesChasm( @@ -4589,27 +4605,14 @@ func (wh *WorkflowHandler) CountSchedules( return nil, errListNotAllowed } - // Build query with base filter (same as ListSchedules) - query := scheduler.VisibilityBaseListQuery - if strings.TrimSpace(request.Query) != "" { - saNameType, err := wh.saProvider.GetSearchAttributes(wh.visibilityMgr.GetIndexName(), false) - if err != nil { - return nil, serviceerror.NewUnavailablef(errUnableToGetSearchAttributesMessage, err) - } - if err := scheduler.ValidateVisibilityQuery( - namespaceName, - saNameType, - wh.saMapperProvider, - wh.config.VisibilityEnableUnifiedQueryConverter, - request.Query, - ); err != nil { - return nil, err - } - query = fmt.Sprintf("%s AND (%s)", scheduler.VisibilityBaseListQuery, request.Query) + chasmEnabled := wh.chasmSchedulerEnabled(ctx, namespaceName.String()) + query, err := wh.prepareSchedulerQuery(chasmEnabled, request.Query, namespaceName) + if err != nil { + return nil, err } // Route to CHASM or V1 based on config (same pattern as ListSchedules) - if wh.chasmSchedulerEnabled(ctx, namespaceName.String()) { + if chasmEnabled { return wh.countSchedulesChasm(ctx, namespaceID, namespaceName, query) } return wh.countSchedulesWorkflow(ctx, namespaceID, namespaceName, query) diff --git a/tests/schedule_test.go b/tests/schedule_test.go index 14af225317..e9ffd7d9b9 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -742,6 +742,13 @@ func (s *ScheduleV1FunctionalSuite) TestCHASMCanListV1Schedules() { }) s.NotNil(v1Entry.GetInfo()) + // Count with V1 handler. + v1CountResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + s.NoError(err) + s.GreaterOrEqual(v1CountResp.Count, int64(1), "Expected at least 1 schedule with V1 handler") + // Flip on CHASM experiment and make sure we can still list. s.newContext = func() context.Context { return metadata.NewOutgoingContext(testcore.NewContext(), metadata.Pairs( @@ -751,6 +758,13 @@ func (s *ScheduleV1FunctionalSuite) TestCHASMCanListV1Schedules() { chasmEntry := s.getScheduleEntryFomVisibility(sid, nil) s.NotNil(chasmEntry.GetInfo()) s.ProtoEqual(chasmEntry.GetInfo(), v1Entry.GetInfo()) + + // Count with CHASM handler and verify it matches V1 count. + chasmCountResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + s.NoError(err) + s.Equal(v1CountResp.Count, chasmCountResp.Count, "CHASM and V1 counts should match") } // TestRefresh applies to V1 scheduler only; V2 does not support/need manual refresh. From 8a77a8f150a0a9bfb6a76d591aeeb4aeeb1a4837 Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Wed, 28 Jan 2026 15:16:44 -0800 Subject: [PATCH 3/5] bump api-go --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a660043654..8115f77262 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388 + go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 9a46dfd672..f77d1fc2ee 100644 --- a/go.sum +++ b/go.sum @@ -371,8 +371,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388 h1:Rahqpgjqalbv28RLoOtnNNZvwtnes/sQP0+cisO70Hw= -go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed h1:g3CgsK5BXL2rQy0ZIJVRpNUDdtPM1y4bGv5ZoKsqR74= +go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= From 8c2fdbda49a8fb875d888b69ee999405d0b784f8 Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Wed, 28 Jan 2026 15:47:21 -0800 Subject: [PATCH 4/5] Add CountSchedules to API metadata and test fixtures The CountSchedules API was added to the workflowservice but was missing from the API metadata map and related test fixtures, causing unit test failures. --- common/api/metadata.go | 1 + common/rpc/interceptor/redirection_test.go | 1 + service/frontend/configs/quotas_test.go | 1 + 3 files changed, 3 insertions(+) diff --git a/common/api/metadata.go b/common/api/metadata.go index d730c8c609..669f558b29 100644 --- a/common/api/metadata.go +++ b/common/api/metadata.go @@ -129,6 +129,7 @@ var ( "ListScheduleMatchingTimes": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "DeleteSchedule": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "ListSchedules": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, + "CountSchedules": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "UpdateWorkerBuildIdCompatibility": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "GetWorkerBuildIdCompatibility": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "UpdateWorkerVersioningRules": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, diff --git a/common/rpc/interceptor/redirection_test.go b/common/rpc/interceptor/redirection_test.go index 86b745305d..7a65e3ad10 100644 --- a/common/rpc/interceptor/redirection_test.go +++ b/common/rpc/interceptor/redirection_test.go @@ -151,6 +151,7 @@ func (s *redirectionInterceptorSuite) TestGlobalAPI() { "PatchSchedule": {}, "DeleteSchedule": {}, "ListSchedules": {}, + "CountSchedules": {}, "ListScheduleMatchingTimes": {}, "UpdateWorkerBuildIdCompatibility": {}, "GetWorkerBuildIdCompatibility": {}, diff --git a/service/frontend/configs/quotas_test.go b/service/frontend/configs/quotas_test.go index 4d64ab6b53..0565aee95c 100644 --- a/service/frontend/configs/quotas_test.go +++ b/service/frontend/configs/quotas_test.go @@ -97,6 +97,7 @@ func (s *quotasSuite) TestVisibilityAPIs() { "/temporal.api.workflowservice.v1.WorkflowService/GetWorkerTaskReachability": {}, "/temporal.api.workflowservice.v1.WorkflowService/ListSchedules": {}, + "/temporal.api.workflowservice.v1.WorkflowService/CountSchedules": {}, "/temporal.api.workflowservice.v1.WorkflowService/ListBatchOperations": {}, "/temporal.api.workflowservice.v1.WorkflowService/DescribeTaskQueueWithReachability": {}, "/temporal.api.workflowservice.v1.WorkflowService/ListDeployments": {}, From 97ac678d4a97fdc37c3f4c375f2087c61eda3936 Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Wed, 28 Jan 2026 16:18:25 -0800 Subject: [PATCH 5/5] Wrap CountSchedules test assertions with Eventually Add Eventually helpers around query-filtered CountSchedules assertions to handle potential visibility indexing latency on Cassandra+ES backends. --- tests/schedule_test.go | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/tests/schedule_test.go b/tests/schedule_test.go index e9ffd7d9b9..941ef7d746 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -1484,25 +1484,28 @@ func (s *scheduleFunctionalSuiteBase) TestCountSchedules() { }, 15*time.Second, 1*time.Second) // Test basic count (all schedules) - countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ - Namespace: s.Namespace().String(), - }) - s.NoError(err) - s.GreaterOrEqual(countResp.Count, int64(3), "Expected at least 3 schedules") + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + }) + return err == nil && countResp.Count >= 3 + }, 15*time.Second, 1*time.Second, "Expected at least 3 schedules") // Test count with query filter for paused schedules - countResp, err = s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ - Namespace: s.Namespace().String(), - Query: fmt.Sprintf("%s = true", sadefs.TemporalSchedulePaused), - }) - s.NoError(err) - s.GreaterOrEqual(countResp.Count, int64(1), "Expected at least 1 paused schedule") + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + Query: fmt.Sprintf("%s = true", sadefs.TemporalSchedulePaused), + }) + return err == nil && countResp.Count >= 1 + }, 15*time.Second, 1*time.Second, "Expected at least 1 paused schedule") // Test count with query filter for non-paused schedules - countResp, err = s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ - Namespace: s.Namespace().String(), - Query: fmt.Sprintf("%s = false", sadefs.TemporalSchedulePaused), - }) - s.NoError(err) - s.GreaterOrEqual(countResp.Count, int64(2), "Expected at least 2 non-paused schedules") + s.Eventually(func() bool { + countResp, err := s.FrontendClient().CountSchedules(s.newContext(), &workflowservice.CountSchedulesRequest{ + Namespace: s.Namespace().String(), + Query: fmt.Sprintf("%s = false", sadefs.TemporalSchedulePaused), + }) + return err == nil && countResp.Count >= 2 + }, 15*time.Second, 1*time.Second, "Expected at least 2 non-paused schedules") }