From f883be61377f4daa2e5d0472757680e6dfcf4a99 Mon Sep 17 00:00:00 2001 From: Salvador Olmos Miralles Date: Wed, 5 Jul 2023 18:46:38 +0200 Subject: [PATCH 1/6] added pytest tests ported from the old unitests --- tests/conftest.py | 206 +++++++++++++++++++++++++ tests/test_authoring.py | 317 ++++++++++++++++++++++++++++++++++++++ tests/test_placeholder.py | 2 - tests/test_session.py | 75 +++++++++ tox.ini | 1 + 5 files changed, 599 insertions(+), 2 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_authoring.py delete mode 100644 tests/test_placeholder.py create mode 100644 tests/test_session.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..4a99ba9 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,206 @@ +import dataclasses +import uuid +from typing import List + +import ftrack_api +import pytest + +from trackteroid import SESSION + + +@dataclasses.dataclass +class TestScenario: + project_id: str + sequence_ids: List[str] = dataclasses.field(default_factory=list) + shot_ids: List[str] = dataclasses.field(default_factory=list) + assetbuild_ids: List[str] = dataclasses.field(default_factory=list) + task_ids: List[str] = dataclasses.field(default_factory=list) + asset_ids: List[str] = dataclasses.field(default_factory=list) + version_ids: List[str] = dataclasses.field(default_factory=list) + component_ids: List[str] = dataclasses.field(default_factory=list) + + def query_project(self, session, projections): + return session.query( + f"select {', '.join(projections)} from Project " + f"where id is {self.project_id}" + ).one() + + def grab(self, session, entity_type, required_fields): + + type_ids = { + "Sequence": self.sequence_ids, + "Shot": self.shot_ids, + "AssetBuild": self.assetbuild_ids, + "Task": self.task_ids, + "Asset": self.asset_ids, + "AssetVersion": self.version_ids, + "Component": self.component_ids, + } + + if entity_type not in type_ids: + raise KeyError(entity_type) + if not type_ids[entity_type]: + raise ValueError(f"No entities of type {entity_type} exists in the scenario") + + query = f"select {', '.join(required_fields)} from {entity_type} where " + for field in required_fields: + query += f"{field} like '%' " + query += f"and id in ({', '.join(type_ids[entity_type])})" + return session.query(query).all() + + +@pytest.fixture(autouse=True) +def reconnect_session(): + return SESSION.reconnect() + + +@pytest.fixture() +def ftrack_session(): + with ftrack_api.Session() as session: + yield session + + +@pytest.fixture(scope="session") +def ftrack_project_id(): + session = ftrack_api.Session() + name = "unittests_{0}".format(uuid.uuid1().hex) + + # Naively pick the first project schema. For this example to work the + # schema must contain `Shot` and `Sequence` object types. + required_types = ["Sequence", "Shot"] + project_schema = None + for schema in session.query("ProjectSchema").all(): + types = [x["name"] for x in schema["object_types"]] + if all([x in types for x in required_types]): + project_schema = schema + break + + if not project_schema: + raise ValueError( + f"A project schema with the following types could not be found on {session.server_url}:" + f" {', '.join(required_types)}" + ) + + # Create the project with the chosen schema. + project = session.create( + "Project", + {"name": name, "full_name": name + "_full", "project_schema": project_schema}, + ) + session.commit() + + yield project["id"] + + session.delete(project) + session.commit() + + +@pytest.fixture +def scenario_project(ftrack_project_id) -> TestScenario: + return TestScenario(project_id=ftrack_project_id) + + +@pytest.fixture +def scenario_sequence(ftrack_session, scenario_project) -> TestScenario: + project = scenario_project.query_project(ftrack_session, ["project_schema"]) + + # Create sequences, shots and tasks. + for sequence_number in range(1, 5): + sequence = ftrack_session.create( + "Sequence", + {"name": "seq_{0}".format(uuid.uuid4()), "parent": project}, + ) + + scenario_project.sequence_ids.append(sequence["id"]) + ftrack_session.commit() + return scenario_project + + +@pytest.fixture +def scenario_assetbuild(ftrack_session, scenario_project) -> TestScenario: + project = scenario_project.query_project(ftrack_session, ["project_schema"]) + + for _ in range(4): + sequence = ftrack_session.create( + "AssetBuild", + {"name": "ab_{0}".format(uuid.uuid4()), "parent": project}, + ) + + scenario_project.assetbuild_ids.append(sequence["id"]) + ftrack_session.commit() + return scenario_project + + +@pytest.fixture +def scenario_shot(ftrack_session, scenario_sequence) -> TestScenario: + project = scenario_sequence.query_project(ftrack_session, ["project_schema"]) + project_schema = project["project_schema"] + default_shot_status = project_schema.get_statuses("Shot")[0] + + for sequence_id in scenario_sequence.sequence_ids: + sequence = ftrack_session.get("Sequence", sequence_id) + for shot_number in range(1, 8): + shot = ftrack_session.create( + "Shot", + { + "name": "shot_{}".format(uuid.uuid4()), + "parent": sequence, + "status": default_shot_status, + }, + ) + scenario_sequence.shot_ids.append(shot["id"]) + ftrack_session.commit() + return scenario_sequence + + +def _create_tasks(session, scenario, parent_ids): + for id_ in parent_ids: + parent = session.get("TypedContext", id_) + for task_number in range(1, 5): + task = session.create( + "Task", + { + "name": "task_{0}".format(uuid.uuid4()), + "parent": parent + }, + ) + scenario.task_ids.append(task["id"]) + + session.commit() + return scenario + + +@pytest.fixture +def scenario_shot_task(ftrack_session, scenario_shot) -> TestScenario: + return _create_tasks(ftrack_session, scenario_shot, scenario_shot.shot_ids) + + +@pytest.fixture +def scenario_assetbuild_task(ftrack_session, scenario_shot) -> TestScenario: + return _create_tasks(ftrack_session, scenario_shot, scenario_shot.assetbuild_ids) + + +def _create_assets(session, scenario, parent_ids): + for id_ in parent_ids: + parent = session.get("TypedContext", id_) + for task_number in range(1, 5): + task = session.create( + "Asset", + { + "name": "asset_{0}".format(uuid.uuid4()), + "parent": parent + }, + ) + scenario.asset_ids.append(task["id"]) + + session.commit() + return scenario + + +@pytest.fixture +def scenario_shot_asset(ftrack_session, scenario_shot) -> TestScenario: + return _create_assets(ftrack_session, scenario_shot, scenario_shot.shot_ids) + + +@pytest.fixture +def scenario_assetbuild_asset(ftrack_session, scenario_assetbuild) -> TestScenario: + return _create_assets(ftrack_session, scenario_assetbuild, scenario_assetbuild.assetbuild_ids) diff --git a/tests/test_authoring.py b/tests/test_authoring.py new file mode 100644 index 0000000..c34cd5f --- /dev/null +++ b/tests/test_authoring.py @@ -0,0 +1,317 @@ +import random +import uuid +from collections import OrderedDict + +import pytest + +from trackteroid import ( + Query, + Sequence, + Project, + Shot, + Task, + PROJECT_SCHEMAS, + TASK_TYPES, + NoteCategory, + Note, +) +from trackteroid.entities.base import EntityCollection, Entity + + +def test_link_inputs(scenario_sequence): + sequences = ( + Query(Sequence).by_id(*scenario_sequence.sequence_ids).get_all(order_by="name") + ) + sequence1 = sequences[0] + sequence2 = sequences[1] + sequence3 = sequences[2:4] + + after_link1 = sequence1.link_inputs(sequence2) + after_link2 = sequence2.link_inputs(sequence3) + + assert isinstance(after_link1, EntityCollection) + assert "Sequence" == after_link1._entity.__class__.__name__ + assert isinstance(after_link2, EntityCollection) + assert "Sequence" == after_link2._entity.__class__.__name__ + + sequence1.commit() + + assert ( + Query(Sequence) + .by_id(*sequence1.id) + .get_one(projections=["incoming_links.from_id"]) + .incoming_links.from_id + == sequence2.id + ) + + assert ( + Query(Sequence) + .by_id(*sequence2.id) + .get_one(projections=["incoming_links.from_id"]) + .incoming_links.from_id + == sequence3.id, + ) + + +def test_create(scenario_project, ftrack_session): + project = Query(Project).by_id(scenario_project.project_id).get_one() + + test_entities = [] + + # create a sequence + sequence = project.children[Sequence].create(name="Sequence") + test_entities.append(("Sequence", sequence.id[0])) + + # create some shots + shot1 = sequence.children[Shot].create(name="Shot1") + shot2 = sequence.children[Shot].create(name="Shot2") + test_entities.append(("Shot", shot1.id[0])) + test_entities.append(("Shot", shot2.id[0])) + + # create some tasks["name + shot1_task1, shot1_task2 = shot1.children[Task].create_batch( + {"name": "Modeling", "type": "Modeling"}, + {"name": "Rigging", "type": "Rigging"}, + ) + shot2_tasks = shot2.children[Task].create_batch( + {"name": "Modeling", "type": "Modeling"}, + ) + shot2_tasks2 = shot2_tasks.create_batch( + {"name": "Rigging", "type": "Rigging"}, + {"name": "Animation", "type": "Animation"}, + ) + tasks = [ + shot1_task1, + shot1_task2, + shot2_tasks, + shot2_tasks2, + ] + for task in tasks: + for id in task.id: + test_entities.append(("Task", id)) + + asset_types = ftrack_session.query("AssetType").all() + + # create some assets + asset1 = shot1.assets.create(name="Asset1", type=asset_types[0]["name"]) + asset2 = shot1.assets.create(name="Asset2", type=asset_types[1]["name"]) + test_entities.append(("Asset", asset1.id[0])) + test_entities.append(("Asset", asset2.id[0])) + + # create some versions + assetversion1 = asset1.versions.create(task=shot1_task1) + assetversion2 = asset2.versions.create(task=shot1_task2) + test_entities.append(("AssetVersion", assetversion1.id[0])) + test_entities.append(("AssetVersion", assetversion2.id[0])) + + # link some versions + assetversion1.link_outputs(assetversion2) + + # push to server + project.commit() + + retrieved_entities = [] + for entity in test_entities: + retrieved_entities.append(ftrack_session.get(*entity)) + assert all(retrieved_entities), "Some entities were not created" + + +def test_create_project(ftrack_session): + some_project = Query(Project).get_first() + + with pytest.raises(AssertionError): + some_project.create() + + with pytest.raises(AssertionError): + some_project.create(name="Foobar") + + with pytest.raises(AssertionError): + some_project.create(name="Foobar", project_schema="DuDoedl") + + created_project = some_project.create( + name=str(uuid.uuid4()), + project_schema=random.choice(list(PROJECT_SCHEMAS.types.keys())), + ) + + try: + assert isinstance(created_project, EntityCollection) + assert "Project" == created_project._entity.__class__.__name__ + + created_project.commit() + + assert Query(Project).by_id(*created_project.id).get_one() == created_project + finally: + ftrack_session.delete(ftrack_session.get("Project", created_project.id[0])) + ftrack_session.commit() + + +def test_create_sequence(scenario_project): + test_project = Query(Project).by_id(scenario_project.project_id).get_one() + + with pytest.raises(AssertionError): + test_project.children[Sequence].create() + + created_sequence = test_project.children[Sequence].create(name=str(uuid.uuid4())) + + assert isinstance(created_sequence, EntityCollection) + assert "Sequence" == created_sequence._entity.__class__.__name__ + + test_project.commit() + + queried_sequence = Query(Sequence).by_id(*created_sequence.id).get_one() + assert isinstance(queried_sequence, EntityCollection) + assert test_project.id == queried_sequence.parent_id + + +def test_create_shot(scenario_sequence): + test_sequence = ( + Query(Sequence).by_id(Project, scenario_sequence.project_id).get_all() + ) + + with pytest.raises(AssertionError): + test_sequence.children[Shot].create() + + with pytest.raises(AssertionError) as excinfo: + test_sequence.children[Shot].create(name=str(uuid.uuid4())) + + assert "Ambiguous context" in str(excinfo.value) + + test_sequence = ( + Query(Sequence).by_id(Project, scenario_sequence.project_id).get_first() + ) + created_shot = test_sequence.children[Shot].create(name=str(uuid.uuid4())) + + assert isinstance(created_shot, EntityCollection) + assert "Shot" == created_shot._entity.__class__.__name__ + + test_sequence.commit() + + queried_shot = Query(Shot).by_id(*created_shot.id).get_one() + assert queried_shot == created_shot + assert test_sequence.id == queried_shot.parent_id + + +def test_create_task(scenario_shot): + test_shot = ( + Query(Shot) + .by_id(Project, scenario_shot.project_id) + .get_all(projections=["project.project_schema._task_type_schema.types.name"]) + ) + task_types = [ + TASK_TYPES._to_camel_case(_) + for _ in test_shot.project.project_schema._task_type_schema.types.name + ] + + with pytest.raises(AssertionError) as context: + test_shot.children[Task].create() + + with pytest.raises(AssertionError) as context: + test_shot.children[Task].create(name=str(uuid.uuid4())) + + with pytest.raises(AssertionError) as context: + test_shot.children[Task].create( + name=str(uuid.uuid4()), type=random.choice(task_types) + ) + assert "Ambiguous context" in str(context.value) + + test_shot = Query(Shot).by_id(Project, scenario_shot.project_id).get_first() + + created_task = test_shot.children[Task].create( + name=str(uuid.uuid4()), type=random.choice(task_types) + ) + + assert isinstance(created_task, EntityCollection) + assert "Task" == created_task._entity.__class__.__name__ + + test_shot.commit() + + queried_task = Query(Task).by_id(*created_task.id).get_one() + + assert queried_task == created_task + assert test_shot.id == queried_task.parent_id + + +def _construct_collection_from_ftrack_entities(ftrack_entities, session): + import trackteroid.entities + assert ftrack_entities + + if not isinstance(ftrack_entities, list): + ftrack_entities = [ftrack_entities] + entities = [] + + for ftrack_entity in ftrack_entities: + entities.append( + ( + ftrack_entity["id"], + Entity( + _cls=getattr(trackteroid.entities, ftrack_entity.entity_type), + ftrack_entity=ftrack_entity, + ), + ) + ) + + collection = EntityCollection( + _cls=entities[0][1].__class__, entities=OrderedDict(entities), session=session + ) + collection.query = Query(entities[0][1].__class__).by_id(*[_["id"] for _ in ftrack_entities]) + return collection + + +def test_create_on_ambigious_context(scenario_shot_asset, scenario_assetbuild_asset, ftrack_session): + shots = scenario_shot_asset.grab(ftrack_session, "Shot", ["assets.id"]) + asset_builds = scenario_assetbuild_asset.grab(ftrack_session, "AssetBuild", ["assets.id"]) + + mix = [shots[0]["assets"][0], asset_builds[0]["assets"][0]] + + collection4 = _construct_collection_from_ftrack_entities(mix, ftrack_session) + + collection5 = _construct_collection_from_ftrack_entities( + shots[1]["assets"][0], ftrack_session + ) + + asset_type = ftrack_session.query("AssetType").first() + with pytest.raises(AssertionError) as context: + collection4.intersection(collection5).create( + name=str(uuid.uuid4()), type=asset_type["name"] + ) + assert "Ambiguous context" in str(context.value) + + +def test_create_note(scenario_shot): + categories = Query(NoteCategory).get_all() + # Run it three times, once on an empty collection, another in a collection + # with a single entity and then two (this last one will test that the parent + # ambiguity error is not raised) + for i in range(3): + chosen_category = random.choice(categories) + # Query every time to ensure the cache is not fooling the tests + test_shot = Query(Shot).by_id(*scenario_shot.shot_ids).get_first(projections=["notes.content"]) + + assert ( + len(test_shot.notes) == i + ), "Expected shot to have {} notes, {} found".format(i, len(test_shot.notes)) + + with pytest.raises(AssertionError): + test_shot.notes.create(contents="A note", category="A String") + + note = test_shot.notes.create(content="A note", category=chosen_category) + note.commit() + + # For some unknown reason, note.commit() converts parent_type to "Resource", so we can ignore testing + # the parent_type, as it'll be wrong anyway + assert ( + note.parent_id[0] == test_shot.id[0] + ), "Expected parent id {!r}, got {!r}".format( + test_shot.id[0], note.parent_id[0] + ) + + # To ensure it's on the entity and not only on the session cache, a new query + # should do the trick + notes = Query(Note).inject("parent_id is {}".format(test_shot.id[0])).get_all() + assert len(notes) == (i + 1), "Expected {} notes, {} found".format( + i + 1, len(notes) + ) + assert ( + notes.filter(lambda x: x.id[0] == note.id[0]).category.name[0] + == chosen_category.name[0] + ) diff --git a/tests/test_placeholder.py b/tests/test_placeholder.py deleted file mode 100644 index 3ada1ee..0000000 --- a/tests/test_placeholder.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_placeholder(): - assert True diff --git a/tests/test_session.py b/tests/test_session.py new file mode 100644 index 0000000..a38436e --- /dev/null +++ b/tests/test_session.py @@ -0,0 +1,75 @@ +import os +import shutil +import tempfile + +import pytest + +from trackteroid import SESSION, Query, AssetVersion +import dbm + + +@pytest.fixture +def dumped_operations_file(): + temp = tempfile.gettempdir() + yield os.path.join(temp, "operations.dbm") + # TODO: This fails as it's in use when it reaches teardown. Why? + # shutil.rmtree(temp) + + +@pytest.fixture +def initial_operations_file(): + return os.path.join(tempfile.gettempdir(), "operations.dbm") + + +def test_deferred_operations(dumped_operations_file): + # case 1. clear an existing cache temporarily + query_result = Query(AssetVersion).get_first(projections=["task", "version"]) + operations_count = len(SESSION.recorded_operations) + + # do a query -> cache a result + with SESSION.deferred_operations(dumped_operations_file): + assert 0 == len(SESSION.recorded_operations) + + avs = Query(AssetVersion).by_id(*query_result.id).get_one(projections=["task"]) + avs.version = avs.version[0] + 10 + avs2 = avs.create(task=avs.task) # -> create entity, update task, update asset + avs2.version = avs.version[0] + 10 + + assert 5 == len(SESSION.recorded_operations) + + assert operations_count == len(SESSION.recorded_operations) + # check the created file database + database = dbm.open(dumped_operations_file, "r") + + def make_keys(entity_collection): + return entity_collection.map( + lambda x: "('{}', ['{}'])".format(x.entity_type.__name__, x.id[0]) + ) + + expected_keys = make_keys(avs.union(avs2)) + make_keys(avs.task) + make_keys(avs.asset) + ["__operations__"] + assert expected_keys, database.keys() + + +def test_reconnect_and_commit(initial_operations_file): + + + + SESSION.reconnect_and_commit(initial_operations_file) + + assetversion = Query(AssetVersion).by_id("fbb682b6-e9e6-4111-8edb-38d0797c9ffe").get_one( + projections=["components.name"] + ) + assert 99 == assetversion.version[0] + assert "pymelle" == assetversion.components.name[0] + + +def test_get_cached_collections(initial_operations_file): + SESSION.reconnect_and_commit(initial_operations_file) + + avs = SESSION.get_cached_collections()[AssetVersion] + avs.fetch_attributes("asset.versions") + + v99, rest = avs.asset.versions.partition(lambda x: x.version[0] == 99) + v99.uses_versions = rest + + SESSION.commit() diff --git a/tox.ini b/tox.ini index 590288c..613de32 100644 --- a/tox.ini +++ b/tox.ini @@ -11,3 +11,4 @@ deps = pytest-sugar commands = pytest {posargs:tests} +passenv = FTRACK_API_KEY,FTRACK_API_USER,FTRACK_SERVER From 4185fbf7a63b49cbf365413d1c455d38492e4f50 Mon Sep 17 00:00:00 2001 From: Alf Kraus <99alfie@gmail.com> Date: Mon, 14 Aug 2023 15:00:41 +0200 Subject: [PATCH 2/6] fixed test_create_note --- src/trackteroid/entities/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/trackteroid/entities/base.py b/src/trackteroid/entities/base.py index 26a57c1..f87d3e6 100644 --- a/src/trackteroid/entities/base.py +++ b/src/trackteroid/entities/base.py @@ -1518,7 +1518,8 @@ def _prepare_note_entity(self, ftrack_note_entity, kwargs, parent_source): for resource_id in resource_ids: recipient = recipients.create( note_id=ftrack_note_entity["id"], - resource_id=resource_id + resource_id=resource_id, + no_parent=True ) ftrack_note_entity["recipients"].append(list(recipient.values())[0].ftrack_entity) From 7b93f0d551f171355625576f160c4f98cc5bb402 Mon Sep 17 00:00:00 2001 From: Alf Kraus <99alfie@gmail.com> Date: Mon, 14 Aug 2023 15:00:41 +0200 Subject: [PATCH 3/6] fixed test_create_note --- src/trackteroid/entities/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/trackteroid/entities/base.py b/src/trackteroid/entities/base.py index 332d234..0da59c5 100644 --- a/src/trackteroid/entities/base.py +++ b/src/trackteroid/entities/base.py @@ -1603,7 +1603,8 @@ def _prepare_note_entity(self, ftrack_note_entity, kwargs, parent_source): for resource_id in resource_ids: recipient = recipients.create( note_id=ftrack_note_entity["id"], - resource_id=resource_id + resource_id=resource_id, + no_parent=True ) ftrack_note_entity["recipients"].append(list(recipient.values())[0].ftrack_entity) From 6c9325fadeb3e1ab1c58ca09efcb8e761864b3e4 Mon Sep 17 00:00:00 2001 From: Alf Kraus <99alfie@gmail.com> Date: Mon, 21 Aug 2023 13:57:03 +0200 Subject: [PATCH 4/6] reverted file to conform to main, added conditional so test works --- tests/test_session.py | 72 +++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/tests/test_session.py b/tests/test_session.py index c5fee38..8cdc4d6 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,24 +1,46 @@ +import dbm import os -import shutil +import platform import tempfile +import ftrack_api import pytest -from trackteroid import SESSION, Query, AssetVersion -import dbm + +from trackteroid import ( + SESSION, + Query, + AssetVersion, + User +) @pytest.fixture def dumped_operations_file(): - temp = tempfile.gettempdir() - yield os.path.join(temp, "operations.dbm") - # TODO: This fails as it's in use when it reaches teardown. Why? - # shutil.rmtree(temp) + yield os.path.join(tempfile.gettempdir(), "operations.dbm") @pytest.fixture -def initial_operations_file(): - return os.path.join(tempfile.gettempdir(), "operations.dbm") +def initial_operations(): + # these contents need to match the operations in the given resource file + test_data = { + "username": "demo.user@example.com", + "first_name": "demo", + "last_name": "user", + "is_active": True + } + yield ( + f"{os.path.dirname(__file__)}/resources/session" + f"/operations_{'windows' if platform.platform().startswith('Windows') else 'linux'}.dbm", + test_data + ) + with ftrack_api.Session(auto_populate=False, auto_connect_event_hub=False) as session: + demo_user = session.query( + f"select first_name, last_name from User where username is \"{test_data['username']}\"" + ).first() + if demo_user: + session.delete(demo_user) + session.commit() def test_deferred_operations(dumped_operations_file): @@ -50,26 +72,24 @@ def make_keys(entity_collection): assert expected_keys, database.keys() -def test_reconnect_and_commit(initial_operations_file): +def test_reconnect_and_commit(initial_operations): + SESSION.reconnect_and_commit(initial_operations[0]) - - SESSION.reconnect_and_commit(initial_operations_file) - - assetversion = Query(AssetVersion).by_id("fbb682b6-e9e6-4111-8edb-38d0797c9ffe").get_one( - projections=["components.name"] + user = Query(User).by_name(initial_operations[1]["username"]).get_one( + projections=["first_name", "last_name", "is_active"] ) - assert 99 == assetversion.version[0] - assert "pymelle" == assetversion.components.name[0] - - -def test_get_cached_collections(initial_operations_file): - SESSION.reconnect_and_commit(initial_operations_file) + assert initial_operations[1]["first_name"] == user.first_name[0] + assert initial_operations[1]["last_name"] == user.last_name[0] + assert initial_operations[1]["is_active"] is True - avs = SESSION.get_cached_collections()[AssetVersion] - avs.fetch_attributes("asset.versions") - v99, rest = avs.asset.versions.partition(lambda x: x.version[0] == 99) - v99.uses_versions = rest +def test_get_cached_collections(initial_operations): + SESSION.reconnect_and_commit(initial_operations[0]) - SESSION.commit() \ No newline at end of file + cached_users = SESSION.get_cached_collections()[User] + for cached_user in cached_users: + user = Query(User).by_id(cached_user.id[0]).get_first(projections=["username", "first_name"]) + if user: + users_split = user.partition(lambda u: u.username[0] == initial_operations[1]["username"]) + assert users_split[0].first_name == [initial_operations[1]["first_name"]] From d21845f29c86bfb2627ccfb0199c590f6e186f62 Mon Sep 17 00:00:00 2001 From: Alf Kraus <99alfie@gmail.com> Date: Thu, 31 Aug 2023 13:39:36 +0200 Subject: [PATCH 5/6] some refactoring and initial commit of test_entity --- tests/conftest.py | 249 ++++++++++---------- tests/test_authoring.py | 30 +-- tests/test_entity.py | 505 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 651 insertions(+), 133 deletions(-) create mode 100644 tests/test_entity.py diff --git a/tests/conftest.py b/tests/conftest.py index 4a99ba9..a7207dc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,52 +1,119 @@ -import dataclasses +from collections import OrderedDict +import logging +import random import uuid -from typing import List import ftrack_api +from ftrack_api.collection import Collection import pytest -from trackteroid import SESSION +from trackteroid import SESSION, Query +from trackteroid.entities import * +from trackteroid.entities.base import Entity, EntityCollection -@dataclasses.dataclass class TestScenario: - project_id: str - sequence_ids: List[str] = dataclasses.field(default_factory=list) - shot_ids: List[str] = dataclasses.field(default_factory=list) - assetbuild_ids: List[str] = dataclasses.field(default_factory=list) - task_ids: List[str] = dataclasses.field(default_factory=list) - asset_ids: List[str] = dataclasses.field(default_factory=list) - version_ids: List[str] = dataclasses.field(default_factory=list) - component_ids: List[str] = dataclasses.field(default_factory=list) + __test__ = False + + def __init__(self, project_ids=[]): + self.entity_ids = { + "Project": project_ids, + "Folder": [], + "Sequence": [], + "Shot": [], + "AssetBuild": [], + "Task": [], + "Asset": [], + "AssetVersion": [], + "Component": [] + } def query_project(self, session, projections): return session.query( f"select {', '.join(projections)} from Project " - f"where id is {self.project_id}" + f"where id is {self.entity_ids['Project'][0]}" ).one() - def grab(self, session, entity_type, required_fields): - - type_ids = { - "Sequence": self.sequence_ids, - "Shot": self.shot_ids, - "AssetBuild": self.assetbuild_ids, - "Task": self.task_ids, - "Asset": self.asset_ids, - "AssetVersion": self.version_ids, - "Component": self.component_ids, - } - - if entity_type not in type_ids: + def create_entity(self, ftrack_session, entity_type, params): + entity = ftrack_session.create(entity_type, params) + self.entity_ids[entity_type].append(entity["id"]) + return entity["id"] + + def create_assets_tasks_versions(self, ftrack_session): + + for parent_id in self.entity_ids["AssetBuild"] + self.entity_ids["Shot"]: + parent = ftrack_session.get("TypedContext", parent_id) + for _ in range(3): + params = {"name": "asset_{0}".format(uuid.uuid4()), "parent": parent} + asset_id = self.create_entity(ftrack_session, "Asset", params) + params = {"name": "task_{0}".format(uuid.uuid4()), "parent": parent} + task_id = self.create_entity(ftrack_session, "Task", params=params) + for version in range(1, 4): + asset = ftrack_session.get("Asset", asset_id) + task = ftrack_session.get("Task", task_id) + params = {"version": version, "asset": asset, "task": task} + self.create_entity(ftrack_session, "AssetVersion", params=params) + + def grab(self, session, entity_type, required_fields=[], limit=0): + if entity_type not in self.entity_ids: raise KeyError(entity_type) - if not type_ids[entity_type]: + if not self.entity_ids[entity_type]: raise ValueError(f"No entities of type {entity_type} exists in the scenario") - query = f"select {', '.join(required_fields)} from {entity_type} where " - for field in required_fields: - query += f"{field} like '%' " - query += f"and id in ({', '.join(type_ids[entity_type])})" - return session.query(query).all() + query = "" + if required_fields: + query += f"select {', '.join(required_fields)} from " + query += f"{entity_type} where " + query += f"id in ({', '.join(self.entity_ids[entity_type])})" + if limit > 0: + query += f" limit {limit}" + result = session.query(query).all() + random.shuffle(result) + return result + + def construct_collection_from_ftrack_entities(self, ftrack_entities): + entities = [] + for ftrack_entity in ftrack_entities: + entities.append( + (ftrack_entity["id"], Entity(_cls=globals()[ftrack_entity.entity_type], ftrack_entity=ftrack_entity))) + collection = EntityCollection( + _cls=entities[0][1].__class__, + entities=OrderedDict(entities), + session=SESSION + ) + collection.query = Query(entities[0][1].__class__).by_id(*[_["id"] for _ in ftrack_entities]) + return collection + + def construct_simple_collection(self, session, entity_type, limit=0, required_fields=[]): + entities = self.grab(session, entity_type, limit=limit, required_fields=required_fields) + return self.construct_collection_from_ftrack_entities(entities) + + def _assert_attributes(self, ftrack_entity, entity_collection): + not_implemented_msg = "Skipped attribute testing for '{key}', because required Entity is not implemented" + + for key, value in ftrack_entity.items(): + if isinstance(value, Collection): + try: + _collection_ids = [] + if len(value) > 0: + for _ in value: + _collection_ids.append(_["id"]) + logging.debug("Test for secondary") + assert _collection_ids == getattr(entity_collection, key).id + except NotImplementedError: + logging.warning(not_implemented_msg.format(key=key)) + else: + # testing primary attributes + logging.debug("Testing against attribute '{}'".format(key)) + try: + _attribute_value = getattr(entity_collection, key) + if isinstance(_attribute_value, EntityCollection): + _ftrack_entities = [_.ftrack_entity for _ in _attribute_value._entities.values()] + assert [value] == _ftrack_entities + else: + assert [value] == getattr(entity_collection, key) + except NotImplementedError: + logging.warning(not_implemented_msg.format(key=key)) @pytest.fixture(autouse=True) @@ -60,7 +127,7 @@ def ftrack_session(): yield session -@pytest.fixture(scope="session") +@pytest.fixture def ftrack_project_id(): session = ftrack_api.Session() name = "unittests_{0}".format(uuid.uuid1().hex) @@ -96,111 +163,57 @@ def ftrack_project_id(): @pytest.fixture def scenario_project(ftrack_project_id) -> TestScenario: - return TestScenario(project_id=ftrack_project_id) + return TestScenario(project_ids=[ftrack_project_id]) @pytest.fixture -def scenario_sequence(ftrack_session, scenario_project) -> TestScenario: +def scenario_sequence_folder(ftrack_session, scenario_project) -> TestScenario: project = scenario_project.query_project(ftrack_session, ["project_schema"]) - - # Create sequences, shots and tasks. - for sequence_number in range(1, 5): - sequence = ftrack_session.create( - "Sequence", - {"name": "seq_{0}".format(uuid.uuid4()), "parent": project}, - ) - - scenario_project.sequence_ids.append(sequence["id"]) + for _ in range(2): + params = {"name": "seq_{0}".format(uuid.uuid4()), "parent": project} + scenario_project.create_entity(ftrack_session=ftrack_session, entity_type="Sequence", params=params) + params = {"name": "asset_builds".format(uuid.uuid4()), "parent": project} + scenario_project.create_entity(ftrack_session=ftrack_session, entity_type="Folder", params=params) ftrack_session.commit() - return scenario_project - - -@pytest.fixture -def scenario_assetbuild(ftrack_session, scenario_project) -> TestScenario: - project = scenario_project.query_project(ftrack_session, ["project_schema"]) - for _ in range(4): - sequence = ftrack_session.create( - "AssetBuild", - {"name": "ab_{0}".format(uuid.uuid4()), "parent": project}, - ) - - scenario_project.assetbuild_ids.append(sequence["id"]) - ftrack_session.commit() return scenario_project @pytest.fixture -def scenario_shot(ftrack_session, scenario_sequence) -> TestScenario: - project = scenario_sequence.query_project(ftrack_session, ["project_schema"]) +def scenario_assetbuild_shot(ftrack_session, scenario_sequence_folder) -> TestScenario: + assetbuild_folder_id = scenario_sequence_folder.entity_ids["Folder"][0] + assetbuild_folder = ftrack_session.get("Folder", assetbuild_folder_id) + for _ in range(3): + params = {"name": "asset_build_{0}".format(uuid.uuid4()), "parent": assetbuild_folder} + scenario_sequence_folder.create_entity(ftrack_session, "AssetBuild", params=params) + + project = scenario_sequence_folder.query_project(ftrack_session, ["project_schema"]) project_schema = project["project_schema"] default_shot_status = project_schema.get_statuses("Shot")[0] - - for sequence_id in scenario_sequence.sequence_ids: + for sequence_id in scenario_sequence_folder.entity_ids["Sequence"]: sequence = ftrack_session.get("Sequence", sequence_id) - for shot_number in range(1, 8): - shot = ftrack_session.create( - "Shot", - { - "name": "shot_{}".format(uuid.uuid4()), - "parent": sequence, - "status": default_shot_status, - }, - ) - scenario_sequence.shot_ids.append(shot["id"]) + for shot in range(3): + params = {"name": "shot_{}".format(uuid.uuid4()), "parent": sequence, "status": default_shot_status} + scenario_sequence_folder.create_entity(ftrack_session, "Shot", params=params) ftrack_session.commit() - return scenario_sequence - - -def _create_tasks(session, scenario, parent_ids): - for id_ in parent_ids: - parent = session.get("TypedContext", id_) - for task_number in range(1, 5): - task = session.create( - "Task", - { - "name": "task_{0}".format(uuid.uuid4()), - "parent": parent - }, - ) - scenario.task_ids.append(task["id"]) - session.commit() - return scenario + return scenario_sequence_folder @pytest.fixture -def scenario_shot_task(ftrack_session, scenario_shot) -> TestScenario: - return _create_tasks(ftrack_session, scenario_shot, scenario_shot.shot_ids) - - -@pytest.fixture -def scenario_assetbuild_task(ftrack_session, scenario_shot) -> TestScenario: - return _create_tasks(ftrack_session, scenario_shot, scenario_shot.assetbuild_ids) - - -def _create_assets(session, scenario, parent_ids): - for id_ in parent_ids: - parent = session.get("TypedContext", id_) - for task_number in range(1, 5): - task = session.create( - "Asset", - { - "name": "asset_{0}".format(uuid.uuid4()), - "parent": parent - }, - ) - scenario.asset_ids.append(task["id"]) +def scenario_asset_task_version(ftrack_session, scenario_assetbuild_shot) -> TestScenario: + scenario_assetbuild_shot.create_assets_tasks_versions(ftrack_session) + ftrack_session.commit() - session.commit() - return scenario + return scenario_assetbuild_shot @pytest.fixture -def scenario_shot_asset(ftrack_session, scenario_shot) -> TestScenario: - return _create_assets(ftrack_session, scenario_shot, scenario_shot.shot_ids) - +def scenario_components(ftrack_session, scenario_asset_task_version) -> TestScenario: + for assetversion_id in scenario_asset_task_version.entity_ids["AssetVersion"]: + assetversion = ftrack_session.get("AssetVersion", assetversion_id) + params = {"name": "component_{}".format(uuid.uuid4()), "parent": assetversion} + scenario_asset_task_version.create_entity(ftrack_session, "Component", params=params) + ftrack_session.commit() -@pytest.fixture -def scenario_assetbuild_asset(ftrack_session, scenario_assetbuild) -> TestScenario: - return _create_assets(ftrack_session, scenario_assetbuild, scenario_assetbuild.assetbuild_ids) + return scenario_asset_task_version diff --git a/tests/test_authoring.py b/tests/test_authoring.py index c34cd5f..136f00e 100644 --- a/tests/test_authoring.py +++ b/tests/test_authoring.py @@ -18,9 +18,9 @@ from trackteroid.entities.base import EntityCollection, Entity -def test_link_inputs(scenario_sequence): +def test_link_inputs(scenario_sequence_folder): sequences = ( - Query(Sequence).by_id(*scenario_sequence.sequence_ids).get_all(order_by="name") + Query(Sequence).by_id(*scenario_sequence_folder.entity_ids["Sequence"]).get_all(order_by="name") ) sequence1 = sequences[0] sequence2 = sequences[1] @@ -54,7 +54,7 @@ def test_link_inputs(scenario_sequence): def test_create(scenario_project, ftrack_session): - project = Query(Project).by_id(scenario_project.project_id).get_one() + project = Query(Project).by_id(scenario_project.entity_ids["Project"][0]).get_one() test_entities = [] @@ -146,7 +146,7 @@ def test_create_project(ftrack_session): def test_create_sequence(scenario_project): - test_project = Query(Project).by_id(scenario_project.project_id).get_one() + test_project = Query(Project).by_id(scenario_project.entity_ids["Project"][0]).get_one() with pytest.raises(AssertionError): test_project.children[Sequence].create() @@ -163,9 +163,9 @@ def test_create_sequence(scenario_project): assert test_project.id == queried_sequence.parent_id -def test_create_shot(scenario_sequence): +def test_create_shot(scenario_sequence_folder): test_sequence = ( - Query(Sequence).by_id(Project, scenario_sequence.project_id).get_all() + Query(Sequence).by_id(Project, scenario_sequence_folder.entity_ids["Project"][0]).get_all() ) with pytest.raises(AssertionError): @@ -177,7 +177,7 @@ def test_create_shot(scenario_sequence): assert "Ambiguous context" in str(excinfo.value) test_sequence = ( - Query(Sequence).by_id(Project, scenario_sequence.project_id).get_first() + Query(Sequence).by_id(Project, scenario_sequence_folder.entity_ids["Project"][0]).get_first() ) created_shot = test_sequence.children[Shot].create(name=str(uuid.uuid4())) @@ -191,10 +191,10 @@ def test_create_shot(scenario_sequence): assert test_sequence.id == queried_shot.parent_id -def test_create_task(scenario_shot): +def test_create_task(scenario_assetbuild_shot): test_shot = ( Query(Shot) - .by_id(Project, scenario_shot.project_id) + .by_id(Project, scenario_assetbuild_shot.entity_ids["Project"][0]) .get_all(projections=["project.project_schema._task_type_schema.types.name"]) ) task_types = [ @@ -214,7 +214,7 @@ def test_create_task(scenario_shot): ) assert "Ambiguous context" in str(context.value) - test_shot = Query(Shot).by_id(Project, scenario_shot.project_id).get_first() + test_shot = Query(Shot).by_id(Project, scenario_assetbuild_shot.entity_ids["Project"][0]).get_first() created_task = test_shot.children[Task].create( name=str(uuid.uuid4()), type=random.choice(task_types) @@ -257,9 +257,9 @@ def _construct_collection_from_ftrack_entities(ftrack_entities, session): return collection -def test_create_on_ambigious_context(scenario_shot_asset, scenario_assetbuild_asset, ftrack_session): - shots = scenario_shot_asset.grab(ftrack_session, "Shot", ["assets.id"]) - asset_builds = scenario_assetbuild_asset.grab(ftrack_session, "AssetBuild", ["assets.id"]) +def test_create_on_ambigious_context(scenario_asset_task_version, ftrack_session): + shots = scenario_asset_task_version.grab(ftrack_session, "Shot", ["assets.id"]) + asset_builds = scenario_asset_task_version.grab(ftrack_session, "AssetBuild", ["assets.id"]) mix = [shots[0]["assets"][0], asset_builds[0]["assets"][0]] @@ -277,7 +277,7 @@ def test_create_on_ambigious_context(scenario_shot_asset, scenario_assetbuild_as assert "Ambiguous context" in str(context.value) -def test_create_note(scenario_shot): +def test_create_note(scenario_assetbuild_shot): categories = Query(NoteCategory).get_all() # Run it three times, once on an empty collection, another in a collection # with a single entity and then two (this last one will test that the parent @@ -285,7 +285,7 @@ def test_create_note(scenario_shot): for i in range(3): chosen_category = random.choice(categories) # Query every time to ensure the cache is not fooling the tests - test_shot = Query(Shot).by_id(*scenario_shot.shot_ids).get_first(projections=["notes.content"]) + test_shot = Query(Shot).by_id(*scenario_assetbuild_shot.entity_ids["Shot"]).get_first(projections=["notes.content"]) assert ( len(test_shot.notes) == i diff --git a/tests/test_entity.py b/tests/test_entity.py new file mode 100644 index 0000000..da0c8ad --- /dev/null +++ b/tests/test_entity.py @@ -0,0 +1,505 @@ +import logging +import pytest + +from arrow.arrow import Arrow +from collections import OrderedDict, Counter +from datetime import datetime +from itertools import chain + +from trackteroid import ( + SESSION, + Query +) +from trackteroid.entities.base import ( + Entity, + EmptyCollection, + EntityCollection +) +from trackteroid.entities.schematypes import CUSTOM_ATTRIBUTE_TYPE_COMPATIBILITY +from trackteroid.entities import * + + +# TODO: tests missing for `count()`, ` map()`, ` group_by_an_map` + + +def test_fetch_attributes(scenario_asset_task_version, ftrack_session): + assetversion = scenario_asset_task_version.grab(ftrack_session, "AssetVersion")[0] + query = SESSION.query("AssetVersion where id is {}".format(assetversion["id"])) + assert not query.one()["asset"] + collection = scenario_asset_task_version.construct_collection_from_ftrack_entities([assetversion]) + collection.fetch_attributes("asset") + assert query.one()["asset"] + +def test_get_attributes(scenario_asset_task_version, ftrack_session): + assetversion = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["asset.versions"])[0] + + # first pass without auto population + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities([assetversion]) + scenario_asset_task_version._assert_attributes(assetversion, assetversion_collection) + + # second with auto population + setattr(SESSION, "auto_populate", True) + scenario_asset_task_version._assert_attributes(assetversion, assetversion_collection) + setattr(SESSION, "auto_populate", False) + + # test nested attribute access + versions = [] + for _assetversion in assetversion["asset"]["versions"]: + versions.append(_assetversion["version"]) + assert versions == assetversion_collection.asset.versions.version + +def test_get_custom_attributes(scenario_asset_task_version, ftrack_session): + task = scenario_asset_task_version.grab(ftrack_session, "Task", ["custom_attributes"])[0] + task_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities([task]) + + for key, value in task["custom_attributes"].items(): + assert [value] == getattr(task_collection, "custom_{}".format(key)) + + setattr(SESSION, "auto_populate", True) + for key, value in task["custom_attributes"].items(): + assert [value] == getattr(task_collection, "custom_{}".format(key)) + setattr(SESSION, "auto_populate", False) + +def test_get_item(scenario_asset_task_version, ftrack_session): + assetversions = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(assetversions) + for i, assetversion in enumerate(assetversions): + collections = [ + assetversion_collection[assetversion["id"]], + assetversion_collection[i], + assetversion_collection[i:len(assetversions)] + ] + + # ensure result is an EntityCollection + assert any([isinstance(_, EntityCollection) for _ in collections]) + + # ensure it holds the proper entities + assert [assetversion] == [_.ftrack_entity for _ in collections[0]._entities.values()] + assert [assetversion] == [_.ftrack_entity for _ in collections[1]._entities.values()] + assert assetversions[i:len(assetversions)] == [_.ftrack_entity for _ in collections[2]._entities.values()] + +def test_contains(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=10) + + for _id, entity in assetversion_collection._entities.items(): + assert _id in assetversion_collection + assert entity in assetversion_collection + +def test_equality(scenario_asset_task_version, ftrack_session): + assetversion_collection_1 = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=20) + assetversion_collection_2 = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=20) + + assetversion_collection_3 = EntityCollection( + _cls=AssetVersion, + entities=assetversion_collection_1._entities + ) + assetversion_collection_3.query = Query(AssetVersion).by_id( + *[_.ftrack_entity["id"] for _ in assetversion_collection_3._entities.values()] + ) + + assert assetversion_collection_1 != assetversion_collection_2 + assert assetversion_collection_1 == assetversion_collection_3 + +def test_length(scenario_asset_task_version, ftrack_session): + assert 3 == len(scenario_asset_task_version.construct_simple_collection(ftrack_session, "AssetVersion", limit=3)) + assert 10 == len(scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=10)) + assert 1 == len(scenario_asset_task_version.construct_simple_collection(ftrack_session, "Project", limit=1)) + +def test_from_entities(scenario_asset_task_version, ftrack_session): + entities = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=10)._entities.values() + ftrack_entities = [_.ftrack_entity for _ in entities] + + collection = EntityCollection( + _cls=AssetVersion, + entities=OrderedDict( + [(ftrack_entities[0]["id"], entities)] + ) + ) + collection.query = Query(AssetVersion).by_id(*[_["id"] for _ in ftrack_entities]) + new_collection = collection.from_entities(entities) + + assert ftrack_entities == [_.ftrack_entity for _ in new_collection._entities.values()] + +def test_is_attribute_compatible_with_value(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["version", "task"]) + + # all cases where receiver collection has only a single entity + assert assetversion_collection._is_attribute_compatible_with_value("version", 5)[0] + assert assetversion_collection._is_attribute_compatible_with_value("version", [5])[0] + assert assetversion_collection._is_attribute_compatible_with_value( + "task", scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=1))[0] + assert not assetversion_collection._is_attribute_compatible_with_value("version", "foobar")[0] + assert not assetversion_collection._is_attribute_compatible_with_value("version", [5, 10])[0] + assert not assetversion_collection._is_attribute_compatible_with_value("version", ["5"])[0] + + # receiver collection has multiple entries + assert not scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2)._is_attribute_compatible_with_value("version", 5)[0] + assert not scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2, required_fields=["version"])._is_attribute_compatible_with_value( + "version", [5])[0] + assert not scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2, required_fields=["version"])._is_attribute_compatible_with_value( + "version", [5, 10, 15])[0] + assert not assetversion_collection._is_attribute_compatible_with_value( + "task", scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=2))[0] + assert not assetversion_collection._is_attribute_compatible_with_value( + "task", scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1))[0] + + # rebuild our test collection + new_assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=3, required_fields=["version", "task"]) + + assert new_assetversion_collection._is_attribute_compatible_with_value("version", [5, 6, 7])[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("version", [5])[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("version", [5, 6])[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("version", "foobar")[0] + assert new_assetversion_collection._is_attribute_compatible_with_value("task", + scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=3))[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("task", + scenario_asset_task_version.construct_simple_collection(ftrack_session, "Task", limit=1))[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("task", + scenario_asset_task_version.construct_simple_collection(ftrack_session, "AssetVersion", limit=1))[0] + assert not new_assetversion_collection._is_attribute_compatible_with_value("task", EmptyCollection())[0] + +def test_is_custom_attribute_compatible_with_value(scenario_components, ftrack_session): + # test custom attributes + # test each type for each custom attribute for each entity type + for entity_type in scenario_components.entity_ids.keys(): + entity_cls = globals()[entity_type] + if issubclass(entity_cls, Entity): + try: + entity_collection = scenario_components.construct_simple_collection( + ftrack_session, entity_type, limit=1, required_fields=["custom_attributes"]) + except: + logging.info("No custom attributes found for entity type '{}'. Skipping.".format(entity_type)) + continue + entity = list(entity_collection._entities.values())[0].ftrack_entity + if entity["custom_attributes"]: + for attribute in entity["custom_attributes"]: + for _type in getattr(CUSTOM_ATTRIBUTE_TYPE_COMPATIBILITY, attribute).types: + attribute = "custom_{}".format(attribute) + assert not entity_collection._is_attribute_compatible_with_value(attribute, None)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, type)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, [None])[0] + if _type is str: + assert entity_collection._is_attribute_compatible_with_value(attribute, "foobar")[0] + assert entity_collection._is_attribute_compatible_with_value( + attribute, u"anderes wort ohne umlaut ... stupid pycharm")[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, ["bla"])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, [5])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, ["bla", "blubb"])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, 5)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, {"furz": "testen"})[0] + elif _type in (int, float): + assert entity_collection._is_attribute_compatible_with_value(attribute, 42)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, 0)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, 11.12)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [42])[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [42.24])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, "hallo")[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, True)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, ["bla", "blubb"])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, [5, 2])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, {"furz": "testen"})[0] + elif _type in (bool,): + assert entity_collection._is_attribute_compatible_with_value(attribute, True)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, False)[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [True])[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, [False])[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, 0)[0] + assert not entity_collection._is_attribute_compatible_with_value(attribute, 1)[0] + elif _type in (Arrow, datetime): + assert entity_collection._is_attribute_compatible_with_value(attribute, datetime.now())[0] + assert entity_collection._is_attribute_compatible_with_value(attribute, Arrow.now())[0] + +def test_setattr(scenario_asset_task_version, ftrack_session): + # test POD + def set_and_assert_equal(collection, attributes, value): + attribute_tokens = attributes.split(".") + current = collection + idx = 0 + while idx < len(attribute_tokens) - 1: + # we want to break just before the last token to call setattr + # instead of getattr + current = getattr(current, attribute_tokens[idx]) + idx += 1 + + setattr(current, attribute_tokens[-1], value) + + after_value = getattr(current, attribute_tokens[-1]) + + if isinstance(value, EntityCollection): + value_list = [entity.ftrack_entity for entity in after_value._entities.values()] + expected_list = [entity.ftrack_entity for entity in value._entities.values()] + assert value_list == expected_list + elif isinstance(value, list): + assert after_value == value + else: + assert after_value == [value] + + assetversion = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["version", "asset.name"]) + assetversions = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=5, required_fields=["version", "asset.name", "task"]) + + set_and_assert_equal(assetversion, "version", 5) + set_and_assert_equal(assetversion, "asset.name", "foobar") + + set_and_assert_equal(assetversions, "version", [1, 2, 99, 4, 5]) + + assets = assetversions.asset + assetnames = ["new_assetname_{}".format(idx) for idx in range(len(assets))] + set_and_assert_equal(assets, "name", assetnames) + + task = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["task"]).task + set_and_assert_equal(assetversion, "task", task) + + tasks = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "Task", limit=5, required_fields=["name"]) + set_and_assert_equal(assetversions, "task", tasks) + + assetversions_with_links = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=2, required_fields=["uses_versions"]) + set_and_assert_equal(assetversions_with_links[0], "uses_versions", assetversions_with_links[1]) + + #assetversions_with_links = self.construct_simple_collection(AssetVersion, 2, ["used_in_versions"]) + #set_and_assert_equal(assetversions_with_links[0], "used_in_versions", assetversions_with_links[1]) + # TODO(T13213): this does not work, as ftrack seemingly only updates these connections + # once we commit + # self.assertIn(assetversions_with_links[0], assetversions_with_links[1].used_in_versions) + +def test_group(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=5) + expected_keys = assetversion_collection._entities.keys() + grouped = assetversion_collection.group(lambda x: x.id[0]) + + assert isinstance(grouped, dict) + assert expected_keys == grouped.keys() + + for key, value in grouped.items(): + assert [key] == [_.ftrack_entity["id"] for _ in value._entities.values()] + +def test_fold(scenario_asset_task_version, ftrack_session): + assetversion_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "AssetVersion", limit=1, required_fields=["asset.versions.user.username", "user.username"]) + + user_assetversions = {} + all_assetversions = [] + + for assetversion in assetversion_collection._entities.values(): + for asset_assetversion in assetversion.ftrack_entity["asset"]["versions"]: + user = asset_assetversion["user"]["username"] + if user not in user_assetversions: + user_assetversions[user] = 1 + else: + user_assetversions[user] += 1 + all_assetversions.append(asset_assetversion) + + for user in user_assetversions: + assert user_assetversions[user] == scenario_asset_task_version.construct_collection_from_ftrack_entities( + all_assetversions).fold(0, lambda x, y: x + 1 if y.user.username == [user] else x) + +def test_keys(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert list(assetversion_collection.keys()) == [_["id"] for _ in ftrack_entities] + +def test_values(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity for _ in assetversion_collection.values()] == ftrack_entities + +def test_max(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + + assert [_.ftrack_entity["version"] for _ in assetversion_collection.max(lambda x: x.version).values()] \ + == [max([_["version"] for _ in ftrack_entities])] + + ftrack_entities = scenario_asset_task_version.grab( + ftrack_session, "Asset", ["versions.version", "versions.asset"], 10) + all_assetversions = [] + for asset in ftrack_entities: + for assetversion in asset["versions"]: + all_assetversions.append(assetversion) + all_assetversions.sort(key=lambda x: x["version"]) + asset_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity for _ in asset_collection.max(lambda x: x.versions.version).values()] \ + == [all_assetversions[-1]["asset"]] + +def test_min(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity["version"] for _ in assetversion_collection.min(lambda x: x.version).values()] \ + == [min([_["version"] for _ in ftrack_entities])] + + ftrack_entities = scenario_asset_task_version.grab( + ftrack_session, "Asset", ["versions.version", "versions.asset"], 10) + all_assetversions = [] + for asset in ftrack_entities: + for assetversion in asset["versions"]: + all_assetversions.append(assetversion) + all_assetversions.sort(key=lambda x: x["version"]) + asset_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + assert [_.ftrack_entity for _ in asset_collection.min(lambda x: x.versions.version).values()] \ + == [all_assetversions[0]["asset"]] + +def test_sort_by(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + + # section sort by version + sorted_ftrack_entities = sorted(ftrack_entities, key=lambda x: x["version"]) + unsorted_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + sorted_collection = unsorted_collection.sort(lambda x: x.version) + assert sorted_ftrack_entities == [_.ftrack_entity for _ in sorted_collection._entities.values()] + + # section sort by id + sorted_ftrack_entities = sorted(ftrack_entities, key=lambda x: x["id"]) + sorted_collection = unsorted_collection.sort(lambda x: x.id) + assert sorted_ftrack_entities == [_.ftrack_entity for _ in sorted_collection._entities.values()] + simple_task_collection = scenario_asset_task_version.construct_simple_collection( + ftrack_session, "Task", 10) + sorted_ftrack_entities = sorted( + [_.ftrack_entity for _ in simple_task_collection.values()], + key=lambda x: x["id"]) + sorted_collection = simple_task_collection.sort(lambda x: x.id) + assert sorted_ftrack_entities == [_.ftrack_entity for _ in sorted_collection._entities.values()] + +def test_filter(scenario_asset_task_version, ftrack_session): + repeat = True + while repeat: + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 10) + filtered_ftrack_entities = [_ for _ in ftrack_entities if 1 < _["version"] < 20] + # we can only test filtering properly if our result differs + if len(filtered_ftrack_entities) != len(ftrack_entities): + repeat = False + + # section sort by id + unfiltered_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + filtered_collection = unfiltered_collection.filter(lambda x: 1 < x.version[0] < 20) + assert filtered_ftrack_entities == [_.ftrack_entity for _ in filtered_collection._entities.values()] + +def test_partition(scenario_asset_task_version, ftrack_session): + ftrack_entities = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", ["version"], 20) + assetversion_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(ftrack_entities) + versions_one = [_ for _ in ftrack_entities if _["version"] == 1] + versions_not_one = [_ for _ in ftrack_entities if _["version"] != 1] + collection_one, collection_not_one = assetversion_collection.partition(lambda x: x.version[0] == 1) + assert versions_one == [_.ftrack_entity for _ in collection_one._entities.values()] + assert versions_not_one == [_.ftrack_entity for _ in collection_not_one._entities.values()] + +def test_union(scenario_asset_task_version, ftrack_session): + entities_a = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + entities_b = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + entities_c = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + entities_d = entities_c[0:2] + + ftrack_entities_union = [] + for ftrack_entities in [entities_a, entities_b, entities_c, entities_d]: + for ftrack_entity in ftrack_entities: + if not ftrack_entity in ftrack_entities_union: + ftrack_entities_union.append(ftrack_entity) + + collection_a = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_a) + collection_b = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_b) + collection_c = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_c) + collection_d = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_d) + collection_union = collection_a.union(collection_b, collection_c, collection_d) + + assert ftrack_entities_union == [_.ftrack_entity for _ in collection_union._entities.values()] + + typedcontext = scenario_asset_task_version.grab(ftrack_session, "Shot", limit=100) + typedcontext_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(typedcontext) + # Ftrack will never return a generic TypedContext. To recreate a situation + # that we encounter in our API, we have to set the type explicitly. + typedcontext_collection._entity = TypedContext() + emptycollection = EmptyCollection(_type=Task()) + + task = scenario_asset_task_version.grab(ftrack_session, "Task", limit=10) + task_collection = scenario_asset_task_version.construct_collection_from_ftrack_entities(task) + emptycollection.union(task_collection) + +def test_symmetric_difference(scenario_asset_task_version, ftrack_session): + entities_a = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=18) + entities_b = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=15) + entities_c = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=12) + entities_d = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + all_entities = [set(entities_a), set(entities_b), set(entities_c), set(entities_d)] + + ftrack_entities_difference = {key for key, val in Counter(chain.from_iterable(all_entities)).items() if val == 1} + + collection_a = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_a) + collection_b = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_b) + collection_c = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_c) + collection_d = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_d) + collection_difference = collection_a.symmetric_difference(collection_b, collection_c, collection_d) + assert ftrack_entities_difference == {_.ftrack_entity for _ in collection_difference._entities.values()} + +def test_difference(scenario_asset_task_version, ftrack_session): + entities_a = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=18) + entities_b = scenario_asset_task_version.grab(ftrack_session, "Task", limit=15) + entities_c = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=12) + entities_d = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=10) + all_entities = entities_a + entities_b + entities_c + entities_d + + ftrack_entities_difference = [ + _ for _ in all_entities if _ in + list(set(entities_a).difference(entities_b).difference(entities_c).difference(entities_d)) + ] + + collection_a = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_a) + collection_b = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_b) + collection_c = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_c) + collection_d = scenario_asset_task_version.construct_collection_from_ftrack_entities(entities_d) + collection_difference = collection_a.difference(collection_b, collection_c, collection_d) + assert ftrack_entities_difference == [_.ftrack_entity for _ in collection_difference._entities.values()] + + +def test_intersection(scenario_asset_task_version, ftrack_session): + all_versions = scenario_asset_task_version.grab(ftrack_session, "AssetVersion", limit=50) + expected_versions = all_versions[:10] + additional_versions = all_versions[10:] + + collection1 = scenario_asset_task_version.construct_collection_from_ftrack_entities( + expected_versions + additional_versions[5:30]) + collection2 = scenario_asset_task_version.construct_collection_from_ftrack_entities( + list(reversed(expected_versions)) + additional_versions[15:20]) + collection3 = scenario_asset_task_version.construct_collection_from_ftrack_entities( + expected_versions + additional_versions[1:9]) + + collection_intersection = collection1.intersection(collection2, collection3) + assert expected_versions == [_.ftrack_entity for _ in collection_intersection._entities.values()] + + # test with ambiguous parent context + shots = scenario_asset_task_version.grab(ftrack_session, "Shot", ["assets"], 2) + asset_build = scenario_asset_task_version.grab(ftrack_session, "AssetBuild", ["assets"]) + + mix = [shots[0]["assets"][0], asset_build[0]["assets"][0]] + collection4 = scenario_asset_task_version.construct_collection_from_ftrack_entities(mix) + collection5 = scenario_asset_task_version.construct_collection_from_ftrack_entities([shots[1]["assets"][0]]) + assert not collection4.intersection(collection5) + +def test_type_coercion(scenario_components, ftrack_session): + collection1 = scenario_components.construct_simple_collection(ftrack_session, "Component", limit=10) + coerced_collection1 = Component(collection1) + collection2 = scenario_components.construct_simple_collection(ftrack_session, "Sequence", limit=2) + coerced_collection2 = TypedContext(collection2) + collection_packet = [ + (collection1, coerced_collection1, Component), + (collection2, coerced_collection2, TypedContext)] + + for collection, coerced_collection, _type in collection_packet: + assert isinstance(coerced_collection, EntityCollection) + assert [_.id for _ in collection.values()] == [_.id for _ in coerced_collection.values()] + assert coerced_collection.entity_type == _type + with pytest.raises(AssertionError): + AssetVersion(collection) From a09bc1e66bff54cdc37001bf648a7afb37d58c78 Mon Sep 17 00:00:00 2001 From: Alf Kraus <99alfie@gmail.com> Date: Mon, 4 Sep 2023 07:42:41 +0200 Subject: [PATCH 6/6] wip commit fix/i10 --- tests/test_session.py | 44 +++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/tests/test_session.py b/tests/test_session.py index 8cdc4d6..2f9f79b 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -8,12 +8,13 @@ from trackteroid import ( - SESSION, Query, - AssetVersion, + Shot, User ) +from trackteroid.session import Session + @pytest.fixture def dumped_operations_file(): @@ -45,21 +46,23 @@ def initial_operations(): def test_deferred_operations(dumped_operations_file): # case 1. clear an existing cache temporarily - query_result = Query(AssetVersion).get_first(projections=["task", "version"]) - operations_count = len(SESSION.recorded_operations) + session = Session() + operations_count = len(session.recorded_operations) + query_result = Query(Shot).get_first(projections=["name"]) # do a query -> cache a result - with SESSION.deferred_operations(dumped_operations_file): - assert 0 == len(SESSION.recorded_operations) + with session.deferred_operations(dumped_operations_file): + assert 0 == len(session.recorded_operations) - avs = Query(AssetVersion).by_id(*query_result.id).get_one(projections=["task"]) - avs.version = avs.version[0] + 10 - avs2 = avs.create(task=avs.task) # -> create entity, update task, update asset - avs2.version = avs.version[0] + 10 + shot = Query(Shot).by_id(*query_result.id).get_one(projections=["name"]) + print(shot) + shot.name = "first shot" + shot2 = shot.create(name="second shot") # -> create entity, update task, update asset + shot2.name = "second shot renamed" - assert 5 == len(SESSION.recorded_operations) + assert 4 == len(session.recorded_operations) - assert operations_count == len(SESSION.recorded_operations) + assert operations_count == len(session.recorded_operations) # check the created file database database = dbm.open(dumped_operations_file, "r") @@ -68,13 +71,13 @@ def make_keys(entity_collection): lambda x: "('{}', ['{}'])".format(x.entity_type.__name__, x.id[0]) ) - expected_keys = make_keys(avs.union(avs2)) + make_keys(avs.task) + make_keys(avs.asset) + ["__operations__"] + expected_keys = make_keys(shot.union(shot2)) + ["__operations__"] assert expected_keys, database.keys() def test_reconnect_and_commit(initial_operations): - - SESSION.reconnect_and_commit(initial_operations[0]) + session = Session() + session.reconnect_and_commit(initial_operations[0]) user = Query(User).by_name(initial_operations[1]["username"]).get_one( projections=["first_name", "last_name", "is_active"] @@ -85,11 +88,12 @@ def test_reconnect_and_commit(initial_operations): def test_get_cached_collections(initial_operations): - SESSION.reconnect_and_commit(initial_operations[0]) + session = Session() + session.reconnect_and_commit(initial_operations[0]) - cached_users = SESSION.get_cached_collections()[User] + cached_users = session.get_cached_collections()[User] for cached_user in cached_users: user = Query(User).by_id(cached_user.id[0]).get_first(projections=["username", "first_name"]) - if user: - users_split = user.partition(lambda u: u.username[0] == initial_operations[1]["username"]) - assert users_split[0].first_name == [initial_operations[1]["first_name"]] + assert user, "Cached user does not exist: {}".format(cached_user.id[0]) + users_split = user.partition(lambda u: u.username[0] == initial_operations[1]["username"]) + assert users_split[0].first_name == [initial_operations[1]["first_name"]]