|
| 1 | + |
"""
Build two ODP quality CSV reports.

Combines active endpoint issue data, provision/organisation lookups, and
geospatial checks against LPA boundaries; maps issues to quality criteria,
calculates provider-dataset quality levels plus per-criteria pass/fail
detail, and writes the two CSV outputs.
"""
| 6 | + |
| 7 | +from __future__ import annotations |
| 8 | + |
| 9 | +import argparse |
| 10 | +import os |
| 11 | +import urllib.parse |
| 12 | + |
| 13 | +import geopandas as gpd |
| 14 | +import numpy as np |
| 15 | +import pandas as pd |
| 16 | +import shapely.wkt |
| 17 | + |
# Datasets in scope for Open Digital Planning (ODP) quality reporting.
# Used at the end of main() to filter the per-criteria detail output.
ODP_DATASETS = [
    "conservation-area",
    "conservation-area-document",
    "article-4-direction-area",
    "article-4-direction",
    "listed-building-outline",
    "tree",
    "tree-preservation-zone",
    "tree-preservation-order",
]
| 28 | + |
| 29 | + |
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments; ``--output-dir`` is mandatory."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--output-dir", required=True)
    return arg_parser.parse_args()
| 34 | + |
| 35 | + |
def datasette_query(db: str, sql: str) -> pd.DataFrame:
    """Run *sql* against the named Datasette database and return the CSV result."""
    query_string = urllib.parse.urlencode({"sql": sql, "_size": "max"})
    url = f"https://datasette.planning.data.gov.uk/{db}.csv?{query_string}"
    return pd.read_csv(url)
| 39 | + |
| 40 | + |
def datasette_query_paginated(db: str, sql: str, page_size: int = 1000) -> pd.DataFrame:
    """Fetch *sql* in LIMIT/OFFSET pages and concatenate the pages.

    Returns an empty DataFrame when the query yields no rows at all.
    """
    pages: list[pd.DataFrame] = []
    offset = 0

    while True:
        chunk = datasette_query(db, f"{sql}\nLIMIT {page_size} OFFSET {offset}")
        if chunk.empty:
            break

        pages.append(chunk)

        # A short page means the result set is exhausted.
        if len(chunk) < page_size:
            break
        offset += page_size

    return pd.concat(pages, ignore_index=True) if pages else pd.DataFrame()
| 60 | + |
| 61 | + |
def get_pdp_gdf(dataset: str, geometry_field: str) -> gpd.GeoDataFrame:
    """Download a planning.data.gov.uk dataset as a WGS84 GeoDataFrame.

    Column names have hyphens replaced with underscores; rows with a null
    *geometry_field* are dropped, and the remaining WKT strings are parsed.
    """
    url = f"https://files.planning.data.gov.uk/dataset/{dataset}.csv"
    frame = pd.read_csv(url, dtype="str")
    frame.columns = [column.replace("-", "_") for column in frame.columns]
    frame = frame[frame[geometry_field].notnull()].copy()
    frame[geometry_field] = frame[geometry_field].apply(shapely.wkt.loads)
    result = gpd.GeoDataFrame(frame, geometry=geometry_field)
    result.set_crs(epsg=4326, inplace=True)
    return result
| 70 | + |
| 71 | + |
def main() -> None:
    """Build and write the two ODP quality CSV reports.

    Pulls issue, provision, organisation and expectation data from the
    planning.data.gov.uk Datasette instances, combines them with a spatial
    join of conservation-area points against LPA boundaries, and writes
    two CSVs under ``--output-dir``:

    - quality_ODP_dataset_scores_by_LPA.csv: one row per (cohort, LPA) with
      a quality-level label per ODP pipeline plus a readiness flag.
    - quality_ODP_dataset_quality_detail.csv: one row per (pipeline, LPA)
      with a pass/fail flag per quality criterion.
    """
    args = parse_args()
    output_dir = args.output_dir
    os.makedirs(output_dir, exist_ok=True)

    # Map each issue_type to a "level - criteria" label and numeric level.
    issue_lookup = datasette_query(
        "digital-land",
        """
        SELECT issue_type,
               quality_criteria_level || ' - ' || quality_criteria AS quality_criteria,
               quality_criteria_level AS quality_level
        FROM issue_type
        """,
    )

    # ODP project provisions; rename `dataset` -> `pipeline` to match the
    # column name used by the performance tables joined below.
    provision = datasette_query(
        "digital-land",
        "SELECT * FROM provision WHERE project = 'open-digital-planning'",
    ).rename(columns={"dataset": "pipeline"})

    # Issues on currently-active endpoints/resources only (blank end dates,
    # last fetch returned HTTP 200). LEFT JOIN keeps endpoints with no issues.
    endpoint_issues = datasette_query_paginated(
        "performance",
        """
        SELECT rhe.organisation,
               rhe.name AS organisation_name,
               rhe.collection,
               rhe.pipeline,
               rhe.endpoint,
               rhe.resource,
               its.issue_type
        FROM reporting_historic_endpoints rhe
        LEFT JOIN endpoint_dataset_issue_type_summary its ON rhe.resource = its.resource
        WHERE rhe.endpoint_end_date = ''
          AND rhe.resource_end_date = ''
          AND rhe.latest_status = 200
        """,
    )

    # Organisation lookup with an lpa_flag: any org with an LPA code counts,
    # plus NDO/PUR which are flagged explicitly despite having no code.
    # Waveney District Council is excluded — presumably superseded/duplicated;
    # TODO confirm with data owners.
    org_lookup = datasette_query(
        "digital-land",
        """
        SELECT entity AS organisation_entity,
               name AS organisation_name,
               organisation,
               end_date,
               local_planning_authority AS LPACD,
               CASE
                   WHEN local_planning_authority != '' OR organisation IN ('local-authority:NDO', 'local-authority:PUR') THEN 1
                   ELSE 0
               END AS lpa_flag
        FROM organisation
        WHERE name != 'Waveney District Council'
        """,
    )
    # Cast to int so later merges/aggregations on these keys behave consistently.
    org_lookup[["lpa_flag", "organisation_entity"]] = org_lookup[["lpa_flag", "organisation_entity"]].astype(int)

    # Conservation-area entities as points (WGS84).
    ca_gdf = get_pdp_gdf("conservation-area", "point")
    ca_gdf[["organisation_entity"]] = ca_gdf[["organisation_entity"]].astype(int)

    # LPA boundary polygons keyed by LPACD.
    lpa_gdf = get_pdp_gdf("local-planning-authority", "geometry").rename(
        columns={"reference": "LPACD", "name": "lpa_name"}
    )

    # Live LPAs only: organisations with no end_date that have a boundary.
    lpa_live = lpa_gdf[["LPACD", "geometry"]].merge(
        org_lookup[org_lookup["end_date"].isnull()][["LPACD", "organisation", "organisation_name", "organisation_entity"]],
        how="inner",
        on="LPACD",
    )

    # Outer merge keeps both LPAs with no endpoint issues and issue rows from
    # organisations that are not live LPAs.
    base = lpa_live[["LPACD", "organisation"]].merge(endpoint_issues, how="outer", on="organisation")

    # Attach provider name and LPA flag to each conservation-area point.
    ca_gdf = ca_gdf.merge(
        org_lookup[["organisation_entity", "organisation_name", "lpa_flag"]],
        how="left",
        on="organisation_entity",
    )

    # Spatial join: which conservation-area points fall inside which live
    # LPA boundary.
    lpa_ca_join = gpd.sjoin(
        lpa_live[["LPACD", "organisation", "organisation_name", "geometry"]],
        ca_gdf[["entity", "organisation_entity", "lpa_flag", "point"]],
        how="inner",
        predicate="intersects",
    )

    # Criterion: "authoritative data from the LPA". An LPA fails (level 1)
    # when NONE of the conservation-area points inside its boundary were
    # provided by an LPA-flagged organisation (max lpa_flag == 0).
    qual_prov = (
        lpa_ca_join.groupby(["LPACD", "organisation", "organisation_name"], as_index=False)
        .agg(prov_rank_max=("lpa_flag", "max"))
        .query("prov_rank_max == 0")
    )
    qual_prov[["collection", "pipeline"]] = "conservation-area"
    qual_prov["issue_type"] = "non_auth"
    qual_prov["quality_criteria"] = "1 - authoritative data from the LPA"
    qual_prov["quality_level"] = 1
    qual_prov = qual_prov[["LPACD", "collection", "pipeline", "organisation", "organisation_name", "issue_type", "quality_criteria", "quality_level"]]

    # Criterion: entity count matches the LPA's own manual count (level 3);
    # organisations whose expectation check failed.
    qual_match_orgs = datasette_query(
        "digital-land",
        """
        SELECT DISTINCT organisation
        FROM expectation
        WHERE name = 'Check number of conservation-area entities inside the local planning authority boundary matches the manual count'
          AND passed = 'False'
        """,
    )
    qual_match = lpa_live.merge(qual_match_orgs, how="inner", on="organisation")[["LPACD", "organisation", "organisation_name"]]
    qual_match["collection"] = "conservation-area"
    qual_match["pipeline"] = "conservation-area"
    qual_match["quality_criteria"] = "3 - entity count matches LPA"
    qual_match["quality_level"] = 3

    # Criterion: entities within LPA boundary (level 3); any dataset whose
    # "outside boundary" expectation failed (errored checks excluded).
    bounds_orgs = datasette_query(
        "digital-land",
        """
        SELECT DISTINCT organisation, dataset AS pipeline
        FROM expectation
        WHERE name LIKE '%outside%'
          AND message NOT LIKE '%error%'
          AND passed = 'False'
        """,
    )
    qual_bounds = lpa_live.merge(bounds_orgs, how="inner", on="organisation")[["LPACD", "organisation", "organisation_name", "pipeline"]]
    qual_bounds["quality_criteria"] = "3 - entities within LPA boundary"
    qual_bounds["quality_level"] = 3

    # Endpoint issues mapped onto their quality criteria/levels.
    qual_issues = base.merge(issue_lookup, how="left", on="issue_type")[[
        "LPACD",
        "collection",
        "pipeline",
        "organisation",
        "organisation_name",
        "issue_type",
        "quality_criteria",
        "quality_level",
    ]]

    # All failed criteria, one row per (provider, pipeline, criterion) failure.
    qual_all = pd.concat([qual_prov, qual_match, qual_bounds, qual_issues], ignore_index=True)

    level_map = {
        4: "4. data that is trustworthy",
        3: "3. data that is good for ODP",
        2: "2. authoritative data from the LPA",
        1: "1. some data",
    }

    # Overall level per (LPA, pipeline) = worst (minimum) failed level;
    # dropna=False keeps rows with missing keys.
    qual_summary = (
        qual_all.groupby(["LPACD", "pipeline", "organisation", "organisation_name"], as_index=False, dropna=False)
        .agg(quality_level=("quality_level", "min"))
    )
    # NaN means no failed criteria at all -> best level (4).
    qual_summary["quality_level"] = qual_summary["quality_level"].replace(np.nan, 4)
    qual_summary["quality_level_label"] = qual_summary["quality_level"].map(level_map)

    # Restrict to ODP provisions and attach the cohort.
    odp_lpa_summary = qual_summary.merge(
        provision[["organisation", "pipeline", "cohort"]],
        how="inner",
        on=["organisation", "pipeline"],
    )

    # Wide form: one row per (cohort, organisation), one column per pipeline.
    odp_lpa_summary_wide = (
        odp_lpa_summary.pivot(
            columns="pipeline",
            values="quality_level_label",
            index=["cohort", "organisation", "organisation_name"],
        )
        .reset_index()
        .sort_values(["cohort", "organisation_name"])
    )
    # Pipelines with no provision row at all show as "0. no data".
    odp_lpa_summary_wide.replace(np.nan, "0. no data", inplace=True)

    # Readiness: all 5 area datasets present and every level >= 2.
    ready = qual_summary[
        qual_summary["pipeline"].isin(
            [
                "article-4-direction-area",
                "conservation-area",
                "listed-building-outline",
                "tree",
                "tree-preservation-zone",
            ]
        )
    ].groupby("organisation", as_index=False).agg(
        area_dataset_count=("pipeline", "count"),
        min_quality_level=("quality_level", "min"),
    )
    ready["ready_for_ODP_adoption"] = np.where(
        (ready["area_dataset_count"] == 5) & (ready["min_quality_level"] >= 2),
        "yes",
        "no",
    )
    odp_lpa_summary_wide = odp_lpa_summary_wide.merge(
        ready[["organisation", "ready_for_ODP_adoption"]],
        how="left",
        on="organisation",
    )

    # Count of failures per (pipeline, provider, criterion).
    qual_cat_count = qual_all.groupby(
        ["pipeline", "organisation", "organisation_name", "quality_criteria"],
        as_index=False,
    ).agg(n_issues=("quality_level", "count"))

    # Cross join (via dummy key) of every provider/pipeline with every known
    # criterion, so criteria with no failures still appear as columns.
    prov = qual_all[["pipeline", "organisation", "organisation_name"]].drop_duplicates()
    prov["key"] = 1
    qual_cat = qual_all[qual_all["quality_criteria"].notnull()][["quality_criteria"]].drop_duplicates()
    qual_cat["key"] = 1

    qual_cat_summary = prov.merge(qual_cat, how="left", on="key")
    qual_cat_summary = qual_cat_summary.merge(
        qual_cat_count,
        how="left",
        on=["pipeline", "organisation", "organisation_name", "quality_criteria"],
    )
    # True = criterion passed (no recorded failures; NaN count also passes).
    qual_cat_summary["issue_flag"] = np.where(qual_cat_summary["n_issues"] > 0, False, True)

    # Wide pass/fail matrix per (pipeline, provider), with the overall label.
    qual_cat_summary_wide = qual_cat_summary.pivot(
        columns="quality_criteria",
        values="issue_flag",
        index=["pipeline", "organisation", "organisation_name"],
    ).reset_index().merge(
        qual_summary[["pipeline", "organisation", "quality_level_label"]],
        how="left",
        on=["pipeline", "organisation"],
    )

    # Detail output covers ODP datasets only.
    odp_qual_summary = qual_cat_summary_wide[
        qual_cat_summary_wide["pipeline"].isin(ODP_DATASETS)
    ].copy()

    out_scores = os.path.join(output_dir, "quality_ODP_dataset_scores_by_LPA.csv")
    out_detail = os.path.join(output_dir, "quality_ODP_dataset_quality_detail.csv")

    odp_lpa_summary_wide.to_csv(out_scores, index=False)
    odp_qual_summary.to_csv(out_detail, index=False)

    print(f"Saved {out_scores} ({len(odp_lpa_summary_wide)} rows)")
    print(f"Saved {out_detail} ({len(odp_qual_summary)} rows)")
| 305 | + |
| 306 | + |
# Script entry point.
if __name__ == "__main__":
    main()
0 commit comments