Skip to content
This repository was archived by the owner on Nov 26, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/alita_tools/carrier/api_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ def run_test(self, test_id: str, json_body):
def get_engagements_list(self) -> List[Dict[str, Any]]:
return self._client.get_engagements_list()

def download_and_unzip_reports(self, file_name: str, bucket: str, extract_to: str = "/tmp") -> str:
return self._client.download_and_unzip_reports(file_name, bucket, extract_to)

def get_report_file_name(self, report_id: str, extract_to: str = "/tmp"):
return self._client.get_report_file_name(report_id, extract_to)

def get_report_file_log(self, bucket: str, file_name: str):
return self._client.get_report_file_log(bucket, file_name)

def upload_excel_report(self, bucket_name: str, excel_report_name: str):
return self._client.upload_excel_report(bucket_name, excel_report_name)
193 changes: 146 additions & 47 deletions src/alita_tools/carrier/backend_reports_tool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime
import json
import traceback
from typing import Type
Expand All @@ -8,6 +9,7 @@
from .api_wrapper import CarrierAPIWrapper
from .utils import get_latest_log_file, calculate_thresholds
import os
from .excel_reporter import GatlingReportParser, JMeterReportParser, ExcelReporter


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,7 +73,7 @@ class GetReportByIDTool(BaseTool):
description: str = "Get report data from the Carrier platform."
args_schema: Type[BaseModel] = create_model(
"GetReportByIdInput",
report_id=(str, Field(default="", description="Report id to retrieve")),
report_id=(str, Field(description="Report id to retrieve")),
)

def _run(self, report_id: str):
Expand All @@ -96,63 +98,160 @@ class CreateExcelReportTool(BaseTool):
description: str = "Create excel report by report ID from Carrier."
args_schema: Type[BaseModel] = create_model(
"CreateExcelReportInput",
report_id=(str, Field(description="Report ID to retrieve"))
report_id=(str, Field(default=None, description="Report ID to retrieve")),
bucket=(str, Field(default=None, description="Bucket with jtl/log file")),
file_name=(str, Field(default=None, description="File name for .jtl or .log report")),
**{
"think_time": (str, Field(default=None, description="Think time parameter")),
"pct": (str, Field(default=None, description="Percentile parameter")),
"tp_threshold": (int, Field(default=None, description="Throughput threshold")),
"rt_threshold": (int, Field(default=None, description="Response time threshold")),
"er_threshold": (int, Field(default=None, description="Error rate threshold")),
}
)

def _run(self, report_id: str):
try:
report, file_path = self.api_wrapper.get_report_file_name(report_id)
def _run(self, report_id=None, bucket=None, file_name=None, **kwargs):
# Validate input
if not report_id and not all([bucket, file_name]):
return self._missing_input_response()

#
from .excel_reporter import GatlingReportParser, JMeterReportParser, ExcelReporter
# Default parameters
default_parameters = self._get_default_parameters()
if not kwargs:
return self._request_parameter_confirmation(default_parameters)

# TODO get think_time, thresholds, pct from parameters
think_time = "2,0-5,0"
pct = "95Pct"
thresholds = []
carrier_report = f"https://platform.getcarrier.io/-/performance/backend/results?result_id={report_id}"
# Merge default parameters with user-provided values
parameters = {**default_parameters, **kwargs}

lg_type = report.get("lg_type")
if lg_type == "gatling":
report_file = get_latest_log_file(file_path, "simulation.log")
gatling_parser = GatlingReportParser(report_file, think_time)
result_stats_j = gatling_parser.parse()
result_stats_j['requests'].update(result_stats_j['groups'])
elif lg_type == "jmeter":
report_file = f"{file_path}/jmeter.jtl"
jmeter_parser = JMeterReportParser(report_file, think_time)
result_stats_j = jmeter_parser.parse()
try:
# Process report based on input type
if report_id:
return self._process_report_by_id(report_id, parameters)
else:
return "Unsupported type of backend report"
return self._process_report_by_file(bucket, file_name, parameters)
except Exception:
stacktrace = traceback.format_exc()
logger.error(f"Error retrieving report file: {stacktrace}")
raise ToolException(stacktrace)

def _missing_input_response(self):
"""Response when required input is missing."""
return {
"message": "Please provide report ID or bucket and .jtl/.log file name.",
"parameters": {
"report_id": None,
"bucket": None,
"file_name": None,
},
}

try:
calc_thr_j = calculate_thresholds(result_stats_j, pct, thresholds)
except Exception as e:
print(e)
calc_thr_j = []
def _get_default_parameters(self):
"""Return default parameters."""
return {
"think_time": "2,0-5,0",
"pct": "95Pct",
"tp_threshold": 10,
"rt_threshold": 500,
"er_threshold": 5,
}

excel_report_file_name = f'/tmp/reports_test_results_{report["build_id"]}_excel_report.xlsx'
excel_reporter_object = ExcelReporter(report_path=excel_report_file_name)
excel_reporter_object.prepare_headers_and_titles()
excel_reporter_object.write_to_excel(result_stats_j, carrier_report, calc_thr_j, pct)
def _request_parameter_confirmation(self, default_parameters):
"""Ask user to confirm or override default parameters."""
return {
"message": "Please confirm or override the following parameters to proceed with the report generation.",
"parameters": default_parameters,
}

bucket_name = report["name"].replace("_", "").replace(" ", "").lower()
def _process_report_by_id(self, report_id, parameters):
"""Process report using report ID."""
report, file_path = self.api_wrapper.get_report_file_name(report_id)
carrier_report = f"{self.api_wrapper.url.rstrip('/')}/-/performance/backend/results?result_id={report_id}"
lg_type = report.get("lg_type")
excel_report_file_name = f'/tmp/reports_test_results_{report["build_id"]}_excel_report.xlsx'
bucket_name = report["name"].replace("_", "").replace(" ", "").lower()

self.api_wrapper.upload_excel_report(bucket_name, excel_report_file_name)
result_stats_j = self._parse_report(file_path, lg_type, parameters["think_time"], is_absolute_file_path=True)
calc_thr_j = self._calculate_thresholds(result_stats_j, parameters)

# Clean up
return self._generate_and_upload_report(
result_stats_j, carrier_report, calc_thr_j, parameters, excel_report_file_name, bucket_name, file_path
)

import shutil
try:
shutil.rmtree(file_path)
except Exception as e:
print(e)
if os.path.exists(excel_report_file_name):
os.remove(excel_report_file_name)
def _process_report_by_file(self, bucket, file_name, parameters):
"""Process report using bucket and file name."""
file_path = self.api_wrapper.get_report_file_log(bucket, file_name)
carrier_report = "not specified"
lg_type = "jmeter" if "jtl" in file_name else "gatling"
current_date = datetime.now().strftime('%Y-%m-%d')
excel_report_file_name = f'{file_path}_{current_date}.xlsx'
bucket_name = bucket

return f"Excel report generated and uploaded to bucket {bucket_name}, report name: {excel_report_file_name.replace('/tmp/', '')}"
except Exception:
stacktrace = traceback.format_exc()
logger.error(f"Error retrieving report file: {stacktrace}")
raise ToolException(stacktrace)
result_stats_j = self._parse_report(file_path, lg_type, parameters["think_time"], is_absolute_file_path=True)
calc_thr_j = self._calculate_thresholds(result_stats_j, parameters)

return self._generate_and_upload_report(
result_stats_j, carrier_report, calc_thr_j, parameters, excel_report_file_name, bucket_name, file_path
)

def _parse_report(self, file_path, lg_type, think_time, is_absolute_file_path=False):
"""Parse the report based on its type."""
if lg_type == "gatling":
if is_absolute_file_path:
report_file = file_path
else:
report_file = get_latest_log_file(file_path, "simulation.log")
parser = GatlingReportParser(report_file, think_time)
result_stats_j = parser.parse()
result_stats_j["requests"].update(result_stats_j["groups"])
elif lg_type == "jmeter":
if is_absolute_file_path:
report_file = file_path
else:
report_file = f"{file_path}/jmeter.jtl"
parser = JMeterReportParser(report_file, think_time)
result_stats_j = parser.parse()
else:
raise ToolException("Unsupported type of backend report")
return result_stats_j

def _calculate_thresholds(self, result_stats_j, parameters):
"""Calculate thresholds."""
thresholds = {
"tp_threshold": parameters["tp_threshold"],
"rt_threshold": parameters["rt_threshold"],
"er_threshold": parameters["er_threshold"],
}
try:
return calculate_thresholds(result_stats_j, parameters["pct"], thresholds)
except Exception as e:
logger.error(e)
return []

def _generate_and_upload_report(self, result_stats_j, carrier_report, calc_thr_j, parameters, excel_report_file_name, bucket_name, file_path):
"""Generate and upload the Excel report."""
excel_reporter_object = ExcelReporter(report_path=excel_report_file_name)
excel_reporter_object.prepare_headers_and_titles()
excel_reporter_object.write_to_excel(result_stats_j, carrier_report, calc_thr_j, parameters["pct"])

self.api_wrapper.upload_excel_report(bucket_name, excel_report_file_name)

# Clean up
self._cleanup(file_path, excel_report_file_name)

excel_report = excel_report_file_name.replace('/tmp/', '')
return f"Excel report generated and uploaded to bucket {bucket_name}, " \
f"report name: {excel_report}, " \
f"link to download report from Carrier: " \
f"{self.api_wrapper.url.rstrip('/')}/api/v1/artifacts/artifact/default/{self.api_wrapper.project_id}/{bucket_name}/{excel_report}"

def _cleanup(self, file_path, excel_report_file_name):
"""Clean up temporary files."""
import shutil
try:
shutil.rmtree(file_path)
except Exception as e:
logger.error(e)
if os.path.exists(file_path):
os.remove(file_path)
if os.path.exists(excel_report_file_name):
os.remove(excel_report_file_name)
41 changes: 36 additions & 5 deletions src/alita_tools/carrier/backend_tests_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class GetTestByIDTool(BaseTool):
description: str = "Get test data from the Carrier platform."
args_schema: Type[BaseModel] = create_model(
"GetTestByIdInput",
test_id=(str, Field(default="", description="Test id to retrieve")),
test_id=(str, Field(description="Test id to retrieve")),
)

def _run(self, test_id: str):
Expand All @@ -81,10 +81,12 @@ class RunTestByIDTool(BaseTool):
args_schema: Type[BaseModel] = create_model(
"RunTestByIdInput",
test_id=(str, Field(default="", description="Test id to execute")),
test_parameters=(dict, Field(default=None, description="Test parameters to override")),
)

def _run(self, test_id: str):
def _run(self, test_id: str, test_parameters=None):
try:
# Fetch test data
tests = self.api_wrapper.get_tests_list()
test_data = {}
for test in tests:
Expand All @@ -95,10 +97,24 @@ def _run(self, test_id: str):
if not test_data:
raise ValueError(f"Test with id {test_id} not found.")

# Default test parameters
default_test_parameters = test_data.get("test_parameters", [])

# If no test_parameters are provided, return the default ones for confirmation
if test_parameters is None:
return {
"message": "Please confirm or override the following test parameters to proceed with the test execution.",
"default_test_parameters": default_test_parameters,
"instruction": "To override parameters, provide a dictionary of updated values for 'test_parameters'.",
}

# Apply user-provided test parameters
updated_test_parameters = self._apply_test_parameters(default_test_parameters, test_parameters)

# Build common_params dictionary
common_params = {
param["name"]: param
for param in test_data.get("test_parameters", [])
for param in default_test_parameters
if param["name"] in {"test_name", "test_type", "env_type"}
}

Expand All @@ -107,17 +123,32 @@ def _run(self, test_id: str):
common_params["parallel_runners"] = test_data.get("parallel_runners")
common_params["location"] = test_data.get("location")

# Build the JSON body
json_body = {
"common_params": common_params,
"test_parameters": test_data.get("test_parameters", []),
"test_parameters": updated_test_parameters,
"integrations": test_data.get("integrations", {})
}

# Execute the test
report_id = self.api_wrapper.run_test(test_id, json_body)
return f"Test started. Report id: {report_id}. Link to report:" \
f" https://platform.getcarrier.io/-/performance/backend/results?result_id={report_id}"
f"{self.api_wrapper.url.rstrip('/')}/-/performance/backend/results?result_id={report_id}"

except Exception:
stacktrace = traceback.format_exc()
logger.error(f"Test not found: {stacktrace}")
raise ToolException(stacktrace)

def _apply_test_parameters(self, default_test_parameters, user_parameters):
"""
Apply user-provided parameters to the default test parameters.
"""
updated_parameters = []
for param in default_test_parameters:
name = param["name"]
if name in user_parameters:
# Override the parameter value with the user-provided value
param["default"] = user_parameters[name]
updated_parameters.append(param)
return updated_parameters
Loading