From 591a9f5ec9b894309386b21f54152454a774ff4a Mon Sep 17 00:00:00 2001 From: "tembo[bot]" <208362400+tembo[bot]@users.noreply.github.com> Date: Tue, 23 Dec 2025 20:33:15 +0000 Subject: [PATCH] refactor: distribute utility packages across repositories Co-authored-by: Jurie --- codeflow_engine/utils/__init__.py | 22 +- .../codeflow-utils-python/pyproject.toml | 57 ++++ .../src/codeflow_utils/__init__.py | 40 +++ .../src/codeflow_utils/logging.py | 216 +++++++++++++ .../src/codeflow_utils/py.typed | 0 .../src/codeflow_utils/resilience.py | 288 ++++++++++++++++++ 6 files changed, 621 insertions(+), 2 deletions(-) create mode 100644 tools/packages/codeflow-utils-python/pyproject.toml create mode 100644 tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py create mode 100644 tools/packages/codeflow-utils-python/src/codeflow_utils/logging.py create mode 100644 tools/packages/codeflow-utils-python/src/codeflow_utils/py.typed create mode 100644 tools/packages/codeflow-utils-python/src/codeflow_utils/resilience.py diff --git a/codeflow_engine/utils/__init__.py b/codeflow_engine/utils/__init__.py index c5a4946..3a73bca 100644 --- a/codeflow_engine/utils/__init__.py +++ b/codeflow_engine/utils/__init__.py @@ -1,5 +1,23 @@ -"""Utility modules for CodeFlow Engine.""" +""" +Utility modules for CodeFlow Engine. + +Note: Core portable utilities (logging, resilience) are also available +in the standalone codeflow-utils package at tools/packages/codeflow-utils-python. +This module re-exports engine-specific versions that integrate with CodeFlowSettings. +""" from codeflow_engine.utils.logging import get_logger, log_with_context, setup_logging +from codeflow_engine.utils.resilience import ( + CircuitBreaker, + CircuitBreakerOpenError, + CircuitBreakerState, +) -__all__ = ["get_logger", "log_with_context", "setup_logging"] +__all__ = [ + "get_logger", + "log_with_context", + "setup_logging", + "CircuitBreaker", + "CircuitBreakerOpenError", + "CircuitBreakerState", +] diff --git a/tools/packages/codeflow-utils-python/pyproject.toml b/tools/packages/codeflow-utils-python/pyproject.toml new file mode 100644 index 0000000..9c1afca --- /dev/null +++ b/tools/packages/codeflow-utils-python/pyproject.toml @@ -0,0 +1,57 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "codeflow-utils" +version = "0.1.0" +description = "Shared utility functions for CodeFlow Python applications" +readme = "README.md" +license = "MIT" +requires-python = ">=3.12" +keywords = ["codeflow", "utils", "utilities", "resilience", "logging"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Typing :: Typed", +] + +dependencies = [] + +[project.optional-dependencies] +logging = ["structlog>=24.0.0"] +resilience = [] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.24.0", + "mypy>=1.11.0", + "ruff>=0.6.0", +] + +[project.urls] +Documentation = "https://github.com/JustAGhosT/codeflow-engine/tree/main/tools/packages/codeflow-utils-python" +Source = "https://github.com/JustAGhosT/codeflow-engine" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/codeflow_utils"] + +[tool.mypy] +python_version = "3.12" +strict = true + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "I", "N", "W", "B", "UP"] diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py new file mode 100644 index 0000000..fa3cac3 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/__init__.py @@ -0,0 +1,40 @@ +""" +codeflow-utils - Shared utility functions for CodeFlow Python applications. + +This package provides portable utilities that can be used across CodeFlow projects +without tight coupling to any specific application. + +Modules: + logging: Structured logging utilities + resilience: Circuit breaker and retry patterns +""" + +from codeflow_utils.logging import ( + JsonFormatter, + TextFormatter, + get_logger, + log_with_context, +) +from codeflow_utils.resilience import ( + CircuitBreaker, + CircuitBreakerConfig, + CircuitBreakerOpenError, + CircuitBreakerState, + CircuitBreakerStats, +) + +__version__ = "0.1.0" + +__all__ = [ + # Logging + "JsonFormatter", + "TextFormatter", + "get_logger", + "log_with_context", + # Resilience + "CircuitBreaker", + "CircuitBreakerConfig", + "CircuitBreakerOpenError", + "CircuitBreakerState", + "CircuitBreakerStats", +] diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/logging.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/logging.py new file mode 100644 index 0000000..d4d0601 --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/logging.py @@ -0,0 +1,216 @@ +""" +Structured logging utilities for CodeFlow applications. + +Provides JSON-formatted structured logging with correlation IDs and +contextual information. This is a portable version that doesn't depend +on codeflow_engine internals. +""" + +import json +import logging +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +class JsonFormatter(logging.Formatter): + """JSON formatter for structured logging.""" + + def __init__(self, service_name: str = "codeflow"): + """ + Initialize the JSON formatter. + + Args: + service_name: Name of the service for log identification + """ + super().__init__() + self.service_name = service_name + + def format(self, record: logging.LogRecord) -> str: + """ + Format log record as JSON. + + Args: + record: Log record to format + + Returns: + JSON-formatted log string + """ + log_data: dict[str, Any] = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "level": record.levelname, + "service": self.service_name, + "component": record.name, + "message": record.getMessage(), + "module": record.module, + "function": record.funcName, + "line": record.lineno, + } + + # Add extra fields from record + if hasattr(record, "request_id"): + log_data["request_id"] = record.request_id + if hasattr(record, "user_id"): + log_data["user_id"] = record.user_id + if hasattr(record, "duration_ms"): + log_data["duration_ms"] = record.duration_ms + if hasattr(record, "correlation_id"): + log_data["correlation_id"] = record.correlation_id + + # Add exception info if present + if record.exc_info: + log_data["exception"] = self.formatException(record.exc_info) + log_data["exception_type"] = ( + record.exc_info[0].__name__ if record.exc_info[0] else None + ) + + # Add any additional extra fields + excluded_keys = { + "name", + "msg", + "args", + "created", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "message", + "pathname", + "process", + "processName", + "relativeCreated", + "thread", + "threadName", + "exc_info", + "exc_text", + "stack_info", + } + for key, value in record.__dict__.items(): + if key not in excluded_keys and not key.startswith("_"): + log_data[key] = value + + return json.dumps(log_data, default=str) + + +class TextFormatter(logging.Formatter): + """Human-readable text formatter for development.""" + + def format(self, record: logging.LogRecord) -> str: + """ + Format log record as human-readable text. + + Args: + record: Log record to format + + Returns: + Formatted log string + """ + timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S UTC" + ) + level = record.levelname.ljust(8) + component = record.name + message = record.getMessage() + + # Build context string + context_parts = [] + if hasattr(record, "request_id"): + context_parts.append(f"request_id={record.request_id}") + if hasattr(record, "user_id"): + context_parts.append(f"user_id={record.user_id}") + if hasattr(record, "duration_ms"): + context_parts.append(f"duration={record.duration_ms}ms") + + context = f" [{', '.join(context_parts)}]" if context_parts else "" + + log_line = f"{timestamp} | {level} | {component} | {message}{context}" + + # Add exception if present + if record.exc_info: + log_line += f"\n{self.formatException(record.exc_info)}" + + return log_line + + +def setup_logging( + level: str = "INFO", + format_type: str = "json", + output: str = "stdout", + log_file: str | Path | None = None, + service_name: str = "codeflow", +) -> None: + """ + Configure structured logging. + + Args: + level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format_type: Format type ('json' or 'text') + output: Output destination ('stdout', 'file', or 'both') + log_file: Path to log file (required if output is 'file' or 'both') + service_name: Name of the service for log identification + """ + root_logger = logging.getLogger() + root_logger.setLevel(getattr(logging, level.upper(), logging.INFO)) + root_logger.handlers.clear() + + if format_type == "json": + formatter = JsonFormatter(service_name=service_name) + else: + formatter = TextFormatter() + + if output in ("stdout", "both"): + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter(formatter) + stdout_handler.setLevel(getattr(logging, level.upper(), logging.INFO)) + root_logger.addHandler(stdout_handler) + + if output in ("file", "both"): + if log_file is None: + raise ValueError("log_file is required when output is 'file' or 'both'") + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(formatter) + file_handler.setLevel(getattr(logging, level.upper(), logging.INFO)) + root_logger.addHandler(file_handler) + + # Reduce noise from third-party libraries + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("asyncio").setLevel(logging.WARNING) + + +def get_logger(name: str) -> logging.Logger: + """ + Get a logger instance for a module. + + Args: + name: Logger name (typically __name__) + + Returns: + Logger instance + """ + return logging.getLogger(name) + + +def log_with_context( + logger: logging.Logger, + level: int, + message: str, + **context: Any, +) -> None: + """ + Log a message with additional context. + + Args: + logger: Logger instance + level: Log level (logging.INFO, etc.) + message: Log message + **context: Additional context fields + """ + logger.log(level, message, extra=context) diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/py.typed b/tools/packages/codeflow-utils-python/src/codeflow_utils/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/tools/packages/codeflow-utils-python/src/codeflow_utils/resilience.py b/tools/packages/codeflow-utils-python/src/codeflow_utils/resilience.py new file mode 100644 index 0000000..a0f99ee --- /dev/null +++ b/tools/packages/codeflow-utils-python/src/codeflow_utils/resilience.py @@ -0,0 +1,288 @@ +""" +Circuit Breaker Pattern Implementation + +Implements the circuit breaker pattern to prevent repeated calls to failing services. +This is a portable version that doesn't depend on codeflow_engine internals. +""" + +import asyncio +import logging +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable + + +logger = logging.getLogger(__name__) + + +class CircuitBreakerState(Enum): + """Circuit breaker states.""" + + CLOSED = "closed" # Normal operation, requests allowed + OPEN = "open" # Failures detected, requests blocked + HALF_OPEN = "half_open" # Testing if service recovered + + +class CircuitBreakerOpenError(Exception): + """Raised when circuit breaker is open and prevents operation.""" + + pass + + +@dataclass +class CircuitBreakerConfig: + """Configuration for circuit breaker.""" + + failure_threshold: int = 5 # Number of failures before opening + success_threshold: int = 2 # Number of successes to close from half-open + timeout: float = 60.0 # Seconds to wait before moving to half-open + half_open_timeout: float = 30.0 # Seconds to wait in half-open before re-opening + + +@dataclass +class CircuitBreakerStats: + """Statistics for circuit breaker.""" + + state: CircuitBreakerState = CircuitBreakerState.CLOSED + failure_count: int = 0 + success_count: int = 0 + last_failure_time: float | None = None + last_success_time: float | None = None + total_calls: int = 0 + total_failures: int = 0 + total_successes: int = 0 + state_transitions: dict[str, int] = field( + default_factory=lambda: { + "closed_to_open": 0, + "open_to_half_open": 0, + "half_open_to_closed": 0, + "half_open_to_open": 0, + } + ) + + +class CircuitBreaker: + """ + Circuit breaker implementation for protecting services from repeated failures. + + The circuit breaker has three states: + - CLOSED: Normal operation, requests are allowed through + - OPEN: Too many failures, requests are blocked + - HALF_OPEN: Testing if the service has recovered + + Example: + >>> cb = CircuitBreaker(name="my_service", failure_threshold=3, timeout=60) + >>> async with cb: + >>> result = await my_service_call() + """ + + def __init__( + self, + name: str, + failure_threshold: int = 5, + success_threshold: int = 2, + timeout: float = 60.0, + half_open_timeout: float = 30.0, + ): + """ + Initialize circuit breaker. + + Args: + name: Name of the circuit breaker (for logging) + failure_threshold: Number of consecutive failures before opening + success_threshold: Number of consecutive successes to close from half-open + timeout: Seconds to wait before moving from open to half-open + half_open_timeout: Seconds to wait in half-open before re-opening + """ + self.name = name + self.config = CircuitBreakerConfig( + failure_threshold=failure_threshold, + success_threshold=success_threshold, + timeout=timeout, + half_open_timeout=half_open_timeout, + ) + self.stats = CircuitBreakerStats() + self._lock = asyncio.Lock() + + logger.info( + "Circuit breaker '%s' initialized with failure_threshold=%d, timeout=%.1fs", + name, + failure_threshold, + timeout, + ) + + async def __aenter__(self) -> "CircuitBreaker": + """Context manager entry - check if request should be allowed.""" + await self._check_and_update_state() + + if self.stats.state == CircuitBreakerState.OPEN: + raise CircuitBreakerOpenError( + f"Circuit breaker '{self.name}' is OPEN, blocking request" + ) + + self.stats.total_calls += 1 + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: Any, + ) -> bool: + """Context manager exit - record success or failure.""" + if exc_type is None: + await self._record_success() + else: + await self._record_failure() + + return False # Don't suppress exceptions + + async def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + """ + Execute a function with circuit breaker protection. + + Args: + func: Async function to call + *args: Positional arguments for the function + **kwargs: Keyword arguments for the function + + Returns: + Result of the function call + + Raises: + CircuitBreakerOpenError: If circuit breaker is open + Exception: Any exception raised by the function + """ + async with self: + return await func(*args, **kwargs) + + async def _check_and_update_state(self) -> None: + """Check if state should be updated based on time elapsed.""" + async with self._lock: + current_time = time.time() + + if self.stats.state == CircuitBreakerState.OPEN: + if self.stats.last_failure_time is not None: + elapsed = current_time - self.stats.last_failure_time + if elapsed >= self.config.timeout: + self._transition_to_half_open() + + elif self.stats.state == CircuitBreakerState.HALF_OPEN: + if self.stats.last_failure_time is not None: + elapsed = current_time - self.stats.last_failure_time + if elapsed >= self.config.half_open_timeout: + self._transition_to_open() + + async def _record_success(self) -> None: + """Record a successful operation.""" + async with self._lock: + self.stats.success_count += 1 + self.stats.total_successes += 1 + self.stats.last_success_time = time.time() + + if self.stats.state == CircuitBreakerState.HALF_OPEN: + if self.stats.success_count >= self.config.success_threshold: + self._transition_to_closed() + elif self.stats.state == CircuitBreakerState.CLOSED: + self.stats.failure_count = 0 + + async def _record_failure(self) -> None: + """Record a failed operation.""" + async with self._lock: + self.stats.failure_count += 1 + self.stats.total_failures += 1 + self.stats.last_failure_time = time.time() + + if self.stats.state == CircuitBreakerState.CLOSED: + if self.stats.failure_count >= self.config.failure_threshold: + self._transition_to_open() + elif self.stats.state == CircuitBreakerState.HALF_OPEN: + self._transition_to_open() + + def _transition_to_open(self) -> None: + """Transition to OPEN state.""" + old_state = self.stats.state + self.stats.state = CircuitBreakerState.OPEN + self.stats.success_count = 0 + + if old_state == CircuitBreakerState.CLOSED: + self.stats.state_transitions["closed_to_open"] += 1 + elif old_state == CircuitBreakerState.HALF_OPEN: + self.stats.state_transitions["half_open_to_open"] += 1 + + logger.warning( + "Circuit breaker '%s' transitioned from %s to OPEN after %d failures", + self.name, + old_state.value, + self.stats.failure_count, + ) + + def _transition_to_half_open(self) -> None: + """Transition to HALF_OPEN state.""" + self.stats.state = CircuitBreakerState.HALF_OPEN + self.stats.failure_count = 0 + self.stats.success_count = 0 + self.stats.state_transitions["open_to_half_open"] += 1 + + logger.info( + "Circuit breaker '%s' transitioned to HALF_OPEN, testing service recovery", + self.name, + ) + + def _transition_to_closed(self) -> None: + """Transition to CLOSED state.""" + self.stats.state = CircuitBreakerState.CLOSED + self.stats.failure_count = 0 + self.stats.success_count = 0 + self.stats.state_transitions["half_open_to_closed"] += 1 + + logger.info( + "Circuit breaker '%s' transitioned to CLOSED, service recovered", + self.name, + ) + + def get_state(self) -> CircuitBreakerState: + """Get current state of the circuit breaker.""" + return self.stats.state + + def get_stats(self) -> dict[str, Any]: + """ + Get statistics for the circuit breaker. + + Returns: + Dictionary containing circuit breaker statistics + """ + return { + "name": self.name, + "state": self.stats.state.value, + "failure_count": self.stats.failure_count, + "success_count": self.stats.success_count, + "total_calls": self.stats.total_calls, + "total_failures": self.stats.total_failures, + "total_successes": self.stats.total_successes, + "failure_rate": ( + self.stats.total_failures / self.stats.total_calls + if self.stats.total_calls > 0 + else 0.0 + ), + "last_failure_time": self.stats.last_failure_time, + "last_success_time": self.stats.last_success_time, + "state_transitions": self.stats.state_transitions.copy(), + "config": { + "failure_threshold": self.config.failure_threshold, + "success_threshold": self.config.success_threshold, + "timeout": self.config.timeout, + "half_open_timeout": self.config.half_open_timeout, + }, + } + + async def reset(self) -> None: + """Reset the circuit breaker to CLOSED state with cleared stats.""" + async with self._lock: + self.stats = CircuitBreakerStats() + logger.info("Circuit breaker '%s' reset to CLOSED state", self.name) + + def is_available(self) -> bool: + """Check if the circuit breaker allows requests.""" + return self.stats.state != CircuitBreakerState.OPEN