Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
af903e9
fix: avoid Nonetype errors in analysis status checks
Nightknight3000 Oct 21, 2025
2b0a2e5
feat: implement POAPI StartupError Logs
Nightknight3000 Oct 21, 2025
9490afc
Merge remote-tracking branch 'origin/main' into analysis_status
Nightknight3000 Oct 22, 2025
06b8b4c
Merge remote-tracking branch 'origin/main' into startuperror_logs
Nightknight3000 Oct 22, 2025
cb498f4
Merge pull request #33 from PrivateAIM/startuperror_logs
antidodo Oct 24, 2025
dfcd7ea
Merge pull request #30 from PrivateAIM/analysis_status
antidodo Oct 24, 2025
8454b5c
fix: add internal status timeout and max restarts constants
antidodo Oct 28, 2025
c9b56a8
fix: init pod_status_dict fields
Nightknight3000 Oct 28, 2025
b0a37ca
fix: checks within hub_client init
Nightknight3000 Oct 28, 2025
8bf79af
fix: final startup error logs and status update
Nightknight3000 Oct 28, 2025
469690b
feat: actively update db analysis status through logs
Nightknight3000 Oct 28, 2025
cbe7cb6
fix: move stuck check
Nightknight3000 Oct 28, 2025
c224ad4
fix: delete firmly stuck analyses
Nightknight3000 Oct 28, 2025
7680435
fix: comment out db status updates through logs
Nightknight3000 Oct 28, 2025
986934b
fix: running analysis check
Nightknight3000 Oct 28, 2025
da2aaa3
refactor: console prints
Nightknight3000 Nov 6, 2025
6553612
refactor: imports and functions
Nightknight3000 Nov 6, 2025
51127fc
fix: running_analyzes iteration and cleanup endpoint (zombie check)
Nightknight3000 Nov 6, 2025
fe952d8
fix: account for speedy analysis terminations
Nightknight3000 Nov 6, 2025
fb8ff62
fix: unstuck logic
Nightknight3000 Nov 6, 2025
1c4b9fc
fix: KubernetesAPIError forwarding
Nightknight3000 Nov 6, 2025
b9ea9e2
refactor: clean forwarded KubernetesAPIError log
Nightknight3000 Nov 6, 2025
d1d3bed
fix: comment out access token dependency for history route for testing
antidodo Nov 7, 2025
2a08146
fix: update log retrieval to use analysis_id for accurate mapping
antidodo Nov 7, 2025
fe3839a
fix: restore access token dependency for history and logs routes
antidodo Nov 7, 2025
5e2086e
fix: handle HubAPIError
Nightknight3000 Nov 7, 2025
111ce94
refactor: add comments, streamline logs, and string quotations
Nightknight3000 Nov 10, 2025
c265b60
feat: migrate action decisions into one function
Nightknight3000 Nov 10, 2025
7926dc8
Merge remote-tracking branch 'origin/main' into canary
Nightknight3000 Nov 10, 2025
207a2ee
Merge remote-tracking branch 'origin/main' into canary
Nightknight3000 Nov 11, 2025
ca70a95
feat: add find and delete support for jobs, migrate some functions to…
Nightknight3000 Nov 13, 2025
c6760a7
refactor: remove redundant delete_analysis_pods function and related …
antidodo Nov 13, 2025
73e674e
feat: enable toggling of hub log forwarding
antidodo Mar 3, 2026
b581dce
Merge branch 'log_config' into canary
antidodo Mar 3, 2026
000f999
refactor: update references from result-service to storage-service
antidodo Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi.responses import JSONResponse

from src.utils.hub_client import init_hub_client_with_robot, get_node_id_by_robot
from src.utils.other import extract_hub_envs
from src.api.oauth import valid_access_token
from src.resources.database.entity import Database
from src.resources.analysis.entity import CreateAnalysis
Expand All @@ -24,13 +25,9 @@
class PodOrchestrationAPI:
def __init__(self, database: Database, namespace: str = 'default'):
self.database = database
robot_id, robot_secret, hub_url_core, hub_auth, http_proxy, https_proxy = (os.getenv('HUB_ROBOT_USER'),
os.getenv('HUB_ROBOT_SECRET'),
os.getenv('HUB_URL_CORE'),
os.getenv('HUB_URL_AUTH'),
os.getenv('PO_HTTP_PROXY'),
os.getenv('PO_HTTPS_PROXY'))
robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs()

self.enable_hub_logging = enable_hub_logging
self.hub_core_client = init_hub_client_with_robot(robot_id,
robot_secret,
hub_url_core,
Expand Down Expand Up @@ -229,7 +226,7 @@ def cleanup_call(self, cleanup_type: str):

def stream_logs_call(self, body: CreateLogEntity):
try:
return stream_logs(body, self.node_id, self.database, self.hub_core_client)
return stream_logs(body, self.node_id, self.enable_hub_logging, self.database, self.hub_core_client)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error streaming logs: {e}")

Expand All @@ -238,4 +235,4 @@ def health_call(self):
if not main_alive:
raise RuntimeError("Main thread is not alive.")
else:
return {"status": "ok"}
return {'status': "ok"}
10 changes: 5 additions & 5 deletions src/api/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
from typing import Annotated

# Base URL of the Keycloak 'flame' realm's OpenID Connect endpoints.
# NOTE(review): os.getenv returns None when KEYCLOAK_URL is unset, which would
# raise TypeError here at import time — assumed guaranteed by deployment config.
_OIDC_BASE = os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect"

oauth2_scheme = OAuth2AuthorizationCodeBearer(
    tokenUrl=_OIDC_BASE + "/token",
    authorizationUrl=_OIDC_BASE + "/auth",
    refreshUrl=_OIDC_BASE + "/token",
)


async def valid_access_token(token: Annotated[str, Depends(oauth2_scheme)]) -> dict:
    """Validate a bearer token against Keycloak's JWKS and return its claims.

    :param token: JWT extracted by the OAuth2 authorization-code scheme.
    :return: Decoded JWT claims as a dict.
    :raises HTTPException: 401 when the token is invalid or expired.
    """
    url = os.getenv('KEYCLOAK_URL') + "/realms/flame/protocol/openid-connect/certs"
    jwks_client = PyJWKClient(url)

    try:
        sig_key = jwks_client.get_signing_key_from_jwt(token)
        # PyJWT's expiry option is named 'verify_exp' (the previous 'exp' key is
        # not a recognized option); audience verification is deliberately off.
        return jwt.decode(token,
                          key=sig_key,
                          options={'verify_signature': True, 'verify_aud': False, 'verify_exp': True})
    except jwt.exceptions.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Not authenticated")
172 changes: 46 additions & 126 deletions src/k8s/kubernetes.py

Large diffs are not rendered by default.

97 changes: 79 additions & 18 deletions src/k8s/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
import time
from typing import Literal, Optional, Union

from kubernetes import client
from kubernetes import config, client


def get_k8s_resource_names(resource_type: str,
selector_type: Optional[Literal['label', 'field']] = None,
selector_arg: Optional[str] = None,
manual_name_selector: Optional[str] = None,
namespace: str = "default") -> Optional[Union[str, list[str]]]:
if resource_type not in ['deployment', 'pod', 'service', 'networkpolicy', 'configmap']:
def load_cluster_config():
    """Load the in-cluster Kubernetes config (only works inside a k8s pod)."""
    config.load_incluster_config()


def get_current_namespace() -> str:
    """Return the Kubernetes namespace this pod runs in.

    Reads the service-account namespace file mounted into every pod; falls
    back to 'default' when the file is absent (i.e. not running in-cluster).
    """
    try:
        with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace', 'r') as ns_file:
            return ns_file.read().strip()
    except FileNotFoundError:
        # Outside a cluster there is no service-account mount.
        return 'default'


def find_k8s_resources(resource_type: str,
selector_type: Optional[Literal['label', 'field']] = None,
selector_arg: Optional[str] = None,
manual_name_selector: Optional[str] = None,
namespace: str = "default") -> Optional[Union[str, list[str]]]:
if resource_type not in ['deployment', 'pod', 'service', 'networkpolicy', 'configmap', 'job']:
raise ValueError("For k8s resource search: resource_type must be one of 'deployment', 'pod', 'service', "
"'networkpolicy', or 'configmap'")
"'networkpolicy', 'configmap', or 'job")
if (selector_type is not None) and (selector_type not in ['label', 'field']):
raise ValueError("For k8s resource search: selector_type must be either 'label' or 'field'")
if (selector_type is not None) and (selector_arg is None):
Expand All @@ -19,7 +35,7 @@ def get_k8s_resource_names(resource_type: str,
kwargs = {'namespace': namespace}
if selector_type:
kwargs[f'{selector_type}_selector'] = selector_arg

if resource_type == 'deployment':
resources = client.AppsV1Api().list_namespaced_deployment(**kwargs)
elif resource_type == 'networkpolicy':
Expand All @@ -32,6 +48,8 @@ def get_k8s_resource_names(resource_type: str,
resources = core_client.list_namespaced_service(**kwargs)
elif resource_type == 'configmap':
resources = core_client.list_namespaced_config_map(**kwargs)
elif resource_type == 'job':
resources = client.BatchV1Api().list_namespaced_job(**kwargs)
else:
raise ValueError(f"Uncaptured resource type discovered! Message the Devs... (found={resource_type})")

Expand All @@ -52,12 +70,55 @@ def get_k8s_resource_names(resource_type: str,
return None


def get_current_namespace() -> str:
namespace_file = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
try:
with open(namespace_file, 'r') as file:
return file.read().strip()
# Handle the case where the file is not found
except FileNotFoundError:
# Fallback to a default namespace if the file is not found
return 'default'
def delete_k8s_resource(name: str, resource_type: str, namespace: str = 'default') -> None:
    """
    Deletes a Kubernetes resource by name and type.

    A 404 from the API (resource already gone) is treated as success; any other
    ApiException is re-raised so real failures (permissions, connectivity,
    timeouts) surface to the caller instead of being silently swallowed.

    :param name: Name of the resource to delete.
    :param resource_type: Type of the resource (e.g., 'deployment', 'service', 'pod', 'configmap', 'job').
    :param namespace: Namespace in which the resource exists.
    """
    print(f"PO ACTION - Deleting resource: {name} of type {resource_type} in namespace {namespace} at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    if resource_type == 'deployment':
        try:
            # Foreground propagation removes dependent pods before the deployment object.
            client.AppsV1Api().delete_namespaced_deployment(name=name, namespace=namespace, propagation_policy='Foreground')
        except client.exceptions.ApiException as e:
            if e.status != 404:  # already deleted counts as success
                raise
    elif resource_type == 'service':
        try:
            client.CoreV1Api().delete_namespaced_service(name=name, namespace=namespace)
        except client.exceptions.ApiException as e:
            if e.status != 404:
                raise
    elif resource_type == 'pod':
        try:
            client.CoreV1Api().delete_namespaced_pod(name=name, namespace=namespace)
        except client.exceptions.ApiException as e:
            if e.status != 404:
                raise
    elif resource_type == 'configmap':
        try:
            client.CoreV1Api().delete_namespaced_config_map(name=name, namespace=namespace)
        except client.exceptions.ApiException as e:
            if e.status != 404:
                raise
    elif resource_type == 'networkpolicy':
        try:
            client.NetworkingV1Api().delete_namespaced_network_policy(name=name, namespace=namespace)
        except client.exceptions.ApiException as e:
            if e.status != 404:
                raise
    elif resource_type == 'job':
        try:
            client.BatchV1Api().delete_namespaced_job(name=name, namespace=namespace, propagation_policy='Foreground')
        except client.exceptions.ApiException as e:
            if e.status != 404:
                raise
Comment on lines +85 to +122
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n src/k8s/utils.py | sed -n '75,130p'

Repository: PrivateAIM/node-pod-orchestration

Length of output: 2977


🏁 Script executed:

#!/bin/bash
# Search for the exception handling pattern across the file
rg -nP "except.*ApiException|if e\.reason|Error: Not Found" src/k8s/utils.py -C1

Repository: PrivateAIM/node-pod-orchestration

Length of output: 1863


🌐 Web query:

Kubernetes Python client ApiException e.reason 404 Not Found attribute

💡 Result:

In the Kubernetes Python client, ApiException exposes these fields (set from the HTTP response): status (HTTP status code), reason (HTTP reason phrase), body (response payload), and headers. So for a “404 Not Found” you typically get e.status == 404 and e.reason == "Not Found". [1]

If you need the Kubernetes “Status” object’s details (often includes "reason": "NotFound" and "code": 404), parse e.body (it’s often JSON, sometimes bytes):

from kubernetes.client.rest import ApiException
import json

try:
    ...
except ApiException as e:
    if e.status == 404:
        k8s_status = json.loads(e.body.decode() if isinstance(e.body, (bytes, bytearray)) else e.body)
        # k8s_status["reason"]  -> "NotFound"
        # k8s_status["message"] -> e.g. 'pods "x" not found'

Real 404 examples show Reason: Not Found plus a JSON body containing "reason":"NotFound" and "code":404. [2]


Fix inverted exception handling across all resource deletion branches.

The condition is backwards: all non-404 errors are printed as "Error: Not Found" while actual 404s are silently ignored. This masks real deletion failures (permissions, connection errors, timeouts) that should either be re-raised or handled explicitly.

Lines affected: 86, 93, 100, 107, 114, 121 (deployment, service, pod, configmap, networkpolicy, job).

Fix pattern
-        except client.exceptions.ApiException as e:
-            if e.reason != 'Not Found':
-                print(f"Error: Not Found {name} deployment")
+        except client.exceptions.ApiException as e:
+            if e.status == 404:
+                return
+            raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/k8s/utils.py` around lines 85 - 122, The exception handling in all delete
branches (delete_namespaced_deployment, delete_namespaced_service,
delete_namespaced_pod, delete_namespaced_config_map,
delete_namespaced_network_policy, delete_namespaced_job) is inverted: it
currently prints "Error: Not Found" for non-404 errors and ignores actual 404s.
Change each except client.exceptions.ApiException as e block to treat 404/Not
Found as the expected ignore case (e.status == 404 or e.reason == 'Not Found')
and otherwise log or re-raise the error with the real exception details (include
e or str(e) in the message) so real failures (permission, connection, timeout)
are surfaced.

else:
raise ValueError(f"Unsupported resource type: {resource_type}")
3 changes: 1 addition & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

from src.resources.database.entity import Database
from src.api.api import PodOrchestrationAPI
from src.k8s.kubernetes import load_cluster_config
from src.k8s.utils import get_current_namespace
from src.k8s.utils import get_current_namespace, load_cluster_config
from src.status.status import status_loop


Expand Down
44 changes: 22 additions & 22 deletions src/resources/database/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

class Database:
def __init__(self) -> None:
host = os.getenv("POSTGRES_HOST")
host = os.getenv('POSTGRES_HOST')
port = "5432"
user = os.getenv("POSTGRES_USER")
password = os.getenv("POSTGRES_PASSWORD")
database = os.getenv("POSTGRES_DB")
conn_uri = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
database = os.getenv('POSTGRES_DB')
conn_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
print(conn_uri)
Comment on lines +19 to 20
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Remove credential leakage from startup logs.

Line 20 prints the full PostgreSQL URI, which includes secrets from Line 19 (POSTGRES_USER / POSTGRES_PASSWORD). This is a credentials exposure risk.

🔧 Proposed fix
-        conn_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
-        print(conn_uri)
+        conn_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
conn_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
print(conn_uri)
conn_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/resources/database/entity.py` around lines 19 - 20, The startup currently
prints the full connection string (conn_uri) which exposes POSTGRES_USER and
POSTGRES_PASSWORD; remove the print(conn_uri) and, if you need runtime
visibility, log a redacted/safe version instead (e.g., only include host, port,
and database or replace user/password with "***") where the code that builds
conn_uri is located; ensure any logging uses the module's logger instead of
printing and never emits the raw conn_uri or credential values.

self.engine = create_engine(conn_uri,
pool_pre_ping=True,
Expand All @@ -31,11 +31,11 @@ def reset_db(self) -> None:

def get_deployment(self, deployment_name: str) -> Optional[AnalysisDB]:
    """Return the deployment row matching *deployment_name*, or None."""
    with self.SessionLocal() as session:
        # Plain keyword filter instead of the redundant **{...} dict unpacking.
        return session.query(AnalysisDB).filter_by(deployment_name=deployment_name).first()

def get_latest_deployment(self, analysis_id: str) -> Optional[AnalysisDB]:
    """Return the most recently inserted deployment for *analysis_id*, or None.

    NOTE(review): 'latest' relies on the query returning rows in insertion
    order; an explicit ORDER BY on time_created would be more robust — confirm.
    """
    with self.SessionLocal() as session:
        deployments = session.query(AnalysisDB).filter_by(analysis_id=analysis_id).all()
        return deployments[-1] if deployments else None
Expand All @@ -50,7 +50,7 @@ def analysis_is_running(self, analysis_id: str) -> bool:

def get_deployments(self, analysis_id: str) -> list[AnalysisDB]:
    """Return all deployment rows recorded for *analysis_id*."""
    with self.SessionLocal() as session:
        # Plain keyword filter instead of the redundant **{...} dict unpacking.
        return session.query(AnalysisDB).filter_by(analysis_id=analysis_id).all()

def create_analysis(self,
analysis_id: str,
Expand Down Expand Up @@ -86,7 +86,7 @@ def create_analysis(self,

def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]:
with self.SessionLocal() as session:
analysis = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all()
analysis = session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all()
for deployment in analysis:
if deployment:
for key, value in kwargs.items():
Expand All @@ -97,15 +97,15 @@ def update_analysis(self, analysis_id: str, **kwargs) -> list[AnalysisDB]:

def update_deployment(self, deployment_name: str, **kwargs) -> AnalysisDB:
    """Set the given attribute/value pairs on the deployment row and return it.

    NOTE(review): when no row matches, `deployment` is None and setattr raises
    AttributeError — confirm callers always pass an existing deployment_name.
    """
    with self.SessionLocal() as session:
        deployment = session.query(AnalysisDB).filter_by(deployment_name=deployment_name).first()
        for key, value in kwargs.items():
            setattr(deployment, key, value)
        session.commit()
        return deployment

def delete_analysis(self, analysis_id: str) -> None:
with self.SessionLocal() as session:
analysis = session.query(AnalysisDB).filter_by(**{"analysis_id": analysis_id}).all()
analysis = session.query(AnalysisDB).filter_by(**{'analysis_id': analysis_id}).all()
for deployment in analysis:
if deployment:
session.delete(deployment)
Expand Down Expand Up @@ -154,7 +154,7 @@ def update_analysis_status(self, analysis_id: str, status: str) -> None:
self.update_analysis(analysis_id, status=status)

def update_deployment_status(self, deployment_name: str, status: str) -> None:
    """Persist a new lifecycle *status* for the named deployment."""
    print(f"PO ACTION - Updating deployment {deployment_name} to status {status}")
    self.update_deployment(deployment_name, status=status)

def stop_analysis(self, analysis_id: str) -> None:
Expand All @@ -164,19 +164,19 @@ def extract_analysis_body(self, analysis_id: str) -> Optional[dict]:
analysis = self.get_deployments(analysis_id)
if analysis:
analysis = analysis[0]
return {"analysis_id": analysis.analysis_id,
"project_id": analysis.project_id,
"registry_url": analysis.registry_url,
"image_url": analysis.image_url,
"registry_user": analysis.registry_user,
"registry_password": analysis.registry_password,
"namespace": analysis.namespace,
"kong_token": analysis.kong_token,
"restart_counter": analysis.restart_counter}
return {'analysis_id': analysis.analysis_id,
'project_id': analysis.project_id,
'registry_url': analysis.registry_url,
'image_url': analysis.image_url,
'registry_user': analysis.registry_user,
'registry_password': analysis.registry_password,
'namespace': analysis.namespace,
'kong_token': analysis.kong_token,
'restart_counter': analysis.restart_counter}
return None

def delete_old_deployments_from_db(self, analysis_id: str) -> None:
deployments = self.get_deployments(analysis_id)
deployments = sorted(deployments, key=lambda x: x.time_created, reverse=True)
for deployment in deployments[1:]:
self.delete_deployment(deployment.deployment_name)
self.delete_deployment(deployment.deployment_name)
4 changes: 2 additions & 2 deletions src/resources/log/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class LogEntity(BaseModel):
log: str
log_type: Literal["emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"]
log_type: Literal['emerg', 'alert', 'crit', 'error', 'warn', 'notice', 'info', 'debug']

id: str = str(uuid.uuid4())
created_at: str = str(datetime.now())
Expand All @@ -21,7 +21,7 @@ def __str__(self) -> str:

class CreateLogEntity(BaseModel):
log: str
log_type: Literal["emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"]
log_type: Literal['emerg', 'alert', 'crit', 'error', 'warn', 'notice', 'info', 'debug']

analysis_id: str
status: str
Expand Down
Loading