Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build-push-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- main
- canary
- new_hub


env:
Expand Down Expand Up @@ -42,6 +43,8 @@ jobs:
run: |
if [[ "${{ github.ref }}" == "refs/heads/canary" ]]; then
echo "CUSTOM_TAG=canary" >> $GITHUB_ENV
elif [[ "${{ github.ref }}" == "refs/heads/new_hub" ]]; then
echo "CUSTOM_TAG=new-hub" >> $GITHUB_ENV
else
echo "CUSTOM_TAG=latest" >> $GITHUB_ENV
fi
Expand Down
358 changes: 200 additions & 158 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "Node-Pod-Orchestration"
version = "0.3.8"
version = "0.4.0"
description = ""
authors = ["Alexander Röhl <alexander.roehl@uni-tuebingen.de>", "David Hieber <david.hieber@uni-tuebingen.de>"]
license = "Apache 2.0"
Expand All @@ -18,7 +18,7 @@ sqlalchemy = "^2.0.26"
PyJWT = "^2.10.1"
python-dotenv = "^0.21.0"
kong-admin-client = {git = "https://github.com/PrivateAIM/kong-admin-python-client.git"}
flame-hub-client = "0.2.7"
flame-hub-client = "^0.2.12"
cryptography="^44.0.3"
truststore = "^0.10.4"

Expand Down
33 changes: 17 additions & 16 deletions src/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from src.utils.hub_client import init_hub_client_with_robot, get_node_id_by_robot
from src.utils.hub_client import init_hub_client_with_client, get_node_id_by_client
from src.utils.other import extract_hub_envs
from src.api.oauth import valid_access_token
from src.resources.database.entity import Database
Expand All @@ -14,7 +14,7 @@
from src.resources.utils import (create_analysis,
retrieve_history,
retrieve_logs,
get_status,
get_status_and_progress,
get_pods,
stop_analysis,
delete_analysis,
Expand All @@ -25,16 +25,17 @@
class PodOrchestrationAPI:
def __init__(self, database: Database, namespace: str = 'default'):
self.database = database
robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs()

client_id, client_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs()

self.enable_hub_logging = enable_hub_logging
self.hub_client = init_hub_client_with_robot(robot_id,
robot_secret,
hub_url_core,
hub_auth,
http_proxy,
https_proxy)
self.node_id = get_node_id_by_robot(self.hub_client, robot_id) if self.hub_client else None
self.hub_client = init_hub_client_with_client(client_id,
client_secret,
hub_url_core,
hub_auth,
http_proxy,
https_proxy)
self.node_id = get_node_id_by_client(self.hub_client, client_id) if self.hub_client else None
self.namespace = namespace
app = FastAPI(title="FLAME PO",
docs_url="/api/docs",
Expand Down Expand Up @@ -79,12 +80,12 @@ def __init__(self, database: Database, namespace: str = 'default'):
methods=["GET"],
response_class=JSONResponse)
router.add_api_route("/status",
self.get_all_status_call,
self.get_all_status_and_progress_call,
dependencies=[Depends(valid_access_token)],
methods=["GET"],
response_class=JSONResponse)
router.add_api_route("/status/{analysis_id}",
self.get_status_call,
self.get_status_and_progress_call,
dependencies=[Depends(valid_access_token)],
methods=["GET"],
response_class=JSONResponse)
Expand Down Expand Up @@ -170,15 +171,15 @@ def retrieve_logs_call(self, analysis_id: str):
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error retrieving logs data: {e}")

def get_all_status_and_progress_call(self):
    """Return status and progress for all analyses.

    Delegates to ``get_status_and_progress`` with the ``'all'`` selector and
    this API's database handle.

    Raises:
        HTTPException: status 500 carrying the underlying error text when the
            lookup fails for any reason.
    """
    try:
        return get_status_and_progress('all', self.database)
    except Exception as e:
        # Chain the cause so the original traceback is not lost when the
        # error is translated into an HTTP response.
        raise HTTPException(status_code=500, detail=f"Error retrieving ALL status data: {e}") from e

def get_status_and_progress_call(self, analysis_id: str):
    """Return status and progress for the analysis selected by ``analysis_id``.

    Delegates to ``get_status_and_progress`` with this API's database handle.

    Args:
        analysis_id: identifier taken from the ``/status/{analysis_id}`` route.

    Raises:
        HTTPException: status 500 carrying the underlying error text when the
            lookup fails for any reason.
    """
    try:
        return get_status_and_progress(analysis_id, self.database)
    except Exception as e:
        # Chain the cause so the original traceback is not lost when the
        # error is translated into an HTTP response.
        raise HTTPException(status_code=500, detail=f"Error retrieving status data: {e}") from e

Expand Down
12 changes: 10 additions & 2 deletions src/k8s/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def _create_nginx_config_map(analysis_name: str,


# egress: analysis deployment to storage-service
location ~ ^/storage/(final|local|intermediate)/ {{
location ~ ^/storage/(final|local|intermediate) {{
rewrite ^/storage(/.*) $1 break;
proxy_pass http://{storage_service_name}:8080;
allow {analysis_ip};
Expand Down Expand Up @@ -519,7 +519,15 @@ def _get_logs(name: str, pod_ids: Optional[list[str]] = None, namespace: str = '
return []

# sanitize pod logs
return [''.join(filter(lambda x: x in string.printable, log)) for log in pod_logs]
final_logs = []
for log in pod_logs:
log = ''.join(filter(lambda x: x in string.printable, log))
log = '\n'.join([l for l in log.split('\n')
if not l.startswith('INFO:') and
not (l.endswith('"GET /healthz HTTP/1.0" 200 OK') or
l.endswith('"POST /webhook HTTP/1.0" 200 OK'))])
final_logs.append(log)
return final_logs


def _get_pods(name: str, namespace: str = 'default') -> list[str]:
Expand Down
8 changes: 6 additions & 2 deletions src/resources/analysis/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Analysis(BaseModel):
kong_token: str

restart_counter: int = 0
progress: int = 0
deployment_name: str = ''
tokens: Optional[dict[str, str]] = None
analysis_config: Optional[dict[str, str]] = None
Expand Down Expand Up @@ -53,7 +54,8 @@ def start(self, database: Database, namespace: str = 'default') -> None:
registry_password=self.registry_password,
namespace=self.namespace,
kong_token=self.kong_token,
restart_counter=self.restart_counter)
restart_counter=self.restart_counter,
progress=self.progress)

def stop(self,
database: Database,
Expand Down Expand Up @@ -82,7 +84,8 @@ def read_db_analysis(analysis: AnalysisDB) -> Analysis:
log=analysis.log,
namespace=analysis.namespace,
kong_token=analysis.kong_token,
restart_counter=analysis.restart_counter)
restart_counter=analysis.restart_counter,
progress=analysis.progress)


class CreateAnalysis(BaseModel):
Expand All @@ -94,3 +97,4 @@ class CreateAnalysis(BaseModel):
registry_password: str = 'default_pw'
kong_token: str = 'default_kong_token'
restart_counter: int = 0
progress: int = 0
2 changes: 2 additions & 0 deletions src/resources/database/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AnalysisDB(Base):
namespace = Column(String, nullable=True)
kong_token = Column(String, nullable=True)
restart_counter = Column(Integer, nullable=True, default=0)
progress = Column(Integer, nullable=True, default=0)
time_created = Column(Float, nullable=True)
time_updated = Column(Float, nullable=True)

Expand All @@ -50,6 +51,7 @@ class ArchiveDB(Base):
namespace = Column(String, nullable=True)
kong_token = Column(String, nullable=True)
restart_counter = Column(Integer, nullable=True, default=0)
progress = Column(Integer, nullable=True, default=0)
time_created = Column(Float, nullable=True)
time_updated = Column(Float, nullable=True)

26 changes: 23 additions & 3 deletions src/resources/database/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def create_analysis(self,
registry_user: str,
registry_password: str,
kong_token: str,
restart_counter: int ,
restart_counter: int,
progress: int,
namespace: str = 'default') -> AnalysisDB:
analysis = AnalysisDB(analysis_id=analysis_id,
deployment_name=deployment_name,
Expand All @@ -77,6 +78,7 @@ def create_analysis(self,
namespace=namespace,
kong_token=kong_token,
restart_counter=restart_counter,
progress=progress,
time_created=time.time())
with self.SessionLocal() as session:
session.add(analysis)
Expand Down Expand Up @@ -137,19 +139,36 @@ def get_analysis_pod_ids(self, analysis_id: str) -> list[str]:
return [deployment.pod_ids for deployment in self.get_deployments(analysis_id) if deployment is not None]

def get_analysis_log(self, analysis_id: str) -> str:
    """Return the log stored on the latest deployment of an analysis.

    Args:
        analysis_id: identifier of the analysis whose log is requested.

    Returns:
        The ``log`` attribute of the record returned by
        ``get_latest_deployment``; an empty string when there is no such
        record or its log is ``None``.
    """
    # Only the latest deployment is consulted. Indexing the full deployment
    # list with [0] would raise IndexError when the analysis has none.
    deployment = self.get_latest_deployment(analysis_id)
    if deployment is None or deployment.log is None:
        return ""
    return deployment.log

def get_analysis_progress(self, analysis_id: str) -> Optional[int]:
    """Return the progress stored on the latest deployment of an analysis.

    Args:
        analysis_id: identifier of the analysis whose progress is requested.

    Returns:
        The ``progress`` attribute of the record returned by
        ``get_latest_deployment``, or ``None`` when there is no such record
        or no progress value has been stored on it.
    """
    latest_deployment = self.get_latest_deployment(analysis_id)
    return None if latest_deployment is None else latest_deployment.progress

def update_analysis_log(self, analysis_id: str, log: str) -> None:
    """Append ``log`` to the log already stored for an analysis.

    The current log is read via ``get_analysis_log``. When it is non-empty
    the new text is added after a newline, otherwise the new text is stored
    on its own. The result is written through ``update_analysis``.

    Args:
        analysis_id: identifier of the analysis to update.
        log: text to append.
    """
    existing = self.get_analysis_log(analysis_id)
    combined = "\n".join((existing, log)) if existing else log
    self.update_analysis(analysis_id, log=combined)

def progress_valid(self, analysis_id: str, progress: int) -> bool:
    """Tell whether ``progress`` may replace the stored progress of an analysis.

    Args:
        analysis_id: identifier of the analysis being updated.
        progress: candidate progress value.

    Returns:
        ``True`` only when a stored progress exists (``get_analysis_progress``
        is not ``None``), the candidate is strictly greater than it, and the
        candidate does not exceed 100. ``False`` otherwise.
    """
    latest = self.get_analysis_progress(analysis_id)
    # Progress may only move forward and is capped at 100.
    return latest is not None and latest < progress <= 100

def update_analysis_progress(self, analysis_id: str, progress: int) -> None:
    """Store a new progress value for an analysis.

    The value is forwarded unchanged to ``update_analysis``; no validation
    is performed here.

    Args:
        analysis_id: identifier of the analysis to update.
        progress: progress value to store.
    """
    self.update_analysis(analysis_id, progress=progress)

def update_analysis_status(self, analysis_id: str, status: str) -> None:
    """Store a new status value for an analysis.

    The value is forwarded unchanged to ``update_analysis``.

    Args:
        analysis_id: identifier of the analysis to update.
        status: status string to store.
    """
    self.update_analysis(analysis_id, status=status)

Expand All @@ -172,7 +191,8 @@ def extract_analysis_body(self, analysis_id: str) -> Optional[dict]:
'registry_password': analysis.registry_password,
'namespace': analysis.namespace,
'kong_token': analysis.kong_token,
'restart_counter': analysis.restart_counter}
'restart_counter': analysis.restart_counter,
'progress': 0}
return None

def delete_old_deployments_from_db(self, analysis_id: str) -> None:
Expand Down
5 changes: 3 additions & 2 deletions src/resources/log/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class CreateLogEntity(BaseModel):

analysis_id: str
status: str
progress: int

def to_log_entity(self) -> LogEntity:
return LogEntity(log=self.log,
Expand All @@ -34,7 +35,7 @@ def to_log_entity(self) -> LogEntity:
class CreateStartUpErrorLog(CreateLogEntity):
def __init__(self,
restart_num: int,
error_type: Literal["stuck", "slow", "k8s"],
error_type: Literal['stuck', 'slow', 'k8s'],
analysis_id: str,
status: str,
k8s_error_msg: str = '') -> None:
Expand All @@ -59,7 +60,7 @@ def __init__(self,
else:
log = ''

super().__init__(log=log, log_type="error", analysis_id=analysis_id, status=status)
super().__init__(log=log, log_type="error", analysis_id=analysis_id, status=status, progress=0)


class AnalysisStoppedLog(CreateLogEntity):
Expand Down
27 changes: 21 additions & 6 deletions src/resources/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from src.k8s.utils import get_current_namespace, find_k8s_resources, delete_k8s_resource
from src.utils.token import _get_all_keycloak_clients
from src.utils.token import delete_keycloak_client
from src.utils.hub_client import init_hub_client_and_update_hub_status_with_robot
from src.utils.hub_client import (init_hub_client_and_update_hub_status_with_client,
update_hub_status,
get_node_analysis_id)
from src.utils.other import resource_name_to_analysis


Expand All @@ -37,12 +39,13 @@ def create_analysis(body: Union[CreateAnalysis, str], database: Database) -> dic
registry_password=body.registry_password,
namespace=namespace,
kong_token=body.kong_token,
restart_counter=body.restart_counter + 1
restart_counter=body.restart_counter + 1,
progress=body.progress
)
analysis.start(database=database, namespace=namespace)

# update hub status
init_hub_client_and_update_hub_status_with_robot(body.analysis_id, AnalysisStatus.STARTED.value)
init_hub_client_and_update_hub_status_with_client(body.analysis_id, AnalysisStatus.STARTED.value)

return {body.analysis_id: analysis.status}

Expand Down Expand Up @@ -94,7 +97,7 @@ def retrieve_logs(analysis_id_str: str, database: Database) -> dict[str, dict[st
return get_analysis_logs(deployment_names, database=database)


def get_status(analysis_id_str: str, database: Database) -> dict[str, str]:
def get_status_and_progress(analysis_id_str: str, database: Database) -> dict[str, dict[str, str]]:
if analysis_id_str == 'all':
analysis_ids = database.get_analysis_ids()
else:
Expand All @@ -106,7 +109,8 @@ def get_status(analysis_id_str: str, database: Database) -> dict[str, str]:
if deployment is not None:
deployments[analysis_id] = read_db_analysis(deployment)

return {analysis_id: deployment.status for analysis_id, deployment in deployments.items()}
return {analysis_id: {'status': deployment.status, 'progress': deployment.progress}
for analysis_id, deployment in deployments.items()}


def get_pods(analysis_id_str: str, database: Database) -> dict[str, list[str]]:
Expand Down Expand Up @@ -150,7 +154,7 @@ def stop_analysis(analysis_id_str: str, database: Database) -> dict[str, str]:
final_status = AnalysisStatus.STOPPED.value

# update hub status
init_hub_client_and_update_hub_status_with_robot(analysis_id, final_status)
init_hub_client_and_update_hub_status_with_client(analysis_id, final_status)

return {analysis_id: deployment.status for analysis_id, deployment in deployments.items()}

Expand Down Expand Up @@ -268,3 +272,14 @@ def stream_logs(log_entity: CreateLogEntity, node_id: str, enable_hub_logging: b
status=log_entity.status,
level=log_entity.log_type,
message=log_entity.log)

if database.progress_valid(log_entity.analysis_id, log_entity.progress):
database.update_analysis_progress(log_entity.analysis_id, log_entity.progress)
update_hub_status(hub_core_client,
get_node_analysis_id(hub_core_client, log_entity.analysis_id, node_id),
run_status=log_entity.status,
run_progress=log_entity.progress)
else:
update_hub_status(hub_core_client,
get_node_analysis_id(hub_core_client, log_entity.analysis_id, node_id),
run_status=log_entity.status)
22 changes: 12 additions & 10 deletions src/status/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@

import flame_hub

from src.resources.log.entity import CreateStartUpErrorLog
from src.k8s.kubernetes import PORTS, get_pod_status
from src.resources.database.entity import Database, AnalysisDB
from src.resources.log.entity import CreateStartUpErrorLog


from src.utils.hub_client import (init_hub_client_with_client,
get_node_id_by_client,
get_node_analysis_id,
get_partner_node_statuses,
update_hub_status)
from src.resources.utils import (unstuck_analysis_deployments,
stop_analysis,
delete_analysis,
stream_logs)
from src.utils.hub_client import (init_hub_client_with_robot,
get_node_id_by_robot,
get_node_analysis_id,
get_partner_node_statuses,
update_hub_status)
from src.status.constants import AnalysisStatus
from src.utils.other import extract_hub_envs
from src.utils.token import get_keycloak_token
Expand All @@ -33,18 +35,18 @@ def status_loop(database: Database, status_loop_interval: int) -> None:
node_id = None
node_analysis_ids = {}

robot_id, robot_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs()
client_id, client_secret, hub_url_core, hub_auth, enable_hub_logging, http_proxy, https_proxy = extract_hub_envs()

# Enter lifecycle loop
while True:
if hub_client is None:
hub_client = init_hub_client_with_robot(robot_id,
robot_secret,
hub_client = init_hub_client_with_client(client_id,
client_secret,
hub_url_core,
hub_auth,
http_proxy,
https_proxy)
node_id = get_node_id_by_robot(hub_client, robot_id)
node_id = get_node_id_by_client(hub_client, client_id)
# Catch unresponsive hub client
if node_id is None:
print("PO ACTION - Resetting hub client...")
Expand Down
Loading
Loading