diff --git a/src/alita_tools/carrier/api_wrapper.py b/src/alita_tools/carrier/api_wrapper.py index 45d09b1a..a50b9b6a 100644 --- a/src/alita_tools/carrier/api_wrapper.py +++ b/src/alita_tools/carrier/api_wrapper.py @@ -1,4 +1,5 @@ import logging +import re from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field, model_validator, SecretStr from .carrier_sdk import CarrierClient, CarrierCredentials, CarrierAPIError @@ -77,3 +78,42 @@ def get_report_file_log(self, bucket: str, file_name: str): def upload_excel_report(self, bucket_name: str, excel_report_name: str): return self._client.upload_excel_report(bucket_name, excel_report_name) + + def get_ui_reports_list(self) -> List[Dict[str, Any]]: + """Get list of UI test reports from the Carrier platform.""" + return self._client.get_ui_tests_list() + + def get_ui_report_links(self, uid: str) -> list: + """Get all unique file_names for a given UI report UID, ending with .html, without #index=*, and only unique values.""" + endpoint = f"api/v1/ui_performance/results/{self.project_id}/{uid}?sort=loop&order=asc" + try: + response = self._client.request('get', endpoint) + file_names = set() + def clean_file_name(file_name): + # Remove #index=... and everything after, then ensure it ends with .html + # This regex removes #index=... and anything after .html + match = re.match(r"(.+?\.html)", file_name) + if match: + return match.group(1) + return file_name + # If the response is a dict with lists as values, flatten all file_names from all values + if isinstance(response, dict): + for value in response.values(): + if isinstance(value, list): + for item in value: + file_name = item.get("file_name") + if file_name: + clean_name = clean_file_name(file_name) + file_names.add(clean_name) + elif isinstance(response, list): + for item in response: + file_name = item.get("file_name") + if file_name: + clean_name = clean_file_name(file_name) + file_names.add(clean_name) + sorted_names = sorted(file_names) + prefix = f"https://platform.getcarrier.io/api/v1/artifacts/artifact/default/{self.project_id}/reports/" + return [prefix + name for name in sorted_names] + except Exception as e: + logger.error(f"Failed to fetch UI report links: {e}") + return [] diff --git a/src/alita_tools/carrier/carrier_sdk.py b/src/alita_tools/carrier/carrier_sdk.py index db5385f6..2f7ebc4f 100644 --- a/src/alita_tools/carrier/carrier_sdk.py +++ b/src/alita_tools/carrier/carrier_sdk.py @@ -198,3 +198,8 @@ def upload_excel_report(self, bucket_name: str, excel_report_name: str): s3_config = {'integration_id': 1, 'is_local': False} requests.post(full_url, params=s3_config, allow_redirects=True, files=files, headers=headers) + def get_ui_tests_list(self) -> List[Dict[str, Any]]: + """Get list of UI test reports from the Carrier platform.""" + endpoint = f"api/v1/ui_performance/reports/{self.credentials.project_id}?offset=0&limit=25" + return self.request('get', endpoint).get("rows", []) + diff --git a/src/alita_tools/carrier/tools.py b/src/alita_tools/carrier/tools.py index 8e941677..fd2760cd 100644 --- a/src/alita_tools/carrier/tools.py +++ b/src/alita_tools/carrier/tools.py @@ -2,7 +2,7 @@ from .tickets_tool import FetchTicketsTool, CreateTicketTool from .backend_reports_tool import GetReportsTool, GetReportByIDTool, CreateExcelReportTool from .backend_tests_tool import GetTestsTool, GetTestByIDTool, RunTestByIDTool - +from .ui_reports_tool import GetUIReportsTool, GetUIReportByIDTool __all__ = [ {"name": "get_ticket_list", "tool": FetchTicketsTool}, @@ -12,5 +12,7 @@ {"name": "create_excel_report", "tool": CreateExcelReportTool}, {"name": "get_tests", "tool": GetTestsTool}, {"name": "get_test_by_id", "tool": GetTestByIDTool}, - {"name": "run_test_by_id", "tool": RunTestByIDTool} + {"name": "run_test_by_id", "tool": RunTestByIDTool}, + {"name": "get_ui_reports", "tool": GetUIReportsTool}, + {"name": "get_ui_report_by_id", "tool": GetUIReportByIDTool} ] diff --git a/src/alita_tools/carrier/ui_reports_tool.py b/src/alita_tools/carrier/ui_reports_tool.py new file mode 100644 index 00000000..e727bce1 --- /dev/null +++ b/src/alita_tools/carrier/ui_reports_tool.py @@ -0,0 +1,89 @@ +import logging +import json +import traceback +from typing import Type +from langchain_core.tools import BaseTool, ToolException +from pydantic.fields import Field +from pydantic import create_model, BaseModel +from .api_wrapper import CarrierAPIWrapper + +logger = logging.getLogger("carrier_ui_reports_tool") + +class GetUIReportsTool(BaseTool): + api_wrapper: CarrierAPIWrapper = Field(..., description="Carrier API Wrapper instance") + name: str = "get_ui_reports" + description: str = "Get list of UI test reports from the Carrier platform." + args_schema: Type[BaseModel] = create_model( + "GetUIReportsInput", + tag_name=(str, Field(default="", description="Optional parameter. Tag name to filter UI reports")), + ) + + def _run(self, tag_name=""): + try: + reports = self.api_wrapper.get_ui_reports_list() + + # Fields to keep in each report + base_fields = { + "id", "name", "environment", "test_type", "browser", "browser_version", "test_status", + "start_time", "end_time", "duration", "loops", "aggregation", "passed" + } + trimmed_reports = [] + for report in reports: + # Filter by tag title if present + tags = report.get("tags", []) + if tag_name and not any(tag.get("title") == tag_name for tag in tags): + continue + + trimmed = {k: report[k] for k in base_fields if k in report} + trimmed["tags"] = [tag["title"] for tag in tags] + + # Simplify test_parameters from test_config + test_config = report.get("test_config", {}) + trimmed["test_parameters"] = [ + {"name": param["name"], "default": param["default"]} + for param in test_config.get("test_parameters", []) + ] + + # Extract source from test_config + if "source" in test_config: + trimmed["source"] = test_config["source"] + + trimmed_reports.append(trimmed) + + return json.dumps(trimmed_reports) + except Exception: + stacktrace = traceback.format_exc() + logger.error(f"Error downloading UI reports: {stacktrace}") + raise ToolException(stacktrace) + +class GetUIReportByIDTool(BaseTool): + api_wrapper: CarrierAPIWrapper = Field(..., description="Carrier API Wrapper instance") + name: str = "get_ui_report_by_id" + description: str = "Get UI report data from the Carrier platform." + args_schema: Type[BaseModel] = create_model( + "GetUIReportByIdInput", + report_id=(str, Field(description="UI Report id to retrieve")), + ) + + def _run(self, report_id: str): + try: + reports = self.api_wrapper.get_ui_reports_list() + report_data = {} + for report in reports: + if report_id == str(report["id"]): + report_data = report + break + + # Step 1: Get uid from report_data + uid = report_data.get("uid") + report_links = [] + if uid: + # Step 2: Fetch report links using the new API wrapper method + report_links = self.api_wrapper.get_ui_report_links(uid) + report_data["report_links"] = report_links + + return json.dumps(report_data) + except Exception: + stacktrace = traceback.format_exc() + logger.error(f"Error downloading UI report: {stacktrace}") + raise ToolException(stacktrace)