-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclash_node.py
More file actions
executable file
·192 lines (164 loc) · 5.94 KB
/
clash_node.py
File metadata and controls
executable file
·192 lines (164 loc) · 5.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/env python3
"""Interactive Clash node selector"""
import json
import os
import re
import shutil
import sys
import termios
import tty
import urllib.request
import urllib.parse
import urllib.error
# Clash directory under the current user's home (``~`` is expanded at import time).
CLASH_DIR = os.path.expanduser("~/.clash")
# Path of the config.yaml inside CLASH_DIR; read_config() opens this file.
CONFIG_PATH = os.path.join(CLASH_DIR, "config.yaml")
def _config_value(line, key):
    """Return the scalar written after ``key:`` on *line*, or None for other lines.

    *line* is expected to be already stripped. Surrounding single or double
    quotes are removed, so ``secret: ''`` yields an empty string rather than a
    stray quote character. For unquoted values a trailing `` # comment`` is
    dropped.
    """
    head = f"{key}:"
    if not line.startswith(head):
        return None
    raw = line[len(head):].strip()
    if raw[:1] in ("'", '"'):
        closing = raw.find(raw[0], 1)
        # Unterminated quote: fall back to everything after the opening quote.
        return raw[1:closing] if closing != -1 else raw[1:]
    if raw.startswith("#"):
        # The whole value is a comment, i.e. the key is empty.
        return ""
    return re.split(r"\s+#", raw, maxsplit=1)[0]


def read_config(path=None):
    """Read the API settings from the Clash config file.

    The file is scanned line by line (no YAML parser is used) for the
    ``external-controller`` and ``secret`` keys; when a key appears more than
    once the last occurrence wins.

    Args:
        path: Config file to read. Defaults to ``CONFIG_PATH``.

    Returns:
        A ``(controller, secret)`` tuple. ``controller`` is a ``host:port``
        string and defaults to ``"127.0.0.1:9090"``; ``secret`` defaults to
        ``""``. A missing file yields the defaults.
    """
    controller = "127.0.0.1:9090"
    secret = ""
    config_file = CONFIG_PATH if path is None else path
    try:
        # YAML is UTF-8; do not depend on the locale's default encoding.
        with open(config_file, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                value = _config_value(line, "external-controller")
                if value:
                    if value.isdigit():
                        # Bare port, e.g. "11314".
                        controller = f"127.0.0.1:{value}"
                    elif value.startswith(":"):
                        # Host omitted, e.g. ":9090".
                        controller = f"127.0.0.1{value}"
                    else:
                        controller = value
                    continue
                value = _config_value(line, "secret")
                if value is not None:
                    secret = value
    except FileNotFoundError:
        # Deliberate best effort: without a config file use the defaults.
        pass
    return controller, secret
def api(controller, secret, path, method="GET", data=None):
    """Send one HTTP request to the Clash API and return its result.

    Args:
        controller: ``host:port`` of the API endpoint.
        secret: Bearer token; no Authorization header is sent when empty.
        path: Request path starting with ``/``, e.g. ``"/proxies"``.
        method: HTTP method.
        data: Optional JSON-serialisable payload sent as the request body.

    Returns:
        The decoded JSON body for ``GET`` requests, otherwise the HTTP status
        code. For non-GET requests an HTTP error status is returned as well,
        so the caller can report it.

    Exits the process with status 1 when the API cannot be reached, or when a
    ``GET`` request is answered with an HTTP error.
    """
    headers = {}
    if secret:
        headers["Authorization"] = f"Bearer {secret}"
    body = None
    # Compare against None so that an empty payload such as {} is still sent.
    if data is not None:
        headers["Content-Type"] = "application/json"
        body = json.dumps(data).encode()
    req = urllib.request.Request(
        f"http://{controller}{path}", data=body, headers=headers, method=method
    )
    try:
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read()) if method == "GET" else resp.status
    except urllib.error.URLError as e:
        # HTTPError subclasses URLError: the server answered, just not with
        # success. Hand the status back instead of reporting a connection error.
        if isinstance(e, urllib.error.HTTPError) and method != "GET":
            return e.code
        print(f"\033[31m连接 Clash API 失败: {e}\033[0m")
        print("请确认 Clash 已启动 (clash_on)")
        sys.exit(1)
def pick(prompt, items):
    """Prompt for a 1-based item number on stdin and return the chosen item.

    Args:
        prompt: Text shown by ``input()``.
        items: Sequence to choose from.

    Returns:
        ``items[number - 1]`` for the entered number.

    Exits with status 0 when the user enters ``q`` and with status 1 (after
    printing an error) when the input is not a number in ``1..len(items)``.
    """
    choice = input(prompt).strip()
    if choice.lower() == "q":
        sys.exit(0)
    try:
        number = int(choice)
    except ValueError:
        number = 0  # Not a number: rejected by the range check below.
    # Explicit range check: 0 or a negative number must not wrap around to the
    # end of the list through Python's negative indexing.
    if not 1 <= number <= len(items):
        print("无效输入")
        sys.exit(1)
    return items[number - 1]
def read_key():
    """Block until a key is pressed on stdin and return it as a string.

    The terminal is put into raw mode for the duration of the read and its
    previous settings are restored afterwards. An ``ESC [`` prefix is returned
    together with the one character that follows it (e.g. ``"\\x1b[A"``); any
    other ``ESC`` prefix is returned with its single following character.
    """
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    try:
        tty.setraw(stdin_fd)
        ch = sys.stdin.read(1)
        if ch != "\x1b":
            return ch
        # Escape prefix: one more character tells which kind of sequence it is.
        follow = sys.stdin.read(1)
        if follow != "[":
            return ch + follow
        return "\x1b[" + sys.stdin.read(1)
    finally:
        # Always restore the terminal, even when the read is interrupted.
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
def render_menu(title, items, index, formatter, hint):
    """Build the text of one menu frame.

    Args:
        title: Heading, rendered in cyan.
        items: All selectable entries.
        index: Position of the highlighted entry.
        formatter: Callable ``(position, item) -> str`` producing an entry label.
        hint: Help line shown below the title.

    Returns:
        The frame as one newline-joined string: title, hint, an empty line and
        the slice of entries that fits the terminal height, scrolled so the
        highlighted entry stays visible.
    """
    terminal_rows = shutil.get_terminal_size((80, 24)).lines
    # Leave room for the header lines; never show fewer than five entries.
    visible = max(5, terminal_rows - 6)
    total = len(items)
    last_start = max(0, total - visible)
    first = max(0, min(index - visible // 2, last_start))
    last = min(total, first + visible)

    frame = [f"\033[36m{title}\033[0m", hint, ""]
    for pos in range(first, last):
        label = formatter(pos, items[pos])
        if pos == index:
            # Highlighted entry: green with a pointer in front.
            frame.append(f"\033[32m❯ {label}\033[0m")
        else:
            frame.append(f"  {label}")
    return "\n".join(frame)
def enter_alt_screen():
    """Switch to the alternate screen and hide the cursor; no-op unless stdout is a TTY."""
    if not sys.stdout.isatty():
        return
    sys.stdout.write("\033[?1049h\033[?25l")
    sys.stdout.flush()
def leave_alt_screen():
    """Show the cursor again and leave the alternate screen; no-op unless stdout is a TTY."""
    if not sys.stdout.isatty():
        return
    sys.stdout.write("\033[?25h\033[?1049l")
    sys.stdout.flush()
def pick_with_cursor(title, items, formatter, initial=0):
    """Let the user choose one of *items* and return it.

    When stdin or stdout is not a TTY the entries are printed as a plain list
    and the choice is read with ``pick()``. Otherwise a full-screen menu is
    shown: arrow keys or ``j``/``k`` move the highlight (wrapping around),
    Enter confirms, ``q`` quits, and typing a number followed by Enter selects
    that entry directly.

    Args:
        title: Menu heading.
        items: Entries to choose from.
        formatter: Callable ``(position, item) -> str`` producing an entry label.
        initial: Position highlighted first; clamped to the valid range.

    Returns:
        The chosen element of *items*.

    Exits with status 0 on ``q``, 130 on Ctrl-C and 1 when stdin reaches EOF.
    """
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        print(title)
        for i, item in enumerate(items, 1):
            print(f" {formatter(i - 1, item)}")
        return pick(f"\n选择 (1-{len(items)}), q 退出: ", items)

    def abort_if_interrupted(key):
        """Exit on Ctrl-C or EOF, which raw mode does not turn into signals/errors."""
        if key == "\x03":
            # Raw mode disables signal generation, so Ctrl-C arrives as a byte.
            sys.exit(130)
        if key == "":
            # EOF on stdin: nothing more can be read, so do not spin forever.
            sys.exit(1)

    index = max(0, min(initial, len(items) - 1))
    hint = "↑/↓ 或 j/k 移动,Enter 确认,q 退出"
    enter_alt_screen()
    try:
        while True:
            # Home the cursor and clear the screen before drawing the frame.
            print("\033[H\033[J" + render_menu(title, items, index, formatter, hint), end="", flush=True)
            key = read_key()
            abort_if_interrupted(key)
            if key in ("\x1b[A", "k"):
                index = (index - 1) % len(items)
            elif key in ("\x1b[B", "j"):
                index = (index + 1) % len(items)
            elif key in ("\r", "\n"):
                return items[index]
            elif key.lower() == "q":
                sys.exit(0)
            elif key.isdecimal():
                # isdecimal() only accepts characters that int() can parse
                # (isdigit() also accepts e.g. superscript digits).
                digits = [key]
                while True:
                    next_key = read_key()
                    abort_if_interrupted(next_key)
                    if next_key in ("\r", "\n"):
                        break
                    if not next_key.isdecimal():
                        # Any other key cancels the typed number.
                        digits = []
                        break
                    digits.append(next_key)
                if digits:
                    selected = int("".join(digits)) - 1
                    if 0 <= selected < len(items):
                        return items[selected]
    finally:
        # Restore the normal screen on every exit path, including sys.exit().
        leave_alt_screen()
def main():
    """Interactively switch the active node of a Clash ``Selector`` proxy group.

    Reads the API settings, fetches ``/proxies``, lets the user pick a group
    of type ``Selector`` and then one of its nodes, and finally sends a PUT
    request for that group with the chosen node name. The outcome is printed.
    """
    controller, secret = read_config()
    data = api(controller, secret, "/proxies")
    proxies = data.get("proxies", {})
    groups = [
        (name, info)
        for name, info in proxies.items()
        if info.get("type") == "Selector"
    ]
    if not groups:
        print("没有找到可切换的代理组")
        return

    # Select group
    group_name, group_info = pick_with_cursor(
        "代理组",
        groups,
        lambda i, item: f"{i + 1}. {item[0]} [{item[1].get('now', 'N/A')}]",
    )

    # Select node
    nodes = group_info.get("all", [])
    if not nodes:
        # An empty list cannot be shown in the menu (it would fail on the
        # first key press), so report it and stop.
        print(f"代理组 {group_name} 没有可选节点")
        return
    current = group_info.get("now", "")
    # Start with the highlight on the node that is active right now.
    current_index = nodes.index(current) if current in nodes else 0
    node_name = pick_with_cursor(
        f"{group_name} 节点列表",
        nodes,
        lambda i, item: f"{i + 1}. {item}" + (" <- 当前" if item == current else ""),
        initial=current_index,
    )

    # Switch
    # Quote everything (safe='') so that "/" in a group name stays in one path segment.
    path = f"/proxies/{urllib.parse.quote(group_name, safe='')}"
    status = api(controller, secret, path, method="PUT", data={"name": node_name})
    if status in (200, 204):
        print(f"\n\033[32m[✓] 已切换: {group_name} -> {node_name}\033[0m")
    else:
        print(f"\n\033[31m[✗] 切换失败 (HTTP {status})\033[0m")
# Run the selector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()