Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Create Date: 2024-05-16 07:38:06.028489

"""

from alembic import op


Expand Down
21 changes: 15 additions & 6 deletions request-api/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
HealthStatus,
DependencyHealth,
)
from task_interface.check_tasks import celery, CheckDataFileTask
from task_interface.tasks import celery, CheckDataFileTask, CheckDataUrlTask

CheckDataFileTask = celery.register_task(CheckDataFileTask())
CheckDataUrlTask = celery.register_task(CheckDataUrlTask())

# TODO maybe move to factory
task_map = {"check_file": CheckDataFileTask, "check_url": CheckDataUrlTask}

if os.environ.get("SENTRY_ENABLED", "false").lower() == "true":
sentry_sdk.init(
Expand Down Expand Up @@ -126,9 +130,9 @@ def healthcheck(
),
DependencyHealth(
name="sqs",
status=HealthStatus.HEALTHY
if queue_reachable
else HealthStatus.UNHEALTHY,
status=(
HealthStatus.HEALTHY if queue_reachable else HealthStatus.UNHEALTHY
),
),
],
)
Expand All @@ -144,10 +148,15 @@ def create_request(
request_schema = _map_to_schema(request_model=crud.create_request(db, request))

try:
CheckDataFileTask.delay(request_schema.model_dump())
if request_schema.type == "check_file":
CheckDataFileTask.delay(request_schema.model_dump())
elif request_schema.type == "check_url":
CheckDataUrlTask.delay(request_schema.model_dump())
else:
raise ValueError("invalid request type")

except Exception as error:
logging.error("Async call to celery check data file task failed: %s", error)
logging.error("Async call to celery task failed: %s", error)
raise error

http_response.headers[
Expand Down
6 changes: 3 additions & 3 deletions request-api/tests/integration/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ def test_get_db_fails_after_retries(mock_restored, mock_slack, mock_session_make
@patch("main.WebClient")
@patch(
"main.os.environ.get",
side_effect=lambda k, d=None: "fake_token"
if k == "SLACK_BOT_TOKEN"
else "fake_channel",
side_effect=lambda k, d=None: (
"fake_token" if k == "SLACK_BOT_TOKEN" else "fake_channel"
),
)
def test_send_slack_alert(mock_env, mock_webclient):
mock_client_instance = mock_webclient.return_value
Expand Down
2 changes: 1 addition & 1 deletion request-api/tests/unit/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _create_request_model():

@patch("crud.create_request", return_value=_create_request_model())
@patch(
"task_interface.check_tasks.CheckDataFileTask.delay",
"task_interface.tasks.CheckDataFileTask.delay",
side_effect=OperationalError(exception_msg),
)
def test_create_request_when_celery_throws_exception(
Expand Down
115 changes: 79 additions & 36 deletions request-processor/src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import s3_transfer_manager
import crud
import database
from task_interface.check_tasks import celery, CheckDataFileTask
from task_interface.tasks import celery, CheckDataFileTask, CheckDataUrlTask
import json
from application.core import workflow
from application.configurations.config import Directories
import application.core.utils as utils
from application.exceptions.customExceptions import CustomException
from pathlib import Path
from digital_land.collect import Collector, FetchStatus
Expand All @@ -24,8 +23,8 @@


@celery.task(base=CheckDataFileTask, name=CheckDataFileTask.name)
def check_datafile(request: Dict, directories=None):
logger.info("check datafile")
def check_data_file(request: Dict, directories=None):
logger.info("check data file")
request_schema = schemas.Request.model_validate(request)
request_data = request_schema.params
if not request_schema.status == "COMPLETE":
Expand All @@ -39,55 +38,99 @@ def check_datafile(request: Dict, directories=None):
for key, value in data_dict.items():
setattr(directories, key, value)

fileName = ""
file_name = ""
tmp_dir = os.path.join(
directories.COLLECTION_DIR, "resource", request_schema.id
)
# Ensure tmp_dir exists, create it if it doesn't
Path(tmp_dir).mkdir(parents=True, exist_ok=True)
if request_data.type == "check_file":
fileName = handle_check_file(request_schema, request_data, tmp_dir)

elif request_data.type == "check_url":
# With Collector from digital-land/collect, edit to use correct directory path without changing Collector class
collector = Collector(collection_dir=Path(directories.COLLECTION_DIR))
# Override the resource_dir to match our tmp_dir structure
collector.resource_dir = Path(tmp_dir) # Use the same directory as tmp_dir
collector.log_dir = (
Path(directories.COLLECTION_DIR) / "log" / request_schema.id
file_name = handle_check_file(request_schema, request_data, tmp_dir)

if file_name:
response = workflow.run_workflow(
file_name,
request_schema.id,
request_data.collection,
request_data.dataset,
"",
request_data.geom_type if hasattr(request_data, "geom_type") else "",
(
request_data.column_mapping
if hasattr(request_data, "column_mapping")
else {}
),
directories,
)
save_response_to_db(request_schema.id, response)
else:
print("need to fix")
# save_response_to_db(request_schema.id, log)
# raise CustomException(log)
return _get_request(request_schema.id)

# TBD: Can we test inferring the plugin from the URL, then if that fails retry the normal method without a plugin?
# if 'FeatureServer' in request_data.url or 'MapServer' in request_data.url:
# request_data.plugin = "arcgis"

status = collector.fetch(request_data.url, plugin=request_data.plugin)
logger.info(f"Collector Fetch status: {status}")
@celery.task(base=CheckDataUrlTask, name=CheckDataUrlTask.name)
def check_data_url(request: Dict, directories=None):
logger.info("check data url")
request_schema = schemas.Request.model_validate(request)
request_data = request_schema.params
if not request_schema.status == "COMPLETE":
if not directories:
directories = Directories
elif directories:
data_dict = json.loads(directories)
# Create an instance of the Directories class
directories = Directories()
# Update attribute values based on the dictionary
for key, value in data_dict.items():
setattr(directories, key, value)

# The resource is saved in collector.resource_dir with hash as filename
resource_files = list(collector.resource_dir.iterdir())
fileName = ""
tmp_dir = os.path.join(
directories.COLLECTION_DIR, "resource", request_schema.id
)
# Ensure tmp_dir exists, create it if it doesn't
Path(tmp_dir).mkdir(parents=True, exist_ok=True)
# if request_data.type == "check_file":
# fileName = handle_check_file(request_schema, request_data, tmp_dir)

# With Collector from digital-land/collect, edit to use correct directory path without changing Collector class
collector = Collector(collection_dir=Path(directories.COLLECTION_DIR))
# Override the resource_dir to match our tmp_dir structure
collector.resource_dir = Path(tmp_dir) # Use the same directory as tmp_dir
collector.log_dir = Path(directories.COLLECTION_DIR) / "log" / request_schema.id

# TBD: Can we test inferring the plugin from the URL, then if that fails retry the normal method without a plugin?
# if 'FeatureServer' in request_data.url or 'MapServer' in request_data.url:
# request_data.plugin = "arcgis"

status = collector.fetch(request_data.url, plugin=request_data.plugin)
logger.info(f"Collector Fetch status: {status}")

log = {}
# The resource is saved in collector.resource_dir with hash as filename
resource_files = list(collector.resource_dir.iterdir())

if status == FetchStatus.OK:
if resource_files and len(resource_files) == 1:
logger.info(f"Resource Files Path from collector: {resource_files}")
fileName = resource_files[-1].name # Get the hash filename
logger.info(f"File Hash From Collector: {fileName}")
log = {}

if status == FetchStatus.OK:
if resource_files and len(resource_files) == 1:
logger.info(f"Resource Files Path from collector: {resource_files}")
fileName = resource_files[-1].name # Get the hash filename
logger.info(f"File Hash From Collector: {fileName}")

else:
log["message"] = "No endpoint files found after successful fetch."
log["status"] = str(status)
log["exception_type"] = "URL check failed"
save_response_to_db(request_schema.id, log)
raise CustomException(log)
else:
log["message"] = "No endpoint files found after successful fetch."
log["status"] = str(status)
log["message"] = "Fetch operation failed"
log["exception_type"] = "URL check failed"
save_response_to_db(request_schema.id, log)
logger.warning(f"URL check failed with fetch status: {status}")
raise CustomException(log)
else:
log["status"] = str(status)
log["message"] = "Fetch operation failed"
log["exception_type"] = "URL check failed"
save_response_to_db(request_schema.id, log)
logger.warning(f"URL check failed with fetch status: {status}")
raise CustomException(log)

if fileName:
response = workflow.run_workflow(
Expand Down
13 changes: 0 additions & 13 deletions task_interface/check_tasks.py

This file was deleted.

34 changes: 34 additions & 0 deletions task_interface/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
Module to create the celery app and fake tasks so that tasks can be accessed separately from different projects,
and therefore the app/api does not need to use the same requirements as the worker. In our case this stops the api
from needing to install digital-land and all its dependencies.

When used in the api for a celery setup, the task classes can be instantiated and used to
queue tasks on the message broker.

When used in the worker, you should override the task registered under each task name with the real functionality.
"""

import os
from typing import Dict, Optional

from celery import Task, Celery

# celery object instantiated with the same broker as on the command line, shared by both
# the app and the worker
celery = Celery("async-request-processor", broker=os.environ["CELERY_BROKER_URL"])


# classes to be used in both the app and the worker: in the app they are used to queue tasks,
# in the worker you should override each task, under the same task name, with the real implementation
class CheckDataFileTask(Task):
    """Placeholder celery task used to queue "check data file" requests.

    The class only carries the task name and the call signature; the worker
    registers the real implementation under the same ``name``.
    """

    # Task name shared between the process that queues and the worker.
    name = "task_interface.check_datafile_task"

    def run(self, request: Dict, directories: Optional[str] = None):
        """Placeholder body that always raises.

        Args:
            request: The request payload, passed as a dictionary.
            directories: Optional string describing directory overrides.

        Raises:
            NotImplementedError: Always; this class provides no behaviour.
        """
        raise NotImplementedError("Base class must implement")

class CheckDataUrlTask(Task):
    """Placeholder celery task used to queue "check data url" requests.

    The class only carries the task name and the call signature; the worker
    registers the real implementation under the same ``name``.
    """

    # Task name shared between the process that queues and the worker.
    name = "task_interface.check_url_task"

    def run(self, request: Dict, directories: Optional[str] = None):
        """Placeholder body that always raises.

        Args:
            request: The request payload, passed as a dictionary.
            directories: Optional string describing directory overrides.

        Raises:
            NotImplementedError: Always; this class provides no behaviour.
        """
        raise NotImplementedError("Base class must implement")
Loading