Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "document-analysis-mcp"
version = "0.2.0"
version = "0.3.0"
description = "General-purpose Document Analysis MCP server for PDF processing"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
384 changes: 383 additions & 1 deletion src/document_analysis_mcp/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,383 @@
"""Hash-based document caching for deduplication."""
"""Hash-based document caching for deduplication.

This module provides a persistent cache for document extraction and analysis results.
It uses content hashing to detect duplicate documents and avoid re-processing.

Key Features:
- SHA-256 content hashing for document deduplication
- TTL-based cache expiration (configurable via CACHE_TTL_DAYS)
- JSON storage for cache metadata and results
- Automatic cleanup of expired entries
"""

import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

from document_analysis_mcp.config import get_settings

logger = logging.getLogger(__name__)

# Cache file names
METADATA_FILE = "cache_metadata.json"


@dataclass
class CacheEntry:
    """One record in the document cache index.

    Attributes:
        content_hash: SHA-256 hex digest of the original PDF content.
        created_at: UTC time at which the entry was written.
        expires_at: UTC time after which the entry is considered stale.
        file_path: Location of the JSON file holding the cached result.
        operation: Operation that produced the result (e.g. 'extract',
            'analyze', 'structure').
        metadata: Extra caller-supplied metadata about the cached content.
    """

    content_hash: str
    created_at: datetime
    expires_at: datetime
    file_path: str
    operation: str
    metadata: dict[str, Any] = field(default_factory=dict)

    def is_expired(self) -> bool:
        """Return True once the current UTC time is past ``expires_at``."""
        now = datetime.now(timezone.utc)
        return now > self.expires_at

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-compatible dict (datetimes as ISO-8601 text)."""
        payload = {
            "content_hash": self.content_hash,
            "created_at": self.created_at.isoformat(),
            "expires_at": self.expires_at.isoformat(),
            "file_path": self.file_path,
            "operation": self.operation,
            "metadata": self.metadata,
        }
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "CacheEntry":
        """Rebuild a CacheEntry from the dict produced by :meth:`to_dict`."""
        # Parse both timestamp fields back from their ISO-8601 form.
        timestamps = {
            key: datetime.fromisoformat(data[key])
            for key in ("created_at", "expires_at")
        }
        return cls(
            content_hash=data["content_hash"],
            file_path=data["file_path"],
            operation=data["operation"],
            metadata=data.get("metadata", {}),
            **timestamps,
        )


class DocumentCache:
"""Hash-based cache for document processing results.

Provides persistent caching of extraction and analysis results to avoid
re-processing identical documents. Uses SHA-256 content hashing for
deduplication.

Usage:
cache = DocumentCache()

# Check for cached result
cached = cache.get(pdf_content, "extract")
if cached:
return cached

# Process document...
result = extract_pdf(pdf_content)

# Cache the result
cache.put(pdf_content, "extract", result)
"""

def __init__(self, cache_dir: Path | None = None, ttl_days: int | None = None) -> None:
"""Initialize the document cache.

Args:
cache_dir: Directory for cache storage. Defaults to settings.cache_dir.
ttl_days: Cache TTL in days. Defaults to settings.cache_ttl_days.
"""
settings = get_settings()
self._cache_dir = cache_dir or settings.cache_dir
self._ttl_days = ttl_days or settings.cache_ttl_days
self._metadata: dict[str, CacheEntry] = {}
self._initialized = False

def _ensure_initialized(self) -> None:
"""Ensure cache directory exists and metadata is loaded."""
if self._initialized:
return

# Create cache directory
self._cache_dir.mkdir(parents=True, exist_ok=True)

# Load existing metadata
metadata_path = self._cache_dir / METADATA_FILE
if metadata_path.exists():
try:
with open(metadata_path) as f:
data = json.load(f)
self._metadata = {
key: CacheEntry.from_dict(entry) for key, entry in data.items()
}
logger.debug("Loaded %d cache entries from %s", len(self._metadata), metadata_path)
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.warning("Failed to load cache metadata: %s", e)
self._metadata = {}

self._initialized = True

def _save_metadata(self) -> None:
"""Save cache metadata to disk."""
metadata_path = self._cache_dir / METADATA_FILE
try:
with open(metadata_path, "w") as f:
data = {key: entry.to_dict() for key, entry in self._metadata.items()}
json.dump(data, f, indent=2)
except OSError as e:
logger.error("Failed to save cache metadata: %s", e)

@staticmethod
def compute_hash(content: str | bytes) -> str:
"""Compute SHA-256 hash of content.

Args:
content: String or bytes content to hash.

Returns:
Hex-encoded SHA-256 hash.
"""
if isinstance(content, str):
content = content.encode("utf-8")
return hashlib.sha256(content).hexdigest()

def _make_cache_key(self, content_hash: str, operation: str) -> str:
"""Create a cache key from content hash and operation.

Args:
content_hash: SHA-256 hash of the content.
operation: Operation type (e.g., 'extract', 'analyze').

Returns:
Cache key string.
"""
return f"{content_hash}:{operation}"

def get(
self,
content: str | bytes,
operation: str,
params_hash: str | None = None,
) -> dict[str, Any] | None:
"""Retrieve a cached result for the given content and operation.

Args:
content: PDF content (base64 string or bytes).
operation: Operation type to look up.
params_hash: Optional hash of operation parameters for cache keying.

Returns:
Cached result dictionary, or None if not found or expired.
"""
self._ensure_initialized()

content_hash = self.compute_hash(content)
cache_key = self._make_cache_key(content_hash, operation)
if params_hash:
cache_key = f"{cache_key}:{params_hash}"

entry = self._metadata.get(cache_key)
if entry is None:
logger.debug("Cache miss: %s (not found)", cache_key[:16])
return None

if entry.is_expired():
logger.debug("Cache miss: %s (expired)", cache_key[:16])
self._remove_entry(cache_key)
return None

# Load cached result
result_path = Path(entry.file_path)
if not result_path.exists():
logger.warning("Cache file missing: %s", result_path)
self._remove_entry(cache_key)
return None

try:
with open(result_path) as f:
result = json.load(f)
logger.info("Cache hit: %s", cache_key[:16])
return result
except (json.JSONDecodeError, OSError) as e:
logger.warning("Failed to load cached result: %s", e)
self._remove_entry(cache_key)
return None

def put(
self,
content: str | bytes,
operation: str,
result: dict[str, Any],
params_hash: str | None = None,
metadata: dict[str, Any] | None = None,
) -> str:
"""Store a result in the cache.

Args:
content: Original PDF content (base64 string or bytes).
operation: Operation type being cached.
result: Result dictionary to cache.
params_hash: Optional hash of operation parameters.
metadata: Optional metadata to store with the entry.

Returns:
The content hash used for caching.
"""
self._ensure_initialized()

content_hash = self.compute_hash(content)
cache_key = self._make_cache_key(content_hash, operation)
if params_hash:
cache_key = f"{cache_key}:{params_hash}"

# Generate file path (include params_hash to avoid collisions)
timestamp = int(time.time())
params_suffix = f"_{params_hash}" if params_hash else ""
result_filename = f"{content_hash[:16]}_{operation}{params_suffix}_{timestamp}.json"
result_path = self._cache_dir / result_filename

# Calculate expiration
now = datetime.now(timezone.utc)
expires_at = datetime.fromtimestamp(
now.timestamp() + (self._ttl_days * 24 * 60 * 60),
tz=timezone.utc,
)

# Save result file
try:
with open(result_path, "w") as f:
json.dump(result, f, indent=2)
except OSError as e:
logger.error("Failed to write cache file: %s", e)
return content_hash

# Create and save entry
entry = CacheEntry(
content_hash=content_hash,
created_at=now,
expires_at=expires_at,
file_path=str(result_path),
operation=operation,
metadata=metadata or {},
)
self._metadata[cache_key] = entry
self._save_metadata()

logger.info(
"Cached result: %s (expires %s)",
cache_key[:16],
expires_at.isoformat(),
)
return content_hash

def _remove_entry(self, cache_key: str) -> None:
"""Remove a cache entry and its associated file.

Args:
cache_key: Cache key to remove.
"""
entry = self._metadata.pop(cache_key, None)
if entry:
try:
Path(entry.file_path).unlink(missing_ok=True)
except OSError as e:
logger.debug("Failed to remove cache file: %s", e)
self._save_metadata()

def cleanup_expired(self) -> int:
"""Remove all expired cache entries.

Returns:
Number of entries removed.
"""
self._ensure_initialized()

expired_keys = [key for key, entry in self._metadata.items() if entry.is_expired()]

for key in expired_keys:
self._remove_entry(key)

if expired_keys:
logger.info("Cleaned up %d expired cache entries", len(expired_keys))

return len(expired_keys)

def clear(self) -> int:
"""Clear all cache entries.

Returns:
Number of entries removed.
"""
self._ensure_initialized()

count = len(self._metadata)
for key in list(self._metadata.keys()):
self._remove_entry(key)

logger.info("Cleared %d cache entries", count)
return count

def get_stats(self) -> dict[str, Any]:
"""Get cache statistics.

Returns:
Dictionary containing cache statistics.
"""
self._ensure_initialized()

expired_count = sum(1 for e in self._metadata.values() if e.is_expired())
operations = {}
for entry in self._metadata.values():
operations[entry.operation] = operations.get(entry.operation, 0) + 1

# Calculate total cache size
total_size = 0
for entry in self._metadata.values():
try:
total_size += Path(entry.file_path).stat().st_size
except OSError:
pass

return {
"cache_dir": str(self._cache_dir),
"ttl_days": self._ttl_days,
"total_entries": len(self._metadata),
"expired_entries": expired_count,
"valid_entries": len(self._metadata) - expired_count,
"operations": operations,
"total_size_bytes": total_size,
"total_size_mb": round(total_size / (1024 * 1024), 2),
}


# Process-wide singleton, created on first use by get_cache().
_cache: DocumentCache | None = None


def get_cache() -> DocumentCache:
    """Return the process-wide DocumentCache, creating it on first call.

    Returns:
        Singleton DocumentCache instance.
    """
    global _cache
    cache = _cache
    if cache is None:
        cache = DocumentCache()
        _cache = cache
    return cache
Loading