From 571abf754bfa1e22b81a63e5ff5e9cc07e689a73 Mon Sep 17 00:00:00 2001 From: LeslieKuo <676365950@qq.com> Date: Mon, 22 Dec 2025 00:08:14 +0800 Subject: [PATCH 1/2] add readinessProbe --- pkg/common/types/sandbox.go | 31 ++++++++- pkg/workloadmanager/handlers.go | 14 +++- pkg/workloadmanager/k8s_client.go | 90 ++++++++++++++++++++++++- pkg/workloadmanager/workload_builder.go | 85 ++++++++++++++++++++--- 4 files changed, 204 insertions(+), 16 deletions(-) diff --git a/pkg/common/types/sandbox.go b/pkg/common/types/sandbox.go index 6c659734..f87f3dc6 100644 --- a/pkg/common/types/sandbox.go +++ b/pkg/common/types/sandbox.go @@ -33,6 +33,35 @@ type CreateSandboxRequest struct { Metadata map[string]string `json:"metadata"` PublicKey string `json:"publicKey,omitempty"` InitTimeoutSeconds int `json:"initTimeoutSeconds,omitempty"` + ReadinessProbe *ReadinessProbe `json:"readinessProbe,omitempty"` +} + +type ReadinessProbe struct { + InitialDelaySeconds int32 `json:"initialDelaySeconds,omitempty"` + PeriodSeconds int32 `json:"periodSeconds,omitempty"` + SuccessThreshold int32 `json:"successThreshold,omitempty"` + FailureThreshold int32 `json:"failureThreshold,omitempty"` + TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"` + TCPSocket *TCPSocketAction `json:"tcpSocket,omitempty"` + HTTPGet *HTTPGetAction `json:"httpGet,omitempty"` + Exec *ExecAction `json:"exec,omitempty"` +} + +type TCPSocketAction struct { + Port int `json:"port"` + Host string `json:"host,omitempty"` +} + +type HTTPGetAction struct { + Path string `json:"path,omitempty"` + Port int `json:"port"` + Host string `json:"host,omitempty"` + Scheme string `json:"scheme,omitempty"` + HTTPHeaders map[string]string `json:"httpHeaders,omitempty"` +} + +type ExecAction struct { + Command []string `json:"command,omitempty"` } type Auth struct { @@ -61,4 +90,4 @@ func (car *CreateSandboxRequest) Validate() error { return fmt.Errorf("name is required") } return nil -} +} \ No newline at end of file diff --git a/pkg/workloadmanager/handlers.go b/pkg/workloadmanager/handlers.go index bc1e397d..4c03601c 100644 --- a/pkg/workloadmanager/handlers.go +++ b/pkg/workloadmanager/handlers.go @@ -19,6 +19,8 @@ import ( "github.com/volcano-sh/agentcube/pkg/store" ) +const sandboxReadinessTimeout = 30 * time.Second + // handleHealth handles health check requests func (s *Server) handleHealth(c *gin.Context) { respondJSON(c, http.StatusOK, map[string]string{ @@ -110,7 +112,7 @@ func (s *Server) handleCreateSandbox(c *gin.Context) { case strings.HasSuffix(reqPath, "/code-interpreter"): sandboxReq.Kind = types.CodeInterpreterKind default: - } + } if err := sandboxReq.Validate(); err != nil { klog.Errorf("request body validation failed: %v", err) @@ -124,9 +126,9 @@ func (s *Server) handleCreateSandbox(c *gin.Context) { var err error switch sandboxReq.Kind { case types.AgentRuntimeKind: - sandbox, externalInfo, err = buildSandboxByAgentRuntime(sandboxReq.Namespace, sandboxReq.Name, s.informers) + sandbox, externalInfo, err = buildSandboxByAgentRuntime(createAgentRequest.Namespace, createAgentRequest.Name, s.informers, createAgentRequest.ReadinessProbe) case types.CodeInterpreterKind: - sandbox, sandboxClaim, externalInfo, err = buildSandboxByCodeInterpreter(sandboxReq.Namespace, sandboxReq.Name, s.informers) + sandbox, sandboxClaim, externalInfo, err = buildSandboxByCodeInterpreter(createAgentRequest.Namespace, createAgentRequest.Name, s.informers, createAgentRequest.ReadinessProbe) default: klog.Errorf("invalid request kind: %v", sandboxReq.Kind) respondError(c, http.StatusBadRequest, "INVALID_REQUEST", fmt.Sprintf("invalid request kind: %v", sandboxReq.Kind)) @@ -199,6 +201,12 @@ func (s *Server) handleCreateSandbox(c *gin.Context) { return } + if err := s.k8sClient.WaitForSandboxDependenciesReady(c.Request.Context(), namespace, sandboxName, sandboxReadinessTimeout); err != nil { + log.Printf("sandbox %s/%s dependencies not ready: %v", namespace, sandboxName, err) + respondError(c, http.StatusInternalServerError, "SANDBOX_NOT_READY", err.Error()) + return + } + needRollbackSandbox := true sandboxRollbackFunc := func() { ctxTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) diff --git a/pkg/workloadmanager/k8s_client.go b/pkg/workloadmanager/k8s_client.go index 9ce13f7c..b21eb0d1 100644 --- a/pkg/workloadmanager/k8s_client.go +++ b/pkg/workloadmanager/k8s_client.go @@ -6,9 +6,8 @@ import ( "time" "k8s.io/klog/v2" - - runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -23,6 +22,8 @@ import ( "k8s.io/client-go/tools/clientcmd" sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1" extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1" + + runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1" ) const ( @@ -298,7 +299,6 @@ func (c *K8sClient) GetSandboxPodIP(_ context.Context, namespace, sandboxName, p } return "", fmt.Errorf("no pod found for sandbox %s", sandboxName) -} // validateAndGetPodIP validates pod status and returns IP func validateAndGetPodIP(pod *corev1.Pod) (string, error) { @@ -315,6 +315,90 @@ func validateAndGetPodIP(pod *corev1.Pod) (string, error) { return pod.Status.PodIP, nil } +// IsPodReady returns true if the pod Ready condition is true. +func IsPodReady(pod *corev1.Pod) bool { + if pod == nil { + return false + } + return isPodReadyConditionTrue(pod.Status) +} + +func isPodReadyConditionTrue(status corev1.PodStatus) bool { + for _, condition := range status.Conditions { + if condition.Type == corev1.PodReady { + return condition.Status == corev1.ConditionTrue + } + } + return false +} + +func (c *K8sClient) getSandboxPod(ctx context.Context, namespace, sandboxName string) (*corev1.Pod, error) { + labelSelector := fmt.Sprintf("sandbox-name=%s", sandboxName) + pods, err := c.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, err + } + if len(pods.Items) == 0 { + return nil, nil + } + pod := pods.Items[0] + return &pod, nil +} + +func (c *K8sClient) getSandboxService(ctx context.Context, namespace, sandboxName string) (*corev1.Service, error) { + svc, err := c.clientset.CoreV1().Services(namespace).Get(ctx, sandboxName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + return svc, nil +} + +func (c *K8sClient) sandboxDependenciesReady(ctx context.Context, namespace, sandboxName string) (bool, error) { + pod, err := c.getSandboxPod(ctx, namespace, sandboxName) + if err != nil { + return false, err + } + podReady := IsPodReady(pod) + + svc, err := c.getSandboxService(ctx, namespace, sandboxName) + if err != nil { + return false, err + } + svcReady := svc != nil + + return podReady && svcReady, nil +} + +// WaitForSandboxDependenciesReady waits until both the sandbox pod and service are ready. +func (c *K8sClient) WaitForSandboxDependenciesReady(ctx context.Context, namespace, sandboxName string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + ready, err := c.sandboxDependenciesReady(ctx, namespace, sandboxName) + if err != nil { + return err + } + if ready { + return nil + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for sandbox dependencies to become ready") + case <-ticker.C: + } + } +} + // WaitForSandboxReady waits for the Sandbox to be ready func (c *K8sClient) WaitForSandboxReady(ctx context.Context, namespace, sandboxName string, timeout time.Duration) error { ctx, cancel := context.WithTimeout(ctx, timeout) diff --git a/pkg/workloadmanager/workload_builder.go b/pkg/workloadmanager/workload_builder.go index c86f2158..ccd0641c 100644 --- a/pkg/workloadmanager/workload_builder.go +++ b/pkg/workloadmanager/workload_builder.go @@ -5,16 +5,18 @@ import ( "time" "github.com/google/uuid" - runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1" - "github.com/volcano-sh/agentcube/pkg/common/types" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" "k8s.io/utils/ptr" sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1" extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1" + + runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1" + "github.com/volcano-sh/agentcube/pkg/common/types" ) type buildSandboxParams struct { @@ -27,6 +29,7 @@ type buildSandboxParams struct { podSpec corev1.PodSpec podLabels map[string]string podAnnotations map[string]string + readinessProbe *types.ReadinessProbe } type buildSandboxClaimParams struct { @@ -81,6 +84,68 @@ func buildSandboxObject(params *buildSandboxParams) *sandboxv1alpha1.Sandbox { } sandbox.Spec.PodTemplate.ObjectMeta.Labels[SessionIdLabelKey] = params.sessionID sandbox.Spec.PodTemplate.ObjectMeta.Labels["sandbox-name"] = params.sandboxName + + // Handle Readiness Probe + if len(sandbox.Spec.PodTemplate.Spec.Containers) > 0 { + container := &sandbox.Spec.PodTemplate.Spec.Containers[0] + + if params.readinessProbe != nil { + probe := &corev1.Probe{ + InitialDelaySeconds: params.readinessProbe.InitialDelaySeconds, + PeriodSeconds: params.readinessProbe.PeriodSeconds, + SuccessThreshold: params.readinessProbe.SuccessThreshold, + FailureThreshold: params.readinessProbe.FailureThreshold, + TimeoutSeconds: params.readinessProbe.TimeoutSeconds, + } + + if params.readinessProbe.TCPSocket != nil { + probe.ProbeHandler = corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(params.readinessProbe.TCPSocket.Port), + Host: params.readinessProbe.TCPSocket.Host, + }, + } + } else if params.readinessProbe.HTTPGet != nil { + headers := []corev1.HTTPHeader{} + for k, v := range params.readinessProbe.HTTPGet.HTTPHeaders { + headers = append(headers, corev1.HTTPHeader{Name: k, Value: v}) + } + probe.ProbeHandler = corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: params.readinessProbe.HTTPGet.Path, + Port: intstr.FromInt(params.readinessProbe.HTTPGet.Port), + Host: params.readinessProbe.HTTPGet.Host, + Scheme: corev1.URIScheme(params.readinessProbe.HTTPGet.Scheme), + HTTPHeaders: headers, + }, + } + } else if params.readinessProbe.Exec != nil { + probe.ProbeHandler = corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: params.readinessProbe.Exec.Command, + }, + } + } + container.ReadinessProbe = probe + } else if container.ReadinessProbe == nil { + // Add default TCP probe if ports are available + if len(container.Ports) > 0 { + container.ReadinessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(int(container.Ports[0].ContainerPort)), + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 3, + TimeoutSeconds: 1, + } + } + } + } + return sandbox } @@ -108,7 +173,7 @@ func buildSandboxClaimObject(params *buildSandboxClaimParams) *extensionsv1alpha return sandboxClaim } -func buildSandboxByAgentRuntime(namespace string, name string, ifm *Informers) (*sandboxv1alpha1.Sandbox, *sandboxExternalInfo, error) { +func buildSandboxByAgentRuntime(namespace string, name string, ifm *Informers, readinessProbe *types.ReadinessProbe) (*sandboxv1alpha1.Sandbox, *sandboxExternalInfo, error) { agentRuntimeKey := namespace + "/" + name runtimeObj, exists, err := ifm.AgentRuntimeInformer.GetStore().GetByKey(agentRuntimeKey) if err != nil { @@ -136,11 +201,12 @@ func buildSandboxByAgentRuntime(namespace string, name string, ifm *Informers) ( sessionID := uuid.New().String() sandboxName := fmt.Sprintf("%s-%s", name, RandString(8)) buildParams := &buildSandboxParams{ - namespace: namespace, - workloadName: name, - sandboxName: sandboxName, - sessionID: sessionID, - podSpec: agentRuntimeObj.Spec.Template.Spec, + namespace: namespace, + workloadName: name, + sandboxName: sandboxName, + sessionID: sessionID, + podSpec: agentRuntimeObj.Spec.Template.Spec, + readinessProbe: readinessProbe, } if agentRuntimeObj.Spec.MaxSessionDuration != nil { buildParams.ttl = agentRuntimeObj.Spec.MaxSessionDuration.Duration @@ -158,7 +224,7 @@ func buildSandboxByAgentRuntime(namespace string, name string, ifm *Informers) ( return sandbox, externalInfo, nil } -func buildSandboxByCodeInterpreter(namespace string, codeInterpreterName string, ifm *Informers) (*sandboxv1alpha1.Sandbox, *extensionsv1alpha1.SandboxClaim, *sandboxExternalInfo, error) { +func buildSandboxByCodeInterpreter(namespace string, codeInterpreterName string, ifm *Informers, readinessProbe *types.ReadinessProbe) (*sandboxv1alpha1.Sandbox, *extensionsv1alpha1.SandboxClaim, *sandboxExternalInfo, error) { codeInterpreterKey := namespace + "/" + codeInterpreterName runtimeObj, exists, err := ifm.CodeInterpreterInformer.GetStore().GetByKey(codeInterpreterKey) if err != nil { @@ -257,6 +323,7 @@ func buildSandboxByCodeInterpreter(namespace string, codeInterpreterName string, podSpec: podSpec, podLabels: codeInterpreterObj.Spec.Template.Labels, podAnnotations: codeInterpreterObj.Spec.Template.Annotations, + readinessProbe: readinessProbe, } if codeInterpreterObj.Spec.MaxSessionDuration != nil { buildParams.ttl = codeInterpreterObj.Spec.MaxSessionDuration.Duration From 4ba1c99a542ab18f9b12c54e12a54141ac5e2b41 Mon Sep 17 00:00:00 2001 From: LeslieKuo <676365950@qq.com> Date: Mon, 22 Dec 2025 09:48:04 +0800 Subject: [PATCH 2/2] Improve sandbox readiness checks --- pkg/agentd/agentd.go | 4 ++++ pkg/agentd/agentd_test.go | 2 ++ pkg/workloadmanager/handlers.go | 8 ++++---- pkg/workloadmanager/k8s_client.go | 4 +++- pkg/workloadmanager/readiness.go | 27 +++++++++++++++++++++++++++ pkg/workloadmanager/store.go | 8 ++------ 6 files changed, 42 insertions(+), 11 deletions(-) create mode 100644 pkg/workloadmanager/readiness.go diff --git a/pkg/agentd/agentd.go b/pkg/agentd/agentd.go index e2606877..6692bbe2 100644 --- a/pkg/agentd/agentd.go +++ b/pkg/agentd/agentd.go @@ -33,6 +33,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, err } + if !workloadmanager.IsSandboxReady(sandbox) { + return ctrl.Result{}, nil + } + lastActivityStr, exists := sandbox.Annotations[workloadmanager.LastActivityAnnotationKey] var lastActivity time.Time if exists && lastActivityStr != "" { diff --git a/pkg/agentd/agentd_test.go b/pkg/agentd/agentd_test.go index da9b4d98..e739bc68 100644 --- a/pkg/agentd/agentd_test.go +++ b/pkg/agentd/agentd_test.go @@ -46,6 +46,7 @@ func TestReconciler_Reconcile_WithLastActivity(t *testing.T) { { Type: string(sandboxv1alpha1.SandboxConditionReady), Status: metav1.ConditionTrue, + Reason: "DependenciesReady", }, }, }, @@ -69,6 +70,7 @@ func TestReconciler_Reconcile_WithLastActivity(t *testing.T) { { Type: string(sandboxv1alpha1.SandboxConditionReady), Status: metav1.ConditionTrue, + Reason: "DependenciesReady", }, }, }, diff --git a/pkg/workloadmanager/handlers.go b/pkg/workloadmanager/handlers.go index 4c03601c..a46f6b9c 100644 --- a/pkg/workloadmanager/handlers.go +++ b/pkg/workloadmanager/handlers.go @@ -112,7 +112,7 @@ func (s *Server) handleCreateSandbox(c *gin.Context) { case strings.HasSuffix(reqPath, "/code-interpreter"): sandboxReq.Kind = types.CodeInterpreterKind default: - } + } if err := sandboxReq.Validate(); err != nil { klog.Errorf("request body validation failed: %v", err) @@ -126,9 +126,9 @@ func (s *Server) handleCreateSandbox(c *gin.Context) { var err error switch sandboxReq.Kind { case types.AgentRuntimeKind: - sandbox, externalInfo, err = buildSandboxByAgentRuntime(createAgentRequest.Namespace, createAgentRequest.Name, s.informers, createAgentRequest.ReadinessProbe) + sandbox, externalInfo, err = buildSandboxByAgentRuntime(sandboxReq.Namespace, sandboxReq.Name, s.informers, sandboxReq.ReadinessProbe) case types.CodeInterpreterKind: - sandbox, sandboxClaim, externalInfo, err = buildSandboxByCodeInterpreter(createAgentRequest.Namespace, createAgentRequest.Name, s.informers, createAgentRequest.ReadinessProbe) + sandbox, sandboxClaim, externalInfo, err = buildSandboxByCodeInterpreter(sandboxReq.Namespace, sandboxReq.Name, s.informers, sandboxReq.ReadinessProbe) default: klog.Errorf("invalid request kind: %v", sandboxReq.Kind) respondError(c, http.StatusBadRequest, "INVALID_REQUEST", fmt.Sprintf("invalid request kind: %v", sandboxReq.Kind)) @@ -202,7 +202,7 @@ func (s *Server) handleCreateSandbox(c *gin.Context) { } if err := s.k8sClient.WaitForSandboxDependenciesReady(c.Request.Context(), namespace, sandboxName, sandboxReadinessTimeout); err != nil { - log.Printf("sandbox %s/%s dependencies not ready: %v", namespace, sandboxName, err) + klog.Errorf("sandbox %s/%s dependencies not ready: %v", namespace, sandboxName, err) respondError(c, http.StatusInternalServerError, "SANDBOX_NOT_READY", err.Error()) return } diff --git a/pkg/workloadmanager/k8s_client.go b/pkg/workloadmanager/k8s_client.go index b21eb0d1..9ed160ea 100644 --- a/pkg/workloadmanager/k8s_client.go +++ b/pkg/workloadmanager/k8s_client.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "k8s.io/klog/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,6 +19,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1" extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1" @@ -300,6 +300,8 @@ func (c *K8sClient) GetSandboxPodIP(_ context.Context, namespace, sandboxName, p return "", fmt.Errorf("no pod found for sandbox %s", sandboxName) +} + // validateAndGetPodIP validates pod status and returns IP func validateAndGetPodIP(pod *corev1.Pod) (string, error) { // Check if Pod is running diff --git a/pkg/workloadmanager/readiness.go b/pkg/workloadmanager/readiness.go new file mode 100644 index 00000000..56c770f0 --- /dev/null +++ b/pkg/workloadmanager/readiness.go @@ -0,0 +1,27 @@ +package workloadmanager + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1" +) + +const sandboxReadyReasonDependenciesReady = "DependenciesReady" + +// IsSandboxReady returns true when the sandbox Ready condition is true and dependencies are confirmed healthy. +func IsSandboxReady(sandbox *sandboxv1alpha1.Sandbox) bool { + if sandbox == nil { + return false + } + return IsSandboxReadyConditionTrue(sandbox.Status) +} + +// IsSandboxReadyConditionTrue inspects the Ready condition ensuring the DependenciesReady reason is set. +func IsSandboxReadyConditionTrue(status sandboxv1alpha1.SandboxStatus) bool { + for _, condition := range status.Conditions { + if condition.Type != string(sandboxv1alpha1.SandboxConditionReady) { + continue + } + return condition.Status == metav1.ConditionTrue && condition.Reason == sandboxReadyReasonDependenciesReady + } + return false +} diff --git a/pkg/workloadmanager/store.go b/pkg/workloadmanager/store.go index 81f8dae7..d2dd1d86 100644 --- a/pkg/workloadmanager/store.go +++ b/pkg/workloadmanager/store.go @@ -9,7 +9,6 @@ import ( "time" "github.com/volcano-sh/agentcube/pkg/common/types" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" @@ -316,11 +315,8 @@ func convertSandboxToStoreCache(sandboxCRD *sandboxv1alpha1.Sandbox, podIP strin // getSandboxStatus extracts status from Sandbox CRD conditions func getSandboxStatus(sandbox *sandboxv1alpha1.Sandbox) string { - // Check conditions for Ready status - for _, condition := range sandbox.Status.Conditions { - if condition.Type == string(sandboxv1alpha1.SandboxConditionReady) && condition.Status == metav1.ConditionTrue { - return "running" - } + if IsSandboxReady(sandbox) { + return "running" } return "paused" }