From 485dc079a6cd14e10e7b555246e867c57086ad00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:44:05 +0100 Subject: [PATCH 01/16] Retrieve data stream via oneshot channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- pyproject.toml | 2 +- .../_component_metric_request.py | 7 ++ .../_data_sourcing/microgrid_api_source.py | 38 ++++--- tests/microgrid/test_data_sourcing.py | 101 +++++------------- 4 files changed, 60 insertions(+), 88 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4b7835460..39db0b5ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "frequenz-client-microgrid >= 0.18.1, < 0.19.0", "frequenz-microgrid-component-graph >= 0.3.4, < 0.4", "frequenz-client-common >= 0.3.6, < 0.4.0", - "frequenz-channels >= 1.6.1, < 2.0.0", + "frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@oneshot", "frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0", "numpy >= 2.1.0, < 3", "typing_extensions >= 4.14.1, < 5", diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py index 2fb8f5aa3..cb2b18313 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py @@ -6,13 +6,17 @@ from dataclasses import dataclass from datetime import datetime +from frequenz.channels import Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric +from frequenz.quantities import Quantity from frequenz.sdk.microgrid._old_component_data import TransitionalMetric __all__ = ["ComponentMetricRequest", "Metric"] +from frequenz.sdk.timeseries import Sample + @dataclass class ComponentMetricRequest: @@ -51,6 +55,9 @@ class ComponentMetricRequest: If None, only live data is streamed. """ + telem_stream_sender: Sender[Receiver[Sample[Quantity]]] + """Sender of a oneshot channel used to send the data stream back to the requester.""" + def get_channel_name(self) -> str: """Construct the channel name based on the request parameters. diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index d43da8e58..03faad41e 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -8,7 +8,7 @@ from collections.abc import Callable from typing import Any -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ComponentCategory from frequenz.client.microgrid.metrics import Metric @@ -182,6 +182,9 @@ def __init__( ComponentId, dict[Metric | TransitionalMetric, list[ComponentMetricRequest]] ] = {} + self._channel_lookup: dict[str, Broadcast[Sample[Quantity]]] = {} + """ Channel cache for reuse (map channel name to channel).""" + async def _get_component_category( self, comp_id: ComponentId ) -> ComponentCategory | int | None: @@ -397,7 +400,7 @@ def _get_data_extraction_method( _logger.error(err) raise ValueError(err) - def _get_metric_senders( + async def _get_metric_senders( self, category: ComponentCategory | int, requests: dict[Metric | TransitionalMetric, list[ComponentMetricRequest]], @@ -413,18 +416,23 @@ def _get_metric_senders( A dictionary of output metric names to channel senders from the channel registry. """ - return [ - ( - self._get_data_extraction_method(category, metric), - [ - self._registry.get_or_create( - Sample[Quantity], request.get_channel_name() - ).new_sender() - for request in req_list - ], - ) - for (metric, req_list) in requests.items() - ] + all_senders = [] + for metric, req_list in requests.items(): + extraction_method = self._get_data_extraction_method(category, metric) + senders = [] + for request in req_list: + channel_name = request.get_channel_name() + # Create missing channels and inform the requesting side via oneshot + if channel_name not in self._channel_lookup: + telem_stream: Broadcast[Sample[Quantity]] = Broadcast( + name=channel_name + ) + self._channel_lookup[channel_name] = telem_stream + await request.telem_stream_sender.send(telem_stream.new_receiver()) + senders.append(self._channel_lookup[channel_name].new_sender()) + all_senders.append((extraction_method, senders)) + + return all_senders async def _handle_data_stream( self, @@ -446,7 +454,7 @@ async def _handle_data_stream( await self._check_requested_component_and_metrics( comp_id, category, self._req_streaming_metrics[comp_id] ) - stream_senders = self._get_metric_senders( + stream_senders = await self._get_metric_senders( category, self._req_streaming_metrics[comp_id] ) api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id] diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py index bcc4e5857..6ec3f8e0c 100644 --- a/tests/microgrid/test_data_sourcing.py +++ b/tests/microgrid/test_data_sourcing.py @@ -11,7 +11,7 @@ import pytest import pytest_mock -from frequenz.channels import Broadcast +from frequenz.channels import Broadcast, Receiver, make_oneshot from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ( @@ -24,7 +24,6 @@ from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ( ComponentMetricRequest, DataSourcingActor, @@ -84,88 +83,46 @@ def mock_connection_manager(mocker: pytest_mock.MockFixture) -> mock.Mock: return mock_conn_manager +@pytest.mark.parametrize( + ("component_id", "metric", "expected_sample_value"), + [ + (ComponentId(4), Metric.AC_ACTIVE_POWER, 100.0), + (ComponentId(4), Metric.AC_REACTIVE_POWER, 100.0), + (ComponentId(6), Metric.AC_ACTIVE_POWER, 0.0), + (ComponentId(9), Metric.BATTERY_SOC_PCT, 9.0), + (ComponentId(9), Metric.BATTERY_SOC_PCT, 9.0), + (ComponentId(12), Metric.AC_ACTIVE_POWER, -13.0), + ], +) async def test_data_sourcing_actor( # pylint: disable=too-many-locals mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument + component_id: ComponentId, + metric: Metric, + expected_sample_value: float, ) -> None: """Tests for the DataSourcingActor.""" req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests") req_sender = req_chan.new_sender() - registry = ChannelRegistry(name="test-registry") - - async with DataSourcingActor(req_chan.new_receiver(), registry): - active_power_request_4 = ComponentMetricRequest( - "test-namespace", ComponentId(4), Metric.AC_ACTIVE_POWER, None - ) - active_power_recv_4 = registry.get_or_create( - Sample[Quantity], active_power_request_4.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_4) - - reactive_power_request_4 = ComponentMetricRequest( - "test-namespace", ComponentId(4), Metric.AC_REACTIVE_POWER, None - ) - reactive_power_recv_4 = registry.get_or_create( - Sample[Quantity], reactive_power_request_4.get_channel_name() - ).new_receiver() - await req_sender.send(reactive_power_request_4) - - active_power_request_6 = ComponentMetricRequest( - "test-namespace", ComponentId(6), Metric.AC_ACTIVE_POWER, None - ) - active_power_recv_6 = registry.get_or_create( - Sample[Quantity], active_power_request_6.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_6) - - soc_request_9 = ComponentMetricRequest( - "test-namespace", ComponentId(9), Metric.BATTERY_SOC_PCT, None - ) - soc_recv_9 = registry.get_or_create( - Sample[Quantity], soc_request_9.get_channel_name() - ).new_receiver() - await req_sender.send(soc_request_9) - - soc2_request_9 = ComponentMetricRequest( - "test-namespace", ComponentId(9), Metric.BATTERY_SOC_PCT, None + async with DataSourcingActor(req_chan.new_receiver()): + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] ) - soc2_recv_9 = registry.get_or_create( - Sample[Quantity], soc2_request_9.get_channel_name() - ).new_receiver() - await req_sender.send(soc2_request_9) - active_power_request_12 = ComponentMetricRequest( - "test-namespace", ComponentId(12), Metric.AC_ACTIVE_POWER, None + component_metric_request = ComponentMetricRequest( + "test-namespace", + component_id, + metric, + None, + telem_stream_sender, ) - active_power_recv_12 = registry.get_or_create( - Sample[Quantity], active_power_request_12.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_12) - - for i in range(3): - sample = await active_power_recv_4.receive() - assert sample.value is not None - assert 100.0 + i == sample.value.base_value - - sample = await reactive_power_recv_4.receive() - assert sample.value is not None - assert 100.0 + i == sample.value.base_value - - sample = await active_power_recv_6.receive() - assert sample.value is not None - assert 0.0 + i == sample.value.base_value - - sample = await soc_recv_9.receive() - assert sample.value is not None - assert 9.0 + i == sample.value.base_value - - sample = await soc2_recv_9.receive() - assert sample.value is not None - assert 9.0 + i == sample.value.base_value + await req_sender.send(component_metric_request) + telem_stream = await telem_stream_receiver.receive() - sample = await active_power_recv_12.receive() + for i in range(10): + sample = await telem_stream.receive() assert sample.value is not None - assert -13.0 + i == sample.value.base_value + assert expected_sample_value + i == sample.value.base_value def _new_meter_data( From 595253a70adce211c21512058e66b39a5bff7e3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:45:45 +0100 Subject: [PATCH 02/16] Use oneshot channel setup in resampler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/microgrid/_resampling.py | 53 ++++--- .../formulas/_resampled_stream_fetcher.py | 15 +- tests/actor/test_resampling.py | 134 ++++++++++++++---- tests/timeseries/mock_resampler.py | 65 ++++----- 4 files changed, 174 insertions(+), 93 deletions(-) diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index b5c554513..0e330fe9f 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -5,10 +5,9 @@ import asyncio -import dataclasses import logging -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot from frequenz.quantities import Quantity from .._internal._asyncio import cancel_and_await @@ -58,7 +57,7 @@ def __init__( # pylint: disable=too-many-arguments resampling_request_receiver ) self._resampler: Resampler = Resampler(config) - self._active_req_channels: set[str] = set() + self._upstream_channels: dict[str, Broadcast[Sample[Quantity]]] = {} async def _subscribe(self, request: ComponentMetricRequest) -> None: """Request data for a component metric. @@ -68,28 +67,38 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None: """ request_channel_name = request.get_channel_name() - # If we are already handling this request, there is nothing to do. - if request_channel_name in self._active_req_channels: + # If we are already handling this request, answer the request by sending a + # new receiver from the existing channel. + if upstream_channel := self._upstream_channels.get(request_channel_name): + await request.telem_stream_sender.send(upstream_channel.new_receiver()) return - self._active_req_channels.add(request_channel_name) - - data_source_request = dataclasses.replace( - request, namespace=request.namespace + ":Source" + # Derive a new ComponentMetricRequest from the given one to + # receive the telemetry stream from the data sourcing actor. + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + own_request = ComponentMetricRequest( + namespace=request.namespace + ":Source", + component_id=request.component_id, + metric=request.metric, + start_time=request.start_time, + telem_stream_sender=telem_stream_sender, + ) + await self._data_sourcing_request_sender.send(own_request) + telem_stream = await telem_stream_receiver.receive() + + # Create a new channel based on the original ComponentMetricRequest + # to act as our data sink. + upstream_channel = Broadcast(name=request_channel_name, resend_latest=True) + await request.telem_stream_sender.send(upstream_channel.new_receiver()) + self._upstream_channels[request_channel_name] = upstream_channel + + self._resampler.add_timeseries( + name=request_channel_name, + source=telem_stream, + sink=upstream_channel.new_sender().send, ) - data_source_channel_name = data_source_request.get_channel_name() - await self._data_sourcing_request_sender.send(data_source_request) - receiver = self._channel_registry.get_or_create( - Sample[Quantity], data_source_channel_name - ).new_receiver() - - # This is a temporary hack until the Sender implementation uses - # exceptions to report errors. - sender = self._channel_registry.get_or_create( - Sample[Quantity], request_channel_name - ).new_sender() - - self._resampler.add_timeseries(request_channel_name, receiver, sender.send) async def _process_resampling_requests(self) -> None: """Process resampling data requests.""" diff --git a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py index 7ad45ec53..0f2165d4c 100644 --- a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py +++ b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py @@ -3,7 +3,7 @@ """Fetches telemetry streams for components.""" -from frequenz.channels import Receiver, Sender +from frequenz.channels import Receiver, Sender, make_oneshot from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Quantity @@ -54,18 +54,17 @@ async def fetch_stream( Returns: A receiver to stream resampled data for the given component id. """ + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + request = ComponentMetricRequest( self._namespace, component_id, self._metric, None, + telem_stream_sender, ) - chan = self._channel_registry.get_or_create( - Sample[Quantity], request.get_channel_name() - ) - chan.resend_latest = True - await self._resampler_subscription_sender.send(request) - - return chan.new_receiver() + return await telem_stream_receiver.receive() diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 9b15339de..fc7a6b6f5 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -3,18 +3,16 @@ """Frequenz Python SDK resampling example.""" import asyncio -import dataclasses from datetime import datetime, timedelta, timezone import async_solipsism import pytest import time_machine -from frequenz.channels import Broadcast +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ComponentMetricRequest from frequenz.sdk.microgrid._resampling import ComponentMetricsResamplingActor from frequenz.sdk.timeseries import ResamplerConfig2, Sample @@ -31,19 +29,10 @@ def _now() -> datetime: async def _assert_resampling_works( - channel_registry: ChannelRegistry, + timeseries_sender: Sender[Sample[Quantity]], + timeseries_receiver: Receiver[Sample[Quantity]], fake_time: time_machine.Coordinates, - *, - resampling_chan_name: str, - data_source_chan_name: str, ) -> None: - timeseries_receiver = channel_registry.get_or_create( - Sample[Quantity], resampling_chan_name - ).new_receiver() - timeseries_sender = channel_registry.get_or_create( - Sample[Quantity], data_source_chan_name - ).new_sender() - fake_time.shift(0.2) new_sample = await timeseries_receiver.receive() # At 0.2s (timer) assert new_sample == Sample(_now(), None) @@ -103,14 +92,12 @@ async def test_single_request( fake_time: time_machine.Coordinates, ) -> None: """Run main functions that initializes and creates everything.""" - channel_registry = ChannelRegistry(name="test") data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") data_source_req_recv = data_source_req_chan.new_receiver() resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") resampling_req_sender = resampling_req_chan.new_sender() async with ComponentMetricsResamplingActor( - channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), config=ResamplerConfig2( @@ -118,25 +105,35 @@ async def test_single_request( max_data_age_in_periods=2, ), ) as resampling_actor: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), metric=Metric.BATTERY_SOC_PCT, start_time=None, + telem_stream_sender=telem_stream_sender, ) await resampling_req_sender.send(subs_req) data_source_req = await data_source_req_recv.receive() assert data_source_req is not None - assert data_source_req == dataclasses.replace( - subs_req, namespace="Resampling:Source" - ) + + assert data_source_req.namespace == "Resampling:Source" + assert data_source_req.component_id == ComponentId(9) + assert data_source_req.metric == Metric.BATTERY_SOC_PCT + assert data_source_req.start_time is None + assert data_source_req.telem_stream_sender != telem_stream_sender + + # Create the telemetry stream on behalf of nonexisting data sourcing actor + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) await _assert_resampling_works( - channel_registry, - fake_time, - resampling_chan_name=subs_req.get_channel_name(), - data_source_chan_name=data_source_req.get_channel_name(), + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=await telem_stream_receiver.receive(), + fake_time=fake_time, ) await resampling_actor._resampler.stop() # pylint: disable=protected-access @@ -146,14 +143,12 @@ async def test_duplicate_request( fake_time: time_machine.Coordinates, ) -> None: """Run main functions that initializes and creates everything.""" - channel_registry = ChannelRegistry(name="test") data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") data_source_req_recv = data_source_req_chan.new_receiver() resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") resampling_req_sender = resampling_req_chan.new_sender() async with ComponentMetricsResamplingActor( - channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), config=ResamplerConfig2( @@ -161,11 +156,15 @@ async def test_duplicate_request( max_data_age_in_periods=2, ), ) as resampling_actor: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), metric=Metric.BATTERY_SOC_PCT, start_time=None, + telem_stream_sender=telem_stream_sender, ) await resampling_req_sender.send(subs_req) @@ -176,11 +175,88 @@ async def test_duplicate_request( with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1) + # Create the telemetry stream on behalf of nonexisting data sourcing actor + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) + + await _assert_resampling_works( + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=await telem_stream_receiver.receive(), + fake_time=fake_time, + ) + + await resampling_actor._resampler.stop() # pylint: disable=protected-access + + +async def test_resubscribe(fake_time: time_machine.Coordinates) -> None: + """Test that resampling works when e receiver resubscribes. + + For example, Coalesce may close its receivers and resubscribe to the + same component later on. ComponentMetricsResamplingActor must provide + a new receiver in that case. + """ + data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") + data_source_req_recv = data_source_req_chan.new_receiver() + resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") + resampling_req_sender = resampling_req_chan.new_sender() + + async with ComponentMetricsResamplingActor( + data_sourcing_request_sender=data_source_req_chan.new_sender(), + resampling_request_receiver=resampling_req_chan.new_receiver(), + config=ResamplerConfig2( + resampling_period=timedelta(seconds=0.2), + max_data_age_in_periods=2, + ), + ) as resampling_actor: + + async def send_metric_request() -> Receiver[Receiver[Sample[Quantity]]]: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + subs_req = ComponentMetricRequest( + namespace="Resampling", + component_id=ComponentId(9), + metric=Metric.BATTERY_SOC_PCT, + start_time=None, + telem_stream_sender=telem_stream_sender, + ) + await resampling_req_sender.send(subs_req) + return telem_stream_receiver + + telem_stream_receiver = await send_metric_request() + + # Create the telemetry stream on behalf of nonexisting data sourcing actor + data_source_req = await data_source_req_recv.receive() + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) + + resampled_stream_receiver = await telem_stream_receiver.receive() + + await _assert_resampling_works( + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=resampled_stream_receiver, + fake_time=fake_time, + ) + + resampled_stream_receiver.close() + + # Resubscribe to the same metric data + telem_stream_receiver = await send_metric_request() + resampled_stream_receiver = await telem_stream_receiver.receive() + + # No need to answer the request in the data sourcing actor - The resampler answers + + # New subscriptions receive the latest resampled value immediately. + # This must be drained for _assert_resampling_works to have a clean start. + resent_sample = await resampled_stream_receiver.receive() + assert resent_sample is not None + assert resent_sample.value is None + assert resent_sample.timestamp == _now() + await _assert_resampling_works( - channel_registry, - fake_time, - resampling_chan_name=subs_req.get_channel_name(), - data_source_chan_name=data_source_req.get_channel_name(), + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=resampled_stream_receiver, + fake_time=fake_time, ) await resampling_actor._resampler.stop() # pylint: disable=protected-access diff --git a/tests/timeseries/mock_resampler.py b/tests/timeseries/mock_resampler.py index 8ae6b0920..b3e221683 100644 --- a/tests/timeseries/mock_resampler.py +++ b/tests/timeseries/mock_resampler.py @@ -41,13 +41,18 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument ) -> None: """Create a `MockDataPipeline` instance.""" self._data_pipeline = _DataPipeline(resampler_config) - - self._channel_registry = self._data_pipeline._channel_registry + self._channel_lookup: dict[str, Broadcast[Sample[Quantity]]] = {} self._resampler_request_channel = Broadcast[ComponentMetricRequest]( - name="resampler-request" + name="resampler-request", + resend_latest=True, ) self._input_channels_receivers: dict[str, list[Receiver[Sample[Quantity]]]] = {} + def get_or_create_channel(name: str) -> Broadcast[Sample[Quantity]]: + if name not in self._channel_lookup: + self._channel_lookup[name] = Broadcast[Sample[Quantity]](name=name) + return self._channel_lookup[name] + def metric_senders( comp_ids: list[ComponentId], metric_id: Metric, @@ -55,15 +60,9 @@ def metric_senders( senders: list[Sender[Sample[Quantity]]] = [] for comp_id in comp_ids: name = f"{comp_id}:{metric_id}" - senders.append( - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_sender() - ) + senders.append(get_or_create_channel(name).new_sender()) self._input_channels_receivers[name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_receiver() + get_or_create_channel(name).new_receiver() for _ in range(namespaces) ] return senders @@ -115,33 +114,21 @@ def multi_phase_senders( senders.append( [ - self._channel_registry.get_or_create( - Sample[Quantity], p1_name - ).new_sender(), - self._channel_registry.get_or_create( - Sample[Quantity], p2_name - ).new_sender(), - self._channel_registry.get_or_create( - Sample[Quantity], p3_name - ).new_sender(), + get_or_create_channel(p1_name).new_sender(), + get_or_create_channel(p2_name).new_sender(), + get_or_create_channel(p3_name).new_sender(), ] ) self._input_channels_receivers[p1_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p1_name - ).new_receiver() + get_or_create_channel(p1_name).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p2_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p2_name - ).new_receiver() + get_or_create_channel(p2_name).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p3_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p3_name - ).new_receiver() + get_or_create_channel(p3_name).new_receiver() for _ in range(namespaces) ] return senders @@ -237,16 +224,26 @@ async def _channel_forward_messages( async def _handle_resampling_requests(self) -> None: async for request in self._resampler_request_channel.new_receiver(): name = request.get_channel_name() + if name in self._forward_tasks: + # Forward task exists, but we must create a new receiver + # from the existing channel and return it to the request sender. + assert name in self._channel_lookup + output_channel = self._channel_lookup[name] + await request.telem_stream_sender.send(output_channel.new_receiver()) continue + input_chan_recv_name = f"{request.component_id}:{request.metric}" input_chan_recv = self._input_channels_receivers[input_chan_recv_name].pop() assert input_chan_recv is not None - output_chan_sender: Sender[Sample[Quantity]] = ( - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_sender() - ) + + if name not in self._channel_lookup: + self._channel_lookup[name] = Broadcast(name=name, resend_latest=True) + + output_channel = self._channel_lookup[name] + output_chan_sender = output_channel.new_sender() + await request.telem_stream_sender.send(output_channel.new_receiver()) + task = asyncio.create_task( self._channel_forward_messages( input_chan_recv, From 6ffc93e5b3d0a1b490dc86d41f2ac35d556b3ad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:46:07 +0100 Subject: [PATCH 03/16] Use oneshot channel setup in GridFrequency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../sdk/timeseries/_grid_frequency.py | 62 ++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 774b2a02b..07ebda951 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -8,8 +8,8 @@ import asyncio import logging -from frequenz.channels import Receiver, Sender -from frequenz.client.common.microgrid.components import ComponentId +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Frequency, Quantity @@ -23,20 +23,6 @@ _logger = logging.getLogger(__name__) -def create_request(component_id: ComponentId) -> ComponentMetricRequest: - """Create a request for grid frequency. - - Args: - component_id: The component id to use for the request. - - Returns: - A component metric request for grid frequency. - """ - return ComponentMetricRequest( - "grid-frequency", component_id, Metric.AC_FREQUENCY, None - ) - - class GridFrequency: """Grid Frequency.""" @@ -65,10 +51,27 @@ def __init__( ) self._channel_registry: ChannelRegistry = channel_registry self._source_component: Component = source - self._component_metric_request: ComponentMetricRequest = create_request( - self._source_component.id + + # Microgrid API source will send the stream through a oneshot channel + telem_stream_sender, self._telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] ) + self._component_metric_request = ComponentMetricRequest( + "grid-frequency", + self._source_component.id, + Metric.AC_FREQUENCY, + None, + telem_stream_sender, + ) + + # This channel merely forwards the telemetry stream. It is needed + # because we must return a receiver synchronously in new_receiver. + # The "real" channel for telemetry must be created in and owned by + # MicrogridApiSource, otherwise streams would not be reused. + self._forwarding_channel: Broadcast[Sample[Quantity]] | None = None + + # Sadly needed for testing self._task: None | asyncio.Task[None] = None @property @@ -86,18 +89,13 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: Returns: A receiver that will receive grid frequency samples. """ - receiver = self._channel_registry.get_or_create( - Sample[Quantity], self._component_metric_request.get_channel_name() - ).new_receiver() - - if not self._task: - self._task = asyncio.create_task(self._send_request()) - else: - _logger.info( - "Grid frequency request already sent: %s", self._source_component + if self._forwarding_channel is None: + self._forwarding_channel = Broadcast(name="Forward frequency samples") + self._task = asyncio.create_task( + self._send_request(self._forwarding_channel.new_sender()) ) - return receiver.map( + return self._forwarding_channel.new_receiver().map( lambda sample: ( Sample[Frequency](sample.timestamp, None) if sample.value is None or sample.value.isnan() @@ -107,7 +105,13 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: ) ) - async def _send_request(self) -> None: + async def _send_request(self, forwarding_sender: Sender[Sample[Quantity]]) -> None: """Send the request for grid frequency.""" await self._request_sender.send(self._component_metric_request) _logger.debug("Sent request for grid frequency: %s", self._source_component) + + # Receive the telemetry stream and forward it via pipe + telem_receiver: Receiver[Sample[Quantity]] = ( + await self._telem_stream_receiver.receive() + ) + await Pipe(telem_receiver, forwarding_sender).start() From bf4819ea0f0a59c6f8d60c39980003d0e23d3cdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:46:17 +0100 Subject: [PATCH 04/16] Use oneshot channel setup in VoltageStreamer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../sdk/timeseries/_voltage_streamer.py | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index 4759b362f..5709ae716 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -13,7 +13,7 @@ import logging from typing import TYPE_CHECKING -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity, Voltage @@ -99,6 +99,8 @@ def __init__( self._channel_key = f"{self._namespace}-all-phases" """The channel key for the phase-to-neutral voltage streaming.""" + self._telem_channel: Broadcast[Sample3Phase[Voltage]] | None = None + @property def source(self) -> Component: """Get the component to fetch the phase-to-neutral voltage from. @@ -108,6 +110,14 @@ def source(self) -> Component: """ return self._source_component + @property + def _channel(self) -> Broadcast[Sample3Phase[Voltage]]: + if self._telem_channel is None: + self._telem_channel = Broadcast[Sample3Phase[Voltage]]( + name=self._channel_key + ) + return self._telem_channel + def new_receiver(self) -> Receiver[Sample3Phase[Voltage]]: """Create a receiver for the phase-to-neutral voltage. @@ -118,9 +128,7 @@ def new_receiver(self) -> Receiver[Sample3Phase[Voltage]]: A receiver that will receive the phase-to-neutral voltage as a 3-phase sample. """ - receiver = self._channel_registry.get_or_create( - Sample3Phase[Voltage], self._channel_key - ).new_receiver() + receiver = self._channel.new_receiver() if not self._task: self._task = asyncio.create_task(self._send_request()) @@ -143,21 +151,21 @@ async def _send_request(self) -> None: ) phases_rx: list[Receiver[Sample[Quantity]]] = [] for metric in metrics: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) req = ComponentMetricRequest( - self._namespace, self._source_component.id, metric, None + namespace=self._namespace, + component_id=self._source_component.id, + metric=metric, + start_time=None, + telem_stream_sender=telem_stream_sender, ) await self._resampler_subscription_sender.send(req) + phases_rx.append(await telem_stream_receiver.receive()) - phases_rx.append( - self._channel_registry.get_or_create( - Sample[Quantity], req.get_channel_name() - ).new_receiver() - ) - - sender = self._channel_registry.get_or_create( - Sample3Phase[Voltage], self._channel_key - ).new_sender() + sender = self._channel.new_sender() _logger.debug( "Sent request for fetching voltage from: %s", self._source_component From 116ef1ff95854d15f09caf1dd9529078018ee583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:50:44 +0100 Subject: [PATCH 05/16] Added oneshot channel to ReportRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../_power_managing/_base_classes.py | 6 +++- .../_power_managing/_power_managing_actor.py | 28 +++++++++++++------ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py index cd575229a..40616de63 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py @@ -10,6 +10,7 @@ import enum import typing +from frequenz.channels import Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -31,7 +32,10 @@ class ReportRequest: """The component IDs to report on.""" priority: int - """The priority of the actor .""" + """The priority of the actor.""" + + report_stream_sender: Sender[Receiver[_Report]] + """Oneshot sender to transmit the report receiver.""" def get_channel_name(self) -> str: """Get the channel name for the report request. diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index 0dead0c29..b06586917 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -11,7 +11,7 @@ from datetime import datetime, timedelta, timezone from typing import assert_never -from frequenz.channels import Receiver, Sender, select, selected_from +from frequenz.channels import Broadcast, Receiver, Sender, select, selected_from from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter @@ -72,6 +72,7 @@ def __init__( # pylint: disable=too-many-arguments self._power_distributing_results_receiver = power_distributing_results_receiver self._channel_registry = channel_registry self._proposals_receiver = proposals_receiver + self._channel_lookup: dict[str, Broadcast[_Report]] = {} self._system_bounds: dict[frozenset[ComponentId], SystemBounds] = {} self._bound_tracker_tasks: dict[frozenset[ComponentId], asyncio.Task[None]] = {} @@ -233,18 +234,27 @@ async def _run(self) -> None: component_ids = sub.component_ids priority = sub.priority + async def get_or_create_channel( + subscription: ReportRequest, + ) -> Broadcast[_Report]: + channel_name = subscription.get_channel_name() + if channel_name not in self._channel_lookup: + report_channel = Broadcast[_Report](name=channel_name) + report_channel.resend_latest = True + self._channel_lookup[channel_name] = report_channel + await subscription.report_stream_sender.send( + report_channel.new_receiver() + ) + return self._channel_lookup[channel_name] + if component_ids not in self._subscriptions: + channel = await get_or_create_channel(sub) self._subscriptions[component_ids] = { - priority: self._channel_registry.get_or_create( - _Report, sub.get_channel_name() - ).new_sender() + priority: channel.new_sender() } elif priority not in self._subscriptions[component_ids]: - self._subscriptions[component_ids][priority] = ( - self._channel_registry.get_or_create( - _Report, sub.get_channel_name() - ).new_sender() - ) + channel = await get_or_create_channel(sub) + self._subscriptions[component_ids][priority] = channel.new_sender() if sub.component_ids not in self._bound_tracker_tasks: self._add_system_bounds_tracker(sub.component_ids) From f29781dfbee89ea33d69d0e5660615af4c9b54e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:51:07 +0100 Subject: [PATCH 06/16] Using oneshot channel for report requests in all use cases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../timeseries/battery_pool/_battery_pool.py | 44 +++++++++++++++--- .../ev_charger_pool/_ev_charger_pool.py | 45 +++++++++++++++---- .../sdk/timeseries/pv_pool/_pv_pool.py | 45 +++++++++++++++---- 3 files changed, 109 insertions(+), 25 deletions(-) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 3be021a85..65c6e4d02 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -12,12 +12,15 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Energy, Percentage, Power, Temperature from ... import timeseries from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing, connection_manager +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Sample from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -74,6 +77,7 @@ def __init__( unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None async def propose_power( self, @@ -329,22 +333,48 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = make_oneshot( + Receiver[_Report] # type: ignore[type-abstract] + ) + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store._batteries, + report_stream_sender=report_stream_sender, + ) + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, ) - self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = ( + + self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store._channel_registry.get_or_create( - _power_managing._Report, sub.get_channel_name() + + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store._power_manager_bounds_subscription_sender.send( + request ) - channel.resend_latest = True - return channel + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + pipe = Pipe(report_receiver, forwarding_sender) + await pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 77327eb7d..d8bc9f5c7 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -8,11 +8,14 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Current, Power from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing, connection_manager +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Bounds from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -61,6 +64,7 @@ def __init__( # pylint: disable=too-many-arguments unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None async def propose_power( self, @@ -171,23 +175,46 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = make_oneshot( + Receiver[_Report] # type: ignore[type-abstract] + ) + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + report_stream_sender=report_stream_sender, + ) + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( + + self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - return channel + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store.power_manager_bounds_subs_sender.send(request) + + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + pipe = Pipe(report_receiver, forwarding_sender) + await pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index 6bbb91e15..dac444d30 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -7,6 +7,8 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -14,6 +16,7 @@ from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Bounds from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -56,6 +59,7 @@ def __init__( # pylint: disable=too-many-arguments unique_id = uuid.uuid4() self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None async def propose_power( self, @@ -143,23 +147,46 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = make_oneshot( + Receiver[_Report] # type: ignore[type-abstract] + ) + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + report_stream_sender=report_stream_sender, + ) + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( + + self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - return channel + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store.power_manager_bounds_subs_sender.send(request) + + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + pipe = Pipe(report_receiver, forwarding_sender) + await pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: From bb1b42bdbeaa1cc87cc162f5281fabd97a065627 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Thu, 5 Mar 2026 12:20:55 +0100 Subject: [PATCH 07/16] Stopped using ChannelRegistry in benchmarks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../timeseries/benchmark_datasourcing.py | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/benchmarks/timeseries/benchmark_datasourcing.py b/benchmarks/timeseries/benchmark_datasourcing.py index dd430aa42..005424d9c 100644 --- a/benchmarks/timeseries/benchmark_datasourcing.py +++ b/benchmarks/timeseries/benchmark_datasourcing.py @@ -16,15 +16,16 @@ from time import perf_counter from typing import Any -from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError +from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, make_oneshot from frequenz.client.microgrid.metrics import Metric +from frequenz.quantities import Quantity from frequenz.sdk import microgrid -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ( ComponentMetricRequest, DataSourcingActor, ) +from frequenz.sdk.timeseries import Sample try: from tests.timeseries.mock_microgrid import MockMicrogrid @@ -80,7 +81,6 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals name="DataSourcingActor Request Channel" ) - channel_registry = ChannelRegistry(name="Microgrid Channel Registry") request_receiver = request_channel.new_receiver( name="datasourcing-benchmark", limit=(num_ev_chargers * len(COMPONENT_METRIC_IDS)), @@ -105,18 +105,21 @@ async def consume(channel: Receiver[Any]) -> None: for evc_id in mock_grid.evc_ids: for component_metric_id in COMPONENT_METRIC_IDS: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) request = ComponentMetricRequest( - "current_phase_requests", evc_id, component_metric_id, None + namespace="current_phase_requests", + component_id=evc_id, + metric=component_metric_id, + start_time=None, + telem_stream_sender=telem_stream_sender, ) - - recv_channel = channel_registry.get_or_create( - ComponentMetricRequest, request.get_channel_name() - ).new_receiver() - await request_sender.send(request) - consume_tasks.append(asyncio.create_task(consume(recv_channel))) + stream_receiver = await telem_stream_receiver.receive() + consume_tasks.append(asyncio.create_task(consume(stream_receiver))) - async with DataSourcingActor(request_receiver, channel_registry): + async with DataSourcingActor(request_receiver): await asyncio.gather(*consume_tasks) time_taken = perf_counter() - start_time From 4b67686bc85a5aee01a1c3c932e969dfb04eed56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Thu, 5 Mar 2026 12:21:41 +0100 Subject: [PATCH 08/16] Remove ChannelRegistry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/_internal/_channels.py | 123 ------------------ src/frequenz/sdk/microgrid/_data_pipeline.py | 23 +--- .../microgrid/_data_sourcing/data_sourcing.py | 6 +- .../_data_sourcing/microgrid_api_source.py | 14 +- .../_power_managing/_power_managing_actor.py | 4 - src/frequenz/sdk/microgrid/_power_wrapper.py | 6 +- src/frequenz/sdk/microgrid/_resampling.py | 5 - .../sdk/timeseries/_grid_frequency.py | 4 - .../sdk/timeseries/_voltage_streamer.py | 7 - .../_battery_pool_reference_store.py | 7 +- src/frequenz/sdk/timeseries/consumer.py | 4 - .../_ev_charger_pool_reference_store.py | 7 +- .../sdk/timeseries/formulas/_formula_pool.py | 6 - .../formulas/_resampled_stream_fetcher.py | 5 - src/frequenz/sdk/timeseries/grid.py | 5 - .../logical_meter/_logical_meter.py | 6 - src/frequenz/sdk/timeseries/producer.py | 4 - .../pv_pool/_pv_pool_reference_store.py | 7 +- tests/actor/test_channel_registry.py | 69 ---------- tests/timeseries/_formulas/utils.py | 1 - 20 files changed, 8 insertions(+), 305 deletions(-) delete mode 100644 tests/actor/test_channel_registry.py diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 74b2f6929..4b11cbba7 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -6,7 +6,6 @@ import abc import dataclasses import logging -import traceback import typing from frequenz.channels import Broadcast, Receiver @@ -63,128 +62,6 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: return self._mapping_function(self._fetcher.new_receiver(limit=limit)) -class ChannelRegistry: - """Dynamically creates, own and provide access to broadcast channels. - - It can be used by actors to dynamically establish a communication channel - between each other. - - The registry is responsible for creating channels when they are first requested via - the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method. - - The registry also stores type information to make sure that the same channel is not - used for different message types. - - Since the registry owns the channels, it is also responsible for closing them when - they are no longer needed. There is no way to remove a channel without closing it. - - Note: - This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels. - """ - - def __init__(self, *, name: str) -> None: - """Initialize this registry. - - Args: - name: A name to identify the registry in the logs. This name is also used as - a prefix for the channel names. - """ - self._name = name - self._channels: dict[str, _Entry] = {} - - @property - def name(self) -> str: - """The name of this registry.""" - return self._name - - def message_type(self, key: str) -> type: - """Get the message type of the channel for the given key. - - Args: - key: The key to identify the channel. - - Returns: - The message type of the channel. - - Raises: - KeyError: If the channel does not exist. - """ - entry = self._channels.get(key) - if entry is None: - raise KeyError(f"No channel for key {key!r} exists.") - return entry.message_type - - def __contains__(self, key: str) -> bool: - """Check whether the channel for the given `key` exists.""" - return key in self._channels - - def get_or_create(self, message_type: type[T], key: str) -> Broadcast[T]: - """Get or create a channel for the given key. - - If a channel for the given key already exists, the message type of the existing - channel is checked against the requested message type. If they do not match, - a `ValueError` is raised. - - Note: - The types have to match exactly, it doesn't do a subtype check due to - technical limitations. In the future subtype checks might be supported. - - Args: - message_type: The type of the message that is sent through the channel. - key: The key to identify the channel. - - Returns: - The channel for the given key. - - Raises: - ValueError: If the channel exists and the message type does not match. - """ - if key not in self._channels: - if _logger.isEnabledFor(logging.DEBUG): - _logger.debug( - "Creating a new channel for key %r with type %s at:\n%s", - key, - message_type, - "".join(traceback.format_stack(limit=10)[:9]), - ) - self._channels[key] = _Entry( - message_type, Broadcast(name=f"{self._name}-{key}") - ) - - entry = self._channels[key] - if entry.message_type is not message_type: - error_message = ( - f"Type mismatch, a channel for key {key!r} exists and the requested " - f"message type {message_type} is not the same as the existing " - f"message type {entry.message_type}." - ) - if _logger.isEnabledFor(logging.DEBUG): - _logger.debug( - "%s at:\n%s", - error_message, - # We skip the last frame because it's this method, and limit the - # stack to 9 frames to avoid adding too much noise. - "".join(traceback.format_stack(limit=10)[:9]), - ) - raise ValueError(error_message) - - return typing.cast(Broadcast[T], entry.channel) - - async def close_and_remove(self, key: str) -> None: - """Remove the channel for the given key. - - Args: - key: The key to identify the channel. - - Raises: - KeyError: If the channel does not exist. - """ - entry = self._channels.pop(key, None) - if entry is None: - raise KeyError(f"No channel for key {key!r} exists.") - await entry.channel.close() - - @dataclasses.dataclass(frozen=True) class _Entry: """An entry in a channel registry.""" diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index aca47e4a9..4bf96f35d 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -20,7 +20,6 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter -from .._internal._channels import ChannelRegistry from ..actor._actor import Actor from ..timeseries import ResamplerConfig from ..timeseries._voltage_streamer import VoltageStreamer @@ -96,29 +95,22 @@ def __init__( """ self._resampler_config: ResamplerConfig = resampler_config - self._channel_registry: ChannelRegistry = ChannelRegistry( - name="Data Pipeline Registry" - ) - self._data_sourcing_actor: _ActorInfo | None = None self._resampling_actor: _ActorInfo | None = None self._battery_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=battery_power_manager_algorithm, default_power=DefaultPower.ZERO, component_class=Battery, ) self._ev_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA, default_power=DefaultPower.MAX, component_class=EvCharger, ) self._pv_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA, default_power=DefaultPower.MIN, @@ -158,7 +150,6 @@ def frequency(self) -> GridFrequency: if self._frequency_instance is None: self._frequency_instance = GridFrequency( self._data_sourcing_request_sender(), - self._channel_registry, ) return self._frequency_instance @@ -166,10 +157,7 @@ def frequency(self) -> GridFrequency: def voltage_per_phase(self) -> VoltageStreamer: """Return the per-phase voltage measuring point.""" if not self._voltage_instance: - self._voltage_instance = VoltageStreamer( - self._resampling_request_sender(), - self._channel_registry, - ) + self._voltage_instance = VoltageStreamer(self._resampling_request_sender()) return self._voltage_instance @@ -179,7 +167,6 @@ def logical_meter(self) -> LogicalMeter: if self._logical_meter is None: self._logical_meter = LogicalMeter( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._logical_meter @@ -190,7 +177,6 @@ def consumer(self) -> Consumer: if self._consumer is None: self._consumer = Consumer( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._consumer @@ -201,7 +187,6 @@ def producer(self) -> Producer: if self._producer is None: self._producer = Producer( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._producer @@ -213,7 +198,6 @@ def grid(self) -> Grid: if self._grid is None: initialize_grid( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) self._grid = get_grid() @@ -273,7 +257,6 @@ def new_ev_charger_pool( if ref_store_key not in self._ev_charger_pool_reference_stores: self._ev_charger_pool_reference_stores[ref_store_key] = ( EVChargerPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), status_receiver=self._ev_power_wrapper.status_channel.new_receiver( limit=1 @@ -346,7 +329,6 @@ def new_pv_pool( if ref_store_key not in self._pv_pool_reference_stores: self._pv_pool_reference_stores[ref_store_key] = PVPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), status_receiver=( self._pv_power_wrapper.status_channel.new_receiver(limit=1) @@ -422,7 +404,6 @@ def new_battery_pool( if ref_store_key not in self._battery_pool_reference_stores: self._battery_pool_reference_stores[ref_store_key] = ( BatteryPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), batteries_status_receiver=( self._battery_power_wrapper.status_channel.new_receiver(limit=1) @@ -461,7 +442,6 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]: ) actor = DataSourcingActor( request_receiver=channel.new_receiver(limit=_REQUEST_RECV_BUFFER_SIZE), - registry=self._channel_registry, ) self._data_sourcing_actor = _ActorInfo(actor, channel) self._data_sourcing_actor.actor.start() @@ -482,7 +462,6 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]: name="Data Pipeline: Component Metric Resampling Actor Request Channel" ) actor = ComponentMetricsResamplingActor( - channel_registry=self._channel_registry, data_sourcing_request_sender=self._data_sourcing_request_sender(), resampling_request_receiver=channel.new_receiver( limit=_REQUEST_RECV_BUFFER_SIZE, diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py b/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py index 990e2b420..776768f29 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py @@ -5,7 +5,6 @@ from frequenz.channels import Receiver -from ..._internal._channels import ChannelRegistry from ...actor import Actor from ._component_metric_request import ComponentMetricRequest from .microgrid_api_source import MicrogridApiSource @@ -17,7 +16,6 @@ class DataSourcingActor(Actor): def __init__( self, request_receiver: Receiver[ComponentMetricRequest], - registry: ChannelRegistry, *, name: str | None = None, ) -> None: @@ -25,14 +23,12 @@ def __init__( Args: request_receiver: A channel receiver to accept metric requests from. - registry: A channel registry. To be replaced by a singleton - instance. name: The name of the actor. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. """ super().__init__(name=name) self._request_receiver = request_receiver - self._microgrid_api_source = MicrogridApiSource(registry) + self._microgrid_api_source = MicrogridApiSource() async def _run(self) -> None: """Run the actor.""" diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 03faad41e..03f89474f 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -15,7 +15,6 @@ from frequenz.quantities import Quantity from ..._internal._asyncio import run_forever -from ..._internal._channels import ChannelRegistry from ...microgrid import connection_manager from ...timeseries import Sample from .._old_component_data import ( @@ -159,16 +158,8 @@ class MicrogridApiSource: Used by the DataSourcingActor. """ - def __init__( - self, - registry: ChannelRegistry, - ) -> None: - """Create a `MicrogridApiSource` instance. - - Args: - registry: A channel registry. To be replaced by a singleton - instance. - """ + def __init__(self) -> None: + """Create a `MicrogridApiSource` instance.""" self._comp_categories_cache: dict[ComponentId, ComponentCategory | int] = {} self.comp_data_receivers: dict[ComponentId, Receiver[Any]] = {} @@ -177,7 +168,6 @@ def __init__( self.comp_data_tasks: dict[ComponentId, asyncio.Task[None]] = {} """The dictionary of component IDs to asyncio tasks.""" - self._registry = registry self._req_streaming_metrics: dict[ ComponentId, dict[Metric | TransitionalMetric, list[ComponentMetricRequest]] ] = {} diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index b06586917..b58737242 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -18,7 +18,6 @@ from typing_extensions import override from ..._internal._asyncio import run_forever -from ..._internal._channels import ChannelRegistry from ...actor import Actor from ...timeseries._base_types import SystemBounds from .. import _data_pipeline, _power_distributing @@ -46,7 +45,6 @@ def __init__( # pylint: disable=too-many-arguments bounds_subscription_receiver: Receiver[ReportRequest], power_distributing_requests_sender: Sender[_power_distributing.Request], power_distributing_results_receiver: Receiver[_power_distributing.Result], - channel_registry: ChannelRegistry, algorithm: PowerManagerAlgorithm, default_power: DefaultPower, component_class: type[Battery | EvCharger | SolarInverter], @@ -60,7 +58,6 @@ def __init__( # pylint: disable=too-many-arguments requests. power_distributing_results_receiver: The receiver for power distribution results. - channel_registry: The channel registry. algorithm: The power management algorithm to use. default_power: The default power to use for the components. component_class: The class of component this instance is going to support. @@ -70,7 +67,6 @@ def __init__( # pylint: disable=too-many-arguments self._bounds_subscription_receiver = bounds_subscription_receiver self._power_distributing_requests_sender = power_distributing_requests_sender self._power_distributing_results_receiver = power_distributing_results_receiver - self._channel_registry = channel_registry self._proposals_receiver = proposals_receiver self._channel_lookup: dict[str, Broadcast[_Report]] = {} diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 70e2e4e55..e82382335 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -11,7 +11,7 @@ from frequenz.channels import Broadcast from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter -from .._internal._channels import ChannelRegistry, ReceiverFetcher +from .._internal._channels import ReceiverFetcher # pylint seems to think this is a cyclic import, but it is not. # @@ -35,7 +35,6 @@ class PowerWrapper: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, - channel_registry: ChannelRegistry, *, api_power_request_timeout: timedelta, power_manager_algorithm: PowerManagerAlgorithm, @@ -45,7 +44,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize the power control. Args: - channel_registry: A channel registry for use in the actors. api_power_request_timeout: Timeout to use when making power requests to the microgrid API. power_manager_algorithm: The power management algorithm to use. @@ -55,7 +53,6 @@ def __init__( # pylint: disable=too-many-arguments self._default_power = default_power self._power_manager_algorithm = power_manager_algorithm self._component_class = component_class - self._channel_registry = channel_registry self._api_power_request_timeout = api_power_request_timeout self.status_channel: Broadcast[ComponentPoolStatus] = Broadcast( @@ -109,7 +106,6 @@ def _start_power_managing_actor(self) -> None: power_distributing_results_receiver=( self._power_distribution_results_channel.new_receiver() ), - channel_registry=self._channel_registry, ) self._power_managing_actor.start() diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index 0e330fe9f..8457f1883 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -11,7 +11,6 @@ from frequenz.quantities import Quantity from .._internal._asyncio import cancel_and_await -from .._internal._channels import ChannelRegistry from ..actor._actor import Actor from ..timeseries import Sample from ..timeseries._resampling._config import ResamplerConfig @@ -28,7 +27,6 @@ class ComponentMetricsResamplingActor(Actor): def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], config: ResamplerConfig, @@ -37,8 +35,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize an instance. Args: - channel_registry: The channel registry used to get senders and - receivers for data sourcing subscriptions. data_sourcing_request_sender: The sender used to send requests to the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor] to subscribe to component metrics. @@ -49,7 +45,6 @@ def __init__( # pylint: disable=too-many-arguments is used mostly for debugging purposes. """ super().__init__(name=name) - self._channel_registry: ChannelRegistry = channel_registry self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = ( data_sourcing_request_sender ) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 07ebda951..5a35ee494 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -14,7 +14,6 @@ from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Frequency, Quantity -from .._internal._channels import ChannelRegistry from .._internal._graph_traversal import find_first_descendant_component from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest @@ -29,14 +28,12 @@ class GridFrequency: def __init__( self, data_sourcing_request_sender: Sender[ComponentMetricRequest], - channel_registry: ChannelRegistry, source: Component | None = None, ): """Initialize the grid frequency formula generator. Args: data_sourcing_request_sender: The sender to use for requests. - channel_registry: The channel registry to use for the grid frequency. source: The source component to use to receive the grid frequency. """ if not source: @@ -49,7 +46,6 @@ def __init__( self._request_sender: Sender[ComponentMetricRequest] = ( data_sourcing_request_sender ) - self._channel_registry: ChannelRegistry = channel_registry self._source_component: Component = source # Microgrid API source will send the stream through a oneshot channel diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index 5709ae716..d784dce25 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -18,7 +18,6 @@ from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity, Voltage -from .._internal._channels import ChannelRegistry from .._internal._graph_traversal import find_first_descendant_component from ..timeseries._base_types import Sample, Sample3Phase @@ -55,7 +54,6 @@ class VoltageStreamer: def __init__( self, resampler_subscription_sender: Sender[ComponentMetricRequest], - channel_registry: ChannelRegistry, source_component: Component | None = None, ): """Initialize the phase-to-neutral voltage streaming. @@ -63,8 +61,6 @@ def __init__( Args: resampler_subscription_sender: The sender for sending metric requests to the resampling actor. - channel_registry: The channel registry for the phase-to-neutral - voltage streaming. source_component: The source component to receive the phase-to-neutral voltage. If None, it fetches the source component from the connection manager. @@ -73,9 +69,6 @@ def __init__( self._resampler_subscription_sender = resampler_subscription_sender """The sender for sending metric requests to the resampling actor.""" - self._channel_registry = channel_registry - """The channel registry for the phase-to-neutral voltage streaming.""" - from ..microgrid import ( # pylint: disable=import-outside-toplevel connection_manager, ) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py index 68a03f597..34d549174 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py @@ -15,7 +15,7 @@ from frequenz.client.microgrid.component import Battery from ..._internal._asyncio import cancel_and_await -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import Result @@ -41,7 +41,6 @@ class BatteryPoolReferenceStore: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], batteries_status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -53,8 +52,6 @@ def __init__( # pylint: disable=too-many-arguments """Create the class instance. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. batteries_status_receiver: Receiver to receive status of the batteries. @@ -125,13 +122,11 @@ def __init__( # pylint: disable=too-many-arguments self._power_bounds_subs: dict[str, asyncio.Task[None]] = {} self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}" self._power_distributing_namespace: str = f"power-distributor-{self._namespace}" - self._channel_registry: ChannelRegistry = channel_registry self._power_dist_results_fetcher: ReceiverFetcher[Result] = ( power_distribution_results_fetcher ) self._formula_pool: FormulaPool = FormulaPool( self._namespace, - self._channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/consumer.py b/src/frequenz/sdk/timeseries/consumer.py index 0376c0444..50d40c1a6 100644 --- a/src/frequenz/sdk/timeseries/consumer.py +++ b/src/frequenz/sdk/timeseries/consumer.py @@ -8,7 +8,6 @@ from frequenz.channels import Sender from frequenz.quantities import Power -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from .formulas._formula import Formula @@ -57,19 +56,16 @@ class Consumer: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the consumer formula generator. Args: - channel_registry: The channel registry to use for the consumer. resampler_subscription_sender: The sender to use for resampler subscriptions. """ namespace = f"consumer-{uuid.uuid4()}" self._formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py index 7e4d4c27b..8b7e00971 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py @@ -11,7 +11,7 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import EvCharger -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import ComponentPoolStatus, Result @@ -37,7 +37,6 @@ class EVChargerPoolReferenceStore: def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -48,8 +47,6 @@ def __init__( # pylint: disable=too-many-arguments """Create an instance of the class. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. status_receiver: A receiver that streams the status of the EV Chargers in @@ -68,7 +65,6 @@ def __init__( # pylint: disable=too-many-arguments ValueError: If any of the specified component_ids are not EV chargers or are unknown to the component graph. """ - self.channel_registry = channel_registry self.resampler_subscription_sender = resampler_subscription_sender self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender @@ -96,7 +92,6 @@ def __init__( # pylint: disable=too-many-arguments self.namespace: str = f"ev-charger-pool-{uuid.uuid4()}" self.formula_pool = FormulaPool( self.namespace, - self.channel_registry, self.resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/formulas/_formula_pool.py b/src/frequenz/sdk/timeseries/formulas/_formula_pool.py index d0887c7e2..107ab449a 100644 --- a/src/frequenz/sdk/timeseries/formulas/_formula_pool.py +++ b/src/frequenz/sdk/timeseries/formulas/_formula_pool.py @@ -15,7 +15,6 @@ ResampledStreamFetcher, ) -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest from ._formula import Formula from ._formula_3_phase import Formula3Phase @@ -44,20 +43,16 @@ class FormulaPool: def __init__( self, namespace: str, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Create a new instance. Args: namespace: namespace to use with the data pipeline. - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. """ self._namespace: str = namespace - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) @@ -276,7 +271,6 @@ def _telemetry_fetcher(self, metric: Metric) -> ResampledStreamFetcher: """ return ResampledStreamFetcher( self._namespace, - self._channel_registry, self._resampler_subscription_sender, metric, ) diff --git a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py index 0f2165d4c..f72860fe2 100644 --- a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py +++ b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py @@ -9,7 +9,6 @@ from frequenz.sdk.timeseries import Sample -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest, Metric from ...microgrid._old_component_data import TransitionalMetric @@ -20,7 +19,6 @@ class ResampledStreamFetcher: def __init__( self, namespace: str, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], metric: Metric | TransitionalMetric, ): @@ -29,14 +27,11 @@ def __init__( Args: namespace: The unique namespace to allow reuse of streams in the data pipeline. - channel_registry: The channel registry instance shared with the resampling - and the data sourcing actors. resampler_subscription_sender: A sender to send metric requests to the resampling actor. metric: The metric to fetch for all components in this formula. """ self._namespace: str = namespace - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) diff --git a/src/frequenz/sdk/timeseries/grid.py b/src/frequenz/sdk/timeseries/grid.py index b79a690f9..f9c788b10 100644 --- a/src/frequenz/sdk/timeseries/grid.py +++ b/src/frequenz/sdk/timeseries/grid.py @@ -16,7 +16,6 @@ from frequenz.client.microgrid.component import GridConnectionPoint from frequenz.quantities import Current, Power, ReactivePower -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from ._fuse import Fuse @@ -155,14 +154,11 @@ async def stop(self) -> None: def initialize( - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the grid connection. Args: - channel_registry: The channel registry instance shared with the - resampling actor. resampler_subscription_sender: The sender for sending metric requests to the resampling actor. @@ -204,7 +200,6 @@ def initialize( namespace = f"grid-{uuid.uuid4()}" formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py index 00df8f776..220581125 100644 --- a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py +++ b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py @@ -12,7 +12,6 @@ from frequenz.sdk.microgrid import connection_manager -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest from ..formulas._formula import Formula from ..formulas._formula_pool import FormulaPool @@ -57,7 +56,6 @@ class LogicalMeter: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Create a `LogicalMeter` instance. @@ -68,12 +66,9 @@ def __init__( for creating `LogicalMeter` instances. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. """ - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) @@ -83,7 +78,6 @@ def __init__( self._namespace: str = f"logical-meter-{uuid.uuid4()}" self._formula_pool: FormulaPool = FormulaPool( self._namespace, - self._channel_registry, self._resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/producer.py b/src/frequenz/sdk/timeseries/producer.py index 0c82ae16c..0aa89bd2f 100644 --- a/src/frequenz/sdk/timeseries/producer.py +++ b/src/frequenz/sdk/timeseries/producer.py @@ -8,7 +8,6 @@ from frequenz.channels import Sender from frequenz.quantities import Power -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from .formulas._formula import Formula @@ -57,19 +56,16 @@ class Producer: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the producer formula generator. Args: - channel_registry: The channel registry to use for the producer. resampler_subscription_sender: The sender to use for resampler subscriptions. """ namespace = f"producer-{uuid.uuid4()}" self._formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py index e893f9a36..d8b3ed641 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py @@ -12,7 +12,7 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import SolarInverter -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import ComponentPoolStatus, Result @@ -38,7 +38,6 @@ class PVPoolReferenceStore: def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -49,8 +48,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize this instance. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. status_receiver: A receiver that streams the status of the PV inverters in @@ -69,7 +66,6 @@ def __init__( # pylint: disable=too-many-arguments ValueError: If any of the provided component_ids are not PV inverters or are unknown to the component graph. """ - self.channel_registry = channel_registry self.resampler_subscription_sender = resampler_subscription_sender self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender @@ -98,7 +94,6 @@ def __init__( # pylint: disable=too-many-arguments self.namespace: str = f"pv-pool-{uuid.uuid4()}" self.formula_pool = FormulaPool( self.namespace, - self.channel_registry, self.resampler_subscription_sender, ) self.bounds_channel: Broadcast[SystemBounds] = Broadcast( diff --git a/tests/actor/test_channel_registry.py b/tests/actor/test_channel_registry.py deleted file mode 100644 index 2966a9ad3..000000000 --- a/tests/actor/test_channel_registry.py +++ /dev/null @@ -1,69 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for the ChannelRegistry.""" - -import pytest -from frequenz.channels import ReceiverError, SenderError - -from frequenz.sdk._internal._channels import ChannelRegistry - - -async def test_channel_registry() -> None: - """Tests for ChannelRegistry, with string as key type.""" - reg = ChannelRegistry(name="test-registry") - - assert "20-hello" not in reg - assert "21-hello" not in reg - - chan20 = reg.get_or_create(int, "20-hello") - assert "20-hello" in reg - assert reg.message_type("20-hello") == int - - with pytest.raises(ValueError): - reg.get_or_create(str, "20-hello") - - sender20 = chan20.new_sender() - receiver20 = chan20.new_receiver() - - assert "21-hello" not in reg - - chan21 = reg.get_or_create(int, "21-hello") - assert "21-hello" in reg - assert reg.message_type("21-hello") == int - - sender21 = chan21.new_sender() - receiver21 = chan21.new_receiver() - - await sender20.send(30) - await sender21.send(31) - - rcvd = await receiver21.receive() - assert rcvd == 31 - - rcvd = await receiver20.receive() - assert rcvd == 30 - - await reg.close_and_remove("20-hello") - assert "20-hello" not in reg - assert chan20._closed # pylint: disable=protected-access - with pytest.raises(SenderError): - await sender20.send(30) - with pytest.raises(ReceiverError): - await receiver20.receive() - with pytest.raises(KeyError): - reg.message_type("20-hello") - with pytest.raises(KeyError): - await reg.close_and_remove("20-hello") - - await reg.close_and_remove("21-hello") - assert "21-hello" not in reg - assert chan21._closed # pylint: disable=protected-access - with pytest.raises(SenderError): - await sender21.send(30) - with pytest.raises(ReceiverError): - await receiver21.receive() - with pytest.raises(KeyError): - reg.message_type("21-hello") - with pytest.raises(KeyError): - await reg.close_and_remove("21-hello") diff --git a/tests/timeseries/_formulas/utils.py b/tests/timeseries/_formulas/utils.py index 471d7e9c7..ee0ae1b22 100644 --- a/tests/timeseries/_formulas/utils.py +++ b/tests/timeseries/_formulas/utils.py @@ -31,7 +31,6 @@ async def get_resampled_stream( # pylint: disable=protected-access builder = ResampledStreamFetcher( namespace=namespace, - channel_registry=_data_pipeline._get()._channel_registry, resampler_subscription_sender=_data_pipeline._get()._resampling_request_sender(), metric=metric, ) From 1224e5a3efacba6902d65176080e97034ee2e35a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Fri, 6 Mar 2026 15:38:38 +0100 Subject: [PATCH 09/16] Update release notes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- RELEASE_NOTES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d36ca7298..8893779fb 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,3 +19,5 @@ - This fixes a rare power distributor bug where some battery inverters becoming unreachable because of network outages would lead to excess power values getting set. This is fixed by measuring the power of the unreachable inverters through their fallback meters and excluding that power from what is distributed to the other inverters. - Fixed stopping formulas: It will now also stop the evaluator and sub-formulas correctly. + +- Removed ChannelRegistry. Streams are not being set up using one-shot channels and owned by the data source. From cd7229d4136c42362a128f9388ff3090b34daec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 14:39:51 +0100 Subject: [PATCH 10/16] Keep a reference to Pipe objects to prevent it from being garbage collected MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/timeseries/_grid_frequency.py | 5 ++++- src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py | 6 ++++-- .../sdk/timeseries/ev_charger_pool/_ev_charger_pool.py | 6 ++++-- src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py | 6 ++++-- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 5a35ee494..58b072a10 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -69,6 +69,8 @@ def __init__( # Sadly needed for testing self._task: None | asyncio.Task[None] = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[Sample[Quantity]] | None = None @property def source(self) -> Component: @@ -110,4 +112,5 @@ async def _send_request(self, forwarding_sender: Sender[Sample[Quantity]]) -> No telem_receiver: Receiver[Sample[Quantity]] = ( await self._telem_stream_receiver.receive() ) - await Pipe(telem_receiver, forwarding_sender).start() + self._pipe = Pipe(telem_receiver, forwarding_sender) + await self._pipe.start() diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 65c6e4d02..f2da953a4 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -78,6 +78,8 @@ def __init__( self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -373,8 +375,8 @@ async def _send_request( report_receiver: Receiver[_Report] = ( await self._report_stream_receiver.receive() ) - pipe = Pipe(report_receiver, forwarding_sender) - await pipe.start() + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index d8bc9f5c7..12bd79814 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -65,6 +65,8 @@ def __init__( # pylint: disable=too-many-arguments self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -213,8 +215,8 @@ async def _send_request( report_receiver: Receiver[_Report] = ( await self._report_stream_receiver.receive() ) - pipe = Pipe(report_receiver, forwarding_sender) - await pipe.start() + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index dac444d30..ef3598041 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -60,6 +60,8 @@ def __init__( # pylint: disable=too-many-arguments self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" self._priority = priority self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -185,8 +187,8 @@ async def _send_request( report_receiver: Receiver[_Report] = ( await self._report_stream_receiver.receive() ) - pipe = Pipe(report_receiver, forwarding_sender) - await pipe.start() + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: From 5f26edb852ae40eafd32d8ff7400f96312c95381 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 14:38:36 +0100 Subject: [PATCH 11/16] Deprecate GridFrequency.new_receiver in favor of .subscribe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../sdk/timeseries/_grid_frequency.py | 42 ++++++-- tests/timeseries/test_frequency_streaming.py | 98 ++++++++++++------- 2 files changed, 95 insertions(+), 45 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 58b072a10..7e504a227 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -81,9 +81,27 @@ def source(self) -> Component: """ return self._source_component + @staticmethod + def _map_frequency_samples( + receiver: Receiver[Sample[Quantity]], + ) -> Receiver[Sample[Frequency]]: + """Handle NaN values and map sample type to Frequency.""" + return receiver.map( + lambda sample: ( + Sample[Frequency](sample.timestamp, None) + if sample.value is None or sample.value.isnan() + else Sample( + sample.timestamp, Frequency.from_hertz(sample.value.base_value) + ) + ) + ) + def new_receiver(self) -> Receiver[Sample[Frequency]]: """Create a receiver for grid frequency. + Deprecated: + Use subscribe() instead. + Returns: A receiver that will receive grid frequency samples. """ @@ -93,15 +111,23 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: self._send_request(self._forwarding_channel.new_sender()) ) - return self._forwarding_channel.new_receiver().map( - lambda sample: ( - Sample[Frequency](sample.timestamp, None) - if sample.value is None or sample.value.isnan() - else Sample( - sample.timestamp, Frequency.from_hertz(sample.value.base_value) - ) - ) + return self._map_frequency_samples(self._forwarding_channel.new_receiver()) + + async def subscribe(self) -> Receiver[Sample[Frequency]]: + """Create a receiver for grid frequency.""" + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + component_metric_request = ComponentMetricRequest( + "grid-frequency", + self._source_component.id, + Metric.AC_FREQUENCY, + None, + telem_stream_sender, ) + await self._request_sender.send(component_metric_request) + receiver = await telem_stream_receiver.receive() + return self._map_frequency_samples(receiver) async def _send_request(self, forwarding_sender: Sender[Sample[Quantity]]) -> None: """Send the request for grid frequency.""" diff --git a/tests/timeseries/test_frequency_streaming.py b/tests/timeseries/test_frequency_streaming.py index 25077c3b9..ef47f1ce8 100644 --- a/tests/timeseries/test_frequency_streaming.py +++ b/tests/timeseries/test_frequency_streaming.py @@ -7,6 +7,7 @@ import asyncio from datetime import datetime, timezone +import pytest from frequenz.quantities import Frequency from pytest_mock import MockerFixture @@ -19,7 +20,8 @@ # pylint: disable=protected-access -async def test_grid_frequency_none(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("use_subscribe", [True, False]) +async def test_grid_frequency_none(mocker: MockerFixture, use_subscribe: bool) -> None: """Test the grid frequency formula.""" mockgrid = MockMicrogrid(grid_meter=True) mockgrid.add_batteries(2) @@ -27,13 +29,16 @@ async def test_grid_frequency_none(mocker: MockerFixture) -> None: await mockgrid.start(mocker) grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - - assert grid_freq._task is not None - # We have to wait for the metric request to be sent - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) await mockgrid.mock_client.send( component_data_wrapper.MeterDataWrapper( @@ -47,7 +52,8 @@ async def test_grid_frequency_none(mocker: MockerFixture) -> None: await mockgrid.cleanup() -async def test_grid_frequency_1(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("use_subscribe", [True, False]) +async def test_grid_frequency_1(mocker: MockerFixture, use_subscribe: bool) -> None: """Test the grid frequency formula.""" mockgrid = MockMicrogrid(grid_meter=True, mocker=mocker) mockgrid.add_batteries(2) @@ -55,13 +61,16 @@ async def test_grid_frequency_1(mocker: MockerFixture) -> None: async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - - assert grid_freq._task is not None - # We have to wait for the metric request to be sent - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] grid_meter_data = [] @@ -82,8 +91,9 @@ async def test_grid_frequency_1(mocker: MockerFixture) -> None: assert equal_float_lists(results, grid_meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_no_grid_meter_no_consumer_meter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without a grid side meter.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -94,12 +104,16 @@ async def test_grid_frequency_no_grid_meter_no_consumer_meter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = [] @@ -120,8 +134,9 @@ async def test_grid_frequency_no_grid_meter_no_consumer_meter( assert equal_float_lists(results, meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_no_grid_meter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without a grid side meter.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -131,12 +146,16 @@ async def test_grid_frequency_no_grid_meter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = [] @@ -157,8 +176,9 @@ async def test_grid_frequency_no_grid_meter( assert equal_float_lists(results, meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_only_inverter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without any meter but only inverters.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -166,12 +186,16 @@ async def test_grid_frequency_only_inverter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = [] From 264129564943f182186d6c9043f709039c0139e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Tue, 10 Mar 2026 12:00:11 +0100 Subject: [PATCH 12/16] Update to channels changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../timeseries/benchmark_datasourcing.py | 8 ++++---- .../_data_sourcing/microgrid_api_source.py | 5 +++-- src/frequenz/sdk/microgrid/_resampling.py | 8 ++++---- .../sdk/timeseries/_grid_frequency.py | 14 ++++++------- .../sdk/timeseries/_voltage_streamer.py | 8 ++++---- .../timeseries/battery_pool/_battery_pool.py | 8 ++++---- .../ev_charger_pool/_ev_charger_pool.py | 8 ++++---- .../formulas/_resampled_stream_fetcher.py | 8 ++++---- .../sdk/timeseries/pv_pool/_pv_pool.py | 8 ++++---- tests/actor/test_resampling.py | 20 +++++++++---------- tests/microgrid/test_data_sourcing.py | 9 ++++----- 11 files changed, 52 insertions(+), 52 deletions(-) diff --git a/benchmarks/timeseries/benchmark_datasourcing.py b/benchmarks/timeseries/benchmark_datasourcing.py index 005424d9c..b3f9b5a22 100644 --- a/benchmarks/timeseries/benchmark_datasourcing.py +++ b/benchmarks/timeseries/benchmark_datasourcing.py @@ -16,7 +16,7 @@ from time import perf_counter from typing import Any -from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, ReceiverStoppedError from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity @@ -105,9 +105,9 @@ async def consume(channel: Receiver[Any]) -> None: for evc_id in mock_grid.evc_ids: for component_metric_id in COMPONENT_METRIC_IDS: - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() request = ComponentMetricRequest( namespace="current_phase_requests", component_id=evc_id, diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 03f89474f..53eb94d4b 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -8,7 +8,8 @@ from collections.abc import Callable from typing import Any -from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.channels import Broadcast, Receiver +from frequenz.channels._broadcast import BroadcastSender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ComponentCategory from frequenz.client.microgrid.metrics import Metric @@ -394,7 +395,7 @@ async def _get_metric_senders( self, category: ComponentCategory | int, requests: dict[Metric | TransitionalMetric, list[ComponentMetricRequest]], - ) -> list[tuple[Callable[[Any], float], list[Sender[Sample[Quantity]]]]]: + ) -> list[tuple[Callable[[Any], float], list[BroadcastSender[Sample[Quantity]]]]]: """Get channel senders from the channel registry for each requested metric. Args: diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index 8457f1883..32851871c 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -7,7 +7,7 @@ import asyncio import logging -from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender from frequenz.quantities import Quantity from .._internal._asyncio import cancel_and_await @@ -70,9 +70,9 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None: # Derive a new ComponentMetricRequest from the given one to # receive the telemetry stream from the data sourcing actor. - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() own_request = ComponentMetricRequest( namespace=request.namespace + ":Source", component_id=request.component_id, diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 7e504a227..d30924474 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -8,7 +8,7 @@ import asyncio import logging -from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender from frequenz.channels.experimental import Pipe from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric @@ -49,9 +49,9 @@ def __init__( self._source_component: Component = source # Microgrid API source will send the stream through a oneshot channel - telem_stream_sender, self._telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, self._telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() self._component_metric_request = ComponentMetricRequest( "grid-frequency", @@ -115,9 +115,9 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: async def subscribe(self) -> Receiver[Sample[Frequency]]: """Create a receiver for grid frequency.""" - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() component_metric_request = ComponentMetricRequest( "grid-frequency", self._source_component.id, diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index d784dce25..440b07186 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -13,7 +13,7 @@ import logging from typing import TYPE_CHECKING -from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity, Voltage @@ -144,9 +144,9 @@ async def _send_request(self) -> None: ) phases_rx: list[Receiver[Sample[Quantity]]] = [] for metric in metrics: - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() req = ComponentMetricRequest( namespace=self._namespace, component_id=self._source_component.id, diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index f2da953a4..53d8846cc 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -12,7 +12,7 @@ import uuid from collections import abc -from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Energy, Percentage, Power, Temperature @@ -335,9 +335,9 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - report_stream_sender, self._report_stream_receiver = make_oneshot( - Receiver[_Report] # type: ignore[type-abstract] - ) + report_stream_sender, self._report_stream_receiver = OneshotChannel[ + Receiver[_Report] + ]() request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 12bd79814..4350c40fd 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -8,7 +8,7 @@ import uuid from collections import abc -from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Current, Power @@ -177,9 +177,9 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - report_stream_sender, self._report_stream_receiver = make_oneshot( - Receiver[_Report] # type: ignore[type-abstract] - ) + report_stream_sender, self._report_stream_receiver = OneshotChannel[ + Receiver[_Report] + ]() request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, diff --git a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py index f72860fe2..dd8875b00 100644 --- a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py +++ b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py @@ -3,7 +3,7 @@ """Fetches telemetry streams for components.""" -from frequenz.channels import Receiver, Sender, make_oneshot +from frequenz.channels import OneshotChannel, Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Quantity @@ -49,9 +49,9 @@ async def fetch_stream( Returns: A receiver to stream resampled data for the given component id. """ - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() request = ComponentMetricRequest( self._namespace, diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index ef3598041..cba298e49 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -7,7 +7,7 @@ import uuid from collections import abc -from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -149,9 +149,9 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - report_stream_sender, self._report_stream_receiver = make_oneshot( - Receiver[_Report] # type: ignore[type-abstract] - ) + report_stream_sender, self._report_stream_receiver = OneshotChannel[ + Receiver[_Report] + ]() request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index fc7a6b6f5..3a6fc4203 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -8,7 +8,7 @@ import async_solipsism import pytest import time_machine -from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity @@ -105,9 +105,9 @@ async def test_single_request( max_data_age_in_periods=2, ), ) as resampling_actor: - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), @@ -156,9 +156,9 @@ async def test_duplicate_request( max_data_age_in_periods=2, ), ) as resampling_actor: - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), @@ -210,9 +210,9 @@ async def test_resubscribe(fake_time: time_machine.Coordinates) -> None: ) as resampling_actor: async def send_metric_request() -> Receiver[Receiver[Sample[Quantity]]]: - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py index 6ec3f8e0c..e66bd2496 100644 --- a/tests/microgrid/test_data_sourcing.py +++ b/tests/microgrid/test_data_sourcing.py @@ -11,7 +11,7 @@ import pytest import pytest_mock -from frequenz.channels import Broadcast, Receiver, make_oneshot +from frequenz.channels import Broadcast, OneshotChannel, Receiver from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ( @@ -105,10 +105,9 @@ async def test_data_sourcing_actor( # pylint: disable=too-many-locals req_sender = req_chan.new_sender() async with DataSourcingActor(req_chan.new_receiver()): - telem_stream_sender, telem_stream_receiver = make_oneshot( - Receiver[Sample[Quantity]] # type: ignore[type-abstract] - ) - + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + Receiver[Sample[Quantity]] + ]() component_metric_request = ComponentMetricRequest( "test-namespace", component_id, From 1524583d0b62372a4b84c6e8a2270c16897db2de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Tue, 10 Mar 2026 12:08:37 +0100 Subject: [PATCH 13/16] Address PR comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/_internal/_channels.py | 19 +------------------ .../_data_sourcing/microgrid_api_source.py | 10 +++++----- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 4b11cbba7..ad5e02a13 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -4,11 +4,10 @@ """General purpose classes for use with channels.""" import abc -import dataclasses import logging import typing -from frequenz.channels import Broadcast, Receiver +from frequenz.channels import Receiver _logger = logging.getLogger(__name__) @@ -60,19 +59,3 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: A receiver instance. """ return self._mapping_function(self._fetcher.new_receiver(limit=limit)) - - -@dataclasses.dataclass(frozen=True) -class _Entry: - """An entry in a channel registry.""" - - message_type: type - """The type of the message that is sent through the channel in this entry.""" - - # We use object instead of Any to minimize the chances of hindering type checking. - # If for some reason the channel is not casted to the proper underlaying type, when - # using object at least accessing any member that's not part of the object base - # class will yield a type error, while if we used Any, it would not and the issue - # would be much harder to find. - channel: Broadcast[object] - """The channel in this entry.""" diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 53eb94d4b..8528435b0 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -173,8 +173,8 @@ def __init__(self) -> None: ComponentId, dict[Metric | TransitionalMetric, list[ComponentMetricRequest]] ] = {} - self._channel_lookup: dict[str, Broadcast[Sample[Quantity]]] = {} - """ Channel cache for reuse (map channel name to channel).""" + self._channels: dict[str, Broadcast[Sample[Quantity]]] = {} + """Metric data channels by channel name, to enable reuse.""" async def _get_component_category( self, comp_id: ComponentId @@ -414,13 +414,13 @@ async def _get_metric_senders( for request in req_list: channel_name = request.get_channel_name() # Create missing channels and inform the requesting side via oneshot - if channel_name not in self._channel_lookup: + if channel_name not in self._channels: telem_stream: Broadcast[Sample[Quantity]] = Broadcast( name=channel_name ) - self._channel_lookup[channel_name] = telem_stream + self._channels[channel_name] = telem_stream await request.telem_stream_sender.send(telem_stream.new_receiver()) - senders.append(self._channel_lookup[channel_name].new_sender()) + senders.append(self._channels[channel_name].new_sender()) all_senders.append((extraction_method, senders)) return all_senders From 314473ba749cc021b5660677bad69d76a544244c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Tue, 10 Mar 2026 13:43:48 +0100 Subject: [PATCH 14/16] Split resampling actor subscribe-logic for better readability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/microgrid/_resampling.py | 57 +++++++++++++---------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index 32851871c..fd0716f4a 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -8,6 +8,7 @@ import logging from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.quantities import Quantity from .._internal._asyncio import cancel_and_await @@ -52,7 +53,29 @@ def __init__( # pylint: disable=too-many-arguments resampling_request_receiver ) self._resampler: Resampler = Resampler(config) - self._upstream_channels: dict[str, Broadcast[Sample[Quantity]]] = {} + self._data_sink_channels: dict[str, Broadcast[Sample[Quantity]]] = {} + + async def _subscribe_to_data_source( + self, request: ComponentMetricRequest + ) -> BroadcastReceiver[Sample[Quantity]]: + """Subscribe to the data source using a new request. + + Args: + request: The original request from the resampler. + + Returns: + The metric data receiver. + """ + sender, receiver = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]() + data_source_request = ComponentMetricRequest( + namespace=request.namespace + ":Source", + component_id=request.component_id, + metric=request.metric, + start_time=request.start_time, + telem_stream_sender=sender, + ) + await self._data_sourcing_request_sender.send(data_source_request) + return await receiver.receive() async def _subscribe(self, request: ComponentMetricRequest) -> None: """Request data for a component metric. @@ -64,35 +87,21 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None: # If we are already handling this request, answer the request by sending a # new receiver from the existing channel. - if upstream_channel := self._upstream_channels.get(request_channel_name): - await request.telem_stream_sender.send(upstream_channel.new_receiver()) + if data_sink_channel := self._data_sink_channels.get(request_channel_name): + await request.telem_stream_sender.send(data_sink_channel.new_receiver()) return - # Derive a new ComponentMetricRequest from the given one to - # receive the telemetry stream from the data sourcing actor. - telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] - ]() - own_request = ComponentMetricRequest( - namespace=request.namespace + ":Source", - component_id=request.component_id, - metric=request.metric, - start_time=request.start_time, - telem_stream_sender=telem_stream_sender, - ) - await self._data_sourcing_request_sender.send(own_request) - telem_stream = await telem_stream_receiver.receive() + # Set up data source and sink channels + data_source = await self._subscribe_to_data_source(request) - # Create a new channel based on the original ComponentMetricRequest - # to act as our data sink. - upstream_channel = Broadcast(name=request_channel_name, resend_latest=True) - await request.telem_stream_sender.send(upstream_channel.new_receiver()) - self._upstream_channels[request_channel_name] = upstream_channel + data_sink_channel = Broadcast(name=request_channel_name, resend_latest=True) + await request.telem_stream_sender.send(data_sink_channel.new_receiver()) + self._data_sink_channels[request_channel_name] = data_sink_channel self._resampler.add_timeseries( name=request_channel_name, - source=telem_stream, - sink=upstream_channel.new_sender().send, + source=data_source, + sink=data_sink_channel.new_sender().send, ) async def _process_resampling_requests(self) -> None: From f484278e123099a3b78b90f794b273eebbbf31b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Tue, 10 Mar 2026 13:44:41 +0100 Subject: [PATCH 15/16] Use more precise channel sender and receiver types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- benchmarks/timeseries/benchmark_datasourcing.py | 3 ++- .../microgrid/_data_sourcing/_component_metric_request.py | 5 +++-- .../sdk/microgrid/_power_managing/_base_classes.py | 5 +++-- src/frequenz/sdk/timeseries/_grid_frequency.py | 5 +++-- src/frequenz/sdk/timeseries/_voltage_streamer.py | 3 ++- src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py | 3 ++- .../sdk/timeseries/ev_charger_pool/_ev_charger_pool.py | 3 ++- .../sdk/timeseries/formulas/_resampled_stream_fetcher.py | 3 ++- src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py | 3 ++- tests/actor/test_resampling.py | 7 ++++--- tests/microgrid/test_data_sourcing.py | 5 +++-- 11 files changed, 28 insertions(+), 17 deletions(-) diff --git a/benchmarks/timeseries/benchmark_datasourcing.py b/benchmarks/timeseries/benchmark_datasourcing.py index b3f9b5a22..957ba0408 100644 --- a/benchmarks/timeseries/benchmark_datasourcing.py +++ b/benchmarks/timeseries/benchmark_datasourcing.py @@ -17,6 +17,7 @@ from typing import Any from frequenz.channels import Broadcast, OneshotChannel, Receiver, ReceiverStoppedError +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity @@ -106,7 +107,7 @@ async def consume(channel: Receiver[Any]) -> None: for evc_id in mock_grid.evc_ids: for component_metric_id in COMPONENT_METRIC_IDS: telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() request = ComponentMetricRequest( namespace="current_phase_requests", diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py index cb2b18313..5087c04e5 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py @@ -6,7 +6,8 @@ from dataclasses import dataclass from datetime import datetime -from frequenz.channels import Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels._oneshot import OneshotSender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity @@ -55,7 +56,7 @@ class ComponentMetricRequest: If None, only live data is streamed. """ - telem_stream_sender: Sender[Receiver[Sample[Quantity]]] + telem_stream_sender: OneshotSender[BroadcastReceiver[Sample[Quantity]]] """Sender of a oneshot channel used to send the data stream back to the requester.""" def get_channel_name(self) -> str: diff --git a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py index 40616de63..f84b0b9ce 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py @@ -10,7 +10,8 @@ import enum import typing -from frequenz.channels import Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels._oneshot import OneshotSender from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -34,7 +35,7 @@ class ReportRequest: priority: int """The priority of the actor.""" - report_stream_sender: Sender[Receiver[_Report]] + report_stream_sender: OneshotSender[BroadcastReceiver[_Report]] """Oneshot sender to transmit the report receiver.""" def get_channel_name(self) -> str: diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index d30924474..585f68d4a 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -9,6 +9,7 @@ import logging from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.channels.experimental import Pipe from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric @@ -50,7 +51,7 @@ def __init__( # Microgrid API source will send the stream through a oneshot channel telem_stream_sender, self._telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() self._component_metric_request = ComponentMetricRequest( @@ -116,7 +117,7 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: async def subscribe(self) -> Receiver[Sample[Frequency]]: """Create a receiver for grid frequency.""" telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() component_metric_request = ComponentMetricRequest( "grid-frequency", diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index 440b07186..075e9cc4b 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -14,6 +14,7 @@ from typing import TYPE_CHECKING from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity, Voltage @@ -145,7 +146,7 @@ async def _send_request(self) -> None: phases_rx: list[Receiver[Sample[Quantity]]] = [] for metric in metrics: telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() req = ComponentMetricRequest( namespace=self._namespace, diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 53d8846cc..bf3c6f36c 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -13,6 +13,7 @@ from collections import abc from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Energy, Percentage, Power, Temperature @@ -336,7 +337,7 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: A receiver that will stream power status reports for the pool's priority. """ report_stream_sender, self._report_stream_receiver = OneshotChannel[ - Receiver[_Report] + BroadcastReceiver[_Report] ]() request = _power_managing.ReportRequest( source_id=self._source_id, diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 4350c40fd..6653d1845 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -9,6 +9,7 @@ from collections import abc from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Current, Power @@ -178,7 +179,7 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: A receiver that will stream power status reports for the pool's priority. """ report_stream_sender, self._report_stream_receiver = OneshotChannel[ - Receiver[_Report] + BroadcastReceiver[_Report] ]() request = _power_managing.ReportRequest( source_id=self._source_id, diff --git a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py index dd8875b00..834f62562 100644 --- a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py +++ b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py @@ -4,6 +4,7 @@ """Fetches telemetry streams for components.""" from frequenz.channels import OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Quantity @@ -50,7 +51,7 @@ async def fetch_stream( A receiver to stream resampled data for the given component id. """ telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() request = ComponentMetricRequest( diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index cba298e49..8d7b54ec7 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -8,6 +8,7 @@ from collections import abc from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -150,7 +151,7 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]: A receiver that will stream power status reports for the pool's priority. """ report_stream_sender, self._report_stream_receiver = OneshotChannel[ - Receiver[_Report] + BroadcastReceiver[_Report] ]() request = _power_managing.ReportRequest( source_id=self._source_id, diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 3a6fc4203..41d94a1e6 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -9,6 +9,7 @@ import pytest import time_machine from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity @@ -106,7 +107,7 @@ async def test_single_request( ), ) as resampling_actor: telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() subs_req = ComponentMetricRequest( namespace="Resampling", @@ -157,7 +158,7 @@ async def test_duplicate_request( ), ) as resampling_actor: telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() subs_req = ComponentMetricRequest( namespace="Resampling", @@ -211,7 +212,7 @@ async def test_resubscribe(fake_time: time_machine.Coordinates) -> None: async def send_metric_request() -> Receiver[Receiver[Sample[Quantity]]]: telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() subs_req = ComponentMetricRequest( namespace="Resampling", diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py index e66bd2496..c0627259d 100644 --- a/tests/microgrid/test_data_sourcing.py +++ b/tests/microgrid/test_data_sourcing.py @@ -11,7 +11,8 @@ import pytest import pytest_mock -from frequenz.channels import Broadcast, OneshotChannel, Receiver +from frequenz.channels import Broadcast, OneshotChannel +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ( @@ -106,7 +107,7 @@ async def test_data_sourcing_actor( # pylint: disable=too-many-locals async with DataSourcingActor(req_chan.new_receiver()): telem_stream_sender, telem_stream_receiver = OneshotChannel[ - Receiver[Sample[Quantity]] + BroadcastReceiver[Sample[Quantity]] ]() component_metric_request = ComponentMetricRequest( "test-namespace", From 4eb6278cc12ac860b8aaf53729c32e327648d973 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 16 Mar 2026 16:04:55 +0100 Subject: [PATCH 16/16] Update dependencies after rebase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 39db0b5ed..5aaef4de5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "frequenz-client-microgrid >= 0.18.1, < 0.19.0", "frequenz-microgrid-component-graph >= 0.3.4, < 0.4", "frequenz-client-common >= 0.3.6, < 0.4.0", - "frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@oneshot", + "frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@auto-closing-broadcast", "frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0", "numpy >= 2.1.0, < 3", "typing_extensions >= 4.14.1, < 5",