Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .env.docker.example
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ ORCHESTRATOR_PORT=8002
# Hugging Face token (required for gated models like Llama)
HF_TOKEN=

# -----------------------------------------------------------------------------
# Vector Search (optional semantic search microservice)
# -----------------------------------------------------------------------------

# Vector Search service URL (enable with: docker compose --profile vector-search up -d)
# VECTOR_SEARCH_URL=http://ai-secretary-vector-search:8000

# Bearer token for Vector Search API (optional, both sides must match)
VECTOR_SEARCH_TOKEN=

# -----------------------------------------------------------------------------
# Advanced (usually no need to change)
# -----------------------------------------------------------------------------
Expand Down
5 changes: 4 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ AI Secretary System — virtual secretary with voice cloning (XTTS v2, OpenVoice
cp .env.docker.example .env && docker compose up -d # GPU mode
docker compose -f docker-compose.yml -f docker-compose.cpu.yml up -d # CPU mode
docker compose -f docker-compose.yml -f docker-compose.full.yml up -d # Full containerized (includes vLLM)
docker compose --profile vector-search up -d # + Vector Search microservice (:8003)

# Local
./start_gpu.sh # GPU: XTTS + Qwen2.5-7B + LoRA
Expand Down Expand Up @@ -238,7 +239,7 @@ New routers import domain services directly (`from modules.monitoring.service im

**Cloud LLM**: `cloud_llm_service.py` factory pattern. OpenAI-compatible providers auto-handled via `OpenAICompatibleProvider`. Custom SDKs get their own provider class inheriting `BaseLLMProvider`. Provider types in `PROVIDER_TYPES` dict in `db/models.py`. Supports model fallback via `fallback_models` list. `supports_tools` flag + `generate_with_tools()` on `OpenAICompatibleProvider` and `VLLMLLMService` for tool-calling (agentic RAG).

**Wiki RAG**: `app/services/wiki_rag_service.py` — tiered search: (1) semantic embeddings (Gemini/OpenAI/local), (2) BM25 with Russian/English stemming. Multi-collection support. Per-instance RAG config on bots/widgets. **Agentic RAG** (`modules/chat/router.py`): server-side loop where LLM calls `knowledge_search` tool to query the knowledge base on demand (max 5 iterations). Providers without `supports_tools` (Gemini SDK) fall back to one-shot RAG injection. Frontend shows inline search indicator via `tool_start`/`tool_end` SSE events.
**Wiki RAG**: `app/services/wiki_rag_service.py` — tiered search: (1) semantic embeddings (Gemini/OpenAI/local), (2) BM25 with Russian/English stemming, (3) Vector Search microservice (if `VECTOR_SEARCH_URL` configured). Multi-collection support. Per-instance RAG config on bots/widgets. **Agentic RAG** (`modules/chat/router.py`): server-side loop where LLM calls `knowledge_search` tool to query the knowledge base on demand (max 5 iterations). Providers without `supports_tools` (Gemini SDK) fall back to one-shot RAG injection. Frontend shows inline search indicator via `tool_start`/`tool_end` SSE events. **Vector Search** (`services/vector-search/`): standalone FastAPI microservice using ChromaDB + `paraphrase-multilingual-mpnet-base-v2` (768 dims). Client: `app/services/vector_search_client.py` (async httpx). Runs as Docker profile `vector-search` on port 8003. Async search methods (`search_async`, `retrieve_async`, `retrieve_multi_async`) run all engines in parallel via `asyncio.gather` and merge/deduplicate results. Background task `vector-search-sync` upserts all sections on startup. `DatasetSynced` event triggers incremental sync. Admin endpoints: `GET /admin/wiki-rag/vector-search/status`, `POST /admin/wiki-rag/vector-search/sync`.

### Frontend Architecture

Expand Down Expand Up @@ -333,6 +334,8 @@ ORCHESTRATOR_PORT=8002
ADMIN_JWT_SECRET=... # Auto-generated if empty
REDIS_URL=redis://localhost:6379/0 # Optional, graceful fallback
DEV_MODE=1 # Backend proxies to Vite dev server (:5173)
VECTOR_SEARCH_URL=http://localhost:8003 # Optional, Vector Search microservice
VECTOR_SEARCH_TOKEN= # Bearer token for Vector Search API
```

## Deployment
Expand Down
8 changes: 8 additions & 0 deletions app/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def __init__(self):
# Wiki RAG service
self.wiki_rag_service = None

# Vector Search client (HTTP → microservice)
self.vector_search_client = None

# Current voice configuration
self.current_voice_config = {
"engine": "xtts",
Expand Down Expand Up @@ -133,3 +136,8 @@ def get_streaming_tts_manager():
def get_gsm_service():
    """FastAPI dependency that returns the shared GSM telephony service."""
    container = get_container()
    return container.gsm_service


def get_vector_search_client():
    """FastAPI dependency that returns the Vector Search microservice client."""
    container = get_container()
    return container.vector_search_client
186 changes: 186 additions & 0 deletions app/services/vector_search_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""
Async HTTP client for Vector Search microservice.

Graceful degradation: if the service is unavailable, methods log warnings
and return empty results instead of raising exceptions.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Any

import httpx


logger = logging.getLogger(__name__)

# Retry config used by VectorSearchClient._request: a request is attempted up
# to MAX_RETRIES + 1 times, sleeping RETRY_BACKOFF * attempt_number seconds
# between tries (linear backoff: 0.5s, then 1.0s).
MAX_RETRIES = 2
RETRY_BACKOFF = 0.5  # seconds


class VectorSearchClient:
    """Async client for the Vector Search microservice.

    Graceful degradation: connection-level failures are retried with
    backoff, logged, and converted to empty results so callers never
    crash when the service is down. HTTP error responses (4xx/5xx) are
    re-raised, since they mean the service is reachable but rejected
    the request.
    """

    def __init__(self, base_url: str, token: str = "", timeout: float = 30.0):
        """
        Args:
            base_url: Service root URL; a trailing slash is stripped.
            token: Optional bearer token; sent on every request when non-empty.
            timeout: Per-request timeout in seconds.
        """
        self._base_url = base_url.rstrip("/")
        self._token = token
        self._timeout = timeout
        # Updated by _request: True once a request has reached the service.
        self._available = False

    def _headers(self) -> dict[str, str]:
        """Build request headers, adding Authorization only when a token is set."""
        headers: dict[str, str] = {"Content-Type": "application/json"}
        if self._token:
            headers["Authorization"] = f"Bearer {self._token}"
        return headers

    async def _request(
        self,
        method: str,
        path: str,
        json: dict | None = None,
        params: dict | None = None,
    ) -> dict[str, Any]:
        """Make an HTTP request with retry and graceful error handling.

        Transient failures (connection errors/timeouts, protocol or JSON
        decode errors) are retried up to MAX_RETRIES extra times with
        linear backoff; if every attempt fails, a warning is logged and
        an empty dict is returned. HTTP status errors are logged and
        re-raised immediately (no retry).
        """
        url = f"{self._base_url}{path}"
        last_error: Exception | None = None

        # One client for all attempts so a retry can reuse the connection pool
        # (the original code built a fresh AsyncClient on every attempt).
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            for attempt in range(MAX_RETRIES + 1):
                try:
                    resp = await client.request(
                        method, url, json=json, params=params, headers=self._headers()
                    )
                    resp.raise_for_status()
                    self._available = True
                    return resp.json()
                except (httpx.ConnectError, httpx.ConnectTimeout) as e:
                    last_error = e
                    self._available = False
                    if attempt < MAX_RETRIES:
                        await asyncio.sleep(RETRY_BACKOFF * (attempt + 1))
                except httpx.HTTPStatusError as e:
                    # Service answered: it is reachable, but rejected the request.
                    logger.warning("Vector Search %s %s: HTTP %s", method, path, e.response.status_code)
                    self._available = True
                    raise
                except Exception as e:  # read timeouts, protocol/JSON errors, ...
                    last_error = e
                    self._available = False
                    if attempt < MAX_RETRIES:
                        await asyncio.sleep(RETRY_BACKOFF * (attempt + 1))

        logger.warning("Vector Search unavailable (%s %s): %s", method, path, last_error)
        return {}

    @property
    def available(self) -> bool:
        """Whether the last request succeeded (reached the service)."""
        return self._available

    @property
    def base_url(self) -> str:
        """Service root URL with any trailing slash removed."""
        return self._base_url

    async def health(self) -> dict[str, Any]:
        """Check service health; {} when the service is unreachable."""
        return await self._request("GET", "/health")

    async def upsert(
        self,
        text: str,
        doc_id: str = "",
        group: str = "default",
        chunk_size: int | None = 500,
        chunk_overlap: int | None = 50,
        metadata: dict | None = None,
    ) -> list[str]:
        """Upsert text into the vector store.

        Args:
            text: Raw text to chunk and embed.
            doc_id: Logical document identifier for later delete/filter.
            group: Collection name on the service side.
            chunk_size: Chunk length; omitted from the request when None.
            chunk_overlap: Overlap between chunks; omitted when None.
            metadata: Optional metadata attached to every chunk.

        Returns:
            List of created/updated record IDs ([] on failure).
        """
        body: dict[str, Any] = {
            "text": text,
            "doc_id": doc_id,
            "group": group,
        }
        if chunk_size is not None:
            body["chunk_size"] = chunk_size
        if chunk_overlap is not None:
            body["chunk_overlap"] = chunk_overlap
        if metadata:
            body["metadata"] = metadata

        result = await self._request("POST", "/upsert", json=body)
        return result.get("record_ids", [])

    async def search(
        self,
        text: str,
        group: str = "default",
        doc_id: str = "",
        min_similarity: float = 0.3,
        limit: int = 5,
    ) -> list[dict[str, Any]]:
        """Semantic search.

        Returns:
            List of {record_id, text, similarity, metadata} dicts,
            [] when nothing matches or the service is unreachable.
        """
        body = {
            "text": text,
            "group": group,
            "min_similarity": min_similarity,
            "limit": limit,
        }
        if doc_id:
            body["doc_id"] = doc_id

        result = await self._request("POST", "/search", json=body)
        return result.get("results", [])

    async def compare(self, text: str, record_id: str) -> float:
        """Compare text with a stored record; similarity score (0.0 on failure)."""
        result = await self._request(
            "POST",
            "/compare",
            json={
                "text": text,
                "record_id": record_id,
            },
        )
        return result.get("similarity", 0.0)

    async def count(self, group: str = "default") -> int:
        """Count records in a group (0 when the service is unreachable)."""
        result = await self._request("GET", "/count", params={"group": group})
        return result.get("count", 0)

    async def get_ids(self, group: str = "default", doc_id: str = "") -> list[str]:
        """Get record IDs in a group, optionally filtered by document."""
        params: dict[str, str] = {"group": group}
        if doc_id:
            params["doc_id"] = doc_id
        result = await self._request("GET", "/ids", params=params)
        return result.get("ids", [])

    async def delete_record(self, record_id: str) -> bool:
        """Delete a single record. Never raises; False on any failure."""
        try:
            result = await self._request("POST", "/delete/record", json={"record_id": record_id})
            return result.get("status") == "ok"
        except Exception:
            return False

    async def delete_document(self, doc_id: str, group: str = "default") -> bool:
        """Delete all records for a document. Never raises; False on any failure."""
        try:
            result = await self._request(
                "POST", "/delete/document", json={"doc_id": doc_id, "group": group}
            )
            return result.get("status") == "ok"
        except Exception:
            return False

    async def delete_group(self, group: str) -> bool:
        """Delete an entire group (collection). Never raises; False on any failure."""
        try:
            result = await self._request("POST", "/delete/group", json={"group": group})
            return result.get("status") == "ok"
        except Exception:
            return False
Loading
Loading