diff --git a/.vscode/settings.json b/.vscode/settings.json index b5a61b5..bff3298 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -17,8 +17,8 @@ ], "files.autoSave": "off", "editor.wordWrap": "wordWrapColumn", - "workbench.colorTheme": "GitHub Dark", - "editor.minimap.autohide": true, + "workbench.colorTheme": "Visual Studio Light", + "editor.minimap.autohide": "mouseover", "editor.minimap.renderCharacters": false, "editor.experimentalWhitespaceRendering": "font", "editor.fontFamily": "'Fira Code', Consolas, 'Courier New', monospace", diff --git a/Makefile b/Makefile index 9f11434..90a54ca 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,11 @@ SRC_DIR = "radioactive" TEST_DIR = "test" .PHONY: all clean isort check dist deploy test-deploy help build install install-dev test -all: clean isort format check build install +all: clean isort format check configure build install + +configure: + @echo "Configuring features..." + ${PYTHON} configure.py check: @echo "Chceking linting errors......." diff --git a/README.md b/README.md index 3a22537..f84e82b 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ - [x] Discovers stations by language - [x] VLC, MPV player support - [x] Default config file +- [x] Sleep Timer (pomodoro) ⏲️ - [ ] I'm feeling lucky! Play Random stations @@ -144,7 +145,7 @@ Search a station with `radio --search [STATION_NAME]` or simply `radio` :zap: to | `--uuid`, `-U` | Optional | ID of the station | None | | | `--record` , `-R` | Optional | Record a station and save to file | False | | | `--filename`, `-N` | Optional | Filename to used to save the recorded audio | None | | -| `--filepath` | Optional | Path to save the recordings | | | +| `--filepath` | Optional | Path to save the recordings | `~/radioactive/recordings` | | | `--filetype`, `-T` | Optional | Format of the recording | mp3 | `mp3`,`auto` | | `--last` | Optional | Play last played station | False | | | `--random` | Optional | Play a random station from favorite list | False | | @@ -185,7 +186,7 @@ Search a station with `radio --search [STATION_NAME]` or simply `radio` :zap: to > `--filetype`: Specify the extension of the final recording file. default is `mp3`. you can provide `-T auto` to autodetect the codec and set file extension accordingly (in original form). -> DEFAULT_DIR: is `/home/user/Music/radioactive` + ### Runtime Commands @@ -197,6 +198,9 @@ Enter a command to perform an action: ? t/T/track: Current song name (track info) r/R/record: Record a station f/F/fav: Add station to favorite list +s/S/search: Search for a new station +n/N/next: Play next station from search results or favorite list +timer/sleep: Set a sleep timer (duration in minutes) rf/RF/recordfile: Specify a filename for the recording. h/H/help/?: Show this help message q/Q/quit: Quit radioactive @@ -258,13 +262,23 @@ limit = 100 sort = votes filter = none volume = 80 -filepath = /home/{user}/recordings/radioactive/ +filepath = /home/{user}/radioactive/recordings/ filetype = mp3 player = ffplay ``` +### Configuration Paths +All the data files are stored in a folder called `radioactive` under your user home directory. + +- **Configuration**: `~/radioactive/config.ini` +- **Favorites**: `~/radioactive/alias_map` +- **Last Station**: `~/radioactive/last_station` +- **Recordings**: `~/radioactive/recordings` + +Legacy configuration files are automatically migrated to this new location on the first run. + > [!WARNING] -> Do NOT modify the keys, only change the values. you can give any absolute or relative path as filepath. +> Do NOT modify the keys, only change the values. ### Bonus Tips @@ -282,7 +296,7 @@ see [CHANGELOG](./CHANGELOG.md) Share you favorite list with our community 🌐 ➡️ [Here](https://github.com/deep5050/radio-active/discussions/10) -> Your favorite list `.radio-active-alias` is under your home directory as a hidden file :) +> Your favorite list `alias_map` is under `~/radioactive/` directory. ### Support diff --git a/configure.py b/configure.py new file mode 100644 index 0000000..6f2b8ef --- /dev/null +++ b/configure.py @@ -0,0 +1,49 @@ + +import os + +CONFIG_FILE = "features.conf" +FEATURE_FLAGS_FILE = "radioactive/feature_flags.py" + +def generate_flags(): + print(f"Generating {FEATURE_FLAGS_FILE} from {CONFIG_FILE}...") + + flags = {} + if os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE, "r") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" in line: + key, value = line.split("=", 1) + flags[key.strip()] = value.strip().lower() == "true" + else: + print(f"Warning: {CONFIG_FILE} not found. Using defaults.") + # Default flags if config is missing + flags["RECORDING_FEATURE"] = True + flags["TRACK_FEATURE"] = True + flags["SEARCH_FEATURE"] = True + flags["CYCLE_FEATURE"] = True + flags["INFO_FEATURE"] = True + flags["TIMER_FEATURE"] = True + flags["MINIMAL_FEATURE"] = False + + # Apply limits if MINIMAL_FEATURE is True + if flags.get("MINIMAL_FEATURE", False): + print("MINIMAL_FEATURE is enabled. Disabling all optional features.") + flags["RECORDING_FEATURE"] = False + flags["TRACK_FEATURE"] = False + flags["SEARCH_FEATURE"] = False + flags["CYCLE_FEATURE"] = False + flags["INFO_FEATURE"] = False + flags["TIMER_FEATURE"] = False + + with open(FEATURE_FLAGS_FILE, "w") as f: + f.write("# This file is auto-generated by the configure step. Do not edit manually.\n\n") + for key, value in flags.items(): + f.write(f"{key} = {value}\n") + + print(f"done. {FEATURE_FLAGS_FILE} is ready.") + +if __name__ == "__main__": + generate_flags() diff --git a/features.conf b/features.conf new file mode 100644 index 0000000..125ed4e --- /dev/null +++ b/features.conf @@ -0,0 +1,11 @@ +# Feature Configuration File +# Set features to true or false to enable/disable them at "compile time" +# If MINIMAL_FEATURE is true, it will override and disable all optional features (Recording, Track, Search, Cycle, Info, Timer) + +MINIMAL_FEATURE=false +RECORDING_FEATURE=true +TRACK_FEATURE=true +SEARCH_FEATURE=true +CYCLE_FEATURE=true +INFO_FEATURE=true +TIMER_FEATURE=true diff --git a/radioactive/__main__.py b/radioactive/__main__.py index e0989b4..68bb815 100755 --- a/radioactive/__main__.py +++ b/radioactive/__main__.py @@ -39,7 +39,7 @@ player = None -def final_step(options, last_station, alias, handler): +def final_step(options, last_station, alias, handler, station_list=None): global ffplay # always needed global player @@ -104,6 +104,8 @@ def final_step(options, last_station, alias, handler): record_file=options["record_file"], record_file_format=options["record_file_format"], loglevel=options["loglevel"], + handler=handler, + station_list=station_list, ) @@ -169,7 +171,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -186,7 +188,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -203,7 +205,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -220,7 +222,7 @@ def main(): options["curr_station_name"], options["target_url"], ) = handle_user_choice_from_search_result(handler, response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) @@ -268,7 +270,7 @@ def main(): ) = handle_user_choice_from_search_result(handler, response) # options["codec"] = response["codec"] # print(response) - final_step(options, last_station, alias, handler) + final_step(options, last_station, alias, handler, response) else: sys.exit(0) # ------------------------- direct play ------------------------# @@ -292,6 +294,11 @@ def main(): final_step(options, last_station, alias, handler) # final_step() + # If response is not defined yet, initialize it + if "response" not in locals(): + response = [] + + final_step(options, last_station, alias, handler, response) if os.name == "nt": while True: diff --git a/radioactive/actions.py b/radioactive/actions.py new file mode 100644 index 0000000..71a009a --- /dev/null +++ b/radioactive/actions.py @@ -0,0 +1,342 @@ +""" +Core logical actions for radio-active. +""" + +import datetime +import json +import os +import subprocess +import sys +from random import randint +from typing import Any, Dict, List, Optional, Tuple, Union + +import requests +from zenlog import log + +try: + from radioactive.feature_flags import RECORDING_FEATURE +except ImportError: + # Default to True if file not found (e.g. dev mode without configure) + RECORDING_FEATURE = True + +if RECORDING_FEATURE: + from radioactive.recorder import record_audio_auto_codec, record_audio_from_url +from radioactive.last_station import Last_station + + +def handle_fetch_song_title(url: str) -> None: + """Fetch currently playing track information""" + log.info("Fetching the current track info") + log.debug(f"Attempting to retrieve track info from: {url}") + # Run ffprobe command and capture the metadata + cmd = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_entries", + "format=icy", + url, + ] + track_name = "" + + try: + output = subprocess.check_output(cmd).decode("utf-8") + data = json.loads(output) + log.debug(f"station info: {data}") + + # Extract the station name (icy-name) if available + track_name = data.get("format", {}).get("tags", {}).get("StreamTitle", "") + except Exception: + log.error("Error while fetching the track name") + + if track_name != "": + log.info(f"🎶: {track_name}") + else: + log.error("No track information available") + + +def handle_record( + target_url: str, + curr_station_name: str, + record_file_path: str, + record_file: str, + record_file_format: str, # auto/mp3 + loglevel: str, +) -> None: + """ + Handle audio recording logic. + """ + if not RECORDING_FEATURE: + log.error("Recording feature is not compiled/enabled in this build.") + sys.exit(1) + + log.info("Press 'q' to stop recording") + force_mp3 = False + + if record_file_format != "mp3" and record_file_format != "auto": + record_file_format = "mp3" # default to mp3 + log.debug("Error: wrong codec supplied!. falling back to mp3") + force_mp3 = True + elif record_file_format == "auto": + log.debug("Codec: fetching stream codec") + codec = record_audio_auto_codec(target_url) + if codec is None: + record_file_format = "mp3" # default to mp3 + force_mp3 = True + log.debug("Error: could not detect codec. falling back to mp3") + else: + record_file_format = codec + log.debug(f"Codec: found {codec}") + elif record_file_format == "mp3": + # always save to mp3 to eliminate any runtime issues + # it is better to leave it on libmp3lame + force_mp3 = True + + if record_file_path and not os.path.exists(record_file_path): + log.debug(f"filepath: {record_file_path}") + try: + os.makedirs(record_file_path, exist_ok=True) + except Exception as e: + log.error(f"Could not create recording directory: {e}") + + elif not record_file_path: + from radioactive.paths import get_recordings_path + + log.debug("filepath: fallback to default path") + record_file_path = get_recordings_path() + try: + os.makedirs(record_file_path, exist_ok=True) + except Exception as e: + log.error(f"Could not create recording directory: {e}") + log.warning("Recording might fail if the directory is not writable.") + # We don't exit here, we try to proceed or return? + # If we return, recording stops but app stays alive. + # But earlier code sys.exit(1). + # User wants NO CRASH. + # Let's try to verify if we can write there? + # For now, just catching the exception is enough to stop the crash. + + now = datetime.datetime.now() + month_name = now.strftime("%b").upper() + # Format AM/PM as 'AM' or 'PM' + am_pm = now.strftime("%p") + + # format is : day-monthname-year@hour-minute-second-(AM/PM) + formatted_date_time = now.strftime(f"%d-{month_name}-%Y@%I-%M-%S-{am_pm}") + + if not record_file_format.strip(): + record_file_format = "mp3" + + if not record_file: + record_file = "{}-{}".format( + curr_station_name.strip(), formatted_date_time + ).replace(" ", "-") + + tmp_filename = f"{record_file}.{record_file_format}" + outfile_path = os.path.join(record_file_path, tmp_filename) + + log.info(f"Recording will be saved as: \n{outfile_path}") + + record_audio_from_url(target_url, outfile_path, force_mp3, loglevel) + + +def handle_add_station(alias) -> None: + """Add a new station to favorites via user input.""" + try: + left = input("Enter station name:") + right = input("Enter station stream-url or radio-browser uuid:") + except EOFError: + print() + log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") + sys.exit(0) + + if left.strip() == "" or right.strip() == "": + log.error("Empty inputs not allowed") + sys.exit(1) + alias.add_entry(left, right) + log.info("New entry: {}={} added\n".format(left, right)) + sys.exit(0) + + +def handle_add_to_favorite(alias, station_name: str, station_uuid_url: str) -> None: + """Add the current station to favorites.""" + try: + response = alias.add_entry(station_name, station_uuid_url) + if not response: + try: + user_input = input("Enter a different name: ") + except EOFError: + print() + log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") + sys.exit(0) + + if user_input.strip() != "": + response = alias.add_entry(user_input.strip(), station_uuid_url) + except Exception as e: + log.debug(f"Error: {e}") + log.error("Could not add to favorite. Already in list?") + + +def handle_save_last_station(last_station, station_name: str, station_url: str) -> None: + """Save the last played station.""" + # last_station = Last_station() # Provided as arg now + + last_played_station = {} + last_played_station["name"] = station_name + last_played_station["uuid_or_url"] = station_url + + log.debug(f"Saving the current station: {last_played_station}") + last_station.save_info(last_played_station) + + +def check_sort_by_parameter(sort_by: str) -> str: + """Validate and return the sort parameter.""" + accepted_parameters = [ + "name", + "votes", + "codec", + "bitrate", + "lastcheckok", + "lastchecktime", + "clickcount", + "clicktrend", + "random", + ] + + if sort_by not in accepted_parameters: + log.warning("Sort parameter is unknown. Falling back to 'name'") + + log.warning( + "choose from: name,votes,codec,bitrate,lastcheckok,lastchecktime,clickcount,clicktrend,random" + ) + return "name" + return sort_by + + +def handle_search_stations( + handler, station_name: str, limit: int, sort_by: str, filter_with: str +) -> Any: + """Wrapper to search stations by name.""" + log.debug(f"Searching API for: {station_name}") + return handler.search_by_station_name(station_name, limit, sort_by, filter_with) + + +def handle_station_uuid_play(handler, station_uuid: str) -> Tuple[str, str]: + """Play a station by UUID and register a vote.""" + log.debug(f"Searching API for: {station_uuid}") + + handler.play_by_station_uuid(station_uuid) + + log.debug(f"increased click count for: {station_uuid}") + + handler.vote_for_uuid(station_uuid) + try: + station_name = handler.target_station["name"] + station_url = handler.target_station["url"] + except Exception as e: + log.debug(f"{e}") + log.error("Something went wrong") + sys.exit(1) + + return station_name, station_url + + +def handle_direct_play(alias, station_name_or_url: str = "") -> Tuple[str, str]: + """Play a station directly with UUID or direct stream URL.""" + if "://" in station_name_or_url.strip(): + log.debug("Direct play: URL provided") + # stream URL + # call using URL with no station name N/A + # attempt to get station name from metadata + station_name = handle_get_station_name_from_metadata(station_name_or_url) + return station_name, station_name_or_url + else: + log.debug("Direct play: station name provided") + # station name from fav list + # search for the station in fav list and return name and url + + response = alias.search(station_name_or_url) + if not response: + log.error("No station found on your favorite list with the name") + sys.exit(1) + else: + log.debug(f"Direct play: {response}") + return response["name"], response["uuid_or_url"] + + +def handle_play_last_station(last_station) -> Tuple[str, str]: + """Play the last played station.""" + station_obj = last_station.get_info() + return station_obj["name"], station_obj["uuid_or_url"] + + +def handle_get_station_name_from_metadata(url: str) -> str: + """Get ICY metadata from ffprobe to find station name.""" + log.info("Fetching the station name") + log.debug(f"Attempting to retrieve station name from: {url}") + # Run ffprobe command and capture the metadata + cmd = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_entries", + "format=icy", + url, + ] + station_name = "Unknown Station" + + try: + output = subprocess.check_output(cmd).decode("utf-8") + data = json.loads(output) + log.debug(f"station info: {data}") + + # Extract the station name (icy-name) if available + station_name = ( + data.get("format", {}).get("tags", {}).get("icy-name", "Unknown Station") + ) + except Exception: + log.error("Could not fetch the station name") + + return station_name + + +def handle_station_name_from_headers(url: str) -> str: + """ + Get headers from URL to find station name (deprecated). + """ + log.info("Fetching the station name") + log.debug(f"Attempting to retrieve station name from: {url}") + station_name = "Unknown Station" + try: + # sync call, with timeout + response = requests.get(url, timeout=5) + if response.status_code == requests.codes.ok: + if response.headers.get("Icy-Name"): + station_name = response.headers.get("Icy-Name") + else: + log.error("Station name not found") + else: + log.debug(f"Response code received is: {response.status_code}") + except Exception as e: + log.error("Could not fetch the station name") + log.debug(f"An error occurred: {e}") + return station_name + + +def handle_play_random_station(alias) -> Tuple[str, str]: + """Select a random station from favorite menu.""" + log.debug("playing a random station") + alias_map = alias.alias_map + if not alias_map: + log.error("No favorite stations found") + sys.exit(1) + + index = randint(0, len(alias_map) - 1) + station = alias_map[index] + return station["name"], station["uuid_or_url"] diff --git a/radioactive/alias.py b/radioactive/alias.py index 8dd4b84..e94d042 100644 --- a/radioactive/alias.py +++ b/radioactive/alias.py @@ -6,10 +6,12 @@ class Alias: def __init__(self): + from radioactive.paths import get_alias_path + self.alias_map = [] self.found = False - self.alias_path = os.path.join(os.path.expanduser("~"), ".radio-active-alias") + self.alias_path = get_alias_path() def write_stations(self, station_map): """Write stations file from generated map""" diff --git a/radioactive/app.py b/radioactive/app.py index b4502a1..fed2b95 100644 --- a/radioactive/app.py +++ b/radioactive/app.py @@ -3,14 +3,13 @@ it needs to be updated in every release) and to check if an updated version available for the app or not """ -import json -import requests +import json class App: def __init__(self): - self.__VERSION__ = "2.9.1" # change this on every update # + self.__VERSION__ = "3.0.0" # change this on every update # self.pypi_api = "https://pypi.org/pypi/radio-active/json" self.remote_version = "" @@ -27,6 +26,8 @@ def is_update_available(self): """ try: + import requests + remote_data = requests.get(self.pypi_api) remote_data = remote_data.content.decode("utf8") remote_data = json.loads(remote_data) diff --git a/radioactive/args.py b/radioactive/args.py index 72b1717..f87f561 100644 --- a/radioactive/args.py +++ b/radioactive/args.py @@ -5,6 +5,12 @@ from radioactive.config import Configs +try: + from radioactive.feature_flags import RECORDING_FEATURE, SEARCH_FEATURE +except ImportError: + RECORDING_FEATURE = True + SEARCH_FEATURE = True + # load default configs def load_default_configs(): @@ -15,7 +21,6 @@ def load_default_configs(): class Parser: - """Parse the command-line args and return result to the __main__""" def __init__(self): @@ -203,40 +208,41 @@ def __init__(self): help="kill all the ffplay process initiated by radioactive", ) - self.parser.add_argument( - "--record", - "-R", - action="store_true", - dest="record_stream", - default=False, - help="record a station and save as audio file", - ) - - self.parser.add_argument( - "--filepath", - action="store", - dest="record_file_path", - default=self.defaults["filepath"], - help="specify the audio format for recording", - ) - - self.parser.add_argument( - "--filename", - "-N", - action="store", - dest="record_file", - default="", - help="specify the output filename of the recorded audio", - ) - - self.parser.add_argument( - "--filetype", - "-T", - action="store", - dest="record_file_format", - default=self.defaults["filetype"], - help="specify the audio format for recording. auto/mp3", - ) + if RECORDING_FEATURE: + self.parser.add_argument( + "--record", + "-R", + action="store_true", + dest="record_stream", + default=False, + help="record a station and save as audio file", + ) + + self.parser.add_argument( + "--filepath", + action="store", + dest="record_file_path", + default=self.defaults["filepath"], + help="specify the audio format for recording", + ) + + self.parser.add_argument( + "--filename", + "-N", + action="store", + dest="record_file", + default="", + help="specify the output filename of the recorded audio", + ) + + self.parser.add_argument( + "--filetype", + "-T", + action="store", + dest="record_file_format", + default=self.defaults["filetype"], + help="specify the audio format for recording. auto/mp3", + ) self.parser.add_argument( "--player", diff --git a/radioactive/config.py b/radioactive/config.py index 504b1a4..3d3ac38 100644 --- a/radioactive/config.py +++ b/radioactive/config.py @@ -1,17 +1,26 @@ -# load configs from a file and apply. -# If any options are given on command line it will override the configs +""" +Configuration management for radio-active. +Handles loading, saving, and managing user configurations. +""" + import configparser import getpass -import os import sys +from typing import Dict, Any, Optional from zenlog import log -def write_a_sample_config_file(): +def write_a_sample_config_file() -> None: + """ + Create a sample configuration file with default settings. + Checks for the XDG config path and writes the file there. + """ # Create a ConfigParser object config = configparser.ConfigParser() + from radioactive.paths import get_recordings_path + # Add sections and key-value pairs config["AppConfig"] = { "loglevel": "info", @@ -19,18 +28,17 @@ def write_a_sample_config_file(): "sort": "votes", "filter": "none", "volume": "80", - "filepath": "/home/{user}/recordings/radioactive/", + "filepath": get_recordings_path(), "filetype": "mp3", "player": "ffplay", } - # Get the user's home directory - home_directory = os.path.expanduser("~") + try: + from radioactive.paths import get_config_path - # Specify the file path - file_path = os.path.join(home_directory, ".radio-active-configs.ini") + # Specify the file path + file_path = get_config_path() - try: # Write the configuration to the file with open(file_path, "w") as config_file: config.write(config_file) @@ -38,34 +46,57 @@ def write_a_sample_config_file(): log.info(f"A sample default configuration file added at: {file_path}") except Exception as e: - print(f"Error writing the configuration file: {e}") + log.error(f"Error writing the configuration file: {e}") class Configs: + """ + Class to handle loading and parsing of the configuration file. + """ + def __init__(self): - self.config_path = os.path.join( - os.path.expanduser("~"), ".radio-active-configs.ini" - ) + from radioactive.paths import get_config_path + + self.config_path = get_config_path() + self.config: Optional[configparser.ConfigParser] = None - def load(self): + def load(self) -> Dict[str, str]: + """ + Load the configuration file and return options as a dictionary. + + Returns: + dict: The configuration options. + """ self.config = configparser.ConfigParser() try: self.config.read(self.config_path) - options = {} - options["volume"] = self.config.get("AppConfig", "volume") - options["loglevel"] = self.config.get("AppConfig", "loglevel") - options["sort"] = self.config.get("AppConfig", "sort") - options["filter"] = self.config.get("AppConfig", "filter") - options["limit"] = self.config.get("AppConfig", "limit") - options["filepath"] = self.config.get("AppConfig", "filepath") - # if filepath has any placeholder, replace - # {user} to actual user map - options["filepath"] = options["filepath"].replace( - "{user}", getpass.getuser() - ) - options["filetype"] = self.config.get("AppConfig", "filetype") - options["player"] = self.config.get("AppConfig", "player") + options: Dict[str, str] = {} + + # Helper to safely get config values with defaults if section missing + def get_option(key: str, default: str = "") -> str: + try: + return self.config.get("AppConfig", key) + except (configparser.NoSectionError, configparser.NoOptionError): + return default + + options["volume"] = get_option("volume", "80") + options["loglevel"] = get_option("loglevel", "info") + options["sort"] = get_option("sort", "votes") + options["filter"] = get_option("filter", "none") + options["limit"] = get_option("limit", "100") + from radioactive.paths import get_recordings_path + + options["filepath"] = get_option("filepath", get_recordings_path()) + + # if filepath has any placeholder, replace {user} to actual user map + if "{user}" in options["filepath"]: + options["filepath"] = options["filepath"].replace( + "{user}", getpass.getuser() + ) + + options["filetype"] = get_option("filetype", "mp3") + options["player"] = get_option("player", "ffplay") return options @@ -73,5 +104,5 @@ def load(self): log.error(f"Something went wrong while parsing the config file: {e}") # write the example config file write_a_sample_config_file() - log.info("Re-run radioative") + log.info("Re-run radioactive") sys.exit(1) diff --git a/radioactive/feature_flags.py b/radioactive/feature_flags.py new file mode 100644 index 0000000..08b274b --- /dev/null +++ b/radioactive/feature_flags.py @@ -0,0 +1,9 @@ +# This file is auto-generated by the configure step. Do not edit manually. + +MINIMAL_FEATURE = False +RECORDING_FEATURE = True +TRACK_FEATURE = True +SEARCH_FEATURE = True +CYCLE_FEATURE = True +INFO_FEATURE = True +TIMER_FEATURE = True diff --git a/radioactive/ffplay.py b/radioactive/ffplay.py index aeca804..dc07139 100644 --- a/radioactive/ffplay.py +++ b/radioactive/ffplay.py @@ -1,3 +1,7 @@ +""" +Module for handling FFplay process management to play radio streams. +""" + import os import signal import subprocess @@ -5,12 +9,16 @@ import threading from shutil import which from time import sleep +from typing import Optional, List, Any import psutil from zenlog import log -def kill_background_ffplays(): +def kill_background_ffplays() -> None: + """ + Kill all background 'ffplay' processes started by this user. + """ all_processes = psutil.process_iter(attrs=["pid", "name"]) count = 0 # Iterate through the processes and terminate those named "ffplay" @@ -29,28 +37,43 @@ def kill_background_ffplays(): # Handle exceptions, such as processes that no longer exist or access denied log.debug("Could not terminate a ffplay processes!") if count == 0: - log.info("No background radios are running!") + pass + # log.info("No background radios are running!") class Ffplay: - def __init__(self, URL, volume, loglevel): + """ + Wrapper class to manage the FFplay process for audio playback. + """ + + def __init__(self, URL: str, volume: int, loglevel: str): self.program_name = "ffplay" self.url = URL self.volume = volume self.loglevel = loglevel self.is_playing = False - self.process = None + self.process: Optional[subprocess.Popen] = None + self.exe_path: Optional[str] = None + self.is_running = False self._check_ffplay_installation() self.start_process() - def _check_ffplay_installation(self): + def _check_ffplay_installation(self) -> None: + """Check if ffplay is installed and available in PATH.""" self.exe_path = which(self.program_name) if self.exe_path is None: log.critical("FFplay not found, install it first please") sys.exit(1) - def _construct_ffplay_commands(self): + def _construct_ffplay_commands(self) -> List[str]: + """Construct the command line arguments for ffplay.""" + # Ensure volume is within valid range (0-100) though ffplay accepts 0-100 + # Actually ffplay volume is 0-100 + + if self.exe_path is None: + raise RuntimeError("FFplay executable path is not set") + ffplay_commands = [self.exe_path, "-volume", f"{self.volume}", "-vn", self.url] if self.loglevel == "debug": @@ -60,7 +83,8 @@ def _construct_ffplay_commands(self): return ffplay_commands - def start_process(self): + def start_process(self) -> None: + """Start the ffplay process.""" try: ffplay_commands = self._construct_ffplay_commands() self.process = subprocess.Popen( @@ -76,37 +100,60 @@ def start_process(self): self._start_error_thread() except Exception as e: - log.error("Error while starting radio: {}".format(e)) + log.error(f"Error while starting radio: {e}") + self.is_playing = False - def _start_error_thread(self): + def _start_error_thread(self) -> None: + """Start a thread to monitor stderr for errors.""" error_thread = threading.Thread(target=self._check_error_output) error_thread.daemon = True error_thread.start() - def _check_error_output(self): + def _check_error_output(self) -> None: + """Monitor stderr for errors.""" + if not self.process or not self.process.stderr: + return + while self.is_running: - stderr_result = self.process.stderr.readline() - if stderr_result: - self._handle_error(stderr_result) - self.is_running = False - self.stop() - sleep(2) - - def _handle_error(self, stderr_result): + try: + stderr_result = self.process.stderr.readline() + if stderr_result: + self._handle_error(stderr_result) + self.is_running = False + self.stop() + break + except ValueError: + # ValueError: I/O operation on closed file. + break + except Exception: + break + sleep(0.5) + + def _handle_error(self, stderr_result: str) -> None: + """Log the error message.""" print() - log.error("Could not connect to the station") + log.error("Could not connect to the station/stream") try: log.debug(stderr_result) - log.error(stderr_result.split(": ")[1]) + parts = stderr_result.split(": ") + if len(parts) > 1: + log.error(parts[1].strip()) + else: + log.error(stderr_result.strip()) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error parsing stderr: {e}") pass - def terminate_parent_process(self): + def terminate_parent_process(self) -> None: + """Signal the parent process (main app) to terminate.""" parent_pid = os.getppid() - os.kill(parent_pid, signal.SIGINT) + try: + os.kill(parent_pid, signal.SIGINT) + except Exception as e: + log.debug(f"Could not kill parent process: {e}") - def is_active(self): + def is_active(self) -> bool: + """Check if the ffplay process is currently active/running.""" if not self.process: log.warning("Process is not initialized") return False @@ -124,25 +171,29 @@ def is_active(self): return False except (psutil.NoSuchProcess, Exception) as e: - log.debug("Process not found or error while checking status: {}".format(e)) + log.debug(f"Process not found or error checking status: {e}") return False - def play(self): + def play(self) -> None: + """Resume or start playback.""" if not self.is_playing: self.start_process() - def stop(self): - if self.is_playing: + def stop(self) -> None: + """Stop playback and terminate the process.""" + if self.is_playing and self.process: + self.is_running = False # Stop the error thread loop try: - self.process.kill() - self.process.wait(timeout=5) + self.process.terminate() + try: + self.process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait(timeout=2) + log.debug("Radio playback stopped successfully") - except subprocess.TimeoutExpired: - log.warning("Radio process did not terminate, killing...") - self.process.kill() except Exception as e: - log.error("Error while stopping radio: {}".format(e)) - raise + log.error(f"Error while stopping radio: {e}") finally: self.is_playing = False self.process = None @@ -150,10 +201,10 @@ def stop(self): log.debug("Radio is not currently playing") self.terminate_parent_process() - def toggle(self): + def toggle(self) -> None: + """Toggle playback state.""" if self.is_playing: log.debug("Stopping the ffplay process") - self.is_running = False self.stop() else: log.debug("Starting the ffplay process") diff --git a/radioactive/filter.py b/radioactive/filter.py index 3f5caba..e43bf74 100644 --- a/radioactive/filter.py +++ b/radioactive/filter.py @@ -1,34 +1,52 @@ +""" +Module for filtering radio station results based on various criteria. +""" + import sys +from typing import List, Dict, Any, Union from zenlog import log -# function to filter strings -def _filter_entries_by_key(data, filter_param, key): +def _filter_entries_by_key( + data: List[Dict[str, Any]], filter_param: str, key: str +) -> List[Dict[str, Any]]: + """ + Filter list of dictionaries by a string key using inclusion (=) or exclusion (!=). + """ log.debug(f"filter: {filter_param}") filtered_entries = [] for entry in data: value = entry.get(key) - - if value is not None and value != "": - if "!=" in filter_param: - # Handle exclusion - exclusion_values = filter_param.split("!=")[1].split(",") - + # Ensure value is a string for comparison + if value is None: + continue + + str_value = str(value) + if str_value == "": + continue + + if "!=" in filter_param: + # Handle exclusion + # Splitting safely to avoid index errors + parts = filter_param.split("!=") + if len(parts) > 1: + exclusion_values = parts[1].split(",") if all( - exclusion_value.lower() not in value.lower() + exclusion_value.lower() not in str_value.lower() for exclusion_value in exclusion_values ): filtered_entries.append(entry) - elif "=" in filter_param: - # Handle inclusion - inclusion_values = filter_param.split("=")[1].split(",") - + elif "=" in filter_param: + # Handle inclusion + parts = filter_param.split("=") + if len(parts) > 1: + inclusion_values = parts[1].split(",") if any( - inclusion_value.lower() in value.lower() + inclusion_value.lower() in str_value.lower() for inclusion_value in inclusion_values ): filtered_entries.append(entry) @@ -36,74 +54,103 @@ def _filter_entries_by_key(data, filter_param, key): return filtered_entries -# function to filter numeric values -def _filter_entries_by_numeric_key(data, filter_param, key): +def _filter_entries_by_numeric_key( + data: List[Dict[str, Any]], filter_param: str, key: str +) -> List[Dict[str, Any]]: + """ + Filter list of dictionaries by a numeric key. + Supports <, >, and = operators. + """ filtered_entries = [] - # filter_key = filter_param.split(key)[0] # most left hand of the expression - filter_param = filter_param.split(key)[1] # portion after the operator - filter_operator = filter_param[0] # operator part - filter_value = int(filter_param[1:]) # value part - # log.debug(f"filter: parameter:{filter_param}") + try: + # Split logic needs to be robust. + # Expected format: keyOPvalue e.g. votes>100 + # We know the key, so we can split by key + parts = filter_param.split(key) + if len(parts) < 2: + log.warning(f"Invalid filter format: {filter_param}") + return data + + param_part = parts[1] # portion after the key name e.g. >100, =50 + if not param_part: + return data + + filter_operator = param_part[0] # operator part + filter_value_str = param_part[1:] # value part + + if not filter_value_str: + log.warning(f"No value provided for filter: {filter_param}") + return data + + filter_value = int(filter_value_str) + + for entry in data: + val = entry.get(key) + if val is not None: + try: + # Convert to int, default to 0 if fails + int_val = int(val) + except (ValueError, TypeError): + continue - for entry in data: - value = int(entry.get(key)) - - if value is not None: - try: if filter_operator not in [">", "<", "="]: - log.warning("Unsupported filter operator, not filtering !!") + log.warning(f"Unsupported filter operator: {filter_operator}") return data - if filter_operator == "<" and value < filter_value: + + if filter_operator == "<" and int_val < filter_value: filtered_entries.append(entry) - elif filter_operator == ">" and value > filter_value: + elif filter_operator == ">" and int_val > filter_value: filtered_entries.append(entry) - elif filter_operator == "=" and value == filter_value: + elif filter_operator == "=" and int_val == filter_value: filtered_entries.append(entry) - except ValueError: - log.error(f"Invalid filter value for {key}: {filter_param}") - sys.exit(1) + except ValueError: + log.error(f"Invalid numeric filter value for {key}: {filter_param}") + sys.exit(1) + except Exception as e: + log.error(f"Error filtering by numeric key {key}: {e}") + sys.exit(1) return filtered_entries # allowed string string filters -def _filter_entries_by_name(data, filter_param): +def _filter_entries_by_name(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="name") -def _filter_entries_by_language(data, filter_param): +def _filter_entries_by_language(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="language") -def _filter_entries_by_country(data, filter_param): +def _filter_entries_by_country(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="countrycode") -def _filter_entries_by_tags(data, filter_param): +def _filter_entries_by_tags(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="tags") -def _filter_entries_by_codec(data, filter_param): +def _filter_entries_by_codec(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_key(data, filter_param, key="codec") # allowed numeric filters -def _filter_entries_by_votes(data, filter_param): +def _filter_entries_by_votes(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_numeric_key(data, filter_param, key="votes") -def _filter_entries_by_bitrate(data, filter_param): +def _filter_entries_by_bitrate(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_numeric_key(data, filter_param, key="bitrate") -def _filter_entries_by_clickcount(data, filter_param): +def _filter_entries_by_clickcount(data: List[Dict], filter_param: str) -> List[Dict]: return _filter_entries_by_numeric_key(data, filter_param, key="clickcount") # top level filter function -def _filter_results(data, expression): +def _filter_results(data: List[Dict], expression: str) -> List[Dict]: log.debug(f"Filter exp: {expression}") if not data: log.error("Empty results") @@ -126,15 +173,18 @@ def _filter_results(data, expression): elif "votes" in expression: return _filter_entries_by_votes(data, expression) else: - log.warning("Unknown filter expression, not filtering!") + log.warning(f"Unknown filter expression: {expression}, not filtering!") return data # Top most function for multiple filtering expressions with '&' -# NOTE: it will filter maintaining the order you provided on the CLI - - -def filter_expressions(data, input_expression): +def filter_expressions( + data: List[Dict[str, Any]], input_expression: str +) -> List[Dict[str, Any]]: + """ + Filter the list of stations based on the input expression. + Supports multiple filters separated by '&'. + """ log.info( "Setting a higher value for the --limit parameter is preferable when filtering stations." ) diff --git a/radioactive/handler.py b/radioactive/handler.py index 2a5e905..7798b5a 100644 --- a/radioactive/handler.py +++ b/radioactive/handler.py @@ -5,6 +5,7 @@ import datetime import json import sys +from typing import List, Optional, Union, Dict, Any import requests_cache from pyradios import RadioBrowser @@ -14,91 +15,112 @@ from radioactive.filter import filter_expressions +try: + from radioactive.feature_flags import MINIMAL_FEATURE +except ImportError: + MINIMAL_FEATURE = False + +# constants +DEFAULT_CACHE_RETENTION_DAYS = 3 +BYTES_TO_MB_DIVISOR = 1024 * 1024 + console = Console() -def trim_string(text, max_length=40): +def trim_string(text: str, max_length: int = 40) -> str: """ Trim a string to a maximum length and add ellipsis if needed. Args: - text (str): The input text to be trimmed. - max_length (int, optional): The maximum length of the trimmed string. Defaults to 40. + text (str): The input text to be trimmed. + max_length (int, optional): The maximum length of the trimmed string. Defaults to 40. Returns: - str: The trimmed string, possibly with an ellipsis (...) if it was shortened. + str: The trimmed string, possibly with an ellipsis (...) if it was shortened. """ + if not isinstance(text, str): + return str(text) + if len(text) > max_length: return text[:max_length] + "..." - else: - return text + return text -def print_table(response, columns, sort_by, filter_expression): +def print_table( + response: List[Dict[str, Any]], + columns: List[str], + sort_by: str, + filter_expression: str, +) -> List[Dict[str, Any]]: """ Print the table applying the sort logic. Args: - response (list): A list of data to be displayed in the table. - columns (list): List of column specifications in the format "col_name:response_key@max_str". - sort_by (str): The column by which to sort the table. + response (list): A list of data to be displayed in the table. + columns (list): List of column specifications in the format "col_name:response_key@max_str". + sort_by (str): The column by which to sort the table. + filter_expression (str): Filter expression to apply strings. Returns: - list: The original response data. + list: The original (or filtered) response data. """ + if MINIMAL_FEATURE: + columns = [ + col + for col in columns + if all(x not in col for x in ["Tags", "Country", "Language"]) + ] + if len(response) > 10: + response = response[:10] + if not response: log.error("No stations found") - sys.exit(1) + # sys.exit(1) + return [] - # need to filter? + # Apply filtering if needed if filter_expression.lower() != "none": response = filter_expressions(response, filter_expression) if not response: log.error("No stations found after filtering") - sys.exit(1) + # sys.exit(1) + return [] else: log.debug("Not filtering") - if len(response) >= 1: + if response: table = Table( show_header=True, header_style="magenta", expand=True, min_width=85, safe_box=True, - # show_footer=True, - # show_lines=True, - # padding=0.1, - # collapse_padding=True, ) table.add_column("ID", justify="center") + parsed_columns = [] for col_spec in columns: - col_name, response_key, max_str = ( - col_spec.split(":")[0], - col_spec.split(":")[1].split("@")[0], - int(col_spec.split("@")[1]), - ) + parts = col_spec.split(":") + col_name = parts[0] + rest = parts[1].split("@") + response_key = rest[0] + max_str = int(rest[1]) + + parsed_columns.append((col_name, response_key, max_str)) table.add_column(col_name, justify="left") - # do not need extra columns for these cases + # Add the sort column if it's not already displayed (and not generic 'name' or 'random') if sort_by not in ["name", "random"]: table.add_column(sort_by, justify="left") for i, station in enumerate(response): row_data = [str(i + 1)] # for ID - for col_spec in columns: - col_name, response_key, max_str = ( - col_spec.split(":")[0], - col_spec.split(":")[1].split("@")[0], - int(col_spec.split("@")[1]), - ) - row_data.append( - trim_string(station.get(response_key, ""), max_length=max_str) - ) + for _, response_key, max_str in parsed_columns: + val = station.get(response_key, "") + row_data.append(trim_string(val, max_length=max_str)) if sort_by not in ["name", "random"]: row_data.append(str(station.get(sort_by, ""))) @@ -106,18 +128,16 @@ def print_table(response, columns, sort_by, filter_expression): table.add_row(*row_data) console.print(table) - # log.info( - # "If the table does not fit into your screen, \ntry to maximize the window, decrease the font by a bit, and retry" - # ) return response else: log.info("No stations found") - sys.exit(0) + # Do not exit if no stations found, just return empty + return [] class Handler: """ - radio-browser API handler. This module communicates with the underlying API via PyRadios + radio-browser API handler. This module communicates with the underlying API via PyRadios. """ def __init__(self): @@ -127,45 +147,78 @@ def __init__(self): # When RadioBrowser can not be initiated properly due to no internet (probably) try: - expire_after = datetime.timedelta(days=3) + expire_after = datetime.timedelta(days=DEFAULT_CACHE_RETENTION_DAYS) session = requests_cache.CachedSession( cache_name="cache", backend="sqlite", expire_after=expire_after ) self.API = RadioBrowser(session=session) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error initializing RadioBrowser: {e}") log.critical("Something is wrong with your internet connection") sys.exit(1) - def get_country_code(self, name): + def get_country_code(self, name: str) -> Optional[str]: + """ + Get the ISO 3166-1 alpha-2 country code for a given country name. + + Args: + name (str): The name of the country. + + Returns: + str: The country code if found, None otherwise. + """ self.countries = self.API.countries() for country in self.countries: if country["name"].lower() == name.lower(): return country["iso_3166_1"] return None - def validate_uuid_station(self): - if len(self.response) == 1: + def validate_uuid_station(self) -> List[Dict[str, Any]]: + """ + Validate that a station UUID search returned exactly one result and register a click. + + Returns: + list: The response list containing the station details. + """ + if self.response and len(self.response) >= 1: + # We take the first one if multiple (unlikely for UUID but possible in theory) log.debug(json.dumps(self.response[0], indent=3)) self.target_station = self.response[0] # register a valid click to increase its popularity - self.API.click_counter(self.target_station["stationuuid"]) + self.vote_for_uuid(self.target_station["stationuuid"]) return self.response + log.error("Station found by UUID is invalid or empty") + sys.exit(1) + # ---------------------------- NAME -------------------------------- # - def search_by_station_name(self, _name, limit, sort_by, filter_with): - """search and play a station by its name""" - reversed = sort_by != "name" + def search_by_station_name( + self, name: str, limit: int, sort_by: str, filter_with: str + ) -> List[Dict[str, Any]]: + """ + Search and play a station by its name. + + Args: + name (str): Station name to search for. + limit (int): Max number of results. + sort_by (str): Field to sort by. + filter_with (str): Filter expression. + + Returns: + list: List of found stations. + """ + # Rename 'reversed' to avoid shadowing built-in + is_reverse = sort_by != "name" try: response = self.API.search( - name=_name, + name=name, name_exact=False, limit=limit, order=str(sort_by), - reverse=reversed, + reverse=is_reverse, ) return print_table( response, @@ -174,44 +227,56 @@ def search_by_station_name(self, _name, limit, sort_by, filter_with): filter_expression=filter_with, ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error in search_by_station_name: {e}") log.error("Something went wrong. please try again.") sys.exit(1) # ------------------------- UUID ------------------------ # - def play_by_station_uuid(self, _uuid): - """search and play station by its stationuuid""" + def play_by_station_uuid(self, uuid: str) -> List[Dict[str, Any]]: + """ + Search and play station by its stationuuid. + + Args: + uuid (str): The UUID of the station. + + Returns: + list: Confirmed station details. + """ try: - self.response = self.API.station_by_uuid(_uuid) + self.response = self.API.station_by_uuid(uuid) return self.validate_uuid_station() except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error in play_by_station_uuid: {e}") log.error("Something went wrong. please try again.") sys.exit(1) # -------------------------- COUNTRY ----------------------# - def discover_by_country(self, country_code_or_name, limit, sort_by, filter_with): - # set reverse to false if name is is the parameter for sorting - reversed = sort_by != "name" + def discover_by_country( + self, country_code_or_name: str, limit: int, sort_by: str, filter_with: str + ) -> List[Dict[str, Any]]: + """ + Discover stations by country code or name. + """ + is_reverse = sort_by != "name" # check if it is a code or name if len(country_code_or_name.strip()) == 2: # it's a code - log.debug("Country code '{}' provided".format(country_code_or_name)) + log.debug(f"Country code '{country_code_or_name}' provided") try: response = self.API.search( countrycode=country_code_or_name, limit=limit, order=str(sort_by), - reverse=reversed, + reverse=is_reverse, ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error searching by country code: {e}") log.error("Something went wrong. please try again.") sys.exit(1) else: # it's name - log.debug("Country name '{}' provided".format(country_code_or_name)) + log.debug(f"Country name '{country_code_or_name}' provided") code = self.get_country_code(country_code_or_name) if code: try: @@ -220,10 +285,10 @@ def discover_by_country(self, country_code_or_name, limit, sort_by, filter_with) limit=limit, country_exact=True, order=str(sort_by), - reverse=reversed, + reverse=is_reverse, ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error searching by country name: {e}") log.error("Something went wrong. please try again.") sys.exit(1) else: @@ -246,14 +311,18 @@ def discover_by_country(self, country_code_or_name, limit, sort_by, filter_with) # ------------------- by state --------------------- - def discover_by_state(self, state, limit, sort_by, filter_with): - reversed = sort_by != "name" + def discover_by_state( + self, state: str, limit: int, sort_by: str, filter_with: str + ) -> List[Dict[str, Any]]: + """Discover stations by state.""" + is_reverse = sort_by != "name" try: response = self.API.search( - state=state, limit=limit, order=str(sort_by), reverse=reversed + state=state, limit=limit, order=str(sort_by), reverse=is_reverse ) - except Exception: + except Exception as e: + log.debug(f"Error discover_by_state: {e}") log.error("Something went wrong. please try again.") sys.exit(1) @@ -272,15 +341,18 @@ def discover_by_state(self, state, limit, sort_by, filter_with): # -----------------by language -------------------- - def discover_by_language(self, language, limit, sort_by, filter_with): - reversed = sort_by != "name" + def discover_by_language( + self, language: str, limit: int, sort_by: str, filter_with: str + ) -> List[Dict[str, Any]]: + """Discover stations by language.""" + is_reverse = sort_by != "name" try: response = self.API.search( - language=language, limit=limit, order=str(sort_by), reverse=reversed + language=language, limit=limit, order=str(sort_by), reverse=is_reverse ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error discover_by_language: {e}") log.error("Something went wrong. please try again.") sys.exit(1) @@ -297,15 +369,18 @@ def discover_by_language(self, language, limit, sort_by, filter_with): ) # -------------------- by tag ---------------------- # - def discover_by_tag(self, tag, limit, sort_by, filter_with): - reversed = sort_by != "name" + def discover_by_tag( + self, tag: str, limit: int, sort_by: str, filter_with: str + ) -> List[Dict[str, Any]]: + """Discover stations by tag.""" + is_reverse = sort_by != "name" try: response = self.API.search( - tag=tag, limit=limit, order=str(sort_by), reverse=reversed + tag=tag, limit=limit, order=str(sort_by), reverse=is_reverse ) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error discover_by_tag: {e}") log.error("Something went wrong. please try again.") sys.exit(1) @@ -322,9 +397,11 @@ def discover_by_tag(self, tag, limit, sort_by, filter_with): ) # ---- Increase click count ------------- # - def vote_for_uuid(self, UUID): + def vote_for_uuid(self, uuid: str) -> Optional[Dict]: + """Increase the click count for a station UUID.""" try: - result = self.API.click_counter(UUID) + result = self.API.click_counter(uuid) return result except Exception as e: - log.debug("Something went wrong during increasing click count:{}".format(e)) + log.debug(f"Something went wrong during increasing click count: {e}") + return None diff --git a/radioactive/help.py b/radioactive/help.py index 6a25412..572ce2b 100644 --- a/radioactive/help.py +++ b/radioactive/help.py @@ -124,7 +124,7 @@ def show_help(): table.add_row( "--filepath", "Path to save the recorded audio", - f"{user}/Music/radioactive", + f"{user}/radioactive/recordings", ) table.add_row( diff --git a/radioactive/last_station.py b/radioactive/last_station.py index a24be3a..42eb36a 100644 --- a/radioactive/last_station.py +++ b/radioactive/last_station.py @@ -8,7 +8,6 @@ class Last_station: - """Saves the last played radio station information, when user don't provide any -S or -U it looks for the information. @@ -17,11 +16,9 @@ class Last_station: """ def __init__(self): - self.last_station_path = None + from radioactive.paths import get_last_station_path - self.last_station_path = os.path.join( - os.path.expanduser("~"), ".radio-active-last-station" - ) + self.last_station_path = get_last_station_path() def get_info(self): try: diff --git a/radioactive/mpv.py b/radioactive/mpv.py index 872dbb1..3cebe76 100644 --- a/radioactive/mpv.py +++ b/radioactive/mpv.py @@ -52,3 +52,7 @@ def toggle(self): self.stop() else: self.start(self.url) + + def play(self): + if not self.is_running and self.url: + self.start(self.url) diff --git a/radioactive/parser.py b/radioactive/parser.py index 0eedd35..61c1782 100644 --- a/radioactive/parser.py +++ b/radioactive/parser.py @@ -1,12 +1,21 @@ +from typing import Dict, Any, Optional + from zenlog import log from radioactive.args import Parser -def parse_options(): +def parse_options() -> Dict[str, Any]: + """ + Parse command-line arguments and return a dictionary of options. + + Returns: + dict: A dictionary containing all the parsed options and their values. + """ parser = Parser() args = parser.parse() - options = {} + options: Dict[str, Any] = {} + # ----------------- all the args ------------- # options["version"] = args.version options["show_help_table"] = args.help @@ -17,14 +26,14 @@ def parse_options(): log.level(options["loglevel"]) else: log.level("info") - log.warning("Correct log levels are: error,warning,info(default),debug") + log.warning("Correct log levels are: error, warning, info(default), debug") # check is limit is a valid integer limit = args.limit options["limit"] = int(limit) if limit else 100 log.debug("limit is set to: {}".format(limit)) - options["search_station_name"] = args.search_station_name + options["search_station_name"] = getattr(args, "search_station_name", None) options["search_station_uuid"] = args.search_station_uuid options["play_last_station"] = args.play_last_station @@ -48,10 +57,10 @@ def parse_options(): options["kill_ffplays"] = args.kill_ffplays - options["record_stream"] = args.record_stream - options["record_file"] = args.record_file - options["record_file_format"] = args.record_file_format - options["record_file_path"] = args.record_file_path + options["record_stream"] = getattr(args, "record_stream", False) + options["record_file"] = getattr(args, "record_file", "") + options["record_file_format"] = getattr(args, "record_file_format", "mp3") + options["record_file_path"] = getattr(args, "record_file_path", "") options["target_url"] = "" options["volume"] = args.volume diff --git a/radioactive/paths.py b/radioactive/paths.py new file mode 100644 index 0000000..37133ca --- /dev/null +++ b/radioactive/paths.py @@ -0,0 +1,131 @@ +import os +import shutil +import sys +from zenlog import log + + +def get_user_home(): + """ + Get the user's home directory in a cross-platform way. + """ + return os.path.expanduser("~") + + +def get_base_dir(): + """ + Return the base directory for radioactive files: ~/radioactive + This acts as the central storage for config, data, and recordings + as per user request. + """ + home = get_user_home() + base_dir = os.path.join(home, "radioactive") + + try: + os.makedirs(base_dir, exist_ok=True) + except Exception as e: + # If we can't create the base dir, we are in trouble, + # but we log it and proceed (might crash later if not handled) + log.error(f"Could not create base directory {base_dir}: {e}") + + return base_dir + + +def _migrate_file(legacy_path, new_path, description): + """ + Migrate a file from legacy_path to new_path if it exists. + """ + if os.path.exists(legacy_path) and not os.path.exists(new_path): + log.info(f"Migrating {description} from {legacy_path} to {new_path}") + try: + # Ensure the directory exists + os.makedirs(os.path.dirname(new_path), exist_ok=True) + shutil.move(legacy_path, new_path) + except Exception as e: + log.warning(f"Could not migrate {description}: {e}") + + +def get_config_path(): + """ + Get the path to the configuration file: ~/radioactive/config.ini + """ + base_dir = get_base_dir() + new_path = os.path.join(base_dir, "config.ini") + + home = get_user_home() + + # 1. ~/.radio-active-configs.ini + legacy_dot_path = os.path.join(home, ".radio-active-configs.ini") + _migrate_file(legacy_dot_path, new_path, "config file (dotfile)") + + # 2. XDG locations (from previous attempts) + # ~/.config/radioactive/config.ini + xdg_path_new = os.path.join(home, ".config", "radioactive", "config.ini") + _migrate_file(xdg_path_new, new_path, "config file (xdg-new)") + + # ~/.config/radio-active/config.ini + xdg_path_old = os.path.join(home, ".config", "radio-active", "config.ini") + _migrate_file(xdg_path_old, new_path, "config file (xdg-old)") + + return new_path + + +def get_alias_path(): + """ + Get the path to the alias (favorites) file: ~/radioactive/alias_map + """ + base_dir = get_base_dir() + new_path = os.path.join(base_dir, "alias_map") + + home = get_user_home() + + # 1. ~/.radio-active-alias + legacy_dot_path = os.path.join(home, ".radio-active-alias") + _migrate_file(legacy_dot_path, new_path, "alias file (dotfile)") + + # 2. XDG locations + xdg_path_new = os.path.join(home, ".config", "radioactive", "alias_map") + _migrate_file(xdg_path_new, new_path, "alias file (xdg-new)") + + xdg_path_old = os.path.join(home, ".config", "radio-active", "alias_map") + _migrate_file(xdg_path_old, new_path, "alias file (xdg-old)") + + return new_path + + +def get_last_station_path(): + """ + Get the path to the last played station file: ~/radioactive/last_station + """ + base_dir = get_base_dir() + new_path = os.path.join(base_dir, "last_station") + + home = get_user_home() + + # 1. ~/.radio-active-last_station + legacy_dot_path = os.path.join(home, ".radio-active-last_station") + _migrate_file(legacy_dot_path, new_path, "last station file (dotfile)") + + # 2. XDG locations (usually in local/share, but we check config too just in case) + xdg_data_new = os.path.join(home, ".local", "share", "radioactive", "last_station") + _migrate_file(xdg_data_new, new_path, "last station file (xdg-new)") + + xdg_data_old = os.path.join(home, ".local", "share", "radio-active", "last_station") + _migrate_file(xdg_data_old, new_path, "last station file (xdg-old)") + + return new_path + + +def get_recordings_path(): + """ + Get the path for recordings: ~/radioactive/recordings + """ + base_dir = get_base_dir() + recordings_path = os.path.join(base_dir, "recordings") + + try: + os.makedirs(recordings_path, exist_ok=True) + except Exception as e: + log.error(f"Could not create recordings directory {recordings_path}: {e}") + # Not exiting here, hoping the caller handles it or it works next time + + return recordings_path diff --git a/radioactive/ui.py b/radioactive/ui.py new file mode 100644 index 0000000..86d812a --- /dev/null +++ b/radioactive/ui.py @@ -0,0 +1,128 @@ +""" +UI components for radio-active using Rich. +""" + +from rich import print +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.text import Text +from zenlog import log + +# Global variable to store current station info for display +# This is shared state, ideally should be managed better, but keeping for compatibility +global_current_station_info = {} + + +def handle_welcome_screen() -> None: + """Print the welcome screen panel.""" + welcome = Panel( + """ + :radio: Play any radios around the globe right from this Terminal [yellow]:zap:[/yellow]! + :smile: Author: Dipankar Pal + :question: Type '--help' for more details on available commands + :bug: Visit: https://github.com/deep5050/radio-active to submit issues + :star: Show some love by starring the project on GitHub [red]:heart:[/red] + :dollar: You can donate me at https://deep5050.github.io/payme/ + :x: Press Ctrl+C to quit + """, + title="[b]RADIOACTIVE[/b]", + width=85, + expand=True, + safe_box=True, + ) + print(welcome) + + +def handle_update_screen(app) -> None: + """ + Check for updates and print a message if available. + + Args: + app: The App instance to check for updates. + """ + if app.is_update_available(): + update_msg = ( + "\t[blink]An update available, run [green][italic]pip install radio-active==" + + app.get_remote_version() + + "[/italic][/green][/blink]\nSee the changes: https://github.com/deep5050/radio-active/blob/main/CHANGELOG.md" + ) + update_panel = Panel( + update_msg, + width=85, + ) + print(update_panel) + else: + log.debug("Update not available") + + +def handle_favorite_table(alias) -> None: + """ + Print the user's favorite list in a table. + + Args: + alias: The Alias instance containing the favorite map. + """ + table = Table( + show_header=True, + header_style="bold magenta", + min_width=85, + safe_box=False, + expand=True, + ) + table.add_column("Station", justify="left") + table.add_column("URL / UUID", justify="left") + + if len(alias.alias_map) > 0: + for entry in alias.alias_map: + table.add_row(entry["name"], entry["uuid_or_url"]) + print(table) + log.info(f"Your favorite stations are saved in {alias.alias_path}") + else: + log.info("You have no favorite station list") + + +def handle_show_station_info() -> None: + """Show important information regarding the current station.""" + # pylint: disable=global-statement + global global_current_station_info + custom_info = {} + try: + custom_info["name"] = global_current_station_info.get("name") + custom_info["uuid"] = global_current_station_info.get("stationuuid") + custom_info["url"] = global_current_station_info.get("url") + custom_info["website"] = global_current_station_info.get("homepage") + custom_info["country"] = global_current_station_info.get("country") + custom_info["language"] = global_current_station_info.get("language") + custom_info["tags"] = global_current_station_info.get("tags") + custom_info["codec"] = global_current_station_info.get("codec") + custom_info["bitrate"] = global_current_station_info.get("bitrate") + print(custom_info) + except Exception as e: + log.error(f"No station information available: {e}") + + +def handle_current_play_panel(curr_station_name: str = "") -> None: + """ + Print the currently playing station panel. + + Args: + curr_station_name (str): Name of the station. + """ + panel_station_name = Text(curr_station_name, justify="center") + + station_panel = Panel(panel_station_name, title="[blink]:radio:[/blink]", width=85) + console = Console() + console.print(station_panel) + + +def set_global_station_info(info: dict) -> None: + """Helper to update global station info from other modules.""" + global global_current_station_info + global_current_station_info = info + + +def get_global_station_info() -> dict: + """Helper to get global station info.""" + global global_current_station_info + return global_current_station_info diff --git a/radioactive/utilities.py b/radioactive/utilities.py index 4bbf41f..5b619c7 100644 --- a/radioactive/utilities.py +++ b/radioactive/utilities.py @@ -1,304 +1,83 @@ -"""Handler functions for __main__.py""" +""" +Handler functions for __main__.py. +Acts as a controller/orchestrator, delegating to UI and Actions modules. +""" -import datetime -import json -import os -import subprocess import sys +import threading +import time +import os from random import randint +from typing import Any, Dict, List, Optional, Tuple, Union -import requests from pick import pick -from rich import print -from rich.console import Console -from rich.panel import Panel -from rich.table import Table -from rich.text import Text +from pick import pick from zenlog import log +try: + from radioactive.feature_flags import ( + RECORDING_FEATURE, + TRACK_FEATURE, + SEARCH_FEATURE, + CYCLE_FEATURE, + INFO_FEATURE, + TIMER_FEATURE, + ) +except ImportError: + RECORDING_FEATURE = True + TRACK_FEATURE = True + SEARCH_FEATURE = True + CYCLE_FEATURE = True + INFO_FEATURE = True + TIMER_FEATURE = True + +# Re-export functions for backward compatibility and aggregation +from radioactive.ui import ( + handle_welcome_screen, + handle_update_screen, + handle_favorite_table, + handle_show_station_info, + handle_current_play_panel, + set_global_station_info, + get_global_station_info, +) + +from radioactive.actions import ( + handle_fetch_song_title, + handle_record, + handle_add_station, + handle_add_to_favorite, + handle_save_last_station, + check_sort_by_parameter, + handle_search_stations, + handle_station_uuid_play, + handle_direct_play, + handle_play_last_station, + handle_get_station_name_from_metadata, + handle_station_name_from_headers, + handle_play_random_station, +) from radioactive.ffplay import kill_background_ffplays -from radioactive.last_station import Last_station -from radioactive.recorder import record_audio_auto_codec, record_audio_from_url + RED_COLOR = "\033[91m" END_COLOR = "\033[0m" -global_current_station_info = {} - - -def handle_fetch_song_title(url): - """Fetch currently playing track information""" - log.info("Fetching the current track info") - log.debug("Attempting to retrieve track info from: {}".format(url)) - # Run ffprobe command and capture the metadata - cmd = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_format", - "-show_entries", - "format=icy", - url, - ] - track_name = "" - - try: - output = subprocess.check_output(cmd).decode("utf-8") - data = json.loads(output) - log.debug(f"station info: {data}") - - # Extract the station name (icy-name) if available - track_name = data.get("format", {}).get("tags", {}).get("StreamTitle", "") - except: - log.error("Error while fetching the track name") - - if track_name != "": - log.info(f"🎶: {track_name}") - else: - log.error("No track information available") - - -def handle_record( - target_url, - curr_station_name, - record_file_path, - record_file, - record_file_format, # auto/mp3 - loglevel, -): - log.info("Press 'q' to stop recording") - force_mp3 = False - - if record_file_format != "mp3" and record_file_format != "auto": - record_file_format = "mp3" # default to mp3 - log.debug("Error: wrong codec supplied!. falling back to mp3") - force_mp3 = True - elif record_file_format == "auto": - log.debug("Codec: fetching stream codec") - codec = record_audio_auto_codec(target_url) - if codec is None: - record_file_format = "mp3" # default to mp3 - force_mp3 = True - log.debug("Error: could not detect codec. falling back to mp3") - else: - record_file_format = codec - log.debug("Codec: found {}".format(codec)) - elif record_file_format == "mp3": - # always save to mp3 to eliminate any runtime issues - # it is better to leave it on libmp3lame - force_mp3 = True - - if record_file_path and not os.path.exists(record_file_path): - log.debug("filepath: {}".format(record_file_path)) - os.makedirs(record_file_path, exist_ok=True) - - elif not record_file_path: - log.debug("filepath: fallback to default path") - record_file_path = os.path.join( - os.path.expanduser("~"), "Music/radioactive" - ) # fallback path - try: - os.makedirs(record_file_path, exist_ok=True) - except Exception as e: - log.debug("{}".format(e)) - log.error("Could not make default directory") - sys.exit(1) - - now = datetime.datetime.now() - month_name = now.strftime("%b").upper() - # Format AM/PM as 'AM' or 'PM' - am_pm = now.strftime("%p") - - # format is : day-monthname-year@hour-minute-second-(AM/PM) - formatted_date_time = now.strftime(f"%d-{month_name}-%Y@%I-%M-%S-{am_pm}") - - if not record_file_format.strip(): - record_file_format = "mp3" - - if not record_file: - record_file = "{}-{}".format( - curr_station_name.strip(), formatted_date_time - ).replace(" ", "-") - - tmp_filename = f"{record_file}.{record_file_format}" - outfile_path = os.path.join(record_file_path, tmp_filename) - - log.info(f"Recording will be saved as: \n{outfile_path}") - - record_audio_from_url(target_url, outfile_path, force_mp3, loglevel) - - -def handle_welcome_screen(): - welcome = Panel( - """ - :radio: Play any radios around the globe right from this Terminal [yellow]:zap:[/yellow]! - :smile: Author: Dipankar Pal - :question: Type '--help' for more details on available commands - :bug: Visit: https://github.com/deep5050/radio-active to submit issues - :star: Show some love by starring the project on GitHub [red]:heart:[/red] - :dollar: You can donate me at https://deep5050.github.io/payme/ - :x: Press Ctrl+C to quit - """, - title="[b]RADIOACTIVE[/b]", - width=85, - expand=True, - safe_box=True, - ) - print(welcome) - - -def handle_update_screen(app): - if app.is_update_available(): - update_msg = ( - "\t[blink]An update available, run [green][italic]pip install radio-active==" - + app.get_remote_version() - + "[/italic][/green][/blink]\nSee the changes: https://github.com/deep5050/radio-active/blob/main/CHANGELOG.md" - ) - update_panel = Panel( - update_msg, - width=85, - ) - print(update_panel) - else: - log.debug("Update not available") - - -def handle_favorite_table(alias): - # log.info("Your favorite station list is below") - table = Table( - show_header=True, - header_style="bold magenta", - min_width=85, - safe_box=False, - expand=True, - ) - table.add_column("Station", justify="left") - table.add_column("URL / UUID", justify="left") - if len(alias.alias_map) > 0: - for entry in alias.alias_map: - table.add_row(entry["name"], entry["uuid_or_url"]) - print(table) - log.info(f"Your favorite stations are saved in {alias.alias_path}") - else: - log.info("You have no favorite station list") - - -def handle_show_station_info(): - """Show important information regarding the current station""" - global global_current_station_info - custom_info = {} - try: - custom_info["name"] = global_current_station_info["name"] - custom_info["uuid"] = global_current_station_info["stationuuid"] - custom_info["url"] = global_current_station_info["url"] - custom_info["website"] = global_current_station_info["homepage"] - custom_info["country"] = global_current_station_info["country"] - custom_info["language"] = global_current_station_info["language"] - custom_info["tags"] = global_current_station_info["tags"] - custom_info["codec"] = global_current_station_info["codec"] - custom_info["bitrate"] = global_current_station_info["bitrate"] - print(custom_info) - except: - log.error("No station information available") - - -def handle_add_station(alias): - try: - left = input("Enter station name:") - right = input("Enter station stream-url or radio-browser uuid:") - except EOFError: - print() - log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") - sys.exit(0) - - if left.strip() == "" or right.strip() == "": - log.error("Empty inputs not allowed") - sys.exit(1) - alias.add_entry(left, right) - log.info("New entry: {}={} added\n".format(left, right)) - sys.exit(0) - - -def handle_add_to_favorite(alias, station_name, station_uuid_url): - try: - response = alias.add_entry(station_name, station_uuid_url) - if not response: - try: - user_input = input("Enter a different name: ") - except EOFError: - print() - log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") - sys.exit(0) - - if user_input.strip() != "": - response = alias.add_entry(user_input.strip(), station_uuid_url) - except Exception as e: - log.debug("Error: {}".format(e)) - log.error("Could not add to favorite. Already in list?") - - -def handle_station_uuid_play(handler, station_uuid): - log.debug("Searching API for: {}".format(station_uuid)) - - handler.play_by_station_uuid(station_uuid) - log.debug("increased click count for: {}".format(station_uuid)) - - handler.vote_for_uuid(station_uuid) - try: - station_name = handler.target_station["name"] - station_url = handler.target_station["url"] - except Exception as e: - log.debug("{}".format(e)) - log.error("Something went wrong") - sys.exit(1) - - return station_name, station_url - - -def check_sort_by_parameter(sort_by): - accepted_parameters = [ - "name", - "votes", - "codec", - "bitrate", - "lastcheckok", - "lastchecktime", - "clickcount", - "clicktrend", - "random", - ] - - if sort_by not in accepted_parameters: - log.warning("Sort parameter is unknown. Falling back to 'name'") - - log.warning( - "choose from: name,votes,codec,bitrate,lastcheckok,lastchecktime,clickcount,clicktrend,random" - ) - return "name" - return sort_by - - -def handle_search_stations(handler, station_name, limit, sort_by, filter_with): - log.debug("Searching API for: {}".format(station_name)) - - return handler.search_by_station_name(station_name, limit, sort_by, filter_with) - - -def handle_station_selection_menu(handler, last_station, alias): +def handle_station_selection_menu(handler, last_station, alias) -> Tuple[str, str]: + """ + Show a selection menu for favorite stations. + """ # Add a selection list here. first entry must be the last played station # try to fetch the last played station's information last_station_info = {} try: last_station_info = last_station.get_info() except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error: {e}") # no last station?? pass - # log.info("You can search for a station on internet using the --search option") title = "Please select a station from your favorite list:" station_selection_names = [] station_selection_urls = [] @@ -311,7 +90,7 @@ def handle_station_selection_menu(handler, last_station, alias): try: station_selection_urls.append(last_station_info["stationuuid"]) except Exception as e: - log.debug("Error: {}".format(e)) + log.debug(f"Error: {e}") station_selection_urls.append(last_station_info["uuid_or_url"]) fav_stations = alias.alias_map @@ -343,127 +122,14 @@ def handle_station_selection_menu(handler, last_station, alias): return handle_station_uuid_play(handler, station_uuid) -def handle_save_last_station(last_station, station_name, station_url): - last_station = Last_station() - - last_played_station = {} - last_played_station["name"] = station_name - last_played_station["uuid_or_url"] = station_url - - log.debug(f"Saving the current station: {last_played_station}") - last_station.save_info(last_played_station) - - -def handle_listen_keypress( - alias, - player, - target_url, - station_name, - station_url, - record_file_path, - record_file, - record_file_format, - loglevel, -): - log.info("Press '?' to see available commands\n") - while True: - try: - user_input = input("Enter a command to perform an action: ") - except EOFError: - print() - log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") - kill_background_ffplays() - sys.exit(0) - - if user_input in ["r", "R", "record"]: - handle_record( - target_url, - station_name, - record_file_path, - record_file, - record_file_format, - loglevel, - ) - elif user_input in ["rf", "RF", "recordfile"]: - # if no filename is provided try to auto detect - # else if ".mp3" is provided, use libmp3lame to force write to mp3 - try: - user_input = input("Enter output filename: ") - except EOFError: - print() - log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") - kill_background_ffplays() - sys.exit(0) - - # try to get extension from filename - try: - file_name, file_ext = user_input.split(".") - if file_ext == "mp3": - log.debug("codec: force mp3") - # overwrite original codec with "mp3" - record_file_format = "mp3" - else: - log.warning("You can only specify mp3 as file extension.\n") - log.warning( - "Do not provide any extension to autodetect the codec.\n" - ) - except: - file_name = user_input - - if user_input.strip() != "": - handle_record( - target_url, - station_name, - record_file_path, - file_name, - record_file_format, - loglevel, - ) - elif user_input in ["i", "I", "info"]: - handle_show_station_info() - - elif user_input in ["f", "F", "fav"]: - handle_add_to_favorite(alias, station_name, station_url) - - elif user_input in ["q", "Q", "quit"]: - # kill_background_ffplays() - player.stop() - sys.exit(0) - elif user_input in ["w", "W", "list"]: - alias.generate_map() - handle_favorite_table(alias) - elif user_input in ["t", "T", "track"]: - handle_fetch_song_title(target_url) - elif user_input in ["p", "P"]: - # toggle the player (start/stop) - player.toggle() - # TODO: toggle the player - - elif user_input in ["h", "H", "?", "help"]: - log.info("p: Play/Pause current station") - log.info("t/track: Current track info") - log.info("i/info: Station information") - log.info("r/record: Record a station") - log.info("rf/recordfile: Specify a filename for the recording") - log.info("f/fav: Add station to favorite list") - log.info("h/help/?: Show this help message") - log.info("q/quit: Quit radioactive") - - -def handle_current_play_panel(curr_station_name=""): - panel_station_name = Text(curr_station_name, justify="center") - - station_panel = Panel(panel_station_name, title="[blink]:radio:[/blink]", width=85) - console = Console() - console.print(station_panel) - - -def handle_user_choice_from_search_result(handler, response): - global global_current_station_info - +def handle_user_choice_from_search_result(handler, response) -> Tuple[str, str]: + """ + Handle user selection from search results. + """ if not response: log.debug("No result found!") sys.exit(0) + if len(response) == 1: # single station found log.debug("Exactly one result found") @@ -476,7 +142,8 @@ def handle_user_choice_from_search_result(handler, response): if user_input in ["y", "Y"]: log.debug("Playing UUID from single response") - global_current_station_info = response[0] + # Update global info - handled via helper to ensure UI sees it + set_global_station_info(response[0]) return handle_station_uuid_play(handler, response[0]["stationuuid"]) else: @@ -507,116 +174,314 @@ def handle_user_choice_from_search_result(handler, response): user_input = int(user_input) - 1 # because ID starts from 1 if user_input in range(0, len(response)): target_response = response[user_input] - log.debug("Selected: {}".format(target_response)) - # log.info("UUID: {}".format(target_response["stationuuid"])) + log.debug(f"Selected: {target_response}") # saving global info - global_current_station_info = target_response + set_global_station_info(target_response) return handle_station_uuid_play(handler, target_response["stationuuid"]) else: log.error("Please enter an ID within the range") sys.exit(1) - except: - log.err("Please enter an valid ID number") + except ValueError: + log.error("Please enter an valid ID number") + sys.exit(1) + except Exception as e: + log.error(f"Error: {e}") sys.exit(1) -def handle_direct_play(alias, station_name_or_url=""): - """Play a station directly with UUID or direct stream URL""" - if "://" in station_name_or_url.strip(): - log.debug("Direct play: URL provided") - # stream URL - # call using URL with no station name N/A - # let's attempt to get station name from url headers - # station_name = handle_station_name_from_headers(station_name_or_url) - station_name = handle_get_station_name_from_metadata(station_name_or_url) - return station_name, station_name_or_url - else: - log.debug("Direct play: station name provided") - # station name from fav list - # search for the station in fav list and return name and url +def handle_listen_keypress( + alias, + player, + target_url, + station_name, + station_url, + record_file_path, + record_file, + record_file_format, + loglevel, + handler=None, + station_list=None, +) -> None: + """ + Listen for user input during playback to perform actions. + Now with handler and station_list for runtime commands. + """ + log.info("Press '?' to see available commands\n") + while True: + try: + user_input = input("Enter a command to perform an action: ") + except EOFError: + print() + log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") + kill_background_ffplays() + sys.exit(0) - response = alias.search(station_name_or_url) - if not response: - log.error("No station found on your favorite list with the name") - sys.exit(1) - else: - log.debug("Direct play: {}".format(response)) - return response["name"], response["uuid_or_url"] - - -def handle_play_last_station(last_station): - station_obj = last_station.get_info() - return station_obj["name"], station_obj["uuid_or_url"] - - -# uses ffprobe to fetch station name -def handle_get_station_name_from_metadata(url): - """Get ICY metadata from ffprobe""" - log.info("Fetching the station name") - log.debug("Attempting to retrieve station name from: {}".format(url)) - # Run ffprobe command and capture the metadata - cmd = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_format", - "-show_entries", - "format=icy", - url, - ] - station_name = "Unknown Station" + if RECORDING_FEATURE: + if user_input in ["r", "R", "record"]: + handle_record( + target_url, + station_name, + record_file_path, + record_file, + record_file_format, + loglevel, + ) + elif user_input in ["rf", "RF", "recordfile"]: + try: + user_input = input("Enter output filename: ") + except EOFError: + print() + log.debug("Ctrl+D (EOF) detected. Exiting gracefully.") + kill_background_ffplays() + sys.exit(0) + + # try to get extension from filename + try: + file_name_parts = user_input.split(".") + if len(file_name_parts) > 1 and file_name_parts[-1] == "mp3": + log.debug("codec: force mp3") + # overwrite original codec with "mp3" + record_file_format = "mp3" + file_name = user_input.rsplit(".", 1)[ + 0 + ] # Handle filename with dots + else: + if len(file_name_parts) > 1 and file_name_parts[-1] != "mp3": + log.warning("You can only specify mp3 as file extension.\n") + log.warning( + "Do not provide any extension to autodetect the codec.\n" + ) + file_name = user_input + except Exception: + file_name = user_input + + if user_input.strip() != "": + handle_record( + target_url, + station_name, + record_file_path, + file_name, + record_file_format, + loglevel, + ) - try: - output = subprocess.check_output(cmd).decode("utf-8") - data = json.loads(output) - log.debug(f"station info: {data}") + if INFO_FEATURE and user_input in ["i", "I", "info"]: + handle_show_station_info() - # Extract the station name (icy-name) if available - station_name = ( - data.get("format", {}).get("tags", {}).get("icy-name", "Unknown Station") - ) - except: - log.error("Could not fetch the station name") + elif TIMER_FEATURE and user_input in ["timer", "sleep"]: + try: + duration_str = input("Enter sleep timer duration in minutes: ") + duration = float(duration_str) + if duration <= 0: + log.error("Duration must be positive") + continue + + log.info(f"Sleep timer set for {duration} minutes") + + def stop_playback(): + log.info("\nSleep timer finished. Stopping playback...") + # We need to stop the player and exit. + # Since we are in a thread, we can't easily exit the main input loop cleanly + # without some signal, but sys.exit() or os._exit() should work strong enough. + if player: + player.stop() + kill_background_ffplays() + log.info("Exiting...") + os._exit(0) # Force exit from thread + + t = threading.Timer(duration * 60, stop_playback) + t.daemon = True # Ensure it doesn't block exit if we quit manually + t.start() + + except ValueError: + log.error("Invalid number") + except Exception as e: + log.error(f"Error setting timer: {e}") - return station_name + elif user_input in ["f", "F", "fav"]: + handle_add_to_favorite(alias, station_name, station_url) + elif user_input in ["q", "Q", "quit"]: + player.stop() + sys.exit(0) -# uses requests module to fetch station name [deprecated] -def handle_station_name_from_headers(url): - # Get headers from URL so that we can get radio station - log.info("Fetching the station name") - log.debug("Attempting to retrieve station name from: {}".format(url)) - station_name = "Unknown Station" - try: - # sync call, with timeout - response = requests.get(url, timeout=5) - if response.status_code == requests.codes.ok: - if response.headers.get("Icy-Name"): - station_name = response.headers.get("Icy-Name") + elif user_input in ["w", "W", "list"]: + alias.generate_map() + handle_favorite_table(alias) + + elif TRACK_FEATURE and user_input in ["t", "T", "track"]: + handle_fetch_song_title(target_url) + + elif user_input in ["p", "P"]: + player.toggle() + + elif SEARCH_FEATURE and user_input in ["s", "S", "search"]: + if handler: + try: + query = input("Enter station name to search: ") + except EOFError: + continue + + if query.strip(): + temp_station_list = handle_search_stations( + handler, query, limit=100, sort_by="votes", filter_with="none" + ) + if temp_station_list: + station_list = temp_station_list + # Find valid station choice + try: + station_name, target_url = ( + handle_user_choice_from_search_result( + handler, station_list + ) + ) + # Stop current, switch + player.stop() + player.url = target_url + player.play() + handle_current_play_panel(station_name) + # Update loop variables + station_url = target_url + except SystemExit: + # handle_user_choice might try to exit on cancel + pass else: - log.error("Station name not found") - else: - log.debug("Response code received is: {}".format(response.status_code())) - except Exception as e: - # except requests.HTTPError and requests.exceptions.ReadTimeout as e: - log.error("Could not fetch the station name") - log.debug( - """An error occurred: {} - The response code was {}""".format( - e, e.errno - ) - ) - return station_name + log.warning("Search unavailable (handler not initialized)") + + elif CYCLE_FEATURE and user_input in ["n", "N", "next"]: + target_list = [] + source_type = "" + + # Prioritize search results if available + if station_list and len(station_list) > 0: + target_list = station_list + source_type = "search" + elif alias and alias.alias_map: + target_list = alias.alias_map + source_type = "favorite" + + if target_list: + # Find current index + current_info = get_global_station_info() + current_uuid = current_info.get("stationuuid") + current_url = current_info.get("url") # for direct URLs + + current_index = -1 + + # Try to find current station in the target list + for idx, st in enumerate(target_list): + if source_type == "search": + if st.get("stationuuid") == current_uuid: + current_index = idx + break + elif source_type == "favorite": + # Favorites use uuid_or_url + val = st.get("uuid_or_url") + # Check against both uuid and url to be safe + if val == current_uuid or val == current_url: + current_index = idx + break + # Also check name as fallback + if st.get("name") == current_info.get("name"): + current_index = idx + break + + # Next index + next_index = (current_index + 1) % len(target_list) + + # Try to play next valid station + attempts = 0 + max_attempts = len(target_list) + + while attempts < max_attempts: + target_station = target_list[next_index] + log.info(f"Switching to: {target_station.get('name')}") + + # Determine how to play based on available info + # We need to simulate the "Selection" logic + + try: + new_station_name = "" + new_target_url = "" + + if source_type == "search": + # It's a full station object + set_global_station_info(target_station) + new_station_name, new_target_url = handle_station_uuid_play( + handler, target_station["stationuuid"] + ) + else: + # Favorite entry: {'name':..., 'uuid_or_url':...} + # Construct a temporary info object for global state + uuid_or_url = target_station["uuid_or_url"] + + temp_info = { + "name": target_station["name"], + "uuid_or_url": uuid_or_url, + # We might not know if it is a UUID or URL yet for sure without helper, + # but let's try to populate what we can + } + + if "://" in uuid_or_url: + # Direct URL + temp_info["url"] = uuid_or_url + set_global_station_info(temp_info) + new_station_name = target_station["name"] + new_target_url = uuid_or_url + # Allow direct play without UUID handler + else: + # UUID + temp_info["stationuuid"] = uuid_or_url + set_global_station_info(temp_info) + new_station_name, new_target_url = ( + handle_station_uuid_play(handler, uuid_or_url) + ) + + # Check if we have a URL to play + if new_target_url: + player.stop() + player.url = new_target_url + player.play() + handle_current_play_panel(new_station_name) + station_url = new_target_url + station_name = new_station_name + target_url = new_target_url + break + else: + raise Exception("Could not resolve station URL") + + except Exception as e: + log.error(f"Failed to play {target_station.get('name')}: {e}") + next_index = (next_index + 1) % len(target_list) + attempts += 1 + + if attempts >= max_attempts: + log.error("Could not play any station from the list") + else: + log.warning( + "Cycle/Next unavailable (no search results or favorites to cycle through)" + ) -def handle_play_random_station(alias): - """Select a random station from favorite menu""" - log.debug("playing a random station") - alias_map = alias.alias_map - index = randint(0, len(alias_map) - 1) - station = alias_map[index] - return station["name"], station["uuid_or_url"] + elif user_input in ["h", "H", "?", "help"]: + log.info("p: Play/Pause current station") + if TRACK_FEATURE: + log.info("t/track: Current track info") + if INFO_FEATURE: + log.info("i/info: Station information") + if RECORDING_FEATURE: + log.info("r/record: Record a station") + log.info("rf/recordfile: Specify a filename for the recording") + log.info("f/fav: Add station to favorite list") + if SEARCH_FEATURE: + log.info("s/search: Search for a new station") + if CYCLE_FEATURE: + log.info( + "n/next: Play result from next station searching or favorite list" + ) + if TIMER_FEATURE: + log.info("timer/sleep: Set a sleep timer") + log.info("h/help/?: Show this help message") + log.info("q/quit: Quit radioactive") diff --git a/radioactive/vlc.py b/radioactive/vlc.py index 872ee29..a564d44 100644 --- a/radioactive/vlc.py +++ b/radioactive/vlc.py @@ -9,6 +9,30 @@ class VLC: def __init__(self): self.program_name = "vlc" self.exe_path = which(self.program_name) + + # Check common locations on Windows + if self.exe_path is None and sys.platform.startswith("win"): + import os + + common_paths = [ + os.path.join( + os.environ.get("ProgramFiles", "C:\\Program Files"), + "VideoLAN", + "VLC", + "vlc.exe", + ), + os.path.join( + os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)"), + "VideoLAN", + "VLC", + "vlc.exe", + ), + ] + for path in common_paths: + if os.path.exists(path): + self.exe_path = path + break + log.debug(f"{self.program_name}: {self.exe_path}") if self.exe_path is None: @@ -52,3 +76,7 @@ def toggle(self): self.stop() else: self.start(self.url) + + def play(self): + if not self.is_running and self.url: + self.start(self.url) diff --git a/tmp_config/radio-active/config.ini b/tmp_config/radio-active/config.ini new file mode 100644 index 0000000..9fd498c --- /dev/null +++ b/tmp_config/radio-active/config.ini @@ -0,0 +1,10 @@ +[AppConfig] +loglevel = info +limit = 100 +sort = votes +filter = none +volume = 80 +filepath = /home/{user}/recordings/radioactive/ +filetype = mp3 +player = ffplay +