|
import time
import urllib
import urllib.request
from pathlib import Path

import pytest

from digital_land.collection import Collection
from digital_land.commands import pipeline_run
from digital_land.configuration.main import Config
from digital_land.organisation import Organisation
from digital_land.pipeline import Pipeline
from digital_land.specification import Specification
| 13 | + |
# Set DATA_DIRECTORY to a path string to use a permanent directory instead of
# pytest's tmp_path — lets you inspect the data after a test run and avoids
# repeated downloads between runs.
DATA_DIRECTORY = None
| 17 | + |
| 18 | + |
| 19 | +@pytest.mark.parametrize( |
| 20 | + "resource_hash,dataset", |
| 21 | + [ |
| 22 | + ( |
| 23 | + "1c192f194a6d7cb044006bbe0d7bb7909eed3783eeb8a53026fc15b9fe31a836", |
| 24 | + "article-4-direction-area", |
| 25 | + ), |
| 26 | + ], |
| 27 | +) |
| 28 | +def test_transformation_performance( |
| 29 | + resource_hash: str, dataset: str, capsys, tmp_path: Path |
| 30 | +): |
| 31 | + # endpoint hash TODO we should look at automating the retrieval of this in the future |
| 32 | + # endpoint_hash = 'f4bfb0e3a3f0f0e2e5e1f3c6e4b2a7d8c9e0f1a2b3c4d5e6f7g8h9i0j1k2l3m4' |
| 33 | + if DATA_DIRECTORY: |
| 34 | + data_dir = Path(DATA_DIRECTORY) |
| 35 | + else: |
| 36 | + data_dir = tmp_path / "data" |
| 37 | + |
| 38 | + data_dir.mkdir(parents=True, exist_ok=True) |
| 39 | + |
| 40 | + # Specification first as can be used for the others |
| 41 | + specification_dir = data_dir / "specification" |
| 42 | + specification_dir.mkdir(parents=True, exist_ok=True) |
| 43 | + Specification.download(specification_dir) |
| 44 | + |
| 45 | + spec = Specification(specification_dir) |
| 46 | + collection = spec.dataset[dataset]["collection"] |
| 47 | + |
| 48 | + if not collection: |
| 49 | + raise ValueError( |
| 50 | + f"Dataset {dataset} does not have a collection defined in the specification" |
| 51 | + ) |
| 52 | + |
| 53 | + # download the configuration files for that collection |
| 54 | + pipeline_dir = data_dir / "pipeline" |
| 55 | + pipeline_dir.mkdir(parents=True, exist_ok=True) |
| 56 | + |
| 57 | + Config.download_pipeline_files(path=pipeline_dir, collection=collection) |
| 58 | + |
| 59 | + # download the resource |
| 60 | + data_collection_url = "https://files.planning.data.gov.uk/" |
| 61 | + resource_url = f"{data_collection_url}{collection}-collection/collection/resource/{resource_hash}" |
| 62 | + resource_path = data_dir / "resource" / f"{resource_hash}" |
| 63 | + resource_path.parent.mkdir(parents=True, exist_ok=True) |
| 64 | + if not resource_path.exists(): |
| 65 | + print(f"Downloading {resource_url} to {resource_path}") |
| 66 | + urllib.request.urlretrieve(resource_url, resource_path) |
| 67 | + else: |
| 68 | + print(f"Using existing file {resource_path}") |
| 69 | + |
| 70 | + # we need to know the endpoint hash for the resource so will need to download the logs |
| 71 | + collection_dir = data_dir / "collection" |
| 72 | + collection_dir.mkdir(parents=True, exist_ok=True) |
| 73 | + Collection.download(path=collection_dir, collection=collection) |
| 74 | + |
| 75 | + # download organisation data |
| 76 | + cache_dir = data_dir / "cache" |
| 77 | + cache_dir.mkdir(parents=True, exist_ok=True) |
| 78 | + org_path = cache_dir / "organisation.csv" |
| 79 | + Organisation.download(path=org_path) |
| 80 | + |
| 81 | + output_path = data_dir / "transformed" / dataset / f"{resource_hash}.csv" |
| 82 | + output_path.parent.mkdir(parents=True, exist_ok=True) |
| 83 | + converted_path = data_dir / "converted" / dataset / f"{resource_hash}.csv" |
| 84 | + converted_path.parent.mkdir(parents=True, exist_ok=True) |
| 85 | + |
| 86 | + # create pipeline object |
| 87 | + pipeline = Pipeline(pipeline_dir, dataset) |
| 88 | + |
| 89 | + # create logs |
| 90 | + issue_dir = data_dir / "issues" / dataset |
| 91 | + issue_dir.mkdir(parents=True, exist_ok=True) |
| 92 | + operational_issue_dir = data_dir / "performance" / "operational_issues" |
| 93 | + operational_issue_dir.mkdir(parents=True, exist_ok=True) |
| 94 | + column_field_dir = cache_dir / "column_field" / dataset |
| 95 | + column_field_dir.mkdir(parents=True, exist_ok=True) |
| 96 | + dataset_resource_dir = cache_dir / "dataset_resource" / dataset |
| 97 | + dataset_resource_dir.mkdir(parents=True, exist_ok=True) |
| 98 | + converted_resource_dir = cache_dir / "converted_resource" / dataset |
| 99 | + converted_resource_dir.mkdir(parents=True, exist_ok=True) |
| 100 | + output_log_dir = data_dir / "log" |
| 101 | + output_log_dir.mkdir(parents=True, exist_ok=True) |
| 102 | + |
| 103 | + # get endpoints from the collection TODO include redirects |
| 104 | + collection = Collection(directory=collection_dir) |
| 105 | + collection.load() |
| 106 | + endpoints = collection.resource_endpoints(resource_hash) |
| 107 | + organisations = collection.resource_organisations(resource_hash) |
| 108 | + entry_date = collection.resource_start_date(resource_hash) |
| 109 | + |
| 110 | + # build config from downloaded files |
| 111 | + config_path = cache_dir / "config.sqlite3" |
| 112 | + config = Config(path=config_path, specification=spec) |
| 113 | + config.create() |
| 114 | + tables = {key: pipeline.path for key in config.tables.keys()} |
| 115 | + config.load(tables) |
| 116 | + |
| 117 | + # measure length of time |
| 118 | + start_old = time.perf_counter() |
| 119 | + |
| 120 | + # currently uses pipeline_run as the entrypoint for the old pipeline |
| 121 | + # this may need changing if we alter this function to need new code |
| 122 | + pipeline_run( |
| 123 | + dataset=dataset, |
| 124 | + pipeline=pipeline, |
| 125 | + specification=spec, |
| 126 | + input_path=resource_path, |
| 127 | + output_path=output_path, |
| 128 | + collection_dir=collection_dir, # TBD: remove, replaced by endpoints, organisations and entry_date |
| 129 | + issue_dir=issue_dir, |
| 130 | + operational_issue_dir=operational_issue_dir, |
| 131 | + organisation_path=org_path, |
| 132 | + save_harmonised=False, |
| 133 | + column_field_dir=column_field_dir, |
| 134 | + dataset_resource_dir=dataset_resource_dir, |
| 135 | + converted_resource_dir=converted_resource_dir, |
| 136 | + cache_dir=cache_dir, |
| 137 | + endpoints=endpoints, |
| 138 | + organisations=organisations, |
| 139 | + entry_date=entry_date, |
| 140 | + config_path=config_path, |
| 141 | + resource=resource_hash, |
| 142 | + output_log_dir=output_log_dir, |
| 143 | + converted_path=converted_path, |
| 144 | + ) |
| 145 | + |
| 146 | + end_old = time.perf_counter() |
| 147 | + |
| 148 | + with capsys.disabled(): |
| 149 | + print(f" Original Transform Took: {end_old - start_old:.4f} seconds") |
| 150 | + |
| 151 | + # now run new version |
| 152 | + print("running new transform") |
| 153 | + |
| 154 | + start_new = time.perf_counter() |
| 155 | + |
| 156 | + time.sleep(5) # replace with new code |
| 157 | + |
| 158 | + end_new = time.perf_counter() |
| 159 | + |
| 160 | + with capsys.disabled(): |
| 161 | + print(f" New Transform Took: {end_new - start_new:.4f} seconds") |
0 commit comments