From 38cfcb3ce962bea8fd6f3304e6bee8604574c183 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 20 Feb 2025 15:59:24 -0500 Subject: [PATCH 1/4] Added `temporal workflow signal-with-start` command. Description and sample usage: Send an asynchronous notification (Signal) to a Workflow Execution. If the Workflow Execution is not running or is not found, it starts the workflow then sends the signal. ``` temporal workflow signal-with-start \ --signal-name YourSignal \ --signal-input '{"some-key": "some-value"}' \ --workflow-id YourWorkflowId \ --type YourWorkflowType \ --task-queue YourTaskQueue \ --input '{"some-key": "some-value"}' ``` --- temporalcli/commands.gen.go | 47 ++++++++ temporalcli/commands.schedule.go | 4 +- temporalcli/commands.workflow.go | 90 ++++++++++++++++ temporalcli/commands.workflow_test.go | 100 ++++++++++++++++++ temporalcli/commandsgen/commands.yml | 54 ++++++++++ temporalcli/internal/cmd/gen-commands/main.go | 2 +- 6 files changed, 294 insertions(+), 3 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 3ca46ec7f..71d6ddb30 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -2704,6 +2704,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) @@ -3131,6 +3132,52 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork return &s } +type TemporalWorkflowSignalWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + SignalName string + SignalInput []string + SignalInputFile []string + SignalInputMeta []string + SignalInputBase64 bool +} + +func NewTemporalWorkflowSignalWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalWithStartCommand { + var s TemporalWorkflowSignalWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "signal-with-start [flags]" + s.Command.Short = "Send a message to a Workflow Execution, start the execution if it isn't running" + if hasHighlighting { + s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n\x1b[1mtemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n```\ntemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.SignalName, "signal-name", "", "Signal name. Required. Aliased as \"--signal-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "signal-name") + s.Command.Flags().StringArrayVar(&s.SignalInput, "signal-input", nil, "Signal input value. Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.SignalInputFile, "signal-input-file", nil, "A path or paths for input file(s). Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.SignalInputMeta, "signal-input-meta", nil, "Input signal payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.SignalInputBase64, "signal-input-base64", false, "Assume signal inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "signal-type": "signal-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowStackCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command diff --git a/temporalcli/commands.schedule.go b/temporalcli/commands.schedule.go index e458ba918..bd181b145 100644 --- a/temporalcli/commands.schedule.go +++ b/temporalcli/commands.schedule.go @@ -257,7 +257,7 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c if err != nil { return nil, err } - untypedSearchAttributes, err := encodeSearchAttributesToPayloads(opts.SearchAttributes) + untypedSearchAttributes, err := encodeMapToPayloads(opts.SearchAttributes) if err != nil { return nil, err } @@ -606,7 +606,7 @@ func formatDuration(d time.Duration) string { return s } -func encodeSearchAttributesToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { +func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { if len(in) == 0 { return nil, nil } diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 421fa354b..40a95d11d 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -15,15 +15,19 @@ import ( "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" deploymentpb "go.temporal.io/api/deployment/v1" "go.temporal.io/api/enums/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/query/v1" sdkpb "go.temporal.io/api/sdk/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/update/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -262,6 +266,92 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string) return nil } +func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error { + if c.SharedWorkflowStartOptions.WorkflowId == "" { + return fmt.Errorf("--workflow-id flag must be provided") + } + + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions) + if err != nil { + return err + } + wfInput, err := c.buildRawInputPayloads() + if err != nil { + return err + } + + signalPayloadInputOpts := PayloadInputOptions{ + Input: c.SignalInput, + InputFile: c.SignalInputFile, + InputMeta: c.InputMeta, + InputBase64: c.SignalInputBase64, + } + signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() + if err != nil { + return err + } + + var retryPolicy *common.RetryPolicy + if wfStartOpts.RetryPolicy != nil { + retryPolicy = &commonpb.RetryPolicy{ + MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval), + InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval), + BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient, + MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts, + NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes, + } + } + var memo *common.Memo + if wfStartOpts.Memo != nil { + fields, err := encodeMapToPayloads(wfStartOpts.Memo) + if err != nil { + return err + } + memo = &common.Memo{Fields: fields} + } + var searchAttr *common.SearchAttributes + if wfStartOpts.SearchAttributes != nil { + fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes) + if err != nil { + return err + } + searchAttr = &common.SearchAttributes{IndexedFields: fields} + } + + // We have to use the raw signal service call here because the Go SDK's + // signal-with-start call doesn't accept multiple signal arguments. + _, err = cl.WorkflowService().SignalWithStartWorkflowExecution( + cctx, + &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Namespace: c.Parent.Namespace, + RequestId: uuid.NewString(), + WorkflowId: c.WorkflowId, + WorkflowType: &common.WorkflowType{Name: c.Type}, + TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: wfInput, + WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout), + WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout), + WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout), + SignalName: c.SignalName, + SignalInput: signalInput, + Identity: clientIdentity(), + RetryPolicy: retryPolicy, + CronSchedule: wfStartOpts.CronSchedule, + Memo: memo, + SearchAttributes: searchAttr, + WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy, + WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy, + }, + ) + return err +} + func (c *TemporalWorkflowStackCommand) run(cctx *CommandContext, args []string) error { return queryHelper(cctx, c.Parent, PayloadInputOptions{}, "__stack_trace", c.RejectCondition, c.WorkflowReferenceOptions) diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 81ae0721d..fca67b343 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -1241,3 +1241,103 @@ func (s *SharedServerSuite) testWorkflowMetadata(json bool) { s.Error(res.Err) s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed") } + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { + res := s.Execute( + "workflow", "signal-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--signal-name", "sigName", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { + wfId := uuid.NewString() + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"wf-signal-with-start": "workflow-input"}`, + "--task-queue", "tq", + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + + s.NoError(res.Err) + + // Check that new workflow was started with expected workflow ID. + s.Eventually(func() bool { + listResp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{}) + s.NoError(err) + return len(listResp.Executions) == 1 && listResp.Executions[0].Execution.GetWorkflowId() == wfId + }, 3*time.Second, 200*time.Millisecond) + + // Run workflow, block on signal. + var sigReceived any + s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Wait for workflow to complete. + var wfReturn any + err = s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) + s.NoError(err) + + // Expect workflow to have received signal and given inputs. + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { + // Run workflow, block on signal. + var sigReceived any + s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Start workflow + run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") + s.NoError(err) + + wfId := run.GetID() + + // Check that expected workflow exists. + s.Eventually(func() bool { + listResp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{}) + s.NoError(err) + return len(listResp.Executions) == 1 && listResp.Executions[0].Execution.GetWorkflowId() == wfId + }, 3*time.Second, 200*time.Millisecond) + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"workflow": "workflow-input"}`, + "--task-queue", s.Worker().Options.TaskQueue, + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + s.NoError(res.Err) + + // Wait for workflow to complete. + var ret any + s.NoError(run.Get(s.Context, &ret)) + + // Expect workflow to have received signal and given inputs. + s.Equal("not-signal-with-start-input", ret) + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + + // Check that signal-with-start did not start a new workflow. + listResp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{}) + s.NoError(err) + s.Equal(1, len(listResp.Executions)) + s.Equal(wfId, listResp.Executions[0].Execution.GetWorkflowId()) +} diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 0fc735682..ee52e7965 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3153,6 +3153,59 @@ commands: aliases: - type + - name: temporal workflow signal-with-start + summary: Send a message to a Workflow Execution, start the execution if it isn't running + description: | + Send an asynchronous notification (Signal) to a Workflow Execution. + If the Workflow Execution is not running or is not found, it starts the + workflow then sends the signal. + + ``` + temporal workflow signal-with-start \ + --signal-name YourSignal \ + --signal-input '{"some-key": "some-value"}' \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --input '{"some-key": "some-value"}' + ``` + option-sets: + # workflow-id is "required" (runtime check) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: signal-name + type: string + description: Signal name. + required: true + aliases: + - signal-type + - name: signal-input + type: string[] + description: | + Signal input value. + Use JSON content or set --signal-input-meta to override. + Can't be combined with --signal-input-file. + Can be passed multiple times to pass multiple arguments. + - name: signal-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --signal-input-meta to override. + Can't be combined with --signal-input. + Can be passed multiple times to pass multiple arguments. + - name: signal-input-meta + type: string[] + description: | + Input signal payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: signal-input-base64 + type: bool + description: | + Assume signal inputs are base64-encoded and attempt to decode them. + - name: temporal workflow stack summary: Trace a Workflow Execution description: | @@ -3704,6 +3757,7 @@ option-sets: - Fail - UseExisting - TerminateExisting + - name: payload-input options: - name: input diff --git a/temporalcli/internal/cmd/gen-commands/main.go b/temporalcli/internal/cmd/gen-commands/main.go index 00df1848d..817ff2968 100644 --- a/temporalcli/internal/cmd/gen-commands/main.go +++ b/temporalcli/internal/cmd/gen-commands/main.go @@ -21,7 +21,7 @@ func run() error { _, file, _, _ := runtime.Caller(0) commandsDir := filepath.Join(file, "../../../../") - // Parse markdown + // Parse YAML cmds, err := commandsgen.ParseCommands() if err != nil { return fmt.Errorf("failed parsing markdown: %w", err) From 5c0a899675972f0422e99fc58ee3140d8dceff7c Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 20 Feb 2025 17:01:23 -0500 Subject: [PATCH 2/4] fix tests, add warning --- temporalcli/commands.workflow.go | 4 ++++ temporalcli/commands.workflow_test.go | 27 ++++++--------------------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 40a95d11d..98e8ce2c4 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -324,6 +324,10 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s searchAttr = &common.SearchAttributes{IndexedFields: fields} } + if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) { + cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command") + } + // We have to use the raw signal service call here because the Go SDK's // signal-with-start call doesn't accept multiple signal arguments. _, err = cl.WorkflowService().SignalWithStartWorkflowExecution( diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index fca67b343..69ed8acd5 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -1270,11 +1270,8 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { s.NoError(res.Err) // Check that new workflow was started with expected workflow ID. - s.Eventually(func() bool { - listResp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{}) - s.NoError(err) - return len(listResp.Executions) == 1 && listResp.Executions[0].Execution.GetWorkflowId() == wfId - }, 3*time.Second, 200*time.Millisecond) + run := s.Client.GetWorkflow(s.Context, wfId, "") + s.Equal(wfId, run.GetID()) // Run workflow, block on signal. var sigReceived any @@ -1285,10 +1282,10 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { // Wait for workflow to complete. var wfReturn any - err = s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) + err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) s.NoError(err) - // Expect workflow to have received signal and given inputs. + // Expect workflow to have received signal and given inputs from signal-with-start. s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) } @@ -1307,13 +1304,6 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { wfId := run.GetID() - // Check that expected workflow exists. - s.Eventually(func() bool { - listResp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{}) - s.NoError(err) - return len(listResp.Executions) == 1 && listResp.Executions[0].Execution.GetWorkflowId() == wfId - }, 3*time.Second, 200*time.Millisecond) - // Send signal-with-start command. res := s.Execute( "workflow", "signal-with-start", @@ -1331,13 +1321,8 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { var ret any s.NoError(run.Get(s.Context, &ret)) - // Expect workflow to have received signal and given inputs. + // Expect workflow to have not been started by the signal-with-start command. s.Equal("not-signal-with-start-input", ret) + // Expect signal to have been received with given input. s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) - - // Check that signal-with-start did not start a new workflow. - listResp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{}) - s.NoError(err) - s.Equal(1, len(listResp.Executions)) - s.Equal(wfId, listResp.Executions[0].Execution.GetWorkflowId()) } From 266730756ea657a90a1a69121eded575773c82dd Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 21 Feb 2025 10:24:57 -0500 Subject: [PATCH 3/4] Code movement, added workflow execution print --- temporalcli/commands.go | 19 ++++ temporalcli/commands.schedule.go | 18 ---- temporalcli/commands.workflow.go | 94 ----------------- temporalcli/commands.workflow_exec.go | 116 +++++++++++++++++++++ temporalcli/commands.workflow_exec_test.go | 101 ++++++++++++++++++ temporalcli/commands.workflow_test.go | 85 --------------- 6 files changed, 236 insertions(+), 197 deletions(-) diff --git a/temporalcli/commands.go b/temporalcli/commands.go index b9ba6322a..217f2f299 100644 --- a/temporalcli/commands.go +++ b/temporalcli/commands.go @@ -23,8 +23,10 @@ import ( "github.com/temporalio/cli/temporalcli/internal/printer" "github.com/temporalio/ui-server/v2/server/version" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/failure/v1" "go.temporal.io/api/temporalproto" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/temporal" "go.temporal.io/server/common/headers" "google.golang.org/grpc" @@ -589,3 +591,20 @@ func fromApplicationError(err *temporal.ApplicationError) (*structuredError, err Details: deets, }, nil } + +func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { + if len(in) == 0 { + return nil, nil + } + // search attributes always use default dataconverter + dc := converter.GetDefaultDataConverter() + out := make(map[string]*commonpb.Payload, len(in)) + for key, val := range in { + payload, err := dc.ToPayload(val) + if err != nil { + return nil, err + } + out[key] = payload + } + return out, nil +} diff --git a/temporalcli/commands.schedule.go b/temporalcli/commands.schedule.go index bd181b145..3d645cb2e 100644 --- a/temporalcli/commands.schedule.go +++ b/temporalcli/commands.schedule.go @@ -16,7 +16,6 @@ import ( schedpb "go.temporal.io/api/schedule/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" "go.temporal.io/server/common/primitives/timestamp" ) @@ -605,20 +604,3 @@ func formatDuration(d time.Duration) string { s = strings.TrimSpace(s) return s } - -func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { - if len(in) == 0 { - return nil, nil - } - // search attributes always use default dataconverter - dc := converter.GetDefaultDataConverter() - out := make(map[string]*commonpb.Payload, len(in)) - for key, val := range in { - payload, err := dc.ToPayload(val) - if err != nil { - return nil, err - } - out[key] = payload - } - return out, nil -} diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 98e8ce2c4..421fa354b 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -15,19 +15,15 @@ import ( "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" - commonpb "go.temporal.io/api/common/v1" deploymentpb "go.temporal.io/api/deployment/v1" "go.temporal.io/api/enums/v1" - enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/query/v1" sdkpb "go.temporal.io/api/sdk/v1" - taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/update/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" - "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -266,96 +262,6 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string) return nil } -func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error { - if c.SharedWorkflowStartOptions.WorkflowId == "" { - return fmt.Errorf("--workflow-id flag must be provided") - } - - cl, err := c.Parent.ClientOptions.dialClient(cctx) - if err != nil { - return err - } - defer cl.Close() - - wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions) - if err != nil { - return err - } - wfInput, err := c.buildRawInputPayloads() - if err != nil { - return err - } - - signalPayloadInputOpts := PayloadInputOptions{ - Input: c.SignalInput, - InputFile: c.SignalInputFile, - InputMeta: c.InputMeta, - InputBase64: c.SignalInputBase64, - } - signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() - if err != nil { - return err - } - - var retryPolicy *common.RetryPolicy - if wfStartOpts.RetryPolicy != nil { - retryPolicy = &commonpb.RetryPolicy{ - MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval), - InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval), - BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient, - MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts, - NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes, - } - } - var memo *common.Memo - if wfStartOpts.Memo != nil { - fields, err := encodeMapToPayloads(wfStartOpts.Memo) - if err != nil { - return err - } - memo = &common.Memo{Fields: fields} - } - var searchAttr *common.SearchAttributes - if wfStartOpts.SearchAttributes != nil { - fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes) - if err != nil { - return err - } - searchAttr = &common.SearchAttributes{IndexedFields: fields} - } - - if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) { - cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command") - } - - // We have to use the raw signal service call here because the Go SDK's - // signal-with-start call doesn't accept multiple signal arguments. - _, err = cl.WorkflowService().SignalWithStartWorkflowExecution( - cctx, - &workflowservice.SignalWithStartWorkflowExecutionRequest{ - Namespace: c.Parent.Namespace, - RequestId: uuid.NewString(), - WorkflowId: c.WorkflowId, - WorkflowType: &common.WorkflowType{Name: c.Type}, - TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, - Input: wfInput, - WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout), - WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout), - WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout), - SignalName: c.SignalName, - SignalInput: signalInput, - Identity: clientIdentity(), - RetryPolicy: retryPolicy, - CronSchedule: wfStartOpts.CronSchedule, - Memo: memo, - SearchAttributes: searchAttr, - WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy, - WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy, - }, - ) - return err -} - func (c *TemporalWorkflowStackCommand) run(cctx *CommandContext, args []string) error { return queryHelper(cctx, c.Parent, PayloadInputOptions{}, "__stack_trace", c.RejectCondition, c.WorkflowReferenceOptions) diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index b885e4cac..2caf7f9ba 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -12,12 +12,18 @@ import ( "time" "github.com/fatih/color" + "github.com/google/uuid" "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/temporalproto" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "google.golang.org/protobuf/types/known/durationpb" ) func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error { @@ -92,6 +98,116 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string return err } +func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error { + if c.SharedWorkflowStartOptions.WorkflowId == "" { + return fmt.Errorf("--workflow-id flag must be provided") + } + + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions) + if err != nil { + return err + } + wfInput, err := c.buildRawInputPayloads() + if err != nil { + return err + } + + signalPayloadInputOpts := PayloadInputOptions{ + Input: c.SignalInput, + InputFile: c.SignalInputFile, + InputMeta: c.InputMeta, + InputBase64: c.SignalInputBase64, + } + signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() + if err != nil { + return err + } + + var retryPolicy *common.RetryPolicy + if wfStartOpts.RetryPolicy != nil { + retryPolicy = &commonpb.RetryPolicy{ + MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval), + InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval), + BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient, + MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts, + NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes, + } + } + var memo *common.Memo + if wfStartOpts.Memo != nil { + fields, err := encodeMapToPayloads(wfStartOpts.Memo) + if err != nil { + return err + } + memo = &common.Memo{Fields: fields} + } + var searchAttr *common.SearchAttributes + if wfStartOpts.SearchAttributes != nil { + fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes) + if err != nil { + return err + } + searchAttr = &common.SearchAttributes{IndexedFields: fields} + } + + if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) { + cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command") + } + + // We have to use the raw signal service call here because the Go SDK's + // signal-with-start call doesn't accept multiple signal arguments. + resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution( + cctx, + &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Namespace: c.Parent.Namespace, + RequestId: uuid.NewString(), + WorkflowId: c.WorkflowId, + WorkflowType: &common.WorkflowType{Name: c.Type}, + TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: wfInput, + WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout), + WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout), + WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout), + SignalName: c.SignalName, + SignalInput: signalInput, + Identity: clientIdentity(), + RetryPolicy: retryPolicy, + CronSchedule: wfStartOpts.CronSchedule, + Memo: memo, + SearchAttributes: searchAttr, + WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy, + WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy, + }, + ) + if err != nil { + return err + } + cctx.Printer.Println(color.MagentaString("Running execution:")) + err = cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + TaskQueue string `json:"taskQueue"` + }{ + WorkflowId: c.WorkflowId, + RunId: resp.RunId, + Type: c.Type, + Namespace: c.Parent.Namespace, + TaskQueue: c.TaskQueue, + }, printer.StructuredOptions{}) + if err != nil { + return fmt.Errorf("failed printing: %w", err) + } + return nil +} + type workflowJSONResult struct { WorkflowId string `json:"workflowId"` RunId string `json:"runId"` diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index 88a0a6d70..e706d9465 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -664,3 +664,104 @@ func (s *SharedServerSuite) TestWorkflow_Execute_NullValue() { s.ContainsOnSameLine(out, "Status", "COMPLETED") s.ContainsOnSameLine(out, "Result", `{"foo":null}`) } + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { + res := s.Execute( + "workflow", "signal-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--signal-name", "sigName", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { + wfId := uuid.NewString() + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"wf-signal-with-start": "workflow-input"}`, + "--task-queue", "tq", + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + + s.NoError(res.Err) + + // Confirm text output has key/vals as expected + out := res.Stdout.String() + s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.Contains(out, "RunId") + s.ContainsOnSameLine(out, "TaskQueue", "tq") + s.ContainsOnSameLine(out, "Type", "DevWorkflow") + s.ContainsOnSameLine(out, "Namespace", "default") + + // Check that new workflow was started with expected workflow ID. + run := s.Client.GetWorkflow(s.Context, wfId, "") + s.Equal(wfId, run.GetID()) + + // Run workflow, block on signal. + var sigReceived any + s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Wait for workflow to complete. + var wfReturn any + err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) + s.NoError(err) + + // Expect workflow to have received signal and given inputs from signal-with-start. + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { + // Run workflow, block on signal. + var sigReceived any + s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Start workflow + run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") + s.NoError(err) + + wfId := run.GetID() + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"workflow": "workflow-input"}`, + "--task-queue", s.Worker().Options.TaskQueue, + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + s.NoError(res.Err) + + // Confirm text output has key/vals as expected + out := res.Stdout.String() + s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.Contains(out, "RunId") + s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue) + s.ContainsOnSameLine(out, "Type", "DevWorkflow") + s.ContainsOnSameLine(out, "Namespace", "default") + + // Wait for workflow to complete. + var ret any + s.NoError(run.Get(s.Context, &ret)) + + // Expect workflow to have not been started by the signal-with-start command. + s.Equal("not-signal-with-start-input", ret) + // Expect signal to have been received with given input. + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) +} diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 69ed8acd5..81ae0721d 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -1241,88 +1241,3 @@ func (s *SharedServerSuite) testWorkflowMetadata(json bool) { s.Error(res.Err) s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed") } - -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { - res := s.Execute( - "workflow", "signal-with-start", - "--type", "wfType", - "--task-queue", "tq", - "--signal-name", "sigName", - ) - s.ErrorContains(res.Err, "--workflow-id flag must be provided") -} - -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { - wfId := uuid.NewString() - - // Send signal-with-start command. - res := s.Execute( - "workflow", "signal-with-start", - "--address", s.Address(), - "--workflow-id", wfId, - "--type", "DevWorkflow", - "--input", `{"wf-signal-with-start": "workflow-input"}`, - "--task-queue", "tq", - "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, - ) - - s.NoError(res.Err) - - // Check that new workflow was started with expected workflow ID. - run := s.Client.GetWorkflow(s.Context, wfId, "") - s.Equal(wfId, run.GetID()) - - // Run workflow, block on signal. - var sigReceived any - s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil - }) - - // Wait for workflow to complete. - var wfReturn any - err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) - s.NoError(err) - - // Expect workflow to have received signal and given inputs from signal-with-start. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) - s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) -} - -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { - // Run workflow, block on signal. - var sigReceived any - s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil - }) - - // Start workflow - run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") - s.NoError(err) - - wfId := run.GetID() - - // Send signal-with-start command. - res := s.Execute( - "workflow", "signal-with-start", - "--address", s.Address(), - "--workflow-id", wfId, - "--type", "DevWorkflow", - "--input", `{"workflow": "workflow-input"}`, - "--task-queue", s.Worker().Options.TaskQueue, - "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, - ) - s.NoError(res.Err) - - // Wait for workflow to complete. - var ret any - s.NoError(run.Get(s.Context, &ret)) - - // Expect workflow to have not been started by the signal-with-start command. - s.Equal("not-signal-with-start-input", ret) - // Expect signal to have been received with given input. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) -} From fa0ea380ceefca21cac5b8d5db6dea9c36d9f686 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 21 Feb 2025 12:00:37 -0500 Subject: [PATCH 4/4] remove error wrapping --- temporalcli/commands.workflow_exec.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index 2caf7f9ba..08393a7d9 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -189,7 +189,7 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s return err } cctx.Printer.Println(color.MagentaString("Running execution:")) - err = cctx.Printer.PrintStructured(struct { + return cctx.Printer.PrintStructured(struct { WorkflowId string `json:"workflowId"` RunId string `json:"runId"` Type string `json:"type"` @@ -202,10 +202,6 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s Namespace: c.Parent.Namespace, TaskQueue: c.TaskQueue, }, printer.StructuredOptions{}) - if err != nil { - return fmt.Errorf("failed printing: %w", err) - } - return nil } type workflowJSONResult struct {