From 8edbf8af9eb70cf8f965fd4acb6977cbea040307 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 21 Nov 2025 15:41:08 +0100 Subject: [PATCH 1/6] feat: start test cases for deserializing time series data to BeliefsDataFrames Signed-off-by: F.N. Claessen --- .../schemas/tests/test_sensor_data_schema.py | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py index b1590937a8..6294e749ae 100644 --- a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py +++ b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py @@ -5,6 +5,7 @@ from marshmallow import ValidationError import pandas as pd +from timely_beliefs import BeliefsDataFrame from unittest import mock from flexmeasures.api.common.schemas.sensor_data import ( @@ -481,3 +482,70 @@ def test_build_asset_jobs_data(db, app, add_battery_assets): app.queues["forecasting"].empty() assert app.queues["scheduling"].count == 0 assert app.queues["forecasting"].count == 0 + + +@pytest.mark.parametrize( + "deserialization_input, exp_length, exp_deserialization_output", + [ + # A single 2-hour event (spanning 3 wall-clock hours) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor + ( + { + "sensor": "epex_da", # name is used to look up the corresponding sensor ID + "start": "2025-11-21T14:40+01", + "duration": "PT2H", + "values": [20.5], + "unit": "EUR/kWh", + }, + 2, + {}, + ), + # Six 2-minute events (spanning 2 wall-clock hours) are deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor + ( + { + "sensor": "epex_da", # name is used to look up the corresponding sensor ID + "start": "2025-11-21T10:50:40+01", + "duration": "PT12M", + "values": [1, 2, 3, 4, 5, 6], + "unit": "EUR/kWh", + }, + 2, + {}, + ), + ], +) +@pytest.mark.parametrize( + "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True +) +def test_time_series_deserialization( + setup_markets, + setup_roles_users, + deserialization_input, + exp_length, + exp_deserialization_output, + requesting_user, +): + """Test loading of posted sensor data.""" + schema = PostSensorDataSchema() + + # Look up sensor by name and replace it with the sensor ID + sensor = setup_markets[deserialization_input["sensor"]] + deserialization_input["sensor"] = sensor.id + + deser = schema.load(deserialization_input) + bdf = deser["bdf"] + + assert isinstance(bdf, BeliefsDataFrame) + assert bdf.event_resolution == sensor.event_resolution + assert len(bdf) == exp_length + + resolution = pd.Timedelta(deserialization_input["duration"]) / len( + deserialization_input["values"] + ) + if resolution < sensor.event_resolution: + assert bdf.event_starts[0] == pd.Timestamp( + deserialization_input["start"] + ).floor(sensor.event_resolution) + else: + assert bdf.event_starts[0] == pd.Timestamp(deserialization_input["start"]) + assert len(bdf.lineage.belief_times) == 1, "expected unique belief time" + # assert deser == exp_deserialization_output From 3b2770f94d87164d1bfd92f142ead694971b3493 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 21 Nov 2025 15:41:49 +0100 Subject: [PATCH 2/6] fix: update type annotation Signed-off-by: F.N. Claessen --- flexmeasures/api/common/schemas/sensor_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/api/common/schemas/sensor_data.py b/flexmeasures/api/common/schemas/sensor_data.py index 7f6b0f337a..7d70299daa 100644 --- a/flexmeasures/api/common/schemas/sensor_data.py +++ b/flexmeasures/api/common/schemas/sensor_data.py @@ -303,7 +303,7 @@ def check_multiple_instantenous_values(self, data, **kwargs): ) @post_load() - def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame: + def post_load_sequence(self, data: dict, **kwargs) -> dict[str, BeliefsDataFrame]: """ If needed, upsample and convert units, then deserialize to a BeliefsDataFrame. Returns a dict with the BDF in it, as that is expected by webargs when used with as_kwargs=True. From 34a2dc50fdeca42e2088c6d035373efd362f0409 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 21 Nov 2025 15:42:25 +0100 Subject: [PATCH 3/6] feat: handle resampling after converting to a BeliefsDataFrame Signed-off-by: F.N. Claessen --- .../api/common/schemas/sensor_data.py | 29 ++++--------------- 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/flexmeasures/api/common/schemas/sensor_data.py b/flexmeasures/api/common/schemas/sensor_data.py index 7d70299daa..66b8eaf94a 100644 --- a/flexmeasures/api/common/schemas/sensor_data.py +++ b/flexmeasures/api/common/schemas/sensor_data.py @@ -267,28 +267,6 @@ def check_schema_unit_against_type(self, data, **kwargs): f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}" ) - @validates_schema - def check_resolution_compatibility_of_sensor_data(self, data, **kwargs): - """Ensure event frequency is compatible with the sensor's event resolution. - - For a sensor recording instantaneous values, any event frequency is compatible. - For a sensor recording non-instantaneous values, the event frequency must fit the sensor's event resolution. - Currently, only upsampling is supported (e.g. converting hourly events to 15-minute events). - """ - required_resolution = data["sensor"].event_resolution - - if required_resolution == timedelta(hours=0): - # For instantaneous sensors, any event frequency is compatible - return - - # The event frequency is inferred by assuming sequential, equidistant values within a time interval. - # The event resolution is assumed to be equal to the event frequency. - inferred_resolution = data["duration"] / len(data["values"]) - if inferred_resolution % required_resolution != timedelta(hours=0): - raise ValidationError( - f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}." - ) - @validates_schema def check_multiple_instantenous_values(self, data, **kwargs): """Ensure that we are not getting multiple instantaneous values that overlap. @@ -308,7 +286,6 @@ def post_load_sequence(self, data: dict, **kwargs) -> dict[str, BeliefsDataFrame If needed, upsample and convert units, then deserialize to a BeliefsDataFrame. Returns a dict with the BDF in it, as that is expected by webargs when used with as_kwargs=True. """ - data = self.possibly_upsample_values(data) data = self.possibly_convert_units(data) bdf = self.load_bdf(data) @@ -371,6 +348,7 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame: event_resolution = sensor_data["duration"] / num_values start = sensor_data["start"] sensor = sensor_data["sensor"] + inferred_resolution = sensor_data["duration"] / len(sensor_data["values"]) if frequency := sensor.get_attribute("frequency"): start = pd.Timestamp(start).round(frequency) @@ -396,12 +374,15 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame: belief_timing["belief_horizon"] = sensor_data["horizon"] else: belief_timing["belief_time"] = server_now() - return BeliefsDataFrame( + bdf = BeliefsDataFrame( s, source=source, sensor=sensor_data["sensor"], + event_resolution=inferred_resolution, **belief_timing, ) + resampled_bdf = bdf.resample_events(sensor.event_resolution) + return resampled_bdf class GetSensorDataSchemaEntityAddress(GetSensorDataSchema): From 8d2d2d3bad0afa96e88653551ab9409937d40164 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 21 Nov 2025 15:48:57 +0100 Subject: [PATCH 4/6] fix: catch NotImplementedError to fix failing test Signed-off-by: F.N. Claessen --- flexmeasures/api/common/schemas/sensor_data.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flexmeasures/api/common/schemas/sensor_data.py b/flexmeasures/api/common/schemas/sensor_data.py index 66b8eaf94a..754f219d1d 100644 --- a/flexmeasures/api/common/schemas/sensor_data.py +++ b/flexmeasures/api/common/schemas/sensor_data.py @@ -381,7 +381,12 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame: event_resolution=inferred_resolution, **belief_timing, ) - resampled_bdf = bdf.resample_events(sensor.event_resolution) + try: + resampled_bdf = bdf.resample_events(sensor.event_resolution) + except NotImplementedError: + raise ValidationError( + f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {sensor.event_resolution}." + ) return resampled_bdf From 86ad7745107624e5a6e5e63ccd63e301b438d22c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 21 Nov 2025 16:21:41 +0100 Subject: [PATCH 5/6] feat: two more test cases Signed-off-by: F.N. Claessen --- .../schemas/tests/test_sensor_data_schema.py | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py index 6294e749ae..f31e9c9072 100644 --- a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py +++ b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py @@ -487,7 +487,7 @@ def test_build_asset_jobs_data(db, app, add_battery_assets): @pytest.mark.parametrize( "deserialization_input, exp_length, exp_deserialization_output", [ - # A single 2-hour event (spanning 3 wall-clock hours) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor + # # A single 2-hour event (spanning 3 wall-clock hours) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor ( { "sensor": "epex_da", # name is used to look up the corresponding sensor ID @@ -499,6 +499,30 @@ def test_build_asset_jobs_data(db, app, add_battery_assets): 2, {}, ), + # One 5-minute event (spanning 2 wall-clock hours) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor + ( + { + "sensor": "epex_da", # name is used to look up the corresponding sensor ID + "start": "2025-11-21T10:58:40+01", + "duration": "PT5M", + "values": [1], + "unit": "EUR/kWh", + }, + 2, + {}, + ), + # One 2-minute event (spanning 1 wall-clock hour) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor + ( + { + "sensor": "epex_da", # name is used to look up the corresponding sensor ID + "start": "2025-11-21T10:58:00+01", + "duration": "PT2M", + "values": [1], + "unit": "EUR/kWh", + }, + 1, + {}, + ), # Six 2-minute events (spanning 2 wall-clock hours) are deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor ( { From 30fb2ad412da6edc4ea7687cfac837a3666635f5 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 21 Nov 2025 17:01:35 +0100 Subject: [PATCH 6/6] fix: update test expectations Signed-off-by: F.N. Claessen --- flexmeasures/api/v3_0/tests/test_sensor_data.py | 6 ------ flexmeasures/api/v3_0/tests/test_sensor_data_fresh_db.py | 6 +++--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/flexmeasures/api/v3_0/tests/test_sensor_data.py b/flexmeasures/api/v3_0/tests/test_sensor_data.py index c9c1822c7c..49bc4ef574 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_data.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_data.py @@ -145,12 +145,6 @@ def test_post_sensor_data_bad_auth( "request_field, new_value, error_field, error_text", [ ("start", "2021-06-07T00:00:00", "start", "Not a valid aware datetime"), - ( - "duration", - "PT30M", - "_schema", - "Resolution of 0:05:00 is incompatible", - ), # downsampling not supported ("unit", "m", "_schema", "Required unit"), ("type", "GetSensorDataRequest", "type", "Must be one of"), ], diff --git a/flexmeasures/api/v3_0/tests/test_sensor_data_fresh_db.py b/flexmeasures/api/v3_0/tests/test_sensor_data_fresh_db.py index 3912bc0cca..aafa84b94d 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_data_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_data_fresh_db.py @@ -31,12 +31,12 @@ ), # upsample from single value for 1-hour interval, sent as float rather than list of floats ( 4, - 0, + 6, "m³/h", False, - None, + -11.28, False, - 422, + 200, ), # failed to resample from 15-min intervals to 10-min intervals ( 10,