Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 60 additions & 53 deletions src/diagnose_vault.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Diagnose vault files to find empty string issues."""

import asyncio
import os
import sys
from pathlib import Path
Expand All @@ -9,61 +10,67 @@
from src.embedder import VoyageEmbedder
from src.exceptions import EmbeddingError

vault_path = Path(os.getenv("OBSIDIAN_VAULT_PATH", "/vault"))
embedder = VoyageEmbedder()

print(f"Scanning vault: {vault_path}")
md_files = list(vault_path.rglob("*.md"))
print(f"Found {len(md_files)} markdown files\n")
async def diagnose():
    """Scan every markdown file in the vault and try to embed it.

    Reports three buckets: files that embedded successfully, files whose
    content is empty after stripping, and files that raised an error
    (either an EmbeddingError from the embedder or an unexpected
    exception, e.g. while reading the file).

    Reads OBSIDIAN_VAULT_PATH from the environment (default "/vault").
    Prints a per-file progress line and a final summary; returns None.
    """
    vault_path = Path(os.getenv("OBSIDIAN_VAULT_PATH", "/vault"))
    embedder = VoyageEmbedder()

    print(f"Scanning vault: {vault_path}")
    md_files = list(vault_path.rglob("*.md"))
    print(f"Found {len(md_files)} markdown files\n")

    problematic_files = []
    empty_files = []
    successful = 0

    for i, file_path in enumerate(md_files, 1):
        # Compute rel_path BEFORE the try block: if open()/read() raises,
        # the outer handler below references rel_path, and assigning it
        # inside the try would leave it undefined on the first iteration
        # (NameError) or stale from the previous file on later ones.
        # Safe here because md_files came from vault_path.rglob(), so
        # relative_to() cannot fail.
        rel_path = str(file_path.relative_to(vault_path))

        try:
            with open(file_path, encoding="utf-8") as f:
                content = f.read().strip()

            if not content:
                empty_files.append(rel_path)
                print(f"[{i}/{len(md_files)}] EMPTY: {rel_path}")
                continue

            if len(content) < 5:
                print(f"[{i}/{len(md_files)}] VERY SHORT " f"({len(content)} chars): {rel_path}")

            # Try to embed single file
            try:
                await embedder.embed(content, input_type="document")
                successful += 1
                if i % 10 == 0:
                    print(f"[{i}/{len(md_files)}] " f"Progress: {successful} successful")

            except EmbeddingError as e:
                problematic_files.append((rel_path, f"Embedding failed: {e}"))
                print(f"[{i}/{len(md_files)}] FAILED: {rel_path} - {e}")

        except Exception as e:
            # Unexpected failure (e.g. unreadable file); attribute it to
            # the correct file via the rel_path computed above.
            problematic_files.append((rel_path, str(e)))
            print(f"[{i}/{len(md_files)}] ERROR: {rel_path} - {e}")

    print(f"\n{'=' * 60}")
    print("Summary:")
    print(f"  Successful: {successful}/{len(md_files)}")
    print(f"  Empty files: {len(empty_files)}")
    print(f"  Problematic: {len(problematic_files)}")

    if empty_files:
        print(f"\nEmpty files ({len(empty_files)}):")
        for path in empty_files[:10]:
            print(f"  - {path}")
        if len(empty_files) > 10:
            print(f"  ... and {len(empty_files) - 10} more")

    if problematic_files:
        print(f"\nProblematic files ({len(problematic_files)}):")
        for path, error in problematic_files[:10]:
            print(f"  - {path}: {error}")


if __name__ == "__main__":
    asyncio.run(diagnose())
51 changes: 30 additions & 21 deletions src/embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def chunk_text(self, text: str, chunk_size: int = 2000, overlap: int = 0) -> lis
)
return chunks

def embed_with_chunks(
async def embed_with_chunks(
self, text: str, chunk_size: int = 2000, input_type: str = "document"
) -> tuple[list[list[float]], int]:
"""
Expand Down Expand Up @@ -174,7 +174,7 @@ def embed_with_chunks(
# If under limit, embed whole
if estimated_tokens < 30000:
try:
embedding = self.embed(text, input_type=input_type)
embedding = await self.embed(text, input_type=input_type)
return ([embedding], 1)
except EmbeddingError as e:
if _is_token_limit_error(e):
Expand Down Expand Up @@ -206,12 +206,12 @@ def embed_with_chunks(
while i < len(chunks):
chunk_batch = chunks[i : i + batch_size]

# Rate limit
self._rate_limit_sync()
# Rate limit (async - non-blocking)
await self._rate_limit_async()

try:
# Embed this batch of chunks with context
result = self._call_api_with_retry(
# Embed this batch of chunks with context (runs in thread pool)
result = await self._call_api_with_timeout(
self.client.contextualized_embed,
inputs=[chunk_batch], # One document's chunks
model=self.model,
Expand All @@ -230,7 +230,7 @@ def embed_with_chunks(
# Halve batch size and retry this batch
batch_size = max(1, batch_size // 2)
logger.warning(
f"Batch too large for token limit, reducing to {batch_size} chunks"
f"Batch too large for token limit, " f"reducing to {batch_size} chunks"
)
continue # Retry same position with smaller batch
raise
Expand All @@ -242,7 +242,9 @@ def embed_with_chunks(
error_msg = _redact_sensitive(str(e))
logger.error(f"Chunked embedding failed: {error_msg}", exc_info=True)
raise EmbeddingError(
f"Failed to embed chunked text: {error_msg}", text_preview=text[:100], cause=e
f"Failed to embed chunked text: {error_msg}",
text_preview=text[:100],
cause=e,
) from e

def _load_cache_index(self) -> dict:
Expand Down Expand Up @@ -315,7 +317,8 @@ def _call_api_with_retry(self, api_func, *args, **kwargs):
# Exponential backoff: 2^attempt seconds (1, 2, 4, ...)
backoff = 2 ** (attempt + 1)
logger.warning(
f"Rate limited, retrying in {backoff}s (attempt {attempt + 1}/{self.max_retries})"
f"Rate limited, retrying in {backoff}s "
f"(attempt {attempt + 1}/{self.max_retries})"
)
time.sleep(backoff)
elif attempt < self.max_retries - 1:
Expand All @@ -327,10 +330,13 @@ def _call_api_with_retry(self, api_func, *args, **kwargs):
)
time.sleep(backoff)
else:
logger.error(f"API call failed after {self.max_retries} attempts: {error_msg}")
logger.error(
f"API call failed after {self.max_retries} attempts: " f"{error_msg}"
)

raise EmbeddingError(
f"API call failed after {self.max_retries} attempts: {_redact_sensitive(str(last_error))}",
f"API call failed after {self.max_retries} attempts: "
f"{_redact_sensitive(str(last_error))}",
cause=last_error,
)

Expand All @@ -349,7 +355,7 @@ async def _call_api_with_timeout(self, api_func, *args, **kwargs):
Raises:
EmbeddingError: If timeout or API error occurs
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()

try:
result = await asyncio.wait_for(
Expand All @@ -363,7 +369,9 @@ async def _call_api_with_timeout(self, api_func, *args, **kwargs):
except TimeoutError as e:
raise EmbeddingError(f"API call timed out after {self.api_timeout}s", cause=e) from e

def embed(self, text: str, input_type: str = "document", use_cache: bool = True) -> list[float]:
async def embed(
self, text: str, input_type: str = "document", use_cache: bool = True
) -> list[float]:
"""
Generate embedding for a single text.

Expand All @@ -378,14 +386,14 @@ def embed(self, text: str, input_type: str = "document", use_cache: bool = True)
Raises:
EmbeddingError: If embedding generation fails
"""
results = self.embed_batch([text], input_type, use_cache)
results = await self.embed_batch([text], input_type, use_cache)

if not results or results[0] is None:
raise EmbeddingError("Failed to generate embedding for text", text_preview=text[:100])

return results[0]

def embed_batch(
async def embed_batch(
self, texts: list[str], input_type: str = "document", use_cache: bool = True
) -> list[list[float]]:
"""
Expand Down Expand Up @@ -435,8 +443,8 @@ def embed_batch(
for i in range(0, len(texts_to_embed), self.batch_size):
batch = texts_to_embed[i : i + self.batch_size]

# Rate limiting
self._rate_limit_sync()
# Rate limiting (async - non-blocking)
await self._rate_limit_async()

try:
# Filter out empty strings (Voyage API rejects them)
Expand All @@ -459,8 +467,8 @@ def embed_batch(
# Each note is a single-element list (whole note, not chunked)
nested_inputs = [[text] for text in non_empty]

# Call Voyage API with retry and error handling
result = self._call_api_with_retry(
# Call Voyage API with timeout (runs in thread pool, non-blocking)
result = await self._call_api_with_timeout(
self.client.contextualized_embed,
inputs=nested_inputs,
model=self.model,
Expand All @@ -473,7 +481,8 @@ def embed_batch(
# Since we pass whole notes as single chunks, we take [0]
api_embeddings = [doc_result.embeddings[0] for doc_result in result.results]

# Map back to original batch positions (accounting for None placeholders)
# Map back to original batch positions
# (accounting for None placeholders)
embedding_idx = 0
for text in filtered_batch:
if text is None:
Expand All @@ -482,7 +491,7 @@ def embed_batch(
new_embeddings.append(api_embeddings[embedding_idx])
embedding_idx += 1

# Cache results using JSON (safer than pickle)
# Cache results using JSON (safer than other formats)
if use_cache:
# Cache only non-None embeddings
for text, embedding in zip(non_empty, api_embeddings, strict=False):
Expand Down
2 changes: 1 addition & 1 deletion src/file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ async def _reindex_file(self, file_path: str):

# Generate embedding(s) with automatic chunking for large notes
try:
embeddings_list, total_chunks = self.embedder.embed_with_chunks(
embeddings_list, total_chunks = await self.embedder.embed_with_chunks(
content, chunk_size=2000, input_type="document"
)
except EmbeddingError as e:
Expand Down
2 changes: 1 addition & 1 deletion src/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ async def index_vault(vault_path: str, batch_size: int = 100):
for note_data in valid_notes:
# embed_with_chunks handles both small (whole) and large (chunked) notes
try:
embeddings_list, total_chunks = embedder.embed_with_chunks(
embeddings_list, total_chunks = await embedder.embed_with_chunks(
note_data["content"],
chunk_size=2000, # oachatbot standard
input_type="document",
Expand Down
7 changes: 5 additions & 2 deletions src/security_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ def validate_vault_path(user_path: str, vault_root: str) -> str:
raise SecurityError(f"Path traversal detected: {user_path}")

# 4. Resolve against vault root and ensure it stays within bounds
vault_root_resolved = Path(vault_root).resolve()
full_path = (vault_root_resolved / sanitized).resolve()
try:
vault_root_resolved = Path(vault_root).resolve()
full_path = (vault_root_resolved / sanitized).resolve()
except (OSError, RuntimeError) as e:
raise SecurityError(f"Path resolution failed for '{user_path}': {e}") from e

# Check if resolved path is still within vault
try:
Expand Down
Loading
Loading