Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 61 additions & 18 deletions temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/temporalio/cli/temporalcli/internal/printer"
activitypb "go.temporal.io/api/activity/v1"
"go.temporal.io/api/batch/v1"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/failure/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
Expand Down Expand Up @@ -230,28 +231,70 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string
}
defer cl.Close()

request := &workflowservice.UnpauseActivityRequest{
Namespace: c.Parent.Namespace,
Execution: &common.WorkflowExecution{
WorkflowId: c.WorkflowId,
RunId: c.RunId,
},
ResetAttempts: c.ResetAttempts,
ResetHeartbeat: c.ResetHeartbeats,
Identity: c.Identity,
opts := SingleWorkflowOrBatchOptions{
WorkflowId: c.WorkflowId,
RunId: c.RunId,
Query: c.Query,
Reason: c.Reason,
Yes: c.Yes,
Rps: c.Rps,
}

if c.ActivityType != "" {
request.Activity = &workflowservice.UnpauseActivityRequest_Type{Type: c.ActivityType}
} else if c.ActivityId != "" {
request.Activity = &workflowservice.UnpauseActivityRequest_Id{Id: c.ActivityId}
} else {
return fmt.Errorf("either Activity Type or Activity Id must be provided")
exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{
// You're allowed to specify a reason when terminating a workflow
AllowReasonWithWorkflowID: true,
})
if err != nil {
return err
}

_, err = cl.WorkflowService().UnpauseActivity(cctx, request)
if err != nil {
return fmt.Errorf("unable to uppause an Activity: %w", err)
if exec != nil { // single workflow operation
request := &workflowservice.UnpauseActivityRequest{
Namespace: c.Parent.Namespace,
Execution: &common.WorkflowExecution{
WorkflowId: c.WorkflowId,
RunId: c.RunId,
},
ResetAttempts: c.ResetAttempts,
ResetHeartbeat: c.ResetHeartbeats,
Jitter: durationpb.New(c.Jitter.Duration()),
Identity: c.Identity,
}

if c.ActivityType != "" {
request.Activity = &workflowservice.UnpauseActivityRequest_Type{Type: c.ActivityType}
} else if c.ActivityId != "" {
request.Activity = &workflowservice.UnpauseActivityRequest_Id{Id: c.ActivityId}
} else {
return fmt.Errorf("either Activity Type or Activity Id must be provided")
}

_, err = cl.WorkflowService().UnpauseActivity(cctx, request)
if err != nil {
return fmt.Errorf("unable to uppause an Activity: %w", err)
}
} else { // batch operation
unpauseActivitiesOperation := &batch.BatchOperationUnpauseActivities{
Identity: clientIdentity(),
ResetAttempts: c.ResetAttempts,
ResetHeartbeat: c.ResetHeartbeats,
Jitter: durationpb.New(c.Jitter.Duration()),
}
if c.ActivityType != "" {
unpauseActivitiesOperation.Activity = &batch.BatchOperationUnpauseActivities_Type{Type: c.ActivityType}
} else if c.MatchAll == true {
unpauseActivitiesOperation.Activity = &batch.BatchOperationUnpauseActivities_MatchAll{MatchAll: true}
} else {
return fmt.Errorf("either Activity Type must be provided or MatchAll must be set to true")
}

batchReq.Operation = &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{
UnpauseActivitiesOperation: unpauseActivitiesOperation,
}

if err := startBatchJob(cctx, cl, batchReq); err != nil {
return err
}
}

return nil
Expand Down
Loading