Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,40 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string

return nil
}

func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

request := &workflowservice.ResetActivityByIdRequest{
Namespace: c.Parent.Namespace,
WorkflowId: c.WorkflowId,
RunId: c.RunId,
ActivityId: c.ActivityId,
Identity: c.Identity,
NoWait: c.NoWait,
ResetHeartbeat: c.ResetHeartbeats,
}

resp, err := cl.WorkflowService().ResetActivityById(cctx, request)
if err != nil {
return fmt.Errorf("unable to reset an Activity: %w", err)
}

resetResponse := struct {
NoWait bool `json:"noWait"`
ResetHeartbeats bool `json:"resetHeartbeats"`
ServerResponse bool `json:"-"`
}{
ServerResponse: resp != nil,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure users really need this information and not the biggest fan of adding user-facing information only needed for tests, but this does technically confirm server response so I guess good enough.

NoWait: c.NoWait,
ResetHeartbeats: c.ResetHeartbeats,
}

_ = cctx.Printer.PrintStructured(resetResponse, printer.StructuredOptions{})

return nil
}
38 changes: 38 additions & 0 deletions temporalcli/commands.activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/client"
)

Expand Down Expand Up @@ -231,6 +232,43 @@ func (s *SharedServerSuite) TestActivityUnPause_Failed() {
s.Error(res.Err)
}

func (s *SharedServerSuite) TestActivityReset() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
identity := "MyIdentity"

res := s.Execute(
"activity", "reset",
"--activity-id", aid,
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--identity", identity,
"--address", s.Address(),
)

s.NoError(res.Err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't mention it on the last review but I should have. Ideally these tests should confirm the invoked action actually occurs.

Copy link
Contributor Author

@ychebotarev ychebotarev Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't return any values for that. No error indicates that the command was processed successfully. I don't want to write more complicated test - it will be a duplicate of existing functional tests. In actual functional tests I'm faling activity few times, then reseting, then calling DescribeWorkflow to confirm that number of attempts becomes 1.

Copy link
Contributor

@cretz cretz Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you know this command did anything? Sometimes we do replicate functional tests in every place we do them to ensure end-to-end behavior. When these tests are written in each SDK (and the CLI is basically an SDK), we make sure the intended effect occurs, yes even if that technically is duplicated in each language.

Compare with the other tests in this repo. We technically could just run a command and check error, but we often want more.

Copy link
Contributor Author

@ychebotarev ychebotarev Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this command reach the server and return no errors. Which means it was a valid call, and it was processed.
How do I know it was correctly processed? Unit tests, functional tests. canary test.
I don't want to replicate those functional tests here, don't want to test server functionality in CLI tests.

Copy link
Contributor

@cretz cretz Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to replicate those functional tests here, don't want to test server functionality in CLI tests.

Not all of them, just confirm it did something, like the other tests in this repository. It should be really easy to describe the workflow and check pending activities or whatever is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike for example for updating activity there is no direct expected side effect - the response is empty.
If there will be any kind of error on server side - like "Not Found" - it will return an error.

Even If the response was not empty - one may apply the same argument "what if I just return expected result directly without calling OSS".
So the proper test will be:

  • make sure that OSS call happens
  • independently make sure that expected effect take place

In this case (just like in my functional test) for the proper testing I need to make activity fail few times, then make DescribeWorkflow call and verify that the number of attempts is greater then 1.
Then block it (otherwise test becomes non-deterministic), then make CLI reset activity call, then make DescribeWorkflow call and verify that the number of attempts is 1. This is literally functional test, just much harder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it'd basically be a functional test that resets after attempt > 1 and confirms it goes back to 1. You don't have to provide full coverage like you do where the implementation is, but at least one end-to-end assertion is worth it. We will surely be writing this same functional test for every SDK we provide this high-level activity reset call on.

Copy link
Contributor Author

@ychebotarev ychebotarev Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it'd basically be a functional test

Which already exist.
And that is why there is no need to do that.

Copy link
Contributor

@cretz cretz Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a need and we do these kinds of end-to-end tests in other tests in all of our SDKs (which includes the CLI). Right now if the CLI code did nothing, this assertion would pass. Or if it did something but as a developer you may have missed actually making the gRPC call, you'd never know. Even with the next test testing for failure, you don't even know what failure it might hit.

Lets validate at least some side effect occurred.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, talked offline. I will make a change to make a check that some result (of proper type) is coming from server

// make sure we receive a server response
out := res.Stdout.String()
s.ContainsOnSameLine(out, "ServerResponse", "true")

// reset should fail because activity is not found

res = s.Execute(
"activity", "reset",
"--activity-id", "fake_id",
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--identity", identity,
"--address", s.Address(),
)

s.Error(res.Err)
// make sure we receive a NotFound error from the server`
var notFound *serviceerror.NotFound
s.ErrorAs(res.Err, &notFound)
}

// Test helpers

func (s *SharedServerSuite) waitActivityStarted() client.WorkflowRun {
Expand Down
43 changes: 40 additions & 3 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,17 @@ func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) *
var s TemporalActivityCommand
s.Parent = parent
s.Command.Use = "activity"
s.Command.Short = "Complete, update, pause, unpause or fail an Activity"
s.Command.Short = "Complete, update, pause, unpause, reset or fail an Activity"
if hasHighlighting {
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m"
s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m"
} else {
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```"
s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityFailCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityPauseCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityResetCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityUnpauseCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityUpdateOptionsCommand(cctx, &s).Command)
s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags())
Expand Down Expand Up @@ -445,6 +446,42 @@ func NewTemporalActivityPauseCommand(cctx *CommandContext, parent *TemporalActiv
return &s
}

type TemporalActivityResetCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
WorkflowReferenceOptions
ActivityId string
Identity string
NoWait bool
ResetHeartbeats bool
}

func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityResetCommand {
var s TemporalActivityResetCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "reset [flags]"
s.Command.Short = "Reset an Activity"
if hasHighlighting {
s.Command.Long = "Resetting an activity resets both the number of attempts and the activity \ntimeout. If activity is paused, it will be un-paused. \n\nIf the \x1b[1mno-wait\x1b[0m flag is provided, the activity will be rescheduled \nimmediately. Even if the activity is currently running.\nIf the \x1b[1mno-wait\x1b[0m flag is not provided, the activity will be scheduled \nafter the current instance completes, if needed. \nIf the 'reset_heartbeats' flag is set, the activity heartbeat timer and \nheartbeats will be reset.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --no-wait\n --reset-heartbeats\x1b[0m"
} else {
s.Command.Long = "Resetting an activity resets both the number of attempts and the activity \ntimeout. If activity is paused, it will be un-paused. \n\nIf the `no-wait` flag is provided, the activity will be rescheduled \nimmediately. Even if the activity is currently running.\nIf the `no-wait` flag is not provided, the activity will be scheduled \nafter the current instance completes, if needed. \nIf the 'reset_heartbeats' flag is set, the activity heartbeat timer and \nheartbeats will be reset.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --no-wait\n --reset-heartbeats\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to pause. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.Command.Flags().BoolVar(&s.NoWait, "no-wait", false, "Schedule the Activity immediately, even if its retry timeout has not expired or the activity is currently running.")
s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeat.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalActivityUnpauseCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
Expand Down
52 changes: 47 additions & 5 deletions temporalcli/commandsgen/commands.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,10 @@ commands:
The command execution timeout. 0s means no timeout.

- name: temporal activity
summary: Complete, update, pause, unpause or fail an Activity
summary: Complete, update, pause, unpause, reset or fail an Activity
description: |
Update an Activity's options or update an Activity's state to completed
or failed.

Pause or unpause an Activity.
Update an Activity's options, manage activity lifecycle or update
an Activity's state to completed or failed.

Updating activity state marks an Activity as successfully finished or as
having encountered an error.
Expand All @@ -230,6 +228,7 @@ commands:
- activity update-options
- activity pause
- activity unpause
- activity reset
- activity execution
- activity fail
- cli reference
Expand Down Expand Up @@ -454,6 +453,49 @@ commands:
option-sets:
- workflow reference

- name: temporal activity reset
summary: Reset an Activity
description: |
Resetting an activity resets both the number of attempts and the activity
timeout. If activity is paused, it will be un-paused.

If the `no-wait` flag is provided, the activity will be rescheduled
immediately. Even if the activity is currently running.
If the `no-wait` flag is not provided, the activity will be scheduled
after the current instance completes, if needed.
If the 'reset_heartbeats' flag is set, the activity heartbeat timer and
heartbeats will be reset.

Specify the Activity and Workflow IDs:

```
temporal activity reset \
--activity-id YourActivityId \
--workflow-id YourWorkflowId
--no-wait
--reset-heartbeats
```
options:
- name: activity-id
short: a
type: string
description: Activity ID to pause.
required: true
- name: identity
type: string
description: Identity of the user submitting this request.
- name: no-wait
type: bool
description: |
Schedule the Activity immediately, even if its retry timeout has not
expired or the activity is currently running.
- name: reset-heartbeats
type: bool
description: Reset the Activity's heartbeat.
option-sets:
- workflow reference



- name: temporal batch
summary: Manage running batch jobs
Expand Down
Loading