diff --git a/.gitignore b/.gitignore index a8a5794dc..14713bfa8 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,5 @@ # Used by IDE /.idea /.vscode +/.zed *~ - diff --git a/go.mod b/go.mod index f251a0628..4867c9bd6 100644 --- a/go.mod +++ b/go.mod @@ -15,9 +15,9 @@ require ( github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 github.com/temporalio/ui-server/v2 v2.36.0 - go.temporal.io/api v1.45.0 + go.temporal.io/api v1.46.0 go.temporal.io/sdk v1.33.0 - go.temporal.io/server v1.27.1 + go.temporal.io/server v1.28.0-130.0 google.golang.org/grpc v1.70.0 google.golang.org/protobuf v1.36.5 gopkg.in/yaml.v3 v3.0.1 @@ -138,9 +138,9 @@ require ( go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.35.0 // indirect golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa // indirect - golang.org/x/net v0.35.0 // indirect + golang.org/x/net v0.36.0 // indirect golang.org/x/oauth2 v0.26.0 // indirect golang.org/x/sync v0.11.0 // indirect golang.org/x/sys v0.30.0 // indirect @@ -150,7 +150,6 @@ require ( google.golang.org/genproto v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect - gopkg.in/go-jose/go-jose.v2 v2.6.3 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/validator.v2 v2.0.1 // indirect modernc.org/libc v1.61.11 // indirect diff --git a/go.sum b/go.sum index 62d3e7b05..dcf5cb4c7 100644 --- a/go.sum +++ b/go.sum @@ -371,12 +371,12 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.45.0 h1:2FZ3eUoOYjavBaQi3/V93MBl99Nq1ifFRjrRwT3MeC8= -go.temporal.io/api v1.45.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.46.0 h1:O1efPDB6O2B8uIeCDIa+3VZC7tZMvYsMZYQapSbHvCg= +go.temporal.io/api v1.46.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.33.0 h1:T91UzeRdlHTiMGgpygsItOH9+VSkg+M/mG85PqNjdog= go.temporal.io/sdk v1.33.0/go.mod h1:WwCmJZLy7zabz3ar5NRAQEygsdP8tgR9sDjISSHuWZw= -go.temporal.io/server v1.27.1 h1:0dyBl25Ua7P4IOXifJg0xUXfnoYTNY1IlUFf1RL4OBo= -go.temporal.io/server v1.27.1/go.mod h1:ddxnsbsXvdZ/oRvjLHaL45NJUGMOPW+3RLkhpq9TOAs= +go.temporal.io/server v1.28.0-130.0 h1:I7juPBQoGfB6kHV7MVC6OnDzl4jRWpUzcsIQPlaXKmk= +go.temporal.io/server v1.28.0-130.0/go.mod h1:WjwWFpSiNEQp/whDLUU7IrUS7UewzJfAk3tQh0GMXZg= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -408,8 +408,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= +golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -449,8 +449,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= -golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= +golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.26.0 h1:afQXWNNaeC4nvZ0Ed9XvCCzXM6UHJG7iCg0W4fPqSBE= golang.org/x/oauth2 v0.26.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -561,8 +561,6 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/go-jose/go-jose.v2 v2.6.3 h1:nt80fvSDlhKWQgSWyHyy5CfmlQr+asih51R8PTWNKKs= -gopkg.in/go-jose/go-jose.v2 v2.6.3/go.mod h1:zzZDPkNNw/c9IE7Z9jr11mBZQhKQTMzoEEIoEdZlFBI= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/validator.v2 v2.0.1 h1:xF0KWyGWXm/LM2G1TrEjqOu4pa6coO9AlWSf3msVfDY= diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 04c626589..45a4069ca 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -190,6 +190,8 @@ type SharedWorkflowStartOptions struct { TaskTimeout Duration SearchAttribute []string Memo []string + StaticSummary string + StaticDetails string } func (v *SharedWorkflowStartOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { @@ -206,6 +208,8 @@ func (v *SharedWorkflowStartOptions) buildFlags(cctx *CommandContext, f *pflag.F f.Var(&v.TaskTimeout, "task-timeout", "Fail a Workflow Task if it lasts longer than `DURATION`. This is the Start-to-close timeout for a Workflow Task.") f.StringArrayVar(&v.SearchAttribute, "search-attribute", nil, "Search Attribute in `KEY=VALUE` format. Keys must be identifiers, and values must be JSON values. For example: 'YourKey={\"your\": \"value\"}'. Can be passed multiple times.") f.StringArrayVar(&v.Memo, "memo", nil, "Memo using 'KEY=\"VALUE\"' pairs. Use JSON values.") + f.StringVar(&v.StaticSummary, "static-summary", "", "Static Workflow summary for human consumption in UIs. Uses Temporal Markdown formatting, should be a single line.") + f.StringVar(&v.StaticDetails, "static-details", "", "Static Workflow details for human consumption in UIs. Uses Temporal Markdown formatting, may be multiple lines.") } type WorkflowStartOptions struct { @@ -2855,6 +2859,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowDeleteCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowDescribeCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowExecuteCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowFixHistoryJsonCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowListCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowMetadataCommand(cctx, &s).Command) @@ -2866,6 +2871,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowStartUpdateWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command) @@ -3022,6 +3028,58 @@ func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWor return &s } +type TemporalWorkflowExecuteUpdateWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + UpdateName string + UpdateFirstExecutionRunId string + UpdateId string + RunId string + UpdateInput []string + UpdateInputFile []string + UpdateInputMeta []string + UpdateInputBase64 bool +} + +func NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowExecuteUpdateWithStartCommand { + var s TemporalWorkflowExecuteUpdateWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "execute-update-with-start [flags]" + s.Command.Short = "Send an Update and wait for it to complete (Experimental)" + if hasHighlighting { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name") + s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.") + s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.") + s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "update-type": "update-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowFixHistoryJsonCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command @@ -3403,6 +3461,62 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf return &s } +type TemporalWorkflowStartUpdateWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + UpdateName string + UpdateFirstExecutionRunId string + UpdateWaitForStage StringEnum + UpdateId string + RunId string + UpdateInput []string + UpdateInputFile []string + UpdateInputMeta []string + UpdateInputBase64 bool +} + +func NewTemporalWorkflowStartUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowStartUpdateWithStartCommand { + var s TemporalWorkflowStartUpdateWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "start-update-with-start [flags]" + s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)" + if hasHighlighting { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name") + s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.") + s.UpdateWaitForStage = NewStringEnum([]string{"accepted"}, "") + s.Command.Flags().Var(&s.UpdateWaitForStage, "update-wait-for-stage", "Update stage to wait for. The only option is `accepted`, but this option is required. This is to allow a future version of the CLI to choose a default value. Accepted values: accepted. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-wait-for-stage") + s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.") + s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "update-type": "update-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowTerminateCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command @@ -3595,9 +3709,9 @@ func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *Tempora s.Command.Use = "start [flags]" s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)" if hasHighlighting { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\x1b[0m" } else { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\n```" } s.Command.Args = cobra.NoArgs s.WaitForStage = NewStringEnum([]string{"accepted"}, "") diff --git a/temporalcli/commands.operator_search_attribute.go b/temporalcli/commands.operator_search_attribute.go index 405a31b85..afdca478c 100644 --- a/temporalcli/commands.operator_search_attribute.go +++ b/temporalcli/commands.operator_search_attribute.go @@ -60,7 +60,7 @@ func searchAttributeTypeStringToEnum(search string) (enums.IndexedValueType, err return enums.IndexedValueType(v), nil } } - return enums.INDEXED_VALUE_TYPE_UNSPECIFIED, fmt.Errorf("unsupported search attribute type: %v", search) + return enums.INDEXED_VALUE_TYPE_UNSPECIFIED, fmt.Errorf("unsupported search attribute type: %q", search) } func (c *TemporalOperatorSearchAttributeRemoveCommand) run(cctx *CommandContext, args []string) error { diff --git a/temporalcli/commands.schedule.go b/temporalcli/commands.schedule.go index 3d645cb2e..7cf300e88 100644 --- a/temporalcli/commands.schedule.go +++ b/temporalcli/commands.schedule.go @@ -270,6 +270,8 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c // RetryPolicy not supported yet UntypedSearchAttributes: untypedSearchAttributes, Memo: opts.Memo, + StaticSummary: opts.StaticSummary, + StaticDetails: opts.StaticDetails, } if action.Args, err = i.buildRawInput(); err != nil { return nil, err diff --git a/temporalcli/commands.schedule_test.go b/temporalcli/commands.schedule_test.go index ec2167438..d24dc02c2 100644 --- a/temporalcli/commands.schedule_test.go +++ b/temporalcli/commands.schedule_test.go @@ -176,6 +176,24 @@ func (s *SharedServerSuite) TestSchedule_CreateDescribe_SearchAttributes_Memo() s.ContainsOnSameLine(out, "Action", "wfMemo", b64(`"other data"`)) } +func (s *SharedServerSuite) TestSchedule_CreateDescribe_UserMetadata() { + schedId, _, res := s.createSchedule("--interval", "10d", + "--static-summary", "summ", + "--static-details", "details", + ) + s.NoError(res.Err) + + res = s.Execute( + "schedule", "describe", + "--address", s.Address(), + "-s", schedId, + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Action", "Summary", "summ") + s.ContainsOnSameLine(out, "Action", "Details", "details") +} + func (s *SharedServerSuite) TestSchedule_List() { res := s.Execute( "operator", "search-attribute", "create", diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index 08393a7d9..a88d89ba7 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "os" "reflect" @@ -121,7 +122,7 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s signalPayloadInputOpts := PayloadInputOptions{ Input: c.SignalInput, InputFile: c.SignalInputFile, - InputMeta: c.InputMeta, + InputMeta: c.SignalInputMeta, InputBase64: c.SignalInputBase64, } signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() @@ -194,16 +195,191 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s RunId string `json:"runId"` Type string `json:"type"` Namespace string `json:"namespace"` - TaskQueue string `json:"taskQueue"` }{ WorkflowId: c.WorkflowId, RunId: resp.RunId, Type: c.Type, Namespace: c.Parent.Namespace, - TaskQueue: c.TaskQueue, }, printer.StructuredOptions{}) } +func (c *TemporalWorkflowStartUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error { + waitForStage := client.WorkflowUpdateStageUnspecified + switch c.UpdateWaitForStage.Value { + case "accepted": + waitForStage = client.WorkflowUpdateStageAccepted + } + if waitForStage != client.WorkflowUpdateStageAccepted { + return fmt.Errorf("invalid wait for stage: %v, valid values are: 'accepted'", c.UpdateWaitForStage) + } + + updatePayloadInputOpts := PayloadInputOptions{ + Input: c.UpdateInput, + InputFile: c.UpdateInputFile, + InputMeta: c.UpdateInputMeta, + InputBase64: c.UpdateInputBase64, + } + updateInput, err := updatePayloadInputOpts.buildRawInput() + if err != nil { + return err + } + updateOpts := client.UpdateWorkflowOptions{ + UpdateID: c.UpdateId, + WorkflowID: c.WorkflowId, + RunID: c.RunId, + UpdateName: c.UpdateName, + Args: updateInput, + WaitForStage: waitForStage, + FirstExecutionRunID: c.UpdateFirstExecutionRunId, + } + + handle, err := executeUpdateWithStartWorkflow( + cctx, + c.Parent.ClientOptions, + c.SharedWorkflowStartOptions, + c.WorkflowStartOptions, + c.PayloadInputOptions, + updateOpts, + ) + if err != nil { + return err + } + + // Currently we only accept 'accepted' as a valid wait for stage value, but we intend + // to support more in the future. + if waitForStage == client.WorkflowUpdateStageAccepted { + // Use a canceled context to check whether the initial server response + // shows that the update has _already_ failed, without issuing a second request. + ctx, cancel := context.WithCancel(cctx) + cancel() + err = handle.Get(ctx, nil) + var timeoutOrCanceledErr *client.WorkflowUpdateServiceTimeoutOrCanceledError + if err != nil && !errors.As(err, &timeoutOrCanceledErr) { + return fmt.Errorf("unable to update workflow: %w", err) + } + } + + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + UpdateName string `json:"updateName"` + UpdateID string `json:"updateId"` + }{ + WorkflowId: c.WorkflowId, + RunId: handle.RunID(), + Type: c.Type, + Namespace: c.Parent.Namespace, + UpdateName: c.UpdateName, + UpdateID: handle.UpdateID(), + }, printer.StructuredOptions{}) +} + +func (c *TemporalWorkflowExecuteUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error { + updatePayloadInputOpts := PayloadInputOptions{ + Input: c.UpdateInput, + InputFile: c.UpdateInputFile, + InputMeta: c.UpdateInputMeta, + InputBase64: c.UpdateInputBase64, + } + updateInput, err := updatePayloadInputOpts.buildRawInput() + if err != nil { + return err + } + + updateOpts := client.UpdateWorkflowOptions{ + UpdateName: c.UpdateName, + UpdateID: c.UpdateId, + WorkflowID: c.WorkflowId, + RunID: c.RunId, + Args: updateInput, + WaitForStage: client.WorkflowUpdateStageCompleted, + FirstExecutionRunID: c.UpdateFirstExecutionRunId, + } + + handle, err := executeUpdateWithStartWorkflow( + cctx, + c.Parent.ClientOptions, + c.SharedWorkflowStartOptions, + c.WorkflowStartOptions, + c.PayloadInputOptions, + updateOpts, + ) + if err != nil { + return err + } + + var valuePtr interface{} + err = handle.Get(cctx, &valuePtr) + if err != nil { + return fmt.Errorf("unable to update workflow: %w", err) + } + + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + UpdateName string `json:"updateName"` + UpdateID string `json:"updateId"` + UpdateResult interface{} `json:"updateResult"` + }{ + WorkflowId: c.WorkflowId, + RunId: handle.RunID(), + Type: c.Type, + Namespace: c.Parent.Namespace, + UpdateName: c.UpdateName, + UpdateID: c.UpdateId, + UpdateResult: valuePtr, + }, printer.StructuredOptions{}) +} + +func executeUpdateWithStartWorkflow( + cctx *CommandContext, + clientOpts ClientOptions, + sharedWfOpts SharedWorkflowStartOptions, + wfStartOpts WorkflowStartOptions, + wfInputOpts PayloadInputOptions, + updateWfOpts client.UpdateWorkflowOptions, +) (client.WorkflowUpdateHandle, error) { + if sharedWfOpts.WorkflowId == "" { + return nil, fmt.Errorf("--workflow-id flag must be provided") + } + if wfStartOpts.IdConflictPolicy.Value == "" { + return nil, fmt.Errorf("--id-conflict-policy flag must be provided") + } + cl, err := clientOpts.dialClient(cctx) + if err != nil { + return nil, err + } + defer cl.Close() + + clStartWfOpts, err := buildStartOptions(&sharedWfOpts, &wfStartOpts) + if err != nil { + return nil, err + } + + wfArgs, err := wfInputOpts.buildRawInput() + if err != nil { + return nil, err + } + + startOp := cl.NewWithStartWorkflowOperation( + clStartWfOpts, + sharedWfOpts.Type, + wfArgs..., + ) + + // Execute the update with start operation + return cl.UpdateWithStartWorkflow(cctx, client.UpdateWithStartWorkflowOptions{ + StartWorkflowOperation: startOp, + UpdateOptions: updateWfOpts, + }) +} + type workflowJSONResult struct { WorkflowId string `json:"workflowId"` RunId string `json:"runId"` @@ -386,6 +562,8 @@ func buildStartOptions(sw *SharedWorkflowStartOptions, w *WorkflowStartOptions) CronSchedule: w.Cron, WorkflowExecutionErrorWhenAlreadyStarted: w.FailExisting, StartDelay: w.StartDelay.Duration(), + StaticSummary: sw.StaticSummary, + StaticDetails: sw.StaticDetails, } if w.IdReusePolicy.Value != "" { var err error diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index e706d9465..9d88a1100 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -675,8 +675,40 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { s.ErrorContains(res.Err, "--workflow-id flag must be provided") } -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartNewWorkflow() { + s.testSignalWithStartHelper(false) +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_SendSignalToExistingWorkflow() { + s.testSignalWithStartHelper(true) +} + +func (s *SharedServerSuite) testSignalWithStartHelper(useExistingWorkflow bool) { wfId := uuid.NewString() + signalWfInput := `"workflow-input"` + signalInput := `"signal-input"` + expectedWfOutput := map[string]string{ + "workflow": "workflow-input", + "signal": "signal-input", + } + + if useExistingWorkflow { + run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") + s.NoError(err) + // Re-assign wfId for the signal to be sent to an existing workflow. + wfId = run.GetID() + expectedWfOutput["workflow"] = "not-signal-with-start-input" + } + + // Run workflow, block on signal. + s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + wfState := make(map[string]string) + wfState["workflow"] = wfInput.(string) + var sigReceived string + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + wfState["signal"] = sigReceived + return wfState, nil + }) // Send signal-with-start command. res := s.Execute( @@ -684,84 +716,250 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { "--address", s.Address(), "--workflow-id", wfId, "--type", "DevWorkflow", - "--input", `{"wf-signal-with-start": "workflow-input"}`, - "--task-queue", "tq", + "--input", signalWfInput, + "--task-queue", s.Worker().Options.TaskQueue, "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, + "--signal-input", signalInput, ) - s.NoError(res.Err) - // Confirm text output has key/vals as expected out := res.Stdout.String() s.ContainsOnSameLine(out, "WorkflowId", wfId) s.Contains(out, "RunId") - s.ContainsOnSameLine(out, "TaskQueue", "tq") s.ContainsOnSameLine(out, "Type", "DevWorkflow") s.ContainsOnSameLine(out, "Namespace", "default") - // Check that new workflow was started with expected workflow ID. - run := s.Client.GetWorkflow(s.Context, wfId, "") - s.Equal(wfId, run.GetID()) - - // Run workflow, block on signal. - var sigReceived any - s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil - }) + // Check that a new workflow was started with expected workflow ID. + if !useExistingWorkflow { + run := s.Client.GetWorkflow(s.Context, wfId, "") + s.Equal(wfId, run.GetID()) + } // Wait for workflow to complete. - var wfReturn any + wfReturn := make(map[string]string) err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) s.NoError(err) - // Expect workflow to have received signal and given inputs from signal-with-start. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) - s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) + // Compare the extracted values with what the workflow returned + s.Equal(expectedWfOutput["signal"], wfReturn["signal"]) + s.Equal(expectedWfOutput["workflow"], wfReturn["workflow"]) } -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { - // Run workflow, block on signal. - var sigReceived any +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_RuntimeOptionChecks() { + res := s.Execute( + "workflow", "start-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--update-wait-for-stage", "accepted", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") + res = s.Execute( + "workflow", "start-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--update-wait-for-stage", "accepted", + "--workflow-id", "wfId", + ) + s.ErrorContains(res.Err, "--id-conflict-policy flag must be provided") +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_RuntimeOptionChecks() { + res := s.Execute( + "workflow", "execute-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") + res = s.Execute( + "workflow", "execute-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--workflow-id", "wfId", + ) + s.ErrorContains(res.Err, "--id-conflict-policy flag must be provided") +} + +type updateWithStartTest struct { + updateWithStartSetup + useStart bool + idConflictPolicy string + expectedError string + expectedUpdateResult string + expectedWfOutput map[string]string +} + +type updateWithStartSetup struct { + wfId string + updateName string + updateId string + useExistingWorkflow bool +} + +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_StartsNewWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(false) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "Fail", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_SendUpdateToExistingWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(true) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "Fail", + expectedError: "Workflow execution is already running", + }) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "UseExisting", + expectedWfOutput: map[string]string{"workflow": "not-update-with-start-workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_StartsWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(false) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "Fail", + expectedUpdateResult: "update-input", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_SendUpdateToExistingWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(true) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "Fail", + expectedError: "Workflow execution is already running", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "UseExisting", + expectedWfOutput: map[string]string{"workflow": "not-update-with-start-workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) updateWithStartTestSetup(useExistingWorkflow bool) updateWithStartSetup { + wfId := uuid.NewString() + updateName := "test-update-name" + updateId := uuid.NewString() + if useExistingWorkflow { + // Start a workflow with a specific workflow ID. + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + }, + DevWorkflow, + "not-update-with-start-workflow-input", + ) + s.NoError(err) + // Re-assign wfId for the update to be sent to an existing workflow. + wfId = run.GetID() + } + + // Run workflow. s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil + wfState := make(map[string]string) + wfState["workflow"] = wfInput.(string) + + err := workflow.SetUpdateHandlerWithOptions( + ctx, + updateName, + func(ctx workflow.Context, updateInput string) (string, error) { + wfState["update"] = updateInput + return updateInput, nil + }, + workflow.UpdateHandlerOptions{}, + ) + if err != nil { + return nil, err + } + // Block workflow completion on signal. + workflow.GetSignalChannel(ctx, "complete").Receive(ctx, nil) + return wfState, nil }) + return updateWithStartSetup{wfId, updateName, updateId, useExistingWorkflow} +} - // Start workflow - run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") - s.NoError(err) +func (s *SharedServerSuite) testStartUpdateWithStartHelper(opts updateWithStartTest) { + cmdName := "execute-update-with-start" + additionalArgs := []string{} - wfId := run.GetID() + if opts.useStart { + cmdName = "start-update-with-start" + additionalArgs = []string{"--update-wait-for-stage", "accepted"} + } - // Send signal-with-start command. - res := s.Execute( - "workflow", "signal-with-start", + baseArgs := []string{ + "workflow", cmdName, "--address", s.Address(), - "--workflow-id", wfId, + "--workflow-id", opts.wfId, "--type", "DevWorkflow", - "--input", `{"workflow": "workflow-input"}`, + "--input", `"workflow-input"`, "--task-queue", s.Worker().Options.TaskQueue, - "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, - ) + "--id-conflict-policy", opts.idConflictPolicy, + "--update-name", opts.updateName, + "--update-id", opts.updateId, + "--update-input", `"update-input"`, + } + + // Send start-update-with-start command. + args := append(baseArgs, additionalArgs...) + res := s.Execute(args...) + + // Check expected error. + if opts.expectedError != "" { + s.ErrorContains(res.Err, opts.expectedError) + return + } + s.NoError(res.Err) // Confirm text output has key/vals as expected out := res.Stdout.String() - s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.ContainsOnSameLine(out, "WorkflowId", opts.wfId) s.Contains(out, "RunId") - s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue) s.ContainsOnSameLine(out, "Type", "DevWorkflow") s.ContainsOnSameLine(out, "Namespace", "default") + s.ContainsOnSameLine(out, "UpdateName", opts.updateName) + s.ContainsOnSameLine(out, "UpdateID", opts.updateId) + + // Check expected update result. + if opts.expectedUpdateResult != "" { + s.ContainsOnSameLine(out, "UpdateResult", opts.expectedUpdateResult) + } + + // Check that new workflow was started with expected workflow ID. + if !opts.useExistingWorkflow { + run := s.Client.GetWorkflow(s.Context, opts.wfId, "") + s.Equal(opts.wfId, run.GetID()) + } + + // Send signal to complete workflow. + err := s.Client.SignalWorkflow(s.Context, opts.wfId, "", "complete", nil) + s.NoError(err) // Wait for workflow to complete. - var ret any - s.NoError(run.Get(s.Context, &ret)) + wfReturn := make(map[string]string) + err = s.Client.GetWorkflow(s.Context, opts.wfId, "").Get(s.Context, &wfReturn) + s.NoError(err) - // Expect workflow to have not been started by the signal-with-start command. - s.Equal("not-signal-with-start-input", ret) - // Expect signal to have been received with given input. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + // Expect workflow to have received update and given inputs from start-update-with-start. + s.Equal(opts.expectedWfOutput["workflow"], wfReturn["workflow"]) + s.Equal(opts.expectedWfOutput["update"], wfReturn["update"]) } diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 8e479c82f..02fddb10f 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -528,7 +528,9 @@ func (s *SharedServerSuite) TestWorkflow_Batch_Update_Options_Versioning_Overrid var jsonResp workflowservice.DescribeWorkflowExecutionResponse assert.NoError(t, temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) - versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + + versioningInfo := jsonResp.GetWorkflowExecutionInfo().GetVersioningInfo() + assert.NotNil(t, versioningInfo) assert.NotNil(t, versioningInfo.VersioningOverride) assert.Equal(t, version2, versioningInfo.VersioningOverride.PinnedVersion) } diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index a1b969322..94e721dec 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -134,6 +134,38 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin HistorySize: info.HistorySizeBytes, }, printer.StructuredOptions{}) + extendedInfo := resp.WorkflowExtendedInfo + if extendedInfo != nil { + cctx.Printer.Println(color.MagentaString("Extended Execution Info:")) + _ = cctx.Printer.PrintStructured(struct { + CancelRequested bool + ExecutionExpirationTime time.Time `cli:",cardOmitEmpty"` + RunExpirationTime time.Time `cli:",cardOmitEmpty"` + LastResetTime time.Time `cli:",cardOmitEmpty"` + OriginalStartTime time.Time `cli:",cardOmitEmpty"` + }{ + CancelRequested: extendedInfo.CancelRequested, + ExecutionExpirationTime: timestampToTime(extendedInfo.ExecutionExpirationTime), + RunExpirationTime: timestampToTime(extendedInfo.RunExpirationTime), + LastResetTime: timestampToTime(extendedInfo.LastResetTime), + OriginalStartTime: timestampToTime(extendedInfo.OriginalStartTime), + }, printer.StructuredOptions{}) + } + + staticSummary := resp.GetExecutionConfig().GetUserMetadata().GetSummary() + staticDetails := resp.GetExecutionConfig().GetUserMetadata().GetDetails() + if len(staticSummary.GetData()) > 0 || len(staticDetails.GetData()) > 0 { + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Metadata:")) + _ = cctx.Printer.PrintStructured(struct { + StaticSummary *common.Payload + StaticDetails *common.Payload + }{ + StaticSummary: staticSummary, + StaticDetails: staticDetails, + }, printer.StructuredOptions{}) + } + if info.VersioningInfo != nil { cctx.Printer.Println() cctx.Printer.Println(color.MagentaString("Versioning Info:")) diff --git a/temporalcli/commands.workflow_view_test.go b/temporalcli/commands.workflow_view_test.go index fe169bbd4..31bfc96af 100644 --- a/temporalcli/commands.workflow_view_test.go +++ b/temporalcli/commands.workflow_view_test.go @@ -950,3 +950,46 @@ func (s *SharedServerSuite) Test_WorkflowResult() { s.Contains(output, `"message": "failed on purpose"`) s.Contains(output, "workflowExecutionFailedEventAttributes") } + +func (s *SharedServerSuite) TestWorkflow_Describe_WorkflowMetadata() { + workflowId := uuid.NewString() + + s.Worker().OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) { + return map[string]string{"foo": "bar"}, nil + }) + + res := s.Execute( + "workflow", "start", + "--address", s.Address(), + "--task-queue", s.Worker().Options.TaskQueue, + "--type", "DevWorkflow", + "--workflow-id", workflowId, + "--static-summary", "summie", + "--static-details", "deets", + ) + s.NoError(res.Err) + + // Text + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", workflowId, + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "StaticSummary", "summie") + s.ContainsOnSameLine(out, "StaticDetails", "deets") + + // JSON + res = s.Execute( + "workflow", "describe", + "-o", "json", + "--address", s.Address(), + "-w", workflowId, + ) + s.NoError(res.Err) + var jsonOut workflowservice.DescribeWorkflowExecutionResponse + s.NoError(temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonOut, true)) + s.NotNil(jsonOut.ExecutionConfig.UserMetadata.Summary) + s.NotNil(jsonOut.ExecutionConfig.UserMetadata.Details) +} diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 85ec87d2e..b0e6597a8 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3673,6 +3673,7 @@ commands: --workflow-id YourWorkflowId \ --name YourUpdate \ --input '{"some-key": "some-value"}' + --wait-for-stage accepted ``` option-sets: - update-starting @@ -3688,6 +3689,163 @@ commands: - accepted required: true + - name: temporal workflow start-update-with-start + summary: Send an Update and wait for it to be accepted or rejected (Experimental) + description: | + Send a message to a Workflow Execution to invoke an Update handler, and wait for + the update to be accepted or rejected. If the Workflow Execution is not running, + then a new workflow execution is started and the update is sent. + + Experimental. + + ``` + temporal workflow start-update-with-start \ + --update-name YourUpdate \ + --update-input '{"update-key": "update-value"}' \ + --update-wait-for-stage accepted \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --id-conflict-policy Fail \ + --input '{"wf-key": "wf-value"}' + ``` + option-sets: + # workflow-id and id-conflict-policy are "required" (runtime checks) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: update-name + type: string + description: Update name. + required: true + aliases: + - update-type + - name: update-first-execution-run-id + type: string + description: | + Parent Run ID. + The update is sent to the last Workflow Execution in the chain started + with this Run ID. + - name: update-wait-for-stage + type: string-enum + description: | + Update stage to wait for. + The only option is `accepted`, but this option is required. This is to allow + a future version of the CLI to choose a default value. + enum-values: + - accepted + required: true + - name: update-id + type: string + description: | + Update ID. + If unset, defaults to a UUID. + - name: run-id + type: string + short: r + description: | + Run ID. + If unset, looks for an Update against the currently-running Workflow Execution. + - name: update-input + type: string[] + description: | + Update input value. + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input-file. + Can be passed multiple times to pass multiple arguments. + - name: update-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input. + Can be passed multiple times to pass multiple arguments. + - name: update-input-meta + type: string[] + description: | + Input update payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: update-input-base64 + type: bool + description: | + Assume update inputs are base64-encoded and attempt to decode them. + + - name: temporal workflow execute-update-with-start + summary: Send an Update and wait for it to complete (Experimental) + description: | + Send a message to a Workflow Execution to invoke an Update handler, and wait for + the update to complete. If the Workflow Execution is not running, then a new workflow + execution is started and the update is sent. + + Experimental. + + ``` + temporal workflow execute-update-with-start \ + --update-name YourUpdate \ + --update-input '{"update-key": "update-value"}' \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --id-conflict-policy Fail \ + --input '{"wf-key": "wf-value"}' + ``` + + option-sets: + # workflow-id and id-conflict-policy are "required" (runtime checks) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: update-name + type: string + description: Update name. + required: true + aliases: + - update-type + - name: update-first-execution-run-id + type: string + description: | + Parent Run ID. + The update is sent to the last Workflow Execution in the chain started + with this Run ID. + - name: update-id + type: string + description: | + Update ID. + If unset, defaults to a UUID. + - name: run-id + type: string + short: r + description: | + Run ID. + If unset, looks for an Update against the currently-running Workflow Execution. + - name: update-input + type: string[] + description: | + Update input value. + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input-file. + Can be passed multiple times to pass multiple arguments. + - name: update-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input. + Can be passed multiple times to pass multiple arguments. + - name: update-input-meta + type: string[] + description: | + Input update payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: update-input-base64 + type: bool + description: | + Assume update inputs are base64-encoded and attempt to decode them. + option-sets: - name: client options: @@ -3991,6 +4149,18 @@ option-sets: description: | Memo using 'KEY="VALUE"' pairs. Use JSON values. + - name: static-summary + type: string + experimental: true + description: | + Static Workflow summary for human consumption in UIs. + Uses Temporal Markdown formatting, should be a single line. + - name: static-details + type: string + experimental: true + description: | + Static Workflow details for human consumption in UIs. + Uses Temporal Markdown formatting, may be multiple lines. - name: workflow-start options: