-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathvolume.py
More file actions
548 lines (444 loc) · 20.4 KB
/
volume.py
File metadata and controls
548 lines (444 loc) · 20.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
#!/usr/bin/env python3
"""
Steps 2 & 3 — Aligned Grid + Volume
Step 2: Generates an orientation-aligned 1×1m grid within each building
footprint, rotated to the longest edge so non-axis-aligned buildings are
covered tightly.
Step 3: Samples terrain (swissALTI3D) and surface (swissSURFACE3D) elevations
at each grid point, then calculates building volume and all height metrics.
Outputs per building:
- volume_above_ground_m3
- elevation_base_min_m (min terrain under building — used as volume base datum)
- elevation_base_mean_m (mean terrain elevation under building)
- elevation_base_max_m (max terrain elevation under building)
- elevation_roof_min_m (min surface elevation within footprint — estimated eave)
- elevation_roof_mean_m (mean surface elevation within footprint)
- elevation_roof_max_m (max surface elevation within footprint — estimated ridge)
- height_mean_m (mean of building heights at grid points)
- height_max_m (max building height)
- height_minimal_m (volume / footprint — equivalent uniform box height)
"""
import logging
from collections import OrderedDict
from pathlib import Path
from typing import Any, Optional, TypedDict, Union
import numpy as np
import rasterio
from shapely import contains_xy
from shapely.affinity import rotate
from shapely.geometry.base import BaseGeometry
from tile_fetcher import tile_ids_from_bounds
log = logging.getLogger(__name__)
# ── Canonical pipeline result row ──────────────────────────────────────────
class BuildingResult(TypedDict, total=False):
"""
The canonical dict that flows through the pipeline. Fields are added
incrementally:
- **Step 1** (footprints.py) sets the identifiers and ``status_step1``.
- **Step 3** (volume.py) adds measurements and ``status_step3``.
- **Step 4** (area.py) adds GWR fields, floor-area fields, and
``status_step4``.
- **Aggregation** (main.py) collapses sub-rows by ``input_id``. After
aggregation, numeric fields may become ``';'``-joined strings when
sub-rows disagree (multi-EGID input or multi-polygon AV match) —
the column dtype becomes ``object`` for any column with at least
one aggregated row.
``total=False`` because individual fields may be absent depending on
which steps succeeded; e.g. a row with ``status_step1='no_footprint'``
has no Step 3 measurements, and a row without ``--estimate-area`` has
no Step 4 fields. Field types here document the *primary* type — see
the docstring above for the post-aggregation string fallback.
"""
# Step 1: identifiers + status
input_id: Union[str, int]
input_egid: Optional[int]
input_egid_raw: Optional[str]
input_lon: Optional[float] # only present in --use-coordinates mode
input_lat: Optional[float] # only present in --use-coordinates mode
av_egid: Optional[int]
fid: Optional[str]
area_official_m2: Optional[float]
geometry: Any # shapely geometry; only present in Step 1 GeoDataFrames
status_step1: str
# Step 3: volume + heights
area_footprint_m2: Optional[float]
volume_above_ground_m3: Optional[float]
elevation_base_min_m: Optional[float]
elevation_base_mean_m: Optional[float]
elevation_base_max_m: Optional[float]
elevation_roof_min_m: Optional[float]
elevation_roof_mean_m: Optional[float]
elevation_roof_max_m: Optional[float]
height_mean_m: Optional[float]
height_max_m: Optional[float]
height_minimal_m: Optional[float]
grid_points_count: int
status_step3: str
warnings: str
# Step 4: GWR + floor area
gkat: Optional[int]
gklas: Optional[int]
gbauj: Optional[int]
gastw: Optional[int]
floor_height_used_m: Optional[float]
floor_height_source: Optional[str]
floors_estimated: Optional[int]
area_floor_total_m2: Optional[float]
area_accuracy: Optional[str]
building_type: Optional[str]
status_step4: Optional[str]
# LRU tile cache size (each cached tile = one open file handle)
MAX_CACHED_TILES = 200
# ── Pipeline status code constants ──────────────────────────────────────────
#
# These live here because volume.py is the de-facto shared-definitions
# module (it already exports make_empty_volume_result and append_warning
# to area.py and main.py). Centralising them here means there is exactly
# one place to look up "what valid status strings exist", and a typo in
# any consumer becomes an ImportError instead of a silent string mismatch.
#
# The dynamic prefixes (STATUS_ERROR_PREFIX, STATUS_SKIPPED_PREFIX) are
# composed at the call site, e.g. f"{STATUS_ERROR_PREFIX}{type(e).__name__}"
# yields "error:RasterioIOError".
# Step 1 (footprints.py)
STATUS_OK = "ok"
STATUS_INVALID_EGID = "invalid_egid"
STATUS_NO_FOOTPRINT = "no_footprint"
# Step 3 (volume.py)
STATUS_SUCCESS = "success"
STATUS_NO_GRID_POINTS = "no_grid_points"
STATUS_NO_HEIGHT_DATA = "no_height_data"
STATUS_ERROR_PREFIX = "error:"
# Step 4 (area.py) — re-uses STATUS_SUCCESS and STATUS_NO_FOOTPRINT
STATUS_NO_VOLUME = "no_volume"
# STATUS_HEIGHT_EXCEEDS_CAP lives in area.py because its value depends on
# the HEIGHT_SANITY_CAP_M constant defined there.
# main.py — composed by main.py to flag rows that didn't make it through
# Step 3 successfully (e.g. "skipped:invalid_egid", "skipped:no_footprint")
STATUS_SKIPPED = "skipped"
STATUS_SKIPPED_PREFIX = "skipped:"
# ── Step 2: Aligned Grid ────────────────────────────────────────────────────
def get_building_orientation(polygon: BaseGeometry) -> float:
"""
Calculate building orientation using minimum area bounding rectangle.
Returns rotation angle in degrees (angle of the longest edge).
Returns 0.0 for degenerate polygons (too small or linear).
"""
min_rect = polygon.minimum_rotated_rectangle
# Guard against degenerate geometry (point or line)
if min_rect.geom_type != 'Polygon' or min_rect.area < 1e-6:
return 0.0
coords = list(min_rect.exterior.coords)
edge_lengths = []
angles = []
for i in range(len(coords) - 1):
x1, y1 = coords[i]
x2, y2 = coords[i + 1]
length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
edge_lengths.append(length)
angles.append(angle)
longest_idx = np.argmax(edge_lengths)
return angles[longest_idx]
def create_aligned_grid_points(
polygon: BaseGeometry,
voxel_size: float = 1.0,
) -> list[tuple[float, float]]:
"""
Create grid points aligned to building orientation.
Algorithm:
1. Compute building orientation from minimum rotated rectangle
2. Rotate polygon to align with axes
3. Generate regular grid in rotated space
4. Filter points inside/touching the polygon
5. Rotate points back to original orientation
Args:
polygon: Shapely Polygon in LV95 coordinates
voxel_size: Grid cell size in meters (default 1.0)
Returns:
List of (x, y) tuples in LV95 coordinates
"""
rotation_angle = get_building_orientation(polygon)
# Rotate polygon to align with axes
rotated_polygon = rotate(polygon, -rotation_angle, origin='centroid')
# Snap bounding box to grid boundaries
bounds = rotated_polygon.bounds
x_min = np.floor(bounds[0] / voxel_size) * voxel_size
y_min = np.floor(bounds[1] / voxel_size) * voxel_size
x_max = np.ceil(bounds[2] / voxel_size) * voxel_size
y_max = np.ceil(bounds[3] / voxel_size) * voxel_size
# Generate grid at cell centers
x_coords = np.arange(x_min + voxel_size / 2, x_max, voxel_size)
y_coords = np.arange(y_min + voxel_size / 2, y_max, voxel_size)
# Filter points inside the rotated polygon using vectorized containment
xx, yy = np.meshgrid(x_coords, y_coords)
flat_x = xx.ravel()
flat_y = yy.ravel()
mask = contains_xy(rotated_polygon, flat_x, flat_y)
inside_x = flat_x[mask]
inside_y = flat_y[mask]
if len(inside_x) == 0:
return []
# Rotate points back to original orientation (vectorized)
cx, cy = rotated_polygon.centroid.x, rotated_polygon.centroid.y
angle_rad = np.radians(rotation_angle)
cos_a, sin_a = np.cos(angle_rad), np.sin(angle_rad)
dx = inside_x - cx
dy = inside_y - cy
orig_x = cx + dx * cos_a - dy * sin_a
orig_y = cy + dx * sin_a + dy * cos_a
return list(zip(orig_x.tolist(), orig_y.tolist()))
# ── Step 3: Tile Index, Sampling & Volume ───────────────────────────────────
class TileIndex:
"""Indexes and caches swisstopo GeoTIFF elevation tiles."""
def __init__(self, alti3d_dir: str, surface3d_dir: str) -> None:
self.alti3d_dir = Path(alti3d_dir)
self.surface3d_dir = Path(surface3d_dir)
# OrderedDict gives us real LRU semantics: move_to_end(key) on hit,
# popitem(last=False) to evict the oldest entry on overflow.
self.tile_cache: "OrderedDict[str, rasterio.DatasetReader]" = OrderedDict()
log.info("Indexing available tiles...")
self.alti3d_tiles = self._index_tiles(self.alti3d_dir)
self.surface3d_tiles = self._index_tiles(self.surface3d_dir)
log.info(f" Found {len(self.alti3d_tiles)} swissALTI3D tiles")
log.info(f" Found {len(self.surface3d_tiles)} swissSURFACE3D tiles")
def _index_tiles(self, directory: Path) -> dict[str, Path]:
"""
Scan directory and build tile_id -> filepath mapping.
Expected filenames:
- swissalti3d_YYYY_XXXX-YYYY_0.5_2056_5728.tif
- swisssurface3d-raster_YYYY_XXXX-YYYY_0.5_2056_5728.tif
Tile ID is at index 2 when split by underscore.
Tile IDs encode the 1km LV95 grid position (e.g. 2683-1248).
"""
tile_index = {}
if not directory.exists():
log.warning(f"Directory not found: {directory}")
return tile_index
for filepath in directory.glob("*.tif"):
parts = filepath.stem.split('_')
if len(parts) < 3:
log.debug("Unexpected filename layout: %s", filepath.name)
continue
tile_id = parts[2]
if '-' not in tile_id or len(tile_id.split('-')) != 2:
log.debug("Unexpected tile ID format in %s", filepath.name)
continue
tile_index[tile_id] = filepath
return tile_index
def _open_tile(self, cache_key: str, tile_path: Path) -> None:
"""Open a tile, evicting the least-recently-used entry when full."""
if len(self.tile_cache) >= MAX_CACHED_TILES:
_, evicted = self.tile_cache.popitem(last=False)
evicted.close()
self.tile_cache[cache_key] = rasterio.open(tile_path)
def sample_heights(
self,
points: list[tuple[float, float]],
tiles: list[str],
model_type: str,
) -> np.ndarray:
"""
Sample height values from raster tiles at given points.
Uses a single windowed read per tile instead of point-by-point sampling:
converts all points to pixel row/col indices, reads the minimal bounding
window in one IO call, then indexes into the resulting numpy array.
Args:
points: List of (x, y) tuples in LV95
tiles: List of tile IDs to search
model_type: 'alti3d' or 'surface3d'
Returns:
numpy array of height values (NaN where no data)
"""
heights = np.full(len(points), np.nan)
points_arr = np.array(points)
tile_index = self.alti3d_tiles if model_type == 'alti3d' else self.surface3d_tiles
for tile_id in tiles:
cache_key = f"{model_type}_{tile_id}"
if cache_key not in self.tile_cache:
tile_path = tile_index.get(tile_id)
if tile_path is None:
continue
try:
self._open_tile(cache_key, tile_path)
except (rasterio.errors.RasterioIOError, OSError) as e:
log.debug("Could not open %s: %s", tile_path, e)
continue
else:
# Cache hit — refresh LRU position
self.tile_cache.move_to_end(cache_key)
src = self.tile_cache[cache_key]
try:
# Filter to points within this tile's bounds
bounds = src.bounds
mask = (
(points_arr[:, 0] >= bounds.left) &
(points_arr[:, 0] <= bounds.right) &
(points_arr[:, 1] >= bounds.bottom) &
(points_arr[:, 1] <= bounds.top)
)
if not mask.any():
continue
tile_pts = points_arr[mask]
indices = np.where(mask)[0]
# Convert LV95 coordinates to pixel row/col indices
rows, cols = rasterio.transform.rowcol(
src.transform, tile_pts[:, 0], tile_pts[:, 1]
)
rows = np.clip(np.asarray(rows), 0, src.height - 1)
cols = np.clip(np.asarray(cols), 0, src.width - 1)
# Read the minimal bounding window in a single IO call
row_min, row_max = int(rows.min()), int(rows.max())
col_min, col_max = int(cols.min()), int(cols.max())
window = rasterio.windows.Window(
col_min, row_min,
col_max - col_min + 1,
row_max - row_min + 1,
)
data = src.read(1, window=window).astype(float)
# Index into the window and apply nodata mask
values = data[rows - row_min, cols - col_min]
nodata = src.nodata
valid = ~np.isnan(values)
if nodata is not None:
valid &= values != float(nodata)
heights[indices[valid]] = values[valid]
except (rasterio.errors.RasterioIOError, ValueError, IndexError) as e:
log.debug("Error sampling from %s: %s", tile_id, e)
return heights
def close(self) -> None:
"""Close all cached raster files."""
for src in self.tile_cache.values():
src.close()
self.tile_cache.clear()
def make_empty_volume_result(
av_egid: Optional[int] = None,
fid: Optional[str] = None,
area_footprint_m2: Optional[float] = None,
area_official_m2: Optional[float] = None,
status_step3: Optional[str] = None,
warnings: str = '',
) -> BuildingResult:
"""
Build a result row with no measurements — used both as the base for
successful runs (mutated below) and standalone for skipped buildings
(e.g. no footprint after Step 1). Missing measurements are NaN, never
zero, so downstream aggregations skip them correctly.
The ``warnings`` field is a ``';'``-joined string that accumulates
notes across pipeline steps (multi-polygon EGID, GWR lookup miss, etc.).
Keep this in sync with the keys returned by calculate_building_volume.
"""
result = {
'av_egid': av_egid,
'fid': fid,
'area_footprint_m2': area_footprint_m2,
'area_official_m2': area_official_m2,
'volume_above_ground_m3': np.nan,
'elevation_base_min_m': np.nan,
'elevation_base_mean_m': np.nan,
'elevation_base_max_m': np.nan,
'elevation_roof_min_m': np.nan,
'elevation_roof_mean_m': np.nan,
'elevation_roof_max_m': np.nan,
'height_mean_m': np.nan,
'height_max_m': np.nan,
'height_minimal_m': np.nan,
'grid_points_count': 0,
'warnings': warnings or '',
}
if status_step3 is not None:
result['status_step3'] = status_step3
return result
def append_warning(result: dict, message: Optional[str]) -> None:
"""Append a warning message to a result row's ``warnings`` column."""
if not message:
return
existing = result.get('warnings', '') or ''
result['warnings'] = f'{existing}; {message}' if existing else message
def calculate_building_volume(
polygon: BaseGeometry,
tile_index: "TileIndex",
av_egid: Optional[int] = None,
fid: Optional[str] = None,
area_official_m2: Optional[float] = None,
voxel_size: float = 1.0,
) -> BuildingResult:
"""
Calculate volume and height metrics for a single building.
Uses polygon.area (computed from geometry) as the primary footprint area.
The official area attribute is kept for reference only.
Args:
polygon: Shapely Polygon in LV95 (EPSG:2056)
tile_index: TileIndex instance with loaded elevation tiles
av_egid: Optional EGID from Amtliche Vermessung (GWR_EGID)
fid: Optional FID from cadastral survey
area_official_m2: Optional official area from source data (reference only)
voxel_size: Grid cell size in meters
Returns:
Dict with volume, height metrics, and status
"""
footprint_area = polygon.area
empty_result = make_empty_volume_result(
av_egid=av_egid, fid=fid,
area_footprint_m2=round(footprint_area, 2),
area_official_m2=area_official_m2,
)
try:
# Step 2: Create aligned grid
grid_points = create_aligned_grid_points(polygon, voxel_size)
if len(grid_points) == 0:
return {**empty_result, 'status_step3': STATUS_NO_GRID_POINTS}
# Determine required tiles (sorted for deterministic logging order)
tiles = sorted(tile_ids_from_bounds(polygon.bounds))
# Sample elevations
terrain_heights = tile_index.sample_heights(grid_points, tiles, 'alti3d')
surface_heights = tile_index.sample_heights(grid_points, tiles, 'surface3d')
# Filter to points with both terrain and surface data
valid_mask = ~(np.isnan(terrain_heights) | np.isnan(surface_heights))
valid_terrain = terrain_heights[valid_mask]
valid_surface = surface_heights[valid_mask]
if len(valid_terrain) == 0:
return {**empty_result, 'grid_points_count': len(grid_points),
'status_step3': STATUS_NO_HEIGHT_DATA}
# Terrain (base) elevation statistics
base_min = np.min(valid_terrain)
base_mean = np.mean(valid_terrain)
base_max = np.max(valid_terrain)
# Roof surface elevation statistics
roof_min = np.min(valid_surface)
roof_mean = np.mean(valid_surface)
roof_max = np.max(valid_surface)
# Building heights measured from the lowest terrain point (flat base datum).
# Using base_min ensures volume is referenced to a consistent horizontal
# plane, which is stable on sloped terrain.
building_heights = np.maximum(valid_surface - base_min, 0)
# Volume = sum of heights × cell area
volume = np.sum(building_heights) * (voxel_size ** 2)
# Height metrics
height_mean = np.mean(building_heights)
height_max = np.max(building_heights)
height_minimal = volume / footprint_area if footprint_area > 0 else 0
# Spread empty_result first so any future column added to the
# factory automatically lands in the success branch too. This is
# the drift hazard that previously dropped the `warnings` field.
return {
**empty_result,
'volume_above_ground_m3': round(volume, 2),
'elevation_base_min_m': round(base_min, 2),
'elevation_base_mean_m': round(base_mean, 2),
'elevation_base_max_m': round(base_max, 2),
'elevation_roof_min_m': round(roof_min, 2),
'elevation_roof_mean_m': round(roof_mean, 2),
'elevation_roof_max_m': round(roof_max, 2),
'height_mean_m': round(height_mean, 2),
'height_max_m': round(height_max, 2),
'height_minimal_m': round(height_minimal, 2),
'grid_points_count': len(valid_terrain),
'status_step3': STATUS_SUCCESS,
}
except (rasterio.errors.RasterioIOError, ValueError, IndexError, ArithmeticError) as e:
log.debug(
"Error processing building (av_egid=%s, FID=%s): %s: %s",
av_egid, fid, type(e).__name__, e,
)
return {**empty_result, 'status_step3': f'{STATUS_ERROR_PREFIX}{type(e).__name__}'}