A Python pipeline for enriching email-style messages using local LLM inference. The system processes incoming messages through a local Ollama server running the Llama3 model to extract structured information.

The enrichment pipeline receives short, email-style messages and uses the LLM to extract:
- Category: one of `["phishing", "newsletter", "internal"]`
- Description: a concise summary (≤25 words)
- Emails: a list of unique, lowercase email addresses found in the text
The system is designed for reliability, performance, and maintainability with comprehensive error handling, retry mechanisms, and clean separation of concerns.
- LLM Integration: Uses Ollama with Llama3 model for local inference
- Validation: Validates and normalizes all extracted data
- Error Resilience: Implements retry logic with exponential backoff
- Async Processing: Efficient message queue processing
- Comprehensive Testing: Unit and integration tests with high coverage
- Clean Architecture: Modular design with clear separation of concerns
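At its core, the LLM integration is an HTTP POST to Ollama's `/api/generate` endpoint. A minimal stdlib-only sketch of that call (function names here are illustrative; the project wraps this in its `LLMClient` with retry logic):

```python
import json
import urllib.request

LLM_URL = "http://localhost:11434"   # mirrors the LLM_URL setting
MODEL_NAME = "llama3:latest"         # mirrors the MODEL_NAME setting

def build_payload(prompt: str) -> bytes:
    """Request body for Ollama's /api/generate; stream=False returns one JSON object."""
    return json.dumps(
        {"model": MODEL_NAME, "prompt": prompt, "stream": False}
    ).encode("utf-8")

def generate(prompt: str, timeout_s: float = 60.0) -> str:
    """POST the prompt to the local Ollama server and return the completion text."""
    req = urllib.request.Request(
        f"{LLM_URL}/api/generate",
        data=build_payload(prompt),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        return json.loads(resp.read())["response"]
```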
- Docker and Docker Compose
- Git
1. Clone the repository:

   ```shell
   git clone <repository-url>
   cd "Data Stream LLM Enrichment"
   ```

2. Set up the Python environment (only required when running the pipeline directly with Python 3.12 rather than with Docker):

   ```shell
   python -m venv .venv
   # On Windows: .venv\Scripts\activate
   # On macOS/Linux: source .venv/bin/activate
   pip install -r requirements.txt
   ```

3. Start the LLM server:

   ```shell
   # Start Ollama and download the Llama3 model
   .\docker-manage.bat setup
   # Or manually:
   docker-compose up -d ollama
   docker-compose exec ollama ollama pull llama3
   ```
Option 1: Using Docker

```shell
# Start all services
.\docker-manage.bat start
# View logs
.\docker-manage.bat logs
# Stop services
.\docker-manage.bat stop
```

Option 2: Direct Python Execution

```shell
# Ensure Ollama is running first
.\docker-manage.bat setup
# Run the pipeline
python -m src.main
```

The pipeline can be configured via environment variables:
```shell
# LLM Configuration
LLM_URL=http://localhost:11434          # Ollama server URL
MODEL_NAME=llama3:latest                # Model to use
TIMEOUT_S=60                            # Request timeout (seconds)

# Retry Configuration
RETRIES=5                               # Number of retries
RETRY_BACKOFF_BASE_S=1.0                # Initial backoff delay
RETRY_BACKOFF_MAX_S=30.0                # Maximum backoff delay
RETRY_STATUS_CODES=429,500,502,503,504  # HTTP codes to retry

# File Paths
INPUT_PATH=./data/dev_emails_150.json   # Input dataset
OUTPUT_PATH=./output/results.json       # Output file
```

The system follows a clean, layered architecture with clear separation of concerns:
```
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Application    │     │  Message Queue   │     │  LLM Provider   │
│  Orchestrator   │────▶│  (In-Memory)     │────▶│  (Ollama)       │
└─────────────────┘     └──────────────────┘     └─────────────────┘
         │                       │                        │
         ▼                       ▼                        ▼
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│ Result Handler  │     │  Message Worker  │     │   Validator     │
│ (File Output)   │◀────│  (Processor)     │────▶│ (Normalization) │
└─────────────────┘     └──────────────────┘     └─────────────────┘
```
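The flow in the diagram can be condensed into a minimal processing loop (all names here are illustrative, not the project's actual API); note how a single message failure is recorded as a failed result rather than stopping the run:

```python
from typing import Callable, Optional

def run_pipeline(
    queue_get: Callable[[], Optional[dict]],   # pull next message, None when drained
    process: Callable[[str], dict],            # LLM call + response parsing
    validate: Callable[[dict], dict],          # normalization layer
    handle: Callable[[dict], None],            # result handler (file output)
) -> int:
    """Drain the queue; one message failing never stops the pipeline."""
    processed = 0
    while (msg := queue_get()) is not None:
        try:
            result = validate(process(msg["text"]))
            handle({"id": msg["id"], "success": True, **result})
        except Exception as exc:
            handle({"id": msg["id"], "success": False,
                    "error": f"Processing error: {exc}"})
        processed += 1
    return processed
```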
1. EnrichmentApplication (`src/app.py`)
   - Main orchestrator managing the complete pipeline
   - Handles component initialization, dataset loading, and result aggregation

2. Message Processing Pipeline
   - MessageQueue (`src/queues/`): Abstraction for message queuing (easily extensible to Kafka/RabbitMQ)
   - Worker (`src/workers/`): Processes messages from the queue
   - MessageProcessor (`src/processors/`): Core business logic for LLM interaction

3. LLM Integration (`src/llm/`)
   - LLMProvider: Abstract interface for LLM services
   - LLMClient: Ollama-specific implementation with retry logic
   - Prompts: Centralized prompt management with enhanced email extraction rules

4. Data Models (`src/models/`)
   - Message: Input message structure with validation
   - EnrichmentResult: Output format matching assignment requirements
   - Category: Enum for message categories
   - ValidatedLLMResponse: Normalized LLM output

5. Validation & Normalization (`src/validation/`)
   - Email format validation and normalization
   - Category validation against allowed values
   - Description word-limit enforcement

6. Result Handling (`src/handlers/`)
   - File-based output with append functionality
   - Supports incremental processing
- Factory Pattern: QueueFactory for extensible queue implementations
- Strategy Pattern: LLMProvider interface for different LLM services
- Dependency Injection: Clean separation and testability
- Enhanced Email Extraction Rules: The original assignment prompt was extended with explicit `EMAIL_EXTRACTION_RULES` to prevent the LLM from "correcting" invalid emails
  - Problem Identified: During testing, the LLM was automatically fixing malformed emails (e.g., converting `'user@@company.com'` to `'user@company.com'`)
  - Solution Implemented: Added strict rules instructing the LLM to extract only emails that appear exactly in the text, without correction or inference
- Retry Logic: Exponential backoff with jitter using Tenacity
- Graceful Degradation: Individual message failures don't stop pipeline
- Comprehensive Logging: Structured logging throughout the pipeline
- Health Checks: LLM service availability verification
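The retry behaviour described above (the project uses Tenacity for this) can be illustrated with a stdlib-only sketch of exponential backoff with jitter; the defaults mirror `RETRIES` and the `RETRY_BACKOFF_*` settings from the configuration section:

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(
    fn: Callable[[], T],
    retries: int = 5,        # RETRIES
    base_s: float = 1.0,     # RETRY_BACKOFF_BASE_S
    max_s: float = 30.0,     # RETRY_BACKOFF_MAX_S
) -> T:
    """Retry fn on transient errors with exponential backoff plus jitter."""
    for attempt in range(1, retries + 1):
        try:
            return fn()
        except (ConnectionError, TimeoutError):
            if attempt == retries:
                raise                                 # budget exhausted
            delay = min(base_s * 2 ** (attempt - 1), max_s)
            time.sleep(delay * random.uniform(0.5, 1.0))  # jittered sleep
    raise AssertionError("unreachable")
```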
- Message Format: Input JSON contains `id` and `text` fields as shown in the dataset
- ID Uniqueness: Message IDs are unique within a dataset
- Email Extraction: Only extract emails that appear literally in the text (no inference or correction)
- Malformed Emails: Emails containing `@@` or `..` are considered invalid and excluded
- Case Normalization: All extracted emails are converted to lowercase
- Description Length: Enforce 25-word limit by truncating excess words
- Local Deployment: System runs on single machine with Docker support
- Network Stability: Stable connection to local Ollama server
- Resource Availability: Sufficient CPU/memory for LLM inference
- Disk Space: Adequate space for model storage and output files
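The email and description assumptions above can be expressed directly as normalization helpers (a sketch; the project's `src/validation/` module is the authoritative implementation, and the regex here is a deliberately simple stand-in):

```python
import re

# Minimal shape check: one '@', no whitespace, a dot in the domain.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def normalize_emails(emails: list[str]) -> list[str]:
    """Lowercase, de-duplicate (order-preserving), and drop malformed emails
    (anything containing '@@' or '..', or failing the shape check)."""
    seen: set[str] = set()
    out: list[str] = []
    for email in emails:
        email = email.strip().lower()
        if "@@" in email or ".." in email or not EMAIL_RE.match(email):
            continue
        if email not in seen:
            seen.add(email)
            out.append(email)
    return out

def truncate_description(desc: str, limit: int = 25) -> str:
    """Enforce the 25-word limit by truncating excess words."""
    return " ".join(desc.split()[:limit])
```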
The pipeline produces JSON output in the exact format specified:
Successful Processing:

```json
{
  "id": 310005,
  "success": true,
  "category": "internal",
  "description": "Maintenance window and changes notification with action items",
  "emails": ["user@company.com"]
}
```

Failed Processing:

```json
{
  "id": 310005,
  "success": false,
  "error": "Processing error: LLM service unavailable"
}
```

- Throughput: ~2-5 messages/second (depends on LLM performance)
- Memory Usage: Minimal - processes messages individually
- Disk Usage: Only for model storage (~4GB for Llama3) and output files
- Scalability: Designed for easy migration to distributed queues
```
Data Stream LLM Enrichment/
├── src/
│   ├── app.py             # Main application orchestrator
│   ├── main.py            # Entry point
│   ├── config.py          # Configuration management
│   ├── models/            # Data models and validation
│   ├── llm/               # LLM integration
│   ├── processors/        # Message processing logic
│   ├── queues/            # Message queue abstractions
│   ├── workers/           # Worker implementation
│   ├── validation/        # Data validation and normalization
│   └── handlers/          # Result handling
├── tests/
│   ├── unit/              # Unit tests
│   └── integration/       # Integration tests
├── data/                  # Input datasets
├── output/                # Processing results
├── docker-compose.yml     # Docker services
├── Dockerfile             # Application container
├── requirements.txt       # Python dependencies
└── README.md              # This file
```
1. LLM Server Not Running

   ```shell
   # Check if Ollama is running
   docker ps | grep ollama
   # Restart Ollama
   .\docker-manage.bat setup
   ```

2. Model Not Downloaded

   ```shell
   # Download the Llama3 model
   docker-compose exec ollama ollama pull llama3
   ```

3. Permission Errors

   ```shell
   # Ensure the output directory exists and is writable
   mkdir -p output
   ```

4. Port Conflicts

   ```shell
   # Check if port 11434 is available
   netstat -an | findstr 11434
   ```
- Application logs are printed to the console with timestamps
- Enable debug logging by setting `LOG_LEVEL=DEBUG`
- Check Docker logs: `.\docker-manage.bat logs`

Example log line for an LLM retry:

```
2025-10-19 13:44:37,311 - Pipeline - WARNING - LLM retrying (attempt 1) after exception: HTTPConnectionPool(host='ollama', port=11434): Read timed out. (read timeout=60)
```
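A logging setup matching that line might look like this (a sketch; the format string is reconstructed from the sample, and `LOG_LEVEL` mirrors the environment variable above):

```python
import logging
import os

# "2025-10-19 13:44:37,311 - Pipeline - WARNING - ..." implies this format.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

def configure_logging() -> logging.Logger:
    """Console logging with timestamps; LOG_LEVEL=DEBUG enables debug output."""
    level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)
    logging.basicConfig(level=level, format=LOG_FORMAT)
    return logging.getLogger("Pipeline")
```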
The project includes both unit and integration tests to ensure correctness and reliability.
```shell
# Run all tests
pytest
# Run unit tests only
python -m pytest tests/unit/
# Run integration tests only
python -m pytest tests/integration/
```

Test coverage includes:
- Core pipeline logic (app, worker, processor)
- LLM client and error handling
- Data models and validation
- Output file handler
All tests are located in the tests/ directory.
- Note on Idempotency and Save Strategy:
  - Streaming output (writing each result as soon as it is processed) was considered, but it carries a risk: if the process crashes or is interrupted mid-write, the output file can end up corrupted or incomplete (a "broken file").
  - To maximize reliability, the current design saves all results at the end of processing, writing a single valid JSON array. This ensures the output file is always complete and valid, even if the process is interrupted before the final write.
- Note on Concurrency: My first design used N worker threads to process messages concurrently, but after some testing I found that the LLM server was the bottleneck, so I switched to a single worker thread to avoid overwhelming it with requests.
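The end-of-run save described above can additionally be made crash-safe with a write-to-temp-then-rename pattern (a sketch; `save_results` is illustrative, not the project's actual handler):

```python
import json
import os
import tempfile

def save_results(results: list[dict], output_path: str) -> None:
    """Write all results as one valid JSON array via an atomic rename:
    if the process dies mid-write, only the temp file is affected and any
    previous output stays intact (os.replace is atomic on POSIX and Windows
    for paths on the same filesystem)."""
    directory = os.path.dirname(output_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".json.tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, output_path)     # atomic swap into place
    except BaseException:
        os.unlink(tmp_path)                   # clean up the partial temp file
        raise
```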