From a01408295ce067eb7555e675008c4f7ccee128aa Mon Sep 17 00:00:00 2001 From: MrMei Date: Sat, 14 Mar 2026 01:17:12 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=B0=86=20config.py=20=E4=B8=AD=20config.?= =?UTF-8?q?json=20=E7=9A=84=E8=AF=BB=E5=86=99=E7=BB=9F=E4=B8=80=E6=98=BE?= =?UTF-8?q?=E5=BC=8F=E6=8C=87=E5=AE=9A=E4=B8=BA=20UTF-8=20=E7=BC=96?= =?UTF-8?q?=E7=A0=81=EF=BC=8C=E4=BF=AE=E5=A4=8D=20Windows=20=E4=B8=AD?= =?UTF-8?q?=E6=96=87=E7=8E=AF=E5=A2=83=E4=B8=8B=E5=9B=A0=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E7=BC=96=E7=A0=81=E5=B7=AE=E5=BC=82=E5=AF=BC=E8=87=B4=E7=9A=84?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=A7=A3=E6=9E=90=E5=BC=82=E5=B8=B8=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/config.py b/config.py index 97efc03..cf43b3c 100644 --- a/config.py +++ b/config.py @@ -169,7 +169,7 @@ def load_config(): cfg = {} if os.path.exists(CONFIG_FILE): try: - with open(CONFIG_FILE) as f: + with open(CONFIG_FILE, "r", encoding="utf-8") as f: cfg = json.load(f) except json.JSONDecodeError: print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置") @@ -181,12 +181,12 @@ def load_config(): if detected: print(f"[+] 自动检测到微信数据目录: {detected}") cfg = {**_DEFAULT, **cfg, "db_dir": detected} - with open(CONFIG_FILE, "w") as f: + with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(cfg, f, indent=4, ensure_ascii=False) print(f"[+] 已保存到: {CONFIG_FILE}") else: if not os.path.exists(CONFIG_FILE): - with open(CONFIG_FILE, "w") as f: + with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(_DEFAULT, f, indent=4, ensure_ascii=False) print(f"[!] 未能自动检测微信数据目录") print(f" 请手动编辑 {CONFIG_FILE} 中的 db_dir 字段") From d704228fad7631b3b1c3e2ef68106833a3b28b1f Mon Sep 17 00:00:00 2001 From: MrMei Date: Sun, 15 Mar 2026 11:29:46 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=B0=86=E9=A1=B9=E7=9B=AE=E5=86=85=20conf?= =?UTF-8?q?ig.json=E3=80=81=E5=AF=86=E9=92=A5=E6=96=87=E4=BB=B6=E5=8F=8A?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E7=BC=93=E5=AD=98=20JSON=20=E7=9A=84?= =?UTF-8?q?=E8=AF=BB=E5=86=99=E7=BB=9F=E4=B8=80=E6=98=BE=E5=BC=8F=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E4=B8=BA=20UTF-8=20=E7=BC=96=E7=A0=81=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20Windows=20=E4=B8=AD=E6=96=87=E7=8E=AF?= =?UTF-8?q?=E5=A2=83=E4=B8=8B=E5=9B=A0=E9=BB=98=E8=AE=A4=E7=BC=96=E7=A0=81?= =?UTF-8?q?=E5=B7=AE=E5=BC=82=E5=AF=BC=E8=87=B4=E7=9A=84=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E4=B8=8E=E5=AF=86=E9=92=A5=E8=A7=A3=E6=9E=90=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- decrypt_db.py | 46 +- find_image_key.py | 4 +- find_image_key_monitor.py | 4 +- latency_test.py | 2 +- main.py | 4 +- mcp_server.py | 1264 ++++++++++++++++++------------------- monitor.py | 20 +- monitor_web.py | 2 +- 8 files changed, 673 insertions(+), 673 deletions(-) diff --git a/decrypt_db.py b/decrypt_db.py index bd8fa9f..5be0f10 100644 --- a/decrypt_db.py +++ b/decrypt_db.py @@ -7,7 +7,7 @@ """ import hashlib, struct, os, sys, json import hmac as hmac_mod -from Crypto.Cipher import AES +from Crypto.Cipher import AES import functools print = functools.partial(print, flush=True) @@ -20,12 +20,12 @@ RESERVE_SZ = 80 # IV(16) + HMAC(64) SQLITE_HDR = b'SQLite format 3\x00' -from config import load_config -from key_utils import get_key_info, strip_key_metadata -_cfg = load_config() -DB_DIR = _cfg["db_dir"] -OUT_DIR = _cfg["decrypted_dir"] -KEYS_FILE = _cfg["keys_file"] +from config import load_config +from key_utils import get_key_info, strip_key_metadata +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +OUT_DIR = _cfg["decrypted_dir"] +KEYS_FILE = _cfg["keys_file"] def derive_mac_key(enc_key, salt): @@ -116,13 +116,13 @@ def main(): print("请先运行 find_all_keys.py") sys.exit(1) - with open(KEYS_FILE) as f: - keys = json.load(f) - - keys = strip_key_metadata(keys) - print(f"\n加载 {len(keys)} 个数据库密钥") - print(f"输出目录: {OUT_DIR}") - os.makedirs(OUT_DIR, exist_ok=True) + with open(KEYS_FILE, "r", encoding="utf-8") as f: + keys = json.load(f) + + keys = strip_key_metadata(keys) + print(f"\n加载 {len(keys)} 个数据库密钥") + print(f"输出目录: {OUT_DIR}") + os.makedirs(OUT_DIR, exist_ok=True) # 收集所有DB文件 db_files = [] @@ -142,15 +142,15 @@ def main(): failed = 0 total_bytes = 0 - for rel, path, sz in db_files: - key_info = get_key_info(keys, rel) - if not key_info: - print(f"SKIP: {rel} (无密钥)") - failed += 1 - continue - - enc_key = bytes.fromhex(key_info["enc_key"]) - out_path = os.path.join(OUT_DIR, rel) + for rel, path, sz in db_files: + key_info = get_key_info(keys, rel) + if not key_info: + print(f"SKIP: {rel} (无密钥)") + failed += 1 + continue + + enc_key = bytes.fromhex(key_info["enc_key"]) + out_path = os.path.join(OUT_DIR, rel) print(f"解密: {rel} ({sz/1024/1024:.1f}MB) ...", end=" ") diff --git a/find_image_key.py b/find_image_key.py index 6800696..3016e12 100644 --- a/find_image_key.py +++ b/find_image_key.py @@ -334,7 +334,7 @@ def verify_and_decrypt(attach_dir, aes_key_str, xor_key): def main(): config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') - with open(config_path) as f: + with open(config_path, "r", encoding="utf-8") as f: config = json.load(f) db_dir = config['db_dir'] @@ -392,7 +392,7 @@ def main(): config['image_aes_key'] = aes_key if xor_key is not None: config['image_xor_key'] = xor_key - with open(config_path, 'w') as f: + with open(config_path, "w", encoding="utf-8") as f: json.dump(config, f, indent=2, ensure_ascii=False) print(f"Saved to {config_path}", flush=True) diff --git a/find_image_key_monitor.py b/find_image_key_monitor.py index f81442b..69861d9 100644 --- a/find_image_key_monitor.py +++ b/find_image_key_monitor.py @@ -227,7 +227,7 @@ def verify_and_decrypt(attach_dir, aes_key_str, xor_key): def main(): config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') - with open(config_path) as f: + with open(config_path, "r", encoding="utf-8") as f: config = json.load(f) db_dir = config['db_dir'] @@ -292,7 +292,7 @@ def main(): config['image_aes_key'] = aes_key if xor_key is not None: config['image_xor_key'] = xor_key - with open(config_path, 'w') as f: + with open(config_path, "w", encoding="utf-8") as f: json.dump(config, f, indent=2, ensure_ascii=False) print(f"Saved to {config_path}", flush=True) diff --git a/latency_test.py b/latency_test.py index e5f2fae..eb04d52 100644 --- a/latency_test.py +++ b/latency_test.py @@ -15,7 +15,7 @@ KEYS_FILE = _cfg["keys_file"] DECRYPTED = os.path.join(_cfg["decrypted_dir"], "session", "session.db") -with open(KEYS_FILE) as f: +with open(KEYS_FILE, "r", encoding="utf-8") as f: keys = json.load(f) enc_key = bytes.fromhex(keys["session/session.db"]["enc_key"]) diff --git a/main.py b/main.py index 90b16ee..9eb3a5f 100644 --- a/main.py +++ b/main.py @@ -28,7 +28,7 @@ def ensure_keys(keys_file, db_dir): """确保密钥文件存在且匹配当前 db_dir,否则重新提取""" if os.path.exists(keys_file): try: - with open(keys_file) as f: + with open(keys_file, "r", encoding="utf-8") as f: keys = json.load(f) except (json.JSONDecodeError, ValueError): keys = {} @@ -59,7 +59,7 @@ def ensure_keys(keys_file, db_dir): print("[!] 密钥提取失败") sys.exit(1) try: - with open(keys_file) as f: + with open(keys_file, "r", encoding="utf-8") as f: keys = json.load(f) except (json.JSONDecodeError, ValueError): keys = {} diff --git a/mcp_server.py b/mcp_server.py index 8ef1f05..a55efe6 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -5,10 +5,10 @@ Runs on Windows Python (needs access to D:\ WeChat databases). """ -import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re -import hmac as hmac_mod -from contextlib import closing -from datetime import datetime +import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re +import hmac as hmac_mod +from contextlib import closing +from datetime import datetime import xml.etree.ElementTree as ET from Crypto.Cipher import AES from mcp.server.fastmcp import FastMCP @@ -29,7 +29,7 @@ SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) CONFIG_FILE = os.path.join(SCRIPT_DIR, "config.json") -with open(CONFIG_FILE) as f: +with open(CONFIG_FILE, "r", encoding="utf-8") as f: _cfg = json.load(f) for _key in ("keys_file", "decrypted_dir"): if _key in _cfg and not os.path.isabs(_cfg[_key]): @@ -52,7 +52,7 @@ elif not os.path.isabs(DECODED_IMAGE_DIR): DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, DECODED_IMAGE_DIR) -with open(KEYS_FILE) as f: +with open(KEYS_FILE, "r", encoding="utf-8") as f: ALL_KEYS = strip_key_metadata(json.load(f)) # ============ 解密函数 ============ @@ -143,7 +143,7 @@ def _load_persistent_cache(self): if not os.path.exists(self.MTIME_FILE): return try: - with open(self.MTIME_FILE) as f: + with open(self.MTIME_FILE, "r", encoding="utf-8") as f: saved = json.load(f) except (json.JSONDecodeError, OSError): return @@ -172,7 +172,7 @@ def _save_persistent_cache(self): for rel_key, (db_mt, wal_mt, path) in self._cache.items(): data[rel_key] = {"db_mt": db_mt, "wal_mt": wal_mt, "path": path} try: - with open(self.MTIME_FILE, 'w') as f: + with open(self.MTIME_FILE, "w", encoding="utf-8") as f: json.dump(data, f) except OSError: pass @@ -220,11 +220,11 @@ def cleanup(self): _contact_names = None # {username: display_name} _contact_full = None # [{username, nick_name, remark}] -_self_username = None -_XML_UNSAFE_RE = re.compile(r' limit_max: - raise ValueError(f"limit 不能大于 {limit_max}") - if offset < 0: - raise ValueError("offset 不能小于 0") + + return None, None + + +def _find_msg_tables_for_user(username): + """返回用户在所有 message_N.db 中对应的消息表,按最新消息时间倒序排列。""" + table_hash = hashlib.md5(username.encode()).hexdigest() + table_name = f"Msg_{table_hash}" + if not _is_safe_msg_table_name(table_name): + return [] + + matches = [] + for rel_key in MSG_DB_KEYS: + path = _cache.get(rel_key) + if not path: + continue + conn = sqlite3.connect(path) + try: + exists = conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", + (table_name,) + ).fetchone() + if not exists: + continue + max_create_time = conn.execute( + f"SELECT MAX(create_time) FROM [{table_name}]" + ).fetchone()[0] or 0 + matches.append({ + 'db_path': path, + 'table_name': table_name, + 'max_create_time': max_create_time, + }) + except Exception: + pass + finally: + conn.close() + + matches.sort(key=lambda item: item['max_create_time'], reverse=True) + return matches + + +def _validate_pagination(limit, offset=0, limit_max=_QUERY_LIMIT_MAX): + if limit <= 0: + raise ValueError("limit 必须大于 0") + if limit_max is not None and limit > limit_max: + raise ValueError(f"limit 不能大于 {limit_max}") + if offset < 0: + raise ValueError("offset 不能小于 0") def _parse_time_value(value, field_name, is_end=False): @@ -692,59 +692,59 @@ def _build_message_filters(start_ts=None, end_ts=None, keyword=''): return clauses, params -def _query_messages(conn, table_name, start_ts=None, end_ts=None, keyword='', limit=20, offset=0): - if not _is_safe_msg_table_name(table_name): - raise ValueError(f'非法消息表名: {table_name}') - - clauses, params = _build_message_filters(start_ts, end_ts, keyword) - where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else '' - sql = f""" - SELECT local_id, local_type, create_time, real_sender_id, message_content, - WCDB_CT_message_content - FROM [{table_name}] - {where_sql} - ORDER BY create_time DESC - """ - if limit is None: - return conn.execute(sql, params).fetchall() - sql += "\n LIMIT ? OFFSET ?" - return conn.execute(sql, (*params, limit, offset)).fetchall() - - -def _resolve_chat_context(chat_name): - username = resolve_username(chat_name) - if not username: - return None - - names = get_contact_names() - display_name = names.get(username, username) - message_tables = _find_msg_tables_for_user(username) - if not message_tables: - return { - 'query': chat_name, - 'username': username, - 'display_name': display_name, - 'db_path': None, - 'table_name': None, - 'message_tables': [], - 'is_group': '@chatroom' in username, - } - - primary = message_tables[0] - return { - 'query': chat_name, - 'username': username, - 'display_name': display_name, - 'db_path': primary['db_path'], - 'table_name': primary['table_name'], - 'message_tables': message_tables, - 'is_group': '@chatroom' in username, - } - - -def _resolve_chat_contexts(chat_names): - if not chat_names: - raise ValueError('chat_names 不能为空') +def _query_messages(conn, table_name, start_ts=None, end_ts=None, keyword='', limit=20, offset=0): + if not _is_safe_msg_table_name(table_name): + raise ValueError(f'非法消息表名: {table_name}') + + clauses, params = _build_message_filters(start_ts, end_ts, keyword) + where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else '' + sql = f""" + SELECT local_id, local_type, create_time, real_sender_id, message_content, + WCDB_CT_message_content + FROM [{table_name}] + {where_sql} + ORDER BY create_time DESC + """ + if limit is None: + return conn.execute(sql, params).fetchall() + sql += "\n LIMIT ? OFFSET ?" + return conn.execute(sql, (*params, limit, offset)).fetchall() + + +def _resolve_chat_context(chat_name): + username = resolve_username(chat_name) + if not username: + return None + + names = get_contact_names() + display_name = names.get(username, username) + message_tables = _find_msg_tables_for_user(username) + if not message_tables: + return { + 'query': chat_name, + 'username': username, + 'display_name': display_name, + 'db_path': None, + 'table_name': None, + 'message_tables': [], + 'is_group': '@chatroom' in username, + } + + primary = message_tables[0] + return { + 'query': chat_name, + 'username': username, + 'display_name': display_name, + 'db_path': primary['db_path'], + 'table_name': primary['table_name'], + 'message_tables': message_tables, + 'is_group': '@chatroom' in username, + } + + +def _resolve_chat_contexts(chat_names): + if not chat_names: + raise ValueError('chat_names 不能为空') resolved = [] unresolved = [] @@ -760,50 +760,50 @@ def _resolve_chat_contexts(chat_names): if not ctx: unresolved.append(name) continue - if not ctx['message_tables']: - missing_tables.append(ctx['display_name']) - continue - if ctx['username'] in seen: - continue + if not ctx['message_tables']: + missing_tables.append(ctx['display_name']) + continue + if ctx['username'] in seen: + continue seen.add(ctx['username']) resolved.append(ctx) - - return resolved, unresolved, missing_tables - - -def _normalize_chat_names(chat_name): - if chat_name is None: - return [] - if isinstance(chat_name, str): - value = chat_name.strip() - return [value] if value else [] - if isinstance(chat_name, (list, tuple, set)): - normalized = [] - for item in chat_name: - if item is None: - continue - value = str(item).strip() - if value: - normalized.append(value) - return normalized - value = str(chat_name).strip() - return [value] if value else [] - - -def _format_history_lines(rows, username, display_name, is_group, names, id_to_username): - lines = [] - ctx = { - 'username': username, - 'display_name': display_name, - 'is_group': is_group, - } - for row in reversed(rows): - _, line = _build_history_line(row, ctx, names, id_to_username) - lines.append(line) - return lines - - -def _build_search_entry(row, ctx, names, id_to_username): + + return resolved, unresolved, missing_tables + + +def _normalize_chat_names(chat_name): + if chat_name is None: + return [] + if isinstance(chat_name, str): + value = chat_name.strip() + return [value] if value else [] + if isinstance(chat_name, (list, tuple, set)): + normalized = [] + for item in chat_name: + if item is None: + continue + value = str(item).strip() + if value: + normalized.append(value) + return normalized + value = str(chat_name).strip() + return [value] if value else [] + + +def _format_history_lines(rows, username, display_name, is_group, names, id_to_username): + lines = [] + ctx = { + 'username': username, + 'display_name': display_name, + 'is_group': is_group, + } + for row in reversed(rows): + _, line = _build_history_line(row, ctx, names, id_to_username) + lines.append(line) + return lines + + +def _build_search_entry(row, ctx, names, id_to_username): local_id, local_type, create_time, real_sender_id, content, ct = row content = _decompress_content(content, ct) if content is None: @@ -828,346 +828,346 @@ def _build_search_entry(row, ctx, names, id_to_username): entry = f"[{time_str}] [{ctx['display_name']}]" if sender_label: entry += f" {sender_label}:" - entry += f" {text}" - return create_time, entry - - -def _build_history_line(row, ctx, names, id_to_username): - local_id, local_type, create_time, real_sender_id, content, ct = row - time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M') - content = _decompress_content(content, ct) - if content is None: - content = '(无法解压)' - - sender, text = _format_message_text( - local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names - ) - - sender_label = _resolve_sender_label( - real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username - ) - if sender_label: - return create_time, f'[{time_str}] {sender_label}: {text}' - return create_time, f'[{time_str}] {text}' - - -def _get_chat_message_tables(ctx): - if ctx.get('message_tables'): - return ctx['message_tables'] - if ctx.get('db_path') and ctx.get('table_name'): - return [{'db_path': ctx['db_path'], 'table_name': ctx['table_name']}] - return [] - - -def _iter_table_contexts(ctx): - for table in _get_chat_message_tables(ctx): - yield { - 'query': ctx['query'], - 'username': ctx['username'], - 'display_name': ctx['display_name'], - 'db_path': table['db_path'], - 'table_name': table['table_name'], - 'is_group': ctx['is_group'], - } - - -def _candidate_page_size(limit, offset): - return limit + offset - - -def _message_query_batch_size(candidate_limit): - return candidate_limit - - -def _history_query_batch_size(candidate_limit): - return min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE) - - -def _page_ranked_entries(entries, limit, offset): - ordered = sorted(entries, key=lambda item: item[0], reverse=True) - paged = ordered[offset:offset + limit] - paged.sort(key=lambda item: item[0]) - return paged - - -def _collect_chat_history_lines(ctx, names, start_ts=None, end_ts=None, limit=20, offset=0): - collected = [] - failures = [] - candidate_limit = _candidate_page_size(limit, offset) - batch_size = _history_query_batch_size(candidate_limit) - - for table_ctx in _iter_table_contexts(ctx): - try: - with closing(sqlite3.connect(table_ctx['db_path'])) as conn: - id_to_username = _load_name2id_maps(conn) - fetch_offset = 0 - collected_before_table = len(collected) - # 当前页上的消息一定落在各分表最近的 offset+limit 条记录内。 - while len(collected) - collected_before_table < candidate_limit: - rows = _query_messages( - conn, - table_ctx['table_name'], - start_ts=start_ts, - end_ts=end_ts, - limit=batch_size, - offset=fetch_offset, - ) - if not rows: - break - fetch_offset += len(rows) - - for row in rows: - try: - collected.append(_build_history_line(row, table_ctx, names, id_to_username)) - except Exception as e: - failures.append( - f"{table_ctx['display_name']} local_id={row[0]} create_time={row[2]}: {e}" - ) - if len(collected) - collected_before_table >= candidate_limit: - break - - if len(rows) < batch_size: - break - except Exception as e: - failures.append(f"{table_ctx['db_path']}: {e}") - - paged = _page_ranked_entries(collected, limit, offset) - return [line for _, line in paged], failures - - -def _collect_chat_search_entries(ctx, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): - collected = [] - failures = [] - contexts_by_db = {} - for table_ctx in _iter_table_contexts(ctx): - contexts_by_db.setdefault(table_ctx['db_path'], []).append(table_ctx) - - for db_path, db_contexts in contexts_by_db.items(): - try: - with closing(sqlite3.connect(db_path)) as conn: - db_entries, db_failures = _collect_search_entries( - conn, - db_contexts, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - collected.extend(db_entries) - failures.extend(db_failures) - except Exception as e: - failures.extend(f"{table_ctx['display_name']}: {e}" for table_ctx in db_contexts) - - return collected, failures - - -def _load_search_contexts_from_db(conn, db_path, names): - tables = conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'" - ).fetchall() - - table_to_username = {} - try: - for (user_name,) in conn.execute("SELECT user_name FROM Name2Id").fetchall(): - if not user_name: - continue - table_hash = hashlib.md5(user_name.encode()).hexdigest() - table_to_username[f"Msg_{table_hash}"] = user_name - except sqlite3.Error: - pass - - contexts = [] - for (table_name,) in tables: - username = table_to_username.get(table_name, '') - display_name = names.get(username, username) if username else table_name - contexts.append({ - 'query': display_name, - 'username': username, - 'display_name': display_name, - 'db_path': db_path, - 'table_name': table_name, - 'is_group': '@chatroom' in username, - }) - return contexts - - -def _collect_search_entries(conn, contexts, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): - collected = [] - failures = [] - id_to_username = _load_name2id_maps(conn) - batch_size = _message_query_batch_size(candidate_limit) - - for ctx in contexts: - try: - fetch_offset = 0 - collected_before_table = len(collected) - # 全局分页只需要每个分表最新的 offset+limit 条有效命中,无需把整表命中读进内存。 - while len(collected) - collected_before_table < candidate_limit: - rows = _query_messages( - conn, - ctx['table_name'], - start_ts=start_ts, - end_ts=end_ts, - keyword=keyword, - limit=batch_size, - offset=fetch_offset, - ) - if not rows: - break - fetch_offset += len(rows) - - for row in rows: - formatted = _build_search_entry(row, ctx, names, id_to_username) - if formatted: - collected.append(formatted) - if len(collected) - collected_before_table >= candidate_limit: - break - - if len(rows) < batch_size: - break - except Exception as e: - failures.append(f"{ctx['display_name']}: {e}") - - return collected, failures - - -def _page_search_entries(entries, limit, offset): - return _page_ranked_entries(entries, limit, offset) - - -def _search_single_chat(ctx, keyword, start_ts, end_ts, start_time, end_time, limit, offset): - names = get_contact_names() - candidate_limit = _candidate_page_size(limit, offset) - - entries, failures = _collect_chat_search_entries( - ctx, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - - paged = _page_search_entries(entries, limit, offset) - - if not paged: - if failures: - return "查询失败: " + ";".join(failures) - return f"未在 {ctx['display_name']} 中找到包含 \"{keyword}\" 的消息" - - header = f"在 {ctx['display_name']} 中搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header + ":\n\n" + "\n\n".join(item[1] for item in paged) - - -def _search_multiple_chats(chat_names, keyword, start_ts, end_ts, start_time, end_time, limit, offset): - try: - resolved_contexts, unresolved, missing_tables = _resolve_chat_contexts(chat_names) - except ValueError as e: - return f"错误: {e}" - - if not resolved_contexts: - details = [] - if unresolved: - details.append("未找到联系人: " + "、".join(unresolved)) - if missing_tables: - details.append("无消息表: " + "、".join(missing_tables)) - suffix = f"\n{chr(10).join(details)}" if details else "" - return f"错误: 没有可查询的聊天对象{suffix}" - - names = get_contact_names() - candidate_limit = _candidate_page_size(limit, offset) - collected = [] - failures = [] - for ctx in resolved_contexts: - chat_entries, chat_failures = _collect_chat_search_entries( - ctx, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - collected.extend(chat_entries) - failures.extend(chat_failures) - - paged = _page_search_entries(collected, limit, offset) - - notes = [] - if unresolved: - notes.append("未找到联系人: " + "、".join(unresolved)) - if missing_tables: - notes.append("无消息表: " + "、".join(missing_tables)) - if failures: - notes.append("查询失败: " + ";".join(failures)) - - if not paged: - header = f"在 {len(resolved_contexts)} 个聊天对象中未找到包含 \"{keyword}\" 的消息" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if notes: - header += "\n" + "\n".join(notes) - return header - - header = ( - f"在 {len(resolved_contexts)} 个聊天对象中搜索 \"{keyword}\" 找到 {len(paged)} 条结果" - f"(offset={offset}, limit={limit})" - ) - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if notes: - header += "\n" + "\n".join(notes) - return header + ":\n\n" + "\n\n".join(item[1] for item in paged) - - -def _search_all_messages(keyword, start_ts, end_ts, start_time, end_time, limit, offset): - names = get_contact_names() - collected = [] - failures = [] - candidate_limit = _candidate_page_size(limit, offset) - - for rel_key in MSG_DB_KEYS: - path = _cache.get(rel_key) - if not path: - continue - - try: - with closing(sqlite3.connect(path)) as conn: - contexts = _load_search_contexts_from_db(conn, path, names) - db_entries, db_failures = _collect_search_entries( - conn, - contexts, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - collected.extend(db_entries) - failures.extend(db_failures) - except Exception as e: - failures.append(f"{rel_key}: {e}") - - paged = _page_search_entries(collected, limit, offset) - - if not paged: - header = f"未找到包含 \"{keyword}\" 的消息" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header - - header = f"搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header + ":\n\n" + "\n\n".join(item[1] for item in paged) + entry += f" {text}" + return create_time, entry + + +def _build_history_line(row, ctx, names, id_to_username): + local_id, local_type, create_time, real_sender_id, content, ct = row + time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M') + content = _decompress_content(content, ct) + if content is None: + content = '(无法解压)' + + sender, text = _format_message_text( + local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names + ) + + sender_label = _resolve_sender_label( + real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username + ) + if sender_label: + return create_time, f'[{time_str}] {sender_label}: {text}' + return create_time, f'[{time_str}] {text}' + + +def _get_chat_message_tables(ctx): + if ctx.get('message_tables'): + return ctx['message_tables'] + if ctx.get('db_path') and ctx.get('table_name'): + return [{'db_path': ctx['db_path'], 'table_name': ctx['table_name']}] + return [] + + +def _iter_table_contexts(ctx): + for table in _get_chat_message_tables(ctx): + yield { + 'query': ctx['query'], + 'username': ctx['username'], + 'display_name': ctx['display_name'], + 'db_path': table['db_path'], + 'table_name': table['table_name'], + 'is_group': ctx['is_group'], + } + + +def _candidate_page_size(limit, offset): + return limit + offset + + +def _message_query_batch_size(candidate_limit): + return candidate_limit + + +def _history_query_batch_size(candidate_limit): + return min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE) + + +def _page_ranked_entries(entries, limit, offset): + ordered = sorted(entries, key=lambda item: item[0], reverse=True) + paged = ordered[offset:offset + limit] + paged.sort(key=lambda item: item[0]) + return paged + + +def _collect_chat_history_lines(ctx, names, start_ts=None, end_ts=None, limit=20, offset=0): + collected = [] + failures = [] + candidate_limit = _candidate_page_size(limit, offset) + batch_size = _history_query_batch_size(candidate_limit) + + for table_ctx in _iter_table_contexts(ctx): + try: + with closing(sqlite3.connect(table_ctx['db_path'])) as conn: + id_to_username = _load_name2id_maps(conn) + fetch_offset = 0 + collected_before_table = len(collected) + # 当前页上的消息一定落在各分表最近的 offset+limit 条记录内。 + while len(collected) - collected_before_table < candidate_limit: + rows = _query_messages( + conn, + table_ctx['table_name'], + start_ts=start_ts, + end_ts=end_ts, + limit=batch_size, + offset=fetch_offset, + ) + if not rows: + break + fetch_offset += len(rows) + + for row in rows: + try: + collected.append(_build_history_line(row, table_ctx, names, id_to_username)) + except Exception as e: + failures.append( + f"{table_ctx['display_name']} local_id={row[0]} create_time={row[2]}: {e}" + ) + if len(collected) - collected_before_table >= candidate_limit: + break + + if len(rows) < batch_size: + break + except Exception as e: + failures.append(f"{table_ctx['db_path']}: {e}") + + paged = _page_ranked_entries(collected, limit, offset) + return [line for _, line in paged], failures + + +def _collect_chat_search_entries(ctx, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): + collected = [] + failures = [] + contexts_by_db = {} + for table_ctx in _iter_table_contexts(ctx): + contexts_by_db.setdefault(table_ctx['db_path'], []).append(table_ctx) + + for db_path, db_contexts in contexts_by_db.items(): + try: + with closing(sqlite3.connect(db_path)) as conn: + db_entries, db_failures = _collect_search_entries( + conn, + db_contexts, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + collected.extend(db_entries) + failures.extend(db_failures) + except Exception as e: + failures.extend(f"{table_ctx['display_name']}: {e}" for table_ctx in db_contexts) + + return collected, failures + + +def _load_search_contexts_from_db(conn, db_path, names): + tables = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'" + ).fetchall() + + table_to_username = {} + try: + for (user_name,) in conn.execute("SELECT user_name FROM Name2Id").fetchall(): + if not user_name: + continue + table_hash = hashlib.md5(user_name.encode()).hexdigest() + table_to_username[f"Msg_{table_hash}"] = user_name + except sqlite3.Error: + pass + + contexts = [] + for (table_name,) in tables: + username = table_to_username.get(table_name, '') + display_name = names.get(username, username) if username else table_name + contexts.append({ + 'query': display_name, + 'username': username, + 'display_name': display_name, + 'db_path': db_path, + 'table_name': table_name, + 'is_group': '@chatroom' in username, + }) + return contexts + + +def _collect_search_entries(conn, contexts, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): + collected = [] + failures = [] + id_to_username = _load_name2id_maps(conn) + batch_size = _message_query_batch_size(candidate_limit) + + for ctx in contexts: + try: + fetch_offset = 0 + collected_before_table = len(collected) + # 全局分页只需要每个分表最新的 offset+limit 条有效命中,无需把整表命中读进内存。 + while len(collected) - collected_before_table < candidate_limit: + rows = _query_messages( + conn, + ctx['table_name'], + start_ts=start_ts, + end_ts=end_ts, + keyword=keyword, + limit=batch_size, + offset=fetch_offset, + ) + if not rows: + break + fetch_offset += len(rows) + + for row in rows: + formatted = _build_search_entry(row, ctx, names, id_to_username) + if formatted: + collected.append(formatted) + if len(collected) - collected_before_table >= candidate_limit: + break + + if len(rows) < batch_size: + break + except Exception as e: + failures.append(f"{ctx['display_name']}: {e}") + + return collected, failures + + +def _page_search_entries(entries, limit, offset): + return _page_ranked_entries(entries, limit, offset) + + +def _search_single_chat(ctx, keyword, start_ts, end_ts, start_time, end_time, limit, offset): + names = get_contact_names() + candidate_limit = _candidate_page_size(limit, offset) + + entries, failures = _collect_chat_search_entries( + ctx, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + + paged = _page_search_entries(entries, limit, offset) + + if not paged: + if failures: + return "查询失败: " + ";".join(failures) + return f"未在 {ctx['display_name']} 中找到包含 \"{keyword}\" 的消息" + + header = f"在 {ctx['display_name']} 中搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + ":\n\n" + "\n\n".join(item[1] for item in paged) + + +def _search_multiple_chats(chat_names, keyword, start_ts, end_ts, start_time, end_time, limit, offset): + try: + resolved_contexts, unresolved, missing_tables = _resolve_chat_contexts(chat_names) + except ValueError as e: + return f"错误: {e}" + + if not resolved_contexts: + details = [] + if unresolved: + details.append("未找到联系人: " + "、".join(unresolved)) + if missing_tables: + details.append("无消息表: " + "、".join(missing_tables)) + suffix = f"\n{chr(10).join(details)}" if details else "" + return f"错误: 没有可查询的聊天对象{suffix}" + + names = get_contact_names() + candidate_limit = _candidate_page_size(limit, offset) + collected = [] + failures = [] + for ctx in resolved_contexts: + chat_entries, chat_failures = _collect_chat_search_entries( + ctx, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + collected.extend(chat_entries) + failures.extend(chat_failures) + + paged = _page_search_entries(collected, limit, offset) + + notes = [] + if unresolved: + notes.append("未找到联系人: " + "、".join(unresolved)) + if missing_tables: + notes.append("无消息表: " + "、".join(missing_tables)) + if failures: + notes.append("查询失败: " + ";".join(failures)) + + if not paged: + header = f"在 {len(resolved_contexts)} 个聊天对象中未找到包含 \"{keyword}\" 的消息" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if notes: + header += "\n" + "\n".join(notes) + return header + + header = ( + f"在 {len(resolved_contexts)} 个聊天对象中搜索 \"{keyword}\" 找到 {len(paged)} 条结果" + f"(offset={offset}, limit={limit})" + ) + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if notes: + header += "\n" + "\n".join(notes) + return header + ":\n\n" + "\n\n".join(item[1] for item in paged) + + +def _search_all_messages(keyword, start_ts, end_ts, start_time, end_time, limit, offset): + names = get_contact_names() + collected = [] + failures = [] + candidate_limit = _candidate_page_size(limit, offset) + + for rel_key in MSG_DB_KEYS: + path = _cache.get(rel_key) + if not path: + continue + + try: + with closing(sqlite3.connect(path)) as conn: + contexts = _load_search_contexts_from_db(conn, path, names) + db_entries, db_failures = _collect_search_entries( + conn, + contexts, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + collected.extend(db_entries) + failures.extend(db_failures) + except Exception as e: + failures.append(f"{rel_key}: {e}") + + paged = _page_search_entries(collected, limit, offset) + + if not paged: + header = f"未找到包含 \"{keyword}\" 的消息" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + + header = f"搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + ":\n\n" + "\n\n".join(item[1] for item in paged) # ============ MCP Server ============ @@ -1179,7 +1179,7 @@ def _search_all_messages(keyword, start_ts, end_ts, start_time, end_time, limit, @mcp.tool() -def get_recent_sessions(limit: int = 20) -> str: +def get_recent_sessions(limit: int = 20) -> str: """获取微信最近会话列表,包含最新消息摘要、未读数、时间等。 用于了解最近有哪些人/群在聊天。 @@ -1191,15 +1191,15 @@ def get_recent_sessions(limit: int = 20) -> str: return "错误: 无法解密 session.db" names = get_contact_names() - with closing(sqlite3.connect(path)) as conn: - rows = conn.execute(""" - SELECT username, unread_count, summary, last_timestamp, - last_msg_type, last_msg_sender, last_sender_display_name - FROM SessionTable - WHERE last_timestamp > 0 - ORDER BY last_timestamp DESC - LIMIT ? - """, (limit,)).fetchall() + with closing(sqlite3.connect(path)) as conn: + rows = conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable + WHERE last_timestamp > 0 + ORDER BY last_timestamp DESC + LIMIT ? + """, (limit,)).fetchall() results = [] for r in rows: @@ -1237,21 +1237,21 @@ def get_recent_sessions(limit: int = 20) -> str: @mcp.tool() -def get_chat_history(chat_name: str, limit: int = 50, offset: int = 0, start_time: str = "", end_time: str = "") -> str: - """获取指定聊天的消息记录。 - - Args: - chat_name: 聊天对象的名字、备注名或wxid,自动模糊匹配 - limit: 返回的消息数量,默认50;支持较大的值,建议配合 offset 分页使用 - offset: 分页偏移量,默认0 - start_time: 起始时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS - end_time: 结束时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS - """ - try: - _validate_pagination(limit, offset, limit_max=None) - start_ts, end_ts = _parse_time_range(start_time, end_time) - except ValueError as e: - return f"错误: {e}" +def get_chat_history(chat_name: str, limit: int = 50, offset: int = 0, start_time: str = "", end_time: str = "") -> str: + """获取指定聊天的消息记录。 + + Args: + chat_name: 聊天对象的名字、备注名或wxid,自动模糊匹配 + limit: 返回的消息数量,默认50;支持较大的值,建议配合 offset 分页使用 + offset: 分页偏移量,默认0 + start_time: 起始时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS + end_time: 结束时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS + """ + try: + _validate_pagination(limit, offset, limit_max=None) + start_ts, end_ts = _parse_time_range(start_time, end_time) + except ValueError as e: + return f"错误: {e}" ctx = _resolve_chat_context(chat_name) if not ctx: @@ -1259,102 +1259,102 @@ def get_chat_history(chat_name: str, limit: int = 50, offset: int = 0, start_tim if not ctx['db_path']: return f"找不到 {ctx['display_name']} 的消息记录(可能在未解密的DB中或无消息)" - names = get_contact_names() - lines, failures = _collect_chat_history_lines( - ctx, - names, - start_ts=start_ts, - end_ts=end_ts, - limit=limit, - offset=offset, - ) - - if not lines: - if failures: - return "查询失败: " + ";".join(failures) - return f"{ctx['display_name']} 无消息记录" - - header = f"{ctx['display_name']} 的消息记录(返回 {len(lines)} 条,offset={offset}, limit={limit})" - if ctx['is_group']: - header += " [群聊]" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header + ":\n\n" + "\n".join(lines) - - -@mcp.tool() -def search_messages( - keyword: str, - chat_name: str | list[str] | None = None, - start_time: str = "", - end_time: str = "", - limit: int = 20, - offset: int = 0, -) -> str: - """搜索消息内容,支持全库、单个聊天对象、多个聊天对象,以及时间范围和分页。 - - Args: - keyword: 搜索关键词 - chat_name: 聊天对象名称,可为空、单个字符串或字符串列表 - start_time: 起始时间,可为空 - end_time: 结束时间,可为空 - limit: 返回的结果数量,默认20,最大500 - offset: 分页偏移量,默认0 - """ - if not keyword or len(keyword) < 1: - return "请提供搜索关键词" - - chat_names = _normalize_chat_names(chat_name) - - try: - _validate_pagination(limit, offset) - start_ts, end_ts = _parse_time_range(start_time, end_time) - except ValueError as e: - return f"错误: {e}" - - if len(chat_names) == 1: - ctx = _resolve_chat_context(chat_names[0]) - if not ctx: - return f"找不到聊天对象: {chat_names[0]}\n提示: 可以用 get_contacts(query='{chat_names[0]}') 搜索联系人" - if not ctx['db_path']: - return f"找不到 {ctx['display_name']} 的消息记录(可能在未解密的DB中或无消息)" - return _search_single_chat( - ctx, - keyword, - start_ts, - end_ts, - start_time, - end_time, - limit, - offset, - ) - - if len(chat_names) > 1: - return _search_multiple_chats( - chat_names, - keyword, - start_ts, - end_ts, - start_time, - end_time, - limit, - offset, - ) - - return _search_all_messages( - keyword, - start_ts, - end_ts, - start_time, - end_time, - limit, - offset, - ) - -@mcp.tool() -def get_contacts(query: str = "", limit: int = 50) -> str: + names = get_contact_names() + lines, failures = _collect_chat_history_lines( + ctx, + names, + start_ts=start_ts, + end_ts=end_ts, + limit=limit, + offset=offset, + ) + + if not lines: + if failures: + return "查询失败: " + ";".join(failures) + return f"{ctx['display_name']} 无消息记录" + + header = f"{ctx['display_name']} 的消息记录(返回 {len(lines)} 条,offset={offset}, limit={limit})" + if ctx['is_group']: + header += " [群聊]" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + ":\n\n" + "\n".join(lines) + + +@mcp.tool() +def search_messages( + keyword: str, + chat_name: str | list[str] | None = None, + start_time: str = "", + end_time: str = "", + limit: int = 20, + offset: int = 0, +) -> str: + """搜索消息内容,支持全库、单个聊天对象、多个聊天对象,以及时间范围和分页。 + + Args: + keyword: 搜索关键词 + chat_name: 聊天对象名称,可为空、单个字符串或字符串列表 + start_time: 起始时间,可为空 + end_time: 结束时间,可为空 + limit: 返回的结果数量,默认20,最大500 + offset: 分页偏移量,默认0 + """ + if not keyword or len(keyword) < 1: + return "请提供搜索关键词" + + chat_names = _normalize_chat_names(chat_name) + + try: + _validate_pagination(limit, offset) + start_ts, end_ts = _parse_time_range(start_time, end_time) + except ValueError as e: + return f"错误: {e}" + + if len(chat_names) == 1: + ctx = _resolve_chat_context(chat_names[0]) + if not ctx: + return f"找不到聊天对象: {chat_names[0]}\n提示: 可以用 get_contacts(query='{chat_names[0]}') 搜索联系人" + if not ctx['db_path']: + return f"找不到 {ctx['display_name']} 的消息记录(可能在未解密的DB中或无消息)" + return _search_single_chat( + ctx, + keyword, + start_ts, + end_ts, + start_time, + end_time, + limit, + offset, + ) + + if len(chat_names) > 1: + return _search_multiple_chats( + chat_names, + keyword, + start_ts, + end_ts, + start_time, + end_time, + limit, + offset, + ) + + return _search_all_messages( + keyword, + start_ts, + end_ts, + start_time, + end_time, + limit, + offset, + ) + +@mcp.tool() +def get_contacts(query: str = "", limit: int = 50) -> str: """搜索或列出微信联系人。 Args: @@ -1397,7 +1397,7 @@ def get_contacts(query: str = "", limit: int = 50) -> str: @mcp.tool() -def get_new_messages() -> str: +def get_new_messages() -> str: """获取自上次调用以来的新消息。首次调用返回最近的会话状态。""" global _last_check_state @@ -1406,14 +1406,14 @@ def get_new_messages() -> str: return "错误: 无法解密 session.db" names = get_contact_names() - with closing(sqlite3.connect(path)) as conn: - rows = conn.execute(""" - SELECT username, unread_count, summary, last_timestamp, - last_msg_type, last_msg_sender, last_sender_display_name - FROM SessionTable - WHERE last_timestamp > 0 - ORDER BY last_timestamp DESC - """).fetchall() + with closing(sqlite3.connect(path)) as conn: + rows = conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable + WHERE last_timestamp > 0 + ORDER BY last_timestamp DESC + """).fetchall() curr_state = {} for r in rows: diff --git a/monitor.py b/monitor.py index 0169b2d..81a3da5 100644 --- a/monitor.py +++ b/monitor.py @@ -7,9 +7,9 @@ import hashlib, struct, os, sys, json, time, sqlite3, io import hmac as hmac_mod from datetime import datetime -from Crypto.Cipher import AES -import zstandard as zstd -from key_utils import get_key_info, strip_key_metadata +from Crypto.Cipher import AES +import zstandard as zstd +from key_utils import get_key_info, strip_key_metadata _zstd_dctx = zstd.ZstdDecompressor() @@ -149,13 +149,13 @@ def main(): print("=" * 60) # 加载密钥 - with open(KEYS_FILE) as f: - keys = strip_key_metadata(json.load(f)) - - session_key_info = get_key_info(keys, os.path.join("session", "session.db")) - if not session_key_info: - print("[ERROR] 找不到session.db的密钥") - sys.exit(1) + with open(KEYS_FILE, "r", encoding="utf-8") as f: + keys = strip_key_metadata(json.load(f)) + + session_key_info = get_key_info(keys, os.path.join("session", "session.db")) + if not session_key_info: + print("[ERROR] 找不到session.db的密钥") + sys.exit(1) enc_key = bytes.fromhex(session_key_info["enc_key"]) session_db = os.path.join(DB_DIR, "session", "session.db") diff --git a/monitor_web.py b/monitor_web.py index e61ac86..7e2b81e 100644 --- a/monitor_web.py +++ b/monitor_web.py @@ -1883,7 +1883,7 @@ def main(): print(" 微信实时监听 (WAL增量 + SSE推送)", flush=True) print("=" * 60, flush=True) - with open(KEYS_FILE) as f: + with open(KEYS_FILE, "r", encoding="utf-8") as f: keys = strip_key_metadata(json.load(f)) session_key_info = get_key_info(keys, os.path.join("session", "session.db"))