diff --git a/CHANGELOG.md b/CHANGELOG.md index c38846b..385987d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,14 @@ Change Log ========== +Version 2.3.0 *(2025-07-02)* +---------------------------- + + * New: Added interactive device selection when multiple devices are connected + * Fix: Improved handling of subprocess outputs + * Fix: Better error handling for adb commands + + Version 2.1.0 *(2016-09-07)* ---------------------------- diff --git a/pidcat.py b/pidcat.py index 6a23786..b698117 100755 --- a/pidcat.py +++ b/pidcat.py @@ -26,15 +26,79 @@ import re import subprocess from subprocess import PIPE +import shutil +import colorama + +# Initialize colorama to process ANSI escape codes on Windows +colorama.init() + +# A sensible version bump reflecting new features. +__version__ = '2.3.0' + + +def check_adb_device(): + """Checks for a connected ADB device and prompts for selection if multiple are found.""" + try: + # Use a timeout to prevent the command from hanging indefinitely. + result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, check=True, timeout=5) + lines = result.stdout.strip().splitlines() + + # Skip the first line which is just a header + device_lines = [line for line in lines[1:] if line.strip()] + + # Check if we have any authorized devices + authorized_devices = [] + for line in device_lines: + parts = line.split() + if len(parts) >= 2 and 'device' in parts[1] and 'unauthorized' not in parts[1]: + authorized_devices.append(parts[0]) # The first part is the device serial + + if not authorized_devices: + print("❌ ERROR: No authorized ADB device found. Please connect a device with USB debugging enabled.", file=sys.stderr) + sys.exit(1) + + # If we have one device, or device selection parameters are already provided, we're good + if len(authorized_devices) == 1 or args.device_serial or args.use_device or args.use_emulator: + print(f"✅ Found {len(authorized_devices)} device(s).") + return None # No need to select a device + + # If we have multiple devices and no selection made, ask the user + print(f"📱 Multiple devices found ({len(authorized_devices)}). Please select one:") + for idx, device in enumerate(authorized_devices): + print(f" [{idx + 1}] {device}") + + selection = None + while selection is None: + try: + choice = input("Enter device number [1-{}]: ".format(len(authorized_devices))) + idx = int(choice) - 1 + if 0 <= idx < len(authorized_devices): + selection = authorized_devices[idx] + else: + print("❌ Invalid selection. Please try again.") + except ValueError: + print("❌ Please enter a valid number.") + + print(f"✅ Selected device: {selection}") + return selection + + except (subprocess.CalledProcessError, FileNotFoundError): + print("❌ ERROR: 'adb' command not found. Is the Android SDK Platform-Tools in your system's PATH?", file=sys.stderr) + sys.exit(1) + except subprocess.TimeoutExpired: + print("❌ ERROR: 'adb devices' command timed out. ADB server may be unresponsive.", file=sys.stderr) + sys.exit(1) -__version__ = '2.1.0' LOG_LEVELS = 'VDIWEF' LOG_LEVELS_MAP = dict([(LOG_LEVELS[i], i) for i in range(len(LOG_LEVELS))]) -parser = argparse.ArgumentParser(description='Filter logcat by package name') +parser = argparse.ArgumentParser( + description='Filter logcat by package name with colored output.', + epilog='Example: python pidcat.py com.example.app' +) parser.add_argument('package', nargs='*', help='Application package name(s)') parser.add_argument('-w', '--tag-width', metavar='N', dest='tag_width', type=int, default=23, help='Width of log tag') -parser.add_argument('-l', '--min-level', dest='min_level', type=str, choices=LOG_LEVELS+LOG_LEVELS.lower(), default='V', help='Minimum level to be displayed') +parser.add_argument('-l', '--min-level', dest='min_level', type=str, choices=LOG_LEVELS+LOG_LEVELS.lower(), default='V', help='Minimum level to be displayed (V, D, I, W, E, F)') parser.add_argument('--color-gc', dest='color_gc', action='store_true', help='Color garbage collection') parser.add_argument('--always-display-tags', dest='always_tags', action='store_true',help='Always display the tag name') parser.add_argument('--current', dest='current_app', action='store_true',help='Filter logcat by current running app') @@ -52,87 +116,80 @@ package = args.package +print(f"--- Colored Logcat v{__version__} ---") +selected_device = check_adb_device() + base_adb_command = ['adb'] if args.device_serial: base_adb_command.extend(['-s', args.device_serial]) -if args.use_device: + print(f" targeting device serial: {args.device_serial}") +elif selected_device: + base_adb_command.extend(['-s', selected_device]) + print(f" targeting selected device: {selected_device}") +elif args.use_device: base_adb_command.append('-d') -if args.use_emulator: + print(" targeting first connected device.") +elif args.use_emulator: base_adb_command.append('-e') + print(" targeting first running emulator.") if args.current_app: + print(" looking for current running app...") system_dump_command = base_adb_command + ["shell", "dumpsys", "activity", "activities"] - system_dump = subprocess.Popen(system_dump_command, stdout=PIPE, stderr=PIPE).communicate()[0] - running_package_name = re.search(".*TaskRecord.*A[= ]([^ ^}]*)", str(system_dump)).group(1) - package.append(running_package_name) + system_dump_process = subprocess.Popen(system_dump_command, stdout=PIPE, stderr=PIPE, universal_newlines=True) + system_dump = system_dump_process.communicate()[0] + running_package_name_match = re.search(".*TaskRecord.*A[= ]([^ ^}]*)", system_dump) + if running_package_name_match: + current_package = running_package_name_match.group(1) + package.append(current_package) + print(f" found current app: {current_package}") + if len(package) == 0: args.all = True + print("No package name provided, switching to --all mode.") +else: + print(f"Filtering for packages: {package}") -# Store the names of packages for which to match all processes. -catchall_package = list(filter(lambda package: package.find(":") == -1, package)) -# Store the name of processes to match exactly. -named_processes = list(filter(lambda package: package.find(":") != -1, package)) -# Convert default process names from : (cli notation) to (android notation) in the exact names match group. -named_processes = map(lambda package: package if package.find(":") != len(package) - 1 else package[:-1], named_processes) - -header_size = args.tag_width + 1 + 3 + 1 # space, level, space +catchall_package = list(filter(lambda p: p.find(":") == -1, package)) +named_processes = list(filter(lambda p: p.find(":") != -1, package)) +named_processes = list(map(lambda p: p if p.find(":") != len(p) - 1 else p[:-1], named_processes)) +header_size = args.tag_width + 1 + 3 + 1 stdout_isatty = sys.stdout.isatty() width = -1 try: - # Get the current terminal width - import fcntl, termios, struct - h, width = struct.unpack('hh', fcntl.ioctl(0, termios.TIOCGWINSZ, struct.pack('hh', 0, 0))) -except: + width, _ = shutil.get_terminal_size() +except OSError: pass BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) - RESET = '\033[0m' def termcolor(fg=None, bg=None): codes = [] if fg is not None: codes.append('3%d' % fg) - if bg is not None: codes.append('10%d' % bg) + if bg is not None: codes.append('4%d' % bg) return '\033[%sm' % ';'.join(codes) if codes else '' def colorize(message, fg=None, bg=None): return termcolor(fg, bg) + message + RESET if stdout_isatty else message def indent_wrap(message): - if width == -1: - return message - message = message.replace('\t', ' ') - wrap_area = width - header_size - messagebuf = '' - current = 0 - while current < len(message): - next = min(current + wrap_area, len(message)) - messagebuf += message[current:next] - if next < len(message): - messagebuf += '\n' - messagebuf += ' ' * header_size - current = next - return messagebuf - + if width <= 0 or (width - header_size) <= 0: + return message + message = message.replace('\t', ' ') + wrap_area = width - header_size + return '\n'.join([message[i:i+wrap_area] for i in range(0, len(message), wrap_area)]) LAST_USED = [RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN] KNOWN_TAGS = { - 'dalvikvm': WHITE, - 'Process': WHITE, - 'ActivityManager': WHITE, - 'ActivityThread': WHITE, - 'AndroidRuntime': CYAN, - 'jdwp': WHITE, - 'StrictMode': WHITE, - 'DEBUG': YELLOW, + 'dalvikvm': WHITE, 'Process': WHITE, 'ActivityManager': WHITE, 'ActivityThread': WHITE, + 'AndroidRuntime': CYAN, 'jdwp': WHITE, 'StrictMode': WHITE, 'DEBUG': YELLOW, } def allocate_color(tag): - # this will allocate a unique format for the given tag - # since we dont have very many colors, we always keep track of the LRU if tag not in KNOWN_TAGS: KNOWN_TAGS[tag] = LAST_USED[0] color = KNOWN_TAGS[tag] @@ -141,32 +198,19 @@ def allocate_color(tag): LAST_USED.append(color) return color - -RULES = { - # StrictMode policy violation; ~duration=319 ms: android.os.StrictMode$StrictModeDiskWriteViolation: policy=31 violation=1 - re.compile(r'^(StrictMode policy violation)(; ~duration=)(\d+ ms)') - : r'%s\1%s\2%s\3%s' % (termcolor(RED), RESET, termcolor(YELLOW), RESET), -} - -# Only enable GC coloring if the user opted-in +RULES = {re.compile(r'^(StrictMode policy violation)(; ~duration=)(\d+ ms)') : r'%s\1%s\2%s\3%s' % (termcolor(RED), RESET, termcolor(YELLOW), RESET)} if args.color_gc: - # GC_CONCURRENT freed 3617K, 29% free 20525K/28648K, paused 4ms+5ms, total 85ms key = re.compile(r'^(GC_(?:CONCURRENT|FOR_M?ALLOC|EXTERNAL_ALLOC|EXPLICIT) )(freed >>>> ([a-zA-Z0-9._:]+) \[ userId:0 \| appId:(\d+) \]$') @@ -181,182 +225,169 @@ def allocate_color(tag): adb_command.append('logcat') adb_command.extend(['-v', 'brief']) -# Clear log before starting logcat if args.clear_logcat: - adb_clear_command = list(adb_command) - adb_clear_command.append('-c') - adb_clear = subprocess.Popen(adb_clear_command) - - while adb_clear.poll() is None: - pass + print("Clearing logcat buffer (this may fail on some Android versions)...") + try: + subprocess.run(list(adb_command) + ['-c'], check=True, capture_output=True) + print("Buffer cleared successfully.") + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + print("Warning: Could not clear log buffer. This is common on newer Android versions.") -# This is a ducktype of the subprocess.Popen object class FakeStdinProcess(): def __init__(self): self.stdout = sys.stdin + self.returncode = None + def poll(self): return None -if sys.stdin.isatty(): - adb = subprocess.Popen(adb_command, stdin=PIPE, stdout=PIPE) -else: - adb = FakeStdinProcess() pids = set() last_tag = None app_pid = None def match_packages(token): - if len(package) == 0: - return True - if token in named_processes: - return True + if not package: return True + if token in named_processes: return True index = token.find(':') return (token in catchall_package) if index == -1 else (token[:index] in catchall_package) def parse_death(tag, message): - if tag != 'ActivityManager': - return None, None - kill = PID_KILL.match(message) - if kill: - pid = kill.group(1) - package_line = kill.group(2) - if match_packages(package_line) and pid in pids: - return pid, package_line - leave = PID_LEAVE.match(message) - if leave: - pid = leave.group(2) - package_line = leave.group(1) - if match_packages(package_line) and pid in pids: - return pid, package_line - death = PID_DEATH.match(message) - if death: - pid = death.group(2) - package_line = death.group(1) - if match_packages(package_line) and pid in pids: - return pid, package_line + if tag != 'ActivityManager': return None, None + for pattern in [PID_KILL, PID_LEAVE, PID_DEATH]: + m = pattern.match(message) + if m: + pid = m.group(1) if pattern != PID_LEAVE else m.group(2) + pname = m.group(2) if pattern != PID_LEAVE else m.group(1) + if match_packages(pname) and pid in pids: return pid, pname return None, None def parse_start_proc(line): - start = PID_START_5_1.match(line) - if start is not None: - line_pid, line_package, target = start.groups() - return line_package, target, line_pid, '', '' - start = PID_START.match(line) - if start is not None: - line_package, target, line_pid, line_uid, line_gids = start.groups() - return line_package, target, line_pid, line_uid, line_gids - start = PID_START_DALVIK.match(line) - if start is not None: - line_pid, line_package, line_uid = start.groups() - return line_package, '', line_pid, line_uid, '' + patterns = [PID_START_5_1, PID_START, PID_START_DALVIK] + for pattern in patterns: + start = pattern.match(line) + if start: + if pattern == PID_START_5_1: return start.group(2), start.group(3), start.group(1), '', '' + if pattern == PID_START: return start.groups() + if pattern == PID_START_DALVIK: return start.group(2), '', start.group(1), start.group(3), '' return None def tag_in_tags_regex(tag, tags): - return any(re.match(r'^' + t + r'$', tag) for t in map(str.strip, tags)) + return any(re.match(r'^' + t + r'$', tag, re.IGNORECASE) for t in map(str.strip, tags)) -ps_command = base_adb_command + ['shell', 'ps'] -ps_pid = subprocess.Popen(ps_command, stdin=PIPE, stdout=PIPE, stderr=PIPE) -while True: - try: - line = ps_pid.stdout.readline().decode('utf-8', 'replace').strip() - except KeyboardInterrupt: - break - if len(line) == 0: - break - - pid_match = PID_LINE.match(line) - if pid_match is not None: - pid = pid_match.group(1) - proc = pid_match.group(2) - if proc in catchall_package: - seen_pids = True - pids.add(pid) - -while adb.poll() is None: - try: - line = adb.stdout.readline().decode('utf-8', 'replace').strip() - except KeyboardInterrupt: - break - if len(line) == 0: - break - - bug_line = BUG_LINE.match(line) - if bug_line is not None: - continue - - log_line = LOG_LINE.match(line) - if log_line is None: - continue - - level, tag, owner, message = log_line.groups() - tag = tag.strip() - start = parse_start_proc(line) - if start: - line_package, target, line_pid, line_uid, line_gids = start - if match_packages(line_package): - pids.add(line_pid) - - app_pid = line_pid - - linebuf = '\n' - linebuf += colorize(' ' * (header_size - 1), bg=WHITE) - linebuf += indent_wrap(' Process %s created for %s\n' % (line_package, target)) - linebuf += colorize(' ' * (header_size - 1), bg=WHITE) - linebuf += ' PID: %s UID: %s GIDs: %s' % (line_pid, line_uid, line_gids) - linebuf += '\n' - print(linebuf) - last_tag = None # Ensure next log gets a tag printed - - dead_pid, dead_pname = parse_death(tag, message) - if dead_pid: - pids.remove(dead_pid) - linebuf = '\n' - linebuf += colorize(' ' * (header_size - 1), bg=RED) - linebuf += ' Process %s (PID: %s) ended' % (dead_pname, dead_pid) - linebuf += '\n' - print(linebuf) - last_tag = None # Ensure next log gets a tag printed - - # Make sure the backtrace is printed after a native crash - if tag == 'DEBUG': - bt_line = BACKTRACE_LINE.match(message.lstrip()) - if bt_line is not None: - message = message.lstrip() - owner = app_pid - - if not args.all and owner not in pids: - continue - if level in LOG_LEVELS_MAP and LOG_LEVELS_MAP[level] < min_level: - continue - if args.ignored_tag and tag_in_tags_regex(tag, args.ignored_tag): - continue - if args.tag and not tag_in_tags_regex(tag, args.tag): - continue - - linebuf = '' - - if args.tag_width > 0: - # right-align tag title and allocate color if needed - if tag != last_tag or args.always_tags: - last_tag = tag - color = allocate_color(tag) - tag = tag[-args.tag_width:].rjust(args.tag_width) - linebuf += colorize(tag, fg=color) - else: - linebuf += ' ' * args.tag_width - linebuf += ' ' - - # write out level colored edge - if level in TAGTYPES: - linebuf += TAGTYPES[level] - else: - linebuf += ' ' + level + ' ' - linebuf += ' ' - - # format tag message using rules - for matcher in RULES: - replace = RULES[matcher] - message = matcher.sub(replace, message) - - linebuf += indent_wrap(message) - print(linebuf.encode('utf-8')) +# Initialize ADB connection +if sys.stdin.isatty(): + adb = subprocess.Popen(adb_command, stdin=PIPE, stdout=PIPE, text=True) +else: + adb = FakeStdinProcess() + +if not args.all: + print(f"Searching for running process(es) for '{', '.join(package)}'...") + ps_command = base_adb_command + ['shell', 'ps'] + try: + ps_output = subprocess.check_output(ps_command, universal_newlines=True) + for line in ps_output.splitlines(): + pid_match = PID_LINE.match(line) + if pid_match: + pid, proc = pid_match.groups() + if match_packages(proc): + pids.add(pid) + if pids: + print(f"✅ Success! Found PID(s): {', '.join(pids)}. Now listening...") + else: + print(f"⚠️ Warning: No running process found for '{', '.join(package)}'. Waiting for it to start...") + except FileNotFoundError: + print("❌ ERROR: Could not find a running ADB process. Please check the connection.", file=sys.stderr) + sys.exit(1) + except subprocess.CalledProcessError: + print("⚠️ Warning: Error executing PS command. Will still attempt to capture logs.") + +print("\n--- Listening for logcat messages... (Press Ctrl+C to exit) ---\n") + +try: + while adb and adb.poll() is None: + try: + if hasattr(adb, 'stdout') and adb.stdout: + line = adb.stdout.readline() + elif not sys.stdin.isatty(): + line = sys.stdin.readline() + else: + # No valid input source + break + + if not line: # End of file + break + + # Ensure it's a string (should be with text=True) + line = line.strip() + if not line: + continue + + if BUG_LINE.match(line): + continue + + log_line = LOG_LINE.match(line) + if not log_line: + continue + + level, tag, owner, message = log_line.groups() + tag = tag.strip() + + start = parse_start_proc(line) + if start: + line_package, target, line_pid, line_uid, line_gids = start + if match_packages(line_package) and line_pid not in pids: + pids.add(line_pid) + app_pid = line_pid + linebuf = '\n' + colorize(' ' * (header_size - 1), bg=WHITE) + linebuf += indent_wrap(' Process %s created for %s\n' % (line_package, target)) + linebuf += colorize(' ' * (header_size - 1), bg=WHITE) + linebuf += ' PID: %s UID: %s GIDs: %s' % (line_pid, line_uid, line_gids) + linebuf += '\n' + print(linebuf) + last_tag = None + + dead_pid, dead_pname = parse_death(tag, message) + if dead_pid and dead_pid in pids: + pids.remove(dead_pid) + linebuf = '\n' + colorize(' ' * (header_size - 1), bg=RED) + linebuf += ' Process %s (PID: %s) ended' % (dead_pname, dead_pid) + linebuf += '\n' + print(linebuf) + last_tag = None + + if tag == 'DEBUG' and BACKTRACE_LINE.match(message.lstrip()): + message = message.lstrip() + owner = app_pid + + if not args.all and owner not in pids: continue + if level in LOG_LEVELS_MAP and LOG_LEVELS_MAP[level] < min_level: continue + if args.ignored_tag and tag_in_tags_regex(tag, args.ignored_tag): continue + if args.tag and not tag_in_tags_regex(tag, args.tag): continue + + linebuf = '' + if args.tag_width > 0: + if tag != last_tag or args.always_tags: + last_tag = tag + color = allocate_color(tag) + tag = tag[-args.tag_width:].rjust(args.tag_width) + linebuf += colorize(tag, fg=color) + else: + linebuf += ' ' * args.tag_width + linebuf += ' ' + + linebuf += TAGTYPES.get(level, ' ' + level + ' ') + linebuf += ' ' + + for matcher, replace in RULES.items(): + message = matcher.sub(replace, message) + + linebuf += indent_wrap(message) + print(linebuf) + + except KeyboardInterrupt: + print("\n--- Exiting gracefully. ---") + except Exception as e: + print(f"\nAn unexpected error occurred: {e}", file=sys.stderr) +finally: + # De-initialize colorama to restore original terminal settings. + colorama.deinit() \ No newline at end of file diff --git a/start_pidcat.bat b/start_pidcat.bat new file mode 100644 index 0000000..199464c --- /dev/null +++ b/start_pidcat.bat @@ -0,0 +1,102 @@ +@echo off +REM Enable the "Delayed Expansion" feature, which is essential for handling +REM variables that are set and used inside IF/ELSE blocks. +SETLOCAL EnableDelayedExpansion + +REM ================================================================= +REM CONFIGURATION BLOCK +REM ================================================================= +REM +REM --- IMPORTANT: The paths to python.exe and your script MUST be correct --- +REM +set "pythonPath=C:\Python311\python.exe" +set "pidcatPath=C:\Python311\Tools\pidcat\pidcat.py" +REM +REM --- This is the default package used when you just press Enter --- +REM +set "defaultPackage=com.fadcam" +REM +REM ================================================================= + + +REM Set the main menu color scheme (Light Aqua text on Black) +COLOR 0B +TITLE Android Logcat Assistant + +:main +REM Refresh the screen and set the main menu color +COLOR 0B +cls +ECHO. +ECHO ============================================ +REM Escaping the < and > characters +ECHO ^<^<^<^<^< Android Logcat Assistant ^>^>^>^>^> +ECHO ============================================ +ECHO. +ECHO Instructions: +ECHO. +ECHO * To filter for an app, type its package name. +ECHO * To see the full device log, type 'all'. +ECHO * To use the default (%defaultPackage%), just press ENTER. +ECHO. +ECHO. + +set /p "userInput=Enter package name or command: " + +REM --- Check if the user wants the full logcat --- +IF /I "%userInput%" == "all" ( + cls + COLOR 0A + ECHO. + ECHO ^>^>^> Starting full logcat for ALL processes... ^<^<^< + ECHO ^>^>^> Press Ctrl+C in this window to stop. ^<^<^< + ECHO. + "%pythonPath%" "%pidcatPath%" -c -a + +) ELSE ( + REM --- Handle a specific package or the default --- + + REM Set the packageName variable based on the user's input. + set "packageName=%userInput%" + + REM If the user just pressed Enter, userInput is empty, so we use the default. + if "%userInput%" == "" set "packageName=%defaultPackage%" + + cls + COLOR 0A + ECHO. + + REM ================================================================= + REM --- THE CRITICAL FIX: Using !packageName! for delayed expansion --- + REM ================================================================= + ECHO ^>^>^> Starting filtered logcat for package: !packageName! ^<^<^< + ECHO ^>^>^> Press Ctrl+C in this window to stop. ^<^<^< + ECHO. + "%pythonPath%" "%pidcatPath%" -c "!packageName!" + REM ================================================================= +) + + +REM --- Return to menu color for the exit prompt --- +COLOR 0B +ECHO. +ECHO ======================================== +ECHO ^|^| Session Finished or Stopped ^|^| +ECHO ======================================== +ECHO. + +:ask_again +set /p "choice=Run another session? (y/n): " +if /i "%choice%"=="y" goto main +if /i "%choice%"=="n" goto end + +REM --- Invalid choice handler --- +COLOR 4F +echo Invalid choice. Please enter 'y' or 'n'. +timeout /t 1 /nobreak > nul +goto ask_again + +:end +REM Restore the default CMD color before exiting +COLOR 07 +exit \ No newline at end of file