From 7bc6b6575358abed12ef4c88f9b76cf9a34e030f Mon Sep 17 00:00:00 2001 From: aangelo9 Date: Fri, 23 May 2025 17:41:58 -0400 Subject: [PATCH 01/16] checkpoint --- .../eia_utils.py | 25 +- causal_automl/TutorTask401_HOLDER/__init__.py | 0 causal_automl/download_eia_data.py | 217 ++++++++++++++++++ 3 files changed, 235 insertions(+), 7 deletions(-) create mode 100644 causal_automl/TutorTask401_HOLDER/__init__.py create mode 100644 causal_automl/download_eia_data.py diff --git a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py index f907d8e76d..311b95793c 100644 --- a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py +++ b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py @@ -5,7 +5,7 @@ """ import logging -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple import matplotlib.pyplot as plt import pandas as pd @@ -295,27 +295,38 @@ def _get_facet_values( def build_full_url( base_url: str, api_key: str, - facet_input: Dict[str, str], + *, + facet_input: Optional[Dict[str, str]] = None, + start_timestamp: Optional[pd.Timestamp] = None, + end_timestamp: Optional[pd.Timestamp] = None, ) -> str: """ - Build a full EIA v2 API URL by appending one facet value per facet type. + Build an EIA v2 API URL to data endpoint. This modifies the base metadata URL to point to the actual time series - data endpoint. + data endpoint, optionally appending facet values and date range. :param base_url: base API URL with frequency and metric, excluding facet values, e.g., "https://api.eia.gov/v2/electricity/retail-sales?api_key={API_KEY}&frequency=monthly&data[0]=revenue" :param api_key: EIA API key, e.g., "abcd1234xyz" :param facet_input: specified facet values, e.g., {"stateid": "KS", "sectorid": "COM"} - :return: full EIA API URL with all required facet parameters, + :param start_timestamp: first observation date + :param end_timestamp: last observation date + :return: full EIA API URL to data endpoint, e.g, "https://api.eia.gov/v2/electricity/retail-sales/data?api_key=abcd1234xyz&frequency=monthly&data[0]=price&facets[stateid][]=KS&facets[sectorid][]=OTH" """ base_url = base_url.replace("?", "/data?") url = base_url.replace("{API_KEY}", api_key) query_parts = [] - for facet_id, value in facet_input.items(): - query_parts.append(f"&facets[{facet_id}][]={value}") + if start_timestamp: + query_parts.append(f"&start={start_timestamp}") + if end_timestamp: + query_parts.append(f"&end={end_timestamp}") + if facet_input: + # Add facet values when specified. + for facet_id, value in facet_input.items(): + query_parts.append(f"&facets[{facet_id}][]={value}") full_url = url + "".join(query_parts) return full_url diff --git a/causal_automl/TutorTask401_HOLDER/__init__.py b/causal_automl/TutorTask401_HOLDER/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py new file mode 100644 index 0000000000..91b077dfb5 --- /dev/null +++ b/causal_automl/download_eia_data.py @@ -0,0 +1,217 @@ +""" +Import as: + +import causal_automl.download_eia_data as cadoeida +""" + +import io +import logging +import os +from typing import Dict, List, Optional, Tuple + +import helpers.hdbg as hdbg +import helpers.hs3 as hs3 +import myeia +import pandas as pd +import ratelimit + +import causal_automl.TutorTask401_EIA_metadata_downloader_pipeline.eia_utils as catemdpeu + +_LOG = logging.getLogger(__name__) + + +# ############################################################################# +# EiaDataDownloader +# ############################################################################# + + +class EiaDataDownloader: + """ + Download historical data from EIA. + """ + + def __init__( + self, *, api_key: Optional[str] = None, aws_profile: Optional[str] = "ck" + ) -> None: + """ + Initialize the EIA data downloader with the API key. + + If no EIA API key is passed as a parameter, it is read from the + environment variable. + + :param api_key: EIA API key + :param aws_profile: AWS CLI profile name used for authentication + """ + self._api_key = api_key or os.getenv("EIA_API_KEY") + if not self._api_key: + raise ValueError("EIA API key is required") + self._client = myeia.API(token=self._api_key) + self._aws_profile = aws_profile + self.base_url = "https://api.eia.gov/v2/" + + def filter_series( + self, + df: pd.DataFrame, + id_: str, + facets: Dict[str, str], + ) -> pd.DataFrame: + """ + Filter and clean a single time series from an EIA dataset. + + Apply facet filters (e.g., state, sector) to select one unique + series, drop missing values, and convert the time column to a + UTC-indexed datetime format. + + :param df: EIA series data + :param id_: EIA series ID, e.g., + "electricity.retail_sales.monthly.price" + :param facets: facet filters, e.g., {"stateid": "WI", + "sectorid": "ALL"} + :return: data of single time series with one facet value per + facet type + """ + # Filter data with given facet values. + for key, val in facets.items(): + hdbg.dassert_in( + key, + df.columns, + "Facet '%s' not found in data columns=%s", + key, + list(df.columns), + ) + df = df[df[key] == val] + # Detect the metric column. + _, _, _, data_identifier = self._parse_id(id_) + # Drop rows with missing value. + df = df.dropna(subset=[data_identifier]) + if df.empty: + _LOG.warning("No data remaining after applying facets.") + # Convert to datetime index. + df["period"] = pd.to_datetime(df["period"]) + df = df.rename(columns={"period": "period (UTC)"}) + df = df.set_index("period (UTC)") + df.index = df.index.tz_localize("UTC") + return df + + @ratelimit.sleep_and_retry + @ratelimit.limits(calls=60, period=60) + def download_series( + self, + id_: str, + *, + start_timestamp: Optional[pd.Timestamp] = None, + end_timestamp: Optional[pd.Timestamp] = None, + max_rows_per_call: int = 5000, + ) -> pd.DataFrame: + """ + Download EIA historical series data. + + This method retrieves the full set of time series linked to an + EIA identifier, including all combinations of facet values + (e.g., state, sector). When no start and end timestamps are + passed, the entire time series is downloaded. + + :param id_: EIA series ID, e.g., + "electricity.retail_sales.monthly.price" + :param start_timestamp: first observation date + :param end_timestamp: last observation date + :param max_rows_per_call: max data rows per api call + :return: full time series data with all facets + """ + # Get base url from metadata index. + base_url, _ = self._get_metadata_url_and_facets(id_) + # Build URL query with api key and timestamps. + url = catemdpeu.build_full_url( + base_url, + self._api_key, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + ) + df = [] + offset = 0 + while True: + # Construct the paginated URL for the current offset. + paginated_url = f"{url}&offset={offset}&length={max_rows_per_call}" + data = self._client.get_response(paginated_url, self._client.header) + df.append(data) + if len(data) < max_rows_per_call: + # Exit loop when its the final page of data. + break + offset += max_rows_per_call + if not df: + _LOG.warning("No data returned under given id.") + return pd.concat(df, ignore_index=True) + + def _parse_id(self, id_: str) -> Tuple[str, str, str, str]: + """ + Parse an EIA time series ID into its components. + + :param id_: EIA time series ID, + e.g., "electricity.retail_sales.monthly.price" + :return: + - top-level EIA category, e.g., "electricity" + - sub-path in the category, e.g., "retail-sales" + - reporting frequency, e.g., "monthly" + - data identifier, e.g., "price" + """ + id_ = id_.replace("_", "-") + parts = id_.split(".") + category = parts[0] + frequency = parts[-2] + data_identifier = parts[-1] + route_parts = parts[1:-2] + subroute = "/".join(route_parts) + return category, subroute, frequency, data_identifier + + def _get_latest_metadata_s3_path(self, category: str) -> str: + """ + Get the latest versioned metadata file S3 path for a given category. + + :param category: top-level EIA category, e.g., "electricity" + :return: full S3 path to the latest version of the metadata CSV + e.g., "eia_electricity_metadata_original_v2.0.csv" + """ + # Get file names from s3 bucket. + base_dir = "s3://causify-data-collaborators/causal_automl/metadata" + pattern = f"eia_{category}_metadata_original_v*" + files = hs3.listdir( + dir_name=base_dir, + pattern=pattern, + only_files=True, + use_relative_paths=False, + aws_profile=self._aws_profile, + maxdepth=1, + ) + if not files: + raise FileNotFoundError( + f"No metadata index file found for category: '{category}' in S3." + ) + # Get latest file version. + files.sort(reverse=True) + s3_path = f"s3://{files[0]}" + return s3_path + + def _get_metadata_url_and_facets(self, id_: str) -> Tuple[str, List[str]]: + """ + :param id_: EIA time series ID, + e.g., "electricity.retail_sales.monthly.price" + :param category: top-level EIA category, e.g., "electricity" + :return: + - base API URL with frequency and metric, excluding facet values, + e.g., "https://api.eia.gov/v2/electricity/retail-sales?api_key={API_KEY}&frequency=monthly&data[0]=revenue" + - available facet types, e.g., ["stateid", "sectorid"] + """ + category, _, _, _ = self._parse_id(id_) + # Load latest metadata index file from s3. + s3_path = self._get_latest_metadata_s3_path(category) + csv_str = hs3.from_file(s3_path, aws_profile=self._aws_profile) + df = pd.read_csv(io.StringIO(csv_str)) + # Filter for exact ID match. + match = df[df["id"] == id_] + if match.empty: + raise ValueError(f"Invalid id: '{id_}'") + row = match.iloc[0] + base_url = row["url"] + # Extract only id field from facets. + facets = [f["id"] for f in eval(row["facets"])] + return base_url, facets From 09ee2b14a948c841e0b298d1b0efabf243aaf7bb Mon Sep 17 00:00:00 2001 From: aangelo9 Date: Fri, 23 May 2025 18:09:32 -0400 Subject: [PATCH 02/16] checkpoint --- causal_automl/download_eia_data.py | 45 +++++++++++++----------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 91b077dfb5..56a2ffb411 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -7,7 +7,7 @@ import io import logging import os -from typing import Dict, List, Optional, Tuple +from typing import Dict, Optional, Tuple import helpers.hdbg as hdbg import helpers.hs3 as hs3 @@ -65,8 +65,8 @@ def filter_series( :param df: EIA series data :param id_: EIA series ID, e.g., "electricity.retail_sales.monthly.price" - :param facets: facet filters, e.g., {"stateid": "WI", - "sectorid": "ALL"} + :param facets: facet filters, + e.g., {"stateid": "WI", "sectorid": "ALL"} :return: data of single time series with one facet value per facet type """ @@ -81,7 +81,7 @@ def filter_series( ) df = df[df[key] == val] # Detect the metric column. - _, _, _, data_identifier = self._parse_id(id_) + _, data_identifier = self._parse_id(id_) # Drop rows with missing value. df = df.dropna(subset=[data_identifier]) if df.empty: @@ -119,7 +119,7 @@ def download_series( :return: full time series data with all facets """ # Get base url from metadata index. - base_url, _ = self._get_metadata_url_and_facets(id_) + base_url = self._get_metadata_url(id_) # Build URL query with api key and timestamps. url = catemdpeu.build_full_url( base_url, @@ -127,22 +127,24 @@ def download_series( start_timestamp=start_timestamp, end_timestamp=end_timestamp, ) - df = [] + data_chunks = [] offset = 0 while True: # Construct the paginated URL for the current offset. paginated_url = f"{url}&offset={offset}&length={max_rows_per_call}" data = self._client.get_response(paginated_url, self._client.header) - df.append(data) + data_chunks.append(data) if len(data) < max_rows_per_call: # Exit loop when its the final page of data. break offset += max_rows_per_call - if not df: + if not data_chunks: _LOG.warning("No data returned under given id.") - return pd.concat(df, ignore_index=True) + df = pd.concat(data_chunks, ignore_index=True) + _LOG.debug("Downloaded %d rows for id=%s", len(df), id_) + return df - def _parse_id(self, id_: str) -> Tuple[str, str, str, str]: + def _parse_id(self, id_: str) -> Tuple[str, str]: """ Parse an EIA time series ID into its components. @@ -150,18 +152,13 @@ def _parse_id(self, id_: str) -> Tuple[str, str, str, str]: e.g., "electricity.retail_sales.monthly.price" :return: - top-level EIA category, e.g., "electricity" - - sub-path in the category, e.g., "retail-sales" - - reporting frequency, e.g., "monthly" - data identifier, e.g., "price" """ id_ = id_.replace("_", "-") parts = id_.split(".") category = parts[0] - frequency = parts[-2] data_identifier = parts[-1] - route_parts = parts[1:-2] - subroute = "/".join(route_parts) - return category, subroute, frequency, data_identifier + return category, data_identifier def _get_latest_metadata_s3_path(self, category: str) -> str: """ @@ -191,17 +188,15 @@ def _get_latest_metadata_s3_path(self, category: str) -> str: s3_path = f"s3://{files[0]}" return s3_path - def _get_metadata_url_and_facets(self, id_: str) -> Tuple[str, List[str]]: + def _get_metadata_url(self, id_: str) -> str: """ :param id_: EIA time series ID, e.g., "electricity.retail_sales.monthly.price" :param category: top-level EIA category, e.g., "electricity" - :return: - - base API URL with frequency and metric, excluding facet values, - e.g., "https://api.eia.gov/v2/electricity/retail-sales?api_key={API_KEY}&frequency=monthly&data[0]=revenue" - - available facet types, e.g., ["stateid", "sectorid"] + :return: base API URL with frequency and metric, excluding facet values, + e.g., "https://api.eia.gov/v2/electricity/retail-sales?api_key={API_KEY}&frequency=monthly&data[0]=revenue" """ - category, _, _, _ = self._parse_id(id_) + category, _ = self._parse_id(id_) # Load latest metadata index file from s3. s3_path = self._get_latest_metadata_s3_path(category) csv_str = hs3.from_file(s3_path, aws_profile=self._aws_profile) @@ -211,7 +206,5 @@ def _get_metadata_url_and_facets(self, id_: str) -> Tuple[str, List[str]]: if match.empty: raise ValueError(f"Invalid id: '{id_}'") row = match.iloc[0] - base_url = row["url"] - # Extract only id field from facets. - facets = [f["id"] for f in eval(row["facets"])] - return base_url, facets + base_url = str(row["url"]) + return base_url From c0b3e49be6b2607b1f39dbcce4aae451cd2f29d8 Mon Sep 17 00:00:00 2001 From: aangelo9 Date: Fri, 23 May 2025 18:11:44 -0400 Subject: [PATCH 03/16] remove holder file --- causal_automl/TutorTask401_HOLDER/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 causal_automl/TutorTask401_HOLDER/__init__.py diff --git a/causal_automl/TutorTask401_HOLDER/__init__.py b/causal_automl/TutorTask401_HOLDER/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 From 32ad9c4f4dbde829d912494caec88da55fa2101f Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Fri, 23 May 2025 18:18:13 -0400 Subject: [PATCH 04/16] nits --- causal_automl/download_eia_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 56a2ffb411..3919c12f6f 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -108,7 +108,7 @@ def download_series( This method retrieves the full set of time series linked to an EIA identifier, including all combinations of facet values - (e.g., state, sector). When no start and end timestamps are + (e.g., `stateid`, `sectorid`). When no start and end timestamps are passed, the entire time series is downloaded. :param id_: EIA series ID, e.g., From 1dbbecf8b06efc133ba4b13f8ecffcf60b379062 Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Tue, 27 May 2025 11:10:37 -0400 Subject: [PATCH 05/16] remove api key from args --- causal_automl/download_eia_data.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 3919c12f6f..b45f663814 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -31,20 +31,21 @@ class EiaDataDownloader: """ def __init__( - self, *, api_key: Optional[str] = None, aws_profile: Optional[str] = "ck" + self, *, aws_profile: Optional[str] = "ck" ) -> None: """ - Initialize the EIA data downloader with the API key. + Initialize the EIA data downloader with the API key and AWS profile. - If no EIA API key is passed as a parameter, it is read from the - environment variable. + EIA API key is read from the environment variable. - :param api_key: EIA API key :param aws_profile: AWS CLI profile name used for authentication """ - self._api_key = api_key or os.getenv("EIA_API_KEY") - if not self._api_key: - raise ValueError("EIA API key is required") + hdbg.dassert_in( + "EIA_API_KEY", + os.environ, + msg="EIA_API_KEY is not found in environment variables", + ) + self._api_key = os.getenv("EIA_API_KEY") self._client = myeia.API(token=self._api_key) self._aws_profile = aws_profile self.base_url = "https://api.eia.gov/v2/" From ad7ee01e6a34d315f5722be759b536c07a949527 Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Tue, 27 May 2025 11:12:47 -0400 Subject: [PATCH 06/16] remove ratelimit --- causal_automl/download_eia_data.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index b45f663814..682108bec2 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -13,7 +13,6 @@ import helpers.hs3 as hs3 import myeia import pandas as pd -import ratelimit import causal_automl.TutorTask401_EIA_metadata_downloader_pipeline.eia_utils as catemdpeu @@ -94,8 +93,6 @@ def filter_series( df.index = df.index.tz_localize("UTC") return df - @ratelimit.sleep_and_retry - @ratelimit.limits(calls=60, period=60) def download_series( self, id_: str, From 510a61e3999ebbf4cbc84bddc137e86e2f28cb88 Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Tue, 27 May 2025 11:38:23 -0400 Subject: [PATCH 07/16] add output examples --- causal_automl/download_eia_data.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 682108bec2..61c5cfdaa1 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -69,6 +69,19 @@ def filter_series( e.g., {"stateid": "WI", "sectorid": "ALL"} :return: data of single time series with one facet value per facet type + + Example output: + ``` + period stateid stateDescription sectorid + 2001-05-01T00:00:00+00:00 WI Wisconsin ALL + 2001-01-01T00:00:00+00:00 WI Wisconsin ALL + 2001-04-01T00:00:00+00:00 WI Wisconsin ALL + + sectorName price price-units + all sectors 5.98 cents per kilowatt-hour + all sectors 5.9 cents per kilowatt-hour + all sectors 6.02 cents per kilowatt-hour + ``` """ # Filter data with given facet values. for key, val in facets.items(): @@ -91,6 +104,7 @@ def filter_series( df = df.rename(columns={"period": "period (UTC)"}) df = df.set_index("period (UTC)") df.index = df.index.tz_localize("UTC") + df = df.sort_index() return df def download_series( @@ -115,6 +129,19 @@ def download_series( :param end_timestamp: last observation date :param max_rows_per_call: max data rows per api call :return: full time series data with all facets + + Example output: + ``` + period stateid stateDescription sectorid sectorName + 2020-09 WI Wisconsin IND industrial + 2020-09 WY Wyoming ALL all sectors + 2020-09 IA Iowa RES Residential + + price price-units + 7.45 cents per kilowatt-hour + 8.55 cents per kilowatt-hour + 12.65 cents per kilowatt-hour + ``` """ # Get base url from metadata index. base_url = self._get_metadata_url(id_) From b248ebeaf717ff7c6ce1b288d4d591b0b3e689ab Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Tue, 27 May 2025 11:39:35 -0400 Subject: [PATCH 08/16] sort output --- causal_automl/download_eia_data.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 61c5cfdaa1..06ca1850d2 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -73,14 +73,14 @@ def filter_series( Example output: ``` period stateid stateDescription sectorid - 2001-05-01T00:00:00+00:00 WI Wisconsin ALL 2001-01-01T00:00:00+00:00 WI Wisconsin ALL - 2001-04-01T00:00:00+00:00 WI Wisconsin ALL + 2001-02-01T00:00:00+00:00 WI Wisconsin ALL + 2001-03-01T00:00:00+00:00 WI Wisconsin ALL sectorName price price-units - all sectors 5.98 cents per kilowatt-hour all sectors 5.9 cents per kilowatt-hour - all sectors 6.02 cents per kilowatt-hour + all sectors 5.98 cents per kilowatt-hour + all sectors 5.93 cents per kilowatt-hour ``` """ # Filter data with given facet values. From 3161c9d6817559f2d7451577ce48b1c41be7f4ab Mon Sep 17 00:00:00 2001 From: GP Saggese Date: Fri, 30 May 2025 14:09:13 -0400 Subject: [PATCH 09/16] Review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- .../eia_utils.py | 12 ++++++++++-- helpers_root | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py index 311b95793c..24ef2e4b31 100644 --- a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py +++ b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py @@ -118,11 +118,17 @@ def _get_api_request(self, route: str) -> Dict[str, Any]: # Build the full API request URL. url = f"{self._base_url}/{route}?api_key={self._api_key}" # Send HTTP GET request to the EIA API. + # TODO(alvino): Add error handling for the HTTP request to handle + # potential exceptions such as connection errors or timeouts. response = requests.get(url, timeout=20) # Parse JSON content. + # TODO(alvino): Check if the response is successful (e.g., + # `response.status_code == 200`) before attempting to parse the JSON + # content. json_data = response.json() # Get response from parsed payload. data: Dict[str, Any] = {} + # TODO(alvino): Add error handling for JSON parsing to manage potential parsing errors. data = json_data.get("response", {}) return data @@ -238,6 +244,8 @@ def _extract_metadata( # Determine parameter CSV path for associated facet values. param_file_path = f"eia_parameters_v{self._version_num}/{dataset_id_clean}_parameters.csv" # Flattened metadata row for one frequency and metric combination. + # TODO(gp): `.get()` will use `None` if there is a missing + # value in the dictionary. Is this the intended behavior? metadata = { "url": url, "id": f"{route_clean}.{frequency_id}.{metric_id_clean}", @@ -270,6 +278,7 @@ def _get_facet_values( :param route: dataset route under the EIA v2 API :return: data containing all facet values """ + hdbg.dassert_in("facets", metadata) facets = metadata["facets"] rows = [] for facet in facets: @@ -340,8 +349,7 @@ def plot_distribution(df_metadata: pd.DataFrame, column: str, title: str) -> Non 'frequency_id', 'data_units') :param title: title for the plot """ - if column not in df_metadata.columns: - raise ValueError(f"Column '{column}' not found in metadata index.") + hdbg.dassert_in(column, df_metadata.columns) counts = df_metadata[column].value_counts() ax = counts.plot(kind="bar", figsize=(8, 4), title=title) ax.set_xlabel(column.replace("_", " ").title()) diff --git a/helpers_root b/helpers_root index 69a00d2f8d..7629706576 160000 --- a/helpers_root +++ b/helpers_root @@ -1 +1 @@ -Subproject commit 69a00d2f8dd0d4de1aa8893d7539de91538377b9 +Subproject commit 762970657663b4d2a0692453fbc9ad2b217c3231 From 3dfe953be6ef750c1cf949fa2ebbc9e3a7576e21 Mon Sep 17 00:00:00 2001 From: aangelo9 Date: Fri, 30 May 2025 18:24:23 -0400 Subject: [PATCH 10/16] checkpoint --- .../eia_utils.py | 54 +++++++- causal_automl/download_eia_data.py | 118 ++++++++++-------- 2 files changed, 114 insertions(+), 58 deletions(-) diff --git a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py index 24ef2e4b31..eff253921d 100644 --- a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py +++ b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py @@ -5,8 +5,10 @@ """ import logging -from typing import Any, Dict, List, Optional, Tuple +import re +from typing import Any, Dict, List, Optional, Tuple, cast +import helpers.hdbg as hdbg import matplotlib.pyplot as plt import pandas as pd import requests @@ -312,8 +314,11 @@ def build_full_url( """ Build an EIA v2 API URL to data endpoint. - This modifies the base metadata URL to point to the actual time series - data endpoint, optionally appending facet values and date range. + This function modifies the base metadata URL by: + - Replacing the metadata endpoint with the actual data endpoint + - Injecting the provided API key + - Appending optional facet filters + - Appending start and end timestamps formatted to match the series frequency :param base_url: base API URL with frequency and metric, excluding facet values, @@ -325,13 +330,17 @@ def build_full_url( :return: full EIA API URL to data endpoint, e.g, "https://api.eia.gov/v2/electricity/retail-sales/data?api_key=abcd1234xyz&frequency=monthly&data[0]=price&facets[stateid][]=KS&facets[sectorid][]=OTH" """ + match = cast(re.Match[str], re.search(r"frequency=([a-zA-Z\-]+)", base_url)) + frequency = match.group(1) base_url = base_url.replace("?", "/data?") url = base_url.replace("{API_KEY}", api_key) query_parts = [] if start_timestamp: - query_parts.append(f"&start={start_timestamp}") + formatted_start = _format_timestamp(start_timestamp, frequency) + query_parts.append(f"&start={formatted_start}") if end_timestamp: - query_parts.append(f"&end={end_timestamp}") + formatted_end = _format_timestamp(end_timestamp, frequency) + query_parts.append(f"&end={formatted_end}") if facet_input: # Add facet values when specified. for facet_id, value in facet_input.items(): @@ -340,6 +349,41 @@ def build_full_url( return full_url +def _format_timestamp(timestamp: pd.Timestamp, frequency: str) -> pd.Timestamp: + """ + Format a timestamp based on the EIA time series frequency. + + Supported formats: + - "annual": "YYYY" + - "quarterly": "YYYY-QN" + - "monthly": "YYYY-MM" + - "daily": "YYYY-MM-DD" + - "hourly": "YYYY-MM-DDTHH" + - "local-hourly": "YYYY-MM-DDTHH-ZZ" (fixed timezone offset, e.g., -00) + + :param timestamp: the timestamp to format + :param frequency: the frequency type (e.g., "monthly", "local-hourly") + :return: formatted timestamp + """ + result = "" + if frequency == "annual": + result = timestamp.strftime("%Y") + elif frequency == "monthly": + result = timestamp.strftime("%Y-%m") + elif frequency == "quarterly": + q = (timestamp.month - 1) // 3 + 1 + result = f"{timestamp.year}-Q{q}" + elif frequency == "daily": + result = timestamp.strftime("%Y-%m-%d") + elif frequency == "hourly": + result = timestamp.strftime("%Y-%m-%dT%H") + elif frequency == "local-hourly": + result = timestamp.strftime("%Y-%m-%dT%H") + "-00" + else: + raise ValueError(f"Unsupported frequency: {frequency}") + return result + + def plot_distribution(df_metadata: pd.DataFrame, column: str, title: str) -> None: """ Plot a distribution count for a specified metadata column. diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 06ca1850d2..856a08a92b 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -29,9 +29,7 @@ class EiaDataDownloader: Download historical data from EIA. """ - def __init__( - self, *, aws_profile: Optional[str] = "ck" - ) -> None: + def __init__(self, *, aws_profile: str = "ck") -> None: """ Initialize the EIA data downloader with the API key and AWS profile. @@ -47,7 +45,7 @@ def __init__( self._api_key = os.getenv("EIA_API_KEY") self._client = myeia.API(token=self._api_key) self._aws_profile = aws_profile - self.base_url = "https://api.eia.gov/v2/" + self._metadata_index_by_category: Dict[str, pd.DataFrame] = {} def filter_series( self, @@ -58,29 +56,26 @@ def filter_series( """ Filter and clean a single time series from an EIA dataset. - Apply facet filters (e.g., state, sector) to select one unique - series, drop missing values, and convert the time column to a - UTC-indexed datetime format. + This function performs data post-processing: + - Filter by facet values (e.g., "stateid", "sectorid") + - Retain only the period and metric column + - Convert the period column to UTC datetime + - Set the period as the index and sort chronologically :param df: EIA series data :param id_: EIA series ID, e.g., "electricity.retail_sales.monthly.price" - :param facets: facet filters, + :param facets: facet filters, e.g., {"stateid": "WI", "sectorid": "ALL"} :return: data of single time series with one facet value per facet type Example output: ``` - period stateid stateDescription sectorid - 2001-01-01T00:00:00+00:00 WI Wisconsin ALL - 2001-02-01T00:00:00+00:00 WI Wisconsin ALL - 2001-03-01T00:00:00+00:00 WI Wisconsin ALL - - sectorName price price-units - all sectors 5.9 cents per kilowatt-hour - all sectors 5.98 cents per kilowatt-hour - all sectors 5.93 cents per kilowatt-hour + period price + 2001-01-01T00:00:00+00:00 5.9 + 2001-02-01T00:00:00+00:00 5.98 + 2001-03-01T00:00:00+00:00 5.93 ``` """ # Filter data with given facet values. @@ -94,16 +89,15 @@ def filter_series( ) df = df[df[key] == val] # Detect the metric column. - _, data_identifier = self._parse_id(id_) + _, _, _, data_identifier = self._parse_id(id_) + df = df[["period", data_identifier]] # Drop rows with missing value. df = df.dropna(subset=[data_identifier]) if df.empty: _LOG.warning("No data remaining after applying facets.") - # Convert to datetime index. - df["period"] = pd.to_datetime(df["period"]) - df = df.rename(columns={"period": "period (UTC)"}) - df = df.set_index("period (UTC)") - df.index = df.index.tz_localize("UTC") + # Convert to datetime and index. + df["period"] = pd.to_datetime(df["period"]).dt.tz_localize("UTC") + df = df.set_index("period") df = df.sort_index() return df @@ -118,19 +112,23 @@ def download_series( """ Download EIA historical series data. - This method retrieves the full set of time series linked to an - EIA identifier, including all combinations of facet values - (e.g., `stateid`, `sectorid`). When no start and end timestamps are - passed, the entire time series is downloaded. + This method retrieves the full set of time series linked to an + EIA identifier, including all combinations of facet values + (e.g., `stateid`, `sectorid`). When no start and end timestamps are + passed, the entire time series is downloaded. - :param id_: EIA series ID, e.g., - "electricity.retail_sales.monthly.price" - :param start_timestamp: first observation date - :param end_timestamp: last observation date - :param max_rows_per_call: max data rows per api call - :return: full time series data with all facets + Pagination is handled internally. The `max_rows_per_call` parameter + controls the page size for each API request, but the method will + continue fetching until all available data is retrieved. - Example output: + :param id_: EIA series ID, e.g., + "electricity.retail_sales.monthly.price" + :param start_timestamp: first observation date + :param end_timestamp: last observation date + :param max_rows_per_call: max data rows per API call + :return: full time series data with all facets + + Example output: ``` period stateid stateDescription sectorid sectorName 2020-09 WI Wisconsin IND industrial @@ -145,7 +143,7 @@ def download_series( """ # Get base url from metadata index. base_url = self._get_metadata_url(id_) - # Build URL query with api key and timestamps. + # Build URL query with API key and timestamps. url = catemdpeu.build_full_url( base_url, self._api_key, @@ -160,7 +158,7 @@ def download_series( data = self._client.get_response(paginated_url, self._client.header) data_chunks.append(data) if len(data) < max_rows_per_call: - # Exit loop when its the final page of data. + # Exit loop when it's the final page of data. break offset += max_rows_per_call if not data_chunks: @@ -169,31 +167,40 @@ def download_series( _LOG.debug("Downloaded %d rows for id=%s", len(df), id_) return df - def _parse_id(self, id_: str) -> Tuple[str, str]: + def _parse_id(self, id_: str) -> Tuple[str, str, str, str]: """ Parse an EIA time series ID into its components. + EIA time series IDs follow the format: + ... + + Underscores are converted to dashes to match the EIA API format. + :param id_: EIA time series ID, e.g., "electricity.retail_sales.monthly.price" :return: - top-level EIA category, e.g., "electricity" + - subroute in the category, e.g., "retail-sales" + - reporting frequency, e.g., "monthly" - data identifier, e.g., "price" """ id_ = id_.replace("_", "-") parts = id_.split(".") category = parts[0] + frequency = parts[-2] data_identifier = parts[-1] - return category, data_identifier + route_parts = parts[1:-2] + subroute = "/".join(route_parts) + return category, subroute, frequency, data_identifier - def _get_latest_metadata_s3_path(self, category: str) -> str: + def _get_latest_metadata_from_s3(self, category: str) -> pd.DataFrame: """ - Get the latest versioned metadata file S3 path for a given category. + Get the latest versioned metadata index file from S3 for a category. :param category: top-level EIA category, e.g., "electricity" - :return: full S3 path to the latest version of the metadata CSV - e.g., "eia_electricity_metadata_original_v2.0.csv" + :return: latest versioned metadata index """ - # Get file names from s3 bucket. + # Get file names from S3 bucket. base_dir = "s3://causify-data-collaborators/causal_automl/metadata" pattern = f"eia_{category}_metadata_original_v*" files = hs3.listdir( @@ -211,25 +218,30 @@ def _get_latest_metadata_s3_path(self, category: str) -> str: # Get latest file version. files.sort(reverse=True) s3_path = f"s3://{files[0]}" - return s3_path + # Load latest metadata index file from S3. + csv_str = hs3.from_file(s3_path, aws_profile=self._aws_profile) + df = pd.read_csv(io.StringIO(csv_str)) + return df def _get_metadata_url(self, id_: str) -> str: """ + Get base URL for given series ID from the metadata index. + :param id_: EIA time series ID, e.g., "electricity.retail_sales.monthly.price" - :param category: top-level EIA category, e.g., "electricity" :return: base API URL with frequency and metric, excluding facet values, e.g., "https://api.eia.gov/v2/electricity/retail-sales?api_key={API_KEY}&frequency=monthly&data[0]=revenue" """ - category, _ = self._parse_id(id_) - # Load latest metadata index file from s3. - s3_path = self._get_latest_metadata_s3_path(category) - csv_str = hs3.from_file(s3_path, aws_profile=self._aws_profile) - df = pd.read_csv(io.StringIO(csv_str)) + category, _, _, _ = self._parse_id(id_) + # Load latest metadata index file from S3. + if category not in self._metadata_index_by_category: + self._metadata_index_by_category[category] = ( + self._get_latest_metadata_from_s3(category) + ) + df = self._metadata_index_by_category[category] # Filter for exact ID match. match = df[df["id"] == id_] if match.empty: - raise ValueError(f"Invalid id: '{id_}'") - row = match.iloc[0] - base_url = str(row["url"]) + raise ValueError(f"Invalid ID: '{id_}'") + base_url: str = match.iloc[0]["url"] return base_url From c43e654dfd86be0ca2ec70f7dbe49e3ebd66eae6 Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Fri, 30 May 2025 20:35:15 -0400 Subject: [PATCH 11/16] nits --- causal_automl/download_eia_data.py | 34 +++++++++++++++--------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 856a08a92b..59825280d2 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -112,23 +112,23 @@ def download_series( """ Download EIA historical series data. - This method retrieves the full set of time series linked to an - EIA identifier, including all combinations of facet values - (e.g., `stateid`, `sectorid`). When no start and end timestamps are - passed, the entire time series is downloaded. - - Pagination is handled internally. The `max_rows_per_call` parameter - controls the page size for each API request, but the method will - continue fetching until all available data is retrieved. - - :param id_: EIA series ID, e.g., - "electricity.retail_sales.monthly.price" - :param start_timestamp: first observation date - :param end_timestamp: last observation date - :param max_rows_per_call: max data rows per API call - :return: full time series data with all facets - - Example output: + This method retrieves the full set of time series linked to an + EIA identifier, including all combinations of facet values + (e.g., `stateid`, `sectorid`). When no start and end timestamps are + passed, the entire time series is downloaded. + + Pagination is handled internally. The `max_rows_per_call` parameter + controls the page size for each API request, but the method will + continue fetching until all available data is retrieved. + + :param id_: EIA series ID, e.g., + "electricity.retail_sales.monthly.price" + :param start_timestamp: first observation date + :param end_timestamp: last observation date + :param max_rows_per_call: max data rows per API call + :return: full time series data with all facets + + Example output: ``` period stateid stateDescription sectorid sectorName 2020-09 WI Wisconsin IND industrial From b7f1425634284cb763d2df3f73baa1e47d61073d Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Fri, 30 May 2025 20:39:34 -0400 Subject: [PATCH 12/16] nits --- .../TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py index eff253921d..9803e4cdf7 100644 --- a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py +++ b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py @@ -359,7 +359,7 @@ def _format_timestamp(timestamp: pd.Timestamp, frequency: str) -> pd.Timestamp: - "monthly": "YYYY-MM" - "daily": "YYYY-MM-DD" - "hourly": "YYYY-MM-DDTHH" - - "local-hourly": "YYYY-MM-DDTHH-ZZ" (fixed timezone offset, e.g., -00) + - "local-hourly": "YYYY-MM-DDTHH-ZZ" (fixed timezone offset, e.g., "-00") :param timestamp: the timestamp to format :param frequency: the frequency type (e.g., "monthly", "local-hourly") From 316d88c50fbb344e5471e5485c86b92775282d7f Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Mon, 2 Jun 2025 10:34:05 -0400 Subject: [PATCH 13/16] add missing value warning --- causal_automl/download_eia_data.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 59825280d2..5f8dc0124f 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -88,13 +88,15 @@ def filter_series( list(df.columns), ) df = df[df[key] == val] + if df.empty: + _LOG.warning("No data remaining after applying facets.") # Detect the metric column. _, _, _, data_identifier = self._parse_id(id_) df = df[["period", data_identifier]] # Drop rows with missing value. df = df.dropna(subset=[data_identifier]) if df.empty: - _LOG.warning("No data remaining after applying facets.") + _LOG.warning("No data remaining after dropping NaN values.") # Convert to datetime and index. df["period"] = pd.to_datetime(df["period"]).dt.tz_localize("UTC") df = df.set_index("period") From b20c26a4d3f03b260235860d4984f45b480aeded Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Mon, 2 Jun 2025 11:07:32 -0400 Subject: [PATCH 14/16] add messages to dassert --- .../eia_utils.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py index 9803e4cdf7..66519a1f6f 100644 --- a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py +++ b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py @@ -131,7 +131,7 @@ def _get_api_request(self, route: str) -> Dict[str, Any]: # Get response from parsed payload. data: Dict[str, Any] = {} # TODO(alvino): Add error handling for JSON parsing to manage potential parsing errors. - data = json_data.get("response", {}) + data = json_data["response"] return data def _get_leaf_route_data(self) -> Dict[str, Dict[str, Any]]: @@ -280,7 +280,11 @@ def _get_facet_values( :param route: dataset route under the EIA v2 API :return: data containing all facet values """ - hdbg.dassert_in("facets", metadata) + hdbg.dassert_in( + "facets", + metadata, + msg="Column 'facets' not found in metadata index." + ) facets = metadata["facets"] rows = [] for facet in facets: @@ -393,7 +397,11 @@ def plot_distribution(df_metadata: pd.DataFrame, column: str, title: str) -> Non 'frequency_id', 'data_units') :param title: title for the plot """ - hdbg.dassert_in(column, df_metadata.columns) + hdbg.dassert_in( + column, + df_metadata.columns, + msg=f"Column '{column}' not found in metadata index." + ) counts = df_metadata[column].value_counts() ax = counts.plot(kind="bar", figsize=(8, 4), title=title) ax.set_xlabel(column.replace("_", " ").title()) From 79f2ae0dc2cae0779b87b753c060cff095ef6154 Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Mon, 2 Jun 2025 11:13:41 -0400 Subject: [PATCH 15/16] nits (add msg) --- causal_automl/download_eia_data.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/causal_automl/download_eia_data.py b/causal_automl/download_eia_data.py index 5f8dc0124f..6c32958d12 100644 --- a/causal_automl/download_eia_data.py +++ b/causal_automl/download_eia_data.py @@ -83,9 +83,9 @@ def filter_series( hdbg.dassert_in( key, df.columns, - "Facet '%s' not found in data columns=%s", - key, - list(df.columns), + msg=( + f"Facet '{key}' not found in data columns={list(df.columns)}" + ), ) df = df[df[key] == val] if df.empty: From 2f34e44f7bf42c06fa89bba5ab9ee5afac7e3d1d Mon Sep 17 00:00:00 2001 From: aangelo9 <153690899+aangelo9@users.noreply.github.com> Date: Mon, 2 Jun 2025 11:33:57 -0400 Subject: [PATCH 16/16] fetch value instead of get --- .../eia_utils.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py index 66519a1f6f..669853ffc0 100644 --- a/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py +++ b/causal_automl/TutorTask401_EIA_metadata_downloader_pipeline/eia_utils.py @@ -246,25 +246,23 @@ def _extract_metadata( # Determine parameter CSV path for associated facet values. param_file_path = f"eia_parameters_v{self._version_num}/{dataset_id_clean}_parameters.csv" # Flattened metadata row for one frequency and metric combination. - # TODO(gp): `.get()` will use `None` if there is a missing - # value in the dictionary. Is this the intended behavior? metadata = { "url": url, "id": f"{route_clean}.{frequency_id}.{metric_id_clean}", "dataset_id": dataset_id_clean, - "name": data.get("name"), - "description": data.get("description"), - "frequency_id": frequency.get("id"), + "name": data["name"], + "description": data["description"], + "frequency_id": frequency["id"], "frequency_alias": frequency.get("alias"), - "frequency_description": frequency.get("description"), - "frequency_query": frequency.get("query"), - "frequency_format": frequency.get("format"), - "facets": data.get("facets"), + "frequency_description": frequency["description"], + "frequency_query": frequency["query"], + "frequency_format": frequency["format"], + "facets": data["facets"], "data": metric_id, "data_alias": metric_info.get("alias"), "data_units": metric_info.get("units"), - "start_period": data.get("startPeriod"), - "end_period": data.get("endPeriod"), + "start_period": data["startPeriod"], + "end_period": data["endPeriod"], "parameter_values_file": param_file_path, } flattened_metadata.append(metadata)