From 5f7ebd53c4f8bae2ac78dd3c9b2096fd50137628 Mon Sep 17 00:00:00 2001 From: davidhieber Date: Wed, 11 Mar 2026 12:03:34 +0100 Subject: [PATCH 1/7] refactor: add debug prints Co-authored-by: Nightknight3000 --- src/k8s/kubernetes.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index a870619..93eea1f 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -148,6 +148,15 @@ def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional if pod is not None: name = pod.metadata.name status = pod.status.container_statuses[0] + print(f"Pod {name} status retrieved successfully.") + if status is not None: + print(f"ready:{status.ready}, started:{status.started}\n" + f"\tstate: {status.state}\n" + f"\t\trunning:{status.state.running}, started_at:{status.state.running.started_at}\n" + f"\t\twaiting:{status.state.waiting}, reason:{status.state.waiting.reason}, reason:{status.state.waiting.message}\n" + f"\t\tterminated:{status.state.terminated}, reason:{status.state.terminated.reason}, reason:{status.state.terminated.message}\n") + else: + print("No status found yet") if status is not None: pod_status[name] = {} @@ -316,7 +325,7 @@ def _create_nginx_config_map(analysis_name: str, namespace=namespace) storage_service_name = find_k8s_resources('service', 'label', - 'component=flame-result-service', + 'component=flame-storage-service', namespace=namespace) # generate config map From df0b2acf57fe050a9f4b941d78ec2898d448ad8d Mon Sep 17 00:00:00 2001 From: davidhieber Date: Wed, 11 Mar 2026 14:19:45 +0100 Subject: [PATCH 2/7] refactor: add debug prints Co-authored-by: Nightknight3000 --- src/api/api.py | 1 + src/k8s/kubernetes.py | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/api/api.py b/src/api/api.py index 2fa06ea..9d5f0ec 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -144,6 +144,7 @@ def create_analysis_call(self, body: CreateAnalysis): try: return create_analysis(body, self.database) except Exception as e: + print(f"Error creating analysis: {e}") raise HTTPException(status_code=500, detail=f"Error creating analysis: {e}") def retrieve_all_history_call(self): diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index 93eea1f..c2e400b 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -151,10 +151,19 @@ def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional print(f"Pod {name} status retrieved successfully.") if status is not None: print(f"ready:{status.ready}, started:{status.started}\n" - f"\tstate: {status.state}\n" - f"\t\trunning:{status.state.running}, started_at:{status.state.running.started_at}\n" - f"\t\twaiting:{status.state.waiting}, reason:{status.state.waiting.reason}, reason:{status.state.waiting.message}\n" - f"\t\tterminated:{status.state.terminated}, reason:{status.state.terminated.reason}, reason:{status.state.terminated.message}\n") + f"\tstate: {status.state}") + if status.state.running is not None: + print(f"\t\trunning:{status.state.running}, started_at:{status.state.running.started_at}") + else: + print(f"\t\trunning:{status.state.running}") + if status.state.waiting is not None: + print(f"\t\twaiting:{status.state.waiting}, reason:{status.state.waiting.reason}, reason:{status.state.waiting.message}") + else: + print(f"\t\twaiting:{status.state.waiting}") + if status.state.terminated is not None: + print(f"\t\tterminated:{status.state.terminated}, reason:{status.state.terminated.reason}, reason:{status.state.terminated.message}") + else: + print(f"\t\tterminated:{status.state.terminated}") else: print("No status found yet") From aa0f91e196a5a47b804e63cb575ed3750fcec46e Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 12 Mar 2026 17:06:10 +0100 Subject: [PATCH 3/7] feat: propagate partner node statuses and log stop endpoint Co-authored-by: antidodo --- src/api/api.py | 36 ++++++++++++++-------- src/k8s/kubernetes.py | 18 ----------- src/resources/log/entity.py | 8 +++++ src/status/status.py | 60 +++++++++++++++++++++++++++---------- src/utils/hub_client.py | 20 +++++++++++++ 5 files changed, 96 insertions(+), 46 deletions(-) diff --git a/src/api/api.py b/src/api/api.py index 9d5f0ec..4a6e632 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -10,7 +10,7 @@ from src.api.oauth import valid_access_token from src.resources.database.entity import Database from src.resources.analysis.entity import CreateAnalysis -from src.resources.log.entity import CreateLogEntity +from src.resources.log.entity import CreateLogEntity, AnalysisStoppedLog from src.resources.utils import (create_analysis, retrieve_history, retrieve_logs, @@ -28,13 +28,13 @@ def __init__(self, database: Database, namespace: str = 'default'): robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs() self.enable_hub_logging = enable_hub_logging - self.hub_core_client = init_hub_client_with_robot(robot_id, - robot_secret, - hub_url_core, - hub_auth, - http_proxy, - https_proxy) - self.node_id = get_node_id_by_robot(self.hub_core_client, robot_id) if self.hub_core_client else None + self.hub_client = init_hub_client_with_robot(robot_id, + robot_secret, + hub_url_core, + hub_auth, + http_proxy, + https_proxy) + self.node_id = get_node_id_by_robot(self.hub_client, robot_id) if self.hub_client else None self.namespace = namespace app = FastAPI(title="FLAME PO", docs_url="/api/docs", @@ -144,7 +144,6 @@ def create_analysis_call(self, body: CreateAnalysis): try: return create_analysis(body, self.database) except Exception as e: - print(f"Error creating analysis: {e}") raise HTTPException(status_code=500, detail=f"Error creating analysis: {e}") def retrieve_all_history_call(self): @@ -197,13 +196,26 @@ def get_pods_call(self, analysis_id: str): def stop_all_analysis_call(self): try: - return stop_analysis('all', self.database) + response = stop_analysis('all', self.database) + for analysis_id in self.database.get_analysis_ids(): + stream_logs(AnalysisStoppedLog(analysis_id), + self.node_id, + self.enable_hub_logging, + self.database, + self.hub_client) + return response except Exception as e: raise HTTPException(status_code=500, detail=f"Error stopping ALL analyzes: {e}") def stop_analysis_call(self, analysis_id: str): try: - return stop_analysis(analysis_id, self.database) + response = stop_analysis(analysis_id, self.database) + stream_logs(AnalysisStoppedLog(analysis_id), + self.node_id, + self.enable_hub_logging, + self.database, + self.hub_client) + return response except Exception as e: raise HTTPException(status_code=500, detail=f"Error stopping analysis: {e}") @@ -227,7 +239,7 @@ def cleanup_call(self, cleanup_type: str): def stream_logs_call(self, body: CreateLogEntity): try: - return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_core_client) + return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_client) except Exception as e: raise HTTPException(status_code=500, detail=f"Error streaming logs: {e}") diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index c2e400b..243e226 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -148,24 +148,6 @@ def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional if pod is not None: name = pod.metadata.name status = pod.status.container_statuses[0] - print(f"Pod {name} status retrieved successfully.") - if status is not None: - print(f"ready:{status.ready}, started:{status.started}\n" - f"\tstate: {status.state}") - if status.state.running is not None: - print(f"\t\trunning:{status.state.running}, started_at:{status.state.running.started_at}") - else: - print(f"\t\trunning:{status.state.running}") - if status.state.waiting is not None: - print(f"\t\twaiting:{status.state.waiting}, reason:{status.state.waiting.reason}, reason:{status.state.waiting.message}") - else: - print(f"\t\twaiting:{status.state.waiting}") - if status.state.terminated is not None: - print(f"\t\tterminated:{status.state.terminated}, reason:{status.state.terminated.reason}, reason:{status.state.terminated.message}") - else: - print(f"\t\tterminated:{status.state.terminated}") - else: - print("No status found yet") if status is not None: pod_status[name] = {} diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index a06414e..c487171 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -60,3 +60,11 @@ def __init__(self, log = '' super().__init__(log=log, log_type="error", analysis_id=analysis_id, status=status) + + +class AnalysisStoppedLog(CreateLogEntity): + def __init__(self, analysis_id: str) -> None: + log = (f"[flame -- POAPI: ANALYSISSTOPPED -- " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " + f"Info: The analysis was stopped either locally, or externally on another node.") + super().__init__(log=log, log_type="info", analysis_id=analysis_id, status="stopped") diff --git a/src/status/status.py b/src/status/status.py index 77985c5..245a922 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -1,8 +1,7 @@ import time import os -import asyncio from typing import Optional -from httpx import AsyncClient, HTTPStatusError, ConnectError, ConnectTimeout +from httpx import Client, HTTPStatusError, ConnectError, ConnectTimeout import flame_hub @@ -16,6 +15,7 @@ from src.utils.hub_client import (init_hub_client_with_robot, get_node_id_by_robot, get_node_analysis_id, + get_partner_node_statuses, update_hub_status) from src.status.constants import AnalysisStatus from src.utils.other import extract_hub_envs @@ -75,6 +75,12 @@ def status_loop(database: Database, status_loop_interval: int) -> None: # If node analysis id found print(f"\tNode analysis id: {node_analysis_id}") if node_analysis_id is not None: + # Inform local analysis of partner node statuses + _ = inform_analysis_of_partner_statuses(database, + hub_client, + analysis_id, + node_analysis_id) + # Retrieve analysis status (skip iteration if analysis is not deployed) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: @@ -119,6 +125,28 @@ def status_loop(database: Database, status_loop_interval: int) -> None: time.sleep(status_loop_interval) print(f"PO STATUS LOOP - Status loop iteration completed. Sleeping for {status_loop_interval} seconds.") + +def inform_analysis_of_partner_statuses(database: Database, + hub_client: flame_hub.CoreClient, + analysis_id: str, + node_analysis_id: str) -> Optional[dict[str, str]]: + node_statuses = get_partner_node_statuses(hub_client, analysis_id, node_analysis_id) + deployment_name = database.get_latest_deployment(analysis_id).deployment_name + try: # try except, in case analysis api is not yet ready + response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").post(url="/analysis/partner_status", + headers=[('Connection', 'close')], + json={'partner_status': node_statuses}) + response.raise_for_status() + return response.json() + except HTTPStatusError as e: + print(f"\tError whilst trying to access analysis partner_status endpoint: {e}") + except ConnectError as e: + print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} yielded an error: {e}") + except ConnectTimeout as e: + print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") + return None + + def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[str, str]]: analysis = database.get_latest_deployment(analysis_id) if analysis is not None: @@ -128,7 +156,7 @@ def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[ if db_status == AnalysisStatus.FINISHED.value: int_status = AnalysisStatus.FINISHED.value else: - int_status = asyncio.run(_get_internal_deployment_status(analysis.deployment_name, analysis_id)) + int_status = _get_internal_deployment_status(analysis.deployment_name, analysis_id) return {'analysis_id': analysis_id, 'db_status': analysis.status, 'int_status': int_status, @@ -145,23 +173,24 @@ def _decide_status_action(db_status: str, int_status: str) -> Optional[str]: newly_ended = ((db_status in [AnalysisStatus.RUNNING.value, AnalysisStatus.FAILED.value]) and (int_status in [AnalysisStatus.FINISHED.value, AnalysisStatus.FAILED.value])) firmly_stuck = ((db_status in [AnalysisStatus.FAILED.value]) and (int_status in [AnalysisStatus.STUCK.value])) + was_stopped = int_status == AnalysisStatus.STOPPED.value if is_stuck or is_slow: return 'unstuck' elif newly_running: return 'running' - elif speedy_finished or newly_ended or firmly_stuck: + elif speedy_finished or newly_ended or firmly_stuck or was_stopped: return 'finishing' else: return None -async def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str: +def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str: # Attempt to retrieve internal analysis status via health endpoint start_time = time.time() while True: try: - response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") - .get("/analysis/healthz", headers=[('Connection', 'close')])) + response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").get("/analysis/healthz", + headers=[('Connection', 'close')]) response.raise_for_status() break except HTTPStatusError as e: @@ -172,7 +201,7 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") elapsed_time = time.time() - start_time time.sleep(1) - if elapsed_time > _INTERNAL_STATUS_TIMEOUT: # TODO: Handle case of this happening for large images + if elapsed_time > _INTERNAL_STATUS_TIMEOUT: print(f"\tTimeout getting internal deployment status after {elapsed_time} seconds") return AnalysisStatus.FAILED.value @@ -180,9 +209,9 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str analysis_status, analysis_token_remaining_time = (response.json()['status'], response.json()['token_remaining_time']) # Check if token needs refresh, do so if needed - await _refresh_keycloak_token(deployment_name=deployment_name, - analysis_id=analysis_id, - token_remaining_time=analysis_token_remaining_time) + _refresh_keycloak_token(deployment_name=deployment_name, + analysis_id=analysis_id, + token_remaining_time=analysis_token_remaining_time) # Map status from response to preset values if analysis_status == AnalysisStatus.FINISHED.value: @@ -196,7 +225,7 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str return health_status -async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_remaining_time: int) -> None: +def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_remaining_time: int) -> None: """ Refresh the keycloak token :return: @@ -204,10 +233,9 @@ async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_ if token_remaining_time < (int(os.getenv('STATUS_LOOP_INTERVAL')) * 2 + 1): keycloak_token = get_keycloak_token(analysis_id) try: - response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") - .post("/analysis/token_refresh", - json={'token': keycloak_token}, - headers=[('Connection', 'close')])) + response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").post("/analysis/token_refresh", + json={'token': keycloak_token}, + headers=[('Connection', 'close')]) response.raise_for_status() except HTTPStatusError as e: print(f"Error: Failed to refresh keycloak token in deployment {deployment_name}.\n{e}") diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 3af936b..4541a49 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -95,6 +95,26 @@ def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r print(f"Error: Failed to update hub status for node_analysis_id {node_analysis_id}\n{e}") +def get_analysis_node_statuses(hub_client: flame_hub.CoreClient, analysis_id: str) -> Optional[dict[str, str]]: + try: + node_analyzes = hub_client.find_analysis_nodes(filter={'analysis_id': analysis_id}) + except (HTTPStatusError, flame_hub._exceptions.HubAPIError, AttributeError) as e: + print(f"Error: Failed to retrieve node analyzes from hub python client\n{e}") + return None + analysis_node_statuses = {} + for node in node_analyzes: + analysis_node_statuses[node.id] = node.run_status + return analysis_node_statuses + + +def get_partner_node_statuses(hub_client: flame_hub.CoreClient, + analysis_id: str, + node_analysis_id: str) -> Optional[dict[str, str]]: + analysis_node_statuses = get_analysis_node_statuses(hub_client, analysis_id) + return {k : v for k, v in analysis_node_statuses.items() if k != node_analysis_id} \ + if analysis_node_statuses is not None else None + + def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: str) -> None: """ Create a hub client for the analysis and update the current status. From 4a48ecf365a68ab7fa58897c209169eabc255e45 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Fri, 13 Mar 2026 15:53:05 +0100 Subject: [PATCH 4/7] fix: set node ids to str Co-authored-by: antidodo --- src/utils/hub_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 4541a49..2572b1b 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -103,7 +103,7 @@ def get_analysis_node_statuses(hub_client: flame_hub.CoreClient, analysis_id: st return None analysis_node_statuses = {} for node in node_analyzes: - analysis_node_statuses[node.id] = node.run_status + analysis_node_statuses[str(node.id)] = node.run_status return analysis_node_statuses From e9b0387f1d127695b8e6ea0abf6e4f183aef472f Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Mon, 16 Mar 2026 09:47:59 +0100 Subject: [PATCH 5/7] fix: add control print for nod status update to analysis Co-authored-by: antidodo --- src/status/status.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/status/status.py b/src/status/status.py index 245a922..86bafeb 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -133,6 +133,7 @@ def inform_analysis_of_partner_statuses(database: Database, node_statuses = get_partner_node_statuses(hub_client, analysis_id, node_analysis_id) deployment_name = database.get_latest_deployment(analysis_id).deployment_name try: # try except, in case analysis api is not yet ready + print(f"Node statuses to be submitted to {f'http://nginx-{deployment_name}:80'}: {node_statuses} (size={len(node_statuses)})") response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").post(url="/analysis/partner_status", headers=[('Connection', 'close')], json={'partner_status': node_statuses}) From e4303e1b3cb4d3afc2ee84730e8b841904a38e3f Mon Sep 17 00:00:00 2001 From: davidhieber Date: Tue, 17 Mar 2026 11:57:41 +0100 Subject: [PATCH 6/7] refactor: add error handling for partner node status updates Co-authored-by: Nightknight3000 --- src/status/status.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/status/status.py b/src/status/status.py index 86bafeb..8d2e2cd 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -75,11 +75,14 @@ def status_loop(database: Database, status_loop_interval: int) -> None: # If node analysis id found print(f"\tNode analysis id: {node_analysis_id}") if node_analysis_id is not None: - # Inform local analysis of partner node statuses - _ = inform_analysis_of_partner_statuses(database, - hub_client, - analysis_id, - node_analysis_id) + try: + # Inform local analysis of partner node statuses + _ = inform_analysis_of_partner_statuses(database, + hub_client, + analysis_id, + node_analysis_id) + except Exception as e: + print(f"\tPO STATUS LOOP - Error when attempting to access partner_status endpoint of {analysis_id} ({repr(e)})") # Retrieve analysis status (skip iteration if analysis is not deployed) analysis_status = _get_analysis_status(analysis_id, database) From 28ec410865c718b420f896a31175b68eaf223941 Mon Sep 17 00:00:00 2001 From: davidhieber Date: Tue, 17 Mar 2026 12:05:41 +0100 Subject: [PATCH 7/7] refactor: remove debug prints Co-authored-by: Nightknight3000 --- src/status/status.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/status/status.py b/src/status/status.py index 8d2e2cd..97c1d34 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -136,7 +136,6 @@ def inform_analysis_of_partner_statuses(database: Database, node_statuses = get_partner_node_statuses(hub_client, analysis_id, node_analysis_id) deployment_name = database.get_latest_deployment(analysis_id).deployment_name try: # try except, in case analysis api is not yet ready - print(f"Node statuses to be submitted to {f'http://nginx-{deployment_name}:80'}: {node_statuses} (size={len(node_statuses)})") response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").post(url="/analysis/partner_status", headers=[('Connection', 'close')], json={'partner_status': node_statuses})