threads.ServiceThread hangs on exception #54

@r313pp

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Steps to reproduce

import asyncio
import mode.threads

class CrashingServiceThread(mode.threads.ServiceThread):
    async def on_start(self):
        raise RuntimeError('I am here to crash')

async def main():
    await CrashingServiceThread().start()

if __name__ == "__main__":
    asyncio.run(main())

Expected behavior

Process exits.

Actual behavior

Process hangs.

Full traceback

'CrashingServiceThread' crashed: RuntimeError('I am here to crash')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 218, in _serve
    await self._default_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 739, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 756, in _actually_start
    await self.on_start()
  File "app2.py", line 6, in on_start
    raise RuntimeError('I am here to crash')
RuntimeError: I am here to crash
--> Hangs here. ^C is pressed.
Traceback (most recent call last):
  File "app2.py", line 12, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 570, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 538, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1746, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
--> Another ^C.
Exception ignored in: <module 'threading' from '/usr/local/lib/python3.7/threading.py'>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 1307, in _shutdown
    lock.acquire()
KeyboardInterrupt

Versions

  • Python 3.7.4
  • Mode 4.1.6
  • Debian GNU/Linux 10 (buster), Windows 8.1

Additional Info

mode.threads.ServiceThread and mode.threads.WorkerThread deadlock each other when an exception is raised.
I tracked down the cause, but I don't know this library well enough to fix it properly.
Here is what I found (my comments are marked with ###):

class WorkerThread(threading.Thread):
    def run(self) -> None:
        try:
            self.service._start_thread()
        finally:
            ### self._is_stopped.set() is done here,
            ### but WorkerThread.stop() is called inside self.service._start_thread()
            ### and waits forever, so we never reach this
            self._set_stopped()

    def stop(self) -> None:
        self._is_stopped.wait()  ### we wait forever here because run() is still running up the stack
        if self.is_alive():
            self.join(threading.TIMEOUT_MAX)

class ServiceThread(Service):
    def _start_thread(self) -> None:
        # set the default event loop for this thread
        asyncio.set_event_loop(self.thread_loop)
        try:
            self.thread_loop.run_until_complete(self._serve())
        except Exception:
            # if self._serve raises an exception we need to set
            # shutdown here, since _shutdown_thread will not execute.
            ### I think this is not true: _shutdown_thread is executed
            ### even when _serve raises an exception.
            self.set_shutdown()
            raise

    async def _shutdown_thread(self) -> None:
        await self._default_stop_children()
        await self.on_thread_stop()
        self.set_shutdown()
        await self._default_stop_futures()
        if self._thread is not None:
            ### the problem is here
            ### maybe we shouldn't call this from inside the thread
            self._thread.stop()
        await self._default_stop_exit_stacks()

    async def _serve(self) -> None:
        try:
            # start the service
            await self._default_start()
            # allow ServiceThread.start() to return
            # when wait_for_thread is enabled.
            await self.on_thread_started()
            notify(self._thread_running)
            await self.wait_until_stopped()
        except asyncio.CancelledError:
            raise
        except BaseException as exc:  # pylint: disable=broad-except
            self.on_crash('{0!r} crashed: {1!r}', self.label, exc)
            await self.crash(exc)
            if self.beacon.root is not None:
                await self.beacon.root.data.crash(exc)
            raise
        finally:
            await self._shutdown_thread()  ### this is also called when an exception is raised
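
The self-wait described in the comments above can be reproduced with the standard library alone. The following is a minimal sketch, not Mode's actual code: `SelfWaitingThread`, its `guard` flag, `wait_timeout`, `wait_result`, and `demo()` are hypothetical names introduced for illustration, and a short timeout replaces the unbounded wait so the example terminates. It also shows one possible guard (skipping the wait when `stop()` is called from the thread itself), which is an assumption about a workaround rather than a verified fix for Mode.

```python
import threading


class SelfWaitingThread(threading.Thread):
    """Hypothetical stand-in for WorkerThread; not Mode's actual class."""

    def __init__(self, guard: bool, wait_timeout: float = 0.2) -> None:
        super().__init__(daemon=True)
        self.guard = guard
        self.wait_timeout = wait_timeout
        self.stopped = threading.Event()  # plays the role of _is_stopped
        self.wait_result = None           # what the wait inside stop() returned

    def run(self) -> None:
        try:
            # Like _shutdown_thread(), stop() is called from inside the thread.
            self.stop()
        finally:
            # Only reached after stop() has returned.
            self.stopped.set()

    def stop(self) -> None:
        if self.guard and threading.current_thread() is self:
            # Called from our own thread: run() sets the event after we
            # return, so waiting for it here could never succeed.
            return
        # A timeout stands in for the unbounded wait so the demo terminates.
        self.wait_result = self.stopped.wait(self.wait_timeout)


def demo(guard: bool) -> dict:
    """Run the thread once and report what happened inside stop()."""
    thread = SelfWaitingThread(guard=guard)
    thread.start()
    thread.join(2.0)
    return {"wait_result": thread.wait_result, "finished": not thread.is_alive()}


unguarded = demo(guard=False)
guarded = demo(guard=True)
print("without guard:", unguarded)
print("with guard:   ", guarded)
```

Without the guard, the wait inside `stop()` times out and returns `False`, because the event can only be set after `stop()` returns; in Mode the same wait has no timeout, so it never returns. With the guard, `stop()` returns immediately and `run()` reaches its `finally` block.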

This also causes faust to hang:

import faust
import mode

app = faust.App(
    "some_app",
    broker="kafka://localhost:9092",
    # agent_supervisor=mode.CrashingSupervisor,  # This won't help
)

@app.task
async def some_task():
    raise RuntimeError("Some Error")

if __name__ == "__main__":
    app.main()

The process hangs in the crashed state.

Also, it seems to be causing this faust issue.
