From 728274ac4b1cd7e7031c87e068eed0e78d608dac Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 27 Feb 2025 10:08:07 -0800 Subject: [PATCH 1/7] User Metadata set on start / desc (#742) --- .gitignore | 2 +- temporalcli/commands.gen.go | 4 ++ temporalcli/commands.schedule.go | 2 + temporalcli/commands.schedule_test.go | 18 +++++++++ temporalcli/commands.workflow_exec.go | 2 + temporalcli/commands.workflow_view.go | 14 +++++++ temporalcli/commands.workflow_view_test.go | 43 ++++++++++++++++++++++ temporalcli/commandsgen/commands.yml | 12 ++++++ 8 files changed, 96 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index a8a5794dc..14713bfa8 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,5 @@ # Used by IDE /.idea /.vscode +/.zed *~ - diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 04c626589..ccab78306 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -190,6 +190,8 @@ type SharedWorkflowStartOptions struct { TaskTimeout Duration SearchAttribute []string Memo []string + StaticSummary string + StaticDetails string } func (v *SharedWorkflowStartOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { @@ -206,6 +208,8 @@ func (v *SharedWorkflowStartOptions) buildFlags(cctx *CommandContext, f *pflag.F f.Var(&v.TaskTimeout, "task-timeout", "Fail a Workflow Task if it lasts longer than `DURATION`. This is the Start-to-close timeout for a Workflow Task.") f.StringArrayVar(&v.SearchAttribute, "search-attribute", nil, "Search Attribute in `KEY=VALUE` format. Keys must be identifiers, and values must be JSON values. For example: 'YourKey={\"your\": \"value\"}'. Can be passed multiple times.") f.StringArrayVar(&v.Memo, "memo", nil, "Memo using 'KEY=\"VALUE\"' pairs. Use JSON values.") + f.StringVar(&v.StaticSummary, "static-summary", "", "Static Workflow summary for human consumption in UIs. Uses Temporal Markdown formatting, should be a single line.") + f.StringVar(&v.StaticDetails, "static-details", "", "Static Workflow details for human consumption in UIs. Uses Temporal Markdown formatting, may be multiple lines.") } type WorkflowStartOptions struct { diff --git a/temporalcli/commands.schedule.go b/temporalcli/commands.schedule.go index 3d645cb2e..7cf300e88 100644 --- a/temporalcli/commands.schedule.go +++ b/temporalcli/commands.schedule.go @@ -270,6 +270,8 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c // RetryPolicy not supported yet UntypedSearchAttributes: untypedSearchAttributes, Memo: opts.Memo, + StaticSummary: opts.StaticSummary, + StaticDetails: opts.StaticDetails, } if action.Args, err = i.buildRawInput(); err != nil { return nil, err diff --git a/temporalcli/commands.schedule_test.go b/temporalcli/commands.schedule_test.go index ec2167438..d24dc02c2 100644 --- a/temporalcli/commands.schedule_test.go +++ b/temporalcli/commands.schedule_test.go @@ -176,6 +176,24 @@ func (s *SharedServerSuite) TestSchedule_CreateDescribe_SearchAttributes_Memo() s.ContainsOnSameLine(out, "Action", "wfMemo", b64(`"other data"`)) } +func (s *SharedServerSuite) TestSchedule_CreateDescribe_UserMetadata() { + schedId, _, res := s.createSchedule("--interval", "10d", + "--static-summary", "summ", + "--static-details", "details", + ) + s.NoError(res.Err) + + res = s.Execute( + "schedule", "describe", + "--address", s.Address(), + "-s", schedId, + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Action", "Summary", "summ") + s.ContainsOnSameLine(out, "Action", "Details", "details") +} + func (s *SharedServerSuite) TestSchedule_List() { res := s.Execute( "operator", "search-attribute", "create", diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index 08393a7d9..cb5cf77f6 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -386,6 +386,8 @@ func buildStartOptions(sw *SharedWorkflowStartOptions, w *WorkflowStartOptions) CronSchedule: w.Cron, WorkflowExecutionErrorWhenAlreadyStarted: w.FailExisting, StartDelay: w.StartDelay.Duration(), + StaticSummary: sw.StaticSummary, + StaticDetails: sw.StaticDetails, } if w.IdReusePolicy.Value != "" { var err error diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index a1b969322..70c25ddaa 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -134,6 +134,20 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin HistorySize: info.HistorySizeBytes, }, printer.StructuredOptions{}) + staticSummary := resp.GetExecutionConfig().GetUserMetadata().GetSummary() + staticDetails := resp.GetExecutionConfig().GetUserMetadata().GetDetails() + if len(staticSummary.GetData()) > 0 || len(staticDetails.GetData()) > 0 { + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Metadata:")) + _ = cctx.Printer.PrintStructured(struct { + StaticSummary *common.Payload + StaticDetails *common.Payload + }{ + StaticSummary: staticSummary, + StaticDetails: staticDetails, + }, printer.StructuredOptions{}) + } + if info.VersioningInfo != nil { cctx.Printer.Println() cctx.Printer.Println(color.MagentaString("Versioning Info:")) diff --git a/temporalcli/commands.workflow_view_test.go b/temporalcli/commands.workflow_view_test.go index fe169bbd4..31bfc96af 100644 --- a/temporalcli/commands.workflow_view_test.go +++ b/temporalcli/commands.workflow_view_test.go @@ -950,3 +950,46 @@ func (s *SharedServerSuite) Test_WorkflowResult() { s.Contains(output, `"message": "failed on purpose"`) s.Contains(output, "workflowExecutionFailedEventAttributes") } + +func (s *SharedServerSuite) TestWorkflow_Describe_WorkflowMetadata() { + workflowId := uuid.NewString() + + s.Worker().OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) { + return map[string]string{"foo": "bar"}, nil + }) + + res := s.Execute( + "workflow", "start", + "--address", s.Address(), + "--task-queue", s.Worker().Options.TaskQueue, + "--type", "DevWorkflow", + "--workflow-id", workflowId, + "--static-summary", "summie", + "--static-details", "deets", + ) + s.NoError(res.Err) + + // Text + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", workflowId, + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "StaticSummary", "summie") + s.ContainsOnSameLine(out, "StaticDetails", "deets") + + // JSON + res = s.Execute( + "workflow", "describe", + "-o", "json", + "--address", s.Address(), + "-w", workflowId, + ) + s.NoError(res.Err) + var jsonOut workflowservice.DescribeWorkflowExecutionResponse + s.NoError(temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonOut, true)) + s.NotNil(jsonOut.ExecutionConfig.UserMetadata.Summary) + s.NotNil(jsonOut.ExecutionConfig.UserMetadata.Details) +} diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 85ec87d2e..ea676a117 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3991,6 +3991,18 @@ option-sets: description: | Memo using 'KEY="VALUE"' pairs. Use JSON values. + - name: static-summary + type: string + experimental: true + description: | + Static Workflow summary for human consumption in UIs. + Uses Temporal Markdown formatting, should be a single line. + - name: static-details + type: string + experimental: true + description: | + Static Workflow details for human consumption in UIs. + Uses Temporal Markdown formatting, may be multiple lines. - name: workflow-start options: From cbf2b52eb78a011d843c1c0005b5d24257938bcc Mon Sep 17 00:00:00 2001 From: Antonio Lain <135073478+antlai-temporal@users.noreply.github.com> Date: Thu, 27 Feb 2025 12:31:19 -0800 Subject: [PATCH 2/7] Fix batch deployment test nil pointer exception (#769) --- temporalcli/commands.workflow_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 8e479c82f..02fddb10f 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -528,7 +528,9 @@ func (s *SharedServerSuite) TestWorkflow_Batch_Update_Options_Versioning_Overrid var jsonResp workflowservice.DescribeWorkflowExecutionResponse assert.NoError(t, temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) - versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + + versioningInfo := jsonResp.GetWorkflowExecutionInfo().GetVersioningInfo() + assert.NotNil(t, versioningInfo) assert.NotNil(t, versioningInfo.VersioningOverride) assert.Equal(t, version2, versioningInfo.VersioningOverride.PinnedVersion) } From 97cc6c27149b812d92f43abef2c2250ca79c53e0 Mon Sep 17 00:00:00 2001 From: Yuri Date: Thu, 27 Feb 2025 12:49:39 -0800 Subject: [PATCH 3/7] Add fields from extended info to DescribeWorkflow output (#771) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What was changed Add extended info to the DescribeWorkflow output. ## Why? Customer request. --- temporalcli/commands.workflow_view.go | 67 ++++++++++++++++----------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index 70c25ddaa..75f1db34e 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -102,36 +102,47 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin cctx.Printer.Println(color.MagentaString("Execution Info:")) info := resp.WorkflowExecutionInfo + extendedInfo := resp.WorkflowExtendedInfo _ = cctx.Printer.PrintStructured(struct { - WorkflowId string - RunId string - Type string - Namespace string - TaskQueue string - AssignedBuildId string - StartTime time.Time - CloseTime time.Time `cli:",cardOmitEmpty"` - ExecutionTime time.Time `cli:",cardOmitEmpty"` - Memo map[string]*common.Payload `cli:",cardOmitEmpty"` - SearchAttributes map[string]*common.Payload `cli:",cardOmitEmpty"` - StateTransitionCount int64 - HistoryLength int64 - HistorySize int64 + WorkflowId string + RunId string + Type string + Namespace string + TaskQueue string + AssignedBuildId string + StartTime time.Time + CloseTime time.Time `cli:",cardOmitEmpty"` + ExecutionTime time.Time `cli:",cardOmitEmpty"` + Memo map[string]*common.Payload `cli:",cardOmitEmpty"` + SearchAttributes map[string]*common.Payload `cli:",cardOmitEmpty"` + StateTransitionCount int64 + HistoryLength int64 + HistorySize int64 + ExecutionExpirationTime time.Time `cli:",cardOmitEmpty"` + RunExpirationTime time.Time `cli:",cardOmitEmpty"` + CancelRequested bool + LastResetTime time.Time `cli:",cardOmitEmpty"` + OriginalStartTime time.Time `cli:",cardOmitEmpty"` }{ - WorkflowId: info.Execution.WorkflowId, - RunId: info.Execution.RunId, - Type: info.Type.GetName(), - Namespace: c.Parent.Namespace, - TaskQueue: info.TaskQueue, - AssignedBuildId: info.GetAssignedBuildId(), - StartTime: timestampToTime(info.StartTime), - CloseTime: timestampToTime(info.CloseTime), - ExecutionTime: timestampToTime(info.ExecutionTime), - Memo: info.Memo.GetFields(), - SearchAttributes: info.SearchAttributes.GetIndexedFields(), - StateTransitionCount: info.StateTransitionCount, - HistoryLength: info.HistoryLength, - HistorySize: info.HistorySizeBytes, + WorkflowId: info.Execution.WorkflowId, + RunId: info.Execution.RunId, + Type: info.Type.GetName(), + Namespace: c.Parent.Namespace, + TaskQueue: info.TaskQueue, + AssignedBuildId: info.GetAssignedBuildId(), + StartTime: timestampToTime(info.StartTime), + CloseTime: timestampToTime(info.CloseTime), + ExecutionTime: timestampToTime(info.ExecutionTime), + Memo: info.Memo.GetFields(), + SearchAttributes: info.SearchAttributes.GetIndexedFields(), + StateTransitionCount: info.StateTransitionCount, + HistoryLength: info.HistoryLength, + HistorySize: info.HistorySizeBytes, + ExecutionExpirationTime: timestampToTime(extendedInfo.ExecutionExpirationTime), + RunExpirationTime: timestampToTime(extendedInfo.RunExpirationTime), + CancelRequested: extendedInfo.CancelRequested, + LastResetTime: timestampToTime(extendedInfo.LastResetTime), + OriginalStartTime: timestampToTime(extendedInfo.OriginalStartTime), }, printer.StructuredOptions{}) staticSummary := resp.GetExecutionConfig().GetUserMetadata().GetSummary() From 94883d3b86f70b77f9d2e22c6a8015603fd88d05 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 28 Feb 2025 09:32:02 -0800 Subject: [PATCH 4/7] added `workflow start-update-with-start` and `workflow execute-update-with-start` commands (#762) added `temporal workflow start-update-with-start` and `temporal workflow execute-update-with-start` commands `temporal workflow start-update-with-start` usage: ``` temporal workflow start-update-with-start \ --update-name YourUpdate \ --update-input '{"update-key": "update-value"}' \ --update-wait-for-stage accepted \ --workflow-id YourWorkflowId \ --type YourWorkflowType \ --task-queue YourTaskQueue \ --id-conflict-policy Fail \ --input '{"wf-key": "wf-value"}' ``` `temporal workflow execute-update-with-start` usage: ``` temporal workflow execute-update-with-start \ --update-name YourUpdate \ --update-input '{"update-key": "update-value"}' \ --workflow-id YourWorkflowId \ --type YourWorkflowType \ --task-queue YourTaskQueue \ --id-conflict-policy Fail \ --input '{"wf-key": "wf-value"}' ``` 1. Closes #664 2. How was this tested: 3. Any docs updates needed? Yes --- temporalcli/commands.gen.go | 114 +++++++- temporalcli/commands.workflow_exec.go | 182 ++++++++++++- temporalcli/commands.workflow_exec_test.go | 290 +++++++++++++++++---- temporalcli/commandsgen/commands.yml | 158 +++++++++++ 4 files changed, 693 insertions(+), 51 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index ccab78306..45a4069ca 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -2859,6 +2859,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowDeleteCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowDescribeCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowExecuteCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowFixHistoryJsonCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowListCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowMetadataCommand(cctx, &s).Command) @@ -2870,6 +2871,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowStartUpdateWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command) @@ -3026,6 +3028,58 @@ func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWor return &s } +type TemporalWorkflowExecuteUpdateWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + UpdateName string + UpdateFirstExecutionRunId string + UpdateId string + RunId string + UpdateInput []string + UpdateInputFile []string + UpdateInputMeta []string + UpdateInputBase64 bool +} + +func NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowExecuteUpdateWithStartCommand { + var s TemporalWorkflowExecuteUpdateWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "execute-update-with-start [flags]" + s.Command.Short = "Send an Update and wait for it to complete (Experimental)" + if hasHighlighting { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name") + s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.") + s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.") + s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "update-type": "update-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowFixHistoryJsonCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command @@ -3407,6 +3461,62 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf return &s } +type TemporalWorkflowStartUpdateWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + UpdateName string + UpdateFirstExecutionRunId string + UpdateWaitForStage StringEnum + UpdateId string + RunId string + UpdateInput []string + UpdateInputFile []string + UpdateInputMeta []string + UpdateInputBase64 bool +} + +func NewTemporalWorkflowStartUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowStartUpdateWithStartCommand { + var s TemporalWorkflowStartUpdateWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "start-update-with-start [flags]" + s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)" + if hasHighlighting { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name") + s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.") + s.UpdateWaitForStage = NewStringEnum([]string{"accepted"}, "") + s.Command.Flags().Var(&s.UpdateWaitForStage, "update-wait-for-stage", "Update stage to wait for. The only option is `accepted`, but this option is required. This is to allow a future version of the CLI to choose a default value. Accepted values: accepted. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-wait-for-stage") + s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.") + s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "update-type": "update-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowTerminateCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command @@ -3599,9 +3709,9 @@ func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *Tempora s.Command.Use = "start [flags]" s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)" if hasHighlighting { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\x1b[0m" } else { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\n```" } s.Command.Args = cobra.NoArgs s.WaitForStage = NewStringEnum([]string{"accepted"}, "") diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index cb5cf77f6..a88d89ba7 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "os" "reflect" @@ -121,7 +122,7 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s signalPayloadInputOpts := PayloadInputOptions{ Input: c.SignalInput, InputFile: c.SignalInputFile, - InputMeta: c.InputMeta, + InputMeta: c.SignalInputMeta, InputBase64: c.SignalInputBase64, } signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() @@ -194,16 +195,191 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s RunId string `json:"runId"` Type string `json:"type"` Namespace string `json:"namespace"` - TaskQueue string `json:"taskQueue"` }{ WorkflowId: c.WorkflowId, RunId: resp.RunId, Type: c.Type, Namespace: c.Parent.Namespace, - TaskQueue: c.TaskQueue, }, printer.StructuredOptions{}) } +func (c *TemporalWorkflowStartUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error { + waitForStage := client.WorkflowUpdateStageUnspecified + switch c.UpdateWaitForStage.Value { + case "accepted": + waitForStage = client.WorkflowUpdateStageAccepted + } + if waitForStage != client.WorkflowUpdateStageAccepted { + return fmt.Errorf("invalid wait for stage: %v, valid values are: 'accepted'", c.UpdateWaitForStage) + } + + updatePayloadInputOpts := PayloadInputOptions{ + Input: c.UpdateInput, + InputFile: c.UpdateInputFile, + InputMeta: c.UpdateInputMeta, + InputBase64: c.UpdateInputBase64, + } + updateInput, err := updatePayloadInputOpts.buildRawInput() + if err != nil { + return err + } + updateOpts := client.UpdateWorkflowOptions{ + UpdateID: c.UpdateId, + WorkflowID: c.WorkflowId, + RunID: c.RunId, + UpdateName: c.UpdateName, + Args: updateInput, + WaitForStage: waitForStage, + FirstExecutionRunID: c.UpdateFirstExecutionRunId, + } + + handle, err := executeUpdateWithStartWorkflow( + cctx, + c.Parent.ClientOptions, + c.SharedWorkflowStartOptions, + c.WorkflowStartOptions, + c.PayloadInputOptions, + updateOpts, + ) + if err != nil { + return err + } + + // Currently we only accept 'accepted' as a valid wait for stage value, but we intend + // to support more in the future. + if waitForStage == client.WorkflowUpdateStageAccepted { + // Use a canceled context to check whether the initial server response + // shows that the update has _already_ failed, without issuing a second request. + ctx, cancel := context.WithCancel(cctx) + cancel() + err = handle.Get(ctx, nil) + var timeoutOrCanceledErr *client.WorkflowUpdateServiceTimeoutOrCanceledError + if err != nil && !errors.As(err, &timeoutOrCanceledErr) { + return fmt.Errorf("unable to update workflow: %w", err) + } + } + + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + UpdateName string `json:"updateName"` + UpdateID string `json:"updateId"` + }{ + WorkflowId: c.WorkflowId, + RunId: handle.RunID(), + Type: c.Type, + Namespace: c.Parent.Namespace, + UpdateName: c.UpdateName, + UpdateID: handle.UpdateID(), + }, printer.StructuredOptions{}) +} + +func (c *TemporalWorkflowExecuteUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error { + updatePayloadInputOpts := PayloadInputOptions{ + Input: c.UpdateInput, + InputFile: c.UpdateInputFile, + InputMeta: c.UpdateInputMeta, + InputBase64: c.UpdateInputBase64, + } + updateInput, err := updatePayloadInputOpts.buildRawInput() + if err != nil { + return err + } + + updateOpts := client.UpdateWorkflowOptions{ + UpdateName: c.UpdateName, + UpdateID: c.UpdateId, + WorkflowID: c.WorkflowId, + RunID: c.RunId, + Args: updateInput, + WaitForStage: client.WorkflowUpdateStageCompleted, + FirstExecutionRunID: c.UpdateFirstExecutionRunId, + } + + handle, err := executeUpdateWithStartWorkflow( + cctx, + c.Parent.ClientOptions, + c.SharedWorkflowStartOptions, + c.WorkflowStartOptions, + c.PayloadInputOptions, + updateOpts, + ) + if err != nil { + return err + } + + var valuePtr interface{} + err = handle.Get(cctx, &valuePtr) + if err != nil { + return fmt.Errorf("unable to update workflow: %w", err) + } + + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + UpdateName string `json:"updateName"` + UpdateID string `json:"updateId"` + UpdateResult interface{} `json:"updateResult"` + }{ + WorkflowId: c.WorkflowId, + RunId: handle.RunID(), + Type: c.Type, + Namespace: c.Parent.Namespace, + UpdateName: c.UpdateName, + UpdateID: c.UpdateId, + UpdateResult: valuePtr, + }, printer.StructuredOptions{}) +} + +func executeUpdateWithStartWorkflow( + cctx *CommandContext, + clientOpts ClientOptions, + sharedWfOpts SharedWorkflowStartOptions, + wfStartOpts WorkflowStartOptions, + wfInputOpts PayloadInputOptions, + updateWfOpts client.UpdateWorkflowOptions, +) (client.WorkflowUpdateHandle, error) { + if sharedWfOpts.WorkflowId == "" { + return nil, fmt.Errorf("--workflow-id flag must be provided") + } + if wfStartOpts.IdConflictPolicy.Value == "" { + return nil, fmt.Errorf("--id-conflict-policy flag must be provided") + } + cl, err := clientOpts.dialClient(cctx) + if err != nil { + return nil, err + } + defer cl.Close() + + clStartWfOpts, err := buildStartOptions(&sharedWfOpts, &wfStartOpts) + if err != nil { + return nil, err + } + + wfArgs, err := wfInputOpts.buildRawInput() + if err != nil { + return nil, err + } + + startOp := cl.NewWithStartWorkflowOperation( + clStartWfOpts, + sharedWfOpts.Type, + wfArgs..., + ) + + // Execute the update with start operation + return cl.UpdateWithStartWorkflow(cctx, client.UpdateWithStartWorkflowOptions{ + StartWorkflowOperation: startOp, + UpdateOptions: updateWfOpts, + }) +} + type workflowJSONResult struct { WorkflowId string `json:"workflowId"` RunId string `json:"runId"` diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index e706d9465..9d88a1100 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -675,8 +675,40 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { s.ErrorContains(res.Err, "--workflow-id flag must be provided") } -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartNewWorkflow() { + s.testSignalWithStartHelper(false) +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_SendSignalToExistingWorkflow() { + s.testSignalWithStartHelper(true) +} + +func (s *SharedServerSuite) testSignalWithStartHelper(useExistingWorkflow bool) { wfId := uuid.NewString() + signalWfInput := `"workflow-input"` + signalInput := `"signal-input"` + expectedWfOutput := map[string]string{ + "workflow": "workflow-input", + "signal": "signal-input", + } + + if useExistingWorkflow { + run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") + s.NoError(err) + // Re-assign wfId for the signal to be sent to an existing workflow. + wfId = run.GetID() + expectedWfOutput["workflow"] = "not-signal-with-start-input" + } + + // Run workflow, block on signal. + s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + wfState := make(map[string]string) + wfState["workflow"] = wfInput.(string) + var sigReceived string + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + wfState["signal"] = sigReceived + return wfState, nil + }) // Send signal-with-start command. res := s.Execute( @@ -684,84 +716,250 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { "--address", s.Address(), "--workflow-id", wfId, "--type", "DevWorkflow", - "--input", `{"wf-signal-with-start": "workflow-input"}`, - "--task-queue", "tq", + "--input", signalWfInput, + "--task-queue", s.Worker().Options.TaskQueue, "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, + "--signal-input", signalInput, ) - s.NoError(res.Err) - // Confirm text output has key/vals as expected out := res.Stdout.String() s.ContainsOnSameLine(out, "WorkflowId", wfId) s.Contains(out, "RunId") - s.ContainsOnSameLine(out, "TaskQueue", "tq") s.ContainsOnSameLine(out, "Type", "DevWorkflow") s.ContainsOnSameLine(out, "Namespace", "default") - // Check that new workflow was started with expected workflow ID. - run := s.Client.GetWorkflow(s.Context, wfId, "") - s.Equal(wfId, run.GetID()) - - // Run workflow, block on signal. - var sigReceived any - s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil - }) + // Check that a new workflow was started with expected workflow ID. + if !useExistingWorkflow { + run := s.Client.GetWorkflow(s.Context, wfId, "") + s.Equal(wfId, run.GetID()) + } // Wait for workflow to complete. - var wfReturn any + wfReturn := make(map[string]string) err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) s.NoError(err) - // Expect workflow to have received signal and given inputs from signal-with-start. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) - s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) + // Compare the extracted values with what the workflow returned + s.Equal(expectedWfOutput["signal"], wfReturn["signal"]) + s.Equal(expectedWfOutput["workflow"], wfReturn["workflow"]) } -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { - // Run workflow, block on signal. - var sigReceived any +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_RuntimeOptionChecks() { + res := s.Execute( + "workflow", "start-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--update-wait-for-stage", "accepted", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") + res = s.Execute( + "workflow", "start-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--update-wait-for-stage", "accepted", + "--workflow-id", "wfId", + ) + s.ErrorContains(res.Err, "--id-conflict-policy flag must be provided") +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_RuntimeOptionChecks() { + res := s.Execute( + "workflow", "execute-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") + res = s.Execute( + "workflow", "execute-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--workflow-id", "wfId", + ) + s.ErrorContains(res.Err, "--id-conflict-policy flag must be provided") +} + +type updateWithStartTest struct { + updateWithStartSetup + useStart bool + idConflictPolicy string + expectedError string + expectedUpdateResult string + expectedWfOutput map[string]string +} + +type updateWithStartSetup struct { + wfId string + updateName string + updateId string + useExistingWorkflow bool +} + +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_StartsNewWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(false) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "Fail", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_SendUpdateToExistingWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(true) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "Fail", + expectedError: "Workflow execution is already running", + }) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "UseExisting", + expectedWfOutput: map[string]string{"workflow": "not-update-with-start-workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_StartsWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(false) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "Fail", + expectedUpdateResult: "update-input", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_SendUpdateToExistingWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(true) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "Fail", + expectedError: "Workflow execution is already running", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "UseExisting", + expectedWfOutput: map[string]string{"workflow": "not-update-with-start-workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) updateWithStartTestSetup(useExistingWorkflow bool) updateWithStartSetup { + wfId := uuid.NewString() + updateName := "test-update-name" + updateId := uuid.NewString() + if useExistingWorkflow { + // Start a workflow with a specific workflow ID. + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + }, + DevWorkflow, + "not-update-with-start-workflow-input", + ) + s.NoError(err) + // Re-assign wfId for the update to be sent to an existing workflow. + wfId = run.GetID() + } + + // Run workflow. s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil + wfState := make(map[string]string) + wfState["workflow"] = wfInput.(string) + + err := workflow.SetUpdateHandlerWithOptions( + ctx, + updateName, + func(ctx workflow.Context, updateInput string) (string, error) { + wfState["update"] = updateInput + return updateInput, nil + }, + workflow.UpdateHandlerOptions{}, + ) + if err != nil { + return nil, err + } + // Block workflow completion on signal. + workflow.GetSignalChannel(ctx, "complete").Receive(ctx, nil) + return wfState, nil }) + return updateWithStartSetup{wfId, updateName, updateId, useExistingWorkflow} +} - // Start workflow - run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") - s.NoError(err) +func (s *SharedServerSuite) testStartUpdateWithStartHelper(opts updateWithStartTest) { + cmdName := "execute-update-with-start" + additionalArgs := []string{} - wfId := run.GetID() + if opts.useStart { + cmdName = "start-update-with-start" + additionalArgs = []string{"--update-wait-for-stage", "accepted"} + } - // Send signal-with-start command. - res := s.Execute( - "workflow", "signal-with-start", + baseArgs := []string{ + "workflow", cmdName, "--address", s.Address(), - "--workflow-id", wfId, + "--workflow-id", opts.wfId, "--type", "DevWorkflow", - "--input", `{"workflow": "workflow-input"}`, + "--input", `"workflow-input"`, "--task-queue", s.Worker().Options.TaskQueue, - "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, - ) + "--id-conflict-policy", opts.idConflictPolicy, + "--update-name", opts.updateName, + "--update-id", opts.updateId, + "--update-input", `"update-input"`, + } + + // Send start-update-with-start command. + args := append(baseArgs, additionalArgs...) + res := s.Execute(args...) + + // Check expected error. + if opts.expectedError != "" { + s.ErrorContains(res.Err, opts.expectedError) + return + } + s.NoError(res.Err) // Confirm text output has key/vals as expected out := res.Stdout.String() - s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.ContainsOnSameLine(out, "WorkflowId", opts.wfId) s.Contains(out, "RunId") - s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue) s.ContainsOnSameLine(out, "Type", "DevWorkflow") s.ContainsOnSameLine(out, "Namespace", "default") + s.ContainsOnSameLine(out, "UpdateName", opts.updateName) + s.ContainsOnSameLine(out, "UpdateID", opts.updateId) + + // Check expected update result. + if opts.expectedUpdateResult != "" { + s.ContainsOnSameLine(out, "UpdateResult", opts.expectedUpdateResult) + } + + // Check that new workflow was started with expected workflow ID. + if !opts.useExistingWorkflow { + run := s.Client.GetWorkflow(s.Context, opts.wfId, "") + s.Equal(opts.wfId, run.GetID()) + } + + // Send signal to complete workflow. + err := s.Client.SignalWorkflow(s.Context, opts.wfId, "", "complete", nil) + s.NoError(err) // Wait for workflow to complete. - var ret any - s.NoError(run.Get(s.Context, &ret)) + wfReturn := make(map[string]string) + err = s.Client.GetWorkflow(s.Context, opts.wfId, "").Get(s.Context, &wfReturn) + s.NoError(err) - // Expect workflow to have not been started by the signal-with-start command. - s.Equal("not-signal-with-start-input", ret) - // Expect signal to have been received with given input. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + // Expect workflow to have received update and given inputs from start-update-with-start. + s.Equal(opts.expectedWfOutput["workflow"], wfReturn["workflow"]) + s.Equal(opts.expectedWfOutput["update"], wfReturn["update"]) } diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index ea676a117..b0e6597a8 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3673,6 +3673,7 @@ commands: --workflow-id YourWorkflowId \ --name YourUpdate \ --input '{"some-key": "some-value"}' + --wait-for-stage accepted ``` option-sets: - update-starting @@ -3688,6 +3689,163 @@ commands: - accepted required: true + - name: temporal workflow start-update-with-start + summary: Send an Update and wait for it to be accepted or rejected (Experimental) + description: | + Send a message to a Workflow Execution to invoke an Update handler, and wait for + the update to be accepted or rejected. If the Workflow Execution is not running, + then a new workflow execution is started and the update is sent. + + Experimental. + + ``` + temporal workflow start-update-with-start \ + --update-name YourUpdate \ + --update-input '{"update-key": "update-value"}' \ + --update-wait-for-stage accepted \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --id-conflict-policy Fail \ + --input '{"wf-key": "wf-value"}' + ``` + option-sets: + # workflow-id and id-conflict-policy are "required" (runtime checks) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: update-name + type: string + description: Update name. + required: true + aliases: + - update-type + - name: update-first-execution-run-id + type: string + description: | + Parent Run ID. + The update is sent to the last Workflow Execution in the chain started + with this Run ID. + - name: update-wait-for-stage + type: string-enum + description: | + Update stage to wait for. + The only option is `accepted`, but this option is required. This is to allow + a future version of the CLI to choose a default value. + enum-values: + - accepted + required: true + - name: update-id + type: string + description: | + Update ID. + If unset, defaults to a UUID. + - name: run-id + type: string + short: r + description: | + Run ID. + If unset, looks for an Update against the currently-running Workflow Execution. + - name: update-input + type: string[] + description: | + Update input value. + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input-file. + Can be passed multiple times to pass multiple arguments. + - name: update-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input. + Can be passed multiple times to pass multiple arguments. + - name: update-input-meta + type: string[] + description: | + Input update payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: update-input-base64 + type: bool + description: | + Assume update inputs are base64-encoded and attempt to decode them. + + - name: temporal workflow execute-update-with-start + summary: Send an Update and wait for it to complete (Experimental) + description: | + Send a message to a Workflow Execution to invoke an Update handler, and wait for + the update to complete. If the Workflow Execution is not running, then a new workflow + execution is started and the update is sent. + + Experimental. + + ``` + temporal workflow execute-update-with-start \ + --update-name YourUpdate \ + --update-input '{"update-key": "update-value"}' \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --id-conflict-policy Fail \ + --input '{"wf-key": "wf-value"}' + ``` + + option-sets: + # workflow-id and id-conflict-policy are "required" (runtime checks) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: update-name + type: string + description: Update name. + required: true + aliases: + - update-type + - name: update-first-execution-run-id + type: string + description: | + Parent Run ID. + The update is sent to the last Workflow Execution in the chain started + with this Run ID. + - name: update-id + type: string + description: | + Update ID. + If unset, defaults to a UUID. + - name: run-id + type: string + short: r + description: | + Run ID. + If unset, looks for an Update against the currently-running Workflow Execution. + - name: update-input + type: string[] + description: | + Update input value. + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input-file. + Can be passed multiple times to pass multiple arguments. + - name: update-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input. + Can be passed multiple times to pass multiple arguments. + - name: update-input-meta + type: string[] + description: | + Input update payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: update-input-base64 + type: bool + description: | + Assume update inputs are base64-encoded and attempt to decode them. + option-sets: - name: client options: From 57e8cecf5ecd6a69d77edc7aaa190036588eb257 Mon Sep 17 00:00:00 2001 From: Jacob Barzee Date: Wed, 5 Mar 2025 15:46:59 -0700 Subject: [PATCH 5/7] quote attribute type to make whitespace more obvious (#772) ## What was changed quote attribute type in error message ## Why? To make whitespace more obvious --------- Co-authored-by: Rodrigo Zhou <2068124+rodrigozhou@users.noreply.github.com> --- temporalcli/commands.operator_search_attribute.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalcli/commands.operator_search_attribute.go b/temporalcli/commands.operator_search_attribute.go index 405a31b85..afdca478c 100644 --- a/temporalcli/commands.operator_search_attribute.go +++ b/temporalcli/commands.operator_search_attribute.go @@ -60,7 +60,7 @@ func searchAttributeTypeStringToEnum(search string) (enums.IndexedValueType, err return enums.IndexedValueType(v), nil } } - return enums.INDEXED_VALUE_TYPE_UNSPECIFIED, fmt.Errorf("unsupported search attribute type: %v", search) + return enums.INDEXED_VALUE_TYPE_UNSPECIFIED, fmt.Errorf("unsupported search attribute type: %q", search) } func (c *TemporalOperatorSearchAttributeRemoveCommand) run(cctx *CommandContext, args []string) error { From 1a88da6ffe6b2957f13ca3ec9954e4b549ec3948 Mon Sep 17 00:00:00 2001 From: Yuri Date: Fri, 7 Mar 2025 12:34:07 -0800 Subject: [PATCH 6/7] Fix for DescribeWorkflow (#773) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What was changed Check if extended info is not nil. ## Why? https://github.com/temporalio/cli/pull/771 Code assumes that some extended info exists in proto. Which is not true for older server versions.. --- temporalcli/commands.workflow_view.go | 85 +++++++++++++++------------ 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index 75f1db34e..94e721dec 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -102,49 +102,56 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin cctx.Printer.Println(color.MagentaString("Execution Info:")) info := resp.WorkflowExecutionInfo - extendedInfo := resp.WorkflowExtendedInfo _ = cctx.Printer.PrintStructured(struct { - WorkflowId string - RunId string - Type string - Namespace string - TaskQueue string - AssignedBuildId string - StartTime time.Time - CloseTime time.Time `cli:",cardOmitEmpty"` - ExecutionTime time.Time `cli:",cardOmitEmpty"` - Memo map[string]*common.Payload `cli:",cardOmitEmpty"` - SearchAttributes map[string]*common.Payload `cli:",cardOmitEmpty"` - StateTransitionCount int64 - HistoryLength int64 - HistorySize int64 - ExecutionExpirationTime time.Time `cli:",cardOmitEmpty"` - RunExpirationTime time.Time `cli:",cardOmitEmpty"` - CancelRequested bool - LastResetTime time.Time `cli:",cardOmitEmpty"` - OriginalStartTime time.Time `cli:",cardOmitEmpty"` + WorkflowId string + RunId string + Type string + Namespace string + TaskQueue string + AssignedBuildId string + StartTime time.Time + CloseTime time.Time `cli:",cardOmitEmpty"` + ExecutionTime time.Time `cli:",cardOmitEmpty"` + Memo map[string]*common.Payload `cli:",cardOmitEmpty"` + SearchAttributes map[string]*common.Payload `cli:",cardOmitEmpty"` + StateTransitionCount int64 + HistoryLength int64 + HistorySize int64 }{ - WorkflowId: info.Execution.WorkflowId, - RunId: info.Execution.RunId, - Type: info.Type.GetName(), - Namespace: c.Parent.Namespace, - TaskQueue: info.TaskQueue, - AssignedBuildId: info.GetAssignedBuildId(), - StartTime: timestampToTime(info.StartTime), - CloseTime: timestampToTime(info.CloseTime), - ExecutionTime: timestampToTime(info.ExecutionTime), - Memo: info.Memo.GetFields(), - SearchAttributes: info.SearchAttributes.GetIndexedFields(), - StateTransitionCount: info.StateTransitionCount, - HistoryLength: info.HistoryLength, - HistorySize: info.HistorySizeBytes, - ExecutionExpirationTime: timestampToTime(extendedInfo.ExecutionExpirationTime), - RunExpirationTime: timestampToTime(extendedInfo.RunExpirationTime), - CancelRequested: extendedInfo.CancelRequested, - LastResetTime: timestampToTime(extendedInfo.LastResetTime), - OriginalStartTime: timestampToTime(extendedInfo.OriginalStartTime), + WorkflowId: info.Execution.WorkflowId, + RunId: info.Execution.RunId, + Type: info.Type.GetName(), + Namespace: c.Parent.Namespace, + TaskQueue: info.TaskQueue, + AssignedBuildId: info.GetAssignedBuildId(), + StartTime: timestampToTime(info.StartTime), + CloseTime: timestampToTime(info.CloseTime), + ExecutionTime: timestampToTime(info.ExecutionTime), + Memo: info.Memo.GetFields(), + SearchAttributes: info.SearchAttributes.GetIndexedFields(), + StateTransitionCount: info.StateTransitionCount, + HistoryLength: info.HistoryLength, + HistorySize: info.HistorySizeBytes, }, printer.StructuredOptions{}) + extendedInfo := resp.WorkflowExtendedInfo + if extendedInfo != nil { + cctx.Printer.Println(color.MagentaString("Extended Execution Info:")) + _ = cctx.Printer.PrintStructured(struct { + CancelRequested bool + ExecutionExpirationTime time.Time `cli:",cardOmitEmpty"` + RunExpirationTime time.Time `cli:",cardOmitEmpty"` + LastResetTime time.Time `cli:",cardOmitEmpty"` + OriginalStartTime time.Time `cli:",cardOmitEmpty"` + }{ + CancelRequested: extendedInfo.CancelRequested, + ExecutionExpirationTime: timestampToTime(extendedInfo.ExecutionExpirationTime), + RunExpirationTime: timestampToTime(extendedInfo.RunExpirationTime), + LastResetTime: timestampToTime(extendedInfo.LastResetTime), + OriginalStartTime: timestampToTime(extendedInfo.OriginalStartTime), + }, printer.StructuredOptions{}) + } + staticSummary := resp.GetExecutionConfig().GetUserMetadata().GetSummary() staticDetails := resp.GetExecutionConfig().GetUserMetadata().GetDetails() if len(staticSummary.GetData()) > 0 || len(staticDetails.GetData()) > 0 { From 424a3097ce89e53a62a1e5dfc7b423453c624f08 Mon Sep 17 00:00:00 2001 From: Antonio Lain Date: Tue, 25 Mar 2025 11:51:57 -0700 Subject: [PATCH 7/7] Upgrade server to v1.28.0-130.0 --- go.mod | 9 ++++----- go.sum | 18 ++++++++---------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index f251a0628..4867c9bd6 100644 --- a/go.mod +++ b/go.mod @@ -15,9 +15,9 @@ require ( github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 github.com/temporalio/ui-server/v2 v2.36.0 - go.temporal.io/api v1.45.0 + go.temporal.io/api v1.46.0 go.temporal.io/sdk v1.33.0 - go.temporal.io/server v1.27.1 + go.temporal.io/server v1.28.0-130.0 google.golang.org/grpc v1.70.0 google.golang.org/protobuf v1.36.5 gopkg.in/yaml.v3 v3.0.1 @@ -138,9 +138,9 @@ require ( go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.35.0 // indirect golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa // indirect - golang.org/x/net v0.35.0 // indirect + golang.org/x/net v0.36.0 // indirect golang.org/x/oauth2 v0.26.0 // indirect golang.org/x/sync v0.11.0 // indirect golang.org/x/sys v0.30.0 // indirect @@ -150,7 +150,6 @@ require ( google.golang.org/genproto v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect - gopkg.in/go-jose/go-jose.v2 v2.6.3 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/validator.v2 v2.0.1 // indirect modernc.org/libc v1.61.11 // indirect diff --git a/go.sum b/go.sum index 62d3e7b05..dcf5cb4c7 100644 --- a/go.sum +++ b/go.sum @@ -371,12 +371,12 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.45.0 h1:2FZ3eUoOYjavBaQi3/V93MBl99Nq1ifFRjrRwT3MeC8= -go.temporal.io/api v1.45.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.46.0 h1:O1efPDB6O2B8uIeCDIa+3VZC7tZMvYsMZYQapSbHvCg= +go.temporal.io/api v1.46.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.33.0 h1:T91UzeRdlHTiMGgpygsItOH9+VSkg+M/mG85PqNjdog= go.temporal.io/sdk v1.33.0/go.mod h1:WwCmJZLy7zabz3ar5NRAQEygsdP8tgR9sDjISSHuWZw= -go.temporal.io/server v1.27.1 h1:0dyBl25Ua7P4IOXifJg0xUXfnoYTNY1IlUFf1RL4OBo= -go.temporal.io/server v1.27.1/go.mod h1:ddxnsbsXvdZ/oRvjLHaL45NJUGMOPW+3RLkhpq9TOAs= +go.temporal.io/server v1.28.0-130.0 h1:I7juPBQoGfB6kHV7MVC6OnDzl4jRWpUzcsIQPlaXKmk= +go.temporal.io/server v1.28.0-130.0/go.mod h1:WjwWFpSiNEQp/whDLUU7IrUS7UewzJfAk3tQh0GMXZg= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -408,8 +408,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= +golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -449,8 +449,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= -golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= +golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.26.0 h1:afQXWNNaeC4nvZ0Ed9XvCCzXM6UHJG7iCg0W4fPqSBE= golang.org/x/oauth2 v0.26.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -561,8 +561,6 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/go-jose/go-jose.v2 v2.6.3 h1:nt80fvSDlhKWQgSWyHyy5CfmlQr+asih51R8PTWNKKs= -gopkg.in/go-jose/go-jose.v2 v2.6.3/go.mod h1:zzZDPkNNw/c9IE7Z9jr11mBZQhKQTMzoEEIoEdZlFBI= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/validator.v2 v2.0.1 h1:xF0KWyGWXm/LM2G1TrEjqOu4pa6coO9AlWSf3msVfDY=