From dfe5bc62affafc0c9edbf9f416a9cfdbbcc6abb4 Mon Sep 17 00:00:00 2001 From: Indrayudd Roy Chowdhury Date: Tue, 27 May 2025 13:35:10 -0400 Subject: [PATCH 1/5] TutorTask541: Implementation and Saving of a different View for the Gridstatus Metadata in S3 --- .../postprocess_gridstatus_metadata.py | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 causal_automl/postprocess_gridstatus_metadata.py diff --git a/causal_automl/postprocess_gridstatus_metadata.py b/causal_automl/postprocess_gridstatus_metadata.py new file mode 100644 index 0000000000..c5c35c5820 --- /dev/null +++ b/causal_automl/postprocess_gridstatus_metadata.py @@ -0,0 +1,172 @@ +""" +Import as: + +import causal_automl.postprocess_gridstatus_metadata as capogrme +""" + +import ast +import io +import logging +import os +import re +from typing import Dict, Iterable, List + +import helpers.hdbg as hdbg +import helpers.henv as henv +import helpers.hio as hio +import helpers.hpandas as hpandas +import helpers.hs3 as hs3 +import pandas as pd + +# Configure logger. +hdbg.init_logger(verbosity=logging.INFO) +_LOG = logging.getLogger(__name__) + +# Print system signature. +_LOG.info("%s", henv.get_system_signature()[0]) + + +# ############################################################################# +# _GridstatusMetadataWriter +# ############################################################################# + + +class _GridstatusMetadataWriter: + """ + Save Gridstatus metadata and upload to S3. + """ + + def __init__(self, bucket_path: str, aws_profile: str) -> None: + """ + Initialize the writer for saving metadata and facet values to S3. + + :param bucket_path: base S3 path where files will be uploaded + (e.g., "s3://bucket/dir/") + :param aws_profile: AWS CLI profile name used for authentication + """ + self._bucket_path = bucket_path + self._aws_profile = aws_profile + + def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None: + """ + Save the data as a local CSV file and upload it to S3. + + :param df: data to be saved to S3 + :param file_name: local file name for saving + """ + cache_dir = "tmp.download_metadata_cache/" + local_file_path = os.path.join(cache_dir, file_name) + hio.create_dir(os.path.dirname(local_file_path), incremental=True) + # Save CSV locally. + df.to_csv(local_file_path, index=False) + _LOG.debug("Saved CSV locally to: %s", local_file_path) + # Upload CSV to the specified S3 bucket. + bucket_file_path = self._bucket_path + file_name + hs3.copy_file_to_s3(local_file_path, bucket_file_path, self._aws_profile) + _LOG.debug("Uploaded to S3: %s", bucket_file_path) + + +def _load_data(file_path: str) -> pd.DataFrame: + """ + Load data from file path to a dataframe. + + :param file_path: path of the data to load from + :return: dataframe of the loaded data + """ + file = hs3.from_file(file_path, aws_profile="ck") + df = pd.read_csv(io.StringIO(file)) + _LOG.info("shape: %s", df.shape) + _LOG.info("columns: %s", df.columns) + _LOG.info("df: \n %s", hpandas.df_to_str(df, log_level=logging.INFO)) + return df + + +def _prettify(col: str) -> str: + """ + Convert snake_case to Title Case (“spinning_reserves” ⇒ “Spinning + Reserves”). + + :param col: column name to prettify + :return: prettified column name + """ + tokens = re.sub(r"[_\s]+", " ", col).strip().split() + return " ".join(t.capitalize() for t in tokens) + + +def _build_series_row( + base_row: pd.Series, + col_name: str, + dataset_id: str, + dataset_name: str, +) -> Dict[str, object]: + """ + Build new rows with the `id_series` and `num_series` columns. + + :param base_row: original row + :param col_name: column name to prettify + """ + nice_col_name = _prettify(col_name) + # Start with the original row. + new_row: Dict[str, object] = base_row.to_dict() + # Add the two series identifiers. + new_row["id_series"] = f"{dataset_id}.{col_name}" + new_row["name_series"] = f"{dataset_name} / {nice_col_name}" + return new_row + + +def _explode_dataset_row(row: pd.Series) -> Iterable[Dict[str, object]]: + """ + Transform a single row into the row-per-series view. + + :param row: row to transform + :return: the exploded row + """ + dataset_id: str = row["id"] + dataset_name: str = row["name"] + # Ignore primary key columns. + ignore_cols = set(ast.literal_eval(row["primary_key_columns"])) + # Iterate through all columns and generate the row-per-series view. + for col_meta in ast.literal_eval(row["all_columns"]): + col_name: str = col_meta["name"] + if col_meta.get("is_datetime") or col_name in ignore_cols: + continue + yield _build_series_row(row, col_name, dataset_id, dataset_name) + + +def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: + """ + Transform the whole dataset into the row-per-series view. + + :param df: data to transform + :return: transformed data + """ + exploded_rows: List[Dict[str, object]] = [ + row + for _, dataset_row in df.iterrows() + for row in _explode_dataset_row(dataset_row) + ] + result = pd.DataFrame(exploded_rows) + # Arrange according to desired ordering. + leading = ["id_series", "name_series"] + remaining = [c for c in result.columns if c not in leading] + return result[leading + remaining] + + +# Main flow. +if __name__ == "__main__": + # Configure S3. + aws_profile = "ck" + bucket_root = hs3.get_s3_bucket_path(aws_profile) + bucket_path = "s3://causify-data-collaborators/causal_automl/metadata/" + file_name = "gridstatus_metadata_original_v2.0.csv" + writer = _GridstatusMetadataWriter(bucket_path, aws_profile) + # Load data. + v1_path = ( + "s3://causify-data-collaborators/causal_automl/metadata/" + "gridstatus_metadata_original_v1.0.csv" + ) + gs_meta = _load_data(v1_path) + # Transform data to a row-per-series view. + gs_meta_rps = create_series_metadata(gs_meta) + # Save transformed dataset to S3. + writer.write_df_to_s3(gs_meta_rps, file_name) From f7a20babfc7d326c345a152b5cbc7191165b58b3 Mon Sep 17 00:00:00 2001 From: Indrayudd Roy Chowdhury Date: Tue, 27 May 2025 13:39:08 -0400 Subject: [PATCH 2/5] TutorTask541: Docstring Improvements --- causal_automl/postprocess_gridstatus_metadata.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/causal_automl/postprocess_gridstatus_metadata.py b/causal_automl/postprocess_gridstatus_metadata.py index c5c35c5820..d5ed62f799 100644 --- a/causal_automl/postprocess_gridstatus_metadata.py +++ b/causal_automl/postprocess_gridstatus_metadata.py @@ -83,8 +83,9 @@ def _load_data(file_path: str) -> pd.DataFrame: def _prettify(col: str) -> str: """ - Convert snake_case to Title Case (“spinning_reserves” ⇒ “Spinning - Reserves”). + Convert snake_case to Title Case. + + E.g., “spinning_reserves” to “Spinning Reserves” :param col: column name to prettify :return: prettified column name From 52a73f6febd075320dd0c9bc6196604539f4936f Mon Sep 17 00:00:00 2001 From: Indrayudd Roy Chowdhury Date: Sun, 1 Jun 2025 06:14:50 -0400 Subject: [PATCH 3/5] TutorTask541: Reviewer Changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- .../postprocess_gridstatus_metadata.py | 176 +++++++++++++----- 1 file changed, 132 insertions(+), 44 deletions(-) diff --git a/causal_automl/postprocess_gridstatus_metadata.py b/causal_automl/postprocess_gridstatus_metadata.py index d5ed62f799..a30f8b6012 100644 --- a/causal_automl/postprocess_gridstatus_metadata.py +++ b/causal_automl/postprocess_gridstatus_metadata.py @@ -1,20 +1,29 @@ +#!/usr/bin/env python """ +Convert the dataset-per-row metadata of the Gridstatus metadata into a series- +per-row schema and upload the result back into the same S3 bucket. + +> python causal_automl/postprocess_gridstatus_metadata.py \ + --aws_profile ck \ + --bucket_path s3://causify-data-collaborators/causal_automl/metadata/ \ + --input_version v1.0 \ + --output_version v2.0 + Import as: import causal_automl.postprocess_gridstatus_metadata as capogrme """ +import argparse import ast import io import logging import os import re -from typing import Dict, Iterable, List +from typing import Any, Dict, List import helpers.hdbg as hdbg -import helpers.henv as henv import helpers.hio as hio -import helpers.hpandas as hpandas import helpers.hs3 as hs3 import pandas as pd @@ -22,9 +31,6 @@ hdbg.init_logger(verbosity=logging.INFO) _LOG = logging.getLogger(__name__) -# Print system signature. -_LOG.info("%s", henv.get_system_signature()[0]) - # ############################################################################# # _GridstatusMetadataWriter @@ -36,16 +42,23 @@ class _GridstatusMetadataWriter: Save Gridstatus metadata and upload to S3. """ - def __init__(self, bucket_path: str, aws_profile: str) -> None: + def __init__( + self, + bucket_path: str, + aws_profile: str, + cache_dir: str = "tmp.download_metadata_cache/", + ) -> None: """ - Initialize the writer for saving metadata and facet values to S3. + Initialize the writer for saving postprocessed metadata to S3. :param bucket_path: base S3 path where files will be uploaded (e.g., "s3://bucket/dir/") :param aws_profile: AWS CLI profile name used for authentication + :param cache_dir: cache directory path """ self._bucket_path = bucket_path self._aws_profile = aws_profile + self.cache_dir = cache_dir def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None: """ @@ -54,8 +67,7 @@ def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None: :param df: data to be saved to S3 :param file_name: local file name for saving """ - cache_dir = "tmp.download_metadata_cache/" - local_file_path = os.path.join(cache_dir, file_name) + local_file_path = os.path.join(self.cache_dir, file_name) hio.create_dir(os.path.dirname(local_file_path), incremental=True) # Save CSV locally. df.to_csv(local_file_path, index=False) @@ -66,18 +78,17 @@ def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None: _LOG.debug("Uploaded to S3: %s", bucket_file_path) -def _load_data(file_path: str) -> pd.DataFrame: +def _load_data(file_path: str, aws_profile: str) -> pd.DataFrame: """ - Load data from file path to a dataframe. + Load data from S3 path to a dataframe. - :param file_path: path of the data to load from - :return: dataframe of the loaded data + :param file_path: S3 path of the data to load from + :param aws_profile: aws profile that accesses S3 bucket + :return: the queried metadata """ - file = hs3.from_file(file_path, aws_profile="ck") + file = hs3.from_file(file_path, aws_profile=aws_profile) df = pd.read_csv(io.StringIO(file)) - _LOG.info("shape: %s", df.shape) - _LOG.info("columns: %s", df.columns) - _LOG.info("df: \n %s", hpandas.df_to_str(df, log_level=logging.INFO)) + _LOG.info("Data Successfully Downloaded.") return df @@ -91,7 +102,8 @@ def _prettify(col: str) -> str: :return: prettified column name """ tokens = re.sub(r"[_\s]+", " ", col).strip().split() - return " ".join(t.capitalize() for t in tokens) + prettified = " ".join(t.capitalize() for t in tokens) + return prettified def _build_series_row( @@ -99,26 +111,51 @@ def _build_series_row( col_name: str, dataset_id: str, dataset_name: str, -) -> Dict[str, object]: +) -> Dict[str, Any]: """ - Build new rows with the `id_series` and `num_series` columns. + Build new rows with the `id_series` and `name_series` columns. :param base_row: original row :param col_name: column name to prettify + :param dataset_id: id of the data series + :param dataset_name: name of the collection of series + :return: modified row """ - nice_col_name = _prettify(col_name) # Start with the original row. new_row: Dict[str, object] = base_row.to_dict() # Add the two series identifiers. new_row["id_series"] = f"{dataset_id}.{col_name}" - new_row["name_series"] = f"{dataset_name} / {nice_col_name}" + new_row["name_series"] = f"{dataset_name} / {_prettify(col_name)}" return new_row -def _explode_dataset_row(row: pd.Series) -> Iterable[Dict[str, object]]: +def _explode_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: """ Transform a single row into the row-per-series view. + E.g., + Input row: + id name .... + caiso_as_prices CAISO AS Prices .... + + Output row: + id name .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + / + id_series name_series + caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves + caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down + caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down + caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up + caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up + caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves + + :param row: row to transform :return: the exploded row """ @@ -127,47 +164,98 @@ def _explode_dataset_row(row: pd.Series) -> Iterable[Dict[str, object]]: # Ignore primary key columns. ignore_cols = set(ast.literal_eval(row["primary_key_columns"])) # Iterate through all columns and generate the row-per-series view. + exploded: List[Dict[str, Any]] = [] for col_meta in ast.literal_eval(row["all_columns"]): col_name: str = col_meta["name"] if col_meta.get("is_datetime") or col_name in ignore_cols: continue - yield _build_series_row(row, col_name, dataset_id, dataset_name) + exploded.append( + _build_series_row(row, col_name, dataset_id, dataset_name) + ) + return exploded def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: """ Transform the whole dataset into the row-per-series view. + E.g., + Input dataset: + id name .... + caiso_as_prices CAISO AS Prices .... + ... + + Output dataset: + id_series name_series + caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves + caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down + caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down + caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up + caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up + caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves + ... + / + id name .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + ... + :param df: data to transform :return: transformed data """ - exploded_rows: List[Dict[str, object]] = [ - row - for _, dataset_row in df.iterrows() - for row in _explode_dataset_row(dataset_row) - ] + exploded_rows: List[Dict[str, Any]] = [] + for _, dataset_row in df.iterrows(): + exploded_rows.extend(_explode_dataset_row(dataset_row)) result = pd.DataFrame(exploded_rows) # Arrange according to desired ordering. leading = ["id_series", "name_series"] remaining = [c for c in result.columns if c not in leading] - return result[leading + remaining] + transformed_df = result[leading + remaining] + return transformed_df -# Main flow. -if __name__ == "__main__": - # Configure S3. - aws_profile = "ck" - bucket_root = hs3.get_s3_bucket_path(aws_profile) - bucket_path = "s3://causify-data-collaborators/causal_automl/metadata/" - file_name = "gridstatus_metadata_original_v2.0.csv" - writer = _GridstatusMetadataWriter(bucket_path, aws_profile) +def _parse() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--aws_profile", default="ck", help="AWS CLI profile for authentication" + ) + parser.add_argument( + "--bucket_path", + default="s3://causify-data-collaborators/causal_automl/metadata/", + help="Destination S3 directory (trailing slash optional)", + ) + parser.add_argument( + "--input_version", + default="v1.0", + help="Version of the source metadata file", + ) + parser.add_argument( + "--output_version", default="v2.0", help="Version tag for the result file" + ) + return parser.parse_args() + + +def _main(args: argparse.Namespace) -> None: # Load data. - v1_path = ( - "s3://causify-data-collaborators/causal_automl/metadata/" - "gridstatus_metadata_original_v1.0.csv" + src_file = ( + f"{args.bucket_path.rstrip('/')}/gridstatus_metadata_original_" + f"{args.input_version}.csv" ) - gs_meta = _load_data(v1_path) + gs_meta = _load_data(src_file, args.aws_profile) # Transform data to a row-per-series view. gs_meta_rps = create_series_metadata(gs_meta) # Save transformed dataset to S3. - writer.write_df_to_s3(gs_meta_rps, file_name) + writer = _GridstatusMetadataWriter(args.bucket_path, args.aws_profile) + dst_file = f"gridstatus_metadata_original_{args.output_version}.csv" + writer.write_df_to_s3(gs_meta_rps, dst_file) + + +if __name__ == "__main__": + _main(_parse()) From c9e21ec02ef86e1c4edafc6f6008250c0d352586 Mon Sep 17 00:00:00 2001 From: Indrayudd Roy Chowdhury Date: Mon, 2 Jun 2025 12:54:39 -0400 Subject: [PATCH 4/5] TutorTask541: Reviewer Changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- .../postprocess_gridstatus_metadata.py | 115 +++++++++++++----- 1 file changed, 87 insertions(+), 28 deletions(-) mode change 100644 => 100755 causal_automl/postprocess_gridstatus_metadata.py diff --git a/causal_automl/postprocess_gridstatus_metadata.py b/causal_automl/postprocess_gridstatus_metadata.py old mode 100644 new mode 100755 index a30f8b6012..1a58522e3b --- a/causal_automl/postprocess_gridstatus_metadata.py +++ b/causal_automl/postprocess_gridstatus_metadata.py @@ -1,9 +1,9 @@ #!/usr/bin/env python """ -Convert the dataset-per-row metadata of the Gridstatus metadata into a series- +Convert the dataset-per-row schema of the Gridstatus metadata into a series- per-row schema and upload the result back into the same S3 bucket. -> python causal_automl/postprocess_gridstatus_metadata.py \ +> causal_automl/postprocess_gridstatus_metadata.py \ --aws_profile ck \ --bucket_path s3://causify-data-collaborators/causal_automl/metadata/ \ --input_version v1.0 \ @@ -28,7 +28,6 @@ import pandas as pd # Configure logger. -hdbg.init_logger(verbosity=logging.INFO) _LOG = logging.getLogger(__name__) @@ -46,6 +45,7 @@ def __init__( self, bucket_path: str, aws_profile: str, + *, cache_dir: str = "tmp.download_metadata_cache/", ) -> None: """ @@ -84,11 +84,11 @@ def _load_data(file_path: str, aws_profile: str) -> pd.DataFrame: :param file_path: S3 path of the data to load from :param aws_profile: aws profile that accesses S3 bucket - :return: the queried metadata + :return: the loaded data """ file = hs3.from_file(file_path, aws_profile=aws_profile) df = pd.read_csv(io.StringIO(file)) - _LOG.info("Data Successfully Downloaded.") + _LOG.info("Data Successfully Downloaded from %s.", file_path) return df @@ -116,29 +116,50 @@ def _build_series_row( Build new rows with the `id_series` and `name_series` columns. :param base_row: original row - :param col_name: column name to prettify - :param dataset_id: id of the data series + :param col_name: name of the column representing the series + :param dataset_id: id of the collection of series :param dataset_name: name of the collection of series - :return: modified row + :return: modified row with the new columns added """ # Start with the original row. - new_row: Dict[str, object] = base_row.to_dict() + new_row: Dict[str, Any] = base_row.to_dict() # Add the two series identifiers. new_row["id_series"] = f"{dataset_id}.{col_name}" new_row["name_series"] = f"{dataset_name} / {_prettify(col_name)}" return new_row -def _explode_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: +def _expand_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: """ - Transform a single row into the row-per-series view. + Expand a row representing a collection into multiple representing each + series. E.g., Input row: + ``` id name .... caiso_as_prices CAISO AS Prices .... - - Output row: + / + all_columns + [{'name': 'interval_start_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True},\ + {'name': 'interval_end_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True}, \ + {'name': 'region', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \ + {'name': 'market', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \ + {'name': 'non_spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_mileage_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_mileage_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}] + ``` + Output rows: + ``` id name .... caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... @@ -146,33 +167,38 @@ def _explode_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... / id_series name_series + caiso_as_prices.interval_start_utc CAISO AS Prices / Interval Start Utc + caiso_as_prices.interval_end_utc CAISO AS Prices / Interval End Utc + caiso_as_prices.region CAISO AS Prices / Region + caiso_as_prices.market CAISO AS Prices / Market caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves + ``` :param row: row to transform - :return: the exploded row + :return: the collection of expanded rows """ dataset_id: str = row["id"] dataset_name: str = row["name"] - # Ignore primary key columns. - ignore_cols = set(ast.literal_eval(row["primary_key_columns"])) # Iterate through all columns and generate the row-per-series view. - exploded: List[Dict[str, Any]] = [] + expanded: List[Dict[str, Any]] = [] for col_meta in ast.literal_eval(row["all_columns"]): col_name: str = col_meta["name"] - if col_meta.get("is_datetime") or col_name in ignore_cols: - continue - exploded.append( + expanded.append( _build_series_row(row, col_name, dataset_id, dataset_name) ) - return exploded + return expanded def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: @@ -181,12 +207,37 @@ def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: E.g., Input dataset: + ``` id name .... caiso_as_prices CAISO AS Prices .... ... - + / + all_columns + [{'name': 'interval_start_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True},\ + {'name': 'interval_end_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True}, \ + {'name': 'region', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \ + {'name': 'market', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \ + {'name': 'non_spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_mileage_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_mileage_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'regulation_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}, \ + {'name': 'spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \ + 'is_datetime': False}] + ... + ``` Output dataset: + ``` id_series name_series + caiso_as_prices.interval_start_utc CAISO AS Prices / Interval Start Utc + caiso_as_prices.interval_end_utc CAISO AS Prices / Interval End Utc + caiso_as_prices.region CAISO AS Prices / Region + caiso_as_prices.market CAISO AS Prices / Market caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down @@ -202,16 +253,21 @@ def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... + caiso_as_prices CAISO AS Prices .... ... + ``` :param df: data to transform :return: transformed data """ - exploded_rows: List[Dict[str, Any]] = [] + expanded_rows: List[Dict[str, Any]] = [] for _, dataset_row in df.iterrows(): - exploded_rows.extend(_explode_dataset_row(dataset_row)) - result = pd.DataFrame(exploded_rows) - # Arrange according to desired ordering. + expanded_rows.extend(_expand_dataset_row(dataset_row)) + result = pd.DataFrame(expanded_rows) + # Move the series-defining columns to the beginning. leading = ["id_series", "name_series"] remaining = [c for c in result.columns if c not in leading] transformed_df = result[leading + remaining] @@ -233,16 +289,19 @@ def _parse() -> argparse.Namespace: ) parser.add_argument( "--input_version", - default="v1.0", help="Version of the source metadata file", ) parser.add_argument( - "--output_version", default="v2.0", help="Version tag for the result file" + "--output_version", help="Version tag for the result file" + ) + parser.add_argument( + "--log_level", type=int, default=logging.INFO, help="Logging level" ) return parser.parse_args() def _main(args: argparse.Namespace) -> None: + hdbg.init_logger(verbosity=args.log_level, use_exec_path=True) # Load data. src_file = ( f"{args.bucket_path.rstrip('/')}/gridstatus_metadata_original_" From d9782908b1f52757c5b023dd818c9e4f6bcff773 Mon Sep 17 00:00:00 2001 From: Indrayudd Roy Chowdhury Date: Mon, 2 Jun 2025 18:14:43 -0400 Subject: [PATCH 5/5] TutorTask541: Reviewer changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- .../postprocess_gridstatus_metadata.py | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/causal_automl/postprocess_gridstatus_metadata.py b/causal_automl/postprocess_gridstatus_metadata.py index 1a58522e3b..ff0eff85b8 100755 --- a/causal_automl/postprocess_gridstatus_metadata.py +++ b/causal_automl/postprocess_gridstatus_metadata.py @@ -88,7 +88,7 @@ def _load_data(file_path: str, aws_profile: str) -> pd.DataFrame: """ file = hs3.from_file(file_path, aws_profile=aws_profile) df = pd.read_csv(io.StringIO(file)) - _LOG.info("Data Successfully Downloaded from %s.", file_path) + _LOG.info("Data successfully loaded from %s.", file_path) return df @@ -131,8 +131,7 @@ def _build_series_row( def _expand_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: """ - Expand a row representing a collection into multiple representing each - series. + Expand a row with the dataset info into rows for its series. E.g., Input row: @@ -167,16 +166,8 @@ def _expand_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... / id_series name_series - caiso_as_prices.interval_start_utc CAISO AS Prices / Interval Start Utc - caiso_as_prices.interval_end_utc CAISO AS Prices / Interval End Utc - caiso_as_prices.region CAISO AS Prices / Region - caiso_as_prices.market CAISO AS Prices / Market caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down @@ -185,7 +176,6 @@ def _expand_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves ``` - :param row: row to transform :return: the collection of expanded rows """ @@ -195,6 +185,9 @@ def _expand_dataset_row(row: pd.Series) -> List[Dict[str, Any]]: expanded: List[Dict[str, Any]] = [] for col_meta in ast.literal_eval(row["all_columns"]): col_name: str = col_meta["name"] + # Expand only with columns that contain numeric time series. + if not col_meta.get("is_numeric"): + continue expanded.append( _build_series_row(row, col_name, dataset_id, dataset_name) ) @@ -234,10 +227,6 @@ def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: Output dataset: ``` id_series name_series - caiso_as_prices.interval_start_utc CAISO AS Prices / Interval Start Utc - caiso_as_prices.interval_end_utc CAISO AS Prices / Interval End Utc - caiso_as_prices.region CAISO AS Prices / Region - caiso_as_prices.market CAISO AS Prices / Market caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down @@ -253,10 +242,6 @@ def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... - caiso_as_prices CAISO AS Prices .... ... ```