diff --git a/.gitignore b/.gitignore
index b3f3e57d9..76c22fc60 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 .idea/
+.nox/
 __pycache__
 *.egg-info/
 bfabric/scripts/query_result.txt
diff --git a/bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py b/bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py
new file mode 100644
index 000000000..3f0521207
--- /dev/null
+++ b/bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py
@@ -0,0 +1,73 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, assert_never
+
+import polars as pl
+
+if TYPE_CHECKING:
+    from pathlib import Path
+
+    from bfabric_app_runner.specs.outputs.annotations import BfabricOutputDataset, IncludeDatasetRef, IncludeResourceRef
+
+
+def _validate_table_schema(df: pl.DataFrame) -> None:
+    """Validate that required columns exist and have correct types."""
+    if "Resource" not in df.columns:
+        raise ValueError("Missing required column: Resource")
+
+    if not df["Resource"].dtype.is_integer():
+        raise ValueError(f"Column 'Resource' must be integer type, got {df['Resource'].dtype}")
+
+    if df["Resource"].null_count() > 0:
+        raise ValueError("Column 'Resource' cannot contain null values")
+
+    if "Anchor" in df.columns and df["Anchor"].dtype not in (pl.Null, pl.String):
+        raise ValueError(f"Column 'Anchor' must be String type, got {df['Anchor'].dtype}")
+
+
+def _load_table(ref: IncludeDatasetRef) -> pl.DataFrame:
+    format = ref.get_format()
+    # TODO maybe this should be a generic function somewhere; it's probably duplicated with the input specs!
+    match format:
+        case "csv":
+            df = pl.read_csv(ref.local_path)
+        case "tsv":
+            df = pl.read_csv(ref.local_path, separator="\t")
+        case "parquet":
+            df = pl.read_parquet(ref.local_path)
+        case _:
+            assert_never(format)
+    _validate_table_schema(df)
+    return df
+
+
+def _get_resource_row(resource: IncludeResourceRef, resource_mapping: dict[Path, int]) -> dict[str, int | str | None]:
+    # TODO check if accessing store_entry_path like this is robust enough (see comment below)
+    resource_id = resource_mapping[resource.store_entry_path]
+    return {"Resource": resource_id, "Anchor": resource.anchor, **resource.metadata}
+
+
+# TODO the resource mapping has some challenges regarding misuse of non-canonical paths if it's not a model,
+# e.g. they should never start with `/`, but this could also be validated in the originating model
+
+
+def generate_output_table(config: BfabricOutputDataset, resource_mapping: dict[Path, int]) -> pl.DataFrame:
+    """Generates the output table contents for the specified output dataset configuration.
+
+    :param config: the output dataset configuration
+    :param resource_mapping: a mapping of store_entry_path to resource_id of the resources which were created
+    :return: the output table contents
+    """
+    # read the input tables
+    tables_df = pl.concat([_load_table(ref) for ref in config.include_tables], how="diagonal_relaxed")
+
+    # generate the rows for the resources
+    resource_rows = []
+    for resource in config.include_resources:
+        resource_rows.append(_get_resource_row(resource, resource_mapping))
+    resources_df = pl.from_dicts(resource_rows, strict=False)
+    _validate_table_schema(resources_df)
+
+    # TODO the schema handling could maybe be relaxed a tiny bit
+    # concatenate these two dataframes now
+    return pl.concat([tables_df, resources_df], how="diagonal_relaxed")
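For orientation on the concatenation used above: pl.concat(..., how="diagonal_relaxed") unions the column sets of the included tables and the generated resource rows and fills missing cells with nulls. A minimal, self-contained sketch with invented values (not part of this diff):

    import polars as pl

    included = pl.DataFrame({"Resource": [100, 200], "Anchor": [None, "#example"]})
    # One row per registered resource, with its metadata as extra columns (the shape _get_resource_row builds).
    resource_rows = pl.from_dicts([{"Resource": 300, "Anchor": None, "prop": "value"}], strict=False)

    # "diagonal_relaxed" unions the column sets and relaxes dtypes, filling missing cells with null.
    combined = pl.concat([included, resource_rows], how="diagonal_relaxed")
    print(combined)  # 3 rows, columns: Resource, Anchor, prop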
diff --git a/bfabric_app_runner/src/bfabric_app_runner/output_registration/register.py b/bfabric_app_runner/src/bfabric_app_runner/output_registration/register.py
index 9a343f382..39e40468a 100644
--- a/bfabric_app_runner/src/bfabric_app_runner/output_registration/register.py
+++ b/bfabric_app_runner/src/bfabric_app_runner/output_registration/register.py
@@ -1,26 +1,29 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, assert_never
 
 import yaml
+from bfabric.entities import Resource, Storage, Workunit
+from bfabric.experimental.upload_dataset import bfabric_save_csv2dataset
 from loguru import logger
+from pydantic import BaseModel
 
-from bfabric.entities import Resource
-from bfabric.entities import Storage, Workunit
-from bfabric.experimental.upload_dataset import bfabric_save_csv2dataset
+from bfabric_app_runner.output_registration.annotation_table import generate_output_table
+from bfabric_app_runner.specs.outputs.annotations import AnnotationType, BfabricOutputDataset
 from bfabric_app_runner.specs.outputs_spec import (
     CopyResourceSpec,
-    UpdateExisting,
     OutputsSpec,
-    SpecType,
     SaveDatasetSpec,
     SaveLinkSpec,
+    SpecType,
+    UpdateExisting,
 )
 from bfabric_app_runner.util.checksums import md5sum
 from bfabric_app_runner.util.scp import scp
 
 if TYPE_CHECKING:
     from pathlib import Path
+
     from bfabric import Bfabric
     from bfabric.experimental.workunit_definition import WorkunitDefinition
 
@@ -37,8 +40,8 @@ def register_file_in_workunit(
     client: Bfabric,
     workunit_definition: WorkunitDefinition,
     resource_id: int | None = None,
-) -> None:
-    """Registers a file in the workunit."""
+) -> int:
+    """Registers a file in the workunit and returns the resource ID."""
     existing_id = _identify_existing_resource_id(client, spec, workunit_definition)
     if resource_id is not None and existing_id is not None and resource_id != existing_id:
         raise ValueError(f"Resource id {resource_id} does not match existing resource id {existing_id}")
@@ -59,7 +62,8 @@ def register_file_in_workunit(
 
     if existing_id is not None:
         resource_data["id"] = existing_id
-    client.save("resource", resource_data)
+    result = client.save("resource", resource_data)
+    return result[0]["id"]
 
 
 def _identify_existing_resource_id(
@@ -145,7 +149,7 @@ def _save_link(spec: SaveLinkSpec, client: Bfabric, workunit_definition: Workuni
     if existing_link_id is not None:
         link_data["id"] = existing_link_id
     res = client.save("link", link_data)
-    logger.info(f"Link {spec.name} saved with id {res['id']} for entity {entity_type} with id {entity_id}")
+    logger.info(f"Link {spec.name} saved with id {res[0]['id']} for entity {entity_type} with id {entity_id}")
 
 
 def find_default_resource_id(workunit_definition: WorkunitDefinition, client: Bfabric) -> int | None:
@@ -160,6 +164,11 @@ def find_default_resource_id(workunit_definition: WorkunitDefinition, client: Bf
     return None
 
 
+class RegisterAllOutputs(BaseModel):
+    resources_mapping: dict[Path, int] = {}
+    """This maps the store_entry_path to the ids of the resources which were created or updated."""
+
+
 def register_all(
     client: Bfabric,
     workunit_definition: WorkunitDefinition,
@@ -167,13 +176,17 @@
     ssh_user: str | None,
     reuse_default_resource: bool,
    force_storage: Path | None,
-) -> None:
+) -> RegisterAllOutputs:
     """Registers all the output specs to the workunit."""
     default_resource_was_reused = not reuse_default_resource
     storage = _get_storage(client, force_storage, specs_list, workunit_definition)
     logger.info(f"Using storage: {storage}")
+    # TODO maybe the naming "created" is a bit confusing, because the resource could have existed beforehand
+    # created_resources_mapping: dict[Path, int] = {}
+    outputs = RegisterAllOutputs()
+
     for spec in specs_list:
         logger.debug(f"Registering {spec}")
@@ -188,7 +201,8 @@ def register_all(
                 default_resource_was_reused = True
             else:
                 resource_id = None
-            register_file_in_workunit(
+
+            outputs.resources_mapping[spec.store_entry_path] = register_file_in_workunit(
                 spec,
                 client=client,
                 workunit_definition=workunit_definition,
@@ -199,7 +213,13 @@
         elif isinstance(spec, SaveLinkSpec):
             _save_link(spec, client, workunit_definition=workunit_definition)
         else:
-            raise ValueError(f"Unknown spec type: {type(spec)}")
+            assert_never(type(spec))
+
+    return outputs
+
+    # TODO register the dataset... now this gets tricky: how should we handle the case when there is an existing dataset?
+    #  - it could still be modifiable, or it could no longer be modifiable
+    #  -> recompute and then have logic for handling it
 
 
 def _get_storage(
@@ -215,6 +235,22 @@
     return None
 
 
+def _save_annotations(outputs: RegisterAllOutputs, annotations: list[AnnotationType], client: Bfabric) -> None:
+    for annotation in annotations:
+        if isinstance(annotation, BfabricOutputDataset):
+            output_df = generate_output_table(config=annotation, resource_mapping=outputs.resources_mapping)
+
+            # TODO we should also check if there is an id already
+            existing_dataset = None
+            # TODO and now we need to handle this, maybe with the old save/update logic from savedataset
+            # TODO but it will need some logic for handling the already existing dataset
+            # TODO the final step will be attaching this dataset to the workunit
+
+            _ = output_df
+            _ = existing_dataset
+            raise NotImplementedError
+
+
 def register_outputs(
     outputs_yaml: Path,
     workunit_definition: WorkunitDefinition,
@@ -224,12 +260,16 @@
     force_storage: Path | None,
 ) -> None:
     """Registers outputs to the workunit."""
-    specs_list = OutputsSpec.read_yaml(outputs_yaml)
-    register_all(
+    with outputs_yaml.open("r") as file:
+        spec = OutputsSpec.model_validate(yaml.safe_load(file))
+
+    outputs = register_all(
         client=client,
         workunit_definition=workunit_definition,
-        specs_list=specs_list,
+        specs_list=spec.outputs,
         ssh_user=ssh_user,
         reuse_default_resource=reuse_default_resource,
         force_storage=force_storage,
     )
+    # TODO and here we create the dataset mapping
+    _save_annotations(outputs, spec.annotations, client=client)
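The mapping returned by register_all() is keyed by store_entry_path; generate_output_table() (earlier in this diff) resolves the "Resource" column from exactly those keys. A minimal sketch of that contract, with an invented id and file name:

    from pathlib import Path

    from bfabric_app_runner.output_registration.register import RegisterAllOutputs

    # register_all() fills this mapping as it registers CopyResourceSpec entries.
    outputs = RegisterAllOutputs(resources_mapping={Path("custom.txt"): 300})

    # _save_annotations() later looks up resources by the same key.
    assert outputs.resources_mapping[Path("custom.txt")] == 300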
diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/inputs/static_file_spec.py b/bfabric_app_runner/src/bfabric_app_runner/specs/inputs/static_file_spec.py
index aea8c9fc5..fbeba2903 100644
--- a/bfabric_app_runner/src/bfabric_app_runner/specs/inputs/static_file_spec.py
+++ b/bfabric_app_runner/src/bfabric_app_runner/specs/inputs/static_file_spec.py
@@ -11,6 +11,7 @@ class StaticFileSpec(BaseModel):
     type: Literal["static_file"] = "static_file"
 
     content: str | bytes
+    """The text or binary content to write."""
     filename: str
     """The target filename to write to."""
 
diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/outputs/__init__.py b/bfabric_app_runner/src/bfabric_app_runner/specs/outputs/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py b/bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py
new file mode 100644
index 000000000..6558aaf48
--- /dev/null
+++ b/bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py
@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+import typing
+from pathlib import Path
+from typing import ClassVar, Literal
+
+from pydantic import BaseModel, model_validator
+
+from bfabric_app_runner.specs.outputs_spec import UpdateExisting
+
+
+class IncludeDatasetRef(BaseModel):
+    Formats: ClassVar = Literal["csv", "tsv", "parquet"]
+
+    local_path: Path
+    format: Formats | None = None
+
+    # TODO decide if this is the correct place or whether it should be one level higher
+    update_existing: UpdateExisting = UpdateExisting.IF_EXISTS
+
+    def get_format(self) -> Formats:
+        """Returns the format, inferring it from the file extension if it is not specified explicitly."""
+        if self.format is not None:
+            return self.format
+        return self.local_path.suffix.removeprefix(".")
+
+    @model_validator(mode="after")
+    def _check_format_or_correct_suffix(self) -> IncludeDatasetRef:
+        allowed_formats = typing.get_args(self.Formats)
+        if self.format is None and self.local_path.suffix.removeprefix(".") not in allowed_formats:
+            msg = f"When format is not specified, the file extension must be one of {allowed_formats}"
+            raise ValueError(msg)
+        return self
+
+
+class IncludeResourceRef(BaseModel):
+    store_entry_path: Path
+    # TODO None vs empty string
+    anchor: str | None = None
+    metadata: dict[str, str] = {}
+
+
+class BfabricOutputDataset(BaseModel):
+    # TODO since there is only one output annotation, we cannot set the default value yet, because
+    #  adding more types later would otherwise be a breaking change.
+    # type: Literal["bfabric_output_dataset"] = "bfabric_output_dataset"
+    type: Literal["bfabric_output_dataset"]
+    include_tables: list[IncludeDatasetRef]
+    include_resources: list[IncludeResourceRef]
+
+
+AnnotationType = BfabricOutputDataset
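To make the shape of the new annotation spec concrete, a small sketch constructing the models above directly; the paths, anchor, and metadata values are invented for illustration:

    from pathlib import Path

    from bfabric_app_runner.specs.outputs.annotations import (
        BfabricOutputDataset,
        IncludeDatasetRef,
        IncludeResourceRef,
    )

    annotation = BfabricOutputDataset(
        type="bfabric_output_dataset",
        include_tables=[IncludeDatasetRef(local_path=Path("results/summary.csv"))],
        include_resources=[
            IncludeResourceRef(store_entry_path=Path("report.html"), anchor="#qc", metadata={"Sample": "A"})
        ],
    )
    # The format is inferred from the ".csv" suffix because it was not given explicitly.
    assert annotation.include_tables[0].get_format() == "csv"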
diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py b/bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py
index 7ab7d07a6..9b21b3027 100644
--- a/bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py
+++ b/bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py
@@ -1,12 +1,15 @@
 from __future__ import annotations
 
 import enum
+from collections import Counter
 from pathlib import Path  # noqa: TCH003
-from typing import Literal, Annotated
+from typing import Annotated, Literal
 
 import yaml
 from pydantic import BaseModel, ConfigDict, Field, model_validator
 
+from bfabric_app_runner.specs.outputs.annotations import AnnotationType
+
 
 class UpdateExisting(enum.Enum):
     NO = "no"
@@ -45,6 +48,7 @@ class SaveDatasetSpec(BaseModel):
     invalid_characters: str = ""
 
 
+# TODO deprecate!
 class SaveLinkSpec(BaseModel):
     """Saves a link to the workunit, or, if desired to an arbitrary entity of type entity_type with id entity_id."""
 
@@ -74,13 +78,28 @@ def require_entity_id_if_not_workunit(self) -> SaveLinkSpec:
 class OutputsSpec(BaseModel):
     model_config = ConfigDict(extra="forbid")
     outputs: list[SpecType]
+    annotations: list[AnnotationType] = []
 
-    @classmethod
-    def read_yaml(cls, path: Path) -> list[SpecType]:
-        model = cls.model_validate(yaml.safe_load(path.read_text()))
-        return model.outputs
+    # @classmethod
+    # def read_yaml(cls, path: Path) -> list[SpecType]:
+    #     model = cls.model_validate(yaml.safe_load(path.read_text()))
+    #     return model.outputs
 
     @classmethod
     def write_yaml(cls, specs: list[SpecType], path: Path) -> None:
         model = cls.model_validate(dict(outputs=specs))
         path.write_text(yaml.dump(model.model_dump(mode="json")))
+
+    @model_validator(mode="after")
+    def _no_duplicate_store_entry_paths(self) -> OutputsSpec:
+        # arguably, this is a bit strict and might be relaxed in the future
+        # TODO although the main scenario where this could be a problem is when store_folder_path is used and maybe we
+        #  should handle this differently?
+        resource_specs = list(filter(lambda x: isinstance(x, CopyResourceSpec), self.outputs))
+        store_entry_paths = [spec.store_entry_path for spec in resource_specs]
+        # check for duplicates
+        duplicates = [path for path, count in Counter(store_entry_paths).items() if count > 1]
+        if duplicates:
+            msg = f"Duplicate store entry paths: {duplicates}"
+            raise ValueError(msg)
+        return self
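A hedged sketch of the new duplicate check in action. It assumes CopyResourceSpec can be constructed from just local_path and store_entry_path (its other fields are not shown in this diff and are presumed to have defaults); the file names are invented:

    import pytest

    from bfabric_app_runner.specs.outputs_spec import CopyResourceSpec, OutputsSpec

    # Two copy specs that would collide on the same store entry path.
    first = CopyResourceSpec(local_path="a.txt", store_entry_path="same.txt")
    second = CopyResourceSpec(local_path="b.txt", store_entry_path="same.txt")

    # The new model validator rejects the spec as a whole (pydantic wraps the ValueError).
    with pytest.raises(ValueError, match="Duplicate store entry paths"):
        OutputsSpec(outputs=[first, second])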
diff --git a/tests/bfabric_app_runner/output_registration/test_annotation_table.py b/tests/bfabric_app_runner/output_registration/test_annotation_table.py
new file mode 100644
index 000000000..729549f58
--- /dev/null
+++ b/tests/bfabric_app_runner/output_registration/test_annotation_table.py
@@ -0,0 +1,45 @@
+import pytest
+import polars as pl
+import polars.testing
+from pathlib import Path
+from bfabric_app_runner.specs.outputs.annotations import BfabricOutputDataset, IncludeDatasetRef, IncludeResourceRef
+from bfabric_app_runner.output_registration.annotation_table import generate_output_table
+
+
+class TestGenerateOutputTable:
+    @pytest.fixture
+    def input_table_path(self, tmp_path):
+        table = pl.DataFrame(
+            {"Resource": [100, 200], "Anchor": [None, "#example"], "My custom column": ["value1", "value2"]}
+        )
+        path = tmp_path / "input" / "table.parquet"
+        path.parent.mkdir()
+        table.write_parquet(path)
+        return path
+
+    @pytest.fixture()
+    def spec(self, input_table_path):
+        return BfabricOutputDataset(
+            type="bfabric_output_dataset",
+            include_tables=[IncludeDatasetRef(local_path=input_table_path)],
+            include_resources=[IncludeResourceRef(store_entry_path="custom.txt", metadata={"prop": "value"})],
+        )
+
+    @pytest.fixture()
+    def resource_mapping(self) -> dict[Path, int]:
+        return {Path("custom.txt"): 300}
+
+    def test_generate_output_table(self, spec, resource_mapping):
+        table = generate_output_table(spec, resource_mapping)
+        assert table.columns == ["Resource", "Anchor", "My custom column", "prop"]
+        polars.testing.assert_frame_equal(
+            table,
+            pl.DataFrame(
+                {
+                    "Resource": [100, 200, 300],
+                    "Anchor": [None, "#example", None],
+                    "My custom column": ["value1", "value2", None],
+                    "prop": [None, None, "value"],
+                }
+            ),
+        )
diff --git a/tests/bfabric_app_runner/specs/outputs/test_annotations.py b/tests/bfabric_app_runner/specs/outputs/test_annotations.py
new file mode 100644
index 000000000..08f857754
--- /dev/null
+++ b/tests/bfabric_app_runner/specs/outputs/test_annotations.py
@@ -0,0 +1,25 @@
+from pathlib import Path
+import pytest
+from bfabric_app_runner.specs.outputs.annotations import IncludeDatasetRef
+
+
+class TestIncludeDatasetRef:
+    @pytest.mark.parametrize("format", ["csv", "tsv", "parquet"])
+    def test_get_format_when_unspecified(self, format):
+        ref = IncludeDatasetRef(local_path=Path(f"dummy.{format}"), format=None)
+        assert ref.format is None
+        assert ref.get_format() == format
+
+    @pytest.mark.parametrize("format", ["csv", "tsv", "parquet"])
+    def test_get_format_when_specified(self, format):
+        ref = IncludeDatasetRef(local_path=Path("dummy.png"), format=format)
+        assert ref.format == format
+        assert ref.get_format() == format
+
+    def test_validate_invalid_suffix_when_format_unspecified(self):
+        with pytest.raises(ValueError) as err:
+            _ = IncludeDatasetRef(local_path=Path("dummy.png"), format=None)
+        assert (
+            err.value.errors()[0]["msg"]
+            == "Value error, When format is not specified, the file extension must be one of ('csv', 'tsv', 'parquet')"
+        )
diff --git a/tests/bfabric_app_runner/specs/test_outputs_spec.py b/tests/bfabric_app_runner/specs/test_outputs_spec.py
index ade870ea3..f32a54a53 100644
--- a/tests/bfabric_app_runner/specs/test_outputs_spec.py
+++ b/tests/bfabric_app_runner/specs/test_outputs_spec.py
@@ -28,7 +28,8 @@ def parsed() -> OutputsSpec:
 
 @pytest.fixture()
 def serialized() -> str:
-    return """outputs:
+    return """annotations: []
+outputs:
 - local_path: local_path
   protocol: scp
   store_entry_path: store_entry_path
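A short sketch of why the serialized fixture now starts with "annotations: []": write_yaml dumps the whole model, the new field defaults to an empty list, and yaml.dump sorts keys alphabetically. The empty outputs list below is only for brevity; nothing here goes beyond the models in this diff:

    import yaml

    from bfabric_app_runner.specs.outputs_spec import OutputsSpec

    model = OutputsSpec(outputs=[])
    print(yaml.dump(model.model_dump(mode="json")))
    # annotations: []
    # outputs: []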