diff --git a/debug_caps.py b/debug_caps.py new file mode 100644 index 0000000..ab96f68 --- /dev/null +++ b/debug_caps.py @@ -0,0 +1,37 @@ + +import os +import shutil +import subprocess +import sys + +# Locate getcap +getcap = shutil.which("getcap") +if not getcap: + possible_paths = ["/usr/sbin/getcap", "/sbin/getcap"] + for p in possible_paths: + if os.path.exists(p): + getcap = p + break + +print(f"Using getcap: {getcap}") + +# Binaries +bin_dir = os.path.abspath("bin/linux-x86_64") +singbox = os.path.join(bin_dir, "sing-box") +xray = os.path.join(bin_dir, "xray") + +for binary in [singbox, xray]: + print(f"Checking: {binary}") + if os.path.exists(binary): + cmd = [getcap, binary] + try: + res = subprocess.run(cmd, capture_output=True, text=True) + print(f"Result ({res.returncode}): {res.stdout.strip()}") + if "cap_net_admin" in res.stdout: + print("HAS CAP_NET_ADMIN") + else: + print("MISSING CAP_NET_ADMIN") + except Exception as e: + print(f"Error: {e}") + else: + print("Does not exist") diff --git a/debug_tray.py b/debug_tray.py new file mode 100644 index 0000000..c3c84d1 --- /dev/null +++ b/debug_tray.py @@ -0,0 +1,75 @@ + +import sys +# Inject system packages if needed (mimicking main.py) +import os +if sys.platform.startswith("linux"): + system_packages = "/usr/lib/python3/dist-packages" + if os.path.exists(system_packages) and system_packages not in sys.path: + try: + import gi + except ImportError: + sys.path.append(system_packages) + +import pystray +from PIL import Image, ImageDraw + +def create_image(): + # Generate an image with a specific color + width = 64 + height = 64 + color1 = "blue" + color2 = "white" + image = Image.new('RGB', (width, height), color1) + dc = ImageDraw.Draw(image) + dc.rectangle((width // 2, 0, width, height // 2), fill=color2) + dc.rectangle((0, height // 2, width // 2, height), fill=color2) + return image + +def setup(icon): + icon.visible = True + +def on_quit(icon, item): + icon.stop() + +def main(): + print(f"Python: {sys.version}") + + try: + import gi + print(f"PyGObject (gi) imported: {gi.__file__}") + try: + gi.require_version('AppIndicator3', '0.1') + from gi.repository import AppIndicator3 + print("AppIndicator3 found via gi") + except Exception as e: + print(f"AppIndicator3 check failed: {e}") + + try: + gi.require_version('AyatanaAppIndicator3', '0.1') + from gi.repository import AyatanaAppIndicator3 + print("AyatanaAppIndicator3 found via gi") + except Exception as e: + print(f"AyatanaAppIndicator3 check failed: {e}") + + except ImportError: + print("PyGObject (gi) NOT found") + + image = create_image() + + # Try to force AppIndicator backend explicitly to see if it works validation + # os.environ['PYSTRAY_BACKEND'] = 'appindicator' + + print("Creating icon...") + icon = pystray.Icon( + 'test_icon', + image, + menu=pystray.Menu( + pystray.MenuItem('Quit', on_quit) + ) + ) + + print("Running icon... (Check your system tray)") + icon.run(setup) + +if __name__ == "__main__": + main() diff --git a/scripts/download_binaries.py b/scripts/download_binaries.py index b3b692f..a76122d 100644 --- a/scripts/download_binaries.py +++ b/scripts/download_binaries.py @@ -1,22 +1,27 @@ #!/usr/bin/env python3 """ -Download Xray and Singbox binaries for Windows from GitHub releases. +Download Xray and Singbox binaries from GitHub releases. This script: 1. Reads versions from .env file 2. Downloads from official GitHub releases -3. Extracts to bin/ folder -4. Supports both 32-bit and 64-bit architectures -5. Ensures Windows 7+ compatibility +3. Extracts to bin/ folder with platform-specific subdirectories +4. Supports Windows (32/64-bit) and Linux (x86_64/arm64) +5. Sets executable permissions on Linux/macOS Usage: - python scripts/download_binaries.py - python scripts/download_binaries.py --arch 32 + python scripts/download_binaries.py # Auto-detect platform + python scripts/download_binaries.py --platform linux # Force Linux + python scripts/download_binaries.py --platform windows --arch 32 """ import argparse import os +import platform +import shutil +import stat import sys +import tarfile import urllib.request import zipfile from pathlib import Path @@ -31,27 +36,86 @@ load_dotenv(PROJECT_ROOT / ".env") # Configuration -BIN_DIR = PROJECT_ROOT / "bin" TEMP_DIR = PROJECT_ROOT / "temp_downloads" -# GitHub release URLs -XRAY_URL_TEMPLATE = "https://github.com/XTLS/Xray-core/releases/download/v{version}/Xray-windows-{arch}.zip" -SINGBOX_URL_TEMPLATE = ( +# GitHub release URLs - Windows +XRAY_WINDOWS_URL = "https://github.com/XTLS/Xray-core/releases/download/v{version}/Xray-windows-{arch}.zip" +SINGBOX_WINDOWS_URL = ( "https://github.com/SagerNet/sing-box/releases/download/v{version}/sing-box-{version}-windows-amd{arch}.zip" ) +# GitHub release URLs - Linux +XRAY_LINUX_X64_URL = "https://github.com/XTLS/Xray-core/releases/download/v{version}/Xray-linux-64.zip" +XRAY_LINUX_ARM64_URL = "https://github.com/XTLS/Xray-core/releases/download/v{version}/Xray-linux-arm64-v8a.zip" +SINGBOX_LINUX_X64_URL = ( + "https://github.com/SagerNet/sing-box/releases/download/v{version}/sing-box-{version}-linux-amd64.tar.gz" +) +SINGBOX_LINUX_ARM64_URL = ( + "https://github.com/SagerNet/sing-box/releases/download/v{version}/sing-box-{version}-linux-arm64.tar.gz" +) + + +def detect_platform() -> str: + """Detect current platform.""" + system = platform.system().lower() + if system == "windows" or os.name == "nt": + return "windows" + elif system == "darwin": + return "macos" + else: + return "linux" -def get_config(arch: int = 64): + +def detect_architecture() -> str: + """Detect CPU architecture.""" + machine = platform.machine().lower() + if machine in ("amd64", "x86_64", "x64"): + return "x86_64" + elif machine in ("arm64", "aarch64", "arm64-v8a"): + return "arm64" + elif machine in ("i386", "i686", "x86"): + return "x86" + return "x86_64" # Default + + +def get_config(target_platform: str, arch: str): """Get configuration from environment.""" - xray_version = os.getenv("XRAY_VERSION", "1.8.24") - singbox_version = os.getenv("SINGBOX_VERSION", "1.10.6") + xray_version = os.getenv("XRAY_VERSION", "25.12.8") + singbox_version = os.getenv("SINGBOX_VERSION", "1.12.14") + + # Determine bin directory based on platform + if target_platform == "windows": + bin_dir = PROJECT_ROOT / "bin" + xray_url = XRAY_WINDOWS_URL.format(version=xray_version, arch=arch) + singbox_url = SINGBOX_WINDOWS_URL.format(version=singbox_version, arch=arch) + xray_exe = "xray.exe" + singbox_exe = "sing-box.exe" + elif target_platform == "linux": + # Platform-specific bin directory + arch_dir = "linux-x86_64" if arch == "x86_64" else "linux-arm64" + bin_dir = PROJECT_ROOT / "bin" / arch_dir + + if arch == "arm64": + xray_url = XRAY_LINUX_ARM64_URL.format(version=xray_version) + singbox_url = SINGBOX_LINUX_ARM64_URL.format(version=singbox_version) + else: + xray_url = XRAY_LINUX_X64_URL.format(version=xray_version) + singbox_url = SINGBOX_LINUX_X64_URL.format(version=singbox_version) + xray_exe = "xray" + singbox_exe = "sing-box" + else: + raise ValueError(f"Unsupported platform: {target_platform}") return { "xray_version": xray_version, "singbox_version": singbox_version, + "platform": target_platform, "arch": arch, - "xray_url": XRAY_URL_TEMPLATE.format(version=xray_version, arch=arch), - "singbox_url": SINGBOX_URL_TEMPLATE.format(version=singbox_version, arch=arch), + "bin_dir": bin_dir, + "xray_url": xray_url, + "singbox_url": singbox_url, + "xray_exe": xray_exe, + "singbox_exe": singbox_exe, } @@ -72,15 +136,18 @@ def progress(block_num, block_size, total_size): return dest -def extract_zip(zip_path: Path, extract_dir: Path, executable_name: str): - """Extract specific executable from zip.""" +def extract_zip(zip_path: Path, extract_dir: Path, executable_name: str, extra_files: list = None): + """Extract executable and optional extra files from zip.""" print(f" [EXTRACT] {zip_path.name}") + if extra_files is None: + extra_files = [] + with zipfile.ZipFile(zip_path, "r") as z: - # Find the executable + # Find and extract the executable exe_found = False for file in z.namelist(): - if file.endswith(executable_name): + if file.endswith(executable_name) or file == executable_name: # Extract to bin directory z.extract(file, extract_dir) @@ -91,6 +158,58 @@ def extract_zip(zip_path: Path, extract_dir: Path, executable_name: str): if extracted_path != final_path: extracted_path.replace(final_path) + # Set executable permission on Unix + if os.name != "nt": + st = os.stat(final_path) + os.chmod(final_path, st.st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH) + + print(f" [OK] Extracted {executable_name}") + exe_found = True + break + + if not exe_found: + print(f" [WARNING] {executable_name} not found in archive!") + + # Extract extra files (like geoip.dat, geosite.dat) + for extra_file in extra_files: + for file in z.namelist(): + if file.endswith(extra_file) or file == extra_file: + z.extract(file, extract_dir) + extracted_path = extract_dir / file + final_path = extract_dir / extra_file + if extracted_path != final_path: + extracted_path.replace(final_path) + print(f" [OK] Extracted {extra_file}") + break + + +def extract_tarball(tar_path: Path, extract_dir: Path, executable_name: str): + """Extract specific executable from tar.gz.""" + print(f" [EXTRACT] {tar_path.name}") + + with tarfile.open(tar_path, "r:gz") as tar: + # Find the executable + exe_found = False + for member in tar.getmembers(): + if member.name.endswith(executable_name) or os.path.basename(member.name) == executable_name: + # Extract to temp location + tar.extract(member, extract_dir) + + # Move to root of bin dir + extracted_path = extract_dir / member.name + final_path = extract_dir / executable_name + + if extracted_path != final_path: + shutil.move(str(extracted_path), str(final_path)) + # Clean up nested directories + nested_dir = extract_dir / member.name.split("/")[0] + if nested_dir.exists() and nested_dir.is_dir(): + shutil.rmtree(nested_dir, ignore_errors=True) + + # Set executable permission + st = os.stat(final_path) + os.chmod(final_path, st.st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH) + print(f" [OK] Extracted {executable_name}") exe_found = True break @@ -101,8 +220,6 @@ def extract_zip(zip_path: Path, extract_dir: Path, executable_name: str): def cleanup(temp_dir: Path): """Remove temporary files.""" - import shutil - if temp_dir.exists(): print(f"\n[CLEANUP] Removing {temp_dir}") shutil.rmtree(temp_dir) @@ -110,45 +227,102 @@ def cleanup(temp_dir: Path): def main(): parser = argparse.ArgumentParser(description="Download Xray and Singbox binaries") - parser.add_argument("--arch", type=int, choices=[32, 64], default=64, help="Architecture: 32 or 64 (default: 64)") + parser.add_argument( + "--platform", + type=str, + choices=["windows", "linux"], + default=None, + help="Target platform (default: auto-detect)", + ) + parser.add_argument( + "--arch", + type=str, + choices=["32", "64", "x86_64", "arm64"], + default=None, + help="Architecture (default: auto-detect)", + ) args = parser.parse_args() + # Auto-detect platform and architecture if not specified + target_platform = args.platform or detect_platform() + + if args.arch: + if args.arch in ("64", "x86_64"): + arch = "x86_64" + elif args.arch == "arm64": + arch = "arm64" + else: + arch = args.arch # "32" for Windows + else: + arch = detect_architecture() + + # For Windows, convert architecture to 32/64 format + if target_platform == "windows": + if arch in ("x86_64", "64"): + arch = "64" + elif arch in ("x86", "32"): + arch = "32" + else: + arch = "64" + print("=" * 60) print("Downloading Xray and Singbox Binaries") print("=" * 60) # Get configuration - config = get_config(args.arch) + config = get_config(target_platform, arch) - print(f"\nConfiguration:") # noqa: F541 + print(f"\nConfiguration:") print(f" Xray Version: {config['xray_version']}") print(f" Singbox Version: {config['singbox_version']}") - print(f" Architecture: {config['arch']}-bit") + print(f" Platform: {config['platform']}") + print(f" Architecture: {config['arch']}") + print(f" Bin Directory: {config['bin_dir']}") # Create directories TEMP_DIR.mkdir(parents=True, exist_ok=True) - BIN_DIR.mkdir(parents=True, exist_ok=True) + config["bin_dir"].mkdir(parents=True, exist_ok=True) try: # Download Xray print("\n[STEP 1] Downloading Xray...") - xray_zip = TEMP_DIR / f"xray-{config['xray_version']}.zip" - download_file(config["xray_url"], xray_zip) - extract_zip(xray_zip, BIN_DIR, "xray.exe") + xray_ext = ".zip" # Xray always uses zip + xray_archive = TEMP_DIR / f"xray-{config['xray_version']}{xray_ext}" + download_file(config["xray_url"], xray_archive) + # Extract xray executable and geo data files + extract_zip( + xray_archive, + config["bin_dir"], + config["xray_exe"], + extra_files=["geoip.dat", "geosite.dat"] + ) # Download Singbox print("\n[STEP 2] Downloading Singbox...") - singbox_zip = TEMP_DIR / f"singbox-{config['singbox_version']}.zip" - download_file(config["singbox_url"], singbox_zip) - extract_zip(singbox_zip, BIN_DIR, "sing-box.exe") + if target_platform == "linux": + singbox_archive = TEMP_DIR / f"singbox-{config['singbox_version']}.tar.gz" + download_file(config["singbox_url"], singbox_archive) + extract_tarball(singbox_archive, config["bin_dir"], config["singbox_exe"]) + else: + singbox_archive = TEMP_DIR / f"singbox-{config['singbox_version']}.zip" + download_file(config["singbox_url"], singbox_archive) + extract_zip(singbox_archive, config["bin_dir"], config["singbox_exe"]) print("\n" + "=" * 60) print("DOWNLOAD COMPLETE!") - print(f"Binaries location: {BIN_DIR}") + print(f"Binaries location: {config['bin_dir']}") print("=" * 60) + # List downloaded files + print("\nDownloaded files:") + for f in config["bin_dir"].iterdir(): + size_mb = f.stat().st_size / (1024 * 1024) + print(f" {f.name}: {size_mb:.2f} MB") + except Exception as e: print(f"\n[ERROR] Download failed: {e}") + import traceback + traceback.print_exc() return 1 finally: cleanup(TEMP_DIR) diff --git a/src/core/constants.py b/src/core/constants.py index ba90988..24c0478 100644 --- a/src/core/constants.py +++ b/src/core/constants.py @@ -5,7 +5,7 @@ # Load environment variables from .env file from dotenv import load_dotenv -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Platform, PlatformUtils # Load .env from project root _project_root = Path(__file__).parent.parent.parent @@ -28,10 +28,10 @@ PLACEHOLDER_SINGBOX_PID = int(os.getenv("PLACEHOLDER_SINGBOX_PID", "-999997")) # Temporary directory (cross-platform) -if PlatformUtils.get_platform() == "windows": +if PlatformUtils.get_platform() == Platform.WINDOWS: # Windows: Use system temp directory TMPDIR = os.path.join(tempfile.gettempdir(), "xenray") -elif PlatformUtils.get_platform() == "macos": +elif PlatformUtils.get_platform() == Platform.MACOS: # macOS: Use system temp or user cache directory TMPDIR = os.path.join(os.path.expanduser("~/Library/Caches"), "xenray") else: diff --git a/src/main.py b/src/main.py index fb32a3c..abc053d 100644 --- a/src/main.py +++ b/src/main.py @@ -7,6 +7,17 @@ if __name__ == "__main__": sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +# [Linux Fix] Inject system site-packages to allow access to python3-gi (PyGObject) +# This is required for pystray to use AppIndicator backend on Linux +if sys.platform.startswith("linux"): + system_packages = "/usr/lib/python3/dist-packages" + if os.path.exists(system_packages) and system_packages not in sys.path: + # Check if gi is already available (e.g. key system dependencies) + try: + import gi # noqa: F401 + except ImportError: + sys.path.append(system_packages) + import asyncio from src.core.constants import EARLY_LOG_FILE @@ -105,10 +116,10 @@ def run(): # Import PlatformUtils - works for both script and PyInstaller # PyInstaller bundles these as hidden-imports - from src.utils.platform_utils import PlatformUtils + from src.utils.platform_utils import Platform, PlatformUtils # Platform-specific singleton check - if PlatformUtils.get_platform() == "windows": + if PlatformUtils.get_platform() == Platform.WINDOWS: kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) mutex_name = "Global\\XenRay_Singleton_Mutex_v1" diff --git a/src/services/app_update_service.py b/src/services/app_update_service.py index 786695a..e9d04b9 100644 --- a/src/services/app_update_service.py +++ b/src/services/app_update_service.py @@ -14,7 +14,7 @@ from loguru import logger from src.core.constants import APP_VERSION, GITHUB_REPO, UPDATE_DOWNLOAD_TIMEOUT, UPDATE_MIN_FILE_SIZE -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Architecture, Platform, PlatformUtils # API URL constructed from configurable repo GITHUB_API_URL = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest" @@ -127,14 +127,14 @@ def _find_asset_url(assets: list) -> Optional[str]: arch = PlatformUtils.get_architecture() # Determine asset name pattern - if platform == "windows": - if arch == "x86_64": + if platform == Platform.WINDOWS: + if arch == Architecture.X86_64: pattern = "windows-x64.zip" else: pattern = "windows-x86.zip" - elif platform == "linux": + elif platform == Platform.LINUX: pattern = "linux.AppImage" # Future support - elif platform == "macos": + elif platform == Platform.MACOS: pattern = "macos.dmg" # Future support else: return None diff --git a/src/services/monitoring/service.py b/src/services/monitoring/service.py index 0247526..621a6af 100644 --- a/src/services/monitoring/service.py +++ b/src/services/monitoring/service.py @@ -85,7 +85,7 @@ def __init__( # Create ActiveConnectivityMonitor (metrics-based, VPN mode only) # Emits ACTIVE_LOST, ACTIVE_RESTORED, ACTIVE_DEGRADED signals - metrics_provider = ClashAPIProvider(port=9090) + metrics_provider = ClashAPIProvider(port=9099) self._active_monitor = ActiveConnectivityMonitor( metrics_provider=metrics_provider, on_connectivity_lost=lambda: self._emit_signal(MonitorSignal.ACTIVE_LOST), diff --git a/src/services/singbox_metrics_provider.py b/src/services/singbox_metrics_provider.py index c74f40e..19eb343 100644 --- a/src/services/singbox_metrics_provider.py +++ b/src/services/singbox_metrics_provider.py @@ -37,7 +37,7 @@ class ClashAPIProvider: The /traffic endpoint is streaming and not suitable for polling. """ - DEFAULT_PORT = 9090 + DEFAULT_PORT = 9099 TIMEOUT = 2.0 def __init__(self, port: int = None): @@ -98,7 +98,7 @@ class DebugVarsProvider: Note: Only available if explicitly enabled in sing-box build. """ - DEFAULT_PORT = 9090 + DEFAULT_PORT = 9099 TIMEOUT = 2.0 def __init__(self, port: int = None): diff --git a/src/services/singbox_service.py b/src/services/singbox_service.py index e6f2885..c2dca4b 100644 --- a/src/services/singbox_service.py +++ b/src/services/singbox_service.py @@ -20,7 +20,7 @@ XRAY_EXECUTABLE, ) from src.utils.network_interface import NetworkInterfaceDetector -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Platform, PlatformUtils from src.utils.process_utils import ProcessUtils # Constants @@ -116,13 +116,19 @@ def _add_static_route(self, ip: str, gateway: str) -> None: """Add static route for IP via gateway.""" if ip in self._added_routes: return + + # On Linux, if we are not root (even with caps), we cannot use 'ip route' command. + # We must rely on sing-box internal routing (bind_interface/auto_route). + platform = PlatformUtils.get_platform() + if platform == Platform.LINUX and not ProcessUtils.is_admin(): + logger.debug(f"[SingboxService] Non-root user: skipping manual route for {ip} (relying on sing-box auto-route)") + return + try: logger.info(f"[SingboxService] Adding static route: {ip} → {gateway}") # Platform-specific route commands - platform = PlatformUtils.get_platform() - - if platform == "windows": + if platform == Platform.WINDOWS: cmd = [ "route", "add", @@ -133,7 +139,7 @@ def _add_static_route(self, ip: str, gateway: str) -> None: "metric", "1", ] - elif platform == "macos": + elif platform == Platform.MACOS: cmd = [ "route", "-n", @@ -142,7 +148,7 @@ def _add_static_route(self, ip: str, gateway: str) -> None: ip, gateway, ] - else: # Linux + else: # Linux (Root) cmd = [ "ip", "route", @@ -166,14 +172,19 @@ def _cleanup_routes(self) -> None: """Remove all added static routes.""" platform = PlatformUtils.get_platform() + # On Linux non-root, we didn't add routes manually, so nothing to clean up manually. + if platform == Platform.LINUX and not ProcessUtils.is_admin(): + self._added_routes.clear() + return + for ip in self._added_routes[:]: try: logger.debug(f"[SingboxService] Removing static route: {ip}") # Platform-specific route delete commands - if platform == "windows": + if platform == Platform.WINDOWS: cmd = ["route", "delete", ip] - elif platform == "macos": + elif platform == Platform.MACOS: cmd = ["route", "-n", "delete", "-host", ip] else: # Linux cmd = ["ip", "route", "del", ip] @@ -413,7 +424,7 @@ def _generate_config( "log": {"level": "info", "timestamp": True}, "experimental": { "clash_api": { - "external_controller": "127.0.0.1:9090", + "external_controller": "127.0.0.1:9099", "secret": "", } }, @@ -452,7 +463,10 @@ def _generate_config( "mtu": mtu, "auto_route": True, "strict_route": True, - "stack": "system", + # Stack selection by platform: + # - gvisor: Userspace stack, reliable for Linux (handles TCP properly) + # - system: Native kernel stack, fast on Windows/macOS + "stack": "gvisor" if PlatformUtils.get_platform() == Platform.LINUX else "system", "sniff": True, "sniff_override_destination": True, "endpoint_independent_nat": True, @@ -486,10 +500,16 @@ def _generate_config( "outbound": "direct", }, {"process_path": [XRAY_EXECUTABLE], "outbound": "direct"}, + # DNS hijacking - MUST come before private IP bypass { "protocol": "dns", "action": "hijack-dns", }, + # Port 53 fallback for DNS not detected by protocol sniff + { + "port": 53, + "action": "hijack-dns", + }, {"network": "udp", "port": 443, "outbound": "proxy"}, {"ip_cidr": ["224.0.0.0/3", "ff00::/8"], "outbound": "block"}, { diff --git a/src/services/task_scheduler.py b/src/services/task_scheduler.py index c9b7e37..3d41d99 100644 --- a/src/services/task_scheduler.py +++ b/src/services/task_scheduler.py @@ -12,7 +12,7 @@ from loguru import logger -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Platform, PlatformUtils from src.utils.process_utils import ProcessUtils TASK_NAME = "XenRayStartup" @@ -22,7 +22,7 @@ def _get_startupinfo(): """Get STARTUPINFO to hide console window on Windows.""" - if PlatformUtils.get_platform() == "windows": + if PlatformUtils.get_platform() == Platform.WINDOWS: startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = 0 # SW_HIDE @@ -234,4 +234,4 @@ def unregister_task() -> tuple[bool, str]: def is_supported() -> bool: """Check if startup management is supported on this platform.""" - return PlatformUtils.get_platform() == "windows" + return PlatformUtils.get_platform() == Platform.WINDOWS diff --git a/src/services/xray_installer.py b/src/services/xray_installer.py index 42cafad..381f05d 100644 --- a/src/services/xray_installer.py +++ b/src/services/xray_installer.py @@ -10,7 +10,7 @@ from loguru import logger from src.core.constants import BIN_DIR, XRAY_EXECUTABLE -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Architecture, Platform, PlatformUtils # Constants DOWNLOAD_TIMEOUT = 30.0 # seconds @@ -92,18 +92,18 @@ def _download_core(progress_callback: Optional[Callable[[str], None]] = None) -> try: # Determine architecture arch = PlatformUtils.get_architecture() - if arch == "x86_64": + if arch == Architecture.X86_64: arch_str = "64" - elif arch == "arm64": + elif arch == Architecture.ARM64: arch_str = "arm64-v8a" else: arch_str = "32" # Determine OS name platform = PlatformUtils.get_platform() - if platform == "windows": + if platform == Platform.WINDOWS: os_name = "windows" - elif platform == "macos": + elif platform == Platform.MACOS: os_name = "macos" else: os_name = "linux" diff --git a/src/ui/components/admin_restart_dialog.py b/src/ui/components/admin_restart_dialog.py index 05abce0..3270108 100644 --- a/src/ui/components/admin_restart_dialog.py +++ b/src/ui/components/admin_restart_dialog.py @@ -2,23 +2,45 @@ import flet as ft +from src.utils.platform_utils import Platform, PlatformUtils + class AdminRestartDialog(ft.AlertDialog): """Shows an AlertDialog asking the user to restart the app as Admin.""" def __init__(self, on_restart: callable): self._on_restart_callback = on_restart + platform = PlatformUtils.get_platform() - super().__init__( - modal=True, - title=ft.Text("Admin Rights Required"), - content=ft.Text( - "VPN mode requires Administrator privileges.\n\nDo you want to restart the application as Admin?" - ), - actions=[ + # Different content for Linux (can't auto-restart with GUI) + # Linux now supports pkexec restart + if platform == Platform.LINUX: + title_text = "Root Privileges Required" + content_text = ( + "VPN mode requires root privileges (or capabilities).\n" + "The application needs to restart with sudo/pkexec.\n\n" + "Do you want to restart now?" + ) + actions = [ + ft.TextButton("Cancel", on_click=self._close_dlg), + ft.TextButton("Restart", on_click=self._confirm_restart), + ] + else: + title_text = "Admin Rights Required" + content_text = ( + "VPN mode requires Administrator privileges.\n\n" + "Do you want to restart the application as Admin?" + ) + actions = [ ft.TextButton("Cancel", on_click=self._close_dlg), ft.TextButton("Restart", on_click=self._confirm_restart), - ], + ] + + super().__init__( + modal=True, + title=ft.Text(title_text), + content=ft.Text(content_text), + actions=actions, actions_alignment=ft.MainAxisAlignment.END, ) diff --git a/src/ui/handlers/systray_handler.py b/src/ui/handlers/systray_handler.py index 1e701fb..394e479 100644 --- a/src/ui/handlers/systray_handler.py +++ b/src/ui/handlers/systray_handler.py @@ -11,6 +11,7 @@ from src.core.constants import APPDIR from src.core.i18n import t from src.core.logger import logger +from src.utils.platform_utils import Platform, PlatformUtils if TYPE_CHECKING: from src.ui.main_window import MainWindow @@ -47,21 +48,39 @@ def _load_icon(self) -> Image.Image: """Load the application icon using Pillow.""" icon_path = os.path.join(APPDIR, "assets", "icon.png") if not os.path.exists(icon_path): - # Fallback to ico if png doesn't exist (though pystray handles PIL images best) + # Fallback to ico if png doesn't exist icon_path = os.path.join(APPDIR, "assets", "icon.ico") try: - return Image.open(icon_path) + logger.debug(f"[Systray] Loading icon from: {icon_path}") + img = Image.open(icon_path) + + # On Linux, ensure the icon is in RGBA mode and properly sized + # Some Linux desktop environments require specific icon formats + if PlatformUtils.get_platform() == Platform.LINUX: + # Convert to RGBA if needed + if img.mode != "RGBA": + img = img.convert("RGBA") + + # Resize to a standard tray icon size (22x22 or 24x24 is common) + # Some Linux DEs may not display large icons correctly + img = img.resize((48, 48), Image.Resampling.LANCZOS) + logger.debug(f"[Systray] Linux icon: mode={img.mode}, size={img.size}") + + return img except Exception as e: logger.error(f"Failed to load tray icon: {e}") - # Create a blank image as last resort - return Image.new("RGBA", (64, 64), (0, 0, 0, 0)) + # Create a visible fallback icon (purple square) instead of transparent + fallback = Image.new("RGBA", (48, 48), (128, 58, 237, 255)) # Purple + return fallback def _init_tray(self): """Initialize and start the tray icon.""" if self._icon: return + logger.debug(f"[Systray] Initializing tray icon on {PlatformUtils.get_platform()}") + self._icon = pystray.Icon( name="XenRay", icon=self._icon_image, @@ -72,10 +91,12 @@ def _init_tray(self): # Start tray in a separate thread or detached try: self._icon.run_detached() + logger.debug("[Systray] Icon running in detached mode") except Exception as e: logger.warning(f"run_detached failed, falling back to thread: {e}") self._tray_thread = threading.Thread(target=self._icon.run, daemon=True) self._tray_thread.start() + logger.debug("[Systray] Icon running in threaded mode") def _create_menu(self) -> pystray.Menu: """Create the tray context menu.""" @@ -110,13 +131,39 @@ def update_state(self): def update_title(self, title: str): """Update the hover tooltip.""" if self._icon: - self._icon.title = f"XenRay - {title}" + # Sanitize title for pystray compatibility on Linux + # pystray on Linux (using GTK/AppIndicator) may fail with non-ASCII chars + try: + # Try to encode as latin-1, replacing problematic chars + safe_title = title.encode("latin-1", errors="replace").decode("latin-1") + except (UnicodeEncodeError, UnicodeDecodeError): + safe_title = title.encode("ascii", errors="replace").decode("ascii") + self._icon.title = f"XenRay - {safe_title}" def stop(self): """Shutdown the tray icon.""" if self._icon: - self._icon.stop() - self._icon = None + try: + # On Linux, pystray can hang during stop, so do it in a + # non-blocking way with a timeout + if PlatformUtils.get_platform() == Platform.LINUX: + def _stop_icon(): + try: + self._icon.stop() + except Exception: + pass + + stop_thread = threading.Thread(target=_stop_icon, daemon=True) + stop_thread.start() + stop_thread.join(timeout=1.0) # Wait max 1 second + if stop_thread.is_alive(): + logger.debug("[Systray] Icon stop timed out, continuing anyway") + else: + self._icon.stop() + except Exception as e: + logger.debug(f"[Systray] Stop error (ignoring): {e}") + finally: + self._icon = None # --- Callbacks --- def _on_open(self, icon, item): @@ -147,8 +194,11 @@ def _on_toggle_connect(self, icon, item): def _on_exit(self, icon, item): """Final exit callback.""" try: - # This will trigger the full cleanup and exit logic - icon.stop() + # Stop icon first (non-blocking on Linux) + try: + icon.stop() + except Exception: + pass # We use the MainWindow's cleanup or call ProcessUtils from src.utils.process_utils import ProcessUtils diff --git a/src/ui/main_window.py b/src/ui/main_window.py index ca6846c..c7e879b 100644 --- a/src/ui/main_window.py +++ b/src/ui/main_window.py @@ -9,7 +9,7 @@ # Local modules from src.core.app_context import AppContext from src.core.connection_manager import ConnectionManager -from src.core.constants import APPDIR, FONT_URLS +from src.core.constants import APPDIR, FONT_URLS, SINGBOX_EXECUTABLE, XRAY_EXECUTABLE from src.core.i18n import t from src.core.logger import logger from src.core.types import ConnectionMode @@ -302,7 +302,7 @@ def _on_connect_clicked_impl(self, e=None): # Admin Check for VPN Mode if not self._is_running: if self._current_mode == ConnectionMode.VPN: - if not ProcessUtils.is_admin(): + if not ProcessUtils.has_vpn_privileges([SINGBOX_EXECUTABLE, XRAY_EXECUTABLE]): # CALL THE NEW CLASS METHOD self._show_admin_restart_dialog() return # Stop execution if admin restart is needed @@ -438,9 +438,12 @@ def _on_profile_updated(self, updated_profile: dict): def _on_mode_changed(self, mode: ConnectionMode): from src.utils.process_utils import ProcessUtils - if mode == ConnectionMode.VPN and not ProcessUtils.is_admin(): - self._show_toast(t("status.admin_required"), "warning") - return + if mode == ConnectionMode.VPN: + has_privs = ProcessUtils.has_vpn_privileges([SINGBOX_EXECUTABLE, XRAY_EXECUTABLE]) + logger.debug(f"[_on_mode_changed] Switching to VPN. Privileges check: {has_privs}") + if not has_privs: + self._show_toast(t("status.admin_required"), "warning") + return self._current_mode = mode self._app_context.settings.set_connection_mode("vpn" if mode == ConnectionMode.VPN else "proxy") diff --git a/src/utils/admin_utils.py b/src/utils/admin_utils.py index 4a9804f..4314d4f 100644 --- a/src/utils/admin_utils.py +++ b/src/utils/admin_utils.py @@ -25,13 +25,13 @@ def check_and_request_admin(mode: str) -> None: import ctypes import os - from src.utils.platform_utils import PlatformUtils + from src.utils.platform_utils import Platform, PlatformUtils # Check if already running as admin is_admin = False platform = PlatformUtils.get_platform() - if platform == "windows": + if platform == Platform.WINDOWS: try: is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 except Exception: @@ -46,7 +46,7 @@ def check_and_request_admin(mode: str) -> None: typer.echo("⚠️ VPN mode requires administrator privileges", err=True) typer.echo() - if platform == "windows": + if platform == Platform.WINDOWS: _handle_windows_elevation() else: _handle_unix_elevation() diff --git a/src/utils/network_interface.py b/src/utils/network_interface.py index c4e6a41..9840006 100644 --- a/src/utils/network_interface.py +++ b/src/utils/network_interface.py @@ -1,20 +1,20 @@ -"""Network interface utilities for Windows.""" +"""Network interface utilities - Cross-platform support.""" import re import subprocess from typing import Optional, Tuple from loguru import logger -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Platform, PlatformUtils # Constants ROUTE_COMMAND_TIMEOUT = 5 # seconds IPCONFIG_COMMAND_TIMEOUT = 5 # seconds -TUN_INTERFACE_KEYWORDS = {"SING", "TUN", "TAP"} +TUN_INTERFACE_KEYWORDS = {"SING", "TUN", "TAP", "tun", "utun"} class NetworkInterfaceDetector: - """Detects primary network interface on Windows.""" + """Detects primary network interface - cross-platform.""" @staticmethod def get_primary_interface() -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: @@ -25,6 +25,187 @@ def get_primary_interface() -> Tuple[Optional[str], Optional[str], Optional[str] Tuple of (interface_name, interface_ip, subnet, gateway) e.g., ("Wi-Fi", "192.168.1.10", "192.168.1.0/24", "192.168.1.1") """ + platform = PlatformUtils.get_platform() + + if platform == Platform.WINDOWS: + return NetworkInterfaceDetector._get_primary_interface_windows() + elif platform == Platform.LINUX: + return NetworkInterfaceDetector._get_primary_interface_linux() + elif platform == Platform.MACOS: + return NetworkInterfaceDetector._get_primary_interface_macos() + else: + logger.warning(f"Unsupported platform for interface detection: {platform}") + return None, None, None, None + + @staticmethod + def _get_primary_interface_linux() -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + """Get primary interface on Linux using 'ip' command.""" + try: + # Get default route to find primary interface and gateway + # ip route show default + result = subprocess.run( + ["ip", "route", "show", "default"], + capture_output=True, + text=True, + timeout=ROUTE_COMMAND_TIMEOUT, + check=False, + ) + + if result.returncode != 0: + logger.error(f"Failed to get route table on Linux: {result.stderr}") + return None, None, None, None + + # Parse output like: "default via 192.168.1.1 dev wlp3s0 proto dhcp metric 600" + for line in result.stdout.strip().split("\n"): + if "default" in line: + parts = line.split() + + # Find gateway (after "via") and interface (after "dev") + gateway = None + interface_name = None + + for i, part in enumerate(parts): + if part == "via" and i + 1 < len(parts): + gateway = parts[i + 1] + elif part == "dev" and i + 1 < len(parts): + interface_name = parts[i + 1] + + if not interface_name: + continue + + # Skip TUN interfaces + if any(keyword in interface_name for keyword in TUN_INTERFACE_KEYWORDS): + logger.warning(f"Ignored TUN interface: {interface_name}") + continue + + # Get interface IP using ip addr show + interface_ip = NetworkInterfaceDetector._get_interface_ip_linux(interface_name) + + if interface_ip: + subnet = NetworkInterfaceDetector._calculate_subnet(interface_ip) + logger.info( + f"Detected primary interface: {interface_name} ({interface_ip}, {subnet}, {gateway})" + ) + return interface_name, interface_ip, subnet, gateway + + logger.warning("Could not detect primary interface on Linux") + return None, None, None, None + + except subprocess.TimeoutExpired: + logger.error("Timeout while getting route table on Linux") + return None, None, None, None + except (OSError, subprocess.SubprocessError) as e: + logger.error(f"Error detecting primary interface on Linux: {e}") + return None, None, None, None + except Exception as e: + logger.error(f"Unexpected error detecting primary interface on Linux: {e}") + return None, None, None, None + + @staticmethod + def _get_interface_ip_linux(interface_name: str) -> Optional[str]: + """Get IP address of a network interface on Linux.""" + try: + result = subprocess.run( + ["ip", "addr", "show", interface_name], + capture_output=True, + text=True, + timeout=IPCONFIG_COMMAND_TIMEOUT, + check=False, + ) + + if result.returncode != 0: + return None + + # Parse output for inet (IPv4) address + # Looking for: "inet 192.168.1.10/24 brd 192.168.1.255 scope global dynamic noprefixroute wlp3s0" + for line in result.stdout.split("\n"): + line = line.strip() + if line.startswith("inet "): + parts = line.split() + if len(parts) >= 2: + # Extract IP from CIDR notation (e.g., "192.168.1.10/24") + ip_cidr = parts[1] + ip = ip_cidr.split("/")[0] + if NetworkInterfaceDetector._is_valid_ip(ip): + return ip + + return None + + except Exception as e: + logger.debug(f"Error getting IP for {interface_name}: {e}") + return None + + @staticmethod + def _get_primary_interface_macos() -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + """Get primary interface on macOS using 'route' and 'ifconfig'.""" + try: + # Get default route + result = subprocess.run( + ["route", "-n", "get", "default"], + capture_output=True, + text=True, + timeout=ROUTE_COMMAND_TIMEOUT, + check=False, + ) + + if result.returncode != 0: + logger.error(f"Failed to get route on macOS: {result.stderr}") + return None, None, None, None + + # Parse output for gateway and interface + gateway = None + interface_name = None + + for line in result.stdout.split("\n"): + line = line.strip() + if line.startswith("gateway:"): + gateway = line.split(":")[1].strip() + elif line.startswith("interface:"): + interface_name = line.split(":")[1].strip() + + if not interface_name: + return None, None, None, None + + # Skip TUN interfaces + if any(keyword in interface_name for keyword in TUN_INTERFACE_KEYWORDS): + logger.warning(f"Ignored TUN interface: {interface_name}") + return None, None, None, None + + # Get IP using ifconfig + result = subprocess.run( + ["ifconfig", interface_name], + capture_output=True, + text=True, + timeout=IPCONFIG_COMMAND_TIMEOUT, + check=False, + ) + + interface_ip = None + if result.returncode == 0: + for line in result.stdout.split("\n"): + if "inet " in line and "inet6" not in line: + parts = line.split() + for i, part in enumerate(parts): + if part == "inet" and i + 1 < len(parts): + interface_ip = parts[i + 1] + break + + if interface_ip: + subnet = NetworkInterfaceDetector._calculate_subnet(interface_ip) + logger.info( + f"Detected primary interface: {interface_name} ({interface_ip}, {subnet}, {gateway})" + ) + return interface_name, interface_ip, subnet, gateway + + return None, None, None, None + + except Exception as e: + logger.error(f"Error detecting primary interface on macOS: {e}") + return None, None, None, None + + @staticmethod + def _get_primary_interface_windows() -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + """Get primary interface on Windows using 'route' and 'ipconfig'.""" try: # Get default route to find primary interface result = subprocess.run( @@ -57,7 +238,7 @@ def get_primary_interface() -> Tuple[Optional[str], Optional[str], Optional[str] continue # Get interface name from IP - interface_name = NetworkInterfaceDetector._get_interface_name(interface_ip) + interface_name = NetworkInterfaceDetector._get_interface_name_windows(interface_ip) # Fix: Ignore Sing-box TUN interface to prevent loops if interface_name: @@ -101,8 +282,8 @@ def _is_valid_ip(ip: str) -> bool: return False @staticmethod - def _get_interface_name(ip: str) -> Optional[str]: - """Get interface name from IP address using ipconfig.""" + def _get_interface_name_windows(ip: str) -> Optional[str]: + """Get interface name from IP address using ipconfig (Windows only).""" if not NetworkInterfaceDetector._is_valid_ip(ip): return None diff --git a/src/utils/network_utils.py b/src/utils/network_utils.py index c35dc29..c9e94fb 100644 --- a/src/utils/network_utils.py +++ b/src/utils/network_utils.py @@ -5,7 +5,7 @@ import subprocess from src.core.logger import logger -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Platform, PlatformUtils class NetworkUtils: @@ -152,8 +152,6 @@ def detect_optimal_mtu(host="8.8.8.8", min_mtu=1280, max_mtu=1480, timeout=2, mt Returns: Optimal MTU value (defaults to 1420 if detection fails) """ - import platform - # Default safe MTU if detection fails default_mtu = 1420 @@ -166,7 +164,7 @@ def detect_optimal_mtu(host="8.8.8.8", min_mtu=1280, max_mtu=1480, timeout=2, mt logger.info(f"MTU mode: auto - detecting optimal MTU (range: {min_mtu}-{max_mtu})...") # Platform-specific ping commands - system = platform.system().lower() + current_platform = PlatformUtils.get_platform() def test_mtu(mtu_size: int) -> bool: """Test if a specific MTU size works.""" @@ -179,7 +177,7 @@ def test_mtu(mtu_size: int) -> bool: return False # Build ping command based on platform - if system == "windows": + if current_platform == Platform.WINDOWS: # Windows: ping -n 1 -w timeout -f -l size host cmd = [ "ping", diff --git a/src/utils/platform_utils.py b/src/utils/platform_utils.py index 085bf42..b83403b 100644 --- a/src/utils/platform_utils.py +++ b/src/utils/platform_utils.py @@ -2,58 +2,71 @@ import os import platform import sys -from typing import Literal, Tuple +from enum import Enum +from typing import Tuple -PlatformType = Literal["windows", "macos", "linux"] -ArchType = Literal["x86_64", "arm64", "x86", "unknown"] + +class Platform(Enum): + """Operating system platforms.""" + WINDOWS = "windows" + MACOS = "macos" + LINUX = "linux" + + +class Architecture(Enum): + """CPU architectures.""" + X86_64 = "x86_64" + ARM64 = "arm64" + X86 = "x86" + UNKNOWN = "unknown" class PlatformUtils: """Utility class for platform detection and abstraction.""" @staticmethod - def get_platform() -> PlatformType: + def get_platform() -> Platform: """ Detect the current operating system. Returns: - Platform identifier: 'windows', 'macos', or 'linux' + Platform enum value: Platform.WINDOWS, Platform.MACOS, or Platform.LINUX """ system = platform.system() if system == "Windows" or os.name == "nt": - return "windows" + return Platform.WINDOWS elif system == "Darwin": - return "macos" + return Platform.MACOS else: - return "linux" + return Platform.LINUX @staticmethod - def get_architecture() -> ArchType: + def get_architecture() -> Architecture: """ Detect the CPU architecture. Returns: - Architecture identifier: 'x86_64', 'arm64', 'x86', or 'unknown' + Architecture enum value """ machine = platform.machine().lower() # Normalize common architecture names if machine in ("amd64", "x86_64", "x64"): - return "x86_64" + return Architecture.X86_64 elif machine in ("arm64", "aarch64", "arm64-v8a"): - return "arm64" + return Architecture.ARM64 elif machine in ("i386", "i686", "x86"): - return "x86" + return Architecture.X86 else: - return "unknown" + return Architecture.UNKNOWN @staticmethod - def get_platform_arch() -> Tuple[PlatformType, ArchType]: + def get_platform_arch() -> Tuple[Platform, Architecture]: """ Get both platform and architecture. Returns: - Tuple of (platform, architecture) + Tuple of (Platform, Architecture) enums """ return PlatformUtils.get_platform(), PlatformUtils.get_architecture() @@ -65,7 +78,7 @@ def get_binary_suffix() -> str: Returns: '.exe' for Windows, empty string for Unix-like systems """ - return ".exe" if PlatformUtils.get_platform() == "windows" else "" + return ".exe" if PlatformUtils.get_platform() == Platform.WINDOWS else "" @staticmethod def get_platform_bin_dir(base_dir: str) -> str: @@ -81,11 +94,15 @@ def get_platform_bin_dir(base_dir: str) -> str: """ plat, arch = PlatformUtils.get_platform_arch() - # Map platform names to directory conventions - platform_map = {"windows": "windows", "macos": "darwin", "linux": "linux"} + # Map platform enum values to directory conventions + platform_map = { + Platform.WINDOWS: "windows", + Platform.MACOS: "darwin", + Platform.LINUX: "linux" + } - platform_dir = platform_map.get(plat, plat) - return os.path.join(base_dir, f"{platform_dir}-{arch}") + platform_dir = platform_map.get(plat, plat.value) + return os.path.join(base_dir, f"{platform_dir}-{arch.value}") @staticmethod def is_frozen() -> bool: @@ -143,7 +160,7 @@ def get_subprocess_flags() -> int: """ import subprocess - if PlatformUtils.get_platform() == "windows": + if PlatformUtils.get_platform() == Platform.WINDOWS: # CREATE_NO_WINDOW only exists on Windows return getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000) return 0 @@ -158,7 +175,7 @@ def get_startupinfo(): """ import subprocess - if PlatformUtils.get_platform() == "windows": + if PlatformUtils.get_platform() == Platform.WINDOWS: # STARTUPINFO and related constants only exist on Windows STARTUPINFO = getattr(subprocess, "STARTUPINFO", None) if STARTUPINFO: @@ -177,9 +194,9 @@ def get_tun_interface_name() -> str: 'SINGTUN' for Windows, 'utun9' for macOS, 'tun0' for Linux """ plat = PlatformUtils.get_platform() - if plat == "windows": + if plat == Platform.WINDOWS: return "SINGTUN" - elif plat == "macos": + elif plat == Platform.MACOS: return "utun9" else: return "tun0" @@ -192,4 +209,4 @@ def supports_privileged_helper() -> bool: Returns: True for macOS (SMJobBless), False otherwise """ - return PlatformUtils.get_platform() == "macos" + return PlatformUtils.get_platform() == Platform.MACOS diff --git a/src/utils/process_utils.py b/src/utils/process_utils.py index 917d344..ebf0870 100644 --- a/src/utils/process_utils.py +++ b/src/utils/process_utils.py @@ -7,7 +7,7 @@ import psutil from src.core.logger import logger -from src.utils.platform_utils import PlatformUtils +from src.utils.platform_utils import Platform, PlatformUtils class ProcessUtils: @@ -46,6 +46,83 @@ def is_admin() -> bool: logger.warning(f"Unexpected error checking admin status: {e}") return False + @staticmethod + def has_vpn_privileges(binary_paths: List[str] = None) -> bool: + """ + Check if the system has privileges to run VPN (Tun mode). + On Linux, this checks for CAP_NET_ADMIN on binaries or root. + On Windows/macOS, checks for Admin/Root. + + Args: + binary_paths: List of absolute paths to binaries (sing-box, xray) to check capabilities for. + + Returns: + True if privileged enough, False otherwise. + """ + # If we are already admin/root, we are good + if ProcessUtils.is_admin(): + return True + + # On Linux, check for capabilities + if PlatformUtils.get_platform() == Platform.LINUX and binary_paths: + try: + import shutil + getcap = shutil.which("getcap") + + # Check standard locations if which() failed + if not getcap: + for p in ["/sbin/getcap", "/usr/sbin/getcap", "/usr/bin/getcap"]: + if os.path.exists(p): + getcap = p + break + + if not getcap or not os.path.exists(getcap): + logger.debug(f"[ProcessUtils] getcap not found") + return False + + logger.debug(f"[ProcessUtils] Checking capabilities using {getcap}") + + all_good = True + files_checked = 0 + for binary in binary_paths: + if not os.path.exists(binary): + logger.debug(f"[ProcessUtils] Binary not found: {binary}") + continue + + files_checked += 1 + # Check if binary has cap_net_admin + try: + result = subprocess.run( + [getcap, binary], + capture_output=True, + text=True, + check=False + ) + output = result.stdout.strip() + logger.debug(f"[ProcessUtils] getcap {binary} -> {output}") + + # Output format: /path/to/binary cap_net_admin,cap_net_bind_service=ep + if "cap_net_admin" not in output: + logger.warning(f"[ProcessUtils] Missing cap_net_admin on {binary}") + all_good = False + break + except Exception as e: + logger.error(f"[ProcessUtils] Failed to run getcap: {e}") + all_good = False + break + + # If we didn't check any files, that's technically a pass (no binaries to restrict) + # or a fail (paths wrong). Safer to pass if paths just don't exist yet but assume they will work? + # Actually, if binary_paths were provided but none found, we should probably warn, but returning True + # maintains "innocent until proven guilty" logic. + return all_good + + except Exception as e: + logger.error(f"[ProcessUtils] Error checking capabilities: {e}") + return False + + return False + @staticmethod def kill_process(pid: int, force: bool = False) -> bool: """ @@ -242,12 +319,59 @@ def restart_as_admin(): platform = PlatformUtils.get_platform() - if platform == "windows": + if platform == Platform.WINDOWS: ProcessUtils._restart_as_admin_windows() - elif platform == "macos": + elif platform == Platform.MACOS: ProcessUtils._restart_as_admin_macos() else: - logger.warning("restart_as_admin is not supported on Linux") + ProcessUtils._restart_as_admin_linux() + + @staticmethod + def _restart_as_admin_linux(): + """Restart as admin on Linux using pkexec.""" + import sys + + try: + # Reconstruct the command + # If frozen (PyInstaller), sys.executable is the app binary + # If source, sys.executable is python, sys.argv[0] is script + + cmd = [] + if getattr(sys, "frozen", False): + cmd = [sys.executable] + else: + cmd = [sys.executable] + sys.argv + + logger.info(f"Restarting as admin (pkexec): {cmd}") + + # Use pkexec with env to preserve display + # We use 'env' command as the executable for pkexec to run, + # so we can pass environment variables + + pkexec_cmd = ["pkexec", "env"] + + # Preserve critical X11/Wayland env vars + for var in ["DISPLAY", "XAUTHORITY", "WAYLAND_DISPLAY", "XDG_RUNTIME_DIR"]: + val = os.environ.get(var) + if val: + pkexec_cmd.append(f"{var}={val}") + + # Add the actual command + pkexec_cmd.extend(cmd) + + subprocess.Popen(pkexec_cmd) + + # Exit current instance + sys.exit(0) + + except Exception as e: + logger.error(f"Failed to restart as admin on Linux: {e}") + # Fallback to logging instructions + logger.warning( + "Automatic restart failed. " + "VPN mode requires root privileges on Linux. " + "Please run with: sudo poetry run xenray" + ) @staticmethod def _restart_as_admin_windows(): @@ -366,3 +490,81 @@ def _restart_as_admin_macos(): import traceback traceback.print_exc() + + @staticmethod + def _restart_as_admin_linux(): + """Restart as admin on Linux using pkexec (PolicyKit). + + This method uses pkexec to request root privileges on Linux. + pkexec is part of PolicyKit and provides a graphical authentication dialog. + We use 'env' to pass DISPLAY and XAUTHORITY so the GUI can connect to X11. + """ + import os + import shutil + import subprocess + import sys + + try: + # Check if pkexec is available + pkexec_path = shutil.which("pkexec") + if not pkexec_path: + logger.error("pkexec not found. Please install PolicyKit (policykit-1 package)") + return + + # Get display environment variables - required for GUI to work + display = os.environ.get("DISPLAY", ":0") + xauthority = os.environ.get("XAUTHORITY", os.path.expanduser("~/.Xauthority")) + wayland_display = os.environ.get("WAYLAND_DISPLAY", "") + xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", "") + + # Get the executable path + if getattr(sys, "frozen", False): + # Running as compiled PyInstaller executable + executable = sys.executable + app_cmd = [executable] + else: + # Running as Python script + python_exe = sys.executable + script_path = sys.argv[0] + + # Use the full path to the script + if not os.path.isabs(script_path): + script_path = os.path.abspath(script_path) + + app_cmd = [python_exe, script_path] + + # Build command with env to pass display variables + # pkexec env DISPLAY=:0 XAUTHORITY=/path python script.py + env_cmd = ["env", f"DISPLAY={display}", f"XAUTHORITY={xauthority}"] + + # Add Wayland variables if present + if wayland_display: + env_cmd.append(f"WAYLAND_DISPLAY={wayland_display}") + if xdg_runtime_dir: + env_cmd.append(f"XDG_RUNTIME_DIR={xdg_runtime_dir}") + + cmd = [pkexec_path] + env_cmd + app_cmd + + logger.info(f"Requesting admin privileges via pkexec: {' '.join(cmd)}") + + # Launch the new instance with pkexec + # Use Popen to start the process and detach + process = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, # Detach from current session + ) + + if process.pid: + logger.info(f"Successfully launched new instance with admin privileges (PID: {process.pid})") + # Exit current instance + sys.exit(0) + else: + logger.error("Failed to launch admin instance") + + except Exception as e: + logger.error(f"Failed to restart as admin on Linux: {e}") + import traceback + + traceback.print_exc()