diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 921425e58..1e5a68d7a 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -314,6 +314,20 @@ func (v *QueryModifiersOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSe f.Var(&v.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.") } +type WorkflowUpdateOptionsOptions struct { + VersioningOverrideBehavior StringEnum + VersioningOverrideDeploymentName string + VersioningOverrideBuildId string +} + +func (v *WorkflowUpdateOptionsOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { + v.VersioningOverrideBehavior = NewStringEnum([]string{"pinned", "auto_upgrade"}, "") + f.Var(&v.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: pinned, auto_upgrade. Required.") + _ = cobra.MarkFlagRequired(f, "versioning-override-behavior") + f.StringVar(&v.VersioningOverrideDeploymentName, "versioning-override-deployment-name", "", "When overriding to a `pinned` behavior, specifies the Deployment Name of the version to target.") + f.StringVar(&v.VersioningOverrideBuildId, "versioning-override-build-id", "", "When overriding to a `pinned` behavior, specifies the Build ID of the version to target.") +} + type TemporalCommand struct { Command cobra.Command Env string @@ -762,9 +776,9 @@ func NewTemporalConfigCommand(cctx *CommandContext, parent *TemporalCommand) *Te s.Command.Use = "config" s.Command.Short = "Manage config files (EXPERIMENTAL)" if hasHighlighting { - s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal. \n\n\x1b[1mtemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\x1b[0m\n\nThe default config file path is \x1b[1m$CONFIG_PATH/temporalio/temporal.toml\x1b[0m where\n\x1b[1m$CONFIG_PATH\x1b[0m is defined as \x1b[1m$HOME/.config\x1b[0m on Unix,\n\x1b[1m$HOME/Library/Application Support\x1b[0m on macOS, and \x1b[1m%AppData%\x1b[0m on Windows.\nThis can be overridden with the \x1b[1mTEMPORAL_CONFIG_FILE\x1b[0m environment\nvariable or \x1b[1m--config-file\x1b[0m.\n\nThe default profile is \x1b[1mdefault\x1b[0m. This can be overridden with the\n\x1b[1mTEMPORAL_PROFILE\x1b[0m environment variable or \x1b[1m--profile\x1b[0m." + s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal.\n\n\x1b[1mtemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\x1b[0m\n\nThe default config file path is \x1b[1m$CONFIG_PATH/temporalio/temporal.toml\x1b[0m where\n\x1b[1m$CONFIG_PATH\x1b[0m is defined as \x1b[1m$HOME/.config\x1b[0m on Unix,\n\x1b[1m$HOME/Library/Application Support\x1b[0m on macOS, and \x1b[1m%AppData%\x1b[0m on Windows.\nThis can be overridden with the \x1b[1mTEMPORAL_CONFIG_FILE\x1b[0m environment\nvariable or \x1b[1m--config-file\x1b[0m.\n\nThe default profile is \x1b[1mdefault\x1b[0m. This can be overridden with the\n\x1b[1mTEMPORAL_PROFILE\x1b[0m environment variable or \x1b[1m--profile\x1b[0m." } else { - s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal. \n\n```\ntemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\n```\n\nThe default config file path is `$CONFIG_PATH/temporalio/temporal.toml` where\n`$CONFIG_PATH` is defined as `$HOME/.config` on Unix,\n`$HOME/Library/Application Support` on macOS, and `%AppData%` on Windows.\nThis can be overridden with the `TEMPORAL_CONFIG_FILE` environment\nvariable or `--config-file`.\n\nThe default profile is `default`. This can be overridden with the\n`TEMPORAL_PROFILE` environment variable or `--profile`." + s.Command.Long = "Config files are TOML files that contain profiles, with each profile\ncontaining configuration for connecting to Temporal.\n\n```\ntemporal config set \\\n --prop address \\\n --value us-west-2.aws.api.temporal.io:7233\n```\n\nThe default config file path is `$CONFIG_PATH/temporalio/temporal.toml` where\n`$CONFIG_PATH` is defined as `$HOME/.config` on Unix,\n`$HOME/Library/Application Support` on macOS, and `%AppData%` on Windows.\nThis can be overridden with the `TEMPORAL_CONFIG_FILE` environment\nvariable or `--config-file`.\n\nThe default profile is `default`. This can be overridden with the\n`TEMPORAL_PROFILE` environment variable or `--profile`." } s.Command.Args = cobra.NoArgs s.Command.AddCommand(&NewTemporalConfigDeleteCommand(cctx, &s).Command) @@ -3395,8 +3409,7 @@ type TemporalWorkflowResetCommand struct { func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetCommand { var s TemporalWorkflowResetCommand s.Parent = parent - s.Command.DisableFlagsInUseLine = true - s.Command.Use = "reset [flags]" + s.Command.Use = "reset" s.Command.Short = "Move Workflow Execution history point" if hasHighlighting { s.Command.Long = "Reset a Workflow Execution so it can resume from a point in its Event History\nwithout losing its progress up to that point:\n\n\x1b[1mtemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --event-id YourLastEvent\x1b[0m\n\nStart from where the Workflow Execution last continued as new:\n\n\x1b[1mtemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --type LastContinuedAsNew\x1b[0m\n\nFor batch resets, limit your resets to FirstWorkflowTask, LastWorkflowTask, or\nBuildId. Do not use Workflow IDs, run IDs, or event IDs with this command.\n\nVisit https://docs.temporal.io/visibility to read more about Search\nAttributes and Query creation." @@ -3404,21 +3417,45 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf s.Command.Long = "Reset a Workflow Execution so it can resume from a point in its Event History\nwithout losing its progress up to that point:\n\n```\ntemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --event-id YourLastEvent\n```\n\nStart from where the Workflow Execution last continued as new:\n\n```\ntemporal workflow reset \\\n --workflow-id YourWorkflowId \\\n --type LastContinuedAsNew\n```\n\nFor batch resets, limit your resets to FirstWorkflowTask, LastWorkflowTask, or\nBuildId. Do not use Workflow IDs, run IDs, or event IDs with this command.\n\nVisit https://docs.temporal.io/visibility to read more about Search\nAttributes and Query creation." } s.Command.Args = cobra.NoArgs - s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for non-batch reset operations.") - s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID.") - s.Command.Flags().IntVarP(&s.EventId, "event-id", "e", 0, "Event ID to reset to. Event must occur after `WorkflowTaskStarted`. `WorkflowTaskCompleted`, `WorkflowTaskFailed`, etc. are valid.") - s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for reset. Required.") - _ = cobra.MarkFlagRequired(s.Command.Flags(), "reason") + s.Command.AddCommand(&NewTemporalWorkflowResetWithWorkflowUpdateOptionsCommand(cctx, &s).Command) + s.Command.PersistentFlags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for non-batch reset operations.") + s.Command.PersistentFlags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID.") + s.Command.PersistentFlags().IntVarP(&s.EventId, "event-id", "e", 0, "Event ID to reset to. Event must occur after `WorkflowTaskStarted`. `WorkflowTaskCompleted`, `WorkflowTaskFailed`, etc. are valid.") + s.Command.PersistentFlags().StringVar(&s.Reason, "reason", "", "Reason for reset. Required.") + _ = cobra.MarkFlagRequired(s.Command.PersistentFlags(), "reason") s.ReapplyType = NewStringEnum([]string{"All", "Signal", "None"}, "All") - s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "Types of events to re-apply after reset point. Accepted values: All, Signal, None.") - _ = s.Command.Flags().MarkDeprecated("reapply-type", "Use --reapply-exclude instead.") + s.Command.PersistentFlags().Var(&s.ReapplyType, "reapply-type", "Types of events to re-apply after reset point. Accepted values: All, Signal, None.") + _ = s.Command.PersistentFlags().MarkDeprecated("reapply-type", "Use --reapply-exclude instead.") s.ReapplyExclude = NewStringEnumArray([]string{"All", "Signal", "Update"}, []string{}) - s.Command.Flags().Var(&s.ReapplyExclude, "reapply-exclude", "Exclude these event types from re-application. Accepted values: All, Signal, Update.") + s.Command.PersistentFlags().Var(&s.ReapplyExclude, "reapply-exclude", "Exclude these event types from re-application. Accepted values: All, Signal, Update.") s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew", "BuildId"}, "") - s.Command.Flags().VarP(&s.Type, "type", "t", "The event type for the reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.") - s.Command.Flags().StringVar(&s.BuildId, "build-id", "", "A Build ID. Use only with the BuildId `--type`. Resets the first Workflow task processed by this ID. By default, this reset may be in a prior run, earlier than a Continue as New point.") - s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") - s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm. Only allowed when `--query` is present.") + s.Command.PersistentFlags().VarP(&s.Type, "type", "t", "The event type for the reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.") + s.Command.PersistentFlags().StringVar(&s.BuildId, "build-id", "", "A Build ID. Use only with the BuildId `--type`. Resets the first Workflow task processed by this ID. By default, this reset may be in a prior run, earlier than a Continue as New point.") + s.Command.PersistentFlags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") + s.Command.PersistentFlags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm. Only allowed when `--query` is present.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkflowResetWithWorkflowUpdateOptionsCommand struct { + Parent *TemporalWorkflowResetCommand + Command cobra.Command + WorkflowUpdateOptionsOptions +} + +func NewTemporalWorkflowResetWithWorkflowUpdateOptionsCommand(cctx *CommandContext, parent *TemporalWorkflowResetCommand) *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand { + var s TemporalWorkflowResetWithWorkflowUpdateOptionsCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "with-workflow-update-options [flags]" + s.Command.Short = "Update options on reset workflow" + s.Command.Long = "Run Workflow Update Options atomically after the Workflow is reset.\nWorkflows selected by the reset command are forwarded onto the subcommand." + s.Command.Args = cobra.NoArgs + s.WorkflowUpdateOptionsOptions.buildFlags(cctx, s.Command.Flags()) s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) @@ -3654,9 +3691,9 @@ func NewTemporalWorkflowStartUpdateWithStartCommand(cctx *CommandContext, parent s.Command.Use = "start-update-with-start [flags]" s.Command.Short = "Send an Update-With-Start and wait for it to be accepted or rejected (Experimental)" if hasHighlighting { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running,\nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" } else { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running,\nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index a4284591b..47112a2c0 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -10,6 +10,7 @@ import ( "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" + workflow "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" @@ -66,7 +67,10 @@ func (c *TemporalWorkflowResetCommand) validateBatchResetArguments() error { return nil } func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl client.Client) error { + return c.doWorkflowResetWithPostOps(cctx, cl, nil) +} +func (c *TemporalWorkflowResetCommand) doWorkflowResetWithPostOps(cctx *CommandContext, cl client.Client, postOps []*workflow.PostResetOperation) error { var err error resetBaseRunID := c.RunId eventID := int64(c.EventId) @@ -94,6 +98,7 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl WorkflowTaskFinishEventId: eventID, ResetReapplyType: reapplyType, ResetReapplyExcludeTypes: reapplyExcludes, + PostResetOperations: postOps, }) if err != nil { return fmt.Errorf("failed to reset workflow: %w", err) @@ -108,6 +113,10 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl } func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl client.Client) error { + return c.runBatchResetWithPostOps(cctx, cl, nil) +} + +func (c *TemporalWorkflowResetCommand) runBatchResetWithPostOps(cctx *CommandContext, cl client.Client, postOps []*workflow.PostResetOperation) error { request := workflowservice.StartBatchOperationRequest{ Namespace: c.Parent.Namespace, JobId: uuid.NewString(), @@ -121,8 +130,9 @@ func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl cl } request.Operation = &workflowservice.StartBatchOperationRequest_ResetOperation{ ResetOperation: &batch.BatchOperationReset{ - Identity: clientIdentity(), - Options: batchResetOptions, + Identity: clientIdentity(), + Options: batchResetOptions, + PostResetOperations: postOps, }, } count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query}) diff --git a/temporalcli/commands.workflow_reset_update_options.go b/temporalcli/commands.workflow_reset_update_options.go new file mode 100644 index 000000000..3c24a3218 --- /dev/null +++ b/temporalcli/commands.workflow_reset_update_options.go @@ -0,0 +1,75 @@ +package temporalcli + +import ( + "fmt" + + deploymentpb "go.temporal.io/api/deployment/v1" + workflowpb "go.temporal.io/api/workflow/v1" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +func (c *TemporalWorkflowResetWithWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []string) error { + validate, _ := c.Parent.getResetOperations() + if err := validate(); err != nil { + return err + } + + if c.VersioningOverrideBehavior.Value == "pinned" { + if c.VersioningOverrideDeploymentName == "" || c.VersioningOverrideBuildId == "" { + return fmt.Errorf("deployment name and build id are required with 'pinned' behavior") + } + } + if c.VersioningOverrideBehavior.Value != "pinned" { + if c.VersioningOverrideDeploymentName != "" || c.VersioningOverrideBuildId != "" { + return fmt.Errorf("cannot set deployment name or build id with %v behavior", c.VersioningOverrideBehavior.Value) + } + } + + cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + VersioningOverride := &workflowpb.VersioningOverride{} + switch c.VersioningOverrideBehavior.Value { + case "pinned": + VersioningOverride.Override = &workflowpb.VersioningOverride_Pinned{ + Pinned: &workflowpb.VersioningOverride_PinnedOverride{ + Behavior: workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED, + Version: &deploymentpb.WorkerDeploymentVersion{ + DeploymentName: c.VersioningOverrideDeploymentName, + BuildId: c.VersioningOverrideBuildId, + }, + }, + } + case "auto_upgrade": + VersioningOverride.Override = &workflowpb.VersioningOverride_AutoUpgrade{ + AutoUpgrade: true, + } + default: + return fmt.Errorf("invalid deployment behavior: %v, valid values are: 'pinned', and 'auto_upgrade'", c.VersioningOverrideBehavior.Value) + } + + var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions + protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") + if err != nil { + return fmt.Errorf("invalid field mask: %w", err) + } + + postOp := &workflowpb.PostResetOperation{ + Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ + UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: VersioningOverride, + }, + UpdateMask: protoMask, + }, + }, + } + + if c.Parent.WorkflowId != "" { + return c.Parent.doWorkflowResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) + } + return c.Parent.runBatchResetWithPostOps(cctx, cl, []*workflowpb.PostResetOperation{postOp}) +} diff --git a/temporalcli/commands.workflow_reset_update_options_test.go b/temporalcli/commands.workflow_reset_update_options_test.go new file mode 100644 index 000000000..d031110fb --- /dev/null +++ b/temporalcli/commands.workflow_reset_update_options_test.go @@ -0,0 +1,391 @@ +package temporalcli_test + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" +) + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_ValidatesArguments_MissingRequiredFlag() { + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-t", "FirstWorkflowTask", + "--reason", "test-reset", + ) + require.Error(s.T(), res.Err) + require.Contains(s.T(), res.Err.Error(), "required flag") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_ValidatesArguments_PinnedWithoutVersion() { + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-t", "FirstWorkflowTask", + "--reason", "test-reset", + "--versioning-override-behavior", "pinned", + ) + require.Error(s.T(), res.Err) + require.Contains(s.T(), res.Err.Error(), "deployment name and build id are required with 'pinned' behavior") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_ValidatesArguments_AutoUpgradeWithVersion() { + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-t", "FirstWorkflowTask", + "--reason", "test-reset", + "--versioning-override-behavior", "auto_upgrade", + "--versioning-override-deployment-name", "some-deployment", + ) + require.Error(s.T(), res.Err) + require.Contains(s.T(), res.Err.Error(), "cannot set deployment name or build id with auto_upgrade behavior") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_AutoUpgradeBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset with auto upgrade versioning behavior + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-t", "FirstWorkflowTask", + "--reason", "test-reset-with-auto-upgrade", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 3*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should have re-executed the workflow") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_Single_PinnedBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset with pinned versioning behavior and properly formatted version + pinnedDeploymentName := "test-deployment" + pinnedBuildId := "v1.0" + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-t", "FirstWorkflowTask", + "--reason", "test-reset-with-pinned-version", + "--versioning-override-behavior", "pinned", + "--versioning-override-deployment-name", pinnedDeploymentName, + "--versioning-override-build-id", pinnedBuildId, + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + if len(resp.Executions) < 2 { // there should be two executions. + return false + } + resetRunID := resp.Executions[0].Execution.RunId // the first result is the reset execution. + descResult, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), resetRunID) + s.NoError(err) + s.NotNil(descResult) + + info := descResult.GetWorkflowExecutionInfo() + pinnedVersionOverride := info.VersioningInfo.VersioningOverride.GetPinned().GetVersion() + return pinnedVersionOverride.GetDeploymentName() == pinnedDeploymentName && pinnedVersionOverride.GetBuildId() == pinnedBuildId + }, 5*time.Second, 100*time.Millisecond) +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_AutoUpgradeBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset batch with auto_upgrade versioning behavior + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr), + "-t", "FirstWorkflowTask", + "--reason", "test-batch-reset-with-update-options", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for batch reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 3*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should have re-executed the workflow from batch reset") +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatchWithWorkflowUpdateOptions_PinnedBehavior() { + var wfExecutions int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + wfExecutions++ + return "result", nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "test-input", + ) + s.NoError(err) + var result any + s.NoError(run.Get(s.Context, &result)) + s.Equal(1, wfExecutions) + + // Reset batch with pinned versioning behavior and properly formatted version + pinnedDeploymentName := "batch-deployment" + pinnedBuildId := "v1.0" + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr), + "-t", "FirstWorkflowTask", + "--reason", "test-batch-reset-with-pinned-version", + "--versioning-override-behavior", "pinned", + "--versioning-override-deployment-name", pinnedDeploymentName, + "--versioning-override-build-id", pinnedBuildId, + ) + require.NoError(s.T(), res.Err) + + // Wait for batch reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + if len(resp.Executions) < 2 { // there should be two executions. + return false + } + resetRunID := resp.Executions[0].Execution.RunId // the first result is the reset execution. + descResult, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), resetRunID) + s.NoError(err) + s.NotNil(descResult) + + info := descResult.GetWorkflowExecutionInfo() + pinnedVersionOverride := info.VersioningInfo.VersioningOverride.GetPinned().GetVersion() + return pinnedVersionOverride.GetDeploymentName() == pinnedDeploymentName && pinnedVersionOverride.GetBuildId() == pinnedBuildId + }, 5*time.Second, 100*time.Millisecond) +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_InheritsParentFlags() { + // Test that the subcommand inherits parent flags correctly + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", "test-workflow", + "-r", "test-run-id", + "-e", "10", + "--reason", "test-reset-with-inherited-flags", + "--versioning-override-behavior", "auto_upgrade", + "--reapply-exclude", "Signal", + ) + + // The command should fail because the workflow doesn't exist, but the error + // should be about the missing workflow, not about invalid flags + require.Error(s.T(), res.Err) + require.NotContains(s.T(), res.Err.Error(), "required flag") + require.NotContains(s.T(), res.Err.Error(), "invalid argument") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_WithLastWorkflowTask() { + var wfExecutions, activityExecutions int + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + activityExecutions++ + return nil, nil + }) + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, nil) + wfExecutions++ + return nil, nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + var junk any + s.NoError(run.Get(s.Context, &junk)) + s.Equal(1, wfExecutions) + + // Reset to the last workflow task with update options + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-t", "LastWorkflowTask", + "--reason", "test-reset-last-workflow-task-with-options", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 3*time.Second, 100*time.Millisecond) + + s.Equal(2, wfExecutions, "Should re-executed the workflow") + s.Equal(1, activityExecutions, "Should not have re-executed the activity") +} + +func (s *SharedServerSuite) TestWorkflow_ResetWithWorkflowUpdateOptions_WithEventID() { + // Test that the new subcommand works with event ID reset type + var activityCount int + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + activityCount++ + return a, nil + }) + + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + var res any + if err := workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, &res); err != nil { + return res, err + } + err := workflow.ExecuteActivity(ctx, DevActivity, 2).Get(ctx, &res) + return res, err + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + require.NoError(s.T(), err) + var ignored any + s.NoError(run.Get(s.Context, &ignored)) + s.Equal(2, activityCount) + + // Reset with event ID and update options + res := s.Execute( + "workflow", "reset", "with-workflow-update-options", + "--address", s.Address(), + "-w", run.GetID(), + "-e", "3", // Use a known early event ID + "--reason", "test-reset-event-id-with-options", + "--versioning-override-behavior", "auto_upgrade", + ) + require.NoError(s.T(), res.Err) + + // Wait for reset to complete + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 2 && resp.Executions[0].Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED + }, 5*time.Second, 100*time.Millisecond) + + s.Greater(activityCount, 2, "Should have re-executed activities after reset") +} diff --git a/temporalcli/commandsgen/code.go b/temporalcli/commandsgen/code.go index e51296d46..e2784f90f 100644 --- a/temporalcli/commandsgen/code.go +++ b/temporalcli/commandsgen/code.go @@ -261,8 +261,8 @@ func (c *Command) writeCode(w *codeWriter) error { } w.writeLinef("}))") } - // If there are no subcommands, we need a run function - if len(subCommands) == 0 { + // If there are no subcommands, or if subcommands are optional, we need a run function + if len(subCommands) == 0 || c.SubcommandsOptional { w.writeLinef("s.Command.Run = func(c *%v.Command, args []string) {", w.importCobra()) w.writeLinef("if err := s.run(cctx, args); err != nil {") w.writeLinef("cctx.Options.Fail(err)") diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 222c51546..689f807e3 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -51,7 +51,8 @@ # * When commands have a single command-level option, include it the mandatory example. # * Use square bracket overviews to present how complex commands will be used. # * Yes: temporal operator [command] [subcommand] [options] -# Commands with subcommands can't be run on their own. +# Commands with subcommands can't be run on their own unless +# subcommands-optional is set to true. # Because of this, always use full command examples. # * Use square brackets to highlight optional elements, especially when long # descriptions would suffer from two very similar command invocations. @@ -94,6 +95,7 @@ # exact-args: Require this exact number of args. (int) # maximum-args: Require this maximum number of args. (int) # ignores-missing-env: Ignore missing environment variables. (bool) +# subcommands-optional: Allow command to be run even when it has subcommands. (bool) # options: A list of options. (Option[]) # - name: The option name. (string) # type: The option type. (string) @@ -483,7 +485,7 @@ commands: --reason YourReasonForTermination ``` - + Specify the Activity ID or Type and Workflow IDs: ``` @@ -689,7 +691,7 @@ commands: summary: Manage config files (EXPERIMENTAL) description: | Config files are TOML files that contain profiles, with each profile - containing configuration for connecting to Temporal. + containing configuration for connecting to Temporal. ``` temporal config set \ @@ -3417,6 +3419,7 @@ commands: - name: temporal workflow reset summary: Move Workflow Execution history point + subcommands-optional: true description: | Reset a Workflow Execution so it can resume from a point in its Event History without losing its progress up to that point: @@ -3506,6 +3509,14 @@ commands: Don't prompt to confirm. Only allowed when `--query` is present. + - name: temporal workflow reset with-workflow-update-options + summary: Update options on reset workflow + description: | + Run Workflow Update Options atomically after the Workflow is reset. + Workflows selected by the reset command are forwarded onto the subcommand. + option-sets: + - workflow-update-options + - name: temporal workflow result summary: Wait for and show the result of a Workflow Execution description: | @@ -3850,7 +3861,7 @@ commands: summary: Send an Update-With-Start and wait for it to be accepted or rejected (Experimental) description: | Send a message to a Workflow Execution to invoke an Update handler, and wait for - the update to be accepted or rejected. If the Workflow Execution is not running, + the update to be accepted or rejected. If the Workflow Execution is not running, then a new workflow execution is started and the update is sent. Experimental. @@ -4497,3 +4508,23 @@ option-sets: enum-values: - not_open - not_completed_cleanly + + - name: workflow-update-options + options: + - name: versioning-override-behavior + type: string-enum + description: | + Override the versioning behavior of a Workflow. + required: true + enum-values: + - pinned + - auto_upgrade + - name: versioning-override-deployment-name + type: string + description: When overriding to a `pinned` behavior, specifies the Deployment Name of the + version to target. + - name: versioning-override-build-id + type: string + description: When overriding to a `pinned` behavior, specifies the Build ID of the + version to target. + diff --git a/temporalcli/commandsgen/parse.go b/temporalcli/commandsgen/parse.go index a93c69482..568e38fd7 100644 --- a/temporalcli/commandsgen/parse.go +++ b/temporalcli/commandsgen/parse.go @@ -48,6 +48,7 @@ type ( ExactArgs int `yaml:"exact-args"` MaximumArgs int `yaml:"maximum-args"` IgnoreMissingEnv bool `yaml:"ignores-missing-env"` + SubcommandsOptional bool `yaml:"subcommands-optional"` Options []Option `yaml:"options"` OptionSets []string `yaml:"option-sets"` Docs Docs `yaml:"docs"`