diff --git a/.env.docker.example b/.env.docker.example index b5d7d2f..64f9848 100644 --- a/.env.docker.example +++ b/.env.docker.example @@ -73,6 +73,16 @@ ORCHESTRATOR_PORT=8002 # Hugging Face token (required for gated models like Llama) HF_TOKEN= +# ----------------------------------------------------------------------------- +# Vector Search (optional semantic search microservice) +# ----------------------------------------------------------------------------- + +# Vector Search service URL (enable with: docker compose --profile vector-search up -d) +# VECTOR_SEARCH_URL=http://ai-secretary-vector-search:8000 + +# Bearer token for Vector Search API (optional, both sides must match) +VECTOR_SEARCH_TOKEN= + # ----------------------------------------------------------------------------- # Advanced (usually no need to change) # ----------------------------------------------------------------------------- diff --git a/CLAUDE.md b/CLAUDE.md index 4a9d6a7..c8bf7e8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -15,6 +15,7 @@ AI Secretary System — virtual secretary with voice cloning (XTTS v2, OpenVoice cp .env.docker.example .env && docker compose up -d # GPU mode docker compose -f docker-compose.yml -f docker-compose.cpu.yml up -d # CPU mode docker compose -f docker-compose.yml -f docker-compose.full.yml up -d # Full containerized (includes vLLM) +docker compose --profile vector-search up -d # + Vector Search microservice (:8003) # Local ./start_gpu.sh # GPU: XTTS + Qwen2.5-7B + LoRA @@ -238,7 +239,7 @@ New routers import domain services directly (`from modules.monitoring.service im **Cloud LLM**: `cloud_llm_service.py` factory pattern. OpenAI-compatible providers auto-handled via `OpenAICompatibleProvider`. Custom SDKs get their own provider class inheriting `BaseLLMProvider`. Provider types in `PROVIDER_TYPES` dict in `db/models.py`. Supports model fallback via `fallback_models` list. `supports_tools` flag + `generate_with_tools()` on `OpenAICompatibleProvider` and `VLLMLLMService` for tool-calling (agentic RAG). -**Wiki RAG**: `app/services/wiki_rag_service.py` — tiered search: (1) semantic embeddings (Gemini/OpenAI/local), (2) BM25 with Russian/English stemming. Multi-collection support. Per-instance RAG config on bots/widgets. **Agentic RAG** (`modules/chat/router.py`): server-side loop where LLM calls `knowledge_search` tool to query the knowledge base on demand (max 5 iterations). Providers without `supports_tools` (Gemini SDK) fall back to one-shot RAG injection. Frontend shows inline search indicator via `tool_start`/`tool_end` SSE events. +**Wiki RAG**: `app/services/wiki_rag_service.py` — tiered search: (1) semantic embeddings (Gemini/OpenAI/local), (2) BM25 with Russian/English stemming, (3) Vector Search microservice (if `VECTOR_SEARCH_URL` configured). Multi-collection support. Per-instance RAG config on bots/widgets. **Agentic RAG** (`modules/chat/router.py`): server-side loop where LLM calls `knowledge_search` tool to query the knowledge base on demand (max 5 iterations). Providers without `supports_tools` (Gemini SDK) fall back to one-shot RAG injection. Frontend shows inline search indicator via `tool_start`/`tool_end` SSE events. **Vector Search** (`services/vector-search/`): standalone FastAPI microservice using ChromaDB + `paraphrase-multilingual-mpnet-base-v2` (768 dims). Client: `app/services/vector_search_client.py` (async httpx). Runs as Docker profile `vector-search` on port 8003. 
Async search methods (`search_async`, `retrieve_async`, `retrieve_multi_async`) run all engines in parallel via `asyncio.gather` and merge/deduplicate results. Background task `vector-search-sync` upserts all sections on startup. `DatasetSynced` event triggers incremental sync. Admin endpoints: `GET /admin/wiki-rag/vector-search/status`, `POST /admin/wiki-rag/vector-search/sync`. ### Frontend Architecture @@ -333,6 +334,8 @@ ORCHESTRATOR_PORT=8002 ADMIN_JWT_SECRET=... # Auto-generated if empty REDIS_URL=redis://localhost:6379/0 # Optional, graceful fallback DEV_MODE=1 # Backend proxies to Vite dev server (:5173) +VECTOR_SEARCH_URL=http://localhost:8003 # Optional, Vector Search microservice +VECTOR_SEARCH_TOKEN= # Bearer token for Vector Search API ``` ## Deployment diff --git a/app/dependencies.py b/app/dependencies.py index c535d9f..39185d6 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -54,6 +54,9 @@ def __init__(self): # Wiki RAG service self.wiki_rag_service = None + # Vector Search client (HTTP → microservice) + self.vector_search_client = None + # Current voice configuration self.current_voice_config = { "engine": "xtts", @@ -133,3 +136,8 @@ def get_streaming_tts_manager(): def get_gsm_service(): """Dependency: Get GSM telephony service.""" return get_container().gsm_service + + +def get_vector_search_client(): + """Dependency: Get Vector Search microservice client.""" + return get_container().vector_search_client diff --git a/app/services/vector_search_client.py b/app/services/vector_search_client.py new file mode 100644 index 0000000..c255fda --- /dev/null +++ b/app/services/vector_search_client.py @@ -0,0 +1,186 @@ +""" +Async HTTP client for Vector Search microservice. + +Graceful degradation: if the service is unavailable, methods log warnings +and return empty results instead of raising exceptions. 
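+
+A minimal usage sketch (the URL, token, and the "kb" group name are illustrative):
+
+    client = VectorSearchClient("http://localhost:8003", token="secret")
+    record_ids = await client.upsert("Some document text", doc_id="doc-1", group="kb")
+    hits = await client.search("query text", group="kb", limit=5)
+
+Note: connection failures are retried, then degrade to empty results; HTTP
+error responses (e.g. 401 on a bad token) are logged and re-raised.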
+""" + +from __future__ import annotations + +import logging +from typing import Any + +import httpx + + +logger = logging.getLogger(__name__) + +# Retry config +MAX_RETRIES = 2 +RETRY_BACKOFF = 0.5 # seconds + + +class VectorSearchClient: + """Async client for the Vector Search microservice.""" + + def __init__(self, base_url: str, token: str = "", timeout: float = 30.0): + self._base_url = base_url.rstrip("/") + self._token = token + self._timeout = timeout + self._available = False + + def _headers(self) -> dict[str, str]: + headers: dict[str, str] = {"Content-Type": "application/json"} + if self._token: + headers["Authorization"] = f"Bearer {self._token}" + return headers + + async def _request( + self, + method: str, + path: str, + json: dict | None = None, + params: dict | None = None, + ) -> dict[str, Any]: + """Make an HTTP request with retry and graceful error handling.""" + import asyncio + + url = f"{self._base_url}{path}" + last_error = None + + for attempt in range(MAX_RETRIES + 1): + try: + async with httpx.AsyncClient(timeout=self._timeout) as client: + resp = await client.request( + method, url, json=json, params=params, headers=self._headers() + ) + resp.raise_for_status() + self._available = True + return resp.json() + except (httpx.ConnectError, httpx.ConnectTimeout) as e: + last_error = e + self._available = False + if attempt < MAX_RETRIES: + await asyncio.sleep(RETRY_BACKOFF * (attempt + 1)) + except httpx.HTTPStatusError as e: + logger.warning("Vector Search %s %s: HTTP %s", method, path, e.response.status_code) + self._available = True + raise + except Exception as e: + last_error = e + self._available = False + if attempt < MAX_RETRIES: + await asyncio.sleep(RETRY_BACKOFF * (attempt + 1)) + + logger.warning("Vector Search unavailable (%s %s): %s", method, path, last_error) + return {} + + @property + def available(self) -> bool: + """Whether the last request succeeded.""" + return self._available + + @property + def base_url(self) -> str: + return self._base_url + + async def health(self) -> dict[str, Any]: + """Check service health.""" + return await self._request("GET", "/health") + + async def upsert( + self, + text: str, + doc_id: str = "", + group: str = "default", + chunk_size: int | None = 500, + chunk_overlap: int | None = 50, + metadata: dict | None = None, + ) -> list[str]: + """Upsert text into vector store. Returns list of record IDs.""" + body: dict[str, Any] = { + "text": text, + "doc_id": doc_id, + "group": group, + } + if chunk_size is not None: + body["chunk_size"] = chunk_size + if chunk_overlap is not None: + body["chunk_overlap"] = chunk_overlap + if metadata: + body["metadata"] = metadata + + result = await self._request("POST", "/upsert", json=body) + return result.get("record_ids", []) + + async def search( + self, + text: str, + group: str = "default", + doc_id: str = "", + min_similarity: float = 0.3, + limit: int = 5, + ) -> list[dict[str, Any]]: + """Semantic search. Returns list of {record_id, text, similarity, metadata}.""" + body = { + "text": text, + "group": group, + "min_similarity": min_similarity, + "limit": limit, + } + if doc_id: + body["doc_id"] = doc_id + + result = await self._request("POST", "/search", json=body) + return result.get("results", []) + + async def compare(self, text: str, record_id: str) -> float: + """Compare text with a stored record. 
Returns similarity score.""" + result = await self._request( + "POST", + "/compare", + json={ + "text": text, + "record_id": record_id, + }, + ) + return result.get("similarity", 0.0) + + async def count(self, group: str = "default") -> int: + """Count records in a group.""" + result = await self._request("GET", "/count", params={"group": group}) + return result.get("count", 0) + + async def get_ids(self, group: str = "default", doc_id: str = "") -> list[str]: + """Get record IDs in a group.""" + params: dict[str, str] = {"group": group} + if doc_id: + params["doc_id"] = doc_id + result = await self._request("GET", "/ids", params=params) + return result.get("ids", []) + + async def delete_record(self, record_id: str) -> bool: + """Delete a single record.""" + try: + result = await self._request("POST", "/delete/record", json={"record_id": record_id}) + return result.get("status") == "ok" + except Exception: + return False + + async def delete_document(self, doc_id: str, group: str = "default") -> bool: + """Delete all records for a document.""" + try: + result = await self._request( + "POST", "/delete/document", json={"doc_id": doc_id, "group": group} + ) + return result.get("status") == "ok" + except Exception: + return False + + async def delete_group(self, group: str) -> bool: + """Delete an entire group (collection).""" + try: + result = await self._request("POST", "/delete/group", json={"group": group}) + return result.get("status") == "ok" + except Exception: + return False diff --git a/app/services/wiki_rag_service.py b/app/services/wiki_rag_service.py index 54db949..a3228bb 100644 --- a/app/services/wiki_rag_service.py +++ b/app/services/wiki_rag_service.py @@ -26,6 +26,7 @@ if TYPE_CHECKING: from app.services.embedding_provider import BaseEmbeddingProvider + from app.services.vector_search_client import VectorSearchClient logger = logging.getLogger(__name__) @@ -177,6 +178,9 @@ def __init__(self, wiki_dir: Optional[Path] = None): self._embeddings: dict[str, list[float]] = {} # section_id → vector self._embedding_cache_path = Path("data/wiki_embeddings.json") + # Vector Search microservice client + self._vector_search_client: Optional[VectorSearchClient] = None + if wiki_dir and wiki_dir.exists(): self._load_and_index(wiki_dir) @@ -750,6 +754,197 @@ def search(self, query: str, top_k: int = 3, collection_id: Optional[int] = None ) return results + # ---- Vector Search integration ---- + + def set_vector_search_client(self, client: VectorSearchClient) -> None: + """Set the Vector Search microservice client.""" + self._vector_search_client = client + + @property + def vector_search_available(self) -> bool: + """True if vector search client is configured.""" + return self._vector_search_client is not None + + async def _vector_search_async( + self, query: str, top_k: int, collection_slug: str = "default" + ) -> list[dict]: + """Search via Vector Search microservice. 
Returns results in standard format."""
+        if not self._vector_search_client:
+            return []
+
+        try:
+            results = await self._vector_search_client.search(
+                text=query, group=collection_slug, limit=top_k, min_similarity=0.3
+            )
+        except Exception as e:
+            logger.warning("Vector Search query failed: %s", e)
+            return []
+
+        output = []
+        for r in results:
+            meta = r.get("metadata", {})
+            output.append(
+                {
+                    "title": meta.get("title", ""),
+                    "body": r.get("text", "")[:500],
+                    "source_file": meta.get("doc_id", meta.get("source_file", "")),
+                    "score": r.get("similarity", 0.0),
+                    "engine": "vector_search",
+                }
+            )
+        return output
+
+    async def search_async(
+        self, query: str, top_k: int = 3, collection_id: Optional[int] = None
+    ) -> list[dict]:
+        """Parallel search across all engines: BM25 + embeddings + vector search.
+
+        Returns deduplicated, merged results sorted by best score.
+        """
+        import asyncio
+
+        collection_slug = "default"
+        if collection_id is not None and collection_id in self._collection_indexes:
+            # Use collection_id as group name. NOTE: this must match the group
+            # used at sync time; sync_collection_to_vector_search falls back to
+            # str(id) only when the collection has no slug.
+            collection_slug = str(collection_id)
+
+        # BM25 + embeddings (sync, in a worker thread) and Vector Search (async)
+        # run concurrently
+        local_results, vs_results = await asyncio.gather(
+            asyncio.to_thread(self.search, query, top_k, collection_id),
+            self._vector_search_async(query, top_k, collection_slug),
+        )
+
+        # Merge and deduplicate
+        return self._merge_results(local_results, vs_results, top_k)
+
+    async def retrieve_async(
+        self,
+        query: str,
+        top_k: int = 3,
+        max_chars: int = 2500,
+        collection_id: Optional[int] = None,
+    ) -> str:
+        """Like retrieve() but includes vector search results.
+
+        Returns formatted markdown context string.
+        """
+        results = await self.search_async(query, top_k, collection_id)
+        if not results:
+            return ""
+
+        return self._format_results(results, max_chars)
+
+    async def retrieve_multi_async(
+        self,
+        query: str,
+        collection_ids: list[int],
+        top_k: int = 3,
+        max_chars: int = 3000,
+    ) -> str:
+        """Like retrieve_multi() but includes vector search results."""
+        import asyncio
+
+        if not collection_ids or not query.strip():
+            return ""
+
+        # BM25 multi-collection (sync, in a worker thread) and one Vector Search
+        # query per collection (group = str(id); see note in search_async),
+        # all running concurrently
+        vs_tasks = [self._vector_search_async(query, top_k, str(cid)) for cid in collection_ids]
+        local_results_raw, *vs_all = await asyncio.gather(
+            asyncio.to_thread(self._retrieve_multi_search, query, collection_ids, top_k),
+            *vs_tasks,
+        )
+        vs_results = [r for batch in vs_all for r in batch]
+
+        merged = self._merge_results(local_results_raw, vs_results, top_k)
+        if not merged:
+            return ""
+
+        return self._format_results(merged, max_chars)
+
+    def _retrieve_multi_search(
+        self, query: str, collection_ids: list[int], top_k: int
+    ) -> list[dict]:
+        """BM25 search across multiple collections.
Returns structured results.""" + query_tokens = self._tokenize(query) + if not query_tokens: + return [] + + all_scored: list[tuple[float, WikiSection]] = [] + for cid in collection_ids: + if cid not in self._collection_indexes: + continue + cidx = self._collection_indexes[cid] + for section in cidx.sections: + score = self._bm25_score_with_index( + query_tokens, section, cidx.doc_freqs, cidx.total_docs, cidx.avg_dl + ) + if score >= MIN_SCORE: + all_scored.append((score, section)) + + if not all_scored: + return [] + + all_scored.sort(key=lambda x: x[0], reverse=True) + results = [] + for score, section in all_scored[:top_k]: + results.append( + { + "title": section.title, + "body": section.body[:500], + "source_file": section.source_file, + "score": round(score, 3), + "engine": "bm25", + } + ) + return results + + @staticmethod + def _merge_results(local_results: list[dict], vs_results: list[dict], top_k: int) -> list[dict]: + """Merge and deduplicate results from multiple engines. + + Deduplicates by (source_file, title), keeping the highest score. + """ + seen: dict[tuple[str, str], dict] = {} + for r in local_results + vs_results: + key = (r.get("source_file", ""), r.get("title", "")) + if key in seen: + if r.get("score", 0) > seen[key].get("score", 0): + seen[key] = r + else: + seen[key] = r + + merged = sorted(seen.values(), key=lambda x: x.get("score", 0), reverse=True) + return merged[:top_k] + + @staticmethod + def _format_results(results: list[dict], max_chars: int) -> str: + """Format search results into markdown context string.""" + parts: list[str] = ["[Документация по теме:]"] + total_chars = len(parts[0]) + + for r in results: + header_line = f"\n\n## {r['title']} ({r['source_file']})" + body = r.get("body", "") + available = max_chars - total_chars - len(header_line) - 4 + if available <= 0: + break + if len(body) > available: + body = body[:available] + "..." 
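+            # Append the block and update the running budget. The 4-char slack
+            # above is exactly the joining "\n" plus the "..." ellipsis, so the
+            # total can never exceed max_chars.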
+ + part = f"{header_line}\n{body}" + parts.append(part) + total_chars += len(part) + + if total_chars >= max_chars: + break + + return "".join(parts) if len(parts) > 1 else "" + def list_source_files(self) -> list[str]: """List unique source files in the index.""" return sorted({s.source_file for s in self.sections}) @@ -769,10 +964,21 @@ def stats(self) -> dict: "unique_tokens": len(cidx.doc_freqs), } + engine_parts = [] + if self._embeddings: + engine_parts.append("embeddings") + engine_parts.append("bm25") + if self._vector_search_client: + engine_parts.append("vector_search") + return { - "engine": "embeddings+bm25" if self._embeddings else "bm25", + "engine": "+".join(engine_parts), "embedding_engine": embedding_engine, "embedding_sections": len(self._embeddings), + "vector_search_available": self._vector_search_client is not None, + "vector_search_url": ( + self._vector_search_client.base_url if self._vector_search_client else None + ), "sections_indexed": len(self.sections), "files_indexed": self._files_indexed, "unique_tokens": len(self.doc_freqs), diff --git a/docker-compose.yml b/docker-compose.yml index e40bb40..f6d7eaa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -95,6 +95,9 @@ services: - ADMIN_JWT_SECRET=${ADMIN_JWT_SECRET:-} # amoCRM proxy (run scripts/amocrm_proxy.py on host) - AMOCRM_PROXY=${AMOCRM_PROXY:-} + # Vector Search microservice + - VECTOR_SEARCH_URL=${VECTOR_SEARCH_URL:-} + - VECTOR_SEARCH_TOKEN=${VECTOR_SEARCH_TOKEN:-} # TTS - COQUI_TOS_AGREED=1 - TTS_CACHE_PATH=/root/.local/share/tts @@ -183,12 +186,38 @@ services: networks: - ai-secretary + # --------------------------------------------------------------------------- + # Vector Search - Semantic Search Microservice (optional) + # --------------------------------------------------------------------------- + # Start with: docker compose --profile vector-search up -d + vector-search: + build: ./services/vector-search + container_name: ai-secretary-vector-search + profiles: ["vector-search"] + ports: + - "8003:8000" + environment: + - VECTOR_SEARCH_TOKEN=${VECTOR_SEARCH_TOKEN:-} + volumes: + - vector_search_data:/app/data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + restart: unless-stopped + networks: + - ai-secretary + # ============================================================================= # Volumes # ============================================================================= volumes: redis_data: driver: local + vector_search_data: + driver: local # ============================================================================= # Networks diff --git a/modules/chat/router.py b/modules/chat/router.py index 2d27069..3c5c539 100644 --- a/modules/chat/router.py +++ b/modules/chat/router.py @@ -141,12 +141,26 @@ def _should_use_agentic_rag( def _execute_knowledge_search(wiki_rag, query: str, collection_ids: list[int]) -> str: - """Execute a knowledge base search and return results text.""" + """Execute a knowledge base search and return results text (sync).""" if len(collection_ids) == 1: return wiki_rag.retrieve(query, top_k=5, max_chars=3000, collection_id=collection_ids[0]) return wiki_rag.retrieve_multi(query, collection_ids, top_k=5, max_chars=3000) +async def _execute_knowledge_search_async(wiki_rag, query: str, collection_ids: list[int]) -> str: + """Execute knowledge search with vector search support (async).""" + if not wiki_rag.vector_search_available: + import asyncio + + return await 
asyncio.to_thread(_execute_knowledge_search, wiki_rag, query, collection_ids) + + if len(collection_ids) == 1: + return await wiki_rag.retrieve_async( + query, top_k=5, max_chars=3000, collection_id=collection_ids[0] + ) + return await wiki_rag.retrieve_multi_async(query, collection_ids, top_k=5, max_chars=3000) + + def _build_tools(use_agentic: bool, use_web_search: bool) -> list[dict]: """Build the tools list based on enabled features.""" tools: list[dict] = [] @@ -376,6 +390,59 @@ def _inject_rag_context( return f"{base}{no_context_instruction}" +async def _inject_rag_context_async( + wiki_rag, + user_content: str, + base_prompt: Optional[str], + rag_mode: str, + collection_ids: list[int], +) -> Optional[str]: + """Async version of _inject_rag_context — includes vector search results.""" + if not wiki_rag or not user_content or rag_mode == "none" or not collection_ids: + return base_prompt + + if not wiki_rag.vector_search_available: + import asyncio + + return await asyncio.to_thread( + _inject_rag_context, wiki_rag, user_content, base_prompt, rag_mode, collection_ids + ) + + if len(collection_ids) == 1: + wiki_context = await wiki_rag.retrieve_async( + user_content, top_k=7, max_chars=4000, collection_id=collection_ids[0] + ) + else: + wiki_context = await wiki_rag.retrieve_multi_async( + user_content, collection_ids, top_k=7, max_chars=4000 + ) + + logger.info( + f"RAG inject (async): context found={bool(wiki_context)}, " + f"len={len(wiki_context) if wiki_context else 0}" + ) + + base = base_prompt or _DEFAULT_RAG_PROMPT + if wiki_context: + rag_instruction = ( + "\n\n--- КОНТЕКСТ ИЗ БАЗЫ ЗНАНИЙ (обязательно к использованию) ---\n" + "Ниже приведена релевантная информация из базы знаний. " + "ОБЯЗАТЕЛЬНО используй эти данные при ответе. " + "Если информация ниже отвечает на вопрос пользователя — ответь на основе неё. " + "НЕ выдумывай информацию, которой нет в контексте ниже.\n" + ) + return f"{base}{rag_instruction}\n{wiki_context}" + + no_context_instruction = ( + "\n\n--- ВАЖНО ---\n" + "По данному запросу в базе знаний не найдено релевантной информации. " + "НЕ выдумывай ответ. 
Если ты не уверен в точности информации — " + "честно скажи, что не нашёл данных в базе знаний, и предложи " + "обратиться к менеджеру или уточнить вопрос.\n" + ) + return f"{base}{no_context_instruction}" + + # ============== Token Counting Helpers ============== @@ -763,7 +830,7 @@ async def admin_send_chat_message( use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm_service) if not use_agentic: - default_prompt = _inject_rag_context( + default_prompt = await _inject_rag_context_async( wiki_rag, msg_request.content, default_prompt, rag_mode, collection_ids ) @@ -1013,7 +1080,7 @@ async def admin_stream_chat_message( if not use_agentic: # One-shot RAG: inject context into prompt (existing behavior) - default_prompt = _inject_rag_context( + default_prompt = await _inject_rag_context_async( wiki_rag, msg_request.content, default_prompt, rag_mode, collection_ids ) @@ -1168,7 +1235,7 @@ async def admin_edit_chat_message( use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm_service) if not use_agentic: - default_prompt = _inject_rag_context( + default_prompt = await _inject_rag_context_async( wiki_rag, request.content, default_prompt, rag_mode, collection_ids ) @@ -1297,7 +1364,7 @@ async def admin_regenerate_chat_response( use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm_service) if not use_agentic: - default_prompt = _inject_rag_context( + default_prompt = await _inject_rag_context_async( wiki_rag, user_content, default_prompt, rag_mode, collection_ids ) diff --git a/modules/core/router_health.py b/modules/core/router_health.py index e495ee9..0741514 100644 --- a/modules/core/router_health.py +++ b/modules/core/router_health.py @@ -96,6 +96,19 @@ async def health_check(): if container.streaming_tts_manager is not None: result["streaming_tts_stats"] = container.streaming_tts_manager.get_stats() + # Vector Search status + vs_client = container.vector_search_client + if vs_client is not None: + try: + vs_health = await vs_client.health() + result["vector_search"] = { + "status": "ok" if vs_health else "unavailable", + "url": vs_client.base_url, + **(vs_health if vs_health else {}), + } + except Exception: + result["vector_search"] = {"status": "unavailable", "url": vs_client.base_url} + # Internet monitor status im = getattr(container, "internet_monitor", None) if im is not None: diff --git a/modules/knowledge/router_wiki_rag.py b/modules/knowledge/router_wiki_rag.py index 6522ed1..eefc976 100644 --- a/modules/knowledge/router_wiki_rag.py +++ b/modules/knowledge/router_wiki_rag.py @@ -404,14 +404,20 @@ async def wiki_rag_search( request: WikiSearchRequest, user: User = Depends(require_permission("wiki", "view")), ): - """Test search: query → scored results. Optionally filter by collection.""" + """Test search: query → scored results. 
Uses all available engines including vector search.""" wiki_rag = _get_wiki_rag() if not wiki_rag: return {"results": [], "query": request.query} - results = wiki_rag.search( - request.query, top_k=request.top_k, collection_id=request.collection_id - ) + # Use async search if vector search is available + if wiki_rag.vector_search_available: + results = await wiki_rag.search_async( + request.query, top_k=request.top_k, collection_id=request.collection_id + ) + else: + results = wiki_rag.search( + request.query, top_k=request.top_k, collection_id=request.collection_id + ) return {"results": results, "query": request.query} @@ -624,3 +630,50 @@ async def delete_document(doc_id: int, user: User = Depends(require_permission(" ) return {"status": "ok", "deleted": doc["filename"]} + + +# ---- Vector Search Endpoints ---- + + +@router.get("/vector-search/status") +async def vector_search_status(user: User = Depends(require_permission("wiki", "view"))): + """Get Vector Search microservice status.""" + container = get_container() + vs_client = container.vector_search_client + if not vs_client: + return {"available": False, "message": "Vector Search not configured"} + + health = await vs_client.health() + if not health: + return {"available": False, "message": "Vector Search service unavailable"} + + return { + "available": True, + "url": vs_client.base_url, + "health": health, + } + + +@router.post("/vector-search/sync") +async def vector_search_sync(user: User = Depends(require_permission("wiki", "manage"))): + """Manually trigger full sync of all sections to Vector Search.""" + container = get_container() + vs_client = container.vector_search_client + wiki_rag = container.wiki_rag_service + + if not vs_client: + raise HTTPException(status_code=503, detail="Vector Search не настроен") + if not wiki_rag: + raise HTTPException(status_code=503, detail="Wiki RAG сервис не инициализирован") + + from modules.knowledge.tasks import sync_vector_search + + await sync_vector_search(wiki_rag, vs_client) + + await audit_service.log( + action="sync", + resource="vector_search", + user_id=user.username, + ) + + return {"status": "ok", "message": "Vector Search sync completed"} diff --git a/modules/knowledge/startup.py b/modules/knowledge/startup.py index 2601b7c..848045e 100644 --- a/modules/knowledge/startup.py +++ b/modules/knowledge/startup.py @@ -1,6 +1,7 @@ """Knowledge domain startup: FAQ reload, Wiki RAG init, event subscriptions.""" import logging +import os from functools import partial from pathlib import Path @@ -65,6 +66,18 @@ async def _handle_dataset_sync(event, collection_svc, doc_svc) -> None: filenames = [d["filename"] for d in event.documents] wiki_rag.reload_collection(collection_id, filenames, Path(event.base_dir)) + # Sync to Vector Search if available + vs_client = container.vector_search_client + if vs_client and wiki_rag: + try: + from modules.knowledge.tasks import sync_collection_to_vector_search + + await sync_collection_to_vector_search( + wiki_rag, vs_client, collection_id, event.collection_slug + ) + except Exception as vs_err: + logger.warning("Vector Search sync failed for %s: %s", event.collection_slug, vs_err) + logger.info( "DatasetSynced handled: source=%s slug=%s docs=%d", event.source, @@ -97,6 +110,16 @@ async def _handle_dataset_clear(event, collection_svc, doc_svc) -> None: else: wiki_rag.reload_collection(collection_id, [], Path(event.base_dir)) + # Clean up Vector Search + vs_client = container.vector_search_client + if vs_client and event.delete_collection: + try: + 
await vs_client.delete_group(event.collection_slug) + except Exception as vs_err: + logger.warning( + "Vector Search group delete failed for %s: %s", event.collection_slug, vs_err + ) + # Delete collection record if requested if event.delete_collection: await collection_svc.delete(collection_id) @@ -185,5 +208,24 @@ async def init_wiki_rag(container, deployment_mode: str, task_registry) -> None: "wiki-collection-indexes", partial(load_collection_indexes, wiki_rag) ) + # Initialize Vector Search client if configured + vector_url = os.environ.get("VECTOR_SEARCH_URL", "") + vector_token = os.environ.get("VECTOR_SEARCH_TOKEN", "") + if vector_url: + try: + from app.services.vector_search_client import VectorSearchClient + from modules.knowledge.tasks import sync_vector_search + + vs_client = VectorSearchClient(base_url=vector_url, token=vector_token) + container.vector_search_client = vs_client + wiki_rag.set_vector_search_client(vs_client) + + task_registry.register( + "vector-search-sync", partial(sync_vector_search, wiki_rag, vs_client) + ) + logger.info("✅ Vector Search client: %s", vector_url) + except Exception as vs_err: + logger.warning("⚠️ Vector Search client init failed: %s", vs_err) + except Exception as wiki_err: logger.warning(f"⚠️ Wiki RAG service not available: {wiki_err}") diff --git a/modules/knowledge/tasks.py b/modules/knowledge/tasks.py index 7f66eee..e883fc2 100644 --- a/modules/knowledge/tasks.py +++ b/modules/knowledge/tasks.py @@ -1,9 +1,16 @@ -"""Knowledge domain background tasks: Wiki RAG embeddings and collection indexes.""" +"""Knowledge domain background tasks: Wiki RAG embeddings, collection indexes, vector search sync.""" + +from __future__ import annotations import asyncio import logging from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from app.services.vector_search_client import VectorSearchClient + from app.services.wiki_rag_service import WikiRAGService logger = logging.getLogger(__name__) @@ -35,3 +42,68 @@ async def load_collection_indexes(wiki_rag) -> None: loaded += 1 if loaded: logger.info(f"📚 Wiki RAG: загружено {loaded} коллекционных индексов") + + +async def sync_vector_search(wiki_rag: WikiRAGService, vs_client: VectorSearchClient) -> None: + """Full sync: upsert all sections from all collections into Vector Search.""" + from db.integration import async_knowledge_collection_manager + + # Check connectivity + health = await vs_client.health() + if not health: + logger.warning("Vector Search: service unavailable, skipping sync") + return + + total_upserted = 0 + + # Sync per-collection indexes + collections = await async_knowledge_collection_manager.get_all(enabled_only=True) + for col in collections: + slug = col.get("slug", str(col["id"])) + count = await sync_collection_to_vector_search(wiki_rag, vs_client, col["id"], slug) + total_upserted += count + + # Sync global index + for section in wiki_rag.sections: + text = f"{section.title}\n{section.body}" + await vs_client.upsert( + text=text, + doc_id=section.source_file, + group="default", + metadata={"title": section.title, "source_file": section.source_file}, + ) + total_upserted += 1 + + logger.info("✅ Vector Search sync: %d sections upserted", total_upserted) + + +async def sync_collection_to_vector_search( + wiki_rag: WikiRAGService, + vs_client: VectorSearchClient, + collection_id: int, + collection_slug: str, +) -> int: + """Sync a single collection's sections to Vector Search. 
Returns upsert count.""" + group = collection_slug or str(collection_id) + idx = wiki_rag._collection_indexes.get(collection_id) + if not idx: + return 0 + + count = 0 + for section in idx.sections: + text = f"{section.title}\n{section.body}" + await vs_client.upsert( + text=text, + doc_id=section.source_file, + group=group, + metadata={"title": section.title, "source_file": section.source_file}, + ) + count += 1 + + logger.info( + "Vector Search: synced %d sections for collection %s (slug=%s)", + count, + collection_id, + group, + ) + return count diff --git a/services/vector-search/Dockerfile b/services/vector-search/Dockerfile new file mode 100644 index 0000000..6dac99f --- /dev/null +++ b/services/vector-search/Dockerfile @@ -0,0 +1,33 @@ +FROM python:3.11-slim AS builder + +WORKDIR /app + +# Install build deps +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential curl && \ + rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt && \ + python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')" + +FROM python:3.11-slim + +WORKDIR /app + +# Copy installed packages from builder +COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin + +# Copy cached model from builder +COPY --from=builder /root/.cache/huggingface /root/.cache/huggingface + +# Install curl for healthcheck +RUN apt-get update && apt-get install -y --no-install-recommends curl && \ + rm -rf /var/lib/apt/lists/* + +COPY main.py . + +EXPOSE 8000 + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/services/vector-search/main.py b/services/vector-search/main.py new file mode 100644 index 0000000..89b7a20 --- /dev/null +++ b/services/vector-search/main.py @@ -0,0 +1,355 @@ +""" +Vector Search microservice — semantic search engine using ChromaDB + sentence-transformers. 
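+
+Core endpoints (Bearer auth is enforced only when VECTOR_SEARCH_TOKEN is set):
+    POST /upsert   {"text", "doc_id", "group", "chunk_size", "chunk_overlap", "metadata"}
+    POST /search   {"text", "group", "doc_id", "min_similarity", "limit"}
+    POST /compare  {"text", "record_id"}
+    GET  /count, /ids, /records; POST /delete/record|document|group, /clear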
+
+Model: paraphrase-multilingual-mpnet-base-v2 (768 dims)
+Storage: ChromaDB persistent (./data/)
+Auth: Bearer token via VECTOR_SEARCH_TOKEN env var
+"""
+
+import hashlib
+import logging
+import os
+from typing import Optional
+
+import chromadb
+from fastapi import Depends, FastAPI, HTTPException
+from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
+from pydantic import BaseModel, Field
+from sentence_transformers import SentenceTransformer
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# ---- Configuration ----
+
+MODEL_NAME = os.environ.get("VECTOR_SEARCH_MODEL", "paraphrase-multilingual-mpnet-base-v2")
+DATA_DIR = os.environ.get("VECTOR_SEARCH_DATA_DIR", "./data")
+AUTH_TOKEN = os.environ.get("VECTOR_SEARCH_TOKEN", "")
+DEFAULT_CHUNK_SIZE = 500
+DEFAULT_CHUNK_OVERLAP = 50
+
+# ---- App init ----
+
+app = FastAPI(title="Vector Search", version="1.0.0")
+
+# Load model
+logger.info("Loading model: %s", MODEL_NAME)
+model = SentenceTransformer(MODEL_NAME)
+logger.info("Model loaded, dims=%d", model.get_sentence_embedding_dimension())
+
+# ChromaDB persistent client
+chroma_client = chromadb.PersistentClient(path=DATA_DIR)
+
+# ---- Auth ----
+
+security = HTTPBearer(auto_error=False)
+
+
+async def verify_token(
+    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
+):
+    """Verify Bearer token if VECTOR_SEARCH_TOKEN is configured."""
+    if not AUTH_TOKEN:
+        return  # No auth configured
+    if not credentials or credentials.credentials != AUTH_TOKEN:
+        raise HTTPException(status_code=401, detail="Invalid or missing token")
+
+
+# ---- Pydantic models ----
+
+
+class UpsertRequest(BaseModel):
+    text: str
+    doc_id: str = ""
+    group: str = "default"
+    chunk_size: Optional[int] = Field(default=DEFAULT_CHUNK_SIZE, ge=50)
+    chunk_overlap: Optional[int] = Field(default=DEFAULT_CHUNK_OVERLAP, ge=0)
+    metadata: dict = Field(default_factory=dict)
+
+
+class SearchRequest(BaseModel):
+    text: str
+    group: str = "default"
+    doc_id: str = ""
+    min_similarity: float = Field(default=0.3, ge=0.0, le=1.0)
+    limit: int = Field(default=5, ge=1, le=100)
+
+
+class CompareRequest(BaseModel):
+    text: str
+    record_id: str
+
+
+class DeleteRecordRequest(BaseModel):
+    record_id: str
+
+
+class DeleteDocumentRequest(BaseModel):
+    doc_id: str
+    group: str = "default"
+
+
+class DeleteGroupRequest(BaseModel):
+    group: str
+
+
+# ---- Helpers ----
+
+
+def _safe_name(group: str) -> str:
+    """Sanitize a group name into a valid ChromaDB collection name."""
+    safe = group.replace("/", "_").replace("\\", "_")[:63]
+    if not safe:
+        safe = "default"
+    if len(safe) < 3:
+        # Chroma requires names of 3-63 characters; pad short ones deterministically
+        safe = f"g-{safe}"
+    return safe
+
+
+def _get_collection(group: str):
+    """Get or create a ChromaDB collection for the given group."""
+    return chroma_client.get_or_create_collection(
+        name=_safe_name(group),
+        metadata={"hnsw:space": "cosine"},
+    )
+
+
+def _chunk_text(text: str, chunk_size: int, overlap: int) -> list[str]:
+    """Split text into chunks with overlap."""
+    if chunk_size <= 0 or len(text) <= chunk_size:
+        return [text]
+    # Clamp overlap so the window always advances; overlap >= chunk_size
+    # would otherwise loop forever
+    overlap = max(0, min(overlap, chunk_size - 1))
+    chunks = []
+    start = 0
+    while start < len(text):
+        chunks.append(text[start:start + chunk_size])
+        start += chunk_size - overlap
+    return chunks
+
+
+def _record_id(doc_id: str, chunk_idx: int, text: str) -> str:
+    """Generate a stable record ID."""
+    raw = f"{doc_id}::{chunk_idx}::{hashlib.md5(text.encode()).hexdigest()[:8]}"
+    return hashlib.md5(raw.encode()).hexdigest()
+
+
+# ---- Endpoints ----
+
+
+@app.get("/health")
+async def health():
+    """Health check."""
+    return {
+        "status": "ok",
+        "model": MODEL_NAME,
+        "dims": model.get_sentence_embedding_dimension(),
+        "collections": len(chroma_client.list_collections()),
+    }
+
+
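+# Chunking example for _chunk_text above: a 1200-char text with chunk_size=500
+# and chunk_overlap=50 yields three chunks covering [0:500], [450:950] and
+# [900:1200]. _record_id hashes (doc_id, chunk index, text), so re-upserting
+# unchanged text produces the same IDs and overwrites records in place.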
+@app.post("/upsert", dependencies=[Depends(verify_token)])
+async def upsert(request: UpsertRequest):
+    """Upsert text into vector store. Chunks text if chunk_size is set."""
+    collection = _get_collection(request.group)
+
+    chunks = _chunk_text(
+        request.text,
+        request.chunk_size or len(request.text) + 1,
+        request.chunk_overlap or 0,
+    )
+
+    ids = []
+    documents = []
+    metadatas = []
+
+    for i, chunk in enumerate(chunks):
+        chunk = chunk.strip()
+        if not chunk:
+            continue
+
+        ids.append(_record_id(request.doc_id, i, chunk))
+        documents.append(chunk)
+        metadatas.append(
+            {
+                "doc_id": request.doc_id,
+                "chunk_index": i,
+                "chunk_count": len(chunks),
+                **request.metadata,
+            }
+        )
+
+    if ids:
+        # Encode all chunks in a single batch call; much faster than
+        # encoding one chunk at a time
+        embeddings = model.encode(documents).tolist()
+        collection.upsert(ids=ids, documents=documents, embeddings=embeddings, metadatas=metadatas)
+
+    return {"status": "ok", "record_ids": ids, "chunks": len(ids)}
+
+
+@app.post("/search", dependencies=[Depends(verify_token)])
+async def search(request: SearchRequest):
+    """Semantic search within a group (collection)."""
+    collection = _get_collection(request.group)
+
+    if collection.count() == 0:
+        return {"results": [], "total": 0}
+
+    query_embedding = model.encode(request.text).tolist()
+
+    where_filter = None
+    if request.doc_id:
+        where_filter = {"doc_id": request.doc_id}
+
+    results = collection.query(
+        query_embeddings=[query_embedding],
+        n_results=min(request.limit, collection.count()),
+        where=where_filter,
+        include=["documents", "metadatas", "distances"],
+    )
+
+    output = []
+    if results and results["ids"] and results["ids"][0]:
+        for i, rid in enumerate(results["ids"][0]):
+            # ChromaDB cosine distance: 0 = identical, 2 = opposite.
+            # Normalize to a similarity in [0, 1]: 1 - (distance / 2)
+            distance = results["distances"][0][i]
+            similarity = 1.0 - (distance / 2.0)
+
+            if similarity < request.min_similarity:
+                continue
+
+            output.append(
+                {
+                    "record_id": rid,
+                    "text": results["documents"][0][i],
+                    "similarity": round(similarity, 4),
+                    "metadata": results["metadatas"][0][i] if results["metadatas"] else {},
+                }
+            )
+
+    return {"results": output, "total": len(output)}
+
+
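+# NOTE on scores: /search returns the normalized similarity 1 - distance/2 in
+# [0, 1]: e.g. cosine distance 0.4 maps to 0.8, and 1.4 maps to 0.3, exactly
+# the default min_similarity cutoff. /compare below returns raw cosine
+# similarity in [-1, 1].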
+@app.post("/compare", dependencies=[Depends(verify_token)])
+async def compare(request: CompareRequest):
+    """Compare text similarity with a specific record."""
+    # Search all collections for the record. list_collections() returns
+    # Collection objects on chromadb 0.4/0.5 but plain names on newer
+    # releases; handle both.
+    for col_info in chroma_client.list_collections():
+        name = col_info if isinstance(col_info, str) else col_info.name
+        col = chroma_client.get_collection(name)
+        try:
+            record = col.get(ids=[request.record_id], include=["embeddings"])
+            if record and record["ids"]:
+                query_embedding = model.encode(request.text).tolist()
+                stored_embedding = record["embeddings"][0]
+
+                # Raw cosine similarity in [-1, 1] (unlike /search, which
+                # returns the normalized 1 - distance/2 score)
+                dot = sum(a * b for a, b in zip(query_embedding, stored_embedding, strict=False))
+                norm_q = sum(a * a for a in query_embedding) ** 0.5
+                norm_s = sum(a * a for a in stored_embedding) ** 0.5
+                similarity = dot / (norm_q * norm_s) if norm_q and norm_s else 0.0
+
+                return {"similarity": round(similarity, 4), "record_id": request.record_id}
+        except Exception:
+            continue
+
+    raise HTTPException(status_code=404, detail=f"Record {request.record_id} not found")
+
+
+@app.get("/count", dependencies=[Depends(verify_token)])
+async def count(group: str = "default"):
+    """Count records in a group."""
+    collection = _get_collection(group)
+    return {"count": collection.count(), "group": group}
+
+
+@app.get("/ids", dependencies=[Depends(verify_token)])
+async def get_ids(group: str = "default", doc_id: str = ""):
+    """Get record IDs in a group, optionally filtered by doc_id."""
+    collection = _get_collection(group)
+
+    where_filter = None
+    if doc_id:
+        where_filter = {"doc_id": doc_id}
+
+    result = collection.get(where=where_filter, include=[])
+    return {"ids": result["ids"], "total": len(result["ids"])}
+
+
+@app.get("/records", dependencies=[Depends(verify_token)])
+async def get_records(group: str = "default", doc_id: str = "", limit: int = 100):
+    """Get records with text and metadata."""
+    collection = _get_collection(group)
+
+    where_filter = None
+    if doc_id:
+        where_filter = {"doc_id": doc_id}
+
+    result = collection.get(
+        where=where_filter,
+        include=["documents", "metadatas"],
+        limit=limit,
+    )
+
+    records = []
+    for i, rid in enumerate(result["ids"]):
+        records.append(
+            {
+                "record_id": rid,
+                "text": result["documents"][i] if result["documents"] else "",
+                "metadata": result["metadatas"][i] if result["metadatas"] else {},
+            }
+        )
+
+    return {"records": records, "total": len(records)}
+
+
+@app.post("/delete/record", dependencies=[Depends(verify_token)])
+async def delete_record(request: DeleteRecordRequest):
+    """Delete a single record by ID (searches all collections)."""
+    for col_info in chroma_client.list_collections():
+        # Handle both Collection objects and plain names (see /compare)
+        name = col_info if isinstance(col_info, str) else col_info.name
+        col = chroma_client.get_collection(name)
+        try:
+            existing = col.get(ids=[request.record_id], include=[])
+            if existing and existing["ids"]:
+                col.delete(ids=[request.record_id])
+                return {"status": "ok", "deleted": request.record_id}
+        except Exception:
+            continue
+
+    raise HTTPException(status_code=404, detail=f"Record {request.record_id} not found")
+
+
+@app.post("/delete/document", dependencies=[Depends(verify_token)])
+async def delete_document(request: DeleteDocumentRequest):
+    """Delete all records for a doc_id within a group."""
+    collection = _get_collection(request.group)
+
+    result = collection.get(where={"doc_id": request.doc_id}, include=[])
+    ids = result["ids"]
+
+    if ids:
+        collection.delete(ids=ids)
+
+    return {"status": "ok", "deleted_count": len(ids), "doc_id": request.doc_id}
+
+
+@app.post("/delete/group", dependencies=[Depends(verify_token)])
+async def delete_group(request: DeleteGroupRequest):
+    """Delete an entire group (collection)."""
+    try:
+        chroma_client.delete_collection(_safe_name(request.group))
+        return {"status": "ok", "deleted_group": request.group}
+    except Exception:
+        return {"status": "ok", "deleted_group": request.group, "note": "collection did not exist"}
+
+
+@app.post("/clear", dependencies=[Depends(verify_token)])
+async def clear():
+    """Delete all collections."""
+    collections = chroma_client.list_collections()
+    deleted = len(collections)
+    for col_info in collections:
+        name = col_info if isinstance(col_info, str) else col_info.name
+        chroma_client.delete_collection(name)
+    return {"status": "ok", "deleted_collections": deleted}
diff --git a/services/vector-search/requirements.txt b/services/vector-search/requirements.txt
new file mode 100644
index 0000000..dbb07d3
--- /dev/null
+++ b/services/vector-search/requirements.txt
@@ -0,0 +1,5 @@
+fastapi>=0.104.0
+uvicorn[standard]>=0.24.0
+sentence-transformers>=2.2.0
+chromadb>=0.4.0
+pydantic>=2.0