forked from Atomlaunch/engram
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtodos.py
More file actions
126 lines (114 loc) · 4.6 KB
/
todos.py
File metadata and controls
126 lines (114 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
"""Todo tracking utilities for Engram."""
import os
import sys
from datetime import datetime
from typing import Optional
try:
import kuzu
except ImportError:
kuzu = None
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from engram.ingest import generate_id, normalize_entity_name
def get_open_todos(conn: kuzu.Connection, agent_id: Optional[str] = None) -> list[dict]:
"""Return open todo facts."""
todos = []
try:
params = {"p_limit": 50}
agent_filter = ""
if agent_id:
agent_filter = " AND f.agent_id = $p_agent"
params["p_agent"] = agent_id
result = conn.execute(
"MATCH (f:Fact) "
"WHERE lower(f.category) = 'todo' "
"AND (f.status IS NULL OR f.status <> 'resolved')"
+ agent_filter +
" RETURN f.id, f.content, f.created_at, f.status, f.agent_id "
"ORDER BY f.created_at DESC LIMIT $p_limit",
params
)
while result.has_next():
row = result.get_next()
todos.append({
"id": row[0], "content": row[1], "created_at": str(row[2]),
"status": row[3], "agent_id": row[4]
})
except Exception:
pass
return todos
def resolve_todo(conn: kuzu.Connection, todo_id: str) -> dict:
"""Mark a todo as resolved."""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
conn.execute(
"MATCH (f:Fact {id: $p_id}) "
"SET f.status = 'resolved', "
"f.resolved_at = timestamp($p_now), "
"f.updated_at = timestamp($p_now)",
{"p_id": todo_id, "p_now": now}
)
return {"ok": True, "id": todo_id}
except Exception as e:
return {"ok": False, "error": str(e), "id": todo_id}
def add_todo(conn: kuzu.Connection, content: str, agent_id: str = "main", about_entities: Optional[list] = None) -> dict:
"""Create a todo fact directly."""
about_entities = about_entities or []
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
todo_id = generate_id("fact", f"todo:{content.lower()}:{agent_id}")
try:
conn.execute(
"MERGE (f:Fact {id: $p_id}) "
"SET f.content = $p_content, "
"f.category = 'todo', "
"f.status = 'open', "
"f.confidence = $p_conf, "
"f.importance = $p_imp, "
"f.valid_at = timestamp($p_now), "
"f.created_at = CASE WHEN f.created_at IS NULL THEN timestamp($p_now) ELSE f.created_at END, "
"f.updated_at = timestamp($p_now), "
"f.agent_id = $p_agent, "
"f.source_type = 'manual', "
"f.memory_tier = 'candidate', "
"f.quality_score = 0.7, "
"f.contamination_score = 0.0, "
"f.retrievable = true",
{
"p_id": todo_id,
"p_content": content,
"p_conf": 0.85,
"p_imp": 0.6,
"p_now": now,
"p_agent": agent_id,
}
)
# Link to entities if provided
for name in about_entities:
name = str(name).strip()
if not name:
continue
eid = generate_id("ent", normalize_entity_name(name))
try:
conn.execute(
"MERGE (e:Entity {id: $p_eid}) "
"SET e.name = $p_name, "
"e.entity_type = 'concept', "
"e.description = CASE WHEN e.description IS NULL THEN '' ELSE e.description END, "
"e.importance = CASE WHEN e.importance IS NULL THEN 0.3 ELSE e.importance END, "
"e.access_count = CASE WHEN e.access_count IS NULL THEN 0 ELSE e.access_count END, "
"e.agent_id = $p_agent, "
"e.created_at = CASE WHEN e.created_at IS NULL THEN timestamp($p_now) ELSE e.created_at END, "
"e.updated_at = timestamp($p_now)",
{"p_eid": eid, "p_name": name, "p_now": now, "p_agent": agent_id}
)
conn.execute(
"MATCH (f:Fact {id: $p_fid}), (e:Entity {id: $p_eid}) "
"MERGE (f)-[r:ABOUT]->(e) "
"ON CREATE SET r.aspect = 'todo', r.created_at = timestamp($p_now)",
{"p_fid": todo_id, "p_eid": eid, "p_now": now}
)
except Exception:
pass
return {"ok": True, "id": todo_id}
except Exception as e:
return {"ok": False, "error": str(e)}