From 0884c18aa2d522b09d7f2d91f38f07fe50ef137b Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Tue, 21 Jan 2025 22:09:23 -0800 Subject: [PATCH 1/5] chore: fix spelling --- zetta_utils/__init__.py | 4 ++-- zetta_utils/builder/registry.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/zetta_utils/__init__.py b/zetta_utils/__init__.py index 947fd5812..076dfc39c 100644 --- a/zetta_utils/__init__.py +++ b/zetta_utils/__init__.py @@ -14,8 +14,8 @@ logger = get_logger("zetta_utils") -builder.registry.MUTLIPROCESSING_INCOMPATIBLE_CLASSES.add("mazepa") -builder.registry.MUTLIPROCESSING_INCOMPATIBLE_CLASSES.add("lightning") +builder.registry.MULTIPROCESSING_INCOMPATIBLE_CLASSES.add("mazepa") +builder.registry.MULTIPROCESSING_INCOMPATIBLE_CLASSES.add("lightning") log.add_supress_traceback_module(builder) diff --git a/zetta_utils/builder/registry.py b/zetta_utils/builder/registry.py index 8bda67146..e88594f0c 100644 --- a/zetta_utils/builder/registry.py +++ b/zetta_utils/builder/registry.py @@ -13,7 +13,7 @@ T = TypeVar("T", bound=Callable) REGISTRY: dict[str, list[RegistryEntry]] = defaultdict(list) -MUTLIPROCESSING_INCOMPATIBLE_CLASSES: set[str] = set() +MULTIPROCESSING_INCOMPATIBLE_CLASSES: set[str] = set() @attrs.frozen @@ -70,7 +70,7 @@ def register( def decorator(fn: T) -> T: nonlocal allow_parallel - for k in MUTLIPROCESSING_INCOMPATIBLE_CLASSES: + for k in MULTIPROCESSING_INCOMPATIBLE_CLASSES: if fn.__module__ is not None and k.lower() in fn.__module__.lower(): allow_parallel = False break From e0ee82c8f9d56fcdb77620b0b56d2570ed239119 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Tue, 21 Jan 2025 22:15:07 -0800 Subject: [PATCH 2/5] fix: change id_generation to cloudpickle due to dill recursion bug --- pyproject.toml | 3 ++- zetta_utils/mazepa/id_generation.py | 16 ++++------------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 33f2c56cc..d3e2c6a13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "rich >= 12.6.0", "python-logging-loki >= 0.3.1", "neuroglancer >= 2.32", + "cloudpickle >= 3.1.1", "dill >= 0.3.6", "pyyaml ~= 6.0.1", "requests==2.31.0", # version conflicts otherwise @@ -34,7 +35,7 @@ keywords = ["neuroscience connectomics EM"] license = {text = "MIT"} name = "zetta_utils" readme = "README.md" -requires-python = ">3.9,<3.13" +requires-python = ">3.10,<3.13" urls = {Homepage = "https://github.com/zettaai/zetta_utils"} version = "0.0.2" diff --git a/zetta_utils/mazepa/id_generation.py b/zetta_utils/mazepa/id_generation.py index 62884393a..bb87597a2 100644 --- a/zetta_utils/mazepa/id_generation.py +++ b/zetta_utils/mazepa/id_generation.py @@ -4,7 +4,7 @@ import uuid from typing import Callable, Optional -import dill +import cloudpickle import xxhash from coolname import generate_slug @@ -42,7 +42,7 @@ def generate_invocation_id( prefix: Optional[str] = None, ) -> str: """Generate a unique and deterministic ID for a function invocation. - The ID is generated using xxhash and dill to hash the function and its arguments. + The ID is generated using xxhash and cloudpickle to hash the function and its arguments. :param fn: the function, or really any Callable, defaults to None :param args: the function arguments, or any list, defaults to None @@ -53,16 +53,8 @@ def generate_invocation_id( """ x = xxhash.xxh128() try: - x.update( - dill.dumps( - (fn, args, kwargs), - protocol=dill.DEFAULT_PROTOCOL, - byref=False, - recurse=True, - fmode=dill.FILE_FMODE, - ) - ) - except dill.PicklingError as e: + x.update(cloudpickle.dumps((fn, args, kwargs))) + except Exception as e: # pylint: disable=broad-exception-caught logger.warning(f"Failed to pickle {fn} with args {args} and kwargs {kwargs}: {e}") x.update(str(uuid.uuid4())) From ba3d7189284265a4328525c516926e14d70ce3de Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Wed, 22 Jan 2025 16:23:29 -0800 Subject: [PATCH 3/5] temp --- pyproject.toml | 2 +- tests/unit/mazepa/test_id_generation.py | 71 +++++++++++++------------ zetta_utils/mazepa/id_generation.py | 20 ++++++- 3 files changed, 56 insertions(+), 37 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d3e2c6a13..f581385b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ keywords = ["neuroscience connectomics EM"] license = {text = "MIT"} name = "zetta_utils" readme = "README.md" -requires-python = ">3.10,<3.13" +requires-python = ">3.9,<3.13" urls = {Homepage = "https://github.com/zettaai/zetta_utils"} version = "0.0.2" diff --git a/tests/unit/mazepa/test_id_generation.py b/tests/unit/mazepa/test_id_generation.py index c8a5a0b29..d234dc83d 100644 --- a/tests/unit/mazepa/test_id_generation.py +++ b/tests/unit/mazepa/test_id_generation.py @@ -226,32 +226,32 @@ def test_generate_invocation_id_subchunkable_flow() -> None: def _gen_id_calls(_) -> dict[str, str]: gen_ids = { - 'gen_id(ClassA().method, [], {"a": 1})': gen_id(ClassA().method, [], {"a": 1}), - "gen_id(ClassD1().method, [], {})": gen_id(ClassD1().method, [], {}), - "gen_id(ClassE(1).method, [], {})": gen_id(ClassE(1).method, [], {}), - "gen_id(partial(ClassA().method, 42), [], {})": gen_id( - partial(ClassA().method, 42), [], {} - ), - "gen_id(partial(ClassD1().method, 42), [], {})": gen_id( - partial(ClassD1().method, 42), [], {} - ), - "gen_id(partial(ClassE(1).method, 42), [], {})": gen_id( - partial(ClassE(1).method, 42), [], {} - ), - "gen_id(TaskableA(), [], {})": gen_id(TaskableA(), [], {}), - "gen_id(TaskableD(1), [], {})": gen_id(TaskableD(1), [], {}), - "gen_id(FlowSchema({}, ClassA().method).flow, [], {})": gen_id( - FlowSchema({}, ClassA().method).flow, [], {} - ), - "gen_id(FlowSchema({}, ClassD1().method).flow, [], {})": gen_id( - FlowSchema({}, ClassD1().method).flow, [], {} - ), - "gen_id(FlowSchema({}, ClassE(1).method).flow, [], {})": gen_id( - FlowSchema({}, ClassE(1).method).flow, [], {} - ), - "gen_id(subchunkable_flow(), [], {})": gen_id( - subchunkable_flow().fn, subchunkable_flow().args, subchunkable_flow().kwargs - ), + # 'gen_id(ClassA().method, [], {"a": 1})': gen_id(ClassA().method, [], {"a": 1}), + "gen_id(ClassD1().method, [], {})": gen_id(ClassD1().method, [], {}, None, True), + # "gen_id(ClassE(1).method, [], {})": gen_id(ClassE(1).method, [], {}), + # "gen_id(partial(ClassA().method, 42), [], {})": gen_id( + # partial(ClassA().method, 42), [], {} + # ), + # "gen_id(partial(ClassD1().method, 42), [], {})": gen_id( + # partial(ClassD1().method, 42), [], {} + # ), + # "gen_id(partial(ClassE(1).method, 42), [], {})": gen_id( + # partial(ClassE(1).method, 42), [], {} + # ), + # "gen_id(TaskableA(), [], {})": gen_id(TaskableA(), [], {}), + # "gen_id(TaskableD(1), [], {})": gen_id(TaskableD(1), [], {}), + # "gen_id(FlowSchema({}, ClassA().method).flow, [], {})": gen_id( + # FlowSchema({}, ClassA().method).flow, [], {} + # ), + # "gen_id(FlowSchema({}, ClassD1().method).flow, [], {})": gen_id( + # FlowSchema({}, ClassD1().method).flow, [], {} + # ), + # "gen_id(FlowSchema({}, ClassE(1).method).flow, [], {})": gen_id( + # FlowSchema({}, ClassE(1).method).flow, [], {} + # ), + # "gen_id(subchunkable_flow(), [], {})": gen_id( + # subchunkable_flow().fn, subchunkable_flow().args, subchunkable_flow().kwargs + # ), } return gen_ids @@ -259,16 +259,17 @@ def _gen_id_calls(_) -> dict[str, str]: def test_persistence_across_sessions() -> None: # Create two separate processes - spawn ensures a new PYTHONHASHSEED is used ctx = multiprocessing.get_context("spawn") - with ctx.Pool(processes=2) as pool: - result = pool.map(_gen_id_calls, range(2)) - - assert result[0] == result[1] + for _ in range(1): + with ctx.Pool(processes=2) as pool: + result = pool.map(_gen_id_calls, range(2)) + assert result[0] == result[1] -def test_unpickleable_fn(mocker) -> None: - # See https://github.com/uqfoundation/dill/issues/147 and possibly - # https://github.com/uqfoundation/dill/issues/56 - unpickleable_fn = mocker.MagicMock() +""" +def test_unpickleable_invocation(mocker) -> None: # gen_id will return a random UUID in case of pickle errors - assert gen_id(unpickleable_fn, [], {}) != gen_id(unpickleable_fn, [], {}) + some_fn = lambda x: x + unpicklable_arg = [1] + assert gen_id(some_fn, unpicklable_arg, {}) != gen_id(some_fn, unpicklable_arg, {}) +""" diff --git a/zetta_utils/mazepa/id_generation.py b/zetta_utils/mazepa/id_generation.py index bb87597a2..6aede9f16 100644 --- a/zetta_utils/mazepa/id_generation.py +++ b/zetta_utils/mazepa/id_generation.py @@ -4,6 +4,8 @@ import uuid from typing import Callable, Optional +from sympy import im + import cloudpickle import xxhash from coolname import generate_slug @@ -40,6 +42,7 @@ def generate_invocation_id( args: Optional[list] = None, kwargs: Optional[dict] = None, prefix: Optional[str] = None, + debug: Optional[bool] = False, ) -> str: """Generate a unique and deterministic ID for a function invocation. The ID is generated using xxhash and cloudpickle to hash the function and its arguments. @@ -51,10 +54,25 @@ def generate_invocation_id( :return: A unique, yet deterministic string that identifies (fn, args, kwargs) in the current Python environment. """ +# import dill + import pickletools + #return cloudpickle.dumps((fn, args, kwargs), protocol=dill.DEFAULT_PROTOCOL)s + if debug: + pickletools.dis(pickletools.optimize(cloudpickle.dumps((fn, args, kwargs)))) + + return str(cloudpickle.dumps((fn, args, kwargs))) + #return cloudpickle.dumps((fn, args, kwargs), protocol=dill.DEFAULT_PROTOCOL)s x = xxhash.xxh128() try: x.update(cloudpickle.dumps((fn, args, kwargs))) - except Exception as e: # pylint: disable=broad-exception-caught + #x.update(dill.dumps( + #(fn, args, kwargs), + #protocol=dill.DEFAULT_PROTOCOL, + #byref=False, + #recurse=True, + #fmode=dill.FILE_FMODE, + #)) + except Exception as e: # pylint: disable=broad-exception-caught logger.warning(f"Failed to pickle {fn} with args {args} and kwargs {kwargs}: {e}") x.update(str(uuid.uuid4())) From b882efe97f6bb8ede110d508394db090723f2617 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Wed, 22 Jan 2025 17:01:52 -0800 Subject: [PATCH 4/5] temp --- tests/unit/mazepa/test_id_generation.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/unit/mazepa/test_id_generation.py b/tests/unit/mazepa/test_id_generation.py index d234dc83d..f0318fb83 100644 --- a/tests/unit/mazepa/test_id_generation.py +++ b/tests/unit/mazepa/test_id_generation.py @@ -263,7 +263,10 @@ def test_persistence_across_sessions() -> None: with ctx.Pool(processes=2) as pool: result = pool.map(_gen_id_calls, range(2)) - assert result[0] == result[1] + #assert result[0] == result[1] + print(result[0]) + print(result[1]) + assert False """ From f0b342aedfbde15f56e5aa45e3c8e7bc753b94b5 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Wed, 22 Jan 2025 18:29:28 -0800 Subject: [PATCH 5/5] temp --- tests/unit/mazepa/test_id_generation.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/mazepa/test_id_generation.py b/tests/unit/mazepa/test_id_generation.py index f0318fb83..22ae30577 100644 --- a/tests/unit/mazepa/test_id_generation.py +++ b/tests/unit/mazepa/test_id_generation.py @@ -258,15 +258,16 @@ def _gen_id_calls(_) -> dict[str, str]: def test_persistence_across_sessions() -> None: # Create two separate processes - spawn ensures a new PYTHONHASHSEED is used - ctx = multiprocessing.get_context("spawn") + #ctx = multiprocessing.get_context("spawn") + ctx = multiprocessing.get_context("fork") for _ in range(1): with ctx.Pool(processes=2) as pool: result = pool.map(_gen_id_calls, range(2)) - #assert result[0] == result[1] + assert result[0] == result[1] print(result[0]) print(result[1]) - assert False + #assert False """