Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -21,7 +21,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install pytest fakeredis click requests redis Pillow pydantic fastapi uvicorn typer

      - name: Run tests
        run: |
193 changes: 167 additions & 26 deletions README.md
@@ -10,33 +10,135 @@ ModelQ is a lightweight Python library for scheduling and queuing machine learning
ModelQ is developed and maintained by the team at [Modelslab](https://modelslab.com/).

> **About Modelslab**: Modelslab provides powerful APIs for AI-native applications including:
> - Image generation
> - Uncensored chat
> - Video generation
> - Audio generation
> - And much more


---

## ✨ Features

* ✅ Retry support (automatic and manual)
* ⏱ Timeout handling for long-running tasks
* 🔁 Manual retry using `RetryTaskException`
* 🎮 Streaming results from tasks in real-time
* 🧹 Middleware hooks for task lifecycle events
* ⚡ Fast, non-blocking concurrency using threads
* 🧵 Built-in decorators to register tasks quickly
* 💃 Redis-based task queueing
* 🖥️ CLI interface for orchestration
* 🔢 Pydantic model support for task validation and typing
* 🌐 Auto-generated REST API for tasks

---

## 📦 Installation

```bash
pip install modelq
```

---

## 🚀 Auto-Generated REST API

One of ModelQ's most powerful features is the ability to **expose your tasks as HTTP endpoints automatically**.

By running a single command, every registered task becomes an API route:

```bash
modelq serve-api --app-path main:modelq_app --host 0.0.0.0 --port 8000
```

### How It Works

* Each task registered with `@q.task(...)` is turned into a POST endpoint under `/tasks/{task_name}`
* If your task uses Pydantic input/output, the endpoint will validate the request and return a proper response schema
* The API is built using FastAPI, so you get automatic Swagger docs at:

```
http://localhost:8000/docs
```

### Example Usage

```bash
curl -X POST http://localhost:8000/tasks/add \
-H "Content-Type: application/json" \
-d '{"a": 3, "b": 7}'
```

You can now build ML inference APIs without needing to write any web code!
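The same call can be made from Python with the standard library. This is a sketch, not part of the ModelQ API: the `call_add` helper and `base_url` are assumptions, and the endpoint path simply follows the `/tasks/{task_name}` pattern described above with the `add` task from the curl example.

```python
import json
import urllib.request

def call_add(a: int, b: int, base_url: str = "http://localhost:8000"):
    """POST to the auto-generated /tasks/add endpoint (requires a running server)."""
    payload = json.dumps({"a": a, "b": b}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}/tasks/add",
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

# The payload mirrors the curl example above.
payload_preview = json.dumps({"a": 3, "b": 7})
```

The shape of the response depends on how the task is defined; with Pydantic models it matches the task's declared output schema.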

---

## 🖥️ CLI Usage

You can interact with ModelQ using the `modelq` command-line tool. Most commands take an `--app-path` option that locates your ModelQ instance in `module:object` format.

### Start Workers

```bash
modelq run-workers --app-path main:modelq_app --workers 2
```

Start background worker threads for executing tasks.

### Check Queue Status

```bash
modelq status --app-path main:modelq_app
```

Show the number of servers, queued tasks, and registered task types.

### List Queued Tasks

```bash
modelq list-queued --app-path main:modelq_app
```

Display a list of all currently queued task IDs and their names.

### Clear the Queue

```bash
modelq clear-queue --app-path main:modelq_app
```

Remove all tasks from the queue.

### Remove a Specific Task

```bash
modelq remove-task --app-path main:modelq_app --task-id <task_id>
```

Remove a specific task from the queue by ID.

### Serve API

```bash
modelq serve-api --app-path main:modelq_app --host 0.0.0.0 --port 8000 --log-level info
```

Start a FastAPI server for ModelQ to accept task submissions over HTTP.

### Version

```bash
modelq version
```

Print the current version of ModelQ CLI.

More commands like `requeue-stuck`, `prune-results`, and `get-task-status` are coming soon.

---

## 🧠 Basic Usage

```python
# … rest of the Basic Usage example collapsed in this diff view …
print(task.get_result(q.redis_client))
```

---

## 🔢 Pydantic Support

ModelQ supports **Pydantic models** as both input and output types for tasks. This allows automatic validation of input parameters and structured return values.

### Example

```python
from pydantic import BaseModel, Field
from redis import Redis
from modelq import ModelQ
import time

class AddIn(BaseModel):
    a: int = Field(ge=0)
    b: int = Field(ge=0)

class AddOut(BaseModel):
    total: int

redis_client = Redis(host="localhost", port=6379, db=0)
mq = ModelQ(redis_client=redis_client)

@mq.task(schema=AddIn, returns=AddOut)
def add(payload: AddIn) -> AddOut:
    print(f"Processing addition: {payload.a} + {payload.b}.")
    time.sleep(10)  # Simulate some processing time
    return AddOut(total=payload.a + payload.b)
```

### Getting the Result

```python
# `job` is the task handle returned when `add` was enqueued
output = job.get_result(mq.redis_client, returns=AddOut)
```

ModelQ will validate inputs using Pydantic and serialize/deserialize results seamlessly.
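Because the validation is plain Pydantic, you can check the `AddIn` model from the example above standalone, with no Redis or worker running. This sketch shows what accepted and rejected input look like:

```python
from pydantic import BaseModel, Field, ValidationError

class AddIn(BaseModel):
    a: int = Field(ge=0)
    b: int = Field(ge=0)

valid = AddIn(a=3, b=7)      # satisfies the ge=0 constraints
print(valid.a + valid.b)     # → 10

try:
    AddIn(a=-1, b=2)         # violates a >= 0
    rejected = False
except ValidationError:
    rejected = True
print(rejected)              # → True
```

A task receiving `{"a": -1, "b": 2}` would therefore fail validation before your function body ever runs.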

---

## ⚙️ Middleware Support

ModelQ allows you to plug in custom middleware to hook into events:

### Supported Events
- `before_worker_boot`
- `after_worker_boot`
- `before_worker_shutdown`
- `after_worker_shutdown`
- `before_enqueue`
- `after_enqueue`
- `on_error`


### Example

@@ -106,7 +248,7 @@ q.middleware = LoggingMiddleware()
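A middleware subclasses `Middleware` (see `examples/fastapi/tasks.py`) and overrides the hooks it needs. Below is a rough, self-contained sketch of the dispatch pattern; the `Middleware` base class here is a stand-in, since the real hook signatures are not reproduced in this diff:

```python
# Stand-in for modelq.app.middleware.Middleware; hook names follow the list above.
class Middleware:
    def before_enqueue(self, *args, **kwargs): pass
    def after_enqueue(self, *args, **kwargs): pass
    def on_error(self, *args, **kwargs): pass

class LoggingMiddleware(Middleware):
    """Records each lifecycle event it sees."""
    def __init__(self):
        self.events = []

    def before_enqueue(self, *args, **kwargs):
        self.events.append("before_enqueue")

    def after_enqueue(self, *args, **kwargs):
        self.events.append("after_enqueue")

mw = LoggingMiddleware()
mw.before_enqueue()   # ModelQ would invoke these around each enqueue
mw.after_enqueue()
print(mw.events)      # → ['before_enqueue', 'after_enqueue']
```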

---

## 🛠 Configuration

Connect to Redis using custom config:

@@ -132,4 +274,3 @@ ModelQ is released under the MIT License.
## 🤝 Contributing

We welcome contributions! Open an issue or submit a PR at [github.com/modelslab/modelq](https://github.com/modelslab/modelq).

106 changes: 57 additions & 49 deletions examples/fastapi/tasks.py
@@ -2,19 +2,17 @@
from threading import Thread
from modelq import ModelQ
from modelq.app.middleware import Middleware
# from PIL import Image
import time
import os
# import torch
# import numpy as np
from redis import Redis
# import base64

imagine_db = Redis(host="localhost", port=6379, db=0)

modelq_app = ModelQ(redis_client=imagine_db)

class CurrentModel:
    def __init__(self):
@@ -26,59 +24,69 @@ def load_model(self):

        model_path = "/workspace/XTTS-v2"
        model_name = "tts_models/multilingual/multi-dataset/xtts_v2"

        self.model = model_path
        print(f"Loading model from {model_path}...")

CURRENT_MODEL = CurrentModel()

class BeforeWorker(Middleware):
    def before_worker_boot(self):
        print("Loading model...")
        CURRENT_MODEL.load_model()

modelq_app.middleware = BeforeWorker()


# def wav_postprocess(wav):
#     """Post process the output waveform"""
#     if isinstance(wav, list):
#         wav = torch.cat(wav, dim=0)
#     wav = wav.clone().detach().cpu().numpy()
#     wav = np.clip(wav, -1, 1)
#     wav = (wav * 32767).astype(np.int16)
#     return wav

# @modelq.task(timeout=15, stream=True)
# def stream(params):
#     time.sleep(10)
#     gpt_cond_latent, speaker_embedding = CURRENT_MODEL.model.get_conditioning_latents(audio_path=["/workspace/XTTS-v2/samples/en_sample.wav"])
#     streamer = CURRENT_MODEL.model.inference_stream(
#         params,
#         "en",
#         gpt_cond_latent,
#         speaker_embedding,
#         stream_chunk_size=150,
#         enable_text_splitting=True,
#     )
#     for chunk in streamer:
#         processed_chunk = wav_postprocess(chunk)
#         processed_bytes = processed_chunk.tobytes()
#         base64_chunk = base64.b64encode(processed_bytes).decode("utf-8")
#         yield base64_chunk


@modelq_app.task()
def add_task():
    time.sleep(20)
    return 2 + 3


# @modelq.task(timeout=15)
# def image_task():
# return Image.open("lmao.png")

# @modelq_app.cron_task(interval_seconds=10)
# def cron_task():
#     print(CURRENT_MODEL.model)
#     print("Cron task executed")


# if __name__ == "__main__":
#     modelq_app.start_workers()
#
#     # Keep the worker running indefinitely
#     try:
#         while True:
#             time.sleep(1)
#     except KeyboardInterrupt:
#         print("\nGracefully shutting down...")