diff --git a/test_automation_framework/.flake8 b/test_automation_framework/.flake8
new file mode 100644
index 000000000..37c03609e
--- /dev/null
+++ b/test_automation_framework/.flake8
@@ -0,0 +1,4 @@
+[flake8]
+max-line-length = 120
+ignore = E203, W503
+exclude = .git,__pycache__,build,dist
diff --git a/test_automation_framework/.gitignore b/test_automation_framework/.gitignore
new file mode 100644
index 000000000..2d08c15ef
--- /dev/null
+++ b/test_automation_framework/.gitignore
@@ -0,0 +1,5 @@
+.env
+__pycache__/
+*.pyc
+.vscode/
+.idea/
\ No newline at end of file
diff --git a/test_automation_framework/.pre-commit-config.yaml b/test_automation_framework/.pre-commit-config.yaml
new file mode 100644
index 000000000..f1d963b4e
--- /dev/null
+++ b/test_automation_framework/.pre-commit-config.yaml
@@ -0,0 +1,14 @@
+repos:
+  - repo: https://github.com/psf/black
+    rev: 25.1.0
+    hooks:
+      - id: black
+        args: [--line-length=120]
+        language_version: python3
+
+  - repo: https://github.com/pycqa/flake8
+    rev: 7.3.0
+    hooks:
+      - id: flake8
+        args: [--config=test_automation_framework/.flake8]
+        additional_dependencies: []
diff --git a/test_automation_framework/README.md b/test_automation_framework/README.md
new file mode 100644
index 000000000..b8d16ec11
--- /dev/null
+++ b/test_automation_framework/README.md
@@ -0,0 +1,47 @@
+# BadgerDoc Test Automation Framework
+
+This project is a Python-based **test automation framework** built with [pytest](https://docs.pytest.org/).
+
+## Getting Started
+
+### 1. Install PDM
+Make sure you have [PDM](https://pdm-project.org/latest/#installation) installed:
+
+```bash
+brew install pdm # macOS
+# or
+pip install pdm
+```
+
+Verify installation:
+
+```bash
+pdm --version
+```
+
+### 2. Clone the repository
+
+```bash
+git clone https://github.com/epam/badgerdoc.git
+cd badgerdoc
+```
+
+### 3. Install dependencies
+
+```bash
+pdm install
+```
+
+### 4. Pre-commit hooks
+
+Enable pre-commit to enforce style and linting:
+```bash
+pre-commit install
+```
+Now hooks will run automatically before each commit.
+
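+Optionally, add a `.env` file in the directory you run the tests from (typically `test_automation_framework/`) to override the defaults in `config/defaults.yaml`; both sources are merged by `load_settings()` in `settings.py`. The values below are placeholders only — point them at your own environment:
+
+```
+BASE_URL=http://demo.badgerdoc.com:8080
+API_USER=user@example.com
+API_PASS=changeme
+```
+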
+### 5. Run tests
+
+```bash
+pdm run pytest
+```
\ No newline at end of file
diff --git a/test_automation_framework/config/defaults.yaml b/test_automation_framework/config/defaults.yaml
new file mode 100644
index 000000000..10cd5f9f2
--- /dev/null
+++ b/test_automation_framework/config/defaults.yaml
@@ -0,0 +1,7 @@
+BASE_URL: "http://demo.badgerdoc.com:8080"
+TIMEOUT_SECONDS: 30
+MAX_WORKERS: 4
+USE_MOCK_LLM: true
+LOG_LEVEL: "INFO"
+API_USER: "user@example.com"
+API_PASS: "changeme"
diff --git a/test_automation_framework/conftest.py b/test_automation_framework/conftest.py
new file mode 100644
index 000000000..d99090344
--- /dev/null
+++ b/test_automation_framework/conftest.py
@@ -0,0 +1,150 @@
+import logging
+from logging import getLogger
+from typing import Tuple
+
+import pytest
+
+from settings import load_settings
+from helpers.auth.auth_service import AuthService
+from helpers.base_client.base_client import BaseClient
+from helpers.datasets.dataset_client import DatasetClient
+from helpers.files.file_client import FileClient
+from helpers.jobs.jobs_client import JobsClient
+from helpers.menu.menu_client import MenuClient
+from helpers.category.categories import CategoriesClient
+from helpers.users.users import UsersClient
+from helpers.reports.reports_client import ReportsClient
+from helpers.plugins.plugins_client import PluginsClient
+
+logger = getLogger(__name__)
+
+
+def pytest_configure():
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
+
+
+@pytest.fixture(scope="session")
+def settings():
+    return load_settings()
+
+
+@pytest.fixture(scope="session")
+def tenant(settings) -> str:
+    return getattr(settings, "TENANT", "demo-badgerdoc")
+
+
+@pytest.fixture(scope="session")
+def base_client(settings) -> BaseClient:
+    client = BaseClient(settings.BASE_URL, timeout=10)
+    yield client
+    client.close()
+
+
+@pytest.fixture(scope="session")
+def auth_service(base_client) -> AuthService:
+    return AuthService(base_client)
+
+
+@pytest.fixture(scope="session")
+def auth_token(auth_service, settings) -> Tuple[str, str]:
+    return auth_service.get_token(settings.API_USER, settings.API_PASS.get_secret_value())
+
+
+@pytest.fixture
+def access_token(auth_token) -> str:
+    return auth_token[0]
+
+
+@pytest.fixture
+def menu_client(settings, access_token, tenant) -> MenuClient:
+    return MenuClient(settings.BASE_URL, access_token, tenant)
+
+
+@pytest.fixture
+def dataset_client(settings, access_token, tenant) -> DatasetClient:
+    return DatasetClient(settings.BASE_URL, access_token, tenant)
+
+
+@pytest.fixture
+def file_client(settings, access_token, tenant) -> FileClient:
+    return FileClient(settings.BASE_URL, access_token, tenant)
+
+
+@pytest.fixture
+def jobs_client(settings, access_token, tenant) -> JobsClient:
+    return JobsClient(settings.BASE_URL, access_token, tenant)
+
+
+@pytest.fixture
+def reports_client(settings, access_token, tenant) -> ReportsClient:
+    return ReportsClient(settings.BASE_URL, access_token, tenant)
+
+
+@pytest.fixture
+def plugins_client(settings, access_token, tenant) -> PluginsClient:
+    return PluginsClient(settings.BASE_URL, access_token, tenant)
+
+
+@pytest.fixture
+def user_uuid(settings, access_token, tenant) -> str | None:
+    users_client = UsersClient(settings.BASE_URL, access_token, tenant)
+    users = users_client.search_users()
+    return next((u.id for u in users if u.username == "admin"), None)
+
+
+@pytest.fixture
+def categories_client(settings, access_token, tenant) -> CategoriesClient:
+    return CategoriesClient(settings.BASE_URL, access_token, tenant)
+
+
+@pytest.fixture
+def dataset_tracker(dataset_client):
+    created: list[str] = []
+    yield created, dataset_client
+    for name in created:
+        try:
+            resp = dataset_client.delete_dataset(name=name)
+            logger.info(f"[dataset_tracker] Deleted dataset {name}: {resp.get('detail')}")
+        except Exception as e:
+            logger.warning(f"[dataset_tracker] Failed to delete dataset {name}: {e}")
+
+
+@pytest.fixture
+def file_tracker(file_client):
+    created_files: list[dict] = []
+    yield created_files, file_client
+    if created_files:
+        ids = [f["id"] for f in created_files if f.get("id") is not None]
+        if ids:
+            try:
+                result = file_client.delete_files(ids)
+                logger.info(f"[file_tracker] Deleted files: {ids}, response={result}")
+            except Exception as e:
+                logger.warning(f"[file_tracker] Failed to cleanup files {ids}: {e}")
+
+
+@pytest.fixture
+def job_tracker(jobs_client):
+    created: list[dict] = []
+    yield created, jobs_client
+    for job in created:
+        job_id = job.get("id") or job.get("job_id") or (job.get("job") or {}).get("id")
+        if not job_id:
+            continue
+        try:
+            jobs_client.post("/jobs/jobs/cancel", json={"id": job_id}, headers=jobs_client._default_headers())
+            logger.info(f"[job_tracker] Cancelled job {job_id}")
+        except Exception as e:
+            logger.warning(f"[job_tracker] Could not cancel job {job_id}: {e}")
+
+
+@pytest.fixture
+def plugins_tracker(plugins_client):
+    created: list[int] = []
+    yield created, plugins_client
+    for id in created:
+        try:
+            plugins_client.delete_plugin(plugin_id=id)
+            logger.info(f"[plugins_tracker] Deleted plugin {id}")
+        except Exception as e:
+            logger.warning(f"[plugins_tracker] Failed to delete plugin {id}: {e}")
diff --git a/test_automation_framework/helpers/auth/auth_service.py b/test_automation_framework/helpers/auth/auth_service.py
new file mode 100644
index 000000000..89753c18c
--- /dev/null
+++ b/test_automation_framework/helpers/auth/auth_service.py
@@ -0,0 +1,46 @@
+from __future__ import annotations
+from typing import Optional
+from pydantic import BaseModel
+
+from helpers.base_client.base_client import BaseClient
+
+
+class TokenResponse(BaseModel):
+    access_token: str
+    refresh_token: str
+    id_token: Optional[str] = None
+    scope: Optional[str] = None
+    session_state: Optional[str] = None
+    token_type: Optional[str] = None
+    expires_in: Optional[int] = None
+
+
+class AuthService:
+    def __init__(self, client: BaseClient) -> None:
+        self.client = client
+
+    def get_token(self, username: str, password: str, client_id: str = "admin-cli") -> tuple[str, str]:
+        resp = self.client.post_json(
+            "/users/token",
+            data={
+                "grant_type": "password",
+                "username": username,
+                "password": password,
+                "client_id": client_id,
+            },
+            headers={"Content-Type": "application/x-www-form-urlencoded"},
+        )
+        result = TokenResponse.model_validate(resp)
+        return result.access_token, result.refresh_token
+
+    def refresh_token(self, refresh_token: str, client_id: str = "admin-cli") -> tuple[str, str]:
+        resp = self.client.post_json(
+            "/users/refresh_token",
+            json={
+                "grant_type": "refresh_token",
+                "client_id": client_id,
+                "refresh_token": refresh_token,
+            },
+        )
+        result = TokenResponse.model_validate(resp)
+        return result.access_token, result.refresh_token
diff --git a/test_automation_framework/helpers/base_client/base_client.py b/test_automation_framework/helpers/base_client/base_client.py
new file mode 100644
index 000000000..b21a77548
--- /dev/null
+++ b/test_automation_framework/helpers/base_client/base_client.py
@@ -0,0
+1,125 @@ +from __future__ import annotations +from typing import Any, Optional +import httpx +import time +import logging + +logger = logging.getLogger(__name__) + + +class HTTPError(RuntimeError): + def __init__( + self, + message: str, + status_code: Optional[int] = None, + body: Optional[str] = None, + ): + super().__init__(message) + self.status_code = status_code + self.body = body + + def __str__(self): + base = super().__str__() + if self.body: + return f"{base}\nResponse body: {self.body}" + return base + + +class BaseClient: + def __init__( + self, base_url: str, timeout: int = 30, token: Optional[str] = None, tenant: Optional[str] = None + ) -> None: + self.base_url = base_url.rstrip("/") + self.timeout = timeout + self._token = token + self._tenant = tenant + self._client = httpx.Client(base_url=self.base_url, timeout=self.timeout) + + def set_token(self, token: str | None) -> None: + self._token = token + + def set_tenant(self, tenant: str | None) -> None: + self._tenant = tenant + + def _default_headers(self, content_type_json: bool = False, extra: dict[str, str] | None = None) -> dict[str, str]: + headers: dict[str, str] = {} + if self._token: + headers["Authorization"] = f"Bearer {self._token}" + if self._tenant: + headers["X-Current-Tenant"] = self._tenant + if content_type_json: + headers["Content-Type"] = "application/json" + if extra: + headers.update(extra) + return headers + + def _request(self, method: str, path: str, headers: dict | None = None, **kwargs: Any) -> httpx.Response: + rel_path = path if path.startswith("/") else "/" + path + start = time.perf_counter() + merged_headers = {**self._default_headers(), **(headers or {})} + + # Log the request details for debugging + logger.debug(f"Making {method} request to {self.base_url}{rel_path}") + logger.debug(f"Headers: {merged_headers}") + if "json" in kwargs: + logger.debug(f"JSON payload: {kwargs['json']}") + + try: + resp = self._client.request(method, rel_path, headers=merged_headers, **kwargs) + resp.raise_for_status() + logger.debug( + f"HTTP {method} {self.base_url}{rel_path} -> {resp.status_code} in {time.perf_counter() - start:.3f}s" + ) + return resp + except httpx.HTTPStatusError as exc: + resp = exc.response + error_body = resp.text + logger.error( + f"Bad response: {resp.status_code} for {method} {self.base_url}{rel_path} - body: {error_body[:500]}" + ) + # Create a more informative error message + error_message = f"{method} {self.base_url}{rel_path} -> {resp.status_code}" + if error_body: + error_message += f"\nServer response: {error_body}" + + raise HTTPError( + error_message, + status_code=resp.status_code, + body=error_body, + ) from exc + except httpx.RequestError as exc: + logger.exception(f"Request failed: {method} {self.base_url}{rel_path}") + raise HTTPError(f"request failed: {method} {self.base_url}{rel_path}") from exc + + def get(self, path: str, **kwargs: Any) -> httpx.Response: + return self._request("GET", path, **kwargs) + + def post(self, path: str, **kwargs: Any) -> httpx.Response: + return self._request("POST", path, **kwargs) + + def put(self, path: str, **kwargs: Any) -> httpx.Response: + return self._request("PUT", path, **kwargs) + + def delete(self, path: str, **kwargs: Any) -> httpx.Response: + return self._request("DELETE", path, **kwargs) + + def get_json(self, path: str, headers: dict | None = None, **kwargs: Any) -> Any: + return self._request("GET", path, headers=headers, **kwargs).json() + + def post_json(self, path: str, headers: dict | None = None, **kwargs: Any) -> 
Any: + return self._request("POST", path, headers=headers, **kwargs).json() + + def put_json(self, path: str, headers: dict | None = None, **kwargs: Any) -> Any: + return self._request("PUT", path, headers=headers, **kwargs).json() + + def delete_json(self, path: str, headers: dict | None = None, **kwargs: Any) -> Any: + return self._request("DELETE", path, headers=headers, **kwargs).json() + + def close(self) -> None: + self._client.close() + + def __enter__(self) -> "BaseClient": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() diff --git a/test_automation_framework/helpers/category/categories.py b/test_automation_framework/helpers/category/categories.py new file mode 100644 index 000000000..12760608e --- /dev/null +++ b/test_automation_framework/helpers/category/categories.py @@ -0,0 +1,113 @@ +from __future__ import annotations +from typing import List, Optional +from pydantic import BaseModel +import logging +from helpers.base_client.base_client import BaseClient + +logger = logging.getLogger(__name__) + + +class CategoryParent(BaseModel): + name: str + id: str + type: str + metadata: dict + parent: Optional[str] = None + data_attributes: List[dict] = [] + is_leaf: Optional[bool] = None + + +class Category(BaseModel): + id: str + name: str + type: str + metadata: dict + parent: Optional[str] = None + data_attributes: List[dict] = [] + parents: List[CategoryParent] = [] + is_leaf: bool + + +class Pagination(BaseModel): + page_num: int + page_offset: int + page_size: int + min_pages_left: int + total: int + has_more: bool + + +class CategoriesResponse(BaseModel): + pagination: Pagination + data: List[Category] + + +class CategoryCreateResponse(BaseModel): + id: str + name: str + type: str + metadata: dict + parent: Optional[str] = None + data_attributes: list[dict] = [] + editor: Optional[str] = None + parents: Optional[list[dict]] = None + is_leaf: Optional[bool] = None + + +class CategoriesClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def search_categories( + self, + page_num: int = 1, + page_size: int = 15, + filters: list[dict] | None = None, + sorting: list[dict] | None = None, + ) -> CategoriesResponse: + payload = { + "pagination": {"page_num": page_num, "page_size": page_size}, + "filters": filters or [], + "sorting": sorting or [{"direction": "desc", "field": "name"}], + } + + resp = self.post_json( + "/annotation/categories/search", + json=payload, + headers=self._default_headers(content_type_json=True), + ) + return CategoriesResponse.model_validate(resp) + + def create_category( + self, + category_id: str, + name: str, + category_type: str = "box", + parent: str | None = None, + metadata: dict | None = None, + data_attributes: list[dict] | None = None, + ) -> CategoryCreateResponse: + payload = { + "id": category_id, + "name": name, + "type": category_type, + "parent": parent, + "metadata": metadata or {"color": "#67DE61"}, + "data_attributes": data_attributes or [], + } + resp = self.post_json( + "/annotation/categories", + json=payload, + headers=self._default_headers(content_type_json=True), + ) + return CategoryCreateResponse.model_validate(resp) + + def delete_category(self, category_id: str) -> dict: + payload = {"id": category_id} + resp = self.delete_json( + "/annotation/categories", + json=payload, + headers=self._default_headers(content_type_json=True), + ) + logger.info(f"Deleted category {category_id}") + return resp diff --git 
a/test_automation_framework/helpers/constants.py b/test_automation_framework/helpers/constants.py new file mode 100644 index 000000000..fae538694 --- /dev/null +++ b/test_automation_framework/helpers/constants.py @@ -0,0 +1,2 @@ +AIRFLOW_PIPELINE = "airflow" +PRINT_PIPELINE = "print" diff --git a/test_automation_framework/helpers/datasets/dataset_client.py b/test_automation_framework/helpers/datasets/dataset_client.py new file mode 100644 index 000000000..653213a6e --- /dev/null +++ b/test_automation_framework/helpers/datasets/dataset_client.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from helpers.base_client.base_client import BaseClient +import logging + +logger = logging.getLogger(__name__) + + +class DatasetClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def search( + self, + page_num: int = 1, + page_size: int = 100, + filters: list[dict] | None = None, + sorting: list[dict] | None = None, + ) -> dict: + payload = { + "pagination": {"page_num": page_num, "page_size": page_size}, + "filters": filters or [], + "sorting": sorting or [{"direction": "asc", "field": "name"}], + } + return self.post_json( + "/assets/datasets/search", json=payload, headers=self._default_headers(content_type_json=True) + ) + + def search_files( + self, + dataset_id: int | None = None, + page_num: int = 1, + page_size: int = 15, + ) -> dict: + filters = [] + if dataset_id is not None: + filters.append({"field": "datasets.id", "operator": "eq", "value": dataset_id}) + else: + filters.append({"field": "original_name", "operator": "ilike", "value": "%%"}) + + payload = { + "pagination": {"page_num": page_num, "page_size": page_size}, + "filters": filters, + "sorting": [{"direction": "desc", "field": "last_modified"}], + } + + return self.post_json( + "/assets/files/search", json=payload, headers=self._default_headers(content_type_json=True) + ) + + def create_dataset(self, name: str) -> dict: + payload = {"name": name} + resp = self.post_json("/assets/datasets", json=payload, headers=self._default_headers(content_type_json=True)) + logger.info(f"Created dataset {name}") + return resp + + def delete_dataset(self, name: str) -> dict: + payload = {"name": name} + resp = self.delete_json("/assets/datasets", json=payload, headers=self._default_headers(content_type_json=True)) + logger.info(f"Deleted dataset {name}") + return resp diff --git a/test_automation_framework/helpers/files/file_client.py b/test_automation_framework/helpers/files/file_client.py new file mode 100644 index 000000000..ca7a33b95 --- /dev/null +++ b/test_automation_framework/helpers/files/file_client.py @@ -0,0 +1,93 @@ +from __future__ import annotations +from helpers.base_client.base_client import BaseClient +import logging +from typing import List +import shutil +import uuid +from pathlib import Path +import httpx +from helpers.base_client.base_client import HTTPError + +logger = logging.getLogger(__name__) + + +class FileClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def upload_file(self, file_path: str) -> dict: + with open(file_path, "rb") as f: + files = {"files": (file_path.split("/")[-1], f, "application/pdf")} + resp = self.post("/assets/files", files=files, headers=self._default_headers()) + logger.info(f"Uploaded file {file_path}") + return resp.json() + + def delete_files(self, ids: List[int]) -> dict: + resp = 
self.delete_json( + "/assets/files", + json={"objects": ids}, + headers=self._default_headers(content_type_json=True), + ) + logger.info(f"Deleted file {ids}") + return resp + + def search_files( + self, + page_num: int = 1, + page_size: int = 15, + filters: list[dict] | None = None, + ) -> dict: + payload = { + "pagination": {"page_num": page_num, "page_size": page_size}, + "filters": filters or [{"field": "original_name", "operator": "ilike", "value": "%%"}], + "sorting": [{"direction": "desc", "field": "last_modified"}], + } + return self.post_json( + "/assets/files/search", json=payload, headers=self._default_headers(content_type_json=True) + ) + + def move_files(self, name: str, objects: list) -> dict: + payload = {"name": name, "objects": objects} + resp = self.post_json( + "/assets/datasets/bonds", json=payload, headers=self._default_headers(content_type_json=True) + ) + logger.info(f"Moved object {objects} to the dataset {name}") + return resp + + @staticmethod + def upload_temp_file(client, file_tracker, tmp_path, suffix="pdf"): + data_dir = Path(__file__).parent.parent.parent / "data" + original_file = data_dir / "multivitamin.pdf" + unique_name = f"{uuid.uuid4().hex}.{suffix}" + temp_file = tmp_path / unique_name + shutil.copy(original_file, temp_file) + result = client.upload_file(str(temp_file)) + file_info = result[0] + assert file_info["status"] is True + file_tracker[0].append(file_info) + return file_info, temp_file + + def download_file(self, file_id: int) -> bytes: + resp = self._client.get( + f"{self.base_url}/assets/download?file_id={file_id}", + headers=self._default_headers(), + follow_redirects=False, + ) + + if resp.status_code >= 400: + raise HTTPError( + f"GET {resp.request.url} -> {resp.status_code}", + status_code=resp.status_code, + body=resp.text, + ) + + if resp.status_code == 302 and "location" in resp.headers: + s3_resp = httpx.get(resp.headers["location"]) + s3_resp.raise_for_status() + return s3_resp.content + + raise HTTPError( + f"Unexpected response {resp.status_code} for file_id={file_id}", + status_code=resp.status_code, + body=resp.text, + ) diff --git a/test_automation_framework/helpers/jobs/jobs_client.py b/test_automation_framework/helpers/jobs/jobs_client.py new file mode 100644 index 000000000..632f3d7ce --- /dev/null +++ b/test_automation_framework/helpers/jobs/jobs_client.py @@ -0,0 +1,107 @@ +from __future__ import annotations +from typing import Any, Dict, List +import time +import logging +from helpers.constants import AIRFLOW_PIPELINE, PRINT_PIPELINE + +from helpers.base_client.base_client import BaseClient + +logger = logging.getLogger(__name__) + + +class JobsClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def get_supported_pipelines(self) -> List[Dict[str, Any]]: + return self.get_json("/jobs/pipelines/support", headers=self._default_headers()) + + def get_pipeline(self, engine_resource: str) -> Dict[str, Any]: + return self.get_json(f"/jobs/pipelines/{engine_resource}", headers=self._default_headers()) + + def create_job( + self, + name: str, + file_ids: list[int], + owners: list[str], + pipeline_id: str = PRINT_PIPELINE, + pipeline_engine: str = AIRFLOW_PIPELINE, + datasets: list[int] | None = None, + categories: list[str] | None = None, + annotators: list[str] | None = None, + validators: list[str] | None = None, + previous_jobs: list[int] | None = None, + revisions: list[int] | None = None, + is_draft: bool = False, + 
is_auto_distribution: bool = False, + start_manual_job_automatically: bool = False, + job_type: str = "ExtractionJob", + pipeline_name: str | None = None, + ): + payload = { + "name": name, + "revisions": revisions or [], + "datasets": datasets or [], + "files": file_ids, + "previous_jobs": previous_jobs or [], + "type": job_type, + "is_draft": is_draft, + "is_auto_distribution": is_auto_distribution, + "start_manual_job_automatically": start_manual_job_automatically, + "categories": categories or [], + "owners": owners or [], + "annotators": annotators or [], + "validators": validators or [], + "pipeline_name": pipeline_name or pipeline_id, + "pipeline_id": pipeline_id, + "pipeline_engine": pipeline_engine, + } + + return self.post_json( + "/jobs/jobs/create_job", json=payload, headers=self._default_headers(content_type_json=True) + ) + + def get_job(self, job_id: int) -> Dict[str, Any]: + return self.get_json(f"/jobs/jobs/{job_id}", headers=self._default_headers()) + + def get_progress(self, job_id: int) -> Dict[str, Any]: + return self.post_json( + "/jobs/jobs/progress", json=[job_id], headers=self._default_headers(content_type_json=True) + ) + + def poll_until_finished( + self, + job_id: int, + timeout_seconds: int = 120, + interval_seconds: float = 1.0, + backoff_factor: float = 1.5, + ) -> Dict[str, Any]: + start = time.monotonic() + current_interval = interval_seconds + + logger.info(f"Polling job {job_id} until finished (timeout {timeout_seconds}s)") + while True: + elapsed = time.monotonic() - start + if elapsed > timeout_seconds: + raise TimeoutError(f"Job {job_id} not finished after {timeout_seconds}s") + job_obj = self.get_job(job_id) + status = job_obj.get("status") or job_obj.get("data", {}).get("status") + logger.info(f"Polled job {job_id} status: {status}") + + if status and str(status).lower() in {"finished", "success", "completed"}: + logger.info(f"Job {job_id} finished with status={status}") + return job_obj + try: + progress = self.get_progress(job_id) + if isinstance(progress, dict): + for k, v in progress.items(): + if str(k) == str(job_id) and isinstance(v, dict): + fin = v.get("finished") + tot = v.get("total") + if fin is not None and tot is not None and fin >= tot: + logger.info("Progress shows job finished (finished>=total)") + return self.get_job(job_id) + except Exception: + logger.debug(f"Progress probe failed for job {job_id}; will retry") + time.sleep(current_interval) + current_interval = min(current_interval * backoff_factor, 10.0) diff --git a/test_automation_framework/helpers/menu/menu_client.py b/test_automation_framework/helpers/menu/menu_client.py new file mode 100644 index 000000000..9081dbc88 --- /dev/null +++ b/test_automation_framework/helpers/menu/menu_client.py @@ -0,0 +1,10 @@ +from __future__ import annotations +from helpers.base_client.base_client import BaseClient + + +class MenuClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def get_menu(self) -> list[dict]: + return self.get_json("/core/menu", headers=self._default_headers()) diff --git a/test_automation_framework/helpers/plugins/plugins_client.py b/test_automation_framework/helpers/plugins/plugins_client.py new file mode 100644 index 000000000..9f4d31f04 --- /dev/null +++ b/test_automation_framework/helpers/plugins/plugins_client.py @@ -0,0 +1,73 @@ +from typing import Any, Dict, List +import logging +from helpers.base_client.base_client import BaseClient + + +logger = 
logging.getLogger(__name__) + + +class PluginsClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def get_plugins(self) -> List[Dict[str, Any]]: + return self.get_json("/core/plugins", headers=self._default_headers()) + + def create_plugin( + self, + name: str, + menu_name: str, + url: str, + version: str = "1", + description: str = "", + is_iframe: bool = True, + ) -> dict: + payload = { + "name": name, + "menu_name": menu_name, + "description": description, + "version": version, + "url": url, + "is_iframe": is_iframe, + } + + headers = self._default_headers(content_type_json=True) + headers.update( + { + "Accept": "*/*", + "Accept-Encoding": "gzip, deflate", + "Accept-Language": "en-US,en;q=0.5", + "Connection": "keep-alive", + "DNT": "1", + "Origin": "http://demo.badgerdoc.com:8083", + "Priority": "u=0", + "Referer": "http://demo.badgerdoc.com:8083/", + "Sec-GPC": "1", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:138.0) Gecko/20100101 Firefox/138.0", + } + ) + + try: + return self.post_json( + "/core/plugins", + json=payload, + headers=headers, + ) + except Exception as e: + logger.error(f"Failed to create plugin: {e}") + if hasattr(e, "body"): + logger.error(f"Response body: {e.body}") + raise + + def update_plugin(self, plugin_id: int, **fields) -> dict: + return self.put_json( + f"/core/plugins/{plugin_id}", + json=fields, + headers=self._default_headers(content_type_json=True), + ) + + def delete_plugin(self, plugin_id: int) -> dict: + return self.delete_json( + f"/core/plugins/{plugin_id}", + headers=self._default_headers(content_type_json=True), + ) diff --git a/test_automation_framework/helpers/reports/reports_client.py b/test_automation_framework/helpers/reports/reports_client.py new file mode 100644 index 000000000..69ea929ce --- /dev/null +++ b/test_automation_framework/helpers/reports/reports_client.py @@ -0,0 +1,30 @@ +from typing import List +import logging +from helpers.base_client.base_client import BaseClient + +logger = logging.getLogger(__name__) + + +class ReportsClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def export_tasks( + self, + user_ids: List[str], + date_from: str, + date_to: str, + ) -> str: + payload = { + "user_ids": user_ids, + "date_from": date_from, + "date_to": date_to, + } + resp = self.post( + "/annotation/tasks/export", + json=payload, + headers=self._default_headers(content_type_json=True), + ) + resp.raise_for_status() + logger.info(f"Exported tasks for users={user_ids} from {date_from} to {date_to}") + return resp.text diff --git a/test_automation_framework/helpers/users/users.py b/test_automation_framework/helpers/users/users.py new file mode 100644 index 000000000..3222d371f --- /dev/null +++ b/test_automation_framework/helpers/users/users.py @@ -0,0 +1,37 @@ +from __future__ import annotations +from typing import Any, Dict, List, Optional +from pydantic import BaseModel + +from helpers.base_client.base_client import BaseClient + + +class UserAccess(BaseModel): + manageGroupMembership: bool + view: bool + mapRoles: bool + impersonate: bool + manage: bool + + +class UserResponse(BaseModel): + id: str + username: str + enabled: bool + email: Optional[str] = None + emailVerified: Optional[bool] = None + firstName: Optional[str] = None + lastName: Optional[str] = None + attributes: Optional[Dict[str, Any]] = None + access: 
Optional[UserAccess] = None + + +class UsersClient(BaseClient): + def __init__(self, base_url: str, token: str, tenant: str) -> None: + super().__init__(base_url, token=token, tenant=tenant) + + def search_users(self, filters: Optional[List[Dict[str, Any]]] = None) -> List[UserResponse]: + payload = {"filters": filters or []} + resp = self.post_json( + "/users/users/search", json=payload, headers=self._default_headers(content_type_json=True) + ) + return [UserResponse.model_validate(u) for u in resp] diff --git a/test_automation_framework/pdm.lock b/test_automation_framework/pdm.lock new file mode 100644 index 000000000..d597e7a73 --- /dev/null +++ b/test_automation_framework/pdm.lock @@ -0,0 +1,425 @@ +# This file is @generated by PDM. +# It is not intended for manual editing. + +[metadata] +groups = ["default"] +strategy = ["inherit_metadata"] +lock_version = "4.5.0" +content_hash = "sha256:d56b4fa3df2a34dc34169a0a8ae56c73e69c5bec37976962b9e4dfc446248a24" + +[[metadata.targets]] +requires_python = "==3.13.*" + +[[package]] +name = "annotated-types" +version = "0.7.0" +requires_python = ">=3.8" +summary = "Reusable constraint types to use with typing.Annotated" +groups = ["default"] +dependencies = [ + "typing-extensions>=4.0.0; python_version < \"3.9\"", +] +files = [ + {file = "annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53"}, + {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, +] + +[[package]] +name = "anyio" +version = "4.10.0" +requires_python = ">=3.9" +summary = "High-level concurrency and networking framework on top of asyncio or Trio" +groups = ["default"] +dependencies = [ + "exceptiongroup>=1.0.2; python_version < \"3.11\"", + "idna>=2.8", + "sniffio>=1.1", + "typing-extensions>=4.5; python_version < \"3.13\"", +] +files = [ + {file = "anyio-4.10.0-py3-none-any.whl", hash = "sha256:60e474ac86736bbfd6f210f7a61218939c318f43f9972497381f1c5e930ed3d1"}, + {file = "anyio-4.10.0.tar.gz", hash = "sha256:3f3fae35c96039744587aa5b8371e7e8e603c0702999535961dd336026973ba6"}, +] + +[[package]] +name = "certifi" +version = "2025.8.3" +requires_python = ">=3.7" +summary = "Python package for providing Mozilla's CA Bundle." +groups = ["default"] +files = [ + {file = "certifi-2025.8.3-py3-none-any.whl", hash = "sha256:f6c12493cfb1b06ba2ff328595af9350c65d6644968e5d3a2ffd78699af217a5"}, + {file = "certifi-2025.8.3.tar.gz", hash = "sha256:e564105f78ded564e3ae7c923924435e1daa7463faeab5bb932bc53ffae63407"}, +] + +[[package]] +name = "cfgv" +version = "3.4.0" +requires_python = ">=3.8" +summary = "Validate configuration and produce human readable error messages." +groups = ["default"] +files = [ + {file = "cfgv-3.4.0-py2.py3-none-any.whl", hash = "sha256:b7265b1f29fd3316bfcd2b330d63d024f2bfd8bcb8b0272f8e19a504856c48f9"}, + {file = "cfgv-3.4.0.tar.gz", hash = "sha256:e52591d4c5f5dead8e0f673fb16db7949d2cfb3f7da4582893288f0ded8fe560"}, +] + +[[package]] +name = "colorama" +version = "0.4.6" +requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +summary = "Cross-platform colored terminal text." 
+groups = ["default"] +marker = "sys_platform == \"win32\"" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[[package]] +name = "distlib" +version = "0.4.0" +summary = "Distribution utilities" +groups = ["default"] +files = [ + {file = "distlib-0.4.0-py2.py3-none-any.whl", hash = "sha256:9659f7d87e46584a30b5780e43ac7a2143098441670ff0a49d5f9034c54a6c16"}, + {file = "distlib-0.4.0.tar.gz", hash = "sha256:feec40075be03a04501a973d81f633735b4b69f98b05450592310c0f401a4e0d"}, +] + +[[package]] +name = "dotenv" +version = "0.9.9" +summary = "Deprecated package" +groups = ["default"] +dependencies = [ + "python-dotenv", +] +files = [ + {file = "dotenv-0.9.9-py2.py3-none-any.whl", hash = "sha256:29cf74a087b31dafdb5a446b6d7e11cbce8ed2741540e2339c69fbef92c94ce9"}, +] + +[[package]] +name = "filelock" +version = "3.19.1" +requires_python = ">=3.9" +summary = "A platform independent file lock." +groups = ["default"] +files = [ + {file = "filelock-3.19.1-py3-none-any.whl", hash = "sha256:d38e30481def20772f5baf097c122c3babc4fcdb7e14e57049eb9d88c6dc017d"}, + {file = "filelock-3.19.1.tar.gz", hash = "sha256:66eda1888b0171c998b35be2bcc0f6d75c388a7ce20c3f3f37aa8e96c2dddf58"}, +] + +[[package]] +name = "h11" +version = "0.16.0" +requires_python = ">=3.8" +summary = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +groups = ["default"] +files = [ + {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"}, + {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"}, +] + +[[package]] +name = "httpcore" +version = "1.0.9" +requires_python = ">=3.8" +summary = "A minimal low-level HTTP client." +groups = ["default"] +dependencies = [ + "certifi", + "h11>=0.16", +] +files = [ + {file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"}, + {file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"}, +] + +[[package]] +name = "httpx" +version = "0.28.1" +requires_python = ">=3.8" +summary = "The next generation HTTP client." 
+groups = ["default"] +dependencies = [ + "anyio", + "certifi", + "httpcore==1.*", + "idna", +] +files = [ + {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, + {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, +] + +[[package]] +name = "identify" +version = "2.6.13" +requires_python = ">=3.9" +summary = "File identification library for Python" +groups = ["default"] +files = [ + {file = "identify-2.6.13-py2.py3-none-any.whl", hash = "sha256:60381139b3ae39447482ecc406944190f690d4a2997f2584062089848361b33b"}, + {file = "identify-2.6.13.tar.gz", hash = "sha256:da8d6c828e773620e13bfa86ea601c5a5310ba4bcd65edf378198b56a1f9fb32"}, +] + +[[package]] +name = "idna" +version = "3.10" +requires_python = ">=3.6" +summary = "Internationalized Domain Names in Applications (IDNA)" +groups = ["default"] +files = [ + {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, + {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, +] + +[[package]] +name = "iniconfig" +version = "2.1.0" +requires_python = ">=3.8" +summary = "brain-dead simple config-ini parsing" +groups = ["default"] +files = [ + {file = "iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760"}, + {file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"}, +] + +[[package]] +name = "nodeenv" +version = "1.9.1" +requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +summary = "Node.js virtual environment builder" +groups = ["default"] +files = [ + {file = "nodeenv-1.9.1-py2.py3-none-any.whl", hash = "sha256:ba11c9782d29c27c70ffbdda2d7415098754709be8a7056d79a737cd901155c9"}, + {file = "nodeenv-1.9.1.tar.gz", hash = "sha256:6ec12890a2dab7946721edbfbcd91f3319c6ccc9aec47be7c7e6b7011ee6645f"}, +] + +[[package]] +name = "packaging" +version = "25.0" +requires_python = ">=3.8" +summary = "Core utilities for Python packages" +groups = ["default"] +files = [ + {file = "packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484"}, + {file = "packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f"}, +] + +[[package]] +name = "platformdirs" +version = "4.3.8" +requires_python = ">=3.9" +summary = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." +groups = ["default"] +files = [ + {file = "platformdirs-4.3.8-py3-none-any.whl", hash = "sha256:ff7059bb7eb1179e2685604f4aaf157cfd9535242bd23742eadc3c13542139b4"}, + {file = "platformdirs-4.3.8.tar.gz", hash = "sha256:3d512d96e16bcb959a814c9f348431070822a6496326a4be0911c40b5a74c2bc"}, +] + +[[package]] +name = "pluggy" +version = "1.6.0" +requires_python = ">=3.9" +summary = "plugin and hook calling mechanisms for python" +groups = ["default"] +files = [ + {file = "pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746"}, + {file = "pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3"}, +] + +[[package]] +name = "pre-commit" +version = "4.3.0" +requires_python = ">=3.9" +summary = "A framework for managing and maintaining multi-language pre-commit hooks." 
+groups = ["default"] +dependencies = [ + "cfgv>=2.0.0", + "identify>=1.0.0", + "nodeenv>=0.11.1", + "pyyaml>=5.1", + "virtualenv>=20.10.0", +] +files = [ + {file = "pre_commit-4.3.0-py2.py3-none-any.whl", hash = "sha256:2b0747ad7e6e967169136edffee14c16e148a778a54e4f967921aa1ebf2308d8"}, + {file = "pre_commit-4.3.0.tar.gz", hash = "sha256:499fe450cc9d42e9d58e606262795ecb64dd05438943c62b66f6a8673da30b16"}, +] + +[[package]] +name = "pydantic" +version = "2.11.7" +requires_python = ">=3.9" +summary = "Data validation using Python type hints" +groups = ["default"] +dependencies = [ + "annotated-types>=0.6.0", + "pydantic-core==2.33.2", + "typing-extensions>=4.12.2", + "typing-inspection>=0.4.0", +] +files = [ + {file = "pydantic-2.11.7-py3-none-any.whl", hash = "sha256:dde5df002701f6de26248661f6835bbe296a47bf73990135c7d07ce741b9623b"}, + {file = "pydantic-2.11.7.tar.gz", hash = "sha256:d989c3c6cb79469287b1569f7447a17848c998458d49ebe294e975b9baf0f0db"}, +] + +[[package]] +name = "pydantic-core" +version = "2.33.2" +requires_python = ">=3.9" +summary = "Core functionality for Pydantic validation and serialization" +groups = ["default"] +dependencies = [ + "typing-extensions!=4.7.0,>=4.6.0", +] +files = [ + {file = "pydantic_core-2.33.2-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:1082dd3e2d7109ad8b7da48e1d4710c8d06c253cbc4a27c1cff4fbcaa97a9e3f"}, + {file = "pydantic_core-2.33.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:f517ca031dfc037a9c07e748cefd8d96235088b83b4f4ba8939105d20fa1dcd6"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0a9f2c9dd19656823cb8250b0724ee9c60a82f3cdf68a080979d13092a3b0fef"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:2b0a451c263b01acebe51895bfb0e1cc842a5c666efe06cdf13846c7418caa9a"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1ea40a64d23faa25e62a70ad163571c0b342b8bf66d5fa612ac0dec4f069d916"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0fb2d542b4d66f9470e8065c5469ec676978d625a8b7a363f07d9a501a9cb36a"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9fdac5d6ffa1b5a83bca06ffe7583f5576555e6c8b3a91fbd25ea7780f825f7d"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:04a1a413977ab517154eebb2d326da71638271477d6ad87a769102f7c2488c56"}, + {file = "pydantic_core-2.33.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:c8e7af2f4e0194c22b5b37205bfb293d166a7344a5b0d0eaccebc376546d77d5"}, + {file = "pydantic_core-2.33.2-cp313-cp313-musllinux_1_1_armv7l.whl", hash = "sha256:5c92edd15cd58b3c2d34873597a1e20f13094f59cf88068adb18947df5455b4e"}, + {file = "pydantic_core-2.33.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:65132b7b4a1c0beded5e057324b7e16e10910c106d43675d9bd87d4f38dde162"}, + {file = "pydantic_core-2.33.2-cp313-cp313-win32.whl", hash = "sha256:52fb90784e0a242bb96ec53f42196a17278855b0f31ac7c3cc6f5c1ec4811849"}, + {file = "pydantic_core-2.33.2-cp313-cp313-win_amd64.whl", hash = "sha256:c083a3bdd5a93dfe480f1125926afcdbf2917ae714bdb80b36d34318b2bec5d9"}, + {file = "pydantic_core-2.33.2-cp313-cp313-win_arm64.whl", hash = "sha256:e80b087132752f6b3d714f041ccf74403799d3b23a72722ea2e6ba2e892555b9"}, + {file = "pydantic_core-2.33.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = 
"sha256:61c18fba8e5e9db3ab908620af374db0ac1baa69f0f32df4f61ae23f15e586ac"}, + {file = "pydantic_core-2.33.2-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:95237e53bb015f67b63c91af7518a62a8660376a6a0db19b89acc77a4d6199f5"}, + {file = "pydantic_core-2.33.2-cp313-cp313t-win_amd64.whl", hash = "sha256:c2fc0a768ef76c15ab9238afa6da7f69895bb5d1ee83aeea2e3509af4472d0b9"}, + {file = "pydantic_core-2.33.2.tar.gz", hash = "sha256:7cb8bc3605c29176e1b105350d2e6474142d7c1bd1d9327c4a9bdb46bf827acc"}, +] + +[[package]] +name = "pydantic-settings" +version = "2.10.1" +requires_python = ">=3.9" +summary = "Settings management using Pydantic" +groups = ["default"] +dependencies = [ + "pydantic>=2.7.0", + "python-dotenv>=0.21.0", + "typing-inspection>=0.4.0", +] +files = [ + {file = "pydantic_settings-2.10.1-py3-none-any.whl", hash = "sha256:a60952460b99cf661dc25c29c0ef171721f98bfcb52ef8d9ea4c943d7c8cc796"}, + {file = "pydantic_settings-2.10.1.tar.gz", hash = "sha256:06f0062169818d0f5524420a360d632d5857b83cffd4d42fe29597807a1614ee"}, +] + +[[package]] +name = "pygments" +version = "2.19.2" +requires_python = ">=3.8" +summary = "Pygments is a syntax highlighting package written in Python." +groups = ["default"] +files = [ + {file = "pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b"}, + {file = "pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887"}, +] + +[[package]] +name = "pytest" +version = "8.4.1" +requires_python = ">=3.9" +summary = "pytest: simple powerful testing with Python" +groups = ["default"] +dependencies = [ + "colorama>=0.4; sys_platform == \"win32\"", + "exceptiongroup>=1; python_version < \"3.11\"", + "iniconfig>=1", + "packaging>=20", + "pluggy<2,>=1.5", + "pygments>=2.7.2", + "tomli>=1; python_version < \"3.11\"", +] +files = [ + {file = "pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7"}, + {file = "pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c"}, +] + +[[package]] +name = "python-dotenv" +version = "1.1.1" +requires_python = ">=3.9" +summary = "Read key-value pairs from a .env file and set them as environment variables" +groups = ["default"] +files = [ + {file = "python_dotenv-1.1.1-py3-none-any.whl", hash = "sha256:31f23644fe2602f88ff55e1f5c79ba497e01224ee7737937930c448e4d0e24dc"}, + {file = "python_dotenv-1.1.1.tar.gz", hash = "sha256:a8a6399716257f45be6a007360200409fce5cda2661e3dec71d23dc15f6189ab"}, +] + +[[package]] +name = "pyyaml" +version = "6.0.2" +requires_python = ">=3.8" +summary = "YAML parser and emitter for Python" +groups = ["default"] +files = [ + {file = "PyYAML-6.0.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:efdca5630322a10774e8e98e1af481aad470dd62c3170801852d752aa7a783ba"}, + {file = "PyYAML-6.0.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:50187695423ffe49e2deacb8cd10510bc361faac997de9efef88badc3bb9e2d1"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0ffe8360bab4910ef1b9e87fb812d8bc0a308b0d0eef8c8f44e0254ab3b07133"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:17e311b6c678207928d649faa7cb0d7b4c26a0ba73d41e99c4fff6b6c3276484"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:70b189594dbe54f75ab3a1acec5f1e3faa7e8cf2f1e08d9b561cb41b845f69d5"}, + {file = "PyYAML-6.0.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:41e4e3953a79407c794916fa277a82531dd93aad34e29c2a514c2c0c5fe971cc"}, + {file = "PyYAML-6.0.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:68ccc6023a3400877818152ad9a1033e3db8625d899c72eacb5a668902e4d652"}, + {file = "PyYAML-6.0.2-cp313-cp313-win32.whl", hash = "sha256:bc2fa7c6b47d6bc618dd7fb02ef6fdedb1090ec036abab80d4681424b84c1183"}, + {file = "PyYAML-6.0.2-cp313-cp313-win_amd64.whl", hash = "sha256:8388ee1976c416731879ac16da0aff3f63b286ffdd57cdeb95f3f2e085687563"}, + {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"}, +] + +[[package]] +name = "sniffio" +version = "1.3.1" +requires_python = ">=3.7" +summary = "Sniff out which async library your code is running under" +groups = ["default"] +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + +[[package]] +name = "typing-extensions" +version = "4.14.1" +requires_python = ">=3.9" +summary = "Backported and Experimental Type Hints for Python 3.9+" +groups = ["default"] +files = [ + {file = "typing_extensions-4.14.1-py3-none-any.whl", hash = "sha256:d1e1e3b58374dc93031d6eda2420a48ea44a36c2b4766a4fdeb3710755731d76"}, + {file = "typing_extensions-4.14.1.tar.gz", hash = "sha256:38b39f4aeeab64884ce9f74c94263ef78f3c22467c8724005483154c26648d36"}, +] + +[[package]] +name = "typing-inspection" +version = "0.4.1" +requires_python = ">=3.9" +summary = "Runtime typing introspection tools" +groups = ["default"] +dependencies = [ + "typing-extensions>=4.12.0", +] +files = [ + {file = "typing_inspection-0.4.1-py3-none-any.whl", hash = "sha256:389055682238f53b04f7badcb49b989835495a96700ced5dab2d8feae4b26f51"}, + {file = "typing_inspection-0.4.1.tar.gz", hash = "sha256:6ae134cc0203c33377d43188d4064e9b357dba58cff3185f22924610e70a9d28"}, +] + +[[package]] +name = "virtualenv" +version = "20.34.0" +requires_python = ">=3.8" +summary = "Virtual Python Environment builder" +groups = ["default"] +dependencies = [ + "distlib<1,>=0.3.7", + "filelock<4,>=3.12.2", + "importlib-metadata>=6.6; python_version < \"3.8\"", + "platformdirs<5,>=3.9.1", + "typing-extensions>=4.13.2; python_version < \"3.11\"", +] +files = [ + {file = "virtualenv-20.34.0-py3-none-any.whl", hash = "sha256:341f5afa7eee943e4984a9207c025feedd768baff6753cd660c857ceb3e36026"}, + {file = "virtualenv-20.34.0.tar.gz", hash = "sha256:44815b2c9dee7ed86e387b842a84f20b93f7f417f95886ca1996a72a4138eb1a"}, +] diff --git a/test_automation_framework/pyproject.toml b/test_automation_framework/pyproject.toml new file mode 100644 index 000000000..67db4bda5 --- /dev/null +++ b/test_automation_framework/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "badgerdoc_taf" +version = "0.1.0" +description = "Default template for PDM package" +authors = [ + {name = "asobolev", email = "aleksei_sobolev@epam.com"}, +] +dependencies = ["PyYAML==6.0.2", "dotenv==0.9.9", "httpx==0.28.1", "pre-commit==4.3.0", "pydantic-settings==2.10.1", "pydantic==2.11.7", "pytest==8.4.1"] +requires-python = "==3.13.*" +readme = "README.md" +license = {text = "MIT"} + + +[tool.pdm] +distribution = false diff --git a/test_automation_framework/settings.py b/test_automation_framework/settings.py new file mode 100644 index 
000000000..13076ce2f --- /dev/null +++ b/test_automation_framework/settings.py @@ -0,0 +1,36 @@ +import yaml +from pathlib import Path +from pydantic_settings import BaseSettings, SettingsConfigDict +from pydantic import SecretStr + +ROOT = Path(__file__).parent +DEFAULTS_PATH = ROOT / "config" / "defaults.yaml" + + +class Settings(BaseSettings): + BASE_URL: str + API_USER: str + API_PASS: SecretStr + TIMEOUT_SECONDS: int = 30 + MAX_WORKERS: int = 4 + USE_MOCK_LLM: bool = True + LOG_LEVEL: str = "INFO" + LLM_API_KEY: str | None = None + + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") + + +def load_settings() -> Settings: + with open(DEFAULTS_PATH, "r") as f: + yaml_defaults = yaml.safe_load(f) + + from dotenv import dotenv_values + + env_data = dotenv_values(".env") + + merged = { + **yaml_defaults, + **{k: v for k, v in env_data.items() if v is not None}, + } + + return Settings(**merged) diff --git a/test_automation_framework/tests/test_base_api.py b/test_automation_framework/tests/test_base_api.py new file mode 100644 index 000000000..b4a2f9b0d --- /dev/null +++ b/test_automation_framework/tests/test_base_api.py @@ -0,0 +1,488 @@ +from logging import getLogger +from datetime import datetime, timedelta +import uuid + +import pytest + + +from helpers.base_client.base_client import HTTPError + +logger = getLogger(__name__) + + +class TestAuthAPI: + def test_basic_auth(self, auth_token): + access_token, refresh_token = auth_token + assert access_token + assert refresh_token + + def test_wrong_creds(self, auth_service): + with pytest.raises(HTTPError) as exc: + auth_service.get_token("wrong", "wrong") + assert exc.value.status_code == 401 + + def test_refresh_token(self, auth_token, auth_service): + access_token, refresh_token = auth_token + new_access, new_refresh = auth_service.refresh_token(refresh_token=refresh_token) + assert new_access != access_token + assert new_refresh != refresh_token + + +class TestAPI: + def test_menu(self, menu_client): + menu = menu_client.get_menu() + assert isinstance(menu, list) + assert menu + required_keys = {"name", "badgerdoc_path", "is_external", "is_iframe", "url", "children"} + for item in menu: + assert required_keys <= item.keys() + first_item = menu[0] + assert isinstance(first_item["name"], str) + assert isinstance(first_item["badgerdoc_path"], str) + assert isinstance(first_item["is_external"], bool) + assert isinstance(first_item["children"], (list, type(None))) + expected_names = {"Documents", "My Tasks", "Jobs", "Settings"} + actual_names = {item["name"] for item in menu} + assert expected_names <= actual_names + settings_item = next(i for i in menu if i["name"] == "Settings") + assert isinstance(settings_item["children"], list) + assert any(child["name"] == "Keycloak" for child in settings_item["children"]) + + +class TestDatasets: + def test_clear_search_for_datasets(self, dataset_client): + result = dataset_client.search() + assert "pagination" in result + assert "data" in result + assert isinstance(result["data"], list) + pagination = result["pagination"] + required_pagination_keys = {"page_num", "page_offset", "page_size", "min_pages_left", "total", "has_more"} + assert required_pagination_keys <= pagination.keys() + for dataset in result["data"]: + required_dataset_keys = {"id", "name", "count", "created"} + assert required_dataset_keys <= dataset.keys() + assert isinstance(dataset["id"], int) + assert isinstance(dataset["name"], str) + assert isinstance(dataset["count"], int) + 
datetime.fromisoformat(dataset["created"]) + + def test_search_sorting(self, dataset_client): + result = dataset_client.search(sorting=[{"direction": "desc", "field": "name"}]) + names = [d["name"] for d in result["data"]] + assert names == sorted(names, reverse=True) + + def test_search_pagination(self, dataset_client): + result = dataset_client.search(page_num=1, page_size=15) + assert len(result["data"]) <= 15 + assert result["pagination"]["page_num"] == 1 + + def test_selection(self, dataset_client): + datasets = dataset_client.search()["data"] + assert datasets + dataset_id = datasets[0]["id"] + files_selected = dataset_client.search_files(dataset_id=dataset_id)["data"] + assert isinstance(files_selected, list) + for f in files_selected: + assert any(d["id"] == dataset_id for d in f.get("datasets", [])) + files_all = dataset_client.search_files()["data"] + assert isinstance(files_all, list) + has_dataset = any(f.get("datasets") for f in files_all) + has_no_dataset = any(not f.get("datasets") for f in files_all) + assert has_dataset or has_no_dataset + + def test_create_and_delete_dataset(self, dataset_client): + dataset_name = f"autotest_{uuid.uuid4().hex[:8]}" + create_resp = dataset_client.create_dataset(name=dataset_name) + assert "detail" in create_resp + assert "successfully created" in create_resp["detail"].lower() + search_resp = dataset_client.search(filters=[{"field": "name", "operator": "eq", "value": dataset_name}]) + assert any(d["name"] == dataset_name for d in search_resp["data"]) + delete_resp = dataset_client.delete_dataset(name=dataset_name) + assert "detail" in delete_resp + assert "successfully deleted" in delete_resp["detail"].lower() + search_after = dataset_client.search(filters=[{"field": "name", "operator": "eq", "value": dataset_name}]) + assert all(d["name"] != dataset_name for d in search_after["data"]) + + @pytest.mark.skip(reason="Successfully creates dataset") + def test_create_dataset_with_empty_name(self, dataset_tracker): + created, client = dataset_tracker + + with pytest.raises(HTTPError) as e: + client.create_dataset(name="") + + assert e.value.status_code in (400, 422) + + def test_create_duplicate_dataset(self, dataset_tracker): + created, client = dataset_tracker + dataset_name = f"autotest_{uuid.uuid4().hex[:8]}" + resp = client.create_dataset(name=dataset_name) + created.append(dataset_name) + assert "successfully created" in resp["detail"].lower() + with pytest.raises(HTTPError) as exc: + client.create_dataset(name=dataset_name) + assert exc.value.status_code == 400 + assert "already exists" in exc.value.body.lower() + + def test_search_existing_dataset(self, dataset_tracker): + created, client = dataset_tracker + dataset_name = f"autotest_{uuid.uuid4().hex[:8]}" + resp = client.create_dataset(name=dataset_name) + created.append(dataset_name) + assert "successfully created" in resp["detail"].lower() + + search_resp = client.search(filters=[{"field": "name", "operator": "eq", "value": dataset_name}]) + names = [d["name"] for d in search_resp["data"]] + assert dataset_name in names + + def test_search_non_existing_dataset(self, dataset_client): + search_resp = dataset_client.search( + filters=[{"field": "name", "operator": "eq", "value": "non_existing_dataset"}] + ) + assert search_resp["data"] == [] + + def test_search_multiple_existing_datasets(self, dataset_tracker): + created, client = dataset_tracker + names = [f"autotest_{uuid.uuid4().hex[:8]}" for _ in range(2)] + for n in names: + resp = client.create_dataset(name=n) + created.append(n) 
+            assert "successfully created" in resp["detail"].lower()
+
+        search_resp = client.search(filters=[{"field": "name", "operator": "in", "value": names}])
+        found_names = {d["name"] for d in search_resp["data"]}
+        assert set(names) <= found_names
+
+
+class TestFiles:
+    def test_upload_and_delete_file(self, file_tracker, tmp_path):
+        created_files, client = file_tracker
+        # Upload before entering the try block: if the upload itself fails,
+        # the finally clause must not reference an unbound temp_file.
+        file_info, temp_file = client.upload_temp_file(client, file_tracker, tmp_path)
+        try:
+            assert file_info["status"] is True
+            assert "id" in file_info
+            assert "file_name" in file_info
+            created_files.append(file_info)
+            search = client.search_files()
+            ids = [f["id"] for f in search["data"]]
+            assert file_info["id"] in ids
+            delete_result = client.delete_files([file_info["id"]])
+            assert delete_result[0]["status"] is True
+            assert delete_result[0]["action"] == "delete"
+            search_after = client.search_files()
+            ids_after = [f["id"] for f in search_after["data"]]
+            assert file_info["id"] not in ids_after
+            created_files.clear()
+        finally:
+            if temp_file.exists():
+                temp_file.unlink()
+
+    @pytest.mark.skip(reason="Uploads a file, but returns 500")
+    @pytest.mark.parametrize("content", ["", " "])
+    def test_upload_empty_file(self, file_client, tmp_path, content):
+        empty_file = tmp_path / f"{uuid.uuid4().hex}_empty.pdf"
+        empty_file.write_text(content)
+        with pytest.raises(HTTPError) as exc:
+            file_client.upload_file(str(empty_file))
+        assert exc.value.status_code == 400
+
+    def test_move_file(self, file_tracker, dataset_tracker, tmp_path):
+        created_datasets, dataset_client = dataset_tracker
+
+        first_dataset_name = f"autotest_{uuid.uuid4().hex[:8]}"
+        second_dataset_name = f"autotest_{uuid.uuid4().hex[:8]}"
+
+        first_resp = dataset_client.create_dataset(name=first_dataset_name)
+        created_datasets.append(first_dataset_name)
+        assert "successfully created" in first_resp["detail"].lower()
+        first_dataset_id = dataset_client.search(
+            filters=[{"field": "name", "operator": "eq", "value": first_dataset_name}]
+        )["data"][0]["id"]
+
+        second_resp = dataset_client.create_dataset(name=second_dataset_name)
+        created_datasets.append(second_dataset_name)
+        assert "successfully created" in second_resp["detail"].lower()
+        second_dataset_id = dataset_client.search(
+            filters=[{"field": "name", "operator": "eq", "value": second_dataset_name}]
+        )["data"][0]["id"]
+
+        created_files, client = file_tracker
+        file_info, temp_file = client.upload_temp_file(client, file_tracker, tmp_path)
+        created_files.append(file_info)
+        file_id = file_info["id"]
+        try:
+            move1 = client.move_files(name=first_dataset_name, objects=[file_id])[0]
+            assert move1["status"] is True
+            assert "successfully bounded" in move1["message"].lower()
+            files_in_first = dataset_client.search_files(dataset_id=first_dataset_id)["data"]
+            assert any(f["id"] == file_id for f in files_in_first)
+            move2 = client.move_files(name=second_dataset_name, objects=[file_id])[0]
+            assert move2["status"] is True
+            assert "successfully bounded" in move2["message"].lower()
+            files_in_second = dataset_client.search_files(dataset_id=second_dataset_id)["data"]
+            assert any(f["id"] == file_id for f in files_in_second)
+        finally:
+            if temp_file.exists():
+                temp_file.unlink()
+
+    def test_search_existing_file(self, file_tracker, tmp_path):
+        created_files, client = file_tracker
+        # Upload outside the try for the same reason as above: keep temp_file bound for cleanup.
+        file_info, temp_file = client.upload_temp_file(client, file_tracker, tmp_path)
+        try:
+            assert file_info["status"] is True
+            search_resp = client.search_files(
+                filters=[{"field": "original_name", "operator": "eq", "value":
file_info["file_name"]}] + ) + names = [f["original_name"] for f in search_resp["data"]] + assert file_info["file_name"] in names + finally: + if temp_file.exists(): + temp_file.unlink() + + def test_search_non_existing_file(self, file_client): + search_resp = file_client.search_files( + filters=[{"field": "original_name", "operator": "eq", "value": "definitely_not_a_file.pdf"}] + ) + assert search_resp["data"] == [] + + def test_search_multiple_existing_files(self, file_tracker, tmp_path): + created_files, client = file_tracker + f1, t1 = client.upload_temp_file(client, file_tracker, tmp_path) + f2, t2 = client.upload_temp_file(client, file_tracker, tmp_path) + names = [f1["file_name"], f2["file_name"]] + + search = client.search_files(filters=[{"field": "original_name", "operator": "in", "value": names}]) + found_names = {f["original_name"] for f in search["data"]} + assert set(names) <= found_names + + t1.unlink(missing_ok=True) + t2.unlink(missing_ok=True) + + def test_download_existing_file(self, file_tracker, tmp_path): + created_files, client = file_tracker + file_info, temp_file = client.upload_temp_file(client, file_tracker, tmp_path) + file_id = file_info["id"] + + content = client.download_file(file_id) + assert isinstance(content, (bytes, bytearray)) + assert len(content) > 100 + assert content.startswith(b"%PDF") + + temp_file.unlink(missing_ok=True) + + def test_download_nonexistent_file(self, file_client): + with pytest.raises(HTTPError) as exc: + file_client.download_file(9999999) + assert exc.value.status_code == 404 + + +class TestJobs: + def test_create_and_poll_job( + self, file_client, jobs_client, file_tracker, dataset_tracker, job_tracker, tmp_path, user_uuid + ): + created_files, client = file_tracker + file_info, temp_file = client.upload_temp_file(client, file_tracker, tmp_path) + created_datasets, dataset_client = dataset_tracker + + dataset_name = f"autotest_ds_{uuid.uuid4().hex[:8]}" + dataset_client.create_dataset(name=dataset_name) + created_datasets.append(dataset_name) + move_resp = file_client.move_files(name=dataset_name, objects=[file_info["id"]])[0] + assert move_resp["status"] is True + job_name = f"test_job_{uuid.uuid4().hex[:8]}" + create_resp = jobs_client.create_job( + name=job_name, + file_ids=[file_info["id"]], + owners=[user_uuid], + ) + job_tracker[0].append(create_resp) + job_id = create_resp.get("id") + assert job_id + final_job = jobs_client.poll_until_finished(job_id=job_id, timeout_seconds=300) + status = final_job.get("status") + assert str(status).lower() in {"finished", "success", "completed"} + job_files = final_job.get("files") or [] + assert file_info["id"] in job_files + + @pytest.mark.parametrize("field", ["name", "type", "status", "deadline", "creation_datetime"]) + @pytest.mark.parametrize("direction", ["asc", "desc"]) + # descending name sorting works weird + def test_sorting(self, jobs_client, field, direction): + resp = jobs_client.post_json( + "/jobs/jobs/search", + json={ + "pagination": {"page_num": 1, "page_size": 15}, + "filters": [], + "sorting": [{"direction": direction, "field": field}], + }, + headers=jobs_client._default_headers(content_type_json=True), + ) + data = resp["data"] + values = [d[field] for d in data if field in d and d[field] is not None] + + if field in {"creation_datetime", "deadline"}: + values = [datetime.fromisoformat(v) for v in values] + + expected = sorted(values, reverse=(direction == "desc")) + assert values == expected + + @pytest.mark.parametrize("field", ["name", "type", "status", 
"deadline", "creation_datetime"]) + def test_job_search(self, jobs_client, job_tracker, file_tracker, dataset_tracker, user_uuid, tmp_path, field): + created_files, client = file_tracker + file_info, temp_file = client.upload_temp_file(client, file_tracker, tmp_path) + created_datasets, dataset_client = dataset_tracker + + dataset_name = f"autotest_ds_{uuid.uuid4().hex[:8]}" + dataset_client.create_dataset(name=dataset_name) + created_datasets.append(dataset_name) + + job_name = f"test_job_{uuid.uuid4().hex[:8]}" + create_resp = jobs_client.create_job( + name=job_name, + file_ids=[file_info["id"]], + owners=[user_uuid], + ) + job_id = create_resp.get("id") + jobs_client.poll_until_finished(job_id=job_id, timeout_seconds=300) + job_tracker[0].append(create_resp) + search_value = create_resp.get(field, None) + + filters = [ + {"field": field, "operator": "eq", "value": search_value}, + {"field": "name", "operator": "eq", "value": job_name}, + ] + + search_resp = jobs_client.post_json( + "/jobs/jobs/search", + json={ + "pagination": {"page_num": 1, "page_size": 100}, + "filters": filters, + }, + headers=jobs_client._default_headers(content_type_json=True), + ) + + job_ids = [j["id"] for j in search_resp["data"]] + assert job_id in job_ids + + @pytest.mark.parametrize("field", ["creation_datetime", "deadline"]) + def test_date_range_filter(self, jobs_client, field): + start = (datetime.utcnow() - timedelta(days=365)).replace(microsecond=0).isoformat() + end = (datetime.utcnow() + timedelta(days=365)).replace(microsecond=0).isoformat() + + resp = jobs_client.post_json( + "/jobs/jobs/search", + json={ + "pagination": {"page_num": 1, "page_size": 15}, + "filters": [ + {"field": field, "operator": "ge", "value": start}, + {"field": field, "operator": "le", "value": end}, + ], + }, + headers=jobs_client._default_headers(content_type_json=True), + ) + + data = resp["data"] + for job in data: + if field in job and job[field] is not None: + date_val = datetime.fromisoformat(job[field]) + assert datetime.fromisoformat(start) <= date_val <= datetime.fromisoformat(end) + + +class TestCategories: + @pytest.mark.skip(reason="Creation works, but deletion not implemented, will be cluttered by multiple runs") + def test_create_and_delete_category(self, auth_token, settings, tenant, categories_client): + access_token, _ = auth_token + + unique_id = f"test_cat_{uuid.uuid4().hex[:6]}" + created = categories_client.create_category(category_id=unique_id, name=unique_id, parent="example") + assert created.id == unique_id + search_result = categories_client.search_categories(page_size=100) + ids = [c.id for c in search_result.data] + assert unique_id in ids, f"Category {unique_id} not found after creation" + + deleted = categories_client.delete_category(unique_id) + assert deleted.get("detail") or deleted.get("status") or "success" in str(deleted).lower() + search_after_delete = categories_client.search_categories(page_size=100) + ids_after = [c.id for c in search_after_delete.data] + assert unique_id not in ids_after, f"Category {unique_id} still present after deletion" + + +class TestReports: + def test_export_tasks_csv(self, reports_client, user_uuid): + csv_text = reports_client.export_tasks( + user_ids=[user_uuid], + date_from="2025-05-01 00:00:00", + date_to="2025-08-31 00:00:00", + ) + assert "annotator_id" in csv_text + assert "task_id" in csv_text + + @pytest.mark.parametrize( + "date_from,date_to", + [ + ("2028-05-01 00:00:00", "2028-08-31 00:00:00"), + ("1900-01-01 00:00:00", "1900-12-31 00:00:00"), + 
("2025-09-01 00:00:00", "2025-08-01 00:00:00"), + ], + ) + def test_export_tasks_wrong_date(self, reports_client, user_uuid, date_from, date_to): + with pytest.raises(HTTPError) as exc: + reports_client.export_tasks( + user_ids=[user_uuid], + date_from=date_from, + date_to=date_to, + ) + assert exc.value.status_code == 406 + + +class TestPlugins: + def test_create_and_delete_plugin(self, plugins_tracker): + created, plugins_client = plugins_tracker + unique_name = f"plugin_{uuid.uuid4().hex[:8]}" + resp = plugins_client.create_plugin( + name=unique_name, + menu_name=unique_name, + description="bar", + version="1", + url="http://what.com/what", + is_iframe=True, + ) + plugin_id = resp["id"] + created.append(plugin_id) + + plugins = plugins_client.get_plugins() + assert any(p["id"] == plugin_id for p in plugins) + assert any(p["name"] == unique_name for p in plugins) + + plugins_client.delete_plugin(plugin_id) + + plugins = plugins_client.get_plugins() + assert not any(p["id"] == plugin_id for p in plugins) + + def test_update_plugin(self, plugins_tracker): + created, plugins_client = plugins_tracker + unique_name = f"plugin_{uuid.uuid4().hex[:8]}" + resp = plugins_client.create_plugin( + name=unique_name, + menu_name=unique_name, + description="bar", + version="1", + url="http://what.com/what", + is_iframe=True, + ) + plugin_id = resp["id"] + created.append(plugin_id) + + updated_payload = { + "name": unique_name, + "menu_name": unique_name, + "description": "updated desc", + "version": "1", + "url": "http://what.com/what", + "is_iframe": True, + } + update_resp = plugins_client.update_plugin(plugin_id, **updated_payload) + assert update_resp["description"] == "updated desc" + + plugins = plugins_client.get_plugins() + updated = next(p for p in plugins if p["id"] == plugin_id) + assert updated["description"] == "updated desc"