diff --git a/src/ai_memory_protocol/cli.py b/src/ai_memory_protocol/cli.py index 3acbaf4..d09ccd8 100644 --- a/src/ai_memory_protocol/cli.py +++ b/src/ai_memory_protocol/cli.py @@ -14,6 +14,7 @@ memory review Show memories due for review memory tags List all tags in use with counts memory stale Show expired or review-overdue memories + memory prune [--yes] Remove deprecated memories from RST files memory rebuild Rebuild needs.json from RST sources """ @@ -389,6 +390,31 @@ def cmd_stale(args: argparse.Namespace) -> None: print(f" > {body_text[:200].replace(chr(10), ' ')}") +def cmd_prune(args: argparse.Namespace) -> None: + """Remove deprecated memories from RST files.""" + from ai_memory_protocol.rst import count_deprecated_in_rst, prune_deprecated_from_rst + + workspace = find_workspace(args.dir) + + deprecated_count = count_deprecated_in_rst(workspace) + if deprecated_count == 0: + print("No deprecated memories to prune.") + return + + if not args.yes: + print(f"Found {deprecated_count} deprecated memories to remove from RST files.") + print("History is preserved in git. Use --yes to confirm.") + return + + count, _removed = prune_deprecated_from_rst(workspace) + print(f"Pruned {count} deprecated memories from RST files.") + if count > 0: + success, msg = run_rebuild(workspace) + print(msg) + if not success: + sys.exit(1) + + def cmd_rebuild(args: argparse.Namespace) -> None: """Rebuild needs.json by running Sphinx build.""" workspace = find_workspace(args.dir) @@ -613,6 +639,11 @@ def build_parser() -> argparse.ArgumentParser: ) p_stale.set_defaults(func=cmd_stale) + # --- prune --- + p_prune = sub.add_parser("prune", help="Remove deprecated memories from RST files") + p_prune.add_argument("--yes", "-y", action="store_true", help="Confirm removal") + p_prune.set_defaults(func=cmd_prune) + # --- rebuild --- p_rebuild = sub.add_parser("rebuild", help="Rebuild needs.json from RST sources") p_rebuild.set_defaults(func=cmd_rebuild) diff --git a/src/ai_memory_protocol/rst.py b/src/ai_memory_protocol/rst.py index ffa09e8..16bb70c 100644 --- a/src/ai_memory_protocol/rst.py +++ b/src/ai_memory_protocol/rst.py @@ -222,6 +222,83 @@ def deprecate_in_rst( return success, msg +def count_deprecated_in_rst(workspace: Path) -> int: + """Count deprecated directive blocks across all RST files.""" + count = 0 + for mem_type in TYPE_FILES: + for rst_path in _find_all_rst_files(workspace, mem_type): + if rst_path.exists(): + content = rst_path.read_text() + count += content.count(":status: deprecated") + return count + + +def prune_deprecated_from_rst(workspace: Path) -> tuple[int, list[str]]: + """Remove all deprecated directive blocks from RST files. + + Returns (count_removed, list of removed IDs). + Git preserves history so no data is truly lost. + """ + removed_count = 0 + removed_ids: list[str] = [] + + for mem_type in TYPE_FILES: + for rst_path in _find_all_rst_files(workspace, mem_type): + if not rst_path.exists(): + continue + content = rst_path.read_text() + if ":status: deprecated" not in content: + continue + + lines = content.split("\n") + new_lines: list[str] = [] + i = 0 + while i < len(lines): + # Detect start of a directive block: ".. type:: Title" + directive_match = re.match(r"^\.\. (\w+):: ", lines[i]) + if not directive_match: + new_lines.append(lines[i]) + i += 1 + continue + + # Collect the full directive block (directive line + indented body) + block_lines = [lines[i]] + i += 1 + while i < len(lines) and (lines[i].startswith(" ") or lines[i].strip() == ""): + # Stop at empty line followed by non-indented content + if lines[i].strip() == "" and ( + i + 1 >= len(lines) + or (lines[i + 1].strip() != "" and not lines[i + 1].startswith(" ")) + ): + block_lines.append(lines[i]) + i += 1 + break + block_lines.append(lines[i]) + i += 1 + + # Check if this block is deprecated + block_text = "\n".join(block_lines) + if ":status: deprecated" in block_text: + removed_count += 1 + id_match = re.search(r":id:\s*(\S+)", block_text) + if id_match: + removed_ids.append(id_match.group(1)) + # Skip this block (don't add to new_lines) + # Also skip trailing blank lines + while i < len(lines) and lines[i].strip() == "": + i += 1 + else: + new_lines.extend(block_lines) + + # Write back cleaned content + cleaned = "\n".join(new_lines) + # Normalize trailing newlines + cleaned = cleaned.rstrip("\n") + "\n" + rst_path.write_text(cleaned) + + return removed_count, removed_ids + + def update_body_in_rst( workspace: Path, need_id: str, diff --git a/tests/test_rst.py b/tests/test_rst.py index 9c4a22d..b275a19 100644 --- a/tests/test_rst.py +++ b/tests/test_rst.py @@ -8,9 +8,11 @@ from ai_memory_protocol.rst import ( add_tags_in_rst, append_to_rst, + count_deprecated_in_rst, deprecate_in_rst, generate_id, generate_rst_directive, + prune_deprecated_from_rst, remove_tags_in_rst, update_field_in_rst, ) @@ -382,3 +384,78 @@ def test_empty_title_rejected(self, tmp_workspace: Path) -> None: ok, msg = update_title_in_rst(tmp_workspace, "FACT_x", "") assert not ok assert "empty" in msg.lower() + + +class TestPruneDeprecated: + def test_removes_only_deprecated(self, tmp_workspace: Path) -> None: + active = generate_rst_directive( + "fact", "Active fact", need_id="FACT_active", tags=["topic:test"], body="Keep me." + ) + deprecated = generate_rst_directive( + "fact", + "Old fact", + need_id="FACT_old", + tags=["topic:test"], + body="Remove me.", + ) + append_to_rst(tmp_workspace, "fact", active) + append_to_rst(tmp_workspace, "fact", deprecated) + deprecate_in_rst(tmp_workspace, "FACT_old") + + count, removed_ids = prune_deprecated_from_rst(tmp_workspace) + + assert count == 1 + assert "FACT_old" in removed_ids + content = (tmp_workspace / "memory" / "facts.rst").read_text() + assert "FACT_active" in content + assert "FACT_old" not in content + assert "Keep me." in content + assert "Remove me." not in content + + def test_preserves_file_header(self, tmp_workspace: Path) -> None: + deprecated = generate_rst_directive( + "fact", "Gone", need_id="FACT_gone", tags=["topic:test"], body="Bye." + ) + append_to_rst(tmp_workspace, "fact", deprecated) + deprecate_in_rst(tmp_workspace, "FACT_gone") + + prune_deprecated_from_rst(tmp_workspace) + + content = (tmp_workspace / "memory" / "facts.rst").read_text() + assert "Facts" in content # Header preserved + assert "FACT_gone" not in content + + def test_handles_split_files(self, tmp_workspace: Path) -> None: + # Fill main file to trigger split + for i in range(52): + d = generate_rst_directive( + "mem", f"Obs {i}", need_id=f"MEM_obs_{i}", tags=["topic:test"] + ) + append_to_rst(tmp_workspace, "mem", d) + + # Deprecate one in the split file + deprecate_in_rst(tmp_workspace, "MEM_obs_51") + + count, removed_ids = prune_deprecated_from_rst(tmp_workspace) + assert count == 1 + assert "MEM_obs_51" in removed_ids + + def test_count_deprecated(self, tmp_workspace: Path) -> None: + for i in range(3): + d = generate_rst_directive("fact", f"F{i}", need_id=f"FACT_f{i}", tags=["topic:test"]) + append_to_rst(tmp_workspace, "fact", d) + + deprecate_in_rst(tmp_workspace, "FACT_f0") + deprecate_in_rst(tmp_workspace, "FACT_f2") + + assert count_deprecated_in_rst(tmp_workspace) == 2 + + def test_prune_empty_workspace(self, tmp_workspace: Path) -> None: + count, removed_ids = prune_deprecated_from_rst(tmp_workspace) + assert count == 0 + assert removed_ids == [] + + def test_count_zero_when_none_deprecated(self, tmp_workspace: Path) -> None: + d = generate_rst_directive("fact", "Active", need_id="FACT_a", tags=["topic:test"]) + append_to_rst(tmp_workspace, "fact", d) + assert count_deprecated_in_rst(tmp_workspace) == 0