diff --git a/.gitignore b/.gitignore index 1b3f391..f26b576 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ dist *.txt *.srt *.zip -venv \ No newline at end of file +venv +debug diff --git a/Dockerfile b/Dockerfile index 01ec2bd..93b5b62 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,10 +1,15 @@ -FROM python:3 +FROM python:3.9-slim WORKDIR /workspace/llvd + +RUN pip install --upgrade pip setuptools + COPY ./ ./ + RUN python setup.py install WORKDIR /courses -ENTRYPOINT [ "llvd" ] -CMD [ "--help" ] +ENTRYPOINT ["llvd"] + +CMD ["--help"] \ No newline at end of file diff --git a/llvd/__init__.py b/llvd/__init__.py index ddd3e86..dd77ffc 100755 --- a/llvd/__init__.py +++ b/llvd/__init__.py @@ -1,3 +1,3 @@ # -*- encoding: utf-8 -*- """Linkedin Learning Video Downloader.""" -__version__ = "3.0.8" +__version__ = "3.0.9" diff --git a/llvd/app.py b/llvd/app.py index 6d3f302..06183b3 100644 --- a/llvd/app.py +++ b/llvd/app.py @@ -1,6 +1,6 @@ +import sys import os import re -import sys import requests import click import json @@ -11,11 +11,13 @@ from llvd.downloader import download_subtitles, download_video, download_exercises from click_spinner import spinner import re -from llvd.utils import clean_name +from llvd.utils import clean_name, cleanup_empty_directories import click import sys from llvd import config import subprocess +import datetime +import time class App: @@ -25,6 +27,9 @@ def __init__( self.email = email self.password = password self.course_slug = course_slug[0] + self.course_download_dir = f"./{self.course_slug}" + self.downloaded_videos = set() + self._load_downloaded_videos() self.course_type = course_slug[1] self.link = "" self.video_format = resolution @@ -37,6 +42,50 @@ def __init__( self.current_chapter_index = None self.current_video_name = "" self.throttle = throttle + self.debug_mode = True + + # Initialize summary tracking + self.summary = { + 'courses_processed': 0, + 'videos': { + 'total': 0, + 'downloaded': 0, + 'skipped': 0, + 'failed': 0, + 'already_exist': 0 + }, + 'chapters': { + 'total': 0, + 'empty': 0, + 'deleted': 0 + }, + 'errors': [] + } + + def _load_downloaded_videos(self): + """Load the set of already downloaded videos""" + self.downloaded_videos = set() + if not os.path.exists(self.course_download_dir): + return + + for root, _, files in os.walk(self.course_download_dir): + for file in files: + if file.endswith(".mp4"): + # Extract the video name without extension and any numbering + video_name = os.path.splitext(file)[0] + # Remove any numbering at the start (e.g., '01. ') + video_name = re.sub(r"^\d+\.\s*", "", video_name) + self.downloaded_videos.add(video_name) + + def _is_video_downloaded(self, video_name): + """Check if a video has already been downloaded""" + clean_video_name = re.sub(r'[\\/*?:"<>|]', "", video_name) + return clean_video_name in self.downloaded_videos + + def _mark_video_downloaded(self, video_name): + """Mark a video as downloaded""" + clean_video_name = re.sub(r'[\\/*?:"<>|]', "", video_name) + self.downloaded_videos.add(clean_video_name) def login(self, session, login_data): @@ -152,28 +201,16 @@ def download(self): except TypeError as e: print("retrying...") self.download_entire_course() - # click.echo( - # click.style( - # f"TypeError: {e}\n", - # fg="red", - # ) - # ) except ConnectionResetError: - click.echo( - click.style( - f"ConnectionResetError: There is a connection error. Please check your connectivity.\n", - fg="red", - ) - ) + self._start_modified_spinner("...") + if self.debug_mode: + self._save_debug_info(e, self.course_slug, "download") except requests.exceptions.ConnectionError: - click.echo( - click.style( - f"ConnectionError: There is a connection error. Please check your connectivity.\n", - fg="red", - ) - ) + self._start_modified_spinner("...") + if self.debug_mode: + self._save_debug_info(e, self.course_slug, "download") def download_courses_from_path(self): @@ -205,49 +242,250 @@ def download_courses_from_path(self): self.download_entire_course(skip_done_alert=suppress) except EmptyCourseList as e: - click.echo( - click.style( - f"EmptyCourseList: Error parsing learning path.\n{e}", fg="red" - ) - ) + self._start_modified_spinner("...") + if self.debug_mode: + self._save_debug_info(e, self.course_slug, "download_courses_from_path") except Exception as e: - click.echo( - click.style( - f"Error fetching courses from learning path!\n{e}", fg="red" - ) - ) + self._start_modified_spinner("...") + if self.debug_mode: + self._save_debug_info(e, self.course_slug, "download_courses_from_path") def fetch_video(self, video): + """ + Fetch video data in the highest available resolution (1080p with fallback to 720p) + and save response for debugging if in debug mode + """ video_name = re.sub(r'[\\/*?:"<>|]', "", video["title"]) self.current_video_name = video_name video_slug = video["slug"] - video_url = config.video_url.format( - self.course_slug, self.video_format, video_slug - ) - page_data = requests.get( - video_url, - cookies=self.cookies, - headers=self.headers, - allow_redirects=False, + + resolutions_to_try = [self.video_format] + if self.video_format == "1080": + resolutions_to_try = ["1080", "720"] + + last_exception = None + + for resolution in resolutions_to_try: + self.video_format = resolution + video_url = config.video_url.format( + self.course_slug, resolution, video_slug + ) + + try: + page_data = requests.get( + video_url, + cookies=self.cookies, + headers=self.headers, + allow_redirects=False, + timeout=30 + ) + try: + page_json = page_data.json() + + # Check for locked/premium content + if "elements" in page_json and page_json["elements"] and \ + isinstance(page_json["elements"][0], dict): + + element = page_json["elements"][0] + is_locked = element.get("isLocked", False) or element.get("lockedState") == "LOCKED" + requires_subscription = element.get("requiresSubscription", False) + + if is_locked or requires_subscription: + error_info = { + "status": "locked_content", + "url": video_url, + "is_locked": is_locked, + "requires_subscription": requires_subscription, + "video_slug": video_slug, + "video_name": video_name, + "response_metadata": { + "status_code": page_data.status_code, + "content_type": page_data.headers.get("content-type") + } + } + if self.debug_mode: + self._save_debug_info( + {**error_info, "full_response": page_json}, + video_slug, + "locked_content" + ) + raise ValueError("This video is locked or requires a premium subscription") + + # Save successful response for debugging + if self.debug_mode and page_data.status_code == 200: + self._save_debug_info( + { + "status": "success", + "url": video_url, + "resolution": resolution, + "response": page_json, + "video_slug": video_slug, + "video_name": video_name, + "headers": dict(page_data.headers), + "status_code": page_data.status_code + }, + video_slug, + f"success_{resolution}p" + ) + + download_url = self._extract_video_url(page_json, video_slug, video_name) + if not download_url: + if resolution == "1080" and "720" in resolutions_to_try: + continue + raise ValueError("No video URL found") + + break + + except ValueError as e: + last_exception = e + if resolution == "1080" and "720" in resolutions_to_try: + continue + raise + + except requests.exceptions.RequestException as e: + last_exception = e + if resolution == "1080" and "720" in resolutions_to_try: + continue + raise + else: + if last_exception: + raise last_exception + raise ValueError("Failed to fetch video data") + + # Get subtitles if available (from the last successful response) + subtitles = page_json["elements"][0].get("selectedVideo", {}).get("transcript") + duration_in_ms = int(page_json["elements"][0].get("selectedVideo", {}).get("durationInSeconds", 0)) * 1000 + + click.echo( + click.style( + f"\nCurrent: {self.current_chapter_index:02d}. {self.chapter_path.split('/')[-1]}/" + f"{self.current_video_index:02d}. {video_name}.mp4 @{resolution}p" + ) ) - page_json = page_data.json() - return page_json, video_name + try: + download_video( + download_url, + self.current_video_index, + video_name, + self.chapter_path, + self.throttle, + ) + + # Only try to download subtitles if video download was successful + if subtitles and self.caption: + try: + download_subtitles( + self.current_video_index, + subtitles.get("lines", []), + video_name, + self.chapter_path, + duration_in_ms, + ) + except Exception as e: + click.echo(click.style( + f"[WARNING] Failed to download subtitles: {str(e)}", + fg="yellow" + )) + + except Exception as e: + self._start_modified_spinner("...") + if self.debug_mode: + self._save_debug_info( + { + "status": "download_failed", + "url": download_url, + "resolution": resolution, + "error": str(e), + "video_slug": video_slug, + "video_name": video_name + }, + video_slug, + "download_failed" + ) + raise + finally: + # Restore the original video format + self.video_format = original_format + + def _extract_video_url(self, page_json, video_slug, video_name): + """Extract video URL from the JSON response with multiple fallback methods""" + if not page_json or "elements" not in page_json or not page_json["elements"]: + return None + + element = page_json["elements"][0] + selected_video = element.get("selectedVideo", {}) + + # Method 1: Get from selectedVideo.url.progressiveUrl + if selected_video.get("url") and isinstance(selected_video["url"], dict): + download_url = selected_video["url"].get("progressiveUrl") + if download_url: + return download_url + + # Method 2: Check video formats array + if selected_video.get("formats"): + for fmt in selected_video["formats"]: + if fmt.get("type") == "progressive" and fmt.get("url"): + return fmt["url"] + if selected_video["formats"] and selected_video["formats"][0].get("url"): + return selected_video["formats"][0]["url"] + + # Method 3: Check alternative locations + if "video" in element and "playback" in element["video"]: + playback = element["video"]["playback"] + if isinstance(playback, dict): + return playback.get("progressiveUrl") or playback.get("url") + + # Save debug info if no URL found + if self.debug_mode: + self._save_debug_info( + { + "status": "no_video_url_found", + "element_keys": list(element.keys()) if isinstance(element, dict) else [], + "selected_video_keys": list(selected_video.keys()) if isinstance(selected_video, dict) else [] + }, + video_slug, + "no_video_url" + ) + + return None + + def _save_debug_info(self, response_data, video_slug, error_type): + """Save debug information for failed downloads""" + debug_dir = os.path.join(self.course_download_dir, "_debug") + os.makedirs(debug_dir, exist_ok=True) + + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{timestamp}_{video_slug}_{error_type}.json" + filepath = os.path.join(debug_dir, filename) + + try: + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(response_data, f, indent=2, ensure_ascii=False) + return filepath + except Exception as e: + click.echo(f"[DEBUG] Failed to save debug info: {str(e)}") + return None def fetch_chapter(self, chapter, chapters_pad_length, delay): - + """ + Downloads all videos in a chapter with enhanced error handling and debugging + """ chapter_name = chapter["title"] videos = chapter["videos"] chapters_index_padded = str(self.current_chapter_index).rjust( chapters_pad_length, "0" ) - chapter_path = ( - f"./{self.course_slug}/{chapters_index_padded}. {clean_name(chapter_name)}" + chapter_path = os.path.join( + self.course_download_dir, + f"{chapters_index_padded}. {clean_name(chapter_name)}" ) video_index = 1 - self.chapter_path = ( - f"./{self.course_slug}/{chapters_index_padded}. {clean_name(chapter_name)}" - ) + self.chapter_path = chapter_path + + # Update summary + self.summary['chapters']['total'] += 1 + if not os.path.exists(chapter_path): os.makedirs(chapter_path) @@ -257,146 +495,292 @@ def fetch_chapter(self, chapter, chapters_pad_length, delay): ff = re.split(r"\d+\. ", file)[1].replace(".mp4", "") current_files.append(ff) - # unique videos by checking if the video name is in the current files - videos = [ - video for video in videos if clean_name(video["title"]) not in current_files - ] - + # Filter out already downloaded videos + videos_to_download = [] for video in videos: + video_name = re.sub(r'[\\/*?:"<>|]', "", video["title"]) + if clean_name(video_name) not in current_files: + videos_to_download.append(video) + else: + self.summary['videos']['already_exist'] += 1 + + self.summary['videos']['total'] += len(videos) + + for video in videos_to_download: self.current_video_index = video_index + len(current_files) - page_json, video_name = self.fetch_video(video) + video_name = re.sub(r'[\\/*?:"<>|]', "", video["title"]) + video_slug = video.get("slug") + self.current_video_name = video_name + + if not video_slug: + click.echo(click.style( + f"[WARNING] Video '{video_name}' has no slug, skipping...", + fg="yellow" + )) + self._save_debug_info(video, "no_slug", "missing_slug") + self.summary['videos']['skipped'] += 1 + video_index += 1 + continue try: - download_url = page_json["elements"][0]["selectedVideo"]["url"][ - "progressiveUrl" - ] + # Fetch video data try: - subtitles = page_json["elements"][0]["selectedVideo"]["transcript"] - except: - click.echo(click.style(f"Subtitles not found", fg="red")) - subtitles = None - duration_in_ms = ( - int(page_json["elements"][0]["selectedVideo"]["durationInSeconds"]) - * 1000 - ) + page_json, video_name = self.fetch_video(video) + except ValueError as e: + if "locked" in str(e).lower() or "premium" in str(e).lower(): + click.echo(click.style( + f"[WARNING] {str(e)} - Skipping video: {video_name}", + fg="yellow" + )) + self.summary['videos']['skipped'] += 1 + video_index += 1 + continue + raise + + # Extract video URL using the helper method + download_url = self._extract_video_url(page_json, video_slug, video_name) + if not download_url: + click.echo(click.style( + f"[ERROR] Could not find video URL for '{video_name}'", + fg="red" + )) + self._save_debug_info(page_json, video_slug, "no_video_url") + self.summary['videos']['failed'] += 1 + video_index += 1 + continue + + # Get subtitles if available + selected_video = page_json["elements"][0].get("selectedVideo", {}) + subtitles = selected_video.get("transcript") + duration_in_ms = int(selected_video.get("durationInSeconds", 0)) * 1000 click.echo( click.style( f"\nCurrent: {chapters_index_padded}. {clean_name(chapter_name)}/" - f"{video_index + len(current_files):0=2d}. {video_name}.mp4 @{self.video_format}p" - ) - ) - except Exception as e: - if "url" in str(e): - click.echo( - click.style( - f"This video is locked, you probably " - f"need a premium account", - fg="red", - ) - ) - else: - click.echo( - click.style(f"Failed to download {video_name}", fg="red") + f"{self.current_video_index:02d}. {video_name}.mp4 @{self.video_format}p" ) - finally: - download_video( - download_url, - self.current_video_index, - video_name, - chapter_path, - delay, ) - if subtitles is not None and self.caption: - subtitle_lines = subtitles["lines"] - download_subtitles( + + # Download the video + try: + download_video( + download_url, self.current_video_index, - subtitle_lines, video_name, chapter_path, - duration_in_ms, + delay, + ) + + # Only try to download subtitles if video download was successful + if subtitles and self.caption: + try: + download_subtitles( + self.current_video_index, + subtitles.get("lines", []), + video_name, + chapter_path, + duration_in_ms, + ) + except Exception as e: + click.echo(click.style( + f"[WARNING] Failed to download subtitles: {str(e)}", + fg="yellow" + )) + + # Mark video as successfully downloaded + self._mark_video_downloaded(video_name) + self.summary['videos']['downloaded'] += 1 + + except Exception as e: + self._start_modified_spinner("...") + self.summary['videos']['failed'] += 1 + self.summary['errors'].append(f"Failed to download '{video_name}': {str(e)}") + if self.debug_mode: + self._save_debug_info( + {"error": str(e), "download_url": download_url}, + video_slug, + "download_failed" + ) + + + except Exception as e: + self._start_modified_spinner("...") + self.summary['videos']['failed'] += 1 + self.summary['errors'].append(f"Error processing '{video_name}': {str(e)}") + if self.debug_mode: + self._save_debug_info( + {"error": str(e), "video": video}, + video_slug, + "processing_error" ) video_index += 1 - + + # Check if chapter is empty after processing + try: + entries = os.listdir(chapter_path) + if not entries: + self.summary['chapters']['empty'] += 1 + except OSError as e: + self.summary['errors'].append(f"Error checking chapter directory {chapter_path}: {str(e)}") + def download_entire_course(self, *args, **kwargs): - - self.remove_failed_downloads() + skip_done_alert = kwargs.get("skip_done_alert", False) try: + # Initialize spinner for course initialization + self._start_spinner("Initializing course download...") + + course_url = config.course_url.format(self.course_slug) + r = requests.get( - config.course_url.format(self.course_slug), + course_url, cookies=self.cookies, headers=self.headers, allow_redirects=True, ) - course_name = r.json()["elements"][0]["title"] - course_name = re.sub(r'[\\/*?:"<>|]', "", course_name) - course_path = f"./{self.course_slug}" - chapters = r.json()["elements"][0]["chapters"] - exercise_files = r.json()["elements"][0]["exerciseFileUrls"] - chapters_index = 1 - if len(chapters) > 0 and chapters[0]["title"] in [ - "Introduction", - "Welcome", - ]: - chapters_index = 0 - chapters_pad_length = 1 - if chapters_index == 0: - if len(chapters) - 1 > 9: - chapters_pad_length = 2 - else: - if len(chapters) > 9: - chapters_pad_length = 2 - delay = self.throttle - - for chapter in chapters: - - self.current_chapter_index = chapters_index - self.fetch_chapter(chapter, chapters_pad_length, delay) - chapters_index += 1 - - if self.exercise and len(exercise_files) > 0: - download_exercises(exercise_files, course_path) - - if kwargs.get("skip_done_alert"): - return - click.echo( - "\nYour download is complete. Begin your learning journey now.! :)" - ) - except requests.exceptions.TooManyRedirects: - click.echo( - click.style(f"TooManyRedirects: Your cookie is expired", fg="red") - ) - except KeyError as e: - click.echo(click.style(f"KeyError: That course is not found {e}", fg="red")) - - except ConnectionResetError: - click.echo( - click.style( - f"ConnectionResetError: There is a connection error. Please check your connectivity.\n", - fg="red", - ) - ) + try: + response_json = r.json() + + if "elements" not in response_json or not response_json["elements"]: + error_msg = "The course data could not be retrieved. This might be due to authentication issues or the course might not be accessible." + self.summary['errors'].append(error_msg) + self._stop_spinner() + click.echo(click.style(error_msg, fg="red")) + return False + + course_data = response_json["elements"][0] + course_name = course_data.get("title", "Unknown Course") + + if "chapters" not in course_data or not course_data["chapters"]: + error_msg = "No chapters found in the course." + self.summary['errors'].append(error_msg) + self._stop_spinner() + click.echo(click.style(error_msg, fg="red")) + return False + + # Store the chapters list in the instance + self.chapters = course_data["chapters"] + self._stop_spinner() + + # Print course info + click.echo(click.style(f"\nCourse: {course_name}", fg="cyan", bold=True)) + click.echo(click.style(f"Chapters: {len(self.chapters)}", fg="cyan")) + click.echo("-" * 50) + + # Calculate padding for chapter numbers + chapters_pad_length = len(str(len(self.chapters))) + + # Process each chapter + for i, chapter in enumerate(self.chapters, 1): + self.current_chapter_index = i + try: + self.fetch_chapter(chapter, chapters_pad_length, self.throttle) + except Exception as e: + error_msg = f"Error processing chapter {i}: {str(e)}" + self.summary['errors'].append(error_msg) + self._stop_spinner() + click.echo(click.style(f"\n{error_msg}", fg="red")) + continue + + # Clean up empty directories + deleted_dirs = cleanup_empty_directories(self.course_download_dir) + self.summary['chapters']['deleted'] = deleted_dirs + + if not skip_done_alert: + self._stop_spinner() + click.echo(click.style("\n✓ Download completed", fg="green", bold=True)) + + return True + + except json.JSONDecodeError as e: + error_msg = "Failed to parse course data. The course might be locked or not accessible." + self.summary['errors'].append(error_msg) + self._stop_spinner() + click.echo(click.style(error_msg, fg="red")) + return False - except requests.exceptions.ConnectionError: - click.echo( - click.style( - f"ConnectionError: There is a connection error. Please check your connectivity.\n", - fg="red", - ) - ) - except json.decoder.JSONDecodeError as e: - click.echo( - click.style( - f"The course is locked, you probably " f"need a premium account", - fg="red", - ) - ) except Exception as e: - if os.path.exists( - f"{self.chapter_path}/{self.current_video_index:0=2d}. {clean_name(self.current_video_name)}.mp4" - ): - os.remove( - f"{self.chapter_path}/{self.current_video_index:0=2d}. {clean_name(self.current_video_name)}.mp4" - ) - self.download_entire_course() + error_msg = f"Unexpected error: {str(e)}" + self.summary['errors'].append(error_msg) + self._stop_spinner() + click.echo(click.style(f"\n✗ {error_msg}", fg="red")) + return False + + finally: + # Ensure spinner is always stopped + self._stop_spinner() + # Print summary only if we're not in a nested call (when processing learning paths) + if not skip_done_alert: + self._print_summary() + + def _print_summary(self): + """Print a formatted summary of the download process in a table format""" + summary = self.summary + + # Don't print summary if there's no data + if not any(summary.values()): + return + + from texttable import Texttable + + # Initialize table with a border + table = Texttable() + table.set_deco(Texttable.BORDER | Texttable.HEADER | Texttable.VLINES) + table.set_cols_align(["l", "r"]) + table.set_cols_valign(["m", "m"]) + + # Add title + table.add_row(["DOWNLOAD SUMMARY", ""]) + table.add_row(["-" * 30, "-" * 10]) + + # Add course summary if available + if summary['courses_processed'] > 0: + table.add_row(["Courses processed:", summary['courses_processed']]) + + # Add video summary + table.add_row(["\nVIDEOS", ""]) + table.add_row([" • Total", summary['videos']['total']]) + table.add_row([" • Downloaded", summary['videos']['downloaded']]) + table.add_row([" • Already existed", summary['videos']['already_exist']]) + table.add_row([" • Skipped", summary['videos']['skipped']]) + table.add_row([" • Failed", summary['videos']['failed']]) + + # Add chapter summary + table.add_row(["\nCHAPTERS", ""]) + table.add_row([" • Total", summary['chapters']['total']]) + table.add_row([" • Empty", summary['chapters']['empty']]) + table.add_row([" • Deleted", summary['chapters']['deleted']]) + + # Add errors if any + if summary['errors']: + table.add_row(["\nERRORS", ""]) + for i, error in enumerate(summary['errors'][:5], 1): + table.add_row([f" {i}. {error}", ""]) + if len(summary['errors']) > 5: + table.add_row([f" ... and {len(summary['errors']) - 5} more errors", ""]) + + # Print the table + click.echo("\n" + "=" * 80) + click.echo(table.draw()) + click.echo("=" * 80) + + self._stop_spinner() + + # Print final status + status = "✅ Download completed successfully!" if not summary['errors'] else "⚠️ Download completed with errors!" + click.echo(click.style(f"\n{status}\n", fg="green" if not summary['errors'] else "yellow")) + + def _start_spinner(self, message): + """Start a simple dot-based progress indicator""" + sys.stdout.write(click.style(message, fg="cyan")) + sys.stdout.flush() + + def _start_modified_spinner(self, message): + """Print dots to show progress without cluttering the output""" + sys.stdout.write(click.style(".", fg="cyan")) + sys.stdout.flush() + + def _stop_spinner(self): + """Ensure output ends with a newline""" + sys.stdout.write("\n") + sys.stdout.flush() diff --git a/llvd/cli.py b/llvd/cli.py index 1039961..27dee88 100644 --- a/llvd/cli.py +++ b/llvd/cli.py @@ -1,21 +1,21 @@ -import sys import click -from llvd import config +from typing import Optional + +from llvd import config, __version__ from llvd.app import App -import click -import sys -from llvd import config from llvd.process_io import parse_cookie_file, parse_header_file -from llvd.utils import clean_dir - - -BOLD = "\033[1m" # Makes the text bold -RED_COLOR = "\u001b[31m" # Makes the text red -PATH = "path" -COURSE = "course" +from llvd.validators import validate_course_and_path, parse_throttle +BOLD = "\033[1m" +RED_COLOR = "\u001b[31m" @click.command() +@click.option( + "--version", + "-v", + is_flag=True, + help="Show version and exit", +) @click.option( "--cookies", is_flag=True, @@ -30,85 +30,106 @@ "--resolution", "-r", default="720", - help="Video resolution can either be 360, 540 or 720. 720 is the default", + type=click.Choice(["360", "540", "720", "1080"], case_sensitive=False), + help="Video resolution (default: 720)", +) +@click.option( + "--caption", + "-ca", + is_flag=True, + help="Download subtitles", +) +@click.option( + "--exercise", + "-e", + is_flag=True, + help="Download exercises", +) +@click.option( + "--course", + "-c", + help="Course slug (e.g., 'java-8-essential')", ) -@click.option("--caption", "-ca", is_flag=True, help="Download subtitles") -@click.option("--exercise", "-e", is_flag=True, help="Download Exercises") -@click.option("--course", "-c", help="Example: 'java-8-essential'") @click.option( "--path", "-p", - help="Specify learning path to download. Example: 'llvd -p become-a-php-developer -t 20'", + help="Learning path slug (e.g., 'become-a-php-developer')", ) @click.option( "--throttle", "-t", - help="A min,max wait in seconds before downloading next video. Example: -t 30,120", + help="Min,max wait in seconds between downloads (e.g., '10,30' or '5')", ) -def main(cookies, headers, course, resolution, caption, exercise, path, throttle): +@click.pass_context +def main( + ctx: click.Context, + version: bool, + cookies: bool, + headers: bool, + resolution: str, + caption: bool, + exercise: bool, + course: Optional[str], + path: Optional[str], + throttle: Optional[str], +) -> None: """ - Linkedin learning video downloader cli tool - example: llvd --course "java-8-essential" + LinkedIn Learning Video Downloader (LLVD) + + Download LinkedIn Learning courses for offline viewing. + + Examples: + + \b + $ llvd --course "java-8-essential" --cookies + $ llvd -p "become-a-php-developer" -t 10,30 --cookies """ - if not len(sys.argv) != 1: - click.echo(f"{RED_COLOR}{BOLD}Missing required arguments: llvd --help") - sys.exit(0) - - if path: - course_slug = (clean_dir(path), PATH) - else: - course_slug = (clean_dir(course), COURSE) - - email = config.email - password = config.password + if version: + click.echo(f"LLVD version: {__version__}") + return try: - if throttle and "," in throttle: - throttle = [int(i) for i in throttle.split(",")] - elif throttle: - throttle = [int(throttle)] - except ValueError: - click.echo(click.style("Throttle must be a number", fg="red")) - sys.exit(0) - - # Check that both course and path are not both set. Can only be one or other. - if course and path: - click.echo( - click.style( - "Please specify either a course OR learning path, not both.", fg="red" - ) + # Validate and process course/path + course_slug, is_path = validate_course_and_path(course, path) + + # Parse throttle values + throttle_values = parse_throttle(throttle) + + # Validate path requires throttle + if is_path and not throttle_values: + raise click.UsageError("Throttle option (-t) is required when using --path") + + # Initialize and run the application + app = App( + email=config.email, + password=config.password, + course_slug=course_slug, + resolution=resolution, + caption=caption, + exercise=exercise, + throttle=throttle_values, ) - sys.exit(0) - if path and not throttle: - click.echo( - click.style( - "Please use throttle option (-t) when downloading learning paths.", - fg="red", - ) - ) - sys.exit(0) - - if cookies: - cookie_dict = parse_cookie_file() - if "li_at" not in cookie_dict or "JSESSIONID" not in cookie_dict: - click.echo(click.style(f"cookies.txt must not be empty", fg="red")) - sys.exit(0) - else: - click.echo(click.style(f"Using cookie info from cookies.txt", fg="green")) + if cookies: + cookie_dict = parse_cookie_file() + if not all(k in cookie_dict for k in ("li_at", "JSESSIONID")): + raise click.UsageError("cookies.txt must contain both 'li_at' and 'JSESSIONID' cookies") + + click.echo(click.style("Using cookie info from cookies.txt", fg="green")) - app = App(email, password, course_slug, resolution, caption, exercise, throttle) - if headers: - header_dict = parse_header_file() - app.run(cookie_dict, header_dict) + if headers: + header_dict = parse_header_file() + app.run(cookie_dict, header_dict) + else: + app.run(cookie_dict) else: - app.run(cookie_dict) + if not config.email: + config.email = click.prompt("Please enter your LinkedIn email address") + if not config.password: + config.password = click.prompt("Enter your LinkedIn Password", hide_input=True) - else: - if email == "": - email = click.prompt("Please enter your Linkedin email address") - if password == "": - password = click.prompt("Enter your Linkedin Password", hide_input=True) + app.run() - app = App(email, password, course_slug, resolution, caption, exercise, throttle) - app.run() \ No newline at end of file + except Exception as e: + click.echo(click.style(f"Error: {str(e)}", fg="red"), err=True) + ctx.exit(1) \ No newline at end of file diff --git a/llvd/utils.py b/llvd/utils.py index d645907..9f99cff 100644 --- a/llvd/utils.py +++ b/llvd/utils.py @@ -1,3 +1,4 @@ +import os import re from random import randint from time import sleep @@ -46,3 +47,28 @@ def throttle(wait_time=None): sleep(delay) # clean up delay message print(f'{cursor_up}{clear_line}{cursor_up}{cursor_home}') + + + +def cleanup_empty_directories(path, errors=None): + """Recursively remove empty directories. Returns the number of removed directories.""" + if not os.path.isdir(path): + return 0 + + removed = 0 + try: + # Recursively process subdirectories + entries = os.listdir(path) + for entry in entries: + full_path = os.path.join(path, entry) + if os.path.isdir(full_path): + removed += cleanup_empty_directories(full_path, errors) + # Check if directory is empty after processing subdirectories + entries = os.listdir(path) + if not entries: + os.rmdir(path) + removed += 1 + except (OSError, PermissionError) as e: + if errors is not None: + errors.append(f"Failed to clean up directory {path}: {str(e)}") + return removed diff --git a/llvd/validators.py b/llvd/validators.py new file mode 100644 index 0000000..de2aca9 --- /dev/null +++ b/llvd/validators.py @@ -0,0 +1,57 @@ +""" +Input validation utilities for LLVD. +""" +from typing import Optional, Tuple, List +from llvd.utils import clean_dir + +def validate_course_and_path(course: Optional[str], path: Optional[str]) -> Tuple[Tuple[str, str], bool]: + """ + Validate and process course and path arguments. + + Args: + course: The course slug (optional) + path: The learning path slug (optional) + + Returns: + Tuple containing (course_slug, is_path) where: + - course_slug is a tuple of (cleaned_name, type) + - is_path is a boolean indicating if it's a learning path + + Raises: + ValueError: If validation fails + """ + if course and path: + raise ValueError("Please specify either a course OR learning path, not both.") + + if path: + return (clean_dir(path), "path"), True + if course: + return (clean_dir(course), "course"), False + + raise ValueError("Either --course or --path must be specified") + +def parse_throttle(throttle_str: Optional[str]) -> Optional[List[int]]: + """ + Parse throttle string into min,max values. + + Args: + throttle_str: String in format "min,max" or "value" + + Returns: + List of [min, max] or [value], or None if input is None/empty + + Raises: + ValueError: If input format is invalid + """ + if not throttle_str: + return None + + try: + if "," in throttle_str: + values = [int(x.strip()) for x in throttle_str.split(",", 1)] + if len(values) != 2: + raise ValueError("Throttle must be in format 'min,max' or 'value'") + return values + return [int(throttle_str)] + except ValueError as e: + raise ValueError("Throttle must be a number or two numbers separated by comma") from e \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7b95937 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" diff --git a/requirements.txt b/requirements.txt index 04795d5..3e6bf38 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ -beautifulsoup4==4.11.1 -certifi==2022.12.7 -chardet==5.1.0 -click==8.1.3 +beautifulsoup4==4.12.2 +certifi==2023.7.22 +chardet==5.2.0 +click==8.1.7 idna==3.4 -requests==2.28.1 -soupsieve==2.3.2 -tqdm==4.64.1 -urllib3==1.26.13 -click_spinner==0.1.10 \ No newline at end of file +requests==2.31.0 +soupsieve==2.5 +tqdm==4.66.1 +urllib3==2.0.7 +click-spinner==0.1.10 +texttable==1.7.0 diff --git a/setup.py b/setup.py index ce13234..eca016e 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,19 @@ from setuptools import setup, find_packages from os import path -import llvd +import re current_dir = path.abspath(path.dirname(__file__)) with open(path.join(current_dir, "README.md"), "r", encoding="utf-8") as f: readme = f.read() +def get_version(): + with open(path.join(current_dir, "llvd", "__init__.py"), "r") as f: + content = f.read() + return re.search(r'__version__\s*=\s*[\'"]([^\'"]+)[\'"]', content).group(1) setup( name="llvd", - version=llvd.__version__, + version=get_version(), url="https://github.com/knowbee/llvd.git", author="Igwaneza Bruce", author_email="knowbeeinc@gmail.com", @@ -21,16 +25,17 @@ packages=find_packages(), include_package_data=True, install_requires=[ - "beautifulsoup4==4.11.1", - "certifi==2022.12.7", - "chardet==5.1.0", - "click==8.1.3", - "idna==3.4", - "requests==2.28.1", - "soupsieve==2.3.2", - "tqdm==4.64.1", - "urllib3==1.26.13", - "click_spinner==0.1.10" + "beautifulsoup4>=4.12.2", # Latest stable, maintains BS4 compatibility + "certifi>=2023.7.22", # Security updates + "chardet>=5.2.0", # Security and bug fixes + "click>=8.1.7", # Latest stable with bug fixes + "idna>=3.4", # Keep current as it's a core dependency + "requests>=2.31.0", # Latest stable with security updates + "soupsieve>=2.5", # Compatible with latest beautifulsoup4 + "tqdm>=4.66.1", # Latest stable with bug fixes + "urllib3>=2.0.7", # Latest stable with security updates + "click-spinner>=0.1.10", # Keep current as it's a stable release + "texttable>=1.7.0" # Keep current as it's a stable release ], entry_points={"console_scripts": ["llvd = llvd.cli:main"]}, classifiers=[