diff --git a/openviking/storage/queuefs/semantic_queue.py b/openviking/storage/queuefs/semantic_queue.py index 4f412d9c..cafc8e8f 100644 --- a/openviking/storage/queuefs/semantic_queue.py +++ b/openviking/storage/queuefs/semantic_queue.py @@ -1,7 +1,9 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: Apache-2.0 -"""SemanticQueue: Semantic extraction queue.""" +"""Semantic extraction queue.""" +import threading +import time from typing import Optional from openviking_cli.utils.logger import get_logger @@ -11,12 +13,44 @@ logger = get_logger(__name__) +# Coalesce rapid re-enqueues for the same memory parent directory (github #769). +_MEMORY_PARENT_SEMANTIC_DEDUPE_SEC = 45.0 + class SemanticQueue(NamedQueue): """Semantic extraction queue for async generation of .abstract.md and .overview.md.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._memory_parent_semantic_last: dict[str, float] = {} + self._memory_parent_semantic_lock = threading.Lock() + + @staticmethod + def _memory_parent_semantic_key(msg: SemanticMsg) -> str: + return f"{msg.account_id}|{msg.user_id}|{msg.agent_id}|{msg.uri}" + async def enqueue(self, msg: SemanticMsg) -> str: """Serialize SemanticMsg object and store in queue.""" + if msg.context_type == "memory": + key = self._memory_parent_semantic_key(msg) + now = time.monotonic() + with self._memory_parent_semantic_lock: + last = self._memory_parent_semantic_last.get(key, 0.0) + if now - last < _MEMORY_PARENT_SEMANTIC_DEDUPE_SEC: + logger.debug( + "[SemanticQueue] Skipping duplicate memory semantic enqueue for %s " + "(within %.0fs dedupe window; see #769)", + msg.uri, + _MEMORY_PARENT_SEMANTIC_DEDUPE_SEC, + ) + return "deduplicated" + self._memory_parent_semantic_last[key] = now + if len(self._memory_parent_semantic_last) > 2000: + cutoff = now - (_MEMORY_PARENT_SEMANTIC_DEDUPE_SEC * 4) + stale = [k for k, t in self._memory_parent_semantic_last.items() if t < cutoff] + for k in stale[:800]: + self._memory_parent_semantic_last.pop(k, None) + return await super().enqueue(msg.to_dict()) async def dequeue(self) -> Optional[SemanticMsg]: @@ -39,7 +73,7 @@ async def dequeue(self) -> Optional[SemanticMsg]: return None async def peek(self) -> Optional[SemanticMsg]: - """Peek at queue head message.""" + """Peek at message from queue.""" data_dict = await super().peek() if not data_dict: return None diff --git a/tests/storage/test_semantic_queue_memory_dedupe.py b/tests/storage/test_semantic_queue_memory_dedupe.py new file mode 100644 index 00000000..5e66e914 --- /dev/null +++ b/tests/storage/test_semantic_queue_memory_dedupe.py @@ -0,0 +1,72 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for memory-context semantic enqueue deduplication (#769).""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from openviking.storage.queuefs.named_queue import NamedQueue +from openviking.storage.queuefs.semantic_msg import SemanticMsg +from openviking.storage.queuefs.semantic_queue import SemanticQueue + + +@pytest.mark.asyncio +async def test_memory_semantic_enqueue_deduped_within_window(): + mock_agfs = MagicMock() + with patch.object(NamedQueue, "enqueue", new_callable=AsyncMock) as named_enqueue: + named_enqueue.return_value = "queued-id" + q = SemanticQueue(mock_agfs, "/queue", "semantic") + msg = SemanticMsg( + uri="viking://user/default/memories/entities", + context_type="memory", + account_id="acc", + user_id="u1", + agent_id="a1", + ) + r1 = await q.enqueue(msg) + r2 = await q.enqueue( + SemanticMsg( + uri="viking://user/default/memories/entities", + context_type="memory", + account_id="acc", + user_id="u1", + agent_id="a1", + ) + ) + assert r1 == "queued-id" + assert r2 == "deduplicated" + assert named_enqueue.call_count == 1 + + +@pytest.mark.asyncio +async def test_memory_semantic_enqueue_different_uri_not_deduped(): + mock_agfs = MagicMock() + with patch.object(NamedQueue, "enqueue", new_callable=AsyncMock) as named_enqueue: + named_enqueue.return_value = "queued-id" + q = SemanticQueue(mock_agfs, "/queue", "semantic") + await q.enqueue( + SemanticMsg( + uri="viking://user/default/memories/entities", + context_type="memory", + ) + ) + await q.enqueue( + SemanticMsg( + uri="viking://user/default/memories/patterns", + context_type="memory", + ) + ) + assert named_enqueue.call_count == 2 + + +@pytest.mark.asyncio +async def test_non_memory_context_not_deduped(): + mock_agfs = MagicMock() + with patch.object(NamedQueue, "enqueue", new_callable=AsyncMock) as named_enqueue: + named_enqueue.return_value = "queued-id" + q = SemanticQueue(mock_agfs, "/queue", "semantic") + uri = "viking://resources/docs" + await q.enqueue(SemanticMsg(uri=uri, context_type="resource")) + await q.enqueue(SemanticMsg(uri=uri, context_type="resource")) + assert named_enqueue.call_count == 2