diff --git a/CHANGELOG.md b/CHANGELOG.md index 30a0ad0..08368ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,99 @@ All notable changes to NextMCP will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.4.0] - 2025-11-04 + +### Added + +#### Authentication & Authorization System +- **Complete Auth Framework** (`nextmcp/auth/`): Production-ready authentication and authorization + - `AuthContext`: Authentication context with user info, roles, and permissions + - `AuthProvider`: Base class for authentication strategies + - `AuthResult`: Authentication result with success/failure and context + - `Permission`: Fine-grained permission model with wildcard support + - `Role`: Role class with permission collections + +- **Built-in Auth Providers**: + - **APIKeyProvider**: API key authentication with role/permission mapping + - Pre-configured key validation + - Custom validation function support + - Secure key generation utility + - **JWTProvider**: JSON Web Token authentication + - HS256/RS256 algorithm support + - Automatic expiration validation + - Token creation and verification + - Requires PyJWT library + - **SessionProvider**: Session-based authentication + - In-memory session storage + - Automatic session expiration + - Session creation and destruction + - Expired session cleanup + +- **RBAC System** (`nextmcp/auth/rbac.py`): + - `RBAC` class for role and permission management + - Define custom permissions and roles + - Assign permissions to roles + - Check and require permissions/roles + - Load configuration from dictionaries + - Export configuration to dictionaries + - `PermissionDeniedError` exception + +- **Auth Middleware Decorators**: + - `@requires_auth` / `@requires_auth_async`: Require authentication + - `@requires_role` / `@requires_role_async`: Require specific roles + - `@requires_permission` / `@requires_permission_async`: Require specific permissions + - Auth context injected as first parameter to protected tools + - Supports middleware stacking + +- **Permission Features**: + - Exact permission matching (`read:posts`) + - Wildcard permissions (`admin:*`, `*`) + - Resource-scoped permissions + - Permission inheritance through roles + +#### Examples +- **API Key Auth Example** (`examples/auth_api_key/`): + - 3 pre-configured API keys (admin, user, viewer) + - Role-based access control demonstration + - Public and protected tools + - Comprehensive README + +- **JWT Auth Example** (`examples/auth_jwt/`): + - Login endpoint with JWT token generation + - Token expiration handling + - Token generation utility script + - Role-based access demonstration + +- **RBAC Example** (`examples/auth_rbac/`): + - Fine-grained permission control + - Permission wildcards demonstration + - RBAC configuration loading + - Permission-based access control + +#### Tests +- **Auth Provider Tests** (`tests/test_auth_providers.py`): 26 tests + - APIKeyProvider: initialization, authentication, validation, key generation + - JWTProvider: token creation, verification, expiration, custom claims + - SessionProvider: session management, expiration, cleanup + +- **RBAC Tests** (`tests/test_rbac.py`): 36 tests + - Permission: creation, matching, wildcards, hashing + - Role: creation, permission management + - AuthContext: role and permission checking + - RBAC: configuration loading, permission checking, access control + - PermissionDeniedError + +### Changed +- **Main Exports** (`nextmcp/__init__.py`): + - Added all auth classes and functions to public API + - 15 new authentication-related exports + +### Notes +- **100% Backward Compatible**: All 235 existing tests pass +- **62 New Tests**: Comprehensive auth system coverage +- **297 Total Tests**: All passing +- **Optional Dependency**: PyJWT required only for JWT provider + ## [0.3.0] - 2025-11-04 ### Added diff --git a/README.md b/README.md index 3561477..4c4b8ad 100644 --- a/README.md +++ b/README.md @@ -398,6 +398,375 @@ def legacy_tool(): See `examples/blog_server/` for a complete convention-based project. +## Authentication & Authorization + +NextMCP v0.4.0 introduces a comprehensive authentication and authorization system inspired by next-auth, adapted for the Model Context Protocol. + +### Why Authentication for MCP? + +MCP servers often need to: +- **Protect sensitive tools** from unauthorized access +- **Implement role-based access** (admin, user, viewer) +- **Track who performed actions** for audit logs +- **Integrate with existing auth systems** (API keys, JWT, OAuth) + +### Quick Start + +#### API Key Authentication + +The simplest way to protect your tools: + +```python +from nextmcp import NextMCP +from nextmcp.auth import APIKeyProvider, AuthContext, requires_auth_async + +app = NextMCP("secure-server") + +# Configure API key provider +api_key_provider = APIKeyProvider( + valid_keys={ + "admin-key-123": { + "user_id": "admin1", + "username": "admin", + "roles": ["admin"], + "permissions": ["read:*", "write:*"], + }, + "user-key-456": { + "user_id": "user1", + "username": "alice", + "roles": ["user"], + "permissions": ["read:posts"], + } + } +) + +# Protected tool - requires authentication +@app.tool() +@requires_auth_async(provider=api_key_provider) +async def protected_tool(auth: AuthContext, data: str) -> dict: + """Only authenticated users can access this.""" + return { + "message": f"Hello {auth.username}", + "data": data, + "user_id": auth.user_id + } +``` + +#### JWT Token Authentication + +For stateless token-based auth: + +```python +from nextmcp.auth import JWTProvider + +# Configure JWT provider +jwt_provider = JWTProvider( + secret_key="your-secret-key", + algorithm="HS256", + verify_exp=True +) + +# Login endpoint that generates tokens +@app.tool() +async def login(username: str, password: str) -> dict: + """Login and receive a JWT token.""" + # Validate credentials (check database, etc.) + + # Generate token + token = jwt_provider.create_token( + user_id=f"user_{username}", + roles=["user"], + permissions=["read:posts", "write:posts"], + username=username, + expires_in=3600 # 1 hour + ) + + return {"token": token, "expires_in": 3600} + +# Use the token for authentication +@app.tool() +@requires_auth_async(provider=jwt_provider) +async def secure_action(auth: AuthContext) -> dict: + """Requires valid JWT token.""" + return {"user": auth.username, "action": "performed"} +``` + +### Built-in Auth Providers + +NextMCP includes three production-ready authentication providers: + +| Provider | Use Case | Features | +|----------|----------|----------| +| **APIKeyProvider** | Simple API key auth | Pre-configured keys, custom validators, secure generation | +| **JWTProvider** | Token-based auth | Automatic expiration, signature verification, stateless | +| **SessionProvider** | Session-based auth | In-memory sessions, automatic cleanup, session management | + +### Role-Based Access Control (RBAC) + +Control access based on user roles: + +```python +from nextmcp.auth import requires_role_async + +# Only admins can access this tool +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_role_async("admin") +async def admin_tool(auth: AuthContext) -> dict: + """Admin-only functionality.""" + return {"action": "admin action performed"} + +# Users or admins can access +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_role_async("user", "admin") # Either role works +async def user_tool(auth: AuthContext) -> dict: + """User or admin can access.""" + return {"action": "user action"} +``` + +### Permission-Based Access Control + +Fine-grained control with specific permissions: + +```python +from nextmcp.auth import RBAC, requires_permission_async + +# Set up RBAC system +rbac = RBAC() + +# Define permissions +rbac.define_permission("read:posts", "Read blog posts") +rbac.define_permission("write:posts", "Create and edit posts") +rbac.define_permission("delete:posts", "Delete posts") + +# Define roles with permissions +rbac.define_role("viewer", "Read-only access") +rbac.assign_permission_to_role("viewer", "read:posts") + +rbac.define_role("editor", "Full content management") +rbac.assign_permission_to_role("editor", "read:posts") +rbac.assign_permission_to_role("editor", "write:posts") +rbac.assign_permission_to_role("editor", "delete:posts") + +# Require specific permission +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("write:posts") +async def create_post(auth: AuthContext, title: str) -> dict: + """Requires write:posts permission.""" + return {"status": "created", "title": title} + +# Multiple permissions (user needs at least one) +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("admin:posts", "delete:posts") +async def delete_post(auth: AuthContext, post_id: int) -> dict: + """Requires admin:posts OR delete:posts permission.""" + return {"status": "deleted", "post_id": post_id} +``` + +### Permission Wildcards + +Support for wildcard permissions: + +```python +# Admin with wildcard - matches ALL permissions +rbac.define_role("admin", "Full access") +rbac.assign_permission_to_role("admin", "*") + +# Namespace wildcard - matches all admin permissions +rbac.assign_permission_to_role("moderator", "admin:*") + +# moderator has: admin:users, admin:posts, admin:settings, etc. +``` + +### AuthContext + +The `AuthContext` object is injected as the first parameter to protected tools: + +```python +@app.tool() +@requires_auth_async(provider=api_key_provider) +async def my_tool(auth: AuthContext, param: str) -> dict: + # Access user information + user_id = auth.user_id # Unique user ID + username = auth.username # Human-readable name + + # Check roles and permissions + is_admin = auth.has_role("admin") + can_write = auth.has_permission("write:posts") + + # Access metadata + department = auth.metadata.get("department") + + return { + "user": username, + "is_admin": is_admin, + "can_write": can_write + } +``` + +### Middleware Stacking + +Stack authentication and authorization decorators: + +```python +@app.tool() # 4. Register as tool +@requires_auth_async(provider=api_key_provider) # 3. Authenticate user +@requires_role_async("admin") # 2. Check role +@requires_permission_async("delete:users") # 1. Check permission (executes first) +async def delete_user(auth: AuthContext, user_id: int) -> dict: + """Requires authentication, admin role, AND delete:users permission.""" + return {"status": "deleted", "user_id": user_id} +``` + +### Session Management + +Using the SessionProvider for session-based authentication: + +```python +from nextmcp.auth import SessionProvider + +session_provider = SessionProvider(session_timeout=3600) # 1 hour + +@app.tool() +async def login(username: str, password: str) -> dict: + """Create a new session.""" + # Validate credentials... + + # Create session + session_id = session_provider.create_session( + user_id=f"user_{username}", + username=username, + roles=["user"], + permissions=["read:posts"] + ) + + return {"session_id": session_id, "expires_in": 3600} + +@app.tool() +async def logout(session_id: str) -> dict: + """Destroy a session.""" + success = session_provider.destroy_session(session_id) + return {"logged_out": success} + +# Use session for authentication +@app.tool() +@requires_auth_async(provider=session_provider) +async def protected_tool(auth: AuthContext) -> dict: + """Requires valid session.""" + return {"user": auth.username} +``` + +### Loading RBAC from Configuration + +Define roles and permissions in configuration: + +```python +from nextmcp.auth import RBAC + +rbac = RBAC() + +config = { + "permissions": [ + {"name": "read:posts", "description": "Read posts"}, + {"name": "write:posts", "description": "Write posts"}, + {"name": "delete:posts", "description": "Delete posts"}, + ], + "roles": [ + { + "name": "viewer", + "description": "Read-only", + "permissions": ["read:posts"] + }, + { + "name": "editor", + "description": "Full content management", + "permissions": ["read:posts", "write:posts", "delete:posts"] + } + ] +} + +rbac.load_from_config(config) +``` + +### Custom Auth Providers + +Create your own authentication provider: + +```python +from nextmcp.auth import AuthProvider, AuthResult, AuthContext + +class CustomAuthProvider(AuthProvider): + """Custom authentication using external service.""" + + async def authenticate(self, credentials: dict) -> AuthResult: + """Validate credentials against external service.""" + token = credentials.get("token") + + # Call your external auth service + user_data = await external_auth_service.validate(token) + + if not user_data: + return AuthResult.failure("Invalid token") + + # Build auth context + context = AuthContext( + authenticated=True, + user_id=user_data["id"], + username=user_data["name"], + ) + + # Add roles from external service + for role in user_data.get("roles", []): + context.add_role(role) + + return AuthResult.success_result(context) +``` + +### Error Handling + +Authentication errors are raised as exceptions: + +```python +from nextmcp.auth import PermissionDeniedError +from nextmcp.auth.middleware import AuthenticationError + +try: + # Call protected tool without credentials + result = await protected_tool(data="test") +except AuthenticationError as e: + print(f"Auth failed: {e}") + +try: + # Call tool without required permission + result = await admin_tool() +except PermissionDeniedError as e: + print(f"Permission denied: {e}") + print(f"Required: {e.required}") + print(f"User: {e.user_id}") +``` + +### Security Best Practices + +1. **Never commit secrets**: Use environment variables for keys/secrets +2. **Use HTTPS/TLS**: Always encrypt traffic in production +3. **Rotate keys regularly**: Implement key rotation policies +4. **Short token expiration**: Balance security and UX (1-24 hours) +5. **Log auth attempts**: Track successful and failed authentication +6. **Validate all inputs**: Never trust client-provided data +7. **Use strong secrets**: Generate with `secrets.token_urlsafe(32)` +8. **Implement rate limiting**: Prevent brute force attacks + +### Examples + +Check out complete authentication examples: + +- **`examples/auth_api_key/`** - API key authentication with role-based access +- **`examples/auth_jwt/`** - JWT token authentication with login endpoint +- **`examples/auth_rbac/`** - Advanced RBAC with fine-grained permissions + ## Core Concepts ### Creating an Application @@ -1189,6 +1558,9 @@ mcp version Check out the `examples/` directory for complete working examples: - **blog_server** - Convention-based project structure with auto-discovery (5 tools, 3 prompts, 4 resources) +- **auth_api_key** - API key authentication with role-based access control +- **auth_jwt** - JWT token authentication with login endpoint and token generation +- **auth_rbac** - Advanced RBAC with fine-grained permissions and wildcards - **weather_bot** - A weather information server with multiple tools - **async_weather_bot** - Async version demonstrating concurrent operations and async middleware - **websocket_chat** - Real-time chat server using WebSocket transport @@ -1207,6 +1579,9 @@ cd nextmcp # Install in editable mode with dev dependencies pip install -e ".[dev]" +# Install git pre-commit hooks (recommended) +./scripts/install-hooks.sh + # Run tests pytest @@ -1223,6 +1598,23 @@ ruff check nextmcp tests mypy nextmcp ``` +#### Pre-commit Hooks + +The repository includes a pre-commit hook that automatically runs before each commit to: +- Check and auto-fix code with ruff +- Format code with black +- Run all tests + +Install the hook with: +```bash +./scripts/install-hooks.sh +``` + +The hook ensures all commits pass linting and tests, preventing CI failures. To bypass the hook (not recommended), use: +```bash +git commit --no-verify +``` + ### Running Tests ```bash @@ -1262,6 +1654,12 @@ NextMCP builds on FastMCP to provide: | Convention-based structure | ❌ | ✅ File-based organization | | Auto-discovery | ❌ | ✅ Automatic primitive registration | | Zero-config setup | ❌ | ✅ `NextMCP.from_config()` | +| **Authentication & Authorization** | ❌ | ✅ **Built-in auth system** | +| API key auth | ❌ | ✅ APIKeyProvider | +| JWT auth | ❌ | ✅ JWTProvider | +| Session auth | ❌ | ✅ SessionProvider | +| RBAC | ❌ | ✅ Full RBAC system | +| Permission-based access | ❌ | ✅ Fine-grained permissions | | Async/await support | ❌ | ✅ Full support | | WebSocket transport | ❌ | ✅ Built-in | | Middleware | ❌ | Global + tool-specific | @@ -1280,6 +1678,7 @@ NextMCP builds on FastMCP to provide: - [x] **v0.1.0** - Core MCP server with Tools primitive - [x] **v0.2.0** - Full MCP Primitives (Prompts, Resources, Resource Templates, Subscriptions) - [x] **v0.3.0** - Convention-Based Architecture (Auto-discovery, `from_config()`, Project structure) +- [x] **v0.4.0** - Authentication & Authorization (API keys, JWT, Sessions, RBAC) - [x] Async tool support - [x] WebSocket transport - [x] Plugin system @@ -1293,13 +1692,6 @@ NextMCP builds on FastMCP to provide: ### Planned -#### v0.4.0 - Authentication & Authorization -- **Built-in Auth System**: next-auth inspired authentication for MCP -- **Multiple Providers**: API keys, JWT, OAuth-like flows for MCP -- **Role-Based Access Control (RBAC)**: Tool-level permissions -- **Convention-Based**: `auth/` directory for auth providers -- **Middleware Integration**: Seamless auth middleware - #### v0.5.0 - Production & Deployment - **Deployment Manifests**: Generate Docker, AWS Lambda, and serverless configs - **One-Command Deploy**: `mcp deploy --target=aws-lambda` diff --git a/examples/async_weather_bot/app.py b/examples/async_weather_bot/app.py index 7dd0b1e..3bdab50 100644 --- a/examples/async_weather_bot/app.py +++ b/examples/async_weather_bot/app.py @@ -9,10 +9,10 @@ """ import asyncio -from nextmcp import NextMCP, setup_logging, log_calls_async, error_handler_async -from typing import Optional import random +from nextmcp import NextMCP, error_handler_async, log_calls_async, setup_logging + # Setup logging setup_logging(level="INFO") @@ -185,7 +185,7 @@ async def test_async_tools(): # Get batch weather (concurrent execution) print("\n3. Testing get_batch_weather (concurrent)...") cities = ["New York", "London", "Tokyo", "Paris"] - batch_weather = await get_batch_weather(cities) + await get_batch_weather(cities) print(f" Result: Fetched weather for {len(cities)} cities concurrently") # Search cities diff --git a/examples/auth_api_key/README.md b/examples/auth_api_key/README.md new file mode 100644 index 0000000..0592614 --- /dev/null +++ b/examples/auth_api_key/README.md @@ -0,0 +1,242 @@ +# API Key Authentication Example + +This example demonstrates how to use API key authentication to protect MCP tools with role-based access control. + +## Features + +- **API Key Provider**: Validates requests using pre-configured API keys +- **Role-Based Access**: Different keys have different roles (admin, user, viewer) +- **Permission Control**: Fine-grained permissions per key +- **Public & Protected Tools**: Mix of authenticated and unauthenticated tools +- **Async Support**: All tools use async/await + +## Project Structure + +``` +auth_api_key/ +├── server.py # Main server with protected tools +└── README.md # This file +``` + +## Running the Example + +```bash +cd examples/auth_api_key +python server.py +``` + +## API Keys + +The server has three pre-configured API keys: + +| API Key | User | Role | Permissions | +|---------|------|------|-------------| +| `admin-key-123` | admin | admin | read:*, write:*, delete:* | +| `user-key-456` | alice | user | read:posts, write:own_posts | +| `viewer-key-789` | bob | viewer | read:posts | + +## Available Tools + +### Public (No Auth) +- `public_info()` - Get server information + +### Authenticated (Any Valid Key) +- `get_profile()` - Get your user profile +- `list_posts()` - List blog posts + +### User or Admin Role Required +- `create_post(title, content)` - Create a new post + +### Admin Role Required +- `delete_post(post_id)` - Delete a post +- `server_stats()` - View server statistics + +## How It Works + +### 1. Configure the API Key Provider + +```python +from nextmcp.auth import APIKeyProvider + +api_key_provider = APIKeyProvider( + valid_keys={ + "admin-key-123": { + "user_id": "admin1", + "username": "admin", + "roles": ["admin"], + "permissions": ["read:*", "write:*", "delete:*"], + } + } +) +``` + +### 2. Protect Tools with Authentication + +```python +from nextmcp.auth import requires_auth_async, AuthContext + +@app.tool() +@requires_auth_async(provider=api_key_provider) +async def protected_tool(auth: AuthContext) -> dict: + """This tool requires authentication.""" + return { + "user": auth.username, + "roles": [r.name for r in auth.roles] + } +``` + +### 3. Add Role-Based Access Control + +```python +from nextmcp.auth import requires_role_async + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_role_async("admin") +async def admin_only_tool(auth: AuthContext) -> dict: + """Only users with admin role can access this.""" + return {"action": "performed", "by": auth.username} +``` + +## Testing the Authentication + +### Test with Admin Key + +```python +# This will work - admin has all permissions +result = await client.invoke_tool( + "delete_post", + {"post_id": 1}, + auth={"api_key": "admin-key-123"} +) +``` + +### Test with User Key + +```python +# This will work - user can create posts +result = await client.invoke_tool( + "create_post", + {"title": "My Post", "content": "Content here"}, + auth={"api_key": "user-key-456"} +) + +# This will FAIL - user cannot delete posts +result = await client.invoke_tool( + "delete_post", + {"post_id": 1}, + auth={"api_key": "user-key-456"} # PermissionDeniedError! +) +``` + +### Test with Viewer Key + +```python +# This will work - viewer can list posts +result = await client.invoke_tool( + "list_posts", + {"limit": 5}, + auth={"api_key": "viewer-key-789"} +) + +# This will FAIL - viewer cannot create posts +result = await client.invoke_tool( + "create_post", + {"title": "Post", "content": "Content"}, + auth={"api_key": "viewer-key-789"} # PermissionDeniedError! +) +``` + +## Key Concepts + +### AuthContext + +The `AuthContext` object contains information about the authenticated user: + +- `authenticated` - Whether authentication succeeded +- `user_id` - Unique user identifier +- `username` - Human-readable username +- `roles` - Set of Role objects +- `permissions` - Set of Permission objects +- `metadata` - Additional user data + +### Middleware Stacking + +Decorators are applied from bottom to top: + +```python +@app.tool() # 3. Register as tool +@requires_auth_async(provider=api_key_provider) # 2. Check authentication +@requires_role_async("admin") # 1. Check role (first to execute) +async def tool(auth: AuthContext): + pass +``` + +### Generating API Keys + +```python +from nextmcp.auth import APIKeyProvider + +# Generate a secure random API key +new_key = APIKeyProvider.generate_key() +print(f"New API key: {new_key}") +# Output: "a1b2c3d4e5f6..." (64 character hex string) +``` + +## Security Best Practices + +1. **Never commit API keys to version control** +2. **Use environment variables for production keys** +3. **Rotate keys regularly** +4. **Use different keys for different environments** +5. **Implement key expiration if needed** +6. **Log authentication attempts** +7. **Use HTTPS/TLS in production** + +## Production Configuration + +For production, load API keys from environment variables or a secure config file: + +```python +import os +from nextmcp.auth import APIKeyProvider + +api_key_provider = APIKeyProvider( + valid_keys={ + os.environ["ADMIN_API_KEY"]: { + "user_id": "admin1", + "roles": ["admin"], + }, + os.environ["USER_API_KEY"]: { + "user_id": "user1", + "roles": ["user"], + }, + } +) +``` + +## Custom Validation + +You can provide a custom validation function: + +```python +def validate_api_key(api_key: str) -> dict | None: + """Custom validation logic.""" + # Query database, external service, etc. + user = database.find_user_by_api_key(api_key) + if user: + return { + "user_id": user.id, + "username": user.name, + "roles": user.roles, + } + return None + +provider = APIKeyProvider(key_validator=validate_api_key) +``` + +## Next Steps + +- See `examples/auth_jwt/` for JWT token authentication +- See `examples/auth_rbac/` for advanced RBAC scenarios +- See `examples/auth_custom/` for custom auth providers diff --git a/examples/auth_api_key/server.py b/examples/auth_api_key/server.py new file mode 100644 index 0000000..ab9ebef --- /dev/null +++ b/examples/auth_api_key/server.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +""" +API Key Authentication Example + +This example demonstrates how to use API key authentication to protect MCP tools. +Shows both basic authentication and role-based access control. + +Usage: + python server.py + +Testing: + # Valid admin key + python client.py --api-key admin-key-123 + + # Valid user key + python client.py --api-key user-key-456 + + # Invalid key + python client.py --api-key wrong-key +""" + + +from nextmcp import NextMCP +from nextmcp.auth import ( + APIKeyProvider, + AuthContext, + requires_auth_async, + requires_role_async, +) + +# Create the MCP app +app = NextMCP("api-key-auth-server", description="Demonstrates API key authentication") + +# Configure API keys with roles and permissions +api_key_provider = APIKeyProvider( + valid_keys={ + "admin-key-123": { + "user_id": "admin1", + "username": "admin", + "roles": ["admin"], + "permissions": ["read:*", "write:*", "delete:*"], + "metadata": {"department": "IT"}, + }, + "user-key-456": { + "user_id": "user1", + "username": "alice", + "roles": ["user"], + "permissions": ["read:posts", "write:own_posts"], + "metadata": {"department": "Marketing"}, + }, + "viewer-key-789": { + "user_id": "viewer1", + "username": "bob", + "roles": ["viewer"], + "permissions": ["read:posts"], + "metadata": {"department": "Sales"}, + }, + } +) + + +# Public tool - no authentication required +@app.tool() +async def public_info() -> dict: + """Get public server information (no auth required).""" + return { + "server": "API Key Auth Demo", + "version": "1.0.0", + "auth_required": "Use api_key parameter for protected tools", + } + + +# Authenticated tool - requires valid API key +@app.tool() +@requires_auth_async(provider=api_key_provider) +async def get_profile(auth: AuthContext) -> dict: + """Get the authenticated user's profile.""" + return { + "user_id": auth.user_id, + "username": auth.username, + "roles": [r.name for r in auth.roles], + "permissions": [p.name for p in auth.permissions], + "metadata": auth.metadata, + } + + +# Tool that requires authentication +@app.tool() +@requires_auth_async(provider=api_key_provider) +async def list_posts(auth: AuthContext, limit: int = 10) -> dict: + """List blog posts (requires authentication).""" + # Anyone authenticated can list posts + return { + "posts": [ + {"id": 1, "title": "First Post", "author": "admin"}, + {"id": 2, "title": "Second Post", "author": "alice"}, + {"id": 3, "title": "Third Post", "author": "bob"}, + ][:limit], + "requested_by": auth.username, + } + + +# Tool that requires specific role +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_role_async("user", "admin") +async def create_post(auth: AuthContext, title: str, content: str) -> dict: + """Create a new blog post (requires user or admin role).""" + return { + "status": "created", + "post": { + "id": 4, + "title": title, + "content": content, + "author": auth.username, + }, + "message": f"Post created by {auth.username}", + } + + +# Admin-only tool +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_role_async("admin") +async def delete_post(auth: AuthContext, post_id: int) -> dict: + """Delete a blog post (admin only).""" + return { + "status": "deleted", + "post_id": post_id, + "deleted_by": auth.username, + "message": f"Post {post_id} deleted by admin {auth.username}", + } + + +# Admin-only server management tool +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_role_async("admin") +async def server_stats(auth: AuthContext) -> dict: + """Get server statistics (admin only).""" + return { + "active_users": 42, + "total_posts": 156, + "disk_usage": "45%", + "uptime": "12 days", + "accessed_by": auth.username, + } + + +if __name__ == "__main__": + print("=" * 60) + print("API Key Authentication Example Server") + print("=" * 60) + print() + print("Available API Keys:") + print(" - admin-key-123 (admin role - full access)") + print(" - user-key-456 (user role - can create posts)") + print(" - viewer-key-789 (viewer role - read-only)") + print() + print("Available Tools:") + print(" - public_info() - No auth required") + print(" - get_profile() - Requires any valid key") + print(" - list_posts() - Requires any valid key") + print(" - create_post() - Requires user or admin role") + print(" - delete_post() - Requires admin role") + print(" - server_stats() - Requires admin role") + print() + print("Starting server...") + print("=" * 60) + + app.run() diff --git a/examples/auth_jwt/README.md b/examples/auth_jwt/README.md new file mode 100644 index 0000000..0efd2c9 --- /dev/null +++ b/examples/auth_jwt/README.md @@ -0,0 +1,198 @@ +# JWT Authentication Example + +This example demonstrates JWT (JSON Web Token) based authentication with automatic token expiration and role-based access control. + +## Features + +- **JWT Token Authentication**: Stateless token-based auth +- **Token Generation**: Built-in login endpoint +- **Expiration Handling**: Automatic token expiration validation +- **Role-Based Access**: Different roles with different permissions +- **Token Utility**: Helper script to generate tokens for testing + +## Requirements + +```bash +pip install PyJWT +``` + +## Running the Example + +```bash +cd examples/auth_jwt + +# Generate a token +python generate_token.py --user admin --role admin + +# Start the server +python server.py +``` + +## How JWT Works + +1. **Client logs in** with username/password +2. **Server generates JWT** signed with secret key +3. **Client includes token** in subsequent requests +4. **Server validates token** signature and expiration +5. **Server extracts user info** from token payload + +## Available Roles + +| Role | Permissions | Can Access | +|------|-------------|-----------| +| admin | read:*, write:*, delete:* | All tools | +| user | read:posts, write:own_posts | Create, update posts | +| viewer | read:posts | Read-only access | + +## Tools + +### Public (No Auth) +- `public_info()` - Server information +- `login(username, password)` - Get JWT token + +### Authenticated +- `whoami()` - View your auth info +- `list_posts(limit)` - List posts + +### User/Admin Role +- `create_post(title, content)` - Create post +- `update_post(post_id, title, content)` - Update post + +### Admin Only +- `delete_post(post_id)` - Delete post +- `admin_dashboard()` - View server stats + +## Token Generation + +### Using the Utility + +```bash +# Admin token (1 hour expiration) +python generate_token.py --user admin --role admin + +# User token (2 hour expiration) +python generate_token.py --user alice --role user --expires 7200 + +# Viewer token +python generate_token.py --user bob --role viewer +``` + +### Programmatic Generation + +```python +from nextmcp.auth import JWTProvider + +provider = JWTProvider(secret_key="your-secret") + +token = provider.create_token( + user_id="user123", + roles=["admin"], + permissions=["read:*"], + username="alice", + expires_in=3600 # 1 hour +) +``` + +## Using Tokens + +### With MCP Client + +```python +# Authenticate with token +result = await client.invoke_tool( + "whoami", + {}, + auth={"token": "eyJhbGc..."} +) +``` + +### Token Structure + +JWT tokens have three parts (separated by `.`): + +``` +header.payload.signature +``` + +Example payload: +```json +{ + "sub": "user_admin", + "username": "admin", + "roles": ["admin"], + "permissions": ["read:*", "write:*"], + "iat": 1705320000, + "exp": 1705323600 +} +``` + +## Security Features + +- **Signature Verification**: Prevents token tampering +- **Expiration Validation**: Tokens auto-expire +- **Algorithm Specification**: HS256 by default +- **No Session Storage**: Stateless authentication + +## Error Handling + +### Expired Token +```python +# After token expires +result = await client.invoke_tool("whoami", {}, auth={"token": expired_token}) +# Error: "Token expired" +``` + +### Invalid Token +```python +# Tampered or malformed token +result = await client.invoke_tool("whoami", {}, auth={"token": "invalid"}) +# Error: "Invalid token" +``` + +### Insufficient Permissions +```python +# Viewer trying to create post +result = await client.invoke_tool( + "create_post", + {"title": "Post", "content": "Content"}, + auth={"token": viewer_token} +) +# Error: PermissionDeniedError +``` + +## Production Best Practices + +1. **Use strong secret keys**: Generate with `secrets.token_urlsafe(32)` +2. **Store secrets securely**: Use environment variables +3. **Use HTTPS**: Prevent token interception +4. **Short expiration times**: Balance UX and security +5. **Refresh tokens**: Implement token refresh mechanism +6. **Revocation list**: Track revoked tokens if needed + +## Advanced Usage + +### Custom Claims + +```python +token = provider.create_token( + user_id="user123", + roles=["user"], + custom_field="custom_value", + department="Engineering" +) +``` + +### Different Algorithms + +```python +# RS256 (asymmetric) +provider = JWTProvider( + secret_key=private_key, + algorithm="RS256" +) +``` + +## Next Steps + +- See `examples/auth_api_key/` for simpler API key auth +- See `examples/auth_rbac/` for complex permission scenarios diff --git a/examples/auth_jwt/generate_token.py b/examples/auth_jwt/generate_token.py new file mode 100644 index 0000000..b5d0011 --- /dev/null +++ b/examples/auth_jwt/generate_token.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +""" +JWT Token Generator + +Utility to generate JWT tokens for testing the auth_jwt example. + +Usage: + python generate_token.py --user admin --role admin + python generate_token.py --user alice --role user + python generate_token.py --user bob --role viewer --expires 7200 +""" + +import argparse + +from nextmcp.auth import JWTProvider + +# Must match the secret in server.py +SECRET_KEY = "your-secret-key-change-in-production" + + +def main(): + parser = argparse.ArgumentParser(description="Generate JWT tokens for testing") + parser.add_argument("--user", required=True, help="Username") + parser.add_argument("--role", required=True, help="User role (admin, user, viewer)") + parser.add_argument( + "--expires", type=int, default=3600, help="Token expiration in seconds (default: 3600)" + ) + + args = parser.parse_args() + + # Map roles to permissions + role_permissions = { + "admin": ["read:*", "write:*", "delete:*"], + "user": ["read:posts", "write:own_posts"], + "viewer": ["read:posts"], + } + + permissions = role_permissions.get(args.role, ["read:posts"]) + + # Create JWT provider + provider = JWTProvider(secret_key=SECRET_KEY) + + # Generate token + token = provider.create_token( + user_id=f"user_{args.user}", + roles=[args.role], + permissions=permissions, + username=args.user, + expires_in=args.expires, + ) + + print("=" * 60) + print("JWT Token Generated") + print("=" * 60) + print() + print(f"User: {args.user}") + print(f"Role: {args.role}") + print(f"Permissions: {', '.join(permissions)}") + print(f"Expires in: {args.expires} seconds ({args.expires/3600:.1f} hours)") + print() + print("Token:") + print("-" * 60) + print(token) + print("-" * 60) + print() + print("Usage in your client:") + print(' auth={"token": "' + token[:20] + '..."}') + print() + print("To decode the token, visit: https://jwt.io") + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/examples/auth_jwt/server.py b/examples/auth_jwt/server.py new file mode 100644 index 0000000..465119e --- /dev/null +++ b/examples/auth_jwt/server.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +""" +JWT Authentication Example + +This example demonstrates how to use JWT (JSON Web Tokens) for authentication. +Shows token generation, validation, expiration, and role-based access. + +Usage: + # Generate a token first + python generate_token.py --user alice --role user + + # Then use it with the server + python server.py + +Requirements: + pip install PyJWT +""" + + +from nextmcp import NextMCP +from nextmcp.auth import AuthContext, JWTProvider, requires_auth_async, requires_role_async + +# Create the MCP app +app = NextMCP("jwt-auth-server", description="Demonstrates JWT authentication") + +# Secret key for JWT signing (in production, use environment variable!) +SECRET_KEY = "your-secret-key-change-in-production" + +# Configure JWT provider +jwt_provider = JWTProvider( + secret_key=SECRET_KEY, + algorithm="HS256", + verify_exp=True, # Verify token expiration +) + + +# Public tool - no authentication +@app.tool() +async def public_info() -> dict: + """Get public server information (no auth required).""" + return { + "server": "JWT Auth Demo", + "version": "1.0.0", + "auth": "JWT", + "note": "Use generate_token.py to create tokens", + } + + +# Tool to login and get a JWT token +@app.tool() +async def login(username: str, password: str) -> dict: + """ + Login with username/password and receive a JWT token. + + In a real application, this would validate against a database. + For this demo, we accept any username and password. + """ + # In production: validate against database, hash passwords, etc. + # For demo purposes, we'll accept any credentials + + # Determine role based on username (demo only!) + if username == "admin": + roles = ["admin"] + permissions = ["read:*", "write:*", "delete:*"] + elif username.startswith("user"): + roles = ["user"] + permissions = ["read:posts", "write:own_posts"] + else: + roles = ["viewer"] + permissions = ["read:posts"] + + # Generate JWT token (expires in 1 hour) + token = jwt_provider.create_token( + user_id=f"user_{username}", + roles=roles, + permissions=permissions, + username=username, + expires_in=3600, # 1 hour + ) + + return { + "status": "success", + "message": f"Login successful for {username}", + "token": token, + "expires_in": 3600, + "token_type": "Bearer", + "user": {"username": username, "roles": roles}, + } + + +# Authenticated tool +@app.tool() +@requires_auth_async(provider=jwt_provider) +async def whoami(auth: AuthContext) -> dict: + """Get information about the authenticated user.""" + return { + "user_id": auth.user_id, + "username": auth.username, + "roles": [r.name for r in auth.roles], + "permissions": [p.name for p in auth.permissions], + "authenticated": auth.authenticated, + } + + +# List posts - requires authentication +@app.tool() +@requires_auth_async(provider=jwt_provider) +async def list_posts(auth: AuthContext, limit: int = 10) -> dict: + """List blog posts (requires valid JWT token).""" + posts = [ + {"id": 1, "title": "JWT Authentication in MCP", "author": "admin"}, + {"id": 2, "title": "Role-Based Access Control", "author": "alice"}, + {"id": 3, "title": "Secure API Design", "author": "bob"}, + ] + + return { + "posts": posts[:limit], + "total": len(posts), + "requested_by": auth.username, + } + + +# Create post - requires user or admin role +@app.tool() +@requires_auth_async(provider=jwt_provider) +@requires_role_async("user", "admin") +async def create_post(auth: AuthContext, title: str, content: str) -> dict: + """Create a new blog post (requires user or admin role).""" + new_post = { + "id": 4, + "title": title, + "content": content, + "author": auth.username, + "created_at": "2025-01-15T10:30:00Z", + } + + return { + "status": "created", + "post": new_post, + "message": f"Post created by {auth.username}", + } + + +# Update post - requires user or admin role +@app.tool() +@requires_auth_async(provider=jwt_provider) +@requires_role_async("user", "admin") +async def update_post(auth: AuthContext, post_id: int, title: str, content: str) -> dict: + """Update an existing post (requires user or admin role).""" + # In production: verify ownership or admin status + return { + "status": "updated", + "post_id": post_id, + "title": title, + "updated_by": auth.username, + "message": f"Post {post_id} updated", + } + + +# Delete post - admin only +@app.tool() +@requires_auth_async(provider=jwt_provider) +@requires_role_async("admin") +async def delete_post(auth: AuthContext, post_id: int) -> dict: + """Delete a blog post (admin only).""" + return { + "status": "deleted", + "post_id": post_id, + "deleted_by": auth.username, + "message": f"Post {post_id} deleted by admin", + } + + +# Admin dashboard - admin only +@app.tool() +@requires_auth_async(provider=jwt_provider) +@requires_role_async("admin") +async def admin_dashboard(auth: AuthContext) -> dict: + """View admin dashboard with server statistics.""" + return { + "server_stats": { + "total_users": 156, + "active_sessions": 42, + "total_posts": 1234, + "disk_usage_gb": 45.6, + }, + "recent_activity": [ + {"user": "alice", "action": "created_post", "timestamp": "2025-01-15T10:25:00Z"}, + {"user": "bob", "action": "updated_post", "timestamp": "2025-01-15T10:20:00Z"}, + ], + "accessed_by": auth.username, + } + + +if __name__ == "__main__": + print("=" * 60) + print("JWT Authentication Example Server") + print("=" * 60) + print() + print("Quick Start:") + print(" 1. Generate a token:") + print(" python generate_token.py --user admin --role admin") + print() + print(" 2. Use the token to authenticate:") + print(" - Pass as 'token' in credentials") + print(" - Token expires after 1 hour") + print() + print("Available Tools:") + print(" - public_info() - No auth required") + print(" - login() - Get JWT token") + print(" - whoami() - Check your auth status") + print(" - list_posts() - Requires valid token") + print(" - create_post() - Requires user/admin role") + print(" - update_post() - Requires user/admin role") + print(" - delete_post() - Requires admin role") + print(" - admin_dashboard() - Requires admin role") + print() + print("User Roles:") + print(" - admin - Full access to all tools") + print(" - user - Can create and update posts") + print(" - viewer - Read-only access") + print() + print("Starting server...") + print("=" * 60) + + app.run() diff --git a/examples/auth_rbac/README.md b/examples/auth_rbac/README.md new file mode 100644 index 0000000..70caa12 --- /dev/null +++ b/examples/auth_rbac/README.md @@ -0,0 +1,170 @@ +# RBAC (Role-Based Access Control) Example + +Advanced example demonstrating fine-grained permission control with the RBAC system. + +## Features + +- **Fine-Grained Permissions**: Specific permissions like `read:posts`, `write:posts` +- **Permission Wildcards**: `admin:*` matches all admin permissions, `*` matches everything +- **RBAC Configuration**: Load roles and permissions from configuration +- **Permission-Based Access**: `@requires_permission` instead of `@requires_role` +- **Role Hierarchies**: Different roles with different permission sets + +## Roles and Permissions + +| Role | Permissions | Capabilities | +|------|-------------|--------------| +| viewer | read:posts | Read posts only | +| author | read:posts, write:posts | Create and edit posts | +| editor | read/write/delete:posts | Full post management | +| moderator | All post perms + read:users | Manage posts + view users | +| admin | * (all) | Full access to everything | + +## Permission Matrix + +| Permission | viewer | author | editor | moderator | admin | +|-----------|--------|--------|--------|-----------|-------| +| read:posts | ✓ | ✓ | ✓ | ✓ | ✓ | +| write:posts | ✗ | ✓ | ✓ | ✓ | ✓ | +| delete:posts | ✗ | ✗ | ✓ | ✓ | ✓ | +| read:users | ✗ | ✗ | ✗ | ✓ | ✓ | +| write:users | ✗ | ✗ | ✗ | ✗ | ✓ | +| delete:users | ✗ | ✗ | ✗ | ✗ | ✓ | + +## Running the Example + +```bash +cd examples/auth_rbac +python server.py +``` + +## Key Concepts + +### Permission-Based Access + +Instead of checking roles, check specific permissions: + +```python +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("write:posts") # Specific permission +async def create_post(auth: AuthContext, title: str) -> dict: + pass +``` + +This allows: +- **Flexibility**: Users can have permissions without roles +- **Granularity**: Fine-grained access control +- **Clarity**: Explicit permission requirements + +### Loading RBAC Configuration + +```python +from nextmcp.auth import RBAC + +rbac = RBAC() + +config = { + "permissions": [ + {"name": "read:posts", "description": "Read posts"} + ], + "roles": [ + { + "name": "viewer", + "permissions": ["read:posts"] + } + ] +} + +rbac.load_from_config(config) +``` + +### Permission Wildcards + +```python +# Admin has all permissions +rbac.define_permission("*") + +# Matches any permission check +auth_context.has_permission("read:posts") # True +auth_context.has_permission("write:users") # True +auth_context.has_permission("anything") # True +``` + +### Checking Permissions + +```python +from nextmcp.auth import RBAC + +rbac = RBAC() + +# Check if user has permission +if rbac.check_permission(auth_context, "write:posts"): + # Allow action + pass + +# Require permission (raises PermissionDeniedError if missing) +rbac.require_permission(auth_context, "delete:posts") +``` + +## Testing Different Roles + +### Viewer (Read-Only) + +```python +# Works +list_posts(auth={"api_key": "viewer-key"}) + +# Fails - PermissionDeniedError +create_post(title="Post", content="...", auth={"api_key": "viewer-key"}) +``` + +### Author (Create/Edit) + +```python +# Works +create_post(title="My Post", content="...", auth={"api_key": "author-key"}) +update_post(post_id=1, title="Updated", auth={"api_key": "author-key"}) + +# Fails - PermissionDeniedError +delete_post(post_id=1, auth={"api_key": "author-key"}) +``` + +### Editor (Full Post Management) + +```python +# All work +list_posts(auth={"api_key": "editor-key"}) +create_post(..., auth={"api_key": "editor-key"}) +update_post(..., auth={"api_key": "editor-key"}) +delete_post(post_id=1, auth={"api_key": "editor-key"}) + +# Fails - no user management permission +list_users(auth={"api_key": "editor-key"}) +``` + +### Admin (Full Access) + +```python +# Everything works - wildcard permission +list_posts(auth={"api_key": "admin-key"}) +delete_post(..., auth={"api_key": "admin-key"}) +list_users(auth={"api_key": "admin-key"}) +create_user(..., auth={"api_key": "admin-key"}) +delete_user(..., auth={"api_key": "admin-key"}) +``` + +## Best Practices + +1. **Use specific permissions over roles** when possible +2. **Define clear permission naming conventions** (resource:action) +3. **Document permission requirements** for each tool +4. **Use wildcards sparingly** (only for admin/superuser roles) +5. **Load RBAC config from files** in production +6. **Version your permission schemas** as they evolve + +## Next Steps + +- See `nextmcp/auth/rbac.py` for full RBAC system implementation +- See `examples/auth_api_key/` for simpler auth example +- See `examples/auth_jwt/` for token-based auth diff --git a/examples/auth_rbac/server.py b/examples/auth_rbac/server.py new file mode 100644 index 0000000..196e35c --- /dev/null +++ b/examples/auth_rbac/server.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +""" +RBAC (Role-Based Access Control) Example + +This example demonstrates advanced RBAC with fine-grained permissions, +hierarchical roles, and permission-based access control. + +Features: +- Fine-grained permissions (read:posts, write:posts, etc.) +- Permission wildcards (admin:*, *) +- RBAC configuration loading from dict +- Permission-based access control (@requires_permission) +- Role hierarchies + +Usage: + python server.py +""" + +from nextmcp import NextMCP +from nextmcp.auth import ( + RBAC, + APIKeyProvider, + AuthContext, + requires_auth_async, + requires_permission_async, +) + +# Create the MCP app +app = NextMCP("rbac-server", description="Demonstrates advanced RBAC") + +# Initialize RBAC system +rbac = RBAC() + +# Define RBAC configuration +rbac_config = { + "permissions": [ + {"name": "read:posts", "description": "Read blog posts"}, + {"name": "write:posts", "description": "Create and edit posts"}, + {"name": "delete:posts", "description": "Delete posts"}, + {"name": "read:users", "description": "View user list"}, + {"name": "write:users", "description": "Create and edit users"}, + {"name": "delete:users", "description": "Delete users"}, + {"name": "admin:*", "description": "All admin permissions"}, + {"name": "*", "description": "All permissions"}, + ], + "roles": [ + { + "name": "viewer", + "description": "Read-only access to posts", + "permissions": ["read:posts"], + }, + { + "name": "author", + "description": "Can create and edit posts", + "permissions": ["read:posts", "write:posts"], + }, + { + "name": "editor", + "description": "Can create, edit, and delete posts", + "permissions": ["read:posts", "write:posts", "delete:posts"], + }, + { + "name": "moderator", + "description": "Can manage posts and view users", + "permissions": ["read:posts", "write:posts", "delete:posts", "read:users"], + }, + { + "name": "admin", + "description": "Full access to everything", + "permissions": ["*"], # Wildcard - matches all permissions + }, + ], +} + +# Load RBAC configuration +rbac.load_from_config(rbac_config) + +# Configure API key provider with RBAC roles +api_key_provider = APIKeyProvider( + valid_keys={ + "admin-key": { + "user_id": "admin1", + "username": "admin", + "roles": ["admin"], + }, + "moderator-key": { + "user_id": "mod1", + "username": "moderator", + "roles": ["moderator"], + }, + "editor-key": { + "user_id": "editor1", + "username": "editor", + "roles": ["editor"], + }, + "author-key": { + "user_id": "author1", + "username": "alice", + "roles": ["author"], + }, + "viewer-key": { + "user_id": "viewer1", + "username": "bob", + "roles": ["viewer"], + }, + } +) + + +@app.tool() +async def show_rbac_config() -> dict: + """View the current RBAC configuration (no auth required).""" + config = rbac.to_dict() + + return { + "message": "Current RBAC Configuration", + "total_roles": len(config["roles"]), + "total_permissions": len(config["permissions"]), + "roles": config["roles"], + "permissions": config["permissions"], + } + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +async def check_my_permissions(auth: AuthContext) -> dict: + """Check what permissions you have based on your roles.""" + # Get user's roles + user_roles = [rbac.get_role(r.name) for r in auth.roles] + + # Collect all permissions from roles + all_permissions = set() + for role in user_roles: + if role: + all_permissions.update(role.permissions) + + return { + "user": auth.username, + "user_id": auth.user_id, + "roles": [r.name for r in auth.roles], + "permissions": [p.name for p in all_permissions], + "can_read_posts": rbac.check_permission(auth, "read:posts"), + "can_write_posts": rbac.check_permission(auth, "write:posts"), + "can_delete_posts": rbac.check_permission(auth, "delete:posts"), + "can_read_users": rbac.check_permission(auth, "read:users"), + "can_write_users": rbac.check_permission(auth, "write:users"), + } + + +# Permission-based access control examples + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("read:posts") +async def list_posts(auth: AuthContext) -> dict: + """List all posts (requires read:posts permission).""" + return { + "posts": [ + {"id": 1, "title": "RBAC in MCP", "author": "admin"}, + {"id": 2, "title": "Permission Systems", "author": "alice"}, + {"id": 3, "title": "Security Best Practices", "author": "bob"}, + ], + "accessed_by": auth.username, + } + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("write:posts") +async def create_post(auth: AuthContext, title: str, content: str) -> dict: + """Create a post (requires write:posts permission).""" + return { + "status": "created", + "post": { + "id": 4, + "title": title, + "content": content, + "author": auth.username, + }, + } + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("write:posts") +async def update_post(auth: AuthContext, post_id: int, title: str) -> dict: + """Update a post (requires write:posts permission).""" + return { + "status": "updated", + "post_id": post_id, + "new_title": title, + "updated_by": auth.username, + } + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("delete:posts") +async def delete_post(auth: AuthContext, post_id: int) -> dict: + """Delete a post (requires delete:posts permission).""" + return { + "status": "deleted", + "post_id": post_id, + "deleted_by": auth.username, + } + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("read:users") +async def list_users(auth: AuthContext) -> dict: + """List all users (requires read:users permission).""" + return { + "users": [ + {"id": 1, "username": "admin", "role": "admin"}, + {"id": 2, "username": "alice", "role": "author"}, + {"id": 3, "username": "bob", "role": "viewer"}, + ], + "accessed_by": auth.username, + } + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("write:users") +async def create_user(auth: AuthContext, username: str, role: str) -> dict: + """Create a new user (requires write:users permission).""" + return { + "status": "created", + "user": {"id": 4, "username": username, "role": role}, + "created_by": auth.username, + } + + +@app.tool() +@requires_auth_async(provider=api_key_provider) +@requires_permission_async("delete:users") +async def delete_user(auth: AuthContext, user_id: int) -> dict: + """Delete a user (requires delete:users permission).""" + return { + "status": "deleted", + "user_id": user_id, + "deleted_by": auth.username, + "note": "Only admins have this permission", + } + + +if __name__ == "__main__": + print("=" * 70) + print("RBAC (Role-Based Access Control) Example") + print("=" * 70) + print() + print("This example demonstrates fine-grained permission control.") + print() + print("Roles and Their Permissions:") + print("-" * 70) + + for role_config in rbac_config["roles"]: + print(f" {role_config['name'].upper():15} - {role_config['description']}") + print(f" Permissions: {', '.join(role_config['permissions'])}") + + print() + print("API Keys:") + print("-" * 70) + print(" admin-key - Full access (admin role)") + print(" moderator-key - Manage posts + view users") + print(" editor-key - Create, edit, delete posts") + print(" author-key - Create and edit posts") + print(" viewer-key - Read-only access") + print() + print("Available Tools:") + print("-" * 70) + print(" show_rbac_config() - View RBAC configuration") + print(" check_my_permissions() - Check your permissions") + print() + print(" Requires read:posts:") + print(" - list_posts()") + print() + print(" Requires write:posts:") + print(" - create_post(title, content)") + print(" - update_post(post_id, title)") + print() + print(" Requires delete:posts:") + print(" - delete_post(post_id)") + print() + print(" Requires read:users:") + print(" - list_users()") + print() + print(" Requires write:users:") + print(" - create_user(username, role)") + print() + print(" Requires delete:users:") + print(" - delete_user(user_id)") + print() + print("Starting server...") + print("=" * 70) + + app.run() diff --git a/examples/knowledge_base/app.py b/examples/knowledge_base/app.py index cbc0e5e..5974cd4 100644 --- a/examples/knowledge_base/app.py +++ b/examples/knowledge_base/app.py @@ -7,9 +7,7 @@ - Resources: Access to knowledge base data and statistics """ -import asyncio from datetime import datetime -from pathlib import Path from nextmcp import NextMCP, argument @@ -120,7 +118,7 @@ def list_categories() -> list[str]: Returns: List of category names """ - categories = set(article["category"] for article in KNOWLEDGE_BASE.values()) + categories = {article["category"] for article in KNOWLEDGE_BASE.values()} return sorted(categories) diff --git a/examples/metrics_example/app.py b/examples/metrics_example/app.py index 84d07c2..e6f1d8f 100644 --- a/examples/metrics_example/app.py +++ b/examples/metrics_example/app.py @@ -8,9 +8,10 @@ - JSON format export """ -from nextmcp import NextMCP, setup_logging -import time import random +import time + +from nextmcp import NextMCP, setup_logging # Setup logging setup_logging(level="INFO") @@ -88,7 +89,7 @@ def error_demo() -> dict: # Error handling print("\n4. Error handling (some may fail):") -for i in range(5): +for _i in range(5): try: result = error_demo() print(f" error_demo() = {result['status']}") diff --git a/examples/plugin_example/plugins/timing_plugin.py b/examples/plugin_example/plugins/timing_plugin.py index cdd6469..438d224 100644 --- a/examples/plugin_example/plugins/timing_plugin.py +++ b/examples/plugin_example/plugins/timing_plugin.py @@ -7,9 +7,10 @@ - Tracking state in plugins """ -from nextmcp import Plugin import time +from nextmcp import Plugin + class TimingPlugin(Plugin): """Plugin that adds timing middleware to measure tool execution time.""" diff --git a/examples/weather_bot/app.py b/examples/weather_bot/app.py index 13cd3de..aa71524 100644 --- a/examples/weather_bot/app.py +++ b/examples/weather_bot/app.py @@ -8,10 +8,10 @@ - Configuration management """ -from nextmcp import NextMCP, setup_logging, log_calls, error_handler -from typing import Optional import random +from nextmcp import NextMCP, error_handler, log_calls, setup_logging + # Setup logging setup_logging(level="INFO") diff --git a/examples/websocket_chat/client.py b/examples/websocket_chat/client.py index fb45871..26e469f 100644 --- a/examples/websocket_chat/client.py +++ b/examples/websocket_chat/client.py @@ -9,6 +9,7 @@ """ import asyncio + from nextmcp.transport import WebSocketClient @@ -57,7 +58,7 @@ async def main(): print("5. Retrieving recent messages...") result = await client.invoke_tool("get_messages", {"limit": 5}) print(f" Total messages in history: {result['total']}") - print(f" Recent messages:") + print(" Recent messages:") for msg in result["messages"]: print(f" [{msg['timestamp']}] {msg['username']}: {msg['message']}") print() @@ -72,9 +73,9 @@ async def main(): # Test echo (concurrent calls) print("7. Testing concurrent tool invocation...") echo_tasks = [client.invoke_tool("echo", {"message": f"Message {i}"}) for i in range(5)] - results = await asyncio.gather(*echo_tasks) - print(f" Sent 5 concurrent echo requests") - print(f" All received back successfully ✓\n") + await asyncio.gather(*echo_tasks) + print(" Sent 5 concurrent echo requests") + print(" All received back successfully ✓\n") print("=" * 60) print("All tests completed successfully!") diff --git a/examples/websocket_chat/server.py b/examples/websocket_chat/server.py index bae80eb..c02b7fe 100644 --- a/examples/websocket_chat/server.py +++ b/examples/websocket_chat/server.py @@ -9,10 +9,10 @@ """ import asyncio -from nextmcp import NextMCP, setup_logging, log_calls_async, error_handler_async -from nextmcp.transport import WebSocketTransport from datetime import datetime, timezone -from typing import List + +from nextmcp import NextMCP, error_handler_async, log_calls_async, setup_logging +from nextmcp.transport import WebSocketTransport # Setup logging setup_logging(level="INFO") @@ -27,7 +27,7 @@ app.add_middleware(error_handler_async) # Store chat messages in memory -chat_history: List[dict] = [] +chat_history: list[dict] = [] @app.tool(name="send_message", description="Send a chat message") @@ -77,7 +77,7 @@ async def get_stats() -> dict: Returns: Server statistics """ - unique_users = set(msg["username"] for msg in chat_history) + unique_users = {msg["username"] for msg in chat_history} return { "total_messages": len(chat_history), diff --git a/hooks/pre-commit b/hooks/pre-commit new file mode 100755 index 0000000..cb266d3 --- /dev/null +++ b/hooks/pre-commit @@ -0,0 +1,62 @@ +#!/bin/bash + +# NextMCP Pre-commit Hook +# Runs linting and formatting checks before allowing commits + +set -e + +echo "🔍 Running pre-commit checks..." +echo "" + +# Check if venv exists +if [ ! -d "venv" ]; then + echo "❌ Virtual environment not found. Please run: python3 -m venv venv" + exit 1 +fi + +# 1. Run ruff linting with auto-fix +echo "📋 Running ruff linting..." +if ! venv/bin/ruff check --fix nextmcp/ tests/ examples/; then + echo "" + echo "❌ Ruff linting failed. Please fix the errors above." + exit 1 +fi +echo "✅ Ruff checks passed" +echo "" + +# 2. Run black formatting +echo "🎨 Running black formatting..." +if ! venv/bin/black nextmcp/ tests/ examples/; then + echo "" + echo "❌ Black formatting failed." + exit 1 +fi +echo "✅ Black formatting applied" +echo "" + +# 3. Run tests (quick check) +echo "🧪 Running tests..." +if ! venv/bin/python3 -m pytest tests/ -q --tb=line; then + echo "" + echo "❌ Tests failed. Please fix failing tests before committing." + exit 1 +fi +echo "✅ All tests passed" +echo "" + +# Check if linting/formatting made changes +if ! git diff --quiet; then + echo "⚠️ Linting/formatting made changes to your files." + echo " The changes have been applied but NOT staged." + echo " Please review the changes and stage them:" + echo "" + echo " git add -u" + echo " git commit" + echo "" + exit 1 +fi + +echo "✨ All pre-commit checks passed! Proceeding with commit..." +echo "" + +exit 0 diff --git a/nextmcp/__init__.py b/nextmcp/__init__.py index 3ebcda4..425edd7 100644 --- a/nextmcp/__init__.py +++ b/nextmcp/__init__.py @@ -17,8 +17,26 @@ def hello(name: str) -> str: app.run() """ -__version__ = "0.3.0" +__version__ = "0.4.0" +from nextmcp.auth import ( + RBAC, + APIKeyProvider, + AuthContext, + AuthProvider, + AuthResult, + JWTProvider, + Permission, + PermissionDeniedError, + Role, + SessionProvider, + requires_auth, + requires_auth_async, + requires_permission, + requires_permission_async, + requires_role, + requires_role_async, +) from nextmcp.config import Config, load_config from nextmcp.core import NextMCP from nextmcp.discovery import AutoDiscovery, validate_project_structure @@ -77,6 +95,23 @@ def hello(name: str) -> str: "__version__", # Core "NextMCP", + # Authentication + "AuthContext", + "AuthProvider", + "AuthResult", + "APIKeyProvider", + "JWTProvider", + "SessionProvider", + "Permission", + "Role", + "RBAC", + "PermissionDeniedError", + "requires_auth", + "requires_auth_async", + "requires_role", + "requires_role_async", + "requires_permission", + "requires_permission_async", # Tools "tool", "get_tool_metadata", diff --git a/nextmcp/auth/__init__.py b/nextmcp/auth/__init__.py new file mode 100644 index 0000000..f156106 --- /dev/null +++ b/nextmcp/auth/__init__.py @@ -0,0 +1,51 @@ +""" +Authentication and authorization for NextMCP. + +This module provides a comprehensive auth system inspired by next-auth, +adapted for the Model Context Protocol (MCP). +""" + +from nextmcp.auth.core import ( + AuthContext, + AuthProvider, + AuthResult, + Permission, + Role, +) +from nextmcp.auth.middleware import ( + requires_auth, + requires_auth_async, + requires_permission, + requires_permission_async, + requires_role, + requires_role_async, +) +from nextmcp.auth.providers import ( + APIKeyProvider, + JWTProvider, + SessionProvider, +) +from nextmcp.auth.rbac import RBAC, PermissionDeniedError + +__all__ = [ + # Core + "AuthContext", + "AuthProvider", + "AuthResult", + "Permission", + "Role", + # Middleware + "requires_auth", + "requires_auth_async", + "requires_permission", + "requires_permission_async", + "requires_role", + "requires_role_async", + # Providers + "APIKeyProvider", + "JWTProvider", + "SessionProvider", + # RBAC + "RBAC", + "PermissionDeniedError", +] diff --git a/nextmcp/auth/core.py b/nextmcp/auth/core.py new file mode 100644 index 0000000..8c93c3d --- /dev/null +++ b/nextmcp/auth/core.py @@ -0,0 +1,210 @@ +""" +Core authentication framework for NextMCP. + +This module provides the foundational classes for authentication and authorization +in MCP servers, adapted for stdio and WebSocket transports. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class Permission: + """ + Represents a single permission that can be granted to users. + + Permissions are fine-grained capabilities like "read:posts", "write:posts", + "admin:users", etc. + """ + + name: str + description: str = "" + resource: str | None = None # Optional resource this permission applies to + + def __str__(self) -> str: + return self.name + + def __hash__(self) -> int: + return hash(self.name) + + def __eq__(self, other: object) -> bool: + if isinstance(other, Permission): + return self.name == other.name + return False + + def matches(self, required: str) -> bool: + """ + Check if this permission matches a required permission string. + + Supports wildcards: + - "admin:*" matches "admin:users", "admin:posts", etc. + - "*" matches everything + + Args: + required: Required permission string + + Returns: + True if this permission satisfies the requirement + """ + if self.name == "*": + return True + + if "*" in self.name: + prefix = self.name.split("*")[0] + return required.startswith(prefix) + + return self.name == required + + +@dataclass +class Role: + """ + Represents a role with a collection of permissions. + + Roles are named collections of permissions like "admin", "editor", "viewer", etc. + """ + + name: str + description: str = "" + permissions: set[Permission] = field(default_factory=set) + + def __str__(self) -> str: + return self.name + + def __hash__(self) -> int: + return hash(self.name) + + def __eq__(self, other: object) -> bool: + if isinstance(other, Role): + return self.name == other.name + return False + + def add_permission(self, permission: Permission | str) -> None: + """Add a permission to this role.""" + if isinstance(permission, str): + permission = Permission(permission) + self.permissions.add(permission) + + def has_permission(self, permission_name: str) -> bool: + """Check if this role has a specific permission.""" + return any(p.matches(permission_name) for p in self.permissions) + + +@dataclass +class AuthContext: + """ + Represents the authentication context for a request. + + This contains information about the authenticated user, their credentials, + roles, and permissions. It's passed to tools that require authentication. + """ + + authenticated: bool = False + user_id: str | None = None + username: str | None = None + roles: set[Role] = field(default_factory=set) + permissions: set[Permission] = field(default_factory=set) + metadata: dict[str, Any] = field(default_factory=dict) + + def has_role(self, role_name: str) -> bool: + """Check if user has a specific role.""" + return any(r.name == role_name for r in self.roles) + + def has_permission(self, permission_name: str) -> bool: + """ + Check if user has a specific permission. + + Checks both direct permissions and permissions from roles. + """ + # Check direct permissions + if any(p.matches(permission_name) for p in self.permissions): + return True + + # Check role permissions + return any(r.has_permission(permission_name) for r in self.roles) + + def add_role(self, role: Role | str) -> None: + """Add a role to this auth context.""" + if isinstance(role, str): + role = Role(role) + self.roles.add(role) + + def add_permission(self, permission: Permission | str) -> None: + """Add a permission to this auth context.""" + if isinstance(permission, str): + permission = Permission(permission) + self.permissions.add(permission) + + +@dataclass +class AuthResult: + """ + Result of an authentication attempt. + + Contains whether authentication succeeded, the auth context if successful, + and any error message if failed. + """ + + success: bool + context: AuthContext | None = None + error: str | None = None + + @classmethod + def success_result(cls, context: AuthContext) -> "AuthResult": + """Create a successful auth result.""" + return cls(success=True, context=context) + + @classmethod + def failure(cls, error: str) -> "AuthResult": + """Create a failed auth result.""" + return cls(success=False, error=error) + + +class AuthProvider(ABC): + """ + Base class for authentication providers. + + Providers implement specific authentication strategies like API keys, + JWT tokens, OAuth flows, etc. + """ + + def __init__(self, **config: Any): + """ + Initialize the auth provider with configuration. + + Args: + **config: Provider-specific configuration + """ + self.config = config + + @abstractmethod + async def authenticate(self, credentials: dict[str, Any]) -> AuthResult: + """ + Authenticate a user with the given credentials. + + Args: + credentials: Authentication credentials (provider-specific format) + + Returns: + AuthResult indicating success/failure and auth context + """ + pass + + def validate_credentials(self, credentials: dict[str, Any]) -> bool: + """ + Validate that credentials are in the correct format. + + Args: + credentials: Credentials to validate + + Returns: + True if credentials are valid format, False otherwise + """ + return True + + @property + def name(self) -> str: + """Get the provider name.""" + return self.__class__.__name__ diff --git a/nextmcp/auth/middleware.py b/nextmcp/auth/middleware.py new file mode 100644 index 0000000..3fd15b9 --- /dev/null +++ b/nextmcp/auth/middleware.py @@ -0,0 +1,357 @@ +""" +Authentication middleware for NextMCP. + +This module provides middleware decorators for protecting tools with +authentication and authorization requirements. +""" + +import functools +import logging +from collections.abc import Callable +from typing import Any + +from nextmcp.auth.core import AuthContext, AuthProvider +from nextmcp.auth.rbac import PermissionDeniedError + +logger = logging.getLogger(__name__) + + +class AuthenticationError(Exception): + """Raised when authentication fails.""" + + pass + + +def requires_auth( + provider: AuthProvider | None = None, + credentials_key: str = "auth", +) -> Callable: + """ + Middleware decorator that requires authentication. + + The decorated tool will only execute if valid credentials are provided. + The auth context is injected as the first parameter. + + Args: + provider: Auth provider to use (if None, must be set later) + credentials_key: Key in kwargs where credentials are passed + + Example: + @app.tool() + @requires_auth(provider=api_key_provider) + def protected_tool(auth: AuthContext, param: str) -> str: + return f"Hello {auth.user_id}" + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + def wrapper(*args: Any, **kwargs: Any) -> Any: + # Get credentials from kwargs + credentials = kwargs.pop(credentials_key, {}) + + if not credentials: + raise AuthenticationError("No credentials provided") + + if provider is None: + raise AuthenticationError("No auth provider configured") + + # Authenticate + # Note: We need to handle sync/async provider authentication + # For now, we'll require providers to have sync authenticate method + import asyncio + + if asyncio.iscoroutinefunction(provider.authenticate): + # If provider is async, we need to run it + loop = asyncio.get_event_loop() + if loop.is_running(): + # If we're in async context, this won't work + # We'll need the async version of this middleware + raise RuntimeError( + "Async auth provider requires requires_auth_async middleware" + ) + result = loop.run_until_complete(provider.authenticate(credentials)) + else: + # Sync provider - but authenticate is async, so we need to await it + raise RuntimeError("Auth providers must be async - use requires_auth_async") + + if not result.success: + raise AuthenticationError(result.error or "Authentication failed") + + # Inject auth context as first argument + return fn(result.context, *args, **kwargs) + + # Mark function as requiring auth + wrapper._requires_auth = True # type: ignore + wrapper._auth_provider = provider # type: ignore + + return wrapper + + return decorator + + +def requires_auth_async( + provider: AuthProvider | None = None, + credentials_key: str = "auth", +) -> Callable: + """ + Async middleware decorator that requires authentication. + + The decorated async tool will only execute if valid credentials are provided. + The auth context is injected as the first parameter. + + Args: + provider: Auth provider to use (if None, must be set later) + credentials_key: Key in kwargs where credentials are passed + + Example: + @app.tool() + @requires_auth_async(provider=jwt_provider) + async def protected_tool(auth: AuthContext, param: str) -> str: + return f"Hello {auth.user_id}" + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + # Get credentials from kwargs + credentials = kwargs.pop(credentials_key, {}) + + if not credentials: + raise AuthenticationError("No credentials provided") + + if provider is None: + raise AuthenticationError("No auth provider configured") + + # Authenticate (provider.authenticate is async) + result = await provider.authenticate(credentials) + + if not result.success: + raise AuthenticationError(result.error or "Authentication failed") + + # Inject auth context as first argument + if asyncio.iscoroutinefunction(fn): + return await fn(result.context, *args, **kwargs) + else: + return fn(result.context, *args, **kwargs) + + # Mark function as requiring auth + wrapper._requires_auth = True # type: ignore + wrapper._auth_provider = provider # type: ignore + + return wrapper + + return decorator + + +def requires_role(*required_roles: str) -> Callable: + """ + Middleware decorator that requires specific roles. + + Must be used with @requires_auth or @requires_auth_async. + The auth context from the auth middleware is checked for required roles. + + Args: + *required_roles: Role names required (user must have at least one) + + Example: + @app.tool() + @requires_auth_async(provider=api_key_provider) + @requires_role("admin", "moderator") + async def admin_tool(auth: AuthContext) -> str: + return "Admin action performed" + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + def wrapper(*args: Any, **kwargs: Any) -> Any: + # First argument should be AuthContext (from requires_auth) + if not args or not isinstance(args[0], AuthContext): + raise AuthenticationError( + "requires_role must be used with requires_auth or requires_auth_async" + ) + + auth_context = args[0] + + # Check if user has any of the required roles + has_role = any(auth_context.has_role(role) for role in required_roles) + + if not has_role: + roles_str = ", ".join(required_roles) + raise PermissionDeniedError( + f"One of the following roles required: {roles_str}", + required=roles_str, + user_id=auth_context.user_id, + ) + + return fn(*args, **kwargs) + + # Mark function as requiring roles + wrapper._requires_roles = required_roles # type: ignore + + return wrapper + + return decorator + + +def requires_role_async(*required_roles: str) -> Callable: + """ + Async middleware decorator that requires specific roles. + + Must be used with @requires_auth_async. + The auth context from the auth middleware is checked for required roles. + + Args: + *required_roles: Role names required (user must have at least one) + + Example: + @app.tool() + @requires_auth_async(provider=api_key_provider) + @requires_role_async("admin") + async def admin_tool(auth: AuthContext) -> str: + return "Admin action performed" + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + # First argument should be AuthContext (from requires_auth_async) + if not args or not isinstance(args[0], AuthContext): + raise AuthenticationError( + "requires_role_async must be used with requires_auth_async" + ) + + auth_context = args[0] + + # Check if user has any of the required roles + has_role = any(auth_context.has_role(role) for role in required_roles) + + if not has_role: + roles_str = ", ".join(required_roles) + raise PermissionDeniedError( + f"One of the following roles required: {roles_str}", + required=roles_str, + user_id=auth_context.user_id, + ) + + import asyncio + + if asyncio.iscoroutinefunction(fn): + return await fn(*args, **kwargs) + else: + return fn(*args, **kwargs) + + # Mark function as requiring roles + wrapper._requires_roles = required_roles # type: ignore + + return wrapper + + return decorator + + +def requires_permission(*required_permissions: str) -> Callable: + """ + Middleware decorator that requires specific permissions. + + Must be used with @requires_auth or @requires_auth_async. + The auth context is checked for required permissions. + + Args: + *required_permissions: Permission names required (user must have at least one) + + Example: + @app.tool() + @requires_auth_async(provider=api_key_provider) + @requires_permission("write:posts", "admin:posts") + async def create_post(auth: AuthContext, title: str) -> dict: + return {"title": title, "author": auth.user_id} + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + def wrapper(*args: Any, **kwargs: Any) -> Any: + # First argument should be AuthContext + if not args or not isinstance(args[0], AuthContext): + raise AuthenticationError( + "requires_permission must be used with requires_auth or requires_auth_async" + ) + + auth_context = args[0] + + # Check if user has any of the required permissions + has_permission = any(auth_context.has_permission(perm) for perm in required_permissions) + + if not has_permission: + perms_str = ", ".join(required_permissions) + raise PermissionDeniedError( + f"One of the following permissions required: {perms_str}", + required=perms_str, + user_id=auth_context.user_id, + ) + + return fn(*args, **kwargs) + + # Mark function as requiring permissions + wrapper._requires_permissions = required_permissions # type: ignore + + return wrapper + + return decorator + + +def requires_permission_async(*required_permissions: str) -> Callable: + """ + Async middleware decorator that requires specific permissions. + + Must be used with @requires_auth_async. + The auth context is checked for required permissions. + + Args: + *required_permissions: Permission names required (user must have at least one) + + Example: + @app.tool() + @requires_auth_async(provider=api_key_provider) + @requires_permission_async("delete:posts") + async def delete_post(auth: AuthContext, post_id: int) -> dict: + return {"deleted": post_id} + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + # First argument should be AuthContext + if not args or not isinstance(args[0], AuthContext): + raise AuthenticationError( + "requires_permission_async must be used with requires_auth_async" + ) + + auth_context = args[0] + + # Check if user has any of the required permissions + has_permission = any(auth_context.has_permission(perm) for perm in required_permissions) + + if not has_permission: + perms_str = ", ".join(required_permissions) + raise PermissionDeniedError( + f"One of the following permissions required: {perms_str}", + required=perms_str, + user_id=auth_context.user_id, + ) + + import asyncio + + if asyncio.iscoroutinefunction(fn): + return await fn(*args, **kwargs) + else: + return fn(*args, **kwargs) + + # Mark function as requiring permissions + wrapper._requires_permissions = required_permissions # type: ignore + + return wrapper + + return decorator + + +# Need to add this import +import asyncio # noqa: E402 diff --git a/nextmcp/auth/providers.py b/nextmcp/auth/providers.py new file mode 100644 index 0000000..6c8d465 --- /dev/null +++ b/nextmcp/auth/providers.py @@ -0,0 +1,411 @@ +""" +Built-in authentication providers for NextMCP. + +This module provides ready-to-use authentication providers for common use cases. +""" + +import logging +import secrets +from collections.abc import Callable +from typing import Any + +from nextmcp.auth.core import AuthContext, AuthProvider, AuthResult, Permission, Role + +logger = logging.getLogger(__name__) + + +class APIKeyProvider(AuthProvider): + """ + API Key authentication provider. + + Validates requests using API keys. Supports: + - Simple key validation + - Key-to-user mapping + - Role assignment per key + - Permission assignment per key + """ + + def __init__( + self, + valid_keys: dict[str, dict[str, Any]] | None = None, + key_validator: Callable[[str], dict[str, Any] | None] | None = None, + **config: Any, + ): + """ + Initialize API key provider. + + Args: + valid_keys: Dictionary mapping API keys to user config. + Example: {"key123": {"user_id": "user1", "roles": ["admin"]}} + key_validator: Optional custom validation function + **config: Additional configuration + """ + super().__init__(**config) + self.valid_keys = valid_keys or {} + self.key_validator = key_validator + + async def authenticate(self, credentials: dict[str, Any]) -> AuthResult: + """ + Authenticate using an API key. + + Expected credentials format: + { + "api_key": "the-api-key-string" + } + + Args: + credentials: Credentials containing api_key + + Returns: + AuthResult with auth context if successful + """ + api_key = credentials.get("api_key") + + if not api_key: + return AuthResult.failure("Missing api_key in credentials") + + # Use custom validator if provided + if self.key_validator: + user_config = self.key_validator(api_key) + if not user_config: + logger.warning("API key validation failed (custom validator)") + return AuthResult.failure("Invalid API key") + # Otherwise check against valid_keys + elif api_key in self.valid_keys: + user_config = self.valid_keys[api_key] + else: + logger.warning(f"Invalid API key attempt: {api_key[:8]}...") + return AuthResult.failure("Invalid API key") + + # Build auth context from user config + context = AuthContext( + authenticated=True, + user_id=user_config.get("user_id", "unknown"), + username=user_config.get("username"), + metadata=user_config.get("metadata", {}), + ) + + # Add roles + for role_name in user_config.get("roles", []): + context.add_role(Role(role_name)) + + # Add permissions + for perm_name in user_config.get("permissions", []): + context.add_permission(Permission(perm_name)) + + logger.info(f"API key authentication successful for user: {context.user_id}") + return AuthResult.success_result(context) + + def validate_credentials(self, credentials: dict[str, Any]) -> bool: + """Validate that credentials contain an api_key.""" + return "api_key" in credentials + + @staticmethod + def generate_key(length: int = 32) -> str: + """ + Generate a secure random API key. + + Args: + length: Length of the key in bytes (default 32) + + Returns: + Hex-encoded API key + """ + return secrets.token_hex(length) + + +class JWTProvider(AuthProvider): + """ + JWT (JSON Web Token) authentication provider. + + Validates and decodes JWT tokens. Requires PyJWT library. + """ + + def __init__( + self, + secret_key: str, + algorithm: str = "HS256", + verify_exp: bool = True, + **config: Any, + ): + """ + Initialize JWT provider. + + Args: + secret_key: Secret key for verifying JWT signatures + algorithm: JWT algorithm (default: HS256) + verify_exp: Whether to verify token expiration (default: True) + **config: Additional configuration + """ + super().__init__(**config) + self.secret_key = secret_key + self.algorithm = algorithm + self.verify_exp = verify_exp + + # Check if PyJWT is available + try: + import jwt # noqa: F401 + + self._jwt = jwt + except ImportError as err: + raise ImportError( + "PyJWT is required for JWT authentication. " "Install with: pip install PyJWT" + ) from err + + async def authenticate(self, credentials: dict[str, Any]) -> AuthResult: + """ + Authenticate using a JWT token. + + Expected credentials format: + { + "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." + } + + Expected token payload: + { + "sub": "user_id", + "username": "optional_username", + "roles": ["role1", "role2"], + "permissions": ["perm1", "perm2"], + "exp": 1234567890 + } + + Args: + credentials: Credentials containing JWT token + + Returns: + AuthResult with auth context if successful + """ + token = credentials.get("token") + + if not token: + return AuthResult.failure("Missing token in credentials") + + try: + # Decode and verify JWT + payload = self._jwt.decode( + token, + self.secret_key, + algorithms=[self.algorithm], + options={"verify_exp": self.verify_exp}, + ) + + # Build auth context from token payload + context = AuthContext( + authenticated=True, + user_id=payload.get("sub", "unknown"), + username=payload.get("username"), + metadata=payload.get("metadata", {}), + ) + + # Add roles from token + for role_name in payload.get("roles", []): + context.add_role(Role(role_name)) + + # Add permissions from token + for perm_name in payload.get("permissions", []): + context.add_permission(Permission(perm_name)) + + logger.info(f"JWT authentication successful for user: {context.user_id}") + return AuthResult.success_result(context) + + except self._jwt.ExpiredSignatureError: + logger.warning("JWT token expired") + return AuthResult.failure("Token expired") + except self._jwt.InvalidTokenError as e: + logger.warning(f"Invalid JWT token: {e}") + return AuthResult.failure("Invalid token") + except Exception as e: + logger.error(f"JWT authentication error: {e}", exc_info=True) + return AuthResult.failure("Authentication failed") + + def validate_credentials(self, credentials: dict[str, Any]) -> bool: + """Validate that credentials contain a token.""" + return "token" in credentials + + def create_token( + self, + user_id: str, + roles: list[str] | None = None, + permissions: list[str] | None = None, + expires_in: int = 3600, + **claims: Any, + ) -> str: + """ + Create a JWT token. + + Args: + user_id: User ID (stored in 'sub' claim) + roles: List of role names + permissions: List of permission names + expires_in: Token expiration time in seconds + **claims: Additional claims to include + + Returns: + Encoded JWT token string + """ + import time + + payload = { + "sub": user_id, + "iat": int(time.time()), + "exp": int(time.time()) + expires_in, + "roles": roles or [], + "permissions": permissions or [], + **claims, + } + + return self._jwt.encode(payload, self.secret_key, algorithm=self.algorithm) + + +class SessionProvider(AuthProvider): + """ + Session-based authentication provider. + + Manages user sessions with session IDs and in-memory session storage. + For production, consider using a persistent session store. + """ + + def __init__(self, session_timeout: int = 3600, **config: Any): + """ + Initialize session provider. + + Args: + session_timeout: Session timeout in seconds (default: 1 hour) + **config: Additional configuration + """ + super().__init__(**config) + self.session_timeout = session_timeout + self._sessions: dict[str, dict[str, Any]] = {} + + async def authenticate(self, credentials: dict[str, Any]) -> AuthResult: + """ + Authenticate using a session ID. + + Expected credentials format: + { + "session_id": "session-uuid-string" + } + + Args: + credentials: Credentials containing session_id + + Returns: + AuthResult with auth context if successful + """ + import time + + session_id = credentials.get("session_id") + + if not session_id: + return AuthResult.failure("Missing session_id in credentials") + + # Check if session exists + if session_id not in self._sessions: + logger.warning(f"Invalid session ID: {session_id}") + return AuthResult.failure("Invalid session") + + session = self._sessions[session_id] + + # Check if session is expired + if time.time() > session.get("expires_at", 0): + logger.info(f"Session expired: {session_id}") + del self._sessions[session_id] + return AuthResult.failure("Session expired") + + # Build auth context from session + context = AuthContext( + authenticated=True, + user_id=session.get("user_id", "unknown"), + username=session.get("username"), + metadata=session.get("metadata", {}), + ) + + # Add roles + for role_name in session.get("roles", []): + context.add_role(Role(role_name)) + + # Add permissions + for perm_name in session.get("permissions", []): + context.add_permission(Permission(perm_name)) + + logger.info(f"Session authentication successful for user: {context.user_id}") + return AuthResult.success_result(context) + + def create_session( + self, + user_id: str, + roles: list[str] | None = None, + permissions: list[str] | None = None, + username: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> str: + """ + Create a new session. + + Args: + user_id: User ID + roles: List of role names + permissions: List of permission names + username: Optional username + metadata: Optional metadata dictionary + + Returns: + Session ID string + """ + import time + + session_id = secrets.token_urlsafe(32) + + self._sessions[session_id] = { + "user_id": user_id, + "username": username, + "roles": roles or [], + "permissions": permissions or [], + "metadata": metadata or {}, + "created_at": time.time(), + "expires_at": time.time() + self.session_timeout, + } + + logger.info(f"Created session for user: {user_id}") + return session_id + + def destroy_session(self, session_id: str) -> bool: + """ + Destroy a session. + + Args: + session_id: Session ID to destroy + + Returns: + True if session was destroyed, False if not found + """ + if session_id in self._sessions: + del self._sessions[session_id] + logger.info(f"Destroyed session: {session_id}") + return True + return False + + def validate_credentials(self, credentials: dict[str, Any]) -> bool: + """Validate that credentials contain a session_id.""" + return "session_id" in credentials + + def cleanup_expired_sessions(self) -> int: + """ + Remove expired sessions from storage. + + Returns: + Number of sessions cleaned up + """ + import time + + now = time.time() + expired = [sid for sid, sess in self._sessions.items() if now > sess.get("expires_at", 0)] + + for session_id in expired: + del self._sessions[session_id] + + if expired: + logger.info(f"Cleaned up {len(expired)} expired sessions") + + return len(expired) diff --git a/nextmcp/auth/rbac.py b/nextmcp/auth/rbac.py new file mode 100644 index 0000000..9bb79dd --- /dev/null +++ b/nextmcp/auth/rbac.py @@ -0,0 +1,249 @@ +""" +Role-Based Access Control (RBAC) system for NextMCP. + +This module provides a comprehensive RBAC system for managing roles, +permissions, and access control at the tool level. +""" + +import logging +from typing import Any + +from nextmcp.auth.core import AuthContext, Permission, Role + +logger = logging.getLogger(__name__) + + +class PermissionDeniedError(Exception): + """Raised when a user lacks required permissions.""" + + def __init__(self, message: str, required: str | None = None, user_id: str | None = None): + self.required = required + self.user_id = user_id + super().__init__(message) + + +class RBAC: + """ + Role-Based Access Control system. + + Manages roles, permissions, and provides methods for checking access. + Can be used standalone or integrated with NextMCP auth system. + """ + + def __init__(self) -> None: + """Initialize the RBAC system.""" + self._roles: dict[str, Role] = {} + self._permissions: dict[str, Permission] = {} + + def define_permission( + self, name: str, description: str = "", resource: str | None = None + ) -> Permission: + """ + Define a new permission. + + Args: + name: Permission name (e.g., "read:posts", "admin:users") + description: Human-readable description + resource: Optional resource this permission applies to + + Returns: + The created Permission object + """ + permission = Permission(name=name, description=description, resource=resource) + self._permissions[name] = permission + logger.debug(f"Defined permission: {name}") + return permission + + def define_role(self, name: str, description: str = "") -> Role: + """ + Define a new role. + + Args: + name: Role name (e.g., "admin", "editor", "viewer") + description: Human-readable description + + Returns: + The created Role object + """ + role = Role(name=name, description=description) + self._roles[name] = role + logger.debug(f"Defined role: {name}") + return role + + def assign_permission_to_role(self, role_name: str, permission_name: str) -> None: + """ + Assign a permission to a role. + + Args: + role_name: Name of the role + permission_name: Name of the permission + + Raises: + ValueError: If role or permission doesn't exist + """ + if role_name not in self._roles: + raise ValueError(f"Role '{role_name}' not found") + + if permission_name not in self._permissions: + # Auto-create permission if it doesn't exist + self.define_permission(permission_name) + + role = self._roles[role_name] + permission = self._permissions[permission_name] + role.add_permission(permission) + logger.debug(f"Assigned permission '{permission_name}' to role '{role_name}'") + + def get_role(self, name: str) -> Role | None: + """Get a role by name.""" + return self._roles.get(name) + + def get_permission(self, name: str) -> Permission | None: + """Get a permission by name.""" + return self._permissions.get(name) + + def list_roles(self) -> list[Role]: + """List all defined roles.""" + return list(self._roles.values()) + + def list_permissions(self) -> list[Permission]: + """List all defined permissions.""" + return list(self._permissions.values()) + + def check_permission(self, context: AuthContext, permission_name: str) -> bool: + """ + Check if an auth context has a specific permission. + + Args: + context: Authentication context + permission_name: Required permission name + + Returns: + True if context has the permission, False otherwise + """ + if not context.authenticated: + return False + + return context.has_permission(permission_name) + + def check_role(self, context: AuthContext, role_name: str) -> bool: + """ + Check if an auth context has a specific role. + + Args: + context: Authentication context + role_name: Required role name + + Returns: + True if context has the role, False otherwise + """ + if not context.authenticated: + return False + + return context.has_role(role_name) + + def require_permission(self, context: AuthContext, permission_name: str) -> None: + """ + Require that an auth context has a specific permission. + + Args: + context: Authentication context + permission_name: Required permission name + + Raises: + PermissionDeniedError: If permission is not granted + """ + if not self.check_permission(context, permission_name): + raise PermissionDeniedError( + f"Permission '{permission_name}' required", + required=permission_name, + user_id=context.user_id, + ) + + def require_role(self, context: AuthContext, role_name: str) -> None: + """ + Require that an auth context has a specific role. + + Args: + context: Authentication context + role_name: Required role name + + Raises: + PermissionDeniedError: If role is not granted + """ + if not self.check_role(context, role_name): + raise PermissionDeniedError( + f"Role '{role_name}' required", required=role_name, user_id=context.user_id + ) + + def load_from_config(self, config: dict[str, Any]) -> None: + """ + Load roles and permissions from configuration. + + Expected format: + { + "permissions": [ + {"name": "read:posts", "description": "Read posts"}, + {"name": "write:posts", "description": "Write posts"} + ], + "roles": [ + { + "name": "editor", + "description": "Content editor", + "permissions": ["read:posts", "write:posts"] + } + ] + } + + Args: + config: Configuration dictionary + """ + # Load permissions + for perm_config in config.get("permissions", []): + self.define_permission( + name=perm_config["name"], + description=perm_config.get("description", ""), + resource=perm_config.get("resource"), + ) + + # Load roles + for role_config in config.get("roles", []): + role = self.define_role( + name=role_config["name"], description=role_config.get("description", "") + ) + + # Assign permissions to role + for perm_name in role_config.get("permissions", []): + if perm_name not in self._permissions: + self.define_permission(perm_name) + role.add_permission(self._permissions[perm_name]) + + logger.info( + f"Loaded RBAC config: {len(self._roles)} roles, " + f"{len(self._permissions)} permissions" + ) + + def to_dict(self) -> dict[str, Any]: + """ + Export RBAC configuration as dictionary. + + Returns: + Dictionary representation of roles and permissions + """ + return { + "permissions": [ + { + "name": p.name, + "description": p.description, + "resource": p.resource, + } + for p in self._permissions.values() + ], + "roles": [ + { + "name": r.name, + "description": r.description, + "permissions": [p.name for p in r.permissions], + } + for r in self._roles.values() + ], + } diff --git a/pyproject.toml b/pyproject.toml index 2849349..378845b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "nextmcp" -version = "0.3.0" +version = "0.4.0" description = "Production-grade MCP server toolkit with minimal boilerplate" readme = "README.md" requires-python = ">=3.10" diff --git a/scripts/install-hooks.sh b/scripts/install-hooks.sh new file mode 100755 index 0000000..2d7c0ce --- /dev/null +++ b/scripts/install-hooks.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +# Install Git Hooks +# This script installs pre-commit hooks for NextMCP development + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +HOOKS_DIR="$REPO_ROOT/.git/hooks" + +echo "Installing Git hooks for NextMCP..." +echo "" + +# Check if we're in a git repository +if [ ! -d "$REPO_ROOT/.git" ]; then + echo "❌ Error: Not in a git repository" + exit 1 +fi + +# Install pre-commit hook +if [ -f "$REPO_ROOT/hooks/pre-commit" ]; then + cp "$REPO_ROOT/hooks/pre-commit" "$HOOKS_DIR/pre-commit" + chmod +x "$HOOKS_DIR/pre-commit" + echo "✅ Installed pre-commit hook" +else + echo "❌ Error: hooks/pre-commit not found" + exit 1 +fi + +echo "" +echo "✨ Git hooks installed successfully!" +echo "" +echo "The pre-commit hook will now run automatically before each commit to:" +echo " - Check code with ruff (auto-fix enabled)" +echo " - Format code with black" +echo " - Run all tests" +echo "" +echo "To bypass the hook (not recommended), use: git commit --no-verify" diff --git a/tests/test_auth_providers.py b/tests/test_auth_providers.py new file mode 100644 index 0000000..194e5c7 --- /dev/null +++ b/tests/test_auth_providers.py @@ -0,0 +1,385 @@ +""" +Tests for authentication providers. + +Tests for APIKeyProvider, JWTProvider, and SessionProvider. +""" + +import pytest + +from nextmcp.auth.providers import APIKeyProvider, JWTProvider, SessionProvider + + +class TestAPIKeyProvider: + """Tests for API Key authentication provider.""" + + def test_provider_initialization(self): + """Test basic provider initialization.""" + provider = APIKeyProvider(valid_keys={"test-key": {"user_id": "user1", "roles": ["admin"]}}) + + assert provider.valid_keys == {"test-key": {"user_id": "user1", "roles": ["admin"]}} + + @pytest.mark.asyncio + async def test_successful_authentication(self): + """Test successful API key authentication.""" + provider = APIKeyProvider( + valid_keys={ + "test-key-123": { + "user_id": "user1", + "username": "alice", + "roles": ["admin"], + "permissions": ["read:posts", "write:posts"], + } + } + ) + + result = await provider.authenticate({"api_key": "test-key-123"}) + + assert result.success is True + assert result.context is not None + assert result.context.authenticated is True + assert result.context.user_id == "user1" + assert result.context.username == "alice" + assert result.context.has_role("admin") + assert result.context.has_permission("read:posts") + assert result.context.has_permission("write:posts") + + @pytest.mark.asyncio + async def test_invalid_api_key(self): + """Test authentication with invalid API key.""" + provider = APIKeyProvider(valid_keys={"valid-key": {"user_id": "user1"}}) + + result = await provider.authenticate({"api_key": "invalid-key"}) + + assert result.success is False + assert result.error == "Invalid API key" + assert result.context is None + + @pytest.mark.asyncio + async def test_missing_api_key(self): + """Test authentication with missing API key.""" + provider = APIKeyProvider(valid_keys={"test-key": {"user_id": "user1"}}) + + result = await provider.authenticate({}) + + assert result.success is False + assert result.error == "Missing api_key in credentials" + + @pytest.mark.asyncio + async def test_custom_validator(self): + """Test API key provider with custom validator function.""" + + def custom_validator(api_key: str): + if api_key == "custom-key": + return {"user_id": "custom-user", "roles": ["user"]} + return None + + provider = APIKeyProvider(key_validator=custom_validator) + + # Valid custom key + result = await provider.authenticate({"api_key": "custom-key"}) + assert result.success is True + assert result.context.user_id == "custom-user" + + # Invalid custom key + result = await provider.authenticate({"api_key": "wrong-key"}) + assert result.success is False + + def test_generate_key(self): + """Test API key generation.""" + key1 = APIKeyProvider.generate_key() + key2 = APIKeyProvider.generate_key() + + # Keys should be different + assert key1 != key2 + + # Default length is 32 bytes = 64 hex chars + assert len(key1) == 64 + assert len(key2) == 64 + + # Custom length + key3 = APIKeyProvider.generate_key(length=16) + assert len(key3) == 32 # 16 bytes = 32 hex chars + + def test_validate_credentials(self): + """Test credentials validation.""" + provider = APIKeyProvider() + + assert provider.validate_credentials({"api_key": "test"}) is True + assert provider.validate_credentials({}) is False + assert provider.validate_credentials({"other": "value"}) is False + + +class TestJWTProvider: + """Tests for JWT authentication provider.""" + + def test_provider_initialization(self): + """Test JWT provider initialization.""" + provider = JWTProvider(secret_key="test-secret") + + assert provider.secret_key == "test-secret" + assert provider.algorithm == "HS256" + assert provider.verify_exp is True + + def test_create_token(self): + """Test JWT token creation.""" + provider = JWTProvider(secret_key="test-secret") + + token = provider.create_token( + user_id="user123", + roles=["admin", "editor"], + permissions=["read:all", "write:all"], + username="alice", + ) + + assert isinstance(token, str) + assert len(token) > 0 + + @pytest.mark.asyncio + async def test_successful_authentication(self): + """Test successful JWT authentication.""" + provider = JWTProvider(secret_key="test-secret") + + # Create a token + token = provider.create_token( + user_id="user123", + roles=["admin"], + permissions=["read:posts"], + username="alice", + ) + + # Authenticate with the token + result = await provider.authenticate({"token": token}) + + assert result.success is True + assert result.context is not None + assert result.context.authenticated is True + assert result.context.user_id == "user123" + assert result.context.username == "alice" + assert result.context.has_role("admin") + assert result.context.has_permission("read:posts") + + @pytest.mark.asyncio + async def test_expired_token(self): + """Test authentication with expired token.""" + provider = JWTProvider(secret_key="test-secret") + + # Create a token that expires immediately + token = provider.create_token(user_id="user123", expires_in=-1) + + # Try to authenticate + result = await provider.authenticate({"token": token}) + + assert result.success is False + assert result.error == "Token expired" + + @pytest.mark.asyncio + async def test_invalid_token(self): + """Test authentication with invalid token.""" + provider = JWTProvider(secret_key="test-secret") + + result = await provider.authenticate({"token": "invalid.token.here"}) + + assert result.success is False + assert result.error == "Invalid token" + + @pytest.mark.asyncio + async def test_missing_token(self): + """Test authentication with missing token.""" + provider = JWTProvider(secret_key="test-secret") + + result = await provider.authenticate({}) + + assert result.success is False + assert result.error == "Missing token in credentials" + + @pytest.mark.asyncio + async def test_token_with_custom_claims(self): + """Test JWT with custom claims.""" + provider = JWTProvider(secret_key="test-secret") + + token = provider.create_token(user_id="user123", custom_field="custom_value", number=42) + + result = await provider.authenticate({"token": token}) + + assert result.success is True + # Custom claims should be in the token payload (though not in our AuthContext) + + def test_validate_credentials(self): + """Test credentials validation.""" + provider = JWTProvider(secret_key="test-secret") + + assert provider.validate_credentials({"token": "test"}) is True + assert provider.validate_credentials({}) is False + assert provider.validate_credentials({"other": "value"}) is False + + @pytest.mark.asyncio + async def test_wrong_secret_key(self): + """Test JWT verification fails with wrong secret key.""" + provider1 = JWTProvider(secret_key="secret1") + provider2 = JWTProvider(secret_key="secret2") + + # Create token with provider1 + token = provider1.create_token(user_id="user123") + + # Try to verify with provider2 (different secret) + result = await provider2.authenticate({"token": token}) + + assert result.success is False + assert result.error == "Invalid token" + + +class TestSessionProvider: + """Tests for session-based authentication provider.""" + + def test_provider_initialization(self): + """Test session provider initialization.""" + provider = SessionProvider(session_timeout=1800) + + assert provider.session_timeout == 1800 + assert provider._sessions == {} + + def test_create_session(self): + """Test session creation.""" + provider = SessionProvider() + + session_id = provider.create_session( + user_id="user123", + username="alice", + roles=["admin"], + permissions=["read:all"], + metadata={"ip": "192.168.1.1"}, + ) + + assert isinstance(session_id, str) + assert len(session_id) > 0 + assert session_id in provider._sessions + + # Check session data + session = provider._sessions[session_id] + assert session["user_id"] == "user123" + assert session["username"] == "alice" + assert session["roles"] == ["admin"] + assert session["permissions"] == ["read:all"] + assert session["metadata"] == {"ip": "192.168.1.1"} + + @pytest.mark.asyncio + async def test_successful_authentication(self): + """Test successful session authentication.""" + provider = SessionProvider() + + # Create a session + session_id = provider.create_session( + user_id="user123", username="alice", roles=["admin"], permissions=["read:posts"] + ) + + # Authenticate with session ID + result = await provider.authenticate({"session_id": session_id}) + + assert result.success is True + assert result.context is not None + assert result.context.authenticated is True + assert result.context.user_id == "user123" + assert result.context.username == "alice" + assert result.context.has_role("admin") + assert result.context.has_permission("read:posts") + + @pytest.mark.asyncio + async def test_invalid_session_id(self): + """Test authentication with invalid session ID.""" + provider = SessionProvider() + + result = await provider.authenticate({"session_id": "invalid-session"}) + + assert result.success is False + assert result.error == "Invalid session" + + @pytest.mark.asyncio + async def test_missing_session_id(self): + """Test authentication with missing session ID.""" + provider = SessionProvider() + + result = await provider.authenticate({}) + + assert result.success is False + assert result.error == "Missing session_id in credentials" + + @pytest.mark.asyncio + async def test_expired_session(self): + """Test authentication with expired session.""" + provider = SessionProvider(session_timeout=0) # Expire immediately + + # Create a session + session_id = provider.create_session(user_id="user123") + + # Session should expire immediately + import time + + time.sleep(0.1) + + result = await provider.authenticate({"session_id": session_id}) + + assert result.success is False + assert result.error == "Session expired" + # Session should be removed + assert session_id not in provider._sessions + + def test_destroy_session(self): + """Test session destruction.""" + provider = SessionProvider() + + # Create a session + session_id = provider.create_session(user_id="user123") + assert session_id in provider._sessions + + # Destroy it + result = provider.destroy_session(session_id) + assert result is True + assert session_id not in provider._sessions + + # Try to destroy non-existent session + result = provider.destroy_session("non-existent") + assert result is False + + def test_cleanup_expired_sessions(self): + """Test cleanup of expired sessions.""" + provider = SessionProvider(session_timeout=0) # Expire immediately + + # Create several sessions + _session1 = provider.create_session(user_id="user1") + _session2 = provider.create_session(user_id="user2") + _session3 = provider.create_session(user_id="user3") + + assert len(provider._sessions) == 3 + + # Wait for expiration + import time + + time.sleep(0.1) + + # Cleanup + cleaned = provider.cleanup_expired_sessions() + + assert cleaned == 3 + assert len(provider._sessions) == 0 + + def test_cleanup_no_expired_sessions(self): + """Test cleanup when no sessions are expired.""" + provider = SessionProvider(session_timeout=3600) + + # Create sessions + _session1 = provider.create_session(user_id="user1") + _session2 = provider.create_session(user_id="user2") + + # Cleanup (none should be expired) + cleaned = provider.cleanup_expired_sessions() + + assert cleaned == 0 + assert len(provider._sessions) == 2 + + def test_validate_credentials(self): + """Test credentials validation.""" + provider = SessionProvider() + + assert provider.validate_credentials({"session_id": "test"}) is True + assert provider.validate_credentials({}) is False + assert provider.validate_credentials({"other": "value"}) is False diff --git a/tests/test_rbac.py b/tests/test_rbac.py new file mode 100644 index 0000000..36e47f3 --- /dev/null +++ b/tests/test_rbac.py @@ -0,0 +1,456 @@ +""" +Tests for Role-Based Access Control (RBAC) system. + +Tests for Permission, Role, and RBAC classes. +""" + +import pytest + +from nextmcp.auth.core import AuthContext, Permission, Role +from nextmcp.auth.rbac import RBAC, PermissionDeniedError + + +class TestPermission: + """Tests for Permission class.""" + + def test_permission_creation(self): + """Test basic permission creation.""" + perm = Permission(name="read:posts", description="Read blog posts") + + assert perm.name == "read:posts" + assert perm.description == "Read blog posts" + assert perm.resource is None + + def test_permission_with_resource(self): + """Test permission with resource.""" + perm = Permission(name="write:posts", resource="posts") + + assert perm.name == "write:posts" + assert perm.resource == "posts" + + def test_permission_string(self): + """Test permission string representation.""" + perm = Permission(name="admin:users") + assert str(perm) == "admin:users" + + def test_permission_hash_and_equality(self): + """Test permission hashing and equality.""" + perm1 = Permission(name="read:posts") + perm2 = Permission(name="read:posts") + perm3 = Permission(name="write:posts") + + assert perm1 == perm2 + assert perm1 != perm3 + assert hash(perm1) == hash(perm2) + assert hash(perm1) != hash(perm3) + + # Can be used in sets + perm_set = {perm1, perm2, perm3} + assert len(perm_set) == 2 # perm1 and perm2 are the same + + def test_permission_matches_exact(self): + """Test exact permission matching.""" + perm = Permission(name="read:posts") + + assert perm.matches("read:posts") is True + assert perm.matches("write:posts") is False + + def test_permission_matches_wildcard(self): + """Test wildcard permission matching.""" + perm_all = Permission(name="*") + assert perm_all.matches("read:posts") is True + assert perm_all.matches("anything") is True + + perm_admin = Permission(name="admin:*") + assert perm_admin.matches("admin:users") is True + assert perm_admin.matches("admin:posts") is True + assert perm_admin.matches("read:posts") is False + + +class TestRole: + """Tests for Role class.""" + + def test_role_creation(self): + """Test basic role creation.""" + role = Role(name="admin", description="Administrator") + + assert role.name == "admin" + assert role.description == "Administrator" + assert len(role.permissions) == 0 + + def test_role_string(self): + """Test role string representation.""" + role = Role(name="editor") + assert str(role) == "editor" + + def test_role_hash_and_equality(self): + """Test role hashing and equality.""" + role1 = Role(name="admin") + role2 = Role(name="admin") + role3 = Role(name="editor") + + assert role1 == role2 + assert role1 != role3 + assert hash(role1) == hash(role2) + + # Can be used in sets + role_set = {role1, role2, role3} + assert len(role_set) == 2 + + def test_add_permission(self): + """Test adding permissions to role.""" + role = Role(name="editor") + + # Add permission object + perm1 = Permission(name="read:posts") + role.add_permission(perm1) + assert perm1 in role.permissions + + # Add permission by name + role.add_permission("write:posts") + assert any(p.name == "write:posts" for p in role.permissions) + + def test_has_permission(self): + """Test checking if role has permission.""" + role = Role(name="editor") + role.add_permission(Permission(name="read:posts")) + role.add_permission(Permission(name="write:posts")) + + assert role.has_permission("read:posts") is True + assert role.has_permission("write:posts") is True + assert role.has_permission("delete:posts") is False + + def test_has_permission_wildcard(self): + """Test role with wildcard permissions.""" + role = Role(name="admin") + role.add_permission(Permission(name="admin:*")) + + assert role.has_permission("admin:users") is True + assert role.has_permission("admin:posts") is True + assert role.has_permission("read:posts") is False + + +class TestAuthContext: + """Tests for AuthContext class.""" + + def test_auth_context_creation(self): + """Test basic auth context creation.""" + context = AuthContext(authenticated=True, user_id="user123", username="alice") + + assert context.authenticated is True + assert context.user_id == "user123" + assert context.username == "alice" + assert len(context.roles) == 0 + assert len(context.permissions) == 0 + + def test_add_role(self): + """Test adding roles to context.""" + context = AuthContext(authenticated=True, user_id="user123") + + # Add role object + role1 = Role(name="admin") + context.add_role(role1) + assert role1 in context.roles + + # Add role by name + context.add_role("editor") + assert any(r.name == "editor" for r in context.roles) + + def test_add_permission(self): + """Test adding permissions to context.""" + context = AuthContext(authenticated=True, user_id="user123") + + # Add permission object + perm1 = Permission(name="read:posts") + context.add_permission(perm1) + assert perm1 in context.permissions + + # Add permission by name + context.add_permission("write:posts") + assert any(p.name == "write:posts" for p in context.permissions) + + def test_has_role(self): + """Test checking if context has role.""" + context = AuthContext(authenticated=True, user_id="user123") + context.add_role(Role(name="admin")) + context.add_role(Role(name="editor")) + + assert context.has_role("admin") is True + assert context.has_role("editor") is True + assert context.has_role("viewer") is False + + def test_has_permission_direct(self): + """Test checking direct permissions.""" + context = AuthContext(authenticated=True, user_id="user123") + context.add_permission(Permission(name="read:posts")) + + assert context.has_permission("read:posts") is True + assert context.has_permission("write:posts") is False + + def test_has_permission_from_role(self): + """Test checking permissions from roles.""" + # Create role with permissions + editor_role = Role(name="editor") + editor_role.add_permission(Permission(name="read:posts")) + editor_role.add_permission(Permission(name="write:posts")) + + # Create context with role + context = AuthContext(authenticated=True, user_id="user123") + context.add_role(editor_role) + + assert context.has_permission("read:posts") is True + assert context.has_permission("write:posts") is True + assert context.has_permission("delete:posts") is False + + +class TestRBAC: + """Tests for RBAC system.""" + + def test_rbac_initialization(self): + """Test RBAC initialization.""" + rbac = RBAC() + + assert len(rbac.list_roles()) == 0 + assert len(rbac.list_permissions()) == 0 + + def test_define_permission(self): + """Test defining permissions.""" + rbac = RBAC() + + perm = rbac.define_permission("read:posts", "Read blog posts") + + assert perm.name == "read:posts" + assert perm.description == "Read blog posts" + assert rbac.get_permission("read:posts") == perm + + def test_define_role(self): + """Test defining roles.""" + rbac = RBAC() + + role = rbac.define_role("admin", "Administrator") + + assert role.name == "admin" + assert role.description == "Administrator" + assert rbac.get_role("admin") == role + + def test_assign_permission_to_role(self): + """Test assigning permissions to roles.""" + rbac = RBAC() + + rbac.define_permission("read:posts") + rbac.define_role("viewer") + + rbac.assign_permission_to_role("viewer", "read:posts") + + role = rbac.get_role("viewer") + assert role.has_permission("read:posts") is True + + def test_assign_permission_auto_create(self): + """Test auto-creating permission when assigning to role.""" + rbac = RBAC() + rbac.define_role("editor") + + # Assign permission that doesn't exist yet (should auto-create) + rbac.assign_permission_to_role("editor", "write:posts") + + role = rbac.get_role("editor") + assert role.has_permission("write:posts") is True + assert rbac.get_permission("write:posts") is not None + + def test_assign_permission_invalid_role(self): + """Test assigning permission to non-existent role.""" + rbac = RBAC() + + with pytest.raises(ValueError, match="Role 'nonexistent' not found"): + rbac.assign_permission_to_role("nonexistent", "read:posts") + + def test_list_roles_and_permissions(self): + """Test listing all roles and permissions.""" + rbac = RBAC() + + rbac.define_role("admin") + rbac.define_role("editor") + rbac.define_permission("read:posts") + rbac.define_permission("write:posts") + + assert len(rbac.list_roles()) == 2 + assert len(rbac.list_permissions()) == 2 + + def test_check_permission(self): + """Test checking permissions on auth context.""" + rbac = RBAC() + + # Create role with permission + rbac.define_role("editor") + rbac.define_permission("write:posts") + rbac.assign_permission_to_role("editor", "write:posts") + + # Create auth context with role + context = AuthContext(authenticated=True, user_id="user123") + context.add_role(rbac.get_role("editor")) + + assert rbac.check_permission(context, "write:posts") is True + assert rbac.check_permission(context, "delete:posts") is False + + def test_check_permission_not_authenticated(self): + """Test permission check fails when not authenticated.""" + rbac = RBAC() + + context = AuthContext(authenticated=False) + + assert rbac.check_permission(context, "any:permission") is False + + def test_check_role(self): + """Test checking roles on auth context.""" + rbac = RBAC() + rbac.define_role("admin") + + context = AuthContext(authenticated=True, user_id="user123") + context.add_role(rbac.get_role("admin")) + + assert rbac.check_role(context, "admin") is True + assert rbac.check_role(context, "editor") is False + + def test_require_permission_success(self): + """Test requiring permission succeeds when granted.""" + rbac = RBAC() + rbac.define_permission("read:posts") + + context = AuthContext(authenticated=True, user_id="user123") + context.add_permission(rbac.get_permission("read:posts")) + + # Should not raise + rbac.require_permission(context, "read:posts") + + def test_require_permission_denied(self): + """Test requiring permission raises when not granted.""" + rbac = RBAC() + + context = AuthContext(authenticated=True, user_id="user123") + + with pytest.raises(PermissionDeniedError, match="Permission 'write:posts' required"): + rbac.require_permission(context, "write:posts") + + def test_require_role_success(self): + """Test requiring role succeeds when granted.""" + rbac = RBAC() + rbac.define_role("admin") + + context = AuthContext(authenticated=True, user_id="user123") + context.add_role(rbac.get_role("admin")) + + # Should not raise + rbac.require_role(context, "admin") + + def test_require_role_denied(self): + """Test requiring role raises when not granted.""" + rbac = RBAC() + + context = AuthContext(authenticated=True, user_id="user123") + + with pytest.raises(PermissionDeniedError, match="Role 'admin' required"): + rbac.require_role(context, "admin") + + def test_load_from_config(self): + """Test loading RBAC configuration from dict.""" + rbac = RBAC() + + config = { + "permissions": [ + {"name": "read:posts", "description": "Read posts"}, + {"name": "write:posts", "description": "Write posts"}, + {"name": "delete:posts", "description": "Delete posts"}, + ], + "roles": [ + { + "name": "viewer", + "description": "Read-only access", + "permissions": ["read:posts"], + }, + { + "name": "editor", + "description": "Can edit content", + "permissions": ["read:posts", "write:posts"], + }, + { + "name": "admin", + "description": "Full access", + "permissions": ["read:posts", "write:posts", "delete:posts"], + }, + ], + } + + rbac.load_from_config(config) + + # Check permissions loaded + assert len(rbac.list_permissions()) == 3 + assert rbac.get_permission("read:posts") is not None + + # Check roles loaded + assert len(rbac.list_roles()) == 3 + + # Check role permissions + viewer = rbac.get_role("viewer") + assert viewer.has_permission("read:posts") is True + assert viewer.has_permission("write:posts") is False + + editor = rbac.get_role("editor") + assert editor.has_permission("read:posts") is True + assert editor.has_permission("write:posts") is True + assert editor.has_permission("delete:posts") is False + + admin = rbac.get_role("admin") + assert admin.has_permission("read:posts") is True + assert admin.has_permission("write:posts") is True + assert admin.has_permission("delete:posts") is True + + def test_to_dict(self): + """Test exporting RBAC configuration to dict.""" + rbac = RBAC() + + rbac.define_permission("read:posts", "Read posts") + rbac.define_permission("write:posts", "Write posts") + + rbac.define_role("viewer", "Read-only") + rbac.assign_permission_to_role("viewer", "read:posts") + + rbac.define_role("editor", "Can edit") + rbac.assign_permission_to_role("editor", "read:posts") + rbac.assign_permission_to_role("editor", "write:posts") + + config = rbac.to_dict() + + assert "permissions" in config + assert "roles" in config + assert len(config["permissions"]) == 2 + assert len(config["roles"]) == 2 + + # Check permission in export + perm_names = [p["name"] for p in config["permissions"]] + assert "read:posts" in perm_names + assert "write:posts" in perm_names + + # Check role in export + editor_role = next(r for r in config["roles"] if r["name"] == "editor") + assert "read:posts" in editor_role["permissions"] + assert "write:posts" in editor_role["permissions"] + + +class TestPermissionDeniedError: + """Tests for PermissionDeniedError exception.""" + + def test_error_creation(self): + """Test creating permission denied error.""" + error = PermissionDeniedError("Access denied", required="admin:users", user_id="user123") + + assert str(error) == "Access denied" + assert error.required == "admin:users" + assert error.user_id == "user123" + + def test_error_without_details(self): + """Test error without required/user_id details.""" + error = PermissionDeniedError("Access denied") + + assert str(error) == "Access denied" + assert error.required is None + assert error.user_id is None