74 changes: 55 additions & 19 deletions api/cluster/resource/templater.go
@@ -237,7 +237,7 @@ func (t *InferenceServiceTemplater) createPredictorSpec(modelService *models.Ser
}
}

livenessProbeConfig := getLivenessProbeConfig(modelService.PredictorProtocol(), envVars, fmt.Sprintf("/v1/models/%s", modelService.Name))
livenessProbeConfig := getLivenessProbeConfig(modelService.PredictorProtocol(), envVars, fmt.Sprintf("/v1/models/%s", modelService.Name), modelService.ResourceRequest)

containerPorts := createContainerPorts(modelService.PredictorProtocol(), modelService.DeploymentMode)
storageUri := utils.CreateModelLocation(modelService.ArtifactURI)
@@ -411,7 +411,7 @@ func (t *InferenceServiceTemplater) createTransformerSpec(
}
}

livenessProbeConfig := getLivenessProbeConfig(modelService.Protocol, envVars, "/")
livenessProbeConfig := getLivenessProbeConfig(modelService.Protocol, envVars, "/", transformer.ResourceRequest)

containerPorts := createContainerPorts(modelService.Protocol, modelService.DeploymentMode)
transformerSpec := &kservev1beta1.TransformerSpec{
@@ -515,7 +515,7 @@ func (t *InferenceServiceTemplater) enrichStandardTransformerEnvVars(modelServic
return envVars
}

func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe {
func createHTTPGetLivenessProbe(httpPath string, port int, resourceRequest *models.ResourceRequest) *corev1.Probe {
return &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
@@ -526,45 +526,81 @@ func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe {
},
},
},
InitialDelaySeconds: liveProbeInitialDelaySec,
TimeoutSeconds: liveProbeTimeoutSec,
PeriodSeconds: liveProbePeriodSec,
SuccessThreshold: liveProbeSuccessThreshold,
FailureThreshold: liveProbeFailureThreshold,
InitialDelaySeconds: getLivenessProbeInitialDelaySeconds(resourceRequest),
TimeoutSeconds: getLivenessProbeTimeoutSeconds(resourceRequest),
PeriodSeconds: getLivenessProbePeriodSeconds(resourceRequest),
SuccessThreshold: getLivenessProbeSuccessThreshold(resourceRequest),
FailureThreshold: getLivenessProbeFailureThreshold(resourceRequest),
}
}

func createGRPCLivenessProbe(port int) *corev1.Probe {
func createGRPCLivenessProbe(port int, resourceRequest *models.ResourceRequest) *corev1.Probe {
return &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{grpcHealthProbeCommand, fmt.Sprintf("-addr=:%d", port)},
},
},
InitialDelaySeconds: liveProbeInitialDelaySec,
TimeoutSeconds: liveProbeTimeoutSec,
PeriodSeconds: liveProbePeriodSec,
SuccessThreshold: liveProbeSuccessThreshold,
FailureThreshold: liveProbeFailureThreshold,
InitialDelaySeconds: getLivenessProbeInitialDelaySeconds(resourceRequest),
TimeoutSeconds: getLivenessProbeTimeoutSeconds(resourceRequest),
PeriodSeconds: getLivenessProbePeriodSeconds(resourceRequest),
SuccessThreshold: getLivenessProbeSuccessThreshold(resourceRequest),
FailureThreshold: getLivenessProbeFailureThreshold(resourceRequest),
}
}

func getLivenessProbeConfig(protocol prt.Protocol, envVars []corev1.EnvVar, httpPath string) *corev1.Probe {
func getLivenessProbeConfig(protocol prt.Protocol, envVars []corev1.EnvVar, httpPath string, resourceRequest *models.ResourceRequest) *corev1.Probe {
// Liveness probe config: the probe is enabled unless one of the disable env vars is set to "true".
var livenessProbeConfig *corev1.Probe = nil
envVarsMap := getEnvVarMap(envVars)
if !strings.EqualFold(envVarsMap[envOldDisableLivenessProbe].Value, "true") &&
!strings.EqualFold(envVarsMap[envDisableLivenessProbe].Value, "true") {
livenessProbeConfig = createLivenessProbeSpec(protocol, httpPath)
livenessProbeConfig = createLivenessProbeSpec(protocol, httpPath, resourceRequest)
}
return livenessProbeConfig
}

func createLivenessProbeSpec(protocol prt.Protocol, httpPath string) *corev1.Probe {
func createLivenessProbeSpec(protocol prt.Protocol, httpPath string, resourceRequest *models.ResourceRequest) *corev1.Probe {
if protocol == prt.UpiV1 {
return createGRPCLivenessProbe(defaultGRPCPort)
return createGRPCLivenessProbe(defaultGRPCPort, resourceRequest)
}
return createHTTPGetLivenessProbe(httpPath, defaultHTTPPort)
return createHTTPGetLivenessProbe(httpPath, defaultHTTPPort, resourceRequest)
}

// Helper functions to get liveness probe values with fallback to defaults
func getLivenessProbeInitialDelaySeconds(resourceRequest *models.ResourceRequest) int32 {
if resourceRequest != nil && resourceRequest.LivenessProbeInitialDelaySeconds != nil {
return *resourceRequest.LivenessProbeInitialDelaySeconds
}
return liveProbeInitialDelaySec
}

func getLivenessProbeTimeoutSeconds(resourceRequest *models.ResourceRequest) int32 {
if resourceRequest != nil && resourceRequest.LivenessProbeTimeoutSeconds != nil {
return *resourceRequest.LivenessProbeTimeoutSeconds
}
return liveProbeTimeoutSec
}

func getLivenessProbePeriodSeconds(resourceRequest *models.ResourceRequest) int32 {
if resourceRequest != nil && resourceRequest.LivenessProbePeriodSeconds != nil {
return *resourceRequest.LivenessProbePeriodSeconds
}
return liveProbePeriodSec
}

func getLivenessProbeSuccessThreshold(resourceRequest *models.ResourceRequest) int32 {
if resourceRequest != nil && resourceRequest.LivenessProbeSuccessThreshold != nil {
return *resourceRequest.LivenessProbeSuccessThreshold
}
return liveProbeSuccessThreshold
}

func getLivenessProbeFailureThreshold(resourceRequest *models.ResourceRequest) int32 {
if resourceRequest != nil && resourceRequest.LivenessProbeFailureThreshold != nil {
return *resourceRequest.LivenessProbeFailureThreshold
}
return liveProbeFailureThreshold
}

func createPredictorHost(modelService *models.Service) string {
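For context, the new getLivenessProbe* helpers all follow the same override-with-fallback pattern: use the per-deployment value from ResourceRequest when it is set, otherwise fall back to the existing package-level default. A standalone sketch of that pattern follows; the type, the default value, and the helper name are simplified stand-ins, not code from this diff.

// Standalone sketch of the fallback pattern used by the new helpers.
// resourceRequest and defaultInitialDelaySec are illustrative stand-ins,
// not the actual merlin types or constants.
package main

import "fmt"

const defaultInitialDelaySec int32 = 30 // assumed default; the real value lives in liveProbeInitialDelaySec

type resourceRequest struct {
	LivenessProbeInitialDelaySeconds *int32
}

func initialDelaySeconds(r *resourceRequest) int32 {
	if r != nil && r.LivenessProbeInitialDelaySeconds != nil {
		return *r.LivenessProbeInitialDelaySeconds
	}
	return defaultInitialDelaySec
}

func main() {
	custom := int32(120)
	fmt.Println(initialDelaySeconds(nil))                // 30: no resource request at all
	fmt.Println(initialDelaySeconds(&resourceRequest{})) // 30: field left unset
	override := &resourceRequest{LivenessProbeInitialDelaySeconds: &custom}
	fmt.Println(initialDelaySeconds(override)) // 120: per-deployment value wins
}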
4 changes: 2 additions & 2 deletions api/cluster/resource/templater_gpu_test.go
@@ -119,8 +119,8 @@ func TestCreateInferenceServiceSpecWithGPU(t *testing.T) {
storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI)

// Liveness probe config for the model containers
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)
probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)

tests := []struct {
name string
22 changes: 11 additions & 11 deletions api/cluster/resource/templater_test.go
@@ -287,8 +287,8 @@ func TestCreateInferenceServiceSpec(t *testing.T) {
storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI)

// Liveness probe config for the model containers
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)
probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)

tests := []struct {
name string
@@ -2075,12 +2075,12 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) {
storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI)

// Liveness probe config for the model containers
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)
probeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)

// Liveness probe config for the transformers
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/")
transformerProbeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, "/")
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil)
transformerProbeConfigUPI := createLivenessProbeSpec(protocol.UpiV1, "/", nil)
tests := []struct {
name string
modelSvc *models.Service
@@ -3016,10 +3016,10 @@ func TestCreateInferenceServiceSpecWithLogger(t *testing.T) {
storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI)

// Liveness probe config for the model containers
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)

// Liveness probe config for the transformers
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/")
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil)

tests := []struct {
name string
@@ -3503,10 +3503,10 @@ func TestCreateInferenceServiceSpecWithTopologySpreadConstraints(t *testing.T) {
storageUri := fmt.Sprintf("%s/model", modelSvc.ArtifactURI)

// Liveness probe config for the model containers
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name))
probeConfig := createLivenessProbeSpec(protocol.HttpJson, fmt.Sprintf("/v1/models/%s", modelSvc.Name), nil)

// Liveness probe config for the transformers
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/")
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil)

tests := []struct {
name string
@@ -4419,7 +4419,7 @@ func TestCreateTransformerSpec(t *testing.T) {
customCPULimit := resource.MustParse("8")

// Liveness probe config for the transformers
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/")
transformerProbeConfig := createLivenessProbeSpec(protocol.HttpJson, "/", nil)

modelSvc := &models.Service{
Name: "model-1",
10 changes: 10 additions & 0 deletions api/models/resource_request.go
@@ -37,6 +37,16 @@ type ResourceRequest struct {
GPUName string `json:"gpu_name,omitempty"`
// GPU Quantity requests
GPURequest resource.Quantity `json:"gpu_request,omitempty"`
// Liveness probe initial delay seconds
LivenessProbeInitialDelaySeconds *int32 `json:"liveness_probe_initial_delay_seconds,omitempty"`
// Liveness probe period seconds
LivenessProbePeriodSeconds *int32 `json:"liveness_probe_period_seconds,omitempty"`
// Liveness probe timeout seconds
LivenessProbeTimeoutSeconds *int32 `json:"liveness_probe_timeout_seconds,omitempty"`
// Liveness probe success threshold
LivenessProbeSuccessThreshold *int32 `json:"liveness_probe_success_threshold,omitempty"`
// Liveness probe failure threshold
LivenessProbeFailureThreshold *int32 `json:"liveness_probe_failure_threshold,omitempty"`
}

func (r ResourceRequest) Value() (driver.Value, error) {
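Because the new fields are optional pointers with omitempty tags, callers can override just the settings they care about and leave everything else to the server defaults. A rough sketch of how such a request serializes is below; the struct is a trimmed, hypothetical stand-in for models.ResourceRequest and the values are made up.

// Illustrative only: shows how the optional probe fields serialize to JSON.
package main

import (
	"encoding/json"
	"fmt"
)

type resourceRequest struct {
	MinReplica                       int    `json:"min_replica"`
	MaxReplica                       int    `json:"max_replica"`
	LivenessProbeInitialDelaySeconds *int32 `json:"liveness_probe_initial_delay_seconds,omitempty"`
	LivenessProbeTimeoutSeconds      *int32 `json:"liveness_probe_timeout_seconds,omitempty"`
}

func main() {
	delay := int32(60)
	req := resourceRequest{
		MinReplica:                       1,
		MaxReplica:                       2,
		LivenessProbeInitialDelaySeconds: &delay,
		// LivenessProbeTimeoutSeconds is left nil, so it is omitted
		// and the templater falls back to its default.
	}
	b, _ := json.Marshal(req)
	fmt.Println(string(b))
	// {"min_replica":1,"max_replica":2,"liveness_probe_initial_delay_seconds":60}
}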
10 changes: 8 additions & 2 deletions python/pyfunc-server/pyfuncserver/protocol/upi/server.py
Reviewer comment (Contributor): Sorry if I'm not aware, but why do we need to change this? Is there a relation between the liveness probe and this change?

@@ -57,7 +57,7 @@ def start(self):
# multiprocessing based on https://github.com/grpc/grpc/tree/master/examples/python/multiprocessing
workers = []
for _ in range(self._config.workers - 1):
worker = multiprocessing.Process(target=self._run_server)
worker = multiprocessing.Process(target=self._run_server_sync)
worker.start()
workers.append(worker)

@@ -67,7 +67,13 @@ def start(self):
publisher = Publisher(kafka_producer, sampler)
self._predict_service.set_publisher(publisher)

asyncio.get_event_loop().run_until_complete(self._run_server())
self._run_server_sync()

def _run_server_sync(self):
"""Synchronous wrapper to run the async server in a new event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._run_server())

async def _run_server(self):
"""
15 changes: 15 additions & 0 deletions swagger.yaml
@@ -2117,6 +2117,21 @@ components:
type: string
gpu_request:
type: string
liveness_probe_initial_delay_seconds:
type: integer
description: Initial delay in seconds before the liveness probe is started
liveness_probe_period_seconds:
type: integer
description: How often (in seconds) to perform the liveness probe
liveness_probe_timeout_seconds:
type: integer
description: Number of seconds after which the liveness probe times out
liveness_probe_success_threshold:
type: integer
description: Minimum consecutive successes for the liveness probe to be considered successful
liveness_probe_failure_threshold:
type: integer
description: Minimum consecutive failures for the liveness probe to be considered failed
AutoscalingPolicy:
type: object
required:
41 changes: 41 additions & 0 deletions ui/src/components/ResourcesConfigTable.js
@@ -27,6 +27,11 @@ export const ResourcesConfigTable = ({
max_replica,
gpu_name,
gpu_request,
liveness_probe_initial_delay_seconds,
liveness_probe_period_seconds,
liveness_probe_timeout_seconds,
liveness_probe_success_threshold,
liveness_probe_failure_threshold,
},
}) => {
const items = [
@@ -68,6 +73,42 @@
});
}

// Add liveness probe configuration if any value is set
if (liveness_probe_initial_delay_seconds !== undefined && liveness_probe_initial_delay_seconds !== null) {
items.push({
title: "Liveness Initial Delay",
description: `${liveness_probe_initial_delay_seconds}s`,
});
}

if (liveness_probe_period_seconds !== undefined && liveness_probe_period_seconds !== null) {
items.push({
title: "Liveness Period",
description: `${liveness_probe_period_seconds}s`,
});
}

if (liveness_probe_timeout_seconds !== undefined && liveness_probe_timeout_seconds !== null) {
items.push({
title: "Liveness Timeout",
description: `${liveness_probe_timeout_seconds}s`,
});
}

if (liveness_probe_success_threshold !== undefined && liveness_probe_success_threshold !== null) {
items.push({
title: "Liveness Success Threshold",
description: liveness_probe_success_threshold,
});
}

if (liveness_probe_failure_threshold !== undefined && liveness_probe_failure_threshold !== null) {
items.push({
title: "Liveness Failure Threshold",
description: liveness_probe_failure_threshold,
});
}

return (
<EuiDescriptionList
compressed