From b772f157df55d222d18b21875326add6bf1b1fa8 Mon Sep 17 00:00:00 2001 From: David Svenson Date: Thu, 11 May 2023 17:54:14 +0200 Subject: [PATCH 1/2] Add option to reload using the django autoreloader. --- dpq/commands.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/dpq/commands.py b/dpq/commands.py index 7223db6..9eb6d55 100644 --- a/dpq/commands.py +++ b/dpq/commands.py @@ -5,6 +5,8 @@ from django.core.management.base import BaseCommand from django.db import connection +from django.utils import autoreload +from django.utils.autoreload import raise_last_exception class Worker(BaseCommand): @@ -24,6 +26,12 @@ def add_arguments(self, parser): action='store_true', help="Use LISTEN/NOTIFY to wait for events." ) + parser.add_argument( + '--reload', + action='store_true', + dest='use_reloader', + help="Use the auto-reloader.", + ) def handle_shutdown(self, sig, frame): if self._in_task: @@ -67,15 +75,23 @@ def handle(self, **options): self.delay = options['delay'] self.listen = options['listen'] + # Handle the signals for warm shutdown. + signal.signal(signal.SIGINT, self.handle_shutdown) + signal.signal(signal.SIGTERM, self.handle_shutdown) + + self.run(**options) + + def inner_run(self, **options): + # If an exception was silenced in ManagementUtility.execute in order + # to be raised in the child process, raise it now. + raise_last_exception() + with connection.cursor() as cursor: cursor.execute("SET application_name TO %s", ['dpq#{}'.format(os.getpid())]) if self.listen: self.queue.listen() try: - # Handle the signals for warm shutdown. - signal.signal(signal.SIGINT, self.handle_shutdown) - signal.signal(signal.SIGTERM, self.handle_shutdown) while True: self.run_available_tasks() @@ -84,6 +100,13 @@ def handle(self, **options): # got shutdown signal pass + def run(self, **options): + """Run the worker, using the autoreloader if needed.""" + if options['use_reloader']: + autoreload.run_with_reloader(self.inner_run, **options) + else: + self.inner_run(**options) + def wait(self): if self.listen: count = len(self.queue.wait(self.delay)) From 13fcebbf9231559086e21343df5c0dd85c806780 Mon Sep 17 00:00:00 2001 From: David Svenson Date: Fri, 12 May 2023 13:54:59 +0200 Subject: [PATCH 2/2] Add logging to indicate worker is listening. --- dpq/commands.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dpq/commands.py b/dpq/commands.py index 9eb6d55..8c737fc 100644 --- a/dpq/commands.py +++ b/dpq/commands.py @@ -90,6 +90,9 @@ def inner_run(self, **options): cursor.execute("SET application_name TO %s", ['dpq#{}'.format(os.getpid())]) if self.listen: + self.logger.info('Listening for queued tasks', extra={ + 'channel': self.queue.notify_channel, + }) self.queue.listen() try: