Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, err
}

if !workloadmanager.IsSandboxReady(sandbox) {
return ctrl.Result{}, nil
}

lastActivityStr, exists := sandbox.Annotations[workloadmanager.LastActivityAnnotationKey]
var lastActivity time.Time
if exists && lastActivityStr != "" {
Expand Down
2 changes: 2 additions & 0 deletions pkg/agentd/agentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestReconciler_Reconcile_WithLastActivity(t *testing.T) {
{
Type: string(sandboxv1alpha1.SandboxConditionReady),
Status: metav1.ConditionTrue,
Reason: "DependenciesReady",
},
},
},
Expand All @@ -69,6 +70,7 @@ func TestReconciler_Reconcile_WithLastActivity(t *testing.T) {
{
Type: string(sandboxv1alpha1.SandboxConditionReady),
Status: metav1.ConditionTrue,
Reason: "DependenciesReady",
},
},
},
Expand Down
31 changes: 30 additions & 1 deletion pkg/common/types/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,35 @@ type CreateSandboxRequest struct {
Metadata map[string]string `json:"metadata"`
PublicKey string `json:"publicKey,omitempty"`
InitTimeoutSeconds int `json:"initTimeoutSeconds,omitempty"`
ReadinessProbe *ReadinessProbe `json:"readinessProbe,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to pass the probe config in the request body; the probe config is already in the pod spec.

For the code interpreter, I think we may need to add a new API field in the spec to indicate the probe.

}

// ReadinessProbe is the optional readiness probe configuration carried by
// CreateSandboxRequest under the "readinessProbe" JSON key.
// NOTE(review): the field names mirror the Kubernetes core/v1 Probe type, so
// the semantics are presumably the same; the conversion code is not visible
// here — confirm before relying on it.
type ReadinessProbe struct {
// Timing and threshold settings. Each one is omitted from JSON when zero.
InitialDelaySeconds int32 `json:"initialDelaySeconds,omitempty"`
PeriodSeconds int32 `json:"periodSeconds,omitempty"`
SuccessThreshold int32 `json:"successThreshold,omitempty"`
FailureThreshold int32 `json:"failureThreshold,omitempty"`
TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"`
// Probe handlers. Each is a pointer, so an unset handler is omitted from JSON.
// NOTE(review): nothing in this type enforces that exactly one handler is
// set — verify whether request validation should check that.
TCPSocket *TCPSocketAction `json:"tcpSocket,omitempty"`
HTTPGet *HTTPGetAction `json:"httpGet,omitempty"`
Exec *ExecAction `json:"exec,omitempty"`
}

// TCPSocketAction is the TCP handler of a ReadinessProbe: a port plus an
// optional host.
type TCPSocketAction struct {
// Port has no omitempty, so it is always serialized, even when zero.
Port int `json:"port"`
// Host is omitted from JSON when empty.
Host string `json:"host,omitempty"`
}

// HTTPGetAction is the HTTP handler of a ReadinessProbe. Every field except
// Port is optional and omitted from JSON when empty.
type HTTPGetAction struct {
Path string `json:"path,omitempty"`
// Port has no omitempty, so it is always serialized, even when zero.
Port int `json:"port"`
Host string `json:"host,omitempty"`
// NOTE(review): presumably "HTTP" or "HTTPS" as in Kubernetes probes — confirm.
Scheme string `json:"scheme,omitempty"`
// HTTPHeaders is a name-to-value map, so each header name can carry only one value.
HTTPHeaders map[string]string `json:"httpHeaders,omitempty"`
}

// ExecAction is the exec handler of a ReadinessProbe.
type ExecAction struct {
// Command is the argument vector; it is omitted from JSON when empty.
// NOTE(review): presumably executed inside the sandbox container — confirm.
Command []string `json:"command,omitempty"`
}

type Auth struct {
Expand Down Expand Up @@ -61,4 +90,4 @@ func (car *CreateSandboxRequest) Validate() error {
return fmt.Errorf("name is required")
}
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Validate() method doesn't validate the newly added ReadinessProbe. It's a good practice to validate new fields to ensure requests are well-formed. For example, you could check that a ReadinessProbe has exactly one handler (TCPSocket, HTTPGet, or Exec) defined.

}
}
12 changes: 10 additions & 2 deletions pkg/workloadmanager/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/volcano-sh/agentcube/pkg/store"
)

// sandboxReadinessTimeout is the timeout handleCreateSandbox passes to
// WaitForSandboxDependenciesReady.
const sandboxReadinessTimeout = 30 * time.Second

// handleHealth handles health check requests
func (s *Server) handleHealth(c *gin.Context) {
respondJSON(c, http.StatusOK, map[string]string{
Expand Down Expand Up @@ -124,9 +126,9 @@ func (s *Server) handleCreateSandbox(c *gin.Context) {
var err error
switch sandboxReq.Kind {
case types.AgentRuntimeKind:
sandbox, externalInfo, err = buildSandboxByAgentRuntime(sandboxReq.Namespace, sandboxReq.Name, s.informers)
sandbox, externalInfo, err = buildSandboxByAgentRuntime(sandboxReq.Namespace, sandboxReq.Name, s.informers, sandboxReq.ReadinessProbe)
case types.CodeInterpreterKind:
sandbox, sandboxClaim, externalInfo, err = buildSandboxByCodeInterpreter(sandboxReq.Namespace, sandboxReq.Name, s.informers)
sandbox, sandboxClaim, externalInfo, err = buildSandboxByCodeInterpreter(sandboxReq.Namespace, sandboxReq.Name, s.informers, sandboxReq.ReadinessProbe)
default:
klog.Errorf("invalid request kind: %v", sandboxReq.Kind)
respondError(c, http.StatusBadRequest, "INVALID_REQUEST", fmt.Sprintf("invalid request kind: %v", sandboxReq.Kind))
Expand Down Expand Up @@ -199,6 +201,12 @@ func (s *Server) handleCreateSandbox(c *gin.Context) {
return
}

if err := s.k8sClient.WaitForSandboxDependenciesReady(c.Request.Context(), namespace, sandboxName, sandboxReadinessTimeout); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just check IsSandboxReady and return once the sandbox is ready.

klog.Errorf("sandbox %s/%s dependencies not ready: %v", namespace, sandboxName, err)
respondError(c, http.StatusInternalServerError, "SANDBOX_NOT_READY", err.Error())
return
}

needRollbackSandbox := true
sandboxRollbackFunc := func() {
ctxTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand Down
92 changes: 89 additions & 3 deletions pkg/workloadmanager/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"fmt"
"time"

"k8s.io/klog/v2"

runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -21,8 +19,11 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1"
extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1"

runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1"
)

const (
Expand Down Expand Up @@ -298,6 +299,7 @@ func (c *K8sClient) GetSandboxPodIP(_ context.Context, namespace, sandboxName, p
}

return "", fmt.Errorf("no pod found for sandbox %s", sandboxName)

}

// validateAndGetPodIP validates pod status and returns IP
Expand All @@ -315,6 +317,90 @@ func validateAndGetPodIP(pod *corev1.Pod) (string, error) {
return pod.Status.PodIP, nil
}

// IsPodReady reports whether pod is non-nil and its PodReady condition has
// status True. A nil pod is never ready.
func IsPodReady(pod *corev1.Pod) bool {
	return pod != nil && isPodReadyConditionTrue(pod.Status)
}

// isPodReadyConditionTrue reports whether the first PodReady condition in
// status has status True. It returns false when no PodReady condition exists.
func isPodReadyConditionTrue(status corev1.PodStatus) bool {
	for i := range status.Conditions {
		cond := &status.Conditions[i]
		if cond.Type != corev1.PodReady {
			continue
		}
		// Only the first PodReady entry is consulted.
		return cond.Status == corev1.ConditionTrue
	}
	return false
}

// getSandboxPod lists the pods in namespace labelled sandbox-name=<sandboxName>
// and returns a copy of the first one. It returns (nil, nil) when no pod
// matches and (nil, err) when the list call fails.
func (c *K8sClient) getSandboxPod(ctx context.Context, namespace, sandboxName string) (*corev1.Pod, error) {
	opts := metav1.ListOptions{
		LabelSelector: "sandbox-name=" + sandboxName,
	}
	podList, err := c.clientset.CoreV1().Pods(namespace).List(ctx, opts)
	if err != nil {
		return nil, err
	}
	if len(podList.Items) == 0 {
		return nil, nil
	}
	// Copy the element so the returned pointer does not alias the list's backing array.
	first := podList.Items[0]
	return &first, nil
}

// getSandboxService fetches the Service named sandboxName in namespace.
// A NotFound error is translated to (nil, nil); any other error is returned
// unchanged.
func (c *K8sClient) getSandboxService(ctx context.Context, namespace, sandboxName string) (*corev1.Service, error) {
	service, err := c.clientset.CoreV1().Services(namespace).Get(ctx, sandboxName, metav1.GetOptions{})
	switch {
	case err == nil:
		return service, nil
	case apierrors.IsNotFound(err):
		// A missing service is reported as "no service", not as a failure.
		return nil, nil
	default:
		return nil, err
	}
}

// sandboxDependenciesReady reports whether the sandbox's pod is Ready and its
// service exists. Both lookups are always performed, pod first; the first
// lookup error is returned as-is.
func (c *K8sClient) sandboxDependenciesReady(ctx context.Context, namespace, sandboxName string) (bool, error) {
	pod, podErr := c.getSandboxPod(ctx, namespace, sandboxName)
	if podErr != nil {
		return false, podErr
	}

	service, svcErr := c.getSandboxService(ctx, namespace, sandboxName)
	if svcErr != nil {
		return false, svcErr
	}

	// The service only has to exist; the pod must have a true Ready condition.
	if service == nil {
		return false, nil
	}
	return IsPodReady(pod), nil
}

// WaitForSandboxDependenciesReady polls until sandboxDependenciesReady reports
// true for the sandbox, i.e. its pod is Ready and its service exists.
//
// The first check runs immediately; later checks run every 2 seconds. The
// wait is bounded both by timeout and by the caller's ctx.
//
// It returns nil once the dependencies are ready, the error from a failed
// check (checks are not retried on error), or an error wrapping ctx.Err()
// when the wait is aborted. Wrapping the cause lets callers use errors.Is to
// tell context.DeadlineExceeded from context.Canceled.
func (c *K8sClient) WaitForSandboxDependenciesReady(ctx context.Context, namespace, sandboxName string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		ready, err := c.sandboxDependenciesReady(ctx, namespace, sandboxName)
		if err != nil {
			return err
		}
		if ready {
			return nil
		}

		select {
		case <-ctx.Done():
			// Keep the original message prefix but attach the real cause, so a
			// cancellation by the caller is not indistinguishable from the timeout.
			return fmt.Errorf("timeout waiting for sandbox dependencies to become ready: %w", ctx.Err())
		case <-ticker.C:
		}
	}
}

// WaitForSandboxReady waits for the Sandbox to be ready
func (c *K8sClient) WaitForSandboxReady(ctx context.Context, namespace, sandboxName string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
Expand Down
27 changes: 27 additions & 0 deletions pkg/workloadmanager/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package workloadmanager

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1"
)

// sandboxReadyReasonDependenciesReady is the Reason the Ready condition must
// carry for IsSandboxReadyConditionTrue to report the sandbox as ready.
const sandboxReadyReasonDependenciesReady = "DependenciesReady"

// IsSandboxReady reports whether sandbox is non-nil and its status passes
// IsSandboxReadyConditionTrue. A nil sandbox is never ready.
func IsSandboxReady(sandbox *sandboxv1alpha1.Sandbox) bool {
	return sandbox != nil && IsSandboxReadyConditionTrue(sandbox.Status)
}

// IsSandboxReadyConditionTrue reports whether the first Ready condition in
// status has status True and carries the DependenciesReady reason. It returns
// false when no Ready condition is present.
func IsSandboxReadyConditionTrue(status sandboxv1alpha1.SandboxStatus) bool {
	readyType := string(sandboxv1alpha1.SandboxConditionReady)
	for i := range status.Conditions {
		cond := &status.Conditions[i]
		if cond.Type == readyType {
			// Only the first Ready entry decides the outcome.
			if cond.Status != metav1.ConditionTrue {
				return false
			}
			return cond.Reason == sandboxReadyReasonDependenciesReady
		}
	}
	return false
}
8 changes: 2 additions & 6 deletions pkg/workloadmanager/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/volcano-sh/agentcube/pkg/common/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -316,11 +315,8 @@ func convertSandboxToStoreCache(sandboxCRD *sandboxv1alpha1.Sandbox, podIP strin

// getSandboxStatus extracts status from Sandbox CRD conditions
func getSandboxStatus(sandbox *sandboxv1alpha1.Sandbox) string {
// Check conditions for Ready status
for _, condition := range sandbox.Status.Conditions {
if condition.Type == string(sandboxv1alpha1.SandboxConditionReady) && condition.Status == metav1.ConditionTrue {
return "running"
}
if IsSandboxReady(sandbox) {
return "running"
}
return "paused"
}
Loading
Loading