-
Notifications
You must be signed in to change notification settings - Fork 0
# Rate Limiting and Performance
PlanetScope-py implements sophisticated rate limiting and performance optimization to ensure reliable API interactions while maximizing throughput. The system includes intelligent rate limiting, circuit breaker patterns, and advanced retry mechanisms.
The RateLimiter manages API request rates across different Planet API endpoints with automatic detection and enforcement.
from planetscope_py.rate_limiter import RateLimiter
# Initialize with custom rates
rates = {
"search": 10, # 10 requests per minute for search
"activate": 5, # 5 requests per minute for activation
"download": 15, # 15 requests per minute for downloads
"general": 10 # 10 requests per minute for other endpoints
}
rate_limiter = RateLimiter(rates=rates)

Advanced session with circuit breaker pattern and intelligent retry logic.
from planetscope_py.rate_limiter import RetryableSession
# Create session with circuit breaker
session = RetryableSession(
rate_limiter=rate_limiter,
circuit_breaker_config={
"failure_threshold": 5, # Open after 5 consecutive failures
"recovery_timeout": 60 # Try recovery after 60 seconds
}
)

Prevents cascading failures by temporarily disabling failing operations.
from planetscope_py.rate_limiter import CircuitBreaker
circuit_breaker = CircuitBreaker(
failure_threshold=5, # Failures before opening
recovery_timeout=60, # Seconds before retry attempt
expected_exception=APIError # Exception type to handle
)

The system automatically:
- Detects endpoint types from URLs
- Enforces appropriate rate limits per endpoint
- Monitors request patterns and adjusts timing
- Handles Planet API rate limit headers
- Provides intelligent backoff during rate limiting
from planetscope_py import PlanetScopeQuery
# Rate limiting is automatic - no configuration needed
query = PlanetScopeQuery()
# These calls are automatically rate-limited
results1 = query.search_scenes(geometry1, "2024-01-01", "2024-01-31")
results2 = query.search_scenes(geometry2, "2024-01-01", "2024-01-31")
results3 = query.search_scenes(geometry3, "2024-01-01", "2024-01-31")
# System ensures compliance with Planet API limits

The system classifies endpoints automatically:
| Endpoint Pattern | Classification | Default Rate Limit |
|---|---|---|
| `/quick-search`, `/searches` | search | 10 req/min |
| `/stats` | search | 10 req/min |
| `/activate` | activate | 5 req/min |
| `/download`, `/assets/.../location` | download | 15 req/min |
| All others | general | 10 req/min |
# View endpoint classification
rate_limiter = query.rate_limiter
url = "https://api.planet.com/data/v1/quick-search"
endpoint_type = rate_limiter._classify_endpoint(url)
print(f"Endpoint type: {endpoint_type}")  # Output: search

# Get current rate limit status
status = rate_limiter.get_current_rate_status()
print("Rate Limit Status:")
for endpoint, info in status.items():
print(f" {endpoint}:")
print(f" Limit: {info['limit']} req/min")
print(f" Current: {info['current_rate']:.1f} req/min")
print(f" Capacity: {info['capacity_used']:.1%}")
# Get performance metrics
metrics = rate_limiter.get_performance_metrics()
print(f"\nTotal requests: {metrics['total_requests']}")
print(f"Average response time: {metrics['average_response_time']:.2f}s")
# Endpoint-specific metrics
for endpoint, stats in metrics['endpoint_metrics'].items():
print(f"\n{endpoint} endpoint:")
print(f" Requests: {stats['request_count']}")
print(f" Avg time: {stats['average_response_time']:.2f}s")
print(f" Min time: {stats['min_response_time']:.2f}s")
print(f" Max time: {stats['max_response_time']:.2f}s")

# Check if endpoint has capacity
rate_limiter.wait_for_capacity(
endpoint_type="search",
required_capacity=0.8 # Wait until 80% capacity available
)
# Make request only when capacity allows
if rate_limiter.can_make_request("search"):
response = rate_limiter.make_request("POST", search_url, json=payload)
else:
print("Rate limit capacity exceeded, waiting...")

from planetscope_py.rate_limiter import CircuitBreaker
from planetscope_py.exceptions import APIError
# Circuit breaker automatically prevents cascading failures
def risky_api_operation():
# This might fail
response = requests.get("https://api.planet.com/unstable-endpoint")
if response.status_code >= 500:
raise APIError("Server error")
return response
# Circuit breaker protects the system
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
try:
result = circuit_breaker.call(risky_api_operation)
print("Operation succeeded")
except APIError as e:
if "Circuit breaker is OPEN" in str(e):
print("Circuit breaker prevented call - system is recovering")
else:
print(f"Operation failed: {e}")

The circuit breaker has three states:
- CLOSED: Normal operation, requests pass through
- OPEN: Failures detected, requests fail fast
- HALF_OPEN: Testing if service recovered
# Monitor circuit breaker state
cb = session.circuit_breaker
print(f"Circuit breaker state: {cb.state}")
print(f"Failure count: {cb.failure_count}")
print(f"Last failure: {cb.last_failure_time}")
# Reset circuit breaker manually if needed
cb.reset()

# Retry configuration (automatically applied)
retry_config = {
"max_retries": 3,
"backoff_factor": 2.0,
"retry_statuses": [408, 429, 500, 502, 503, 504],
"retry_methods": ["GET", "POST", "PUT", "DELETE"]
}
# System automatically retries with exponential backoff + jitter
# Retry delays: ~1s, ~2s, ~4s (with random jitter)

from planetscope_py.rate_limiter import RetryableSession
# Configure custom retry behavior
session = RetryableSession(
circuit_breaker_config={
"failure_threshold": 10, # More tolerant
"recovery_timeout": 120 # Longer recovery time
}
)
# Advanced request with custom timeout
response = session.request(
"POST",
"https://api.planet.com/data/v1/quick-search",
json=search_payload,
timeout=(5, 30) # 5s connect, 30s read timeout
)

The system automatically processes Planet API rate limit headers:
# System reads these headers automatically:
# X-RateLimit-Limit: 5000
# X-RateLimit-Remaining: 4999
# X-RateLimit-Reset: 1609459200
# Retry-After: 60
# And adjusts behavior accordingly

# Update rate limits based on API responses or documentation
rate_limiter.update_rate_limit("search", 15) # Increase search limit
rate_limiter.update_rate_limit("download", 20) # Increase download limit
# Reset rate limiting state
rate_limiter.reset_rate_limiting_state()

from planetscope_py import PlanetScopeQuery
import time
query = PlanetScopeQuery()
# Process multiple geometries efficiently
geometries = [geom1, geom2, geom3, geom4, geom5]
# Method 1: Sequential with automatic rate limiting
sequential_results = []
start_time = time.time()
for geom in geometries:
result = query.search_scenes(geom, "2024-01-01", "2024-01-31")
sequential_results.append(result)
print(f"Sequential: {time.time() - start_time:.1f}s")
# Method 2: Optimized batch processing
start_time = time.time()
batch_results = query.batch_search(
geometries=geometries,
start_date="2024-01-01",
end_date="2024-01-31"
)
print(f"Batch: {time.time() - start_time:.1f}s")

# Process 100+ geometries efficiently
large_geometry_list = [...] # 100+ geometries
# Process in smaller batches to manage rate limits
batch_size = 10
all_results = []
for i in range(0, len(large_geometry_list), batch_size):
batch = large_geometry_list[i:i+batch_size]
batch_results = query.batch_search(
geometries=batch,
start_date="2024-01-01",
end_date="2024-01-31"
)
all_results.extend(batch_results)
print(f"Processed batch {i//batch_size + 1}: "
f"{len(batch_results)} results")
# Brief pause between batches (optional)
time.sleep(1)

Performance best practices:
- Use appropriate date ranges: Shorter ranges = faster responses
- Filter effectively: Use cloud_cover_max and other filters
- Batch similar requests: Group geometries when possible
- Cache results: Store results for repeated analysis
- Monitor rate limits: Use status methods to track usage
# For large result sets, process incrementally
def process_large_search_results(results):
"""Process large search results efficiently."""
scenes = results['features']
# Process in chunks to manage memory
chunk_size = 50
for i in range(0, len(scenes), chunk_size):
chunk = scenes[i:i+chunk_size]
# Process chunk
for scene in chunk:
metadata = processor.extract_scene_metadata(scene)
# Do something with metadata
# Optional: Clear processed chunk from memory
del chunk

from planetscope_py.exceptions import RateLimitError, APIError
def robust_search_with_recovery(query, geometry, start_date, end_date):
"""Search with comprehensive error recovery."""
max_attempts = 3
for attempt in range(max_attempts):
try:
return query.search_scenes(geometry, start_date, end_date)
except RateLimitError as e:
print(f"Rate limited, waiting {e.retry_after}s...")
time.sleep(e.retry_after)
continue
except APIError as e:
if e.status_code >= 500: # Server error
print(f"Server error (attempt {attempt + 1}), retrying...")
time.sleep(2 ** attempt) # Exponential backoff
continue
else:
raise # Client error, don't retry
raise APIError("Failed after all retry attempts")

# Check current status
status = rate_limiter.get_current_rate_status()
overloaded_endpoints = [
endpoint for endpoint, info in status.items()
if info['capacity_used'] > 0.9
]
print(f"Overloaded endpoints: {overloaded_endpoints}")
# Wait for capacity
for endpoint in overloaded_endpoints:
rate_limiter.wait_for_capacity(endpoint, required_capacity=0.5)

# Check circuit breaker status
if session.circuit_breaker.state == CircuitBreaker.OPEN:
print("Circuit breaker is open, waiting for recovery...")
time.sleep(session.circuit_breaker.recovery_timeout)
session.circuit_breaker.reset()

# Analyze performance metrics
metrics = rate_limiter.get_performance_metrics()
slow_endpoints = [
endpoint for endpoint, stats in metrics['endpoint_metrics'].items()
if stats['average_response_time'] > 5.0 # > 5 seconds
]
print(f"Slow endpoints: {slow_endpoints}")
# Consider reducing batch sizes for slow endpoints

import logging
# Enable debug logging for rate limiter
logging.getLogger('planetscope_py.rate_limiter').setLevel(logging.DEBUG)
# This will show:
# - Rate limit calculations
# - Retry attempts
# - Circuit breaker state changes
# - Performance metrics

# Process 500 regions efficiently
regions = load_regions() # 500 geometries
# Configure for high volume
query = PlanetScopeQuery()
rate_limiter = query.rate_limiter
# Monitor progress
processed = 0
errors = 0
for region_batch in batch_iterator(regions, batch_size=20):
try:
# Check capacity before processing batch
rate_limiter.wait_for_capacity("search", required_capacity=0.8)
batch_results = query.batch_search(
geometries=region_batch,
start_date="2024-01-01",
end_date="2024-12-31"
)
processed += len(region_batch)
print(f"Processed: {processed}/{len(regions)}")
except Exception as e:
errors += 1
print(f"Batch failed: {e}")
if errors > 5: # Too many errors
print("Too many errors, stopping...")
break
print(f"Completed: {processed} regions, {errors} errors")

import time
def monitor_api_performance():
"""Monitor API performance and alert on issues."""
query = PlanetScopeQuery()
rate_limiter = query.rate_limiter
while True:
metrics = rate_limiter.get_performance_metrics()
status = rate_limiter.get_current_rate_status()
# Check for performance issues
avg_response_time = metrics.get('average_response_time', 0)
if avg_response_time > 10.0:
print(f"ALERT: High response time: {avg_response_time:.1f}s")
# Check for rate limit issues
for endpoint, info in status.items():
if info['capacity_used'] > 0.95:
print(f"ALERT: {endpoint} endpoint near rate limit")
# Check circuit breaker
cb = query.session.circuit_breaker
if cb.state != CircuitBreaker.CLOSED:
print(f"ALERT: Circuit breaker {cb.state}")
time.sleep(60) # Check every minute
# Run monitoring in background
# monitor_api_performance()

The Rate Limiting & Performance system ensures reliable, efficient API interactions while maximizing throughput and preventing service disruptions.
Getting Started
Core Features
- Scene Discovery
- Metadata Analysis
- Spatial Density Analysis and Visualization
- Rate Limiting & Performance
Advanced Analysis
Integration Workflows
Examples & Tutorials
API Reference
Support