diff --git a/README.md b/README.md new file mode 100644 index 0000000..9d9d7df --- /dev/null +++ b/README.md @@ -0,0 +1,150 @@ +# OmniBlocks Proactive AI Bot + +A groundbreaking proactive AI bot with VM testing and vision capabilities for the OmniBlocks organization. + +## Overview + +This bot represents a new paradigm in AI development tools - instead of being reactive (waiting for mentions/triggers), it operates proactively by: + +- **Always monitoring** repository activity autonomously +- **Testing code** in real, isolated VM environments +- **Providing visual feedback** with screenshots and UI interaction +- **Managing resources** intelligently with smart wake/sleep cycles + +## Key Features + +### 🤖 Proactive Operation +- Continuous repository monitoring and engagement +- Intelligent sleep/wake scheduling to optimize token usage +- Smart pattern recognition for proactive intervention +- Event-driven architecture with real-time processing + +### 🔬 VM Testing Environment +- Isolated Docker containers for safe code execution +- Automated testing workflows with comprehensive reporting +- Security sandboxing to prevent malicious code execution +- Resource cleanup and management + +### 👁️ Vision & Multimodal Capabilities +- Screenshot capture and visual analysis +- UI element detection and interaction +- Visual debugging and change detection +- Integration with vision-enabled LLM models + +### 🔧 Advanced Integration +- GitHub Actions and Octokit API integration +- Multi-provider LLM support (OpenAI, Anthropic, etc.) 
+- Webhook processing for real-time events +- Comprehensive logging and monitoring + +## Architecture + +``` +omniblocks-ai-bot/ +├── src/ +│ ├── core/ # Core bot engine and orchestration +│ ├── monitoring/ # Repository monitoring and event processing +│ ├── testing/ # VM testing environment and execution +│ ├── vision/ # Computer vision and UI interaction +│ ├── github/ # GitHub API integration and webhooks +│ ├── llm/ # LLM integration and intelligence +│ └── utils/ # Shared utilities and helpers +├── config/ # Configuration files and templates +├── docker/ # Docker configurations for VM testing +├── workflows/ # GitHub Actions workflows +├── tests/ # Comprehensive test suite +└── docs/ # Documentation and guides +``` + +## Quick Start + +1. **Clone and Setup** + ```bash + git clone <repository-url> + cd omniblocks-ai-bot + pip install -r requirements.txt + ``` + +2. **Configure Environment** + ```bash + cp config/env.example .env + # Edit .env with your API keys and settings + ``` + +3. **Run the Bot** + ```bash + python -m src.main + ``` + +## Configuration + +The bot uses environment variables for configuration: + +- `GITHUB_TOKEN` - GitHub API token with appropriate permissions +- `OPENAI_API_KEY` - OpenAI API key for LLM integration +- `ANTHROPIC_API_KEY` - Anthropic API key (optional) +- `BOT_MODE` - Operation mode: `proactive`, `reactive`, or `hybrid` +- `SLEEP_SCHEDULE` - Sleep/wake schedule configuration +- `VM_RESOURCE_LIMITS` - Resource limits for VM testing + +## Development + +### Prerequisites +- Python 3.9+ +- Docker and Docker Compose +- Git + +### Setup Development Environment +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +pip install -r requirements-dev.txt +pre-commit install +``` + +### Running Tests +```bash +pytest tests/ +``` + +### Code Quality +```bash +black src/ tests/ +flake8 src/ tests/ +mypy src/ +``` + +## Security + +This bot handles sensitive operations including: +- Code execution in isolated 
environments +- API key management +- Repository access and modifications + +Security measures include: +- Sandboxed VM execution with resource limits +- Encrypted API key storage +- Audit logging of all operations +- Rate limiting and abuse prevention + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes with tests +4. Submit a pull request + +## License + +This project is licensed under the GNU General Public License v3.0 - see the [LICENSE](LICENSE) file for details. + +## Support + +For questions, issues, or contributions: +- Open an issue on GitHub +- Contact @supervoidcoder +- Reference: OmniBlocks/scratch-gui#249 + +--- + +**Note**: This is a groundbreaking implementation of proactive AI bot technology. Most existing bots are reactive, but this system actively monitors and engages with repositories autonomously while maintaining intelligent resource management. \ No newline at end of file diff --git a/config/env.example b/config/env.example new file mode 100644 index 0000000..9df66f6 --- /dev/null +++ b/config/env.example @@ -0,0 +1,63 @@ +# OmniBlocks Proactive AI Bot Configuration +# Copy this file to .env and fill in your values + +# GitHub Configuration +GITHUB_TOKEN=your_github_token_here +GITHUB_WEBHOOK_SECRET=your_webhook_secret_here +GITHUB_REPOSITORY=owner/repo-name +GITHUB_ORGANIZATION=your-org-name +GITHUB_API_BASE_URL=https://api.github.com + +# LLM Configuration +OPENAI_API_KEY=your_openai_api_key_here +ANTHROPIC_API_KEY=your_anthropic_api_key_here +LLM_DEFAULT_PROVIDER=openai +LLM_MAX_TOKENS=4000 +LLM_TEMPERATURE=0.7 +LLM_VISION_MODEL=gpt-4-vision-preview + +# VM Testing Configuration +VM_DOCKER_IMAGE=python:3.9-slim +VM_MEMORY_LIMIT=512m +VM_CPU_LIMIT=0.5 +VM_TIMEOUT_SECONDS=300 +VM_NETWORK_MODE=none +VM_ENABLE_GPU=false + +# Monitoring Configuration +MONITORING_POLL_INTERVAL=60 +MONITORING_MAX_EVENTS=10 +MONITORING_EVENT_TYPES=push,pull_request,issues,issue_comment,pull_request_review 
+MONITORING_IGNORE_BOTS=true + +# Scheduling Configuration +SCHEDULING_ENABLE_SLEEP=true +SCHEDULING_SLEEP_DURATION=30 +SCHEDULING_WAKE_TRIGGERS=high_priority_event,mention,scheduled_time +SCHEDULING_QUIET_START=22:00 +SCHEDULING_QUIET_END=06:00 +SCHEDULING_TIMEZONE=UTC + +# Vision Configuration +VISION_SCREENSHOT_QUALITY=85 +VISION_MAX_SCREENSHOT_SIZE=1920,1080 +VISION_UI_TIMEOUT=30 +VISION_DIFF_THRESHOLD=0.1 +VISION_ENABLE_OCR=true + +# Security Configuration +SECURITY_ENABLE_SCANNING=true +SECURITY_ALLOWED_EXTENSIONS=.py,.js,.ts,.json,.yaml,.yml,.md,.txt +SECURITY_BLOCKED_COMMANDS=rm -rf,sudo,curl,wget,nc,netcat +SECURITY_MAX_FILE_SIZE_MB=10 + +# Logging Configuration +LOG_LEVEL=INFO +LOG_DIR=logs +LOG_JSON=false + +# State Management +STATE_DIR=state + +# Bot Mode +BOT_MODE=proactive \ No newline at end of file diff --git a/docker/Dockerfile.testing b/docker/Dockerfile.testing new file mode 100644 index 0000000..199b91d --- /dev/null +++ b/docker/Dockerfile.testing @@ -0,0 +1,30 @@ +# Dockerfile for secure testing environment +FROM python:3.9-slim + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + git \ + curl \ + wget \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +# Create non-root user for security +RUN useradd -m -u 1000 testuser + +# Set up working directory +WORKDIR /workspace + +# Copy requirements and install Python dependencies +COPY requirements-testing.txt . 
+RUN pip install --no-cache-dir -r requirements-testing.txt + +# Switch to non-root user +USER testuser + +# Set environment variables +ENV PYTHONPATH=/workspace +ENV PYTHONUNBUFFERED=1 + +# Default command +CMD ["python", "-c", "print('Testing environment ready')"] \ No newline at end of file diff --git a/docker/requirements-testing.txt b/docker/requirements-testing.txt new file mode 100644 index 0000000..772389b --- /dev/null +++ b/docker/requirements-testing.txt @@ -0,0 +1,19 @@ +# Testing environment requirements +pytest>=7.4.0 +pytest-asyncio>=0.21.0 +pytest-cov>=4.1.0 +pytest-mock>=3.11.0 + +# Common testing libraries +requests>=2.31.0 +numpy>=1.24.0 +pandas>=2.0.0 + +# Code analysis +flake8>=6.0.0 +black>=23.7.0 +mypy>=1.5.0 + +# Security scanning +bandit>=1.7.5 +safety>=2.3.0 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b4d3a1b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,45 @@ +# Core dependencies +# NOTE: asyncio is part of the Python standard library (3.4+); do not install the PyPI "asyncio" package +aiohttp>=3.8.0 +aiofiles>=0.8.0 + +# GitHub integration +PyGithub>=1.58.0 + +# LLM integrations +openai>=1.0.0 +anthropic>=0.7.0 + +# Computer vision and image processing +opencv-python>=4.8.0 +Pillow>=10.0.0 +pytesseract>=0.3.10 + +# Docker integration +docker>=6.0.0 + +# Data processing +pandas>=2.0.0 +numpy>=1.24.0 + +# Configuration and utilities +python-dotenv>=1.0.0 +pydantic>=2.0.0 +pytz>=2023.3 + +# Logging and monitoring +structlog>=23.1.0 + +# Testing +pytest>=7.4.0 +pytest-asyncio>=0.21.0 +pytest-mock>=3.11.0 + +# Code quality +black>=23.7.0 +flake8>=6.0.0 +mypy>=1.5.0 +pre-commit>=3.3.0 + +# Security +cryptography>=41.0.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..483e007 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,9 @@ +""" +OmniBlocks Proactive AI Bot + +A groundbreaking proactive AI bot with VM testing and vision capabilities. 
+""" + +__version__ = "1.0.0" +__author__ = "OmniBlocks Organization" +__description__ = "Proactive AI Bot with VM Testing and Vision Capabilities" \ No newline at end of file diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..135b047 --- /dev/null +++ b/src/core/__init__.py @@ -0,0 +1,6 @@ +""" +Core module for the OmniBlocks Proactive AI Bot. + +This module contains the core bot engine, configuration management, +and orchestration components. +""" \ No newline at end of file diff --git a/src/core/bot.py b/src/core/bot.py new file mode 100644 index 0000000..1496739 --- /dev/null +++ b/src/core/bot.py @@ -0,0 +1,500 @@ +""" +Core bot engine for the OmniBlocks Proactive AI Bot. + +This module contains the main bot orchestration logic, managing all +subsystems and coordinating proactive operations. +""" + +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any +from dataclasses import dataclass +import json + +from .config import Config +from .scheduler import SleepWakeScheduler +from .events import EventProcessor, Event +from ..monitoring.repository_monitor import RepositoryMonitor +from ..testing.vm_manager import VMManager +from ..vision.vision_engine import VisionEngine +from ..github.github_client import GitHubClient +from ..llm.llm_manager import LLMManager +from ..utils.state_manager import StateManager + +logger = logging.getLogger(__name__) + + +@dataclass +class BotStatus: + """Current status of the bot.""" + is_running: bool = False + is_sleeping: bool = False + last_activity: Optional[datetime] = None + events_processed: int = 0 + uptime_seconds: int = 0 + current_mode: str = "proactive" + + +class ProactiveBot: + """ + Main proactive AI bot class. 
+ + This class orchestrates all bot capabilities including: + - Proactive repository monitoring + - VM testing environment + - Vision and multimodal capabilities + - GitHub integration + - LLM intelligence + """ + + def __init__(self, config: Config): + """Initialize the bot with configuration.""" + self.config = config + self.status = BotStatus() + self.start_time = datetime.utcnow() + + # Initialize core components + self.scheduler = SleepWakeScheduler(config.scheduling) + self.event_processor = EventProcessor() + self.state_manager = StateManager() + + # Initialize subsystems + self.github_client = GitHubClient(config.github) + self.repository_monitor = RepositoryMonitor(config.monitoring, self.github_client) + self.vm_manager = VMManager(config.vm) + self.vision_engine = VisionEngine(config.vision) + self.llm_manager = LLMManager(config.llm) + + # Event handlers + self._setup_event_handlers() + + logger.info("ProactiveBot initialized successfully") + + async def start(self): + """Start the bot and begin proactive operations.""" + logger.info("Starting ProactiveBot...") + + try: + # Initialize all subsystems + await self._initialize_subsystems() + + # Start the main event loop + self.status.is_running = True + self.status.current_mode = "proactive" + + # Start background tasks + tasks = [ + asyncio.create_task(self._main_loop()), + asyncio.create_task(self._monitoring_loop()), + asyncio.create_task(self._scheduler_loop()), + asyncio.create_task(self._health_check_loop()) + ] + + logger.info("Bot started successfully - entering proactive mode") + + # Wait for all tasks to complete + await asyncio.gather(*tasks) + + except Exception as e: + logger.error(f"Error starting bot: {e}", exc_info=True) + await self.shutdown() + raise + + async def shutdown(self): + """Gracefully shutdown the bot.""" + logger.info("Shutting down ProactiveBot...") + + self.status.is_running = False + + try: + # Shutdown all subsystems + await self._shutdown_subsystems() + + # Save state + 
await self.state_manager.save_state({ + "shutdown_time": datetime.utcnow().isoformat(), + "events_processed": self.status.events_processed, + "uptime_seconds": self.status.uptime_seconds + }) + + logger.info("Bot shutdown completed successfully") + + except Exception as e: + logger.error(f"Error during shutdown: {e}", exc_info=True) + + async def _initialize_subsystems(self): + """Initialize all bot subsystems.""" + logger.info("Initializing subsystems...") + + # Initialize GitHub client + await self.github_client.initialize() + + # Initialize repository monitor + await self.repository_monitor.initialize() + + # Initialize VM manager + await self.vm_manager.initialize() + + # Initialize vision engine + await self.vision_engine.initialize() + + # Initialize LLM manager + await self.llm_manager.initialize() + + # Load previous state + previous_state = await self.state_manager.load_state() + if previous_state: + logger.info(f"Loaded previous state: {previous_state}") + + logger.info("All subsystems initialized successfully") + + async def _shutdown_subsystems(self): + """Shutdown all bot subsystems.""" + logger.info("Shutting down subsystems...") + + # Shutdown in reverse order + await self.llm_manager.shutdown() + await self.vision_engine.shutdown() + await self.vm_manager.shutdown() + await self.repository_monitor.shutdown() + await self.github_client.shutdown() + + logger.info("All subsystems shut down successfully") + + def _setup_event_handlers(self): + """Setup event handlers for different event types.""" + + # Repository events + self.event_processor.register_handler("push", self._handle_push_event) + self.event_processor.register_handler("pull_request", self._handle_pull_request_event) + self.event_processor.register_handler("issues", self._handle_issue_event) + self.event_processor.register_handler("issue_comment", self._handle_comment_event) + + # System events + self.event_processor.register_handler("wake_up", self._handle_wake_up_event) + 
self.event_processor.register_handler("sleep", self._handle_sleep_event) + self.event_processor.register_handler("health_check", self._handle_health_check_event) + + logger.info("Event handlers registered successfully") + + async def _main_loop(self): + """Main bot event processing loop.""" + logger.info("Starting main event processing loop") + + while self.status.is_running: + try: + # Update uptime + self.status.uptime_seconds = int((datetime.utcnow() - self.start_time).total_seconds()) + + # Check if we should be sleeping + if self.status.is_sleeping: + await asyncio.sleep(10) # Sleep for 10 seconds when in sleep mode + continue + + # Process pending events + await self._process_pending_events() + + # Proactive analysis and engagement + await self._proactive_analysis() + + # Brief pause before next cycle + await asyncio.sleep(5) + + except Exception as e: + logger.error(f"Error in main loop: {e}", exc_info=True) + await asyncio.sleep(30) # Wait before retrying + + async def _monitoring_loop(self): + """Repository monitoring loop.""" + logger.info("Starting repository monitoring loop") + + while self.status.is_running: + try: + if not self.status.is_sleeping: + # Check for new repository events + events = await self.repository_monitor.check_for_events() + + # Queue events for processing + for event in events: + await self.event_processor.queue_event(event) + + # Wait for next monitoring cycle + await asyncio.sleep(self.config.monitoring.poll_interval_seconds) + + except Exception as e: + logger.error(f"Error in monitoring loop: {e}", exc_info=True) + await asyncio.sleep(60) # Wait before retrying + + async def _scheduler_loop(self): + """Sleep/wake scheduler loop.""" + logger.info("Starting scheduler loop") + + while self.status.is_running: + try: + # Check if we should sleep or wake up + should_sleep = await self.scheduler.should_sleep() + should_wake = await self.scheduler.should_wake_up() + + if should_sleep and not self.status.is_sleeping: + await 
self._enter_sleep_mode() + elif should_wake and self.status.is_sleeping: + await self._exit_sleep_mode() + + # Check every minute + await asyncio.sleep(60) + + except Exception as e: + logger.error(f"Error in scheduler loop: {e}", exc_info=True) + await asyncio.sleep(60) + + async def _health_check_loop(self): + """Health monitoring loop.""" + logger.info("Starting health check loop") + + while self.status.is_running: + try: + # Perform health checks on all subsystems + health_status = await self._perform_health_checks() + + # Log health status + if not all(health_status.values()): + logger.warning(f"Health check issues detected: {health_status}") + + # Wait 5 minutes between health checks + await asyncio.sleep(300) + + except Exception as e: + logger.error(f"Error in health check loop: {e}", exc_info=True) + await asyncio.sleep(300) + + async def _process_pending_events(self): + """Process all pending events in the queue.""" + events = await self.event_processor.get_pending_events() + + for event in events: + try: + await self.event_processor.process_event(event) + self.status.events_processed += 1 + self.status.last_activity = datetime.utcnow() + + except Exception as e: + logger.error(f"Error processing event {event.id}: {e}", exc_info=True) + + async def _proactive_analysis(self): + """Perform proactive analysis and engagement.""" + try: + # Analyze repository state for proactive opportunities + analysis_result = await self._analyze_repository_state() + + if analysis_result.get("should_engage"): + await self._proactive_engagement(analysis_result) + + except Exception as e: + logger.error(f"Error in proactive analysis: {e}", exc_info=True) + + async def _analyze_repository_state(self) -> Dict[str, Any]: + """Analyze current repository state for proactive opportunities.""" + # This is a placeholder for sophisticated analysis logic + # In a real implementation, this would use ML/AI to identify patterns + + return { + "should_engage": False, + "confidence": 0.0, + 
"reason": "No proactive opportunities identified" + } + + async def _proactive_engagement(self, analysis_result: Dict[str, Any]): + """Engage proactively based on analysis results.""" + logger.info(f"Proactive engagement triggered: {analysis_result}") + + # This would implement the actual proactive engagement logic + # For now, it's a placeholder + pass + + async def _enter_sleep_mode(self): + """Enter sleep mode to conserve resources.""" + logger.info("Entering sleep mode...") + self.status.is_sleeping = True + + # Create sleep event + sleep_event = Event( + id=f"sleep_{datetime.utcnow().isoformat()}", + type="sleep", + data={"reason": "scheduled_sleep", "timestamp": datetime.utcnow().isoformat()} + ) + await self.event_processor.queue_event(sleep_event) + + async def _exit_sleep_mode(self): + """Exit sleep mode and resume normal operations.""" + logger.info("Exiting sleep mode...") + self.status.is_sleeping = False + + # Create wake up event + wake_event = Event( + id=f"wake_{datetime.utcnow().isoformat()}", + type="wake_up", + data={"reason": "scheduled_wake", "timestamp": datetime.utcnow().isoformat()} + ) + await self.event_processor.queue_event(wake_event) + + async def _perform_health_checks(self) -> Dict[str, bool]: + """Perform health checks on all subsystems.""" + health_status = {} + + try: + health_status["github_client"] = await self.github_client.health_check() + health_status["repository_monitor"] = await self.repository_monitor.health_check() + health_status["vm_manager"] = await self.vm_manager.health_check() + health_status["vision_engine"] = await self.vision_engine.health_check() + health_status["llm_manager"] = await self.llm_manager.health_check() + + except Exception as e: + logger.error(f"Error performing health checks: {e}", exc_info=True) + health_status["error"] = False + + return health_status + + # Event handlers + + async def _handle_push_event(self, event: Event): + """Handle push events.""" + logger.info(f"Handling push event: 
{event.id}") + + # Extract push information + push_data = event.data + + # Analyze the push for testing opportunities + if await self._should_test_push(push_data): + # Run tests in VM + test_result = await self.vm_manager.run_tests(push_data) + + # Analyze results with LLM + analysis = await self.llm_manager.analyze_test_results(test_result) + + # Post results if significant + if analysis.get("should_comment"): + await self.github_client.post_comment( + push_data.get("repository"), + push_data.get("commit_sha"), + analysis.get("comment") + ) + + async def _handle_pull_request_event(self, event: Event): + """Handle pull request events.""" + logger.info(f"Handling pull request event: {event.id}") + + pr_data = event.data + + # Analyze PR for automated testing + if await self._should_test_pr(pr_data): + # Run comprehensive tests + test_result = await self.vm_manager.run_pr_tests(pr_data) + + # Capture screenshots if UI changes detected + if test_result.get("has_ui_changes"): + screenshots = await self.vision_engine.capture_ui_screenshots(test_result) + test_result["screenshots"] = screenshots + + # Generate AI analysis + analysis = await self.llm_manager.analyze_pr(pr_data, test_result) + + # Post review comment + await self.github_client.post_pr_review( + pr_data.get("repository"), + pr_data.get("number"), + analysis + ) + + async def _handle_issue_event(self, event: Event): + """Handle issue events.""" + logger.info(f"Handling issue event: {event.id}") + + issue_data = event.data + + # Analyze issue for proactive assistance + analysis = await self.llm_manager.analyze_issue(issue_data) + + if analysis.get("can_help"): + # Generate helpful response + response = await self.llm_manager.generate_issue_response(issue_data, analysis) + + # Post comment + await self.github_client.post_issue_comment( + issue_data.get("repository"), + issue_data.get("number"), + response + ) + + async def _handle_comment_event(self, event: Event): + """Handle comment events.""" + 
logger.info(f"Handling comment event: {event.id}") + + comment_data = event.data + + # Check if bot is mentioned or should respond + if await self._should_respond_to_comment(comment_data): + # Generate response + response = await self.llm_manager.generate_comment_response(comment_data) + + # Post reply + await self.github_client.post_comment_reply( + comment_data.get("repository"), + comment_data.get("comment_id"), + response + ) + + async def _handle_wake_up_event(self, event: Event): + """Handle wake up events.""" + logger.info(f"Bot waking up: {event.data}") + self.status.is_sleeping = False + + async def _handle_sleep_event(self, event: Event): + """Handle sleep events.""" + logger.info(f"Bot going to sleep: {event.data}") + self.status.is_sleeping = True + + async def _handle_health_check_event(self, event: Event): + """Handle health check events.""" + health_status = await self._perform_health_checks() + logger.info(f"Health check completed: {health_status}") + + # Helper methods + + async def _should_test_push(self, push_data: Dict[str, Any]) -> bool: + """Determine if a push should trigger testing.""" + # Implement logic to decide if testing is needed + return True # Placeholder + + async def _should_test_pr(self, pr_data: Dict[str, Any]) -> bool: + """Determine if a PR should trigger testing.""" + # Implement logic to decide if PR testing is needed + return True # Placeholder + + async def _should_respond_to_comment(self, comment_data: Dict[str, Any]) -> bool: + """Determine if bot should respond to a comment.""" + # Check for mentions, keywords, or other triggers + comment_text = comment_data.get("body", "").lower() + + # Check for bot mention + if "@omniblocks-bot" in comment_text: + return True + + # Check for help keywords + help_keywords = ["help", "assist", "question", "issue", "problem"] + if any(keyword in comment_text for keyword in help_keywords): + return True + + return False + + def get_status(self) -> Dict[str, Any]: + """Get current bot 
status.""" + return { + "is_running": self.status.is_running, + "is_sleeping": self.status.is_sleeping, + "last_activity": self.status.last_activity.isoformat() if self.status.last_activity else None, + "events_processed": self.status.events_processed, + "uptime_seconds": self.status.uptime_seconds, + "current_mode": self.status.current_mode, + "start_time": self.start_time.isoformat() + } \ No newline at end of file diff --git a/src/core/config.py b/src/core/config.py new file mode 100644 index 0000000..9b279d1 --- /dev/null +++ b/src/core/config.py @@ -0,0 +1,310 @@ +""" +Configuration management for the OmniBlocks Proactive AI Bot. + +This module handles loading and validating configuration from environment +variables, config files, and default values. +""" + +import os +from dataclasses import dataclass, field +from typing import Dict, List, Optional +from pathlib import Path +import json +import logging + +logger = logging.getLogger(__name__) + + +@dataclass +class GitHubConfig: + """GitHub integration configuration.""" + token: str + webhook_secret: Optional[str] = None + repository: Optional[str] = None + organization: Optional[str] = None + api_base_url: str = "https://api.github.com" + + +@dataclass +class LLMConfig: + """LLM provider configuration.""" + openai_api_key: Optional[str] = None + anthropic_api_key: Optional[str] = None + default_provider: str = "openai" + max_tokens: int = 4000 + temperature: float = 0.7 + vision_model: str = "gpt-4-vision-preview" + + +@dataclass +class VMConfig: + """VM testing environment configuration.""" + docker_image: str = "python:3.9-slim" + memory_limit: str = "512m" + cpu_limit: str = "0.5" + timeout_seconds: int = 300 + network_mode: str = "none" + enable_gpu: bool = False + + +@dataclass +class MonitoringConfig: + """Repository monitoring configuration.""" + poll_interval_seconds: int = 60 + max_events_per_cycle: int = 10 + event_types: List[str] = field(default_factory=lambda: [ + "push", "pull_request", 
"issues", "issue_comment", "pull_request_review" + ]) + ignore_bots: bool = True + + +@dataclass +class SchedulingConfig: + """Sleep/wake scheduling configuration.""" + enable_sleep_mode: bool = True + sleep_duration_minutes: int = 30 + wake_triggers: List[str] = field(default_factory=lambda: [ + "high_priority_event", "mention", "scheduled_time" + ]) + quiet_hours_start: str = "22:00" + quiet_hours_end: str = "06:00" + timezone: str = "UTC" + + +@dataclass +class VisionConfig: + """Vision and UI interaction configuration.""" + screenshot_quality: int = 85 + max_screenshot_size: tuple = (1920, 1080) + ui_interaction_timeout: int = 30 + visual_diff_threshold: float = 0.1 + enable_ocr: bool = True + + +@dataclass +class SecurityConfig: + """Security and safety configuration.""" + enable_code_scanning: bool = True + allowed_file_extensions: List[str] = field(default_factory=lambda: [ + ".py", ".js", ".ts", ".json", ".yaml", ".yml", ".md", ".txt" + ]) + blocked_commands: List[str] = field(default_factory=lambda: [ + "rm -rf", "sudo", "curl", "wget", "nc", "netcat" + ]) + max_file_size_mb: int = 10 + + +class Config: + """Main configuration class for the bot.""" + + def __init__(self, config_file: Optional[Path] = None): + """Initialize configuration from environment and config file.""" + self.github = self._load_github_config() + self.llm = self._load_llm_config() + self.vm = self._load_vm_config() + self.monitoring = self._load_monitoring_config() + self.scheduling = self._load_scheduling_config() + self.vision = self._load_vision_config() + self.security = self._load_security_config() + + # Load from config file if provided + if config_file and config_file.exists(): + self._load_from_file(config_file) + + # Validate configuration + self._validate() + + logger.info("Configuration loaded successfully") + + def _load_github_config(self) -> GitHubConfig: + """Load GitHub configuration from environment.""" + return GitHubConfig( + 
token=self._get_required_env("GITHUB_TOKEN"), + webhook_secret=os.getenv("GITHUB_WEBHOOK_SECRET"), + repository=os.getenv("GITHUB_REPOSITORY"), + organization=os.getenv("GITHUB_ORGANIZATION"), + api_base_url=os.getenv("GITHUB_API_BASE_URL", "https://api.github.com") + ) + + def _load_llm_config(self) -> LLMConfig: + """Load LLM configuration from environment.""" + return LLMConfig( + openai_api_key=os.getenv("OPENAI_API_KEY"), + anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"), + default_provider=os.getenv("LLM_DEFAULT_PROVIDER", "openai"), + max_tokens=int(os.getenv("LLM_MAX_TOKENS", "4000")), + temperature=float(os.getenv("LLM_TEMPERATURE", "0.7")), + vision_model=os.getenv("LLM_VISION_MODEL", "gpt-4-vision-preview") + ) + + def _load_vm_config(self) -> VMConfig: + """Load VM configuration from environment.""" + return VMConfig( + docker_image=os.getenv("VM_DOCKER_IMAGE", "python:3.9-slim"), + memory_limit=os.getenv("VM_MEMORY_LIMIT", "512m"), + cpu_limit=os.getenv("VM_CPU_LIMIT", "0.5"), + timeout_seconds=int(os.getenv("VM_TIMEOUT_SECONDS", "300")), + network_mode=os.getenv("VM_NETWORK_MODE", "none"), + enable_gpu=os.getenv("VM_ENABLE_GPU", "false").lower() == "true" + ) + + def _load_monitoring_config(self) -> MonitoringConfig: + """Load monitoring configuration from environment.""" + event_types = os.getenv("MONITORING_EVENT_TYPES") + if event_types: + event_types = event_types.split(",") + else: + event_types = ["push", "pull_request", "issues", "issue_comment", "pull_request_review"] + + return MonitoringConfig( + poll_interval_seconds=int(os.getenv("MONITORING_POLL_INTERVAL", "60")), + max_events_per_cycle=int(os.getenv("MONITORING_MAX_EVENTS", "10")), + event_types=event_types, + ignore_bots=os.getenv("MONITORING_IGNORE_BOTS", "true").lower() == "true" + ) + + def _load_scheduling_config(self) -> SchedulingConfig: + """Load scheduling configuration from environment.""" + wake_triggers = os.getenv("SCHEDULING_WAKE_TRIGGERS") + if wake_triggers: + 
wake_triggers = wake_triggers.split(",") + else: + wake_triggers = ["high_priority_event", "mention", "scheduled_time"] + + return SchedulingConfig( + enable_sleep_mode=os.getenv("SCHEDULING_ENABLE_SLEEP", "true").lower() == "true", + sleep_duration_minutes=int(os.getenv("SCHEDULING_SLEEP_DURATION", "30")), + wake_triggers=wake_triggers, + quiet_hours_start=os.getenv("SCHEDULING_QUIET_START", "22:00"), + quiet_hours_end=os.getenv("SCHEDULING_QUIET_END", "06:00"), + timezone=os.getenv("SCHEDULING_TIMEZONE", "UTC") + ) + + def _load_vision_config(self) -> VisionConfig: + """Load vision configuration from environment.""" + max_size = os.getenv("VISION_MAX_SCREENSHOT_SIZE", "1920,1080") + max_size = tuple(map(int, max_size.split(","))) + + return VisionConfig( + screenshot_quality=int(os.getenv("VISION_SCREENSHOT_QUALITY", "85")), + max_screenshot_size=max_size, + ui_interaction_timeout=int(os.getenv("VISION_UI_TIMEOUT", "30")), + visual_diff_threshold=float(os.getenv("VISION_DIFF_THRESHOLD", "0.1")), + enable_ocr=os.getenv("VISION_ENABLE_OCR", "true").lower() == "true" + ) + + def _load_security_config(self) -> SecurityConfig: + """Load security configuration from environment.""" + allowed_extensions = os.getenv("SECURITY_ALLOWED_EXTENSIONS") + if allowed_extensions: + allowed_extensions = allowed_extensions.split(",") + else: + allowed_extensions = [".py", ".js", ".ts", ".json", ".yaml", ".yml", ".md", ".txt"] + + blocked_commands = os.getenv("SECURITY_BLOCKED_COMMANDS") + if blocked_commands: + blocked_commands = blocked_commands.split(",") + else: + blocked_commands = ["rm -rf", "sudo", "curl", "wget", "nc", "netcat"] + + return SecurityConfig( + enable_code_scanning=os.getenv("SECURITY_ENABLE_SCANNING", "true").lower() == "true", + allowed_file_extensions=allowed_extensions, + blocked_commands=blocked_commands, + max_file_size_mb=int(os.getenv("SECURITY_MAX_FILE_SIZE_MB", "10")) + ) + + def _load_from_file(self, config_file: Path): + """Load additional 
configuration from JSON file.""" + try: + with open(config_file, 'r') as f: + file_config = json.load(f) + + # Update configuration with file values + for section, values in file_config.items(): + if hasattr(self, section): + config_obj = getattr(self, section) + for key, value in values.items(): + if hasattr(config_obj, key): + setattr(config_obj, key, value) + + logger.info(f"Loaded configuration from {config_file}") + except Exception as e: + logger.warning(f"Failed to load config file {config_file}: {e}") + + def _validate(self): + """Validate the configuration.""" + errors = [] + + # Validate GitHub token + if not self.github.token: + errors.append("GITHUB_TOKEN is required") + + # Validate LLM configuration + if not self.llm.openai_api_key and not self.llm.anthropic_api_key: + errors.append("At least one LLM API key (OpenAI or Anthropic) is required") + + # Validate VM configuration + if self.vm.timeout_seconds <= 0: + errors.append("VM timeout must be positive") + + if errors: + raise ValueError(f"Configuration validation failed: {', '.join(errors)}") + + def _get_required_env(self, key: str) -> str: + """Get required environment variable or raise error.""" + value = os.getenv(key) + if not value: + raise ValueError(f"Required environment variable {key} is not set") + return value + + def to_dict(self) -> Dict: + """Convert configuration to dictionary (excluding sensitive data).""" + return { + "github": { + "repository": self.github.repository, + "organization": self.github.organization, + "api_base_url": self.github.api_base_url + }, + "llm": { + "default_provider": self.llm.default_provider, + "max_tokens": self.llm.max_tokens, + "temperature": self.llm.temperature, + "vision_model": self.llm.vision_model + }, + "vm": { + "docker_image": self.vm.docker_image, + "memory_limit": self.vm.memory_limit, + "cpu_limit": self.vm.cpu_limit, + "timeout_seconds": self.vm.timeout_seconds, + "network_mode": self.vm.network_mode, + "enable_gpu": self.vm.enable_gpu + }, 
+ "monitoring": { + "poll_interval_seconds": self.monitoring.poll_interval_seconds, + "max_events_per_cycle": self.monitoring.max_events_per_cycle, + "event_types": self.monitoring.event_types, + "ignore_bots": self.monitoring.ignore_bots + }, + "scheduling": { + "enable_sleep_mode": self.scheduling.enable_sleep_mode, + "sleep_duration_minutes": self.scheduling.sleep_duration_minutes, + "wake_triggers": self.scheduling.wake_triggers, + "quiet_hours_start": self.scheduling.quiet_hours_start, + "quiet_hours_end": self.scheduling.quiet_hours_end, + "timezone": self.scheduling.timezone + }, + "vision": { + "screenshot_quality": self.vision.screenshot_quality, + "max_screenshot_size": self.vision.max_screenshot_size, + "ui_interaction_timeout": self.vision.ui_interaction_timeout, + "visual_diff_threshold": self.vision.visual_diff_threshold, + "enable_ocr": self.vision.enable_ocr + }, + "security": { + "enable_code_scanning": self.security.enable_code_scanning, + "allowed_file_extensions": self.security.allowed_file_extensions, + "blocked_commands": self.security.blocked_commands, + "max_file_size_mb": self.security.max_file_size_mb + } + } \ No newline at end of file diff --git a/src/core/events.py b/src/core/events.py new file mode 100644 index 0000000..fdba383 --- /dev/null +++ b/src/core/events.py @@ -0,0 +1,325 @@ +""" +Event processing system for the OmniBlocks Proactive AI Bot. + +This module handles event queuing, processing, and routing to +appropriate handlers. 
+""" + +import asyncio +import logging +from datetime import datetime +from typing import Dict, List, Callable, Any, Optional +from dataclasses import dataclass, field +from enum import Enum +import json +import uuid + +logger = logging.getLogger(__name__) + + +class EventPriority(Enum): + """Event priority levels.""" + LOW = 1 + NORMAL = 2 + HIGH = 3 + CRITICAL = 4 + EMERGENCY = 5 + + +@dataclass +class Event: + """Represents an event in the system.""" + id: str + type: str + data: Dict[str, Any] + priority: EventPriority = EventPriority.NORMAL + timestamp: datetime = field(default_factory=datetime.utcnow) + source: str = "unknown" + processed: bool = False + retry_count: int = 0 + max_retries: int = 3 + + def to_dict(self) -> Dict[str, Any]: + """Convert event to dictionary.""" + return { + "id": self.id, + "type": self.type, + "data": self.data, + "priority": self.priority.value, + "timestamp": self.timestamp.isoformat(), + "source": self.source, + "processed": self.processed, + "retry_count": self.retry_count, + "max_retries": self.max_retries + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'Event': + """Create event from dictionary.""" + return cls( + id=data["id"], + type=data["type"], + data=data["data"], + priority=EventPriority(data.get("priority", 2)), + timestamp=datetime.fromisoformat(data["timestamp"]), + source=data.get("source", "unknown"), + processed=data.get("processed", False), + retry_count=data.get("retry_count", 0), + max_retries=data.get("max_retries", 3) + ) + + +class EventProcessor: + """ + Processes events asynchronously with priority queuing and retry logic. 
+ + Features: + - Priority-based event queuing + - Async event handlers + - Retry logic for failed events + - Event filtering and routing + - Performance metrics + """ + + def __init__(self): + """Initialize the event processor.""" + self.event_queue: asyncio.PriorityQueue = asyncio.PriorityQueue() + self.handlers: Dict[str, List[Callable]] = {} + self.filters: List[Callable] = [] + self.failed_events: List[Event] = [] + self.processed_events: List[Event] = [] + self.metrics = { + "events_processed": 0, + "events_failed": 0, + "events_retried": 0, + "processing_time_total": 0.0 + } + + logger.info("EventProcessor initialized") + + def register_handler(self, event_type: str, handler: Callable): + """Register an event handler for a specific event type.""" + if event_type not in self.handlers: + self.handlers[event_type] = [] + + self.handlers[event_type].append(handler) + logger.info(f"Registered handler for event type: {event_type}") + + def register_filter(self, filter_func: Callable): + """Register an event filter function.""" + self.filters.append(filter_func) + logger.info("Registered event filter") + + async def queue_event(self, event: Event): + """Queue an event for processing.""" + # Apply filters + for filter_func in self.filters: + if not await self._apply_filter(filter_func, event): + logger.debug(f"Event {event.id} filtered out") + return + + # Add to priority queue (lower priority value = higher priority) + priority_value = -event.priority.value # Negative for correct ordering + await self.event_queue.put((priority_value, event.timestamp, event)) + + logger.debug(f"Queued event: {event.type} (ID: {event.id}, Priority: {event.priority.name})") + + async def get_pending_events(self, max_events: int = 10) -> List[Event]: + """Get pending events from the queue.""" + events = [] + + for _ in range(max_events): + try: + # Non-blocking get with timeout + priority, timestamp, event = await asyncio.wait_for( + self.event_queue.get(), timeout=0.1 + ) + 
events.append(event) + except asyncio.TimeoutError: + break + + return events + + async def process_event(self, event: Event) -> bool: + """Process a single event.""" + start_time = datetime.utcnow() + + try: + logger.info(f"Processing event: {event.type} (ID: {event.id})") + + # Get handlers for this event type + handlers = self.handlers.get(event.type, []) + + if not handlers: + logger.warning(f"No handlers registered for event type: {event.type}") + return False + + # Execute all handlers + for handler in handlers: + try: + await handler(event) + except Exception as e: + logger.error(f"Handler failed for event {event.id}: {e}", exc_info=True) + raise + + # Mark as processed + event.processed = True + self.processed_events.append(event) + self.metrics["events_processed"] += 1 + + # Calculate processing time + processing_time = (datetime.utcnow() - start_time).total_seconds() + self.metrics["processing_time_total"] += processing_time + + logger.info(f"Successfully processed event {event.id} in {processing_time:.2f}s") + return True + + except Exception as e: + logger.error(f"Failed to process event {event.id}: {e}", exc_info=True) + + # Handle retry logic + event.retry_count += 1 + + if event.retry_count <= event.max_retries: + logger.info(f"Retrying event {event.id} (attempt {event.retry_count}/{event.max_retries})") + await self.queue_event(event) + self.metrics["events_retried"] += 1 + else: + logger.error(f"Event {event.id} failed after {event.max_retries} retries") + self.failed_events.append(event) + self.metrics["events_failed"] += 1 + + return False + + async def _apply_filter(self, filter_func: Callable, event: Event) -> bool: + """Apply a filter function to an event.""" + try: + if asyncio.iscoroutinefunction(filter_func): + return await filter_func(event) + else: + return filter_func(event) + except Exception as e: + logger.error(f"Filter function failed: {e}", exc_info=True) + return True # Allow event through if filter fails + + def create_event(self, 
event_type: str, data: Dict[str, Any], + priority: EventPriority = EventPriority.NORMAL, + source: str = "system") -> Event: + """Create a new event with generated ID.""" + return Event( + id=str(uuid.uuid4()), + type=event_type, + data=data, + priority=priority, + source=source + ) + + def get_queue_size(self) -> int: + """Get current queue size.""" + return self.event_queue.qsize() + + def get_metrics(self) -> Dict[str, Any]: + """Get processing metrics.""" + avg_processing_time = 0.0 + if self.metrics["events_processed"] > 0: + avg_processing_time = ( + self.metrics["processing_time_total"] / self.metrics["events_processed"] + ) + + return { + **self.metrics, + "average_processing_time": avg_processing_time, + "queue_size": self.get_queue_size(), + "failed_events_count": len(self.failed_events), + "processed_events_count": len(self.processed_events) + } + + def get_failed_events(self) -> List[Event]: + """Get list of failed events.""" + return self.failed_events.copy() + + def clear_processed_events(self, keep_recent: int = 100): + """Clear processed events, keeping only recent ones.""" + if len(self.processed_events) > keep_recent: + self.processed_events = self.processed_events[-keep_recent:] + + def clear_failed_events(self): + """Clear failed events list.""" + self.failed_events.clear() + + async def shutdown(self): + """Shutdown the event processor.""" + logger.info("Shutting down EventProcessor...") + + # Process remaining events in queue + remaining_events = await self.get_pending_events(max_events=100) + + if remaining_events: + logger.info(f"Processing {len(remaining_events)} remaining events...") + for event in remaining_events: + try: + await self.process_event(event) + except Exception as e: + logger.error(f"Failed to process remaining event {event.id}: {e}") + + logger.info("EventProcessor shutdown complete") + + +# Common event filters + +def ignore_bot_events(event: Event) -> bool: + """Filter to ignore events from bots.""" + author = 
event.data.get("author", {}) + return not author.get("bot", False) + + +def priority_filter(min_priority: EventPriority): + """Create a filter that only allows events above a certain priority.""" + def filter_func(event: Event) -> bool: + return event.priority.value >= min_priority.value + return filter_func + + +def event_type_filter(allowed_types: List[str]): + """Create a filter that only allows specific event types.""" + def filter_func(event: Event) -> bool: + return event.type in allowed_types + return filter_func + + +# Event factory functions + +def create_github_event(event_type: str, payload: Dict[str, Any], + source: str = "github") -> Event: + """Create a GitHub webhook event.""" + return Event( + id=str(uuid.uuid4()), + type=event_type, + data=payload, + priority=EventPriority.NORMAL, + source=source + ) + + +def create_system_event(event_type: str, data: Dict[str, Any], + priority: EventPriority = EventPriority.NORMAL) -> Event: + """Create a system event.""" + return Event( + id=str(uuid.uuid4()), + type=event_type, + data=data, + priority=priority, + source="system" + ) + + +def create_user_event(event_type: str, data: Dict[str, Any], + user_id: str, priority: EventPriority = EventPriority.HIGH) -> Event: + """Create a user-initiated event.""" + return Event( + id=str(uuid.uuid4()), + type=event_type, + data={**data, "user_id": user_id}, + priority=priority, + source="user" + ) \ No newline at end of file diff --git a/src/core/scheduler.py b/src/core/scheduler.py new file mode 100644 index 0000000..47012eb --- /dev/null +++ b/src/core/scheduler.py @@ -0,0 +1,234 @@ +""" +Sleep/Wake scheduler for the OmniBlocks Proactive AI Bot. + +This module manages intelligent sleep and wake cycles to optimize +token usage while maintaining proactive capabilities. 
+""" + +import asyncio +import logging +from datetime import datetime, time, timedelta +from typing import List, Dict, Any, Optional +from dataclasses import dataclass +import pytz + +from .config import SchedulingConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class ScheduleEvent: + """Represents a scheduled event.""" + event_type: str + scheduled_time: datetime + priority: int = 1 + data: Dict[str, Any] = None + + +class SleepWakeScheduler: + """ + Manages intelligent sleep/wake scheduling for the bot. + + Features: + - Time-based scheduling (quiet hours) + - Event-based wake triggers + - Token usage optimization + - Priority-based event handling + """ + + def __init__(self, config: SchedulingConfig): + """Initialize the scheduler with configuration.""" + self.config = config + self.timezone = pytz.timezone(config.timezone) + self.scheduled_events: List[ScheduleEvent] = [] + self.last_sleep_time: Optional[datetime] = None + self.last_wake_time: Optional[datetime] = None + self.sleep_reason: Optional[str] = None + + logger.info(f"SleepWakeScheduler initialized with timezone: {config.timezone}") + + async def should_sleep(self) -> bool: + """Determine if the bot should enter sleep mode.""" + if not self.config.enable_sleep_mode: + return False + + current_time = datetime.now(self.timezone) + + # Check if we're in quiet hours + if self._is_quiet_hours(current_time): + self.sleep_reason = "quiet_hours" + return True + + # Check if we've been awake for too long + if self.last_wake_time: + awake_duration = current_time - self.last_wake_time + max_awake_duration = timedelta(minutes=self.config.sleep_duration_minutes * 3) + + if awake_duration > max_awake_duration: + self.sleep_reason = "max_awake_duration" + return True + + # Check token usage patterns (placeholder for now) + if await self._should_sleep_for_token_conservation(): + self.sleep_reason = "token_conservation" + return True + + return False + + async def should_wake_up(self) -> bool: + 
"""Determine if the bot should wake up from sleep mode.""" + if not self.last_sleep_time: + return False + + current_time = datetime.now(self.timezone) + + # Check if minimum sleep duration has passed + sleep_duration = current_time - self.last_sleep_time + min_sleep_duration = timedelta(minutes=self.config.sleep_duration_minutes) + + if sleep_duration < min_sleep_duration: + return False + + # Check for wake triggers + for trigger in self.config.wake_triggers: + if await self._check_wake_trigger(trigger): + logger.info(f"Wake trigger activated: {trigger}") + return True + + # Check if quiet hours have ended + if not self._is_quiet_hours(current_time) and self.sleep_reason == "quiet_hours": + return True + + # Check for scheduled events + if self._has_pending_high_priority_events(): + return True + + return False + + def schedule_event(self, event_type: str, scheduled_time: datetime, + priority: int = 1, data: Dict[str, Any] = None): + """Schedule an event for future execution.""" + event = ScheduleEvent( + event_type=event_type, + scheduled_time=scheduled_time, + priority=priority, + data=data or {} + ) + + self.scheduled_events.append(event) + self.scheduled_events.sort(key=lambda x: (x.scheduled_time, -x.priority)) + + logger.info(f"Scheduled event: {event_type} at {scheduled_time}") + + def schedule_wake_up(self, wake_time: datetime, reason: str = "scheduled"): + """Schedule a specific wake up time.""" + self.schedule_event( + event_type="wake_up", + scheduled_time=wake_time, + priority=5, + data={"reason": reason} + ) + + def get_next_scheduled_event(self) -> Optional[ScheduleEvent]: + """Get the next scheduled event.""" + current_time = datetime.now(self.timezone) + + for event in self.scheduled_events: + if event.scheduled_time <= current_time: + return event + + return None + + def remove_processed_events(self): + """Remove events that have been processed.""" + current_time = datetime.now(self.timezone) + self.scheduled_events = [ + event for event in 
self.scheduled_events + if event.scheduled_time > current_time + ] + + def enter_sleep_mode(self): + """Record that the bot has entered sleep mode.""" + self.last_sleep_time = datetime.now(self.timezone) + logger.info(f"Entered sleep mode at {self.last_sleep_time} (reason: {self.sleep_reason})") + + def exit_sleep_mode(self): + """Record that the bot has exited sleep mode.""" + self.last_wake_time = datetime.now(self.timezone) + sleep_duration = None + + if self.last_sleep_time: + sleep_duration = self.last_wake_time - self.last_sleep_time + + logger.info(f"Exited sleep mode at {self.last_wake_time} (slept for: {sleep_duration})") + self.sleep_reason = None + + def _is_quiet_hours(self, current_time: datetime) -> bool: + """Check if current time is within quiet hours.""" + current_time_only = current_time.time() + + quiet_start = time.fromisoformat(self.config.quiet_hours_start) + quiet_end = time.fromisoformat(self.config.quiet_hours_end) + + # Handle overnight quiet hours (e.g., 22:00 to 06:00) + if quiet_start > quiet_end: + return current_time_only >= quiet_start or current_time_only <= quiet_end + else: + return quiet_start <= current_time_only <= quiet_end + + async def _should_sleep_for_token_conservation(self) -> bool: + """Check if bot should sleep to conserve tokens.""" + # This would implement sophisticated token usage analysis + # For now, it's a placeholder + return False + + async def _check_wake_trigger(self, trigger: str) -> bool: + """Check if a specific wake trigger condition is met.""" + if trigger == "high_priority_event": + return self._has_pending_high_priority_events() + + elif trigger == "mention": + # This would check for recent mentions + return False + + elif trigger == "scheduled_time": + # Check for scheduled wake up times + next_event = self.get_next_scheduled_event() + if next_event and next_event.event_type == "wake_up": + return True + + return False + + def _has_pending_high_priority_events(self) -> bool: + """Check if there are 
pending high priority events.""" + current_time = datetime.now(self.timezone) + + for event in self.scheduled_events: + if (event.scheduled_time <= current_time and + event.priority >= 4): # High priority threshold + return True + + return False + + def get_status(self) -> Dict[str, Any]: + """Get current scheduler status.""" + current_time = datetime.now(self.timezone) + + return { + "enable_sleep_mode": self.config.enable_sleep_mode, + "current_time": current_time.isoformat(), + "is_quiet_hours": self._is_quiet_hours(current_time), + "last_sleep_time": self.last_sleep_time.isoformat() if self.last_sleep_time else None, + "last_wake_time": self.last_wake_time.isoformat() if self.last_wake_time else None, + "sleep_reason": self.sleep_reason, + "scheduled_events_count": len(self.scheduled_events), + "next_event": ( + self.scheduled_events[0].event_type if self.scheduled_events else None + ), + "quiet_hours": { + "start": self.config.quiet_hours_start, + "end": self.config.quiet_hours_end, + "timezone": self.config.timezone + } + } \ No newline at end of file diff --git a/src/github/__init__.py b/src/github/__init__.py new file mode 100644 index 0000000..0d8d5f6 --- /dev/null +++ b/src/github/__init__.py @@ -0,0 +1,6 @@ +""" +GitHub integration module for the OmniBlocks Proactive AI Bot. + +This module handles all GitHub API interactions including +repository operations, webhooks, and comment management. +""" \ No newline at end of file diff --git a/src/github/github_client.py b/src/github/github_client.py new file mode 100644 index 0000000..84cc46d --- /dev/null +++ b/src/github/github_client.py @@ -0,0 +1,392 @@ +""" +GitHub API client for repository operations and interactions. + +This module provides a comprehensive interface to the GitHub API +with rate limiting, error handling, and retry logic. 
+""" + +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Any, Optional +import aiohttp +import base64 +import json + +from ..core.config import GitHubConfig +from ..utils.logging import log_audit_event + +logger = logging.getLogger(__name__) + + +class GitHubRateLimitError(Exception): + """Raised when GitHub API rate limit is exceeded.""" + pass + + +class GitHubClient: + """ + GitHub API client with comprehensive functionality. + + Features: + - Rate limit handling + - Automatic retries + - Webhook processing + - Repository operations + - Issue and PR management + """ + + def __init__(self, config: GitHubConfig): + """Initialize GitHub client.""" + self.config = config + self.session: Optional[aiohttp.ClientSession] = None + self.rate_limit_remaining = 5000 + self.rate_limit_reset = datetime.utcnow() + + self.headers = { + "Authorization": f"token {config.token}", + "Accept": "application/vnd.github.v3+json", + "User-Agent": "OmniBlocks-ProactiveBot/1.0" + } + + logger.info("GitHubClient initialized") + + async def initialize(self): + """Initialize the GitHub client.""" + try: + # Create aiohttp session + connector = aiohttp.TCPConnector(limit=100, limit_per_host=30) + timeout = aiohttp.ClientTimeout(total=30) + + self.session = aiohttp.ClientSession( + connector=connector, + timeout=timeout, + headers=self.headers + ) + + # Test API access + await self._test_api_access() + + logger.info("GitHubClient initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize GitHubClient: {e}", exc_info=True) + raise + + async def shutdown(self): + """Shutdown the GitHub client.""" + if self.session: + await self.session.close() + + logger.info("GitHubClient shutdown complete") + + async def health_check(self) -> bool: + """Perform health check.""" + try: + response = await self._make_request("GET", "/user") + return response is not None + except Exception as e: + logger.error(f"GitHubClient 
health check failed: {e}") + return False + + async def _make_request(self, method: str, endpoint: str, + data: Dict = None, params: Dict = None, + retry_count: int = 0) -> Optional[Dict]: + """Make HTTP request to GitHub API with rate limiting and retries.""" + if not self.session: + raise RuntimeError("GitHubClient not initialized") + + # Check rate limit + await self._check_rate_limit() + + url = f"{self.config.api_base_url}{endpoint}" + + try: + kwargs = {"params": params} if params else {} + if data: + kwargs["json"] = data + + async with self.session.request(method, url, **kwargs) as response: + # Update rate limit info + self._update_rate_limit_info(response) + + if response.status == 200 or response.status == 201: + return await response.json() + + elif response.status == 403: + # Rate limit exceeded + if "rate limit" in (await response.text()).lower(): + raise GitHubRateLimitError("GitHub API rate limit exceeded") + else: + logger.error(f"GitHub API forbidden: {await response.text()}") + return None + + elif response.status == 404: + logger.warning(f"GitHub API not found: {endpoint}") + return None + + else: + error_text = await response.text() + logger.error(f"GitHub API error {response.status}: {error_text}") + + # Retry on server errors + if response.status >= 500 and retry_count < 3: + await asyncio.sleep(2 ** retry_count) + return await self._make_request(method, endpoint, data, params, retry_count + 1) + + return None + + except GitHubRateLimitError: + raise + except Exception as e: + logger.error(f"GitHub API request failed: {e}", exc_info=True) + + # Retry on network errors + if retry_count < 3: + await asyncio.sleep(2 ** retry_count) + return await self._make_request(method, endpoint, data, params, retry_count + 1) + + return None + + async def _check_rate_limit(self): + """Check and handle rate limiting.""" + if self.rate_limit_remaining <= 10: # Conservative threshold + wait_time = (self.rate_limit_reset - datetime.utcnow()).total_seconds() + + 
if wait_time > 0: + logger.warning(f"Rate limit low ({self.rate_limit_remaining}), waiting {wait_time:.1f}s") + await asyncio.sleep(wait_time + 1) + + def _update_rate_limit_info(self, response: aiohttp.ClientResponse): + """Update rate limit information from response headers.""" + try: + self.rate_limit_remaining = int(response.headers.get("X-RateLimit-Remaining", 5000)) + reset_timestamp = int(response.headers.get("X-RateLimit-Reset", 0)) + + if reset_timestamp: + self.rate_limit_reset = datetime.utcfromtimestamp(reset_timestamp) + + except (ValueError, TypeError): + pass # Ignore header parsing errors + + async def _test_api_access(self): + """Test GitHub API access.""" + user_data = await self._make_request("GET", "/user") + + if user_data: + logger.info(f"GitHub API access confirmed for user: {user_data.get('login')}") + else: + raise RuntimeError("Failed to access GitHub API") + + # Repository operations + + async def get_repository(self, repository: str) -> Optional[Dict]: + """Get repository information.""" + return await self._make_request("GET", f"/repos/{repository}") + + async def get_commits(self, repository: str, sha: str = None, + per_page: int = 30, page: int = 1) -> List[Dict]: + """Get repository commits.""" + params = {"per_page": per_page, "page": page} + if sha: + params["sha"] = sha + + result = await self._make_request("GET", f"/repos/{repository}/commits", params=params) + return result if result else [] + + async def get_commit(self, repository: str, sha: str) -> Optional[Dict]: + """Get specific commit information.""" + return await self._make_request("GET", f"/repos/{repository}/commits/{sha}") + + async def get_pull_requests(self, repository: str, state: str = "open", + per_page: int = 30, page: int = 1) -> List[Dict]: + """Get repository pull requests.""" + params = {"state": state, "per_page": per_page, "page": page} + + result = await self._make_request("GET", f"/repos/{repository}/pulls", params=params) + return result if result else 
[] + + async def get_pull_request(self, repository: str, number: int) -> Optional[Dict]: + """Get specific pull request.""" + return await self._make_request("GET", f"/repos/{repository}/pulls/{number}") + + async def get_issues(self, repository: str, state: str = "open", + per_page: int = 30, page: int = 1) -> List[Dict]: + """Get repository issues.""" + params = {"state": state, "per_page": per_page, "page": page} + + result = await self._make_request("GET", f"/repos/{repository}/issues", params=params) + return result if result else [] + + async def get_issue(self, repository: str, number: int) -> Optional[Dict]: + """Get specific issue.""" + return await self._make_request("GET", f"/repos/{repository}/issues/{number}") + + async def get_issue_comments(self, repository: str, issue_number: int = None, + since: str = None) -> List[Dict]: + """Get issue comments.""" + if issue_number: + endpoint = f"/repos/{repository}/issues/{issue_number}/comments" + else: + endpoint = f"/repos/{repository}/issues/comments" + + params = {} + if since: + params["since"] = since + + result = await self._make_request("GET", endpoint, params=params) + return result if result else [] + + async def get_user_repositories(self, per_page: int = 30) -> List[str]: + """Get user repositories.""" + params = {"per_page": per_page, "type": "all"} + + repos = await self._make_request("GET", "/user/repos", params=params) + + if repos: + return [repo["full_name"] for repo in repos] + + return [] + + async def get_organization_repositories(self, org: str, per_page: int = 30) -> List[str]: + """Get organization repositories.""" + params = {"per_page": per_page, "type": "all"} + + repos = await self._make_request("GET", f"/orgs/{org}/repos", params=params) + + if repos: + return [repo["full_name"] for repo in repos] + + return [] + + # Comment and interaction operations + + async def post_issue_comment(self, repository: str, issue_number: int, + body: str) -> Optional[Dict]: + """Post comment on an 
issue.""" + data = {"body": body} + + log_audit_event("github_comment_posted", details={ + "repository": repository, + "issue_number": issue_number, + "comment_length": len(body) + }) + + return await self._make_request( + "POST", + f"/repos/{repository}/issues/{issue_number}/comments", + data=data + ) + + async def post_pr_review(self, repository: str, pr_number: int, + review_data: Dict) -> Optional[Dict]: + """Post pull request review.""" + log_audit_event("github_pr_review_posted", details={ + "repository": repository, + "pr_number": pr_number, + "review_event": review_data.get("event", "COMMENT") + }) + + return await self._make_request( + "POST", + f"/repos/{repository}/pulls/{pr_number}/reviews", + data=review_data + ) + + async def post_commit_comment(self, repository: str, sha: str, + body: str, path: str = None, + line: int = None) -> Optional[Dict]: + """Post comment on a commit.""" + data = {"body": body} + if path: + data["path"] = path + if line: + data["line"] = line + + log_audit_event("github_commit_comment_posted", details={ + "repository": repository, + "commit_sha": sha, + "comment_length": len(body) + }) + + return await self._make_request( + "POST", + f"/repos/{repository}/commits/{sha}/comments", + data=data + ) + + async def post_comment(self, repository: str, commit_sha: str, comment: str): + """Post a comment on a commit (legacy method for compatibility).""" + return await self.post_commit_comment(repository, commit_sha, comment) + + async def post_comment_reply(self, repository: str, comment_id: int, reply: str): + """Post a reply to a comment (GitHub doesn't support direct replies, so post new comment).""" + # GitHub doesn't have direct comment replies, so we'll post a new comment + # that references the original comment + body = f"@{comment_id} {reply}" # This is a simplified approach + + # We need to determine if this is an issue or PR comment to post correctly + # For now, we'll assume it's an issue comment + # In a real 
implementation, you'd need to track the comment context + + logger.warning("Comment reply not fully implemented - posting as new comment") + return None + + # File operations + + async def get_file_content(self, repository: str, path: str, + ref: str = "main") -> Optional[str]: + """Get file content from repository.""" + result = await self._make_request( + "GET", + f"/repos/{repository}/contents/{path}", + params={"ref": ref} + ) + + if result and result.get("content"): + # Decode base64 content + content = base64.b64decode(result["content"]).decode("utf-8") + return content + + return None + + async def create_or_update_file(self, repository: str, path: str, + content: str, message: str, + branch: str = "main") -> Optional[Dict]: + """Create or update a file in repository.""" + # First, try to get existing file to get SHA + existing = await self._make_request( + "GET", + f"/repos/{repository}/contents/{path}", + params={"ref": branch} + ) + + data = { + "message": message, + "content": base64.b64encode(content.encode("utf-8")).decode("utf-8"), + "branch": branch + } + + if existing: + data["sha"] = existing["sha"] + + log_audit_event("github_file_updated", details={ + "repository": repository, + "path": path, + "branch": branch, + "content_length": len(content) + }) + + return await self._make_request( + "PUT", + f"/repos/{repository}/contents/{path}", + data=data + ) + + def get_rate_limit_status(self) -> Dict[str, Any]: + """Get current rate limit status.""" + return { + "remaining": self.rate_limit_remaining, + "reset_time": self.rate_limit_reset.isoformat(), + "seconds_until_reset": max(0, (self.rate_limit_reset - datetime.utcnow()).total_seconds()) + } \ No newline at end of file diff --git a/src/llm/__init__.py b/src/llm/__init__.py new file mode 100644 index 0000000..4ca9a23 --- /dev/null +++ b/src/llm/__init__.py @@ -0,0 +1,6 @@ +""" +LLM integration module for the OmniBlocks Proactive AI Bot. 
async def main():
    """Entry point: configure logging, build the bot, and run until shutdown.

    Installs SIGINT/SIGTERM handlers that schedule a graceful ``bot.shutdown()``
    on the running event loop instead of killing the process mid-operation.

    Bug fix: the task returned by ``asyncio.create_task`` was discarded; the
    event loop keeps only a weak reference to tasks, so the shutdown task
    could be garbage-collected before completing.  We now retain a strong
    reference until it finishes.
    """
    # Setup logging
    setup_logging()
    logger = logging.getLogger(__name__)

    logger.info("Starting OmniBlocks Proactive AI Bot...")

    # Strong references to in-flight shutdown tasks (see docstring).
    background_tasks = set()

    try:
        # Load configuration
        config = Config()

        # Initialize the bot
        bot = ProactiveBot(config)

        # Setup signal handlers for graceful shutdown.  The handler runs in
        # the main thread while the loop is active, so create_task is valid.
        def signal_handler(signum, frame):
            logger.info(f"Received signal {signum}, initiating graceful shutdown...")
            task = asyncio.create_task(bot.shutdown())
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Start the bot (runs until shutdown is requested)
        await bot.start()

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down...")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        logger.info("Bot shutdown complete.")
@dataclass
class RepositoryState:
    """Snapshot of a repository's observable state between polls.

    Bug fix: the set-valued fields previously defaulted to None and were
    patched up in ``__post_init__`` — the classic mutable-default workaround.
    ``field(default_factory=set)`` gives every instance its own fresh set
    directly, so ``__post_init__`` is no longer needed.  Construction with
    explicit sets (e.g. ``RepositoryState(open_prs={1})``) is unchanged.
    """
    last_commit_sha: Optional[str] = None      # SHA of the most recent commit seen
    last_push_time: Optional[datetime] = None  # committer date of that commit
    open_prs: Set[int] = field(default_factory=set)     # numbers of open PRs
    open_issues: Set[int] = field(default_factory=set)  # numbers of open issues
    last_activity_time: Optional[datetime] = None       # when this snapshot was taken
+ + Features: + - Continuous polling of repository events + - Change detection and analysis + - Event prioritization + - Activity pattern recognition + """ + + def __init__(self, config: MonitoringConfig, github_client): + """Initialize repository monitor.""" + self.config = config + self.github_client = github_client + self.repository_states: Dict[str, RepositoryState] = {} + self.last_poll_time: Optional[datetime] = None + self.event_cache: Set[str] = set() # Cache to prevent duplicate events + self.is_running = False + + logger.info("RepositoryMonitor initialized") + + async def initialize(self): + """Initialize the repository monitor.""" + try: + # Load initial repository states + await self._load_initial_states() + + logger.info("RepositoryMonitor initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize RepositoryMonitor: {e}", exc_info=True) + raise + + async def shutdown(self): + """Shutdown the repository monitor.""" + self.is_running = False + logger.info("RepositoryMonitor shutdown complete") + + async def health_check(self) -> bool: + """Perform health check.""" + try: + # Check if we can access GitHub API + return await self.github_client.health_check() + except Exception as e: + logger.error(f"RepositoryMonitor health check failed: {e}") + return False + + async def check_for_events(self) -> List[Event]: + """Check for new repository events.""" + events = [] + + try: + # Get list of repositories to monitor + repositories = await self._get_monitored_repositories() + + for repo in repositories: + repo_events = await self._check_repository_events(repo) + events.extend(repo_events) + + # Update last poll time + self.last_poll_time = datetime.utcnow() + + logger.debug(f"Found {len(events)} new events across {len(repositories)} repositories") + + except Exception as e: + logger.error(f"Error checking for events: {e}", exc_info=True) + + return events + + async def _get_monitored_repositories(self) -> List[str]: + """Get 
list of repositories to monitor.""" + try: + # If specific repository is configured, monitor only that one + if hasattr(self.github_client.config, 'repository') and self.github_client.config.repository: + return [self.github_client.config.repository] + + # Otherwise, get repositories from organization + if hasattr(self.github_client.config, 'organization') and self.github_client.config.organization: + return await self.github_client.get_organization_repositories( + self.github_client.config.organization + ) + + # Fallback: get user repositories + return await self.github_client.get_user_repositories() + + except Exception as e: + logger.error(f"Failed to get monitored repositories: {e}", exc_info=True) + return [] + + async def _check_repository_events(self, repository: str) -> List[Event]: + """Check for events in a specific repository.""" + events = [] + + try: + # Get current repository state + current_state = await self._get_repository_state(repository) + previous_state = self.repository_states.get(repository, RepositoryState()) + + # Check for different types of events + events.extend(await self._check_push_events(repository, current_state, previous_state)) + events.extend(await self._check_pr_events(repository, current_state, previous_state)) + events.extend(await self._check_issue_events(repository, current_state, previous_state)) + events.extend(await self._check_comment_events(repository)) + + # Update stored state + self.repository_states[repository] = current_state + + except Exception as e: + logger.error(f"Error checking events for repository {repository}: {e}", exc_info=True) + + return events + + async def _get_repository_state(self, repository: str) -> RepositoryState: + """Get current state of a repository.""" + try: + # Get latest commit + commits = await self.github_client.get_commits(repository, per_page=1) + last_commit_sha = commits[0]["sha"] if commits else None + last_push_time = None + + if commits: + commit_date = 
commits[0]["commit"]["committer"]["date"] + last_push_time = datetime.fromisoformat(commit_date.replace("Z", "+00:00")) + + # Get open PRs + prs = await self.github_client.get_pull_requests(repository, state="open") + open_prs = {pr["number"] for pr in prs} + + # Get open issues + issues = await self.github_client.get_issues(repository, state="open") + open_issues = {issue["number"] for issue in issues if "pull_request" not in issue} + + return RepositoryState( + last_commit_sha=last_commit_sha, + last_push_time=last_push_time, + open_prs=open_prs, + open_issues=open_issues, + last_activity_time=datetime.utcnow() + ) + + except Exception as e: + logger.error(f"Failed to get repository state for {repository}: {e}", exc_info=True) + return RepositoryState() + + async def _check_push_events(self, repository: str, current: RepositoryState, + previous: RepositoryState) -> List[Event]: + """Check for new push events.""" + events = [] + + try: + # Check if there's a new commit + if (current.last_commit_sha and + current.last_commit_sha != previous.last_commit_sha): + + # Get commit details + commit_data = await self.github_client.get_commit(repository, current.last_commit_sha) + + # Create event ID to prevent duplicates + event_id = f"push_{repository}_{current.last_commit_sha}" + + if event_id not in self.event_cache: + event = create_github_event( + event_type="push", + payload={ + "repository": repository, + "commit_sha": current.last_commit_sha, + "commit_data": commit_data, + "timestamp": current.last_push_time.isoformat() if current.last_push_time else None + } + ) + + events.append(event) + self.event_cache.add(event_id) + + logger.info(f"Detected new push in {repository}: {current.last_commit_sha[:8]}") + + except Exception as e: + logger.error(f"Error checking push events for {repository}: {e}", exc_info=True) + + return events + + async def _check_pr_events(self, repository: str, current: RepositoryState, + previous: RepositoryState) -> List[Event]: + """Check 
for PR events (new, updated, closed).""" + events = [] + + try: + # New PRs + new_prs = current.open_prs - previous.open_prs + for pr_number in new_prs: + pr_data = await self.github_client.get_pull_request(repository, pr_number) + + event_id = f"pr_opened_{repository}_{pr_number}" + if event_id not in self.event_cache: + event = create_github_event( + event_type="pull_request", + payload={ + "action": "opened", + "repository": repository, + "number": pr_number, + "pull_request": pr_data + } + ) + + events.append(event) + self.event_cache.add(event_id) + + logger.info(f"Detected new PR in {repository}: #{pr_number}") + + # Closed PRs + closed_prs = previous.open_prs - current.open_prs + for pr_number in closed_prs: + event_id = f"pr_closed_{repository}_{pr_number}" + if event_id not in self.event_cache: + event = create_github_event( + event_type="pull_request", + payload={ + "action": "closed", + "repository": repository, + "number": pr_number + } + ) + + events.append(event) + self.event_cache.add(event_id) + + logger.info(f"Detected closed PR in {repository}: #{pr_number}") + + except Exception as e: + logger.error(f"Error checking PR events for {repository}: {e}", exc_info=True) + + return events + + async def _check_issue_events(self, repository: str, current: RepositoryState, + previous: RepositoryState) -> List[Event]: + """Check for issue events (new, closed).""" + events = [] + + try: + # New issues + new_issues = current.open_issues - previous.open_issues + for issue_number in new_issues: + issue_data = await self.github_client.get_issue(repository, issue_number) + + event_id = f"issue_opened_{repository}_{issue_number}" + if event_id not in self.event_cache: + event = create_github_event( + event_type="issues", + payload={ + "action": "opened", + "repository": repository, + "number": issue_number, + "issue": issue_data + } + ) + + events.append(event) + self.event_cache.add(event_id) + + logger.info(f"Detected new issue in {repository}: #{issue_number}") 
+ + # Closed issues + closed_issues = previous.open_issues - current.open_issues + for issue_number in closed_issues: + event_id = f"issue_closed_{repository}_{issue_number}" + if event_id not in self.event_cache: + event = create_github_event( + event_type="issues", + payload={ + "action": "closed", + "repository": repository, + "number": issue_number + } + ) + + events.append(event) + self.event_cache.add(event_id) + + logger.info(f"Detected closed issue in {repository}: #{issue_number}") + + except Exception as e: + logger.error(f"Error checking issue events for {repository}: {e}", exc_info=True) + + return events + + async def _check_comment_events(self, repository: str) -> List[Event]: + """Check for new comments on issues and PRs.""" + events = [] + + try: + # Get recent comments (last hour) + since = datetime.utcnow() - timedelta(hours=1) + + # Check issue comments + comments = await self.github_client.get_issue_comments( + repository, since=since.isoformat() + ) + + for comment in comments: + # Skip bot comments if configured + if (self.config.ignore_bots and + comment.get("user", {}).get("type") == "Bot"): + continue + + event_id = f"comment_{repository}_{comment['id']}" + if event_id not in self.event_cache: + event = create_github_event( + event_type="issue_comment", + payload={ + "action": "created", + "repository": repository, + "comment": comment + } + ) + + events.append(event) + self.event_cache.add(event_id) + + logger.debug(f"Detected new comment in {repository}: {comment['id']}") + + except Exception as e: + logger.error(f"Error checking comment events for {repository}: {e}", exc_info=True) + + return events + + async def _load_initial_states(self): + """Load initial repository states.""" + try: + repositories = await self._get_monitored_repositories() + + for repository in repositories: + state = await self._get_repository_state(repository) + self.repository_states[repository] = state + + logger.info(f"Loaded initial states for 
{len(repositories)} repositories") + + except Exception as e: + logger.error(f"Failed to load initial states: {e}", exc_info=True) + + def _cleanup_event_cache(self, max_size: int = 10000): + """Clean up event cache to prevent memory issues.""" + if len(self.event_cache) > max_size: + # Remove oldest half of the cache + cache_list = list(self.event_cache) + self.event_cache = set(cache_list[len(cache_list)//2:]) + + logger.debug(f"Cleaned up event cache, new size: {len(self.event_cache)}") + + def get_monitoring_status(self) -> Dict[str, Any]: + """Get current monitoring status.""" + return { + "is_running": self.is_running, + "last_poll_time": self.last_poll_time.isoformat() if self.last_poll_time else None, + "monitored_repositories": list(self.repository_states.keys()), + "event_cache_size": len(self.event_cache), + "config": { + "poll_interval_seconds": self.config.poll_interval_seconds, + "max_events_per_cycle": self.config.max_events_per_cycle, + "event_types": self.config.event_types, + "ignore_bots": self.config.ignore_bots + } + } \ No newline at end of file diff --git a/src/testing/__init__.py b/src/testing/__init__.py new file mode 100644 index 0000000..07f4943 --- /dev/null +++ b/src/testing/__init__.py @@ -0,0 +1,6 @@ +""" +VM testing environment module for the OmniBlocks Proactive AI Bot. + +This module provides isolated VM environments for safe code execution +and automated testing workflows. +""" \ No newline at end of file diff --git a/src/testing/vm_manager.py b/src/testing/vm_manager.py new file mode 100644 index 0000000..5ce7044 --- /dev/null +++ b/src/testing/vm_manager.py @@ -0,0 +1,509 @@ +""" +VM Manager for secure code execution and testing. + +This module provides isolated Docker containers for safe code execution, +automated testing, and result analysis. 
class VMTestResult:
    """Container for the outcome of a single VM test execution.

    Every field starts in its "nothing ran yet" state; VMManager fills the
    fields in as execution proceeds.
    """

    def __init__(self):
        # Outcome
        self.success = False
        self.exit_code = None
        self.error_message = None
        # Captured process output
        self.stdout = ""
        self.stderr = ""
        # Resource accounting
        self.execution_time = 0.0
        self.memory_usage = 0
        # Analysis results
        self.has_ui_changes = False
        self.security_violations = []
        # Collected outputs
        self.test_files = []
        self.artifacts = {}
logger.info("VMManager shutdown complete") + + except Exception as e: + logger.error(f"Error during VMManager shutdown: {e}", exc_info=True) + + async def health_check(self) -> bool: + """Perform health check.""" + try: + if not self.docker_client: + return False + + # Test Docker connectivity + self.docker_client.ping() + return True + + except Exception as e: + logger.error(f"VMManager health check failed: {e}") + return False + + async def run_tests(self, push_data: Dict[str, Any]) -> VMTestResult: + """Run tests for a push event.""" + result = VMTestResult() + + try: + # Create test environment + container_id = await self._create_test_container() + + # Prepare test code + test_files = await self._prepare_test_files(push_data) + + # Copy files to container + await self._copy_files_to_container(container_id, test_files) + + # Run tests + result = await self._execute_tests(container_id, "pytest tests/") + + # Collect artifacts + result.artifacts = await self._collect_artifacts(container_id) + + log_audit_event("vm_test_executed", details={ + "repository": push_data.get("repository"), + "commit_sha": push_data.get("commit_sha"), + "success": result.success, + "execution_time": result.execution_time + }) + + except Exception as e: + logger.error(f"Error running tests: {e}", exc_info=True) + result.error_message = str(e) + + finally: + if 'container_id' in locals(): + await self._cleanup_container(container_id) + + return result + + async def run_pr_tests(self, pr_data: Dict[str, Any]) -> VMTestResult: + """Run comprehensive tests for a pull request.""" + result = VMTestResult() + + try: + # Create test environment + container_id = await self._create_test_container() + + # Prepare PR code + test_files = await self._prepare_pr_files(pr_data) + + # Copy files to container + await self._copy_files_to_container(container_id, test_files) + + # Run comprehensive test suite + commands = [ + "pytest tests/ --cov=src/", + "flake8 src/", + "mypy src/", + "bandit -r src/" + ] + 
+ for command in commands: + test_result = await self._execute_tests(container_id, command) + + if not test_result.success: + result.success = False + result.stderr += f"\n{command} failed:\n{test_result.stderr}" + else: + result.stdout += f"\n{command} passed:\n{test_result.stdout}" + + # Check for UI changes + result.has_ui_changes = await self._detect_ui_changes(container_id, pr_data) + + # Collect artifacts + result.artifacts = await self._collect_artifacts(container_id) + + log_audit_event("vm_pr_test_executed", details={ + "repository": pr_data.get("repository"), + "pr_number": pr_data.get("number"), + "success": result.success, + "has_ui_changes": result.has_ui_changes + }) + + except Exception as e: + logger.error(f"Error running PR tests: {e}", exc_info=True) + result.error_message = str(e) + + finally: + if 'container_id' in locals(): + await self._cleanup_container(container_id) + + return result + + async def _create_test_container(self) -> str: + """Create a new test container.""" + try: + # Container configuration + container_config = { + "image": self.config.docker_image, + "detach": True, + "mem_limit": self.config.memory_limit, + "cpu_period": 100000, + "cpu_quota": int(float(self.config.cpu_limit) * 100000), + "network_mode": self.config.network_mode, + "working_dir": "/workspace", + "user": "testuser", + "environment": { + "PYTHONPATH": "/workspace", + "PYTHONUNBUFFERED": "1" + } + } + + # Create container + container = self.docker_client.containers.run(**container_config) + container_id = container.id + + # Track active container + self.active_containers[container_id] = { + "container": container, + "created_at": datetime.utcnow() + } + + logger.debug(f"Created test container: {container_id[:12]}") + return container_id + + except Exception as e: + logger.error(f"Failed to create test container: {e}", exc_info=True) + raise + + async def _prepare_test_files(self, push_data: Dict[str, Any]) -> Dict[str, str]: + """Prepare test files from push 
data.""" + # This is a simplified implementation + # In a real scenario, you'd fetch the actual files from the repository + + test_files = { + "test_basic.py": """ +import pytest + +def test_basic(): + assert True + +def test_import(): + # Test that we can import common modules + import json + import os + assert True +""", + "requirements.txt": "pytest>=7.4.0\nrequests>=2.31.0" + } + + return test_files + + async def _prepare_pr_files(self, pr_data: Dict[str, Any]) -> Dict[str, str]: + """Prepare files for PR testing.""" + # This would fetch the actual PR files + # For now, return basic test files + return await self._prepare_test_files(pr_data) + + async def _copy_files_to_container(self, container_id: str, files: Dict[str, str]): + """Copy files to container.""" + try: + container = self.active_containers[container_id]["container"] + + # Create tar archive in memory + tar_stream = io.BytesIO() + + with tarfile.open(fileobj=tar_stream, mode='w') as tar: + for file_path, content in files.items(): + # Create file info + file_data = content.encode('utf-8') + tarinfo = tarfile.TarInfo(name=file_path) + tarinfo.size = len(file_data) + tarinfo.mode = 0o644 + + # Add file to tar + tar.addfile(tarinfo, io.BytesIO(file_data)) + + # Copy to container + tar_stream.seek(0) + container.put_archive("/workspace", tar_stream.getvalue()) + + logger.debug(f"Copied {len(files)} files to container {container_id[:12]}") + + except Exception as e: + logger.error(f"Failed to copy files to container: {e}", exc_info=True) + raise + + async def _execute_tests(self, container_id: str, command: str) -> VMTestResult: + """Execute test command in container.""" + result = VMTestResult() + start_time = datetime.utcnow() + + try: + container = self.active_containers[container_id]["container"] + + # Execute command with timeout + exec_result = container.exec_run( + command, + workdir="/workspace", + user="testuser", + environment={"PYTHONPATH": "/workspace"} + ) + + result.exit_code = 
exec_result.exit_code + result.stdout = exec_result.output.decode('utf-8', errors='ignore') + result.success = exec_result.exit_code == 0 + + # Calculate execution time + result.execution_time = (datetime.utcnow() - start_time).total_seconds() + + # Get memory usage + stats = container.stats(stream=False) + if stats and 'memory_stats' in stats: + result.memory_usage = stats['memory_stats'].get('usage', 0) + + log_performance_metric("vm_test_execution_time", result.execution_time, "seconds") + + logger.debug(f"Executed command in container {container_id[:12]}: {command}") + + except Exception as e: + logger.error(f"Failed to execute tests: {e}", exc_info=True) + result.error_message = str(e) + result.stderr = str(e) + + return result + + async def _detect_ui_changes(self, container_id: str, pr_data: Dict[str, Any]) -> bool: + """Detect if PR contains UI changes.""" + try: + # This is a simplified implementation + # In a real scenario, you'd analyze the changed files + + # Check for common UI file patterns + ui_patterns = ['.html', '.css', '.js', '.jsx', '.vue', '.svelte'] + + # Get PR files (this would come from GitHub API) + changed_files = pr_data.get('changed_files', []) + + for file_path in changed_files: + if any(pattern in file_path.lower() for pattern in ui_patterns): + return True + + return False + + except Exception as e: + logger.error(f"Failed to detect UI changes: {e}", exc_info=True) + return False + + async def _collect_artifacts(self, container_id: str) -> Dict[str, Any]: + """Collect test artifacts from container.""" + artifacts = {} + + try: + container = self.active_containers[container_id]["container"] + + # Common artifact paths + artifact_paths = [ + "/workspace/coverage.xml", + "/workspace/test-results.xml", + "/workspace/logs/", + "/workspace/screenshots/" + ] + + for path in artifact_paths: + try: + # Get archive from container + archive, _ = container.get_archive(path) + + # Extract and store + tar_stream = io.BytesIO() + for chunk in 
archive: + tar_stream.write(chunk) + + tar_stream.seek(0) + artifacts[path] = tar_stream.getvalue() + + except docker.errors.NotFound: + # Path doesn't exist, skip + continue + + logger.debug(f"Collected {len(artifacts)} artifacts from container {container_id[:12]}") + + except Exception as e: + logger.error(f"Failed to collect artifacts: {e}", exc_info=True) + + return artifacts + + async def _cleanup_container(self, container_id: str): + """Clean up container and resources.""" + try: + if container_id in self.active_containers: + container_info = self.active_containers[container_id] + container = container_info["container"] + + # Stop and remove container + try: + container.stop(timeout=10) + except: + pass # Container might already be stopped + + try: + container.remove(force=True) + except: + pass # Container might already be removed + + # Remove from tracking + del self.active_containers[container_id] + + logger.debug(f"Cleaned up container {container_id[:12]}") + + except Exception as e: + logger.error(f"Failed to cleanup container {container_id}: {e}", exc_info=True) + + async def _pull_required_images(self): + """Pull required Docker images.""" + try: + images_to_pull = [self.config.docker_image] + + for image in images_to_pull: + logger.info(f"Pulling Docker image: {image}") + self.docker_client.images.pull(image) + + logger.info("All required Docker images pulled successfully") + + except Exception as e: + logger.error(f"Failed to pull Docker images: {e}", exc_info=True) + raise + + async def _security_scan_code(self, files: Dict[str, str]) -> List[str]: + """Perform security scanning on code files.""" + violations = [] + + if not self.security_config or not self.security_config.enable_code_scanning: + return violations + + try: + for file_path, content in files.items(): + # Check file extension + file_ext = Path(file_path).suffix + if file_ext not in self.security_config.allowed_file_extensions: + violations.append(f"Disallowed file extension: 
{file_ext}") + continue + + # Check for blocked commands + content_lower = content.lower() + for blocked_cmd in self.security_config.blocked_commands: + if blocked_cmd.lower() in content_lower: + violations.append(f"Blocked command found: {blocked_cmd}") + + # Check file size + if len(content.encode('utf-8')) > self.security_config.max_file_size_mb * 1024 * 1024: + violations.append(f"File too large: {file_path}") + + except Exception as e: + logger.error(f"Security scan failed: {e}", exc_info=True) + violations.append(f"Security scan error: {e}") + + return violations + + async def cleanup_old_containers(self, max_age_hours: int = 24): + """Clean up old containers that might be stuck.""" + try: + current_time = datetime.utcnow() + containers_to_cleanup = [] + + for container_id, info in self.active_containers.items(): + age = current_time - info["created_at"] + if age > timedelta(hours=max_age_hours): + containers_to_cleanup.append(container_id) + + for container_id in containers_to_cleanup: + await self._cleanup_container(container_id) + logger.warning(f"Cleaned up old container: {container_id[:12]}") + + return len(containers_to_cleanup) + + except Exception as e: + logger.error(f"Failed to cleanup old containers: {e}", exc_info=True) + return 0 + + def get_vm_status(self) -> Dict[str, Any]: + """Get current VM manager status.""" + return { + "active_containers": len(self.active_containers), + "docker_connected": self.docker_client is not None, + "config": { + "docker_image": self.config.docker_image, + "memory_limit": self.config.memory_limit, + "cpu_limit": self.config.cpu_limit, + "timeout_seconds": self.config.timeout_seconds, + "network_mode": self.config.network_mode + }, + "container_details": [ + { + "id": cid[:12], + "created_at": info["created_at"].isoformat(), + "age_minutes": (datetime.utcnow() - info["created_at"]).total_seconds() / 60 + } + for cid, info in self.active_containers.items() + ] + } \ No newline at end of file diff --git 
class JSONFormatter(logging.Formatter):
    """Formatter that renders each log record as a single JSON object."""

    # LogRecord machinery attributes that must not be copied into the JSON
    # payload as user "extra" fields.  Includes "message" (set by
    # Formatter.format) and "taskName" (added to LogRecord in Python 3.12).
    _RESERVED = frozenset({
        "name", "msg", "args", "levelname", "levelno", "pathname",
        "filename", "module", "lineno", "funcName", "created",
        "msecs", "relativeCreated", "thread", "threadName",
        "processName", "process", "getMessage", "exc_info",
        "exc_text", "stack_info", "message", "taskName",
    })

    def format(self, record):
        """Return the record serialized as a JSON string.

        Bug fix: ``datetime.utcfromtimestamp`` is deprecated since Python
        3.12; we now build an aware UTC datetime and normalise the "+00:00"
        offset to the same trailing "Z" the old code produced.
        """
        from datetime import timezone  # local: module header imports only `datetime`

        timestamp = (
            datetime.fromtimestamp(record.created, tz=timezone.utc)
            .isoformat()
            .replace("+00:00", "Z")
        )

        log_entry = {
            "timestamp": timestamp,
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add exception info if present
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # Pass user-supplied `extra` fields through to the payload.
        for key, value in record.__dict__.items():
            if key not in self._RESERVED:
                log_entry[key] = value

        return json.dumps(log_entry)
def setup_logging(log_level: str = None, log_dir: str = None, enable_json: bool = None):
    """
    Setup logging configuration for the bot.

    Args:
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
            Unknown names fall back to INFO instead of raising.
        log_dir: Directory for log files (created if missing, parents included)
        enable_json: Whether to use JSON formatting

    Bug fixes: (1) an unknown LOG_LEVEL used to raise AttributeError via
    ``getattr(logging, log_level)``; it now falls back to INFO.  (2)
    ``mkdir(exist_ok=True)`` failed for nested LOG_DIR paths; ``parents=True``
    creates intermediate directories.
    """
    # Get configuration from environment if not provided
    if log_level is None:
        log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    if log_dir is None:
        log_dir = os.getenv("LOG_DIR", "logs")
    if enable_json is None:
        enable_json = os.getenv("LOG_JSON", "false").lower() == "true"

    # Resolve the level defensively (see docstring).
    level = getattr(logging, log_level, None)
    if not isinstance(level, int):
        level = logging.INFO

    # Create log directory (including parents for nested paths).
    log_path = Path(log_dir)
    log_path.mkdir(parents=True, exist_ok=True)

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    root_logger.handlers.clear()

    # Console handler: human-readable by default, JSON when requested.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(level)
    if enable_json:
        console_formatter = JSONFormatter()
    else:
        console_formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
    console_handler.setFormatter(console_formatter)
    root_logger.addHandler(console_handler)

    # Rotating file handler for general logs.
    file_handler = logging.handlers.RotatingFileHandler(
        log_path / "bot.log",
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5
    )
    file_handler.setLevel(logging.INFO)
    if enable_json:
        file_formatter = JSONFormatter()
    else:
        file_formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
    file_handler.setFormatter(file_formatter)
    root_logger.addHandler(file_handler)

    # Separate error-only file.
    error_handler = logging.handlers.RotatingFileHandler(
        log_path / "error.log",
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(file_formatter)
    root_logger.addHandler(error_handler)

    # Audit log for security events: always JSON, never propagated upward.
    audit_handler = logging.handlers.RotatingFileHandler(
        log_path / "audit.log",
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=10
    )
    audit_handler.setLevel(logging.INFO)
    audit_handler.setFormatter(JSONFormatter())  # Always use JSON for audit logs
    audit_logger = logging.getLogger("audit")
    audit_logger.addHandler(audit_handler)
    audit_logger.setLevel(logging.INFO)
    audit_logger.propagate = False

    # Performance metrics log: always JSON, isolated like the audit log.
    perf_handler = logging.handlers.RotatingFileHandler(
        log_path / "performance.log",
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5
    )
    perf_handler.setLevel(logging.INFO)
    perf_handler.setFormatter(JSONFormatter())
    perf_logger = logging.getLogger("performance")
    perf_logger.addHandler(perf_handler)
    perf_logger.setLevel(logging.INFO)
    perf_logger.propagate = False

    # Quieten chatty third-party loggers.
    for noisy in ("urllib3", "requests", "docker", "asyncio"):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    logging.info(f"Logging configured - Level: {log_level}, JSON: {enable_json}, Dir: {log_dir}")
"unit": unit, + "tags": tags or {}, + "timestamp": datetime.utcnow().isoformat() + "Z" + } + + perf_logger.info("Performance metric", extra=metric_data) + + +class ContextLogger: + """Logger with additional context information.""" + + def __init__(self, logger_name: str, context: dict = None): + """Initialize context logger.""" + self.logger = logging.getLogger(logger_name) + self.context = context or {} + + def _log_with_context(self, level: int, message: str, *args, **kwargs): + """Log message with context.""" + extra = kwargs.get("extra", {}) + extra.update(self.context) + kwargs["extra"] = extra + + self.logger.log(level, message, *args, **kwargs) + + def debug(self, message: str, *args, **kwargs): + """Log debug message with context.""" + self._log_with_context(logging.DEBUG, message, *args, **kwargs) + + def info(self, message: str, *args, **kwargs): + """Log info message with context.""" + self._log_with_context(logging.INFO, message, *args, **kwargs) + + def warning(self, message: str, *args, **kwargs): + """Log warning message with context.""" + self._log_with_context(logging.WARNING, message, *args, **kwargs) + + def error(self, message: str, *args, **kwargs): + """Log error message with context.""" + self._log_with_context(logging.ERROR, message, *args, **kwargs) + + def critical(self, message: str, *args, **kwargs): + """Log critical message with context.""" + self._log_with_context(logging.CRITICAL, message, *args, **kwargs) + + def add_context(self, **kwargs): + """Add context information.""" + self.context.update(kwargs) + + def remove_context(self, *keys): + """Remove context keys.""" + for key in keys: + self.context.pop(key, None) \ No newline at end of file diff --git a/src/utils/state_manager.py b/src/utils/state_manager.py new file mode 100644 index 0000000..4495e32 --- /dev/null +++ b/src/utils/state_manager.py @@ -0,0 +1,331 @@ +""" +State management for the OmniBlocks Proactive AI Bot. 
"""
State management for the OmniBlocks Proactive AI Bot.

This module handles persistent state storage and retrieval
for maintaining bot state across restarts.

File I/O runs in worker threads via ``asyncio.to_thread`` so the event
loop never blocks; only the standard library is required.
"""

import asyncio
import json
import logging
import os
import pickle
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


class StateManager:
    """
    Manages persistent state for the bot.

    Features:
    - JSON and binary (pickle) state storage
    - Atomic writes with backup
    - State versioning
    - Automatic cleanup of old backups

    SECURITY NOTE: binary state uses pickle — never load state files
    that could have been written by an untrusted party.
    """

    def __init__(self, state_dir: str = None):
        """Initialize state manager and create the state directory."""
        self.state_dir = Path(state_dir or os.getenv("STATE_DIR", "state"))
        # parents=True so a nested STATE_DIR works on first run.
        self.state_dir.mkdir(parents=True, exist_ok=True)

        self.state_file = self.state_dir / "bot_state.json"
        # Stable "latest backup" path; _create_backup keeps it in sync with
        # the timestamped backups so _load_from_backup can actually find it.
        self.backup_file = self.state_dir / "bot_state.backup.json"
        self.binary_state_file = self.state_dir / "bot_state.pkl"

        # Serializes all state mutations within one event loop.
        self._lock = asyncio.Lock()

        logger.info(f"StateManager initialized with directory: {self.state_dir}")

    async def save_state(self, state: Dict[str, Any], backup: bool = True) -> bool:
        """
        Save bot state to persistent storage.

        Args:
            state: State dictionary to save
            backup: Whether to create backup of previous state

        Returns:
            True if successful, False otherwise
        """
        async with self._lock:
            try:
                # Wrap payload with metadata for versioning/diagnostics.
                state_with_meta = {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "version": "1.0",
                    "data": state,
                }

                if backup and self.state_file.exists():
                    await self._create_backup()

                payload = json.dumps(state_with_meta, indent=2, default=str)
                temp_file = self.state_file.with_suffix(".tmp")

                def _write():
                    # Write to a temp file, then rename: atomic on POSIX, so a
                    # crash mid-write never corrupts the main state file.
                    temp_file.write_text(payload)
                    temp_file.replace(self.state_file)

                await asyncio.to_thread(_write)

                logger.info("Bot state saved successfully")
                return True

            except Exception as e:
                logger.error(f"Failed to save state: {e}", exc_info=True)
                return False

    async def load_state(self) -> Optional[Dict[str, Any]]:
        """
        Load bot state from persistent storage.

        Returns:
            State dictionary if successful, None otherwise
        """
        async with self._lock:
            try:
                if not self.state_file.exists():
                    logger.info("No state file found")
                    return None

                content = await asyncio.to_thread(self.state_file.read_text)
                state_with_meta = json.loads(content)

                # Validate the on-disk format before trusting it.
                if not isinstance(state_with_meta, dict) or "data" not in state_with_meta:
                    logger.warning("Invalid state file format")
                    return None

                logger.info(f"Loaded state from {state_with_meta.get('timestamp', 'unknown time')}")
                return state_with_meta["data"]

            except Exception as e:
                logger.error(f"Failed to load state: {e}", exc_info=True)
                # Fall back to the last-known-good backup.
                return await self._load_from_backup()

    async def save_binary_state(self, state: Any, key: str = "default") -> bool:
        """
        Save binary state using pickle.

        Args:
            state: Any Python object to save
            key: Key to identify the state

        Returns:
            True if successful, False otherwise
        """
        async with self._lock:
            try:
                binary_file = self.state_dir / f"{key}_state.pkl"
                temp_file = binary_file.with_suffix(".tmp")

                def _dump():
                    with open(temp_file, "wb") as f:
                        pickle.dump({
                            "timestamp": datetime.now(timezone.utc),
                            "data": state,
                        }, f)
                    temp_file.replace(binary_file)  # atomic rename

                # Run pickling/IO off the event loop.
                await asyncio.to_thread(_dump)

                logger.debug(f"Binary state saved for key: {key}")
                return True

            except Exception as e:
                logger.error(f"Failed to save binary state for {key}: {e}", exc_info=True)
                return False

    async def load_binary_state(self, key: str = "default") -> Optional[Any]:
        """
        Load binary state using pickle.

        Args:
            key: Key to identify the state

        Returns:
            Loaded object if successful, None otherwise
        """
        async with self._lock:
            try:
                binary_file = self.state_dir / f"{key}_state.pkl"
                if not binary_file.exists():
                    return None

                def _load():
                    # pickle.load on our own state dir only — never point this
                    # at untrusted input.
                    with open(binary_file, "rb") as f:
                        return pickle.load(f)

                state_with_meta = await asyncio.to_thread(_load)

                logger.debug(f"Binary state loaded for key: {key}")
                return state_with_meta.get("data")

            except Exception as e:
                logger.error(f"Failed to load binary state for {key}: {e}", exc_info=True)
                return None

    async def delete_state(self, key: str = None) -> bool:
        """
        Delete state files.

        Args:
            key: Specific key to delete, or None for all states

        Returns:
            True if successful, False otherwise
        """
        async with self._lock:
            try:
                if key is None:
                    # Delete every binary/backup state file plus the main
                    # and stable-backup JSON files.
                    for file_path in self.state_dir.glob("*_state.*"):
                        file_path.unlink()
                    if self.state_file.exists():
                        self.state_file.unlink()
                    if self.backup_file.exists():
                        self.backup_file.unlink()
                    logger.info("All state files deleted")
                else:
                    binary_file = self.state_dir / f"{key}_state.pkl"
                    if binary_file.exists():
                        binary_file.unlink()
                    logger.info(f"State deleted for key: {key}")

                return True

            except Exception as e:
                logger.error(f"Failed to delete state: {e}", exc_info=True)
                return False

    async def get_state_info(self) -> Dict[str, Any]:
        """Get information about stored states (sizes and mtimes)."""
        info = {
            "state_directory": str(self.state_dir),
            "main_state_exists": self.state_file.exists(),
            "backup_exists": self.backup_file.exists(),
            "binary_states": [],
            "total_size_bytes": 0,
        }

        try:
            if self.state_file.exists():
                stat = self.state_file.stat()
                info["main_state_size"] = stat.st_size
                info["main_state_modified"] = datetime.fromtimestamp(stat.st_mtime).isoformat()
                info["total_size_bytes"] += stat.st_size

            if self.backup_file.exists():
                stat = self.backup_file.stat()
                info["backup_size"] = stat.st_size
                info["backup_modified"] = datetime.fromtimestamp(stat.st_mtime).isoformat()
                info["total_size_bytes"] += stat.st_size

            for file_path in self.state_dir.glob("*_state.pkl"):
                stat = file_path.stat()
                key = file_path.stem.replace("_state", "")
                info["binary_states"].append({
                    "key": key,
                    "size_bytes": stat.st_size,
                    "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                })
                info["total_size_bytes"] += stat.st_size

        except Exception as e:
            logger.error(f"Failed to get state info: {e}", exc_info=True)
            info["error"] = str(e)

        return info

    async def cleanup_old_states(self, keep_days: int = 7) -> int:
        """
        Clean up old timestamped backup files.

        Args:
            keep_days: Number of days to keep files

        Returns:
            Number of files deleted
        """
        deleted_count = 0
        # st_mtime is epoch seconds, so compare against time.time();
        # datetime.utcnow().timestamp() would be skewed by the local UTC offset.
        cutoff_time = time.time() - keep_days * 24 * 3600

        try:
            for file_path in self.state_dir.glob("*.backup.*"):
                if file_path.stat().st_mtime < cutoff_time:
                    file_path.unlink()
                    deleted_count += 1
                    logger.debug(f"Deleted old state file: {file_path}")

            logger.info(f"Cleaned up {deleted_count} old state files")

        except Exception as e:
            logger.error(f"Failed to cleanup old states: {e}", exc_info=True)

        return deleted_count

    async def _create_backup(self):
        """Copy the current state file to a timestamped backup AND to the
        stable ``bot_state.backup.json`` that ``_load_from_backup`` reads.

        (Previously only the timestamped file was written, so backup
        recovery could never find anything.)
        """
        try:
            if not self.state_file.exists():
                return

            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            backup_path = self.state_dir / f"bot_state.backup.{timestamp}.json"

            def _copy():
                content = self.state_file.read_text()
                backup_path.write_text(content)
                # Keep a predictable "latest backup" for recovery.
                self.backup_file.write_text(content)

            await asyncio.to_thread(_copy)
            logger.debug(f"Created state backup: {backup_path.name}")

        except Exception as e:
            logger.error(f"Failed to create backup: {e}", exc_info=True)

    async def _load_from_backup(self) -> Optional[Dict[str, Any]]:
        """Try to load state from the stable backup file."""
        try:
            if not self.backup_file.exists():
                return None

            content = await asyncio.to_thread(self.backup_file.read_text)
            state_with_meta = json.loads(content)

            if isinstance(state_with_meta, dict) and "data" in state_with_meta:
                logger.warning("Loaded state from backup file")
                return state_with_meta["data"]

        except Exception as e:
            logger.error(f"Failed to load from backup: {e}", exc_info=True)

        return None
"""
Vision Engine for screenshot capture, UI interaction, and visual analysis.

This module provides comprehensive computer vision capabilities including
screenshot capture, UI element detection, visual comparison, and OCR.
"""

import asyncio
import base64
import io  # moved to the top: previously appended at end of file after its uses
import logging
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import pytesseract

from ..core.config import VisionConfig
from ..utils.logging import log_performance_metric

logger = logging.getLogger(__name__)


class ScreenshotResult:
    """Represents a screenshot capture result."""

    def __init__(self):
        self.success = False           # True only when capture completed
        self.image_data = None         # raw PNG bytes, or None on failure
        self.image_path = None         # path of the saved PNG file
        self.width = 0
        self.height = 0
        self.format = "PNG"
        self.size_bytes = 0
        self.timestamp = datetime.utcnow()
        self.error_message = None      # populated when success is False


class UIElement:
    """Represents a detected UI element (bounding box + classification)."""

    def __init__(self, x: int, y: int, width: int, height: int,
                 element_type: str, text: str = "", confidence: float = 0.0):
        self.x = x
        self.y = y
        self.width = width
        self.height = height
        self.element_type = element_type
        self.text = text
        self.confidence = confidence
        # Precomputed center point — handy for click targeting.
        self.center_x = x + width // 2
        self.center_y = y + height // 2


class VisionEngine:
    """
    Computer vision engine for UI interaction and analysis.

    Features:
    - Screenshot capture
    - UI element detection
    - Visual comparison and diff
    - OCR text extraction
    - Image annotation
    """

    def __init__(self, config: VisionConfig):
        """Initialize vision engine with screenshot directory and OCR flag."""
        self.config = config
        self.screenshot_dir = Path("screenshots")
        self.screenshot_dir.mkdir(exist_ok=True)

        self.ocr_enabled = config.enable_ocr

        logger.info("VisionEngine initialized")

    async def initialize(self):
        """Initialize the vision engine; verifies OCR works if enabled."""
        try:
            if self.ocr_enabled:
                await self._test_ocr()
            logger.info("VisionEngine initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize VisionEngine: {e}", exc_info=True)
            raise

    async def shutdown(self):
        """Shutdown the vision engine (no resources currently held)."""
        logger.info("VisionEngine shutdown complete")

    async def health_check(self) -> bool:
        """Perform health check: basic image round-trip plus OCR if enabled."""
        try:
            test_image = np.zeros((100, 100, 3), dtype=np.uint8)
            pil_image = Image.fromarray(test_image)
            if self.ocr_enabled:
                pytesseract.image_to_string(pil_image)
            return True
        except Exception as e:
            logger.error(f"VisionEngine health check failed: {e}")
            return False

    async def capture_screenshot(self, container_id: str = None,
                                 display: str = ":0") -> ScreenshotResult:
        """Capture screenshot from a container (if given) or a local display."""
        result = ScreenshotResult()
        try:
            if container_id:
                result = await self._capture_container_screenshot(container_id)
            else:
                result = await self._capture_display_screenshot(display)

            if result.success:
                log_performance_metric("screenshot_captured", 1, "count")
        except Exception as e:
            logger.error(f"Failed to capture screenshot: {e}", exc_info=True)
            result.error_message = str(e)
        return result

    async def capture_ui_screenshots(self,
                                     test_result: Dict[str, Any]) -> List[ScreenshotResult]:
        """Capture before/after UI screenshots for a test run.

        NOTE(review): placeholder integration — currently takes two shots
        2 seconds apart if test_result carries a container_id.
        """
        screenshots = []
        try:
            container_id = test_result.get("container_id")
            if container_id:
                screenshots.append(await self.capture_screenshot(container_id))
                await asyncio.sleep(2)  # allow the UI to settle between shots
                screenshots.append(await self.capture_screenshot(container_id))
        except Exception as e:
            logger.error(f"Failed to capture UI screenshots: {e}", exc_info=True)
        return screenshots

    async def detect_ui_elements(self, image_data: bytes) -> List[UIElement]:
        """Detect UI elements (buttons, text fields, clickable areas) in an image."""
        elements = []
        try:
            nparr = np.frombuffer(image_data, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if image is None:
                raise ValueError("Invalid image data")

            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

            elements.extend(await self._detect_buttons(gray))
            elements.extend(await self._detect_text_fields(gray))
            elements.extend(await self._detect_clickable_areas(gray))

            logger.debug(f"Detected {len(elements)} UI elements")
        except Exception as e:
            logger.error(f"Failed to detect UI elements: {e}", exc_info=True)
        return elements

    async def extract_text_ocr(self, image_data: bytes) -> str:
        """Extract text from image bytes using Tesseract OCR ('' if disabled)."""
        if not self.ocr_enabled:
            return ""
        try:
            image = Image.open(io.BytesIO(image_data))
            text = pytesseract.image_to_string(image, lang='eng')
            logger.debug(f"Extracted {len(text)} characters via OCR")
            return text.strip()
        except Exception as e:
            logger.error(f"OCR extraction failed: {e}", exc_info=True)
            return ""

    async def compare_images(self, image1_data: bytes, image2_data: bytes) -> Dict[str, Any]:
        """Compare two images pixel-wise; return similarity and a diff image."""
        comparison_result = {
            "similarity": 0.0,
            "difference_percentage": 100.0,
            "has_significant_changes": True,
            "diff_image_data": None,
            "changed_regions": [],
        }

        try:
            img1 = cv2.imdecode(np.frombuffer(image1_data, np.uint8), cv2.IMREAD_COLOR)
            img2 = cv2.imdecode(np.frombuffer(image2_data, np.uint8), cv2.IMREAD_COLOR)
            if img1 is None or img2 is None:
                raise ValueError("Invalid image data for comparison")

            # Resize to common size so absdiff is well-defined.
            if img1.shape != img2.shape:
                height = min(img1.shape[0], img2.shape[0])
                width = min(img1.shape[1], img2.shape[1])
                img1 = cv2.resize(img1, (width, height))
                img2 = cv2.resize(img2, (width, height))

            gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
            gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

            # Simple pixel-wise comparison (not SSIM — any nonzero diff counts).
            diff = cv2.absdiff(gray1, gray2)
            non_zero_count = np.count_nonzero(diff)
            total_pixels = diff.shape[0] * diff.shape[1]

            difference_percentage = (non_zero_count / total_pixels) * 100
            similarity = 100 - difference_percentage

            comparison_result.update({
                "similarity": similarity,
                "difference_percentage": difference_percentage,
                # visual_diff_threshold is a 0..1 fraction; scale to percent.
                "has_significant_changes":
                    difference_percentage > self.config.visual_diff_threshold * 100,
            })

            # Heatmap of the diff for human review.
            diff_colored = cv2.applyColorMap(diff, cv2.COLORMAP_JET)
            _, diff_encoded = cv2.imencode('.png', diff_colored)
            comparison_result["diff_image_data"] = diff_encoded.tobytes()

            logger.debug(f"Image comparison: {similarity:.2f}% similarity")
        except Exception as e:
            logger.error(f"Image comparison failed: {e}", exc_info=True)

        return comparison_result

    async def annotate_image(self, image_data: bytes, annotations: List[Dict]) -> bytes:
        """Draw text/rectangle/circle annotations; returns original bytes on failure."""
        try:
            image = Image.open(io.BytesIO(image_data))
            draw = ImageDraw.Draw(image)

            # ImageFont.truetype raises OSError when the font file is missing;
            # fall back to the built-in bitmap font.
            try:
                font = ImageFont.truetype("arial.ttf", 16)
            except OSError:
                font = ImageFont.load_default()

            for annotation in annotations:
                ann_type = annotation.get("type", "text")

                if ann_type == "text":
                    x = annotation.get("x", 0)
                    y = annotation.get("y", 0)
                    text = annotation.get("text", "")
                    color = annotation.get("color", "red")
                    draw.text((x, y), text, fill=color, font=font)

                elif ann_type == "rectangle":
                    x1 = annotation.get("x1", 0)
                    y1 = annotation.get("y1", 0)
                    x2 = annotation.get("x2", 100)
                    y2 = annotation.get("y2", 100)
                    color = annotation.get("color", "red")
                    width = annotation.get("width", 2)
                    draw.rectangle([x1, y1, x2, y2], outline=color, width=width)

                elif ann_type == "circle":
                    x = annotation.get("x", 0)
                    y = annotation.get("y", 0)
                    radius = annotation.get("radius", 10)
                    color = annotation.get("color", "red")
                    width = annotation.get("width", 2)
                    draw.ellipse([x - radius, y - radius, x + radius, y + radius],
                                 outline=color, width=width)

            output = io.BytesIO()
            image.save(output, format='PNG')
            return output.getvalue()

        except Exception as e:
            logger.error(f"Image annotation failed: {e}", exc_info=True)
            return image_data  # best-effort: return the original image

    async def _capture_container_screenshot(self, container_id: str) -> ScreenshotResult:
        """Capture screenshot from a Docker container.

        NOTE(review): real capture would need X11/VNC in the container;
        this currently renders a placeholder image.
        """
        result = ScreenshotResult()
        try:
            width, height = self.config.max_screenshot_size
            image = Image.new('RGB', (width, height), color='lightblue')
            draw = ImageDraw.Draw(image)

            draw.text((50, 50), f"Container: {container_id[:12]}", fill='black')
            draw.text((50, 100), f"Timestamp: {datetime.utcnow().isoformat()}", fill='black')
            draw.rectangle([100, 150, 300, 250], outline='red', width=2)

            output = io.BytesIO()
            # "quality" only affects lossy formats; harmless for PNG.
            image.save(output, format='PNG', quality=self.config.screenshot_quality)
            image_data = output.getvalue()

            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            filename = f"container_{container_id[:12]}_{timestamp}.png"
            image_path = self.screenshot_dir / filename
            with open(image_path, 'wb') as f:
                f.write(image_data)

            result.success = True
            result.image_data = image_data
            result.image_path = str(image_path)
            result.width = width
            result.height = height
            result.size_bytes = len(image_data)

            logger.debug(f"Captured container screenshot: {image_path}")
        except Exception as e:
            logger.error(f"Container screenshot capture failed: {e}", exc_info=True)
            result.error_message = str(e)
        return result

    async def _capture_display_screenshot(self, display: str) -> ScreenshotResult:
        """Capture screenshot from a display (placeholder image for testing)."""
        result = ScreenshotResult()
        try:
            width, height = self.config.max_screenshot_size
            image = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(image)

            draw.text((50, 50), f"Display: {display}", fill='black')
            draw.text((50, 100), f"Timestamp: {datetime.utcnow().isoformat()}", fill='black')

            output = io.BytesIO()
            image.save(output, format='PNG', quality=self.config.screenshot_quality)
            image_data = output.getvalue()

            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            filename = f"display_{display.replace(':', '')}_{timestamp}.png"
            image_path = self.screenshot_dir / filename
            with open(image_path, 'wb') as f:
                f.write(image_data)

            result.success = True
            result.image_data = image_data
            result.image_path = str(image_path)
            result.width = width
            result.height = height
            result.size_bytes = len(image_data)
        except Exception as e:
            logger.error(f"Display screenshot capture failed: {e}", exc_info=True)
            result.error_message = str(e)
        return result

    async def _detect_buttons(self, gray_image: np.ndarray) -> List[UIElement]:
        """Detect button-like rectangles via Canny edges + contour filtering."""
        elements = []
        try:
            edges = cv2.Canny(gray_image, 50, 150)
            contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            for contour in contours:
                x, y, w, h = cv2.boundingRect(contour)
                # Buttons are usually medium-sized, wider-than-tall rectangles.
                if 20 < w < 200 and 10 < h < 50:
                    area = cv2.contourArea(contour)
                    rect_area = w * h
                    # Contour fills most of its bounding box -> roughly rectangular.
                    if area / rect_area > 0.7:
                        elements.append(UIElement(x, y, w, h, "button", confidence=0.7))
        except Exception as e:
            logger.error(f"Button detection failed: {e}", exc_info=True)
        return elements

    async def _detect_text_fields(self, gray_image: np.ndarray) -> List[UIElement]:
        """Detect text-field-like shapes via morphological closing."""
        elements = []
        try:
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (20, 5))
            morph = cv2.morphologyEx(gray_image, cv2.MORPH_CLOSE, kernel)
            contours, _ = cv2.findContours(morph, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            for contour in contours:
                x, y, w, h = cv2.boundingRect(contour)
                # Text fields are wide, short, and high-aspect-ratio.
                if w > 50 and 15 < h < 40 and w / h > 3:
                    elements.append(UIElement(x, y, w, h, "text_field", confidence=0.6))
        except Exception as e:
            logger.error(f"Text field detection failed: {e}", exc_info=True)
        return elements

    async def _detect_clickable_areas(self, gray_image: np.ndarray) -> List[UIElement]:
        """Detect generic clickable areas via Shi-Tomasi corner detection."""
        elements = []
        try:
            corners = cv2.goodFeaturesToTrack(gray_image, maxCorners=100,
                                              qualityLevel=0.01, minDistance=10)
            if corners is not None:
                for corner in corners:
                    x, y = corner.ravel().astype(int)
                    # Small clickable region centered on the corner point.
                    elements.append(UIElement(x - 5, y - 5, 10, 10, "clickable",
                                              confidence=0.5))
        except Exception as e:
            logger.error(f"Clickable area detection failed: {e}", exc_info=True)
        return elements

    async def _test_ocr(self):
        """Verify OCR recognizes rendered text; raises on hard failure."""
        try:
            image = Image.new('RGB', (200, 50), color='white')
            draw = ImageDraw.Draw(image)
            draw.text((10, 10), "Test OCR", fill='black')

            text = pytesseract.image_to_string(image)

            if "test" not in text.lower():
                logger.warning("OCR test did not recognize test text")
            else:
                logger.info("OCR test successful")
        except Exception as e:
            logger.error(f"OCR test failed: {e}", exc_info=True)
            raise

    def get_vision_status(self) -> Dict[str, Any]:
        """Get current vision engine status and configuration summary."""
        return {
            "ocr_enabled": self.ocr_enabled,
            "screenshot_dir": str(self.screenshot_dir),
            "config": {
                "screenshot_quality": self.config.screenshot_quality,
                "max_screenshot_size": self.config.max_screenshot_size,
                "ui_interaction_timeout": self.config.ui_interaction_timeout,
                "visual_diff_threshold": self.config.visual_diff_threshold,
                "enable_ocr": self.config.enable_ocr,
            },
            "screenshot_count": len(list(self.screenshot_dir.glob("*.png"))),
        }