From 65ee5a7b37b186999f07d996c1ab5d60fd440d02 Mon Sep 17 00:00:00 2001 From: sliverp <870080352@qq.com> Date: Fri, 20 Mar 2026 14:03:54 +0800 Subject: [PATCH] =?UTF-8?q?fix(session):=20avoid=20O(n=C2=B2)=20semantic?= =?UTF-8?q?=20re-processing=20on=20memory=20commit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When SessionCompressor successfully extracts memories it already enqueues incremental SemanticMsg(s) with per-file change sets via _flush_semantic_operations(). The session.commit_async() fallback was unconditionally enqueueing a *second* SemanticMsg without any change info, causing the semantic processor to re-summarise and re-vectorise every file in the memory directory on every commit. Cost impact: cumulative token usage grew as O(n²) with memory count — 500 memories produced ~250K embedding tokens/day with 2000+ rate-limit retries. Changes: 1. session.py: Only enqueue fallback SemanticMsg when compressor is absent or extracted 0 memories. 2. semantic_processor.py: Always try to load existing overview.md summaries regardless of whether msg.changes is set, so even the fallback path can skip unchanged files. 3. Add 4 unit tests: three cover the session.commit_async() paths (compressor extracted memories, no compressor configured, compressor extracted 0 memories) and one checks the SemanticMsg defaults; the semantic_processor.py change is not exercised by these tests.
Closes #505 Ref #744 --- openviking/session/session.py | 45 ++-- .../storage/queuefs/semantic_processor.py | 24 +- .../unit/session/test_incremental_semantic.py | 249 ++++++++++++++++++ 3 files changed, 292 insertions(+), 26 deletions(-) create mode 100644 tests/unit/session/test_incremental_semantic.py diff --git a/openviking/session/session.py b/openviking/session/session.py index bdb6500b..a1d088af 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -306,22 +306,35 @@ async def commit_async(self) -> Dict[str, Any]: await self._write_to_agfs_async(self._messages) await self._write_relations_async() - # Enqueue semantic processing directly - from openviking.storage.queuefs import get_queue_manager - from openviking.storage.queuefs.semantic_msg import SemanticMsg - - queue_manager = get_queue_manager() - if queue_manager: - msg = SemanticMsg( - uri=self._session_uri, - context_type="memory", - account_id=self.ctx.account_id, - user_id=self.ctx.user.user_id, - agent_id=self.ctx.user.agent_id, - role=self.ctx.role.value, - ) - semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC) - await semantic_queue.enqueue(msg) + # Enqueue semantic processing only when compressor did NOT flush + # its own incremental SemanticMsg(s) — i.e. when no compressor is + # configured or when extraction produced zero memories. + # When the compressor runs successfully it already enqueues + # SemanticMsg(s) with per-file change sets via + # _flush_semantic_operations(), so a second (full-directory) + # SemanticMsg here would trigger an O(n²) re-summarisation of every + # file in the memory directory. 
See: + # https://github.com/volcengine/OpenViking/issues/505 + compressor_flushed = ( + self._session_compressor is not None + and result.get("memories_extracted", 0) > 0 + ) + if not compressor_flushed: + from openviking.storage.queuefs import get_queue_manager + from openviking.storage.queuefs.semantic_msg import SemanticMsg + + queue_manager = get_queue_manager() + if queue_manager: + msg = SemanticMsg( + uri=self._session_uri, + context_type="memory", + account_id=self.ctx.account_id, + user_id=self.ctx.user.user_id, + agent_id=self.ctx.user.agent_id, + role=self.ctx.role.value, + ) + semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC) + await semantic_queue.enqueue(msg) redo_log.mark_done(task_id) diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 16ec2291..c6ce75c9 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -366,16 +366,20 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None: file_summaries: List[Dict[str, str]] = [] existing_summaries: Dict[str, str] = {} - if msg.changes: - try: - old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx) - if old_overview: - existing_summaries = self._parse_overview_md(old_overview) - logger.info( - f"Parsed {len(existing_summaries)} existing summaries from overview.md" - ) - except Exception as e: - logger.debug(f"No existing overview.md found for {dir_uri}: {e}") + # Always try to load existing summaries from overview.md so we can + # skip re-summarising files that haven't changed. Previously this + # block only ran when msg.changes was set, causing a full O(n²) + # re-summarisation on every commit via the session fallback path. 
+ # See: https://github.com/volcengine/OpenViking/issues/505 + try: + old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx) + if old_overview: + existing_summaries = self._parse_overview_md(old_overview) + logger.info( + f"Parsed {len(existing_summaries)} existing summaries from overview.md" + ) + except Exception as e: + logger.debug(f"No existing overview.md found for {dir_uri}: {e}") changed_files: Set[str] = set() if msg.changes: diff --git a/tests/unit/session/test_incremental_semantic.py b/tests/unit/session/test_incremental_semantic.py new file mode 100644 index 00000000..dae6fbd5 --- /dev/null +++ b/tests/unit/session/test_incremental_semantic.py @@ -0,0 +1,249 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for incremental semantic processing on session commit. + +Verifies the fix for https://github.com/volcengine/OpenViking/issues/505: +When a SessionCompressor successfully extracts memories and flushes its own +incremental SemanticMsg(s) with per-file change sets, the session.commit() +fallback must NOT enqueue an additional full-directory SemanticMsg that +triggers an O(n²) re-summarisation of every file. 
+""" + +import asyncio +from typing import Any, Dict, List, Optional +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from openviking.storage.queuefs.semantic_msg import SemanticMsg + + +class FakeSemanticQueue: + """In-memory queue that records enqueued messages.""" + + def __init__(self): + self.messages: List[SemanticMsg] = [] + + async def enqueue(self, msg: SemanticMsg) -> str: + self.messages.append(msg) + return msg.id + + +class FakeQueueManager: + """Minimal queue manager stub.""" + + SEMANTIC = "semantic" + + def __init__(self): + self._queues: Dict[str, FakeSemanticQueue] = {} + + def get_queue(self, name: str, allow_create: bool = False) -> FakeSemanticQueue: + if name not in self._queues: + self._queues[name] = FakeSemanticQueue() + return self._queues[name] + + +# --------------------------------------------------------------------------- +# Test: session.commit_async should NOT enqueue a second SemanticMsg when +# the compressor already flushed incremental messages. 
+# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_commit_skips_fallback_semantic_when_compressor_flushed(): + """When compressor extracts memories, session.commit should not re-enqueue.""" + from openviking.session.session import Session + + # Build a minimal Session with mocked internals + session = Session.__new__(Session) + session._session_uri = "viking://memories/user/default/sessions/test" + session._messages = [MagicMock()] # non-empty so commit proceeds + session._compression = MagicMock() + session._compression.compression_index = 0 + session._stats = MagicMock() + session._stats.memories_extracted = 0 + session._stats.total_turns = 0 + session._stats.contexts_used = 0 + session._stats.skills_used = 0 + + # Mock ctx + session.ctx = MagicMock() + session.ctx.account_id = "default" + session.ctx.user.user_id = "default" + session.ctx.user.agent_id = "default" + session.ctx.role.value = "root" + session.user = MagicMock() + session.session_id = "test" + + # Mock compressor that "successfully" extracts 3 memories + mock_compressor = AsyncMock() + mock_compressor.extract_long_term_memories = AsyncMock( + return_value=[MagicMock(), MagicMock(), MagicMock()] + ) + session._session_compressor = mock_compressor + + # Mock internal methods + session._generate_archive_summary_async = AsyncMock(return_value="summary") + session._extract_abstract_from_summary = MagicMock(return_value="abstract") + session._write_archive_async = AsyncMock() + session._write_to_agfs_async = AsyncMock() + session._write_relations_async = AsyncMock() + session._update_active_counts_async = AsyncMock(return_value=0) + session._vikingdb_manager = None + + # Set up queue manager + fake_qm = FakeQueueManager() + + # Mock redo log + mock_redo = MagicMock() + mock_redo.write_pending = MagicMock() + mock_redo.mark_done = MagicMock() + + with ( + patch("openviking.session.session.get_lock_manager") as mock_lock_mgr, + 
patch("openviking.session.session.get_current_telemetry") as mock_telem, + patch("openviking.storage.queuefs.get_queue_manager", return_value=fake_qm), + ): + mock_lock_mgr.return_value.redo_log = mock_redo + mock_telem.return_value.set = MagicMock() + + result = await session.commit_async() + + # The compressor extracted 3 memories, so the session should NOT have + # enqueued any additional SemanticMsg. + semantic_queue = fake_qm.get_queue("semantic") + assert len(semantic_queue.messages) == 0, ( + f"Expected 0 SemanticMsg from session fallback, got {len(semantic_queue.messages)}. " + "The compressor already flushed incremental messages." + ) + assert result["memories_extracted"] == 3 + + +@pytest.mark.asyncio +async def test_commit_enqueues_fallback_semantic_when_no_compressor(): + """When no compressor is configured, session.commit should enqueue fallback.""" + from openviking.session.session import Session + + session = Session.__new__(Session) + session._session_uri = "viking://memories/user/default/sessions/test" + session._messages = [MagicMock()] + session._compression = MagicMock() + session._compression.compression_index = 0 + session._stats = MagicMock() + session._stats.memories_extracted = 0 + session._stats.total_turns = 0 + session._stats.contexts_used = 0 + session._stats.skills_used = 0 + + session.ctx = MagicMock() + session.ctx.account_id = "default" + session.ctx.user.user_id = "default" + session.ctx.user.agent_id = "default" + session.ctx.role.value = "root" + session.user = MagicMock() + session.session_id = "test" + + # No compressor configured + session._session_compressor = None + + session._generate_archive_summary_async = AsyncMock(return_value="summary") + session._extract_abstract_from_summary = MagicMock(return_value="abstract") + session._write_archive_async = AsyncMock() + session._write_to_agfs_async = AsyncMock() + session._write_relations_async = AsyncMock() + session._update_active_counts_async = AsyncMock(return_value=0) + 
session._vikingdb_manager = None + + fake_qm = FakeQueueManager() + mock_redo = MagicMock() + mock_redo.write_pending = MagicMock() + mock_redo.mark_done = MagicMock() + + with ( + patch("openviking.session.session.get_lock_manager") as mock_lock_mgr, + patch("openviking.session.session.get_current_telemetry") as mock_telem, + patch("openviking.storage.queuefs.get_queue_manager", return_value=fake_qm), + ): + mock_lock_mgr.return_value.redo_log = mock_redo + mock_telem.return_value.set = MagicMock() + + result = await session.commit_async() + + # No compressor → session should enqueue a fallback SemanticMsg + semantic_queue = fake_qm.get_queue("semantic") + assert len(semantic_queue.messages) == 1, ( + f"Expected 1 fallback SemanticMsg, got {len(semantic_queue.messages)}" + ) + msg = semantic_queue.messages[0] + assert msg.context_type == "memory" + assert msg.uri == session._session_uri + + +@pytest.mark.asyncio +async def test_commit_enqueues_fallback_when_compressor_extracts_zero(): + """When compressor extracts 0 memories, session should enqueue fallback.""" + from openviking.session.session import Session + + session = Session.__new__(Session) + session._session_uri = "viking://memories/user/default/sessions/test" + session._messages = [MagicMock()] + session._compression = MagicMock() + session._compression.compression_index = 0 + session._stats = MagicMock() + session._stats.memories_extracted = 0 + session._stats.total_turns = 0 + session._stats.contexts_used = 0 + session._stats.skills_used = 0 + + session.ctx = MagicMock() + session.ctx.account_id = "default" + session.ctx.user.user_id = "default" + session.ctx.user.agent_id = "default" + session.ctx.role.value = "root" + session.user = MagicMock() + session.session_id = "test" + + # Compressor returns 0 memories + mock_compressor = AsyncMock() + mock_compressor.extract_long_term_memories = AsyncMock(return_value=[]) + session._session_compressor = mock_compressor + + 
session._generate_archive_summary_async = AsyncMock(return_value="summary") + session._extract_abstract_from_summary = MagicMock(return_value="abstract") + session._write_archive_async = AsyncMock() + session._write_to_agfs_async = AsyncMock() + session._write_relations_async = AsyncMock() + session._update_active_counts_async = AsyncMock(return_value=0) + session._vikingdb_manager = None + + fake_qm = FakeQueueManager() + mock_redo = MagicMock() + mock_redo.write_pending = MagicMock() + mock_redo.mark_done = MagicMock() + + with ( + patch("openviking.session.session.get_lock_manager") as mock_lock_mgr, + patch("openviking.session.session.get_current_telemetry") as mock_telem, + patch("openviking.storage.queuefs.get_queue_manager", return_value=fake_qm), + ): + mock_lock_mgr.return_value.redo_log = mock_redo + mock_telem.return_value.set = MagicMock() + + result = await session.commit_async() + + # Compressor returned empty → session should enqueue fallback + semantic_queue = fake_qm.get_queue("semantic") + assert len(semantic_queue.messages) == 1 + + +# --------------------------------------------------------------------------- +# Test: a SemanticMsg built without an explicit changes dict defaults to +# changes=None and recursive=True (summary reuse itself is not tested here) +# --------------------------------------------------------------------------- + + +def test_semantic_msg_changes_none_by_default(): + """SemanticMsg should default changes to None.""" + msg = SemanticMsg(uri="viking://test", context_type="memory") + assert msg.changes is None + assert msg.recursive is True