Complete Analysis Workflows
End-to-end workflows combining all PlanetScope-py capabilities
This guide demonstrates end-to-end workflows that integrate scene discovery, spatial analysis, temporal analysis, asset management, and data export into complete research and operational pipelines.
Each workflow combines the major PlanetScope-py components:
- Scene Discovery: Finding and filtering PlanetScope scenes
- Spatial Analysis: Density calculations and coverage assessment
- Temporal Analysis: Pattern detection and gap analysis
- Asset Management: Quota monitoring and downloads
- Data Export: Professional GeoPackage creation with imagery
All workflows in this guide share a common set of imports:
import asyncio
import json
import platform
from datetime import datetime, timedelta
from pathlib import Path
import planetscope_py
from planetscope_py import (
PlanetScopeQuery, SpatialDensityEngine, TemporalAnalyzer,
AssetManager, GeoPackageManager, GeoPackageConfig,
DensityConfig, TemporalConfig, TemporalResolution
)
from shapely.geometry import box, shape
async def standard_research_workflow(roi, date_range, output_dir="analysis_results"):
"""
Standard research workflow for environmental monitoring.
Args:
roi: Region of interest (Shapely geometry)
date_range: Tuple of (start_date, end_date) strings
output_dir: Directory for outputs
"""
print("🚀 Starting Standard Research Workflow")
print("=" * 50)
# Record the workflow start time (used in the final summary) and create output directory
workflow_start = datetime.now()
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Step 1: Scene Discovery
print("\n📡 Step 1: Scene Discovery")
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.3,
sun_elevation_min=30,
item_types=["PSScene"]
)
scenes = results['features']
print(f" Found {len(scenes)} scenes")
if len(scenes) == 0:
print(" ❌ No scenes found - try expanding date range or ROI")
return None
# Step 2: Spatial Analysis
print("\n📊 Step 2: Spatial Density Analysis")
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(
scene_footprints=scenes,
roi_geometry=roi
)
print(f" Method used: {spatial_result.method_used.value}")
print(f" Grid size: {spatial_result.grid_info['width']}×{spatial_result.grid_info['height']}")
print(f" Density range: {spatial_result.stats['min']}-{spatial_result.stats['max']} scenes/cell")
# Export spatial analysis
spatial_output = output_path / "spatial_density.tif"
spatial_result.export_geotiff(str(spatial_output))
print(f" Exported: {spatial_output}")
# Step 3: Temporal Analysis
print("\n⏱️ Step 3: Temporal Pattern Analysis")
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.WEEKLY,
spatial_resolution=1000.0, # 1km for efficiency
enable_gap_analysis=True
)
temporal_analyzer = TemporalAnalyzer(temporal_config)
# Create data cube
datacube = temporal_analyzer.create_spatiotemporal_datacube(
scenes=scenes,
roi=roi,
start_date=datetime.fromisoformat(date_range[0]),
end_date=datetime.fromisoformat(date_range[1])
)
# Analyze patterns
patterns = temporal_analyzer.analyze_acquisition_patterns(datacube)
gaps = temporal_analyzer.detect_temporal_gaps(datacube)
recommendations = temporal_analyzer.generate_acquisition_recommendations(datacube)
print(f" Seasonal patterns: {len(patterns['seasonal_patterns'])}")
print(f" Temporal gaps: {len(gaps)}")
print(f" Data availability: {patterns['frequency_stats']['data_availability_percentage']:.1%}")
# Step 4: Asset Management (with user confirmation)
print("\n💾 Step 4: Asset Management")
asset_manager = AssetManager(query.auth)
# Check quota
quota_info = await asset_manager.get_quota_info()
print(f" Current quota usage: {quota_info.usage_percentage:.1%}")
print(f" Remaining area: {quota_info.remaining_area_km2:.1f} km²")
# Download high-quality subset
high_quality_scenes = [
scene for scene in scenes
if scene['properties'].get('cloud_cover', 1.0) < 0.15
and scene['properties'].get('sun_elevation', 0) > 40
][:20] # Limit to 20 best scenes
downloads = None
if high_quality_scenes and quota_info.remaining_area_km2 > 100:
print(f" Downloading {len(high_quality_scenes)} high-quality scenes...")
downloads = await asset_manager.activate_and_download_assets(
scenes=high_quality_scenes,
asset_types=["ortho_analytic_4b"],
output_dir=str(output_path / "imagery"),
clip_to_roi=roi,
confirm_download=True
)
successful_downloads = [d for d in downloads if d.status.value == "completed"]
print(f" Downloaded: {len(successful_downloads)}/{len(downloads)} assets")
else:
print(" Skipping downloads (insufficient quota or no suitable scenes)")
# Step 5: Comprehensive Data Export
print("\n📦 Step 5: Data Export")
# Configure GeoPackage with imagery if available
geopackage_config = GeoPackageConfig(
include_imagery=downloads is not None,
clip_to_roi=True,
attribute_schema="comprehensive"
)
geopackage_manager = GeoPackageManager(config=geopackage_config)
# Get downloaded file paths
downloaded_files = []
if downloads:
downloaded_files = [
d.downloaded_file_path for d in downloads
if hasattr(d, 'downloaded_file_path') and d.downloaded_file_path
]
# Create comprehensive GeoPackage
geopackage_path = output_path / "complete_analysis.gpkg"
layer_info = geopackage_manager.create_scene_geopackage(
scenes=scenes,
output_path=str(geopackage_path),
roi=roi,
downloaded_files=downloaded_files if downloaded_files else None
)
print(f" Created GeoPackage: {geopackage_path}")
print(f" Vector features: {layer_info.feature_count}")
if downloaded_files:
print(f" Raster layers: {len(downloaded_files)} imagery files")
# Generate summary report
report = {
'workflow': 'standard_research',
'completion_time': datetime.now().isoformat(),
'roi_area_km2': calculate_area_km2(roi),
'date_range': date_range,
'results': {
'total_scenes': len(scenes),
'high_quality_scenes': len(high_quality_scenes),
'downloaded_assets': len(downloaded_files) if downloaded_files else 0,
'spatial_analysis': {
'method': spatial_result.method_used.value,
'density_stats': spatial_result.stats
},
'temporal_analysis': {
'seasonal_patterns': len(patterns['seasonal_patterns']),
'temporal_gaps': len(gaps),
'data_availability': patterns['frequency_stats']['data_availability_percentage']
},
'quota_usage': {
'remaining_km2': quota_info.remaining_area_km2,
'usage_percentage': quota_info.usage_percentage
}
},
'outputs': {
'geopackage': str(geopackage_path),
'spatial_density': str(spatial_output),
'imagery_directory': str(output_path / "imagery") if downloads else None
}
}
# Save report
report_path = output_path / "workflow_report.json"
with open(report_path, 'w') as f:
json.dump(report, f, indent=2, default=str)
print(f"\n✅ Workflow Complete!")
print(f" Report saved: {report_path}")
print(f" Total processing time: {datetime.now() - workflow_start}")
return report
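The summary report above calls `calculate_area_km2()`, which is not defined in this guide. A minimal sketch using pyproj's geodesic area computation (the helper name comes from the workflow; the pyproj approach is an assumption):

```python
from pyproj import Geod

def calculate_area_km2(geometry):
    """Approximate geodesic area of a WGS84 geometry in square kilometers."""
    geod = Geod(ellps="WGS84")
    # geometry_area_perimeter returns a signed area in square meters
    area_m2, _ = geod.geometry_area_perimeter(geometry)
    return abs(area_m2) / 1_000_000
```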
# Example usage
roi = box(9.04, 45.40, 9.28, 45.52) # Milan, Italy
date_range = ("2024-01-01", "2024-06-30")
# Run workflow
# result = await standard_research_workflow(roi, date_range)

Environmental Monitoring Workflow

async def environmental_monitoring_workflow(roi, monitoring_period_months=12):
"""
Specialized workflow for environmental monitoring with temporal emphasis.
Focuses on seasonal patterns, change detection, and long-term trends.
"""
print("🌍 Environmental Monitoring Workflow")
print("=" * 40)
# Calculate date range for monitoring period
end_date = datetime.now()
start_date = end_date - timedelta(days=monitoring_period_months * 30)
# Step 1: Extended Scene Discovery
query = PlanetScopeQuery()
# Search with relaxed cloud cover for temporal completeness
results = query.search_scenes(
geometry=roi,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
cloud_cover_max=0.5, # More permissive for temporal coverage
item_types=["PSScene"]
)
scenes = results['features']
print(f"📡 Found {len(scenes)} scenes over {monitoring_period_months} months")
# Step 2: Temporal-Focused Analysis
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.MONTHLY,
spatial_resolution=500.0,
enable_gap_analysis=True,
gap_threshold_days=45 # Longer gap threshold for monitoring
)
temporal_analyzer = TemporalAnalyzer(temporal_config)
# Create fine-scale temporal analysis
datacube = temporal_analyzer.create_spatiotemporal_datacube(scenes, roi)
seasonal_analysis = temporal_analyzer.analyze_seasonal_patterns(datacube)
quality_trends = temporal_analyzer.analyze_quality_trends(datacube)
coverage_gaps = temporal_analyzer.detect_temporal_gaps(datacube)
print(f"⏱️ Temporal Analysis:")
print(f" Quality trend: {quality_trends['trend_direction']}")
print(f" Coverage gaps: {len(coverage_gaps)} periods")
print(f" Best season: {quality_trends['best_period']}")
# Step 3: Change Detection Preparation
# Identify scenes for change detection (seasonal comparisons)
seasonal_scenes = {}
for season, data in seasonal_analysis.items():
# Get best scenes from each season
season_scenes = [
scene for scene in scenes
if get_season_from_scene(scene) == season
and scene['properties'].get('cloud_cover', 1.0) < 0.2
]
# Sort by quality and take best
season_scenes.sort(key=lambda s: s['properties'].get('cloud_cover', 1.0))
seasonal_scenes[season] = season_scenes[:5] # Top 5 per season
# Step 4: Strategic Downloads for Monitoring
asset_manager = AssetManager(query.auth)
# Download representative scenes from each season
monitoring_scenes = []
for season, season_scenes in seasonal_scenes.items():
monitoring_scenes.extend(season_scenes[:2]) # 2 best per season
downloads = []
if monitoring_scenes:
print(f"💾 Downloading {len(monitoring_scenes)} monitoring scenes")
downloads = await asset_manager.activate_and_download_assets(
scenes=monitoring_scenes,
asset_types=["ortho_analytic_4b", "ortho_analytic_sr"], # Include surface reflectance
clip_to_roi=roi
)
# Step 5: Monitoring-Specific Export
config = GeoPackageConfig(
include_imagery=True,
attribute_schema="comprehensive"
)
geopackage_manager = GeoPackageManager(config=config)
# Add monitoring-specific metadata
for scene in scenes:
props = scene['properties']
props['monitoring_season'] = get_season_from_scene(scene)
props['monitoring_quality'] = assess_monitoring_quality(scene)
props['change_detection_suitable'] = (
props.get('cloud_cover', 1.0) < 0.15 and
props.get('sun_elevation', 0) > 35
)
# Create monitoring GeoPackage
layer_info = geopackage_manager.create_scene_geopackage(
scenes=scenes,
output_path="environmental_monitoring.gpkg",
roi=roi,
downloaded_files=([d.downloaded_file_path for d in downloads] if downloads else None)
)
# Generate monitoring report
monitoring_report = {
'monitoring_type': 'environmental',
'period_months': monitoring_period_months,
'temporal_analysis': {
'seasonal_patterns': seasonal_analysis,
'quality_trends': quality_trends,
'coverage_gaps': len(coverage_gaps)
},
'change_detection_candidates': {
season: len(scenes) for season, scenes in seasonal_scenes.items()
},
'recommendations': generate_monitoring_recommendations(
seasonal_analysis, quality_trends, coverage_gaps
)
}
return monitoring_report
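The monitoring report calls `generate_monitoring_recommendations()`, which is left undefined above. A minimal sketch, assuming `quality_trends` exposes the `trend_direction` and `best_period` keys printed earlier:

```python
def generate_monitoring_recommendations(seasonal_analysis, quality_trends, coverage_gaps):
    """Illustrative recommendation logic; tune the thresholds to your use case."""
    recommendations = []
    if coverage_gaps:
        recommendations.append(
            f"Fill {len(coverage_gaps)} coverage gaps via tasking or alternative sensors"
        )
    if quality_trends.get('trend_direction') == 'declining':
        recommendations.append(
            "Investigate declining scene quality (seasonal clouds, low sun angles)"
        )
    if quality_trends.get('best_period'):
        recommendations.append(
            f"Schedule critical acquisitions around {quality_trends['best_period']}"
        )
    if not recommendations:
        recommendations.append("Coverage and quality are adequate - maintain current plan")
    return recommendations
```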
def get_season_from_scene(scene):
"""Extract season from scene acquisition date."""
acquired = scene['properties']['acquired']
date_obj = datetime.fromisoformat(acquired.replace('Z', '+00:00'))
month = date_obj.month
if month in [12, 1, 2]:
return 'winter'
elif month in [3, 4, 5]:
return 'spring'
elif month in [6, 7, 8]:
return 'summer'
else:
return 'autumn'
def assess_monitoring_quality(scene):
"""Assess scene quality for monitoring purposes."""
props = scene['properties']
cloud_cover = props.get('cloud_cover', 1.0)
sun_elevation = props.get('sun_elevation', 0)
if cloud_cover < 0.05 and sun_elevation > 50:
return 'excellent'
elif cloud_cover < 0.15 and sun_elevation > 35:
return 'good'
elif cloud_cover < 0.3:
return 'fair'
else:
return 'poor'

Operational Monitoring Workflow

async def operational_monitoring_workflow(roi, alert_thresholds=None):
"""
High-frequency operational monitoring with alerting capabilities.
Designed for rapid response and operational decision making.
"""
print("🚨 Operational Monitoring Workflow")
print("=" * 35)
alert_thresholds = alert_thresholds or {
'cloud_cover_max': 0.2,
'min_scenes_per_week': 3,
'max_gap_days': 7
}
# Step 1: Recent Scene Discovery (last 30 days)
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
cloud_cover_max=alert_thresholds['cloud_cover_max'],
item_types=["PSScene"]
)
recent_scenes = results['features']
print(f"📡 Recent scenes (30 days): {len(recent_scenes)}")
# Step 2: Operational Alerts
alerts = []
# Check scene frequency
scenes_per_week = len(recent_scenes) / 4.3 # 30 days ≈ 4.3 weeks
if scenes_per_week < alert_thresholds['min_scenes_per_week']:
alerts.append({
'type': 'LOW_FREQUENCY',
'severity': 'HIGH',
'message': f"Only {scenes_per_week:.1f} scenes/week (threshold: {alert_thresholds['min_scenes_per_week']})"
})
# Check recent gaps
if recent_scenes:
# Sort by date
recent_scenes.sort(key=lambda s: s['properties']['acquired'])
# Check for gaps
last_acquisition = None
for scene in recent_scenes:
current_date = datetime.fromisoformat(scene['properties']['acquired'].replace('Z', '+00:00'))
if last_acquisition:
gap_days = (current_date - last_acquisition).days
if gap_days > alert_thresholds['max_gap_days']:
alerts.append({
'type': 'TEMPORAL_GAP',
'severity': 'MEDIUM',
'message': f"Gap of {gap_days} days detected",
'gap_start': last_acquisition.isoformat(),
'gap_end': current_date.isoformat()
})
last_acquisition = current_date
# Check data quality
if recent_scenes:
avg_cloud_cover = sum(s['properties'].get('cloud_cover', 1.0) for s in recent_scenes) / len(recent_scenes)
if avg_cloud_cover > 0.3:
alerts.append({
'type': 'POOR_QUALITY',
'severity': 'MEDIUM',
'message': f"High average cloud cover: {avg_cloud_cover:.1%}"
})
# Print alerts
if alerts:
print(f"🚨 {len(alerts)} alerts detected:")
for alert in alerts:
print(f" [{alert['severity']}] {alert['type']}: {alert['message']}")
else:
print("✅ No alerts - system operating normally")
# Step 3: Immediate Download of Critical Scenes
if recent_scenes:
# Get best scenes from last 7 days
week_ago = end_date - timedelta(days=7)
critical_scenes = [
scene for scene in recent_scenes
if datetime.fromisoformat(scene['properties']['acquired'].replace('Z', '+00:00')) > week_ago
and scene['properties'].get('cloud_cover', 1.0) < 0.1
]
if critical_scenes:
print(f"💾 Downloading {len(critical_scenes)} critical scenes")
asset_manager = AssetManager(query.auth)
# Fast download without confirmation for operational use
downloads = await asset_manager.activate_and_download_assets(
scenes=critical_scenes,
asset_types=["ortho_analytic_4b"],
confirm_download=False, # Operational mode - no confirmation
max_concurrent=10 # Faster download
)
print(f" Downloaded: {len([d for d in downloads if d.status.value == 'completed'])} assets")
# Step 4: Operational Report Generation
operational_status = {
'timestamp': datetime.now().isoformat(),
'monitoring_period_days': 30,
'operational_status': 'NORMAL' if not alerts else 'ALERT',
'alerts': alerts,
'metrics': {
'total_recent_scenes': len(recent_scenes),
'scenes_per_week': scenes_per_week,
'avg_cloud_cover': avg_cloud_cover if recent_scenes else None,
'last_acquisition': recent_scenes[-1]['properties']['acquired'] if recent_scenes else None
},
'recommendations': generate_operational_recommendations(alerts, recent_scenes)
}
# Save operational report
with open('operational_status.json', 'w') as f:
json.dump(operational_status, f, indent=2, default=str)
print(f"📊 Operational status: {operational_status['operational_status']}")
return operational_status
def generate_operational_recommendations(alerts, recent_scenes):
"""Generate operational recommendations based on alerts."""
recommendations = []
for alert in alerts:
if alert['type'] == 'LOW_FREQUENCY':
recommendations.append("Consider expanding search criteria or increasing monitoring frequency")
elif alert['type'] == 'TEMPORAL_GAP':
recommendations.append("Review recent weather conditions and satellite tasking priorities")
elif alert['type'] == 'POOR_QUALITY':
recommendations.append("Wait for better weather conditions or consider alternative data sources")
if not alerts and recent_scenes:
recommendations.append("System operating normally - maintain current monitoring parameters")
return recommendations

Research Publication Workflow

async def research_publication_workflow(roi, study_period, research_questions):
"""
Comprehensive workflow for research publications with full documentation.
Includes reproducibility metadata, quality assessments, and publication-ready outputs.
"""
print("📚 Research Publication Workflow")
print("=" * 35)
# Step 1: Comprehensive Scene Discovery
query = PlanetScopeQuery()
# Multiple search strategies for completeness
search_strategies = [
{'cloud_cover_max': 0.1, 'label': 'high_quality'},
{'cloud_cover_max': 0.3, 'label': 'standard_quality'},
{'cloud_cover_max': 0.6, 'label': 'all_available'}
]
all_scenes = {}
for strategy in search_strategies:
results = query.search_scenes(
geometry=roi,
start_date=study_period[0],
end_date=study_period[1],
cloud_cover_max=strategy['cloud_cover_max'],
item_types=["PSScene"]
)
all_scenes[strategy['label']] = results['features']
print(f"📡 {strategy['label']}: {len(results['features'])} scenes")
# Use highest quality dataset
primary_scenes = all_scenes['high_quality']
if len(primary_scenes) < 50: # If insufficient high-quality scenes
primary_scenes = all_scenes['standard_quality']
print("⚠️ Using standard quality scenes due to insufficient high-quality data")
# Step 2: Comprehensive Spatial Analysis
print("\n📊 Comprehensive Spatial Analysis")
# Test multiple methods for comparison
spatial_engine = SpatialDensityEngine()
# High-resolution analysis
high_res_result = spatial_engine.calculate_density(
scene_footprints=primary_scenes,
roi_geometry=roi,
config=DensityConfig(resolution=30.0) # High resolution
)
# Export for publication
high_res_result.export_geotiff("publication_density_30m.tif")
# Multiple resolution analysis for scale sensitivity
resolutions = [100, 500, 1000, 2000]
scale_analysis = {}
for resolution in resolutions:
result = spatial_engine.calculate_density(
scene_footprints=primary_scenes,
roi_geometry=roi,
config=DensityConfig(resolution=float(resolution))
)
scale_analysis[resolution] = {
'method': result.method_used.value,
'stats': result.stats,
'processing_time': result.processing_time_seconds
}
print(f" Scale analysis: {len(resolutions)} resolutions tested")
# Step 3: Advanced Temporal Analysis
print("\n⏱️ Advanced Temporal Analysis")
# Multiple temporal resolutions
temporal_results = {}
temporal_resolutions = [TemporalResolution.WEEKLY, TemporalResolution.MONTHLY]
for temp_res in temporal_resolutions:
config = TemporalConfig(
temporal_resolution=temp_res,
spatial_resolution=500.0,
enable_gap_analysis=True
)
analyzer = TemporalAnalyzer(config)
datacube = analyzer.create_spatiotemporal_datacube(primary_scenes, roi)
patterns = analyzer.analyze_acquisition_patterns(datacube)
gaps = analyzer.detect_temporal_gaps(datacube)
temporal_results[temp_res.value] = {
'patterns': patterns,
'gaps': len(gaps),
'data_availability': patterns['frequency_stats']['data_availability_percentage']
}
print(f" Temporal analysis: {len(temporal_resolutions)} resolutions")
# Step 4: Statistical Analysis
print("\n📈 Statistical Analysis")
# Quality statistics
quality_stats = analyze_scene_quality_statistics(primary_scenes)
# Temporal coverage statistics
temporal_stats = analyze_temporal_coverage_statistics(primary_scenes, study_period)
# Spatial coverage statistics
spatial_stats = analyze_spatial_coverage_statistics(primary_scenes, roi)
print(f" Quality assessment: {quality_stats['excellent_scenes']} excellent scenes")
print(f" Temporal coverage: {temporal_stats['coverage_percentage']:.1%}")
print(f" Spatial coverage: {spatial_stats['roi_coverage_percentage']:.1%}")
# Step 5: Data Download for Analysis
print("\n💾 Research Data Download")
# Strategic scene selection for research
research_scenes = select_research_scenes(
primary_scenes,
research_questions,
max_scenes=100 # Limit for research budget
)
asset_manager = AssetManager(query.auth)
# Download with multiple asset types for research
downloads = await asset_manager.activate_and_download_assets(
scenes=research_scenes,
asset_types=["ortho_analytic_4b", "ortho_analytic_sr", "ortho_udm2"],
clip_to_roi=roi
)
successful_downloads = [d for d in downloads if d.status.value == "completed"]
print(f" Downloaded: {len(successful_downloads)} research assets")
# Step 6: Publication-Ready Export
print("\n📦 Publication Data Export")
# Create comprehensive research GeoPackage
research_config = GeoPackageConfig(
include_imagery=True,
attribute_schema="comprehensive",
clip_to_roi=True
)
geopackage_manager = GeoPackageManager(config=research_config)
# Add research metadata
for scene in primary_scenes:
props = scene['properties']
props.update({
'research_quality_class': quality_stats['scene_classifications'].get(
props['id'], 'unclassified'
),
'temporal_completeness': temporal_stats['scene_temporal_scores'].get(
props['id'], 0.0
),
'spatial_relevance': spatial_stats['scene_spatial_scores'].get(
props['id'], 0.0
),
'research_priority': calculate_research_priority(scene, research_questions)
})
# Create research GeoPackage
layer_info = geopackage_manager.create_scene_geopackage(
scenes=primary_scenes,
output_path="research_publication_data.gpkg",
roi=roi,
downloaded_files=[d.downloaded_file_path for d in successful_downloads]
)
# Step 7: Reproducibility Documentation
print("\n📋 Reproducibility Documentation")
reproducibility_metadata = {
'study_metadata': {
'title': 'PlanetScope Analysis Study',
'study_period': study_period,
'roi_wkt': roi.wkt,
'research_questions': research_questions,
'methodology': 'PlanetScope-py automated workflow'
},
'data_sources': {
'primary_dataset': f"{len(primary_scenes)} PlanetScope scenes",
'quality_criteria': {
'cloud_cover_max': 0.1 if primary_scenes == all_scenes['high_quality'] else 0.3,
'sun_elevation_min': 30,
'acquisition_period': study_period
},
'alternative_datasets': {
label: len(scenes) for label, scenes in all_scenes.items()
}
},
'analysis_methods': {
'spatial_analysis': {
'primary_resolution': 30.0,
'method_used': high_res_result.method_used.value,
'scale_sensitivity': scale_analysis
},
'temporal_analysis': {
'resolutions_tested': [tr.value for tr in temporal_resolutions],
'results': temporal_results
}
},
'software_environment': {
'library': f"planetscope-py v{planetscope_py.__version__}",
'python_version': platform.python_version(),
'execution_platform': platform.system(),
'processing_date': datetime.now().isoformat()
},
'quality_assessment': quality_stats,
'statistical_summary': {
'temporal_coverage': temporal_stats,
'spatial_coverage': spatial_stats
},
'data_availability': {
'total_scenes_analyzed': len(primary_scenes),
'downloaded_for_research': len(successful_downloads),
'data_volume_gb': sum(d.file_size_mb for d in successful_downloads) / 1024
}
}
# Save reproducibility metadata
with open('research_reproducibility_metadata.json', 'w') as f:
json.dump(reproducibility_metadata, f, indent=2, default=str)
# Create publication summary
publication_summary = generate_publication_summary(
reproducibility_metadata, research_questions
)
with open('publication_summary.md', 'w') as f:
f.write(publication_summary)
print(f"✅ Research workflow complete!")
print(f" Primary dataset: {len(primary_scenes)} scenes")
print(f" Downloaded assets: {len(successful_downloads)}")
print(f" Documentation: research_reproducibility_metadata.json")
print(f" Summary: publication_summary.md")
return reproducibility_metadata
def select_research_scenes(scenes, research_questions, max_scenes=100):
"""Select optimal scenes for research based on research questions."""
# Score scenes based on research relevance
scored_scenes = []
for scene in scenes:
score = 0
props = scene['properties']
# Quality score
cloud_cover = props.get('cloud_cover', 1.0)
sun_elevation = props.get('sun_elevation', 0)
score += (1 - cloud_cover) * 50 # Cloud cover contribution
score += min(sun_elevation / 50, 1) * 30 # Sun elevation contribution
# Temporal distribution score
# Add logic based on research questions
if 'seasonal' in ' '.join(research_questions).lower():
# Prefer even seasonal distribution
month = datetime.fromisoformat(props['acquired'].replace('Z', '+00:00')).month
if month in [3, 6, 9, 12]: # Seasonal transition months
score += 20
if 'change detection' in ' '.join(research_questions).lower():
# Prefer high-quality scenes with good temporal spacing
if cloud_cover < 0.05 and sun_elevation > 45:
score += 25
scored_scenes.append((scene, score))
# Sort by score and take top scenes
scored_scenes.sort(key=lambda x: x[1], reverse=True)
return [scene for scene, score in scored_scenes[:max_scenes]]
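`calculate_research_priority()` is referenced when tagging scenes but never shown. A sketch that reuses the quality-scoring idea from `select_research_scenes()` (the 0-100 scale is an assumption):

```python
def calculate_research_priority(scene, research_questions):
    """Score a single scene 0-100 from cloud cover and sun elevation."""
    props = scene['properties']
    score = (1 - props.get('cloud_cover', 1.0)) * 60          # clearer is better
    score += min(props.get('sun_elevation', 0) / 50, 1) * 40  # higher sun is better
    # research_questions could reweight the score, as in select_research_scenes()
    return round(score, 1)
```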
def analyze_scene_quality_statistics(scenes):
"""Analyze quality statistics for scenes."""
quality_classes = {'excellent': 0, 'good': 0, 'fair': 0, 'poor': 0}
scene_classifications = {}
for scene in scenes:
props = scene['properties']
cloud_cover = props.get('cloud_cover', 1.0)
sun_elevation = props.get('sun_elevation', 0)
if cloud_cover < 0.05 and sun_elevation > 50:
quality_class = 'excellent'
elif cloud_cover < 0.15 and sun_elevation > 35:
quality_class = 'good'
elif cloud_cover < 0.3:
quality_class = 'fair'
else:
quality_class = 'poor'
quality_classes[quality_class] += 1
scene_classifications[props['id']] = quality_class
return {
'quality_distribution': quality_classes,
'scene_classifications': scene_classifications,
'excellent_scenes': quality_classes['excellent'],
'total_scenes': len(scenes)
}
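`analyze_temporal_coverage_statistics()` and `analyze_spatial_coverage_statistics()` are also used in the publication workflow but not defined. Minimal sketches, assuming per-scene scores keyed by the scene `id` property as in `analyze_scene_quality_statistics()`:

```python
from datetime import datetime
from shapely.geometry import shape
from shapely.ops import unary_union

def analyze_temporal_coverage_statistics(scenes, study_period):
    """Fraction of ISO weeks in the study period with at least one acquisition."""
    start = datetime.fromisoformat(study_period[0])
    end = datetime.fromisoformat(study_period[1])
    total_weeks = max((end - start).days // 7, 1)
    weeks_with_data = {
        datetime.fromisoformat(
            s['properties']['acquired'].replace('Z', '+00:00')
        ).isocalendar()[:2]
        for s in scenes
    }
    return {
        'coverage_percentage': min(len(weeks_with_data) / total_weeks, 1.0),
        # Placeholder per-scene score; refine with gap-distance logic as needed
        'scene_temporal_scores': {s['properties']['id']: 1.0 for s in scenes},
    }

def analyze_spatial_coverage_statistics(scenes, roi):
    """Fraction of the ROI covered by the union of scene footprints."""
    footprints = [shape(s['geometry']) for s in scenes]
    union = unary_union(footprints) if footprints else None
    return {
        'roi_coverage_percentage': (
            union.intersection(roi).area / roi.area if union else 0.0
        ),
        'scene_spatial_scores': {
            s['properties']['id']: shape(s['geometry']).intersection(roi).area / roi.area
            for s in scenes
        },
    }
```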
def generate_publication_summary(metadata, research_questions):
"""Generate publication-ready summary."""
summary = f"""# PlanetScope Analysis Study - Publication Summary
## Study Overview
**Study Period**: {metadata['study_metadata']['study_period'][0]} to {metadata['study_metadata']['study_period'][1]}
**Geographic Area**: {metadata['data_sources']['primary_dataset']}
**Processing Date**: {metadata['software_environment']['processing_date'][:10]}
## Research Questions
{chr(10).join(f"- {q}" for q in research_questions)}
## Methodology
### Data Sources
- **Primary Dataset**: {metadata['data_sources']['primary_dataset']}
- **Quality Criteria**:
- Maximum cloud cover: {metadata['data_sources']['quality_criteria']['cloud_cover_max']:.0%}
- Minimum sun elevation: {metadata['data_sources']['quality_criteria']['sun_elevation_min']}°
- **Processing Software**: {metadata['software_environment']['library']}
### Spatial Analysis
- **Primary Resolution**: {metadata['analysis_methods']['spatial_analysis']['primary_resolution']}m
- **Method Used**: {metadata['analysis_methods']['spatial_analysis']['method_used']}
- **Scale Analysis**: {len(metadata['analysis_methods']['spatial_analysis']['scale_sensitivity'])} resolutions tested
### Temporal Analysis
- **Temporal Resolutions**: {', '.join(metadata['analysis_methods']['temporal_analysis']['resolutions_tested'])}
- **Data Availability**: {metadata['analysis_methods']['temporal_analysis']['results']['weekly']['data_availability']:.1%}
## Results Summary
### Data Quality Assessment
- **Excellent Quality Scenes**: {metadata['quality_assessment']['excellent_scenes']} ({metadata['quality_assessment']['excellent_scenes']/metadata['quality_assessment']['total_scenes']:.1%})
- **Total Scenes Analyzed**: {metadata['quality_assessment']['total_scenes']}
### Coverage Analysis
- **Temporal Coverage**: {metadata['statistical_summary']['temporal_coverage']['coverage_percentage']:.1%}
- **Spatial Coverage**: {metadata['statistical_summary']['spatial_coverage']['roi_coverage_percentage']:.1%}
### Downloaded Research Data
- **Research Assets**: {metadata['data_availability']['downloaded_for_research']} files
- **Data Volume**: {metadata['data_availability']['data_volume_gb']:.1f} GB
## Reproducibility Information
All analysis code, parameters, and intermediate results are documented in the accompanying metadata file (`research_reproducibility_metadata.json`). The complete dataset is available in GeoPackage format (`research_publication_data.gpkg`).
**Software Environment**:
- Python {metadata['software_environment']['python_version']}
- {metadata['software_environment']['library']}
- Platform: {metadata['software_environment']['execution_platform']}
## Data Availability Statement
The processed PlanetScope data used in this study is available in standardized GeoPackage format. Raw PlanetScope imagery is available through Planet Labs PBC subject to licensing agreements.
---
*Generated automatically by planetscope-py workflow system*
"""
return summary

Multi-Site Analysis Workflow

async def multi_site_analysis_workflow(sites, global_date_range, output_base_dir="multi_site_analysis"):
"""
Process multiple study sites with standardized methodology.
Args:
sites: Dictionary of {site_name: roi_geometry}
global_date_range: Tuple of (start_date, end_date)
output_base_dir: Base directory for outputs
"""
print("🌐 Multi-Site Analysis Workflow")
print("=" * 35)
# Initialize components
query = PlanetScopeQuery()
spatial_engine = SpatialDensityEngine()
temporal_analyzer = TemporalAnalyzer()
asset_manager = AssetManager(query.auth)
geopackage_manager = GeoPackageManager()
# Global results storage
global_results = {
'sites': {},
'comparative_analysis': {},
'processing_metadata': {
'start_time': datetime.now().isoformat(),
'total_sites': len(sites),
'date_range': global_date_range
}
}
# Create output directory structure
output_path = Path(output_base_dir)
output_path.mkdir(exist_ok=True)
for site_name in sites.keys():
(output_path / site_name).mkdir(exist_ok=True)
# Process each site
for site_name, roi in sites.items():
print(f"\n📍 Processing site: {site_name}")
print("-" * 30)
site_start_time = datetime.now()
try:
# Site-specific scene discovery
results = query.search_scenes(
geometry=roi,
start_date=global_date_range[0],
end_date=global_date_range[1],
cloud_cover_max=0.3,
item_types=["PSScene"]
)
scenes = results['features']
print(f" Scenes found: {len(scenes)}")
if len(scenes) == 0:
print(f" ⚠️ No scenes found for {site_name}")
global_results['sites'][site_name] = {'status': 'no_data'}
continue
# Spatial analysis
spatial_result = spatial_engine.calculate_density(scenes, roi)
# Export spatial analysis
site_output_dir = output_path / site_name
spatial_output = site_output_dir / f"{site_name}_density.tif"
spatial_result.export_geotiff(str(spatial_output))
# Temporal analysis
datacube = temporal_analyzer.create_spatiotemporal_datacube(scenes, roi)
patterns = temporal_analyzer.analyze_acquisition_patterns(datacube)
gaps = temporal_analyzer.detect_temporal_gaps(datacube)
# Download best scenes (limited per site)
best_scenes = [
scene for scene in scenes
if scene['properties'].get('cloud_cover', 1.0) < 0.15
][:10] # Limit to 10 per site
downloads = []
if best_scenes:
site_downloads = await asset_manager.activate_and_download_assets(
scenes=best_scenes,
asset_types=["ortho_analytic_4b"],
output_dir=str(site_output_dir / "imagery"),
clip_to_roi=roi,
confirm_download=False
)
downloads = [d for d in site_downloads if d.status.value == "completed"]
# Create site GeoPackage
site_geopackage = site_output_dir / f"{site_name}_analysis.gpkg"
layer_info = geopackage_manager.create_scene_geopackage(
scenes=scenes,
output_path=str(site_geopackage),
roi=roi,
downloaded_files=[d.downloaded_file_path for d in downloads] if downloads else None
)
# Store site results
processing_time = (datetime.now() - site_start_time).total_seconds()
global_results['sites'][site_name] = {
'status': 'completed',
'roi_area_km2': calculate_area_km2(roi),
'total_scenes': len(scenes),
'downloaded_assets': len(downloads),
'processing_time_seconds': processing_time,
'spatial_analysis': {
'method': spatial_result.method_used.value,
'density_stats': spatial_result.stats
},
'temporal_analysis': {
'seasonal_patterns': len(patterns['seasonal_patterns']),
'temporal_gaps': len(gaps),
'data_availability': patterns['frequency_stats']['data_availability_percentage']
},
'outputs': {
'geopackage': str(site_geopackage),
'density_map': str(spatial_output),
'imagery_count': len(downloads)
}
}
print(f" ✅ Completed in {processing_time:.1f}s")
except Exception as e:
print(f" ❌ Error processing {site_name}: {e}")
global_results['sites'][site_name] = {
'status': 'failed',
'error': str(e)
}
# Comparative analysis across sites
print(f"\n📊 Comparative Analysis")
successful_sites = {
name: data for name, data in global_results['sites'].items()
if data.get('status') == 'completed'
}
if len(successful_sites) > 1:
comparative_analysis = perform_comparative_analysis(successful_sites)
global_results['comparative_analysis'] = comparative_analysis
# Create comparative visualization
create_comparative_plots(successful_sites, output_path / "comparative_analysis")
print(f" Compared {len(successful_sites)} sites")
print(f" Scene count range: {comparative_analysis['scene_count_range']}")
print(f" Data availability range: {comparative_analysis['data_availability_range']}")
# Global summary
global_results['processing_metadata']['end_time'] = datetime.now().isoformat()
global_results['processing_metadata']['total_processing_time'] = sum(
site_data.get('processing_time_seconds', 0)
for site_data in global_results['sites'].values()
if isinstance(site_data.get('processing_time_seconds'), (int, float))
)
# Save global results
global_report_path = output_path / "multi_site_analysis_report.json"
with open(global_report_path, 'w') as f:
json.dump(global_results, f, indent=2, default=str)
print(f"\n✅ Multi-site analysis complete!")
print(f" Successful sites: {len(successful_sites)}/{len(sites)}")
print(f" Total processing time: {global_results['processing_metadata']['total_processing_time']:.1f}s")
print(f" Global report: {global_report_path}")
return global_results
def perform_comparative_analysis(sites_data):
"""Perform comparative analysis across multiple sites."""
scene_counts = [data['total_scenes'] for data in sites_data.values()]
data_availability = [
data['temporal_analysis']['data_availability']
for data in sites_data.values()
]
roi_areas = [data['roi_area_km2'] for data in sites_data.values()]
return {
'scene_count_range': [min(scene_counts), max(scene_counts)],
'scene_count_mean': sum(scene_counts) / len(scene_counts),
'data_availability_range': [min(data_availability), max(data_availability)],
'data_availability_mean': sum(data_availability) / len(data_availability),
'roi_area_range': [min(roi_areas), max(roi_areas)],
'total_area_analyzed': sum(roi_areas),
'site_rankings': {
'by_scene_count': sorted(
sites_data.items(),
key=lambda x: x[1]['total_scenes'],
reverse=True
)[:5],
'by_data_availability': sorted(
sites_data.items(),
key=lambda x: x[1]['temporal_analysis']['data_availability'],
reverse=True
)[:5]
}
}
def create_comparative_plots(sites_data, output_dir):
"""Create comparative visualization plots."""
try:
import matplotlib.pyplot as plt
import numpy as np
output_dir.mkdir(exist_ok=True)
# Scene count comparison
site_names = list(sites_data.keys())
scene_counts = [data['total_scenes'] for data in sites_data.values()]
plt.figure(figsize=(12, 6))
plt.bar(site_names, scene_counts)
plt.title('Scene Count by Site')
plt.ylabel('Number of Scenes')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig(output_dir / 'scene_count_comparison.png', dpi=300)
plt.close()
# Data availability comparison
data_availability = [
data['temporal_analysis']['data_availability']
for data in sites_data.values()
]
plt.figure(figsize=(12, 6))
plt.bar(site_names, data_availability)
plt.title('Data Availability by Site')
plt.ylabel('Data Availability (%)')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig(output_dir / 'data_availability_comparison.png', dpi=300)
plt.close()
print(f" Comparative plots saved to {output_dir}")
except ImportError:
print(" Matplotlib not available - skipping plot generation")async def automated_monitoring_workflow(config_file="monitoring_config.json"):
"""
Fully automated monitoring workflow that can be scheduled.
Designed for operational deployment with minimal human intervention.
"""
print("🤖 Automated Monitoring Workflow")
print("=" * 32)
# Load configuration
try:
with open(config_file, 'r') as f:
config = json.load(f)
except FileNotFoundError:
# Create default configuration
config = create_default_monitoring_config()
with open(config_file, 'w') as f:
json.dump(config, f, indent=2)
print(f" Created default configuration: {config_file}")
# Initialize systems
query = PlanetScopeQuery()
asset_manager = AssetManager(query.auth)
# Create timestamped output directory
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = Path(config['output_base_dir']) / f"monitoring_{timestamp}"
output_dir.mkdir(parents=True, exist_ok=True)
monitoring_results = {
'execution_timestamp': timestamp,
'config': config,
'sites': {},
'global_status': 'UNKNOWN',
'alerts': [],
'recommendations': []
}
# Process each monitored site
for site_config in config['monitored_sites']:
site_name = site_config['name']
roi = shape(site_config['roi_geojson'])
print(f"\n🔍 Monitoring: {site_name}")
try:
# Recent scene discovery
end_date = datetime.now()
start_date = end_date - timedelta(days=config['monitoring_period_days'])
results = query.search_scenes(
geometry=roi,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
cloud_cover_max=config['quality_thresholds']['cloud_cover_max'],
item_types=["PSScene"]
)
recent_scenes = results['features']
# Automated analysis
site_analysis = perform_automated_site_analysis(
recent_scenes, roi, site_config, config
)
# Check for alerts
site_alerts = check_site_alerts(
site_analysis, site_config, config['alert_thresholds']
)
# Automated downloads if needed
downloads = []
if site_alerts and config['auto_download']['enabled']:
critical_scenes = [
scene for scene in recent_scenes
if scene['properties'].get('cloud_cover', 1.0) < config['auto_download']['quality_threshold']
][:config['auto_download']['max_scenes_per_site']]
if critical_scenes:
downloads = await asset_manager.activate_and_download_assets(
scenes=critical_scenes,
asset_types=config['auto_download']['asset_types'],
output_dir=str(output_dir / site_name / "imagery"),
confirm_download=False # Automated mode
)
monitoring_results['sites'][site_name] = {
'status': 'completed',
'analysis': site_analysis,
'alerts': site_alerts,
'downloads': len([d for d in downloads if d.status.value == "completed"])
}
monitoring_results['alerts'].extend(site_alerts)
print(f" Status: {'⚠️ ALERTS' if site_alerts else '✅ NORMAL'}")
except Exception as e:
print(f" ❌ Error: {e}")
monitoring_results['sites'][site_name] = {
'status': 'failed',
'error': str(e)
}
monitoring_results['alerts'].append({
'site': site_name,
'type': 'SYSTEM_ERROR',
'severity': 'HIGH',
'message': f"Monitoring failed: {str(e)}"
})
# Global status assessment
total_alerts = len(monitoring_results['alerts'])
high_severity_alerts = len([
alert for alert in monitoring_results['alerts']
if alert.get('severity') == 'HIGH'
])
if high_severity_alerts > 0:
monitoring_results['global_status'] = 'CRITICAL'
elif total_alerts > 0:
monitoring_results['global_status'] = 'WARNING'
else:
monitoring_results['global_status'] = 'NORMAL'
# Generate recommendations
monitoring_results['recommendations'] = generate_automated_recommendations(
monitoring_results
)
# Save monitoring report
report_path = output_dir / "automated_monitoring_report.json"
with open(report_path, 'w') as f:
json.dump(monitoring_results, f, indent=2, default=str)
# Send alerts if configured
if config.get('notifications', {}).get('enabled', False):
await send_monitoring_alerts(monitoring_results, config['notifications'])
print(f"\n🎯 Monitoring Complete")
print(f" Global Status: {monitoring_results['global_status']}")
print(f" Total Alerts: {total_alerts}")
print(f" Report: {report_path}")
return monitoring_results
def create_default_monitoring_config():
"""Create default monitoring configuration."""
return {
"output_base_dir": "automated_monitoring",
"monitoring_period_days": 14,
"monitored_sites": [
{
"name": "example_site",
"roi_geojson": {
"type": "Polygon",
"coordinates": [[[9.0, 45.4], [9.2, 45.4], [9.2, 45.6], [9.0, 45.6], [9.0, 45.4]]]
},
"expected_frequency_days": 7,
"minimum_quality_threshold": 0.8
}
],
"quality_thresholds": {
"cloud_cover_max": 0.3,
"sun_elevation_min": 30
},
"alert_thresholds": {
"max_gap_days": 14,
"min_scenes_per_week": 2,
"quality_degradation_threshold": 0.2
},
"auto_download": {
"enabled": True,
"quality_threshold": 0.1,
"max_scenes_per_site": 5,
"asset_types": ["ortho_analytic_4b"]
},
"notifications": {
"enabled": False,
"email": {
"recipients": ["monitoring@example.com"],
"smtp_server": "smtp.example.com"
}
}
}
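The automated workflow above relies on `perform_automated_site_analysis()`, `check_site_alerts()`, and `generate_automated_recommendations()`, none of which are defined in this guide. Minimal sketches consistent with the configuration keys shown in `create_default_monitoring_config()`:

```python
def perform_automated_site_analysis(scenes, roi, site_config, config):
    """Summarize recent acquisitions for one monitored site."""
    clouds = [s['properties'].get('cloud_cover', 1.0) for s in scenes]
    weeks = max(config['monitoring_period_days'] / 7, 1)
    return {
        'scene_count': len(scenes),
        'scenes_per_week': len(scenes) / weeks,
        'avg_cloud_cover': sum(clouds) / len(clouds) if clouds else None,
        'last_acquisition': max((s['properties']['acquired'] for s in scenes), default=None),
    }

def check_site_alerts(site_analysis, site_config, alert_thresholds):
    """Compare a site summary against the configured alert thresholds."""
    alerts = []
    if site_analysis['scenes_per_week'] < alert_thresholds['min_scenes_per_week']:
        alerts.append({
            'site': site_config['name'],
            'type': 'LOW_FREQUENCY',
            'severity': 'HIGH',
            'message': f"{site_analysis['scenes_per_week']:.1f} scenes/week below threshold",
        })
    avg_cc = site_analysis['avg_cloud_cover']
    if avg_cc is not None and avg_cc > 0.3:  # illustrative quality threshold
        alerts.append({
            'site': site_config['name'],
            'type': 'POOR_QUALITY',
            'severity': 'MEDIUM',
            'message': f"High average cloud cover: {avg_cc:.1%}",
        })
    return alerts

def generate_automated_recommendations(monitoring_results):
    """Map each distinct alert type to one recommendation."""
    hints = {
        'LOW_FREQUENCY': "Relax search criteria or review tasking frequency",
        'POOR_QUALITY': "Expect reduced usability until weather improves",
        'SYSTEM_ERROR': "Check API credentials and site configuration",
    }
    types = {a.get('type') for a in monitoring_results['alerts']}
    return [hints[t] for t in types if t in hints] or ["All sites nominal - no action required"]
```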
async def send_monitoring_alerts(monitoring_results, notification_config):
"""Send monitoring alerts via configured channels."""
if notification_config.get('email'):
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Create email content
subject = f"PlanetScope Monitoring Alert - {monitoring_results['global_status']}"
body = f"""
Automated PlanetScope Monitoring Report
=====================================
Global Status: {monitoring_results['global_status']}
Execution Time: {monitoring_results['execution_timestamp']}
Total Alerts: {len(monitoring_results['alerts'])}
Site Status:
{format_site_status_for_email(monitoring_results['sites'])}
Alerts:
{format_alerts_for_email(monitoring_results['alerts'])}
Recommendations:
{chr(10).join(f"- {rec}" for rec in monitoring_results['recommendations'])}
This is an automated message from the PlanetScope monitoring system.
"""
# Send email (implementation would depend on specific email service)
print(" 📧 Email notification sent")
except Exception as e:
print(f" ❌ Email notification failed: {e}")
def format_site_status_for_email(sites):
"""Format site status for email notification."""
lines = []
for site_name, site_data in sites.items():
status_emoji = "✅" if site_data['status'] == 'completed' and not site_data.get('alerts') else "⚠️"
lines.append(f"{status_emoji} {site_name}: {site_data['status']}")
return '\n'.join(lines)
def format_alerts_for_email(alerts):
"""Format alerts for email notification."""
if not alerts:
return "No alerts"
lines = []
for alert in alerts:
severity_emoji = "🚨" if alert.get('severity') == 'HIGH' else "⚠️"
lines.append(f"{severity_emoji} {alert.get('type', 'UNKNOWN')}: {alert.get('message', 'No message')}")
return '\n'.join(lines)

Performance Monitoring

import time
import psutil
from contextlib import contextmanager
@contextmanager
def performance_monitor(workflow_name):
"""Context manager for monitoring workflow performance."""
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
print(f"🚀 Starting {workflow_name}")
print(f" Initial memory: {start_memory:.1f} MB")
try:
yield
finally:
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
duration = end_time - start_time
memory_delta = end_memory - start_memory
print(f"✅ {workflow_name} completed")
print(f" Duration: {duration:.1f} seconds")
print(f" Memory usage: {end_memory:.1f} MB ({memory_delta:+.1f} MB)")
# Usage in workflows
async def optimized_workflow_example():
"""Example of performance-optimized workflow."""
with performance_monitor("Optimized Research Workflow"):
# Use performance monitoring throughout workflow
with performance_monitor("Scene Discovery"):
# Scene discovery code
pass
with performance_monitor("Spatial Analysis"):
# Spatial analysis code
pass
with performance_monitor("Data Export"):
# Export code
pass

Error Handling and Recovery

async def robust_workflow_wrapper(workflow_func, *args, **kwargs):
"""Robust wrapper for any workflow with error handling and recovery."""
max_retries = 3
retry_delay = 60 # seconds
for attempt in range(max_retries):
try:
return await workflow_func(*args, **kwargs)
except Exception as e:
print(f"❌ Workflow failed (attempt {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
print(f"⏳ Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
print("❌ Workflow failed permanently after all retries")
# Save partial results if available
save_partial_results(e, *args, **kwargs)
raise e
def save_partial_results(error, *args, **kwargs):
"""Save partial results when workflow fails."""
error_report = {
'error_timestamp': datetime.now().isoformat(),
'error_message': str(error),
'workflow_args': str(args),
'workflow_kwargs': str(kwargs)
}
with open('workflow_error_report.json', 'w') as f:
json.dump(error_report, f, indent=2, default=str)

Workflow Configuration

def load_workflow_config(config_path="workflow_config.yaml"):
"""Load workflow configuration from file."""
try:
import yaml
with open(config_path, 'r') as f:
return yaml.safe_load(f)
except ImportError:
# Fallback to JSON
with open(config_path.replace('.yaml', '.json'), 'r') as f:
return json.load(f)
def validate_workflow_config(config):
"""Validate workflow configuration."""
required_fields = ['roi', 'date_range', 'output_dir']
for field in required_fields:
if field not in config:
raise ValueError(f"Missing required configuration field: {field}")
# Additional validation logic
return True

Resource Management

class WorkflowResourceManager:
"""Manage computational resources during workflows."""
def __init__(self, max_memory_gb=8, max_concurrent_downloads=5):
self.max_memory_gb = max_memory_gb
self.max_concurrent_downloads = max_concurrent_downloads
self.active_downloads = 0
def check_memory_usage(self):
"""Check current memory usage."""
memory_gb = psutil.Process().memory_info().rss / (1024**3)
if memory_gb > self.max_memory_gb:
raise MemoryError(f"Memory usage ({memory_gb:.1f} GB) exceeds limit ({self.max_memory_gb} GB)")
async def managed_download(self, download_func, *args, **kwargs):
"""Manage download with resource limits."""
while self.active_downloads >= self.max_concurrent_downloads:
await asyncio.sleep(1)
self.active_downloads += 1
try:
result = await download_func(*args, **kwargs)
return result
finally:
self.active_downloads -= 1
# Usage (assumes `asset_manager` and `scenes` from an earlier workflow step)
resource_manager = WorkflowResourceManager(max_memory_gb=16)
async def resource_managed_workflow(asset_manager, scenes):
"""Workflow with resource management."""
resource_manager.check_memory_usage()
# Use managed downloads
downloads = await resource_manager.managed_download(
asset_manager.activate_and_download_assets,
scenes=scenes
)
return downloads

Running the Workflows

import asyncio
from shapely.geometry import box
async def main():
"""Example of running complete workflows."""
# Define study area (Milan, Italy)
roi = box(9.04, 45.40, 9.28, 45.52)
date_range = ("2024-01-01", "2024-06-30")
# Run standard research workflow
research_result = await standard_research_workflow(roi, date_range)
# Run environmental monitoring workflow
monitoring_result = await environmental_monitoring_workflow(roi, monitoring_period_months=12)
# Run operational monitoring
operational_result = await operational_monitoring_workflow(roi)
# Multi-site analysis
sites = {
'milan': box(9.04, 45.40, 9.28, 45.52),
'rome': box(12.35, 41.70, 12.65, 42.00),
'naples': box(14.15, 40.75, 14.35, 40.95)
}
multi_site_result = await multi_site_analysis_workflow(sites, date_range)
print("\n🎉 All workflows completed successfully!")
if __name__ == "__main__":
asyncio.run(main())

Scheduled Execution

import asyncio
import schedule
import time
from datetime import datetime
# Assumes a module-level `roi` (e.g., roi = box(9.04, 45.40, 9.28, 45.52))
def setup_scheduled_monitoring():
"""Setup scheduled monitoring workflows."""
# Daily operational monitoring
schedule.every().day.at("08:00").do(
lambda: asyncio.run(operational_monitoring_workflow(roi))
)
# Weekly environmental monitoring
schedule.every().monday.at("06:00").do(
lambda: asyncio.run(environmental_monitoring_workflow(roi))
)
# Monthly comprehensive analysis (the schedule library has no monthly
# interval, so run a daily check that fires on the 1st)
schedule.every().day.at("04:00").do(
lambda: datetime.now().day == 1 and asyncio.run(
standard_research_workflow(roi, get_last_month_range())
)
)
print("📅 Scheduled monitoring setup complete")
print(" Daily operational monitoring: 08:00")
print(" Weekly environmental monitoring: Monday 06:00")
print(" Monthly comprehensive analysis: 1st of month")
def run_scheduler():
"""Run the monitoring scheduler."""
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
def get_last_month_range():
"""Get date range for last month."""
now = datetime.now()
if now.month == 1:
last_month = 12
year = now.year - 1
else:
last_month = now.month - 1
year = now.year
start_date = f"{year}-{last_month:02d}-01"
# Calculate end date (last day of last month)
import calendar
last_day = calendar.monthrange(year, last_month)[1]
end_date = f"{year}-{last_month:02d}-{last_day}"
return (start_date, end_date)

Docker Deployment

# Dockerfile for automated workflows
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gdal-bin \
libgdal-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy workflow code
COPY workflows/ ./workflows/
COPY config/ ./config/
# Set environment variables
ENV PLANET_API_KEY=""
ENV PYTHONPATH=/app
# Default command
CMD ["python", "workflows/automated_monitoring.py"]# docker-compose.yml for workflow services
version: '3.8'
services:
planetscope-monitor:
build: .
environment:
- PLANET_API_KEY=${PLANET_API_KEY}
volumes:
- ./data:/app/data
- ./config:/app/config
restart: unless-stopped
planetscope-scheduler:
build: .
command: python workflows/scheduler.py
environment:
- PLANET_API_KEY=${PLANET_API_KEY}
volumes:
- ./data:/app/data
- ./config:/app/config
restart: unless-stopped

Adaptive Workflows

async def adaptive_workflow(roi, date_range, conditions=None):
"""
Adaptive workflow that changes behavior based on conditions.
"""
conditions = conditions or {}
print("🔄 Adaptive Workflow")
print("=" * 20)
# Initialize query
query = PlanetScopeQuery()
# Adaptive scene discovery based on data availability
initial_results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.2, # Start strict
item_types=["PSScene"]
)
scenes = initial_results['features']
# Adapt strategy based on initial results
if len(scenes) < 10:
print(" 📊 Low data availability - relaxing criteria")
# Relax cloud cover requirements
relaxed_results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.5, # More permissive
item_types=["PSScene"]
)
scenes = relaxed_results['features']
workflow_type = 'relaxed_quality'
elif len(scenes) > 1000:
print(" 📊 High data availability - applying strict filtering")
# Apply stricter quality filters
high_quality_scenes = [
scene for scene in scenes
if (scene['properties'].get('cloud_cover', 1.0) < 0.05 and
scene['properties'].get('sun_elevation', 0) > 45)
]
if len(high_quality_scenes) > 50:
scenes = high_quality_scenes
workflow_type = 'high_quality'
else:
workflow_type = 'standard'
else:
workflow_type = 'standard'
print(f" 🎯 Selected workflow type: {workflow_type}")
print(f" 📡 Final scene count: {len(scenes)}")
# Conditional analysis based on workflow type
if workflow_type == 'high_quality':
# High-resolution spatial analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(
scenes, roi, config=DensityConfig(resolution=30.0)
)
# Detailed temporal analysis
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.DAILY,
enable_gap_analysis=True
)
elif workflow_type == 'relaxed_quality':
# Faster, lower-resolution analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(
scenes, roi, config=DensityConfig(resolution=1000.0)
)
# Weekly temporal analysis
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.WEEKLY,
enable_gap_analysis=False
)
else: # standard
# Balanced analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(scenes, roi)
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.WEEKLY,
enable_gap_analysis=True
)
# Continue with conditional processing...
temporal_analyzer = TemporalAnalyzer(temporal_config)
datacube = temporal_analyzer.create_spatiotemporal_datacube(scenes, roi)
# Adaptive export based on scene count
if len(scenes) > 500:
# Large dataset - create compressed outputs
geopackage_config = GeoPackageConfig(
include_imagery=False, # Skip imagery for large datasets
attribute_schema="minimal"
)
else:
# Standard dataset - full export
geopackage_config = GeoPackageConfig(
include_imagery=True,
attribute_schema="comprehensive"
)
# Create adaptive report
adaptive_report = {
'workflow_type': workflow_type,
'adaptation_reason': get_adaptation_reason(workflow_type, len(scenes)),
'final_scene_count': len(scenes),
'processing_parameters': {
'spatial_resolution': spatial_result.resolution_meters,
'temporal_resolution': temporal_config.temporal_resolution.value,
'include_imagery': geopackage_config.include_imagery
}
}
return adaptive_report
def get_adaptation_reason(workflow_type, scene_count):
"""Get human-readable adaptation reason."""
if workflow_type == 'high_quality':
return f"High data availability ({scene_count} scenes) enabled high-quality analysis"
elif workflow_type == 'relaxed_quality':
return f"Low data availability ({scene_count} scenes) required relaxed quality criteria"
else:
return f"Standard analysis with {scene_count} scenes"import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
async def parallel_processing_workflow(sites, date_range, max_workers=4):
"""
Process multiple sites in parallel for faster execution.
"""
print("⚡ Parallel Processing Workflow")
print("=" * 30)
# Create semaphore to limit concurrent API calls
api_semaphore = asyncio.Semaphore(max_workers)
async def process_site_with_semaphore(site_name, roi):
"""Process single site with API rate limiting."""
async with api_semaphore:
return await process_single_site(site_name, roi, date_range)
# Process all sites concurrently
tasks = [
process_site_with_semaphore(site_name, roi)
for site_name, roi in sites.items()
]
print(f" 🚀 Starting {len(tasks)} parallel site analyses...")
# Execute with progress tracking
results = {}
completed = 0
for task in asyncio.as_completed(tasks):
site_result = await task
site_name = site_result['site_name']
results[site_name] = site_result
completed += 1
print(f" ✅ Completed {site_name} ({completed}/{len(tasks)})")
print(f" 🎯 All {len(tasks)} sites processed")
return results
async def process_single_site(site_name, roi, date_range):
"""Process a single site (can be run in parallel)."""
try:
# Scene discovery
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.3,
item_types=["PSScene"]
)
scenes = results['features']
# Quick spatial analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(scenes, roi)
return {
'site_name': site_name,
'status': 'success',
'scene_count': len(scenes),
'spatial_stats': spatial_result.stats,
'processing_time': datetime.now().isoformat()
}
except Exception as e:
return {
'site_name': site_name,
'status': 'failed',
'error': str(e),
'processing_time': datetime.now().isoformat()
}
# CPU-intensive processing with ProcessPoolExecutor
def cpu_intensive_analysis(scenes_data):
"""CPU-intensive analysis that can be parallelized."""
import numpy as np
# Example: Complex statistical analysis
cloud_covers = [scene['properties'].get('cloud_cover', 1.0) for scene in scenes_data]
# Simulate complex calculations
stats = {
'mean': np.mean(cloud_covers),
'std': np.std(cloud_covers),
'percentiles': np.percentile(cloud_covers, [10, 25, 50, 75, 90]).tolist()
}
return stats
async def workflow_with_cpu_processing(sites, date_range):
"""Workflow combining async I/O with CPU-intensive processing."""
# Step 1: Async data collection
all_scenes = {}
for site_name, roi in sites.items():
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.3,
item_types=["PSScene"]
)
all_scenes[site_name] = results['features']
# Step 2: CPU-intensive processing in parallel
with ProcessPoolExecutor(max_workers=4) as executor:
loop = asyncio.get_event_loop()
cpu_tasks = {
site_name: loop.run_in_executor(
executor, cpu_intensive_analysis, scenes
)
for site_name, scenes in all_scenes.items()
}
# Wait for all CPU tasks to complete
cpu_results = {}
for site_name, task in cpu_tasks.items():
cpu_results[site_name] = await task
return cpu_results

Testing Workflows

import unittest
from unittest.mock import Mock, patch
import asyncio
import copy
import os
from shapely.geometry import box
class TestWorkflows(unittest.TestCase):
"""Test suite for workflow functions."""
def setUp(self):
"""Set up test fixtures."""
self.test_roi = box(9.0, 45.0, 9.1, 45.1)
self.test_date_range = ("2024-01-01", "2024-01-31")
# Mock scene data
self.mock_scene = {
'id': 'test_scene_001',
'properties': {
'acquired': '2024-01-15T10:30:00Z',
'cloud_cover': 0.1,
'sun_elevation': 45.0
},
'geometry': {
'type': 'Polygon',
'coordinates': [[
[9.0, 45.0], [9.1, 45.0], [9.1, 45.1], [9.0, 45.1], [9.0, 45.0]
]]
}
}
@patch('planetscope_py.PlanetScopeQuery')
def test_standard_workflow_success(self, mock_query_class):
"""Test successful execution of standard workflow."""
# Setup mock
mock_query = mock_query_class.return_value
mock_query.search_scenes.return_value = {
'features': [self.mock_scene] * 10
}
# Run workflow
async def run_test():
result = await standard_research_workflow(
self.test_roi,
self.test_date_range,
output_dir="test_output"
)
return result
result = asyncio.run(run_test())
# Assertions
self.assertIsNotNone(result)
self.assertEqual(result['results']['total_scenes'], 10)
self.assertEqual(result['workflow'], 'standard_research')
@patch('planetscope_py.PlanetScopeQuery')
def test_workflow_no_scenes(self, mock_query_class):
"""Test workflow behavior when no scenes are found."""
# Setup mock to return no scenes
mock_query = mock_query_class.return_value
mock_query.search_scenes.return_value = {'features': []}
# Run workflow
async def run_test():
result = await standard_research_workflow(
self.test_roi,
self.test_date_range
)
return result
result = asyncio.run(run_test())
# Should return None when no scenes found
self.assertIsNone(result)
def test_get_season_from_scene(self):
"""Test season extraction from scene."""
# Test winter scene (deepcopy so the shared fixture is not mutated)
winter_scene = copy.deepcopy(self.mock_scene)
winter_scene['properties']['acquired'] = '2024-12-15T10:30:00Z'
self.assertEqual(get_season_from_scene(winter_scene), 'winter')
# Test spring scene
spring_scene = copy.deepcopy(self.mock_scene)
spring_scene['properties']['acquired'] = '2024-04-15T10:30:00Z'
self.assertEqual(get_season_from_scene(spring_scene), 'spring')
def test_assess_monitoring_quality(self):
"""Test quality assessment function."""
# Excellent quality scene
excellent_scene = copy.deepcopy(self.mock_scene)
excellent_scene['properties']['cloud_cover'] = 0.02
excellent_scene['properties']['sun_elevation'] = 55
self.assertEqual(assess_monitoring_quality(excellent_scene), 'excellent')
# Poor quality scene
poor_scene = copy.deepcopy(self.mock_scene)
poor_scene['properties']['cloud_cover'] = 0.8
poor_scene['properties']['sun_elevation'] = 20
self.assertEqual(assess_monitoring_quality(poor_scene), 'poor')
class TestWorkflowIntegration(unittest.TestCase):
"""Integration tests for complete workflows."""
@unittest.skipUnless(
os.getenv('PLANET_API_KEY'),
"Requires valid Planet API key"
)
def test_real_api_integration(self):
"""Test workflow with real API (requires API key)."""
# Small test area
test_roi = box(9.18, 45.46, 9.19, 45.47) # Very small area in Milan
test_date_range = ("2024-01-01", "2024-01-07") # One week
async def run_integration_test():
try:
result = await standard_research_workflow(
test_roi,
test_date_range,
output_dir="integration_test_output"
)
return result
except Exception as e:
self.fail(f"Integration test failed: {e}")
result = asyncio.run(run_integration_test())
self.assertIsNotNone(result)
if __name__ == '__main__':
unittest.main()

Output Validation

import os
from shapely.geometry import box

def validate_workflow_outputs(workflow_result, expected_outputs=None):
"""
Validate workflow outputs meet expected criteria.
Args:
workflow_result: Result dictionary from workflow
expected_outputs: Dictionary of expected output criteria
"""
expected_outputs = expected_outputs or {}
validation_results = {
'valid': True,
'errors': [],
'warnings': []
}
# Check required fields
required_fields = ['workflow', 'completion_time', 'results', 'outputs']
for field in required_fields:
if field not in workflow_result:
validation_results['valid'] = False
validation_results['errors'].append(f"Missing required field: {field}")
# Validate results structure
if 'results' in workflow_result:
results = workflow_result['results']
# Check scene counts are reasonable
total_scenes = results.get('total_scenes', 0)
if total_scenes == 0:
validation_results['warnings'].append("No scenes found in analysis")
elif total_scenes > 10000:
validation_results['warnings'].append(f"Very large scene count: {total_scenes}")
# Check data availability
temporal_analysis = results.get('temporal_analysis', {})
data_availability = temporal_analysis.get('data_availability')
if data_availability is not None and data_availability < 0.5:
validation_results['warnings'].append(
f"Low data availability: {data_availability:.1%}"
)
# Validate output files exist
if 'outputs' in workflow_result:
outputs = workflow_result['outputs']
for output_type, output_path in outputs.items():
if output_path and not os.path.exists(output_path):
validation_results['valid'] = False
validation_results['errors'].append(
f"Output file missing: {output_path}"
)
# Custom validation criteria
if expected_outputs:
min_scenes = expected_outputs.get('min_scenes')
if min_scenes and workflow_result.get('results', {}).get('total_scenes', 0) < min_scenes:
validation_results['valid'] = False
validation_results['errors'].append(
f"Scene count below minimum: {min_scenes}"
)
return validation_results
# Example usage
async def validated_workflow_execution():
"""Execute workflow with validation."""
roi = box(9.04, 45.40, 9.28, 45.52)
date_range = ("2024-01-01", "2024-06-30")
# Run workflow
result = await standard_research_workflow(roi, date_range)
# Validate results
validation = validate_workflow_outputs(
result,
expected_outputs={'min_scenes': 10}
)
if validation['valid']:
print("✅ Workflow validation passed")
else:
print("❌ Workflow validation failed:")
for error in validation['errors']:
print(f" Error: {error}")
if validation['warnings']:
print("⚠️ Warnings:")
for warning in validation['warnings']:
print(f" Warning: {warning}")
return result, validation

These complete workflow examples demonstrate how to integrate all PlanetScope-py components into comprehensive, production-ready analysis pipelines for research, operational monitoring, and automated processing. The workflows include error handling, performance monitoring, validation, and deployment patterns for real-world use.