diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py
index 16ec2291..1af1a334 100644
--- a/openviking/storage/queuefs/semantic_processor.py
+++ b/openviking/storage/queuefs/semantic_processor.py
@@ -388,24 +388,42 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None:
                 f"deleted={len(deleted_files)}"
             )
 
-        for file_path in file_paths:
+        # Separate cached from changed files to allow concurrent VLM calls
+        pending_indices: List[Tuple[int, str]] = []
+        file_summaries: List[Optional[Dict[str, str]]] = [None] * len(file_paths)
+
+        for idx, file_path in enumerate(file_paths):
             file_name = file_path.split("/")[-1]
             if file_path not in changed_files and file_name in existing_summaries:
-                file_summaries.append({"name": file_name, "summary": existing_summaries[file_name]})
+                file_summaries[idx] = {"name": file_name, "summary": existing_summaries[file_name]}
                 logger.debug(f"Reused existing summary for {file_name}")
             else:
+                pending_indices.append((idx, file_path))
+
+        if pending_indices:
+            logger.info(
+                f"Generating summaries for {len(pending_indices)} changed files concurrently "
+                f"(reused {len(file_paths) - len(pending_indices)} cached)"
+            )
+
+            async def _gen(idx: int, file_path: str) -> None:
+                file_name = file_path.split("/")[-1]
                 try:
                     summary_dict = await self._generate_single_file_summary(
                         file_path, llm_sem=llm_sem, ctx=ctx
                     )
-                    file_summaries.append(summary_dict)
+                    file_summaries[idx] = summary_dict
                     logger.debug(f"Generated summary for {file_name}")
                 except Exception as e:
                     logger.warning(f"Failed to generate summary for {file_path}: {e}")
-                    file_summaries.append({"name": file_name, "summary": ""})
+                    file_summaries[idx] = {"name": file_name, "summary": ""}
+
+            await asyncio.gather(*[_gen(i, fp) for i, fp in pending_indices])
 
-        overview = await self._generate_overview(dir_uri, file_summaries, [])
+        file_summaries = [s for s in file_summaries if s is not None]
+
+        overview = await self._generate_overview(dir_uri, file_summaries, [], llm_sem=llm_sem)
 
         abstract = self._extract_abstract_from_overview(overview)
         overview, abstract = self._enforce_size_limits(overview, abstract)
@@ -816,6 +834,7 @@ async def _generate_overview(
         dir_uri: str,
         file_summaries: List[Dict[str, str]],
         children_abstracts: List[Dict[str, str]],
+        llm_sem: Optional[asyncio.Semaphore] = None,
     ) -> str:
         """Generate directory's .overview.md (L1).
 
@@ -869,7 +888,11 @@ async def _generate_overview(
                 f"Splitting into batches of {semantic.overview_batch_size}."
             )
             overview = await self._batched_generate_overview(
-                dir_uri, file_summaries, children_abstracts, file_index_map
+                dir_uri,
+                file_summaries,
+                children_abstracts,
+                file_index_map,
+                llm_sem=llm_sem,
             )
         elif over_budget:
             # Few files but long summaries → truncate summaries to fit budget
@@ -948,6 +971,7 @@ async def _batched_generate_overview(
         file_summaries: List[Dict[str, str]],
         children_abstracts: List[Dict[str, str]],
         file_index_map: Dict[int, str],
+        llm_sem: Optional[asyncio.Semaphore] = None,
    ) -> str:
         """Generate overview by batching file summaries and merging.
 
@@ -974,9 +998,13 @@ async def _batched_generate_overview(
             else "None"
         )
 
-        # Generate partial overview per batch using global file indices
-        partial_overviews = []
+        # Generate partial overview per batch concurrently using global file indices
+        if llm_sem is None:
+            llm_sem = asyncio.Semaphore(self.max_concurrent_llm)
+        partial_overviews = [None] * len(batches)
         global_offset = 0
+        batch_prompts: List[Tuple[int, str, Dict[int, str]]] = []
+
         for batch_idx, batch in enumerate(batches):
             # Build per-batch index map using global offsets
             batch_lines = []
@@ -991,33 +1019,38 @@ async def _batched_generate_overview(
             # Include children abstracts in the first batch
             children_str = children_abstracts_str if batch_idx == 0 else "None"
 
-            try:
-                prompt = render_prompt(
-                    "semantic.overview_generation",
-                    {
-                        "dir_name": dir_name,
-                        "file_summaries": batch_str,
-                        "children_abstracts": children_str,
-                    },
-                )
-                partial = await vlm.get_completion_async(prompt)
+            prompt = render_prompt(
+                "semantic.overview_generation",
+                {
+                    "dir_name": dir_name,
+                    "file_summaries": batch_str,
+                    "children_abstracts": children_str,
+                },
+            )
+            batch_prompts.append((batch_idx, prompt, batch_index_map))
 
-                # Replace [number] references per batch using batch-local map
-                def make_replacer(idx_map):
-                    def replacer(match):
-                        idx = int(match.group(1))
-                        return idx_map.get(idx, match.group(0))
-
-                    return replacer
+        def make_replacer(idx_map):
+            def replacer(match):
+                idx = int(match.group(1))
+                return idx_map.get(idx, match.group(0))
 
+            return replacer
 
+        async def _run_batch(batch_idx: int, prompt: str, batch_index_map: Dict[int, str]) -> None:
+            try:
+                async with llm_sem:
+                    partial = await vlm.get_completion_async(prompt)
                 partial = re.sub(r"\[(\d+)\]", make_replacer(batch_index_map), partial)
-                partial_overviews.append(partial.strip())
+                partial_overviews[batch_idx] = partial.strip()
             except Exception as e:
                 logger.warning(
                     f"Failed to generate partial overview batch "
                     f"{batch_idx + 1}/{len(batches)} for {dir_uri}: {e}"
                 )
 
+        await asyncio.gather(*[_run_batch(*bp) for bp in batch_prompts])
+        partial_overviews = [p for p in partial_overviews if p is not None]
+
         if not partial_overviews:
             return f"# {dir_name}\n\nDirectory overview"