From 5e3852722f5e09c0b4ae0b2a71317ce33eb42859 Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Wed, 11 Jun 2025 19:35:53 -0700 Subject: [PATCH 01/10] Reset subcommand to update version --- temporalcli/commands.gen.go | 58 ++++++++++--- temporalcli/commands.workflow_reset.go | 17 +++- temporalcli/commands.workflow_reset_post.go | 86 +++++++++++++++++++ .../commands.workflow_reset_update_options.go | 68 +++++++++++++++ temporalcli/commandsgen/commands.yml | 23 +++++ 5 files changed, 236 insertions(+), 16 deletions(-) create mode 100644 temporalcli/commands.workflow_reset_post.go create mode 100644 temporalcli/commands.workflow_reset_update_options.go diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 921425e58..9978f4217 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -314,6 +314,18 @@ func (v *QueryModifiersOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSe f.Var(&v.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.") } +type WorkflowUpdateOptionsOptions struct { + VersioningOverrideBehavior StringEnum + VersioningOverridePinnedVersion string +} + +func (v *WorkflowUpdateOptionsOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { + v.VersioningOverrideBehavior = NewStringEnum([]string{"unspecified", "pinned", "auto_upgrade"}, "") + f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: unspecified, pinned, auto_upgrade. Required.") + _ = cobra.MarkFlagRequired(f, "versioning-override-behavior") + f.StringVar(&v.VersioningOverridePinnedVersion, "versioning-override-pinned-version", "", "Override Pinned Version for a Worker Deployment (Only for pinned).") +} + type TemporalCommand struct { Command cobra.Command Env string @@ -3395,8 +3407,7 @@ type TemporalWorkflowResetCommand struct { func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetCommand { var s TemporalWorkflowResetCommand s.Parent = parent - s.Command.DisableFlagsInUseLine = true - s.Command.Use = "reset [flags]" + s.Command.Use = "reset" s.Command.Short = "Move Workflow Execution history point" if hasHighlighting { s.Command.Long = "Reset a Workflow Execution so it can resume from a point in its Event History\nwithout losing its progress up to that point:\n\n\x1b[1mtemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --event-id YourLastEvent\x1b[0m\n\nStart from where the Workflow Execution last continued as new:\n\n\x1b[1mtemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --type LastContinuedAsNew\x1b[0m\n\nFor batch resets, limit your resets to FirstWorkflowTask, LastWorkflowTask, or\nBuildId. Do not use Workflow IDs, run IDs, or event IDs with this command.\n\nVisit https://docs.temporal.io/visibility to read more about Search\nAttributes and Query creation." @@ -3404,21 +3415,40 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf s.Command.Long = "Reset a Workflow Execution so it can resume from a point in its Event History\nwithout losing its progress up to that point:\n\n```\ntemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --event-id YourLastEvent\n```\n\nStart from where the Workflow Execution last continued as new:\n\n```\ntemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --type LastContinuedAsNew\n```\n\nFor batch resets, limit your resets to FirstWorkflowTask, LastWorkflowTask, or\nBuildId. Do not use Workflow IDs, run IDs, or event IDs with this command.\n\nVisit https://docs.temporal.io/visibility to read more about Search\nAttributes and Query creation." } s.Command.Args = cobra.NoArgs - s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for non-batch reset operations.") - s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID.") - s.Command.Flags().IntVarP(&s.EventId, "event-id", "e", 0, "Event ID to reset to. Event must occur after `WorkflowTaskStarted`. `WorkflowTaskCompleted`, `WorkflowTaskFailed`, etc. are valid.") - s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for reset. Required.") - _ = cobra.MarkFlagRequired(s.Command.Flags(), "reason") + s.Command.AddCommand(&NewTemporalWorkflowResetWithWorkflowUpdateOptionsCommand(cctx, &s).Command) + s.Command.PersistentFlags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for non-batch reset operations.") + s.Command.PersistentFlags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID.") + s.Command.PersistentFlags().IntVarP(&s.EventId, "event-id", "e", 0, "Event ID to reset to. Event must occur after `WorkflowTaskStarted`. `WorkflowTaskCompleted`, `WorkflowTaskFailed`, etc. are valid.") + s.Command.PersistentFlags().StringVar(&s.Reason, "reason", "", "Reason for reset. Required.") + _ = cobra.MarkFlagRequired(s.Command.PersistentFlags(), "reason") s.ReapplyType = NewStringEnum([]string{"All", "Signal", "None"}, "All") - s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "Types of events to re-apply after reset point. Accepted values: All, Signal, None.") - _ = s.Command.Flags().MarkDeprecated("reapply-type", "Use --reapply-exclude instead.") + s.Command.PersistentFlags().Var(&s.ReapplyType, "reapply-type", "Types of events to re-apply after reset point. Accepted values: All, Signal, None.") + _ = s.Command.PersistentFlags().MarkDeprecated("reapply-type", "Use --reapply-exclude instead.") s.ReapplyExclude = NewStringEnumArray([]string{"All", "Signal", "Update"}, []string{}) - s.Command.Flags().Var(&s.ReapplyExclude, "reapply-exclude", "Exclude these event types from re-application. Accepted values: All, Signal, Update.") + s.Command.PersistentFlags().Var(&s.ReapplyExclude, "reapply-exclude", "Exclude these event types from re-application. Accepted values: All, Signal, Update.") s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew", "BuildId"}, "") - s.Command.Flags().VarP(&s.Type, "type", "t", "The event type for the reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.") - s.Command.Flags().StringVar(&s.BuildId, "build-id", "", "A Build ID. Use only with the BuildId `--type`. Resets the first Workflow task processed by this ID. By default, this reset may be in a prior run, earlier than a Continue as New point.") - s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") - s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm. Only allowed when `--query` is present.") + s.Command.PersistentFlags().VarP(&s.Type, "type", "t", "The event type for the reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.") + s.Command.PersistentFlags().StringVar(&s.BuildId, "build-id", "", "A Build ID. Use only with the BuildId `--type`. Resets the first Workflow task processed by this ID. By default, this reset may be in a prior run, earlier than a Continue as New point.") + s.Command.PersistentFlags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") + s.Command.PersistentFlags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm. Only allowed when `--query` is present.") + return &s +} + +type TemporalWorkflowResetWithWorkflowUpdateOptionsCommand struct { + Parent *TemporalWorkflowResetCommand + Command cobra.Command + WorkflowUpdateOptionsOptions +} + +func NewTemporalWorkflowResetWithWorkflowUpdateOptionsCommand(cctx *CommandContext, parent *TemporalWorkflowResetCommand) *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand { + var s TemporalWorkflowResetWithWorkflowUpdateOptionsCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "with-workflow-update-options [flags]" + s.Command.Short = "Update options on reset workflow" + s.Command.Long = "Run Workflow Update Options atomically after the Workflow is reset.\nWorkflows selected by the reset command are forwarded onto the subcommand." + s.Command.Args = cobra.NoArgs + s.WorkflowUpdateOptionsOptions.buildFlags(cctx, s.Command.Flags()) s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index a4284591b..10ed3738c 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -10,6 +10,7 @@ import ( "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" + workflow "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" @@ -17,6 +18,7 @@ import ( ) func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) error { + cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.run()") validateArguments, doReset := c.getResetOperations() if err := validateArguments(); err != nil { return err @@ -66,7 +68,12 @@ func (c *TemporalWorkflowResetCommand) validateBatchResetArguments() error { return nil } func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl client.Client) error { + cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.doWorkflowReset()") + return c.doWorkflowResetWithPostOps(cctx, cl, nil) +} +func (c *TemporalWorkflowResetCommand) doWorkflowResetWithPostOps(cctx *CommandContext, cl client.Client, postOps []*workflow.PostResetOperation) error { + cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.doWorkflowResetWithPostOps(). postOps: %v", postOps) var err error resetBaseRunID := c.RunId eventID := int64(c.EventId) @@ -94,6 +101,7 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl WorkflowTaskFinishEventId: eventID, ResetReapplyType: reapplyType, ResetReapplyExcludeTypes: reapplyExcludes, + PostResetOperations: postOps, }) if err != nil { return fmt.Errorf("failed to reset workflow: %w", err) @@ -108,6 +116,10 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl } func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl client.Client) error { + return c.runBatchResetWithPostOps(cctx, cl, nil) +} + +func (c *TemporalWorkflowResetCommand) runBatchResetWithPostOps(cctx *CommandContext, cl client.Client, postOps []*workflow.PostResetOperation) error { request := workflowservice.StartBatchOperationRequest{ Namespace: c.Parent.Namespace, JobId: uuid.NewString(), @@ -121,8 +133,9 @@ func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl cl } request.Operation = &workflowservice.StartBatchOperationRequest_ResetOperation{ ResetOperation: &batch.BatchOperationReset{ - Identity: clientIdentity(), - Options: batchResetOptions, + Identity: clientIdentity(), + Options: batchResetOptions, + PostResetOperations: postOps, }, } count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query}) diff --git a/temporalcli/commands.workflow_reset_post.go b/temporalcli/commands.workflow_reset_post.go new file mode 100644 index 000000000..8e93ff141 --- /dev/null +++ b/temporalcli/commands.workflow_reset_post.go @@ -0,0 +1,86 @@ +package temporalcli + +// import ( +// "fmt" + +// workflowpb "go.temporal.io/api/workflow/v1" +// "google.golang.org/protobuf/types/known/fieldmaskpb" +// ) + +// func (c *TemporalWorkflowResetWithWorkflowSignalCommand) run(cctx *CommandContext, _ []string) error { +// validate, _ := c.Parent.getResetOperations() +// if err := validate(); err != nil { +// return err +// } +// cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) +// if err != nil { +// return err +// } +// defer cl.Close() +// input, err := c.PayloadInputOptions.buildRawInputPayloads() +// if err != nil { +// return err +// } +// op := &workflowpb.PostResetOperation{ +// Variant: &workflowpb.PostResetOperation_SignalWorkflow_{ +// SignalWorkflow: &workflowpb.PostResetOperation_SignalWorkflow{ +// SignalName: c.Name, +// Input: input, +// }, +// }, +// } +// if c.Parent.WorkflowId != "" { +// return c.Parent.doWorkflowResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) +// } +// return c.Parent.runBatchResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) +// } + +// func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *CommandContext, _ []string) error { +// if c.VersioningOverrideBehavior.Value == "pinned" && c.VersioningOverridePinnedVersion == "" { +// return fmt.Errorf("missing version with 'pinned' behavior") +// } +// if (c.VersioningOverrideBehavior.Value == "unspecified" || c.VersioningOverrideBehavior.Value == "auto_upgrade") && c.VersioningOverridePinnedVersion != "" { +// return fmt.Errorf("cannot set pinned version with %v behavior", c.VersioningOverrideBehavior) +// } +// validate, _ := c.Parent.getResetOperations() +// if err := validate(); err != nil { +// return err +// } +// cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) +// if err != nil { +// return err +// } +// defer cl.Close() + +// behavior := workflowpb.VersioningBehavior_VERSIONING_BEHAVIOR_UNSPECIFIED +// switch c.VersioningOverrideBehavior.Value { +// case "unspecified": +// case "pinned": +// behavior = workflowpb.VersioningBehavior_VERSIONING_BEHAVIOR_PINNED +// case "auto_upgrade": +// behavior = workflowpb.VersioningBehavior_VERSIONING_BEHAVIOR_AUTO_UPGRADE +// default: +// return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) +// } +// mask, err := fieldmaskpb.New(&workflowpb.WorkflowExecutionOptions{}, "versioning_override") +// if err != nil { +// return fmt.Errorf("invalid field mask: %w", err) +// } +// op := &workflowpb.PostResetOperation{ +// Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ +// UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ +// WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ +// VersioningOverride: &workflowpb.VersioningOverride{ +// Behavior: behavior, +// PinnedVersion: c.VersioningOverridePinnedVersion, +// }, +// }, +// UpdateMask: mask, +// }, +// }, +// } +// if c.Parent.WorkflowId != "" { +// return c.Parent.doWorkflowResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) +// } +// return c.Parent.runBatchResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) +// } diff --git a/temporalcli/commands.workflow_reset_update_options.go b/temporalcli/commands.workflow_reset_update_options.go new file mode 100644 index 000000000..219ec032e --- /dev/null +++ b/temporalcli/commands.workflow_reset_update_options.go @@ -0,0 +1,68 @@ +package temporalcli + +import ( + "fmt" + + "go.temporal.io/api/enums/v1" + workflowpb "go.temporal.io/api/workflow/v1" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []string) error { + cctx.Printer.Printlnf("Inside TemporalWorkflowResetWithWorkflowUpdateOptionsCommand.run()\n") + validate, _ := c.Parent.getResetOperations() + if err := validate(); err != nil { + return err + } + + if c.VersioningOverrideBehavior.Value == "unspecified" || c.VersioningOverrideBehavior.Value == "auto_upgrade" { + if c.VersioningOverridePinnedVersion != "" { + return fmt.Errorf("cannot set pinned version with %v behavior", c.VersioningOverrideBehavior) + } + } + if c.VersioningOverrideBehavior.Value == "pinned" && c.VersioningOverridePinnedVersion == "" { + return fmt.Errorf("missing version with 'pinned' behavior") + } + + cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + behavior := enums.VERSIONING_BEHAVIOR_UNSPECIFIED + switch c.VersioningOverrideBehavior.Value { + case "unspecified": + case "pinned": + behavior = enums.VERSIONING_BEHAVIOR_PINNED + case "auto_upgrade": + behavior = enums.VERSIONING_BEHAVIOR_AUTO_UPGRADE + default: + return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) + } + + var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions + protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") + if err != nil { + return fmt.Errorf("invalid field mask: %w", err) + } + + postOp := &workflowpb.PostResetOperation{ + Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ + UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: behavior, + PinnedVersion: c.VersioningOverridePinnedVersion, + }, + }, + UpdateMask: protoMask, + }, + }, + } + + if c.Parent.WorkflowId != "" { + return c.Parent.doWorkflowResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) + } + return c.Parent.runBatchResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) +} diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 222c51546..c3cfa3e7d 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3506,6 +3506,14 @@ commands: Don't prompt to confirm. Only allowed when `--query` is present. + - name: temporal workflow reset with-workflow-update-options + summary: Update options on reset workflow + description: | + Run Workflow Update Options atomically after the Workflow is reset. + Workflows selected by the reset command are forwarded onto the subcommand. + option-sets: + - workflow-update-options + - name: temporal workflow result summary: Wait for and show the result of a Workflow Execution description: | @@ -4497,3 +4505,18 @@ option-sets: enum-values: - not_open - not_completed_cleanly + + - name: workflow-update-options + options: + - name: versioning-override-behavior + type: string-enum + description: | + Override the versioning behavior of a Workflow. + required: true + enum-values: + - unspecified + - pinned + - auto_upgrade + - name: versioning-override-pinned-version + type: string + description: Override Pinned Version for a Worker Deployment (Only for pinned). From 770ae2ffa8d32c4e2a2ad99ab05fc0e6c3bd16b4 Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Wed, 11 Jun 2025 19:58:16 -0700 Subject: [PATCH 02/10] Add tests - Add validation tests for argument checking - Add functional tests for all versioning behaviors - Add integration tests for flag inheritance and different reset types --- temporalcli/commands.workflow_reset.go | 3 - .../commands.workflow_reset_update_options.go | 44 +- ...ands.workflow_reset_update_options_test.go | 417 ++++++++++++++++++ 3 files changed, 444 insertions(+), 20 deletions(-) create mode 100644 temporalcli/commands.workflow_reset_update_options_test.go diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index 10ed3738c..47112a2c0 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -18,7 +18,6 @@ import ( ) func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) error { - cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.run()") validateArguments, doReset := c.getResetOperations() if err := validateArguments(); err != nil { return err @@ -68,12 +67,10 @@ func (c *TemporalWorkflowResetCommand) validateBatchResetArguments() error { return nil } func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl client.Client) error { - cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.doWorkflowReset()") return c.doWorkflowResetWithPostOps(cctx, cl, nil) } func (c *TemporalWorkflowResetCommand) doWorkflowResetWithPostOps(cctx *CommandContext, cl client.Client, postOps []*workflow.PostResetOperation) error { - cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.doWorkflowResetWithPostOps(). postOps: %v", postOps) var err error resetBaseRunID := c.RunId eventID := int64(c.EventId) diff --git a/temporalcli/commands.workflow_reset_update_options.go b/temporalcli/commands.workflow_reset_update_options.go index 219ec032e..d6f7e07d7 100644 --- a/temporalcli/commands.workflow_reset_update_options.go +++ b/temporalcli/commands.workflow_reset_update_options.go @@ -9,7 +9,6 @@ import ( ) func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []string) error { - cctx.Printer.Printlnf("Inside TemporalWorkflowResetWithWorkflowUpdateOptionsCommand.run()\n") validate, _ := c.Parent.getResetOperations() if err := validate(); err != nil { return err @@ -33,6 +32,8 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman behavior := enums.VERSIONING_BEHAVIOR_UNSPECIFIED switch c.VersioningOverrideBehavior.Value { case "unspecified": + // Leave as UNSPECIFIED, but the server may require an explicit behavior + behavior = enums.VERSIONING_BEHAVIOR_UNSPECIFIED case "pinned": behavior = enums.VERSIONING_BEHAVIOR_PINNED case "auto_upgrade": @@ -41,28 +42,37 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) } - var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions - protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") - if err != nil { - return fmt.Errorf("invalid field mask: %w", err) - } + // Only create the post-reset operation if we have a non-unspecified behavior or a pinned version + if behavior != enums.VERSIONING_BEHAVIOR_UNSPECIFIED || c.VersioningOverridePinnedVersion != "" { + var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions + protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") + if err != nil { + return fmt.Errorf("invalid field mask: %w", err) + } - postOp := &workflowpb.PostResetOperation{ - Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ - UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ - WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ - VersioningOverride: &workflowpb.VersioningOverride{ - Behavior: behavior, - PinnedVersion: c.VersioningOverridePinnedVersion, + postOp := &workflowpb.PostResetOperation{ + Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ + UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: behavior, + PinnedVersion: c.VersioningOverridePinnedVersion, + }, }, + UpdateMask: protoMask, }, - UpdateMask: protoMask, }, - }, + } + + if c.Parent.WorkflowId != "" { + return c.Parent.doWorkflowResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) + } + return c.Parent.runBatchResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) } + // If unspecified with no version, just do a regular reset without post-ops if c.Parent.WorkflowId != "" { - return c.Parent.doWorkflowResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) + return c.Parent.doWorkflowResetWithPostOps(cctx, cl, nil) } - return c.Parent.runBatchResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) + return c.Parent.runBatchResetWithPostOps(cctx, cl, nil) } diff --git a/temporalcli/commands.workflow_reset_update_options_test.go b/temporalcli/commands.workflow_reset_update_options_test.go new file mode 100644 index 000000000..b967e2f24 --- /dev/null +++ b/temporalcli/commands.workflow_reset_update_options_test.go @@ -0,0 +1,417 @@ +package temporalcli_test + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" +) + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_ValidatesArguments_MissingRequiredFlag() { + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-t", "FirstWorkflowTask", + "--reason", "test-reset", + ) + require.Error(s.T(), res.Err) + require.Contains(s.T(), res.Err.Error(), "required flag") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_ValidatesArguments_PinnedWithoutVersion() { + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-t", "FirstWorkflowTask", + "--reason", "test-reset", + "--versioning-override-behavior", "pinned", + ) + require.Error(s.T(), res.Err) + require.Contains(s.T(), res.Err.Error(), "missing version with 'pinned' behavior") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_ValidatesArguments_AutoUpgradeWithVersion() { + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-t", "FirstWorkflowTask", + "--reason", "test-reset", + "--versioning-override-behavior", "auto_upgrade", + "--versioning-override-pinned-version", "some-version", + ) + require.Error(s.T(), res.Err) + require.Contains(s.T(), res.Err.Error(), "cannot set pinned version with") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_AutoUpgradeBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset with auto upgrade versioning behavior + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-t", "FirstWorkflowTask", + "--reason", "test-reset-with-auto-upgrade", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 3*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should have re-executed the workflow") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_UnspecifiedBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset with unspecified versioning behavior (should work by doing regular reset) + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-t", "FirstWorkflowTask", + "--reason", "test-reset-with-unspecified", + "--versioning-override-behavior", "unspecified", + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 3*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should have re-executed the workflow") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_PinnedBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset with pinned versioning behavior and properly formatted version + pinnedVersion := "test-deployment.v1.0" + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-t", "FirstWorkflowTask", + "--reason", "test-reset-with-pinned-version", + "--versioning-override-behavior", "pinned", + "--versioning-override-pinned-version", pinnedVersion, + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 5*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should have re-executed the workflow") +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_AutoUpgradeBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset batch with auto_upgrade versioning behavior + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr), + "-t", "FirstWorkflowTask", + "--reason", "test-batch-reset-with-update-options", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for batch reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 3*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should have re-executed the workflow from batch reset") +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_PinnedBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset batch with pinned versioning behavior and properly formatted version + pinnedVersion := "batch-deployment.v1.0" + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr), + "-t", "FirstWorkflowTask", + "--reason", "test-batch-reset-with-pinned-version", + "--versioning-override-behavior", "pinned", + "--versioning-override-pinned-version", pinnedVersion, + ) + require.NoError(s.T(), res.Err) + + // Wait for batch reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 5*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should have re-executed the workflow from batch reset") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_InheritsParentFlags() { + // Test that the subcommand inherits parent flags correctly + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-r", "test-run-id", + "-e", "10", + "--reason", "test-reset-with-inherited-flags", + "--versioning-override-behavior", "auto_upgrade", + "--reapply-exclude", "Signal", + ) + + // The command should fail because the workflow doesn't exist, but the error + // should be about the missing workflow, not about invalid flags + require.Error(s.T(), res.Err) + require.NotContains(s.T(), res.Err.Error(), "required flag") + require.NotContains(s.T(), res.Err.Error(), "invalid argument") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_WithLastWorkflowTask() { + var wfExecutions, activityExecutions int + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + activityExecutions++ + return nil, nil + }) + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, nil) + wfExecutions++ + return nil, nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + var junk any + s.NoError(run.Get(s.Context, &junk)) + s.Equal(1, wfExecutions) + + // Reset to the last workflow task with update options + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-t", "LastWorkflowTask", + "--reason", "test-reset-last-workflow-task-with-options", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 3*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should re-executed the workflow") + s.Equal(1, activityExecutions, "Should not have re-executed the activity") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_WithEventID() { + // Test that the new subcommand works with event ID reset type + var activityCount int + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + activityCount++ + return a, nil + }) + + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + var res any + if err := workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, &res); err != nil { + return res, err + } + err := workflow.ExecuteActivity(ctx, DevActivity, 2).Get(ctx, &res) + return res, err + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + require.NoError(s.T(), err) + var ignored any + s.NoError(run.Get(s.Context, &ignored)) + s.Equal(2, activityCount) + + // Reset with event ID and update options + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-e", "3", // Use a known early event ID + "--reason", "test-reset-event-id-with-options", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 5*time.Second, 100*time.Millisecond) + + s.Greater(activityCount, 2, "Should have re-executed activities after reset") +} From 5096e5868b7f72d5927084f4325790360a186c8b Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Thu, 12 Jun 2025 12:34:21 -0700 Subject: [PATCH 03/10] Fix test --- ...commands.workflow_reset_update_options_test.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/temporalcli/commands.workflow_reset_update_options_test.go b/temporalcli/commands.workflow_reset_update_options_test.go index b967e2f24..e66db050c 100644 --- a/temporalcli/commands.workflow_reset_update_options_test.go +++ b/temporalcli/commands.workflow_reset_update_options_test.go @@ -186,10 +186,19 @@ func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_P Query: "CustomKeywordField = '" + searchAttr + "'", }) s.NoError(err) - return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED - }, 5*time.Second, 100*time.Millisecond) + if len(resp.Executions) < 2 { // there should be two executions. + return false + } + resetRunID := resp.Executions[0].Execution.RunId // the first result is the reset execution. + descResult, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), resetRunID) + s.NoError(err) + s.NotNil(descResult) - s.Equal(2, wfExecutions, "Should have re-executed the workflow") + info := descResult.GetWorkflowExecutionInfo() + pinnedVersionOverride := info.VersioningInfo.VersioningOverride.GetPinned().GetVersion() + pinnedVersionOverrideString := pinnedVersionOverride.GetDeploymentName() + "." + pinnedVersionOverride.GetBuildId() + return pinnedVersionOverrideString == pinnedVersion // the second execution should have the pinned version override. + }, 5*time.Second, 100*time.Millisecond) } func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_AutoUpgradeBehavior() { From 3c4f4bcb88a5d58931dc3d31e1cdf56baed04871 Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Thu, 12 Jun 2025 12:53:16 -0700 Subject: [PATCH 04/10] Cleanup --- temporalcli/commands.workflow_reset.go | 1 + temporalcli/commands.workflow_reset_post.go | 86 ------------------- .../commands.workflow_reset_update_options.go | 14 ++- 3 files changed, 6 insertions(+), 95 deletions(-) delete mode 100644 temporalcli/commands.workflow_reset_post.go diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index 47112a2c0..3c751f214 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -18,6 +18,7 @@ import ( ) func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) error { + cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.run()") validateArguments, doReset := c.getResetOperations() if err := validateArguments(); err != nil { return err diff --git a/temporalcli/commands.workflow_reset_post.go b/temporalcli/commands.workflow_reset_post.go deleted file mode 100644 index 8e93ff141..000000000 --- a/temporalcli/commands.workflow_reset_post.go +++ /dev/null @@ -1,86 +0,0 @@ -package temporalcli - -// import ( -// "fmt" - -// workflowpb "go.temporal.io/api/workflow/v1" -// "google.golang.org/protobuf/types/known/fieldmaskpb" -// ) - -// func (c *TemporalWorkflowResetWithWorkflowSignalCommand) run(cctx *CommandContext, _ []string) error { -// validate, _ := c.Parent.getResetOperations() -// if err := validate(); err != nil { -// return err -// } -// cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) -// if err != nil { -// return err -// } -// defer cl.Close() -// input, err := c.PayloadInputOptions.buildRawInputPayloads() -// if err != nil { -// return err -// } -// op := &workflowpb.PostResetOperation{ -// Variant: &workflowpb.PostResetOperation_SignalWorkflow_{ -// SignalWorkflow: &workflowpb.PostResetOperation_SignalWorkflow{ -// SignalName: c.Name, -// Input: input, -// }, -// }, -// } -// if c.Parent.WorkflowId != "" { -// return c.Parent.doWorkflowResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) -// } -// return c.Parent.runBatchResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) -// } - -// func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *CommandContext, _ []string) error { -// if c.VersioningOverrideBehavior.Value == "pinned" && c.VersioningOverridePinnedVersion == "" { -// return fmt.Errorf("missing version with 'pinned' behavior") -// } -// if (c.VersioningOverrideBehavior.Value == "unspecified" || c.VersioningOverrideBehavior.Value == "auto_upgrade") && c.VersioningOverridePinnedVersion != "" { -// return fmt.Errorf("cannot set pinned version with %v behavior", c.VersioningOverrideBehavior) -// } -// validate, _ := c.Parent.getResetOperations() -// if err := validate(); err != nil { -// return err -// } -// cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) -// if err != nil { -// return err -// } -// defer cl.Close() - -// behavior := workflowpb.VersioningBehavior_VERSIONING_BEHAVIOR_UNSPECIFIED -// switch c.VersioningOverrideBehavior.Value { -// case "unspecified": -// case "pinned": -// behavior = workflowpb.VersioningBehavior_VERSIONING_BEHAVIOR_PINNED -// case "auto_upgrade": -// behavior = workflowpb.VersioningBehavior_VERSIONING_BEHAVIOR_AUTO_UPGRADE -// default: -// return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) -// } -// mask, err := fieldmaskpb.New(&workflowpb.WorkflowExecutionOptions{}, "versioning_override") -// if err != nil { -// return fmt.Errorf("invalid field mask: %w", err) -// } -// op := &workflowpb.PostResetOperation{ -// Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ -// UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ -// WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ -// VersioningOverride: &workflowpb.VersioningOverride{ -// Behavior: behavior, -// PinnedVersion: c.VersioningOverridePinnedVersion, -// }, -// }, -// UpdateMask: mask, -// }, -// }, -// } -// if c.Parent.WorkflowId != "" { -// return c.Parent.doWorkflowResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) -// } -// return c.Parent.runBatchResetWithOperations(cctx, cl, []*workflowpb.PostResetOperation{op}) -// } diff --git a/temporalcli/commands.workflow_reset_update_options.go b/temporalcli/commands.workflow_reset_update_options.go index d6f7e07d7..033b337a4 100644 --- a/temporalcli/commands.workflow_reset_update_options.go +++ b/temporalcli/commands.workflow_reset_update_options.go @@ -9,19 +9,18 @@ import ( ) func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []string) error { + cctx.Printer.Printlnf("Inside TemporalWorkflowResetWithWorkflowUpdateOptionsCommand.run()") validate, _ := c.Parent.getResetOperations() if err := validate(); err != nil { return err } - if c.VersioningOverrideBehavior.Value == "unspecified" || c.VersioningOverrideBehavior.Value == "auto_upgrade" { - if c.VersioningOverridePinnedVersion != "" { - return fmt.Errorf("cannot set pinned version with %v behavior", c.VersioningOverrideBehavior) - } - } if c.VersioningOverrideBehavior.Value == "pinned" && c.VersioningOverridePinnedVersion == "" { return fmt.Errorf("missing version with 'pinned' behavior") } + if c.VersioningOverrideBehavior.Value != "pinned" && c.VersioningOverridePinnedVersion != "" { + return fmt.Errorf("cannot set pinned version with %v behavior", c.VersioningOverrideBehavior) + } cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) if err != nil { @@ -29,11 +28,8 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman } defer cl.Close() - behavior := enums.VERSIONING_BEHAVIOR_UNSPECIFIED + var behavior enums.VersioningBehavior switch c.VersioningOverrideBehavior.Value { - case "unspecified": - // Leave as UNSPECIFIED, but the server may require an explicit behavior - behavior = enums.VERSIONING_BEHAVIOR_UNSPECIFIED case "pinned": behavior = enums.VERSIONING_BEHAVIOR_PINNED case "auto_upgrade": From 02dd916b392798ebddef414c71d445522f65ea8a Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Fri, 13 Jun 2025 11:55:28 -0700 Subject: [PATCH 05/10] Allow optional subcommands --- temporalcli/commands.gen.go | 8 ++++---- temporalcli/commandsgen/code.go | 4 ++-- temporalcli/commandsgen/commands.yml | 12 ++++++++---- temporalcli/commandsgen/parse.go | 1 + 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 9978f4217..4c48d0fbb 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -774,9 +774,9 @@ func NewTemporalConfigCommand(cctx *CommandContext, parent *TemporalCommand) *Te s.Command.Use = "config" s.Command.Short = "Manage config files (EXPERIMENTAL)" if hasHighlighting { - s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal. \n\n\x1b[1mtemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\x1b[0m\n\nThe default config file path is \x1b[1m$CONFIG_PATH/temporalio/temporal.toml\x1b[0m where\n\x1b[1m$CONFIG_PATH\x1b[0m is defined as \x1b[1m$HOME/.config\x1b[0m on Unix,\n\x1b[1m$HOME/Library/Application Support\x1b[0m on macOS, and \x1b[1m%AppData%\x1b[0m on Windows.\nThis can be overridden with the \x1b[1mTEMPORAL_CONFIG_FILE\x1b[0m environment\nvariable or \x1b[1m--config-file\x1b[0m.\n\nThe default profile is \x1b[1mdefault\x1b[0m. This can be overridden with the\n\x1b[1mTEMPORAL_PROFILE\x1b[0m environment variable or \x1b[1m--profile\x1b[0m." + s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal.\n\n\x1b[1mtemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\x1b[0m\n\nThe default config file path is \x1b[1m$CONFIG_PATH/temporalio/temporal.toml\x1b[0m where\n\x1b[1m$CONFIG_PATH\x1b[0m is defined as \x1b[1m$HOME/.config\x1b[0m on Unix,\n\x1b[1m$HOME/Library/Application Support\x1b[0m on macOS, and \x1b[1m%AppData%\x1b[0m on Windows.\nThis can be overridden with the \x1b[1mTEMPORAL_CONFIG_FILE\x1b[0m environment\nvariable or \x1b[1m--config-file\x1b[0m.\n\nThe default profile is \x1b[1mdefault\x1b[0m. This can be overridden with the\n\x1b[1mTEMPORAL_PROFILE\x1b[0m environment variable or \x1b[1m--profile\x1b[0m." } else { - s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal. \n\n```\ntemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\n```\n\nThe default config file path is `$CONFIG_PATH/temporalio/temporal.toml` where\n`$CONFIG_PATH` is defined as `$HOME/.config` on Unix,\n`$HOME/Library/Application Support` on macOS, and `%AppData%` on Windows.\nThis can be overridden with the `TEMPORAL_CONFIG_FILE` environment\nvariable or `--config-file`.\n\nThe default profile is `default`. This can be overridden with the\n`TEMPORAL_PROFILE` environment variable or `--profile`." + s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal.\n\n```\ntemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\n```\n\nThe default config file path is `$CONFIG_PATH/temporalio/temporal.toml` where\n`$CONFIG_PATH` is defined as `$HOME/.config` on Unix,\n`$HOME/Library/Application Support` on macOS, and `%AppData%` on Windows.\nThis can be overridden with the `TEMPORAL_CONFIG_FILE` environment\nvariable or `--config-file`.\n\nThe default profile is `default`. This can be overridden with the\n`TEMPORAL_PROFILE` environment variable or `--profile`." } s.Command.Args = cobra.NoArgs s.Command.AddCommand(&NewTemporalConfigDeleteCommand(cctx, &s).Command) @@ -3684,9 +3684,9 @@ func NewTemporalWorkflowStartUpdateWithStartCommand(cctx *CommandContext, parent s.Command.Use = "start-update-with-start [flags]" s.Command.Short = "Send an Update-With-Start and wait for it to be accepted or rejected (Experimental)" if hasHighlighting { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running,\nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" } else { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running,\nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") diff --git a/temporalcli/commandsgen/code.go b/temporalcli/commandsgen/code.go index e51296d46..e2784f90f 100644 --- a/temporalcli/commandsgen/code.go +++ b/temporalcli/commandsgen/code.go @@ -261,8 +261,8 @@ func (c *Command) writeCode(w *codeWriter) error { } w.writeLinef("}))") } - // If there are no subcommands, we need a run function - if len(subCommands) == 0 { + // If there are no subcommands, or if subcommands are optional, we need a run function + if len(subCommands) == 0 || c.SubcommandsOptional { w.writeLinef("s.Command.Run = func(c *%v.Command, args []string) {", w.importCobra()) w.writeLinef("if err := s.run(cctx, args); err != nil {") w.writeLinef("cctx.Options.Fail(err)") diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index c3cfa3e7d..697fe6cdf 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -51,7 +51,8 @@ # * When commands have a single command-level option, include it the mandatory example. # * Use square bracket overviews to present how complex commands will be used. # * Yes: temporal operator [command] [subcommand] [options] -# Commands with subcommands can't be run on their own. +# Commands with subcommands can't be run on their own unless +# subcommands-optional is set to true. # Because of this, always use full command examples. # * Use square brackets to highlight optional elements, especially when long # descriptions would suffer from two very similar command invocations. @@ -94,6 +95,7 @@ # exact-args: Require this exact number of args. (int) # maximum-args: Require this maximum number of args. (int) # ignores-missing-env: Ignore missing environment variables. (bool) +# subcommands-optional: Allow command to be run even when it has subcommands. (bool) # options: A list of options. (Option[]) # - name: The option name. (string) # type: The option type. (string) @@ -483,7 +485,7 @@ commands: --reason YourReasonForTermination ``` - + Specify the Activity ID or Type and Workflow IDs: ``` @@ -689,7 +691,7 @@ commands: summary: Manage config files (EXPERIMENTAL) description: | Config files are TOML files that contain profiles, with each profile - containing configuration for connecting to Temporal. + containing configuration for connecting to Temporal. ``` temporal config set \ @@ -3858,7 +3860,7 @@ commands: summary: Send an Update-With-Start and wait for it to be accepted or rejected (Experimental) description: | Send a message to a Workflow Execution to invoke an Update handler, and wait for - the update to be accepted or rejected. If the Workflow Execution is not running, + the update to be accepted or rejected. If the Workflow Execution is not running, then a new workflow execution is started and the update is sent. Experimental. @@ -4520,3 +4522,5 @@ option-sets: - name: versioning-override-pinned-version type: string description: Override Pinned Version for a Worker Deployment (Only for pinned). + + diff --git a/temporalcli/commandsgen/parse.go b/temporalcli/commandsgen/parse.go index a93c69482..568e38fd7 100644 --- a/temporalcli/commandsgen/parse.go +++ b/temporalcli/commandsgen/parse.go @@ -48,6 +48,7 @@ type ( ExactArgs int `yaml:"exact-args"` MaximumArgs int `yaml:"maximum-args"` IgnoreMissingEnv bool `yaml:"ignores-missing-env"` + SubcommandsOptional bool `yaml:"subcommands-optional"` Options []Option `yaml:"options"` OptionSets []string `yaml:"option-sets"` Docs Docs `yaml:"docs"` From 56d3e4e3b0cb4b8d52a02ec78f736e5ae315344e Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Fri, 13 Jun 2025 14:44:21 -0700 Subject: [PATCH 06/10] Make subcommands optional for reset --- temporalcli/commands.gen.go | 5 ++ ...ands.workflow_reset_update_options_test.go | 46 ------------------- temporalcli/commandsgen/commands.yml | 1 + 3 files changed, 6 insertions(+), 46 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 4c48d0fbb..1b56a4e76 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -3431,6 +3431,11 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf s.Command.PersistentFlags().StringVar(&s.BuildId, "build-id", "", "A Build ID. Use only with the BuildId `--type`. Resets the first Workflow task processed by this ID. By default, this reset may be in a prior run, earlier than a Continue as New point.") s.Command.PersistentFlags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") s.Command.PersistentFlags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm. Only allowed when `--query` is present.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } return &s } diff --git a/temporalcli/commands.workflow_reset_update_options_test.go b/temporalcli/commands.workflow_reset_update_options_test.go index e66db050c..f459ed503 100644 --- a/temporalcli/commands.workflow_reset_update_options_test.go +++ b/temporalcli/commands.workflow_reset_update_options_test.go @@ -98,52 +98,6 @@ func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_A s.Equal(2, wfExecutions, "Should have re-executed the workflow") } -func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_UnspecifiedBehavior() { - var wfExecutions int - s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { - wfExecutions++ - return "result", nil - }) - - // Start the workflow - searchAttr := "keyword-" + uuid.NewString() - run, err := s.Client.ExecuteWorkflow( - s.Context, - client.StartWorkflowOptions{ - TaskQueue: s.Worker().Options.TaskQueue, - SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, - }, - DevWorkflow, - "test-input", - ) - s.NoError(err) - var result any - s.NoError(run.Get(s.Context, &result)) - s.Equal(1, wfExecutions) - - // Reset with unspecified versioning behavior (should work by doing regular reset) - res := s.Execute( - "workflow", "reset", "with-workflow-update-options", - "--address", s.Address(), - "-w", run.GetID(), - "-t", "FirstWorkflowTask", - "--reason", "test-reset-with-unspecified", - "--versioning-override-behavior", "unspecified", - ) - require.NoError(s.T(), res.Err) - - // Wait for reset to complete - s.Eventually(func() bool { - resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ - Query: "CustomKeywordField = '" + searchAttr + "'", - }) - s.NoError(err) - return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED - }, 3*time.Second, 100*time.Millisecond) - - s.Equal(2, wfExecutions, "Should have re-executed the workflow") -} - func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_PinnedBehavior() { var wfExecutions int s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 697fe6cdf..fbb8da8c4 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3419,6 +3419,7 @@ commands: - name: temporal workflow reset summary: Move Workflow Execution history point + subcommands-optional: true description: | Reset a Workflow Execution so it can resume from a point in its Event History without losing its progress up to that point: From 11b7a6a90247f370d9759b13596b74df8debb0e2 Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Mon, 16 Jun 2025 17:17:39 -0700 Subject: [PATCH 07/10] Cleanup --- temporalcli/commands.gen.go | 4 +- temporalcli/commands.workflow_reset.go | 1 - .../commands.workflow_reset_update_options.go | 44 +++++++------------ temporalcli/commandsgen/commands.yml | 1 - 4 files changed, 19 insertions(+), 31 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 1b56a4e76..fd82b3f9a 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -320,8 +320,8 @@ type WorkflowUpdateOptionsOptions struct { } func (v *WorkflowUpdateOptionsOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { - v.VersioningOverrideBehavior = NewStringEnum([]string{"unspecified", "pinned", "auto_upgrade"}, "") - f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: unspecified, pinned, auto_upgrade. Required.") + v.VersioningOverrideBehavior = NewStringEnum([]string{"pinned", "auto_upgrade"}, "") + f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: pinned, auto_upgrade. Required.") _ = cobra.MarkFlagRequired(f, "versioning-override-behavior") f.StringVar(&v.VersioningOverridePinnedVersion, "versioning-override-pinned-version", "", "Override Pinned Version for a Worker Deployment (Only for pinned).") } diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index 3c751f214..47112a2c0 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -18,7 +18,6 @@ import ( ) func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) error { - cctx.Printer.Printlnf("Inside TemporalWorkflowResetCommand.run()") validateArguments, doReset := c.getResetOperations() if err := validateArguments(); err != nil { return err diff --git a/temporalcli/commands.workflow_reset_update_options.go b/temporalcli/commands.workflow_reset_update_options.go index 033b337a4..8854d2c9f 100644 --- a/temporalcli/commands.workflow_reset_update_options.go +++ b/temporalcli/commands.workflow_reset_update_options.go @@ -9,7 +9,6 @@ import ( ) func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []string) error { - cctx.Printer.Printlnf("Inside TemporalWorkflowResetWithWorkflowUpdateOptionsCommand.run()") validate, _ := c.Parent.getResetOperations() if err := validate(); err != nil { return err @@ -35,40 +34,31 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman case "auto_upgrade": behavior = enums.VERSIONING_BEHAVIOR_AUTO_UPGRADE default: - return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) + return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) } - // Only create the post-reset operation if we have a non-unspecified behavior or a pinned version - if behavior != enums.VERSIONING_BEHAVIOR_UNSPECIFIED || c.VersioningOverridePinnedVersion != "" { - var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions - protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") - if err != nil { - return fmt.Errorf("invalid field mask: %w", err) - } + var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions + protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") + if err != nil { + return fmt.Errorf("invalid field mask: %w", err) + } - postOp := &workflowpb.PostResetOperation{ - Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ - UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ - WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ - VersioningOverride: &workflowpb.VersioningOverride{ - Behavior: behavior, - PinnedVersion: c.VersioningOverridePinnedVersion, - }, + postOp := &workflowpb.PostResetOperation{ + Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ + UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: behavior, + PinnedVersion: c.VersioningOverridePinnedVersion, }, - UpdateMask: protoMask, }, + UpdateMask: protoMask, }, - } - - if c.Parent.WorkflowId != "" { - return c.Parent.doWorkflowResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) - } - return c.Parent.runBatchResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) + }, } - // If unspecified with no version, just do a regular reset without post-ops if c.Parent.WorkflowId != "" { - return c.Parent.doWorkflowResetWithPostOps(cctx, cl, nil) + return c.Parent.doWorkflowResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) } - return c.Parent.runBatchResetWithPostOps(cctx, cl, nil) + return c.Parent.runBatchResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) } diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index fbb8da8c4..7f31f3477 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -4517,7 +4517,6 @@ option-sets: Override the versioning behavior of a Workflow. required: true enum-values: - - unspecified - pinned - auto_upgrade - name: versioning-override-pinned-version From 6aebe9e10f3d88bf22f3ddf55a7db55655fc91f3 Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Tue, 17 Jun 2025 21:14:55 -0700 Subject: [PATCH 08/10] Fix TestSharedServerSuite/TestWorkflow_ResetBatchWithWorkflowUpdateOptions_PinnedBehavior --- ...commands.workflow_reset_update_options_test.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/temporalcli/commands.workflow_reset_update_options_test.go b/temporalcli/commands.workflow_reset_update_options_test.go index f459ed503..8d6103d15 100644 --- a/temporalcli/commands.workflow_reset_update_options_test.go +++ b/temporalcli/commands.workflow_reset_update_options_test.go @@ -245,10 +245,19 @@ func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_Pin Query: "CustomKeywordField = '" + searchAttr + "'", }) s.NoError(err) - return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED - }, 5*time.Second, 100*time.Millisecond) + if len(resp.Executions) < 2 { // there should be two executions. + return false + } + resetRunID := resp.Executions[0].Execution.RunId // the first result is the reset execution. + descResult, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), resetRunID) + s.NoError(err) + s.NotNil(descResult) - s.Equal(2, wfExecutions, "Should have re-executed the workflow from batch reset") + info := descResult.GetWorkflowExecutionInfo() + pinnedVersionOverride := info.VersioningInfo.VersioningOverride.GetPinned().GetVersion() + pinnedVersionOverrideString := pinnedVersionOverride.GetDeploymentName() + "." + pinnedVersionOverride.GetBuildId() + return pinnedVersionOverrideString == pinnedVersion // the second execution should have the pinned version override. + }, 5*time.Second, 100*time.Millisecond) } func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_InheritsParentFlags() { From 526b29c397eb67782a80854a75cda9222027e376 Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Wed, 9 Jul 2025 14:17:05 -0700 Subject: [PATCH 09/10] Fix versioning stuff --- temporalcli/commands.gen.go | 12 ++++--- .../commands.workflow_reset_update_options.go | 35 ++++++++++++------- ...ands.workflow_reset_update_options_test.go | 24 +++++++------ temporalcli/commandsgen/commands.yml | 11 ++++-- 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index fd82b3f9a..2beb54f81 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -315,15 +315,17 @@ func (v *QueryModifiersOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSe } type WorkflowUpdateOptionsOptions struct { - VersioningOverrideBehavior StringEnum - VersioningOverridePinnedVersion string + VersioningOverrideBehavior StringEnum + VersioningOverrideDeploymentName string + VersioningOverrideBuildId string } func (v *WorkflowUpdateOptionsOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { - v.VersioningOverrideBehavior = NewStringEnum([]string{"pinned", "auto_upgrade"}, "") - f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: pinned, auto_upgrade. Required.") + v.VersioningOverrideBehavior = NewStringEnum([]string{"unspecified", "pinned", "auto_upgrade"}, "") + f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: unspecified, pinned, auto_upgrade. Required.") _ = cobra.MarkFlagRequired(f, "versioning-override-behavior") - f.StringVar(&v.VersioningOverridePinnedVersion, "versioning-override-pinned-version", "", "Override Pinned Version for a Worker Deployment (Only for pinned).") + f.StringVar(&v.VersioningOverrideDeploymentName, "versioning-override-deployment-name", "", "When overriding to a `pinned` behavior, specifies the Deployment Name of the version to target.") + f.StringVar(&v.VersioningOverrideBuildId, "versioning-override-build-id", "", "When overriding to a `pinned` behavior, specifies the Build ID of the version to target.") } type TemporalCommand struct { diff --git a/temporalcli/commands.workflow_reset_update_options.go b/temporalcli/commands.workflow_reset_update_options.go index 8854d2c9f..102f6e55b 100644 --- a/temporalcli/commands.workflow_reset_update_options.go +++ b/temporalcli/commands.workflow_reset_update_options.go @@ -3,7 +3,7 @@ package temporalcli import ( "fmt" - "go.temporal.io/api/enums/v1" + deploymentpb "go.temporal.io/api/deployment/v1" workflowpb "go.temporal.io/api/workflow/v1" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -14,11 +14,15 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman return err } - if c.VersioningOverrideBehavior.Value == "pinned" && c.VersioningOverridePinnedVersion == "" { - return fmt.Errorf("missing version with 'pinned' behavior") + if c.VersioningOverrideBehavior.Value == "pinned" { + if c.VersioningOverrideDeploymentName == "" || c.VersioningOverrideBuildId == "" { + return fmt.Errorf("deployment name and build id are required with 'pinned' behavior") + } } - if c.VersioningOverrideBehavior.Value != "pinned" && c.VersioningOverridePinnedVersion != "" { - return fmt.Errorf("cannot set pinned version with %v behavior", c.VersioningOverrideBehavior) + if c.VersioningOverrideBehavior.Value != "pinned" { + if c.VersioningOverrideDeploymentName != "" || c.VersioningOverrideBuildId != "" { + return fmt.Errorf("cannot set deployment name or build id with %v behavior", c.VersioningOverrideBehavior.Value) + } } cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) @@ -27,12 +31,22 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman } defer cl.Close() - var behavior enums.VersioningBehavior + VersioningOverride := &workflowpb.VersioningOverride{} switch c.VersioningOverrideBehavior.Value { case "pinned": - behavior = enums.VERSIONING_BEHAVIOR_PINNED + VersioningOverride.Override = &workflowpb.VersioningOverride_Pinned{ + Pinned: &workflowpb.VersioningOverride_PinnedOverride{ + Behavior: workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED, + Version: &deploymentpb.WorkerDeploymentVersion{ + DeploymentName: c.VersioningOverrideDeploymentName, + BuildId: c.VersioningOverrideBuildId, + }, + }, + } case "auto_upgrade": - behavior = enums.VERSIONING_BEHAVIOR_AUTO_UPGRADE + VersioningOverride.Override = &workflowpb.VersioningOverride_AutoUpgrade{ + AutoUpgrade: true, + } default: return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) } @@ -47,10 +61,7 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ - VersioningOverride: &workflowpb.VersioningOverride{ - Behavior: behavior, - PinnedVersion: c.VersioningOverridePinnedVersion, - }, + VersioningOverride: VersioningOverride, }, UpdateMask: protoMask, }, diff --git a/temporalcli/commands.workflow_reset_update_options_test.go b/temporalcli/commands.workflow_reset_update_options_test.go index 8d6103d15..d031110fb 100644 --- a/temporalcli/commands.workflow_reset_update_options_test.go +++ b/temporalcli/commands.workflow_reset_update_options_test.go @@ -35,7 +35,7 @@ func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Validate "--versioning-override-behavior", "pinned", ) require.Error(s.T(), res.Err) - require.Contains(s.T(), res.Err.Error(), "missing version with 'pinned' behavior") + require.Contains(s.T(), res.Err.Error(), "deployment name and build id are required with 'pinned' behavior") } func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_ValidatesArguments_AutoUpgradeWithVersion() { @@ -46,10 +46,10 @@ func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Validate "-t", "FirstWorkflowTask", "--reason", "test-reset", "--versioning-override-behavior", "auto_upgrade", - "--versioning-override-pinned-version", "some-version", + "--versioning-override-deployment-name", "some-deployment", ) require.Error(s.T(), res.Err) - require.Contains(s.T(), res.Err.Error(), "cannot set pinned version with") + require.Contains(s.T(), res.Err.Error(), "cannot set deployment name or build id with auto_upgrade behavior") } func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_AutoUpgradeBehavior() { @@ -122,7 +122,8 @@ func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_P s.Equal(1, wfExecutions) // Reset with pinned versioning behavior and properly formatted version - pinnedVersion := "test-deployment.v1.0" + pinnedDeploymentName := "test-deployment" + pinnedBuildId := "v1.0" res := s.Execute( "workflow", "reset", "with-workflow-update-options", "--address", s.Address(), @@ -130,7 +131,8 @@ func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_P "-t", "FirstWorkflowTask", "--reason", "test-reset-with-pinned-version", "--versioning-override-behavior", "pinned", - "--versioning-override-pinned-version", pinnedVersion, + "--versioning-override-deployment-name", pinnedDeploymentName, + "--versioning-override-build-id", pinnedBuildId, ) require.NoError(s.T(), res.Err) @@ -150,8 +152,7 @@ func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_P info := descResult.GetWorkflowExecutionInfo() pinnedVersionOverride := info.VersioningInfo.VersioningOverride.GetPinned().GetVersion() - pinnedVersionOverrideString := pinnedVersionOverride.GetDeploymentName() + "." + pinnedVersionOverride.GetBuildId() - return pinnedVersionOverrideString == pinnedVersion // the second execution should have the pinned version override. + return pinnedVersionOverride.GetDeploymentName() == pinnedDeploymentName && pinnedVersionOverride.GetBuildId() == pinnedBuildId }, 5*time.Second, 100*time.Millisecond) } @@ -226,7 +227,8 @@ func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_Pin s.Equal(1, wfExecutions) // Reset batch with pinned versioning behavior and properly formatted version - pinnedVersion := "batch-deployment.v1.0" + pinnedDeploymentName := "batch-deployment" + pinnedBuildId := "v1.0" s.CommandHarness.Stdin.WriteString("y\n") res := s.Execute( "workflow", "reset", "with-workflow-update-options", @@ -235,7 +237,8 @@ func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_Pin "-t", "FirstWorkflowTask", "--reason", "test-batch-reset-with-pinned-version", "--versioning-override-behavior", "pinned", - "--versioning-override-pinned-version", pinnedVersion, + "--versioning-override-deployment-name", pinnedDeploymentName, + "--versioning-override-build-id", pinnedBuildId, ) require.NoError(s.T(), res.Err) @@ -255,8 +258,7 @@ func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_Pin info := descResult.GetWorkflowExecutionInfo() pinnedVersionOverride := info.VersioningInfo.VersioningOverride.GetPinned().GetVersion() - pinnedVersionOverrideString := pinnedVersionOverride.GetDeploymentName() + "." + pinnedVersionOverride.GetBuildId() - return pinnedVersionOverrideString == pinnedVersion // the second execution should have the pinned version override. + return pinnedVersionOverride.GetDeploymentName() == pinnedDeploymentName && pinnedVersionOverride.GetBuildId() == pinnedBuildId }, 5*time.Second, 100*time.Millisecond) } diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 7f31f3477..4864ba0d9 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -4517,10 +4517,15 @@ option-sets: Override the versioning behavior of a Workflow. required: true enum-values: + - unspecified - pinned - auto_upgrade - - name: versioning-override-pinned-version + - name: versioning-override-deployment-name type: string - description: Override Pinned Version for a Worker Deployment (Only for pinned). - + description: When overriding to a `pinned` behavior, specifies the Deployment Name of the + version to target. + - name: versioning-override-build-id + type: string + description: When overriding to a `pinned` behavior, specifies the Build ID of the + version to target. From 95b68851b6dc26bf912edb932774415a7aaf38de Mon Sep 17 00:00:00 2001 From: Chetan Gowda Date: Wed, 9 Jul 2025 14:32:13 -0700 Subject: [PATCH 10/10] Remove unspecified value --- temporalcli/commands.gen.go | 4 ++-- temporalcli/commands.workflow_reset_update_options.go | 2 +- temporalcli/commandsgen/commands.yml | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 2beb54f81..1e5a68d7a 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -321,8 +321,8 @@ type WorkflowUpdateOptionsOptions struct { } func (v *WorkflowUpdateOptionsOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { - v.VersioningOverrideBehavior = NewStringEnum([]string{"unspecified", "pinned", "auto_upgrade"}, "") - f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: unspecified, pinned, auto_upgrade. Required.") + v.VersioningOverrideBehavior = NewStringEnum([]string{"pinned", "auto_upgrade"}, "") + f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: pinned, auto_upgrade. Required.") _ = cobra.MarkFlagRequired(f, "versioning-override-behavior") f.StringVar(&v.VersioningOverrideDeploymentName, "versioning-override-deployment-name", "", "When overriding to a `pinned` behavior, specifies the Deployment Name of the version to target.") f.StringVar(&v.VersioningOverrideBuildId, "versioning-override-build-id", "", "When overriding to a `pinned` behavior, specifies the Build ID of the version to target.") diff --git a/temporalcli/commands.workflow_reset_update_options.go b/temporalcli/commands.workflow_reset_update_options.go index 102f6e55b..3c24a3218 100644 --- a/temporalcli/commands.workflow_reset_update_options.go +++ b/temporalcli/commands.workflow_reset_update_options.go @@ -48,7 +48,7 @@ func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *Comman AutoUpgrade: true, } default: - return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior) + return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior.Value) } var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 4864ba0d9..689f807e3 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -4517,7 +4517,6 @@ option-sets: Override the versioning behavior of a Workflow. required: true enum-values: - - unspecified - pinned - auto_upgrade - name: versioning-override-deployment-name