diff --git a/.gitignore b/.gitignore
index 434e265..c093ccf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -84,3 +84,7 @@ docs/_build
 /local_test
 /script
 /.venv/
+
+# Metadata generation
+meta*/
+*.json
\ No newline at end of file
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 207bd1c..29bcebc 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -5,7 +5,10 @@ Changelog
 current
 --------------------
 * Add the creation module and create entry: They implement yaml based metadata creation, provide template feature to keep metadata creation DRY, provide functionality to setup the metadata structure & generate metadata from existing sources like datapackages and csv files, provide functionality to create the full datapackage.json and save it to file [(#127)](https://github.com/rl-institut/super-repo/pull/127)
-
+* Enhance the new creation module. The creator now offers a builder to integrate metadata creation into 3rd-party code and add metadata at runtime; the new cleaner module helps to create proper and valid metadata. [(#134)](https://github.com/rl-institut/super-repo/pull/134)
+* Add an api module to better integrate omi with the OEP. Users working with omi locally can now push/pull metadata to and from table resources that exist on the OEP [(#134)](https://github.com/rl-institut/super-repo/pull/134)
+* Fixed a bug that prevented users from using omi as a real CLI tool [(#134)](https://github.com/rl-institut/super-repo/pull/134)
+* Updated the OEP-API path used when retrieving metadata from a table available on the OEP, as the URL pattern has changed on the OEP side [(#134)](https://github.com/rl-institut/super-repo/pull/134)

 1.1.0 (2025-03-25)
 --------------------
diff --git a/docs/OMI-workflow.md b/docs/OMI-workflow.md
new file mode 100644
index 0000000..5f2b5a0
--- /dev/null
+++ b/docs/OMI-workflow.md
@@ -0,0 +1,338 @@
+# Local - remote metadata workflow using OMI and the OEP
+
+This document describes the core workflow we suggest for working with omi locally to create one or multiple oemetadata documents.
+
+The workflow shows how local metadata creation using OMI's YAML file structure can be combined with table resources available on the OEP. The YAML system describes datasets and resources and also provides the option to add recurring information to a template YAML. It is used locally on the user's PC and allows for structured metadata management for one or multiple datasets.
+To use the OEP as a remote metadata repository, OMI provides functionality to push or pull metadata to or from tables available on the OEP using the REST API.
+
+## Workflow
+
+The workflow is still not perfect and must be followed quite strictly, especially when working with local metadata files and tables on the OEP. Otherwise users may end up with a local version of the metadata and, at the same time, a table on the OEP where they can also create and edit metadata. If the local version contains less information than the remote version, pushing metadata from local to remote would overwrite the remote version. The same applies in reverse when importing metadata from the OEP to the local files.
+
+That said, the workflow we currently suggest distinguishes at least five initial states:
+
+1. The user does not yet know what the dataset will look like. Data is not yet available.
+2. The user already has a complete dataset of tabular data (CSV, Excel files) available locally.
+3. The user has already uploaded data to the OEP.
+4. The user has some data locally and some on the OEP.
+5. The user already has an OMI metadata workspace and wants to extend it.
+
+In case 1 the data must be obtained first, although users can already start creating metadata documents with omi and enhance them once data is available. It is worth mentioning that the data must conform to a relational database system; otherwise it cannot be uploaded to the OEP.
+
+Case 2 is a good starting point for using OMI. Users can use omi to create metadata files for all data files. OMI also helps with inspecting data and inferring metadata from data files. After that, the base set of metadata is available and users can go ahead and upload the data to the OEP. They can also refine the metadata by extending the information in the metadata YAML files.
+
+In case 3 the user should initialize a dataset using OMI and then add resources from the OEP. This creates a dataset skeleton and adds resource metadata files to the dataset; the metadata available on the OEP is imported in this step. Afterwards users can enhance the metadata in the YAML files and push the updated metadata back to all tables on the OEP. While working on metadata locally it is important to stop editing metadata on the OEP.
+
+In case 4 the user can initialize a new dataset from local files, which creates the YAML file structure for all available files. They can then infer metadata from the files to get a good minimal metadata set and add the table resources from the OEP to the existing dataset using OMI. Now both local and remote resources are available. The user can then upload the missing resources to the OEP or publish them, e.g. on Zenodo, to make them publicly available.
+
+In case 5 the user can simply add more resources, either from a file, from the OEP, or just by resource name with an empty metadata skeleton. Users who want to integrate omi into a data pipeline should use the builder omi provides. The YAML-based metadata system remains the baseline, as metadata creation requires manual/human input, but the builder lets users add more metadata at runtime (e.g. during a pipeline run). This enables dynamic metadata creation/enhancement and full integration into 3rd-party code.
+
+## OMI installation
+
+Currently the latest functionality is only available on GitHub in the `dev` branch. In general, omi is available on PyPI.
+
+Get the code from GitHub:
+
+```bash
+# navigate into the directory where you keep your GitHub repositories
+cd github
+
+git clone https://github.com/OpenEnergyPlatform/omi.git
+```
+
+Create a Python environment:
+
+```bash
+# navigate into your workspace directory
+cd omi-workspace
+
+# we recommend the tool uv here, but you can also use your local python and pip directly
+python3 -m venv .venv
+source .venv/bin/activate
+```
+
+Install the omi package.
+
+Option 1:
+
+```bash
+# from pypi
+pip install omi
+```
+
+Option 2:
+
+```bash
+# from the cloned github repo using a dev-mode installation
+pip install -e ../github/omi/
+```
+
+Make sure you use at least Python 3.10, otherwise the installation might fail. If you still encounter issues, create a [GitHub issue](https://github.com/OpenEnergyPlatform/omi/issues/new/choose).
+
+## OMI usage
+
+You can use OMI as a Python module to integrate its functionality into your codebase, or, if you just want to use OMI's features, opt for the CLI tool omi provides.
+
+The documentation on how to use omi in your codebase is still missing; until it lands, the sketch below shows one possible approach.
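+
+The following sketch shows one way the modules added in this change could be wired together in 3rd-party code. The dataset id, resource name, keywords, contributor and token below are placeholders; adapt them to your project.
+
+```python
+from omi.api.oep import update_oep_tables_from_dataset_metadata
+from omi.creation.assembler import assemble_metadata_dict
+from omi.creation.builder import MetadataBuilder
+
+# 1) Assemble the dataset metadata from the split YAML layout
+metadata = assemble_metadata_dict(base_dir="./metadata", dataset_id="my_dataset")
+
+# 2) Enrich it at runtime via the builder (all values are placeholders)
+final_metadata = (
+    MetadataBuilder(metadata)
+    .set_publication_date("2025-01-01")
+    .add_keywords(["photovoltaic", "open data"])
+    .resource("table_1")                        # select a resource by its name
+    .append_contributor({"title": "Jane Doe"})  # de-duplicated on 'title'
+    .done()                                     # back to the dataset-level builder
+    .build(validate_policy="validate")
+)
+
+# 3) Push the per-table metadata to the OEP (requires a valid API token)
+update_oep_tables_from_dataset_metadata(final_metadata, token="YOUR_OEP_TOKEN")
+```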
+ +In general you can use the omi modules for oemetadata: + +- validation +- open data license check +- infer metadata form files +- convert metadata from previous to latest version +- get the oemetadata spec JSON artifacts: schema, template, example +- Upload / download metadata form OEP tables +- Create metadata dataset +- Use the YAML based system to manage metadata locally in dataset with multiple resources and only define information in a template which is applied to all dataset resources +- Initialize or extend dataset metadata from frictionless datapackage json files, from directories containing data files, from oep tables or add resources with empty skeleton + +## The OMI-CLI offers easy access to its functionality + +The CLI entry point is: + +```bash +omi ... +``` + +The main groups/commands are: + +Try + +```bash +omi --help +``` + +* `omi assemble` – build OEMetadata JSON from YAML. +* `omi dataset|resources|from-json|oep-resource` – scaffold metadata. +* `omi push-oep-all` – push metadata for **all / selected** tables of a dataset. +* `omi push-oep-one` – push metadata for **one** specific table. + +All commands assume a **split layout** like: + +```text +metadata/ + datasets/ + my_dataset.dataset.yaml + my_dataset.template.yaml + resources/ + my_dataset/ + table_1.resource.yaml + table_2.resource.yaml +``` + +You can initialize this setup automatically. You’ll usually set `--base-dir ./metadata`. + +--- + +## 1. Assembling OEMetadata locally + +Build one OEMetadata JSON file from split YAML: + +```bash +omi assemble \ + --base-dir ./metadata \ + --dataset-id my_dataset \ + --output-file ./out/my_dataset.json +``` + +Optional if you use a metadata index: + +```bash +omi assemble \ + --base-dir ./metadata \ + --dataset-id my_dataset \ + --output-file ./out/my_dataset.json \ + --index-file ./metadata/metadata_index.yaml +``` + +--- + +## 2. Init / Scaffolding + +### 2.1 Create an empty dataset skeleton + +```bash +omi dataset ./metadata my_dataset \ + --oem-version OEMetadata-2.0 \ + --resource table_1 \ + --resource table_2 \ + --overwrite +``` + +Creates: + +* `datasets/my_dataset.dataset.yaml` +* `datasets/my_dataset.template.yaml` +* optional stub resource YAMLs for `table_1`, `table_2`. + +### 2.2 Create resource stubs from files + +```bash +omi resources ./metadata my_dataset path/to/data1.csv path/to/data2.csv \ + --oem-version OEMetadata-2.0 \ + --overwrite +``` + +Infers schemas for CSV etc. and creates: + +* `resources/my_dataset/data1.resource.yaml` +* `resources/my_dataset/data2.resource.yaml` + +### 2.3 Import from existing OEMetadata JSON + +```bash +omi from-json ./metadata my_dataset ./oem.json \ + --oem-version OEMetadata-2.0 \ + --collect-common +``` + +* Creates dataset + template skeleton. +* Generates resource YAMLs from `oem.json`. +* Optionally hoists common fields to the template. + +### 2.4 Import a single OEP table as a resource + +Fetch metadata from OEP and add it as resource YAML: + +```bash +omi oep-resource ./metadata my_dataset parameter_photovoltaik_openfield145 \ + +``` + +* If `datasets/my_dataset.dataset.yaml` does **not** exist, a skeleton is created. +* A resource YAML is written to `resources/my_dataset/.resource.yaml`. +* Top-level OEP dataset fields are ignored. + +If you **do not** want auto-creation of the dataset, use the `--no-create-dataset` option (depending on how you wired it; if you followed earlier code it’s there). + +--- + +## 3. Pushing metadata back to OEP + +### Token format + +Pass the **raw token** to the CLI (e.g. 
`123abc...`).
+The code builds the header `Authorization: Token <token>` internally.
+
+---
+
+### 3.1 Push metadata for **all** tables in a dataset
+
+```bash
+omi push-oep-all \
+  --base-dir ./metadata \
+  --dataset-id my_dataset \
+  --token YOUR_OEP_TOKEN
+```
+
+What it does:
+
+* Assembles full OEMetadata from split YAML.
+* For each `resource`:
+
+  * builds a per-table OEMetadata that includes:
+
+    * all dataset-level attributes,
+    * exactly that one resource in `resources`.
+  * sends it to `/api/v0/tables/<table>/meta/`.
+
+So the **OEP table name** must match `resource.name`.
+
+Restrict to specific tables:
+
+```bash
+omi push-oep-all \
+  --base-dir ./metadata \
+  --dataset-id my_dataset \
+  --token YOUR_OEP_TOKEN \
+  --only-table parameter_photovoltaik_openfield145 \
+  --only-table some_other_table
+```
+
+Use PUT instead of POST:
+
+```bash
+omi push-oep-all \
+  --base-dir ./metadata \
+  --dataset-id my_dataset \
+  --token YOUR_OEP_TOKEN \
+  --method PUT
+```
+
+---
+
+### 3.2 Push metadata for **one** specific table
+
+```bash
+omi push-oep-one \
+  --base-dir ./metadata \
+  --dataset-id my_dataset \
+  --table parameter_photovoltaik_openfield145 \
+  --token YOUR_OEP_TOKEN
+```
+
+What it does:
+
+* Assembles full OEMetadata from split YAML.
+* Finds the resource where `resource.name == "parameter_photovoltaik_openfield145"`.
+* Builds a per-table OEMetadata with:
+
+  * dataset-level attributes,
+  * only that resource.
+* Sends it to `/api/v0/tables/parameter_photovoltaik_openfield145/meta/`.
+
+You can again choose PUT:
+
+```bash
+omi push-oep-one \
+  --base-dir ./metadata \
+  --dataset-id my_dataset \
+  --table parameter_photovoltaik_openfield145 \
+  --token YOUR_OEP_TOKEN \
+  --method PUT
+```
+
+---
+
+## 4. Minimal workflow examples
+
+### A. Start from an OEP table, edit locally, push back
+
+1. **Import OEP table metadata into local layout**
+
+   ```bash
+   omi oep-resource ./metadata pv_bundle parameter_photovoltaik_openfield145
+   ```
+
+2. **Edit YAMLs**
+
+   * Edit `datasets/pv_bundle.dataset.yaml`.
+   * Edit `resources/pv_bundle/parameter_photovoltaik_openfield145.resource.yaml`.
+
+3. **Push back just that table**
+
+   ```bash
+   omi push-oep-one \
+     --base-dir ./metadata \
+     --dataset-id pv_bundle \
+     --table parameter_photovoltaik_openfield145 \
+     --token YOUR_OEP_TOKEN
+   ```
+
+---
+
+### B. Manage a dataset with many tables
+
+1. Create/maintain YAMLs for all resources under `resources/my_dataset/`.
+2. When ready, push all metadata to OEP:
+
+   ```bash
+   omi push-oep-all \
+     --base-dir ./metadata \
+     --dataset-id my_dataset \
+     --token YOUR_OEP_TOKEN
+   ```
+
+That's it – this should be enough to drive everything from the command line without digging into the code.
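+
+If you prefer to drive workflow A from Python rather than from the CLI, a minimal sketch using the helpers added in this change could look as follows. The dataset id, table name and token are the same placeholders as above.
+
+```python
+from omi.api.oep import (
+    import_oep_table_as_resource,
+    update_single_oep_table_from_dataset_metadata,
+)
+from omi.creation.assembler import assemble_metadata_dict
+
+# 1) Import the OEP table's metadata as a local resource YAML
+import_oep_table_as_resource(
+    base_dir="./metadata",
+    dataset_id="pv_bundle",
+    oep_table="parameter_photovoltaik_openfield145",
+)
+
+# ... edit the dataset/template/resource YAML files by hand ...
+
+# 2) Re-assemble the dataset metadata and push just this one table back
+metadata = assemble_metadata_dict(base_dir="./metadata", dataset_id="pv_bundle")
+update_single_oep_table_from_dataset_metadata(
+    metadata,
+    "parameter_photovoltaik_openfield145",
+    token="YOUR_OEP_TOKEN",
+)
+```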
diff --git a/pyproject.toml b/pyproject.toml index 585b49f..4e1ace7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,5 +79,5 @@ unfixable = ["UP007", "I001"] "D104", # Missing docstring in public package ] -[omi.scripts] +[tool.poetry.scripts] omi = "omi.cli:main" diff --git a/src/omi/api/__init__.py b/src/omi/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/omi/api/oep.py b/src/omi/api/oep.py new file mode 100644 index 0000000..2a06c6d --- /dev/null +++ b/src/omi/api/oep.py @@ -0,0 +1,294 @@ +"""Helpers for importing and pushing OEMetadata to the Open Energy Platform (OEP).""" + +from __future__ import annotations + +from copy import deepcopy +from pathlib import Path +from typing import TYPE_CHECKING, Any, Optional, Union + +from omi.base import ( + MetadataError, + get_metadata_from_oep_table, + get_metadata_version, + update_metadata_for_oep_table, +) +from omi.creation.init import ( + add_resource_from_oem_metadata, + init_dataset, +) + +if TYPE_CHECKING: + from collections.abc import Iterable + + +def import_oep_table_as_resource( + base_dir: Union[str, Path], + dataset_id: str, + oep_table: str, + *, + create_dataset_if_missing: bool = True, + overwrite_resource: bool = False, +) -> Path: + """ + Import the OEMetadata of an OEP table and attach it as a resource to a local OMI dataset. + + Behavior + -------- + - Fetches metadata from the OEP (GET /api/v0/tables/{table}/meta/). + - Ignores the top-level dataset in the returned JSON. + - Converts exactly one resource from ``resources[0]`` into a + ``resources//.resource.yaml`` file. + - If ``datasets/.dataset.yaml`` does not exist and + ``create_dataset_if_missing=True``, a new dataset skeleton is created + from the OEMetadata specification (with ``name=dataset_id``, etc.). + + Parameters + ---------- + base_dir : + Base directory for the split-files layout. + dataset_id : + Local dataset name in OMI (e.g. "pv_openfield145"). + oep_table : + Table name on the OEP (e.g. "parameter_photovoltaik_openfield145"). + create_dataset_if_missing : + Whether to create a dataset skeleton if it does not yet exist. + overwrite_resource : + Whether to overwrite an existing resource YAML with the same name. + + Returns + ------- + Path + Path to the created resource YAML file. + """ + base_dir = Path(base_dir) + dataset_yaml = base_dir / "datasets" / f"{dataset_id}.dataset.yaml" + + # 1) Fetch OEMetadata from OEP (raises MetadataError if empty) + oem = get_metadata_from_oep_table(oep_table) + + # 2) Read OEMetadata version from metaMetadata (e.g. OEMetadata-2.0.4 -> OEMetadata-2.0) + try: + oem_version = get_metadata_version(oem) + except MetadataError: + oem_version = "OEMetadata-2.0" + + # 3) Create dataset skeleton if requested and not yet present + if not dataset_yaml.exists() and create_dataset_if_missing: + init_dataset( + base_dir=base_dir, + dataset_id=dataset_id, + oem_version=oem_version, + resources=(), + overwrite=False, + ) + # Important: we do NOT call _update_dataset_yaml_from_top_level() + # so the OEP top-level dataset fields remain ignored. + + # 4) Derive a resource from the OEMetadata and store it as .resource.yaml + res_path = add_resource_from_oem_metadata( + base_dir=base_dir, + dataset_id=dataset_id, + oem=oem, + resource_index=0, + resource_name=None, # or explicitly e.g. 
oep_table + overwrite=overwrite_resource, + fill_missing_from_template=True, + ) + + print( # noqa: T201 + f"Imported OEP table '{oep_table}' as resource into dataset '{dataset_id}': {res_path}", + ) + return res_path + + +def _metadata_for_single_resource(metadata: dict, resource_index: int) -> dict: + """ + Return a new OEMetadata dict that contains exactly one resource (resources[resource_index]). + + but keeps all top-level dataset attributes. + + Parameters + ---------- + metadata : + Full OEMetadata mapping (dataset + multiple resources). + resource_index : + Index into ``metadata["resources"]``. + + Returns + ------- + dict + A new OEMetadata mapping with all top-level keys preserved and + exactly one entry in ``resources``. + + Raises + ------ + MetadataError + If no valid 'resources' list is present. + IndexError + If the resource_index is out of range. + """ + resources = metadata.get("resources") + if not isinstance(resources, list) or not resources: + msg = "Metadata must contain a non-empty 'resources' list." + raise MetadataError(msg) + + if resource_index < 0 or resource_index >= len(resources): + raise IndexError( + f"Resource index {resource_index} out of range for metadata.resources (len={len(resources)}).", + ) + + # Copy all top-level keys except 'resources' + base: dict[str, Any] = {k: deepcopy(v) for k, v in metadata.items() if k != "resources"} + + # Attach only the selected resource + base["resources"] = [deepcopy(resources[resource_index])] + return base + + +def update_oep_tables_from_dataset_metadata( + metadata: dict, + *, + token: str, + method: str = "POST", + timeout: int = 90, + only_tables: Optional[Iterable[str]] = None, +) -> dict[str, dict]: + """ + Update OEP table metadata for all resources in a dataset-level OEMetadata dict. + + For each resource in ``metadata["resources"]``: + + - A per-table OEMetadata dict is constructed that: + * keeps all dataset-level (top-level) attributes, and + * contains only that single resource in ``resources``. + - The OEP table name is taken from ``resource["name"]``. + - The per-table metadata is sent to the OEP meta API using the + ``update_metadata_for_oep_table`` helper. + + Parameters + ---------- + metadata : + Full OEMetadata mapping (dataset attributes + multiple resources). + token : + OEP user API token for authentication (raw token string; the + ``Authorization: Token `` header is constructed internally). + method : + HTTP method to use for the OEP meta API ("POST" or "PUT"). + timeout : + Request timeout in seconds. + only_tables : + Optional iterable of table names to restrict updates to. If provided, + only resources whose ``name`` is in this set are updated. + + Returns + ------- + Dict[str, dict] + Mapping from OEP table name to the parsed JSON response returned + by the OEP meta API for that table. + + Raises + ------ + MetadataError + If 'resources' is missing/invalid, or if a resource lacks a name. + """ + resources = metadata.get("resources") + if not isinstance(resources, list) or not resources: + msg = "Metadata must contain a non-empty 'resources' list." 
+ raise MetadataError(msg) + + restrict = set(only_tables) if only_tables is not None else None + results: dict[str, dict] = {} + + for idx, res in enumerate(resources): + if not isinstance(res, dict): + raise MetadataError(f"Resource at index {idx} is not a mapping.") + + table_name = (res.get("name") or "").strip() + if not table_name: + raise MetadataError(f"Resource at index {idx} is missing a 'name' field.") + + if restrict is not None and table_name not in restrict: + continue # skip this resource if it's not in the filter + + per_table_md = _metadata_for_single_resource(metadata, idx) + + resp = update_metadata_for_oep_table( + oep_table=table_name, + metadata=per_table_md, + token=token, + method=method, + timeout=timeout, + ) + results[table_name] = resp + + return results + + +def update_single_oep_table_from_dataset_metadata( + metadata: dict, + oep_table: str, + *, + token: str, + method: str = "POST", + timeout: int = 90, +) -> dict: + """ + Update the metadata for a single OEP table from a dataset-level OEMetadata dict. + + The table name is matched against the ``name`` field of the resources in + ``metadata["resources"]``. The payload sent to the OEP meta API contains: + + - all dataset-level attributes from ``metadata``, and + - exactly one resource (the one whose name matches ``oep_table``). + + Parameters + ---------- + metadata : + Full OEMetadata mapping (dataset + multiple resources). + oep_table : + Name of the OEP table to update (matched against resource.name). + token : + OEP user API token for authentication (raw token string; the + ``Authorization: Token `` header is constructed internally). + method : + HTTP method to use for the OEP meta API ("POST" or "PUT"). + timeout : + Request timeout in seconds. + + Returns + ------- + dict + Parsed JSON response from the OEP meta API. + + Raises + ------ + MetadataError + If no resource with the given name is found, or 'resources' is invalid. + """ + resources = metadata.get("resources") + if not isinstance(resources, list) or not resources: + msg = "Metadata must contain a non-empty 'resources' list." + raise MetadataError(msg) + + target_index: Optional[int] = None + for idx, res in enumerate(resources): + if not isinstance(res, dict): + continue + name = (res.get("name") or "").strip() + if name == oep_table: + target_index = idx + break + + if target_index is None: + raise MetadataError( + f"No resource with name '{oep_table}' found in metadata.resources.", + ) + + per_table_md = _metadata_for_single_resource(metadata, target_index) + return update_metadata_for_oep_table( + oep_table=oep_table, + metadata=per_table_md, + token=token, + method=method, + timeout=timeout, + ) diff --git a/src/omi/base.py b/src/omi/base.py index 5b7e4dc..1dbbf92 100644 --- a/src/omi/base.py +++ b/src/omi/base.py @@ -31,7 +31,7 @@ class MetadataSpecification: example: dict | None = None -def get_metadata_from_oep_table(oep_table: str, oep_schema: str = "model_draft") -> dict: +def get_metadata_from_oep_table(oep_table: str) -> dict: """ Get metadata from OEP table. 
@@ -39,20 +39,18 @@ def get_metadata_from_oep_table(oep_table: str, oep_schema: str = "model_draft") ---------- oep_table: str OEP table name - oep_schema: str - OEP schema name Returns ------- dict Metadata in OEMetadata format """ - response = requests.get(f"{OEP_URL}/api/v0/schema/{oep_schema}/tables/{oep_table}/meta/", timeout=90) + response = requests.get(f"{OEP_URL}/api/v0/tables/{oep_table}/meta/", timeout=90) if response.status_code != requests.codes.ok: - raise MetadataError(f"Could not retrieve metadata from OEP table '{oep_schema}.{oep_table}'.") + raise MetadataError(f"Could not retrieve metadata from OEP table '{oep_table}'.") metadata = response.json() if not metadata: - raise MetadataError(f"Metadata from '{oep_schema}.{oep_table}' is empty.") + raise MetadataError(f"Metadata from '{oep_table}' is empty.") return metadata @@ -175,4 +173,72 @@ def __get_metadata_specs_for_oep(metadata_version: str) -> MetadataSpecification return MetadataSpecification(**specs) +def update_metadata_for_oep_table( + oep_table: str, + metadata: dict, + *, + token: str, + method: str = "POST", + timeout: int = 90, +) -> dict: + """ + Update metadata for an OEP table via the /tables/{table}/meta/ endpoint. + + Parameters + ---------- + oep_table : str + OEP table name. + metadata : dict + OEMetadata dict to send to the API. + token : str + OEP user API token ("Token " style authentication). + method : str + HTTP method to use ("POST" or "PUT"), default "POST". + timeout : int + Request timeout in seconds, default 90. + + Returns + ------- + dict + Parsed JSON response from the server (or {"raw": } if not JSON). + + Raises + ------ + MetadataError + If the request failed (non-2xx status). + ValueError + If an unsupported HTTP method is requested. + """ + url = f"{OEP_URL}/api/v0/tables/{oep_table}/meta/" + + headers = { + "Authorization": f"Token {token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + + method = method.upper() + if method == "POST": + response = requests.post(url, headers=headers, json=metadata, timeout=timeout) + elif method == "PUT": + response = requests.put(url, headers=headers, json=metadata, timeout=timeout) + else: + raise ValueError(f"Unsupported HTTP method: {method!r} (use 'POST' or 'PUT').") + + if not response.ok: + msg = f"Could not update metadata for OEP table '{oep_table}'. 
Status {response.status_code}: {response.text}" + raise MetadataError(msg) + + try: + data = response.json() + except ValueError: + data = {"raw": response.text} + + print( # noqa: T201 + f"Updated metadata for {oep_table}: {response.status_code} {response.reason}", + ) + + return data + + METADATA_SPECIFICATIONS = {"OEP": __get_metadata_specs_for_oep} diff --git a/src/omi/cli.py b/src/omi/cli.py index fbfb722..8335a3f 100644 --- a/src/omi/cli.py +++ b/src/omi/cli.py @@ -23,9 +23,23 @@ import click +from omi.api.oep import ( + import_oep_table_as_resource, + update_oep_tables_from_dataset_metadata, + update_single_oep_table_from_dataset_metadata, +) +from omi.creation.assembler import assemble_metadata_dict from omi.creation.creator import OEMetadataCreator -from omi.creation.init import init_dataset, init_resources_from_files -from omi.creation.utils import apply_template_to_resources, load_parts +from omi.creation.init import ( + init_dataset, + init_from_oem_json, + init_resources_from_files, +) +from omi.creation.utils import ( + DEFAULT_CONCAT_LIST_KEYS, + apply_template_to_resources, + load_parts, +) @click.group() @@ -53,17 +67,208 @@ def grp() -> None: type=click.Path(dir_okay=False, path_type=Path), help="Optional metadata index YAML for explicit mapping.", ) -def assemble_cmd(base_dir: Path, dataset_id: str, output_file: Path, index_file: Optional[Path]) -> None: +@click.option( + "--concat-list-key", + "concat_list_keys", + multiple=True, + help=( + "List-valued keys to concatenate (template+resource) instead of overriding. " + "Defaults to: keywords, topics, languages." + ), +) +def assemble_cmd( + base_dir: Path, + dataset_id: str, + output_file: Path, + index_file: Optional[Path], + concat_list_keys: tuple[str, ...], +) -> None: """Assemble OEMetadata from split YAML files and write JSON to OUTPUT_FILE.""" # Load pieces version, dataset, resources, template = load_parts(base_dir, dataset_id, index_file=index_file) - merged_resources = apply_template_to_resources(resources, template) + + # Choose which list keys should be concatenated + keys = set(concat_list_keys) if concat_list_keys else DEFAULT_CONCAT_LIST_KEYS + + merged_resources = apply_template_to_resources( + resources, + template, + concat_list_keys=keys, + ) # Build & save with the correct spec version creator = OEMetadataCreator(oem_version=version) creator.save(dataset, merged_resources, output_file, ensure_ascii=False, indent=2) +@grp.command("push-oep-all") +@click.option( + "--base-dir", + required=True, + type=click.Path(file_okay=False, path_type=Path), + help="Root directory containing 'datasets/' and 'resources/'.", +) +@click.option( + "--dataset-id", + required=True, + help="Logical dataset id (e.g. 'pv_bundle').", +) +@click.option( + "--token", + required=True, + help=( + "OEP user API token (raw token string). The 'Authorization: Token ' header is constructed internally." + ), +) +@click.option( + "--method", + default="POST", + show_default=True, + type=click.Choice(["POST", "PUT"], case_sensitive=False), + help="HTTP method to use for the OEP meta API.", +) +@click.option( + "--timeout", + default=90, + show_default=True, + type=int, + help="Request timeout in seconds.", +) +@click.option( + "--only-table", + "only_tables", + multiple=True, + help=( + "Restrict updates to specific table names (can be given multiple times). " + "If omitted, all resources in the dataset are used." 
+ ), +) +@click.option( + "--index-file", + default=None, + type=click.Path(dir_okay=False, path_type=Path), + help=("Optional metadata index YAML for resolving dataset parts, same semantics as in the 'assemble' command."), +) +def push_oep_all_cmd( # noqa: PLR0913 + base_dir: Path, + dataset_id: str, + token: str, + method: str, + timeout: int, + only_tables: tuple[str, ...], + index_file: Optional[Path], +) -> None: + """Push OEMetadata for all (or selected) tables of a dataset to the OEP.""" + # 1) Assemble full dataset OEMetadata from split YAML + md = assemble_metadata_dict( + base_dir=base_dir, + dataset_id=dataset_id, + index_file=index_file, + ) + + # 2) Bundle optional call arguments to keep PLR0913 happy + call_opts: dict[str, object] = { + "method": method.upper(), + "timeout": timeout, + } + if only_tables: + call_opts["only_tables"] = only_tables + + # 3) Push per-table metadata to OEP + results = update_oep_tables_from_dataset_metadata( + metadata=md, + token=token, + **call_opts, + ) + + # 4) Print a short summary + if not results: + click.echo("No tables were updated (no matching resources or filter excluded all).") + else: + click.echo("Updated metadata for the following OEP tables:") + for table_name in sorted(results.keys()): + click.echo(f" - {table_name}") + + +@grp.command("push-oep") +@click.option( + "--base-dir", + required=True, + type=click.Path(file_okay=False, path_type=Path), + help="Root directory containing 'datasets/' and 'resources/'.", +) +@click.option( + "--dataset-id", + required=True, + help="Logical dataset id (e.g. 'pv_bundle').", +) +@click.option( + "--table", + "oep_table", + required=True, + help="Name of the OEP table to update (must match resource.name in the metadata).", +) +@click.option( + "--token", + required=True, + help=( + "OEP user API token (raw token string). The 'Authorization: Token ' header is constructed internally." 
+ ), +) +@click.option( + "--method", + default="POST", + show_default=True, + type=click.Choice(["POST", "PUT"], case_sensitive=False), + help="HTTP method to use for the OEP meta API.", +) +@click.option( + "--timeout", + default=90, + show_default=True, + type=int, + help="Request timeout in seconds.", +) +@click.option( + "--index-file", + default=None, + type=click.Path(dir_okay=False, path_type=Path), + help="Optional metadata index YAML for resolving dataset parts.", +) +def push_oep_one_cmd( # noqa: PLR0913 + base_dir: Path, + dataset_id: str, + oep_table: str, + token: str, + method: str, + timeout: int, + index_file: Optional[Path], +) -> None: + """Push OEMetadata for a single OEP table, based on a dataset-level OEMetadata dict.""" + # 1) Assemble full dataset OEMetadata from split YAML + md = assemble_metadata_dict( + base_dir=base_dir, + dataset_id=dataset_id, + index_file=index_file, + ) + + # 2) Bundle options to avoid PLR0913 on the call + call_opts: dict[str, object] = { + "method": method.upper(), + "timeout": timeout, + } + + # 3) Push just this one table's metadata + update_single_oep_table_from_dataset_metadata( + metadata=md, + oep_table=oep_table, + token=token, + **call_opts, + ) + + click.echo(f"Updated metadata for {oep_table}") + + @click.group() def init() -> None: """Scaffold OEMetadata split-files layout.""" @@ -111,6 +316,99 @@ def init_resources_cmd( click.echo(p) +@init.command("from-json") +@click.argument("base_dir", type=click.Path(file_okay=False, path_type=Path)) +@click.argument("dataset_id") +@click.argument("oem_json", type=click.Path(exists=True, dir_okay=False, path_type=Path)) +@click.option("--oem-version", default="OEMetadata-2.0", show_default=True) +@click.option( + "--collect-common", + is_flag=True, + help=( + "Collect fields that are identical across all resources " + "into the dataset template (e.g. context/spatial/temporal)." + ), +) +def init_from_json_cmd( + base_dir: Path, + dataset_id: str, + oem_json: Path, + oem_version: str, + *, + collect_common: bool, +) -> None: + """ + Initialize split-files layout from an existing OEMetadata JSON file. + + BASE_DIR: Root directory containing 'datasets/' and 'resources/'. + DATASET_ID: Logical dataset id (e.g. 'sle'). + OEM_JSON: Path to an OEMetadata JSON file with one or more resources. + """ + res = init_from_oem_json( + base_dir=base_dir, + dataset_id=dataset_id, + oem_json_path=oem_json, + oem_version=oem_version, + collect_common=collect_common, + ) + + click.echo(f"dataset: {res.dataset_yaml}") + click.echo(f"template: {res.template_yaml}") + for p in res.resource_yamls: + click.echo(f"resource: {p}") + + +@init.command("oep-resource") +@click.argument("base_dir", type=click.Path(file_okay=False, path_type=Path)) +@click.argument("dataset_id") +@click.argument("oep_table") +@click.option( + "--no-create-dataset", + is_flag=True, + help=( + "Do not create a dataset skeleton if it is missing. " + "If set and the dataset does not exist, the command will fail." + ), +) +@click.option( + "--overwrite-resource", + is_flag=True, + help="Overwrite an existing resource YAML with the same name.", +) +def init_oep_resource_cmd( + base_dir: Path, + dataset_id: str, + oep_table: str, + no_create_dataset: bool, # noqa: FBT001 + overwrite_resource: bool, # noqa: FBT001 +) -> None: + """ + Import an OEP table's OEMetadata and add it as a resource to a local dataset. + + BASE_DIR: Root directory containing 'datasets/' and 'resources/'. + DATASET_ID: Local dataset id in the split-files layout. 
+ OEP_TABLE: Name of the table on the Open Energy Platform. + + Notes + ----- + - Fetches OEMetadata from the OEP meta API for the given table. + - Ignores the top-level dataset fields in the OEP JSON (id, name, title, @id, + @context, description, ...). + - Converts only the first entry in ``resources`` into a + ``resources//.resource.yaml`` file. + """ + create_dataset_if_missing = not no_create_dataset + + res_path = import_oep_table_as_resource( + base_dir=base_dir, + dataset_id=dataset_id, + oep_table=oep_table, + create_dataset_if_missing=create_dataset_if_missing, + overwrite_resource=overwrite_resource, + ) + click.echo(f"resource: {res_path}") + + # Keep CommandCollection for backwards compatibility with your entry point cli = click.CommandCollection(sources=[grp, init]) diff --git a/src/omi/creation/assembler.py b/src/omi/creation/assembler.py index edaa431..4a332e7 100644 --- a/src/omi/creation/assembler.py +++ b/src/omi/creation/assembler.py @@ -7,6 +7,7 @@ from .creator import OEMetadataCreator from .utils import ( + DEFAULT_CONCAT_LIST_KEYS, apply_template_to_resources, discover_dataset_ids, discover_dataset_ids_from_index, @@ -21,6 +22,8 @@ def assemble_metadata_dict( base_dir: Union[str, Path], dataset_id: str, index_file: Optional[Union[str, Path]] = None, + *, + concat_list_keys: Optional[Iterable[str]] = None, ) -> dict[str, Any]: """ Load dataset/template/resources; apply template; validate via creator; return dict. @@ -33,6 +36,10 @@ def assemble_metadata_dict( Identifier for the dataset to load. index_file: Optional[Union[str, Path]] Optional path to an index YAML file for resolving dataset parts. + concat_list_keys: Optional[Iterable[str]] + List-valued keys for which template + resource values should be concatenated + (deduplicated). If None, uses DEFAULT_CONCAT_LIST_KEYS + (e.g. {"keywords", "topics", "languages"}). Returns ------- @@ -40,7 +47,15 @@ def assemble_metadata_dict( The assembled and validated OEMetadata dictionary. """ version, dataset, resources, template = load_parts(base_dir, dataset_id, index_file) - merged_resources = apply_template_to_resources(resources, template) + + keys = set(concat_list_keys) if concat_list_keys is not None else DEFAULT_CONCAT_LIST_KEYS + + merged_resources = apply_template_to_resources( + resources, + template, + concat_list_keys=keys, + ) + creator = OEMetadataCreator(oem_version=version) return creator.generate_metadata(dataset, merged_resources) @@ -51,6 +66,7 @@ def assemble_many_metadata( index_file: Optional[Union[str, Path]] = None, *, as_dict: bool = True, + concat_list_keys: Optional[Iterable[str]] = None, ) -> Union[dict[str, dict], list[tuple[str, dict]]]: """ Assemble OEMetadata for multiple datasets in one call. @@ -73,6 +89,8 @@ def assemble_many_metadata( as_dict : bool, optional Whether to return results as a dict mapping dataset_id to metadata. If False, returns a list of (dataset_id, metadata) tuples, by default True. + concat_list_keys: Optional[Iterable[str]] + Forwarded to assemble_metadata_dict (see there for semantics). 
Returns ------- @@ -88,7 +106,12 @@ def assemble_many_metadata( results_pairs: list[tuple[str, dict]] = [] for ds_id in ids: - md = assemble_metadata_dict(base, ds_id, index_file=index_file) + md = assemble_metadata_dict( + base, + ds_id, + index_file=index_file, + concat_list_keys=concat_list_keys, + ) results_pairs.append((ds_id, md)) if as_dict: diff --git a/src/omi/creation/builder.py b/src/omi/creation/builder.py new file mode 100644 index 0000000..39417e1 --- /dev/null +++ b/src/omi/creation/builder.py @@ -0,0 +1,783 @@ +""" +Programmatic extensions for assembled OEMetadata dictionaries. + +This module exposes a lightweight, schema-agnostic builder that lets code +augment an already assembled OEMetadata mapping without touching the YAML +authoring files. Typical pipeline use cases: + +- inject runtime values (e.g., publicationDate), +- append contributors (dataset- or resource-level) with de-duplication, +- add or refine field descriptions, +- merge dicts/lists at JSONPointer-like paths, +- schema-driven hygiene: ensure required keys, prune empty values. + +The builder mutates an internal deep copy of the input; call ``build()`` to +retrieve the final dict (optionally validated). +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable, Mapping +from copy import deepcopy +from typing import Literal, Optional + +from omi.base import get_metadata_specification +from omi.creation.cleaner import ( + detect_unknown_keys, + lint_metadata_against_schema, + normalize_metadata_for_schema, + strip_unknown_keys, +) +from omi.validation import validate_metadata + +Json = dict[str, object] + +# ----------------------------------------------------------------------------- +# Policy types +# ----------------------------------------------------------------------------- + +CreatePolicy = Literal["never", "dicts"] +OverwritePolicy = Literal["always", "if_absent"] +ListStrategy = Literal["concat", "replace", "dedupe"] +ValidatePolicy = Literal["validate", "skip"] +LicensePolicy = Literal["check", "skip"] + + +# ----------------------------------------------------------------------------- +# Internal: JSON-pointer resolution +# ----------------------------------------------------------------------------- + + +def _resolve_pointer( + root: object, + pointer: str, + *, + create: bool | None = None, +) -> tuple[object | None, str]: + """ + Resolve a JSONPointer-like path and return (parent, final_key). + + Supports simple slash-separated paths into dicts/lists (e.g., ``/a/b/0``). + + Parameters + ---------- + root : + Root JSON-like structure (dict/list/scalars). + pointer : + Path to resolve. Use ``/``-separated segments; list indices may be + addressed with integers (e.g., ``/resources/0/schema``). + create : + If True, missing dict segments along the path are created. + + Returns + ------- + tuple[object | None, str] + A pair ``(parent, final_key)`` where ``parent[final_key]`` is the target. + If the pointer refers to the root itself (``""`` or ``"/"``), returns + ``(None, "")``. + + Raises + ------ + KeyError + If a required path component is missing and ``create`` is False. + IndexError + If a list index segment is out of bounds. + TypeError + If traversal encounters a non-container where a dict/list is required. + ValueError + If a list index segment cannot be parsed as an integer. 
+ """ + if pointer == "" or pointer == "/": + return None, "" + + parts = [p for p in pointer.split("/") if p != ""] + cur = root + for part in parts[:-1]: + if isinstance(cur, list): + try: + idx = int(part) + except ValueError as exc: + raise ValueError(f"List index segment must be an integer, got '{part}'.") from exc + if idx < 0 or idx >= len(cur): + raise IndexError(f"List index {idx} out of range at segment '{part}'.") + cur = cur[idx] + elif isinstance(cur, dict): + if part not in cur: + if create: + cur[part] = {} + else: + raise KeyError(f"Missing path component: '{part}' in '{pointer}'.") + cur = cur[part] + else: + raise TypeError(f"Cannot traverse into {type(cur).__name__} at segment '{part}'.") + return cur, parts[-1] + + +def _ensure_dict_target( + parent: object, + key: str, + path: str, + *, + create_flag: bool, +) -> dict[str, object]: + """ + Ensure the value at parent[key] is a dict, creating it if allowed. + + Returns the dict to be used as merge target. + """ + if isinstance(parent, list): + idx = int(key) + if not isinstance(parent[idx], dict): + if create_flag: + parent[idx] = {} + else: + msg = f"Target at '{path}' is not a dict." + raise TypeError(msg) + return parent[idx] + + if isinstance(parent, dict): + if key not in parent or not isinstance(parent[key], dict): + if create_flag: + parent[key] = {} + else: + msg = f"Target at '{path}' is not a dict." + raise TypeError(msg) + return parent[key] + + msg = f"Target parent at '{path}' is not a list or mapping." + raise TypeError(msg) + + +def _merge_into_target( # noqa: PLR0913 + builder: MetadataBuilder, + target: dict[str, object], + path: str, + key: str, + value: object, + create_policy: CreatePolicy, + overwrite_policy: OverwritePolicy, +) -> None: + """Merge a single key/value pair into target according to policies.""" + if key in target and isinstance(target[key], dict) and isinstance(value, dict): + # delegate nested dict merge back to the builder + builder.merge_dict( + f"{path}/{key}", + value, + create_policy=create_policy, + overwrite_policy=overwrite_policy, + ) + elif overwrite_policy == "always" or key not in target: + target[key] = deepcopy(value) + + +# ----------------------------------------------------------------------------- +# Internal: schema-driven hygiene (ensure required, prune empty) +# ----------------------------------------------------------------------------- + +_EMPTY_SENTINELS: tuple[object, ...] = (None, "", [], {}) + + +def _join_ptr(base: str, prop: str) -> str: + return f"{base}/{prop}" if base else f"/{prop}" + + +def _collect_required_paths(schema: Mapping[str, object], base: str = "") -> set[str]: + """ + Collect JSON-pointer paths for required object properties in a JSON Schema. 
+ + Handles: + - ``type: object`` with ``required`` and ``properties`` + - composition (``anyOf``/``oneOf``/``allOf``): union of branches' required paths + - arrays: walks into ``items`` when present (records wildcard paths like ``/*/prop``) + """ + paths: set[str] = set() + + # composition first: union is a safe over-approximation + for key in ("allOf", "anyOf", "oneOf"): + if key in schema: + for br in schema[key]: + paths.update(_collect_required_paths(br, base)) + return paths + + t = schema.get("type") + + if t == "object": + props = schema.get("properties", {}) + required = schema.get("required", []) + for prop in required: + paths.add(_join_ptr(base, prop)) + for prop, p_schema in props.items(): + paths.update(_collect_required_paths(p_schema, _join_ptr(base, prop))) + + elif t == "array": + items = schema.get("items") + if isinstance(items, Mapping): + paths.update(_collect_required_paths(items, f"{base}/*")) + + return paths + + +def _ensure_required_paths(obj: object, required_paths: set[str]) -> object: # noqa: C901 + """ + Ensure all required paths exist in ``obj`` by creating missing dict keys with None. + + Only operates on object properties (dicts). For required paths that include an + array wildcard (``/*/prop``), we attempt to set the property on each object element + of the array if the array exists. + """ + out = deepcopy(obj) + + # Group leaves by parent pointer: /a/b/c -> parent=/a/b, leaf=c + parents: dict[str, list[str]] = {} + for p in required_paths: + if not p or p == "/": + continue + parts = [x for x in p.split("/") if x] + parent = "/" + "/".join(parts[:-1]) if len(parts) > 1 else "" + leaf = parts[-1] + parents.setdefault(parent, []).append(leaf) + + def _walk_and_set(parent_ptr: str, leaves: list[str]) -> None: # noqa: C901 + # wildcard support + if "/*/" in parent_ptr or parent_ptr.endswith("/*"): + parts = [x for x in parent_ptr.split("/") if x] + + def _recur(curr: object, idx: int) -> None: + if idx >= len(parts): + if isinstance(curr, dict): + for leaf in leaves: + curr.setdefault(leaf, None) + return + part = parts[idx] + if part == "*": + if isinstance(curr, list): + for e in curr: + _recur(e, idx + 1) + elif isinstance(curr, dict) and part in curr: + _recur(curr[part], idx + 1) + + _recur(out, 0) + return + + # normal object parent + if parent_ptr == "": + parent = out + else: + parent = out + for seg in [x for x in parent_ptr.split("/") if x]: + if not isinstance(parent, dict): + return + parent = parent.setdefault(seg, {}) + if isinstance(parent, dict): + for leaf in leaves: + parent.setdefault(leaf, None) + + for parent_ptr, leaves in parents.items(): + _walk_and_set(parent_ptr, leaves) + + return out + + +def _prune_empty(obj: object, required_paths: set[str], base: str = "") -> object: + """Remove keys whose values are empty (None, '', [], {}) unless required.""" + + def _is_required(path: str) -> bool: + if path in required_paths: + return True + # keep parents of required children + return any(rp.startswith(path + "/") for rp in required_paths) + + if isinstance(obj, dict): + result: dict[str, object] = {} + for k, v in obj.items(): + p = f"{base}/{k}" + cleaned = _prune_empty(v, required_paths, p) + if cleaned in _EMPTY_SENTINELS and not _is_required(p): + continue + result[k] = cleaned + return result + + if isinstance(obj, list): + return [_prune_empty(v, required_paths, f"{base}/*") for v in obj] + + return obj + + +# ----------------------------------------------------------------------------- +# Resource scope +# 
----------------------------------------------------------------------------- + + +class _ResourceScope: + """Fluent resource-scoped view that prefixes all paths with /resources/{i}.""" + + def __init__(self, parent: MetadataBuilder, index: int) -> None: + self._p = parent + self._idx = index + self._base = f"/resources/{index}" + + # -- relative path helpers -- + + def _rel(self, rel_path: str) -> str: + rel = rel_path[1:] if rel_path.startswith("/") else rel_path + return f"{self._base}/{rel}" if rel else self._base + + # -- public API within resource -- + + def set(self, rel_path: str, value: object, **kw) -> _ResourceScope: # noqa: A003 + """Set a value relative to this resource (e.g. ``'context'`` or ``'/context'``).""" + self._p.set_path(self._rel(rel_path), value, **kw) + return self + + def merge_dict(self, rel_path: str, mapping: dict[str, object], **kw) -> _ResourceScope: + """Deep-merge a mapping into a dict relative to this resource.""" + self._p.merge_dict(self._rel(rel_path), mapping, **kw) + return self + + def merge_list(self, rel_path: str, values: Iterable[object], **kw) -> _ResourceScope: + """Merge a list relative to this resource using a strategy (replace/concat/dedupe).""" + self._p.merge_list(self._rel(rel_path), values, **kw) + return self + + def append_contributor(self, contributor: dict[str, object], *, dedupe_on: str = "title") -> _ResourceScope: + """Append a contributor to this resource's ``contributors`` (de-duplicated).""" + + def _key(x: object) -> object: + return x.get(dedupe_on) if isinstance(x, dict) else repr(x) + + self.merge_list("contributors", [contributor], strategy="dedupe", key=_key) + return self + + def set_field_descriptions( + self, + descriptions: dict[str, str], + *, + default_nullable: bool | None = None, + ) -> _ResourceScope: + """Set schema field descriptions (and optionally default ``nullable``) on this resource.""" + self._p.set_field_descriptions_for_index( + self._idx, + descriptions, + default_nullable=default_nullable, + ) + return self + + def done(self) -> MetadataBuilder: + """Return to the root builder.""" + return self._p + + +# ----------------------------------------------------------------------------- +# Builder +# ----------------------------------------------------------------------------- + + +class MetadataBuilder: + """ + Lightweight, schema-agnostic builder to mutate assembled OEMetadata dicts. + + Features + -------- + - Path-based set/merge operations (dicts and lists) using explicit policies. + - Resource scoping: ``.resource('name').append_contributor({...}).done()``. + - Convenience helpers (contributors, field descriptions). + - Schema-driven hygiene: ``ensure_required`` and ``prune_empty``. + - Optional validation via :func:`omi.validation.validate_metadata` in ``build()``. + + Notes + ----- + The builder keeps a deep copy of the input mapping; original metadata is not + modified. Methods return ``self`` for fluent chaining. + """ + + def __init__(self, metadata: Json, oem_version: Optional[str] = None) -> None: + """ + Initialize with an OEMetadata mapping. + + Parameters + ---------- + metadata : + Already-assembled OEMetadata dictionary. + oem_version : + If provided, the corresponding specification is loaded (kept for + potential future helpers; not required for core operations). 
+ """ + self._md: Json = deepcopy(metadata) + self._oem_version = oem_version + self._oem_spec = get_metadata_specification(oem_version) if oem_version else None + self._concat_list_keys: set[str] = {"keywords", "topics", "languages"} + + # ---------- Low-level path operations ---------- + + def set_path( + self, + path: str, + value: object, + *, + create_policy: CreatePolicy = "dicts", + # backward-compat shim (deprecated): if someone passes create=..., map it + **deprecated_bool: object, + ) -> MetadataBuilder: + """ + Set a value at *path* (JSONPointer-like), per the create policy. + + Parameters + ---------- + path : + Slash-separated navigation path (e.g., ``/resources/0/title``). + value : + Value to assign at the target location. + create_policy : + - ``"dicts"``: create missing dict segments, + - ``"never"``: do not create, raise if missing. + Default is ``"dicts"``. + deprecated_bool : + Backwards-compat shim, accepting legacy boolean keyword arguments + such as ``create=...``. Prefer using ``create_policy`` instead. + """ + if "create" in deprecated_bool: # type: ignore[truthy-bool] + create_policy = "dicts" if bool(deprecated_bool["create"]) else "never" + + create_flag = create_policy == "dicts" + parent, key = _resolve_pointer(self._md, path, create=create_flag) + if parent is None: + if key in ("", "/"): + self._md = value # replace root + else: + msg = "Invalid root replacement request." + raise ValueError(msg) + return self + + if isinstance(parent, list): + idx = int(key) + parent[idx] = value + elif isinstance(parent, dict): + parent[key] = value + else: + raise TypeError(f"Target parent at '{path}' is not indexable/mapping.") + return self + + def merge_dict( + self, + path: str, + mapping: dict[str, object], + *, + create_policy: CreatePolicy = "dicts", + overwrite_policy: OverwritePolicy = "always", + **deprecated_bool: object, + ) -> MetadataBuilder: + """ + Deep-merge a mapping into the dict found at *path*. + + Nested dicts are merged recursively. Non-dict values follow the policy: + - ``overwrite_policy="always"`` replaces existing values. + - ``overwrite_policy="if_absent"`` only sets when the key is missing. + + ``create_policy`` controls whether a missing dict is created at the target. + """ + if "create" in deprecated_bool: + create_policy = "dicts" if bool(deprecated_bool["create"]) else "never" + if "overwrite" in deprecated_bool: + overwrite_policy = "always" if bool(deprecated_bool["overwrite"]) else "if_absent" + + create_flag = create_policy == "dicts" + parent, key = _resolve_pointer(self._md, path, create=create_flag) + if parent is None: + msg = "Cannot merge into root; use set_path('/') if you truly need root replacement." + raise ValueError(msg) + + target = _ensure_dict_target(parent, key, path, create_flag=create_flag) + + for k, v in mapping.items(): + _merge_into_target( + self, + target, + path, + k, + v, + create_policy=create_policy, + overwrite_policy=overwrite_policy, + ) + + return self + + def merge_list( + self, + path: str, + values: Iterable[object], + *, + strategy: ListStrategy = "concat", + key: Optional[Callable[[object], object]] = None, + ) -> MetadataBuilder: + """ + Merge a list at *path* with the provided *values* using a strategy. + + Strategies + ---------- + - ``"replace"``: replace existing list with ``values``. + - ``"concat"``: append ``values`` to existing list (creating if absent). + - ``"dedupe"``: concat then drop duplicates, using ``key(item)`` or ``repr(item)``. 
+ """ + parent, k = _resolve_pointer(self._md, path, create=True) + base: object + if isinstance(parent, list): + idx = int(k) + base = parent[idx] + elif isinstance(parent, dict): + base = parent.get(k) + else: + raise TypeError(f"List parent at '{path}' must be list or mapping.") + + if base is None or not isinstance(base, list) or strategy == "replace": + new_list = list(values) + elif strategy == "concat": + new_list = list(base) + list(values) + elif strategy == "dedupe": + seen: set[object] = set() + out: list[object] = [] + for item in list(base) + list(values): + ident = key(item) if key else repr(item) + if ident not in seen: + seen.add(ident) + out.append(item) + new_list = out + else: + raise ValueError(f"Unknown strategy: {strategy!r}") + + if isinstance(parent, list): + parent[int(k)] = new_list + else: + parent[k] = new_list + return self + + def set_field_descriptions_for_index( + self, + index: int, + descriptions: dict[str, str], + *, + default_nullable: bool | None = None, + ) -> None: + """Set schema field descriptions for a resource by index.""" + resources = self._md.get("resources", []) + if not isinstance(resources, list): + return + if index < 0 or index >= len(resources): + raise IndexError(f"Resource index {index} out of range.") + + res = resources[index] + if not isinstance(res, dict): + return + + schema = res.get("schema", {}) + if not isinstance(schema, dict): + return + + fields = schema.get("fields", []) + if not isinstance(fields, list): + return + + for f in fields: + if not isinstance(f, dict): + continue + fname = f.get("name") + if isinstance(fname, str) and fname in descriptions: + f["description"] = descriptions[fname] + if default_nullable is not None and "nullable" not in f: + f["nullable"] = default_nullable + + # ---------- High-level convenience (dataset-level) ---------- + + def set_publication_date(self, date_iso: str) -> MetadataBuilder: + """Set top-level ``publicationDate`` to an ISO-8601 string.""" + return self.set_path("/publicationDate", date_iso) + + def append_contributor_dataset( + self, + contributor: dict[str, object], + *, + dedupe_on: str = "title", + ) -> MetadataBuilder: + """Append a contributor to dataset-level ``/contributors`` (de-duplicated).""" + + def _ident(x: object) -> object: + return x.get(dedupe_on) if isinstance(x, dict) else repr(x) + + return self.merge_list("/contributors", [contributor], strategy="dedupe", key=_ident) + + def append_contributor( + self, + _contributor: dict[str, object], + *, + _dedupe_on: str = "title", + ) -> MetadataBuilder: + """ + (Guarded) Append a contributor at the **dataset level**. + + This method now deliberately **raises** to prevent accidental placement of + contributors on the dataset if you intended to target a resource. + + Use one of: + - ``append_contributor_dataset(...)`` for dataset-level on purpose, or + - ``.resource('name').append_contributor(...)`` for resource-level. + """ + msg = ( + "append_contributor() at the root is ambiguous. Use " + "append_contributor_dataset(...) for dataset-level or " + "ResourceScope.append_contributor(...) within .resource(...)." + ) + raise RuntimeError( + msg, + ) + + def set_resource_field_descriptions( + self, + resource_name: str, + descriptions: dict[str, str], + *, + default_nullable: Optional[bool] = None, + ) -> MetadataBuilder: + """ + Set descriptions (and optional default ``nullable``) for schema fields. + + Parameters + ---------- + resource_name : + Name of the target resource (its ``name`` property). 
+ descriptions : + Mapping field-name → description string. + default_nullable : + If provided, set ``nullable`` when the key is missing (never overwrites). + """ + resources = self._md.get("resources", []) + if not isinstance(resources, list): + return self + + for res in resources: + if not isinstance(res, dict) or res.get("name") != resource_name: + continue + schema = res.get("schema", {}) + if not isinstance(schema, dict): + continue + fields = schema.get("fields", []) + if not isinstance(fields, list): + continue + + for f in fields: + if not isinstance(f, dict): + continue + fname = f.get("name") + if isinstance(fname, str) and fname in descriptions: + f["description"] = descriptions[fname] + if default_nullable is not None and "nullable" not in f: + f["nullable"] = default_nullable + return self + + def add_keywords(self, keywords: list[str]) -> MetadataBuilder: + """Add dataset-level keywords with de-duplication.""" + return self.merge_list("/keywords", keywords, strategy="dedupe") + + def ensure_template_defaults(self) -> MetadataBuilder: + """(Reserved) Apply template-like defaults using the loaded spec, if any.""" + return self + + # ---------- Hygiene (schema-driven) ---------- + + def ensure_required(self, *, oem_schema: dict) -> MetadataBuilder: + """Ensure all required properties exist (with ``None``) according to the schema.""" + req = _collect_required_paths(oem_schema) + self._md = _ensure_required_paths(self._md, req) + return self + + def prune_empty(self, *, oem_schema: dict) -> MetadataBuilder: + """Remove empty (None/''/[]/{}) properties unless required by the schema.""" + req = _collect_required_paths(oem_schema) + self._md = _prune_empty(self._md, req) + return self + + def lint(self) -> list[str]: + """Run non-destructive lint checks against the current metadata snapshot.""" + return lint_metadata_against_schema(self._md) # type: ignore[arg-type] + + def normalize(self, **opts) -> MetadataBuilder: + """ + Normalize the in-memory metadata to better match the v2 schema. + + Options are forwarded to `normalize_metadata_for_schema(...)`. + """ + self._md = normalize_metadata_for_schema(self._md, **opts) # type: ignore[arg-type] + return self + + def strip_unknown(self, *, oem_schema: dict) -> MetadataBuilder: + """Strip keys not allowed by the given OEMetadata JSON Schema.""" + self._md = strip_unknown_keys(self._md, oem_schema=oem_schema) # type: ignore[arg-type] + return self + + def unknown_keys(self, *, oem_schema: dict) -> list[str]: + """List JSON-Pointer paths to keys not allowed by the given schema.""" + return detect_unknown_keys(self._md, oem_schema=oem_schema) # type: ignore[arg-type] + + # ---------- Finalize ---------- + + def build( + self, + *, + validate_policy: ValidatePolicy = "validate", + license_policy: LicensePolicy = "skip", + # backward-compat shims: + **deprecated_bool: object, + ) -> Json: + """ + Return the final OEMetadata dict, optionally validated. + + Parameters + ---------- + validate_policy : + Run schema validation (``"validate"``) or skip (``"skip"``). Default ``"validate"``. + license_policy : + Check license compliance (``"check"``) or skip (``"skip"``). Default ``"skip"``. + deprecated_bool : + Backwards-compat shim for legacy boolean keyword arguments such as + ``validate=...`` or ``check_license=...``. Prefer the explicit policy + enums instead + + Returns + ------- + dict[str, object] + Deep-copied metadata dictionary (safe for caller mutation). 
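+
+        Examples
+        --------
+        Illustrative chained usage; ``builder`` stands for a ``MetadataBuilder``
+        instance and the literal values are placeholders::
+
+            metadata = (
+                builder.set_publication_date("2024-01-01")
+                .add_keywords(["energy"])
+                .build(validate_policy="validate", license_policy="skip")
+            )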
+ """ + if "validate" in deprecated_bool: + validate_policy = "validate" if bool(deprecated_bool["validate"]) else "skip" + if "check_license" in deprecated_bool: + license_policy = "check" if bool(deprecated_bool["check_license"]) else "skip" + + out = deepcopy(self._md) + if validate_policy == "validate": + validate_metadata(out, check_license=(license_policy == "check")) + return out + + # ---------- Resource selection ---------- + + def resource(self, selector: int | str) -> _ResourceScope: + """ + Return a scoped helper for a specific resource. + + Parameters + ---------- + selector : + - ``int`` index in ``resources``; or + - ``str`` resource.name (must be unique). + """ + if isinstance(selector, int): + idx = selector + else: + matches = [ + i + for i, r in enumerate(self._md.get("resources", [])) + if isinstance(r, dict) and r.get("name") == selector + ] + if not matches: + raise ValueError(f"Resource named '{selector}' not found.") + if len(matches) > 1: + raise ValueError(f"Multiple resources named '{selector}' found; select by index.") + idx = matches[0] + return _ResourceScope(self, idx) diff --git a/src/omi/creation/cleaner.py b/src/omi/creation/cleaner.py new file mode 100644 index 0000000..a5aac38 --- /dev/null +++ b/src/omi/creation/cleaner.py @@ -0,0 +1,420 @@ +""" +Minimal schema hygiene helpers for OEMetadata. + +This module focuses on two practical tasks: + +1. Empty-value handling + - Optionally drop keys whose values are "empty" (None, "", [], {}). + - This can be used at the end of a pipeline to create a compact + metadata product, depending on user choice. + + Behavior: + * keep_empty=True -> keep all empty fields. + * keep_empty=False -> remove empty fields and fully-empty objects, + except for a small set of schema-required + keys that must exist even when empty. + +2. Bounding box normalization + - Ensure each resource's ``spatial.extent.boundingBox`` exists and has + exactly four numeric values (padding with zeros if needed). + - This prevents JSON Schema validation failures due to wrong shape. + +Additionally, for better editing UX, "short" objects appended later +(e.g. contributors with only a few keys) can be normalized to have the +same key set as their siblings when keep_empty=True. +""" + +from __future__ import annotations + +from copy import deepcopy +from typing import Any + +Json = dict[str, Any] + +_EMPTY_SENTINELS = (None, "", [], {}) + +#: Keys that must never be pruned even if their value is "empty". +#: This keeps schema-required properties (e.g. primaryKey: []) intact. +_PROTECTED_EMPTY_KEYS = { + "primaryKey", + # You can add more here if validation errors show up for other fields. +} + +# Magic-number replacements / small schema constants +_FIELD_PATH_MIN_LEN = 3 +_BBOX_COORDINATE_COUNT = 4 + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _is_protected_path(path: str, key: str) -> bool: + """ + Return True if this key at this path must not be pruned, even if empty. + + Rules derived from the OEMetadata JSON Schema: + - schema.fields[*].name and .type are required + - schema.primaryKey is required (handled via _PROTECTED_EMPTY_KEYS) + """ + # Global protected keys (e.g. 
schema.primaryKey) + if key in _PROTECTED_EMPTY_KEYS: + return True + + # Split path and drop empty segments + segs = [s for s in path.split("/") if s] + + # Protect only the *field object* name/type: + # resources/.../schema/fields//name + # resources/.../schema/fields//type + # and NOT nested ones like isAbout[*].name, valueReference[*].name + if len(segs) >= _FIELD_PATH_MIN_LEN and segs[-3] == "fields": + if segs[-1] == "name": + return True + if segs[-1] == "type": + return True + + return False + + +def _prune_empty(obj: object, *, path: str = "") -> object: + """ + Recursively remove empty values (None, '', [], {}) from a JSON-like structure. + + Keys listed in _PROTECTED_EMPTY_KEYS, and schema-critical paths such as + schema.fields[*].name/type, are never removed even if their value is empty. + """ + if isinstance(obj, dict): + result: dict[str, object] = {} + for k, v in obj.items(): + child_path = f"{path}/{k}" if path else k + cleaned = _prune_empty(v, path=child_path) + # drop if empty and not protected by schema + if cleaned in _EMPTY_SENTINELS and not _is_protected_path(child_path, k): + continue + result[k] = cleaned + return result + + if isinstance(obj, list): + cleaned_list: list[object] = [] + for idx, v in enumerate(obj): + child_path = f"{path}/{idx}" if path else str(idx) + cleaned = _prune_empty(v, path=child_path) + if cleaned in _EMPTY_SENTINELS: + continue + cleaned_list.append(cleaned) + return cleaned_list + + return obj + + +def _ensure_bounding_boxes(md: Json) -> None: # noqa: C901, PLR0912 + """ + In-place: ensure each resource has a 4-element numeric boundingBox. + + Path targeted: + resources[*].spatial.extent.boundingBox + + Rules: + - If boundingBox is missing or not a list -> set to [0, 0, 0, 0]. + - If length < 4 -> pad with zeros. + - If length > 4 -> truncate to first 4 elements. + - Try to coerce entries to float; empty strings or invalid values -> 0. + """ + resources = md.get("resources") + if not isinstance(resources, list): + return + + for res in resources: + if not isinstance(res, dict): + continue + + spatial = res.get("spatial") + if not isinstance(spatial, dict): + continue + + extent = spatial.get("extent") + if not isinstance(extent, dict): + continue + + bbox = extent.get("boundingBox") + if not isinstance(bbox, list): + bbox = [] + + # normalize length + if len(bbox) < _BBOX_COORDINATE_COUNT: + bbox = list(bbox) + [0] * (_BBOX_COORDINATE_COUNT - len(bbox)) + elif len(bbox) > _BBOX_COORDINATE_COUNT: + bbox = list(bbox[:_BBOX_COORDINATE_COUNT]) + + # coerce to numbers, treating empty/invalid as 0 + cleaned: list[float] = [] + for v in bbox: + if isinstance(v, (int, float)): + cleaned.append(float(v)) + elif isinstance(v, str): + s = v.strip() + if not s: + cleaned.append(0.0) + else: + try: + cleaned.append(float(s)) + except ValueError: + cleaned.append(0.0) + else: + cleaned.append(0.0) + + extent["boundingBox"] = cleaned + + +def _normalize_object_list_shape(items: object) -> None: # noqa: C901 + """ + In-place: make all dict elements in a list share the same key set. + + Strategy: + - Compute the union of keys across all dict items. + - For each key, look at the first non-empty value type: + * if it's a list -> default is [] for missing keys + * otherwise -> default is "" for missing keys + - Fill missing keys in each dict with the appropriate default. + + This is mainly used for small, schema'd objects like contributors so + that "short" objects appended later get the same shape as template + skeletons when keep_empty=True. 
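+
+    Examples
+    --------
+    Illustrative only; the contributor entries are hypothetical::
+
+        items = [{"title": "A", "roles": ["author"]}, {"title": "B"}]
+        _normalize_object_list_shape(items)
+        # items[1] is now {"title": "B", "roles": []}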
+ """ + if not isinstance(items, list): + return + + union_keys: set[str] = set() + exemplar_is_list: dict[str, bool] = {} + + # First pass: discover keys and whether they are list-like + for obj in items: + if not isinstance(obj, dict): + continue + for k, v in obj.items(): + union_keys.add(k) + if k not in exemplar_is_list and v is not None: + exemplar_is_list[k] = isinstance(v, list) + + # Second pass: fill missing keys + for obj in items: + if not isinstance(obj, dict): + continue + for k in union_keys: + if k in obj: + continue + if exemplar_is_list.get(k, False): + obj[k] = [] + else: + obj[k] = "" + + +def _normalize_resource_lists_for_editing(md: Json, *, keep_empty: bool) -> None: + """ + In-place normalization of list-of-dicts shapes for better editing. + + Currently only normalizes: + - resources[*].contributors + """ + if not keep_empty: + # No need to expand shapes if we're going to drop empties anyway. + return + + resources = md.get("resources") + if not isinstance(resources, list): + return + + for res in resources: + if not isinstance(res, dict): + continue + + contributors = res.get("contributors") + _normalize_object_list_shape(contributors) + + +def _collect_primary_key_names(schema: dict[str, Any]) -> set[str]: + """Return a set of primary key field names from a schema object.""" + pk_names: set[str] = set() + pk = schema.get("primaryKey") + + if isinstance(pk, str): + pk_names.add(pk) + return pk_names + + if isinstance(pk, list): + for item in pk: + if isinstance(item, str): + pk_names.add(item) + elif isinstance(item, dict): + name = item.get("name") + if isinstance(name, str): + pk_names.add(name) + + return pk_names + + +def _apply_nullable_default(field: dict[str, Any], pk_names: set[str]) -> None: + """Ensure a sensible nullable value for a single field dict.""" + name = field.get("name") + if not isinstance(name, str): + return + + is_pk = name in pk_names + is_id_like = name == "id" or name.endswith("_id") + + # IDs & PKs: always non-nullable + if is_pk or is_id_like: + field["nullable"] = False + else: + # Other fields: if nullable is missing, set a safe default. + field.setdefault("nullable", False) + + +def _ensure_field_defaults(md: Json) -> None: + """ + In-place: ensure reasonable defaults for schema.fields[*]. + + Rules + ----- + - Ensure every field has a 'nullable' key. + - Fields that are primary keys or ID-like are forced to nullable=False. + """ + resources = md.get("resources") + if not isinstance(resources, list): + return + + for res in resources: + if not isinstance(res, dict): + continue + + schema = res.get("schema") + if not isinstance(schema, dict): + continue + + fields = schema.get("fields") + if not isinstance(fields, list): + continue + + pk_names = _collect_primary_key_names(schema) + + for field in fields: + if not isinstance(field, dict): + continue + _apply_nullable_default(field, pk_names) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def normalize_metadata_for_schema( + md: Json, + *, + keep_empty: bool = True, + **_: object, +) -> Json: + """ + Return a cleaned metadata dict, suitable for saving or further editing. + + This function does **not** mutate the input. + + Parameters + ---------- + md : + The assembled OEMetadata mapping. 
+ keep_empty : + Controls how empty values are handled: + + - True -> keep empty fields (None, '', [], {}), but normalize + object shapes in certain lists so all entries look + consistent (good for editing). + - False -> drop empty values and fully-empty objects using a + recursive prune, while **preserving keys** listed + in ``_PROTECTED_EMPTY_KEYS`` (e.g. ``primaryKey``). + + **_ : + Extra keyword arguments are accepted and ignored for backward + compatibility with older call sites (e.g. `fill_nullable`, + `ensure_primary_key`, etc.). + + Returns + ------- + dict + A deep-copied and cleaned metadata dictionary. + """ + out: Json = deepcopy(md) + + # Always fix bounding boxes (cheap and schema-friendly) + _ensure_bounding_boxes(out) + + _ensure_field_defaults(out) + + # Make contributors list elements look consistent in editing mode + _normalize_resource_lists_for_editing(out, keep_empty=keep_empty) + + # Optionally drop empty values (but keep protected keys) + if not keep_empty: + out = _prune_empty(out) # type: ignore[assignment] + + return out + + +def lint_metadata_against_schema(md: Json) -> list[str]: + """ + Very minimal linting for obvious issues. + + Currently checks: + - 'resources' is a list (if present). + - each boundingBox has length 4, if present. + """ + warnings: list[str] = [] + + resources = md.get("resources") + if resources is None: + return warnings # multi-dataset shapes are handled elsewhere + + if not isinstance(resources, list): + warnings.append("Top-level 'resources' should be a list.") + return warnings + + for i, res in enumerate(resources): + if not isinstance(res, dict): + warnings.append(f"resources[{i}] should be an object.") + continue + spatial = res.get("spatial") + if isinstance(spatial, dict): + extent = spatial.get("extent") + if isinstance(extent, dict): + bbox = extent.get("boundingBox") + if bbox is not None and (not isinstance(bbox, list) or len(bbox) != _BBOX_COORDINATE_COUNT): + warnings.append( + f"resources[{i}].spatial.extent.boundingBox should be a " + f"list of {_BBOX_COORDINATE_COUNT} values.", + ) + + return warnings + + +def detect_unknown_keys(metadata: Json, *, oem_schema: Json) -> list[str]: + """ + Return an empty list of unknown keys. + + API-compatible stub; implement real logic if needed. + """ + # Mark parameters as used to keep the signature without triggering lints. + del metadata, oem_schema + return [] + + +def strip_unknown_keys(metadata: Json, *, oem_schema: Json) -> Json: + """ + Return a deep copy unchanged. + + API-compatible stub; implement real logic if needed. + """ + # Mark unused parameter as used. + del oem_schema + return deepcopy(metadata) diff --git a/src/omi/creation/creator.py b/src/omi/creation/creator.py index 9d93b2b..95fa88d 100644 --- a/src/omi/creation/creator.py +++ b/src/omi/creation/creator.py @@ -3,9 +3,15 @@ from __future__ import annotations import json +from copy import deepcopy from pathlib import Path from omi.base import get_metadata_specification +from omi.creation.cleaner import ( + detect_unknown_keys, + normalize_metadata_for_schema, + strip_unknown_keys, +) from omi.validation import validate_metadata @@ -27,9 +33,14 @@ def generate_metadata(self, dataset: dict, resources: list[dict]) -> dict: "@context": self.oem_spec.schema["properties"]["@context"]["examples"][0], **dataset, "resources": resources, + # metaMetadata is *always* taken from the spec example, + # so users don't have to provide it. "metaMetadata": self.oem_spec.example["metaMetadata"], } + # Normalize for schema (incl. 
bounding boxes) before validation + metadata = normalize_metadata_for_schema(metadata, keep_empty=True) + validate_metadata(metadata, check_license=False) return metadata @@ -66,3 +77,76 @@ def save( json.dump(metadata, f, indent=indent, ensure_ascii=ensure_ascii, **dump_kwargs) print(f"OEMetadata written to {output_file}") # noqa: T201 + + def save_metadata( # noqa: PLR0913 + self, + metadata: dict, + output_file: Path | str, + *, + validate: bool = False, + check_license: bool = False, + ensure_ascii: bool = False, + indent: int = 2, + strip_before_validate: bool = False, + fail_on_unknown: bool = False, + ) -> None: + """ + Save a pre-built OEMetadata dict to disk with optional cleaning and validation. + + This variant is meant for the *augmented* metadata you produce after assembly + and potential builder/overlay mutations. + + Parameters + ---------- + metadata + OEMetadata dict to write. + output_file + Destination JSON path. + validate + If True, validate using `omi.validation.validate_metadata`. + check_license + Forwarded to validator; if True, also checks license map compliance. + ensure_ascii + If True, JSON-escape non-ASCII characters; default: False (UTF-8). + indent + JSON indentation; default: 2. + strip_before_validate + If True, drop all keys not allowed by the spec (best-effort) before + validation and writing. + fail_on_unknown + If True, raise with a list of JSON-Pointer-like paths if unknown keys + are present (checked *before* stripping). + + Raises + ------ + ValueError + If `fail_on_unknown` is True and unknown keys are detected. + + Notes + ----- + - Cleaning relies on the schema from `self.oem_spec.schema`. + - If you want silent cleanup, set `strip_before_validate=True`. + - If you prefer fail-fast CI behavior, set `fail_on_unknown=True`. 
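+
+        Examples
+        --------
+        Illustrative call; ``creator`` stands for an instance of this class and
+        the output path is a placeholder::
+
+            creator.save_metadata(
+                metadata,
+                "output/dataset.metadata.json",
+                validate=True,
+                strip_before_validate=True,
+            )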
+ """ + md = deepcopy(metadata) + schema = self.oem_spec.schema + + if fail_on_unknown: + unknown = detect_unknown_keys(md, oem_schema=schema) + if unknown: + raise ValueError( + "Metadata contains keys not allowed by the OEMetadata schema:\n" + + "\n".join(f" - {p}" for p in unknown), + ) + + if strip_before_validate: + md = strip_unknown_keys(md, oem_schema=schema) + + if validate: + from omi.validation import validate_metadata as _validate + + _validate(md, check_license=check_license) + + p = Path(output_file) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(json.dumps(md, indent=indent, ensure_ascii=ensure_ascii), encoding="utf-8") diff --git a/src/omi/creation/init.py b/src/omi/creation/init.py index 8be170f..d1e4007 100644 --- a/src/omi/creation/init.py +++ b/src/omi/creation/init.py @@ -7,17 +7,25 @@ from __future__ import annotations +import json from dataclasses import dataclass -from typing import TYPE_CHECKING +from pathlib import Path +from typing import TYPE_CHECKING, Union import yaml -from omi.base import get_metadata_specification +from omi.base import MetadataError, get_metadata_specification from omi.inspection import InspectionError, infer_metadata +from .utils import ( + collect_common_resource_fields, + dump_yaml, + load_yaml, + normalize_bounding_box_in_resource, +) + if TYPE_CHECKING: from collections.abc import Iterable - from pathlib import Path @dataclass @@ -122,6 +130,74 @@ def _dump_yaml(path: Path, data: dict, *, overwrite: bool) -> Path: return path +# --------------------------------------------------------------------------- +# Init from an existing OEMetadata JSON document +# --------------------------------------------------------------------------- + +_RESOURCE_KEYS_FROM_OEM: tuple[str, ...] = ( + "@id", + "name", + "topics", + "title", + "path", + "description", + "languages", + "subject", + "keywords", + "publicationDate", + "embargoPeriod", + "context", + "spatial", + "temporal", + "sources", + "licenses", + "contributors", + "type", + "format", + "encoding", + "schema", + "dialect", + "review", + "scheme", # used by tooling, not part of spec, but safe to keep +) + + +def _merge_known_resource_keys_from_oem(dst: dict, src: dict) -> dict: + """ + Copy a subset of resource keys from an existing OEMetadata JSON resource. + + Also normalizes the boundingBox, so later JSON Schema validation won't + fail on `['', '', '', '']`. + """ + for k in _RESOURCE_KEYS_FROM_OEM: + if k in src: + dst[k] = src[k] + normalize_bounding_box_in_resource(dst) + return dst + + +def _update_dataset_yaml_from_top_level(dataset_yaml_path: Path, top: dict) -> None: + """ + Enrich datasets/.dataset.yaml with top-level OEMetadata information. + + Copies: + - dataset.name / title / description / @id + + Does *not* copy metaMetadata, because that is owned by the spec and will + be added by OEMetadataCreator later. 
+ """ + doc = load_yaml(dataset_yaml_path) + ds = doc.get("dataset") or {} + + for key in ("name", "title", "description", "@id"): + value = top.get(key) + if value not in (None, ""): + ds[key] = value + + doc["dataset"] = ds + dump_yaml(dataset_yaml_path, doc) + + # ----------------------------- # public API # ----------------------------- @@ -227,3 +303,175 @@ def init_resources_from_files( outputs.append(_dump_yaml(out_path, res, overwrite=overwrite)) return outputs + + +def init_from_oem_json( + base_dir: Path, + dataset_id: str, + oem_json_path: Path, + *, + oem_version: str = "OEMetadata-2.0", + collect_common: bool = False, +) -> InitResult: + """ + Initialise split-YAML layout (dataset + template + resources) from an. + + existing OEMetadata JSON document that may contain multiple resources. + + Parameters + ---------- + base_dir : + Base metadata directory (contains `datasets/` and `resources/`). + dataset_id : + Identifier for `.dataset.yaml`, `.template.yaml` and the + `resources//` folder. + oem_json_path : + Path to the OEMetadata JSON file to import. + oem_version : + OEMetadata version string used for the spec/template. + collect_common : + If True, fields that are common across resources (context/spatial/ + temporal/sources/licenses/contributors) are hoisted into the template. + + Returns + ------- + InitResult + Paths to the dataset YAML, template YAML and created resource YAMLs. + """ + base_dir = Path(base_dir) + oem = json.loads(Path(oem_json_path).read_text(encoding="utf-8")) + + # 1) Create dataset + template stubs (from spec template) + init_result = init_dataset( + base_dir=base_dir, + dataset_id=dataset_id, + oem_version=oem_version, + resources=(), + overwrite=False, + ) + + # 2) Enrich dataset YAML from top-level OEMetadata info + _update_dataset_yaml_from_top_level(init_result.dataset_yaml, oem) + # metaMetadata stays handled centrally by OEMetadataCreator + + # 3) Create resource YAMLs from OEMetadata resources + resources = oem.get("resources", []) + res_dir = base_dir / "resources" / dataset_id + res_dir.mkdir(parents=True, exist_ok=True) + + created_resources: list[Path] = [] + for res in resources: + if not isinstance(res, dict): + continue + + raw_name = (res.get("name") or "").strip() + name = raw_name or Path(str(res.get("path", "resource"))).stem + + out: dict[str, object] = {"name": name} + out = _merge_known_resource_keys_from_oem(out, res) + + out_path = res_dir / f"{name}.resource.yaml" + created_resources.append(dump_yaml(out_path, out)) + + # 4) Optionally collect common fields (e.g. context/spatial/temporal/...) + if collect_common: + collect_common_resource_fields(base_dir, dataset_id) + + return InitResult( + dataset_yaml=init_result.dataset_yaml, + template_yaml=init_result.template_yaml, + resource_yamls=created_resources, + ) + + +def add_resource_from_oem_metadata( # noqa: PLR0913 + base_dir: Union[str, Path], + dataset_id: str, + oem: dict, + *, + resource_index: int = 0, + resource_name: str | None = None, + overwrite: bool = False, + oem_version: str = "OEMetadata-2.0", + fill_missing_from_template: bool = False, +) -> Path: + """ + Add a single resource YAML file to an existing dataset from an OEMetadata mapping. + + Notes + ----- + - The given OEMetadata object may be a complete OEP meta JSON document. + The top-level dataset fields (id, name, title, @id, @context, description, ...) + are ignored. + - Only the entry at ``oem["resources"][resource_index]`` is converted into + a ``.resource.yaml`` file. 
+ - If ``fill_missing_from_template=True``, the resource is first initialized + from the OEMetadata spec resource template (all keys present with empty + values) and then overlaid with the OEP values. This makes it easier to + see which fields are still missing when editing the YAML. + + Parameters + ---------- + base_dir : + Base directory containing ``datasets/`` and ``resources/``. + dataset_id : + ID of the local dataset (corresponds to ``resources//``). + oem : + OEMetadata mapping, e.g. directly from the OEP API. + resource_index : + Index within ``oem["resources"]``, default is 0. + resource_name : + Optional explicit resource name. If None, the name is taken from the + OEMetadata resource or derived from its ``path``. + overwrite : + If False (default) and the ``.resource.yaml`` already exists, a + ``FileExistsError`` is raised. + oem_version : + OEMetadata version string (e.g. ``"OEMetadata-2.0"``) used to select + the appropriate resource template when ``fill_missing_from_template`` + is True. + fill_missing_from_template : + If True, start from the blank resource template from the spec and + merge the OEP resource into it, so all known fields are visible + (with empty values where not provided). + + Returns + ------- + Path + Path to the created or overwritten resource YAML file. + """ + base_dir = Path(base_dir) + resources = oem.get("resources") or [] + if not resources: + msg = "OEMetadata document contains no resources." + raise MetadataError(msg) + + if resource_index < 0 or resource_index >= len(resources): + raise IndexError( + f"Resource index {resource_index} out of range for OEMetadata.resources (len={len(resources)}).", + ) + + res = resources[resource_index] + if not isinstance(res, dict): + msg = "OEMetadata resource entry is not a mapping." + raise MetadataError(msg) + + raw_name = resource_name or (res.get("name") or "").strip() + if not raw_name: + raw_name = Path(str(res.get("path", "resource"))).stem + + # Start either from a full blank resource template (all keys) or from a minimal dict + base = _resource_stub_from_spec(oem_version, raw_name) if fill_missing_from_template else {"name": raw_name} + + # Overlay OEP info onto that base + out: dict[str, object] = _merge_known_resource_keys_from_oem(base, res) + + res_dir = base_dir / "resources" / dataset_id + res_dir.mkdir(parents=True, exist_ok=True) + out_path = res_dir / f"{raw_name}.resource.yaml" + + if out_path.exists() and not overwrite: + raise FileExistsError(f"Resource YAML already exists: {out_path}") + + dump_yaml(out_path, out) + return out_path diff --git a/src/omi/creation/utils.py b/src/omi/creation/utils.py index fa59186..2677e15 100644 --- a/src/omi/creation/utils.py +++ b/src/omi/creation/utils.py @@ -14,12 +14,13 @@ import yaml if TYPE_CHECKING: - from collections.abc import Hashable + from collections.abc import Hashable, Iterable # --- deep merge helpers ------------------------------------------------------- # List keys we concatenate (resource + template) instead of replacing. 
DEFAULT_CONCAT_LIST_KEYS = {"keywords", "topics", "languages"} +OEM_BBOX_MIN_LENGTH = 4 def _hashable_key(x: object) -> Hashable | tuple: @@ -105,11 +106,13 @@ def deep_apply_template_to_resource( def apply_template_to_resources( resources: list[dict[str, object]], template: dict[str, object], + *, + concat_list_keys: Union[tuple[str, ...], set[str]] = DEFAULT_CONCAT_LIST_KEYS, ) -> list[dict[str, object]]: """Apply the same `template` to each resource in `resources`.""" if not template: return resources - return [deep_apply_template_to_resource(r, template) for r in resources] + return [deep_apply_template_to_resource(r, template, concat_list_keys=concat_list_keys) for r in resources] # --- YAML IO + discovery ------------------------------------------------------ @@ -293,3 +296,184 @@ def discover_dataset_ids_from_index(index_file: Union[str, Path]) -> list[str]: data = yaml.safe_load(f) or {} ds = data.get("datasets") or {} return sorted(ds.keys()) + + +def dump_yaml(path: Union[str, Path], data: dict[str, object]) -> Path: + """ + Write `data` as YAML to `path`, creating parent directories if needed. + + Returns + ------- + Path + The path that was written. + """ + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + yaml.safe_dump(data, sort_keys=False, allow_unicode=True), + encoding="utf-8", + ) + return path + + +def normalize_bounding_box_in_resource(resource: dict[str, object]) -> None: + """ + Ensure spatial.extent.boundingBox is JSON-schema friendly. + + Rules + ----- + - If boundingBox is a list of 4 empty-ish values -> [0, 0, 0, 0]. + - If boundingBox is 4 numbers -> keep as-is. + - Otherwise -> remove boundingBox (user can re-add a proper one). + """ + spatial = resource.get("spatial") + if not isinstance(spatial, dict): + return + + extent = spatial.get("extent") + if not isinstance(extent, dict): + return + + bbox = extent.get("boundingBox") + if bbox is None: + return + + if not isinstance(bbox, list) or len(bbox) != OEM_BBOX_MIN_LENGTH: + extent.pop("boundingBox", None) + return + + # all empty-ish values → default to zeros + if all(v in ("", None, "", 0, 0.0, False) for v in bbox): + extent["boundingBox"] = [0, 0, 0, 0] + return + + # mixed types → require all numbers, else drop + if not all(isinstance(v, (int, float)) for v in bbox): + extent.pop("boundingBox", None) + + +def _is_effectively_empty(value: object) -> bool: + """ + Return True if `value` is 'empty' in the sense of 'no opinion'. + + - None or "" -> empty + - list/tuple/set -> empty if all elements are empty + - dict -> empty if all values are empty + """ + if value is None: + return True + if isinstance(value, str): + return value.strip() == "" + if isinstance(value, (list, tuple, set)): + return len(value) == 0 or all(_is_effectively_empty(v) for v in value) + if isinstance(value, dict): + return len(value) == 0 or all(_is_effectively_empty(v) for v in value.values()) + return False + + +def _find_common_value_for_key( + docs: list[dict[str, object]], + key: str, + min_resources: int, +) -> tuple[object, list[int]] | None: + """ + For a given key, find the most common non-empty value across docs. + + Returns (value, indices) or None if there is no sufficiently common value. 
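+
+    Examples
+    --------
+    Illustrative only; the resource docs are hypothetical::
+
+        docs = [{"licenses": ["CC0"]}, {"licenses": ["CC0"]}, {"licenses": ["MIT"]}]
+        _find_common_value_for_key(docs, "licenses", min_resources=2)
+        # -> (["CC0"], [0, 1])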
+ """ + clusters: list[tuple[object, list[int]]] = [] + + for idx, d in enumerate(docs): + if key not in d or _is_effectively_empty(d[key]): + continue + v = d[key] + # try to find matching cluster + for c_val, indices in clusters: + if v == c_val: + indices.append(idx) + break + else: + # no matching cluster + clusters.append((v, [idx])) + + if not clusters: + return None + + c_val, indices = max(clusters, key=lambda pair: len(pair[1])) + if len(indices) < min_resources: + return None + + return c_val, indices + + +def collect_common_resource_fields( + base_dir: Union[str, Path], + dataset_id: str, + *, + keys: Iterable[str] = ("context", "spatial", "temporal", "sources", "licenses", "contributors"), + min_resources: int = 2, +) -> None: + """ + Hoist common top-level fields from resource YAMLs into the dataset template. + + Rules (per key): + - Look at resources//*.resource.yaml + - Ignore resources where the value is 'effectively empty'. + - Group non-empty values by structural equality (==). + - Pick the value that occurs most often. + - If it appears in at least `min_resources` resources: + * write that key/value into datasets/.template.yaml + * delete that key from any resource that has that value. + + This allows scenarios like: + - 9 resources share the same `context`, 1 has a special `context`: + -> shared one goes to template, 9 resources drop `context`, + the special one keeps its own. + """ + base = Path(base_dir) + res_dir = base / "resources" / dataset_id + template_path = base / "datasets" / f"{dataset_id}.template.yaml" + + if not res_dir.exists() or not template_path.exists(): + return + + resource_paths = sorted(res_dir.glob("*.resource.yaml")) + if not resource_paths: + return + + docs = [load_yaml(p) for p in resource_paths] + adjusted = [deepcopy(d) for d in docs] + common: dict[str, object] = {} + + for key in keys: + result = _find_common_value_for_key(docs, key, min_resources=min_resources) + if result is None: + continue + + c_val, indices = result + common[key] = c_val + + # delete that key from those resources that have this common value + for i in indices: + if key in adjusted[i] and adjusted[i][key] == c_val: + del adjusted[i][key] + + if not common: + return + + # merge common values into template + tmpl = load_yaml(template_path) + for k, v in common.items(): + tmpl[k] = v + + template_path.write_text( + yaml.safe_dump(tmpl, sort_keys=False, allow_unicode=True), + encoding="utf-8", + ) + + # write back updated resources + for p, doc in zip(resource_paths, adjusted): + p.write_text( + yaml.safe_dump(doc, sort_keys=False, allow_unicode=True), + encoding="utf-8", + ) diff --git a/tests/test_base.py b/tests/test_base.py index c6faae6..245e501 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -15,7 +15,7 @@ def test_metadata_from_oep_non_existing_table(): """Test error for non existing table.""" with pytest.raises( base.MetadataError, - match="Could not retrieve metadata from OEP table 'model_draft.non_existing_table'.", + match="Could not retrieve metadata from OEP table 'non_existing_table'.", ): base.get_metadata_from_oep_table("non_existing_table") @@ -24,6 +24,6 @@ def deactivate_test_metadata_from_oep_empty(): """Test error for empty metadata.""" with pytest.raises( base.MetadataError, - match="Metadata from 'model_draft.bnetza_eeg_anlagenstammdaten_wind_classification' is empty.", + match="Metadata from 'bnetza_eeg_anlagenstammdaten_wind_classification' is empty.", ): 
base.get_metadata_from_oep_table("bnetza_eeg_anlagenstammdaten_wind_classification")