diff --git a/src/trackteroid/entities/base.py b/src/trackteroid/entities/base.py index 332d234..0da59c5 100644 --- a/src/trackteroid/entities/base.py +++ b/src/trackteroid/entities/base.py @@ -1603,7 +1603,8 @@ def _prepare_note_entity(self, ftrack_note_entity, kwargs, parent_source): for resource_id in resource_ids: recipient = recipients.create( note_id=ftrack_note_entity["id"], - resource_id=resource_id + resource_id=resource_id, + no_parent=True ) ftrack_note_entity["recipients"].append(list(recipient.values())[0].ftrack_entity) diff --git a/tests/conftest.py b/tests/conftest.py index 4a99ba9..a7207dc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,52 +1,119 @@ -import dataclasses +from collections import OrderedDict +import logging +import random import uuid -from typing import List import ftrack_api +from ftrack_api.collection import Collection import pytest -from trackteroid import SESSION +from trackteroid import SESSION, Query +from trackteroid.entities import * +from trackteroid.entities.base import Entity, EntityCollection -@dataclasses.dataclass class TestScenario: - project_id: str - sequence_ids: List[str] = dataclasses.field(default_factory=list) - shot_ids: List[str] = dataclasses.field(default_factory=list) - assetbuild_ids: List[str] = dataclasses.field(default_factory=list) - task_ids: List[str] = dataclasses.field(default_factory=list) - asset_ids: List[str] = dataclasses.field(default_factory=list) - version_ids: List[str] = dataclasses.field(default_factory=list) - component_ids: List[str] = dataclasses.field(default_factory=list) + __test__ = False + + def __init__(self, project_ids=[]): + self.entity_ids = { + "Project": project_ids, + "Folder": [], + "Sequence": [], + "Shot": [], + "AssetBuild": [], + "Task": [], + "Asset": [], + "AssetVersion": [], + "Component": [] + } def query_project(self, session, projections): return session.query( f"select {', '.join(projections)} from Project " - f"where id is {self.project_id}" + f"where id is {self.entity_ids['Project'][0]}" ).one() - def grab(self, session, entity_type, required_fields): - - type_ids = { - "Sequence": self.sequence_ids, - "Shot": self.shot_ids, - "AssetBuild": self.assetbuild_ids, - "Task": self.task_ids, - "Asset": self.asset_ids, - "AssetVersion": self.version_ids, - "Component": self.component_ids, - } - - if entity_type not in type_ids: + def create_entity(self, ftrack_session, entity_type, params): + entity = ftrack_session.create(entity_type, params) + self.entity_ids[entity_type].append(entity["id"]) + return entity["id"] + + def create_assets_tasks_versions(self, ftrack_session): + + for parent_id in self.entity_ids["AssetBuild"] + self.entity_ids["Shot"]: + parent = ftrack_session.get("TypedContext", parent_id) + for _ in range(3): + params = {"name": "asset_{0}".format(uuid.uuid4()), "parent": parent} + asset_id = self.create_entity(ftrack_session, "Asset", params) + params = {"name": "task_{0}".format(uuid.uuid4()), "parent": parent} + task_id = self.create_entity(ftrack_session, "Task", params=params) + for version in range(1, 4): + asset = ftrack_session.get("Asset", asset_id) + task = ftrack_session.get("Task", task_id) + params = {"version": version, "asset": asset, "task": task} + self.create_entity(ftrack_session, "AssetVersion", params=params) + + def grab(self, session, entity_type, required_fields=[], limit=0): + if entity_type not in self.entity_ids: raise KeyError(entity_type) - if not type_ids[entity_type]: + if not self.entity_ids[entity_type]: raise ValueError(f"No entities of type {entity_type} exists in the scenario") - query = f"select {', '.join(required_fields)} from {entity_type} where " - for field in required_fields: - query += f"{field} like '%' " - query += f"and id in ({', '.join(type_ids[entity_type])})" - return session.query(query).all() + query = "" + if required_fields: + query += f"select {', '.join(required_fields)} from " + query += f"{entity_type} where " + query += f"id in ({', '.join(self.entity_ids[entity_type])})" + if limit > 0: + query += f" limit {limit}" + result = session.query(query).all() + random.shuffle(result) + return result + + def construct_collection_from_ftrack_entities(self, ftrack_entities): + entities = [] + for ftrack_entity in ftrack_entities: + entities.append( + (ftrack_entity["id"], Entity(_cls=globals()[ftrack_entity.entity_type], ftrack_entity=ftrack_entity))) + collection = EntityCollection( + _cls=entities[0][1].__class__, + entities=OrderedDict(entities), + session=SESSION + ) + collection.query = Query(entities[0][1].__class__).by_id(*[_["id"] for _ in ftrack_entities]) + return collection + + def construct_simple_collection(self, session, entity_type, limit=0, required_fields=[]): + entities = self.grab(session, entity_type, limit=limit, required_fields=required_fields) + return self.construct_collection_from_ftrack_entities(entities) + + def _assert_attributes(self, ftrack_entity, entity_collection): + not_implemented_msg = "Skipped attribute testing for '{key}', because required Entity is not implemented" + + for key, value in ftrack_entity.items(): + if isinstance(value, Collection): + try: + _collection_ids = [] + if len(value) > 0: + for _ in value: + _collection_ids.append(_["id"]) + logging.debug("Test for secondary") + assert _collection_ids == getattr(entity_collection, key).id + except NotImplementedError: + logging.warning(not_implemented_msg.format(key=key)) + else: + # testing primary attributes + logging.debug("Testing against attribute '{}'".format(key)) + try: + _attribute_value = getattr(entity_collection, key) + if isinstance(_attribute_value, EntityCollection): + _ftrack_entities = [_.ftrack_entity for _ in _attribute_value._entities.values()] + assert [value] == _ftrack_entities + else: + assert [value] == getattr(entity_collection, key) + except NotImplementedError: + logging.warning(not_implemented_msg.format(key=key)) @pytest.fixture(autouse=True) @@ -60,7 +127,7 @@ def ftrack_session(): yield session -@pytest.fixture(scope="session") +@pytest.fixture def ftrack_project_id(): session = ftrack_api.Session() name = "unittests_{0}".format(uuid.uuid1().hex) @@ -96,111 +163,57 @@ def ftrack_project_id(): @pytest.fixture def scenario_project(ftrack_project_id) -> TestScenario: - return TestScenario(project_id=ftrack_project_id) + return TestScenario(project_ids=[ftrack_project_id]) @pytest.fixture -def scenario_sequence(ftrack_session, scenario_project) -> TestScenario: +def scenario_sequence_folder(ftrack_session, scenario_project) -> TestScenario: project = scenario_project.query_project(ftrack_session, ["project_schema"]) - - # Create sequences, shots and tasks. - for sequence_number in range(1, 5): - sequence = ftrack_session.create( - "Sequence", - {"name": "seq_{0}".format(uuid.uuid4()), "parent": project}, - ) - - scenario_project.sequence_ids.append(sequence["id"]) + for _ in range(2): + params = {"name": "seq_{0}".format(uuid.uuid4()), "parent": project} + scenario_project.create_entity(ftrack_session=ftrack_session, entity_type="Sequence", params=params) + params = {"name": "asset_builds".format(uuid.uuid4()), "parent": project} + scenario_project.create_entity(ftrack_session=ftrack_session, entity_type="Folder", params=params) ftrack_session.commit() - return scenario_project - - -@pytest.fixture -def scenario_assetbuild(ftrack_session, scenario_project) -> TestScenario: - project = scenario_project.query_project(ftrack_session, ["project_schema"]) - for _ in range(4): - sequence = ftrack_session.create( - "AssetBuild", - {"name": "ab_{0}".format(uuid.uuid4()), "parent": project}, - ) - - scenario_project.assetbuild_ids.append(sequence["id"]) - ftrack_session.commit() return scenario_project @pytest.fixture -def scenario_shot(ftrack_session, scenario_sequence) -> TestScenario: - project = scenario_sequence.query_project(ftrack_session, ["project_schema"]) +def scenario_assetbuild_shot(ftrack_session, scenario_sequence_folder) -> TestScenario: + assetbuild_folder_id = scenario_sequence_folder.entity_ids["Folder"][0] + assetbuild_folder = ftrack_session.get("Folder", assetbuild_folder_id) + for _ in range(3): + params = {"name": "asset_build_{0}".format(uuid.uuid4()), "parent": assetbuild_folder} + scenario_sequence_folder.create_entity(ftrack_session, "AssetBuild", params=params) + + project = scenario_sequence_folder.query_project(ftrack_session, ["project_schema"]) project_schema = project["project_schema"] default_shot_status = project_schema.get_statuses("Shot")[0] - - for sequence_id in scenario_sequence.sequence_ids: + for sequence_id in scenario_sequence_folder.entity_ids["Sequence"]: sequence = ftrack_session.get("Sequence", sequence_id) - for shot_number in range(1, 8): - shot = ftrack_session.create( - "Shot", - { - "name": "shot_{}".format(uuid.uuid4()), - "parent": sequence, - "status": default_shot_status, - }, - ) - scenario_sequence.shot_ids.append(shot["id"]) + for shot in range(3): + params = {"name": "shot_{}".format(uuid.uuid4()), "parent": sequence, "status": default_shot_status} + scenario_sequence_folder.create_entity(ftrack_session, "Shot", params=params) ftrack_session.commit() - return scenario_sequence - - -def _create_tasks(session, scenario, parent_ids): - for id_ in parent_ids: - parent = session.get("TypedContext", id_) - for task_number in range(1, 5): - task = session.create( - "Task", - { - "name": "task_{0}".format(uuid.uuid4()), - "parent": parent - }, - ) - scenario.task_ids.append(task["id"]) - session.commit() - return scenario + return scenario_sequence_folder @pytest.fixture -def scenario_shot_task(ftrack_session, scenario_shot) -> TestScenario: - return _create_tasks(ftrack_session, scenario_shot, scenario_shot.shot_ids) - - -@pytest.fixture -def scenario_assetbuild_task(ftrack_session, scenario_shot) -> TestScenario: - return _create_tasks(ftrack_session, scenario_shot, scenario_shot.assetbuild_ids) - - -def _create_assets(session, scenario, parent_ids): - for id_ in parent_ids: - parent = session.get("TypedContext", id_) - for task_number in range(1, 5): - task = session.create( - "Asset", - { - "name": "asset_{0}".format(uuid.uuid4()), - "parent": parent - }, - ) - scenario.asset_ids.append(task["id"]) +def scenario_asset_task_version(ftrack_session, scenario_assetbuild_shot) -> TestScenario: + scenario_assetbuild_shot.create_assets_tasks_versions(ftrack_session) + ftrack_session.commit() - session.commit() - return scenario + return scenario_assetbuild_shot @pytest.fixture -def scenario_shot_asset(ftrack_session, scenario_shot) -> TestScenario: - return _create_assets(ftrack_session, scenario_shot, scenario_shot.shot_ids) - +def scenario_components(ftrack_session, scenario_asset_task_version) -> TestScenario: + for assetversion_id in scenario_asset_task_version.entity_ids["AssetVersion"]: + assetversion = ftrack_session.get("AssetVersion", assetversion_id) + params = {"name": "component_{}".format(uuid.uuid4()), "parent": assetversion} + scenario_asset_task_version.create_entity(ftrack_session, "Component", params=params) + ftrack_session.commit() -@pytest.fixture -def scenario_assetbuild_asset(ftrack_session, scenario_assetbuild) -> TestScenario: - return _create_assets(ftrack_session, scenario_assetbuild, scenario_assetbuild.assetbuild_ids) + return scenario_asset_task_version diff --git a/tests/test_authoring.py b/tests/test_authoring.py index c34cd5f..136f00e 100644 --- a/tests/test_authoring.py +++ b/tests/test_authoring.py @@ -18,9 +18,9 @@ from trackteroid.entities.base import EntityCollection, Entity -def test_link_inputs(scenario_sequence): +def test_link_inputs(scenario_sequence_folder): sequences = ( - Query(Sequence).by_id(*scenario_sequence.sequence_ids).get_all(order_by="name") + Query(Sequence).by_id(*scenario_sequence_folder.entity_ids["Sequence"]).get_all(order_by="name") ) sequence1 = sequences[0] sequence2 = sequences[1] @@ -54,7 +54,7 @@ def test_link_inputs(scenario_sequence): def test_create(scenario_project, ftrack_session): - project = Query(Project).by_id(scenario_project.project_id).get_one() + project = Query(Project).by_id(scenario_project.entity_ids["Project"][0]).get_one() test_entities = [] @@ -146,7 +146,7 @@ def test_create_project(ftrack_session): def test_create_sequence(scenario_project): - test_project = Query(Project).by_id(scenario_project.project_id).get_one() + test_project = Query(Project).by_id(scenario_project.entity_ids["Project"][0]).get_one() with pytest.raises(AssertionError): test_project.children[Sequence].create() @@ -163,9 +163,9 @@ def test_create_sequence(scenario_project): assert test_project.id == queried_sequence.parent_id -def test_create_shot(scenario_sequence): +def test_create_shot(scenario_sequence_folder): test_sequence = ( - Query(Sequence).by_id(Project, scenario_sequence.project_id).get_all() + Query(Sequence).by_id(Project, scenario_sequence_folder.entity_ids["Project"][0]).get_all() ) with pytest.raises(AssertionError): @@ -177,7 +177,7 @@ def test_create_shot(scenario_sequence): assert "Ambiguous context" in str(excinfo.value) test_sequence = ( - Query(Sequence).by_id(Project, scenario_sequence.project_id).get_first() + Query(Sequence).by_id(Project, scenario_sequence_folder.entity_ids["Project"][0]).get_first() ) created_shot = test_sequence.children[Shot].create(name=str(uuid.uuid4())) @@ -191,10 +191,10 @@ def test_create_shot(scenario_sequence): assert test_sequence.id == queried_shot.parent_id -def test_create_task(scenario_shot): +def test_create_task(scenario_assetbuild_shot): test_shot = ( Query(Shot) - .by_id(Project, scenario_shot.project_id) + .by_id(Project, scenario_assetbuild_shot.entity_ids["Project"][0]) .get_all(projections=["project.project_schema._task_type_schema.types.name"]) ) task_types = [ @@ -214,7 +214,7 @@ def test_create_task(scenario_shot): ) assert "Ambiguous context" in str(context.value) - test_shot = Query(Shot).by_id(Project, scenario_shot.project_id).get_first() + test_shot = Query(Shot).by_id(Project, scenario_assetbuild_shot.entity_ids["Project"][0]).get_first() created_task = test_shot.children[Task].create( name=str(uuid.uuid4()), type=random.choice(task_types) @@ -257,9 +257,9 @@ def _construct_collection_from_ftrack_entities(ftrack_entities, session): return collection -def test_create_on_ambigious_context(scenario_shot_asset, scenario_assetbuild_asset, ftrack_session): - shots = scenario_shot_asset.grab(ftrack_session, "Shot", ["assets.id"]) - asset_builds = scenario_assetbuild_asset.grab(ftrack_session, "AssetBuild", ["assets.id"]) +def test_create_on_ambigious_context(scenario_asset_task_version, ftrack_session): + shots = scenario_asset_task_version.grab(ftrack_session, "Shot", ["assets.id"]) + asset_builds = scenario_asset_task_version.grab(ftrack_session, "AssetBuild", ["assets.id"]) mix = [shots[0]["assets"][0], asset_builds[0]["assets"][0]] @@ -277,7 +277,7 @@ def test_create_on_ambigious_context(scenario_shot_asset, scenario_assetbuild_as assert "Ambiguous context" in str(context.value) -def test_create_note(scenario_shot): +def test_create_note(scenario_assetbuild_shot): categories = Query(NoteCategory).get_all() # Run it three times, once on an empty collection, another in a collection # with a single entity and then two (this last one will test that the parent @@ -285,7 +285,7 @@ def test_create_note(scenario_shot): for i in range(3): chosen_category = random.choice(categories) # Query every time to ensure the cache is not fooling the tests - test_shot = Query(Shot).by_id(*scenario_shot.shot_ids).get_first(projections=["notes.content"]) + test_shot = Query(Shot).by_id(*scenario_assetbuild_shot.entity_ids["Shot"]).get_first(projections=["notes.content"]) assert ( len(test_shot.notes) == i diff --git a/tests/test_entity.py b/tests/test_entity.py new file mode 100644 index 0000000..da0c8ad --- /dev/null +++ b/tests/test_entity.py @@ -0,0 +1,505 @@ +import logging +import pytest + +from arrow.arrow import Arrow +from collections import OrderedDict, Counter +from datetime import datetime +from itertools import chain + +from trackteroid import ( + SESSION, + Query +) +from trackteroid.entities.base import ( + Entity, + EmptyCollection, + EntityCollection +) +from trackteroid.entities.schematypes import CUSTOM_ATTRIBUTE_TYPE_COMPATIBILITY +from trackteroid.entities import * + + +# TODO: tests missing for `count()`, ` map()`, ` group_by_an_map` + + +def test_fetch_attributes(scenario_asset_task_version, ftrack_session): + assetversion = scenario_asset_task_version.grab(ftrack_session, "AssetVersion")[0] + query = SESSION.query("AssetVersion where id is {}".format(assetversion["id"])) + assert not query.one()["asset"] + collection = scenario_asset_task_version.construct_collection_from_ftrack_entities([assetversion]) + collection.fetch_attributes("asset") + assert query.one()["asset"] + +def test_get_attributes(scenario_asset_task_version, ftrack_session): + assetversion = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["asset.versions"])[0] + + # first pass without auto population + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities([assetversion]) + scenario_asset_task_version._assert_attributes(assetversion, assetversion_collection) + + # second with auto population + setattr(SESSION, "auto_populate", True) + scenario_asset_task_version._assert_attributes(assetversion, assetversion_collection) + setattr(SESSION, "auto_populate", False) + + # test nested attribute access + versions = [] + for _assetversion in assetversion["asset"]["versions"]: + versions.append(_assetversion["version"]) + assert versions == assetversion_collection.asset.versions.version + +def test_get_custom_attributes(scenario_asset_task_version, ftrack_session): + task = scenario_asset_task_version.grab(ftrack_session, "Task", ["custom_attributes"])[0] + task_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities([task]) + + for key, value in task["custom_attributes"].items(): + assert [value] == getattr(task_collection, "custom_{}".format(key)) + + setattr(SESSION, "auto_populate", True) + for key, value in task["custom_attributes"].items(): + assert [value] == getattr(task_collection, "custom_{}".format(key)) + setattr(SESSION, "auto_populate", False) + +def test_get_item(scenario_asset_task_version, ftrack_session): + assetversions = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(assetversions) + for i, assetversion in enumerate(assetversions): + collections = [ + assetversion_collection[assetversion["id"]], + assetversion_collection[i], + assetversion_collection[i:len(assetversions)] + ] + + # ensure result is an EntityCollection + assert any([isinstance(_, EntityCollection) for _ in collections]) + + # ensure it holds the proper entities + assert [assetversion] == [_.ftrack_entity for _ in collections[0]._entities.values()] + assert [assetversion] == [_.ftrack_entity for _ in collections[1]._entities.values()] + assert assetversions[i:len(assetversions)] == [_.ftrack_entity for _ in collections[2]._entities.values()] + +def test_contains(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=10) + + for _id, entity in assetversion_collection._entities.items(): + assert _id in assetversion_collection + assert entity in assetversion_collection + +def test_equality(scenario_asset_task_version, ftrack_session): + assetversion_collection_1 = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=20) + assetversion_collection_2 = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=20) + + assetversion_collection_3 = EntityCollection( + _cls=AssetVersion, + entities=assetversion_collection_1._entities + ) + assetversion_collection_3.query = Query(AssetVersion).by_id( + *[_.ftrack_entity["id"] for _ in assetversion_collection_3._entities.values()] + ) + + assert assetversion_collection_1 != assetversion_collection_2 + assert assetversion_collection_1 == assetversion_collection_3 + +def test_length(scenario_asset_task_version, ftrack_session): + assert 3 == len(scenario_asset_task_version.construct_simple_collection(ftrack_session, "AssetVersion", limit=3)) + assert 10 == len(scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=10)) + assert 1 == len(scenario_asset_task_version.construct_simple_collection(ftrack_session, "Project", limit=1)) + +def test_from_entities(scenario_asset_task_version, ftrack_session): + entities = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=10)._entities.values() + ftrack_entities = [_.ftrack_entity for _ in entities] + + collection = EntityCollection( + _cls=AssetVersion, + entities=OrderedDict( + [(ftrack_entities[0]["id"], entities)] + ) + ) + collection.query = Query(AssetVersion).by_id(*[_["id"] for _ in ftrack_entities]) + new_collection = collection.from_entities(entities) + + assert ftrack_entities == [_.ftrack_entity for _ in new_collection._entities.values()] + +def test_is_attribute_compatible_with_value(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["version", "task"]) + + # all cases where receiver collection has only a single entity + assert assetversion_collection._is_attribute_compatible_with_value("version", 5)[0] + assert assetversion_collection._is_attribute_compatible_with_value("version", [5])[0] + assert assetversion_collection._is_attribute_compatible_with_value( + "task", scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=1))[0] + assert not assetversion_collection._is_attribute_compatible_with_value("version", "foobar")[0] + assert not assetversion_collection._is_attribute_compatible_with_value("version", [5, 10])[0] + assert not assetversion_collection._is_attribute_compatible_with_value("version", ["5"])[0] + + # receiver collection has multiple entries + assert not scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2)._is_attribute_compatible_with_value("version", 5)[0] + assert not scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2, required_fields=["version"])._is_attribute_compatible_with_value( + "version", [5])[0] + assert not scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2, required_fields=["version"])._is_attribute_compatible_with_value( + "version", [5, 10, 15])[0] + assert not assetversion_collection._is_attribute_compatible_with_value( + "task", scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=2))[0] + assert not assetversion_collection._is_attribute_compatible_with_value( + "task", scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1))[0] + + # rebuild our test collection + new_assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=3, required_fields=["version", "task"]) + + assert new_assetversion_collection._is_attribute_compatible_with_value("version", [5, 6, 7])[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("version", [5])[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("version", [5, 6])[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("version", "foobar")[0] + assert new_assetversion_collection._is_attribute_compatible_with_value("task", + scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=3))[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("task", + scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=1))[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("task", + scenario_asset_task_version.construct_simple_collection(ftrack_session, "AssetVersion", limit=1))[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("task", EmptyCollection())[0] + +def test_is_custom_attribute_compatible_with_value(scenario_components, ftrack_session): + # test custom attributes + # test each type for each custom attribute for each entity type + for entity_type in scenario_components.entity_ids.keys(): + entity_cls = globals()[entity_type] + if issubclass(entity_cls, Entity): + try: + entity_collection = scenario_components.construct_simple_collection( + ftrack_session, entity_type, limit=1, required_fields=["custom_attributes"]) + except: + logging.info("No custom attributes found for entity type '{}'. Skipping.".format(entity_type)) + continue + entity = list(entity_collection._entities.values())[0].ftrack_entity + if entity["custom_attributes"]: + for attribute in entity["custom_attributes"]: + for _type in getattr(CUSTOM_ATTRIBUTE_TYPE_COMPATIBILITY, attribute).types: + attribute = "custom_{}".format(attribute) + assert not entity_collection._is_attribute_compatible_with_value(attribute, None)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, type)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, [None])[0] + if _type is str: + assert entity_collection._is_attribute_compatible_with_value(attribute, "foobar")[0] + assert entity_collection._is_attribute_compatible_with_value( + attribute, u"anderes wort ohne umlaut ... stupid pycharm")[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, ["bla"])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, [5])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, ["bla", "blubb"])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, 5)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, {"furz": "testen"})[0] + elif _type in (int, float): + assert entity_collection._is_attribute_compatible_with_value(attribute, 42)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, 0)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, 11.12)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [42])[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [42.24])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, "hallo")[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, True)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, ["bla", "blubb"])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, [5, 2])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, {"furz": "testen"})[0] + elif _type in (bool,): + assert entity_collection._is_attribute_compatible_with_value(attribute, True)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, False)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [True])[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [False])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, 0)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, 1)[0] + elif _type in (Arrow, datetime): + assert entity_collection._is_attribute_compatible_with_value(attribute, datetime.now())[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, Arrow.now())[0] + +def test_setattr(scenario_asset_task_version, ftrack_session): + # test POD + def set_and_assert_equal(collection, attributes, value): + attribute_tokens = attributes.split(".") + current = collection + idx = 0 + while idx < len(attribute_tokens) - 1: + # we want to break just before the last token to call setattr + # instead of getattr + current = getattr(current, attribute_tokens[idx]) + idx += 1 + + setattr(current, attribute_tokens[-1], value) + + after_value = getattr(current, attribute_tokens[-1]) + + if isinstance(value, EntityCollection): + value_list = [entity.ftrack_entity for entity in after_value._entities.values()] + expected_list = [entity.ftrack_entity for entity in value._entities.values()] + assert value_list == expected_list + elif isinstance(value, list): + assert after_value == value + else: + assert after_value == [value] + + assetversion = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["version", "asset.name"]) + assetversions = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=5, required_fields=["version", "asset.name", "task"]) + + set_and_assert_equal(assetversion, "version", 5) + set_and_assert_equal(assetversion, "asset.name", "foobar") + + set_and_assert_equal(assetversions, "version", [1, 2, 99, 4, 5]) + + assets = assetversions.asset + assetnames = ["new_assetname_{}".format(idx) for idx in range(len(assets))] + set_and_assert_equal(assets, "name", assetnames) + + task = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["task"]).task + set_and_assert_equal(assetversion, "task", task) + + tasks = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "Task", limit=5, required_fields=["name"]) + set_and_assert_equal(assetversions, "task", tasks) + + assetversions_with_links = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2, required_fields=["uses_versions"]) + set_and_assert_equal(assetversions_with_links[0], "uses_versions", assetversions_with_links[1]) + + #assetversions_with_links = self.construct_simple_collection(AssetVersion, 2, ["used_in_versions"]) + #set_and_assert_equal(assetversions_with_links[0], "used_in_versions", assetversions_with_links[1]) + # TODO(T13213): this does not work, as ftrack seemingly only updates these connections + # once we commit + # self.assertIn(assetversions_with_links[0], assetversions_with_links[1].used_in_versions) + +def test_group(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=5) + expected_keys = assetversion_collection._entities.keys() + grouped = assetversion_collection.group(lambda x: x.id[0]) + + assert isinstance(grouped, dict) + assert expected_keys == grouped.keys() + + for key, value in grouped.items(): + assert [key] == [_.ftrack_entity["id"] for _ in value._entities.values()] + +def test_fold(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["asset.versions.user.username", "user.username"]) + + user_assetversions = {} + all_assetversions = [] + + for assetversion in assetversion_collection._entities.values(): + for asset_assetversion in assetversion.ftrack_entity["asset"]["versions"]: + user = asset_assetversion["user"]["username"] + if user not in user_assetversions: + user_assetversions[user] = 1 + else: + user_assetversions[user] += 1 + all_assetversions.append(asset_assetversion) + + for user in user_assetversions: + assert user_assetversions[user] == scenario_asset_task_version.construct_collection_from_ftrack_entities( + all_assetversions).fold(0, lambda x, y: x + 1 if y.user.username == [user] else x) + +def test_keys(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert list(assetversion_collection.keys()) == [_["id"] for _ in ftrack_entities] + +def test_values(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity for _ in assetversion_collection.values()] == ftrack_entities + +def test_max(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + + assert [_.ftrack_entity["version"] for _ in assetversion_collection.max(lambda x: x.version).values()] \ + == [max([_["version"] for _ in ftrack_entities])] + + ftrack_entities = scenario_asset_task_version.grab( + ftrack_session, "Asset", ["versions.version", "versions.asset"], 10) + all_assetversions = [] + for asset in ftrack_entities: + for assetversion in asset["versions"]: + all_assetversions.append(assetversion) + all_assetversions.sort(key=lambda x: x["version"]) + asset_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity for _ in asset_collection.max(lambda x: x.versions.version).values()] \ + == [all_assetversions[-1]["asset"]] + +def test_min(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity["version"] for _ in assetversion_collection.min(lambda x: x.version).values()] \ + == [min([_["version"] for _ in ftrack_entities])] + + ftrack_entities = scenario_asset_task_version.grab( + ftrack_session, "Asset", ["versions.version", "versions.asset"], 10) + all_assetversions = [] + for asset in ftrack_entities: + for assetversion in asset["versions"]: + all_assetversions.append(assetversion) + all_assetversions.sort(key=lambda x: x["version"]) + asset_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity for _ in asset_collection.min(lambda x: x.versions.version).values()] \ + == [all_assetversions[0]["asset"]] + +def test_sort_by(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + + # section sort by version + sorted_ftrack_entities = sorted(ftrack_entities, key=lambda x: x["version"]) + unsorted_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + sorted_collection = unsorted_collection.sort(lambda x: x.version) + assert sorted_ftrack_entities == [_.ftrack_entity for _ in sorted_collection._entities.values()] + + # section sort by id + sorted_ftrack_entities = sorted(ftrack_entities, key=lambda x: x["id"]) + sorted_collection = unsorted_collection.sort(lambda x: x.id) + assert sorted_ftrack_entities == [_.ftrack_entity for _ in sorted_collection._entities.values()] + simple_task_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "Task", 10) + sorted_ftrack_entities = sorted( + [_.ftrack_entity for _ in simple_task_collection.values()], + key=lambda x: x["id"]) + sorted_collection = simple_task_collection.sort(lambda x: x.id) + assert sorted_ftrack_entities == [_.ftrack_entity for _ in sorted_collection._entities.values()] + +def test_filter(scenario_asset_task_version, ftrack_session): + repeat = True + while repeat: + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + filtered_ftrack_entities = [_ for _ in ftrack_entities if 1 < _["version"] < 20] + # we can only test filtering properly if our result differs + if len(filtered_ftrack_entities) != len(ftrack_entities): + repeat = False + + # section sort by id + unfiltered_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + filtered_collection = unfiltered_collection.filter(lambda x: 1 < x.version[0] < 20) + assert filtered_ftrack_entities == [_.ftrack_entity for _ in filtered_collection._entities.values()] + +def test_partition(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 20) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + versions_one = [_ for _ in ftrack_entities if _["version"] == 1] + versions_not_one = [_ for _ in ftrack_entities if _["version"] != 1] + collection_one, collection_not_one = assetversion_collection.partition(lambda x: x.version[0] == 1) + assert versions_one == [_.ftrack_entity for _ in collection_one._entities.values()] + assert versions_not_one == [_.ftrack_entity for _ in collection_not_one._entities.values()] + +def test_union(scenario_asset_task_version, ftrack_session): + entities_a = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + entities_b = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + entities_c = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + entities_d = entities_c[0:2] + + ftrack_entities_union = [] + for ftrack_entities in [entities_a, entities_b, entities_c, entities_d]: + for ftrack_entity in ftrack_entities: + if not ftrack_entity in ftrack_entities_union: + ftrack_entities_union.append(ftrack_entity) + + collection_a = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_a) + collection_b = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_b) + collection_c = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_c) + collection_d = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_d) + collection_union = collection_a.union(collection_b, collection_c, collection_d) + + assert ftrack_entities_union == [_.ftrack_entity for _ in collection_union._entities.values()] + + typedcontext = scenario_asset_task_version.grab(ftrack_session, "Shot", limit=100) + typedcontext_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(typedcontext) + # Ftrack will never return a generic TypedContext. To recreate a situation + # that we encounter in our API, we have to set the type explicitly. + typedcontext_collection._entity = TypedContext() + emptycollection = EmptyCollection(_type=Task()) + + task = scenario_asset_task_version.grab(ftrack_session, "Task", limit=10) + task_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(task) + emptycollection.union(task_collection) + +def test_symmetric_difference(scenario_asset_task_version, ftrack_session): + entities_a = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=18) + entities_b = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=15) + entities_c = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=12) + entities_d = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + all_entities = [set(entities_a), set(entities_b), set(entities_c), set(entities_d)] + + ftrack_entities_difference = {key for key, val in Counter(chain.from_iterable(all_entities)).items() if val == 1} + + collection_a = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_a) + collection_b = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_b) + collection_c = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_c) + collection_d = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_d) + collection_difference = collection_a.symmetric_difference(collection_b, collection_c, collection_d) + assert ftrack_entities_difference == {_.ftrack_entity for _ in collection_difference._entities.values()} + +def test_difference(scenario_asset_task_version, ftrack_session): + entities_a = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=18) + entities_b = scenario_asset_task_version.grab(ftrack_session, "Task", limit=15) + entities_c = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=12) + entities_d = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + all_entities = entities_a + entities_b + entities_c + entities_d + + ftrack_entities_difference = [ + _ for _ in all_entities if _ in + list(set(entities_a).difference(entities_b).difference(entities_c).difference(entities_d)) + ] + + collection_a = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_a) + collection_b = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_b) + collection_c = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_c) + collection_d = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_d) + collection_difference = collection_a.difference(collection_b, collection_c, collection_d) + assert ftrack_entities_difference == [_.ftrack_entity for _ in collection_difference._entities.values()] + + +def test_intersection(scenario_asset_task_version, ftrack_session): + all_versions = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=50) + expected_versions = all_versions[:10] + additional_versions = all_versions[10:] + + collection1 = scenario_asset_task_version.construct_collection_from_ftrack_entities( + expected_versions + additional_versions[5:30]) + collection2 = scenario_asset_task_version.construct_collection_from_ftrack_entities( + list(reversed(expected_versions)) + additional_versions[15:20]) + collection3 = scenario_asset_task_version.construct_collection_from_ftrack_entities( + expected_versions + additional_versions[1:9]) + + collection_intersection = collection1.intersection(collection2, collection3) + assert expected_versions == [_.ftrack_entity for _ in collection_intersection._entities.values()] + + # test with ambiguous parent context + shots = scenario_asset_task_version.grab(ftrack_session, "Shot", ["assets"], 2) + asset_build = scenario_asset_task_version.grab(ftrack_session, "AssetBuild", ["assets"]) + + mix = [shots[0]["assets"][0], asset_build[0]["assets"][0]] + collection4 = scenario_asset_task_version.construct_collection_from_ftrack_entities(mix) + collection5 = scenario_asset_task_version.construct_collection_from_ftrack_entities([shots[1]["assets"][0]]) + assert not collection4.intersection(collection5) + +def test_type_coercion(scenario_components, ftrack_session): + collection1 = scenario_components.construct_simple_collection(ftrack_session, "Component", limit=10) + coerced_collection1 = Component(collection1) + collection2 = scenario_components.construct_simple_collection(ftrack_session, "Sequence", limit=2) + coerced_collection2 = TypedContext(collection2) + collection_packet = [ + (collection1, coerced_collection1, Component), + (collection2, coerced_collection2, TypedContext)] + + for collection, coerced_collection, _type in collection_packet: + assert isinstance(coerced_collection, EntityCollection) + assert [_.id for _ in collection.values()] == [_.id for _ in coerced_collection.values()] + assert coerced_collection.entity_type == _type + with pytest.raises(AssertionError): + AssetVersion(collection) diff --git a/tests/test_session.py b/tests/test_session.py index d4db826..2f9f79b 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,7 +1,6 @@ import dbm import os import platform -import shutil import tempfile import ftrack_api @@ -9,12 +8,13 @@ from trackteroid import ( - SESSION, Query, - AssetVersion, + Shot, User ) +from trackteroid.session import Session + @pytest.fixture def dumped_operations_file(): @@ -46,21 +46,23 @@ def initial_operations(): def test_deferred_operations(dumped_operations_file): # case 1. clear an existing cache temporarily - query_result = Query(AssetVersion).get_first(projections=["task", "version"]) - operations_count = len(SESSION.recorded_operations) + session = Session() + operations_count = len(session.recorded_operations) + query_result = Query(Shot).get_first(projections=["name"]) # do a query -> cache a result - with SESSION.deferred_operations(dumped_operations_file): - assert 0 == len(SESSION.recorded_operations) + with session.deferred_operations(dumped_operations_file): + assert 0 == len(session.recorded_operations) - avs = Query(AssetVersion).by_id(*query_result.id).get_one(projections=["task"]) - avs.version = avs.version[0] + 10 - avs2 = avs.create(task=avs.task) # -> create entity, update task, update asset - avs2.version = avs.version[0] + 10 + shot = Query(Shot).by_id(*query_result.id).get_one(projections=["name"]) + print(shot) + shot.name = "first shot" + shot2 = shot.create(name="second shot") # -> create entity, update task, update asset + shot2.name = "second shot renamed" - assert 5 == len(SESSION.recorded_operations) + assert 4 == len(session.recorded_operations) - assert operations_count == len(SESSION.recorded_operations) + assert operations_count == len(session.recorded_operations) # check the created file database database = dbm.open(dumped_operations_file, "r") @@ -69,13 +71,13 @@ def make_keys(entity_collection): lambda x: "('{}', ['{}'])".format(x.entity_type.__name__, x.id[0]) ) - expected_keys = make_keys(avs.union(avs2)) + make_keys(avs.task) + make_keys(avs.asset) + ["__operations__"] + expected_keys = make_keys(shot.union(shot2)) + ["__operations__"] assert expected_keys, database.keys() def test_reconnect_and_commit(initial_operations): - - SESSION.reconnect_and_commit(initial_operations[0]) + session = Session() + session.reconnect_and_commit(initial_operations[0]) user = Query(User).by_name(initial_operations[1]["username"]).get_one( projections=["first_name", "last_name", "is_active"] @@ -86,9 +88,12 @@ def test_reconnect_and_commit(initial_operations): def test_get_cached_collections(initial_operations): - SESSION.reconnect_and_commit(initial_operations[0]) - - users = SESSION.get_cached_collections()[User].fetch_attributes("first_name", "last_name", "is_active") - - users_split = users.partition(lambda u: u.username[0] == initial_operations[1]["username"]) - assert users_split[0].first_name == [initial_operations[1]["first_name"]] + session = Session() + session.reconnect_and_commit(initial_operations[0]) + + cached_users = session.get_cached_collections()[User] + for cached_user in cached_users: + user = Query(User).by_id(cached_user.id[0]).get_first(projections=["username", "first_name"]) + assert user, "Cached user does not exist: {}".format(cached_user.id[0]) + users_split = user.partition(lambda u: u.username[0] == initial_operations[1]["username"]) + assert users_split[0].first_name == [initial_operations[1]["first_name"]]