diff --git a/src/api/api.py b/src/api/api.py index 7842adc..2fa06ea 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -6,6 +6,7 @@ from fastapi.responses import JSONResponse from src.utils.hub_client import init_hub_client_with_robot, get_node_id_by_robot +from src.utils.other import extract_hub_envs from src.api.oauth import valid_access_token from src.resources.database.entity import Database from src.resources.analysis.entity import CreateAnalysis @@ -24,13 +25,9 @@ class PodOrchestrationAPI: def __init__(self, database: Database, namespace: str = 'default'): self.database = database - robot_id, robot_secret, hub_url_core, hub_auth, http_proxy, https_proxy = (os.getenv('HUB_ROBOT_USER'), - os.getenv('HUB_ROBOT_SECRET'), - os.getenv('HUB_URL_CORE'), - os.getenv('HUB_URL_AUTH'), - os.getenv('PO_HTTP_PROXY'), - os.getenv('PO_HTTPS_PROXY')) + robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs() + self.enable_hub_logging = enable_hub_logging self.hub_core_client = init_hub_client_with_robot(robot_id, robot_secret, hub_url_core, @@ -229,7 +226,7 @@ def cleanup_call(self, cleanup_type: str): def stream_logs_call(self, body: CreateLogEntity): try: - return stream_logs(body, self.node_id, self.database, self.hub_core_client) + return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_core_client) except Exception as e: raise HTTPException(status_code=500, detail=f"Error streaming logs: {e}") @@ -238,4 +235,4 @@ def health_call(self): if not main_alive: raise RuntimeError("Main thread is not alive.") else: - return {"status": "ok"} + return {'status': "ok"} diff --git a/src/api/oauth.py b/src/api/oauth.py index 4dd7e95..6adee59 100644 --- a/src/api/oauth.py +++ b/src/api/oauth.py @@ -7,20 +7,20 @@ from typing import Annotated oauth2_scheme = OAuth2AuthorizationCodeBearer( - tokenUrl=os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/token", - 
authorizationUrl=os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/auth", - refreshUrl=os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/token", + tokenUrl=os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/token", + authorizationUrl=os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/auth", + refreshUrl=os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/token", ) async def valid_access_token(token: Annotated[str, Depends(oauth2_scheme)]) -> dict: - url = os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/certs" + url = os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/certs" jwks_client = PyJWKClient(url) try: sig_key = jwks_client.get_signing_key_from_jwt(token) return jwt.decode(token, key=sig_key, - options={"verify_signature": True, "verify_aud": False, "exp": True}) + options={'verify_signature': True, 'verify_aud': False, 'exp': True}) except jwt.exceptions.InvalidTokenError: raise HTTPException(status_code=401, detail="Not authenticated") diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index 495c76e..243e226 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -4,10 +4,10 @@ from typing import Optional import string -from kubernetes import client, config +from kubernetes import client from src.resources.database.entity import Database -from src.k8s.utils import get_k8s_resource_names +from src.k8s.utils import find_k8s_resources, delete_k8s_resource PORTS = {'nginx': [80], @@ -15,10 +15,6 @@ 'service': [80]} -def load_cluster_config(): - config.load_incluster_config() - - def create_harbor_secret(host_address: str, user: str, password: str, @@ -31,11 +27,11 @@ def create_harbor_secret(host_address: str, string_data={'docker-server': host_address, 'docker-username': user.replace('$', '\$'), 'docker-password': password, - '.dockerconfigjson': json.dumps({"auths": + '.dockerconfigjson': json.dumps({'auths': {host_address: - 
{"username": user, - "password": password, - "auth": base64.b64encode(f'{user}:{password}'.encode("ascii")).decode("ascii")}}})} + {'username': user, + 'password': password, + 'auth': base64.b64encode(f"{user}:{password}".encode("ascii")).decode("ascii")}}})} ) try: core_client.create_namespaced_secret(namespace=namespace, body=secret) @@ -47,7 +43,7 @@ def create_harbor_secret(host_address: str, if e.reason != 'Conflict': raise e else: - print('Conflict remains unresolved!') + print("Conflict remains unresolved!") raise e @@ -63,14 +59,14 @@ def create_analysis_deployment(name: str, period_seconds=20, failure_threshold=1, timeout_seconds=5) - container1 = client.V1Container(name=name, image=image, image_pull_policy="IfNotPresent", + container1 = client.V1Container(name=name, image=image, image_pull_policy='IfNotPresent', ports=[client.V1ContainerPort(PORTS['analysis'][0])], env=[client.V1EnvVar(name=key, value=val) for key, val in env.items()], #liveness_probe=liveness_probe, ) containers.append(container1) - labels = {'app': name, 'component': 'flame-analysis'} + labels = {'app': name, 'component': "flame-analysis"} depl_metadata = client.V1ObjectMeta(name=name, namespace=namespace, labels=labels) depl_pod_metadata = client.V1ObjectMeta(labels=labels) depl_selector = client.V1LabelSelector(match_labels=labels) @@ -96,55 +92,8 @@ def create_analysis_deployment(name: str, return _get_pods(name) -def delete_resource(name: str, resource_type: str, namespace: str = 'default') -> None: - """ - Deletes a Kubernetes resource by name and type. - :param name: Name of the resource to delete. - :param resource_type: Type of the resource (e.g., 'deployment', 'service', 'pod', 'configmap'). - :param namespace: Namespace in which the resource exists. 
- """ - print(f"Deleting resource: {name} of type {resource_type} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") - if resource_type == 'deployment': - try: - app_client = client.AppsV1Api() - app_client.delete_namespaced_deployment(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Not Found: {name} deployment") - elif resource_type == 'service': - try: - core_client = client.CoreV1Api() - core_client.delete_namespaced_service(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Not Found: {name} service") - elif resource_type == 'pod': - try: - core_client = client.CoreV1Api() - core_client.delete_namespaced_pod(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Not Found: {name} pod") - elif resource_type == 'configmap': - try: - core_client = client.CoreV1Api() - core_client.delete_namespaced_config_map(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Not Found: {name} configmap") - elif resource_type == 'networkpolicy': - try: - network_client = client.NetworkingV1Api() - network_client.delete_namespaced_network_policy(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Not Found: {name} networkpolicy") - else: - raise ValueError(f"Unsupported resource type: {resource_type}") - - def delete_deployment(deployment_name: str, namespace: str = 'default') -> None: - print(f"Deleting deployment {deployment_name} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"PO ACTION - Deleting deployment {deployment_name} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") app_client = client.AppsV1Api() for name in [deployment_name, f'nginx-{deployment_name}']: try: @@ -152,19 +101,19 @@ 
def delete_deployment(deployment_name: str, namespace: str = 'default') -> None: _delete_service(name, namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found {name}") + print(f"Error: Not Found {name}") network_client = client.NetworkingV1Api() try: network_client.delete_namespaced_network_policy(name=f'nginx-to-{deployment_name}-policy', namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found nginx-to-{deployment_name}-policy") + print(f"Error: Not Found nginx-to-{deployment_name}-policy") core_client = client.CoreV1Api() try: core_client.delete_namespaced_config_map(name=f"nginx-{deployment_name}-config", namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found {deployment_name}-config") + print(f"Error: Not Found {deployment_name}-config") def get_analysis_logs(deployment_names: dict[str, str], @@ -177,45 +126,16 @@ def get_analysis_logs(deployment_names: dict[str, str], :param namespace: :return: """ - return {"analysis": {analysis_id: _get_logs(name=deployment_name, + return {'analysis': {analysis_id: _get_logs(name=deployment_name, pod_ids=database.get_deployment_pod_ids(deployment_name), namespace=namespace) for analysis_id, deployment_name in deployment_names.items()}, - "nginx": {analysis_id: _get_logs(name=f"nginx-{deployment_name}", + 'nginx': {analysis_id: _get_logs(name=f"nginx-{deployment_name}", namespace=namespace) for analysis_id, deployment_name in deployment_names.items()} } -def delete_analysis_pods(deployment_name: str, project_id: str, namespace: str = 'default') -> None: - print(f"Deleting pods of deployment {deployment_name} in namespace {namespace} at " - f"{time.strftime('%Y-%m-%d %H:%M:%S')}") - core_client = client.CoreV1Api() - # delete nginx deployment - delete_resource(f'nginx-{deployment_name}', 'deployment', namespace) - delete_resource(f'nginx-{deployment_name}', 'service', 
namespace) - delete_resource(f'nginx-{deployment_name}-config', 'configmap', namespace) - - - # get pods in deployment - pods = core_client.list_namespaced_pod(namespace=namespace, label_selector=f'app={deployment_name}').items - for pod in pods: - delete_resource(pod.metadata.name, 'pod', namespace) - - # delete network policy - delete_resource(f'nginx-to-{deployment_name}-policy', 'networkpolicy', namespace) - - # create new nginx deployment and policy - _create_analysis_nginx_deployment(analysis_name=deployment_name, - analysis_service_name=get_k8s_resource_names('service', - 'label', - f'app={deployment_name}', - namespace=namespace), - analysis_env={'PROJECT_ID': project_id, - 'ANALYSIS_ID': deployment_name.split('analysis-')[-1].rsplit('-', 1)[0]}, - namespace=namespace) - - def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional[dict[str, dict[str, str]]]: core_client = client.CoreV1Api() @@ -330,16 +250,16 @@ def _create_nginx_config_map(analysis_name: str, core_client = client.CoreV1Api() # get the service name of the message broker - message_broker_service_name = get_k8s_resource_names('service', - 'label', - 'component=flame-message-broker', - namespace=namespace) - - # await and get the pod id and name of the message broker - message_broker_pod_name = get_k8s_resource_names('pod', + message_broker_service_name = find_k8s_resources('service', 'label', 'component=flame-message-broker', namespace=namespace) + + # await and get the pod id and name of the message broker + message_broker_pod_name = find_k8s_resources('pod', + 'label', + 'component=flame-message-broker', + namespace=namespace) message_broker_pod = None while message_broker_pod is None: try: @@ -352,16 +272,16 @@ def _create_nginx_config_map(analysis_name: str, time.sleep(1) # get the service name of the pod orchestrator - po_service_name = get_k8s_resource_names('service', - 'label', - 'component=flame-po', - namespace=namespace) + po_service_name = 
find_k8s_resources('service', + 'label', + 'component=flame-po', + namespace=namespace) # await and get the pod ip and name of the pod orchestrator - pod_orchestration_name = get_k8s_resource_names('pod', - 'label', - 'component=flame-po', - namespace=namespace) + pod_orchestration_name = find_k8s_resources('pod', + 'label', + 'component=flame-po', + namespace=namespace) pod_orchestration_pod = None while pod_orchestration_pod is None: try: @@ -384,20 +304,20 @@ def _create_nginx_config_map(analysis_name: str, analysis_ip = pod_list_object.items[0].status.pod_ip time.sleep(1) - # get the name of the hub adapter, kong proxy, and result service - hub_adapter_service_name = get_k8s_resource_names('service', - 'label', - 'component=flame-hub-adapter', - namespace=namespace) - kong_proxy_name = get_k8s_resource_names('service', + # get the name of the hub adapter, kong proxy, and storage service + hub_adapter_service_name = find_k8s_resources('service', + 'label', + 'component=flame-hub-adapter', + namespace=namespace) + kong_proxy_name = find_k8s_resources('service', + 'label', + 'app.kubernetes.io/name=kong', + manual_name_selector='proxy', + namespace=namespace) + storage_service_name = find_k8s_resources('service', 'label', - 'app.kubernetes.io/name=kong', - manual_name_selector='proxy', + 'component=flame-storage-service', namespace=namespace) - result_service_name = get_k8s_resource_names('service', - 'label', - 'component=flame-result-service', - namespace=namespace) # generate config map data = { @@ -434,10 +354,10 @@ def _create_nginx_config_map(analysis_name: str, }} - # egress: analysis deployment to result-service + # egress: analysis deployment to storage-service location ~ ^/storage/(final|local|intermediate)/ {{ rewrite ^/storage(/.*) $1 break; - proxy_pass http://{result_service_name}:8080; + proxy_pass http://{storage_service_name}:8080; allow {analysis_ip}; deny all; }} @@ -588,14 +508,14 @@ def _get_logs(name: str, pod_ids: Optional[list[str]] = None, 
namespace: str = ' pod_logs = [core_client.read_namespaced_pod_log(pod.metadata.name, namespace) for pod in pods.items if pod.metadata.name in pod_ids] except client.exceptions.ApiException as e: - print(e) + print(f"Error: APIException while trying to retrieve pod logs (pod_ids in list)\n{e}") return [] else: try: pod_logs = [core_client.read_namespaced_pod_log(pod.metadata.name, namespace) for pod in pods.items] except client.exceptions.ApiException as e: - print(e) + print(f"Error: APIException while trying to retrieve pod logs (pod_ids=None)\n{e}") return [] # sanitize pod logs diff --git a/src/k8s/utils.py b/src/k8s/utils.py index 7e3fbd9..936b009 100644 --- a/src/k8s/utils.py +++ b/src/k8s/utils.py @@ -1,16 +1,32 @@ +import time from typing import Literal, Optional, Union -from kubernetes import client +from kubernetes import config, client -def get_k8s_resource_names(resource_type: str, - selector_type: Optional[Literal['label', 'field']] = None, - selector_arg: Optional[str] = None, - manual_name_selector: Optional[str] = None, - namespace: str = "default") -> Optional[Union[str, list[str]]]: - if resource_type not in ['deployment', 'pod', 'service', 'networkpolicy', 'configmap']: +def load_cluster_config(): + config.load_incluster_config() + + +def get_current_namespace() -> str: + namespace_file = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' + try: + with open(namespace_file, 'r') as file: + return file.read().strip() + # Handle the case where the file is not found + except FileNotFoundError: + # Fallback to a default namespace if the file is not found + return 'default' + + +def find_k8s_resources(resource_type: str, + selector_type: Optional[Literal['label', 'field']] = None, + selector_arg: Optional[str] = None, + manual_name_selector: Optional[str] = None, + namespace: str = "default") -> Optional[Union[str, list[str]]]: + if resource_type not in ['deployment', 'pod', 'service', 'networkpolicy', 'configmap', 'job']: raise ValueError("For 
k8s resource search: resource_type must be one of 'deployment', 'pod', 'service', " - "'networkpolicy', or 'configmap'") + "'networkpolicy', 'configmap', or 'job'") if (selector_type is not None) and (selector_type not in ['label', 'field']): raise ValueError("For k8s resource search: selector_type must be either 'label' or 'field'") if (selector_type is not None) and (selector_arg is None): @@ -19,7 +35,7 @@ def get_k8s_resource_names(resource_type: str, kwargs = {'namespace': namespace} if selector_type: kwargs[f'{selector_type}_selector'] = selector_arg - + if resource_type == 'deployment': resources = client.AppsV1Api().list_namespaced_deployment(**kwargs) elif resource_type == 'networkpolicy': resources = client.NetworkingV1Api().list_namespaced_network_policy(**kwargs) else: core_client = client.CoreV1Api() if resource_type == 'pod': resources = core_client.list_namespaced_pod(**kwargs) elif resource_type == 'service': resources = core_client.list_namespaced_service(**kwargs) elif resource_type == 'configmap': resources = core_client.list_namespaced_config_map(**kwargs) + elif resource_type == 'job': + resources = client.BatchV1Api().list_namespaced_job(**kwargs) else: raise ValueError(f"Uncaptured resource type discovered! Message the Devs... (found={resource_type})") @@ -52,12 +70,55 @@ def get_k8s_resource_names(resource_type: str, return None -def get_current_namespace() -> str: - namespace_file = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' - try: - with open(namespace_file, 'r') as file: - return file.read().strip() - # Handle the case where the file is not found - except FileNotFoundError: - # Fallback to a default namespace if the file is not found - return 'default' +def delete_k8s_resource(name: str, resource_type: str, namespace: str = 'default') -> None: + """ + Deletes a Kubernetes resource by name and type. + :param name: Name of the resource to delete. + :param resource_type: Type of the resource (e.g., 'deployment', 'service', 'pod', 'configmap', 'job'). + :param namespace: Namespace in which the resource exists. 
+ """ + print(f"PO ACTION - Deleting resource: {name} of type {resource_type} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") + if resource_type == 'deployment': + try: + app_client = client.AppsV1Api() + app_client.delete_namespaced_deployment(name=name, namespace=namespace, propagation_policy='Foreground') + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Failed to delete {name} deployment\n{e}") + elif resource_type == 'service': + try: + core_client = client.CoreV1Api() + core_client.delete_namespaced_service(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Failed to delete {name} service\n{e}") + elif resource_type == 'pod': + try: + core_client = client.CoreV1Api() + core_client.delete_namespaced_pod(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Failed to delete {name} pod\n{e}") + elif resource_type == 'configmap': + try: + core_client = client.CoreV1Api() + core_client.delete_namespaced_config_map(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Failed to delete {name} configmap\n{e}") + elif resource_type == 'networkpolicy': + try: + network_client = client.NetworkingV1Api() + network_client.delete_namespaced_network_policy(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Failed to delete {name} networkpolicy\n{e}") + elif resource_type == 'job': + try: + batch_client = client.BatchV1Api() + batch_client.delete_namespaced_job(name=name, namespace=namespace, propagation_policy='Foreground') + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Failed to delete {name} job\n{e}") + else: + raise ValueError(f"Unsupported resource type: {resource_type}") diff --git a/src/main.py b/src/main.py index 37e3240..c8331cc 100644 --- 
a/src/main.py +++ b/src/main.py @@ -5,8 +5,7 @@ from src.resources.database.entity import Database from src.api.api import PodOrchestrationAPI -from src.k8s.kubernetes import load_cluster_config -from src.k8s.utils import get_current_namespace +from src.k8s.utils import get_current_namespace, load_cluster_config from src.status.status import status_loop diff --git a/src/resources/database/entity.py b/src/resources/database/entity.py index 93605e8..0d7ddef 100644 --- a/src/resources/database/entity.py +++ b/src/resources/database/entity.py @@ -11,12 +11,12 @@ class Database: def __init__(self) -> None: - host = os.getenv("POSTGRES_HOST") + host = os.getenv('POSTGRES_HOST') port = "5432" - user = os.getenv("POSTGRES_USER") - password = os.getenv("POSTGRES_PASSWORD") - database = os.getenv("POSTGRES_DB") - conn_uri = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}' + user = os.getenv('POSTGRES_USER') + password = os.getenv('POSTGRES_PASSWORD') + database = os.getenv('POSTGRES_DB') + conn_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" print(conn_uri) self.engine = create_engine(conn_uri, pool_pre_ping=True, @@ -31,11 +31,11 @@ def reset_db(self) -> None: def get_deployment(self, deployment_name: str) -> Optional[AnalysisDB]: with self.SessionLocal() as session: - return session.query(AnalysisDB).filter_by(**{"deployment_name": deployment_name}).first() + return session.query(AnalysisDB).filter_by(**{'deployment_name': deployment_name}).first() def get_latest_deployment(self, analysis_id: str) -> Optional[AnalysisDB]: with self.SessionLocal() as session: - deployments = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + deployments = session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() if deployments: return deployments[-1] return None @@ -50,7 +50,7 @@ def analysis_is_running(self, analysis_id: str) -> bool: def get_deployments(self, analysis_id: str) -> list[AnalysisDB]: with 
self.SessionLocal() as session: - return session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + return session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() def create_analysis(self, analysis_id: str, @@ -86,7 +86,7 @@ def create_analysis(self, def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]: with self.SessionLocal() as session: - analysis = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + analysis = session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() for deployment in analysis: if deployment: for key, value in kwargs.items(): @@ -97,7 +97,7 @@ def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]: def update_deployment(self, deployment_name: str, **kwargs) -> AnalysisDB: with self.SessionLocal() as session: - deployment = session.query(AnalysisDB).filter_by(**{"deployment_name": deployment_name}).first() + deployment = session.query(AnalysisDB).filter_by(**{'deployment_name': deployment_name}).first() for key, value in kwargs.items(): setattr(deployment, key, value) session.commit() @@ -105,7 +105,7 @@ def update_deployment(self, deployment_name: str, **kwargs) -> AnalysisDB: def delete_analysis(self, analysis_id: str) -> None: with self.SessionLocal() as session: - analysis = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + analysis = session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() for deployment in analysis: if deployment: session.delete(deployment) @@ -154,7 +154,7 @@ def update_analysis_status(self, analysis_id: str, status: str) -> None: self.update_analysis(analysis_id, status=status) def update_deployment_status(self, deployment_name: str, status: str) -> None: - print(f"Updating deployment {deployment_name} to status {status}") + print(f"PO ACTION - Updating deployment {deployment_name} to status {status}") self.update_deployment(deployment_name, status=status) def 
stop_analysis(self, analysis_id: str) -> None: @@ -164,19 +164,19 @@ def extract_analysis_body(self, analysis_id: str) -> Optional[dict]: analysis = self.get_deployments(analysis_id) if analysis: analysis = analysis[0] - return {"analysis_id": analysis.analysis_id, - "project_id": analysis.project_id, - "registry_url": analysis.registry_url, - "image_url": analysis.image_url, - "registry_user": analysis.registry_user, - "registry_password": analysis.registry_password, - "namespace": analysis.namespace, - "kong_token": analysis.kong_token, - "restart_counter": analysis.restart_counter} + return {'analysis_id': analysis.analysis_id, + 'project_id': analysis.project_id, + 'registry_url': analysis.registry_url, + 'image_url': analysis.image_url, + 'registry_user': analysis.registry_user, + 'registry_password': analysis.registry_password, + 'namespace': analysis.namespace, + 'kong_token': analysis.kong_token, + 'restart_counter': analysis.restart_counter} return None def delete_old_deployments_from_db(self, analysis_id: str) -> None: deployments = self.get_deployments(analysis_id) deployments = sorted(deployments, key=lambda x: x.time_created, reverse=True) for deployment in deployments[1:]: - self.delete_deployment(deployment.deployment_name) \ No newline at end of file + self.delete_deployment(deployment.deployment_name) diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index 4f369f4..a06414e 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -10,7 +10,7 @@ class LogEntity(BaseModel): log: str - log_type: Literal["emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"] + log_type: Literal['emerg', 'alert', 'crit', 'error', 'warn', 'notice', 'info', 'debug'] id: str = str(uuid.uuid4()) created_at: str = str(datetime.now()) @@ -21,7 +21,7 @@ def __str__(self) -> str: class CreateLogEntity(BaseModel): log: str - log_type: Literal["emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"] + log_type: 
Literal['emerg', 'alert', 'crit', 'error', 'warn', 'notice', 'info', 'debug'] analysis_id: str status: str diff --git a/src/resources/utils.py b/src/resources/utils.py index e8ec615..fb481ab 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -8,10 +8,8 @@ from src.resources.analysis.entity import Analysis, CreateAnalysis, read_db_analysis from src.resources.log.entity import CreateLogEntity from src.status.constants import AnalysisStatus -from src.k8s.kubernetes import (create_harbor_secret, - get_analysis_logs, - delete_resource) -from src.k8s.utils import get_current_namespace, get_k8s_resource_names +from src.k8s.kubernetes import create_harbor_secret, get_analysis_logs +from src.k8s.utils import get_current_namespace, find_k8s_resources, delete_k8s_resource from src.utils.token import _get_all_keycloak_clients from src.utils.token import delete_keycloak_client from src.utils.hub_client import init_hub_client_and_update_hub_status_with_robot @@ -24,7 +22,7 @@ def create_analysis(body: Union[CreateAnalysis, str], database: Database) -> dic if isinstance(body, str): body = database.extract_analysis_body(body) if body is None: - return {"status": "Analysis ID not found in database."} + return {'status': "Analysis ID not found in database."} else: body = CreateAnalysis(**body) @@ -56,7 +54,7 @@ def retrieve_history(analysis_id_str: str, database: Database) -> dict[str, dict :param database: :return: """ - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -73,16 +71,15 @@ def retrieve_history(analysis_id_str: str, database: Database) -> dict[str, dict analysis_logs, nginx_logs = ({}, {}) for analysis_id, deployment in deployments.items(): # interpret log string as a dictionary - if deployment.log is not None: - log = ast.literal_eval(deployment.log) - analysis_logs[analysis_id] = log["analysis"][analysis_id] - nginx_logs[analysis_id] = 
log["nginx"][analysis_id] + if deployment.log is not None: + log = ast.literal_eval(deployment.log) + analysis_logs[analysis_id], nginx_logs[analysis_id] = log['analysis'][analysis_id], log['nginx'][analysis_id] - return {"analysis": analysis_logs, "nginx": nginx_logs} + return {'analysis': analysis_logs, 'nginx': nginx_logs} def retrieve_logs(analysis_id_str: str, database: Database) -> dict[str, dict[str, list[str]]]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -98,7 +95,7 @@ def retrieve_logs(analysis_id_str: str, database: Database) -> dict[str, dict[st def get_status(analysis_id_str: str, database: Database) -> dict[str, str]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -113,7 +110,7 @@ def get_status(analysis_id_str: str, database: Database) -> dict[str, str]: def get_pods(analysis_id_str: str, database: Database) -> dict[str, list[str]]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -121,7 +118,7 @@ def get_pods(analysis_id_str: str, database: Database) -> dict[str, list[str]]: def stop_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -159,7 +156,7 @@ def stop_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: def delete_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -191,7 +188,7 @@ def unstuck_analysis_deployments(analysis_id: str, database: Database) -> None: def cleanup(cleanup_type: str, database: Database, - namespace: str = 
"default") -> dict[str, str]: + namespace: str = 'default') -> dict[str, str]: cleanup_types = set(cleanup_type.split(',')) if ',' in cleanup_type else [cleanup_type] response_content = {} @@ -206,26 +203,26 @@ def cleanup(cleanup_type: str, # Service cleanup/reinit if cleanup_type in ['all', 'services', 'mb']: # reinitialize message-broker pod - message_broker_pod_name = get_k8s_resource_names('pod', - 'label', - 'component=flame-message-broker', - namespace=namespace) - delete_resource(message_broker_pod_name, 'pod', namespace) - response_content[cleanup_type] = "Reset message broker" - if cleanup_type in ['all', 'services', 'rs']: - # reinitialize result-service pod - result_service_name = get_k8s_resource_names('pod', + message_broker_pod_name = find_k8s_resources('pod', 'label', - 'component=flame-result-service', + "component=flame-message-broker", namespace=namespace) - delete_resource(result_service_name, 'pod', namespace) - response_content[cleanup_type] = "Reset result service" + delete_k8s_resource(message_broker_pod_name, 'pod', namespace) + response_content[cleanup_type] = "Reset message broker" + if cleanup_type in ['all', 'services', 'rs']: + # reinitialize storage-service pod + storage_service_name = find_k8s_resources('pod', + 'label', + "component=flame-storage-service", + namespace=namespace) + delete_k8s_resource(storage_service_name, 'pod', namespace) + response_content[cleanup_type] = "Reset storage service" if cleanup_type in ['all', 'keycloak']: # cleanup keycloak clients without corresponding analysis # if all is all flame clients are deleted because ther are no analyzes in the db analysis_ids = database.get_analysis_ids() for client in _get_all_keycloak_clients(): - if client['clientId'] not in analysis_ids and client['name'].startswith('flame-'): + if (client['clientId'] not in analysis_ids) and client['name'].startswith('flame-'): delete_keycloak_client(client['clientId']) else: @@ -239,34 +236,35 @@ def clean_up_the_rest(database: 
Database, namespace: str = 'default') -> str: known_analysis_ids = database.get_analysis_ids() result_str = "" - for res, (selector_args, max_r_split) in {'deployment': (['component=flame-analysis', 'component=flame-analysis-nginx'], 1), - 'pod': (['component=flame-analysis', 'component=flame-analysis-nginx'], 2), - 'service': (['component=flame-analysis', 'component=flame-analysis-nginx'], 1), - 'networkpolicy': (['component=flame-nginx-to-analysis-policy'], 2), - 'configmap': (['component=flame-nginx-analysis-config-map'], 2)}.items(): + for res, (selector_args, max_r_split) in {'deployment': (["component=flame-analysis", "component=flame-analysis-nginx"], 1), + 'pod': (["component=flame-analysis", "component=flame-analysis-nginx"], 2), + 'service': (["component=flame-analysis", "component=flame-analysis-nginx"], 1), + 'networkpolicy': (["component=flame-nginx-to-analysis-policy"], 2), + 'configmap': (["component=flame-nginx-analysis-config-map"], 2)}.items(): for selector_arg in selector_args: - resources = get_k8s_resource_names(res, 'label', selector_arg, namespace=namespace) + resources = find_k8s_resources(res, 'label', selector_arg, namespace=namespace) resources = [resources] if type(resources) == str else resources if resources is not None: zombie_resources = [r for r in resources if resource_name_to_analysis(r, max_r_split) not in known_analysis_ids] for z in zombie_resources: - delete_resource(z, res, namespace=namespace) + delete_k8s_resource(z, res, namespace=namespace) result_str += f"Deleted {len(zombie_resources)} zombie " + \ f"{'' if '-nginx' not in selector_arg else 'nginx-'}{res}s\n" return result_str -def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, hub_core_client: CoreClient) -> None: +def stream_logs(log_entity: CreateLogEntity, node_id: str, enable_hub_logging: bool, database: Database, hub_core_client: CoreClient) -> None: try: database.update_analysis_log(log_entity.analysis_id, 
str(log_entity.to_log_entity())) - #database.update_analysis_status(log_entity.analysis_id, log_entity.status) #TODO: Implement this? + #database.update_analysis_status(log_entity.analysis_id, log_entity.status) # TODO: Implement this? except IndexError as e: - print(f"Error updating analysis log in database: {e}") + print(f"Error: Failed to update analysis log in database\n{e}") # log to hub - hub_core_client.create_analysis_node_log(analysis_id=log_entity.analysis_id, - node_id=node_id, - status=log_entity.status, - level=log_entity.log_type, - message=log_entity.log) + if enable_hub_logging: + hub_core_client.create_analysis_node_log(analysis_id=log_entity.analysis_id, + node_id=node_id, + status=log_entity.status, + level=log_entity.log_type, + message=log_entity.log) diff --git a/src/status/status.py b/src/status/status.py index b854c76..77985c5 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -6,21 +6,20 @@ import flame_hub -from src.k8s.kubernetes import PORTS +from src.k8s.kubernetes import PORTS, get_pod_status from src.resources.database.entity import Database, AnalysisDB -from src.utils.hub_client import (init_hub_client_with_robot, - get_node_id_by_robot, - get_node_analysis_id, - update_hub_status) +from src.resources.log.entity import CreateStartUpErrorLog from src.resources.utils import (unstuck_analysis_deployments, stop_analysis, delete_analysis, stream_logs) -from src.resources.log.entity import CreateStartUpErrorLog -from src.k8s.kubernetes import get_pod_status +from src.utils.hub_client import (init_hub_client_with_robot, + get_node_id_by_robot, + get_node_analysis_id, + update_hub_status) from src.status.constants import AnalysisStatus +from src.utils.other import extract_hub_envs from src.utils.token import get_keycloak_token - from src.status.constants import _MAX_RESTARTS, _INTERNAL_STATUS_TIMEOUT @@ -34,12 +33,8 @@ def status_loop(database: Database, status_loop_interval: int) -> None: node_id = None node_analysis_ids = {} - 
robot_id, robot_secret, hub_url_core, hub_auth, http_proxy, https_proxy = (os.getenv('HUB_ROBOT_USER'), - os.getenv('HUB_ROBOT_SECRET'), - os.getenv('HUB_URL_CORE'), - os.getenv('HUB_URL_AUTH'), - os.getenv('PO_HTTP_PROXY'), - os.getenv('PO_HTTPS_PROXY')) + robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs() + # Enter lifecycle loop while True: if hub_client is None: @@ -52,18 +47,18 @@ def status_loop(database: Database, status_loop_interval: int) -> None: node_id = get_node_id_by_robot(hub_client, robot_id) # Catch unresponsive hub client if node_id is None: - print("Resetting hub client...") + print("PO ACTION - Resetting hub client...") hub_client = None continue else: # If running analyzes exist, enter status loop running_analyzes = [analysis_id for analysis_id in database.get_analysis_ids() if database.analysis_is_running(analysis_id)] - print(f"Checking for running analyzes...{running_analyzes}") + print(f"PO ACTION - Checking for running analyzes...{running_analyzes}") if running_analyzes: hub_client_issues = 0 for analysis_id in running_analyzes: - print(f"Current analysis id: {analysis_id}") + print(f"PO STATUS LOOP - Current analysis id: {analysis_id}") # Get node analysis id if analysis_id not in node_analysis_ids.keys(): node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) @@ -80,43 +75,49 @@ def status_loop(database: Database, status_loop_interval: int) -> None: # If node analysis id found print(f"\tNode analysis id: {node_analysis_id}") if node_analysis_id is not None: + # Retrieve analysis status (skip iteration if analysis is not deployed) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue print(f"\tDatabase status: {analysis_status['db_status']}") print(f"\tInternal status: {analysis_status['int_status']}") - # Fix for stuck analyzes - _fix_stuck_status(database, analysis_status, node_id, hub_client) - analysis_status = 
_get_analysis_status(analysis_id, database) - if analysis_status is None: - continue - print(f"\tUnstuck analysis with internal status: {analysis_status['int_status']}") - - # Update created to running status if deployment responsive - _update_running_status(database, analysis_status) - analysis_status = _get_analysis_status(analysis_id, database) - if analysis_status is None: - continue - print(f"\tUpdate created to running database status: {analysis_status['db_status']}") - - # update running to finished status if analysis finished - _update_finished_status(database, analysis_status) - analysis_status = _get_analysis_status(analysis_id, database) - if analysis_status is None: - continue - print(f"\tUpdate running to finished database status: {analysis_status['db_status']}") - - # update hub analysis status + # Fix stuck analyzes + if analysis_status['status_action'] == 'unstuck': + print(f"\tUnstuck analysis with internal status: {analysis_status['int_status']}") + _fix_stuck_status(database, analysis_status, node_id, enable_hub_logging, hub_client) + # Update analysis status (skip iteration if analysis is not deployed) + analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue + + # Update created to running status + if analysis_status['status_action'] == 'running': + print(f"\tUpdate created-to-running database status: {analysis_status['db_status']}") + _update_running_status(database, analysis_status) + # Update analysis status (skip iteration if analysis is not deployed) + analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue + + # Update running to finished status + if analysis_status['status_action'] == 'finishing': + print(f"\tUpdate running-to-finished database status: {analysis_status['db_status']}") + _update_finished_status(database, analysis_status) + # Update analysis status (skip iteration if analysis is not deployed) + analysis_status = 
_get_analysis_status(analysis_id, database) + if analysis_status is None: + continue + + # Submit analysis_status to hub analysis_hub_status = _set_analysis_hub_status(hub_client, node_analysis_id, analysis_status) print(f"\tSet Hub analysis status with node_analysis={node_analysis_id}, " f"db_status={analysis_status['db_status']}, " f"internal_status={analysis_status['int_status']} " f"to {analysis_hub_status}") - hub_client = None if hub_client_issues > 0 else hub_client time.sleep(status_loop_interval) - print(f"Status loop iteration completed. Sleeping for {status_loop_interval} seconds.") + print(f"PO STATUS LOOP - Status loop iteration completed. Sleeping for {status_loop_interval} seconds.") def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[str, str]]: analysis = database.get_latest_deployment(analysis_id) @@ -128,39 +129,62 @@ def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[ int_status = AnalysisStatus.FINISHED.value else: int_status = asyncio.run(_get_internal_deployment_status(analysis.deployment_name, analysis_id)) - return {"analysis_id": analysis_id, - "db_status": analysis.status, - "int_status": int_status} + return {'analysis_id': analysis_id, + 'db_status': analysis.status, + 'int_status': int_status, + 'status_action': _decide_status_action(analysis.status, int_status)} + else: + return None + + +def _decide_status_action(db_status: str, int_status: str) -> Optional[str]: + is_stuck = int_status == AnalysisStatus.STUCK.value + is_slow = ((db_status in [AnalysisStatus.STARTED.value]) and (int_status in [AnalysisStatus.FAILED.value])) + newly_running = ((db_status in [AnalysisStatus.STARTED.value]) and (int_status in [AnalysisStatus.RUNNING.value])) + speedy_finished = ((db_status in [AnalysisStatus.STARTED.value]) and (int_status in [AnalysisStatus.FINISHED.value])) + newly_ended = ((db_status in [AnalysisStatus.RUNNING.value, AnalysisStatus.FAILED.value]) + and (int_status in 
[AnalysisStatus.FINISHED.value, AnalysisStatus.FAILED.value])) + firmly_stuck = ((db_status in [AnalysisStatus.FAILED.value]) and (int_status in [AnalysisStatus.STUCK.value])) + if is_stuck or is_slow: + return 'unstuck' + elif newly_running: + return 'running' + elif speedy_finished or newly_ended or firmly_stuck: + return 'finishing' else: return None async def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str: + # Attempt to retrieve internal analysis status via health endpoint start_time = time.time() while True: try: - response = await (AsyncClient(base_url=f'http://nginx-{deployment_name}:{PORTS["nginx"][0]}') - .get('/analysis/healthz', headers=[('Connection', 'close')])) + response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") + .get("/analysis/healthz", headers=[('Connection', 'close')])) response.raise_for_status() break except HTTPStatusError as e: - print(f"\tError getting internal deployment status: {e}") + print(f"\tError whilst retrieving internal deployment status: {e}") except ConnectError as e: print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} yielded an error: {e}") except ConnectTimeout as e: print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") elapsed_time = time.time() - start_time time.sleep(1) - if elapsed_time > _INTERNAL_STATUS_TIMEOUT: + if elapsed_time > _INTERNAL_STATUS_TIMEOUT: # TODO: Handle case of this happening for large images print(f"\tTimeout getting internal deployment status after {elapsed_time} seconds") return AnalysisStatus.FAILED.value + # Extract fields from response analysis_status, analysis_token_remaining_time = (response.json()['status'], response.json()['token_remaining_time']) + # Check if token needs refresh, do so if needed await _refresh_keycloak_token(deployment_name=deployment_name, analysis_id=analysis_id, token_remaining_time=analysis_token_remaining_time) + # Map status from 
response to preset values if analysis_status == AnalysisStatus.FINISHED.value: health_status = AnalysisStatus.FINISHED.value elif analysis_status == AnalysisStatus.RUNNING.value: @@ -169,7 +193,6 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str health_status = AnalysisStatus.STUCK.value else: health_status = AnalysisStatus.FAILED.value - return health_status @@ -181,95 +204,85 @@ async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_ if token_remaining_time < (int(os.getenv('STATUS_LOOP_INTERVAL')) * 2 + 1): keycloak_token = get_keycloak_token(analysis_id) try: - response = await (AsyncClient(base_url=f'http://nginx-{deployment_name}:{PORTS["nginx"][0]}') - .post('/analysis/token_refresh', - json={"token": keycloak_token}, + response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") + .post("/analysis/token_refresh", + json={'token': keycloak_token}, headers=[('Connection', 'close')])) response.raise_for_status() except HTTPStatusError as e: - print(f"Failed to refresh keycloak token in deployment {deployment_name}.\n{e}") + print(f"Error: Failed to refresh keycloak token in deployment {deployment_name}.\n{e}") def _fix_stuck_status(database: Database, analysis_status: dict[str, str], node_id: str, + enable_hub_logging: bool, hub_client: flame_hub.CoreClient) -> None: - # Deployment selection - is_stuck = analysis_status['int_status'] == AnalysisStatus.STUCK.value - is_slow = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and - (analysis_status['int_status'] in [AnalysisStatus.FAILED.value])) - - # Update Status - if is_stuck or is_slow: - analysis = database.get_latest_deployment(analysis_status["analysis_id"]) - if analysis is not None: - database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.FAILED.value) - - # Tracking restarts - if analysis.restart_counter < _MAX_RESTARTS: - _stream_stuck_logs(analysis, node_id, 
database, hub_client, is_slow) - unstuck_analysis_deployments(analysis_status["analysis_id"], database) - else: - _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) + analysis = database.get_latest_deployment(analysis_status['analysis_id']) + if analysis is not None: + database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.FAILED.value) + is_slow = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and + (analysis_status['int_status'] in [AnalysisStatus.FAILED.value])) + + # Tracking restarts + if analysis.restart_counter < _MAX_RESTARTS: + _stream_stuck_logs(analysis, node_id, enable_hub_logging, database, hub_client, is_slow) + unstuck_analysis_deployments(analysis_status['analysis_id'], database) + else: + _stream_stuck_logs(analysis, node_id,enable_hub_logging, database, hub_client, is_slow) def _stream_stuck_logs(analysis: AnalysisDB, node_id: str, + enable_hub_logging: bool, database: Database, hub_client: flame_hub.CoreClient, is_slow: bool) -> None: + # If is_slow=True differentiate between slow, or kubernetes_error state, else assume stuck state is_k8s_related = False if is_slow: deployment_name = analysis.deployment_name + # Retrieve status of latest pod pod_status_dict = get_pod_status(deployment_name) if pod_status_dict is not None: _, pod_status_dict = list(pod_status_dict.items())[-1] ready, reason, message = pod_status_dict['ready'], pod_status_dict['reason'], pod_status_dict['message'] + # ready=True implicates slow state, else assume kubernetes_error state if not ready: is_k8s_related = True print(f"\tDeployment of analysis={analysis.analysis_id} failed (ready={ready}).\n" f"\t\t{reason}: {message}") + # Create and stream POAPIError logs or either slow, stuck, or kubernetes_error state to Hub stream_logs(CreateStartUpErrorLog(analysis.restart_counter, - ("k8s" if is_k8s_related else "slow") if is_slow else "stuck", + ('k8s' if is_k8s_related else 'slow') if is_slow else 'stuck', 
analysis.analysis_id, analysis.status, k8s_error_msg=reason if is_k8s_related else ''), node_id, + enable_hub_logging, database, hub_client) def _update_running_status(database: Database, analysis_status: dict[str, str]) -> None: - newly_running = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and - (analysis_status['int_status'] in [AnalysisStatus.RUNNING.value])) - if newly_running: - analysis = database.get_latest_deployment(analysis_status["analysis_id"]) - if analysis is not None: - database.update_deployment_status(analysis.deployment_name, AnalysisStatus.RUNNING.value) + analysis = database.get_latest_deployment(analysis_status['analysis_id']) + if analysis is not None: + database.update_deployment_status(analysis.deployment_name, AnalysisStatus.RUNNING.value) def _update_finished_status(database: Database, analysis_status: dict[str, str]) -> None: - speedy_finished = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and - (analysis_status['int_status'] in [AnalysisStatus.FINISHED.value])) - newly_ended = ((analysis_status['db_status'] in [AnalysisStatus.RUNNING.value, - AnalysisStatus.FAILED.value]) - and (analysis_status['int_status'] in [AnalysisStatus.FINISHED.value, - AnalysisStatus.FAILED.value])) - firmly_stuck = ((analysis_status['db_status'] in [AnalysisStatus.FAILED.value]) - and (analysis_status['int_status'] in [AnalysisStatus.STUCK.value])) - if speedy_finished or newly_ended or firmly_stuck: - analysis = database.get_latest_deployment(analysis_status["analysis_id"]) - if analysis is not None: - database.update_deployment_status(analysis.deployment_name, analysis_status['int_status']) - if analysis_status['int_status'] == AnalysisStatus.FINISHED.value: - print("\tDelete deployment") - # TODO: final local log save (minio?) 
# archive logs - # delete_analysis(analysis_status['analysis_id'], database) # delete analysis from database - stop_analysis(analysis_status['analysis_id'], database) # stop analysis TODO: Change to delete in the future (when archive logs implemented) - else: - print("\tStop deployment") - stop_analysis(analysis_status['analysis_id'], database) # stop analysis + analysis = database.get_latest_deployment(analysis_status['analysis_id']) + if analysis is not None: + database.update_deployment_status(analysis.deployment_name, analysis_status['int_status']) + if analysis_status['int_status'] == AnalysisStatus.FINISHED.value: + print("\tDelete deployment") + # TODO: final local log save (minio?) # archive logs + # delete_analysis(analysis_status['analysis_id'], database) # delete analysis from database + stop_analysis(analysis_status['analysis_id'], database) # stop analysis TODO: Change to delete in the future (when archive logs implemented) + else: + print("\tStop deployment") + stop_analysis(analysis_status['analysis_id'], database) # stop analysis def _set_analysis_hub_status(hub_client: flame_hub.CoreClient, diff --git a/src/utils/docker.py b/src/utils/docker.py deleted file mode 100644 index 2356406..0000000 --- a/src/utils/docker.py +++ /dev/null @@ -1,40 +0,0 @@ - - -#_CLIENT = docker.from_env() -_CLIENT = None -#_CLIENT.login( -# username=os.getenv("NODE_NAME"), -# password=os.getenv("HARBOR_PW"), -# registry=os.getenv("HARBOR_URL"), -#) - - -def download_image(image_registry_address: str) -> str: - _CLIENT.images.pull(image_registry_address) - return image_registry_address.split('/')[-1] + ':latest' - - -def validate_image(image_name: str, master_image_name: str) -> bool: - _ = download_image(master_image_name) - - master_image = _CLIENT.images.get(master_image_name) - image = _CLIENT.images.get(image_name) - - return _history_validation(image, master_image) and _image_valid(image, master_image) - - -def _history_validation(image, master_image) -> bool: - 
master_img_entry_ids = [ - {key: entry[key] for key in ["Created", "CreatedBy", "Size"]} - for entry in master_image.history() - ] - img_entry_ids = [ - {key: entry[key] for key in ["Created", "CreatedBy", "Size"]} - for entry in image.history() - ] - - return all([entry_dict in img_entry_ids for entry_dict in master_img_entry_ids]) - - -def _image_valid(image, master_image) -> bool: - return True diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 644d575..3af936b 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -4,7 +4,7 @@ from pathlib import Path from functools import lru_cache from json import JSONDecodeError -from typing import Optional, Tuple +from typing import Optional from httpx import (Client, HTTPTransport, HTTPStatusError, @@ -41,10 +41,10 @@ def init_hub_client_with_robot(robot_id: str, client = Client(base_url=hub_url_core, mounts=proxies, auth=hub_robot, verify=ssl_ctx) hub_client = flame_hub.CoreClient(client=client) - print("Hub client init successful") + print("PO ACTION - Hub client init successful") except Exception as e: hub_client = None - print(f"Failed to authenticate with hub python client library.\n{e}") + print(f"Error: Failed to authenticate with hub python client library.\n{e}") return hub_client @@ -60,19 +60,19 @@ def get_ssl_context() -> ssl.SSLContext: def get_node_id_by_robot(hub_client: flame_hub.CoreClient, robot_id: str) -> Optional[str]: try: - node_id_object = hub_client.find_nodes(filter={"robot_id": robot_id})[0] + node_id_object = hub_client.find_nodes(filter={'robot_id': robot_id})[0] except (HTTPStatusError, JSONDecodeError, ConnectTimeout, flame_hub._exceptions.HubAPIError, AttributeError) as e: - print(f"Error in hub python client whilst retrieving node id object!\n{e}") + print(f"Error: Failed to retrieve node id object from hub python client\n{e}") node_id_object = None return str(node_id_object.id) if node_id_object is not None else None def get_node_analysis_id(hub_client: 
flame_hub.CoreClient, analysis_id: str, node_id_object_id: str) -> Optional[str]: try: - node_analyzes = hub_client.find_analysis_nodes(filter={"analysis_id": analysis_id, - "node_id": node_id_object_id}) + node_analyzes = hub_client.find_analysis_nodes(filter={'analysis_id': analysis_id, + 'node_id': node_id_object_id}) except (HTTPStatusError, flame_hub._exceptions.HubAPIError, AttributeError) as e: - print(f"Error in hub python client whilst retrieving node analyzes!\n{e}") + print(f"Error: Failed to retrieve node analyzes from hub python client\n{e}") node_analyzes = None if node_analyzes: @@ -92,7 +92,7 @@ def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r run_status = AnalysisStatus.FAILED.value hub_client.update_analysis_node(node_analysis_id, run_status=run_status) except (HTTPStatusError, ConnectError, flame_hub._exceptions.HubAPIError, AttributeError) as e: - print(f"Failed to update hub status for node_analysis_id {node_analysis_id}.\n{e}") + print(f"Error: Failed to update hub status for node_analysis_id {node_analysis_id}\n{e}") def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: str) -> None: @@ -113,11 +113,11 @@ def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: s if node_analysis_id is not None: update_hub_status(hub_client, node_analysis_id, run_status=status) else: - print("Failed to retrieve node_analysis_id from hub client. Cannot update status.") + print("Error: Failed to retrieve node_analysis_id from hub client. Cannot update status.") else: - print("Failed to retrieve node_id from hub client. Cannot update status.") + print("Error: Failed to retrieve node_id from hub client. Cannot update status.") else: - print("Failed to initialize hub client. Cannot update status.") + print("Error: Failed to initialize hub client. Cannot update status.") # TODO: Import this from flame sdk? 
(from flamesdk import HUB_LOG_LITERALS) diff --git a/src/utils/other.py b/src/utils/other.py index 8516934..6653532 100644 --- a/src/utils/other.py +++ b/src/utils/other.py @@ -1,5 +1,16 @@ from httpx import AsyncClient import asyncio +import os + + +def extract_hub_envs() -> tuple[str, str, str, str, bool, str, str]: + return (os.getenv('HUB_ROBOT_USER'), + os.getenv('HUB_ROBOT_SECRET'), + os.getenv('HUB_URL_CORE'), + os.getenv('HUB_URL_AUTH'), + os.getenv('HUB_LOGGING') in ['True', 'true', '1', 't'], + os.getenv('PO_HTTP_PROXY'), + os.getenv('PO_HTTPS_PROXY')) def resource_name_to_analysis(deployment_name: str, max_r_split: int = 1) -> str: @@ -17,8 +28,8 @@ def get_project_data_source(keycloak_token, project_id, hub_adapter_service_name :return: """ client = AsyncClient(base_url=f"http://{hub_adapter_service_name}:5000", - headers={"Authorization": f"Bearer {keycloak_token}", - "accept": "application/json"}) + headers={'Authorization': f"Bearer {keycloak_token}", + 'accept': "application/json"}) return asyncio.run(call_sources(client, project_id)) diff --git a/src/utils/token.py b/src/utils/token.py index 776bea2..b906436 100644 --- a/src/utils/token.py +++ b/src/utils/token.py @@ -16,7 +16,9 @@ def get_keycloak_token(analysis_id: str) -> Optional[str]: client_secret = _get_keycloak_client_secret(analysis_id) keycloak_url = f"{_KEYCLOAK_URL}/realms/flame/protocol/openid-connect/token" - data = {"grant_type": "client_credentials", "client_id": analysis_id, "client_secret": client_secret} + data = {'grant_type': 'client_credentials', + 'client_id': analysis_id, + 'client_secret': client_secret} # get token from keycloak like in the above curl command try: @@ -25,7 +27,7 @@ def get_keycloak_token(analysis_id: str) -> Optional[str]: return response.json()['access_token'] except requests.exceptions.RequestException as e: - print(e) + print(f"Error: Failed to retrieve keycloak token\n{e}") return None @@ -38,7 +40,7 @@ def _get_keycloak_client_secret(analysis_id: str) 
-> str: # get client secret url_get_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients?clientId={analysis_id}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.get(url_get_client, headers=headers) response.raise_for_status() @@ -53,9 +55,9 @@ def _get_keycloak_admin_token() -> str: # get admin token url_admin_access_token = f"{_KEYCLOAK_URL}/realms/{_KEYCLOAK_REALM}/protocol/openid-connect/token" data = { - "grant_type": "client_credentials", - "client_id": keycloak_admin_client_id, - "client_secret": keycloak_admin_client_secret + 'grant_type': 'client_credentials', + 'client_id': keycloak_admin_client_id, + 'client_secret': keycloak_admin_client_secret } response = requests.post(url_admin_access_token, data=data) response.raise_for_status() @@ -65,7 +67,7 @@ def _get_keycloak_admin_token() -> str: def _keycloak_client_exists(analysis_id: str, admin_token: str) -> bool: url_get_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients?clientId={analysis_id}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.get(url_get_client, headers=headers) response.raise_for_status() @@ -75,11 +77,11 @@ def _keycloak_client_exists(analysis_id: str, admin_token: str) -> bool: def _create_keycloak_client(admin_token: str, analysis_id: str) -> None: url_create_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients" - headers = {"Authorization": f"Bearer {admin_token}", - "Content-Type": "application/json"} - client_data = {"clientId": f"{analysis_id}", - "name": f"flame-{analysis_id}", - "serviceAccountsEnabled": "true"} + headers = {'Authorization': f"Bearer {admin_token}", + 'Content-Type': "application/json"} + client_data = {'clientId': f"{analysis_id}", + 'name': f"flame-{analysis_id}", + 'serviceAccountsEnabled': 'true'} response = requests.post(url_create_client, headers=headers, 
json=client_data) response.raise_for_status() @@ -87,7 +89,7 @@ def _create_keycloak_client(admin_token: str, analysis_id: str) -> None: def _get_all_keycloak_clients() -> list[dict]: admin_token = _get_keycloak_admin_token() url_get_clients = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.get(url_get_clients, headers=headers) response.raise_for_status() @@ -99,19 +101,18 @@ def delete_keycloak_client(analysis_id: str) -> None: # get client uuid url_get_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients?clientId={analysis_id}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.get(url_get_client, headers=headers) response.raise_for_status() try: uuid = response.json()[0]['id'] - except (KeyError, IndexError): - print('keycloak Client not found') + except (KeyError, IndexError) as e: + print(f"Error: Keycloak client not found\n{e}") return url_delete_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients/{uuid}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.delete(url_delete_client, headers=headers) response.raise_for_status() - diff --git a/src/test/test_db.py b/tests/test_db.py similarity index 100% rename from src/test/test_db.py rename to tests/test_db.py