diff --git a/src/api/api.py b/src/api/api.py index 2fa06ea..4a6e632 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -10,7 +10,7 @@ from src.api.oauth import valid_access_token from src.resources.database.entity import Database from src.resources.analysis.entity import CreateAnalysis -from src.resources.log.entity import CreateLogEntity +from src.resources.log.entity import CreateLogEntity, AnalysisStoppedLog from src.resources.utils import (create_analysis, retrieve_history, retrieve_logs, @@ -28,13 +28,13 @@ def __init__(self, database: Database, namespace: str = 'default'): robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs() self.enable_hub_logging = enable_hub_logging - self.hub_core_client = init_hub_client_with_robot(robot_id, - robot_secret, - hub_url_core, - hub_auth, - http_proxy, - https_proxy) - self.node_id = get_node_id_by_robot(self.hub_core_client, robot_id) if self.hub_core_client else None + self.hub_client = init_hub_client_with_robot(robot_id, + robot_secret, + hub_url_core, + hub_auth, + http_proxy, + https_proxy) + self.node_id = get_node_id_by_robot(self.hub_client, robot_id) if self.hub_client else None self.namespace = namespace app = FastAPI(title="FLAME PO", docs_url="/api/docs", @@ -196,13 +196,26 @@ def get_pods_call(self, analysis_id: str): def stop_all_analysis_call(self): try: - return stop_analysis('all', self.database) + response = stop_analysis('all', self.database) + for analysis_id in self.database.get_analysis_ids(): + stream_logs(AnalysisStoppedLog(analysis_id), + self.node_id, + self.enable_hub_logging, + self.database, + self.hub_client) + return response except Exception as e: raise HTTPException(status_code=500, detail=f"Error stopping ALL analyzes: {e}") def stop_analysis_call(self, analysis_id: str): try: - return stop_analysis(analysis_id, self.database) + response = stop_analysis(analysis_id, self.database) + stream_logs(AnalysisStoppedLog(analysis_id), + 
self.node_id, + self.enable_hub_logging, + self.database, + self.hub_client) + return response except Exception as e: raise HTTPException(status_code=500, detail=f"Error stopping analysis: {e}") @@ -226,7 +239,7 @@ def cleanup_call(self, cleanup_type: str): def stream_logs_call(self, body: CreateLogEntity): try: - return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_core_client) + return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_client) except Exception as e: raise HTTPException(status_code=500, detail=f"Error streaming logs: {e}") diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index a870619..243e226 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -316,7 +316,7 @@ def _create_nginx_config_map(analysis_name: str, namespace=namespace) storage_service_name = find_k8s_resources('service', 'label', - 'component=flame-result-service', + 'component=flame-storage-service', namespace=namespace) # generate config map diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index a06414e..c487171 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -60,3 +60,11 @@ def __init__(self, log = '' super().__init__(log=log, log_type="error", analysis_id=analysis_id, status=status) + + +class AnalysisStoppedLog(CreateLogEntity): + def __init__(self, analysis_id: str) -> None: + log = (f"[flame -- POAPI: ANALYSISSTOPPED -- " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " + f"Info: The analysis was stopped either locally, or externally on another node.") + super().__init__(log=log, log_type="info", analysis_id=analysis_id, status="stopped") diff --git a/src/status/status.py b/src/status/status.py index 77985c5..97c1d34 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -1,8 +1,7 @@ import time import os -import asyncio from typing import Optional -from httpx import AsyncClient, HTTPStatusError, ConnectError, 
ConnectTimeout +from httpx import Client, HTTPStatusError, ConnectError, ConnectTimeout import flame_hub @@ -16,6 +15,7 @@ from src.utils.hub_client import (init_hub_client_with_robot, get_node_id_by_robot, get_node_analysis_id, + get_partner_node_statuses, update_hub_status) from src.status.constants import AnalysisStatus from src.utils.other import extract_hub_envs @@ -75,6 +75,15 @@ def status_loop(database: Database, status_loop_interval: int) -> None: # If node analysis id found print(f"\tNode analysis id: {node_analysis_id}") if node_analysis_id is not None: + try: + # Inform local analysis of partner node statuses + _ = inform_analysis_of_partner_statuses(database, + hub_client, + analysis_id, + node_analysis_id) + except Exception as e: + print(f"\tPO STATUS LOOP - Error when attempting to access partner_status endpoint of {analysis_id} ({repr(e)})") + # Retrieve analysis status (skip iteration if analysis is not deployed) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: @@ -119,6 +128,28 @@ def status_loop(database: Database, status_loop_interval: int) -> None: time.sleep(status_loop_interval) print(f"PO STATUS LOOP - Status loop iteration completed. 
Sleeping for {status_loop_interval} seconds.") + +def inform_analysis_of_partner_statuses(database: Database, + hub_client: flame_hub.CoreClient, + analysis_id: str, + node_analysis_id: str) -> Optional[dict[str, str]]: + node_statuses = get_partner_node_statuses(hub_client, analysis_id, node_analysis_id) + deployment_name = database.get_latest_deployment(analysis_id).deployment_name + try: # try except, in case analysis api is not yet ready + response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").post(url="/analysis/partner_status", + headers=[('Connection', 'close')], + json={'partner_status': node_statuses}) + response.raise_for_status() + return response.json() + except HTTPStatusError as e: + print(f"\tError whilst trying to access analysis partner_status endpoint: {e}") + except ConnectError as e: + print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} yielded an error: {e}") + except ConnectTimeout as e: + print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") + return None + + def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[str, str]]: analysis = database.get_latest_deployment(analysis_id) if analysis is not None: @@ -128,7 +159,7 @@ def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[ if db_status == AnalysisStatus.FINISHED.value: int_status = AnalysisStatus.FINISHED.value else: - int_status = asyncio.run(_get_internal_deployment_status(analysis.deployment_name, analysis_id)) + int_status = _get_internal_deployment_status(analysis.deployment_name, analysis_id) return {'analysis_id': analysis_id, 'db_status': analysis.status, 'int_status': int_status, @@ -145,23 +176,24 @@ def _decide_status_action(db_status: str, int_status: str) -> Optional[str]: newly_ended = ((db_status in [AnalysisStatus.RUNNING.value, AnalysisStatus.FAILED.value]) and (int_status in [AnalysisStatus.FINISHED.value, 
AnalysisStatus.FAILED.value])) firmly_stuck = ((db_status in [AnalysisStatus.FAILED.value]) and (int_status in [AnalysisStatus.STUCK.value])) + was_stopped = int_status == AnalysisStatus.STOPPED.value if is_stuck or is_slow: return 'unstuck' elif newly_running: return 'running' - elif speedy_finished or newly_ended or firmly_stuck: + elif speedy_finished or newly_ended or firmly_stuck or was_stopped: return 'finishing' else: return None -async def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str: +def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str: # Attempt to retrieve internal analysis status via health endpoint start_time = time.time() while True: try: - response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") - .get("/analysis/healthz", headers=[('Connection', 'close')])) + response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").get("/analysis/healthz", + headers=[('Connection', 'close')]) response.raise_for_status() break except HTTPStatusError as e: @@ -172,7 +204,7 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") elapsed_time = time.time() - start_time time.sleep(1) - if elapsed_time > _INTERNAL_STATUS_TIMEOUT: # TODO: Handle case of this happening for large images + if elapsed_time > _INTERNAL_STATUS_TIMEOUT: print(f"\tTimeout getting internal deployment status after {elapsed_time} seconds") return AnalysisStatus.FAILED.value @@ -180,9 +212,9 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str analysis_status, analysis_token_remaining_time = (response.json()['status'], response.json()['token_remaining_time']) # Check if token needs refresh, do so if needed - await _refresh_keycloak_token(deployment_name=deployment_name, - analysis_id=analysis_id, - 
token_remaining_time=analysis_token_remaining_time) + _refresh_keycloak_token(deployment_name=deployment_name, + analysis_id=analysis_id, + token_remaining_time=analysis_token_remaining_time) # Map status from response to preset values if analysis_status == AnalysisStatus.FINISHED.value: @@ -196,7 +228,7 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str return health_status -async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_remaining_time: int) -> None: +def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_remaining_time: int) -> None: """ Refresh the keycloak token :return: @@ -204,10 +236,9 @@ async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_ if token_remaining_time < (int(os.getenv('STATUS_LOOP_INTERVAL')) * 2 + 1): keycloak_token = get_keycloak_token(analysis_id) try: - response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") - .post("/analysis/token_refresh", - json={'token': keycloak_token}, - headers=[('Connection', 'close')])) + response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").post("/analysis/token_refresh", + json={'token': keycloak_token}, + headers=[('Connection', 'close')]) response.raise_for_status() except HTTPStatusError as e: print(f"Error: Failed to refresh keycloak token in deployment {deployment_name}.\n{e}") diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 3af936b..2572b1b 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -95,6 +95,26 @@ def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r print(f"Error: Failed to update hub status for node_analysis_id {node_analysis_id}\n{e}") +def get_analysis_node_statuses(hub_client: flame_hub.CoreClient, analysis_id: str) -> Optional[dict[str, str]]: + try: + node_analyzes = hub_client.find_analysis_nodes(filter={'analysis_id': analysis_id}) + except 
(HTTPStatusError, flame_hub._exceptions.HubAPIError, AttributeError) as e: + print(f"Error: Failed to retrieve node analyzes from hub python client\n{e}") + return None + analysis_node_statuses = {} + for node in node_analyzes: + analysis_node_statuses[str(node.id)] = node.run_status + return analysis_node_statuses + + + def get_partner_node_statuses(hub_client: flame_hub.CoreClient, + analysis_id: str, + node_analysis_id: str) -> Optional[dict[str, str]]: + analysis_node_statuses = get_analysis_node_statuses(hub_client, analysis_id) + return {k: v for k, v in analysis_node_statuses.items() if k != node_analysis_id} \ + if analysis_node_statuses is not None else None + + + def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: str) -> None: """ Create a hub client for the analysis and update the current status.