-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_async_processing.py
More file actions
180 lines (143 loc) · 5.53 KB
/
test_async_processing.py
File metadata and controls
180 lines (143 loc) · 5.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
"""
Test script for PyMapGIS async processing functionality.
"""
import asyncio
import tempfile
import pandas as pd
import numpy as np
from pathlib import Path
import time
def create_test_data(n_rows=100000):
    """Create test CSV data for async processing.

    Builds a DataFrame of random values and writes it to
    ``test_async_data.csv`` in the system temporary directory, overwriting
    any file already at that path.

    Args:
        n_rows: Number of rows to generate. Defaults to 100000 (a moderately
            large dataset); pass a small value for quick runs.

    Returns:
        pathlib.Path: Location of the CSV file that was written.
    """
    print("Creating test data...")

    data = {
        'id': range(n_rows),
        'x': np.random.uniform(-180, 180, n_rows),
        'y': np.random.uniform(-90, 90, n_rows),
        'population': np.random.randint(100, 100000, n_rows),
        'area_km2': np.random.uniform(1, 1000, n_rows),
        'category': np.random.choice(['urban', 'suburban', 'rural'], n_rows),
    }
    df = pd.DataFrame(data)

    # Save to a fixed file name in the temp directory; the caller is
    # responsible for deleting it.
    temp_file = Path(tempfile.gettempdir()) / "test_async_data.csv"
    df.to_csv(temp_file, index=False)

    print(f"Created test data: {len(df)} rows at {temp_file}")
    return temp_file
async def test_async_processing():
    """Test async processing functionality.

    Runs three checks in sequence against ``pymapgis.async_processing``:

    1. ``async_process_in_chunks`` over a generated CSV file.
    2. A ``PerformanceMonitor`` start / update / finish cycle.
    3. ``parallel_geo_operations`` over ten short strings.

    The generated CSV is removed before returning, whether the checks pass
    or fail.

    Returns:
        bool: True when every check completes without raising; False when an
        import fails or any check raises (the error is printed).
    """
    print("Testing PyMapGIS async processing...")

    # Set by create_test_data(); stays None if we fail before reaching it.
    test_file = None

    try:
        # Import async processing. AsyncGeoProcessor is not used below; it is
        # kept in the import list so a missing name surfaces as ImportError.
        from pymapgis.async_processing import (
            AsyncGeoProcessor,
            async_process_in_chunks,
            PerformanceMonitor,
        )
        print("✅ Async processing module imported successfully")

        # Create test data
        test_file = create_test_data()

        # Define a simple processing function
        def calculate_density(chunk):
            """Calculate population density."""
            chunk['density'] = chunk['population'] / chunk['area_km2']
            return chunk[chunk['density'] > 50]  # Filter high-density areas

        # Test 1: Basic async processing
        print("\n--- Test 1: Basic Async Processing ---")
        start_time = time.perf_counter()
        result = await async_process_in_chunks(
            filepath=test_file,
            operation=calculate_density,
            chunk_size=10000,
            show_progress=True,
        )
        duration = time.perf_counter() - start_time
        print(f"✅ Processed {len(result)} high-density areas in {duration:.2f}s")

        # Test 2: Performance monitoring
        print("\n--- Test 2: Performance Monitoring ---")
        monitor = PerformanceMonitor("Density Calculation")
        monitor.start()

        # Simulate some processing
        for _ in range(5):
            monitor.update(items=1000, bytes_count=50000)
            await asyncio.sleep(0.1)  # Simulate work

        stats = monitor.finish()
        print(f"✅ Performance monitoring: {stats['items_per_second']:.1f} items/s")

        # Test 3: Parallel operations
        print("\n--- Test 3: Parallel Operations ---")

        # Create multiple small items to process
        test_items = [f"item_{i}" for i in range(10)]

        def simple_operation(item):
            """Simple operation for parallel testing."""
            time.sleep(0.1)  # Simulate work
            return f"processed_{item}"

        from pymapgis.async_processing import parallel_geo_operations

        start_time = time.perf_counter()
        results = await parallel_geo_operations(
            data_items=test_items,
            operation=simple_operation,
            max_workers=4,
        )
        duration = time.perf_counter() - start_time
        print(f"✅ Parallel processing: {len(results)} items in {duration:.2f}s")

        print("\n🎉 All async processing tests passed!")
        return True

    except ImportError as e:
        print(f"❌ Import error: {e}")
        return False
    except Exception as e:
        print(f"❌ Test error: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Clean up on every exit path so a failed check does not leave the
        # generated CSV behind in the temp directory.
        if test_file is not None and test_file.exists():
            test_file.unlink()
async def test_basic_import():
    """Test basic PyMapGIS import with async functions.

    Imports ``pymapgis``, prints its ``__version__``, and reports for each
    expected async helper whether it is an attribute of the package.

    Returns:
        bool: True when the package imports and ``__version__`` is readable;
        False when either step raises. A missing helper is reported on
        stdout but does not change the return value.
    """
    print("Testing basic PyMapGIS import...")

    try:
        import pymapgis as pmg
        print(f"✅ PyMapGIS imported, version: {pmg.__version__}")

        # Test if async functions are available
        async_functions = [
            'AsyncGeoProcessor',
            'async_read_large_file',
            'async_process_in_chunks',
            'parallel_geo_operations',
        ]

        for func_name in async_functions:
            if hasattr(pmg, func_name):
                print(f"✅ {func_name} available")
            else:
                print(f"❌ {func_name} not available")

        return True

    except Exception as e:
        print(f"❌ Import test failed: {e}")
        return False
async def main():
    """Run all tests.

    Runs the basic import check first; the async processing checks only run
    when that succeeds, followed by a printed summary.

    Returns:
        int: 0 when both stages pass, 1 when either stage fails.
    """
    print("PyMapGIS Async Processing Test Suite")
    print("=" * 50)

    def status(ok):
        """Format a stage result for the summary lines."""
        return '✅ PASSED' if ok else '❌ FAILED'

    # Test basic imports; nothing else can run without the package.
    import_ok = await test_basic_import()
    if not import_ok:
        print("\n❌ Basic import test failed.")
        return 1

    # Test async processing functionality
    async_ok = await test_async_processing()

    print("\n" + "=" * 50)
    print("Test Summary:")
    print(f"Basic imports: {status(import_ok)}")
    print(f"Async processing: {status(async_ok)}")

    # import_ok is known to be True here, so the overall result depends on
    # the async stage alone.
    if async_ok:
        print("\n🎉 All tests passed! Async processing is ready for use.")
        return 0

    print("\n❌ Some tests failed.")
    return 1
if __name__ == "__main__":
    exit_code = asyncio.run(main())
    # Hand main()'s result (0 = success, 1 = failure) to the interpreter so
    # shells and CI see a failing run as a non-zero exit status.
    raise SystemExit(exit_code)