diff --git a/api/cluster/resource/templater.go b/api/cluster/resource/templater.go index ecc5fc734..8b28ad179 100644 --- a/api/cluster/resource/templater.go +++ b/api/cluster/resource/templater.go @@ -237,7 +237,7 @@ func (t *InferenceServiceTemplater) createPredictorSpec(modelService *models.Ser } } - livenessProbeConfig := getLivenessProbeConfig(modelService.PredictorProtocol(), envVars, fmt.Sprintf("/v1/models/%s", modelService.Name)) + livenessProbeConfig := getLivenessProbeConfig(modelService.PredictorProtocol(), envVars, fmt.Sprintf("/v1/models/%s", modelService.Name), modelService.ResourceRequest) containerPorts := createContainerPorts(modelService.PredictorProtocol(), modelService.DeploymentMode) storageUri := utils.CreateModelLocation(modelService.ArtifactURI) @@ -411,7 +411,7 @@ func (t *InferenceServiceTemplater) createTransformerSpec( } } - livenessProbeConfig := getLivenessProbeConfig(modelService.Protocol, envVars, "/") + livenessProbeConfig := getLivenessProbeConfig(modelService.Protocol, envVars, "/", transformer.ResourceRequest) containerPorts := createContainerPorts(modelService.Protocol, modelService.DeploymentMode) transformerSpec := &kservev1beta1.TransformerSpec{ @@ -515,7 +515,7 @@ func (t *InferenceServiceTemplater) enrichStandardTransformerEnvVars(modelServic return envVars } -func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe { +func createHTTPGetLivenessProbe(httpPath string, port int, resourceRequest *models.ResourceRequest) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ @@ -526,45 +526,81 @@ func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe { }, }, }, - InitialDelaySeconds: liveProbeInitialDelaySec, - TimeoutSeconds: liveProbeTimeoutSec, - PeriodSeconds: liveProbePeriodSec, - SuccessThreshold: liveProbeSuccessThreshold, - FailureThreshold: liveProbeFailureThreshold, + InitialDelaySeconds: 
getLivenessProbeInitialDelaySeconds(resourceRequest), + TimeoutSeconds: getLivenessProbeTimeoutSeconds(resourceRequest), + PeriodSeconds: getLivenessProbePeriodSeconds(resourceRequest), + SuccessThreshold: getLivenessProbeSuccessThreshold(resourceRequest), + FailureThreshold: getLivenessProbeFailureThreshold(resourceRequest), } } -func createGRPCLivenessProbe(port int) *corev1.Probe { +func createGRPCLivenessProbe(port int, resourceRequest *models.ResourceRequest) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ Exec: &corev1.ExecAction{ Command: []string{grpcHealthProbeCommand, fmt.Sprintf("-addr=:%d", port)}, }, }, - InitialDelaySeconds: liveProbeInitialDelaySec, - TimeoutSeconds: liveProbeTimeoutSec, - PeriodSeconds: liveProbePeriodSec, - SuccessThreshold: liveProbeSuccessThreshold, - FailureThreshold: liveProbeFailureThreshold, + InitialDelaySeconds: getLivenessProbeInitialDelaySeconds(resourceRequest), + TimeoutSeconds: getLivenessProbeTimeoutSeconds(resourceRequest), + PeriodSeconds: getLivenessProbePeriodSeconds(resourceRequest), + SuccessThreshold: getLivenessProbeSuccessThreshold(resourceRequest), + FailureThreshold: getLivenessProbeFailureThreshold(resourceRequest), } } -func getLivenessProbeConfig(protocol prt.Protocol, envVars []corev1.EnvVar, httpPath string) *corev1.Probe { +func getLivenessProbeConfig(protocol prt.Protocol, envVars []corev1.EnvVar, httpPath string, resourceRequest *models.ResourceRequest) *corev1.Probe { // liveness probe config. 
if env var to disable != true or not set, it will default to enabled var livenessProbeConfig *corev1.Probe = nil envVarsMap := getEnvVarMap(envVars) if !strings.EqualFold(envVarsMap[envOldDisableLivenessProbe].Value, "true") && !strings.EqualFold(envVarsMap[envDisableLivenessProbe].Value, "true") { - livenessProbeConfig = createLivenessProbeSpec(protocol, httpPath) + livenessProbeConfig = createLivenessProbeSpec(protocol, httpPath, resourceRequest) } return livenessProbeConfig } -func createLivenessProbeSpec(protocol prt.Protocol, httpPath string) *corev1.Probe { +func createLivenessProbeSpec(protocol prt.Protocol, httpPath string, resourceRequest *models.ResourceRequest) *corev1.Probe { if protocol == prt.UpiV1 { - return createGRPCLivenessProbe(defaultGRPCPort) + return createGRPCLivenessProbe(defaultGRPCPort, resourceRequest) } - return createHTTPGetLivenessProbe(httpPath, defaultHTTPPort) + return createHTTPGetLivenessProbe(httpPath, defaultHTTPPort, resourceRequest) +} + +// Helper functions to get liveness probe values with fallback to defaults +func getLivenessProbeInitialDelaySeconds(resourceRequest *models.ResourceRequest) int32 { + if resourceRequest != nil && resourceRequest.LivenessProbeInitialDelaySeconds != nil { + return *resourceRequest.LivenessProbeInitialDelaySeconds + } + return liveProbeInitialDelaySec +} + +func getLivenessProbeTimeoutSeconds(resourceRequest *models.ResourceRequest) int32 { + if resourceRequest != nil && resourceRequest.LivenessProbeTimeoutSeconds != nil { + return *resourceRequest.LivenessProbeTimeoutSeconds + } + return liveProbeTimeoutSec +} + +func getLivenessProbePeriodSeconds(resourceRequest *models.ResourceRequest) int32 { + if resourceRequest != nil && resourceRequest.LivenessProbePeriodSeconds != nil { + return *resourceRequest.LivenessProbePeriodSeconds + } + return liveProbePeriodSec +} + +func getLivenessProbeSuccessThreshold(resourceRequest *models.ResourceRequest) int32 { + if resourceRequest != nil && 
resourceRequest.LivenessProbeSuccessThreshold != nil { + return *resourceRequest.LivenessProbeSuccessThreshold + } + return liveProbeSuccessThreshold +} + +func getLivenessProbeFailureThreshold(resourceRequest *models.ResourceRequest) int32 { + if resourceRequest != nil && resourceRequest.LivenessProbeFailureThreshold != nil { + return *resourceRequest.LivenessProbeFailureThreshold + } + return liveProbeFailureThreshold } func createPredictorHost(modelService *models.Service) string { diff --git a/api/cluster/resource/templater_gpu_test.go b/api/cluster/resource/templater_gpu_test.go index ab7b6e534..d3d4dfb60 100644 --- a/api/cluster/resource/templater_gpu_test.go +++ b/api/cluster/resource/templater_gpu_test.go @@ -119,8 +119,8 @@ func TestCreateInferenceServiceSpecWithGPU(t *testing.T) { storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI) // Liveness probe config for the model containers - probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) - probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) + probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) + probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) tests := []struct { name string diff --git a/api/cluster/resource/templater_test.go b/api/cluster/resource/templater_test.go index 411139a73..ff0e9e620 100644 --- a/api/cluster/resource/templater_test.go +++ b/api/cluster/resource/templater_test.go @@ -287,8 +287,8 @@ func TestCreateInferenceServiceSpec(t *testing.T) { storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI) // Liveness probe config for the model containers - probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) - probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) + probeConfig := 
createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) + probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) tests := []struct { name string @@ -2075,12 +2075,12 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) { storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI) // Liveness probe config for the model containers - probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) - probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) + probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) + probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) // Liveness probe config for the transformers - transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/") - transformerProbeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, "/") + transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil) + transformerProbeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, "/", nil) tests := []struct { name string modelSvc *models.Service @@ -3016,10 +3016,10 @@ func TestCreateInferenceServiceSpecWithLogger(t *testing.T) { storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI) // Liveness probe config for the model containers - probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) + probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) // Liveness probe config for the transformers - transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/") + transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil) tests := []struct { name string @@ -3503,10 +3503,10 @@ func 
TestCreateInferenceServiceSpecWithTopologySpreadConstraints(t *testing.T) { storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI) // Liveness probe config for the model containers - probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name)) + probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil) // Liveness probe config for the transformers - transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/") + transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil) tests := []struct { name string @@ -4419,7 +4419,7 @@ func TestCreateTransformerSpec(t *testing.T) { customCPULimit := resource.MustParse("8") // Liveness probe config for the transformers - transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/") + transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil) modelSvc := &models.Service{ Name: "model-1", diff --git a/api/models/resource_request.go b/api/models/resource_request.go index f92fae9f8..5fc5c537a 100644 --- a/api/models/resource_request.go +++ b/api/models/resource_request.go @@ -37,6 +37,16 @@ type ResourceRequest struct { GPUName string `json:"gpu_name,omitempty"` // GPU Quantity requests GPURequest resource.Quantity `json:"gpu_request,omitempty"` + // Liveness probe initial delay seconds + LivenessProbeInitialDelaySeconds *int32 `json:"liveness_probe_initial_delay_seconds,omitempty"` + // Liveness probe period seconds + LivenessProbePeriodSeconds *int32 `json:"liveness_probe_period_seconds,omitempty"` + // Liveness probe timeout seconds + LivenessProbeTimeoutSeconds *int32 `json:"liveness_probe_timeout_seconds,omitempty"` + // Liveness probe success threshold + LivenessProbeSuccessThreshold *int32 `json:"liveness_probe_success_threshold,omitempty"` + // Liveness probe failure threshold + LivenessProbeFailureThreshold *int32 `json:"liveness_probe_failure_threshold,omitempty"` 
} func (r ResourceRequest) Value() (driver.Value, error) { diff --git a/python/pyfunc-server/pyfuncserver/protocol/upi/server.py b/python/pyfunc-server/pyfuncserver/protocol/upi/server.py index b0cb47a78..5363c2b05 100644 --- a/python/pyfunc-server/pyfuncserver/protocol/upi/server.py +++ b/python/pyfunc-server/pyfuncserver/protocol/upi/server.py @@ -57,7 +57,7 @@ def start(self): # multiprocessing based on https://github.com/grpc/grpc/tree/master/examples/python/multiprocessing workers = [] for _ in range(self._config.workers - 1): - worker = multiprocessing.Process(target=self._run_server) + worker = multiprocessing.Process(target=self._run_server_sync) worker.start() workers.append(worker) @@ -67,7 +67,13 @@ def start(self): publisher = Publisher(kafka_producer, sampler) self._predict_service.set_publisher(publisher) - asyncio.get_event_loop().run_until_complete(self._run_server()) + self._run_server_sync() + + def _run_server_sync(self): + """Synchronous wrapper to run the async server in a new event loop.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self._run_server()) async def _run_server(self): """ diff --git a/swagger.yaml b/swagger.yaml index b76351eb2..8fd1ebb06 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -2117,6 +2117,21 @@ components: type: string gpu_request: type: string + liveness_probe_initial_delay_seconds: + type: integer + description: Initial delay in seconds before liveness probe is started + liveness_probe_period_seconds: + type: integer + description: How often (in seconds) to perform the liveness probe + liveness_probe_timeout_seconds: + type: integer + description: Number of seconds after which the liveness probe times out + liveness_probe_success_threshold: + type: integer + description: Minimum consecutive successes for the liveness probe to be considered successful + liveness_probe_failure_threshold: + type: integer + description: Minimum consecutive failures for the liveness probe to 
be considered failed AutoscalingPolicy: type: object required: diff --git a/ui/src/components/ResourcesConfigTable.js b/ui/src/components/ResourcesConfigTable.js index ab0c8eb8b..da5921d54 100644 --- a/ui/src/components/ResourcesConfigTable.js +++ b/ui/src/components/ResourcesConfigTable.js @@ -27,6 +27,11 @@ export const ResourcesConfigTable = ({ max_replica, gpu_name, gpu_request, + liveness_probe_initial_delay_seconds, + liveness_probe_period_seconds, + liveness_probe_timeout_seconds, + liveness_probe_success_threshold, + liveness_probe_failure_threshold, }, }) => { const items = [ @@ -68,6 +73,42 @@ export const ResourcesConfigTable = ({ }); } + // Add liveness probe configuration if any value is set + if (liveness_probe_initial_delay_seconds !== undefined && liveness_probe_initial_delay_seconds !== null) { + items.push({ + title: "Liveness Initial Delay", + description: `${liveness_probe_initial_delay_seconds}s`, + }); + } + + if (liveness_probe_period_seconds !== undefined && liveness_probe_period_seconds !== null) { + items.push({ + title: "Liveness Period", + description: `${liveness_probe_period_seconds}s`, + }); + } + + if (liveness_probe_timeout_seconds !== undefined && liveness_probe_timeout_seconds !== null) { + items.push({ + title: "Liveness Timeout", + description: `${liveness_probe_timeout_seconds}s`, + }); + } + + if (liveness_probe_success_threshold !== undefined && liveness_probe_success_threshold !== null) { + items.push({ + title: "Liveness Success Threshold", + description: liveness_probe_success_threshold, + }); + } + + if (liveness_probe_failure_threshold !== undefined && liveness_probe_failure_threshold !== null) { + items.push({ + title: "Liveness Failure Threshold", + description: liveness_probe_failure_threshold, + }); + } + return ( { + const { onChange } = useOnChangeHandler(onChangeHandler); + + return ( + Liveness Probe Configuration

} + description={ + + Configure the liveness probe settings for your deployment. + These settings determine how Kubernetes checks if your container is still running. + + } + fullWidth + > + + + + } + isInvalid={!!errors.liveness_probe_initial_delay_seconds} + error={errors.liveness_probe_initial_delay_seconds} + fullWidth + > + onChange("liveness_probe_initial_delay_seconds")(e.target.value ? parseInt(e.target.value) : undefined)} + isInvalid={!!errors.liveness_probe_initial_delay_seconds} + name="liveness_probe_initial_delay_seconds" + min={0} + fullWidth + /> + + + + + + } + isInvalid={!!errors.liveness_probe_period_seconds} + error={errors.liveness_probe_period_seconds} + fullWidth + > + onChange("liveness_probe_period_seconds")(e.target.value ? parseInt(e.target.value) : undefined)} + isInvalid={!!errors.liveness_probe_period_seconds} + name="liveness_probe_period_seconds" + min={1} + fullWidth + /> + + + + + + } + isInvalid={!!errors.liveness_probe_timeout_seconds} + error={errors.liveness_probe_timeout_seconds} + fullWidth + > + onChange("liveness_probe_timeout_seconds")(e.target.value ? parseInt(e.target.value) : undefined)} + isInvalid={!!errors.liveness_probe_timeout_seconds} + name="liveness_probe_timeout_seconds" + min={1} + fullWidth + /> + + + + + + } + isInvalid={!!errors.liveness_probe_success_threshold} + error={errors.liveness_probe_success_threshold} + fullWidth + > + onChange("liveness_probe_success_threshold")(e.target.value ? parseInt(e.target.value) : undefined)} + isInvalid={!!errors.liveness_probe_success_threshold} + name="liveness_probe_success_threshold" + min={1} + fullWidth + /> + + + + + + } + isInvalid={!!errors.liveness_probe_failure_threshold} + error={errors.liveness_probe_failure_threshold} + fullWidth + > + onChange("liveness_probe_failure_threshold")(e.target.value ? 
parseInt(e.target.value) : undefined)} + isInvalid={!!errors.liveness_probe_failure_threshold} + name="liveness_probe_failure_threshold" + min={1} + fullWidth + /> + + + +
+ );
+}; diff --git a/ui/src/pages/version/components/forms/steps/ModelStep.js b/ui/src/pages/version/components/forms/steps/ModelStep.js index aaf726204..e33a99d07 100644 --- a/ui/src/pages/version/components/forms/steps/ModelStep.js +++ b/ui/src/pages/version/components/forms/steps/ModelStep.js @@ -14,6 +14,7 @@ import { LoggerPanel } from "../components/LoggerPanel"; import { ResourcesPanel } from "../components/ResourcesPanel"; import { ImageBuilderSection } from "../components/ImageBuilderSection"; import { CPULimitsFormGroup } from "../components/CPULimitsFormGroup"; +import { LivenessProbeFormGroup } from "../components/LivenessConfigFormGroup"; export const ModelStep = ({ version, isEnvironmentDisabled = false, maxAllowedReplica, setMaxAllowedReplica }) => { const { data, onChangeHandler } = useContext(FormContext); @@ -52,6 +53,12 @@ export const ModelStep = ({ version, isEnvironmentDisabled = false, maxAllowedRe onChangeHandler={onChange("resource_request")} errors={get(errors, "resource_request")} /> + + { const { @@ -52,6 +53,12 @@ export const TransformerStep = ({ maxAllowedReplica }) => { onChangeHandler={onChange("transformer.resource_request")} errors={get(errors, "transformer.resource_request")} /> + + } /> diff --git a/ui/src/services/transformer/Transformer.js b/ui/src/services/transformer/Transformer.js index 01bd4fb5b..e41c8207b 100644 --- a/ui/src/services/transformer/Transformer.js +++ b/ui/src/services/transformer/Transformer.js @@ -23,7 +23,12 @@ export class Transformer { max_replica: process.env.REACT_APP_ENVIRONMENT === "production" ?
4 : 2, cpu_request: "500m", cpu_limit: "", - memory_request: "512Mi" + memory_request: "512Mi", + liveness_probe_initial_delay_seconds: undefined, + liveness_probe_period_seconds: undefined, + liveness_probe_timeout_seconds: undefined, + liveness_probe_success_threshold: undefined, + liveness_probe_failure_threshold: undefined, }; this.env_vars = []; diff --git a/ui/src/services/version_endpoint/VersionEndpoint.js b/ui/src/services/version_endpoint/VersionEndpoint.js index eac1c5ecf..b82ced8c9 100644 --- a/ui/src/services/version_endpoint/VersionEndpoint.js +++ b/ui/src/services/version_endpoint/VersionEndpoint.js @@ -28,6 +28,11 @@ export class VersionEndpoint { cpu_request: "500m", cpu_limit: "", memory_request: "512Mi", + liveness_probe_initial_delay_seconds: undefined, + liveness_probe_period_seconds: undefined, + liveness_probe_timeout_seconds: undefined, + liveness_probe_success_threshold: undefined, + liveness_probe_failure_threshold: undefined, }; this.image_builder_resource_request = {