Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions Loading_Scripts/Python/API_and_Spliter
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
"""
PREREQUISITE
Python 3.x - this script has been amended for Python 3.x (tested on 3.7) and only tested on unzipped csv files

DESCRIPTION
This Python script can be used to download AddressBase Premium using the OS Downloads API and then
will continue to separate each AddressBase Premium CSV file into new CSV files
based on the record identifiers found within AddressBase Premium. The script
will read both AddressBase Premium CSV files and zipped CSV files. If you
have zipped files it is unnecessary to extract the zip file first as the
script reads data directly from the zip file.

# FIRST STEP : you will need to create a Download API in the osdata hub to obtain your API key

# SECOND STEP : There are 4 variables to change in this script to your own preferred settings.

# api_key --- Your API key, created using the OS Data Hub
# data_Packages --- Used to choose your data in the os datahub
# base_directory --- Base directory where files should be saved
# output_directory --- Directory to store processed files

# THIRD STEP : Install modules if required, however these modules are part of Python's standard library, meaning they come pre-installed with Python and you can use them without needing to install anything extra.

"""


import zipfile
import csv
import sys
import os
import io
import time
import requests
from time import strftime

# Replace 'YOUR_API_KEY' with your actual API key and 'YOUR_data_Packages' with your actual package
api_key = 'YOUR_API_KEY'
data_Packages = 'YOUR_data_Packages'

# Versions endpoint of the OS Downloads API for the chosen data package.
versions_url = f'https://api.os.uk/downloads/v1/dataPackages/{data_Packages}/versions'

# Ask the API for JSON responses.
headers = {
'accept': 'application/json'
}

# Record the time when the download is submitted (used for the total-time report).
time_submitted = time.time()

# Fetch the list of versions for the data package.  Note the API key is
# passed as a query parameter ('key'), not as a header.
response = requests.get(versions_url, params={'key': api_key}, headers=headers)

# Base directory where downloaded files should be saved
base_directory = "/lakehouse/default/Files/ABP_Download/"

# Download every file of the most recent data-package version into a
# sub-directory named after the product version.  Files already present on
# disk are skipped, so the script can be re-run to resume a partial download.
if response.status_code == 200:
    versions = response.json()
    # NOTE(review): assumes the API lists versions newest-first -- confirm
    # against the OS Downloads API documentation.
    most_recent_version = versions[0]

    most_recent_id = most_recent_version['id']
    most_recent_product_version = most_recent_version['productVersion']

    directory_name = os.path.join(base_directory, most_recent_product_version)

    # makedirs with exist_ok creates any missing parent directories too and
    # avoids the race-prone "check then mkdir" pattern (the previous
    # os.mkdir failed when base_directory itself did not exist, and the
    # "directory exists" branch printed a misleading early total-time line).
    os.makedirs(directory_name, exist_ok=True)

    # Fetch the details (download URLs) of the most recent version.
    version_details_url = f'https://api.os.uk/downloads/v1/dataPackages/{data_Packages}/versions/{most_recent_id}'
    version_details_response = requests.get(version_details_url, params={'key': api_key}, headers=headers)

    if version_details_response.status_code == 200:
        version_details = version_details_response.json()
        downloads = version_details.get('downloads', [])

        # Download each file and save it to the version directory.
        for download in downloads:
            file_name = download['fileName']
            file_url = download['url']

            # Skip files already downloaded on a previous run.
            file_path = os.path.join(directory_name, file_name)
            if os.path.exists(file_path):
                continue

            # stream=True downloads in chunks instead of loading a
            # potentially multi-GB file into memory.
            file_response = requests.get(file_url, allow_redirects=True, stream=True)

            if file_response.status_code == 200:
                with open(file_path, 'wb') as f:
                    for chunk in file_response.iter_content(chunk_size=1024):
                        f.write(chunk)
            else:
                print(f"Failed to download {file_name}: {file_response.status_code}")
    else:
        print(f"Failed to fetch version details: {version_details_response.status_code}")
else:
    # Without a successful version list there is no directory_name, so the
    # rest of the script cannot run -- stop cleanly here instead of failing
    # later with a NameError.
    raise SystemExit(f"Failed to fetch versions: {response.status_code}")

# Record the time when the download is completed
time_completed = time.time()

# Report the total time taken for the download phase.
total_time_seconds = time_completed - time_submitted
print(f"Total time taken: {total_time_seconds / 60:.2f} minutes")


# Header lines for the new CSV files, used later when writing the header to the new CSV files
CSV_CONFIG = {
"10": {"headings": ["RECORD_IDENTIFIER", "CUSTODIAN_NAME", "LOCAL_CUSTODIAN_NAME", "PROCESS_DATE", "VOLUME_NUMBER", "ENTRY_DATE", "TIME_STAMP", "VERSION", "FILE_TYPE"],
"dtype_mapping": None, "CSV_filename": "ID10_Header_Records", "sql_table": "dbo_TEST_abp_headers", "DATABASE": "FALSE"},
"11": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "USRN", "RECORD_TYPE", "SWA_ORG_REF_NAMING", "STATE", "STATE_DATE", "STREET_SURFACE", "STREET_CLASSIFICATION", "VERSION", "STREET_START_DATE", "STREET_END_DATE", "LAST_UPDATE_DATE", "RECORD_ENTRY_DATE", "STREET_START_X", "STREET_START_Y", "STREET_START_LAT", "STREET_START_LONG", "STREET_END_X", "STREET_END_Y", "STREET_END_LAT", "STREET_END_LONG", "STREET_TOLERANCE"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "USRN": "Int32", "RECORD_TYPE": "Int16", "SWA_ORG_REF_NAMING": "Int16", "STATE": "Int16", "STATE_DATE": "string", "STREET_SURFACE": "Int16", "STREET_CLASSIFICATION": "Int16", "VERSION": "Int16", "STREET_START_DATE": "string", "STREET_END_DATE": "string", "LAST_UPDATE_DATE": "string", "RECORD_ENTRY_DATE": "string", "STREET_START_X": "float64", "STREET_START_Y": "float64", "STREET_START_LAT": "float64", "STREET_START_LONG": "float64", "STREET_END_X": "float64", "STREET_END_Y": "float64", "STREET_END_LAT": "float64", "STREET_END_LONG": "float64", "STREET_TOLERANCE": "Int16"
}, "CSV_filename": "ID11_Street_Records", "sql_table": "dbo_TEST_abp_street", "DATABASE": "TRUE"},
"15": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "USRN", "STREET_DESCRIPTION", "LOCALITY_NAME", "TOWN_NAME", "ADMINSTRATIVE_AREA", "LANGUAGE", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "USRN": "Int32", "STREET_DESCRIPTION": "string", "LOCALITY_NAME": "string", "TOWN_NAME": "string", "ADMINSTRATIVE_AREA": "string", "LANGUAGE": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"
}, "CSV_filename": "ID15_StreetDesc_Records", "sql_table": "dbo_TEST_abp_street_descriptor", "DATABASE": "TRUE"},
"21": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "LOGICAL_STATUS", "BLPU_STATE", "BLPU_STATE_DATE", "PARENT_UPRN", "X_COORDINATE", "Y_COORDINATE", "LATITUDE", "LONGITUDE", "RPC", "LOCAL_CUSTODIAN_CODE", "COUNTRY", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE", "ADDRESSBASE_POSTAL", "POSTCODE_LOCATOR", "MULTI_OCC_COUNT"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "LOGICAL_STATUS": "Int16", "BLPU_STATE": "Int16", "BLPU_STATE_DATE": "string", "PARENT_UPRN": "Int64", "X_COORDINATE": "float64", "Y_COORDINATE": "float64", "LATITUDE": "float64", "LONGITUDE": "float64", "RPC": "Int16", "LOCAL_CUSTODIAN_CODE": "Int16", "COUNTRY": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string", "ADDRESSBASE_POSTAL": "string", "POSTCODE_LOCATOR": "string", "MULTI_OCC_COUNT": "Int16"
}, "CSV_filename": "ID21_BLPU_Records", "sql_table": "dbo_TEST_abp_blpu", "DATABASE": "TRUE"},
"23": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "XREF_KEY", "CROSS_REFERENCE", "VERSION", "SOURCE", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "XREF_KEY": "string", "CROSS_REFERENCE": "string", "VERSION": "Int16", "SOURCE": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"
}, "CSV_filename": "ID23_XREF_Records", "sql_table": "dbo_TEST_abp_crossref", "DATABASE": "TRUE"},
"24": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "LPI_KEY", "LANGUAGE", "LOGICAL_STATUS", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE", "SAO_START_NUMBER", "SAO_START_SUFFIX", "SAO_END_NUMBER", "SAO_END_SUFFIX", "SAO_TEXT", "PAO_START_NUMBER", "PAO_START_SUFFIX", "PAO_END_NUMBER", "PAO_END_SUFFIX", "PAO_TEXT", "USRN", "USRN_MATCH_INDICATOR", "AREA_NAME", "LEVEL", "OFFICIAL_FLAG"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "LPI_KEY": "string", "LANGUAGE": "string", "LOGICAL_STATUS": "Int16", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string", "SAO_START_NUMBER": "Int16", "SAO_START_SUFFIX": "string", "SAO_END_NUMBER": "Int16", "SAO_END_SUFFIX": "string", "SAO_TEXT": "string", "PAO_START_NUMBER": "Int16", "PAO_START_SUFFIX": "string", "PAO_END_NUMBER": "Int16", "PAO_END_SUFFIX": "string", "PAO_TEXT": "string", "USRN": "Int32", "USRN_MATCH_INDICATOR": "string", "AREA_NAME": "string", "LEVEL": "string", "OFFICIAL_FLAG": "string"
}, "CSV_filename": "ID24_LPI_Records", "sql_table": "dbo_TEST_abp_lpi", "DATABASE": "TRUE"},
"28": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "UDPRN", "ORGANISATION_NAME", "DEPARTMENT_NAME", "SUB_BUILDING_NAME", "BUILDING_NAME", "BUILDING_NUMBER", "DEPENDENT_THOROUGHFARE", "THOROUGHFARE", "DOUBLE_DEPENDENT_LOCALITY", "DEPENDENT_LOCALITY", "POST_TOWN", "POSTCODE", "POSTCODE_TYPE", "DELIVERY_POINT_SUFFIX", "WELSH_DEPENDENT_THOROUGHFARE", "WELSH_THOROUGHFARE", "WELSH_DOUBLE_DEPENDENT_LOCALITY", "WELSH_DEPENDENT_LOCALITY", "WELSH_POST_TOWN", "PO_BOX_NUMBER", "PROCESS_DATE", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "UDPRN": "Int64", "ORGANISATION_NAME": "string", "DEPARTMENT_NAME": "string", "SUB_BUILDING_NAME": "string", "BUILDING_NAME": "string", "BUILDING_NUMBER": "Int16", "DEPENDENT_THOROUGHFARE": "string", "THOROUGHFARE": "string", "DOUBLE_DEPENDENT_LOCALITY": "string", "DEPENDENT_LOCALITY": "string", "POST_TOWN": "string", "POSTCODE": "string", "POSTCODE_TYPE": "string", "DELIVERY_POINT_SUFFIX": "string", "WELSH_DEPENDENT_THOROUGHFARE": "string", "WELSH_THOROUGHFARE": "string", "WELSH_DOUBLE_DEPENDENT_LOCALITY": "string", "WELSH_DEPENDENT_LOCALITY": "string", "WELSH_POST_TOWN": "string", "PO_BOX_NUMBER": "string", "PROCESS_DATE": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"
}, "CSV_filename": "ID28_DPA_Records", "sql_table": "dbo_TEST_abp_delivery_point", "DATABASE": "TRUE"},
"29": {"headings": ["RECORD_IDENTIFIER", "GAZ_NAME", "GAZ_SCOPE", "TER_OF_USE", "LINKED_DATA", "GAZ_OWNER", "NGAZ_FREQ", "CUSTODIAN_NAME", "CUSTODIAN_UPRN", "LOCAL_CUSTODIAN_CODE", "CO_ORD_SYSTEM", "CO_ORD_UNIT", "META_DATE", "CLASS_SCHEME", "GAZ_DATE", "LANGUAGE", "CHARACTER_SET"],
"dtype_mapping": None, "CSV_filename": "ID29_Metadata_Records", "sql_table": "dbo_TEST_abp_metadata", "DATABASE": "FALSE"},
"30": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "SUCC_KEY", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE", "SUCCESSOR"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "SUCC_KEY": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string", "SUCCESSOR": "Int64"
}, "CSV_filename": "ID30_Successor_Records", "sql_table": "dbo_TEST_abp_successor", "DATABASE": "TRUE"},
"31": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "ORG_KEY", "ORGANISATION", "LEGAL_NAME", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "ORG_KEY": "string", "ORGANISATION": "string", "LEGAL_NAME": "string", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"
}, "CSV_filename": "ID31_Org_Records", "sql_table": "dbo_TEST_abp_organisation", "DATABASE": "TRUE"},
"32": {"headings": ["RECORD_IDENTIFIER", "CHANGE_TYPE", "PRO_ORDER", "UPRN", "CLASS_KEY", "CLASSIFICATION_CODE", "CLASS_SCHEME", "SCHEME_VERSION", "START_DATE", "END_DATE", "LAST_UPDATE_DATE", "ENTRY_DATE"],
"dtype_mapping": {"RECORD_IDENTIFIER": "Int16", "CHANGE_TYPE": "string", "PRO_ORDER": "Int64", "UPRN": "Int64", "CLASS_KEY": "string", "CLASSIFICATION_CODE": "string", "CLASS_SCHEME": "string", "SCHEME_VERSION": "float64", "START_DATE": "string", "END_DATE": "string", "LAST_UPDATE_DATE": "string", "ENTRY_DATE": "string"
}, "CSV_filename": "ID32_Class_Records", "sql_table": "dbo_TEST_abp_classification", "DATABASE": "TRUE"},
"99": {"headings": ["RECORD_IDENTIFIER", "NEXT_VOLUME_NUMBER", "RECORD_COUNT", "ENTRY_DATE", "TIME_STAMP"],
"dtype_mapping": None, "CSV_filename": "ID99_Trailer_Records", "sql_table": "dbo_TEST_abp_trailer", "DATABASE": "FALSE"
}
}

print(f'{directory_name}')

# Directory to store processed files
output_directory = "/lakehouse/default/Files/ABP_processed/"

print('This program will split OS AddressBase Premium Zip CSV or extracted CSV files by record identifier into new CSV files')
start_time = time.time()

# Per-record-type row counters, keyed by the two-digit record identifier.
counters = {record_type: 0 for record_type in CSV_CONFIG}

# Output file handles, keyed by record identifier; None until a record of
# that type is first seen.  (The original initialised this dict twice --
# the first assignment was dead code.)
writers = {record_type: None for record_type in CSV_CONFIG}

# Record types for which an output file handle has been opened.
created_writers = set()

# Make sure the output directory exists before writing into it.
os.makedirs(output_directory, exist_ok=True)

# Start each run from a clean slate: mode 'w' truncates (or creates) each
# output file, so the separate os.remove of the original is unnecessary.
for record_type in CSV_CONFIG:
    file_path = os.path.join(output_directory, f"{CSV_CONFIG[record_type]['CSV_filename']}.csv")
    with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
        csv.writer(csvfile).writerow(CSV_CONFIG[record_type]['headings'])


# csv.writer wrappers, created once per record type.  (The original rebuilt
# a csv.writer object for every single input row.)
csv_writers = {}


def _process_rows(csvreader):
    """Route each row of one input CSV to the output file for its record type.

    The record type is the first two characters of the first field.  Unknown
    record types are reported and skipped (the original fell through and
    wrote such rows with a stale writer, then crashed on the counter lookup).
    """
    for row in csvreader:
        if not row:
            # Tolerate blank lines in the input.
            continue
        record_type = row[0][:2]
        if record_type not in writers:
            print(f"Record type {record_type} not found in writers dictionary.")
            continue
        if writers[record_type] is None:
            # Lazily open the output file in append mode on first use
            # (the header row was already written during setup).
            file_path = os.path.join(output_directory, f"{CSV_CONFIG[record_type]['CSV_filename']}.csv")
            print(f"File path: {file_path}")
            writers[record_type] = open(file_path, 'a', newline='', encoding='utf-8')
            csv_writers[record_type] = csv.writer(writers[record_type])
        csv_writers[record_type].writerow(row)
        counters[record_type] += 1


# Walk the download directory and split every CSV found -- inside zip
# archives or as plain files (the docstring promises both; the original
# only handled .zip).
for root, _, filenames in os.walk(directory_name):
    for filename in filenames:
        full_path = os.path.join(root, filename)
        if filename.endswith(".zip"):
            with zipfile.ZipFile(full_path, 'r') as zip_ref:
                for member in zip_ref.namelist():
                    if member.endswith(".csv"):
                        # TextIOWrapper streams and decodes the member
                        # incrementally instead of reading the whole file
                        # into memory at once.
                        with zip_ref.open(member) as raw:
                            _process_rows(csv.reader(io.TextIOWrapper(raw, encoding='utf-8')))
        elif filename.endswith(".csv"):
            with open(full_path, 'r', newline='', encoding='utf-8') as plain:
                _process_rows(csv.reader(plain))

# Record which output files were actually opened during this run.
created_writers.update(record_type for record_type in writers if writers[record_type] is not None)

# Close the CSV files after processing all rows
for writer in writers.values():
    if writer is not None:
        writer.close()

# Final report: elapsed wall-clock time plus per-record-type row counts.
end_time = time.time()
elapsed = end_time - start_time

print("Program has finished splitting the AddressBase Premium Files")
print('Finished translating data at', strftime("%a, %d %b %Y %H:%M:%S"))
print('Elapsed time:', round(elapsed / 60, 1), 'minutes')

# One count line per record identifier defined in CSV_CONFIG.
for record_type, count in sorted(counters.items()) if False else counters.items():
    print(f"Number of {record_type} Records: {count}")

# Keep the console window open briefly so the summary can be read.
print("The program will close in 10 seconds")
time.sleep(10)