|
6 | 6 | from typing import Any |
7 | 7 |
|
8 | 8 | import duckdb |
| 9 | +import pandas as pd |
9 | 10 | import yaml |
10 | 11 | from toolkit.core.csv_read import ( |
11 | 12 | READ_SELECTION_KEYS, |
|
19 | 20 | ) |
20 | 21 |
|
21 | 22 |
|
# File extensions accepted as raw inputs. Compound suffixes (".csv.gz", …)
# are listed explicitly because a plain Path.suffix check would only see ".gz".
# ".xlsx" inputs are routed to the pandas/openpyxl Excel reader.
SUPPORTED_INPUT_EXTS = {
    ".csv",
    ".tsv",
    ".txt",
    ".parquet",
    ".csv.gz",
    ".tsv.gz",
    ".txt.gz",
    ".xlsx",
}
23 | 33 |
|
24 | 34 |
|
25 | 35 | @dataclass(frozen=True) |
@@ -277,6 +287,97 @@ def _execute_parquet_read( |
277 | 287 | return ReadInfo(source="parquet", params_used={}) |
278 | 288 |
|
279 | 289 |
|
| 290 | +def _normalize_excel_sheet_name(value: Any) -> str | int: |
| 291 | + if value is None: |
| 292 | + return 0 |
| 293 | + if isinstance(value, bool): |
| 294 | + raise ValueError("clean.read.sheet_name must be a string, integer, or null") |
| 295 | + if isinstance(value, int): |
| 296 | + return value |
| 297 | + if isinstance(value, str): |
| 298 | + text = value.strip() |
| 299 | + if not text: |
| 300 | + return 0 |
| 301 | + return text |
| 302 | + raise ValueError("clean.read.sheet_name must be a string, integer, or null") |
| 303 | + |
| 304 | + |
| 305 | +def _trim_excel_dataframe(df: pd.DataFrame) -> pd.DataFrame: |
| 306 | + return df.apply(lambda column: column.map(lambda value: value.strip() if isinstance(value, str) else value)) |
| 307 | + |
| 308 | + |
def _load_excel_frame(
    input_file: Path,
    read_cfg: dict[str, Any],
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Read a single .xlsx file into a DataFrame according to the clean.read config.

    Returns:
        The loaded frame plus the normalized parameter set actually applied,
        so the caller can log exactly what was used.

    Raises:
        ValueError: when ``columns`` is configured but its count does not
            match the number of columns detected in the sheet.
    """
    has_header = bool(read_cfg.get("header", True))
    skip_rows = int(read_cfg["skip"]) if read_cfg.get("skip") is not None else 0
    trim = read_cfg.get("trim_whitespace", True)
    column_map = read_cfg.get("columns")
    sheet = _normalize_excel_sheet_name(read_cfg.get("sheet_name"))

    # dtype=object keeps raw cell values rather than letting pandas infer dtypes.
    frame = pd.read_excel(
        input_file,
        sheet_name=sheet,
        header=0 if has_header else None,
        skiprows=skip_rows,
        dtype=object,
        engine="openpyxl",
    )

    if column_map:
        configured_names = list(column_map.keys())
        if len(configured_names) != len(frame.columns):
            raise ValueError(
                "Excel input columns mismatch. "
                f"Configured={len(configured_names)} detected={len(frame.columns)} file={input_file}"
            )
        frame.columns = configured_names
    elif not has_header:
        # No header row and no configured names: synthesize col0, col1, ...
        frame.columns = [f"col{i}" for i in range(len(frame.columns))]

    if trim:
        frame = _trim_excel_dataframe(frame)

    params_applied = {
        "sheet_name": sheet,
        "header": has_header,
        "skip": skip_rows,
        "trim_whitespace": bool(trim),
        "columns": dict(column_map) if column_map else None,
    }
    return frame, params_applied
| 350 | + |
def _execute_excel_read(
    con: duckdb.DuckDBPyConnection,
    input_files: list[Path],
    read_cfg: dict[str, Any],
    *,
    logger,
) -> ReadInfo:
    """Load one or more .xlsx files and expose them to DuckDB as the view ``raw_input``.

    Every file is read with the same clean.read configuration and the frames
    are concatenated row-wise. The params logged and returned are those applied
    to the first file (identical for all files, by construction).

    Raises:
        ValueError: if ``input_files`` is empty (previously an opaque
            IndexError at ``frames[0]``).
    """
    if not input_files:
        raise ValueError("No Excel input files to read")

    frames: list[pd.DataFrame] = []
    params_used: dict[str, Any] | None = None

    for input_file in input_files:
        frame, frame_params = _load_excel_frame(input_file, read_cfg)
        frames.append(frame)
        if params_used is None:
            params_used = frame_params

    combined = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0]
    # Register the pandas frame and wrap it in a view so downstream SQL can
    # query raw_input regardless of which input format produced it.
    con.register("raw_input_df", combined)
    con.execute("CREATE OR REPLACE VIEW raw_input AS SELECT * FROM raw_input_df;")

    used = dict(params_used or {})
    if used.get("columns") is None:
        # Drop the key entirely when no explicit column mapping was configured,
        # keeping the logged params compact.
        used.pop("columns", None)
    logger.info(
        "read_excel params used: source=excel params=%s",
        json.dumps(used, ensure_ascii=False, sort_keys=True),
    )
    return ReadInfo(source="excel", params_used=used)
| 380 | + |
280 | 381 | def _validate_read_mode(mode: str) -> str: |
281 | 382 | normalized_mode = str(mode or "fallback") |
282 | 383 | if normalized_mode not in {"strict", "fallback", "robust"}: |
@@ -386,6 +487,8 @@ def read_raw_to_relation( |
386 | 487 | info = _execute_parquet_read(con, input_files) |
387 | 488 | logger.info("read_csv params used: source=parquet params={}") |
388 | 489 | return info |
| 490 | + if exts <= {".xlsx"}: |
| 491 | + return _execute_excel_read(con, input_files, read_cfg, logger=logger) |
389 | 492 |
|
390 | 493 | normalized_mode = _validate_read_mode(mode) |
391 | 494 | return _read_csv_relation( |
|
0 commit comments