Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 58 additions & 25 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,24 +388,42 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None:
f"deleted={len(deleted_files)}"
)

for file_path in file_paths:
# Separate cached from changed files to allow concurrent VLM calls
pending_indices: List[Tuple[int, str]] = []
file_summaries: List[Optional[Dict[str, str]]] = [None] * len(file_paths)

for idx, file_path in enumerate(file_paths):
file_name = file_path.split("/")[-1]

if file_path not in changed_files and file_name in existing_summaries:
file_summaries.append({"name": file_name, "summary": existing_summaries[file_name]})
file_summaries[idx] = {"name": file_name, "summary": existing_summaries[file_name]}
logger.debug(f"Reused existing summary for {file_name}")
else:
pending_indices.append((idx, file_path))

if pending_indices:
logger.info(
f"Generating summaries for {len(pending_indices)} changed files concurrently "
f"(reused {len(file_paths) - len(pending_indices)} cached)"
)

async def _gen(idx: int, file_path: str) -> None:
file_name = file_path.split("/")[-1]
try:
summary_dict = await self._generate_single_file_summary(
file_path, llm_sem=llm_sem, ctx=ctx
)
file_summaries.append(summary_dict)
file_summaries[idx] = summary_dict
logger.debug(f"Generated summary for {file_name}")
except Exception as e:
logger.warning(f"Failed to generate summary for {file_path}: {e}")
file_summaries.append({"name": file_name, "summary": ""})
file_summaries[idx] = {"name": file_name, "summary": ""}

await asyncio.gather(*[_gen(i, fp) for i, fp in pending_indices])

overview = await self._generate_overview(dir_uri, file_summaries, [])
file_summaries = [s for s in file_summaries if s is not None]

overview = await self._generate_overview(dir_uri, file_summaries, [], llm_sem=llm_sem)
abstract = self._extract_abstract_from_overview(overview)
overview, abstract = self._enforce_size_limits(overview, abstract)

Expand Down Expand Up @@ -816,6 +834,7 @@ async def _generate_overview(
dir_uri: str,
file_summaries: List[Dict[str, str]],
children_abstracts: List[Dict[str, str]],
llm_sem: Optional[asyncio.Semaphore] = None,
) -> str:
"""Generate directory's .overview.md (L1).
Expand Down Expand Up @@ -869,7 +888,11 @@ async def _generate_overview(
f"Splitting into batches of {semantic.overview_batch_size}."
)
overview = await self._batched_generate_overview(
dir_uri, file_summaries, children_abstracts, file_index_map
dir_uri,
file_summaries,
children_abstracts,
file_index_map,
llm_sem=llm_sem,
)
elif over_budget:
# Few files but long summaries → truncate summaries to fit budget
Expand Down Expand Up @@ -948,6 +971,7 @@ async def _batched_generate_overview(
file_summaries: List[Dict[str, str]],
children_abstracts: List[Dict[str, str]],
file_index_map: Dict[int, str],
llm_sem: Optional[asyncio.Semaphore] = None,
) -> str:
"""Generate overview by batching file summaries and merging.
Expand All @@ -974,9 +998,13 @@ async def _batched_generate_overview(
else "None"
)

# Generate partial overview per batch using global file indices
partial_overviews = []
# Generate partial overview per batch concurrently using global file indices
if llm_sem is None:
llm_sem = asyncio.Semaphore(self.max_concurrent_llm)
partial_overviews = [None] * len(batches)
global_offset = 0
batch_prompts: List[Tuple[int, str, Dict[int, str]]] = []

for batch_idx, batch in enumerate(batches):
# Build per-batch index map using global offsets
batch_lines = []
Expand All @@ -991,33 +1019,38 @@ async def _batched_generate_overview(
# Include children abstracts in the first batch
children_str = children_abstracts_str if batch_idx == 0 else "None"

try:
prompt = render_prompt(
"semantic.overview_generation",
{
"dir_name": dir_name,
"file_summaries": batch_str,
"children_abstracts": children_str,
},
)
partial = await vlm.get_completion_async(prompt)
prompt = render_prompt(
"semantic.overview_generation",
{
"dir_name": dir_name,
"file_summaries": batch_str,
"children_abstracts": children_str,
},
)
batch_prompts.append((batch_idx, prompt, batch_index_map))

# Replace [number] references per batch using batch-local map
def make_replacer(idx_map):
def replacer(match):
idx = int(match.group(1))
return idx_map.get(idx, match.group(0))
def make_replacer(idx_map):
def replacer(match):
idx = int(match.group(1))
return idx_map.get(idx, match.group(0))

return replacer
return replacer

async def _run_batch(batch_idx: int, prompt: str, batch_index_map: Dict[int, str]) -> None:
try:
async with llm_sem:
partial = await vlm.get_completion_async(prompt)
partial = re.sub(r"\[(\d+)\]", make_replacer(batch_index_map), partial)
partial_overviews.append(partial.strip())
partial_overviews[batch_idx] = partial.strip()
except Exception as e:
logger.warning(
f"Failed to generate partial overview batch "
f"{batch_idx + 1}/{len(batches)} for {dir_uri}: {e}"
)

await asyncio.gather(*[_run_batch(*bp) for bp in batch_prompts])
partial_overviews = [p for p in partial_overviews if p is not None]

if not partial_overviews:
return f"# {dir_name}\n\nDirectory overview"

Expand Down
Loading