Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/en/guides/01-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,24 @@ Reranking model for search result refinement.

If rerank is not configured, search uses vector similarity only.

### retrieval indexing

Long text files are split into chunks during file-level vectorization so that a single oversized
file is not reduced to one coarse L2 embedding. At retrieval time, chunk hits are collapsed back
to the base file URI, so results are still returned at file level.

```json
{
"file_chunk_chars": 4000,
"file_chunk_overlap": 400
}
```

| Parameter | Type | Description | Default |
|-----------|------|-------------|---------|
| `file_chunk_chars` | int | Maximum characters per chunk when vectorizing long text files | `4000` |
| `file_chunk_overlap` | int | Overlapping characters between adjacent file chunks | `400` |

### storage

Storage configuration for context data, including file storage (AGFS) and vector database storage (VectorDB).
Expand Down Expand Up @@ -837,6 +855,8 @@ For details on the lock mechanism, see [Path Locks and Crash Recovery](../concep
"code": {
"code_summary_mode": "ast"
},
"file_chunk_chars": 4000,
"file_chunk_overlap": 400,
"server": {
"host": "0.0.0.0",
"port": 1933,
Expand Down
19 changes: 19 additions & 0 deletions docs/zh/guides/01-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,23 @@ AST 提取支持:Python、JavaScript/TypeScript、Rust、Go、Java、C/C++。

如果未配置 Rerank,搜索仅使用向量相似度。

### 检索索引

长文本文件在文件级向量化阶段会被切分为多个 chunk,避免一个超长文件只生成单个粗粒度的
L2 向量。检索阶段会再把这些 chunk 命中折叠回基础文件 URI,对外仍返回文件级结果。

```json
{
"file_chunk_chars": 4000,
"file_chunk_overlap": 400
}
```

| 参数 | 类型 | 说明 | 默认值 |
|------|------|------|--------|
| `file_chunk_chars` | int | 长文本文件向量化时每个 chunk 的最大字符数 | `4000` |
| `file_chunk_overlap` | int | 相邻文件 chunk 之间的重叠字符数 | `400` |

### storage

用于存储上下文数据,包括文件存储(AGFS)和向量库存储(VectorDB)。
Expand Down Expand Up @@ -814,6 +831,8 @@ HTTP 客户端(`SyncHTTPClient` / `AsyncHTTPClient`)和 CLI 工具连接远
"code": {
"code_summary_mode": "ast"
},
"file_chunk_chars": 4000,
"file_chunk_overlap": 400,
"server": {
"host": "string",
"port": 1933,
Expand Down
64 changes: 62 additions & 2 deletions openviking/retrieve/hierarchical_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
"""

import heapq
import math
import logging
import math
import re
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
Expand Down Expand Up @@ -51,6 +52,7 @@ class HierarchicalRetriever:
GLOBAL_SEARCH_TOPK = 5 # Global retrieval count
HOTNESS_ALPHA = 0.2 # Weight for hotness score in final ranking (0 = disabled)
LEVEL_URI_SUFFIX = {0: ".abstract.md", 1: ".overview.md"}
CHUNK_URI_PATTERN = re.compile(r"#chunk_\d+$")

def __init__(
self,
Expand Down Expand Up @@ -177,20 +179,22 @@ async def retrieve(
# 从 global_results 中提取 level 2 的文件作为初始候选者
initial_candidates = [r for r in global_results if r.get("level", 2) == 2]

initial_candidates = [r for r in global_results if r.get("level", 2) == 2]
initial_candidates = self._prepare_initial_candidates(
query.query,
initial_candidates,
mode=mode,
)

# Step 4: Recursive search
recursive_limit = max(limit * 3, 10)
candidates = await self._recursive_search(
vector_proxy=vector_proxy,
query=query.query,
query_vector=query_vector,
sparse_query_vector=sparse_query_vector,
starting_points=starting_points,
limit=limit,
limit=recursive_limit,
mode=mode,
threshold=effective_threshold,
score_gte=score_gte,
Expand Down Expand Up @@ -501,6 +505,7 @@ async def _convert_to_matched_contexts(
is controlled by ``HOTNESS_ALPHA`` (0 disables the boost).
"""
results = []
candidates = self._collapse_chunk_candidates(candidates)

for c in candidates:
# Read related contexts and get summaries
Expand Down Expand Up @@ -563,6 +568,61 @@ async def _convert_to_matched_contexts(
results.sort(key=lambda x: x.score, reverse=True)
return results

@classmethod
def _base_uri_for_chunk(cls, uri: str) -> str:
    """Return *uri* with any trailing ``#chunk_NNNN`` suffix removed.

    Chunk-level vector entries share their source file's URI plus a chunk
    fragment; stripping the fragment recovers the user-facing file URI.
    A falsy ``uri`` (empty string) is returned unchanged.
    """
    if uri:
        return cls.CHUNK_URI_PATTERN.sub("", uri)
    return uri

@classmethod
def _collapse_chunk_candidates(cls, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse chunk-level file hits back to a single file-level candidate.

    Candidates whose URIs carry a ``#chunk_NNNN`` suffix are folded into one
    entry keyed by the base file URI. Within a group the highest-scoring hit
    wins, but missing metadata (``abstract``/``context_type``/``category``)
    is filled in from the other candidates, and a non-chunk (file-level)
    candidate's abstract always takes precedence over a chunk's.

    Args:
        candidates: Raw retrieval candidates; each dict may carry ``uri``,
            ``_score``/``_final_score``, and optional metadata fields.

    Returns:
        De-duplicated candidates sorted by score, highest first. Input dicts
        are never mutated; copies are returned.
    """
    collapsed: Dict[str, Dict[str, Any]] = {}

    for candidate in candidates:
        # Copy so the caller's candidate dicts are never mutated in place.
        candidate_copy = dict(candidate)
        original_uri = candidate_copy.get("uri", "")
        base_uri = cls._base_uri_for_chunk(original_uri)
        # A stripped suffix means this hit came from a file chunk vector.
        is_chunk = base_uri != original_uri
        candidate_copy["uri"] = base_uri

        previous = collapsed.get(base_uri)
        # Prefer the post-ranking score when present, raw score otherwise.
        candidate_score = candidate_copy.get("_final_score", candidate_copy.get("_score", 0.0))

        if previous is None:
            collapsed[base_uri] = candidate_copy
        else:
            previous_score = previous.get("_final_score", previous.get("_score", 0.0))
            if candidate_score > previous_score:
                # New winner: replace the entry, but carry forward metadata
                # the winning chunk hit may be missing.
                preserved_abstract = previous.get("abstract", "")
                preserved_context_type = previous.get("context_type")
                preserved_category = previous.get("category")
                collapsed[base_uri] = candidate_copy
                # Only a chunk hit inherits the previous abstract; a file-level
                # winner keeps (or later receives) its own canonical abstract.
                if preserved_abstract and is_chunk and not candidate_copy.get("abstract"):
                    collapsed[base_uri]["abstract"] = preserved_abstract
                if preserved_context_type and not candidate_copy.get("context_type"):
                    collapsed[base_uri]["context_type"] = preserved_context_type
                if preserved_category and not candidate_copy.get("category"):
                    collapsed[base_uri]["category"] = preserved_category
            else:
                # Lower-scoring hit: still donate any metadata the current
                # winner lacks.
                if not previous.get("abstract") and candidate_copy.get("abstract"):
                    previous["abstract"] = candidate_copy["abstract"]
                if not previous.get("context_type") and candidate_copy.get("context_type"):
                    previous["context_type"] = candidate_copy["context_type"]
                if not previous.get("category") and candidate_copy.get("category"):
                    previous["category"] = candidate_copy["category"]

        collapsed_candidate = collapsed[base_uri]
        # A file-level abstract is canonical: it overrides any chunk-derived
        # abstract regardless of score ordering.
        if not is_chunk and candidate_copy.get("abstract"):
            collapsed_candidate["abstract"] = candidate_copy["abstract"]

    return sorted(
        collapsed.values(),
        key=lambda item: item.get("_final_score", item.get("_score", 0.0)),
        reverse=True,
    )

@classmethod
def _append_level_suffix(cls, uri: str, level: int) -> str:
"""Return user-facing URI with L0/L1 suffix reconstructed by level."""
Expand Down
115 changes: 101 additions & 14 deletions openviking/utils/embedding_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,54 @@

import os
from datetime import datetime
from typing import Dict, Optional
from typing import Any, Dict, List, Optional, cast

from openviking.core.context import Context, ContextLevel, ResourceContentType, Vectorize
from openviking.server.identity import RequestContext
from openviking.storage.queuefs import get_queue_manager
from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter
from openviking.storage.viking_fs import get_viking_fs
from openviking_cli.utils import VikingURI, get_logger
from openviking_cli.utils.config import get_openviking_config

logger = get_logger(__name__)


def _chunk_text(text: str, chunk_chars: int, chunk_overlap: int) -> List[str]:
"""Split text into overlapping chunks for long-file vectorization."""
if not text:
return []
if len(text) <= chunk_chars:
return [text]

step = max(chunk_chars - chunk_overlap, 1)
chunks = []
start = 0

while start < len(text):
end = min(start + chunk_chars, len(text))
chunks.append(text[start:end])
if end >= len(text):
break
start += step

return chunks


async def _enqueue_context(
    context: Context,
    embedding_queue: Any,
    semantic_msg_id: Optional[str] = None,
) -> bool:
    """Convert *context* into an embedding message and push it onto the queue.

    Returns:
        True when a message was enqueued; False when the context could not be
        converted (nothing is queued in that case).
    """
    msg = EmbeddingMsgConverter.from_context(context)
    if not msg:
        return False
    # Tag the message so downstream tracking can correlate it with the
    # originating semantic message.
    msg.semantic_msg_id = semantic_msg_id
    await embedding_queue.enqueue(msg)
    return True


async def _decrement_embedding_tracker(semantic_msg_id: Optional[str], count: int) -> None:
if not semantic_msg_id or count <= 0:
return
Expand Down Expand Up @@ -146,9 +182,10 @@ async def vectorize_directory_meta(
return

queue_manager = get_queue_manager()
embedding_queue = queue_manager.get_queue(queue_manager.EMBEDDING)
embedding_queue = cast(Any, queue_manager.get_queue(queue_manager.EMBEDDING))

parent_uri = VikingURI(uri).parent.uri
parent = VikingURI(uri).parent
parent_uri = parent.uri if parent else ""
owner_space = _owner_space_for_uri(uri, ctx)

# Vectorize L0: .abstract.md (abstract)
Expand Down Expand Up @@ -230,22 +267,25 @@ async def vectorize_file(
return

queue_manager = get_queue_manager()
embedding_queue = queue_manager.get_queue(queue_manager.EMBEDDING)
viking_fs = get_viking_fs()
embedding_queue = cast(Any, queue_manager.get_queue(queue_manager.EMBEDDING))
viking_fs = cast(Any, get_viking_fs())
config = get_openviking_config()

file_name = summary_dict.get("name") or os.path.basename(file_path)
summary = summary_dict.get("summary", "")
owner_space = _owner_space_for_uri(file_path, ctx)
created_at = datetime.now()

context = Context(
uri=file_path,
parent_uri=parent_uri,
is_leaf=True,
abstract=summary,
context_type=context_type,
created_at=datetime.now(),
created_at=created_at,
user=ctx.user,
account_id=ctx.account_id,
owner_space=_owner_space_for_uri(file_path, ctx),
owner_space=owner_space,
)

content_type = get_resource_content_type(file_name)
Expand All @@ -271,7 +311,58 @@ async def vectorize_file(
content = await viking_fs.read_file(file_path, ctx=ctx)
if isinstance(content, bytes):
content = content.decode("utf-8", errors="replace")
context.set_vectorize(Vectorize(text=content))
chunks = _chunk_text(
content, config.file_chunk_chars, config.file_chunk_overlap
)
if len(chunks) <= 1:
context.set_vectorize(Vectorize(text=content))
else:
if summary:
context.set_vectorize(Vectorize(text=summary))
if not await _enqueue_context(
context,
embedding_queue,
semantic_msg_id=semantic_msg_id,
):
return
enqueued = True
logger.debug(
"Enqueued canonical summary vector for chunked file: %s", file_path
)

for index, chunk_text in enumerate(chunks):
chunk_context = Context(
uri=f"{file_path}#chunk_{index:04d}",
parent_uri=parent_uri,
is_leaf=True,
abstract=summary,
context_type=context_type,
created_at=created_at,
user=ctx.user,
account_id=ctx.account_id,
owner_space=owner_space,
meta={
"chunk_index": index,
"chunk_count": len(chunks),
"source_uri": file_path,
},
level=ContextLevel.DETAIL,
)
chunk_context.set_vectorize(Vectorize(text=chunk_text))
if not await _enqueue_context(
chunk_context,
embedding_queue,
semantic_msg_id=semantic_msg_id,
):
continue
enqueued = True

logger.debug(
"Enqueued %d chunk vectors for long text file: %s",
len(chunks),
file_path,
)
return
except Exception as e:
logger.warning(
f"Failed to read file content for {file_path}, falling back to summary: {e}"
Expand All @@ -290,12 +381,8 @@ async def vectorize_file(
logger.debug(f"Skipping file {file_path} (no text content or summary)")
return

embedding_msg = EmbeddingMsgConverter.from_context(context)
if not embedding_msg:
if not await _enqueue_context(context, embedding_queue, semantic_msg_id=semantic_msg_id):
return

embedding_msg.semantic_msg_id = semantic_msg_id
await embedding_queue.enqueue(embedding_msg)
enqueued = True
logger.debug(f"Enqueued file for vectorization: {file_path}")

Expand All @@ -316,7 +403,7 @@ async def index_resource(
1. Reads .abstract.md and .overview.md and vectorizes them.
2. Scans files in the directory and vectorizes them.
"""
viking_fs = get_viking_fs()
viking_fs = cast(Any, get_viking_fs())

# 1. Index Directory Metadata
abstract_uri = f"{uri}/.abstract.md"
Expand Down
Loading
Loading