Skip to content
This repository was archived by the owner on Nov 26, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/alita_tools/carrier/api_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field, model_validator, SecretStr
from .carrier_sdk import CarrierClient, CarrierCredentials, CarrierAPIError
Expand Down Expand Up @@ -77,3 +78,42 @@ def get_report_file_log(self, bucket: str, file_name: str):

def upload_excel_report(self, bucket_name: str, excel_report_name: str):
return self._client.upload_excel_report(bucket_name, excel_report_name)

def get_ui_reports_list(self) -> List[Dict[str, Any]]:
"""Get list of UI test reports from the Carrier platform."""
return self._client.get_ui_tests_list()

def get_ui_report_links(self, uid: str) -> list:
"""Get all unique file_names for a given UI report UID, ending with .html, without #index=*, and only unique values."""
endpoint = f"api/v1/ui_performance/results/{self.project_id}/{uid}?sort=loop&order=asc"
try:
response = self._client.request('get', endpoint)
file_names = set()
def clean_file_name(file_name):
# Remove #index=... and everything after, then ensure it ends with .html
# This regex removes #index=... and anything after .html
match = re.match(r"(.+?\.html)", file_name)
if match:
return match.group(1)
return file_name
# If the response is a dict with lists as values, flatten all file_names from all values
if isinstance(response, dict):
for value in response.values():
if isinstance(value, list):
for item in value:
file_name = item.get("file_name")
if file_name:
clean_name = clean_file_name(file_name)
file_names.add(clean_name)
elif isinstance(response, list):
for item in response:
file_name = item.get("file_name")
if file_name:
clean_name = clean_file_name(file_name)
file_names.add(clean_name)
sorted_names = sorted(file_names)
prefix = f"https://platform.getcarrier.io/api/v1/artifacts/artifact/default/{self.project_id}/reports/"
return [prefix + name for name in sorted_names]
except Exception as e:
logger.error(f"Failed to fetch UI report links: {e}")
return []
5 changes: 5 additions & 0 deletions src/alita_tools/carrier/carrier_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,8 @@ def upload_excel_report(self, bucket_name: str, excel_report_name: str):
s3_config = {'integration_id': 1, 'is_local': False}
requests.post(full_url, params=s3_config, allow_redirects=True, files=files, headers=headers)

def get_ui_tests_list(self) -> List[Dict[str, Any]]:
"""Get list of UI test reports from the Carrier platform."""
endpoint = f"api/v1/ui_performance/reports/{self.credentials.project_id}?offset=0&limit=25"
return self.request('get', endpoint).get("rows", [])

6 changes: 4 additions & 2 deletions src/alita_tools/carrier/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .tickets_tool import FetchTicketsTool, CreateTicketTool
from .backend_reports_tool import GetReportsTool, GetReportByIDTool, CreateExcelReportTool
from .backend_tests_tool import GetTestsTool, GetTestByIDTool, RunTestByIDTool

from .ui_reports_tool import GetUIReportsTool, GetUIReportByIDTool

__all__ = [
{"name": "get_ticket_list", "tool": FetchTicketsTool},
Expand All @@ -12,5 +12,7 @@
{"name": "create_excel_report", "tool": CreateExcelReportTool},
{"name": "get_tests", "tool": GetTestsTool},
{"name": "get_test_by_id", "tool": GetTestByIDTool},
{"name": "run_test_by_id", "tool": RunTestByIDTool}
{"name": "run_test_by_id", "tool": RunTestByIDTool},
{"name": "get_ui_reports", "tool": GetUIReportsTool},
{"name": "get_ui_report_by_id", "tool": GetUIReportByIDTool}
]
89 changes: 89 additions & 0 deletions src/alita_tools/carrier/ui_reports_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging
import json
import traceback
from typing import Type
from langchain_core.tools import BaseTool, ToolException
from pydantic.fields import Field
from pydantic import create_model, BaseModel
from .api_wrapper import CarrierAPIWrapper

logger = logging.getLogger("carrier_ui_reports_tool")

class GetUIReportsTool(BaseTool):
api_wrapper: CarrierAPIWrapper = Field(..., description="Carrier API Wrapper instance")
name: str = "get_ui_reports"
description: str = "Get list of UI test reports from the Carrier platform."
args_schema: Type[BaseModel] = create_model(
"GetUIReportsInput",
tag_name=(str, Field(default="", description="Optional parameter. Tag name to filter UI reports")),
)

def _run(self, tag_name=""):
try:
reports = self.api_wrapper.get_ui_reports_list()

# Fields to keep in each report
base_fields = {
"id", "name", "environment", "test_type", "browser", "browser_version", "test_status",
"start_time", "end_time", "duration", "loops", "aggregation", "passed"
}
trimmed_reports = []
for report in reports:
# Filter by tag title if present
tags = report.get("tags", [])
if tag_name and not any(tag.get("title") == tag_name for tag in tags):
continue

trimmed = {k: report[k] for k in base_fields if k in report}
trimmed["tags"] = [tag["title"] for tag in tags]

# Simplify test_parameters from test_config
test_config = report.get("test_config", {})
trimmed["test_parameters"] = [
{"name": param["name"], "default": param["default"]}
for param in test_config.get("test_parameters", [])
]

# Extract source from test_config
if "source" in test_config:
trimmed["source"] = test_config["source"]

trimmed_reports.append(trimmed)

return json.dumps(trimmed_reports)
except Exception:
stacktrace = traceback.format_exc()
logger.error(f"Error downloading UI reports: {stacktrace}")
raise ToolException(stacktrace)

class GetUIReportByIDTool(BaseTool):
api_wrapper: CarrierAPIWrapper = Field(..., description="Carrier API Wrapper instance")
name: str = "get_ui_report_by_id"
description: str = "Get UI report data from the Carrier platform."
args_schema: Type[BaseModel] = create_model(
"GetUIReportByIdInput",
report_id=(str, Field(description="UI Report id to retrieve")),
)

def _run(self, report_id: str):
try:
reports = self.api_wrapper.get_ui_reports_list()
report_data = {}
for report in reports:
if report_id == str(report["id"]):
report_data = report
break

# Step 1: Get uid from report_data
uid = report_data.get("uid")
report_links = []
if uid:
# Step 2: Fetch report links using the new API wrapper method
report_links = self.api_wrapper.get_ui_report_links(uid)
report_data["report_links"] = report_links

return json.dumps(report_data)
except Exception:
stacktrace = traceback.format_exc()
logger.error(f"Error downloading UI report: {stacktrace}")
raise ToolException(stacktrace)