-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline_logger.py
More file actions
159 lines (133 loc) · 4.8 KB
/
pipeline_logger.py
File metadata and controls
159 lines (133 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
crewai pipeline 日志记录器
通过拦截 stdout 解析 crewai verbose 输出,捕获完整的 tool call / LLM 交互链路
"""
import json
import re
import sys
import time
import os
from dataclasses import dataclass, field, asdict
# Agent roles listed in task order: the task callback built by
# PipelineLogger.make_task_callback attributes the i-th completed task to
# AGENT_ROLES[i] (and to "unknown" beyond the end of this list).
AGENT_ROLES = [
    "Benchmark Data Parser",          # task 0
    "Statistical Analyst",            # task 1
    "Comparative Research Analyst",   # task 2
    "Evaluation Report Writer",       # task 3
]
@dataclass
class LogEntry:
    """A single structured pipeline event; written out as one JSON line via ``asdict``."""

    ts: float                 # seconds since the owning logger's start time
    event: str                # event type, e.g. "task_start" or "tool_call"
    agent: str = ""           # agent role the event is attributed to ("" if unknown)
    task_index: int = -1      # task index; -1 means no task index was known
    data: dict = field(default_factory=lambda: {})  # event-specific payload
class PipelineLogger:
    """Collect structured pipeline events in memory and append them to a JSONL file.

    Every event becomes a ``LogEntry`` that is kept in ``self.entries`` and
    appended as one JSON object per line to ``self._log_file``. Timestamps are
    seconds elapsed since this logger was constructed.
    """

    def __init__(self):
        self.entries: list[LogEntry] = []
        self.t0 = time.time()
        # Context used to tag events that do not pass an explicit agent / task
        # index (e.g. events recorded by the stdout interceptor).
        self._current_agent = ""
        self._current_task_idx = -1
        self._task_counter = 0  # NOTE(review): not read anywhere in this file
        self._log_file = "outputs/pipeline_log.jsonl"

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the parent directory of *path* if it names one."""
        parent = os.path.dirname(path)
        # A bare filename has no directory component; os.makedirs("") would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def set_log_file(self, path: str):
        """Send subsequent log lines to *path*, creating its directory if needed."""
        self._log_file = path
        self._ensure_parent_dir(path)

    def _log(self, event: str, agent: str = "", task_index: int = -1, **data):
        """Record one event in memory and append it to the log file.

        Args:
            event: event type, e.g. "task_start" or "tool_call".
            agent: agent role; falls back to the current agent when empty.
            task_index: task index; falls back to the current task when negative.
            **data: event payload, stored as the entry's ``data`` dict.
        """
        entry = LogEntry(
            ts=round(time.time() - self.t0, 3),
            event=event,
            agent=agent or self._current_agent,
            task_index=task_index if task_index >= 0 else self._current_task_idx,
            data=data,
        )
        self.entries.append(entry)
        # The default path's directory may not exist if set_log_file was never called.
        self._ensure_parent_dir(self._log_file)
        with open(self._log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(entry), ensure_ascii=False) + "\n")

    def log_task_start(self, task_index: int, description: str, agent_role: str):
        """Log a task start and make it the current context for later events.

        The description is truncated to 200 characters.
        """
        # Update the context first so events recorded while this task runs
        # (e.g. by the stdout interceptor) carry its index instead of -1.
        self._current_task_idx = task_index
        if agent_role:
            self._current_agent = agent_role
        self._log("task_start", agent=agent_role, task_index=task_index,
                  description=description[:200])

    def make_task_callback(self):
        """Return a task callback that logs a "task_end" event per completed task.

        The callback counts its invocations; the i-th call is attributed to
        ``AGENT_ROLES[i]`` (or "unknown" past the end of that list) and its
        output is truncated to 800 characters.
        """
        completed = 0

        def task_callback(task_output):
            nonlocal completed
            i = completed
            # Advance before logging so one failing call cannot shift the
            # role attribution of every later task.
            completed += 1
            try:
                role = AGENT_ROLES[i] if i < len(AGENT_ROLES) else "unknown"
                self._log("task_end", agent=role, task_index=i,
                          raw_output=str(task_output)[:800])
            except Exception as e:
                # _log gathers **data itself; passing data={...} would nest the
                # payload as {"data": {...}}.
                self._log("callback_error", task_index=i, error=str(e))

        return task_callback

    def install_stdout_interceptor(self):
        """Install a stdout interceptor that parses crewai verbose output in real time.

        Returns the interceptor so it can later be passed to ``restore_stdout``.
        """
        interceptor = _CrewAIOutputInterceptor(sys.stdout, self)
        sys.stdout = interceptor
        return interceptor

    def restore_stdout(self, interceptor):
        """Put back the stream that *interceptor* wrapped as ``sys.stdout``."""
        sys.stdout = interceptor.original
class _CrewAIOutputInterceptor:
"""
拦截 crewai 的 rich/print 输出,从中提取结构化事件
同时透传到真正的 stdout
"""
# 匹配 Agent: <name>
RE_AGENT = re.compile(r'Agent:\s*(.+?)(?:\s*│|\s*$)')
# 匹配 Tool: <name>
RE_TOOL = re.compile(r'Tool:\s*(\w+)')
# 匹配 Args: {...}
RE_ARGS = re.compile(r"Args:\s*(\{.+?\})")
# Tool executed with result
RE_RESULT = re.compile(r'Tool \w+ executed with result.*?:\s*(.+)', re.DOTALL)
# Task Started
RE_TASK_START = re.compile(r'Task Started')
# Task Completed / Agent Finished
RE_TASK_END = re.compile(r'Task Completed|✅.*Completed|Agent Finished')
# Final Answer
RE_FINAL = re.compile(r'Final Answer\s*[::]\s*(.+)', re.DOTALL)
def __init__(self, original, logger: PipelineLogger):
self.original = original
self.logger = logger
self._buf = ""
self._pending_tool: str = ""
self._pending_args: str = ""
def write(self, text: str):
self.original.write(text)
self._buf += text
self._parse(text)
def flush(self):
self.original.flush()
def _parse(self, text: str):
lg = self.logger
# 去掉 ANSI 颜色码
clean = re.sub(r'\x1b\[[0-9;]*m', '', text)
# Agent 名字(更新当前 agent 状态)
m = self.RE_AGENT.search(clean)
if m:
name = m.group(1).strip().rstrip('│').strip()
if name and len(name) < 60:
lg._current_agent = name
# Tool name
m = self.RE_TOOL.search(clean)
if m:
self._pending_tool = m.group(1).strip()
# Tool args
m = self.RE_ARGS.search(clean)
if m and self._pending_tool:
self._pending_args = m.group(1)[:300]
lg._log("tool_call",
tool=self._pending_tool,
args=self._pending_args)
self._pending_tool = ""
self._pending_args = ""
# Tool result
m = self.RE_RESULT.search(clean)
if m:
result = m.group(1).strip()[:500]
lg._log("tool_result", result=result)
# Final Answer
m = self.RE_FINAL.search(clean)
if m:
answer = m.group(1).strip()[:400]
lg._log("agent_final", answer=answer)
# Global singleton: one shared PipelineLogger instance for everything that
# imports this module.
logger: PipelineLogger = PipelineLogger()