Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions src/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from src.api.oauth import valid_access_token
from src.resources.database.entity import Database
from src.resources.analysis.entity import CreateAnalysis
from src.resources.log.entity import CreateLogEntity
from src.resources.log.entity import CreateLogEntity, AnalysisStoppedLog
from src.resources.utils import (create_analysis,
retrieve_history,
retrieve_logs,
Expand All @@ -28,13 +28,13 @@ def __init__(self, database: Database, namespace: str = 'default'):
robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs()

self.enable_hub_logging = enable_hub_logging
self.hub_core_client = init_hub_client_with_robot(robot_id,
robot_secret,
hub_url_core,
hub_auth,
http_proxy,
https_proxy)
self.node_id = get_node_id_by_robot(self.hub_core_client, robot_id) if self.hub_core_client else None
self.hub_client = init_hub_client_with_robot(robot_id,
robot_secret,
hub_url_core,
hub_auth,
http_proxy,
https_proxy)
self.node_id = get_node_id_by_robot(self.hub_client, robot_id) if self.hub_client else None
self.namespace = namespace
app = FastAPI(title="FLAME PO",
docs_url="/api/docs",
Expand Down Expand Up @@ -196,13 +196,26 @@ def get_pods_call(self, analysis_id: str):

def stop_all_analysis_call(self):
    """Stop every analysis and stream a 'stopped' lifecycle log to the hub.

    Only analyses that were actually running before the stop request receive
    an AnalysisStoppedLog; finished/failed analyses are excluded so no bogus
    'stopped' lifecycle event is propagated for them.

    :return: response of stop_analysis for the 'all' target
    :raises HTTPException: 500 wrapping any underlying error
    """
    try:
        # Capture running analyses BEFORE stopping them, so that only
        # analyses that truly transitioned to 'stopped' get a stop log.
        running_analysis_ids = [
            analysis_id
            for analysis_id in self.database.get_analysis_ids()
            if self.database.analysis_is_running(analysis_id)
        ]
        response = stop_analysis('all', self.database)
        for analysis_id in running_analysis_ids:
            stream_logs(AnalysisStoppedLog(analysis_id),
                        self.node_id,
                        self.enable_hub_logging,
                        self.database,
                        self.hub_client)
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error stopping ALL analyzes: {e}")

def stop_analysis_call(self, analysis_id: str):
    """Stop a single analysis and stream a 'stopped' lifecycle log to the hub.

    :param analysis_id: id of the analysis to stop
    :return: response of stop_analysis for the given analysis
    :raises HTTPException: 500 wrapping any underlying error
    """
    try:
        response = stop_analysis(analysis_id, self.database)
        # Emit the stop event to the hub after the analysis has been stopped.
        stream_logs(AnalysisStoppedLog(analysis_id),
                    self.node_id,
                    self.enable_hub_logging,
                    self.database,
                    self.hub_client)
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error stopping analysis: {e}")

Expand All @@ -226,7 +239,7 @@ def cleanup_call(self, cleanup_type: str):

def stream_logs_call(self, body: CreateLogEntity):
    """Forward a log entity to the hub via stream_logs.

    :param body: the log entity to stream
    :return: result of stream_logs
    :raises HTTPException: 500 wrapping any underlying error
    """
    try:
        # Uses the renamed self.hub_client (formerly hub_core_client).
        return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_client)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error streaming logs: {e}")

Expand Down
2 changes: 1 addition & 1 deletion src/k8s/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def _create_nginx_config_map(analysis_name: str,
namespace=namespace)
storage_service_name = find_k8s_resources('service',
'label',
'component=flame-result-service',
'component=flame-storage-service',
namespace=namespace)

# generate config map
Expand Down
8 changes: 8 additions & 0 deletions src/resources/log/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ def __init__(self,
log = ''

super().__init__(log=log, log_type="error", analysis_id=analysis_id, status=status)


class AnalysisStoppedLog(CreateLogEntity):
    """Info-level lifecycle log entity emitted when an analysis is stopped."""

    def __init__(self, analysis_id: str) -> None:
        # Timestamp the message with the local wall-clock time.
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        message = (f"[flame -- POAPI: ANALYSISSTOPPED -- {timestamp}] "
                   f"Info: The analysis was stopped either locally, or externally on another node.")
        super().__init__(log=message, log_type="info", analysis_id=analysis_id, status="stopped")
63 changes: 47 additions & 16 deletions src/status/status.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import time
import os
import asyncio
from typing import Optional
from httpx import AsyncClient, HTTPStatusError, ConnectError, ConnectTimeout
from httpx import Client, HTTPStatusError, ConnectError, ConnectTimeout

import flame_hub

Expand All @@ -16,6 +15,7 @@
from src.utils.hub_client import (init_hub_client_with_robot,
get_node_id_by_robot,
get_node_analysis_id,
get_partner_node_statuses,
update_hub_status)
from src.status.constants import AnalysisStatus
from src.utils.other import extract_hub_envs
Expand Down Expand Up @@ -75,6 +75,15 @@ def status_loop(database: Database, status_loop_interval: int) -> None:
# If node analysis id found
print(f"\tNode analysis id: {node_analysis_id}")
if node_analysis_id is not None:
try:
# Inform local analysis of partner node statuses
_ = inform_analysis_of_partner_statuses(database,
hub_client,
analysis_id,
node_analysis_id)
except Exception as e:
print(f"\tPO STATUS LOOP - Error when attempting to access partner_status endpoint of {analysis_id} ({repr(e)})")

# Retrieve analysis status (skip iteration if analysis is not deployed)
analysis_status = _get_analysis_status(analysis_id, database)
if analysis_status is None:
Expand Down Expand Up @@ -119,6 +128,28 @@ def status_loop(database: Database, status_loop_interval: int) -> None:
time.sleep(status_loop_interval)
print(f"PO STATUS LOOP - Status loop iteration completed. Sleeping for {status_loop_interval} seconds.")


def inform_analysis_of_partner_statuses(database: Database,
                                        hub_client: flame_hub.CoreClient,
                                        analysis_id: str,
                                        node_analysis_id: str) -> Optional[dict[str, str]]:
    """Push the partner nodes' run statuses to the local analysis deployment.

    :param database: database used to resolve the latest deployment name
    :param hub_client: hub client used to fetch partner node statuses
    :param analysis_id: id of the analysis whose partners are reported
    :param node_analysis_id: this node's analysis-node id (excluded from partners)
    :return: JSON body of the analysis' response, or None when unreachable/errored
    """
    node_statuses = get_partner_node_statuses(hub_client, analysis_id, node_analysis_id)
    deployment_name = database.get_latest_deployment(analysis_id).deployment_name
    try:  # try except, in case analysis api is not yet ready
        # Use the client as a context manager so its transport is closed;
        # a bare Client() per status-loop iteration would leak open connections.
        with Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}") as client:
            response = client.post(url="/analysis/partner_status",
                                   headers=[('Connection', 'close')],
                                   json={'partner_status': node_statuses})
            response.raise_for_status()
            return response.json()
    except HTTPStatusError as e:
        print(f"\tError whilst trying to access analysis partner_status endpoint: {e}")
    except ConnectError as e:
        print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} yielded an error: {e}")
    except ConnectTimeout as e:
        print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}")
    return None


def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[str, str]]:
analysis = database.get_latest_deployment(analysis_id)
if analysis is not None:
Expand All @@ -128,7 +159,7 @@ def _get_analysis_status(analysis_id: str, database: Database) -> Optional[dict[
if db_status == AnalysisStatus.FINISHED.value:
int_status = AnalysisStatus.FINISHED.value
else:
int_status = asyncio.run(_get_internal_deployment_status(analysis.deployment_name, analysis_id))
int_status = _get_internal_deployment_status(analysis.deployment_name, analysis_id)
return {'analysis_id': analysis_id,
'db_status': analysis.status,
'int_status': int_status,
Expand All @@ -145,23 +176,24 @@ def _decide_status_action(db_status: str, int_status: str) -> Optional[str]:
newly_ended = ((db_status in [AnalysisStatus.RUNNING.value, AnalysisStatus.FAILED.value])
and (int_status in [AnalysisStatus.FINISHED.value, AnalysisStatus.FAILED.value]))
firmly_stuck = ((db_status in [AnalysisStatus.FAILED.value]) and (int_status in [AnalysisStatus.STUCK.value]))
was_stopped = int_status == AnalysisStatus.STOPPED.value
if is_stuck or is_slow:
return 'unstuck'
elif newly_running:
return 'running'
elif speedy_finished or newly_ended or firmly_stuck:
elif speedy_finished or newly_ended or firmly_stuck or was_stopped:
Comment on lines +180 to +185
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

was_stopped is dead code until the health mapper returns stopped.

_get_internal_deployment_status() still normalizes every non-finished/running/stuck result to failed, so this branch never fires. A real internal stopped result will continue down the failure path instead of the intended finishing path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/status/status.py` around lines 180 - 185, The branch checking was_stopped
never triggers because _get_internal_deployment_status currently normalizes
non-finished/running/stuck statuses to failed; update that normalization so a
true internal "stopped" status is preserved (i.e., return AnalysisStatus.STOPPED
or its .value from _get_internal_deployment_status instead of mapping it to
failed), and keep the existing check that uses was_stopped (int_status ==
AnalysisStatus.STOPPED.value) so the finalization path for stopped runs is
executed; ensure any callers of _get_internal_deployment_status still handle the
preserved STOPPED enum/value.

return 'finishing'
else:
return None


async def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str:
def _get_internal_deployment_status(deployment_name: str, analysis_id: str) -> str:
# Attempt to retrieve internal analysis status via health endpoint
start_time = time.time()
while True:
try:
response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}")
.get("/analysis/healthz", headers=[('Connection', 'close')]))
response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").get("/analysis/healthz",
headers=[('Connection', 'close')])
response.raise_for_status()
break
except HTTPStatusError as e:
Expand All @@ -172,17 +204,17 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str
print(f"\tConnection to http://nginx-{deployment_name}:{PORTS['nginx'][0]} timed out: {e}")
elapsed_time = time.time() - start_time
time.sleep(1)
if elapsed_time > _INTERNAL_STATUS_TIMEOUT: # TODO: Handle case of this happening for large images
if elapsed_time > _INTERNAL_STATUS_TIMEOUT:
print(f"\tTimeout getting internal deployment status after {elapsed_time} seconds")
return AnalysisStatus.FAILED.value

# Extract fields from response
analysis_status, analysis_token_remaining_time = (response.json()['status'],
response.json()['token_remaining_time'])
# Check if token needs refresh, do so if needed
await _refresh_keycloak_token(deployment_name=deployment_name,
analysis_id=analysis_id,
token_remaining_time=analysis_token_remaining_time)
_refresh_keycloak_token(deployment_name=deployment_name,
analysis_id=analysis_id,
token_remaining_time=analysis_token_remaining_time)

# Map status from response to preset values
if analysis_status == AnalysisStatus.FINISHED.value:
Expand All @@ -196,18 +228,17 @@ async def _get_internal_deployment_status(deployment_name: str, analysis_id: str
return health_status


async def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_remaining_time: int) -> None:
def _refresh_keycloak_token(deployment_name: str, analysis_id: str, token_remaining_time: int) -> None:
"""
Refresh the keycloak token
:return:
"""
if token_remaining_time < (int(os.getenv('STATUS_LOOP_INTERVAL')) * 2 + 1):
keycloak_token = get_keycloak_token(analysis_id)
try:
response = await (AsyncClient(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}")
.post("/analysis/token_refresh",
json={'token': keycloak_token},
headers=[('Connection', 'close')]))
response = Client(base_url=f"http://nginx-{deployment_name}:{PORTS['nginx'][0]}").post("/analysis/token_refresh",
json={'token': keycloak_token},
headers=[('Connection', 'close')])
response.raise_for_status()
except HTTPStatusError as e:
print(f"Error: Failed to refresh keycloak token in deployment {deployment_name}.\n{e}")
Expand Down
20 changes: 20 additions & 0 deletions src/utils/hub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, r
print(f"Error: Failed to update hub status for node_analysis_id {node_analysis_id}\n{e}")


def get_analysis_node_statuses(hub_client: flame_hub.CoreClient, analysis_id: str) -> Optional[dict[str, str]]:
    """Return a mapping of analysis-node id -> run status for the given analysis.

    :param hub_client: hub core client used to query analysis nodes
    :param analysis_id: id of the analysis to look up
    :return: {node_id: run_status} for all participating nodes, or None on failure
    """
    try:
        node_analyzes = hub_client.find_analysis_nodes(filter={'analysis_id': analysis_id})
    except (HTTPStatusError, flame_hub._exceptions.HubAPIError, AttributeError) as e:
        print(f"Error: Failed to retrieve node analyzes from hub python client\n{e}")
        return None
    # Keys are stringified node ids; values are the hub-reported run statuses.
    return {str(node.id): node.run_status for node in node_analyzes}


def get_partner_node_statuses(hub_client: flame_hub.CoreClient,
                              analysis_id: str,
                              node_analysis_id: str) -> Optional[dict[str, str]]:
    """Return the run statuses of all partner nodes, excluding this node.

    :param hub_client: hub core client used to query analysis nodes
    :param analysis_id: id of the analysis to look up
    :param node_analysis_id: this node's analysis-node id, filtered out of the result
    :return: {partner_node_id: run_status}, or None if statuses could not be fetched
    """
    analysis_node_statuses = get_analysis_node_statuses(hub_client, analysis_id)
    if analysis_node_statuses is None:
        return None
    return {node_id: status
            for node_id, status in analysis_node_statuses.items()
            if node_id != node_analysis_id}


def init_hub_client_and_update_hub_status_with_robot(analysis_id: str, status: str) -> None:
"""
Create a hub client for the analysis and update the current status.
Expand Down
Loading