From da143ecf9879ed17cda92a95360c8940d0734edd Mon Sep 17 00:00:00 2001 From: eveleighoj <35256612+eveleighoj@users.noreply.github.com> Date: Thu, 22 Jan 2026 13:08:41 +0000 Subject: [PATCH 1/2] edit how lookups are created and used (#457) * edit how lookups are created and used to remove complication from the phases * remove unnecessary step from lookup phase * lock black version due to breaking change * add init files so tests are imported correctly * fix transform tests by correcting lookups * remove old test location * fix async_request_processor tests --- .pre-commit-config.yaml | 2 +- digital_land/phase/harmonise.py | 2 +- digital_land/phase/lookup.py | 82 +++++++++---------- digital_land/phase/organisation.py | 4 + digital_land/pipeline/main.py | 17 ++-- pyproject.toml | 2 +- .../test_async_backend_pipeline_run.py | 18 ++-- tests/integration/configuration/__init__.py | 0 tests/integration/pipeline/__init__.py | 0 .../test_main.py} | 58 +++++++++++-- tests/unit/phase/test_lookup.py | 3 +- 11 files changed, 112 insertions(+), 76 deletions(-) create mode 100644 tests/integration/configuration/__init__.py create mode 100644 tests/integration/pipeline/__init__.py rename tests/integration/{test_pipeline.py => pipeline/test_main.py} (93%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7bedebd69..8054cceb2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/psf/black - rev: 24.1.1 + rev: 24.8.0 hooks: - id: black - repo: https://github.com/pre-commit/pre-commit-hooks diff --git a/digital_land/phase/harmonise.py b/digital_land/phase/harmonise.py index 45022fd6b..ce7fbdc2a 100644 --- a/digital_land/phase/harmonise.py +++ b/digital_land/phase/harmonise.py @@ -162,7 +162,7 @@ def process(self, stream): logger.error( f"Exception occurred while fetching geoX, geoY coordinates: {e}" ) - + # TODO need to identify why below exists and possibly remove # ensure typology fields are a CURIE 
for typology in ["organisation", "geography", "document"]: value = o.get(typology, "") diff --git a/digital_land/phase/lookup.py b/digital_land/phase/lookup.py index f4edfd9e2..454b5f68a 100644 --- a/digital_land/phase/lookup.py +++ b/digital_land/phase/lookup.py @@ -84,17 +84,8 @@ def get_entity(self, block): organisation=organisation, reference=reference, ) + or self.lookup(prefix=prefix, reference=reference) ) - if not entity: - # TBD this needs to specifically not match unless the organisation and other columns - # are empty in the lookups.csv probably isn't a change here. - # or by the CURIE - entity = self.lookup(prefix=prefix, reference=reference) - - # When obtaining an entity number using only the prefix and reference, check if the - # lookup includes an associated organisation. If it does, do not use the entity number, - # as it is organisation specific. - entity = self.check_associated_organisation(entity) if entity and self.entity_range: if ( @@ -195,43 +186,44 @@ def process(self, stream): reference = row.get("reference", "") entity_number = row.get("entity", "") - if not (prefix and reference and entity_number in self.reverse_lookups): + if not (prefix and reference and entity_number): yield block continue - value = self.reverse_lookups[entity_number] - - if value: - organisation = value[-1].split(",")[-1] - find_entity = self.lookup( - prefix=prefix, - organisation=organisation, - reference=reference, - ) - if not find_entity: - # TBD this needs to specifically not match unless the organisation and other columns - # are empty in the lookups.csv probably isn't a change here. - # or by the CURIE - find_entity = self.lookup(prefix=prefix, reference=reference) - - # When obtaining an entity number using only the prefix and reference, check if the - # lookup includes an associated organisation. If it does, do not use the entity number, - # as it is organisation specific. 
- find_entity = self.check_associated_organisation(find_entity) - - if not find_entity or ( - str(find_entity) in self.redirect_lookups - and int(self.redirect_lookups[str(find_entity)].get("status", 0)) - == 410 - ): - if self.odp_collections and prefix in self.odp_collections: - self.issues.log_issue( - prefix, - "missing associated entity", - reference, - line_number=line_number, - ) - else: - row[self.entity_field] = find_entity + + # Get organisation from block metadata (set by OrganisationPhase) + organisation = block.get("organisation", "").replace( + "local-authority-eng", "local-authority" + ) + + find_entity = self.lookup( + prefix=prefix, + organisation=organisation, + reference=reference, + ) + if not find_entity: + # TBD this needs to specifically not match unless the organisation and other columns + # are empty in the lookups.csv probably isn't a change here. + # or by the CURIE + find_entity = self.lookup(prefix=prefix, reference=reference) + + # When obtaining an entity number using only the prefix and reference, check if the + # lookup includes an associated organisation. If it does, do not use the entity number, + # as it is organisation specific. 
+ find_entity = self.check_associated_organisation(find_entity) + + if not find_entity or ( + str(find_entity) in self.redirect_lookups + and int(self.redirect_lookups[str(find_entity)].get("status", 0)) == 410 + ): + if self.odp_collections and prefix in self.odp_collections: + self.issues.log_issue( + prefix, + "missing associated entity", + reference, + line_number=line_number, + ) + else: + row[self.entity_field] = find_entity yield block diff --git a/digital_land/phase/organisation.py b/digital_land/phase/organisation.py index ba99cceab..040f26f6b 100644 --- a/digital_land/phase/organisation.py +++ b/digital_land/phase/organisation.py @@ -24,4 +24,8 @@ def process(self, stream): self.issues.log_issue( "organisation", "invalid organisation", organisation_value ) + + # Store at block level for post-pivot lookups + block["organisation"] = row["organisation"] + yield block diff --git a/digital_land/pipeline/main.py b/digital_land/pipeline/main.py index ed4a7433f..01a46ddab 100644 --- a/digital_land/pipeline/main.py +++ b/digital_land/pipeline/main.py @@ -256,24 +256,17 @@ def load_lookup(self): or row.get("pipeline", "") ) reference = row.get("reference", "") or row.get("value", "") - - # composite key, ordered by specificity - resource_lookup = self.lookup.setdefault(row.get("resource", ""), {}) - resource_lookup[ - lookup_key( - entry_number=entry_number, - prefix=prefix, - reference=reference, - ) - ] = row["entity"] - organisation = row.get("organisation", "") - # replace local-authority-eng while we migrate + # TODO remove this replacement for local-authority-eng now that migration is complete organisation = organisation.replace( "local-authority-eng", "local-authority" ) + + # composite key, ordered by specificity + resource_lookup = self.lookup.setdefault(row.get("resource", ""), {}) resource_lookup[ lookup_key( + entry_number=entry_number, prefix=prefix, reference=reference, organisation=organisation, diff --git a/pyproject.toml b/pyproject.toml index 
ec82c6d33..19d458a49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,7 +69,7 @@ test = [ "sphinx", "sphinx-autobuild", "sphinx_rtd_theme", - "black", + "black==24.8.0", ] notebook = [ diff --git a/tests/acceptance/test_async_backend_pipeline_run.py b/tests/acceptance/test_async_backend_pipeline_run.py index eda0d6d3b..aedc09b7f 100644 --- a/tests/acceptance/test_async_backend_pipeline_run.py +++ b/tests/acceptance/test_async_backend_pipeline_run.py @@ -36,7 +36,7 @@ def run_pipeline_for_test(test_dirs, dataset, resource, request_id, input_path): endpoints = ["d779ad1c91c5a46e2d4ace4d5446d7d7f81df1ed058f882121070574697a5412"] pipeline_dir = test_dirs["pipeline_dir"] - organisation = "test-org" + organisation = "test-org:test" request_id = request_id collection_dir = test_dirs["collection_dir"] converted_dir = test_dirs["converted_resource_dir"] @@ -78,7 +78,7 @@ def run_pipeline_for_test(test_dirs, dataset, resource, request_id, input_path): dictwriter.writerow(row2) # Create organisation.csv with data - row = {"organisation": "test-org", "name": "Test Org"} + row = {"organisation": "test-org:test", "name": "Test Org"} fieldnames = row.keys() with open(organisation_path, "w") as f: dictwriter = csv.DictWriter(f, fieldnames=fieldnames) @@ -115,7 +115,7 @@ def run_pipeline_for_test(test_dirs, dataset, resource, request_id, input_path): # Load organisations organisation = Organisation(organisation_path, Path(pipeline.path)) - default_values["organisation"] = organisation + default_values["organisation"] = "test-org:test" try: run_pipeline( ConvertPhase( @@ -216,12 +216,12 @@ def test_async_pipeline_run(test_dirs): { "reference": "ABC_0001", "entry-date": "2024-01-01", - "organisation": "test-org", + "organisation": "test-org:test", }, { "reference": "ABC_0002", "entry-date": "2024-01-02", - "organisation": "test-org", + "organisation": "test-org:test", }, ] @@ -282,12 +282,12 @@ def test_pipeline_output_is_complete(test_dirs): { "reference": "ABC_0001", 
"entry-date": "2024-01-01", - "organisation": "test-org", + "organisation": "test-org:test", }, { "reference": "ABC_0002", "entry-date": "2024-01-02", - "organisation": "test-org", + "organisation": "test-org:test", }, ] @@ -433,7 +433,7 @@ def test_pipeline_lookup_phase(test_dirs): { "reference": "ABC_0001", "entry-date": "2025-01-01", - "organisation": "test-org", + "organisation": "test-org:test", "article-4-direction": "a4d1", } ] @@ -481,7 +481,7 @@ def test_pipeline_lookup_phase_assign_reference_entity(test_dirs): { "reference": "ABC_0001", "entry-date": "2025-01-01", - "organisation": "test-org", + "organisation": "test-org:test", "article-4-direction": "a4d2", } ] diff --git a/tests/integration/configuration/__init__.py b/tests/integration/configuration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/pipeline/__init__.py b/tests/integration/pipeline/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/test_pipeline.py b/tests/integration/pipeline/test_main.py similarity index 93% rename from tests/integration/test_pipeline.py rename to tests/integration/pipeline/test_main.py index 5b1df0009..b76700d45 100644 --- a/tests/integration/test_pipeline.py +++ b/tests/integration/pipeline/test_main.py @@ -3,12 +3,15 @@ import csv import urllib.request import pandas as pd +import logging from urllib.error import URLError from digital_land.pipeline import Pipeline from digital_land.pipeline import Lookups from digital_land.specification import Specification from digital_land.organisation import Organisation +logger = logging.getLogger(__name__) + def write_as_csv(dir, filename, data): with open(os.path.join(dir, filename), "w") as f: @@ -551,6 +554,53 @@ def test_load_concat_no_prepend_append(empty_pipeline_dir): } +def test_load_lookup_creates_single_lookup_per_row(tmp_path): + """ + Test that load_lookup creates a single lookup entry per row in lookup.csv. 
+ Each row should create one lookup key based on all provided fields. + Keys are normalized (lowercased, special chars removed). + """ + # -- Arrange -- + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + + test_pipeline = "test-pipeline" + + # Create lookup.csv with various combinations of fields + lookup_data = { + "resource": ["", "", "res123"], + "entry-number": ["", "5", ""], + "prefix": ["ancient-woodland", "conservation-area", "listed-building"], + "reference": ["AW001", "CA002", "LB003"], + "organisation": ["local-authority:ABC", "", "local-authority:XYZ"], + "entity": ["1000001", "1000002", "1000003"], + } + pd.DataFrame(lookup_data).to_csv(f"{pipeline_dir}/lookup.csv", index=False) + + # -- Act -- + pipeline = Pipeline(str(pipeline_dir), test_pipeline) + + # -- Assert -- + # Check that lookups are created correctly + # Row 1: prefix + reference + organisation (no resource) + assert "" in pipeline.lookup, "Should have empty resource key for general lookups" + general_lookups = pipeline.lookup[""] + + # Row 1: normalized key (lowercased, colons removed) + assert ",ancient-woodland,aw001,local-authorityabc" in general_lookups + assert general_lookups[",ancient-woodland,aw001,local-authorityabc"] == "1000001" + + # Row 2: entry-number + prefix + reference (no organisation) + assert "5,conservation-area,ca002," in general_lookups + assert general_lookups["5,conservation-area,ca002,"] == "1000002" + + # Row 3: resource-scoped lookup + assert "res123" in pipeline.lookup, "Should have resource-specific key" + resource_lookups = pipeline.lookup["res123"] + assert ",listed-building,lb003,local-authorityxyz" in resource_lookups + assert resource_lookups[",listed-building,lb003,local-authorityxyz"] == "1000003" + + @pytest.fixture(scope="session") def specification_dir(tmp_path_factory): """Download specification files from GitHub for testing""" @@ -665,7 +715,7 @@ def get_test_lookup_config(): "reference": ["0", "1"], "entity": ["2200001", "2200002"], 
"start-date": ["", ""], - "organisation": ["101", "101"], + "organisation": ["local-authority:LBH", "local-authority:LBH"], "end-date": ["", ""], "entry-date": ["", ""], "endpoint": ["", ""], @@ -713,11 +763,11 @@ def test_pipeline_transform_basic( pd.DataFrame(get_test_lookup_config()).to_csv( f"{pipeline_dir}/lookup.csv", index=False ) - # Initialize pipeline components spec = Specification(specification_dir) org = Organisation(organisation_path=organisation_path) pipeline = Pipeline(str(pipeline_dir), dataset_name, specification=spec) + logger.info(f"Pipeline Lookups: {pipeline.lookup}") output_path = tmp_path / "output" / "transformed.csv" output_path.parent.mkdir(parents=True, exist_ok=True) @@ -893,7 +943,3 @@ def test_pipeline_transform_with_unmapped_reference_lookup_disabled( # Verify reference field for unmapped data exists in output reference_values = output_df[output_df["field"] == "reference"]["value"].tolist() assert "99" in reference_values, "Unmapped reference 99 should be in output" - - -if __name__ == "__main__": - pytest.main() diff --git a/tests/unit/phase/test_lookup.py b/tests/unit/phase/test_lookup.py index d52908ac1..55f3153b1 100644 --- a/tests/unit/phase/test_lookup.py +++ b/tests/unit/phase/test_lookup.py @@ -28,6 +28,7 @@ def get_input_stream(): def get_input_stream_with_linked_field(): return [ { + "organisation": "local-authorityabc", "row": { "fact": "abc", "entity": "10", @@ -37,7 +38,7 @@ def get_input_stream_with_linked_field(): "reference": "a4d1", "entry-number": 1, "line-number": 2, - } + }, } ] From 74fe9338c07e6ffbf2b71d33d3f7664c9f4a82ac Mon Sep 17 00:00:00 2001 From: Jandu <143708067+Jandums@users.noreply.github.com> Date: Thu, 22 Jan 2026 16:24:11 +0000 Subject: [PATCH 2/2] Msj/fix refill logs (#417) * Refill Logs on Fetch * Tests * add test structure * test for overwitting * wip * remove random file * run black on new files * fix test and mock responses --------- Co-authored-by: eveleighoj 
<35256612+eveleighoj@users.noreply.github.com> --- digital_land/commands.py | 1 + tests/integration/test_collect.py | 99 +++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index 7012c504f..463f0f452 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -959,6 +959,7 @@ def validate_and_add_data_input( endpoint=endpoint["endpoint"], end_date=endpoint["end-date"], plugin=endpoint["plugin"], + refill_todays_logs=True, ) try: # log is already returned from fetch, but read from file if needed for verification diff --git a/tests/integration/test_collect.py b/tests/integration/test_collect.py index e118ed590..94d6000a9 100644 --- a/tests/integration/test_collect.py +++ b/tests/integration/test_collect.py @@ -3,10 +3,15 @@ import urllib.request import pytest +from pathlib import Path + from datetime import datetime from click.testing import CliRunner +from digital_land.collect import FetchStatus from digital_land.cli import cli +from digital_land.collect import Collector + from tests.utils.helpers import hash_digest ENDPOINT = "https://raw.githubusercontent.com/digital-land/digital-land-python/main/tests/data/resource_examples/csv.csv" @@ -64,3 +69,97 @@ def resource_collected(collection_dir, resource): raw = urllib.request.urlopen(ENDPOINT).read().decode("utf-8") downloaded = "\n".join(raw.splitlines()) # Convert CRLF to LF return saved == downloaded + + +def create_mock_response(mocker, status_code=200, content=b"", content_type="text/csv"): + """Helper to create a mock requests response object.""" + mock_response = mocker.Mock() + mock_response.status_code = status_code + mock_response.content = content + mock_response.headers = {"Content-Type": content_type} + mock_response.request = mocker.Mock() + mock_response.request.headers = {"User-Agent": "test-agent"} + return mock_response + + +class TestCollector: + def test_fetch_overwrite_endpoint_logs(self, collection_dir, 
tmp_path, mocker): + """fetch a single source endpoint URL, and add it to the collection""" + # -- Arrange -- + url = "https://example.com/test-endpoint.csv" + mock_content = b"reference,name\n1,Test Name\n2,Another Name" + + mock_response = create_mock_response( + mocker, status_code=500, content=mock_content + ) + + mocker.patch( + "digital_land.collect.requests.Session.get", + return_value=mock_response, + ) + + log_dir = Path(collection_dir) / "log" + resource_dir = Path(collection_dir) / "resource" + collector = Collector(resource_dir=str(resource_dir), log_dir=str(log_dir)) + fetch_status, log = collector.fetch(url=url, refill_todays_logs=True) + + # -- Act -- + # run initial fetch to create log + fetch_status, log = collector.fetch(url=url, refill_todays_logs=True) + + assert fetch_status == FetchStatus.FAILED, "initial mock should fail" + + # update to a successful response + mock_response.status_code = 200 + + # now run without refill_todays_logs to ensure log is not overwritten + fetch_status, log = collector.fetch(url=url, refill_todays_logs=False) + + assert ( + fetch_status == FetchStatus.ALREADY_FETCHED + ), "log should not be overwritten" + mock_response.status_code = 200 + fetch_status, log = collector.fetch(url=url, refill_todays_logs=True) + + assert fetch_status == FetchStatus.OK + assert log["endpoint-url"] == url + assert log["status"] == "200" + assert "resource" in log + assert log["resource"] is not None + + # Check log file was created + log_files = list(log_dir.rglob("*.json")) + assert len(log_files) == 1 + + # Read and verify log file content + with open(log_files[0], "r") as f: + saved_log = json.load(f) + assert saved_log["endpoint-url"] == url + assert saved_log["status"] == "200" + + def test_fetch_handles_non_200_status(self, collection_dir, tmp_path, mocker): + """Test that fetch handles non-200 status codes correctly""" + # -- Arrange -- + url = "https://example.com/not-found.csv" + + mock_response = create_mock_response( + 
mocker, status_code=404, content=b"Not Found", content_type="text/html" + ) + + mocker.patch( + "digital_land.collect.requests.Session.get", + return_value=mock_response, + ) + + log_dir = Path(collection_dir) / "log" + resource_dir = Path(collection_dir) / "resource" + + collector = Collector(resource_dir=str(resource_dir), log_dir=str(log_dir)) + + # -- Act -- + fetch_status, log = collector.fetch(url=url) + + # -- Assert -- + assert fetch_status == FetchStatus.FAILED + assert log["status"] == "404" + assert "resource" not in log