diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..1f8a301 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,3 @@ +[run] +source=django_leek +omit=venv/*, test_app/* diff --git a/.gitignore b/.gitignore index 528730d..91059df 100644 --- a/.gitignore +++ b/.gitignore @@ -1,63 +1,16 @@ -# Wing IDE -*.wpr -*.wpu - - - -# Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] -# C extensions -*.so - -# Distribution / packaging -.Python -env/ -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -#lib/ -lib64/ -parts/ -sdist/ -var/ -*.egg-info/ -.installed.cfg -*.egg - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt +/venv/ # Unit test / coverage reports -htmlcov/ -.tox/ .coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*,cover - -# Translations -*.mo -*.pot -# Django stuff: -*.log - -# Sphinx documentation -docs/_build/ +# setuptools +/build/ +/dist/*.whl +/dist/*.gz +*.egg-info/ -# PyBuilder -target/ +# test app +db.sqlite3 diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..e33918f --- /dev/null +++ b/.pylintrc @@ -0,0 +1,3 @@ + [MASTER] + errors-only=yes + load-plugins=pylint_django diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..9d146b2 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,9 @@ +language: python +python: + - "3.5" + - "3.6" +cache: pip +script: + - pylint django_leek + - coverage run $(which django-admin) test --pythonpath=. 
--settings=django_leek.settings + - python -m coverage_shield diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..5b80df3 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.pythonPath": "venv/bin/python" +} \ No newline at end of file diff --git a/README.md b/README.md index be9f297..5ac0a42 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,83 @@ -# Django Queue +# Django Leek +![Logo](logo.svg) -**A simple async tasks queue via a django app and SocketServer, zero configs.** +[![Build Status](https://travis-ci.com/Volumental/django-leek.svg?branch=master)](https://travis-ci.com/Volumental/django-leek) +![Code Coverage](https://badges-io.now.sh/badge/Volumental/django-leek/coverage.svg) -[Why?](#why) -[Overview](#overview) -[Install](#install) -[Settings](#settings) -[Run the Tasks Queue Server](#run-the-tasks-queue-server) -[Persistency](#persistency) -[Failed Tasks](#failed-tasks) -[Run the Tasks Queue on Another Server](#run-the-tasks-queue-on-another-server) +The _simple_ and _slick_ way to run async tasks in Django. +* Django-friendly API +* Easy to start and stop + +Based on [django-queue](https://github.com/Aviah/django-queue). ## Why? +With a healthy mix of vegetables, such as [Celery](http://www.celeryproject.org/) and [Carrot](http://www.django-carrot.com/) already in the midst, what does `django-leek` bring? -Although Celery is pretty much the standard for a django tasks queue solution, it can be complex to install and config. +The most "lightweight" library so far has "install Redis" as step one. Although Redis is fantastic software, sometimes you just want a simple way to offload the web server and run a task asynchronously, such as sending an email. -The common case for a web application queue is to send emails: you don't want the django thread to wait until the SMTP, or email provider API, finishes. 
But to send emails from a site without a lot of traffic, or to run other similar simple tasks, you don't need Celery. +Here `django-leek` comes to the rescue. Usage and architecture cannot be simpler, and with so few moving parts, it should be very stable, although it is not yet as battle-tested as e.g. Celery. -This queue app is a simple, up and running queueing solution. The more complex distributed queues can wait until the website has a lot of traffic, and the scalability is really required. +With `django-leek` you can get up and running quickly. The more complex distributed queues can wait until the website has a lot of traffic, and the scalability is really required. -## Overview +## Getting started +1. Install `django-leek` with pip + ```bash + $ pip install django-leek + ``` -In a nutshell, a python SocketServer runs in the background, and listens to a tcp socket. SocketServer gets the request to run a task from it's socket, puts the task on a Queue. A Worker thread picks tasks from this Queue, and runs the tasks one by one. +2. Add `django_leek` to `INSTALLED_APPS` in your `settings.py` file. -You send a task request to the SocketServer with: +3. Create tables needed - - from mysite.tasks_queue.API import push_task_to_queue - ... - push_task_to_queue(a_callable,*args,**kwargs) - -Sending email might look like: + ```bash + $ python manage.py migrate + ``` + +4. Make sure the django-leek server is running. - push_task_to_queue(send_mail,subject="foo",message="baz",recipient_list=[user.email]) + ```bash + $ python manage.py leek + ``` + +5. Go nuts + + ```python + leek = Leek() + @leek.task + def send_mail(to): + do_what_ever() + send_mail.offload(to='foobar@example.com') + ``` + + You can also use the "old" API, as found in `django-queue`: + ```python + push_task_to_queue(send_mail, to='foobar@example.com') + ``` + +6. It's easy to unit test code that in production offloads work to the Leek server. 
+ + ```python + def _invoke(a_callable, *args, **kwargs): + a_callable(*args, **kwargs) + @patch('django_leek.api.push_task_to_queue', _invoke) + def test_mytest(): + send_mail.offload(to='sync@leek.com') # now runs synchronously, like a normal function + ``` + +## Development +There is a test application you can play around with when developing on `django-leek`. Example: + +1. `./manage.sh test_app runserver` - Starts the test app +2. `./manage.sh test_app leek` - Starts a leek instance for the test app +3. `./manage.sh django_leek test` - Run test suite. + +## Technical overview +In a nutshell, a python SocketServer runs in the background, listening on a tcp socket. SocketServer gets the request to run a task from its socket, puts the task on a Queue. A Worker thread picks tasks from this Queue, and runs the tasks one by one. + ### Components 1. Python SocketServer that listens to a tcp socket. @@ -53,128 +94,39 @@ The workflow that runs an async task: 5. The `Worker` thread runs the task. ### Can this queue scale to production? - -Depends on the traffic: SocketServer is simple, but solid, and as the site gets more traffic, it's possible to move the django-queue server to another machine, separate database etc. At some point, probably, it's better to pick Celery. Until then, django-queue is a simple, solid, and no-hustle solution. - - - -## Install - -1. Add the `tasks_queue` app to your django project -2. Replace `mysite` in the `tasks_queue/worker.py` with the full path to the `tasks_queue` in your project. It should look like the path in the project's manage.py. -2. Add the tasks_queue app to `INSTALLED_APPS` -3. Migrate: - - $ manange.py migrate - -4. The tasks_queue app has an API module, with a `push_task_to_queue` function. Use this function to send callables with args and kwargs to the queue, for the async run. 
- +Depends on the traffic: SocketServer is simple, but solid, and as the site gets more traffic, it's possible to move the django-leek server to another machine, separate database etc. At some point, probably, it's better to pick Celery. Until then, django-leek is a simple, solid, and no-hassle solution. ## Settings - -To change the default django-queue settings, add a `TASKS_QUEUE` dictionary to your project main `settings.py` file. +To change the default django-leek settings, add a `LEEK` dictionary to your project main `settings.py` file. This is the dictionary and the defaults: + LEEK = { + 'bind': "localhost:8002", + 'host': "localhost", + 'port': 8002} - TASKS_QUEUE = { - "MAX_RETRIES":3, - "TASKS_HOST":"localhost", - "TASKS_PORT":8002} - -**MAX_RETRIES** -The number of times the Worker thread will try to run a task before skipping it. The default is 3. - -**TASKS_HOST** -The host that runs the SocketServer. the default is 'localhost'. - -**TASKS_PORT** -The port that SocketServer listens to. The default is 8002. - - - -## Run the Tasks Queue Server - - -###Start the Server - -From shell: - - $ python -m mysite.tasks_queue.service-start & - -Provide the full path, without the .py extention. - - -*Note: The tasks queue uses relative imports, and thus should run as a package. If you want to run it with the common `python service-start.py &`, -then edit the imports of the `tasks_queue` files, and convert all the imports to absolute imports.* - - -###Stop the Server - -First stop the worker thread gracefully: - - $ python tasks_queue/service-stop-worker.py - -This will send a stop event to the Worker thread. -Check that the Worker thread stopped: - - $ python tasks_queue/shell.py ping - Sent: ping - Received: (False, 'Worker Off') - -Now you can safely stop SocketServer: +**`bind`** +The leek server will bind here. 
- $ ps ax | grep tasks_queue - 12345 pts/1 S 7:20 python -m mysite.tasks_queue.service-start - $ sudo kill 12345 - - -###Ping the Server - -From shell: - - $ python tasks_queue/shell.py ping - Sent: ping - Received: (True, "I'm OK") - -### Tasks that are waiting on the Queue +**`host`** +The django server will connect to this host when notifying leek of jobs. -From shell: +**`port`** +The django server will connect to this port when notifying leek of jobs. - $ python tasks_queue/shell.py waiting - Sent: waiting - Received: (True, 115) - -115 tasks are waiting on the queue - -### Count total tasks handled to the Queue - -From shell: - - $ python tasks_queue/shell.py handled - Sent: handled - Received: (True, 862) - -Total of 862 tasks were handled to the Queue from the moment the thread started - - -*Note: If you use the tasks server commands a lot, add shell aliases for these commands* - -## Persistency - -### Tasks saved in the database +## Persistence +The following models are used. **QueuedTasks** The model saves every tasks pushed to the queue. The task is pickled as a `tasks_queue.tasks.Task` object, which is a simple class with a `callable`,`args` and `kwargs` attributes, and one method: `run()` - **SuccessTasks** The Worker thread saves to this model the `task_id` of every task that was carried out successfuly. **task_id** is the task's `QueuedTasks` id. **FailedTasks** -After the Worker tries to run a task several times according to `MAX_RETRIES`, and the task still fails, the Worker saves it to this model. The failed taks is saved by the `task_id`, with the exception message. Only the exception from the last run is saved. - +After the Worker tries to run a task and it fails by raising an exception, the Worker saves it to this model. The failed taks is saved by the `task_id`, with the exception message. Only the exception from the last run is saved. 
### Purge Tasks @@ -191,79 +143,20 @@ The SQL to delete these tasks: In a similar way, delete the failed tasks. You can run a cron script, or other script, to purge the tasks. +## Release a new version +1. Checkout master branch +2. Make sure virtual environment is activated. `source venv/bin/activate` +3. Make sure version in `setup.py` is correct. `grep version setup.py` +4. Make sure setuptools, twine, and wheel are installed and up to date -## Failed Tasks - -### Retry failed tasks with a script - -When the Worker fails to run the task `MAX_RETRIES` times, it saves the **task_id** and the exception message to the `FailedTasks` model. - -To re-try failed tasks, after they are saved to the database, you can run this script, from shell: - - $ python tasks_queue/run_failed_tasks.py - -*Note: The path is provided in the script with `mysite`. Edit this entry with the full path to the tasks_queue in your project, similar to the path provided in the project's manage.py* - -### Connections -If most of the tasks require a specific connection, such as SMTP or a database, you can edit the Worker class and add a ping or other check for this connection **before*8 the tasks runs. If the connection is not avaialable, just try to re-connect. - -Otherwise the Worker will just run and fail a lot of tasks. - -## Run the Tasks Queue on Another Server + pip install "setuptools>=38.6.0" "twine>=1.11.0" "wheel>=0.31.0" -The same `tasks_queue` app can run from another server, and provide a seprate server queue for the async tasks. +5. Clean out any old dist packages. `rm -r dist/` +6. Build source and wheel dists. `python setup.py sdist bdist_wheel` +7. Upload to PyPI `twine upload dist/*` +8. Profit! -Here is a simple way to do it: - -1. The queue server should be similar to the main django server, just without a webserver. -2. Deploy your django code to these two remotes: the main with the web-server, and the queue server -3. 
Open firewalls ports between the main django server, and the queue server, for the tasks_queue TASKS_PORT, and between the tasks_queue server and the databse server, for the database ports. -5. On the django main server, set TASKS_HOST to the tasks_queue server. - -That's it! Now run the server in tasks_queue server with `service-start`, and the main django server will put the tasks to this tasks_queue server. - -*Note: "main django server" can be more than one server that run django, and push message to the django queue server* - - - -Support this project with my affiliate link| --------------------------------------------| -https://www.linode.com/?r=cc1175deb6f3ad2f2cd6285f8f82cefe1f0b3f46| - - - - - - - - +## Authors +Aviah, Silvia Scalisi and Samuel Carlsson - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +See [contributors]( https://github.com/Volumental/django-leek/graphs/contributors) for full list. diff --git a/tasks_queue/__init__.py b/django_leek/__init__.py similarity index 100% rename from tasks_queue/__init__.py rename to django_leek/__init__.py diff --git a/django_leek/api.py b/django_leek/api.py new file mode 100644 index 0000000..f8c51f8 --- /dev/null +++ b/django_leek/api.py @@ -0,0 +1,46 @@ +import socket +from functools import wraps +import json + +from . import models +from . 
import helpers +from .settings import HOST, PORT + + +class Leek(object): + def task(self, f, pool=None): + pool_name = pool or f.__name__ + + @wraps(f) + def _offload(*args, **kwargs): + return push_task_to_queue(f, pool_name=pool_name, *args, **kwargs) + f.offload = _offload + return f + + +class Task(object): + def __init__(self, a_callable, *args, **kwargs): + assert callable(a_callable) + self.task_callable = a_callable + self.args = args + self.kwargs = kwargs + + def __call__(self): + return self.task_callable(*self.args, **self.kwargs) + + +def push_task_to_queue(a_callable, *args, **kwargs): + """Original API""" + pool_name = kwargs.pop('pool_name', None) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + new_task = Task(a_callable, *args, **kwargs) + queued_task = helpers.save_task_to_db(new_task, pool_name) + sock.connect((HOST, PORT)) + sock.send("{}".format(queued_task.id).encode()) + received = sock.recv(1024) + sock.close() + return json.loads(received.decode()) + + +def query_task(task_id: int) -> models.Task: + return helpers.load_task(task_id) diff --git a/django_leek/helpers.py b/django_leek/helpers.py new file mode 100644 index 0000000..dc34224 --- /dev/null +++ b/django_leek/helpers.py @@ -0,0 +1,24 @@ +import pickle +import base64 + +from . 
import models + + +def unpack(pickled_task): + new_task = pickle.loads(base64.b64decode(pickled_task)) + return new_task + + +def serialize(task): + return base64.b64encode(pickle.dumps(task)) + + +def load_task(task_id) -> models.Task: + return models.Task.objects.get(pk=task_id) + + +def save_task_to_db(new_task, pool_name): + pickled_task = serialize(new_task) + t = models.Task(pickled_task=pickled_task, pool=pool_name) + t.save() + return t diff --git a/tasks_queue/migrations/__init__.py b/django_leek/management/__init__.py similarity index 100% rename from tasks_queue/migrations/__init__.py rename to django_leek/management/__init__.py diff --git a/django_leek/management/commands/__init__.py b/django_leek/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/django_leek/management/commands/leek.py b/django_leek/management/commands/leek.py new file mode 100644 index 0000000..b825901 --- /dev/null +++ b/django_leek/management/commands/leek.py @@ -0,0 +1,31 @@ +import socketserver + +from django.core.management.base import BaseCommand, CommandError +from django.conf import settings + +from django_leek.server import TaskSocketServer + + +def _endpoint(endpoint): + host, port = endpoint.split(':') + return host, int(port) + + +class Command(BaseCommand): + help = 'Starts leek worker server' + + #def add_arguments(self, parser): + # parser.add_argument('poll_id', nargs='+', type=int) + + def handle(self, *args, **options): + try: + cfg = getattr(settings, 'LEEK', {}) + host, port = _endpoint(cfg.get('bind', 'localhost:8002')) + + print('Listening on {port}'.format(port=port)) + socketserver.TCPServer.allow_reuse_address = True + server = socketserver.TCPServer((host, port), TaskSocketServer) + server.serve_forever() + except KeyboardInterrupt: + pass + print('exiting') diff --git a/tasks_queue/migrations/0001_initial.py b/django_leek/migrations/0001_initial.py similarity index 100% rename from tasks_queue/migrations/0001_initial.py 
rename to django_leek/migrations/0001_initial.py diff --git a/django_leek/migrations/0002_queuedtasks_pool.py b/django_leek/migrations/0002_queuedtasks_pool.py new file mode 100644 index 0000000..e656ad4 --- /dev/null +++ b/django_leek/migrations/0002_queuedtasks_pool.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11 on 2018-09-05 03:03 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_leek', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='queuedtasks', + name='pool', + field=models.CharField(max_length=256, null=True), + ), + ] diff --git a/django_leek/migrations/0003_auto_20180910_1028.py b/django_leek/migrations/0003_auto_20180910_1028.py new file mode 100644 index 0000000..5d09139 --- /dev/null +++ b/django_leek/migrations/0003_auto_20180910_1028.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11 on 2018-09-10 10:28 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_leek', '0002_queuedtasks_pool'), + ] + + operations = [ + migrations.AlterField( + model_name='queuedtasks', + name='pickled_task', + field=models.BinaryField(max_length=5000), + ), + ] diff --git a/django_leek/migrations/0004_new_task_structure_20200310_1518.py b/django_leek/migrations/0004_new_task_structure_20200310_1518.py new file mode 100644 index 0000000..9bfad2c --- /dev/null +++ b/django_leek/migrations/0004_new_task_structure_20200310_1518.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11 on 2020-03-10 09:18 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_leek', '0003_auto_20180910_1028'), + ] + + operations = [ + migrations.CreateModel( + name='Task', + fields=[ + 
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('pickled_task', models.BinaryField(max_length=4096)), + ('pool', models.CharField(max_length=256, null=True)), + ('queued_at', models.DateTimeField(auto_now_add=True)), + ('started_at', models.DateTimeField(null=True)), + ('finished_at', models.DateTimeField(null=True)), + ('pickled_exception', models.BinaryField(max_length=2048, null=True)), + ('pickled_return', models.BinaryField(max_length=4096, null=True)), + ], + ), + migrations.DeleteModel( + name='FailedTasks', + ), + migrations.DeleteModel( + name='QueuedTasks', + ), + migrations.DeleteModel( + name='SuccessTasks', + ), + ] diff --git a/django_leek/migrations/__init__.py b/django_leek/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/django_leek/models.py b/django_leek/models.py new file mode 100644 index 0000000..c0f977c --- /dev/null +++ b/django_leek/models.py @@ -0,0 +1,36 @@ +import base64 +import pickle +from typing import Any + +from django.db import models + + +class Task(models.Model): + pickled_task = models.BinaryField(max_length=4096) + pool = models.CharField(max_length=256, null=True) + queued_at = models.DateTimeField(auto_now_add=True) + started_at = models.DateTimeField(null=True) + finished_at = models.DateTimeField(null=True) + pickled_exception = models.BinaryField(max_length=2048, null=True) + pickled_return = models.BinaryField(max_length=4096, null=True) + + @property + def exception(self): + if self.pickled_exception is None: + return None + return pickle.loads(base64.b64decode(self.pickled_exception)) + + @property + def return_value(self): + if self.pickled_return is None: + return None + return pickle.loads(base64.b64decode(self.pickled_return)) + + def started(self) -> bool: + return self.started_at is not None + + def finished(self) -> bool: + return self.finished_at is not None + + def successful(self) -> bool: + return self.finished() and 
self.pickled_return is not None diff --git a/django_leek/server.py b/django_leek/server.py new file mode 100644 index 0000000..cc49416 --- /dev/null +++ b/django_leek/server.py @@ -0,0 +1,120 @@ +from datetime import datetime +from sys import platform +from queue import Empty +import json +import logging +import socketserver +import multiprocessing +import queue +import threading + +from .helpers import load_task +from . import helpers +from django.utils import timezone +import django + + +log = logging.getLogger(__name__) + +MAX_QUEUE_SIZE = 10000 + +def target(queue): + django.setup() + log.info('Worker Starts') + done = False + while not done: + try: + task_id = queue.get(block=True, timeout=1) + except Empty as e: + done = True + break + + log.info('running task...') + + # Force this forked process to create its own db connection + django.db.connection.close() + + task = load_task(task_id=task_id) + pickled_task = helpers.unpack(task.pickled_task) + try: + task.started_at = timezone.now() + task.save() + return_value = pickled_task() + task.finished_at = timezone.now() + task.pickled_return = helpers.serialize(return_value) + task.save() + + log.info('...successfully') + queue.task_done() + except Exception as e: + log.exception("...task failed") + task.finished_at = timezone.now() + task.pickled_exception = helpers.serialize(e) + task.save() + + # workaround to solve problems with django + psycopg2 + # solution found here: https://stackoverflow.com/a/36580629/10385696 + django.db.connection.close() + + log.info('Worker stopped') + + +class Pool(object): + def __init__(self): + if platform == 'darwin': + # OSX does not support forking + self.queue = queue.Queue(maxsize=MAX_QUEUE_SIZE) + self.worker = threading.Thread(target=target, args=(self.queue,)) + else: + self.queue = multiprocessing.Queue(maxsize=MAX_QUEUE_SIZE) + self.worker = multiprocessing.Process(target=target, args=(self.queue,)) + + +class TaskSocketServer(socketserver.BaseRequestHandler): + 
DEFAULT_POOL = 'default' + # pools holds a mapping from pool names to process objects + pools = {} + + def handle(self): + try: + data = self.request.recv(5000).strip() + + # assume a serialized task + log.info('Got a task') + response = None + try: + task_id = int(data.decode()) + + # Connection are closed by tasks, force it to reconnect + django.db.connections.close_all() + task = load_task(task_id=task_id) + + # Ensure pool got a worker processing it + pool_name = task.pool or self.DEFAULT_POOL + pool = self.pools.get(pool_name) + if pool is None or not pool.worker.is_alive(): + # Spawn new pool + log.info('Spawning new pool: {}'.format(pool_name)) + self.pools[pool_name] = Pool() + self.pools[pool_name].worker.start() + + self.pools[pool_name].queue.put(task_id) + + response = {'task': 'queued', 'task_id': task_id} + except Exception as e: + log.exception("failed to queue task") + response = (False, "TaskServer Put: {}".format(e).encode(),) + response = { + 'task': 'failed to queue', + 'task_id': task_id, + 'error': str(e) + } + + self.request.send(json.dumps(response).encode()) + + except OSError as e: + # in case of network error, just log + log.exception("network error") + + def finish(self): + None diff --git a/tasks_queue/service-stop-worker.py b/django_leek/service-stop-worker.py similarity index 52% rename from tasks_queue/service-stop-worker.py rename to django_leek/service-stop-worker.py index 978127f..ccf5fdd 100644 --- a/tasks_queue/service-stop-worker.py +++ b/django_leek/service-stop-worker.py @@ -1,15 +1,16 @@ import socket -from app_settings import TASKS_HOST,TASKS_PORT +from .settings import HOST, PORT -def stop_server(): +def stop_server(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((TASKS_HOST, TASKS_PORT)) + sock.connect((HOST, PORT)) sock.send("stop") received = sock.recv(1024) sock.close() - print "Sent: %s" % "stop" - print "Received: %s" % received - + print("Sent: %s" % "stop") + print("Received: %s" % 
received) + + if __name__ == "__main__": - stop_server() \ No newline at end of file + stop_server() diff --git a/django_leek/settings.py b/django_leek/settings.py new file mode 100644 index 0000000..0a4662a --- /dev/null +++ b/django_leek/settings.py @@ -0,0 +1,14 @@ +import sys +from django.conf import settings + + +if 'test' in sys.argv: + SECRET_KEY="just to make tests run" + DATABASES = {'default': {'ENGINE': 'django.db.backends.sqlite3', 'NAME': ':memory:'}} + INSTALLED_APPS = ['django_leek'] + + +cfg = getattr(settings, "LEEK", {}) + +HOST = cfg.get('host', "localhost") +PORT = int(cfg.get('port', "8002")) diff --git a/django_leek/tests.py b/django_leek/tests.py new file mode 100644 index 0000000..2c5351a --- /dev/null +++ b/django_leek/tests.py @@ -0,0 +1,52 @@ +import json +from unittest.mock import patch, MagicMock +import socketserver + +from django.test import TestCase +from django.core.management import call_command + +from django_leek.server import TaskSocketServer +from django_leek import helpers, api + + +@patch.object(socketserver.TCPServer, 'serve_forever') +class LeekCommandTestCase(TestCase): + def test_leek(self, serve_forever): + call_command('leek') + serve_forever.assert_called_with() + + def test_keyboard_interrupt(self, serve_forever): + serve_forever.side_effect = KeyboardInterrupt + call_command('leek') + + +def nop(): + pass + + +class TestServer(TestCase): + def setUp(self): + self.request = MagicMock() + + def _request(self, data): + if isinstance(data, Exception): + self.request.recv.side_effect = data + else: + self.request.recv.return_value = data + + def _response(self): + return b''.join(call[0][0] for call in self.request.send.call_args_list) + + def act(self): + TaskSocketServer(self.request, 'client adress', 'server') + + def test_recv_error(self): + self._request(OSError('Nuclear Winter')) + self.act() + + def test_task(self): + task = helpers.save_task_to_db(api.Task(nop), 'pool_name') + self._request(str(task.id).encode()) 
+ self.act() + actual = json.loads(self._response().decode()) + self.assertEqual(actual, {"task": "queued", "task_id": 1}) diff --git a/logo.svg b/logo.svg new file mode 100644 index 0000000..f68d682 --- /dev/null +++ b/logo.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/manage.sh b/manage.sh new file mode 100755 index 0000000..c84ea8f --- /dev/null +++ b/manage.sh @@ -0,0 +1,5 @@ +#!/bin/sh +APP=$1 +shift +. venv/bin/activate +django-admin $@ --pythonpath=. --settings=$APP.settings diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f9592bb --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +django==1.11 + +# for development +pylint +pylint-django +coverage +coverage-shield diff --git a/run_test_app.sh b/run_test_app.sh new file mode 100755 index 0000000..d79b3e8 --- /dev/null +++ b/run_test_app.sh @@ -0,0 +1,3 @@ +#!/bin/sh +./manage.sh test_app migrate +./manage.sh test_app runserver diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..a15dfbc --- /dev/null +++ b/setup.py @@ -0,0 +1,37 @@ +import os +from setuptools import find_packages, setup + +# allow setup.py to be run from any path +os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir))) + + +with open('README.md') as f: + README = f.read() + + +setup( + name='django-leek', + version='1.0.1', + packages=find_packages(exclude=['test_app']), + install_requires = ['django>=1.11'], + include_package_data=True, + license='MIT License', + description='A simple Django app to offload tasks from main web server', + long_description=README, + long_description_content_type='text/markdown', + url='https://github.com/Volumental/django-leek', + author='Volumental', + author_email='maintainer@volumental.com', + classifiers=[ + 'Environment :: Web Environment', + 'Framework :: Django', + 'Framework :: Django :: 1.11', + 'Intended Audience :: Developers', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 
'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Topic :: Internet :: WWW/HTTP', + 'Topic :: Internet :: WWW/HTTP :: Dynamic Content', + ], +) diff --git a/tasks_queue/API.py b/tasks_queue/API.py deleted file mode 100644 index f5e5a42..0000000 --- a/tasks_queue/API.py +++ /dev/null @@ -1,17 +0,0 @@ -import socket -from .task import Task -from . import helpers -from .app_settings import TASKS_HOST,TASKS_PORT - - -def push_task_to_queue(a_callable,*args,**kwargs): - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - new_task = Task(a_callable,*args,**kwargs) - new_task = helpers.save_task_to_db(new_task) #returns with db_id - sock.connect((TASKS_HOST, TASKS_PORT)) - sock.send(helpers.serielize(new_task)) - received = sock.recv(1024) - sock.close() - - return received \ No newline at end of file diff --git a/tasks_queue/app_settings.py b/tasks_queue/app_settings.py deleted file mode 100644 index 6c94404..0000000 --- a/tasks_queue/app_settings.py +++ /dev/null @@ -1,19 +0,0 @@ -from django.conf import settings - - -D = { - "MAX_RETRIES":3, - "TASKS_HOST":"localhost", - "TASKS_PORT":8002 -} - - -if hasattr(settings,"TASKS_QUEUE"): - for key,value in getattr(settings,"TASKS_QUEUE"): - D[key] = value - - -MAX_RETRIES = D["MAX_RETRIES"] -TASKS_HOST = D['TASKS_HOST'] -TASKS_PORT = D['TASKS_PORT'] - diff --git a/tasks_queue/helpers.py b/tasks_queue/helpers.py deleted file mode 100644 index 89ea0bf..0000000 --- a/tasks_queue/helpers.py +++ /dev/null @@ -1,42 +0,0 @@ -import datetime -import cPickle -import base64 - -from . 
import models -from .task import Task - -def unpack(pickled_task): - - new_task = cPickle.loads(base64.b64decode(pickled_task)) - assert isinstance(new_task,Task) - return new_task - -def serielize(task): - - return base64.b64encode(cPickle.dumps(task)) - -def save_task_to_db(new_task): - - t = models.QueuedTasks() - pickled_task = serielize(new_task) - t = models.QueuedTasks(pickled_task=pickled_task) - t.save() - new_task.db_id = t.id - return new_task - -def save_task_failed(task,exception): - - t = models.FailedTasks(task_id=task.db_id,exception=exception.message) - t.save() - - -def save_task_success(task): - - t = models.SuccessTasks(task_id=task.db_id) - t.save() - - - - - - \ No newline at end of file diff --git a/tasks_queue/models.py b/tasks_queue/models.py deleted file mode 100644 index 0da3b17..0000000 --- a/tasks_queue/models.py +++ /dev/null @@ -1,22 +0,0 @@ -from django.db import models - -class QueuedTasks(models.Model): - - pickled_task = models.CharField(max_length=5000) #max row 65535 - queued_on = models.DateTimeField(auto_now_add=True) - -class SuccessTasks(models.Model): - - task_id = models.IntegerField() - saved_on = models.DateTimeField(auto_now_add=True) - - -class FailedTasks(models.Model): - - task_id = models.IntegerField() - exception = models.CharField(max_length=2048) - saved_on = models.DateTimeField(auto_now_add=True) - - - - \ No newline at end of file diff --git a/tasks_queue/run_failed_tasks.py b/tasks_queue/run_failed_tasks.py deleted file mode 100644 index de3a568..0000000 --- a/tasks_queue/run_failed_tasks.py +++ /dev/null @@ -1,22 +0,0 @@ -import cPickle -import base64 -import os -import sys - -# environ settings variable, should be the same as in manage.py -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") -os.environ["DJANGO_SETTINGS_MODULE"] = "mysite.settings" -import django -django.setup() - - -# edit to the correct mysite.tasks_queue path -from mysite.tasks_queue.models import FailedTasks,QueuedTasks - 
-Lfailed_tasks_id = FailedTasks.objects.values_list("task_id",flat=True) -tasks = QueuedTasks.objects.filter(pk__in=Lfailed_tasks_id) -for r in tasks: - task = cPickle.loads(base64.b64decode(r.pickled_task)) - task.run() - - diff --git a/tasks_queue/server.py b/tasks_queue/server.py deleted file mode 100644 index 55d4291..0000000 --- a/tasks_queue/server.py +++ /dev/null @@ -1,65 +0,0 @@ -import SocketServer -import threading -from . import worker_manager - -Dcommands = { - 'ping':worker_manager.ping, - 'waiting':worker_manager.waiting, - 'handled':worker_manager.hanled, - 'stop':worker_manager.stop -} - -class TaskSocketServer(SocketServer.BaseRequestHandler): - - def handle(self): - - try: - data = self.request.recv(5000).strip() #like the pickled task field - except Exception as e: - response = (False,"SocketServer:%s"%e.message) - self.request.send(response) - - - if data in Dcommands.keys(): - try: - worker_response = Dcommands[data]() - if worker_response == 'Worker Off': - response = (False,worker_response) - else: - response = (True,worker_response,) - except Exception as e: - response = (False,"TaskServer Command: %s"%e.message,) - else: - try: - worker_response = worker_manager.put_task(data) #a tuple - response = worker_response - except Exception as e: - response = (False,"TaskServer Put: %s"%e.message,) - - try: - self.request.send(str(response)) - except Exception as e: - self.request.send("SocketServer Response:%s"%e.message) - - -class TaskSocketServerThread(threading.Thread): - - def __init__(self,host,port): - - threading.Thread.__init__(self, name='tasks-socket-server') - self.host = host - self.port = port - self.setDaemon(1) - self.start() - - def socket_server(self): - return self.server - - - def run(self): - - self.server = SocketServer.TCPServer((self.host,self.port), TaskSocketServer) - - - - \ No newline at end of file diff --git a/tasks_queue/service-start.py b/tasks_queue/service-start.py deleted file mode 100644 index 70c9457..0000000 
--- a/tasks_queue/service-start.py +++ /dev/null @@ -1,14 +0,0 @@ -# run from shell with python -m and the full python path -# to allow relative imports -from .app_settings import TASKS_PORT -from . import worker_manager -from .server import TaskSocketServerThread -import time - -worker_manager.start() -server_thread = TaskSocketServerThread('localhost',TASKS_PORT) -time.sleep(5) -socket_server = server_thread.socket_server() -socket_server.serve_forever() - - diff --git a/tasks_queue/shell.py b/tasks_queue/shell.py deleted file mode 100644 index 7386029..0000000 --- a/tasks_queue/shell.py +++ /dev/null @@ -1,18 +0,0 @@ -import socket -import sys -from app_settings import TASKS_HOST,TASKS_PORT - -def send_data(): - - data = sys.argv[1] - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((TASKS_HOST, TASKS_PORT)) - sock.send(data) - received = sock.recv(1024) - sock.close() - print "Sent: %s" % data - print "Received: %s" % received - -if __name__ == "__main__": - send_data() - diff --git a/tasks_queue/task.py b/tasks_queue/task.py deleted file mode 100644 index f874320..0000000 --- a/tasks_queue/task.py +++ /dev/null @@ -1,14 +0,0 @@ - - -class Task(object): - - def __init__(self,a_callable,*args,**kwargs): - - assert callable(a_callable) - self.task_callable = a_callable - self.args = args - self.kwargs = kwargs - self.db_id = None - - def run(self): - self.task_callable(*self.args,**self.kwargs) diff --git a/tasks_queue/tests.py b/tasks_queue/tests.py deleted file mode 100644 index 7ce503c..0000000 --- a/tasks_queue/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. 
diff --git a/tasks_queue/worker.py b/tasks_queue/worker.py deleted file mode 100644 index a62f217..0000000 --- a/tasks_queue/worker.py +++ /dev/null @@ -1,94 +0,0 @@ -# Based on the Worker class from Python in a nutshell, by Alex Martelli -import logging -import os -import sys -import threading -import Queue - -# environ settings variable, should be the same as in manage.py -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") -os.environ["DJANGO_SETTINGS_MODULE"] = "mysite.settings" -import django - -from . import app_settings -from . import helpers - -class Worker(threading.Thread): - def __init__(self,logger_name=None): - - threading.Thread.__init__(self, name="django-tasks-queue") - self._stopevent = threading.Event() - self.setDaemon(1) - self.worker_queue = Queue.Queue() - self.tasks_counter = 0 - if logger_name != None: - self.logger = logging.getLogger(logger_name) - else: - self.logger = logging - - self.start() - - def put_task_on_queue(self,new_pickled_task): - - try: - new_task = helpers.unpack(new_pickled_task) - self.tasks_counter += 1 - self.worker_queue.put(new_task) - return True,"sent" - except Exception as e: - return False,"Worker: %s"%e.message - - def run_task(self,task): - - for i in range(app_settings.MAX_RETRIES): - try: - task.run() - break - except: - if i < app_settings.MAX_RETRIES - 1: - pass - else: - raise - - def stop_thread(self, timeout=None): - """ Stop the thread and wait for it to end. 
""" - if self.worker_queue != None: - self._stopevent.set() - self.logger.warn('Worker stop event set') - return "Stop Set" - else: - return "Worker Off" - - def ping(self): - if self.worker_queue != None: - return "I'm OK" - else: - return "Worker Off" - - def status_waiting(self): - return self.worker_queue.qsize() - - def status_handled(self): - # all, success & failes - return self.tasks_counter - - def run(self): - # the code until the while statement does NOT run atomicaly - # a thread while loop cycle is atomic - # thread safe locals: L = threading.local(), then L.foo="baz" - django.setup() - self.logger.info('Worker Starts') - while not self._stopevent.isSet(): - if not self.worker_queue.empty(): - try: - task = self.worker_queue.get() - self.run_task(task) - except Exception as e: - helpers.save_task_failed(task,e) - else: - helpers.save_task_success(task) - - - self.worker_queue = None - self.logger.warn('Worker stopped, %s tasks handled'%self.tasks_counter) - diff --git a/tasks_queue/worker_manager.py b/tasks_queue/worker_manager.py deleted file mode 100644 index 896c1ee..0000000 --- a/tasks_queue/worker_manager.py +++ /dev/null @@ -1,26 +0,0 @@ -from .worker import Worker - -def start(): - - global worker_thread - worker_thread = Worker(logger_name='main') - -def put_task(task): - - return worker_thread.put_task_on_queue(task) - -def stop(): - - return worker_thread.stop_thread() - -def ping(): - - return worker_thread.ping() - -def waiting(): - - return worker_thread.status_waiting() - -def hanled(): - - return worker_thread.status_handled() \ No newline at end of file diff --git a/test_app/README.md b/test_app/README.md new file mode 100644 index 0000000..734ccc1 --- /dev/null +++ b/test_app/README.md @@ -0,0 +1,2 @@ +# Test App +Minimal django app that depend on the django_leek module diff --git a/test_app/__init__.py b/test_app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test_app/app.py b/test_app/app.py new file mode 100644 
index 0000000..c5e1b5c --- /dev/null +++ b/test_app/app.py @@ -0,0 +1,80 @@ +import time +import json + +from django.conf.urls import url +from django.forms.models import model_to_dict +from django.core.serializers.json import DjangoJSONEncoder +from django.http import HttpResponse +from django.shortcuts import render +from django.db import models + +from django_leek.api import Leek, push_task_to_queue, query_task + +leek = Leek() + + +class Person(models.Model): + name = models.CharField(max_length=30) + + +@leek.task +def fail(): + raise ValueError('ops') # must actually raise so the failed-task path is exercised + + +@leek.task +def hello(to): + person = Person.objects.create(name="to") + person.save() + + print('Hello {}!'.format(to)) + return 'ok' + + +@leek.task +def slow(seconds: int): + person = Person.objects.create(name="to") + person.save() + print('sleeping') + time.sleep(seconds) + print('ok') + return 'ok' + + +def index(request): + if 'queue' in request.GET: + # Run sync + #hello(to='sync') + + # Run async + hello.offload(to='kwargs') + fail.offload() + r = slow.offload(seconds=5) + + push_task_to_queue(hello, to='old') + return render(request, 'index.html', { + 'message': '✓ task queued', + 'task_id': r['task_id'] + }) + + return render(request, 'index.html', {'task_id': None}) + + +def query(request, task_id): + task = query_task(task_id) + data = { + 'queued_at': task.queued_at, + 'started_at': task.started_at, + 'finished_at': task.finished_at, + 'exception': str(task.exception) if task.exception else None, + 'return_value': task.return_value + } + return HttpResponse( + json.dumps(data, cls=DjangoJSONEncoder), + content_type='application/json') + + +urlpatterns = [ + url(r'^$', index), + url(r'^query/(\d+)/?$', query) +] diff --git a/test_app/migrations/0001_initial.py b/test_app/migrations/0001_initial.py new file mode 100644 index 0000000..7b37bfe --- /dev/null +++ b/test_app/migrations/0001_initial.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11 on 2018-05-08 16:46 +from __future__ 
import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='Person', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(max_length=30)), + ], + ), + ] diff --git a/test_app/migrations/__init__.py b/test_app/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test_app/settings.py b/test_app/settings.py new file mode 100644 index 0000000..f3176aa --- /dev/null +++ b/test_app/settings.py @@ -0,0 +1,25 @@ +import os +ALLOWED_HOSTS = ['localhost', '127.0.0.1'] +SECRET_KEY = "not so secret" + +DEBUG=True + +INSTALLED_APPS = [ + 'test_app', + 'django_leek' +] + +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': 'db.sqlite3', + } +} + +ROOT_URLCONF = 'test_app.app' + +PROJECT_PATH = os.path.realpath(os.path.dirname(__file__)) +TEMPLATES = [{ + 'BACKEND': 'django.template.backends.django.DjangoTemplates', + 'DIRS': [ os.path.join(PROJECT_PATH, 'templates/')]} +] diff --git a/test_app/templates/index.html b/test_app/templates/index.html new file mode 100644 index 0000000..77f70bc --- /dev/null +++ b/test_app/templates/index.html @@ -0,0 +1,49 @@ + + + +

{{ message }}

+
+

+ + +
+ + + \ No newline at end of file