1+ import pandas as pd
2+ import json
3+ import os
4+ import logging
5+
# Module-level logger, named after this module per stdlib convention.
logger = logging.getLogger(__name__)

# Base URL for published dataset parquet files; overridable via environment.
FILES_URL = os.environ.get("FILES_URL", "https://files.planning.data.gov.uk")

# URLs for data sources:
# - ENDPOINT_URL: failed 'count_deleted_entities' expectation rows (CSV export).
# - ORG_URL: full organisation lookup table (streamed CSV export).
ENDPOINT_URL = "https://datasette.planning.data.gov.uk/digital-land/expectation.csv?_sort=rowid&passed__exact=False&operation__exact=count_deleted_entities"
ORG_URL = "https://datasette.planning.data.gov.uk/digital-land/organisation.csv?_stream=on"
13+
14+
def _load_deleted_entities() -> pd.DataFrame:
    """
    Load failed 'count_deleted_entities' expectations and explode them to
    one row per deleted entity.

    Returns a DataFrame with columns: dataset, organisation, entities.
    """
    df = pd.read_csv(ENDPOINT_URL)
    df_filtered = df[['dataset', 'organisation', 'details']].copy()

    # 'details' is a JSON payload; its 'entities' key lists the deleted
    # entity IDs reported by the expectation.
    df_filtered['entities'] = df_filtered['details'].apply(
        lambda x: json.loads(x)['entities']
    )

    # Explode to one entity per row.
    df_expanded = df_filtered.explode('entities')[
        ['dataset', 'organisation', 'entities']
    ].copy()
    return df_expanded.reset_index(drop=True)


def _merge_organisations(df_expanded: pd.DataFrame) -> pd.DataFrame:
    """
    Left-join organisation metadata onto the exploded entity rows and
    rename 'entities' to 'entity'.
    """
    df_org = pd.read_csv(ORG_URL)
    df_org = df_org[['entity', 'organisation', 'name']].copy()
    # Rename to avoid colliding with the deleted-entity 'entity' column and
    # the entity 'name' column merged in later.
    df_org = df_org.rename(
        columns={
            'entity': 'organisation-entity',
            'name': 'organisation-name'
        }
    )

    df_final = pd.merge(df_expanded, df_org, on='organisation', how='left')
    return df_final.rename(columns={'entities': 'entity'})


def _load_entity_metadata(datasets) -> list:
    """
    Load entity metadata (entity, name, reference) from each dataset's
    parquet file.

    Datasets that fail to download or lack the required 'entity'/'name'
    columns are skipped with a log message (best effort).
    Returns a list of DataFrames, one per successfully loaded dataset.
    """
    entity_dfs = []
    for dataset_name in datasets:
        url = f"{FILES_URL}/dataset/{dataset_name}.parquet"
        try:
            df_entity = pd.read_parquet(url)
        except Exception as e:
            logger.error("Failed to load %s: %s", dataset_name, e)
            continue

        # Both columns are required for the later merge; skip loudly so
        # missing metadata is visible in the logs rather than silent.
        if 'entity' not in df_entity.columns or 'name' not in df_entity.columns:
            logger.warning(
                "Skipping %s: missing 'entity' or 'name' column", dataset_name
            )
            continue

        # 'reference' is optional in the source; default to empty string.
        if 'reference' not in df_entity.columns:
            df_entity['reference'] = ''

        df_entity = df_entity[['entity', 'name', 'reference']].copy()
        df_entity['dataset'] = dataset_name
        entity_dfs.append(df_entity)

    return entity_dfs


def main(output_dir: str):
    """
    Fetch deleted entities from failed expectations, enrich them with
    organisation and entity metadata, and write the result to
    ``<output_dir>/deleted_entities.csv``.

    Parameters
    ----------
    output_dir : str
        Directory the CSV file is written to (created if missing).
    """
    # ---------------------------------------------------------------
    # Load expectations and join organisation data
    # ---------------------------------------------------------------
    df_expanded = _load_deleted_entities()
    logger.info(
        "Found %s entities across %s datasets",
        len(df_expanded), df_expanded['dataset'].nunique()
    )

    df_final = _merge_organisations(df_expanded)

    # ---------------------------------------------------------------
    # Load entity data from parquet files
    # ---------------------------------------------------------------
    entity_dfs = _load_entity_metadata(df_final['dataset'].unique())

    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, 'deleted_entities.csv')

    if not entity_dfs:
        # Best effort: still write what we have, without the entity
        # name/reference columns.
        logger.error("No entity datasets loaded successfully")
        df_final.to_csv(output_file, index=False)
        return

    # ---------------------------------------------------------------
    # Combine entity data and merge
    # ---------------------------------------------------------------
    df_entities = pd.concat(entity_dfs, ignore_index=True)

    # Normalize entity IDs to nullable integers so the merge keys match
    # even if one side parsed them as strings or floats.
    df_entities['entity'] = pd.to_numeric(
        df_entities['entity'], errors='coerce'
    ).astype('Int64')
    df_final['entity'] = pd.to_numeric(
        df_final['entity'], errors='coerce'
    ).astype('Int64')

    # validate='m:1' raises if a parquet file has duplicate entity IDs,
    # which would silently multiply rows otherwise.
    df_final = df_final.merge(
        df_entities,
        how='left',
        on=['dataset', 'entity'],
        validate='m:1',
        suffixes=('', '_entity')
    )

    # ---------------------------------------------------------------
    # Select and order final columns, then save
    # ---------------------------------------------------------------
    final_cols = [
        'dataset',
        'entity',
        'organisation',
        'organisation-name',
        'organisation-entity',
        'name',
        'reference'
    ]
    df_final = df_final[final_cols].copy()
    df_final.to_csv(output_file, index=False)
141+
def parse_args():
    """Return parsed command-line options (currently just ``--output-dir``)."""
    import argparse

    cli = argparse.ArgumentParser(
        description="Fetch and enrich deleted entities from expectations",
    )
    cli.add_argument(
        "--output-dir",
        type=str,
        required=True,
        help="Output directory for CSV file",
    )
    return cli.parse_args()
154+
155+
if __name__ == "__main__":
    # Script entry point: parse CLI arguments and run the pipeline.
    args = parse_args()
    main(args.output_dir)