diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d36ca7298..8893779fb 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,3 +19,5 @@ - This fixes a rare power distributor bug where some battery inverters becoming unreachable because of network outages would lead to excess power values getting set. This is fixed by measuring the power of the unreachable inverters through their fallback meters and excluding that power from what is distributed to the other inverters. - Fixed stopping formulas: It will now also stop the evaluator and sub-formulas correctly. + +- Removed ChannelRegistry. Streams are not being set up using one-shot channels and owned by the data source. diff --git a/benchmarks/timeseries/benchmark_datasourcing.py b/benchmarks/timeseries/benchmark_datasourcing.py index dd430aa42..957ba0408 100644 --- a/benchmarks/timeseries/benchmark_datasourcing.py +++ b/benchmarks/timeseries/benchmark_datasourcing.py @@ -16,15 +16,17 @@ from time import perf_counter from typing import Any -from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError +from frequenz.channels import Broadcast, OneshotChannel, Receiver, ReceiverStoppedError +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.microgrid.metrics import Metric +from frequenz.quantities import Quantity from frequenz.sdk import microgrid -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ( ComponentMetricRequest, DataSourcingActor, ) +from frequenz.sdk.timeseries import Sample try: from tests.timeseries.mock_microgrid import MockMicrogrid @@ -80,7 +82,6 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals name="DataSourcingActor Request Channel" ) - channel_registry = ChannelRegistry(name="Microgrid Channel Registry") request_receiver = request_channel.new_receiver( name="datasourcing-benchmark", limit=(num_ev_chargers * len(COMPONENT_METRIC_IDS)), @@ -105,18 +106,21 @@ async def consume(channel: Receiver[Any]) -> None: for evc_id in mock_grid.evc_ids: for component_metric_id in COMPONENT_METRIC_IDS: + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() request = ComponentMetricRequest( - "current_phase_requests", evc_id, component_metric_id, None + namespace="current_phase_requests", + component_id=evc_id, + metric=component_metric_id, + start_time=None, + telem_stream_sender=telem_stream_sender, ) - - recv_channel = channel_registry.get_or_create( - ComponentMetricRequest, request.get_channel_name() - ).new_receiver() - await request_sender.send(request) - consume_tasks.append(asyncio.create_task(consume(recv_channel))) + stream_receiver = await telem_stream_receiver.receive() + consume_tasks.append(asyncio.create_task(consume(stream_receiver))) - async with DataSourcingActor(request_receiver, channel_registry): + async with DataSourcingActor(request_receiver): await asyncio.gather(*consume_tasks) time_taken = perf_counter() - start_time diff --git a/pyproject.toml b/pyproject.toml index 4b7835460..5aaef4de5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "frequenz-client-microgrid >= 0.18.1, < 0.19.0", "frequenz-microgrid-component-graph >= 0.3.4, < 0.4", "frequenz-client-common >= 0.3.6, < 0.4.0", - "frequenz-channels >= 1.6.1, < 2.0.0", + "frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@auto-closing-broadcast", "frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0", "numpy >= 2.1.0, < 3", "typing_extensions >= 4.14.1, < 5", diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 74b2f6929..ad5e02a13 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -4,12 +4,10 @@ """General purpose classes for use with channels.""" import abc -import dataclasses import logging -import traceback import typing -from frequenz.channels import Broadcast, Receiver +from frequenz.channels import Receiver _logger = logging.getLogger(__name__) @@ -61,141 +59,3 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: A receiver instance. """ return self._mapping_function(self._fetcher.new_receiver(limit=limit)) - - -class ChannelRegistry: - """Dynamically creates, own and provide access to broadcast channels. - - It can be used by actors to dynamically establish a communication channel - between each other. - - The registry is responsible for creating channels when they are first requested via - the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method. - - The registry also stores type information to make sure that the same channel is not - used for different message types. - - Since the registry owns the channels, it is also responsible for closing them when - they are no longer needed. There is no way to remove a channel without closing it. - - Note: - This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels. - """ - - def __init__(self, *, name: str) -> None: - """Initialize this registry. - - Args: - name: A name to identify the registry in the logs. This name is also used as - a prefix for the channel names. - """ - self._name = name - self._channels: dict[str, _Entry] = {} - - @property - def name(self) -> str: - """The name of this registry.""" - return self._name - - def message_type(self, key: str) -> type: - """Get the message type of the channel for the given key. - - Args: - key: The key to identify the channel. - - Returns: - The message type of the channel. - - Raises: - KeyError: If the channel does not exist. - """ - entry = self._channels.get(key) - if entry is None: - raise KeyError(f"No channel for key {key!r} exists.") - return entry.message_type - - def __contains__(self, key: str) -> bool: - """Check whether the channel for the given `key` exists.""" - return key in self._channels - - def get_or_create(self, message_type: type[T], key: str) -> Broadcast[T]: - """Get or create a channel for the given key. - - If a channel for the given key already exists, the message type of the existing - channel is checked against the requested message type. If they do not match, - a `ValueError` is raised. - - Note: - The types have to match exactly, it doesn't do a subtype check due to - technical limitations. In the future subtype checks might be supported. - - Args: - message_type: The type of the message that is sent through the channel. - key: The key to identify the channel. - - Returns: - The channel for the given key. - - Raises: - ValueError: If the channel exists and the message type does not match. - """ - if key not in self._channels: - if _logger.isEnabledFor(logging.DEBUG): - _logger.debug( - "Creating a new channel for key %r with type %s at:\n%s", - key, - message_type, - "".join(traceback.format_stack(limit=10)[:9]), - ) - self._channels[key] = _Entry( - message_type, Broadcast(name=f"{self._name}-{key}") - ) - - entry = self._channels[key] - if entry.message_type is not message_type: - error_message = ( - f"Type mismatch, a channel for key {key!r} exists and the requested " - f"message type {message_type} is not the same as the existing " - f"message type {entry.message_type}." - ) - if _logger.isEnabledFor(logging.DEBUG): - _logger.debug( - "%s at:\n%s", - error_message, - # We skip the last frame because it's this method, and limit the - # stack to 9 frames to avoid adding too much noise. - "".join(traceback.format_stack(limit=10)[:9]), - ) - raise ValueError(error_message) - - return typing.cast(Broadcast[T], entry.channel) - - async def close_and_remove(self, key: str) -> None: - """Remove the channel for the given key. - - Args: - key: The key to identify the channel. - - Raises: - KeyError: If the channel does not exist. - """ - entry = self._channels.pop(key, None) - if entry is None: - raise KeyError(f"No channel for key {key!r} exists.") - await entry.channel.close() - - -@dataclasses.dataclass(frozen=True) -class _Entry: - """An entry in a channel registry.""" - - message_type: type - """The type of the message that is sent through the channel in this entry.""" - - # We use object instead of Any to minimize the chances of hindering type checking. - # If for some reason the channel is not casted to the proper underlaying type, when - # using object at least accessing any member that's not part of the object base - # class will yield a type error, while if we used Any, it would not and the issue - # would be much harder to find. - channel: Broadcast[object] - """The channel in this entry.""" diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index aca47e4a9..4bf96f35d 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -20,7 +20,6 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter -from .._internal._channels import ChannelRegistry from ..actor._actor import Actor from ..timeseries import ResamplerConfig from ..timeseries._voltage_streamer import VoltageStreamer @@ -96,29 +95,22 @@ def __init__( """ self._resampler_config: ResamplerConfig = resampler_config - self._channel_registry: ChannelRegistry = ChannelRegistry( - name="Data Pipeline Registry" - ) - self._data_sourcing_actor: _ActorInfo | None = None self._resampling_actor: _ActorInfo | None = None self._battery_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=battery_power_manager_algorithm, default_power=DefaultPower.ZERO, component_class=Battery, ) self._ev_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA, default_power=DefaultPower.MAX, component_class=EvCharger, ) self._pv_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA, default_power=DefaultPower.MIN, @@ -158,7 +150,6 @@ def frequency(self) -> GridFrequency: if self._frequency_instance is None: self._frequency_instance = GridFrequency( self._data_sourcing_request_sender(), - self._channel_registry, ) return self._frequency_instance @@ -166,10 +157,7 @@ def frequency(self) -> GridFrequency: def voltage_per_phase(self) -> VoltageStreamer: """Return the per-phase voltage measuring point.""" if not self._voltage_instance: - self._voltage_instance = VoltageStreamer( - self._resampling_request_sender(), - self._channel_registry, - ) + self._voltage_instance = VoltageStreamer(self._resampling_request_sender()) return self._voltage_instance @@ -179,7 +167,6 @@ def logical_meter(self) -> LogicalMeter: if self._logical_meter is None: self._logical_meter = LogicalMeter( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._logical_meter @@ -190,7 +177,6 @@ def consumer(self) -> Consumer: if self._consumer is None: self._consumer = Consumer( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._consumer @@ -201,7 +187,6 @@ def producer(self) -> Producer: if self._producer is None: self._producer = Producer( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._producer @@ -213,7 +198,6 @@ def grid(self) -> Grid: if self._grid is None: initialize_grid( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) self._grid = get_grid() @@ -273,7 +257,6 @@ def new_ev_charger_pool( if ref_store_key not in self._ev_charger_pool_reference_stores: self._ev_charger_pool_reference_stores[ref_store_key] = ( EVChargerPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), status_receiver=self._ev_power_wrapper.status_channel.new_receiver( limit=1 @@ -346,7 +329,6 @@ def new_pv_pool( if ref_store_key not in self._pv_pool_reference_stores: self._pv_pool_reference_stores[ref_store_key] = PVPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), status_receiver=( self._pv_power_wrapper.status_channel.new_receiver(limit=1) @@ -422,7 +404,6 @@ def new_battery_pool( if ref_store_key not in self._battery_pool_reference_stores: self._battery_pool_reference_stores[ref_store_key] = ( BatteryPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), batteries_status_receiver=( self._battery_power_wrapper.status_channel.new_receiver(limit=1) @@ -461,7 +442,6 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]: ) actor = DataSourcingActor( request_receiver=channel.new_receiver(limit=_REQUEST_RECV_BUFFER_SIZE), - registry=self._channel_registry, ) self._data_sourcing_actor = _ActorInfo(actor, channel) self._data_sourcing_actor.actor.start() @@ -482,7 +462,6 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]: name="Data Pipeline: Component Metric Resampling Actor Request Channel" ) actor = ComponentMetricsResamplingActor( - channel_registry=self._channel_registry, data_sourcing_request_sender=self._data_sourcing_request_sender(), resampling_request_receiver=channel.new_receiver( limit=_REQUEST_RECV_BUFFER_SIZE, diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py index 2fb8f5aa3..5087c04e5 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py @@ -6,13 +6,18 @@ from dataclasses import dataclass from datetime import datetime +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels._oneshot import OneshotSender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric +from frequenz.quantities import Quantity from frequenz.sdk.microgrid._old_component_data import TransitionalMetric __all__ = ["ComponentMetricRequest", "Metric"] +from frequenz.sdk.timeseries import Sample + @dataclass class ComponentMetricRequest: @@ -51,6 +56,9 @@ class ComponentMetricRequest: If None, only live data is streamed. """ + telem_stream_sender: OneshotSender[BroadcastReceiver[Sample[Quantity]]] + """Sender of a oneshot channel used to send the data stream back to the requester.""" + def get_channel_name(self) -> str: """Construct the channel name based on the request parameters. diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py b/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py index 990e2b420..776768f29 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py @@ -5,7 +5,6 @@ from frequenz.channels import Receiver -from ..._internal._channels import ChannelRegistry from ...actor import Actor from ._component_metric_request import ComponentMetricRequest from .microgrid_api_source import MicrogridApiSource @@ -17,7 +16,6 @@ class DataSourcingActor(Actor): def __init__( self, request_receiver: Receiver[ComponentMetricRequest], - registry: ChannelRegistry, *, name: str | None = None, ) -> None: @@ -25,14 +23,12 @@ def __init__( Args: request_receiver: A channel receiver to accept metric requests from. - registry: A channel registry. To be replaced by a singleton - instance. name: The name of the actor. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. """ super().__init__(name=name) self._request_receiver = request_receiver - self._microgrid_api_source = MicrogridApiSource(registry) + self._microgrid_api_source = MicrogridApiSource() async def _run(self) -> None: """Run the actor.""" diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index d43da8e58..8528435b0 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -8,14 +8,14 @@ from collections.abc import Callable from typing import Any -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver +from frequenz.channels._broadcast import BroadcastSender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ComponentCategory from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity from ..._internal._asyncio import run_forever -from ..._internal._channels import ChannelRegistry from ...microgrid import connection_manager from ...timeseries import Sample from .._old_component_data import ( @@ -159,16 +159,8 @@ class MicrogridApiSource: Used by the DataSourcingActor. """ - def __init__( - self, - registry: ChannelRegistry, - ) -> None: - """Create a `MicrogridApiSource` instance. - - Args: - registry: A channel registry. To be replaced by a singleton - instance. - """ + def __init__(self) -> None: + """Create a `MicrogridApiSource` instance.""" self._comp_categories_cache: dict[ComponentId, ComponentCategory | int] = {} self.comp_data_receivers: dict[ComponentId, Receiver[Any]] = {} @@ -177,11 +169,13 @@ def __init__( self.comp_data_tasks: dict[ComponentId, asyncio.Task[None]] = {} """The dictionary of component IDs to asyncio tasks.""" - self._registry = registry self._req_streaming_metrics: dict[ ComponentId, dict[Metric | TransitionalMetric, list[ComponentMetricRequest]] ] = {} + self._channels: dict[str, Broadcast[Sample[Quantity]]] = {} + """Metric data channels by channel name, to enable reuse.""" + async def _get_component_category( self, comp_id: ComponentId ) -> ComponentCategory | int | None: @@ -397,11 +391,11 @@ def _get_data_extraction_method( _logger.error(err) raise ValueError(err) - def _get_metric_senders( + async def _get_metric_senders( self, category: ComponentCategory | int, requests: dict[Metric | TransitionalMetric, list[ComponentMetricRequest]], - ) -> list[tuple[Callable[[Any], float], list[Sender[Sample[Quantity]]]]]: + ) -> list[tuple[Callable[[Any], float], list[BroadcastSender[Sample[Quantity]]]]]: """Get channel senders from the channel registry for each requested metric. Args: @@ -413,18 +407,23 @@ def _get_metric_senders( A dictionary of output metric names to channel senders from the channel registry. """ - return [ - ( - self._get_data_extraction_method(category, metric), - [ - self._registry.get_or_create( - Sample[Quantity], request.get_channel_name() - ).new_sender() - for request in req_list - ], - ) - for (metric, req_list) in requests.items() - ] + all_senders = [] + for metric, req_list in requests.items(): + extraction_method = self._get_data_extraction_method(category, metric) + senders = [] + for request in req_list: + channel_name = request.get_channel_name() + # Create missing channels and inform the requesting side via oneshot + if channel_name not in self._channels: + telem_stream: Broadcast[Sample[Quantity]] = Broadcast( + name=channel_name + ) + self._channels[channel_name] = telem_stream + await request.telem_stream_sender.send(telem_stream.new_receiver()) + senders.append(self._channels[channel_name].new_sender()) + all_senders.append((extraction_method, senders)) + + return all_senders async def _handle_data_stream( self, @@ -446,7 +445,7 @@ async def _handle_data_stream( await self._check_requested_component_and_metrics( comp_id, category, self._req_streaming_metrics[comp_id] ) - stream_senders = self._get_metric_senders( + stream_senders = await self._get_metric_senders( category, self._req_streaming_metrics[comp_id] ) api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id] diff --git a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py index cd575229a..f84b0b9ce 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py @@ -10,6 +10,8 @@ import enum import typing +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels._oneshot import OneshotSender from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -31,7 +33,10 @@ class ReportRequest: """The component IDs to report on.""" priority: int - """The priority of the actor .""" + """The priority of the actor.""" + + report_stream_sender: OneshotSender[BroadcastReceiver[_Report]] + """Oneshot sender to transmit the report receiver.""" def get_channel_name(self) -> str: """Get the channel name for the report request. diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index 0dead0c29..b58737242 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -11,14 +11,13 @@ from datetime import datetime, timedelta, timezone from typing import assert_never -from frequenz.channels import Receiver, Sender, select, selected_from +from frequenz.channels import Broadcast, Receiver, Sender, select, selected_from from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter from typing_extensions import override from ..._internal._asyncio import run_forever -from ..._internal._channels import ChannelRegistry from ...actor import Actor from ...timeseries._base_types import SystemBounds from .. import _data_pipeline, _power_distributing @@ -46,7 +45,6 @@ def __init__( # pylint: disable=too-many-arguments bounds_subscription_receiver: Receiver[ReportRequest], power_distributing_requests_sender: Sender[_power_distributing.Request], power_distributing_results_receiver: Receiver[_power_distributing.Result], - channel_registry: ChannelRegistry, algorithm: PowerManagerAlgorithm, default_power: DefaultPower, component_class: type[Battery | EvCharger | SolarInverter], @@ -60,7 +58,6 @@ def __init__( # pylint: disable=too-many-arguments requests. power_distributing_results_receiver: The receiver for power distribution results. - channel_registry: The channel registry. algorithm: The power management algorithm to use. default_power: The default power to use for the components. component_class: The class of component this instance is going to support. @@ -70,8 +67,8 @@ def __init__( # pylint: disable=too-many-arguments self._bounds_subscription_receiver = bounds_subscription_receiver self._power_distributing_requests_sender = power_distributing_requests_sender self._power_distributing_results_receiver = power_distributing_results_receiver - self._channel_registry = channel_registry self._proposals_receiver = proposals_receiver + self._channel_lookup: dict[str, Broadcast[_Report]] = {} self._system_bounds: dict[frozenset[ComponentId], SystemBounds] = {} self._bound_tracker_tasks: dict[frozenset[ComponentId], asyncio.Task[None]] = {} @@ -233,18 +230,27 @@ async def _run(self) -> None: component_ids = sub.component_ids priority = sub.priority + async def get_or_create_channel( + subscription: ReportRequest, + ) -> Broadcast[_Report]: + channel_name = subscription.get_channel_name() + if channel_name not in self._channel_lookup: + report_channel = Broadcast[_Report](name=channel_name) + report_channel.resend_latest = True + self._channel_lookup[channel_name] = report_channel + await subscription.report_stream_sender.send( + report_channel.new_receiver() + ) + return self._channel_lookup[channel_name] + if component_ids not in self._subscriptions: + channel = await get_or_create_channel(sub) self._subscriptions[component_ids] = { - priority: self._channel_registry.get_or_create( - _Report, sub.get_channel_name() - ).new_sender() + priority: channel.new_sender() } elif priority not in self._subscriptions[component_ids]: - self._subscriptions[component_ids][priority] = ( - self._channel_registry.get_or_create( - _Report, sub.get_channel_name() - ).new_sender() - ) + channel = await get_or_create_channel(sub) + self._subscriptions[component_ids][priority] = channel.new_sender() if sub.component_ids not in self._bound_tracker_tasks: self._add_system_bounds_tracker(sub.component_ids) diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 70e2e4e55..e82382335 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -11,7 +11,7 @@ from frequenz.channels import Broadcast from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter -from .._internal._channels import ChannelRegistry, ReceiverFetcher +from .._internal._channels import ReceiverFetcher # pylint seems to think this is a cyclic import, but it is not. # @@ -35,7 +35,6 @@ class PowerWrapper: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, - channel_registry: ChannelRegistry, *, api_power_request_timeout: timedelta, power_manager_algorithm: PowerManagerAlgorithm, @@ -45,7 +44,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize the power control. Args: - channel_registry: A channel registry for use in the actors. api_power_request_timeout: Timeout to use when making power requests to the microgrid API. power_manager_algorithm: The power management algorithm to use. @@ -55,7 +53,6 @@ def __init__( # pylint: disable=too-many-arguments self._default_power = default_power self._power_manager_algorithm = power_manager_algorithm self._component_class = component_class - self._channel_registry = channel_registry self._api_power_request_timeout = api_power_request_timeout self.status_channel: Broadcast[ComponentPoolStatus] = Broadcast( @@ -109,7 +106,6 @@ def _start_power_managing_actor(self) -> None: power_distributing_results_receiver=( self._power_distribution_results_channel.new_receiver() ), - channel_registry=self._channel_registry, ) self._power_managing_actor.start() diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index b5c554513..fd0716f4a 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -5,14 +5,13 @@ import asyncio -import dataclasses import logging -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.quantities import Quantity from .._internal._asyncio import cancel_and_await -from .._internal._channels import ChannelRegistry from ..actor._actor import Actor from ..timeseries import Sample from ..timeseries._resampling._config import ResamplerConfig @@ -29,7 +28,6 @@ class ComponentMetricsResamplingActor(Actor): def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], config: ResamplerConfig, @@ -38,8 +36,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize an instance. Args: - channel_registry: The channel registry used to get senders and - receivers for data sourcing subscriptions. data_sourcing_request_sender: The sender used to send requests to the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor] to subscribe to component metrics. @@ -50,7 +46,6 @@ def __init__( # pylint: disable=too-many-arguments is used mostly for debugging purposes. """ super().__init__(name=name) - self._channel_registry: ChannelRegistry = channel_registry self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = ( data_sourcing_request_sender ) @@ -58,7 +53,29 @@ def __init__( # pylint: disable=too-many-arguments resampling_request_receiver ) self._resampler: Resampler = Resampler(config) - self._active_req_channels: set[str] = set() + self._data_sink_channels: dict[str, Broadcast[Sample[Quantity]]] = {} + + async def _subscribe_to_data_source( + self, request: ComponentMetricRequest + ) -> BroadcastReceiver[Sample[Quantity]]: + """Subscribe to the data source using a new request. + + Args: + request: The original request from the resampler. + + Returns: + The metric data receiver. + """ + sender, receiver = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]() + data_source_request = ComponentMetricRequest( + namespace=request.namespace + ":Source", + component_id=request.component_id, + metric=request.metric, + start_time=request.start_time, + telem_stream_sender=sender, + ) + await self._data_sourcing_request_sender.send(data_source_request) + return await receiver.receive() async def _subscribe(self, request: ComponentMetricRequest) -> None: """Request data for a component metric. @@ -68,28 +85,24 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None: """ request_channel_name = request.get_channel_name() - # If we are already handling this request, there is nothing to do. - if request_channel_name in self._active_req_channels: + # If we are already handling this request, answer the request by sending a + # new receiver from the existing channel. + if data_sink_channel := self._data_sink_channels.get(request_channel_name): + await request.telem_stream_sender.send(data_sink_channel.new_receiver()) return - self._active_req_channels.add(request_channel_name) + # Set up data source and sink channels + data_source = await self._subscribe_to_data_source(request) - data_source_request = dataclasses.replace( - request, namespace=request.namespace + ":Source" - ) - data_source_channel_name = data_source_request.get_channel_name() - await self._data_sourcing_request_sender.send(data_source_request) - receiver = self._channel_registry.get_or_create( - Sample[Quantity], data_source_channel_name - ).new_receiver() + data_sink_channel = Broadcast(name=request_channel_name, resend_latest=True) + await request.telem_stream_sender.send(data_sink_channel.new_receiver()) + self._data_sink_channels[request_channel_name] = data_sink_channel - # This is a temporary hack until the Sender implementation uses - # exceptions to report errors. - sender = self._channel_registry.get_or_create( - Sample[Quantity], request_channel_name - ).new_sender() - - self._resampler.add_timeseries(request_channel_name, receiver, sender.send) + self._resampler.add_timeseries( + name=request_channel_name, + source=data_source, + sink=data_sink_channel.new_sender().send, + ) async def _process_resampling_requests(self) -> None: """Process resampling data requests.""" diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 774b2a02b..585f68d4a 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -8,13 +8,13 @@ import asyncio import logging -from frequenz.channels import Receiver, Sender -from frequenz.client.common.microgrid.components import ComponentId +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels.experimental import Pipe from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Frequency, Quantity -from .._internal._channels import ChannelRegistry from .._internal._graph_traversal import find_first_descendant_component from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest @@ -23,34 +23,18 @@ _logger = logging.getLogger(__name__) -def create_request(component_id: ComponentId) -> ComponentMetricRequest: - """Create a request for grid frequency. - - Args: - component_id: The component id to use for the request. - - Returns: - A component metric request for grid frequency. - """ - return ComponentMetricRequest( - "grid-frequency", component_id, Metric.AC_FREQUENCY, None - ) - - class GridFrequency: """Grid Frequency.""" def __init__( self, data_sourcing_request_sender: Sender[ComponentMetricRequest], - channel_registry: ChannelRegistry, source: Component | None = None, ): """Initialize the grid frequency formula generator. Args: data_sourcing_request_sender: The sender to use for requests. - channel_registry: The channel registry to use for the grid frequency. source: The source component to use to receive the grid frequency. """ if not source: @@ -63,13 +47,31 @@ def __init__( self._request_sender: Sender[ComponentMetricRequest] = ( data_sourcing_request_sender ) - self._channel_registry: ChannelRegistry = channel_registry self._source_component: Component = source - self._component_metric_request: ComponentMetricRequest = create_request( - self._source_component.id + + # Microgrid API source will send the stream through a oneshot channel + telem_stream_sender, self._telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() + + self._component_metric_request = ComponentMetricRequest( + "grid-frequency", + self._source_component.id, + Metric.AC_FREQUENCY, + None, + telem_stream_sender, ) + # This channel merely forwards the telemetry stream. It is needed + # because we must return a receiver synchronously in new_receiver. + # The "real" channel for telemetry must be created in and owned by + # MicrogridApiSource, otherwise streams would not be reused. + self._forwarding_channel: Broadcast[Sample[Quantity]] | None = None + + # Sadly needed for testing self._task: None | asyncio.Task[None] = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[Sample[Quantity]] | None = None @property def source(self) -> Component: @@ -80,23 +82,11 @@ def source(self) -> Component: """ return self._source_component - def new_receiver(self) -> Receiver[Sample[Frequency]]: - """Create a receiver for grid frequency. - - Returns: - A receiver that will receive grid frequency samples. - """ - receiver = self._channel_registry.get_or_create( - Sample[Quantity], self._component_metric_request.get_channel_name() - ).new_receiver() - - if not self._task: - self._task = asyncio.create_task(self._send_request()) - else: - _logger.info( - "Grid frequency request already sent: %s", self._source_component - ) - + @staticmethod + def _map_frequency_samples( + receiver: Receiver[Sample[Quantity]], + ) -> Receiver[Sample[Frequency]]: + """Handle NaN values and map sample type to Frequency.""" return receiver.map( lambda sample: ( Sample[Frequency](sample.timestamp, None) @@ -107,7 +97,47 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: ) ) - async def _send_request(self) -> None: + def new_receiver(self) -> Receiver[Sample[Frequency]]: + """Create a receiver for grid frequency. + + Deprecated: + Use subscribe() instead. + + Returns: + A receiver that will receive grid frequency samples. + """ + if self._forwarding_channel is None: + self._forwarding_channel = Broadcast(name="Forward frequency samples") + self._task = asyncio.create_task( + self._send_request(self._forwarding_channel.new_sender()) + ) + + return self._map_frequency_samples(self._forwarding_channel.new_receiver()) + + async def subscribe(self) -> Receiver[Sample[Frequency]]: + """Create a receiver for grid frequency.""" + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() + component_metric_request = ComponentMetricRequest( + "grid-frequency", + self._source_component.id, + Metric.AC_FREQUENCY, + None, + telem_stream_sender, + ) + await self._request_sender.send(component_metric_request) + receiver = await telem_stream_receiver.receive() + return self._map_frequency_samples(receiver) + + async def _send_request(self, forwarding_sender: Sender[Sample[Quantity]]) -> None: """Send the request for grid frequency.""" await self._request_sender.send(self._component_metric_request) _logger.debug("Sent request for grid frequency: %s", self._source_component) + + # Receive the telemetry stream and forward it via pipe + telem_receiver: Receiver[Sample[Quantity]] = ( + await self._telem_stream_receiver.receive() + ) + self._pipe = Pipe(telem_receiver, forwarding_sender) + await self._pipe.start() diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index 4759b362f..075e9cc4b 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -13,12 +13,12 @@ import logging from typing import TYPE_CHECKING -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity, Voltage -from .._internal._channels import ChannelRegistry from .._internal._graph_traversal import find_first_descendant_component from ..timeseries._base_types import Sample, Sample3Phase @@ -55,7 +55,6 @@ class VoltageStreamer: def __init__( self, resampler_subscription_sender: Sender[ComponentMetricRequest], - channel_registry: ChannelRegistry, source_component: Component | None = None, ): """Initialize the phase-to-neutral voltage streaming. @@ -63,8 +62,6 @@ def __init__( Args: resampler_subscription_sender: The sender for sending metric requests to the resampling actor. - channel_registry: The channel registry for the phase-to-neutral - voltage streaming. source_component: The source component to receive the phase-to-neutral voltage. If None, it fetches the source component from the connection manager. @@ -73,9 +70,6 @@ def __init__( self._resampler_subscription_sender = resampler_subscription_sender """The sender for sending metric requests to the resampling actor.""" - self._channel_registry = channel_registry - """The channel registry for the phase-to-neutral voltage streaming.""" - from ..microgrid import ( # pylint: disable=import-outside-toplevel connection_manager, ) @@ -99,6 +93,8 @@ def __init__( self._channel_key = f"{self._namespace}-all-phases" """The channel key for the phase-to-neutral voltage streaming.""" + self._telem_channel: Broadcast[Sample3Phase[Voltage]] | None = None + @property def source(self) -> Component: """Get the component to fetch the phase-to-neutral voltage from. @@ -108,6 +104,14 @@ def source(self) -> Component: """ return self._source_component + @property + def _channel(self) -> Broadcast[Sample3Phase[Voltage]]: + if self._telem_channel is None: + self._telem_channel = Broadcast[Sample3Phase[Voltage]]( + name=self._channel_key + ) + return self._telem_channel + def new_receiver(self) -> Receiver[Sample3Phase[Voltage]]: """Create a receiver for the phase-to-neutral voltage. @@ -118,9 +122,7 @@ def new_receiver(self) -> Receiver[Sample3Phase[Voltage]]: A receiver that will receive the phase-to-neutral voltage as a 3-phase sample. """ - receiver = self._channel_registry.get_or_create( - Sample3Phase[Voltage], self._channel_key - ).new_receiver() + receiver = self._channel.new_receiver() if not self._task: self._task = asyncio.create_task(self._send_request()) @@ -143,21 +145,21 @@ async def _send_request(self) -> None: ) phases_rx: list[Receiver[Sample[Quantity]]] = [] for metric in metrics: + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() req = ComponentMetricRequest( - self._namespace, self._source_component.id, metric, None + namespace=self._namespace, + component_id=self._source_component.id, + metric=metric, + start_time=None, + telem_stream_sender=telem_stream_sender, ) await self._resampler_subscription_sender.send(req) + phases_rx.append(await telem_stream_receiver.receive()) - phases_rx.append( - self._channel_registry.get_or_create( - Sample[Quantity], req.get_channel_name() - ).new_receiver() - ) - - sender = self._channel_registry.get_or_create( - Sample3Phase[Voltage], self._channel_key - ).new_sender() + sender = self._channel.new_sender() _logger.debug( "Sent request for fetching voltage from: %s", self._source_component diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 3be021a85..bf3c6f36c 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -12,12 +12,16 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Energy, Percentage, Power, Temperature from ... import timeseries from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing, connection_manager +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Sample from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -74,6 +78,9 @@ def __init__( unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -329,22 +336,48 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = OneshotChannel[ + BroadcastReceiver[_Report] + ]() + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store._batteries, + report_stream_sender=report_stream_sender, ) - self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = ( + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, + ) + + self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store._channel_registry.get_or_create( - _power_managing._Report, sub.get_channel_name() + + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store._power_manager_bounds_subscription_sender.send( + request ) - channel.resend_latest = True - return channel + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py index 68a03f597..34d549174 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py @@ -15,7 +15,7 @@ from frequenz.client.microgrid.component import Battery from ..._internal._asyncio import cancel_and_await -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import Result @@ -41,7 +41,6 @@ class BatteryPoolReferenceStore: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], batteries_status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -53,8 +52,6 @@ def __init__( # pylint: disable=too-many-arguments """Create the class instance. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. batteries_status_receiver: Receiver to receive status of the batteries. @@ -125,13 +122,11 @@ def __init__( # pylint: disable=too-many-arguments self._power_bounds_subs: dict[str, asyncio.Task[None]] = {} self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}" self._power_distributing_namespace: str = f"power-distributor-{self._namespace}" - self._channel_registry: ChannelRegistry = channel_registry self._power_dist_results_fetcher: ReceiverFetcher[Result] = ( power_distribution_results_fetcher ) self._formula_pool: FormulaPool = FormulaPool( self._namespace, - self._channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/consumer.py b/src/frequenz/sdk/timeseries/consumer.py index 0376c0444..50d40c1a6 100644 --- a/src/frequenz/sdk/timeseries/consumer.py +++ b/src/frequenz/sdk/timeseries/consumer.py @@ -8,7 +8,6 @@ from frequenz.channels import Sender from frequenz.quantities import Power -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from .formulas._formula import Formula @@ -57,19 +56,16 @@ class Consumer: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the consumer formula generator. Args: - channel_registry: The channel registry to use for the consumer. resampler_subscription_sender: The sender to use for resampler subscriptions. """ namespace = f"consumer-{uuid.uuid4()}" self._formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 77327eb7d..6653d1845 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -8,11 +8,15 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Current, Power from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing, connection_manager +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Bounds from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -61,6 +65,9 @@ def __init__( # pylint: disable=too-many-arguments unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -171,23 +178,46 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = OneshotChannel[ + BroadcastReceiver[_Report] + ]() + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + report_stream_sender=report_stream_sender, ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, + ) + + self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - return channel + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store.power_manager_bounds_subs_sender.send(request) + + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py index 7e4d4c27b..8b7e00971 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py @@ -11,7 +11,7 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import EvCharger -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import ComponentPoolStatus, Result @@ -37,7 +37,6 @@ class EVChargerPoolReferenceStore: def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -48,8 +47,6 @@ def __init__( # pylint: disable=too-many-arguments """Create an instance of the class. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. status_receiver: A receiver that streams the status of the EV Chargers in @@ -68,7 +65,6 @@ def __init__( # pylint: disable=too-many-arguments ValueError: If any of the specified component_ids are not EV chargers or are unknown to the component graph. """ - self.channel_registry = channel_registry self.resampler_subscription_sender = resampler_subscription_sender self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender @@ -96,7 +92,6 @@ def __init__( # pylint: disable=too-many-arguments self.namespace: str = f"ev-charger-pool-{uuid.uuid4()}" self.formula_pool = FormulaPool( self.namespace, - self.channel_registry, self.resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/formulas/_formula_pool.py b/src/frequenz/sdk/timeseries/formulas/_formula_pool.py index d0887c7e2..107ab449a 100644 --- a/src/frequenz/sdk/timeseries/formulas/_formula_pool.py +++ b/src/frequenz/sdk/timeseries/formulas/_formula_pool.py @@ -15,7 +15,6 @@ ResampledStreamFetcher, ) -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest from ._formula import Formula from ._formula_3_phase import Formula3Phase @@ -44,20 +43,16 @@ class FormulaPool: def __init__( self, namespace: str, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Create a new instance. Args: namespace: namespace to use with the data pipeline. - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. """ self._namespace: str = namespace - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) @@ -276,7 +271,6 @@ def _telemetry_fetcher(self, metric: Metric) -> ResampledStreamFetcher: """ return ResampledStreamFetcher( self._namespace, - self._channel_registry, self._resampler_subscription_sender, metric, ) diff --git a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py index 7ad45ec53..834f62562 100644 --- a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py +++ b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py @@ -3,13 +3,13 @@ """Fetches telemetry streams for components.""" -from frequenz.channels import Receiver, Sender +from frequenz.channels import OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Quantity from frequenz.sdk.timeseries import Sample -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest, Metric from ...microgrid._old_component_data import TransitionalMetric @@ -20,7 +20,6 @@ class ResampledStreamFetcher: def __init__( self, namespace: str, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], metric: Metric | TransitionalMetric, ): @@ -29,14 +28,11 @@ def __init__( Args: namespace: The unique namespace to allow reuse of streams in the data pipeline. - channel_registry: The channel registry instance shared with the resampling - and the data sourcing actors. resampler_subscription_sender: A sender to send metric requests to the resampling actor. metric: The metric to fetch for all components in this formula. """ self._namespace: str = namespace - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) @@ -54,18 +50,17 @@ async def fetch_stream( Returns: A receiver to stream resampled data for the given component id. """ + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() + request = ComponentMetricRequest( self._namespace, component_id, self._metric, None, + telem_stream_sender, ) - chan = self._channel_registry.get_or_create( - Sample[Quantity], request.get_channel_name() - ) - chan.resend_latest = True - await self._resampler_subscription_sender.send(request) - - return chan.new_receiver() + return await telem_stream_receiver.receive() diff --git a/src/frequenz/sdk/timeseries/grid.py b/src/frequenz/sdk/timeseries/grid.py index b79a690f9..f9c788b10 100644 --- a/src/frequenz/sdk/timeseries/grid.py +++ b/src/frequenz/sdk/timeseries/grid.py @@ -16,7 +16,6 @@ from frequenz.client.microgrid.component import GridConnectionPoint from frequenz.quantities import Current, Power, ReactivePower -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from ._fuse import Fuse @@ -155,14 +154,11 @@ async def stop(self) -> None: def initialize( - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the grid connection. Args: - channel_registry: The channel registry instance shared with the - resampling actor. resampler_subscription_sender: The sender for sending metric requests to the resampling actor. @@ -204,7 +200,6 @@ def initialize( namespace = f"grid-{uuid.uuid4()}" formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py index 00df8f776..220581125 100644 --- a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py +++ b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py @@ -12,7 +12,6 @@ from frequenz.sdk.microgrid import connection_manager -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest from ..formulas._formula import Formula from ..formulas._formula_pool import FormulaPool @@ -57,7 +56,6 @@ class LogicalMeter: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Create a `LogicalMeter` instance. @@ -68,12 +66,9 @@ def __init__( for creating `LogicalMeter` instances. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. """ - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) @@ -83,7 +78,6 @@ def __init__( self._namespace: str = f"logical-meter-{uuid.uuid4()}" self._formula_pool: FormulaPool = FormulaPool( self._namespace, - self._channel_registry, self._resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/producer.py b/src/frequenz/sdk/timeseries/producer.py index 0c82ae16c..0aa89bd2f 100644 --- a/src/frequenz/sdk/timeseries/producer.py +++ b/src/frequenz/sdk/timeseries/producer.py @@ -8,7 +8,6 @@ from frequenz.channels import Sender from frequenz.quantities import Power -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from .formulas._formula import Formula @@ -57,19 +56,16 @@ class Producer: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the producer formula generator. Args: - channel_registry: The channel registry to use for the producer. resampler_subscription_sender: The sender to use for resampler subscriptions. """ namespace = f"producer-{uuid.uuid4()}" self._formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index 6bbb91e15..8d7b54ec7 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -7,6 +7,9 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -14,6 +17,7 @@ from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Bounds from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -56,6 +60,9 @@ def __init__( # pylint: disable=too-many-arguments unique_id = uuid.uuid4() self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -143,23 +150,46 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = OneshotChannel[ + BroadcastReceiver[_Report] + ]() + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + report_stream_sender=report_stream_sender, ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, + ) + + self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - return channel + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store.power_manager_bounds_subs_sender.send(request) + + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py index e893f9a36..d8b3ed641 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py @@ -12,7 +12,7 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import SolarInverter -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import ComponentPoolStatus, Result @@ -38,7 +38,6 @@ class PVPoolReferenceStore: def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -49,8 +48,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize this instance. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. status_receiver: A receiver that streams the status of the PV inverters in @@ -69,7 +66,6 @@ def __init__( # pylint: disable=too-many-arguments ValueError: If any of the provided component_ids are not PV inverters or are unknown to the component graph. """ - self.channel_registry = channel_registry self.resampler_subscription_sender = resampler_subscription_sender self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender @@ -98,7 +94,6 @@ def __init__( # pylint: disable=too-many-arguments self.namespace: str = f"pv-pool-{uuid.uuid4()}" self.formula_pool = FormulaPool( self.namespace, - self.channel_registry, self.resampler_subscription_sender, ) self.bounds_channel: Broadcast[SystemBounds] = Broadcast( diff --git a/tests/actor/test_channel_registry.py b/tests/actor/test_channel_registry.py deleted file mode 100644 index 2966a9ad3..000000000 --- a/tests/actor/test_channel_registry.py +++ /dev/null @@ -1,69 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for the ChannelRegistry.""" - -import pytest -from frequenz.channels import ReceiverError, SenderError - -from frequenz.sdk._internal._channels import ChannelRegistry - - -async def test_channel_registry() -> None: - """Tests for ChannelRegistry, with string as key type.""" - reg = ChannelRegistry(name="test-registry") - - assert "20-hello" not in reg - assert "21-hello" not in reg - - chan20 = reg.get_or_create(int, "20-hello") - assert "20-hello" in reg - assert reg.message_type("20-hello") == int - - with pytest.raises(ValueError): - reg.get_or_create(str, "20-hello") - - sender20 = chan20.new_sender() - receiver20 = chan20.new_receiver() - - assert "21-hello" not in reg - - chan21 = reg.get_or_create(int, "21-hello") - assert "21-hello" in reg - assert reg.message_type("21-hello") == int - - sender21 = chan21.new_sender() - receiver21 = chan21.new_receiver() - - await sender20.send(30) - await sender21.send(31) - - rcvd = await receiver21.receive() - assert rcvd == 31 - - rcvd = await receiver20.receive() - assert rcvd == 30 - - await reg.close_and_remove("20-hello") - assert "20-hello" not in reg - assert chan20._closed # pylint: disable=protected-access - with pytest.raises(SenderError): - await sender20.send(30) - with pytest.raises(ReceiverError): - await receiver20.receive() - with pytest.raises(KeyError): - reg.message_type("20-hello") - with pytest.raises(KeyError): - await reg.close_and_remove("20-hello") - - await reg.close_and_remove("21-hello") - assert "21-hello" not in reg - assert chan21._closed # pylint: disable=protected-access - with pytest.raises(SenderError): - await sender21.send(30) - with pytest.raises(ReceiverError): - await receiver21.receive() - with pytest.raises(KeyError): - reg.message_type("21-hello") - with pytest.raises(KeyError): - await reg.close_and_remove("21-hello") diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 9b15339de..41d94a1e6 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -3,18 +3,17 @@ """Frequenz Python SDK resampling example.""" import asyncio -import dataclasses from datetime import datetime, timedelta, timezone import async_solipsism import pytest import time_machine -from frequenz.channels import Broadcast +from frequenz.channels import Broadcast, OneshotChannel, Receiver, Sender +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ComponentMetricRequest from frequenz.sdk.microgrid._resampling import ComponentMetricsResamplingActor from frequenz.sdk.timeseries import ResamplerConfig2, Sample @@ -31,19 +30,10 @@ def _now() -> datetime: async def _assert_resampling_works( - channel_registry: ChannelRegistry, + timeseries_sender: Sender[Sample[Quantity]], + timeseries_receiver: Receiver[Sample[Quantity]], fake_time: time_machine.Coordinates, - *, - resampling_chan_name: str, - data_source_chan_name: str, ) -> None: - timeseries_receiver = channel_registry.get_or_create( - Sample[Quantity], resampling_chan_name - ).new_receiver() - timeseries_sender = channel_registry.get_or_create( - Sample[Quantity], data_source_chan_name - ).new_sender() - fake_time.shift(0.2) new_sample = await timeseries_receiver.receive() # At 0.2s (timer) assert new_sample == Sample(_now(), None) @@ -103,14 +93,12 @@ async def test_single_request( fake_time: time_machine.Coordinates, ) -> None: """Run main functions that initializes and creates everything.""" - channel_registry = ChannelRegistry(name="test") data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") data_source_req_recv = data_source_req_chan.new_receiver() resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") resampling_req_sender = resampling_req_chan.new_sender() async with ComponentMetricsResamplingActor( - channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), config=ResamplerConfig2( @@ -118,25 +106,35 @@ async def test_single_request( max_data_age_in_periods=2, ), ) as resampling_actor: + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), metric=Metric.BATTERY_SOC_PCT, start_time=None, + telem_stream_sender=telem_stream_sender, ) await resampling_req_sender.send(subs_req) data_source_req = await data_source_req_recv.receive() assert data_source_req is not None - assert data_source_req == dataclasses.replace( - subs_req, namespace="Resampling:Source" - ) + + assert data_source_req.namespace == "Resampling:Source" + assert data_source_req.component_id == ComponentId(9) + assert data_source_req.metric == Metric.BATTERY_SOC_PCT + assert data_source_req.start_time is None + assert data_source_req.telem_stream_sender != telem_stream_sender + + # Create the telemetry stream on behalf of nonexisting data sourcing actor + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) await _assert_resampling_works( - channel_registry, - fake_time, - resampling_chan_name=subs_req.get_channel_name(), - data_source_chan_name=data_source_req.get_channel_name(), + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=await telem_stream_receiver.receive(), + fake_time=fake_time, ) await resampling_actor._resampler.stop() # pylint: disable=protected-access @@ -146,14 +144,12 @@ async def test_duplicate_request( fake_time: time_machine.Coordinates, ) -> None: """Run main functions that initializes and creates everything.""" - channel_registry = ChannelRegistry(name="test") data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") data_source_req_recv = data_source_req_chan.new_receiver() resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") resampling_req_sender = resampling_req_chan.new_sender() async with ComponentMetricsResamplingActor( - channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), config=ResamplerConfig2( @@ -161,11 +157,15 @@ async def test_duplicate_request( max_data_age_in_periods=2, ), ) as resampling_actor: + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), metric=Metric.BATTERY_SOC_PCT, start_time=None, + telem_stream_sender=telem_stream_sender, ) await resampling_req_sender.send(subs_req) @@ -176,11 +176,88 @@ async def test_duplicate_request( with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1) + # Create the telemetry stream on behalf of nonexisting data sourcing actor + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) + + await _assert_resampling_works( + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=await telem_stream_receiver.receive(), + fake_time=fake_time, + ) + + await resampling_actor._resampler.stop() # pylint: disable=protected-access + + +async def test_resubscribe(fake_time: time_machine.Coordinates) -> None: + """Test that resampling works when e receiver resubscribes. + + For example, Coalesce may close its receivers and resubscribe to the + same component later on. ComponentMetricsResamplingActor must provide + a new receiver in that case. + """ + data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") + data_source_req_recv = data_source_req_chan.new_receiver() + resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") + resampling_req_sender = resampling_req_chan.new_sender() + + async with ComponentMetricsResamplingActor( + data_sourcing_request_sender=data_source_req_chan.new_sender(), + resampling_request_receiver=resampling_req_chan.new_receiver(), + config=ResamplerConfig2( + resampling_period=timedelta(seconds=0.2), + max_data_age_in_periods=2, + ), + ) as resampling_actor: + + async def send_metric_request() -> Receiver[Receiver[Sample[Quantity]]]: + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() + subs_req = ComponentMetricRequest( + namespace="Resampling", + component_id=ComponentId(9), + metric=Metric.BATTERY_SOC_PCT, + start_time=None, + telem_stream_sender=telem_stream_sender, + ) + await resampling_req_sender.send(subs_req) + return telem_stream_receiver + + telem_stream_receiver = await send_metric_request() + + # Create the telemetry stream on behalf of nonexisting data sourcing actor + data_source_req = await data_source_req_recv.receive() + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) + + resampled_stream_receiver = await telem_stream_receiver.receive() + + await _assert_resampling_works( + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=resampled_stream_receiver, + fake_time=fake_time, + ) + + resampled_stream_receiver.close() + + # Resubscribe to the same metric data + telem_stream_receiver = await send_metric_request() + resampled_stream_receiver = await telem_stream_receiver.receive() + + # No need to answer the request in the data sourcing actor - The resampler answers + + # New subscriptions receive the latest resampled value immediately. + # This must be drained for _assert_resampling_works to have a clean start. + resent_sample = await resampled_stream_receiver.receive() + assert resent_sample is not None + assert resent_sample.value is None + assert resent_sample.timestamp == _now() + await _assert_resampling_works( - channel_registry, - fake_time, - resampling_chan_name=subs_req.get_channel_name(), - data_source_chan_name=data_source_req.get_channel_name(), + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=resampled_stream_receiver, + fake_time=fake_time, ) await resampling_actor._resampler.stop() # pylint: disable=protected-access diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py index bcc4e5857..c0627259d 100644 --- a/tests/microgrid/test_data_sourcing.py +++ b/tests/microgrid/test_data_sourcing.py @@ -11,7 +11,8 @@ import pytest import pytest_mock -from frequenz.channels import Broadcast +from frequenz.channels import Broadcast, OneshotChannel +from frequenz.channels._broadcast import BroadcastReceiver from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ( @@ -24,7 +25,6 @@ from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ( ComponentMetricRequest, DataSourcingActor, @@ -84,88 +84,45 @@ def mock_connection_manager(mocker: pytest_mock.MockFixture) -> mock.Mock: return mock_conn_manager +@pytest.mark.parametrize( + ("component_id", "metric", "expected_sample_value"), + [ + (ComponentId(4), Metric.AC_ACTIVE_POWER, 100.0), + (ComponentId(4), Metric.AC_REACTIVE_POWER, 100.0), + (ComponentId(6), Metric.AC_ACTIVE_POWER, 0.0), + (ComponentId(9), Metric.BATTERY_SOC_PCT, 9.0), + (ComponentId(9), Metric.BATTERY_SOC_PCT, 9.0), + (ComponentId(12), Metric.AC_ACTIVE_POWER, -13.0), + ], +) async def test_data_sourcing_actor( # pylint: disable=too-many-locals mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument + component_id: ComponentId, + metric: Metric, + expected_sample_value: float, ) -> None: """Tests for the DataSourcingActor.""" req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests") req_sender = req_chan.new_sender() - registry = ChannelRegistry(name="test-registry") - - async with DataSourcingActor(req_chan.new_receiver(), registry): - active_power_request_4 = ComponentMetricRequest( - "test-namespace", ComponentId(4), Metric.AC_ACTIVE_POWER, None - ) - active_power_recv_4 = registry.get_or_create( - Sample[Quantity], active_power_request_4.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_4) - - reactive_power_request_4 = ComponentMetricRequest( - "test-namespace", ComponentId(4), Metric.AC_REACTIVE_POWER, None - ) - reactive_power_recv_4 = registry.get_or_create( - Sample[Quantity], reactive_power_request_4.get_channel_name() - ).new_receiver() - await req_sender.send(reactive_power_request_4) - - active_power_request_6 = ComponentMetricRequest( - "test-namespace", ComponentId(6), Metric.AC_ACTIVE_POWER, None - ) - active_power_recv_6 = registry.get_or_create( - Sample[Quantity], active_power_request_6.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_6) - - soc_request_9 = ComponentMetricRequest( - "test-namespace", ComponentId(9), Metric.BATTERY_SOC_PCT, None + async with DataSourcingActor(req_chan.new_receiver()): + telem_stream_sender, telem_stream_receiver = OneshotChannel[ + BroadcastReceiver[Sample[Quantity]] + ]() + component_metric_request = ComponentMetricRequest( + "test-namespace", + component_id, + metric, + None, + telem_stream_sender, ) - soc_recv_9 = registry.get_or_create( - Sample[Quantity], soc_request_9.get_channel_name() - ).new_receiver() - await req_sender.send(soc_request_9) - - soc2_request_9 = ComponentMetricRequest( - "test-namespace", ComponentId(9), Metric.BATTERY_SOC_PCT, None - ) - soc2_recv_9 = registry.get_or_create( - Sample[Quantity], soc2_request_9.get_channel_name() - ).new_receiver() - await req_sender.send(soc2_request_9) - - active_power_request_12 = ComponentMetricRequest( - "test-namespace", ComponentId(12), Metric.AC_ACTIVE_POWER, None - ) - active_power_recv_12 = registry.get_or_create( - Sample[Quantity], active_power_request_12.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_12) - - for i in range(3): - sample = await active_power_recv_4.receive() - assert sample.value is not None - assert 100.0 + i == sample.value.base_value - - sample = await reactive_power_recv_4.receive() - assert sample.value is not None - assert 100.0 + i == sample.value.base_value - - sample = await active_power_recv_6.receive() - assert sample.value is not None - assert 0.0 + i == sample.value.base_value - - sample = await soc_recv_9.receive() - assert sample.value is not None - assert 9.0 + i == sample.value.base_value - - sample = await soc2_recv_9.receive() - assert sample.value is not None - assert 9.0 + i == sample.value.base_value + await req_sender.send(component_metric_request) + telem_stream = await telem_stream_receiver.receive() - sample = await active_power_recv_12.receive() + for i in range(10): + sample = await telem_stream.receive() assert sample.value is not None - assert -13.0 + i == sample.value.base_value + assert expected_sample_value + i == sample.value.base_value def _new_meter_data( diff --git a/tests/timeseries/_formulas/utils.py b/tests/timeseries/_formulas/utils.py index 471d7e9c7..ee0ae1b22 100644 --- a/tests/timeseries/_formulas/utils.py +++ b/tests/timeseries/_formulas/utils.py @@ -31,7 +31,6 @@ async def get_resampled_stream( # pylint: disable=protected-access builder = ResampledStreamFetcher( namespace=namespace, - channel_registry=_data_pipeline._get()._channel_registry, resampler_subscription_sender=_data_pipeline._get()._resampling_request_sender(), metric=metric, ) diff --git a/tests/timeseries/mock_resampler.py b/tests/timeseries/mock_resampler.py index 8ae6b0920..b3e221683 100644 --- a/tests/timeseries/mock_resampler.py +++ b/tests/timeseries/mock_resampler.py @@ -41,13 +41,18 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument ) -> None: """Create a `MockDataPipeline` instance.""" self._data_pipeline = _DataPipeline(resampler_config) - - self._channel_registry = self._data_pipeline._channel_registry + self._channel_lookup: dict[str, Broadcast[Sample[Quantity]]] = {} self._resampler_request_channel = Broadcast[ComponentMetricRequest]( - name="resampler-request" + name="resampler-request", + resend_latest=True, ) self._input_channels_receivers: dict[str, list[Receiver[Sample[Quantity]]]] = {} + def get_or_create_channel(name: str) -> Broadcast[Sample[Quantity]]: + if name not in self._channel_lookup: + self._channel_lookup[name] = Broadcast[Sample[Quantity]](name=name) + return self._channel_lookup[name] + def metric_senders( comp_ids: list[ComponentId], metric_id: Metric, @@ -55,15 +60,9 @@ def metric_senders( senders: list[Sender[Sample[Quantity]]] = [] for comp_id in comp_ids: name = f"{comp_id}:{metric_id}" - senders.append( - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_sender() - ) + senders.append(get_or_create_channel(name).new_sender()) self._input_channels_receivers[name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_receiver() + get_or_create_channel(name).new_receiver() for _ in range(namespaces) ] return senders @@ -115,33 +114,21 @@ def multi_phase_senders( senders.append( [ - self._channel_registry.get_or_create( - Sample[Quantity], p1_name - ).new_sender(), - self._channel_registry.get_or_create( - Sample[Quantity], p2_name - ).new_sender(), - self._channel_registry.get_or_create( - Sample[Quantity], p3_name - ).new_sender(), + get_or_create_channel(p1_name).new_sender(), + get_or_create_channel(p2_name).new_sender(), + get_or_create_channel(p3_name).new_sender(), ] ) self._input_channels_receivers[p1_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p1_name - ).new_receiver() + get_or_create_channel(p1_name).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p2_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p2_name - ).new_receiver() + get_or_create_channel(p2_name).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p3_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p3_name - ).new_receiver() + get_or_create_channel(p3_name).new_receiver() for _ in range(namespaces) ] return senders @@ -237,16 +224,26 @@ async def _channel_forward_messages( async def _handle_resampling_requests(self) -> None: async for request in self._resampler_request_channel.new_receiver(): name = request.get_channel_name() + if name in self._forward_tasks: + # Forward task exists, but we must create a new receiver + # from the existing channel and return it to the request sender. + assert name in self._channel_lookup + output_channel = self._channel_lookup[name] + await request.telem_stream_sender.send(output_channel.new_receiver()) continue + input_chan_recv_name = f"{request.component_id}:{request.metric}" input_chan_recv = self._input_channels_receivers[input_chan_recv_name].pop() assert input_chan_recv is not None - output_chan_sender: Sender[Sample[Quantity]] = ( - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_sender() - ) + + if name not in self._channel_lookup: + self._channel_lookup[name] = Broadcast(name=name, resend_latest=True) + + output_channel = self._channel_lookup[name] + output_chan_sender = output_channel.new_sender() + await request.telem_stream_sender.send(output_channel.new_receiver()) + task = asyncio.create_task( self._channel_forward_messages( input_chan_recv, diff --git a/tests/timeseries/test_frequency_streaming.py b/tests/timeseries/test_frequency_streaming.py index 25077c3b9..ef47f1ce8 100644 --- a/tests/timeseries/test_frequency_streaming.py +++ b/tests/timeseries/test_frequency_streaming.py @@ -7,6 +7,7 @@ import asyncio from datetime import datetime, timezone +import pytest from frequenz.quantities import Frequency from pytest_mock import MockerFixture @@ -19,7 +20,8 @@ # pylint: disable=protected-access -async def test_grid_frequency_none(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("use_subscribe", [True, False]) +async def test_grid_frequency_none(mocker: MockerFixture, use_subscribe: bool) -> None: """Test the grid frequency formula.""" mockgrid = MockMicrogrid(grid_meter=True) mockgrid.add_batteries(2) @@ -27,13 +29,16 @@ async def test_grid_frequency_none(mocker: MockerFixture) -> None: await mockgrid.start(mocker) grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - - assert grid_freq._task is not None - # We have to wait for the metric request to be sent - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) await mockgrid.mock_client.send( component_data_wrapper.MeterDataWrapper( @@ -47,7 +52,8 @@ async def test_grid_frequency_none(mocker: MockerFixture) -> None: await mockgrid.cleanup() -async def test_grid_frequency_1(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("use_subscribe", [True, False]) +async def test_grid_frequency_1(mocker: MockerFixture, use_subscribe: bool) -> None: """Test the grid frequency formula.""" mockgrid = MockMicrogrid(grid_meter=True, mocker=mocker) mockgrid.add_batteries(2) @@ -55,13 +61,16 @@ async def test_grid_frequency_1(mocker: MockerFixture) -> None: async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - - assert grid_freq._task is not None - # We have to wait for the metric request to be sent - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] grid_meter_data = [] @@ -82,8 +91,9 @@ async def test_grid_frequency_1(mocker: MockerFixture) -> None: assert equal_float_lists(results, grid_meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_no_grid_meter_no_consumer_meter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without a grid side meter.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -94,12 +104,16 @@ async def test_grid_frequency_no_grid_meter_no_consumer_meter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = [] @@ -120,8 +134,9 @@ async def test_grid_frequency_no_grid_meter_no_consumer_meter( assert equal_float_lists(results, meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_no_grid_meter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without a grid side meter.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -131,12 +146,16 @@ async def test_grid_frequency_no_grid_meter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = [] @@ -157,8 +176,9 @@ async def test_grid_frequency_no_grid_meter( assert equal_float_lists(results, meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_only_inverter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without any meter but only inverters.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -166,12 +186,16 @@ async def test_grid_frequency_only_inverter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = []