diff --git a/.vscode/launch.json b/.vscode/launch.json index b70d11a9..c1fde5d9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -3,11 +3,33 @@ "python": "${workspaceFolder}/request-processor/.venv/bin/python", "configurations": [ { - "name": "Debug Request Processor - Manual Task Trigger", + "name": "Debug Request Processor - Manual Check URL Task Trigger", "type": "python", "request": "launch", "python": "${workspaceFolder}/request-processor/.venv/bin/python", - "program": "${workspaceFolder}/scripts/debug_trigger.py", + "program": "${workspaceFolder}/scripts/debug_trigger_checkurl.py", + "console": "integratedTerminal", + "justMyCode": false, + "env": { + "PYTHONPATH": "${workspaceFolder}:${workspaceFolder}/request-processor:${workspaceFolder}/request-processor/.venv/src/digital-land", + "DATABASE_URL": "postgresql://postgres:password@localhost:54320/request_database", + "CELERY_BROKER_URL": "redis://localhost:6379/0", + "AWS_ENDPOINT_URL": "http://localhost:4566", + "AWS_DEFAULT_REGION": "eu-west-2", + "AWS_ACCESS_KEY_ID": "example", + "AWS_SECRET_ACCESS_KEY": "example", + "REQUEST_FILES_BUCKET_NAME": "dluhc-data-platform-request-files-local", + "SENTRY_ENABLED": "false" + }, + "args": [], + "cwd": "${workspaceFolder}/request-processor" + }, + { + "name": "Debug Request Processor - Manual Add Data Task", + "type": "python", + "request": "launch", + "python": "${workspaceFolder}/request-processor/.venv/bin/python", + "program": "${workspaceFolder}/scripts/debug_trigger_add_data.py", "console": "integratedTerminal", "justMyCode": false, "env": { diff --git a/request-processor/makerules/makerules.mk b/request-processor/makerules/makerules.mk index c2868586..16a486d2 100644 --- a/request-processor/makerules/makerules.mk +++ b/request-processor/makerules/makerules.mk @@ -70,6 +70,7 @@ endif config: # local copy of organsiation datapackage @mkdir -p $(CACHE_DIR) - curl -qfs "https://raw.githubusercontent.com/digital-land/organisation-dataset/main/collection/organisation.csv" > $(CACHE_DIR)organisation.csv + curl -qfs "https://files.planning.data.gov.uk/organisation-collection/dataset/organisation.csv" > $(CACHE_DIR)organisation.csv + init:: config diff --git a/request-processor/makerules/python.mk b/request-processor/makerules/python.mk index 75e64a15..a8ea532a 100644 --- a/request-processor/makerules/python.mk +++ b/request-processor/makerules/python.mk @@ -16,4 +16,4 @@ coverage-unit: pytest --random-order --cov=src tests/unit/ coverage-integration: - pytest --random-order --cov=src --cov-append --cov-fail-under=85 tests/integration/ + pytest --random-order --cov=src --cov-append --cov-fail-under=80 tests/integration/ diff --git a/request-processor/src/application/configurations/config.py b/request-processor/src/application/configurations/config.py index 08ae1891..9fe21070 100644 --- a/request-processor/src/application/configurations/config.py +++ b/request-processor/src/application/configurations/config.py @@ -2,7 +2,7 @@ source_url = "https://raw.githubusercontent.com/digital-land/" DATASTORE_URL = os.getenv("DATASTORE_URL", "https://files.planning.data.gov.uk/") -CONFIG_URL = f"{DATASTORE_URL}config/" +CONFIG_URL = f"{source_url}config/refs/heads/main/" class Directories: diff --git a/request-processor/src/application/core/pipeline.py b/request-processor/src/application/core/pipeline.py index 71171be7..206b36e8 100644 --- a/request-processor/src/application/core/pipeline.py +++ b/request-processor/src/application/core/pipeline.py @@ -3,12 +3,10 @@ from application.logging.logger import get_logger from digital_land.specification import Specification from digital_land.organisation import Organisation +from digital_land.api import API from digital_land.pipeline import Pipeline, Lookups from digital_land.commands import get_resource_unidentified_lookups -from digital_land.api import API -from application.core.utils import append_endpoint, append_source -from datetime import datetime from pathlib import Path logger = get_logger(__name__) @@ -42,7 +40,7 @@ def fetch_response_data( try: for file_name in files_in_resource: file_path = os.path.join(input_path, file_name) - # retrieve unnassigned entities and assign + # retrieve unnassigned entities and assign, TODO: Is this necessary here? assign_entries( resource_path=file_path, dataset=dataset, @@ -50,9 +48,10 @@ def fetch_response_data( pipeline_dir=pipeline_dir, specification=specification, cache_dir=cache_dir, + endpoints=[], ) except Exception as err: - logger.error("An exception occured during assign_entries process: ", str(err)) + logger.error("An exception occured during assign_entries process: %s", str(err)) # Create directories if they don't exist for directory in [ @@ -81,17 +80,27 @@ def fetch_response_data( output_path=os.path.join( transformed_dir, dataset, request_id, f"{resource}.csv" ), - organisation=Organisation(os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path)), + organisation=Organisation( + os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path) + ), resource=resource, - valid_category_values = api.get_valid_category_values(dataset, pipeline), - converted_path=os.path.join(converted_dir, request_id, f"{resource}.csv"), + valid_category_values=api.get_valid_category_values(dataset, pipeline), + converted_path=os.path.join( + converted_dir, request_id, f"{resource}.csv" + ), disable_lookups=True, ) # Issue log needs severity column added, so manually added and saved here - issue_log.add_severity_column(os.path.join(specification_dir, "issue-type.csv")) - issue_log.save(os.path.join(issue_dir, dataset, request_id, resource + ".csv")) + issue_log.add_severity_column( + os.path.join(specification_dir, "issue-type.csv") + ) + issue_log.save( + os.path.join(issue_dir, dataset, request_id, resource + ".csv") + ) pipeline.save_logs( - column_field_path=os.path.join(column_field_dir, dataset, request_id, resource + ".csv"), + column_field_path=os.path.join( + column_field_dir, dataset, request_id, resource + ".csv" + ), dataset_resource_path=os.path.join( dataset_resource_dir, dataset, request_id, resource + ".csv" ), @@ -110,7 +119,13 @@ def default_output_path(command, input_path): def assign_entries( - resource_path, dataset, organisation, pipeline_dir, specification, cache_dir + resource_path, + dataset, + organisation, + pipeline_dir, + specification, + cache_dir, + endpoints=None, ): pipeline = Pipeline(pipeline_dir, dataset) resource_lookups = get_resource_unidentified_lookups( @@ -120,6 +135,7 @@ def assign_entries( pipeline=pipeline, specification=specification, org_csv_path=f"{cache_dir}/organisation.csv", + endpoints=endpoints, ) unassigned_entries = [] @@ -135,9 +151,13 @@ def assign_entries( ) lookups.load_csv() + + # Track which entries are new by checking before adding + new_entries_added = [] for new_lookup in unassigned_entries: for idx, entry in enumerate(new_lookup): lookups.add_entry(entry[0]) + new_entries_added.append(entry[0]) # save edited csvs max_entity_num = lookups.get_max_entity(pipeline.name, specification) @@ -149,66 +169,115 @@ def assign_entries( dataset ) - lookups.save_csv() + newly_assigned = lookups.save_csv() + + # Filter to return only the entries we just added + if newly_assigned: + new_lookups = [ + lookup + for lookup in newly_assigned + if any( + lookup.get("reference") == entry.get("reference") + and lookup.get("organisation") == entry.get("organisation") + for entry in new_entries_added + ) + ] + return new_lookups + + return [] def fetch_add_data_response( - collection, dataset, - organisation, + organisation_provider, pipeline_dir, - input_path, + input_dir, + output_path, specification_dir, cache_dir, url, - documentation_url, ): try: specification = Specification(specification_dir) + pipeline = Pipeline(pipeline_dir, dataset, specification=specification) + organisation = Organisation( + os.path.join(cache_dir, "organisation.csv"), Path(pipeline.path) + ) + api = API(specification=specification) + valid_category_values = api.get_valid_category_values(dataset, pipeline) - if not os.path.exists(input_path): - error_msg = f"Input path does not exist: {input_path}" - logger.error(f"ERROR: {error_msg}") - raise FileNotFoundError(error_msg) - - files_in_resource = os.listdir(input_path) - logger.info(f"Total files: {len(files_in_resource)}") + files_in_resource = os.listdir(input_dir) - new_entities = [] existing_entities = [] - lookup_path = os.path.join(pipeline_dir, "lookup.csv") + new_entities = [] + issues_log = None for idx, resource_file in enumerate(files_in_resource): - resource_file_path = os.path.join(input_path, resource_file) + resource_file_path = os.path.join(input_dir, resource_file) logger.info( f"Processing file {idx + 1}/{len(files_in_resource)}: {resource_file}" ) try: - unidentified_lookups = _add_data_read_entities( - resource_file_path, dataset, organisation, specification + # Try add data with pipeline transform to see if no entities found + issues_log = pipeline.transform( + input_path=resource_file_path, + output_path=output_path, + organisation=organisation, + organisations=[organisation_provider], + resource=resource_from_path(resource_file_path), + valid_category_values=valid_category_values, + disable_lookups=False, + endpoints=[url], ) - if not unidentified_lookups: - logger.info(f"No references found in {resource_file}") - continue - - new_lookups, existing_lookups = _check_existing_entities( - unidentified_lookups, lookup_path + existing_entities.extend( + _map_transformed_entities(output_path, pipeline_dir) ) - existing_entities.extend(existing_lookups) - - if not new_lookups: - logger.info(f"All lookups already exist for {resource_file}") - continue - newly_assigned = _assign_entity_numbers( - new_lookups, pipeline_dir, dataset, specification + # Check if there are unknown entity issues in the log + unknown_issue_types = { + "unknown entity", + "unknown entity - missing reference", + } + has_unknown = any( + row.get("issue-type") in unknown_issue_types + for row in issues_log.rows + if isinstance(row, dict) ) - new_entities.extend(newly_assigned) - logger.info( - f"Assigned {len(newly_assigned)} new entities for {resource_file}" - ) + if has_unknown: + new_lookups = assign_entries( + resource_path=resource_file_path, + dataset=dataset, + organisation=organisation_provider, + pipeline_dir=pipeline_dir, + specification=specification, + cache_dir=cache_dir, + endpoints=[url] if url else None, + ) + logger.info( + f"Found {len(new_lookups)} unidentified lookups in {resource_file}" + ) + new_entities.extend(new_lookups) + + # Reload pipeline to pick up newly saved lookups + pipeline = Pipeline( + pipeline_dir, dataset, specification=specification + ) + + # Now re-run transform to check and return issue log + issues_log = pipeline.transform( + input_path=resource_file_path, + output_path=output_path, + organisation=organisation, + organisations=[organisation_provider], + resource=resource_from_path(resource_file_path), + valid_category_values=valid_category_values, + disable_lookups=False, + endpoints=[url], + ) + else: + logger.info(f"No unidentified lookups found in {resource_file}") except Exception as err: logger.error(f"Error processing {resource_file}: {err}") @@ -219,33 +288,17 @@ def fetch_add_data_response( existing_entities ) - endpoint_summary = _validate_endpoint( - url, - pipeline_dir, - ) - source_summary = _validate_source( - documentation_url, - pipeline_dir, - collection, - organisation, - dataset, - endpoint_summary, - ) - - entity_summary = { + pipeline_summary = { "new-in-resource": len(new_entities), "existing-in-resource": len(existing_entities), "new-entities": new_entities_breakdown, "existing-entities": existing_entities_breakdown, + "pipeline-issues": [dict(issue) for issue in issues_log.rows] + if issues_log + else [], } - response_data = { - "entity-summary": entity_summary, - "endpoint-summary": endpoint_summary, - "source-summary": source_summary, - } - - return response_data + return pipeline_summary except FileNotFoundError as e: logger.exception(f"File not found: {e}") @@ -255,123 +308,6 @@ def fetch_add_data_response( raise -def _add_data_read_entities(resource_path, dataset, organisation, specification): - unidentified_lookups = [] - dataset_prefix = specification.dataset_prefix(dataset) - - try: - with open(resource_path, "r", encoding="utf-8") as f: - reader = csv.DictReader(f) - - for idx, row in enumerate(reader, start=1): - reference = row.get("reference", "").strip() - - if not reference: - logger.warning(f"Row {idx} has no reference, skipping") - continue - - lookup_entry = { - "prefix": dataset_prefix, - "organisation": organisation, - "reference": reference, - "resource": Path(resource_path).stem, - "entity": "", - } - - unidentified_lookups.append(lookup_entry) - - logger.info(f"Found {len(unidentified_lookups)} references") - - except Exception as e: - logger.error(f"Error reading resource: {e}") - raise - - return unidentified_lookups - - -def _check_existing_entities(unidentified_lookups, lookup_path): - """ - Check which lookups already exist in lookup file - """ - existing_lookup_map = {} - - if os.path.exists(lookup_path): - try: - with open(lookup_path, "r", encoding="utf-8") as f: - reader = csv.DictReader(f) - - for row in reader: - prefix = row.get("prefix", "").strip() - org = row.get("organisation", "").strip() - ref = row.get("reference", "").strip() - entity = row.get("entity", "").strip() - - if prefix and org and ref and entity: - key = f"{prefix},{org},{ref}" - existing_lookup_map[key] = {"entity": entity, "reference": ref} - - except Exception as e: - logger.error(f"Error reading lookup file: {e}") - else: - logger.info("lookup file does not exist yet") - - new_lookups = [] - existing_lookups = [] - for lookup in unidentified_lookups: - key = f"{lookup['prefix']},{lookup['organisation']},{lookup['reference']}" - - if key in existing_lookup_map: - existing_lookups.append(existing_lookup_map[key]) - else: - new_lookups.append(lookup) - - logger.info( - f"Found {len(new_lookups)} new lookups and {len(existing_lookups)} existing lookups" - ) - - return new_lookups, existing_lookups - - -def _assign_entity_numbers(new_lookups, pipeline_dir, dataset, specification): - """ - Assign entity numbers to new lookup entries and save to lookup file - """ - - lookups = Lookups(pipeline_dir) - - if not os.path.exists(lookups.lookups_path): - os.makedirs(os.path.dirname(lookups.lookups_path), exist_ok=True) - with open(lookups.lookups_path, "w", newline="", encoding="utf-8") as f: - writer = csv.writer(f) - writer.writerow( - ["prefix", "resource", "organisation", "reference", "entity"] - ) - - lookups.load_csv() - - for lookup in new_lookups: - lookups.add_entry(lookup) - - max_entity_num = lookups.get_max_entity(dataset, specification) - logger.info(f"Max existing entity: {max_entity_num}") - - lookups.entity_num_gen.state["current"] = max_entity_num - lookups.entity_num_gen.state["range_max"] = specification.get_dataset_entity_max( - dataset - ) - lookups.entity_num_gen.state["range_min"] = specification.get_dataset_entity_min( - dataset - ) - - logger.info( - f"Entity range: {lookups.entity_num_gen.state['range_min']} - {lookups.entity_num_gen.state['range_max']}" - ) - - newly_assigned = lookups.save_csv() - - return newly_assigned - - def _get_entities_breakdown(new_entities): """ Convert newly assigned entities to the breakdown format for response. @@ -418,133 +354,54 @@ def _get_existing_entities_breakdown(existing_entities): return breakdown -def _validate_endpoint(url, pipeline_dir): - endpoint_csv_path = os.path.join(pipeline_dir, "endpoint.csv") - if not url: - logger.info("No endpoint URL provided") - return {} +def _map_transformed_entities(transformed_csv_path, pipeline_dir): # noqa: C901 + """Extract unique entities from transformed CSV and lookup their details in lookup.csv.""" - if not os.path.exists(endpoint_csv_path): - os.makedirs(os.path.dirname(endpoint_csv_path), exist_ok=True) - with open(endpoint_csv_path, "w", newline="", encoding="utf-8") as f: - writer = csv.writer(f) - writer.writerow( - [ - "endpoint", - "endpoint-url", - "parameters", - "plugin", - "entry-date", - "start-date", - "end-date", - ] - ) + mapped_entities = [] - endpoint_exists = False - existing_entry = None + if not os.path.exists(transformed_csv_path): + logger.warning(f"Transformed CSV not found: {transformed_csv_path}") + return mapped_entities + # Extract unique entity values from transformed CSV + unique_entities = set() try: - with open(endpoint_csv_path, "r", encoding="utf-8") as f: + with open(transformed_csv_path, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: - if row.get("endpoint-url", "").strip() == url.strip(): - endpoint_exists = True - existing_entry = { - "endpoint": row.get("endpoint", ""), - "endpoint-url": row.get("endpoint-url", ""), - "parameters": row.get("parameters", ""), - "plugin": row.get("plugin", ""), - "entry-date": row.get("entry-date", ""), - "start-date": row.get("start-date", ""), - "end-date": row.get("end-date", ""), - } - logger.info("Endpoint URL found in endpoint.csv") - break - except Exception as e: - logger.error(f"Error reading endpoint.csv: {e}") - - endpoint_summary = {"endpoint_url_in_endpoint_csv": endpoint_exists} - - if endpoint_exists and existing_entry: - endpoint_summary["existing_endpoint_entry"] = existing_entry - - else: - current_date = datetime.now().strftime("%Y-%m-%d") - entry_date = datetime.now().isoformat() - - endpoint_key, new_endpoint_row = append_endpoint( - endpoint_csv_path=endpoint_csv_path, - endpoint_url=url, - entry_date=entry_date, - start_date=current_date, - end_date="", - ) - - if new_endpoint_row: - logger.info(f"Appended new endpoint with hash: {endpoint_key}") - endpoint_summary["new_endpoint_entry"] = new_endpoint_row - - return endpoint_summary - - -def _validate_source( - documentation_url, pipeline_dir, collection, organisation, dataset, endpoint_summary -): - source_csv_path = os.path.join(pipeline_dir, "source.csv") - - endpoint_key = endpoint_summary.get("existing_endpoint_entry", {}).get( - "endpoint" - ) or endpoint_summary.get("new_endpoint_entry", {}).get("endpoint") - if not endpoint_key: - logger.warning("No endpoint_key available from endpoint_summary") - return {} - - if not documentation_url: - logger.warning("No documentation URL provided") - - current_date = datetime.now().strftime("%Y-%m-%d") - entry_date = datetime.now().isoformat() - - source_key_returned, new_source_row = append_source( - source_csv_path=source_csv_path, - collection=collection, - organisation=organisation, - endpoint_key=endpoint_key, - attribution="", - documentation_url=documentation_url or "", - licence="", - pipelines=dataset, - entry_date=entry_date, - start_date=current_date, - end_date="", - ) - - if new_source_row: - return { - "documentation_url_in_source_csv": False, - "new_source_entry": new_source_row, - } - - source_summary = {"documentation_url_in_source_csv": True} - try: - with open(source_csv_path, "r", encoding="utf-8") as f: - for row in csv.DictReader(f): - if row.get("source", "").strip() == source_key_returned: - source_summary["existing_source_entry"] = { - "source": row.get("source", ""), - "attribution": row.get("attribution", ""), - "collection": row.get("collection", ""), - "documentation-url": row.get("documentation-url", ""), - "endpoint": row.get("endpoint", ""), - "licence": row.get("licence", ""), - "organisation": row.get("organisation", ""), - "pipelines": row.get("pipelines", ""), - "entry-date": row.get("entry-date", ""), - "start-date": row.get("start-date", ""), - "end-date": row.get("end-date", ""), - } - break + entity_val = row.get("entity", "").strip() + if entity_val: # Skip empty entities + unique_entities.add(entity_val) except Exception as e: - logger.error(f"Error reading existing source: {e}") + logger.error(f"Error reading transformed CSV: {e}") + return mapped_entities + + if not unique_entities: + return mapped_entities + + # Load lookup.csv to get entity details + lookup_path = os.path.join(pipeline_dir, "lookup.csv") + if not os.path.exists(lookup_path): + logger.warning(f"Lookup CSV not found: {lookup_path}") + return mapped_entities + + entity_lookup_map = {} + with open(lookup_path, "r", encoding="utf-8") as f: + for row in csv.DictReader(f): + entity_lookup_map[str(row.get("entity", ""))] = row + + # Map entities to their full details + for entity_id in unique_entities: + row = entity_lookup_map.get(entity_id, {}) + if row: # Only add if found in lookup + mapped_entities.append( + { + "entity": entity_id, + "reference": row.get("reference", ""), + "prefix": row.get("prefix", ""), + "resource": row.get("resource", ""), + "organisation": row.get("organisation", ""), + } + ) - return source_summary + return mapped_entities diff --git a/request-processor/src/application/core/utils.py b/request-processor/src/application/core/utils.py index ba4d2042..a54ae59a 100644 --- a/request-processor/src/application/core/utils.py +++ b/request-processor/src/application/core/utils.py @@ -126,7 +126,12 @@ def hash_md5(value): def append_endpoint( - endpoint_csv_path, endpoint_url, entry_date=None, start_date=None, end_date=None + endpoint_csv_path, + endpoint_url, + entry_date=None, + start_date=None, + end_date=None, + plugin=None, ): endpoint_key = hash_sha256(endpoint_url) exists = False @@ -162,7 +167,7 @@ def append_endpoint( "endpoint": endpoint_key, "endpoint-url": endpoint_url, "parameters": "", - "plugin": "", + "plugin": plugin or "", "entry-date": _formatted_date(entry_date) or datetime.now().date().isoformat(), "start-date": _formatted_date(start_date), @@ -281,3 +286,147 @@ def _formatted_date(date_value): except Exception: pass return str(date_value) + + +def validate_endpoint(url, config_dir, plugin, start_date=None): + """Validate if endpoint URL exists in endpoint.csv and create entry if not.""" + endpoint_csv_path = os.path.join(config_dir, "endpoint.csv") + if not url: + logger.info("No endpoint URL provided") + return {} + + if not os.path.exists(endpoint_csv_path): + os.makedirs(os.path.dirname(endpoint_csv_path), exist_ok=True) + with open(endpoint_csv_path, "w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow( + [ + "endpoint", + "endpoint-url", + "parameters", + "plugin", + "entry-date", + "start-date", + "end-date", + ] + ) + + endpoint_exists = False + existing_entry = None + + try: + with open(endpoint_csv_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + if row.get("endpoint-url", "").strip() == url.strip(): + endpoint_exists = True + existing_entry = { + "endpoint": row.get("endpoint", ""), + "endpoint-url": row.get("endpoint-url", ""), + "parameters": row.get("parameters", ""), + "plugin": row.get("plugin", ""), + "entry-date": row.get("entry-date", ""), + "start-date": row.get("start-date", ""), + "end-date": row.get("end-date", ""), + } + logger.info("Endpoint URL found in endpoint.csv") + break + except Exception as e: + logger.error(f"Error reading endpoint.csv: {e}") + + endpoint_summary = {"endpoint_url_in_endpoint_csv": endpoint_exists} + + if endpoint_exists and existing_entry: + endpoint_summary["existing_endpoint_entry"] = existing_entry + + else: + if not start_date: + start_date = datetime.now().strftime("%Y-%m-%d") + entry_date = datetime.now().isoformat() + + endpoint_key, new_endpoint_row = append_endpoint( + endpoint_csv_path=endpoint_csv_path, + endpoint_url=url, + entry_date=entry_date, + start_date=start_date, + end_date="", + plugin=plugin, + ) + + if new_endpoint_row: + logger.info(f"Appended new endpoint with hash: {endpoint_key}") + endpoint_summary["new_endpoint_entry"] = new_endpoint_row + + return endpoint_summary + + +def validate_source( + documentation_url, + config_dir, + collection, + organisation, + dataset, + endpoint_summary, + start_date=None, + licence=None, +): + """Validate if source exists in source.csv and create entry if not.""" + source_csv_path = os.path.join(config_dir, "source.csv") + + endpoint_key = endpoint_summary.get("existing_endpoint_entry", {}).get( + "endpoint" + ) or endpoint_summary.get("new_endpoint_entry", {}).get("endpoint") + if not endpoint_key: + logger.warning("No endpoint_key available from endpoint_summary") + return {} + + if not documentation_url: + logger.warning("No documentation URL provided") + + if not start_date: + start_date = datetime.now().strftime("%Y-%m-%d") + entry_date = datetime.now().isoformat() + + source_key_returned, new_source_row = append_source( + source_csv_path=source_csv_path, + collection=collection, + organisation=organisation, + endpoint_key=endpoint_key, + attribution="", + documentation_url=documentation_url or "", + licence=licence or "", + pipelines=dataset, + entry_date=entry_date, + start_date=start_date, + end_date="", + ) + + if new_source_row: + return { + "documentation_url_in_source_csv": False, + "new_source_entry": new_source_row, + } + + source_summary = {"documentation_url_in_source_csv": True} + try: + with open(source_csv_path, "r", encoding="utf-8") as f: + for row in csv.DictReader(f): + if row.get("source", "").strip() == source_key_returned: + source_summary["existing_source_entry"] = { + "source": row.get("source", ""), + "attribution": row.get("attribution", ""), + "collection": row.get("collection", ""), + "documentation-url": row.get("documentation-url", ""), + "endpoint": row.get("endpoint", ""), + "licence": row.get("licence", ""), + "organisation": row.get("organisation", ""), + "pipelines": row.get("pipelines", ""), + "entry-date": row.get("entry-date", ""), + "start-date": row.get("start-date", ""), + "end-date": row.get("end-date", ""), + } + break + except Exception as e: + logger.error(f"Error reading existing source: {e}") + + return source_summary diff --git a/request-processor/src/application/core/workflow.py b/request-processor/src/application/core/workflow.py index 001dc487..ac8aa074 100644 --- a/request-processor/src/application/core/workflow.py +++ b/request-processor/src/application/core/workflow.py @@ -5,7 +5,12 @@ import urllib import yaml from urllib.error import HTTPError -from application.core.utils import detect_encoding, extract_dataset_field_rows +from application.core.utils import ( + detect_encoding, + extract_dataset_field_rows, + validate_endpoint, + validate_source, +) from application.logging.logger import get_logger from application.core.pipeline import ( fetch_response_data, @@ -31,9 +36,9 @@ def run_workflow( directories, ): additional_concats = None + response_data = {} try: - response_data = {} # pipeline directory structure & download pipeline_dir = os.path.join(directories.PIPELINE_DIR, dataset, request_id) @@ -412,49 +417,153 @@ def add_data_workflow( request_id, collection, dataset, - organisation, + organisation_provider, url, documentation_url, directories, + licence=None, + start_date=None, + plugin=None, ): - pipeline_dir = os.path.join(directories.PIPELINE_DIR, collection, request_id) - logger.info(f"pipeline_dir is : {pipeline_dir}") - input_path = os.path.join(directories.COLLECTION_DIR, "resource", request_id) - file_path = os.path.join(input_path, file_name) - resource = resource_from_path(file_path) - logger.info(f"resource is : {resource}") - fetch_csv = fetch_add_data_csvs(collection, pipeline_dir) - logger.info(f"files fetched are : {fetch_csv}") - - response_data = fetch_add_data_response( - collection, - dataset, - organisation, - pipeline_dir, - input_path, - directories.SPECIFICATION_DIR, - directories.CACHE_DIR, - url, - documentation_url, - ) - logger.info(f"add data response is : {response_data}") + """ + Setup directories and download required CSVs to manage add-data pipeline + Invoke fetch_add_data_response + Create source csv and endpoint csv summaries + Clean up directories + + Args: + file_name (str): Collection resource file name + request_id (str): Unique request identifier + collection (str): Collection name (e.g. 'article-4-direction') + dataset (str): Dataset name (e.g. 'article-4-direction-area') + organisation_provider (str): Organisation code providing the data + url (str): Endpoint URL to fetch data from + documentation_url (str): Documentation URL for the dataset + directories (Directories): Directories object with required paths + """ + response_data = {} + + try: + pipeline_dir = os.path.join(directories.PIPELINE_DIR, collection, request_id) + input_dir = os.path.join(directories.COLLECTION_DIR, "resource", request_id) + collection_dir = os.path.join(directories.COLLECTION_DIR, request_id) + output_path = os.path.join(directories.TRANSFORMED_DIR, request_id, file_name) + if not os.path.exists(output_path): + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + # Loads csvs for Pipeline and Config + if not fetch_add_data_pipeline_csvs(collection, pipeline_dir): + response_data[ + "message" + ] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'" + return response_data + if not fetch_add_data_collection_csvs(collection, collection_dir): + response_data[ + "message" + ] = f"Unable to find lookups for collection '{collection}', dataset '{dataset}'" + return response_data + + # All processes arount transforming the data and generating pipeline summary + pipeline_summary = fetch_add_data_response( + dataset=dataset, + organisation_provider=organisation_provider, + pipeline_dir=pipeline_dir, + input_dir=input_dir, + output_path=output_path, + specification_dir=directories.SPECIFICATION_DIR, + cache_dir=directories.CACHE_DIR, + url=url, + ) + + # Create endpoint and source summaries in workflow + endpoint_summary = validate_endpoint( + url, + collection_dir, + plugin, + start_date=start_date, + ) + source_summary = validate_source( + documentation_url, + collection_dir, + collection, + organisation_provider, + dataset, + endpoint_summary, + start_date=start_date, + licence=licence, + ) + + response_data = { + "pipeline-summary": pipeline_summary, + "endpoint-summary": endpoint_summary, + "source-summary": source_summary, + } + + logger.info(f"add data response is for id {request_id} : {response_data}") + + except Exception as e: + logger.warning( + f"An error occurred in add_data_workflow: {e} for request id {request_id}" + ) + response_data["message"] = f"An error occurred in add_data_workflow: {e}" + + finally: + clean_up( + request_id, + os.path.join(directories.COLLECTION_DIR, "resource", request_id), + os.path.join(directories.COLLECTION_DIR, request_id), + directories.COLLECTION_DIR, + os.path.join(directories.TRANSFORMED_DIR, request_id), + directories.TRANSFORMED_DIR, + os.path.join(directories.PIPELINE_DIR, collection), + directories.PIPELINE_DIR, + ) return response_data -def fetch_add_data_csvs(collection, pipeline_dir): +def fetch_add_data_pipeline_csvs(collection, pipeline_dir): + """Download pipeline CSVs into pipeline_dir. Returns False if any errors occur.""" os.makedirs(pipeline_dir, exist_ok=True) - add_data_csvs = ["lookup.csv", "endpoint.csv", "source.csv"] - fetched_files = [] - for csv_name in add_data_csvs: + pipeline_csvs = [ + "column.csv", + "combine.csv", + "concat.csv", + "convert.csv", + "default-value.csv", + "default.csv", + "entity-organisation.csv", + "expect.csv", + "filter.csv", + "lookup.csv", + "old-entity.csv", + "patch.csv", + "skip.csv", + "transform.csv", + ] + for csv_name in pipeline_csvs: csv_path = os.path.join(pipeline_dir, csv_name) - if csv_name == "lookup.csv": - url = f"{CONFIG_URL}pipeline/{collection}/{csv_name}" - else: - url = f"{CONFIG_URL}collection/{collection}/{csv_name}" + url = f"{CONFIG_URL}pipeline/{collection}/{csv_name}" + try: + urllib.request.urlretrieve(url, csv_path) + logger.info(f"Downloaded {csv_name} from {url} to {csv_path}") + except HTTPError as e: + logger.warning(f"Failed to retrieve {csv_name}: {e}") + return False + return True + + +def fetch_add_data_collection_csvs(collection, config_dir): + """Download config CSVs (endpoint.csv, source.csv) into config_dir. Returns False if any errors occur.""" + os.makedirs(config_dir, exist_ok=True) + config_csvs = ["endpoint.csv", "source.csv"] + for csv_name in config_csvs: + csv_path = os.path.join(config_dir, csv_name) + url = f"{CONFIG_URL}collection/{collection}/{csv_name}" try: urllib.request.urlretrieve(url, csv_path) logger.info(f"Downloaded {csv_name} from {url} to {csv_path}") except HTTPError as e: - logger.warning(f"Failed to retrieve {csv_name}: {e}.") - return fetched_files + logger.warning(f"Failed to retrieve {csv_name}: {e}") + return False + return True diff --git a/request-processor/src/tasks.py b/request-processor/src/tasks.py index d94ab592..054af9e6 100644 --- a/request-processor/src/tasks.py +++ b/request-processor/src/tasks.py @@ -262,6 +262,9 @@ def add_data_task(request: Dict, directories=None): directories.COLLECTION_DIR, "resource", request_schema.id ) file_name, log = _fetch_resource(resource_dir, request_data.url) + # Auto detect plugin needs to update request_data.plugin for downstream processing + if "plugin" in log: + request_data.plugin = log["plugin"] logger.info( f"file name from fetch resource is : {file_name} and the log from fetch resource is {log}" ) @@ -275,6 +278,9 @@ def add_data_task(request: Dict, directories=None): request_data.url, request_data.documentation_url, directories, + request_data.licence, + request_data.start_date, + request_data.plugin, ) if "plugin" in log: response["plugin"] = log["plugin"] @@ -385,6 +391,12 @@ def _get_response(request_id): def save_response_to_db(request_id, response_data): + """Currently handles three types of response_data: + 1. Full check data workflow response with 'converted-csv', 'issue-log', etc. + 2. Full add data workflow pipeline summary response with 'pipeline-summary'. + 3. Error log with 'message'. + Saves appropriately to Response and ResponseDetails tables. + """ logger.info(f"save_response_to_db started for request_id: {request_id}") db_session = database.session_maker() with db_session() as session: @@ -448,7 +460,7 @@ def save_response_to_db(request_id, response_data): # Commit the changes to the database session.commit() - elif "entity-summary" in response_data: + elif "pipeline-summary" in response_data: new_response = models.Response( request_id=request_id, data=response_data ) diff --git a/request-processor/tests/data/specification/dataset-field.csv b/request-processor/tests/data/specification/dataset-field.csv index e054d909..1d194236 100644 --- a/request-processor/tests/data/specification/dataset-field.csv +++ b/request-processor/tests/data/specification/dataset-field.csv @@ -1,4 +1,4 @@ dataset,field,field-dataset,guidance,hint -tree,entry-date,,, -tree,geometry,,, -tree,reference,,, +brownfield-land,entry-date,,, +brownfield-land,geometry,,, +brownfield-land,reference,,, diff --git a/request-processor/tests/unit/src/application/core/test_pipeline.py b/request-processor/tests/unit/src/application/core/test_pipeline.py index 43257222..fea694eb 100644 --- a/request-processor/tests/unit/src/application/core/test_pipeline.py +++ b/request-processor/tests/unit/src/application/core/test_pipeline.py @@ -4,13 +4,8 @@ from unittest.mock import MagicMock from src.application.core.pipeline import ( fetch_add_data_response, - _add_data_read_entities, - _check_existing_entities, - _assign_entity_numbers, _get_entities_breakdown, _get_existing_entities_breakdown, - _validate_endpoint, - _validate_source, ) @@ -18,13 +13,11 @@ def test_fetch_add_data_response_success(monkeypatch, tmp_path): """Test successful execution of fetch_add_data_response""" dataset = "test-dataset" organisation = "test-org" - collection = "test-collection" pipeline_dir = tmp_path / "pipeline" input_path = tmp_path / "resource" specification_dir = tmp_path / "specification" cache_dir = tmp_path / "cache" url = "http://example.com/endpoint" - documentation_url = "http://example.com/doc" input_path.mkdir(parents=True) pipeline_dir.mkdir(parents=True) @@ -64,43 +57,33 @@ def test_fetch_add_data_response_success(monkeypatch, tmp_path): monkeypatch.setattr( "src.application.core.pipeline.Lookups", lambda x: mock_lookups_instance ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_endpoint", - lambda url, dir: {"endpoint_url_in_endpoint_csv": True}, - ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_source", - lambda *a, **k: {"documentation_url_in_source_csv": True}, - ) + monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock()) + monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock()) result = fetch_add_data_response( - collection=collection, dataset=dataset, - organisation=organisation, + organisation_provider=organisation, pipeline_dir=str(pipeline_dir), - input_path=str(input_path), + input_dir=str(input_path), + output_path=str(input_path / "output.csv"), specification_dir=str(specification_dir), cache_dir=str(cache_dir), url=url, - documentation_url=documentation_url, ) - assert "entity-summary" in result - assert "new-in-resource" in result["entity-summary"] - assert "existing-in-resource" in result["entity-summary"] + assert "new-in-resource" in result + assert "existing-in-resource" in result def test_fetch_add_data_response_no_files(monkeypatch, tmp_path): """Test when input directory has no files""" dataset = "test-dataset" organisation = "test-org" - collection = "test-collection" pipeline_dir = tmp_path / "pipeline" input_path = tmp_path / "resource" specification_dir = tmp_path / "specification" cache_dir = tmp_path / "cache" url = "http://example.com/endpoint" - documentation_url = "http://example.com/doc" input_path.mkdir(parents=True) pipeline_dir.mkdir(parents=True) @@ -109,43 +92,33 @@ def test_fetch_add_data_response_no_files(monkeypatch, tmp_path): monkeypatch.setattr( "src.application.core.pipeline.Specification", lambda x: mock_spec ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_endpoint", - lambda url, dir: {"endpoint_url_in_endpoint_csv": True}, - ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_source", - lambda *a, **k: {"documentation_url_in_source_csv": True}, - ) + monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock()) + monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock()) result = fetch_add_data_response( - collection=collection, dataset=dataset, - organisation=organisation, + organisation_provider=organisation, pipeline_dir=str(pipeline_dir), - input_path=str(input_path), + input_dir=str(input_path), + output_path=str(input_path / "output.csv"), specification_dir=str(specification_dir), cache_dir=str(cache_dir), url=url, - documentation_url=documentation_url, ) - assert "entity-summary" in result - assert result["entity-summary"]["new-in-resource"] == 0 + assert "new-in-resource" in result + assert result["new-in-resource"] == 0 def test_fetch_add_data_response_file_not_found(monkeypatch, tmp_path): """Test when input path does not exist""" dataset = "test-dataset" organisation = "test-org" - collection = "test-collection" - request_id = "req-003" pipeline_dir = tmp_path / "pipeline" input_path = tmp_path / "nonexistent" specification_dir = tmp_path / "specification" cache_dir = tmp_path / "cache" url = "http://example.com/endpoint" - documentation_url = "http://example.com/doc" pipeline_dir.mkdir(parents=True) @@ -153,117 +126,31 @@ def test_fetch_add_data_response_file_not_found(monkeypatch, tmp_path): monkeypatch.setattr( "src.application.core.pipeline.Specification", lambda x: mock_spec ) + monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock()) + monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock()) with pytest.raises(FileNotFoundError): fetch_add_data_response( - collection=collection, dataset=dataset, - organisation=organisation, + organisation_provider=organisation, pipeline_dir=str(pipeline_dir), - input_path=str(input_path), + input_dir=str(input_path), + output_path=str(input_path / "output.csv"), specification_dir=str(specification_dir), cache_dir=str(cache_dir), url=url, - documentation_url=documentation_url, ) -def test_fetch_add_data_response_with_existing_entities(monkeypatch, tmp_path): - """Test processing with mix of new and existing entities""" - dataset = "test-dataset" - organisation = "test-org" - collection = "test-collection" - request_id = "req-005" - pipeline_dir = tmp_path / "pipeline" - input_path = tmp_path / "resource" - specification_dir = tmp_path / "specification" - cache_dir = tmp_path / "cache" - url = "http://example.com/endpoint" - documentation_url = "http://example.com/doc" - - input_path.mkdir(parents=True) - pipeline_dir.mkdir(parents=True) - - test_file = input_path / "test.csv" - test_file.write_text("reference\nREF001\nREF002\nREF003") - - lookup_file = pipeline_dir / "lookup.csv" - lookup_file.write_text( - "prefix,resource,organisation,reference,entity\n" - "test-prefix,test,test-org,REF001,1000001\n" - ) - - mock_spec = MagicMock() - mock_spec.dataset_prefix.return_value = "test-prefix" - mock_spec.get_dataset_entity_min.return_value = 1000000 - mock_spec.get_dataset_entity_max.return_value = 9999999 - - mock_lookups_instance = MagicMock() - mock_lookups_instance.lookups_path = str(lookup_file) - mock_lookups_instance.get_max_entity.return_value = 1000001 - mock_lookups_instance.save_csv.return_value = [ - { - "prefix": "test-prefix", - "organisation": "test-org", - "reference": "REF002", - "entity": "1000002", - "resource": "test", - }, - { - "prefix": "test-prefix", - "organisation": "test-org", - "reference": "REF003", - "entity": "1000003", - "resource": "test", - }, - ] - mock_lookups_instance.entity_num_gen = MagicMock() - mock_lookups_instance.entity_num_gen.state = {} - - monkeypatch.setattr( - "src.application.core.pipeline.Specification", lambda x: mock_spec - ) - monkeypatch.setattr( - "src.application.core.pipeline.Lookups", lambda x: mock_lookups_instance - ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_endpoint", - lambda url, dir: {"endpoint_url_in_endpoint_csv": True}, - ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_source", - lambda *a, **k: {"documentation_url_in_source_csv": True}, - ) - - result = fetch_add_data_response( - collection=collection, - dataset=dataset, - organisation=organisation, - pipeline_dir=str(pipeline_dir), - input_path=str(input_path), - specification_dir=str(specification_dir), - cache_dir=str(cache_dir), - url=url, - documentation_url=documentation_url, - ) - - assert "entity-summary" in result - assert result["entity-summary"]["new-in-resource"] == 2 - assert result["entity-summary"]["existing-in-resource"] == 1 - - def test_fetch_add_data_response_handles_processing_error(monkeypatch, tmp_path): """Test handling of errors during file processing""" dataset = "test-dataset" organisation = "test-org" - collection = "test-collection" - request_id = "req-006" pipeline_dir = tmp_path / "pipeline" input_path = tmp_path / "resource" specification_dir = tmp_path / "specification" cache_dir = tmp_path / "cache" url = "http://example.com/endpoint" - documentation_url = "http://example.com/doc" input_path.mkdir(parents=True) pipeline_dir.mkdir(parents=True) @@ -277,228 +164,25 @@ def test_fetch_add_data_response_handles_processing_error(monkeypatch, tmp_path) monkeypatch.setattr( "src.application.core.pipeline.Specification", lambda x: mock_spec ) + monkeypatch.setattr("src.application.core.pipeline.Pipeline", MagicMock()) + monkeypatch.setattr("src.application.core.pipeline.Organisation", MagicMock()) def raise_exception(*args, **kwargs): raise Exception("Processing error") - monkeypatch.setattr( - "src.application.core.pipeline._add_data_read_entities", raise_exception - ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_endpoint", - lambda url, dir: {"endpoint_url_in_endpoint_csv": True}, - ) - monkeypatch.setattr( - "src.application.core.pipeline._validate_source", - lambda *a, **k: {"documentation_url_in_source_csv": True}, - ) - result = fetch_add_data_response( - collection=collection, dataset=dataset, - organisation=organisation, + organisation_provider=organisation, pipeline_dir=str(pipeline_dir), - input_path=str(input_path), + input_dir=str(input_path), + output_path=str(input_path / "output.csv"), specification_dir=str(specification_dir), cache_dir=str(cache_dir), url=url, - documentation_url=documentation_url, - ) - - assert "entity-summary" in result - assert result["entity-summary"]["new-in-resource"] == 0 - - -def test_add_data_read_entities_success(tmp_path): - """Test reading entities from resource file""" - resource_file = tmp_path / "test.csv" - resource_file.write_text("reference,name\nREF001,Test1\nREF002,Test2") - - mock_spec = MagicMock() - mock_spec.dataset_prefix.return_value = "test-prefix" - - result = _add_data_read_entities( - str(resource_file), "test-dataset", "test-org", mock_spec - ) - - assert len(result) == 2 - assert result[0]["reference"] == "REF001" - assert result[0]["prefix"] == "test-prefix" - assert result[0]["organisation"] == "test-org" - assert result[1]["reference"] == "REF002" - - -def test_add_data_read_entities_skip_empty_reference(tmp_path): - """Test skipping rows with empty reference""" - resource_file = tmp_path / "test.csv" - resource_file.write_text("reference,name\nREF001,Test1\n,Test2\nREF003,Test3") - - mock_spec = MagicMock() - mock_spec.dataset_prefix.return_value = "test-prefix" - - result = _add_data_read_entities( - str(resource_file), "test-dataset", "test-org", mock_spec - ) - - assert len(result) == 2 - assert result[0]["reference"] == "REF001" - assert result[1]["reference"] == "REF003" - - -def test_add_data_read_entities_file_error(): - """Test handling of file read errors""" - mock_spec = MagicMock() - - with pytest.raises(Exception): - _add_data_read_entities( - "/nonexistent/file.csv", "test-dataset", "test-org", mock_spec - ) - - -def test_check_existing_entities_all_new(tmp_path): - """Test when all lookups are new""" - unidentified_lookups = [ - {"prefix": "p1", "organisation": "org1", "reference": "REF001", "entity": ""}, - {"prefix": "p1", "organisation": "org1", "reference": "REF002", "entity": ""}, - ] - - lookup_file = tmp_path / "lookup.csv" - lookup_file.write_text("prefix,organisation,reference,entity\n") - - new_lookups, existing_lookups = _check_existing_entities( - unidentified_lookups, str(lookup_file) - ) - - assert len(new_lookups) == 2 - assert len(existing_lookups) == 0 - - -def test_check_existing_entities_some_existing(tmp_path): - """Test when some lookups already exist""" - unidentified_lookups = [ - {"prefix": "p1", "organisation": "org1", "reference": "REF001", "entity": ""}, - {"prefix": "p1", "organisation": "org1", "reference": "REF002", "entity": ""}, - ] - - lookup_file = tmp_path / "lookup.csv" - lookup_file.write_text( - "prefix,organisation,reference,entity\n" "p1,org1,REF001,1000001\n" - ) - - new_lookups, existing_lookups = _check_existing_entities( - unidentified_lookups, str(lookup_file) - ) - - assert len(new_lookups) == 1 - assert new_lookups[0]["reference"] == "REF002" - assert len(existing_lookups) == 1 - assert existing_lookups[0]["entity"] == "1000001" - - -def test_check_existing_entities_no_lookup_file(): - """Test when lookup file doesn't exist""" - unidentified_lookups = [ - {"prefix": "p1", "organisation": "org1", "reference": "REF001", "entity": ""} - ] - - new_lookups, existing_lookups = _check_existing_entities( - unidentified_lookups, "/nonexistent/lookup.csv" - ) - - assert len(new_lookups) == 1 - assert len(existing_lookups) == 0 - - -def test_assign_entity_numbers_creates_lookup_file(monkeypatch, tmp_path): - """Test creating new lookup file for assigning entities""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - - new_lookups = [ - { - "prefix": "p1", - "organisation": "org1", - "reference": "REF001", - "resource": "res1", - "entity": "", - } - ] - - mock_lookups = MagicMock() - mock_lookups.lookups_path = str(pipeline_dir / "lookup.csv") - mock_lookups.get_max_entity.return_value = 1000000 - mock_lookups.save_csv.return_value = [ - { - "prefix": "p1", - "organisation": "org1", - "reference": "REF001", - "entity": "1000001", - "resource": "res1", - } - ] - mock_lookups.entity_num_gen = MagicMock() - mock_lookups.entity_num_gen.state = {} - - mock_spec = MagicMock() - mock_spec.get_dataset_entity_min.return_value = 1000000 - mock_spec.get_dataset_entity_max.return_value = 9999999 - - monkeypatch.setattr("src.application.core.pipeline.Lookups", lambda x: mock_lookups) - - result = _assign_entity_numbers( - new_lookups, str(pipeline_dir), "test-dataset", mock_spec - ) - - assert len(result) == 1 - assert result[0]["entity"] == "1000001" - mock_lookups.load_csv.assert_called_once() - mock_lookups.add_entry.assert_called_once() - mock_lookups.save_csv.assert_called_once() - - -def test_assign_entity_numbers_updates_existing_file(monkeypatch, tmp_path): - """Test updating existing lookup file""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - lookup_file = pipeline_dir / "lookup.csv" - lookup_file.write_text("prefix,resource,organisation,reference,entity\n") - - new_lookups = [ - { - "prefix": "p1", - "organisation": "org1", - "reference": "REF001", - "resource": "res1", - } - ] - - mock_lookups = MagicMock() - mock_lookups.lookups_path = str(lookup_file) - mock_lookups.get_max_entity.return_value = 1000005 - mock_lookups.save_csv.return_value = [ - { - "prefix": "p1", - "organisation": "org1", - "reference": "REF001", - "entity": "1000006", - "resource": "res1", - } - ] - mock_lookups.entity_num_gen = MagicMock() - mock_lookups.entity_num_gen.state = {} - - mock_spec = MagicMock() - mock_spec.get_dataset_entity_min.return_value = 1000000 - mock_spec.get_dataset_entity_max.return_value = 9999999 - - monkeypatch.setattr("src.application.core.pipeline.Lookups", lambda x: mock_lookups) - - result = _assign_entity_numbers( - new_lookups, str(pipeline_dir), "test-dataset", mock_spec ) - assert len(result) == 1 - assert mock_lookups.entity_num_gen.state["current"] == 1000005 + assert "new-in-resource" in result + assert result["new-in-resource"] == 0 def test_get_entities_breakdown_success(): @@ -598,507 +282,3 @@ def test_get_existing_entities_breakdown_filters_empty_values(): assert len(result) == 2 assert result[0]["entity"] == "1000001" assert result[1]["entity"] == "1000004" - - -def test_validate_endpoint_creates_file(monkeypatch, tmp_path): - """Test that _validate_endpoint creates endpoint.csv if it doesn't exist""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - url = "http://example.com/endpoint" - endpoint_csv_path = pipeline_dir / "endpoint.csv" - - def fake_append_endpoint( - endpoint_csv_path, endpoint_url, entry_date, start_date, end_date - ): - return "endpoint_hash", { - "endpoint": "endpoint_hash", - "endpoint-url": endpoint_url, - "parameters": "", - "plugin": "", - "entry-date": entry_date, - "start-date": start_date, - "end-date": end_date, - } - - monkeypatch.setattr( - "src.application.core.pipeline.append_endpoint", fake_append_endpoint - ) - - assert not endpoint_csv_path.exists() - - _validate_endpoint(url, str(pipeline_dir)) - - assert endpoint_csv_path.exists() - - with open(endpoint_csv_path, "r", encoding="utf-8") as f: - reader = csv.reader(f) - headers = next(reader) - assert headers == [ - "endpoint", - "endpoint-url", - "parameters", - "plugin", - "entry-date", - "start-date", - "end-date", - ] - - -def test_validate_endpoint_appends(monkeypatch, tmp_path): - """Test that _validate_endpoint appends new endpoint when URL not found""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - url = "http://example.com/new-endpoint" - endpoint_csv_path = pipeline_dir / "endpoint.csv" - - with open(endpoint_csv_path, "w", newline="", encoding="utf-8") as f: - writer = csv.DictWriter( - f, - fieldnames=[ - "endpoint", - "endpoint-url", - "parameters", - "plugin", - "entry-date", - "start-date", - "end-date", - ], - ) - writer.writeheader() - writer.writerow( - { - "endpoint": "existing_hash", - "endpoint-url": "http://example.com/existing", - "parameters": "", - "plugin": "", - "entry-date": "2024-01-01T00:00:00", - "start-date": "2024-01-01", - "end-date": "", - } - ) - - def fake_append_endpoint( - endpoint_csv_path, endpoint_url, entry_date, start_date, end_date - ): - return "new_endpoint_hash", { - "endpoint": "new_endpoint_hash", - "endpoint-url": endpoint_url, - "parameters": "", - "plugin": "", - "entry-date": entry_date, - "start-date": start_date, - "end-date": end_date, - } - - monkeypatch.setattr( - "src.application.core.pipeline.append_endpoint", fake_append_endpoint - ) - - result = _validate_endpoint(url, str(pipeline_dir)) - - assert result["endpoint_url_in_endpoint_csv"] is False - assert "new_endpoint_entry" in result - assert result["new_endpoint_entry"]["endpoint"] == "new_endpoint_hash" - assert result["new_endpoint_entry"]["endpoint-url"] == url - - -def test_validate_endpoint_finds_existing(monkeypatch, tmp_path): - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - url = "http://example.com/endpoint" - endpoint_csv_path = pipeline_dir / "endpoint.csv" - with open(endpoint_csv_path, "w", newline="", encoding="utf-8") as f: - writer = csv.DictWriter( - f, - fieldnames=[ - "endpoint", - "endpoint-url", - "parameters", - "plugin", - "entry-date", - "start-date", - "end-date", - ], - ) - writer.writeheader() - writer.writerow( - { - "endpoint": "endpoint_hash", - "endpoint-url": url, - "parameters": "", - "plugin": "", - "entry-date": "2024-01-01", - "start-date": "2024-01-01", - "end-date": "", - } - ) - - monkeypatch.setattr( - "src.application.core.pipeline.append_endpoint", - lambda *a, **kw: (_ for _ in ()).throw(Exception("Should not be called")), - ) - - result = _validate_endpoint(url, str(pipeline_dir)) - assert result["endpoint_url_in_endpoint_csv"] is True - assert "existing_endpoint_entry" in result - assert result["existing_endpoint_entry"]["endpoint-url"] == url - - -def test_validate_endpoint_empty_url(monkeypatch, tmp_path): - """Test _validate_endpoint with empty URL""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - - result = _validate_endpoint("", str(pipeline_dir)) - - assert result == {} - - -def test_validate_endpoint_csv_read_error(monkeypatch, tmp_path): - """Test _validate_endpoint when reading CSV fails""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - url = "http://example.com/endpoint" - - endpoint_csv_path = pipeline_dir / "endpoint.csv" - endpoint_csv_path.write_bytes(b"\x00\x00\x00") - - def fake_append_endpoint( - endpoint_csv_path, endpoint_url, entry_date, start_date, end_date - ): - return "endpoint_hash", { - "endpoint": "endpoint_hash", - "endpoint-url": endpoint_url, - "parameters": "", - "plugin": "", - "entry-date": entry_date, - "start-date": start_date, - "end-date": end_date, - } - - monkeypatch.setattr( - "src.application.core.pipeline.append_endpoint", fake_append_endpoint - ) - - result = _validate_endpoint(url, str(pipeline_dir)) - - assert "new_endpoint_entry" in result or "endpoint_url_in_endpoint_csv" in result - - -def test_validate_source_creates_new_source(monkeypatch, tmp_path): - """Test _validate_source creates new source entry when it doesn't exist""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - source_csv_path = pipeline_dir / "source.csv" - - documentation_url = "http://example.com/doc" - collection = "test-collection" - organisation = "test-org" - dataset = "test-dataset" - - endpoint_summary = { - "endpoint_url_in_endpoint_csv": True, - "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, - } - - def fake_append_source( - source_csv_path, - collection, - organisation, - endpoint_key, - attribution, - documentation_url, - licence, - pipelines, - entry_date, - start_date, - end_date, - ): - return "source_hash_456", { - "source": "source_hash_456", - "attribution": attribution, - "collection": collection, - "documentation-url": documentation_url, - "endpoint": endpoint_key, - "licence": licence, - "organisation": organisation, - "pipelines": pipelines, - "entry-date": entry_date, - "start-date": start_date, - "end-date": end_date, - } - - monkeypatch.setattr( - "src.application.core.pipeline.append_source", fake_append_source - ) - - result = _validate_source( - documentation_url, - str(pipeline_dir), - collection, - organisation, - dataset, - endpoint_summary, - ) - - assert result["documentation_url_in_source_csv"] is False - assert "new_source_entry" in result - assert result["new_source_entry"]["source"] == "source_hash_456" - assert result["new_source_entry"]["collection"] == collection - assert result["new_source_entry"]["organisation"] == organisation - assert result["new_source_entry"]["pipelines"] == dataset - - -def test_validate_source_finds_existing_source(monkeypatch, tmp_path): - """Test _validate_source finds existing source entry""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - source_csv_path = pipeline_dir / "source.csv" - - documentation_url = "http://example.com/doc" - collection = "test-collection" - organisation = "test-org" - dataset = "test-dataset" - - endpoint_summary = { - "endpoint_url_in_endpoint_csv": True, - "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, - } - - with open(source_csv_path, "w", newline="", encoding="utf-8") as f: - writer = csv.DictWriter( - f, - fieldnames=[ - "source", - "attribution", - "collection", - "documentation-url", - "endpoint", - "licence", - "organisation", - "pipelines", - "entry-date", - "start-date", - "end-date", - ], - ) - writer.writeheader() - writer.writerow( - { - "source": "existing_source_hash", - "attribution": "", - "collection": collection, - "documentation-url": documentation_url, - "endpoint": "endpoint_hash_123", - "licence": "", - "organisation": organisation, - "pipelines": dataset, - "entry-date": "2024-01-01T00:00:00", - "start-date": "2024-01-01", - "end-date": "", - } - ) - - def fake_append_source(*args, **kwargs): - return "existing_source_hash", None - - monkeypatch.setattr( - "src.application.core.pipeline.append_source", fake_append_source - ) - - result = _validate_source( - documentation_url, - str(pipeline_dir), - collection, - organisation, - dataset, - endpoint_summary, - ) - - assert result["documentation_url_in_source_csv"] is True - assert "existing_source_entry" in result - assert result["existing_source_entry"]["source"] == "existing_source_hash" - assert result["existing_source_entry"]["collection"] == collection - - -def test_validate_source_no_endpoint_key(tmp_path): - """Test _validate_source returns empty dict when no endpoint key available""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - - documentation_url = "http://example.com/doc" - collection = "test-collection" - organisation = "test-org" - dataset = "test-dataset" - - endpoint_summary = {} - - result = _validate_source( - documentation_url, - str(pipeline_dir), - collection, - organisation, - dataset, - endpoint_summary, - ) - - assert result == {} - - -def test_validate_source_empty_documentation_url(monkeypatch, tmp_path): - """Test _validate_source handles empty documentation URL""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - - documentation_url = "" - collection = "test-collection" - organisation = "test-org" - dataset = "test-dataset" - - endpoint_summary = { - "endpoint_url_in_endpoint_csv": True, - "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, - } - - def fake_append_source( - source_csv_path, - collection, - organisation, - endpoint_key, - attribution, - documentation_url, - licence, - pipelines, - entry_date, - start_date, - end_date, - ): - return "source_hash_456", { - "source": "source_hash_456", - "attribution": attribution, - "collection": collection, - "documentation-url": documentation_url, - "endpoint": endpoint_key, - "licence": licence, - "organisation": organisation, - "pipelines": pipelines, - "entry-date": entry_date, - "start-date": start_date, - "end-date": end_date, - } - - monkeypatch.setattr( - "src.application.core.pipeline.append_source", fake_append_source - ) - - result = _validate_source( - documentation_url, - str(pipeline_dir), - collection, - organisation, - dataset, - endpoint_summary, - ) - - assert "documentation_url_in_source_csv" in result - assert result["new_source_entry"]["documentation-url"] == "" - - -def test_validate_source_uses_new_endpoint_entry(monkeypatch, tmp_path): - """Test _validate_source uses endpoint from new_endpoint_entry when available""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - - documentation_url = "http://example.com/doc" - collection = "test-collection" - organisation = "test-org" - dataset = "test-dataset" - - endpoint_summary = { - "endpoint_url_in_endpoint_csv": False, - "new_endpoint_entry": {"endpoint": "new_endpoint_hash_789"}, - } - - captured_endpoint_key = None - - def fake_append_source( - source_csv_path, - collection, - organisation, - endpoint_key, - attribution, - documentation_url, - licence, - pipelines, - entry_date, - start_date, - end_date, - ): - nonlocal captured_endpoint_key - captured_endpoint_key = endpoint_key - return "source_hash_456", { - "source": "source_hash_456", - "attribution": attribution, - "collection": collection, - "documentation-url": documentation_url, - "endpoint": endpoint_key, - "licence": licence, - "organisation": organisation, - "pipelines": pipelines, - "entry-date": entry_date, - "start-date": start_date, - "end-date": end_date, - } - - monkeypatch.setattr( - "src.application.core.pipeline.append_source", fake_append_source - ) - - result = _validate_source( - documentation_url, - str(pipeline_dir), - collection, - organisation, - dataset, - endpoint_summary, - ) - - assert captured_endpoint_key == "new_endpoint_hash_789" - assert result["new_source_entry"]["endpoint"] == "new_endpoint_hash_789" - - -def test_validate_source_handles_csv_read_error(monkeypatch, tmp_path): - """Test _validate_source handles CSV read errors gracefully""" - pipeline_dir = tmp_path / "pipeline" - pipeline_dir.mkdir() - source_csv_path = pipeline_dir / "source.csv" - - documentation_url = "http://example.com/doc" - collection = "test-collection" - organisation = "test-org" - dataset = "test-dataset" - - endpoint_summary = { - "endpoint_url_in_endpoint_csv": True, - "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, - } - - source_csv_path.write_bytes(b"\x00\x00\x00") - - def fake_append_source(*args, **kwargs): - return "existing_source_hash", None - - monkeypatch.setattr( - "src.application.core.pipeline.append_source", fake_append_source - ) - - result = _validate_source( - documentation_url, - str(pipeline_dir), - collection, - organisation, - dataset, - endpoint_summary, - ) - - assert "documentation_url_in_source_csv" in result diff --git a/request-processor/tests/unit/src/application/core/test_utils.py b/request-processor/tests/unit/src/application/core/test_utils.py index 2b38619b..5be3a0b3 100644 --- a/request-processor/tests/unit/src/application/core/test_utils.py +++ b/request-processor/tests/unit/src/application/core/test_utils.py @@ -1,4 +1,9 @@ -from src.application.core.utils import get_request, check_content +from src.application.core.utils import ( + get_request, + check_content, + validate_endpoint, + validate_source, +) import requests_mock import os import csv @@ -192,3 +197,501 @@ def test_append_source_existing(tmp_path): reader = list(csv.DictReader(f)) assert len(reader) == 1 assert reader[0]["source"] == source_key + + +def test_validate_endpoint_creates_file(monkeypatch, tmp_path): + """Test that validate_endpoint creates endpoint.csv if it doesn't exist""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + url = "http://example.com/endpoint" + endpoint_csv_path = pipeline_dir / "endpoint.csv" + + def fake_append_endpoint( + endpoint_csv_path, endpoint_url, entry_date, start_date, end_date, plugin=None + ): + return "endpoint_hash", { + "endpoint": "endpoint_hash", + "endpoint-url": endpoint_url, + "parameters": "", + "plugin": plugin or "", + "entry-date": entry_date, + "start-date": start_date, + "end-date": end_date, + } + + monkeypatch.setattr( + "src.application.core.utils.append_endpoint", fake_append_endpoint + ) + + assert not endpoint_csv_path.exists() + + validate_endpoint(url, str(pipeline_dir), plugin=None) + + assert endpoint_csv_path.exists() + + with open(endpoint_csv_path, "r", encoding="utf-8") as f: + reader = csv.reader(f) + headers = next(reader) + assert headers == [ + "endpoint", + "endpoint-url", + "parameters", + "plugin", + "entry-date", + "start-date", + "end-date", + ] + + +def test_validate_endpoint_appends(monkeypatch, tmp_path): + """Test that validate_endpoint appends new endpoint when URL not found""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + url = "http://example.com/new-endpoint" + endpoint_csv_path = pipeline_dir / "endpoint.csv" + + with open(endpoint_csv_path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, + fieldnames=[ + "endpoint", + "endpoint-url", + "parameters", + "plugin", + "entry-date", + "start-date", + "end-date", + ], + ) + writer.writeheader() + writer.writerow( + { + "endpoint": "existing_hash", + "endpoint-url": "http://example.com/existing", + "parameters": "", + "plugin": "", + "entry-date": "2024-01-01T00:00:00", + "start-date": "2024-01-01", + "end-date": "", + } + ) + + def fake_append_endpoint( + endpoint_csv_path, endpoint_url, entry_date, start_date, end_date, plugin=None + ): + return "new_endpoint_hash", { + "endpoint": "new_endpoint_hash", + "endpoint-url": endpoint_url, + "parameters": "", + "plugin": plugin or "", + "entry-date": entry_date, + "start-date": start_date, + "end-date": end_date, + } + + monkeypatch.setattr( + "src.application.core.utils.append_endpoint", fake_append_endpoint + ) + + result = validate_endpoint(url, str(pipeline_dir), plugin=None) + + assert result["endpoint_url_in_endpoint_csv"] is False + assert "new_endpoint_entry" in result + assert result["new_endpoint_entry"]["endpoint"] == "new_endpoint_hash" + assert result["new_endpoint_entry"]["endpoint-url"] == url + + +def test_validate_endpoint_finds_existing(monkeypatch, tmp_path): + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + url = "http://example.com/endpoint" + endpoint_csv_path = pipeline_dir / "endpoint.csv" + with open(endpoint_csv_path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, + fieldnames=[ + "endpoint", + "endpoint-url", + "parameters", + "plugin", + "entry-date", + "start-date", + "end-date", + ], + ) + writer.writeheader() + writer.writerow( + { + "endpoint": "endpoint_hash", + "endpoint-url": url, + "parameters": "", + "plugin": "", + "entry-date": "2024-01-01", + "start-date": "2024-01-01", + "end-date": "", + } + ) + + monkeypatch.setattr( + "src.application.core.utils.append_endpoint", + lambda *a, **kw: (_ for _ in ()).throw(Exception("Should not be called")), + ) + + result = validate_endpoint(url, str(pipeline_dir), plugin=None) + assert result["endpoint_url_in_endpoint_csv"] is True + assert "existing_endpoint_entry" in result + assert result["existing_endpoint_entry"]["endpoint-url"] == url + + +def test_validate_endpoint_empty_url(monkeypatch, tmp_path): + """Test validate_endpoint with empty URL""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + + result = validate_endpoint("", str(pipeline_dir), plugin=None) + + assert result == {} + + +def test_validate_endpoint_csv_read_error(monkeypatch, tmp_path): + """Test validate_endpoint when reading CSV fails""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + url = "http://example.com/endpoint" + + endpoint_csv_path = pipeline_dir / "endpoint.csv" + endpoint_csv_path.write_bytes(b"\x00\x00\x00") + + def fake_append_endpoint( + endpoint_csv_path, endpoint_url, entry_date, start_date, end_date, plugin=None + ): + return "endpoint_hash", { + "endpoint": "endpoint_hash", + "endpoint-url": endpoint_url, + "parameters": "", + "plugin": plugin or "", + "entry-date": entry_date, + "start-date": start_date, + "end-date": end_date, + } + + +def test_validate_source_creates_new_source(monkeypatch, tmp_path): + """Test validate_source creates new source entry when it doesn't exist""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + source_csv_path = pipeline_dir / "source.csv" + + documentation_url = "http://example.com/doc" + collection = "test-collection" + organisation = "test-org" + dataset = "test-dataset" + + endpoint_summary = { + "endpoint_url_in_endpoint_csv": True, + "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, + } + + def fake_append_source( + source_csv_path, + collection, + organisation, + endpoint_key, + attribution, + documentation_url, + licence, + pipelines, + entry_date, + start_date, + end_date, + ): + return "source_hash_456", { + "source": "source_hash_456", + "attribution": attribution, + "collection": collection, + "documentation-url": documentation_url, + "endpoint": endpoint_key, + "licence": licence, + "organisation": organisation, + "pipelines": pipelines, + "entry-date": entry_date, + "start-date": start_date, + "end-date": end_date, + } + + monkeypatch.setattr("src.application.core.utils.append_source", fake_append_source) + + result = validate_source( + documentation_url, + str(pipeline_dir), + collection, + organisation, + dataset, + endpoint_summary, + start_date=None, + licence=None, + ) + + assert result["documentation_url_in_source_csv"] is False + assert "new_source_entry" in result + assert result["new_source_entry"]["source"] == "source_hash_456" + assert result["new_source_entry"]["collection"] == collection + assert result["new_source_entry"]["organisation"] == organisation + assert result["new_source_entry"]["pipelines"] == dataset + + +def test_validate_source_finds_existing_source(monkeypatch, tmp_path): + """Test validate_source finds existing source entry""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + source_csv_path = pipeline_dir / "source.csv" + + documentation_url = "http://example.com/doc" + collection = "test-collection" + organisation = "test-org" + dataset = "test-dataset" + + endpoint_summary = { + "endpoint_url_in_endpoint_csv": True, + "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, + } + + with open(source_csv_path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, + fieldnames=[ + "source", + "attribution", + "collection", + "documentation-url", + "endpoint", + "licence", + "organisation", + "pipelines", + "entry-date", + "start-date", + "end-date", + ], + ) + writer.writeheader() + writer.writerow( + { + "source": "existing_source_hash", + "attribution": "", + "collection": collection, + "documentation-url": documentation_url, + "endpoint": "endpoint_hash_123", + "licence": "", + "organisation": organisation, + "pipelines": dataset, + "entry-date": "2024-01-01T00:00:00", + "start-date": "2024-01-01", + "end-date": "", + } + ) + + def fake_append_source(*args, **kwargs): + return "existing_source_hash", None + + monkeypatch.setattr("src.application.core.utils.append_source", fake_append_source) + + result = validate_source( + documentation_url, + str(pipeline_dir), + collection, + organisation, + dataset, + endpoint_summary, + start_date=None, + licence=None, + ) + + assert result["documentation_url_in_source_csv"] is True + assert "existing_source_entry" in result + assert result["existing_source_entry"]["source"] == "existing_source_hash" + assert result["existing_source_entry"]["collection"] == collection + + +def test_validate_source_no_endpoint_key(tmp_path): + """Test validate_source returns empty dict when no endpoint key available""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + + documentation_url = "http://example.com/doc" + collection = "test-collection" + organisation = "test-org" + dataset = "test-dataset" + + endpoint_summary = {} + + result = validate_source( + documentation_url, + str(pipeline_dir), + collection, + organisation, + dataset, + endpoint_summary, + start_date=None, + licence=None, + ) + + assert result == {} + + +def test_validate_source_empty_documentation_url(monkeypatch, tmp_path): + """Test validate_source handles empty documentation URL""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + + documentation_url = "" + collection = "test-collection" + organisation = "test-org" + dataset = "test-dataset" + + endpoint_summary = { + "endpoint_url_in_endpoint_csv": True, + "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, + } + + def fake_append_source( + source_csv_path, + collection, + organisation, + endpoint_key, + attribution, + documentation_url, + licence, + pipelines, + entry_date, + start_date, + end_date, + ): + return "source_hash_456", { + "source": "source_hash_456", + "attribution": attribution, + "collection": collection, + "documentation-url": documentation_url, + "endpoint": endpoint_key, + "licence": licence, + "organisation": organisation, + "pipelines": pipelines, + "entry-date": entry_date, + "start-date": start_date, + "end-date": end_date, + } + + monkeypatch.setattr("src.application.core.utils.append_source", fake_append_source) + + result = validate_source( + documentation_url, + str(pipeline_dir), + collection, + organisation, + dataset, + endpoint_summary, + start_date=None, + licence=None, + ) + + assert "documentation_url_in_source_csv" in result + assert result["new_source_entry"]["documentation-url"] == "" + + +def test_validate_source_uses_new_endpoint_entry(monkeypatch, tmp_path): + """Test validate_source uses endpoint from new_endpoint_entry when available""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + + documentation_url = "http://example.com/doc" + collection = "test-collection" + organisation = "test-org" + dataset = "test-dataset" + + endpoint_summary = { + "endpoint_url_in_endpoint_csv": False, + "new_endpoint_entry": {"endpoint": "new_endpoint_hash_789"}, + } + + captured_endpoint_key = None + + def fake_append_source( + source_csv_path, + collection, + organisation, + endpoint_key, + attribution, + documentation_url, + licence, + pipelines, + entry_date, + start_date, + end_date, + ): + nonlocal captured_endpoint_key + captured_endpoint_key = endpoint_key + return "source_hash_456", { + "source": "source_hash_456", + "attribution": attribution, + "collection": collection, + "documentation-url": documentation_url, + "endpoint": endpoint_key, + "licence": licence, + "organisation": organisation, + "pipelines": pipelines, + "entry-date": entry_date, + "start-date": start_date, + "end-date": end_date, + } + + monkeypatch.setattr("src.application.core.utils.append_source", fake_append_source) + + result = validate_source( + documentation_url, + str(pipeline_dir), + collection, + organisation, + dataset, + endpoint_summary, + start_date=None, + licence=None, + ) + + assert captured_endpoint_key == "new_endpoint_hash_789" + assert result["new_source_entry"]["endpoint"] == "new_endpoint_hash_789" + + +def test_validate_source_handles_csv_read_error(monkeypatch, tmp_path): + """Test validate_source handles CSV read errors gracefully""" + pipeline_dir = tmp_path / "pipeline" + pipeline_dir.mkdir() + source_csv_path = pipeline_dir / "source.csv" + + documentation_url = "http://example.com/doc" + collection = "test-collection" + organisation = "test-org" + dataset = "test-dataset" + + endpoint_summary = { + "endpoint_url_in_endpoint_csv": True, + "existing_endpoint_entry": {"endpoint": "endpoint_hash_123"}, + } + + source_csv_path.write_bytes(b"\x00\x00\x00") + + def fake_append_source(*args, **kwargs): + return "existing_source_hash", None + + monkeypatch.setattr("src.application.core.utils.append_source", fake_append_source) + + result = validate_source( + documentation_url, + str(pipeline_dir), + collection, + organisation, + dataset, + endpoint_summary, + start_date=None, + licence=None, + ) + + assert "documentation_url_in_source_csv" in result diff --git a/request-processor/tests/unit/src/application/core/test_workflow.py b/request-processor/tests/unit/src/application/core/test_workflow.py index 9fdb4022..124ae6f0 100644 --- a/request-processor/tests/unit/src/application/core/test_workflow.py +++ b/request-processor/tests/unit/src/application/core/test_workflow.py @@ -5,7 +5,8 @@ csv_to_json, fetch_pipeline_csvs, add_data_workflow, - fetch_add_data_csvs, + fetch_add_data_pipeline_csvs, + fetch_add_data_collection_csvs, ) import csv import os @@ -321,23 +322,41 @@ def test_add_data_workflow(monkeypatch): class DummyDirectories: PIPELINE_DIR = "/tmp/pipeline" COLLECTION_DIR = "/tmp/collection" + TRANSFORMED_DIR = "/tmp/transformed" SPECIFICATION_DIR = "/tmp/specification" CACHE_DIR = "/tmp/cache" directories = DummyDirectories() - expected_response = {"status": "success", "data": "test"} + pipeline_response = {"status": "success", "data": "test"} + expected_response = { + "pipeline-summary": pipeline_response, + "endpoint-summary": {"endpoint_summary": "mocked"}, + "source-summary": {"source_summary": "mocked"}, + } monkeypatch.setattr( "src.application.core.workflow.resource_from_path", lambda path: "resource-hash" ) monkeypatch.setattr( - "src.application.core.workflow.fetch_add_data_csvs", - lambda col, pdir: ["/tmp/pipeline/lookup.csv"], + "src.application.core.workflow.fetch_add_data_pipeline_csvs", + lambda col, pdir: True, + ) + monkeypatch.setattr( + "src.application.core.workflow.fetch_add_data_collection_csvs", + lambda col, cdir: True, ) monkeypatch.setattr( "src.application.core.workflow.fetch_add_data_response", - lambda *args, **kwargs: expected_response, + lambda *args, **kwargs: pipeline_response, + ) + monkeypatch.setattr( + "src.application.core.workflow.validate_endpoint", + lambda *args, **kwargs: {"endpoint_summary": "mocked"}, + ) + monkeypatch.setattr( + "src.application.core.workflow.validate_source", + lambda *args, **kwargs: {"source_summary": "mocked"}, ) result = add_data_workflow( @@ -366,6 +385,7 @@ def test_add_data_workflow_calls(monkeypatch): class DummyDirectories: PIPELINE_DIR = "/tmp/pipeline" COLLECTION_DIR = "/tmp/collection" + TRANSFORMED_DIR = "/tmp/transformed" SPECIFICATION_DIR = "/tmp/specification" CACHE_DIR = "/tmp/cache" @@ -373,35 +393,43 @@ class DummyDirectories: called = {} - def fake_resource_from_path(path): - called["resource_from_path"] = path - return "resource-hash" + def fake_fetch_add_data_pipeline_csvs(col, pdir): + called["fetch_add_data_pipeline_csvs"] = (col, pdir) + return True - def fake_fetch_add_data_csvs(col, pdir): - called["fetch_add_data_csvs"] = (col, pdir) - return ["/tmp/pipeline/lookup.csv"] + def fake_fetch_add_data_collection_csvs(col, cdir): + called["fetch_add_data_collection_csvs"] = (col, cdir) + return True def fake_fetch_add_data_response( - col, ds, org, pdir, ipath, spec_dir, cache_dir, e_url, d_url + dataset, + organisation_provider, + pipeline_dir, + input_dir, + output_path, + specification_dir, + cache_dir, + url, ): called["fetch_add_data_response"] = { - "collection": col, - "dataset": ds, - "organisation": org, - "pipeline_dir": pdir, - "input_path": ipath, - "specification_dir": spec_dir, + "dataset": dataset, + "organisation": organisation_provider, + "pipeline_dir": pipeline_dir, + "input_dir": input_dir, + "output_path": output_path, + "specification_dir": specification_dir, "cache_dir": cache_dir, - "url": e_url, - "documentation_url": d_url, + "url": url, } return {"result": "ok"} monkeypatch.setattr( - "src.application.core.workflow.resource_from_path", fake_resource_from_path + "src.application.core.workflow.fetch_add_data_pipeline_csvs", + fake_fetch_add_data_pipeline_csvs, ) monkeypatch.setattr( - "src.application.core.workflow.fetch_add_data_csvs", fake_fetch_add_data_csvs + "src.application.core.workflow.fetch_add_data_collection_csvs", + fake_fetch_add_data_collection_csvs, ) monkeypatch.setattr( "src.application.core.workflow.fetch_add_data_response", @@ -422,27 +450,30 @@ def fake_fetch_add_data_response( expected_pipeline_dir = os.path.join( directories.PIPELINE_DIR, collection, request_id ) - expected_input_path = os.path.join( + expected_input_dir = os.path.join( directories.COLLECTION_DIR, "resource", request_id ) - expected_file_path = os.path.join(expected_input_path, file_name) + expected_output_path = os.path.join( + directories.TRANSFORMED_DIR, request_id, file_name + ) - assert called["resource_from_path"] == expected_file_path - assert called["fetch_add_data_csvs"] == (collection, expected_pipeline_dir) + expected_config_dir = os.path.join(directories.COLLECTION_DIR, request_id) + assert called["fetch_add_data_pipeline_csvs"] == (collection, expected_pipeline_dir) + assert called["fetch_add_data_collection_csvs"] == (collection, expected_config_dir) assert called["fetch_add_data_response"]["dataset"] == dataset assert called["fetch_add_data_response"]["organisation"] == organisation assert called["fetch_add_data_response"]["pipeline_dir"] == expected_pipeline_dir - assert called["fetch_add_data_response"]["input_path"] == expected_input_path + assert called["fetch_add_data_response"]["input_dir"] == expected_input_dir + assert called["fetch_add_data_response"]["output_path"] == expected_output_path assert ( called["fetch_add_data_response"]["specification_dir"] == directories.SPECIFICATION_DIR ) assert called["fetch_add_data_response"]["cache_dir"] == directories.CACHE_DIR assert called["fetch_add_data_response"]["url"] == url - assert called["fetch_add_data_response"]["documentation_url"] == documentation_url -def test_fetch_add_data_csvs_from_url(monkeypatch, tmp_path): +def test_fetch_add_data_pipeline_csvs_from_url(monkeypatch, tmp_path): collection = "test-collection" pipeline_dir = tmp_path / "pipeline" pipeline_dir_str = str(pipeline_dir) @@ -460,14 +491,13 @@ def fake_urlretrieve(url, path): monkeypatch.setattr("urllib.request.urlretrieve", fake_urlretrieve) - files = fetch_add_data_csvs(collection, pipeline_dir_str) + fetch_add_data_pipeline_csvs(collection, pipeline_dir_str) assert os.path.exists(pipeline_dir_str) assert any("lookup.csv" in path for url, path in downloads) - assert files == [] -def test_fetch_add_data_csvs_handles_http_error(monkeypatch, tmp_path): +def test_fetch_add_data_pipeline_csvs_handles_http_error(monkeypatch, tmp_path): collection = "test-collection" pipeline_dir = tmp_path / "pipeline" pipeline_dir_str = str(pipeline_dir) @@ -480,7 +510,6 @@ def raise_http_error(url, path): monkeypatch.setattr("urllib.request.urlretrieve", raise_http_error) - files = fetch_add_data_csvs(collection, pipeline_dir_str) + fetch_add_data_pipeline_csvs(collection, pipeline_dir_str) assert os.path.exists(pipeline_dir_str) - assert files == [] diff --git a/scripts/debug_trigger_add_data.py b/scripts/debug_trigger_add_data.py new file mode 100644 index 00000000..b1b5459e --- /dev/null +++ b/scripts/debug_trigger_add_data.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python +""" +Manual debug trigger for add_data_task. + +Invokes the add_data task directly without Celery so breakpoints hit reliably in VS Code. +""" + +import os +import sys +import json +import datetime +from pathlib import Path + +# Set up paths +workspace_root = Path(__file__).parent.parent +request_processor_src = workspace_root / "request-processor" / "src" +request_processor_root = workspace_root / "request-processor" +request_model = workspace_root / "request_model" +task_interface = workspace_root / "task_interface" + +sys.path.insert(0, str(request_processor_src)) +sys.path.insert(0, str(request_model)) +sys.path.insert(0, str(task_interface)) + +import crud # noqa: E402 +import database # noqa: E402 +import request_model.models as request_models # noqa: E402 +from tasks import add_data_task # noqa: E402 + +# Change to request-processor so relative paths (e.g. specification/, var/) resolve correctly +os.chdir(request_processor_root) + + +def ensure_request_exists(request_payload: dict) -> None: + """Upsert a row into the request table so the processor can write responses.""" + + database_url = os.environ.get("DATABASE_URL") + if not database_url: + raise RuntimeError( + "DATABASE_URL is not set. Start the stack (or export DATABASE_URL) before running debug_trigger_add_data." + ) + + request_id = request_payload["id"] + db_session = database.session_maker() + with db_session() as session: + existing = crud.get_request(session, request_id) + if existing: + existing.status = request_payload.get("status", existing.status) + existing.type = request_payload.get("type", existing.type) + existing.params = request_payload.get("params", existing.params) + else: + session.add( + request_models.Request( + id=request_id, + status=request_payload.get("status", "PENDING"), + type=request_payload.get("type"), + params=request_payload.get("params"), + ) + ) + session.commit() + + +# Override directories to local workspace paths (avoids relying on /opt/* when not in Docker) +workspace_volume = workspace_root / "request-processor" / "docker_volume" +directories_override = { + "COLLECTION_DIR": str(workspace_volume / "collection"), + "ISSUE_DIR": str(workspace_volume / "issue"), + "COLUMN_FIELD_DIR": str(workspace_volume / "column-field"), + "TRANSFORMED_DIR": str(workspace_volume / "transformed"), + "CONVERTED_DIR": str(workspace_volume / "converted"), + "PIPELINE_DIR": str(workspace_volume / "pipeline"), + # Leave relative defaults for var/ and specification/ +} + +# Ensure base directories exist +for d in directories_override.values(): + Path(d).mkdir(parents=True, exist_ok=True) + +# Request payload - edit the params block if you need to test different datasets or URLs +now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) +request_payload = { + "id": "debug-add-data-001", + "type": "add_data", + "status": "PENDING", + "created": now, + "modified": now, + "response": None, + "params": { + "type": "add_data", + "organisationName": "Stockport Metropolitan Borough Council", + "organisation": "local-authority:SKP", + "dataset": "article-4-direction", + "collection": "article-4-direction", + "column_mapping": None, + "geom_type": None, + "url": "https://smbc-opendata.s3.eu-west-1.amazonaws.com/Article4/Article4_Dataset_Stockport.csv", + "documentation_url": "https://example.com/article-4-direction/documentation", + "licence": "ogl3", + "start_date": "2020-01-01", + "plugin": None, + }, +} + +print("=" * 80) +print("DEBUG TRIGGER: Invoking add_data_task directly") +print("=" * 80) +print(f"\nRequest ID: {request_payload['id']}") +print(f"Dataset: {request_payload['params']['dataset']}") +print(f"Collection: {request_payload['params']['collection']}") +print(f"URL: {request_payload['params']['url'][:80]}...") +print("\n" + "=" * 80) +print("Set a breakpoint inside add_data_workflow if you want to step through it.") +print("=" * 80 + "\n") + +try: + ensure_request_exists(request_payload) + result = add_data_task(request_payload, directories=json.dumps(directories_override)) + print("\n" + "=" * 80) + print("TASK COMPLETED SUCCESSFULLY") + print("=" * 80) + print(f"\nResult: {json.dumps(result, indent=2, default=str)}") +except Exception as e: + print("\n" + "=" * 80) + print("TASK FAILED WITH EXCEPTION") + print("=" * 80) + print(f"\nException Type: {type(e).__name__}") + print(f"Exception Message: {str(e)}") + import traceback + + traceback.print_exc() + sys.exit(1) diff --git a/scripts/debug_trigger.py b/scripts/debug_trigger_checkurl.py similarity index 94% rename from scripts/debug_trigger.py rename to scripts/debug_trigger_checkurl.py index 64f5edbd..07af1b0a 100644 --- a/scripts/debug_trigger.py +++ b/scripts/debug_trigger_checkurl.py @@ -42,7 +42,7 @@ def ensure_request_exists(request_payload: dict) -> None: database_url = os.environ.get("DATABASE_URL") if not database_url: raise RuntimeError( - "DATABASE_URL is not set. Start the stack (or export DATABASE_URL) before running debug_trigger." + "DATABASE_URL is not set. Start the stack (or export DATABASE_URL) before running debug_trigger_checkurl." ) request_id = request_payload["id"] @@ -96,11 +96,11 @@ def ensure_request_exists(request_payload: dict) -> None: "params": { "type": "check_url", "organisationName": "London Borough of Hackney", - "dataset": "article-4-direction", + "dataset": "article-4-direction-area", "collection": "article-4-direction", "column_mapping": None, "geom_type": None, - "url": "https://map2.hackney.gov.uk/geoserver/ows?service=WFS&version=2.0&request=GetFeature&outputFormat=application/json&SrsName=EPSG:4326&typeName=planning:lldc_hackney_conservation_area" + "url": "https://raw.githubusercontent.com/digital-land/PublishExamples/refs/heads/main/Article4Direction/Files/Article4DirectionArea/articel4directionareas-(wrongColName-NewRefs)" } }