Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
"""

import logging
from typing import Any, Dict, List, Tuple
import re
from typing import Any, Dict, List, Optional, Tuple, cast

import helpers.hdbg as hdbg
import matplotlib.pyplot as plt
import pandas as pd
import requests
Expand Down Expand Up @@ -118,12 +120,18 @@ def _get_api_request(self, route: str) -> Dict[str, Any]:
# Build the full API request URL.
url = f"{self._base_url}/{route}?api_key={self._api_key}"
# Send HTTP GET request to the EIA API.
# TODO(alvino): Add error handling for the HTTP request to handle
# potential exceptions such as connection errors or timeouts.
response = requests.get(url, timeout=20)
# Parse JSON content.
# TODO(alvino): Check if the response is successful (e.g.,
# `response.status_code == 200`) before attempting to parse the JSON
# content.
json_data = response.json()
# Get response from parsed payload.
data: Dict[str, Any] = {}
data = json_data.get("response", {})
# TODO(alvino): Add error handling for JSON parsing to manage potential parsing errors.
data = json_data["response"]
return data

def _get_leaf_route_data(self) -> Dict[str, Dict[str, Any]]:
Expand Down Expand Up @@ -242,19 +250,19 @@ def _extract_metadata(
"url": url,
"id": f"{route_clean}.{frequency_id}.{metric_id_clean}",
"dataset_id": dataset_id_clean,
"name": data.get("name"),
"description": data.get("description"),
"frequency_id": frequency.get("id"),
"name": data["name"],
"description": data["description"],
"frequency_id": frequency["id"],
"frequency_alias": frequency.get("alias"),
"frequency_description": frequency.get("description"),
"frequency_query": frequency.get("query"),
"frequency_format": frequency.get("format"),
"facets": data.get("facets"),
"frequency_description": frequency["description"],
"frequency_query": frequency["query"],
"frequency_format": frequency["format"],
"facets": data["facets"],
"data": metric_id,
"data_alias": metric_info.get("alias"),
"data_units": metric_info.get("units"),
"start_period": data.get("startPeriod"),
"end_period": data.get("endPeriod"),
"start_period": data["startPeriod"],
"end_period": data["endPeriod"],
"parameter_values_file": param_file_path,
}
flattened_metadata.append(metadata)
Expand All @@ -270,6 +278,11 @@ def _get_facet_values(
:param route: dataset route under the EIA v2 API
:return: data containing all facet values
"""
hdbg.dassert_in(
"facets",
metadata,
msg="Column 'facets' not found in metadata index."
)
facets = metadata["facets"]
rows = []
for facet in facets:
Expand All @@ -295,31 +308,84 @@ def _get_facet_values(
def build_full_url(
base_url: str,
api_key: str,
facet_input: Dict[str, str],
*,
facet_input: Optional[Dict[str, str]] = None,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
) -> str:
"""
Build a full EIA v2 API URL by appending one facet value per facet type.
Build an EIA v2 API URL to data endpoint.

This modifies the base metadata URL to point to the actual time series
data endpoint.
This function modifies the base metadata URL by:
- Replacing the metadata endpoint with the actual data endpoint
- Injecting the provided API key
- Appending optional facet filters
- Appending start and end timestamps formatted to match the series frequency

:param base_url: base API URL with frequency and metric, excluding
facet values,
e.g., "https://api.eia.gov/v2/electricity/retail-sales?api_key={API_KEY}&frequency=monthly&data[0]=revenue"
:param api_key: EIA API key, e.g., "abcd1234xyz"
:param facet_input: specified facet values, e.g., {"stateid": "KS", "sectorid": "COM"}
:return: full EIA API URL with all required facet parameters,
:param start_timestamp: first observation date
:param end_timestamp: last observation date
:return: full EIA API URL to data endpoint,
e.g, "https://api.eia.gov/v2/electricity/retail-sales/data?api_key=abcd1234xyz&frequency=monthly&data[0]=price&facets[stateid][]=KS&facets[sectorid][]=OTH"
"""
match = cast(re.Match[str], re.search(r"frequency=([a-zA-Z\-]+)", base_url))
frequency = match.group(1)
base_url = base_url.replace("?", "/data?")
url = base_url.replace("{API_KEY}", api_key)
query_parts = []
for facet_id, value in facet_input.items():
query_parts.append(f"&facets[{facet_id}][]={value}")
if start_timestamp:
formatted_start = _format_timestamp(start_timestamp, frequency)
query_parts.append(f"&start={formatted_start}")
if end_timestamp:
formatted_end = _format_timestamp(end_timestamp, frequency)
query_parts.append(f"&end={formatted_end}")
if facet_input:
# Add facet values when specified.
for facet_id, value in facet_input.items():
query_parts.append(f"&facets[{facet_id}][]={value}")
full_url = url + "".join(query_parts)
return full_url


def _format_timestamp(timestamp: pd.Timestamp, frequency: str) -> pd.Timestamp:
"""
Format a timestamp based on the EIA time series frequency.

Supported formats:
- "annual": "YYYY"
- "quarterly": "YYYY-QN"
- "monthly": "YYYY-MM"
- "daily": "YYYY-MM-DD"
- "hourly": "YYYY-MM-DDTHH"
- "local-hourly": "YYYY-MM-DDTHH-ZZ" (fixed timezone offset, e.g., "-00")

:param timestamp: the timestamp to format
:param frequency: the frequency type (e.g., "monthly", "local-hourly")
:return: formatted timestamp
"""
result = ""
if frequency == "annual":
result = timestamp.strftime("%Y")
elif frequency == "monthly":
result = timestamp.strftime("%Y-%m")
elif frequency == "quarterly":
q = (timestamp.month - 1) // 3 + 1
result = f"{timestamp.year}-Q{q}"
elif frequency == "daily":
result = timestamp.strftime("%Y-%m-%d")
elif frequency == "hourly":
result = timestamp.strftime("%Y-%m-%dT%H")
elif frequency == "local-hourly":
result = timestamp.strftime("%Y-%m-%dT%H") + "-00"
else:
raise ValueError(f"Unsupported frequency: {frequency}")
return result


def plot_distribution(df_metadata: pd.DataFrame, column: str, title: str) -> None:
"""
Plot a distribution count for a specified metadata column.
Expand All @@ -329,8 +395,11 @@ def plot_distribution(df_metadata: pd.DataFrame, column: str, title: str) -> Non
'frequency_id', 'data_units')
:param title: title for the plot
"""
if column not in df_metadata.columns:
raise ValueError(f"Column '{column}' not found in metadata index.")
hdbg.dassert_in(
column,
df_metadata.columns,
msg=f"Column '{column}' not found in metadata index."
)
counts = df_metadata[column].value_counts()
ax = counts.plot(kind="bar", figsize=(8, 4), title=title)
ax.set_xlabel(column.replace("_", " ").title())
Expand Down
Loading
Loading