Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 11 additions & 25 deletions flexmeasures/api/common/schemas/sensor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,28 +267,6 @@ def check_schema_unit_against_type(self, data, **kwargs):
f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}"
)

@validates_schema
def check_resolution_compatibility_of_sensor_data(self, data, **kwargs):
"""Ensure event frequency is compatible with the sensor's event resolution.

For a sensor recording instantaneous values, any event frequency is compatible.
For a sensor recording non-instantaneous values, the event frequency must fit the sensor's event resolution.
Currently, only upsampling is supported (e.g. converting hourly events to 15-minute events).
"""
required_resolution = data["sensor"].event_resolution

if required_resolution == timedelta(hours=0):
# For instantaneous sensors, any event frequency is compatible
return

# The event frequency is inferred by assuming sequential, equidistant values within a time interval.
# The event resolution is assumed to be equal to the event frequency.
inferred_resolution = data["duration"] / len(data["values"])
if inferred_resolution % required_resolution != timedelta(hours=0):
raise ValidationError(
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
)

@validates_schema
def check_multiple_instantenous_values(self, data, **kwargs):
"""Ensure that we are not getting multiple instantaneous values that overlap.
Expand All @@ -303,12 +281,11 @@ def check_multiple_instantenous_values(self, data, **kwargs):
)

@post_load()
def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame:
def post_load_sequence(self, data: dict, **kwargs) -> dict[str, BeliefsDataFrame]:
"""
If needed, upsample and convert units, then deserialize to a BeliefsDataFrame.
Returns a dict with the BDF in it, as that is expected by webargs when used with as_kwargs=True.
"""
data = self.possibly_upsample_values(data)
data = self.possibly_convert_units(data)
bdf = self.load_bdf(data)

Expand Down Expand Up @@ -371,6 +348,7 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame:
event_resolution = sensor_data["duration"] / num_values
start = sensor_data["start"]
sensor = sensor_data["sensor"]
inferred_resolution = sensor_data["duration"] / len(sensor_data["values"])

if frequency := sensor.get_attribute("frequency"):
start = pd.Timestamp(start).round(frequency)
Expand All @@ -396,12 +374,20 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame:
belief_timing["belief_horizon"] = sensor_data["horizon"]
else:
belief_timing["belief_time"] = server_now()
return BeliefsDataFrame(
bdf = BeliefsDataFrame(
s,
source=source,
sensor=sensor_data["sensor"],
event_resolution=inferred_resolution,
**belief_timing,
)
try:
resampled_bdf = bdf.resample_events(sensor.event_resolution)
except NotImplementedError:
raise ValidationError(
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {sensor.event_resolution}."
)
return resampled_bdf


class GetSensorDataSchemaEntityAddress(GetSensorDataSchema):
Expand Down
92 changes: 92 additions & 0 deletions flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from marshmallow import ValidationError
import pandas as pd
from timely_beliefs import BeliefsDataFrame
from unittest import mock

from flexmeasures.api.common.schemas.sensor_data import (
Expand Down Expand Up @@ -481,3 +482,94 @@ def test_build_asset_jobs_data(db, app, add_battery_assets):
app.queues["forecasting"].empty()
assert app.queues["scheduling"].count == 0
assert app.queues["forecasting"].count == 0


@pytest.mark.parametrize(
"deserialization_input, exp_length, exp_deserialization_output",
[
# # A single 2-hour event (spanning 3 wall-clock hours) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor
(
{
"sensor": "epex_da", # name is used to look up the corresponding sensor ID
"start": "2025-11-21T14:40+01",
"duration": "PT2H",
"values": [20.5],
"unit": "EUR/kWh",
},
2,
{},
),
# One 5-minute event (spanning 2 wall-clock hours) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor
(
{
"sensor": "epex_da", # name is used to look up the corresponding sensor ID
"start": "2025-11-21T10:58:40+01",
"duration": "PT5M",
"values": [1],
"unit": "EUR/kWh",
},
2,
{},
),
# One 2-minute event (spanning 1 wall-clock hour) is deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor
(
{
"sensor": "epex_da", # name is used to look up the corresponding sensor ID
"start": "2025-11-21T10:58:00+01",
"duration": "PT2M",
"values": [1],
"unit": "EUR/kWh",
},
1,
{},
),
# Six 2-minute events (spanning 2 wall-clock hours) are deserialized to a BeliefsDataFrame, to be recorded on a 1-hour sensor
(
{
"sensor": "epex_da", # name is used to look up the corresponding sensor ID
"start": "2025-11-21T10:50:40+01",
"duration": "PT12M",
"values": [1, 2, 3, 4, 5, 6],
"unit": "EUR/kWh",
},
2,
{},
),
],
)
@pytest.mark.parametrize(
"requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
def test_time_series_deserialization(
setup_markets,
setup_roles_users,
deserialization_input,
exp_length,
exp_deserialization_output,
requesting_user,
):
"""Test loading of posted sensor data."""
schema = PostSensorDataSchema()

# Look up sensor by name and replace it with the sensor ID
sensor = setup_markets[deserialization_input["sensor"]]
deserialization_input["sensor"] = sensor.id

deser = schema.load(deserialization_input)
bdf = deser["bdf"]

assert isinstance(bdf, BeliefsDataFrame)
assert bdf.event_resolution == sensor.event_resolution
assert len(bdf) == exp_length

resolution = pd.Timedelta(deserialization_input["duration"]) / len(
deserialization_input["values"]
)
if resolution < sensor.event_resolution:
assert bdf.event_starts[0] == pd.Timestamp(
deserialization_input["start"]
).floor(sensor.event_resolution)
else:
assert bdf.event_starts[0] == pd.Timestamp(deserialization_input["start"])
assert len(bdf.lineage.belief_times) == 1, "expected unique belief time"
# assert deser == exp_deserialization_output
6 changes: 0 additions & 6 deletions flexmeasures/api/v3_0/tests/test_sensor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,6 @@ def test_post_sensor_data_bad_auth(
"request_field, new_value, error_field, error_text",
[
("start", "2021-06-07T00:00:00", "start", "Not a valid aware datetime"),
(
"duration",
"PT30M",
"_schema",
"Resolution of 0:05:00 is incompatible",
), # downsampling not supported
("unit", "m", "_schema", "Required unit"),
("type", "GetSensorDataRequest", "type", "Must be one of"),
],
Expand Down
6 changes: 3 additions & 3 deletions flexmeasures/api/v3_0/tests/test_sensor_data_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
), # upsample from single value for 1-hour interval, sent as float rather than list of floats
(
4,
0,
6,
"m³/h",
False,
None,
-11.28,
False,
422,
200,
), # failed to resample from 15-min intervals to 10-min intervals
(
10,
Expand Down
Loading