Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 79 additions & 23 deletions src/qasync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,9 @@
The set_running_loop parameter is there for backwards compatibility and does nothing.
"""

def __init__(self, app=None, set_running_loop=False, already_running=False):
def __init__(
self, app=None, set_running_loop=False, already_running=False, qtparent=None
):
self.__app = app or QApplication.instance()
assert self.__app is not None, "No QApplication has been instantiated"
self.__is_running = False
Expand All @@ -364,8 +366,10 @@
self._read_notifiers = {}
self._write_notifiers = {}
self._timer = _SimpleTimer()
self.qtparent = qtparent or self.__app

self.__call_soon_signaller = signaller = _make_signaller(QtCore, object, tuple)

self.__call_soon_signal = signaller.signal
self.__call_soon_signal.connect(
lambda callback, args: self.call_soon(callback, *args)
Expand All @@ -374,6 +378,18 @@
assert self.__app is not None
super().__init__()

# Parent helper objects, such as timers, to this Qt parent for safe
# lifetime management.
if (
self.qtparent is not None
and self.qtparent.thread() is not QtCore.QThread.currentThread()
):
raise RuntimeError(

Check warning on line 387 in src/qasync/__init__.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on line 387
"qt_parent must belong to the same QThread as the event loop"
)
self._timer.setParent(self.qtparent)
signaller.setParent(self.qtparent)

# We have to set __is_running to True after calling
# super().__init__() because of a bug in BaseEventLoop.
if already_running:
Expand All @@ -385,8 +401,11 @@
self.__app.aboutToQuit.connect(self._after_run_forever)

# for asyncio to recognize the already running loop
asyncio.events._set_running_loop(self)

Check warning on line 404 in src/qasync/__init__.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on line 404

def get_qtparent(self):
return self.qtparent

def run_forever(self):
"""Run eventloop forever."""

Expand Down Expand Up @@ -463,26 +482,53 @@
if self.is_closed():
return

# the following code places try/catch around possibly failing
# operations for safety and to guard against implementation
# difference in the QT bindings.

self.__log_debug("Closing event loop...")
# Catch exceptions for safety between bindings.
try:
poller = self.get_proactor_event_poller()
except AttributeError:
pass
else:
poller.stop()

if self.__default_executor is not None:
self.__default_executor.shutdown()

if self.__call_soon_signal:
# Disconnect thread-safe signaller and schedule deletion of helper QObjects
try:
self.__call_soon_signal.disconnect()
except Exception: # pragma: no cover
pass
try:
# may raise if already deleted
self.__call_soon_signaller.deleteLater()
except Exception: # pragma: no cover
pass

super().close()

# Stop timers first to avoid late invocations during teardown
self._timer.stop()
self.__app = None
try:
self._timer.deleteLater()
except Exception: # pragma: no cover
pass

# Disable and disconnect any remaining notifiers before closing
for notifier in itertools.chain(
self._read_notifiers.values(), self._write_notifiers.values()
):
notifier.setEnabled(False)
notifier.activated["int"].disconnect()
self._delete_notifier(notifier)

self._read_notifiers = None
self._write_notifiers = None
self._read_notifiers.clear()
self._write_notifiers.clear()

super().close()

# Finally, clear app reference
self.__app = None

def call_later(self, delay, callback, *args, context=None):
"""Register callback to be invoked after a certain delay."""
Expand Down Expand Up @@ -531,15 +577,16 @@
pass
else:
# this is necessary to avoid race condition-like issues
existing.setEnabled(False)
existing.activated["int"].disconnect()
self._delete_notifier(existing)
# will get overwritten by the assignment below anyways

notifier = QtCore.QSocketNotifier(_fileno(fd), QtCore.QSocketNotifier.Type.Read)
notifier = QtCore.QSocketNotifier(
_fileno(fd), QtCore.QSocketNotifier.Type.Read, self.__app
)
notifier.setEnabled(True)
self.__log_debug("Adding reader callback for file descriptor %s", fd)
notifier.activated["int"].connect(
lambda: self.__on_notifier_ready(
lambda *_: self.__on_notifier_ready(
self._read_notifiers, notifier, fd, callback, args
) # noqa: C812
)
Expand All @@ -556,8 +603,7 @@
except KeyError:
return False
else:
notifier.setEnabled(False)
notifier.activated["int"].disconnect()
self._delete_notifier(notifier)
return True

def _add_writer(self, fd, callback, *args):
Expand All @@ -569,18 +615,18 @@
pass
else:
# this is necessary to avoid race condition-like issues
existing.setEnabled(False)
existing.activated["int"].disconnect()
self._delete_notifier(existing)
# will get overwritten by the assignment below anyways

notifier = QtCore.QSocketNotifier(
_fileno(fd),
QtCore.QSocketNotifier.Type.Write,
self.__app,
)
notifier.setEnabled(True)
self.__log_debug("Adding writer callback for file descriptor %s", fd)
notifier.activated["int"].connect(
lambda: self.__on_notifier_ready(
lambda *_: self.__on_notifier_ready(
self._write_notifiers, notifier, fd, callback, args
) # noqa: C812
)
Expand All @@ -597,8 +643,7 @@
except KeyError:
return False
else:
notifier.setEnabled(False)
notifier.activated["int"].disconnect()
self._delete_notifier(notifier)
return True

def __notifier_cb_wrapper(self, notifiers, notifier, fd, callback, args):
Expand All @@ -616,15 +661,14 @@
notifier.setEnabled(True)

def __on_notifier_ready(self, notifiers, notifier, fd, callback, args):
if fd not in notifiers:
if fd not in notifiers: # pragma: no cover
self._logger.warning(
"Socket notifier for fd %s is ready, even though it should "
"be disabled, not calling %s and disabling",
fd,
callback,
)
notifier.setEnabled(False)
notifier.activated["int"].disconnect()
self._delete_notifier(notifier)
return

# It can be necessary to disable QSocketNotifier when e.g. checking
Expand All @@ -636,6 +680,18 @@
self.__notifier_cb_wrapper, notifiers, notifier, fd, callback, args
)

@staticmethod
def _delete_notifier(notifier):
notifier.setEnabled(False)
try:
notifier.activated["int"].disconnect()
except Exception: # pragma: no cover
pass
try:
notifier.deleteLater()
except Exception: # pragma: no cover
pass

# Methods for interacting with threads.

def call_soon_threadsafe(self, callback, *args, context=None):
Expand Down
49 changes: 42 additions & 7 deletions src/qasync/_unix.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@

import asyncio
import collections
import itertools
import selectors

from . import QtCore, _fileno, with_logger

EVENT_READ = 1 << 0
EVENT_WRITE = 1 << 1

# Qt5/Qt6 compatibility
NotifierEnum = getattr(QtCore.QSocketNotifier, "Type", QtCore.QSocketNotifier)


class _SelectorMapping(collections.abc.Mapping):
"""Mapping of file objects to selector keys."""
Expand All @@ -40,14 +44,15 @@

@with_logger
class _Selector(selectors.BaseSelector):
def __init__(self, parent):
def __init__(self, parent, qtparent=None):
# this maps file descriptors to keys
self._fd_to_key = {}
# read-only mapping returned by get_map()
self.__map = _SelectorMapping(self)
self.__read_notifiers = {}
self.__write_notifiers = {}
self.__parent = parent
self.__qtparent = qtparent

def select(self, *args, **kwargs):
"""Implement abstract method even though we don't need it."""
Expand Down Expand Up @@ -83,16 +88,22 @@
if key.fd in self._fd_to_key:
raise KeyError("{!r} (FD {}) is already registered".format(fileobj, key.fd))

self._fd_to_key[key.fd] = key

if events & EVENT_READ:
notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Read)
notifier = QtCore.QSocketNotifier(
key.fd, NotifierEnum.Read, self.__qtparent
)
notifier.setEnabled(True)
notifier.activated["int"].connect(self.__on_read_activated)
self.__read_notifiers[key.fd] = notifier
if events & EVENT_WRITE:
notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Write)
notifier = QtCore.QSocketNotifier(
key.fd, NotifierEnum.Write, self.__qtparent
)
notifier.setEnabled(True)
notifier.activated["int"].connect(self.__on_write_activated)
self.__write_notifiers[key.fd] = notifier

Check warning on line 106 in src/qasync/_unix.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on lines 91-106

return key

Expand All @@ -109,16 +120,16 @@
self.__parent._process_event(key, EVENT_WRITE & key.events)

def unregister(self, fileobj):
def drop_notifier(notifiers):
try:
notifier = notifiers.pop(key.fd)

Check warning on line 125 in src/qasync/_unix.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on lines 123-125
except KeyError:
except KeyError: # pragma: no cover
pass
else:
notifier.activated["int"].disconnect()
self._delete_notifier(notifier)

try:
key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))

Check warning on line 132 in src/qasync/_unix.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on lines 129-132
except KeyError:
raise KeyError("{!r} is not registered".format(fileobj)) from None

Expand All @@ -144,6 +155,10 @@
def close(self):
self._logger.debug("Closing")
self._fd_to_key.clear()
for notifier in itertools.chain(
self.__read_notifiers.values(), self.__write_notifiers.values()
):
self._delete_notifier(notifier)

Check warning on line 161 in src/qasync/_unix.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on line 161
self.__read_notifiers.clear()
self.__write_notifiers.clear()

Expand All @@ -163,16 +178,36 @@
"""
try:
return self._fd_to_key[fd]
except KeyError:
return None

Check warning on line 182 in src/qasync/_unix.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on lines 181-182

@staticmethod
def _delete_notifier(notifier):
notifier.setEnabled(False)
try:
notifier.activated["int"].disconnect()

Check warning on line 188 in src/qasync/_unix.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on lines 186-188
except Exception: # pragma: no cover
pass
try:
notifier.deleteLater()

Check warning on line 192 in src/qasync/_unix.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on lines 191-192
except Exception: # pragma: no cover
pass


class _SelectorEventLoop(asyncio.SelectorEventLoop):
def __init__(self):
self._signal_safe_callbacks = []

selector = _Selector(self)
asyncio.SelectorEventLoop.__init__(self, selector)
try:
qtparent = self.get_qtparent()
except AttributeError: # pragma: no cover
qtparent = None
self._qtselector = _Selector(self, qtparent=qtparent)
asyncio.SelectorEventLoop.__init__(self, self._qtselector)

def close(self):
self._qtselector.close()
super().close()

def _before_run_forever(self):
pass
Expand Down
8 changes: 7 additions & 1 deletion src/qasync/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def __init__(self):
self.__event_signal.connect(self._process_events)
self.__event_poller = _EventPoller(self.__event_signal)

def get_proactor_event_poller(self):
return self.__event_poller

def _process_events(self, events):
"""Process events from proactor."""
for f, callback, transferred, key, ov in events:
Expand Down Expand Up @@ -210,6 +213,7 @@ class _EventPoller:

def __init__(self, sig_events):
self.sig_events = sig_events
self.__worker = None

def start(self, proactor):
self._logger.debug("Starting (proactor: %s)...", proactor)
Expand All @@ -218,4 +222,6 @@ def start(self, proactor):

def stop(self):
self._logger.debug("Stopping worker thread...")
self.__worker.stop()
if self.__worker is not None:
self.__worker.stop()
self.__worker = None
Loading
Loading