diff --git a/README.md b/README.md index 171c770..f0bfe1c 100644 --- a/README.md +++ b/README.md @@ -1,161 +1,534 @@ - # qh -Quick Http web-service construction. -Getting from python to an http-service exposing them to the world, -in the easiest way machinely possible. +**Quick HTTP web-service construction** - From Python functions to production-ready HTTP services, with minimal boilerplate. + +`qh` (pronounced "quick") is a convention-over-configuration framework for exposing Python functions as HTTP services. Built on FastAPI, it provides a delightfully simple API while giving you escape hatches for advanced use cases. + +```bash +pip install qh +``` -Harnesses the great power of [py2http](https://github.com/i2mint/py2http) -without all the responsibilities. +## Quickstart: From Function to API in 3 Lines -This is meant for the desireable lightening fast development cycles during -proof-of-conceptualization. -As you move towards production, consider using one of those boring grown-up tools out there... +```python +from qh import mk_app +def add(x: int, y: int) -> int: + return x + y -To install: ```pip install qh``` +app = mk_app([add]) +``` -# Examples +That's it! You now have a FastAPI app with: +- βœ… Automatic request/response handling +- βœ… Type validation from your annotations +- βœ… OpenAPI documentation at `/docs` +- βœ… Multiple input formats (JSON body, query params, etc.) -## When dealing only with simple (json) types... +Run it: +```bash +uvicorn your_module:app +``` +Or test it: ```python -import qh -from qh import mk_http_service_app +from qh.testing import test_app -def poke(): - return 'here is a peek' +with test_app(app) as client: + response = client.post("/add", json={"x": 3, "y": 5}) + print(response.json()) # 8 +``` -def foo(x: int): - return x + 2 +## What You Can Do From Here -def bar(name='world'): - return f"Hello {name}!" +### πŸš€ Async Task Processing (NEW in v0.5.0) -app = mk_http_service_app([foo, bar, poke]) -app.run() -``` +Handle long-running operations without blocking: +```python +import time + +def expensive_computation(n: int) -> int: + time.sleep(5) # Simulate heavy processing + return n * 2 + +# Enable async support +app = mk_app( + [expensive_computation], + async_funcs=['expensive_computation'] +) ``` -Bottle v0.12.19 server starting up (using WSGIRefServer())... -Listening on http://127.0.0.1:8080/ -Hit Ctrl-C to quit. 
+ +Now clients can choose sync or async execution: + +```python +# Synchronous (blocks for 5 seconds) +POST /expensive_computation?n=10 +β†’ 20 + +# Asynchronous (returns immediately) +POST /expensive_computation?n=10&async=true +β†’ {"task_id": "abc-123", "status": "submitted"} + +# Check status +GET /tasks/abc-123/status +β†’ {"status": "running", "started_at": 1234567890} + +# Get result (blocks until ready, or returns immediately if done) +GET /tasks/abc-123/result?wait=true&timeout=10 +β†’ {"status": "completed", "result": 20} ``` -Now grab a browser and go to `http://127.0.0.1:8080/ping` -(it's a GET route that the app gives you for free, to test if alive) +**Advanced async configuration:** +```python +from qh import mk_app, TaskConfig, ProcessPoolTaskExecutor + +app = mk_app( + [cpu_bound_func, io_bound_func], + async_funcs=['cpu_bound_func', 'io_bound_func'], + async_config={ + 'cpu_bound_func': TaskConfig( + executor=ProcessPoolTaskExecutor(max_workers=4), # Use processes for CPU-bound + ttl=3600, # Keep results for 1 hour + ), + 'io_bound_func': TaskConfig( + async_mode='always', # Always async, no query param needed + ), + } +) ``` -{"ping": "pong"} + +Task management endpoints are automatically created: +- `GET /tasks/` - List all tasks +- `GET /tasks/{id}` - Get complete task info +- `GET /tasks/{id}/status` - Get task status +- `GET /tasks/{id}/result` - Get result (with optional wait) +- `DELETE /tasks/{id}` - Cancel/delete task + +### πŸ“ Convention-Based Routing + +```python +def get_user(user_id: str): + return {"id": user_id, "name": "Alice"} + +def list_users(): + return [{"id": "1", "name": "Alice"}] + +def create_user(name: str, email: str): + return {"id": "123", "name": name, "email": email} + +app = mk_app( + [get_user, list_users, create_user], + use_conventions=True +) ``` -Now try some post requests: +This automatically creates RESTful routes: +- `GET /users/{user_id}` β†’ `get_user(user_id)` +- `GET /users` β†’ `list_users()` +- `POST /users` β†’ `create_user(name, email)` + +### 🎯 Explicit Configuration + +```python +from qh import mk_app, RouteConfig + +def add(x: int, y: int) -> int: + return x + y -Run this script somewhere. For example, with curl try things like: +app = mk_app({ + add: RouteConfig( + path="/calculate/sum", + methods=["GET", "POST"], + tags=["math"], + summary="Add two numbers" + ) +}) +``` +Or use dictionaries: +```python +app = mk_app({ + add: { + "path": "/calculate/sum", + "methods": ["GET", "POST"], + } +}) ``` -curl http://127.0.0.1:8080/ping -# should get {"ping": "pong"} -curl -X POST http://127.0.0.1:8080/poke -# should get "here is a peek" +### πŸ”„ Parameter Transformation + +```python +import numpy as np +from qh import mk_app, RouteConfig, TransformSpec, HttpLocation -curl -H "Content-Type: application/json" -X POST -d '{"x": 3}' http://127.0.0.1:8080/foo -# (should get 5) +def add_arrays(a, b): + return (a + b).tolist() -curl -H "Content-Type: application/json" -X POST -d '{"name": "qh"}' http://127.0.0.1:8080/bar -# should get "hello qh!" +app = mk_app({ + add_arrays: RouteConfig( + param_overrides={ + "a": TransformSpec( + http_location=HttpLocation.JSON_BODY, + ingress=np.array # Convert JSON array to numpy + ), + "b": TransformSpec( + http_location=HttpLocation.JSON_BODY, + ingress=np.array + ) + } + ) +}) ``` -Now be happy (or go try the other function by doing some post requests). 
+Now you can send: +```bash +POST /add_arrays +{"a": [1,2,3], "b": [4,5,6]} +β†’ [5, 7, 9] +``` -## When your types get complicated +### 🌐 OpenAPI & Client Generation -To deploy the above, we would just need to do ```python -app = mk_http_service_app([poke, foo, bar]) +from qh import mk_app, export_openapi, mk_client_from_app + +def greet(name: str) -> str: + return f"Hello, {name}!" + +app = mk_app([greet]) + +# Export OpenAPI spec +export_openapi(app, "api.json") + +# Generate Python client +client = mk_client_from_app(app) +result = client.greet(name="World") # "Hello, World!" + +# Generate TypeScript client +from qh import export_ts_client +export_ts_client(app, "client.ts") ``` -But what if we also wanted to handle this: +### 🎨 Custom Types ```python -def add_numpy_arrays(a, b): - return (a + b).tolist() +from qh import register_type +from datetime import datetime + +def custom_serializer(dt: datetime) -> str: + return dt.isoformat() + +def custom_deserializer(s: str) -> datetime: + return datetime.fromisoformat(s) + +register_type( + datetime, + serialize=custom_serializer, + deserialize=custom_deserializer +) + +def get_event_time(event_id: str) -> datetime: + return datetime.now() + +app = mk_app([get_event_time]) ``` +### βš™οΈ Global Configuration -Here the a and b are assumed to be numpy arrays (or `.tolist()` would fail). -Out of the box, qh can only handle json types `(str, list, int, float)`, so we need to preprocess the input. -`qh` makes that easy too. +```python +from qh import mk_app, AppConfig + +app = mk_app( + funcs=[add, multiply, divide], + config=AppConfig( + path_prefix="/api/v1", + default_methods=["POST"], + title="Math API", + version="1.0.0", + ) +) +``` -Here we provide a name->conversion_func mapping (but could express it otherwise) +### πŸ§ͺ Testing Utilities ```python -from qh.trans import mk_json_handler_from_name_mapping -import numpy as np +from qh import test_app, serve_app, quick_test -input_trans = mk_json_handler_from_name_mapping( - { - "a": np.array, - "b": np.array - } +# Quick inline testing +with test_app(app) as client: + response = client.post("/add", json={"x": 3, "y": 5}) + assert response.json() == 8 + +# Serve for external testing +with serve_app(app, port=8001) as url: + import requests + response = requests.post(f"{url}/add", json={"x": 3, "y": 5}) + +# Quick smoke test +quick_test(app) # Tests all endpoints with example data +``` + +## Features + +### Built-in +- βœ… **Minimal boilerplate** - Define functions, get HTTP service +- βœ… **Type-driven** - Uses Python type hints for validation +- βœ… **FastAPI-powered** - Full async support, high performance +- βœ… **Automatic OpenAPI** - Interactive docs at `/docs` +- βœ… **Client generation** - Python, TypeScript, JavaScript clients +- βœ… **Convention over configuration** - RESTful routing from function names +- βœ… **Flexible parameter handling** - JSON, query, path, headers, forms +- βœ… **Custom transformations** - Transform inputs/outputs as needed +- βœ… **Testing utilities** - Built-in test client and helpers + +### Phase 4 (NEW): Async Task Processing +- βœ… **Background tasks** - Long-running operations without blocking +- βœ… **Task tracking** - Status monitoring and result retrieval +- βœ… **Flexible execution** - Thread pools, process pools, or custom executors +- βœ… **Client-controlled** - Let users choose sync vs async +- βœ… **Standard HTTP patterns** - Poll for status, wait for results +- βœ… **Task management** - List, query, cancel tasks via HTTP + +## Examples + +### 
Simple CRUD API +```python +from qh import mk_app + +# In-memory database +users = {} + +def create_user(name: str, email: str) -> dict: + user_id = str(len(users) + 1) + users[user_id] = {"id": user_id, "name": name, "email": email} + return users[user_id] + +def get_user(user_id: str) -> dict: + return users.get(user_id, {}) + +def list_users() -> list: + return list(users.values()) + +app = mk_app( + [create_user, get_user, list_users], + use_conventions=True ) +``` -app = mk_http_service_app([poke, foo, bar, add_numpy_arrays], - input_trans=input_trans) +### File Processing with Async +```python +from qh import mk_app, TaskConfig +import time + +def process_large_file(file_path: str) -> dict: + time.sleep(10) # Simulate heavy processing + return {"status": "processed", "path": file_path} + +app = mk_app( + [process_large_file], + async_funcs=['process_large_file'], + async_config=TaskConfig( + async_mode='always', # Always async + ttl=3600, # Keep results for 1 hour + ) +) -app.run() +# Client usage: +# POST /process_large_file -> Returns task_id immediately +# GET /tasks/{task_id}/result?wait=true -> Blocks until done ``` -Now try it out: +### Mixed Sync/Async API +```python +def quick_lookup(key: str) -> str: + """Fast operation - always synchronous""" + return cache.get(key) + +def expensive_aggregation(days: int) -> dict: + """Slow operation - supports async""" + time.sleep(days * 2) + return {"result": "..."} + +app = mk_app( + [quick_lookup, expensive_aggregation], + async_funcs=['expensive_aggregation'] # Only expensive_aggregation supports async +) +# quick_lookup is always synchronous +# expensive_aggregation can be called with ?async=true ``` -curl -H "Content-Type: application/json" -X POST -d '{"a": [1,2,3], "b": [4,5,6]}' http://127.0.0.1:8080/add_numpy_arrays -# should get [5, 7, 9] + +### Data Science API +```python +import numpy as np +import pandas as pd +from qh import mk_app, RouteConfig, TransformSpec + +def analyze_data(data: pd.DataFrame) -> dict: + return { + "mean": data.mean().to_dict(), + "std": data.std().to_dict() + } + +app = mk_app({ + analyze_data: RouteConfig( + param_overrides={ + "data": TransformSpec(ingress=pd.DataFrame) + } + ) +}) + +# POST /analyze_data +# {"data": {"col1": [1,2,3], "col2": [4,5,6]}} ``` -## Testing HTTP Services +## Philosophy + +**Convention over configuration, but configuration when you need it.** + +`qh` follows a layered approach: +1. **Simple case** - Just pass functions, get working HTTP service +2. **Common cases** - Use conventions (RESTful routing, type-driven validation) +3. **Advanced cases** - Explicit configuration for full control + +You write Python functions. `qh` handles the HTTP layer. + +## Comparison + +| Feature | qh | FastAPI | Flask | +|---------|----|---------| ------| +| From functions to HTTP | 1 line | ~10 lines | ~15 lines | +| Type validation | Automatic | Automatic | Manual | +| OpenAPI docs | Automatic | Automatic | Extensions | +| Client generation | βœ… Built-in | ❌ External tools | ❌ Manual | +| Convention routing | βœ… Yes | ❌ No | ❌ No | +| Async tasks | βœ… Built-in | ❌ Manual setup | ❌ Extensions | +| Task tracking | βœ… Automatic | ❌ Manual | ❌ Manual | +| Learning curve | Minutes | Hours | Hours | +| Suitable for production | Yes (it's FastAPI!) 
| Yes | Yes | + +## Under the Hood -The `service_running` context manager makes it easy to test your qh applications (or any HTTP service): +`qh` is built on: +- [FastAPI](https://fastapi.tiangolo.com/) - Modern, fast web framework +- [i2](https://github.com/i2mint/i2) - Function signature manipulation +- [Pydantic](https://pydantic-docs.helpmanual.io/) - Data validation + +When you create an app with `qh`, you get a fully-featured FastAPI application. All FastAPI features are available. + +## Advanced Topics + +### Using au Package (External Async Backend) + +The built-in async functionality is perfect for most use cases, but if you need distributed task processing, you can integrate with [au](https://github.com/i2mint/au): + +```bash +pip install au +``` ```python -from qh import mk_app, service_running -import requests +from au import async_compute, RQBackend +from qh import mk_app, TaskConfig -def add(x: int, y: int) -> int: - return x + y +# Configure au with Redis backend +@async_compute(backend=RQBackend('redis://localhost:6379')) +def heavy_computation(n: int) -> int: + return n * 2 -app = mk_app([add]) +# Use with qh +app = mk_app([heavy_computation]) +# Now heavy_computation can be distributed across multiple workers +``` -# Test your service -with service_running(app=app, port=8001) as info: - response = requests.post(f'{info.url}/add', json={'x': 3, 'y': 5}) - assert response.json() == 8 - print(f"βœ… Service tested successfully at {info.url}") +### Custom Task Executors + +```python +from qh import TaskExecutor, TaskConfig +from concurrent.futures import ThreadPoolExecutor + +class MyCustomExecutor(TaskExecutor): + def __init__(self): + self.pool = ThreadPoolExecutor(max_workers=10) + + def submit_task(self, task_id, func, args, kwargs, callback): + # Custom task submission logic + def wrapper(): + try: + result = func(*args, **kwargs) + callback(task_id, result, None) + except Exception as e: + callback(task_id, None, e) + self.pool.submit(wrapper) + + def shutdown(self, wait=True): + self.pool.shutdown(wait=wait) + +app = mk_app( + [my_func], + async_funcs=['my_func'], + async_config=TaskConfig(executor=MyCustomExecutor()) +) ``` -The `service_running` context manager: -- βœ… Checks if service is already running -- βœ… Only launches if needed -- βœ… Only tears down what it launched -- βœ… Returns `ServiceInfo` with URL, status, and thread info +### Middleware and Extensions + +Since `qh` creates a FastAPI app, you can use all FastAPI features: -You can also test external services: ```python -# Won't launch or tear down - just verifies it's running -with service_running(url='https://api.github.com') as info: - assert info.was_already_running - response = requests.get(f'{info.url}/users/octocat') +from qh import mk_app +from fastapi.middleware.cors import CORSMiddleware + +app = mk_app([my_func]) + +# Add CORS +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + +# Add custom routes +@app.get("/health") +async def health(): + return {"status": "healthy"} ``` -For simpler cases, use `serve_app`: +## Migration Guide + +### From v0.4.0 to v0.5.0 + +The async task feature is fully backward compatible. Existing apps will work without changes. 
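+
+To see what opting in looks like from the client side once async is enabled (the server-side change is shown below), here is a minimal client-side sketch of the flow using `requests`. It assumes the `expensive_computation` service from the async example above is running on `localhost:8000` and that its parameters are passed as query parameters, mirroring that example; the exact parameter locations depend on your route configuration:
+
+```python
+import requests
+
+BASE = "http://localhost:8000"  # assumed local dev server
+
+# Submit the work asynchronously: returns a task id immediately
+submitted = requests.post(
+    f"{BASE}/expensive_computation", params={"n": 10, "async": "true"}
+).json()
+task_id = submitted["task_id"]
+
+# Poll the status endpoint...
+status = requests.get(f"{BASE}/tasks/{task_id}/status").json()
+
+# ...or block on the result endpoint with a timeout
+result = requests.get(
+    f"{BASE}/tasks/{task_id}/result", params={"wait": "true", "timeout": 10}
+).json()
+print(result["result"])  # 20, once the task has completed
+```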
+ +To enable async: ```python -from qh import serve_app +# Old (still works) +app = mk_app([my_func]) -with serve_app(app, port=8001) as url: - response = requests.post(f'{url}/add', json={'x': 3, 'y': 5}) - assert response.json() == 8 +# New (with async support) +app = mk_app([my_func], async_funcs=['my_func']) ``` -See `examples/service_running_demo.py` for more examples. -``` \ No newline at end of file +## Contributing + +We welcome contributions! See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines. + +## License + +Apache 2.0 + +## Links + +- **Documentation**: https://github.com/i2mint/qh +- **Source Code**: https://github.com/i2mint/qh +- **Issue Tracker**: https://github.com/i2mint/qh/issues +- **Related Projects**: + - [i2](https://github.com/i2mint/i2) - Function signature manipulation + - [au](https://github.com/i2mint/au) - Async utilities for distributed computing + - [FastAPI](https://fastapi.tiangolo.com/) - The underlying web framework + +--- + +Made with ❀️ by the i2mint team diff --git a/misc/ASYNC_TRACKING_ANALYSIS.md b/misc/ASYNC_TRACKING_ANALYSIS.md new file mode 100644 index 0000000..dec1f10 --- /dev/null +++ b/misc/ASYNC_TRACKING_ANALYSIS.md @@ -0,0 +1,363 @@ +# Async Tracking ID Implementation Analysis + +## Current State of Async Support + +The qh codebase has **comprehensive async support** already in place: + +### What's Already Implemented +1. **Async endpoint handlers** - All endpoints are async +2. **Async function detection** - `inspect.iscoroutinefunction()` used +3. **Async parameter extraction** - `extract_http_params()` is async +4. **Natural await handling** - Async functions work seamlessly +5. **TestClient compatibility** - Works with FastAPI's TestClient + +### Code Examples Showing Current Async Support + +**From endpoint.py**: +```python +async def endpoint(request: Request) -> Response: + # Async parameter extraction + http_params = await extract_http_params(request, param_specs) + # ... + # Natural async/sync function handling + if is_async: + result = await func(**transformed_params) + else: + result = func(**transformed_params) +``` + +**From base.py**: +```python +async def endpoint(request: Request): + data = await request.json() + # ... + result = func(**data) + if inspect.iscoroutine(result): + result = await result +``` + +## What's Missing for Request Tracking IDs + +### Identified Gaps + +1. **No Request Context Management** + - FastAPI Request object is not preserved across async boundaries + - No async context variables (contextvars) used + - No automatic tracking ID injection + +2. **No Built-in Request ID Generation** + - No UUID generation for requests + - No header inspection for existing trace IDs + - No ID propagation mechanism + +3. **No Background Task Support** + - Background tasks not integrated into framework + - No async task queue or job management + - No task-to-request correlation + +4. 
**No Middleware for Request Correlation** + - No automatic header injection + - No request/response ID decoration + - No logging integration points + +## Recommended Patterns for Async Tracking IDs + +### Pattern 1: Using TransformSpec for Request ID Parameter + +**Approach**: Inject tracking ID as a special parameter via the rules system + +```python +from qh.rules import TransformSpec, HttpLocation +from qh.config import AppConfig, RouteConfig +import uuid + +# Create ingress function that extracts or generates tracking ID +def extract_tracking_id(request_value): + # Value comes from X-Request-ID header + return request_value or str(uuid.uuid4()) + +# Global rule that matches any 'request_id' parameter +tracking_id_rule = NameRule({ + 'request_id': TransformSpec( + http_location=HttpLocation.HEADER, + http_name='X-Request-ID', + ingress=extract_tracking_id + ) +}) + +# Apply globally +app_config = AppConfig( + rule_chain=RuleChain([tracking_id_rule]) +) + +app = mk_app(funcs, config=app_config) + +# Now any function with a 'request_id' parameter gets it automatically +def process_data(request_id: str, data: dict): + print(f"Processing {request_id}: {data}") +``` + +### Pattern 2: Using FastAPI's BackgroundTasks + +**Approach**: Leverage FastAPI's native background task support through Depends + +```python +from fastapi import BackgroundTasks, Depends +from qh import mk_app +import asyncio + +async def log_request(request_id: str, task_name: str): + """Log task in background""" + await asyncio.sleep(1) + print(f"Completed task {task_name} for request {request_id}") + +def process_with_background( + data: dict, + request_id: str = Header('X-Request-ID'), + background_tasks: BackgroundTasks = Depends() +): + """Process and log in background""" + # Process + result = {'processed': data} + + # Add background task + background_tasks.add_task(log_request, request_id, "process_with_background") + + return result + +app = mk_app([process_with_background]) +``` + +### Pattern 3: Contextvars for Async Context + +**Approach**: Use Python's contextvars for request-local storage + +```python +from contextvars import ContextVar +from qh import mk_app +from qh.rules import TransformSpec, HttpLocation, NameRule +import uuid + +# Create context variable for tracking ID +tracking_id_context: ContextVar[str] = ContextVar('tracking_id', default=None) + +def set_tracking_id(header_value: str = None): + """Set tracking ID in context""" + tid = header_value or str(uuid.uuid4()) + tracking_id_context.set(tid) + return tid + +# Rule that sets context +tracking_rule = NameRule({ + 'request_id': TransformSpec( + http_location=HttpLocation.HEADER, + http_name='X-Request-ID', + ingress=set_tracking_id + ) +}) + +# Now any function can access tracking ID +def get_tracking_id(): + return tracking_id_context.get() + +async def async_process(request_id: str, data: dict): + """Async function that can access tracking ID""" + tid = get_tracking_id() + print(f"Request ID: {request_id}, Context ID: {tid}") + # Both are the same ID + return {'result': data, 'request_id': tid} + +app = mk_app([async_process], config=AppConfig(rule_chain=RuleChain([tracking_rule]))) +``` + +### Pattern 4: Custom Middleware with Header Injection + +**Approach**: Add middleware to inject and manage tracking IDs + +```python +from fastapi import FastAPI, Request +from qh import mk_app +import uuid +from starlette.middleware.base import BaseHTTPMiddleware + +class TrackingIDMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, 
call_next): + # Extract or generate tracking ID + tracking_id = request.headers.get('X-Request-ID', str(uuid.uuid4())) + + # Add to request state for access in handlers + request.state.tracking_id = tracking_id + + # Call next handler + response = await call_next(request) + + # Add tracking ID to response headers + response.headers['X-Request-ID'] = tracking_id + + return response + +def func_with_tracking(data: dict): + # In a real scenario, would access from request context + return {'processed': data} + +# Create app and add middleware +app = mk_app([func_with_tracking]) +app.add_middleware(TrackingIDMiddleware) +``` + +### Pattern 5: Structured Parameters for Tracking Context + +**Approach**: Create a dedicated tracking context parameter type + +```python +from dataclasses import dataclass +from qh import mk_app, register_json_type +from qh.rules import TransformSpec, HttpLocation, NameRule +import uuid + +@dataclass +class TrackingContext: + request_id: str + user_id: str = None + session_id: str = None + + @classmethod + def from_headers(cls, request_id_header: str = None, **kwargs): + return cls( + request_id=request_id_header or str(uuid.uuid4()), + user_id=kwargs.get('user_id'), + session_id=kwargs.get('session_id') + ) + +# Register custom type +register_json_type( + TrackingContext, + to_json=lambda ctx: { + 'request_id': ctx.request_id, + 'user_id': ctx.user_id, + 'session_id': ctx.session_id + }, + from_json=lambda data: TrackingContext(**data) +) + +# Rule to extract from headers +tracking_context_rule = NameRule({ + 'tracking': TransformSpec( + http_location=HttpLocation.HEADER, + http_name='X-Tracking-Context', + ingress=TrackingContext.from_headers + ) +}) + +async def process_tracked(data: dict, tracking: TrackingContext): + """Handler with full tracking context""" + print(f"Request {tracking.request_id} from user {tracking.user_id}") + return {'processed': data, 'request_id': tracking.request_id} + +app = mk_app( + [process_tracked], + config=AppConfig(rule_chain=RuleChain([tracking_context_rule])) +) +``` + +## Recommended Implementation Approach + +### Phase 1: Core Tracking ID Support + +**Best pattern**: Pattern 1 + 3 (TransformSpec + contextvars) + +**Advantages**: +- Uses existing qh architecture +- No changes to core framework +- Works with all function types +- Context available to async functions +- Simple to test + +**Implementation**: +```python +# qh/tracking.py (new module) +from contextvars import ContextVar +from qh.rules import NameRule, TransformSpec, HttpLocation +from uuid import uuid4 + +# Context variable for request-local tracking ID +REQUEST_ID_CONTEXT: ContextVar[str] = ContextVar('request_id', default=None) + +def get_request_id() -> str: + """Get current request ID from context""" + return REQUEST_ID_CONTEXT.get() + +def set_request_id(header_value: str = None) -> str: + """Set and return request ID""" + tid = header_value or str(uuid4()) + REQUEST_ID_CONTEXT.set(tid) + return tid + +# Predefined rule for automatic tracking ID injection +TRACKING_ID_RULE = NameRule({ + 'request_id': TransformSpec( + http_location=HttpLocation.HEADER, + http_name='X-Request-ID', + ingress=set_request_id + ) +}) +``` + +### Phase 2: Background Task Integration + +**Pattern**: FastAPI's BackgroundTasks + tracking ID propagation + +**Features**: +- Automatic background task creation +- Request ID passed to background tasks +- Task status tracking + +### Phase 3: Distributed Tracing + +**Pattern**: OpenTelemetry integration + +**Features**: +- Span creation per 
request +- Automatic span propagation +- Integration with observability platforms + +## Design Principles for qh Tracking IDs + +1. **Non-Invasive**: Works without modifying user functions +2. **Opt-In**: Can be enabled selectively per-function +3. **Configurable**: Multiple header names, ID generation strategies +4. **Async-Native**: Uses contextvars, not thread-local storage +5. **Framework-Aligned**: Uses FastAPI patterns, not custom middleware + +## Code Quality Considerations + +### Type Safety +- Use `ContextVar[str]` with proper typing +- TransformSpec definitions are typed + +### Error Handling +- Missing header β†’ auto-generate ID +- Invalid ID format β†’ use default generator +- Context not set β†’ graceful fallback + +### Performance +- Context variable lookup is O(1) +- No allocations in hot path +- Rule evaluation happens at app creation time + +### Testing +- Existing TestClient works unchanged +- Can override tracking IDs in tests +- Context isolation between test cases + +## Summary + +The qh framework is **well-positioned for async tracking ID implementation** because: + +1. βœ… Already async-native with proper endpoint handlers +2. βœ… Has rule-based parameter transformation system +3. βœ… Supports custom configurations per function +4. βœ… Can leverage FastAPI's Request object +5. βœ… Type-safe configuration system + +**Recommended next step**: Implement Pattern 1 + 3 (TransformSpec + contextvars) as a new optional module `qh/tracking.py` that integrates cleanly with existing architecture. diff --git a/misc/CODEBASE_OVERVIEW.md b/misc/CODEBASE_OVERVIEW.md new file mode 100644 index 0000000..92a6736 --- /dev/null +++ b/misc/CODEBASE_OVERVIEW.md @@ -0,0 +1,496 @@ +# QH Codebase Architecture Overview + +## 1. Project Purpose and Current State + +**qh** ("Quick HTTP") is a convention-over-configuration tool for rapidly creating HTTP services from Python functions using FastAPI as the underlying framework. It transforms Python functions into REST API endpoints with minimal boilerplate. + +**Current Phase**: Phase 3 (OpenAPI Export & Client Generation) recently completed +**Version**: 0.4.0 +**Status**: All 3 phases complete and tested + +## 2. Core Architecture + +### 2.1 Main Module Organization + +``` +qh/ +β”œβ”€β”€ __init__.py # Main API exports +β”œβ”€β”€ app.py # Primary API: mk_app() - creates FastAPI apps from functions +β”œβ”€β”€ config.py # Configuration system: AppConfig, RouteConfig, ConfigBuilder +β”œβ”€β”€ endpoint.py # Endpoint creation: make_endpoint(), extract_http_params() +β”œβ”€β”€ rules.py # Rule-based transformation system for parameter handling +β”œβ”€β”€ conventions.py # Convention-based routing (REST patterns) +β”œβ”€β”€ types.py # Type registry for automatic serialization/deserialization +β”œβ”€β”€ base.py # Lower-level mk_fastapi_app() and utilities +β”œβ”€β”€ core.py # Core FastAPI app creation with Wrap-based composition +β”œβ”€β”€ openapi.py # OpenAPI spec generation and enhancement +β”œβ”€β”€ client.py # Python client generation from OpenAPI specs +β”œβ”€β”€ jsclient.py # JavaScript/TypeScript client generation +β”œβ”€β”€ stores_qh.py # Store/object dispatch for dict-like objects +β”œβ”€β”€ testing.py # Testing utilities: AppRunner, serve_app, etc. +└── tests/ # Comprehensive test suite +``` + +## 3. 
Main Entry Point: mk_app() + +**Location**: `qh/app.py` + +**Purpose**: Single unified API to create FastAPI applications from Python functions + +**Key Features**: +- **Multiple input formats**: + - Single callable: `mk_app(func)` + - List of callables: `mk_app([func1, func2, func3])` + - Dict with per-function config: `mk_app({func1: config1, func2: config2})` + +- **Configuration levels** (hierarchy: function β†’ app β†’ global): + 1. Function-level: `RouteConfig` per function + 2. App-level: `AppConfig` for global defaults + 3. Parameter-level: `TransformSpec` for specific parameters + +```python +# Example usage +def add(x: int, y: int) -> int: + return x + y + +def list_users(limit: int = 10) -> list: + return [...] + +# Simple case - uses defaults +app = mk_app([add, list_users]) + +# With conventions (REST patterns) +app = mk_app([add, list_users], use_conventions=True) + +# With custom config +app = mk_app( + {add: {'path': '/math/add', 'methods': ['POST']}}, + config={'path_prefix': '/api/v1'} +) +``` + +## 4. Configuration System + +**Location**: `qh/config.py` + +### Four-Tier Hierarchy + +1. **Global Defaults** (`DEFAULT_ROUTE_CONFIG`, `DEFAULT_APP_CONFIG`) +2. **App-Level Config** (`AppConfig`) +3. **Function-Level Config** (`RouteConfig`) +4. **Parameter-Level Config** (`param_overrides` in RouteConfig) + +### Key Classes + +**AppConfig**: +- `default_methods`: HTTP methods for all routes (default: ['POST']) +- `path_template`: Auto-generate paths (default: '/{func_name}') +- `path_prefix`: Prefix all routes (e.g., '/api/v1') +- `rule_chain`: Global transformation rules +- FastAPI kwargs (title, version, docs_url, etc.) + +**RouteConfig**: +- `path`: Custom endpoint path +- `methods`: HTTP methods ('GET', 'POST', 'PUT', 'DELETE', 'PATCH') +- `rule_chain`: Custom parameter transformation rules +- `param_overrides`: Per-parameter HTTP location and transformation +- Metadata: `summary`, `description`, `tags`, `response_model` +- Schema options: `include_in_schema`, `deprecated` + +## 5. Endpoint Creation Pipeline + +**Location**: `qh/endpoint.py` + +### make_endpoint() Function + +Creates FastAPI-compatible async endpoint functions that: + +1. **Extract HTTP Parameters** + - Path parameters: `{param}` from URL + - Query parameters: `?key=value` from query string + - JSON body: Parameters from POST/PUT/PATCH body + - Headers, cookies, form data + +2. **Apply Ingress Transformations** + - Convert HTTP representation to Python types + - Type hints used for automatic conversion + - Custom rules applied via `TransformSpec` + +3. **Call Original Function** + - Validates required parameters + - Provides defaults for optional parameters + - Supports both sync and async functions + +4. **Apply Egress Transformations** + - Convert return value to JSON-serializable format + - Default handler for common types (dict, list, str, int, etc.) + +**Key Feature: Async Support** +- `inspect.iscoroutinefunction()` used to detect async functions +- Async functions are awaited naturally +- Sync functions work the same way + +## 6. 
Transformation Rules System + +**Location**: `qh/rules.py` + +### Rule-Based Parameter Matching + +**HttpLocation Enum**: +```python +- JSON_BODY # Default for POST/PUT/PATCH +- PATH # URL path parameter +- QUERY # Query string parameter +- HEADER # HTTP header +- COOKIE # HTTP cookie +- BINARY_BODY # Raw binary payload +- FORM_DATA # Multipart form data +``` + +**TransformSpec Dataclass**: +- `http_location`: Where to find/put the parameter +- `ingress`: Transform function (HTTP β†’ Python) +- `egress`: Transform function (Python β†’ HTTP) +- `http_name`: HTTP name (may differ from Python param name) + +**Rule Types**: +- `TypeRule`: Match by parameter type +- `NameRule`: Match by parameter name +- `FuncRule`: Match by function object +- `FuncNameRule`: Match by function name pattern +- `DefaultValueRule`: Match by default value +- `CompositeRule`: Combine multiple rules (AND/OR) + +**RuleChain**: +- Stores rules with priorities (higher priority evaluated first) +- First-match semantics: returns first matching rule +- `resolve_transform()` function resolves final spec + +## 7. Convention-Based Routing + +**Location**: `qh/conventions.py` + +### Automatic REST Path Generation + +**Function Name Parsing**: +``` +Verb patterns: +- get, fetch, retrieve, read β†’ GET +- list, find, search, query β†’ GET +- create, add, insert, new β†’ POST +- update, modify, edit, change, set β†’ PUT +- patch β†’ PATCH +- delete, remove, destroy β†’ DELETE + +Resource patterns: +- verb_resource: get_user, list_users, create_order_item +- Resource name auto-pluralized for collections +``` + +**Example Transformations**: +```python +def get_user(user_id: str) β†’ GET /users/{user_id} +def list_users(limit: int = 100) β†’ GET /users?limit=100 +def create_user(name: str) β†’ POST /users +def update_user(user_id: str, ...) β†’ PUT /users/{user_id} +def delete_user(user_id: str) β†’ DELETE /users/{user_id} +``` + +**Implementation Functions**: +- `parse_function_name()`: Extract verb and resource +- `infer_http_method()`: Get HTTP method from verb +- `infer_path_from_function()`: Generate RESTful path +- `apply_conventions_to_funcs()`: Apply to function list + +## 8. Type Registry System + +**Location**: `qh/types.py` + +### Automatic Serialization/Deserialization + +**TypeHandler**: +- Maps Python type β†’ JSON representation +- `to_json()`: Python object β†’ JSON +- `from_json()`: JSON β†’ Python object + +**TypeRegistry**: +- Global `_global_registry` manages all type handlers +- `register_type()`: Register custom type handler +- `get_transform_spec_for_type()`: Get ingress/egress for type + +**Built-in Support**: +- Python builtins: str, int, float, bool, list, dict, None +- NumPy arrays: `.tolist()` / `np.array()` +- Pandas DataFrames: `.to_dict(orient='records')` +- Pandas Series: `.tolist()` + +**Custom Type Registration**: +```python +# Method 1: Explicit +register_type( + MyClass, + to_json=lambda obj: obj.to_dict(), + from_json=lambda data: MyClass.from_dict(data) +) + +# Method 2: Decorator (auto-detects to_dict/from_dict) +@register_json_type +class Point: + def to_dict(self): ... + @classmethod + def from_dict(cls, data): ... +``` + +## 9. 
Async Support in Current Codebase + +### Existing Async Capabilities + +**Async Functions are Supported**: +- `endpoint.py`: `make_endpoint()` creates async wrapper +- Detects async functions with `inspect.iscoroutinefunction()` +- Awaits async function results: `await func(**params)` +- Async helper functions: `extract_http_params()` is async + +**Request Processing is Async**: +- Parameter extraction awaits: `await request.json()` +- Form parsing: `await request.form()` +- Body reading: `await request.body()` + +**FastAPI Integration**: +- All endpoints are async handlers +- Compatible with FastAPI's async model +- Can use async dependencies (Depends) + +### Async Test Support +```python +# From test_core.py +async def async_greeter(greeting: str, name: str = 'world') -> str: + await asyncio.sleep(0.1) # Simulate async operation + return f"{greeting}, {name}!" + +# Works naturally with TestClient +app = mk_fastapi_app([async_greeter]) +response = TestClient(app).post("/async_greeter", ...) +``` + +## 10. Function Registration & Configuration Patterns + +### How Functions are Registered + +1. **Normalization** (`normalize_funcs_input()`): + - Converts various input formats to `Dict[Callable, RouteConfig]` + - Single function β†’ wrapped in dict + - List β†’ dict with empty configs + - Dict β†’ preserved, dict configs converted to RouteConfig + +2. **Convention Application** (optional): + - `apply_conventions_to_funcs()` adds path/method inference + - Explicit config takes precedence over conventions + +3. **Configuration Resolution**: + - `resolve_route_config()` merges defaults with explicit config + - Fills in missing values from app-level defaults + - Auto-generates paths/descriptions from function metadata + +4. **Endpoint Creation**: + - `make_endpoint()` wraps function with HTTP handling + - Stores original function as `_qh_original_func` + - Sets metadata (`__name__`, `__doc__`) + +5. **Route Registration**: + - `app.add_api_route()` registers with FastAPI + - Full path = `app_config.path_prefix + resolved_config.path` + - Methods, summary, description, tags all configured + +## 11. Optional Features & Middleware Patterns + +### Configuration-Based Optional Features + +1. **Custom Transformations** (per-function): + ```python + RouteConfig( + param_overrides={ + 'param_name': TransformSpec( + http_location=HttpLocation.HEADER, + ingress=custom_decoder, + egress=custom_encoder + ) + } + ) + ``` + +2. **HTTP Location Overrides**: + - Path parameters: `HttpLocation.PATH` + - Query parameters: `HttpLocation.QUERY` + - Headers: `HttpLocation.HEADER` + - Cookies: `HttpLocation.COOKIE` + - Form data: `HttpLocation.FORM_DATA` + +3. **Response Customization**: + - `response_model`: Pydantic model for OpenAPI + - `include_in_schema`: Toggle OpenAPI documentation + - `deprecated`: Mark routes as deprecated + +4. **Metadata**: + - `summary`, `description`: OpenAPI docs + - `tags`: Grouping in OpenAPI UI + +### Middleware-Like Patterns + +**Rules System as Configuration**: +- Custom `RuleChain` applied globally or per-function +- Rules can modify parameter handling without code changes +- Type registry acts as a global configuration + +**Layered Configuration**: +- Global defaults can be set in `AppConfig` +- Per-function overrides in `RouteConfig` +- Per-parameter control via `TransformSpec` + +## 12. 
Store/Object Dispatch + +**Location**: `qh/stores_qh.py` + +### Specialized Pattern for Objects + +Exposes store-like (dict) objects or arbitrary objects as REST APIs: + +```python +# Store methods exposed as HTTP endpoints +__iter__ β†’ GET / (list keys) +__getitem__ β†’ GET /{key} (get value) +__setitem__ β†’ PUT /{key} (set value) +__delitem__ β†’ DELETE /{key} +__contains__ β†’ GET /{key}/exists +__len__ β†’ GET /$count +``` + +### User-Provided Patterns + +```python +# Object method dispatch +class DataService: + def get_data(self, key: str) β†’ GET /data/{key} + def put_data(self, key: str, data: bytes) β†’ PUT /data/{key} + +# Generic method exposure with custom configs +mk_store_dispatcher( + store_getter=lambda store_id: stores[store_id], + path_prefix='/stores' +) +``` + +## 13. Testing Infrastructure + +**Location**: `qh/testing.py` + +### Testing Utilities + +1. **AppRunner**: Context manager for running apps + - `use_server=False`: FastAPI TestClient (fast, no network) + - `use_server=True`: Real uvicorn server (integration testing) + +2. **Convenience Functions**: + - `run_app()`: Generic context manager + - `test_app()`: Simplified for TestClient + - `serve_app()`: Simplified for real server + - `quick_test()`: Single-function testing + +### Example Usage +```python +from qh import mk_app +from qh.testing import test_app + +def add(x: int, y: int) -> int: + return x + y + +app = mk_app([add]) + +with test_app(app) as client: + response = client.post('/add', json={'x': 3, 'y': 5}) + assert response.json() == 8 +``` + +## 14. OpenAPI & Client Generation (Phase 3) + +**Locations**: `qh/openapi.py`, `qh/client.py`, `qh/jsclient.py` + +### OpenAPI Spec Export +- `export_openapi()`: Generate OpenAPI spec from app +- `enhance_openapi_schema()`: Add custom metadata +- Extended with `x-python-*` fields for round-tripping + +### Python Client Generation +- `mk_client_from_openapi()`: Generate Python client from spec +- `mk_client_from_url()`: Generate from remote API +- `mk_client_from_app()`: Generate from FastAPI app +- `HttpClient`: Base client class with request methods + +### JavaScript/TypeScript Generation +- `export_js_client()`: Generate JavaScript client code +- `export_ts_client()`: Generate TypeScript client code + +## 15. Current Async Limitations and Opportunities + +### Current State +βœ… Async functions work +βœ… Async parameter extraction works +βœ… Async endpoint handlers work +βœ… TestClient supports async functions + +### Limitations/Gaps for Tracking IDs +❌ No automatic request tracking/correlation +❌ No built-in request context management +❌ No background task integration +❌ No async context variables used +❌ No request ID propagation across async boundaries +❌ No tracking ID middleware + +## 16. 
Key Design Patterns & Principles + +### Convention Over Configuration +- Smart defaults: most functions work with no config +- Escape hatches: override any behavior when needed +- REST conventions inferred from function names + +### Layered Configuration +- Global defaults apply to all routes +- App-level config overrides global +- Function-level config overrides app +- Parameter-level control via rules/overrides + +### Type-Driven +- Type hints used for validation and conversion +- Custom types registered via registry +- Ingress/egress transformations based on types + +### Rule-Based Parameter Handling +- Flexible matching (type, name, function, patterns) +- First-match semantics with priority +- Composable rules for complex scenarios + +### FastAPI-Native +- No abstraction layer over FastAPI +- Users get full FastAPI capabilities +- Direct access to Request, Depends, etc. + +### Open for Extension +- Custom rules can be added +- Types can be registered +- Stores/objects can be dispatched +- OpenAPI can be enhanced + +## Summary + +The qh codebase is a well-architected, convention-over-configuration framework for exposing Python functions as HTTP services. It builds directly on FastAPI with: + +1. **Clean API**: Single `mk_app()` entry point with multiple input formats +2. **Flexible Configuration**: Four-tier hierarchy (global β†’ app β†’ function β†’ parameter) +3. **Smart Defaults**: REST conventions inferred from function names +4. **Type Safety**: Type hints drive validation and transformation +5. **Async Ready**: Full support for async functions and FastAPI patterns +6. **Extensible**: Type registry, rule system, and custom configurations +7. **Testing Friendly**: Built-in test utilities and app inspection +8. **Production Ready**: OpenAPI generation, client code generation, error handling + +The codebase is mature (Phase 3 complete) with comprehensive test coverage and good documentation. The architecture supports adding async tracking ID capabilities through the configuration and rule systems without major refactoring. 
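+
+To make the last point concrete, here is a hedged sketch of what such an opt-in could look like for users, assuming the `qh/tracking.py` module proposed in `misc/ASYNC_TRACKING_ANALYSIS.md` (exposing `TRACKING_ID_RULE` and `get_request_id`) were implemented; that module is a proposal, not part of the current codebase:
+
+```python
+# Hypothetical usage: qh.tracking is the module proposed in
+# misc/ASYNC_TRACKING_ANALYSIS.md and does not exist yet.
+from qh import mk_app, AppConfig, RuleChain
+from qh.tracking import TRACKING_ID_RULE, get_request_id
+
+def process(data: dict, request_id: str) -> dict:
+    # request_id is filled from the X-Request-ID header (or auto-generated);
+    # get_request_id() exposes the same value to nested helpers via contextvars.
+    return {"request_id": get_request_id(), "processed": data}
+
+app = mk_app(
+    [process],
+    config=AppConfig(rule_chain=RuleChain([TRACKING_ID_RULE])),
+)
+```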
diff --git a/misc/QUICK_REFERENCE.md b/misc/QUICK_REFERENCE.md new file mode 100644 index 0000000..cf62334 --- /dev/null +++ b/misc/QUICK_REFERENCE.md @@ -0,0 +1,316 @@ +# QH Codebase Quick Reference + +## Project at a Glance + +**qh** = "Quick HTTP" = FastAPI-based function-to-REST-API tool + +**Current Status**: Phase 3 complete (v0.4.0) - Full OpenAPI & client generation + +## Entry Points (in priority order) + +| File | Purpose | Key Classes/Functions | +|------|---------|----------------------| +| `qh/app.py` | **Primary API** | `mk_app()` - creates FastAPI app | +| `qh/config.py` | Configuration system | `AppConfig`, `RouteConfig` | +| `qh/endpoint.py` | Endpoint creation | `make_endpoint()`, async handlers | +| `qh/rules.py` | Parameter transformation | `TransformSpec`, `RuleChain`, `*Rule` | +| `qh/conventions.py` | REST auto-routing | `apply_conventions_to_funcs()` | +| `qh/types.py` | Type serialization | `register_type()`, `TypeRegistry` | + +## Core Data Structures + +### AppConfig (app-level) +```python +AppConfig( + default_methods=['POST'], + path_template='/{func_name}', + path_prefix='/api', + rule_chain=DEFAULT_RULE_CHAIN, + title='My API', + version='0.1.0' +) +``` + +### RouteConfig (function-level) +```python +RouteConfig( + path='/custom/path', + methods=['GET', 'POST'], + rule_chain=custom_rules, + param_overrides={'param': TransformSpec(...)}, + summary='Brief description', + tags=['tag1', 'tag2'] +) +``` + +### TransformSpec (parameter-level) +```python +TransformSpec( + http_location=HttpLocation.QUERY, + ingress=custom_converter, # HTTP β†’ Python + egress=custom_serializer, # Python β†’ HTTP + http_name='different_name' +) +``` + +## How mk_app() Works (Pipeline) + +``` +1. Input Normalization + β”œβ”€ Single callable β†’ Dict[Callable, RouteConfig] + β”œβ”€ List β†’ Dict[Callable, RouteConfig] + └─ Dict β†’ Keep as-is + +2. Convention Application (if use_conventions=True) + β”œβ”€ Parse function name (verb_resource) + β”œβ”€ Infer HTTP method (getβ†’GET, createβ†’POST, etc.) + └─ Generate RESTful path (/users/{user_id}) + +3. Configuration Resolution (per function) + β”œβ”€ Start with DEFAULT_ROUTE_CONFIG + β”œβ”€ Apply AppConfig defaults + β”œβ”€ Apply function-specific RouteConfig + └─ Auto-fill missing fields + +4. Endpoint Creation + β”œβ”€ make_endpoint() wraps function + β”œβ”€ Creates async HTTP handler + β”œβ”€ Parameter extraction & transformation + └─ Stores original function reference + +5. Route Registration + └─ app.add_api_route() to FastAPI app +``` + +## Key Patterns + +### Simple Function +```python +def add(x: int, y: int) -> int: + return x + y + +app = mk_app([add]) +# β†’ POST /add with JSON body +``` + +### With Conventions +```python +def get_user(user_id: str) -> dict: + return {'user_id': user_id} + +def list_users(limit: int = 10) -> list: + return [...] 
+ +app = mk_app([get_user, list_users], use_conventions=True) +# β†’ GET /users/{user_id} +# β†’ GET /users?limit=10 +``` + +### Custom Configuration +```python +app = mk_app( + { + func1: RouteConfig(path='/custom', methods=['GET']), + func2: {'path': '/other', 'methods': ['POST', 'PUT']}, + }, + config=AppConfig(path_prefix='/api/v1') +) +``` + +### Parameter Transformation +```python +from qh.rules import NameRule, TransformSpec, HttpLocation + +my_rule = NameRule({ + 'api_key': TransformSpec( + http_location=HttpLocation.HEADER, + http_name='Authorization' + ) +}) + +app = mk_app( + [func], + config=AppConfig(rule_chain=RuleChain([my_rule])) +) +``` + +### Type Registration +```python +import numpy as np +from qh import register_type + +register_type( + np.ndarray, + to_json=lambda arr: arr.tolist(), + from_json=lambda lst: np.array(lst) +) + +def process(data: np.ndarray) -> np.ndarray: + return data * 2 + +app = mk_app([process]) +# JSON arrays ↔ NumPy arrays automatically +``` + +## Async Support + +### Current Capabilities +βœ… Async functions work automatically +βœ… Async parameter extraction +βœ… Proper await handling +βœ… TestClient compatible + +### Example +```python +async def fetch_data(url: str) -> dict: + # async function works naturally + response = await some_http_client.get(url) + return response.json() + +app = mk_app([fetch_data]) +# Works seamlessly, handler awaits automatically +``` + +## Testing + +```python +from qh.testing import test_app + +def add(x: int, y: int) -> int: + return x + y + +app = mk_app([add]) + +with test_app(app) as client: + response = client.post('/add', json={'x': 3, 'y': 5}) + assert response.json() == 8 +``` + +## Configuration Hierarchy (Precedence) + +``` +Parameter-level override (highest) + ↓ +Function-level config (RouteConfig) + ↓ +App-level config (AppConfig) + ↓ +Global defaults (lowest) +``` + +## Route Inspection + +```python +from qh import inspect_routes, print_routes + +routes = inspect_routes(app) +print_routes(app) + +# Output: +# METHODS PATH ENDPOINT +# ------- ---- -------- +# POST /add add +# GET /users/{id} get_user +``` + +## HTTP Locations (Where Parameters Come From) + +| Location | Source | Example | +|----------|--------|---------| +| `PATH` | URL path | `/users/{user_id}` | +| `QUERY` | Query string | `?limit=10&offset=20` | +| `JSON_BODY` | POST/PUT body | `{"x": 3, "y": 5}` | +| `HEADER` | HTTP header | `X-API-Key: secret` | +| `COOKIE` | HTTP cookie | `session_id=abc123` | +| `FORM_DATA` | Multipart form | Uploaded files | +| `BINARY_BODY` | Raw body | Binary data | + +## File Structure Reference + +``` +qh/ +β”œβ”€β”€ __init__.py β†’ Main exports +β”œβ”€β”€ app.py β†’ mk_app() and route inspection +β”œβ”€β”€ config.py β†’ AppConfig, RouteConfig, ConfigBuilder +β”œβ”€β”€ endpoint.py β†’ make_endpoint(), async handlers +β”œβ”€β”€ rules.py β†’ Rule system, TransformSpec +β”œβ”€β”€ conventions.py β†’ REST conventions (get_user β†’ GET /users/{id}) +β”œβ”€β”€ types.py β†’ Type registry, custom serialization +β”œβ”€β”€ base.py β†’ Lower-level mk_fastapi_app() +β”œβ”€β”€ core.py β†’ Core with i2.Wrap composition +β”œβ”€β”€ openapi.py β†’ OpenAPI spec generation +β”œβ”€β”€ client.py β†’ Python client generation from specs +β”œβ”€β”€ jsclient.py β†’ JavaScript/TypeScript code gen +β”œβ”€β”€ stores_qh.py β†’ Store/object dispatch +β”œβ”€β”€ testing.py β†’ AppRunner, test utilities +└── tests/ β†’ Comprehensive test suite +``` + +## Important Functions + +| Function | Location | Purpose | 
+|----------|----------|---------| +| `mk_app()` | `app.py` | **Main entry point** | +| `make_endpoint()` | `endpoint.py` | Create async HTTP handler | +| `resolve_route_config()` | `config.py` | Merge configs hierarchically | +| `extract_http_params()` | `endpoint.py` | Extract params from request | +| `apply_conventions_to_funcs()` | `conventions.py` | Apply REST patterns | +| `register_type()` | `types.py` | Register custom type handler | +| `resolve_transform()` | `rules.py` | Resolve parameter transformation | +| `inspect_routes()` | `app.py` | Get list of routes | + +## Common Issues & Solutions + +| Issue | Solution | +|-------|----------| +| Function params not extracted | Check `TransformSpec.http_location` | +| Type conversion failing | Register type with `register_type()` | +| Path parameter not recognized | Use `{param_name}` in path config | +| GET request not working | Params must come from query string or path | +| Async function not awaited | Already handled automatically | +| Missing X-Request-ID header | Use `TransformSpec` with fallback | + +## Design Principles + +1. **Convention over Configuration** - Smart defaults, explicit overrides +2. **Layered Configuration** - Global β†’ app β†’ function β†’ parameter +3. **Type-Driven** - Type hints drive behavior +4. **Rule-Based** - Flexible parameter matching and transformation +5. **FastAPI-Native** - Direct FastAPI integration, no abstraction +6. **Async-Ready** - Full async/await support +7. **Extensible** - Type registry, custom rules, middleware + +## Version & Imports + +```python +# Main API (recommended) +from qh import mk_app, AppConfig, RouteConfig + +# Rules and configuration +from qh import RuleChain, TransformSpec, HttpLocation +from qh.rules import NameRule, TypeRule, FuncRule + +# Type registry +from qh import register_type, register_json_type + +# Testing +from qh.testing import test_app, serve_app, quick_test + +# Conventions +from qh import mk_app # use_conventions=True parameter + +# Advanced +from qh.config import ConfigBuilder +from qh.endpoint import make_endpoint +from qh.conventions import apply_conventions_to_funcs +``` + +## Next Steps for Async Tracking IDs + +See `async_tracking_analysis.md` for: +- Recommended patterns for request ID handling +- Contextvars-based context management +- Integration points with FastAPI +- Code examples for 5 different approaches + +**Recommended approach**: Pattern 1 + 3 (TransformSpec + contextvars) diff --git a/pyproject.toml b/pyproject.toml index 7b8c900..133cc8a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "qh" -version = "0.0.6" +version = "0.0.8" description = "Quick Http web-service construction" readme = "README.md" requires-python = ">=3.10" diff --git a/qh/__init__.py b/qh/__init__.py index 2ad5918..913bb2a 100644 --- a/qh/__init__.py +++ b/qh/__init__.py @@ -27,6 +27,19 @@ from qh.client import mk_client_from_openapi, mk_client_from_url, mk_client_from_app, HttpClient from qh.jsclient import export_js_client, export_ts_client +# Async task processing +from qh.async_tasks import ( + TaskConfig, + TaskStatus, + TaskInfo, + TaskStore, + InMemoryTaskStore, + TaskExecutor, + ThreadPoolTaskExecutor, + ProcessPoolTaskExecutor, + TaskManager, +) + # Testing utilities from qh.testing import AppRunner, run_app, test_app, serve_app, quick_test @@ -44,7 +57,7 @@ # py2http not available, skip legacy imports pass -__version__ = '0.4.0' # Phase 3: OpenAPI & Client Generation +__version__ = '0.5.0' # 
Phase 4: Async Task Processing __all__ = [ # Primary API 'mk_app', @@ -75,6 +88,16 @@ 'HttpClient', 'export_js_client', 'export_ts_client', + # Async Tasks (Phase 4) + 'TaskConfig', + 'TaskStatus', + 'TaskInfo', + 'TaskStore', + 'InMemoryTaskStore', + 'TaskExecutor', + 'ThreadPoolTaskExecutor', + 'ProcessPoolTaskExecutor', + 'TaskManager', # Testing utilities 'AppRunner', 'run_app', diff --git a/qh/app.py b/qh/app.py index 8dfdb0a..995a5f9 100644 --- a/qh/app.py +++ b/qh/app.py @@ -28,6 +28,8 @@ def mk_app( app: Optional[FastAPI] = None, config: Optional[Union[Dict[str, Any], AppConfig]] = None, use_conventions: bool = False, + async_funcs: Optional[List[Union[str, Callable]]] = None, + async_config: Optional[Union[Dict[str, Any], 'TaskConfig']] = None, **kwargs, ) -> FastAPI: """ @@ -56,6 +58,15 @@ def mk_app( - list_users() β†’ GET /users - create_user(user) β†’ POST /users + async_funcs: List of functions (by name or reference) that should support + async task execution. When enabled, clients can add ?async=true to + get a task ID instead of blocking for the result. + + async_config: Configuration for async task processing. Can be: + - None (uses default TaskConfig for functions in async_funcs) + - TaskConfig object (applies to all async_funcs) + - Dict mapping function names to TaskConfig objects + **kwargs: Additional FastAPI() constructor kwargs (if creating new app) Returns: @@ -82,10 +93,59 @@ def mk_app( >>> app = mk_app({ ... add: {'methods': ['GET', 'POST'], 'path': '/calculate/add'}, ... }) + + With async support: + >>> def expensive_task(n: int) -> int: + ... import time + ... time.sleep(5) + ... return n * 2 + >>> app = mk_app([expensive_task], async_funcs=['expensive_task']) + # Now: POST /expensive_task?async=true returns {"task_id": "..."} + # GET /tasks/{task_id}/result returns the result when ready """ # Normalize input formats func_configs = normalize_funcs_input(funcs) + # Process async configuration + if async_funcs: + from qh.async_tasks import TaskConfig + + # Normalize async_config + if async_config is None: + # Use default config for all async functions + default_task_config = TaskConfig() + async_config_map = {} + elif isinstance(async_config, dict): + # Dict mapping function names to configs + async_config_map = async_config + default_task_config = TaskConfig() + else: + # Single TaskConfig for all functions + default_task_config = async_config + async_config_map = {} + + # Apply async config to specified functions + async_func_names = set() + for func_ref in async_funcs: + if callable(func_ref): + async_func_names.add(func_ref.__name__) + else: + async_func_names.add(str(func_ref)) + + for func, route_config in func_configs.items(): + if func.__name__ in async_func_names: + # Get function-specific config or use default + task_config = async_config_map.get(func.__name__, default_task_config) + + # Update route config with async config + if isinstance(route_config, RouteConfig): + route_config.async_config = task_config + else: + # It's a dict, convert to RouteConfig + route_dict = route_config or {} + route_dict['async_config'] = task_config + func_configs[func] = route_dict + # Apply conventions if requested if use_conventions: # Get list of functions @@ -168,6 +228,30 @@ def mk_app( # Add route to app app.add_api_route(**route_kwargs) + # Add task management endpoints if any async functions were configured + if async_funcs: + from qh.async_endpoints import add_global_task_endpoints + + # Add global task endpoints (list all tasks) + 
add_global_task_endpoints(app) + + # Add per-function task endpoints + for func in func_configs.keys(): + if func.__name__ in async_func_names: + from qh.async_endpoints import add_task_endpoints + + # Get the task config to check if we should create endpoints + route_config = func_configs[func] + if isinstance(route_config, RouteConfig): + task_config = route_config.async_config + elif isinstance(route_config, dict): + task_config = route_config.get('async_config') + else: + task_config = None + + if task_config and getattr(task_config, 'create_task_endpoints', True): + add_task_endpoints(app, func.__name__) + return app diff --git a/qh/async_endpoints.py b/qh/async_endpoints.py new file mode 100644 index 0000000..f7c09ae --- /dev/null +++ b/qh/async_endpoints.py @@ -0,0 +1,175 @@ +""" +Helper functions to create task management endpoints. + +These endpoints provide standard HTTP interfaces for task status and results. +""" + +from typing import Optional +from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse + +from qh.async_tasks import get_task_manager, TaskStatus + + +def add_task_endpoints( + app: FastAPI, + func_name: str, + path_prefix: str = "/tasks", +) -> None: + """ + Add task management endpoints for a specific function. + + Creates the following endpoints: + - GET {path_prefix}/{task_id}/status - Get task status + - GET {path_prefix}/{task_id}/result - Get task result (waits if needed) + - GET {path_prefix}/{task_id} - Get complete task info + - DELETE {path_prefix}/{task_id} - Cancel/delete a task + + Args: + app: FastAPI application + func_name: Name of the function these tasks belong to + path_prefix: URL path prefix for task endpoints + """ + + @app.get( + f"{path_prefix}/{{task_id}}/status", + summary="Get task status", + tags=["tasks"], + ) + async def get_task_status(task_id: str): + """Get the status of a task.""" + task_manager = get_task_manager(func_name) + task_info = task_manager.get_status(task_id) + + if not task_info: + raise HTTPException(status_code=404, detail="Task not found") + + return { + "task_id": task_info.task_id, + "status": task_info.status.value, + "created_at": task_info.created_at, + "started_at": task_info.started_at, + "completed_at": task_info.completed_at, + } + + @app.get( + f"{path_prefix}/{{task_id}}/result", + summary="Get task result", + tags=["tasks"], + ) + async def get_task_result( + task_id: str, + wait: bool = False, + timeout: Optional[float] = None, + ): + """ + Get the result of a completed task. 
+ + Args: + task_id: Task identifier + wait: Whether to block until task completes + timeout: Maximum time to wait in seconds + """ + task_manager = get_task_manager(func_name) + + try: + result = task_manager.get_result(task_id, wait=wait, timeout=timeout) + return {"task_id": task_id, "status": "completed", "result": result} + except ValueError as e: + # Task not found, failed, or not completed + task_info = task_manager.get_status(task_id) + if not task_info: + raise HTTPException(status_code=404, detail="Task not found") + + if task_info.status == TaskStatus.FAILED: + return JSONResponse( + status_code=500, + content={ + "task_id": task_id, + "status": "failed", + "error": task_info.error, + "traceback": task_info.traceback, + }, + ) + else: + # Still pending/running + return JSONResponse( + status_code=202, + content={ + "task_id": task_id, + "status": task_info.status.value, + "message": "Task not yet completed", + }, + ) + except TimeoutError: + raise HTTPException( + status_code=408, detail=f"Task did not complete within {timeout}s" + ) + + @app.get( + f"{path_prefix}/{{task_id}}", + summary="Get complete task info", + tags=["tasks"], + ) + async def get_task_info(task_id: str): + """Get complete information about a task.""" + task_manager = get_task_manager(func_name) + task_info = task_manager.get_status(task_id) + + if not task_info: + raise HTTPException(status_code=404, detail="Task not found") + + return task_info.to_dict() + + @app.delete( + f"{path_prefix}/{{task_id}}", + summary="Cancel or delete a task", + tags=["tasks"], + ) + async def delete_task(task_id: str): + """Cancel (if running) or delete a task.""" + task_manager = get_task_manager(func_name) + deleted = task_manager.cancel_task(task_id) + + if not deleted: + raise HTTPException(status_code=404, detail="Task not found") + + return {"task_id": task_id, "status": "deleted"} + + +def add_global_task_endpoints( + app: FastAPI, + path_prefix: str = "/tasks", +) -> None: + """ + Add global task management endpoints (cross all functions). + + Creates: + - GET {path_prefix}/ - List all recent tasks + + Args: + app: FastAPI application + path_prefix: URL path prefix for task endpoints + """ + + @app.get( + f"{path_prefix}/", + summary="List all tasks", + tags=["tasks"], + ) + async def list_all_tasks(limit: int = 100): + """List recent tasks across all functions.""" + from qh.async_tasks import _task_managers + + all_tasks = [] + for func_name, manager in _task_managers.items(): + tasks = manager.list_tasks(limit=limit) + for task in tasks: + task_dict = task.to_dict() + task_dict["function"] = func_name + all_tasks.append(task_dict) + + # Sort by creation time, most recent first + all_tasks.sort(key=lambda t: t["created_at"], reverse=True) + + return {"tasks": all_tasks[:limit]} diff --git a/qh/async_tasks.py b/qh/async_tasks.py new file mode 100644 index 0000000..e683230 --- /dev/null +++ b/qh/async_tasks.py @@ -0,0 +1,510 @@ +""" +Async task processing for qh. + +Provides a minimal, boilerplate-free way to handle long-running operations +by returning task IDs immediately and allowing clients to poll for results. + +Terminology (standard async task processing): +- Task: An asynchronous computation +- Task ID: Unique identifier for tracking a task +- Task Status: State of the task (pending, running, completed, failed) +- Task Result: The output of the completed task + +Design Philosophy: +- Convention over configuration with escape hatches +- Pluggable backends (in-memory, file-based, au, Celery, etc.) 
+- HTTP-first patterns (query params, standard endpoints) +""" + +import uuid +import time +import threading +import traceback +from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor +from dataclasses import dataclass, field, asdict +from typing import Any, Callable, Dict, Optional, Protocol, Union +from datetime import datetime +from enum import Enum + + +class TaskStatus(str, Enum): + """Standard task status values.""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +@dataclass +class TaskInfo: + """Information about a task's state.""" + task_id: str + status: TaskStatus + created_at: float + started_at: Optional[float] = None + completed_at: Optional[float] = None + result: Any = None + error: Optional[str] = None + traceback: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + data = asdict(self) + # Convert status enum to string + data['status'] = self.status.value + # Add computed fields + if self.started_at: + data['duration'] = ( + (self.completed_at or time.time()) - self.started_at + ) + return data + + +class TaskStore(ABC): + """Abstract interface for task storage backends.""" + + @abstractmethod + def create_task(self, task_id: str, func_name: str) -> TaskInfo: + """Create a new task record.""" + pass + + @abstractmethod + def get_task(self, task_id: str) -> Optional[TaskInfo]: + """Retrieve task information.""" + pass + + @abstractmethod + def update_task(self, task_info: TaskInfo) -> None: + """Update task information.""" + pass + + @abstractmethod + def delete_task(self, task_id: str) -> bool: + """Delete a task. Returns True if deleted, False if not found.""" + pass + + @abstractmethod + def list_tasks(self, limit: int = 100) -> list[TaskInfo]: + """List recent tasks.""" + pass + + +class InMemoryTaskStore(TaskStore): + """Simple in-memory task storage (not persistent, single-process only).""" + + def __init__(self, ttl: Optional[int] = None): + """ + Initialize in-memory store. 
+ + Args: + ttl: Time-to-live in seconds for completed tasks (None = keep forever) + """ + self._tasks: Dict[str, TaskInfo] = {} + self._lock = threading.RLock() + self.ttl = ttl + + def _cleanup_expired(self): + """Remove expired tasks.""" + if not self.ttl: + return + + now = time.time() + expired = [ + task_id + for task_id, info in self._tasks.items() + if info.completed_at and (now - info.completed_at) > self.ttl + ] + for task_id in expired: + del self._tasks[task_id] + + def create_task(self, task_id: str, func_name: str) -> TaskInfo: + with self._lock: + self._cleanup_expired() + task_info = TaskInfo( + task_id=task_id, + status=TaskStatus.PENDING, + created_at=time.time(), + ) + self._tasks[task_id] = task_info + return task_info + + def get_task(self, task_id: str) -> Optional[TaskInfo]: + with self._lock: + self._cleanup_expired() + return self._tasks.get(task_id) + + def update_task(self, task_info: TaskInfo) -> None: + with self._lock: + self._tasks[task_info.task_id] = task_info + + def delete_task(self, task_id: str) -> bool: + with self._lock: + if task_id in self._tasks: + del self._tasks[task_id] + return True + return False + + def list_tasks(self, limit: int = 100) -> list[TaskInfo]: + with self._lock: + self._cleanup_expired() + # Return most recent tasks first + sorted_tasks = sorted( + self._tasks.values(), + key=lambda t: t.created_at, + reverse=True, + ) + return sorted_tasks[:limit] + + +class TaskExecutor(ABC): + """Abstract interface for task execution backends.""" + + @abstractmethod + def submit_task( + self, + task_id: str, + func: Callable, + args: tuple, + kwargs: dict, + callback: Callable[[str, Any, Optional[Exception]], None], + ) -> None: + """ + Submit a task for execution. + + Args: + task_id: Unique task identifier + func: Function to execute + args: Positional arguments + kwargs: Keyword arguments + callback: Called when task completes with (task_id, result, error) + """ + pass + + @abstractmethod + def shutdown(self, wait: bool = True) -> None: + """Shutdown the executor.""" + pass + + +class ThreadPoolTaskExecutor(TaskExecutor): + """Execute tasks using a thread pool (good for I/O-bound tasks).""" + + def __init__(self, max_workers: Optional[int] = None): + """ + Initialize thread pool executor. + + Args: + max_workers: Maximum number of worker threads (None = CPU count * 5) + """ + self._pool = ThreadPoolExecutor(max_workers=max_workers) + + def submit_task( + self, + task_id: str, + func: Callable, + args: tuple, + kwargs: dict, + callback: Callable[[str, Any, Optional[Exception]], None], + ) -> None: + def wrapper(): + try: + result = func(*args, **kwargs) + callback(task_id, result, None) + except Exception as e: + callback(task_id, None, e) + + self._pool.submit(wrapper) + + def shutdown(self, wait: bool = True) -> None: + self._pool.shutdown(wait=wait) + + +class ProcessPoolTaskExecutor(TaskExecutor): + """Execute tasks using a process pool (good for CPU-bound tasks).""" + + def __init__(self, max_workers: Optional[int] = None): + """ + Initialize process pool executor. 
+ + Args: + max_workers: Maximum number of worker processes (None = CPU count) + """ + self._pool = ProcessPoolExecutor(max_workers=max_workers) + + def submit_task( + self, + task_id: str, + func: Callable, + args: tuple, + kwargs: dict, + callback: Callable[[str, Any, Optional[Exception]], None], + ) -> None: + def wrapper(): + return func(*args, **kwargs) + + future = self._pool.submit(wrapper) + + def done_callback(fut): + try: + result = fut.result() + callback(task_id, result, None) + except Exception as e: + callback(task_id, None, e) + + future.add_done_callback(done_callback) + + def shutdown(self, wait: bool = True) -> None: + self._pool.shutdown(wait=wait) + + +@dataclass +class TaskConfig: + """ + Configuration for async task processing. + + This is the explicit configuration. The convention is to use sane defaults. + """ + + # Storage backend for task state + store: Optional[TaskStore] = None + + # Execution backend (thread pool, process pool, etc.) + executor: Optional[TaskExecutor] = None + + # Time-to-live for completed tasks (seconds) + ttl: int = 3600 + + # How to determine if a request should be async + # Options: 'query' (check ?async=true), 'header' (check X-Async: true), 'always' + async_mode: str = 'query' + + # Query parameter name for async mode + async_param: str = 'async' + + # Header name for async mode + async_header: str = 'X-Async' + + # Whether to create task management endpoints (GET /tasks/{id}, etc.) + create_task_endpoints: bool = True + + # Default executor type if not specified: 'thread' or 'process' + default_executor: str = 'thread' + + def get_store(self) -> TaskStore: + """Get or create the task store.""" + if self.store is None: + self.store = InMemoryTaskStore(ttl=self.ttl) + return self.store + + def get_executor(self) -> TaskExecutor: + """Get or create the task executor.""" + if self.executor is None: + if self.default_executor == 'process': + self.executor = ProcessPoolTaskExecutor() + else: + self.executor = ThreadPoolTaskExecutor() + return self.executor + + +class TaskManager: + """ + Manages async task execution and state. + + This is the main coordinator between stores, executors, and HTTP handlers. + """ + + def __init__(self, config: Optional[TaskConfig] = None): + """ + Initialize task manager. + + Args: + config: Task configuration (uses defaults if None) + """ + self.config = config or TaskConfig() + self.store = self.config.get_store() + self.executor = self.config.get_executor() + + def create_task( + self, + func: Callable, + args: tuple = (), + kwargs: Optional[dict] = None, + ) -> str: + """ + Create and submit a new task. 
+ + Args: + func: Function to execute asynchronously + args: Positional arguments + kwargs: Keyword arguments + + Returns: + Task ID + """ + kwargs = kwargs or {} + task_id = str(uuid.uuid4()) + + # Create task record + task_info = self.store.create_task(task_id, func.__name__) + + # Update status to running and set started_at + task_info = self.store.get_task(task_id) # Get fresh copy + task_info.status = TaskStatus.RUNNING + task_info.started_at = time.time() + self.store.update_task(task_info) + + # Submit for execution (after status is set to running) + self.executor.submit_task( + task_id, + func, + args, + kwargs, + self._task_callback, + ) + + return task_id + + def _task_callback( + self, task_id: str, result: Any, error: Optional[Exception] + ) -> None: + """Called when a task completes.""" + task_info = self.store.get_task(task_id) + if not task_info: + return + + task_info.completed_at = time.time() + + if error: + task_info.status = TaskStatus.FAILED + task_info.error = str(error) + task_info.traceback = "".join( + traceback.format_exception(type(error), error, error.__traceback__) + ) + else: + task_info.status = TaskStatus.COMPLETED + task_info.result = result + + self.store.update_task(task_info) + + def get_status(self, task_id: str) -> Optional[TaskInfo]: + """Get task status and metadata.""" + return self.store.get_task(task_id) + + def get_result( + self, task_id: str, wait: bool = False, timeout: Optional[float] = None + ) -> Any: + """ + Get task result. + + Args: + task_id: Task identifier + wait: Whether to block until task completes + timeout: Maximum time to wait in seconds (None = wait forever) + + Returns: + Task result if completed + + Raises: + ValueError: If task not found or failed + TimeoutError: If wait times out + """ + if wait: + start_time = time.time() + while True: + task_info = self.store.get_task(task_id) + if not task_info: + raise ValueError(f"Task not found: {task_id}") + + if task_info.status == TaskStatus.COMPLETED: + return task_info.result + elif task_info.status == TaskStatus.FAILED: + raise ValueError(f"Task failed: {task_info.error}") + + if timeout and (time.time() - start_time) > timeout: + raise TimeoutError(f"Task did not complete within {timeout}s") + + time.sleep(0.1) # Poll every 100ms + else: + task_info = self.store.get_task(task_id) + if not task_info: + raise ValueError(f"Task not found: {task_id}") + + if task_info.status == TaskStatus.COMPLETED: + return task_info.result + elif task_info.status == TaskStatus.FAILED: + raise ValueError(f"Task failed: {task_info.error}") + else: + raise ValueError(f"Task not yet completed: {task_info.status.value}") + + def cancel_task(self, task_id: str) -> bool: + """ + Cancel a task (if possible). + + Note: Cancellation is best-effort and may not work for all executors. + + Returns: + True if task was cancelled or deleted + """ + # For now, just delete the task record + # TODO: Implement proper cancellation for executors that support it + return self.store.delete_task(task_id) + + def list_tasks(self, limit: int = 100) -> list[TaskInfo]: + """List recent tasks.""" + return self.store.list_tasks(limit=limit) + + def shutdown(self) -> None: + """Shutdown the task manager and its executor.""" + self.executor.shutdown(wait=True) + + +# Global registry of task managers (one per function) +_task_managers: Dict[str, TaskManager] = {} + + +def get_task_manager(func_name: str, config: Optional[TaskConfig] = None) -> TaskManager: + """ + Get or create a task manager for a function. 
+ + Args: + func_name: Name of the function + config: Task configuration (only used when creating new manager) + + Returns: + TaskManager instance + """ + if func_name not in _task_managers: + _task_managers[func_name] = TaskManager(config) + return _task_managers[func_name] + + +def should_run_async( + request: Any, # FastAPI Request + config: TaskConfig, +) -> bool: + """ + Determine if a request should be executed asynchronously. + + Args: + request: FastAPI Request object + config: Task configuration + + Returns: + True if request should be async + """ + if config.async_mode == 'always': + return True + + if config.async_mode == 'query': + # Check query parameter + value = request.query_params.get(config.async_param, '').lower() + return value in ('true', '1', 'yes') + + if config.async_mode == 'header': + # Check header + value = request.headers.get(config.async_header, '').lower() + return value in ('true', '1', 'yes') + + return False diff --git a/qh/config.py b/qh/config.py index 58c20b6..a63108f 100644 --- a/qh/config.py +++ b/qh/config.py @@ -8,10 +8,13 @@ 4. Parameter-level config """ -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union, TYPE_CHECKING from dataclasses import dataclass, field, replace from qh.rules import RuleChain, DEFAULT_RULE_CHAIN, HttpLocation +if TYPE_CHECKING: + from qh.async_tasks import TaskConfig + @dataclass class RouteConfig: @@ -29,6 +32,9 @@ class RouteConfig: # Parameter-specific overrides {param_name: transform_spec} param_overrides: Dict[str, Any] = field(default_factory=dict) + # Async task configuration (None = not async, TaskConfig = async enabled) + async_config: Optional['TaskConfig'] = None + # Additional metadata summary: Optional[str] = None description: Optional[str] = None @@ -46,6 +52,7 @@ def merge_with(self, other: 'RouteConfig') -> 'RouteConfig': methods=other.methods if other.methods is not None else self.methods, rule_chain=other.rule_chain if other.rule_chain is not None else self.rule_chain, param_overrides={**self.param_overrides, **other.param_overrides}, + async_config=other.async_config if other.async_config is not None else self.async_config, summary=other.summary if other.summary is not None else self.summary, description=other.description if other.description is not None else self.description, tags=other.tags if other.tags is not None else self.tags, diff --git a/qh/endpoint.py b/qh/endpoint.py index 46700ca..b3197a8 100644 --- a/qh/endpoint.py +++ b/qh/endpoint.py @@ -231,7 +231,39 @@ async def endpoint(request: Request) -> Response: if param_name not in transformed_params: transformed_params[param_name] = param.default - # Call the wrapped function + # Check if this should be executed as an async task + if route_config.async_config is not None: + from qh.async_tasks import get_task_manager, should_run_async + + if should_run_async(request, route_config.async_config): + # Execute asynchronously and return task ID + task_manager = get_task_manager( + func.__name__, route_config.async_config + ) + + # Create wrapper that handles sync/async functions + def task_wrapper(**kwargs): + if is_async: + # For async functions, we need to run them in an event loop + import asyncio + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(func(**kwargs)) + finally: + loop.close() + else: + return func(**kwargs) + + task_id = task_manager.create_task( + task_wrapper, kwargs=transformed_params + ) + + return JSONResponse( + content={"task_id": 
task_id, "status": "submitted"}, + status_code=202, # Accepted + ) + + # Call the wrapped function synchronously if is_async: result = await func(**transformed_params) else: diff --git a/qh/examples/async_example.py b/qh/examples/async_example.py new file mode 100644 index 0000000..e91a93a --- /dev/null +++ b/qh/examples/async_example.py @@ -0,0 +1,188 @@ +""" +Example of async task processing with qh. + +This demonstrates the boilerplate-minimal way to handle long-running operations. +""" + +import time +from qh import mk_app, TaskConfig, ThreadPoolTaskExecutor, ProcessPoolTaskExecutor + + +# Define some functions (mix of sync and async) +def quick_add(x: int, y: int) -> int: + """A fast operation that doesn't need async.""" + return x + y + + +def slow_multiply(x: int, y: int) -> int: + """A slow operation that benefits from async.""" + time.sleep(5) # Simulate expensive computation + return x * y + + +def cpu_intensive_task(n: int) -> int: + """A CPU-bound task that should use process pool.""" + # Compute fibonacci (inefficiently) + def fib(x): + if x <= 1: + return x + return fib(x - 1) + fib(x - 2) + + return fib(n) + + +async def async_fetch_data(url: str) -> dict: + """An async function that can also be used as a task.""" + import asyncio + await asyncio.sleep(2) # Simulate network request + return {"url": url, "status": "fetched"} + + +# Example 1: Minimal setup - just specify which functions should be async +print("Example 1: Minimal async setup") +print("=" * 60) + +app1 = mk_app( + [quick_add, slow_multiply], + async_funcs=['slow_multiply'], # Only slow_multiply supports async +) + +print("Created app with async support for slow_multiply") +print("Usage:") +print(" POST /slow_multiply?x=5&y=10 -> Returns result immediately (blocks 5s)") +print(" POST /slow_multiply?x=5&y=10&async=true -> Returns task_id immediately") +print(" GET /tasks/{task_id}/status -> Check task status") +print(" GET /tasks/{task_id}/result -> Get result when ready") +print() + + +# Example 2: Custom configuration +print("Example 2: Custom async configuration") +print("=" * 60) + +app2 = mk_app( + [slow_multiply, cpu_intensive_task], + async_funcs=['slow_multiply', 'cpu_intensive_task'], + async_config={ + 'slow_multiply': TaskConfig( + executor=ThreadPoolTaskExecutor(max_workers=4), + ttl=1800, # Keep results for 30 minutes + ), + 'cpu_intensive_task': TaskConfig( + executor=ProcessPoolTaskExecutor(max_workers=2), + ttl=3600, # Keep results for 1 hour + ), + }, +) + +print("Created app with custom executors:") +print(" - slow_multiply uses thread pool (I/O-bound)") +print(" - cpu_intensive_task uses process pool (CPU-bound)") +print() + + +# Example 3: Always async mode +print("Example 3: Always async (no query param needed)") +print("=" * 60) + +always_async_config = TaskConfig( + async_mode='always', # Every request is async +) + +app3 = mk_app( + [slow_multiply], + async_funcs=['slow_multiply'], + async_config=always_async_config, +) + +print("Created app where slow_multiply is ALWAYS async") +print(" POST /slow_multiply?x=5&y=10 -> Always returns task_id") +print() + + +# Example 4: Header-based async mode +print("Example 4: Header-based async control") +print("=" * 60) + +header_config = TaskConfig( + async_mode='header', # Check X-Async header + async_header='X-Async-Task', # Custom header name +) + +app4 = mk_app( + [slow_multiply], + async_funcs=['slow_multiply'], + async_config=header_config, +) + +print("Created app with header-based async control") +print(" POST /slow_multiply with X-Async-Task: 
true -> Returns task_id") +print() + + +# Example 5: Complete application with multiple functions +print("Example 5: Complete application") +print("=" * 60) + +app5 = mk_app( + [quick_add, slow_multiply, cpu_intensive_task, async_fetch_data], + async_funcs=['slow_multiply', 'cpu_intensive_task', 'async_fetch_data'], +) + +print("Created complete app with:") +print(" - quick_add: synchronous only") +print(" - slow_multiply: supports async") +print(" - cpu_intensive_task: supports async") +print(" - async_fetch_data: supports async (native async function)") +print() +print("Available endpoints:") +print(" POST /quick_add") +print(" POST /slow_multiply") +print(" POST /cpu_intensive_task") +print(" POST /async_fetch_data") +print(" GET /tasks/") +print(" GET /tasks/{task_id}") +print(" GET /tasks/{task_id}/status") +print(" GET /tasks/{task_id}/result") +print(" DELETE /tasks/{task_id}") +print() + + +# Example 6: Using the app programmatically +if __name__ == "__main__": + print("Example 6: Running the app") + print("=" * 60) + + from qh.testing import test_app + + # Create test client + with test_app(app5) as client: + print("\n1. Synchronous call to quick_add:") + response = client.post("/quick_add", json={"x": 3, "y": 4}) + print(f" Response: {response.json()}") + + print("\n2. Synchronous call to slow_multiply (blocks):") + response = client.post("/slow_multiply", json={"x": 5, "y": 6}) + print(f" Response: {response.json()}") + + print("\n3. Async call to slow_multiply:") + response = client.post("/slow_multiply?async=true", json={"x": 7, "y": 8}) + task_data = response.json() + print(f" Task submitted: {task_data}") + + task_id = task_data["task_id"] + + print("\n4. Check task status:") + response = client.get(f"/tasks/{task_id}/status") + print(f" Status: {response.json()}") + + print("\n5. Wait for result (blocking):") + response = client.get(f"/tasks/{task_id}/result?wait=true&timeout=10") + print(f" Result: {response.json()}") + + print("\n6. List all tasks:") + response = client.get("/tasks/") + tasks = response.json()["tasks"] + print(f" Found {len(tasks)} tasks") + for task in tasks[:3]: # Show first 3 + print(f" - {task['task_id']}: {task['status']}") diff --git a/qh/tests/test_async_tasks.py b/qh/tests/test_async_tasks.py new file mode 100644 index 0000000..d45cdbc --- /dev/null +++ b/qh/tests/test_async_tasks.py @@ -0,0 +1,354 @@ +""" +Tests for async task processing functionality. 
+""" + +import time +import pytest +from qh import mk_app, TaskConfig, TaskStatus +from qh.testing import test_app + + +def slow_function(x: int) -> int: + """A function that takes time to complete.""" + time.sleep(0.5) + return x * 2 + + +def failing_function(x: int) -> int: + """A function that raises an error.""" + raise ValueError("Intentional error") + + +async def async_function(x: int) -> int: + """An async function.""" + import asyncio + await asyncio.sleep(0.1) + return x + 10 + + +class TestBasicAsync: + """Test basic async functionality.""" + + def test_sync_execution(self): + """Test that functions work normally without async flag.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + response = client.post("/slow_function", json={"x": 5}) + assert response.status_code == 200 + assert response.json() == 10 + + def test_async_execution(self): + """Test async execution returns task ID.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + # Request async execution + response = client.post("/slow_function?async=true", json={"x": 5}) + assert response.status_code == 202 # Accepted + data = response.json() + assert "task_id" in data + assert data["status"] == "submitted" + + def test_task_status(self): + """Test checking task status.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + # Submit task + response = client.post("/slow_function?async=true", json={"x": 5}) + task_id = response.json()["task_id"] + + # Check status + response = client.get(f"/tasks/{task_id}/status") + assert response.status_code == 200 + status_data = response.json() + assert status_data["task_id"] == task_id + assert status_data["status"] in ["pending", "running", "completed"] + + def test_task_result_wait(self): + """Test waiting for task result.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + # Submit task + response = client.post("/slow_function?async=true", json={"x": 5}) + task_id = response.json()["task_id"] + + # Wait for result + response = client.get( + f"/tasks/{task_id}/result?wait=true&timeout=5" + ) + assert response.status_code == 200 + result_data = response.json() + assert result_data["status"] == "completed" + assert result_data["result"] == 10 + + def test_task_not_found(self): + """Test error when task doesn't exist.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + response = client.get("/tasks/nonexistent/status") + assert response.status_code == 404 + + def test_failed_task(self): + """Test handling of failed tasks.""" + app = mk_app([failing_function], async_funcs=['failing_function']) + + with test_app(app) as client: + # Submit task that will fail + response = client.post( + "/failing_function?async=true", json={"x": 5} + ) + task_id = response.json()["task_id"] + + # Wait for the task to process + time.sleep(0.5) + + # Get full task info to check error was captured + response = client.get(f"/tasks/{task_id}") + assert response.status_code == 200 + info = response.json() + + # The important thing is that the error was captured + assert "error" in info + assert "Intentional error" in info["error"] + assert info["traceback"] is not None + + +class TestAsyncConfig: + """Test async configuration options.""" + + def test_always_async(self): + """Test always async mode.""" + config = TaskConfig(async_mode='always') + app = mk_app( + 
[slow_function], + async_funcs=['slow_function'], + async_config=config, + ) + + with test_app(app) as client: + # Even without async=true, should return task ID + response = client.post("/slow_function", json={"x": 5}) + assert response.status_code == 202 + assert "task_id" in response.json() + + def test_header_mode(self): + """Test header-based async mode.""" + config = TaskConfig(async_mode='header', async_header='X-Async') + app = mk_app( + [slow_function], + async_funcs=['slow_function'], + async_config=config, + ) + + with test_app(app) as client: + # With header + response = client.post( + "/slow_function", + json={"x": 5}, + headers={"X-Async": "true"}, + ) + assert response.status_code == 202 + assert "task_id" in response.json() + + # Without header (should be sync) + response = client.post("/slow_function", json={"x": 5}) + assert response.status_code == 200 + assert response.json() == 10 + + def test_ttl_config(self): + """Test TTL configuration.""" + config = TaskConfig(ttl=1) # 1 second TTL + app = mk_app( + [slow_function], + async_funcs=['slow_function'], + async_config=config, + ) + + with test_app(app) as client: + # Submit and complete task + response = client.post("/slow_function?async=true", json={"x": 5}) + task_id = response.json()["task_id"] + + # Wait for completion + time.sleep(1) + + # Should still be there + response = client.get(f"/tasks/{task_id}/status") + assert response.status_code == 200 + + # Wait for TTL to expire + time.sleep(2) + + # Trigger cleanup by creating new task + response = client.post("/slow_function?async=true", json={"x": 6}) + + # Original task might be gone (TTL cleanup is opportunistic) + # This is okay - just testing that TTL doesn't crash + + +class TestAsyncFunction: + """Test with native async functions.""" + + def test_async_function_sync_mode(self): + """Test async function in sync mode.""" + app = mk_app([async_function], async_funcs=['async_function']) + + with test_app(app) as client: + response = client.post("/async_function", json={"x": 5}) + assert response.status_code == 200 + assert response.json() == 15 + + def test_async_function_async_mode(self): + """Test async function in async mode.""" + app = mk_app([async_function], async_funcs=['async_function']) + + with test_app(app) as client: + # Submit as task + response = client.post("/async_function?async=true", json={"x": 5}) + task_id = response.json()["task_id"] + + # Wait for result + response = client.get( + f"/tasks/{task_id}/result?wait=true&timeout=5" + ) + assert response.status_code == 200 + assert response.json()["result"] == 15 + + +class TestTaskManagement: + """Test task management endpoints.""" + + def test_list_tasks(self): + """Test listing all tasks.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + # Create a few tasks + task_ids = [] + for i in range(3): + response = client.post( + "/slow_function?async=true", json={"x": i} + ) + task_ids.append(response.json()["task_id"]) + + # List tasks + response = client.get("/tasks/") + assert response.status_code == 200 + data = response.json() + assert "tasks" in data + assert len(data["tasks"]) >= 3 + + # Check that our tasks are in the list + listed_ids = {t["task_id"] for t in data["tasks"]} + for task_id in task_ids: + assert task_id in listed_ids + + def test_delete_task(self): + """Test deleting a task.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + # Create task + response = 
client.post("/slow_function?async=true", json={"x": 5}) + task_id = response.json()["task_id"] + + # Delete it + response = client.delete(f"/tasks/{task_id}") + assert response.status_code == 200 + + # Should be gone + response = client.get(f"/tasks/{task_id}/status") + assert response.status_code == 404 + + def test_get_complete_task_info(self): + """Test getting complete task information.""" + app = mk_app([slow_function], async_funcs=['slow_function']) + + with test_app(app) as client: + # Create and complete task + response = client.post("/slow_function?async=true", json={"x": 5}) + task_id = response.json()["task_id"] + + # Wait for completion + time.sleep(1) + + # Get complete info + response = client.get(f"/tasks/{task_id}") + assert response.status_code == 200 + info = response.json() + assert info["task_id"] == task_id + assert info["status"] == "completed" + assert info["result"] == 10 + assert "created_at" in info + assert "duration" in info + + +class TestMultipleFunctions: + """Test with multiple functions.""" + + def test_multiple_async_funcs(self): + """Test multiple functions with async support.""" + def func_a(x: int) -> int: + time.sleep(0.2) + return x * 2 + + def func_b(x: int) -> int: + time.sleep(0.2) + return x * 3 + + app = mk_app( + [func_a, func_b], + async_funcs=['func_a', 'func_b'], + ) + + with test_app(app) as client: + # Submit tasks to both functions + response_a = client.post("/func_a?async=true", json={"x": 5}) + task_a = response_a.json()["task_id"] + + response_b = client.post("/func_b?async=true", json={"x": 5}) + task_b = response_b.json()["task_id"] + + # Wait for both + time.sleep(1) + + # Check results + result_a = client.get(f"/tasks/{task_a}/result").json() + result_b = client.get(f"/tasks/{task_b}/result").json() + + assert result_a["result"] == 10 + assert result_b["result"] == 15 + + def test_mixed_sync_async(self): + """Test mixing sync and async functions.""" + def sync_func(x: int) -> int: + return x + 1 + + def async_func(x: int) -> int: + time.sleep(0.2) + return x * 2 + + app = mk_app( + [sync_func, async_func], + async_funcs=['async_func'], # Only async_func supports async + ) + + with test_app(app) as client: + # sync_func doesn't support async + response = client.post("/sync_func?async=true", json={"x": 5}) + # Should execute synchronously even with async=true + assert response.status_code == 200 + assert response.json() == 6 + + # async_func supports async + response = client.post("/async_func?async=true", json={"x": 5}) + assert response.status_code == 202 + assert "task_id" in response.json() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])