-
Notifications
You must be signed in to change notification settings - Fork 21
Implement EventResampler for Cascaded Resampling #1372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v1.x.x
Are you sure you want to change the base?
Changes from all commits
7970a71
03c253a
20615e7
75c286e
a9e3030
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,213 @@ | ||
| # License: MIT | ||
| # Copyright © 2026 Frequenz Energy-as-a-Service GmbH | ||
|
|
||
| """Event-driven resampler for cascaded resampling stages.""" | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from datetime import datetime, timedelta, timezone | ||
|
|
||
| from frequenz.quantities import Quantity | ||
|
|
||
| from .._base_types import Sample | ||
| from ._base_types import Sink, Source | ||
| from ._config import ResamplerConfig | ||
| from ._resampler import Resampler, _ResamplingHelper, _StreamingHelper | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class EventResampler(Resampler): | ||
| """Event-driven resampler for cascaded resampling stages. | ||
|
|
||
| Unlike the standard Timer-based Resampler which uses fixed wall-clock | ||
| intervals, EventResampler is triggered by incoming data. Windows are | ||
| emitted when a sample arrives that falls outside the current window, | ||
| not on a fixed timer schedule. | ||
|
|
||
| Problem Solved: | ||
| When cascading Timer-based resamplers (e.g., 1s → 10s) with | ||
| align_to=UNIX_EPOCH, samples can be lost at window boundaries due to | ||
| timing synchronization issues. EventResampler eliminates this by | ||
| opening/closing windows based on actual data arrival. | ||
|
|
||
| Important: This resampler is optimized for continuous data streams | ||
| where samples arrive at regular or semi-regular intervals. It is not | ||
| suitable for handling raw, irregular data directly from sources. | ||
|
|
||
| Best Used: | ||
| Stage 1: Timer-based Resampler (handles raw, irregular data) | ||
| Stage 2+: Event-based Resampler (handles continuous data from Stage 1) | ||
|
|
||
| Example: | ||
| config = ResamplerConfig( | ||
| resampling_period=timedelta(seconds=10), | ||
| resampling_function=..., | ||
| ) | ||
| resampler = EventResampler(config) | ||
| resampler.add_timeseries("my_source", source, sink) | ||
| await resampler.resample() | ||
|
|
||
| Note: If a long gap occurs without incoming samples (no data for multiple periods), | ||
| the corresponding windows will be emitted all at once when data resumes. This is | ||
| acceptable for cascaded resampling since the input typically comes from another | ||
| Resampler with guaranteed continuous output. | ||
| """ | ||
|
|
||
| # pylint: disable=super-init-not-called | ||
| def __init__(self, config: ResamplerConfig) -> None: | ||
| """Initialize EventResampler. | ||
|
|
||
| This does not call super().__init__() to avoid starting any timers. | ||
|
|
||
| Args: | ||
| config: Resampler configuration | ||
| """ | ||
| self._config = config | ||
| """The configuration for this resampler.""" | ||
|
|
||
| self._resamplers: dict[Source, _StreamingHelper] = {} | ||
| """A mapping between sources and the streaming helper handling that source.""" | ||
|
|
||
| window_end, _ = self._calculate_window_end() | ||
| self._window_end: datetime = window_end | ||
| """The time in which the current window ends. | ||
|
|
||
| This is used to make sure every resampling window is generated at | ||
| precise times. We can't rely on the timer timestamp because timers will | ||
| never fire at the exact requested time, so if we don't use a precise | ||
| time for the end of the window, the resampling windows we produce will | ||
| have different sizes. | ||
|
|
||
| The window end will also be aligned to the `config.align_to` time, so | ||
| the window end is deterministic. | ||
| """ | ||
|
|
||
| self._window_lock = asyncio.Lock() | ||
| """Lock protecting access to `_window_end` during window state transitions.""" | ||
|
|
||
| self._sample_queue: asyncio.Queue[Sample[Quantity]] = asyncio.Queue() | ||
| """Queue for samples awaiting processing. Filled by `_StreamingHelper` callbacks, | ||
| consumed by the event loop in `resample()`. | ||
| """ | ||
|
|
||
| # OVERRIDDEN: Register callback to receive samples asynchronously for | ||
| # event-driven window management. | ||
| def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool: | ||
| """Start resampling a new timeseries. | ||
|
|
||
| Registers the timeseries and sets up a sample callback to enqueue | ||
| incoming samples for event-driven processing. | ||
|
|
||
| Args: | ||
| name: The name of the timeseries (for logging purposes). | ||
| source: The source of the timeseries to resample. | ||
| sink: The sink to use to send the resampled data. | ||
|
|
||
| Returns: | ||
| `True` if the timeseries was added, `False` if the timeseries was | ||
| not added because there is already a timeseries using the provided | ||
| receiver. | ||
| """ | ||
| if source in self._resamplers: | ||
| return False | ||
|
|
||
| resampler = _StreamingHelper( | ||
| _ResamplingHelper(name, self._config), source, sink | ||
| ) | ||
|
|
||
| # Register the callback to receive samples from the streaming helper. | ||
| resampler.register_sample_callback(self._enqueue_sample) | ||
|
|
||
| self._resamplers[source] = resampler | ||
| return True | ||
|
|
||
| async def _enqueue_sample(self, sample: Sample[Quantity]) -> None: | ||
| """Add a sample to the processing queue. | ||
|
|
||
| Args: | ||
| sample: The sample to enqueue. | ||
| """ | ||
| await self._sample_queue.put(sample) | ||
|
|
||
| # OVERRIDDEN: no warm-up period needed for event-driven sample accumulation. | ||
| def _calculate_window_end(self) -> tuple[datetime, timedelta]: | ||
| """Calculate the end of the first resampling window. | ||
|
|
||
| Calculates the next multiple of resampling_period after the current time, | ||
| respecting align_to configuration. | ||
|
|
||
| Returns: | ||
| A tuple (window_end, delay_time) where: | ||
| - window_end: datetime when the first window should end | ||
| - delay_time: always timedelta(0) for EventResampler | ||
| """ | ||
| now = datetime.now(timezone.utc) | ||
| period = self._config.resampling_period | ||
| align_to = self._config.align_to | ||
|
|
||
| if align_to is None: | ||
| return (now + period, timedelta(0)) | ||
|
|
||
| elapsed = (now - align_to) % period | ||
|
|
||
| return ( | ||
| (now + (period - elapsed), timedelta(0)) | ||
| if elapsed > timedelta(0) | ||
| else (now, timedelta(0)) | ||
| ) | ||
|
|
||
| async def resample(self, *, one_shot: bool = False) -> None: | ||
| """Start event-driven resampling. | ||
|
|
||
| Processes incoming samples from the queue continuously. Windows are | ||
| emitted when a sample arrives with a timestamp >= current window_end. | ||
| This is in contrast to Timer-based resampling which emits windows at | ||
| fixed intervals regardless of data arrival. | ||
|
|
||
| Args: | ||
| one_shot: If True, waits for the first window to be emitted, then exits. | ||
|
|
||
| Raises: | ||
| asyncio.CancelledError: If the task is cancelled. | ||
| """ | ||
| try: | ||
| while True: | ||
| sample = await self._sample_queue.get() | ||
| emmitted = await self._process_sample(sample) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Spelling: `emitted`. A few more occurrences below. |
||
|
|
||
| if one_shot and emmitted: | ||
| return | ||
|
|
||
| except asyncio.CancelledError: | ||
| _logger.info("EventResampler task cancelled") | ||
| raise | ||
|
|
||
| async def _process_sample( | ||
| self, | ||
| sample: Sample[Quantity], | ||
| ) -> bool: | ||
| """Process an incoming sample and manage window state. | ||
|
|
||
| This method checks if the incoming sample falls outside the current | ||
| window and emits completed windows as needed. Returns True if any | ||
| windows were emitted. | ||
|
|
||
| Args: | ||
| sample: Incoming sample to process | ||
|
|
||
| Returns: | ||
| True if at least one window was emitted, False otherwise. | ||
| """ | ||
| async with self._window_lock: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| emmitted = False | ||
| while sample.timestamp >= self._window_end: | ||
| _logger.debug( | ||
| "EventResampler: Sample at %s >= window end %s, closing window", | ||
| sample.timestamp, | ||
| self._window_end, | ||
| ) | ||
| await self._emit_window(self._window_end) | ||
| self._window_end += self._config.resampling_period | ||
| emmitted = True | ||
| return emmitted | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,7 +12,7 @@ | |
| from bisect import bisect, bisect_left | ||
| from collections import deque | ||
| from datetime import datetime, timedelta, timezone | ||
| from typing import assert_never | ||
| from typing import Awaitable, Callable, assert_never | ||
|
|
||
| from frequenz.channels.timer import Timer, TriggerAllMissed, _to_microseconds | ||
| from frequenz.quantities import Quantity | ||
|
|
@@ -160,14 +160,6 @@ async def resample(self, *, one_shot: bool = False) -> None: | |
| Args: | ||
| one_shot: Whether the resampling should run only for one resampling | ||
| period. | ||
|
|
||
| Raises: | ||
| ResamplingError: If some timeseries source or sink encounters any | ||
| errors while receiving or sending samples. In this case the | ||
| timer still runs and the timeseries will keep receiving data. | ||
| The user should remove (and re-add if desired) the faulty | ||
| timeseries from the resampler before calling this method | ||
| again. | ||
| """ | ||
| # We use a tolerance of 10% of the resampling period | ||
| tolerance = timedelta( | ||
|
|
@@ -200,28 +192,45 @@ async def resample(self, *, one_shot: bool = False) -> None: | |
| case unexpected: | ||
| assert_never(unexpected) | ||
|
|
||
| # We need to make a copy here because we need to match the results to the | ||
| # current resamplers, and since we await here, new resamplers could be added | ||
| or removed from the dict while we are awaiting the resampling, which would | ||
| # cause the results to be out of sync. | ||
| resampler_sources = list(self._resamplers) | ||
| results = await asyncio.gather( | ||
| *[r.resample(next_tick_time) for r in self._resamplers.values()], | ||
| return_exceptions=True, | ||
| ) | ||
| await self._emit_window(next_tick_time) | ||
|
|
||
| exceptions = { | ||
| source: result | ||
| for source, result in zip(resampler_sources, results) | ||
| # CancelledError inherits from BaseException, but we don't want | ||
| # to catch *all* BaseExceptions here. | ||
| if isinstance(result, (Exception, asyncio.CancelledError)) | ||
| } | ||
| if exceptions: | ||
| raise ResamplingError(exceptions) | ||
| if one_shot: | ||
| break | ||
|
|
||
| async def _emit_window(self, window_end: datetime) -> None: | ||
| """Emit resampled samples for all timeseries at the given window boundary. | ||
|
|
||
| Args: | ||
| window_end: The timestamp marking the end of the resampling window. | ||
|
|
||
| Raises: | ||
| ResamplingError: If some timeseries source or sink encounters any | ||
| errors while receiving or sending samples. In this case the | ||
| timer still runs and the timeseries will keep receiving data. | ||
| The user should remove (and re-add if desired) the faulty | ||
| timeseries from the resampler before calling this method | ||
| again. | ||
| """ | ||
| # We need to make a copy here because we need to match the results to the | ||
| # current resamplers, and since we await here, new resamplers could be added | ||
| or removed from the dict while we are awaiting the resampling, which would | ||
| # cause the results to be out of sync. | ||
| resampler_sources = list(self._resamplers) | ||
| results = await asyncio.gather( | ||
| *[r.resample(window_end) for r in self._resamplers.values()], | ||
| return_exceptions=True, | ||
| ) | ||
|
|
||
| exceptions = { | ||
| source: result | ||
| for source, result in zip(resampler_sources, results) | ||
| # CancelledError inherits from BaseException, but we don't want | ||
| # to catch *all* BaseExceptions here. | ||
| if isinstance(result, (Exception, asyncio.CancelledError)) | ||
| } | ||
| if exceptions: | ||
| raise ResamplingError(exceptions) | ||
|
|
||
| def _calculate_window_end(self) -> tuple[datetime, timedelta]: | ||
| """Calculate the end of the current resampling window. | ||
|
|
||
|
|
@@ -528,6 +537,9 @@ def __init__( | |
| self._helper: _ResamplingHelper = helper | ||
| self._source: Source = source | ||
| self._sink: Sink = sink | ||
| self._sample_callback: Callable[[Sample[Quantity]], Awaitable[None]] | None = ( | ||
| None | ||
| ) | ||
| self._receiving_task: asyncio.Task[None] = asyncio.create_task( | ||
| self._receive_samples() | ||
| ) | ||
|
|
@@ -545,6 +557,22 @@ async def stop(self) -> None: | |
| """Cancel the receiving task.""" | ||
| await cancel_and_await(self._receiving_task) | ||
|
|
||
| def register_sample_callback( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To me, `register` would suggest that multiple callbacks can exist. How about `set` instead? |
||
| self, | ||
| callback: Callable[[Sample[Quantity]], Awaitable[None]] | None, | ||
| ) -> None: | ||
| """Register a callback to be invoked when a sample arrives. | ||
|
|
||
| The callback is called asynchronously each time a sample is received | ||
| from the source. This allows consumers (like EventResampler) to be | ||
| notified of incoming samples without polling internal buffers. | ||
|
|
||
| Args: | ||
| callback: An async function to call when a sample arrives. | ||
| If `None`, no callback will be called on new samples. | ||
| """ | ||
| self._sample_callback = callback | ||
|
|
||
| async def _receive_samples(self) -> None: | ||
| """Pass received samples to the helper. | ||
|
|
||
|
|
@@ -555,6 +583,9 @@ async def _receive_samples(self) -> None: | |
| if sample.value is not None and not sample.value.isnan(): | ||
| self._helper.add_sample((sample.timestamp, sample.value.base_value)) | ||
|
|
||
| if self._sample_callback: | ||
| await self._sample_callback(sample) | ||
|
|
||
| # We need the noqa because pydoclint can't figure out that `recv_exception` is an | ||
| # `Exception` instance. | ||
| async def resample(self, timestamp: datetime) -> None: # noqa: DOC503 | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`self._timer` is not used at all, right? I think it would be cleaner then to extract a common base class from the existing `Resampler` and `EventResampler`.