Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gcm/monitoring/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
99 changes: 99 additions & 0 deletions gcm/monitoring/kubernetes/api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
from __future__ import annotations

import logging
from typing import Iterable

from gcm.monitoring.kubernetes.client import KubernetesClient
from gcm.schemas.kubernetes.node import KubernetesNodeConditionRow
from gcm.schemas.kubernetes.pod import KubernetesPodRow

logger = logging.getLogger(__name__)

_SLURM_JOB_ANNOTATION = "slurm.coreweave.com/job-id"


class KubernetesApiClient(KubernetesClient):
"""Kubernetes client that queries the Kubernetes API via the official Python client.

Requires the ``kubernetes`` package: ``pip install kubernetes``.
"""

def __init__(self, *, in_cluster: bool = True) -> None:
try:
import kubernetes # type: ignore[import-not-found] # noqa: F401
except ImportError:
raise RuntimeError(
"The 'kubernetes' package is required for KubernetesApiClient. "
"Install it with: pip install 'gpucm[kubernetes]'"
)

from kubernetes import client, config # type: ignore[import-not-found]

if in_cluster:
config.load_incluster_config()
else:
config.load_kube_config()

self._core_api = client.CoreV1Api()

def list_pods(
self, namespace: str = "", label_selector: str = ""
) -> Iterable[KubernetesPodRow]:
try:
if namespace:
response = self._core_api.list_namespaced_pod(
namespace=namespace,
label_selector=label_selector,
)
else:
response = self._core_api.list_pod_for_all_namespaces(
label_selector=label_selector,
)
except Exception as e:
raise RuntimeError(f"Failed to list pods: {e}") from e

for pod in response.items:
annotations = pod.metadata.annotations or {}
slurm_job_id = annotations.get(_SLURM_JOB_ANNOTATION)

container_statuses = pod.status.container_statuses or []
if container_statuses:
for cs in container_statuses:
yield KubernetesPodRow(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
node_name=pod.spec.node_name,
phase=pod.status.phase,
restart_count=cs.restart_count,
container_name=cs.name,
slurm_job_id=slurm_job_id,
)
else:
yield KubernetesPodRow(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
node_name=pod.spec.node_name,
phase=pod.status.phase,
restart_count=0,
container_name=None,
slurm_job_id=slurm_job_id,
)

def list_node_conditions(self) -> Iterable[KubernetesNodeConditionRow]:
try:
response = self._core_api.list_node()
except Exception as e:
raise RuntimeError(f"Failed to list nodes: {e}") from e

for node in response.items:
conditions = node.status.conditions or []
for condition in conditions:
yield KubernetesNodeConditionRow(
name=node.metadata.name,
condition_type=condition.type,
status=condition.status,
reason=condition.reason,
message=condition.message,
)
30 changes: 30 additions & 0 deletions gcm/monitoring/kubernetes/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
from __future__ import annotations

from typing import Iterable, Protocol

from gcm.schemas.kubernetes.node import KubernetesNodeConditionRow
from gcm.schemas.kubernetes.pod import KubernetesPodRow


class KubernetesClient(Protocol):
"""A low-level Kubernetes client for pod and node monitoring."""

def list_pods(
self, namespace: str = "", label_selector: str = ""
) -> Iterable[KubernetesPodRow]:
"""Get pod information from the Kubernetes API.

Args:
namespace: Kubernetes namespace to filter pods. Empty string means all namespaces.
label_selector: Kubernetes label selector to filter pods.

If an error occurs during execution, RuntimeError should be raised.
"""

def list_node_conditions(self) -> Iterable[KubernetesNodeConditionRow]:
"""Get node condition information from the Kubernetes API.

If an error occurs during execution, RuntimeError should be raised.
"""
29 changes: 29 additions & 0 deletions gcm/monitoring/kubernetes/fake_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterable, List

from gcm.monitoring.kubernetes.client import KubernetesClient
from gcm.schemas.kubernetes.node import KubernetesNodeConditionRow
from gcm.schemas.kubernetes.pod import KubernetesPodRow


@dataclass
class KubernetesFakeClient(KubernetesClient):
"""A fake Kubernetes client for testing with injectable pod and node data."""

pods: List[KubernetesPodRow] = field(default_factory=list)
node_conditions: List[KubernetesNodeConditionRow] = field(default_factory=list)

def list_pods(
self, namespace: str = "", label_selector: str = ""
) -> Iterable[KubernetesPodRow]:
for pod in self.pods:
if namespace and pod.namespace != namespace:
continue
yield pod

def list_node_conditions(self) -> Iterable[KubernetesNodeConditionRow]:
yield from self.node_conditions
2 changes: 2 additions & 0 deletions gcm/schemas/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
27 changes: 27 additions & 0 deletions gcm/schemas/kubernetes/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
from dataclasses import dataclass

from gcm.schemas.slurm.derived_cluster import DerivedCluster


@dataclass
class KubernetesNodeConditionRow:
"""Kubernetes node condition schema.

Fields correspond to node condition data from the Kubernetes API.
"""

name: str | None = None
condition_type: str | None = None
status: str | None = None
reason: str | None = None
message: str | None = None


@dataclass(kw_only=True)
class KubernetesNodePayload(DerivedCluster):
ds: str
collection_unixtime: int
cluster: str
node_condition: KubernetesNodeConditionRow
29 changes: 29 additions & 0 deletions gcm/schemas/kubernetes/pod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
from dataclasses import dataclass

from gcm.schemas.slurm.derived_cluster import DerivedCluster


@dataclass
class KubernetesPodRow:
"""Kubernetes pod status schema.

Fields correspond to pod metadata and container status from the Kubernetes API.
"""

name: str | None = None
namespace: str | None = None
node_name: str | None = None
phase: str | None = None
restart_count: int | None = None
container_name: str | None = None
slurm_job_id: str | None = None


@dataclass(kw_only=True)
class KubernetesPodPayload(DerivedCluster):
ds: str
collection_unixtime: int
cluster: str
pod: KubernetesPodRow
Loading
Loading