A lightweight PHP task queue library, alternative to traditional queue systems, optimized for ML inference workloads.
- Simple API - Easy to use task registration and enqueueing
- Redis-backed - Fast and reliable task storage using Redis
- Streaming Support - Real-time streaming results for long-running tasks
- Remote Workers - Connect PHP frontend to Python GPU workers via Redis
- Middleware Hooks - Lifecycle hooks for monitoring and customization
- Delayed Tasks - Schedule tasks to run after a delay
- CLI Tools - Built-in commands for queue management
- ML Optimized - Designed for machine learning inference workloads
ModelQ PHP is designed to work seamlessly with Python ModelQ workers running on GPU servers. This enables you to:
- Run your PHP web application on standard servers
- Offload ML inference tasks to dedicated GPU servers running Python
- Share a managed Redis instance (AWS ElastiCache, GCP Memorystore, etc.)
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ PHP Frontend │─────▶│ Managed Redis │◀─────│ Python Worker │
│ (Web Server) │ │ (AWS/GCP/etc) │ │ (GPU Server) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
Enqueue tasks Process tasks
Get results (ML inference)
from modelq import ModelQ
app = ModelQ(
redis_host="your-redis.cache.amazonaws.com",
redis_port=6379,
redis_password="your-password"
)
@app.task("generate_image")
def generate_image(data):
prompt = data.get("prompt")
# Use GPU for image generation...
return {"image_url": "https://cdn.example.com/generated.jpg"}
@app.task("llm_completion", stream=True)
def llm_completion(data):
for token in generate_tokens(data["prompt"]):
yield token
app.run_workers(num_workers=4)<?php
use ModelsLab\ModelQ\ModelQ;
// Connect to the SAME Redis as Python worker
$modelq = new ModelQ(
host: 'your-redis.cache.amazonaws.com',
port: 6379,
password: 'your-password'
);
// Register tasks (handlers empty - Python does the work)
$modelq->task('generate_image', fn($d) => null);
$modelq->task('llm_completion', fn($d) => null, ['stream' => true]);
// Enqueue task for Python worker
$task = $modelq->enqueue('generate_image', [
'prompt' => 'A sunset over mountains'
]);
// Get result from Python worker
$result = $task->getResult($modelq->getRedisClient(), timeout: 120);
echo $result['image_url'];
// Or stream LLM responses
$task = $modelq->enqueue('llm_completion', ['prompt' => 'Hello']);
foreach ($task->getStream($modelq->getRedisClient()) as $token) {
echo $token;
}See examples/remote_worker.php for a complete example.
- PHP 8.1 or higher
- Redis server
- phpredis extension (
ext-redis)
composer require modelslab/modelqMake sure you have the phpredis extension installed:
# Ubuntu/Debian
sudo apt-get install php-redis
# macOS with Homebrew
pecl install redis<?php
require_once 'vendor/autoload.php';
use ModelsLab\ModelQ\ModelQ;
$modelq = new ModelQ(host: '127.0.0.1', port: 6379);
// Register a task handler
$modelq->task('process_image', function (array $data): array {
$imageUrl = $data['url'];
// Process the image...
return ['status' => 'processed', 'url' => $imageUrl];
});
// Start the worker
$modelq->startWorkers(numWorkers: 2);<?php
require_once 'vendor/autoload.php';
use ModelsLab\ModelQ\ModelQ;
$modelq = new ModelQ(host: '127.0.0.1', port: 6379);
// Register the task (handler can be empty on producer side)
$modelq->task('process_image', fn($data) => null);
// Enqueue a task
$task = $modelq->enqueue('process_image', ['url' => 'https://example.com/image.jpg']);
// Wait for the result
$result = $task->getResult($modelq->getRedisClient(), timeout: 30);
echo "Result: " . json_encode($result);use ModelsLab\ModelQ\ModelQ;
$modelq = new ModelQ(host: '127.0.0.1', port: 6379);
// Simple task
$modelq->task('add_numbers', function (array $data): array {
return ['sum' => $data['a'] + $data['b']];
});
// Task with options
$modelq->task('long_running_task', function (array $data): mixed {
// Long running operation...
return $result;
}, [
'timeout' => 300, // 5 minute timeout
'retries' => 3, // Retry up to 3 times on failure
]);// Basic enqueue
$task = $modelq->enqueue('add_numbers', ['a' => 5, 'b' => 3]);
// Get the task ID
echo "Task ID: " . $task->taskId;
// Wait for result (blocking)
$result = $task->getResult($modelq->getRedisClient(), timeout: 10);By default, ModelQ generates a UUID for each task. You can provide your own task ID to correlate tasks with your database records:
// Use your database record ID as the task ID
$orderId = 'order-12345';
$task = $modelq->enqueue('process_order', ['order_id' => $orderId], taskId: $orderId);
echo $task->taskId; // 'order-12345'
// Later, retrieve the task using the same ID
$status = $modelq->getTaskStatus($orderId);
$details = $modelq->getTaskDetails($orderId);This is useful when you want to:
- Track tasks using your existing database primary keys
- Easily correlate queue tasks with database records
- Look up task status without storing the generated UUID
For tasks that produce incremental output (like ML text generation):
// Register a streaming task
$modelq->task('generate_text', function (array $data): Generator {
$prompt = $data['prompt'];
$words = ['Hello', 'World', 'from', 'ModelQ'];
foreach ($words as $word) {
usleep(100000); // Simulate processing
yield $word;
}
}, ['stream' => true]);
// Consume the stream
$task = $modelq->enqueue('generate_text', ['prompt' => 'Hello']);
foreach ($task->getStream($modelq->getRedisClient()) as $chunk) {
echo $chunk . " ";
}
// Output: Hello World from ModelQ// Get all queued tasks
$tasks = $modelq->getAllQueuedTasks();
foreach ($tasks as $task) {
echo "Task: {$task['task_id']} - {$task['task_name']}\n";
}
// Get task status
$status = $modelq->getTaskStatus($taskId);
echo "Status: $status"; // queued, processing, completed, failed
// Remove a task from queue
$modelq->removeTaskFromQueue($taskId);
// Clear the entire queue
$modelq->deleteQueue();
// Get currently processing tasks
$processing = $modelq->getProcessingTasks();Track past tasks, view errors, and monitor remote workers:
// Get full task details (including error info for failed tasks)
$details = $modelq->getTaskDetails($taskId);
if ($details['status'] === 'failed') {
echo "Error: " . $details['error']['message'];
echo "Type: " . $details['error']['type'];
echo "File: " . $details['error']['file'] . ":" . $details['error']['line'];
echo "Trace: " . $details['error']['trace'];
}
// Get task history (most recent first)
$history = $modelq->getTaskHistory(limit: 50);
// Get only failed tasks
$failed = $modelq->getFailedTasks(limit: 20);
foreach ($failed as $task) {
echo "Task {$task['task_name']} failed: {$task['error']['message']}\n";
}
// Get only completed tasks
$completed = $modelq->getCompletedTasks(limit: 20);
// Filter by task name
$imageTasks = $modelq->getTasksByName('process_image', limit: 50);
// Get task statistics
$stats = $modelq->getTaskStats();
echo "Total: {$stats['total']}\n";
echo "Completed: {$stats['by_status']['completed']}\n";
echo "Failed: {$stats['by_status']['failed']}\n";
// See per-task-name stats
foreach ($stats['by_task_name'] as $name => $counts) {
echo "{$name}: {$counts['completed']}/{$counts['total']} succeeded\n";
}
// Get task count in history
$count = $modelq->getTaskHistoryCount();
// Clear old history (older than 7 days by default)
$removed = $modelq->clearTaskHistory(); // Default: 7 days
$removed = $modelq->clearTaskHistory(3600); // Older than 1 hourGet detailed information about registered workers including system resources (CPU, RAM, GPU):
// Get all registered workers
$workers = $modelq->getWorkers();
foreach ($workers as $workerId => $worker) {
echo "Worker: {$workerId}\n";
echo " Status: {$worker['status']}\n";
echo " Hostname: {$worker['hostname']}\n";
echo " OS: {$worker['os']}\n";
if ($worker['system_info']) {
$cpu = $worker['system_info']['cpu'];
$ram = $worker['system_info']['ram'];
echo " CPU: {$cpu['cores_logical']} cores ({$cpu['usage_percent']}% used)\n";
echo " RAM: {$ram['total_gb']} GB ({$ram['used_percent']}% used)\n";
// GPU info (if available)
foreach ($worker['system_info']['gpu'] as $gpu) {
echo " GPU: {$gpu['name']} - {$gpu['memory_total_gb']} GB\n";
echo " Utilization: {$gpu['gpu_utilization_percent']}%\n";
}
}
echo " Tasks: " . implode(', ', $worker['allowed_tasks']) . "\n";
}
// Get a specific worker by ID
$worker = $modelq->getWorker('gpu-server-1');
if ($worker) {
echo "Worker {$worker['worker_id']} is {$worker['status']}\n";
}Worker info includes:
worker_id- Unique worker identifierstatus- Current status (idle, busy)allowed_tasks- List of tasks this worker handleslast_heartbeat- Unix timestamp of last heartbeathostname- Worker hostnameos- Operating system infopython_version- Python version (for Python workers)php_version- PHP version (for PHP workers)system_info- Detailed CPU, RAM, and GPU information
// Enqueue a task to run after 60 seconds
$taskData = [
'task_id' => 'delayed-' . uniqid(),
'task_name' => 'send_reminder',
'payload' => ['user_id' => 123],
'status' => 'queued',
];
$modelq->enqueueDelayedTask($taskData, delaySeconds: 60);Create custom middleware for lifecycle hooks:
use ModelsLab\ModelQ\Middleware\Middleware;
use ModelsLab\ModelQ\Task\Task;
class LoggingMiddleware extends Middleware
{
public function beforeEnqueue(?Task $task): void
{
echo "Enqueueing task: {$task->taskName}\n";
}
public function afterEnqueue(?Task $task): void
{
echo "Task enqueued: {$task->taskId}\n";
}
public function beforeWorkerBoot(): void
{
echo "Worker starting...\n";
}
public function afterWorkerBoot(): void
{
echo "Worker ready!\n";
}
public function onError(?Task $task, ?\Throwable $error): void
{
echo "Task {$task->taskId} failed: {$error->getMessage()}\n";
}
public function onTimeout(?Task $task): void
{
echo "Task {$task->taskId} timed out\n";
}
}
// Apply middleware
$modelq->setMiddleware(new LoggingMiddleware());// Use your own Redis connection
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('your-password');
$redis->select(2);
$modelq = new ModelQ(redisClient: $redis);use ModelsLab\ModelQ\Exception\TaskTimeoutException;
use ModelsLab\ModelQ\Exception\TaskProcessingException;
use ModelsLab\ModelQ\Exception\RetryTaskException;
try {
$result = $task->getResult($modelq->getRedisClient(), timeout: 10);
} catch (TaskTimeoutException $e) {
echo "Task {$e->taskId} timed out\n";
} catch (TaskProcessingException $e) {
echo "Task {$e->taskName} failed: {$e->getMessage()}\n";
}
// Inside a task handler, trigger a retry
$modelq->task('flaky_task', function (array $data): mixed {
if (someCondition()) {
throw new RetryTaskException('Temporary failure, retrying...');
}
return $result;
});ModelQ includes CLI commands for queue management:
# Show queue status
./vendor/bin/modelq status app.php
# List all queued tasks
./vendor/bin/modelq list-queued app.php
# Remove a specific task
./vendor/bin/modelq remove-task app.php <task-id>
# Clear the entire queue
./vendor/bin/modelq clear-queue app.php
# Start workers
./vendor/bin/modelq run-workers app.php --workers=4The app.php file should return a configured ModelQ instance:
<?php
// app.php
require_once 'vendor/autoload.php';
use ModelsLab\ModelQ\ModelQ;
$modelq = new ModelQ(host: '127.0.0.1', port: 6379);
$modelq->task('my_task', function (array $data): mixed {
// Task handler
return $data;
});
return $modelq;$modelq = new ModelQ(
redisClient: null, // Optional: Provide your own Redis client
host: '127.0.0.1', // Redis host
port: 6379, // Redis port
db: 0, // Redis database number
password: null, // Redis password
serverId: null, // Custom server ID (defaults to hostname)
webhookUrl: null, // Webhook URL for task completion notifications
requeueThreshold: null, // Time before requeuing stuck tasks
delaySeconds: 30, // Default delay for delayed tasks
logger: null, // PSR-3 logger instance
);$modelq->task('my_task', $handler, [
'timeout' => 60, // Task timeout in seconds
'stream' => false, // Enable streaming mode
'retries' => 3, // Number of retry attempts
]);┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │────▶│ Redis │◀────│ Worker │
│ (Enqueue) │ │ (Queue) │ │ (Process) │
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Results │
│ Storage │
└─────────────┘
ModelQ uses the following Redis keys:
| Key Pattern | Type | Description |
|---|---|---|
task_queue |
List | Main task queue |
task:{id} |
String | Task data |
task_result:{id} |
String | Task result |
task_stream:{id} |
Stream | Streaming task output |
servers |
Hash | Registered worker servers |
processing_tasks |
Set | Currently processing tasks |
delayed_tasks |
Sorted Set | Delayed tasks (score = execution time) |
# Run PHPUnit tests
./vendor/bin/phpunit
# Run manual tests
php test_manual.php
# Run worker tests
php test_worker.php
# Run streaming tests
php test_streaming.phpCheck the examples/ directory for complete working examples:
examples/basic_usage.php- Basic task queue operationsexamples/producer.php- Enqueueing tasks and getting resultsexamples/worker.php- Worker process setupexamples/streaming_example.php- Streaming task exampleexamples/middleware_example.php- Custom middleware usageexamples/queue_management.php- Queue management operationsexamples/remote_worker.php- PHP + Remote Python GPU Worker
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the MIT License - see the LICENSE file for details.
- ModelQ Python - The original Python implementation