-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor_service.py
More file actions
348 lines (275 loc) · 11.2 KB
/
monitor_service.py
File metadata and controls
348 lines (275 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
"""
系统监控服务
实时监控鼠标、键盘、窗口和系统活动
"""
import time
import logging
import threading
from datetime import datetime
from typing import Optional
import psutil
from pynput import mouse, keyboard
import win32gui
import win32process
from database import Database
from config import (
MONITOR_INTERVAL, IDLE_THRESHOLD,
BUSY_WEIGHTS, LOG_FILE, LOG_FORMAT, LOG_LEVEL
)
# 配置日志
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format=LOG_FORMAT,
handlers=[
logging.FileHandler(LOG_FILE, encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class ActivityMonitor:
"""活动监控器"""
def __init__(self):
self.db = Database()
self.session_id = None
# 活动计数器
self.mouse_distance = 0.0
self.mouse_clicks = 0
self.mouse_moves = 0
self.keyboard_presses = 0
self.window_switches = 0
# 鼠标位置追踪
self.last_mouse_pos = None
self.last_active_window = None
# 监听器
self.mouse_listener = None
self.keyboard_listener = None
# 运行标志
self.running = False
self.monitor_thread = None
# 最后活动时间
self.last_activity_time = datetime.now()
def on_mouse_move(self, x, y):
"""鼠标移动事件"""
if self.last_mouse_pos:
distance = ((x - self.last_mouse_pos[0]) ** 2 +
(y - self.last_mouse_pos[1]) ** 2) ** 0.5
self.mouse_distance += distance
self.last_mouse_pos = (x, y)
self.mouse_moves += 1
self.last_activity_time = datetime.now()
def on_mouse_click(self, x, y, button, pressed):
"""鼠标点击事件"""
if pressed:
self.mouse_clicks += 1
self.last_activity_time = datetime.now()
def on_key_press(self, key):
"""键盘按键事件"""
self.keyboard_presses += 1
self.last_activity_time = datetime.now()
def get_active_window_info(self) -> tuple:
"""获取活动窗口信息"""
try:
hwnd = win32gui.GetForegroundWindow()
window_title = win32gui.GetWindowText(hwnd)
# 检测窗口切换
if self.last_active_window and self.last_active_window != window_title:
self.window_switches += 1
self.last_active_window = window_title
# 获取窗口数量(所有可见窗口)
window_count = 0
def enum_handler(hwnd, ctx):
nonlocal window_count
if win32gui.IsWindowVisible(hwnd):
window_count += 1
win32gui.EnumWindows(enum_handler, None)
return window_title, window_count
except Exception as e:
logger.error(f"获取窗口信息失败: {e}")
return "", 0
def get_system_usage(self) -> tuple:
"""获取系统资源使用情况"""
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
return cpu_percent, memory_percent
except Exception as e:
logger.error(f"获取系统资源失败: {e}")
return 0, 0
def calculate_busy_index(self, mouse_activity: float, keyboard_activity: float,
window_activity: float, system_activity: float) -> float:
"""
计算忙碌指数(0-100)
"""
busy_index = (
mouse_activity * BUSY_WEIGHTS['mouse_activity'] +
keyboard_activity * BUSY_WEIGHTS['keyboard_activity'] +
window_activity * BUSY_WEIGHTS['window_switches'] +
system_activity * BUSY_WEIGHTS['system_usage']
)
return min(100, max(0, busy_index))
def is_idle(self) -> bool:
"""判断是否处于空闲状态"""
time_since_activity = (datetime.now() - self.last_activity_time).total_seconds()
return time_since_activity > IDLE_THRESHOLD
def collect_and_save_data(self):
"""收集并保存数据"""
try:
# 获取窗口信息
active_window, window_count = self.get_active_window_info()
# 获取系统资源使用
cpu_usage, memory_usage = self.get_system_usage()
# 标准化活动指标(基于MONITOR_INTERVAL的期望值)
# 计算每秒的期望值,然后乘以实际间隔
interval_factor = MONITOR_INTERVAL / 60 # 相对于1分钟的系数
# 假设正常工作状态(每分钟):
# - 鼠标移动约1000-5000像素
# - 鼠标点击约10-50次
# - 键盘按键约50-300次
# - 窗口切换约0-10次
mouse_activity_score = min(100, (
(self.mouse_distance / (5000 * interval_factor) * 50) +
(self.mouse_clicks / (50 * interval_factor) * 30) +
(self.mouse_moves / (500 * interval_factor) * 20)
))
keyboard_activity_score = min(100, self.keyboard_presses / (300 * interval_factor) * 100)
window_activity_score = min(100, self.window_switches / (10 * interval_factor) * 100)
system_activity_score = (cpu_usage + memory_usage) / 2
# 计算忙碌指数
busy_index = self.calculate_busy_index(
mouse_activity_score,
keyboard_activity_score,
window_activity_score,
system_activity_score
)
# 判断是否空闲
is_idle = self.is_idle()
# 如果空闲,忙碌指数设为0
if is_idle:
busy_index = 0
# 准备记录数据
record = {
'timestamp': datetime.now(),
'mouse_distance': round(self.mouse_distance, 2),
'mouse_clicks': self.mouse_clicks,
'mouse_moves': self.mouse_moves,
'keyboard_presses': self.keyboard_presses,
'window_switches': self.window_switches,
'active_windows': window_count,
'cpu_usage': round(cpu_usage, 2),
'memory_usage': round(memory_usage, 2),
'busy_index': round(busy_index, 2),
'is_idle': is_idle,
'active_window_title': active_window[:200] # 限制长度
}
# 保存到数据库
self.db.save_activity_record(record)
logger.info(f"数据已保存({MONITOR_INTERVAL}s) - 忙碌指数: {busy_index:.2f}, "
f"鼠标: {self.mouse_clicks}次点击/{self.mouse_distance:.0f}px, "
f"键盘: {self.keyboard_presses}次按键, "
f"窗口切换: {self.window_switches}次")
# 重置计数器
self.reset_counters()
except Exception as e:
logger.error(f"收集和保存数据失败: {e}", exc_info=True)
def reset_counters(self):
"""重置计数器"""
self.mouse_distance = 0.0
self.mouse_clicks = 0
self.mouse_moves = 0
self.keyboard_presses = 0
self.window_switches = 0
def monitor_loop(self):
"""监控循环"""
logger.info("监控循环开始")
while self.running:
try:
# 收集并保存数据
self.collect_and_save_data()
# 每小时更新一次每日统计
now = datetime.now()
if now.minute == 0 and now.second < MONITOR_INTERVAL: # 每小时的第0分钟
self.db.update_daily_stats(now.date())
# 等待到下一个监控间隔
time.sleep(MONITOR_INTERVAL)
except Exception as e:
logger.error(f"监控循环错误: {e}", exc_info=True)
time.sleep(MONITOR_INTERVAL)
def start(self):
"""启动监控服务"""
if self.running:
logger.warning("监控服务已在运行")
return
logger.info("正在启动监控服务...")
# 创建新会话
self.session_id = self.db.start_session()
if self.session_id < 0:
logger.error("创建会话失败")
return
# 启动鼠标监听器
self.mouse_listener = mouse.Listener(
on_move=self.on_mouse_move,
on_click=self.on_mouse_click
)
self.mouse_listener.start()
# 启动键盘监听器
self.keyboard_listener = keyboard.Listener(
on_press=self.on_key_press
)
self.keyboard_listener.start()
# 启动监控线程
self.running = True
self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
self.monitor_thread.start()
logger.info(f"监控服务已启动 - 会话ID: {self.session_id}")
def stop(self):
"""停止监控服务"""
if not self.running:
logger.warning("监控服务未运行")
return
logger.info("正在停止监控服务...")
# 停止监控循环
self.running = False
# 保存最后一次数据
self.collect_and_save_data()
# 结束会话
if self.session_id:
self.db.end_session(self.session_id)
# 更新今日统计
self.db.update_daily_stats(datetime.now().date())
# 停止监听器
if self.mouse_listener:
self.mouse_listener.stop()
if self.keyboard_listener:
self.keyboard_listener.stop()
# 等待监控线程结束
if self.monitor_thread:
self.monitor_thread.join(timeout=5)
logger.info("监控服务已停止")
def get_current_status(self) -> dict:
"""获取当前状态"""
return {
'running': self.running,
'session_id': self.session_id,
'mouse_clicks': self.mouse_clicks,
'keyboard_presses': self.keyboard_presses,
'window_switches': self.window_switches,
'is_idle': self.is_idle(),
'last_activity': self.last_activity_time.isoformat()
}
def main():
"""主函数(用于测试)"""
monitor = ActivityMonitor()
try:
monitor.start()
logger.info("监控服务正在运行,按 Ctrl+C 停止...")
# 保持运行
while True:
time.sleep(10)
status = monitor.get_current_status()
logger.info(f"当前状态: {status}")
except KeyboardInterrupt:
logger.info("收到停止信号")
finally:
monitor.stop()
if __name__ == '__main__':
main()