Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
418 changes: 418 additions & 0 deletions client/rust/src/gen/api.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import api.submit.{
QueueDeleteRequest,
QueueGetRequest,
QueueList,
RetryPolicy,
RetryPolicyDeleteRequest,
RetryPolicyGetRequest,
RetryPolicyList,
RetryPolicyListRequest,
StreamingQueueGetRequest,
StreamingQueueMessage,
SubmitGrpc
Expand Down Expand Up @@ -317,6 +322,36 @@ private class SubmitMockServer(
): scala.concurrent.Future[BatchQueueUpdateResponse] = {
Future.successful(new BatchQueueUpdateResponse)
}

def createRetryPolicy(
request: RetryPolicy
): scala.concurrent.Future[com.google.protobuf.empty.Empty] = {
Future.successful(new Empty)
}

def updateRetryPolicy(
request: RetryPolicy
): scala.concurrent.Future[com.google.protobuf.empty.Empty] = {
Future.successful(new Empty)
}

def deleteRetryPolicy(
request: RetryPolicyDeleteRequest
): scala.concurrent.Future[com.google.protobuf.empty.Empty] = {
Future.successful(new Empty)
}

def getRetryPolicy(
request: RetryPolicyGetRequest
): scala.concurrent.Future[RetryPolicy] = {
Future.failed(new StatusRuntimeException(Status.NOT_FOUND))
}

def getRetryPolicies(
request: RetryPolicyListRequest
): scala.concurrent.Future[RetryPolicyList] = {
Future.successful(new RetryPolicyList)
}
}

private class JobsMockServer(
Expand Down
5 changes: 5 additions & 0 deletions cmd/armadactl/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func createCmd(a *armadactl.App) *cobra.Command {
}
cmd.Flags().Bool("dry-run", false, "Validate the input file and exit without making any changes.")
cmd.AddCommand(queueCreateCmd())
cmd.AddCommand(retryPolicyCreateCmd())
return cmd
}

Expand All @@ -37,6 +38,7 @@ func deleteCmd() *cobra.Command {
Long: "Delete Armada resource. Supported: queue",
}
cmd.AddCommand(queueDeleteCmd())
cmd.AddCommand(retryPolicyDeleteCmd())
return cmd
}

Expand All @@ -47,6 +49,7 @@ func updateCmd() *cobra.Command {
Long: "Update Armada resource. Supported: queue",
}
cmd.AddCommand(queueUpdateCmd())
cmd.AddCommand(retryPolicyUpdateCmd())
return cmd
}

Expand All @@ -59,6 +62,8 @@ func getCmd() *cobra.Command {
cmd.AddCommand(
queueGetCmd(),
queuesGetCmd(),
retryPolicyGetCmd(),
retryPolicyGetAllCmd(),
getSchedulingReportCmd(armadactl.New()),
getQueueSchedulingReportCmd(armadactl.New()),
getJobSchedulingReportCmd(armadactl.New()),
Expand Down
7 changes: 7 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
ce "github.com/armadaproject/armada/pkg/client/executor"
cn "github.com/armadaproject/armada/pkg/client/node"
cq "github.com/armadaproject/armada/pkg/client/queue"
crp "github.com/armadaproject/armada/pkg/client/retrypolicy"
)

// initParams initialises the command parameters, flags, and a configuration file.
Expand All @@ -33,6 +34,12 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.QueueAPI.Preempt = cq.Preempt(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Cancel = cq.Cancel(client.ExtractCommandlineArmadaApiConnectionDetails)

params.RetryPolicyAPI.Create = crp.Create(client.ExtractCommandlineArmadaApiConnectionDetails)
params.RetryPolicyAPI.Delete = crp.Delete(client.ExtractCommandlineArmadaApiConnectionDetails)
params.RetryPolicyAPI.Get = crp.Get(client.ExtractCommandlineArmadaApiConnectionDetails)
params.RetryPolicyAPI.GetAll = crp.GetAll(client.ExtractCommandlineArmadaApiConnectionDetails)
params.RetryPolicyAPI.Update = crp.Update(client.ExtractCommandlineArmadaApiConnectionDetails)

params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

Expand Down
14 changes: 14 additions & 0 deletions cmd/armadactl/cmd/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,19 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
return fmt.Errorf("error converting queue labels to map: %s", err)
}

retryPolicyName, err := cmd.Flags().GetString("retry-policy")
if err != nil {
return fmt.Errorf("error reading retry-policy: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
Cordoned: cordoned,
Labels: labelsAsMap,
RetryPolicy: retryPolicyName,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
Expand All @@ -83,6 +89,7 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
cmd.Flags().String("retry-policy", "", "Name of the retry policy to assign to this queue.")
return cmd
}

Expand Down Expand Up @@ -230,13 +237,19 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
return fmt.Errorf("error converting queue labels to map: %s", err)
}

retryPolicyName, err := cmd.Flags().GetString("retry-policy")
if err != nil {
return fmt.Errorf("error reading retry-policy: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
Cordoned: cordoned,
Labels: labelsAsMap,
RetryPolicy: retryPolicyName,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
Expand All @@ -251,5 +264,6 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
cmd.Flags().String("retry-policy", "", "Name of the retry policy to assign to this queue.")
return cmd
}
123 changes: 123 additions & 0 deletions cmd/armadactl/cmd/retrypolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package cmd

import (
"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
)

func retryPolicyCreateCmd() *cobra.Command {
return retryPolicyCreateCmdWithApp(armadactl.New())
}

func retryPolicyCreateCmdWithApp(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "retry-policy",
Short: "Create a retry policy from a YAML/JSON file",
Long: "Create a retry policy that defines rules for whether failed jobs should be retried.",
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
filePath, err := cmd.Flags().GetString("file")
if err != nil {
return err
}
return a.CreateRetryPolicyFromFile(filePath)
},
}
cmd.Flags().StringP("file", "f", "", "Path to YAML/JSON file defining the retry policy.")
if err := cmd.MarkFlagRequired("file"); err != nil {
panic(err)
}
return cmd
}

func retryPolicyUpdateCmd() *cobra.Command {
return retryPolicyUpdateCmdWithApp(armadactl.New())
}

func retryPolicyUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "retry-policy",
Short: "Update a retry policy from a YAML/JSON file",
Long: "Update an existing retry policy with the definition from a YAML/JSON file.",
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
filePath, err := cmd.Flags().GetString("file")
if err != nil {
return err
}
return a.UpdateRetryPolicyFromFile(filePath)
},
}
cmd.Flags().StringP("file", "f", "", "Path to YAML/JSON file defining the retry policy.")
if err := cmd.MarkFlagRequired("file"); err != nil {
panic(err)
}
return cmd
}

func retryPolicyGetCmd() *cobra.Command {
return retryPolicyGetCmdWithApp(armadactl.New())
}

func retryPolicyGetCmdWithApp(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "retry-policy <name>",
Short: "Get a retry policy by name",
Long: "Get the definition of a retry policy by its name.",
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
return a.GetRetryPolicy(args[0])
},
}
return cmd
}

func retryPolicyGetAllCmd() *cobra.Command {
return retryPolicyGetAllCmdWithApp(armadactl.New())
}

func retryPolicyGetAllCmdWithApp(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "retry-policies",
Short: "List all retry policies",
Long: "List all retry policies defined in the system.",
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
return a.GetAllRetryPolicies()
},
}
return cmd
}

func retryPolicyDeleteCmd() *cobra.Command {
return retryPolicyDeleteCmdWithApp(armadactl.New())
}

func retryPolicyDeleteCmdWithApp(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "retry-policy <name>",
Short: "Delete a retry policy by name",
Long: "Delete an existing retry policy by its name.",
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
return a.DeleteRetryPolicy(args[0])
},
}
return cmd
}
17 changes: 14 additions & 3 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/armadaproject/armada/pkg/client/executor"
"github.com/armadaproject/armada/pkg/client/node"
"github.com/armadaproject/armada/pkg/client/queue"
"github.com/armadaproject/armada/pkg/client/retrypolicy"
)

type App struct {
Expand All @@ -41,6 +42,7 @@ type App struct {
type Params struct {
ApiConnectionDetails *client.ApiConnectionDetails
QueueAPI *QueueAPI
RetryPolicyAPI *RetryPolicyAPI
ExecutorAPI *ExecutorAPI
NodeAPI *NodeAPI
}
Expand Down Expand Up @@ -69,6 +71,14 @@ type ExecutorAPI struct {
PreemptOnExecutor executor.PreemptAPI
}

type RetryPolicyAPI struct {
Create retrypolicy.CreateAPI
Delete retrypolicy.DeleteAPI
Get retrypolicy.GetAPI
GetAll retrypolicy.GetAllAPI
Update retrypolicy.UpdateAPI
}

type NodeAPI struct {
PreemptOnNode node.PreemptAPI
CancelOnNode node.CancelAPI
Expand All @@ -79,9 +89,10 @@ type NodeAPI struct {
func New() *App {
return &App{
Params: &Params{
QueueAPI: &QueueAPI{},
ExecutorAPI: &ExecutorAPI{},
NodeAPI: &NodeAPI{},
QueueAPI: &QueueAPI{},
RetryPolicyAPI: &RetryPolicyAPI{},
ExecutorAPI: &ExecutorAPI{},
NodeAPI: &NodeAPI{},
},
Out: os.Stdout,
Random: rand.Reader,
Expand Down
8 changes: 8 additions & 0 deletions internal/armadactl/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ func (a *App) CreateResource(fileName string, dryRun bool) error {
if !dryRun {
return a.Params.QueueAPI.Create(queue)
}
case client.ResourceKindRetryPolicy:
policy := &api.RetryPolicy{}
if err := util.BindJsonOrYaml(fileName, policy); err != nil {
return errors.Errorf("file %s error: %s", fileName, err)
}
if !dryRun {
return a.Params.RetryPolicyAPI.Create(policy)
}
default:
return errors.Errorf("invalid resource kind: %s", resource.Kind)
}
Expand Down
Loading
Loading