diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 82bf21d..5c98b84 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -32,10 +32,10 @@ jobs: - name: Create version tag shell: bash - run: echo "tag=docker.elaad.io/elaadnl/bl-reference-implementation:$(git show -s --format="%ct-%h" $GITHUB_SHA)" >> $GITHUB_ENV + run: echo "tag=docker.elaad.io/elaadnl/ditm-openadr-bl:$(git show -s --format="%ct-%h" $GITHUB_SHA)" >> $GITHUB_ENV - name: Latest tag on main branch if: github.ref == 'refs/heads/main' - run: echo "tag_main=,docker.elaad.io/elaadnl/bl-reference-implementation:latest" >> $GITHUB_ENV + run: echo "tag_main=,docker.elaad.io/elaadnl/ditm-openadr-bl:latest" >> $GITHUB_ENV - name: Set up Docker Buildx uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1 diff --git a/poetry.lock b/poetry.lock index a1f1477..e25ca86 100644 --- a/poetry.lock +++ b/poetry.lock @@ -704,6 +704,21 @@ files = [ {file = "frozenlist-1.7.0.tar.gz", hash = "sha256:2e310d81923c2437ea8670467121cc3e9b0f76d3043cc1d2331d56c7fb7a3a8f"}, ] +[[package]] +name = "holidays" +version = "0.83" +description = "Open World Holidays Framework" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "holidays-0.83-py3-none-any.whl", hash = "sha256:e36a368227b5b62129871463697bfde7e5212f6f77e43640320b727b79a875a8"}, + {file = "holidays-0.83.tar.gz", hash = "sha256:99b97b002079ab57dac93295933907d2aae2742ad9a4d64fe33864dfae6805fa"}, +] + +[package.dependencies] +python-dateutil = "*" + [[package]] name = "idna" version = "3.10" @@ -1099,14 +1114,14 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] [[package]] name = "openadr3-client" -version = "0.0.7" +version = "0.0.11" description = "" optional = false python-versions = "<4,>=3.12" groups = ["main"] files = [ - {file = "openadr3_client-0.0.7-py3-none-any.whl", hash = "sha256:e6175fa5e92e58f1b9b11b211f10f23ff2e42f3a25fc517f13734c9fd60a991e"}, - 
{file = "openadr3_client-0.0.7.tar.gz", hash = "sha256:37f841e9123ebb05f1e7cda97f9e87f9e0289553d52f3e627cfd1c91a3e62d18"}, + {file = "openadr3_client-0.0.11-py3-none-any.whl", hash = "sha256:ce050556a5ff0566671e7c8052f99c22c3f51dc1f8665124b372277f17366ec0"}, + {file = "openadr3_client-0.0.11.tar.gz", hash = "sha256:2a975db33d7f72fa91a260c33822c9b3def037aa03e8fff5428d1922d4e3adf6"}, ] [package.dependencies] @@ -1118,26 +1133,26 @@ pandera = {version = ">=0.23.1,<0.24.0", extras = ["mypy"]} pycountry = ">=24.6.1,<25.0.0" pydantic = ">=2.11.2,<3.0.0" pydantic-extra-types = ">=2.10.3,<3.0.0" -python-decouple = ">=3.8,<4.0" requests = ">=2.32.3,<3.0.0" requests-oauthlib = ">=2.0.0,<3.0.0" [[package]] name = "openadr3-client-gac-compliance" -version = "1.4.0" +version = "2.0.0" description = "" optional = false python-versions = "<4,>=3.12" groups = ["main"] files = [ - {file = "openadr3_client_gac_compliance-1.4.0-py3-none-any.whl", hash = "sha256:398e440232d1b568e8c3b852d8af9914b5e9c96a427aa5119d0531b119ef27cd"}, - {file = "openadr3_client_gac_compliance-1.4.0.tar.gz", hash = "sha256:14b3b93227af8f41261e0abd078e8a37c5f2e8f4426d719ba605ee80520bfd38"}, + {file = "openadr3_client_gac_compliance-2.0.0-py3-none-any.whl", hash = "sha256:c16e80afb386140fb51aaaee91103ab769cc3393ff1e6becadb687b634e6e7f7"}, + {file = "openadr3_client_gac_compliance-2.0.0.tar.gz", hash = "sha256:38bc52a340dadd57d2fb9f35182a79caac46f7f29ce4a7f6fd21b28afdf8133e"}, ] [package.dependencies] -openadr3-client = ">=0.0.7,<1.0.0" +openadr3-client = ">=0.0.11,<1.0.0" pycountry = ">=24.6.1,<25.0.0" pydantic = ">=2.11.2,<3.0.0" +python-decouple = ">=3.8,<4.0" [[package]] name = "packaging" @@ -1738,14 +1753,14 @@ typing-extensions = ">=4.1.1,<5.0.0" [[package]] name = "requests" -version = "2.32.4" +version = "2.32.5" description = "Python HTTP for Humans." 
optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"}, - {file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"}, + {file = "requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6"}, + {file = "requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf"}, ] [package.dependencies] @@ -1853,6 +1868,18 @@ files = [ [package.dependencies] typing_extensions = ">=4.14.0" +[[package]] +name = "types-oauthlib" +version = "3.3.0.20250822" +description = "Typing stubs for oauthlib" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "types_oauthlib-3.3.0.20250822-py3-none-any.whl", hash = "sha256:b7f4c9b9eed0e020f454e0af800b10e93dd2efd196da65744b76910cce7e70d6"}, + {file = "types_oauthlib-3.3.0.20250822.tar.gz", hash = "sha256:2cd41587dd80c199e4230e3f086777e9ae525e89579c64afe5e0039ab09be9de"}, +] + [[package]] name = "types-pytz" version = "2025.2.0.20250516" @@ -1865,6 +1892,37 @@ files = [ {file = "types_pytz-2025.2.0.20250516.tar.gz", hash = "sha256:e1216306f8c0d5da6dafd6492e72eb080c9a166171fa80dd7a1990fd8be7a7b3"}, ] +[[package]] +name = "types-requests" +version = "2.32.4.20250913" +description = "Typing stubs for requests" +optional = false +python-versions = ">=3.9" +groups = ["main", "dev"] +files = [ + {file = "types_requests-2.32.4.20250913-py3-none-any.whl", hash = "sha256:78c9c1fffebbe0fa487a418e0fa5252017e9c60d1a2da394077f1780f655d7e1"}, + {file = "types_requests-2.32.4.20250913.tar.gz", hash = "sha256:abd6d4f9ce3a9383f269775a9835a4c24e5cd6b9f647d64f88aa4613c33def5d"}, +] + +[package.dependencies] +urllib3 = ">=2" + +[[package]] +name = "types-requests-oauthlib" +version = "2.0.0.20250809" 
+description = "Typing stubs for requests-oauthlib" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "types_requests_oauthlib-2.0.0.20250809-py3-none-any.whl", hash = "sha256:0d1af4907faf9f4a1b0f0afbc7ec488f1dd5561a2b5b6dad70f78091a1acfb76"}, + {file = "types_requests_oauthlib-2.0.0.20250809.tar.gz", hash = "sha256:f3b9b31e0394fe2c362f0d44bc9ef6d5c150a298d01089513cd54a51daec37a2"}, +] + +[package.dependencies] +types-oauthlib = "*" +types-requests = "*" + [[package]] name = "typing-extensions" version = "4.14.1" @@ -1926,7 +1984,7 @@ version = "2.5.0" description = "HTTP library with thread-safe connection pooling, file post, and more." optional = false python-versions = ">=3.9" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"}, {file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"}, @@ -2078,4 +2136,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = ">=3.12, <4" -content-hash = "52f89cb1b8ddff689f8ba349fda65dff0df7a737f70c5e7236a97ed9865f3886" +content-hash = "c95bb38905c80edda62bef86b0ca293abed2d4ae288f09c83974730f4eff612a" diff --git a/pyproject.toml b/pyproject.toml index a91c071..902539a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,14 +8,18 @@ authors = [ readme = "README.md" requires-python = ">=3.12, <4" dependencies = [ - "openadr3-client-gac-compliance (>=1.4.0,<2.0.0)", + "openadr3-client-gac-compliance (==2.0.0)", "python-decouple (>=3.8,<4.0)", "influxdb-client[async] (>=1.49.0,<2.0.0)", "ruff (>=0.12.4,<0.13.0)", "mypy (>=1.17.0,<2.0.0)", "pytest (>=8.4.1,<9.0.0)", "azure-functions (>=1.23.0,<2.0.0)", - "openadr3-client (>=0.0.7,<0.0.8)" + "openadr3-client (==0.0.11)", + "requests (>=2.32.5,<3.0.0)", + "requests-oauthlib (>=2.0.0,<3.0.0)", + "holidays (>=0.83,<0.84)", + "types-requests 
(>=2.32.4.20250913,<3.0.0.0)", ] [build-system] @@ -28,6 +32,8 @@ package-mode = false [tool.poetry.group.dev.dependencies] pytest-cov = "^6.2.1" +types-oauthlib = "^3.3.0.20250822" +types-requests-oauthlib = "^2.0.0.20250809" [[tool.mypy.overrides]] module = ["decouple", "openadr3_client_gac_compliance"] diff --git a/src/application/generate_events.py b/src/application/generate_events.py index 7bb5470..cc5c6e5 100644 --- a/src/application/generate_events.py +++ b/src/application/generate_events.py @@ -69,7 +69,7 @@ def _generate_capacity_limitation_intervals( type=EventPayloadType.IMPORT_CAPACITY_LIMIT, values=( predicted_grid_asset_loads.flex_capacity_required(max_capacity) - or 4, + or 22, ), ), ), diff --git a/src/config.py b/src/config.py index 8992a91..a803e78 100644 --- a/src/config.py +++ b/src/config.py @@ -21,3 +21,25 @@ # INFLUXDB parameters (secrets) INFLUXDB_TOKEN = config("INFLUXDB_TOKEN") INFLUXDB_URL = config("INFLUXDB_URL") + +PREDICTED_TRAFO_LOAD_BUCKET = config( + "PREDICTED_TRAFO_LOAD_BUCKET", default="ditm_model_output" +) +STANDARD_PROFILES_BUCKET_NAME = config( + "STANDARD_PROFILES_BUCKET_NAME", default="ditm_standard_profiles" +) +DALIDATA_BUCKET_NAME = config("DALIDATA_BUCKET_NAME", default="dalidata") + +# External services URLs +WEATHER_FORECAST_API_URL = config("WEATHER_FORECAST_API_URL") + +# Authentication to Azure ML managed endpoint for prediction model +DITM_MODEL_API_URL = config("DITM_MODEL_API_URL") +DITM_MODEL_API_CLIENT_ID = config("DITM_MODEL_API_CLIENT_ID") +DITM_MODEL_API_CLIENT_SECRET = config("DITM_MODEL_API_CLIENT_SECRET") +DITM_MODEL_API_TOKEN_URL = config("DITM_MODEL_API_TOKEN_URL") + +OAUTH_CLIENT_ID = config("OAUTH_CLIENT_ID") +OAUTH_CLIENT_SECRET = config("OAUTH_CLIENT_SECRET") +OAUTH_TOKEN_ENDPOINT = config("OAUTH_TOKEN_ENDPOINT") +OAUTH_SCOPES = config("OAUTH_SCOPES") diff --git a/src/infrastructure/_auth/http/authenticated_session.py b/src/infrastructure/_auth/http/authenticated_session.py new file mode 100644 
index 0000000..e507607 --- /dev/null +++ b/src/infrastructure/_auth/http/authenticated_session.py @@ -0,0 +1,56 @@ +"""Implementation of an HTTP session which has an associated access token that is sent with every request.""" + +from typing import Optional +from requests import PreparedRequest, Session +from requests.auth import AuthBase + +from src.config import ( + DITM_MODEL_API_CLIENT_ID, + DITM_MODEL_API_CLIENT_SECRET, + DITM_MODEL_API_TOKEN_URL, +) +from src.infrastructure._auth.token_manager import ( + OAuthTokenManager, + OAuthTokenManagerConfig, +) + + +class _BearerAuth(AuthBase): + """AuthBase implementation that includes a bearer token in all requests.""" + + def __init__(self, token_manager: OAuthTokenManager) -> None: + self._token_manager = token_manager + + def __call__(self, r: PreparedRequest) -> PreparedRequest: + """ + Perform the request. + + Adds the bearer token to the 'Authorization' request header before the call is made. + If the 'Authorization' was already present, it is replaced. + """ + # The token manager handles caching internally, so we can safely invoke this + # for each request.
+ r.headers["Authorization"] = "Bearer " + self._token_manager.get_access_token() + return r + + +class _BearerAuthenticatedSession(Session): + """Session that includes a bearer token in all requests made through it.""" + + def __init__( + self, + token_manager: Optional[OAuthTokenManager] = None, + scopes: Optional[list[str]] = None, + ) -> None: + super().__init__() + if not token_manager: + token_manager = OAuthTokenManager( + OAuthTokenManagerConfig( + client_id=DITM_MODEL_API_CLIENT_ID, + client_secret=DITM_MODEL_API_CLIENT_SECRET, + token_url=DITM_MODEL_API_TOKEN_URL, + scopes=scopes, + audience=None, + ) + ) + self.auth = _BearerAuth(token_manager) diff --git a/src/infrastructure/_auth/token_manager.py b/src/infrastructure/_auth/token_manager.py new file mode 100644 index 0000000..6e57b49 --- /dev/null +++ b/src/infrastructure/_auth/token_manager.py @@ -0,0 +1,88 @@ +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from threading import Lock + +from oauthlib.oauth2 import BackendApplicationClient +from requests_oauthlib import OAuth2Session + +from src.logger import logger + + +@dataclass +class OAuthTokenManagerConfig: + client_id: str + client_secret: str + token_url: str + scopes: list[str] | None + audience: str | None + + +class OAuthTokenManager: + """An OAuth token manager responsible for the retrieval and caching of access tokens.""" + + def __init__(self, config: OAuthTokenManagerConfig) -> None: + self.client = BackendApplicationClient( + client_id=config.client_id, + scope=" ".join(config.scopes) if config.scopes is not None else None, + ) + self.oauth = OAuth2Session(client=self.client) + self.token_url = config.token_url + self.client_secret = config.client_secret + self.audience = config.audience + if self.token_url is None: + msg = "token_url is required" + raise ValueError(msg) + + if self.client_secret is None: + msg = "client_secret is required" + raise ValueError(msg) + + self._lock = Lock() + 
self._cached_token: tuple[datetime, str] | None = None + + def get_access_token(self) -> str: + """ + Retrieves an access token from the token manager. + + If a cached token is present in the token manager, this token is returned. + If no cached token is present, a new token is fetched, cached and returned. + + Returns: + str: The access token. + + """ + with self._lock: + if self._cached_token: + expiration_time, token = self._cached_token + + if expiration_time > datetime.now(tz=UTC): + return token + + # If we reach here, the token has reached its expiration time. + # Remove the token and fetch a new one. + self._cached_token = None + + return self._get_new_access_token() + + def _get_new_access_token(self) -> str: + token_response = self.oauth.fetch_token( + token_url=self.token_url, + client_secret=self.client_secret, + audience=self.audience, + ) + + # Calculate expiration time (half of token lifetime) + expires_in_seconds = token_response.get("expires_in", 3600) + expiration_time = datetime.now(tz=UTC) + timedelta( + seconds=expires_in_seconds // 2 + ) + + access_token = token_response.get("access_token") + + if not access_token: + logger.error("OAuthTokenManager - access_token not present in response") + exc_msg = "Access token was not present in token response" + raise ValueError(exc_msg) + + self._cached_token = (expiration_time, access_token) + return access_token diff --git a/src/infrastructure/azureml/feature_generation.py b/src/infrastructure/azureml/feature_generation.py new file mode 100644 index 0000000..9ce658b --- /dev/null +++ b/src/infrastructure/azureml/feature_generation.py @@ -0,0 +1,225 @@ +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo + +import holidays +import pandas as pd + +from influxdb_client.client.query_api_async import QueryApiAsync +from src.infrastructure.influxdb.dalidata.query_dali_data import ( + retrieve_dali_data_between, +) +from src.infrastructure.weather_data.weather_forecast import 
WeatherForecastData + + +def _get_weather_features_for_dates( + start_date_inclusive: datetime, end_date_inclusive: datetime +) -> pd.DataFrame: + """Get weather features for each date between the given datetime range. + + Args: + start_date_inclusive (datetime): The start date (inclusive) + end_date_inclusive (datetime): The end date (inclusive) + + Returns: + pd.DataFrame: The dataframe containing weather forecasts for the date range. + """ + weather_forecast = WeatherForecastData() + weather_forecasts = weather_forecast.etl_weather_forecast_data( + start_date_inclusive, end_date_inclusive + ) + return weather_forecasts.rename(columns={"date_time": "datetime"}) + + +def _get_time_features_for_dates( + start_date_inclusive: datetime, end_date_inclusive: datetime +) -> pd.DataFrame: + """Get time features for each date between the given datetime range. + + Args: + start_date_inclusive (datetime): The start date (inclusive) + end_date_inclusive (datetime): The end date (inclusive) + + Returns: + pd.DataFrame: The dataframe containing time features for the date range. 
+ """ + predict_datetimes_df = pd.DataFrame( + { + "datetime": pd.date_range( + start=start_date_inclusive, + end=end_date_inclusive, + freq="15min", + tz=ZoneInfo("Europe/Amsterdam"), + inclusive="left", + ) + } + ) + + predict_datetimes_df["year"] = predict_datetimes_df["datetime"].dt.year + predict_datetimes_df["month"] = predict_datetimes_df["datetime"].dt.month + predict_datetimes_df["day"] = predict_datetimes_df["datetime"].dt.day + predict_datetimes_df["hour"] = predict_datetimes_df["datetime"].dt.hour + predict_datetimes_df["minute"] = predict_datetimes_df["datetime"].dt.minute + predict_datetimes_df["dayofyear"] = predict_datetimes_df["datetime"].dt.dayofyear + predict_datetimes_df["dayofweek"] = predict_datetimes_df["datetime"].dt.dayofweek + predict_datetimes_df["weekofyear"] = ( + predict_datetimes_df["datetime"].dt.isocalendar().week + ) + + weekend_cutoff = 5 + predict_datetimes_df["is_weekend"] = predict_datetimes_df[ + "datetime" + ].dt.dayofweek.apply(func=lambda x: 1 if x >= weekend_cutoff else 0) + + nl_holidays = holidays.country_holidays(country="NL") + predict_datetimes_df["is_holiday"] = predict_datetimes_df["datetime"].apply( + func=lambda x: 1 if x.date() in nl_holidays else 0 + ) + return predict_datetimes_df + + +async def _get_lag_features_for_dates( + query_api: QueryApiAsync, + start_date_inclusive: datetime, + end_date_inclusive: datetime, +) -> pd.DataFrame: + """Get lag features for each date between the given datetime range. + + Args: + query_api (QueryApi): The read-only connection to the influx database. + start_date_inclusive (datetime): The start date (inclusive) + end_date_inclusive (datetime): The end date (inclusive) + + Returns: + pd.DataFrame: The dataframe containing lag features for the date range.
+ """ + predict_datetimes_df = pd.DataFrame( + { + "datetime": pd.date_range( + start=start_date_inclusive, + end=end_date_inclusive, + freq="15min", + tz=ZoneInfo("Europe/Amsterdam"), + inclusive="left", + ) + } + ) + + # Retrieve the dalidata starting from a year ago, since we account for the lag a year ago. + start_date_year_ago = start_date_inclusive - timedelta(days=366) + # Retrieve up to 1 day before the end_date + end_date_day_ago = end_date_inclusive - timedelta(days=1) + + dalidata_df = await retrieve_dali_data_between( + query_api=query_api, + start_date_inclusive=start_date_year_ago, + end_date_inclusive=end_date_day_ago, + ) + + dalidata_df["datetime"] = pd.to_datetime(dalidata_df["datetime"], utc=True) + dalidata_df = dalidata_df.set_index("datetime") + + lag_1_year_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(years=1) + ) + lag_1_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=1) + ) + lag_2_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=2) + ) + lag_3_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=3) + ) + lag_4_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=4) + ) + lag_5_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=5) + ) + lag_6_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=6) + ) + lag_7_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=7) + ) + + predict_datetimes_df["lag_1_year"] = dalidata_df.reindex(lag_1_year_dt)[ + "value" + ].values + predict_datetimes_df["lag_1_days"] = dalidata_df.reindex(lag_1_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_2_days"] = dalidata_df.reindex(lag_2_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_3_days"] = dalidata_df.reindex(lag_3_day_dt)[ + "value" + ].values + 
predict_datetimes_df["lag_4_days"] = dalidata_df.reindex(lag_4_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_5_days"] = dalidata_df.reindex(lag_5_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_6_days"] = dalidata_df.reindex(lag_6_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_7_days"] = dalidata_df.reindex(lag_7_day_dt)[ + "value" + ].values + + return predict_datetimes_df + + +def _get_mock_standard_profile_features( + start_date_inclusive: datetime, end_date_inclusive: datetime +) -> pd.DataFrame: + predict_datetimes_df = pd.DataFrame( + { + "datetime": pd.date_range( + start=start_date_inclusive, + end=end_date_inclusive, + freq="15min", + tz=ZoneInfo("Europe/Amsterdam"), + inclusive="left", + ) + } + ) + + predict_datetimes_df["scaled_profile"] = 0 + + return predict_datetimes_df + + +async def get_features_between_dates( + query_api: QueryApiAsync, + start_date_inclusive: datetime, + end_date_inclusive: datetime, +) -> pd.DataFrame: + """Get features for the prediction model between the start date (inclusive) and end date (inclusive). + + Args: + query_api (QueryApi): The read-only connection to the influx database. + start_date_inclusive (datetime): The start date (inclusive) + start_date_inclusive (datetime): The end date (inclusive) + + Returns: + pd.DataFrame: A dataframe containing all the features for the given time range. 
+ """ + time_features = _get_time_features_for_dates( + start_date_inclusive, end_date_inclusive + ) + lag_features = await _get_lag_features_for_dates( + query_api, start_date_inclusive, end_date_inclusive + ) + weather_features = _get_weather_features_for_dates( + start_date_inclusive, end_date_inclusive + ) + # standard_profiles = await retrieve_standard_profiles_between_dates(query_api, start_date_inclusive, end_date_inclusive) + standard_profiles = _get_mock_standard_profile_features( + start_date_inclusive, end_date_inclusive + ) + + return pd.concat( + [time_features, lag_features, weather_features, standard_profiles], axis=1 + ) diff --git a/src/infrastructure/azureml/predictions.py b/src/infrastructure/azureml/predictions.py new file mode 100644 index 0000000..df57d8a --- /dev/null +++ b/src/infrastructure/azureml/predictions.py @@ -0,0 +1,120 @@ +from datetime import timedelta +from typing import Any + +from src.config import ( + DITM_MODEL_API_URL, +) +from src.infrastructure._auth.http.authenticated_session import ( + _BearerAuthenticatedSession, +) +from src.models.predicted_load import PredictedGridAssetLoad +import pandas as pd + + +class _DitmPredictionPayload: + def __init__( + self, columns: list[str], index: list[int], data: list[Any], params: dict + ) -> None: + """Create a DITM predictions payload + + Args: + columns (list[str]): The columns to perform inference on + index (list[str]): The indexes + data (list[Any]): The data + params (dict): The parameters + + Returns: + DitmPredictionPayload: The payload + """ + self.columns = columns + self.index = index + self.data = data + self.params = params + + def as_json(self) -> dict: + data = { + "input_data": { + "columns": self.columns, + "index": self.index, + "data": self.data, + }, + "params": self.params, + } + + return data + + +def get_predictions_for_features( + features: pd.DataFrame, +) -> list[PredictedGridAssetLoad]: + """Get transformer load predictions between the start date 
(exclusive) and end date (inclusive). + + Args: + features (pd.DataFrame): The features to make prediction(s) for. + + Returns: + list[TransformerLoad]: The list of transformer load predictions + """ + copied_features = features.copy() + altered_features = copied_features.reset_index(drop=True).drop(columns=["datetime"]) + altered_features.fillna(0, inplace=True) + + session = _BearerAuthenticatedSession(scopes=["https://ml.azure.com/.default"]) + headers = {"Content-Type": "application/json", "Accept": "application/json"} + payload = _DitmPredictionPayload( + columns=[ + "year", + "month", + "day", + "hour", + "minute", + "dayofyear", + "dayofweek", + "weekofyear", + "is_weekend", + "is_holiday", + "lag_1_days", + "lag_2_days", + "lag_3_days", + "lag_4_days", + "lag_5_days", + "lag_6_days", + "lag_7_days", + "lag_1_year", + "temperature", + "irradiation_duration", + "irradiation", + "cloud_coverage", + "rain", + "humidity", + "snow", + "scaled_profile", + ], + index=list(range(len(altered_features))), + data=altered_features.values.tolist(), + params={}, + ) + + response = session.post( + url=DITM_MODEL_API_URL, headers=headers, json=payload.as_json() + ) + + predictions = [float(x) for x in response.json()] + + if len(predictions) != len(features): + raise ValueError("Features dataframe and predictions list did not match") + + loads: list[PredictedGridAssetLoad] = [] + + for index, pred in enumerate(predictions): + matching_df_row = features.iloc[index] + datetime_of_load = matching_df_row["datetime"].iloc[0] + converted = pd.to_datetime(datetime_of_load, utc=True).to_pydatetime() + + loads.append( + PredictedGridAssetLoad( + time=converted, duration=timedelta(minutes=15), load=pred + ) + ) + + return loads diff --git a/src/infrastructure/influxdb/dalidata/query_dali_data.py b/src/infrastructure/influxdb/dalidata/query_dali_data.py new file mode 100644 index 0000000..7fd6517 --- /dev/null +++ b/src/infrastructure/influxdb/dalidata/query_dali_data.py @@ -0,0 +1,39 
@@ +from datetime import datetime + +import pandas as pd + +from src.config import INFLUXDB_ORG, DALIDATA_BUCKET_NAME +from influxdb_client.client.query_api_async import QueryApiAsync + + +async def retrieve_dali_data_between( + query_api: QueryApiAsync, + start_date_inclusive: datetime, + end_date_inclusive: datetime, +) -> pd.DataFrame: + """Retrieve DALI transformer measurement data from InfluxDB between the given dates. + + Args: + start_date_inclusive (datetime): The start date (inclusive) + end_date_inclusive (datetime): The end date (inclusive) + + Returns: + pd.DataFrame: The dataframe containing DALI data for the date range. + """ + start_date_str = start_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ") + end_date_str = end_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ") + + query = f"""from(bucket: "{DALIDATA_BUCKET_NAME}") + |> range(start: {start_date_str}, stop: {end_date_str}) + |> filter(fn: (r) => r["_measurement"] == "kinesis_data") + |> filter(fn: (r) => r["_field"] == "value") + |> filter(fn: (r) => r["boxid"] == "CVD.091030-1") + |> filter(fn: (r) => r["description"] == "Trafometing vermogen som 3 fases - gem. 15 min."
or r["description"] == "Trafometing_vermogen_som_3_fases_-_gem._15_min.") + |> group(columns: []) + |> sort(columns: ["_time"]) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + """ # noqa: E501 + + df = await query_api.query_data_frame(query=query, org=INFLUXDB_ORG) + + return df.rename(columns={"_time": "datetime"}) diff --git a/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py b/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py new file mode 100644 index 0000000..37d3cfe --- /dev/null +++ b/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py @@ -0,0 +1,34 @@ +from datetime import datetime + +import pandas as pd +from src.config import INFLUXDB_ORG, STANDARD_PROFILES_BUCKET_NAME +from influxdb_client.client.query_api_async import QueryApiAsync + + +async def retrieve_standard_profiles_between_dates( + query_api: QueryApiAsync, + start_date_inclusive: datetime, + end_date_inclusive: datetime, +) -> pd.DataFrame: + """Retrieve standard profiles from InfluxDB between the given dates. + + Args: + start_date_inclusive (datetime): The start date (inclusive) + end_date_inclusive (datetime): The end date (inclusive) + + Returns: + pd.DataFrame: The dataframe containing standard profiles for the date range. 
+ """ + start_date_str = start_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ") + end_date_str = end_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ") + + query = f""" + from(bucket: "{STANDARD_PROFILES_BUCKET_NAME}") + |> range(start: {start_date_str}, stop: {end_date_str}) + |> filter(fn: (r) => r["_measurement"] == "standard_profile") + |> filter(fn: (r) => "profile") + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + """ + + response = await query_api.query_data_frame(query=query, org=INFLUXDB_ORG) + return response.rename(columns={"_time": "datumtijd"}) diff --git a/src/infrastructure/influxdb/trafo_load_audit.py b/src/infrastructure/influxdb/trafo_load_audit.py new file mode 100644 index 0000000..81534da --- /dev/null +++ b/src/infrastructure/influxdb/trafo_load_audit.py @@ -0,0 +1,28 @@ +"""Module which contains functions to retrieve predicted trafo load from an external database.""" + +from influxdb_client.client.write_api_async import WriteApiAsync +import pandas as pd + +from src.config import PREDICTED_TRAFO_LOAD_BUCKET +from src.models.predicted_load import PredictedGridAssetLoad + + +async def store_predictions_for_audit( + write_api: WriteApiAsync, predicted_loads: list[PredictedGridAssetLoad] +) -> None: + """Write predicted transformer loads to the database for auditing purposes. + + Args: + write_api (WriteApi): The write connection to the database. + predicted_loads (list[PredictedGridAssetLoad]): List of predicted transformer loads to write to the database. 
+ """ + df = pd.DataFrame( + [(tl.time, tl.load) for tl in predicted_loads], columns=["datetime", "WAARDE"] + ) + + await write_api.write( + bucket=PREDICTED_TRAFO_LOAD_BUCKET, + record=df, + data_frame_measurement_name="predictions", + data_frame_timestamp_column="datetime", + ) diff --git a/src/infrastructure/prediction_actions_impl.py b/src/infrastructure/prediction_actions_impl.py index 7a4ab98..89f8a77 100644 --- a/src/infrastructure/prediction_actions_impl.py +++ b/src/infrastructure/prediction_actions_impl.py @@ -6,10 +6,8 @@ from influxdb_client.client.query_api_async import QueryApiAsync from src.application.generate_events import PredictionActionsBase -from src.config import INFLUXDB_BUCKET -from src.infrastructure.influxdb.prediction_retrieval import ( - retrieve_predicted_grid_asset_load, -) +from src.infrastructure.azureml.feature_generation import get_features_between_dates +from src.infrastructure.azureml.predictions import get_predictions_for_features from src.models.predicted_load import PredictedGridAssetLoad @@ -49,9 +47,9 @@ async def get_predicted_grid_asset_load( Returns: list[TransformerLoad]: The list of predicted transformer loads. """ - return await retrieve_predicted_grid_asset_load( + features_for_time_range = await get_features_between_dates( query_api=query_api, - bucket=INFLUXDB_BUCKET, - from_date=from_date, - to_date=to_date, + start_date_inclusive=from_date, + end_date_inclusive=to_date, ) + return get_predictions_for_features(features=features_for_time_range) diff --git a/src/infrastructure/weather_data/weather_forecast.py b/src/infrastructure/weather_data/weather_forecast.py new file mode 100644 index 0000000..292a823 --- /dev/null +++ b/src/infrastructure/weather_data/weather_forecast.py @@ -0,0 +1,157 @@ +"""Module for retrieving and processing weather forecasting data. + +This module provides: +- _call_weather_forecast_api: Function that calls the Open-Meteo API to fetch hourly forecast data. 
+- _interpolate_hourly_data_to_quarterly: Function that interpolates the hourly values data to quarterly values. +- etl_weather_forecast_data: Function that extracts, transforms and loads weather forecast data. +""" + +from datetime import datetime, timezone +from typing import Any + +import pandas as pd +import requests +from pandas import concat + +from src.config import WEATHER_FORECAST_API_URL + + +class WeatherForecastData: + """Class for weather forecast data from Open Meteo.""" + + def __init__(self) -> None: + """Initializes the weather forecast data class.""" + self.om_weather_forecast_vars = { + "temperature_2m": "temperature", + "shortwave_radiation": "irradiation", + "sunshine_duration": "irradiation_duration", + "cloud_cover": "cloud_coverage", + "rain": "rain", + "relative_humidity_2m": "humidity", + "snowfall": "snow", + } + + def _call_weather_forecast_api( + self, start_time_inclusive: datetime, end_time_inclusive: datetime + ) -> dict: + """Calls the Open-Meteo API to retrieve hourly weather forecast data. + + Args: + start_time_inclusive (datetime): Datetime from which to start fetching forecasts + end_time_inclusive (datetime): Datetime from which to stop fetching forecasts + + Returns: + JSON response containing weather forecast data. 
+        """
+        start_time_utc = start_time_inclusive.astimezone(timezone.utc)
+        end_time_utc = end_time_inclusive.astimezone(timezone.utc)
+
+        start_time = start_time_utc.strftime(format="%Y-%m-%dT%H:%M")
+        end_time = end_time_utc.strftime(format="%Y-%m-%dT%H:%M")
+
+        params: dict[str, Any] = {
+            "latitude": 52.7481819,
+            "longitude": 6.5663292,
+            "hourly": list(self.om_weather_forecast_vars.keys()),
+            "models": "knmi_seamless",
+            "start_hour": start_time,
+            "end_hour": end_time,
+        }
+
+        response = requests.get(url=WEATHER_FORECAST_API_URL, params=params, timeout=10)
+        response.raise_for_status()
+        return response.json()
+
+    def _interpolate_hourly_data_to_quarterly(self, weather_var: str) -> None:
+        """Interpolate one hourly weather variable in self.weather_data to 15-minute resolution, in place."""
+        match weather_var:
+            case "temperature":
+                self.weather_data[weather_var] = self.weather_data[
+                    weather_var
+                ].interpolate(method="linear", limit=3, limit_direction="forward")
+
+            case "irradiation" | "irradiation_duration" | "rain" | "humidity":
+                self.weather_data[weather_var] = (
+                    self.weather_data[weather_var]
+                    .interpolate(method="linear", limit=3, limit_direction="forward")
+                    .clip(lower=0)
+                )
+
+            case "cloud_coverage" | "snow":
+                self.weather_data[weather_var] = self.weather_data[weather_var].ffill()
+
+    def etl_weather_forecast_data(
+        self, start_time_inclusive: datetime, end_time_inclusive: datetime
+    ) -> pd.DataFrame:
+        """Extracts, transforms, and loads weather forecast data.
+
+        - Extracts and transforms the forecast data.
+        - Reindexes data to 15-minute intervals to maintain consistency.
+        - Interpolates temperature and irradiation to 15-minute intervals.
+
+        Args:
+            start_time_inclusive (datetime): Datetime from which to start fetching forecasts
+            end_time_inclusive (datetime): Datetime at which to stop fetching forecasts
+
+        Returns:
+            Processed DataFrame with interpolated weather data at 15-minute intervals.
+        """
+        forecast_dict = self._call_weather_forecast_api(
+            start_time_inclusive, end_time_inclusive
+        )
+
+        weather_data = pd.DataFrame({"datetime": forecast_dict["hourly"]["time"]})
+        weather_data = concat(
+            objs=[
+                weather_data,
+                pd.DataFrame(
+                    {
+                        value: forecast_dict["hourly"][key]
+                        for key, value in self.om_weather_forecast_vars.items()
+                    }
+                ),
+            ],
+            axis=1,
+        )
+        weather_data["datetime"] = pd.to_datetime(
+            arg=weather_data["datetime"], utc=True
+        )
+        weather_data["snow"] = weather_data["snow"].apply(func=lambda x: (x > 0) * 1)
+        weather_data["cloud_coverage"] = weather_data["cloud_coverage"].apply(
+            func=lambda x: int(x / 100 * 9)
+        )
+        weather_data["irradiation_duration"] = weather_data[
+            "irradiation_duration"
+        ].apply(
+            func=lambda x: x / 3600 if x != -1 else x
+        )  # Sunshine duration is reported in seconds; /3600 converts to hours (-1 = missing)
+        weather_data["irradiation"] = weather_data["irradiation"].apply(
+            func=lambda x: x * 0.36 if x != -1 else x
+        )  # Scale irradiation by 0.36 — presumably W/m2 hourly mean to J/cm2; confirm units
+
+        # Generate 15-minute intervals over the date range
+        full_date_range = pd.date_range(
+            start=weather_data["datetime"].iloc[0].tz_localize(None),
+            end=weather_data["datetime"].iloc[-1].tz_localize(None),
+            freq="15min",
+            tz="UTC",
+        )
+
+        # Reindex and interpolate missing values
+        self.weather_data = (
+            weather_data.set_index("datetime")
+            .reindex(full_date_range)
+            .iloc[:-1]
+            .reset_index()
+            .rename(columns={"index": "datetime"})
+        ).fillna(0)
+
+        self.weather_data["datetime"] = self.weather_data["datetime"] = (
+            self.weather_data["datetime"].dt.tz_convert("Europe/Amsterdam")
+        )
+
+        # Interpolate values of features to 15 min interval
+        for weather_var in self.om_weather_forecast_vars.values():
+            self._interpolate_hourly_data_to_quarterly(weather_var=weather_var)
+
+        return self.weather_data
diff --git a/src/main.py b/src/main.py
index e40bcda..65ec62e 100644
--- a/src/main.py
+++ b/src/main.py
@@ -8,9 +8,18 @@ from
openadr3_client._vtn.interfaces.filters import TargetFilter from src.application.generate_events import get_capacity_limitation_event -from src.infrastructure.predictions_actions_stub_impl import PredictionActionsStub +from src.infrastructure.influxdb._client import create_db_client +from src.infrastructure.prediction_actions_impl import PredictionActionsInfluxDB from src.logger import logger -from src.config import PROGRAM_ID, VEN_NAME, VTN_BASE_URL +from src.config import ( + PROGRAM_ID, + VEN_NAME, + VTN_BASE_URL, + OAUTH_CLIENT_ID, + OAUTH_CLIENT_SECRET, + OAUTH_TOKEN_ENDPOINT, + OAUTH_SCOPES, +) bp = func.Blueprint() @@ -22,7 +31,11 @@ def _initialize_bl_client() -> BusinessLogicClient: BusinessLogicClient: The BL client. """ bl_client = BusinessLogicHttpClientFactory.create_http_bl_client( - vtn_base_url=VTN_BASE_URL + vtn_base_url=VTN_BASE_URL, + client_id=OAUTH_CLIENT_ID, + client_secret=OAUTH_CLIENT_SECRET, + token_url=OAUTH_TOKEN_ENDPOINT, + scopes=OAUTH_SCOPES.split(","), ) return bl_client @@ -37,11 +50,10 @@ async def _generate_events() -> NewEvent | None: # Start time is 12:00 today. 
start_time = current_time_ams.replace(hour=12, minute=0, second=0, microsecond=0)
 
-    # End time is 12:00 48 hours in the future
-    end_time = start_time + timedelta(days=2)
+    # End time is 12:00 24 hours in the future
+    end_time = start_time + timedelta(days=1)
 
-    # actions = PredictionActionsInfluxDB(client=create_db_client())
-    actions = PredictionActionsStub()
+    actions = PredictionActionsInfluxDB(client=create_db_client())
 
     return await get_capacity_limitation_event(
         actions, from_date=start_time, to_date=end_time
@@ -91,8 +103,12 @@ async def main() -> None:
     logger.info("Python timer trigger function executed.")
 
 
+# if __name__ == "__main__":
+#     asyncio.run(main())
+
+
 @bp.schedule(
-    schedule="0 50 5 * * *",
+    schedule="0 55 5 * * *",
     arg_name="myTimer",
     run_on_startup=False,
     use_monitor=False,
diff --git a/src/models/predicted_load.py b/src/models/predicted_load.py
index 77e30df..e11f2d1 100644
--- a/src/models/predicted_load.py
+++ b/src/models/predicted_load.py
@@ -52,8 +52,20 @@ def flex_capacity_required(self, max_capacity: float) -> float | None:
         Returns:
             float: The amount of kw of flex which is needed for this grid asset load period.
         """
-        if self.load > max_capacity:
-            return self.load - max_capacity
+        min_guaranteed_capacity = 4
+        max_capacity_of_pod = (
+            22  # TODO: configure based on input from actual battery when available.
+        )
+        max_excess = 50
 
-        # No flex capacity needed for this period.
-        return None
+        excess_load = self.load - max_capacity
+        if excess_load <= 0:
+            return 0  # Not exceeding capacity. NOTE(review): was None; annotation still says float | None
+
+        # Linearly scale from min_guaranteed_capacity up to max_capacity_of_pod as excess approaches max_excess
+        scaled = min_guaranteed_capacity + (
+            max_capacity_of_pod - min_guaranteed_capacity
+        ) * (excess_load / max_excess)
+
+        # Cap the requested flex at the pod's maximum capacity
+        return min(max_capacity_of_pod, scaled)