35 changes: 25 additions & 10 deletions src/vectorcode/chunking.py
@@ -25,21 +25,36 @@ class Chunk:
"""

text: str
start: Point
end: Point
start: Point | None = None
end: Point | None = None
path: str | None = None
id: str | None = None

def __str__(self):
return self.text

def __hash__(self) -> int:
return hash(f"VectorCodeChunk_{self.path}({self.start}:{self.end}@{self.text})")

def export_dict(self):
d: dict[str, str | dict[str, int]] = {"text": self.text}
if self.start is not None:
return {
"text": self.text,
"start": {"row": self.start.row, "column": self.start.column},
"end": {"row": self.end.row, "column": self.end.column},
}
else:
return {"text": self.text}
d.update(
{
"start": {"row": self.start.row, "column": self.start.column},
}
)
if self.end is not None:
d.update(
{
"end": {"row": self.end.row, "column": self.end.column},
}
)
if self.path:
d["path"] = self.path
if self.id:
d["chunk_id"] = self.id
return d


@dataclass
@@ -129,7 +144,7 @@ def chunk(
 ) -> Generator[Chunk, None, None]:
     logger.info("Started chunking %s using FileChunker.", data.name)
     lines = data.readlines()
-    if len(lines) == 0:
+    if len(lines) == 0:  # pragma: nocover
         return
     if (
         self.config.chunk_size < 0
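
A quick usage sketch (not part of the PR) may help here: it shows how the reworked Chunk behaves now that start, end, path, and id are optional. The values below are invented; tree_sitter.Point is a (row, column) named tuple.

# Illustrative only: exercising the new optional-field export.
from tree_sitter import Point

from vectorcode.chunking import Chunk

full = Chunk(
    text="def foo(): ...",
    start=Point(3, 0),
    end=Point(5, 4),
    path="src/foo.py",
    id="abc123",
)
print(full.export_dict())
# {'text': 'def foo(): ...', 'start': {'row': 3, 'column': 0},
#  'end': {'row': 5, 'column': 4}, 'path': 'src/foo.py', 'chunk_id': 'abc123'}

# A bare chunk serialises to just its text; every other field is omitted.
print(Chunk(text="hello").export_dict())  # {'text': 'hello'}
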
54 changes: 31 additions & 23 deletions src/vectorcode/mcp_main.py
@@ -3,6 +3,7 @@
 import logging
 import os
 import sys
+import traceback
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Optional, cast
@@ -160,14 +161,17 @@ async def vectorise_files(paths: list[str], project_root: str) -> dict[str, int]
         await remove_orphanes(collection, collection_lock, stats, stats_lock)
 
         return stats.to_dict()
-    except Exception as e:
-        logger.error("Failed to access collection at %s", project_root)
-        raise McpError(
-            ErrorData(
-                code=1,
-                message=f"{e.__class__.__name__}: Failed to create the collection at {project_root}.",
-            )
-        )
+    except Exception as e:  # pragma: nocover
+        if isinstance(e, McpError):
+            logger.error("Failed to access collection at %s", project_root)
+            raise
+        else:
+            raise McpError(
+                ErrorData(
+                    code=1,
+                    message="\n".join(traceback.format_exception(e)),
+                )
+            ) from e
 
 
 async def query_tool(
@@ -211,24 +215,28 @@ async def query_tool(
             configs=query_config,
         )
         results: list[str] = []
-        for path in result_paths:
-            if os.path.isfile(path):
-                with open(path) as fin:
-                    rel_path = os.path.relpath(path, config.project_root)
-                    results.append(
-                        f"<path>{rel_path}</path>\n<content>{fin.read()}</content>",
-                    )
+        for result in result_paths:
+            if isinstance(result, str):
+                if os.path.isfile(result):
+                    with open(result) as fin:
+                        rel_path = os.path.relpath(result, config.project_root)
+                        results.append(
+                            f"<path>{rel_path}</path>\n<content>{fin.read()}</content>",
+                        )
         logger.info("Retrieved the following files: %s", result_paths)
         return results
 
-    except Exception as e:
-        logger.error("Failed to access collection at %s", project_root)
-        raise McpError(
-            ErrorData(
-                code=1,
-                message=f"{e.__class__.__name__}: Failed to access the collection at {project_root}. Use `list_collections` tool to get a list of valid paths for this field.",
-            )
-        )
+    except Exception as e:  # pragma: nocover
+        if isinstance(e, McpError):
+            logger.error("Failed to access collection at %s", project_root)
+            raise
+        else:
+            raise McpError(
+                ErrorData(
+                    code=1,
+                    message="\n".join(traceback.format_exception(e)),
+                )
+            ) from e
 
 
 async def ls_files(project_root: str) -> list[str]:
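
A note on the new error handling (not part of the PR): both handlers now re-raise McpError untouched and wrap anything else with its full traceback, so the MCP client sees the real failure instead of a generic message. The single-argument form of traceback.format_exception used here requires Python 3.10+. A minimal stdlib-only sketch:

import traceback

try:
    1 / 0
except Exception as e:
    # format_exception(e) renders the traceback as a list of strings;
    # joining them yields the message attached to ErrorData above.
    print("\n".join(traceback.format_exception(e)))
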
147 changes: 83 additions & 64 deletions src/vectorcode/subcommands/query/__init__.py
@@ -3,12 +3,13 @@
 import os
 from typing import Any, cast
 
-from chromadb import GetResult, Where
+from chromadb import Where
 from chromadb.api.models.AsyncCollection import AsyncCollection
-from chromadb.api.types import IncludeEnum
+from chromadb.api.types import IncludeEnum, QueryResult
 from chromadb.errors import InvalidCollectionException, InvalidDimensionException
+from tree_sitter import Point
 
-from vectorcode.chunking import StringChunker
+from vectorcode.chunking import Chunk, StringChunker
 from vectorcode.cli_utils import (
     Config,
     QueryInclude,
@@ -22,6 +23,7 @@
     get_embedding_function,
     verify_ef,
 )
+from vectorcode.subcommands.query import types as vectorcode_types
 from vectorcode.subcommands.query.reranker import (
     RerankerError,
     get_reranker,
@@ -30,14 +32,49 @@
 logger = logging.getLogger(name=__name__)
 
 
+def convert_query_results(
+    chroma_result: QueryResult, queries: list[str]
+) -> list[vectorcode_types.QueryResult]:
+    """Convert a chromadb query result into in-house query results."""
+    assert chroma_result["documents"] is not None
+    assert chroma_result["distances"] is not None
+    assert chroma_result["metadatas"] is not None
+    assert chroma_result["ids"] is not None
+
+    chroma_results_list: list[vectorcode_types.QueryResult] = []
+    for q_i in range(len(queries)):
+        q = queries[q_i]
+        documents = chroma_result["documents"][q_i]
+        distances = chroma_result["distances"][q_i]
+        metadatas = chroma_result["metadatas"][q_i]
+        ids = chroma_result["ids"][q_i]
+        for doc, dist, meta, _id in zip(documents, distances, metadatas, ids):
+            chunk = Chunk(text=doc, id=_id)
+            # `is not None` rather than truthiness: row 0 is a valid start/end.
+            if meta.get("start") is not None:
+                chunk.start = Point(int(meta["start"]), 0)
+            if meta.get("end") is not None:
+                chunk.end = Point(int(meta["end"]), 0)
+            if meta.get("path"):
+                chunk.path = str(meta["path"])
+            chroma_results_list.append(
+                vectorcode_types.QueryResult(
+                    chunk=chunk,
+                    path=str(meta.get("path", "")),
+                    query=(q,),
+                    scores=(-dist,),
+                )
+            )
+    return chroma_results_list
+
+
 async def get_query_result_files(
     collection: AsyncCollection, configs: Config
-) -> list[str]:
+) -> list[str | Chunk]:
     query_chunks = []
-    if configs.query:
-        chunker = StringChunker(configs)
-        for q in configs.query:
-            query_chunks.extend(str(i) for i in chunker.chunk(q))
+    assert configs.query, "Query messages cannot be empty."
+    chunker = StringChunker(configs)
+    for q in configs.query:
+        query_chunks.extend(str(i) for i in chunker.chunk(q))
 
     configs.query_exclude = [
         expand_path(i, True)
@@ -70,7 +107,7 @@ async def get_query_result_files(
     query_embeddings = get_embedding_function(configs)(query_chunks)
     if isinstance(configs.embedding_dims, int) and configs.embedding_dims > 0:
         query_embeddings = [e[: configs.embedding_dims] for e in query_embeddings]
-    results = await collection.query(
+    chroma_query_results: QueryResult = await collection.query(
         query_embeddings=query_embeddings,
         n_results=num_query,
         include=[
@@ -85,69 +122,51 @@
         return []
 
     reranker = get_reranker(configs)
-    return await reranker.rerank(results)
+    return await reranker.rerank(
+        convert_query_results(chroma_query_results, configs.query)
+    )
 
 
 async def build_query_results(
     collection: AsyncCollection, configs: Config
 ) -> list[dict[str, str | int]]:
-    structured_result = []
-    for identifier in await get_query_result_files(collection, configs):
-        if os.path.isfile(identifier):
-            if configs.use_absolute_path:
-                output_path = os.path.abspath(identifier)
-            else:
-                output_path = os.path.relpath(identifier, configs.project_root)
-            full_result = {"path": output_path}
-            with open(identifier) as fin:
-                document = fin.read()
-                full_result["document"] = document
+    assert configs.project_root
 
-            structured_result.append(
-                {str(key): full_result[str(key)] for key in configs.include}
-            )
-        elif QueryInclude.chunk in configs.include:
-            chunks: GetResult = await collection.get(
-                identifier, include=[IncludeEnum.metadatas, IncludeEnum.documents]
-            )
-            meta = chunks.get(
-                "metadatas",
-            )
-            if meta is not None and len(meta) != 0:
-                chunk_texts = chunks.get("documents")
-                assert chunk_texts is not None, (
-                    "QueryResult does not contain `documents`!"
-                )
-                full_result: dict[str, str | int] = {
-                    "chunk": str(chunk_texts[0]),
-                    "chunk_id": identifier,
-                }
-                if meta[0].get("start") is not None and meta[0].get("end") is not None:
-                    path = str(meta[0].get("path"))
-                    with open(path) as fin:
-                        start: int = int(meta[0]["start"])
-                        end: int = int(meta[0]["end"])
-                        full_result["chunk"] = "".join(fin.readlines()[start : end + 1])
-                    full_result["start_line"] = start
-                    full_result["end_line"] = end
-                if QueryInclude.path in configs.include:
-                    full_result["path"] = str(
-                        meta[0]["path"]
-                        if configs.use_absolute_path
-                        else os.path.relpath(
-                            str(meta[0]["path"]), str(configs.project_root)
-                        )
-                    )
-
-                structured_result.append(full_result)
-            else:  # pragma: nocover
-                logger.error(
-                    "This collection doesn't support chunk-mode output because it lacks the necessary metadata. Please re-vectorise it.",
-                )
+    def make_output_path(path: str, absolute: bool) -> str:
+        if absolute:
+            if os.path.isabs(path):
+                return path
+            return os.path.abspath(os.path.join(str(configs.project_root), path))
+        else:
+            rel_path = os.path.relpath(path, configs.project_root)
+            if isinstance(rel_path, bytes):  # pragma: nocover
+                # some type stubs report that `os.path.relpath` may return `bytes`.
+                rel_path = rel_path.decode()
+            return rel_path
+
+    structured_result = []
+    for res in await get_query_result_files(collection, configs):
+        if isinstance(res, str):
+            output_path = make_output_path(res, configs.use_absolute_path)
+            io_path = make_output_path(res, True)
+            if not os.path.isfile(io_path):
+                logger.warning(f"{io_path} is no longer a valid file.")
+                continue
+            with open(io_path) as fin:
+                structured_result.append({"path": output_path, "document": fin.read()})
         else:
-            logger.warning(
-                f"{identifier} is no longer a valid file! Please re-run vectorcode vectorise to refresh the database.",
-            )
+            res = cast(Chunk, res)
+            assert res.path, f"{res} has no `path` attribute."
+            structured_result.append(
+                {
+                    # `res.path` is guaranteed by the assert above, so no None fallback is needed.
+                    "path": make_output_path(res.path, configs.use_absolute_path),
+                    "chunk": res.text,
+                    "start_line": res.start.row if res.start is not None else None,
+                    "end_line": res.end.row if res.end is not None else None,
+                    "chunk_id": res.id,
+                }
+            )
     for result in structured_result:
         if result.get("path") is not None:
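
Finally, a sketch of what convert_query_results() consumes and produces (not part of the PR). The payload below is invented but follows chromadb's QueryResult layout of one nested list per query; attribute access on the result assumes the in-house QueryResult keeps its constructor fields.

from vectorcode.subcommands.query import convert_query_results

fake_chroma_result = {
    "ids": [["chunk-1"]],
    "documents": [["def foo(): ..."]],
    "distances": [[0.42]],
    "metadatas": [[{"path": "src/foo.py", "start": 3, "end": 5}]],
}
results = convert_query_results(fake_chroma_result, queries=["foo"])
# One in-house QueryResult per (query, hit) pair; distances are negated
# into scores, so larger means a better match for the reranker.
print(results[0].path)    # "src/foo.py"
print(results[0].scores)  # (-0.42,)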