From 31438afb27083dea2ce843e9212451b33a2f18ce Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Thu, 23 Oct 2025 20:22:12 +0100 Subject: [PATCH 1/3] separate tasks and add notes --- request-api/src/main.py | 18 +++++- request-processor/src/tasks.py | 115 ++++++++++++++++++++++----------- task_interface/check_tasks.py | 13 ---- task_interface/tasks.py | 34 ++++++++++ 4 files changed, 128 insertions(+), 52 deletions(-) delete mode 100644 task_interface/check_tasks.py create mode 100644 task_interface/tasks.py diff --git a/request-api/src/main.py b/request-api/src/main.py index 13615398..2eb80ccd 100644 --- a/request-api/src/main.py +++ b/request-api/src/main.py @@ -24,9 +24,16 @@ HealthStatus, DependencyHealth, ) -from task_interface.check_tasks import celery, CheckDataFileTask +from task_interface.tasks import celery, CheckDataFileTask, CheckDataUrlTask CheckDataFileTask = celery.register_task(CheckDataFileTask()) +CheckDataUrlTask = celery.register_task(CheckDataUrlTask()) + +# TODO maybe move to factory +task_map = { + "check_file":CheckDataFileTask, + "check_url":CheckDataUrlTask +} if os.environ.get("SENTRY_ENABLED", "false").lower() == "true": sentry_sdk.init( @@ -144,10 +151,15 @@ def create_request( request_schema = _map_to_schema(request_model=crud.create_request(db, request)) try: - CheckDataFileTask.delay(request_schema.model_dump()) + if request_schema.type == 'check_file': + CheckDataFileTask.delay(request_schema.model_dump()) + elif request_schema.type == "check_url": + CheckDataUrlTask.delay(request_schema.model_dump()) + else: + raise ValueError("invalid request type") except Exception as error: - logging.error("Async call to celery check data file task failed: %s", error) + logging.error("Async call to celery task failed: %s", error) raise error http_response.headers[ diff --git a/request-processor/src/tasks.py b/request-processor/src/tasks.py index 6d578473..fa3100c0 100644 --- a/request-processor/src/tasks.py +++ b/request-processor/src/tasks.py 
@@ -9,7 +9,7 @@ import s3_transfer_manager import crud import database -from task_interface.check_tasks import celery, CheckDataFileTask +from task_interface.tasks import celery, CheckDataFileTask,CheckDataUrlTask import json from application.core import workflow from application.configurations.config import Directories @@ -24,8 +24,8 @@ @celery.task(base=CheckDataFileTask, name=CheckDataFileTask.name) -def check_datafile(request: Dict, directories=None): - logger.info("check datafile") +def check_data_file(request: Dict, directories=None): + logger.info("check data file") request_schema = schemas.Request.model_validate(request) request_data = request_schema.params if not request_schema.status == "COMPLETE": @@ -39,55 +39,99 @@ def check_datafile(request: Dict, directories=None): for key, value in data_dict.items(): setattr(directories, key, value) - fileName = "" + file_name = "" tmp_dir = os.path.join( directories.COLLECTION_DIR, "resource", request_schema.id ) # Ensure tmp_dir exists, create it if it doesn't Path(tmp_dir).mkdir(parents=True, exist_ok=True) - if request_data.type == "check_file": - fileName = handle_check_file(request_schema, request_data, tmp_dir) - - elif request_data.type == "check_url": - # With Collector from digital-land/collect, edit to use correct directory path without changing Collector class - collector = Collector(collection_dir=Path(directories.COLLECTION_DIR)) - # Override the resource_dir to match our tmp_dir structure - collector.resource_dir = Path(tmp_dir) # Use the same directory as tmp_dir - collector.log_dir = ( - Path(directories.COLLECTION_DIR) / "log" / request_schema.id + file_name = handle_check_file(request_schema, request_data, tmp_dir) + + if file_name: + response = workflow.run_workflow( + file_name, + request_schema.id, + request_data.collection, + request_data.dataset, + "", + request_data.geom_type if hasattr(request_data, "geom_type") else "", + ( + request_data.column_mapping + if hasattr(request_data, 
"column_mapping") + else {} + ), + directories, ) + save_response_to_db(request_schema.id, response) + else: + save_response_to_db(request_schema.id, log) + raise CustomException(log) + return _get_request(request_schema.id) - # TBD: Can test infering plugin from URL, then if fails retry normal method without plugin? - # if 'FeatureServer' in request_data.url or 'MapServer' in request_data.url: - # request_data.plugin = "arcgis" +@celery.task(base=CheckDataUrlTask, name=CheckDataUrlTask.name) +def check_data_url(request: Dict, directories=None): + logger.info("check data url") + request_schema = schemas.Request.model_validate(request) + request_data = request_schema.params + if not request_schema.status == "COMPLETE": + if not directories: + directories = Directories + elif directories: + data_dict = json.loads(directories) + # Create an instance of the Directories class + directories = Directories() + # Update attribute values based on the dictionary + for key, value in data_dict.items(): + setattr(directories, key, value) + + fileName = "" + tmp_dir = os.path.join( + directories.COLLECTION_DIR, "resource", request_schema.id + ) + # Ensure tmp_dir exists, create it if it doesn't + Path(tmp_dir).mkdir(parents=True, exist_ok=True) + # if request_data.type == "check_file": + # fileName = handle_check_file(request_schema, request_data, tmp_dir) + + # With Collector from digital-land/collect, edit to use correct directory path without changing Collector class + collector = Collector(collection_dir=Path(directories.COLLECTION_DIR)) + # Override the resource_dir to match our tmp_dir structure + collector.resource_dir = Path(tmp_dir) # Use the same directory as tmp_dir + collector.log_dir = ( + Path(directories.COLLECTION_DIR) / "log" / request_schema.id + ) - status = collector.fetch(request_data.url, plugin=request_data.plugin) - logger.info(f"Collector Fetch status: {status}") + # TBD: Can test infering plugin from URL, then if fails retry normal method without plugin? 
+ # if 'FeatureServer' in request_data.url or 'MapServer' in request_data.url: + # request_data.plugin = "arcgis" - # The resource is saved in collector.resource_dir with hash as filename - resource_files = list(collector.resource_dir.iterdir()) + status = collector.fetch(request_data.url, plugin=request_data.plugin) + logger.info(f"Collector Fetch status: {status}") - log = {} + # The resource is saved in collector.resource_dir with hash as filename + resource_files = list(collector.resource_dir.iterdir()) - if status == FetchStatus.OK: - if resource_files and len(resource_files) == 1: - logger.info(f"Resource Files Path from collector: {resource_files}") - fileName = resource_files[-1].name # Get the hash filename - logger.info(f"File Hash From Collector: {fileName}") + log = {} + + if status == FetchStatus.OK: + if resource_files and len(resource_files) == 1: + logger.info(f"Resource Files Path from collector: {resource_files}") + fileName = resource_files[-1].name # Get the hash filename + logger.info(f"File Hash From Collector: {fileName}") - else: - log["message"] = "No endpoint files found after successful fetch." - log["status"] = str(status) - log["exception_type"] = "URL check failed" - save_response_to_db(request_schema.id, log) - raise CustomException(log) else: + log["message"] = "No endpoint files found after successful fetch." 
log["status"] = str(status) - log["message"] = "Fetch operation failed" log["exception_type"] = "URL check failed" save_response_to_db(request_schema.id, log) - logger.warning(f"URL check failed with fetch status: {status}") raise CustomException(log) + else: + log["status"] = str(status) + log["message"] = "Fetch operation failed" + log["exception_type"] = "URL check failed" + save_response_to_db(request_schema.id, log) + logger.warning(f"URL check failed with fetch status: {status}") + raise CustomException(log) if fileName: response = workflow.run_workflow( @@ -110,7 +154,6 @@ def check_datafile(request: Dict, directories=None): raise CustomException(log) return _get_request(request_schema.id) - def handle_check_file(request_schema, request_data, tmp_dir): fileName = request_data.uploaded_filename try: diff --git a/task_interface/check_tasks.py b/task_interface/check_tasks.py deleted file mode 100644 index 828073d9..00000000 --- a/task_interface/check_tasks.py +++ /dev/null @@ -1,13 +0,0 @@ -import os -from typing import Dict, Optional - -from celery import Task, Celery - -celery = Celery("async-request-processor", broker=os.environ["CELERY_BROKER_URL"]) - - -class CheckDataFileTask(Task): - name = "task_interface.check_datafile_task" - - def run(self, request: Dict, directories: Optional[str] = None): - raise NotImplementedError("Base class must implement") diff --git a/task_interface/tasks.py b/task_interface/tasks.py new file mode 100644 index 00000000..126a2778 --- /dev/null +++ b/task_interface/tasks.py @@ -0,0 +1,34 @@ +""" +module to create the celery app and fake tasks to allow tasks to be accessed separately from different projects +and therefore worker app/api does not need to use the same requirements as worker.
In our case stops the api from +needing to install digital-land and all its dependencies + +when using in the api for a celery set up the task classes can be instantiated and used to +queue on the message broker + +when using in the worker you should overwrite the task name with the functionality +""" + +import os +from typing import Dict, Optional + +from celery import Task, Celery + +# celery object with same broker instantiation from the command line to feed both app +# and worker +celery = Celery("async-request-processor", broker=os.environ["CELERY_BROKER_URL"]) + + +# classes to be used in both app and worker, in app used to queue task, +# in the worker you should override the task name with the correct task +class CheckDataFileTask(Task): + name = "task_interface.check_datafile_task" + + def run(self, request: Dict, directories: Optional[str] = None): + raise NotImplementedError("Base class must implement") + +class CheckDataUrlTask(Task): + name = "task_interface.check_url_task" + + def run(self, request: Dict, directories: Optional[str] = None): + raise NotImplementedError("Base class must implement") From 29248e7ef6dd9ef1f01e3d7265b2ba2d938bb178 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Thu, 23 Oct 2025 21:02:49 +0100 Subject: [PATCH 2/3] run black --- ...d45c986e2727_create_indexes_for_foreign_keys.py | 1 + request-api/src/main.py | 13 +++++-------- request-api/tests/integration/test_main.py | 6 +++--- request-processor/src/tasks.py | 14 +++++++------- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/request-api/migrations/versions/d45c986e2727_create_indexes_for_foreign_keys.py b/request-api/migrations/versions/d45c986e2727_create_indexes_for_foreign_keys.py index 072aaf34..2f925353 100644 --- a/request-api/migrations/versions/d45c986e2727_create_indexes_for_foreign_keys.py +++ b/request-api/migrations/versions/d45c986e2727_create_indexes_for_foreign_keys.py @@ -5,6 +5,7 @@ Create Date: 2024-05-16 07:38:06.028489 """ + from alembic
import op diff --git a/request-api/src/main.py b/request-api/src/main.py index 2eb80ccd..3443f59a 100644 --- a/request-api/src/main.py +++ b/request-api/src/main.py @@ -30,10 +30,7 @@ CheckDataUrlTask = celery.register_task(CheckDataUrlTask()) # TODO maybe move to factory -task_map = { - "check_file":CheckDataFileTask, - "check_url":CheckDataUrlTask -} +task_map = {"check_file": CheckDataFileTask, "check_url": CheckDataUrlTask} if os.environ.get("SENTRY_ENABLED", "false").lower() == "true": sentry_sdk.init( @@ -133,9 +130,9 @@ def healthcheck( ), DependencyHealth( name="sqs", - status=HealthStatus.HEALTHY - if queue_reachable - else HealthStatus.UNHEALTHY, + status=( + HealthStatus.HEALTHY if queue_reachable else HealthStatus.UNHEALTHY + ), ), ], ) @@ -151,7 +148,7 @@ def create_request( request_schema = _map_to_schema(request_model=crud.create_request(db, request)) try: - if request_schema.type == 'check_file': + if request_schema.type == "check_file": CheckDataFileTask.delay(request_schema.model_dump()) elif request_schema.type == "check_url": CheckDataUrlTask.delay(request_schema.model_dump()) diff --git a/request-api/tests/integration/test_main.py b/request-api/tests/integration/test_main.py index 53d8093a..80aa8039 100644 --- a/request-api/tests/integration/test_main.py +++ b/request-api/tests/integration/test_main.py @@ -116,9 +116,9 @@ def test_get_db_fails_after_retries(mock_restored, mock_slack, mock_session_make @patch("main.WebClient") @patch( "main.os.environ.get", - side_effect=lambda k, d=None: "fake_token" - if k == "SLACK_BOT_TOKEN" - else "fake_channel", + side_effect=lambda k, d=None: ( + "fake_token" if k == "SLACK_BOT_TOKEN" else "fake_channel" + ), ) def test_send_slack_alert(mock_env, mock_webclient): mock_client_instance = mock_webclient.return_value diff --git a/request-processor/src/tasks.py b/request-processor/src/tasks.py index fa3100c0..95e41f07 100644 --- a/request-processor/src/tasks.py +++ b/request-processor/src/tasks.py @@ -9,11 
+9,10 @@ import s3_transfer_manager import crud import database -from task_interface.tasks import celery, CheckDataFileTask,CheckDataUrlTask +from task_interface.tasks import celery, CheckDataFileTask, CheckDataUrlTask import json from application.core import workflow from application.configurations.config import Directories -import application.core.utils as utils from application.exceptions.customExceptions import CustomException from pathlib import Path from digital_land.collect import Collector, FetchStatus @@ -64,10 +63,12 @@ def check_data_file(request: Dict, directories=None): ) save_response_to_db(request_schema.id, response) else: - save_response_to_db(request_schema.id, log) - raise CustomException(log) + print("need to fix") + # save_response_to_db(request_schema.id, log) + # raise CustomException(log) return _get_request(request_schema.id) + @celery.task(base=CheckDataUrlTask, name=CheckDataUrlTask.name) def check_data_url(request: Dict, directories=None): logger.info("check data url") @@ -97,9 +98,7 @@ def check_data_url(request: Dict, directories=None): collector = Collector(collection_dir=Path(directories.COLLECTION_DIR)) # Override the resource_dir to match our tmp_dir structure collector.resource_dir = Path(tmp_dir) # Use the same directory as tmp_dir - collector.log_dir = ( - Path(directories.COLLECTION_DIR) / "log" / request_schema.id - ) + collector.log_dir = Path(directories.COLLECTION_DIR) / "log" / request_schema.id # TBD: Can test infering plugin from URL, then if fails retry normal method without plugin? 
# if 'FeatureServer' in request_data.url or 'MapServer' in request_data.url: @@ -154,6 +153,7 @@ def check_data_url(request: Dict, directories=None): raise CustomException(log) return _get_request(request_schema.id) + def handle_check_file(request_schema, request_data, tmp_dir): fileName = request_data.uploaded_filename try: From e359292bbe68aeaa24915702510ebcd143cfe1c7 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Thu, 23 Oct 2025 21:13:25 +0100 Subject: [PATCH 3/3] remove incorrect patch --- request-api/tests/unit/test_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/request-api/tests/unit/test_main.py b/request-api/tests/unit/test_main.py index f9771e61..a7d27b24 100644 --- a/request-api/tests/unit/test_main.py +++ b/request-api/tests/unit/test_main.py @@ -40,7 +40,7 @@ def _create_request_model(): @patch("crud.create_request", return_value=_create_request_model()) @patch( - "task_interface.check_tasks.CheckDataFileTask.delay", + "task_interface.tasks.CheckDataFileTask.delay", side_effect=OperationalError(exception_msg), ) def test_create_request_when_celery_throws_exception(