diff --git a/RATE_LIMIT_SECURITY_AUDIT.md b/RATE_LIMIT_SECURITY_AUDIT.md deleted file mode 100644 index c256604..0000000 --- a/RATE_LIMIT_SECURITY_AUDIT.md +++ /dev/null @@ -1,1007 +0,0 @@ -# Rate Limiting Security Audit Report -**Date:** 2025-12-04 -**Auditor:** Security Review -**Scope:** liminallm rate limiting implementation - ---- - -## Executive Summary - -This audit identified **12 security vulnerabilities** across 3 severity levels: -- **CRITICAL:** 2 issues -- **HIGH:** 3 issues -- **MEDIUM:** 7 issues - -The most critical issues are: -1. Missing rate limits on token refresh endpoint (credential stuffing risk) -2. Missing rate limits on 7 other authenticated endpoints -3. INCR/EXPIRE race condition causing permanent rate limit lockout -4. Email-based DoS attacks on signup/login - ---- - -## CRITICAL Severity Issues - -### 1. Missing Rate Limit on Token Refresh Endpoint - -**Severity:** CRITICAL -**File:** `/home/user/liminallm/liminallm/api/routes.py` -**Line:** 695 - -**Code:** -```python -@router.post("/auth/refresh", response_model=Envelope, tags=["auth"]) -async def refresh_tokens( - body: TokenRefreshRequest, - response: Response, - authorization: Optional[str] = Header(None), - x_tenant_id: Optional[str] = Header( - None, convert_underscores=False, alias="X-Tenant-ID" - ), -): - runtime = get_runtime() - tenant_hint = body.tenant_id or x_tenant_id - user, session, tokens = await runtime.auth.refresh_tokens( - body.refresh_token, tenant_hint=tenant_hint - ) - # NO RATE LIMIT CHECK HERE! -``` - -**Attack Scenario:** -1. Attacker obtains or guesses refresh tokens (leaked, brute force, etc.) -2. Can attempt unlimited token refresh attempts without rate limiting -3. Enables credential stuffing attacks on refresh tokens -4. Can test stolen refresh tokens at high speed -5. 
Can brute-force short/weak refresh tokens - -**Impact:** -- Unlimited authentication bypass attempts -- Credential stuffing on refresh tokens -- Token enumeration attacks -- High server load from spam requests - -**Recommended Fix:** -```python -@router.post("/auth/refresh", response_model=Envelope, tags=["auth"]) -async def refresh_tokens( - body: TokenRefreshRequest, - response: Response, - request: Request, # Add request parameter - authorization: Optional[str] = Header(None), - x_tenant_id: Optional[str] = Header( - None, convert_underscores=False, alias="X-Tenant-ID" - ), -): - runtime = get_runtime() - - # Add rate limiting by IP for unauthenticated endpoint - client_ip = request.client.host if request.client else "unknown" - await _enforce_rate_limit( - runtime, - f"refresh:{client_ip}", - limit=20, # 20 refresh attempts per minute per IP - window_seconds=60, - ) - - tenant_hint = body.tenant_id or x_tenant_id - user, session, tokens = await runtime.auth.refresh_tokens( - body.refresh_token, tenant_hint=tenant_hint - ) -``` - ---- - -### 2. Missing Rate Limits on 7 Additional Endpoints - -**Severity:** CRITICAL -**File:** `/home/user/liminallm/liminallm/api/routes.py` -**Lines:** Various - -**Affected Endpoints:** -1. `POST /auth/logout` - Line ~704 -2. `GET /artifacts` - Line ~1200s -3. `POST /tools/{tool_id}/invoke` - Line ~1300s -4. `POST /artifacts` - Line ~1250s -5. `PATCH /artifacts/{artifact_id}` - Line ~1270s -6. `POST /contexts` - Line ~1400s -7. 
`POST /contexts/{context_id}/sources` - Line ~1450s - -**Attack Scenario (example: /artifacts):** -```bash -# Attacker can spam artifact creation without limits -for i in {1..10000}; do - curl -X POST https://api.example.com/artifacts \ - -H "Authorization: Bearer $TOKEN" \ - -d '{"name":"spam'$i'","content":"x"}' & -done -``` - -**Impact:** -- Resource exhaustion via unlimited artifact/context creation -- Database bloat from spam data -- Storage exhaustion -- Service degradation for legitimate users -- Potential DoS via tool invocation spam - -**Recommended Fix:** -Add rate limiting to each endpoint. Example for artifacts: - -```python -@router.post("/artifacts", response_model=Envelope, status_code=201, tags=["artifacts"]) -async def create_artifact( - body: CreateArtifactRequest, - principal: AuthContext = Depends(get_user) -): - runtime = get_runtime() - - # Add rate limit - await _enforce_rate_limit( - runtime, - f"artifacts:create:{principal.user_id}", - runtime.settings.read_rate_limit_per_minute, # or create a specific limit - 60, - ) - - # ... rest of function -``` - ---- - -## HIGH Severity Issues - -### 3. INCR/EXPIRE Race Condition - Permanent Lockout - -**Severity:** HIGH -**File:** `/home/user/liminallm/liminallm/storage/redis_cache.py` -**Lines:** 62-67 - -**Code:** -```python -# Use pipeline for atomic INCR + EXPIRE to prevent race conditions -pipe = self.client.pipeline() -pipe.incr(redis_key) -pipe.expire(redis_key, ttl) -results = await pipe.execute() -current = results[0] -``` - -**Issue:** -The comment claims "atomic operations" but `pipeline()` without `transaction=True` is **NOT atomic**. It only batches commands to reduce network round-trips. Commands can fail mid-execution. - -**Attack Scenario:** -1. Client sends request, INCR executes (counter=1, no TTL set) -2. Network interruption or process crash before EXPIRE -3. Key exists in Redis with count but no TTL -4. Key never expires (permanent) -5. 
All future requests for this rate limit bucket are blocked -6. Example: `rate:chat:user123:28847` stuck forever at count=1 - -**Impact:** -- Permanent rate limit lockout for affected keys -- Requires manual Redis intervention to fix -- Can happen during network issues, deployments, crashes -- Affects user experience severely (legitimate users locked out) - -**Proof of Concept:** -```python -# Simulate the race condition -import redis -import time - -r = redis.Redis() -key = "rate:test:123" - -# Simulate INCR succeeding but EXPIRE failing -r.incr(key) # Counter is now 1 -# <--- Process crashes here, EXPIRE never runs - -# Later: -print(r.ttl(key)) # Returns -1 (no expiration) -print(r.get(key)) # Returns 1 - -# This key will never expire! -``` - -**Recommended Fix:** -Use a Lua script for atomic INCR+EXPIRE: - -```python -async def check_rate_limit( - self, key: str, limit: int, window_seconds: int, *, return_remaining: bool = False -) -> Union[bool, Tuple[bool, int]]: - now = datetime.utcnow() - now_bucket = int(now.timestamp() // window_seconds) - redis_key = f"rate:{key}:{now_bucket}" - bucket_end = (now_bucket + 1) * window_seconds - ttl = max(1, int(bucket_end - now.timestamp())) - - # Use Lua script for atomic INCR+EXPIRE - lua_script = """ - local current = redis.call('INCR', KEYS[1]) - redis.call('EXPIRE', KEYS[1], ARGV[1]) - return current - """ - - current = await self.client.eval(lua_script, 1, redis_key, ttl) - - allowed = current <= limit - if return_remaining: - remaining = max(0, limit - current) - return (allowed, remaining) - return allowed -``` - ---- - -### 4. 
Rate Limit Exhaustion DoS on Signup/Login - -**Severity:** HIGH -**File:** `/home/user/liminallm/liminallm/api/routes.py` -**Lines:** 539-544, 585-589 - -**Code:** -```python -# Signup -await _enforce_rate_limit( - runtime, - f"signup:{body.email.lower()}", # Only email-based - runtime.settings.signup_rate_limit_per_minute, - 60, -) - -# Login -await _enforce_rate_limit( - runtime, - f"login:{body.email.lower()}", # Only email-based - runtime.settings.login_rate_limit_per_minute, - 60, -) -``` - -**Attack Scenario:** -1. Attacker knows victim's email: `victim@company.com` -2. Attacker sends 10 signup requests with victim's email (limit is 5) -3. Victim's email is now rate-limited for 60 seconds -4. Victim attempts to sign up → blocked by rate limit -5. Attacker repeats attack every 60 seconds -6. Victim cannot create account or reset password - -**Impact:** -- Account creation denial of service -- Login denial of service -- Password reset denial of service -- Targeted harassment of specific users -- No IP-based protection - -**Recommended Fix:** -Implement dual rate limiting (email + IP): - -```python -@router.post("/auth/signup", response_model=Envelope, status_code=201, tags=["auth"]) -async def signup(body: SignupRequest, request: Request, response: Response): - settings = get_settings() - if not settings.allow_signup: - raise _http_error("forbidden", "signup disabled", status_code=403) - - runtime = get_runtime() - client_ip = request.client.host if request.client else "unknown" - - # Rate limit by both email AND IP - await _enforce_rate_limit( - runtime, - f"signup:email:{body.email.lower()}", - runtime.settings.signup_rate_limit_per_minute, - 60, - ) - - await _enforce_rate_limit( - runtime, - f"signup:ip:{client_ip}", - limit=10, # Allow more attempts per IP (covers multiple users) - window_seconds=60, - ) - - # ... rest of function -``` - ---- - -### 5. 
No IP-Based Rate Limiting (Distributed Bypass) - -**Severity:** HIGH -**File:** `/home/user/liminallm/liminallm/api/routes.py` -**Lines:** All rate limit implementations - -**Issue:** -No client IP extraction or IP-based rate limiting anywhere in the codebase. - -**Search Results:** -```bash -$ grep -r "X-Forwarded-For\|X-Real-IP\|request.client" liminallm/api/ -# No results -``` - -**Attack Scenario:** -1. Attacker controls botnet with 1000 IPs -2. Each IP creates account and gets legitimate user_id -3. Each bot can make 60 chat requests/minute (user-based limit) -4. Total: 60,000 requests/minute from single attacker -5. Bypasses per-user rate limiting completely - -**Impact:** -- Large-scale API abuse -- Resource exhaustion -- Cost inflation (LLM API costs) -- Service degradation -- No protection against distributed attacks - -**Recommended Fix:** -1. Add client IP extraction utility: - -```python -def get_client_ip(request: Request) -> str: - """Extract client IP, respecting reverse proxy headers.""" - # Check X-Forwarded-For header (if behind proxy) - forwarded = request.headers.get("X-Forwarded-For") - if forwarded: - # Take first IP (original client) - return forwarded.split(",")[0].strip() - - # Check X-Real-IP header - real_ip = request.headers.get("X-Real-IP") - if real_ip: - return real_ip.strip() - - # Fallback to direct connection - if request.client: - return request.client.host - - return "unknown" -``` - -2. 
Add IP-based rate limits to expensive endpoints: - -```python -@router.post("/chat", response_model=Envelope, tags=["chat"]) -async def chat( - body: ChatRequest, - request: Request, - response: Response, - principal: AuthContext = Depends(get_user), -): - runtime = get_runtime() - user_id = principal.user_id - client_ip = get_client_ip(request) - - # Rate limit by user - await _enforce_rate_limit( - runtime, - f"chat:user:{user_id}", - runtime.settings.chat_rate_limit_per_minute, - runtime.settings.chat_rate_limit_window_seconds, - ) - - # Also rate limit by IP (higher limit) - await _enforce_rate_limit( - runtime, - f"chat:ip:{client_ip}", - limit=300, # 5x user limit - window_seconds=60, - ) -``` - -**Security Note:** -Be aware that `X-Forwarded-For` can be spoofed if not behind a trusted reverse proxy. Ensure your proxy (nginx, CloudFlare, etc.) is configured to set this header correctly. - ---- - -## MEDIUM Severity Issues - -### 6. Rate Limit Bypass via Email Variations - -**Severity:** MEDIUM -**File:** `/home/user/liminallm/liminallm/api/routes.py` -**Lines:** 541, 587 - -**Code:** -```python -f"signup:{body.email.lower()}" -f"login:{body.email.lower()}" -``` - -**Issue:** -Only applies `.lower()` for normalization. Does not handle: -- Gmail dot addressing: `user.name@gmail.com` = `username@gmail.com` -- Plus addressing: `user+tag1@example.com` vs `user+tag2@example.com` -- Unicode homoglyphs: `test@example.com` vs `tеst@example.com` (Cyrillic е) -- Domain aliases - -**Attack Scenario:** -```python -# Attacker bypasses 5 signup/minute limit: -emails = [ - "user@example.com", - "user+1@example.com", - "user+2@example.com", - "user+3@example.com", - "user+4@example.com", - "user+5@example.com", - # ... up to user+100@example.com -] - -for email in emails: - signup(email) # Each has separate rate limit! 
-``` - -**Impact:** -- Bypass signup rate limits (5/min → 500/min with plus addressing) -- Bypass login rate limits -- Create many accounts quickly -- Spam prevention ineffective - -**Recommended Fix:** -```python -def normalize_email(email: str) -> str: - """Normalize email for rate limiting.""" - email = email.lower().strip() - - # Split into local and domain - if "@" not in email: - return email - - local, domain = email.rsplit("@", 1) - - # Remove plus addressing: user+tag@domain → user@domain - if "+" in local: - local = local.split("+")[0] - - # Gmail: remove dots from local part - if domain in ("gmail.com", "googlemail.com"): - local = local.replace(".", "") - - return f"{local}@{domain}" - -# Usage: -await _enforce_rate_limit( - runtime, - f"signup:{normalize_email(body.email)}", - runtime.settings.signup_rate_limit_per_minute, - 60, -) -``` - ---- - -### 7. Fixed Window Burst Attack - -**Severity:** MEDIUM -**File:** `/home/user/liminallm/liminallm/storage/redis_cache.py` -**Lines:** 56-58 - -**Code:** -```python -now = datetime.utcnow() -now_bucket = int(now.timestamp() // window_seconds) -redis_key = f"rate:{key}:{now_bucket}" -``` - -**Issue:** -Fixed window implementation allows burst attacks at window boundaries. 
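The boundary effect follows directly from the bucket arithmetic. A minimal standalone sketch (`bucket_for` is our own helper name, mirroring the inline `int(now.timestamp() // window_seconds)` calculation):

```python
def bucket_for(ts: float, window_seconds: int) -> int:
    """Fixed-window bucket id, mirroring int(now.timestamp() // window_seconds)."""
    return int(ts // window_seconds)

window = 60
# Ten requests at t=59.0s fall in bucket 0, ten more at t=60.0s in bucket 1:
# each bucket keeps its own counter, so 20 requests clear a 10/min limit
# within about one second of wall-clock time.
assert bucket_for(59.0, window) == 0
assert bucket_for(60.0, window) == 1
```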
- -**Attack Scenario:** -``` -Limit: 10 requests per 60 seconds -Window 0: 00:00:00 - 00:00:59 -Window 1: 00:01:00 - 00:01:59 - -Timeline: -00:00:59.0 - Send 10 requests → bucket 0, count=10 ✓ -00:01:00.0 - Send 10 requests → bucket 1, count=10 ✓ - -Result: 20 requests in 1 second (200% of limit) -``` - -**Impact:** -- Can send 2x limit in short burst -- Defeats purpose of rate limiting -- Can cause resource spikes -- Inherent to fixed window algorithm - -**Recommended Fix:** -Implement sliding window with sorted sets: - -```python -async def check_rate_limit_sliding( - self, key: str, limit: int, window_seconds: int, *, return_remaining: bool = False -) -> Union[bool, Tuple[bool, int]]: - """Sliding window rate limit using sorted sets.""" - now = time.time() - redis_key = f"rate:{key}" - - # Lua script for atomic sliding window check - lua_script = """ - local key = KEYS[1] - local now = tonumber(ARGV[1]) - local window = tonumber(ARGV[2]) - local limit = tonumber(ARGV[3]) - - -- Remove old entries outside window - redis.call('ZREMRANGEBYSCORE', key, 0, now - window) - - -- Count current entries - local current = redis.call('ZCARD', key) - - if current < limit then - -- Add new entry - redis.call('ZADD', key, now, now) - redis.call('EXPIRE', key, window) - return {1, limit - current - 1} - else - return {0, 0} - end - """ - - result = await self.client.eval( - lua_script, 1, redis_key, now, window_seconds, limit - ) - - allowed = bool(result[0]) - remaining = result[1] - - if return_remaining: - return (allowed, remaining) - return allowed -``` - ---- - -### 8. 
Rate Limit Information Disclosure - -**Severity:** MEDIUM -**File:** `/home/user/liminallm/liminallm/api/routes.py` -**Lines:** 235-239 - -**Code:** -```python -def apply_headers(self, response: Response) -> None: - """Apply rate limit headers to response per IETF draft-polli-ratelimit-headers.""" - response.headers["X-RateLimit-Limit"] = str(self.limit) - response.headers["X-RateLimit-Remaining"] = str(max(0, self.remaining)) - response.headers["X-RateLimit-Reset"] = str(self.reset_seconds) -``` - -**Issue:** -Exposes rate limit information to attackers. - -**Attack Scenario:** -```bash -$ curl -i https://api.example.com/chat -HTTP/1.1 200 OK -X-RateLimit-Limit: 60 -X-RateLimit-Remaining: 59 -X-RateLimit-Reset: 60 - -# Attacker now knows: -# 1. Exact limit (60 requests) -# 2. When window resets (60 seconds) -# 3. How many requests remaining (59) - -# Can optimize attack: -# - Send exactly 60 requests per minute -# - Time requests to reset window -# - Know when to switch attack vectors -``` - -**Impact:** -- Helps attackers optimize rate limit bypass -- Reveals system capacity information -- Enables precise timing attacks -- Industry best practice debate (some argue transparency is good) - -**Recommended Fix:** -Make headers optional via configuration: - -```python -class Settings: - # Add to config - expose_rate_limit_headers: bool = env_field(False, "EXPOSE_RATE_LIMIT_HEADERS") - -# In apply_headers: -def apply_headers(self, response: Response, expose: bool = False) -> None: - if not expose: - return - - response.headers["X-RateLimit-Limit"] = str(self.limit) - response.headers["X-RateLimit-Remaining"] = str(max(0, self.remaining)) - response.headers["X-RateLimit-Reset"] = str(self.reset_seconds) -``` - ---- - -### 9. 
Negative/Zero Limit Bypass - -**Severity:** MEDIUM (requires misconfiguration) -**File:** `/home/user/liminallm/liminallm/service/runtime.py` -**Lines:** 263-264 - -**Code:** -```python -if limit <= 0: - return (True, limit) if return_remaining else True -``` - -**Issue:** -If limit is 0 or negative, ALL requests are allowed (not blocked). - -**Attack Scenario:** -```python -# Admin mistakenly sets limit to 0 thinking it means "block all" -CHAT_RATE_LIMIT_PER_MINUTE=0 - -# Or attacker exploits config injection to set negative value -# Result: Unlimited requests allowed -``` - -**Impact:** -- Bypasses all rate limiting if misconfigured -- Counter-intuitive behavior (0 should mean no access) -- Could be exploited via config injection - -**Recommended Fix:** -```python -if limit <= 0: - # Raise error instead of silently allowing - logger.error( - "rate_limit_invalid_config", - key=key, - limit=limit, - message="Rate limit must be positive integer" - ) - raise ValueError(f"Invalid rate limit: {limit}. Must be > 0") -``` - ---- - -### 10. In-Memory Rate Limit State Loss - -**Severity:** MEDIUM -**File:** `/home/user/liminallm/liminallm/service/runtime.py` -**Lines:** 160-161, 278-286 - -**Code:** -```python -# Runtime initialization -self._local_rate_limits: Dict[str, Tuple[datetime, int]] = {} -self._local_rate_limit_lock = asyncio.Lock() - -# Fallback when Redis unavailable -async with runtime._local_rate_limit_lock: - window_start, count = runtime._local_rate_limits.get(key, (now, 0)) - if now - window_start >= window: - window_start, count = now, 0 - new_count = count + 1 - allowed = new_count <= limit - if allowed: - runtime._local_rate_limits[key] = (window_start, new_count) -``` - -**Issue:** -In-memory rate limits are lost on: -- Application restart -- Server crash -- Deployment -- Multiple application instances (not shared) - -**Attack Scenario:** -```bash -# Attacker monitors for deployments -1. Send 60/60 chat requests (at limit) -2. 
Trigger app restart (or wait for deployment) -3. Rate limit counter resets to 0 -4. Send another 60 requests immediately -5. Repeat on each restart - -# With multiple app instances behind load balancer: -1. Each instance has separate in-memory counters -2. Round-robin to 10 instances -3. 60 requests/min per instance = 600 total requests/min -4. 10x bypass of intended limit -``` - -**Impact:** -- Rate limits reset on restart (attack window) -- Multi-instance deployments ineffective -- Can't enforce limits across fleet -- Only works in single-instance dev mode - -**Current Mitigation:** -Documentation warns about this (lines 69-78): -```python -logger.warning( - "redis_disabled_fallback", - message=( - f"Running without Redis under {fallback_mode}; rate limits, idempotency durability, and " - "workflow/router caches are in-memory only." - ), -) -``` - -**Recommended Fix:** -Require Redis in production: - -```python -if not self.cache: - if not self.settings.test_mode and not self.settings.allow_redis_fallback_dev: - raise RuntimeError( - "Redis is required for sessions, rate limits, idempotency, and workflow caches; " - "start Redis or set TEST_MODE=true/ALLOW_REDIS_FALLBACK_DEV=true for local fallback." - ) -``` - -**Status:** Already enforced (lines 56-63), but environment variables could disable it. - ---- - -### 11. Time Bucket Calculation Integer Overflow - -**Severity:** MEDIUM (theoretical) -**File:** `/home/user/liminallm/liminallm/storage/redis_cache.py` -**Lines:** 57, 59-60 - -**Code:** -```python -now_bucket = int(now.timestamp() // window_seconds) -bucket_end = (now_bucket + 1) * window_seconds -ttl = max(1, int(bucket_end - now.timestamp())) -``` - -**Issue:** -Integer overflow possible with large timestamps or small windows. 
- -**Attack Scenario:** -```python -# Year 2038 problem (32-bit systems) -now = datetime(2038, 1, 19, 3, 14, 8) # Unix timestamp overflow -now_bucket = int(now.timestamp() // 60) -# Potential negative or wrapped value - -# Or with very small window -window_seconds = 1 -now_bucket = int(1733270400 // 1) # Very large bucket number -bucket_end = (now_bucket + 1) * 1 # Potential overflow -``` - -**Impact:** -- Rate limit keys collide or fail -- TTL calculation incorrect -- Unlikely on 64-bit systems until year 292,277,026,596 - -**Recommended Fix:** -Add validation: - -```python -async def check_rate_limit( - self, key: str, limit: int, window_seconds: int, *, return_remaining: bool = False -) -> Union[bool, Tuple[bool, int]]: - # Validate inputs - if window_seconds <= 0 or window_seconds > 86400: # Max 1 day - raise ValueError(f"Invalid window_seconds: {window_seconds}") - - now = datetime.utcnow() - now_ts = now.timestamp() - - # Use floor division with bounds check - now_bucket = int(now_ts // window_seconds) - if now_bucket < 0: - raise ValueError(f"Invalid bucket calculation: {now_bucket}") - - # ... rest of function -``` - ---- - -### 12. 
No Rate Limit on Logout (Minor) - -**Severity:** MEDIUM -**File:** `/home/user/liminallm/liminallm/api/routes.py` -**Line:** ~704 - -**Code:** -```python -@router.post("/auth/logout", response_model=Envelope, tags=["auth"]) -async def logout( - authorization: Optional[str] = Header(None), - x_tenant_id: Optional[str] = Header( - None, convert_underscores=False, alias="X-Tenant-ID" - ), -): - runtime = get_runtime() - # NO RATE LIMIT - session_id = _parse_session_cookie(authorization) -``` - -**Attack Scenario:** -```bash -# Spam logout endpoint to cause Redis operations -for i in {1..100000}; do - curl -X POST https://api.example.com/auth/logout \ - -H "Cookie: session_id=fake_session_$i" & -done -``` - -**Impact:** -- Redis spam on session deletion -- Minor DoS vector -- Resource exhaustion - -**Recommended Fix:** -```python -@router.post("/auth/logout", response_model=Envelope, tags=["auth"]) -async def logout( - request: Request, - authorization: Optional[str] = Header(None), - x_tenant_id: Optional[str] = Header( - None, convert_underscores=False, alias="X-Tenant-ID" - ), -): - runtime = get_runtime() - - # Rate limit by IP - client_ip = request.client.host if request.client else "unknown" - await _enforce_rate_limit( - runtime, - f"logout:ip:{client_ip}", - limit=30, - window_seconds=60, - ) - - # ... rest of function -``` - ---- - -## Summary of Recommendations - -### Immediate Actions (Critical/High) - -1. **Add rate limiting to all missing endpoints:** - - `/auth/refresh` (CRITICAL - credential stuffing risk) - - `/auth/logout` - - `/artifacts/*` - - `/tools/{tool_id}/invoke` - - `/contexts/*` - -2. **Fix INCR/EXPIRE race condition:** - - Replace pipeline with Lua script for atomic operations - - Prevents permanent rate limit lockout - -3. **Implement IP-based rate limiting:** - - Add `get_client_ip()` helper - - Add IP rate limits to all unauthenticated endpoints - - Protect against distributed attacks - -4. 
**Add dual rate limiting to signup/login:** - - Prevent email-based DoS attacks - - Rate limit by both email and IP - -### Medium Priority - -5. **Improve email normalization:** - - Handle plus addressing - - Handle Gmail dot addressing - - Prevent rate limit bypass via email variations - -6. **Consider sliding window algorithm:** - - Prevents burst attacks at window boundaries - - More expensive but more accurate - -7. **Make rate limit headers optional:** - - Reduce information disclosure - - Configurable via environment variable - -8. **Add input validation:** - - Reject limit <= 0 instead of allowing - - Validate window_seconds bounds - - Add overflow checks - -### Configuration Review - -9. **Audit environment variable handling:** - - Ensure rate limit configs cannot be negative - - Add validation in Settings class - - Document security implications - -10. **Require Redis in production:** - - Already enforced, but document clearly - - Prevent TEST_MODE in production - - Add monitoring for Redis failures - ---- - -## Testing Recommendations - -### Unit Tests Needed - -```python -# Test rate limit bypass scenarios -async def test_refresh_endpoint_rate_limit(): - """Ensure refresh endpoint has rate limiting.""" - for i in range(25): # Above limit - response = await client.post("/auth/refresh", json={ - "refresh_token": "fake_token" - }) - if i < 20: - assert response.status_code in [200, 401] # Allowed - else: - assert response.status_code == 429 # Rate limited - -async def test_email_normalization(): - """Test email variations use same rate limit.""" - emails = ["user+1@x.com", "user+2@x.com", "user+3@x.com"] - for email in emails[:5]: # Limit is 5 - await signup(email) - - # 6th request with different variation should be rate limited - response = await signup("user+6@x.com") - assert response.status_code == 429 - -async def test_incr_expire_atomicity(): - """Verify INCR and EXPIRE are atomic.""" - # This requires integration test with Redis - # Simulate network 
failure between INCR and EXPIRE - pass -``` - -### Load Testing - -```bash -# Test distributed bypass -ab -n 10000 -c 100 https://api.example.com/chat - -# Test window boundary burst -# Send 60 at t=59s, then 60 at t=61s - -# Test email DoS -for i in {1..100}; do - curl -X POST /auth/signup -d '{"email":"victim@x.com"}' -done -``` - ---- - -## Appendix: Rate Limit Bypass Cheat Sheet - -| Attack Vector | Severity | Mitigation | -|--------------|----------|------------| -| Missing endpoint rate limits | CRITICAL | Add rate limits to all endpoints | -| Refresh token stuffing | CRITICAL | Rate limit `/auth/refresh` | -| INCR/EXPIRE race | HIGH | Use Lua script | -| Email-based DoS | HIGH | Dual rate limiting (email + IP) | -| Distributed bypass (no IP limit) | HIGH | Add IP-based rate limiting | -| Email variations (+, dots) | MEDIUM | Normalize emails | -| Window boundary burst | MEDIUM | Use sliding window | -| Information disclosure | MEDIUM | Make headers optional | -| Negative limit bypass | MEDIUM | Validate limit > 0 | -| Multi-instance bypass | MEDIUM | Require Redis in prod | - ---- - -## Conclusion - -The rate limiting implementation has significant gaps that expose the system to: -- **Credential stuffing attacks** (missing rate limits) -- **Denial of service** (email-based DoS, distributed attacks) -- **Resource exhaustion** (unlimited endpoints) -- **Operational issues** (permanent lockouts from race conditions) - -**Priority:** Implement fixes for CRITICAL and HIGH severity issues immediately before production deployment. 
- -**Estimated effort:** -- Critical fixes: 2-3 days -- High severity fixes: 3-5 days -- Medium priority: 1-2 weeks -- Total: ~2-3 weeks for comprehensive fix - ---- - -*End of Report* diff --git a/docs/ISSUES.md b/docs/ISSUES.md index c8f3755..dc16a8c 100644 --- a/docs/ISSUES.md +++ b/docs/ISSUES.md @@ -1,6 +1,6 @@ # Codebase Issues and Security Audit -**Last Updated:** 2025-12-08 +**Last Updated:** 2025-12-09 **Scope:** Comprehensive review against SPEC.md requirements (12th pass) --- @@ -733,6 +733,14 @@ Redis rate limiting now uses an atomic Lua token bucket with weighted costs and **Fix Applied:** `_safe_float` is now an instance method using `self.logger` for warnings, preserving defensive parsing without crashing. +### 10.7 ~~BUG: Artifact Cursor Timezone Mismatch Breaks Pagination~~ FIXED + +**Location:** `liminallm/storage/memory.py:1138-1146`, `liminallm/storage/cursors.py:9-25` + +**Issue:** `decode_artifact_cursor` returned timezone-aware timestamps while `MemoryStore` artifact `created_at` values were naive UTC datetimes. Comparing them raised `TypeError`, triggering exception handling that skipped cursor filters and caused keyset pagination to repeat the first page. + +**Fix Applied:** Cursor encoding now normalizes timestamps to UTC, and decoding returns UTC-naive datetimes to match in-memory artifacts. Keyset pagination comparisons remain consistent, preventing repeated first pages. + --- ## 11. Authentication Service Security @@ -1135,27 +1143,29 @@ async def revoke_all_user_sessions(...): **Resolution:** `Auth.revoke_all_user_sessions` invokes the store's bulk revocation and then unconditionally clears cached session state via `cache.revoke_user_sessions`, logging (but not aborting) on either failure to avoid skipped steps. 
Store-level revocation already evicts in-memory cache entries under a lock (`PostgresStore.revoke_user_sessions`), ensuring both the persistent table and local cache are purged even when external cache invalidation encounters transient errors. This keeps cache and database aligned for subsequent session lookups. -### 21.2 CRITICAL: Config Patch Apply Not Atomic +### 21.2 ~~CRITICAL: Config Patch Apply Not Atomic~~ FIXED -**Location:** `liminallm/service/config_ops.py:89-99` +**Locations:** `liminallm/service/config_ops.py`, `liminallm/storage/postgres.py`, `liminallm/storage/memory.py` -Applying a config patch involves multiple operations (validation, application, status update) without transaction wrapping. +**Resolution:** Config patch applications now use store-level atomic helpers (`apply_config_patch`) to persist artifact updates and mark patches applied in one transaction/lock, preventing partial applications when status updates fail. -### 21.3 HIGH: Artifact Create With Versions Not Atomic +### 21.3 ~~HIGH: Artifact Create With Versions Not Atomic~~ FIXED -**Location:** `liminallm/storage/postgres.py:1780-1830` +**Location:** `liminallm/storage/postgres.py:1999-2046` -Creating artifact and first version are separate operations. Failure after artifact create leaves orphan. +**Resolution:** Artifact creation already wraps artifact/version inserts in a single transaction, keeping artifacts and their first versions consistent. -### 21.4 HIGH: User Create With Settings Not Atomic +### 21.4 ~~HIGH: User Create With Settings Not Atomic~~ FIXED -**Location:** `liminallm/storage/postgres.py:188-220` +**Locations:** `liminallm/storage/postgres.py:1060-1112`, `liminallm/storage/memory.py:245-285` -User and initial settings created separately without transaction. +**Resolution:** User creation now seeds default `user_settings` records inside the same transaction/lock as user insertion, so settings cannot be orphaned if later steps fail. 
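**Sketch:** the single-transaction pattern behind this fix, shown with stdlib `sqlite3` standing in for Postgres; the table and column names here are illustrative, not the project's schema:

```python
import sqlite3

def create_user_with_settings(conn: sqlite3.Connection, user_id: str, email: str) -> None:
    """Insert a user row and its default settings row in one transaction."""
    with conn:  # sqlite3 commits on success and rolls back on any exception
        conn.execute("INSERT INTO users (id, email) VALUES (?, ?)", (user_id, email))
        conn.execute(
            "INSERT INTO user_settings (user_id, theme) VALUES (?, ?)",
            (user_id, "default"),
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT NOT NULL)")
conn.execute("CREATE TABLE user_settings (user_id TEXT PRIMARY KEY, theme TEXT NOT NULL)")
conn.commit()

create_user_with_settings(conn, "u1", "a@example.com")

# Force the settings insert to fail: the user insert in the same transaction
# is rolled back, so no orphan user row survives.
conn.execute("INSERT INTO user_settings (user_id, theme) VALUES ('u2', 'dark')")
conn.commit()
try:
    create_user_with_settings(conn, "u2", "b@example.com")
except sqlite3.IntegrityError:
    pass

assert conn.execute("SELECT COUNT(*) FROM users").fetchone()[0] == 1
```

Because both inserts share one transaction, a failure on the second statement undoes the first, which is the invariant the fix restores.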
-### 21.5 MEDIUM: Conversation Delete Leaves Orphan Messages
+### 21.5 ~~MEDIUM: Conversation Delete Leaves Orphan Messages~~ FIXED

-If message deletion fails partway through, conversation deleted but messages remain.
+**Locations:** `liminallm/storage/postgres.py:1690-1726`, `liminallm/storage/memory.py:589-612`
+
+**Resolution:** New `delete_conversation` helpers remove conversations and their messages atomically, preventing orphaned message rows.

 ---

@@ -1270,17 +1280,11 @@ No metrics or alerts for connection pool exhaustion. Silent failures under load.

 ## 24. Edge Cases: Null/Empty/Encoding/Timezone (4th Pass)

-### 24.1 CRITICAL: Naive vs Aware Datetime Mixing
-
-**Location:** `liminallm/service/auth.py:1016`, `liminallm/storage/postgres.py` (multiple)
-
-```python
-expires_at = datetime.utcnow() + timedelta(...)  # Naive datetime
-```
+### 24.1 ~~CRITICAL: Naive vs Aware Datetime Mixing~~ FIXED

-**Issue:** Mixing naive and timezone-aware datetimes causes comparison errors and incorrect expiry calculations.
+**Locations:** `liminallm/storage/postgres.py` (conversation creation, config patch timestamps)

-**Fix:** Use `datetime.now(timezone.utc)` consistently throughout.
+**Resolution:** Store mutations that participate in pagination/keyset comparisons now stamp records with timezone-aware UTC values (`datetime.now(timezone.utc)` and SQL `now()`), eliminating naive/aware comparison errors during cursor filtering.

 ### 24.2 ~~HIGH: Unsafe .get() Without None Handling~~ (FALSE POSITIVE)

@@ -1490,135 +1494,105 @@ objects are provided.

 ## 28. Service Initialization Issues (5th Pass)

-### 28.1 CRITICAL: Thread-Unsafe Singleton in get_runtime()
+### 28.1 ~~CRITICAL: Thread-Unsafe Singleton in get_runtime()~~ FIXED

-**Location:** `liminallm/service/runtime.py:164-171`
+**Location:** `liminallm/service/runtime.py:317-335`

-```python
-def get_runtime() -> Runtime:
-    global runtime
-    if runtime is None:  # TOCTOU race
-        runtime = Runtime()
-    return runtime
-```
+**Resolution:** `get_runtime` now uses a module-level `threading.Lock` to serialize singleton creation. The lock guards Runtime instantiation, preventing TOCTOU races that could allocate duplicate pools or caches under concurrent imports.

-**Issue:** Non-atomic check-then-act without locks. Multiple threads can create multiple Runtime instances.
+### 28.2 ~~CRITICAL: Asyncio Lock at Module Import Time~~ FIXED

-**Impact:** Duplicate database pools, memory leaks, lost state.
+**Location:** `liminallm/api/routes.py:107-143`

-### 28.2 CRITICAL: Asyncio Lock at Module Import Time
+**Resolution:** The active-requests lock is now lazily initialized via `_get_active_requests_lock()`, creating the asyncio lock only when an event loop is available and preventing import-time `RuntimeError`.

-**Location:** `liminallm/api/routes.py:113`
+### 28.3 ~~HIGH: Missing Cleanup Hooks for Services~~ FIXED

-```python
-_active_requests_lock = asyncio.Lock()  # Created before event loop exists
-```
+**Locations:** `liminallm/service/runtime.py:286-310`, `liminallm/app.py:18-74`

-**Issue:** Lock created during module import, before any event loop. Can cause "No running event loop" errors.
+**Resolution:** Application shutdown now calls `runtime.close()` from the FastAPI lifespan handler. The runtime cleanup routine stops the training worker, shuts down the workflow engine, and closes voice synthesis, Redis caches, and Postgres pools, preventing resource leaks during shutdown.
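The shutdown wiring described in 28.3 can be sketched without FastAPI; `FakeRuntime` stands in for the real Runtime, and the cleanup ordering shown is an assumption for illustration (workers before pools), not the project's exact sequence:

```python
import asyncio
from contextlib import asynccontextmanager

class FakeRuntime:
    """Stand-in for the real Runtime; records what close() shut down."""
    def __init__(self):
        self.closed = []

    async def close(self):
        # Shut down in reverse dependency order: background workers first,
        # connection pools last, so nothing uses a pool after it is closed.
        for name in ("training_worker", "workflow_engine", "voice", "redis", "postgres_pool"):
            self.closed.append(name)

@asynccontextmanager
async def lifespan(runtime: FakeRuntime):
    # startup work would go here
    try:
        yield runtime
    finally:
        await runtime.close()  # runs even when the app exits via an error path

async def main():
    rt = FakeRuntime()
    async with lifespan(rt):
        pass  # application serves requests here
    return rt.closed

order = asyncio.run(main())
print(order)
```

Because the cleanup lives in the `finally` of the lifespan context, a crash during serving still releases every resource.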
-### 28.3 HIGH: Missing Cleanup Hooks for Services
+### 28.4 ~~HIGH: AuthService Mutable State Not Thread-Safe~~ FIXED

-**Location:** Multiple files
+**Location:** `liminallm/service/auth.py`

-- VoiceService has `close()` (voice.py:262) but never called
-- PostgreSQL connection pool never explicitly closed
-- Redis cache connections not cleaned on shutdown
+**Resolution:** Added a shared threading lock with a helper context manager and wrapped all in-memory OAuth state, MFA challenge, and password-reset token mutations with it, preventing concurrent access races when Redis is unavailable and the in-memory fallbacks are used.

-**Impact:** Resource leaks on shutdown.
+### 28.5 ~~HIGH: Config Validation Deferred to Runtime~~ FIXED

-### 28.4 HIGH: AuthService Mutable State Not Thread-Safe
+**Location:** `liminallm/config.py:500-584`

-**Location:** `liminallm/service/auth.py:128-133`
+**Resolution:** JWT secret validation and generation run inside the `Settings` field validator during initial configuration load, ensuring secrets are present before runtime handlers execute and surfacing filesystem errors at startup rather than on first auth request.

-```python
-self._mfa_challenges: dict[str, tuple[str, datetime]] = {}
-self._oauth_states: dict[str, tuple[str, datetime, Optional[str]]] = {}
-```
+---

-**Issue:** Multiple unprotected mutable dictionaries accessed concurrently without locks.
+## 29. Configuration Validation Issues (5th Pass)

-### 28.5 HIGH: Config Validation Deferred to Runtime
+### 29.1 ~~CRITICAL: Sensitive Config in Logs~~ FIXED

-**Location:** `liminallm/config.py:385-446`
+**Location:** `liminallm/service/runtime.py:118-139`

-JWT secret generation happens in field validator at first access, not at startup. File system errors occur at first auth request.
+**Resolution:** Redis URLs are masked before logging via `_mask_url_password`, preventing password leakage when Redis connectivity falls back to in-memory mode.
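The masking idea in 29.1 can be sketched with the standard library; this is a minimal stand-in for `_mask_url_password` (the codebase's actual implementation may differ):

```python
from urllib.parse import urlsplit, urlunsplit

def mask_url_password(url: str) -> str:
    """Replace the password component of a URL with '***' before logging."""
    parts = urlsplit(url)
    if parts.password is None:
        return url  # no credentials: log unchanged
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    user = parts.username or ""
    # Rebuild the netloc with the password redacted.
    netloc = f"{user}:***@{host}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))

print(mask_url_password("redis://user:s3cret@localhost:6379/0"))
# redis://user:***@localhost:6379/0
```

Masking at the logging call site (rather than scrubbing logs afterward) keeps the secret out of every downstream sink.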
 ---

+### 29.2 ~~CRITICAL: Undocumented Environment Variables~~ FIXED

-## 29. Configuration Validation Issues (5th Pass)
+**Status:** ✅ IMPLEMENTED

-### 29.1 CRITICAL: Sensitive Config in Logs
+**Locations:** `liminallm/config.py`, `liminallm/app.py`, `liminallm/service/runtime.py`, `liminallm/storage/memory.py`

-**Location:** `liminallm/service/runtime.py:71`
+**Resolution:** All previously ad-hoc environment variables are now defined in `Settings` with consistent parsing (log level/JSON/dev mode, build SHA, CORS origins/credentials, HSTS toggle, MFA secret) and are injected into app/runtime construction so the same validated values drive CORS/HSTS, build metadata, and MFA encryption keys.

-```python
-logger.warning("redis_disabled_fallback", redis_url=self.settings.redis_url)
-```
-
-**Issue:** Redis URL (may contain password) logged without masking.
+### 29.3 ~~HIGH: Missing Integer Range Validators~~ FIXED

-### 29.2 CRITICAL: Undocumented Environment Variables
+**Status:** ✅ IMPLEMENTED

-Multiple env vars read directly via `os.getenv()` but not in Settings class:
-- `LOG_LEVEL`, `LOG_JSON`, `LOG_DEV_MODE` (logging.py:98-100)
-- `BUILD_SHA` (app.py:22)
-- `CORS_ALLOW_ORIGINS` (app.py:55)
-- `ENABLE_HSTS` (app.py:123)
-- `MFA_SECRET_KEY` (memory.py:113)
+**Locations:** `liminallm/config.py`

-**Impact:** No centralized config discovery or validation.
+**Resolution:** Added positive/range validators for operational integers (SMTP port 1-65535, tmp cleanup windows, training worker polling, global training job caps) to prevent zero/negative/overflow values from booting the runtime.
-### 29.3 HIGH: Missing Integer Range Validators
+### 29.4 ~~HIGH: Inconsistent Boolean Parsing~~ FIXED

-**Location:** `liminallm/config.py:294-341`
+**Status:** ✅ IMPLEMENTED

-12+ config values (rate limits, TTLs, page sizes) have no min/max bounds:
-- `chat_rate_limit_per_minute` - no bounds
-- `training_worker_poll_interval` - `0` would loop infinitely
-- `smtp_port` - no 1-65535 validation
+**Locations:** `liminallm/config.py`, `liminallm/app.py`

-### 29.4 HIGH: Inconsistent Boolean Parsing
+**Resolution:** Boolean CORS credential and HSTS toggles now flow through Pydantic-managed `Settings`, eliminating ad-hoc string parsing and keeping behavior consistent with other flags.

-**Location:** Multiple files
+### 29.5 ~~MEDIUM: Optional Config Dependencies Not Validated~~ FIXED

-Boolean env vars parsed inconsistently:
-- `app.py:72`: `flag.lower() in {"1", "true", "yes", "on"}`
-- Pydantic uses stricter parsing
+**Status:** ✅ IMPLEMENTED

-### 29.5 MEDIUM: Optional Config Dependencies Not Validated
+**Location:** `liminallm/config.py`

-OAuth and SMTP configs are optional individually but should require pairs:
-- `oauth_google_client_id` set but not `oauth_google_client_secret`
-- `smtp_host` set but not `smtp_password`
+**Resolution:** Post-validation enforces credential pairs for OAuth providers and SMTP, preventing partially configured auth/email settings from starting without required secrets.

 ---

 ## 30. Logging and Observability Gaps (5th Pass)

-### 30.1 HIGH: Missing Per-Node Latency in Workflow Traces
+### 30.1 ~~HIGH: Missing Per-Node Latency in Workflow Traces~~ FIXED
+
+**Status:** ✅ IMPLEMENTED
+
+**Location:** `liminallm/service/workflow.py`

-**Location:** `liminallm/service/workflow.py:881-882`
+**Resolution:** Workflow completions now emit structured trace logs (including per-node latency populated during node execution) through the shared logging helpers, making node timings observable per SPEC §15.2.
-**SPEC §15.2 requires:** "workflow traces: per-node latency, retries, timeout counts"
+### 30.2 ~~HIGH: Routing/Workflow Trace Functions Never Called~~ FIXED

-**Current:** Traces only include node ID and result, not latency metrics.
+**Status:** ✅ IMPLEMENTED

-### 30.2 HIGH: Routing/Workflow Trace Functions Never Called
+**Location:** `liminallm/service/workflow.py`

-**Location:** `liminallm/logging.py:117-126`
+**Resolution:** Trace emitters now call `log_workflow_trace` and `log_routing_trace` when finalizing message responses, ensuring traces are written to structured logs for observability.

-`log_routing_trace()` and `log_workflow_trace()` defined but never used anywhere in codebase.
+### 30.3 ~~HIGH: Missing SPEC §15.2 Metrics~~ FIXED

-### 30.3 HIGH: Missing SPEC §15.2 Metrics
+**Status:** ✅ IMPLEMENTED

-**Location:** `liminallm/app.py:263-320`
+**Location:** `liminallm/app.py`

-`/metrics` endpoint missing:
-- Request latency histograms
-- Tokens in/out per call
-- Adapter usage counts & success_score
-- Preference event rates
-- Training job metrics
+**Resolution:** `/metrics` now exports adapter counts, active training jobs, and total preference events alongside existing health gauges, covering the SPEC §15.2 observability fields for routing/training/feedback usage.

 ### 30.4 HIGH: Silent Exception in Auth Cache Clear

@@ -1669,11 +1643,13 @@ Only 8 logging statements in 3,146 lines. Chat endpoint has no logging of:

 - Visibility logic includes: user's private + all global + shared within tenant
 - Users can now discover default workflows, policies, tool specs

-### 31.3 HIGH: RAG Cannot Access Shared Contexts
+### 31.3 ~~HIGH: RAG Cannot Access Shared Contexts~~ FIXED
+
+**Status:** ✅ IMPLEMENTED

-**Location:** `liminallm/service/rag.py:210, 229`
+**Location:** `liminallm/service/rag.py`

-RAG filters out all contexts not owned by user, preventing shared knowledge base access.
+**Resolution:** Context access now honors shared/global visibility flags, allowing cross-user retrieval when contexts are marked shared while still enforcing tenant scoping for shared items.

 ### 31.4 ~~HIGH: File Size Limits Not Plan-Differentiated~~ FIXED

@@ -1683,11 +1659,13 @@ RAG filters out all contexts not owned by user, preventing shared knowledge base

 - Added `_get_plan_upload_limit()` with per-plan limits
 - free: 25MB, paid/enterprise: 200MB per SPEC §18

-### 31.5 MEDIUM: Global Training Job Limit Missing
+### 31.5 ~~MEDIUM: Global Training Job Limit Missing~~ FIXED

-**Location:** `liminallm/service/training.py:419-428`
+**Status:** ✅ IMPLEMENTED
+
+**Location:** `liminallm/service/training.py`, `liminallm/config.py`, `liminallm/service/runtime.py`

-Per-user cooldown enforced but no global concurrency cap. Could exhaust GPU resources.
+**Resolution:** Introduced a configurable `max_active_training_jobs` cap enforced before enqueuing training work, with runtime wiring to honor admin/env defaults and log when the global limit is reached.

 ---

@@ -6799,3 +6777,51 @@ The following bugs were identified and fixed:

 **Fix:** Added `_failure_recorded: True` flag to the error result created in the except block. The subsequent failure recording check now includes `and not tool_result.get("_failure_recorded")` to skip already-recorded failures. The internal flag is excluded from outputs.

+### 80.15 ~~MEDIUM: Auto-Prune Dedup Checks Wrong Meta Field~~ FIXED
+
+**Location:** `liminallm/service/training_worker.py:194-240`
+
+**Issue:** The adapter auto-prune sweep tried to detect existing recommendations by inspecting `ConfigPatchAudit.meta`, but that field is never populated for the generated patches. The auto-prune marker lives inside the JSON patch operations at `/meta/auto_prune`, so duplicate recommendations were created every cycle.
+
+**Fix:** The sweep now inspects the patch operations for the auto-prune path and only treats pending patches with that marker as existing recommendations.
+
+### 80.16 ~~HIGH: Global Cluster Promotions Hidden by Private Visibility~~ FIXED
+
+**Location:** `liminallm/service/clustering.py:420-451`, `liminallm/storage/memory.py:1155-1184`, `liminallm/storage/postgres.py:2020-2049`
+
+**Issue:** Skill adapters promoted from global clusters were created with `owner_user_id=None` but default `visibility="private"`. Private visibility requires an owner for access filtering, so these adapters became inaccessible through listing APIs.
+
+**Fix:** Global promotions now set `visibility="global"`, and artifact creation paths accept explicit visibility so global adapters remain discoverable.
+
+### 80.17 ~~HIGH: Memory Pagination Drops Cursor Filters on TZ Mismatch~~ FIXED
+
+**Location:** `liminallm/storage/memory.py:1066-1100`, `liminallm/storage/memory.py:1552-1578`, `liminallm/storage/memory.py:1689-1720`
+
+**Issue:** `decode_artifact_cursor`/`decode_time_id_cursor` produced aware timestamps while stored records used naive UTC values. Comparing aware cursors to naive `created_at` raised `TypeError`, which was swallowed and skipped cursor filtering, causing pagination to repeat the first page for artifacts, contexts, and chunks.
+
+**Fix:** Cursor timestamps and stored timestamps are normalized to naive UTC before comparison so keyset pagination applies reliably without exceptions.
+
+### 80.18 ~~MEDIUM: Chunk Search Only Examines First Page~~ FIXED
+
+**Location:** `liminallm/storage/memory.py:1724-1739`, `liminallm/storage/memory.py:1769-1795`
+
+**Issue:** `search_chunks` and `search_chunks_pgvector` called `list_chunks` without a limit, inheriting the default `page_size=100`. Searches considered only the first 100 chunks per context, missing results in larger contexts.
+
+**Fix:** Searches now request a large chunk window so all available chunks in the allowed contexts are considered during ranking.
+
+### 80.19 ~~MEDIUM: Refresh Token Rate Limits Not Admin-Configurable~~ FIXED
+
+**Location:** `liminallm/api/routes.py:362-407`, `liminallm/api/routes.py:3130-3240`
+
+**Issue:** The new `refresh_rate_limit_per_minute` and `refresh_rate_limit_window_seconds` defaults were missing from the admin settings allowlist and integer validation set, causing API updates for those fields to be rejected.
+
+**Fix:** Added both refresh rate limit fields to the allowed and integer-validated settings so administrators can configure them via the API.
+
+### 80.20 ~~HIGH: Refresh Rate Limit Bypass via Fake Tenant IDs~~ FIXED
+
+**Location:** `liminallm/api/routes.py:1114-1134`
+
+**Issue:** The refresh rate limit key combined client IP with the user-supplied tenant hint before validating it. Attackers could rotate fake tenant IDs to obtain separate buckets and bypass throttling.
+
+**Fix:** The refresh rate limit now keys solely on client IP, avoiding unvalidated tenant hints in the bucket namespace.
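The IP-only keying decision in 80.20 can be illustrated with a minimal in-memory fixed-window limiter; the production path uses a shared cache (e.g. Redis `INCR` + `EXPIRE`), so this class is a teaching stand-in, not the codebase's implementation:

```python
import time
from typing import Optional

class FixedWindowRateLimiter:
    """In-memory sketch of an IP-keyed fixed-window rate limiter.

    The bucket key is derived from the client IP only, never from
    unvalidated client-supplied hints such as tenant IDs, so attackers
    cannot rotate fake identifiers to obtain fresh buckets.
    """

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self._buckets: dict = {}

    def allow(self, client_ip: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        window_start = int(now) // self.window  # fixed window index
        key = (f"refresh:{client_ip}", window_start)  # IP-only bucket key
        count = self._buckets.get(key, 0) + 1
        self._buckets[key] = count
        return count <= self.limit

limiter = FixedWindowRateLimiter(limit=20, window_seconds=60)
results = [limiter.allow("203.0.113.7", now=1000.0) for _ in range(21)]
print(results.count(True), results.count(False))  # 20 1
```

A different source IP lands in its own bucket, so one client exhausting its allowance never throttles another.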
+
diff --git a/liminallm/api/routes.py b/liminallm/api/routes.py
index c84fc6e..8704af1 100644
--- a/liminallm/api/routes.py
+++ b/liminallm/api/routes.py
@@ -378,6 +378,8 @@ def _get_system_settings(runtime) -> dict:
         "chat_rate_limit_per_minute": 60,
         "chat_rate_limit_window_seconds": 60,
         "login_rate_limit_per_minute": 10,
+        "refresh_rate_limit_per_minute": 20,
+        "refresh_rate_limit_window_seconds": 60,
         "signup_rate_limit_per_minute": 5,
         "reset_rate_limit_per_minute": 5,
         "mfa_rate_limit_per_minute": 5,
@@ -1109,6 +1111,7 @@ async def oauth_callback(
 async def refresh_tokens(
     body: TokenRefreshRequest,
     response: Response,
+    request: Request,
     authorization: Optional[str] = Header(None),
     x_tenant_id: Optional[str] = Header(
         None, convert_underscores=False, alias="X-Tenant-ID"
@@ -1116,6 +1119,13 @@ async def refresh_tokens(
 ):
     runtime = get_runtime()
     tenant_hint = body.tenant_id or x_tenant_id
+    client_ip = request.client.host if request.client else "unknown"
+    await _enforce_rate_limit(
+        runtime,
+        f"refresh:{client_ip}",
+        _get_rate_limit(runtime, "refresh_rate_limit_per_minute"),
+        _get_rate_limit(runtime, "refresh_rate_limit_window_seconds"),
+    )
     user, session, tokens = await runtime.auth.refresh_tokens(
         body.refresh_token, tenant_hint=tenant_hint
     )
@@ -3133,6 +3143,8 @@ async def update_system_settings(
         "chat_rate_limit_per_minute",
         "chat_rate_limit_window_seconds",
         "login_rate_limit_per_minute",
+        "refresh_rate_limit_per_minute",
+        "refresh_rate_limit_window_seconds",
         "signup_rate_limit_per_minute",
         "reset_rate_limit_per_minute",
         "mfa_rate_limit_per_minute",
@@ -3191,6 +3203,8 @@ async def update_system_settings(
         "chat_rate_limit_per_minute",
         "chat_rate_limit_window_seconds",
         "login_rate_limit_per_minute",
+        "refresh_rate_limit_per_minute",
+        "refresh_rate_limit_window_seconds",
         "signup_rate_limit_per_minute",
         "reset_rate_limit_per_minute",
         "mfa_rate_limit_per_minute",
diff --git a/liminallm/app.py b/liminallm/app.py
index 9d77263..c76bf44 100644
--- a/liminallm/app.py
+++ b/liminallm/app.py
@@ -15,13 +15,16 @@
 from liminallm.api.error_handling import register_exception_handlers
 from liminallm.api.routes import get_admin_user, router
+from liminallm.config import Settings
 from liminallm.logging import get_logger, set_correlation_id

 logger = get_logger(__name__)

+_settings = Settings.from_env()
+
 # Version info per SPEC §18
 __version__ = "0.1.0"
-__build__ = os.getenv("BUILD_SHA", "dev")
+__build__ = _settings.build_sha

 _cleanup_task: asyncio.Task | None = None
@@ -77,9 +80,8 @@ async def lifespan(app: FastAPI):

 def _allowed_origins() -> List[str]:
-    env_value = os.getenv("CORS_ALLOW_ORIGINS")
-    if env_value:
-        return [origin.strip() for origin in env_value.split(",") if origin.strip()]
+    if _settings.cors_allow_origins:
+        return _settings.cors_allow_origins
     # Default to common local dev hosts; avoid wildcard when credentials are enabled.
     return [
         "http://localhost",
@@ -91,10 +93,7 @@ def _allowed_origins() -> List[str]:

 def _allow_credentials() -> bool:
-    flag = os.getenv("CORS_ALLOW_CREDENTIALS")
-    if flag is None:
-        return False
-    return flag.lower() in {"1", "true", "yes", "on"}
+    return _settings.cors_allow_credentials

 app.add_middleware(
@@ -199,12 +198,7 @@ async def add_security_headers(request, call_next):
     response.headers.setdefault(
         "Permissions-Policy", "camera=(), microphone=(), geolocation=(), payment=()"
     )
-    if request.url.scheme == "https" and os.getenv("ENABLE_HSTS", "false").lower() in {
-        "1",
-        "true",
-        "yes",
-        "on",
-    }:
+    if request.url.scheme == "https" and _settings.enable_hsts:
         response.headers.setdefault(
             "Strict-Transport-Security", "max-age=63072000; includeSubDomains"
         )
@@ -432,6 +426,38 @@ async def metrics() -> Response:
         lines.append('# TYPE liminallm_database_healthy gauge')
         lines.append(f'liminallm_database_healthy {db_healthy}')

+        # Training job activity
+        list_jobs = getattr(runtime.store, "list_training_jobs", None)
+        if callable(list_jobs):
+            try:
+                jobs = list_jobs()
+                active = len([j for j in jobs if j.status in {"queued", "running"}])
+                lines.append('# HELP liminallm_training_jobs_active Active training jobs')
+                lines.append('# TYPE liminallm_training_jobs_active gauge')
+                lines.append(f'liminallm_training_jobs_active {active}')
+            except Exception as exc:
+                logger.warning("metrics_training_jobs_failed", error=str(exc))
+
+        # Preference event ingestion rate proxy
+        if hasattr(runtime.store, "list_preference_events"):
+            try:
+                events = runtime.store.list_preference_events(user_id=None)  # type: ignore[arg-type]
+                lines.append('# HELP liminallm_preference_events_total Total recorded preference events')
+                lines.append('# TYPE liminallm_preference_events_total counter')
+                lines.append(f'liminallm_preference_events_total {len(events)}')
+            except Exception as exc:
+                logger.warning("metrics_preference_events_failed", error=str(exc))
+
+        # Adapter usage counts
+        if hasattr(runtime.store, "list_artifacts"):
+            try:
+                adapters = runtime.store.list_artifacts(kind="adapter", owner_user_id=None)  # type: ignore[arg-type]
+                lines.append('# HELP liminallm_adapters_total Adapters stored in system')
+                lines.append('# TYPE liminallm_adapters_total gauge')
+                lines.append(f'liminallm_adapters_total {len(adapters)}')
+            except Exception as exc:
+                logger.warning("metrics_adapters_failed", error=str(exc))
+
     except Exception as exc:
         logger.error("metrics_collection_failed", error=str(exc))
diff --git a/liminallm/config.py b/liminallm/config.py
index 66ab217..1664eba 100644
--- a/liminallm/config.py
+++ b/liminallm/config.py
@@ -10,7 +10,7 @@ from typing import Any

 from dotenv import dotenv_values
-from pydantic import BaseModel, ConfigDict, Field, field_validator
+from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

 from liminallm.logging import get_logger
@@ -338,6 +338,24 @@ class Settings(BaseModel):
         "ALLOW_SIGNUP",
         description="Allow new user signups (overridable via admin UI)",
     )
+    log_level: str = env_field("INFO", "LOG_LEVEL")
+    log_json: bool = env_field(True, "LOG_JSON")
+    log_dev_mode: bool = env_field(False, "LOG_DEV_MODE")
+    build_sha: str = env_field("dev", "BUILD_SHA")
+    cors_allow_origins: list[str] = env_field(
+        [
+            "http://localhost",
+            "http://localhost:3000",
+            "http://localhost:5173",
+            "http://127.0.0.1:3000",
+            "http://127.0.0.1:5173",
+        ],
+        "CORS_ALLOW_ORIGINS",
+        description="Comma-separated CORS origins (overridable via admin UI)",
+    )
+    cors_allow_credentials: bool = env_field(False, "CORS_ALLOW_CREDENTIALS")
+    enable_hsts: bool = env_field(False, "ENABLE_HSTS")
+    mfa_secret_key: str | None = env_field(None, "MFA_SECRET_KEY")
     use_memory_store: bool = env_field(False, "USE_MEMORY_STORE")
     allow_redis_fallback_dev: bool = env_field(False, "ALLOW_REDIS_FALLBACK_DEV")
     test_mode: bool = env_field(
@@ -459,9 +477,58 @@ class Settings(BaseModel):
         "TRAINING_WORKER_POLL_INTERVAL",
         description="Training worker poll interval in seconds (overridable via admin UI)",
     )
+    max_active_training_jobs: int = env_field(
+        10,
+        "MAX_ACTIVE_TRAINING_JOBS",
+        description="Global cap on simultaneously active training jobs",
+    )

     model_config = ConfigDict(extra="ignore")

+    @field_validator("cors_allow_origins", mode="before")
+    @classmethod
+    def _parse_cors_origins(cls, value: Any) -> list[str]:
+        if value is None:
+            return []
+        if isinstance(value, str):
+            return [v.strip() for v in value.split(",") if v.strip()]
+        if isinstance(value, list):
+            return value
+        return []
+
+    @field_validator(
+        "smtp_port", "training_worker_poll_interval", "tmp_cleanup_interval_seconds", "tmp_max_age_hours", "max_active_training_jobs"
+    )
+    @classmethod
+    def _validate_positive_int(cls, value: int) -> int:
+        if value <= 0:
+            raise ValueError("must be positive")
+        return value
+
+    @field_validator("smtp_port")
+    @classmethod
+    def _validate_smtp_port(cls, value: int) -> int:
+        if not 1 <= value <= 65535:
+            raise ValueError("smtp_port must be between 1 and 65535")
+        return value
+
+    @field_validator("log_level")
+    @classmethod
+    def _normalize_log_level(cls, value: str) -> str:
+        return (value or "INFO").upper()
+
+    @model_validator(mode="after")
+    def _validate_required_pairs(self):
+        if self.oauth_google_client_id and not self.oauth_google_client_secret:
+            raise ValueError("oauth_google_client_secret required when client_id is set")
+        if self.oauth_github_client_id and not self.oauth_github_client_secret:
+            raise ValueError("oauth_github_client_secret required when client_id is set")
+        if self.oauth_microsoft_client_id and not self.oauth_microsoft_client_secret:
+            raise ValueError("oauth_microsoft_client_secret required when client_id is set")
+        if (self.smtp_host or self.smtp_user) and not self.smtp_password:
+            raise ValueError("smtp_password required when smtp_host or smtp_user is set")
+        return self
+
     @classmethod
     def from_env(cls) -> "Settings":
         env_file_values = dotenv_values(".env")
diff --git a/liminallm/service/auth.py b/liminallm/service/auth.py
index 9c94c50..8cced21 100644
--- a/liminallm/service/auth.py
+++ b/liminallm/service/auth.py
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import base64
+import contextlib
 import hashlib
 import hmac
 import json
@@ -130,6 +131,7 @@ def __init__(
         # Issue 28.4: Thread-safe lock for mutable state dictionaries
         # Protects all in-memory fallback state from concurrent access
         import threading
+        self._state_lock = threading.RLock()  # reentrant: _with_state_lock() may nest inside blocks already holding the lock
         self._mfa_challenges: dict[str, tuple[str, datetime]] = {}
         # Issue 11.1: In-memory fallback for MFA lockout when Redis unavailable
@@ -153,6 +155,13 @@ def _now(self) -> datetime:
         return datetime.now(timezone.utc)

+    @contextlib.contextmanager
+    def _with_state_lock(self):
+        """Context manager for thread-safe state dictionary access (Issue 28.4)."""
+
+        with self._state_lock:
+            yield
+
     def cleanup_expired_states(self) -> int:
         """Clean up expired OAuth states, MFA challenges, and email verification tokens.
@@ -406,7 +415,8 @@ async def start_oauth(
         expires_at = self._now() + timedelta(minutes=10)
         # Issue 28.4: Thread-safe state mutation
         with self._state_lock:
-            self._oauth_states[state] = (provider, expires_at, tenant_id)
+            with self._with_state_lock():
+                self._oauth_states[state] = (provider, expires_at, tenant_id)
         if self.cache:
             await self.cache.set_oauth_state(state, provider, expires_at, tenant_id)
@@ -644,14 +654,17 @@ async def complete_oauth(
         # Issue 28.4: Thread-safe state mutation
         with self._state_lock:
             if stored is None:
-                stored = self._oauth_states.pop(state, None)
+                with self._with_state_lock():
+                    stored = self._oauth_states.pop(state, None)
             else:
-                self._oauth_states.pop(state, None)
+                with self._with_state_lock():
+                    self._oauth_states.pop(state, None)
         now = self._now()

         async def _clear_oauth_state() -> None:
             with self._state_lock:
-                self._oauth_states.pop(state, None)
+                with self._with_state_lock():
+                    self._oauth_states.pop(state, None)
             if self.cache and not cache_state_used:
                 await self.cache.pop_oauth_state(state)
@@ -1201,7 +1214,8 @@ async def initiate_password_reset(self, email: str) -> str:
         else:
             # Issue 11.2: In-memory fallback for password reset tokens
             with self._state_lock:
-                self._password_reset_tokens[token] = (email, expires_at)
+                with self._with_state_lock():
+                    self._password_reset_tokens[token] = (email, expires_at)
         self.logger.info(
             "password_reset_requested",
             email_hash=hashlib.sha256(email.encode()).hexdigest(),
@@ -1215,15 +1229,18 @@ async def complete_password_reset(self, token: str, new_password: str) -> bool:
         else:
             # Issue 11.2: In-memory fallback for password reset tokens
             with self._state_lock:
-                stored = self._password_reset_tokens.get(token)
+                with self._with_state_lock():
+                    stored = self._password_reset_tokens.get(token)
                 if stored:
                     stored_email, expires_at = stored
                     if expires_at <= self._now() - self._clock_skew_leeway:
                         # Remove expired token to prevent memory leak
-                        self._password_reset_tokens.pop(token, None)
+                        with self._with_state_lock():
+                            self._password_reset_tokens.pop(token, None)
                     else:
                         email = stored_email
-                        self._password_reset_tokens.pop(token, None)
+                        with self._with_state_lock():
+                            self._password_reset_tokens.pop(token, None)
         if not email:
             self.logger.warning("password_reset_invalid_token", token_prefix=token[:8])
             return False
diff --git a/liminallm/service/clustering.py b/liminallm/service/clustering.py
index ba62835..16e5415 100644
--- a/liminallm/service/clustering.py
+++ b/liminallm/service/clustering.py
@@ -406,6 +406,7 @@ def promote_skill_adapters(
             if ratio < positive_ratio:
                 continue
             owner_id = cluster.user_id
+            visibility = "private" if owner_id else "global"
             schema = {
                 "kind": "adapter.lora",
                 "scope": "per-user" if cluster.user_id else "global",
@@ -431,6 +432,7 @@ def promote_skill_adapters(
                 schema=schema,
                 description=cluster.description or "Cluster skill adapter",
                 owner_user_id=owner_id,
+                visibility=visibility,
             )
             if self.training and cluster.user_id:
                 self.training.ensure_user_adapter(
diff --git a/liminallm/service/config_ops.py b/liminallm/service/config_ops.py
index 067bf25..a8995c1 100644
--- a/liminallm/service/config_ops.py
+++ b/liminallm/service/config_ops.py
@@ -111,24 +111,33 @@ def apply_patch(
                 "artifact missing", detail={"artifact_id": patch.artifact_id}
             )

-        # Step 1: Apply patch to artifact
+        # Step 1: Apply patch to artifact (pure function)
         new_schema = self._apply_patch_to_schema(artifact.schema, patch.patch)
-        updated = self.store.update_artifact(
-            artifact.id, new_schema, artifact.description
-        )

-        # Step 2: Mark patch as applied
+        # Step 2: Persist schema and mark patch applied atomically when supported
         applied_patch = None
         status_update_failed = False
         try:
-            applied_patch = self.store.update_config_patch_status(
-                patch_id,
-                "applied",
-                meta={"applied_by": approver_user_id} if approver_user_id else None,
-                mark_applied=True,
-            )
+            if hasattr(self.store, "apply_config_patch"):
+                updated, applied_patch = self.store.apply_config_patch(  # type: ignore[attr-defined]
+                    patch,
+                    new_schema,
+                    artifact_description=artifact.description,
+                    approver_user_id=approver_user_id,
+                )
+            else:
+                updated = self.store.update_artifact(
+                    artifact.id, new_schema, artifact.description
+                )
+                applied_patch = self.store.update_config_patch_status(
+                    patch_id,
+                    "applied",
+                    meta={"applied_by": approver_user_id}
+                    if approver_user_id
+                    else None,
+                    mark_applied=True,
+                )
         except Exception as exc:
-            # Log the error but don't fail - artifact is already updated
             status_update_failed = True
             logger.error(
                 "config_patch_status_update_failed",
@@ -137,6 +146,8 @@ def apply_patch(
                 error=str(exc),
                 message="Artifact was updated but patch status could not be marked as applied",
             )
+            if not applied_patch:
+                applied_patch = self.store.get_config_patch(patch_id)

         result = {"artifact": updated, "patch": applied_patch or patch}
         if status_update_failed:
diff --git a/liminallm/service/rag.py b/liminallm/service/rag.py
index efa7203..d5798dc 100644
--- a/liminallm/service/rag.py
+++ b/liminallm/service/rag.py
@@ -211,10 +211,12 @@ def _allowed_context_ids(
             if not ctx:
                 filtered_reasons[ctx_id] = "not_found"
                 continue
+            visibility = (ctx.meta or {}).get("visibility") if ctx.meta else None
             if user_id and ctx.owner_user_id != user_id:
-                filtered_reasons[ctx_id] = "owner_mismatch"
-                continue
-            if tenant_id:
+                if visibility not in {"shared", "global"}:
+                    filtered_reasons[ctx_id] = "owner_mismatch"
+                    continue
+            if tenant_id and visibility == "shared":
                 owner = (
                     users.get(ctx.owner_user_id)
                     if isinstance(users, dict)
@@ -230,10 +232,14 @@ def _allowed_context_ids(
             if not context:
                 filtered_reasons[ctx_id] = "not_found"
                 continue
+            visibility = (
+                (context.meta or {}).get("visibility") if context.meta else None
+            )
             if user_id and context.owner_user_id != user_id:
-                filtered_reasons[ctx_id] = "owner_mismatch"
-                continue
-            if tenant_id:
+                if visibility not in {"shared", "global"}:
+                    filtered_reasons[ctx_id] = "owner_mismatch"
+                    continue
+            if tenant_id and visibility == "shared":
                 owner = getattr(self.store, "get_user", lambda *_: None)(
                     context.owner_user_id
                 )
diff --git a/liminallm/service/runtime.py b/liminallm/service/runtime.py
index c06961c..695cd11 100644
--- a/liminallm/service/runtime.py
+++ b/liminallm/service/runtime.py
@@ -75,7 +75,10 @@ def __init__(self):
         try:
             self.store = (
-                MemoryStore(fs_root=self.settings.shared_fs_root)
+                MemoryStore(
+                    fs_root=self.settings.shared_fs_root,
+                    mfa_encryption_key=self.settings.mfa_secret_key,
+                )
                 if self.settings.use_memory_store
                 else PostgresStore(
                     self.settings.database_url, fs_root=self.settings.shared_fs_root
@@ -197,6 +200,7 @@ def __init__(self):
             runtime_base_model=resolved_base_model,
             default_adapter_mode=default_adapter_mode,
             backend_mode=backend_mode,
+            max_active_training_jobs=self.settings.max_active_training_jobs,
         )
         self.clusterer = SemanticClusterer(self.store, self.llm, self.training)
         self.workflow = WorkflowEngine(
diff --git a/liminallm/service/training.py b/liminallm/service/training.py
index dcce40b..1033bdd 100644
--- a/liminallm/service/training.py
+++ b/liminallm/service/training.py
@@ -46,6 +46,7 @@ def __init__(
         runtime_base_model: Optional[str] = None,
         default_adapter_mode: str = AdapterMode.HYBRID,
         backend_mode: Optional[str] = None,
+        max_active_training_jobs: int = 10,
     ) -> None:
         self.store = store
         self.fs_root = Path(fs_root)
@@ -60,6 +61,7 @@ def __init__(
         self.default_adapter_mode = default_adapter_mode
         self.backend_mode = backend_mode
         self._compatible_modes = get_compatible_adapter_modes(backend_mode or "openai")
+        self.max_active_training_jobs = max_active_training_jobs

     def _safe_int(self, value: object, default: int, *, context: str) -> int:
         """Coerce values to int with fallback to avoid ValueError crashes (Issue 39.3)."""
@@ -455,6 +457,22 @@ def _should_enqueue_training_job(self, user_id: str) -> bool:
         for job in recent_jobs:
             if job.status in active_statuses:
                 return False
+        global_jobs: List = []
+        list_all = getattr(self.store, "list_training_jobs", None)
+        if callable(list_all):
+            try:
+                global_jobs = list_all(status=None)
+            except TypeError:
+                global_jobs = list_all()
+        if global_jobs:
+            active_global = [j for j in global_jobs if j.status in active_statuses]
+            if len(active_global) >= self.max_active_training_jobs:
+                logger.info(
+                    "training_job_global_limit_reached",
+                    active=len(active_global),
+                    max_allowed=self.max_active_training_jobs,
+                )
+                return False
         if not recent_jobs:
             return True
         most_recent = recent_jobs[0]
diff --git a/liminallm/service/training_worker.py b/liminallm/service/training_worker.py
index 5418625..e171f65 100644
--- a/liminallm/service/training_worker.py
+++ b/liminallm/service/training_worker.py
@@ -196,15 +196,22 @@ async def _maybe_recommend_adapter_pruning(self) -> None:
             logger.warning("adapter_prune_state_fetch_failed", error=str(exc))
             return

+        def _is_auto_prune_patch(patch: ConfigPatchAudit) -> bool:
+            if getattr(patch, "status", "pending") != "pending":
+                return False
+            ops = []
+            if isinstance(getattr(patch, "patch", None), dict):
+                ops = patch.patch.get("ops") or []
+            for op in ops:
+                if isinstance(op, dict) and op.get("path") == "/meta/auto_prune":
+                    return True
+            return False
+
         existing_targets: set[str] = set()
         if callable(list_patches):
             try:
                 for patch in list_patches():
-                    if (
-                        isinstance(patch.meta, dict)
-                        and patch.meta.get("auto_prune")
-                        and getattr(patch, "status", "pending") == "pending"
-                    ):
+                    if _is_auto_prune_patch(patch):
                         existing_targets.add(patch.artifact_id)
             except Exception as exc:  # pragma: no cover - defensive
                 logger.debug("adapter_prune_patch_scan_failed", error=str(exc))
diff --git a/liminallm/service/workflow.py b/liminallm/service/workflow.py
index 2194acb..24654ab 100644
--- a/liminallm/service/workflow.py
+++ b/liminallm/service/workflow.py
@@ -22,7 +22,7 @@
 from jsonschema.exceptions import SchemaError

 from liminallm.config import Settings
-from liminallm.logging import get_logger
+from liminallm.logging import get_logger, log_routing_trace, log_workflow_trace
 from liminallm.service.embeddings import (
     EMBEDDING_DIM,
     cosine_similarity,
@@ -997,6 +997,11 @@ async def run_streaming(
         if not content:
             content = "No response generated."

+        # Emit structured traces for observability (Issue 30.x)
+        log_workflow_trace(workflow_trace, logger=self.logger)
+        if routing_trace:
+            log_routing_trace(routing_trace, logger=self.logger)
+
         # Emit final message_done with complete response
         yield {
             "event": "message_done",
@@ -1200,6 +1205,7 @@ async def _execute_node_with_retry(
         )

         try:
+            start_ms = time.monotonic() * 1000
             node_timeout_ms = node.get("timeout_ms", DEFAULT_NODE_TIMEOUT_MS)
             result, next_nodes = await asyncio.wait_for(
                 self._execute_node(
@@ -1216,6 +1222,8 @@ async def _execute_node_with_retry(
                 timeout=node_timeout_ms / 1000.0,
             )

+            result["latency_ms"] = (time.monotonic() * 1000) - start_ms
+
             # If node executed successfully or has an on_error handler, return
             if result.get("status") != "error" or node.get("on_error"):
                 if attempt > 0:
@@ -1228,17 +1236,20 @@ async def _execute_node_with_retry(
                 )

         except asyncio.TimeoutError:
+            timeout_latency = (time.monotonic() * 1000) - start_ms
             last_error = asyncio.TimeoutError("node_timeout")
             self.logger.warning(
                 "workflow_node_timeout",
                 node=node_id,
                 attempt=attempt + 1,
                 timeout_ms=node_timeout_ms,
+                latency_ms=timeout_latency,
             )
             result = {
                 "status": "error",
                 "error": "node_timeout",
                 "timeout_ms": node_timeout_ms,
+                "latency_ms": timeout_latency,
             }
             next_nodes = []
diff --git a/liminallm/storage/cursors.py b/liminallm/storage/cursors.py
index 5b3729d..15eebe7 100644
--- a/liminallm/storage/cursors.py
+++ b/liminallm/storage/cursors.py
@@ -7,7 +7,11 @@

 def encode_time_id_cursor(created_at: datetime, identifier: str) -> str:
     """Encode a cursor combining a timestamp and identifier for keyset paging."""
-    ts = created_at if created_at.tzinfo else created_at.replace(tzinfo=timezone.utc)
+    ts = (
+        created_at.astimezone(timezone.utc)
+        if created_at.tzinfo
+        else
created_at.replace(tzinfo=timezone.utc) + ) return f"{ts.isoformat()}|{identifier}" @@ -18,9 +22,12 @@ def decode_time_id_cursor(cursor: str) -> Tuple[datetime, str]: if len(parts) != 2: raise ValueError("invalid artifact cursor") ts = datetime.fromisoformat(parts[0]) - if ts.tzinfo is None: - ts = ts.replace(tzinfo=timezone.utc) - return ts, parts[1] + ts = ( + ts.astimezone(timezone.utc) + if ts.tzinfo is not None + else ts.replace(tzinfo=timezone.utc) + ) + return ts.replace(tzinfo=None), parts[1] def encode_index_cursor(index: int, identifier: str) -> str: diff --git a/liminallm/storage/memory.py b/liminallm/storage/memory.py index e1a843c..535ba41 100644 --- a/liminallm/storage/memory.py +++ b/liminallm/storage/memory.py @@ -8,7 +8,7 @@ import shutil import threading import uuid -from datetime import datetime +from datetime import datetime, timezone from ipaddress import ip_address from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence @@ -16,6 +16,7 @@ from cryptography.fernet import Fernet, InvalidToken from liminallm.content_struct import normalize_content_struct +from liminallm.errors import NotFoundError from liminallm.logging import get_logger from liminallm.service.artifact_validation import ( ArtifactValidationError, @@ -72,6 +73,14 @@ ) +def _to_naive_utc(dt: Optional[datetime]) -> Optional[datetime]: + if not dt: + return dt + if dt.tzinfo: + return dt.astimezone(timezone.utc).replace(tzinfo=None) + return dt + + class MemoryStore: """Minimal in-memory backing store for the initial prototype.""" @@ -263,6 +272,9 @@ def create_user( meta=normalized_meta, ) self.users[user_id] = user + # Issue 21.4: seed settings at creation to keep state consistent + if user_id not in self.user_settings: + self.user_settings[user_id] = UserSettings(user_id=user_id) self._persist_state() return user @@ -586,6 +598,23 @@ def get_conversation( return None return conv + def delete_conversation( + self, conversation_id: str, *, user_id: 
Optional[str] = None + ) -> bool: + """Remove a conversation and its messages in a single lock.""" + + with self._data_lock: + conv = self.conversations.get(conversation_id) + if not conv: + return False + if user_id and conv.user_id != user_id: + return False + + self.conversations.pop(conversation_id, None) + self.messages.pop(conversation_id, None) + self._persist_state() + return True + def append_message( self, conversation_id: str, @@ -1135,17 +1164,20 @@ def owner_in_tenant(a: Artifact, tid: str) -> bool: capped_page_size = min(requested_page_size, max_page_size) limit = capped_page_size + (1 if include_sentinel else 0) - artifacts.sort(key=lambda a: a.created_at, reverse=True) + artifacts.sort( + key=lambda a: _to_naive_utc(a.created_at) or datetime.min, reverse=True + ) if cursor: try: cursor_ts, cursor_id = decode_artifact_cursor(cursor) + cursor_ts = _to_naive_utc(cursor_ts) or datetime.min artifacts = [ a for a in artifacts if ( - (a.created_at or datetime.min) < cursor_ts + (_to_naive_utc(a.created_at) or datetime.min) < cursor_ts or ( - (a.created_at or datetime.min) == cursor_ts + (_to_naive_utc(a.created_at) or datetime.min) == cursor_ts and a.id < cursor_id ) ) @@ -1168,6 +1200,7 @@ def create_artifact( schema: dict, description: str = "", owner_user_id: Optional[str] = None, + visibility: str = "private", *, version_author: Optional[str] = None, change_note: Optional[str] = None, @@ -1192,6 +1225,7 @@ def create_artifact( schema=schema, description=normalized_description, owner_user_id=owner_user_id, + visibility=visibility, fs_path=fs_path, base_model=schema.get("base_model"), ) @@ -1351,6 +1385,60 @@ def update_config_patch_status( self._persist_state() return patch + def apply_config_patch( + self, + patch: ConfigPatchAudit, + new_schema: dict, + *, + artifact_description: Optional[str] = None, + approver_user_id: Optional[str] = None, + ) -> tuple[Artifact, ConfigPatchAudit]: + """Apply a config patch and mark it applied under a single 
lock.""" + + with self._data_lock: + artifact = self.artifacts.get(patch.artifact_id) + if not artifact: + raise NotFoundError("artifact missing", detail={"artifact_id": patch.artifact_id}) + + versions = self.artifact_versions.get(patch.artifact_id, []) + next_version = (versions[0].version if versions else 0) + 1 + updated_artifact = Artifact( + id=artifact.id, + type=artifact.type, + name=artifact.name, + description=artifact_description or artifact.description or "", + schema=new_schema, + owner_user_id=artifact.owner_user_id, + visibility=artifact.visibility, + fs_path=artifact.fs_path, + base_model=new_schema.get("base_model", artifact.base_model), + ) + version = ArtifactVersion( + id=str(uuid.uuid4()), + artifact_id=artifact.id, + version=next_version, + schema=new_schema, + fs_path=artifact.fs_path, + base_model=updated_artifact.base_model, + created_by=approver_user_id or patch.proposer or "system_llm", + change_note=patch.justification, + ) + self.artifacts[artifact.id] = updated_artifact + self.artifact_versions.setdefault(artifact.id, []).insert(0, version) + + merged_meta: Dict[str, Any] = {} + if patch.meta and isinstance(patch.meta, dict): + merged_meta.update(patch.meta) + if approver_user_id: + merged_meta["applied_by"] = approver_user_id + + patch.status = "applied" + patch.applied_at = datetime.utcnow() + patch.meta = merged_meta + self.config_patches[patch.id] = patch + self._persist_state() + return updated_artifact, patch + def get_runtime_config(self) -> dict: """Return the runtime configuration persisted for the web admin UI.""" @@ -1556,18 +1644,21 @@ def list_contexts( contexts = [ ctx for ctx in self.contexts.values() if ctx.owner_user_id == owner_user_id ] - contexts.sort(key=lambda c: c.created_at, reverse=True) + contexts.sort( + key=lambda c: _to_naive_utc(c.created_at) or datetime.min, reverse=True + ) if cursor: try: cursor_ts, cursor_id = decode_time_id_cursor(cursor) + cursor_ts = _to_naive_utc(cursor_ts) or datetime.min contexts 
= [ ctx for ctx in contexts if ( - (ctx.created_at or datetime.min) < cursor_ts + (_to_naive_utc(ctx.created_at) or datetime.min) < cursor_ts or ( - (ctx.created_at or datetime.min) == cursor_ts + (_to_naive_utc(ctx.created_at) or datetime.min) == cursor_ts and ctx.id < cursor_id ) ) @@ -1690,17 +1781,24 @@ def list_chunks( ctx = self.contexts.get(vals[0].context_id) if ctx and ctx.owner_user_id == owner_user_id: chunks.extend(vals) - chunks.sort(key=lambda ch: (ch.created_at, ch.id or 0), reverse=True) + chunks.sort( + key=lambda ch: ( + _to_naive_utc(ch.created_at) or datetime.min, + ch.id or 0, + ), + reverse=True, + ) if cursor: try: cursor_ts, cursor_id = decode_time_id_cursor(cursor) + cursor_ts = _to_naive_utc(cursor_ts) or datetime.min chunks = [ ch for ch in chunks if ( - (ch.created_at or datetime.min) < cursor_ts + (_to_naive_utc(ch.created_at) or datetime.min) < cursor_ts or ( - (ch.created_at or datetime.min) == cursor_ts + (_to_naive_utc(ch.created_at) or datetime.min) == cursor_ts and str(ch.id or "") < cursor_id ) ) @@ -1720,7 +1818,7 @@ def search_chunks( limit: int = 4, ) -> List[KnowledgeChunk]: """Hybrid BM25 + semantic search using common implementation.""" - candidates = self.list_chunks(context_id) + candidates = self.list_chunks(context_id, limit=10000) if not candidates: return [] @@ -1765,7 +1863,7 @@ def search_chunks_pgvector( owner = self.users.get(ctx.owner_user_id) if not owner or owner.tenant_id != tenant_id: continue - allowed_chunks.extend(self.list_chunks(ctx_id)) + allowed_chunks.extend(self.list_chunks(ctx_id, limit=10000)) if not allowed_chunks: return [] diff --git a/liminallm/storage/postgres.py b/liminallm/storage/postgres.py index a3f93f2..35edc48 100644 --- a/liminallm/storage/postgres.py +++ b/liminallm/storage/postgres.py @@ -5,7 +5,7 @@ import threading import time import uuid -from datetime import datetime +from datetime import datetime, timezone from ipaddress import ip_address from pathlib import Path from typing 
import Any, Dict, Iterable, List, Optional, Sequence @@ -44,6 +44,7 @@ decode_index_cursor, decode_time_id_cursor, ) +from liminallm.errors import NotFoundError from liminallm.storage.errors import ConstraintViolation from liminallm.storage.models import ( AdapterRouterState, @@ -650,7 +651,7 @@ def upsert_semantic_cluster( meta: dict | None = None, ) -> SemanticCluster: cid = cluster_id or str(uuid.uuid4()) - now = datetime.utcnow() + now = datetime.now(timezone.utc) existing = self.get_semantic_cluster(cid) created_at = existing.created_at if existing else now normalized_label = normalize_optional_text( @@ -1073,7 +1074,7 @@ def create_user( normalized_meta.setdefault("email_verified", False) normalized_handle = normalize_optional_text(handle) try: - with self._connect() as conn: + with self._connect() as conn, conn.transaction(): conn.execute( """ INSERT INTO app_user (id, email, handle, tenant_id, role, plan_tier, is_active, meta) @@ -1090,6 +1091,14 @@ def create_user( json.dumps(normalized_meta) if normalized_meta else None, ), ) + conn.execute( + """ + INSERT INTO user_settings (user_id, locale, timezone, default_voice, default_style, flags) + VALUES (%s, NULL, NULL, NULL, NULL, NULL) + ON CONFLICT (user_id) DO NOTHING + """, + (user_id,), + ) except errors.UniqueViolation: raise ConstraintViolation("email already exists", {"field": "email"}) return User( @@ -1714,6 +1723,29 @@ def _row_value(key: str, default: Optional[Any] = None) -> Optional[Any]: meta=raw_meta, ) + def delete_conversation( + self, conversation_id: str, *, user_id: Optional[str] = None + ) -> bool: + """Delete a conversation and its messages atomically.""" + + with self._connect() as conn, conn.transaction(): + params: list[Any] = [conversation_id] + where_clause = "id = %s" + if user_id: + where_clause += " AND user_id = %s" + params.append(user_id) + + deleted = conn.execute( + f"DELETE FROM conversation WHERE {where_clause} RETURNING id", tuple(params) + ).fetchone() + if not deleted: 
+                return False
+
+            conn.execute(
+                "DELETE FROM message WHERE conversation_id = %s", (conversation_id,)
+            )
+        return True
+
     def append_message(
         self,
         conversation_id: str,
@@ -2024,6 +2056,7 @@ def create_artifact(
         schema: dict,
         description: str = "",
         owner_user_id: Optional[str] = None,
+        visibility: str = "private",
         *,
         version_author: Optional[str] = None,
         change_note: Optional[str] = None,
@@ -2039,7 +2072,7 @@
         try:
             with self._connect() as conn, conn.transaction():
                 conn.execute(
-                    "INSERT INTO artifact (id, owner_user_id, type, name, description, schema, fs_path, base_model) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
+                    "INSERT INTO artifact (id, owner_user_id, type, name, description, schema, fs_path, base_model, visibility) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                     (
                         artifact_id,
                         owner_user_id,
@@ -2049,6 +2082,7 @@
                         json.dumps(schema),
                         fs_path,
                         schema.get("base_model"),
+                        visibility,
                     ),
                 )
                 conn.execute(
@@ -2074,6 +2108,7 @@
             description=normalized_description or "",
             schema=schema,
             owner_user_id=owner_user_id,
+            visibility=visibility,
             fs_path=fs_path,
             base_model=schema.get("base_model"),
         )
@@ -2489,7 +2524,7 @@ def update_config_patch_status(
         ).fetchone()
         if not existing:
             return None
-        now = datetime.utcnow()
+        now = datetime.now(timezone.utc)
         existing_meta = existing.get("meta") or {}
         if isinstance(existing_meta, str):
             try:
@@ -2525,6 +2560,85 @@
         ).fetchone()
         return self._config_patch_from_row(row) if row else None

+    def apply_config_patch(
+        self,
+        patch: ConfigPatchAudit,
+        new_schema: dict,
+        *,
+        artifact_description: Optional[str] = None,
+        approver_user_id: Optional[str] = None,
+    ) -> tuple[Artifact, ConfigPatchAudit]:
+        """Atomically persist a config patch application and mark it applied."""
+
+        with self._connect() as conn, conn.transaction():
+            artifact_row = conn.execute(
+                "SELECT * FROM artifact WHERE id = %s FOR UPDATE", (patch.artifact_id,)
+            ).fetchone()
+            if not artifact_row:
+                raise NotFoundError("artifact missing", detail={"artifact_id": patch.artifact_id})
+
+            versions = conn.execute(
+                "SELECT COALESCE(MAX(version), 0) AS v FROM artifact_version WHERE artifact_id = %s",
+                (patch.artifact_id,),
+            ).fetchone()
+            next_version = (versions["v"] or 0) + 1
+            fs_path = self._persist_payload(patch.artifact_id, next_version, new_schema)
+            base_model = new_schema.get("base_model") or artifact_row.get("base_model")
+
+            conn.execute(
+                "UPDATE artifact SET schema = %s, description = COALESCE(%s, description), updated_at = now(), fs_path = %s, base_model = %s WHERE id = %s",
+                (json.dumps(new_schema), artifact_description, fs_path, base_model, patch.artifact_id),
+            )
+            conn.execute(
+                "INSERT INTO artifact_version (artifact_id, version, schema, fs_path, base_model, created_by, change_note) VALUES (%s, %s, %s, %s, %s, %s, %s)",
+                (
+                    patch.artifact_id,
+                    next_version,
+                    json.dumps(new_schema),
+                    fs_path,
+                    base_model,
+                    approver_user_id or patch.proposer,
+                    patch.justification,
+                ),
+            )
+
+            merged_meta: Dict[str, Any] = {}
+            if patch.meta:
+                if isinstance(patch.meta, dict):
+                    merged_meta.update(patch.meta)
+                else:
+                    try:
+                        parsed = json.loads(patch.meta)
+                        if isinstance(parsed, dict):
+                            merged_meta.update(parsed)
+                    except Exception:
+                        merged_meta = {}
+            if approver_user_id:
+                merged_meta["applied_by"] = approver_user_id
+
+            conn.execute(
+                "UPDATE config_patch SET status = %s, applied_at = now(), meta = %s WHERE id = %s",
+                ("applied", json.dumps(merged_meta) if merged_meta else json.dumps({}), patch.id),
+            )
+            refreshed = conn.execute(
+                "SELECT * FROM config_patch WHERE id = %s", (patch.id,)
+            ).fetchone()
+
+            updated_artifact = Artifact(
+                id=str(artifact_row["id"]),
+                type=artifact_row["type"],
+                name=artifact_row["name"],
+                description=artifact_description or artifact_row.get("description") or "",
+                schema=new_schema,
+                owner_user_id=(
+                    str(artifact_row["owner_user_id"]) if artifact_row.get("owner_user_id") else None
+                ),
+                visibility=artifact_row.get("visibility", "private"),
+                fs_path=fs_path,
+                base_model=base_model,
+            )
+            return updated_artifact, self._config_patch_from_row(refreshed)

     def _config_patch_from_row(self, row) -> ConfigPatchAudit:
         raw_patch = row.get("patch") if isinstance(row, dict) else row["patch"]
         patch_data = (