Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,10 @@ pyrightconfig.json
# Local roadmap files
ROADMAP.md
agentmemory-roadmap.md

# Hookify rules (personal)
.claude/*.local.md

# Benchmark data and results
benchmarks/data/*.json
benchmarks/results/*/
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ docs: ## Build the documentation
docs-serve: ## Build and serve the documentation
uv run mkdocs serve

.PHONY: benchmark
benchmark: ## Run LongMemEval benchmark (all 3 stages)
uv run python -m benchmarks.longmemeval.run

.PHONY: benchmark-smoke
benchmark-smoke: ## Quick 3-question benchmark sanity check
uv run python -m benchmarks.longmemeval.run --num-questions 3 --run-name smoke --config fast

.PHONY: benchmark-baseline
benchmark-baseline: ## Full baseline benchmark run (concurrent)
uv run python -m benchmarks.longmemeval.run --run-name baseline --max-concurrent 20

.PHONY: all
all: format lint typecheck test ## Run formatting, linting, type checks, and tests

Expand Down
Empty file added benchmarks/__init__.py
Empty file.
Empty file added benchmarks/data/.gitkeep
Empty file.
Empty file.
47 changes: 47 additions & 0 deletions benchmarks/longmemeval/_checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""JSONL checkpoint helpers for crash-safe benchmark runs."""

from __future__ import annotations

import json
from pathlib import Path

RESULTS_DIR = Path(__file__).parent.parent / "results"


def load_completed(jsonl_path: Path) -> set[str]:
"""Load completed question IDs from checkpoint JSONL file."""
completed: set[str] = set()
if not jsonl_path.exists():
return completed
for line in jsonl_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
completed.add(obj["question_id"])
except (json.JSONDecodeError, KeyError):
continue
return completed


def append_jsonl(jsonl_path: Path, result: dict) -> None:
"""Append a single result as a JSONL line (atomic append)."""
with jsonl_path.open("a", encoding="utf-8") as f:
f.write(json.dumps(result, ensure_ascii=False) + "\n")


def load_all_results(jsonl_path: Path) -> list[dict]:
"""Load all results from checkpoint JSONL file."""
results: list[dict] = []
if not jsonl_path.exists():
return results
for line in jsonl_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
try:
results.append(json.loads(line))
except json.JSONDecodeError:
continue
return results
232 changes: 232 additions & 0 deletions benchmarks/longmemeval/add.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
"""Stage 1: Ingest LongMemEval conversation histories into memv."""

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from pathlib import Path

from memv.memory.memory import Memory
from memv.models import Message, MessageRole

from ._checkpoint import RESULTS_DIR, append_jsonl, load_all_results, load_completed
from .config import get_config
from .dataset import LongMemEvalQuestion, load_dataset

logger = logging.getLogger(__name__)


def parse_longmemeval_date(date_str: str) -> datetime:
"""Parse LongMemEval date format: '2023/05/20 (Sat) 02:21' → datetime (UTC)."""
try:
dt = datetime.strptime(date_str, "%Y/%m/%d (%a) %H:%M")
return dt.replace(tzinfo=timezone.utc)
except ValueError:
logger.warning("Failed to parse date '%s', using epoch", date_str)
return datetime(2023, 1, 1, tzinfo=timezone.utc)


async def process_question(
question_idx: int,
question_data: LongMemEvalQuestion,
db_dir: Path,
config_name: str,
embedding_client,
llm_client,
) -> dict:
"""Process a single LongMemEval question: ingest all sessions, extract knowledge."""
question_id = question_data.question_id
user_id = f"question_{question_id}"
db_path = str(db_dir / f"{question_id}.db")

config = get_config(config_name)

memory = Memory(
db_path=db_path,
config=config,
embedding_client=embedding_client,
llm_client=llm_client,
enable_embedding_cache=True,
)

start_time = time.monotonic()
total_messages = 0

async with memory:
# Ingest each session
for session, date_str in zip(question_data.haystack_sessions, question_data.haystack_dates, strict=True):
timestamp = parse_longmemeval_date(date_str)
for turn in session:
role = MessageRole.USER if turn["role"] == "user" else MessageRole.ASSISTANT
msg = Message(
user_id=user_id,
role=role,
content=turn["content"],
sent_at=timestamp,
)
await memory.add_message(msg)
total_messages += 1

# Extract knowledge
knowledge_count = await memory.process(user_id)

elapsed = time.monotonic() - start_time

return {
"question_id": question_id,
"question_type": question_data.question_type,
"messages_count": total_messages,
"knowledge_count": knowledge_count,
"sessions_count": len(question_data.haystack_sessions),
"construction_time_s": round(elapsed, 2),
}


async def run(
run_name: str = "baseline",
config_name: str = "default",
data_path: str | None = None,
num_questions: int | None = None,
max_concurrent: int = 5,
timeout: int = 1200,
resume: bool = True,
embedding_client=None,
llm_client=None,
):
"""Run ingestion stage for all questions.

Args:
run_name: Name for this benchmark run.
config_name: Config preset name from config.py.
data_path: Path to dataset JSON (None = default location).
num_questions: Limit number of questions (None = all).
max_concurrent: Max concurrent question processing tasks.
timeout: Per-question timeout in seconds.
resume: Resume from checkpoint if prior results exist.
embedding_client: EmbeddingClient instance.
llm_client: LLMClient instance.
"""
if embedding_client is None or llm_client is None:
raise RuntimeError("embedding_client and llm_client are required. Pass them directly or set up default clients.")

dataset = load_dataset(data_path)
if num_questions is not None:
dataset = dataset[:num_questions]

run_dir = RESULTS_DIR / run_name
db_dir = run_dir / "dbs"
db_dir.mkdir(parents=True, exist_ok=True)

jsonl_path = run_dir / "add.jsonl"

# Load checkpoint
completed_ids = load_completed(jsonl_path) if resume else set()
if not resume and jsonl_path.exists():
jsonl_path.unlink()

remaining = [q for q in dataset if q.question_id not in completed_ids]

print(
f"LongMemEval Add | run={run_name} config={config_name} "
f"questions={len(dataset)} remaining={len(remaining)} concurrent={max_concurrent}"
)
if completed_ids:
print(f" Resuming: {len(completed_ids)} already completed")

semaphore = asyncio.Semaphore(max_concurrent)
completed_count = len(completed_ids)
total_count = len(dataset)

async def process_with_guard(idx: int, question: LongMemEvalQuestion) -> dict | None:
nonlocal completed_count
async with semaphore:
try:
result = await asyncio.wait_for(
process_question(idx, question, db_dir, config_name, embedding_client, llm_client),
timeout=timeout,
)
except asyncio.TimeoutError:
result = {
"question_id": question.question_id,
"question_type": question.question_type,
"error": "timeout",
"construction_time_s": timeout,
}
except Exception as e:
logger.exception("Failed to process question %s", question.question_id)
result = {
"question_id": question.question_id,
"question_type": question.question_type,
"error": str(e),
"construction_time_s": 0,
}

append_jsonl(jsonl_path, result)
completed_count += 1
error = result.get("error")
if error:
print(f" [{completed_count}/{total_count}] {question.question_id} ERROR: {error}")
else:
print(
f" [{completed_count}/{total_count}] {question.question_id} "
f"→ {result['knowledge_count']} facts in {result['construction_time_s']}s"
)
return result

tasks = [process_with_guard(idx, q) for idx, q in enumerate(remaining)]
await asyncio.gather(*tasks)

# Write compatibility JSON from all JSONL results
all_results = load_all_results(jsonl_path)
output_path = run_dir / "add.json"
output_path.write_text(json.dumps(all_results, indent=2), encoding="utf-8")
print(f"\nResults saved to {output_path}")

total_knowledge = sum(r.get("knowledge_count", 0) for r in all_results)
total_time = sum(r.get("construction_time_s", 0) for r in all_results)
print(f"Total: {total_knowledge} facts extracted in {total_time:.1f}s")

return all_results


def _make_clients():
"""Create default OpenAI-based clients for CLI usage."""
from memv.embeddings.openai import OpenAIEmbedAdapter
from memv.llm.pydantic_ai import PydanticAIAdapter

return OpenAIEmbedAdapter(), PydanticAIAdapter()


def main():
parser = argparse.ArgumentParser(description="LongMemEval Stage 1: Ingestion")
parser.add_argument("--run-name", default="baseline", help="Name for this run")
parser.add_argument("--config", default="default", help="Config preset name")
parser.add_argument("--data-path", default=None, help="Path to dataset JSON")
parser.add_argument("--num-questions", type=int, default=None, help="Limit number of questions")
parser.add_argument("--max-concurrent", type=int, default=5, help="Max concurrent question processing")
parser.add_argument("--timeout", type=int, default=1200, help="Per-question timeout in seconds")
parser.add_argument("--no-resume", action="store_true", help="Start fresh, ignore prior checkpoint")
args = parser.parse_args()

embedding_client, llm_client = _make_clients()
asyncio.run(
run(
run_name=args.run_name,
config_name=args.config,
data_path=args.data_path,
num_questions=args.num_questions,
max_concurrent=args.max_concurrent,
timeout=args.timeout,
resume=not args.no_resume,
embedding_client=embedding_client,
llm_client=llm_client,
)
)


if __name__ == "__main__":
main()
33 changes: 33 additions & 0 deletions benchmarks/longmemeval/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Named MemoryConfig presets for LongMemEval benchmark ablations."""

from __future__ import annotations

from memv.config import MemoryConfig

CONFIGS: dict[str, MemoryConfig] = {
"default": MemoryConfig(),
# Fast: skips predict-calibrate, dedup, and merging. For iteration speed only —
# results are NOT comparable to 'default' config.
"fast": MemoryConfig(
max_statements_for_prediction=0,
enable_knowledge_dedup=False,
enable_episode_merging=False,
),
"no_predict_calibrate": MemoryConfig(max_statements_for_prediction=0),
"no_segmentation": MemoryConfig(use_legacy_segmentation=True, segmentation_threshold=9999),
"no_dedup": MemoryConfig(enable_knowledge_dedup=False, enable_episode_merging=False),
}


def get_config(name: str) -> MemoryConfig:
"""Get a named config preset.

Args:
name: One of: default, fast, no_predict_calibrate, no_segmentation, no_dedup.

Returns:
MemoryConfig for the named preset.
"""
if name not in CONFIGS:
raise ValueError(f"Unknown config '{name}'. Available: {', '.join(CONFIGS)}")
return CONFIGS[name]
49 changes: 49 additions & 0 deletions benchmarks/longmemeval/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""LongMemEval dataset loader and Pydantic models."""

from __future__ import annotations

import json
from pathlib import Path

from pydantic import BaseModel, field_validator


class LongMemEvalQuestion(BaseModel):
question_id: str
question_type: str
question: str
answer: str

@field_validator("answer", mode="before")
@classmethod
def _coerce_answer(cls, v: object) -> str:
return str(v)

question_date: str # "2023/05/20 (Sat) 02:21"
haystack_session_ids: list[str]
haystack_dates: list[str]
haystack_sessions: list[list[dict]] # list of sessions, each is list of {role, content}
answer_session_ids: list[str]


DEFAULT_DATA_PATH = Path(__file__).parent.parent / "data" / "longmemeval_s_cleaned.json"


def load_dataset(path: Path | str | None = None) -> list[LongMemEvalQuestion]:
"""Load LongMemEval dataset from JSON file.

Args:
path: Path to longmemeval_s_cleaned.json. Defaults to benchmarks/data/.

Returns:
List of parsed questions.
"""
path = Path(path) if path else DEFAULT_DATA_PATH
if not path.exists():
raise FileNotFoundError(
f"Dataset not found at {path}. Download it with:\n"
f" wget -P benchmarks/data/ "
f"https://huggingface.co/datasets/xiaowu0162/longmemeval-cleaned/resolve/main/longmemeval_s_cleaned.json"
)
raw = json.loads(path.read_text(encoding="utf-8"))
return [LongMemEvalQuestion.model_validate(item) for item in raw]
Loading