-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
571 lines (482 loc) · 24.6 KB
/
main.py
File metadata and controls
571 lines (482 loc) · 24.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
#!/usr/bin/env python3
"""
Swiss Building Volume & Area Estimator
Unified CLI that runs the full pipeline:
Step 1 — Read building footprints [footprints.py]
Step 2 — Ensure required elevation tiles [tile_fetcher.py]
Step 3 — Aligned grid + volume + heights [volume.py]
Step 4 — GWR enrichment + floor areas [area.py, optional]
Output is always a CSV file.
"""
import argparse
import logging
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Union
import pandas as pd
from footprints import (
load_footprints_from_file,
load_footprints_from_av_with_egids,
load_footprints_from_av_with_coordinates,
load_footprints_from_api_with_egids,
)
from volume import (
STATUS_OK,
STATUS_SKIPPED,
STATUS_SKIPPED_PREFIX,
STATUS_SUCCESS,
TileIndex,
append_warning,
calculate_building_volume,
make_empty_volume_result,
)
from tile_fetcher import ensure_tiles, tile_ids_from_bounds
from area import enrich_with_gwr, estimate_floor_area
# Columns where ``;``-joining sub-row values doesn't carry information —
# either because the value is the group key, the rolled-up status of the
# group, or because aggregation has its own dedicated handling below.
#
# ``geometry`` is in the pass-through set defensively: today the result
# rows that reach aggregation never carry a geometry column (the Step 3
# functions in volume.py drop it), but if that ever changes, the default
# array path would call ``str(polygon)`` and produce a column of joined
# WKT strings — almost certainly not what anyone wants. Keeping it
# pass-through means the first sub-row's geometry is preserved as-is.
#
# NOTE(review): these names must stay in sync with the columns emitted by
# Steps 1/3/4 (footprints.py, volume.py, area.py) — verify when renaming
# columns in those modules.
_AGGREGATE_PASS_THROUGH_COLS = frozenset({
    'input_id', 'input_egid_raw',
    'status_step1', 'status_step3', 'status_step4',
    'warnings',
    'geometry',
})
def aggregate_by_input_id(results_df: pd.DataFrame) -> pd.DataFrame:
    """Collapse all sub-rows sharing an ``input_id`` into one row apiece.

    Sub-rows appear when a single CSV input row yielded several Step 1
    matches: a cell containing multiple EGIDs (split on any of ``,`` ``/``
    ``;`` or whitespace), or one EGID matching several AV polygons.
    Rather than summing — which would silently hide the multi-source
    nature of the row — the aggregate keeps every sub-building visible:

    - Value columns where the sub-rows disagree become ``'; '``-joined
      strings of every sub-value in input order.
    - Columns where all sub-rows agree (or are all NaN) keep the scalar
      so downstream numeric ops still work.
    - ``warnings`` are concatenated and an aggregation note is appended.
    - ``status_step3`` rolls up to ``'success'`` when any sub-row
      succeeded, else the first sub-row's status.

    Single-element groups pass through unchanged; a frame without an
    ``input_id`` column (or with no rows) is returned as-is.
    """
    if len(results_df) == 0 or 'input_id' not in results_df.columns:
        return results_df
    collapsed = [
        grp.iloc[0].to_dict() if len(grp) == 1 else _reduce_group(grp)
        for _key, grp in results_df.groupby('input_id', sort=False, dropna=False)
    ]
    out = pd.DataFrame(collapsed, columns=results_df.columns)
    # Mixing aggregated string arrays with scalar floats turns a column
    # into object dtype, and pandas would then render whole-number floats
    # as "1234567.0" in the CSV. Demote those to int for a clean CSV;
    # strings (the array rows) pass through untouched.
    for col in out.columns:
        if out[col].dtype == object:
            out[col] = out[col].map(_demote_int_float)
    return out
def _demote_int_float(v: Any) -> Any:
    """Return ``int(v)`` when *v* is a whole-number float; otherwise *v*.

    NaN, non-floats, and fractional floats all pass through unchanged.
    """
    is_whole_float = (
        isinstance(v, float)
        and not pd.isna(v)
        and v.is_integer()
    )
    return int(v) if is_whole_float else v
def _format_sub_value(v: Any) -> str:
    """Render one sub-row value for a ``;``-joined array cell.

    ``None``/NaN become the empty string; whole-number floats lose their
    ``.0`` suffix; everything else is plain ``str()``.
    """
    if v is None:
        return ''
    if isinstance(v, float):
        if pd.isna(v):
            return ''
        if v.is_integer():
            return str(int(v))
    return str(v)
def _reduce_group(group: pd.DataFrame) -> dict:
    """Collapse one multi-row ``input_id`` group into a single result dict."""
    # Seed from the first sub-row: the group key and every pass-through
    # column land in the output without further work.
    merged = group.iloc[0].to_dict()
    for column in group.columns:
        if column in _AGGREGATE_PASS_THROUGH_COLS:
            continue
        formatted = [_format_sub_value(v) for v in group[column]]
        distinct = {s for s in formatted if s != ''}
        if len(distinct) > 1:
            # Sub-rows disagree → transparent ;-joined array of every
            # sub-value, preserving input order.
            merged[column] = '; '.join(formatted)
        # else: agreement (or all-NaN) → the first sub-row's scalar
        # stands, so homogeneous columns stay numeric downstream.
    # Status rollup: any success makes the aggregate a success; otherwise
    # keep the first sub-row's status for downstream filtering.
    if 'status_step3' in group.columns:
        statuses = group['status_step3']
        merged['status_step3'] = (
            STATUS_SUCCESS if (statuses == STATUS_SUCCESS).any()
            else statuses.iloc[0]
        )
    # Order-preserving, de-duplicated union of every sub-row's warnings.
    notes = []
    for raw in group['warnings']:
        if raw and isinstance(raw, str):
            for fragment in raw.split('; '):
                if fragment and fragment not in notes:
                    notes.append(fragment)
    n_sub_rows = len(group)
    n_unique_egids = group['av_egid'].dropna().nunique() if 'av_egid' in group.columns else 0
    if n_unique_egids > 1:
        notes.append(
            f'aggregated {n_sub_rows} sub-rows from {n_unique_egids} distinct EGIDs '
            f'(numeric columns are ;-joined arrays — fix the input CSV to one EGID per row)'
        )
    elif n_unique_egids == 1:
        notes.append(
            f'aggregated {n_sub_rows} AV polygons for one EGID '
            f'(numeric columns are ;-joined arrays — building is split across cadastral parcels)'
        )
    else:
        # Every sub-row had av_egid = NaN: no parsed sub-EGID found a
        # polygon in AV, so this row is a no-match summary for the cell.
        notes.append(
            f'aggregated {n_sub_rows} sub-rows, none matched a polygon in AV'
        )
    merged['warnings'] = '; '.join(notes)
    return merged
def setup_logging(output_path: Union[str, Path]) -> Path:
    """Wire up DEBUG-level file logging plus INFO-level console logging.

    The log file is placed next to the output CSV and shares its stem, so
    ``Gebäude_IN_20260408_1542.csv`` pairs with
    ``Gebäude_IN_20260408_1542.log``. Safe to call repeatedly within one
    process (notebook, test runner): existing root handlers are cleared
    before the new ones are attached. Returns the log file path.
    """
    csv_path = Path(output_path)
    target_dir = csv_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    log_file = target_dir / f"{csv_path.stem}.log"

    to_file = logging.FileHandler(log_file, encoding="utf-8")
    to_file.setLevel(logging.DEBUG)
    to_file.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)-8s %(message)s")
    )

    to_console = logging.StreamHandler(sys.stdout)
    to_console.setLevel(logging.INFO)
    to_console.setFormatter(logging.Formatter("%(message)s"))

    root_logger = logging.getLogger()
    root_logger.handlers.clear()  # idempotency across repeat invocations
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(to_file)
    root_logger.addHandler(to_console)

    # Quiet the chatty third-party loggers.
    for name in ('urllib3', 'requests', 'rasterio', 'fiona', 'pyproj'):
        logging.getLogger(name).setLevel(logging.WARNING)
    return log_file
def main() -> int:
    """Run the full pipeline: footprints → tiles → volumes (→ areas) → CSV.

    Returns a process exit code — 0 on success (including the "nothing to
    process" case), 1 on input/validation errors — for ``sys.exit(main())``.
    """
    parser = argparse.ArgumentParser(
        description='Swiss Building Volume & Area Estimator — '
                    'estimates building volumes and floor areas from elevation models'
    )
    # Input: building footprints. Four modes:
    #   --footprints only                     → all buildings in the AV file
    #   --footprints + --csv                  → EGID match (default for CSV input)
    #   --footprints + --csv --use-coordinates → spatial join via lon/lat
    #   --csv --use-api                       → API cascade (no local AV file needed)
    parser.add_argument('--footprints',
                        help='Geodata file with building footprints '
                             '(GeoPackage, Shapefile, or GeoJSON from Amtliche Vermessung). '
                             'Not required when using --use-api.')
    parser.add_argument('--csv',
                        help='CSV input file. By default looks up buildings by `egid` '
                             '(columns: id, egid). With --use-coordinates, instead does a '
                             'spatial join via lon/lat (columns: id, lon, lat). '
                             'Comma- and semicolon-delimited CSVs both work.')
    parser.add_argument('--use-coordinates', action='store_true',
                        help='Match buildings via lon/lat spatial join instead of EGID. '
                             'Required only for buildings that have no EGID assigned in '
                             'the cadastral data; EGID match is faster and unambiguous.')
    parser.add_argument('--use-api', action='store_true',
                        help='Fetch building footprints via API instead of a local AV '
                             'file. Uses GWR → geodienste.ch WFS → swisstopo vec25 '
                             'cascade. Requires --csv with id/egid columns. '
                             'No --footprints needed.')
    # Input: elevation tiles
    parser.add_argument('--alti3d', required=True,
                        help='Directory containing swissALTI3D GeoTIFF tiles')
    parser.add_argument('--surface3d', required=True,
                        help='Directory containing swissSURFACE3D GeoTIFF tiles')
    # Auto-fetch missing tiles
    parser.add_argument('--auto-fetch', action='store_true',
                        help='Automatically download missing elevation tiles from swisstopo')
    # Output
    parser.add_argument('-o', '--output',
                        help='Output CSV file path. A YYYYMMDD_HHMM timestamp '
                             'is appended to the stem. If omitted, the output '
                             'is dropped next to --csv (named after its stem) '
                             'when --csv is given, otherwise into '
                             'data/output/result_<timestamp>.csv. The log '
                             'file is written next to the CSV with a matching '
                             'name (same stem, .log extension).')
    # Filters
    parser.add_argument('-l', '--limit', type=int,
                        help='Limit number of buildings to process')
    parser.add_argument('-b', '--bbox', nargs=4, type=float,
                        metavar=('MINLON', 'MINLAT', 'MAXLON', 'MAXLAT'),
                        help='Bounding box filter in WGS84 (only for --footprints)')
    # Step 4: Area estimation (off by default)
    parser.add_argument('--estimate-area', action='store_true',
                        help='Enable Step 4: estimate floor areas using GWR classification '
                             '(requires EGID in input data)')
    parser.add_argument('--gwr-csv',
                        help='Path to GWR CSV bulk download (from housing-stat.ch). '
                             'If omitted with --estimate-area, uses swisstopo API per building.')
    args = parser.parse_args()
    # ── Output path & timestamp ───────────────────────────────────────────
    # YYYYMMDD_HHMM matches the web app's export naming. The same timestamp
    # is reused below for the log file (via setup_logging) so the .csv and
    # .log always agree on the suffix.
    #
    # Resolution order:
    #   1. --output explicitly set: append timestamp to user-provided path
    #   2. --csv given: drop output next to the CSV, named after its stem
    #   3. neither: fall back to data/output/result_<timestamp>.csv
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    if args.output:
        p = Path(args.output)
        args.output = str(p.parent / f"{p.stem}_{timestamp}{p.suffix}")
    elif args.csv:
        csv_path = Path(args.csv)
        args.output = str(csv_path.parent / f"{csv_path.stem}_{timestamp}.csv")
    else:
        args.output = str(Path("data/output") / f"result_{timestamp}.csv")
    # ── Logging ────────────────────────────────────────────────────────────
    log_file = setup_logging(args.output)
    log = logging.getLogger("main")
    log.info(f"Log file: {log_file}")
    # ── Validate inputs ────────────────────────────────────────────────────
    # Flag combinations that cannot work are rejected up front, before any
    # file or network I/O happens.
    if args.use_api and not args.csv:
        log.error("--use-api requires --csv (with id/egid columns)")
        return 1
    if args.use_api and args.use_coordinates:
        log.error("--use-api and --use-coordinates are mutually exclusive")
        return 1
    if not args.use_api and not args.footprints:
        log.error("--footprints is required (unless using --use-api)")
        return 1
    alti3d_dir = Path(args.alti3d)
    surface3d_dir = Path(args.surface3d)
    if not args.auto_fetch:
        # Without auto-fetch the tile directories must already exist.
        if not alti3d_dir.is_dir():
            log.error(f"ALTI3D directory not found: {args.alti3d}")
            return 1
        if not surface3d_dir.is_dir():
            log.error(f"SURFACE3D directory not found: {args.surface3d}")
            return 1
    else:
        # With auto-fetch the directories are created on demand.
        alti3d_dir.mkdir(parents=True, exist_ok=True)
        surface3d_dir.mkdir(parents=True, exist_ok=True)
    # ── Step 1: Load building footprints ──────────────────────────────────
    log.info("=" * 50)
    log.info("STEP 1: Loading building footprints")
    log.info("=" * 50)
    if args.use_coordinates and not args.csv:
        log.error("--use-coordinates requires --csv")
        return 1
    try:
        if args.use_api:
            log.info("Mode: API cascade (GWR → WFS → vec25)")
            buildings = load_footprints_from_api_with_egids(
                args.csv, limit=args.limit,
            )
        elif args.csv and args.use_coordinates:
            log.info("Mode: AV + CSV (lon/lat spatial join)")
            buildings = load_footprints_from_av_with_coordinates(
                args.footprints, args.csv, limit=args.limit,
            )
        elif args.csv:
            log.info("Mode: AV + CSV (EGID match)")
            buildings = load_footprints_from_av_with_egids(
                args.footprints, args.csv, limit=args.limit,
            )
        else:
            log.info("Mode: all buildings from AV file")
            buildings = load_footprints_from_file(
                args.footprints, bbox=args.bbox, limit=args.limit,
            )
    except (FileNotFoundError, ValueError) as e:
        log.error(f"Error loading input: {e}")
        return 1
    total = len(buildings)
    with_geometry = buildings[buildings['status_step1'] == STATUS_OK]
    log.info(f" Total features: {total}")
    log.info(f" With footprint: {len(with_geometry)}")
    log.info(f" Without: {total - len(with_geometry)}")
    if len(with_geometry) == 0:
        log.info("No buildings with footprints to process")
        return 0
    # ── Step 2: Ensure elevation tiles ────────────────────────────────────
    log.info("")
    log.info("=" * 50)
    log.info("STEP 2: Checking elevation tiles")
    log.info("=" * 50)
    # Collect all required tile IDs from all footprints
    all_tile_ids = set()
    for _, row in with_geometry.iterrows():
        all_tile_ids |= tile_ids_from_bounds(row.geometry.bounds)
    log.info(f" Required tiles: {len(all_tile_ids)}")
    if args.auto_fetch:
        log.info(f" Auto-fetching missing tiles from swisstopo...")
        t_fetch = time.monotonic()
        stats = ensure_tiles(all_tile_ids, alti3d_dir, surface3d_dir)
        fetch_elapsed = time.monotonic() - t_fetch
        log.info(f" ALTI3D: {stats['alti3d_ok']} ok, {stats['alti3d_missing']} missing")
        log.info(f" SURFACE3D: {stats['surface3d_ok']} ok, {stats['surface3d_missing']} missing")
        log.info(f" Fetch time: {fetch_elapsed:.0f}s")
    else:
        log.info(f" (auto-fetch disabled — using local tiles only)")
    # ── Step 3: Grid + Volume ─────────────────────────────────────────────
    log.info("")
    log.info("=" * 50)
    log.info("STEP 3: Calculating building volumes")
    log.info("=" * 50)
    tile_index = TileIndex(str(alti3d_dir), str(surface3d_dir))
    try:
        results = []
        t_start = time.monotonic()
        # Columns the result dict already populates — never let an input
        # column with the same name overwrite a computed value. `warnings`
        # is owned by the result but is *appended to*, not overwritten,
        # so we handle it explicitly below. `status_step1` is allowed
        # through (the input column carries the Step 1 outcome verbatim).
        reserved_input_cols = {'id', 'geometry', 'warnings'}
        for i, (_, row) in enumerate(buildings.iterrows()):
            step1_warnings = row.get('warnings') or ''
            if row['status_step1'] != STATUS_OK:
                # No footprint from Step 1 → emit an empty result row
                # whose status records why Step 3 was skipped.
                result = make_empty_volume_result(
                    av_egid=row.get('av_egid'),
                    fid=row.get('fid'),
                    area_official_m2=row.get('area_official_m2'),
                    status_step3=f"{STATUS_SKIPPED_PREFIX}{row['status_step1']}",
                    warnings=step1_warnings,
                )
            else:
                result = calculate_building_volume(
                    polygon=row.geometry,
                    tile_index=tile_index,
                    av_egid=row.get('av_egid'),
                    fid=row.get('fid'),
                    area_official_m2=row.get('area_official_m2'),
                )
                # Carry Step 1 warnings into the success path too.
                if step1_warnings:
                    append_warning(result, step1_warnings)
            # Preserve extra input columns (e.g. input_id) — but never
            # overwrite a key the result dict already owns.
            for col in buildings.columns:
                if col in reserved_input_cols or col in result:
                    continue
                result[col] = row[col]
            results.append(result)
            # Progress
            if (i + 1) % 100 == 0 or (i + 1) == total:
                elapsed = time.monotonic() - t_start
                rate = (i + 1) / elapsed if elapsed > 0 else 0
                eta = (total - i - 1) / rate if rate > 0 else 0
                eta_m, eta_s = divmod(int(eta), 60)
                log.info(f" [{i+1}/{total}] {(i+1)/total*100:.0f}% "
                         f"{rate:.1f} bldg/s ETA: {eta_m}m {eta_s:02d}s")
    finally:
        # Always release the raster file handles, even on error.
        tile_index.close()
    elapsed = time.monotonic() - t_start
    log.info(f"\n Processed {len(results)} buildings in {elapsed:.0f}s")
    results_df = pd.DataFrame(results)
    # ── Step 4 (optional): GWR enrichment + Area estimation ──────────────
    if args.estimate_area:
        log.info("")
        log.info("=" * 50)
        log.info("STEP 4: Estimating floor areas")
        log.info("=" * 50)
        results_df = enrich_with_gwr(results_df, gwr_csv_path=args.gwr_csv)
        area_results = []
        for _, row in results_df.iterrows():
            if row['status_step3'] == STATUS_SUCCESS:
                area_results.append(estimate_floor_area(row.to_dict()))
            else:
                # Step 3 failed/skipped → no area estimate possible.
                result = row.to_dict()
                result['status_step4'] = STATUS_SKIPPED
                area_results.append(result)
        results_df = pd.DataFrame(area_results)
    # ── Aggregate sub-rows by input_id ────────────────────────────────────
    # Multi-EGID inputs and multi-polygon AV matches both produce more
    # than one intermediate row sharing the same input_id. Collapse them
    # to one output row per input_id; multi-source columns become
    # ;-joined arrays so the user can see every sub-building's value
    # and improve the input data quality at the source.
    n_before = len(results_df)
    results_df = aggregate_by_input_id(results_df)
    n_after = len(results_df)
    if n_before != n_after:
        log.info(f"\nAggregated {n_before} sub-rows -> {n_after} output rows by input_id")
    # ── Output CSV ────────────────────────────────────────────────────────
    # Reorder columns: identifiers first
    id_cols = [
        "input_id", "input_egid", "input_lon", "input_lat",
        "av_egid", "egid", "fid",
    ]
    existing_id_cols = [c for c in id_cols if c in results_df.columns]
    other_cols = [c for c in results_df.columns if c not in id_cols]
    results_df = results_df[existing_id_cols + other_cols]
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    results_df.to_csv(output_path, index=False)
    log.info(f"\nResults saved to: {output_path.resolve()}")
    # ── Summary ───────────────────────────────────────────────────────────
    log.info("")
    log.info("=" * 50)
    log.info("SUMMARY")
    log.info("=" * 50)
    successful = results_df[results_df['status_step3'] == STATUS_SUCCESS]
    log.info(f"Successful: {len(successful)}/{len(results_df)}")
    if len(successful) > 0:
        # After aggregate_by_input_id, multi-source rows have ;-joined
        # string values in the numeric columns. Coerce to numeric so the
        # array rows become NaN and don't crash .sum() — then report the
        # number we excluded separately so the user knows the total is
        # only over the single-source rows.
        vol_numeric = pd.to_numeric(successful['volume_above_ground_m3'], errors='coerce')
        n_array_rows = int(vol_numeric.isna().sum() - successful['volume_above_ground_m3'].isna().sum())
        n_summable = int(vol_numeric.notna().sum())
        log.info(f"\nVolume:")
        log.info(f" Total: {vol_numeric.sum():,.0f} m³ (across {n_summable} single-source rows)")
        log.info(f" Average: {vol_numeric.mean():,.0f} m³")
        if n_array_rows > 0:
            log.info(f" Note: {n_array_rows} multi-source rows excluded from sum "
                     f"(see ;-joined values in CSV)")
        if args.estimate_area and 'area_floor_total_m2' in successful.columns:
            area_numeric = pd.to_numeric(successful['area_floor_total_m2'], errors='coerce')
            floors_numeric = pd.to_numeric(successful['floors_estimated'], errors='coerce')
            has_area = area_numeric.notna()
            if has_area.any():
                log.info(f"\nFloor Area:")
                log.info(f" Total: {area_numeric.sum():,.0f} m² (across {int(has_area.sum())} single-source rows)")
                log.info(f" Average: {area_numeric[has_area].mean():,.0f} m²")
                log.info(f" Avg floors: {floors_numeric[has_area].mean():.1f}")
    log.info("\nStatus (Step 3):")
    for status, count in results_df['status_step3'].value_counts().items():
        log.info(f" {status}: {count}")
    return 0
if __name__ == "__main__":
    # CLI entry point: propagate main()'s exit code to the shell.
    sys.exit(main())