diff --git a/surfkit/runtime/agent/kube.py b/surfkit/runtime/agent/kube.py index 1c4bd86..b04008e 100644 --- a/surfkit/runtime/agent/kube.py +++ b/surfkit/runtime/agent/kube.py @@ -10,7 +10,7 @@ import urllib.error import urllib.parse import urllib.request -from typing import Dict, Iterator, List, Optional, Tuple, Type, Union +from typing import Dict, Iterator, Literal, List, Optional, Tuple, Type, Union from agentdesk.util import find_open_port from google.auth.transport.requests import Request @@ -40,6 +40,10 @@ logger = logging.getLogger(__name__) +class APIOpts(BaseModel): + url: str + + class GKEOpts(BaseModel): cluster_name: str region: str @@ -51,8 +55,9 @@ class LocalOpts(BaseModel): class KubeConnectConfig(BaseModel): - provider: str = "local" + provider: Literal["api", "gke", "local"] = "local" namespace: str = "default" + api_opts: Optional[APIOpts] = None gke_opts: Optional[GKEOpts] = None local_opts: Optional[LocalOpts] = None @@ -65,7 +70,12 @@ def __init__(self, cfg: Optional[KubeConnectConfig] = None) -> None: if not cfg: cfg = KubeConnectConfig() self.cfg = cfg - if cfg.provider == "gke": + if cfg.provider == "api": + opts = cfg.api_opts + if not opts: + raise ValueError("API opts missing") + self.connect_to_api(opts) + elif cfg.provider == "gke": opts = cfg.gke_opts if not opts: raise ValueError("GKE opts missing") @@ -247,6 +257,13 @@ def connect_config(self) -> KubeConnectConfig: def connect(cls, cfg: KubeConnectConfig) -> "KubeAgentRuntime": return cls(cfg) + def connect_to_api(self, opts: APIOpts) -> Tuple[client.CoreV1Api, str, str]: + configuration = client.Configuration() + configuration.host = opts.url + api_client = client.ApiClient(configuration) + v1_client = client.CoreV1Api(api_client) + return v1_client, "unknown", "anonymous" + @retry(stop=stop_after_attempt(15)) def connect_to_gke(self, opts: GKEOpts) -> Tuple[client.CoreV1Api, str, str]: """ diff --git a/surfkit/runtime/container/kube.py b/surfkit/runtime/container/kube.py index 503eacf..2577198 100644 --- a/surfkit/runtime/container/kube.py +++ b/surfkit/runtime/container/kube.py @@ -5,7 +5,7 @@ import urllib.error import urllib.parse import urllib.request -from typing import List, Optional, Tuple, Type +from typing import Literal, List, Optional, Tuple, Type from google.auth.transport.requests import Request from google.cloud import container_v1 @@ -22,6 +22,10 @@ from .base import ContainerRuntime +class APIOpts(BaseModel): + url: str + + class GKEOpts(BaseModel): cluster_name: str region: str @@ -33,8 +37,9 @@ class LocalOpts(BaseModel): class ConnectConfig(BaseModel): - provider: str = "local" + provider: Literal["api", "gke", "local"] = "local" namespace: str = "default" + api_opts: Optional[APIOpts] = None gke_opts: Optional[GKEOpts] = None local_opts: Optional[LocalOpts] = None @@ -44,7 +49,12 @@ class KubernetesRuntime(ContainerRuntime): def __init__(self, cfg: ConnectConfig) -> None: # Load the Kubernetes configuration, typically from ~/.kube/config - if cfg.provider == "gke": + if cfg.provider == "api": + opts = cfg.api_opts + if not opts: + raise ValueError("API opts missing") + self.connect_to_api(opts) + elif cfg.provider == "gke": opts = cfg.gke_opts if not opts: raise ValueError("GKE opts missing") @@ -137,6 +147,13 @@ def connect_config_type(cls) -> Type[ConnectConfig]: def connect(cls, cfg: ConnectConfig) -> "KubernetesRuntime": return cls(cfg) + def connect_to_api(self, opts: APIOpts) -> Tuple[client.CoreV1Api, str, str]: + configuration = client.Configuration() + configuration.host = opts.url + api_client = client.ApiClient(configuration) + v1_client = client.CoreV1Api(api_client) + return v1_client, "unknown", "anonymous" + @retry(stop=stop_after_attempt(15)) def connect_to_gke(self, opts: GKEOpts) -> Tuple[client.CoreV1Api, str, str]: """