Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/psf/black
rev: 24.1.1
rev: 24.8.0
hooks:
- id: black
- repo: https://github.com/pre-commit/pre-commit-hooks
Expand Down
1 change: 1 addition & 0 deletions digital_land/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,7 @@ def validate_and_add_data_input(
endpoint=endpoint["endpoint"],
end_date=endpoint["end-date"],
plugin=endpoint["plugin"],
refill_todays_logs=True,
)
try:
# log is already returned from fetch, but read from file if needed for verification
Expand Down
2 changes: 1 addition & 1 deletion digital_land/phase/harmonise.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def process(self, stream):
logger.error(
f"Exception occurred while fetching geoX, geoY coordinates: {e}"
)

# TODO need to identify why below exists and possibly remove
# ensure typology fields are a CURIE
for typology in ["organisation", "geography", "document"]:
value = o.get(typology, "")
Expand Down
82 changes: 37 additions & 45 deletions digital_land/phase/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,8 @@ def get_entity(self, block):
organisation=organisation,
reference=reference,
)
or self.lookup(prefix=prefix, reference=reference)
)
if not entity:
# TBD this needs to specifically not match unless the organisation and other columns
# are empty in the lookups.csv probably isn't a change here.
# or by the CURIE
entity = self.lookup(prefix=prefix, reference=reference)

# When obtaining an entity number using only the prefix and reference, check if the
# lookup includes an associated organisation. If it does, do not use the entity number,
# as it is organisation specific.
entity = self.check_associated_organisation(entity)

if entity and self.entity_range:
if (
Expand Down Expand Up @@ -195,43 +186,44 @@ def process(self, stream):
reference = row.get("reference", "")
entity_number = row.get("entity", "")

if not (prefix and reference and entity_number in self.reverse_lookups):
if not (prefix and reference and entity_number):
yield block
continue
value = self.reverse_lookups[entity_number]

if value:
organisation = value[-1].split(",")[-1]
find_entity = self.lookup(
prefix=prefix,
organisation=organisation,
reference=reference,
)
if not find_entity:
# TBD this needs to specifically not match unless the organisation and other columns
# are empty in the lookups.csv probably isn't a change here.
# or by the CURIE
find_entity = self.lookup(prefix=prefix, reference=reference)

# When obtaining an entity number using only the prefix and reference, check if the
# lookup includes an associated organisation. If it does, do not use the entity number,
# as it is organisation specific.
find_entity = self.check_associated_organisation(find_entity)

if not find_entity or (
str(find_entity) in self.redirect_lookups
and int(self.redirect_lookups[str(find_entity)].get("status", 0))
== 410
):
if self.odp_collections and prefix in self.odp_collections:
self.issues.log_issue(
prefix,
"missing associated entity",
reference,
line_number=line_number,
)
else:
row[self.entity_field] = find_entity

# Get organisation from block metadata (set by OrganisationPhase)
organisation = block.get("organisation", "").replace(
"local-authority-eng", "local-authority"
)

find_entity = self.lookup(
prefix=prefix,
organisation=organisation,
reference=reference,
)
if not find_entity:
# TBD this needs to specifically not match unless the organisation and other columns
# are empty in the lookups.csv probably isn't a change here.
# or by the CURIE
find_entity = self.lookup(prefix=prefix, reference=reference)

# When obtaining an entity number using only the prefix and reference, check if the
# lookup includes an associated organisation. If it does, do not use the entity number,
# as it is organisation specific.
find_entity = self.check_associated_organisation(find_entity)

if not find_entity or (
str(find_entity) in self.redirect_lookups
and int(self.redirect_lookups[str(find_entity)].get("status", 0)) == 410
):
if self.odp_collections and prefix in self.odp_collections:
self.issues.log_issue(
prefix,
"missing associated entity",
reference,
line_number=line_number,
)
else:
row[self.entity_field] = find_entity
yield block


Expand Down
4 changes: 4 additions & 0 deletions digital_land/phase/organisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ def process(self, stream):
self.issues.log_issue(
"organisation", "invalid organisation", organisation_value
)

# Store at block level for post-pivot lookups
block["organisation"] = row["organisation"]

yield block
17 changes: 5 additions & 12 deletions digital_land/pipeline/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,24 +256,17 @@ def load_lookup(self):
or row.get("pipeline", "")
)
reference = row.get("reference", "") or row.get("value", "")

# composite key, ordered by specificity
resource_lookup = self.lookup.setdefault(row.get("resource", ""), {})
resource_lookup[
lookup_key(
entry_number=entry_number,
prefix=prefix,
reference=reference,
)
] = row["entity"]

organisation = row.get("organisation", "")
# replace local-authority-eng while we migrate
# TODO remove this replacement for local-authority-eng now that migration is complete
organisation = organisation.replace(
"local-authority-eng", "local-authority"
)

# composite key, ordered by specificity
resource_lookup = self.lookup.setdefault(row.get("resource", ""), {})
resource_lookup[
lookup_key(
entry_number=entry_number,
prefix=prefix,
reference=reference,
organisation=organisation,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ test = [
"sphinx",
"sphinx-autobuild",
"sphinx_rtd_theme",
"black",
"black==24.8.0",
]

notebook = [
Expand Down
18 changes: 9 additions & 9 deletions tests/acceptance/test_async_backend_pipeline_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
def run_pipeline_for_test(test_dirs, dataset, resource, request_id, input_path):
endpoints = ["d779ad1c91c5a46e2d4ace4d5446d7d7f81df1ed058f882121070574697a5412"]
pipeline_dir = test_dirs["pipeline_dir"]
organisation = "test-org"
organisation = "test-org:test"
request_id = request_id
collection_dir = test_dirs["collection_dir"]
converted_dir = test_dirs["converted_resource_dir"]
Expand Down Expand Up @@ -78,7 +78,7 @@ def run_pipeline_for_test(test_dirs, dataset, resource, request_id, input_path):
dictwriter.writerow(row2)

# Create organisation.csv with data
row = {"organisation": "test-org", "name": "Test Org"}
row = {"organisation": "test-org:test", "name": "Test Org"}
fieldnames = row.keys()
with open(organisation_path, "w") as f:
dictwriter = csv.DictWriter(f, fieldnames=fieldnames)
Expand Down Expand Up @@ -115,7 +115,7 @@ def run_pipeline_for_test(test_dirs, dataset, resource, request_id, input_path):

# Load organisations
organisation = Organisation(organisation_path, Path(pipeline.path))
default_values["organisation"] = organisation
default_values["organisation"] = "test-org:test"
try:
run_pipeline(
ConvertPhase(
Expand Down Expand Up @@ -216,12 +216,12 @@ def test_async_pipeline_run(test_dirs):
{
"reference": "ABC_0001",
"entry-date": "2024-01-01",
"organisation": "test-org",
"organisation": "test-org:test",
},
{
"reference": "ABC_0002",
"entry-date": "2024-01-02",
"organisation": "test-org",
"organisation": "test-org:test",
},
]

Expand Down Expand Up @@ -282,12 +282,12 @@ def test_pipeline_output_is_complete(test_dirs):
{
"reference": "ABC_0001",
"entry-date": "2024-01-01",
"organisation": "test-org",
"organisation": "test-org:test",
},
{
"reference": "ABC_0002",
"entry-date": "2024-01-02",
"organisation": "test-org",
"organisation": "test-org:test",
},
]

Expand Down Expand Up @@ -433,7 +433,7 @@ def test_pipeline_lookup_phase(test_dirs):
{
"reference": "ABC_0001",
"entry-date": "2025-01-01",
"organisation": "test-org",
"organisation": "test-org:test",
"article-4-direction": "a4d1",
}
]
Expand Down Expand Up @@ -481,7 +481,7 @@ def test_pipeline_lookup_phase_assign_reference_entity(test_dirs):
{
"reference": "ABC_0001",
"entry-date": "2025-01-01",
"organisation": "test-org",
"organisation": "test-org:test",
"article-4-direction": "a4d2",
}
]
Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import csv
import urllib.request
import pandas as pd
import logging
from urllib.error import URLError
from digital_land.pipeline import Pipeline
from digital_land.pipeline import Lookups
from digital_land.specification import Specification
from digital_land.organisation import Organisation

logger = logging.getLogger(__name__)


def write_as_csv(dir, filename, data):
with open(os.path.join(dir, filename), "w") as f:
Expand Down Expand Up @@ -551,6 +554,53 @@ def test_load_concat_no_prepend_append(empty_pipeline_dir):
}


def test_load_lookup_creates_single_lookup_per_row(tmp_path):
    """
    Loading lookup.csv should yield exactly one lookup entry per input row.

    Each row contributes a single composite key built from its provided
    fields, normalised by lowercasing and removing special characters.
    """
    # -- Arrange --
    pipeline_dir = tmp_path / "pipeline"
    pipeline_dir.mkdir()

    # Three rows exercising different field combinations:
    # organisation-only, entry-number-only, and resource-scoped.
    frame = pd.DataFrame(
        {
            "resource": ["", "", "res123"],
            "entry-number": ["", "5", ""],
            "prefix": ["ancient-woodland", "conservation-area", "listed-building"],
            "reference": ["AW001", "CA002", "LB003"],
            "organisation": ["local-authority:ABC", "", "local-authority:XYZ"],
            "entity": ["1000001", "1000002", "1000003"],
        }
    )
    frame.to_csv(f"{pipeline_dir}/lookup.csv", index=False)

    # -- Act --
    pipeline = Pipeline(str(pipeline_dir), "test-pipeline")

    # -- Assert --
    # Rows without a resource land under the empty-string key.
    assert "" in pipeline.lookup, "Should have empty resource key for general lookups"
    general_lookups = pipeline.lookup[""]

    # Row 1: normalised key combining prefix, reference and organisation
    # (lowercased, colon stripped).
    key_one = ",ancient-woodland,aw001,local-authorityabc"
    assert key_one in general_lookups
    assert general_lookups[key_one] == "1000001"

    # Row 2: entry-number + prefix + reference, no organisation.
    key_two = "5,conservation-area,ca002,"
    assert key_two in general_lookups
    assert general_lookups[key_two] == "1000002"

    # Row 3: scoped under its resource rather than the general ("") bucket.
    assert "res123" in pipeline.lookup, "Should have resource-specific key"
    resource_lookups = pipeline.lookup["res123"]
    key_three = ",listed-building,lb003,local-authorityxyz"
    assert key_three in resource_lookups
    assert resource_lookups[key_three] == "1000003"


@pytest.fixture(scope="session")
def specification_dir(tmp_path_factory):
"""Download specification files from GitHub for testing"""
Expand Down Expand Up @@ -665,7 +715,7 @@ def get_test_lookup_config():
"reference": ["0", "1"],
"entity": ["2200001", "2200002"],
"start-date": ["", ""],
"organisation": ["101", "101"],
"organisation": ["local-authority:LBH", "local-authority:LBH"],
"end-date": ["", ""],
"entry-date": ["", ""],
"endpoint": ["", ""],
Expand Down Expand Up @@ -713,11 +763,11 @@ def test_pipeline_transform_basic(
pd.DataFrame(get_test_lookup_config()).to_csv(
f"{pipeline_dir}/lookup.csv", index=False
)

# Initialize pipeline components
spec = Specification(specification_dir)
org = Organisation(organisation_path=organisation_path)
pipeline = Pipeline(str(pipeline_dir), dataset_name, specification=spec)
logger.info(f"Pipeline Lookups: {pipeline.lookup}")

output_path = tmp_path / "output" / "transformed.csv"
output_path.parent.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -893,7 +943,3 @@ def test_pipeline_transform_with_unmapped_reference_lookup_disabled(
# Verify reference field for unmapped data exists in output
reference_values = output_df[output_df["field"] == "reference"]["value"].tolist()
assert "99" in reference_values, "Unmapped reference 99 should be in output"


if __name__ == "__main__":
pytest.main()
Loading