diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 044143ad6..4fcfd717b 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -2704,6 +2704,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) @@ -3133,6 +3134,52 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork return &s } +type TemporalWorkflowSignalWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + SignalName string + SignalInput []string + SignalInputFile []string + SignalInputMeta []string + SignalInputBase64 bool +} + +func NewTemporalWorkflowSignalWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalWithStartCommand { + var s TemporalWorkflowSignalWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "signal-with-start [flags]" + s.Command.Short = "Send a message to a Workflow Execution, start the execution if it isn't running" + if hasHighlighting { + s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n\x1b[1mtemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n```\ntemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.SignalName, "signal-name", "", "Signal name. Required. Aliased as \"--signal-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "signal-name") + s.Command.Flags().StringArrayVar(&s.SignalInput, "signal-input", nil, "Signal input value. Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.SignalInputFile, "signal-input-file", nil, "A path or paths for input file(s). Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.SignalInputMeta, "signal-input-meta", nil, "Input signal payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.SignalInputBase64, "signal-input-base64", false, "Assume signal inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "signal-type": "signal-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowStackCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command diff --git a/temporalcli/commands.go b/temporalcli/commands.go index b9ba6322a..217f2f299 100644 --- a/temporalcli/commands.go +++ b/temporalcli/commands.go @@ -23,8 +23,10 @@ import ( "github.com/temporalio/cli/temporalcli/internal/printer" "github.com/temporalio/ui-server/v2/server/version" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/failure/v1" "go.temporal.io/api/temporalproto" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/temporal" "go.temporal.io/server/common/headers" "google.golang.org/grpc" @@ -589,3 +591,20 @@ func fromApplicationError(err *temporal.ApplicationError) (*structuredError, err Details: deets, }, nil } + +func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { + if len(in) == 0 { + return nil, nil + } + // search attributes always use default dataconverter + dc := converter.GetDefaultDataConverter() + out := make(map[string]*commonpb.Payload, len(in)) + for key, val := range in { + payload, err := dc.ToPayload(val) + if err != nil { + return nil, err + } + out[key] = payload + } + return out, nil +} diff --git a/temporalcli/commands.schedule.go b/temporalcli/commands.schedule.go index e458ba918..3d645cb2e 100644 --- a/temporalcli/commands.schedule.go +++ b/temporalcli/commands.schedule.go @@ -16,7 +16,6 @@ import ( schedpb "go.temporal.io/api/schedule/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" "go.temporal.io/server/common/primitives/timestamp" ) @@ -257,7 +256,7 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c if err != nil { return nil, err } - untypedSearchAttributes, err := encodeSearchAttributesToPayloads(opts.SearchAttributes) + untypedSearchAttributes, err := encodeMapToPayloads(opts.SearchAttributes) if err != nil { return nil, err } @@ -605,20 +604,3 @@ func formatDuration(d time.Duration) string { s = strings.TrimSpace(s) return s } - -func encodeSearchAttributesToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { - if len(in) == 0 { - return nil, nil - } - // search attributes always use default dataconverter - dc := converter.GetDefaultDataConverter() - out := make(map[string]*commonpb.Payload, len(in)) - for key, val := range in { - payload, err := dc.ToPayload(val) - if err != nil { - return nil, err - } - out[key] = payload - } - return out, nil -} diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index b885e4cac..08393a7d9 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -12,12 +12,18 @@ import ( "time" "github.com/fatih/color" + "github.com/google/uuid" "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/temporalproto" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "google.golang.org/protobuf/types/known/durationpb" ) func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error { @@ -92,6 +98,112 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string return err } +func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error { + if c.SharedWorkflowStartOptions.WorkflowId == "" { + return fmt.Errorf("--workflow-id flag must be provided") + } + + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions) + if err != nil { + return err + } + wfInput, err := c.buildRawInputPayloads() + if err != nil { + return err + } + + signalPayloadInputOpts := PayloadInputOptions{ + Input: c.SignalInput, + InputFile: c.SignalInputFile, + InputMeta: c.InputMeta, + InputBase64: c.SignalInputBase64, + } + signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() + if err != nil { + return err + } + + var retryPolicy *common.RetryPolicy + if wfStartOpts.RetryPolicy != nil { + retryPolicy = &commonpb.RetryPolicy{ + MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval), + InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval), + BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient, + MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts, + NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes, + } + } + var memo *common.Memo + if wfStartOpts.Memo != nil { + fields, err := encodeMapToPayloads(wfStartOpts.Memo) + if err != nil { + return err + } + memo = &common.Memo{Fields: fields} + } + var searchAttr *common.SearchAttributes + if wfStartOpts.SearchAttributes != nil { + fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes) + if err != nil { + return err + } + searchAttr = &common.SearchAttributes{IndexedFields: fields} + } + + if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) { + cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command") + } + + // We have to use the raw signal service call here because the Go SDK's + // signal-with-start call doesn't accept multiple signal arguments. + resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution( + cctx, + &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Namespace: c.Parent.Namespace, + RequestId: uuid.NewString(), + WorkflowId: c.WorkflowId, + WorkflowType: &common.WorkflowType{Name: c.Type}, + TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: wfInput, + WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout), + WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout), + WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout), + SignalName: c.SignalName, + SignalInput: signalInput, + Identity: clientIdentity(), + RetryPolicy: retryPolicy, + CronSchedule: wfStartOpts.CronSchedule, + Memo: memo, + SearchAttributes: searchAttr, + WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy, + WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy, + }, + ) + if err != nil { + return err + } + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + TaskQueue string `json:"taskQueue"` + }{ + WorkflowId: c.WorkflowId, + RunId: resp.RunId, + Type: c.Type, + Namespace: c.Parent.Namespace, + TaskQueue: c.TaskQueue, + }, printer.StructuredOptions{}) +} + type workflowJSONResult struct { WorkflowId string `json:"workflowId"` RunId string `json:"runId"` diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index 88a0a6d70..e706d9465 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -664,3 +664,104 @@ func (s *SharedServerSuite) TestWorkflow_Execute_NullValue() { s.ContainsOnSameLine(out, "Status", "COMPLETED") s.ContainsOnSameLine(out, "Result", `{"foo":null}`) } + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { + res := s.Execute( + "workflow", "signal-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--signal-name", "sigName", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { + wfId := uuid.NewString() + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"wf-signal-with-start": "workflow-input"}`, + "--task-queue", "tq", + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + + s.NoError(res.Err) + + // Confirm text output has key/vals as expected + out := res.Stdout.String() + s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.Contains(out, "RunId") + s.ContainsOnSameLine(out, "TaskQueue", "tq") + s.ContainsOnSameLine(out, "Type", "DevWorkflow") + s.ContainsOnSameLine(out, "Namespace", "default") + + // Check that new workflow was started with expected workflow ID. + run := s.Client.GetWorkflow(s.Context, wfId, "") + s.Equal(wfId, run.GetID()) + + // Run workflow, block on signal. + var sigReceived any + s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Wait for workflow to complete. + var wfReturn any + err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) + s.NoError(err) + + // Expect workflow to have received signal and given inputs from signal-with-start. + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { + // Run workflow, block on signal. + var sigReceived any + s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Start workflow + run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") + s.NoError(err) + + wfId := run.GetID() + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"workflow": "workflow-input"}`, + "--task-queue", s.Worker().Options.TaskQueue, + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + s.NoError(res.Err) + + // Confirm text output has key/vals as expected + out := res.Stdout.String() + s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.Contains(out, "RunId") + s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue) + s.ContainsOnSameLine(out, "Type", "DevWorkflow") + s.ContainsOnSameLine(out, "Namespace", "default") + + // Wait for workflow to complete. + var ret any + s.NoError(run.Get(s.Context, &ret)) + + // Expect workflow to have not been started by the signal-with-start command. + s.Equal("not-signal-with-start-input", ret) + // Expect signal to have been received with given input. + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) +} diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 7a8878e3d..19edcf4f1 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3156,6 +3156,59 @@ commands: aliases: - type + - name: temporal workflow signal-with-start + summary: Send a message to a Workflow Execution, start the execution if it isn't running + description: | + Send an asynchronous notification (Signal) to a Workflow Execution. + If the Workflow Execution is not running or is not found, it starts the + workflow then sends the signal. + + ``` + temporal workflow signal-with-start \ + --signal-name YourSignal \ + --signal-input '{"some-key": "some-value"}' \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --input '{"some-key": "some-value"}' + ``` + option-sets: + # workflow-id is "required" (runtime check) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: signal-name + type: string + description: Signal name. + required: true + aliases: + - signal-type + - name: signal-input + type: string[] + description: | + Signal input value. + Use JSON content or set --signal-input-meta to override. + Can't be combined with --signal-input-file. + Can be passed multiple times to pass multiple arguments. + - name: signal-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --signal-input-meta to override. + Can't be combined with --signal-input. + Can be passed multiple times to pass multiple arguments. + - name: signal-input-meta + type: string[] + description: | + Input signal payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: signal-input-base64 + type: bool + description: | + Assume signal inputs are base64-encoded and attempt to decode them. + - name: temporal workflow stack summary: Trace a Workflow Execution description: | @@ -3707,6 +3760,7 @@ option-sets: - Fail - UseExisting - TerminateExisting + - name: payload-input options: - name: input diff --git a/temporalcli/internal/cmd/gen-commands/main.go b/temporalcli/internal/cmd/gen-commands/main.go index 00df1848d..817ff2968 100644 --- a/temporalcli/internal/cmd/gen-commands/main.go +++ b/temporalcli/internal/cmd/gen-commands/main.go @@ -21,7 +21,7 @@ func run() error { _, file, _, _ := runtime.Caller(0) commandsDir := filepath.Join(file, "../../../../") - // Parse markdown + // Parse YAML cmds, err := commandsgen.ParseCommands() if err != nil { return fmt.Errorf("failed parsing markdown: %w", err)