From 16fe5f70a175e3ef98289f10febf88cba161f01e Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Mon, 7 Apr 2025 15:40:06 +0300 Subject: [PATCH 01/13] fill files by retrieving them using annotation service on job creation --- jobs/jobs/main.py | 7 +++++++ jobs/jobs/utils.py | 51 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/jobs/jobs/main.py b/jobs/jobs/main.py index 6e98313a1..5ea1f4901 100644 --- a/jobs/jobs/main.py +++ b/jobs/jobs/main.py @@ -72,6 +72,13 @@ async def create_job( ) ) + if len(job_params.revisions) > 0: + job_params.files = await utils.get_file_ids_of_revisions( + revisions=list(job_params.revisions), + current_tenant=current_tenant, + jwt_token=jw_token, + ) + if job_params.type == schemas.JobType.ExtractionJob: created_extraction_job = await create_job_funcs.create_extraction_job( extraction_job_input=job_params, # type: ignore diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 35fc28ee2..1978b9288 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Union +from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Union, Set import aiohttp.client_exceptions import fastapi.encoders @@ -7,7 +7,7 @@ import jobs.airflow_utils as airflow_utils import jobs.databricks_utils as databricks_utils import jobs.pipeline as pipeline -from jobs import db_service +from jobs import db_service, schemas from jobs.config import ( ANNOTATION_SERVICE_HOST, ASSETS_SERVICE_HOST, @@ -788,6 +788,42 @@ async def get_annotation_revisions( return response +async def get_annotations_by_revisions( + current_tenant: Optional[str], jw_token: str, revisions: List[str] +) -> Optional[Dict[str, Any]]: + """Get annotations by filtering""" + + headers = { + "X-Current-Tenant": current_tenant, + "Authorization": f"Bearer: {jw_token}", + } + + timeout = aiohttp.ClientTimeout(total=5) + + post_data = { + "filters": [ + {"field": "revision", "operator": "in", "value": revisions} + ] + } + + try: + _, response = await fetch( + method="POST", + url=f"{ANNOTATION_SERVICE_HOST}/annotation", + headers=headers, + timeout=timeout, + body=post_data, + ) + except aiohttp.client_exceptions.ClientError as err: + logger.exception(f"Failed request to get annotations by filters") + raise fastapi.HTTPException( + status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Failed request to the Annotation Manager: {err}", + ) + + return response + + async def search_datasets_by_ids( datasets_ids: List[int], current_tenant: str, jw_token: str ) -> Dict[str, Any]: @@ -860,3 +896,14 @@ async def validate_create_job_previous_jobs( detail="Jobs with these ids do not exist.", ) return [j.id for j in previous_jobs] + + +async def get_file_ids_of_revisions( + revisions: List[str], current_tenant: str, jwt_token: str +) -> Optional[List[int]]: + + response = await get_annotations_by_revisions( + current_tenant=current_tenant, jw_token=jwt_token, revisions=revisions + ) + + return list({data["file_id"] for data in response.get("data", [])}) From 3e7a9cd32d194df9304c5d7bbbffd39431b19c74 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Mon, 7 Apr 2025 16:03:58 +0300 Subject: [PATCH 02/13] update the list of files instead of replacing from revision files --- jobs/jobs/main.py | 4 ++-- jobs/jobs/utils.py | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/jobs/jobs/main.py b/jobs/jobs/main.py index 5ea1f4901..122b69a9f 100644 --- a/jobs/jobs/main.py +++ b/jobs/jobs/main.py @@ -73,8 +73,8 @@ async def create_job( ) if len(job_params.revisions) > 0: - job_params.files = await utils.get_file_ids_of_revisions( - revisions=list(job_params.revisions), + await utils.update_create_job_params_using_revisions( + job_params=job_params, current_tenant=current_tenant, jwt_token=jw_token, ) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 1978b9288..73bae2041 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -898,12 +898,18 @@ async def validate_create_job_previous_jobs( return [j.id for j in previous_jobs] -async def get_file_ids_of_revisions( - revisions: List[str], current_tenant: str, jwt_token: str -) -> Optional[List[int]]: - +async def update_create_job_params_using_revisions( + job_params: schemas.JobParams, current_tenant: str, jwt_token: str +) -> None: response = await get_annotations_by_revisions( - current_tenant=current_tenant, jw_token=jwt_token, revisions=revisions + current_tenant=current_tenant, + jw_token=jwt_token, + revisions=list(job_params.revisions), ) - return list({data["file_id"] for data in response.get("data", [])}) + unique_file_ids_of_revisions = set( + [data["file_id"] for data in response.get("data", [])] + ) + job_params.files = list( + unique_file_ids_of_revisions.union(job_params.files) + ) From 647981fa8298d6df1e349bc3b7ca39ac7791caaf Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Mon, 7 Apr 2025 16:12:19 +0300 Subject: [PATCH 03/13] fix linter errors --- jobs/jobs/utils.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 73bae2041..08524f90c 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -1,4 +1,13 @@ -from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Union, Set +from typing import ( + Any, + Dict, + Generator, + Iterator, + List, + Optional, + Tuple, + Union, +) import aiohttp.client_exceptions import fastapi.encoders @@ -815,7 +824,7 @@ async def get_annotations_by_revisions( body=post_data, ) except aiohttp.client_exceptions.ClientError as err: - logger.exception(f"Failed request to get annotations by filters") + logger.exception("Failed request to get annotations by revisions") raise fastapi.HTTPException( status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Failed request to the Annotation Manager: {err}", From 9070efe207b596f2eb2b2356afa3e84b5d8c2772 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Mon, 7 Apr 2025 17:38:56 +0300 Subject: [PATCH 04/13] fix linter errors 2 --- jobs/jobs/utils.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 08524f90c..bca48ae63 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -1,13 +1,4 @@ -from typing import ( - Any, - Dict, - Generator, - Iterator, - List, - Optional, - Tuple, - Union, -) +from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Union import aiohttp.client_exceptions import fastapi.encoders From 81189560f4055d2b2b1c4424d3e03b510daedd79 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Wed, 9 Apr 2025 14:07:59 +0300 Subject: [PATCH 05/13] set the default max request timeout value to 1 minute --- jobs/jobs/config.py | 3 +++ jobs/jobs/utils.py | 13 ++++--------- processing/processing/config.py | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/jobs/jobs/config.py b/jobs/jobs/config.py index 4ec64c6bd..f3095169d 100644 --- a/jobs/jobs/config.py +++ b/jobs/jobs/config.py @@ -54,6 +54,9 @@ def get_service_uri(prefix: str) -> str: # noqa PAGINATION_THRESHOLD = 7 PROVIDE_JWT_IF_NO_ANY = True +# request settings +REQUEST_TIMEOUT = 60 # in seconds + # S3 settings STORAGE_PROVIDER = os.getenv("STORAGE_PROVIDER") JOBS_SIGNED_URL_ENABLED = ( diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index bca48ae63..9b85b0def 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -16,6 +16,7 @@ JOBS_SIGNED_URL_KEY_NAME, PAGINATION_THRESHOLD, PIPELINES_SERVICE_HOST, + REQUEST_TIMEOUT, ROOT_PATH, TAXONOMY_SERVICE_HOST, USERS_HOST, @@ -572,11 +573,8 @@ async def get_job_progress( "X-Current-Tenant": current_tenant, "Authorization": f"Bearer: {jw_token}", } - timeout = aiohttp.ClientTimeout(total=5) try: - _, response = await fetch( - method="GET", url=url, headers=headers, timeout=timeout - ) + _, response = await fetch(method="GET", url=url, headers=headers) except aiohttp.client_exceptions.ClientError as err: logger.exception(f"Failed request url = {url}, error = {err}") raise fastapi.HTTPException( @@ -596,6 +594,8 @@ async def fetch( headers: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Tuple[int, Any]: + if "timeout" not in kwargs: + kwargs["timeout"] = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT) async with aiohttp.request( method=method, url=url, json=body, headers=headers, data=data, **kwargs ) as resp: @@ -767,13 +767,11 @@ async def get_annotation_revisions( "X-Current-Tenant": current_tenant, "Authorization": f"Bearer: {jw_token}", } - timeout = aiohttp.ClientTimeout(total=5) try: _, response = await fetch( method="GET", url=f"{ANNOTATION_SERVICE_HOST}/revisions/{job_id}/{file_id}", headers=headers, - timeout=timeout, ) except aiohttp.client_exceptions.ClientError as err: logger.exception( @@ -798,8 +796,6 @@ async def get_annotations_by_revisions( "Authorization": f"Bearer: {jw_token}", } - timeout = aiohttp.ClientTimeout(total=5) - post_data = { "filters": [ {"field": "revision", "operator": "in", "value": revisions} @@ -811,7 +807,6 @@ async def get_annotations_by_revisions( method="POST", url=f"{ANNOTATION_SERVICE_HOST}/annotation", headers=headers, - timeout=timeout, body=post_data, ) except aiohttp.client_exceptions.ClientError as err: diff --git a/processing/processing/config.py b/processing/processing/config.py index e666772c0..6db2801a7 100644 --- a/processing/processing/config.py +++ b/processing/processing/config.py @@ -40,7 +40,7 @@ class Settings(BaseSettings): retry_attempts: int = 3 retry_statuses: Set[int] = {501, 502, 503} | {i for i in range(505, 600)} delay_between_retry_attempts: int = 1 # in seconds - request_timeout: int = 3 * 60 * 60 + request_timeout: int = 60 # in seconds root_path: str From 824b9444209f17ed77510faf852cfd044b30469f Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Wed, 9 Apr 2025 14:18:38 +0300 Subject: [PATCH 06/13] check if 'file_id' exist in response in update_create_job_params_using_revisions --- jobs/jobs/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 9b85b0def..c787da3b2 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -903,7 +903,7 @@ async def update_create_job_params_using_revisions( ) unique_file_ids_of_revisions = set( - [data["file_id"] for data in response.get("data", [])] + [data["file_id"] for data in response.get("data", []) if "file_id" in data] ) job_params.files = list( unique_file_ids_of_revisions.union(job_params.files) From 9c9d5dafb661e5b2c9e5583c19eb0450af11e4df Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Wed, 9 Apr 2025 23:32:39 +0300 Subject: [PATCH 07/13] add unit tests --- jobs/jobs/utils.py | 6 +++- jobs/tests/test_utils.py | 72 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index c787da3b2..4796ecfab 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -903,7 +903,11 @@ async def update_create_job_params_using_revisions( ) unique_file_ids_of_revisions = set( - [data["file_id"] for data in response.get("data", []) if "file_id" in data] + [ + data["file_id"] + for data in response.get("data", []) + if "file_id" in data + ] ) job_params.files = list( unique_file_ids_of_revisions.union(job_params.files) diff --git a/jobs/tests/test_utils.py b/jobs/tests/test_utils.py index af3f104c3..766b25935 100644 --- a/jobs/tests/test_utils.py +++ b/jobs/tests/test_utils.py @@ -1,8 +1,10 @@ -from unittest.mock import patch +from unittest.mock import patch, Mock, AsyncMock import aiohttp.client_exceptions import pytest from fastapi import HTTPException + +from jobs.schemas import JobParams, JobType from tests.conftest import FakePipeline, patched_create_pre_signed_s3_url import jobs.utils as utils @@ -1193,3 +1195,71 @@ async def test_execute_external_pipeline(sign_s3_links: bool): ) else: assert FakePipeline.calls[-1]["files"][0].get("signed_url") is None + + +@pytest.mark.asyncio +async def test_update_create_job_params_using_revisions(monkeypatch): + job_params = JobParams( + name="name_1", + type=JobType.ExtractionJob, + pipeline_name="pipeline_name_1", + files=[1], + revisions=["revision_id_1", "revision_id_2", "revision_id_3"], + ) + + mock_response = { + "data": [ + {"file_id": 2, "revision": "revision_id_1"}, + {"file_id": 3, "revision": "revision_id_2"}, + {"file_id": 3, "revision": "revision_id_3"}, + ] + } + + mock_current_tenant = Mock() + mock_jwt_token = Mock() + mock_get_annotations_by_revisions = AsyncMock(return_value=mock_response) + + monkeypatch.setattr( + utils, + "get_annotations_by_revisions", + mock_get_annotations_by_revisions, + ) + + await utils.update_create_job_params_using_revisions( + job_params, mock_current_tenant, mock_jwt_token + ) + + mock_get_annotations_by_revisions.assert_called_once() + + assert job_params.files == [1, 2, 3] + + +@pytest.mark.asyncio +async def test_get_annotations_by_revisions(monkeypatch): + revisions = ["revision_id_1", "revision_id_2"] + + mock_fetch_response_status = Mock() + mock_fetch_response_json = Mock() + + def mock_fetch_side_effect(**kwargs): + assert kwargs["url"].endswith("/annotation") + assert kwargs["method"] == "POST" + assert kwargs["body"]["filters"][0] == { + "field": "revision", + "operator": "in", + "value": revisions, + } + + return mock_fetch_response_status, mock_fetch_response_json + + mock_fetch = AsyncMock(side_effect=mock_fetch_side_effect) + mock_current_tenant = Mock() + mock_jw_token = Mock() + + monkeypatch.setattr(utils, "fetch", mock_fetch) + + await utils.get_annotations_by_revisions( + mock_current_tenant, mock_jw_token, revisions + ) + + mock_fetch.assert_called_once() From 1ed28b37892e33267ae6aaf4dbad59af16140bc5 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Wed, 9 Apr 2025 23:36:36 +0300 Subject: [PATCH 08/13] linter import sort fix --- jobs/tests/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/tests/test_utils.py b/jobs/tests/test_utils.py index 766b25935..adcf28fdd 100644 --- a/jobs/tests/test_utils.py +++ b/jobs/tests/test_utils.py @@ -1,4 +1,4 @@ -from unittest.mock import patch, Mock, AsyncMock +from unittest.mock import AsyncMock, Mock, patch import aiohttp.client_exceptions import pytest From 8901db31ebe84cf5e84af20648393928e41609a6 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Thu, 10 Apr 2025 10:56:55 +0300 Subject: [PATCH 09/13] fix import order --- jobs/tests/test_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/jobs/tests/test_utils.py b/jobs/tests/test_utils.py index adcf28fdd..532ae113d 100644 --- a/jobs/tests/test_utils.py +++ b/jobs/tests/test_utils.py @@ -4,11 +4,10 @@ import pytest from fastapi import HTTPException +import jobs.utils as utils from jobs.schemas import JobParams, JobType from tests.conftest import FakePipeline, patched_create_pre_signed_s3_url -import jobs.utils as utils - # --------------TEST get_files_data_from_datasets--------------- From 1f04fae538000b68d660fee3c8ad71451a97ab47 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Thu, 10 Apr 2025 10:59:10 +0300 Subject: [PATCH 10/13] fix import order 2 --- jobs/tests/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/tests/test_utils.py b/jobs/tests/test_utils.py index 532ae113d..f06bc0d36 100644 --- a/jobs/tests/test_utils.py +++ b/jobs/tests/test_utils.py @@ -3,10 +3,10 @@ import aiohttp.client_exceptions import pytest from fastapi import HTTPException +from tests.conftest import FakePipeline, patched_create_pre_signed_s3_url import jobs.utils as utils from jobs.schemas import JobParams, JobType -from tests.conftest import FakePipeline, patched_create_pre_signed_s3_url # --------------TEST get_files_data_from_datasets--------------- From 1cf8c5b06f0a83801d2c9bf0766783919d3d3c04 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Thu, 10 Apr 2025 11:48:09 +0300 Subject: [PATCH 11/13] fix comments --- jobs/jobs/utils.py | 6 ++++-- jobs/pytest.ini | 2 +- jobs/tests/test_utils.py | 1 - processing/processing/config.py | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 4796ecfab..850e34aeb 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -812,8 +812,10 @@ async def get_annotations_by_revisions( except aiohttp.client_exceptions.ClientError as err: logger.exception("Failed request to get annotations by revisions") raise fastapi.HTTPException( - status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Failed request to the Annotation Manager: {err}", + status_code=fastapi.status.HTTP_400_BAD_REQUEST, + detail="Could not retrieve annotations from selected revisions, " + "Please try again or select different revisions. " + f"Selected revisions: {revisions}", ) return response diff --git a/jobs/pytest.ini b/jobs/pytest.ini index 2f6c8d12f..40880458c 100644 --- a/jobs/pytest.ini +++ b/jobs/pytest.ini @@ -1,2 +1,2 @@ [pytest] -asyncio_mode=strict +asyncio_mode=auto diff --git a/jobs/tests/test_utils.py b/jobs/tests/test_utils.py index f06bc0d36..27a92759b 100644 --- a/jobs/tests/test_utils.py +++ b/jobs/tests/test_utils.py @@ -1233,7 +1233,6 @@ async def test_update_create_job_params_using_revisions(monkeypatch): assert job_params.files == [1, 2, 3] -@pytest.mark.asyncio async def test_get_annotations_by_revisions(monkeypatch): revisions = ["revision_id_1", "revision_id_2"] diff --git a/processing/processing/config.py b/processing/processing/config.py index 6db2801a7..e666772c0 100644 --- a/processing/processing/config.py +++ b/processing/processing/config.py @@ -40,7 +40,7 @@ class Settings(BaseSettings): retry_attempts: int = 3 retry_statuses: Set[int] = {501, 502, 503} | {i for i in range(505, 600)} delay_between_retry_attempts: int = 1 # in seconds - request_timeout: int = 60 # in seconds + request_timeout: int = 3 * 60 * 60 root_path: str From 13d6a799f601298436bdc81c37a0764a0a2cc1be Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Thu, 10 Apr 2025 12:39:40 +0300 Subject: [PATCH 12/13] fix exception message --- jobs/jobs/utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 850e34aeb..464529fd1 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -810,13 +810,13 @@ async def get_annotations_by_revisions( body=post_data, ) except aiohttp.client_exceptions.ClientError as err: - logger.exception("Failed request to get annotations by revisions") + logger.exception( + f"Failed request to get annotations by revisions: {revisions}" + ) raise fastapi.HTTPException( status_code=fastapi.status.HTTP_400_BAD_REQUEST, - detail="Could not retrieve annotations from selected revisions, " - "Please try again or select different revisions. " - f"Selected revisions: {revisions}", - ) + detail=f"Could not retrieve selected annotations: {', '.join(revisions)}", + ) from err return response From 8ca5f1d3b821fe18b90129537b4b47ac6ae89648 Mon Sep 17 00:00:00 2001 From: halil_aydin Date: Thu, 10 Apr 2025 12:44:02 +0300 Subject: [PATCH 13/13] remove unnecessary pytest asyncio mark --- jobs/jobs/utils.py | 3 ++- jobs/tests/test_utils.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py index 464529fd1..a796f0014 100644 --- a/jobs/jobs/utils.py +++ b/jobs/jobs/utils.py @@ -815,7 +815,8 @@ async def get_annotations_by_revisions( ) raise fastapi.HTTPException( status_code=fastapi.status.HTTP_400_BAD_REQUEST, - detail=f"Could not retrieve selected annotations: {', '.join(revisions)}", + detail="Could not retrieve selected annotations:" + f" {', '.join(revisions)}", ) from err return response diff --git a/jobs/tests/test_utils.py b/jobs/tests/test_utils.py index 27a92759b..4eff11700 100644 --- a/jobs/tests/test_utils.py +++ b/jobs/tests/test_utils.py @@ -1196,7 +1196,6 @@ async def test_execute_external_pipeline(sign_s3_links: bool): assert FakePipeline.calls[-1]["files"][0].get("signed_url") is None -@pytest.mark.asyncio async def test_update_create_job_params_using_revisions(monkeypatch): job_params = JobParams( name="name_1",