diff --git a/dpq/commands.py b/dpq/commands.py index 7223db6..8c737fc 100644 --- a/dpq/commands.py +++ b/dpq/commands.py @@ -5,6 +5,8 @@ from django.core.management.base import BaseCommand from django.db import connection +from django.utils import autoreload +from django.utils.autoreload import raise_last_exception class Worker(BaseCommand): @@ -24,6 +26,12 @@ def add_arguments(self, parser): action='store_true', help="Use LISTEN/NOTIFY to wait for events." ) + parser.add_argument( + '--reload', + action='store_true', + dest='use_reloader', + help="Use the auto-reloader.", + ) def handle_shutdown(self, sig, frame): if self._in_task: @@ -67,15 +75,26 @@ def handle(self, **options): self.delay = options['delay'] self.listen = options['listen'] + # Handle the signals for warm shutdown. + signal.signal(signal.SIGINT, self.handle_shutdown) + signal.signal(signal.SIGTERM, self.handle_shutdown) + + self.run(**options) + + def inner_run(self, **options): + # If an exception was silenced in ManagementUtility.execute in order + # to be raised in the child process, raise it now. + raise_last_exception() + with connection.cursor() as cursor: cursor.execute("SET application_name TO %s", ['dpq#{}'.format(os.getpid())]) if self.listen: + self.logger.info('Listening for queued tasks', extra={ + 'channel': self.queue.notify_channel, + }) self.queue.listen() try: - # Handle the signals for warm shutdown. - signal.signal(signal.SIGINT, self.handle_shutdown) - signal.signal(signal.SIGTERM, self.handle_shutdown) while True: self.run_available_tasks() @@ -84,6 +103,13 @@ def handle(self, **options): # got shutdown signal pass + def run(self, **options): + """Run the worker, using the autoreloader if needed.""" + if options['use_reloader']: + autoreload.run_with_reloader(self.inner_run, **options) + else: + self.inner_run(**options) + def wait(self): if self.listen: count = len(self.queue.wait(self.delay))