Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tools/packages/codeflow-utils-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Shared utility functions for CodeFlow Python applications"
readme = "README.md"
license = "MIT"
requires-python = ">=3.12"
keywords = ["codeflow", "utils", "utilities", "resilience", "logging"]
keywords = ["codeflow", "utils", "utilities", "resilience", "logging", "validation", "formatting", "retry", "rate-limiting"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
Modules:
logging: Structured logging utilities
resilience: Circuit breaker and retry patterns
validation: Input, URL, email, and config validation
formatting: Date, number, and string formatting
common: Retry, rate limiting, and error handling
"""

from codeflow_utils.logging import (
Expand All @@ -22,6 +25,39 @@
CircuitBreakerState,
CircuitBreakerStats,
)
from codeflow_utils.validation import (
validate_config,
validate_environment_variables,
sanitize_input,
validate_input,
validate_url,
is_valid_url,
validate_email,
is_valid_email,
extract_email_domain,
normalize_email,
)
from codeflow_utils.formatting import (
format_datetime,
format_iso_datetime,
format_relative_time,
format_number,
format_bytes,
format_percentage,
truncate_string,
slugify,
camel_to_snake,
snake_to_camel,
)
from codeflow_utils.common import (
retry,
CodeFlowUtilsError,
format_error_message,
create_error_response,
RateLimiter,
rate_limit,
PerKeyRateLimiter,
)

__version__ = "0.1.0"

Expand All @@ -37,4 +73,36 @@
"CircuitBreakerOpenError",
"CircuitBreakerState",
"CircuitBreakerStats",
# Validation
"validate_config",
"validate_environment_variables",
"sanitize_input",
"validate_input",
"validate_url",
"is_valid_url",
"validate_email",
"is_valid_email",
"extract_email_domain",
"normalize_email",
# Formatting - Date
"format_datetime",
"format_iso_datetime",
"format_relative_time",
# Formatting - Number
"format_number",
"format_bytes",
"format_percentage",
# Formatting - String
"truncate_string",
"slugify",
"camel_to_snake",
"snake_to_camel",
# Common
"retry",
"CodeFlowUtilsError",
"format_error_message",
"create_error_response",
"RateLimiter",
"rate_limit",
"PerKeyRateLimiter",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Common utility functions for CodeFlow."""

from .retry import retry
from .errors import (
CodeFlowUtilsError,
format_error_message,
create_error_response,
)
from .rate_limit import RateLimiter, rate_limit, PerKeyRateLimiter

__all__ = [
"retry",
"CodeFlowUtilsError",
"format_error_message",
"create_error_response",
"RateLimiter",
"rate_limit",
"PerKeyRateLimiter",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Error handling utilities."""

from typing import Any


class CodeFlowUtilsError(Exception):
"""Base exception for codeflow-utils-python."""
pass


def format_error_message(
operation: str,
error: Exception,
context: dict[str, Any] | None = None,
) -> str:
"""
Format error message with context.

Args:
operation: Name of the operation that failed
error: The exception that was raised
context: Optional context dictionary

Returns:
Formatted error message
"""
message = f"{operation} failed: {str(error)}"

if context:
context_str = ", ".join(f"{k}={v}" for k, v in context.items())
message += f" (context: {context_str})"

return message


def create_error_response(
error: Exception,
operation: str | None = None,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Create standardized error response dictionary.

Args:
error: The exception that was raised
operation: Optional operation name
context: Optional context dictionary

Returns:
Error response dictionary
"""
response = {
"error": type(error).__name__,
"message": str(error),
}

if operation:
response["operation"] = operation

if context:
response["context"] = context

return response
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Rate limiting utilities."""

import time
from collections import defaultdict
from threading import Lock
from typing import Callable, Any


class RateLimiter:
"""Simple rate limiter using token bucket algorithm."""

def __init__(self, max_calls: int, period: float):
"""
Initialize rate limiter.

Args:
max_calls: Maximum number of calls allowed
period: Time period in seconds
"""
self.max_calls = max_calls
self.period = period
self.calls: list[float] = []
self.lock = Lock()

def acquire(self) -> bool:
"""
Try to acquire a rate limit token.

Returns:
True if token acquired, False if rate limit exceeded
"""
with self.lock:
now = time.time()

# Remove old calls outside the period
self.calls = [call_time for call_time in self.calls if now - call_time < self.period]

# Check if we can make another call
if len(self.calls) < self.max_calls:
self.calls.append(now)
return True

return False

def wait_time(self) -> float:
"""
Get the wait time until next call can be made.

Returns:
Wait time in seconds, or 0 if no wait needed
"""
with self.lock:
if len(self.calls) < self.max_calls:
return 0.0

oldest_call = min(self.calls)
wait = self.period - (time.time() - oldest_call)
return max(0.0, wait)


def rate_limit(max_calls: int, period: float):
"""
Decorator to rate limit function calls.

Args:
max_calls: Maximum number of calls allowed
period: Time period in seconds

Returns:
Decorated function with rate limiting
"""
limiter = RateLimiter(max_calls, period)

def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
def wrapper(*args: Any, **kwargs: Any) -> Any:
if not limiter.acquire():
wait = limiter.wait_time()
if wait > 0:
time.sleep(wait)
limiter.acquire()

return func(*args, **kwargs)

return wrapper

return decorator


class PerKeyRateLimiter:
"""Rate limiter that tracks limits per key."""

def __init__(self, max_calls: int, period: float):
"""
Initialize per-key rate limiter.

Args:
max_calls: Maximum number of calls allowed per key
period: Time period in seconds
"""
self.max_calls = max_calls
self.period = period
self.limiters: dict[str, RateLimiter] = defaultdict(
lambda: RateLimiter(max_calls, period)
)
self.lock = Lock()

def acquire(self, key: str) -> bool:
"""
Try to acquire a rate limit token for a key.

Args:
key: Rate limit key

Returns:
True if token acquired, False if rate limit exceeded
"""
with self.lock:
return self.limiters[key].acquire()

def wait_time(self, key: str) -> float:
"""
Get the wait time for a key.

Args:
key: Rate limit key

Returns:
Wait time in seconds
"""
with self.lock:
return self.limiters[key].wait_time()
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Retry logic utilities."""

import asyncio
import time
from functools import wraps
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def retry(
max_attempts: int = 3,
delay: float = 1.0,
backoff: float = 2.0,
exceptions: tuple[type[Exception], ...] = (Exception,),
):
"""
Retry decorator for functions.

Args:
max_attempts: Maximum number of retry attempts
delay: Initial delay between retries (seconds)
backoff: Backoff multiplier
exceptions: Tuple of exceptions to catch and retry on

Example:
@retry(max_attempts=3, delay=1.0)
def api_call():
# API call that might fail
pass
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any) -> T:
current_delay = delay
last_exception = None

for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_attempts - 1:
time.sleep(current_delay)
current_delay *= backoff
else:
raise

@wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> T:
current_delay = delay
last_exception = None

for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(current_delay)
current_delay *= backoff
else:
raise

if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper

return decorator
Loading