Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/ai_memory_protocol/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
memory review Show memories due for review
memory tags List all tags in use with counts
memory stale Show expired or review-overdue memories
memory prune [--yes] Remove deprecated memories from RST files
memory rebuild Rebuild needs.json from RST sources
"""

Expand Down Expand Up @@ -389,6 +390,31 @@ def cmd_stale(args: argparse.Namespace) -> None:
print(f" > {body_text[:200].replace(chr(10), ' ')}")


def cmd_prune(args: argparse.Namespace) -> None:
"""Remove deprecated memories from RST files."""
from ai_memory_protocol.rst import count_deprecated_in_rst, prune_deprecated_from_rst

workspace = find_workspace(args.dir)

deprecated_count = count_deprecated_in_rst(workspace)
if deprecated_count == 0:
print("No deprecated memories to prune.")
return

if not args.yes:
print(f"Found {deprecated_count} deprecated memories to remove from RST files.")
print("History is preserved in git. Use --yes to confirm.")
return

count, _removed = prune_deprecated_from_rst(workspace)
print(f"Pruned {count} deprecated memories from RST files.")
if count > 0:
success, msg = run_rebuild(workspace)
print(msg)
if not success:
sys.exit(1)


def cmd_rebuild(args: argparse.Namespace) -> None:
"""Rebuild needs.json by running Sphinx build."""
workspace = find_workspace(args.dir)
Expand Down Expand Up @@ -613,6 +639,11 @@ def build_parser() -> argparse.ArgumentParser:
)
p_stale.set_defaults(func=cmd_stale)

# --- prune ---
p_prune = sub.add_parser("prune", help="Remove deprecated memories from RST files")
p_prune.add_argument("--yes", "-y", action="store_true", help="Confirm removal")
p_prune.set_defaults(func=cmd_prune)

# --- rebuild ---
p_rebuild = sub.add_parser("rebuild", help="Rebuild needs.json from RST sources")
p_rebuild.set_defaults(func=cmd_rebuild)
Expand Down
77 changes: 77 additions & 0 deletions src/ai_memory_protocol/rst.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,83 @@ def deprecate_in_rst(
return success, msg


def count_deprecated_in_rst(workspace: Path) -> int:
"""Count deprecated directive blocks across all RST files."""
count = 0
for mem_type in TYPE_FILES:
for rst_path in _find_all_rst_files(workspace, mem_type):
if rst_path.exists():
content = rst_path.read_text()
count += content.count(":status: deprecated")
return count


def prune_deprecated_from_rst(workspace: Path) -> tuple[int, list[str]]:
"""Remove all deprecated directive blocks from RST files.

Returns (count_removed, list of removed IDs).
Git preserves history so no data is truly lost.
"""
removed_count = 0
removed_ids: list[str] = []

for mem_type in TYPE_FILES:
for rst_path in _find_all_rst_files(workspace, mem_type):
if not rst_path.exists():
continue
content = rst_path.read_text()
if ":status: deprecated" not in content:
continue

lines = content.split("\n")
new_lines: list[str] = []
i = 0
while i < len(lines):
# Detect start of a directive block: ".. type:: Title"
directive_match = re.match(r"^\.\. (\w+):: ", lines[i])
if not directive_match:
new_lines.append(lines[i])
i += 1
continue

# Collect the full directive block (directive line + indented body)
block_lines = [lines[i]]
i += 1
while i < len(lines) and (lines[i].startswith(" ") or lines[i].strip() == ""):
# Stop at empty line followed by non-indented content
if lines[i].strip() == "" and (
i + 1 >= len(lines)
or (lines[i + 1].strip() != "" and not lines[i + 1].startswith(" "))
):
block_lines.append(lines[i])
i += 1
break
block_lines.append(lines[i])
i += 1

# Check if this block is deprecated
block_text = "\n".join(block_lines)
if ":status: deprecated" in block_text:
removed_count += 1
id_match = re.search(r":id:\s*(\S+)", block_text)
if id_match:
removed_ids.append(id_match.group(1))
# Skip this block (don't add to new_lines)
# Also skip trailing blank lines
while i < len(lines) and lines[i].strip() == "":
i += 1
else:
new_lines.extend(block_lines)

# Write back cleaned content
cleaned = "\n".join(new_lines)
# Normalize trailing newlines
cleaned = cleaned.rstrip("\n") + "\n"
rst_path.write_text(cleaned)

return removed_count, removed_ids


def update_body_in_rst(
workspace: Path,
need_id: str,
Expand Down
77 changes: 77 additions & 0 deletions tests/test_rst.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from ai_memory_protocol.rst import (
add_tags_in_rst,
append_to_rst,
count_deprecated_in_rst,
deprecate_in_rst,
generate_id,
generate_rst_directive,
prune_deprecated_from_rst,
remove_tags_in_rst,
update_field_in_rst,
)
Expand Down Expand Up @@ -382,3 +384,78 @@ def test_empty_title_rejected(self, tmp_workspace: Path) -> None:
ok, msg = update_title_in_rst(tmp_workspace, "FACT_x", "")
assert not ok
assert "empty" in msg.lower()


class TestPruneDeprecated:
def test_removes_only_deprecated(self, tmp_workspace: Path) -> None:
active = generate_rst_directive(
"fact", "Active fact", need_id="FACT_active", tags=["topic:test"], body="Keep me."
)
deprecated = generate_rst_directive(
"fact",
"Old fact",
need_id="FACT_old",
tags=["topic:test"],
body="Remove me.",
)
append_to_rst(tmp_workspace, "fact", active)
append_to_rst(tmp_workspace, "fact", deprecated)
deprecate_in_rst(tmp_workspace, "FACT_old")

count, removed_ids = prune_deprecated_from_rst(tmp_workspace)

assert count == 1
assert "FACT_old" in removed_ids
content = (tmp_workspace / "memory" / "facts.rst").read_text()
assert "FACT_active" in content
assert "FACT_old" not in content
assert "Keep me." in content
assert "Remove me." not in content

def test_preserves_file_header(self, tmp_workspace: Path) -> None:
deprecated = generate_rst_directive(
"fact", "Gone", need_id="FACT_gone", tags=["topic:test"], body="Bye."
)
append_to_rst(tmp_workspace, "fact", deprecated)
deprecate_in_rst(tmp_workspace, "FACT_gone")

prune_deprecated_from_rst(tmp_workspace)

content = (tmp_workspace / "memory" / "facts.rst").read_text()
assert "Facts" in content # Header preserved
assert "FACT_gone" not in content

def test_handles_split_files(self, tmp_workspace: Path) -> None:
# Fill main file to trigger split
for i in range(52):
d = generate_rst_directive(
"mem", f"Obs {i}", need_id=f"MEM_obs_{i}", tags=["topic:test"]
)
append_to_rst(tmp_workspace, "mem", d)

# Deprecate one in the split file
deprecate_in_rst(tmp_workspace, "MEM_obs_51")

count, removed_ids = prune_deprecated_from_rst(tmp_workspace)
assert count == 1
assert "MEM_obs_51" in removed_ids

def test_count_deprecated(self, tmp_workspace: Path) -> None:
for i in range(3):
d = generate_rst_directive("fact", f"F{i}", need_id=f"FACT_f{i}", tags=["topic:test"])
append_to_rst(tmp_workspace, "fact", d)

deprecate_in_rst(tmp_workspace, "FACT_f0")
deprecate_in_rst(tmp_workspace, "FACT_f2")

assert count_deprecated_in_rst(tmp_workspace) == 2

def test_prune_empty_workspace(self, tmp_workspace: Path) -> None:
count, removed_ids = prune_deprecated_from_rst(tmp_workspace)
assert count == 0
assert removed_ids == []

def test_count_zero_when_none_deprecated(self, tmp_workspace: Path) -> None:
d = generate_rst_directive("fact", "Active", need_id="FACT_a", tags=["topic:test"])
append_to_rst(tmp_workspace, "fact", d)
assert count_deprecated_in_rst(tmp_workspace) == 0
Loading