diff --git a/tools/packages/codeflow-utils-python/pyproject.toml b/tools/packages/codeflow-utils-python/pyproject.toml index 9c1afca..36dca74 100644 --- a/tools/packages/codeflow-utils-python/pyproject.toml +++ b/tools/packages/codeflow-utils-python/pyproject.toml @@ -9,7 +9,7 @@ description = "Shared utility functions for CodeFlow Python applications" readme = "README.md" license = "MIT" requires-python = ">=3.12" -keywords = ["codeflow", "utils", "utilities", "resilience", "logging"] +keywords = ["codeflow", "utils", "utilities", "resilience", "logging", "validation", "formatting", "retry", "rate-limiting"] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py index fa3cac3..51e705d 100644 --- a/tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py @@ -7,6 +7,9 @@ Modules: logging: Structured logging utilities resilience: Circuit breaker and retry patterns + validation: Input, URL, email, and config validation + formatting: Date, number, and string formatting + common: Retry, rate limiting, and error handling """ from codeflow_utils.logging import ( @@ -22,6 +25,39 @@ CircuitBreakerState, CircuitBreakerStats, ) +from codeflow_utils.validation import ( + validate_config, + validate_environment_variables, + sanitize_input, + validate_input, + validate_url, + is_valid_url, + validate_email, + is_valid_email, + extract_email_domain, + normalize_email, +) +from codeflow_utils.formatting import ( + format_datetime, + format_iso_datetime, + format_relative_time, + format_number, + format_bytes, + format_percentage, + truncate_string, + slugify, + camel_to_snake, + snake_to_camel, +) +from codeflow_utils.common import ( + retry, + CodeFlowUtilsError, + format_error_message, + create_error_response, + RateLimiter, + rate_limit, + PerKeyRateLimiter, +) __version__ = "0.1.0" @@ -37,4 +73,36 @@ "CircuitBreakerOpenError", "CircuitBreakerState", "CircuitBreakerStats", + # Validation + "validate_config", + "validate_environment_variables", + "sanitize_input", + "validate_input", + "validate_url", + "is_valid_url", + "validate_email", + "is_valid_email", + "extract_email_domain", + "normalize_email", + # Formatting - Date + "format_datetime", + "format_iso_datetime", + "format_relative_time", + # Formatting - Number + "format_number", + "format_bytes", + "format_percentage", + # Formatting - String + "truncate_string", + "slugify", + "camel_to_snake", + "snake_to_camel", + # Common + "retry", + "CodeFlowUtilsError", + "format_error_message", + "create_error_response", + "RateLimiter", + "rate_limit", + "PerKeyRateLimiter", ] diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/common/__init__.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/__init__.py new file mode 100644 index 0000000..bce278e --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/__init__.py @@ -0,0 +1,19 @@ +"""Common utility functions for CodeFlow.""" + +from .retry import retry +from .errors import ( + CodeFlowUtilsError, + format_error_message, + create_error_response, +) +from .rate_limit import RateLimiter, rate_limit, PerKeyRateLimiter + +__all__ = [ + "retry", + "CodeFlowUtilsError", + "format_error_message", + "create_error_response", + "RateLimiter", + "rate_limit", + "PerKeyRateLimiter", +] diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/common/errors.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/errors.py new file mode 100644 index 0000000..b98be09 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/errors.py @@ -0,0 +1,63 @@ +"""Error handling utilities.""" + +from typing import Any + + +class CodeFlowUtilsError(Exception): + """Base exception for codeflow-utils-python.""" + pass + + +def format_error_message( + operation: str, + error: Exception, + context: dict[str, Any] | None = None, +) -> str: + """ + Format error message with context. + + Args: + operation: Name of the operation that failed + error: The exception that was raised + context: Optional context dictionary + + Returns: + Formatted error message + """ + message = f"{operation} failed: {str(error)}" + + if context: + context_str = ", ".join(f"{k}={v}" for k, v in context.items()) + message += f" (context: {context_str})" + + return message + + +def create_error_response( + error: Exception, + operation: str | None = None, + context: dict[str, Any] | None = None, +) -> dict[str, Any]: + """ + Create standardized error response dictionary. + + Args: + error: The exception that was raised + operation: Optional operation name + context: Optional context dictionary + + Returns: + Error response dictionary + """ + response = { + "error": type(error).__name__, + "message": str(error), + } + + if operation: + response["operation"] = operation + + if context: + response["context"] = context + + return response diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/common/rate_limit.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/rate_limit.py new file mode 100644 index 0000000..b991522 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/rate_limit.py @@ -0,0 +1,131 @@ +"""Rate limiting utilities.""" + +import time +from collections import defaultdict +from threading import Lock +from typing import Callable, Any + + +class RateLimiter: + """Simple rate limiter using token bucket algorithm.""" + + def __init__(self, max_calls: int, period: float): + """ + Initialize rate limiter. + + Args: + max_calls: Maximum number of calls allowed + period: Time period in seconds + """ + self.max_calls = max_calls + self.period = period + self.calls: list[float] = [] + self.lock = Lock() + + def acquire(self) -> bool: + """ + Try to acquire a rate limit token. + + Returns: + True if token acquired, False if rate limit exceeded + """ + with self.lock: + now = time.time() + + # Remove old calls outside the period + self.calls = [call_time for call_time in self.calls if now - call_time < self.period] + + # Check if we can make another call + if len(self.calls) < self.max_calls: + self.calls.append(now) + return True + + return False + + def wait_time(self) -> float: + """ + Get the wait time until next call can be made. + + Returns: + Wait time in seconds, or 0 if no wait needed + """ + with self.lock: + if len(self.calls) < self.max_calls: + return 0.0 + + oldest_call = min(self.calls) + wait = self.period - (time.time() - oldest_call) + return max(0.0, wait) + + +def rate_limit(max_calls: int, period: float): + """ + Decorator to rate limit function calls. + + Args: + max_calls: Maximum number of calls allowed + period: Time period in seconds + + Returns: + Decorated function with rate limiting + """ + limiter = RateLimiter(max_calls, period) + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + def wrapper(*args: Any, **kwargs: Any) -> Any: + if not limiter.acquire(): + wait = limiter.wait_time() + if wait > 0: + time.sleep(wait) + limiter.acquire() + + return func(*args, **kwargs) + + return wrapper + + return decorator + + +class PerKeyRateLimiter: + """Rate limiter that tracks limits per key.""" + + def __init__(self, max_calls: int, period: float): + """ + Initialize per-key rate limiter. + + Args: + max_calls: Maximum number of calls allowed per key + period: Time period in seconds + """ + self.max_calls = max_calls + self.period = period + self.limiters: dict[str, RateLimiter] = defaultdict( + lambda: RateLimiter(max_calls, period) + ) + self.lock = Lock() + + def acquire(self, key: str) -> bool: + """ + Try to acquire a rate limit token for a key. + + Args: + key: Rate limit key + + Returns: + True if token acquired, False if rate limit exceeded + """ + with self.lock: + return self.limiters[key].acquire() + + def wait_time(self, key: str) -> float: + """ + Get the wait time for a key. + + Args: + key: Rate limit key + + Returns: + Wait time in seconds + """ + with self.lock: + return self.limiters[key].wait_time() diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/common/retry.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/retry.py new file mode 100644 index 0000000..eb9c1e6 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/common/retry.py @@ -0,0 +1,69 @@ +"""Retry logic utilities.""" + +import asyncio +import time +from functools import wraps +from typing import Any, Callable, TypeVar + +T = TypeVar("T") + + +def retry( + max_attempts: int = 3, + delay: float = 1.0, + backoff: float = 2.0, + exceptions: tuple[type[Exception], ...] = (Exception,), +): + """ + Retry decorator for functions. + + Args: + max_attempts: Maximum number of retry attempts + delay: Initial delay between retries (seconds) + backoff: Backoff multiplier + exceptions: Tuple of exceptions to catch and retry on + + Example: + @retry(max_attempts=3, delay=1.0) + def api_call(): + # API call that might fail + pass + """ + def decorator(func: Callable[..., T]) -> Callable[..., T]: + @wraps(func) + def sync_wrapper(*args: Any, **kwargs: Any) -> T: + current_delay = delay + last_exception = None + + for attempt in range(max_attempts): + try: + return func(*args, **kwargs) + except exceptions as e: + last_exception = e + if attempt < max_attempts - 1: + time.sleep(current_delay) + current_delay *= backoff + else: + raise + + @wraps(func) + async def async_wrapper(*args: Any, **kwargs: Any) -> T: + current_delay = delay + last_exception = None + + for attempt in range(max_attempts): + try: + return await func(*args, **kwargs) + except exceptions as e: + last_exception = e + if attempt < max_attempts - 1: + await asyncio.sleep(current_delay) + current_delay *= backoff + else: + raise + + if asyncio.iscoroutinefunction(func): + return async_wrapper + return sync_wrapper + + return decorator diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/__init__.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/__init__.py new file mode 100644 index 0000000..e3bf5df --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/__init__.py @@ -0,0 +1,21 @@ +"""Formatting utilities for CodeFlow.""" + +from .date import format_datetime, format_iso_datetime, format_relative_time +from .number import format_number, format_bytes, format_percentage +from .string import truncate_string, slugify, camel_to_snake, snake_to_camel + +__all__ = [ + # Date + "format_datetime", + "format_iso_datetime", + "format_relative_time", + # Number + "format_number", + "format_bytes", + "format_percentage", + # String + "truncate_string", + "slugify", + "camel_to_snake", + "snake_to_camel", +] diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/date.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/date.py new file mode 100644 index 0000000..47e23c1 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/date.py @@ -0,0 +1,86 @@ +"""Date and time formatting utilities.""" + +from datetime import datetime +from typing import Optional + + +def format_datetime(dt: datetime, format_str: str = "%Y-%m-%d %H:%M:%S") -> str: + """ + Format datetime to string. + + Args: + dt: Datetime object + format_str: Format string (default: ISO-like format) + + Returns: + Formatted datetime string + """ + return dt.strftime(format_str) + + +def format_iso_datetime(dt: datetime) -> str: + """ + Format datetime to ISO 8601 string. + + Args: + dt: Datetime object + + Returns: + ISO 8601 formatted string + """ + return dt.isoformat() + + +def format_relative_time(dt: datetime, now: Optional[datetime] = None) -> str: + """ + Format datetime as relative time (e.g., "2 hours ago"). + + Args: + dt: Datetime object + now: Current datetime (defaults to now) + + Returns: + Relative time string + """ + if now is None: + now = datetime.now() + + delta = now - dt + + if delta.total_seconds() < 0: + # Future time + delta = dt - now + if delta.days > 365: + years = delta.days // 365 + return f"in {years} year{'s' if years > 1 else ''}" + elif delta.days > 30: + months = delta.days // 30 + return f"in {months} month{'s' if months > 1 else ''}" + elif delta.days > 0: + return f"in {delta.days} day{'s' if delta.days > 1 else ''}" + elif delta.seconds > 3600: + hours = delta.seconds // 3600 + return f"in {hours} hour{'s' if hours > 1 else ''}" + elif delta.seconds > 60: + minutes = delta.seconds // 60 + return f"in {minutes} minute{'s' if minutes > 1 else ''}" + else: + return "in a moment" + + # Past time + if delta.days > 365: + years = delta.days // 365 + return f"{years} year{'s' if years > 1 else ''} ago" + elif delta.days > 30: + months = delta.days // 30 + return f"{months} month{'s' if months > 1 else ''} ago" + elif delta.days > 0: + return f"{delta.days} day{'s' if delta.days > 1 else ''} ago" + elif delta.seconds > 3600: + hours = delta.seconds // 3600 + return f"{hours} hour{'s' if hours > 1 else ''} ago" + elif delta.seconds > 60: + minutes = delta.seconds // 60 + return f"{minutes} minute{'s' if minutes > 1 else ''} ago" + else: + return "just now" diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/number.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/number.py new file mode 100644 index 0000000..20d75f9 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/number.py @@ -0,0 +1,85 @@ +"""Number formatting utilities.""" + + +def format_number( + value: float | int, + decimals: int = 2, + thousands_separator: str = ",", + decimal_separator: str = ".", +) -> str: + """ + Format number with thousands separator and decimal places. + + Args: + value: Number to format + decimals: Number of decimal places + thousands_separator: Thousands separator character + decimal_separator: Decimal separator character + + Returns: + Formatted number string + """ + # Format with decimal places + formatted = f"{value:,.{decimals}f}" + + # Replace separators if needed + if thousands_separator != ",": + formatted = formatted.replace(",", "TEMP_THOUSANDS") + formatted = formatted.replace(".", decimal_separator) + formatted = formatted.replace("TEMP_THOUSANDS", thousands_separator) + elif decimal_separator != ".": + # Split by decimal point + parts = formatted.split(".") + if len(parts) == 2: + formatted = thousands_separator.join(parts[0].split(",")) + decimal_separator + parts[1] + else: + formatted = thousands_separator.join(parts[0].split(",")) + + return formatted + + +def format_bytes(bytes_value: int, binary: bool = False) -> str: + """ + Format bytes to human-readable string. + + Args: + bytes_value: Number of bytes + binary: Use binary (1024) or decimal (1000) units + + Returns: + Formatted string (e.g., "1.5 MB") + """ + base = 1024 if binary else 1000 + units = ["B", "KB", "MB", "GB", "TB", "PB"] if not binary else ["B", "KiB", "MiB", "GiB", "TiB", "PiB"] + + if bytes_value == 0: + return "0 B" + + unit_index = 0 + value = float(bytes_value) + + while value >= base and unit_index < len(units) - 1: + value /= base + unit_index += 1 + + return f"{value:.2f} {units[unit_index]}" + + +def format_percentage(value: float, decimals: int = 1) -> str: + """ + Format number as percentage. + + Args: + value: Number to format (0.0 to 1.0 or 0 to 100) + decimals: Number of decimal places + + Returns: + Formatted percentage string + """ + # Assume value is 0-1 if less than 1, otherwise 0-100 + if value <= 1.0: + percentage = value * 100 + else: + percentage = value + + return f"{percentage:.{decimals}f}%" diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/string.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/string.py new file mode 100644 index 0000000..b0594a1 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/formatting/string.py @@ -0,0 +1,98 @@ +"""String formatting utilities.""" + +import re + + +def truncate_string( + value: str, + max_length: int, + suffix: str = "...", + preserve_words: bool = True, +) -> str: + """ + Truncate string to maximum length. + + Args: + value: String to truncate + max_length: Maximum length (including suffix) + suffix: Suffix to add if truncated + preserve_words: Whether to preserve word boundaries + + Returns: + Truncated string + """ + if len(value) <= max_length: + return value + + if preserve_words: + # Try to truncate at word boundary + truncated = value[:max_length - len(suffix)] + last_space = truncated.rfind(" ") + if last_space > max_length * 0.5: # Only use word boundary if reasonable + truncated = truncated[:last_space] + return truncated + suffix + + return value[:max_length - len(suffix)] + suffix + + +def slugify(value: str, separator: str = "-") -> str: + """ + Convert string to URL-friendly slug. + + Args: + value: String to slugify + separator: Word separator character + + Returns: + Slugified string + """ + # Convert to lowercase + slug = value.lower() + + # Replace spaces and underscores with separator + slug = re.sub(r"[\s_]+", separator, slug) + + # Remove all non-word characters except separator + slug = re.sub(r"[^\w\-]+", "", slug) + + # Remove multiple separators + slug = re.sub(rf"{re.escape(separator)}+", separator, slug) + + # Remove leading/trailing separators + slug = slug.strip(separator) + + return slug + + +def camel_to_snake(value: str) -> str: + """ + Convert camelCase to snake_case. + + Args: + value: CamelCase string + + Returns: + snake_case string + """ + # Insert underscore before uppercase letters + snake = re.sub(r"(? str: + """ + Convert snake_case to camelCase. + + Args: + value: snake_case string + capitalize_first: Whether to capitalize first letter (PascalCase) + + Returns: + camelCase or PascalCase string + """ + components = value.split("_") + + if capitalize_first: + return "".join(word.capitalize() for word in components) + else: + return components[0] + "".join(word.capitalize() for word in components[1:]) diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/__init__.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/__init__.py new file mode 100644 index 0000000..1d4d79b --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/__init__.py @@ -0,0 +1,23 @@ +"""Validation utilities for CodeFlow.""" + +from .config import validate_config, validate_environment_variables +from .input import sanitize_input, validate_input +from .url import validate_url, is_valid_url +from .email import validate_email, is_valid_email, extract_email_domain, normalize_email + +__all__ = [ + # Config + "validate_config", + "validate_environment_variables", + # Input + "sanitize_input", + "validate_input", + # URL + "validate_url", + "is_valid_url", + # Email + "validate_email", + "is_valid_email", + "extract_email_domain", + "normalize_email", +] diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/config.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/config.py new file mode 100644 index 0000000..4ff262a --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/config.py @@ -0,0 +1,45 @@ +"""Configuration validation utilities.""" + +import os +from typing import Any + + +def validate_config(config: dict[str, Any], required_keys: list[str]) -> tuple[bool, list[str]]: + """ + Validate configuration dictionary. + + Args: + config: Configuration dictionary to validate + required_keys: List of required keys + + Returns: + Tuple of (is_valid, missing_keys) + """ + if not isinstance(config, dict): + return False, ["Configuration must be a dictionary"] + + missing_keys = [key for key in required_keys if key not in config or config[key] is None] + return len(missing_keys) == 0, missing_keys + + +def validate_environment_variables(required_vars: list[str]) -> dict[str, Any]: + """ + Check for required environment variables. + + Args: + required_vars: List of required environment variable names + + Returns: + Dictionary with validation results: + - valid: bool - Whether all variables are present + - missing: list[str] - List of missing variable names + - found: list[str] - List of found variable names + """ + missing_vars = [var for var in required_vars if not os.getenv(var)] + found_vars = [var for var in required_vars if var not in missing_vars] + + return { + "valid": len(missing_vars) == 0, + "missing": missing_vars, + "found": found_vars, + } diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/email.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/email.py new file mode 100644 index 0000000..85ba030 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/email.py @@ -0,0 +1,111 @@ +"""Email validation utilities.""" + +import re +from typing import Optional + + +# RFC 5322 compliant email regex (simplified) +EMAIL_PATTERN = re.compile( + r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' +) + + +def validate_email(email: str) -> bool: + """ + Validate email address format. + + Args: + email: Email address to validate + + Returns: + True if email is valid, False otherwise + """ + if not email or not isinstance(email, str): + return False + + email = email.strip() + + if not email: + return False + + # Basic format check + if not EMAIL_PATTERN.match(email): + return False + + # Additional checks + if email.count('@') != 1: + return False + + local, domain = email.split('@', 1) + + # Local part checks + if len(local) > 64: + return False + + if local.startswith('.') or local.endswith('.'): + return False + + if '..' in local: + return False + + # Domain part checks + if len(domain) > 255: + return False + + if domain.startswith('.') or domain.endswith('.'): + return False + + if '..' in domain: + return False + + return True + + +def is_valid_email(email: str) -> bool: + """ + Alias for validate_email for consistency. + + Args: + email: Email address to validate + + Returns: + True if email is valid, False otherwise + """ + return validate_email(email) + + +def extract_email_domain(email: str) -> Optional[str]: + """ + Extract domain from email address. + + Args: + email: Email address + + Returns: + Domain name or None if invalid + """ + if not validate_email(email): + return None + + return email.split('@', 1)[1] + + +def normalize_email(email: str) -> Optional[str]: + """ + Normalize email address (lowercase, trim). + + Args: + email: Email address to normalize + + Returns: + Normalized email or None if invalid + """ + if not email or not isinstance(email, str): + return None + + email = email.strip().lower() + + if not validate_email(email): + return None + + return email diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/input.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/input.py new file mode 100644 index 0000000..3965f3d --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/input.py @@ -0,0 +1,53 @@ +"""Input validation and sanitization utilities.""" + +from typing import Any + + +def sanitize_input(value: str, max_length: int | None = None) -> str: + """ + Sanitize input string. + + Args: + value: Input string to sanitize + max_length: Maximum length (None for no limit) + + Returns: + Sanitized string + """ + if not isinstance(value, str): + value = str(value) + + # Remove leading/trailing whitespace + sanitized = value.strip() + + # Remove null bytes + sanitized = sanitized.replace("\x00", "") + + # Truncate if needed + if max_length and len(sanitized) > max_length: + sanitized = sanitized[:max_length] + + return sanitized + + +def validate_input(value: Any, input_type: type, required: bool = True) -> tuple[bool, str | None]: + """ + Validate input value. + + Args: + value: Value to validate + input_type: Expected type + required: Whether value is required + + Returns: + Tuple of (is_valid, error_message) + """ + if value is None: + if required: + return False, "Value is required" + return True, None + + if not isinstance(value, input_type): + return False, f"Expected {input_type.__name__}, got {type(value).__name__}" + + return True, None diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/url.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/url.py new file mode 100644 index 0000000..b0e535d --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/validation/url.py @@ -0,0 +1,49 @@ +"""URL validation utilities.""" + +from urllib.parse import urlparse + + +def validate_url(url: str, schemes: list[str] | None = None) -> tuple[bool, str | None]: + """ + Validate URL format. + + Args: + url: URL string to validate + schemes: Allowed URL schemes (None for any) + + Returns: + Tuple of (is_valid, error_message) + """ + if not url or not isinstance(url, str): + return False, "URL must be a non-empty string" + + try: + parsed = urlparse(url) + + if not parsed.scheme: + return False, "URL must include a scheme (e.g., https://)" + + if not parsed.netloc: + return False, "URL must include a domain" + + if schemes and parsed.scheme not in schemes: + return False, f"URL scheme must be one of: {', '.join(schemes)}" + + return True, None + except Exception as e: + return False, f"Invalid URL format: {str(e)}" + + +def is_valid_url(url: str, schemes: list[str] | None = None) -> bool: + """ + Check if URL is valid. + + Args: + url: URL string to check + schemes: Allowed URL schemes (None for any) + + Returns: + True if URL is valid, False otherwise + """ + is_valid, _ = validate_url(url, schemes) + return is_valid