Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 126 additions & 49 deletions src/document_analysis_mcp/cache/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
- TTL-based cache expiration (configurable via CACHE_TTL_DAYS)
- JSON storage for cache metadata and results
- Automatic cleanup of expired entries
- Thread-safe operations with file locking and in-memory locks
"""

import fcntl
import hashlib
import json
import logging
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
Expand Down Expand Up @@ -109,38 +112,77 @@ def __init__(self, cache_dir: Path | None = None, ttl_days: int | None = None) -
self._ttl_days = ttl_days or settings.cache_ttl_days
self._metadata: dict[str, CacheEntry] = {}
self._initialized = False
self._lock = threading.Lock() # Thread safety for in-memory metadata

def _ensure_initialized(self) -> None:
"""Ensure cache directory exists and metadata is loaded."""
"""Ensure cache directory exists and metadata is loaded.

Thread-safe: Uses file locking for metadata reads and in-memory lock
for initialization state.
"""
if self._initialized:
return

# Create cache directory
self._cache_dir.mkdir(parents=True, exist_ok=True)
with self._lock:
# Double-check after acquiring lock
if self._initialized:
return

# Create cache directory
self._cache_dir.mkdir(parents=True, exist_ok=True)

# Load existing metadata with file locking
self._metadata = self._load_metadata()

self._initialized = True

# Load existing metadata
def _load_metadata(self) -> dict[str, CacheEntry]:
"""Load cache metadata from disk with file locking.

Returns:
Dictionary of cache entries keyed by cache key.
"""
metadata_path = self._cache_dir / METADATA_FILE
if metadata_path.exists():
try:
with open(metadata_path) as f:
data = json.load(f)
self._metadata = {
key: CacheEntry.from_dict(entry) for key, entry in data.items()
}
logger.debug("Loaded %d cache entries from %s", len(self._metadata), metadata_path)
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.warning("Failed to load cache metadata: %s", e)
self._metadata = {}
if not metadata_path.exists():
return {}

self._initialized = True
try:
with open(metadata_path) as f:
fcntl.flock(f.fileno(), fcntl.LOCK_SH) # Shared lock for reading
try:
data = json.load(f)
entries = {key: CacheEntry.from_dict(entry) for key, entry in data.items()}
logger.debug("Loaded %d cache entries from %s", len(entries), metadata_path)
return entries
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Release lock
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.warning("Failed to load cache metadata: %s", e)
return {}
except OSError as e:
logger.error("Failed to read cache metadata file: %s", e)
return {}

def _save_metadata(self) -> None:
"""Save cache metadata to disk."""
"""Save cache metadata to disk with file locking.

Uses exclusive file lock to prevent concurrent writes from corrupting
the metadata file.
"""
metadata_path = self._cache_dir / METADATA_FILE
try:
with open(metadata_path, "w") as f:
data = {key: entry.to_dict() for key, entry in self._metadata.items()}
json.dump(data, f, indent=2)
# Write to temp file first, then rename for atomicity
temp_path = metadata_path.with_suffix(".tmp")
with open(temp_path, "w") as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX) # Exclusive lock for writing
try:
data = {key: entry.to_dict() for key, entry in self._metadata.items()}
json.dump(data, f, indent=2)
f.flush()
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Release lock
# Atomic rename
temp_path.rename(metadata_path)
except OSError as e:
logger.error("Failed to save cache metadata: %s", e)

Expand Down Expand Up @@ -178,6 +220,8 @@ def get(
) -> dict[str, Any] | None:
"""Retrieve a cached result for the given content and operation.

Thread-safe: Uses in-memory lock for metadata access.

Args:
content: PDF content (base64 string or bytes).
operation: Operation type to look up.
Expand All @@ -193,18 +237,20 @@ def get(
if params_hash:
cache_key = f"{cache_key}:{params_hash}"

entry = self._metadata.get(cache_key)
if entry is None:
logger.debug("Cache miss: %s (not found)", cache_key[:16])
return None
with self._lock:
entry = self._metadata.get(cache_key)
if entry is None:
logger.debug("Cache miss: %s (not found)", cache_key[:16])
return None

if entry.is_expired():
logger.debug("Cache miss: %s (expired)", cache_key[:16])
self._remove_entry(cache_key)
return None
if entry.is_expired():
logger.debug("Cache miss: %s (expired)", cache_key[:16])
self._remove_entry_unlocked(cache_key)
return None

result_path = Path(entry.file_path)

# Load cached result
result_path = Path(entry.file_path)
# Read file outside the lock (file I/O can be slow)
if not result_path.exists():
logger.warning("Cache file missing: %s", result_path)
self._remove_entry(cache_key)
Expand All @@ -230,6 +276,9 @@ def put(
) -> str:
"""Store a result in the cache.

Thread-safe: Uses in-memory lock for metadata updates and file locking
for disk persistence.

Args:
content: Original PDF content (base64 string or bytes).
operation: Operation type being cached.
Expand Down Expand Up @@ -260,15 +309,15 @@ def put(
tz=timezone.utc,
)

# Save result file
# Save result file (outside lock since file I/O is slow)
try:
with open(result_path, "w") as f:
json.dump(result, f, indent=2)
except OSError as e:
logger.error("Failed to write cache file: %s", e)
return content_hash

# Create and save entry
# Create entry and update metadata with lock
entry = CacheEntry(
content_hash=content_hash,
created_at=now,
Expand All @@ -277,8 +326,10 @@ def put(
operation=operation,
metadata=metadata or {},
)
self._metadata[cache_key] = entry
self._save_metadata()

with self._lock:
self._metadata[cache_key] = entry
self._save_metadata()

logger.info(
"Cached result: %s (expires %s)",
Expand All @@ -290,6 +341,19 @@ def put(
def _remove_entry(self, cache_key: str) -> None:
    """Remove a cache entry and its associated file.

    Thread-safe: Acquires the lock before modifying metadata.

    Args:
        cache_key: Cache key to remove.
    """
    # Delegate to the unlocked variant so callers that already hold
    # self._lock (e.g. cleanup/clear paths) can reuse the removal logic
    # without deadlocking on a non-reentrant threading.Lock.
    with self._lock:
        self._remove_entry_unlocked(cache_key)

def _remove_entry_unlocked(self, cache_key: str) -> None:
"""Remove a cache entry without acquiring the lock.

Must be called while holding self._lock.

Args:
cache_key: Cache key to remove.
"""
Expand All @@ -304,15 +368,18 @@ def _remove_entry(self, cache_key: str) -> None:
def cleanup_expired(self) -> int:
"""Remove all expired cache entries.

Thread-safe: Acquires the lock for the entire cleanup operation.

Returns:
Number of entries removed.
"""
self._ensure_initialized()

expired_keys = [key for key, entry in self._metadata.items() if entry.is_expired()]
with self._lock:
expired_keys = [key for key, entry in self._metadata.items() if entry.is_expired()]

for key in expired_keys:
self._remove_entry(key)
for key in expired_keys:
self._remove_entry_unlocked(key)

if expired_keys:
logger.info("Cleaned up %d expired cache entries", len(expired_keys))
Expand All @@ -322,45 +389,55 @@ def cleanup_expired(self) -> int:
def clear(self) -> int:
"""Clear all cache entries.

Thread-safe: Acquires the lock for the entire clear operation.

Returns:
Number of entries removed.
"""
self._ensure_initialized()

count = len(self._metadata)
for key in list(self._metadata.keys()):
self._remove_entry(key)
with self._lock:
count = len(self._metadata)
for key in list(self._metadata.keys()):
self._remove_entry_unlocked(key)

logger.info("Cleared %d cache entries", count)
return count

def get_stats(self) -> dict[str, Any]:
"""Get cache statistics.

Thread-safe: Acquires the lock to read metadata.

Returns:
Dictionary containing cache statistics.
"""
self._ensure_initialized()

expired_count = sum(1 for e in self._metadata.values() if e.is_expired())
operations = {}
for entry in self._metadata.values():
operations[entry.operation] = operations.get(entry.operation, 0) + 1
with self._lock:
expired_count = sum(1 for e in self._metadata.values() if e.is_expired())
operations: dict[str, int] = {}
for entry in self._metadata.values():
operations[entry.operation] = operations.get(entry.operation, 0) + 1

# Collect file paths while holding lock
file_paths = [Path(entry.file_path) for entry in self._metadata.values()]
total_entries = len(self._metadata)

# Calculate total cache size
# Calculate total cache size outside lock (file I/O is slow)
total_size = 0
for entry in self._metadata.values():
for file_path in file_paths:
try:
total_size += Path(entry.file_path).stat().st_size
total_size += file_path.stat().st_size
except OSError:
pass

return {
"cache_dir": str(self._cache_dir),
"ttl_days": self._ttl_days,
"total_entries": len(self._metadata),
"total_entries": total_entries,
"expired_entries": expired_count,
"valid_entries": len(self._metadata) - expired_count,
"valid_entries": total_entries - expired_count,
"operations": operations,
"total_size_bytes": total_size,
"total_size_mb": round(total_size / (1024 * 1024), 2),
Expand Down
62 changes: 62 additions & 0 deletions src/document_analysis_mcp/tools/ocr.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,44 @@
DEFAULT_LANGUAGE = "eng" # Tesseract language code
MIN_TEXT_CHARS = 100 # Minimum chars before OCR is considered necessary

# Tesseract language codes accepted by this module (common languages only).
# Full list: https://tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html
VALID_LANGUAGES = frozenset(
    (
        # Western European
        "eng", "fra", "deu", "spa", "ita", "por", "nld",
        # Eastern European
        "rus", "pol", "ces", "ukr",
        # Asian
        "chi_sim", "chi_tra", "jpn", "kor", "vie", "tha",
        # Middle Eastern
        "ara", "heb", "tur",
        # South Asian
        "hin",
        # Nordic
        "dan", "fin", "nor", "swe",
    )
)


def _is_tesseract_available() -> bool:
"""Check if Tesseract OCR is available.
Expand All @@ -49,6 +87,27 @@ def _is_tesseract_available() -> bool:
return False


def _validate_language(language: str) -> str:
    """Validate and normalize the Tesseract language code.

    Tesseract's ``-l`` option accepts multiple languages joined with
    ``+`` (e.g. ``"eng+fra"``); each component is validated here, so
    multi-language requests are no longer rejected outright.

    Args:
        language: Language code, or several codes joined with ``+``.

    Returns:
        The normalized language string if every component is a known
        code, otherwise DEFAULT_LANGUAGE.
    """
    # Split on "+" and drop empty pieces / stray whitespace so inputs
    # like "eng + fra" normalize to "eng+fra".
    parts = [part.strip() for part in language.split("+") if part.strip()]
    if parts and all(part in VALID_LANGUAGES for part in parts):
        return "+".join(parts)

    logger.warning(
        "Invalid language code '%s', using default '%s'. Valid languages: %s",
        language,
        DEFAULT_LANGUAGE,
        ", ".join(sorted(VALID_LANGUAGES)),
    )
    return DEFAULT_LANGUAGE


def _page_to_image(page: Page, dpi: int = DEFAULT_DPI) -> Image.Image:
"""Convert a pdfplumber page to a PIL Image.

Expand Down Expand Up @@ -156,6 +215,9 @@ def pdf_ocr(
# Validate file size
_validate_base64_size(pdf_content, max_file_size_mb)

# Validate and normalize language parameter
language = _validate_language(language)

# Check Tesseract availability
if not _is_tesseract_available():
return {
Expand Down
Loading