perf: add redis caching and reduce embedding API calls #372
Rajat-Ahuja1997 wants to merge 7 commits into main
Conversation
Walkthrough
Adds caching and cache-key helpers for collections and session peer configurations, exposes two new CRUD helpers, introduces batch embedding with per-item fallback and embedding propagation, updates call sites to use the new CRUD helpers, and adds tests covering embedding fallback behavior.
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/crud/session.py`:
- Around lines 1024-1027: The pre-commit cache invalidation in
_get_or_add_peers_to_session creates a stale-cache race when called from
get_or_create_session. After the DB commit in get_or_create_session (after the
block that updates the session and calls db.commit()), add a post-commit
invalidation that calls
cache.delete(session_peer_config_cache_key(workspace_name, session.name)) when
peers were actually added (e.g., check session.peer_names or a boolean returned
by _get_or_add_peers_to_session), so peer configs are reloaded after the commit
and stale data is never cached. See the sketch below.
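A minimal sketch of that ordering, assuming the existing cache and session_peer_config_cache_key helpers and a hypothetical boolean return value peers_added from _get_or_add_peers_to_session:

    peers_added = await _get_or_add_peers_to_session(db, workspace_name, session)
    await db.commit()
    if peers_added:
        # Invalidate only after the commit so a concurrent reader cannot
        # repopulate the cache with pre-commit (stale) peer configs.
        await cache.delete(session_peer_config_cache_key(workspace_name, session.name))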
In `@src/routers/sessions.py`:
- Around line 269-270: After db.commit() succeeds in the session delete flow,
make cache invalidation best-effort by wrapping the two cache.delete calls
(session_cache_key(...) and session_peer_config_cache_key(...)) in a try/except
that catches broad cache/connection errors (e.g., Exception or specific Redis
errors) so they don’t propagate and turn a successful delete into a 500; on
exception, log a warning with context (workspace_id, session_id, function/method
name) and continue without re-raising. Ensure the try/except surrounds both
cache.delete calls and does not change the existing ValueError handling around
the DB operations.
🧹 Nitpick comments (4)
tests/utils/test_agent_tools.py (1)
821-885: Test name doesn't match the verified behavior.
The test name `test_falls_back_to_per_query_embedding_when_batch_fails` suggests per-query embedding is invoked as a fallback, but the test actually verifies the opposite: that `embedding_client.embed` is not called (it would raise `AssertionError`) and `None` is passed as the embedding to `search_messages`. Consider renaming to something like `test_proceeds_with_none_embedding_when_batch_fails`.
src/utils/agent_tools.py (1)
1179-1195: `ValidationException` will be caught as an "unexpected" error, causing ERROR-level logging and an unnecessary rollback on a read path.
`_handle_search_memory` raises `ValidationException` (line 1183), which isn't a `ValueError` or `KeyError`. In `execute_tool` it falls into the generic `except Exception` handler (line 1813), which logs at ERROR with a full traceback and rolls back the transaction; both are inappropriate for a simple token-limit validation failure on a read-only path. Consider returning an error string directly, matching the pattern used by other tool handlers:
♻️ Proposed fix: return error string directly
     query = tool_input["query"]
     try:
         query_embedding = await embedding_client.embed(query)
     except ValueError as e:
-        raise ValidationException(
-            f"Query exceeds maximum token limit of {settings.MAX_EMBEDDING_TOKENS}."
-        ) from e
+        return f"ERROR: Query exceeds maximum token limit of {settings.MAX_EMBEDDING_TOKENS}. Please use a shorter query."
     documents = await crud.query_documents(
src/crud/session.py (2)
828-851: Return type `dict[str, list[Any]]` loses positional semantics.
The list holds `[peer_configuration, session_peer_configuration, is_active]`, but consumers must rely on index positions. A `TypedDict` or `NamedTuple` would make this contract explicit and prevent index-based errors at call sites. With cashews serialization, a `TypedDict` should work fine.
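For illustration, a sketch of the `TypedDict` shape the reviewer suggests (field names are hypothetical, not taken from the PR):

    from typing import Any, TypedDict

    class PeerSessionConfig(TypedDict):
        peer_configuration: dict[str, Any]
        session_peer_configuration: dict[str, Any]
        is_active: bool

    # Consumers then read named fields instead of magic indices:
    # configs[peer_name]["is_active"] rather than configs[peer_name][2]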
42-47: `SESSION_PEER_CONFIG_LOCK_PREFIX` reuses the same value as `SESSION_LOCK_PREFIX`; this works but may confuse readers.
Both resolve to `f"{get_cache_namespace()}:lock"`. This is functionally correct since the actual lock keys differ (different key templates), but sharing the identical value may mislead future maintainers into thinking they share a lock. Consider giving it a distinct suffix (e.g., `:peer_config_lock`) or adding a brief comment explaining why the shared prefix is safe.
Code Review - Issues Found
I've reviewed this PR and found 3 issues that need attention:
Issue 1: Silent data loss in error handling (CLAUDE.md violation)
File: src/utils/agent_tools.py
Issue: When individual observation embedding fails, the code logs a warning and uses
CLAUDE.md Rule: "Explicit error handling with appropriate exception types"
Why this matters: This pattern is inconsistent with the rest of the codebase. Lines 1180-1185 in the same file show the correct pattern: catch
Suggested fix: Instead of
Link: honcho/src/utils/agent_tools.py Lines 654 to 660 in 14fa071
CLAUDE.md: Line 115 in 14fa071
Issue 2: Cache invalidation race condition in
good callouts, will fix
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/crud/collection.py`:
- Around lines 132-153: The call to cache.delete in
update_collection_internal_metadata can raise (e.g., Redis down) and should be
best-effort so a successful DB update isn't turned into an error. After await
db.commit(), wrap the await cache.delete(collection_cache_key(...)) in a
try/except Exception block, log the error (e.g., with logger.warning), and do
not re-raise, so the function returns normally even if cache eviction fails.
- Around line 112-119: The cache.set call that writes honcho_collection using
collection_cache_key and settings.CACHE.DEFAULT_TTL_SECONDS must be made
best-effort: wrap the await cache.set(...) in a try/except (catch Exception) so
Redis/connection errors do not bubble up after the commit; on exception, log the
error (using the module logger or existing logging pattern) and continue without
re-raising. Follow the same pattern used in src/routers/sessions.py (lines
handling cache in a try/except) to keep semantics consistent.
In `@src/crud/session.py`:
- Around lines 586-589: The cache deletion calls
(cache.delete(session_cache_key(...)) and
cache.delete(session_peer_config_cache_key(...))) currently live inside the
try/except that surrounds the DB transaction, so a cache failure will
incorrectly trigger a rollback and re-raise. Move these two cache.delete calls
out of the try block so they run after a successful db.commit(), and treat them
as best-effort (catch and log but don't re-raise), similar to the pattern used
in routers/sessions.py. Redis failures should never surface to callers of
crud.delete_session.
🧹 Nitpick comments (2)
src/crud/session.py (1)
821-855: Returning positional lists reduces type safety for consumers.
The return value maps `peer_name → [peer_configuration, session_peer_configuration, is_active]`, requiring consumers to use magic indices. A `TypedDict` or `NamedTuple` would make access self-documenting and prevent index mix-up bugs. That said, this is an internal caching helper, and the plain `dict`/`list` structure serializes cleanly with cashews, so this is fine to defer.
src/routers/sessions.py (1)
269-273: Consider logging the swallowed exception for observability.
The best-effort `try/except` is correct; a cache failure shouldn't turn a successful deletion into a 500. However, the bare `pass` means Redis connectivity issues are completely invisible. A `logger.warning` would aid debugging without changing behavior, and aligns with the coding guideline to use proper logging and the Ruff S110 hint.
🔧 Optional improvement
     try:
         await cache.delete(session_cache_key(workspace_id, session_id))
         await cache.delete(session_peer_config_cache_key(workspace_id, session_id))
-    except Exception:  # nosec B110 — best-effort; TTL will clean up
-        pass
+    except Exception:
+        logger.warning(
+            "Failed to invalidate cache for session %s, entries will expire via TTL",
+            session_id,
+        )
As per coding guidelines, "Use proper logging with context instead of print statements" and "Use explicit error handling with appropriate exception types."
    key = collection_cache_key(workspace_name, observer, observed)
    await cache.set(
        key,
        honcho_collection,
        expire=settings.CACHE.DEFAULT_TTL_SECONDS,
    )
Unguarded cache.set can surface a 500 after a successful commit.
If Redis is unavailable, the cache.set call on line 114 raises and propagates to the caller, even though the collection was already persisted at line 111. Other mutation paths in src/routers/sessions.py (lines 269-273) already wrap cache operations in try/except for best-effort semantics; this path should be consistent.
🛡️ Proposed fix
     await db.commit()
-    key = collection_cache_key(workspace_name, observer, observed)
-    await cache.set(
-        key,
-        honcho_collection,
-        expire=settings.CACHE.DEFAULT_TTL_SECONDS,
-    )
+    try:
+        key = collection_cache_key(workspace_name, observer, observed)
+        await cache.set(
+            key,
+            honcho_collection,
+            expire=settings.CACHE.DEFAULT_TTL_SECONDS,
+        )
+    except Exception:
+        logger.warning(
+            "Failed to cache collection %s/%s, will be populated on next read",
+            observer,
+            observed,
+        )
     return honcho_collection
async def update_collection_internal_metadata(
    db: AsyncSession,
    workspace_name: str,
    observer: str,
    observed: str,
    update_data: dict[str, Any],
) -> None:
    """Merge a patch into a collection's internal_metadata (JSONB ||) and invalidate cache."""
    stmt = (
        update(models.Collection)
        .where(
            models.Collection.workspace_name == workspace_name,
            models.Collection.observer == observer,
            models.Collection.observed == observed,
        )
        .values(
            internal_metadata=models.Collection.internal_metadata.op("||")(update_data)
        )
    )
    await db.execute(stmt)
    await db.commit()
    await cache.delete(collection_cache_key(workspace_name, observer, observed))
Same resilience gap on cache.delete after commit.
Line 153's cache.delete can raise if Redis is down, turning a successful DB update into a caller-visible error. Wrap it in best-effort handling for consistency with the router-layer pattern.
🛡️ Proposed fix
     await db.execute(stmt)
     await db.commit()
-    await cache.delete(collection_cache_key(workspace_name, observer, observed))
+    try:
+        await cache.delete(collection_cache_key(workspace_name, observer, observed))
+    except Exception:
+        logger.warning(
+            "Failed to invalidate collection cache for %s/%s, entry will expire via TTL",
+            observer,
+            observed,
+        )
    # Invalidate caches
    await cache.delete(session_cache_key(workspace_name, session_name))
    await cache.delete(session_peer_config_cache_key(workspace_name, session_name))
Cache failures inside the try block will trigger a spurious rollback and re-raise.
cache.delete at lines 588–589 runs inside the try that catches Exception (line 592). If Redis is unreachable after a successful db.commit() at line 585, the handler rolls back (no-op since already committed) and re-raises — surfacing an error to the caller even though deletion succeeded. Since crud.delete_session is invoked from the background deletion task, this would cause unnecessary retries.
Move cache invalidation outside the try block with best-effort handling, consistent with the router-layer pattern in src/routers/sessions.py (lines 269-273).
🛡️ Proposed fix
         # Finally, delete the session itself
         await db.delete(honcho_session)
         await db.commit()
-
-        # Invalidate caches
-        await cache.delete(session_cache_key(workspace_name, session_name))
-        await cache.delete(session_peer_config_cache_key(workspace_name, session_name))
-
         logger.debug("Session %s and all associated data deleted", session_name)
     except Exception as e:
         logger.error("Failed to delete session %s: %s", session_name, e)
         await db.rollback()
         raise e
+    # Best-effort cache invalidation (DB is already committed)
+    try:
+        await cache.delete(session_cache_key(workspace_name, session_name))
+        await cache.delete(session_peer_config_cache_key(workspace_name, session_name))
+    except Exception:
+        logger.warning(
+            "Failed to invalidate caches for session %s, entries will expire via TTL",
+            session_name,
+        )
+
     return SessionDeletionResult(
@@ -619,8 +621,42 @@ async def create_observations(
         else:
             level = "explicit"

-        # Generate embedding for the observation
-        embedding = await embedding_client.embed(content)
+        valid_observations.append((obs, level))
+        contents.append(content)
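For context, the batch-with-fallback shape the walkthrough describes might look roughly like this (a sketch only; embed_batch and the exact error handling are assumptions, not the PR's actual code):

    try:
        embeddings = await embedding_client.embed_batch(contents)
    except Exception as e:
        logger.warning("Batch embedding failed, falling back to per-item embedding: %s", e)
        embeddings = []
        for content in contents:
            try:
                embeddings.append(await embedding_client.embed(content))
            except Exception:
                # Per-item fallback: keep the observation without an embedding
                # rather than failing the whole batch.
                embeddings.append(None)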
nitpick but I think validation should be done with pydantic if possible
    except Exception as e:
        logger.warning(
            "Batch embedding failed for extract_preferences; "
            + f"falling back to per-query embedding in search_messages: {e}"
        )
should add a return type to this, e.g. ObservationsCreatedResult or something like that, that reports whether all of the requested observations were created, so we surface any silent failures when some of the requested observations were malformed
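Something along these lines would make partial failures visible (a sketch; all names here are hypothetical):

    from dataclasses import dataclass, field

    @dataclass
    class ObservationsCreatedResult:
        created: int = 0
        failed_reasons: list[str] = field(default_factory=list)  # one entry per skipped observation

        @property
        def all_created(self) -> bool:
            return not self.failed_reasons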
Summary by CodeRabbit
New Features
Performance
Tests