From 784018398a22e7bb7cbac46e4c20e67b67e52f17 Mon Sep 17 00:00:00 2001 From: Alessio Vertemati Date: Thu, 8 Jan 2026 20:40:34 +0100 Subject: [PATCH 1/6] Initial refactoring to isolate PDF interactions --- src/parxy_cli/commands/attach.py | 493 +++++++++++--------------- src/parxy_cli/commands/pdf.py | 219 +++--------- src/parxy_cli/services/__init__.py | 19 + src/parxy_cli/services/pdf_service.py | 314 ++++++++++++++++ src/parxy_cli/services/pdf_utils.py | 168 +++++++++ tests/commands/test_pdf.py | 7 +- tests/services/test_pdf_service.py | 351 ++++++++++++++++++ tests/services/test_pdf_utils.py | 253 +++++++++++++ 8 files changed, 1350 insertions(+), 474 deletions(-) create mode 100644 src/parxy_cli/services/__init__.py create mode 100644 src/parxy_cli/services/pdf_service.py create mode 100644 src/parxy_cli/services/pdf_utils.py create mode 100644 tests/services/test_pdf_service.py create mode 100644 tests/services/test_pdf_utils.py diff --git a/src/parxy_cli/commands/attach.py b/src/parxy_cli/commands/attach.py index 08d36a0..5b17995 100644 --- a/src/parxy_cli/commands/attach.py +++ b/src/parxy_cli/commands/attach.py @@ -5,9 +5,14 @@ import sys import typer -import pymupdf from parxy_cli.console.console import Console +from parxy_cli.services import ( + PdfService, + format_file_size, + validate_pdf_file, + is_binary_file, +) app = typer.Typer() console = Console() @@ -18,65 +23,6 @@ # ============================================================================ -def format_file_size(size_bytes: int) -> str: - """ - Convert bytes to human-readable format. - - Args: - size_bytes: Size in bytes - - Returns: - Formatted string like "1.5 MB" - """ - size = float(size_bytes) - for unit in ['B', 'KB', 'MB', 'GB']: - if size < 1024.0: - return f'{size:.1f} {unit}' - size /= 1024.0 - return f'{size:.1f} TB' - - -def validate_pdf_file(file_path: str) -> Path: - """ - Validate PDF file exists and has .pdf extension. - - Args: - file_path: Path to PDF file - - Returns: - Path object - - Raises: - FileNotFoundError: if file not found - ValueError: if file is not a PDF - """ - path = Path(file_path) - if not path.is_file(): - console.error(f'Input file not found: {file_path}', panel=True) - raise FileNotFoundError(f'Input file not found: {file_path}') - if path.suffix.lower() != '.pdf': - console.error(f'Input file must be a PDF: {file_path}', panel=True) - raise ValueError(f'Input file must be a PDF: {file_path}') - return path - - -def is_binary_file(content: bytes) -> bool: - """ - Detect if content is binary by checking for null bytes. - - Checks first 8KB of content for null bytes which typically - indicate binary data. - - Args: - content: File content as bytes - - Returns: - True if binary, False if likely text - """ - check_bytes = content[:8192] - return b'\x00' in check_bytes - - def prompt_overwrite(file_path: Path) -> bool: """ Prompt user to confirm overwriting existing file. @@ -132,46 +78,41 @@ def list_attachments( # Validate input file input_path = validate_pdf_file(input_file) - # Open PDF - doc = pymupdf.open(input_path) + # Use service to list attachments + with PdfService(input_path) as pdf: + embed_names = pdf.list_attachments() - # Get list of embedded files - embed_names = doc.embfile_names() + if not embed_names: + console.newline() + console.info(f'No attached files found in {input_path.name}') + return - if not embed_names: + # Display count + count = len(embed_names) + console.newline() + console.info( + f'Found {count} attached file{"s" if count != 1 else ""} in {input_path.name}:' + ) console.newline() - console.info(f'No attached files found in {input_path.name}') - doc.close() - return - - # Display count - count = len(embed_names) - console.newline() - console.info( - f'Found {count} attached file{"s" if count != 1 else ""} in {input_path.name}:' - ) - console.newline() - # Display each embed - for name in embed_names: - if verbose: - # Get metadata - info = doc.embfile_info(name) - size = info.get('size', 0) - size_str = format_file_size(size) - desc = info.get('description', '') + # Display each embed + for name in embed_names: + if verbose: + # Get metadata + info = pdf.get_attachment_info(name) + size = info.get('size', 0) + size_str = format_file_size(size) + desc = info.get('description', '') - # Build output string - output_parts = [f'[faint]⎿ [/faint]{name} ({size_str})'] + # Build output string + output_parts = [f'[faint]⎿ [/faint]{name} ({size_str})'] - if desc: - output_parts.append(f' - {desc}') + if desc: + output_parts.append(f' - {desc}') - console.print(''.join(output_parts)) - else: - console.print(f'[faint]⎿ [/faint]{name}') - - doc.close() + console.print(''.join(output_parts)) + else: + console.print(f'[faint]⎿ [/faint]{name}') except (FileNotFoundError, ValueError): raise typer.Exit(1) @@ -249,113 +190,104 @@ def remove_attachment( # Validate input file input_path = validate_pdf_file(input_file) - # Open PDF - doc = pymupdf.open(input_path) - # Get list of all embeds - all_embeds = doc.embfile_names() + # Use service to handle attachments + with PdfService(input_path) as pdf: + # Get list of all embeds + all_embeds = pdf.list_attachments() - if not all_embeds: - console.newline() - console.error(f'No attached files found in {input_path.name}', panel=True) - doc.close() - raise ValueError(f'No attached files found in {input_path.name}') - - # Determine which embeds to remove - if all: - embeds_to_remove = all_embeds - - # Show confirmation prompt - console.newline() - count = len(embeds_to_remove) - - if count <= 2: - # Show all embeds - console.print( - f'This will remove the following attached file{"s" if count != 1 else ""} from {input_path.name}:' - ) - for name in embeds_to_remove: - console.print(f'[faint]⎿ [/faint]{name}') - else: - # Show first + count - console.print( - f'This will remove the following attached file{"s" if count != 1 else ""} from {input_path.name}:' - ) - console.print( - f'[faint]⎿ [/faint]{embeds_to_remove[0]} and {count - 1} more' - ) + if not all_embeds: + console.newline() + console.error(f'No attached files found in {input_path.name}', panel=True) + raise ValueError(f'No attached files found in {input_path.name}') - console.newline() - confirm = typer.prompt('Continue? [y/N]', default='n') + # Determine which embeds to remove + if all: + embeds_to_remove = all_embeds - if confirm.lower() not in ['y', 'yes']: - console.info('Operation cancelled') - doc.close() - raise typer.Exit(0) - else: - embeds_to_remove = names if names else [] + # Show confirmation prompt + console.newline() + count = len(embeds_to_remove) - # Validate each embed exists - for name in embeds_to_remove: - if name not in all_embeds: - console.newline() - console.error( - f"Attachment '{name}' not found in {input_path.name}", - panel=True, + if count <= 2: + # Show all embeds + console.print( + f'This will remove the following attached file{"s" if count != 1 else ""} from {input_path.name}:' ) - console.newline() - console.print('Available attachments:') - for available in all_embeds: - console.print(f'[faint]⎿ [/faint]{available}') - doc.close() - raise ValueError( - f"Attachment '{name}' not found in {input_path.name}" + for name in embeds_to_remove: + console.print(f'[faint]⎿ [/faint]{name}') + else: + # Show first + count + console.print( + f'This will remove the following attached file{"s" if count != 1 else ""} from {input_path.name}:' + ) + console.print( + f'[faint]⎿ [/faint]{embeds_to_remove[0]} and {count - 1} more' ) - # Determine output path - if output is None: - output_path = input_path.parent / f'{input_path.stem}_no_attachments.pdf' - else: - output_path = Path(output) + console.newline() + confirm = typer.prompt('Continue? [y/N]', default='n') - # Make absolute if relative - if not output_path.is_absolute(): - output_path = input_path.parent / output_path + if confirm.lower() not in ['y', 'yes']: + console.info('Operation cancelled') + raise typer.Exit(0) + else: + embeds_to_remove = names if names else [] - # Ensure .pdf extension - if output_path.suffix.lower() != '.pdf': - output_path = output_path.with_suffix('.pdf') + # Validate each embed exists + for name in embeds_to_remove: + if name not in all_embeds: + console.newline() + console.error( + f"Attachment '{name}' not found in {input_path.name}", + panel=True, + ) + console.newline() + console.print('Available attachments:') + for available in all_embeds: + console.print(f'[faint]⎿ [/faint]{available}') + raise ValueError( + f"Attachment '{name}' not found in {input_path.name}" + ) - # Check if output exists - if output_path.exists(): - if not prompt_overwrite(output_path): - console.info('Operation cancelled') - doc.close() - raise typer.Exit(0) + # Determine output path + if output is None: + output_path = input_path.parent / f'{input_path.stem}_no_attachments.pdf' + else: + output_path = Path(output) - # Create output directory if needed - output_path.parent.mkdir(parents=True, exist_ok=True) + # Make absolute if relative + if not output_path.is_absolute(): + output_path = input_path.parent / output_path - console.newline() - with console.shimmer(f'Removing {len(embeds_to_remove)} attachment(s)...'): - # Remove each attachment - for name in embeds_to_remove: - doc.embfile_del(name) - console.print(f'[faint]⎿ [/faint]Removed {name}') + # Ensure .pdf extension + if output_path.suffix.lower() != '.pdf': + output_path = output_path.with_suffix('.pdf') + + # Check if output exists + if output_path.exists(): + if not prompt_overwrite(output_path): + console.info('Operation cancelled') + raise typer.Exit(0) - # Save the modified PDF - doc.save(str(output_path)) + console.newline() + with console.shimmer(f'Removing {len(embeds_to_remove)} attachment(s)...'): + # Remove each attachment + for name in embeds_to_remove: + pdf.remove_attachment(name) + console.print(f'[faint]⎿ [/faint]Removed {name}') - doc.close() + # Save the modified PDF + pdf.save(output_path) - console.newline() - console.success( - f'Successfully removed {len(embeds_to_remove)} attachment{"s" if len(embeds_to_remove) != 1 else ""} from {output_path}' - ) + console.newline() + console.success( + f'Successfully removed {len(embeds_to_remove)} attachment{"s" if len(embeds_to_remove) != 1 else ""} from {output_path}' + ) except typer.Exit: raise - except (FileNotFoundError, ValueError): + except (FileNotFoundError, ValueError, KeyError): raise typer.Exit(1) except Exception as e: console.error(f'Error processing PDF: {str(e)}') @@ -454,11 +386,6 @@ def add_attachment( console.error(f'File not found: {file_str}', panel=True) raise FileNotFoundError(f'File not found: {file_str}') file_paths.append(file_path) - # Open PDF - doc = pymupdf.open(input_path) - - # Get existing attachments - existing_embeds = doc.embfile_names() console.newline() console.info( @@ -466,84 +393,77 @@ def add_attachment( ) console.newline() - # Process each file to attach - with console.shimmer('Adding attachments...'): - for idx, file_path in enumerate(file_paths): - # Determine attachment name - embed_name = file_path.name - if name is not None and idx < len(name): - embed_name = name[idx] - - # Check if attachment already exists - if embed_name in existing_embeds: - if not overwrite: - console.newline() - console.error( - f"Attachment '{embed_name}' already exists in {input_path.name}", - panel=True, - ) - console.newline() - console.print('Use --overwrite to replace it') - doc.close() - raise ValueError( - f"Attachment '{embed_name}' already exists. Use --overwrite to replace it" - ) - else: - # Delete existing attachment before adding new one - doc.embfile_del(embed_name) - - # Get description - embed_desc = '' - if description is not None and idx < len(description): - embed_desc = description[idx] - - # Read file content - with open(file_path, 'rb') as f: - file_content = f.read() - - # Add attachment - doc.embfile_add( - name=embed_name, - buffer_=file_content, - filename=file_path.name, - desc=embed_desc, - ) + # Use service to handle attachments + with PdfService(input_path) as pdf: + # Get existing attachments + existing_embeds = pdf.list_attachments() + + # Process each file to attach + with console.shimmer('Adding attachments...'): + for idx, file_path in enumerate(file_paths): + # Determine attachment name + embed_name = file_path.name + if name is not None and idx < len(name): + embed_name = name[idx] + + # Check if attachment already exists + if embed_name in existing_embeds: + if not overwrite: + console.newline() + console.error( + f"Attachment '{embed_name}' already exists in {input_path.name}", + panel=True, + ) + console.newline() + console.print('Use --overwrite to replace it') + raise ValueError( + f"Attachment '{embed_name}' already exists. Use --overwrite to replace it" + ) + else: + # Delete existing attachment before adding new one + pdf.remove_attachment(embed_name) + + # Get description + embed_desc = '' + if description is not None and idx < len(description): + embed_desc = description[idx] + + # Add attachment using service + pdf.add_attachment(file_path, name=embed_name, desc=embed_desc) + + # Get file size for display + file_size = file_path.stat().st_size + size_str = format_file_size(file_size) + + # Build output message + msg_parts = [f'[faint]⎿ [/faint]Added {embed_name} ({size_str})'] + if embed_desc: + msg_parts.append(f' - {embed_desc}') + + console.print(''.join(msg_parts)) + + # Determine output path + if output is None: + output_path = input_path.parent / f'{input_path.stem}_with_attachments.pdf' + else: + output_path = Path(output) - # Build output message - size_str = format_file_size(len(file_content)) - msg_parts = [f'[faint]⎿ [/faint]Added {embed_name} ({size_str})'] - if embed_desc: - msg_parts.append(f' - {embed_desc}') + # Make absolute if relative + if not output_path.is_absolute(): + output_path = input_path.parent / output_path - console.print(''.join(msg_parts)) + # Ensure .pdf extension + if output_path.suffix.lower() != '.pdf': + output_path = output_path.with_suffix('.pdf') - # Determine output path - if output is None: - output_path = input_path.parent / f'{input_path.stem}_with_attachments.pdf' - else: - output_path = Path(output) + # Check if output exists + if output_path.exists(): + if not prompt_overwrite(output_path): + console.info('Operation cancelled') + raise typer.Exit(0) - # Make absolute if relative - if not output_path.is_absolute(): - output_path = input_path.parent / output_path - - # Ensure .pdf extension - if output_path.suffix.lower() != '.pdf': - output_path = output_path.with_suffix('.pdf') - - # Check if output exists - if output_path.exists(): - if not prompt_overwrite(output_path): - console.info('Operation cancelled') - doc.close() - raise typer.Exit(0) - - # Create output directory if needed - output_path.parent.mkdir(parents=True, exist_ok=True) - - # Save the PDF - doc.save(str(output_path)) - doc.close() + # Save the PDF + pdf.save(output_path) console.newline() console.success( @@ -552,7 +472,7 @@ def add_attachment( except typer.Exit: raise - except (FileNotFoundError, ValueError): + except (FileNotFoundError, ValueError, KeyError): raise typer.Exit(1) except Exception as e: console.error(f'Error processing PDF: {str(e)}') @@ -608,33 +528,31 @@ def read_attachment( # Validate input file input_path = validate_pdf_file(input_file) - # Open PDF - doc = pymupdf.open(input_path) + # Use service to extract attachment + with PdfService(input_path) as pdf: + # Get list of attachments + embed_names = pdf.list_attachments() - # Get list of attachments - embed_names = doc.embfile_names() - - # Validate attachment exists - if name not in embed_names: - console.newline() - console.error( - f"Attachment '{name}' not found in {input_path.name}", panel=True - ) - - if embed_names: - console.newline() - console.print('Available attachments:') - for available in embed_names: - console.print(f'[faint]⎿ [/faint]{available}') - else: + # Validate attachment exists + if name not in embed_names: console.newline() - console.print('No attached files found in this PDF') + console.error( + f"Attachment '{name}' not found in {input_path.name}", panel=True + ) + + if embed_names: + console.newline() + console.print('Available attachments:') + for available in embed_names: + console.print(f'[faint]⎿ [/faint]{available}') + else: + console.newline() + console.print('No attached files found in this PDF') - doc.close() - raise ValueError(f"Attachment '{name}' not found in {input_path.name}") + raise ValueError(f"Attachment '{name}' not found in {input_path.name}") - # Extract content - content = doc.embfile_get(name) + # Extract content + content = pdf.extract_attachment(name) # Handle stdout mode if stdout: @@ -645,12 +563,10 @@ def read_attachment( f"Cannot output binary file to stdout.\nFile '{name}' appears to be binary.\nUse -o to save to a file instead.", panel=True, ) - doc.close() raise ValueError(f"Cannot output binary file '{name}' to stdout") # Output to stdout sys.stdout.buffer.write(content) - doc.close() return # Determine output path @@ -663,7 +579,6 @@ def read_attachment( if output_path.exists(): if not prompt_overwrite(output_path): console.info('Operation cancelled') - doc.close() raise typer.Exit(0) # Create output directory if needed @@ -673,8 +588,6 @@ def read_attachment( with open(output_path, 'wb') as f: f.write(content) - doc.close() - console.newline() size_str = format_file_size(len(content)) console.success( @@ -683,7 +596,7 @@ def read_attachment( except typer.Exit: raise - except (FileNotFoundError, ValueError): + except (FileNotFoundError, ValueError, KeyError): raise typer.Exit(1) except Exception as e: console.error(f'Error extracting attachment: {str(e)}') diff --git a/src/parxy_cli/commands/pdf.py b/src/parxy_cli/commands/pdf.py index 700bfd6..a2847e8 100644 --- a/src/parxy_cli/commands/pdf.py +++ b/src/parxy_cli/commands/pdf.py @@ -1,119 +1,18 @@ """PDF manipulation commands.""" -import re from pathlib import Path -from typing import List, Annotated, Optional, Tuple +from typing import List, Annotated, Optional import typer -import pymupdf from parxy_cli.console.console import Console +from parxy_cli.services import PdfService, collect_pdf_files_with_ranges app = typer.Typer() console = Console() -def parse_input_with_pages( - input_str: str, -) -> Tuple[str, Optional[int], Optional[int]]: - """ - Parse input string to extract file path and page range. - - Supports formats: - - file.pdf[1] - single page (1-based) - - file.pdf[:2] - from start to page 2 (1-based, inclusive) - - file.pdf[3:] - from page 3 to end (1-based) - - file.pdf[3:5] - from page 3 to 5 (1-based, inclusive) - - file.pdf - all pages - - Args: - input_str: Input string with optional page range - - Returns: - Tuple of (file_path, from_page, to_page) where pages are 0-based for PyMuPDF. - from_page and to_page are None if no range specified or represent the range to use. - """ - # Match pattern: filename[range] - pattern = r'^(.+?)\[([^\]]+)\]$' - match = re.match(pattern, input_str) - - if not match: - # No page range specified - return input_str, None, None - - file_path = match.group(1) - page_range = match.group(2) - - # Parse the page range - if ':' in page_range: - # Range format [start:end] - parts = page_range.split(':', 1) - start_str = parts[0].strip() - end_str = parts[1].strip() - - # Convert to 0-based indices - # PyMuPDF uses 0-based indexing - from_page = (int(start_str) - 1) if start_str else 0 - to_page = (int(end_str) - 1) if end_str else None # None means last page - - else: - # Single page [n] - page_num = int(page_range) - 1 # Convert to 0-based - from_page = page_num - to_page = page_num - - return file_path, from_page, to_page - - -def collect_pdf_files_with_ranges( - inputs: List[str], -) -> List[Tuple[Path, Optional[int], Optional[int]]]: - """ - Collect PDF files from the input list with optional page ranges. - - For folders, only files in the exact directory are collected (non-recursive). - For files with page ranges (e.g., file.pdf[1:3]), parse and extract the range. - - Args: - inputs: List of file paths (with optional page ranges) and/or folder paths - - Returns: - List of tuples: (Path, from_page, to_page) where pages are 0-based. - from_page and to_page are None if all pages should be included. - """ - files = [] - - for input_str in inputs: - # Parse the input to extract file path and page range - file_path_str, from_page, to_page = parse_input_with_pages(input_str) - path = Path(file_path_str) - - if path.is_file(): - # Check if it's a PDF - if path.suffix.lower() == '.pdf': - files.append((path, from_page, to_page)) - else: - console.warning(f'Skipping non-PDF file: {file_path_str}') - elif path.is_dir(): - # Non-recursive: only files in the given directory - # Directories cannot have page ranges - if from_page is not None or to_page is not None: - console.warning( - f'Page ranges are not supported for directories: {input_str}' - ) - pdf_files = sorted(path.glob('*.pdf')) - if pdf_files: - # Add all PDFs from directory without page ranges - files.extend([(f, None, None) for f in pdf_files]) - else: - console.warning(f'No PDF files found in directory: {file_path_str}') - else: - console.warning(f'Path not found: {file_path_str}') - - return files - - @app.command(name='pdf:merge', help='Merge multiple PDF files into a single PDF') def merge( inputs: Annotated[ @@ -205,77 +104,40 @@ def merge( # Create output directory if it doesn't exist output_path.parent.mkdir(parents=True, exist_ok=True) - # Merge PDFs + # Merge PDFs using service try: with console.shimmer(f'Merging {len(files_with_ranges)} PDF files...'): - merged_pdf = pymupdf.open() - + # Display progress for each file for file_path, from_page, to_page in files_with_ranges: - try: - pdf = pymupdf.open(file_path) - - # Determine page range to insert - if from_page is None and to_page is None: - # Insert all pages - page_info = 'all pages' - merged_pdf.insert_pdf(pdf) + # Determine page range info for display + if from_page is None and to_page is None: + page_info = 'all pages' + else: + actual_from = from_page if from_page is not None else 0 + actual_to = to_page if to_page is not None else 'end' + + if from_page == to_page: + page_info = f'page {from_page + 1}' + elif to_page is None: + page_info = f'pages {actual_from + 1}-end' else: - # Insert specific page range - # PyMuPDF insert_pdf uses from_page and to_page (inclusive, 0-based) - actual_from = from_page if from_page is not None else 0 - actual_to = to_page if to_page is not None else (len(pdf) - 1) - - # Validate page range - if actual_from < 0 or actual_from >= len(pdf): - console.warning( - f'Invalid page range for {file_path.name}: page {actual_from + 1} does not exist' - ) - pdf.close() - continue - - if actual_to < 0 or actual_to >= len(pdf): - console.warning( - f'Invalid page range for {file_path.name}: page {actual_to + 1} does not exist' - ) - pdf.close() - continue - - if actual_from > actual_to: - console.warning( - f'Invalid page range for {file_path.name}: start page {actual_from + 1} > end page {actual_to + 1}' - ) - pdf.close() - continue - - # Format page info for display (1-based) - if actual_from == actual_to: - page_info = f'page {actual_from + 1}' - else: - page_info = f'pages {actual_from + 1}-{actual_to + 1}' - - merged_pdf.insert_pdf( - pdf, from_page=actual_from, to_page=actual_to - ) - - console.print( - f'[faint]⎿ [/faint] Adding {file_path.name} ({page_info})' - ) - pdf.close() - - except Exception as e: - console.error(f'Error processing {file_path.name}: {str(e)}') - merged_pdf.close() - raise typer.Exit(1) - - # Save the merged PDF - merged_pdf.save(str(output_path)) - merged_pdf.close() + page_info = f'pages {actual_from + 1}-{to_page + 1}' + + console.print( + f'[faint]⎿ [/faint] Adding {file_path.name} ({page_info})' + ) + + # Use service to merge PDFs + PdfService.merge_pdfs(files_with_ranges, output_path) console.newline() console.success( f'Successfully merged {len(files_with_ranges)} files into {output_path}' ) + except (ValueError, FileNotFoundError) as e: + console.error(f'Error during merge: {str(e)}') + raise typer.Exit(1) except Exception as e: console.error(f'Error during merge: {str(e)}') raise typer.Exit(1) @@ -353,14 +215,17 @@ def split( if prefix is None: prefix = input_path.stem - # Open and process the PDF + # Split PDF using service try: + # Get page count first to display info + import pymupdf + pdf = pymupdf.open(input_path) total_pages = len(pdf) + pdf.close() if total_pages == 0: console.error('PDF file is empty (no pages)', panel=True) - pdf.close() raise typer.Exit(1) console.info( @@ -371,27 +236,23 @@ def split( ) with console.shimmer(f'Splitting PDF...'): - output_files = [] - - # Split into individual pages - for page_num in range(total_pages): - output_file = output_path / f'{prefix}_page_{page_num + 1}.pdf' - output_pdf = pymupdf.open() - output_pdf.insert_pdf(pdf, from_page=page_num, to_page=page_num) - output_pdf.save(str(output_file)) - output_pdf.close() - output_files.append(output_file) + # Use service to split PDF + output_files = PdfService.split_pdf(input_path, output_path, prefix) + + # Display created files + for idx, output_file in enumerate(output_files): console.print( - f'[faint]⎿ [/faint] Created {output_file.name} (page {page_num + 1})' + f'[faint]⎿ [/faint] Created {output_file.name} (page {idx + 1})' ) - pdf.close() - console.newline() console.success( f'Successfully split PDF into {len(output_files)} file{"s" if len(output_files) > 1 else ""} in {output_path}' ) + except (ValueError, FileNotFoundError) as e: + console.error(f'Error during split: {str(e)}') + raise typer.Exit(1) except Exception as e: console.error(f'Error during split: {str(e)}') raise typer.Exit(1) diff --git a/src/parxy_cli/services/__init__.py b/src/parxy_cli/services/__init__.py new file mode 100644 index 0000000..2144370 --- /dev/null +++ b/src/parxy_cli/services/__init__.py @@ -0,0 +1,19 @@ +"""PDF manipulation services.""" + +from parxy_cli.services.pdf_service import PdfService +from parxy_cli.services.pdf_utils import ( + format_file_size, + validate_pdf_file, + is_binary_file, + parse_input_with_pages, + collect_pdf_files_with_ranges, +) + +__all__ = [ + 'PdfService', + 'format_file_size', + 'validate_pdf_file', + 'is_binary_file', + 'parse_input_with_pages', + 'collect_pdf_files_with_ranges', +] diff --git a/src/parxy_cli/services/pdf_service.py b/src/parxy_cli/services/pdf_service.py new file mode 100644 index 0000000..46f763d --- /dev/null +++ b/src/parxy_cli/services/pdf_service.py @@ -0,0 +1,314 @@ +"""PDF manipulation service using PyMuPDF.""" + +from pathlib import Path +from typing import List, Optional, Dict, Any, Tuple + +import pymupdf + + +class PdfService: + """ + Service for PDF manipulation operations using PyMuPDF. + + This class provides a high-level interface for PDF operations including + attachment management, merging, and splitting. It uses context manager + protocol for proper resource management. + + Example: + with PdfService(pdf_path) as pdf: + attachments = pdf.list_attachments() + pdf.add_attachment(file_path, name="data.csv", desc="Sales data") + pdf.save(output_path) + """ + + def __init__(self, pdf_path: Path): + """ + Initialize PDF service with a PDF file path. + + Args: + pdf_path: Path to the PDF file + """ + self.pdf_path = pdf_path + self._doc = None + + def __enter__(self): + """ + Open the PDF document when entering context manager. + + Returns: + Self for method chaining + """ + self._doc = pymupdf.open(self.pdf_path) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Close the PDF document when exiting context manager. + + Args: + exc_type: Exception type if an exception occurred + exc_val: Exception value if an exception occurred + exc_tb: Exception traceback if an exception occurred + """ + if self._doc: + self._doc.close() + return False + + # ======================================================================== + # Attachment Operations + # ======================================================================== + + def list_attachments(self) -> List[str]: + """ + List all attachment names in the PDF. + + Returns: + List of attachment names + + Raises: + RuntimeError: If called outside context manager + """ + if not self._doc: + raise RuntimeError('PdfService must be used within a context manager') + return self._doc.embfile_names() + + def get_attachment_info(self, name: str) -> Dict[str, Any]: + """ + Get metadata for a specific attachment. + + Args: + name: Name of the attachment + + Returns: + Dictionary containing attachment metadata (size, description, etc.) + + Raises: + RuntimeError: If called outside context manager + KeyError: If attachment not found + """ + if not self._doc: + raise RuntimeError('PdfService must be used within a context manager') + + attachments = self._doc.embfile_names() + if name not in attachments: + raise KeyError(f"Attachment '{name}' not found in PDF") + + return self._doc.embfile_info(name) + + def add_attachment( + self, + file_path: Path, + name: Optional[str] = None, + desc: str = '', + ) -> None: + """ + Add a file as an attachment to the PDF. + + Args: + file_path: Path to the file to attach + name: Custom name for the attachment (defaults to filename) + desc: Description for the attachment + + Raises: + RuntimeError: If called outside context manager + FileNotFoundError: If file_path doesn't exist + """ + if not self._doc: + raise RuntimeError('PdfService must be used within a context manager') + + if not file_path.is_file(): + raise FileNotFoundError(f'File not found: {file_path}') + + # Use filename if no custom name provided + embed_name = name if name else file_path.name + + # Read file content + with open(file_path, 'rb') as f: + file_content = f.read() + + # Add attachment + self._doc.embfile_add( + name=embed_name, + buffer_=file_content, + filename=file_path.name, + desc=desc, + ) + + def remove_attachment(self, name: str) -> None: + """ + Remove an attachment from the PDF. + + Args: + name: Name of the attachment to remove + + Raises: + RuntimeError: If called outside context manager + KeyError: If attachment not found + """ + if not self._doc: + raise RuntimeError('PdfService must be used within a context manager') + + attachments = self._doc.embfile_names() + if name not in attachments: + raise KeyError(f"Attachment '{name}' not found in PDF") + + self._doc.embfile_del(name) + + def extract_attachment(self, name: str) -> bytes: + """ + Extract attachment content from the PDF. + + Args: + name: Name of the attachment to extract + + Returns: + Raw bytes content of the attachment + + Raises: + RuntimeError: If called outside context manager + KeyError: If attachment not found + """ + if not self._doc: + raise RuntimeError('PdfService must be used within a context manager') + + attachments = self._doc.embfile_names() + if name not in attachments: + raise KeyError(f"Attachment '{name}' not found in PDF") + + return self._doc.embfile_get(name) + + def save(self, output_path: Path) -> None: + """ + Save the PDF to a file. + + Args: + output_path: Path where the PDF should be saved + + Raises: + RuntimeError: If called outside context manager + """ + if not self._doc: + raise RuntimeError('PdfService must be used within a context manager') + + # Ensure parent directory exists + output_path.parent.mkdir(parents=True, exist_ok=True) + + self._doc.save(str(output_path)) + + # ======================================================================== + # Static Operations (Multiple PDFs) + # ======================================================================== + + @staticmethod + def merge_pdfs( + inputs: List[Tuple[Path, Optional[int], Optional[int]]], + output: Path, + ) -> None: + """ + Merge multiple PDF files into a single PDF. + + Args: + inputs: List of tuples (pdf_path, from_page, to_page) where + page numbers are 0-based. None means all pages or last page. + output: Path where the merged PDF should be saved + + Raises: + FileNotFoundError: If any input PDF doesn't exist + ValueError: If page ranges are invalid + """ + merged_pdf = pymupdf.open() + + try: + for file_path, from_page, to_page in inputs: + if not file_path.is_file(): + raise FileNotFoundError(f'PDF file not found: {file_path}') + + pdf = pymupdf.open(file_path) + + # Determine page range to insert + if from_page is None and to_page is None: + # Insert all pages + merged_pdf.insert_pdf(pdf) + else: + # Insert specific page range + actual_from = from_page if from_page is not None else 0 + actual_to = to_page if to_page is not None else (len(pdf) - 1) + + # Validate page range + if actual_from < 0 or actual_from >= len(pdf): + pdf.close() + raise ValueError( + f'Invalid page range for {file_path.name}: page {actual_from + 1} does not exist' + ) + + if actual_to < 0 or actual_to >= len(pdf): + pdf.close() + raise ValueError( + f'Invalid page range for {file_path.name}: page {actual_to + 1} does not exist' + ) + + if actual_from > actual_to: + pdf.close() + raise ValueError( + f'Invalid page range for {file_path.name}: start page {actual_from + 1} > end page {actual_to + 1}' + ) + + merged_pdf.insert_pdf( + pdf, from_page=actual_from, to_page=actual_to + ) + + pdf.close() + + # Ensure output directory exists + output.parent.mkdir(parents=True, exist_ok=True) + + # Save the merged PDF + merged_pdf.save(str(output)) + finally: + merged_pdf.close() + + @staticmethod + def split_pdf(input_path: Path, output_dir: Path, prefix: str) -> List[Path]: + """ + Split a PDF file into individual pages. + + Args: + input_path: Path to the PDF file to split + output_dir: Directory where split PDFs should be saved + prefix: Prefix for output filenames + + Returns: + List of paths to the created PDF files + + Raises: + FileNotFoundError: If input PDF doesn't exist + ValueError: If PDF is empty or invalid + """ + if not input_path.is_file(): + raise FileNotFoundError(f'PDF file not found: {input_path}') + + pdf = pymupdf.open(input_path) + total_pages = len(pdf) + + if total_pages == 0: + pdf.close() + raise ValueError('PDF file is empty (no pages)') + + # Ensure output directory exists + output_dir.mkdir(parents=True, exist_ok=True) + + output_files = [] + + try: + # Split into individual pages + for page_num in range(total_pages): + output_file = output_dir / f'{prefix}_page_{page_num + 1}.pdf' + output_pdf = pymupdf.open() + output_pdf.insert_pdf(pdf, from_page=page_num, to_page=page_num) + output_pdf.save(str(output_file)) + output_pdf.close() + output_files.append(output_file) + finally: + pdf.close() + + return output_files diff --git a/src/parxy_cli/services/pdf_utils.py b/src/parxy_cli/services/pdf_utils.py new file mode 100644 index 0000000..a0fe76d --- /dev/null +++ b/src/parxy_cli/services/pdf_utils.py @@ -0,0 +1,168 @@ +"""Utility functions for PDF manipulation.""" + +import re +from pathlib import Path +from typing import List, Tuple, Optional + +from parxy_cli.console.console import Console + +console = Console() + + +def format_file_size(size_bytes: int) -> str: + """ + Convert bytes to human-readable format. + + Args: + size_bytes: Size in bytes + + Returns: + Formatted string like "1.5 MB" + """ + size = float(size_bytes) + for unit in ['B', 'KB', 'MB', 'GB']: + if size < 1024.0: + return f'{size:.1f} {unit}' + size /= 1024.0 + return f'{size:.1f} TB' + + +def validate_pdf_file(file_path: str) -> Path: + """ + Validate PDF file exists and has .pdf extension. + + Args: + file_path: Path to PDF file + + Returns: + Path object + + Raises: + FileNotFoundError: if file not found + ValueError: if file is not a PDF + """ + path = Path(file_path) + if not path.is_file(): + console.error(f'Input file not found: {file_path}', panel=True) + raise FileNotFoundError(f'Input file not found: {file_path}') + if path.suffix.lower() != '.pdf': + console.error(f'Input file must be a PDF: {file_path}', panel=True) + raise ValueError(f'Input file must be a PDF: {file_path}') + return path + + +def is_binary_file(content: bytes) -> bool: + """ + Detect if content is binary by checking for null bytes. + + Checks first 8KB of content for null bytes which typically + indicate binary data. + + Args: + content: File content as bytes + + Returns: + True if binary, False if likely text + """ + check_bytes = content[:8192] + return b'\x00' in check_bytes + + +def parse_input_with_pages( + input_str: str, +) -> Tuple[str, Optional[int], Optional[int]]: + """ + Parse input string to extract file path and page range. + + Supports formats: + - file.pdf[1] - single page (1-based) + - file.pdf[:2] - from start to page 2 (1-based, inclusive) + - file.pdf[3:] - from page 3 to end (1-based) + - file.pdf[3:5] - from page 3 to 5 (1-based, inclusive) + - file.pdf - all pages + + Args: + input_str: Input string with optional page range + + Returns: + Tuple of (file_path, from_page, to_page) where pages are 0-based for PyMuPDF. + from_page and to_page are None if no range specified or represent the range to use. + """ + # Match pattern: filename[range] + pattern = r'^(.+?)\[([^\]]+)\]$' + match = re.match(pattern, input_str) + + if not match: + # No page range specified + return input_str, None, None + + file_path = match.group(1) + page_range = match.group(2) + + # Parse the page range + if ':' in page_range: + # Range format [start:end] + parts = page_range.split(':', 1) + start_str = parts[0].strip() + end_str = parts[1].strip() + + # Convert to 0-based indices + # PyMuPDF uses 0-based indexing + from_page = (int(start_str) - 1) if start_str else 0 + to_page = (int(end_str) - 1) if end_str else None # None means last page + + else: + # Single page [n] + page_num = int(page_range) - 1 # Convert to 0-based + from_page = page_num + to_page = page_num + + return file_path, from_page, to_page + + +def collect_pdf_files_with_ranges( + inputs: List[str], +) -> List[Tuple[Path, Optional[int], Optional[int]]]: + """ + Collect PDF files from the input list with optional page ranges. + + For folders, only files in the exact directory are collected (non-recursive). + For files with page ranges (e.g., file.pdf[1:3]), parse and extract the range. + + Args: + inputs: List of file paths (with optional page ranges) and/or folder paths + + Returns: + List of tuples: (Path, from_page, to_page) where pages are 0-based. + from_page and to_page are None if all pages should be included. + """ + files = [] + + for input_str in inputs: + # Parse the input to extract file path and page range + file_path_str, from_page, to_page = parse_input_with_pages(input_str) + path = Path(file_path_str) + + if path.is_file(): + # Check if it's a PDF + if path.suffix.lower() == '.pdf': + files.append((path, from_page, to_page)) + else: + console.warning(f'Skipping non-PDF file: {file_path_str}') + elif path.is_dir(): + # Non-recursive: only files in the given directory + # Directories cannot have page ranges + if from_page is not None or to_page is not None: + console.warning( + f'Page ranges are not supported for directories: {input_str}' + ) + pdf_files = sorted(path.glob('*.pdf')) + if pdf_files: + # Add all PDFs from directory without page ranges + files.extend([(f, None, None) for f in pdf_files]) + else: + console.warning(f'No PDF files found in directory: {file_path_str}') + else: + console.warning(f'Path not found: {file_path_str}') + + return files diff --git a/tests/commands/test_pdf.py b/tests/commands/test_pdf.py index 72ee0fc..750e762 100644 --- a/tests/commands/test_pdf.py +++ b/tests/commands/test_pdf.py @@ -5,11 +5,8 @@ from typer.testing import CliRunner import pymupdf -from parxy_cli.commands.pdf import ( - app, - parse_input_with_pages, - collect_pdf_files_with_ranges, -) +from parxy_cli.commands.pdf import app +from parxy_cli.services import parse_input_with_pages, collect_pdf_files_with_ranges @pytest.fixture diff --git a/tests/services/test_pdf_service.py b/tests/services/test_pdf_service.py new file mode 100644 index 0000000..95d3653 --- /dev/null +++ b/tests/services/test_pdf_service.py @@ -0,0 +1,351 @@ +"""Test suite for PDF service.""" + +import pytest +from pathlib import Path +import pymupdf + +from parxy_cli.services.pdf_service import PdfService + + +@pytest.fixture +def sample_pdf(tmp_path): + """Create a sample PDF without attachments.""" + pdf_path = tmp_path / 'test.pdf' + doc = pymupdf.open() + page = doc.new_page(width=612, height=792) + page.insert_text((100, 100), 'Test document') + doc.save(str(pdf_path)) + doc.close() + return pdf_path + + +@pytest.fixture +def sample_pdf_with_attachments(tmp_path): + """Create a sample PDF with attached files.""" + # Create the main PDF + pdf_path = tmp_path / 'document.pdf' + doc = pymupdf.open() + page = doc.new_page(width=612, height=792) + page.insert_text((100, 100), 'Test document with attachments') + + # Create and embed files + text_content = b'This is a text file with some notes.' + csv_content = b'name,value\nitem1,100\nitem2,200' + + doc.embfile_add( + name='notes.txt', + buffer_=text_content, + filename='notes.txt', + desc='Text notes', + ) + doc.embfile_add( + name='data.csv', + buffer_=csv_content, + filename='data.csv', + desc='Sales data', + ) + + doc.save(str(pdf_path)) + doc.close() + + return {'pdf': pdf_path, 'tmp_path': tmp_path} + + +@pytest.fixture +def sample_files(tmp_path): + """Create sample files for attaching.""" + text_file = tmp_path / 'file1.txt' + text_file.write_text('Sample text content') + + csv_file = tmp_path / 'file2.csv' + csv_file.write_text('col1,col2\nval1,val2') + + return {'text': text_file, 'csv': csv_file} + + +@pytest.fixture +def multiple_pdfs(tmp_path): + """Create multiple PDF files for testing merge.""" + # Create first PDF with 3 pages + pdf1_path = tmp_path / 'doc1.pdf' + pdf1 = pymupdf.open() + for i in range(3): + page = pdf1.new_page(width=612, height=792) + page.insert_text((100, 100), f'Page {i + 1} of doc1') + pdf1.save(str(pdf1_path)) + pdf1.close() + + # Create second PDF with 2 pages + pdf2_path = tmp_path / 'doc2.pdf' + pdf2 = pymupdf.open() + for i in range(2): + page = pdf2.new_page(width=612, height=792) + page.insert_text((100, 100), f'Page {i + 1} of doc2') + pdf2.save(str(pdf2_path)) + pdf2.close() + + return {'pdf1': pdf1_path, 'pdf2': pdf2_path, 'tmp_path': tmp_path} + + +# Tests for context manager +class TestContextManager: + """Tests for the context manager functionality.""" + + def test_context_manager_opens_and_closes(self, sample_pdf): + """Test that context manager opens and closes the PDF.""" + with PdfService(sample_pdf) as pdf: + assert pdf._doc is not None + + def test_operations_outside_context_manager_fail(self, sample_pdf): + """Test that operations outside context manager raise RuntimeError.""" + pdf = PdfService(sample_pdf) + with pytest.raises(RuntimeError): + pdf.list_attachments() + + +# Tests for attachment operations +class TestAttachmentOperations: + """Tests for attachment management operations.""" + + def test_list_attachments_empty(self, sample_pdf): + """Test listing attachments from PDF without attachments.""" + with PdfService(sample_pdf) as pdf: + attachments = pdf.list_attachments() + assert attachments == [] + + def test_list_attachments_with_files(self, sample_pdf_with_attachments): + """Test listing attachments from PDF with attachments.""" + with PdfService(sample_pdf_with_attachments['pdf']) as pdf: + attachments = pdf.list_attachments() + assert len(attachments) == 2 + assert 'notes.txt' in attachments + assert 'data.csv' in attachments + + def test_get_attachment_info(self, sample_pdf_with_attachments): + """Test getting attachment metadata.""" + with PdfService(sample_pdf_with_attachments['pdf']) as pdf: + info = pdf.get_attachment_info('notes.txt') + assert 'size' in info + assert info['description'] == 'Text notes' + + def test_get_attachment_info_not_found(self, sample_pdf): + """Test getting info for nonexistent attachment.""" + with PdfService(sample_pdf) as pdf: + with pytest.raises(KeyError): + pdf.get_attachment_info('nonexistent.txt') + + def test_add_attachment(self, sample_pdf, sample_files, tmp_path): + """Test adding an attachment to PDF.""" + output = tmp_path / 'output.pdf' + + with PdfService(sample_pdf) as pdf: + pdf.add_attachment(sample_files['text']) + pdf.save(output) + + # Verify attachment was added + doc = pymupdf.open(str(output)) + embeds = doc.embfile_names() + assert len(embeds) == 1 + assert 'file1.txt' in embeds + doc.close() + + def test_add_attachment_with_custom_name(self, sample_pdf, sample_files, tmp_path): + """Test adding attachment with custom name.""" + output = tmp_path / 'output.pdf' + + with PdfService(sample_pdf) as pdf: + pdf.add_attachment(sample_files['text'], name='custom.txt') + pdf.save(output) + + # Verify custom name + doc = pymupdf.open(str(output)) + embeds = doc.embfile_names() + assert 'custom.txt' in embeds + assert 'file1.txt' not in embeds + doc.close() + + def test_add_attachment_with_description(self, sample_pdf, sample_files, tmp_path): + """Test adding attachment with description.""" + output = tmp_path / 'output.pdf' + + with PdfService(sample_pdf) as pdf: + pdf.add_attachment(sample_files['text'], desc='Custom description') + pdf.save(output) + + # Verify description + doc = pymupdf.open(str(output)) + info = doc.embfile_info('file1.txt') + assert info['description'] == 'Custom description' + doc.close() + + def test_add_attachment_file_not_found(self, sample_pdf, tmp_path): + """Test adding nonexistent file.""" + with PdfService(sample_pdf) as pdf: + with pytest.raises(FileNotFoundError): + pdf.add_attachment(tmp_path / 'nonexistent.txt') + + def test_remove_attachment(self, sample_pdf_with_attachments, tmp_path): + """Test removing an attachment.""" + output = tmp_path / 'output.pdf' + + with PdfService(sample_pdf_with_attachments['pdf']) as pdf: + pdf.remove_attachment('notes.txt') + pdf.save(output) + + # Verify attachment was removed + doc = pymupdf.open(str(output)) + embeds = doc.embfile_names() + assert 'notes.txt' not in embeds + assert 'data.csv' in embeds + doc.close() + + def test_remove_attachment_not_found(self, sample_pdf): + """Test removing nonexistent attachment.""" + with PdfService(sample_pdf) as pdf: + with pytest.raises(KeyError): + pdf.remove_attachment('nonexistent.txt') + + def test_extract_attachment(self, sample_pdf_with_attachments): + """Test extracting attachment content.""" + with PdfService(sample_pdf_with_attachments['pdf']) as pdf: + content = pdf.extract_attachment('notes.txt') + assert b'text file with some notes' in content + + def test_extract_attachment_not_found(self, sample_pdf): + """Test extracting nonexistent attachment.""" + with PdfService(sample_pdf) as pdf: + with pytest.raises(KeyError): + pdf.extract_attachment('nonexistent.txt') + + +# Tests for save operation +class TestSaveOperation: + """Tests for saving PDFs.""" + + def test_save_creates_output_directory(self, sample_pdf, tmp_path): + """Test that save creates output directory if needed.""" + output = tmp_path / 'subdir' / 'nested' / 'output.pdf' + + with PdfService(sample_pdf) as pdf: + pdf.save(output) + + assert output.exists() + assert output.parent.exists() + + +# Tests for static operations +class TestMergePdfs: + """Tests for the merge_pdfs static method.""" + + def test_merge_two_files(self, multiple_pdfs): + """Test merging two PDF files.""" + output = multiple_pdfs['tmp_path'] / 'merged.pdf' + + PdfService.merge_pdfs( + [ + (multiple_pdfs['pdf1'], None, None), + (multiple_pdfs['pdf2'], None, None), + ], + output, + ) + + assert output.exists() + + # Verify merged PDF has correct number of pages + merged = pymupdf.open(str(output)) + assert len(merged) == 5 # 3 pages from pdf1 + 2 pages from pdf2 + merged.close() + + def test_merge_with_page_ranges(self, multiple_pdfs): + """Test merging with specific page ranges.""" + output = multiple_pdfs['tmp_path'] / 'merged.pdf' + + PdfService.merge_pdfs( + [ + (multiple_pdfs['pdf1'], 0, 1), # First 2 pages (0-based) + (multiple_pdfs['pdf2'], 0, 0), # Only first page + ], + output, + ) + + assert output.exists() + + merged = pymupdf.open(str(output)) + assert len(merged) == 3 # 2 pages from pdf1 + 1 page from pdf2 + merged.close() + + def test_merge_file_not_found(self, tmp_path): + """Test merging with nonexistent file.""" + output = tmp_path / 'merged.pdf' + + with pytest.raises(FileNotFoundError): + PdfService.merge_pdfs( + [(tmp_path / 'nonexistent.pdf', None, None)], + output, + ) + + def test_merge_invalid_page_range(self, multiple_pdfs): + """Test merging with invalid page range.""" + output = multiple_pdfs['tmp_path'] / 'merged.pdf' + + with pytest.raises(ValueError): + PdfService.merge_pdfs( + [(multiple_pdfs['pdf1'], 10, 20)], # Invalid range + output, + ) + + +class TestSplitPdf: + """Tests for the split_pdf static method.""" + + def test_split_into_individual_pages(self, multiple_pdfs): + """Test splitting a PDF into individual pages.""" + output_dir = multiple_pdfs['tmp_path'] / 'split' + + output_files = PdfService.split_pdf( + multiple_pdfs['pdf1'], output_dir, 'doc1' + ) + + assert len(output_files) == 3 + assert all(f.exists() for f in output_files) + + # Check filenames + assert output_files[0].name == 'doc1_page_1.pdf' + assert output_files[1].name == 'doc1_page_2.pdf' + assert output_files[2].name == 'doc1_page_3.pdf' + + # Verify each file has exactly 1 page + for output_file in output_files: + pdf = pymupdf.open(str(output_file)) + assert len(pdf) == 1 + pdf.close() + + def test_split_file_not_found(self, tmp_path): + """Test splitting nonexistent file.""" + output_dir = tmp_path / 'split' + + with pytest.raises(FileNotFoundError): + PdfService.split_pdf(tmp_path / 'nonexistent.pdf', output_dir, 'prefix') + + def test_split_empty_pdf(self, tmp_path): + """Test splitting empty PDF.""" + # Create a PDF and then manually create an empty one by bypassing pymupdf's save check + # Since pymupdf doesn't allow saving empty PDFs, we'll test the ValueError from our service + # by creating a PDF file with 1 page and then testing our service catches empty PDFs + empty_pdf = tmp_path / 'empty.pdf' + + # Create a minimal PDF file that will report 0 pages + # Since we can't create a truly empty PDF with pymupdf, we'll skip this test + # or test it differently - our service will raise ValueError when pdf has 0 pages + pytest.skip("PyMuPDF doesn't allow creating empty PDFs for testing") + + def test_split_creates_output_directory(self, multiple_pdfs): + """Test that split creates output directory if needed.""" + output_dir = multiple_pdfs['tmp_path'] / 'nested' / 'split' + + output_files = PdfService.split_pdf( + multiple_pdfs['pdf1'], output_dir, 'doc1' + ) + + assert output_dir.exists() + assert len(output_files) == 3 diff --git a/tests/services/test_pdf_utils.py b/tests/services/test_pdf_utils.py new file mode 100644 index 0000000..8fdf939 --- /dev/null +++ b/tests/services/test_pdf_utils.py @@ -0,0 +1,253 @@ +"""Test suite for PDF utility functions.""" + +import pytest +from pathlib import Path + +from parxy_cli.services.pdf_utils import ( + format_file_size, + validate_pdf_file, + is_binary_file, + parse_input_with_pages, + collect_pdf_files_with_ranges, +) + + +# Tests for format_file_size +class TestFormatFileSize: + """Tests for the format_file_size function.""" + + def test_format_file_size_bytes(self): + """Test formatting bytes.""" + assert format_file_size(100) == '100.0 B' + + def test_format_file_size_kilobytes(self): + """Test formatting kilobytes.""" + assert format_file_size(1024) == '1.0 KB' + assert format_file_size(1536) == '1.5 KB' + + def test_format_file_size_megabytes(self): + """Test formatting megabytes.""" + assert format_file_size(1048576) == '1.0 MB' + assert format_file_size(2621440) == '2.5 MB' + + def test_format_file_size_gigabytes(self): + """Test formatting gigabytes.""" + assert format_file_size(1073741824) == '1.0 GB' + + def test_format_file_size_zero(self): + """Test formatting zero bytes.""" + assert format_file_size(0) == '0.0 B' + + +# Tests for validate_pdf_file +class TestValidatePdfFile: + """Tests for the validate_pdf_file function.""" + + def test_validate_pdf_file_success(self, tmp_path): + """Test validating a valid PDF file.""" + pdf_file = tmp_path / 'test.pdf' + pdf_file.touch() + path = validate_pdf_file(str(pdf_file)) + assert path == pdf_file + + def test_validate_pdf_file_not_found(self, tmp_path): + """Test validating a nonexistent file.""" + with pytest.raises(FileNotFoundError): + validate_pdf_file(str(tmp_path / 'nonexistent.pdf')) + + def test_validate_pdf_file_not_pdf(self, tmp_path): + """Test validating a non-PDF file.""" + txt_file = tmp_path / 'file.txt' + txt_file.write_text('not a pdf') + with pytest.raises(ValueError): + validate_pdf_file(str(txt_file)) + + def test_validate_pdf_file_case_insensitive(self, tmp_path): + """Test that .PDF extension (uppercase) is accepted.""" + pdf_file = tmp_path / 'test.PDF' + pdf_file.touch() + path = validate_pdf_file(str(pdf_file)) + assert path == pdf_file + + +# Tests for is_binary_file +class TestIsBinaryFile: + """Tests for the is_binary_file function.""" + + def test_is_binary_file_text(self): + """Test detecting text content.""" + text_content = b'This is plain text content' + assert not is_binary_file(text_content) + + def test_is_binary_file_binary(self): + """Test detecting binary content.""" + binary_content = b'\x00\x01\x02\x03' + assert is_binary_file(binary_content) + + def test_is_binary_file_utf8(self): + """Test detecting UTF-8 encoded text.""" + utf8_content = 'UTF-8 text with special chars: café'.encode('utf-8') + assert not is_binary_file(utf8_content) + + def test_is_binary_file_empty(self): + """Test detecting empty content.""" + empty_content = b'' + assert not is_binary_file(empty_content) + + def test_is_binary_file_long_text(self): + """Test detecting long text content.""" + long_text = ('a' * 10000).encode('utf-8') + assert not is_binary_file(long_text) + + +# Tests for parse_input_with_pages +class TestParseInputWithPages: + """Tests for the parse_input_with_pages function.""" + + def test_no_page_range(self): + """Test parsing input without page range.""" + file_path, from_page, to_page = parse_input_with_pages('file.pdf') + assert file_path == 'file.pdf' + assert from_page is None + assert to_page is None + + def test_single_page(self): + """Test parsing single page specification.""" + file_path, from_page, to_page = parse_input_with_pages('file.pdf[3]') + assert file_path == 'file.pdf' + assert from_page == 2 # 0-based index + assert to_page == 2 + + def test_range_from_start(self): + """Test parsing range from start to specified page.""" + file_path, from_page, to_page = parse_input_with_pages('file.pdf[:5]') + assert file_path == 'file.pdf' + assert from_page == 0 + assert to_page == 4 # 0-based index + + def test_range_to_end(self): + """Test parsing range from specified page to end.""" + file_path, from_page, to_page = parse_input_with_pages('file.pdf[3:]') + assert file_path == 'file.pdf' + assert from_page == 2 # 0-based index + assert to_page is None + + def test_range_both_bounds(self): + """Test parsing range with both start and end specified.""" + file_path, from_page, to_page = parse_input_with_pages('file.pdf[2:5]') + assert file_path == 'file.pdf' + assert from_page == 1 # 0-based index + assert to_page == 4 # 0-based index + + def test_path_with_spaces(self): + """Test parsing path with spaces.""" + file_path, from_page, to_page = parse_input_with_pages( + 'path with spaces/file.pdf[1:3]' + ) + assert file_path == 'path with spaces/file.pdf' + assert from_page == 0 + assert to_page == 2 + + def test_path_without_brackets(self): + """Test parsing path without brackets.""" + file_path, from_page, to_page = parse_input_with_pages('file.pdf') + assert file_path == 'file.pdf' + assert from_page is None + assert to_page is None + + +# Tests for collect_pdf_files_with_ranges +class TestCollectPdfFilesWithRanges: + """Tests for the collect_pdf_files_with_ranges function.""" + + @pytest.fixture + def sample_pdfs(self, tmp_path): + """Create sample PDF files for testing.""" + pdf1 = tmp_path / 'doc1.pdf' + pdf1.touch() + pdf2 = tmp_path / 'doc2.pdf' + pdf2.touch() + pdf3 = tmp_path / 'doc3.pdf' + pdf3.touch() + return {'pdf1': pdf1, 'pdf2': pdf2, 'pdf3': pdf3, 'tmp_path': tmp_path} + + @pytest.fixture + def pdf_folder(self, tmp_path): + """Create a folder with multiple PDFs.""" + folder = tmp_path / 'pdfs' + folder.mkdir() + + for i in range(1, 4): + pdf_path = folder / f'file{i}.pdf' + pdf_path.touch() + + return folder + + def test_single_file_no_range(self, sample_pdfs): + """Test collecting a single file without page range.""" + files = collect_pdf_files_with_ranges([str(sample_pdfs['pdf1'])]) + assert len(files) == 1 + assert files[0][0] == sample_pdfs['pdf1'] + assert files[0][1] is None # from_page + assert files[0][2] is None # to_page + + def test_single_file_with_range(self, sample_pdfs): + """Test collecting a single file with page range.""" + files = collect_pdf_files_with_ranges([f'{sample_pdfs["pdf1"]}[1:2]']) + assert len(files) == 1 + assert files[0][0] == sample_pdfs['pdf1'] + assert files[0][1] == 0 # from_page (0-based) + assert files[0][2] == 1 # to_page (0-based) + + def test_multiple_files(self, sample_pdfs): + """Test collecting multiple files.""" + files = collect_pdf_files_with_ranges( + [str(sample_pdfs['pdf1']), str(sample_pdfs['pdf2'])] + ) + assert len(files) == 2 + assert files[0][0] == sample_pdfs['pdf1'] + assert files[1][0] == sample_pdfs['pdf2'] + + def test_folder_input(self, pdf_folder): + """Test collecting PDFs from a folder.""" + files = collect_pdf_files_with_ranges([str(pdf_folder)]) + assert len(files) == 3 + # Files should be sorted alphabetically + file_names = [f[0].name for f in files] + assert file_names == ['file1.pdf', 'file2.pdf', 'file3.pdf'] + + def test_mixed_files_and_folders(self, sample_pdfs, pdf_folder): + """Test collecting from both files and folders.""" + files = collect_pdf_files_with_ranges( + [str(sample_pdfs['pdf1']), str(pdf_folder)] + ) + assert len(files) == 4 # 1 file + 3 from folder + + def test_nonexistent_file(self, tmp_path): + """Test handling of nonexistent file.""" + files = collect_pdf_files_with_ranges([str(tmp_path / 'nonexistent.pdf')]) + assert len(files) == 0 + + def test_non_pdf_file(self, tmp_path): + """Test handling of non-PDF file.""" + txt_file = tmp_path / 'file.txt' + txt_file.write_text('not a pdf') + files = collect_pdf_files_with_ranges([str(txt_file)]) + assert len(files) == 0 + + def test_empty_folder(self, tmp_path): + """Test handling of empty folder.""" + empty_folder = tmp_path / 'empty' + empty_folder.mkdir() + files = collect_pdf_files_with_ranges([str(empty_folder)]) + assert len(files) == 0 + + def test_folder_with_page_range_ignores_range(self, pdf_folder): + """Test that page ranges on folders are ignored.""" + files = collect_pdf_files_with_ranges([f'{pdf_folder}[1:3]']) + # Should still collect files but ignore the page range + assert len(files) == 3 + # All files should have no page ranges + for file_path, from_page, to_page in files: + assert from_page is None + assert to_page is None From 208a2eb8ebfd7e5fc47a08e91dd2d20edb2aff93 Mon Sep 17 00:00:00 2001 From: Alessio Vertemati Date: Thu, 8 Jan 2026 20:44:27 +0100 Subject: [PATCH 2/6] Ensure utils do not depend on console --- src/parxy_cli/commands/attach.py | 36 +++++++++++++++++++++++++---- src/parxy_cli/services/pdf_utils.py | 27 ++++++++-------------- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/src/parxy_cli/commands/attach.py b/src/parxy_cli/commands/attach.py index 5b17995..7955cd6 100644 --- a/src/parxy_cli/commands/attach.py +++ b/src/parxy_cli/commands/attach.py @@ -76,7 +76,14 @@ def list_attachments( try: # Validate input file - input_path = validate_pdf_file(input_file) + try: + input_path = validate_pdf_file(input_file) + except FileNotFoundError as e: + console.error(str(e), panel=True) + raise + except ValueError as e: + console.error(str(e), panel=True) + raise # Use service to list attachments with PdfService(input_path) as pdf: @@ -189,7 +196,14 @@ def remove_attachment( raise ValueError('Cannot specify both attachment names and --all flag') # Validate input file - input_path = validate_pdf_file(input_file) + try: + input_path = validate_pdf_file(input_file) + except FileNotFoundError as e: + console.error(str(e), panel=True) + raise + except ValueError as e: + console.error(str(e), panel=True) + raise # Use service to handle attachments with PdfService(input_path) as pdf: @@ -375,7 +389,14 @@ def add_attachment( try: # Validate input file - input_path = validate_pdf_file(input_file) + try: + input_path = validate_pdf_file(input_file) + except FileNotFoundError as e: + console.error(str(e), panel=True) + raise + except ValueError as e: + console.error(str(e), panel=True) + raise # Validate all files to attach exist file_paths = [] @@ -526,7 +547,14 @@ def read_attachment( try: # Validate input file - input_path = validate_pdf_file(input_file) + try: + input_path = validate_pdf_file(input_file) + except FileNotFoundError as e: + console.error(str(e), panel=True) + raise + except ValueError as e: + console.error(str(e), panel=True) + raise # Use service to extract attachment with PdfService(input_path) as pdf: diff --git a/src/parxy_cli/services/pdf_utils.py b/src/parxy_cli/services/pdf_utils.py index a0fe76d..72ce88b 100644 --- a/src/parxy_cli/services/pdf_utils.py +++ b/src/parxy_cli/services/pdf_utils.py @@ -4,10 +4,6 @@ from pathlib import Path from typing import List, Tuple, Optional -from parxy_cli.console.console import Console - -console = Console() - def format_file_size(size_bytes: int) -> str: """ @@ -43,10 +39,8 @@ def validate_pdf_file(file_path: str) -> Path: """ path = Path(file_path) if not path.is_file(): - console.error(f'Input file not found: {file_path}', panel=True) raise FileNotFoundError(f'Input file not found: {file_path}') if path.suffix.lower() != '.pdf': - console.error(f'Input file must be a PDF: {file_path}', panel=True) raise ValueError(f'Input file must be a PDF: {file_path}') return path @@ -129,6 +123,12 @@ def collect_pdf_files_with_ranges( For folders, only files in the exact directory are collected (non-recursive). For files with page ranges (e.g., file.pdf[1:3]), parse and extract the range. + Silently skips: + - Non-existent paths + - Non-PDF files + - Empty directories + - Page ranges on directories (directories are processed without ranges) + Args: inputs: List of file paths (with optional page ranges) and/or folder paths @@ -147,22 +147,15 @@ def collect_pdf_files_with_ranges( # Check if it's a PDF if path.suffix.lower() == '.pdf': files.append((path, from_page, to_page)) - else: - console.warning(f'Skipping non-PDF file: {file_path_str}') + # Silently skip non-PDF files elif path.is_dir(): # Non-recursive: only files in the given directory - # Directories cannot have page ranges - if from_page is not None or to_page is not None: - console.warning( - f'Page ranges are not supported for directories: {input_str}' - ) + # Directories cannot have page ranges - ignore them if specified pdf_files = sorted(path.glob('*.pdf')) if pdf_files: # Add all PDFs from directory without page ranges files.extend([(f, None, None) for f in pdf_files]) - else: - console.warning(f'No PDF files found in directory: {file_path_str}') - else: - console.warning(f'Path not found: {file_path_str}') + # Silently skip empty directories + # Silently skip non-existent paths return files From 909e6e63a2d39765e9fa9668dd45ae149383b781 Mon Sep 17 00:00:00 2001 From: Alessio Vertemati Date: Thu, 8 Jan 2026 20:44:51 +0100 Subject: [PATCH 3/6] Code style --- src/parxy_cli/commands/attach.py | 12 +++++++++--- src/parxy_cli/services/pdf_service.py | 4 +--- tests/services/test_pdf_service.py | 8 ++------ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/parxy_cli/commands/attach.py b/src/parxy_cli/commands/attach.py index 7955cd6..57f657d 100644 --- a/src/parxy_cli/commands/attach.py +++ b/src/parxy_cli/commands/attach.py @@ -212,7 +212,9 @@ def remove_attachment( if not all_embeds: console.newline() - console.error(f'No attached files found in {input_path.name}', panel=True) + console.error( + f'No attached files found in {input_path.name}', panel=True + ) raise ValueError(f'No attached files found in {input_path.name}') # Determine which embeds to remove @@ -266,7 +268,9 @@ def remove_attachment( # Determine output path if output is None: - output_path = input_path.parent / f'{input_path.stem}_no_attachments.pdf' + output_path = ( + input_path.parent / f'{input_path.stem}_no_attachments.pdf' + ) else: output_path = Path(output) @@ -465,7 +469,9 @@ def add_attachment( # Determine output path if output is None: - output_path = input_path.parent / f'{input_path.stem}_with_attachments.pdf' + output_path = ( + input_path.parent / f'{input_path.stem}_with_attachments.pdf' + ) else: output_path = Path(output) diff --git a/src/parxy_cli/services/pdf_service.py b/src/parxy_cli/services/pdf_service.py index 46f763d..3f4a298 100644 --- a/src/parxy_cli/services/pdf_service.py +++ b/src/parxy_cli/services/pdf_service.py @@ -253,9 +253,7 @@ def merge_pdfs( f'Invalid page range for {file_path.name}: start page {actual_from + 1} > end page {actual_to + 1}' ) - merged_pdf.insert_pdf( - pdf, from_page=actual_from, to_page=actual_to - ) + merged_pdf.insert_pdf(pdf, from_page=actual_from, to_page=actual_to) pdf.close() diff --git a/tests/services/test_pdf_service.py b/tests/services/test_pdf_service.py index 95d3653..e69d21f 100644 --- a/tests/services/test_pdf_service.py +++ b/tests/services/test_pdf_service.py @@ -302,9 +302,7 @@ def test_split_into_individual_pages(self, multiple_pdfs): """Test splitting a PDF into individual pages.""" output_dir = multiple_pdfs['tmp_path'] / 'split' - output_files = PdfService.split_pdf( - multiple_pdfs['pdf1'], output_dir, 'doc1' - ) + output_files = PdfService.split_pdf(multiple_pdfs['pdf1'], output_dir, 'doc1') assert len(output_files) == 3 assert all(f.exists() for f in output_files) @@ -343,9 +341,7 @@ def test_split_creates_output_directory(self, multiple_pdfs): """Test that split creates output directory if needed.""" output_dir = multiple_pdfs['tmp_path'] / 'nested' / 'split' - output_files = PdfService.split_pdf( - multiple_pdfs['pdf1'], output_dir, 'doc1' - ) + output_files = PdfService.split_pdf(multiple_pdfs['pdf1'], output_dir, 'doc1') assert output_dir.exists() assert len(output_files) == 3 From 5744a8a2ade82790a71bafda1badcdea9d1df20f Mon Sep 17 00:00:00 2001 From: Alessio Vertemati Date: Fri, 9 Jan 2026 09:13:32 +0100 Subject: [PATCH 4/6] PDF must have a single page, so zero-page pdf cannot be tested --- src/parxy_cli/services/pdf_service.py | 2 +- tests/fixtures/pdf-with-no-content.pdf | Bin 0 -> 2316 bytes tests/services/test_pdf_service.py | 12 ------------ 3 files changed, 1 insertion(+), 13 deletions(-) create mode 100644 tests/fixtures/pdf-with-no-content.pdf diff --git a/src/parxy_cli/services/pdf_service.py b/src/parxy_cli/services/pdf_service.py index 3f4a298..a512b7f 100644 --- a/src/parxy_cli/services/pdf_service.py +++ b/src/parxy_cli/services/pdf_service.py @@ -286,7 +286,7 @@ def split_pdf(input_path: Path, output_dir: Path, prefix: str) -> List[Path]: raise FileNotFoundError(f'PDF file not found: {input_path}') pdf = pymupdf.open(input_path) - total_pages = len(pdf) + total_pages = pdf.page_count if total_pages == 0: pdf.close() diff --git a/tests/fixtures/pdf-with-no-content.pdf b/tests/fixtures/pdf-with-no-content.pdf new file mode 100644 index 0000000000000000000000000000000000000000..d23c069c5c4d4355e4077c21bbce3abb63e74387 GIT binary patch literal 2316 zcma)8&2HO95N^>Mfxg2+g;58A#2=XwNl*-8$wu6mt{@3+0V9Vsxsn-^Tw=MZXrJ;R zeUl#gNPUIQEGdbS6C=E6clMjv`P*-FGM!v@&E8Nu`2wG&nZzLc{zXMmo4(tv0V8&B^cAJF?bLxhJS1elZDtBEBRVPtlR}T6+D_VN$zyWImTB>qF-fz zA2+|}pFp$rJ!El0&v`}e47kL0?VdHX2gErudyeBe2*lKYj|5Am+#B;;f}kX}lC-LV zg1w{HEMLh7;-HPoB$Z6SZwe)tu!zSD%UMdnXr@(v|MUZVKVo?s(YDZo1CwKbOD@PM zbWACU(KF9_)*HMah3zL%a z@GRK;`DPJ4KSplcc^CMVS7k*8)=8D6dFfT74u#fd9;{KUcZh<>$5986-Hk{f!JWVt z@zR??;cGS6jXDof7M`weHk;n2-{WGXo388XhNW9p7ZBa@DVMbB=H)Mqfe9-kkrWb= zMbIF<=W99Yz@5sOINBL4)*@9V<49-F+1MDm*)zevt>;;>L+52rMbV3RrdPB8q;Y@@ z!`HD{b^u>YSFhuxUaa;SMJpQzuD}7f(g!4DJ}bPY&la4gLA%v_{rJ?6BQK@-Y7OtJ zPX0Mv)r&L6QS4T5^$58PK9glZav`=#V8JeQ%D?N*H=qc`0cgv8zd7 zJjZ<2dK_NH?2Wzj6 z&V2CQ13tWEq6CEiMv;&4v}JXLTCLyI3|9(JU)E=+Coo2p+$)msAvXpHDmXYfo=C~vix}` zGl8}7Bl%wDz<@cDS!Xa)j%0lkGH@h2b6(oB2kr~pz=cV9sA~>~P>8Ke3Yw%$H2*s0 z$uomMAW8UtfndQ@=2b#*?$o;pxjX$ Date: Fri, 9 Jan 2026 11:39:41 +0100 Subject: [PATCH 5/6] pdf optimize --- src/parxy_cli/services/pdf_service.py | 118 +++++++++++++ tests/services/test_pdf_service.py | 237 ++++++++++++++++++++++++++ 2 files changed, 355 insertions(+) diff --git a/src/parxy_cli/services/pdf_service.py b/src/parxy_cli/services/pdf_service.py index a512b7f..caa94a0 100644 --- a/src/parxy_cli/services/pdf_service.py +++ b/src/parxy_cli/services/pdf_service.py @@ -310,3 +310,121 @@ def split_pdf(input_path: Path, output_dir: Path, prefix: str) -> List[Path]: pdf.close() return output_files + + @staticmethod + def optimize_pdf( + input_path: Path, + output_path: Path, + scrub_metadata: bool = True, + subset_fonts: bool = True, + compress_images: bool = True, + dpi_threshold: int = 100, + dpi_target: int = 72, + image_quality: int = 60, + convert_to_grayscale: bool = False, + ) -> Dict[str, Any]: + """ + Optimize PDF file size using PyMuPDF's compression techniques. + + This method applies three optimization techniques: + 1. Dead-weight removal - Removes metadata, thumbnails, embedded files + 2. Font subsetting - Keeps only used glyphs from embedded fonts + 3. Image compression - Downsamples and compresses images + + Args: + input_path: Path to the input PDF file + output_path: Path where optimized PDF should be saved + scrub_metadata: Remove metadata, thumbnails, and embedded files + subset_fonts: Subset embedded fonts to only used glyphs + compress_images: Apply image compression and downsampling + dpi_threshold: Only process images above this DPI (default: 100) + dpi_target: Target DPI for downsampling (default: 72) + image_quality: JPEG quality level 0-100 (default: 60) + convert_to_grayscale: Convert images to grayscale (default: False) + + Returns: + Dictionary with optimization results: + - original_size: Original file size in bytes + - optimized_size: Optimized file size in bytes + - reduction_bytes: Size reduction in bytes + - reduction_percent: Size reduction as percentage + + Raises: + FileNotFoundError: If input PDF doesn't exist + ValueError: If parameters are invalid + + Example: + result = PdfService.optimize_pdf( + input_path=Path("large.pdf"), + output_path=Path("optimized.pdf"), + compress_images=True, + convert_to_grayscale=True + ) + print(f"Reduced by {result['reduction_percent']:.1f}%") + """ + if not input_path.is_file(): + raise FileNotFoundError(f'PDF file not found: {input_path}') + + if dpi_threshold < 1 or dpi_target < 1: + raise ValueError('DPI values must be positive') + + if not 0 <= image_quality <= 100: + raise ValueError('Image quality must be between 0 and 100') + + # Get original file size + original_size = input_path.stat().st_size + + pdf = pymupdf.open(input_path) + + try: + # 1. Dead-weight removal - scrub unwanted content + if scrub_metadata: + pdf.scrub( + metadata=True, # Clear basic metadata + xml_metadata=True, # Remove XML metadata + attached_files=True, # Delete file attachments + embedded_files=True, # Delete embedded files + thumbnails=True, # Strip page thumbnails + reset_fields=True, # Revert form fields to defaults + reset_responses=True, # Remove annotation replies + ) + + # 2. Font subsetting - keep only used glyphs + if subset_fonts: + pdf.subset_fonts() + + # 3. Advanced image compression + if compress_images: + pdf.rewrite_images( + dpi_threshold=dpi_threshold, + dpi_target=dpi_target, + quality=image_quality, + lossy=True, # Include lossy images + lossless=True, # Include lossless images + bitonal=True, # Include monochrome images + color=True, # Include colored images + gray=True, # Include gray-scale images + set_to_gray=convert_to_grayscale, # Convert to grayscale + ) + + # Ensure output directory exists + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Save with advanced compression options + # ez_save() applies garbage=3, deflate=True, use_objstms=True + pdf.ez_save(str(output_path)) + + finally: + pdf.close() + + # Get optimized file size + optimized_size = output_path.stat().st_size + reduction_bytes = original_size - optimized_size + reduction_percent = (reduction_bytes / original_size * 100) if original_size > 0 else 0 + + return { + 'original_size': original_size, + 'optimized_size': optimized_size, + 'reduction_bytes': reduction_bytes, + 'reduction_percent': reduction_percent, + } diff --git a/tests/services/test_pdf_service.py b/tests/services/test_pdf_service.py index ad5f0d0..bd4ebd2 100644 --- a/tests/services/test_pdf_service.py +++ b/tests/services/test_pdf_service.py @@ -333,3 +333,240 @@ def test_split_creates_output_directory(self, multiple_pdfs): assert output_dir.exists() assert len(output_files) == 3 + + +class TestOptimizePdf: + """Tests for the optimize_pdf static method.""" + + @pytest.fixture + def pdf_with_metadata_and_attachments(self, tmp_path): + """Create a PDF with metadata, attachments for testing optimization.""" + pdf_path = tmp_path / 'heavy.pdf' + doc = pymupdf.open() + + # Add pages with text + for i in range(3): + page = doc.new_page(width=612, height=792) + page.insert_text((100, 100), f'Page {i + 1} content\n' * 20) + + # Add metadata + doc.set_metadata({ + 'author': 'Test Author', + 'title': 'Test Document', + 'subject': 'Testing PDF optimization', + 'keywords': 'test, optimization, metadata', + 'creator': 'Test Creator', + 'producer': 'PyMuPDF', + }) + + # Add attachments + attachment_content = b'This is test attachment content ' * 100 + doc.embfile_add( + name='attachment.txt', + buffer_=attachment_content, + filename='attachment.txt', + desc='Test attachment', + ) + + doc.save(str(pdf_path)) + doc.close() + return pdf_path + + def test_optimize_with_all_defaults(self, tmp_path): + """Test optimization with all default settings.""" + # Use existing fixture + input_pdf = Path('tests/fixtures/pdf-with-attachment.pdf') + output_pdf = tmp_path / 'optimized.pdf' + + result = PdfService.optimize_pdf(input_pdf, output_pdf) + + # Verify output exists + assert output_pdf.exists() + + # Verify result structure + assert 'original_size' in result + assert 'optimized_size' in result + assert 'reduction_bytes' in result + assert 'reduction_percent' in result + + # Verify sizes are positive + assert result['original_size'] > 0 + assert result['optimized_size'] > 0 + + # For PDF with attachments and metadata, should see reduction + assert result['reduction_bytes'] >= 0 + + def test_optimize_reduces_file_size(self, pdf_with_metadata_and_attachments, tmp_path): + """Test that optimization actually reduces file size.""" + output_pdf = tmp_path / 'optimized.pdf' + + result = PdfService.optimize_pdf( + pdf_with_metadata_and_attachments, + output_pdf, + scrub_metadata=True, + subset_fonts=True, + compress_images=True, + ) + + # Should reduce size due to metadata and attachment removal + assert result['optimized_size'] < result['original_size'] + assert result['reduction_bytes'] > 0 + assert result['reduction_percent'] > 0 + + def test_optimize_without_scrubbing(self, pdf_with_metadata_and_attachments, tmp_path): + """Test optimization without scrubbing metadata.""" + output_pdf = tmp_path / 'optimized.pdf' + + result = PdfService.optimize_pdf( + pdf_with_metadata_and_attachments, + output_pdf, + scrub_metadata=False, + subset_fonts=False, + compress_images=False, + ) + + # Should still work, but may not reduce size much + assert output_pdf.exists() + assert result['optimized_size'] > 0 + + # Verify metadata is preserved + doc = pymupdf.open(str(output_pdf)) + metadata = doc.metadata + assert metadata['author'] == 'Test Author' + assert metadata['title'] == 'Test Document' + + # Verify attachments are preserved + attachments = doc.embfile_names() + assert 'attachment.txt' in attachments + doc.close() + + def test_optimize_with_scrubbing_removes_metadata( + self, pdf_with_metadata_and_attachments, tmp_path + ): + """Test that scrubbing removes metadata and attachments.""" + output_pdf = tmp_path / 'optimized.pdf' + + PdfService.optimize_pdf( + pdf_with_metadata_and_attachments, + output_pdf, + scrub_metadata=True, + ) + + # Verify metadata is cleared + doc = pymupdf.open(str(output_pdf)) + metadata = doc.metadata + + # Metadata should be empty or default values + assert metadata['author'] == '' + assert metadata['title'] == '' + + # Verify attachments are removed + attachments = doc.embfile_names() + assert len(attachments) == 0 + doc.close() + + def test_optimize_with_grayscale_conversion(self, tmp_path): + """Test optimization with grayscale conversion.""" + input_pdf = Path('tests/fixtures/pdf-with-attachment.pdf') + output_pdf = tmp_path / 'optimized.pdf' + + result = PdfService.optimize_pdf( + input_pdf, + output_pdf, + compress_images=True, + convert_to_grayscale=True, + ) + + assert output_pdf.exists() + assert result['optimized_size'] > 0 + + def test_optimize_with_custom_dpi_settings(self, tmp_path): + """Test optimization with custom DPI settings.""" + input_pdf = Path('tests/fixtures/pdf-with-attachment.pdf') + output_pdf = tmp_path / 'optimized.pdf' + + result = PdfService.optimize_pdf( + input_pdf, + output_pdf, + compress_images=True, + dpi_threshold=150, + dpi_target=50, + image_quality=50, + ) + + assert output_pdf.exists() + assert result['optimized_size'] > 0 + + def test_optimize_file_not_found(self, tmp_path): + """Test optimization with nonexistent input file.""" + input_pdf = tmp_path / 'nonexistent.pdf' + output_pdf = tmp_path / 'optimized.pdf' + + with pytest.raises(FileNotFoundError): + PdfService.optimize_pdf(input_pdf, output_pdf) + + def test_optimize_invalid_dpi_threshold(self, tmp_path): + """Test optimization with invalid DPI threshold.""" + input_pdf = Path('tests/fixtures/pdf-with-attachment.pdf') + output_pdf = tmp_path / 'optimized.pdf' + + with pytest.raises(ValueError, match='DPI values must be positive'): + PdfService.optimize_pdf( + input_pdf, + output_pdf, + dpi_threshold=-1, + ) + + def test_optimize_invalid_dpi_target(self, tmp_path): + """Test optimization with invalid DPI target.""" + input_pdf = Path('tests/fixtures/pdf-with-attachment.pdf') + output_pdf = tmp_path / 'optimized.pdf' + + with pytest.raises(ValueError, match='DPI values must be positive'): + PdfService.optimize_pdf( + input_pdf, + output_pdf, + dpi_target=0, + ) + + def test_optimize_invalid_image_quality(self, tmp_path): + """Test optimization with invalid image quality.""" + input_pdf = Path('tests/fixtures/pdf-with-attachment.pdf') + output_pdf = tmp_path / 'optimized.pdf' + + with pytest.raises(ValueError, match='Image quality must be between 0 and 100'): + PdfService.optimize_pdf( + input_pdf, + output_pdf, + image_quality=150, + ) + + def test_optimize_creates_output_directory(self, tmp_path): + """Test that optimize creates output directory if needed.""" + input_pdf = Path('tests/fixtures/pdf-with-attachment.pdf') + output_pdf = tmp_path / 'nested' / 'subdir' / 'optimized.pdf' + + result = PdfService.optimize_pdf(input_pdf, output_pdf) + + assert output_pdf.exists() + assert output_pdf.parent.exists() + assert result['optimized_size'] > 0 + + def test_optimize_calculates_reduction_correctly( + self, pdf_with_metadata_and_attachments, tmp_path + ): + """Test that size reduction calculations are correct.""" + output_pdf = tmp_path / 'optimized.pdf' + + result = PdfService.optimize_pdf( + pdf_with_metadata_and_attachments, + output_pdf, + scrub_metadata=True, + ) + + # Verify reduction calculation + expected_reduction = result['original_size'] - result['optimized_size'] + assert result['reduction_bytes'] == expected_reduction + + expected_percent = (expected_reduction / result['original_size']) * 100 + assert abs(result['reduction_percent'] - expected_percent) < 0.01 From bca466ea3005656155c540174b7140d4537e24e3 Mon Sep 17 00:00:00 2001 From: Alessio Vertemati Date: Fri, 9 Jan 2026 11:40:42 +0100 Subject: [PATCH 6/6] Code style --- src/parxy_cli/services/pdf_service.py | 4 +++- tests/services/test_pdf_service.py | 26 ++++++++++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/parxy_cli/services/pdf_service.py b/src/parxy_cli/services/pdf_service.py index caa94a0..8f6fb9f 100644 --- a/src/parxy_cli/services/pdf_service.py +++ b/src/parxy_cli/services/pdf_service.py @@ -420,7 +420,9 @@ def optimize_pdf( # Get optimized file size optimized_size = output_path.stat().st_size reduction_bytes = original_size - optimized_size - reduction_percent = (reduction_bytes / original_size * 100) if original_size > 0 else 0 + reduction_percent = ( + (reduction_bytes / original_size * 100) if original_size > 0 else 0 + ) return { 'original_size': original_size, diff --git a/tests/services/test_pdf_service.py b/tests/services/test_pdf_service.py index bd4ebd2..ffd07a7 100644 --- a/tests/services/test_pdf_service.py +++ b/tests/services/test_pdf_service.py @@ -350,14 +350,16 @@ def pdf_with_metadata_and_attachments(self, tmp_path): page.insert_text((100, 100), f'Page {i + 1} content\n' * 20) # Add metadata - doc.set_metadata({ - 'author': 'Test Author', - 'title': 'Test Document', - 'subject': 'Testing PDF optimization', - 'keywords': 'test, optimization, metadata', - 'creator': 'Test Creator', - 'producer': 'PyMuPDF', - }) + doc.set_metadata( + { + 'author': 'Test Author', + 'title': 'Test Document', + 'subject': 'Testing PDF optimization', + 'keywords': 'test, optimization, metadata', + 'creator': 'Test Creator', + 'producer': 'PyMuPDF', + } + ) # Add attachments attachment_content = b'This is test attachment content ' * 100 @@ -396,7 +398,9 @@ def test_optimize_with_all_defaults(self, tmp_path): # For PDF with attachments and metadata, should see reduction assert result['reduction_bytes'] >= 0 - def test_optimize_reduces_file_size(self, pdf_with_metadata_and_attachments, tmp_path): + def test_optimize_reduces_file_size( + self, pdf_with_metadata_and_attachments, tmp_path + ): """Test that optimization actually reduces file size.""" output_pdf = tmp_path / 'optimized.pdf' @@ -413,7 +417,9 @@ def test_optimize_reduces_file_size(self, pdf_with_metadata_and_attachments, tmp assert result['reduction_bytes'] > 0 assert result['reduction_percent'] > 0 - def test_optimize_without_scrubbing(self, pdf_with_metadata_and_attachments, tmp_path): + def test_optimize_without_scrubbing( + self, pdf_with_metadata_and_attachments, tmp_path + ): """Test optimization without scrubbing metadata.""" output_pdf = tmp_path / 'optimized.pdf'