Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 69 additions & 24 deletions hindsight-api-slim/hindsight_api/api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,11 @@ class MemoryItem(BaseModel):
"A list of tag lists runs one pass per inner list, giving full control over which combinations to use."
),
)
strategy: str | None = Field(
default=None,
description="Named retain strategy for this item. Overrides the bank's default strategy for this item only. "
"Strategies are defined in the bank config under 'retain_strategies'.",
)

@field_validator("timestamp", mode="before")
@classmethod
Expand Down Expand Up @@ -491,6 +496,11 @@ class FileRetainMetadata(BaseModel):
description="Parser or ordered fallback chain for this file (overrides request-level parser). "
"E.g. 'iris' or ['iris', 'markitdown'].",
)
strategy: str | None = Field(
default=None,
description="Named retain strategy for this file. Overrides the bank's default strategy. "
"Strategies are defined in the bank config under 'retain_strategies'.",
)


class FileRetainRequest(BaseModel):
Expand Down Expand Up @@ -544,7 +554,11 @@ class RetainResponse(BaseModel):
)
operation_id: str | None = Field(
default=None,
description="Operation ID for tracking async operations. Use GET /v1/default/banks/{bank_id}/operations to list operations. Only present when async=true.",
description="Operation ID for tracking async operations. Use GET /v1/default/banks/{bank_id}/operations to list operations. Only present when async=true. When items use different per-item strategies, use operation_ids instead.",
)
operation_ids: list[str] | None = Field(
default=None,
description="Operation IDs when items were submitted as multiple strategy groups (async=true with mixed per-item strategies). operation_id is set to the first entry for backward compatibility.",
)
usage: TokenUsage | None = Field(
default=None,
Expand Down Expand Up @@ -4441,10 +4455,13 @@ async def api_retain(
metrics = get_metrics_collector()

try:
# Prepare contents for processing
contents = []
# Group items by strategy
strategy_groups: dict[str | None, list[dict]] = {}
for item in request.items:
content_dict = {"content": item.content}
effective = item.strategy
if effective not in strategy_groups:
strategy_groups[effective] = []
content_dict: dict = {"content": item.content}
if item.timestamp == "unset":
content_dict["event_date"] = None
elif item.timestamp:
Expand All @@ -4461,20 +4478,30 @@ async def api_retain(
content_dict["tags"] = item.tags
if item.observation_scopes is not None:
content_dict["observation_scopes"] = item.observation_scopes
contents.append(content_dict)
strategy_groups[effective].append(content_dict)

if request.async_:
# Async processing: queue task and return immediately
result = await app.state.memory.submit_async_retain(
bank_id, contents, document_tags=request.document_tags, request_context=request_context
)
# Async processing: one submit per strategy group
all_operation_ids = []
total_items_count = 0
for group_strategy, contents in strategy_groups.items():
result = await app.state.memory.submit_async_retain(
bank_id,
contents,
document_tags=request.document_tags,
strategy=group_strategy,
request_context=request_context,
)
all_operation_ids.append(result["operation_id"])
total_items_count += result["items_count"]
return RetainResponse.model_validate(
{
"success": True,
"bank_id": bank_id,
"items_count": result["items_count"],
"items_count": total_items_count,
"async": True,
"operation_id": result["operation_id"],
"operation_id": all_operation_ids[0] if all_operation_ids else None,
"operation_ids": all_operation_ids if len(all_operation_ids) > 1 else None,
}
)
else:
Expand All @@ -4493,24 +4520,41 @@ async def api_retain(
),
)

# Synchronous processing: wait for completion (record metrics)
# Synchronous processing: one batch per strategy group, aggregate results
total_items_count = 0
total_usage = TokenUsage(input_tokens=0, output_tokens=0, total_tokens=0)
with metrics.record_operation("retain", bank_id=bank_id, source="api"):
result, usage = await app.state.memory.retain_batch_async(
bank_id=bank_id,
contents=contents,
document_tags=request.document_tags,
request_context=request_context,
return_usage=True,
outbox_callback=app.state.memory._build_retain_outbox_callback(
for group_strategy, contents in strategy_groups.items():
result, usage = await app.state.memory.retain_batch_async(
bank_id=bank_id,
contents=contents,
operation_id=None,
schema=_current_schema.get(),
),
)
document_tags=request.document_tags,
strategy=group_strategy,
request_context=request_context,
return_usage=True,
outbox_callback=app.state.memory._build_retain_outbox_callback(
bank_id=bank_id,
contents=contents,
operation_id=None,
schema=_current_schema.get(),
),
)
total_items_count += len(contents)
if usage:
total_usage = TokenUsage(
input_tokens=total_usage.input_tokens + usage.input_tokens,
output_tokens=total_usage.output_tokens + usage.output_tokens,
total_tokens=total_usage.total_tokens + usage.total_tokens,
)

return RetainResponse.model_validate(
{"success": True, "bank_id": bank_id, "items_count": len(contents), "async": False, "usage": usage}
{
"success": True,
"bank_id": bank_id,
"items_count": total_items_count,
"async": False,
"usage": total_usage,
}
)
except OperationValidationError as e:
raise HTTPException(status_code=e.status_code, detail=e.reason)
Expand Down Expand Up @@ -4673,6 +4717,7 @@ async def read(self):
"tags": file_meta.tags or [],
"timestamp": file_meta.timestamp,
"parser": parser_chain,
"strategy": file_meta.strategy,
}
file_items.append(item)

Expand Down
15 changes: 11 additions & 4 deletions hindsight-api-slim/hindsight_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def normalize_config_dict(config: dict[str, Any]) -> dict[str, Any]:
ENV_RETAIN_EXTRACTION_MODE = "HINDSIGHT_API_RETAIN_EXTRACTION_MODE"
ENV_RETAIN_MISSION = "HINDSIGHT_API_RETAIN_MISSION"
ENV_RETAIN_CUSTOM_INSTRUCTIONS = "HINDSIGHT_API_RETAIN_CUSTOM_INSTRUCTIONS"
ENV_RETAIN_DEFAULT_STRATEGY = "HINDSIGHT_API_RETAIN_DEFAULT_STRATEGY"
ENV_RETAIN_BATCH_TOKENS = "HINDSIGHT_API_RETAIN_BATCH_TOKENS"
ENV_RETAIN_ENTITY_LOOKUP = "HINDSIGHT_API_RETAIN_ENTITY_LOOKUP"
ENV_RETAIN_BATCH_ENABLED = "HINDSIGHT_API_RETAIN_BATCH_ENABLED"
Expand Down Expand Up @@ -443,9 +444,11 @@ def normalize_config_dict(config: dict[str, Any]) -> dict[str, Any]:
DEFAULT_RETAIN_CHUNK_SIZE = 3000 # Max chars per chunk for fact extraction
DEFAULT_RETAIN_EXTRACT_CAUSAL_LINKS = True # Extract causal links between facts
DEFAULT_RETAIN_EXTRACTION_MODE = "concise" # Extraction mode: "concise", "verbose", or "custom"
RETAIN_EXTRACTION_MODES = ("concise", "verbose", "custom") # Allowed extraction modes
RETAIN_EXTRACTION_MODES = ("concise", "verbose", "custom", "verbatim", "chunks") # Allowed extraction modes
DEFAULT_RETAIN_MISSION = None # Declarative spec of what to retain (injected into any extraction mode)
DEFAULT_RETAIN_CUSTOM_INSTRUCTIONS = None # Custom extraction guidelines (only used when mode="custom")
DEFAULT_RETAIN_DEFAULT_STRATEGY = None # Default strategy name (None = no strategy override)
DEFAULT_RETAIN_STRATEGIES: dict | None = None # Named retain strategies (dict of name → config overrides)
DEFAULT_RETAIN_BATCH_TOKENS = 10_000 # Max tokens per sub-batch for async retain auto-splitting (~40KB of text). NOTE(review): this comment previously said "chars"; confirm the unit against the splitter.
DEFAULT_RETAIN_ENTITY_LOOKUP = "trigram" # "full" or "trigram"
DEFAULT_RETAIN_BATCH_ENABLED = False # Use LLM Batch API for fact extraction (only when async=True)
Expand Down Expand Up @@ -719,6 +722,8 @@ class HindsightConfig:
retain_extraction_mode: str
retain_mission: str | None
retain_custom_instructions: str | None
retain_default_strategy: str | None
retain_strategies: dict | None
retain_batch_tokens: int
retain_batch_enabled: bool
retain_batch_poll_interval_seconds: int
Expand Down Expand Up @@ -849,6 +854,8 @@ class HindsightConfig:
"retain_extraction_mode",
"retain_mission",
"retain_custom_instructions",
"retain_default_strategy",
"retain_strategies",
# Entity labels (controlled vocabulary for entity classification)
"entity_labels",
"entities_allow_free_form",
Expand Down Expand Up @@ -1101,9 +1108,7 @@ def from_env(cls) -> "HindsightConfig":
ENV_RERANKER_LOCAL_TRUST_REMOTE_CODE, str(DEFAULT_RERANKER_LOCAL_TRUST_REMOTE_CODE)
).lower()
in ("true", "1"),
reranker_local_fp16=os.getenv(
ENV_RERANKER_LOCAL_FP16, str(DEFAULT_RERANKER_LOCAL_FP16)
).lower()
reranker_local_fp16=os.getenv(ENV_RERANKER_LOCAL_FP16, str(DEFAULT_RERANKER_LOCAL_FP16)).lower()
in ("true", "1"),
reranker_local_bucket_batching=os.getenv(
ENV_RERANKER_LOCAL_BUCKET_BATCHING, str(DEFAULT_RERANKER_LOCAL_BUCKET_BATCHING)
Expand Down Expand Up @@ -1177,6 +1182,8 @@ def from_env(cls) -> "HindsightConfig":
),
retain_mission=os.getenv(ENV_RETAIN_MISSION) or DEFAULT_RETAIN_MISSION,
retain_custom_instructions=os.getenv(ENV_RETAIN_CUSTOM_INSTRUCTIONS) or DEFAULT_RETAIN_CUSTOM_INSTRUCTIONS,
retain_default_strategy=os.getenv(ENV_RETAIN_DEFAULT_STRATEGY) or DEFAULT_RETAIN_DEFAULT_STRATEGY,
retain_strategies=DEFAULT_RETAIN_STRATEGIES,
retain_batch_tokens=int(os.getenv(ENV_RETAIN_BATCH_TOKENS, str(DEFAULT_RETAIN_BATCH_TOKENS))),
retain_entity_lookup=os.getenv(ENV_RETAIN_ENTITY_LOOKUP, DEFAULT_RETAIN_ENTITY_LOOKUP),
retain_batch_enabled=os.getenv(ENV_RETAIN_BATCH_ENABLED, str(DEFAULT_RETAIN_BATCH_ENABLED)).lower()
Expand Down
42 changes: 41 additions & 1 deletion hindsight-api-slim/hindsight_api/config_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import json
import logging
from dataclasses import asdict
from dataclasses import asdict, replace
from typing import Any

import asyncpg
Expand Down Expand Up @@ -239,6 +239,14 @@ async def update_bank_config(
logger.warning(f"Failed to check permissions for bank {bank_id}: {e}")
# Continue without permission check (fail open for backward compatibility)

# Validate retain_strategies: reject empty string keys
if "retain_strategies" in normalized_updates and normalized_updates["retain_strategies"]:
empty_keys = [k for k in normalized_updates["retain_strategies"] if not str(k).strip()]
if empty_keys:
raise ValueError(
"Strategy names must not be empty strings. Remove entries with empty names before saving."
)

# Merge with existing config (JSONB || operator)
async with self.pool.acquire() as conn:
await conn.execute(
Expand Down Expand Up @@ -273,3 +281,35 @@ async def reset_bank_config(self, bank_id: str) -> None:
)

logger.info(f"Reset bank config for {bank_id} to defaults")


def apply_strategy(config: HindsightConfig, strategy_name: str) -> HindsightConfig:
    """
    Apply a named retain strategy's overrides on top of a resolved config.

    A strategy is a named dict of field overrides stored in
    ``config.retain_strategies``. Only keys that appear in
    ``HindsightConfig.get_configurable_fields()`` are applied; every other
    key in the strategy is silently ignored.

    The function is deliberately forgiving: a bad strategy reference must not
    fail the retain operation, so every malformed-input case logs a warning
    and falls back to the resolved config.

    Args:
        config: Resolved config used as the base. It is not modified; a new
            instance is built with ``dataclasses.replace`` when overrides apply.
        strategy_name: Key to look up in ``config.retain_strategies``.

    Returns:
        ``config`` itself when ``retain_strategies`` is missing or not a dict,
        when the strategy name is unknown, when the strategy entry is not a
        dict, or when it contains no configurable keys. Otherwise a copy of
        ``config`` with the strategy's configurable overrides applied.
    """
    strategies = config.retain_strategies or {}
    if not isinstance(strategies, dict):
        # A list or string would still satisfy the ``in`` test below, but the
        # subscript lookup would then raise TypeError and abort the retain.
        logger.warning(
            f"retain_strategies is not a dict (got {type(strategies).__name__}), "
            f"ignoring retain strategy '{strategy_name}'"
        )
        return config

    if strategy_name not in strategies:
        logger.warning(f"Unknown retain strategy '{strategy_name}', using resolved config as-is")
        return config

    overrides = strategies[strategy_name]
    if not isinstance(overrides, dict):
        logger.warning(f"Retain strategy '{strategy_name}' is not a dict, skipping")
        return config

    # Drop keys that are not configurable so replace() never receives an
    # unexpected keyword argument.
    configurable = HindsightConfig.get_configurable_fields()
    filtered = {k: v for k, v in overrides.items() if k in configurable}

    if not filtered:
        return config

    logger.debug(f"Applying retain strategy '{strategy_name}': {list(filtered.keys())}")
    return replace(config, **filtered)
20 changes: 20 additions & 0 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ async def _handle_batch_retain(self, task_dict: dict[str, Any]):
contents = task_dict.get("contents", [])
document_tags = task_dict.get("document_tags")
operation_id = task_dict.get("operation_id") # For batch API crash recovery
strategy = task_dict.get("strategy")

logger.info(
f"[BATCH_RETAIN_TASK] Starting background batch retain for bank_id={bank_id}, {len(contents)} items, operation_id={operation_id}"
Expand All @@ -584,6 +585,7 @@ async def _handle_batch_retain(self, task_dict: dict[str, Any]):
document_tags=document_tags,
request_context=context,
operation_id=operation_id,
strategy=strategy,
outbox_callback=self._build_retain_outbox_callback(
bank_id=bank_id,
contents=contents,
Expand Down Expand Up @@ -712,6 +714,8 @@ async def _handle_file_convert_retain(self, task_dict: dict[str, Any]):
retain_task_payload: dict[str, Any] = {"contents": retain_contents}
if document_tags:
retain_task_payload["document_tags"] = document_tags
if task_dict.get("strategy"):
retain_task_payload["strategy"] = task_dict["strategy"]

# Pass tenant/api_key context through to retain task
if task_dict.get("_tenant_id"):
Expand Down Expand Up @@ -1953,6 +1957,7 @@ async def retain_batch_async(
return_usage: bool = False,
operation_id: str | None = None,
outbox_callback: "Callable[[asyncpg.Connection], Awaitable[None]] | None" = None,
strategy: str | None = None,
):
"""
Store multiple content items as memory units in ONE batch operation.
Expand Down Expand Up @@ -2110,6 +2115,7 @@ async def retain_batch_async(
confidence_score=confidence_score,
document_tags=document_tags,
operation_id=operation_id,
strategy=strategy,
# Outbox callback runs inside the last sub-batch's transaction so the
# webhook delivery row is committed atomically with the final retain data.
outbox_callback=outbox_callback if i == len(sub_batches) else None,
Expand All @@ -2134,6 +2140,7 @@ async def retain_batch_async(
confidence_score=confidence_score,
document_tags=document_tags,
operation_id=operation_id,
strategy=strategy,
outbox_callback=outbox_callback,
)

Expand Down Expand Up @@ -2186,6 +2193,7 @@ async def _retain_batch_async_internal(
document_tags: list[str] | None = None,
operation_id: str | None = None,
outbox_callback: "Callable[[asyncpg.Connection], Awaitable[None]] | None" = None,
strategy: str | None = None,
) -> tuple[list[list[str]], "TokenUsage"]:
"""
Internal method for batch processing without chunking logic.
Expand Down Expand Up @@ -2218,6 +2226,13 @@ async def _retain_batch_async_internal(
# Resolve bank-specific config for this operation
resolved_config = await self._config_resolver.resolve_full_config(bank_id, request_context)

# Apply strategy overrides: explicit strategy > bank default strategy
from hindsight_api.config_resolver import apply_strategy

effective_strategy = strategy or resolved_config.retain_default_strategy
if effective_strategy:
resolved_config = apply_strategy(resolved_config, effective_strategy)

# Create parent span for retain operation
with create_operation_span("retain", bank_id):
return await orchestrator.retain_batch(
Expand Down Expand Up @@ -7377,6 +7392,7 @@ async def submit_async_retain(
*,
request_context: "RequestContext",
document_tags: list[str] | None = None,
strategy: str | None = None,
) -> dict[str, Any]:
"""Submit a batch retain operation to run asynchronously.

Expand Down Expand Up @@ -7485,6 +7501,8 @@ async def submit_async_retain(
task_payload: dict[str, Any] = {"contents": sub_batch}
if document_tags:
task_payload["document_tags"] = document_tags
if strategy:
task_payload["strategy"] = strategy
# Pass tenant_id and api_key_id through task payload
if request_context.tenant_id:
task_payload["_tenant_id"] = request_context.tenant_id
Expand Down Expand Up @@ -7599,6 +7617,8 @@ async def submit_async_file_retain(
"document_tags": document_tags or [],
"timestamp": item.get("timestamp"),
}
if item.get("strategy"):
task_payload["strategy"] = item["strategy"]

# Pass tenant_id and api_key_id through task payload
if request_context.tenant_id:
Expand Down
Loading
Loading