Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 70 additions & 6 deletions digital_land/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ def build(
current_config_hash=pipeline_hash,
current_specification_hash=specification_hash,
),
"transform_resources": State.get_transform_resources_by_dataset(
collection,
dataset_resource_dir=dataset_resource_dir,
current_code_version=__version__,
current_config_hash=pipeline_hash,
current_specification_hash=specification_hash,
),
}
)

Expand Down Expand Up @@ -109,6 +116,21 @@ def get_code_hash():
commit = repo.revparse_single("HEAD")
return str(commit.id)

def _active_dataset_resources(collection: Collection):
    """Return dataset_resource_map with retired resources (no replacement) removed.

    Applies the same redirect filtering as makerules so that counts and
    resource lists only include resources that will actually be processed.

    A row in old-resource.csv with an empty "resource" column marks the
    old resource as retired outright; such resources are dropped here.
    NOTE(review): a resource redirected to a replacement is kept under its
    ORIGINAL hash rather than the replacement's — confirm this matches the
    makerules behaviour.
    """
    # old-resource hash -> replacement hash ("" when retired with no replacement)
    replacement = {}
    for entry in collection.old_resource.entries:
        replacement[entry["old-resource"]] = entry["resource"]

    active = {}
    for dataset, resources in collection.dataset_resource_map().items():
        # Keep a resource unless it maps to a falsy (empty) replacement.
        active[dataset] = {
            resource for resource in resources if replacement.get(resource, resource)
        }
    return active

def get_transform_count(
collection: Collection,
dataset_resource_dir=None,
Expand All @@ -118,11 +140,12 @@ def get_transform_count(
):
"""Calculate the number of transformations that need to be completed.

When dataset_resource_dir is provided, only resources whose existing log
Retired resources with no replacement are excluded. When
dataset_resource_dir is provided, only resources whose existing log
differs from the current code version, config hash, or specification hash
are counted. If None, all resources are counted.
are counted. If None, all active resources are counted.
"""
dataset_resource = collection.dataset_resource_map()
dataset_resource = State._active_dataset_resources(collection)

if dataset_resource_dir is None:
return sum(len(resources) for resources in dataset_resource.values())
Expand Down Expand Up @@ -150,11 +173,12 @@ def get_transform_count_by_dataset(
):
"""Calculate the number of transformations needed per dataset.

When dataset_resource_dir is provided, only resources whose existing log
Retired resources with no replacement are excluded. When
dataset_resource_dir is provided, only resources whose existing log
differs from the current code version, config hash, or specification hash
are counted. If None, all resources are counted.
are counted. If None, all active resources are counted.
"""
dataset_resource = collection.dataset_resource_map()
dataset_resource = State._active_dataset_resources(collection)
transform_count_by_dataset = {}
for dataset, resources in dataset_resource.items():
if dataset_resource_dir is None:
Expand All @@ -174,6 +198,43 @@ def get_transform_count_by_dataset(
)
return transform_count_by_dataset

def get_transform_resources_by_dataset(
    collection: Collection,
    dataset_resource_dir=None,
    current_code_version=None,
    current_config_hash=None,
    current_specification_hash=None,
):
    """Get the resource hashes that need transformation per dataset.

    Retired resources with no replacement are excluded. When
    dataset_resource_dir is provided, only resources whose existing log
    differs from the current code version, config hash, or specification hash
    are included. If None, all active resources are included.

    Returns a dict mapping dataset name to a sorted list of resource hashes.
    """
    active = State._active_dataset_resources(collection)

    # No log directory to compare against: every active resource is pending.
    if dataset_resource_dir is None:
        return {dataset: sorted(resources) for dataset, resources in active.items()}

    resources_by_dataset = {}
    for dataset, resources in active.items():
        # Only include resources whose recorded log is stale relative to the
        # current code / config / specification versions.
        pending = [
            resource
            for resource in resources
            if resource_needs_processing(
                dataset_resource_dir,
                dataset,
                resource,
                current_code_version,
                current_config_hash,
                current_specification_hash,
            )
        ]
        resources_by_dataset[dataset] = sorted(pending)
    return resources_by_dataset


def compare_state(
specification_dir,
Expand Down Expand Up @@ -207,6 +268,9 @@ def compare_state(
# transform count by dataset should not be compared as it changes
current.pop("transform_count_by_dataset", None)
compare.pop("transform_count_by_dataset", None)
# transform resources should not be compared as it changes
current.pop("transform_resources", None)
compare.pop("transform_resources", None)

if current == compare:
return None
Expand Down
1 change: 1 addition & 0 deletions tests/acceptance/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def test_state(tmp_path):
"last_updated_date",
"transform_count",
"transform_count_by_dataset",
"transform_resources",
]
assert state_data["code"] == get_code_hash()
assert state_data["specification"] == specification_hash
Expand Down
86 changes: 86 additions & 0 deletions tests/integration/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,22 @@ def collection_with_transforms(tmp_path):
return collection_dir, 4 # Return directory and expected count


@pytest.fixture
def collection_with_retired_resource(collection_with_transforms):
    """Extends collection_with_transforms with a retired resource (no replacement).

    resource1 is marked as retired with no replacement in old-resource.csv,
    so it should be excluded from transform_resources even though it still
    appears in dataset_resource_map().
    """
    collection_dir, _ = collection_with_transforms
    # An empty "resource" column means retired outright, no replacement.
    rows = ["old-resource,resource", "resource1,"]
    with open(collection_dir / "old-resource.csv", "w", newline="") as f:
        f.write("\n".join(rows) + "\n")
    return collection_dir


def test_get_code_hash():
proc = subprocess.run(
"git log -n 1".split(), cwd=os.path.dirname(__file__), stdout=subprocess.PIPE
Expand Down Expand Up @@ -226,3 +242,73 @@ def test_state_build_has_transform_count(collection_with_transforms, tmp_path):
assert (
test_state["transform_count"] == expected_count
), f"Expected {expected_count} transforms, got {test_state['transform_count']}"


class TestState:
    """Integration tests for State.get_transform_resources_by_dataset and build."""

    def test_get_transform_resources_by_dataset_returns_all_resources(
        self, collection_with_transforms
    ):
        # With no dataset_resource_dir, every active resource is returned.
        collection_dir, _ = collection_with_transforms
        collection = Collection(directory=str(collection_dir))
        collection.load()

        result = State.get_transform_resources_by_dataset(collection)

        expected = {
            "dataset1": {"resource1", "resource2"},
            "dataset2": {"resource2", "resource3"},
        }
        for dataset, resources in expected.items():
            assert set(result[dataset]) == resources

    def test_get_transform_resources_by_dataset_returns_sorted_lists(
        self, collection_with_transforms
    ):
        collection_dir, _ = collection_with_transforms
        collection = Collection(directory=str(collection_dir))
        collection.load()

        result = State.get_transform_resources_by_dataset(collection)

        # Each dataset's resource list must come back in sorted order.
        assert all(resources == sorted(resources) for resources in result.values())

    def test_get_transform_resources_by_dataset_excludes_retired_resources(
        self, collection_with_retired_resource
    ):
        collection = Collection(directory=str(collection_with_retired_resource))
        collection.load()

        result = State.get_transform_resources_by_dataset(collection)
        dataset1_resources = result["dataset1"]

        # resource1 is retired with no replacement — must not appear
        assert "resource1" not in dataset1_resources
        # resource2 is still active
        assert "resource2" in dataset1_resources

    def test_build_includes_transform_resources(
        self, collection_with_transforms, tmp_path
    ):
        collection_dir, _ = collection_with_transforms
        # State.build needs specification/pipeline/resource directories to exist.
        dirs = {}
        for name in ("specification", "pipeline", "resource"):
            dirs[name] = tmp_path / name
            dirs[name].mkdir()

        test_state = State.build(
            str(dirs["specification"]),
            str(collection_dir),
            str(dirs["pipeline"]),
            str(dirs["resource"]),
            True,
        )

        assert "transform_resources" in test_state
        transform_resources = test_state["transform_resources"]
        assert isinstance(transform_resources, dict)
        assert set(transform_resources["dataset1"]) == {"resource1", "resource2"}
        assert set(transform_resources["dataset2"]) == {"resource2", "resource3"}
Loading