-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevograph.py
More file actions
262 lines (206 loc) · 8.75 KB
/
evograph.py
File metadata and controls
262 lines (206 loc) · 8.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""
EvoGraph V4 主接口
提供检索和问答功能
V4 改进:
- 检索结果附带演化线信息(state_change, op)
- LLM prompt 中包含结构化的状态变化信息
"""
from typing import List, Dict, Optional, Set
from knowledge_graph import KnowledgeGraphV2
from graph_retriever import EvoGraphRetriever
from llm_client import get_llm_client
from config import SYSTEM_CONFIG
class EvoGraphV4:
    """Main EvoGraph V4 interface: note retrieval and question answering.

    Wraps a knowledge graph (``self.kg``), a graph retriever
    (``self.retriever``) and an LLM client (``self.llm``).  When answering,
    the retrieved notes are enriched with their evolution records
    (``op`` / ``state_change``) and the full state timeline of every entity
    they touch, and both are placed into the LLM prompt.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, debug_log_path=None, embedding_manager=None):
        """Create the knowledge graph, retriever and LLM client.

        Args:
            debug_log_path: Optional path of a debug log file; forwarded to
                both the knowledge graph and the retriever.
            embedding_manager: Optional shared EmbeddingManager instance
                (intended for multi-threaded use); forwarded to the
                knowledge graph.
        """
        self.kg = KnowledgeGraphV2(debug_log_path=debug_log_path, embedding_manager=embedding_manager)
        self.retriever = EvoGraphRetriever(self.kg, debug_log_path=debug_log_path)
        self.llm = get_llm_client()
        self.debug = SYSTEM_CONFIG.get("debug", False)

    def __enter__(self):
        """Return ``self`` so the instance can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the knowledge graph on exit; exceptions are not suppressed."""
        self.close()
        return False

    def close(self):
        """Close the underlying knowledge-graph connection."""
        self.kg.close()

    def retrieve(self, query: str) -> List[dict]:
        """Retrieve the notes relevant to a query.

        Args:
            query: The question to search for.

        Returns:
            Whatever list of notes the retriever produces for ``query``.
        """
        return self.retriever.retrieve(query)

    def answer(self, query: str, generate: bool = True) -> Dict:
        """Retrieve notes for a query and optionally generate an answer.

        Args:
            query: The question to answer.
            generate: When true (and at least one note was retrieved), ask
                the LLM for an answer; otherwise only retrieval is done.

        Returns:
            A dict with the keys ``"query"`` (str), ``"retrieved_notes"``
            (list of note dicts), ``"retrieved_ids"`` (list of note ids) and
            ``"answer"`` (str; empty when nothing was generated).
        """
        # Retrieval step.
        notes = self.retrieve(query)
        note_ids = [n["id"] for n in notes]
        result = {
            "query": query,
            "retrieved_notes": notes,
            "retrieved_ids": note_ids,
            "answer": ""
        }
        # Generation step: uses the notes enriched with evolution records.
        if generate and note_ids:
            notes_with_evo = self.kg.get_notes_with_evolution_by_ids(note_ids)
            # Collect the entities involved and fetch their full timelines.
            involved_entities = self._collect_entities_from_notes(notes_with_evo)
            entity_timelines = self._get_entity_timelines(involved_entities)
            # Context = entity timelines followed by the related conversations.
            timeline_context = self._format_entity_timelines(entity_timelines)
            notes_context = self._format_context_with_evolution(notes_with_evo)
            result["answer"] = self._generate_answer(query, timeline_context, notes_context)
        return result

    def _collect_entities_from_notes(self, notes_with_evo: List[dict]) -> List[str]:
        """Collect the distinct entity names referenced by the notes' evolutions.

        Names are returned in first-seen order.  A plain ``set`` would give
        an arbitrary order that can differ between interpreter runs, which
        would make the timeline section of the prompt non-reproducible.

        Args:
            notes_with_evo: Items shaped like
                ``{"note": {...}, "evolutions": [{"entity": ...}, ...]}``.
                A missing or ``None`` ``"evolutions"`` value is treated as
                an empty list.

        Returns:
            De-duplicated, non-empty entity names.
        """
        # dict keys keep insertion order, giving an ordered de-duplication.
        seen = {}
        for item in notes_with_evo:
            for evo in item.get("evolutions") or []:
                entity = evo.get("entity")
                if entity:
                    seen.setdefault(entity, None)
        return list(seen)

    def _get_entity_timelines(self, entity_names: List[str]) -> Dict[str, List[dict]]:
        """Fetch the evolution line of every named entity.

        Args:
            entity_names: Entity names to look up.

        Returns:
            Mapping of entity name to its line nodes; entities for which the
            knowledge graph returns nothing are omitted.
        """
        timelines = {}
        for name in entity_names:
            # valid_only=False: request every state, including invalidated ones.
            line = self.kg.get_entity_line(name, valid_only=False)
            if line:
                timelines[name] = line
        return timelines

    def _format_entity_timelines(self, timelines: Dict[str, List[dict]]) -> str:
        """Render entity timelines as a text section for the prompt.

        Output format::

            [Entity Timelines]
            Jon: banker(N5) → unemployed(N15) → opened dance studio(N30)

        Each node is shown as ``state_change(note_id)``, falling back to
        ``op(note_id)`` when there is no state change.  Nodes whose
        ``"valid"`` flag is false are wrapped in ``~~...~~``.

        Args:
            timelines: Mapping of entity name to its list of line nodes.

        Returns:
            The formatted section, or ``""`` when there is nothing to show.
        """
        if not timelines:
            return ""
        parts = ["[Entity Timelines]"]
        for entity, line_nodes in timelines.items():
            if not line_nodes:
                continue
            # Build the sequence: state(note_id) → state(note_id)
            states = []
            for ln in line_nodes:
                note_id = ln.get("note_id", "?")
                label = ln.get("state_change", "") or ln.get("op", "")
                state_str = f"{label}({note_id})"
                # Mark invalidated states with strikethrough markers.
                if not ln.get("valid", True):
                    state_str = f"~~{state_str}~~"
                states.append(state_str)
            parts.append(f"{entity}: {' → '.join(states)}")
        # Only the header present means no entity had any node.
        return "\n".join(parts) if len(parts) > 1 else ""

    def _format_context(self, notes: List[dict]) -> str:
        """Render notes as ``[date] text`` paragraphs (legacy interface).

        Args:
            notes: Note dicts; ``"session_date"`` and ``"text"`` default to
                empty strings when absent.

        Returns:
            The paragraphs joined by blank lines.
        """
        return "\n\n".join(
            f"[{note.get('session_date', '')}] {note.get('text', '')}"
            for note in notes
        )

    def _format_context_with_evolution(self, notes_with_evo: List[dict]) -> str:
        """Render notes together with their evolution tags.

        Args:
            notes_with_evo: Items shaped like ``{"note": {...},
                "evolutions": [{"entity": "Jon", "op": "UPDATE",
                "state_change": "..."}]}``.  ``"note"`` is required; a
                missing or ``None`` ``"evolutions"`` value is treated as an
                empty list.

        Returns:
            One paragraph per note, joined by blank lines.  A note with tags
            is rendered as ``[date] [entity: OP change] ...`` followed by the
            text on the next line; a note without tags as ``[date] text``.
        """
        parts = []
        for item in notes_with_evo:
            note = item["note"]
            evolutions = item.get("evolutions") or []
            date = note.get("session_date", "")
            text = note.get("text", "")
            # Tags cover every mutating operation type (CREATE/UPDATE/DELETE);
            # any other op is left out.
            evo_tags = []
            for evo in evolutions:
                op = evo.get("op", "")
                if op not in ("CREATE", "UPDATE", "DELETE"):
                    continue
                entity = evo.get("entity", "?")
                change = evo.get("state_change", "")
                if change:
                    evo_tags.append(f"[{entity}: {op} {change}]")
                else:
                    # No state_change: still show the operation type.
                    evo_tags.append(f"[{entity}: {op}]")
            # Assemble: date + evolution tags + content.
            evo_str = " ".join(evo_tags)
            if evo_str:
                parts.append(f"[{date}] {evo_str}\n{text}")
            else:
                parts.append(f"[{date}] {text}")
        return "\n\n".join(parts)

    def _generate_answer(self, query: str, timeline_context: str, notes_context: str) -> str:
        """Build the prompt and ask the LLM for a short answer.

        Args:
            query: The question being answered.
            timeline_context: Pre-formatted entity-timeline section (may be
                empty).
            notes_context: Pre-formatted conversation section (may be empty).

        Returns:
            The LLM's answer, or ``""`` when the client returns a falsy value.
        """
        # Entity-timeline overview first, then the conversation details.
        context_parts = []
        if timeline_context:
            context_parts.append(timeline_context)
        if notes_context:
            context_parts.append(f"[Related Conversations]\n{notes_context}")
        full_context = "\n\n".join(context_parts)
        prompt = f"""Based on the following information, answer the question.
{full_context}
Question: {query}
Instructions:
- The "Entity Timelines" section shows the complete state evolution of relevant entities (~~strikethrough~~ indicates outdated states)
- The "Related Conversations" section contains the actual conversation details
- Answer based ONLY on the information provided above
- Answer with exact words from the context whenever possible
- Output ONLY the final answer in the form of a short phrase (no explanation, no extra words, no quotes)
- If the question asks "when"/time/date: output an ABSOLUTE date/month/year (e.g., "February, 2023", "20 June, 2023"); avoid relative terms like "next month", "tomorrow", "yesterday"
- Prefer explicit dates mentioned in the conversation text; if only relative time is given, infer the absolute time from the note date shown in brackets
- If the question is yes/no: output ONLY "Yes" or "No"
Answer:"""
        return self.llm.call_for_answer(prompt) or ""

    def get_stats(self) -> dict:
        """Return the knowledge graph's statistics."""
        return self.kg.get_stats()

    def get_entity_info(self, entity_name: str) -> Optional[dict]:
        """Return the knowledge graph's record for ``entity_name``."""
        return self.kg.get_entity(entity_name)

    def get_entity_timeline(self, entity_name: str) -> List[dict]:
        """Return the notes the knowledge graph associates with ``entity_name``."""
        return self.kg.get_entity_notes(entity_name)
# Backward-compatible alias: the name EvoGraphV2 refers to the same class as EvoGraphV4.
EvoGraphV2 = EvoGraphV4
if __name__ == "__main__":
    # Manual smoke test: print graph statistics, then run retrieval only
    # (generate=False, so no LLM call) for a few sample questions.
    evo = EvoGraphV4()
    try:
        print("Graph Stats:", evo.get_stats())
        # Sample queries.
        test_queries = [
            "When did Jon lose his job as a banker?",
            "Which city have both Jean and John visited?",
            "What does Jon's dance studio offer?",
        ]
        for query in test_queries:
            print(f"\n{'='*60}")
            result = evo.answer(query, generate=False)
            print(f"Query: {query}")
            print(f"Retrieved: {result['retrieved_ids']}")
    finally:
        # Always release the connection, even if a query above raised.
        evo.close()