Skip to content

Bug using async state machine with multiprocessing #544

@gkane26

Description

@gkane26

I was trying to create a StateMachine with async callbacks in the main process and serialize it to a background process via multiprocessing.Process. Serialization works, but there's an error when calling activate_initial_state. It seems that when the StateMachine is instantiated in the background process, the initial transition, which exists upon instantiation in the main process, is not created in the background process. The problem seems to be that this initial transition is put in the engine's external queue via engine.start, which is called in the constructor but not when the engine is created in StateMachine.__setstate__. Calling self._engine.start() in __setstate__ seems to fix the problem. See example:

import asyncio
import multiprocessing as mp
import platform

import statemachine
from statemachine import State, StateMachine


class TrafficLightMachine(StateMachine):
    """Three-state traffic light whose callbacks are all coroutines.

    The single ``cycle`` event advances green -> yellow -> red -> green.
    """

    green = State(initial=True)
    yellow = State()
    red = State()

    cycle = green.to(yellow) | yellow.to(red) | red.to(green)

    async def before_cycle(
        self, event: str, source: State, target: State, message: str = ""
    ):
        """Print the ``cycle`` transition, followed by ``message`` if given."""
        # Only prepend the ". " separator when there is a message to show.
        if message:
            suffix = ". " + message
        else:
            suffix = ""
        print(f"Running {event} from {source.id} to {target.id}{suffix}")

    async def on_enter_red(self):
        """Print the stop notice (shown in the recorded output on entering red)."""
        print("Don't move.")

    async def on_exit_red(self):
        """Print the go notice (shown in the recorded output on leaving red)."""
        print("Go ahead!")


class PatchedTrafficLightMachine(TrafficLightMachine):
    """Traffic light machine carrying the workaround proposed in this issue.

    Identical to ``TrafficLightMachine`` except that the engine is started
    again after the instance is restored from its serialized state.
    """

    def __setstate__(self, state):
        # Let the base class restore the instance from the serialized state.
        super().__setstate__(state)
        # Per the issue description, ``engine.start`` is what queues the
        # initial transition, and the stock ``__setstate__`` does not call it;
        # calling it here is the proposed fix.
        self._engine.start()


async def run_main_async(sm):
    """Activate the initial state of ``sm``, then send ``cycle`` three times.

    Three events take the traffic light through one full round of its states.
    """
    await sm.activate_initial_state()
    for _ in range(3):
        await sm.cycle()


def run_main(sm):
    """Synchronous entry point: drive ``run_main_async(sm)`` to completion.

    Used as the ``target`` of the child process, which needs a plain callable.
    """
    scenario = run_main_async(sm)
    asyncio.run(scenario)


def run_process(sm):
    """Run ``run_main(sm)`` in a child process and wait for it to finish.

    ``sm`` is passed to the child through ``multiprocessing.Process`` args.
    The child's exit status is not inspected; a failure shows up only as the
    traceback the child prints.
    """
    worker = mp.Process(target=run_main, args=(sm,))
    worker.start()
    worker.join()


def main():
    """Print environment details, then run both machines in a child process.

    The unpatched machine runs first, followed by the patched one, each
    preceded by a banner line.
    """
    print(f"System: {platform.uname()}")
    # NOTE: "verion" (sic) is kept as-is so the script matches the recorded output.
    print(f"verion = {statemachine.__version__}\n")

    scenarios = (
        ("Trying activation with multiprocessing...", TrafficLightMachine),
        ("\n\nWith patch using multiprocessing...", PatchedTrafficLightMachine),
    )
    for banner, machine_cls in scenarios:
        print(banner)
        # Instantiate in the parent process; run_process hands it to the child.
        run_process(machine_cls())

# Run the reproduction only when this file is executed as a script.
# NOTE(review): with multiprocessing's "spawn" start method the child
# re-imports the main module, so this guard presumably also keeps the child
# from re-running main() — confirm against the multiprocessing docs.
if __name__ == "__main__":
    main()

With output:

System: uname_result(system='Darwin', node='at-gk-macbook', release='24.6.0', version='Darwin Kernel Version 24.6.0: Wed Oct 15 21:12:15 PDT 2025; root:xnu-11417.140.69.703.14~1/RELEASE_ARM64_T6041', machine='arm64')
verion = 2.5.0

Trying activation with multiprocessing...
Process Process-1:
Traceback (most recent call last):
  File "/Users/gkane/atlab/projects/enigma-task-future/.venv/lib/python3.12/site-packages/statemachine/statemachine.py", line 267, in current_state
    state: State = self.states_map[self.current_state_value].for_instance(
                   ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: None

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.12/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/python@3.12/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/gkane/atlab/projects/enigma-task-future/packages/state-machine-tasks/tests/manual/mp_bug.py", line 45, in run_main
    asyncio.run(run_main_async(sm))
  File "/opt/homebrew/Cellar/python@3.12/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/gkane/atlab/projects/enigma-task-future/packages/state-machine-tasks/tests/manual/mp_bug.py", line 39, in run_main_async
    await sm.cycle()
  File "/Users/gkane/atlab/projects/enigma-task-future/.venv/lib/python3.12/site-packages/statemachine/engines/async_.py", line 66, in processing_loop
    result = await self._trigger(trigger_data)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/gkane/atlab/projects/enigma-task-future/.venv/lib/python3.12/site-packages/statemachine/engines/async_.py", line 85, in _trigger
    state = self.sm.current_state
            ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/gkane/atlab/projects/enigma-task-future/.venv/lib/python3.12/site-packages/statemachine/statemachine.py", line 274, in current_state
    raise InvalidStateValue(
statemachine.exceptions.InvalidStateValue: There's no current state set. In async code, did you activate the initial state? (e.g., `await sm.activate_initial_state()`)


With patch using multiprocessing...
Running cycle from green to yellow
Running cycle from yellow to red
Don't move.
Running cycle from red to green
Go ahead!

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions