Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions request-api/makefile
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
test: lint test-coverage

Comment thread
eveleighoj marked this conversation as resolved.
lint:: black-check flake8
lint: black-check flake8

black-check:
black --check .
black-check:
python -m black --check ./src ./tests

black:
black .
flake8:
python -m flake8 ./src ./tests

flake8:
flake8 .
black:
python -m black .

test-coverage:: coverage-unit coverage-integration coverage-acceptance

Expand Down
9 changes: 5 additions & 4 deletions request-processor/makerules/python.mk
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
all:: lint test-coverage

lint:
make black ./src/application
python -m flake8 ./src/application
lint: black-check flake8

black-check:
black --check .
python -m black --check --exclude "digital-land" ./src ./tests

flake8:
python -m flake8 --exclude="./src/digital-land" ./src ./tests

black:
python -m black .
Expand Down
48 changes: 44 additions & 4 deletions request-processor/src/application/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def fetch_add_data_response(
output_path,
specification_dir,
cache_dir,
url,
endpoint,
):
try:
specification = Specification(specification_dir)
Expand All @@ -210,6 +210,7 @@ def fetch_add_data_response(

existing_entities = []
new_entities = []
entity_org_mapping = []
issues_log = None

for idx, resource_file in enumerate(files_in_resource):
Expand All @@ -227,7 +228,7 @@ def fetch_add_data_response(
resource=resource_from_path(resource_file_path),
valid_category_values=valid_category_values,
disable_lookups=False,
endpoints=[url],
endpoints=[endpoint],
)

existing_entities.extend(
Expand All @@ -253,13 +254,19 @@ def fetch_add_data_response(
pipeline_dir=pipeline_dir,
specification=specification,
cache_dir=cache_dir,
endpoints=[url] if url else None,
endpoints=[endpoint] if endpoint else None,
)
logger.info(
f"Found {len(new_lookups)} unidentified lookups in {resource_file}"
)
new_entities.extend(new_lookups)

                # By default, create an entity-organisation mapping; the front end can use the 'authoritative' flag
entity_org_mapping = create_entity_organisation(
new_lookups, dataset, organisation_provider
)
# TODO, save to pipeline as well for rerun?

# Reload pipeline to pick up newly saved lookups
pipeline = Pipeline(
pipeline_dir, dataset, specification=specification
Expand All @@ -274,7 +281,7 @@ def fetch_add_data_response(
resource=resource_from_path(resource_file_path),
valid_category_values=valid_category_values,
disable_lookups=False,
endpoints=[url],
endpoints=[endpoint],
)
else:
logger.info(f"No unidentified lookups found in {resource_file}")
Expand All @@ -293,6 +300,7 @@ def fetch_add_data_response(
"existing-in-resource": len(existing_entities),
"new-entities": new_entities_breakdown,
"existing-entities": existing_entities_breakdown,
"entity-organisation": entity_org_mapping,
"pipeline-issues": [dict(issue) for issue in issues_log.rows]
if issues_log
else [],
Expand Down Expand Up @@ -354,6 +362,38 @@ def _get_existing_entities_breakdown(existing_entities):
return breakdown


def create_entity_organisation(new_entities, dataset, organisation):
    """
    Build an entity-organisation mapping covering the new entities.

    Args:
        new_entities: List of entity dicts with 'entity' key
        dataset: Dataset name
        organisation: Organisation identifier

    Returns:
        List with a single dict containing dataset, entity-minimum,
        entity-maximum, organisation; empty list when there is nothing to map.
    """
    if not new_entities:
        return []

    # Collect the truthy 'entity' values; entries missing the key are skipped.
    entities = [entry["entity"] for entry in new_entities if entry.get("entity")]

    if not entities:
        return []

    mapping = {
        "dataset": dataset,
        "entity-minimum": min(entities),
        "entity-maximum": max(entities),
        "organisation": organisation,
    }
    return [mapping]


def _map_transformed_entities(transformed_csv_path, pipeline_dir): # noqa: C901
"""Extract unique entities from transformed CSV and lookup their details in lookup.csv."""

Expand Down
4 changes: 4 additions & 0 deletions request-processor/src/application/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,11 @@ def create_user_friendly_error_log(exception_detail):
status_code = exception_detail.get("errCode")
exception_type = exception_detail.get("exceptionType")
content_type = exception_detail.get("contentType")
plugin = exception_detail.get("plugin")

user_message = "An error occurred, please try again later."

# The ordering here has been considered to show the most relevant message to users in the front end
if exception_type in ["SSLError", "SSLCertVerificationError"]:
user_message = "SSL certificate verification failed"
elif content_type and "text/html" in content_type:
Expand All @@ -264,6 +266,8 @@ def create_user_friendly_error_log(exception_detail):
user_message = "The URL must be accessible"
elif status_code == "404":
user_message = "The URL does not exist. Check the URL you've entered is correct (HTTP 404 error)"
elif plugin == "arcgis" and status_code == "200":
user_message = "URL must be the data layer"

result = dict(exception_detail)
result["message"] = user_message
Expand Down
5 changes: 4 additions & 1 deletion request-processor/src/application/core/workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import hashlib
import os
import csv
from pathlib import Path
Expand Down Expand Up @@ -463,6 +464,8 @@ def add_data_workflow(
] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'"
return response_data

endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()

    # All processes around transforming the data and generating the pipeline summary
pipeline_summary = fetch_add_data_response(
dataset=dataset,
Expand All @@ -472,7 +475,7 @@ def add_data_workflow(
output_path=output_path,
specification_dir=directories.SPECIFICATION_DIR,
cache_dir=directories.CACHE_DIR,
url=url,
endpoint=endpoint_hash,
)

# Create endpoint and source summaries in workflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ def __init__(self, log={}):
# Content type of the response useful for text/html checks (when not using arcgis/wfs plugin)
self.content_type = log.get("content-type")
self.message_detail = log.get("user_message_detail")
self.plugin = log.get("plugin")

self.load()
super().__init__(self.detail)

def load(self):
# This is not the best way to do this but keeps backward compatibility for now
self.detail = {
"errCode": str(self.status) if self.status is not None else None,
"errType": "User Error",
Expand All @@ -60,4 +62,5 @@ def load(self):
"fetchStatus": self.fetch_status,
"exceptionType": self.exception_type,
"contentType": self.content_type,
"plugin": self.plugin,
}
3 changes: 2 additions & 1 deletion request-processor/src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def handle_check_file(request_schema, request_data, tmp_dir):


@celery.task(base=CheckDataUrlTask, name=CheckDataUrlTask.name)
def check_dataurl(request: Dict, directories=None):
def check_dataurl(request: Dict, directories=None): # noqa
logger.info(
f"Started check_dataurl task for request_id = {request.get('id', 'unknown')}"
)
Expand Down Expand Up @@ -513,6 +513,7 @@ def _fetch_resource(resource_dir, url):
}
)
elif log.get("exception") or log.get("status", "").startswith("4"):
log["plugin"] = plugin # Save plugin used for arcgis error context
break

# All fetch attempts failed - include content-type if available
Expand Down
2 changes: 1 addition & 1 deletion request-processor/tests/integration/src/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_check_datafile(
"https://example.com/arcgis/rest/services/MapServer",
(
None,
'{"type":"FeatureCollection","features":[{"type":"Feature","properties":{"name":"Test Feature"}}]}'.encode(
'{"type":"FeatureCollection","features":[{"type":"Feature","properties":{"name":"Test Feature"}}]}'.encode( # noqa: E501
"utf-8"
),
),
Expand Down
18 changes: 8 additions & 10 deletions request-processor/tests/unit/src/application/core/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import pytest
import os
import csv
from unittest.mock import MagicMock
from src.application.core.pipeline import (
fetch_add_data_response,
Expand All @@ -17,7 +15,7 @@ def test_fetch_add_data_response_success(monkeypatch, tmp_path):
input_path = tmp_path / "resource"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

input_path.mkdir(parents=True)
pipeline_dir.mkdir(parents=True)
Expand Down Expand Up @@ -68,7 +66,7 @@ def test_fetch_add_data_response_success(monkeypatch, tmp_path):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)

assert "new-in-resource" in result
Expand All @@ -83,7 +81,7 @@ def test_fetch_add_data_response_no_files(monkeypatch, tmp_path):
input_path = tmp_path / "resource"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

input_path.mkdir(parents=True)
pipeline_dir.mkdir(parents=True)
Expand All @@ -103,7 +101,7 @@ def test_fetch_add_data_response_no_files(monkeypatch, tmp_path):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)

assert "new-in-resource" in result
Expand All @@ -118,7 +116,7 @@ def test_fetch_add_data_response_file_not_found(monkeypatch, tmp_path):
input_path = tmp_path / "nonexistent"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

pipeline_dir.mkdir(parents=True)

Expand All @@ -138,7 +136,7 @@ def test_fetch_add_data_response_file_not_found(monkeypatch, tmp_path):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)


Expand All @@ -150,7 +148,7 @@ def test_fetch_add_data_response_handles_processing_error(monkeypatch, tmp_path)
input_path = tmp_path / "resource"
specification_dir = tmp_path / "specification"
cache_dir = tmp_path / "cache"
url = "http://example.com/endpoint"
endpoint = "abc123hash"

input_path.mkdir(parents=True)
pipeline_dir.mkdir(parents=True)
Expand Down Expand Up @@ -178,7 +176,7 @@ def raise_exception(*args, **kwargs):
output_path=str(input_path / "output.csv"),
specification_dir=str(specification_dir),
cache_dir=str(cache_dir),
url=url,
endpoint=endpoint,
)

assert "new-in-resource" in result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,6 @@ def test_validate_endpoint_csv_read_error(monkeypatch, tmp_path):
"""Test validate_endpoint when reading CSV fails"""
pipeline_dir = tmp_path / "pipeline"
pipeline_dir.mkdir()
url = "http://example.com/endpoint"

endpoint_csv_path = pipeline_dir / "endpoint.csv"
endpoint_csv_path.write_bytes(b"\x00\x00\x00")

Expand All @@ -380,7 +378,6 @@ def test_validate_source_creates_new_source(monkeypatch, tmp_path):
"""Test validate_source creates new source entry when it doesn't exist"""
pipeline_dir = tmp_path / "pipeline"
pipeline_dir.mkdir()
source_csv_path = pipeline_dir / "source.csv"

documentation_url = "http://example.com/doc"
collection = "test-collection"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
fetch_pipeline_csvs,
add_data_workflow,
fetch_add_data_pipeline_csvs,
fetch_add_data_collection_csvs,
)
import csv
import hashlib
import os
from pathlib import Path
import urllib
from urllib.error import HTTPError


Expand Down Expand Up @@ -409,7 +408,7 @@ def fake_fetch_add_data_response(
output_path,
specification_dir,
cache_dir,
url,
endpoint,
):
called["fetch_add_data_response"] = {
"dataset": dataset,
Expand All @@ -419,7 +418,7 @@ def fake_fetch_add_data_response(
"output_path": output_path,
"specification_dir": specification_dir,
"cache_dir": cache_dir,
"url": url,
"endpoint": endpoint,
}
return {"result": "ok"}

Expand Down Expand Up @@ -470,7 +469,8 @@ def fake_fetch_add_data_response(
== directories.SPECIFICATION_DIR
)
assert called["fetch_add_data_response"]["cache_dir"] == directories.CACHE_DIR
assert called["fetch_add_data_response"]["url"] == url
expected_endpoint_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()
assert called["fetch_add_data_response"]["endpoint"] == expected_endpoint_hash


def test_fetch_add_data_pipeline_csvs_from_url(monkeypatch, tmp_path):
Expand Down
7 changes: 3 additions & 4 deletions request-processor/tests/unit/src/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import pytest
import json
from src import tasks
from src.tasks import save_response_to_db, _fetch_resource, check_dataurl
from src.tasks import save_response_to_db, check_dataurl
from request_model import models, schemas
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock
from application.exceptions.customExceptions import CustomException
from application.configurations.config import Directories


@pytest.mark.parametrize(
Expand Down Expand Up @@ -640,7 +639,7 @@ def mock_run_workflow(
tasks, "_get_request", lambda rid: {"id": rid, "status": "COMPLETE"}
)

result = check_dataurl(request, directories_json)
check_dataurl(request, directories_json)

assert len(workflow_calls) == 1
assert workflow_calls[0]["file_name"] == "brownfield-data.csv"
Expand Down
Loading