This guide covers security best practices for using Moltres in production environments.
❌ Bad:
db = connect("postgresql://user:password@host/dbname")✅ Good:
import os
dsn = os.getenv("DATABASE_URL")
db = connect(dsn)Store database credentials in environment variables:
# .env file (never commit to version control)
DATABASE_URL=postgresql://user:password@host:5432/dbnameimport os
from dotenv import load_dotenv
load_dotenv() # Load from .env file
db = connect(os.getenv("DATABASE_URL"))For production, use secret management services:
AWS Secrets Manager:
import boto3
import json
def get_database_dsn():
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId='prod/database')
secret = json.loads(response['SecretString'])
return secret['dsn']
db = connect(get_database_dsn())HashiCorp Vault:
import hvac
def get_database_dsn():
client = hvac.Client(url='https://vault.example.com')
secret = client.secrets.kv.v2.read_secret_version(path='database/prod')
return secret['data']['data']['dsn']
db = connect(get_database_dsn())Azure Key Vault:
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
def get_database_dsn():
credential = DefaultAzureCredential()
client = SecretClient(vault_url="https://vault.vault.azure.net/", credential=credential)
return client.get_secret("database-dsn").value
db = connect(get_database_dsn())- Use SSL/TLS: Always use encrypted connections in production:
dsn = "postgresql://user:pass@host/dbname?sslmode=require"- Sanitize Logs: Never log full DSNs (they contain credentials):
# ❌ Bad
logger.info(f"Connecting to {dsn}")
# ✅ Good
logger.info(f"Connecting to {dsn.split('@')[-1] if '@' in dsn else 'database'}")-
Rotate Credentials: Regularly rotate database passwords and update secrets.
-
Use Read-Only Connections: When possible, use read-only database users for queries:
# Read-only user
read_dsn = os.getenv("DATABASE_READ_ONLY_URL")
read_db = connect(read_dsn)
# Read-write user (only for mutations)
write_dsn = os.getenv("DATABASE_WRITE_URL")
write_db = connect(write_dsn)Moltres automatically prevents SQL injection through:
- Parameterized Queries: All user input is parameterized
- Identifier Validation: Table and column names are validated
- Type Safety: Type checking prevents injection vectors
✅ Safe - Parameterized Queries:
# User input is automatically parameterized
user_id = request.args.get('user_id')
df = db.table("users").select().where(col("id") == user_id)✅ Safe - Raw SQL with Parameters:
user_id = request.args.get('user_id')
df = db.sql("SELECT * FROM users WHERE id = :user_id", user_id=user_id)❌ Unsafe - String Concatenation (Don't Do This):
# NEVER do this - vulnerable to SQL injection
user_id = request.args.get('user_id')
df = db.sql(f"SELECT * FROM users WHERE id = {user_id}") # DANGEROUS!- Principle of Least Privilege: Grant only necessary permissions:
-- Read-only user GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly_user; -- Write user (only for specific tables) GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE orders TO write_user;
2. **Row-Level Security**: Use database row-level security policies:
```sql
-- PostgreSQL example
CREATE POLICY user_isolation ON users
FOR ALL
USING (user_id = current_user);
- Schema Isolation: Use separate schemas for different applications:
# Application-specific schema
dsn = "postgresql://user:pass@host/dbname?options=-csearch_path=app_schema"
db = connect(dsn)- Validate Input: Always validate user input before database operations:
def validate_user_id(user_id: str) -> int:
try:
uid = int(user_id)
if uid <= 0:
raise ValueError("User ID must be positive")
return uid
except ValueError:
raise ValueError("Invalid user ID format")
user_id = validate_user_id(request.args.get('user_id'))
df = db.table("users").select().where(col("id") == user_id)- Rate Limiting: Implement rate limiting for database operations:
from functools import wraps
import time
def rate_limit(calls_per_second: float):
min_interval = 1.0 / calls_per_second
last_called = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
time.sleep(left_to_wait)
ret = func(*args, **kwargs)
last_called[0] = time.time()
return ret
return wrapper
return decorator
@rate_limit(10.0) # Max 10 calls per second
def query_users():
return db.table("users").select().collect()Enable audit logging for sensitive operations:
import logging
audit_logger = logging.getLogger("audit")
def audit_query(sql: str, user: str, params: dict = None):
"""Log query for audit purposes."""
audit_logger.info(
"Query executed",
extra={
"sql": sql[:500], # Truncate long queries
"user": user,
"params": params,
}
)
# Use performance hooks for audit logging
from moltres import register_performance_hook
def audit_hook(sql: str, elapsed: float, metadata: dict):
audit_query(sql, current_user(), metadata.get("params"))
register_performance_hook("query_end", audit_hook)Keep dependencies up to date:
# Check for outdated packages
pip list --outdated
# Update dependencies
pip install --upgrade moltresUse tools to scan for known vulnerabilities:
# Using safety
pip install safety
safety check
# Using pip-audit
pip install pip-audit
pip-auditPin critical dependencies in production:
# pyproject.toml
dependencies = [
"SQLAlchemy>=2.0,<3.0", # Pin to major version
"typing-extensions>=4.5,<5.0",
]Always use SSL/TLS for remote database connections:
# PostgreSQL
dsn = "postgresql://user:pass@host/dbname?sslmode=require"
# MySQL
dsn = "mysql://user:pass@host/dbname?ssl=true"
# SQL Server
dsn = "mssql+pyodbc://user:pass@host/dbname?driver=ODBC+Driver+17+for+SQL+Server&Encrypt=yes"Restrict database access with firewall rules:
- Database Firewall: Only allow connections from application servers
- Application Firewall: Use WAF (Web Application Firewall) for web applications
- Network Segmentation: Isolate database servers in private networks
- Encryption at Rest: Ensure database encryption is enabled
- Encryption in Transit: Always use SSL/TLS connections
- Data Masking: Mask sensitive data in logs and error messages:
def mask_sensitive_data(data: dict) -> dict:
"""Mask sensitive fields in data."""
sensitive_fields = ["password", "ssn", "credit_card"]
masked = data.copy()
for field in sensitive_fields:
if field in masked:
masked[field] = "***"
return masked- Retention Policies: Implement data retention policies
- Secure Deletion: Use secure deletion for sensitive data
- Backup Security: Encrypt database backups
For GDPR compliance:
- Right to Access: Provide APIs to export user data
- Right to Deletion: Implement secure data deletion
- Data Minimization: Only collect necessary data
- Consent Management: Track user consent for data processing
For HIPAA compliance:
- Access Controls: Implement strict access controls
- Audit Logs: Maintain comprehensive audit logs
- Encryption: Encrypt PHI (Protected Health Information) at rest and in transit
- Business Associate Agreements: Ensure BAAs with service providers
- Detection: Monitor for suspicious activity
- Containment: Isolate affected systems
- Investigation: Analyze logs and determine scope
- Remediation: Fix vulnerabilities and restore systems
- Communication: Notify affected parties if required
Log all security-relevant events:
security_logger = logging.getLogger("security")
def log_security_event(event_type: str, details: dict):
"""Log security event."""
security_logger.warning(
f"Security event: {event_type}",
extra=details
)
# Example: Log failed authentication
log_security_event("auth_failure", {
"user": username,
"ip": request.remote_addr,
"timestamp": time.time(),
})- ✅ Never hardcode credentials
- ✅ Use environment variables or secret management
- ✅ Always use SSL/TLS for remote connections
- ✅ Implement least privilege access control
- ✅ Validate and sanitize all user input
- ✅ Enable audit logging for sensitive operations
- ✅ Keep dependencies up to date
- ✅ Scan for vulnerabilities regularly
- ✅ Encrypt sensitive data at rest and in transit
- ✅ Implement proper error handling (don't expose internals)
If you discover a security vulnerability, please report it responsibly:
- Do not open a public GitHub issue
- Email security concerns to: [security contact email]
- Include:
- Description of the vulnerability
- Steps to reproduce
- Potential impact
- Suggested fix (if any)
We will respond within 48 hours and work with you to resolve the issue.