From af903e9b3caafbbbdf9a7eba76139363193425ab Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 21 Oct 2025 12:15:42 +0200 Subject: [PATCH 01/28] fix: avoid Nonetype errors in analysis status checks Co-authored-by: antidodo --- src/status/status.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/status/status.py b/src/status/status.py index 7fd7842..5a4ec2b 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -1,6 +1,7 @@ import time import os import asyncio +from typing import Optional from httpx import AsyncClient, HTTPStatusError, ConnectError, ConnectTimeout from src.k8s.kubernetes import PORTS @@ -70,22 +71,30 @@ def status_loop(database: Database, status_loop_interval: int) -> None: print(f"Node analysis id: {node_analysis_id}") if node_analysis_id: analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue print(f"Database status: {analysis_status['db_status']}") print(f"Internal status: {analysis_status['int_status']}") # Fix for stuck analyzes _fix_stuck_status(database, analysis_status) analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue print(f"Unstuck analysis with internal status: {analysis_status['int_status']}") # Update created to running status if deployment responsive _update_running_status(database, analysis_status) analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue print(f"Update created to running database status: {analysis_status['db_status']}") # update running to finished status if analysis finished _update_finished_status(database, analysis_status) analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue print(f"Update running to finished database status: {analysis_status['db_status']}") # update hub analysis status @@ -98,18 +107,21 @@ def status_loop(database: Database, 
status_loop_interval: int) -> None: time.sleep(status_loop_interval) print(f"Status loop iteration completed. Sleeping for {status_loop_interval} seconds.") -def _get_analysis_status(analysis_id: str, database: Database) -> dict[str, str]: +def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[str, str]]: analysis = database.get_latest_deployment(analysis_id) - db_status = analysis.status - # Make the Finished status final, the internal status is not checked anymore, - # because the analysis will already be deleted - if db_status == AnalysisStatus.FINISHED.value: - int_status = AnalysisStatus.FINISHED.value + if analysis is not None: + db_status = analysis.status + # Make the Finished status final, the internal status is not checked anymore, + # because the analysis will already be deleted + if db_status == AnalysisStatus.FINISHED.value: + int_status = AnalysisStatus.FINISHED.value + else: + int_status = asyncio.run(_get_internal_deployment_status(analysis.deployment_name, analysis_id)) + return {"analysis_id": analysis_id, + "db_status": analysis.status, + "int_status": int_status} else: - int_status = asyncio.run(_get_internal_deployment_status(analysis.deployment_name, analysis_id)) - return {"analysis_id": analysis_id, - "db_status": analysis.status, - "int_status": int_status} + return None async def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str: From 2b0a2e571b8e9a10395fc93914f521d5844ee53d Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 21 Oct 2025 14:42:47 +0200 Subject: [PATCH 02/28] feat: implement POAPI StartupError Logs Co-authored-by: antidodo --- src/api/api.py | 2 +- src/k8s/kubernetes.py | 27 +++++++++++++++++++++++++ src/resources/log/entity.py | 33 ++++++++++++++++++++++++++++++ src/resources/utils.py | 2 +- src/status/status.py | 40 +++++++++++++++++++++++++++++++++---- 5 files changed, 98 insertions(+), 6 deletions(-) diff --git a/src/api/api.py b/src/api/api.py index 
4958fec..64cb011 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -149,7 +149,7 @@ def stream_logs_call(self, body: CreateLogEntity): print(body.json()) except Exception as e: print(f"Error printing body as json: {e}") - return stream_logs(body,self.node_id, self.database, self.hub_core_client) + return stream_logs(body, self.node_id, self.database, self.hub_core_client) def health_call(self): main_alive = threading.main_thread().is_alive() diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index 35c03cb..279c675 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -216,6 +216,33 @@ def delete_analysis_pods(deployment_name: str, project_id: str, namespace: str = namespace=namespace) +def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional[dict[str, dict[str, str]]]: + core_client = client.CoreV1Api() + + # get pods in deployment + pods = core_client.list_namespaced_pod(namespace=namespace, label_selector=f'app={deployment_name}').items + + if pods is not None: + pod_status = {} + for pod in pods: + if pod is not None: + name = pod.metadata.name + status = pod.status.phase + + if status is not None: + pod_status[name] = {} + pod_status[name]['status'] = status + if status == "Failed": + pod_status[name]['reason'] = str(pod.status.reason) + pod_status[name]['message'] = str(pod.status.message) + if pod_status: + return pod_status + else: + return None + else: + return None + + def _create_analysis_nginx_deployment(analysis_name: str, analysis_service_name: str, analysis_env: dict[str, str] = {}, diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index 1d1f0dd..4c56a1e 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -1,9 +1,12 @@ import uuid +import time from datetime import datetime from typing import Literal from pydantic import BaseModel +from src.status.status import _MAX_RESTARTS + class LogEntity(BaseModel): log: str @@ -26,3 +29,33 @@ class 
CreateLogEntity(BaseModel): def to_log_entity(self) -> LogEntity: return LogEntity(log=self.log, log_type=self.log_type) + + +class CreateStartUpErrorLog(CreateLogEntity): + def __init__(self, + restart_num: int, + error_type: Literal["stuck", "slow", "k8s"], + analysis_id: str, + status: str, + k8s_error_msg: str = '') -> None: + if error_type == "stuck": + log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " + f"Error: The analysis failed to connect to other node components " + f"[restart {restart_num} of {_MAX_RESTARTS}].") + elif error_type == "slow": + log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " + f"Error: The analysis took to long during startup and was restarted " + f"[restart {restart_num} of {_MAX_RESTARTS}].") + elif error_type == "k8s": + log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " + f"Error: The analysis failed to deploy in kubernetes " + f"[restart {restart_num} of {_MAX_RESTARTS}].)") + if k8s_error_msg: + log += f"\n\tKubernetesApiError: {k8s_error_msg}." 
+ else: + log = '' + + super().__init__(log=log, log_type="error", analysis_id=analysis_id, status=status) diff --git a/src/resources/utils.py b/src/resources/utils.py index 9543809..1ca3e77 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -217,7 +217,7 @@ def clean_up_the_rest(database: Database, namespace: str = 'default') -> str: return result_str -def stream_logs(log_entity: CreateLogEntity,node_id: str, database: Database, hub_core_client: CoreClient) -> None: +def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, hub_core_client: CoreClient) -> None: try: database.update_analysis_log(log_entity.analysis_id, str(log_entity.to_log_entity())) except IndexError as e: diff --git a/src/status/status.py b/src/status/status.py index 7fd7842..8d44984 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -6,14 +6,17 @@ import flame_hub -from src.resources.database.entity import Database +from src.resources.database.entity import Database, AnalysisDB from src.utils.hub_client import (init_hub_client_with_robot, get_node_id_by_robot, get_node_analysis_id, update_hub_status) from src.resources.utils import (unstuck_analysis_deployments, stop_analysis, - delete_analysis) + delete_analysis, + stream_logs) +from src.resources.log.entity import CreateStartUpErrorLog +from src.k8s.kubernetes import get_pod_status from src.status.constants import AnalysisStatus from src.utils.token import get_keycloak_token @@ -74,7 +77,7 @@ def status_loop(database: Database, status_loop_interval: int) -> None: print(f"Internal status: {analysis_status['int_status']}") # Fix for stuck analyzes - _fix_stuck_status(database, analysis_status) + _fix_stuck_status(database, analysis_status, node_id, hub_client) analysis_status = _get_analysis_status(analysis_id, database) print(f"Unstuck analysis with internal status: {analysis_status['int_status']}") @@ -166,7 +169,10 @@ async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, 
token_ print(f"Failed to refresh keycloak token in deployment {deployment_name}.\n{e}") -def _fix_stuck_status(database: Database, analysis_status: dict[str, str]) -> None: +def _fix_stuck_status(database: Database, + analysis_status: dict[str, str], + node_id: str, + hub_client: flame_hub.CoreClient) -> None: # Deployment selection is_stuck = analysis_status['int_status'] == AnalysisStatus.STUCK.value is_slow = ((analysis_status['int_status'] in [AnalysisStatus.FAILED.value]) and @@ -181,10 +187,36 @@ def _fix_stuck_status(database: Database, analysis_status: dict[str, str]) -> No # Tracking restarts if analysis.restart_counter < _MAX_RESTARTS: unstuck_analysis_deployments(analysis_status["analysis_id"], database) + _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) else: database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.FAILED.value) +def _stream_stuck_logs(analysis: AnalysisDB, + node_id: str, + database: Database, + hub_client: flame_hub.CoreClient, + is_slow: bool) -> None: + is_k8s_related = False + if is_slow: + deployment_name = analysis.deployment_name + pod_status_dict = get_pod_status(deployment_name) + if pod_status_dict is not None: + pod_name, pod_status_dict = list(pod_status_dict.items())[-1] + status, reason, message = pod_status_dict['status'], pod_status_dict['reason'], pod_status_dict['message'] + if status == "Failed": + is_k8s_related = True + + stream_logs(CreateStartUpErrorLog(analysis.restart_counter, + ("k8s" if is_k8s_related else "slow") if is_slow else "stuck", + analysis.analysis_id, + analysis.status, + k8s_error_msg=f"{status} (reason={reason}): {message}" if is_k8s_related else ''), + node_id, + database, + hub_client) + + def _update_running_status(database: Database, analysis_status: dict[str, str]) -> None: newly_running = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and (analysis_status['int_status'] in [AnalysisStatus.RUNNING.value])) From 
8454b5cd415d47f1f469dc378e2ae25359075319 Mon Sep 17 00:00:00 2001 From: davidhieber Date: Tue, 28 Oct 2025 11:50:13 +0100 Subject: [PATCH 03/28] fix: add internal status timeout and max restarts constants Co-authored-by: Nightknight3000 --- src/resources/log/entity.py | 2 +- src/status/constants.py | 6 ++++++ src/status/status.py | 4 +--- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index 4c56a1e..58ddc9c 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -5,7 +5,7 @@ from pydantic import BaseModel -from src.status.status import _MAX_RESTARTS +from src.status.constants import _MAX_RESTARTS class LogEntity(BaseModel): diff --git a/src/status/constants.py b/src/status/constants.py index 5733251..113e462 100644 --- a/src/status/constants.py +++ b/src/status/constants.py @@ -1,6 +1,12 @@ from enum import Enum +_INTERNAL_STATUS_TIMEOUT = 10 # Time in seconds to wait for internal status response + + +_MAX_RESTARTS = 10 # Maximum number of restarts for a stuck analysis + + class AnalysisStatus(Enum): STARTING = 'starting' STARTED = 'started' diff --git a/src/status/status.py b/src/status/status.py index 2628b7a..a5615c3 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -21,9 +21,7 @@ from src.status.constants import AnalysisStatus from src.utils.token import get_keycloak_token - -_INTERNAL_STATUS_TIMEOUT = 10 # Time in seconds to wait for internal status response -_MAX_RESTARTS = 10 # Maximum number of restarts for a stuck analysis +from src.status.constants import _MAX_RESTARTS, _INTERNAL_STATUS_TIMEOUT def status_loop(database: Database, status_loop_interval: int) -> None: From c9b56a8ad3a7a6423bcc5109d8f26f3e2348538c Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 12:30:29 +0100 Subject: [PATCH 04/28] fix: init pod_status_dict fields Co-authored-by: antidodo --- src/k8s/kubernetes.py | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index 7f66e0d..b32c8af 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -235,6 +235,9 @@ def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional if status == "Failed": pod_status[name]['reason'] = str(pod.status.reason) pod_status[name]['message'] = str(pod.status.message) + else: + pod_status[name]['reason'] = '' + pod_status[name]['message'] = '' if pod_status: return pod_status else: From b0a37ca27a47be5d28372861240ce66697f699fc Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 13:29:25 +0100 Subject: [PATCH 05/28] fix: checks within hub_client init Co-authored-by: antidodo --- src/utils/hub_client.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index b19a683..b8d03f5 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -103,15 +103,18 @@ def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: s os.getenv('PO_HTTP_PROXY'), os.getenv('PO_HTTPS_PROXY')) hub_client = init_hub_client_with_robot(robot_id, robot_secret, hub_url_core, hub_auth, http_proxy, https_proxy) - if hub_client is None: + if hub_client is not None: + node_id = get_node_id_by_robot(hub_client, robot_id) + if node_id is not None: + node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) + if node_analysis_id is not None: + update_hub_status(hub_client, node_analysis_id, run_status=status) + else: + print("Failed to retrieve node_analysis_id from hub client. Cannot update status.") + else: + print("Failed to retrieve node_id from hub client. Cannot update status.") + else: print("Failed to initialize hub client. Cannot update status.") - node_id = get_node_id_by_robot(hub_client, robot_id) - if node_id is None: - print("Failed to retrieve node_id from hub client. 
Cannot update status.") - node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) - if node_id is None: - print("Failed to retrieve node_analysis_id from hub client. Cannot update status.") - update_hub_status(hub_client, node_analysis_id, run_status=status) # TODO: Import this from flame sdk? (from flamesdk import HUB_LOG_LITERALS) From 8bf79af57a838a10a0d845e902bdfe6991efdb77 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 13:57:45 +0100 Subject: [PATCH 06/28] fix: final startup error logs and status update Co-authored-by: antidodo --- src/resources/log/entity.py | 7 ++++--- src/status/status.py | 9 +++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index 58ddc9c..4f369f4 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -38,21 +38,22 @@ def __init__(self, analysis_id: str, status: str, k8s_error_msg: str = '') -> None: + term_msg = "" if restart_num < _MAX_RESTARTS else " -> Terminating analysis as failed." 
if error_type == "stuck": log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " f"Error: The analysis failed to connect to other node components " - f"[restart {restart_num} of {_MAX_RESTARTS}].") + f"[restart {restart_num} of {_MAX_RESTARTS}].{term_msg}") elif error_type == "slow": log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " f"Error: The analysis took to long during startup and was restarted " - f"[restart {restart_num} of {_MAX_RESTARTS}].") + f"[restart {restart_num} of {_MAX_RESTARTS}].{term_msg}") elif error_type == "k8s": log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " f"Error: The analysis failed to deploy in kubernetes " - f"[restart {restart_num} of {_MAX_RESTARTS}].)") + f"[restart {restart_num} of {_MAX_RESTARTS}].{term_msg}") if k8s_error_msg: log += f"\n\tKubernetesApiError: {k8s_error_msg}." 
else: diff --git a/src/status/status.py b/src/status/status.py index a5615c3..a42144a 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -201,6 +201,7 @@ def _fix_stuck_status(database: Database, _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) else: database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.FAILED.value) + _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) def _stream_stuck_logs(analysis: AnalysisDB, @@ -260,11 +261,11 @@ def _set_analysis_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, analysis_status: dict[str, str]) -> str: if analysis_status['db_status'] in [AnalysisStatus.FAILED.value, - AnalysisStatus.FINISHED.value]: - analysis_hub_status = AnalysisStatus.FINISHED.value + AnalysisStatus.FINISHED.value]: + analysis_hub_status = analysis_status['db_status'] elif analysis_status['int_status'] in [AnalysisStatus.FAILED.value, - AnalysisStatus.FINISHED.value, - AnalysisStatus.RUNNING.value]: + AnalysisStatus.FINISHED.value, + AnalysisStatus.RUNNING.value]: analysis_hub_status = analysis_status['int_status'] else: analysis_hub_status = analysis_status['db_status'] From 469690b9d519ac5a31a1a2d24f06293b00e5bcb8 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 14:53:01 +0100 Subject: [PATCH 07/28] feat: actively update db analysis status through logs Co-authored-by: antidodo --- src/resources/utils.py | 1 + src/utils/hub_client.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/src/resources/utils.py b/src/resources/utils.py index d10c736..a0ff6e8 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -271,6 +271,7 @@ def clean_up_the_rest(database: Database, namespace: str = 'default') -> str: def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, hub_core_client: CoreClient) -> None: try: database.update_analysis_log(log_entity.analysis_id, str(log_entity.to_log_entity())) + 
database.update_analysis_status(log_entity.analysis_id, log_entity.status) except IndexError as e: print(f"Error updating analysis log in database: {e}") print(f"sending logs to hub client") diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index b8d03f5..0d0b0d4 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -15,6 +15,8 @@ import flame_hub +from src.status.constants import AnalysisStatus + def init_hub_client_with_robot(robot_id: str, robot_secret: str, @@ -108,6 +110,8 @@ def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: s if node_id is not None: node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) if node_analysis_id is not None: + if status == AnalysisStatus.STUCK.value: + status = AnalysisStatus.FAILED.value update_hub_status(hub_client, node_analysis_id, run_status=status) else: print("Failed to retrieve node_analysis_id from hub client. Cannot update status.") From cbe7cb6a44aa626eac5ac2c83a1b710f6d792422 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 15:10:44 +0100 Subject: [PATCH 08/28] fix: move stuck check Co-authored-by: antidodo --- src/utils/hub_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 0d0b0d4..27784e0 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -88,6 +88,8 @@ def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r Update the status of the analysis in the hub. 
""" try: + if run_status == AnalysisStatus.STUCK.value: + run_status = AnalysisStatus.FAILED.value hub_client.update_analysis_node(node_analysis_id, run_status=run_status) print(f"Updated hub status to {run_status} for node_analysis_id {node_analysis_id}") except (HTTPStatusError, ConnectError) as e: @@ -110,8 +112,6 @@ def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: s if node_id is not None: node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) if node_analysis_id is not None: - if status == AnalysisStatus.STUCK.value: - status = AnalysisStatus.FAILED.value update_hub_status(hub_client, node_analysis_id, run_status=status) else: print("Failed to retrieve node_analysis_id from hub client. Cannot update status.") From c224ad44fa4587cc1fe16b02da5d746427946f29 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 15:27:02 +0100 Subject: [PATCH 09/28] fix: delete firmly stuck analyzes Co-authored-by: antidodo --- src/status/status.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/status/status.py b/src/status/status.py index a42144a..f4c27cb 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -243,7 +243,9 @@ def _update_finished_status(database: Database, analysis_status: dict[str, str]) AnalysisStatus.FAILED.value]) and (analysis_status['int_status'] in [AnalysisStatus.FINISHED.value, AnalysisStatus.FAILED.value])) - if newly_ended: + firmly_stuck = ((analysis_status['db_status'] in [AnalysisStatus.FAILED.value]) + and (analysis_status['int_status'] in [AnalysisStatus.STUCK.value])) + if newly_ended or firmly_stuck: analysis = database.get_latest_deployment(analysis_status["analysis_id"]) if analysis is not None: database.update_deployment_status(analysis.deployment_name, analysis_status['int_status']) From 7680435895fc55583abb7d1d9ea99d33fb2330cd Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 16:09:58 +0100 Subject: [PATCH 10/28] fix: 
comment out db status updates through logs Co-authored-by: antidodo --- src/resources/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/resources/utils.py b/src/resources/utils.py index a0ff6e8..aac65a2 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -271,7 +271,7 @@ def clean_up_the_rest(database: Database, namespace: str = 'default') -> str: def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, hub_core_client: CoreClient) -> None: try: database.update_analysis_log(log_entity.analysis_id, str(log_entity.to_log_entity())) - database.update_analysis_status(log_entity.analysis_id, log_entity.status) + #database.update_analysis_status(log_entity.analysis_id, log_entity.status) except IndexError as e: print(f"Error updating analysis log in database: {e}") print(f"sending logs to hub client") From 986934b89afbbf5bda256603704fe809e8f6f405 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Tue, 28 Oct 2025 16:37:27 +0100 Subject: [PATCH 11/28] fix: running analysis check Co-authored-by: antidodo --- src/resources/database/entity.py | 8 ++++++++ src/status/status.py | 6 ++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/resources/database/entity.py b/src/resources/database/entity.py index 443e909..395d2e8 100644 --- a/src/resources/database/entity.py +++ b/src/resources/database/entity.py @@ -40,6 +40,14 @@ def get_latest_deployment(self, analysis_id: str) -> Optional[AnalysisDB]: return deployments[-1] return None + def analysis_is_running(self, analysis_id: str) -> bool: + latest_deployment = self.get_latest_deployment(analysis_id) + if latest_deployment is not None: + return latest_deployment.status not in [AnalysisStatus.FINISHED.value, + AnalysisStatus.STOPPED.value, + AnalysisStatus.FAILED.value] + return False + def get_deployments(self, analysis_id: str) -> list[AnalysisDB]: with self.SessionLocal() as session: return 
session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() diff --git a/src/status/status.py b/src/status/status.py index f4c27cb..0c9433f 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -57,8 +57,10 @@ def status_loop(database: Database, status_loop_interval: int) -> None: continue else: # If running analyzes exist, enter status loop - print(f"Checking for running analyzes...{database.get_analysis_ids()}") - if database.get_analysis_ids(): + running_analyzes = [analysis_id for analysis_id in database.get_analysis_ids() + if database.analysis_is_running(analysis_id)] + print(f"Checking for running analyzes...{running_analyzes}") + if running_analyzes: for analysis_id in set(database.get_analysis_ids()): # Get node analysis id if analysis_id not in node_analysis_ids.keys(): From da2aaa3fb7db96c7e33ef2e0300056ebcb9a8106 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 6 Nov 2025 15:13:18 +0100 Subject: [PATCH 12/28] refactor: console prints Co-authored-by: antidodo --- src/api/api.py | 8 -------- src/k8s/kubernetes.py | 1 - src/resources/database/entity.py | 2 -- src/resources/utils.py | 8 +------- src/status/status.py | 27 ++++++++++++++------------- src/utils/hub_client.py | 3 --- 6 files changed, 15 insertions(+), 34 deletions(-) diff --git a/src/api/api.py b/src/api/api.py index d6ae816..f638f17 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -189,14 +189,6 @@ def get_service_status_call(self): pass def stream_logs_call(self, body: CreateLogEntity): - try: - print(body) - except Exception as e: - print(f"Error printing body: {e}") - try: - print(body.json()) - except Exception as e: - print(f"Error printing body as json: {e}") return stream_logs(body, self.node_id, self.database, self.hub_core_client) def health_call(self): diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index b32c8af..ce659df 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -375,7 +375,6 @@ def 
_create_nginx_config_map(analysis_name: str, if len(pod_list_object.items) > 0: analysis_ip = pod_list_object.items[0].status.pod_ip - print(analysis_ip) time.sleep(1) # get the name of the hub adapter, kong proxy, and result service diff --git a/src/resources/database/entity.py b/src/resources/database/entity.py index 395d2e8..0972013 100644 --- a/src/resources/database/entity.py +++ b/src/resources/database/entity.py @@ -90,7 +90,6 @@ def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]: for deployment in analysis: if deployment: for key, value in kwargs.items(): - print(f"in update analysis Setting {key} to {value}") setattr(deployment, key, value) session.commit() @@ -99,7 +98,6 @@ def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]: def update_deployment(self, deployment_name: str, **kwargs) -> AnalysisDB: with self.SessionLocal() as session: deployment = session.query(AnalysisDB).filter_by(**{"deployment_name": deployment_name}).first() - print(kwargs.items()) for key, value in kwargs.items(): setattr(deployment, key, value) session.commit() diff --git a/src/resources/utils.py b/src/resources/utils.py index aac65a2..4c73a2b 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -138,7 +138,6 @@ def stop_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: for analysis_id, deployment in deployments.items(): # save logs as string to database (will be read as dict in retrieve_history) log = str(get_analysis_logs({analysis_id: deployment.deployment_name}, database=database)) - print(f"log to be saved in stop_analysis for {deployment.deployment_name}: {log[:10]}...") if deployment.status in [AnalysisStatus.FAILED.value, AnalysisStatus.FINISHED.value]: deployment.stop(database, log=log, status=deployment.status) @@ -237,7 +236,6 @@ def cleanup(cleanup_type: str, for client in _get_all_keycloak_clients(): if client['clientId'] not in analysis_ids and client['name'].startswith('flame-'): 
delete_keycloak_client(client['clientId']) - print(client) else: response_content[cleanup_type] = f"Unknown cleanup type: {cleanup_type} (known types: 'all', " + \ @@ -274,14 +272,10 @@ def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, h #database.update_analysis_status(log_entity.analysis_id, log_entity.status) except IndexError as e: print(f"Error updating analysis log in database: {e}") - print(f"sending logs to hub client") - # log to hub - print(f"analysis_id: {log_entity.analysis_id}, node_id: {node_id}, ") - print(f"status: {log_entity.status}, level: {log_entity.log_type}, message: {log_entity.log}") + # log to hub hub_core_client.create_analysis_node_log(analysis_id=log_entity.analysis_id, node_id=node_id, status=log_entity.status, level=log_entity.log_type, message=log_entity.log) - print(f"sent logs to hub client") diff --git a/src/status/status.py b/src/status/status.py index 0c9433f..e44d703 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -62,6 +62,7 @@ def status_loop(database: Database, status_loop_interval: int) -> None: print(f"Checking for running analyzes...{running_analyzes}") if running_analyzes: for analysis_id in set(database.get_analysis_ids()): + print(f"Current analysis id: {analysis_id}") # Get node analysis id if analysis_id not in node_analysis_ids.keys(): node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) @@ -71,38 +72,38 @@ def status_loop(database: Database, status_loop_interval: int) -> None: node_analysis_id = node_analysis_ids[analysis_id] # If node analysis id found - print(f"Node analysis id: {node_analysis_id}") + print(f"\tNode analysis id: {node_analysis_id}") if node_analysis_id: analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue - print(f"Database status: {analysis_status['db_status']}") - print(f"Internal status: {analysis_status['int_status']}") + print(f"\tDatabase status: {analysis_status['db_status']}") + 
print(f"\tInternal status: {analysis_status['int_status']}") # Fix for stuck analyzes _fix_stuck_status(database, analysis_status, node_id, hub_client) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue - print(f"Unstuck analysis with internal status: {analysis_status['int_status']}") + print(f"\tUnstuck analysis with internal status: {analysis_status['int_status']}") # Update created to running status if deployment responsive _update_running_status(database, analysis_status) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue - print(f"Update created to running database status: {analysis_status['db_status']}") + print(f"\tUpdate created to running database status: {analysis_status['db_status']}") # update running to finished status if analysis finished _update_finished_status(database, analysis_status) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue - print(f"Update running to finished database status: {analysis_status['db_status']}") + print(f"\tUpdate running to finished database status: {analysis_status['db_status']}") # update hub analysis status analysis_hub_status = _set_analysis_hub_status(hub_client, node_analysis_id, analysis_status) - print(f"Set Hub analysis status with node_analysis={node_analysis_id}, " + print(f"\tSet Hub analysis status with node_analysis={node_analysis_id}, " f"db_status={analysis_status['db_status']}, " f"internal_status={analysis_status['int_status']} " f"to {analysis_hub_status}") @@ -136,15 +137,15 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str response.raise_for_status() break except HTTPStatusError as e: - print(f"Error getting internal deployment status: {e}") + print(f"\tError getting internal deployment status: {e}") except ConnectError as e: - print(f"Connection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} yielded an error: {e}") + 
print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} yielded an error: {e}") except ConnectTimeout as e: - print(f"Connection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") + print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") elapsed_time = time.time() - start_time time.sleep(1) if elapsed_time > _INTERNAL_STATUS_TIMEOUT: - print(f"Timeout getting internal deployment status after {elapsed_time} seconds") + print(f"\tTimeout getting internal deployment status after {elapsed_time} seconds") return AnalysisStatus.FAILED.value analysis_status, analysis_token_remaining_time = (response.json()['status'], @@ -252,12 +253,12 @@ def _update_finished_status(database: Database, analysis_status: dict[str, str]) if analysis is not None: database.update_deployment_status(analysis.deployment_name, analysis_status['int_status']) if analysis_status['int_status'] == AnalysisStatus.FINISHED.value: - print("Delete deployment") + print("\tDelete deployment") # TODO: final local log save (minio?) 
# archive logs # delete_analysis(analysis_status['analysis_id'], database) # delete analysis from database stop_analysis(analysis_status['analysis_id'], database) # stop analysis TODO: Change to delete in the future (when archive logs implemented) else: - print("Stop deployment") + print("\tStop deployment") stop_analysis(analysis_status['analysis_id'], database) # stop analysis diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 27784e0..0576008 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -59,7 +59,6 @@ def get_ssl_context() -> ssl.SSLContext: def get_node_id_by_robot(hub_client: flame_hub.CoreClient, robot_id: str) -> Optional[str]: try: node_id_object = hub_client.find_nodes(filter={"robot_id": robot_id})[0] - print(f"Found node id object: {node_id_object}") except (HTTPStatusError, JSONDecodeError, ConnectTimeout) as e: print(f"Error in hub python client whilst retrieving node id object!\n{e}") node_id_object = None @@ -70,7 +69,6 @@ def get_node_analysis_id(hub_client: flame_hub.CoreClient, analysis_id: str, nod try: node_analyzes = hub_client.find_analysis_nodes(filter={"analysis_id": analysis_id, "node_id": node_id_object_id}) - print(f"Found node analyzes: {node_analyzes}") except HTTPStatusError as e: print(f"Error in hub python client whilst retrieving node analyzes!\n{e}") node_analyzes = None @@ -91,7 +89,6 @@ def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r if run_status == AnalysisStatus.STUCK.value: run_status = AnalysisStatus.FAILED.value hub_client.update_analysis_node(node_analysis_id, run_status=run_status) - print(f"Updated hub status to {run_status} for node_analysis_id {node_analysis_id}") except (HTTPStatusError, ConnectError) as e: print(f"Failed to update hub status for node_analysis_id {node_analysis_id}.\n{e}") From 65536125b029564f13bf765a6f35d77081d73001 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 6 Nov 2025 15:14:32 +0100 Subject: [PATCH 13/28] 
refactor: imports and functions Co-authored-by: antidodo --- src/k8s/utils.py | 15 --------------- src/resources/utils.py | 6 ++---- 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/src/k8s/utils.py b/src/k8s/utils.py index 7b37e60..7e3fbd9 100644 --- a/src/k8s/utils.py +++ b/src/k8s/utils.py @@ -52,21 +52,6 @@ def get_k8s_resource_names(resource_type: str, return None -def get_all_analysis_deployment_names(namespace: str = 'default') -> list[str]: - """ - Get all analysis deployments in the specified namespace. - :param namespace: The namespace to search for deployments. - :return: A list of deployment names. - """ - analysis_deployment_names = get_k8s_resource_names('deployment', - 'label', - 'component=flame-analysis', - namespace=namespace) - analysis_deployment_names = [analysis_deployment_names] if type(analysis_deployment_names) == str \ - else analysis_deployment_names - return analysis_deployment_names - - def get_current_namespace() -> str: namespace_file = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' try: diff --git a/src/resources/utils.py b/src/resources/utils.py index 4c73a2b..8b84739 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -1,6 +1,6 @@ import ast import time -from typing import Union, Optional +from typing import Union from flame_hub import CoreClient @@ -10,10 +10,8 @@ from src.status.constants import AnalysisStatus from src.k8s.kubernetes import (create_harbor_secret, get_analysis_logs, - delete_deployment, - delete_analysis_pods, delete_resource) -from src.k8s.utils import get_current_namespace, get_all_analysis_deployment_names, get_k8s_resource_names +from src.k8s.utils import get_current_namespace, get_k8s_resource_names from src.utils.token import _get_all_keycloak_clients from src.utils.token import delete_keycloak_client from src.utils.hub_client import init_hub_client_and_update_hub_status_with_robot From 51127fc4960d556078f90c2012c9b4d3a7c8065c Mon Sep 17 00:00:00 2001 From: 
Nightknight3000 Date: Thu, 6 Nov 2025 15:16:12 +0100 Subject: [PATCH 14/28] fix: running_analyzes iteration and cleanup endpoint (zombie check) Co-authored-by: antidodo --- src/resources/utils.py | 9 ++++----- src/status/status.py | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/resources/utils.py b/src/resources/utils.py index 8b84739..942f293 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -185,7 +185,6 @@ def unstuck_analysis_deployments(analysis_id: str, database: Database) -> None: for deployment in deployments: if deployment.status == AnalysisStatus.STUCK.value: - #delete_analysis_pods(deployment.deployment_name, deployment.project_id, get_current_namespace()) stop_analysis(analysis_id, database) time.sleep(10) # wait for k8s to update status create_analysis(analysis_id, database) @@ -203,7 +202,7 @@ def cleanup(cleanup_type: str, response_content = {} for cleanup_type in cleanup_types: - if cleanup_type in ['all', 'analyzes', 'services', 'mb', 'rs', 'keycloak']: + if cleanup_type in ['zombies', 'all', 'analyzes', 'services', 'mb', 'rs', 'keycloak']: # Analysis cleanup if cleanup_type in ['all', 'analyzes']: # cleanup all analysis deployments, associated services, policies and configmaps @@ -236,8 +235,8 @@ def cleanup(cleanup_type: str, delete_keycloak_client(client['clientId']) else: - response_content[cleanup_type] = f"Unknown cleanup type: {cleanup_type} (known types: 'all', " + \ - "'analyzes','keycloak' , 'services', 'mb', and 'rs')" + response_content[cleanup_type] = f"Unknown cleanup type: {cleanup_type} (known types: 'zombies', 'all', " +\ + "'analyzes', 'keycloak', 'services', 'mb', and 'rs')" response_content['zombies'] = clean_up_the_rest(database, namespace) return response_content @@ -267,7 +266,7 @@ def clean_up_the_rest(database: Database, namespace: str = 'default') -> str: def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, hub_core_client: CoreClient) -> None: try: 
database.update_analysis_log(log_entity.analysis_id, str(log_entity.to_log_entity())) - #database.update_analysis_status(log_entity.analysis_id, log_entity.status) + #database.update_analysis_status(log_entity.analysis_id, log_entity.status) #TODO: Implement this? except IndexError as e: print(f"Error updating analysis log in database: {e}") diff --git a/src/status/status.py b/src/status/status.py index e44d703..bf0c1bd 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -61,7 +61,7 @@ def status_loop(database: Database, status_loop_interval: int) -> None: if database.analysis_is_running(analysis_id)] print(f"Checking for running analyzes...{running_analyzes}") if running_analyzes: - for analysis_id in set(database.get_analysis_ids()): + for analysis_id in running_analyzes: print(f"Current analysis id: {analysis_id}") # Get node analysis id if analysis_id not in node_analysis_ids.keys(): From fe952d8cff619c50a1d6c97cd113454d11da7e27 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 6 Nov 2025 15:53:20 +0100 Subject: [PATCH 15/28] fix: account for speedy analysis terminations Co-authored-by: antidodo --- src/status/status.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/status/status.py b/src/status/status.py index bf0c1bd..96692c1 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -189,8 +189,8 @@ def _fix_stuck_status(database: Database, hub_client: flame_hub.CoreClient) -> None: # Deployment selection is_stuck = analysis_status['int_status'] == AnalysisStatus.STUCK.value - is_slow = ((analysis_status['int_status'] in [AnalysisStatus.FAILED.value]) and - (analysis_status['db_status'] in [AnalysisStatus.STARTED.value])) + is_slow = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and + (analysis_status['int_status'] in [AnalysisStatus.FAILED.value])) # Update Status if is_stuck or is_slow: @@ -242,13 +242,15 @@ def _update_running_status(database: Database, analysis_status: dict[str, 
str]) def _update_finished_status(database: Database, analysis_status: dict[str, str]) -> None: + speedy_finished = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and + (analysis_status['int_status'] in [AnalysisStatus.FINISHED.value])) newly_ended = ((analysis_status['db_status'] in [AnalysisStatus.RUNNING.value, AnalysisStatus.FAILED.value]) and (analysis_status['int_status'] in [AnalysisStatus.FINISHED.value, AnalysisStatus.FAILED.value])) firmly_stuck = ((analysis_status['db_status'] in [AnalysisStatus.FAILED.value]) and (analysis_status['int_status'] in [AnalysisStatus.STUCK.value])) - if newly_ended or firmly_stuck: + if speedy_finished or newly_ended or firmly_stuck: analysis = database.get_latest_deployment(analysis_status["analysis_id"]) if analysis is not None: database.update_deployment_status(analysis.deployment_name, analysis_status['int_status']) From fb8ff622824e6f01daf849aac11f68f8591c527f Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 6 Nov 2025 16:42:57 +0100 Subject: [PATCH 16/28] fix: unstuck logic Co-authored-by: antidodo --- src/resources/database/entity.py | 2 +- src/resources/utils.py | 17 +++++------------ src/status/constants.py | 2 +- src/status/status.py | 6 +++--- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/src/resources/database/entity.py b/src/resources/database/entity.py index 0972013..93605e8 100644 --- a/src/resources/database/entity.py +++ b/src/resources/database/entity.py @@ -175,7 +175,7 @@ def extract_analysis_body(self, analysis_id: str) -> Optional[dict]: "restart_counter": analysis.restart_counter} return None - def delete_old_deployments_db(self, analysis_id: str) -> None: + def delete_old_deployments_from_db(self, analysis_id: str) -> None: deployments = self.get_deployments(analysis_id) deployments = sorted(deployments, key=lambda x: x.time_created, reverse=True) for deployment in deployments[1:]: diff --git a/src/resources/utils.py b/src/resources/utils.py index 
942f293..b580e61 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -181,18 +181,11 @@ def delete_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: def unstuck_analysis_deployments(analysis_id: str, database: Database) -> None: - deployments = [read_db_analysis(deployment) for deployment in database.get_deployments(analysis_id)] - - for deployment in deployments: - if deployment.status == AnalysisStatus.STUCK.value: - stop_analysis(analysis_id, database) - time.sleep(10) # wait for k8s to update status - create_analysis(analysis_id, database) - database.delete_old_deployments_db(analysis_id) - - deployment = database.get_deployments(analysis_id)[0] - database.update_deployment_status(deployment.deployment_name, AnalysisStatus.STARTED.value) - break + if database.get_latest_deployment(analysis_id) is not None: + stop_analysis(analysis_id, database) + time.sleep(10) # wait for k8s to update status + create_analysis(analysis_id, database) + database.delete_old_deployments_from_db(analysis_id) def cleanup(cleanup_type: str, diff --git a/src/status/constants.py b/src/status/constants.py index 113e462..0cff50f 100644 --- a/src/status/constants.py +++ b/src/status/constants.py @@ -11,7 +11,7 @@ class AnalysisStatus(Enum): STARTING = 'starting' STARTED = 'started' - STUCK = 'stuck' + STUCK = 'stuck' # internal analysis status only RUNNING = 'running' STOPPING = 'stopping' diff --git a/src/status/status.py b/src/status/status.py index 96692c1..4696e8c 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -196,14 +196,13 @@ def _fix_stuck_status(database: Database, if is_stuck or is_slow: analysis = database.get_latest_deployment(analysis_status["analysis_id"]) if analysis is not None: - database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.STUCK.value) + database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.FAILED.value) # Tracking restarts if analysis.restart_counter < 
_MAX_RESTARTS: - unstuck_analysis_deployments(analysis_status["analysis_id"], database) _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) + unstuck_analysis_deployments(analysis_status["analysis_id"], database) else: - database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.FAILED.value) _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) @@ -216,6 +215,7 @@ def _stream_stuck_logs(analysis: AnalysisDB, if is_slow: deployment_name = analysis.deployment_name pod_status_dict = get_pod_status(deployment_name) + print(pod_status_dict) if pod_status_dict is not None: pod_name, pod_status_dict = list(pod_status_dict.items())[-1] status, reason, message = pod_status_dict['status'], pod_status_dict['reason'], pod_status_dict['message'] From 1c4b9fc3c105f45d696bd0f3e824cb2585ef76e5 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 6 Nov 2025 17:11:09 +0100 Subject: [PATCH 17/28] fix: KubernetesAPIError forwarding Co-authored-by: antidodo --- src/k8s/kubernetes.py | 10 +++++----- src/status/status.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index ce659df..089f975 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -227,14 +227,14 @@ def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional for pod in pods: if pod is not None: name = pod.metadata.name - status = pod.status.phase + status = pod.status.container_statuses[0] if status is not None: pod_status[name] = {} - pod_status[name]['status'] = status - if status == "Failed": - pod_status[name]['reason'] = str(pod.status.reason) - pod_status[name]['message'] = str(pod.status.message) + pod_status[name]['ready'] = status.ready + if not status.ready: + pod_status[name]['reason'] = str(status.state.waiting.reason) + pod_status[name]['message'] = str(status.state.waiting.message) else: pod_status[name]['reason'] = '' pod_status[name]['message'] 
= '' diff --git a/src/status/status.py b/src/status/status.py index 4696e8c..6741e0d 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -217,16 +217,16 @@ def _stream_stuck_logs(analysis: AnalysisDB, pod_status_dict = get_pod_status(deployment_name) print(pod_status_dict) if pod_status_dict is not None: - pod_name, pod_status_dict = list(pod_status_dict.items())[-1] - status, reason, message = pod_status_dict['status'], pod_status_dict['reason'], pod_status_dict['message'] - if status == "Failed": + _, pod_status_dict = list(pod_status_dict.items())[-1] + ready, reason, message = pod_status_dict['ready'], pod_status_dict['reason'], pod_status_dict['message'] + if not ready: is_k8s_related = True stream_logs(CreateStartUpErrorLog(analysis.restart_counter, ("k8s" if is_k8s_related else "slow") if is_slow else "stuck", analysis.analysis_id, analysis.status, - k8s_error_msg=f"{status} (reason={reason}): {message}" if is_k8s_related else ''), + k8s_error_msg=f"{ready} (reason={reason}): {message}" if is_k8s_related else ''), node_id, database, hub_client) From b9ea9e25b65e143463a483ded503b084b2ef940e Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 6 Nov 2025 17:39:31 +0100 Subject: [PATCH 18/28] refactor: clean forwarded KubernetesAPIError log Co-authored-by: antidodo --- src/status/status.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/status/status.py b/src/status/status.py index 6741e0d..2df5e96 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -215,18 +215,19 @@ def _stream_stuck_logs(analysis: AnalysisDB, if is_slow: deployment_name = analysis.deployment_name pod_status_dict = get_pod_status(deployment_name) - print(pod_status_dict) if pod_status_dict is not None: _, pod_status_dict = list(pod_status_dict.items())[-1] ready, reason, message = pod_status_dict['ready'], pod_status_dict['reason'], pod_status_dict['message'] if not ready: is_k8s_related = True + print(f"\tDeployment of 
analysis={analysis.analysis_id} failed (ready={ready}).\n" + f"\t\t{reason}: {message}") stream_logs(CreateStartUpErrorLog(analysis.restart_counter, ("k8s" if is_k8s_related else "slow") if is_slow else "stuck", analysis.analysis_id, analysis.status, - k8s_error_msg=f"{ready} (reason={reason}): {message}" if is_k8s_related else ''), + k8s_error_msg=reason if is_k8s_related else ''), node_id, database, hub_client) From d1d3bed7640ab8a163097a07b1bb9dae4e54ed42 Mon Sep 17 00:00:00 2001 From: davidhieber Date: Fri, 7 Nov 2025 13:23:57 +0100 Subject: [PATCH 19/28] fix: comment out access token dependency for history route for testing Co-authored-by: Nightknight3000 --- src/api/api.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/api.py b/src/api/api.py index f638f17..1b907e7 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -63,22 +63,22 @@ def __init__(self, database: Database, namespace: str = 'default'): response_class=JSONResponse) router.add_api_route("/history", self.retrieve_all_history_call, - dependencies=[Depends(valid_access_token)], + #dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/history/{analysis_id}", self.retrieve_history_call, - dependencies=[Depends(valid_access_token)], + #dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/logs", self.retrieve_all_logs_call, - dependencies=[Depends(valid_access_token)], + #dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/logs/{analysis_id}", self.retrieve_logs_call, - dependencies=[Depends(valid_access_token)], + #dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/status", From 2a08146868e6ccc6931325cf451dc2e6c966aacb Mon Sep 17 00:00:00 2001 From: davidhieber Date: Fri, 7 Nov 2025 14:02:32 +0100 Subject: [PATCH 20/28] fix: update 
log retrieval to use analysis_id for accurate mapping Co-authored-by: Nightknight3000 --- src/resources/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/resources/utils.py b/src/resources/utils.py index b580e61..692ddf5 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -74,8 +74,8 @@ def retrieve_history(analysis_id_str: str, database: Database) -> dict[str, dict for analysis_id, deployment in deployments.items(): # interpret log string as a dictionary log = ast.literal_eval(deployment.log) - analysis_logs[analysis_id] = log["analysis"][deployment.deployment_name] - nginx_logs[analysis_id] = log["nginx"][f"nginx-{deployment.deployment_name}"] + analysis_logs[analysis_id] = log["analysis"][analysis_id] + nginx_logs[analysis_id] = log["nginx"][analysis_id] return {"analysis": analysis_logs, "nginx": nginx_logs} From fe3839ab263068b21372aab25de189a6a0d5c1c4 Mon Sep 17 00:00:00 2001 From: davidhieber Date: Fri, 7 Nov 2025 14:06:26 +0100 Subject: [PATCH 21/28] fix: restore access token dependency for history and logs routes Co-authored-by: Nightknight3000 --- src/api/api.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/api.py b/src/api/api.py index 1b907e7..f638f17 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -63,22 +63,22 @@ def __init__(self, database: Database, namespace: str = 'default'): response_class=JSONResponse) router.add_api_route("/history", self.retrieve_all_history_call, - #dependencies=[Depends(valid_access_token)], + dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/history/{analysis_id}", self.retrieve_history_call, - #dependencies=[Depends(valid_access_token)], + dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/logs", self.retrieve_all_logs_call, - #dependencies=[Depends(valid_access_token)], + 
dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/logs/{analysis_id}", self.retrieve_logs_call, - #dependencies=[Depends(valid_access_token)], + dependencies=[Depends(valid_access_token)], methods=["GET"], response_class=JSONResponse) router.add_api_route("/status", From 5e2086e0c417f2b12719a5b288b519caaee45430 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Fri, 7 Nov 2025 14:18:28 +0100 Subject: [PATCH 22/28] fix: handle HubAPIError Co-authored-by: antidodo --- src/status/status.py | 4 +++- src/utils/hub_client.py | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/status/status.py b/src/status/status.py index 2df5e96..4d38671 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -68,12 +68,14 @@ def status_loop(database: Database, status_loop_interval: int) -> None: node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) if node_analysis_id is not None: node_analysis_ids[analysis_id] = node_analysis_id + else: + hub_client = None else: node_analysis_id = node_analysis_ids[analysis_id] # If node analysis id found print(f"\tNode analysis id: {node_analysis_id}") - if node_analysis_id: + if node_analysis_id is not None: analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 0576008..679f2fc 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -47,6 +47,7 @@ def init_hub_client_with_robot(robot_id: str, print(f"Failed to authenticate with hub python client library.\n{e}") return hub_client + @lru_cache def get_ssl_context() -> ssl.SSLContext: """Check if there are additional certificates present and if so, load them.""" @@ -56,10 +57,11 @@ def get_ssl_context() -> ssl.SSLContext: ctx.load_verify_locations(cafile=cert_path) return ctx + def get_node_id_by_robot(hub_client: flame_hub.CoreClient, robot_id: str) -> 
Optional[str]: try: node_id_object = hub_client.find_nodes(filter={"robot_id": robot_id})[0] - except (HTTPStatusError, JSONDecodeError, ConnectTimeout) as e: + except (HTTPStatusError, JSONDecodeError, ConnectTimeout, flame_hub._exceptions.HubAPIError) as e: print(f"Error in hub python client whilst retrieving node id object!\n{e}") node_id_object = None return str(node_id_object.id) if node_id_object is not None else None @@ -69,7 +71,7 @@ def get_node_analysis_id(hub_client: flame_hub.CoreClient, analysis_id: str, nod try: node_analyzes = hub_client.find_analysis_nodes(filter={"analysis_id": analysis_id, "node_id": node_id_object_id}) - except HTTPStatusError as e: + except (HTTPStatusError, flame_hub._exceptions.HubAPIError) as e: print(f"Error in hub python client whilst retrieving node analyzes!\n{e}") node_analyzes = None @@ -89,7 +91,7 @@ def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r if run_status == AnalysisStatus.STUCK.value: run_status = AnalysisStatus.FAILED.value hub_client.update_analysis_node(node_analysis_id, run_status=run_status) - except (HTTPStatusError, ConnectError) as e: + except (HTTPStatusError, ConnectError, flame_hub._exceptions.HubAPIError) as e: print(f"Failed to update hub status for node_analysis_id {node_analysis_id}.\n{e}") From 111ce94e5c3396fb6c5290c1954ff687de4c70e1 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Mon, 10 Nov 2025 12:25:40 +0100 Subject: [PATCH 23/28] refactor: add comments, streamline logs, and string quotations Co-authored-by: antidodo --- src/api/api.py | 2 +- src/api/oauth.py | 10 +++---- src/k8s/kubernetes.py | 44 +++++++++++++++---------------- src/resources/database/entity.py | 44 +++++++++++++++---------------- src/resources/log/entity.py | 6 ++--- src/resources/utils.py | 38 +++++++++++++-------------- src/status/status.py | 45 ++++++++++++++++++-------------- src/utils/docker.py | 40 ---------------------------- src/utils/hub_client.py | 24 ++++++++--------- 
src/utils/other.py | 4 +-- src/utils/token.py | 37 +++++++++++++------------- {src/test => tests}/test_db.py | 0 12 files changed, 131 insertions(+), 163 deletions(-) delete mode 100644 src/utils/docker.py rename {src/test => tests}/test_db.py (100%) diff --git a/src/api/api.py b/src/api/api.py index f638f17..5fb00b6 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -196,4 +196,4 @@ def health_call(self): if not main_alive: raise RuntimeError("Main thread is not alive.") else: - return {"status": "ok"} + return {'status': "ok"} diff --git a/src/api/oauth.py b/src/api/oauth.py index 4dd7e95..6adee59 100644 --- a/src/api/oauth.py +++ b/src/api/oauth.py @@ -7,20 +7,20 @@ from typing import Annotated oauth2_scheme = OAuth2AuthorizationCodeBearer( - tokenUrl=os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/token", - authorizationUrl=os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/auth", - refreshUrl=os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/token", + tokenUrl=os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/token", + authorizationUrl=os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/auth", + refreshUrl=os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/token", ) async def valid_access_token(token: Annotated[str, Depends(oauth2_scheme)]) -> dict: - url = os.getenv("KEYCLOAK_URL") + "/realms/flame/protocol/openid-connect/certs" + url = os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/certs" jwks_client = PyJWKClient(url) try: sig_key = jwks_client.get_signing_key_from_jwt(token) return jwt.decode(token, key=sig_key, - options={"verify_signature": True, "verify_aud": False, "exp": True}) + options={'verify_signature': True, 'verify_aud': False, 'exp': True}) except jwt.exceptions.InvalidTokenError: raise HTTPException(status_code=401, detail="Not authenticated") diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index 089f975..5e56c4a 
100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -31,11 +31,11 @@ def create_harbor_secret(host_address: str, string_data={'docker-server': host_address, 'docker-username': user.replace('$', '\$'), 'docker-password': password, - '.dockerconfigjson': json.dumps({"auths": + '.dockerconfigjson': json.dumps({'auths': {host_address: - {"username": user, - "password": password, - "auth": base64.b64encode(f'{user}:{password}'.encode("ascii")).decode("ascii")}}})} + {'username': user, + 'password': password, + 'auth': base64.b64encode(f"{user}:{password}".encode("ascii")).decode("ascii")}}})} ) try: core_client.create_namespaced_secret(namespace=namespace, body=secret) @@ -47,7 +47,7 @@ def create_harbor_secret(host_address: str, if e.reason != 'Conflict': raise e else: - print('Conflict remains unresolved!') + print("Conflict remains unresolved!") raise e @@ -63,14 +63,14 @@ def create_analysis_deployment(name: str, period_seconds=20, failure_threshold=1, timeout_seconds=5) - container1 = client.V1Container(name=name, image=image, image_pull_policy="IfNotPresent", + container1 = client.V1Container(name=name, image=image, image_pull_policy='IfNotPresent', ports=[client.V1ContainerPort(PORTS['analysis'][0])], env=[client.V1EnvVar(name=key, value=val) for key, val in env.items()], #liveness_probe=liveness_probe, ) containers.append(container1) - labels = {'app': name, 'component': 'flame-analysis'} + labels = {'app': name, 'component': "flame-analysis"} depl_metadata = client.V1ObjectMeta(name=name, namespace=namespace, labels=labels) depl_pod_metadata = client.V1ObjectMeta(labels=labels) depl_selector = client.V1LabelSelector(match_labels=labels) @@ -103,48 +103,48 @@ def delete_resource(name: str, resource_type: str, namespace: str = 'default') - :param resource_type: Type of the resource (e.g., 'deployment', 'service', 'pod', 'configmap'). :param namespace: Namespace in which the resource exists. 
""" - print(f"Deleting resource: {name} of type {resource_type} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"PO ACTION - Deleting resource: {name} of type {resource_type} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") if resource_type == 'deployment': try: app_client = client.AppsV1Api() app_client.delete_namespaced_deployment(name=name, namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found: {name} deployment") + print(f"Error: Not Found {name} deployment") elif resource_type == 'service': try: core_client = client.CoreV1Api() core_client.delete_namespaced_service(name=name, namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found: {name} service") + print(f"Error: Not Found {name} service") elif resource_type == 'pod': try: core_client = client.CoreV1Api() core_client.delete_namespaced_pod(name=name, namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found: {name} pod") + print(f"Error: Not Found {name} pod") elif resource_type == 'configmap': try: core_client = client.CoreV1Api() core_client.delete_namespaced_config_map(name=name, namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found: {name} configmap") + print(f"Error: Not Found {name} configmap") elif resource_type == 'networkpolicy': try: network_client = client.NetworkingV1Api() network_client.delete_namespaced_network_policy(name=name, namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found: {name} networkpolicy") + print(f"Error: Not Found {name} networkpolicy") else: raise ValueError(f"Unsupported resource type: {resource_type}") def delete_deployment(deployment_name: str, namespace: str = 'default') -> None: - print(f"Deleting deployment {deployment_name} in namespace 
{namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"PO ACTION - Deleting deployment {deployment_name} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") app_client = client.AppsV1Api() for name in [deployment_name, f'nginx-{deployment_name}']: try: @@ -152,19 +152,19 @@ def delete_deployment(deployment_name: str, namespace: str = 'default') -> None: _delete_service(name, namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found {name}") + print(f"Error: Not Found {name}") network_client = client.NetworkingV1Api() try: network_client.delete_namespaced_network_policy(name=f'nginx-to-{deployment_name}-policy', namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found nginx-to-{deployment_name}-policy") + print(f"Error: Not Found nginx-to-{deployment_name}-policy") core_client = client.CoreV1Api() try: core_client.delete_namespaced_config_map(name=f"nginx-{deployment_name}-config", namespace=namespace) except client.exceptions.ApiException as e: if e.reason != 'Not Found': - print(f"Not Found {deployment_name}-config") + print(f"Error: Not Found {deployment_name}-config") def get_analysis_logs(deployment_names: dict[str, str], @@ -177,18 +177,18 @@ def get_analysis_logs(deployment_names: dict[str, str], :param namespace: :return: """ - return {"analysis": {analysis_id: _get_logs(name=deployment_name, + return {'analysis': {analysis_id: _get_logs(name=deployment_name, pod_ids=database.get_deployment_pod_ids(deployment_name), namespace=namespace) for analysis_id, deployment_name in deployment_names.items()}, - "nginx": {analysis_id: _get_logs(name=f"nginx-{deployment_name}", + 'nginx': {analysis_id: _get_logs(name=f"nginx-{deployment_name}", namespace=namespace) for analysis_id, deployment_name in deployment_names.items()} } def delete_analysis_pods(deployment_name: str, project_id: str, namespace: str = 'default') -> None: - print(f"Deleting 
pods of deployment {deployment_name} in namespace {namespace} at " + print(f"PO ACTION - Deleting pods of deployment {deployment_name} in namespace {namespace} at " f"{time.strftime('%Y-%m-%d %H:%M:%S')}") core_client = client.CoreV1Api() # delete nginx deployment @@ -573,14 +573,14 @@ def _get_logs(name: str, pod_ids: Optional[list[str]] = None, namespace: str = ' pod_logs = [core_client.read_namespaced_pod_log(pod.metadata.name, namespace) for pod in pods.items if pod.metadata.name in pod_ids] except client.exceptions.ApiException as e: - print(e) + print(f"Error: APIException while trying to retrieve pod logs (pod_ids in list)\n{e}") return [] else: try: pod_logs = [core_client.read_namespaced_pod_log(pod.metadata.name, namespace) for pod in pods.items] except client.exceptions.ApiException as e: - print(e) + print(f"Error: APIException while trying to retrieve pod logs (pod_ids=None)\n{e}") return [] # sanitize pod logs diff --git a/src/resources/database/entity.py b/src/resources/database/entity.py index 93605e8..0d7ddef 100644 --- a/src/resources/database/entity.py +++ b/src/resources/database/entity.py @@ -11,12 +11,12 @@ class Database: def __init__(self) -> None: - host = os.getenv("POSTGRES_HOST") + host = os.getenv('POSTGRES_HOST') port = "5432" - user = os.getenv("POSTGRES_USER") - password = os.getenv("POSTGRES_PASSWORD") - database = os.getenv("POSTGRES_DB") - conn_uri = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}' + user = os.getenv('POSTGRES_USER') + password = os.getenv('POSTGRES_PASSWORD') + database = os.getenv('POSTGRES_DB') + conn_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" print(conn_uri) self.engine = create_engine(conn_uri, pool_pre_ping=True, @@ -31,11 +31,11 @@ def reset_db(self) -> None: def get_deployment(self, deployment_name: str) -> Optional[AnalysisDB]: with self.SessionLocal() as session: - return session.query(AnalysisDB).filter_by(**{"deployment_name": deployment_name}).first() 
+ return session.query(AnalysisDB).filter_by(**{'deployment_name': deployment_name}).first() def get_latest_deployment(self, analysis_id: str) -> Optional[AnalysisDB]: with self.SessionLocal() as session: - deployments = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + deployments = session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() if deployments: return deployments[-1] return None @@ -50,7 +50,7 @@ def analysis_is_running(self, analysis_id: str) -> bool: def get_deployments(self, analysis_id: str) -> list[AnalysisDB]: with self.SessionLocal() as session: - return session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + return session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() def create_analysis(self, analysis_id: str, @@ -86,7 +86,7 @@ def create_analysis(self, def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]: with self.SessionLocal() as session: - analysis = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + analysis = session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() for deployment in analysis: if deployment: for key, value in kwargs.items(): @@ -97,7 +97,7 @@ def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]: def update_deployment(self, deployment_name: str, **kwargs) -> AnalysisDB: with self.SessionLocal() as session: - deployment = session.query(AnalysisDB).filter_by(**{"deployment_name": deployment_name}).first() + deployment = session.query(AnalysisDB).filter_by(**{'deployment_name': deployment_name}).first() for key, value in kwargs.items(): setattr(deployment, key, value) session.commit() @@ -105,7 +105,7 @@ def update_deployment(self, deployment_name: str, **kwargs) -> AnalysisDB: def delete_analysis(self, analysis_id: str) -> None: with self.SessionLocal() as session: - analysis = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all() + analysis 
= session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all() for deployment in analysis: if deployment: session.delete(deployment) @@ -154,7 +154,7 @@ def update_analysis_status(self, analysis_id: str, status: str) -> None: self.update_analysis(analysis_id, status=status) def update_deployment_status(self, deployment_name: str, status: str) -> None: - print(f"Updating deployment {deployment_name} to status {status}") + print(f"PO ACTION - Updating deployment {deployment_name} to status {status}") self.update_deployment(deployment_name, status=status) def stop_analysis(self, analysis_id: str) -> None: @@ -164,19 +164,19 @@ def extract_analysis_body(self, analysis_id: str) -> Optional[dict]: analysis = self.get_deployments(analysis_id) if analysis: analysis = analysis[0] - return {"analysis_id": analysis.analysis_id, - "project_id": analysis.project_id, - "registry_url": analysis.registry_url, - "image_url": analysis.image_url, - "registry_user": analysis.registry_user, - "registry_password": analysis.registry_password, - "namespace": analysis.namespace, - "kong_token": analysis.kong_token, - "restart_counter": analysis.restart_counter} + return {'analysis_id': analysis.analysis_id, + 'project_id': analysis.project_id, + 'registry_url': analysis.registry_url, + 'image_url': analysis.image_url, + 'registry_user': analysis.registry_user, + 'registry_password': analysis.registry_password, + 'namespace': analysis.namespace, + 'kong_token': analysis.kong_token, + 'restart_counter': analysis.restart_counter} return None def delete_old_deployments_from_db(self, analysis_id: str) -> None: deployments = self.get_deployments(analysis_id) deployments = sorted(deployments, key=lambda x: x.time_created, reverse=True) for deployment in deployments[1:]: - self.delete_deployment(deployment.deployment_name) \ No newline at end of file + self.delete_deployment(deployment.deployment_name) diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index 
4f369f4..c1cc57f 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -10,7 +10,7 @@ class LogEntity(BaseModel): log: str - log_type: Literal["emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"] + log_type: Literal['emerg', 'alert', 'crit', 'error', 'warn', 'notice', 'info', 'debug'] id: str = str(uuid.uuid4()) created_at: str = str(datetime.now()) @@ -21,7 +21,7 @@ def __str__(self) -> str: class CreateLogEntity(BaseModel): log: str - log_type: Literal["emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"] + log_type: Literal['emerg', 'alert', 'crit', 'error', 'warn', 'notice', 'info', 'debug'] analysis_id: str status: str @@ -34,7 +34,7 @@ def to_log_entity(self) -> LogEntity: class CreateStartUpErrorLog(CreateLogEntity): def __init__(self, restart_num: int, - error_type: Literal["stuck", "slow", "k8s"], + error_type: Literal['stuck', 'slow', 'k8s'], analysis_id: str, status: str, k8s_error_msg: str = '') -> None: diff --git a/src/resources/utils.py b/src/resources/utils.py index 692ddf5..9930521 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -24,7 +24,7 @@ def create_analysis(body: Union[CreateAnalysis, str], database: Database) -> dic if isinstance(body, str): body = database.extract_analysis_body(body) if body is None: - return {"status": "Analysis ID not found in database."} + return {'status': "Analysis ID not found in database."} else: body = CreateAnalysis(**body) @@ -56,7 +56,7 @@ def retrieve_history(analysis_id_str: str, database: Database) -> dict[str, dict :param database: :return: """ - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -74,14 +74,14 @@ def retrieve_history(analysis_id_str: str, database: Database) -> dict[str, dict for analysis_id, deployment in deployments.items(): # interpret log string as a dictionary log = ast.literal_eval(deployment.log) - 
analysis_logs[analysis_id] = log["analysis"][analysis_id] - nginx_logs[analysis_id] = log["nginx"][analysis_id] + analysis_logs[analysis_id] = log['analysis'][analysis_id] + nginx_logs[analysis_id] = log['nginx'][analysis_id] - return {"analysis": analysis_logs, "nginx": nginx_logs} + return {'analysis': analysis_logs, 'nginx': nginx_logs} def retrieve_logs(analysis_id_str: str, database: Database) -> dict[str, dict[str, list[str]]]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -97,7 +97,7 @@ def retrieve_logs(analysis_id_str: str, database: Database) -> dict[str, dict[st def get_status(analysis_id_str: str, database: Database) -> dict[str, str]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -112,7 +112,7 @@ def get_status(analysis_id_str: str, database: Database) -> dict[str, str]: def get_pods(analysis_id_str: str, database: Database) -> dict[str, list[str]]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -120,7 +120,7 @@ def get_pods(analysis_id_str: str, database: Database) -> dict[str, list[str]]: def stop_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -158,7 +158,7 @@ def stop_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: def delete_analysis(analysis_id_str: str, database: Database) -> dict[str, str]: - if analysis_id_str == "all": + if analysis_id_str == 'all': analysis_ids = database.get_analysis_ids() else: analysis_ids = [analysis_id_str] @@ -190,7 +190,7 @@ def unstuck_analysis_deployments(analysis_id: str, database: Database) -> None: def cleanup(cleanup_type: str, 
database: Database, - namespace: str = "default") -> dict[str, str]: + namespace: str = 'default') -> dict[str, str]: cleanup_types = set(cleanup_type.split(',')) if ',' in cleanup_type else [cleanup_type] response_content = {} @@ -207,7 +207,7 @@ def cleanup(cleanup_type: str, # reinitialize message-broker pod message_broker_pod_name = get_k8s_resource_names('pod', 'label', - 'component=flame-message-broker', + "component=flame-message-broker", namespace=namespace) delete_resource(message_broker_pod_name, 'pod', namespace) response_content[cleanup_type] = "Reset message broker" @@ -215,7 +215,7 @@ def cleanup(cleanup_type: str, # reinitialize result-service pod result_service_name = get_k8s_resource_names('pod', 'label', - 'component=flame-result-service', + "component=flame-result-service", namespace=namespace) delete_resource(result_service_name, 'pod', namespace) response_content[cleanup_type] = "Reset result service" @@ -238,11 +238,11 @@ def clean_up_the_rest(database: Database, namespace: str = 'default') -> str: known_analysis_ids = database.get_analysis_ids() result_str = "" - for res, (selector_args, max_r_split) in {'deployment': (['component=flame-analysis', 'component=flame-analysis-nginx'], 1), - 'pod': (['component=flame-analysis', 'component=flame-analysis-nginx'], 2), - 'service': (['component=flame-analysis', 'component=flame-analysis-nginx'], 1), - 'networkpolicy': (['component=flame-nginx-to-analysis-policy'], 2), - 'configmap': (['component=flame-nginx-analysis-config-map'], 2)}.items(): + for res, (selector_args, max_r_split) in {'deployment': (["component=flame-analysis", "component=flame-analysis-nginx"], 1), + 'pod': (["component=flame-analysis", "component=flame-analysis-nginx"], 2), + 'service': (["component=flame-analysis", "component=flame-analysis-nginx"], 1), + 'networkpolicy': (["component=flame-nginx-to-analysis-policy"], 2), + 'configmap': (["component=flame-nginx-analysis-config-map"], 2)}.items(): for selector_arg in 
selector_args: resources = get_k8s_resource_names(res, 'label', selector_arg, namespace=namespace) resources = [resources] if type(resources) == str else resources @@ -261,7 +261,7 @@ def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, h database.update_analysis_log(log_entity.analysis_id, str(log_entity.to_log_entity())) #database.update_analysis_status(log_entity.analysis_id, log_entity.status) #TODO: Implement this? except IndexError as e: - print(f"Error updating analysis log in database: {e}") + print(f"Error: Failed to update analysis log in database\n{e}") # log to hub hub_core_client.create_analysis_node_log(analysis_id=log_entity.analysis_id, diff --git a/src/status/status.py b/src/status/status.py index 4d38671..07ed677 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -20,7 +20,6 @@ from src.k8s.kubernetes import get_pod_status from src.status.constants import AnalysisStatus from src.utils.token import get_keycloak_token - from src.status.constants import _MAX_RESTARTS, _INTERNAL_STATUS_TIMEOUT @@ -52,17 +51,17 @@ def status_loop(database: Database, status_loop_interval: int) -> None: node_id = get_node_id_by_robot(hub_client, robot_id) # Catch unresponsive hub client if node_id is None: - print("Resetting hub client...") + print("PO ACTION - Resetting hub client...") hub_client = None continue else: # If running analyzes exist, enter status loop running_analyzes = [analysis_id for analysis_id in database.get_analysis_ids() if database.analysis_is_running(analysis_id)] - print(f"Checking for running analyzes...{running_analyzes}") + print(f"PO ACTION - Checking for running analyzes...{running_analyzes}") if running_analyzes: for analysis_id in running_analyzes: - print(f"Current analysis id: {analysis_id}") + print(f"PO STATUS LOOP - Current analysis id: {analysis_id}") # Get node analysis id if analysis_id not in node_analysis_ids.keys(): node_analysis_id = get_node_analysis_id(hub_client, analysis_id, node_id) 
@@ -76,34 +75,35 @@ def status_loop(database: Database, status_loop_interval: int) -> None: # If node analysis id found print(f"\tNode analysis id: {node_analysis_id}") if node_analysis_id is not None: + # Retrieve analysis status (skip iteration if analysis is not deployed) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue print(f"\tDatabase status: {analysis_status['db_status']}") print(f"\tInternal status: {analysis_status['int_status']}") - # Fix for stuck analyzes _fix_stuck_status(database, analysis_status, node_id, hub_client) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue print(f"\tUnstuck analysis with internal status: {analysis_status['int_status']}") + # Fix stuck analyzes - # Update created to running status if deployment responsive _update_running_status(database, analysis_status) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue print(f"\tUpdate created to running database status: {analysis_status['db_status']}") + # Update created to running status - # update running to finished status if analysis finished _update_finished_status(database, analysis_status) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue print(f"\tUpdate running to finished database status: {analysis_status['db_status']}") + # Update running to finished status - # update hub analysis status + # Submit analysis_status to hub analysis_hub_status = _set_analysis_hub_status(hub_client, node_analysis_id, analysis_status) print(f"\tSet Hub analysis status with node_analysis={node_analysis_id}, " f"db_status={analysis_status['db_status']}, " @@ -111,7 +111,7 @@ def status_loop(database: Database, status_loop_interval: int) -> None: f"to {analysis_hub_status}") time.sleep(status_loop_interval) - print(f"Status loop iteration completed. 
Sleeping for {status_loop_interval} seconds.") + print(f"PO STATUS LOOP - Status loop iteration completed. Sleeping for {status_loop_interval} seconds.") def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[str, str]]: analysis = database.get_latest_deployment(analysis_id) @@ -131,31 +131,35 @@ def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str: + # Attempt to retrieve internal analysis status via health endpoint start_time = time.time() while True: try: - response = await (AsyncClient(base_url=f'http://nginx-{deployment_name}:{PORTS["nginx"][0]}') - .get('/analysis/healthz', headers=[('Connection', 'close')])) + response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") + .get("/analysis/healthz", headers=[('Connection', 'close')])) response.raise_for_status() break except HTTPStatusError as e: - print(f"\tError getting internal deployment status: {e}") + print(f"\tError whilst retrieving internal deployment status: {e}") except ConnectError as e: print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} yielded an error: {e}") except ConnectTimeout as e: print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}") elapsed_time = time.time() - start_time time.sleep(1) - if elapsed_time > _INTERNAL_STATUS_TIMEOUT: + if elapsed_time > _INTERNAL_STATUS_TIMEOUT: # TODO: Handle case of this happening for large images print(f"\tTimeout getting internal deployment status after {elapsed_time} seconds") return AnalysisStatus.FAILED.value + # Extract fields from response analysis_status, analysis_token_remaining_time = (response.json()['status'], response.json()['token_remaining_time']) + # Check if token needs refresh, do so if needed await _refresh_keycloak_token(deployment_name=deployment_name, analysis_id=analysis_id, 
token_remaining_time=analysis_token_remaining_time) + # Map status from response to preset values if analysis_status == AnalysisStatus.FINISHED.value: health_status = AnalysisStatus.FINISHED.value elif analysis_status == AnalysisStatus.RUNNING.value: @@ -164,7 +168,6 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str health_status = AnalysisStatus.STUCK.value else: health_status = AnalysisStatus.FAILED.value - return health_status @@ -176,13 +179,13 @@ async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_ if token_remaining_time < (int(os.getenv('STATUS_LOOP_INTERVAL')) * 2 + 1): keycloak_token = get_keycloak_token(analysis_id) try: - response = await (AsyncClient(base_url=f'http://nginx-{deployment_name}:{PORTS["nginx"][0]}') - .post('/analysis/token_refresh', - json={"token": keycloak_token}, + response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") + .post("/analysis/token_refresh", + json={'token': keycloak_token}, headers=[('Connection', 'close')])) response.raise_for_status() except HTTPStatusError as e: - print(f"Failed to refresh keycloak token in deployment {deployment_name}.\n{e}") + print(f"Error: Failed to refresh keycloak token in deployment {deployment_name}.\n{e}") def _fix_stuck_status(database: Database, @@ -213,20 +216,24 @@ def _stream_stuck_logs(analysis: AnalysisDB, database: Database, hub_client: flame_hub.CoreClient, is_slow: bool) -> None: + # If is_slow=True differentiate between slow, or kubernetes_error state, else assume stuck state is_k8s_related = False if is_slow: deployment_name = analysis.deployment_name + # Retrieve status of latest pod pod_status_dict = get_pod_status(deployment_name) if pod_status_dict is not None: _, pod_status_dict = list(pod_status_dict.items())[-1] ready, reason, message = pod_status_dict['ready'], pod_status_dict['reason'], pod_status_dict['message'] + # ready=True implicates slow state, else assume 
kubernetes_error state if not ready: is_k8s_related = True print(f"\tDeployment of analysis={analysis.analysis_id} failed (ready={ready}).\n" f"\t\t{reason}: {message}") + # Create and stream POAPIError logs or either slow, stuck, or kubernetes_error state to Hub stream_logs(CreateStartUpErrorLog(analysis.restart_counter, - ("k8s" if is_k8s_related else "slow") if is_slow else "stuck", + ('k8s' if is_k8s_related else 'slow') if is_slow else 'stuck', analysis.analysis_id, analysis.status, k8s_error_msg=reason if is_k8s_related else ''), diff --git a/src/utils/docker.py b/src/utils/docker.py deleted file mode 100644 index 2356406..0000000 --- a/src/utils/docker.py +++ /dev/null @@ -1,40 +0,0 @@ - - -#_CLIENT = docker.from_env() -_CLIENT = None -#_CLIENT.login( -# username=os.getenv("NODE_NAME"), -# password=os.getenv("HARBOR_PW"), -# registry=os.getenv("HARBOR_URL"), -#) - - -def download_image(image_registry_address: str) -> str: - _CLIENT.images.pull(image_registry_address) - return image_registry_address.split('/')[-1] + ':latest' - - -def validate_image(image_name: str, master_image_name: str) -> bool: - _ = download_image(master_image_name) - - master_image = _CLIENT.images.get(master_image_name) - image = _CLIENT.images.get(image_name) - - return _history_validation(image, master_image) and _image_valid(image, master_image) - - -def _history_validation(image, master_image) -> bool: - master_img_entry_ids = [ - {key: entry[key] for key in ["Created", "CreatedBy", "Size"]} - for entry in master_image.history() - ] - img_entry_ids = [ - {key: entry[key] for key in ["Created", "CreatedBy", "Size"]} - for entry in image.history() - ] - - return all([entry_dict in img_entry_ids for entry_dict in master_img_entry_ids]) - - -def _image_valid(image, master_image) -> bool: - return True diff --git a/src/utils/hub_client.py b/src/utils/hub_client.py index 679f2fc..64f4e08 100644 --- a/src/utils/hub_client.py +++ b/src/utils/hub_client.py @@ -4,7 +4,7 @@ from pathlib 
import Path from functools import lru_cache from json import JSONDecodeError -from typing import Optional, Tuple +from typing import Optional from httpx import (Client, HTTPTransport, HTTPStatusError, @@ -41,10 +41,10 @@ def init_hub_client_with_robot(robot_id: str, client = Client(base_url=hub_url_core, mounts=proxies, auth=hub_robot, verify=ssl_ctx) hub_client = flame_hub.CoreClient(client=client) - print("Hub client init successful") + print("PO ACTION - Hub client init successful") except Exception as e: hub_client = None - print(f"Failed to authenticate with hub python client library.\n{e}") + print(f"Error: Failed to authenticate with hub python client library.\n{e}") return hub_client @@ -60,19 +60,19 @@ def get_ssl_context() -> ssl.SSLContext: def get_node_id_by_robot(hub_client: flame_hub.CoreClient, robot_id: str) -> Optional[str]: try: - node_id_object = hub_client.find_nodes(filter={"robot_id": robot_id})[0] + node_id_object = hub_client.find_nodes(filter={'robot_id': robot_id})[0] except (HTTPStatusError, JSONDecodeError, ConnectTimeout, flame_hub._exceptions.HubAPIError) as e: - print(f"Error in hub python client whilst retrieving node id object!\n{e}") + print(f"Error: Failed to retrieve node id object from hub python client\n{e}") node_id_object = None return str(node_id_object.id) if node_id_object is not None else None def get_node_analysis_id(hub_client: flame_hub.CoreClient, analysis_id: str, node_id_object_id: str) -> Optional[str]: try: - node_analyzes = hub_client.find_analysis_nodes(filter={"analysis_id": analysis_id, - "node_id": node_id_object_id}) + node_analyzes = hub_client.find_analysis_nodes(filter={'analysis_id': analysis_id, + 'node_id': node_id_object_id}) except (HTTPStatusError, flame_hub._exceptions.HubAPIError) as e: - print(f"Error in hub python client whilst retrieving node analyzes!\n{e}") + print(f"Error: Failed to retrieve node analyzes from hub python client\n{e}") node_analyzes = None if node_analyzes: @@ -92,7 +92,7 @@ 
def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r run_status = AnalysisStatus.FAILED.value hub_client.update_analysis_node(node_analysis_id, run_status=run_status) except (HTTPStatusError, ConnectError, flame_hub._exceptions.HubAPIError) as e: - print(f"Failed to update hub status for node_analysis_id {node_analysis_id}.\n{e}") + print(f"Error: Failed to update hub status for node_analysis_id {node_analysis_id}\n{e}") def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: str) -> None: @@ -113,11 +113,11 @@ def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: s if node_analysis_id is not None: update_hub_status(hub_client, node_analysis_id, run_status=status) else: - print("Failed to retrieve node_analysis_id from hub client. Cannot update status.") + print("Error: Failed to retrieve node_analysis_id from hub client. Cannot update status.") else: - print("Failed to retrieve node_id from hub client. Cannot update status.") + print("Error: Failed to retrieve node_id from hub client. Cannot update status.") else: - print("Failed to initialize hub client. Cannot update status.") + print("Error: Failed to initialize hub client. Cannot update status.") # TODO: Import this from flame sdk? 
(from flamesdk import HUB_LOG_LITERALS) diff --git a/src/utils/other.py b/src/utils/other.py index 8516934..b954cd2 100644 --- a/src/utils/other.py +++ b/src/utils/other.py @@ -17,8 +17,8 @@ def get_project_data_source(keycloak_token, project_id, hub_adapter_service_name :return: """ client = AsyncClient(base_url=f"http://{hub_adapter_service_name}:5000", - headers={"Authorization": f"Bearer {keycloak_token}", - "accept": "application/json"}) + headers={'Authorization': f"Bearer {keycloak_token}", + 'accept': "application/json"}) return asyncio.run(call_sources(client, project_id)) diff --git a/src/utils/token.py b/src/utils/token.py index 776bea2..b906436 100644 --- a/src/utils/token.py +++ b/src/utils/token.py @@ -16,7 +16,9 @@ def get_keycloak_token(analysis_id: str) -> Optional[str]: client_secret = _get_keycloak_client_secret(analysis_id) keycloak_url = f"{_KEYCLOAK_URL}/realms/flame/protocol/openid-connect/token" - data = {"grant_type": "client_credentials", "client_id": analysis_id, "client_secret": client_secret} + data = {'grant_type': 'client_credentials', + 'client_id': analysis_id, + 'client_secret': client_secret} # get token from keycloak like in the above curl command try: @@ -25,7 +27,7 @@ def get_keycloak_token(analysis_id: str) -> Optional[str]: return response.json()['access_token'] except requests.exceptions.RequestException as e: - print(e) + print(f"Error: Failed to retrieve keycloak token\n{e}") return None @@ -38,7 +40,7 @@ def _get_keycloak_client_secret(analysis_id: str) -> str: # get client secret url_get_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients?clientId={analysis_id}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.get(url_get_client, headers=headers) response.raise_for_status() @@ -53,9 +55,9 @@ def _get_keycloak_admin_token() -> str: # get admin token url_admin_access_token = 
f"{_KEYCLOAK_URL}/realms/{_KEYCLOAK_REALM}/protocol/openid-connect/token" data = { - "grant_type": "client_credentials", - "client_id": keycloak_admin_client_id, - "client_secret": keycloak_admin_client_secret + 'grant_type': 'client_credentials', + 'client_id': keycloak_admin_client_id, + 'client_secret': keycloak_admin_client_secret } response = requests.post(url_admin_access_token, data=data) response.raise_for_status() @@ -65,7 +67,7 @@ def _get_keycloak_admin_token() -> str: def _keycloak_client_exists(analysis_id: str, admin_token: str) -> bool: url_get_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients?clientId={analysis_id}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.get(url_get_client, headers=headers) response.raise_for_status() @@ -75,11 +77,11 @@ def _keycloak_client_exists(analysis_id: str, admin_token: str) -> bool: def _create_keycloak_client(admin_token: str, analysis_id: str) -> None: url_create_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients" - headers = {"Authorization": f"Bearer {admin_token}", - "Content-Type": "application/json"} - client_data = {"clientId": f"{analysis_id}", - "name": f"flame-{analysis_id}", - "serviceAccountsEnabled": "true"} + headers = {'Authorization': f"Bearer {admin_token}", + 'Content-Type': "application/json"} + client_data = {'clientId': f"{analysis_id}", + 'name': f"flame-{analysis_id}", + 'serviceAccountsEnabled': 'true'} response = requests.post(url_create_client, headers=headers, json=client_data) response.raise_for_status() @@ -87,7 +89,7 @@ def _create_keycloak_client(admin_token: str, analysis_id: str) -> None: def _get_all_keycloak_clients() -> list[dict]: admin_token = _get_keycloak_admin_token() url_get_clients = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = 
requests.get(url_get_clients, headers=headers) response.raise_for_status() @@ -99,19 +101,18 @@ def delete_keycloak_client(analysis_id: str) -> None: # get client uuid url_get_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients?clientId={analysis_id}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.get(url_get_client, headers=headers) response.raise_for_status() try: uuid = response.json()[0]['id'] - except (KeyError, IndexError): - print('keycloak Client not found') + except (KeyError, IndexError) as e: + print(f"Error: Keycloak client not found\n{e}") return url_delete_client = f"{_KEYCLOAK_URL}/admin/realms/{_KEYCLOAK_REALM}/clients/{uuid}" - headers = {"Authorization": f"Bearer {admin_token}"} + headers = {'Authorization': f"Bearer {admin_token}"} response = requests.delete(url_delete_client, headers=headers) response.raise_for_status() - diff --git a/src/test/test_db.py b/tests/test_db.py similarity index 100% rename from src/test/test_db.py rename to tests/test_db.py From c265b60c48e234bbd39655f9bb29481a6ad7b389 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Mon, 10 Nov 2025 12:29:32 +0100 Subject: [PATCH 24/28] feat: migrate action decisions into one function Co-authored-by: antidodo --- src/status/status.py | 130 +++++++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 61 deletions(-) diff --git a/src/status/status.py b/src/status/status.py index 07ed677..e837f8d 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -82,26 +82,32 @@ def status_loop(database: Database, status_loop_interval: int) -> None: print(f"\tDatabase status: {analysis_status['db_status']}") print(f"\tInternal status: {analysis_status['int_status']}") - _fix_stuck_status(database, analysis_status, node_id, hub_client) - analysis_status = _get_analysis_status(analysis_id, database) - if analysis_status is None: - continue - print(f"\tUnstuck analysis with 
internal status: {analysis_status['int_status']}") # Fix stuck analyzes + if analysis_status['status_action'] == 'unstuck': + print(f"\tUnstuck analysis with internal status: {analysis_status['int_status']}") + _fix_stuck_status(database, analysis_status, node_id, hub_client) + # Update analysis status (skip iteration if analysis is not deployed) + analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue - _update_running_status(database, analysis_status) - analysis_status = _get_analysis_status(analysis_id, database) - if analysis_status is None: - continue - print(f"\tUpdate created to running database status: {analysis_status['db_status']}") # Update created to running status + if analysis_status['status_action'] == 'running': + print(f"\tUpdate created-to-running database status: {analysis_status['db_status']}") + _update_running_status(database, analysis_status) + # Update analysis status (skip iteration if analysis is not deployed) + analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue - _update_finished_status(database, analysis_status) - analysis_status = _get_analysis_status(analysis_id, database) - if analysis_status is None: - continue - print(f"\tUpdate running to finished database status: {analysis_status['db_status']}") # Update running to finished status + if analysis_status['status_action'] == 'finishing': + print(f"\tUpdate running-to-finished database status: {analysis_status['db_status']}") + _update_finished_status(database, analysis_status) + # Update analysis status (skip iteration if analysis is not deployed) + analysis_status = _get_analysis_status(analysis_id, database) + if analysis_status is None: + continue # Submit analysis_status to hub analysis_hub_status = _set_analysis_hub_status(hub_client, node_analysis_id, analysis_status) @@ -123,9 +129,28 @@ def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[ int_status = 
AnalysisStatus.FINISHED.value else: int_status = asyncio.run(_get_internal_deployment_status(analysis.deployment_name, analysis_id)) - return {"analysis_id": analysis_id, - "db_status": analysis.status, - "int_status": int_status} + return {'analysis_id': analysis_id, + 'db_status': analysis.status, + 'int_status': int_status, + 'status_action': _decide_status_action(analysis.status, int_status)} + else: + return None + + +def _decide_status_action(db_status: str, int_status: str) -> Optional[str]: + is_stuck = int_status == AnalysisStatus.STUCK.value + is_slow = ((db_status in [AnalysisStatus.STARTED.value]) and (int_status in [AnalysisStatus.FAILED.value])) + newly_running = ((db_status in [AnalysisStatus.STARTED.value]) and (int_status in [AnalysisStatus.RUNNING.value])) + speedy_finished = ((db_status in [AnalysisStatus.STARTED.value]) and (int_status in [AnalysisStatus.FINISHED.value])) + newly_ended = ((db_status in [AnalysisStatus.RUNNING.value, AnalysisStatus.FAILED.value]) + and (int_status in [AnalysisStatus.FINISHED.value, AnalysisStatus.FAILED.value])) + firmly_stuck = ((db_status in [AnalysisStatus.FAILED.value]) and (int_status in [AnalysisStatus.STUCK.value])) + if is_stuck or is_slow: + return 'unstuck' + elif newly_running: + return 'running' + elif speedy_finished or newly_ended or firmly_stuck: + return 'finishing' else: return None @@ -192,23 +217,18 @@ def _fix_stuck_status(database: Database, analysis_status: dict[str, str], node_id: str, hub_client: flame_hub.CoreClient) -> None: - # Deployment selection - is_stuck = analysis_status['int_status'] == AnalysisStatus.STUCK.value - is_slow = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and - (analysis_status['int_status'] in [AnalysisStatus.FAILED.value])) - - # Update Status - if is_stuck or is_slow: - analysis = database.get_latest_deployment(analysis_status["analysis_id"]) - if analysis is not None: - database.update_deployment_status(analysis.deployment_name, 
status=AnalysisStatus.FAILED.value) - - # Tracking restarts - if analysis.restart_counter < _MAX_RESTARTS: - _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) - unstuck_analysis_deployments(analysis_status["analysis_id"], database) - else: - _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) + analysis = database.get_latest_deployment(analysis_status['analysis_id']) + if analysis is not None: + database.update_deployment_status(analysis.deployment_name, status=AnalysisStatus.FAILED.value) + is_slow = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and + (analysis_status['int_status'] in [AnalysisStatus.FAILED.value])) + + # Tracking restarts + if analysis.restart_counter < _MAX_RESTARTS: + _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) + unstuck_analysis_deployments(analysis_status['analysis_id'], database) + else: + _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) def _stream_stuck_logs(analysis: AnalysisDB, @@ -243,35 +263,23 @@ def _stream_stuck_logs(analysis: AnalysisDB, def _update_running_status(database: Database, analysis_status: dict[str, str]) -> None: - newly_running = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and - (analysis_status['int_status'] in [AnalysisStatus.RUNNING.value])) - if newly_running: - analysis = database.get_latest_deployment(analysis_status["analysis_id"]) - if analysis is not None: - database.update_deployment_status(analysis.deployment_name, AnalysisStatus.RUNNING.value) + analysis = database.get_latest_deployment(analysis_status['analysis_id']) + if analysis is not None: + database.update_deployment_status(analysis.deployment_name, AnalysisStatus.RUNNING.value) def _update_finished_status(database: Database, analysis_status: dict[str, str]) -> None: - speedy_finished = ((analysis_status['db_status'] in [AnalysisStatus.STARTED.value]) and - (analysis_status['int_status'] in [AnalysisStatus.FINISHED.value])) - 
newly_ended = ((analysis_status['db_status'] in [AnalysisStatus.RUNNING.value, - AnalysisStatus.FAILED.value]) - and (analysis_status['int_status'] in [AnalysisStatus.FINISHED.value, - AnalysisStatus.FAILED.value])) - firmly_stuck = ((analysis_status['db_status'] in [AnalysisStatus.FAILED.value]) - and (analysis_status['int_status'] in [AnalysisStatus.STUCK.value])) - if speedy_finished or newly_ended or firmly_stuck: - analysis = database.get_latest_deployment(analysis_status["analysis_id"]) - if analysis is not None: - database.update_deployment_status(analysis.deployment_name, analysis_status['int_status']) - if analysis_status['int_status'] == AnalysisStatus.FINISHED.value: - print("\tDelete deployment") - # TODO: final local log save (minio?) # archive logs - # delete_analysis(analysis_status['analysis_id'], database) # delete analysis from database - stop_analysis(analysis_status['analysis_id'], database) # stop analysis TODO: Change to delete in the future (when archive logs implemented) - else: - print("\tStop deployment") - stop_analysis(analysis_status['analysis_id'], database) # stop analysis + analysis = database.get_latest_deployment(analysis_status['analysis_id']) + if analysis is not None: + database.update_deployment_status(analysis.deployment_name, analysis_status['int_status']) + if analysis_status['int_status'] == AnalysisStatus.FINISHED.value: + print("\tDelete deployment") + # TODO: final local log save (minio?) 
# archive logs + # delete_analysis(analysis_status['analysis_id'], database) # delete analysis from database + stop_analysis(analysis_status['analysis_id'], database) # stop analysis TODO: Change to delete in the future (when archive logs implemented) + else: + print("\tStop deployment") + stop_analysis(analysis_status['analysis_id'], database) # stop analysis def _set_analysis_hub_status(hub_client: flame_hub.CoreClient, From ca70a9552374bf2a0d51446b08b1be383f0605e0 Mon Sep 17 00:00:00 2001 From: Nightknight3000 Date: Thu, 13 Nov 2025 12:38:08 +0100 Subject: [PATCH 25/28] feat: add find and delete support for jobs, migrate some functions to k8s.utils Co-authored-by: antidodo --- src/k8s/kubernetes.py | 127 ++++++++++++----------------------------- src/k8s/utils.py | 97 +++++++++++++++++++++++++------ src/main.py | 3 +- src/resources/utils.py | 30 +++++----- 4 files changed, 132 insertions(+), 125 deletions(-) diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index cdabc9b..e9d69de 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -4,10 +4,10 @@ from typing import Optional import string -from kubernetes import client, config +from kubernetes import client from src.resources.database.entity import Database -from src.k8s.utils import get_k8s_resource_names +from src.k8s.utils import find_k8s_resources, delete_k8s_resource PORTS = {'nginx': [80], @@ -15,10 +15,6 @@ 'service': [80]} -def load_cluster_config(): - config.load_incluster_config() - - def create_harbor_secret(host_address: str, user: str, password: str, @@ -96,53 +92,6 @@ def create_analysis_deployment(name: str, return _get_pods(name) -def delete_resource(name: str, resource_type: str, namespace: str = 'default') -> None: - """ - Deletes a Kubernetes resource by name and type. - :param name: Name of the resource to delete. - :param resource_type: Type of the resource (e.g., 'deployment', 'service', 'pod', 'configmap'). - :param namespace: Namespace in which the resource exists. 
- """ - print(f"PO ACTION - Deleting resource: {name} of type {resource_type} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") - if resource_type == 'deployment': - try: - app_client = client.AppsV1Api() - app_client.delete_namespaced_deployment(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Error: Not Found {name} deployment") - elif resource_type == 'service': - try: - core_client = client.CoreV1Api() - core_client.delete_namespaced_service(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Error: Not Found {name} service") - elif resource_type == 'pod': - try: - core_client = client.CoreV1Api() - core_client.delete_namespaced_pod(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Error: Not Found {name} pod") - elif resource_type == 'configmap': - try: - core_client = client.CoreV1Api() - core_client.delete_namespaced_config_map(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Error: Not Found {name} configmap") - elif resource_type == 'networkpolicy': - try: - network_client = client.NetworkingV1Api() - network_client.delete_namespaced_network_policy(name=name, namespace=namespace) - except client.exceptions.ApiException as e: - if e.reason != 'Not Found': - print(f"Error: Not Found {name} networkpolicy") - else: - raise ValueError(f"Unsupported resource type: {resource_type}") - - def delete_deployment(deployment_name: str, namespace: str = 'default') -> None: print(f"PO ACTION - Deleting deployment {deployment_name} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") app_client = client.AppsV1Api() @@ -192,25 +141,25 @@ def delete_analysis_pods(deployment_name: str, project_id: str, namespace: str = f"{time.strftime('%Y-%m-%d %H:%M:%S')}") core_client = 
client.CoreV1Api() # delete nginx deployment - delete_resource(f'nginx-{deployment_name}', 'deployment', namespace) - delete_resource(f'nginx-{deployment_name}', 'service', namespace) - delete_resource(f'nginx-{deployment_name}-config', 'configmap', namespace) + delete_k8s_resource(f'nginx-{deployment_name}', 'deployment', namespace) + delete_k8s_resource(f'nginx-{deployment_name}', 'service', namespace) + delete_k8s_resource(f'nginx-{deployment_name}-config', 'configmap', namespace) - # get pods in deployment + # get pods in deployment # TODO: Might've become redundant with changes introduced to deployment deletion using propagation_policy='Foreground' in k8s.utils (13.11.25) pods = core_client.list_namespaced_pod(namespace=namespace, label_selector=f'app={deployment_name}').items for pod in pods: - delete_resource(pod.metadata.name, 'pod', namespace) + delete_k8s_resource(pod.metadata.name, 'pod', namespace) # delete network policy - delete_resource(f'nginx-to-{deployment_name}-policy', 'networkpolicy', namespace) + delete_k8s_resource(f'nginx-to-{deployment_name}-policy', 'networkpolicy', namespace) # create new nginx deployment and policy _create_analysis_nginx_deployment(analysis_name=deployment_name, - analysis_service_name=get_k8s_resource_names('service', - 'label', - f'app={deployment_name}', - namespace=namespace), + analysis_service_name=find_k8s_resources('service', + 'label', + f'app={deployment_name}', + namespace=namespace), analysis_env={'PROJECT_ID': project_id, 'ANALYSIS_ID': deployment_name.split('analysis-')[-1].rsplit('-', 1)[0]}, namespace=namespace) @@ -323,16 +272,16 @@ def _create_nginx_config_map(analysis_name: str, core_client = client.CoreV1Api() # get the service name of the message broker - message_broker_service_name = get_k8s_resource_names('service', - 'label', - 'component=flame-message-broker', - namespace=namespace) - - # await and get the pod id and name of the message broker - message_broker_pod_name = 
get_k8s_resource_names('pod', + message_broker_service_name = find_k8s_resources('service', 'label', 'component=flame-message-broker', namespace=namespace) + + # await and get the pod id and name of the message broker + message_broker_pod_name = find_k8s_resources('pod', + 'label', + 'component=flame-message-broker', + namespace=namespace) message_broker_pod = None while message_broker_pod is None: try: @@ -345,16 +294,16 @@ def _create_nginx_config_map(analysis_name: str, time.sleep(1) # get the service name of the pod orchestrator - po_service_name = get_k8s_resource_names('service', - 'label', - 'component=flame-po', - namespace=namespace) + po_service_name = find_k8s_resources('service', + 'label', + 'component=flame-po', + namespace=namespace) # await and get the pod ip and name of the pod orchestrator - pod_orchestration_name = get_k8s_resource_names('pod', - 'label', - 'component=flame-po', - namespace=namespace) + pod_orchestration_name = find_k8s_resources('pod', + 'label', + 'component=flame-po', + namespace=namespace) pod_orchestration_pod = None while pod_orchestration_pod is None: try: @@ -378,19 +327,19 @@ def _create_nginx_config_map(analysis_name: str, time.sleep(1) # get the name of the hub adapter, kong proxy, and result service - hub_adapter_service_name = get_k8s_resource_names('service', - 'label', - 'component=flame-hub-adapter', - namespace=namespace) - kong_proxy_name = get_k8s_resource_names('service', + hub_adapter_service_name = find_k8s_resources('service', + 'label', + 'component=flame-hub-adapter', + namespace=namespace) + kong_proxy_name = find_k8s_resources('service', + 'label', + 'app.kubernetes.io/name=kong', + manual_name_selector='proxy', + namespace=namespace) + result_service_name = find_k8s_resources('service', 'label', - 'app.kubernetes.io/name=kong', - manual_name_selector='proxy', + 'component=flame-result-service', namespace=namespace) - result_service_name = get_k8s_resource_names('service', - 'label', - 
'component=flame-result-service', - namespace=namespace) # generate config map data = { diff --git a/src/k8s/utils.py b/src/k8s/utils.py index 7e3fbd9..936b009 100644 --- a/src/k8s/utils.py +++ b/src/k8s/utils.py @@ -1,16 +1,32 @@ +import time from typing import Literal, Optional, Union -from kubernetes import client +from kubernetes import config, client -def get_k8s_resource_names(resource_type: str, - selector_type: Optional[Literal['label', 'field']] = None, - selector_arg: Optional[str] = None, - manual_name_selector: Optional[str] = None, - namespace: str = "default") -> Optional[Union[str, list[str]]]: - if resource_type not in ['deployment', 'pod', 'service', 'networkpolicy', 'configmap']: +def load_cluster_config(): + config.load_incluster_config() + + +def get_current_namespace() -> str: + namespace_file = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' + try: + with open(namespace_file, 'r') as file: + return file.read().strip() + # Handle the case where the file is not found + except FileNotFoundError: + # Fallback to a default namespace if the file is not found + return 'default' + + +def find_k8s_resources(resource_type: str, + selector_type: Optional[Literal['label', 'field']] = None, + selector_arg: Optional[str] = None, + manual_name_selector: Optional[str] = None, + namespace: str = "default") -> Optional[Union[str, list[str]]]: + if resource_type not in ['deployment', 'pod', 'service', 'networkpolicy', 'configmap', 'job']: raise ValueError("For k8s resource search: resource_type must be one of 'deployment', 'pod', 'service', " - "'networkpolicy', or 'configmap'") + "'networkpolicy', 'configmap', or 'job'") if (selector_type is not None) and (selector_type not in ['label', 'field']): raise ValueError("For k8s resource search: selector_type must be either 'label' or 'field'") if (selector_type is not None) and (selector_arg is None): @@ -19,7 +35,7 @@ def get_k8s_resource_names(resource_type: str, kwargs = {'namespace': namespace} if
selector_type: kwargs[f'{selector_type}_selector'] = selector_arg - + if resource_type == 'deployment': resources = client.AppsV1Api().list_namespaced_deployment(**kwargs) elif resource_type == 'networkpolicy': @@ -32,6 +48,8 @@ def get_k8s_resource_names(resource_type: str, resources = core_client.list_namespaced_service(**kwargs) elif resource_type == 'configmap': resources = core_client.list_namespaced_config_map(**kwargs) + elif resource_type == 'job': + resources = client.BatchV1Api().list_namespaced_job(**kwargs) else: raise ValueError(f"Uncaptured resource type discovered! Message the Devs... (found={resource_type})") @@ -52,12 +70,55 @@ def get_k8s_resource_names(resource_type: str, return None -def get_current_namespace() -> str: - namespace_file = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' - try: - with open(namespace_file, 'r') as file: - return file.read().strip() - # Handle the case where the file is not found - except FileNotFoundError: - # Fallback to a default namespace if the file is not found - return 'default' +def delete_k8s_resource(name: str, resource_type: str, namespace: str = 'default') -> None: + """ + Deletes a Kubernetes resource by name and type. + :param name: Name of the resource to delete. + :param resource_type: Type of the resource (e.g., 'deployment', 'service', 'pod', 'configmap', 'job'). + :param namespace: Namespace in which the resource exists. 
+ """ + print(f"PO ACTION - Deleting resource: {name} of type {resource_type} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}") + if resource_type == 'deployment': + try: + app_client = client.AppsV1Api() + app_client.delete_namespaced_deployment(name=name, namespace=namespace, propagation_policy='Foreground') + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Not Found {name} deployment") + elif resource_type == 'service': + try: + core_client = client.CoreV1Api() + core_client.delete_namespaced_service(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Not Found {name} service") + elif resource_type == 'pod': + try: + core_client = client.CoreV1Api() + core_client.delete_namespaced_pod(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Not Found {name} pod") + elif resource_type == 'configmap': + try: + core_client = client.CoreV1Api() + core_client.delete_namespaced_config_map(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Not Found {name} configmap") + elif resource_type == 'networkpolicy': + try: + network_client = client.NetworkingV1Api() + network_client.delete_namespaced_network_policy(name=name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Not Found {name} networkpolicy") + elif resource_type == 'job': + try: + batch_client = client.BatchV1Api() + batch_client.delete_namespaced_job(name=name, namespace=namespace, propagation_policy='Foreground') + except client.exceptions.ApiException as e: + if e.reason != 'Not Found': + print(f"Error: Not Found {name} job") + else: + raise ValueError(f"Unsupported resource type: {resource_type}") diff --git a/src/main.py b/src/main.py index 37e3240..c8331cc 100644 --- 
a/src/main.py +++ b/src/main.py @@ -5,8 +5,7 @@ from src.resources.database.entity import Database from src.api.api import PodOrchestrationAPI -from src.k8s.kubernetes import load_cluster_config -from src.k8s.utils import get_current_namespace +from src.k8s.utils import get_current_namespace, load_cluster_config from src.status.status import status_loop diff --git a/src/resources/utils.py b/src/resources/utils.py index cd26e9d..9a18fa4 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -8,10 +8,8 @@ from src.resources.analysis.entity import Analysis, CreateAnalysis, read_db_analysis from src.resources.log.entity import CreateLogEntity from src.status.constants import AnalysisStatus -from src.k8s.kubernetes import (create_harbor_secret, - get_analysis_logs, - delete_resource) -from src.k8s.utils import get_current_namespace, get_k8s_resource_names +from src.k8s.kubernetes import create_harbor_secret, get_analysis_logs +from src.k8s.utils import get_current_namespace, find_k8s_resources, delete_k8s_resource from src.utils.token import _get_all_keycloak_clients from src.utils.token import delete_keycloak_client from src.utils.hub_client import init_hub_client_and_update_hub_status_with_robot @@ -205,19 +203,19 @@ def cleanup(cleanup_type: str, # Service cleanup/reinit if cleanup_type in ['all', 'services', 'mb']: # reinitialize message-broker pod - message_broker_pod_name = get_k8s_resource_names('pod', - 'label', - "component=flame-message-broker", - namespace=namespace) - delete_resource(message_broker_pod_name, 'pod', namespace) + message_broker_pod_name = find_k8s_resources('pod', + 'label', + "component=flame-message-broker", + namespace=namespace) + delete_k8s_resource(message_broker_pod_name, 'pod', namespace) response_content[cleanup_type] = "Reset message broker" if cleanup_type in ['all', 'services', 'rs']: # reinitialize result-service pod - result_service_name = get_k8s_resource_names('pod', - 'label', - "component=flame-result-service", - 
namespace=namespace) - delete_resource(result_service_name, 'pod', namespace) + result_service_name = find_k8s_resources('pod', + 'label', + "component=flame-result-service", + namespace=namespace) + delete_k8s_resource(result_service_name, 'pod', namespace) response_content[cleanup_type] = "Reset result service" if cleanup_type in ['all', 'keycloak']: # cleanup keycloak clients without corresponding analysis @@ -244,13 +242,13 @@ def clean_up_the_rest(database: Database, namespace: str = 'default') -> str: 'networkpolicy': (["component=flame-nginx-to-analysis-policy"], 2), 'configmap': (["component=flame-nginx-analysis-config-map"], 2)}.items(): for selector_arg in selector_args: - resources = get_k8s_resource_names(res, 'label', selector_arg, namespace=namespace) + resources = find_k8s_resources(res, 'label', selector_arg, namespace=namespace) resources = [resources] if type(resources) == str else resources if resources is not None: zombie_resources = [r for r in resources if resource_name_to_analysis(r, max_r_split) not in known_analysis_ids] for z in zombie_resources: - delete_resource(z, res, namespace=namespace) + delete_k8s_resource(z, res, namespace=namespace) result_str += f"Deleted {len(zombie_resources)} zombie " + \ f"{'' if '-nginx' not in selector_arg else 'nginx-'}{res}s\n" return result_str From c6760a7ecaefb7cf6d45549ab8d14211d1d76a72 Mon Sep 17 00:00:00 2001 From: davidhieber Date: Thu, 13 Nov 2025 13:24:38 +0100 Subject: [PATCH 26/28] refactor: remove redundant delete_analysis_pods function and related logic Co-authored-by: Nightknight3000 --- src/k8s/kubernetes.py | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index e9d69de..8bfa3f9 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -136,35 +136,6 @@ def get_analysis_logs(deployment_names: dict[str, str], } -def delete_analysis_pods(deployment_name: str, project_id: str, namespace: str = 'default') -> 
None: - print(f"PO ACTION - Deleting pods of deployment {deployment_name} in namespace {namespace} at " - f"{time.strftime('%Y-%m-%d %H:%M:%S')}") - core_client = client.CoreV1Api() - # delete nginx deployment - delete_k8s_resource(f'nginx-{deployment_name}', 'deployment', namespace) - delete_k8s_resource(f'nginx-{deployment_name}', 'service', namespace) - delete_k8s_resource(f'nginx-{deployment_name}-config', 'configmap', namespace) - - - # get pods in deployment # TODO: Might've become redundant with changes introduced to deployment deletion using propagation_policy='Foreground' in k8s.utils (13.11.25) - pods = core_client.list_namespaced_pod(namespace=namespace, label_selector=f'app={deployment_name}').items - for pod in pods: - delete_k8s_resource(pod.metadata.name, 'pod', namespace) - - # delete network policy - delete_k8s_resource(f'nginx-to-{deployment_name}-policy', 'networkpolicy', namespace) - - # create new nginx deployment and policy - _create_analysis_nginx_deployment(analysis_name=deployment_name, - analysis_service_name=find_k8s_resources('service', - 'label', - f'app={deployment_name}', - namespace=namespace), - analysis_env={'PROJECT_ID': project_id, - 'ANALYSIS_ID': deployment_name.split('analysis-')[-1].rsplit('-', 1)[0]}, - namespace=namespace) - - def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional[dict[str, dict[str, str]]]: core_client = client.CoreV1Api() From 73e674ea99e10b393062d210554b3fc27bd41ea6 Mon Sep 17 00:00:00 2001 From: davidhieber Date: Tue, 3 Mar 2026 11:03:47 +0100 Subject: [PATCH 27/28] feat: enable toggling of hub log forwarding Co-authored-by: Nightknight3000 --- src/api/api.py | 11 ++++------- src/resources/utils.py | 15 ++++++++------- src/status/status.py | 31 +++++++++++++++---------------- src/utils/other.py | 11 +++++++++++ 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/api/api.py b/src/api/api.py index 7842adc..2a25f3c 100644 --- a/src/api/api.py +++ b/src/api/api.py 
@@ -6,6 +6,7 @@ from fastapi.responses import JSONResponse from src.utils.hub_client import init_hub_client_with_robot, get_node_id_by_robot +from src.utils.other import extract_hub_envs from src.api.oauth import valid_access_token from src.resources.database.entity import Database from src.resources.analysis.entity import CreateAnalysis @@ -24,13 +25,9 @@ class PodOrchestrationAPI: def __init__(self, database: Database, namespace: str = 'default'): self.database = database - robot_id, robot_secret, hub_url_core, hub_auth, http_proxy, https_proxy = (os.getenv('HUB_ROBOT_USER'), - os.getenv('HUB_ROBOT_SECRET'), - os.getenv('HUB_URL_CORE'), - os.getenv('HUB_URL_AUTH'), - os.getenv('PO_HTTP_PROXY'), - os.getenv('PO_HTTPS_PROXY')) + robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs() + self.enable_hub_logging = enable_hub_logging self.hub_core_client = init_hub_client_with_robot(robot_id, robot_secret, hub_url_core, @@ -229,7 +226,7 @@ def cleanup_call(self, cleanup_type: str): def stream_logs_call(self, body: CreateLogEntity): try: - return stream_logs(body, self.node_id, self.database, self.hub_core_client) + return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_core_client) except Exception as e: raise HTTPException(status_code=500, detail=f"Error streaming logs: {e}") diff --git a/src/resources/utils.py b/src/resources/utils.py index e8ec615..c91d04e 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -225,7 +225,7 @@ def cleanup(cleanup_type: str, # if all is all flame clients are deleted because ther are no analyzes in the db analysis_ids = database.get_analysis_ids() for client in _get_all_keycloak_clients(): - if client['clientId'] not in analysis_ids and client['name'].startswith('flame-'): + if (client['clientId'] not in analysis_ids) and client['name'].startswith('flame-'): delete_keycloak_client(client['clientId']) else: @@ -257,7 +257,7 @@ def 
clean_up_the_rest(database: Database, namespace: str = 'default') -> str: return result_str -def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, hub_core_client: CoreClient) -> None: +def stream_logs(log_entity: CreateLogEntity, node_id: str, enable_hub_logging: bool, database: Database, hub_core_client: CoreClient) -> None: try: database.update_analysis_log(log_entity.analysis_id, str(log_entity.to_log_entity())) #database.update_analysis_status(log_entity.analysis_id, log_entity.status) #TODO: Implement this? @@ -265,8 +265,9 @@ def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, h print(f"Error updating analysis log in database: {e}") # log to hub - hub_core_client.create_analysis_node_log(analysis_id=log_entity.analysis_id, - node_id=node_id, - status=log_entity.status, - level=log_entity.log_type, - message=log_entity.log) + if enable_hub_logging: + hub_core_client.create_analysis_node_log(analysis_id=log_entity.analysis_id, + node_id=node_id, + status=log_entity.status, + level=log_entity.log_type, + message=log_entity.log) diff --git a/src/status/status.py b/src/status/status.py index b854c76..2566ae5 100644 --- a/src/status/status.py +++ b/src/status/status.py @@ -6,19 +6,19 @@ import flame_hub -from src.k8s.kubernetes import PORTS +from src.k8s.kubernetes import PORTS, get_pod_status from src.resources.database.entity import Database, AnalysisDB -from src.utils.hub_client import (init_hub_client_with_robot, - get_node_id_by_robot, - get_node_analysis_id, - update_hub_status) +from src.resources.log.entity import CreateStartUpErrorLog from src.resources.utils import (unstuck_analysis_deployments, stop_analysis, delete_analysis, stream_logs) -from src.resources.log.entity import CreateStartUpErrorLog -from src.k8s.kubernetes import get_pod_status +from src.utils.hub_client import (init_hub_client_with_robot, + get_node_id_by_robot, + get_node_analysis_id, + update_hub_status) from src.status.constants 
import AnalysisStatus +from src.utils.other import extract_hub_envs from src.utils.token import get_keycloak_token from src.status.constants import _MAX_RESTARTS, _INTERNAL_STATUS_TIMEOUT @@ -34,12 +34,8 @@ def status_loop(database: Database, status_loop_interval: int) -> None: node_id = None node_analysis_ids = {} - robot_id, robot_secret, hub_url_core, hub_auth, http_proxy, https_proxy = (os.getenv('HUB_ROBOT_USER'), - os.getenv('HUB_ROBOT_SECRET'), - os.getenv('HUB_URL_CORE'), - os.getenv('HUB_URL_AUTH'), - os.getenv('PO_HTTP_PROXY'), - os.getenv('PO_HTTPS_PROXY')) + robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs() + # Enter lifecycle loop while True: if hub_client is None: @@ -87,7 +83,7 @@ def status_loop(database: Database, status_loop_interval: int) -> None: print(f"\tInternal status: {analysis_status['int_status']}") # Fix for stuck analyzes - _fix_stuck_status(database, analysis_status, node_id, hub_client) + _fix_stuck_status(database, analysis_status, node_id, enable_hub_logging, hub_client) analysis_status = _get_analysis_status(analysis_id, database) if analysis_status is None: continue @@ -193,6 +189,7 @@ async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_ def _fix_stuck_status(database: Database, analysis_status: dict[str, str], node_id: str, + enable_hub_logging: bool, hub_client: flame_hub.CoreClient) -> None: # Deployment selection is_stuck = analysis_status['int_status'] == AnalysisStatus.STUCK.value @@ -207,14 +204,15 @@ def _fix_stuck_status(database: Database, # Tracking restarts if analysis.restart_counter < _MAX_RESTARTS: - _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) + _stream_stuck_logs(analysis, node_id, enable_hub_logging, database, hub_client, is_slow) unstuck_analysis_deployments(analysis_status["analysis_id"], database) else: - _stream_stuck_logs(analysis, node_id, database, hub_client, is_slow) + 
_stream_stuck_logs(analysis, node_id, enable_hub_logging, database, hub_client, is_slow) def _stream_stuck_logs(analysis: AnalysisDB, node_id: str, + enable_hub_logging: bool, database: Database, hub_client: flame_hub.CoreClient, is_slow: bool) -> None: @@ -236,6 +234,7 @@ def _stream_stuck_logs(analysis: AnalysisDB, analysis.status, k8s_error_msg=reason if is_k8s_related else ''), node_id, + enable_hub_logging, database, hub_client) diff --git a/src/utils/other.py b/src/utils/other.py index 8516934..7563050 100644 --- a/src/utils/other.py +++ b/src/utils/other.py @@ -1,5 +1,16 @@ from httpx import AsyncClient import asyncio +import os + + +def extract_hub_envs() -> tuple[str, str, str, str, bool, str, str]: + return (os.getenv('HUB_ROBOT_USER'), + os.getenv('HUB_ROBOT_SECRET'), + os.getenv('HUB_URL_CORE'), + os.getenv('HUB_URL_AUTH'), + os.getenv('HUB_LOGGING') in ['True', 'true', '1', 't'], + os.getenv('PO_HTTP_PROXY'), + os.getenv('PO_HTTPS_PROXY')) def resource_name_to_analysis(deployment_name: str, max_r_split: int = 1) -> str: From 000f999365beb7e68bc0517b797a168ba99c631c Mon Sep 17 00:00:00 2001 From: davidhieber Date: Wed, 4 Mar 2026 14:14:56 +0100 Subject: [PATCH 28/28] refactor: update references from result-service to storage-service Co-authored-by: Nightknight3000 --- src/k8s/kubernetes.py | 10 +++++----- src/resources/utils.py | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index c77cf24..243e226 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -304,7 +304,7 @@ def _create_nginx_config_map(analysis_name: str, analysis_ip = pod_list_object.items[0].status.pod_ip time.sleep(1) - # get the name of the hub adapter, kong proxy, and result service + # get the name of the hub adapter, kong proxy, and storage service hub_adapter_service_name = find_k8s_resources('service', 'label', 'component=flame-hub-adapter', @@ -314,9 +314,9 @@ def 
_create_nginx_config_map(analysis_name: str, 'app.kubernetes.io/name=kong', manual_name_selector='proxy', namespace=namespace) - result_service_name = find_k8s_resources('service', + storage_service_name = find_k8s_resources('service', 'label', - 'component=flame-result-service', + 'component=flame-storage-service', namespace=namespace) # generate config map @@ -354,10 +354,10 @@ def _create_nginx_config_map(analysis_name: str, }} - # egress: analysis deployment to result-service + # egress: analysis deployment to storage-service location ~ ^/storage/(final|local|intermediate)/ {{ rewrite ^/storage(/.*) $1 break; - proxy_pass http://{result_service_name}:8080; + proxy_pass http://{storage_service_name}:8080; allow {analysis_ip}; deny all; }} diff --git a/src/resources/utils.py b/src/resources/utils.py index 704e578..fb481ab 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -210,13 +210,13 @@ def cleanup(cleanup_type: str, delete_k8s_resource(message_broker_pod_name, 'pod', namespace) response_content[cleanup_type] = "Reset message broker" if cleanup_type in ['all', 'services', 'rs']: - # reinitialize result-service pod - result_service_name = find_k8s_resources('pod', + # reinitialize storage-service pod + storage_service_name = find_k8s_resources('pod', 'label', - "component=flame-result-service", + "component=flame-storage-service", namespace=namespace) - delete_k8s_resource(result_service_name, 'pod', namespace) - response_content[cleanup_type] = "Reset result service" + delete_k8s_resource(storage_service_name, 'pod', namespace) + response_content[cleanup_type] = "Reset storage service" if cleanup_type in ['all', 'keycloak']: # cleanup keycloak clients without corresponding analysis # if all is all flame clients are deleted because ther are no analyzes in the db