|
| 1 | +""" |
| 2 | +Tests for ModuleIOManager path registry. |
| 3 | +
|
| 4 | +The IO manager now properly stores the paths returned by assets in a registry, |
| 5 | +instead of trying to guess/reconstruct paths from asset keys. |
| 6 | +""" |
| 7 | +from __future__ import annotations |
| 8 | + |
| 9 | +from pathlib import Path |
| 10 | +from unittest.mock import MagicMock |
| 11 | + |
| 12 | +import pytest |
| 13 | + |
| 14 | +from prepare_annotations.core.dagster_io_managers import ModuleIOManager |
| 15 | + |
| 16 | + |
@pytest.fixture
def io_manager(tmp_path: Path) -> ModuleIOManager:
    """Provide a ModuleIOManager whose registry file sits under ``tmp_path``.

    The manager's private ``_registry_path`` attribute is overwritten so the
    registry is read from and written to ``tmp_path / ".asset_paths.json"``.
    """
    instance = ModuleIOManager()
    registry_file = tmp_path / ".asset_paths.json"
    instance._registry_path = registry_file
    return instance
| 23 | + |
| 24 | + |
class TestModuleIOManagerRegistry:
    """Exercises the asset-key -> path registry kept by ModuleIOManager."""

    @staticmethod
    def _output_context(asset_name: str) -> MagicMock:
        """Build a mock output context whose asset key renders as ``asset_name``."""
        ctx = MagicMock()
        ctx.asset_key.to_user_string.return_value = asset_name
        return ctx

    @staticmethod
    def _input_context(asset_name: str) -> MagicMock:
        """Build a mock input context whose upstream asset key renders as ``asset_name``."""
        ctx = MagicMock()
        ctx.upstream_output.asset_key.to_user_string.return_value = asset_name
        return ctx

    def test_handle_output_stores_path_in_registry(
        self,
        io_manager: ModuleIOManager,
        tmp_path: Path,
    ) -> None:
        """handle_output records the Path it is given under the asset's key."""
        asset_name = "coronary_with_ensembl"
        output_ctx = self._output_context(asset_name)

        # This file plays the role of the path returned by the asset.
        produced = tmp_path / "coronary" / "coronary_ensembl_joined.parquet"
        produced.parent.mkdir(parents=True, exist_ok=True)
        produced.touch()

        io_manager.handle_output(output_ctx, produced)

        # The registry stores the path in its string form.
        stored = io_manager._load_registry()
        assert asset_name in stored
        assert stored[asset_name] == str(produced)

    def test_load_input_retrieves_path_from_registry(
        self,
        io_manager: ModuleIOManager,
        tmp_path: Path,
    ) -> None:
        """load_input hands back the path registered for the upstream asset."""
        expected = tmp_path / "longevitymap" / "annotations.parquet"
        expected.parent.mkdir(parents=True, exist_ok=True)
        expected.touch()

        # Seed the registry directly, standing in for an earlier handle_output.
        io_manager._save_registry({"longevitymap_annotations": str(expected)})

        input_ctx = self._input_context("longevitymap_annotations")

        loaded = io_manager.load_input(input_ctx)
        assert loaded == expected

    def test_load_input_raises_if_not_in_registry(
        self,
        io_manager: ModuleIOManager,
    ) -> None:
        """load_input raises FileNotFoundError for an asset with no registry entry."""
        input_ctx = self._input_context("unknown_asset")

        with pytest.raises(FileNotFoundError, match="not found in registry"):
            io_manager.load_input(input_ctx)

    def test_registry_persists_across_instances(
        self,
        tmp_path: Path,
    ) -> None:
        """A path stored by one manager is loadable by a second manager on the same file."""
        registry_file = tmp_path / ".asset_paths.json"

        # The writer instance registers the path.
        writer = ModuleIOManager()
        writer._registry_path = registry_file

        weights = tmp_path / "vo2max" / "weights.parquet"
        weights.parent.mkdir(parents=True, exist_ok=True)
        weights.touch()

        writer.handle_output(self._output_context("vo2max_weights"), weights)

        # A separate reader instance, pointed at the same registry file,
        # is created only after the write has happened.
        reader = ModuleIOManager()
        reader._registry_path = registry_file

        loaded = reader.load_input(self._input_context("vo2max_weights"))
        assert loaded == weights

    def test_multiple_assets_stored_in_registry(
        self,
        io_manager: ModuleIOManager,
        tmp_path: Path,
    ) -> None:
        """Several handle_output calls accumulate entries in one registry."""
        coronary_dir = tmp_path / "coronary"
        expected_paths = {
            "coronary_annotations": coronary_dir / "annotations.parquet",
            "coronary_studies": coronary_dir / "studies.parquet",
            "coronary_with_ensembl": coronary_dir / "coronary_ensembl_joined.parquet",
        }

        for name, target in expected_paths.items():
            target.parent.mkdir(parents=True, exist_ok=True)
            target.touch()
            io_manager.handle_output(self._output_context(name), target)

        # Every asset must be present, each with the string form of its path.
        stored = io_manager._load_registry()
        assert len(stored) == 3
        for name, target in expected_paths.items():
            assert stored[name] == str(target)
| 140 | + |
| 141 | + |
class TestModuleIOManagerNonPathOutputs:
    """Covers outputs that are not Path objects, such as dicts from upload assets."""

    def test_handle_output_ignores_non_path_objects(
        self,
        io_manager: ModuleIOManager,
    ) -> None:
        """A non-Path output must leave no entry in the registry."""
        asset_name = "coronary_hf_upload"

        # A dict stands in for what an upload asset returns instead of a Path.
        payload = {"repo_id": "just-dna-seq/annotators", "num_uploaded": 3}

        output_ctx = MagicMock()
        output_ctx.asset_key.to_user_string.return_value = asset_name

        io_manager.handle_output(output_ctx, payload)

        # The asset key must be absent from the registry afterwards.
        assert asset_name not in io_manager._load_registry()
0 commit comments