From ad5f8d21f51d180838c1ece64882dab1fb22fad2 Mon Sep 17 00:00:00 2001 From: Krisoye Smith Date: Sun, 1 Feb 2026 23:47:38 -0500 Subject: [PATCH] feat: Phase 4 KB Integration - pdf_kb_ingest tool Implements one-shot PDF ingestion for knowledge bank integration: - Smart text chunking respecting paragraph/sentence boundaries - Page number tracking for source references - Document classification (LLM with heuristic fallback) - Caching support with parameter-specific keys - Comprehensive input validation and error handling Adds 43 new tests for kb_ingest functionality (266 total tests passing). Resolves Epic #21 Phase 4: KB Integration Refs: krisoye/project-tracker#96 Co-Authored-By: Claude Opus 4.5 --- pyproject.toml | 2 +- src/document_analysis_mcp/server.py | 45 +- src/document_analysis_mcp/tools/__init__.py | 7 + src/document_analysis_mcp/tools/kb_ingest.py | 837 +++++++++++++++++++ tests/test_extract.py | 2 +- tests/test_kb_ingest.py | 620 ++++++++++++++ 6 files changed, 1510 insertions(+), 3 deletions(-) create mode 100644 src/document_analysis_mcp/tools/kb_ingest.py create mode 100644 tests/test_kb_ingest.py diff --git a/pyproject.toml b/pyproject.toml index cd5f41b..f694954 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "document-analysis-mcp" -version = "0.3.0" +version = "0.4.0" description = "General-purpose Document Analysis MCP server for PDF processing" readme = "README.md" requires-python = ">=3.10" diff --git a/src/document_analysis_mcp/server.py b/src/document_analysis_mcp/server.py index 936bd78..d8ac228 100644 --- a/src/document_analysis_mcp/server.py +++ b/src/document_analysis_mcp/server.py @@ -17,12 +17,13 @@ from document_analysis_mcp.config import get_settings from document_analysis_mcp.tools.classify import pdf_classify from document_analysis_mcp.tools.extract import pdf_extract_full +from document_analysis_mcp.tools.kb_ingest import pdf_kb_ingest from document_analysis_mcp.tools.ocr import pdf_ocr from document_analysis_mcp.tools.structure import pdf_extract_structure from document_analysis_mcp.tracking import get_tracker # Server version - should match pyproject.toml -__version__ = "0.3.0" +__version__ = "0.4.0" # Track server startup time for uptime calculation _startup_time: datetime | None = None @@ -260,6 +261,48 @@ def pdf_extract_structure_tool( ) +@mcp.tool() +def pdf_kb_ingest_tool( + pdf_content: str, + title: str | None = None, + source_url: str | None = None, + content_type: str | None = None, + max_chunk_size: int = 4000, + max_file_size_mb: float = 50.0, + use_cache: bool = True, +) -> dict[str, Any]: + """One-shot PDF ingestion for knowledge bank. + + Combines extraction, classification, and chunking into a single operation + optimized for batch KB processing. The output format is designed to be + directly consumable by knowledge-bank-tools. + + Args: + pdf_content: Base64-encoded PDF content. + title: Document title. If not provided, will be extracted from PDF. + source_url: Source URL for the document. + content_type: Document type (research_paper, technical_doc, financial_report, + legal_doc, manual, other). If not provided, will be auto-classified. + max_chunk_size: Maximum characters per chunk (default 4000). + max_file_size_mb: Maximum allowed file size in megabytes. + use_cache: Whether to use caching for previously processed documents. + + Returns: + Dictionary containing success, title, content_type, chunks (with text, + page_numbers, word_count), metadata (page_count, has_tables, has_toc), + and processing_stats. + """ + return pdf_kb_ingest( + pdf_content=pdf_content, + title=title, + source_url=source_url, + content_type=content_type, + max_chunk_size=max_chunk_size, + max_file_size_mb=max_file_size_mb, + use_cache=use_cache, + ) + + @mcp.tool() def cache_stats() -> dict[str, Any]: """Get cache statistics and usage information. diff --git a/src/document_analysis_mcp/tools/__init__.py b/src/document_analysis_mcp/tools/__init__.py index 8ed88a1..e0d6aef 100644 --- a/src/document_analysis_mcp/tools/__init__.py +++ b/src/document_analysis_mcp/tools/__init__.py @@ -9,6 +9,10 @@ PDF_EXTRACT_FULL_METADATA, pdf_extract_full, ) +from document_analysis_mcp.tools.kb_ingest import ( + PDF_KB_INGEST_METADATA, + pdf_kb_ingest, +) __all__ = [ # Extract tool @@ -18,4 +22,7 @@ "pdf_classify", "PDF_CLASSIFY_METADATA", "DocumentType", + # KB Ingest tool + "pdf_kb_ingest", + "PDF_KB_INGEST_METADATA", ] diff --git a/src/document_analysis_mcp/tools/kb_ingest.py b/src/document_analysis_mcp/tools/kb_ingest.py new file mode 100644 index 0000000..8a6871e --- /dev/null +++ b/src/document_analysis_mcp/tools/kb_ingest.py @@ -0,0 +1,837 @@ +"""PDF Knowledge Bank ingestion tool. + +This module provides the pdf_kb_ingest tool which performs one-shot PDF processing +for knowledge bank ingestion. It combines extraction, classification, and chunking +into a single optimized operation. + +The output format is designed to be directly consumable by knowledge-bank-tools +for batch ingestion into the vector database. +""" + +import logging +import re +import time +from dataclasses import dataclass, field +from typing import Any + +from document_analysis_mcp.cache import get_cache +from document_analysis_mcp.config import get_settings +from document_analysis_mcp.processors.chunker import estimate_tokens +from document_analysis_mcp.processors.text_extractor import TextExtractor +from document_analysis_mcp.tools.classify import ( + CLASSIFICATION_SAMPLE_CHARS, + _get_classification_prompt, + _parse_classification_response, +) +from document_analysis_mcp.tracking import get_tracker + +logger = logging.getLogger(__name__) + +# Default constants +DEFAULT_MAX_FILE_SIZE_MB = 50.0 +DEFAULT_MAX_CHUNK_SIZE = 4000 # Characters per chunk (KB-friendly size) +MIN_CHUNK_SIZE = 500 # Minimum characters for a valid chunk + + +@dataclass +class KBChunk: + """A chunk of document content for KB ingestion. + + Attributes: + text: The chunk text content. + page_numbers: List of page numbers this chunk spans. + word_count: Number of words in the chunk. + char_count: Number of characters in the chunk. + chunk_index: Index of this chunk (0-based). + """ + + text: str + page_numbers: list[int] + word_count: int + char_count: int + chunk_index: int + + +@dataclass +class KBIngestResult: + """Result of KB ingestion preparation. + + Attributes: + success: Whether the ingestion preparation succeeded. + title: Document title (extracted or provided). + content_type: Classified document type. + chunks: List of content chunks for ingestion. + metadata: Document metadata. + processing_stats: Processing statistics. + error: Error message if processing failed. + """ + + success: bool + title: str + content_type: str + chunks: list[KBChunk] = field(default_factory=list) + metadata: dict[str, Any] = field(default_factory=dict) + processing_stats: dict[str, Any] = field(default_factory=dict) + error: str | None = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + return { + "success": self.success, + "title": self.title, + "content_type": self.content_type, + "chunks": [ + { + "text": chunk.text, + "page_numbers": chunk.page_numbers, + "word_count": chunk.word_count, + "char_count": chunk.char_count, + "chunk_index": chunk.chunk_index, + } + for chunk in self.chunks + ], + "metadata": self.metadata, + "processing_stats": self.processing_stats, + "error": self.error, + } + + +def _validate_base64_size(pdf_content: str, max_file_size_mb: float) -> None: + """Validate base64-encoded content size before decoding. + + Args: + pdf_content: Base64-encoded PDF content. + max_file_size_mb: Maximum allowed file size in megabytes. + + Raises: + ValueError: If the encoded content exceeds the size limit. + """ + encoded_size_bytes = len(pdf_content) + estimated_decoded_size_mb = (encoded_size_bytes * 3 / 4) / (1024 * 1024) + + if estimated_decoded_size_mb > max_file_size_mb: + raise ValueError( + f"PDF content exceeds maximum size limit. " + f"Estimated size: {estimated_decoded_size_mb:.1f}MB, " + f"maximum allowed: {max_file_size_mb}MB" + ) + + +def _extract_title_from_text(text: str, metadata_title: str | None) -> str: + """Extract a meaningful title from the document. + + Attempts to find a title from: + 1. PDF metadata (if provided) + 2. First heading-like line in text + 3. First meaningful sentence + + Args: + text: Document text content. + metadata_title: Title from PDF metadata (may be None). + + Returns: + Best guess at document title. + """ + # Use metadata title if available and meaningful + if metadata_title and len(metadata_title.strip()) > 3: + return metadata_title.strip()[:200] + + # Try to find a heading in the first few lines + lines = text[:2000].split("\n") + for line in lines[:10]: + line = line.strip() + # Skip empty lines or very short lines + if len(line) < 10: + continue + # Skip lines that look like metadata (dates, page numbers, etc.) + if re.match(r"^[\d\-/.:]+$", line): + continue + if re.match(r"^(Page|Chapter|Section)\s+\d+", line, re.IGNORECASE): + continue + # Found a potential title + return line[:200] + + # Fallback to first meaningful text + return "Untitled Document" + + +def _extract_page_numbers_from_text(text: str) -> dict[int, tuple[int, int]]: + """Extract page boundaries from page-delimited text. + + The TextExtractor produces text with [Page N] markers. + + Args: + text: Text with [Page N] markers. + + Returns: + Dictionary mapping page numbers to (start, end) character positions. + """ + page_pattern = re.compile(r"\[Page (\d+)\]") + pages: dict[int, tuple[int, int]] = {} + + matches = list(page_pattern.finditer(text)) + for i, match in enumerate(matches): + page_num = int(match.group(1)) + start = match.end() + end = matches[i + 1].start() if i + 1 < len(matches) else len(text) + pages[page_num] = (start, end) + + return pages + + +def _find_page_for_position(position: int, page_boundaries: dict[int, tuple[int, int]]) -> int: + """Find which page a character position belongs to. + + Args: + position: Character position in the text. + page_boundaries: Dictionary from _extract_page_numbers_from_text. + + Returns: + Page number (1-indexed) or 1 if not found. + """ + for page_num, (start, end) in page_boundaries.items(): + if start <= position < end: + return page_num + return 1 + + +def _chunk_text_for_kb( + text: str, + max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE, +) -> list[KBChunk]: + """Split text into chunks suitable for KB ingestion. + + Creates chunks that: + - Respect paragraph and sentence boundaries where possible + - Track which pages each chunk spans + - Stay within the specified size limit + + Args: + text: Full document text (with [Page N] markers). + max_chunk_size: Maximum characters per chunk. + + Returns: + List of KBChunk objects. + """ + if not text or not text.strip(): + return [] + + # Extract page boundaries for tracking + page_boundaries = _extract_page_numbers_from_text(text) + + # Remove page markers for cleaner chunking, but keep positions + clean_text = re.sub(r"\[Page \d+\]\n*", "", text) + + # If entire text fits in one chunk, return it + if len(clean_text) <= max_chunk_size: + pages = list(page_boundaries.keys()) if page_boundaries else [1] + return [ + KBChunk( + text=clean_text.strip(), + page_numbers=pages, + word_count=len(clean_text.split()), + char_count=len(clean_text.strip()), + chunk_index=0, + ) + ] + + # Split into paragraphs first + paragraphs = re.split(r"\n\s*\n", clean_text) + + chunks: list[KBChunk] = [] + current_chunk_text = "" + current_chunk_start = 0 + chunk_index = 0 + + for paragraph in paragraphs: + paragraph = paragraph.strip() + if not paragraph: + continue + + # If adding this paragraph would exceed the limit + if len(current_chunk_text) + len(paragraph) + 2 > max_chunk_size: + # Save the current chunk if it has content + if current_chunk_text.strip() and len(current_chunk_text.strip()) >= MIN_CHUNK_SIZE: + chunk_end = current_chunk_start + len(current_chunk_text) + pages = _get_pages_for_range(current_chunk_start, chunk_end, page_boundaries, text) + chunks.append( + KBChunk( + text=current_chunk_text.strip(), + page_numbers=pages, + word_count=len(current_chunk_text.split()), + char_count=len(current_chunk_text.strip()), + chunk_index=chunk_index, + ) + ) + chunk_index += 1 + + # Start a new chunk + current_chunk_start = current_chunk_start + len(current_chunk_text) + current_chunk_text = "" + + # If the paragraph itself is larger than max_chunk_size, split it + if len(paragraph) > max_chunk_size: + sub_chunks = _split_large_paragraph( + paragraph, + max_chunk_size, + chunk_index, + current_chunk_start, + page_boundaries, + text, + ) + chunks.extend(sub_chunks) + chunk_index += len(sub_chunks) + current_chunk_start += len(paragraph) + 2 + continue + + # Add paragraph to current chunk + if current_chunk_text: + current_chunk_text += "\n\n" + paragraph + else: + current_chunk_text = paragraph + + # Save the final chunk + if current_chunk_text.strip() and len(current_chunk_text.strip()) >= MIN_CHUNK_SIZE: + chunk_end = current_chunk_start + len(current_chunk_text) + pages = _get_pages_for_range(current_chunk_start, chunk_end, page_boundaries, text) + chunks.append( + KBChunk( + text=current_chunk_text.strip(), + page_numbers=pages, + word_count=len(current_chunk_text.split()), + char_count=len(current_chunk_text.strip()), + chunk_index=chunk_index, + ) + ) + + return chunks + + +def _get_pages_for_range( + start: int, end: int, page_boundaries: dict[int, tuple[int, int]], original_text: str +) -> list[int]: + """Get the page numbers that a text range spans. + + Args: + start: Start position in clean text. + end: End position in clean text. + page_boundaries: Page boundary map from original text. + original_text: Original text with page markers. + + Returns: + List of page numbers (may be empty if no page markers found). + """ + if not page_boundaries: + return [1] + + # Simple heuristic: map position to pages proportionally + pages = set() + for page_num, (page_start, page_end) in page_boundaries.items(): + # Check for overlap + if page_start < end and page_end > start: + pages.add(page_num) + + return sorted(pages) if pages else [1] + + +def _split_large_paragraph( + paragraph: str, + max_chunk_size: int, + start_chunk_index: int, + text_start: int, + page_boundaries: dict[int, tuple[int, int]], + original_text: str, +) -> list[KBChunk]: + """Split a large paragraph into multiple chunks at sentence boundaries. + + Args: + paragraph: The paragraph to split. + max_chunk_size: Maximum characters per chunk. + start_chunk_index: Starting index for chunk numbering. + text_start: Position in original text where this paragraph starts. + page_boundaries: Page boundary map. + original_text: Original text with page markers. + + Returns: + List of KBChunk objects. + """ + chunks = [] + sentences = re.split(r"(?<=[.!?])\s+", paragraph) + + current_text = "" + chunk_index = start_chunk_index + current_start = text_start + + for sentence in sentences: + if len(current_text) + len(sentence) + 1 > max_chunk_size: + if current_text.strip(): + chunk_end = current_start + len(current_text) + pages = _get_pages_for_range( + current_start, chunk_end, page_boundaries, original_text + ) + chunks.append( + KBChunk( + text=current_text.strip(), + page_numbers=pages, + word_count=len(current_text.split()), + char_count=len(current_text.strip()), + chunk_index=chunk_index, + ) + ) + chunk_index += 1 + current_start += len(current_text) + 1 + + current_text = sentence + else: + current_text = current_text + " " + sentence if current_text else sentence + + # Add remaining text + if current_text.strip() and len(current_text.strip()) >= MIN_CHUNK_SIZE: + chunk_end = current_start + len(current_text) + pages = _get_pages_for_range(current_start, chunk_end, page_boundaries, original_text) + chunks.append( + KBChunk( + text=current_text.strip(), + page_numbers=pages, + word_count=len(current_text.split()), + char_count=len(current_text.strip()), + chunk_index=chunk_index, + ) + ) + + return chunks + + +def _classify_document_fast( + sample_text: str, +) -> tuple[str, str, float]: + """Classify document type using LLM (if available) or heuristics. + + This is a simplified version of pdf_classify that avoids the full + tool overhead for one-shot ingestion. + + Args: + sample_text: Sample text from the document (first ~8000 chars). + + Returns: + Tuple of (document_type, confidence, reason). + """ + settings = get_settings() + + # If no API key, use heuristic classification + if not settings.has_api_key: + return _classify_heuristic(sample_text) + + # Use LLM for classification + try: + from anthropic import APIConnectionError, APIStatusError + + from document_analysis_mcp.processors.llm import LLMProcessor + + llm = LLMProcessor() + prompt = _get_classification_prompt() + + response = llm.analyze_chunk( + chunk=sample_text, + prompt=prompt, + model=settings.classification_model, + max_tokens=150, + system_prompt="You are a document classification expert. Classify documents accurately and concisely.", + ) + + doc_type, confidence, reason = _parse_classification_response(response.content) + return doc_type.value, confidence, reason + + except (APIConnectionError, APIStatusError, ValueError) as e: + logger.warning("LLM classification failed, using heuristic: %s", e) + return _classify_heuristic(sample_text) + + +def _classify_heuristic(text: str) -> tuple[str, str, str]: + """Classify document using keyword heuristics. + + Args: + text: Sample text from document. + + Returns: + Tuple of (document_type, confidence, reason). + """ + text_lower = text.lower() + + # Check for research paper indicators + research_keywords = [ + "abstract", + "methodology", + "conclusion", + "references", + "et al", + "hypothesis", + ] + research_count = sum(1 for kw in research_keywords if kw in text_lower) + if research_count >= 3: + return "research_paper", "medium", "Contains academic paper structure indicators" + + # Check for technical documentation + tech_keywords = [ + "api", + "endpoint", + "parameter", + "function", + "method", + "documentation", + "specification", + ] + tech_count = sum(1 for kw in tech_keywords if kw in text_lower) + if tech_count >= 3: + return "technical_doc", "medium", "Contains technical documentation indicators" + + # Check for financial documents + finance_keywords = [ + "revenue", + "profit", + "fiscal", + "quarter", + "earnings", + "balance sheet", + "income statement", + ] + finance_count = sum(1 for kw in finance_keywords if kw in text_lower) + if finance_count >= 2: + return "financial_report", "medium", "Contains financial report indicators" + + # Check for legal documents + legal_keywords = [ + "agreement", + "hereby", + "parties", + "terms and conditions", + "whereas", + "liability", + ] + legal_count = sum(1 for kw in legal_keywords if kw in text_lower) + if legal_count >= 2: + return "legal_doc", "medium", "Contains legal document indicators" + + # Check for manuals/guides + manual_keywords = ["step", "instruction", "how to", "guide", "tutorial", "procedure", "follow"] + manual_count = sum(1 for kw in manual_keywords if kw in text_lower) + if manual_count >= 2: + return "manual", "medium", "Contains instructional content indicators" + + # Default to other + return "other", "low", "No strong indicators for specific document type" + + +def _detect_tables(text: str) -> bool: + """Detect if document likely contains tables. + + Args: + text: Document text. + + Returns: + True if tables are detected. + """ + # Look for markdown table patterns or aligned data + table_patterns = [ + r"\|.*\|", # Markdown table + r"^\s*\S+\s{2,}\S+\s{2,}\S+", # Aligned columns + ] + for pattern in table_patterns: + if re.search(pattern, text, re.MULTILINE): + return True + return False + + +def _detect_toc(text: str) -> bool: + """Detect if document has a table of contents. + + Args: + text: Document text. + + Returns: + True if TOC is detected. + """ + # Look for "Table of Contents" or "Contents" header + if re.search(r"(Table of\s+)?Contents", text[:5000], re.IGNORECASE): + # Look for TOC-like patterns (title ... page number) + if re.search(r".+\.{3,}\s*\d+", text[:5000]): + return True + return False + + +def pdf_kb_ingest( + pdf_content: str, + title: str | None = None, + source_url: str | None = None, + content_type: str | None = None, + max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE, + max_file_size_mb: float = DEFAULT_MAX_FILE_SIZE_MB, + use_cache: bool = True, +) -> dict[str, Any]: + """One-shot PDF ingestion for knowledge bank. + + Combines extraction, classification, and chunking into a single operation + optimized for batch KB processing. The output format is designed to be + directly consumable by knowledge-bank-tools. + + Args: + pdf_content: Base64-encoded PDF content. + title: Document title. If None, will be extracted from PDF. + source_url: Source URL for the document. If None, uses a placeholder. + content_type: Document type. If None, will be auto-classified. + max_chunk_size: Maximum characters per chunk (default 4000). + max_file_size_mb: Maximum allowed file size in megabytes. + use_cache: Whether to use caching for previously processed documents. + + Returns: + Dictionary containing: + - success: Whether processing succeeded + - title: Document title + - content_type: Classified document type + - chunks: List of chunks, each with: + - text: Chunk content + - page_numbers: Pages this chunk spans + - word_count: Words in chunk + - char_count: Characters in chunk + - chunk_index: Chunk index (0-based) + - metadata: Document metadata including: + - page_count: Total pages + - word_count: Total words + - has_tables: Whether tables were detected + - has_toc: Whether TOC was detected + - classification_confidence: Confidence of auto-classification + - classification_reason: Reason for classification + - processing_stats: Processing statistics + - error: Error message if processing failed + + Raises: + ValueError: If pdf_content is empty or invalid. + """ + start_time = time.perf_counter() + + if not pdf_content: + raise ValueError("pdf_content cannot be empty") + + # Validate file size + _validate_base64_size(pdf_content, max_file_size_mb) + + # Check cache + cache = get_cache() + if use_cache: + params_str = f"kb_ingest:chunk:{max_chunk_size}:type:{content_type}" + params_hash = cache.compute_hash(params_str)[:16] + + cached_result = cache.get(pdf_content, "kb_ingest", params_hash=params_hash) + if cached_result: + cached_result["processing_stats"]["cache_hit"] = True + return cached_result + + logger.info( + "Starting KB ingestion: max_chunk_size=%d, content_type=%s", + max_chunk_size, + content_type or "auto", + ) + + # Extract text from PDF + extractor = TextExtractor() + extraction_result = extractor.extract_from_base64(pdf_content) + + if not extraction_result.success: + logger.error("PDF extraction failed: %s", extraction_result.error_message) + return KBIngestResult( + success=False, + title=title or "Unknown", + content_type=content_type or "unknown", + error=f"Failed to extract PDF text: {extraction_result.error_message}", + processing_stats={ + "processing_time_ms": (time.perf_counter() - start_time) * 1000, + "cache_hit": False, + }, + ).to_dict() + + full_text = extraction_result.combined_text + + if not full_text or not full_text.strip(): + logger.warning("PDF has no extractable text") + return KBIngestResult( + success=False, + title=title or "Unknown", + content_type=content_type or "unknown", + error="PDF contains no extractable text", + processing_stats={ + "processing_time_ms": (time.perf_counter() - start_time) * 1000, + "cache_hit": False, + }, + ).to_dict() + + # Extract or use provided title + doc_title = title or _extract_title_from_text(full_text, extraction_result.metadata.title) + + # Classify document if not provided + classification_confidence = "high" + classification_reason = "Provided by user" + + if content_type is None: + sample_text = full_text[:CLASSIFICATION_SAMPLE_CHARS] + content_type, classification_confidence, classification_reason = _classify_document_fast( + sample_text + ) + logger.info( + "Auto-classified document: type=%s, confidence=%s", + content_type, + classification_confidence, + ) + + # Chunk the text for KB ingestion + chunks = _chunk_text_for_kb(full_text, max_chunk_size) + + if not chunks: + logger.warning("No chunks generated from text") + return KBIngestResult( + success=False, + title=doc_title, + content_type=content_type, + error="No content chunks could be generated", + processing_stats={ + "processing_time_ms": (time.perf_counter() - start_time) * 1000, + "cache_hit": False, + }, + ).to_dict() + + # Detect document features + has_tables = _detect_tables(full_text) or extraction_result.processing_stats.table_count > 0 + has_toc = _detect_toc(full_text) + + processing_time_ms = (time.perf_counter() - start_time) * 1000 + + # Build result + result = KBIngestResult( + success=True, + title=doc_title, + content_type=content_type, + chunks=chunks, + metadata={ + "page_count": extraction_result.processing_stats.total_pages, + "word_count": extraction_result.processing_stats.word_count, + "has_tables": has_tables, + "has_toc": has_toc, + "classification_confidence": classification_confidence, + "classification_reason": classification_reason, + "source_url": source_url, + "extraction_method": extraction_result.processing_stats.extraction_method, + }, + processing_stats={ + "processing_time_ms": round(processing_time_ms, 2), + "pages_processed": extraction_result.processing_stats.pages_processed, + "total_pages": extraction_result.processing_stats.total_pages, + "chunk_count": len(chunks), + "total_chars": sum(c.char_count for c in chunks), + "total_words": sum(c.word_count for c in chunks), + "avg_chunk_size": round(sum(c.char_count for c in chunks) / len(chunks)), + "estimated_tokens": estimate_tokens(full_text), + "cache_hit": False, + }, + ) + + result_dict = result.to_dict() + + logger.info( + "KB ingestion complete: title='%s', type=%s, chunks=%d, words=%d, %.0fms", + doc_title[:50], + content_type, + len(chunks), + extraction_result.processing_stats.word_count, + processing_time_ms, + ) + + # Cache the result + if use_cache: + params_str = f"kb_ingest:chunk:{max_chunk_size}:type:{content_type}" + params_hash = cache.compute_hash(params_str)[:16] + cache.put( + pdf_content, + "kb_ingest", + result_dict, + params_hash=params_hash, + metadata={ + "title": doc_title, + "content_type": content_type, + "chunk_count": len(chunks), + }, + ) + + # Track usage + tracker = get_tracker() + tracker.record( + operation="kb_ingest", + model="pdfplumber", # No LLM tokens for basic extraction + input_tokens=0, + output_tokens=0, + processing_time_ms=processing_time_ms, + document_hash=cache.compute_hash(pdf_content), + success=True, + metadata={ + "title": doc_title, + "content_type": content_type, + "chunk_count": len(chunks), + "word_count": extraction_result.processing_stats.word_count, + }, + ) + + return result_dict + + +# Tool metadata for MCP registration +PDF_KB_INGEST_METADATA = { + "name": "pdf_kb_ingest", + "description": ( + "One-shot PDF ingestion for knowledge bank. " + "Combines extraction, classification, and chunking into a single operation. " + "Returns data formatted for knowledge-bank-tools batch ingestion." + ), + "parameters": { + "type": "object", + "properties": { + "pdf_content": { + "type": "string", + "description": "Base64-encoded PDF content", + }, + "title": { + "type": "string", + "description": "Document title. If not provided, will be extracted from PDF.", + }, + "source_url": { + "type": "string", + "description": "Source URL for the document.", + }, + "content_type": { + "type": "string", + "enum": [ + "research_paper", + "technical_doc", + "financial_report", + "legal_doc", + "manual", + "other", + ], + "description": "Document type. If not provided, will be auto-classified.", + }, + "max_chunk_size": { + "type": "integer", + "default": 4000, + "description": "Maximum characters per chunk (default 4000).", + }, + "max_file_size_mb": { + "type": "number", + "default": 50, + "description": "Maximum allowed file size in megabytes.", + }, + "use_cache": { + "type": "boolean", + "default": True, + "description": "Whether to use caching for previously processed documents.", + }, + }, + "required": ["pdf_content"], + }, +} diff --git a/tests/test_extract.py b/tests/test_extract.py index 39dbca3..50bcd62 100644 --- a/tests/test_extract.py +++ b/tests/test_extract.py @@ -317,5 +317,5 @@ def test_health_check_tool(self): result = health_check.fn() assert result["status"] == "healthy" - assert result["version"] == "0.3.0" + assert result["version"] == "0.4.0" assert "api_key_configured" in result diff --git a/tests/test_kb_ingest.py b/tests/test_kb_ingest.py new file mode 100644 index 0000000..50f70d7 --- /dev/null +++ b/tests/test_kb_ingest.py @@ -0,0 +1,620 @@ +"""Tests for PDF Knowledge Bank ingestion tool. + +This module tests the pdf_kb_ingest tool which performs one-shot +PDF processing for knowledge bank batch ingestion. +""" + +import base64 +import io +from unittest.mock import MagicMock, patch + +import pytest +from reportlab.lib.pagesizes import letter +from reportlab.pdfgen import canvas + +from document_analysis_mcp.tools.kb_ingest import ( + KBChunk, + KBIngestResult, + _chunk_text_for_kb, + _classify_heuristic, + _detect_tables, + _detect_toc, + _extract_page_numbers_from_text, + _extract_title_from_text, + _validate_base64_size, + pdf_kb_ingest, +) + + +def create_simple_pdf(text: str = "Hello World", num_pages: int = 1) -> bytes: + """Create a simple PDF with the given text. + + Args: + text: Text content for each page. + num_pages: Number of pages to create. + + Returns: + PDF file as bytes. + """ + buffer = io.BytesIO() + c = canvas.Canvas(buffer, pagesize=letter) + + for page_num in range(num_pages): + c.drawString(100, 700, f"{text} - Page {page_num + 1}") + if page_num < num_pages - 1: + c.showPage() + + c.save() + buffer.seek(0) + return buffer.read() + + +def create_pdf_with_content(pages_content: list[str]) -> bytes: + """Create a PDF with specific content on each page. + + Args: + pages_content: List of text content for each page. + + Returns: + PDF file as bytes. + """ + buffer = io.BytesIO() + c = canvas.Canvas(buffer, pagesize=letter) + + for i, content in enumerate(pages_content): + # Split content into lines and draw + lines = content.split("\n") + y_pos = 700 + for line in lines: + c.drawString(100, y_pos, line) + y_pos -= 15 + if i < len(pages_content) - 1: + c.showPage() + + c.save() + buffer.seek(0) + return buffer.read() + + +class TestKBChunk: + """Tests for KBChunk dataclass.""" + + def test_chunk_creation(self): + """Test creating a KBChunk.""" + chunk = KBChunk( + text="Sample text content", + page_numbers=[1, 2], + word_count=3, + char_count=19, + chunk_index=0, + ) + assert chunk.text == "Sample text content" + assert chunk.page_numbers == [1, 2] + assert chunk.word_count == 3 + assert chunk.char_count == 19 + assert chunk.chunk_index == 0 + + +class TestKBIngestResult: + """Tests for KBIngestResult dataclass.""" + + def test_result_to_dict(self): + """Test converting KBIngestResult to dictionary.""" + chunk = KBChunk( + text="Test chunk", + page_numbers=[1], + word_count=2, + char_count=10, + chunk_index=0, + ) + result = KBIngestResult( + success=True, + title="Test Document", + content_type="research_paper", + chunks=[chunk], + metadata={"page_count": 1}, + processing_stats={"processing_time_ms": 100}, + ) + + result_dict = result.to_dict() + + assert result_dict["success"] is True + assert result_dict["title"] == "Test Document" + assert result_dict["content_type"] == "research_paper" + assert len(result_dict["chunks"]) == 1 + assert result_dict["chunks"][0]["text"] == "Test chunk" + assert result_dict["metadata"]["page_count"] == 1 + + def test_result_with_error(self): + """Test KBIngestResult with error.""" + result = KBIngestResult( + success=False, + title="Unknown", + content_type="unknown", + error="PDF extraction failed", + ) + + result_dict = result.to_dict() + + assert result_dict["success"] is False + assert result_dict["error"] == "PDF extraction failed" + assert result_dict["chunks"] == [] + + +class TestValidateBase64Size: + """Tests for _validate_base64_size function.""" + + def test_small_file_passes(self): + """Test that small files pass validation.""" + small_content = "A" * 1024 + _validate_base64_size(small_content, max_file_size_mb=1.0) + + def test_large_file_rejected(self): + """Test that files exceeding limit are rejected.""" + large_content = "A" * (2 * 1024 * 1024) + with pytest.raises(ValueError, match="exceeds maximum size limit"): + _validate_base64_size(large_content, max_file_size_mb=1.0) + + +class TestExtractTitleFromText: + """Tests for _extract_title_from_text function.""" + + def test_uses_metadata_title(self): + """Test that metadata title is preferred.""" + text = "Some random content" + title = _extract_title_from_text(text, "Metadata Title") + assert title == "Metadata Title" + + def test_extracts_from_first_line(self): + """Test extracting title from first meaningful line.""" + text = "Introduction to Machine Learning\n\nThis is the content..." + title = _extract_title_from_text(text, None) + assert title == "Introduction to Machine Learning" + + def test_skips_short_lines(self): + """Test that short lines are skipped.""" + text = "\n\nABC\n\nActual Document Title Here\n\nContent..." + title = _extract_title_from_text(text, None) + assert title == "Actual Document Title Here" + + def test_fallback_to_untitled(self): + """Test fallback when no title found.""" + text = "" + title = _extract_title_from_text(text, None) + assert title == "Untitled Document" + + +class TestExtractPageNumbers: + """Tests for _extract_page_numbers_from_text function.""" + + def test_extracts_page_markers(self): + """Test extracting page boundaries.""" + text = "[Page 1]\nContent page 1\n\n[Page 2]\nContent page 2" + pages = _extract_page_numbers_from_text(text) + + assert 1 in pages + assert 2 in pages + assert len(pages) == 2 + + def test_no_page_markers(self): + """Test text without page markers.""" + text = "Just plain text without markers" + pages = _extract_page_numbers_from_text(text) + assert len(pages) == 0 + + +class TestChunkTextForKB: + """Tests for _chunk_text_for_kb function.""" + + def test_small_text_single_chunk(self): + """Test that small text produces single chunk.""" + text = "[Page 1]\nThis is a small document with not much content." + chunks = _chunk_text_for_kb(text, max_chunk_size=1000) + + assert len(chunks) == 1 + assert chunks[0].chunk_index == 0 + assert 1 in chunks[0].page_numbers + + def test_large_text_multiple_chunks(self): + """Test that large text produces multiple chunks.""" + # Create text larger than chunk size + paragraph = "This is a paragraph with some content. " * 50 + text = f"[Page 1]\n{paragraph}\n\n{paragraph}" + chunks = _chunk_text_for_kb(text, max_chunk_size=500) + + assert len(chunks) > 1 + # Verify chunk indices are sequential + for i, chunk in enumerate(chunks): + assert chunk.chunk_index == i + + def test_respects_max_chunk_size(self): + """Test that chunks respect maximum size.""" + paragraph = "Test content. " * 200 + text = f"[Page 1]\n{paragraph}" + max_size = 500 + chunks = _chunk_text_for_kb(text, max_chunk_size=max_size) + + for chunk in chunks: + assert chunk.char_count <= max_size + 100 # Allow some tolerance + + def test_empty_text_returns_empty(self): + """Test that empty text returns no chunks.""" + chunks = _chunk_text_for_kb("") + assert chunks == [] + + chunks = _chunk_text_for_kb(" ") + assert chunks == [] + + def test_tracks_word_count(self): + """Test that word count is tracked correctly.""" + text = "[Page 1]\nOne two three four five" + chunks = _chunk_text_for_kb(text, max_chunk_size=1000) + + assert len(chunks) == 1 + assert chunks[0].word_count == 5 + + +class TestClassifyHeuristic: + """Tests for _classify_heuristic function.""" + + def test_research_paper_detection(self): + """Test detection of research papers.""" + text = """ + Abstract: This paper presents a novel methodology for analyzing + data. Our hypothesis suggests improved results. The conclusion + supports our findings. References are listed below. + """ + doc_type, confidence, reason = _classify_heuristic(text) + assert doc_type == "research_paper" + assert "academic" in reason.lower() + + def test_technical_doc_detection(self): + """Test detection of technical documentation.""" + text = """ + API Documentation + Endpoint: /api/v1/users + Parameters: id, name, email + This function returns the user object. + """ + doc_type, confidence, reason = _classify_heuristic(text) + assert doc_type == "technical_doc" + + def test_financial_report_detection(self): + """Test detection of financial reports.""" + text = """ + Q4 2025 Earnings Report + Revenue increased by 15% this quarter. + Profit margins improved significantly. + """ + doc_type, confidence, reason = _classify_heuristic(text) + assert doc_type == "financial_report" + + def test_legal_doc_detection(self): + """Test detection of legal documents.""" + text = """ + This Agreement is entered into by and between the parties. + WHEREAS the parties wish to establish terms and conditions, + liability shall be limited as described herein. + """ + doc_type, confidence, reason = _classify_heuristic(text) + assert doc_type == "legal_doc" + + def test_manual_detection(self): + """Test detection of manuals/guides.""" + text = """ + Step 1: Follow these instructions carefully. + Step 2: This guide will help you understand the procedure. + How to complete the tutorial successfully. + """ + doc_type, confidence, reason = _classify_heuristic(text) + assert doc_type == "manual" + + def test_other_fallback(self): + """Test fallback to 'other' type.""" + text = "Random text without clear indicators of document type." + doc_type, confidence, reason = _classify_heuristic(text) + assert doc_type == "other" + assert confidence == "low" + + +class TestDetectTables: + """Tests for _detect_tables function.""" + + def test_detects_markdown_table(self): + """Test detection of markdown tables.""" + text = """ + | Header 1 | Header 2 | + |----------|----------| + | Value 1 | Value 2 | + """ + assert _detect_tables(text) is True + + def test_no_tables(self): + """Test when no tables present.""" + text = "Just plain text without any tables." + assert _detect_tables(text) is False + + +class TestDetectToc: + """Tests for _detect_toc function.""" + + def test_detects_toc(self): + """Test detection of table of contents.""" + text = """ + Table of Contents + + Introduction ........................ 1 + Chapter 1 ........................... 5 + Chapter 2 ........................... 10 + """ + assert _detect_toc(text) is True + + def test_no_toc(self): + """Test when no TOC present.""" + text = "Just regular content without a table of contents." + assert _detect_toc(text) is False + + +class TestPdfKbIngest: + """Tests for pdf_kb_ingest function.""" + + def test_empty_content_raises(self): + """Test that empty content raises ValueError.""" + with pytest.raises(ValueError, match="cannot be empty"): + pdf_kb_ingest("") + + def test_successful_ingestion(self): + """Test successful PDF ingestion.""" + pdf_bytes = create_simple_pdf("Test Document Content", num_pages=2) + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest(pdf_b64, use_cache=False) + + assert result["success"] is True + assert "title" in result + assert "content_type" in result + assert "chunks" in result + assert len(result["chunks"]) >= 1 + assert "metadata" in result + assert "processing_stats" in result + + def test_with_provided_title(self): + """Test ingestion with provided title.""" + pdf_bytes = create_simple_pdf("Content") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest( + pdf_b64, + title="My Custom Title", + use_cache=False, + ) + + assert result["success"] is True + assert result["title"] == "My Custom Title" + + def test_with_provided_content_type(self): + """Test ingestion with provided content type.""" + pdf_bytes = create_simple_pdf("Content") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest( + pdf_b64, + content_type="research_paper", + use_cache=False, + ) + + assert result["success"] is True + assert result["content_type"] == "research_paper" + + def test_with_source_url(self): + """Test ingestion with source URL.""" + pdf_bytes = create_simple_pdf("Content") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest( + pdf_b64, + source_url="https://example.com/doc.pdf", + use_cache=False, + ) + + assert result["success"] is True + assert result["metadata"]["source_url"] == "https://example.com/doc.pdf" + + def test_custom_chunk_size(self): + """Test ingestion with custom chunk size.""" + # Create PDF with more content + pdf_bytes = create_pdf_with_content( + [ + "This is page one with some content. " * 50, + "This is page two with more content. " * 50, + ] + ) + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest( + pdf_b64, + max_chunk_size=500, + use_cache=False, + ) + + assert result["success"] is True + # Should have multiple chunks with smaller size + if result["processing_stats"]["total_chars"] > 500: + assert len(result["chunks"]) > 1 + + def test_invalid_base64(self): + """Test handling of invalid base64 input.""" + result = pdf_kb_ingest("not-valid-base64!!!", use_cache=False) + + assert result["success"] is False + assert "error" in result + + def test_processing_stats_included(self): + """Test that processing stats are included.""" + pdf_bytes = create_simple_pdf("Test Content") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest(pdf_b64, use_cache=False) + + assert result["success"] is True + stats = result["processing_stats"] + assert "processing_time_ms" in stats + assert "pages_processed" in stats + assert "chunk_count" in stats + assert "total_chars" in stats + assert "total_words" in stats + + def test_metadata_extraction(self): + """Test that document metadata is extracted.""" + pdf_bytes = create_simple_pdf("Test Content", num_pages=3) + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest(pdf_b64, use_cache=False) + + assert result["success"] is True + metadata = result["metadata"] + assert "page_count" in metadata + assert "word_count" in metadata + assert "has_tables" in metadata + assert "has_toc" in metadata + assert "classification_confidence" in metadata + assert "classification_reason" in metadata + + def test_chunk_structure(self): + """Test that chunks have correct structure.""" + pdf_bytes = create_simple_pdf("Test Content for chunking") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest(pdf_b64, use_cache=False) + + assert result["success"] is True + assert len(result["chunks"]) >= 1 + + for chunk in result["chunks"]: + assert "text" in chunk + assert "page_numbers" in chunk + assert "word_count" in chunk + assert "char_count" in chunk + assert "chunk_index" in chunk + assert isinstance(chunk["page_numbers"], list) + assert chunk["word_count"] >= 0 + assert chunk["char_count"] >= 0 + + @patch("document_analysis_mcp.tools.kb_ingest.get_cache") + def test_caching_behavior(self, mock_get_cache): + """Test that caching is used when enabled.""" + mock_cache = MagicMock() + mock_cache.get.return_value = None + mock_cache.compute_hash.return_value = "test_hash" + mock_get_cache.return_value = mock_cache + + pdf_bytes = create_simple_pdf("Test Content") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + pdf_kb_ingest(pdf_b64, use_cache=True) + + # Verify cache.put was called + assert mock_cache.put.called + + @patch("document_analysis_mcp.tools.kb_ingest.get_cache") + def test_cache_disabled(self, mock_get_cache): + """Test that cache is not used when disabled.""" + mock_cache = MagicMock() + # compute_hash returns a string, which is used for document_hash in tracker + mock_cache.compute_hash.return_value = "test_hash_123" + mock_get_cache.return_value = mock_cache + + pdf_bytes = create_simple_pdf("Test Content") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + pdf_kb_ingest(pdf_b64, use_cache=False) + + # Verify cache.get was not called + assert not mock_cache.get.called + + +class TestPdfKbIngestIntegration: + """Integration-style tests for pdf_kb_ingest.""" + + def test_multipage_document(self): + """Test ingestion of multi-page document.""" + pdf_bytes = create_pdf_with_content( + [ + "Page 1: Introduction to the topic.", + "Page 2: Detailed analysis of the subject.", + "Page 3: Conclusion and recommendations.", + ] + ) + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest(pdf_b64, use_cache=False) + + assert result["success"] is True + assert result["metadata"]["page_count"] == 3 + + def test_auto_classification(self): + """Test auto-classification of document type.""" + # Create research-paper-like content + pdf_bytes = create_pdf_with_content( + [ + "Abstract: This study examines the methodology for data analysis.", + "Methodology: We applied statistical hypothesis testing.", + "Conclusion: The results support our hypothesis.", + "References: Smith et al. (2023)", + ] + ) + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest(pdf_b64, use_cache=False) + + assert result["success"] is True + # Should be classified based on content + assert result["content_type"] in [ + "research_paper", + "technical_doc", + "other", + ] + + def test_large_document_chunking(self): + """Test that large documents are properly chunked.""" + # Create document with substantial content + pages = [f"Page {i}: " + "Content paragraph. " * 100 for i in range(5)] + pdf_bytes = create_pdf_with_content(pages) + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_kb_ingest( + pdf_b64, + max_chunk_size=1000, + use_cache=False, + ) + + assert result["success"] is True + # Should have multiple chunks + assert len(result["chunks"]) > 1 + # Verify chunks cover all content + total_chars = sum(c["char_count"] for c in result["chunks"]) + assert total_chars > 0 + + +class TestServerToolRegistration: + """Tests for server tool registration.""" + + def test_kb_ingest_tool_registered(self): + """Test that pdf_kb_ingest_tool is registered in server.""" + from document_analysis_mcp.server import pdf_kb_ingest_tool + + assert pdf_kb_ingest_tool is not None + # FastMCP decorates functions, creating FunctionTool objects + assert hasattr(pdf_kb_ingest_tool, "fn") + + def test_kb_ingest_tool_callable(self): + """Test that the tool can be called.""" + from document_analysis_mcp.server import pdf_kb_ingest_tool + + pdf_bytes = create_simple_pdf("Test") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + # Call the underlying function + result = pdf_kb_ingest_tool.fn(pdf_b64, use_cache=False) + + assert result["success"] is True