From d6e85ec5e324b7f24fd83d4b29b1a5ed7433138f Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Fri, 11 Apr 2025 14:09:28 +0530 Subject: [PATCH 01/11] remove queued function from queue --- modelq/app/base.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/modelq/app/base.py b/modelq/app/base.py index 6da702e..8176668 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -726,3 +726,23 @@ def _post_error_to_webhook_sync(self, content_str: str): ) except Exception as e2: logger.error(f"Exception while sending error to webhook: {e2}") + + def remove_task_from_queue(self, task_id: str) -> bool: + """ + Removes a task from the 'ml_tasks' queue using its task_id. + Returns True if the task was found and removed, False otherwise. + """ + tasks = self.redis_client.lrange("ml_tasks", 0, -1) + removed = False + for task_json in tasks: + try: + task_dict = json.loads(task_json) + if task_dict.get("task_id") == task_id: + self.redis_client.lrem("ml_tasks", 1, task_json) + self.redis_client.zrem("queued_requests", task_id) + removed = True + logger.info(f"Removed task {task_id} from queue.") + break + except Exception as e: + logger.error(f"Failed to process task while trying to remove: {e}") + return removed \ No newline at end of file From f6bd822c4cac0e1aa42eba3285e50fc4474b5468 Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Fri, 11 Apr 2025 22:31:27 +0530 Subject: [PATCH 02/11] added cron_task decrator and feature --- README.md | 34 ++++++++++++++++++++++--- examples/fastapi/tasks.py | 3 +++ examples/text-streaming-server/tasks.py | 6 ++++- modelq/app/base.py | 34 +++++++++++++++++++++++-- 4 files changed, 71 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index de674ef..e187a7f 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ ModelQ is developed and maintained by the team at [Modelslab](https://modelslab. 
> - Audio generation > - And much more -## 🚀 Features +## ✨ Features - ✅ Retry support (automatic and manual) - ⏱ Timeout handling for long-running tasks @@ -29,7 +29,7 @@ ModelQ is developed and maintained by the team at [Modelslab](https://modelslab. --- -## 🛆 Installation +## 🗆 Installation ```bash pip install modelq @@ -72,6 +72,34 @@ print(task.get_result(q.redis_client)) --- +## ⏰ Cron Task Scheduling (NEW) + +ModelQ now supports periodic background tasks using the `@cron_task(interval_seconds=...)` decorator. + +Use this to run tasks at regular intervals—great for polling, periodic cleanups, or scheduled retraining! + +```python +from modelq import ModelQ +from redis import Redis +import time + +db = Redis(host="localhost", port=6379, db=0) +q = ModelQ(redis_client=db) + +@q.cron_task(interval_seconds=10) +def say_hello(): + print("Hello from cron task!") + +q.start_workers() + +while True: + time.sleep(1) +``` + +🧠 ModelQ runs these cron tasks in a background thread, using in-memory scheduling and Redis to persist the last execution timestamp—without polling Redis constantly. 
+ +--- + ## ⚙️ Middleware Support ModelQ allows you to plug in custom middleware to hook into events: @@ -106,7 +134,7 @@ q.middleware = LoggingMiddleware() --- -## 🛠 Configuration +## 🛠️ Configuration Connect to Redis using custom config: diff --git a/examples/fastapi/tasks.py b/examples/fastapi/tasks.py index 919ca13..b51c463 100644 --- a/examples/fastapi/tasks.py +++ b/examples/fastapi/tasks.py @@ -80,5 +80,8 @@ def add_task(): def image_task(): return Image.open("lmao.png") +@modelq.cron_task(interval=10) +def cron_task(): + print("Cron task executed") modelq.start_workers() diff --git a/examples/text-streaming-server/tasks.py b/examples/text-streaming-server/tasks.py index 5dd50b1..65b46ff 100644 --- a/examples/text-streaming-server/tasks.py +++ b/examples/text-streaming-server/tasks.py @@ -43,4 +43,8 @@ def stream(params): thread.start() for new_text in streamer: - yield new_text \ No newline at end of file + yield new_text + +@modelq_app.cron_task(interval_seconds=10) +def cron_task(): + print("Cron task running...") \ No newline at end of file diff --git a/modelq/app/base.py b/modelq/app/base.py index 8176668..6650ab7 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -73,7 +73,11 @@ def __init__( self.delay_seconds = delay_seconds # Register this server in Redis (with an initial heartbeat) + self._cron_registry = {} # function_name: (callable, interval_seconds) self.register_server() + cron_thread = threading.Thread(target=self._cron_task_loop, daemon=True) + cron_thread.start() + self.worker_threads.append(cron_thread) def _connect_to_redis( self, @@ -726,7 +730,7 @@ def _post_error_to_webhook_sync(self, content_str: str): ) except Exception as e2: logger.error(f"Exception while sending error to webhook: {e2}") - + def remove_task_from_queue(self, task_id: str) -> bool: """ Removes a task from the 'ml_tasks' queue using its task_id. 
@@ -745,4 +749,30 @@ def remove_task_from_queue(self, task_id: str) -> bool: break except Exception as e: logger.error(f"Failed to process task while trying to remove: {e}") - return removed \ No newline at end of file + return removed + + def cron_task(self, interval_seconds: int): + def decorator(func): + self._cron_registry[func.__name__] = { + "func": func, + "interval": interval_seconds, + "last_run_key": f"cron_task:{func.__name__}:last_run" + } + self.redis_client.setnx(self._cron_registry[func.__name__]["last_run_key"], 0) + logger.info(f"Registered cron task: {func.__name__} to run every {interval_seconds} seconds") + return func + return decorator + + def _cron_task_loop(self): + while True: + now = time.time() + for name, meta in self._cron_registry.items(): + try: + last_run = float(self.redis_client.get(meta["last_run_key"]) or 0) + if now - last_run >= meta["interval"]: + logger.info(f"Running scheduled cron task: {name}") + threading.Thread(target=meta["func"], daemon=True).start() + self.redis_client.set(meta["last_run_key"], str(now)) + except Exception as e: + logger.error(f"Error running cron task {name}: {e}") + time.sleep(1) \ No newline at end of file From 62e37f7abc23a01e83cf588fd51426750d8a129f Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Thu, 29 May 2025 15:20:34 +0530 Subject: [PATCH 03/11] cli added --- README.md | 76 ++++++++++------- examples/fastapi/tasks.py | 109 ++++++++++++------------ modelq/app/base.py | 31 +------ modelq/app/cli/__init__.py | 0 modelq/app/cli/main.py | 163 ++++++++++++++++++++++++++++++++++++ poetry.lock | 167 +++++++++++++++++++++++++++++++++++++ pyproject.toml | 3 +- 7 files changed, 437 insertions(+), 112 deletions(-) create mode 100644 modelq/app/cli/__init__.py create mode 100644 modelq/app/cli/main.py create mode 100644 poetry.lock diff --git a/README.md b/README.md index e187a7f..db13674 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ ModelQ is developed and maintained by the team at 
[Modelslab](https://modelslab. > - Audio generation > - And much more +--- + ## ✨ Features - ✅ Retry support (automatic and manual) @@ -26,6 +28,7 @@ ModelQ is developed and maintained by the team at [Modelslab](https://modelslab. - ⚡ Fast, non-blocking concurrency using threads - 🧵 Built-in decorators to register tasks quickly - 💃 Redis-based task queueing +- 🖥️ CLI interface for orchestration --- @@ -37,6 +40,50 @@ pip install modelq --- +## 🖥️ CLI Usage + +You can interact with ModelQ using the `modelq` command-line tool. All commands require an `--app-path` parameter to locate your ModelQ instance in `module:object` format. + +### Start Workers +```bash +modelq run-workers main:modelq_app --workers 2 +``` +Start background worker threads for executing tasks. + +### Check Queue Status +```bash +modelq status --app-path main:modelq_app +``` +Show number of servers, queued tasks, and registered task types. + +### List Queued Tasks +```bash +modelq list-queued --app-path main:modelq_app +``` +Display a list of all currently queued task IDs and their names. + +### Clear the Queue +```bash +modelq clear-queue --app-path main:modelq_app +``` +Remove all tasks from the queue. + +### Remove a Specific Task +```bash +modelq remove-task --app-path main:modelq_app --task-id +``` +Remove a specific task from the queue by ID. + +### Version +```bash +modelq version +``` +Print the current version of ModelQ CLI. + +More commands like `requeue-stuck`, `prune-results`, and `get-task-status` are coming soon. + +--- + ## 🧠 Basic Usage ```python @@ -72,34 +119,6 @@ print(task.get_result(q.redis_client)) --- -## ⏰ Cron Task Scheduling (NEW) - -ModelQ now supports periodic background tasks using the `@cron_task(interval_seconds=...)` decorator. - -Use this to run tasks at regular intervals—great for polling, periodic cleanups, or scheduled retraining! 
- -```python -from modelq import ModelQ -from redis import Redis -import time - -db = Redis(host="localhost", port=6379, db=0) -q = ModelQ(redis_client=db) - -@q.cron_task(interval_seconds=10) -def say_hello(): - print("Hello from cron task!") - -q.start_workers() - -while True: - time.sleep(1) -``` - -🧠 ModelQ runs these cron tasks in a background thread, using in-memory scheduling and Redis to persist the last execution timestamp—without polling Redis constantly. - ---- - ## ⚙️ Middleware Support ModelQ allows you to plug in custom middleware to hook into events: @@ -160,4 +179,3 @@ ModelQ is released under the MIT License. ## 🤝 Contributing We welcome contributions! Open an issue or submit a PR at [github.com/modelslab/modelq](https://github.com/modelslab/modelq). - diff --git a/examples/fastapi/tasks.py b/examples/fastapi/tasks.py index b51c463..d24115d 100644 --- a/examples/fastapi/tasks.py +++ b/examples/fastapi/tasks.py @@ -2,19 +2,17 @@ from threading import Thread from modelq import ModelQ from modelq.app.middleware import Middleware -from PIL import Image +# from PIL import Image import time -from TTS.tts.configs.xtts_config import XttsConfig -from TTS.tts.models.xtts import Xtts import os -import torch -import numpy as np +# import torch +# import numpy as np from redis import Redis -import base64 +# import base64 imagine_db = Redis(host="localhost", port=6379, db=0) -modelq = ModelQ(redis_client = imagine_db) +modelq_app = ModelQ(redis_client = imagine_db) class CurrentModel: def __init__(self): @@ -26,62 +24,69 @@ def load_model(self): model_path = "/workspace/XTTS-v2" model_name = "tts_models/multilingual/multi-dataset/xtts_v2" - - self.config = XttsConfig() - self.config.load_json(os.path.join(model_path, "config.json")) - self.model = Xtts.init_from_config(self.config) - self.model.load_checkpoint(self.config, checkpoint_dir=model_path, eval=True) - self.model.to(device) + self.model = model_path + print(f"Loading model from {model_path}...") 
CURRENT_MODEL = CurrentModel() class BeforeWorker(Middleware): def before_worker_boot(self): + print("Loading model...") CURRENT_MODEL.load_model() -modelq.middleware = BeforeWorker() - - -def wav_postprocess(wav): - """Post process the output waveform""" - if isinstance(wav, list): - wav = torch.cat(wav, dim=0) - wav = wav.clone().detach().cpu().numpy() - wav = np.clip(wav, -1, 1) - wav = (wav * 32767).astype(np.int16) - return wav - -@modelq.task(timeout=15, stream=True) -def stream(params): - time.sleep(10) - gpt_cond_latent, speaker_embedding = CURRENT_MODEL.model.get_conditioning_latents(audio_path=["/workspace/XTTS-v2/samples/en_sample.wav"]) - streamer = CURRENT_MODEL.model.inference_stream( - params, - "en", - gpt_cond_latent, - speaker_embedding, - stream_chunk_size=150, - enable_text_splitting=True, - ) - for chunk in streamer: - processed_chunk = wav_postprocess(chunk) - processed_bytes = processed_chunk.tobytes() - base64_chunk = base64.b64encode(processed_bytes).decode("utf-8") - yield base64_chunk - - -@modelq.task() +modelq_app.middleware = BeforeWorker() + + +# def wav_postprocess(wav): +# """Post process the output waveform""" +# if isinstance(wav, list): +# wav = torch.cat(wav, dim=0) +# wav = wav.clone().detach().cpu().numpy() +# wav = np.clip(wav, -1, 1) +# wav = (wav * 32767).astype(np.int16) +# return wav + +# @modelq.task(timeout=15, stream=True) +# def stream(params): +# time.sleep(10) +# gpt_cond_latent, speaker_embedding = CURRENT_MODEL.model.get_conditioning_latents(audio_path=["/workspace/XTTS-v2/samples/en_sample.wav"]) +# streamer = CURRENT_MODEL.model.inference_stream( +# params, +# "en", +# gpt_cond_latent, +# speaker_embedding, +# stream_chunk_size=150, +# enable_text_splitting=True, +# ) +# for chunk in streamer: +# processed_chunk = wav_postprocess(chunk) +# processed_bytes = processed_chunk.tobytes() +# base64_chunk = base64.b64encode(processed_bytes).decode("utf-8") +# yield base64_chunk + + +@modelq_app.task() def add_task(): 
time.sleep(20) return 2 + 3 -@modelq.task(timeout=15) -def image_task(): - return Image.open("lmao.png") +# @modelq.task(timeout=15) +# def image_task(): +# return Image.open("lmao.png") + +# @modelq_app.cron_task(interval_seconds=10) +# def cron_task(): +# print(CURRENT_MODEL.model) +# print("Cron task executed") + -@modelq.cron_task(interval=10) -def cron_task(): - print("Cron task executed") +# if __name__ == "__main__": +# modelq_app.start_workers() -modelq.start_workers() +# # Keep the worker running indefinitely +# try: +# while True: +# time.sleep(1) +# except KeyboardInterrupt: +# print("\nGracefully shutting down...") \ No newline at end of file diff --git a/modelq/app/base.py b/modelq/app/base.py index 6650ab7..4bccebd 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -73,11 +73,7 @@ def __init__( self.delay_seconds = delay_seconds # Register this server in Redis (with an initial heartbeat) - self._cron_registry = {} # function_name: (callable, interval_seconds) self.register_server() - cron_thread = threading.Thread(target=self._cron_task_loop, daemon=True) - cron_thread.start() - self.worker_threads.append(cron_thread) def _connect_to_redis( self, @@ -750,29 +746,4 @@ def remove_task_from_queue(self, task_id: str) -> bool: except Exception as e: logger.error(f"Failed to process task while trying to remove: {e}") return removed - - def cron_task(self, interval_seconds: int): - def decorator(func): - self._cron_registry[func.__name__] = { - "func": func, - "interval": interval_seconds, - "last_run_key": f"cron_task:{func.__name__}:last_run" - } - self.redis_client.setnx(self._cron_registry[func.__name__]["last_run_key"], 0) - logger.info(f"Registered cron task: {func.__name__} to run every {interval_seconds} seconds") - return func - return decorator - - def _cron_task_loop(self): - while True: - now = time.time() - for name, meta in self._cron_registry.items(): - try: - last_run = float(self.redis_client.get(meta["last_run_key"]) or 0) - if now 
- last_run >= meta["interval"]: - logger.info(f"Running scheduled cron task: {name}") - threading.Thread(target=meta["func"], daemon=True).start() - self.redis_client.set(meta["last_run_key"], str(now)) - except Exception as e: - logger.error(f"Error running cron task {name}: {e}") - time.sleep(1) \ No newline at end of file + \ No newline at end of file diff --git a/modelq/app/cli/__init__.py b/modelq/app/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modelq/app/cli/main.py b/modelq/app/cli/main.py new file mode 100644 index 0000000..1d29b29 --- /dev/null +++ b/modelq/app/cli/main.py @@ -0,0 +1,163 @@ +import typer +import importlib +import os +import sys +import logging +import time +import signal +import threading +from typing import Optional + +app = typer.Typer(help="ModelQ CLI for managing and queuing tasks.") + +# Global variable to handle graceful shutdown +shutdown_event = threading.Event() + +def setup_logging(log_level: str = "INFO"): + numeric_level = getattr(logging, log_level.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError(f'Invalid log level: {log_level}') + + logging.basicConfig( + level=numeric_level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler("modelq.log") + ] + ) + +def signal_handler(signum, frame): + print(f"\n🛑 Received signal {signum}. 
Shutting down gracefully...") + shutdown_event.set() + +def load_app_instance(app_path: str): + if ":" not in app_path: + typer.echo("❌ Format should be module:object (e.g., 'myapp:modelq_instance')") + raise typer.Exit(1) + + module_name, var_name = app_path.split(":", 1) + + sys.path.insert(0, os.getcwd()) + + try: + mod = importlib.import_module(module_name) + app_instance = getattr(mod, var_name) + if not hasattr(app_instance, 'start_workers'): + typer.echo(f"❌ {var_name} is not a valid ModelQ instance") + raise typer.Exit(1) + return app_instance + except Exception as e: + typer.echo(f"❌ Failed to load app instance: {e}") + raise typer.Exit(1) + +@app.command() +def version(): + typer.echo("ModelQ v0.1.0") + +@app.command() +def run_workers( + app_path: str, + workers: int = typer.Option(1, "--workers", "-w", help="Number of worker threads"), + log_level: str = typer.Option("INFO", "--log-level", "-l", help="Logging level"), + log_file: Optional[str] = typer.Option(None, "--log-file", "-f", help="Log file path") +): + setup_logging(log_level) + logger = logging.getLogger("modelq.cli") + + app_instance = load_app_instance(app_path) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + typer.echo(f"🚀 Starting ModelQ workers...") + typer.echo(f" Workers: {workers}") + typer.echo(f" Log Level: {log_level}") + typer.echo(f" Redis Host: {getattr(app_instance.redis_client.connection_pool, 'connection_kwargs', {}).get('host', 'unknown')}") + typer.echo(f" Registered Tasks: {', '.join(app_instance.allowed_tasks) if app_instance.allowed_tasks else 'None'}") + typer.echo(" Press Ctrl+C to stop") + typer.echo("-" * 50) + + try: + logger.info(f"Starting {workers} worker(s)") + app_instance.start_workers(no_of_workers=workers) + typer.echo("✅ Workers are running. 
Waiting for tasks...") + while not shutdown_event.is_set(): + time.sleep(1) + except KeyboardInterrupt: + logger.info("Received keyboard interrupt") + except Exception as e: + logger.error(f"Error running workers: {e}") + typer.echo(f"❌ Error: {e}") + raise typer.Exit(1) + finally: + logger.info("Shutting down workers...") + typer.echo("🛑 Shutting down workers...") + typer.echo("✅ Shutdown complete") + +@app.command() +def status(app_path: str): + app_instance = load_app_instance(app_path) + + try: + servers = app_instance.get_registered_server_ids() + queued_tasks = app_instance.get_all_queued_tasks() + + typer.echo("📊 ModelQ Status:") + typer.echo(f" Registered Servers: {len(servers)}") + typer.echo(f" Queued Tasks: {len(queued_tasks)}") + typer.echo(f" Allowed Tasks: {', '.join(app_instance.allowed_tasks) if app_instance.allowed_tasks else 'None'}") + + if servers: + typer.echo("\n🖥️ Active Servers:") + for server_id in servers: + typer.echo(f" - {server_id}") + + except Exception as e: + typer.echo(f"❌ Failed to get status: {e}") + raise typer.Exit(1) + +@app.command() +def clear_queue(app_path: str): + """Clear all tasks from the ML queue.""" + app_instance = load_app_instance(app_path) + try: + app_instance.delete_queue() + typer.echo("🗑️ Cleared all tasks from the queue.") + except Exception as e: + typer.echo(f"❌ Failed to clear queue: {e}") + raise typer.Exit(1) + +@app.command() +def remove_task(app_path: str, task_id: str): + """Remove a task from the queue by task ID.""" + app_instance = load_app_instance(app_path) + try: + removed = app_instance.remove_task_from_queue(task_id) + if removed: + typer.echo(f"🗑️ Task {task_id} removed from queue.") + else: + typer.echo(f"⚠️ Task {task_id} not found in queue.") + except Exception as e: + typer.echo(f"❌ Failed to remove task: {e}") + raise typer.Exit(1) + +@app.command() +def list_queued(app_path: str): + """List all currently queued tasks.""" + app_instance = load_app_instance(app_path) + try: + tasks = 
app_instance.get_all_queued_tasks() + if not tasks: + typer.echo("📭 No tasks in queue.") + return + + typer.echo(f"📋 Queued Tasks ({len(tasks)}):") + for t in tasks: + typer.echo(f" - {t['task_id']} ({t['task_name']})") + except Exception as e: + typer.echo(f"❌ Failed to list queued tasks: {e}") + raise typer.Exit(1) + +if __name__ == "__main__": + app() diff --git a/poetry.lock b/poetry.lock new file mode 100644 index 0000000..e88a909 --- /dev/null +++ b/poetry.lock @@ -0,0 +1,167 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "async-timeout" +version = "5.0.1" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, + {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, +] + +[[package]] +name = "click" +version = "8.1.8" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, + {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." 
+optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[[package]] +name = "markdown-it-py" +version = "3.0.0" +description = "Python port of markdown-it. Markdown parsing, done right!" +optional = false +python-versions = ">=3.8" +files = [ + {file = "markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb"}, + {file = "markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1"}, +] + +[package.dependencies] +mdurl = ">=0.1,<1.0" + +[package.extras] +benchmarking = ["psutil", "pytest", "pytest-benchmark"] +code-style = ["pre-commit (>=3.0,<4.0)"] +compare = ["commonmark (>=0.9,<1.0)", "markdown (>=3.4,<4.0)", "mistletoe (>=1.0,<2.0)", "mistune (>=2.0,<3.0)", "panflute (>=2.3,<3.0)"] +linkify = ["linkify-it-py (>=1,<3)"] +plugins = ["mdit-py-plugins"] +profiling = ["gprof2dot"] +rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"] +testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"] + +[[package]] +name = "mdurl" +version = "0.1.2" +description = "Markdown URL utilities" +optional = false +python-versions = ">=3.7" +files = [ + {file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"}, + {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"}, +] + +[[package]] +name = "pygments" +version = "2.19.1" +description = "Pygments is a syntax highlighting package written in Python." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c"}, + {file = "pygments-2.19.1.tar.gz", hash = "sha256:61c16d2a8576dc0649d9f39e089b5f02bcd27fba10d8fb4dcc28173f7a45151f"}, +] + +[package.extras] +windows-terminal = ["colorama (>=0.4.6)"] + +[[package]] +name = "redis" +version = "4.6.0" +description = "Python client for Redis database and key-value store" +optional = false +python-versions = ">=3.7" +files = [ + {file = "redis-4.6.0-py3-none-any.whl", hash = "sha256:e2b03db868160ee4591de3cb90d40ebb50a90dd302138775937f6a42b7ed183c"}, + {file = "redis-4.6.0.tar.gz", hash = "sha256:585dc516b9eb042a619ef0a39c3d7d55fe81bdb4df09a52c9cdde0d07bf1aa7d"}, +] + +[package.dependencies] +async-timeout = {version = ">=4.0.2", markers = "python_full_version <= \"3.11.2\""} + +[package.extras] +hiredis = ["hiredis (>=1.0.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"] + +[[package]] +name = "rich" +version = "14.0.0" +description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal" +optional = false +python-versions = ">=3.8.0" +files = [ + {file = "rich-14.0.0-py3-none-any.whl", hash = "sha256:1c9491e1951aac09caffd42f448ee3d04e58923ffe14993f6e83068dc395d7e0"}, + {file = "rich-14.0.0.tar.gz", hash = "sha256:82f1bc23a6a21ebca4ae0c45af9bdbc492ed20231dcb63f297d6d1021a9d5725"}, +] + +[package.dependencies] +markdown-it-py = ">=2.2.0" +pygments = ">=2.13.0,<3.0.0" +typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.11\""} + +[package.extras] +jupyter = ["ipywidgets (>=7.5.1,<9)"] + +[[package]] +name = "shellingham" +version = "1.5.4" +description = "Tool to Detect Surrounding Shell" +optional = false +python-versions = ">=3.7" +files = [ + {file = "shellingham-1.5.4-py2.py3-none-any.whl", hash = 
"sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686"}, + {file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"}, +] + +[[package]] +name = "typer" +version = "0.16.0" +description = "Typer, build great CLIs. Easy to code. Based on Python type hints." +optional = false +python-versions = ">=3.7" +files = [ + {file = "typer-0.16.0-py3-none-any.whl", hash = "sha256:1f79bed11d4d02d4310e3c1b7ba594183bcedb0ac73b27a9e5f28f6fb5b98855"}, + {file = "typer-0.16.0.tar.gz", hash = "sha256:af377ffaee1dbe37ae9440cb4e8f11686ea5ce4e9bae01b84ae7c63b87f1dd3b"}, +] + +[package.dependencies] +click = ">=8.0.0" +rich = ">=10.11.0" +shellingham = ">=1.3.0" +typing-extensions = ">=3.7.4.3" + +[[package]] +name = "typing-extensions" +version = "4.13.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +files = [ + {file = "typing_extensions-4.13.2-py3-none-any.whl", hash = "sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c"}, + {file = "typing_extensions-4.13.2.tar.gz", hash = "sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef"}, +] + +[metadata] +lock-version = "2.0" +python-versions = "^3.9" +content-hash = "881b3e95c9ae6bb03b6d054b62eed492359c8c426b476d31dae59e7aa8687387" diff --git a/pyproject.toml b/pyproject.toml index 5353b95..41f91a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,12 +6,13 @@ authors = ["Tanmaypatil123 "] readme = "README.md" [tool.poetry.scripts] -modelq = "modelq.app:run_worker" +modelq = "modelq.app.cli.main:app" [tool.poetry.dependencies] python = "^3.9" click = "^8.0.0" redis = "^4.0.0" +typer = "^0.16.0" [build-system] requires = ["poetry-core"] From cb8bb51b03218cb820d41863f893ccfa37fca2e9 Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Fri, 30 May 2025 13:32:04 +0530 Subject: [PATCH 04/11] pydantic support added --- modelq/app/base.py | 119 
++++++++++++++++++++++++++++++++------------- test.py | 33 +++++++++++++ 2 files changed, 117 insertions(+), 35 deletions(-) create mode 100644 test.py diff --git a/modelq/app/base.py b/modelq/app/base.py index 4bccebd..91c4964 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -14,6 +14,9 @@ from modelq.app.tasks import Task from modelq.exceptions import TaskProcessingError, TaskTimeoutError,RetryTaskException from modelq.app.middleware import Middleware + +from pydantic import BaseModel, ValidationError +from typing import Optional, Dict, Any, Type import os logging.basicConfig( @@ -342,42 +345,55 @@ def task( timeout: Optional[int] = None, stream: bool = False, retries: int = 0, + schema: Optional[Type[BaseModel]] = None, # ▶ pydantic + returns: Optional[Type[BaseModel]] = None, # ▶ pydantic ): - """ - Decorator to register a function as a task. - We create a Task object, set created_at + queued_at, then enqueue. - """ def decorator(func): + # make the schema classes discoverable at run time + func._mq_schema = schema # ▶ pydantic + func._mq_returns = returns # ▶ pydantic + @functools.wraps(func) def wrapper(*args, **kwargs): - task_name = func.__name__ + # --------------------------- PRODUCER-SIDE VALIDATION + if schema is not None: # ▶ pydantic + try: + # allow either a ready-made model instance + # or raw kwargs/args that build one + if len(args) == 1 and isinstance(args[0], schema): + validated = args[0] + else: + validated = schema(*args, **kwargs) + except ValidationError as ve: + raise TaskProcessingError( + func.__name__, f"Input validation failed – {ve}" + ) + payload_data = validated.model_dump(mode="json") # zero-copy + args, kwargs = (), {} # we’ll carry payload in kwargs only + else: + payload_data = {"args": args, "kwargs": kwargs} + payload = { - "args": args, - "kwargs": kwargs, + "data": payload_data, # ▶ pydantic – typed or raw "timeout": timeout, "stream": stream, "retries": retries, } - # Create the Task object - task = 
task_class(task_name=task_name, payload=payload) + + task = task_class(task_name=func.__name__, payload=payload) if stream: task.stream = True - # Convert to dict task_dict = task.to_dict() - - # Record creation and queue time now_ts = time.time() task_dict["created_at"] = now_ts - task_dict["queued_at"] = now_ts + task_dict["queued_at"] = now_ts - # Enqueue the task self.enqueue_task(task_dict, payload=payload) - - # Keep a record of the task in Redis - self.redis_client.set(f"task:{task.task_id}", json.dumps(task_dict),ex=86400) + self.redis_client.set(f"task:{task.task_id}", + json.dumps(task_dict), + ex=86400) return task - setattr(self, func.__name__, func) self.allowed_tasks.add(func.__name__) self.register_server() @@ -548,52 +564,84 @@ def process_task(self, task: Task) -> None: logger.error(f"Task {task.task_name} failed - function not found.") raise TaskProcessingError(task.task_name, "Task function not found") - logger.info( - f"Processing task: {task.task_name} " - f"with args: {task.payload.get('args', [])}, " - f"kwargs: {task.payload.get('kwargs', {})}" - ) + # ---- New: Check for Pydantic schema + schema_cls = getattr(task_function, "_mq_schema", None) + return_cls = getattr(task_function, "_mq_returns", None) + + # ---- Prepare args/kwargs based on schema + if schema_cls is not None: + try: + # Accept either dict or JSON-serialized dict + payload_data = task.payload["data"] + if isinstance(payload_data, str): + import json + payload_data = json.loads(payload_data) + validated_in = schema_cls(**payload_data) + except Exception as ve: + task.status = "failed" + task.result = f"Input validation failed – {ve}" + self._store_final_task_state(task, success=False) + logger.error(f"[ModelQ] Input validation failed: {ve}") + raise TaskProcessingError(task.task_name, f"Input validation failed: {ve}") + call_args = (validated_in,) + call_kwargs = {} + else: + # Legacy: no schema + call_args = tuple(task.payload.get("args", ())) + call_kwargs = 
dict(task.payload.get("kwargs", {})) timeout = task.payload.get("timeout", None) stream = task.payload.get("stream", False) + logger.info( + f"Processing task: {task.task_name} " + f"with args: {call_args}, kwargs: {call_kwargs}" + ) + if stream: # Stream results - for result in task_function( - *task.payload["args"], - **task.payload["kwargs"], - ): + for result in task_function(*call_args, **call_kwargs): + task.status = "in_progress" self.redis_client.xadd( f"task_stream:{task.task_id}", - {"result": json.dumps(result)} + {"result": json.dumps(result, default=str)} ) # Once streaming is done task.status = "completed" - self.redis_client.expire(f"task_stream:{task.task_id}", 3600) # Expires in 1 hour - # Mark finished_at in the final store + self.redis_client.expire(f"task_stream:{task.task_id}", 3600) self._store_final_task_state(task, success=True) - else: # Standard execution with optional timeout if timeout: result = self._run_with_timeout( task_function, timeout, - *task.payload["args"], - **task.payload["kwargs"] + *call_args, **call_kwargs ) else: result = task_function( - *task.payload["args"], - **task.payload["kwargs"] + *call_args, **call_kwargs ) + # ---- New: Output validation for standard result + if return_cls is not None: + try: + if not isinstance(result, return_cls): + result = return_cls(**(result if isinstance(result, dict) else result.__dict__)) + except Exception as ve: + task.status = "failed" + task.result = f"Output validation failed – {ve}" + self._store_final_task_state(task, success=False) + logger.error(f"[ModelQ] Output validation failed: {ve}") + raise TaskProcessingError(task.task_name, f"Output validation failed: {ve}") + result_str = task._convert_to_string(result) task.result = result_str task.status = "completed" self._store_final_task_state(task, success=True) logger.info(f"Task {task.task_name} completed successfully.") + except RetryTaskException as e: logger.warning(f"Task {task.task_name} requested retry: {e}") 
new_task_dict = task.to_dict() @@ -617,6 +665,7 @@ def process_task(self, task: Task) -> None: finally: self.redis_client.srem("processing_tasks", task.task_id) + def _store_final_task_state(self, task: Task, success: bool): """ Persists the final status/result of the task in Redis, adding finished_at. diff --git a/test.py b/test.py new file mode 100644 index 0000000..264a89a --- /dev/null +++ b/test.py @@ -0,0 +1,33 @@ +from pydantic import BaseModel, Field +from modelq import ModelQ +from redis import Redis + +class AddIn(BaseModel): + a: int = Field(ge=0) + b: int = Field(ge=0) + +class AddOut(BaseModel): + total: int + +redis_client = Redis(host="localhost", port=6379, db=0) +mq = ModelQ(redis_client = redis_client) + + +@mq.task(schema=AddIn, returns=AddOut, timeout=5) +def add(payload: AddIn) -> AddOut: + return AddOut(total=payload.a + payload.b) + +job = add(a=3, b=4) # ✨ validated on the spot + + +import time + +if __name__ == "__main__": + mq.start_workers() + + # Keep the worker running indefinitely + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nGracefully shutting down...") \ No newline at end of file From 8d75903b2075382ff2160016ae7fcd780d65099d Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Fri, 30 May 2025 14:19:10 +0530 Subject: [PATCH 05/11] bug fix with normal task queue --- modelq/app/base.py | 7 ++++--- test.py | 26 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/modelq/app/base.py b/modelq/app/base.py index 91c4964..918e567 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -587,9 +587,10 @@ def process_task(self, task: Task) -> None: call_kwargs = {} else: # Legacy: no schema - call_args = tuple(task.payload.get("args", ())) - call_kwargs = dict(task.payload.get("kwargs", {})) - + print(task.payload) + call_args = tuple(task.payload['data'].get("args", ())) + call_kwargs = dict(task.payload['data'].get("kwargs", {})) + print(f"call_args: {call_args}, 
call_kwargs: {call_kwargs}") timeout = task.payload.get("timeout", None) stream = task.payload.get("stream", False) diff --git a/test.py b/test.py index 264a89a..d6fd9ce 100644 --- a/test.py +++ b/test.py @@ -15,11 +15,27 @@ class AddOut(BaseModel): @mq.task(schema=AddIn, returns=AddOut, timeout=5) def add(payload: AddIn) -> AddOut: + print(f"Processing addition: {payload.a} + {payload.b}") + # time.sleep(10) # Simulate some processing time return AddOut(total=payload.a + payload.b) +@mq.task() +def sub(a: int, b: int): + print(f"Processing subtraction: {a} - {b}") + return a - b + +@mq.task() +def image_task(params: dict): + print(f"Processing image task with params: {params}") + # Simulate image processing + return "Image processed successfully" + job = add(a=3, b=4) # ✨ validated on the spot +job2 = sub(a=10, b=5) # ✨ no schema validation, just a simple task +task = image_task({"image": "example.png"}) # ✨ no schema validation, just a simple task +task2 = image_task(params={"image": "example.png"}) import time if __name__ == "__main__": @@ -28,6 +44,16 @@ def add(payload: AddIn) -> AddOut: # Keep the worker running indefinitely try: while True: + output = job.get_result(mq.redis_client) + + print(f"Result of addition: {output}") + print(type(output)) + + output2 = job2.get_result(mq.redis_client) + print(f"Result of subtraction: {output2}") + + output3 = task.get_result(mq.redis_client) + print(f"Result of image task: {output3}") time.sleep(1) except KeyboardInterrupt: print("\nGracefully shutting down...") \ No newline at end of file From 1725df6d114836a25e5b19a0ec59fa8f00b5d843 Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Fri, 30 May 2025 14:45:20 +0530 Subject: [PATCH 06/11] added output task serializer and modified to accept pydantic basemodel --- modelq/app/base.py | 11 ++++- modelq/app/tasks/base.py | 86 ++++++++++++++++++++++++++-------------- test.py | 3 +- 3 files changed, 67 insertions(+), 33 deletions(-) diff --git a/modelq/app/base.py 
b/modelq/app/base.py index 918e567..9904912 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -636,8 +636,15 @@ def process_task(self, task: Task) -> None: logger.error(f"[ModelQ] Output validation failed: {ve}") raise TaskProcessingError(task.task_name, f"Output validation failed: {ve}") - result_str = task._convert_to_string(result) - task.result = result_str + # When you set `task.result` (in process_task), use this logic: + if isinstance(result, BaseModel): + # Pydantic object: store as dict, not string! + task.result = result.model_dump(mode="json") + elif isinstance(result, (dict, list, int, float, bool)): + task.result = result + else: + task.result = str(result) + task.status = "completed" self._store_final_task_state(task, success=True) diff --git a/modelq/app/tasks/base.py b/modelq/app/tasks/base.py index be1012a..431ce23 100644 --- a/modelq/app/tasks/base.py +++ b/modelq/app/tasks/base.py @@ -8,6 +8,7 @@ from PIL import Image, PngImagePlugin import io import copy +from typing import Type class Task: def __init__(self, task_name: str, payload: dict, timeout: int = 15): @@ -109,43 +110,68 @@ def get_result(self, redis_client: redis.Redis, timeout: int = None) -> Any: # If we exit the loop, we timed out raise TaskTimeoutError(self.task_id) - def get_stream(self, redis_client: redis.Redis) -> Generator[Any, None, None]: + def get_result( + self, + redis_client: redis.Redis, + timeout: int = None, + returns: Optional[Type[Any]] = None, + modelq_ref: Any = None, + ) -> Any: """ - Generator to yield results from a streaming task. - Continuously reads from a Redis stream and stops when - the task is completed or failed. + Waits for the result of the task until the timeout. + Raises TaskProcessingError if the task failed, + or TaskTimeoutError if it never completes within the timeout. + Optionally validates/deserializes the result using a Pydantic model. 
""" - stream_key = f"task_stream:{self.task_id}" - last_id = "0" - completed = False - - while not completed: - # block=1000 => block for up to 1s, count=10 => max 10 messages - results = redis_client.xread({stream_key: last_id}, block=1000, count=10) - if results: - for _, messages in results: - for message_id, message_data in messages: - # print(message_data) - result = json.loads(message_data[b"result"].decode("utf-8")) - yield result - last_id = message_id - # Append to combined_result - self.combined_result += result - - # Check if the task is finished or failed + if not timeout: + timeout = self.timeout + + start_time = time.time() + while time.time() - start_time < timeout: task_json = redis_client.get(f"task_result:{self.task_id}") if task_json: task_data = json.loads(task_json) - if task_data.get("status") == "completed": - completed = True - # Update local fields - self.status = "completed" - self.result = self.combined_result - elif task_data.get("status") == "failed": - error_message = task_data.get("result", "Task failed without an error message") + self.result = task_data.get("result") + self.status = task_data.get("status") + + if self.status == "failed": + error_message = self.result or "Task failed without an error message" raise TaskProcessingError( task_data.get("task_name", self.task_name), error_message ) + elif self.status == "completed": + raw_result = self.result + + # Auto-detect returns schema if not given + if returns is None and modelq_ref is not None: + task_function = getattr(modelq_ref, self.task_name, None) + returns = getattr(task_function, "_mq_returns", None) + + if returns is not None: + try: + if isinstance(raw_result, str): + try: + result_data = json.loads(raw_result) + except Exception: + result_data = raw_result + else: + result_data = raw_result + + if isinstance(result_data, dict): + return returns(**result_data) + elif isinstance(result_data, returns): + return result_data + else: + return returns.parse_obj(result_data) + 
except Exception as ve: + raise TaskProcessingError( + self.task_name, + f"Result validation failed: {ve}" + ) + else: + return raw_result + + time.sleep(1) - return + raise TaskTimeoutError(self.task_id) diff --git a/test.py b/test.py index d6fd9ce..ca64780 100644 --- a/test.py +++ b/test.py @@ -44,10 +44,11 @@ def image_task(params: dict): # Keep the worker running indefinitely try: while True: - output = job.get_result(mq.redis_client) + output = job.get_result(mq.redis_client,returns=AddOut) print(f"Result of addition: {output}") print(type(output)) + print(f"Result of addition (total): {output.total}") output2 = job2.get_result(mq.redis_client) print(f"Result of subtraction: {output2}") From 1a3078d338b7220a2a7f416de417b89d235433c6 Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Fri, 30 May 2025 16:48:05 +0530 Subject: [PATCH 07/11] first working draft of modelq server --- modelq/app/api/server.py | 131 ++++++++++++++++++ modelq/app/base.py | 1 + modelq/app/cli/main.py | 13 ++ poetry.lock | 291 ++++++++++++++++++++++++++++++++++++++- pyproject.toml | 2 + test.py | 20 +-- 6 files changed, 447 insertions(+), 11 deletions(-) create mode 100644 modelq/app/api/server.py diff --git a/modelq/app/api/server.py b/modelq/app/api/server.py new file mode 100644 index 0000000..33bce02 --- /dev/null +++ b/modelq/app/api/server.py @@ -0,0 +1,131 @@ +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse +from pydantic import BaseModel +import uvicorn +import time +import json +import gc +from typing import Dict, Callable, Any + +# ------------------------------------------------------------ +# Helper – find every *wrapper* produced by @mq.task anywhere +# in the current Python process. +# A wrapper will have __wrapped__ (thanks to functools.wraps) +# and the *inner* function carries the _mq_schema attribute. 
+# ------------------------------------------------------------ + +def _discover_task_wrappers() -> Dict[str, Callable[..., Any]]: + wrappers: Dict[str, Callable[..., Any]] = {} + for obj in gc.get_objects(): + # Must be callable and look like a functools.wraps wrapper + if callable(obj) and hasattr(obj, "__wrapped__"): + inner = getattr(obj, "__wrapped__", None) + if inner and hasattr(inner, "_mq_schema"): + wrappers[inner.__name__] = obj # key = task name + return wrappers + +# ------------------------------------------------------------ +# Factory that builds a FastAPI app wired to ModelQ automatically +# ------------------------------------------------------------ + +def create_api_app(modelq_instance): + app = FastAPI(title="ModelQ Tasks API") + + # ---------- Health ---------- + @app.get("/healthz") + def healthz(): + return {"status": "ok"} + + @app.get("/status") + def status(): + servers = modelq_instance.get_registered_server_ids() + queued = modelq_instance.get_all_queued_tasks() + return { + "registered_servers": servers, + "queued_tasks_count": len(queued), + "allowed_tasks": list(modelq_instance.allowed_tasks), + } + + @app.get("/queue") + def queue(): + return {"queued_tasks": modelq_instance.get_all_queued_tasks()} + + # ---------- Task‑level helpers ---------- + @app.get("/task/{task_id}/status") + def get_task_status(task_id: str): + st = modelq_instance.get_task_status(task_id) + if st is None: + raise HTTPException(404, detail="Task not found") + return {"task_id": task_id, "status": st} + + @app.get("/task/{task_id}/result") + def get_task_result(task_id: str): + blob = modelq_instance.redis_client.get(f"task_result:{task_id}") + if not blob: + raise HTTPException(404, detail="Task not found or not completed yet") + return json.loads(blob) + + # -------------------------------------------------------- + # Dynamic endpoints: use wrapper if we discovered one. + # Fallback to ModelQ attribute (original) if wrapper missing. 
+ # -------------------------------------------------------- + wrapper_map = _discover_task_wrappers() + + for task_name in modelq_instance.allowed_tasks: + task_func = wrapper_map.get(task_name) or getattr(modelq_instance, task_name) + schema = getattr(task_func, "_mq_schema", None) or getattr(getattr(task_func, "__wrapped__", None), "_mq_schema", None) + returns = getattr(task_func, "_mq_returns", None) or getattr(getattr(task_func, "__wrapped__", None), "_mq_returns", None) + endpoint_path = f"/task/{task_name}" + + # Closure-bound defaults to avoid late‑binding bugs + def make_endpoint(_func=task_func, _schema=schema, _returns=returns, _tname=task_name): + async def endpoint(payload: _schema, request: Request): # type: ignore[valid-type] + job = None + try: + # Call wrapper → returns Task; call original → might return AddOut + job = _func(payload) + # Try quick result (3 s) + result = job.get_result( + modelq_instance.redis_client, + timeout=3, + returns=_returns, + modelq_ref=modelq_instance, + ) + if isinstance(result, BaseModel): + return JSONResponse(content=result.model_dump()) + elif isinstance(result, dict): + return JSONResponse(content=result) + else: + return JSONResponse(content={"result": result}) + except Exception as e: + if "timeout" in str(e).lower() or isinstance(e, TimeoutError): + return JSONResponse( + status_code=202, + content={ + "message": "Request is queued. 
Check status/result later.", + "task_id": getattr(job, "task_id", "unknown"), + "status": "queued", + }, + ) + raise HTTPException(400, detail=str(e)) + + return endpoint + + # Pydantic schema may be None → accept empty body + if schema is not None: + app.post(endpoint_path, response_model=returns or dict)(make_endpoint()) + else: + class AnyInput(BaseModel): + pass + app.post(endpoint_path, response_model=returns or dict)(make_endpoint(_schema=AnyInput)) + + return app + +# ------------------------------------------------------------ +# Entry helper used by Typer CLI +# ------------------------------------------------------------ + +def run_api(modelq_instance, host="0.0.0.0", port=8000): + """Spin up the auto‑wired FastAPI server.""" + app = create_api_app(modelq_instance) + uvicorn.run(app, host=host, port=port) \ No newline at end of file diff --git a/modelq/app/base.py b/modelq/app/base.py index 9904912..b05e02f 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -642,6 +642,7 @@ def process_task(self, task: Task) -> None: task.result = result.model_dump(mode="json") elif isinstance(result, (dict, list, int, float, bool)): task.result = result + # only images as base64 string else: task.result = str(result) diff --git a/modelq/app/cli/main.py b/modelq/app/cli/main.py index 1d29b29..a18989c 100644 --- a/modelq/app/cli/main.py +++ b/modelq/app/cli/main.py @@ -7,6 +7,7 @@ import signal import threading from typing import Optional +from modelq.app.api.server import run_api app = typer.Typer(help="ModelQ CLI for managing and queuing tasks.") @@ -159,5 +160,17 @@ def list_queued(app_path: str): typer.echo(f"❌ Failed to list queued tasks: {e}") raise typer.Exit(1) +@app.command("serve-api") +def serve_api_cmd( + app_path: str, + host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind the API server"), + port: int = typer.Option(8000, "--port", "-p", help="Port to serve the API"), + log_level: str = typer.Option("info", "--log-level", "-l", 
help="Uvicorn/FastAPI log level") +): + app_instance = load_app_instance(app_path) + typer.echo(f"🌐 Starting ModelQ API server on http://{host}:{port} ...") + typer.echo(f" Registered Tasks: {', '.join(app_instance.allowed_tasks) if app_instance.allowed_tasks else 'None'}") + run_api(app_instance, host=host, port=port) + if __name__ == "__main__": app() diff --git a/poetry.lock b/poetry.lock index e88a909..682898d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,38 @@ # This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +[[package]] +name = "annotated-types" +version = "0.7.0" +description = "Reusable constraint types to use with typing.Annotated" +optional = false +python-versions = ">=3.8" +files = [ + {file = "annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53"}, + {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, +] + +[[package]] +name = "anyio" +version = "4.9.0" +description = "High level compatibility layer for multiple asynchronous event loop implementations" +optional = false +python-versions = ">=3.9" +files = [ + {file = "anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c"}, + {file = "anyio-4.9.0.tar.gz", hash = "sha256:673c0c244e15788651a4ff38710fea9675823028a6f08a5eda409e0c9840a028"}, +] + +[package.dependencies] +exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""} +idna = ">=2.8" +sniffio = ">=1.1" +typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""} + +[package.extras] +doc = ["Sphinx (>=8.2,<9.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx_rtd_theme"] +test = ["anyio[trio]", "blockbuster (>=1.5.23)", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore 
(>=0.9.1)", "uvloop (>=0.21)"] +trio = ["trio (>=0.26.1)"] + [[package]] name = "async-timeout" version = "5.0.1" @@ -36,6 +69,68 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "exceptiongroup" +version = "1.3.0" +description = "Backport of PEP 654 (exception groups)" +optional = false +python-versions = ">=3.7" +files = [ + {file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"}, + {file = "exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88"}, +] + +[package.dependencies] +typing-extensions = {version = ">=4.6.0", markers = "python_version < \"3.13\""} + +[package.extras] +test = ["pytest (>=6)"] + +[[package]] +name = "fastapi" +version = "0.115.12" +description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" +optional = false +python-versions = ">=3.8" +files = [ + {file = "fastapi-0.115.12-py3-none-any.whl", hash = "sha256:e94613d6c05e27be7ffebdd6ea5f388112e5e430c8f7d6494a9d1d88d43e814d"}, + {file = "fastapi-0.115.12.tar.gz", hash = "sha256:1e2c2a2646905f9e83d32f04a3f86aff4a286669c6c950ca95b5fd68c2602681"}, +] + +[package.dependencies] +pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0" +starlette = ">=0.40.0,<0.47.0" +typing-extensions = ">=4.8.0" + +[package.extras] +all = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=3.1.5)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.18)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] +standard = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx 
(>=0.23.0)", "jinja2 (>=3.1.5)", "python-multipart (>=0.0.18)", "uvicorn[standard] (>=0.12.0)"] + +[[package]] +name = "h11" +version = "0.16.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +optional = false +python-versions = ">=3.8" +files = [ + {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"}, + {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"}, +] + +[[package]] +name = "idna" +version = "3.10" +description = "Internationalized Domain Names in Applications (IDNA)" +optional = false +python-versions = ">=3.6" +files = [ + {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, + {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, +] + +[package.extras] +all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] + [[package]] name = "markdown-it-py" version = "3.0.0" @@ -71,6 +166,138 @@ files = [ {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"}, ] +[[package]] +name = "pydantic" +version = "2.11.5" +description = "Data validation using Python type hints" +optional = false +python-versions = ">=3.9" +files = [ + {file = "pydantic-2.11.5-py3-none-any.whl", hash = "sha256:f9c26ba06f9747749ca1e5c94d6a85cb84254577553c8785576fd38fa64dc0f7"}, + {file = "pydantic-2.11.5.tar.gz", hash = "sha256:7f853db3d0ce78ce8bbb148c401c2cdd6431b3473c0cdff2755c7690952a7b7a"}, +] + +[package.dependencies] +annotated-types = ">=0.6.0" +pydantic-core = "2.33.2" +typing-extensions = ">=4.12.2" +typing-inspection = ">=0.4.0" + +[package.extras] +email = ["email-validator (>=2.0.0)"] +timezone = ["tzdata"] + +[[package]] +name = "pydantic-core" +version = "2.33.2" +description = "Core functionality for Pydantic 
validation and serialization" +optional = false +python-versions = ">=3.9" +files = [ + {file = "pydantic_core-2.33.2-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:2b3d326aaef0c0399d9afffeb6367d5e26ddc24d351dbc9c636840ac355dc5d8"}, + {file = "pydantic_core-2.33.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0e5b2671f05ba48b94cb90ce55d8bdcaaedb8ba00cc5359f6810fc918713983d"}, + {file = "pydantic_core-2.33.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0069c9acc3f3981b9ff4cdfaf088e98d83440a4c7ea1bc07460af3d4dc22e72d"}, + {file = "pydantic_core-2.33.2-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:d53b22f2032c42eaaf025f7c40c2e3b94568ae077a606f006d206a463bc69572"}, + {file = "pydantic_core-2.33.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:0405262705a123b7ce9f0b92f123334d67b70fd1f20a9372b907ce1080c7ba02"}, + {file = "pydantic_core-2.33.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4b25d91e288e2c4e0662b8038a28c6a07eaac3e196cfc4ff69de4ea3db992a1b"}, + {file = "pydantic_core-2.33.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bdfe4b3789761f3bcb4b1ddf33355a71079858958e3a552f16d5af19768fef2"}, + {file = "pydantic_core-2.33.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:efec8db3266b76ef9607c2c4c419bdb06bf335ae433b80816089ea7585816f6a"}, + {file = "pydantic_core-2.33.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:031c57d67ca86902726e0fae2214ce6770bbe2f710dc33063187a68744a5ecac"}, + {file = "pydantic_core-2.33.2-cp310-cp310-musllinux_1_1_armv7l.whl", hash = "sha256:f8de619080e944347f5f20de29a975c2d815d9ddd8be9b9b7268e2e3ef68605a"}, + {file = "pydantic_core-2.33.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:73662edf539e72a9440129f231ed3757faab89630d291b784ca99237fb94db2b"}, + {file = "pydantic_core-2.33.2-cp310-cp310-win32.whl", hash = 
"sha256:0a39979dcbb70998b0e505fb1556a1d550a0781463ce84ebf915ba293ccb7e22"}, + {file = "pydantic_core-2.33.2-cp310-cp310-win_amd64.whl", hash = "sha256:b0379a2b24882fef529ec3b4987cb5d003b9cda32256024e6fe1586ac45fc640"}, + {file = "pydantic_core-2.33.2-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:4c5b0a576fb381edd6d27f0a85915c6daf2f8138dc5c267a57c08a62900758c7"}, + {file = "pydantic_core-2.33.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e799c050df38a639db758c617ec771fd8fb7a5f8eaaa4b27b101f266b216a246"}, + {file = "pydantic_core-2.33.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dc46a01bf8d62f227d5ecee74178ffc448ff4e5197c756331f71efcc66dc980f"}, + {file = "pydantic_core-2.33.2-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a144d4f717285c6d9234a66778059f33a89096dfb9b39117663fd8413d582dcc"}, + {file = "pydantic_core-2.33.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:73cf6373c21bc80b2e0dc88444f41ae60b2f070ed02095754eb5a01df12256de"}, + {file = "pydantic_core-2.33.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3dc625f4aa79713512d1976fe9f0bc99f706a9dee21dfd1810b4bbbf228d0e8a"}, + {file = "pydantic_core-2.33.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:881b21b5549499972441da4758d662aeea93f1923f953e9cbaff14b8b9565aef"}, + {file = "pydantic_core-2.33.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:bdc25f3681f7b78572699569514036afe3c243bc3059d3942624e936ec93450e"}, + {file = "pydantic_core-2.33.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:fe5b32187cbc0c862ee201ad66c30cf218e5ed468ec8dc1cf49dec66e160cc4d"}, + {file = "pydantic_core-2.33.2-cp311-cp311-musllinux_1_1_armv7l.whl", hash = "sha256:bc7aee6f634a6f4a95676fcb5d6559a2c2a390330098dba5e5a5f28a2e4ada30"}, + {file = "pydantic_core-2.33.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = 
"sha256:235f45e5dbcccf6bd99f9f472858849f73d11120d76ea8707115415f8e5ebebf"}, + {file = "pydantic_core-2.33.2-cp311-cp311-win32.whl", hash = "sha256:6368900c2d3ef09b69cb0b913f9f8263b03786e5b2a387706c5afb66800efd51"}, + {file = "pydantic_core-2.33.2-cp311-cp311-win_amd64.whl", hash = "sha256:1e063337ef9e9820c77acc768546325ebe04ee38b08703244c1309cccc4f1bab"}, + {file = "pydantic_core-2.33.2-cp311-cp311-win_arm64.whl", hash = "sha256:6b99022f1d19bc32a4c2a0d544fc9a76e3be90f0b3f4af413f87d38749300e65"}, + {file = "pydantic_core-2.33.2-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:a7ec89dc587667f22b6a0b6579c249fca9026ce7c333fc142ba42411fa243cdc"}, + {file = "pydantic_core-2.33.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:3c6db6e52c6d70aa0d00d45cdb9b40f0433b96380071ea80b09277dba021ddf7"}, + {file = "pydantic_core-2.33.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4e61206137cbc65e6d5256e1166f88331d3b6238e082d9f74613b9b765fb9025"}, + {file = "pydantic_core-2.33.2-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:eb8c529b2819c37140eb51b914153063d27ed88e3bdc31b71198a198e921e011"}, + {file = "pydantic_core-2.33.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c52b02ad8b4e2cf14ca7b3d918f3eb0ee91e63b3167c32591e57c4317e134f8f"}, + {file = "pydantic_core-2.33.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:96081f1605125ba0855dfda83f6f3df5ec90c61195421ba72223de35ccfb2f88"}, + {file = "pydantic_core-2.33.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f57a69461af2a5fa6e6bbd7a5f60d3b7e6cebb687f55106933188e79ad155c1"}, + {file = "pydantic_core-2.33.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:572c7e6c8bb4774d2ac88929e3d1f12bc45714ae5ee6d9a788a9fb35e60bb04b"}, + {file = "pydantic_core-2.33.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = 
"sha256:db4b41f9bd95fbe5acd76d89920336ba96f03e149097365afe1cb092fceb89a1"}, + {file = "pydantic_core-2.33.2-cp312-cp312-musllinux_1_1_armv7l.whl", hash = "sha256:fa854f5cf7e33842a892e5c73f45327760bc7bc516339fda888c75ae60edaeb6"}, + {file = "pydantic_core-2.33.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:5f483cfb75ff703095c59e365360cb73e00185e01aaea067cd19acffd2ab20ea"}, + {file = "pydantic_core-2.33.2-cp312-cp312-win32.whl", hash = "sha256:9cb1da0f5a471435a7bc7e439b8a728e8b61e59784b2af70d7c169f8dd8ae290"}, + {file = "pydantic_core-2.33.2-cp312-cp312-win_amd64.whl", hash = "sha256:f941635f2a3d96b2973e867144fde513665c87f13fe0e193c158ac51bfaaa7b2"}, + {file = "pydantic_core-2.33.2-cp312-cp312-win_arm64.whl", hash = "sha256:cca3868ddfaccfbc4bfb1d608e2ccaaebe0ae628e1416aeb9c4d88c001bb45ab"}, + {file = "pydantic_core-2.33.2-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:1082dd3e2d7109ad8b7da48e1d4710c8d06c253cbc4a27c1cff4fbcaa97a9e3f"}, + {file = "pydantic_core-2.33.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:f517ca031dfc037a9c07e748cefd8d96235088b83b4f4ba8939105d20fa1dcd6"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0a9f2c9dd19656823cb8250b0724ee9c60a82f3cdf68a080979d13092a3b0fef"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:2b0a451c263b01acebe51895bfb0e1cc842a5c666efe06cdf13846c7418caa9a"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1ea40a64d23faa25e62a70ad163571c0b342b8bf66d5fa612ac0dec4f069d916"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0fb2d542b4d66f9470e8065c5469ec676978d625a8b7a363f07d9a501a9cb36a"}, + {file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9fdac5d6ffa1b5a83bca06ffe7583f5576555e6c8b3a91fbd25ea7780f825f7d"}, + 
{file = "pydantic_core-2.33.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:04a1a413977ab517154eebb2d326da71638271477d6ad87a769102f7c2488c56"}, + {file = "pydantic_core-2.33.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:c8e7af2f4e0194c22b5b37205bfb293d166a7344a5b0d0eaccebc376546d77d5"}, + {file = "pydantic_core-2.33.2-cp313-cp313-musllinux_1_1_armv7l.whl", hash = "sha256:5c92edd15cd58b3c2d34873597a1e20f13094f59cf88068adb18947df5455b4e"}, + {file = "pydantic_core-2.33.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:65132b7b4a1c0beded5e057324b7e16e10910c106d43675d9bd87d4f38dde162"}, + {file = "pydantic_core-2.33.2-cp313-cp313-win32.whl", hash = "sha256:52fb90784e0a242bb96ec53f42196a17278855b0f31ac7c3cc6f5c1ec4811849"}, + {file = "pydantic_core-2.33.2-cp313-cp313-win_amd64.whl", hash = "sha256:c083a3bdd5a93dfe480f1125926afcdbf2917ae714bdb80b36d34318b2bec5d9"}, + {file = "pydantic_core-2.33.2-cp313-cp313-win_arm64.whl", hash = "sha256:e80b087132752f6b3d714f041ccf74403799d3b23a72722ea2e6ba2e892555b9"}, + {file = "pydantic_core-2.33.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:61c18fba8e5e9db3ab908620af374db0ac1baa69f0f32df4f61ae23f15e586ac"}, + {file = "pydantic_core-2.33.2-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:95237e53bb015f67b63c91af7518a62a8660376a6a0db19b89acc77a4d6199f5"}, + {file = "pydantic_core-2.33.2-cp313-cp313t-win_amd64.whl", hash = "sha256:c2fc0a768ef76c15ab9238afa6da7f69895bb5d1ee83aeea2e3509af4472d0b9"}, + {file = "pydantic_core-2.33.2-cp39-cp39-macosx_10_12_x86_64.whl", hash = "sha256:a2b911a5b90e0374d03813674bf0a5fbbb7741570dcd4b4e85a2e48d17def29d"}, + {file = "pydantic_core-2.33.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6fa6dfc3e4d1f734a34710f391ae822e0a8eb8559a85c6979e14e65ee6ba2954"}, + {file = "pydantic_core-2.33.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:c54c939ee22dc8e2d545da79fc5381f1c020d6d3141d3bd747eab59164dc89fb"}, + {file = "pydantic_core-2.33.2-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:53a57d2ed685940a504248187d5685e49eb5eef0f696853647bf37c418c538f7"}, + {file = "pydantic_core-2.33.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:09fb9dd6571aacd023fe6aaca316bd01cf60ab27240d7eb39ebd66a3a15293b4"}, + {file = "pydantic_core-2.33.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0e6116757f7959a712db11f3e9c0a99ade00a5bbedae83cb801985aa154f071b"}, + {file = "pydantic_core-2.33.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8d55ab81c57b8ff8548c3e4947f119551253f4e3787a7bbc0b6b3ca47498a9d3"}, + {file = "pydantic_core-2.33.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:c20c462aa4434b33a2661701b861604913f912254e441ab8d78d30485736115a"}, + {file = "pydantic_core-2.33.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:44857c3227d3fb5e753d5fe4a3420d6376fa594b07b621e220cd93703fe21782"}, + {file = "pydantic_core-2.33.2-cp39-cp39-musllinux_1_1_armv7l.whl", hash = "sha256:eb9b459ca4df0e5c87deb59d37377461a538852765293f9e6ee834f0435a93b9"}, + {file = "pydantic_core-2.33.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:9fcd347d2cc5c23b06de6d3b7b8275be558a0c90549495c699e379a80bf8379e"}, + {file = "pydantic_core-2.33.2-cp39-cp39-win32.whl", hash = "sha256:83aa99b1285bc8f038941ddf598501a86f1536789740991d7d8756e34f1e74d9"}, + {file = "pydantic_core-2.33.2-cp39-cp39-win_amd64.whl", hash = "sha256:f481959862f57f29601ccced557cc2e817bce7533ab8e01a797a48b49c9692b3"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5c4aa4e82353f65e548c476b37e64189783aa5384903bfea4f41580f255fddfa"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:d946c8bf0d5c24bf4fe333af284c59a19358aa3ec18cb3dc4370080da1e8ad29"}, + {file 
= "pydantic_core-2.33.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:87b31b6846e361ef83fedb187bb5b4372d0da3f7e28d85415efa92d6125d6e6d"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aa9d91b338f2df0508606f7009fde642391425189bba6d8c653afd80fd6bb64e"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:2058a32994f1fde4ca0480ab9d1e75a0e8c87c22b53a3ae66554f9af78f2fe8c"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:0e03262ab796d986f978f79c943fc5f620381be7287148b8010b4097f79a39ec"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:1a8695a8d00c73e50bff9dfda4d540b7dee29ff9b8053e38380426a85ef10052"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:fa754d1850735a0b0e03bcffd9d4b4343eb417e47196e4485d9cca326073a42c"}, + {file = "pydantic_core-2.33.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:a11c8d26a50bfab49002947d3d237abe4d9e4b5bdc8846a63537b6488e197808"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-macosx_10_12_x86_64.whl", hash = "sha256:dd14041875d09cc0f9308e37a6f8b65f5585cf2598a53aa0123df8b129d481f8"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:d87c561733f66531dced0da6e864f44ebf89a8fba55f31407b00c2f7f9449593"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2f82865531efd18d6e07a04a17331af02cb7a651583c418df8266f17a63c6612"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2bfb5112df54209d820d7bf9317c7a6c9025ea52e49f46b6a2060104bba37de7"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = 
"sha256:64632ff9d614e5eecfb495796ad51b0ed98c453e447a76bcbeeb69615079fc7e"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:f889f7a40498cc077332c7ab6b4608d296d852182211787d4f3ee377aaae66e8"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:de4b83bb311557e439b9e186f733f6c645b9417c84e2eb8203f3f820a4b988bf"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:82f68293f055f51b51ea42fafc74b6aad03e70e191799430b90c13d643059ebb"}, + {file = "pydantic_core-2.33.2-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:329467cecfb529c925cf2bbd4d60d2c509bc2fb52a20c1045bf09bb70971a9c1"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:87acbfcf8e90ca885206e98359d7dca4bcbb35abdc0ff66672a293e1d7a19101"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:7f92c15cd1e97d4b12acd1cc9004fa092578acfa57b67ad5e43a197175d01a64"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d3f26877a748dc4251cfcfda9dfb5f13fcb034f5308388066bcfe9031b63ae7d"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dac89aea9af8cd672fa7b510e7b8c33b0bba9a43186680550ccf23020f32d535"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:970919794d126ba8645f3837ab6046fb4e72bbc057b3709144066204c19a455d"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:3eb3fe62804e8f859c49ed20a8451342de53ed764150cb14ca71357c765dc2a6"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:3abcd9392a36025e3bd55f9bd38d908bd17962cc49bc6da8e7e96285336e2bca"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-musllinux_1_1_x86_64.whl", hash = 
"sha256:3a1c81334778f9e3af2f8aeb7a960736e5cab1dfebfb26aabca09afd2906c039"}, + {file = "pydantic_core-2.33.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:2807668ba86cb38c6817ad9bc66215ab8584d1d304030ce4f0887336f28a5e27"}, + {file = "pydantic_core-2.33.2.tar.gz", hash = "sha256:7cb8bc3605c29176e1b105350d2e6474142d7c1bd1d9327c4a9bdb46bf827acc"}, +] + +[package.dependencies] +typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" + [[package]] name = "pygments" version = "2.19.1" @@ -133,6 +360,35 @@ files = [ {file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"}, ] +[[package]] +name = "sniffio" +version = "1.3.1" +description = "Sniff out which async library your code is running under" +optional = false +python-versions = ">=3.7" +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + +[[package]] +name = "starlette" +version = "0.46.2" +description = "The little ASGI library that shines." 
+optional = false +python-versions = ">=3.9" +files = [ + {file = "starlette-0.46.2-py3-none-any.whl", hash = "sha256:595633ce89f8ffa71a015caed34a5b2dc1c0cdb3f0f1fbd1e69339cf2abeec35"}, + {file = "starlette-0.46.2.tar.gz", hash = "sha256:7f7361f34eed179294600af672f565727419830b54b7b084efe44bb82d2fccd5"}, +] + +[package.dependencies] +anyio = ">=3.6.2,<5" +typing-extensions = {version = ">=3.10.0", markers = "python_version < \"3.10\""} + +[package.extras] +full = ["httpx (>=0.27.0,<0.29.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.18)", "pyyaml"] + [[package]] name = "typer" version = "0.16.0" @@ -161,7 +417,40 @@ files = [ {file = "typing_extensions-4.13.2.tar.gz", hash = "sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef"}, ] +[[package]] +name = "typing-inspection" +version = "0.4.1" +description = "Runtime typing introspection tools" +optional = false +python-versions = ">=3.9" +files = [ + {file = "typing_inspection-0.4.1-py3-none-any.whl", hash = "sha256:389055682238f53b04f7badcb49b989835495a96700ced5dab2d8feae4b26f51"}, + {file = "typing_inspection-0.4.1.tar.gz", hash = "sha256:6ae134cc0203c33377d43188d4064e9b357dba58cff3185f22924610e70a9d28"}, +] + +[package.dependencies] +typing-extensions = ">=4.12.0" + +[[package]] +name = "uvicorn" +version = "0.34.2" +description = "The lightning-fast ASGI server." 
+optional = false +python-versions = ">=3.9" +files = [ + {file = "uvicorn-0.34.2-py3-none-any.whl", hash = "sha256:deb49af569084536d269fe0a6d67e3754f104cf03aba7c11c40f01aadf33c403"}, + {file = "uvicorn-0.34.2.tar.gz", hash = "sha256:0e929828f6186353a80b58ea719861d2629d766293b6d19baf086ba31d4f3328"}, +] + +[package.dependencies] +click = ">=7.0" +h11 = ">=0.8" +typing-extensions = {version = ">=4.0", markers = "python_version < \"3.11\""} + +[package.extras] +standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"] + [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "881b3e95c9ae6bb03b6d054b62eed492359c8c426b476d31dae59e7aa8687387" +content-hash = "9d2f829136b30463e96785b5b5bd048ed74268edb09a9befd369c160c03d2973" diff --git a/pyproject.toml b/pyproject.toml index 41f91a4..6e10608 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,8 @@ python = "^3.9" click = "^8.0.0" redis = "^4.0.0" typer = "^0.16.0" +fastapi = "^0.115.12" +uvicorn = "^0.34.2" [build-system] requires = ["poetry-core"] diff --git a/test.py b/test.py index ca64780..47fbb1c 100644 --- a/test.py +++ b/test.py @@ -13,10 +13,10 @@ class AddOut(BaseModel): mq = ModelQ(redis_client = redis_client) -@mq.task(schema=AddIn, returns=AddOut, timeout=5) +@mq.task(schema=AddIn, returns=AddOut) def add(payload: AddIn) -> AddOut: print(f"Processing addition: {payload.a} + {payload.b}") - # time.sleep(10) # Simulate some processing time + time.sleep(10) # Simulate some processing time return AddOut(total=payload.a + payload.b) @mq.task() @@ -44,17 +44,17 @@ def image_task(params: dict): # Keep the worker running indefinitely try: while True: - output = job.get_result(mq.redis_client,returns=AddOut) + # output = job.get_result(mq.redis_client,returns=AddOut) - print(f"Result of addition: {output}") - print(type(output)) - print(f"Result of addition (total): 
{output.total}") + # print(f"Result of addition: {output}") + # print(type(output)) + # print(f"Result of addition (total): {output.total}") - output2 = job2.get_result(mq.redis_client) - print(f"Result of subtraction: {output2}") + # output2 = job2.get_result(mq.redis_client) + # print(f"Result of subtraction: {output2}") - output3 = task.get_result(mq.redis_client) - print(f"Result of image task: {output3}") + # output3 = task.get_result(mq.redis_client) + # print(f"Result of image task: {output3}") time.sleep(1) except KeyboardInterrupt: print("\nGracefully shutting down...") \ No newline at end of file From 03f831c74b2665350d479ad0423d2964c34c7d45 Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Sat, 31 May 2025 14:05:14 +0530 Subject: [PATCH 08/11] some last modifications --- .github/workflows/ci.yml | 2 +- examples/text-streaming-server/tasks.py | 10 +- modelq/app/api/server.py | 126 +++++++++--------- modelq/app/base.py | 6 +- modelq/app/middleware/base.py | 21 --- modelq/app/tasks/base.py | 52 ++++---- poetry.lock | 2 +- pyproject.toml | 1 + test.py | 8 +- tests/test_base.py | 168 ++++++++++++++++-------- 10 files changed, 221 insertions(+), 175 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21bfda0..aea34d3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install pytest fakeredis click requests redis Pillow + pip install pytest fakeredis click requests redis Pillow pydantic fastapi uvicorn typer - name: Run tests run: | diff --git a/examples/text-streaming-server/tasks.py b/examples/text-streaming-server/tasks.py index 65b46ff..84e9c6b 100644 --- a/examples/text-streaming-server/tasks.py +++ b/examples/text-streaming-server/tasks.py @@ -10,9 +10,15 @@ from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer from threading import Thread +from huggingface_hub import login + + 
+login("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") + redis_client = Redis(host="localhost", port=6379, db=0) modelq_app = ModelQ(redis_client = redis_client) + class Model: def __init__(self): self.model = None @@ -44,7 +50,3 @@ def stream(params): for new_text in streamer: yield new_text - -@modelq_app.cron_task(interval_seconds=10) -def cron_task(): - print("Cron task running...") \ No newline at end of file diff --git a/modelq/app/api/server.py b/modelq/app/api/server.py index 33bce02..919f239 100644 --- a/modelq/app/api/server.py +++ b/modelq/app/api/server.py @@ -1,48 +1,45 @@ from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from pydantic import BaseModel -import uvicorn -import time -import json -import gc +import uvicorn, json, gc, time from typing import Dict, Callable, Any +from modelq.exceptions import TaskTimeoutError -# ------------------------------------------------------------ -# Helper – find every *wrapper* produced by @mq.task anywhere -# in the current Python process. -# A wrapper will have __wrapped__ (thanks to functools.wraps) -# and the *inner* function carries the _mq_schema attribute. 
-# ------------------------------------------------------------ - +# ------------------------------------------------------------------ +# Discover every wrapper produced by @mq.task in the current process +# ------------------------------------------------------------------ def _discover_task_wrappers() -> Dict[str, Callable[..., Any]]: wrappers: Dict[str, Callable[..., Any]] = {} for obj in gc.get_objects(): - # Must be callable and look like a functools.wraps wrapper if callable(obj) and hasattr(obj, "__wrapped__"): inner = getattr(obj, "__wrapped__", None) if inner and hasattr(inner, "_mq_schema"): - wrappers[inner.__name__] = obj # key = task name + wrappers[inner.__name__] = obj return wrappers -# ------------------------------------------------------------ -# Factory that builds a FastAPI app wired to ModelQ automatically -# ------------------------------------------------------------ +# ------------------------------------------------------------------ +# Fallback schema for tasks that declared no Pydantic schema +# The body must look like: { "params": { ... 
} } +# ------------------------------------------------------------------ +class ParamWrapper(BaseModel): + params: Dict[str, Any] +# ------------------------------------------------------------------ +# Build a FastAPI app that auto-wires ModelQ tasks +# ------------------------------------------------------------------ def create_api_app(modelq_instance): + app = FastAPI(title="ModelQ Tasks API") - # ---------- Health ---------- + # ---------- health ---------- @app.get("/healthz") - def healthz(): - return {"status": "ok"} + def healthz(): return {"status": "ok"} @app.get("/status") def status(): - servers = modelq_instance.get_registered_server_ids() - queued = modelq_instance.get_all_queued_tasks() return { - "registered_servers": servers, - "queued_tasks_count": len(queued), + "registered_servers": modelq_instance.get_registered_server_ids(), + "queued_tasks_count": len(modelq_instance.get_all_queued_tasks()), "allowed_tasks": list(modelq_instance.allowed_tasks), } @@ -50,7 +47,7 @@ def status(): def queue(): return {"queued_tasks": modelq_instance.get_all_queued_tasks()} - # ---------- Task‑level helpers ---------- + # ---------- task helpers ---------- @app.get("/task/{task_id}/status") def get_task_status(task_id: str): st = modelq_instance.get_task_status(task_id) @@ -65,26 +62,41 @@ def get_task_result(task_id: str): raise HTTPException(404, detail="Task not found or not completed yet") return json.loads(blob) - # -------------------------------------------------------- - # Dynamic endpoints: use wrapper if we discovered one. - # Fallback to ModelQ attribute (original) if wrapper missing. 
- # -------------------------------------------------------- + # ---------- dynamic endpoints ---------- wrapper_map = _discover_task_wrappers() for task_name in modelq_instance.allowed_tasks: + task_func = wrapper_map.get(task_name) or getattr(modelq_instance, task_name) - schema = getattr(task_func, "_mq_schema", None) or getattr(getattr(task_func, "__wrapped__", None), "_mq_schema", None) - returns = getattr(task_func, "_mq_returns", None) or getattr(getattr(task_func, "__wrapped__", None), "_mq_returns", None) + schema = (getattr(task_func, "_mq_schema", None) or + getattr(getattr(task_func, "__wrapped__", None), "_mq_schema", None)) + returns = (getattr(task_func, "_mq_returns", None) or + getattr(getattr(task_func, "__wrapped__", None), "_mq_returns", None)) + + # if no schema declared → use ParamWrapper contract + if schema is None: + schema = ParamWrapper + endpoint_path = f"/task/{task_name}" - # Closure-bound defaults to avoid late‑binding bugs - def make_endpoint(_func=task_func, _schema=schema, _returns=returns, _tname=task_name): + # ---- factory with captured defaults (avoid late-binding) ---- + def make_endpoint( + _func = task_func, + _schema = schema, + _returns= returns, + _tname = task_name + ): async def endpoint(payload: _schema, request: Request): # type: ignore[valid-type] - job = None - try: - # Call wrapper → returns Task; call original → might return AddOut + # ----- normalise call signature ----- + if isinstance(payload, ParamWrapper): + job = _func(**payload.params) + elif isinstance(payload, dict): + job = _func(**payload) + else: job = _func(payload) - # Try quick result (3 s) + + # ----- quick result (3 s) ----- + try: result = job.get_result( modelq_instance.redis_client, timeout=3, @@ -97,35 +109,27 @@ async def endpoint(payload: _schema, request: Request): # type: ignore[valid-ty return JSONResponse(content=result) else: return JSONResponse(content={"result": result}) + + except TaskTimeoutError: + return JSONResponse( + 
status_code=202, + content={ + "message": "Request is queued. Check status/result later.", + "task_id": job.task_id, + "status": "queued", + }, + ) except Exception as e: - if "timeout" in str(e).lower() or isinstance(e, TimeoutError): - return JSONResponse( - status_code=202, - content={ - "message": "Request is queued. Check status/result later.", - "task_id": getattr(job, "task_id", "unknown"), - "status": "queued", - }, - ) - raise HTTPException(400, detail=str(e)) + raise HTTPException(400, detail=f"Error processing task {_tname}: {e}") return endpoint - # Pydantic schema may be None → accept empty body - if schema is not None: - app.post(endpoint_path, response_model=returns or dict)(make_endpoint()) - else: - class AnyInput(BaseModel): - pass - app.post(endpoint_path, response_model=returns or dict)(make_endpoint(_schema=AnyInput)) + app.post(endpoint_path, response_model=returns or dict)(make_endpoint()) return app -# ------------------------------------------------------------ -# Entry helper used by Typer CLI -# ------------------------------------------------------------ - -def run_api(modelq_instance, host="0.0.0.0", port=8000): - """Spin up the auto‑wired FastAPI server.""" - app = create_api_app(modelq_instance) - uvicorn.run(app, host=host, port=port) \ No newline at end of file +# ------------------------------------------------------------------ +# Typer CLI helper +# ------------------------------------------------------------------ +def run_api(modelq_instance, host: str="0.0.0.0", port: int=8000): + uvicorn.run(create_api_app(modelq_instance), host=host, port=port) diff --git a/modelq/app/base.py b/modelq/app/base.py index b05e02f..88a5050 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -535,12 +535,12 @@ def _pruning_loop(self): self.prune_old_task_results(older_than_seconds=self.TASK_RESULT_RETENTION) time.sleep(self.PRUNE_CHECK_INTERVAL) - def check_middleware(self, middleware_event: str): + def check_middleware(self, 
middleware_event: str,task: Optional[Task] = None, error: Optional[Exception] = None): """ Hooks into the Middleware lifecycle if a Middleware instance is attached. """ if self.middleware: - self.middleware.execute(event=middleware_event) + self.middleware.execute(event=middleware_event,task=task, error=error) def process_task(self, task: Task) -> None: """ @@ -602,7 +602,7 @@ def process_task(self, task: Task) -> None: if stream: # Stream results for result in task_function(*call_args, **call_kwargs): - + import json task.status = "in_progress" self.redis_client.xadd( f"task_stream:{task.task_id}", diff --git a/modelq/app/middleware/base.py b/modelq/app/middleware/base.py index 3e3fa8b..42bcd74 100644 --- a/modelq/app/middleware/base.py +++ b/modelq/app/middleware/base.py @@ -1,24 +1,3 @@ -# modelq/app/middleware.py - -from abc import ABC, abstractmethod - - -class Middleware(ABC): - def __init__(self) -> None: - pass - - def execute(self, event, *args, **kwargs): - if event == "before_worker_boot": - self.before_worker_boot() - elif event == "on_timeout": - self.on_timeout(*args, **kwargs) - # You can add more events here as needed - - @abstractmethod - def before_worker_boot(self): - """Called before the worker process starts up.""" - pass - class Middleware: def __init__(self) -> None: pass diff --git a/modelq/app/tasks/base.py b/modelq/app/tasks/base.py index 431ce23..b4cc9eb 100644 --- a/modelq/app/tasks/base.py +++ b/modelq/app/tasks/base.py @@ -77,38 +77,46 @@ def _convert_to_string(self, data: Any) -> str: except TypeError: return str(data) - def get_result(self, redis_client: redis.Redis, timeout: int = None) -> Any: + def get_stream(self, redis_client: redis.Redis) -> Generator[Any, None, None]: """ - Waits for the result of the task until the timeout. - Raises TaskProcessingError if the task failed, - or TaskTimeoutError if it never completes within the timeout. + Generator to yield results from a streaming task. 
+ Continuously reads from a Redis stream and stops when + the task is completed or failed. """ - if not timeout: - timeout = self.timeout - - start_time = time.time() - while time.time() - start_time < timeout: + stream_key = f"task_stream:{self.task_id}" + last_id = "0" + completed = False + + while not completed: + # block=1000 => block for up to 1s, count=10 => max 10 messages + results = redis_client.xread({stream_key: last_id}, block=1000, count=10) + if results: + for _, messages in results: + for message_id, message_data in messages: + # print(message_data) + result = json.loads(message_data[b"result"].decode("utf-8")) + yield result + last_id = message_id + # Append to combined_result + self.combined_result += result + + # Check if the task is finished or failed task_json = redis_client.get(f"task_result:{self.task_id}") if task_json: task_data = json.loads(task_json) - self.result = task_data.get("result") - self.status = task_data.get("status") - - if self.status == "failed": - # Raise the original error message as a TaskProcessingError - error_message = self.result or "Task failed without an error message" + if task_data.get("status") == "completed": + completed = True + # Update local fields + self.status = "completed" + self.result = self.combined_result + elif task_data.get("status") == "failed": + error_message = task_data.get("result", "Task failed without an error message") raise TaskProcessingError( task_data.get("task_name", self.task_name), error_message ) - elif self.status == "completed": - return self.result - # If status is something else like 'processing', keep polling - - time.sleep(1) - # If we exit the loop, we timed out - raise TaskTimeoutError(self.task_id) + return def get_result( self, diff --git a/poetry.lock b/poetry.lock index 682898d..4d747c0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -453,4 +453,4 @@ standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", [metadata] lock-version = "2.0" 
python-versions = "^3.9" -content-hash = "9d2f829136b30463e96785b5b5bd048ed74268edb09a9befd369c160c03d2973" +content-hash = "c2767c553e7716243feae57e79a2b49942f0a73255b9d1db15c0c5066f11e959" diff --git a/pyproject.toml b/pyproject.toml index 6e10608..6a2da06 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ redis = "^4.0.0" typer = "^0.16.0" fastapi = "^0.115.12" uvicorn = "^0.34.2" +pydantic = "^2.11.5" [build-system] requires = ["poetry-core"] diff --git a/test.py b/test.py index 47fbb1c..f45824e 100644 --- a/test.py +++ b/test.py @@ -30,12 +30,12 @@ def image_task(params: dict): # Simulate image processing return "Image processed successfully" -job = add(a=3, b=4) # ✨ validated on the spot +# job = add(a=3, b=4) # ✨ validated on the spot -job2 = sub(a=10, b=5) # ✨ no schema validation, just a simple task +# job2 = sub(a=10, b=5) # ✨ no schema validation, just a simple task -task = image_task({"image": "example.png"}) # ✨ no schema validation, just a simple task -task2 = image_task(params={"image": "example.png"}) +# task = image_task({"image": "example.png"}) # ✨ no schema validation, just a simple task +# task2 = image_task(params={"image": "example.png"}) import time if __name__ == "__main__": diff --git a/tests/test_base.py b/tests/test_base.py index f3d767a..30a8982 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -1,9 +1,22 @@ -import pytest -import fakeredis -import time import json +import time +import fakeredis +import pytest + from modelq import ModelQ -from modelq.app.tasks import Task + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + +def _json_bytes_to_dict(blob): + """Decode Redis bytes → dict.""" + return json.loads(blob.decode() if isinstance(blob, (bytes, bytearray)) else blob) + + +# --------------------------------------------------------------------------- +# fixtures +# 
--------------------------------------------------------------------------- @pytest.fixture def mock_redis(): @@ -11,103 +24,142 @@ def mock_redis(): @pytest.fixture def modelq_instance(mock_redis): + # No extra kwargs – defaults on the new ModelQ are fine return ModelQ(redis_client=mock_redis) + +# --------------------------------------------------------------------------- +# tests +# --------------------------------------------------------------------------- + def test_register_server(modelq_instance): + # register_server is already called in __init__, but calling again is harmless modelq_instance.register_server() - server_data = modelq_instance.redis_client.hget("servers", modelq_instance.server_id) - assert server_data is not None + raw = modelq_instance.redis_client.hget("servers", modelq_instance.server_id) + assert raw is not None + data = _json_bytes_to_dict(raw) + assert data["status"] == "idle" + assert isinstance(data["last_heartbeat"], float) + def test_modelq_initialization(modelq_instance): - assert modelq_instance.server_id is not None - assert modelq_instance.redis_client is not None + assert modelq_instance.server_id + assert modelq_instance.redis_client + def test_enqueue_task(modelq_instance): task_data = {"task_id": "123", "status": "new"} payload = {"data": "sample"} + modelq_instance.enqueue_task(task_data, payload) - queued_task = modelq_instance.redis_client.lpop("ml_tasks") - assert queued_task is not None - assert b'"task_id": "123"' in queued_task + + # queued in list --------------------------------------------------------- + queued_blob = modelq_instance.redis_client.lpop("ml_tasks") + assert queued_blob + queued = _json_bytes_to_dict(queued_blob) + assert queued["task_id"] == "123" + assert queued["status"] == "queued" + assert "queued_at" in queued + + # registered in sorted-set ----------------------------------------------- + assert modelq_instance.redis_client.zscore("queued_requests", "123") is not None + def 
test_requeue_stuck_processing_tasks(modelq_instance): task_id = "stuck_task" task_data = { "task_id": task_id, "status": "processing", - "started_at": time.time() - 200 + "started_at": time.time() - 200, } + # simulate a stuck task modelq_instance.redis_client.set(f"task:{task_id}", json.dumps(task_data)) modelq_instance.redis_client.sadd("processing_tasks", task_id) + modelq_instance.requeue_stuck_processing_tasks(threshold=180) - queued_task = modelq_instance.redis_client.lpop("ml_tasks") - assert queued_task is not None - queued_task_data = json.loads(queued_task) - assert queued_task_data["task_id"] == task_id - assert queued_task_data["status"] == "queued" + + queued_blob = modelq_instance.redis_client.lpop("ml_tasks") + assert queued_blob + queued = _json_bytes_to_dict(queued_blob) + assert queued["task_id"] == task_id + assert queued["status"] == "queued" + assert "queued_at" in queued + + # processing set cleaned up + assert task_id.encode() not in modelq_instance.redis_client.smembers("processing_tasks") + def test_prune_old_task_results(modelq_instance): - old_task_id = "old_task" + old_id = "old_task" old_task_data = { - "task_id": old_task_id, + "task_id": old_id, "status": "completed", - "finished_at": time.time() - 90000 + "finished_at": time.time() - 90_000, } - modelq_instance.redis_client.set(f"task_result:{old_task_id}", json.dumps(old_task_data)) - modelq_instance.prune_old_task_results(older_than_seconds=86400) - pruned_task = modelq_instance.redis_client.get(f"task_result:{old_task_id}") - assert pruned_task is None + modelq_instance.redis_client.set(f"task_result:{old_id}", json.dumps(old_task_data)) + modelq_instance.redis_client.set(f"task:{old_id}", json.dumps(old_task_data)) + + modelq_instance.prune_old_task_results(older_than_seconds=86_400) + + assert modelq_instance.redis_client.get(f"task_result:{old_id}") is None + assert modelq_instance.redis_client.get(f"task:{old_id}") is None + def test_heartbeat(modelq_instance): 
modelq_instance.register_server() - initial_data = json.loads(modelq_instance.redis_client.hget("servers", modelq_instance.server_id)) - initial_heartbeat = initial_data["last_heartbeat"] + initial = _json_bytes_to_dict( + modelq_instance.redis_client.hget("servers", modelq_instance.server_id) + )["last_heartbeat"] + time.sleep(1) modelq_instance.heartbeat() - updated_data = json.loads(modelq_instance.redis_client.hget("servers", modelq_instance.server_id)) - updated_heartbeat = updated_data["last_heartbeat"] - assert updated_heartbeat > initial_heartbeat -def mock_task_function(): - return "Task Completed" + updated = _json_bytes_to_dict( + modelq_instance.redis_client.hget("servers", modelq_instance.server_id) + )["last_heartbeat"] + + assert updated > initial + def test_enqueue_and_retrieve_task(modelq_instance): - task_data = { - "task_id": "task_456", - "status": "new" - } + task_data = {"task_id": "task_456", "status": "new"} payload = {"data": "sample"} + modelq_instance.enqueue_task(task_data, payload) - queued_task = modelq_instance.redis_client.lpop("ml_tasks") - assert queued_task is not None - queued_task_data = json.loads(queued_task) - assert queued_task_data["task_id"] == "task_456" - assert queued_task_data["status"] == "queued" + queued = _json_bytes_to_dict(modelq_instance.redis_client.lpop("ml_tasks")) + + assert queued["task_id"] == "task_456" + assert queued["status"] == "queued" + assert "queued_at" in queued + def test_enqueue_delayed_task(modelq_instance): - task_data = { - "task_id": "delayed_task", - "status": "new" - } - delay_seconds = 10 - modelq_instance.enqueue_delayed_task(task_data, delay_seconds) - delayed_tasks = modelq_instance.redis_client.zrangebyscore("delayed_tasks", 0, time.time() + delay_seconds) - assert len(delayed_tasks) == 1 - delayed_task_data = json.loads(delayed_tasks[0]) - assert delayed_task_data["task_id"] == "delayed_task" + task_dict = {"task_id": "delayed_task", "status": "new"} + delay = 10 + + 
modelq_instance.enqueue_delayed_task(task_dict, delay) + + # should exist in delayed_tasks with a score ~ now + delay + now_plus_delay = time.time() + delay + delayed = modelq_instance.redis_client.zrangebyscore("delayed_tasks", 0, now_plus_delay) + assert len(delayed) == 1 + assert _json_bytes_to_dict(delayed[0])["task_id"] == "delayed_task" + def test_get_all_queued_tasks(modelq_instance): - modelq_instance.redis_client.delete("ml_tasks") + # clean slate + modelq_instance.delete_queue() + tasks = [ {"task_id": "task_1", "status": "queued"}, - {"task_id": "task_2", "status": "queued"} + {"task_id": "task_2", "status": "queued"}, ] - for task in tasks: - modelq_instance.redis_client.rpush("ml_tasks", json.dumps(task)) - queued_tasks = modelq_instance.get_all_queued_tasks() - assert len(queued_tasks) == 2 - assert queued_tasks[0]["task_id"] == "task_1" - assert queued_tasks[1]["task_id"] == "task_2" + for t in tasks: + modelq_instance.redis_client.rpush("ml_tasks", json.dumps(t)) + + queued = modelq_instance.get_all_queued_tasks() + assert [t["task_id"] for t in queued] == ["task_1", "task_2"] + def test_get_registered_server_ids(modelq_instance): modelq_instance.register_server() From 9041949421c0288d1dcb906c197eeaae68386156 Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Sat, 31 May 2025 14:06:10 +0530 Subject: [PATCH 09/11] some last modifications --- examples/text-streaming-server/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/text-streaming-server/tasks.py b/examples/text-streaming-server/tasks.py index 84e9c6b..cc88766 100644 --- a/examples/text-streaming-server/tasks.py +++ b/examples/text-streaming-server/tasks.py @@ -13,7 +13,7 @@ from huggingface_hub import login -login("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") +login("your token here") # Replace with your Hugging Face token redis_client = Redis(host="localhost", port=6379, db=0) modelq_app = ModelQ(redis_client = redis_client) From 
02ecc97a530ab8fb457ac8cfccf271636478b37d Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Sat, 31 May 2025 14:41:40 +0530 Subject: [PATCH 10/11] docs update --- README.md | 137 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 116 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index db13674..94b012d 100644 --- a/README.md +++ b/README.md @@ -10,25 +10,28 @@ ModelQ is a lightweight Python library for scheduling and queuing machine learni ModelQ is developed and maintained by the team at [Modelslab](https://modelslab.com/). > **About Modelslab**: Modelslab provides powerful APIs for AI-native applications including: -> - Image generation -> - Uncensored chat -> - Video generation -> - Audio generation -> - And much more +> +> * Image generation +> * Uncensored chat +> * Video generation +> * Audio generation +> * And much more --- ## ✨ Features -- ✅ Retry support (automatic and manual) -- ⏱ Timeout handling for long-running tasks -- 🔁 Manual retry using `RetryTaskException` -- 🎮 Streaming results from tasks in real-time -- 🧹 Middleware hooks for task lifecycle events -- ⚡ Fast, non-blocking concurrency using threads -- 🧵 Built-in decorators to register tasks quickly -- 💃 Redis-based task queueing -- 🖥️ CLI interface for orchestration +* ✅ Retry support (automatic and manual) +* ⏱ Timeout handling for long-running tasks +* 🔁 Manual retry using `RetryTaskException` +* 🎮 Streaming results from tasks in real-time +* 🧹 Middleware hooks for task lifecycle events +* ⚡ Fast, non-blocking concurrency using threads +* 🧵 Built-in decorators to register tasks quickly +* 💃 Redis-based task queueing +* 🖥️ CLI interface for orchestration +* 🔢 Pydantic model support for task validation and typing +* 🌐 Auto-generated REST API for tasks --- @@ -40,44 +43,96 @@ pip install modelq --- +## 🚀 Auto-Generated REST API + +One of ModelQ's most powerful features is the ability to **expose your tasks as HTTP endpoints automatically**. 
+ +By running a single command, every registered task becomes an API route: + +```bash +modelq serve-api --app-path main:modelq_app --host 0.0.0.0 --port 8000 +``` + +### How It Works + +* Each task registered with `@q.task(...)` is turned into a POST endpoint under `/tasks/{task_name}` +* If your task uses Pydantic input/output, the endpoint will validate the request and return a proper response schema +* The API is built using FastAPI, so you get automatic Swagger docs at: + +``` +http://localhost:8000/docs +``` + +### Example Usage + +```bash +curl -X POST http://localhost:8000/tasks/add \ + -H "Content-Type: application/json" \ + -d '{"a": 3, "b": 7}' +``` + +You can now build ML inference APIs without needing to write any web code! + +--- + ## 🖥️ CLI Usage You can interact with ModelQ using the `modelq` command-line tool. All commands require an `--app-path` parameter to locate your ModelQ instance in `module:object` format. ### Start Workers + ```bash modelq run-workers main:modelq_app --workers 2 ``` + Start background worker threads for executing tasks. ### Check Queue Status + ```bash modelq status --app-path main:modelq_app ``` + Show number of servers, queued tasks, and registered task types. ### List Queued Tasks + ```bash modelq list-queued --app-path main:modelq_app ``` + Display a list of all currently queued task IDs and their names. ### Clear the Queue + ```bash modelq clear-queue --app-path main:modelq_app ``` + Remove all tasks from the queue. ### Remove a Specific Task + ```bash modelq remove-task --app-path main:modelq_app --task-id <task_id> ``` + Remove a specific task from the queue by ID. +### Serve API + +```bash +modelq serve-api --app-path main:modelq_app --host 0.0.0.0 --port 8000 --log-level info +``` + +Start a FastAPI server for ModelQ to accept task submissions over HTTP. + ### Version + ```bash modelq version ``` + Print the current version of ModelQ CLI. 
More commands like `requeue-stuck`, `prune-results`, and `get-task-status` are coming soon. @@ -119,18 +174,58 @@ print(task.get_result(q.redis_client)) --- +## 🔢 Pydantic Support + +ModelQ supports **Pydantic models** as both input and output types for tasks. This allows automatic validation of input parameters and structured return values. + +### Example + +```python +from pydantic import BaseModel, Field +from redis import Redis +from modelq import ModelQ +import time + +class AddIn(BaseModel): + a: int = Field(ge=0) + b: int = Field(ge=0) + +class AddOut(BaseModel): + total: int + +redis_client = Redis(host="localhost", port=6379, db=0) +mq = ModelQ(redis_client=redis_client) + +@mq.task(schema=AddIn, returns=AddOut) +def add(payload: AddIn) -> AddOut: + print(f"Processing addition: {payload.a} + {payload.b}.") + time.sleep(10) # Simulate some processing time + return AddOut(total=payload.a + payload.b) +``` + +### Getting Result + +```python +output = job.get_result(mq.redis_client, returns=AddOut) +``` + +ModelQ will validate inputs using Pydantic and serialize/deserialize results seamlessly. 
+ +--- + ## ⚙️ Middleware Support ModelQ allows you to plug in custom middleware to hook into events: ### Supported Events -- `before_worker_boot` -- `after_worker_boot` -- `before_worker_shutdown` -- `after_worker_shutdown` -- `before_enqueue` -- `after_enqueue` -- `on_error` + +* `before_worker_boot` +* `after_worker_boot` +* `before_worker_shutdown` +* `after_worker_shutdown` +* `before_enqueue` +* `after_enqueue` +* `on_error` ### Example From 1ca663d67f42cfd3b72c4854ef0bc9fd05c0e390 Mon Sep 17 00:00:00 2001 From: Tanmay patil Date: Sat, 31 May 2025 16:22:48 +0530 Subject: [PATCH 11/11] v1.0.0 --- test.py => examples/pydantic_demo.py | 0 modelq/app/api/server.py | 30 ++++++++++++++++------------ pyproject.toml | 2 +- 3 files changed, 18 insertions(+), 14 deletions(-) rename test.py => examples/pydantic_demo.py (100%) diff --git a/test.py b/examples/pydantic_demo.py similarity index 100% rename from test.py rename to examples/pydantic_demo.py diff --git a/modelq/app/api/server.py b/modelq/app/api/server.py index 919f239..cf4730d 100644 --- a/modelq/app/api/server.py +++ b/modelq/app/api/server.py @@ -1,5 +1,5 @@ from fastapi import FastAPI, HTTPException, Request -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse,StreamingResponse from pydantic import BaseModel import uvicorn, json, gc, time from typing import Dict, Callable, Any @@ -95,20 +95,24 @@ async def endpoint(payload: _schema, request: Request): # type: ignore[valid-ty else: job = _func(payload) - # ----- quick result (3 s) ----- try: - result = job.get_result( - modelq_instance.redis_client, - timeout=3, - returns=_returns, - modelq_ref=modelq_instance, - ) - if isinstance(result, BaseModel): - return JSONResponse(content=result.model_dump()) - elif isinstance(result, dict): - return JSONResponse(content=result) + if job.stream: + return StreamingResponse( + job.get_stream(modelq_instance.redis_client), media_type="text/event-stream" + ) else: - return 
JSONResponse(content={"result": result}) + result = job.get_result( + modelq_instance.redis_client, + timeout=3, + returns=_returns, + modelq_ref=modelq_instance, + ) + if isinstance(result, BaseModel): + return JSONResponse(content=result.model_dump()) + elif isinstance(result, dict): + return JSONResponse(content=result) + else: + return JSONResponse(content={"result": result}) except TaskTimeoutError: return JSONResponse( diff --git a/pyproject.toml b/pyproject.toml index 6a2da06..6a5ae1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "modelq" -version = "0.1.28" +version = "1.0.0" description = "Celery-like task queue for ML inference." authors = ["Tanmaypatil123 "] readme = "README.md"