Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2704,6 +2704,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command)
Expand Down Expand Up @@ -3133,6 +3134,52 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork
return &s
}

type TemporalWorkflowSignalWithStartCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
SharedWorkflowStartOptions
WorkflowStartOptions
PayloadInputOptions
SignalName string
SignalInput []string
SignalInputFile []string
SignalInputMeta []string
SignalInputBase64 bool
}

func NewTemporalWorkflowSignalWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalWithStartCommand {
var s TemporalWorkflowSignalWithStartCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "signal-with-start [flags]"
s.Command.Short = "Send a message to a Workflow Execution, start the execution if it isn't running"
if hasHighlighting {
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n\x1b[1mtemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n```\ntemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.SignalName, "signal-name", "", "Signal name. Required. Aliased as \"--signal-type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "signal-name")
s.Command.Flags().StringArrayVar(&s.SignalInput, "signal-input", nil, "Signal input value. Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input-file. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.SignalInputFile, "signal-input-file", nil, "A path or paths for input file(s). Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.SignalInputMeta, "signal-input-meta", nil, "Input signal payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.")
s.Command.Flags().BoolVar(&s.SignalInputBase64, "signal-input-base64", false, "Assume signal inputs are base64-encoded and attempt to decode them.")
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"name": "type",
"signal-type": "signal-name",
}))
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowStackCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Expand Down
19 changes: 19 additions & 0 deletions temporalcli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/temporalio/cli/temporalcli/internal/printer"
"github.com/temporalio/ui-server/v2/server/version"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/failure/v1"
"go.temporal.io/api/temporalproto"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/temporal"
"go.temporal.io/server/common/headers"
"google.golang.org/grpc"
Expand Down Expand Up @@ -589,3 +591,20 @@ func fromApplicationError(err *temporal.ApplicationError) (*structuredError, err
Details: deets,
}, nil
}

func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
if len(in) == 0 {
return nil, nil
}
// search attributes always use default dataconverter
dc := converter.GetDefaultDataConverter()
out := make(map[string]*commonpb.Payload, len(in))
for key, val := range in {
payload, err := dc.ToPayload(val)
if err != nil {
return nil, err
}
out[key] = payload
}
return out, nil
}
20 changes: 1 addition & 19 deletions temporalcli/commands.schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
schedpb "go.temporal.io/api/schedule/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/server/common/primitives/timestamp"
)

Expand Down Expand Up @@ -257,7 +256,7 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c
if err != nil {
return nil, err
}
untypedSearchAttributes, err := encodeSearchAttributesToPayloads(opts.SearchAttributes)
untypedSearchAttributes, err := encodeMapToPayloads(opts.SearchAttributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -605,20 +604,3 @@ func formatDuration(d time.Duration) string {
s = strings.TrimSpace(s)
return s
}

func encodeSearchAttributesToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
if len(in) == 0 {
return nil, nil
}
// search attributes always use default dataconverter
dc := converter.GetDefaultDataConverter()
out := make(map[string]*commonpb.Payload, len(in))
for key, val := range in {
payload, err := dc.ToPayload(val)
if err != nil {
return nil, err
}
out[key] = payload
}
return out, nil
}
112 changes: 112 additions & 0 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ import (
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/temporalproto"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/types/known/durationpb"
)

func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -92,6 +98,112 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string
return err
}

func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error {
if c.SharedWorkflowStartOptions.WorkflowId == "" {
return fmt.Errorf("--workflow-id flag must be provided")
}

cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions)
if err != nil {
return err
}
wfInput, err := c.buildRawInputPayloads()
if err != nil {
return err
}

signalPayloadInputOpts := PayloadInputOptions{
Input: c.SignalInput,
InputFile: c.SignalInputFile,
InputMeta: c.InputMeta,
InputBase64: c.SignalInputBase64,
}
signalInput, err := signalPayloadInputOpts.buildRawInputPayloads()
if err != nil {
return err
}

var retryPolicy *common.RetryPolicy
if wfStartOpts.RetryPolicy != nil {
retryPolicy = &commonpb.RetryPolicy{
MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval),
InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval),
BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient,
MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts,
NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes,
}
}
var memo *common.Memo
if wfStartOpts.Memo != nil {
fields, err := encodeMapToPayloads(wfStartOpts.Memo)
if err != nil {
return err
}
memo = &common.Memo{Fields: fields}
}
var searchAttr *common.SearchAttributes
if wfStartOpts.SearchAttributes != nil {
fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes)
if err != nil {
return err
}
searchAttr = &common.SearchAttributes{IndexedFields: fields}
}

if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) {
cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command")
}

// We have to use the raw signal service call here because the Go SDK's
// signal-with-start call doesn't accept multiple signal arguments.
resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution(
cctx,
&workflowservice.SignalWithStartWorkflowExecutionRequest{
Namespace: c.Parent.Namespace,
RequestId: uuid.NewString(),
WorkflowId: c.WorkflowId,
WorkflowType: &common.WorkflowType{Name: c.Type},
TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Input: wfInput,
WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout),
WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout),
WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout),
SignalName: c.SignalName,
SignalInput: signalInput,
Identity: clientIdentity(),
RetryPolicy: retryPolicy,
CronSchedule: wfStartOpts.CronSchedule,
Memo: memo,
SearchAttributes: searchAttr,
WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy,
},
)
if err != nil {
return err
}
cctx.Printer.Println(color.MagentaString("Running execution:"))
return cctx.Printer.PrintStructured(struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Type string `json:"type"`
Namespace string `json:"namespace"`
TaskQueue string `json:"taskQueue"`
}{
WorkflowId: c.WorkflowId,
RunId: resp.RunId,
Type: c.Type,
Namespace: c.Parent.Namespace,
TaskQueue: c.TaskQueue,
}, printer.StructuredOptions{})
}

type workflowJSONResult struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Expand Down
101 changes: 101 additions & 0 deletions temporalcli/commands.workflow_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,3 +664,104 @@ func (s *SharedServerSuite) TestWorkflow_Execute_NullValue() {
s.ContainsOnSameLine(out, "Status", "COMPLETED")
s.ContainsOnSameLine(out, "Result", `{"foo":null}`)
}

func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() {
res := s.Execute(
"workflow", "signal-with-start",
"--type", "wfType",
"--task-queue", "tq",
"--signal-name", "sigName",
)
s.ErrorContains(res.Err, "--workflow-id flag must be provided")
}

func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() {
wfId := uuid.NewString()

// Send signal-with-start command.
res := s.Execute(
"workflow", "signal-with-start",
"--address", s.Address(),
"--workflow-id", wfId,
"--type", "DevWorkflow",
"--input", `{"wf-signal-with-start": "workflow-input"}`,
"--task-queue", "tq",
"--signal-name", "sigName",
"--signal-input", `{"signal-with-start": "signal-input"}`,
)

s.NoError(res.Err)

// Confirm text output has key/vals as expected
out := res.Stdout.String()
s.ContainsOnSameLine(out, "WorkflowId", wfId)
s.Contains(out, "RunId")
s.ContainsOnSameLine(out, "TaskQueue", "tq")
s.ContainsOnSameLine(out, "Type", "DevWorkflow")
s.ContainsOnSameLine(out, "Namespace", "default")

// Check that new workflow was started with expected workflow ID.
run := s.Client.GetWorkflow(s.Context, wfId, "")
s.Equal(wfId, run.GetID())

// Run workflow, block on signal.
var sigReceived any
s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) {
workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived)
return wfInput, nil
})

// Wait for workflow to complete.
var wfReturn any
err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn)
s.NoError(err)

// Expect workflow to have received signal and given inputs from signal-with-start.
s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived)
s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn)
}

func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() {
// Run workflow, block on signal.
var sigReceived any
s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) {
workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived)
return wfInput, nil
})

// Start workflow
run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input")
s.NoError(err)

wfId := run.GetID()

// Send signal-with-start command.
res := s.Execute(
"workflow", "signal-with-start",
"--address", s.Address(),
"--workflow-id", wfId,
"--type", "DevWorkflow",
"--input", `{"workflow": "workflow-input"}`,
"--task-queue", s.Worker().Options.TaskQueue,
"--signal-name", "sigName",
"--signal-input", `{"signal-with-start": "signal-input"}`,
)
s.NoError(res.Err)

// Confirm text output has key/vals as expected
out := res.Stdout.String()
s.ContainsOnSameLine(out, "WorkflowId", wfId)
s.Contains(out, "RunId")
s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue)
s.ContainsOnSameLine(out, "Type", "DevWorkflow")
s.ContainsOnSameLine(out, "Namespace", "default")

// Wait for workflow to complete.
var ret any
s.NoError(run.Get(s.Context, &ret))

// Expect workflow to have not been started by the signal-with-start command.
s.Equal("not-signal-with-start-input", ret)
// Expect signal to have been received with given input.
s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived)
}
Loading