From 60951bcffde040b3a6609b5bfa1884cbd0497197 Mon Sep 17 00:00:00 2001
From: Nick van der Burgt
Date: Tue, 21 Oct 2025 16:09:51 +0200
Subject: [PATCH 01/10] WIP commit

---
 poetry.lock                                     |  41 +++--
 pyproject.toml                                  |   7 +-
 src/config.py                                   |  15 ++
 .../_auth/http/authenticated_session.py         |  45 +++++
 src/infrastructure/_auth/token_manager.py       |  81 +++++++++
 .../azureml/feature_generation.py               | 155 ++++++++++++++++++
 src/infrastructure/azureml/predictions.py       | 103 ++++++++++++
 .../influxdb/dalidata/query_dali_data.py        |  35 ++++
 .../query_standard_profiles.py                  |  33 ++++
 .../influxdb/trafo_load_audit.py                |  27 +++
 src/infrastructure/prediction_actions_impl.py   |  10 +-
 .../weather_data/weather_forecast.py            | 132 +++++++++++++++
 src/main.py                                     |  59 ++++---
 13 files changed, 696 insertions(+), 47 deletions(-)
 create mode 100644 src/infrastructure/_auth/http/authenticated_session.py
 create mode 100644 src/infrastructure/_auth/token_manager.py
 create mode 100644 src/infrastructure/azureml/feature_generation.py
 create mode 100644 src/infrastructure/azureml/predictions.py
 create mode 100644 src/infrastructure/influxdb/dalidata/query_dali_data.py
 create mode 100644 src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py
 create mode 100644 src/infrastructure/influxdb/trafo_load_audit.py
 create mode 100644 src/infrastructure/weather_data/weather_forecast.py

diff --git a/poetry.lock b/poetry.lock
index a1f1477..f1231a5 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -704,6 +704,21 @@ files = [
     {file = "frozenlist-1.7.0.tar.gz", hash = "sha256:2e310d81923c2437ea8670467121cc3e9b0f76d3043cc1d2331d56c7fb7a3a8f"},
 ]
 
+[[package]]
+name = "holidays"
+version = "0.83"
+description = "Open World Holidays Framework"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "holidays-0.83-py3-none-any.whl", hash = "sha256:e36a368227b5b62129871463697bfde7e5212f6f77e43640320b727b79a875a8"},
+    {file = "holidays-0.83.tar.gz", hash = "sha256:99b97b002079ab57dac93295933907d2aae2742ad9a4d64fe33864dfae6805fa"},
+]
+
+[package.dependencies]
+python-dateutil = "*"
+
 [[package]]
 name = "idna"
 version = "3.10"
@@ -1099,14 +1114,14 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"]
 
 [[package]]
 name = "openadr3-client"
-version = "0.0.7"
+version = "0.0.11"
 description = ""
 optional = false
 python-versions = "<4,>=3.12"
 groups = ["main"]
 files = [
-    {file = "openadr3_client-0.0.7-py3-none-any.whl", hash = "sha256:e6175fa5e92e58f1b9b11b211f10f23ff2e42f3a25fc517f13734c9fd60a991e"},
-    {file = "openadr3_client-0.0.7.tar.gz", hash = "sha256:37f841e9123ebb05f1e7cda97f9e87f9e0289553d52f3e627cfd1c91a3e62d18"},
+    {file = "openadr3_client-0.0.11-py3-none-any.whl", hash = "sha256:ce050556a5ff0566671e7c8052f99c22c3f51dc1f8665124b372277f17366ec0"},
+    {file = "openadr3_client-0.0.11.tar.gz", hash = "sha256:2a975db33d7f72fa91a260c33822c9b3def037aa03e8fff5428d1922d4e3adf6"},
 ]
 
 [package.dependencies]
@@ -1118,26 +1133,26 @@ pandera = {version = ">=0.23.1,<0.24.0", extras = ["mypy"]}
 pycountry = ">=24.6.1,<25.0.0"
 pydantic = ">=2.11.2,<3.0.0"
 pydantic-extra-types = ">=2.10.3,<3.0.0"
-python-decouple = ">=3.8,<4.0"
 requests = ">=2.32.3,<3.0.0"
 requests-oauthlib = ">=2.0.0,<3.0.0"
 
 [[package]]
 name = "openadr3-client-gac-compliance"
-version = "1.4.0"
+version = "2.0.0"
 description = ""
 optional = false
 python-versions = "<4,>=3.12"
 groups = ["main"]
 files = [
-    {file = "openadr3_client_gac_compliance-1.4.0-py3-none-any.whl", hash = "sha256:398e440232d1b568e8c3b852d8af9914b5e9c96a427aa5119d0531b119ef27cd"},
-    {file = "openadr3_client_gac_compliance-1.4.0.tar.gz", hash = "sha256:14b3b93227af8f41261e0abd078e8a37c5f2e8f4426d719ba605ee80520bfd38"},
+    {file = "openadr3_client_gac_compliance-2.0.0-py3-none-any.whl", hash = "sha256:c16e80afb386140fb51aaaee91103ab769cc3393ff1e6becadb687b634e6e7f7"},
+    {file = "openadr3_client_gac_compliance-2.0.0.tar.gz", hash = "sha256:38bc52a340dadd57d2fb9f35182a79caac46f7f29ce4a7f6fd21b28afdf8133e"},
 ]
 
 [package.dependencies]
-openadr3-client = ">=0.0.7,<1.0.0"
+openadr3-client = ">=0.0.11,<1.0.0"
 pycountry = ">=24.6.1,<25.0.0"
 pydantic = ">=2.11.2,<3.0.0"
+python-decouple = ">=3.8,<4.0"
 
 [[package]]
 name = "packaging"
@@ -1738,14 +1753,14 @@ typing-extensions = ">=4.1.1,<5.0.0"
 
 [[package]]
 name = "requests"
-version = "2.32.4"
+version = "2.32.5"
 description = "Python HTTP for Humans."
 optional = false
-python-versions = ">=3.8"
+python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"},
-    {file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"},
+    {file = "requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6"},
+    {file = "requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf"},
 ]
 
 [package.dependencies]
@@ -2078,4 +2093,4 @@ propcache = ">=0.2.1"
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.12, <4"
-content-hash = "52f89cb1b8ddff689f8ba349fda65dff0df7a737f70c5e7236a97ed9865f3886"
+content-hash = "65b711b44ff4a8bb28e9c4805797f2b1a8e0c664507c335aa04b4f1cb0f98670"
diff --git a/pyproject.toml b/pyproject.toml
index a91c071..b91b227 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,14 +8,17 @@ authors = [
 readme = "README.md"
 requires-python = ">=3.12, <4"
 dependencies = [
-    "openadr3-client-gac-compliance (>=1.4.0,<2.0.0)",
+    "openadr3-client-gac-compliance (==2.0.0)",
     "python-decouple (>=3.8,<4.0)",
     "influxdb-client[async] (>=1.49.0,<2.0.0)",
     "ruff (>=0.12.4,<0.13.0)",
     "mypy (>=1.17.0,<2.0.0)",
     "pytest (>=8.4.1,<9.0.0)",
     "azure-functions (>=1.23.0,<2.0.0)",
-    "openadr3-client (>=0.0.7,<0.0.8)"
+    "openadr3-client (==0.0.11)",
+    "requests (>=2.32.5,<3.0.0)",
+    "requests-oauthlib (>=2.0.0,<3.0.0)",
+    "holidays (>=0.83,<0.84)"
 ]
 
 [build-system]
diff --git a/src/config.py b/src/config.py
index 8992a91..3e626c0 100644
--- a/src/config.py
+++ b/src/config.py
@@ -21,3 +21,18 @@
 # INFLUXDB parameters (secrets)
 INFLUXDB_TOKEN = config("INFLUXDB_TOKEN")
 INFLUXDB_URL = config("INFLUXDB_URL")
+
+PREDICTED_TRAFO_LOAD_BUCKET = config(
+    "PREDICTED_TRAFO_LOAD_BUCKET", default="ditm_model_output"
+)
+STANDARD_PROFILES_BUCKET_NAME = config("STANDARD_PROFILES_BUCKET_NAME", default="ditm_standard_profiles")
+DALIDATA_BUCKET_NAME = config("DALIDATA_BUCKET_NAME", default="dalidata")
+
+# External services URLs
+WEATHER_FORECAST_API_URL = config("WEATHER_FORECAST_API_URL")
+
+# Authentication to Azure ML managed endpoint for prediction model
+DITM_MODEL_API_URL = config("DITM_MODEL_API_URL")
+DITM_MODEL_API_CLIENT_ID = config("DITM_MODEL_API_CLIENT_ID")
+DITM_MODEL_API_CLIENT_SECRET = config("DITM_MODEL_API_CLIENT_SECRET")
+DITM_MODEL_API_TOKEN_URL = config("DITM_MODEL_API_TOKEN_URL")
\ No newline at end of file
diff --git a/src/infrastructure/_auth/http/authenticated_session.py b/src/infrastructure/_auth/http/authenticated_session.py
new file mode 100644
index 0000000..93164c4
--- /dev/null
+++ b/src/infrastructure/_auth/http/authenticated_session.py
@@ -0,0 +1,45 @@
+"""Implementation of an HTTP session which has an associated access token that is sent with every request."""
+
+from typing import Optional
+from requests import PreparedRequest, Session
+from requests.auth import AuthBase
+
+from src.config import DITM_MODEL_API_CLIENT_ID, DITM_MODEL_API_CLIENT_SECRET, DITM_MODEL_API_TOKEN_URL
+from src.infrastructure._auth.token_manager import OAuthTokenManager, OAuthTokenManagerConfig
+
+
+class _BearerAuth(AuthBase):
+    """AuthBase implementation that includes a bearer token in all requests."""
+
+    def __init__(self, token_manager: OAuthTokenManager) -> None:
+        self._token_manager = token_manager
+
+    def __call__(self, r: PreparedRequest) -> PreparedRequest:
+        """
+        Perform the request.
+
+        Adds the bearer token to the 'Authorization' request header before the call is made.
+        If the 'Authorization' header was already present, it is replaced.
+        """
+        # The token manager handles caching internally, so we can safely invoke this
+        # for each request.
+        r.headers["Authorization"] = "Bearer " + self._token_manager.get_access_token()
+        return r
+
+
+class _BearerAuthenticatedSession(Session):
+    """Session that includes a bearer token in all requests made through it."""
+
+    def __init__(self, token_manager: Optional[OAuthTokenManager] = None, scopes: Optional[list[str]] = None) -> None:
+        super().__init__()
+        if not token_manager:
+            token_manager = OAuthTokenManager(
+            OAuthTokenManagerConfig(
+                client_id=DITM_MODEL_API_CLIENT_ID,
+                client_secret=DITM_MODEL_API_CLIENT_SECRET,
+                token_url=DITM_MODEL_API_TOKEN_URL,
+                scopes=scopes,
+                audience=None
+            )
+            )
+        self.auth = _BearerAuth(token_manager)
\ No newline at end of file
diff --git a/src/infrastructure/_auth/token_manager.py b/src/infrastructure/_auth/token_manager.py
new file mode 100644
index 0000000..af436f0
--- /dev/null
+++ b/src/infrastructure/_auth/token_manager.py
@@ -0,0 +1,81 @@
+from dataclasses import dataclass
+from datetime import UTC, datetime, timedelta
+from threading import Lock
+
+from oauthlib.oauth2 import BackendApplicationClient
+from requests_oauthlib import OAuth2Session
+
+
+@dataclass
+class OAuthTokenManagerConfig:
+    client_id: str
+    client_secret: str
+    token_url: str
+    scopes: list[str] | None
+    audience: str | None
+
+
+class OAuthTokenManager:
+    """An OAuth token manager responsible for the retrieval and caching of access tokens."""
+
+    def __init__(self, config: OAuthTokenManagerConfig) -> None:
+        self.client = BackendApplicationClient(
+            client_id=config.client_id, scope=" ".join(config.scopes) if config.scopes is not None else None
+        )
+        self.oauth = OAuth2Session(client=self.client)
+        self.token_url = config.token_url
+        self.client_secret = config.client_secret
+        self.audience = config.audience
+        if self.token_url is None:
+            msg = "token_url is required"
+            raise ValueError(msg)
+
+        if self.client_secret is None:
+            msg = "client_secret is required"
+            raise ValueError(msg)
+
+        self._lock = Lock()
+        self._cached_token: tuple[datetime, str] | None = None
+
+    def get_access_token(self) -> str:
+        """
+        Retrieves an access token from the token manager.
+
+        If a cached token is present in the token manager, this token is returned.
+        If no cached token is present, a new token is fetched, cached and returned.
+
+        Returns:
+            str: The access token.
+
+        """
+        with self._lock:
+            if self._cached_token:
+                expiration_time, token = self._cached_token
+
+                if expiration_time > datetime.now(tz=UTC):
+                    return token
+
+                # If we reach here, the token has reached its expiration time.
+                # Remove the token and fetch a new one.
+                self._cached_token = None
+
+            return self._get_new_access_token()
+
+    def _get_new_access_token(self) -> str:
+        token_response = self.oauth.fetch_token(
+            token_url=self.token_url, client_secret=self.client_secret, audience=self.audience
+        )
+
+        # Calculate expiration time (half of token lifetime)
+        expires_in_seconds = token_response.get("expires_in", 3600)
+        expiration_time = datetime.now(tz=UTC) + timedelta(seconds=expires_in_seconds // 2)
+
+        access_token = token_response.get("access_token")
+
+        if not access_token:
+            logger.error("OAuthTokenManager - access_token not present in response")
+            exc_msg = "Access token was not present in token response"
+            raise ValueError(exc_msg)
+
+        self._cached_token = (expiration_time, access_token)
+        return access_token
\ No newline at end of file
diff --git a/src/infrastructure/azureml/feature_generation.py b/src/infrastructure/azureml/feature_generation.py
new file mode 100644
index 0000000..4f759d9
--- /dev/null
+++ b/src/infrastructure/azureml/feature_generation.py
@@ -0,0 +1,155 @@
+from datetime import datetime, timedelta
+from zoneinfo import ZoneInfo
+
+import holidays
+import pandas as pd
+
+from influxdb_client.client.query_api_async import QueryApiAsync
+from src.infrastructure.influxdb.dalidata.query_dali_data import retrieve_dali_data_between
+from src.infrastructure.influxdb.standard_profiles.query_standard_profiles import retrieve_standard_profiles_between_dates
+from src.infrastructure.weather_data.weather_forecast import WeatherForecastData
+
+def _get_weather_features_for_dates(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+    """Get weather features for each date between the given datetime range.
+
+    Args:
+        start_date_inclusive (datetime): The start date (inclusive)
+        end_date_inclusive (datetime): The end date (inclusive)
+
+    Returns:
+        pd.DataFrame: The dataframe containing weather forecasts for the date range.
+    """
+    weather_forecast = WeatherForecastData()
+
+    dates = [start_date_inclusive.date() + timedelta(days=i) for i in range((end_date_inclusive.date() - start_date_inclusive.date()).days + 1)]
+
+    forecasts_df: pd.DataFrame = pd.DataFrame()
+
+    for d in dates:
+        forecast_for_date = weather_forecast.etl_weather_forecast_data(d)
+        pd.concat([forecasts_df, forecast_for_date], axis=1)
+
+    return forecasts_df
+
+def _get_time_features_for_dates(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+    """Get time features for each date between the given datetime range.
+
+    Args:
+        start_date_inclusive (datetime): The start date (inclusive)
+        end_date_inclusive (datetime): The end date (inclusive)
+
+    Returns:
+        pd.DataFrame: The dataframe containing time features for the date range.
+    """
+    predict_datetimes_df = pd.DataFrame({
+        "datetime": pd.date_range(
+            start=start_date_inclusive,
+            end=end_date_inclusive,
+            freq="15min",
+            tz=ZoneInfo("Europe/Amsterdam"),
+            inclusive="left",
+        )
+    })
+
+    predict_datetimes_df["year"] = predict_datetimes_df["datetime"].dt.year
+    predict_datetimes_df["quarter"] = predict_datetimes_df["datetime"].dt.quarter
+    predict_datetimes_df["month"] = predict_datetimes_df["datetime"].dt.month
+    predict_datetimes_df["day"] = predict_datetimes_df["datetime"].dt.day
+    predict_datetimes_df["hour"] = predict_datetimes_df["datetime"].dt.hour
+    predict_datetimes_df["minute"] = predict_datetimes_df["datetime"].dt.minute
+    predict_datetimes_df["dayofyear"] = predict_datetimes_df["datetime"].dt.dayofyear
+    predict_datetimes_df["dayofweek"] = predict_datetimes_df["datetime"].dt.dayofweek
+    predict_datetimes_df["weekofyear"] = predict_datetimes_df["datetime"].dt.isocalendar().week
+
+    weekend_cutoff = 5
+    predict_datetimes_df["is_weekend"] = predict_datetimes_df["datetime"].dt.dayofweek.apply(func=lambda x: 1 if x >= weekend_cutoff else 0)
+
+    nl_holidays = holidays.country_holidays(country="NL")
+    predict_datetimes_df["is_holiday"] = predict_datetimes_df["datetime"].apply(func=lambda x: 1 if x.date() in nl_holidays else 0)
+    return predict_datetimes_df
+
+async def _get_lag_features_for_dates(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+    """Get lag features for each date between the given datetime range.
+
+    Args:
+        query_api (QueryApiAsync): The read-only connection to the influx database.
+        start_date_inclusive (datetime): The start date (inclusive)
+        end_date_inclusive (datetime): The end date (inclusive)
+
+    Returns:
+        pd.DataFrame: The dataframe containing lag features for the date range.
+    """
+    predict_datetimes_df = pd.DataFrame({
+        "datetime": pd.date_range(
+            start=start_date_inclusive,
+            end=end_date_inclusive,
+            freq="15min",
+            tz=ZoneInfo("Europe/Amsterdam"),
+            inclusive="left",
+        )
+    })
+
+    # Retrieve the dalidata starting from a year ago, since we account for the lag a year ago.
+    start_date_year_ago = start_date_inclusive - timedelta(days=366)
+    # Retrieve up to 1 day before the end_date
+    end_date_day_ago = end_date_inclusive - timedelta(days=1)
+
+    dalidata_df = await retrieve_dali_data_between(query_api=query_api, start_date_inclusive=start_date_year_ago, end_date_inclusive=end_date_day_ago)
+
+    dalidata_df["datetime"] = pd.to_datetime(dalidata_df["datetime"], utc=True)
+    dalidata_df = dalidata_df.set_index("datetime")
+
+    lag_1_year_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(years=1))
+    lag_1_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=1))
+    lag_2_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=2))
+    lag_3_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=3))
+    lag_4_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=4))
+    lag_5_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=5))
+    lag_6_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=6))
+    lag_7_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=7))
+
+    predict_datetimes_df["lag_1_year"] = dalidata_df.reindex(lag_1_year_dt)["value"].values
+    predict_datetimes_df["lag_1_days"] = dalidata_df.reindex(lag_1_day_dt)["value"].values
+    predict_datetimes_df["lag_2_days"] = dalidata_df.reindex(lag_2_day_dt)["value"].values
+    predict_datetimes_df["lag_3_days"] = dalidata_df.reindex(lag_3_day_dt)["value"].values
+    predict_datetimes_df["lag_4_days"] = dalidata_df.reindex(lag_4_day_dt)["value"].values
+    predict_datetimes_df["lag_5_days"] = dalidata_df.reindex(lag_5_day_dt)["value"].values
+    predict_datetimes_df["lag_6_days"] = dalidata_df.reindex(lag_6_day_dt)["value"].values
+    predict_datetimes_df["lag_7_days"] = dalidata_df.reindex(lag_7_day_dt)["value"].values
+
+    return predict_datetimes_df
+
+def _get_mock_standard_profile_features(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+    predict_datetimes_df = pd.DataFrame({
+        "datetime": pd.date_range(
+            start=start_date_inclusive,
+            end=end_date_inclusive,
+            freq="15min",
+            tz=ZoneInfo("Europe/Amsterdam"),
+            inclusive="left",
+        )
+    })
+
+    predict_datetimes_df["scaled_profile"] = 0
+    return predict_datetimes_df
+
+async def get_features_between_dates(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+    """Get features for the prediction model between the start date (inclusive) and end date (inclusive).
+
+    Args:
+        query_api (QueryApiAsync): The read-only connection to the influx database.
+        start_date_inclusive (datetime): The start date (inclusive)
+        end_date_inclusive (datetime): The end date (inclusive)
+
+    Returns:
+        pd.DataFrame: A dataframe containing all the features for the given time range.
+    """
+    features: pd.DataFrame = pd.DataFrame()
+
+    time_features = _get_time_features_for_dates(start_date_inclusive, end_date_inclusive)
+    lag_features = await _get_lag_features_for_dates(query_api, start_date_inclusive, end_date_inclusive)
+    weather_features = _get_weather_features_for_dates(start_date_inclusive, end_date_inclusive)
+    # standard_profiles = await retrieve_standard_profiles_between_dates(query_api, start_date_inclusive, end_date_inclusive)
+    standard_profiles = _get_mock_standard_profile_features(start_date_inclusive, end_date_inclusive)
+
+    return pd.concat([features, time_features, lag_features, weather_features, standard_profiles], axis=1)
diff --git a/src/infrastructure/azureml/predictions.py b/src/infrastructure/azureml/predictions.py
new file mode 100644
index 0000000..016e4c1
--- /dev/null
+++ b/src/infrastructure/azureml/predictions.py
@@ -0,0 +1,103 @@
+from datetime import datetime, timedelta
+import json
+from typing import Any
+
+from src.config import DITM_MODEL_API_URL, DITM_MODEL_API_CLIENT_ID, DITM_MODEL_API_CLIENT_SECRET
+from src.infrastructure._auth.http.authenticated_session import _BearerAuthenticatedSession
+from src.infrastructure._auth.token_manager import OAuthTokenManager
+from src.models.predicted_load import PredictedGridAssetLoad
+import pandas as pd
+
+class _DitmPredictionPayload:
+    def __init__(self, columns: list[str], index: list[str], data: list[Any], params: dict) -> None:
+        """Create a DITM predictions payload
+
+        Args:
+            columns (list[str]): The columns to perform inference on
+            index (list[str]): The indexes
+            data (list[Any]): The data
+            params (dict): The parameters
+        """
+        self.columns = columns
+        self.index = index
+        self.data = data
+        self.params = params
+
+    def as_json(self) -> str:
+        data = {
+            "input_data": {
+                "columns": self.columns,
+                "index": self.index,
+                "data": self.data,
+            },
+            "params": self.params
+        }
+
+        return json.dumps(data)
+
+def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAssetLoad]:
+    """Get transformer load predictions between the start date (exclusive) and end date (inclusive).
+
+    Args:
+        features (pd.DataFrame): The features to make prediction(s) for.
+
+    Returns:
+        list[PredictedGridAssetLoad]: The list of transformer load predictions
+    """
+    copied_features = features.copy()
+    altered_features = copied_features.reset_index(drop=True).drop(columns=["datetime"])
+
+    session = _BearerAuthenticatedSession(scopes=["https://ml.azure.com/.default"])
+    headers = {'Content-Type':'application/json', 'Accept': 'application/json'}
+    payload = _DitmPredictionPayload(
+        columns=[
+            "year",
+            "month",
+            "day",
+            "hour",
+            "minute",
+            "dayofyear",
+            "dayofweek",
+            "weekofyear",
+            "is_weekend",
+            "is_holiday",
+            "lag_1_days",
+            "lag_2_days",
+            "lag_3_days",
+            "lag_4_days",
+            "lag_5_days",
+            "lag_6_days",
+            "lag_7_days",
+            "lag_1_year",
+            "temperature",
+            "irradiation_duration",
+            "irradiation",
+            "cloud_coverage",
+            "rain",
+            "humidity",
+            "snow",
+            "scaled_profile"
+        ],
+        index=[],
+        data=altered_features.values.tolist(),
+        params={}
+    )
+
+    response = session.post(url=DITM_MODEL_API_URL, headers=headers, json=payload.as_json())
+
+    predictions = [float(x) for x in response.json()]
+
+    if len(predictions) != len(features):
+        raise ValueError("Features dataframe and predictions list did not match")
+
+    loads : list[PredictedGridAssetLoad] = []
+    for index, pred in enumerate(predictions):
+        matching_df_row = features.iloc(index)
+        loads.append(PredictedGridAssetLoad(time=pd.to_datetime(matching_df_row["datetime"]), duration=timedelta(minutes=15), load=pred))
+
+    return loads
+
+    
\ No newline at end of file
diff --git a/src/infrastructure/influxdb/dalidata/query_dali_data.py b/src/infrastructure/influxdb/dalidata/query_dali_data.py
new file mode 100644
index 0000000..96cb08f
--- /dev/null
+++ b/src/infrastructure/influxdb/dalidata/query_dali_data.py
@@ -0,0 +1,35 @@
+
+from datetime import datetime
+
+import pandas as pd
+
+from src.config import INFLUXDB_ORG, DALIDATA_BUCKET_NAME
+from influxdb_client.client.query_api_async import QueryApiAsync
+
+async def retrieve_dali_data_between(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+    """Retrieve DALI transformer measurement data from InfluxDB between the given dates.
+
+    Args:
+        query_api (QueryApiAsync): The read-only connection to the influx database.
+        start_date_inclusive (datetime): The start date (inclusive)
+        end_date_inclusive (datetime): The end date (inclusive)
+
+    Returns:
+        pd.DataFrame: The dataframe containing DALI measurement data for the date range.
+    """
+    start_date_str = start_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ")
+    end_date_str = end_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ")
+
+    query = f"""from(bucket: "{DALIDATA_BUCKET_NAME}")
+    |> range(start: {start_date_str}, stop: {end_date_str})
+    |> filter(fn: (r) => r["_measurement"] == "kinesis_data")
+    |> filter(fn: (r) => r["_field"] == "value")
+    |> filter(fn: (r) => r["boxid"] == "CVD.091030-1")
+    |> filter(fn: (r) => r["description"] == "Trafometing vermogen som 3 fases - gem. 15 min." or r["description"] == "Trafometing_vermogen_som_3_fases_-_gem._15_min.")
+    |> group(columns: [])
+    |> sort(columns: ["_time"])
+    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+    """  # noqa: E501
+
+    df = await query_api.query_data_frame(query=query, org=INFLUXDB_ORG)
+
+    return df.rename(columns={"_time": "datetime"})
\ No newline at end of file
diff --git a/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py b/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py
new file mode 100644
index 0000000..7a8bd97
--- /dev/null
+++ b/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py
@@ -0,0 +1,33 @@
+from datetime import datetime
+
+import pandas as pd
+from src.config import INFLUXDB_ORG, STANDARD_PROFILES_BUCKET_NAME
+from influxdb_client.client.query_api_async import QueryApiAsync
+
+async def retrieve_standard_profiles_between_dates(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+    """Retrieve standard profiles from InfluxDB between the given dates.
+
+    Args:
+        start_date_inclusive (datetime): The start date (inclusive)
+        end_date_inclusive (datetime): The end date (inclusive)
+
+    Returns:
+        pd.DataFrame: The dataframe containing standard profiles for the date range.
+    """
+    start_date_str = start_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ")
+    end_date_str = end_date_inclusive.strftime(format="%Y-%m-%dT%H:%M:%SZ")
+
+    query = f"""
+    from(bucket: "{STANDARD_PROFILES_BUCKET_NAME}")
+    |> range(start: {start_date_str}, stop: {end_date_str})
+    |> filter(fn: (r) => r["_measurement"] == "standard_profile")
+    |> filter(fn: (r) => r["_field"] == "profile")
+    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+    """
+
+    response = await query_api.query_data_frame(query=query, org=INFLUXDB_ORG)
+    return response.rename(
+        columns={"_time": "datumtijd"}
+    )
+
+
diff --git a/src/infrastructure/influxdb/trafo_load_audit.py b/src/infrastructure/influxdb/trafo_load_audit.py
new file mode 100644
index 0000000..fb958ca
--- /dev/null
+++ b/src/infrastructure/influxdb/trafo_load_audit.py
@@ -0,0 +1,27 @@
+"""Module which contains functions to store predicted trafo load in an external database."""
+
+from datetime import datetime
+
+from influxdb_client.client.write_api_async import WriteApiAsync
+import pandas as pd
+
+from src.config import PREDICTED_TRAFO_LOAD_BUCKET
+from src.models.predicted_load import PredictedGridAssetLoad
+
+
+async def store_predictions_for_audit(
+    write_api: WriteApiAsync, predicted_loads: list[PredictedGridAssetLoad]
+) -> None:
+    """Write predicted transformer loads to the database for auditing purposes.
+
+    Args:
+        write_api (WriteApiAsync): The write connection to the database.
+        predicted_loads (list[PredictedGridAssetLoad]): List of predicted transformer loads to write to the database.
+    """
+    df = pd.DataFrame(
+        [(tl.time, tl.load) for tl in predicted_loads],
+        columns=["datetime", "WAARDE"]
+    )
+
+    await write_api.write(bucket=PREDICTED_TRAFO_LOAD_BUCKET, record=df, data_frame_measurement_name="predictions", data_frame_timestamp_column="datetime")
+    
\ No newline at end of file
diff --git a/src/infrastructure/prediction_actions_impl.py b/src/infrastructure/prediction_actions_impl.py
index 7a4ab98..168d415 100644
--- a/src/infrastructure/prediction_actions_impl.py
+++ b/src/infrastructure/prediction_actions_impl.py
@@ -7,6 +7,8 @@
 
 from src.application.generate_events import PredictionActionsBase
 from src.config import INFLUXDB_BUCKET
+from src.infrastructure.azureml.feature_generation import get_features_between_dates
+from src.infrastructure.azureml.predictions import get_predictions_for_features
 from src.infrastructure.influxdb.prediction_retrieval import (
     retrieve_predicted_grid_asset_load,
 )
@@ -49,9 +51,5 @@ async def get_predicted_grid_asset_load(
         Returns:
             list[TransformerLoad]: The list of predicted transformer loads.
         """
-        return await retrieve_predicted_grid_asset_load(
-            query_api=query_api,
-            bucket=INFLUXDB_BUCKET,
-            from_date=from_date,
-            to_date=to_date,
-        )
+        features_for_time_range = await get_features_between_dates(query_api=query_api, start_date_inclusive=from_date, end_date_inclusive=to_date)
+        return get_predictions_for_features(features=features_for_time_range)
\ No newline at end of file
diff --git a/src/infrastructure/weather_data/weather_forecast.py b/src/infrastructure/weather_data/weather_forecast.py
new file mode 100644
index 0000000..3954294
--- /dev/null
+++ b/src/infrastructure/weather_data/weather_forecast.py
@@ -0,0 +1,132 @@
+"""Module for retrieving and processing weather forecasting data.
+
+This module provides:
+- _call_weather_forecast_api: Function that calls the Open-Meteo API to fetch hourly forecast data.
+- _interpolate_hourly_data_to_quarterly: Function that interpolates the hourly values to quarterly values.
+- etl_weather_forecast_data: Function that extracts, transforms and loads weather forecast data.
+"""
+
+from datetime import date, timedelta
+from typing import Any
+
+import pandas as pd
+import requests
+from pandas import concat
+
+from src.config import WEATHER_FORECAST_API_URL
+
+
+class WeatherForecastData:
+    """Class for weather forecast data from Open Meteo."""
+
+    def __init__(self) -> None:
+        """Initializes the weather forecast data class."""
+        self.om_weather_forecast_vars = {
+            "temperature_2m": "temperature",
+            "shortwave_radiation": "irradiation",
+            "sunshine_duration": "irradiation_duration",
+            "cloud_cover": "cloud_coverage",
+            "rain": "rain",
+            "relative_humidity_2m": "humidity",
+            "snowfall": "snow",
+        }
+
+    def _call_weather_forecast_api(self, forecast_date: date) -> dict:
+        """Calls the Open-Meteo API to retrieve hourly weather forecast data.
+
+        Args:
+            forecast_date: Date for which to fetch weather forecast.
+
+        Returns:
+            JSON response containing weather forecast data.
+        """
+        start_time = forecast_date.strftime(format="%Y-%m-%dT00:00")
+        end_time = forecast_date.strftime(format="%Y-%m-%dT23:45")
+
+        params: dict[str, Any] = {
+            "latitude": 52.7481819,
+            "longitude": 6.5663292,
+            "hourly": list(self.om_weather_forecast_vars.keys()),
+            "models": "knmi_seamless",
+            "start_hour": start_time,
+            "end_hour": end_time,
+        }
+
+        response = requests.get(url=WEATHER_FORECAST_API_URL, params=params, timeout=10)
+        return response.json()
+
+    def _interpolate_hourly_data_to_quarterly(self, weather_var: str) -> None:
+        """Interpolate the hourly values of the given weather variable to 15-minute resolution."""
+        match weather_var:
+            case "temperature":
+                self.weather_data[weather_var] = self.weather_data[weather_var].interpolate(
+                    method="linear", limit=3, limit_direction="forward"
+                )
+
+            case "irradiation" | "irradiation_duration" | "rain" | "humidity":
+                self.weather_data[weather_var] = (
+                    self.weather_data[weather_var]
+                    .interpolate(method="linear", limit=3, limit_direction="forward")
+                    .clip(lower=0)
+                )
+
+            case "cloud_coverage" | "snow":
+                self.weather_data[weather_var] = self.weather_data[weather_var].ffill()
+
+    def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
+        """Extracts, transforms, and loads weather forecast data.
+
+        - Extracts and transforms the forecast data.
+        - Reindexes data to 15-minute intervals to maintain consistency.
+        - Interpolates temperature and irradiation to 15-minute intervals.
+
+        Args:
+            forecast_date: Date for which to fetch and process weather forecast.
+
+        Returns:
+            Processed DataFrame with interpolated weather data at 15-minute intervals.
+        """
+        forecast_dict = self._call_weather_forecast_api(forecast_date=forecast_date)
+
+        weather_data = pd.DataFrame({"date_time": forecast_dict["hourly"]["time"][:-1]})
+        weather_data = concat(
+            objs=[
+                weather_data,
+                pd.DataFrame(
+                    {value: forecast_dict["hourly"][key] for key, value in self.om_weather_forecast_vars.items()}
+                ),
+            ],
+            axis=1,
+        )
+        weather_data["date_time"] = pd.to_datetime(arg=weather_data["date_time"], utc=True)
+        weather_data["snow"] = weather_data["snow"].apply(func=lambda x: (x > 0) * 1)
+        weather_data["cloud_coverage"] = weather_data["cloud_coverage"].apply(func=lambda x: int(x / 100 * 9))
+        weather_data["irradiation_duration"] = weather_data["irradiation_duration"].apply(
+            func=lambda x: x / 3600 if x != -1 else x
+        )  # Sunshine duration is reported in seconds; dividing by 3600 converts it to hours
+        weather_data["irradiation"] = weather_data["irradiation"].apply(
+            func=lambda x: x * 0.36 if x != -1 else x
+        )  # Shortwave radiation is in W/m2; multiplying by 0.36 converts the hourly mean to J/cm2
+
+        # Generate 15-minute intervals over the date range
+        full_date_range = pd.date_range(
+            start=weather_data["date_time"].iloc[0].tz_localize(None),
+            end=weather_data["date_time"].iloc[-1].tz_localize(None).date() + timedelta(days=1),
+            freq="15min",
+            tz="UTC",
+        )
+
+        # Reindex and interpolate missing values
+        self.weather_data = (
+            weather_data.set_index("date_time")
+            .reindex(full_date_range)
+            .iloc[:-1]
+            .reset_index()
+            .rename(columns={"index": "date_time"})
+        ).fillna(0)
+
+        # Interpolate values of features to 15 min interval
+        for weather_var in self.om_weather_forecast_vars.values():
+            self._interpolate_hourly_data_to_quarterly(weather_var=weather_var)
+
+        return self.weather_data
diff --git a/src/main.py b/src/main.py
index e40bcda..4bb0abc 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,3 +1,5 @@
+import asyncio
+import json
 import azure.functions as func
 
 from datetime import UTC, datetime, timedelta
@@ -8,7 +10,8 @@
 from openadr3_client._vtn.interfaces.filters import TargetFilter
 
 from src.application.generate_events import get_capacity_limitation_event
-from src.infrastructure.predictions_actions_stub_impl import PredictionActionsStub
+from src.infrastructure.influxdb._client import create_db_client
+from src.infrastructure.prediction_actions_impl import PredictionActionsInfluxDB
 from src.logger import logger
 
 from src.config import PROGRAM_ID, VEN_NAME, VTN_BASE_URL
@@ -22,7 +25,10 @@ def _initialize_bl_client() -> BusinessLogicClient:
         BusinessLogicClient: The BL client.
     """
     bl_client = BusinessLogicHttpClientFactory.create_http_bl_client(
-        vtn_base_url=VTN_BASE_URL
+        vtn_base_url=VTN_BASE_URL,
+        client_id='test',
+        client_secret='test',
+        token_url='test',
     )
     return bl_client
@@ -40,8 +46,7 @@ async def _generate_events() -> NewEvent | None:
     # End time is 12:00 48 hours in the future
     end_time = start_time + timedelta(days=2)
 
-    # actions = PredictionActionsInfluxDB(client=create_db_client())
-    actions = PredictionActionsStub()
+    actions = PredictionActionsInfluxDB(client=create_db_client())
 
     return await get_capacity_limitation_event(
         actions, from_date=start_time, to_date=end_time
@@ -66,36 +71,38 @@ async def main() -> None:
     try:
         logger.info("Triggering BL function at %s", datetime.now(tz=UTC))
         event = await _generate_events()
-
+
         if not event:
             logger.warning(
                 "No capacity limitation event could be constructed, skipping..."
             )
             return None
 
-        bl_client = _initialize_bl_client()
-
-        try:
-            # Clean up the old events in the VTN that are going to be replaced by the new events
-            await _clean_up_old_events(bl_client=bl_client)
-            # Create the new event in the VTN.
-            created_event = bl_client.events.create_event(new_event=event)
-            logger.info("Created event with id: %s in VTN", created_event.id)
-        except Exception as exc:
-            logger.warning(
-                "Exception occurred during event creation in the VTN", exc_info=exc
-            )
+        # bl_client = _initialize_bl_client()
+
+        # try:
+        #     # Clean up the old events in the VTN that are going to be replaced by the new events
+        #     await _clean_up_old_events(bl_client=bl_client)
+        #     # Create the new event in the VTN.
+        #     created_event = bl_client.events.create_event(new_event=event)
+        #     logger.info("Created event with id: %s in VTN", created_event.id)
+        # except Exception as exc:
+        #     logger.warning(
+        #         "Exception occurred during event creation in the VTN", exc_info=exc
+        #     )
 
     except Exception as exc:
         logger.warning("Exception occurred during function execution", exc_info=exc)
 
     logger.info("Python timer trigger function executed.")
 
-
-@bp.schedule(
-    schedule="0 50 5 * * *",
-    arg_name="myTimer",
-    run_on_startup=False,
-    use_monitor=False,
-)
-async def generate_events_for_tomorrow(myTimer: func.TimerRequest) -> None:
-    await main()
+if __name__ == "__main__":
+    asyncio.run(main())
+
+# @bp.schedule(
+#     schedule="0 50 5 * * *",
+#     arg_name="myTimer",
+#     run_on_startup=False,
+#     use_monitor=False,
+# )
+# async def generate_events_for_tomorrow(myTimer: func.TimerRequest) -> None:
+#     await main()

From 3dc0e0338a3e8d0f4a5ab792cc4bc9be256f137a Mon Sep 17 00:00:00 2001
From: Nick van der Burgt
Date: Tue, 21 Oct 2025 16:19:59 +0200
Subject: [PATCH 02/10] improve serialization logic for ML API

---
 src/infrastructure/azureml/predictions.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/infrastructure/azureml/predictions.py b/src/infrastructure/azureml/predictions.py
index 016e4c1..7a5f514 100644
--- a/src/infrastructure/azureml/predictions.py
+++ b/src/infrastructure/azureml/predictions.py
@@ -26,7 +26,7 @@ def __init__(self, columns: list[str], index: list[str], data: list[Any], params
         self.data = data
         self.params = params
 
-    def as_json(self) -> str:
+    def as_json(self) -> dict:
         data = {
             "input_data": {
                 "columns": self.columns,
@@ -36,7 +36,7 @@ def as_json(self) -> str:
             "params": self.params
         }
 
-        return json.dumps(data)
+        return data
 
 def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAssetLoad]:
     """Get transformer load predictions between the start date (exclusive) and end date (inclusive).
@@ -49,7 +49,8 @@ def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAs
     """
     copied_features = features.copy()
     altered_features = copied_features.reset_index(drop=True).drop(columns=["datetime"])
-    
+    altered_features.fillna(0, inplace=True)
+
     session = _BearerAuthenticatedSession(scopes=["https://ml.azure.com/.default"])
     headers = {'Content-Type':'application/json', 'Accept': 'application/json'}
     payload = _DitmPredictionPayload(

From 8d544f19b4794e23c3a9926936e87883508cc6bc Mon Sep 17 00:00:00 2001
From: Nick van der Burgt
Date: Tue, 21 Oct 2025 16:32:02 +0200
Subject: [PATCH 03/10] EOD push

---
 src/infrastructure/azureml/feature_generation.py    |  6 ++++--
 src/infrastructure/azureml/predictions.py           |  2 +-
 src/infrastructure/weather_data/weather_forecast.py | 12 ++++++------
 3 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/src/infrastructure/azureml/feature_generation.py b/src/infrastructure/azureml/feature_generation.py
index 4f759d9..f2e6fe2 100644
--- a/src/infrastructure/azureml/feature_generation.py
+++ b/src/infrastructure/azureml/feature_generation.py
@@ -27,9 +27,9 @@ def _get_weather_features_for_dates(start_date_inclusive: datetime, end_date_inc
 
     for d in dates:
         forecast_for_date = weather_forecast.etl_weather_forecast_data(d)
-        pd.concat([forecasts_df, forecast_for_date], axis=1)
+        forecasts_df = pd.concat([forecasts_df, forecast_for_date], axis=1)
 
-    return forecasts_df
+    return forecasts_df.rename(columns={"date_time": "datetime"})
 
 def _get_time_features_for_dates(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
     """Get time features for each date between the given datetime range.
@@ -131,6 +131,7 @@ def _get_mock_standard_profile_features(start_date_inclusive: datetime, end_date
     })
 
     predict_datetimes_df["scaled_profile"] = 0
+
     return predict_datetimes_df
 
 async def get_features_between_dates(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
@@ -152,4 +153,5 @@ async def get_features_between_dates(query_api: QueryApiAsync, start_date_inclus
     # standard_profiles = await retrieve_standard_profiles_between_dates(query_api, start_date_inclusive, end_date_inclusive)
     standard_profiles = _get_mock_standard_profile_features(start_date_inclusive, end_date_inclusive)
 
+    # TODO: FIX CONCAT HERE, duplicate rows for some reason
     return pd.concat([features, time_features, lag_features, weather_features, standard_profiles], axis=1)
diff --git a/src/infrastructure/azureml/predictions.py b/src/infrastructure/azureml/predictions.py
index 7a5f514..5987650 100644
--- a/src/infrastructure/azureml/predictions.py
+++ b/src/infrastructure/azureml/predictions.py
@@ -50,7 +50,7 @@ def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAs
     copied_features = features.copy()
     altered_features = copied_features.reset_index(drop=True).drop(columns=["datetime"])
     altered_features.fillna(0, inplace=True)
-    
+
     session = _BearerAuthenticatedSession(scopes=["https://ml.azure.com/.default"])
     headers = {'Content-Type':'application/json', 'Accept': 'application/json'}
     payload = _DitmPredictionPayload(
diff --git a/src/infrastructure/weather_data/weather_forecast.py b/src/infrastructure/weather_data/weather_forecast.py
index 3954294..8804223 100644
--- a/src/infrastructure/weather_data/weather_forecast.py
+++ b/src/infrastructure/weather_data/weather_forecast.py
@@ -88,7 +88,7 @@ def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
         """
         forecast_dict = self._call_weather_forecast_api(forecast_date=forecast_date)
 
-        weather_data = pd.DataFrame({"date_time": forecast_dict["hourly"]["time"][:-1]})
+        weather_data = pd.DataFrame({"datetime": forecast_dict["hourly"]["time"][:-1]})
         weather_data = concat(
             objs=[
                 weather_data,
@@ -98,7 +98,7 @@ def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
             ],
             axis=1,
         )
-        weather_data["date_time"] = pd.to_datetime(arg=weather_data["date_time"], utc=True)
+        weather_data["datetime"] = pd.to_datetime(arg=weather_data["datetime"], utc=True)
         weather_data["snow"] = weather_data["snow"].apply(func=lambda x: (x > 0) * 1)
         weather_data["cloud_coverage"] = weather_data["cloud_coverage"].apply(func=lambda x: int(x / 100 * 9))
         weather_data["irradiation_duration"] = weather_data["irradiation_duration"].apply(
@@ -110,19 +110,19 @@ def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
 
         # Generate 15-minute intervals over the date range
         full_date_range = pd.date_range(
-            start=weather_data["date_time"].iloc[0].tz_localize(None),
-            end=weather_data["date_time"].iloc[-1].tz_localize(None).date() + timedelta(days=1),
+            start=weather_data["datetime"].iloc[0].tz_localize(None),
+            end=weather_data["datetime"].iloc[-1].tz_localize(None).date() + timedelta(days=1),
             freq="15min",
             tz="UTC",
         )
 
         # Reindex and interpolate missing values
         self.weather_data = (
-            weather_data.set_index("date_time")
+            weather_data.set_index("datetime")
             .reindex(full_date_range)
             .iloc[:-1]
             .reset_index()
-            .rename(columns={"index": "date_time"})
+            .rename(columns={"index": "datetime"})
         ).fillna(0)
 
         # Interpolate values of features to 15 min interval

From b7dbb0bb7192d84ee14a5c2cde7675b4249ba1e0 Mon Sep 17 00:00:00 2001
From: Nick van der Burgt
Date: Wed, 22 Oct 2025 08:43:51 +0200
Subject: [PATCH 04/10] finalize alterations for DITM BL to communicate with
 Azure ML

---
 src/application/generate_events.py                  |  2 +-
 .../azureml/feature_generation.py                   | 18 ++----------
 src/infrastructure/azureml/predictions.py           | 12 +++++---
 .../weather_data/weather_forecast.py                | 28 ++++++++++++-------
 src/main.py                                         |  4 +--
 src/models/predicted_load.py                        | 16 ++++++++---
 6 files changed, 44 insertions(+), 36 deletions(-)

diff --git a/src/application/generate_events.py b/src/application/generate_events.py
index 7bb5470..cc5c6e5 100644
--- a/src/application/generate_events.py
+++ b/src/application/generate_events.py
@@ -69,7 +69,7 @@ def _generate_capacity_limitation_intervals(
                 type=EventPayloadType.IMPORT_CAPACITY_LIMIT,
                 values=(
                     predicted_grid_asset_loads.flex_capacity_required(max_capacity)
-                    or 4,
+                    or 22,
                 ),
             ),
         ),
diff --git a/src/infrastructure/azureml/feature_generation.py b/src/infrastructure/azureml/feature_generation.py
index f2e6fe2..6561c73 100644
--- a/src/infrastructure/azureml/feature_generation.py
+++ b/src/infrastructure/azureml/feature_generation.py
@@ -20,16 +20,8 @@ def _get_weather_features_for_dates(start_date_inclusive: datetime, end_date_inc
         pd.DataFrame: The dataframe containing weather forecasts for the date range.
     """
     weather_forecast = WeatherForecastData()
-
-    dates = [start_date_inclusive.date() + timedelta(days=i) for i in range((end_date_inclusive.date() - start_date_inclusive.date()).days + 1)]
-
-    forecasts_df: pd.DataFrame = pd.DataFrame()
-
-    for d in dates:
-        forecast_for_date = weather_forecast.etl_weather_forecast_data(d)
-        forecasts_df = pd.concat([forecasts_df, forecast_for_date], axis=1)
-
-    return forecasts_df.rename(columns={"date_time": "datetime"})
+    weather_forecasts = weather_forecast.etl_weather_forecast_data(start_date_inclusive, end_date_inclusive)
+    return weather_forecasts.rename(columns={"date_time": "datetime"})
 
 def _get_time_features_for_dates(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
     """Get time features for each date between the given datetime range.
@@ -52,7 +44,6 @@ def _get_time_features_for_dates(start_date_inclusive: datetime, end_date_inclus
     })
 
     predict_datetimes_df["year"] = predict_datetimes_df["datetime"].dt.year
-    predict_datetimes_df["quarter"] = predict_datetimes_df["datetime"].dt.quarter
     predict_datetimes_df["month"] = predict_datetimes_df["datetime"].dt.month
     predict_datetimes_df["day"] = predict_datetimes_df["datetime"].dt.day
     predict_datetimes_df["hour"] = predict_datetimes_df["datetime"].dt.hour
@@ -145,13 +136,10 @@ async def get_features_between_dates(query_api: QueryApiAsync, start_date_inclus
     Returns:
         pd.DataFrame: A dataframe containing all the features for the given time range.
     """
-    features: pd.DataFrame = pd.DataFrame()
-
     time_features = _get_time_features_for_dates(start_date_inclusive, end_date_inclusive)
     lag_features = await _get_lag_features_for_dates(query_api, start_date_inclusive, end_date_inclusive)
     weather_features = _get_weather_features_for_dates(start_date_inclusive, end_date_inclusive)
     # standard_profiles = await retrieve_standard_profiles_between_dates(query_api, start_date_inclusive, end_date_inclusive)
     standard_profiles = _get_mock_standard_profile_features(start_date_inclusive, end_date_inclusive)
 
-    # TODO: FIX CONCAT HERE, duplicate rows for some reason
-    return pd.concat([features, time_features, lag_features, weather_features, standard_profiles], axis=1)
+    return pd.concat([time_features, lag_features, weather_features, standard_profiles], axis=1)
diff --git a/src/infrastructure/azureml/predictions.py b/src/infrastructure/azureml/predictions.py
index 5987650..c9f8a0f 100644
--- a/src/infrastructure/azureml/predictions.py
+++ b/src/infrastructure/azureml/predictions.py
@@ -9,7 +9,7 @@ import pandas as pd
 
 class _DitmPredictionPayload:
-    def __init__(self, columns: list[str], index: list[str], data: list[Any], params: dict) -> None:
+    def __init__(self, columns: list[str], index: list[int], data: list[Any], params: dict) -> None:
         """Create a DITM predictions payload
 
         Args:
@@ -82,7 +82,7 @@ def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAs
             "snow",
             "scaled_profile"
         ],
-        index=[],
+        index=list(range(len(altered_features))),
         data=altered_features.values.tolist(),
         params={}
     )
@@ -95,9 +95,13 @@ def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAs
         raise ValueError("Features dataframe and predictions list did not match")
 
     loads : list[PredictedGridAssetLoad] = []
+
     for index, pred in enumerate(predictions):
-        matching_df_row = features.iloc(index)
-        loads.append(PredictedGridAssetLoad(time=pd.to_datetime(matching_df_row["datetime"]), duration=timedelta(minutes=15), load=pred))
+        matching_df_row = features.iloc[index]
+        datetime_of_load = matching_df_row["datetime"][0]
+        converted = pd.to_datetime(datetime_of_load, utc=True).to_pydatetime()
+
+        loads.append(PredictedGridAssetLoad(time=converted, duration=timedelta(minutes=15), load=pred))
 
     return loads
diff --git a/src/infrastructure/weather_data/weather_forecast.py b/src/infrastructure/weather_data/weather_forecast.py
index 8804223..b289024 100644
--- a/src/infrastructure/weather_data/weather_forecast.py
+++ b/src/infrastructure/weather_data/weather_forecast.py
@@ -6,7 +6,7 @@
 - etl_weather_forecast_data: Function that extracts, transforms and loads weather forecast data.
 """
 
-from datetime import date, timedelta
+from datetime import date, datetime, timedelta, timezone
 from typing import Any
 
 import pandas as pd
@@ -31,17 +31,21 @@ def __init__(self) -> None:
             "snowfall": "snow",
         }
 
-    def _call_weather_forecast_api(self, forecast_date: date) -> dict:
+    def _call_weather_forecast_api(self, start_time_inclusive: datetime, end_time_inclusive: datetime) -> dict:
         """Calls the Open-Meteo API to retrieve hourly weather forecast data.
 
         Args:
-            forecast_date: Date for which to fetch weather forecast.
+            start_time_inclusive (datetime): Datetime from which to start fetching forecasts
+            end_time_inclusive (datetime): Datetime from which to stop fetching forecasts
 
         Returns:
            JSON response containing weather forecast data.
         """
-        start_time = forecast_date.strftime(format="%Y-%m-%dT00:00")
-        end_time = forecast_date.strftime(format="%Y-%m-%dT23:45")
+        start_time_utc = start_time_inclusive.astimezone(timezone.utc)
+        end_time_utc = end_time_inclusive.astimezone(timezone.utc)
+
+        start_time = start_time_utc.strftime(format="%Y-%m-%dT%H:%M")
+        end_time = end_time_utc.strftime(format="%Y-%m-%dT%H:%M")
 
         params: dict[str, Any] = {
             "latitude": 52.7481819,
@@ -53,6 +57,7 @@ def _call_weather_forecast_api(self, start_time_inclusive: datetime, end_time_in
         }
 
         response = requests.get(url=WEATHER_FORECAST_API_URL, params=params, timeout=10)
+        response.raise_for_status()
         return response.json()
 
     def _interpolate_hourly_data_to_quarterly(self, weather_var: str) -> None:
@@ -73,7 +78,7 @@ def _interpolate_hourly_data_to_quarterly(self, weather_var: str) -> None:
             case "cloud_coverage" | "snow":
                 self.weather_data[weather_var] = self.weather_data[weather_var].ffill()
 
-    def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
+    def etl_weather_forecast_data(self, start_time_inclusive: datetime, end_time_inclusive: datetime) -> pd.DataFrame:
         """Extracts, transforms, and loads weather forecast data.
 
         - Extracts and transforms the forecast data.
@@ -81,14 +86,15 @@ def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
         - Interpolates temperature and irradiation to 15-minute intervals.
 
         Args:
-            forecast_date: Date for which to fetch and process weather forecast.
+            start_time_inclusive (datetime): Datetime from which to start fetching forecasts
+            end_time_inclusive (datetime): Datetime from which to stop fetching forecasts
 
         Returns:
             Processed DataFrame with interpolated weather data at 15-minute intervals.
        """
-        forecast_dict = self._call_weather_forecast_api(forecast_date=forecast_date)
+        forecast_dict = self._call_weather_forecast_api(start_time_inclusive, end_time_inclusive)
 
-        weather_data = pd.DataFrame({"datetime": forecast_dict["hourly"]["time"][:-1]})
+        weather_data = pd.DataFrame({"datetime": forecast_dict["hourly"]["time"]})
         weather_data = concat(
             objs=[
                 weather_data,
@@ -111,7 +117,7 @@ def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
         # Generate 15-minute intervals over the date range
         full_date_range = pd.date_range(
             start=weather_data["datetime"].iloc[0].tz_localize(None),
-            end=weather_data["datetime"].iloc[-1].tz_localize(None).date() + timedelta(days=1),
+            end=weather_data["datetime"].iloc[-1].tz_localize(None),
             freq="15min",
             tz="UTC",
         )
@@ -125,6 +131,8 @@ def etl_weather_forecast_data(self, forecast_date: date) -> pd.DataFrame:
             .rename(columns={"index": "datetime"})
         ).fillna(0)
 
+        self.weather_data["datetime"] = self.weather_data["datetime"].dt.tz_convert("Europe/Amsterdam")
+
         # Interpolate values of features to 15 min interval
         for weather_var in self.om_weather_forecast_vars.values():
             self._interpolate_hourly_data_to_quarterly(weather_var=weather_var)
diff --git a/src/main.py b/src/main.py
index 4bb0abc..eee546f 100644
--- a/src/main.py
+++ b/src/main.py
@@ -43,8 +43,8 @@ async def _generate_events() -> NewEvent | None:
     # Start time is 12:00 today.
     start_time = current_time_ams.replace(hour=12, minute=0, second=0, microsecond=0)
 
-    # End time is 12:00 48 hours in the future
-    end_time = start_time + timedelta(days=2)
+    # End time is 12:00 24 hours in the future
+    end_time = start_time + timedelta(days=1)
 
     actions = PredictionActionsInfluxDB(client=create_db_client())
 
diff --git a/src/models/predicted_load.py b/src/models/predicted_load.py
index 77e30df..657cf21 100644
--- a/src/models/predicted_load.py
+++ b/src/models/predicted_load.py
@@ -52,8 +52,16 @@ def flex_capacity_required(self, max_capacity: float) -> float | None:
         Returns:
             float: The amount of kw of flex which is needed for this grid asset load period.
         """
-        if self.load > max_capacity:
-            return self.load - max_capacity
+        min_guaranteed_capacity = 4
+        max_capacity_of_pod = 22  # TODO: configure based on input from actual battery when available.
+        max_excess = 50
 
-        # No flex capacity needed for this period.
-        return None
+        excess_load = self.load - max_capacity
+        if excess_load <= 0:
+            return 0  # not exceeding capacity
+
+        # Linear scaling from min_guaranteed_capacity up to max_capacity_of_pod
+        scaled = min_guaranteed_capacity + (max_capacity_of_pod - min_guaranteed_capacity) * (excess_load / max_excess)
+
+        # Cap at max_capacity_of_pod
+        return min(max_capacity_of_pod, scaled)

From c233ee2b6ff0eb70da110b82d17889b8bcdbfcd8 Mon Sep 17 00:00:00 2001
From: Nick van der Burgt
Date: Wed, 22 Oct 2025 08:44:52 +0200
Subject: [PATCH 05/10] reenable event creation in VTN

---
 src/main.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/main.py b/src/main.py
index eee546f..531bd10 100644
--- a/src/main.py
+++ b/src/main.py
@@ -78,18 +78,18 @@ async def main() -> None:
             )
             return None
 
-        # bl_client = _initialize_bl_client()
-
-        # try:
-        #     # Clean up the old events in the VTN that are going to be replaced by the new events
-        #     await _clean_up_old_events(bl_client=bl_client)
-        #     # Create the new event in the VTN.
+        bl_client = _initialize_bl_client()
+
+        try:
+            # Clean up the old events in the VTN that are going to be replaced by the new events
+            await _clean_up_old_events(bl_client=bl_client)
+            # Create the new event in the VTN.
- # created_event = bl_client.events.create_event(new_event=event) - # logger.info("Created event with id: %s in VTN", created_event.id) - # except Exception as exc: - # logger.warning( - # "Exception occurred during event creation in the VTN", exc_info=exc - # ) + bl_client = _initialize_bl_client() + + try: + # Clean up the old events in the VTN that are going to be replaced by the new events + await _clean_up_old_events(bl_client=bl_client) + # Create the new event in the VTN. + created_event = bl_client.events.create_event(new_event=event) + logger.info("Created event with id: %s in VTN", created_event.id) + except Exception as exc: + logger.warning( + "Exception occurred during event creation in the VTN", exc_info=exc + ) except Exception as exc: logger.warning("Exception occurred during function execution", exc_info=exc) From 49cb8f1f888b2f87cccef3d2766ba31d178a8351 Mon Sep 17 00:00:00 2001 From: Nick van der Burgt Date: Wed, 22 Oct 2025 08:48:02 +0200 Subject: [PATCH 06/10] Reenable azure function schedule --- src/main.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main.py b/src/main.py index 531bd10..f797a33 100644 --- a/src/main.py +++ b/src/main.py @@ -95,14 +95,14 @@ async def main() -> None: logger.info("Python timer trigger function executed.") -if __name__ == "__main__": - asyncio.run(main()) - -# @bp.schedule( -# schedule="0 50 5 * * *", -# arg_name="myTimer", -# run_on_startup=False, -# use_monitor=False, -# ) -# async def generate_events_for_tomorrow(myTimer: func.TimerRequest) -> None: -# await main() +# if __name__ == "__main__": +# asyncio.run(main()) + +@bp.schedule( + schedule="0 55 5 * * *", + arg_name="myTimer", + run_on_startup=False, + use_monitor=False, +) +async def generate_events_for_tomorrow(myTimer: func.TimerRequest) -> None: + await main() From bd8d382027693106d85af9193acbf2dd6b2b03f0 Mon Sep 17 00:00:00 2001 From: Nick van der Burgt Date: Wed, 22 Oct 2025 11:37:26 +0200 Subject: [PATCH 07/10] run ruff --- src/config.py | 6 +- .../_auth/http/authenticated_session.py | 33 ++- src/infrastructure/_auth/token_manager.py | 15 +- .../azureml/feature_generation.py | 204 ++++++++++++------ src/infrastructure/azureml/predictions.py | 48 +++-- .../influxdb/dalidata/query_dali_data.py | 12 +- .../query_standard_profiles.py | 15 +- .../influxdb/trafo_load_audit.py | 13 +- src/infrastructure/prediction_actions_impl.py | 12 +- .../weather_data/weather_forecast.py | 41 ++-- src/main.py | 12 +- src/models/predicted_load.py | 8 +- 12 files changed, 279 insertions(+), 140 deletions(-) diff --git a/src/config.py b/src/config.py index 3e626c0..f6cb6b3 100644 --- a/src/config.py +++ b/src/config.py @@ -25,7 +25,9 @@ PREDICTED_TRAFO_LOAD_BUCKET = config( "PREDICTED_TRAFO_LOAD_BUCKET", default="ditm_model_output" ) -STANDARD_PROFILES_BUCKET_NAME = config("STANDARD_PROFILES_BUCKET_NAME", default="ditm_standard_profiles") +STANDARD_PROFILES_BUCKET_NAME = config( + "STANDARD_PROFILES_BUCKET_NAME", default="ditm_standard_profiles" +) DALIDATA_BUCKET_NAME = config("DALIDATA_BUCKET_NAME", default="dalidata") # External services URLs @@ -35,4 +37,4 @@ DITM_MODEL_API_URL = config("DITM_MODEL_API_URL") DITM_MODEL_API_CLIENT_ID = config("DITM_MODEL_API_CLIENT_ID") DITM_MODEL_API_CLIENT_SECRET = config("DITM_MODEL_API_CLIENT_SECRET") -DITM_MODEL_API_TOKEN_URL = config("DITM_MODEL_API_TOKEN_URL") \ No newline at end of file +DITM_MODEL_API_TOKEN_URL = config("DITM_MODEL_API_TOKEN_URL") diff --git 
a/src/infrastructure/_auth/http/authenticated_session.py b/src/infrastructure/_auth/http/authenticated_session.py index 93164c4..e507607 100644 --- a/src/infrastructure/_auth/http/authenticated_session.py +++ b/src/infrastructure/_auth/http/authenticated_session.py @@ -4,8 +4,15 @@ from requests import PreparedRequest, Session from requests.auth import AuthBase -from src.config import DITM_MODEL_API_CLIENT_ID, DITM_MODEL_API_CLIENT_SECRET, DITM_MODEL_API_TOKEN_URL -from src.infrastructure._auth.token_manager import OAuthTokenManager, OAuthTokenManagerConfig +from src.config import ( + DITM_MODEL_API_CLIENT_ID, + DITM_MODEL_API_CLIENT_SECRET, + DITM_MODEL_API_TOKEN_URL, +) +from src.infrastructure._auth.token_manager import ( + OAuthTokenManager, + OAuthTokenManagerConfig, +) class _BearerAuth(AuthBase): @@ -30,16 +37,20 @@ def __call__(self, r: PreparedRequest) -> PreparedRequest: class _BearerAuthenticatedSession(Session): """Session that includes a bearer token in all requests made through it.""" - def __init__(self, token_manager: Optional[OAuthTokenManager] = None, scopes: Optional[list[str]] = None) -> None: + def __init__( + self, + token_manager: Optional[OAuthTokenManager] = None, + scopes: Optional[list[str]] = None, + ) -> None: super().__init__() if not token_manager: token_manager = OAuthTokenManager( - OAuthTokenManagerConfig( - client_id=DITM_MODEL_API_CLIENT_ID, - client_secret=DITM_MODEL_API_CLIENT_SECRET, - token_url=DITM_MODEL_API_TOKEN_URL, - scopes=scopes, - audience=None - ) + OAuthTokenManagerConfig( + client_id=DITM_MODEL_API_CLIENT_ID, + client_secret=DITM_MODEL_API_CLIENT_SECRET, + token_url=DITM_MODEL_API_TOKEN_URL, + scopes=scopes, + audience=None, ) - self.auth = _BearerAuth(token_manager) \ No newline at end of file + ) + self.auth = _BearerAuth(token_manager) diff --git a/src/infrastructure/_auth/token_manager.py b/src/infrastructure/_auth/token_manager.py index af436f0..6e57b49 100644 --- a/src/infrastructure/_auth/token_manager.py +++ b/src/infrastructure/_auth/token_manager.py @@ -5,6 +5,8 @@ from oauthlib.oauth2 import BackendApplicationClient from requests_oauthlib import OAuth2Session +from src.logger import logger + @dataclass class OAuthTokenManagerConfig: @@ -20,7 +22,8 @@ class OAuthTokenManager: def __init__(self, config: OAuthTokenManagerConfig) -> None: self.client = BackendApplicationClient( - client_id=config.client_id, scope=" ".join(config.scopes) if config.scopes is not None else None + client_id=config.client_id, + scope=" ".join(config.scopes) if config.scopes is not None else None, ) self.oauth = OAuth2Session(client=self.client) self.token_url = config.token_url @@ -63,12 +66,16 @@ def get_access_token(self) -> str: def _get_new_access_token(self) -> str: token_response = self.oauth.fetch_token( - token_url=self.token_url, client_secret=self.client_secret, audience=self.audience + token_url=self.token_url, + client_secret=self.client_secret, + audience=self.audience, ) # Calculate expiration time (half of token lifetime) expires_in_seconds = token_response.get("expires_in", 3600) - expiration_time = datetime.now(tz=UTC) + timedelta(seconds=expires_in_seconds // 2) + expiration_time = datetime.now(tz=UTC) + timedelta( + seconds=expires_in_seconds // 2 + ) access_token = token_response.get("access_token") @@ -78,4 +85,4 @@ def _get_new_access_token(self) -> str: raise ValueError(exc_msg) self._cached_token = (expiration_time, access_token) - return access_token \ No newline at end of file + return access_token diff --git 
a/src/infrastructure/azureml/feature_generation.py b/src/infrastructure/azureml/feature_generation.py index 6561c73..9ce658b 100644 --- a/src/infrastructure/azureml/feature_generation.py +++ b/src/infrastructure/azureml/feature_generation.py @@ -5,11 +5,15 @@ import pandas as pd from influxdb_client.client.query_api_async import QueryApiAsync -from src.infrastructure.influxdb.dalidata.query_dali_data import retrieve_dali_data_between -from src.infrastructure.influxdb.standard_profiles.query_standard_profiles import retrieve_standard_profiles_between_dates +from src.infrastructure.influxdb.dalidata.query_dali_data import ( + retrieve_dali_data_between, +) from src.infrastructure.weather_data.weather_forecast import WeatherForecastData -def _get_weather_features_for_dates(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame: + +def _get_weather_features_for_dates( + start_date_inclusive: datetime, end_date_inclusive: datetime +) -> pd.DataFrame: """Get weather features for each date between the given datetime range. Args: @@ -20,10 +24,15 @@ def _get_weather_features_for_dates(start_date_inclusive: datetime, end_date_inc pd.DataFrame: The dataframe containing weather forecasts for the date range. """ weather_forecast = WeatherForecastData() - weather_forecasts = weather_forecast.etl_weather_forecast_data(start_date_inclusive, end_date_inclusive) + weather_forecasts = weather_forecast.etl_weather_forecast_data( + start_date_inclusive, end_date_inclusive + ) return weather_forecasts.rename(columns={"date_time": "datetime"}) -def _get_time_features_for_dates(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame: + +def _get_time_features_for_dates( + start_date_inclusive: datetime, end_date_inclusive: datetime +) -> pd.DataFrame: """Get time features for each date between the given datetime range. Args: @@ -33,15 +42,17 @@ def _get_time_features_for_dates(start_date_inclusive: datetime, end_date_inclus Returns: pd.DataFrame: The dataframe containing time features for the date range. 
""" - predict_datetimes_df = pd.DataFrame({ - "datetime": pd.date_range( - start=start_date_inclusive, - end=end_date_inclusive, - freq="15min", - tz=ZoneInfo("Europe/Amsterdam"), - inclusive="left", - ) - }) + predict_datetimes_df = pd.DataFrame( + { + "datetime": pd.date_range( + start=start_date_inclusive, + end=end_date_inclusive, + freq="15min", + tz=ZoneInfo("Europe/Amsterdam"), + inclusive="left", + ) + } + ) predict_datetimes_df["year"] = predict_datetimes_df["datetime"].dt.year predict_datetimes_df["month"] = predict_datetimes_df["datetime"].dt.month @@ -50,16 +61,27 @@ def _get_time_features_for_dates(start_date_inclusive: datetime, end_date_inclus predict_datetimes_df["minute"] = predict_datetimes_df["datetime"].dt.minute predict_datetimes_df["dayofyear"] = predict_datetimes_df["datetime"].dt.dayofyear predict_datetimes_df["dayofweek"] = predict_datetimes_df["datetime"].dt.dayofweek - predict_datetimes_df["weekofyear"] = predict_datetimes_df["datetime"].dt.isocalendar().week + predict_datetimes_df["weekofyear"] = ( + predict_datetimes_df["datetime"].dt.isocalendar().week + ) weekend_cutoff = 5 - predict_datetimes_df["is_weekend"] = predict_datetimes_df["datetime"].dt.dayofweek.apply(func=lambda x: 1 if x >= weekend_cutoff else 0) - + predict_datetimes_df["is_weekend"] = predict_datetimes_df[ + "datetime" + ].dt.dayofweek.apply(func=lambda x: 1 if x >= weekend_cutoff else 0) + nl_holidays = holidays.country_holidays(country="NL") - predict_datetimes_df["is_holiday"] = predict_datetimes_df["datetime"].apply(func=lambda x: 1 if x.date() in nl_holidays else 0) + predict_datetimes_df["is_holiday"] = predict_datetimes_df["datetime"].apply( + func=lambda x: 1 if x.date() in nl_holidays else 0 + ) return predict_datetimes_df -async def _get_lag_features_for_dates(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame: + +async def _get_lag_features_for_dates( + query_api: QueryApiAsync, + start_date_inclusive: datetime, + end_date_inclusive: datetime, +) -> pd.DataFrame: """Get time features for each date between the given datetime range. Args: @@ -70,62 +92,110 @@ async def _get_lag_features_for_dates(query_api: QueryApiAsync, start_date_inclu Returns: pd.DataFrame: The dataframe containing time features for the date range. """ - predict_datetimes_df = pd.DataFrame({ - "datetime": pd.date_range( - start=start_date_inclusive, - end=end_date_inclusive, - freq="15min", - tz=ZoneInfo("Europe/Amsterdam"), - inclusive="left", - ) - }) + predict_datetimes_df = pd.DataFrame( + { + "datetime": pd.date_range( + start=start_date_inclusive, + end=end_date_inclusive, + freq="15min", + tz=ZoneInfo("Europe/Amsterdam"), + inclusive="left", + ) + } + ) # Retrieve the dalidata starting from a year ago, since we account for the lag a year ago. 
start_date_year_ago = start_date_inclusive - timedelta(days=366) # Retrieve up to 1 day before the end_date end_date_day_ago = end_date_inclusive - timedelta(days=1) - dalidata_df = await retrieve_dali_data_between(query_api=query_api, start_date_inclusive=start_date_year_ago, end_date_inclusive=end_date_day_ago) + dalidata_df = await retrieve_dali_data_between( + query_api=query_api, + start_date_inclusive=start_date_year_ago, + end_date_inclusive=end_date_day_ago, + ) dalidata_df["datetime"] = pd.to_datetime(dalidata_df["datetime"], utc=True) dalidata_df = dalidata_df.set_index("datetime") - lag_1_year_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(years=1)) - lag_1_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=1)) - lag_2_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=2)) - lag_3_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=3)) - lag_4_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=4)) - lag_5_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=5)) - lag_6_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=6)) - lag_7_day_dt = predict_datetimes_df["datetime"].apply(lambda dt: dt - pd.DateOffset(days=7)) - - predict_datetimes_df["lag_1_year"] = dalidata_df.reindex(lag_1_year_dt)["value"].values - predict_datetimes_df["lag_1_days"] = dalidata_df.reindex(lag_1_day_dt)["value"].values - predict_datetimes_df["lag_2_days"] = dalidata_df.reindex(lag_2_day_dt)["value"].values - predict_datetimes_df["lag_3_days"] = dalidata_df.reindex(lag_3_day_dt)["value"].values - predict_datetimes_df["lag_4_days"] = dalidata_df.reindex(lag_4_day_dt)["value"].values - predict_datetimes_df["lag_5_days"] = dalidata_df.reindex(lag_5_day_dt)["value"].values - predict_datetimes_df["lag_6_days"] = dalidata_df.reindex(lag_6_day_dt)["value"].values - predict_datetimes_df["lag_7_days"] = dalidata_df.reindex(lag_7_day_dt)["value"].values + lag_1_year_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(years=1) + ) + lag_1_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=1) + ) + lag_2_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=2) + ) + lag_3_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=3) + ) + lag_4_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=4) + ) + lag_5_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=5) + ) + lag_6_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=6) + ) + lag_7_day_dt = predict_datetimes_df["datetime"].apply( + lambda dt: dt - pd.DateOffset(days=7) + ) + + predict_datetimes_df["lag_1_year"] = dalidata_df.reindex(lag_1_year_dt)[ + "value" + ].values + predict_datetimes_df["lag_1_days"] = dalidata_df.reindex(lag_1_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_2_days"] = dalidata_df.reindex(lag_2_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_3_days"] = dalidata_df.reindex(lag_3_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_4_days"] = dalidata_df.reindex(lag_4_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_5_days"] = dalidata_df.reindex(lag_5_day_dt)[ + "value" + ].values + predict_datetimes_df["lag_6_days"] = dalidata_df.reindex(lag_6_day_dt)[ + "value" + ].values 
+ predict_datetimes_df["lag_7_days"] = dalidata_df.reindex(lag_7_day_dt)[ + "value" + ].values return predict_datetimes_df -def _get_mock_standard_profile_features(start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame: - predict_datetimes_df = pd.DataFrame({ - "datetime": pd.date_range( - start=start_date_inclusive, - end=end_date_inclusive, - freq="15min", - tz=ZoneInfo("Europe/Amsterdam"), - inclusive="left", - ) - }) + +def _get_mock_standard_profile_features( + start_date_inclusive: datetime, end_date_inclusive: datetime +) -> pd.DataFrame: + predict_datetimes_df = pd.DataFrame( + { + "datetime": pd.date_range( + start=start_date_inclusive, + end=end_date_inclusive, + freq="15min", + tz=ZoneInfo("Europe/Amsterdam"), + inclusive="left", + ) + } + ) predict_datetimes_df["scaled_profile"] = 0 return predict_datetimes_df -async def get_features_between_dates(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame: + +async def get_features_between_dates( + query_api: QueryApiAsync, + start_date_inclusive: datetime, + end_date_inclusive: datetime, +) -> pd.DataFrame: """Get features for the prediction model between the start date (inclusive) and end date (inclusive). Args: @@ -136,10 +206,20 @@ async def get_features_between_dates(query_api: QueryApiAsync, start_date_inclus Returns: pd.DataFrame: A dataframe containing all the features for the given time range. """ - time_features = _get_time_features_for_dates(start_date_inclusive, end_date_inclusive) - lag_features = await _get_lag_features_for_dates(query_api, start_date_inclusive, end_date_inclusive) - weather_features = _get_weather_features_for_dates(start_date_inclusive, end_date_inclusive) + time_features = _get_time_features_for_dates( + start_date_inclusive, end_date_inclusive + ) + lag_features = await _get_lag_features_for_dates( + query_api, start_date_inclusive, end_date_inclusive + ) + weather_features = _get_weather_features_for_dates( + start_date_inclusive, end_date_inclusive + ) # standard_profiles = await retrieve_standard_profiles_between_dates(query_api, start_date_inclusive, end_date_inclusive) - standard_profiles = _get_mock_standard_profile_features(start_date_inclusive, end_date_inclusive) + standard_profiles = _get_mock_standard_profile_features( + start_date_inclusive, end_date_inclusive + ) - return pd.concat([time_features, lag_features, weather_features, standard_profiles], axis=1) + return pd.concat( + [time_features, lag_features, weather_features, standard_profiles], axis=1 + ) diff --git a/src/infrastructure/azureml/predictions.py b/src/infrastructure/azureml/predictions.py index c9f8a0f..e213a0e 100644 --- a/src/infrastructure/azureml/predictions.py +++ b/src/infrastructure/azureml/predictions.py @@ -1,15 +1,20 @@ -from datetime import datetime, timedelta -import json +from datetime import timedelta from typing import Any -from src.config import DITM_MODEL_API_URL, DITM_MODEL_API_CLIENT_ID, DITM_MODEL_API_CLIENT_SECRET -from src.infrastructure._auth.http.authenticated_session import _BearerAuthenticatedSession -from src.infrastructure._auth.token_manager import OAuthTokenManager +from src.config import ( + DITM_MODEL_API_URL, +) +from src.infrastructure._auth.http.authenticated_session import ( + _BearerAuthenticatedSession, +) from src.models.predicted_load import PredictedGridAssetLoad import pandas as pd + class _DitmPredictionPayload: - def __init__(self, columns: list[str], index: list[int], data: list[Any], params: dict) -> 
None:
+    def __init__(
+        self, columns: list[str], index: list[int], data: list[Any], params: dict
+    ) -> None:
         """Create a DITM predictions payload.
 
         Args:
@@ -33,12 +38,15 @@ def as_json(self) -> dict:
                 "index": self.index,
                 "data": self.data,
             },
-            "params": self.params
+            "params": self.params,
         }
 
         return data
 
-def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAssetLoad]:
+
+def get_predictions_for_features(
+    features: pd.DataFrame,
+) -> list[PredictedGridAssetLoad]:
     """Get transformer load predictions for the given features dataframe.
 
     Args:
@@ -52,7 +60,7 @@ def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAs
     altered_features.fillna(0, inplace=True)
 
     session = _BearerAuthenticatedSession(scopes=["https://ml.azure.com/.default"])
-    headers = {'Content-Type':'application/json', 'Accept': 'application/json'}
+    headers = {"Content-Type": "application/json", "Accept": "application/json"}
     payload = _DitmPredictionPayload(
         columns=[
             "year",
@@ -65,7 +73,7 @@ def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAs
             "weekofyear",
             "is_weekend",
             "is_holiday",
-            "lag_1_days", 
+            "lag_1_days",
             "lag_2_days",
             "lag_3_days",
             "lag_4_days",
@@ -80,29 +88,33 @@ def get_predictions_for_features(features: pd.DataFrame) -> list[PredictedGridAs
             "rain",
             "humidity",
             "snow",
-            "scaled_profile"
+            "scaled_profile",
         ],
         index=list(range(len(altered_features))),
         data=altered_features.values.tolist(),
-        params={}
+        params={},
     )
 
-    response = session.post(url=DITM_MODEL_API_URL, headers=headers, json=payload.as_json())
+    response = session.post(
+        url=DITM_MODEL_API_URL, headers=headers, json=payload.as_json()
+    )
 
     predictions = [float(x) for x in response.json()]
 
     if len(predictions) != len(features):
         raise ValueError("Features dataframe and predictions list did not match")
 
-    loads : list[PredictedGridAssetLoad] = []
-    
+    loads: list[PredictedGridAssetLoad] = []
+
     for index, pred in enumerate(predictions):
         matching_df_row = features.iloc[index]
 
         datetime_of_load = matching_df_row["datetime"][0]
 
         converted = pd.to_datetime(datetime_of_load, utc=True).to_pydatetime()
-        loads.append(PredictedGridAssetLoad(time=converted, duration=timedelta(minutes=15), load=pred))
+        loads.append(
+            PredictedGridAssetLoad(
+                time=converted, duration=timedelta(minutes=15), load=pred
+            )
+        )
 
     return loads
- 
- 
\ No newline at end of file
diff --git a/src/infrastructure/influxdb/dalidata/query_dali_data.py b/src/infrastructure/influxdb/dalidata/query_dali_data.py
index 96cb08f..7fd6517 100644
--- a/src/infrastructure/influxdb/dalidata/query_dali_data.py
+++ b/src/infrastructure/influxdb/dalidata/query_dali_data.py
@@ -1,4 +1,3 @@
-
 from datetime import datetime
 
 import pandas as pd
@@ -6,9 +5,14 @@
 from src.config import INFLUXDB_ORG, DALIDATA_BUCKET_NAME
 from influxdb_client.client.query_api_async import QueryApiAsync
 
-async def retrieve_dali_data_between(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame:
+
+async def retrieve_dali_data_between(
+    query_api: QueryApiAsync,
+    start_date_inclusive: datetime,
+    end_date_inclusive: datetime,
+) -> pd.DataFrame:
     """Retrieve DALI data from InfluxDB between the given dates.
- + Args: start_date_inclusive (datetime): The start date (inclusive) end_date_inclusive (datetime): The end date (inclusive) @@ -32,4 +36,4 @@ async def retrieve_dali_data_between(query_api: QueryApiAsync, start_date_inclus df = await query_api.query_data_frame(query=query, org=INFLUXDB_ORG) - return df.rename(columns={"_time": "datetime"}) \ No newline at end of file + return df.rename(columns={"_time": "datetime"}) diff --git a/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py b/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py index 7a8bd97..37d3cfe 100644 --- a/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py +++ b/src/infrastructure/influxdb/standard_profiles/query_standard_profiles.py @@ -4,9 +4,14 @@ from src.config import INFLUXDB_ORG, STANDARD_PROFILES_BUCKET_NAME from influxdb_client.client.query_api_async import QueryApiAsync -async def retrieve_standard_profiles_between_dates(query_api: QueryApiAsync, start_date_inclusive: datetime, end_date_inclusive: datetime) -> pd.DataFrame: + +async def retrieve_standard_profiles_between_dates( + query_api: QueryApiAsync, + start_date_inclusive: datetime, + end_date_inclusive: datetime, +) -> pd.DataFrame: """Retrieve standard profiles from InfluxDB between the given dates. - + Args: start_date_inclusive (datetime): The start date (inclusive) end_date_inclusive (datetime): The end date (inclusive) @@ -26,8 +31,4 @@ async def retrieve_standard_profiles_between_dates(query_api: QueryApiAsync, sta """ response = await query_api.query_data_frame(query=query, org=INFLUXDB_ORG) - return response.rename( - columns={"_time": "datumtijd"} - ) - - + return response.rename(columns={"_time": "datumtijd"}) diff --git a/src/infrastructure/influxdb/trafo_load_audit.py b/src/infrastructure/influxdb/trafo_load_audit.py index fb958ca..81534da 100644 --- a/src/infrastructure/influxdb/trafo_load_audit.py +++ b/src/infrastructure/influxdb/trafo_load_audit.py @@ -1,7 +1,5 @@ """Module which contains functions to retrieve predicted trafo load from an external database.""" -from datetime import datetime - from influxdb_client.client.write_api_async import WriteApiAsync import pandas as pd @@ -19,9 +17,12 @@ async def store_predictions_for_audit( predicted_loads (list[PredictedGridAssetLoad]): List of predicted transformer loads to write to the database. 
""" df = pd.DataFrame( - [(tl.time, tl.load) for tl in predicted_loads], - columns=["datetime", "WAARDE"] + [(tl.time, tl.load) for tl in predicted_loads], columns=["datetime", "WAARDE"] ) - await write_api.write(bucket=PREDICTED_TRAFO_LOAD_BUCKET, record=df, data_frame_measurement_name="predictions", data_frame_timestamp_column="datetime") - \ No newline at end of file + await write_api.write( + bucket=PREDICTED_TRAFO_LOAD_BUCKET, + record=df, + data_frame_measurement_name="predictions", + data_frame_timestamp_column="datetime", + ) diff --git a/src/infrastructure/prediction_actions_impl.py b/src/infrastructure/prediction_actions_impl.py index 168d415..89f8a77 100644 --- a/src/infrastructure/prediction_actions_impl.py +++ b/src/infrastructure/prediction_actions_impl.py @@ -6,12 +6,8 @@ from influxdb_client.client.query_api_async import QueryApiAsync from src.application.generate_events import PredictionActionsBase -from src.config import INFLUXDB_BUCKET from src.infrastructure.azureml.feature_generation import get_features_between_dates from src.infrastructure.azureml.predictions import get_predictions_for_features -from src.infrastructure.influxdb.prediction_retrieval import ( - retrieve_predicted_grid_asset_load, -) from src.models.predicted_load import PredictedGridAssetLoad @@ -51,5 +47,9 @@ async def get_predicted_grid_asset_load( Returns: list[TransformerLoad]: The list of predicted transformer loads. """ - features_for_time_range = await get_features_between_dates(query_api=query_api, start_date_inclusive=from_date, end_date_inclusive=to_date) - return get_predictions_for_features(features=features_for_time_range) \ No newline at end of file + features_for_time_range = await get_features_between_dates( + query_api=query_api, + start_date_inclusive=from_date, + end_date_inclusive=to_date, + ) + return get_predictions_for_features(features=features_for_time_range) diff --git a/src/infrastructure/weather_data/weather_forecast.py b/src/infrastructure/weather_data/weather_forecast.py index b289024..292a823 100644 --- a/src/infrastructure/weather_data/weather_forecast.py +++ b/src/infrastructure/weather_data/weather_forecast.py @@ -6,7 +6,7 @@ - etl_weather_forecast_data: Function that extracts, transforms and loads weather forecast data. """ -from datetime import date, datetime, timedelta, timezone +from datetime import datetime, timezone from typing import Any import pandas as pd @@ -31,7 +31,9 @@ def __init__(self) -> None: "snowfall": "snow", } - def _call_weather_forecast_api(self, start_time_inclusive: datetime, end_time_inclusive: datetime) -> dict: + def _call_weather_forecast_api( + self, start_time_inclusive: datetime, end_time_inclusive: datetime + ) -> dict: """Calls the Open-Meteo API to retrieve hourly weather forecast data. 
 
         Args:
@@ -64,9 +66,9 @@ def _interpolate_hourly_data_to_quarterly(self, weather_var: str) -> None:
         """Interpolate the given weather variable to 15-minute intervals."""
         match weather_var:
             case "temperature":
-                self.weather_data[weather_var] = self.weather_data[weather_var].interpolate(
-                    method="linear", limit=3, limit_direction="forward"
-                )
+                self.weather_data[weather_var] = self.weather_data[
+                    weather_var
+                ].interpolate(method="linear", limit=3, limit_direction="forward")
 
             case "irradiation" | "irradiation_duration" | "rain" | "humidity":
                 self.weather_data[weather_var] = (
@@ -78,7 +80,9 @@ def _interpolate_hourly_data_to_quarterly(self, weather_var: str) -> None:
             case "cloud_coverage" | "snow":
                 self.weather_data[weather_var] = self.weather_data[weather_var].ffill()
 
-    def etl_weather_forecast_data(self, start_time_inclusive: datetime, end_time_inclusive: datetime) -> pd.DataFrame:
+    def etl_weather_forecast_data(
+        self, start_time_inclusive: datetime, end_time_inclusive: datetime
+    ) -> pd.DataFrame:
         """Extracts, transforms, and loads weather forecast data.
 
         - Extracts and transforms the forecast data.
@@ -92,22 +96,33 @@ def etl_weather_forecast_data(self, start_time_inclusive: datetime, end_time_inc
         Returns:
             Processed DataFrame with interpolated weather data at 15-minute intervals.
         """
-        forecast_dict = self._call_weather_forecast_api(start_time_inclusive, end_time_inclusive)
+        forecast_dict = self._call_weather_forecast_api(
+            start_time_inclusive, end_time_inclusive
+        )
 
         weather_data = pd.DataFrame({"datetime": forecast_dict["hourly"]["time"]})
         weather_data = concat(
             objs=[
                 weather_data,
                 pd.DataFrame(
-                    {value: forecast_dict["hourly"][key] for key, value in self.om_weather_forecast_vars.items()}
+                    {
+                        value: forecast_dict["hourly"][key]
+                        for key, value in self.om_weather_forecast_vars.items()
+                    }
                 ),
             ],
             axis=1,
         )
 
-        weather_data["datetime"] = pd.to_datetime(arg=weather_data["datetime"], utc=True)
+        weather_data["datetime"] = pd.to_datetime(
+            arg=weather_data["datetime"], utc=True
+        )
         weather_data["snow"] = weather_data["snow"].apply(func=lambda x: (x > 0) * 1)
-        weather_data["cloud_coverage"] = weather_data["cloud_coverage"].apply(func=lambda x: int(x / 100 * 9))
-        weather_data["irradiation_duration"] = weather_data["irradiation_duration"].apply(
+        weather_data["cloud_coverage"] = weather_data["cloud_coverage"].apply(
+            func=lambda x: int(x / 100 * 9)
+        )
+        weather_data["irradiation_duration"] = weather_data[
+            "irradiation_duration"
+        ].apply(
             func=lambda x: x / 3600 if x != -1 else x
         )  # Irradiation duration is in seconds; converted here to hours
         weather_data["irradiation"] = weather_data["irradiation"].apply(
@@ -131,7 +146,9 @@ def etl_weather_forecast_data(self, start_time_inclusive: datetime, end_time_inc
             .rename(columns={"index": "datetime"})
         ).fillna(0)
 
-        self.weather_data["datetime"] = self.weather_data["datetime"].dt.tz_convert("Europe/Amsterdam")
+        self.weather_data["datetime"] = self.weather_data["datetime"].dt.tz_convert(
+            "Europe/Amsterdam"
+        )
 
         # Interpolate values of features to 15 min interval
         for weather_var in self.om_weather_forecast_vars.values():
diff --git a/src/main.py b/src/main.py
index f797a33..6115709 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,5 +1,3 @@
-import asyncio
-import json
 import azure.functions as func
 
 from datetime import UTC, datetime, timedelta
@@ -26,9 +24,9 @@ def _initialize_bl_client() -> BusinessLogicClient:
     """
     bl_client = BusinessLogicHttpClientFactory.create_http_bl_client(
         vtn_base_url=VTN_BASE_URL,
-        client_id='test',
-        client_secret='test',
-        token_url='test',
+        client_id="test",
+        client_secret="test",
+        token_url="test",
     )
 
     return bl_client
@@ -71,7 +69,7 @@ async def main() -> None:
     try:
         logger.info("Triggering BL function at %s", datetime.now(tz=UTC))
         event = await _generate_events()
-        
+
         if not event:
             logger.warning(
                 "No capacity limitation event could be constructed, skipping..."
@@ -95,9 +93,11 @@ async def main() -> None:
 
     logger.info("Python timer trigger function executed.")
 
+
 # if __name__ == "__main__":
 #     asyncio.run(main())
 
+
 @bp.schedule(
     schedule="0 55 5 * * *",
     arg_name="myTimer",
diff --git a/src/models/predicted_load.py b/src/models/predicted_load.py
index 657cf21..e11f2d1 100644
--- a/src/models/predicted_load.py
+++ b/src/models/predicted_load.py
@@ -53,7 +53,9 @@ def flex_capacity_required(self, max_capacity: float) -> float | None:
             float: The amount of kW of flex which is needed for this grid asset load period.
         """
         min_guaranteed_capacity = 4
-        max_capacity_of_pod = 22 # TODO: configure based on input from actual battery when available.
+        max_capacity_of_pod = (
+            22  # TODO: configure based on input from actual battery when available.
+        )
         max_excess = 50
 
         excess_load = self.load - max_capacity
@@ -61,7 +63,9 @@ def flex_capacity_required(self, max_capacity: float) -> float | None:
             return 0  # not exceeding capacity
 
         # Linear scaling from min_guaranteed_capacity to max_capacity_of_pod
-        scaled = min_guaranteed_capacity + (max_capacity_of_pod - min_guaranteed_capacity) * (excess_load / max_excess)
+        scaled = min_guaranteed_capacity + (
+            max_capacity_of_pod - min_guaranteed_capacity
+        ) * (excess_load / max_excess)
 
         # Cap at max_capacity_of_pod
         return min(max_capacity_of_pod, scaled)

From 7cf9a0532e7b4758711b49b892d98b13f35119fc Mon Sep 17 00:00:00 2001
From: Nick van der Burgt
Date: Wed, 22 Oct 2025 11:42:56 +0200
Subject: [PATCH 08/10] install type stubs

---
 poetry.lock    | 47 +++++++++++++++++++++++++++++++++++++++++++++--
 pyproject.toml |  5 ++++-
 2 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/poetry.lock b/poetry.lock
index f1231a5..e25ca86 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1868,6 +1868,18 @@ files = [
 [package.dependencies]
 typing_extensions = ">=4.14.0"
 
+[[package]]
+name = "types-oauthlib"
+version = "3.3.0.20250822"
+description = "Typing stubs for oauthlib"
+optional = false
+python-versions = ">=3.9"
+groups = ["dev"]
+files = [
+    {file = "types_oauthlib-3.3.0.20250822-py3-none-any.whl", hash = "sha256:b7f4c9b9eed0e020f454e0af800b10e93dd2efd196da65744b76910cce7e70d6"},
+    {file = "types_oauthlib-3.3.0.20250822.tar.gz", hash = "sha256:2cd41587dd80c199e4230e3f086777e9ae525e89579c64afe5e0039ab09be9de"},
+]
+
 [[package]]
 name = "types-pytz"
 version = "2025.2.0.20250516"
@@ -1880,6 +1892,37 @@ files = [
     {file = "types_pytz-2025.2.0.20250516.tar.gz", hash = "sha256:e1216306f8c0d5da6dafd6492e72eb080c9a166171fa80dd7a1990fd8be7a7b3"},
 ]
 
+[[package]]
+name = "types-requests"
+version = "2.32.4.20250913"
+description = "Typing stubs for requests"
+optional = false
+python-versions = ">=3.9"
+groups = ["main", "dev"]
+files = [
+    {file = "types_requests-2.32.4.20250913-py3-none-any.whl", hash = "sha256:78c9c1fffebbe0fa487a418e0fa5252017e9c60d1a2da394077f1780f655d7e1"},
+    {file = "types_requests-2.32.4.20250913.tar.gz", hash = "sha256:abd6d4f9ce3a9383f269775a9835a4c24e5cd6b9f647d64f88aa4613c33def5d"},
+]
+
+[package.dependencies]
+urllib3 = ">=2"
+
+[[package]]
+name = "types-requests-oauthlib"
+version = "2.0.0.20250809"
+description = "Typing stubs for requests-oauthlib"
+optional = false
+python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "types_requests_oauthlib-2.0.0.20250809-py3-none-any.whl", hash = "sha256:0d1af4907faf9f4a1b0f0afbc7ec488f1dd5561a2b5b6dad70f78091a1acfb76"}, + {file = "types_requests_oauthlib-2.0.0.20250809.tar.gz", hash = "sha256:f3b9b31e0394fe2c362f0d44bc9ef6d5c150a298d01089513cd54a51daec37a2"}, +] + +[package.dependencies] +types-oauthlib = "*" +types-requests = "*" + [[package]] name = "typing-extensions" version = "4.14.1" @@ -1941,7 +1984,7 @@ version = "2.5.0" description = "HTTP library with thread-safe connection pooling, file post, and more." optional = false python-versions = ">=3.9" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"}, {file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"}, @@ -2093,4 +2136,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = ">=3.12, <4" -content-hash = "65b711b44ff4a8bb28e9c4805797f2b1a8e0c664507c335aa04b4f1cb0f98670" +content-hash = "c95bb38905c80edda62bef86b0ca293abed2d4ae288f09c83974730f4eff612a" diff --git a/pyproject.toml b/pyproject.toml index b91b227..902539a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,8 @@ dependencies = [ "openadr3-client (==0.0.11)", "requests (>=2.32.5,<3.0.0)", "requests-oauthlib (>=2.0.0,<3.0.0)", - "holidays (>=0.83,<0.84)" + "holidays (>=0.83,<0.84)", + "types-requests (>=2.32.4.20250913,<3.0.0.0)", ] [build-system] @@ -31,6 +32,8 @@ package-mode = false [tool.poetry.group.dev.dependencies] pytest-cov = "^6.2.1" +types-oauthlib = "^3.3.0.20250822" +types-requests-oauthlib = "^2.0.0.20250809" [[tool.mypy.overrides]] module = ["decouple", "openadr3_client_gac_compliance"] From f568a92572dc2bf4a78738d932f742e92c8d8a6a Mon Sep 17 00:00:00 2001 From: Nick van der Burgt Date: Thu, 23 Oct 2025 15:39:55 +0200 Subject: [PATCH 09/10] update docker image name for ditm bl, run linter --- .github/workflows/cd.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 82bf21d..5c98b84 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -32,10 +32,10 @@ jobs: - name: Create version tag shell: bash - run: echo "tag=docker.elaad.io/elaadnl/bl-reference-implementation:$(git show -s --format="%ct-%h" $GITHUB_SHA)" >> $GITHUB_ENV + run: echo "tag=docker.elaad.io/elaadnl/ditm-openadr-bl:$(git show -s --format="%ct-%h" $GITHUB_SHA)" >> $GITHUB_ENV - name: Latest tag on main branch if: github.ref == 'refs/heads/main' - run: echo "tag_main=,docker.elaad.io/elaadnl/bl-reference-implementation:latest" >> $GITHUB_ENV + run: echo "tag_main=,docker.elaad.io/elaadnl/ditm-openadr-bl:latest" >> $GITHUB_ENV - name: Set up Docker Buildx uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1 From 74579c6a1173ab13a9a4269c5e93c326ee358ece Mon Sep 17 00:00:00 2001 From: Nick van der Burgt Date: Fri, 24 Oct 2025 09:37:25 +0200 Subject: [PATCH 10/10] update BL logic --- src/config.py | 5 +++++ src/infrastructure/azureml/predictions.py | 2 +- src/main.py | 17 +++++++++++++---- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/config.py b/src/config.py index f6cb6b3..a803e78 100644 --- a/src/config.py +++ b/src/config.py @@ -38,3 +38,8 @@ DITM_MODEL_API_CLIENT_ID = config("DITM_MODEL_API_CLIENT_ID") DITM_MODEL_API_CLIENT_SECRET = 
config("DITM_MODEL_API_CLIENT_SECRET") DITM_MODEL_API_TOKEN_URL = config("DITM_MODEL_API_TOKEN_URL") + +OAUTH_CLIENT_ID = config("OAUTH_CLIENT_ID") +OAUTH_CLIENT_SECRET = config("OAUTH_CLIENT_SECRET") +OAUTH_TOKEN_ENDPOINT = config("OAUTH_TOKEN_ENDPOINT") +OAUTH_SCOPES = config("OAUTH_SCOPES") diff --git a/src/infrastructure/azureml/predictions.py b/src/infrastructure/azureml/predictions.py index e213a0e..df57d8a 100644 --- a/src/infrastructure/azureml/predictions.py +++ b/src/infrastructure/azureml/predictions.py @@ -108,7 +108,7 @@ def get_predictions_for_features( for index, pred in enumerate(predictions): matching_df_row = features.iloc[index] - datetime_of_load = matching_df_row["datetime"][0] + datetime_of_load = matching_df_row["datetime"].iloc[0] converted = pd.to_datetime(datetime_of_load, utc=True).to_pydatetime() loads.append( diff --git a/src/main.py b/src/main.py index 6115709..65ec62e 100644 --- a/src/main.py +++ b/src/main.py @@ -11,7 +11,15 @@ from src.infrastructure.influxdb._client import create_db_client from src.infrastructure.prediction_actions_impl import PredictionActionsInfluxDB from src.logger import logger -from src.config import PROGRAM_ID, VEN_NAME, VTN_BASE_URL +from src.config import ( + PROGRAM_ID, + VEN_NAME, + VTN_BASE_URL, + OAUTH_CLIENT_ID, + OAUTH_CLIENT_SECRET, + OAUTH_TOKEN_ENDPOINT, + OAUTH_SCOPES, +) bp = func.Blueprint() @@ -24,9 +32,10 @@ def _initialize_bl_client() -> BusinessLogicClient: """ bl_client = BusinessLogicHttpClientFactory.create_http_bl_client( vtn_base_url=VTN_BASE_URL, - client_id="test", - client_secret="test", - token_url="test", + client_id=OAUTH_CLIENT_ID, + client_secret=OAUTH_CLIENT_SECRET, + token_url=OAUTH_TOKEN_ENDPOINT, + scopes=OAUTH_SCOPES.split(","), ) return bl_client
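
A quick worked example of the flex scaling added to src/models/predicted_load.py earlier in this series: the sketch below replays the formula as a standalone function so the numbers are easy to check. The constants (4, 22, 50) are the ones hard-coded in the diff; the free function, its comments, and the asserts are illustrative only and not part of the codebase. Per the docstring in the diff, values are in kW.

    def flex_capacity_required(load: float, max_capacity: float) -> float:
        # Standalone replay of PredictedGridAssetLoad.flex_capacity_required
        # (constants copied from the diff; interpretations are assumptions).
        min_guaranteed_capacity = 4  # smallest flex request the scaling produces
        max_capacity_of_pod = 22  # pod ceiling; see the TODO in the diff
        max_excess = 50  # excess load that maps onto the full pod range

        excess_load = load - max_capacity
        if excess_load <= 0:
            return 0  # not exceeding capacity, no flex needed

        # Linear scaling from min_guaranteed_capacity to max_capacity_of_pod,
        # capped at the pod maximum once excess_load passes max_excess.
        scaled = min_guaranteed_capacity + (
            max_capacity_of_pod - min_guaranteed_capacity
        ) * (excess_load / max_excess)
        return min(max_capacity_of_pod, scaled)

    # An excess of 25 kW requests 4 + 18 * (25 / 50) = 13 kW of flex;
    # an excess of 60 kW would scale to 25.6 kW and is capped at 22 kW.
    assert flex_capacity_required(load=125.0, max_capacity=100.0) == 13.0
    assert flex_capacity_required(load=160.0, max_capacity=100.0) == 22.0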