From 2bdddbb76a73cb1b4da22362f4ecc9835eb8ea16 Mon Sep 17 00:00:00 2001 From: Ahmed Hesham Abdelkader <23265119+ahmedhesham6@users.noreply.github.com> Date: Sat, 21 Mar 2026 09:55:02 +0200 Subject: [PATCH 1/2] perf(semantic): run batch overview generation and file summaries concurrently The semantic processor generates directory overviews by splitting large directories into batches of 50 files and calling the VLM for each batch. Previously, both file summary generation in _process_memory_directory and batch overview generation in _batched_generate_overview ran sequentially, causing directories with 1000+ files to take 15+ minutes as each VLM call blocked the next. This change runs both operations concurrently using asyncio.gather, bounded by the existing max_concurrent_llm semaphore: - _process_memory_directory: changed files now generate summaries in parallel instead of awaiting each one sequentially. Cached summaries are still reused for unchanged files. - _batched_generate_overview: all batch prompts are pre-built, then dispatched concurrently via asyncio.gather with the llm semaphore controlling concurrency. Batch ordering is preserved via indexed list. With max_concurrent_llm=20, a 1000-file directory that previously took ~15 minutes for the batch step now completes in ~23 seconds (~40x improvement). The final merge step remains sequential as it depends on all batches completing. --- .../storage/queuefs/semantic_processor.py | 73 +++++++++++++------ 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 16ec2291..858e7aa7 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -388,22 +388,40 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None: f"deleted={len(deleted_files)}" ) - for file_path in file_paths: + # Separate cached from changed files to allow concurrent VLM calls + pending_indices: List[Tuple[int, str]] = [] + file_summaries: List[Optional[Dict[str, str]]] = [None] * len(file_paths) + + for idx, file_path in enumerate(file_paths): file_name = file_path.split("/")[-1] if file_path not in changed_files and file_name in existing_summaries: - file_summaries.append({"name": file_name, "summary": existing_summaries[file_name]}) + file_summaries[idx] = {"name": file_name, "summary": existing_summaries[file_name]} logger.debug(f"Reused existing summary for {file_name}") else: + pending_indices.append((idx, file_path)) + + if pending_indices: + logger.info( + f"Generating summaries for {len(pending_indices)} changed files concurrently " + f"(reused {len(file_paths) - len(pending_indices)} cached)" + ) + + async def _gen(idx: int, file_path: str) -> None: + file_name = file_path.split("/")[-1] try: summary_dict = await self._generate_single_file_summary( file_path, llm_sem=llm_sem, ctx=ctx ) - file_summaries.append(summary_dict) + file_summaries[idx] = summary_dict logger.debug(f"Generated summary for {file_name}") except Exception as e: logger.warning(f"Failed to generate summary for {file_path}: {e}") - file_summaries.append({"name": file_name, "summary": ""}) + file_summaries[idx] = {"name": file_name, "summary": ""} + + await asyncio.gather(*[_gen(i, fp) for i, fp in pending_indices]) + + file_summaries = [s for s in file_summaries if s is not None] overview = await self._generate_overview(dir_uri, file_summaries, []) abstract = self._extract_abstract_from_overview(overview) @@ -974,9 +992,12 @@ async def _batched_generate_overview( else "None" ) - # Generate partial overview per batch using global file indices - partial_overviews = [] + # Generate partial overview per batch concurrently using global file indices + llm_sem = asyncio.Semaphore(self.max_concurrent_llm) + partial_overviews = [None] * len(batches) global_offset = 0 + batch_prompts: List[Tuple[int, str, Dict[int, str]]] = [] + for batch_idx, batch in enumerate(batches): # Build per-batch index map using global offsets batch_lines = [] @@ -991,33 +1012,37 @@ async def _batched_generate_overview( # Include children abstracts in the first batch children_str = children_abstracts_str if batch_idx == 0 else "None" - try: - prompt = render_prompt( - "semantic.overview_generation", - { - "dir_name": dir_name, - "file_summaries": batch_str, - "children_abstracts": children_str, - }, - ) - partial = await vlm.get_completion_async(prompt) - - # Replace [number] references per batch using batch-local map - def make_replacer(idx_map): - def replacer(match): - idx = int(match.group(1)) - return idx_map.get(idx, match.group(0)) + prompt = render_prompt( + "semantic.overview_generation", + { + "dir_name": dir_name, + "file_summaries": batch_str, + "children_abstracts": children_str, + }, + ) + batch_prompts.append((batch_idx, prompt, batch_index_map)) - return replacer + def make_replacer(idx_map): + def replacer(match): + idx = int(match.group(1)) + return idx_map.get(idx, match.group(0)) + return replacer + async def _run_batch(batch_idx: int, prompt: str, batch_index_map: Dict[int, str]) -> None: + try: + async with llm_sem: + partial = await vlm.get_completion_async(prompt) partial = re.sub(r"\[(\d+)\]", make_replacer(batch_index_map), partial) - partial_overviews.append(partial.strip()) + partial_overviews[batch_idx] = partial.strip() except Exception as e: logger.warning( f"Failed to generate partial overview batch " f"{batch_idx + 1}/{len(batches)} for {dir_uri}: {e}" ) + await asyncio.gather(*[_run_batch(*bp) for bp in batch_prompts]) + partial_overviews = [p for p in partial_overviews if p is not None] + if not partial_overviews: return f"# {dir_name}\n\nDirectory overview" From de76064143acd72f39505942d9620378f5d8b298 Mon Sep 17 00:00:00 2001 From: Ahmed Hesham Abdelkader <23265119+ahmedhesham6@users.noreply.github.com> Date: Sun, 22 Mar 2026 11:23:29 +0200 Subject: [PATCH 2/2] fix(semantic): share LLM semaphore across overview pipeline and fix formatting Thread llm_sem through _generate_overview and _batched_generate_overview so callers can share a single semaphore across the full pipeline, preventing concurrent calls from exceeding the intended concurrency limit. --- openviking/storage/queuefs/semantic_processor.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 858e7aa7..1af1a334 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -423,7 +423,7 @@ async def _gen(idx: int, file_path: str) -> None: file_summaries = [s for s in file_summaries if s is not None] - overview = await self._generate_overview(dir_uri, file_summaries, []) + overview = await self._generate_overview(dir_uri, file_summaries, [], llm_sem=llm_sem) abstract = self._extract_abstract_from_overview(overview) overview, abstract = self._enforce_size_limits(overview, abstract) @@ -834,6 +834,7 @@ async def _generate_overview( dir_uri: str, file_summaries: List[Dict[str, str]], children_abstracts: List[Dict[str, str]], + llm_sem: Optional[asyncio.Semaphore] = None, ) -> str: """Generate directory's .overview.md (L1). @@ -887,7 +888,11 @@ async def _generate_overview( f"Splitting into batches of {semantic.overview_batch_size}." ) overview = await self._batched_generate_overview( - dir_uri, file_summaries, children_abstracts, file_index_map + dir_uri, + file_summaries, + children_abstracts, + file_index_map, + llm_sem=llm_sem, ) elif over_budget: # Few files but long summaries → truncate summaries to fit budget @@ -966,6 +971,7 @@ async def _batched_generate_overview( file_summaries: List[Dict[str, str]], children_abstracts: List[Dict[str, str]], file_index_map: Dict[int, str], + llm_sem: Optional[asyncio.Semaphore] = None, ) -> str: """Generate overview by batching file summaries and merging. @@ -993,7 +999,8 @@ async def _batched_generate_overview( ) # Generate partial overview per batch concurrently using global file indices - llm_sem = asyncio.Semaphore(self.max_concurrent_llm) + if llm_sem is None: + llm_sem = asyncio.Semaphore(self.max_concurrent_llm) partial_overviews = [None] * len(batches) global_offset = 0 batch_prompts: List[Tuple[int, str, Dict[int, str]]] = [] @@ -1026,6 +1033,7 @@ def make_replacer(idx_map): def replacer(match): idx = int(match.group(1)) return idx_map.get(idx, match.group(0)) + return replacer async def _run_batch(batch_idx: int, prompt: str, batch_index_map: Dict[int, str]) -> None: