Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
773 changes: 414 additions & 359 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[tool.poetry]
name = "Node-Pod-Orchestration"
version = "0.3.0"
version = "0.3.5"
description = ""
authors = ["Alexander Röhl <alexander.roehl@uni-tuebingen.de>","David Hieber <david.hieber@uni-tuebingen.de>", "Maximilian Jugl <Maximilian.Jugl@medizin.uni-leipzig.de>"]
authors = ["Alexander Röhl <alexander.roehl@uni-tuebingen.de>", "David Hieber <david.hieber@uni-tuebingen.de>"]
license = "Apache 2.0"
readme = "README.md"
packages = [{ include = "src" }]
Expand Down
8 changes: 6 additions & 2 deletions src/resources/analysis/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Analysis(BaseModel):
kong_token: str

restart_counter: int = 0
progress: int = 0
deployment_name: str = ''
tokens: Optional[dict[str, str]] = None
analysis_config: Optional[dict[str, str]] = None
Expand Down Expand Up @@ -53,7 +54,8 @@ def start(self, database: Database, namespace: str = 'default') -> None:
registry_password=self.registry_password,
namespace=self.namespace,
kong_token=self.kong_token,
restart_counter=self.restart_counter)
restart_counter=self.restart_counter,
progress=self.progress)

def stop(self,
database: Database,
Expand Down Expand Up @@ -82,7 +84,8 @@ def read_db_analysis(analysis: AnalysisDB) -> Analysis:
log=analysis.log,
namespace=analysis.namespace,
kong_token=analysis.kong_token,
restart_counter=analysis.restart_counter)
restart_counter=analysis.restart_counter,
progress=analysis.progress)


class CreateAnalysis(BaseModel):
Expand All @@ -94,3 +97,4 @@ class CreateAnalysis(BaseModel):
registry_password: str = 'default_pw'
kong_token: str = 'default_kong_token'
restart_counter: int = 0
progress: int = 0
2 changes: 2 additions & 0 deletions src/resources/database/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AnalysisDB(Base):
namespace = Column(String, nullable=True)
kong_token = Column(String, nullable=True)
restart_counter = Column(Integer, nullable=True, default=0)
progress = Column(Integer, nullable=True, default=0)
time_created = Column(Float, nullable=True)
time_updated = Column(Float, nullable=True)

Expand All @@ -50,6 +51,7 @@ class ArchiveDB(Base):
namespace = Column(String, nullable=True)
kong_token = Column(String, nullable=True)
restart_counter = Column(Integer, nullable=True, default=0)
progress = Column(Integer, nullable=True, default=0)
time_created = Column(Float, nullable=True)
time_updated = Column(Float, nullable=True)

26 changes: 23 additions & 3 deletions src/resources/database/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def create_analysis(self,
registry_user: str,
registry_password: str,
kong_token: str,
restart_counter: int ,
restart_counter: int,
progress: int,
namespace: str = 'default') -> AnalysisDB:
analysis = AnalysisDB(analysis_id=analysis_id,
deployment_name=deployment_name,
Expand All @@ -77,6 +78,7 @@ def create_analysis(self,
namespace=namespace,
kong_token=kong_token,
restart_counter=restart_counter,
progress=progress,
time_created=time.time())
with self.SessionLocal() as session:
session.add(analysis)
Expand Down Expand Up @@ -137,19 +139,36 @@ def get_analysis_pod_ids(self, analysis_id: str) -> list[str]:
return [deployment.pod_ids for deployment in self.get_deployments(analysis_id) if deployment is not None]

def get_analysis_log(self, analysis_id: str) -> str:
deployment = self.get_deployments(analysis_id)[0]
deployment = self.get_latest_deployment(analysis_id)
if deployment is not None:
log = deployment.log
if log is not None:
return log
return ""

def get_analysis_progress(self, analysis_id: str) -> Optional[int]:
deployment = self.get_latest_deployment(analysis_id)
if deployment is not None:
progress = deployment.progress
if progress is not None:
return progress
return None

def update_analysis_log(self, analysis_id: str, log: str) -> None:
latest = self.get_analysis_log(analysis_id)
if latest:
log = latest + "\n" + log
self.update_analysis(analysis_id, log=log)

def progress_valid(self, analysis_id: str, progress: int) -> bool:
latest = self.get_analysis_progress(analysis_id)
if (latest is not None) and (latest < progress <= 100):
return True
return False

def update_analysis_progress(self, analysis_id: str, progress: int) -> None:
self.update_analysis(analysis_id, progress=progress)

def update_analysis_status(self, analysis_id: str, status: str) -> None:
self.update_analysis(analysis_id, status=status)

Expand All @@ -172,7 +191,8 @@ def extract_analysis_body(self, analysis_id: str) -> Optional[dict]:
'registry_password': analysis.registry_password,
'namespace': analysis.namespace,
'kong_token': analysis.kong_token,
'restart_counter': analysis.restart_counter}
'restart_counter': analysis.restart_counter,
'progress': 0}
return None

def delete_old_deployments_from_db(self, analysis_id: str) -> None:
Expand Down
3 changes: 2 additions & 1 deletion src/resources/log/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class CreateLogEntity(BaseModel):

analysis_id: str
status: str
progress: int

def to_log_entity(self) -> LogEntity:
return LogEntity(log=self.log,
Expand All @@ -34,7 +35,7 @@ def to_log_entity(self) -> LogEntity:
class CreateStartUpErrorLog(CreateLogEntity):
def __init__(self,
restart_num: int,
error_type: Literal["stuck", "slow", "k8s"],
error_type: Literal['stuck', 'slow', 'k8s'],
analysis_id: str,
status: str,
k8s_error_msg: str = '') -> None:
Expand Down
18 changes: 16 additions & 2 deletions src/resources/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from src.k8s.utils import get_current_namespace, get_k8s_resource_names
from src.utils.token import _get_all_keycloak_clients
from src.utils.token import delete_keycloak_client
from src.utils.hub_client import init_hub_client_and_update_hub_status_with_robot
from src.utils.hub_client import (init_hub_client_and_update_hub_status_with_robot,
update_hub_status,
get_node_analysis_id)
from src.utils.other import resource_name_to_analysis


Expand All @@ -39,7 +41,8 @@ def create_analysis(body: Union[CreateAnalysis, str], database: Database) -> dic
registry_password=body.registry_password,
namespace=namespace,
kong_token=body.kong_token,
restart_counter=body.restart_counter + 1
restart_counter=body.restart_counter + 1,
progress=body.progress
)
analysis.start(database=database, namespace=namespace)

Expand Down Expand Up @@ -269,3 +272,14 @@ def stream_logs(log_entity: CreateLogEntity, node_id: str, database: Database, h
status=log_entity.status,
level=log_entity.log_type,
message=log_entity.log)

if database.progress_valid(log_entity.analysis_id, log_entity.progress):
database.update_analysis_progress(log_entity.analysis_id, log_entity.progress)
update_hub_status(hub_core_client,
get_node_analysis_id(hub_core_client, log_entity.analysis_id, node_id),
run_status=log_entity.status,
run_progress=log_entity.progress)
else:
update_hub_status(hub_core_client,
get_node_analysis_id(hub_core_client, log_entity.analysis_id, node_id),
run_status=log_entity.status)
10 changes: 8 additions & 2 deletions src/utils/hub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,20 @@ def get_node_analysis_id(hub_client: flame_hub.CoreClient, analysis_id: str, nod
return node_analysis_id


def update_hub_status(hub_client: flame_hub.CoreClient, node_analysis_id: str, run_status: str) -> None:
def update_hub_status(hub_client: flame_hub.CoreClient,
node_analysis_id: str,
run_status: str,
run_progress: Optional[int] = None) -> None:
"""
Update the status of the analysis in the hub.
"""
try:
if run_status == AnalysisStatus.STUCK.value:
run_status = AnalysisStatus.FAILED.value
hub_client.update_analysis_node(node_analysis_id, run_status=run_status)
if run_progress is None:
hub_client.update_analysis_node(node_analysis_id, run_status=run_status)
else:
hub_client.update_analysis_node(node_analysis_id, run_status=run_status, execution_progress=run_progress)
except (HTTPStatusError, ConnectError, flame_hub._exceptions.HubAPIError) as e:
print(f"Error: Failed to update hub status for node_analysis_id {node_analysis_id}\n{e}")

Expand Down