diff --git a/src/document_analysis_mcp/cache/__init__.py b/src/document_analysis_mcp/cache/__init__.py index aa11907..9876de5 100644 --- a/src/document_analysis_mcp/cache/__init__.py +++ b/src/document_analysis_mcp/cache/__init__.py @@ -8,11 +8,14 @@ - TTL-based cache expiration (configurable via CACHE_TTL_DAYS) - JSON storage for cache metadata and results - Automatic cleanup of expired entries +- Thread-safe operations with file locking and in-memory locks """ +import fcntl import hashlib import json import logging +import threading import time from dataclasses import dataclass, field from datetime import datetime, timezone @@ -109,38 +112,77 @@ def __init__(self, cache_dir: Path | None = None, ttl_days: int | None = None) - self._ttl_days = ttl_days or settings.cache_ttl_days self._metadata: dict[str, CacheEntry] = {} self._initialized = False + self._lock = threading.Lock() # Thread safety for in-memory metadata def _ensure_initialized(self) -> None: - """Ensure cache directory exists and metadata is loaded.""" + """Ensure cache directory exists and metadata is loaded. + + Thread-safe: Uses file locking for metadata reads and in-memory lock + for initialization state. + """ if self._initialized: return - # Create cache directory - self._cache_dir.mkdir(parents=True, exist_ok=True) + with self._lock: + # Double-check after acquiring lock + if self._initialized: + return + + # Create cache directory + self._cache_dir.mkdir(parents=True, exist_ok=True) + + # Load existing metadata with file locking + self._metadata = self._load_metadata() + + self._initialized = True - # Load existing metadata + def _load_metadata(self) -> dict[str, CacheEntry]: + """Load cache metadata from disk with file locking. + + Returns: + Dictionary of cache entries keyed by cache key. + """ metadata_path = self._cache_dir / METADATA_FILE - if metadata_path.exists(): - try: - with open(metadata_path) as f: - data = json.load(f) - self._metadata = { - key: CacheEntry.from_dict(entry) for key, entry in data.items() - } - logger.debug("Loaded %d cache entries from %s", len(self._metadata), metadata_path) - except (json.JSONDecodeError, KeyError, ValueError) as e: - logger.warning("Failed to load cache metadata: %s", e) - self._metadata = {} + if not metadata_path.exists(): + return {} - self._initialized = True + try: + with open(metadata_path) as f: + fcntl.flock(f.fileno(), fcntl.LOCK_SH) # Shared lock for reading + try: + data = json.load(f) + entries = {key: CacheEntry.from_dict(entry) for key, entry in data.items()} + logger.debug("Loaded %d cache entries from %s", len(entries), metadata_path) + return entries + finally: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Release lock + except (json.JSONDecodeError, KeyError, ValueError) as e: + logger.warning("Failed to load cache metadata: %s", e) + return {} + except OSError as e: + logger.error("Failed to read cache metadata file: %s", e) + return {} def _save_metadata(self) -> None: - """Save cache metadata to disk.""" + """Save cache metadata to disk with file locking. + + Uses exclusive file lock to prevent concurrent writes from corrupting + the metadata file. + """ metadata_path = self._cache_dir / METADATA_FILE try: - with open(metadata_path, "w") as f: - data = {key: entry.to_dict() for key, entry in self._metadata.items()} - json.dump(data, f, indent=2) + # Write to temp file first, then rename for atomicity + temp_path = metadata_path.with_suffix(".tmp") + with open(temp_path, "w") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX) # Exclusive lock for writing + try: + data = {key: entry.to_dict() for key, entry in self._metadata.items()} + json.dump(data, f, indent=2) + f.flush() + finally: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Release lock + # Atomic rename + temp_path.rename(metadata_path) except OSError as e: logger.error("Failed to save cache metadata: %s", e) @@ -178,6 +220,8 @@ def get( ) -> dict[str, Any] | None: """Retrieve a cached result for the given content and operation. + Thread-safe: Uses in-memory lock for metadata access. + Args: content: PDF content (base64 string or bytes). operation: Operation type to look up. @@ -193,18 +237,20 @@ def get( if params_hash: cache_key = f"{cache_key}:{params_hash}" - entry = self._metadata.get(cache_key) - if entry is None: - logger.debug("Cache miss: %s (not found)", cache_key[:16]) - return None + with self._lock: + entry = self._metadata.get(cache_key) + if entry is None: + logger.debug("Cache miss: %s (not found)", cache_key[:16]) + return None - if entry.is_expired(): - logger.debug("Cache miss: %s (expired)", cache_key[:16]) - self._remove_entry(cache_key) - return None + if entry.is_expired(): + logger.debug("Cache miss: %s (expired)", cache_key[:16]) + self._remove_entry_unlocked(cache_key) + return None + + result_path = Path(entry.file_path) - # Load cached result - result_path = Path(entry.file_path) + # Read file outside the lock (file I/O can be slow) if not result_path.exists(): logger.warning("Cache file missing: %s", result_path) self._remove_entry(cache_key) @@ -230,6 +276,9 @@ def put( ) -> str: """Store a result in the cache. + Thread-safe: Uses in-memory lock for metadata updates and file locking + for disk persistence. + Args: content: Original PDF content (base64 string or bytes). operation: Operation type being cached. @@ -260,7 +309,7 @@ def put( tz=timezone.utc, ) - # Save result file + # Save result file (outside lock since file I/O is slow) try: with open(result_path, "w") as f: json.dump(result, f, indent=2) @@ -268,7 +317,7 @@ def put( logger.error("Failed to write cache file: %s", e) return content_hash - # Create and save entry + # Create entry and update metadata with lock entry = CacheEntry( content_hash=content_hash, created_at=now, @@ -277,8 +326,10 @@ def put( operation=operation, metadata=metadata or {}, ) - self._metadata[cache_key] = entry - self._save_metadata() + + with self._lock: + self._metadata[cache_key] = entry + self._save_metadata() logger.info( "Cached result: %s (expires %s)", @@ -290,6 +341,19 @@ def put( def _remove_entry(self, cache_key: str) -> None: """Remove a cache entry and its associated file. + Thread-safe: Acquires the lock before modifying metadata. + + Args: + cache_key: Cache key to remove. + """ + with self._lock: + self._remove_entry_unlocked(cache_key) + + def _remove_entry_unlocked(self, cache_key: str) -> None: + """Remove a cache entry without acquiring the lock. + + Must be called while holding self._lock. + Args: cache_key: Cache key to remove. """ @@ -304,15 +368,18 @@ def _remove_entry(self, cache_key: str) -> None: def cleanup_expired(self) -> int: """Remove all expired cache entries. + Thread-safe: Acquires the lock for the entire cleanup operation. + Returns: Number of entries removed. """ self._ensure_initialized() - expired_keys = [key for key, entry in self._metadata.items() if entry.is_expired()] + with self._lock: + expired_keys = [key for key, entry in self._metadata.items() if entry.is_expired()] - for key in expired_keys: - self._remove_entry(key) + for key in expired_keys: + self._remove_entry_unlocked(key) if expired_keys: logger.info("Cleaned up %d expired cache entries", len(expired_keys)) @@ -322,14 +389,17 @@ def cleanup_expired(self) -> int: def clear(self) -> int: """Clear all cache entries. + Thread-safe: Acquires the lock for the entire clear operation. + Returns: Number of entries removed. """ self._ensure_initialized() - count = len(self._metadata) - for key in list(self._metadata.keys()): - self._remove_entry(key) + with self._lock: + count = len(self._metadata) + for key in list(self._metadata.keys()): + self._remove_entry_unlocked(key) logger.info("Cleared %d cache entries", count) return count @@ -337,30 +407,37 @@ def clear(self) -> int: def get_stats(self) -> dict[str, Any]: """Get cache statistics. + Thread-safe: Acquires the lock to read metadata. + Returns: Dictionary containing cache statistics. """ self._ensure_initialized() - expired_count = sum(1 for e in self._metadata.values() if e.is_expired()) - operations = {} - for entry in self._metadata.values(): - operations[entry.operation] = operations.get(entry.operation, 0) + 1 + with self._lock: + expired_count = sum(1 for e in self._metadata.values() if e.is_expired()) + operations: dict[str, int] = {} + for entry in self._metadata.values(): + operations[entry.operation] = operations.get(entry.operation, 0) + 1 + + # Collect file paths while holding lock + file_paths = [Path(entry.file_path) for entry in self._metadata.values()] + total_entries = len(self._metadata) - # Calculate total cache size + # Calculate total cache size outside lock (file I/O is slow) total_size = 0 - for entry in self._metadata.values(): + for file_path in file_paths: try: - total_size += Path(entry.file_path).stat().st_size + total_size += file_path.stat().st_size except OSError: pass return { "cache_dir": str(self._cache_dir), "ttl_days": self._ttl_days, - "total_entries": len(self._metadata), + "total_entries": total_entries, "expired_entries": expired_count, - "valid_entries": len(self._metadata) - expired_count, + "valid_entries": total_entries - expired_count, "operations": operations, "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2), diff --git a/src/document_analysis_mcp/tools/ocr.py b/src/document_analysis_mcp/tools/ocr.py index 7799e79..5ed9e71 100644 --- a/src/document_analysis_mcp/tools/ocr.py +++ b/src/document_analysis_mcp/tools/ocr.py @@ -33,6 +33,44 @@ DEFAULT_LANGUAGE = "eng" # Tesseract language code MIN_TEXT_CHARS = 100 # Minimum chars before OCR is considered necessary +# Valid Tesseract language codes (common languages) +# Full list at: https://tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html +VALID_LANGUAGES = frozenset( + { + # Western European + "eng", # English + "fra", # French + "deu", # German + "spa", # Spanish + "ita", # Italian + "por", # Portuguese + "nld", # Dutch + # Eastern European + "rus", # Russian + "pol", # Polish + "ces", # Czech + "ukr", # Ukrainian + # Asian + "chi_sim", # Chinese Simplified + "chi_tra", # Chinese Traditional + "jpn", # Japanese + "kor", # Korean + "vie", # Vietnamese + "tha", # Thai + # Middle Eastern + "ara", # Arabic + "heb", # Hebrew + "tur", # Turkish + # South Asian + "hin", # Hindi + # Nordic + "dan", # Danish + "fin", # Finnish + "nor", # Norwegian + "swe", # Swedish + } +) + def _is_tesseract_available() -> bool: """Check if Tesseract OCR is available. @@ -49,6 +87,27 @@ def _is_tesseract_available() -> bool: return False +def _validate_language(language: str) -> str: + """Validate and normalize the Tesseract language code. + + Args: + language: Language code to validate. + + Returns: + Valid language code (original if valid, DEFAULT_LANGUAGE if not). + """ + if language in VALID_LANGUAGES: + return language + + logger.warning( + "Invalid language code '%s', using default '%s'. Valid languages: %s", + language, + DEFAULT_LANGUAGE, + ", ".join(sorted(VALID_LANGUAGES)), + ) + return DEFAULT_LANGUAGE + + def _page_to_image(page: Page, dpi: int = DEFAULT_DPI) -> Image.Image: """Convert a pdfplumber page to a PIL Image. @@ -156,6 +215,9 @@ def pdf_ocr( # Validate file size _validate_base64_size(pdf_content, max_file_size_mb) + # Validate and normalize language parameter + language = _validate_language(language) + # Check Tesseract availability if not _is_tesseract_available(): return { diff --git a/src/document_analysis_mcp/tracking/__init__.py b/src/document_analysis_mcp/tracking/__init__.py index 09200b7..94bd915 100644 --- a/src/document_analysis_mcp/tracking/__init__.py +++ b/src/document_analysis_mcp/tracking/__init__.py @@ -7,8 +7,10 @@ - Historical usage data Data is stored in JSON Lines format for efficient append-only writes. +Thread-safe with file locking for concurrent append operations. """ +import fcntl import json import logging from dataclasses import dataclass, field @@ -196,10 +198,15 @@ def record( metadata=metadata or {}, ) - # Append to tracking file + # Append to tracking file with file locking try: with open(self._tracking_file, "a") as f: - f.write(json.dumps(record.to_dict()) + "\n") + fcntl.flock(f.fileno(), fcntl.LOCK_EX) # Exclusive lock for writing + try: + f.write(json.dumps(record.to_dict()) + "\n") + f.flush() # Ensure data is written before releasing lock + finally: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Release lock except OSError as e: logger.error("Failed to write usage record: %s", e) @@ -242,28 +249,32 @@ def get_records( try: with open(self._tracking_file) as f: - for line in f: - line = line.strip() - if not line: - continue - try: - data = json.loads(line) - record = UsageRecord.from_dict(data) - - # Apply filters - if operation and record.operation != operation: + fcntl.flock(f.fileno(), fcntl.LOCK_SH) # Shared lock for reading + try: + for line in f: + line = line.strip() + if not line: continue - if model and record.model != model: + try: + data = json.loads(line) + record = UsageRecord.from_dict(data) + + # Apply filters + if operation and record.operation != operation: + continue + if model and record.model != model: + continue + if since and record.timestamp < since: + continue + if until and record.timestamp > until: + continue + + records.append(record) + except (json.JSONDecodeError, KeyError, ValueError) as e: + logger.debug("Skipping malformed record: %s", e) continue - if since and record.timestamp < since: - continue - if until and record.timestamp > until: - continue - - records.append(record) - except (json.JSONDecodeError, KeyError, ValueError) as e: - logger.debug("Skipping malformed record: %s", e) - continue + finally: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Release lock except OSError as e: logger.error("Failed to read usage records: %s", e) return [] diff --git a/tests/test_cache.py b/tests/test_cache.py index c271098..8a4573b 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,6 +4,7 @@ """ import tempfile +import threading from datetime import datetime, timedelta, timezone from pathlib import Path from unittest.mock import patch @@ -317,3 +318,144 @@ def test_uses_settings(self): assert cache._cache_dir == Path("/tmp/test-cache") assert cache._ttl_days == 14 + + +class TestCacheThreadSafety: + """Tests for thread-safety of cache operations.""" + + def test_concurrent_puts(self): + """Test that concurrent put operations don't corrupt metadata.""" + with tempfile.TemporaryDirectory() as tmpdir: + cache = DocumentCache(cache_dir=Path(tmpdir), ttl_days=30) + errors: list[Exception] = [] + num_threads = 10 + iterations_per_thread = 10 + + def worker(thread_id: int) -> None: + try: + for i in range(iterations_per_thread): + content = f"content-{thread_id}-{i}" + result = {"thread": thread_id, "iteration": i} + cache.put(content, "extract", result) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + # No errors should have occurred + assert len(errors) == 0, f"Errors occurred: {errors}" + + # All entries should be present (or at least no corruption) + stats = cache.get_stats() + assert stats["total_entries"] == num_threads * iterations_per_thread + + def test_concurrent_gets(self): + """Test that concurrent get operations are safe.""" + with tempfile.TemporaryDirectory() as tmpdir: + cache = DocumentCache(cache_dir=Path(tmpdir), ttl_days=30) + + # Pre-populate cache + for i in range(10): + cache.put(f"content-{i}", "extract", {"id": i}) + + errors: list[Exception] = [] + results: list[dict | None] = [] + lock = threading.Lock() + + def reader(content_id: int) -> None: + try: + for _ in range(20): + result = cache.get(f"content-{content_id}", "extract") + with lock: + results.append(result) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=reader, args=(i % 10,)) for i in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + + # No errors should have occurred + assert len(errors) == 0, f"Errors occurred: {errors}" + # All results should be valid + assert all(r is not None for r in results) + + def test_concurrent_put_and_get(self): + """Test that concurrent put and get operations are safe.""" + with tempfile.TemporaryDirectory() as tmpdir: + cache = DocumentCache(cache_dir=Path(tmpdir), ttl_days=30) + errors: list[Exception] = [] + + # Pre-populate some entries + for i in range(5): + cache.put(f"initial-{i}", "extract", {"type": "initial", "id": i}) + + def writer(thread_id: int) -> None: + try: + for i in range(10): + content = f"content-{thread_id}-{i}" + result = {"thread": thread_id, "iteration": i} + cache.put(content, "extract", result) + except Exception as e: + errors.append(e) + + def reader() -> None: + try: + for _ in range(50): + # Try to read existing entries + for i in range(5): + cache.get(f"initial-{i}", "extract") + except Exception as e: + errors.append(e) + + threads = [] + # Add writer threads + for i in range(5): + threads.append(threading.Thread(target=writer, args=(i,))) + # Add reader threads + for _ in range(5): + threads.append(threading.Thread(target=reader)) + + for t in threads: + t.start() + for t in threads: + t.join() + + # No errors should have occurred + assert len(errors) == 0, f"Errors occurred: {errors}" + + def test_concurrent_cleanup(self): + """Test that concurrent cleanup operations are safe.""" + with tempfile.TemporaryDirectory() as tmpdir: + cache = DocumentCache(cache_dir=Path(tmpdir), ttl_days=30) + errors: list[Exception] = [] + + # Add entries + for i in range(20): + cache.put(f"content-{i}", "extract", {"id": i}) + + # Expire half of them + for key in list(cache._metadata.keys())[:10]: + cache._metadata[key].expires_at = datetime.now(timezone.utc) - timedelta(days=1) + + def cleanup_worker() -> None: + try: + for _ in range(5): + cache.cleanup_expired() + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=cleanup_worker) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join() + + # No errors should have occurred + assert len(errors) == 0, f"Errors occurred: {errors}" diff --git a/tests/test_ocr.py b/tests/test_ocr.py index ff1a9ed..8c09ad3 100644 --- a/tests/test_ocr.py +++ b/tests/test_ocr.py @@ -16,8 +16,10 @@ DEFAULT_DPI, DEFAULT_LANGUAGE, MIN_TEXT_CHARS, + VALID_LANGUAGES, _is_tesseract_available, _validate_base64_size, + _validate_language, pdf_ocr, ) @@ -235,3 +237,54 @@ def test_default_language(self): def test_min_text_chars(self): """Test minimum text chars threshold.""" assert MIN_TEXT_CHARS == 100 + + +class TestLanguageValidation: + """Tests for language parameter validation.""" + + def test_valid_english(self): + """Test that English is valid.""" + assert _validate_language("eng") == "eng" + + def test_valid_other_languages(self): + """Test that other common languages are valid.""" + valid_codes = ["fra", "deu", "spa", "ita", "chi_sim", "chi_tra", "jpn", "kor", "ara"] + for code in valid_codes: + assert _validate_language(code) == code + + def test_invalid_returns_default(self): + """Test that invalid language codes return the default.""" + assert _validate_language("invalid") == DEFAULT_LANGUAGE + assert _validate_language("xyz") == DEFAULT_LANGUAGE + assert _validate_language("") == DEFAULT_LANGUAGE + + def test_case_sensitive(self): + """Test that language codes are case-sensitive.""" + # Tesseract uses lowercase codes + assert _validate_language("ENG") == DEFAULT_LANGUAGE + assert _validate_language("Eng") == DEFAULT_LANGUAGE + + def test_valid_languages_contains_common_codes(self): + """Test that VALID_LANGUAGES includes common language codes.""" + common_codes = {"eng", "fra", "deu", "spa", "chi_sim", "jpn", "ara"} + assert common_codes.issubset(VALID_LANGUAGES) + + @patch("document_analysis_mcp.tools.ocr.get_tracker") + @patch("document_analysis_mcp.tools.ocr.get_cache") + @patch("document_analysis_mcp.tools.ocr._is_tesseract_available") + def test_invalid_language_uses_default_in_pdf_ocr( + self, mock_tesseract, mock_cache, mock_tracker + ): + """Test that pdf_ocr uses default language for invalid codes.""" + mock_tesseract.return_value = True + mock_cache.return_value.get.return_value = None + mock_tracker.return_value.record.return_value = MagicMock() + + pdf_bytes = create_simple_pdf("Test content") + pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8") + + result = pdf_ocr(pdf_b64, language="invalid_lang", use_cache=False) + + assert result["success"] + # The result should show the default language was used + assert result["ocr_stats"]["language"] == DEFAULT_LANGUAGE diff --git a/tests/test_tracking.py b/tests/test_tracking.py index 51f08ca..4f3a145 100644 --- a/tests/test_tracking.py +++ b/tests/test_tracking.py @@ -4,6 +4,7 @@ """ import tempfile +import threading from datetime import datetime, timedelta, timezone from pathlib import Path @@ -342,3 +343,141 @@ def test_returns_same_instance(self): tracker2 = get_tracker() assert tracker1 is tracker2 + + +class TestTrackerThreadSafety: + """Tests for thread-safety of tracking operations.""" + + def test_concurrent_records(self): + """Test that concurrent record operations don't corrupt the file.""" + with tempfile.TemporaryDirectory() as tmpdir: + tracker = UsageTracker(tracking_dir=Path(tmpdir)) + errors: list[Exception] = [] + num_threads = 10 + records_per_thread = 20 + + def worker(thread_id: int) -> None: + try: + for i in range(records_per_thread): + tracker.record( + operation=f"op-{thread_id}", + model="claude-sonnet-4-20250514", + input_tokens=100 * i, + output_tokens=50 * i, + processing_time_ms=100.0, + metadata={"thread": thread_id, "iteration": i}, + ) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + # No errors should have occurred + assert len(errors) == 0, f"Errors occurred: {errors}" + + # All records should be present and readable + records = tracker.get_records() + assert len(records) == num_threads * records_per_thread + + def test_concurrent_record_and_read(self): + """Test that concurrent record and read operations are safe.""" + with tempfile.TemporaryDirectory() as tmpdir: + tracker = UsageTracker(tracking_dir=Path(tmpdir)) + errors: list[Exception] = [] + + # Pre-populate with some records + for i in range(10): + tracker.record( + operation="initial", + model="sonnet", + input_tokens=i * 100, + output_tokens=i * 50, + ) + + def writer(thread_id: int) -> None: + try: + for i in range(15): + tracker.record( + operation=f"write-{thread_id}", + model="sonnet", + input_tokens=i * 100, + output_tokens=i * 50, + ) + except Exception as e: + errors.append(e) + + def reader() -> None: + try: + for _ in range(20): + tracker.get_records() + tracker.get_summary() + except Exception as e: + errors.append(e) + + threads = [] + # Add writer threads + for i in range(5): + threads.append(threading.Thread(target=writer, args=(i,))) + # Add reader threads + for _ in range(5): + threads.append(threading.Thread(target=reader)) + + for t in threads: + t.start() + for t in threads: + t.join() + + # No errors should have occurred + assert len(errors) == 0, f"Errors occurred: {errors}" + + # Should have 10 initial + (5 writers * 15 records) = 85 records + records = tracker.get_records() + assert len(records) == 10 + 5 * 15 + + def test_no_corrupted_lines(self): + """Test that concurrent writes don't interleave lines.""" + with tempfile.TemporaryDirectory() as tmpdir: + tracker = UsageTracker(tracking_dir=Path(tmpdir)) + num_threads = 20 + records_per_thread = 50 + + def worker(thread_id: int) -> None: + for i in range(records_per_thread): + tracker.record( + operation=f"thread-{thread_id}-op-{i}", + model="claude-sonnet-4-20250514", + input_tokens=thread_id * 1000 + i, + output_tokens=thread_id * 500 + i, + ) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Read the raw file and verify each line is valid JSON + tracking_file = Path(tmpdir) / TRACKING_FILE + with open(tracking_file) as f: + lines = f.readlines() + + valid_lines = 0 + corrupted_lines = 0 + for line in lines: + line = line.strip() + if not line: + continue + try: + import json + + json.loads(line) + valid_lines += 1 + except json.JSONDecodeError: + corrupted_lines += 1 + + assert corrupted_lines == 0, f"Found {corrupted_lines} corrupted lines" + assert valid_lines == num_threads * records_per_thread