diff --git a/go.mod b/go.mod index e28b3ea44..02684e571 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/temporalio/cli go 1.23.2 require ( - github.com/alitto/pond v1.9.1 + github.com/alitto/pond v1.9.2 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/dustin/go-humanize v1.0.1 github.com/fatih/color v1.18.0 @@ -12,14 +12,14 @@ require ( github.com/nexus-rpc/sdk-go v0.2.0 github.com/olekukonko/tablewriter v0.0.5 github.com/spf13/cobra v1.8.1 - github.com/spf13/pflag v1.0.5 + github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 - github.com/temporalio/ui-server/v2 v2.32.0 + github.com/temporalio/ui-server/v2 v2.35.0 go.temporal.io/api v1.44.1 go.temporal.io/sdk v1.32.1 - go.temporal.io/server v1.27.0-128.0 + go.temporal.io/server v1.27.0-128.4 google.golang.org/grpc v1.70.0 - google.golang.org/protobuf v1.36.4 + google.golang.org/protobuf v1.36.5 gopkg.in/yaml.v3 v3.0.1 ) @@ -53,7 +53,7 @@ require ( github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-jose/go-jose/v4 v4.0.2 // indirect + github.com/go-jose/go-jose/v4 v4.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-sql-driver/mysql v1.8.1 // indirect @@ -68,7 +68,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/gorilla/mux v1.8.1 // indirect - github.com/gorilla/securecookie v1.1.1 // indirect + github.com/gorilla/securecookie v1.1.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect diff --git a/go.sum b/go.sum index d0b6f8134..4152bc815 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= -github.com/alitto/pond v1.9.1 h1:OfCpIrMyrWJpn34f647DcFmUxjK8+7Nu3eoVN/WTP+o= -github.com/alitto/pond v1.9.1/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI= +github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs= +github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= @@ -112,8 +112,8 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/go-faker/faker/v4 v4.5.0 h1:ARzAY2XoOL9tOUK+KSecUQzyXQsUaZHefjyF8x6YFHc= github.com/go-faker/faker/v4 v4.5.0/go.mod h1:p3oq1GRjG2PZ7yqeFFfQI20Xm61DoBDlCA8RiSyZ48M= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= -github.com/go-jose/go-jose/v4 v4.0.2 h1:R3l3kkBds16bO7ZFAEEcofK0MkrAJt3jlJznWZG0nvk= -github.com/go-jose/go-jose/v4 v4.0.2/go.mod h1:WVf9LFMHh/QVrmqrOfqun0C45tMe3RoiKJMPvgWwLfY= +github.com/go-jose/go-jose/v4 v4.0.4 h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E= +github.com/go-jose/go-jose/v4 v4.0.4/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -152,6 +152,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= @@ -168,8 +170,8 @@ github.com/googleapis/gax-go/v2 v2.14.1 h1:hb0FFeiPaQskmvakKu5EbCbpntQn48jyHuvrk github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= -github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kXD8ePA= +github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 h1:VD1gqscl4nYs1YxVuSdemTrSgTKrwOWDK0FVFMqm+Cg= @@ -295,8 +297,9 @@ github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= @@ -315,8 +318,8 @@ github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb/go.mod h1:143 github.com/temporalio/tchannel-go v1.22.1-0.20220818200552-1be8d8cffa5b/go.mod h1:c+V9Z/ZgkzAdyGvHrvC5AsXgN+M9Qwey04cBdKYzV7U= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938 h1:sEJGhmDo+0FaPWM6f0v8Tjia0H5pR6/Baj6+kS78B+M= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938/go.mod h1:ezRQRwu9KQXy8Wuuv1aaFFxoCNz5CeNbVOOkh3xctbY= -github.com/temporalio/ui-server/v2 v2.32.0 h1:mR6eet9n4eRkGgHcZqaJdXWK5sfQguN4LoWxQXsqpY0= -github.com/temporalio/ui-server/v2 v2.32.0/go.mod h1:b8whRt0/lbgNDzG7alSdiDzXFO8Fk783eRMhIycWtn8= +github.com/temporalio/ui-server/v2 v2.35.0 h1:9msJGma2dEiwI+nI3iU7TfpnmgRCQv+R8Yd8JWqurC4= +github.com/temporalio/ui-server/v2 v2.35.0/go.mod h1:suM9z9+8208achw/6ZWd3/mF4k9x567mIBAg10S5lf4= github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg= github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber-common/bark v1.0.0/go.mod h1:g0ZuPcD7XiExKHynr93Q742G/sbrdVQkghrqLGOoFuY= @@ -371,8 +374,8 @@ go.temporal.io/api v1.44.1 h1:sb5Hq08AB0WtYvfLJMiWmHzxjqs2b+6Jmzg4c8IOeng= go.temporal.io/api v1.44.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= go.temporal.io/sdk v1.32.1 h1:slA8prhdFr4lxpsTcRusWVitD/cGjELfKUh0mBj73SU= go.temporal.io/sdk v1.32.1/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8= -go.temporal.io/server v1.27.0-128.0 h1:rGQGzr8lOCDhf5Z6jnbv5LRSZa3r1IyvPf1d2ywrDhY= -go.temporal.io/server v1.27.0-128.0/go.mod h1:YgN/yuBArvm7q5VEk2SXY+cGTTvDbt5AyH34DvEd3so= +go.temporal.io/server v1.27.0-128.4 h1:K3N4AMhEot0/E4eXA7hnYu+D/vC4UTF2cilVbxKqd8o= +go.temporal.io/server v1.27.0-128.4/go.mod h1:YgN/yuBArvm7q5VEk2SXY+cGTTvDbt5AyH34DvEd3so= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -549,8 +552,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= -google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= -google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 9928d645d..12a724051 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -131,6 +131,18 @@ func (v *WorkflowReferenceOptions) buildFlags(cctx *CommandContext, f *pflag.Fla f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID.") } +type DeploymentReferenceOptions struct { + SeriesName string + BuildId string +} + +func (v *DeploymentReferenceOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { + f.StringVar(&v.SeriesName, "series-name", "", "Series Name for a Worker Deployment. Required.") + _ = cobra.MarkFlagRequired(f, "series-name") + f.StringVar(&v.BuildId, "build-id", "", "Build ID for a Worker Deployment. Required.") + _ = cobra.MarkFlagRequired(f, "build-id") +} + type SingleWorkflowOrBatchOptions struct { WorkflowId string Query string @@ -266,6 +278,15 @@ func (v *NexusEndpointConfigOptions) buildFlags(cctx *CommandContext, f *pflag.F f.StringVar(&v.TargetUrl, "target-url", "", "An external Nexus Endpoint that receives forwarded Nexus requests. May be used as an alternative to `--target-namespace` and `--target-task-queue`.") } +type QueryModifiersOptions struct { + RejectCondition StringEnum +} + +func (v *QueryModifiersOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { + v.RejectCondition = NewStringEnum([]string{"not_open", "not_completed_cleanly"}, "") + f.Var(&v.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.") +} + type TemporalCommand struct { Command cobra.Command Env string @@ -296,6 +317,7 @@ func NewTemporalCommand(cctx *CommandContext) *TemporalCommand { s.Command.AddCommand(&NewTemporalScheduleCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalServerCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalTaskQueueCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowCommand(cctx, &s).Command) s.Command.PersistentFlags().StringVar(&s.Env, "env", "default", "Active environment name (`ENV`).") cctx.BindFlagEnvVar(s.Command.PersistentFlags().Lookup("env"), "TEMPORAL_ENV") @@ -329,9 +351,9 @@ func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.Use = "activity" s.Command.Short = "Complete, update, pause, unpause, reset or fail an Activity" if hasHighlighting { - s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m" + s.Command.Long = "Update an Activity's options, manage activity lifecycle or update\nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m" } else { - s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```" + s.Command.Long = "Update an Activity's options, manage activity lifecycle or update\nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```" } s.Command.Args = cobra.NoArgs s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command) @@ -551,28 +573,28 @@ func NewTemporalActivityUpdateOptionsCommand(cctx *CommandContext, parent *Tempo s.Command.Use = "update-options [flags]" s.Command.Short = "Update Activity options" if hasHighlighting { - s.Command.Long = "Update Activity options. Specify the Activity and Workflow IDs, and \noptions you want to update. \nUpdates are incremental, only changing the specified options. \n\n\x1b[1mtemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\x1b[0m" + s.Command.Long = "Update Activity options. Specify the Activity and Workflow IDs, and\noptions you want to update.\nUpdates are incremental, only changing the specified options.\n\n\x1b[1mtemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\x1b[0m" } else { - s.Command.Long = "Update Activity options. Specify the Activity and Workflow IDs, and \noptions you want to update. \nUpdates are incremental, only changing the specified options. \n\n```\ntemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\n\n```" + s.Command.Long = "Update Activity options. Specify the Activity and Workflow IDs, and\noptions you want to update.\nUpdates are incremental, only changing the specified options.\n\n```\ntemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\n\n```" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVar(&s.ActivityId, "activity-id", "", "Activity ID. Required.") _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") s.Command.Flags().StringVar(&s.TaskQueue, "task-queue", "", "Name of the task queue for the Activity.") s.ScheduleToCloseTimeout = 0 - s.Command.Flags().Var(&s.ScheduleToCloseTimeout, "schedule-to-close-timeout", "Indicates how long the caller is willing to wait for an activity completion. Limits how long retries will be attempted.") + s.Command.Flags().Var(&s.ScheduleToCloseTimeout, "schedule-to-close-timeout", "Indicates how long the caller is willing to wait for an activity completion. Limits how long retries will be attempted.") s.ScheduleToStartTimeout = 0 - s.Command.Flags().Var(&s.ScheduleToStartTimeout, "schedule-to-start-timeout", "Limits time an activity task can stay in a task queue before a worker picks it up. This timeout is always non retryable, as all a retry would achieve is to put it back into the same queue. Defaults to the schedule-to-close timeout or workflow execution timeout if not specified.") + s.Command.Flags().Var(&s.ScheduleToStartTimeout, "schedule-to-start-timeout", "Limits time an activity task can stay in a task queue before a worker picks it up. This timeout is always non retryable, as all a retry would achieve is to put it back into the same queue. Defaults to the schedule-to-close timeout or workflow execution timeout if not specified.") s.StartToCloseTimeout = 0 - s.Command.Flags().Var(&s.StartToCloseTimeout, "start-to-close-timeout", "Maximum time an activity is allowed to execute after being picked up by a worker. This timeout is always retryable.") + s.Command.Flags().Var(&s.StartToCloseTimeout, "start-to-close-timeout", "Maximum time an activity is allowed to execute after being picked up by a worker. This timeout is always retryable.") s.HeartbeatTimeout = 0 s.Command.Flags().Var(&s.HeartbeatTimeout, "heartbeat-timeout", "Maximum permitted time between successful worker heartbeats.") s.RetryInitialInterval = 0 s.Command.Flags().Var(&s.RetryInitialInterval, "retry-initial-interval", "Interval of the first retry. If retryBackoffCoefficient is 1.0 then it is used for all retries.") s.RetryMaximumInterval = 0 - s.Command.Flags().Var(&s.RetryMaximumInterval, "retry-maximum-interval", "Maximum interval between retries. Exponential backoff leads to interval increase. This value is the cap of the increase.") + s.Command.Flags().Var(&s.RetryMaximumInterval, "retry-maximum-interval", "Maximum interval between retries. Exponential backoff leads to interval increase. This value is the cap of the increase.") s.Command.Flags().Float32Var(&s.RetryBackoffCoefficient, "retry-backoff-coefficient", 0, "Coefficient used to calculate the next retry interval. The next retry interval is previous interval multiplied by the backoff coefficient. Must be 1 or larger.") - s.Command.Flags().IntVar(&s.RetryMaximumAttempts, "retry-maximum-attempts", 0, "Maximum number of attempts. When exceeded the retries stop even if not expired yet. Setting this value to 1 disables retries. Setting this value to 0 means unlimited attempts(up to the timeouts).") + s.Command.Flags().IntVar(&s.RetryMaximumAttempts, "retry-maximum-attempts", 0, "Maximum number of attempts. When exceeded the retries stop even if not expired yet. Setting this value to 1 disables retries. Setting this value to 0 means unlimited attempts(up to the timeouts).") s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.") s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags()) s.Command.Run = func(c *cobra.Command, args []string) { @@ -2498,6 +2520,164 @@ func NewTemporalTaskQueueVersioningReplaceRedirectRuleCommand(cctx *CommandConte return &s } +type TemporalWorkerCommand struct { + Parent *TemporalCommand + Command cobra.Command + ClientOptions +} + +func NewTemporalWorkerCommand(cctx *CommandContext, parent *TemporalCommand) *TemporalWorkerCommand { + var s TemporalWorkerCommand + s.Parent = parent + s.Command.Use = "worker" + s.Command.Short = "Read or update Worker state" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker is experimental. Worker commands are subject to |\n| change. |\n+---------------------------------------------------------------------+\n\nModify or read state associated with a Worker, for example,\nusing Worker Deployments commands:\n\n\x1b[1mtemporal worker deployment\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker is experimental. Worker commands are subject to |\n| change. |\n+---------------------------------------------------------------------+\n\nModify or read state associated with a Worker, for example,\nusing Worker Deployments commands:\n\n```\ntemporal worker deployment\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalWorkerDeploymentCommand(cctx, &s).Command) + s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags()) + return &s +} + +type TemporalWorkerDeploymentCommand struct { + Parent *TemporalWorkerCommand + Command cobra.Command +} + +func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWorkerCommand) *TemporalWorkerDeploymentCommand { + var s TemporalWorkerDeploymentCommand + s.Parent = parent + s.Command.Use = "deployment" + s.Command.Short = "Describe, list, and operate on Worker Deployments" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDeployment commands perform operations on Worker Deployments:\n\n\x1b[1mtemporal worker deployment [command] [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment list\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDeployment commands perform operations on Worker Deployments:\n\n```\ntemporal worker deployment [command] [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment list\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalWorkerDeploymentDescribeCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerDeploymentGetCurrentCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerDeploymentListCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerDeploymentSetCurrentCommand(cctx, &s).Command) + return &s +} + +type TemporalWorkerDeploymentDescribeCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + DeploymentReferenceOptions + ReportReachability bool +} + +func NewTemporalWorkerDeploymentDescribeCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentDescribeCommand { + var s TemporalWorkerDeploymentDescribeCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "describe [flags]" + s.Command.Short = "Show properties of a Worker Deployment" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDescribes properties of a Worker Deployment, such as whether it is\ncurrent, the non-empty list of its task queues, custom metadata if\npresent, and reachability status when requested.\n\n\x1b[1mtemporal worker deployment describe [options]\x1b[0m\n\nFor example, to also include reachability information:\n\n\x1b[1mtemporal worker deployment describe \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId \\\n --report-reachability\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDescribes properties of a Worker Deployment, such as whether it is\ncurrent, the non-empty list of its task queues, custom metadata if\npresent, and reachability status when requested.\n\n```\ntemporal worker deployment describe [options]\n```\n\nFor example, to also include reachability information:\n\n```\ntemporal worker deployment describe \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId \\\n --report-reachability\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().BoolVar(&s.ReportReachability, "report-reachability", false, "Include reachability information of a Worker Deployment.") + s.DeploymentReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkerDeploymentGetCurrentCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + SeriesName string +} + +func NewTemporalWorkerDeploymentGetCurrentCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentGetCurrentCommand { + var s TemporalWorkerDeploymentGetCurrentCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "get-current [flags]" + s.Command.Short = "Show the current Worker Deployment" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nGets the current Worker Deployment for a Deployment Series Name.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n\x1b[1mtemporal worker deployment get-current [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment get-current \\\n --series-name YourDeploymentSeriesName\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nGets the current Worker Deployment for a Deployment Series Name.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n```\ntemporal worker deployment get-current [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment get-current \\\n --series-name YourDeploymentSeriesName\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.SeriesName, "series-name", "", "Series Name for the current Worker Deployment. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "series-name") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkerDeploymentListCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + SeriesName string +} + +func NewTemporalWorkerDeploymentListCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentListCommand { + var s TemporalWorkerDeploymentListCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "list [flags]" + s.Command.Short = "Enumerate Worker Deployments in the client's namespace" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nList existing Worker Deployments in the client's namespace, optionally\nfiltering them by Deployment Series Name.\n\n\n\x1b[1mtemporal worker deployment list [options]\x1b[0m\n\nFor example, adding an optional filter:\n\n\x1b[1mtemporal worker deployment list \\\n --series-name YourDeploymentSeriesName\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nList existing Worker Deployments in the client's namespace, optionally\nfiltering them by Deployment Series Name.\n\n\n```\ntemporal worker deployment list [options]\n```\n\nFor example, adding an optional filter:\n\n```\ntemporal worker deployment list \\\n --series-name YourDeploymentSeriesName\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.SeriesName, "series-name", "", "Series Name to filter Worker Deployments.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkerDeploymentSetCurrentCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + DeploymentReferenceOptions + Metadata []string +} + +func NewTemporalWorkerDeploymentSetCurrentCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentSetCurrentCommand { + var s TemporalWorkerDeploymentSetCurrentCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "set-current [flags]" + s.Command.Short = "Change the current Worker Deployment" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nSets the current Deployment for a given Deployment Series.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n\x1b[1mtemporal worker deployment set-current [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment set-current \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nSets the current Deployment for a given Deployment Series.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n```\ntemporal worker deployment set-current [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment set-current \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringArrayVar(&s.Metadata, "metadata", nil, "Set deployment metadata using `KEY=\"VALUE\"` pairs. Keys must be identifiers, and values must be JSON values. For example: 'YourKey={\"your\": \"value\"}'. Can be passed multiple times.") + s.DeploymentReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowCommand struct { Parent *TemporalCommand Command cobra.Command @@ -2522,6 +2702,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowExecuteCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowFixHistoryJsonCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowListCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowMetadataCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowQueryCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowResetCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command) @@ -2532,6 +2713,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowUpdateOptionsCommand(cctx, &s).Command) s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags()) return &s } @@ -2745,13 +2927,42 @@ func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkfl return &s } +type TemporalWorkflowMetadataCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + WorkflowReferenceOptions + QueryModifiersOptions +} + +func NewTemporalWorkflowMetadataCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowMetadataCommand { + var s TemporalWorkflowMetadataCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "metadata [flags]" + s.Command.Short = "Query the Workflow for user-specified metadata" + if hasHighlighting { + s.Command.Long = "Issue a Query for and display user-set metadata like summary and\ndetails for a specific Workflow Execution:\n\n\x1b[1mtemporal workflow metadata \\\n --workflow-id YourWorkflowId\x1b[0m" + } else { + s.Command.Long = "Issue a Query for and display user-set metadata like summary and\ndetails for a specific Workflow Execution:\n\n```\ntemporal workflow metadata \\\n --workflow-id YourWorkflowId\n```" + } + s.Command.Args = cobra.NoArgs + s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.QueryModifiersOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowQueryCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command PayloadInputOptions WorkflowReferenceOptions - Name string - RejectCondition StringEnum + QueryModifiersOptions + Name string } func NewTemporalWorkflowQueryCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowQueryCommand { @@ -2768,10 +2979,9 @@ func NewTemporalWorkflowQueryCommand(cctx *CommandContext, parent *TemporalWorkf s.Command.Args = cobra.NoArgs s.Command.Flags().StringVar(&s.Name, "name", "", "Query Type/Name. Required. Aliased as \"--type\".") _ = cobra.MarkFlagRequired(s.Command.Flags(), "name") - s.RejectCondition = NewStringEnum([]string{"not_open", "not_completed_cleanly"}, "") - s.Command.Flags().Var(&s.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.") s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.QueryModifiersOptions.buildFlags(cctx, s.Command.Flags()) s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ "type": "name", })) @@ -3201,3 +3411,38 @@ func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *Tempora } return &s } + +type TemporalWorkflowUpdateOptionsCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SingleWorkflowOrBatchOptions + VersioningOverrideBehavior StringEnum + VersioningOverrideSeriesName string + VersioningOverrideBuildId string +} + +func NewTemporalWorkflowUpdateOptionsCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowUpdateOptionsCommand { + var s TemporalWorkflowUpdateOptionsCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "update-options [flags]" + s.Command.Short = "Change Workflow Execution Options" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worflow update-options is experimental. Workflow Execution |\n| properties are subject to change. |\n+---------------------------------------------------------------------+\n\nModify properties of Workflow Executions:\n\n\x1b[1mtemporal workflow update-options [options]\x1b[0m\n\nIt can override the Worker Deployment configuration of a\nWorkflow Execution, which controls Worker Versioning.\n\nFor example, to force Workers in the current Deployment execute the\nnext Workflow Task change behavior to \x1b[1mauto_upgrade\x1b[0m:\n\n\x1b[1mtemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior auto_upgrade\x1b[0m\n\nor to pin the workflow execution to a Worker Deployment, set behavior\nto \x1b[1mpinned\x1b[0m:\n\n\x1b[1mtemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior pinned \\\n --versioning-override-series-name YourDeploymentSeriesName \\\n --versioning-override-build-id YourDeploymentBuildId\x1b[0m\n\nTo remove any previous overrides, set the behavior to\n\x1b[1munspecified\x1b[0m:\n\n\x1b[1mtemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior unspecified\x1b[0m\n\nTo see the current override use \x1b[1mtemporal workflow describe\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worflow update-options is experimental. Workflow Execution |\n| properties are subject to change. |\n+---------------------------------------------------------------------+\n\nModify properties of Workflow Executions:\n\n```\ntemporal workflow update-options [options]\n```\n\nIt can override the Worker Deployment configuration of a\nWorkflow Execution, which controls Worker Versioning.\n\nFor example, to force Workers in the current Deployment execute the\nnext Workflow Task change behavior to `auto_upgrade`:\n\n```\ntemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior auto_upgrade\n```\n\nor to pin the workflow execution to a Worker Deployment, set behavior\nto `pinned`:\n\n```\ntemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior pinned \\\n --versioning-override-series-name YourDeploymentSeriesName \\\n --versioning-override-build-id YourDeploymentBuildId\n```\n\nTo remove any previous overrides, set the behavior to\n`unspecified`:\n\n```\ntemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior unspecified\n```\n\nTo see the current override use `temporal workflow describe`" + } + s.Command.Args = cobra.NoArgs + s.VersioningOverrideBehavior = NewStringEnum([]string{"unspecified", "pinned", "auto_upgrade"}, "") + s.Command.Flags().Var(&s.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: unspecified, pinned, auto_upgrade. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "versioning-override-behavior") + s.Command.Flags().StringVar(&s.VersioningOverrideSeriesName, "versioning-override-series-name", "", "Override Series Name for a Worker Deployment (Only for pinned).") + s.Command.Flags().StringVar(&s.VersioningOverrideBuildId, "versioning-override-build-id", "", "Override Build ID for a Worker Deployment (Only for pinned).") + s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} diff --git a/temporalcli/commands.taskqueue_test.go b/temporalcli/commands.taskqueue_test.go index dd8638442..346df180d 100644 --- a/temporalcli/commands.taskqueue_test.go +++ b/temporalcli/commands.taskqueue_test.go @@ -2,11 +2,12 @@ package temporalcli_test import ( "encoding/json" - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/workflow" "strings" "time" + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/workflow" + "github.com/google/uuid" "github.com/temporalio/cli/temporalcli" "go.temporal.io/api/enums/v1" @@ -261,6 +262,10 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { ) s.NoError(res.Err) + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) + // Text res = s.Execute( "task-queue", "describe", @@ -270,11 +275,14 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { "--task-queue", s.Worker().Options.TaskQueue, ) s.NoError(res.Err) - s.ContainsOnSameLine(res.Stdout.String(), "id1", "reachable") // No pollers on id1 s.NotContains(res.Stdout.String(), "now") + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) + res = s.Execute( "task-queue", "describe", "--select-unversioned", @@ -286,8 +294,12 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { s.NoError(res.Err) s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "unreachable") - s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "workflow", s.DevServer.Options.ClientOptions.Identity, "now", "100000") - s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "activity", s.DevServer.Options.ClientOptions.Identity, "now", "100000") + s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "workflow", s.DevServer.Options.ClientOptions.Identity, "2 seconds ago", "100000") + s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "activity", s.DevServer.Options.ClientOptions.Identity, "2 seconds ago", "100000") + + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) res = s.Execute( "task-queue", "describe", @@ -303,6 +315,10 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { // No pollers on id2 s.NotContains(res.Stdout.String(), "now") + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) + res = s.Execute( "task-queue", "describe", "--select-all-active", diff --git a/temporalcli/commands.taskqueue_versioning_rules_test.go b/temporalcli/commands.taskqueue_versioning_rules_test.go index f82179abb..0e25a94d7 100644 --- a/temporalcli/commands.taskqueue_versioning_rules_test.go +++ b/temporalcli/commands.taskqueue_versioning_rules_test.go @@ -165,9 +165,9 @@ func (s *SharedServerSuite) TestTaskQueue_Rules_BuildId() { ) s.NoError(res.Err) - s.ContainsOnSameLine(res.Stdout.String(), "0", "id2", "40", "now") - s.ContainsOnSameLine(res.Stdout.String(), "1", "id1", "100", "now") - s.ContainsOnSameLine(res.Stdout.String(), "id3", "id5", "now") + s.ContainsOnSameLine(res.Stdout.String(), "0", "id2", "40") + s.ContainsOnSameLine(res.Stdout.String(), "1", "id1", "100") + s.ContainsOnSameLine(res.Stdout.String(), "id3", "id5") // Safe mode @@ -192,9 +192,9 @@ func (s *SharedServerSuite) TestTaskQueue_Rules_BuildId() { ) s.NoError(res.Err) // Shown before replacing - s.ContainsOnSameLine(res.Stdout.String(), "id3", "id5", "now") + s.ContainsOnSameLine(res.Stdout.String(), "id3", "id5") // Shown after replacing - s.ContainsOnSameLine(res.Stdout.String(), "id3", "id9", "now") + s.ContainsOnSameLine(res.Stdout.String(), "id3", "id9") // Commit diff --git a/temporalcli/commands.worker.deployment.go b/temporalcli/commands.worker.deployment.go new file mode 100644 index 000000000..45a44ec04 --- /dev/null +++ b/temporalcli/commands.worker.deployment.go @@ -0,0 +1,377 @@ +package temporalcli + +import ( + "fmt" + "time" + + "github.com/fatih/color" + "github.com/temporalio/cli/temporalcli/internal/printer" + "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/client" +) + +type taskQueuesInfosRowType struct { + Name string `json:"name"` + Type string `json:"type"` + FirstPollerTime time.Time `json:"firstPollerTime"` +} + +type deploymentType struct { + SeriesName string `json:"seriesName"` + BuildID string `json:"buildId"` +} + +type formattedDeploymentInfoType struct { + Deployment deploymentType `json:"deployment"` + CreateTime time.Time `json:"createTime"` + IsCurrent bool `json:"isCurrent"` + TaskQueuesInfos []taskQueuesInfosRowType `json:"taskQueuesInfos,omitempty"` + Metadata map[string]*common.Payload `json:"metadata,omitempty"` +} + +type formattedDeploymentReachabilityInfoType struct { + DeploymentInfo formattedDeploymentInfoType `json:"deploymentInfo"` + Reachability string `json:"reachability"` + LastUpdateTime time.Time `json:"lastUpdateTime"` +} + +type formattedDeploymentListEntryType struct { + SeriesName string + BuildID string + CreateTime time.Time + IsCurrent bool +} + +type formattedDualDeploymentInfoType struct { + Previous formattedDeploymentInfoType `json:"previous"` + Current formattedDeploymentInfoType `json:"current"` +} + +func formatTaskQueuesInfos(tqis []client.DeploymentTaskQueueInfo) ([]taskQueuesInfosRowType, error) { + var tqiRows []taskQueuesInfosRowType + for _, tqi := range tqis { + tqTypeStr, err := taskQueueTypeToStr(tqi.Type) + if err != nil { + return tqiRows, err + } + tqiRows = append(tqiRows, taskQueuesInfosRowType{ + Name: tqi.Name, + Type: tqTypeStr, + FirstPollerTime: tqi.FirstPollerTime, + }) + } + return tqiRows, nil +} + +func deploymentInfoToRows(deploymentInfo client.DeploymentInfo) (formattedDeploymentInfoType, error) { + tqi, err := formatTaskQueuesInfos(deploymentInfo.TaskQueuesInfos) + if err != nil { + return formattedDeploymentInfoType{}, err + } + + return formattedDeploymentInfoType{ + Deployment: deploymentType{ + SeriesName: deploymentInfo.Deployment.SeriesName, + BuildID: deploymentInfo.Deployment.BuildID, + }, + CreateTime: deploymentInfo.CreateTime, + IsCurrent: deploymentInfo.IsCurrent, + TaskQueuesInfos: tqi, + Metadata: deploymentInfo.Metadata, + }, nil +} + +func printDeploymentInfo(cctx *CommandContext, deploymentInfo client.DeploymentInfo, msg string) error { + + fDeploymentInfo, err := deploymentInfoToRows(deploymentInfo) + if err != nil { + return err + } + + if !cctx.JSONOutput { + cctx.Printer.Println(color.MagentaString(msg)) + printMe := struct { + SeriesName string + BuildID string + CreateTime time.Time + IsCurrent bool + Metadata map[string]*common.Payload `cli:",cardOmitEmpty"` + }{ + SeriesName: deploymentInfo.Deployment.SeriesName, + BuildID: deploymentInfo.Deployment.BuildID, + CreateTime: deploymentInfo.CreateTime, + IsCurrent: deploymentInfo.IsCurrent, + Metadata: deploymentInfo.Metadata, + } + err := cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{}) + if err != nil { + return fmt.Errorf("displaying worker deployment info failed: %w", err) + } + + if len(deploymentInfo.TaskQueuesInfos) > 0 { + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Task Queues:")) + err := cctx.Printer.PrintStructured( + deploymentInfo.TaskQueuesInfos, + printer.StructuredOptions{Table: &printer.TableOptions{}}, + ) + if err != nil { + return fmt.Errorf("displaying task queues info failed: %w", err) + } + } + + return nil + } + + // json output + return cctx.Printer.PrintStructured(fDeploymentInfo, printer.StructuredOptions{}) +} + +func deploymentReachabilityTypeToStr(reachabilityType client.DeploymentReachability) (string, error) { + switch reachabilityType { + case client.DeploymentReachabilityUnspecified: + return "unspecified", nil + case client.DeploymentReachabilityReachable: + return "reachable", nil + case client.DeploymentReachabilityClosedWorkflows: + return "closed", nil + case client.DeploymentReachabilityUnreachable: + return "unreachable", nil + default: + return "", fmt.Errorf("unrecognized deployment reachability type: %d", reachabilityType) + } +} + +func printDeploymentReachabilityInfo(cctx *CommandContext, reachability client.DeploymentReachabilityInfo) error { + fDeploymentInfo, err := deploymentInfoToRows(reachability.DeploymentInfo) + if err != nil { + return err + } + + rTypeStr, err := deploymentReachabilityTypeToStr(reachability.Reachability) + if err != nil { + return err + } + + fReachabilityInfo := formattedDeploymentReachabilityInfoType{ + DeploymentInfo: fDeploymentInfo, + LastUpdateTime: reachability.LastUpdateTime, + Reachability: rTypeStr, + } + + if !cctx.JSONOutput { + err := printDeploymentInfo(cctx, reachability.DeploymentInfo, "Worker Deployment:") + if err != nil { + return err + } + + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Reachability:")) + printMe := struct { + LastUpdateTime time.Time + Reachability string + }{ + LastUpdateTime: fReachabilityInfo.LastUpdateTime, + Reachability: fReachabilityInfo.Reachability, + } + return cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{}) + } + + // json output + return cctx.Printer.PrintStructured(fReachabilityInfo, printer.StructuredOptions{}) +} + +func printDeploymentSetCurrentResponse(cctx *CommandContext, response client.DeploymentSetCurrentResponse) error { + + if !cctx.JSONOutput { + err := printDeploymentInfo(cctx, response.Previous, "Previous Worker Deployment:") + if err != nil { + return fmt.Errorf("displaying previous worker deployment info failed: %w", err) + } + + err = printDeploymentInfo(cctx, response.Current, "Current Worker Deployment:") + if err != nil { + return fmt.Errorf("displaying current worker deployment info failed: %w", err) + } + + return nil + } + + previous, err := deploymentInfoToRows(response.Previous) + if err != nil { + return fmt.Errorf("displaying previous worker deployment info failed: %w", err) + } + current, err := deploymentInfoToRows(response.Current) + if err != nil { + return fmt.Errorf("displaying current worker deployment info failed: %w", err) + } + + return cctx.Printer.PrintStructured(formattedDualDeploymentInfoType{ + Previous: previous, + Current: current, + }, printer.StructuredOptions{}) +} + +func (c *TemporalWorkerDeploymentDescribeCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + if c.ReportReachability { + // Expensive call, rate-limited by target method + resp, err := cl.DeploymentClient().GetReachability(cctx, client.DeploymentGetReachabilityOptions{ + Deployment: client.Deployment{ + SeriesName: c.SeriesName, + BuildID: c.BuildId, + }, + }) + if err != nil { + return fmt.Errorf("error describing worker deployment with reachability: %w", err) + } + + err = printDeploymentReachabilityInfo(cctx, resp) + if err != nil { + return err + } + } else { + resp, err := cl.DeploymentClient().Describe(cctx, client.DeploymentDescribeOptions{ + Deployment: client.Deployment{ + SeriesName: c.SeriesName, + BuildID: c.BuildId, + }, + }) + if err != nil { + return fmt.Errorf("error describing worker deployment: %w", err) + } + err = printDeploymentInfo(cctx, resp.DeploymentInfo, "Worker Deployment:") + if err != nil { + return err + } + + } + + return nil +} + +func (c *TemporalWorkerDeploymentGetCurrentCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + resp, err := cl.DeploymentClient().GetCurrent(cctx, client.DeploymentGetCurrentOptions{ + SeriesName: c.SeriesName, + }) + if err != nil { + return fmt.Errorf("error getting the current deployment: %w", err) + } + + err = printDeploymentInfo(cctx, resp.DeploymentInfo, "Current Worker Deployment:") + if err != nil { + return err + } + + return nil +} + +func (c *TemporalWorkerDeploymentListCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + res, err := cl.DeploymentClient().List(cctx, client.DeploymentListOptions{ + SeriesName: c.SeriesName, + }) + if err != nil { + return err + } + + // This is a listing command subject to json vs jsonl rules + cctx.Printer.StartList() + defer cctx.Printer.EndList() + + printTableOpts := printer.StructuredOptions{ + Table: &printer.TableOptions{}, + } + + // make artificial "pages" so we get better aligned columns + page := make([]*formattedDeploymentListEntryType, 0, 100) + + for res.HasNext() { + entry, err := res.Next() + if err != nil { + return err + } + listEntry := formattedDeploymentInfoType{ + Deployment: deploymentType{ + SeriesName: entry.Deployment.SeriesName, + BuildID: entry.Deployment.BuildID, + }, + CreateTime: entry.CreateTime, + IsCurrent: entry.IsCurrent, + } + if cctx.JSONOutput { + // For JSON dump one line of JSON per deployment + _ = cctx.Printer.PrintStructured(listEntry, printer.StructuredOptions{}) + } else { + // For non-JSON, we are doing a table for each page + page = append(page, &formattedDeploymentListEntryType{ + SeriesName: listEntry.Deployment.SeriesName, + BuildID: listEntry.Deployment.BuildID, + CreateTime: listEntry.CreateTime, + IsCurrent: listEntry.IsCurrent, + }) + if len(page) == cap(page) { + _ = cctx.Printer.PrintStructured(page, printTableOpts) + page = page[:0] + printTableOpts.Table.NoHeader = true + } + } + } + + if !cctx.JSONOutput { + // Last partial page for non-JSON + _ = cctx.Printer.PrintStructured(page, printTableOpts) + } + + return nil +} + +func (c *TemporalWorkerDeploymentSetCurrentCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + metadata, err := stringKeysJSONValues(c.Metadata, false) + if err != nil { + return fmt.Errorf("invalid metadata values: %w", err) + } + + resp, err := cl.DeploymentClient().SetCurrent(cctx, client.DeploymentSetCurrentOptions{ + Deployment: client.Deployment{ + SeriesName: c.SeriesName, + BuildID: c.BuildId, + }, + MetadataUpdate: client.DeploymentMetadataUpdate{ + UpsertEntries: metadata, + }, + }) + if err != nil { + return fmt.Errorf("error setting the current worker deployment: %w", err) + } + + err = printDeploymentSetCurrentResponse(cctx, resp) + if err != nil { + return err + } + + cctx.Printer.Println("Successfully setting the current worker deployment") + return nil +} diff --git a/temporalcli/commands.worker.deployment_test.go b/temporalcli/commands.worker.deployment_test.go new file mode 100644 index 000000000..08c61309d --- /dev/null +++ b/temporalcli/commands.worker.deployment_test.go @@ -0,0 +1,191 @@ +package temporalcli_test + +import ( + "encoding/base64" + "encoding/json" + "sort" + "time" + + "github.com/google/uuid" + "go.temporal.io/api/common/v1" +) + +type jsonTaskQueuesInfosRowType struct { + Name string `json:"name"` + Type string `json:"type"` + FirstPollerTime time.Time `json:"firstPollerTime"` +} + +type jsonDeploymentType struct { + SeriesName string `json:"seriesName"` + BuildID string `json:"buildId"` +} + +type jsonDeploymentInfoType struct { + Deployment jsonDeploymentType `json:"deployment"` + CreateTime time.Time `json:"createTime"` + IsCurrent bool `json:"isCurrent"` + TaskQueuesInfos []jsonTaskQueuesInfosRowType `json:"taskQueuesInfos,omitempty"` + Metadata map[string]*common.Payload `json:"metadata,omitempty"` +} + +type jsonDeploymentReachabilityInfoType struct { + DeploymentInfo jsonDeploymentInfoType `json:"deploymentInfo"` + Reachability string `json:"reachability"` + LastUpdateTime time.Time `json:"lastUpdateTime"` +} + +type jsonDeploymentListEntryType struct { + Deployment jsonDeploymentType `json:"deployment"` + CreateTime time.Time `json:"createTime"` + IsCurrent bool `json:"isCurrent"` +} + +func (s *SharedServerSuite) TestDeployment_Set_Current() { + seriesName := uuid.NewString() + buildId := uuid.NewString() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId, + "--metadata", "bar=1", + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "get-current", + "--address", s.Address(), + "--series-name", seriesName, + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), "SeriesName", seriesName) + s.ContainsOnSameLine(res.Stdout.String(), "BuildID", buildId) + s.ContainsOnSameLine(res.Stdout.String(), "IsCurrent", "true") + s.ContainsOnSameLine(res.Stdout.String(), "Metadata", "data:\"1\"") + + // json + res = s.Execute( + "worker", "deployment", "get-current", + "--address", s.Address(), + "--series-name", seriesName, + "--output", "json", + ) + s.NoError(res.Err) + + var jsonOut jsonDeploymentInfoType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId}, jsonOut.Deployment) + s.True(jsonOut.IsCurrent) + // "1" is "MQ==" + s.Equal("MQ==", base64.StdEncoding.EncodeToString(jsonOut.Metadata["bar"].GetData())) + // "json/plain" is "anNvbi9wbGFpbg==" + s.Equal("anNvbi9wbGFpbg==", base64.StdEncoding.EncodeToString(jsonOut.Metadata["bar"].GetMetadata()["encoding"])) +} + +func (s *SharedServerSuite) TestDeployment_List() { + seriesName := uuid.NewString() + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId2, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "list", + "--address", s.Address(), + "--series-name", seriesName, + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), seriesName, buildId1, "false") + s.ContainsOnSameLine(res.Stdout.String(), seriesName, buildId2, "true") + + // json + res = s.Execute( + "worker", "deployment", "list", + "--address", s.Address(), + "--series-name", seriesName, + "--output", "json", + ) + s.NoError(res.Err) + + var jsonOut []jsonDeploymentListEntryType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + sort.Slice(jsonOut, func(i, j int) bool { + return jsonOut[i].CreateTime.Before(jsonOut[j].CreateTime) + }) + s.Equal(len(jsonOut), 2) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId1}, jsonOut[0].Deployment) + s.True(!jsonOut[0].IsCurrent) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId2}, jsonOut[1].Deployment) + s.True(jsonOut[1].IsCurrent) +} + +func (s *SharedServerSuite) TestDeployment_Describe_Reachability() { + seriesName := uuid.NewString() + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId2, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "describe", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + "--report-reachability", + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), "SeriesName", seriesName) + s.ContainsOnSameLine(res.Stdout.String(), "BuildID", buildId1) + s.ContainsOnSameLine(res.Stdout.String(), "IsCurrent", "false") + s.ContainsOnSameLine(res.Stdout.String(), "Reachability", "unreachable") + + // json + res = s.Execute( + "worker", "deployment", "describe", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId2, + "--report-reachability", + "--output", "json", + ) + s.NoError(res.Err) + + var jsonOut jsonDeploymentReachabilityInfoType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId2}, jsonOut.DeploymentInfo.Deployment) + s.True(jsonOut.DeploymentInfo.IsCurrent) + s.Equal(jsonOut.Reachability, "reachable") +} diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 02316aad0..421fa354b 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -8,20 +8,27 @@ import ( "os/user" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/workflow" "github.com/fatih/color" "github.com/google/uuid" "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" + deploymentpb "go.temporal.io/api/deployment/v1" "go.temporal.io/api/enums/v1" "go.temporal.io/api/query/v1" + sdkpb "go.temporal.io/api/sdk/v1" "go.temporal.io/api/update/v1" + workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" + "google.golang.org/protobuf/types/known/fieldmaskpb" ) +const metadataQueryName = "__temporal_workflow_metadata" + func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error { cl, err := c.Parent.ClientOptions.dialClient(cctx) if err != nil { @@ -88,6 +95,122 @@ func (c *TemporalWorkflowDeleteCommand) run(cctx *CommandContext, args []string) return nil } +func (c *TemporalWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + if c.VersioningOverrideBehavior.Value == "unspecified" || c.VersioningOverrideBehavior.Value == "auto_upgrade" { + if c.VersioningOverrideSeriesName != "" { + return fmt.Errorf("cannot set deployment series name with %v behavior", c.VersioningOverrideBehavior) + } + if c.VersioningOverrideBuildId != "" { + return fmt.Errorf("cannot set deployment build ID with %v behavior", c.VersioningOverrideBehavior) + } + } + + if c.VersioningOverrideBehavior.Value == "pinned" { + if c.VersioningOverrideSeriesName == "" { + return fmt.Errorf("missing deployment series name with 'pinned' behavior") + } + if c.VersioningOverrideBuildId == "" { + return fmt.Errorf("missing deployment build ID with 'pinned' behavior") + } + } + + exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{}) + + // Run single or batch + if err != nil { + return err + } else if exec != nil { + behavior := workflow.VersioningBehaviorUnspecified + switch c.VersioningOverrideBehavior.Value { + case "unspecified": + case "pinned": + behavior = workflow.VersioningBehaviorPinned + case "auto_upgrade": + behavior = workflow.VersioningBehaviorAutoUpgrade + default: + return fmt.Errorf( + "invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", + c.VersioningOverrideBehavior, + ) + } + + _, err := cl.UpdateWorkflowExecutionOptions(cctx, client.UpdateWorkflowExecutionOptionsRequest{ + WorkflowId: exec.WorkflowId, + RunId: exec.RunId, + WorkflowExecutionOptionsChanges: client.WorkflowExecutionOptionsChanges{ + VersioningOverride: &client.VersioningOverride{ + Behavior: behavior, + Deployment: client.Deployment{ + SeriesName: c.VersioningOverrideSeriesName, + BuildID: c.VersioningOverrideBuildId, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to update workflow options: %w", err) + } + cctx.Printer.Println("Update workflow options succeeded") + } else { // Run batch + var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions + protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") + if err != nil { + return fmt.Errorf("invalid field mask: %w", err) + } + + behavior := enums.VERSIONING_BEHAVIOR_UNSPECIFIED + switch c.VersioningOverrideBehavior.Value { + case "unspecified": + case "pinned": + behavior = enums.VERSIONING_BEHAVIOR_PINNED + case "auto_upgrade": + behavior = enums.VERSIONING_BEHAVIOR_AUTO_UPGRADE + default: + return fmt.Errorf( + "invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", + c.VersioningOverrideBehavior, + ) + } + + deployment := &deploymentpb.Deployment{ + SeriesName: c.VersioningOverrideSeriesName, + BuildId: c.VersioningOverrideBuildId, + } + if c.VersioningOverrideSeriesName == "" && c.VersioningOverrideBuildId == "" { + // auto_upgrade needs a `nil` pointer + deployment = nil + } + + batchReq.Operation = &workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation{ + UpdateWorkflowOptionsOperation: &batch.BatchOperationUpdateWorkflowExecutionOptions{ + Identity: clientIdentity(), + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: behavior, + Deployment: deployment, + }, + }, + UpdateMask: protoMask, + }, + } + if err := startBatchJob(cctx, cl, batchReq); err != nil { + return err + } + } + return nil +} + +func (c *TemporalWorkflowMetadataCommand) run(cctx *CommandContext, _ []string) error { + return queryHelper(cctx, c.Parent, PayloadInputOptions{}, + metadataQueryName, c.RejectCondition, c.WorkflowReferenceOptions) +} + func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string) error { return queryHelper(cctx, c.Parent, c.PayloadInputOptions, c.Name, c.RejectCondition, c.WorkflowReferenceOptions) @@ -522,14 +645,63 @@ func queryHelper(cctx *CommandContext, return cctx.Printer.PrintStructured(result, printer.StructuredOptions{}) } - cctx.Printer.Println(color.MagentaString("Query result:")) - output := struct { - QueryResult json.RawMessage `cli:",cardOmitEmpty"` - }{} - output.QueryResult, err = cctx.MarshalFriendlyJSONPayloads(result.QueryResult) - if err != nil { - return fmt.Errorf("failed to marshal query result: %w", err) - } + if queryType == metadataQueryName { + var metadata sdkpb.WorkflowMetadata + err := UnmarshalProtoJSONWithOptions(result.QueryResult.Payloads[0].Data, &metadata, true) + if err != nil { + return fmt.Errorf("failed to unmarshal metadata: %w", err) + } + cctx.Printer.Println(color.MagentaString("Metadata:")) + + qDefs := metadata.GetDefinition().GetQueryDefinitions() + if len(qDefs) > 0 { + cctx.Printer.Println(printer.NonJSONIndent, color.MagentaString("Query Definitions:")) + err := cctx.Printer.PrintStructured(qDefs, printer.StructuredOptions{ + Table: &printer.TableOptions{NoHeader: true}, + NonJSONExtraIndent: 1, + }) + if err != nil { + return err + } + } + sigDefs := metadata.GetDefinition().GetSignalDefinitions() + if len(sigDefs) > 0 { + cctx.Printer.Println(printer.NonJSONIndent, color.MagentaString("Signal Definitions:")) + err := cctx.Printer.PrintStructured(sigDefs, printer.StructuredOptions{ + Table: &printer.TableOptions{NoHeader: true}, + NonJSONExtraIndent: 1, + }) + if err != nil { + return err + } + } + updDefs := metadata.GetDefinition().GetUpdateDefinitions() + if len(updDefs) > 0 { + cctx.Printer.Println(printer.NonJSONIndent, color.MagentaString("Update Definitions:")) + err := cctx.Printer.PrintStructured(updDefs, printer.StructuredOptions{ + Table: &printer.TableOptions{NoHeader: true}, + NonJSONExtraIndent: 1, + }) + if err != nil { + return err + } + } + if metadata.GetCurrentDetails() != "" { + cctx.Printer.Println(printer.NonJSONIndent, color.MagentaString("Current Details:")) + cctx.Printer.Println(printer.NonJSONIndent, printer.NonJSONIndent, + metadata.GetCurrentDetails()) + } + return nil + } else { + cctx.Printer.Println(color.MagentaString("Query result:")) + output := struct { + QueryResult json.RawMessage `cli:",cardOmitEmpty"` + }{} + output.QueryResult, err = cctx.MarshalFriendlyJSONPayloads(result.QueryResult) + if err != nil { + return fmt.Errorf("failed to marshal query result: %w", err) + } - return cctx.Printer.PrintStructured(output, printer.StructuredOptions{}) + return cctx.Printer.PrintStructured(output, printer.StructuredOptions{}) + } } diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 65c76839c..8ece140c6 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -11,9 +11,12 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/temporalio/cli/temporalcli" "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" "google.golang.org/grpc" ) @@ -423,6 +426,193 @@ func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() { s.Error(workflow.ErrCanceled, run.Get(s.Context, nil)) } +func (s *SharedServerSuite) TestWorkflow_Batch_Update_Options_Versioning_Override() { + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + seriesName := uuid.NewString() + // Workflow that waits to be canceled. + waitingWorkflow := func(ctx workflow.Context) error { + ctx.Done().Receive(ctx, nil) + return ctx.Err() + } + w := s.DevServer.StartDevWorker(s.Suite.T(), DevWorkerOptions{ + Worker: worker.Options{ + BuildID: buildId1, + UseBuildIDForVersioning: true, + DeploymentOptions: worker.DeploymentOptions{ + DeploymentSeriesName: seriesName, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, + }, + Workflows: []any{waitingWorkflow}, + }) + defer w.Stop() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + // Start workflows + numWorkflows := 5 + runs := make([]client.WorkflowRun, numWorkflows) + searchAttr := "keyword-" + uuid.NewString() + for i := range runs { + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: w.Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + waitingWorkflow, + ) + s.NoError(err) + runs[i] = run + } + + s.EventuallyWithT(func(t *assert.CollectT) { + for _, run := range runs { + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), buildId1) + assert.Contains(t, res.Stdout.String(), "Pinned") + } + }, 30*time.Second, 100*time.Millisecond) + + s.CommandHarness.Stdin.WriteString("y\n") + res = s.Execute( + "workflow", "update-options", + "--address", s.Address(), + "--query", "CustomKeywordField = '"+searchAttr+"'", + "--versioning-override-behavior", "pinned", + "--versioning-override-series-name", seriesName, + "--versioning-override-build-id", buildId2, + ) + s.NoError(res.Err) + + s.EventuallyWithT(func(t *assert.CollectT) { + for _, run := range runs { + // json + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + "--output", "json", + ) + assert.NoError(t, res.Err) + + var jsonResp workflowservice.DescribeWorkflowExecutionResponse + assert.NoError(t, temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) + versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + assert.NotNil(t, versioningInfo.VersioningOverride) + assert.Equal(t, seriesName+"."+buildId2, versioningInfo.VersioningOverride.PinnedVersion) + } + }, 30*time.Second, 100*time.Millisecond) +} + +func (s *SharedServerSuite) TestWorkflow_Update_Options_Versioning_Override() { + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + seriesName := uuid.NewString() + // Workflow that waits to be canceled. + waitingWorkflow := func(ctx workflow.Context) error { + ctx.Done().Receive(ctx, nil) + return ctx.Err() + } + w := s.DevServer.StartDevWorker(s.Suite.T(), DevWorkerOptions{ + Worker: worker.Options{ + BuildID: buildId1, + UseBuildIDForVersioning: true, + DeploymentOptions: worker.DeploymentOptions{ + DeploymentSeriesName: seriesName, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, + }, + Workflows: []any{waitingWorkflow}, + }) + defer w.Stop() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + // Start the workflow and wait until the operation is started. + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: w.Options.TaskQueue}, + waitingWorkflow, + ) + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), buildId1) + assert.Contains(t, res.Stdout.String(), "Pinned") + }, 30*time.Second, 100*time.Millisecond) + + res = s.Execute( + "workflow", "update-options", + "--address", s.Address(), + "-w", run.GetID(), + "--versioning-override-behavior", "pinned", + "--versioning-override-series-name", seriesName, + "--versioning-override-build-id", buildId2, + ) + s.NoError(res.Err) + + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), "OverrideBehavior", "Pinned") + // TODO(antlai-temporal): replace by new Deployment API + // These fields are deprecated, and not populated in latest server + // s.ContainsOnSameLine(res.Stdout.String(), "OverrideDeploymentSeriesName", seriesName) + // s.ContainsOnSameLine(res.Stdout.String(), "OverrideDeploymentBuildID", buildId2) + + // remove override + res = s.Execute( + "workflow", "update-options", + "--address", s.Address(), + "-w", run.GetID(), + "--versioning-override-behavior", "unspecified", + ) + s.NoError(res.Err) + + // json + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + "--output", "json", + ) + s.NoError(res.Err) + + var jsonResp workflowservice.DescribeWorkflowExecutionResponse + s.NoError(temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) + versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + s.Nil(versioningInfo.VersioningOverride) +} + func (s *SharedServerSuite) TestWorkflow_Update_Execute() { workflowUpdateTest{ s: s, @@ -965,3 +1155,96 @@ func (s *SharedServerSuite) testStackWorkflow(json bool) { s.Error(res.Err) s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed") } + +func (s *SharedServerSuite) TestWorkflow_MetadataJSON() { + s.testWorkflowMetadata(true) +} + +func (s *SharedServerSuite) TestWorkflow_Metadata() { + s.testWorkflowMetadata(false) +} + +func (s *SharedServerSuite) testWorkflowMetadata(json bool) { + // Make workflow wait for signal and then return it + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + done := false + workflow.Go(ctx, func(ctx workflow.Context) { + _ = workflow.Await(ctx, func() bool { + return done + }) + }) + workflow.SetQueryHandlerWithOptions(ctx, "my-query", func(arg string) (string, error) { + return "hi", nil + }, workflow.QueryHandlerOptions{Description: "q-desc"}) + workflow.SetUpdateHandlerWithOptions(ctx, "my-update", + func(ctx workflow.Context, arg string) (string, error) { return "hi", nil }, + workflow.UpdateHandlerOptions{Description: "upd-desc"}) + workflow.SetCurrentDetails(ctx, "current-deets") + workflow.GetSignalChannelWithOptions(ctx, "my-signal", + workflow.SignalChannelOptions{Description: "sig-desc"}).Receive(ctx, nil) + done = true + return nil, nil + + }) + + // Start the workflow + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + StaticSummary: "summie", + StaticDetails: "deets", + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + + args := []string{ + "workflow", "metadata", + "--address", s.Address(), + "-w", run.GetID(), + } + if json { + args = append(args, "-o", "json") + } + + res := s.Execute(args...) + s.NoError(res.Err) + if !json { + s.Contains(res.Stdout.String(), "Query Definitions:") + s.ContainsOnSameLine(res.Stdout.String(), "my-query", "q-desc") + s.Contains(res.Stdout.String(), "Signal Definitions:") + s.ContainsOnSameLine(res.Stdout.String(), "my-signal", "sig-desc") + s.Contains(res.Stdout.String(), "Update Definitions:") + s.ContainsOnSameLine(res.Stdout.String(), "my-update", "upd-desc") + s.Contains(res.Stdout.String(), "Current Details:") + s.Contains(res.Stdout.String(), "current-deets") + } else { + s.Contains(res.Stdout.String(), "queryDefinitions") + s.ContainsOnSameLine(res.Stdout.String(), "name", "my-query") + s.Contains(res.Stdout.String(), "signalDefinitions") + s.ContainsOnSameLine(res.Stdout.String(), "name", "my-signal") + s.Contains(res.Stdout.String(), "updateDefinitions") + s.ContainsOnSameLine(res.Stdout.String(), "name", "my-update") + s.ContainsOnSameLine(res.Stdout.String(), "currentDetails", "current-deets") + } + + // Unblock the workflow with a signal + s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil)) + s.NoError(run.Get(s.Context, nil)) + + // Ensure query is rejected when using not open rejection condition + args = []string{ + "workflow", "metadata", + "--address", s.Address(), + "-w", run.GetID(), + "--reject-condition", "not_open", + } + if json { + args = append(args, "-o", "json") + } + res = s.Execute(args...) + s.Error(res.Err) + s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed") +} diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index 86dbf3551..3ca20cd37 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -134,6 +134,32 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin HistorySize: info.HistorySizeBytes, }, printer.StructuredOptions{}) + if info.VersioningInfo != nil { + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Versioning Info:")) + cctx.Printer.Println() + vInfo := info.VersioningInfo + _ = cctx.Printer.PrintStructured(struct { + Behavior string + DeploymentSeriesName string + DeploymentBuildID string + OverrideBehavior string `cli:",cardOmitEmpty"` + OverrideDeploymentSeriesName string `cli:",cardOmitEmpty"` + OverrideDeploymentBuildID string `cli:",cardOmitEmpty"` + TransitionDeploymentSeriesName string `cli:",cardOmitEmpty"` + TransitionDeploymentBuildID string `cli:",cardOmitEmpty"` + }{ + Behavior: vInfo.Behavior.String(), + DeploymentSeriesName: vInfo.Deployment.GetSeriesName(), + DeploymentBuildID: vInfo.Deployment.GetBuildId(), + OverrideBehavior: vInfo.VersioningOverride.GetBehavior().String(), + OverrideDeploymentSeriesName: vInfo.VersioningOverride.GetDeployment().GetSeriesName(), + OverrideDeploymentBuildID: vInfo.VersioningOverride.GetDeployment().GetBuildId(), + TransitionDeploymentSeriesName: vInfo.DeploymentTransition.GetDeployment().GetSeriesName(), + TransitionDeploymentBuildID: vInfo.DeploymentTransition.GetDeployment().GetBuildId(), + }, printer.StructuredOptions{}) + } + if len(resp.Callbacks) > 0 { cctx.Printer.Println() cctx.Printer.Println(color.MagentaString("Callbacks: %v", len(resp.Callbacks))) diff --git a/temporalcli/commands.workflow_view_test.go b/temporalcli/commands.workflow_view_test.go index f1c6a60c9..8ad7a80cc 100644 --- a/temporalcli/commands.workflow_view_test.go +++ b/temporalcli/commands.workflow_view_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/nexus-rpc/sdk-go/nexus" + "github.com/stretchr/testify/assert" "github.com/temporalio/cli/temporalcli" "go.temporal.io/api/enums/v1" nexuspb "go.temporal.io/api/nexus/v1" @@ -21,6 +22,7 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/temporalnexus" + "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -546,6 +548,83 @@ func (s *SharedServerSuite) TestWorkflow_Count() { s.Contains(out, `{"groupValues":["Completed"],"count":"3"}`) } +func (s *SharedServerSuite) TestWorkflow_Describe_Deployment() { + buildId := uuid.NewString() + seriesName := uuid.NewString() + // Workflow that waits to be canceled. + waitingWorkflow := func(ctx workflow.Context) error { + ctx.Done().Receive(ctx, nil) + return ctx.Err() + } + w := s.DevServer.StartDevWorker(s.Suite.T(), DevWorkerOptions{ + Worker: worker.Options{ + BuildID: buildId, + UseBuildIDForVersioning: true, + DeploymentOptions: worker.DeploymentOptions{ + DeploymentSeriesName: seriesName, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, + }, + Workflows: []any{waitingWorkflow}, + }) + defer w.Stop() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId, + ) + s.NoError(res.Err) + + // Start the workflow and wait until the operation is started. + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: w.Options.TaskQueue}, + waitingWorkflow, + ) + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), buildId) + assert.Contains(t, res.Stdout.String(), "Pinned") + }, 30*time.Second, 100*time.Millisecond) + + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Behavior", "Pinned") + // TODO(antlai-temporal): replace by new Deployment API + // These fields are deprecated, and not populated in latest server + //s.ContainsOnSameLine(out, "DeploymentBuildID", buildId) + //s.ContainsOnSameLine(out, "DeploymentSeriesName", seriesName) + s.ContainsOnSameLine(out, "OverrideBehavior", "Unspecified") + + // json + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + "--output", "json", + ) + s.NoError(res.Err) + + var jsonResp workflowservice.DescribeWorkflowExecutionResponse + s.NoError(temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) + versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + s.Equal("Pinned", versioningInfo.Behavior.String()) + // TODO(antlai-temporal): replace by new Deployment API + // These fields are deprecated, and not populated in latest server + // s.Equal(buildId, versioningInfo.Deployment.BuildId) + // s.Equal(seriesName, versioningInfo.Deployment.SeriesName) + s.Nil(versioningInfo.VersioningOverride) + s.Nil(versioningInfo.DeploymentTransition) +} + func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationAndCallback() { handlerWorkflowID := uuid.NewString() endpointName := validEndpointName(s.T()) @@ -732,7 +811,7 @@ func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationBlocked() { s.NoError(err) return len(resp.PendingNexusOperations) > 0 && resp.PendingNexusOperations[0].State == enums.PENDING_NEXUS_OPERATION_STATE_BLOCKED - }, 30*time.Second, 100*time.Millisecond) + }, 60*time.Second, 100*time.Millisecond) // Operations - Text res := s.Execute( diff --git a/temporalcli/commands_test.go b/temporalcli/commands_test.go index 379adb8ef..467f2c9bb 100644 --- a/temporalcli/commands_test.go +++ b/temporalcli/commands_test.go @@ -370,6 +370,7 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer { d.Options.DynamicConfigValues["frontend.workerVersioningRuleAPIs"] = true d.Options.DynamicConfigValues["frontend.workerVersioningDataAPIs"] = true d.Options.DynamicConfigValues["frontend.workerVersioningWorkflowAPIs"] = true + d.Options.DynamicConfigValues["system.enableDeployments"] = true d.Options.DynamicConfigValues["worker.buildIdScavengerEnabled"] = true d.Options.DynamicConfigValues["frontend.enableUpdateWorkflowExecution"] = true d.Options.DynamicConfigValues["frontend.MaxConcurrentBatchOperationPerNamespace"] = 1000 diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 090d56324..249930ab3 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -206,7 +206,7 @@ commands: - name: temporal activity summary: Complete, update, pause, unpause, reset or fail an Activity description: | - Update an Activity's options, manage activity lifecycle or update + Update an Activity's options, manage activity lifecycle or update an Activity's state to completed or failed. Updating activity state marks an Activity as successfully finished or as @@ -297,9 +297,9 @@ commands: - name: temporal activity update-options summary: Update Activity options description: | - Update Activity options. Specify the Activity and Workflow IDs, and - options you want to update. - Updates are incremental, only changing the specified options. + Update Activity options. Specify the Activity and Workflow IDs, and + options you want to update. + Updates are incremental, only changing the specified options. ``` temporal activity update-options \ @@ -327,22 +327,22 @@ commands: - name: schedule-to-close-timeout type: duration description: | - Indicates how long the caller is willing to wait for an activity - completion. + Indicates how long the caller is willing to wait for an activity + completion. Limits how long retries will be attempted. - name: schedule-to-start-timeout type: duration description: | - Limits time an activity task can stay in a task queue before a worker - picks it up. - This timeout is always non retryable, as all a retry would achieve is + Limits time an activity task can stay in a task queue before a worker + picks it up. + This timeout is always non retryable, as all a retry would achieve is to put it back into the same queue. Defaults to the schedule-to-close timeout or workflow execution timeout if not specified. - name: start-to-close-timeout type: duration description: | - Maximum time an activity is allowed to execute after being picked up + Maximum time an activity is allowed to execute after being picked up by a worker. This timeout is always retryable. - name: heartbeat-timeout type: duration @@ -356,7 +356,7 @@ commands: - name: retry-maximum-interval type: duration description: | - Maximum interval between retries. Exponential backoff leads to + Maximum interval between retries. Exponential backoff leads to interval increase. This value is the cap of the increase. - name: retry-backoff-coefficient @@ -371,7 +371,7 @@ commands: description: | Maximum number of attempts. When exceeded the retries stop even if not expired yet. - Setting this value to 1 disables retries. Setting this value to 0 + Setting this value to 1 disables retries. Setting this value to 0 means unlimited attempts(up to the timeouts). - name: identity type: string @@ -647,6 +647,184 @@ commands: description: Reason for terminating the batch job. required: true + - name: temporal worker + summary: Read or update Worker state + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker is experimental. Worker commands are subject to | + | change. | + +---------------------------------------------------------------------+ + + Modify or read state associated with a Worker, for example, + using Worker Deployments commands: + + ``` + temporal worker deployment + ``` + option-sets: + - client + docs: + description-header: >- + Learn how to read or modify state associated with a Worker, + such as Worker Deployments. + keywords: + - worker + - worker deployment + + - name: temporal worker deployment + summary: Describe, list, and operate on Worker Deployments + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Deployment commands perform operations on Worker Deployments: + + ``` + temporal worker deployment [command] [options] + ``` + + For example: + + ``` + temporal worker deployment list + ``` + docs: + description-header: >- + Temporal Deployment commands enable operations on Worker Deployments, + such as describe, list, set-current, and get-current, simplifying + versioning and management of workers. + keywords: + - worker deployment + - worker deployment describe + - worker deployment list + - worker deployment get-current + - worker deployment set-current + + - name: temporal worker deployment describe + summary: Show properties of a Worker Deployment + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Describes properties of a Worker Deployment, such as whether it is + current, the non-empty list of its task queues, custom metadata if + present, and reachability status when requested. + + ``` + temporal worker deployment describe [options] + ``` + + For example, to also include reachability information: + + ``` + temporal worker deployment describe \ + --series-name YourDeploymentSeriesName \ + --build-id YourDeploymentBuildId \ + --report-reachability + ``` + option-sets: + - deployment-reference + options: + - name: report-reachability + type: bool + description: | + Include reachability information of a Worker Deployment. + + - name: temporal worker deployment get-current + summary: Show the current Worker Deployment + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Gets the current Worker Deployment for a Deployment Series Name. + When a Deployment is current, Workers of that Deployment will receive + tasks from new Workflows and from existing AutoUpgrade Workflows that + are running on this Deployment Series. + + ``` + temporal worker deployment get-current [options] + ``` + + For example: + + ``` + temporal worker deployment get-current \ + --series-name YourDeploymentSeriesName + ``` + options: + - name: series-name + type: string + description: Series Name for the current Worker Deployment. + required: true + + - name: temporal worker deployment list + summary: Enumerate Worker Deployments in the client's namespace + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + List existing Worker Deployments in the client's namespace, optionally + filtering them by Deployment Series Name. + + + ``` + temporal worker deployment list [options] + ``` + + For example, adding an optional filter: + + ``` + temporal worker deployment list \ + --series-name YourDeploymentSeriesName + ``` + options: + - name: series-name + type: string + description: Series Name to filter Worker Deployments. + + - name: temporal worker deployment set-current + summary: Change the current Worker Deployment + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Sets the current Deployment for a given Deployment Series. + When a Deployment is current, Workers of that Deployment will receive + tasks from new Workflows and from existing AutoUpgrade Workflows that + are running on this Deployment Series. + + ``` + temporal worker deployment set-current [options] + ``` + + For example: + + ``` + temporal worker deployment set-current \ + --series-name YourDeploymentSeriesName \ + --build-id YourDeploymentBuildId + ``` + option-sets: + - deployment-reference + options: + - name: metadata + type: string[] + description: | + Set deployment metadata using `KEY="VALUE"` pairs. + Keys must be identifiers, and values must be JSON values. + For example: 'YourKey={"your": "value"}'. + Can be passed multiple times. + - name: temporal env summary: Manage environments description: | @@ -2537,9 +2715,9 @@ commands: docs: description-header: >- Temporal Workflow commands enable operations on Workflow Executions, - such as cancel, count, delete, describe, execute, list, query, reset, - reset-batch, show, signal, stack, start, terminate, trace, and update, - enhancing efficiency and control. + such as cancel, count, delete, describe, execute, list, update-options, + query, reset, reset-batch, show, signal, stack, start, terminate, + trace, and update, enhancing efficiency and control. keywords: - call stack - cancellation @@ -2562,6 +2740,7 @@ commands: - workflow execute - workflow execution - workflow list + - workflow metadata - workflow query - workflow reset - workflow reset-batch @@ -2571,6 +2750,7 @@ commands: - workflow start - workflow terminate - workflow trace + - workflow update-options - name: temporal workflow cancel summary: Send cancellation to Workflow Execution @@ -2749,6 +2929,87 @@ commands: type: int description: Maximum number of Workflow Executions to display. + - name: temporal workflow metadata + summary: Query the Workflow for user-specified metadata + description: | + Issue a Query for and display user-set metadata like summary and + details for a specific Workflow Execution: + + ``` + temporal workflow metadata \ + --workflow-id YourWorkflowId + ``` + option-sets: + - workflow-reference + - query-modifiers + + - name: temporal workflow update-options + summary: Change Workflow Execution Options + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worflow update-options is experimental. Workflow Execution | + | properties are subject to change. | + +---------------------------------------------------------------------+ + + Modify properties of Workflow Executions: + + ``` + temporal workflow update-options [options] + ``` + + It can override the Worker Deployment configuration of a + Workflow Execution, which controls Worker Versioning. + + For example, to force Workers in the current Deployment execute the + next Workflow Task change behavior to `auto_upgrade`: + + ``` + temporal workflow update-options \ + --workflow-id YourWorkflowId \ + --versioning-override-behavior auto_upgrade + ``` + + or to pin the workflow execution to a Worker Deployment, set behavior + to `pinned`: + + ``` + temporal workflow update-options \ + --workflow-id YourWorkflowId \ + --versioning-override-behavior pinned \ + --versioning-override-series-name YourDeploymentSeriesName \ + --versioning-override-build-id YourDeploymentBuildId + ``` + + To remove any previous overrides, set the behavior to + `unspecified`: + + ``` + temporal workflow update-options \ + --workflow-id YourWorkflowId \ + --versioning-override-behavior unspecified + ``` + + To see the current override use `temporal workflow describe` + + option-sets: + - single-workflow-or-batch + options: + - name: versioning-override-behavior + type: string-enum + description: | + Override the versioning behavior of a Workflow. + required: true + enum-values: + - unspecified + - pinned + - auto_upgrade + - name: versioning-override-series-name + type: string + description: Override Series Name for a Worker Deployment (Only for pinned). + - name: versioning-override-build-id + type: string + description: Override Build ID for a Worker Deployment (Only for pinned). + - name: temporal workflow query summary: Retrieve Workflow Execution state description: | @@ -2766,6 +3027,7 @@ commands: option-sets: - payload-input - workflow-reference + - query-modifiers options: - name: name type: string @@ -2773,13 +3035,6 @@ commands: required: true aliases: - type - - name: reject-condition - type: string-enum - description: | - Optional flag for rejecting Queries based on Workflow state. - enum-values: - - not_open - - not_completed_cleanly - name: temporal workflow reset summary: Move Workflow Execution history point @@ -3353,6 +3608,17 @@ option-sets: short: r description: Run ID. + - name: deployment-reference + options: + - name: series-name + type: string + description: Series Name for a Worker Deployment. + required: true + - name: build-id + type: string + description: Build ID for a Worker Deployment. + required: true + - name: single-workflow-or-batch options: - name: workflow-id @@ -3585,3 +3851,13 @@ option-sets: An external Nexus Endpoint that receives forwarded Nexus requests. May be used as an alternative to `--target-namespace` and `--target-task-queue`. + + - name: query-modifiers + options: + - name: reject-condition + type: string-enum + description: | + Optional flag for rejecting Queries based on Workflow state. + enum-values: + - not_open + - not_completed_cleanly diff --git a/temporalcli/internal/printer/printer.go b/temporalcli/internal/printer/printer.go index 6c3fdac11..d20b806f7 100644 --- a/temporalcli/internal/printer/printer.go +++ b/temporalcli/internal/printer/printer.go @@ -17,6 +17,8 @@ import ( "google.golang.org/protobuf/proto" ) +const NonJSONIndent = " " + type Colorer func(string, ...interface{}) string type Printer struct { @@ -99,6 +101,8 @@ type StructuredOptions struct { // printing. Table *TableOptions OverrideJSONPayloadShorthand *bool + // Indent this many additional times when printing non-JSON + NonJSONExtraIndent int } type Align int @@ -260,6 +264,7 @@ type col struct { width int cardOmitEmpty bool align Align + indentAmount int } type colVal struct { @@ -313,6 +318,7 @@ func adjustColsToOptions(cols []*col, options StructuredOptions) []*col { col.align = align } } + col.indentAmount = options.NonJSONExtraIndent + 1 adjusted = append(adjusted, col) } return adjusted @@ -331,8 +337,9 @@ func (p *Printer) printHeader(cols []*col) { colorer = color.MagentaString } for _, col := range cols { - // We want to indent even the first field - p.writeStr(" ") + for i := 0; i < col.indentAmount; i++ { + p.writeStr(NonJSONIndent) + } p.writeStr(tablewriter.Pad(colorer("%v", col.name), " ", col.width)) } p.writeStr("\n") @@ -346,8 +353,9 @@ func (p *Printer) printRows(cols []*col, rows []map[string]colVal) { func (p *Printer) printRow(cols []*col, row map[string]colVal) { for _, col := range cols { - // We want to indent even the first field - p.writeStr(" ") + for i := 0; i < col.indentAmount; i++ { + p.writeStr(NonJSONIndent) + } p.printCol(col, row[col.name].text) } p.writeStr("\n") @@ -377,6 +385,12 @@ func (p *Printer) printCards(cols []*col, rows []map[string]colVal) { func (p *Printer) printCard(cols []*col, row map[string]colVal) { nameValueRows := make([]map[string]colVal, 0, len(cols)) + indentAmount := 1 + // Since this option applies to everything in a structured print, there should be + // no difference among columns + if len(cols) > 0 { + indentAmount = cols[0].indentAmount + } for _, col := range cols { rowVal := row[col.name].val if !col.cardOmitEmpty || (rowVal != nil && !reflect.ValueOf(row[col.name].val).IsZero()) { @@ -387,10 +401,10 @@ func (p *Printer) printCard(cols []*col, row map[string]colVal) { } } nameValueCols := []*col{ - {name: "Name"}, + {name: "Name", indentAmount: indentAmount}, // We want to set the width to 1 here, because we want it to stretch as far // as it needs to the right - {name: "Value", width: 1}, + {name: "Value", width: 1, indentAmount: indentAmount}, } p.calculateUnsetColWidths(nameValueCols, nameValueRows) p.printRows(nameValueCols, nameValueRows)