diff --git a/src/hermes/model/types/__init__.py b/src/hermes/model/types/__init__.py index 67d2acc0..8ab05171 100644 --- a/src/hermes/model/types/__init__.py +++ b/src/hermes/model/types/__init__.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileContributor: Michael Meinel +# SPDX-FileContributor: Michael Fritzsche from datetime import date, time, datetime @@ -19,57 +20,51 @@ lambda c: isinstance(c, ld_container), { "ld_container": lambda c, **_: c, - "json": lambda c, **_: c.compact(), "expanded_json": lambda c, **_: c.ld_value, } ), - # Wrap expanded_json to ld_container - (ld_container.is_ld_id, dict(python=lambda c, **_: c[0]['@id'])), - (ld_container.is_typed_ld_value, dict(python=ld_container.typed_ld_to_py)), - (ld_container.is_ld_value, dict(python=lambda c, **_: c[0]['@value'])), - (ld_list.is_ld_list, dict(ld_container=ld_list)), - (ld_dict.is_ld_dict, dict(ld_container=ld_dict)), - - # Expand and access JSON data - (ld_container.is_json_id, dict(python=lambda c, **_: c["@id"], expanded_json=lambda c, **_: [c])), - (ld_container.is_typed_json_value, dict(python=ld_container.typed_ld_to_py)), - (ld_container.is_json_value, dict(python=lambda c, **_: c["@value"], expanded_json=lambda c, **_: [c])), - (ld_list.is_container, dict(ld_container=lambda c, **kw: ld_list([c], **kw))), - - # FIXME: add conversion from list and json dict to expanded_json - # to parse nested dicts and lists when using for example __setitem__(key, value) from ld_dict - # where value is converted to expanded_json bevor adding it to data_dict - # Suggested: - # ( - # ld_dict.is_json_dict, - # { - # "ld_container": ld_dict.from_dict, - # "expanded_json": lambda c, **kw: kw["parent"]._to_expanded_json(kw["key"], ld_dict.from_dict(c, **kw)) - # } - # ), - # - # ( - # lambda c: isinstance(c, list), - # { - # "ld_container": ld_list.from_list, - # "expanded_json": lambda c, **kw: kw["parent"]._to_expanded_json(kw["key"], ld_list.from_list(c, **kw)) - # } - # ), - 
(ld_dict.is_json_dict, dict(ld_container=ld_dict.from_dict)), - - (lambda c: isinstance(c, list), dict(ld_container=ld_list.from_list)), - - # Wrap internal data types - (lambda v: isinstance(v, (int, float, str, bool)), dict(expanded_json=lambda v, **_: [{"@value": v}])), - - (lambda v: isinstance(v, datetime), - dict(expanded_json=lambda v, **_: [{"@value": v.isoformat(), "@type": iri_map["schema:DateTime"]}])), - (lambda v: isinstance(v, date), - dict(expanded_json=lambda v, **_: [{"@value": v.isoformat(), "@type": iri_map["schema:Date"]}])), - (lambda v: isinstance(v, time), - dict(expanded_json=lambda v, **_: [{"@value": v.isoformat(), "@type": iri_map["schema:Time"]}])), + # Wrap item from ld_dict in ld_list + (ld_list.is_ld_list, {"ld_container": ld_list}), + (lambda c: isinstance(c, list), {"ld_container": lambda c, **kw: ld_list(c, **kw)}), + + # pythonize items from lists (expanded set is already handled above) + (ld_container.is_json_id, {"python": lambda c, **_: c["@id"]}), + (ld_container.is_typed_json_value, {"python": lambda c, **kw: ld_container.typed_ld_to_py([c], **kw)}), + (ld_container.is_json_value, {"python": lambda c, **_: c["@value"]}), + (ld_list.is_container, {"ld_container": lambda c, **kw: ld_list([c], **kw)}), + (ld_dict.is_json_dict, {"ld_container": lambda c, **kw: ld_dict([c], **kw)}), + (lambda v: isinstance(v, str), {"python": lambda v, parent, **_: parent.ld_proc.compact_iri(parent.active_ctx, v)}), + + # Convert internal data types to expanded_json + (ld_container.is_json_id, {"expanded_json": lambda c, **_: [c]}), + (ld_container.is_ld_id, {"expanded_json": lambda c, **_: c}), + (ld_container.is_json_value, {"expanded_json": lambda c, **_: [c]}), + (ld_container.is_ld_value, {"expanded_json": lambda c, **_: c}), + (ld_dict.is_json_dict, {"expanded_json": lambda c, **kw: ld_dict.from_dict(c, **kw).ld_value}), + ( + ld_list.is_container, + {"expanded_json": lambda c, **kw: ld_list.from_list(ld_list.get_item_list_from_container(c), 
**kw).ld_value} + ), + ( + ld_list.is_ld_list, + {"expanded_json": lambda c, **kw: ld_list.from_list(ld_list.get_item_list_from_container(c[0]), **kw).ld_value} + ), + (lambda c: isinstance(c, list), {"expanded_json": lambda c, **kw: ld_list.from_list(c, **kw).ld_value}), + (lambda v: isinstance(v, (int, float, str, bool)), {"expanded_json": lambda v, **_: [{"@value": v}]}), + ( + lambda v: isinstance(v, datetime), + {"expanded_json": lambda v, **_: [{"@value": v.isoformat(), "@type": iri_map["schema:DateTime"]}]} + ), + ( + lambda v: isinstance(v, date), + {"expanded_json": lambda v, **_: [{"@value": v.isoformat(), "@type": iri_map["schema:Date"]}]} + ), + ( + lambda v: isinstance(v, time), + {"expanded_json": lambda v, **_: [{"@value": v.isoformat(), "@type": iri_map["schema:Time"]}]} + ), ] diff --git a/src/hermes/model/types/ld_container.py b/src/hermes/model/types/ld_container.py index eb5be351..88d92795 100644 --- a/src/hermes/model/types/ld_container.py +++ b/src/hermes/model/types/ld_container.py @@ -3,31 +3,91 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileContributor: Michael Meinel +# SPDX-FileContributor: Michael Fritzsche from .pyld_util import JsonLdProcessor, bundled_loader +from datetime import date, time, datetime +from typing import Union, Self, Any + + +JSON_LD_CONTEXT_DICT = dict[str, Union[str, "JSON_LD_CONTEXT_DICT"]] +BASIC_TYPE = Union[str, float, int, bool] +EXPANDED_JSON_LD_VALUE = list[dict[str, Union["EXPANDED_JSON_LD_VALUE", BASIC_TYPE]]] +COMPACTED_JSON_LD_VALUE = Union[ + list[Union[dict[str, Union["COMPACTED_JSON_LD_VALUE", BASIC_TYPE]], BASIC_TYPE]], + dict[str, Union["COMPACTED_JSON_LD_VALUE", BASIC_TYPE]], +] +TIME_TYPE = Union[datetime, date, time] +JSON_LD_VALUE = Union[ + list[Union["JSON_LD_VALUE", BASIC_TYPE, TIME_TYPE, "ld_container"]], + dict[str, Union["JSON_LD_VALUE", BASIC_TYPE, TIME_TYPE, "ld_container"]], +] +PYTHONIZED_LD_CONTAINER = Union[ + list[Union["PYTHONIZED_LD_CONTAINER", BASIC_TYPE, TIME_TYPE]], + 
dict[str, Union["PYTHONIZED_LD_CONTAINER", BASIC_TYPE, TIME_TYPE]], +] + class ld_container: """ Base class for Linked Data containers. - A linked data container implements a view on the expanded form of an JSON-LD document. - It allows to easily interact with such documents by hiding all the nesting and - automatically mapping between different forms. + A linked data container impelements a view on the expanded form of an JSON-LD document. + It allows to easily interacts them by hinding all the nesting and automatically mapping + between different forms. + + :ivar active_ctx: The active context that is used by the json-ld processor. + :ivar context: The context exclusive to this ld_container and all its childs + (it can still be the same as e.g. parent.context) + :ivartype context: list[str | JSON_LD_CONTEXT_DICT] + :ivar full_context: The context of this ld_container and all its parents merged into one list. + :ivartype full_context: list[str | JSON_LD_CONTEXT_DICT] + :ivar index: The index into the parent container if it is a list. + :ivartype index: int + :ivar key: The key into the inner most parent that is a dict of this ld_container. + :ivartype key: str + :ivar ld_value: The expanded JSON-LD value this object represents. + :ivartype ld_value: EXPANDED_JSON_LD_VALUE + :ivar parent: The ld_container this one is directly contained in. + :ivartype parent: ld_container + :ivar path: The path from the outer most parent to this ld_container. + :ivartype path: list[str | int] + + :cvar ld_proc: The JSON-LD processor object for all ld_container. 
+ :cvartype ld_proc: JsonLdProcessor """ ld_proc = JsonLdProcessor() - def __init__(self, data, *, parent=None, key=None, index=None, context=None): + def __init__( + self: Self, + data: EXPANDED_JSON_LD_VALUE, + *, + parent: Union["ld_container", None] = None, + key: Union[str, None] = None, + index: Union[int, None] = None, + context: Union[list[Union[str, JSON_LD_CONTEXT_DICT]], None] = None, + ) -> None: """ Create a new instance of an ld_container. + :param self: The instance of ld_container to be initialized. + :type self: Self :param data: The expanded json-ld data that is mapped. - :param parent: Optional parent node of this container. - :param key: Optional key into the parent container. - :param context: Optional local context for this container. + :type data: EXPANDED_JSON_LD_VALUE + :param parent: parent node of this container. + :type parent: ld_container | None + :param key: key into the parent container. + :type key: str | None + :param index: index into the parent container. + :type index: int | None + :param context: local context for this container. + :type context: list[str | JSON_LD_CONTEXT_DICT] | None + + :return: + :rtype: None """ - # Store basic data self.parent = parent self.key = key @@ -41,153 +101,431 @@ def __init__(self, data, *, parent=None, key=None, index=None, context=None): if self.parent: if self.context: self.active_ctx = self.ld_proc.process_context( - self.parent.active_ctx, - self.context, - {"documentLoader": bundled_loader}) + self.parent.active_ctx, self.context, {"documentLoader": bundled_loader} + ) else: self.active_ctx = parent.active_ctx else: - self.active_ctx = self.ld_proc.initial_ctx( - self.full_context, - {"documentLoader": bundled_loader} - ) + self.active_ctx = self.ld_proc.initial_ctx(self.full_context, {"documentLoader": bundled_loader}) + + def add_context(self: Self, context: list[Union[str | JSON_LD_CONTEXT_DICT]]) -> None: + """ + Add the given context to the ld_container. 
- def add_context(self, context): + :param self: The ld_container the context should be added to. + :type self: Self + :param context: The context to be added to self. + :type context: list[str | JSON_LD_CONTEXT_DICT] + + :return: + :rtype: None + """ self.context = self.merge_to_list(self.context, context) - self.active_ctx = self.ld_proc.process_context( - self.active_ctx, - context, - {"documentLoader": bundled_loader} - ) + self.active_ctx = self.ld_proc.process_context(self.active_ctx, context, {"documentLoader": bundled_loader}) @property - def full_context(self): + def full_context(self: Self) -> list[Union[str, JSON_LD_CONTEXT_DICT]]: + """ + Return the context of the ld_container merged with the full_context of its parent. + + :param self: The ld_container whose full_context is returned + :type self: Self + + :return: The context of the ld_container merged with the full_context of its parent via + ld_container.merge_to_list or just the context of this ld_container if self.parent is None. + :rtype: list[str | JSON_LD_CONTEXT_DICT] + """ if self.parent is not None: return self.merge_to_list(self.parent.full_context, self.context) else: return self.context @property - def path(self): - """ Create a path representation for this item. """ + def path(self: Self) -> list[Union[str, int]]: + """ + Create a path representation for this item. + + :param self: The ld_container the path leads to from its outer most parent container. + :type self: Self + + :return: The path from selfs outer most parent to it self. + Let parent be the outer most parent of self. + Start with index = 1 and iteratively set parent to parent[path[index]] and then increment index + until index == len(path) to get parent is self == true. 
+ :rtype: list[str | int] + """ if self.parent: return self.parent.path + [self.key if self.index is None else self.index] else: - return ['$'] + return ["$"] @property - def ld_value(self): - """ Retrun a representation that is suitable as a value in expanded JSON-LD. """ + def ld_value(self: Self) -> EXPANDED_JSON_LD_VALUE: + """ + Return a representation that is suitable as a value in expanded JSON-LD of this ld_container. + + :param self: The ld_container whose expanded JSON-LD representation is returned. + :type self: Self + + :return: The expanded JSON-LD value of this container. + This value is the basis of all operations and a reference to the original is returned and not a copy. + Do **not** modify unless strictly necessary and you know what you do. + Otherwise unexpected behavior may occur. + :rtype: EXPANDED_JSON_LD_VALUE + """ return self._data - def _to_python(self, full_iri, ld_value): - # FIXME: #434 dates are not returned as datetime/ date/ time but as string + def _to_python( + self: Self, full_iri: str, ld_value: Union[list, dict, str] + ) -> Union["ld_container", BASIC_TYPE, TIME_TYPE]: + """ + Returns a pythonized version of the given value pretending the value is in self and full_iri its key. + + :param self: the ld_container ld_value is considered to be in. + :type self: Self + :param full_iri: The expanded iri of the key of ld_value / self (later if self is not a dictionary). + :type full_iri: str + :param ld_value: The value thats pythonized value is requested. ld_value has to be valid expanded JSON-LD if it + was embeded in self._data. + :type ld_value: list | dict | str + + :return: The pythonized value of the ld_value. 
+ :rtype: ld_container | BASIC_TYPE | TIME_TYPE + """ if full_iri == "@id": + # values of key "@id" only have to be compacted value = self.ld_proc.compact_iri(self.active_ctx, ld_value, vocab=False) - elif full_iri == "@type": - value = [ - self.ld_proc.compact_iri(self.active_ctx, ld_type) - for ld_type in ld_value - ] - if len(value) == 1: - value = value[0] else: - value, ld_output = self.ld_proc.apply_typemap(ld_value, "python", "ld_container", - parent=self, key=full_iri) + # use the type map from src/hermes/model/types/__init__.py to convert all other values. + value, ld_output = self.ld_proc.apply_typemap(ld_value, "python", "ld_container", parent=self, key=full_iri) + # check if conversion was successful if ld_output is None: raise TypeError(full_iri, ld_value) return value - def _to_expanded_json(self, key, value): - if key == "@id": - ld_value = self.ld_proc.expand_iri(self.active_ctx, value, vocab=False) - elif key == "@type": - if not isinstance(value, list): - value = [value] - ld_value = [self.ld_proc.expand_iri(self.active_ctx, ld_type) for ld_type in value] - else: - short_key = self.ld_proc.compact_iri(self.active_ctx, key) - if ':' in short_key: - prefix, short_key = short_key.split(':', 1) - ctx_value = self.ld_proc.get_context_value(self.active_ctx, prefix, "@id") - active_ctx = self.ld_proc.process_context(self.active_ctx, [ctx_value], - {"documentLoader": bundled_loader}) - else: - active_ctx = self.active_ctx - ld_type = self.ld_proc.get_context_value(active_ctx, short_key, "@type") - if ld_type == "@id": - ld_value = [{"@id": value}] - ld_output = "expanded_json" - else: - ld_value, ld_output = self.ld_proc.apply_typemap(value, "expanded_json", "json", - parent=self, key=key) - if ld_output == "json": - ld_value = self.ld_proc.expand(ld_value, {"expandContext": self.full_context, - "documentLoader": bundled_loader}) - elif ld_output != "expanded_json": - raise TypeError(f"Cannot convert {type(value)}") + def _to_expanded_json( + self: Self, 
value: JSON_LD_VALUE + ) -> Union[EXPANDED_JSON_LD_VALUE, dict[str, EXPANDED_JSON_LD_VALUE]]: + """ + Returns an expanded version of the given value. + + The item_list/ data_dict of self will be substituted with value. + Value can be an ld_container or contain zero or more. + Then the _data of the inner most ld_dict that contains or is self will be expanded + using the JSON_LD-Processor. + If self and none of self's parents is an ld_dict, use the key from outer most ld_list + to generate a minimal dict. + + The result of this function is what value has turned into. + + :param self: The ld_dict or ld_list in which value gets expanded + :type self: Self + :param value: The value that is to be expanded. Different types are expected based on the type of self: + + value will be expanded as if it was the data_dict/ the item_list of self. + :type value: JSON_LD_VALUE + + :return: The expanded version of value i.e. the data_dict/ item_list of self if it had been value. + The return type is based on the type of self: + + :rtype: EXPANDED_JSON_LD_VALUE | dict[str, EXPANDED_JSON_LD_VALUE] + """ + # search for an ld_dict that is either self or the inner most parents parent of self that is an ld_dict + # while searching build a path such that it leads from the found ld_dicts ld_value to selfs data_dict/ item_list + parent = self + path = [] + while parent.__class__.__name__ != "ld_dict": + if parent.container_type == "@list": + path.extend(["@list", 0]) + elif parent.container_type == "@graph": + path.extend(["@graph", 0]) + path.append(self.ld_proc.expand_iri(parent.active_ctx, parent.key) if self.index is None else self.index) + if parent.parent is None: + break + parent = parent.parent + + # if neither self nor any of its parents is a ld_dict: + # create a dict with the key of the outer most parent of self and this parents ld_value as a value + # this dict is stored in an ld_container and simulates the most minimal JSON-LD object possible + if parent.__class__.__name__ != 
"ld_dict": + key = self.ld_proc.expand_iri(parent.active_ctx, parent.key) + parent = ld_container([{key: parent._data}]) + path.append(0) + + # all ld_container (ld_dicts and ld_lists) and datetime, date as well as time objects in value have to dissolved + # because the JSON-LD processor can't handle them + # to do this traverse value in a BFS and replace all items with a type in 'special_types' with a usable values + key_and_reference_todo_list = [(0, [value])] + special_types = (list, dict, ld_container, datetime, date, time) + while True: + # check if ready + if len(key_and_reference_todo_list) == 0: + break + # get next item + key, ref = key_and_reference_todo_list.pop() + temp = ref[key] + # replace item if necessary and add childs to the todo list + if isinstance(temp, list): + key_and_reference_todo_list.extend( + [(index, temp) for index, val in enumerate(temp) if isinstance(val, special_types)] + ) + elif isinstance(temp, dict): + key_and_reference_todo_list.extend( + [(new_key, temp) for new_key in temp.keys() if isinstance(temp[new_key], special_types)] + ) + elif isinstance(temp, ld_container): + if temp.__class__.__name__ == "ld_list" and temp.container_type == "@set": + ref[key] = temp._data + else: + ref[key] = temp._data[0] + elif isinstance(temp, datetime): + ref[key] = {"@value": temp.isoformat(), "@type": "schema:DateTime"} + elif isinstance(temp, date): + ref[key] = {"@value": temp.isoformat(), "@type": "schema:Date"} + elif isinstance(temp, time): + ref[key] = {"@value": temp.isoformat(), "@type": "schema:Time"} + + # traverse the ld_value of parent with the previously generated path + current_data = parent._data + for index in range(len(path) - 1, 0, -1): + current_data = current_data[path[index]] + # replace the data_dict/ item_list so that value is now inside of the ld_value of parent and store the old value + self_data = current_data[path[0]] + current_data[path[0]] = value + + # expand the ld_value of parent to implicitly expand value + # 
important the ld_value of parent is not modified because the processor makes a deep copy + expanded_data = self.ld_proc.expand( + parent._data, + {"expandContext": self.full_context, "documentLoader": bundled_loader, "keepFreeFloatingNodes": True}, + ) - return ld_value + # restore the data_dict/ item_list to its former state + current_data[path[0]] = self_data + + # use the path to get the expansion of value + for index in range(len(path) - 1, -1, -1): + expanded_data = expanded_data[path[index]] + + return expanded_data - def __repr__(self): - return f'{type(self).__name__}({self._data[0]})' + def __repr__(self: Self) -> str: + """ + Returns a short string representation of this object. + + :param self: The object whose representation is returned. + :type self: Self + + :returns: The short representation of self. + :rtype: str + """ + return f"{type(self).__name__}({self._data})" + + def __str__(self: Self) -> str: + """ + Returns a string representation of this object. - def __str__(self): + :param self: The object whose representation is returned. + :type self: Self + + :returns: The representation of self. + :rtype: str + """ return str(self.to_python()) - def compact(self, context=None): + def compact( + self: Self, context: Union[list[Union[JSON_LD_CONTEXT_DICT, str]], JSON_LD_CONTEXT_DICT, str, None] = None + ) -> COMPACTED_JSON_LD_VALUE: + """ + Returns the compacted version of the given ld_container using its context only if none was supplied. + + :param self: The ld_container that is to be compacted. + :type self: Self + :param context: The context to use for the compaction. If None the context of self is used. + :type context: list[JSON_LD_CONTEXT_DICT | str] | JSON_LD_CONTEXT_DICT | str | None + + :returns: The compacted version of selfs JSON-LD representation. 
+ :rtype: COMPACTED_JSON_LD_VALUE + """ return self.ld_proc.compact( - self.ld_value, - context or self.context, - {"documentLoader": bundled_loader, "skipExpand": True} + self.ld_value, context or self.context, {"documentLoader": bundled_loader, "skipExpand": True} ) def to_python(self): raise NotImplementedError() @classmethod - def merge_to_list(cls, *args): + def merge_to_list(cls: type[Self], *args: tuple[Any]) -> list[Any]: + """ + Returns a list that is contains all non-list items from args and all items in the lists in args. + + :param *args: The items that should be put into one list. + :type *args: tuple[Any] + + :return: A list containing all non-list items and all items from lists in args. (Same order as in args) + :rytpe: list[Any] + """ + # base case for recursion if not args: return [] + # split args into first and all other items head, *tail = args + # recursion calls if isinstance(head, list): return [*head, *cls.merge_to_list(*tail)] else: return [head, *cls.merge_to_list(*tail)] @classmethod - def is_ld_node(cls, ld_value): + def is_ld_node(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing an expanded JSON-LD node.
+ I.e. if ld_value is of the form [{a: b, ..., y: z}]. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent an expanded JSON-LD node. + :rtype: bool + """ return isinstance(ld_value, list) and len(ld_value) == 1 and isinstance(ld_value[0], dict) @classmethod - def is_ld_id(cls, ld_value): + def is_ld_id(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing an expanded JSON-LD node + containing only an @id value.
+ I.e. if ld_value is of the form [{"@id": ...}]. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent an expanded JSON-LD node containing only an @id value. + :rtype: bool + """ return cls.is_ld_node(ld_value) and cls.is_json_id(ld_value[0]) @classmethod - def is_ld_value(cls, ld_value): + def is_ld_value(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing an expanded JSON-LD value.
+ I.e. if ld_value is of the form [{"@value": a, ..., x: z}]. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent an expanded JSON-LD value. + :rtype: bool + """ return cls.is_ld_node(ld_value) and "@value" in ld_value[0] @classmethod - def is_typed_ld_value(cls, ld_value): + def is_typed_ld_value(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing an expanded JSON-LD value + containing a value type.
+ I.e. if ld_value is of the form [{"@value": a, "@type": b, ..., x: z}]. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent an expanded JSON-LD value containing a value type. + :rtype: bool + """ return cls.is_ld_value(ld_value) and "@type" in ld_value[0] @classmethod - def is_json_id(cls, ld_value): + def is_json_id(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing a non-expanded JSON-LD node + containing only an @id value.
+ I.e. if ld_value is of the form {"@id": ...}. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent a non-expanded JSON-LD node containing only an @id value. + :rtype: bool + """ return isinstance(ld_value, dict) and ["@id"] == [*ld_value.keys()] @classmethod - def is_json_value(cls, ld_value): + def is_json_value(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing a non-expanded JSON-LD value.
+ I.e. if ld_value is of the form {"@value": b, ..., x: z}. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent a non-expanded JSON-LD value. + :rtype: bool + """ return isinstance(ld_value, dict) and "@value" in ld_value @classmethod - def is_typed_json_value(cls, ld_value): + def is_typed_json_value(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing a non-expanded JSON-LD value + containing a value type.
+ I.e. if ld_value is of the form {"@value": a, "@type": b, ..., x: z}. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent a non-expanded JSON-LD value containing a value type. + :rtype: bool + """ return cls.is_json_value(ld_value) and "@type" in ld_value @classmethod - def typed_ld_to_py(cls, data, **kwargs): + def typed_ld_to_py(cls: type[Self], data: list[dict[str, BASIC_TYPE]], **kwargs) -> Union[BASIC_TYPE, TIME_TYPE]: + """ + Returns the value of the given expanded JSON-LD value containing a value type converted into that type. + Meaning the pythonized version of the JSON-LD value data is returned.
+ ld_container.is_typed_ld_value(data) must return True. + + :param data: The value that is that is converted into its pythonized from. + :type data: list[dict[str, BASIC_TYPE]] + + :returns: The pythonized version of data. + :rtype: BASIC_TYPE | TIME_TYPE + """ # FIXME: #434 dates are not returned as datetime/ date/ time but as string ld_value = data[0]['@value'] return ld_value + + @classmethod + def are_values_equal( + cls: type[Self], first: dict[str, Union[BASIC_TYPE, TIME_TYPE]], second: dict[str, Union[BASIC_TYPE, TIME_TYPE]] + ) -> bool: + """ + Returns whether or not the given expanded JSON-LD values are considered equal. + The comparison compares the "@id" values first and returns the result if it is conclusive. + + If the comparison is inconclusive i.e. exactly one or zero of both values have an "@id" value: + Return whether or not all other keys exist in both values and all values of the keys are the same. + + :param first: The first value of the comparison + :type first: dict[str, Union[BASIC_TYPE, TIME_TYPE]] + :param second: The second value of the comparison + :type second: dict[str, Union[BASIC_TYPE, TIME_TYPE]] + + :return: Whether the values are considered equal or not. 
+ :rtype: bool + """ + # compare @id's + if "@id" in first and "@id" in second: + return first["@id"] == second["@id"] + # compare all other values and keys (@id-comparison was inconclusive) + for key in {"@value", "@type"}: + if (key in first) ^ (key in second): + return False + if key in first and key in second and first[key] != second[key]: + return False + return True diff --git a/src/hermes/model/types/ld_dict.py b/src/hermes/model/types/ld_dict.py index 19c95ce7..589e5246 100644 --- a/src/hermes/model/types/ld_dict.py +++ b/src/hermes/model/types/ld_dict.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileContributor: Michael Meinel +# SPDX-FileContributor: Michael Fritzsche from .ld_container import ld_container @@ -26,8 +27,8 @@ def __getitem__(self, key): def __setitem__(self, key, value): full_iri = self.ld_proc.expand_iri(self.active_ctx, key) - ld_value = self._to_expanded_json(full_iri, value) - self.data_dict.update({full_iri: ld_value}) + ld_value = self._to_expanded_json({full_iri: value}) + self.data_dict.update(ld_value) def __delitem__(self, key): full_iri = self.ld_proc.expand_iri(self.active_ctx, key) @@ -37,6 +38,42 @@ def __contains__(self, key): full_iri = self.ld_proc.expand_iri(self.active_ctx, key) return full_iri in self.data_dict + def __eq__(self, other): + if not isinstance(other, (dict, ld_dict)): + return NotImplemented + if ld_container.is_json_id(other): + if "@id" in self: + return self["@id"] == other["@id"] + return self.data_dict == {} + if ld_container.is_json_value(other): + if {*self.keys()}.issubset({"@id", *other.keys()}): + return ld_container.are_values_equal(self.data_dict, other) + return False + if isinstance(other, dict): + other = self.from_dict(other, parent=self.parent, key=self.key, context=self.context) + if "@id" in self and "@id" in other: + return self["@id"] == other["@id"] + keys_self = {*self.keys()} + keys_other = {*other.keys()} + unique_keys = keys_self.symmetric_difference(keys_other) 
+ if unique_keys and unique_keys != {"@id"}: + return False + for key in keys_self.intersection(keys_other): + item = self[key] + other_item = other[key] + res = item.__eq__(other_item) + if res == NotImplemented: + res = other_item.__eq__(item) + if res is False or res == NotImplemented: # res is not True + return False + return True + + def __ne__(self, other): + x = self.__eq__(other) + if x is NotImplemented: + return NotImplemented + return not x + def get(self, key, default=_NO_DEFAULT): try: value = self[key] diff --git a/src/hermes/model/types/ld_list.py b/src/hermes/model/types/ld_list.py index 62a7e5f3..c4d1c450 100644 --- a/src/hermes/model/types/ld_list.py +++ b/src/hermes/model/types/ld_list.py @@ -3,78 +3,714 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileContributor: Michael Meinel +# SPDX-FileContributor: Michael Fritzsche -from .ld_container import ld_container +from collections import deque +from types import NotImplementedType +from .ld_container import ( + ld_container, + JSON_LD_CONTEXT_DICT, + EXPANDED_JSON_LD_VALUE, + PYTHONIZED_LD_CONTAINER, + JSON_LD_VALUE, + TIME_TYPE, + BASIC_TYPE, +) +from typing import Generator, Hashable, Union, Self, Any -class ld_list(ld_container): - """ An JSON-LD container resembling a list. """ - container_types = ['@list', '@set', '@graph'] +class ld_list(ld_container): + """ + An JSON-LD container resembling a list ("@set", "@list" or "@graph"). + See also :class:`ld_container` - def __init__(self, data, *, parent=None, key=None, index=None, context=None): - """ Create a new ld_list.py container. + :ivar container_type: The type of JSON-LD container the list is representing. ("@set", "@list", "graph") + :ivartype container_type: str + :ivar item_list: The list of items (in expanded JSON-LD form) that are contained in this ld_list. + :ivartype item_list: EXPANDED_JSON_LD_VALUE + """ - :param container: The container type for this list. 
+ def __init__( + self: Self, + data: Union[list[str], list[dict[str, EXPANDED_JSON_LD_VALUE]]], + *, + parent: Union["ld_container", None] = None, + key: Union[str, None] = None, + index: Union[int, None] = None, + context: Union[list[Union[str, JSON_LD_CONTEXT_DICT]], None] = None, + ) -> None: """ + Create a new ld_list container. - super().__init__(data, parent=parent, key=key, index=index, context=context) + :param self: The instance of ld_list to be initialized. + :type self: Self + :param data: The expanded json-ld data that is mapped (must be valid for @set, @list or @graph) + :type data: list[str] | list[dict[str, BASIC_TYPE | EXPANDED_JSON_LD_VALUE]] + :param parent: parent node of this container. + :type parent: ld_container | None + :param key: key into the parent container. + :type key: str | None + :param index: index into the parent container. + :type index: int | None + :param context: local context for this container. + :type context: list[str | JSON_LD_CONTEXT_DICT] | None + + :return: + :rtype: None - # Determine container and correct item list - for container in self.container_types: - if container in self._data[0]: - self.item_list = self._data[0][container] - self.container = container - break + :raises ValueError: If the given key is not a string or None was given. + :raises ValueError: If the given data is not a list. + :raises ValueError: If the data represents an unexpanded @set. I.e. is of the form [{"@set": [...]}] + :raises ValueError: If the given key is "@type" but the container_type not "@set" + or a value in the item_list not a string. + :raises ValueError: If the given key is not "@type" and any value in the item_list not a dict. 
+ """ + # check for validity of data + if not isinstance(key, str): + raise ValueError("The key is not a string or was omitted.") + if not isinstance(data, list): + raise ValueError("The given data does not represent an ld_list.") + # infer the container type and item_list from data + if self.is_ld_list(data): + if "@list" in data[0]: + self.container_type = "@list" + self.item_list: list = data[0]["@list"] + elif "@graph" in data[0]: + self.container_type = "@graph" + self.item_list: list = data[0]["@graph"] + else: + raise ValueError("The given @set is not fully expanded.") else: - raise ValueError(f"Unexpected dict: {self.data}") + self.container_type = "@set" + self.item_list: list = data + # further validity checks + if key == "@type": + if any(not isinstance(item, str) for item in self.item_list) or self.container_type != "@set": + raise ValueError("A given value for @type is not a string.") + elif any(not isinstance(item, dict) for item in self.item_list): + raise ValueError("A given value is not properly expanded.") + # call super constructor + super().__init__(data, parent=parent, key=key, index=index, context=context) - def __getitem__(self, index): + def __getitem__( + self: Self, index: Union[int, slice] + ) -> Union[BASIC_TYPE, TIME_TYPE, ld_container, list[Union[BASIC_TYPE, TIME_TYPE, ld_container]]]: + """ + Get the item(s) at position index in a pythonized form. + + :param self: The ld_list the items are taken from. + :type self: Self + :param index: The positon(s) from which the item(s) is/ are taken. + :type index: int | slice + + :return: The pythonized item(s) at index. 
+ :rtype: BASIC_TYPE | TIME_TYPE | ld_container | list[BASIC_TYPE | TIME_TYPE | ld_container]] + """ + # handle slices by applying them to a list of indices and then getting the items at those if isinstance(index, slice): return [self[i] for i in [*range(len(self))][index]] - item = self._to_python(self.key, self.item_list[index:index + 1]) + # get the item from the item_list and pythonize it. If necessary add the index. + item = self._to_python(self.key, self.item_list[index]) if isinstance(item, ld_container): item.index = index return item - def __setitem__(self, index, value): - self.item_list[index:index + 1] = self._to_expanded_json(self.key, value) + def __setitem__( + self: Self, index: Union[int, slice], value: Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container] + ) -> None: + """ + Set the item(s) at position index to the given value(s). + All given values are expanded. If any are assimilated by self all items that would be added by this are added. + + :param self: The ld_list the items are set in. + :type self: Self + :param index: The positon(s) at which the item(s) is/ are set. + :type index: int | slice + :param value: The new value(s). 
+ :type value: Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container] - def __len__(self): + :return: + :rtype: None + """ + if not isinstance(index, slice): + # expand the value + value = self._to_expanded_json([value]) + # the returned value is always a list but may contain more then one item + # therefor a slice on the item_list is used to add the expanded value(s) + if index != -1: + self.item_list[index:index+1] = value + else: + self.item_list[index:] = value + return + # check if the given values can be iterated (value does not have to be a list) + try: + iter(value) + except TypeError as exc: + raise TypeError("must assign iterable to extended slice") from exc + # expand the values and merge all expanded values into one list + expanded_value = ld_container.merge_to_list(*[self._to_expanded_json([val]) for val in value]) + # set the values at index to the expanded values + self.item_list[index] = [val[0] if isinstance(val, list) else val for val in expanded_value] + + def __delitem__(self: Self, index: Union[int, slice]) -> None: + """ + Delete the item(s) at position index. + Note that if a deleted object is represented by an ld_container druing this process it will still exist + and not be modified afterwards. + + :param self: The ld_list the items are deleted from. + :type self: Self + :param index: The positon(s) at which the item(s) is/ are deleted. + :type index: int | slice + + :return: + :rtype: None + """ + del self.item_list[index] + + def __len__(self: Self) -> int: + """ + Returns the number of items in this ld_list. + + :param self: The ld_list whose length is to be returned. + :type self: Self + + :return: The length of self. + :rtype: int + """ return len(self.item_list) - def __iter__(self): + def __iter__(self: Self) -> Generator[Union[BASIC_TYPE | TIME_TYPE | ld_container], None, None]: + """ + Returns an iterator over the pythonized values contained in self. + + :param self: The ld_list over whose items is iterated. 
+ :type self: Self + + :return: The Iterator over self's values. + :rtype: Generator[Union[BASIC_TYPE | TIME_TYPE | ld_container], None, None] + """ + # return an Iterator over each value in self in its pythonized from for index, value in enumerate(self.item_list): - item = self._to_python(self.key, [value]) + item = self._to_python(self.key, value) + # add which entry an ld_container is stored at, if item is an ld_container if isinstance(item, ld_container): item.index = index yield item - def append(self, value): - ld_value = self._to_expanded_json(self.key, value) - self.item_list.extend(ld_value) + def __contains__(self: Self, value: JSON_LD_VALUE) -> bool: + """ + Returns whether or not value is contained in self. + Note that it is not directly checked if value is in self.item_list. + First value is expanded then it is checked if value is in self.item_list. + If however value is assimilated by self it is checked if all values are contained in self.item_list. + Also note that the checks whether the expanded value is in self.item_list is based on ld_list.__eq__. + That means that this value is 'contained' in self.item_list if any object in self.item_list + has the same @id like it or it xor the object in the item_list has an id an all other values are the same. + + :param self: The ld_list that is checked if it contains value. + :type self: Self + :param value: The object being checked whether or not it is in self. + :type value: JSON_LD_VALUE + + :return: Whether or not value is being considered to be contained in self. 
+ :rtype: bool + """ + # expand value + expanded_value = self._to_expanded_json([value]) + # empty list -> no value to check + if len(expanded_value) == 0: + return True + # call contains on all items in the expanded list if it contains more then one item + # and return true only if all calls return true + if len(expanded_value) > 1: + return all(val in self for val in expanded_value) + self_attributes = {"parent": self.parent, "key": self.key, "index": self.index, "context": self.full_context} + # create a temporary list containing the expanded value + # check for equality with a list containg exactly one item from self.item_list for every item in self.item_list + # return true if for any item in self.item_list this check returns true + if self.container_type == "@set": + temp_list = ld_list(expanded_value, **self_attributes) + return any(temp_list == ld_list([val], **self_attributes) for val in self.item_list) + temp_list = ld_list([{self.container_type: expanded_value}], **self_attributes) + return any(temp_list == ld_list([{self.container_type: [val]}], **self_attributes) for val in self.item_list) + + def __eq__( + self: Self, + other: Union[ + "ld_list", + list[Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container]], + dict[str, list[Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container]]], + ], + ) -> Union[bool, NotImplementedType]: + """ + Returns wheter or not self is considered to be equal to other.
+ If other is not an ld_list, it is converted first. + For each index it is checked if the ids of the items at index in self and other match if both have one, + if only one has or neither has an id all other values are compared.
+ Note that due to those circumstances equality is not transitive + meaning if a == b and b == c it is not guaranteed that a == c.
+ If self or other is considered unordered the comparison is more difficult. All items in self are compared + with all items in other. On the resulting graph given by the realtion == the Hopcroft-Karp algoritm is used + to determine if there exists a bijection reordering self so that the ordered comparison of self with other + returns true. + + :param self: The ld_list other is compared to. + :type self: Self + :param other: The list/ container/ ld_list self is compared to. + :type other: ld_list | list[JSON_LD_VALUE | BASIC_TYPE | TIME_TYPE | ld_container] + | dict[str, list[JSON_LD_VALUE | BASIC_TYPE | TIME_TYPE | ld_container]] + + :return: Whether or not self and other are considered equal. + If other is of the wrong type return NotImplemented instead. + :rtype: bool | NotImplementedType + """ + # check if other has an acceptable type + if not (isinstance(other, (list, ld_list)) or ld_list.is_container(other)): + return NotImplemented + + # convert other into an ld_list if it isn't one already + if isinstance(other, dict): + other = [other] + if isinstance(other, list): + if ld_list.is_ld_list(other): + if "@list" in other[0]: + cont = "@list" + elif "@graph" in other[0]: + cont = "@graph" + else: + cont = "@set" + other = other[0][cont] + else: + cont = "@set" + other = self.from_list(other, parent=self.parent, key=self.key, context=self.context, container_type=cont) + + # check if the length matches + if len(self.item_list) != len(other.item_list): + return False + + # check for special case (= key is @type) + if (self.key == "@type") ^ (other.key == "@type"): + return False + if self.key == other.key == "@type": + # lists will only contain string + return self.item_list == other.item_list + + if self.container_type == other.container_type == "@list": + # check if at each index the items are considered equal + for index, (item, other_item) in enumerate(zip(self.item_list, other.item_list)): + # check if items are values + if 
((ld_container.is_typed_json_value(item) or ld_container.is_json_value(item)) and + (ld_container.is_typed_json_value(other_item) or ld_container.is_json_value(other_item))): + if not ld_container.are_values_equal(item, other_item): + return False + continue + # check if both contain an id and compare + if "@id" in item and "@id" in other_item: + if item["@id"] != other_item["@id"]: + return False + continue + # get the 'real' items (i.e. can also be ld_dicts or ld_lists) + item = self[index] + other_item = other[index] + # compare using the correct equals method + res = item.__eq__(other_item) + if res == NotImplemented: + # swap order if first try returned NotImplemented + res = other_item.__eq__(item) + # return false if the second comparison also fails or one of them returned false + if res is False or res == NotImplemented: + return False + # return true because no unequal elements where found + return True + else: + # check which items in self are equal the which in other + equality_pairs = [[] for i in range(len(self))] # j in equality_pairs[i] <=> self[i] == other[j] + for index, item in enumerate(self.item_list): + for other_index, other_item in enumerate(other.item_list): + # check if items are values + if ((ld_container.is_typed_json_value(item) or ld_container.is_json_value(item)) and + (ld_container.is_typed_json_value(other_item) or ld_container.is_json_value(other_item))): + if ld_container.are_values_equal(item, other_item): + equality_pairs[index] += [other_index] + continue + # check if both contain an id and compare + if "@id" in item and "@id" in other_item: + if item["@id"] == other_item["@id"]: + equality_pairs[index] += [other_index] + continue + # get the 'real' items (i.e. 
can also be ld_dicts or ld_lists) + item = self[index] + other_item = other[index] + # compare using the correct equals method + res = item.__eq__(other_item) + if res == NotImplemented: + # swap order if first try returned NotImplemented + res = other_item.__eq__(item) + # if one of both comparisons returned true the elements are equal + if res is not NotImplemented and res: + equality_pairs[index] += [other_index] + if len(equality_pairs[index]) == 0: + # there exists no element in other that is equal to item + return False + # check if there is a way to chose one index from equality_pairs[i] for every i + # so that there are no two i's with the same chosen index. + # If such a way exists self and other are considered equal. If not they are considered to be not equal. + # solved via a Hopcroft-Karp algorithm variant: + # The bipartite graph is the disjoint union of the vertices 1 to len(self) and + # freely chosen ids for each list in equality_pairs. + # The graph has an edge from i to the id of a list if i is contained in the list. + item_count = len(self) + verticies_set1 = {*range(item_count)} + verticies_set2 = {*range(item_count, 2 * item_count)} + edges = {i: tuple(j for j in verticies_set2 if i in equality_pairs[j - item_count]) for i in verticies_set1} + return ld_list._hopcroft_karp(verticies_set1, verticies_set2, edges) == len(self) + + @classmethod + def _bfs_step( + cls: Self, verticies1: set[Hashable], edges: dict[Hashable, tuple[Hashable]], matches: dict[Hashable, Hashable], + distances: dict[Hashable, Union[int, float]] + ) -> bool: + """ + Completes the BFS step of Hopcroft-Karp. I.e.:
+ Finds the shortest path from all unmatched vertices in verticies1 to any unmatched vertex in any value in edges + where the connecting paths are alternating between matches and its complement.
+ It also marks each vertex in verticies1 with how few verticies from verticies1 have to be passed + to reach the vertex from an unmatched one in verticies1. This is stored in distances. + + :param verticies1: The set of verticies in the left partition of the bipartite graph. + :type verticies1: set[Hashable] + :param edges: The edges in the bipartite graph. (As the edges are bidirectional they are expected to be given in + this format: Dictionary with keys being the vertices in the left partition and values being tuples + of verticies in the right partition.) + :type edges: dict[Hashable, tuple[Hashable]] + :param matches: The current matching of verticies in the left partition with the ones in the right partition. + :type matches: dict[Hashable, Hashable] + :param distances: The reference to the dictionary mapping verticies of the left partition to the minimal + number of verticies in the left partition that will be passed on a path from an unmatched vertex of the left + partition to the vertex that is the key. + :type distances: dict[Hashable, Union[int, float]] + + :returns: Wheter or not a alternating path from an unmatched vertex in the left partition to an unmatched vertex + in the right partition exists. 
+ :rtype: bool + """ + # initialize the queue and set the distances to zero for unmatched vertices and to inf for all others + queue = deque() + for ver in verticies1: + if matches[ver] is None: + distances[ver] = 0 + queue.append(ver) + else: + distances[ver] = float("inf") + distances[None] = float("inf") + # begin BFS + while len(queue) != 0: + ver1 = queue.popleft() + # if the current vertex has a distance less then the current minimal one from an unmatched vertex in the + # left partition to an unmatched one in the right partition + if distances[ver1] < distances[None]: + # iterate over all vertices in the right partition connected to ver1 + for ver2 in edges[ver1]: + # if the vertex ver2 is matched with (or None if not matched) wasn't visited yet + if distances[matches[ver2]] == float("inf"): + # initialize the distance and queue the vertex for further search + distances[matches[ver2]] = distances[ver1] + 1 + queue.append(matches[ver2]) + # if a path to None i.e. an unmatched vertex in the right partition was found return true otherwise false + return distances[None] != float("inf") + + @classmethod + def _dfs_step( + cls: Self, ver: Hashable, edges: dict[Hashable, tuple[Hashable]], matches: dict[Hashable, Hashable], + distances: dict[Hashable, Union[int, float]] + ) -> bool: + """ + Completes the DFS step of Hopcroft-Karp. I.e.:
+ Adds all edges on every path with the minimal path length to matches if they would be in the symmetric + difference of matches and the set of edges on the union of the paths. + + :param ver: The set of verticies in the left partition of the bipartite graph. + :type vert: Hashable + :param edges: The edges in the bipartite graph. (As the edges are bidirectional they are expected to be given in + this format: Dictionary with keys being the vertices in the left partition and values being tuples + of verticies in the right partition.) + :type edges: dict[Hashable, tuple[Hashable]] + :param matches: The current matching of verticies in the left partition with the ones in the right partition. + :type matches: dict[Hashable, Hashable] + :param distances: The reference to the dictionary mapping verticies of the left partition to the minimal + number of verticies in the left partition that will be passed on a path from an unmatched vertex of the left + partition to the vertex that is the key. The values will be replaced with float("inf") to mark already + visited vertices. + :type distances: dict[Hashable, Union[int, float]] + + :returns: Wheter or not a path from the unmatched vertex ver in the left partition to an unmatched vertex + in the right partition could still exist. 
+ :rtype: bool + """ + # recursion base case: None always has a shortest possible path to itself + if ver is None: + return True + # iterate over all vertices connected to ver in the right partition + for ver2 in edges[ver]: + # if ver2 is on a path with minimal length and not all subtrees have been searched already + if distances[matches[ver2]] == distances[ver] + 1: + if cls._dfs_step(matches[ver], edges, matches, distances): + # add the edge to the matches and return true + matches[ver2] = ver + matches[ver] = ver2 + return True + # mark this vertex as completly searched + distances[ver] = float("inf") + return False + + @classmethod + def _hopcroft_karp( + cls: Self, verticies1: set[Hashable], verticies2: set[Hashable], edges: dict[Hashable, tuple[Hashable]] + ) -> int: + """ + Implementation of Hopcroft-Karp. I.e.:
+ Finds the maximal number of edges with the property that no two edges share an endpoint (and startpoint) + in the given bipartite graph.
+ Note that verticies1 and verticies2 have to be disjoint. + + :param verticies1: The set of verticies in the left partition of the bipartite graph. + :type verticies1: set[Hashable] + :param verticies2: The set of verticies in the right partition of the bipartite graph. + :type verticies2: set[Hashable] + :param edges: The edges in the bipartite graph. (As the edges are bidirectional they are expected to be given in + this format: Dictionary with keys being the vertices in the left partition and values being tuples + of verticies in the right partition.) + :type edges: dict[Hashable, tuple[Hashable]] - def extend(self, value): + :returns: The number of edges. + :rtype: int + """ + # initializes the first matching. None is a imaginary vertex to denote unmatched vertices. + matches = dict() + for ver in verticies1: + matches[ver] = None + for ver in verticies2: + matches[ver] = None + matching_size = 0 + distances = dict() + while cls._bfs_step(verticies1, edges, matches, distances): + # while a alternating path from an unmatched vertex in the left partition exits + # recalculate the distances and + # iterate over all unmatched vertices in the left partition. + for ver in verticies1: + if matches[ver] is None: + # create the new matches dict and if a new edge was added increase the size of the matching + if cls._dfs_step(ver, edges, matches, distances): + matching_size += 1 + # return the size of the matching + return matching_size + + def __ne__( + self: Self, + other: Union[ + "ld_list", + list[Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container]], + dict[str, list[Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container]]], + ], + ) -> Union[bool, NotImplementedType]: + """ + Returns whether or not self and other not considered to be equal. + (Returns not self.__eq__(other) if the return type is bool. + See ld_list.__eq__ for more details on the comparison.) + + :param self: The ld_list other is compared to. 
+ :type self: Self + :param other: The list/ container/ ld_list self is compared to. + :type other: ld_list | list[JSON_LD_VALUE | BASIC_TYPE | TIME_TYPE | ld_container] + | dict[str, list[JSON_LD_VALUE | BASIC_TYPE | TIME_TYPE | ld_container]] + + :return: Whether or not self and other are not considered equal. + If other is of the wrong type return NotImplemented instead. + :rtype: bool | NotImplementedType + """ + # compare self and other using __eq__ + x = self.__eq__(other) + # return NotImplemented if __eq__ did so and else the inverted result of __eq__ + if x is NotImplemented: + return NotImplemented + return not x + + def append(self: Self, value: Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container]) -> None: + """ + Append the item to the given ld_list self. + The given value is expanded. If it is assimilated by self all items that would be added by this are added. + + :param self: The ld_list the item is appended to. + :type self: Self + :param value: The new value. + :type value: Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container] + + :return: + :rtype: None + """ + self.item_list.extend(self._to_expanded_json([value])) + + def extend(self: Self, value: list[Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container]]) -> None: + """ + Append the items in value to the given ld_list self. + The given values are expanded. If any are assimilated by self all items that would be added by this are added. + + :param self: The ld_list the items are appended to. + :type self: Self + :param value: The new values. + :type value: list[Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE, ld_container]] + + :return: + :rtype: None + """ for item in value: self.append(item) - def to_python(self): + def to_python(self: Self) -> list[PYTHONIZED_LD_CONTAINER]: + """ + Return a fully pythonized version of this object where all ld_container are replaced by lists and dicts. + + :param self: The ld_list whose fully pythonized version is returned. 
+ :type self: Self + + :return: The fully pythonized version of self. + :rtype: list[PYTHONIZED_LD_CONTAINER] + """ return [ item.to_python() if isinstance(item, ld_container) else item for item in self ] @classmethod - def is_ld_list(cls, ld_value): + def is_ld_list(cls: type[Self], ld_value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing an ld_list.
+ I.e. if ld_value is of the form [{container_type: [...]}] where container_type is '@set', '@list' or '@graph'. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent an ld_list. + :rtype: bool + """ return cls.is_ld_node(ld_value) and cls.is_container(ld_value[0]) @classmethod - def is_container(cls, value): - return isinstance(value, dict) and any(ct in value for ct in cls.container_types) + def is_container(cls: type[Self], value: Any) -> bool: + """ + Returns wheter the given value is considered to be possible of representing an json-ld container.
+ I.e. if ld_value is of the form {container_type: [...]} where container_type is '@set', '@list' or '@graph'. + + :param ld_value: The value that is checked. + :type ld_value: Any + + :returns: Wheter or not ld_value could represent a json-ld container. + :rtype: bool + """ + return ( + isinstance(value, dict) + and [*value.keys()] in [["@list"], ["@set"], ["@graph"]] + and any(isinstance(value.get(cont, None), list) for cont in {"@list", "@set", "@graph"}) + ) @classmethod - def from_list(cls, value, *, parent=None, key=None, context=None, container=None): - new_list = cls([{container or "@list": []}], parent=parent, key=key, context=context) - new_list.extend(value) - return new_list + def from_list( + cls: type[Self], + value: list[Union[JSON_LD_VALUE, BASIC_TYPE, TIME_TYPE]], + *, + parent: Union[ld_container, None] = None, + key: Union[str, None] = None, + context: Union[str, JSON_LD_CONTEXT_DICT, list[Union[str, JSON_LD_CONTEXT_DICT]], None] = None, + container_type: str = "@set" + ) -> "ld_list": + """ + Creates a ld_list from the given list with the given parent, key, context and container_type.
+ Note that only container_type '@set' is valid for key '@type'.
+ Furthermore, note that if parent would assimilate the values in value no new ld_list is created + and the given values are appended to parent instead and parent is returned. + + :param value: The list of values the ld_list should be created from. + :type value: list[JSON_LD_VALUE | BASIC_TYPE | TIME_TYPE] + :param parent: The parent container of the new ld_list.
If value is assimilated by parent during JSON-LD + expansion parent is extended by value and parent is returned. + :type parent: ld_container | None + :param key: The key into the innermost parent container representing a dict of the new ld_list. + :type key: str | None + :param context: The context for the new list (it will also inherit the context of parent).
+ Note that this context won't be added to parent if value is assimilated by parent and parent is returned. + :type context: str | JSON_LD_CONTEXT_DICT | list[str | JSON_LD_CONTEXT_DICT] | None + :param container_type: The container type of the new list; valid are '@set', '@list' and '@graph'.
+ If value is assimilated by parent and parent is returned the given container_type won't affect + the container type of parent.
Also note that only '@set' is valid if key is '@type'. + :type container_type: str + + :return: The new ld_list build from value or if value is assimilated by parent, parent extended by value. + :rtype: ld_list + + :raises ValueError: If key is '@type' and container_type is not '@set'. + """ + # TODO: handle context if not of type list or None + # validate container_type + if key == "@type": + if container_type != "@set": + raise ValueError(f"The given container type is {container_type} which is invalid for a list" + " containing values for '@type' (valid is only '@set').") + if container_type in {"@list", "@graph"}: + # construct json-ld container that indicates the container type + value = [{container_type: value}] + elif container_type != "@set": + raise ValueError(f"Invalid container type: {container_type}. (valid are only '@set', '@list' and '@graph')") + + if parent is not None: + # expand value in the "context" of parent + if isinstance(parent, ld_list): + expanded_value = parent._to_expanded_json([value]) + if (len(expanded_value) != 1 or + not (isinstance(expanded_value[0], list) or cls.is_container(expanded_value[0]))): + # parent assimilated value druing expansion. 
Therefor the values are appended and parent returned + # if value is assimilated but contained only one list after expansion this list is used for + # the new list instead of expanding parent + parent.extend(expanded_value) + return parent + else: + expanded_value = parent._to_expanded_json({key: value})[cls.ld_proc.expand_iri(parent.active_ctx, key)] + else: + # create a temporary ld_list which is necessary for expansion + # value is not passed in a list as usual because value should be treated like the item list of the + # temporary object and not like a item in it + expanded_value = cls([], parent=None, key=key, context=context)._to_expanded_json(value) + + # construct and return the final ld_list from the expanded_value + return cls(expanded_value, parent=parent, key=key, context=context) + + @classmethod + def get_item_list_from_container(cls: type[Self], ld_value: dict[str, list[Any]]) -> list[Any]: + """ + Returns the item list from a container, the given ld_value, (i.e. {container_type: item_list}).
+ Only '@set', '@list' and '@graph' are valid container types. + + :param ld_value: The container whose item list is to be returned. + :type ld_value: dict[str, list[Any]] + + :returns: The list the container holds. + :rtype: list[Any] + + :raises ValueError: If the item_container is not a dict. + :raises ValueError: If the container_type is not exactly one of '@set', '@list' and '@graph'. + :raises ValueError: If the item_list is no list. + """ + if type(ld_value) != dict: + raise ValueError(f"The given data {ld_value} is not a dictionary and therefor no container.") + if len(ld_value.keys()) != 1: + raise ValueError(f"The given data contains two many or few entries ({len(ld_value.keys())})." + " It should be only one entry: '@set', '@list' or '@graph' as key and a list as value.") + # find the container type to return the item_list + for cont in {"@list", "@set", "@graph"}: + if cont in ld_value: + if type(ld_value[cont]) != list: + raise ValueError(f"The item list of {ld_value} is of type {type(ld_value[cont])} and not list.") + return ld_value[cont] + raise ValueError(f"The given data {ld_value} does not represent a container.") diff --git a/test/hermes_test/model/types/conftest.py b/test/hermes_test/model/types/conftest.py index 8a1c7c2e..7d7e52d5 100644 --- a/test/hermes_test/model/types/conftest.py +++ b/test/hermes_test/model/types/conftest.py @@ -25,6 +25,7 @@ def vocabulary(cls, base_url: str = "http://spam.eggs/") -> dict: "ham": {"@id": f"{base_url}ham", "@type": "@id"}, "eggs": {"@id": f"{base_url}eggs", "@container": "@list"}, "use_until": {"@id": f"{base_url}use_until", "@type": "http://schema.org/DateTime"}, + "schema": "https://schema.org/", "Egg": {"@id": f"{base_url}Egg"}, } diff --git a/test/hermes_test/model/types/test_ld_container.py b/test/hermes_test/model/types/test_ld_container.py index 9da5b461..f73fdcd9 100644 --- a/test/hermes_test/model/types/test_ld_container.py +++ b/test/hermes_test/model/types/test_ld_container.py @@ -4,12 +4,14 @@ 
# SPDX-FileContributor: Sophie Kernchen # SPDX-FileContributor: Michael Meinel +# SPDX-FileContributor: Michael Fritzsche -from datetime import datetime +from datetime import datetime, time import pytest from hermes.model.types.ld_container import ld_container +from hermes.model.types.ld_dict import ld_dict '''we expect user of this class to give the right input data types @@ -82,7 +84,7 @@ def test_container_full_context_and_path(self, httpserver): def test_container_str_and_repr(self): cont = ld_container([{"spam": [{"@value": "bacon"}]}]) - assert repr(cont) == "ld_container({'spam': [{'@value': 'bacon'}]})" + assert repr(cont) == "ld_container([{'spam': [{'@value': 'bacon'}]}])" with pytest.raises(NotImplementedError): str(cont) @@ -95,61 +97,106 @@ def test_to_python_id_with_prefix(self, mock_context): assert cont._to_python("@id", f"{self.url}identifier") == "prefix:identifier" def test_to_python_type(self, mock_context): - cont = ld_container([{}], context=[mock_context]) - assert cont._to_python("@type", ["@id"]) == '@id' + cont = ld_dict([{"@type": ["@id"]}], context=[mock_context]) + assert cont._to_python("@type", ["@id"]) == ['@id'] + cont = ld_dict([{"@type": ["@id", "http://spam.eggs/Egg"]}], context=[mock_context]) assert cont._to_python("@type", ["@id", "http://spam.eggs/Egg"]) == ["@id", "Egg"] def test_to_python_id_value(self, mock_context): - cont = ld_container([{}], context=[mock_context]) + cont = ld_dict([{}], context=[mock_context]) assert cont._to_python("http://spam.eggs/ham", - [{"@id": "http://spam.eggs/spam"}]) == "http://spam.eggs/spam" + [{"@id": "http://spam.eggs/spam"}]) == [{"@id": "http://spam.eggs/spam"}] assert cont._to_python("http://spam.eggs/ham", - [{"@id": "http://spam.eggs/identifier"}]) == "http://spam.eggs/identifier" + {"@id": "http://spam.eggs/identifier"}) == "http://spam.eggs/identifier" def test_to_python_basic_value(self, mock_context): cont = ld_container([{}], context=[mock_context]) - assert 
cont._to_python("http://soam.eggs/spam", [{"@value": "bacon"}]) == 'bacon' - assert cont._to_python("http://spam.eggs/spam", [{"@value": True}]) is True - assert cont._to_python("http://spam.eggs/spam", [{"@value": 123}]) == 123 + assert cont._to_python("http://soam.eggs/spam", {"@value": "bacon"}) == 'bacon' + assert cont._to_python("http://spam.eggs/spam", {"@value": True}) is True + assert cont._to_python("http://spam.eggs/spam", {"@value": 123}) == 123 def test_to_python_datetime_value(self, mock_context): cont = ld_container([{}], context=[mock_context]) - assert cont._to_python("http://spam.eggs/eggs", [{ + assert cont._to_python("http://spam.eggs/eggs", { "@value": "2022-02-22T00:00:00", "@type": "https://schema.org/DateTime" - }]) == "2022-02-22T00:00:00" + }) == "2022-02-22T00:00:00" # TODO: #434 typed date is returned as string instead of date - def test_to_expanded_id(self, mock_context): + def test_to_python_error(self, mock_context): cont = ld_container([{}], context=[mock_context]) - assert cont._to_expanded_json("@id", f"{self.url}identifier") == f"{self.url}identifier" + with pytest.raises(TypeError): + cont._to_python("http://spam.eggs/eggs", set()) + + def test_to_expanded_id(self, mock_context): + cont = ld_dict([{}], context=[mock_context]) + assert cont._to_expanded_json({"@id": f"{self.url}identifier"}) == {"@id": f"{self.url}identifier"} # Regression test: "ham" is vocabulary and must not be expanded. 
- assert cont._to_expanded_json("@id", "ham") == "ham" + assert cont._to_expanded_json({"@id": "ham"}) == {"@id": "ham"} def test_to_expanded_id_with_prefix(self, mock_context): - cont = ld_container([{}], context=[mock_context, {"prefix": self.url}]) - assert cont._to_expanded_json("@id", "prefix:identifier") == f"{self.url}identifier" + cont = ld_dict([{}], context=[mock_context, {"prefix": self.url}]) + assert cont._to_expanded_json({"@id": "prefix:identifier"}) == {"@id": f"{self.url}identifier"} # Regression test: "ham" should still not be expaned, but "prefix:ham" should be. - assert cont._to_expanded_json("@id", "ham") == "ham" - assert cont._to_expanded_json("@id", "prefix:ham") == f"{self.url}ham" + assert cont._to_expanded_json({"@id": "ham"}) == {"@id": "ham"} + assert cont._to_expanded_json({"@id": "prefix:ham"}) == {"@id": f"{self.url}ham"} def test_to_expanded_type(self, mock_context): - cont = ld_container([{}], context=[mock_context]) - assert cont._to_expanded_json("@type", "Egg") == ["http://spam.eggs/Egg"] - assert cont._to_expanded_json("@type", ["Egg", "@id"]) == ["http://spam.eggs/Egg", "@id"] + cont = ld_dict([{}], context=[mock_context]) + assert cont._to_expanded_json({"@type": "Egg"}) == {"@type": ["http://spam.eggs/Egg"]} + assert cont._to_expanded_json({"@type": ["Egg", "@id"]}) == {"@type": ["http://spam.eggs/Egg", "@id"]} def test_to_expanded_id_value(self, mock_context): - cont = ld_container([{}], context=[mock_context]) - assert cont._to_expanded_json("ham", "spam") == [{"@id": "spam"}] + cont = ld_dict([{}], context=[mock_context]) + assert cont._to_expanded_json({"ham": "spam"}) == {"http://spam.eggs/ham": [{"@id": "spam"}]} def test_to_expanded_basic_value(self, mock_context): - cont = ld_container([{}], context=[mock_context]) - assert cont._to_expanded_json("spam", "bacon") == [{"@value": "bacon"}] - assert cont._to_expanded_json("spam", 123) == [{"@value": 123}] - assert cont._to_expanded_json("spam", True) == [{"@value": 
True}] + cont = ld_dict([{}], context=[mock_context]) + assert cont._to_expanded_json({"spam": "bacon"}) == {"http://spam.eggs/spam": [{"@value": "bacon"}]} + assert cont._to_expanded_json({"spam": 123}) == {"http://spam.eggs/spam": [{"@value": 123}]} + assert cont._to_expanded_json({"spam": True}) == {"http://spam.eggs/spam": [{"@value": True}]} def test_to_expanded_datetime_value(self, mock_context): - cont = ld_container([{}], context=[mock_context]) - assert cont._to_expanded_json("eggs", datetime(2022, 2, 22)) == [ - {"@value": "2022-02-22T00:00:00", "@type": "http://schema.org/DateTime"} - ] + cont = ld_dict([{}], context=[mock_context]) + assert cont._to_expanded_json({"eggs": datetime(2022, 2, 22)}) == {"http://spam.eggs/eggs": [{"@list": [ + {"@value": "2022-02-22T00:00:00", "@type": "https://schema.org/DateTime"} + ]}]} + cont = ld_dict([{}], context=[mock_context]) + assert cont._to_expanded_json({"eggs": time(5, 4, 3)}) == {"http://spam.eggs/eggs": [{"@list": [ + {"@value": "05:04:03", "@type": "https://schema.org/Time"} + ]}]} + + def test_compact(self, mock_context): + cont = ld_container([{"http://spam.eggs/eggs": [{"@list": [{"@value": "a"}]}], + "http://spam.eggs/spam": [{"@value": "bacon"}]}]) + assert cont.compact([mock_context]) == {"@context": mock_context, "spam": "bacon", "eggs": ["a"]} + + def test_is_ld_id(self): + assert ld_container.is_ld_id([{"@id": "foo"}]) + assert not ld_container.is_ld_id([{"@id": "foo", "bar": "barfoo"}]) + assert not ld_container.is_ld_id({"@id": "foo"}) + assert not ld_container.is_ld_id([{"bar": "foo"}]) + + def test_is_ld_value(self): + assert ld_container.is_ld_value([{"@value": "foo"}]) + assert ld_container.is_ld_value([{"@value": "foo", "bar": "barfoo"}]) + assert not ld_container.is_ld_value({"@value": "foo"}) + assert not ld_container.is_ld_value([{"bar": "foo"}]) + + def test_is_typed_ld_value(self): + assert ld_container.is_typed_ld_value([{"@value": "foo", "@type": "bar"}]) + assert 
ld_container.is_typed_ld_value([{"@value": "foo", "@type": "bar", "bar": "barfoo"}]) + assert not ld_container.is_typed_ld_value([{"@type": "bar"}]) + assert not ld_container.is_typed_ld_value([{"@value": "foo"}]) + assert not ld_container.is_typed_ld_value({"@value": "foo", "@type": "bar"}) + assert not ld_container.is_typed_ld_value([{"bar": "foo"}]) + + def test_are_values_equal(self): + assert ld_container.are_values_equal({"@id": "foo"}, {"@id": "foo"}) + assert not ld_container.are_values_equal({"@id": "foo"}, {"@id": "bar"}) + assert ld_container.are_values_equal({"@id": "foo"}, {"@id": "foo", "@value": "bar"}) + assert ld_container.are_values_equal({"@value": "foo"}, {"@value": "foo"}) + assert ld_container.are_values_equal({"@value": "bar"}, {"@id": "foo", "@value": "bar"}) + assert not ld_container.are_values_equal({"@value": "foo"}, {"@value": "bar"}) + assert not ld_container.are_values_equal({"@type": "bar", "@value": "foo"}, {"@value": "foo"}) + assert ld_container.are_values_equal({"@type": "bar", "@value": "foo"}, {"@type": "bar", "@value": "foo"}) diff --git a/test/hermes_test/model/types/test_ld_dict.py b/test/hermes_test/model/types/test_ld_dict.py index 0bc15792..545b704f 100644 --- a/test/hermes_test/model/types/test_ld_dict.py +++ b/test/hermes_test/model/types/test_ld_dict.py @@ -1,9 +1,10 @@ # SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR) -# SPDX-FileContributor: Stephan Druskat -# SPDX-FileContributor: Michael Fritzsche # # SPDX-License-Identifier: Apache-2.0 +# SPDX-FileContributor: Stephan Druskat +# SPDX-FileContributor: Michael Fritzsche + import pytest from hermes.model.types.ld_dict import ld_dict @@ -26,26 +27,26 @@ def test_malformed_input(): def test_build_in_get(): - di = ld_dict([{"name": [{"@value": "Manu Sporny"}], - "homepage": [{"@id": "http://manu.sporny.org/"}], - "foo": [{"foobar": "bar", "barfoo": "foo"}]}]) - assert di["name"] == "Manu Sporny" - assert di["homepage"] == "http://manu.sporny.org/" - assert 
di["foo"].data_dict == ld_dict([{"foobar": "bar", "barfoo": "foo"}]).data_dict + di = ld_dict([{"http://xmlns.com/foaf/0.1/name": [{"@value": "Manu Sporny"}], + "http://xmlns.com/foaf/0.1/homepage": [{"@id": "http://manu.sporny.org/"}], + "http://xmlns.com/foaf/0.1/foo": [{"http://xmlns.com/foaf/0.1/foobar": [{"@value": "bar"}], + "http://xmlns.com/foaf/0.1/barfoo": [{"@value": "foo"}]}]}], + context=[{"xmlns": "http://xmlns.com/foaf/0.1/"}]) + assert di["xmlns:name"] == ["Manu Sporny"] + assert di["xmlns:homepage"] == ["http://manu.sporny.org/"] + assert di["xmlns:foo"] == [{"xmlns:foobar": ["bar"], "xmlns:barfoo": ["foo"]}] with pytest.raises(KeyError): di["bar"] di = ld_dict([{"http://xmlns.com/foaf/0.1/name": [{"@value": "Manu Sporny"}]}], context=[{"xmlns": "http://xmlns.com/foaf/0.1/"}]) - assert di["xmlns:name"] == "Manu Sporny" + assert di["xmlns:name"] == ["Manu Sporny"] - # FIXME: fixing #433 would fix this - # get -> list to python -> create empty list -> to fill dicts -> expand them -> no expansion method for dicts di = ld_dict([{"http://xmlns.com/foaf/0.1/name": [{"@value": "foo"}], "http://xmlns.com/foaf/0.1/foo": [{"http://xmlns.com/foaf/0.1/barfoo": [{"@id": "foo"}], "http://xmlns.com/foaf/0.1/fooba": [{"@value": "ba"}]}, {"http://xmlns.com/foaf/0.1/barfoo": [{"@id": "foo"}], - "http://xmlns.com/foaf/0.1/fooba": [{"@value": "ba"}]}]}], + "http://xmlns.com/foaf/0.1/fooba": [{"@value": "ba"}]}]}], context=[{"xmlns": "http://xmlns.com/foaf/0.1/"}]) assert isinstance(di["http://xmlns.com/foaf/0.1/foo"], ld_list) @@ -130,9 +131,8 @@ def test_build_in_set(): }] }] } - assert isinstance(di["schema:result"]["schema:error"]["schema:name"], ld_list) + assert isinstance(di["schema:result"][0]["schema:error"][0]["schema:name"], ld_list) - # FIXME: fixing #433 would fix this (setting nested python dicts) di = ld_dict([{}], context=[{"schema": "https://schema.org/"}]) di["@type"] = "schema:Thing" di["schema:result"] = {"@type": "schema:Action", "schema:error": 
{"@type": "schema:Thing", "schema:name": "foo"}} @@ -165,10 +165,30 @@ def test_build_in_contains(): assert "xmlns:foo" not in di and "homepage" not in di and "foo" not in di +def test_build_in_comparison(): + di = ld_dict([{}], context={"schema": "https://schema.org/"}) + assert di != 1 and di != [] and di != "" + di["@id"] = "foo" + di["schema:name"] = "bar" + assert di == {"@id": "foo"} + # Fail probably because of bug in ld_dict + # that is fixed on refactor/data-model after merge of refactor/384-test-ld_dict + assert di == {"@id": "foo", "schema:name": "bar"} + assert di == {"@id": "foo", "schema:name": "b"} + assert di == {"schema:name": "bar"} + di = ld_dict([{}], context={"schema": "https://schema.org/"}) + di["schema:Person"] = {"schema:name": "foo"} + assert di == {"schema:Person": {"schema:name": "foo"}} + di["schema:Person"].append({"schema:name": "bar"}) + assert di == {"schema:Person": [{"schema:name": "foo"}, {"schema:name": "bar"}]} + assert di != {"schema:name": "foo"} + + def test_get(): - di = ld_dict([{"https://schema.org/name": [{"@value": "Manu Sporny"}]}], context=[{"schema": "https://schema.org/"}]) - assert di.get("https://schema.org/name") == "Manu Sporny" - assert di.get("schema:name") == "Manu Sporny" + di = ld_dict([{"https://schema.org/name": [{"@value": "Manu Sporny"}]}], + context=[{"schema": "https://schema.org/"}]) + assert di.get("https://schema.org/name") == ["Manu Sporny"] + assert di.get("schema:name") == ["Manu Sporny"] assert di.get("bar", None) is None with pytest.raises(KeyError): di.get("bar") @@ -222,7 +242,8 @@ def test_compact_keys(): "http://xmlns.com/foaf/0.1/homepage": {"@id": "http://manu.sporny.org/"}}) assert {*di.compact_keys()} == {"http://xmlns.com/foaf/0.1/name", "homepage"} - di = ld_dict([{}], context=[{"xmls": "http://xmlns.com/foaf/0.1/", "homepage": "http://xmlns.com/foaf/0.1/homepage"}]) + di = ld_dict([{}], + context=[{"xmls": "http://xmlns.com/foaf/0.1/", "homepage": 
"http://xmlns.com/foaf/0.1/homepage"}]) di.update({"http://xmlns.com/foaf/0.1/name": "Manu Sporny", "http://xmlns.com/foaf/0.1/homepage": {"@id": "http://manu.sporny.org/"}}) assert {*di.compact_keys()} == {"xmls:name", "homepage"} @@ -233,11 +254,11 @@ def test_items(): inner_di = ld_dict([{}], parent=di, key="http://xmlns.com/foaf/0.1/foo") inner_di.update({"xmlns:foobar": "bar", "http://xmlns.com/foaf/0.1/barfoo": {"@id": "foo"}}) di.update({"http://xmlns.com/foaf/0.1/name": "foo", "xmlns:homepage": {"@id": "bar"}, "xmlns:foo": inner_di}) - assert [*di.items()][0:2] == [("http://xmlns.com/foaf/0.1/name", "foo"), - ("http://xmlns.com/foaf/0.1/homepage", "bar")] + assert [*di.items()][0:2] == [("http://xmlns.com/foaf/0.1/name", ["foo"]), + ("http://xmlns.com/foaf/0.1/homepage", ["bar"])] assert [*di.items()][2][0] == "http://xmlns.com/foaf/0.1/foo" - assert [*di.items()][2][1].data_dict == {"http://xmlns.com/foaf/0.1/foobar": [{"@value": "bar"}], - "http://xmlns.com/foaf/0.1/barfoo": [{"@id": "foo"}]} + assert [*di.items()][2][1][0] == {"http://xmlns.com/foaf/0.1/foobar": [{"@value": "bar"}], + "http://xmlns.com/foaf/0.1/barfoo": [{"@id": "foo"}]} def test_ref(): @@ -255,14 +276,14 @@ def test_to_python(): inner_di = ld_dict([{}], parent=di) inner_di.update({"xmlns:foobar": "bar", "http://xmlns.com/foaf/0.1/barfoo": {"@id": "foo"}}) di.update({"http://xmlns.com/foaf/0.1/name": "foo", "xmlns:homepage": {"@id": "bar"}, "xmlns:foo": inner_di}) - assert di.to_python() == {"xmlns:name": "foo", "xmlns:homepage": "bar", - "xmlns:foo": {"xmlns:foobar": "bar", "xmlns:barfoo": "foo"}} + assert di.to_python() == {"xmlns:name": ["foo"], "xmlns:homepage": ["bar"], + "xmlns:foo": [{"xmlns:foobar": ["bar"], "xmlns:barfoo": ["foo"]}]} di.update({"http://spam.eggs/eggs": { "@value": "2022-02-22T00:00:00", "@type": "https://schema.org/DateTime" }}) - assert di.to_python() == {"xmlns:name": "foo", "xmlns:homepage": "bar", - "xmlns:foo": {"xmlns:foobar": "bar", "xmlns:barfoo": 
"foo"}, - "http://spam.eggs/eggs": "2022-02-22T00:00:00"} + assert di.to_python() == {"xmlns:name": ["foo"], "xmlns:homepage": ["bar"], + "xmlns:foo": [{"xmlns:foobar": ["bar"], "xmlns:barfoo": ["foo"]}], + "http://spam.eggs/eggs": ["2022-02-22T00:00:00"]} def test_from_dict(): @@ -308,13 +329,15 @@ def test_from_dict(): assert di.full_context == 2 * [{"schema": "https://schema.org/"}] assert di.context == [] and di.key == "schema:error" and di.index is None - di = ld_dict.from_dict({"@context": [{"schema": "https://schema.org/"}], "@type": "schema:Thing", "xmlns:name": "fo"}, + di = ld_dict.from_dict({"@context": [{"schema": "https://schema.org/"}], + "@type": "schema:Thing", "xmlns:name": "fo"}, context=[{"schema": "https://schema.org/", "xmlns": "http://xmlns.com/foaf/0.1/"}]) - assert di["http://xmlns.com/foaf/0.1/name"] == di["xmlns:name"] == "fo" + assert di["http://xmlns.com/foaf/0.1/name"] == di["xmlns:name"] == ["fo"] assert di.context == [{"schema": "https://schema.org/"}, {"schema": "https://schema.org/", "xmlns": "http://xmlns.com/foaf/0.1/"}] - outer_di = ld_dict.from_dict({"@context": [{"schema": "https://schema.org/"}], "@type": "schema:Thing", "@id": "foo"}) + outer_di = ld_dict.from_dict({"@context": [{"schema": "https://schema.org/"}], + "@type": "schema:Thing", "@id": "foo"}) di = ld_dict.from_dict({"@context": {"schema": "https://schema.org/"}, "@type": "schema:Action", "schema:name": "foo"}, parent=outer_di, key="schema:result") @@ -324,7 +347,7 @@ def test_from_dict(): di = ld_dict.from_dict({"@context": {"schema": "https://schema.org/"}, "@type": "schema:Thing", "xmlns:name": "fo"}, context={"xmlns": "http://xmlns.com/foaf/0.1/"}) - assert di["http://xmlns.com/foaf/0.1/name"] == di["xmlns:name"] == "fo" + assert di["http://xmlns.com/foaf/0.1/name"] == di["xmlns:name"] == ["fo"] assert di.context == [{"schema": "https://schema.org/"}, {"xmlns": "http://xmlns.com/foaf/0.1/"}] diff --git a/test/hermes_test/model/types/test_ld_list.py 
b/test/hermes_test/model/types/test_ld_list.py new file mode 100644 index 00000000..fc9ca6a5 --- /dev/null +++ b/test/hermes_test/model/types/test_ld_list.py @@ -0,0 +1,325 @@ +# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR) +# +# SPDX-License-Identifier: Apache-2.0 + +# SPDX-FileContributor: Sophie Kernchen +# SPDX-FileContributor: Michael Fritzsche + +from datetime import date +import pytest + +from hermes.model.types.ld_list import ld_list +from hermes.model.types.ld_dict import ld_dict + + +def test_undefined_list(): + with pytest.raises(ValueError): + ld_list({}, key="foo") + with pytest.raises(ValueError): + ld_list([{"@set": [{"@value": "bacon"}]}], key="foo") + with pytest.raises(ValueError): + ld_list([{"@value": "bacon"}], key="@type") + with pytest.raises(ValueError): + ld_list(["bacon"], key="eggs") + with pytest.raises(ValueError): + ld_list([{"@list": ["a", "b"]}]) # no given key + + +def test_list_basics(): + li_data = [{"@list": [{"@value": "bar"}]}] + li = ld_list(li_data, key="foo") + assert li._data is li_data + assert li.item_list is li_data[0]["@list"] + li_data = [{"@graph": [{"@value": "bar"}]}] + li = ld_list(li_data, key="foo") + assert li._data is li_data + assert li.item_list is li_data[0]["@graph"] + li_data = [{"@value": "bar"}] + li = ld_list(li_data, key="foo") + assert li._data is li_data + assert li.item_list is li_data + assert li.container_type == "@set" + + +def test_build_in_get(): + li = ld_list([{"@list": [{"@value": "foo"}, {"@value": "bar"}, {"@value": "foobar"}]}], key="name") + assert li[0] == "foo" and li[-1] == "foobar" + assert li[:2] == ["foo", "bar"] and li[1:-1] == ["bar"] + assert li[::2] == ["foo", "foobar"] and li[::-1] == ["foobar", "bar", "foo"] + + li = ld_list([{"@list": [{"@type": "A", "schema:name": "a"}, {"@list": [{"@type": "A", "schema:name": "a"}]}]}], + key="schema:person") + assert isinstance(li[0], ld_dict) and li[0].data_dict == {"@type": "A", "schema:name": "a"} and li[0].index == 0 + 
assert isinstance(li[1], ld_list) and li[1].item_list == [{"@type": "A", "schema:name": "a"}] and li[1].index == 1 + assert li[1].key == li.key + + +def test_build_in_set(): + li = ld_list([{"@list": [{"@value": "foo"}, {"@value": "bar"}, {"@value": "foobar"}]}], + key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li[0] = "bar" + li[-1] = "barfoo" + assert li.item_list[0] == {"@value": "bar"} and li.item_list[-1] == {"@value": "barfoo"} + li[:2] = ["fo", "ar"] + assert li.item_list == [{"@value": "fo"}, {"@value": "ar"}, {"@value": "barfoo"}] + li[1:-1] = ["br"] + assert li.item_list == [{"@value": "fo"}, {"@value": "br"}, {"@value": "barfoo"}] + li[::2] = ["oo", "fooba"] + assert li.item_list == [{"@value": "oo"}, {"@value": "br"}, {"@value": "fooba"}] + li[::-1] = ["foobar", "bar", "foo"] + assert li.item_list == [{"@value": "foo"}, {"@value": "bar"}, {"@value": "foobar"}] + with pytest.raises(ValueError): + li[::2] = "foo" + with pytest.raises(TypeError): + li[:2] = 1 + li[0] = ld_dict([{"@type": ["https://schema.org/Thing"], "https://schema.org/name": [{"@value": "a"}]}], parent=li, + key=li.key) + assert isinstance(li[0], ld_dict) + assert li[0].data_dict == {"@type": ["https://schema.org/Thing"], "https://schema.org/name": [{"@value": "a"}]} + li[0] = {"@type": "schema:Thing", "schema:name": "b"} + assert isinstance(li[0], ld_dict) + assert li[0].data_dict == {"@type": ["https://schema.org/Thing"], "https://schema.org/name": [{"@value": "b"}]} + li[0] = ld_list.from_list([{"@type": "schema:Thing", "schema:name": "a"}], parent=li, key=li.key) + assert isinstance(li[0], ld_list) + assert li[0].item_list == [{"@type": ["https://schema.org/Thing"], "https://schema.org/name": [{"@value": "a"}]}] + li[0] = {"@set": [{"@type": "schema:Thing", "schema:name": "b"}]} + assert isinstance(li[0], ld_list) + assert li[0].item_list == [{"@type": ["https://schema.org/Thing"], "https://schema.org/name": [{"@value": "b"}]}] + li[0] = [{"@type": 
"schema:Thing", "schema:name": "b"}] + assert isinstance(li[0], ld_list) + assert li[0].item_list == [{"@type": ["https://schema.org/Thing"], "https://schema.org/name": [{"@value": "b"}]}] + + +def test_build_in_set_complex(): + di = ld_dict([{"https://schema.org/name": [{"@list": [{"@value": "c"}]}]}], + context=[{"schema": "https://schema.org/"}]) + temp = di["schema:name"] + di["schema:name"][0] = {"@list": ["a", "b"]} + assert di["schema:name"][0] == ["a", "b"] and temp._data is di["schema:name"]._data + li = ld_list([], key="schema:time", context=[{"schema": "https://schema.org/"}]) + date_obj = date(year=2025, month=12, day=31) + li.append(date_obj) + assert li.item_list == [{"@value": date_obj.isoformat(), "@type": "https://schema.org/Date"}] + li[0:1] = ["a", "b", "c"] + assert li == ["a", "b", "c"] + li[0:3:2] = [["aa", "bb"]] + assert li == ["aa", "b", "bb"] + + +def test_build_in_del(): + li = ld_list([{"@list": [{"@value": "foo"}, {"@value": "bar"}, {"@value": "foobar"}]}], + key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + del li[0:3:2] + assert li == ["bar"] + del li[0] + assert li == [] + di = ld_dict([{}], context=[{"schema": "https://schema.org/"}]) + di["schema:name"] = [{"@value": "foo"}, {"@value": "bar"}, {"@value": "foobar"}] + li = di["schema:name"] + del li[0] + assert len(di["schema:name"]) == 2 + di = ld_dict([{}], context=[{"schema": "https://schema.org/"}]) + di["schema:name"] = [{"@list": [{"@value": "foo"}, {"@value": "bar"}, {"@value": "foobar"}]}] + li = di["schema:name"] + del di["schema:name"][0:3:2] + assert len(di["schema:name"]) == 1 and len(li) == 1 + + +def test_build_in_len(): + assert len(ld_list([{"@list": []}], key="foo")) == 0 + assert len(ld_list([{"@list": [{"@value": "foo"}, {"@value": "bar"}, {"@value": "foobar"}]}], key="foo")) == 3 + + +def test_build_in_iter(): + li = ld_list([{"@list": [{"@value": "foo"}, {"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]}, + {"@list": 
[{"@value": "bar"}]}]}], key="https://schema.org/name", + context=[{"schema": "https://schema.org/"}]) + li = [val for val in li] + assert li[0] == "foo" + assert li[1].data_dict == {"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]} and li[1].index == 1 + assert isinstance(li[2], ld_list) and li[2].item_list == [{"@value": "bar"}] and li[2].index == 2 + assert li[2].key == "https://schema.org/name" + + +def test_append(): + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.append(ld_list([{"@value": "foo"}], key="https://schema.org/name")) + assert isinstance(li[0], ld_list) and li[0].container_type == "@list" + li = ld_list([{"@graph": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.append({"schema:name": "foo"}) + assert li[0] == {"https://schema.org/name": "foo"} and len(li) == 1 + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.append("foo") + assert li[0] == "foo" and li.item_list[0] == {"@value": "foo"} and len(li) == 1 + li.append("bar") + assert li[0:2] == ["foo", "bar"] and li.item_list[1] == {"@value": "bar"} and len(li) == 2 + li.append(ld_dict.from_dict({"@type": "A", "schema:name": "a"}, parent=li, key=li.key)) + assert li.item_list[2] == {"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]} and len(li) == 3 + li.append({"@type": "A", "schema:name": "a"}) + assert li.item_list[2] == li.item_list[3] + li.append(ld_list([{"@list": [{"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]}]}], parent=li, + key=li.key)) + li.append([{"@type": "A", "schema:name": "a"}]) + li.append(2 * [{"@type": "A", "schema:name": "a"}]) + assert 2 * li[4].item_list == 2 * li[5].item_list == li[6].item_list + + +def test_build_in_contains(): + li = ld_list([], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + assert [] in li + li.append("foo") + 
li.append({"@type": "A", "schema:name": "a"}) + assert "foo" in li and {"@type": "A", "schema:name": "a"} in li + assert {"@value": "foo"} in li and {"@type": "A", "https://schema.org/name": "a"} in li + assert ["foo", {"@type": "A", "schema:name": "a"}] in li + assert [{"@list": ["foo", {"@type": "A", "schema:name": "a"}]}] not in li + li.append({"@id": "schema:foo", "schema:name": "foo"}) + assert {"@id": "schema:foo"} in li and {"@id": "schema:foo", "schema:name": "foobar"} in li + assert {"schema:name": "foo"} in li + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.append("foo") + assert "foo" in li + + +def test_build_in_comparison(): + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.append({"@id": "foo", "schema:bar": "foobar"}) + assert [{"@list": [{"@id": "foo", "schema:bar": "barfoo"}]}] == li + assert [{"@list": [{"@id": "bar", "schema:bar": "foobar"}]}] != li + assert [{"@set": [{"@id": "foo", "schema:bar": "barfoo"}]}] == li + assert [{"@graph": [{"@id": "foo", "schema:bar": "barfoo"}]}] == li + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li2 = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema2": "https://schema.org/"}]) + assert li == [] and [] == li + assert li == li2 + li.append("foo") + li.append({"@type": "A", "schema:name": "a"}) + assert li != li2 + assert ["foo", {"@type": "A", "schema:name": "a"}] == li + assert ["foo"] != li2 + assert ["foo", {"@type": "A", "https://schema.org/name": "a"}] == li + li2.extend(["foo", {"@type": "A", "schema2:name": "a"}]) + assert li == li2 + li3 = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li3.extend([{"@type": "A", "schema:name": "a"}, "foo"]) + assert li != li3 + assert not li == 3 + assert li != 3 + li = ld_list([{"@list": []}], 
key="https://schema.org/Person", context=[{"schema": "https://schema.org/"}]) + li.append({"@id": "foo"}) + assert li == [{"@id": "foo"}] and li == [{"@id": "foo", "schema:name": "bar"}] and li == {"@list": [{"@id": "foo"}]} + li2 = ld_list([], key="@type", context=[{"schema": "https://schema.org/"}]) + li2.append("schema:name") + assert li != li2 + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li2 = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema2": "https://schema.org/"}]) + li.append("foo") + li2.append("bar") + assert li != li2 + li[0] = {"@type": "schema:foo", "@value": "bar"} + assert li != li2 + li[0] = {"@type": "schema:foobar", "@value": "bar"} + assert li != li2 + li = ld_list([], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li2 = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema2": "https://schema.org/"}]) + li.extend(["foo", "bar"]) + li2.extend(["bar", "foo"]) + assert li == li2 + li.append("bar") + li2.append("foo") + assert li != li2 + + +def test_hopcroft_karp(): + ver1 = {0, 1, 2, 3, 4} + ver2 = {10, 11, 12, 13, 14} + edges = {0: (10, 11), 1: (10, 14), 2: (12, 13), 3: (10, 14), 4: tuple([11])} + assert ld_list._hopcroft_karp(ver1, ver2, edges) == 4 + edges[4] = (11, 13) + assert ld_list._hopcroft_karp(ver1, ver2, edges) == 5 + ver1 = {0, 1, 2, 3, 4} + ver2 = {(0, 1, 3), (0, 4), (2, ), (2, 4), (1, 3)} + edges = { + 0: ((0, 1, 3), (0, 4)), 1: ((0, 1, 3), (1, 3)), 2: ((2,), (2, 4)), 3: ((0, 1, 3), (1, 3)), 4: ((0, 4), (2, 4)) + } + assert ld_list._hopcroft_karp(ver1, ver2, edges) == 5 + + +def test_extend(): + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.extend([]) + assert len(li) == 0 + li.extend(["foo"]) + assert li[0] == "foo" and li.item_list[0] == {"@value": "foo"} and len(li) == 1 + li.extend(["bar"]) + assert li[0:2] == ["foo", "bar"] 
and li.item_list[1] == {"@value": "bar"} and len(li) == 2 + li.extend([ld_dict([{"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]}])]) + assert li[-1].data_dict == {"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]} and len(li) == 3 + + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.extend(["foo", "bar", ld_dict([{"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]}])]) + assert li[0:2] == ["foo", "bar"] and li.item_list[0:2] == [{"@value": "foo"}, {"@value": "bar"}] + assert li[-1].data_dict == {"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]} and len(li) == 3 + + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.append("foo") + li.extend(["bar", ld_dict([{"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]}])]) + assert li[0:2] == ["foo", "bar"] and li.item_list[0:2] == [{"@value": "foo"}, {"@value": "bar"}] + assert li[-1].data_dict == {"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]} and len(li) == 3 + + +def test_to_python(): + li = ld_list([{"@list": []}], key="https://schema.org/name", context=[{"schema": "https://schema.org/"}]) + li.append("foo") + li.append(ld_dict([{"@type": ["A"], "https://schema.org/name": [{"@value": "a"}]}], parent=li)) + li.append(["a"]) + assert li[1]["@type"].item_list == ["A"] + assert li.to_python() == ["foo", {"@type": ["A"], "schema:name": ["a"]}, ["a"]] + + +def test_is_ld_list(): + assert not any(ld_list.is_ld_list(item) for item in [1, "", [], {}, {"@list": []}, [{}], [{"a": "b"}]]) + assert not any(ld_list.is_ld_list(item) for item in [[{"@list": ""}], [{"@list": ["a"]}, {"@list": ["b"]}]]) + assert all(ld_list.is_ld_list([{container_type: []}]) for container_type in ["@list", "@set", "@graph"]) + + +def test_is_container(): + assert not any(ld_list.is_container(item) for item in [1, "", [], {}, {"a": "b"}]) + assert not 
any(ld_list.is_container(item) for item in [ld_dict([{"a": "b"}]), + ld_list([{"@list": [{"@value": "a"}]}], key="foo")]) + assert not any(ld_list.is_container({"@list": value}) for value in ["", 1, {}]) + assert all(ld_list.is_container({container_type: []}) for container_type in ["@list", "@graph", "@set"]) + + +def test_from_list(): + with pytest.raises(ValueError): + ld_list.from_list([], key="@type", container_type="@list") + with pytest.raises(ValueError): + ld_list.from_list([], container_type="foo") + li = ld_list.from_list([], key="schema:foo") + assert li.item_list == li.context == [] and li.parent is li.index is None and li.key == "schema:foo" + assert li._data == [] and li.container_type == "@set" + li = ld_list.from_list([], parent=li, key="schema:name", context=[{"schema": "https://schema.org/"}]) + assert li.item_list == [] and li.parent is None and li.key == "schema:foo" + assert li.index is None and li.context == [] + li_data = ["a", {"@value": "b"}] + li = ld_list.from_list(li_data, parent=None, key="https://schema.org/name", + context=[{"schema": "https://schema.org/"}]) + assert li.item_list == [{"@value": "a"}, {"@value": "b"}] and li.parent is None + assert li.key == "https://schema.org/name" and li.index is None + assert li.context == [{"schema": "https://schema.org/"}] + assert li.item_list is not li_data # as li_data is expected to change they should not be the same object + + +def test_get_item_list_from_container(): + assert ld_list.get_item_list_from_container({"@list": ["a"]}) == ["a"] + assert ld_list.get_item_list_from_container({"@set": ["a"]}) == ["a"] + assert ld_list.get_item_list_from_container({"@graph": ["a"]}) == ["a"] + with pytest.raises(ValueError): + ld_list.get_item_list_from_container(["a"]) + with pytest.raises(ValueError): + ld_list.get_item_list_from_container({"@list": [], "@set": []}) + with pytest.raises(ValueError): + ld_list.get_item_list_from_container({"@list": {}}) + with pytest.raises(ValueError): + 
ld_list.get_item_list_from_container({"foo": []})