From f524bdafd6a20859f18489dcde6654491c9b7861 Mon Sep 17 00:00:00 2001
From: ANorthDDfire <156321499+ANorthDDfire@users.noreply.github.com>
Date: Tue, 7 May 2024 18:06:53 +0100
Subject: [PATCH] Create new file : API_and_Spliter

---
 Loading_Scripts/Python/API_and_Spliter | 245 +++++++++++++++++++++++++
 1 file changed, 245 insertions(+)
 create mode 100644 Loading_Scripts/Python/API_and_Spliter

diff --git a/Loading_Scripts/Python/API_and_Spliter b/Loading_Scripts/Python/API_and_Spliter
new file mode 100644
index 0000000..f216964
--- /dev/null
+++ b/Loading_Scripts/Python/API_and_Spliter
@@ -0,0 +1,245 @@
+"""
+PREREQUISITE
+    Python 3.x - this script has been amended for Python 3.x (tested on 3.7) and only tested on unzipped csv files
+
+DESCRIPTION
+    This Python script can be used to download AddressBase Premium (ABP) using the OS Downloads API
+    and will then separate each AddressBase Premium CSV file into new CSV files based on the record
+    identifiers found within AddressBase Premium. The script will read both AddressBase Premium CSV
+    files and zipped CSV files. If you have zipped files it is unnecessary to extract the zip file
+    first, as the script reads data directly from the zip file.
+
+# FIRST STEP : you will need to create a Download API in the OS Data Hub to obtain your API key
+
+# SECOND STEP : there are 4 variables in this script to change to your own preferred settings:
+
+    # api_key          --- the API key created with your OS Data Hub Download API
+    # data_Packages    --- the ID of the data package that selects your data in the OS Data Hub
+    # base_directory   --- base directory where the downloaded files should be saved
+    # output_directory --- directory to store the processed (split) files
+
+# THIRD STEP : install modules if required. Apart from requests, the modules used below are part of
+# Python's standard library, meaning they come pre-installed with Python and can be used without
+# installing anything extra; requests may need to be installed separately (pip install requests).
+
+"""
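+
+# Optional pre-flight check - a minimal, commented-out sketch (not part of the main flow) that
+# calls the same versions endpoint used below, just to confirm the api_key and data_Packages
+# values set further down are accepted before a long download is started:
+#
+#   import requests
+#   check = requests.get(f'https://api.os.uk/downloads/v1/dataPackages/{data_Packages}/versions',
+#                        params={'key': api_key}, headers={'accept': 'application/json'})
+#   print('Key and data package accepted' if check.status_code == 200 else f'Check failed: {check.status_code}')
+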
+import zipfile
+import csv
+import sys
+import os
+import io
+import time
+import requests
+from time import strftime
+
+# Replace 'YOUR_API_KEY' with your actual API key and 'YOUR_data_Packages' with your actual data package ID
+api_key = 'YOUR_API_KEY'
+data_Packages = 'YOUR_data_Packages'
+
+# Define the API endpoint URL
+versions_url = f'https://api.os.uk/downloads/v1/dataPackages/{data_Packages}/versions'
+
+# Define headers for the request
+headers = {
+    'accept': 'application/json'
+}
+
+# Record the time when the download is submitted
+time_submitted = time.time()
+
+# Make the GET request to fetch the list of versions
+response = requests.get(versions_url, params={'key': api_key}, headers=headers)
+
+# Base directory where files should be saved
+base_directory = "/lakehouse/default/Files/ABP_Download/"
+
+# Check if the request was successful
+if response.status_code == 200:
+    # Extract the most recent version (the first entry in the list)
+    versions = response.json()
+    most_recent_version = versions[0]
+
+    # Extract details of the most recent version
+    most_recent_id = most_recent_version['id']
+    most_recent_product_version = most_recent_version['productVersion']
+
+    directory_name = os.path.join(base_directory, most_recent_product_version)
+
+    # Create a new directory with the product version name if it doesn't exist
+    if not os.path.exists(directory_name):
+        os.mkdir(directory_name)
+    else:
+        # The directory for this version already exists; files already downloaded are skipped below
+        print(f"Directory {directory_name} already exists")
+else:
+    # Without a valid versions response the rest of the script cannot run
+    print(f"Failed to fetch data package versions: {response.status_code}")
+    sys.exit(1)
+
+# If directory creation was successful or the directory already exists, continue with the script
+
+# Construct the URL for fetching details of the most recent version
+version_details_url = f'https://api.os.uk/downloads/v1/dataPackages/{data_Packages}/versions/{most_recent_id}'
+
+# Make the GET request to fetch the details of the most recent version
+version_details_response = requests.get(version_details_url, params={'key': api_key}, headers=headers)
+
+# Check if the request was successful
+if version_details_response.status_code == 200:
+    # Extract the download URLs for all files associated with the most recent version
+    version_details = version_details_response.json()
+    downloads = version_details.get('downloads', [])
+
+    # Download each file and save it to the newly created directory
+    for download in downloads:
+        file_name = download['fileName']
+        file_url = download['url']
+
+        # Skip the file if it has already been downloaded
+        file_path = os.path.join(directory_name, file_name)
+        if os.path.exists(file_path):
+            continue
+
+        # Make a GET request to download the file
+        file_response = requests.get(file_url, allow_redirects=True, stream=True)
+
+        # Check if the request was successful
+        if file_response.status_code == 200:
+            # Save the file to the new directory (no progress bar)
+            with open(file_path, 'wb') as f:
+                for chunk in file_response.iter_content(chunk_size=1024):
+                    f.write(chunk)
+        else:
+            print(f"Failed to download {file_name}: {file_response.status_code}")
+else:
+    print(f"Failed to fetch version details: {version_details_response.status_code}")
+
+# Record the time when the download is completed
+time_completed = time.time()
+
+# Calculate the total time taken for the download
+total_time_seconds = time_completed - time_submitted
+print(f"Total time taken: {total_time_seconds / 60:.2f} minutes")
+
+
+# Header lines for the new CSV files, used later when writing the header row to each new CSV file
+CSV_CONFIG = {
+    "10": {"headings": ["RECORD_IDENTIFIER", "CUSTODIAN_NAME", "LOCAL_CUSTODIAN_NAME", "PROCESS_DATE", "VOLUME_NUMBER", "ENTRY_DATE", "TIME_STAMP", "VERSION", "FILE_TYPE"],
+           "dtype_mapping": None,
+           "CSV_filename": "ID10_Header_Records", "sql_table": "dbo_TEST_abp_headers", "DATABASE": "FALSE"},
+    "11": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "USRN", "RECORD_TYPE", "SWA_ORG_REF_NAMING", "STATE", "STATE_DATE", "STREET_SURFACE", "STREET_CLASSIFICATION", "VERSION", "STREET_START_DATE", "STREET_END_DATE", "LAST_UPDATE_DATE", "RECORD_ENTRY_DATE", "STREET_START_X", "STREET_START_Y", "STREET_START_LAT", "STREET_START_LONG", "STREET_END_X", "STREET_END_Y", "STREET_END_LAT", "STREET_END_LONG", "STREET_TOLERANCE"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "USRN": "Int32", "RECORD_TYPE": "Int16", "SWA_ORG_REF_NAMING": "Int16", "STATE": "Int16", "STATE_DATE": "string", "STREET_SURFACE": "Int16", "STREET_CLASSIFICATION": "Int16", "VERSION": "Int16", "STREET_START_DATE": "string", "STREET_END_DATE": "string", "LAST_UPDATE_DATE": "string", "RECORD_ENTRY_DATE": "string", "STREET_START_X": "float64", "STREET_START_Y": "float64", "STREET_START_LAT": "float64", "STREET_START_LONG": "float64", "STREET_END_X": "float64", "STREET_END_Y": "float64", "STREET_END_LAT": "float64", "STREET_END_LONG": "float64", "STREET_TOLERANCE": "Int16"},
+           "CSV_filename": "ID11_Street_Records", "sql_table": "dbo_TEST_abp_street", "DATABASE": "TRUE"},
+    "15": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "USRN", "STREET_DESCRIPTION", "LOCALITY_NAME", "TOWN_NAME", "ADMINISTRATIVE_AREA", "LANGUAGE", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "USRN": "Int32", "STREET_DESCRIPTION": "string", "LOCALITY_NAME": "string", "TOWN_NAME": "string", "ADMINISTRATIVE_AREA": "string", "LANGUAGE": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"},
+           "CSV_filename": "ID15_StreetDesc_Records", "sql_table": "dbo_TEST_abp_street_descriptor", "DATABASE": "TRUE"},
+    "21": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "LOGICAL_STATUS", "BLPU_STATE", "BLPU_STATE_DATE", "PARENT_UPRN", "X_COORDINATE", "Y_COORDINATE", "LATITUDE", "LONGITUDE", "RPC", "LOCAL_CUSTODIAN_CODE", "COUNTRY", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE", "ADDRESSBASE_POSTAL", "POSTCODE_LOCATOR", "MULTI_OCC_COUNT"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "LOGICAL_STATUS": "Int16", "BLPU_STATE": "Int16", "BLPU_STATE_DATE": "string", "PARENT_UPRN": "Int64", "X_COORDINATE": "float64", "Y_COORDINATE": "float64", "LATITUDE": "float64", "LONGITUDE": "float64", "RPC": "Int16", "LOCAL_CUSTODIAN_CODE": "Int16", "COUNTRY": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string", "ADDRESSBASE_POSTAL": "string", "POSTCODE_LOCATOR": "string", "MULTI_OCC_COUNT": "Int16"},
+           "CSV_filename": "ID21_BLPU_Records", "sql_table": "dbo_TEST_abp_blpu", "DATABASE": "TRUE"},
+    "23": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "XREF_KEY", "CROSS_REFERENCE", "VERSION", "SOURCE", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "XREF_KEY": "string", "CROSS_REFERENCE": "string", "VERSION": "Int16", "SOURCE": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"},
+           "CSV_filename": "ID23_XREF_Records", "sql_table": "dbo_TEST_abp_crossref", "DATABASE": "TRUE"},
+    "24": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "LPI_KEY", "LANGUAGE", "LOGICAL_STATUS", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE", "SAO_START_NUMBER", "SAO_START_SUFFIX", "SAO_END_NUMBER", "SAO_END_SUFFIX", "SAO_TEXT", "PAO_START_NUMBER", "PAO_START_SUFFIX", "PAO_END_NUMBER", "PAO_END_SUFFIX", "PAO_TEXT", "USRN", "USRN_MATCH_INDICATOR", "AREA_NAME", "LEVEL", "OFFICIAL_FLAG"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "LPI_KEY": "string", "LANGUAGE": "string", "LOGICAL_STATUS": "Int16", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string", "SAO_START_NUMBER": "Int16", "SAO_START_SUFFIX": "string", "SAO_END_NUMBER": "Int16", "SAO_END_SUFFIX": "string", "SAO_TEXT": "string", "PAO_START_NUMBER": "Int16", "PAO_START_SUFFIX": "string", "PAO_END_NUMBER": "Int16", "PAO_END_SUFFIX": "string", "PAO_TEXT": "string", "USRN": "Int32", "USRN_MATCH_INDICATOR": "string", "AREA_NAME": "string", "LEVEL": "string", "OFFICIAL_FLAG": "string"},
+           "CSV_filename": "ID24_LPI_Records", "sql_table": "dbo_TEST_abp_lpi", "DATABASE": "TRUE"},
+    "28": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "UDPRN", "ORGANISATION_NAME", "DEPARTMENT_NAME", "SUB_BUILDING_NAME", "BUILDING_NAME", "BUILDING_NUMBER", "DEPENDENT_THOROUGHFARE", "THOROUGHFARE", "DOUBLE_DEPENDENT_LOCALITY", "DEPENDENT_LOCALITY", "POST_TOWN", "POSTCODE", "POSTCODE_TYPE", "DELIVERY_POINT_SUFFIX", "WELSH_DEPENDENT_THOROUGHFARE", "WELSH_THOROUGHFARE", "WELSH_DOUBLE_DEPENDENT_LOCALITY", "WELSH_DEPENDENT_LOCALITY", "WELSH_POST_TOWN", "PO_BOX_NUMBER", "PROCESS_DATE", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "UDPRN": "Int64", "ORGANISATION_NAME": "string", "DEPARTMENT_NAME": "string", "SUB_BUILDING_NAME": "string", "BUILDING_NAME": "string", "BUILDING_NUMBER": "Int16", "DEPENDENT_THOROUGHFARE": "string", "THOROUGHFARE": "string", "DOUBLE_DEPENDENT_LOCALITY": "string", "DEPENDENT_LOCALITY": "string", "POST_TOWN": "string", "POSTCODE": "string", "POSTCODE_TYPE": "string", "DELIVERY_POINT_SUFFIX": "string", "WELSH_DEPENDENT_THOROUGHFARE": "string", "WELSH_THOROUGHFARE": "string", "WELSH_DOUBLE_DEPENDENT_LOCALITY": "string", "WELSH_DEPENDENT_LOCALITY": "string", "WELSH_POST_TOWN": "string", "PO_BOX_NUMBER": "string", "PROCESS_DATE": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"},
+           "CSV_filename": "ID28_DPA_Records", "sql_table": "dbo_TEST_abp_delivery_point", "DATABASE": "TRUE"},
+    "29": {"headings": ["RECORD_IDENTIFIER", "GAZ_NAME", "GAZ_SCOPE", "TER_OF_USE", "LINKED_DATA", "GAZ_OWNER", "NGAZ_FREQ", "CUSTODIAN_NAME", "CUSTODIAN_UPRN", "LOCAL_CUSTODIAN_CODE", "CO_ORD_SYSTEM", "CO_ORD_UNIT", "META_DATE", "CLASS_SCHEME", "GAZ_DATE", "LANGUAGE", "CHARACTER_SET"],
+           "dtype_mapping": None,
+           "CSV_filename": "ID29_Metadata_Records", "sql_table": "dbo_TEST_abp_metadata", "DATABASE": "FALSE"},
+    "30": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "SUCC_KEY", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE", "SUCCESSOR"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "SUCC_KEY": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string", "SUCCESSOR": "Int64"},
+           "CSV_filename": "ID30_Successor_Records", "sql_table": "dbo_TEST_abp_successor", "DATABASE": "TRUE"},
+    "31": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "ORG_KEY", "ORGANISATION", "LEGAL_NAME", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "ORG_KEY": "string", "ORGANISATION": "string", "LEGAL_NAME": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"},
+           "CSV_filename": "ID31_Org_Records", "sql_table": "dbo_TEST_abp_organisation", "DATABASE": "TRUE"},
+    "32": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "CLASS_KEY", "CLASSIFICATION_CODE", "CLASS_SCHEME", "SCHEME_VERSION", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
+           "dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "CLASS_KEY": "string", "CLASSIFICATION_CODE": "string", "CLASS_SCHEME": "string", "SCHEME_VERSION": "float64", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"},
+           "CSV_filename": "ID32_Class_Records", "sql_table": "dbo_TEST_abp_classification", "DATABASE": "TRUE"},
+    "99": {"headings": ["RECORD_IDENTIFIER", "NEXT_VOLUME_NUMBER", "RECORD_COUNT", "ENTRY_DATE", "TIME_STAMP"],
+           "dtype_mapping": None,
+           "CSV_filename": "ID99_Trailer_Records", "sql_table": "dbo_TEST_abp_trailer", "DATABASE": "FALSE"}
+}
+print(f'{directory_name}')
+
+# Directory to store processed files
+output_directory = "/lakehouse/default/Files/ABP_processed/"
+
+print('This program will split OS AddressBase Premium Zip CSV or extracted CSV files by record identifier into new CSV files')
+start_time = time.time()
+
+# Initialize counters dictionary
+counters = {record_type: 0 for record_type in CSV_CONFIG}
+
+# Initialize a set to track which writers have been created
+created_writers = set()
+# Initialize the writers dictionary (one open file handle per record type, created on demand)
+writers = {record_type: None for record_type in CSV_CONFIG}
+
+# Make sure the output directory exists
+os.makedirs(output_directory, exist_ok=True)
+
+# Clear existing CSV files by deleting them, then recreate each with its header row
+for record_type in CSV_CONFIG:
+    file_path = os.path.join(output_directory, f"{CSV_CONFIG[record_type]['CSV_filename']}.csv")
+    if os.path.exists(file_path):
+        # If the file exists, delete it
+        os.remove(file_path)
+    # Now create a new file with the header row
+    with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
+        csv_writer = csv.writer(csvfile)
+        csv_writer.writerow(CSV_CONFIG[record_type]['headings'])
+
+# Walk the download directory, reading CSV data straight out of each zip and splitting the rows
+for root, _, filenames in os.walk(directory_name):
+    for filename in filenames:
+        if filename.endswith(".zip"):
+            with zipfile.ZipFile(os.path.join(root, filename), 'r') as zip_ref:
+                for file_name in zip_ref.namelist():
+                    if file_name.endswith(".csv"):
+                        with zip_ref.open(file_name) as csvfile:
+                            csv_data = csvfile.read().decode('utf-8')  # Read entire CSV file into memory
+                            csvreader = csv.reader(io.StringIO(csv_data))  # Use StringIO to create a file-like object
+
+                            # Process CSV rows and write to the appropriate CSV files
+                            for row in csvreader:
+                                record_type = row[0][:2]
+                                if record_type not in writers:
+                                    print(f"Record type {record_type} not found in writers dictionary.")
+                                    continue  # Skip unknown record types rather than mis-filing them
+                                if writers[record_type] is None:
+                                    # Open the output file in append mode the first time this record type is seen
+                                    file_path = os.path.join(output_directory, f"{CSV_CONFIG[record_type]['CSV_filename']}.csv")
+                                    print(f"File path: {file_path}")
+                                    writers[record_type] = open(file_path, 'a', newline='', encoding='utf-8')
+                                csv_writer = csv.writer(writers[record_type])
+
+                                # Write the current row to the CSV file
+                                csv_writer.writerow(row)
+                                # Update the counter
+                                counters[record_type] += 1
+
+                            # Update the created writers set after processing each CSV file
+                            created_writers.update({record_type for record_type in writers if writers[record_type] is not None})
+
+# Close the CSV files after processing all rows
+for writer in writers.values():
+    if writer is not None:
+        writer.close()
+
+# Calculating elapsed time
+end_time = time.time()
+elapsed = end_time - start_time
+
+# Printing summary statistics
+print("Program has finished splitting the AddressBase Premium Files")
+print('Finished translating data at', strftime("%a, %d %b %Y %H:%M:%S"))
+print('Elapsed time:', round(elapsed / 60, 1), 'minutes')
+for record_type, count in counters.items():
+    print(f"Number of {record_type} Records: {count}")
+
+print("The program will close in 10 seconds")
+time.sleep(10)
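+
+# The walk above only looks inside .zip archives. If the download directory also held loose,
+# already-extracted .csv files, an extra branch alongside the .zip check could feed the same
+# row-splitting logic - a commented-out sketch only, untested here:
+#
+#   elif filename.endswith(".csv"):
+#       with open(os.path.join(root, filename), newline='', encoding='utf-8') as csvfile:
+#           for row in csv.reader(csvfile):
+#               ...  # same record_type / writers handling as in the zip branch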