From 9fa4d914ab053ba7789c5133dec95b4e0e915a97 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Thu, 22 Jan 2026 21:53:05 +0530 Subject: [PATCH 01/12] feat: Implement alias management, configuration handling, last station persistence, and XDG-compliant path management. --- .vscode/settings.json | 4 +- README.md | 14 +++++++ radioactive/alias.py | 3 +- radioactive/config.py | 10 ++--- radioactive/last_station.py | 7 +--- radioactive/paths.py | 84 +++++++++++++++++++++++++++++++++++++ 6 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 radioactive/paths.py diff --git a/.vscode/settings.json b/.vscode/settings.json index b5a61b5..bff3298 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -17,8 +17,8 @@ ], "files.autoSave": "off", "editor.wordWrap": "wordWrapColumn", - "workbench.colorTheme": "GitHub Dark", - "editor.minimap.autohide": true, + "workbench.colorTheme": "Visual Studio Light", + "editor.minimap.autohide": "mouseover", "editor.minimap.renderCharacters": false, "editor.experimentalWhitespaceRendering": "font", "editor.fontFamily": "'Fira Code', Consolas, 'Courier New', monospace", diff --git a/README.md b/README.md index 3a22537..4978ae7 100644 --- a/README.md +++ b/README.md @@ -263,6 +263,20 @@ filetype = mp3 player = ffplay ``` +### Configuration Paths + +`radio-active` follows the XDG Base Directory specification: + +- **Configuration** (including `config.ini` and `alias_map`): + - Linux/Mac: `$XDG_CONFIG_HOME/radio-active` (defaults to `~/.config/radio-active`) + - Windows: `%XDG_CONFIG_HOME%/radio-active` (defaults to `~/.config/radio-active` if not set) + +- **Data** (including `last-station`): + - Linux/Mac: `$XDG_DATA_HOME/radio-active` (defaults to `~/.local/share/radio-active`) + - Windows: `%XDG_DATA_HOME%/radio-active` (defaults to `~/.local/share/radio-active` if not set) + +Legacy configuration files in `~/.radio-active-configs.ini`, `~/.radio-active-alias`, and `~/.radio-active-last-station` will be automatically migrated to the new locations on first run. + > [!WARNING] > Do NOT modify the keys, only change the values. you can give any absolute or relative path as filepath. diff --git a/radioactive/alias.py b/radioactive/alias.py index 8dd4b84..ecb9d5e 100644 --- a/radioactive/alias.py +++ b/radioactive/alias.py @@ -6,10 +6,11 @@ class Alias: def __init__(self): + from radioactive.paths import get_alias_path self.alias_map = [] self.found = False - self.alias_path = os.path.join(os.path.expanduser("~"), ".radio-active-alias") + self.alias_path = get_alias_path() def write_stations(self, station_map): """Write stations file from generated map""" diff --git a/radioactive/config.py b/radioactive/config.py index 504b1a4..90480b7 100644 --- a/radioactive/config.py +++ b/radioactive/config.py @@ -24,11 +24,10 @@ def write_a_sample_config_file(): "player": "ffplay", } - # Get the user's home directory - home_directory = os.path.expanduser("~") + from radioactive.paths import get_config_path # Specify the file path - file_path = os.path.join(home_directory, ".radio-active-configs.ini") + file_path = get_config_path() try: # Write the configuration to the file @@ -43,9 +42,8 @@ def write_a_sample_config_file(): class Configs: def __init__(self): - self.config_path = os.path.join( - os.path.expanduser("~"), ".radio-active-configs.ini" - ) + from radioactive.paths import get_config_path + self.config_path = get_config_path() def load(self): self.config = configparser.ConfigParser() diff --git a/radioactive/last_station.py b/radioactive/last_station.py index a24be3a..27a99cf 100644 --- a/radioactive/last_station.py +++ b/radioactive/last_station.py @@ -17,11 +17,8 @@ class Last_station: """ def __init__(self): - self.last_station_path = None - - self.last_station_path = os.path.join( - os.path.expanduser("~"), ".radio-active-last-station" - ) + from radioactive.paths import get_last_station_path + self.last_station_path = get_last_station_path() def get_info(self): try: diff --git a/radioactive/paths.py b/radioactive/paths.py new file mode 100644 index 0000000..79b3a43 --- /dev/null +++ b/radioactive/paths.py @@ -0,0 +1,84 @@ +import os +import shutil +from zenlog import log + +def _get_xdg_config_dir(): + """Return the XDG configuration directory for radio-active.""" + xdg_config_home = os.environ.get("XDG_CONFIG_HOME") + if xdg_config_home: + return os.path.join(xdg_config_home, "radio-active") + # Default to ~/.config/radio-active + return os.path.join(os.path.expanduser("~"), ".config", "radio-active") + + +def _get_xdg_data_dir(): + """Return the XDG data directory for radio-active.""" + xdg_data_home = os.environ.get("XDG_DATA_HOME") + if xdg_data_home: + return os.path.join(xdg_data_home, "radio-active") + # Default to ~/.local/share/radio-active + return os.path.join(os.path.expanduser("~"), ".local", "share", "radio-active") + + +def get_config_path(): + """ + Get the path to the configuration file. + Migrates from legacy path if it exists and new path does not. + """ + legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-configs.ini") + + config_dir = _get_xdg_config_dir() + os.makedirs(config_dir, exist_ok=True) + new_path = os.path.join(config_dir, "config.ini") + + if os.path.exists(legacy_path) and not os.path.exists(new_path): + log.info(f"Migrating config file from {legacy_path} to {new_path}") + try: + shutil.move(legacy_path, new_path) + except Exception as e: + log.warning(f"Could not migrate config file: {e}") + # If migration fails, we return new_path anyway, user might have to manually move or start fresh + + return new_path + + +def get_alias_path(): + """ + Get the path to the alias (favorites) file. + Migrates from legacy path if it exists and new path does not. + """ + legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-alias") + + config_dir = _get_xdg_config_dir() # Aliases are user config + os.makedirs(config_dir, exist_ok=True) + new_path = os.path.join(config_dir, "alias_map") + + if os.path.exists(legacy_path) and not os.path.exists(new_path): + log.info(f"Migrating alias file from {legacy_path} to {new_path}") + try: + shutil.move(legacy_path, new_path) + except Exception as e: + log.warning(f"Could not migrate alias file: {e}") + + return new_path + + +def get_last_station_path(): + """ + Get the path to the last played station file. + Migrates from legacy path if it exists and new path does not. + """ + legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-last-station") + + data_dir = _get_xdg_data_dir() # Last station is state/data + os.makedirs(data_dir, exist_ok=True) + new_path = os.path.join(data_dir, "last-station") + + if os.path.exists(legacy_path) and not os.path.exists(new_path): + log.info(f"Migrating last station file from {legacy_path} to {new_path}") + try: + shutil.move(legacy_path, new_path) + except Exception as e: + log.warning(f"Could not migrate last station file: {e}") + + return new_path From 17151913dfc16d80d5f21fbca4be2314d1f30e0b Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 00:17:36 +0530 Subject: [PATCH 02/12] Refactor: Internal code structure improvements --- radioactive/actions.py | 327 ++++++++++++++++ radioactive/config.py | 78 ++-- radioactive/ffplay.py | 121 ++++-- radioactive/filter.py | 146 ++++--- radioactive/handler.py | 235 ++++++++---- radioactive/parser.py | 15 +- radioactive/ui.py | 122 ++++++ radioactive/utilities.py | 592 ++++++----------------------- tmp_config/radio-active/config.ini | 10 + 9 files changed, 988 insertions(+), 658 deletions(-) create mode 100644 radioactive/actions.py create mode 100644 radioactive/ui.py create mode 100644 tmp_config/radio-active/config.ini diff --git a/radioactive/actions.py b/radioactive/actions.py new file mode 100644 index 0000000..a0036cd --- /dev/null +++ b/radioactive/actions.py @@ -0,0 +1,327 @@ +""" +Core logical actions for radio-active. +""" + +import datetime +import json +import os +import subprocess +import sys +from random import randint +from typing import Any, Dict, List, Optional, Tuple, Union + +import requests +from zenlog import log + +from radioactive.recorder import record_audio_auto_codec, record_audio_from_url +from radioactive.last_station import Last_station + + +def handle_fetch_song_title(url: str) -> None: + """Fetch currently playing track information""" + log.info("Fetching the current track info") + log.debug(f"Attempting to retrieve track info from: {url}") + # Run ffprobe command and capture the metadata + cmd = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_entries", + "format=icy", + url, + ] + track_name = "" + + try: + output = subprocess.check_output(cmd).decode("utf-8") + data = json.loads(output) + log.debug(f"station info: {data}") + + # Extract the station name (icy-name) if available + track_name = data.get("format", {}).get("tags", {}).get("StreamTitle", "") + except Exception: + log.error("Error while fetching the track name") + + if track_name != "": + log.info(f"🎶: {track_name}") + else: + log.error("No track information available") + + +def handle_record( + target_url: str, + curr_station_name: str, + record_file_path: str, + record_file: str, + record_file_format: str, # auto/mp3 + loglevel: str, +) -> None: + """ + Handle audio recording logic. + """ + log.info("Press 'q' to stop recording") + force_mp3 = False + + if record_file_format != "mp3" and record_file_format != "auto": + record_file_format = "mp3" # default to mp3 + log.debug("Error: wrong codec supplied!. falling back to mp3") + force_mp3 = True + elif record_file_format == "auto": + log.debug("Codec: fetching stream codec") + codec = record_audio_auto_codec(target_url) + if codec is None: + record_file_format = "mp3" # default to mp3 + force_mp3 = True + log.debug("Error: could not detect codec. falling back to mp3") + else: + record_file_format = codec + log.debug(f"Codec: found {codec}") + elif record_file_format == "mp3": + # always save to mp3 to eliminate any runtime issues + # it is better to leave it on libmp3lame + force_mp3 = True + + if record_file_path and not os.path.exists(record_file_path): + log.debug(f"filepath: {record_file_path}") + os.makedirs(record_file_path, exist_ok=True) + + elif not record_file_path: + log.debug("filepath: fallback to default path") + record_file_path = os.path.join( + os.path.expanduser("~"), "Music/radioactive" + ) # fallback path + try: + os.makedirs(record_file_path, exist_ok=True) + except Exception as e: + log.debug(f"{e}") + log.error("Could not make default directory") + sys.exit(1) + + now = datetime.datetime.now() + month_name = now.strftime("%b").upper() + # Format AM/PM as 'AM' or 'PM' + am_pm = now.strftime("%p") + + # format is : day-monthname-year@hour-minute-second-(AM/PM) + formatted_date_time = now.strftime(f"%d-{month_name}-%Y@%I-%M-%S-{am_pm}") + + if not record_file_format.strip(): + record_file_format = "mp3" + + if not record_file: + record_file = "{}-{}".format( + curr_station_name.strip(), formatted_date_time + ).replace(" ", "-") + + tmp_filename = f"{record_file}.{record_file_format}" + outfile_path = os.path.join(record_file_path, tmp_filename) + + log.info(f"Recording will be saved as: \n{outfile_path}") + + record_audio_from_url(target_url, outfile_path, force_mp3, loglevel) + + +def handle_add_station(alias) -> None: + """Add a new station to favorites via user input.""" + try: + left = input("Enter station name:") + right = input("Enter station stream-url or radio-browser uuid:") + except EOFError: + print() + log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") + sys.exit(0) + + if left.strip() == "" or right.strip() == "": + log.error("Empty inputs not allowed") + sys.exit(1) + alias.add_entry(left, right) + log.info("New entry: {}={} added\n".format(left, right)) + sys.exit(0) + + +def handle_add_to_favorite(alias, station_name: str, station_uuid_url: str) -> None: + """Add the current station to favorites.""" + try: + response = alias.add_entry(station_name, station_uuid_url) + if not response: + try: + user_input = input("Enter a different name: ") + except EOFError: + print() + log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") + sys.exit(0) + + if user_input.strip() != "": + response = alias.add_entry(user_input.strip(), station_uuid_url) + except Exception as e: + log.debug(f"Error: {e}") + log.error("Could not add to favorite. Already in list?") + + +def handle_save_last_station(last_station, station_name: str, station_url: str) -> None: + """Save the last played station.""" + # last_station = Last_station() # Provided as arg now + + last_played_station = {} + last_played_station["name"] = station_name + last_played_station["uuid_or_url"] = station_url + + log.debug(f"Saving the current station: {last_played_station}") + last_station.save_info(last_played_station) + + +def check_sort_by_parameter(sort_by: str) -> str: + """Validate and return the sort parameter.""" + accepted_parameters = [ + "name", + "votes", + "codec", + "bitrate", + "lastcheckok", + "lastchecktime", + "clickcount", + "clicktrend", + "random", + ] + + if sort_by not in accepted_parameters: + log.warning("Sort parameter is unknown. Falling back to 'name'") + + log.warning( + "choose from: name,votes,codec,bitrate,lastcheckok,lastchecktime,clickcount,clicktrend,random" + ) + return "name" + return sort_by + + +def handle_search_stations( + handler, + station_name: str, + limit: int, + sort_by: str, + filter_with: str +) -> Any: + """Wrapper to search stations by name.""" + log.debug(f"Searching API for: {station_name}") + return handler.search_by_station_name(station_name, limit, sort_by, filter_with) + + +def handle_station_uuid_play(handler, station_uuid: str) -> Tuple[str, str]: + """Play a station by UUID and register a vote.""" + log.debug(f"Searching API for: {station_uuid}") + + handler.play_by_station_uuid(station_uuid) + + log.debug(f"increased click count for: {station_uuid}") + + handler.vote_for_uuid(station_uuid) + try: + station_name = handler.target_station["name"] + station_url = handler.target_station["url"] + except Exception as e: + log.debug(f"{e}") + log.error("Something went wrong") + sys.exit(1) + + return station_name, station_url + + +def handle_direct_play(alias, station_name_or_url: str = "") -> Tuple[str, str]: + """Play a station directly with UUID or direct stream URL.""" + if "://" in station_name_or_url.strip(): + log.debug("Direct play: URL provided") + # stream URL + # call using URL with no station name N/A + # attempt to get station name from metadata + station_name = handle_get_station_name_from_metadata(station_name_or_url) + return station_name, station_name_or_url + else: + log.debug("Direct play: station name provided") + # station name from fav list + # search for the station in fav list and return name and url + + response = alias.search(station_name_or_url) + if not response: + log.error("No station found on your favorite list with the name") + sys.exit(1) + else: + log.debug(f"Direct play: {response}") + return response["name"], response["uuid_or_url"] + + +def handle_play_last_station(last_station) -> Tuple[str, str]: + """Play the last played station.""" + station_obj = last_station.get_info() + return station_obj["name"], station_obj["uuid_or_url"] + + +def handle_get_station_name_from_metadata(url: str) -> str: + """Get ICY metadata from ffprobe to find station name.""" + log.info("Fetching the station name") + log.debug(f"Attempting to retrieve station name from: {url}") + # Run ffprobe command and capture the metadata + cmd = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_entries", + "format=icy", + url, + ] + station_name = "Unknown Station" + + try: + output = subprocess.check_output(cmd).decode("utf-8") + data = json.loads(output) + log.debug(f"station info: {data}") + + # Extract the station name (icy-name) if available + station_name = ( + data.get("format", {}).get("tags", {}).get("icy-name", "Unknown Station") + ) + except Exception: + log.error("Could not fetch the station name") + + return station_name + + +def handle_station_name_from_headers(url: str) -> str: + """ + Get headers from URL to find station name (deprecated). + """ + log.info("Fetching the station name") + log.debug(f"Attempting to retrieve station name from: {url}") + station_name = "Unknown Station" + try: + # sync call, with timeout + response = requests.get(url, timeout=5) + if response.status_code == requests.codes.ok: + if response.headers.get("Icy-Name"): + station_name = response.headers.get("Icy-Name") + else: + log.error("Station name not found") + else: + log.debug(f"Response code received is: {response.status_code}") + except Exception as e: + log.error("Could not fetch the station name") + log.debug(f"An error occurred: {e}") + return station_name + + +def handle_play_random_station(alias) -> Tuple[str, str]: + """Select a random station from favorite menu.""" + log.debug("playing a random station") + alias_map = alias.alias_map + if not alias_map: + log.error("No favorite stations found") + sys.exit(1) + + index = randint(0, len(alias_map) - 1) + station = alias_map[index] + return station["name"], station["uuid_or_url"] diff --git a/radioactive/config.py b/radioactive/config.py index 90480b7..9138d5b 100644 --- a/radioactive/config.py +++ b/radioactive/config.py @@ -1,14 +1,21 @@ -# load configs from a file and apply. -# If any options are given on command line it will override the configs +""" +Configuration management for radio-active. +Handles loading, saving, and managing user configurations. +""" + import configparser import getpass -import os import sys +from typing import Dict, Any, Optional from zenlog import log -def write_a_sample_config_file(): +def write_a_sample_config_file() -> None: + """ + Create a sample configuration file with default settings. + Checks for the XDG config path and writes the file there. + """ # Create a ConfigParser object config = configparser.ConfigParser() @@ -24,12 +31,11 @@ def write_a_sample_config_file(): "player": "ffplay", } - from radioactive.paths import get_config_path - - # Specify the file path - file_path = get_config_path() - try: + from radioactive.paths import get_config_path + # Specify the file path + file_path = get_config_path() + # Write the configuration to the file with open(file_path, "w") as config_file: config.write(config_file) @@ -37,33 +43,53 @@ def write_a_sample_config_file(): log.info(f"A sample default configuration file added at: {file_path}") except Exception as e: - print(f"Error writing the configuration file: {e}") + log.error(f"Error writing the configuration file: {e}") class Configs: + """ + Class to handle loading and parsing of the configuration file. + """ def __init__(self): from radioactive.paths import get_config_path self.config_path = get_config_path() + self.config: Optional[configparser.ConfigParser] = None + + def load(self) -> Dict[str, str]: + """ + Load the configuration file and return options as a dictionary. - def load(self): + Returns: + dict: The configuration options. + """ self.config = configparser.ConfigParser() try: self.config.read(self.config_path) - options = {} - options["volume"] = self.config.get("AppConfig", "volume") - options["loglevel"] = self.config.get("AppConfig", "loglevel") - options["sort"] = self.config.get("AppConfig", "sort") - options["filter"] = self.config.get("AppConfig", "filter") - options["limit"] = self.config.get("AppConfig", "limit") - options["filepath"] = self.config.get("AppConfig", "filepath") - # if filepath has any placeholder, replace - # {user} to actual user map - options["filepath"] = options["filepath"].replace( - "{user}", getpass.getuser() - ) - options["filetype"] = self.config.get("AppConfig", "filetype") - options["player"] = self.config.get("AppConfig", "player") + options: Dict[str, str] = {} + + # Helper to safely get config values with defaults if section missing + def get_option(key: str, default: str = "") -> str: + try: + return self.config.get("AppConfig", key) + except (configparser.NoSectionError, configparser.NoOptionError): + return default + + options["volume"] = get_option("volume", "80") + options["loglevel"] = get_option("loglevel", "info") + options["sort"] = get_option("sort", "votes") + options["filter"] = get_option("filter", "none") + options["limit"] = get_option("limit", "100") + options["filepath"] = get_option("filepath", "/home/{user}/recordings/radioactive/") + + # if filepath has any placeholder, replace {user} to actual user map + if "{user}" in options["filepath"]: + options["filepath"] = options["filepath"].replace( + "{user}", getpass.getuser() + ) + + options["filetype"] = get_option("filetype", "mp3") + options["player"] = get_option("player", "ffplay") return options @@ -71,5 +97,5 @@ def load(self): log.error(f"Something went wrong while parsing the config file: {e}") # write the example config file write_a_sample_config_file() - log.info("Re-run radioative") + log.info("Re-run radioactive") sys.exit(1) diff --git a/radioactive/ffplay.py b/radioactive/ffplay.py index aeca804..fa74f04 100644 --- a/radioactive/ffplay.py +++ b/radioactive/ffplay.py @@ -1,3 +1,7 @@ +""" +Module for handling FFplay process management to play radio streams. +""" + import os import signal import subprocess @@ -5,12 +9,16 @@ import threading from shutil import which from time import sleep +from typing import Optional, List, Any import psutil from zenlog import log -def kill_background_ffplays(): +def kill_background_ffplays() -> None: + """ + Kill all background 'ffplay' processes started by this user. + """ all_processes = psutil.process_iter(attrs=["pid", "name"]) count = 0 # Iterate through the processes and terminate those named "ffplay" @@ -33,24 +41,37 @@ def kill_background_ffplays(): class Ffplay: - def __init__(self, URL, volume, loglevel): + """ + Wrapper class to manage the FFplay process for audio playback. + """ + def __init__(self, URL: str, volume: int, loglevel: str): self.program_name = "ffplay" self.url = URL self.volume = volume self.loglevel = loglevel self.is_playing = False - self.process = None + self.process: Optional[subprocess.Popen] = None + self.exe_path: Optional[str] = None + self.is_running = False self._check_ffplay_installation() self.start_process() - def _check_ffplay_installation(self): + def _check_ffplay_installation(self) -> None: + """Check if ffplay is installed and available in PATH.""" self.exe_path = which(self.program_name) if self.exe_path is None: log.critical("FFplay not found, install it first please") sys.exit(1) - def _construct_ffplay_commands(self): + def _construct_ffplay_commands(self) -> List[str]: + """Construct the command line arguments for ffplay.""" + # Ensure volume is within valid range (0-100) though ffplay accepts 0-100 + # Actually ffplay volume is 0-100 + + if self.exe_path is None: + raise RuntimeError("FFplay executable path is not set") + ffplay_commands = [self.exe_path, "-volume", f"{self.volume}", "-vn", self.url] if self.loglevel == "debug": @@ -60,7 +81,8 @@ def _construct_ffplay_commands(self): return ffplay_commands - def start_process(self): + def start_process(self) -> None: + """Start the ffplay process.""" try: ffplay_commands = self._construct_ffplay_commands() self.process = subprocess.Popen( @@ -76,37 +98,60 @@ def start_process(self): self._start_error_thread() except Exception as e: - log.error("Error while starting radio: {}".format(e)) + log.error(f"Error while starting radio: {e}") + self.is_playing = False - def _start_error_thread(self): + def _start_error_thread(self) -> None: + """Start a thread to monitor stderr for errors.""" error_thread = threading.Thread(target=self._check_error_output) error_thread.daemon = True error_thread.start() - def _check_error_output(self): + def _check_error_output(self) -> None: + """Monitor stderr for errors.""" + if not self.process or not self.process.stderr: + return + while self.is_running: - stderr_result = self.process.stderr.readline() - if stderr_result: - self._handle_error(stderr_result) - self.is_running = False - self.stop() - sleep(2) - - def _handle_error(self, stderr_result): + try: + stderr_result = self.process.stderr.readline() + if stderr_result: + self._handle_error(stderr_result) + self.is_running = False + self.stop() + break + except ValueError: + # ValueError: I/O operation on closed file. + break + except Exception: + break + sleep(0.5) + + def _handle_error(self, stderr_result: str) -> None: + """Log the error message.""" print() - log.error("Could not connect to the station") + log.error("Could not connect to the station/stream") try: log.debug(stderr_result) - log.error(stderr_result.split(": ")[1]) + parts = stderr_result.split(": ") + if len(parts) > 1: + log.error(parts[1].strip()) + else: + log.error(stderr_result.strip()) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error parsing stderr: {e}") pass - def terminate_parent_process(self): + def terminate_parent_process(self) -> None: + """Signal the parent process (main app) to terminate.""" parent_pid = os.getppid() - os.kill(parent_pid, signal.SIGINT) + try: + os.kill(parent_pid, signal.SIGINT) + except Exception as e: + log.debug(f"Could not kill parent process: {e}") - def is_active(self): + def is_active(self) -> bool: + """Check if the ffplay process is currently active/running.""" if not self.process: log.warning("Process is not initialized") return False @@ -124,25 +169,29 @@ def is_active(self): return False except (psutil.NoSuchProcess, Exception) as e: - log.debug("Process not found or error while checking status: {}".format(e)) + log.debug(f"Process not found or error checking status: {e}") return False - def play(self): + def play(self) -> None: + """Resume or start playback.""" if not self.is_playing: self.start_process() - def stop(self): - if self.is_playing: + def stop(self) -> None: + """Stop playback and terminate the process.""" + if self.is_playing and self.process: + self.is_running = False # Stop the error thread loop try: - self.process.kill() - self.process.wait(timeout=5) + self.process.terminate() + try: + self.process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait(timeout=2) + log.debug("Radio playback stopped successfully") - except subprocess.TimeoutExpired: - log.warning("Radio process did not terminate, killing...") - self.process.kill() except Exception as e: - log.error("Error while stopping radio: {}".format(e)) - raise + log.error(f"Error while stopping radio: {e}") finally: self.is_playing = False self.process = None @@ -150,10 +199,10 @@ def stop(self): log.debug("Radio is not currently playing") self.terminate_parent_process() - def toggle(self): + def toggle(self) -> None: + """Toggle playback state.""" if self.is_playing: log.debug("Stopping the ffplay process") - self.is_running = False self.stop() else: log.debug("Starting the ffplay process") diff --git a/radioactive/filter.py b/radioactive/filter.py index 3f5caba..6a9c90c 100644 --- a/radioactive/filter.py +++ b/radioactive/filter.py @@ -1,34 +1,54 @@ +""" +Module for filtering radio station results based on various criteria. +""" + import sys +from typing import List, Dict, Any, Union from zenlog import log -# function to filter strings -def _filter_entries_by_key(data, filter_param, key): +def _filter_entries_by_key( + data: List[Dict[str, Any]], + filter_param: str, + key: str +) -> List[Dict[str, Any]]: + """ + Filter list of dictionaries by a string key using inclusion (=) or exclusion (!=). + """ log.debug(f"filter: {filter_param}") filtered_entries = [] for entry in data: value = entry.get(key) - - if value is not None and value != "": - if "!=" in filter_param: - # Handle exclusion - exclusion_values = filter_param.split("!=")[1].split(",") - + # Ensure value is a string for comparison + if value is None: + continue + + str_value = str(value) + if str_value == "": + continue + + if "!=" in filter_param: + # Handle exclusion + # Splitting safely to avoid index errors + parts = filter_param.split("!=") + if len(parts) > 1: + exclusion_values = parts[1].split(",") if all( - exclusion_value.lower() not in value.lower() + exclusion_value.lower() not in str_value.lower() for exclusion_value in exclusion_values ): filtered_entries.append(entry) - elif "=" in filter_param: - # Handle inclusion - inclusion_values = filter_param.split("=")[1].split(",") - + elif "=" in filter_param: + # Handle inclusion + parts = filter_param.split("=") + if len(parts) > 1: + inclusion_values = parts[1].split(",") if any( - inclusion_value.lower() in value.lower() + inclusion_value.lower() in str_value.lower() for inclusion_value in inclusion_values ): filtered_entries.append(entry) @@ -36,74 +56,105 @@ def _filter_entries_by_key(data, filter_param, key): return filtered_entries -# function to filter numeric values -def _filter_entries_by_numeric_key(data, filter_param, key): +def _filter_entries_by_numeric_key( + data: List[Dict[str, Any]], + filter_param: str, + key: str +) -> List[Dict[str, Any]]: + """ + Filter list of dictionaries by a numeric key. + Supports <, >, and = operators. + """ filtered_entries = [] - # filter_key = filter_param.split(key)[0] # most left hand of the expression - filter_param = filter_param.split(key)[1] # portion after the operator - filter_operator = filter_param[0] # operator part - filter_value = int(filter_param[1:]) # value part - # log.debug(f"filter: parameter:{filter_param}") + try: + # Split logic needs to be robust. + # Expected format: keyOPvalue e.g. votes>100 + # We know the key, so we can split by key + parts = filter_param.split(key) + if len(parts) < 2: + log.warning(f"Invalid filter format: {filter_param}") + return data + + param_part = parts[1] # portion after the key name e.g. >100, =50 + if not param_part: + return data + + filter_operator = param_part[0] # operator part + filter_value_str = param_part[1:] # value part + + if not filter_value_str: + log.warning(f"No value provided for filter: {filter_param}") + return data + + filter_value = int(filter_value_str) + + for entry in data: + val = entry.get(key) + if val is not None: + try: + # Convert to int, default to 0 if fails + int_val = int(val) + except (ValueError, TypeError): + continue - for entry in data: - value = int(entry.get(key)) - - if value is not None: - try: if filter_operator not in [">", "<", "="]: - log.warning("Unsupported filter operator, not filtering !!") + log.warning(f"Unsupported filter operator: {filter_operator}") return data - if filter_operator == "<" and value < filter_value: + + if filter_operator == "<" and int_val < filter_value: filtered_entries.append(entry) - elif filter_operator == ">" and value > filter_value: + elif filter_operator == ">" and int_val > filter_value: filtered_entries.append(entry) - elif filter_operator == "=" and value == filter_value: + elif filter_operator == "=" and int_val == filter_value: filtered_entries.append(entry) - except ValueError: - log.error(f"Invalid filter value for {key}: {filter_param}") - sys.exit(1) + except ValueError: + log.error(f"Invalid numeric filter value for {key}: {filter_param}") + sys.exit(1) + except Exception as e: + log.error(f"Error filtering by numeric key {key}: {e}") + sys.exit(1) return filtered_entries # allowed string string filters -def _filter_entries_by_name(data, filter_param): +def _filter_entries_by_name(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="name") -def _filter_entries_by_language(data, filter_param): +def _filter_entries_by_language(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="language") -def _filter_entries_by_country(data, filter_param): +def _filter_entries_by_country(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="countrycode") -def _filter_entries_by_tags(data, filter_param): +def _filter_entries_by_tags(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="tags") -def _filter_entries_by_codec(data, filter_param): +def _filter_entries_by_codec(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="codec") # allowed numeric filters -def _filter_entries_by_votes(data, filter_param): +def _filter_entries_by_votes(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_numeric_key(data, filter_param, key="votes") -def _filter_entries_by_bitrate(data, filter_param): +def _filter_entries_by_bitrate(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_numeric_key(data, filter_param, key="bitrate") -def _filter_entries_by_clickcount(data, filter_param): +def _filter_entries_by_clickcount(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_numeric_key(data, filter_param, key="clickcount") # top level filter function -def _filter_results(data, expression): +def _filter_results(data: List[Dict], expression: str) -> List[Dict]: log.debug(f"Filter exp: {expression}") if not data: log.error("Empty results") @@ -126,15 +177,16 @@ def _filter_results(data, expression): elif "votes" in expression: return _filter_entries_by_votes(data, expression) else: - log.warning("Unknown filter expression, not filtering!") + log.warning(f"Unknown filter expression: {expression}, not filtering!") return data # Top most function for multiple filtering expressions with '&' -# NOTE: it will filter maintaining the order you provided on the CLI - - -def filter_expressions(data, input_expression): +def filter_expressions(data: List[Dict[str, Any]], input_expression: str) -> List[Dict[str, Any]]: + """ + Filter the list of stations based on the input expression. + Supports multiple filters separated by '&'. + """ log.info( "Setting a higher value for the --limit parameter is preferable when filtering stations." ) diff --git a/radioactive/handler.py b/radioactive/handler.py index 2a5e905..799686b 100644 --- a/radioactive/handler.py +++ b/radioactive/handler.py @@ -5,6 +5,7 @@ import datetime import json import sys +from typing import List, Optional, Union, Dict, Any import requests_cache from pyradios import RadioBrowser @@ -14,44 +15,56 @@ from radioactive.filter import filter_expressions +# constants +DEFAULT_CACHE_RETENTION_DAYS = 3 +BYTES_TO_MB_DIVISOR = 1024 * 1024 + console = Console() -def trim_string(text, max_length=40): +def trim_string(text: str, max_length: int = 40) -> str: """ Trim a string to a maximum length and add ellipsis if needed. Args: - text (str): The input text to be trimmed. - max_length (int, optional): The maximum length of the trimmed string. Defaults to 40. + text (str): The input text to be trimmed. + max_length (int, optional): The maximum length of the trimmed string. Defaults to 40. Returns: - str: The trimmed string, possibly with an ellipsis (...) if it was shortened. + str: The trimmed string, possibly with an ellipsis (...) if it was shortened. """ + if not isinstance(text, str): + return str(text) + if len(text) > max_length: return text[:max_length] + "..." - else: - return text + return text -def print_table(response, columns, sort_by, filter_expression): +def print_table( + response: List[Dict[str, Any]], + columns: List[str], + sort_by: str, + filter_expression: str +) -> List[Dict[str, Any]]: """ Print the table applying the sort logic. Args: - response (list): A list of data to be displayed in the table. - columns (list): List of column specifications in the format "col_name:response_key@max_str". - sort_by (str): The column by which to sort the table. + response (list): A list of data to be displayed in the table. + columns (list): List of column specifications in the format "col_name:response_key@max_str". + sort_by (str): The column by which to sort the table. + filter_expression (str): Filter expression to apply strings. Returns: - list: The original response data. + list: The original (or filtered) response data. """ if not response: log.error("No stations found") sys.exit(1) - # need to filter? + # Apply filtering if needed if filter_expression.lower() != "none": response = filter_expressions(response, filter_expression) @@ -61,44 +74,37 @@ def print_table(response, columns, sort_by, filter_expression): else: log.debug("Not filtering") - if len(response) >= 1: + if response: table = Table( show_header=True, header_style="magenta", expand=True, min_width=85, safe_box=True, - # show_footer=True, - # show_lines=True, - # padding=0.1, - # collapse_padding=True, ) table.add_column("ID", justify="center") + parsed_columns = [] for col_spec in columns: - col_name, response_key, max_str = ( - col_spec.split(":")[0], - col_spec.split(":")[1].split("@")[0], - int(col_spec.split("@")[1]), - ) + parts = col_spec.split(":") + col_name = parts[0] + rest = parts[1].split("@") + response_key = rest[0] + max_str = int(rest[1]) + + parsed_columns.append((col_name, response_key, max_str)) table.add_column(col_name, justify="left") - # do not need extra columns for these cases + # Add the sort column if it's not already displayed (and not generic 'name' or 'random') if sort_by not in ["name", "random"]: table.add_column(sort_by, justify="left") for i, station in enumerate(response): row_data = [str(i + 1)] # for ID - for col_spec in columns: - col_name, response_key, max_str = ( - col_spec.split(":")[0], - col_spec.split(":")[1].split("@")[0], - int(col_spec.split("@")[1]), - ) - row_data.append( - trim_string(station.get(response_key, ""), max_length=max_str) - ) + for _, response_key, max_str in parsed_columns: + val = station.get(response_key, "") + row_data.append(trim_string(val, max_length=max_str)) if sort_by not in ["name", "random"]: row_data.append(str(station.get(sort_by, ""))) @@ -106,9 +112,6 @@ def print_table(response, columns, sort_by, filter_expression): table.add_row(*row_data) console.print(table) - # log.info( - # "If the table does not fit into your screen, \ntry to maximize the window, decrease the font by a bit, and retry" - # ) return response else: log.info("No stations found") @@ -117,7 +120,7 @@ def print_table(response, columns, sort_by, filter_expression): class Handler: """ - radio-browser API handler. This module communicates with the underlying API via PyRadios + radio-browser API handler. This module communicates with the underlying API via PyRadios. """ def __init__(self): @@ -127,45 +130,82 @@ def __init__(self): # When RadioBrowser can not be initiated properly due to no internet (probably) try: - expire_after = datetime.timedelta(days=3) + expire_after = datetime.timedelta(days=DEFAULT_CACHE_RETENTION_DAYS) session = requests_cache.CachedSession( cache_name="cache", backend="sqlite", expire_after=expire_after ) self.API = RadioBrowser(session=session) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error initializing RadioBrowser: {e}") log.critical("Something is wrong with your internet connection") sys.exit(1) - def get_country_code(self, name): + def get_country_code(self, name: str) -> Optional[str]: + """ + Get the ISO 3166-1 alpha-2 country code for a given country name. + + Args: + name (str): The name of the country. + + Returns: + str: The country code if found, None otherwise. + """ self.countries = self.API.countries() for country in self.countries: if country["name"].lower() == name.lower(): return country["iso_3166_1"] return None - def validate_uuid_station(self): - if len(self.response) == 1: + def validate_uuid_station(self) -> List[Dict[str, Any]]: + """ + Validate that a station UUID search returned exactly one result and register a click. + + Returns: + list: The response list containing the station details. + """ + if self.response and len(self.response) >= 1: + # We take the first one if multiple (unlikely for UUID but possible in theory) log.debug(json.dumps(self.response[0], indent=3)) self.target_station = self.response[0] # register a valid click to increase its popularity - self.API.click_counter(self.target_station["stationuuid"]) + self.vote_for_uuid(self.target_station["stationuuid"]) return self.response + + log.error("Station found by UUID is invalid or empty") + sys.exit(1) # ---------------------------- NAME -------------------------------- # - def search_by_station_name(self, _name, limit, sort_by, filter_with): - """search and play a station by its name""" - reversed = sort_by != "name" + def search_by_station_name( + self, + name: str, + limit: int, + sort_by: str, + filter_with: str + ) -> List[Dict[str, Any]]: + """ + Search and play a station by its name. + + Args: + name (str): Station name to search for. + limit (int): Max number of results. + sort_by (str): Field to sort by. + filter_with (str): Filter expression. + + Returns: + list: List of found stations. + """ + # Rename 'reversed' to avoid shadowing built-in + is_reverse = sort_by != "name" try: response = self.API.search( - name=_name, + name=name, name_exact=False, limit=limit, order=str(sort_by), - reverse=reversed, + reverse=is_reverse, ) return print_table( response, @@ -174,44 +214,60 @@ def search_by_station_name(self, _name, limit, sort_by, filter_with): filter_expression=filter_with, ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error in search_by_station_name: {e}") log.error("Something went wrong. please try again.") sys.exit(1) # ------------------------- UUID ------------------------ # - def play_by_station_uuid(self, _uuid): - """search and play station by its stationuuid""" + def play_by_station_uuid(self, uuid: str) -> List[Dict[str, Any]]: + """ + Search and play station by its stationuuid. + + Args: + uuid (str): The UUID of the station. + + Returns: + list: Confirmed station details. + """ try: - self.response = self.API.station_by_uuid(_uuid) + self.response = self.API.station_by_uuid(uuid) return self.validate_uuid_station() except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error in play_by_station_uuid: {e}") log.error("Something went wrong. please try again.") sys.exit(1) # -------------------------- COUNTRY ----------------------# - def discover_by_country(self, country_code_or_name, limit, sort_by, filter_with): - # set reverse to false if name is is the parameter for sorting - reversed = sort_by != "name" + def discover_by_country( + self, + country_code_or_name: str, + limit: int, + sort_by: str, + filter_with: str + ) -> List[Dict[str, Any]]: + """ + Discover stations by country code or name. + """ + is_reverse = sort_by != "name" # check if it is a code or name if len(country_code_or_name.strip()) == 2: # it's a code - log.debug("Country code '{}' provided".format(country_code_or_name)) + log.debug(f"Country code '{country_code_or_name}' provided") try: response = self.API.search( countrycode=country_code_or_name, limit=limit, order=str(sort_by), - reverse=reversed, + reverse=is_reverse, ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error searching by country code: {e}") log.error("Something went wrong. please try again.") sys.exit(1) else: # it's name - log.debug("Country name '{}' provided".format(country_code_or_name)) + log.debug(f"Country name '{country_code_or_name}' provided") code = self.get_country_code(country_code_or_name) if code: try: @@ -220,10 +276,10 @@ def discover_by_country(self, country_code_or_name, limit, sort_by, filter_with) limit=limit, country_exact=True, order=str(sort_by), - reverse=reversed, + reverse=is_reverse, ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error searching by country name: {e}") log.error("Something went wrong. please try again.") sys.exit(1) else: @@ -246,14 +302,22 @@ def discover_by_country(self, country_code_or_name, limit, sort_by, filter_with) # ------------------- by state --------------------- - def discover_by_state(self, state, limit, sort_by, filter_with): - reversed = sort_by != "name" + def discover_by_state( + self, + state: str, + limit: int, + sort_by: str, + filter_with: str + ) -> List[Dict[str, Any]]: + """Discover stations by state.""" + is_reverse = sort_by != "name" try: response = self.API.search( - state=state, limit=limit, order=str(sort_by), reverse=reversed + state=state, limit=limit, order=str(sort_by), reverse=is_reverse ) - except Exception: + except Exception as e: + log.debug(f"Error discover_by_state: {e}") log.error("Something went wrong. please try again.") sys.exit(1) @@ -272,15 +336,25 @@ def discover_by_state(self, state, limit, sort_by, filter_with): # -----------------by language -------------------- - def discover_by_language(self, language, limit, sort_by, filter_with): - reversed = sort_by != "name" + def discover_by_language( + self, + language: str, + limit: int, + sort_by: str, + filter_with: str + ) -> List[Dict[str, Any]]: + """Discover stations by language.""" + is_reverse = sort_by != "name" try: response = self.API.search( - language=language, limit=limit, order=str(sort_by), reverse=reversed + language=language, + limit=limit, + order=str(sort_by), + reverse=is_reverse ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error discover_by_language: {e}") log.error("Something went wrong. please try again.") sys.exit(1) @@ -297,15 +371,22 @@ def discover_by_language(self, language, limit, sort_by, filter_with): ) # -------------------- by tag ---------------------- # - def discover_by_tag(self, tag, limit, sort_by, filter_with): - reversed = sort_by != "name" + def discover_by_tag( + self, + tag: str, + limit: int, + sort_by: str, + filter_with: str + ) -> List[Dict[str, Any]]: + """Discover stations by tag.""" + is_reverse = sort_by != "name" try: response = self.API.search( - tag=tag, limit=limit, order=str(sort_by), reverse=reversed + tag=tag, limit=limit, order=str(sort_by), reverse=is_reverse ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error discover_by_tag: {e}") log.error("Something went wrong. please try again.") sys.exit(1) @@ -322,9 +403,11 @@ def discover_by_tag(self, tag, limit, sort_by, filter_with): ) # ---- Increase click count ------------- # - def vote_for_uuid(self, UUID): + def vote_for_uuid(self, uuid: str) -> Optional[Dict]: + """Increase the click count for a station UUID.""" try: - result = self.API.click_counter(UUID) + result = self.API.click_counter(uuid) return result except Exception as e: - log.debug("Something went wrong during increasing click count:{}".format(e)) + log.debug(f"Something went wrong during increasing click count: {e}") + return None diff --git a/radioactive/parser.py b/radioactive/parser.py index 0eedd35..f302eed 100644 --- a/radioactive/parser.py +++ b/radioactive/parser.py @@ -1,12 +1,21 @@ +from typing import Dict, Any, Optional + from zenlog import log from radioactive.args import Parser -def parse_options(): +def parse_options() -> Dict[str, Any]: + """ + Parse command-line arguments and return a dictionary of options. + + Returns: + dict: A dictionary containing all the parsed options and their values. + """ parser = Parser() args = parser.parse() - options = {} + options: Dict[str, Any] = {} + # ----------------- all the args ------------- # options["version"] = args.version options["show_help_table"] = args.help @@ -17,7 +26,7 @@ def parse_options(): log.level(options["loglevel"]) else: log.level("info") - log.warning("Correct log levels are: error,warning,info(default),debug") + log.warning("Correct log levels are: error, warning, info(default), debug") # check is limit is a valid integer limit = args.limit diff --git a/radioactive/ui.py b/radioactive/ui.py new file mode 100644 index 0000000..9afffee --- /dev/null +++ b/radioactive/ui.py @@ -0,0 +1,122 @@ +""" +UI components for radio-active using Rich. +""" + +from rich import print +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.text import Text +from zenlog import log + +# Global variable to store current station info for display +# This is shared state, ideally should be managed better, but keeping for compatibility +global_current_station_info = {} + + +def handle_welcome_screen() -> None: + """Print the welcome screen panel.""" + welcome = Panel( + """ + :radio: Play any radios around the globe right from this Terminal [yellow]:zap:[/yellow]! + :smile: Author: Dipankar Pal + :question: Type '--help' for more details on available commands + :bug: Visit: https://github.com/deep5050/radio-active to submit issues + :star: Show some love by starring the project on GitHub [red]:heart:[/red] + :dollar: You can donate me at https://deep5050.github.io/payme/ + :x: Press Ctrl+C to quit + """, + title="[b]RADIOACTIVE[/b]", + width=85, + expand=True, + safe_box=True, + ) + print(welcome) + + +def handle_update_screen(app) -> None: + """ + Check for updates and print a message if available. + + Args: + app: The App instance to check for updates. + """ + if app.is_update_available(): + update_msg = ( + "\t[blink]An update available, run [green][italic]pip install radio-active==" + + app.get_remote_version() + + "[/italic][/green][/blink]\nSee the changes: https://github.com/deep5050/radio-active/blob/main/CHANGELOG.md" + ) + update_panel = Panel( + update_msg, + width=85, + ) + print(update_panel) + else: + log.debug("Update not available") + + +def handle_favorite_table(alias) -> None: + """ + Print the user's favorite list in a table. + + Args: + alias: The Alias instance containing the favorite map. + """ + table = Table( + show_header=True, + header_style="bold magenta", + min_width=85, + safe_box=False, + expand=True, + ) + table.add_column("Station", justify="left") + table.add_column("URL / UUID", justify="left") + + if len(alias.alias_map) > 0: + for entry in alias.alias_map: + table.add_row(entry["name"], entry["uuid_or_url"]) + print(table) + log.info(f"Your favorite stations are saved in {alias.alias_path}") + else: + log.info("You have no favorite station list") + + +def handle_show_station_info() -> None: + """Show important information regarding the current station.""" + # pylint: disable=global-statement + global global_current_station_info + custom_info = {} + try: + custom_info["name"] = global_current_station_info.get("name") + custom_info["uuid"] = global_current_station_info.get("stationuuid") + custom_info["url"] = global_current_station_info.get("url") + custom_info["website"] = global_current_station_info.get("homepage") + custom_info["country"] = global_current_station_info.get("country") + custom_info["language"] = global_current_station_info.get("language") + custom_info["tags"] = global_current_station_info.get("tags") + custom_info["codec"] = global_current_station_info.get("codec") + custom_info["bitrate"] = global_current_station_info.get("bitrate") + print(custom_info) + except Exception as e: + log.error(f"No station information available: {e}") + + +def handle_current_play_panel(curr_station_name: str = "") -> None: + """ + Print the currently playing station panel. + + Args: + curr_station_name (str): Name of the station. + """ + panel_station_name = Text(curr_station_name, justify="center") + + station_panel = Panel(panel_station_name, title="[blink]:radio:[/blink]", width=85) + console = Console() + console.print(station_panel) + + +def set_global_station_info(info: dict) -> None: + """Helper to update global station info from other modules.""" + global global_current_station_info + global_current_station_info = info diff --git a/radioactive/utilities.py b/radioactive/utilities.py index 4bbf41f..860c6a1 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -1,304 +1,62 @@ -"""Handler functions for __main__.py""" +""" +Handler functions for __main__.py. +Acts as a controller/orchestrator, delegating to UI and Actions modules. +""" -import datetime -import json -import os -import subprocess import sys from random import randint +from typing import Any, Dict, List, Optional, Tuple, Union -import requests from pick import pick -from rich import print -from rich.console import Console -from rich.panel import Panel -from rich.table import Table -from rich.text import Text from zenlog import log +# Re-export functions for backward compatibility and aggregation +from radioactive.ui import ( + handle_welcome_screen, + handle_update_screen, + handle_favorite_table, + handle_show_station_info, + handle_current_play_panel, + set_global_station_info, + global_current_station_info, +) + +from radioactive.actions import ( + handle_fetch_song_title, + handle_record, + handle_add_station, + handle_add_to_favorite, + handle_save_last_station, + check_sort_by_parameter, + handle_search_stations, + handle_station_uuid_play, + handle_direct_play, + handle_play_last_station, + handle_get_station_name_from_metadata, + handle_station_name_from_headers, + handle_play_random_station, +) from radioactive.ffplay import kill_background_ffplays -from radioactive.last_station import Last_station -from radioactive.recorder import record_audio_auto_codec, record_audio_from_url + RED_COLOR = "\033[91m" END_COLOR = "\033[0m" -global_current_station_info = {} - - -def handle_fetch_song_title(url): - """Fetch currently playing track information""" - log.info("Fetching the current track info") - log.debug("Attempting to retrieve track info from: {}".format(url)) - # Run ffprobe command and capture the metadata - cmd = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_format", - "-show_entries", - "format=icy", - url, - ] - track_name = "" - - try: - output = subprocess.check_output(cmd).decode("utf-8") - data = json.loads(output) - log.debug(f"station info: {data}") - - # Extract the station name (icy-name) if available - track_name = data.get("format", {}).get("tags", {}).get("StreamTitle", "") - except: - log.error("Error while fetching the track name") - - if track_name != "": - log.info(f"🎶: {track_name}") - else: - log.error("No track information available") - - -def handle_record( - target_url, - curr_station_name, - record_file_path, - record_file, - record_file_format, # auto/mp3 - loglevel, -): - log.info("Press 'q' to stop recording") - force_mp3 = False - - if record_file_format != "mp3" and record_file_format != "auto": - record_file_format = "mp3" # default to mp3 - log.debug("Error: wrong codec supplied!. falling back to mp3") - force_mp3 = True - elif record_file_format == "auto": - log.debug("Codec: fetching stream codec") - codec = record_audio_auto_codec(target_url) - if codec is None: - record_file_format = "mp3" # default to mp3 - force_mp3 = True - log.debug("Error: could not detect codec. falling back to mp3") - else: - record_file_format = codec - log.debug("Codec: found {}".format(codec)) - elif record_file_format == "mp3": - # always save to mp3 to eliminate any runtime issues - # it is better to leave it on libmp3lame - force_mp3 = True - - if record_file_path and not os.path.exists(record_file_path): - log.debug("filepath: {}".format(record_file_path)) - os.makedirs(record_file_path, exist_ok=True) - - elif not record_file_path: - log.debug("filepath: fallback to default path") - record_file_path = os.path.join( - os.path.expanduser("~"), "Music/radioactive" - ) # fallback path - try: - os.makedirs(record_file_path, exist_ok=True) - except Exception as e: - log.debug("{}".format(e)) - log.error("Could not make default directory") - sys.exit(1) - - now = datetime.datetime.now() - month_name = now.strftime("%b").upper() - # Format AM/PM as 'AM' or 'PM' - am_pm = now.strftime("%p") - - # format is : day-monthname-year@hour-minute-second-(AM/PM) - formatted_date_time = now.strftime(f"%d-{month_name}-%Y@%I-%M-%S-{am_pm}") - - if not record_file_format.strip(): - record_file_format = "mp3" - - if not record_file: - record_file = "{}-{}".format( - curr_station_name.strip(), formatted_date_time - ).replace(" ", "-") - - tmp_filename = f"{record_file}.{record_file_format}" - outfile_path = os.path.join(record_file_path, tmp_filename) - - log.info(f"Recording will be saved as: \n{outfile_path}") - - record_audio_from_url(target_url, outfile_path, force_mp3, loglevel) - - -def handle_welcome_screen(): - welcome = Panel( - """ - :radio: Play any radios around the globe right from this Terminal [yellow]:zap:[/yellow]! - :smile: Author: Dipankar Pal - :question: Type '--help' for more details on available commands - :bug: Visit: https://github.com/deep5050/radio-active to submit issues - :star: Show some love by starring the project on GitHub [red]:heart:[/red] - :dollar: You can donate me at https://deep5050.github.io/payme/ - :x: Press Ctrl+C to quit - """, - title="[b]RADIOACTIVE[/b]", - width=85, - expand=True, - safe_box=True, - ) - print(welcome) - - -def handle_update_screen(app): - if app.is_update_available(): - update_msg = ( - "\t[blink]An update available, run [green][italic]pip install radio-active==" - + app.get_remote_version() - + "[/italic][/green][/blink]\nSee the changes: https://github.com/deep5050/radio-active/blob/main/CHANGELOG.md" - ) - update_panel = Panel( - update_msg, - width=85, - ) - print(update_panel) - else: - log.debug("Update not available") - - -def handle_favorite_table(alias): - # log.info("Your favorite station list is below") - table = Table( - show_header=True, - header_style="bold magenta", - min_width=85, - safe_box=False, - expand=True, - ) - table.add_column("Station", justify="left") - table.add_column("URL / UUID", justify="left") - if len(alias.alias_map) > 0: - for entry in alias.alias_map: - table.add_row(entry["name"], entry["uuid_or_url"]) - print(table) - log.info(f"Your favorite stations are saved in {alias.alias_path}") - else: - log.info("You have no favorite station list") - - -def handle_show_station_info(): - """Show important information regarding the current station""" - global global_current_station_info - custom_info = {} - try: - custom_info["name"] = global_current_station_info["name"] - custom_info["uuid"] = global_current_station_info["stationuuid"] - custom_info["url"] = global_current_station_info["url"] - custom_info["website"] = global_current_station_info["homepage"] - custom_info["country"] = global_current_station_info["country"] - custom_info["language"] = global_current_station_info["language"] - custom_info["tags"] = global_current_station_info["tags"] - custom_info["codec"] = global_current_station_info["codec"] - custom_info["bitrate"] = global_current_station_info["bitrate"] - print(custom_info) - except: - log.error("No station information available") - - -def handle_add_station(alias): - try: - left = input("Enter station name:") - right = input("Enter station stream-url or radio-browser uuid:") - except EOFError: - print() - log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") - sys.exit(0) - - if left.strip() == "" or right.strip() == "": - log.error("Empty inputs not allowed") - sys.exit(1) - alias.add_entry(left, right) - log.info("New entry: {}={} added\n".format(left, right)) - sys.exit(0) - - -def handle_add_to_favorite(alias, station_name, station_uuid_url): - try: - response = alias.add_entry(station_name, station_uuid_url) - if not response: - try: - user_input = input("Enter a different name: ") - except EOFError: - print() - log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") - sys.exit(0) - - if user_input.strip() != "": - response = alias.add_entry(user_input.strip(), station_uuid_url) - except Exception as e: - log.debug("Error: {}".format(e)) - log.error("Could not add to favorite. Already in list?") - - -def handle_station_uuid_play(handler, station_uuid): - log.debug("Searching API for: {}".format(station_uuid)) - - handler.play_by_station_uuid(station_uuid) - - log.debug("increased click count for: {}".format(station_uuid)) - - handler.vote_for_uuid(station_uuid) - try: - station_name = handler.target_station["name"] - station_url = handler.target_station["url"] - except Exception as e: - log.debug("{}".format(e)) - log.error("Something went wrong") - sys.exit(1) - - return station_name, station_url - - -def check_sort_by_parameter(sort_by): - accepted_parameters = [ - "name", - "votes", - "codec", - "bitrate", - "lastcheckok", - "lastchecktime", - "clickcount", - "clicktrend", - "random", - ] - - if sort_by not in accepted_parameters: - log.warning("Sort parameter is unknown. Falling back to 'name'") - - log.warning( - "choose from: name,votes,codec,bitrate,lastcheckok,lastchecktime,clickcount,clicktrend,random" - ) - return "name" - return sort_by - -def handle_search_stations(handler, station_name, limit, sort_by, filter_with): - log.debug("Searching API for: {}".format(station_name)) - - return handler.search_by_station_name(station_name, limit, sort_by, filter_with) - - -def handle_station_selection_menu(handler, last_station, alias): +def handle_station_selection_menu(handler, last_station, alias) -> Tuple[str, str]: + """ + Show a selection menu for favorite stations. + """ # Add a selection list here. first entry must be the last played station # try to fetch the last played station's information last_station_info = {} try: last_station_info = last_station.get_info() except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error: {e}") # no last station?? pass - # log.info("You can search for a station on internet using the --search option") title = "Please select a station from your favorite list:" station_selection_names = [] station_selection_urls = [] @@ -311,7 +69,7 @@ def handle_station_selection_menu(handler, last_station, alias): try: station_selection_urls.append(last_station_info["stationuuid"]) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error: {e}") station_selection_urls.append(last_station_info["uuid_or_url"]) fav_stations = alias.alias_map @@ -343,15 +101,73 @@ def handle_station_selection_menu(handler, last_station, alias): return handle_station_uuid_play(handler, station_uuid) -def handle_save_last_station(last_station, station_name, station_url): - last_station = Last_station() +def handle_user_choice_from_search_result(handler, response) -> Tuple[str, str]: + """ + Handle user selection from search results. + """ + if not response: + log.debug("No result found!") + sys.exit(0) + + if len(response) == 1: + # single station found + log.debug("Exactly one result found") + + try: + user_input = input("Want to play this station? Y/N: ") + except EOFError: + print() + sys.exit(0) + + if user_input in ["y", "Y"]: + log.debug("Playing UUID from single response") + # Update global info - handled via helper to ensure UI sees it + set_global_station_info(response[0]) + + return handle_station_uuid_play(handler, response[0]["stationuuid"]) + else: + log.debug("Quitting") + sys.exit(0) + else: + # multiple station + log.debug("Asking for user input") + + try: + log.info("Type 'r' to play a random station") + user_input = input("Type the result ID to play: ") + except EOFError: + print() + log.info("Exiting") + log.debug("EOF reached, quitting") + sys.exit(0) + + try: + if user_input in ["r", "R", "random"]: + # pick a random integer withing range + user_input = randint(1, len(response) - 1) + log.debug(f"Radom station id: {user_input}") + # elif user_input in ["f", "F", "fuzzy"]: + # fuzzy find all the stations, and return the selected station id + # user_input = fuzzy_find(response) + + user_input = int(user_input) - 1 # because ID starts from 1 + if user_input in range(0, len(response)): + target_response = response[user_input] + log.debug(f"Selected: {target_response}") - last_played_station = {} - last_played_station["name"] = station_name - last_played_station["uuid_or_url"] = station_url + # saving global info + set_global_station_info(target_response) - log.debug(f"Saving the current station: {last_played_station}") - last_station.save_info(last_played_station) + return handle_station_uuid_play(handler, target_response["stationuuid"]) + else: + log.error("Please enter an ID within the range") + sys.exit(1) + except ValueError: + log.error("Please enter an valid ID number") + sys.exit(1) + except Exception as e: + log.error(f"Error: {e}") + sys.exit(1) def handle_listen_keypress( @@ -364,7 +180,10 @@ def handle_listen_keypress( record_file, record_file_format, loglevel, -): +) -> None: + """ + Listen for user input during playback to perform actions. + """ log.info("Press '?' to see available commands\n") while True: try: @@ -385,8 +204,6 @@ def handle_listen_keypress( loglevel, ) elif user_input in ["rf", "RF", "recordfile"]: - # if no filename is provided try to auto detect - # else if ".mp3" is provided, use libmp3lame to force write to mp3 try: user_input = input("Enter output filename: ") except EOFError: @@ -397,17 +214,25 @@ def handle_listen_keypress( # try to get extension from filename try: - file_name, file_ext = user_input.split(".") - if file_ext == "mp3": + file_name_parts = user_input.split(".") + if len(file_name_parts) > 1 and file_name_parts[-1] == "mp3": log.debug("codec: force mp3") # overwrite original codec with "mp3" record_file_format = "mp3" + file_name = user_input.rsplit(".", 1)[0] # Handle filename with dots else: - log.warning("You can only specify mp3 as file extension.\n") - log.warning( - "Do not provide any extension to autodetect the codec.\n" - ) - except: + # If user typed "file.mp3", we want file_name="file.mp3" probably? + # Original code logic: split('.') -> takes first part as name, second as ext. + # If ext is mp3, set force mp3. + # If user enters "my.song", it mistakes "song" for ext. + + if len(file_name_parts) > 1 and file_name_parts[-1] != "mp3": + log.warning("You can only specify mp3 as file extension.\n") + log.warning( + "Do not provide any extension to autodetect the codec.\n" + ) + file_name = user_input + except Exception: file_name = user_input if user_input.strip() != "": @@ -437,7 +262,6 @@ def handle_listen_keypress( elif user_input in ["p", "P"]: # toggle the player (start/stop) player.toggle() - # TODO: toggle the player elif user_input in ["h", "H", "?", "help"]: log.info("p: Play/Pause current station") @@ -448,175 +272,3 @@ def handle_listen_keypress( log.info("f/fav: Add station to favorite list") log.info("h/help/?: Show this help message") log.info("q/quit: Quit radioactive") - - -def handle_current_play_panel(curr_station_name=""): - panel_station_name = Text(curr_station_name, justify="center") - - station_panel = Panel(panel_station_name, title="[blink]:radio:[/blink]", width=85) - console = Console() - console.print(station_panel) - - -def handle_user_choice_from_search_result(handler, response): - global global_current_station_info - - if not response: - log.debug("No result found!") - sys.exit(0) - if len(response) == 1: - # single station found - log.debug("Exactly one result found") - - try: - user_input = input("Want to play this station? Y/N: ") - except EOFError: - print() - sys.exit(0) - - if user_input in ["y", "Y"]: - log.debug("Playing UUID from single response") - global_current_station_info = response[0] - - return handle_station_uuid_play(handler, response[0]["stationuuid"]) - else: - log.debug("Quitting") - sys.exit(0) - else: - # multiple station - log.debug("Asking for user input") - - try: - log.info("Type 'r' to play a random station") - user_input = input("Type the result ID to play: ") - except EOFError: - print() - log.info("Exiting") - log.debug("EOF reached, quitting") - sys.exit(0) - - try: - if user_input in ["r", "R", "random"]: - # pick a random integer withing range - user_input = randint(1, len(response) - 1) - log.debug(f"Radom station id: {user_input}") - # elif user_input in ["f", "F", "fuzzy"]: - # fuzzy find all the stations, and return the selected station id - # user_input = fuzzy_find(response) - - user_input = int(user_input) - 1 # because ID starts from 1 - if user_input in range(0, len(response)): - target_response = response[user_input] - log.debug("Selected: {}".format(target_response)) - # log.info("UUID: {}".format(target_response["stationuuid"])) - - # saving global info - global_current_station_info = target_response - - return handle_station_uuid_play(handler, target_response["stationuuid"]) - else: - log.error("Please enter an ID within the range") - sys.exit(1) - except: - log.err("Please enter an valid ID number") - sys.exit(1) - - -def handle_direct_play(alias, station_name_or_url=""): - """Play a station directly with UUID or direct stream URL""" - if "://" in station_name_or_url.strip(): - log.debug("Direct play: URL provided") - # stream URL - # call using URL with no station name N/A - # let's attempt to get station name from url headers - # station_name = handle_station_name_from_headers(station_name_or_url) - station_name = handle_get_station_name_from_metadata(station_name_or_url) - return station_name, station_name_or_url - else: - log.debug("Direct play: station name provided") - # station name from fav list - # search for the station in fav list and return name and url - - response = alias.search(station_name_or_url) - if not response: - log.error("No station found on your favorite list with the name") - sys.exit(1) - else: - log.debug("Direct play: {}".format(response)) - return response["name"], response["uuid_or_url"] - - -def handle_play_last_station(last_station): - station_obj = last_station.get_info() - return station_obj["name"], station_obj["uuid_or_url"] - - -# uses ffprobe to fetch station name -def handle_get_station_name_from_metadata(url): - """Get ICY metadata from ffprobe""" - log.info("Fetching the station name") - log.debug("Attempting to retrieve station name from: {}".format(url)) - # Run ffprobe command and capture the metadata - cmd = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_format", - "-show_entries", - "format=icy", - url, - ] - station_name = "Unknown Station" - - try: - output = subprocess.check_output(cmd).decode("utf-8") - data = json.loads(output) - log.debug(f"station info: {data}") - - # Extract the station name (icy-name) if available - station_name = ( - data.get("format", {}).get("tags", {}).get("icy-name", "Unknown Station") - ) - except: - log.error("Could not fetch the station name") - - return station_name - - -# uses requests module to fetch station name [deprecated] -def handle_station_name_from_headers(url): - # Get headers from URL so that we can get radio station - log.info("Fetching the station name") - log.debug("Attempting to retrieve station name from: {}".format(url)) - station_name = "Unknown Station" - try: - # sync call, with timeout - response = requests.get(url, timeout=5) - if response.status_code == requests.codes.ok: - if response.headers.get("Icy-Name"): - station_name = response.headers.get("Icy-Name") - else: - log.error("Station name not found") - else: - log.debug("Response code received is: {}".format(response.status_code())) - except Exception as e: - # except requests.HTTPError and requests.exceptions.ReadTimeout as e: - log.error("Could not fetch the station name") - log.debug( - """An error occurred: {} - The response code was {}""".format( - e, e.errno - ) - ) - return station_name - - -def handle_play_random_station(alias): - """Select a random station from favorite menu""" - log.debug("playing a random station") - alias_map = alias.alias_map - index = randint(0, len(alias_map) - 1) - station = alias_map[index] - return station["name"], station["uuid_or_url"] diff --git a/tmp_config/radio-active/config.ini b/tmp_config/radio-active/config.ini new file mode 100644 index 0000000..9fd498c --- /dev/null +++ b/tmp_config/radio-active/config.ini @@ -0,0 +1,10 @@ +[AppConfig] +loglevel = info +limit = 100 +sort = votes +filter = none +volume = 80 +filepath = /home/{user}/recordings/radioactive/ +filetype = mp3 +player = ffplay + From 9ddc99a3b9c7706b41bb2825733a0d19d3e05212 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Thu, 22 Jan 2026 22:44:21 +0530 Subject: [PATCH 03/12] Feat: Add runtime search and cycle commands --- README.md | 2 + radioactive/__main__.py | 19 ++++++--- radioactive/mpv.py | 4 ++ radioactive/utilities.py | 92 +++++++++++++++++++++++++++++++++++++--- radioactive/vlc.py | 17 ++++++++ 5 files changed, 121 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 4978ae7..1a1bede 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,8 @@ Enter a command to perform an action: ? t/T/track: Current song name (track info) r/R/record: Record a station f/F/fav: Add station to favorite list +s/S/search: Search for a new station +c/C/cycle: Cycle to next station in search results rf/RF/recordfile: Specify a filename for the recording. h/H/help/?: Show this help message q/Q/quit: Quit radioactive diff --git a/radioactive/__main__.py b/radioactive/__main__.py index e0989b4..5df8f1d 100755 --- a/radioactive/__main__.py +++ b/radioactive/__main__.py @@ -39,7 +39,7 @@ player = None -def final_step(options, last_station, alias, handler): +def final_step(options, last_station, alias, handler, station_list=None): global ffplay # always needed global player @@ -104,6 +104,8 @@ def final_step(options, last_station, alias, handler): record_file=options["record_file"], record_file_format=options["record_file_format"], loglevel=options["loglevel"], + handler=handler, + station_list=station_list ) @@ -169,7 +171,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -186,7 +188,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -203,7 +205,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -220,7 +222,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -268,7 +270,7 @@ def main(): ) = handle_user_choice_from_search_result(handler, response) # options["codec"] = response["codec"] # print(response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) # ------------------------- direct play ------------------------# @@ -292,6 +294,11 @@ def main(): final_step(options, last_station, alias, handler) # final_step() + # If response is not defined yet, initialize it + if 'response' not in locals(): + response = [] + + final_step(options, last_station, alias, handler, response) if os.name == "nt": while True: diff --git a/radioactive/mpv.py b/radioactive/mpv.py index 872dbb1..3cebe76 100644 --- a/radioactive/mpv.py +++ b/radioactive/mpv.py @@ -52,3 +52,7 @@ def toggle(self): self.stop() else: self.start(self.url) + + def play(self): + if not self.is_running and self.url: + self.start(self.url) diff --git a/radioactive/utilities.py b/radioactive/utilities.py index 860c6a1..8e49607 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -180,9 +180,12 @@ def handle_listen_keypress( record_file, record_file_format, loglevel, + handler=None, + station_list=None, ) -> None: """ Listen for user input during playback to perform actions. + Now with handler and station_list for runtime commands. """ log.info("Press '?' to see available commands\n") while True: @@ -221,11 +224,6 @@ def handle_listen_keypress( record_file_format = "mp3" file_name = user_input.rsplit(".", 1)[0] # Handle filename with dots else: - # If user typed "file.mp3", we want file_name="file.mp3" probably? - # Original code logic: split('.') -> takes first part as name, second as ext. - # If ext is mp3, set force mp3. - # If user enters "my.song", it mistakes "song" for ext. - if len(file_name_parts) > 1 and file_name_parts[-1] != "mp3": log.warning("You can only specify mp3 as file extension.\n") log.warning( @@ -251,18 +249,96 @@ def handle_listen_keypress( handle_add_to_favorite(alias, station_name, station_url) elif user_input in ["q", "Q", "quit"]: - # kill_background_ffplays() player.stop() sys.exit(0) + elif user_input in ["w", "W", "list"]: alias.generate_map() handle_favorite_table(alias) + elif user_input in ["t", "T", "track"]: handle_fetch_song_title(target_url) + elif user_input in ["p", "P"]: - # toggle the player (start/stop) player.toggle() + elif user_input in ["s", "S", "search"]: + if handler: + try: + query = input("Enter station name to search: ") + except EOFError: + continue + + if query.strip(): + station_list = handle_search_stations( + handler, query, limit=100, sort_by="votes", filter_with="none" + ) + if station_list: + # Find valid station choice + try: + station_name, target_url = handle_user_choice_from_search_result( + handler, station_list + ) + # Stop current, switch + player.stop() + player.url = target_url + player.play() + handle_current_play_panel(station_name) + # Update loop variables + station_url = target_url + except SystemExit: + # handle_user_choice might try to exit on cancel + pass + else: + log.warning("Search unavailable (handler not initialized)") + + elif user_input in ["c", "C", "cycle"]: + if station_list and handler: + # Find current index + current_uuid = global_current_station_info.get("stationuuid") + current_index = -1 + for idx, st in enumerate(station_list): + if st.get("stationuuid") == current_uuid: + current_index = idx + break + + # Next index + next_index = (current_index + 1) % len(station_list) + + # Try to play next valid station + # We loop until we find one that works or we exhaust list + attempts = 0 + max_attempts = len(station_list) + + while attempts < max_attempts: + target_station = station_list[next_index] + set_global_station_info(target_station) + log.info(f"Switching to: {target_station.get('name')}") + + try: + station_name, target_url = handle_station_uuid_play( + handler, target_station["stationuuid"] + ) + player.stop() + player.url = target_url + player.play() + + # Check if successful (basic check) + # The player runs in threads, so strict check is hard, but we can trust if it didn't error immediately + handle_current_play_panel(station_name) + station_url = target_url + break + except Exception as e: + log.error(f"Failed to play {target_station.get('name')}: {e}") + next_index = (next_index + 1) % len(station_list) + attempts += 1 + + if attempts >= max_attempts: + log.error("Could not play any station from the list") + + else: + log.warning("Cycle unavailable (no search results to cycle through)") + elif user_input in ["h", "H", "?", "help"]: log.info("p: Play/Pause current station") log.info("t/track: Current track info") @@ -270,5 +346,7 @@ def handle_listen_keypress( log.info("r/record: Record a station") log.info("rf/recordfile: Specify a filename for the recording") log.info("f/fav: Add station to favorite list") + log.info("s/search: Search for a new station") + log.info("c/cycle: Cycle to next station in search results") log.info("h/help/?: Show this help message") log.info("q/quit: Quit radioactive") diff --git a/radioactive/vlc.py b/radioactive/vlc.py index 872ee29..69612f6 100644 --- a/radioactive/vlc.py +++ b/radioactive/vlc.py @@ -9,6 +9,19 @@ class VLC: def __init__(self): self.program_name = "vlc" self.exe_path = which(self.program_name) + + # Check common locations on Windows + if self.exe_path is None and sys.platform.startswith("win"): + import os + common_paths = [ + os.path.join(os.environ.get("ProgramFiles", "C:\\Program Files"), "VideoLAN", "VLC", "vlc.exe"), + os.path.join(os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)"), "VideoLAN", "VLC", "vlc.exe"), + ] + for path in common_paths: + if os.path.exists(path): + self.exe_path = path + break + log.debug(f"{self.program_name}: {self.exe_path}") if self.exe_path is None: @@ -52,3 +65,7 @@ def toggle(self): self.stop() else: self.start(self.url) + + def play(self): + if not self.is_running and self.url: + self.start(self.url) From de718d8105277b84d588d3b9cca9f61d0ecb6525 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Thu, 22 Jan 2026 23:17:18 +0530 Subject: [PATCH 04/12] Feat: Implement compile-time feature configuration system --- Makefile | 6 +- configure.py | 33 ++++++++ features.conf | 4 + radioactive/__main__.py | 4 +- radioactive/actions.py | 23 ++++-- radioactive/alias.py | 1 + radioactive/app.py | 1 + radioactive/args.py | 75 ++++++++++--------- radioactive/config.py | 13 +++- radioactive/feature_flags.py | 3 + radioactive/ffplay.py | 11 +-- radioactive/filter.py | 24 +++--- radioactive/handler.py | 57 +++++--------- radioactive/last_station.py | 2 +- radioactive/parser.py | 8 +- radioactive/paths.py | 23 +++--- radioactive/ui.py | 8 +- radioactive/utilities.py | 141 +++++++++++++++++++---------------- radioactive/vlc.py | 15 +++- 19 files changed, 258 insertions(+), 194 deletions(-) create mode 100644 configure.py create mode 100644 features.conf create mode 100644 radioactive/feature_flags.py diff --git a/Makefile b/Makefile index 9f11434..90a54ca 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,11 @@ SRC_DIR = "radioactive" TEST_DIR = "test" .PHONY: all clean isort check dist deploy test-deploy help build install install-dev test -all: clean isort format check build install +all: clean isort format check configure build install + +configure: + @echo "Configuring features..." + ${PYTHON} configure.py check: @echo "Chceking linting errors......." diff --git a/configure.py b/configure.py new file mode 100644 index 0000000..1cdc40b --- /dev/null +++ b/configure.py @@ -0,0 +1,33 @@ + +import os + +CONFIG_FILE = "features.conf" +FEATURE_FLAGS_FILE = "radioactive/feature_flags.py" + +def generate_flags(): + print(f"Generating {FEATURE_FLAGS_FILE} from {CONFIG_FILE}...") + + flags = {} + if os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE, "r") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" in line: + key, value = line.split("=", 1) + flags[key.strip()] = value.strip().lower() == "true" + else: + print(f"Warning: {CONFIG_FILE} not found. Using defaults.") + # Default flags if config is missing + flags["RECORDING_FEATURE"] = True + + with open(FEATURE_FLAGS_FILE, "w") as f: + f.write("# This file is auto-generated by the configure step. Do not edit manually.\n\n") + for key, value in flags.items(): + f.write(f"{key} = {value}\n") + + print(f"done. {FEATURE_FLAGS_FILE} is ready.") + +if __name__ == "__main__": + generate_flags() diff --git a/features.conf b/features.conf new file mode 100644 index 0000000..677ded1 --- /dev/null +++ b/features.conf @@ -0,0 +1,4 @@ +# Feature Configuration File +# Set features to true or false to enable/disable them at "compile time" + +RECORDING_FEATURE=false diff --git a/radioactive/__main__.py b/radioactive/__main__.py index 5df8f1d..68bb815 100755 --- a/radioactive/__main__.py +++ b/radioactive/__main__.py @@ -105,7 +105,7 @@ def final_step(options, last_station, alias, handler, station_list=None): record_file_format=options["record_file_format"], loglevel=options["loglevel"], handler=handler, - station_list=station_list + station_list=station_list, ) @@ -295,7 +295,7 @@ def main(): # final_step() # If response is not defined yet, initialize it - if 'response' not in locals(): + if "response" not in locals(): response = [] final_step(options, last_station, alias, handler, response) diff --git a/radioactive/actions.py b/radioactive/actions.py index a0036cd..576f1d1 100644 --- a/radioactive/actions.py +++ b/radioactive/actions.py @@ -13,7 +13,14 @@ import requests from zenlog import log -from radioactive.recorder import record_audio_auto_codec, record_audio_from_url +try: + from radioactive.feature_flags import RECORDING_FEATURE +except ImportError: + # Default to True if file not found (e.g. dev mode without configure) + RECORDING_FEATURE = True + +if RECORDING_FEATURE: + from radioactive.recorder import record_audio_auto_codec, record_audio_from_url from radioactive.last_station import Last_station @@ -62,6 +69,10 @@ def handle_record( """ Handle audio recording logic. """ + if not RECORDING_FEATURE: + log.error("Recording feature is not compiled/enabled in this build.") + sys.exit(1) + log.info("Press 'q' to stop recording") force_mp3 = False @@ -164,7 +175,7 @@ def handle_add_to_favorite(alias, station_name: str, station_uuid_url: str) -> N def handle_save_last_station(last_station, station_name: str, station_url: str) -> None: """Save the last played station.""" # last_station = Last_station() # Provided as arg now - + last_played_station = {} last_played_station["name"] = station_name last_played_station["uuid_or_url"] = station_url @@ -198,11 +209,7 @@ def check_sort_by_parameter(sort_by: str) -> str: def handle_search_stations( - handler, - station_name: str, - limit: int, - sort_by: str, - filter_with: str + handler, station_name: str, limit: int, sort_by: str, filter_with: str ) -> Any: """Wrapper to search stations by name.""" log.debug(f"Searching API for: {station_name}") @@ -321,7 +328,7 @@ def handle_play_random_station(alias) -> Tuple[str, str]: if not alias_map: log.error("No favorite stations found") sys.exit(1) - + index = randint(0, len(alias_map) - 1) station = alias_map[index] return station["name"], station["uuid_or_url"] diff --git a/radioactive/alias.py b/radioactive/alias.py index ecb9d5e..e94d042 100644 --- a/radioactive/alias.py +++ b/radioactive/alias.py @@ -7,6 +7,7 @@ class Alias: def __init__(self): from radioactive.paths import get_alias_path + self.alias_map = [] self.found = False diff --git a/radioactive/app.py b/radioactive/app.py index b4502a1..128a644 100644 --- a/radioactive/app.py +++ b/radioactive/app.py @@ -3,6 +3,7 @@ it needs to be updated in every release) and to check if an updated version available for the app or not """ + import json import requests diff --git a/radioactive/args.py b/radioactive/args.py index 72b1717..740483e 100644 --- a/radioactive/args.py +++ b/radioactive/args.py @@ -5,6 +5,11 @@ from radioactive.config import Configs +try: + from radioactive.feature_flags import RECORDING_FEATURE +except ImportError: + RECORDING_FEATURE = True + # load default configs def load_default_configs(): @@ -15,7 +20,6 @@ def load_default_configs(): class Parser: - """Parse the command-line args and return result to the __main__""" def __init__(self): @@ -203,40 +207,41 @@ def __init__(self): help="kill all the ffplay process initiated by radioactive", ) - self.parser.add_argument( - "--record", - "-R", - action="store_true", - dest="record_stream", - default=False, - help="record a station and save as audio file", - ) - - self.parser.add_argument( - "--filepath", - action="store", - dest="record_file_path", - default=self.defaults["filepath"], - help="specify the audio format for recording", - ) - - self.parser.add_argument( - "--filename", - "-N", - action="store", - dest="record_file", - default="", - help="specify the output filename of the recorded audio", - ) - - self.parser.add_argument( - "--filetype", - "-T", - action="store", - dest="record_file_format", - default=self.defaults["filetype"], - help="specify the audio format for recording. auto/mp3", - ) + if RECORDING_FEATURE: + self.parser.add_argument( + "--record", + "-R", + action="store_true", + dest="record_stream", + default=False, + help="record a station and save as audio file", + ) + + self.parser.add_argument( + "--filepath", + action="store", + dest="record_file_path", + default=self.defaults["filepath"], + help="specify the audio format for recording", + ) + + self.parser.add_argument( + "--filename", + "-N", + action="store", + dest="record_file", + default="", + help="specify the output filename of the recorded audio", + ) + + self.parser.add_argument( + "--filetype", + "-T", + action="store", + dest="record_file_format", + default=self.defaults["filetype"], + help="specify the audio format for recording. auto/mp3", + ) self.parser.add_argument( "--player", diff --git a/radioactive/config.py b/radioactive/config.py index 9138d5b..cf8851a 100644 --- a/radioactive/config.py +++ b/radioactive/config.py @@ -33,6 +33,7 @@ def write_a_sample_config_file() -> None: try: from radioactive.paths import get_config_path + # Specify the file path file_path = get_config_path() @@ -50,8 +51,10 @@ class Configs: """ Class to handle loading and parsing of the configuration file. """ + def __init__(self): from radioactive.paths import get_config_path + self.config_path = get_config_path() self.config: Optional[configparser.ConfigParser] = None @@ -67,7 +70,7 @@ def load(self) -> Dict[str, str]: try: self.config.read(self.config_path) options: Dict[str, str] = {} - + # Helper to safely get config values with defaults if section missing def get_option(key: str, default: str = "") -> str: try: @@ -80,14 +83,16 @@ def get_option(key: str, default: str = "") -> str: options["sort"] = get_option("sort", "votes") options["filter"] = get_option("filter", "none") options["limit"] = get_option("limit", "100") - options["filepath"] = get_option("filepath", "/home/{user}/recordings/radioactive/") - + options["filepath"] = get_option( + "filepath", "/home/{user}/recordings/radioactive/" + ) + # if filepath has any placeholder, replace {user} to actual user map if "{user}" in options["filepath"]: options["filepath"] = options["filepath"].replace( "{user}", getpass.getuser() ) - + options["filetype"] = get_option("filetype", "mp3") options["player"] = get_option("player", "ffplay") diff --git a/radioactive/feature_flags.py b/radioactive/feature_flags.py new file mode 100644 index 0000000..1a43edd --- /dev/null +++ b/radioactive/feature_flags.py @@ -0,0 +1,3 @@ +# This file is auto-generated by the configure step. Do not edit manually. + +RECORDING_FEATURE = False diff --git a/radioactive/ffplay.py b/radioactive/ffplay.py index fa74f04..718eb10 100644 --- a/radioactive/ffplay.py +++ b/radioactive/ffplay.py @@ -44,6 +44,7 @@ class Ffplay: """ Wrapper class to manage the FFplay process for audio playback. """ + def __init__(self, URL: str, volume: int, loglevel: str): self.program_name = "ffplay" self.url = URL @@ -68,7 +69,7 @@ def _construct_ffplay_commands(self) -> List[str]: """Construct the command line arguments for ffplay.""" # Ensure volume is within valid range (0-100) though ffplay accepts 0-100 # Actually ffplay volume is 0-100 - + if self.exe_path is None: raise RuntimeError("FFplay executable path is not set") @@ -121,8 +122,8 @@ def _check_error_output(self) -> None: self.stop() break except ValueError: - # ValueError: I/O operation on closed file. - break + # ValueError: I/O operation on closed file. + break except Exception: break sleep(0.5) @@ -180,7 +181,7 @@ def play(self) -> None: def stop(self) -> None: """Stop playback and terminate the process.""" if self.is_playing and self.process: - self.is_running = False # Stop the error thread loop + self.is_running = False # Stop the error thread loop try: self.process.terminate() try: @@ -188,7 +189,7 @@ def stop(self) -> None: except subprocess.TimeoutExpired: self.process.kill() self.process.wait(timeout=2) - + log.debug("Radio playback stopped successfully") except Exception as e: log.error(f"Error while stopping radio: {e}") diff --git a/radioactive/filter.py b/radioactive/filter.py index 6a9c90c..e43bf74 100644 --- a/radioactive/filter.py +++ b/radioactive/filter.py @@ -9,9 +9,7 @@ def _filter_entries_by_key( - data: List[Dict[str, Any]], - filter_param: str, - key: str + data: List[Dict[str, Any]], filter_param: str, key: str ) -> List[Dict[str, Any]]: """ Filter list of dictionaries by a string key using inclusion (=) or exclusion (!=). @@ -25,10 +23,10 @@ def _filter_entries_by_key( # Ensure value is a string for comparison if value is None: continue - + str_value = str(value) if str_value == "": - continue + continue if "!=" in filter_param: # Handle exclusion @@ -57,9 +55,7 @@ def _filter_entries_by_key( def _filter_entries_by_numeric_key( - data: List[Dict[str, Any]], - filter_param: str, - key: str + data: List[Dict[str, Any]], filter_param: str, key: str ) -> List[Dict[str, Any]]: """ Filter list of dictionaries by a numeric key. @@ -68,25 +64,25 @@ def _filter_entries_by_numeric_key( filtered_entries = [] try: - # Split logic needs to be robust. + # Split logic needs to be robust. # Expected format: keyOPvalue e.g. votes>100 # We know the key, so we can split by key parts = filter_param.split(key) if len(parts) < 2: log.warning(f"Invalid filter format: {filter_param}") return data - + param_part = parts[1] # portion after the key name e.g. >100, =50 if not param_part: return data filter_operator = param_part[0] # operator part filter_value_str = param_part[1:] # value part - + if not filter_value_str: log.warning(f"No value provided for filter: {filter_param}") return data - + filter_value = int(filter_value_str) for entry in data: @@ -182,7 +178,9 @@ def _filter_results(data: List[Dict], expression: str) -> List[Dict]: # Top most function for multiple filtering expressions with '&' -def filter_expressions(data: List[Dict[str, Any]], input_expression: str) -> List[Dict[str, Any]]: +def filter_expressions( + data: List[Dict[str, Any]], input_expression: str +) -> List[Dict[str, Any]]: """ Filter the list of stations based on the input expression. Supports multiple filters separated by '&'. diff --git a/radioactive/handler.py b/radioactive/handler.py index 799686b..a0fd054 100644 --- a/radioactive/handler.py +++ b/radioactive/handler.py @@ -35,7 +35,7 @@ def trim_string(text: str, max_length: int = 40) -> str: """ if not isinstance(text, str): return str(text) - + if len(text) > max_length: return text[:max_length] + "..." return text @@ -45,7 +45,7 @@ def print_table( response: List[Dict[str, Any]], columns: List[str], sort_by: str, - filter_expression: str + filter_expression: str, ) -> List[Dict[str, Any]]: """ Print the table applying the sort logic. @@ -91,7 +91,7 @@ def print_table( rest = parts[1].split("@") response_key = rest[0] max_str = int(rest[1]) - + parsed_columns.append((col_name, response_key, max_str)) table.add_column(col_name, justify="left") @@ -143,10 +143,10 @@ def __init__(self): def get_country_code(self, name: str) -> Optional[str]: """ Get the ISO 3166-1 alpha-2 country code for a given country name. - + Args: name (str): The name of the country. - + Returns: str: The country code if found, None otherwise. """ @@ -159,7 +159,7 @@ def get_country_code(self, name: str) -> Optional[str]: def validate_uuid_station(self) -> List[Dict[str, Any]]: """ Validate that a station UUID search returned exactly one result and register a click. - + Returns: list: The response list containing the station details. """ @@ -172,27 +172,23 @@ def validate_uuid_station(self) -> List[Dict[str, Any]]: self.vote_for_uuid(self.target_station["stationuuid"]) return self.response - + log.error("Station found by UUID is invalid or empty") sys.exit(1) # ---------------------------- NAME -------------------------------- # def search_by_station_name( - self, - name: str, - limit: int, - sort_by: str, - filter_with: str + self, name: str, limit: int, sort_by: str, filter_with: str ) -> List[Dict[str, Any]]: """ Search and play a station by its name. - + Args: name (str): Station name to search for. limit (int): Max number of results. sort_by (str): Field to sort by. filter_with (str): Filter expression. - + Returns: list: List of found stations. """ @@ -222,10 +218,10 @@ def search_by_station_name( def play_by_station_uuid(self, uuid: str) -> List[Dict[str, Any]]: """ Search and play station by its stationuuid. - + Args: uuid (str): The UUID of the station. - + Returns: list: Confirmed station details. """ @@ -239,11 +235,7 @@ def play_by_station_uuid(self, uuid: str) -> List[Dict[str, Any]]: # -------------------------- COUNTRY ----------------------# def discover_by_country( - self, - country_code_or_name: str, - limit: int, - sort_by: str, - filter_with: str + self, country_code_or_name: str, limit: int, sort_by: str, filter_with: str ) -> List[Dict[str, Any]]: """ Discover stations by country code or name. @@ -303,11 +295,7 @@ def discover_by_country( # ------------------- by state --------------------- def discover_by_state( - self, - state: str, - limit: int, - sort_by: str, - filter_with: str + self, state: str, limit: int, sort_by: str, filter_with: str ) -> List[Dict[str, Any]]: """Discover stations by state.""" is_reverse = sort_by != "name" @@ -337,21 +325,14 @@ def discover_by_state( # -----------------by language -------------------- def discover_by_language( - self, - language: str, - limit: int, - sort_by: str, - filter_with: str + self, language: str, limit: int, sort_by: str, filter_with: str ) -> List[Dict[str, Any]]: """Discover stations by language.""" is_reverse = sort_by != "name" try: response = self.API.search( - language=language, - limit=limit, - order=str(sort_by), - reverse=is_reverse + language=language, limit=limit, order=str(sort_by), reverse=is_reverse ) except Exception as e: log.debug(f"Error discover_by_language: {e}") @@ -372,11 +353,7 @@ def discover_by_language( # -------------------- by tag ---------------------- # def discover_by_tag( - self, - tag: str, - limit: int, - sort_by: str, - filter_with: str + self, tag: str, limit: int, sort_by: str, filter_with: str ) -> List[Dict[str, Any]]: """Discover stations by tag.""" is_reverse = sort_by != "name" diff --git a/radioactive/last_station.py b/radioactive/last_station.py index 27a99cf..42eb36a 100644 --- a/radioactive/last_station.py +++ b/radioactive/last_station.py @@ -8,7 +8,6 @@ class Last_station: - """Saves the last played radio station information, when user don't provide any -S or -U it looks for the information. @@ -18,6 +17,7 @@ class Last_station: def __init__(self): from radioactive.paths import get_last_station_path + self.last_station_path = get_last_station_path() def get_info(self): diff --git a/radioactive/parser.py b/radioactive/parser.py index f302eed..f43dcc2 100644 --- a/radioactive/parser.py +++ b/radioactive/parser.py @@ -57,10 +57,10 @@ def parse_options() -> Dict[str, Any]: options["kill_ffplays"] = args.kill_ffplays - options["record_stream"] = args.record_stream - options["record_file"] = args.record_file - options["record_file_format"] = args.record_file_format - options["record_file_path"] = args.record_file_path + options["record_stream"] = getattr(args, "record_stream", False) + options["record_file"] = getattr(args, "record_file", "") + options["record_file_format"] = getattr(args, "record_file_format", "mp3") + options["record_file_path"] = getattr(args, "record_file_path", "") options["target_url"] = "" options["volume"] = args.volume diff --git a/radioactive/paths.py b/radioactive/paths.py index 79b3a43..356e5a7 100644 --- a/radioactive/paths.py +++ b/radioactive/paths.py @@ -2,6 +2,7 @@ import shutil from zenlog import log + def _get_xdg_config_dir(): """Return the XDG configuration directory for radio-active.""" xdg_config_home = os.environ.get("XDG_CONFIG_HOME") @@ -26,11 +27,11 @@ def get_config_path(): Migrates from legacy path if it exists and new path does not. """ legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-configs.ini") - + config_dir = _get_xdg_config_dir() os.makedirs(config_dir, exist_ok=True) new_path = os.path.join(config_dir, "config.ini") - + if os.path.exists(legacy_path) and not os.path.exists(new_path): log.info(f"Migrating config file from {legacy_path} to {new_path}") try: @@ -38,7 +39,7 @@ def get_config_path(): except Exception as e: log.warning(f"Could not migrate config file: {e}") # If migration fails, we return new_path anyway, user might have to manually move or start fresh - + return new_path @@ -48,18 +49,18 @@ def get_alias_path(): Migrates from legacy path if it exists and new path does not. """ legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-alias") - - config_dir = _get_xdg_config_dir() # Aliases are user config + + config_dir = _get_xdg_config_dir() # Aliases are user config os.makedirs(config_dir, exist_ok=True) new_path = os.path.join(config_dir, "alias_map") - + if os.path.exists(legacy_path) and not os.path.exists(new_path): log.info(f"Migrating alias file from {legacy_path} to {new_path}") try: shutil.move(legacy_path, new_path) except Exception as e: log.warning(f"Could not migrate alias file: {e}") - + return new_path @@ -69,16 +70,16 @@ def get_last_station_path(): Migrates from legacy path if it exists and new path does not. """ legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-last-station") - - data_dir = _get_xdg_data_dir() # Last station is state/data + + data_dir = _get_xdg_data_dir() # Last station is state/data os.makedirs(data_dir, exist_ok=True) new_path = os.path.join(data_dir, "last-station") - + if os.path.exists(legacy_path) and not os.path.exists(new_path): log.info(f"Migrating last station file from {legacy_path} to {new_path}") try: shutil.move(legacy_path, new_path) except Exception as e: log.warning(f"Could not migrate last station file: {e}") - + return new_path diff --git a/radioactive/ui.py b/radioactive/ui.py index 9afffee..f5cff35 100644 --- a/radioactive/ui.py +++ b/radioactive/ui.py @@ -37,7 +37,7 @@ def handle_welcome_screen() -> None: def handle_update_screen(app) -> None: """ Check for updates and print a message if available. - + Args: app: The App instance to check for updates. """ @@ -59,7 +59,7 @@ def handle_update_screen(app) -> None: def handle_favorite_table(alias) -> None: """ Print the user's favorite list in a table. - + Args: alias: The Alias instance containing the favorite map. """ @@ -72,7 +72,7 @@ def handle_favorite_table(alias) -> None: ) table.add_column("Station", justify="left") table.add_column("URL / UUID", justify="left") - + if len(alias.alias_map) > 0: for entry in alias.alias_map: table.add_row(entry["name"], entry["uuid_or_url"]) @@ -105,7 +105,7 @@ def handle_show_station_info() -> None: def handle_current_play_panel(curr_station_name: str = "") -> None: """ Print the currently playing station panel. - + Args: curr_station_name (str): Name of the station. """ diff --git a/radioactive/utilities.py b/radioactive/utilities.py index 8e49607..ab3245f 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -7,9 +7,15 @@ from random import randint from typing import Any, Dict, List, Optional, Tuple, Union +from pick import pick from pick import pick from zenlog import log +try: + from radioactive.feature_flags import RECORDING_FEATURE +except ImportError: + RECORDING_FEATURE = True + # Re-export functions for backward compatibility and aggregation from radioactive.ui import ( handle_welcome_screen, @@ -108,7 +114,7 @@ def handle_user_choice_from_search_result(handler, response) -> Tuple[str, str]: if not response: log.debug("No result found!") sys.exit(0) - + if len(response) == 1: # single station found log.debug("Exactly one result found") @@ -163,8 +169,8 @@ def handle_user_choice_from_search_result(handler, response) -> Tuple[str, str]: log.error("Please enter an ID within the range") sys.exit(1) except ValueError: - log.error("Please enter an valid ID number") - sys.exit(1) + log.error("Please enter an valid ID number") + sys.exit(1) except Exception as e: log.error(f"Error: {e}") sys.exit(1) @@ -197,52 +203,56 @@ def handle_listen_keypress( kill_background_ffplays() sys.exit(0) - if user_input in ["r", "R", "record"]: - handle_record( - target_url, - station_name, - record_file_path, - record_file, - record_file_format, - loglevel, - ) - elif user_input in ["rf", "RF", "recordfile"]: - try: - user_input = input("Enter output filename: ") - except EOFError: - print() - log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") - kill_background_ffplays() - sys.exit(0) - - # try to get extension from filename - try: - file_name_parts = user_input.split(".") - if len(file_name_parts) > 1 and file_name_parts[-1] == "mp3": - log.debug("codec: force mp3") - # overwrite original codec with "mp3" - record_file_format = "mp3" - file_name = user_input.rsplit(".", 1)[0] # Handle filename with dots - else: - if len(file_name_parts) > 1 and file_name_parts[-1] != "mp3": - log.warning("You can only specify mp3 as file extension.\n") - log.warning( - "Do not provide any extension to autodetect the codec.\n" - ) - file_name = user_input - except Exception: - file_name = user_input - - if user_input.strip() != "": + if RECORDING_FEATURE: + if user_input in ["r", "R", "record"]: handle_record( target_url, station_name, record_file_path, - file_name, + record_file, record_file_format, loglevel, ) - elif user_input in ["i", "I", "info"]: + elif user_input in ["rf", "RF", "recordfile"]: + try: + user_input = input("Enter output filename: ") + except EOFError: + print() + log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") + kill_background_ffplays() + sys.exit(0) + + # try to get extension from filename + try: + file_name_parts = user_input.split(".") + if len(file_name_parts) > 1 and file_name_parts[-1] == "mp3": + log.debug("codec: force mp3") + # overwrite original codec with "mp3" + record_file_format = "mp3" + file_name = user_input.rsplit(".", 1)[ + 0 + ] # Handle filename with dots + else: + if len(file_name_parts) > 1 and file_name_parts[-1] != "mp3": + log.warning("You can only specify mp3 as file extension.\n") + log.warning( + "Do not provide any extension to autodetect the codec.\n" + ) + file_name = user_input + except Exception: + file_name = user_input + + if user_input.strip() != "": + handle_record( + target_url, + station_name, + record_file_path, + file_name, + record_file_format, + loglevel, + ) + + if user_input in ["i", "I", "info"]: handle_show_station_info() elif user_input in ["f", "F", "fav"]: @@ -251,14 +261,14 @@ def handle_listen_keypress( elif user_input in ["q", "Q", "quit"]: player.stop() sys.exit(0) - + elif user_input in ["w", "W", "list"]: alias.generate_map() handle_favorite_table(alias) - + elif user_input in ["t", "T", "track"]: handle_fetch_song_title(target_url) - + elif user_input in ["p", "P"]: player.toggle() @@ -268,16 +278,18 @@ def handle_listen_keypress( query = input("Enter station name to search: ") except EOFError: continue - + if query.strip(): station_list = handle_search_stations( - handler, query, limit=100, sort_by="votes", filter_with="none" + handler, query, limit=100, sort_by="votes", filter_with="none" ) if station_list: # Find valid station choice try: - station_name, target_url = handle_user_choice_from_search_result( - handler, station_list + station_name, target_url = ( + handle_user_choice_from_search_result( + handler, station_list + ) ) # Stop current, switch player.stop() @@ -287,10 +299,10 @@ def handle_listen_keypress( # Update loop variables station_url = target_url except SystemExit: - # handle_user_choice might try to exit on cancel + # handle_user_choice might try to exit on cancel pass else: - log.warning("Search unavailable (handler not initialized)") + log.warning("Search unavailable (handler not initialized)") elif user_input in ["c", "C", "cycle"]: if station_list and handler: @@ -301,38 +313,38 @@ def handle_listen_keypress( if st.get("stationuuid") == current_uuid: current_index = idx break - + # Next index next_index = (current_index + 1) % len(station_list) - + # Try to play next valid station # We loop until we find one that works or we exhaust list attempts = 0 max_attempts = len(station_list) - + while attempts < max_attempts: - target_station = station_list[next_index] - set_global_station_info(target_station) - log.info(f"Switching to: {target_station.get('name')}") - - try: + target_station = station_list[next_index] + set_global_station_info(target_station) + log.info(f"Switching to: {target_station.get('name')}") + + try: station_name, target_url = handle_station_uuid_play( handler, target_station["stationuuid"] ) player.stop() player.url = target_url player.play() - + # Check if successful (basic check) # The player runs in threads, so strict check is hard, but we can trust if it didn't error immediately handle_current_play_panel(station_name) station_url = target_url break - except Exception as e: + except Exception as e: log.error(f"Failed to play {target_station.get('name')}: {e}") next_index = (next_index + 1) % len(station_list) attempts += 1 - + if attempts >= max_attempts: log.error("Could not play any station from the list") @@ -343,8 +355,9 @@ def handle_listen_keypress( log.info("p: Play/Pause current station") log.info("t/track: Current track info") log.info("i/info: Station information") - log.info("r/record: Record a station") - log.info("rf/recordfile: Specify a filename for the recording") + if RECORDING_FEATURE: + log.info("r/record: Record a station") + log.info("rf/recordfile: Specify a filename for the recording") log.info("f/fav: Add station to favorite list") log.info("s/search: Search for a new station") log.info("c/cycle: Cycle to next station in search results") diff --git a/radioactive/vlc.py b/radioactive/vlc.py index 69612f6..a564d44 100644 --- a/radioactive/vlc.py +++ b/radioactive/vlc.py @@ -13,9 +13,20 @@ def __init__(self): # Check common locations on Windows if self.exe_path is None and sys.platform.startswith("win"): import os + common_paths = [ - os.path.join(os.environ.get("ProgramFiles", "C:\\Program Files"), "VideoLAN", "VLC", "vlc.exe"), - os.path.join(os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)"), "VideoLAN", "VLC", "vlc.exe"), + os.path.join( + os.environ.get("ProgramFiles", "C:\\Program Files"), + "VideoLAN", + "VLC", + "vlc.exe", + ), + os.path.join( + os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)"), + "VideoLAN", + "VLC", + "vlc.exe", + ), ] for path in common_paths: if os.path.exists(path): From 19cd9c7807f43030f1243b6ae70098157406b75a Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 00:18:38 +0530 Subject: [PATCH 05/12] Feat: Minimal Mode and Info Runtime Command --- configure.py | 14 ++++++++++++++ features.conf | 6 ++++++ radioactive/args.py | 3 ++- radioactive/feature_flags.py | 5 +++++ radioactive/handler.py | 14 ++++++++++++++ radioactive/parser.py | 2 +- radioactive/utilities.py | 32 +++++++++++++++++++++++--------- 7 files changed, 65 insertions(+), 11 deletions(-) diff --git a/configure.py b/configure.py index 1cdc40b..f6e4ff9 100644 --- a/configure.py +++ b/configure.py @@ -21,6 +21,20 @@ def generate_flags(): print(f"Warning: {CONFIG_FILE} not found. Using defaults.") # Default flags if config is missing flags["RECORDING_FEATURE"] = True + flags["TRACK_FEATURE"] = True + flags["SEARCH_FEATURE"] = True + flags["CYCLE_FEATURE"] = True + flags["INFO_FEATURE"] = True + flags["MINIMAL_FEATURE"] = False + + # Apply limits if MINIMAL_FEATURE is True + if flags.get("MINIMAL_FEATURE", False): + print("MINIMAL_FEATURE is enabled. Disabling all optional features.") + flags["RECORDING_FEATURE"] = False + flags["TRACK_FEATURE"] = False + flags["SEARCH_FEATURE"] = False + flags["CYCLE_FEATURE"] = False + flags["INFO_FEATURE"] = False with open(FEATURE_FLAGS_FILE, "w") as f: f.write("# This file is auto-generated by the configure step. Do not edit manually.\n\n") diff --git a/features.conf b/features.conf index 677ded1..c272713 100644 --- a/features.conf +++ b/features.conf @@ -1,4 +1,10 @@ # Feature Configuration File # Set features to true or false to enable/disable them at "compile time" +# If MINIMAL_FEATURE is true, it will override and disable all optional features (Recording, Track, Search, Cycle, Info) +MINIMAL_FEATURE=true RECORDING_FEATURE=false +TRACK_FEATURE=true +SEARCH_FEATURE=false +CYCLE_FEATURE=true +INFO_FEATURE=true diff --git a/radioactive/args.py b/radioactive/args.py index 740483e..f87f561 100644 --- a/radioactive/args.py +++ b/radioactive/args.py @@ -6,9 +6,10 @@ from radioactive.config import Configs try: - from radioactive.feature_flags import RECORDING_FEATURE + from radioactive.feature_flags import RECORDING_FEATURE, SEARCH_FEATURE except ImportError: RECORDING_FEATURE = True + SEARCH_FEATURE = True # load default configs diff --git a/radioactive/feature_flags.py b/radioactive/feature_flags.py index 1a43edd..10ee642 100644 --- a/radioactive/feature_flags.py +++ b/radioactive/feature_flags.py @@ -1,3 +1,8 @@ # This file is auto-generated by the configure step. Do not edit manually. +MINIMAL_FEATURE = True RECORDING_FEATURE = False +TRACK_FEATURE = False +SEARCH_FEATURE = False +CYCLE_FEATURE = False +INFO_FEATURE = False diff --git a/radioactive/handler.py b/radioactive/handler.py index a0fd054..9d67692 100644 --- a/radioactive/handler.py +++ b/radioactive/handler.py @@ -15,6 +15,11 @@ from radioactive.filter import filter_expressions +try: + from radioactive.feature_flags import MINIMAL_FEATURE +except ImportError: + MINIMAL_FEATURE = False + # constants DEFAULT_CACHE_RETENTION_DAYS = 3 BYTES_TO_MB_DIVISOR = 1024 * 1024 @@ -60,6 +65,15 @@ def print_table( list: The original (or filtered) response data. """ + if MINIMAL_FEATURE: + columns = [ + col + for col in columns + if all(x not in col for x in ["Tags", "Country", "Language"]) + ] + if len(response) > 10: + response = response[:10] + if not response: log.error("No stations found") sys.exit(1) diff --git a/radioactive/parser.py b/radioactive/parser.py index f43dcc2..61c1782 100644 --- a/radioactive/parser.py +++ b/radioactive/parser.py @@ -33,7 +33,7 @@ def parse_options() -> Dict[str, Any]: options["limit"] = int(limit) if limit else 100 log.debug("limit is set to: {}".format(limit)) - options["search_station_name"] = args.search_station_name + options["search_station_name"] = getattr(args, "search_station_name", None) options["search_station_uuid"] = args.search_station_uuid options["play_last_station"] = args.play_last_station diff --git a/radioactive/utilities.py b/radioactive/utilities.py index ab3245f..fceba9c 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -12,9 +12,19 @@ from zenlog import log try: - from radioactive.feature_flags import RECORDING_FEATURE + from radioactive.feature_flags import ( + RECORDING_FEATURE, + TRACK_FEATURE, + SEARCH_FEATURE, + CYCLE_FEATURE, + INFO_FEATURE, + ) except ImportError: RECORDING_FEATURE = True + TRACK_FEATURE = True + SEARCH_FEATURE = True + CYCLE_FEATURE = True + INFO_FEATURE = True # Re-export functions for backward compatibility and aggregation from radioactive.ui import ( @@ -252,7 +262,7 @@ def handle_listen_keypress( loglevel, ) - if user_input in ["i", "I", "info"]: + if INFO_FEATURE and user_input in ["i", "I", "info"]: handle_show_station_info() elif user_input in ["f", "F", "fav"]: @@ -266,13 +276,13 @@ def handle_listen_keypress( alias.generate_map() handle_favorite_table(alias) - elif user_input in ["t", "T", "track"]: + elif TRACK_FEATURE and user_input in ["t", "T", "track"]: handle_fetch_song_title(target_url) elif user_input in ["p", "P"]: player.toggle() - elif user_input in ["s", "S", "search"]: + elif SEARCH_FEATURE and user_input in ["s", "S", "search"]: if handler: try: query = input("Enter station name to search: ") @@ -304,7 +314,7 @@ def handle_listen_keypress( else: log.warning("Search unavailable (handler not initialized)") - elif user_input in ["c", "C", "cycle"]: + elif CYCLE_FEATURE and user_input in ["c", "C", "cycle"]: if station_list and handler: # Find current index current_uuid = global_current_station_info.get("stationuuid") @@ -353,13 +363,17 @@ def handle_listen_keypress( elif user_input in ["h", "H", "?", "help"]: log.info("p: Play/Pause current station") - log.info("t/track: Current track info") - log.info("i/info: Station information") + if TRACK_FEATURE: + log.info("t/track: Current track info") + if INFO_FEATURE: + log.info("i/info: Station information") if RECORDING_FEATURE: log.info("r/record: Record a station") log.info("rf/recordfile: Specify a filename for the recording") log.info("f/fav: Add station to favorite list") - log.info("s/search: Search for a new station") - log.info("c/cycle: Cycle to next station in search results") + if SEARCH_FEATURE: + log.info("s/search: Search for a new station") + if CYCLE_FEATURE: + log.info("c/cycle: Cycle to next station in search results") log.info("h/help/?: Show this help message") log.info("q/quit: Quit radioactive") From 1c75801bdaf64cd21553b5646f406289b6fcdc42 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 00:07:50 +0530 Subject: [PATCH 06/12] Feat: Sleep Timer and configuration updates --- README.md | 2 ++ configure.py | 2 ++ features.conf | 5 +++-- radioactive/feature_flags.py | 9 +++++---- radioactive/ffplay.py | 3 ++- radioactive/utilities.py | 37 ++++++++++++++++++++++++++++++++++++ 6 files changed, 51 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 1a1bede..5df1045 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ - [x] Discovers stations by language - [x] VLC, MPV player support - [x] Default config file +- [x] Sleep Timer (pomodoro) ⏲️ - [ ] I'm feeling lucky! Play Random stations @@ -199,6 +200,7 @@ r/R/record: Record a station f/F/fav: Add station to favorite list s/S/search: Search for a new station c/C/cycle: Cycle to next station in search results +timer/sleep: Set a sleep timer (duration in minutes) rf/RF/recordfile: Specify a filename for the recording. h/H/help/?: Show this help message q/Q/quit: Quit radioactive diff --git a/configure.py b/configure.py index f6e4ff9..6f2b8ef 100644 --- a/configure.py +++ b/configure.py @@ -25,6 +25,7 @@ def generate_flags(): flags["SEARCH_FEATURE"] = True flags["CYCLE_FEATURE"] = True flags["INFO_FEATURE"] = True + flags["TIMER_FEATURE"] = True flags["MINIMAL_FEATURE"] = False # Apply limits if MINIMAL_FEATURE is True @@ -35,6 +36,7 @@ def generate_flags(): flags["SEARCH_FEATURE"] = False flags["CYCLE_FEATURE"] = False flags["INFO_FEATURE"] = False + flags["TIMER_FEATURE"] = False with open(FEATURE_FLAGS_FILE, "w") as f: f.write("# This file is auto-generated by the configure step. Do not edit manually.\n\n") diff --git a/features.conf b/features.conf index c272713..977b508 100644 --- a/features.conf +++ b/features.conf @@ -1,10 +1,11 @@ # Feature Configuration File # Set features to true or false to enable/disable them at "compile time" -# If MINIMAL_FEATURE is true, it will override and disable all optional features (Recording, Track, Search, Cycle, Info) +# If MINIMAL_FEATURE is true, it will override and disable all optional features (Recording, Track, Search, Cycle, Info, Timer) -MINIMAL_FEATURE=true +MINIMAL_FEATURE=false RECORDING_FEATURE=false TRACK_FEATURE=true SEARCH_FEATURE=false CYCLE_FEATURE=true INFO_FEATURE=true +TIMER_FEATURE=true diff --git a/radioactive/feature_flags.py b/radioactive/feature_flags.py index 10ee642..049899b 100644 --- a/radioactive/feature_flags.py +++ b/radioactive/feature_flags.py @@ -1,8 +1,9 @@ # This file is auto-generated by the configure step. Do not edit manually. -MINIMAL_FEATURE = True +MINIMAL_FEATURE = False RECORDING_FEATURE = False -TRACK_FEATURE = False +TRACK_FEATURE = True SEARCH_FEATURE = False -CYCLE_FEATURE = False -INFO_FEATURE = False +CYCLE_FEATURE = True +INFO_FEATURE = True +TIMER_FEATURE = True diff --git a/radioactive/ffplay.py b/radioactive/ffplay.py index 718eb10..dc07139 100644 --- a/radioactive/ffplay.py +++ b/radioactive/ffplay.py @@ -37,7 +37,8 @@ def kill_background_ffplays() -> None: # Handle exceptions, such as processes that no longer exist or access denied log.debug("Could not terminate a ffplay processes!") if count == 0: - log.info("No background radios are running!") + pass + # log.info("No background radios are running!") class Ffplay: diff --git a/radioactive/utilities.py b/radioactive/utilities.py index fceba9c..2e53435 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -4,6 +4,9 @@ """ import sys +import threading +import time +import os from random import randint from typing import Any, Dict, List, Optional, Tuple, Union @@ -18,6 +21,7 @@ SEARCH_FEATURE, CYCLE_FEATURE, INFO_FEATURE, + TIMER_FEATURE, ) except ImportError: RECORDING_FEATURE = True @@ -25,6 +29,7 @@ SEARCH_FEATURE = True CYCLE_FEATURE = True INFO_FEATURE = True + TIMER_FEATURE = True # Re-export functions for backward compatibility and aggregation from radioactive.ui import ( @@ -265,6 +270,36 @@ def handle_listen_keypress( if INFO_FEATURE and user_input in ["i", "I", "info"]: handle_show_station_info() + elif TIMER_FEATURE and user_input in ["timer", "sleep"]: + try: + duration_str = input("Enter sleep timer duration in minutes: ") + duration = float(duration_str) + if duration <= 0: + log.error("Duration must be positive") + continue + + log.info(f"Sleep timer set for {duration} minutes") + + def stop_playback(): + log.info("\nSleep timer finished. Stopping playback...") + # We need to stop the player and exit. + # Since we are in a thread, we can't easily exit the main input loop cleanly + # without some signal, but sys.exit() or os._exit() should work strong enough. + if player: + player.stop() + kill_background_ffplays() + log.info("Exiting...") + os._exit(0) # Force exit from thread + + t = threading.Timer(duration * 60, stop_playback) + t.daemon = True # Ensure it doesn't block exit if we quit manually + t.start() + + except ValueError: + log.error("Invalid number") + except Exception as e: + log.error(f"Error setting timer: {e}") + elif user_input in ["f", "F", "fav"]: handle_add_to_favorite(alias, station_name, station_url) @@ -375,5 +410,7 @@ def handle_listen_keypress( log.info("s/search: Search for a new station") if CYCLE_FEATURE: log.info("c/cycle: Cycle to next station in search results") + if TIMER_FEATURE: + log.info("timer/sleep: Set a sleep timer") log.info("h/help/?: Show this help message") log.info("q/quit: Quit radioactive") From 91aebaa3342950d4e054cf84735b91aa5fff56a0 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 00:25:40 +0530 Subject: [PATCH 07/12] Release version 3.0.0 --- features.conf | 4 ++-- radioactive/app.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/features.conf b/features.conf index 977b508..125ed4e 100644 --- a/features.conf +++ b/features.conf @@ -3,9 +3,9 @@ # If MINIMAL_FEATURE is true, it will override and disable all optional features (Recording, Track, Search, Cycle, Info, Timer) MINIMAL_FEATURE=false -RECORDING_FEATURE=false +RECORDING_FEATURE=true TRACK_FEATURE=true -SEARCH_FEATURE=false +SEARCH_FEATURE=true CYCLE_FEATURE=true INFO_FEATURE=true TIMER_FEATURE=true diff --git a/radioactive/app.py b/radioactive/app.py index 128a644..ca85c10 100644 --- a/radioactive/app.py +++ b/radioactive/app.py @@ -11,7 +11,7 @@ class App: def __init__(self): - self.__VERSION__ = "2.9.1" # change this on every update # + self.__VERSION__ = "3.0.0" # change this on every update # self.pypi_api = "https://pypi.org/pypi/radio-active/json" self.remote_version = "" From 197168248648b8298b68951fb4d65d21890029ba Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 00:47:22 +0530 Subject: [PATCH 08/12] fix: resolve build dependency issue by lazy importing requests --- radioactive/app.py | 4 ++-- radioactive/feature_flags.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/radioactive/app.py b/radioactive/app.py index ca85c10..fed2b95 100644 --- a/radioactive/app.py +++ b/radioactive/app.py @@ -6,8 +6,6 @@ import json -import requests - class App: def __init__(self): @@ -28,6 +26,8 @@ def is_update_available(self): """ try: + import requests + remote_data = requests.get(self.pypi_api) remote_data = remote_data.content.decode("utf8") remote_data = json.loads(remote_data) diff --git a/radioactive/feature_flags.py b/radioactive/feature_flags.py index 049899b..08b274b 100644 --- a/radioactive/feature_flags.py +++ b/radioactive/feature_flags.py @@ -1,9 +1,9 @@ # This file is auto-generated by the configure step. Do not edit manually. MINIMAL_FEATURE = False -RECORDING_FEATURE = False +RECORDING_FEATURE = True TRACK_FEATURE = True -SEARCH_FEATURE = False +SEARCH_FEATURE = True CYCLE_FEATURE = True INFO_FEATURE = True TIMER_FEATURE = True From f2e7eb3db335f103e5c9a85cb756da980b6f6ed6 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 23:06:56 +0530 Subject: [PATCH 09/12] Fix cycle runtime command bugs --- README.md | 2 +- radioactive/ui.py | 6 ++ radioactive/utilities.py | 124 ++++++++++++++++++++++++++++++--------- 3 files changed, 103 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 5df1045..34195d2 100644 --- a/README.md +++ b/README.md @@ -199,7 +199,7 @@ t/T/track: Current song name (track info) r/R/record: Record a station f/F/fav: Add station to favorite list s/S/search: Search for a new station -c/C/cycle: Cycle to next station in search results +n/N/next: Play next station from search results or favorite list timer/sleep: Set a sleep timer (duration in minutes) rf/RF/recordfile: Specify a filename for the recording. h/H/help/?: Show this help message diff --git a/radioactive/ui.py b/radioactive/ui.py index f5cff35..86d812a 100644 --- a/radioactive/ui.py +++ b/radioactive/ui.py @@ -120,3 +120,9 @@ def set_global_station_info(info: dict) -> None: """Helper to update global station info from other modules.""" global global_current_station_info global_current_station_info = info + + +def get_global_station_info() -> dict: + """Helper to get global station info.""" + global global_current_station_info + return global_current_station_info diff --git a/radioactive/utilities.py b/radioactive/utilities.py index 2e53435..08f7d2b 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -39,7 +39,7 @@ handle_show_station_info, handle_current_play_panel, set_global_station_info, - global_current_station_info, + get_global_station_info, ) from radioactive.actions import ( @@ -349,52 +349,118 @@ def stop_playback(): else: log.warning("Search unavailable (handler not initialized)") - elif CYCLE_FEATURE and user_input in ["c", "C", "cycle"]: - if station_list and handler: + elif CYCLE_FEATURE and user_input in ["n", "N", "next"]: + target_list = [] + source_type = "" + + # Prioritize search results if available + if station_list and len(station_list) > 0: + target_list = station_list + source_type = "search" + elif alias and alias.alias_map: + target_list = alias.alias_map + source_type = "favorite" + + if target_list: # Find current index - current_uuid = global_current_station_info.get("stationuuid") + current_info = get_global_station_info() + current_uuid = current_info.get("stationuuid") + current_url = current_info.get("url") # for direct URLs + current_index = -1 - for idx, st in enumerate(station_list): - if st.get("stationuuid") == current_uuid: - current_index = idx - break + + # Try to find current station in the target list + for idx, st in enumerate(target_list): + if source_type == "search": + if st.get("stationuuid") == current_uuid: + current_index = idx + break + elif source_type == "favorite": + # Favorites use uuid_or_url + val = st.get("uuid_or_url") + # Check against both uuid and url to be safe + if val == current_uuid or val == current_url: + current_index = idx + break + # Also check name as fallback + if st.get("name") == current_info.get("name"): + current_index = idx + break # Next index - next_index = (current_index + 1) % len(station_list) + next_index = (current_index + 1) % len(target_list) # Try to play next valid station - # We loop until we find one that works or we exhaust list attempts = 0 - max_attempts = len(station_list) + max_attempts = len(target_list) while attempts < max_attempts: - target_station = station_list[next_index] - set_global_station_info(target_station) + target_station = target_list[next_index] log.info(f"Switching to: {target_station.get('name')}") + # Determine how to play based on available info + # We need to simulate the "Selection" logic + try: - station_name, target_url = handle_station_uuid_play( - handler, target_station["stationuuid"] - ) - player.stop() - player.url = target_url - player.play() - - # Check if successful (basic check) - # The player runs in threads, so strict check is hard, but we can trust if it didn't error immediately - handle_current_play_panel(station_name) - station_url = target_url - break + new_station_name = "" + new_target_url = "" + + if source_type == "search": + # It's a full station object + set_global_station_info(target_station) + new_station_name, new_target_url = handle_station_uuid_play( + handler, target_station["stationuuid"] + ) + else: + # Favorite entry: {'name':..., 'uuid_or_url':...} + # Construct a temporary info object for global state + uuid_or_url = target_station["uuid_or_url"] + + temp_info = { + "name": target_station["name"], + "uuid_or_url": uuid_or_url, + # We might not know if it is a UUID or URL yet for sure without helper, + # but let's try to populate what we can + } + + if "://" in uuid_or_url: + # Direct URL + temp_info["url"] = uuid_or_url + set_global_station_info(temp_info) + new_station_name = target_station["name"] + new_target_url = uuid_or_url + # Allow direct play without UUID handler + else: + # UUID + temp_info["stationuuid"] = uuid_or_url + set_global_station_info(temp_info) + new_station_name, new_target_url = ( + handle_station_uuid_play(handler, uuid_or_url) + ) + + # Check if we have a URL to play + if new_target_url: + player.stop() + player.url = new_target_url + player.play() + handle_current_play_panel(new_station_name) + station_url = new_target_url + break + else: + raise Exception("Could not resolve station URL") + except Exception as e: log.error(f"Failed to play {target_station.get('name')}: {e}") - next_index = (next_index + 1) % len(station_list) + next_index = (next_index + 1) % len(target_list) attempts += 1 if attempts >= max_attempts: log.error("Could not play any station from the list") else: - log.warning("Cycle unavailable (no search results to cycle through)") + log.warning( + "Cycle/Next unavailable (no search results or favorites to cycle through)" + ) elif user_input in ["h", "H", "?", "help"]: log.info("p: Play/Pause current station") @@ -409,7 +475,9 @@ def stop_playback(): if SEARCH_FEATURE: log.info("s/search: Search for a new station") if CYCLE_FEATURE: - log.info("c/cycle: Cycle to next station in search results") + log.info( + "n/next: Play result from next station searching or favorite list" + ) if TIMER_FEATURE: log.info("timer/sleep: Set a sleep timer") log.info("h/help/?: Show this help message") From 3805537a8be16d68fb9ba47c8ed99e25d9127906 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 23:20:00 +0530 Subject: [PATCH 10/12] Fix saving a station info into favorite list from next player runtime command --- radioactive/utilities.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/radioactive/utilities.py b/radioactive/utilities.py index 08f7d2b..7216f8e 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -445,6 +445,8 @@ def stop_playback(): player.play() handle_current_play_panel(new_station_name) station_url = new_target_url + station_name = new_station_name + target_url = new_target_url break else: raise Exception("Could not resolve station URL") From e2a261266e1f910a68cd4723bcb39fab33eb0cac Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Fri, 23 Jan 2026 23:29:33 +0530 Subject: [PATCH 11/12] Fix empty station list on runtime exist the entire app --- radioactive/handler.py | 9 ++++++--- radioactive/utilities.py | 5 +++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/radioactive/handler.py b/radioactive/handler.py index 9d67692..7798b5a 100644 --- a/radioactive/handler.py +++ b/radioactive/handler.py @@ -76,7 +76,8 @@ def print_table( if not response: log.error("No stations found") - sys.exit(1) + # sys.exit(1) + return [] # Apply filtering if needed if filter_expression.lower() != "none": @@ -84,7 +85,8 @@ def print_table( if not response: log.error("No stations found after filtering") - sys.exit(1) + # sys.exit(1) + return [] else: log.debug("Not filtering") @@ -129,7 +131,8 @@ def print_table( return response else: log.info("No stations found") - sys.exit(0) + # Do not exit if no stations found, just return empty + return [] class Handler: diff --git a/radioactive/utilities.py b/radioactive/utilities.py index 7216f8e..5b619c7 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -325,10 +325,11 @@ def stop_playback(): continue if query.strip(): - station_list = handle_search_stations( + temp_station_list = handle_search_stations( handler, query, limit=100, sort_by="votes", filter_with="none" ) - if station_list: + if temp_station_list: + station_list = temp_station_list # Find valid station choice try: station_name, target_url = ( From c76f219bc48033923c191decee59930592a85789 Mon Sep 17 00:00:00 2001 From: Dipankar Pal Date: Sat, 24 Jan 2026 00:30:41 +0530 Subject: [PATCH 12/12] Fix recording path issues Also added fallback paths for app default config dir --- README.md | 26 ++++---- radioactive/actions.py | 22 +++++-- radioactive/config.py | 10 +-- radioactive/help.py | 2 +- radioactive/paths.py | 146 +++++++++++++++++++++++++++-------------- 5 files changed, 129 insertions(+), 77 deletions(-) diff --git a/README.md b/README.md index 34195d2..f84e82b 100644 --- a/README.md +++ b/README.md @@ -145,7 +145,7 @@ Search a station with `radio --search [STATION_NAME]` or simply `radio` :zap: to | `--uuid`, `-U` | Optional | ID of the station | None | | | `--record` , `-R` | Optional | Record a station and save to file | False | | | `--filename`, `-N` | Optional | Filename to used to save the recorded audio | None | | -| `--filepath` | Optional | Path to save the recordings | | | +| `--filepath` | Optional | Path to save the recordings | `~/radioactive/recordings` | | | `--filetype`, `-T` | Optional | Format of the recording | mp3 | `mp3`,`auto` | | `--last` | Optional | Play last played station | False | | | `--random` | Optional | Play a random station from favorite list | False | | @@ -186,7 +186,7 @@ Search a station with `radio --search [STATION_NAME]` or simply `radio` :zap: to > `--filetype`: Specify the extension of the final recording file. default is `mp3`. you can provide `-T auto` to autodetect the codec and set file extension accordingly (in original form). -> DEFAULT_DIR: is `/home/user/Music/radioactive` + ### Runtime Commands @@ -262,27 +262,23 @@ limit = 100 sort = votes filter = none volume = 80 -filepath = /home/{user}/recordings/radioactive/ +filepath = /home/{user}/radioactive/recordings/ filetype = mp3 player = ffplay ``` ### Configuration Paths +All the data files are stored in a folder called `radioactive` under your user home directory. -`radio-active` follows the XDG Base Directory specification: - -- **Configuration** (including `config.ini` and `alias_map`): - - Linux/Mac: `$XDG_CONFIG_HOME/radio-active` (defaults to `~/.config/radio-active`) - - Windows: `%XDG_CONFIG_HOME%/radio-active` (defaults to `~/.config/radio-active` if not set) - -- **Data** (including `last-station`): - - Linux/Mac: `$XDG_DATA_HOME/radio-active` (defaults to `~/.local/share/radio-active`) - - Windows: `%XDG_DATA_HOME%/radio-active` (defaults to `~/.local/share/radio-active` if not set) +- **Configuration**: `~/radioactive/config.ini` +- **Favorites**: `~/radioactive/alias_map` +- **Last Station**: `~/radioactive/last_station` +- **Recordings**: `~/radioactive/recordings` -Legacy configuration files in `~/.radio-active-configs.ini`, `~/.radio-active-alias`, and `~/.radio-active-last-station` will be automatically migrated to the new locations on first run. +Legacy configuration files are automatically migrated to this new location on the first run. > [!WARNING] -> Do NOT modify the keys, only change the values. you can give any absolute or relative path as filepath. +> Do NOT modify the keys, only change the values. ### Bonus Tips @@ -300,7 +296,7 @@ see [CHANGELOG](./CHANGELOG.md) Share you favorite list with our community 🌐 ➡️ [Here](https://github.com/deep5050/radio-active/discussions/10) -> Your favorite list `.radio-active-alias` is under your home directory as a hidden file :) +> Your favorite list `alias_map` is under `~/radioactive/` directory. ### Support diff --git a/radioactive/actions.py b/radioactive/actions.py index 576f1d1..71a009a 100644 --- a/radioactive/actions.py +++ b/radioactive/actions.py @@ -97,19 +97,27 @@ def handle_record( if record_file_path and not os.path.exists(record_file_path): log.debug(f"filepath: {record_file_path}") - os.makedirs(record_file_path, exist_ok=True) + try: + os.makedirs(record_file_path, exist_ok=True) + except Exception as e: + log.error(f"Could not create recording directory: {e}") elif not record_file_path: + from radioactive.paths import get_recordings_path + log.debug("filepath: fallback to default path") - record_file_path = os.path.join( - os.path.expanduser("~"), "Music/radioactive" - ) # fallback path + record_file_path = get_recordings_path() try: os.makedirs(record_file_path, exist_ok=True) except Exception as e: - log.debug(f"{e}") - log.error("Could not make default directory") - sys.exit(1) + log.error(f"Could not create recording directory: {e}") + log.warning("Recording might fail if the directory is not writable.") + # We don't exit here, we try to proceed or return? + # If we return, recording stops but app stays alive. + # But earlier code sys.exit(1). + # User wants NO CRASH. + # Let's try to verify if we can write there? + # For now, just catching the exception is enough to stop the crash. now = datetime.datetime.now() month_name = now.strftime("%b").upper() diff --git a/radioactive/config.py b/radioactive/config.py index cf8851a..3d3ac38 100644 --- a/radioactive/config.py +++ b/radioactive/config.py @@ -19,6 +19,8 @@ def write_a_sample_config_file() -> None: # Create a ConfigParser object config = configparser.ConfigParser() + from radioactive.paths import get_recordings_path + # Add sections and key-value pairs config["AppConfig"] = { "loglevel": "info", @@ -26,7 +28,7 @@ def write_a_sample_config_file() -> None: "sort": "votes", "filter": "none", "volume": "80", - "filepath": "/home/{user}/recordings/radioactive/", + "filepath": get_recordings_path(), "filetype": "mp3", "player": "ffplay", } @@ -83,9 +85,9 @@ def get_option(key: str, default: str = "") -> str: options["sort"] = get_option("sort", "votes") options["filter"] = get_option("filter", "none") options["limit"] = get_option("limit", "100") - options["filepath"] = get_option( - "filepath", "/home/{user}/recordings/radioactive/" - ) + from radioactive.paths import get_recordings_path + + options["filepath"] = get_option("filepath", get_recordings_path()) # if filepath has any placeholder, replace {user} to actual user map if "{user}" in options["filepath"]: diff --git a/radioactive/help.py b/radioactive/help.py index 6a25412..572ce2b 100644 --- a/radioactive/help.py +++ b/radioactive/help.py @@ -124,7 +124,7 @@ def show_help(): table.add_row( "--filepath", "Path to save the recorded audio", - f"{user}/Music/radioactive", + f"{user}/radioactive/recordings", ) table.add_row( diff --git a/radioactive/paths.py b/radioactive/paths.py index 356e5a7..37133ca 100644 --- a/radioactive/paths.py +++ b/radioactive/paths.py @@ -1,85 +1,131 @@ import os import shutil +import sys from zenlog import log -def _get_xdg_config_dir(): - """Return the XDG configuration directory for radio-active.""" - xdg_config_home = os.environ.get("XDG_CONFIG_HOME") - if xdg_config_home: - return os.path.join(xdg_config_home, "radio-active") - # Default to ~/.config/radio-active - return os.path.join(os.path.expanduser("~"), ".config", "radio-active") - - -def _get_xdg_data_dir(): - """Return the XDG data directory for radio-active.""" - xdg_data_home = os.environ.get("XDG_DATA_HOME") - if xdg_data_home: - return os.path.join(xdg_data_home, "radio-active") - # Default to ~/.local/share/radio-active - return os.path.join(os.path.expanduser("~"), ".local", "share", "radio-active") +def get_user_home(): + """ + Get the user's home directory in a cross-platform way. + """ + return os.path.expanduser("~") -def get_config_path(): +def get_base_dir(): """ - Get the path to the configuration file. - Migrates from legacy path if it exists and new path does not. + Return the base directory for radioactive files: ~/radioactive + This acts as the central storage for config, data, and recordings + as per user request. """ - legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-configs.ini") + home = get_user_home() + base_dir = os.path.join(home, "radioactive") - config_dir = _get_xdg_config_dir() - os.makedirs(config_dir, exist_ok=True) - new_path = os.path.join(config_dir, "config.ini") + try: + os.makedirs(base_dir, exist_ok=True) + except Exception as e: + # If we can't create the base dir, we are in trouble, + # but we log it and proceed (might crash later if not handled) + log.error(f"Could not create base directory {base_dir}: {e}") + return base_dir + + +def _migrate_file(legacy_path, new_path, description): + """ + Migrate a file from legacy_path to new_path if it exists. + """ if os.path.exists(legacy_path) and not os.path.exists(new_path): - log.info(f"Migrating config file from {legacy_path} to {new_path}") + log.info(f"Migrating {description} from {legacy_path} to {new_path}") try: + # Ensure the directory exists + os.makedirs(os.path.dirname(new_path), exist_ok=True) shutil.move(legacy_path, new_path) except Exception as e: - log.warning(f"Could not migrate config file: {e}") - # If migration fails, we return new_path anyway, user might have to manually move or start fresh + log.warning(f"Could not migrate {description}: {e}") + + +def get_config_path(): + """ + Get the path to the configuration file: ~/radioactive/config.ini + """ + base_dir = get_base_dir() + new_path = os.path.join(base_dir, "config.ini") + + home = get_user_home() + + # 1. ~/.radio-active-configs.ini + legacy_dot_path = os.path.join(home, ".radio-active-configs.ini") + _migrate_file(legacy_dot_path, new_path, "config file (dotfile)") + + # 2. XDG locations (from previous attempts) + # ~/.config/radioactive/config.ini + xdg_path_new = os.path.join(home, ".config", "radioactive", "config.ini") + _migrate_file(xdg_path_new, new_path, "config file (xdg-new)") + + # ~/.config/radio-active/config.ini + xdg_path_old = os.path.join(home, ".config", "radio-active", "config.ini") + _migrate_file(xdg_path_old, new_path, "config file (xdg-old)") return new_path def get_alias_path(): """ - Get the path to the alias (favorites) file. - Migrates from legacy path if it exists and new path does not. + Get the path to the alias (favorites) file: ~/radioactive/alias_map """ - legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-alias") + base_dir = get_base_dir() + new_path = os.path.join(base_dir, "alias_map") - config_dir = _get_xdg_config_dir() # Aliases are user config - os.makedirs(config_dir, exist_ok=True) - new_path = os.path.join(config_dir, "alias_map") + home = get_user_home() - if os.path.exists(legacy_path) and not os.path.exists(new_path): - log.info(f"Migrating alias file from {legacy_path} to {new_path}") - try: - shutil.move(legacy_path, new_path) - except Exception as e: - log.warning(f"Could not migrate alias file: {e}") + # 1. ~/.radio-active-alias + legacy_dot_path = os.path.join(home, ".radio-active-alias") + _migrate_file(legacy_dot_path, new_path, "alias file (dotfile)") + + # 2. XDG locations + xdg_path_new = os.path.join(home, ".config", "radioactive", "alias_map") + _migrate_file(xdg_path_new, new_path, "alias file (xdg-new)") + + xdg_path_old = os.path.join(home, ".config", "radio-active", "alias_map") + _migrate_file(xdg_path_old, new_path, "alias file (xdg-old)") return new_path def get_last_station_path(): """ - Get the path to the last played station file. - Migrates from legacy path if it exists and new path does not. + Get the path to the last played station file: ~/radioactive/last_station """ - legacy_path = os.path.join(os.path.expanduser("~"), ".radio-active-last-station") + base_dir = get_base_dir() + new_path = os.path.join(base_dir, "last_station") - data_dir = _get_xdg_data_dir() # Last station is state/data - os.makedirs(data_dir, exist_ok=True) - new_path = os.path.join(data_dir, "last-station") + home = get_user_home() - if os.path.exists(legacy_path) and not os.path.exists(new_path): - log.info(f"Migrating last station file from {legacy_path} to {new_path}") - try: - shutil.move(legacy_path, new_path) - except Exception as e: - log.warning(f"Could not migrate last station file: {e}") + # 1. ~/.radio-active-last_station + legacy_dot_path = os.path.join(home, ".radio-active-last_station") + _migrate_file(legacy_dot_path, new_path, "last station file (dotfile)") + + # 2. XDG locations (usually in local/share, but we check config too just in case) + xdg_data_new = os.path.join(home, ".local", "share", "radioactive", "last_station") + _migrate_file(xdg_data_new, new_path, "last station file (xdg-new)") + + xdg_data_old = os.path.join(home, ".local", "share", "radio-active", "last_station") + _migrate_file(xdg_data_old, new_path, "last station file (xdg-old)") return new_path + + +def get_recordings_path(): + """ + Get the path for recordings: ~/radioactive/recordings + """ + base_dir = get_base_dir() + recordings_path = os.path.join(base_dir, "recordings") + + try: + os.makedirs(recordings_path, exist_ok=True) + except Exception as e: + log.error(f"Could not create recordings directory {recordings_path}: {e}") + # Not exiting here, hoping the caller handles it or it works next time + + return recordings_path