From da532eec1a832d1f65cd4406c0e51f62e044021f Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 12:49:26 +0000 Subject: [PATCH 1/8] Add start_time and end_time coords to job result DataArrays Job.get() now adds start_time and end_time as 0-D coordinates (unit: ns) to all DataArrays in the result DataGroup. These coordinates provide temporal provenance for each output, enabling lag calculation in the dashboard (lag = current_time - end_time). Addresses issue #444 and provides foundation for #634. Prompt: We need to think about how we can address #444 and also take into account #634. --- src/ess/livedata/core/job.py | 26 ++++++++++++++++ tests/core/job_test.py | 60 ++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/src/ess/livedata/core/job.py b/src/ess/livedata/core/job.py index cf116517d..6054d32f2 100644 --- a/src/ess/livedata/core/job.py +++ b/src/ess/livedata/core/job.py @@ -99,6 +99,31 @@ class JobState(str, Enum): warning = "warning" +def _add_time_coords( + data: sc.DataGroup, start_time: int | None, end_time: int | None +) -> sc.DataGroup: + """ + Add start_time and end_time as 0-D coordinates to all DataArrays in a DataGroup. + + These coordinates provide temporal provenance for each output, enabling lag + calculation in the dashboard (lag = current_time - end_time). + """ + if start_time is None or end_time is None: + return data + start_coord = sc.scalar(start_time, unit='ns') + end_coord = sc.scalar(end_time, unit='ns') + return sc.DataGroup( + { + key: ( + val.assign_coords(start_time=start_coord, end_time=end_coord) + if isinstance(val, sc.DataArray) + else val + ) + for key, val in data.items() + } + ) + + class Job: def __init__( self, @@ -206,6 +231,7 @@ def get(self) -> JobResult: data = sc.DataGroup( {str(key): val for key, val in self._processor.finalize().items()} ) + data = _add_time_coords(data, self.start_time, self.end_time) return JobResult( job_id=self._job_id, workflow_id=self._workflow_id, diff --git a/tests/core/job_test.py b/tests/core/job_test.py index d5056f7fa..ec06a9cb8 100644 --- a/tests/core/job_test.py +++ b/tests/core/job_test.py @@ -315,6 +315,66 @@ def test_get_returns_job_result(self, sample_job, sample_job_data): assert isinstance(result.data, sc.DataGroup) assert result.error_message is None + def test_get_adds_time_coords_to_data_arrays( + self, fake_processor, sample_workflow_id + ): + """Test that get() adds start_time and end_time coords to DataArrays.""" + job_id = JobId(source_name="test_source", job_number=1) + job = Job( + job_id=job_id, + workflow_id=sample_workflow_id, + processor=fake_processor, + source_names=["test_source"], + ) + + # Set up processor to return a DataArray + fake_processor.data = { + "output": sc.DataArray(data=sc.array(dims=["x"], values=[1, 2, 3])) + } + + # Add data to set job times + data = JobData( + start_time=1000, + end_time=2000, + primary_data={"test_source": sc.scalar(42.0)}, + aux_data={}, + ) + job.add(data) + + result = job.get() + output = result.data["output"] + + assert "start_time" in output.coords + assert "end_time" in output.coords + assert output.coords["start_time"].value == 1000 + assert output.coords["end_time"].value == 2000 + assert output.coords["start_time"].unit == "ns" + assert output.coords["end_time"].unit == "ns" + + def test_get_does_not_add_time_coords_when_no_data_added( + self, fake_processor, sample_workflow_id + ): + """Test that get() does not add time coords when job has no timing info.""" + job_id = 
JobId(source_name="test_source", job_number=1) + job = Job( + job_id=job_id, + workflow_id=sample_workflow_id, + processor=fake_processor, + source_names=["test_source"], + ) + + # Set up processor to return a DataArray + fake_processor.data = { + "output": sc.DataArray(data=sc.array(dims=["x"], values=[1, 2, 3])) + } + + # Don't add any data, so start_time and end_time remain None + result = job.get() + output = result.data["output"] + + assert "start_time" not in output.coords + assert "end_time" not in output.coords + def test_get_calls_processor_finalize(self, sample_job, fake_processor): """Test that get() calls processor.finalize().""" sample_job.get() From 56975f4d23ef8c629e1130362588c7f050d311c1 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 13:04:20 +0000 Subject: [PATCH 2/8] Add lag indicator to plot titles Display data lag (current_time - end_time) in HoloViews plot titles. The lag shows how old the displayed data is, updating each time the plot refreshes with new data. This is a first implementation to evaluate the UX. The lag appears as "Lag: X.Xs" in the plot title when the data has an end_time coordinate. Addresses issue #634. --- src/ess/livedata/dashboard/plots.py | 67 +++++++++++++++- tests/dashboard/plots_test.py | 115 ++++++++++++++++++++++++++++ 2 files changed, 178 insertions(+), 4 deletions(-) diff --git a/src/ess/livedata/dashboard/plots.py b/src/ess/livedata/dashboard/plots.py index fbaac9856..14146ba1f 100644 --- a/src/ess/livedata/dashboard/plots.py +++ b/src/ess/livedata/dashboard/plots.py @@ -2,6 +2,7 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) """This file contains utilities for creating plots in the dashboard.""" +import time from abc import ABC, abstractmethod from typing import Any, cast @@ -28,6 +29,56 @@ from .scipp_to_holoviews import to_holoviews +def _format_time_ns(ns: int) -> str: + """Format nanoseconds since epoch as HH:MM:SS in local time.""" + from datetime import UTC, datetime + + dt = datetime.fromtimestamp(ns / 1e9, tz=UTC).astimezone() + return dt.strftime('%H:%M:%S') + + +def _compute_time_info(data: dict[str, sc.DataArray]) -> str | None: + """ + Compute time interval and lag from start_time/end_time coordinates. + + Returns a formatted string like "12:34:56 - 12:35:02 (Lag: 2.3s)" or None + if no timing coordinates are found. Uses the earliest start_time and + latest end_time across all DataArrays to show the full data range. + Lag is computed from the earliest end_time (oldest data) to show worst-case + staleness. + """ + now_ns = time.time_ns() + min_start: int | None = None + min_end: int | None = None + max_end: int | None = None + + for da in data.values(): + if 'start_time' in da.coords: + start_ns = da.coords['start_time'].value + if min_start is None or start_ns < min_start: + min_start = start_ns + if 'end_time' in da.coords: + end_ns = da.coords['end_time'].value + if min_end is None or end_ns < min_end: + min_end = end_ns + if max_end is None or end_ns > max_end: + max_end = end_ns + + if min_end is None: + return None + + # Use min_end for lag (oldest data = maximum lag) + lag_s = (now_ns - min_end) / 1e9 + + if min_start is not None and max_end is not None: + start_str = _format_time_ns(min_start) + end_str = _format_time_ns(max_end) + return f'{start_str} - {end_str} (Lag: {lag_s:.1f}s)' + else: + end_str = _format_time_ns(min_end) + return f'{end_str} (Lag: {lag_s:.1f}s)' + + class Plotter(ABC): """ Base class for plots that support autoscaling. 
@@ -245,10 +296,18 @@ def __call__( plots = [self._apply_generic_options(p) for p in plots] if self.layout_params.combine_mode == 'overlay': - return hv.Overlay(plots).opts(shared_axes=True) - if len(plots) == 1: - return plots[0] - return hv.Layout(plots).cols(self.layout_params.layout_columns) + result = hv.Overlay(plots).opts(shared_axes=True) + elif len(plots) == 1: + result = plots[0] + else: + result = hv.Layout(plots).cols(self.layout_params.layout_columns) + + # Add time interval and lag indicator as plot title + time_info = _compute_time_info(data) + if time_info is not None: + result = result.opts(title=time_info, fontsize={'title': '10pt'}) + + return result def _apply_generic_options(self, plot_element: hv.Element) -> hv.Element: """Apply generic options like aspect ratio to a plot element.""" diff --git a/tests/dashboard/plots_test.py b/tests/dashboard/plots_test.py index 94d1f816d..8f5f68b08 100644 --- a/tests/dashboard/plots_test.py +++ b/tests/dashboard/plots_test.py @@ -1343,3 +1343,118 @@ def test_free_aspect_applies_responsive_only( opts = hv.Store.lookup_options('bokeh', curve, 'plot').kwargs assert opts.get('responsive') is True assert 'aspect' not in opts or opts.get('aspect') is None + + +class TestLagIndicator: + """Tests for lag indicator functionality in plotters.""" + + @pytest.fixture + def data_key(self): + """Create a test ResultKey.""" + workflow_id = WorkflowId( + instrument='test_instrument', + namespace='test_namespace', + name='test_workflow', + version=1, + ) + job_id = JobId(source_name='test_source', job_number=uuid.uuid4()) + return ResultKey( + workflow_id=workflow_id, job_id=job_id, output_name='test_result' + ) + + def test_time_info_shown_when_coords_present(self, data_key): + """Test that time interval and lag are shown in title.""" + import time + + now_ns = time.time_ns() + # Create data with start_time 2 seconds ago and end_time 1 second ago + start_time_ns = now_ns - int(2e9) + end_time_ns = now_ns - int(1e9) + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(start_time_ns, unit='ns'), + 'end_time': sc.scalar(end_time_ns, unit='ns'), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + # Check that title contains time range and lag + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + assert 'title' in opts + title = opts['title'] + # Should contain time range separator and lag + assert ' - ' in title # hyphen between times + assert 'Lag:' in title + # Lag should be approximately 1 second + assert '1.' in title or '2.' 
in title + + def test_no_lag_title_when_end_time_absent(self, data_key): + """Test that no lag title is added when end_time coord is absent.""" + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0])}, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + # Check that no title is set (or title doesn't contain Lag) + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + title = opts.get('title', '') + assert 'Lag:' not in title + + def test_lag_uses_maximum_across_multiple_sources(self): + """Test that lag shows the maximum (oldest data) when multiple sources.""" + import time + + workflow_id = WorkflowId( + instrument='test_instrument', + namespace='test_namespace', + name='test_workflow', + version=1, + ) + + now_ns = time.time_ns() + # Source 1: data from 2s to 1s ago + data_key1 = ResultKey( + workflow_id=workflow_id, + job_id=JobId(source_name='source1', job_number=uuid.uuid4()), + output_name='result', + ) + data1 = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(now_ns - int(2e9), unit='ns'), + 'end_time': sc.scalar(now_ns - int(1e9), unit='ns'), + }, + ) + + # Source 2: data from 6s to 5s ago (older, should determine the lag) + data_key2 = ResultKey( + workflow_id=workflow_id, + job_id=JobId(source_name='source2', job_number=uuid.uuid4()), + output_name='result', + ) + data2 = sc.DataArray( + data=sc.array(dims=['x'], values=[4.0, 5.0, 6.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(now_ns - int(6e9), unit='ns'), + 'end_time': sc.scalar(now_ns - int(5e9), unit='ns'), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key1: data1, data_key2: data2}) + + # Check that lag is approximately 5 seconds (the older data) + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + assert 'title' in opts + assert 'Lag:' in opts['title'] + # Should show ~5 seconds, not ~1 second (using oldest end_time) + assert '5.' in opts['title'] or '6.' in opts['title'] From a5c0648a9a8d2e67bfb9d881240e65556d4838d3 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 13:36:57 +0000 Subject: [PATCH 3/8] Add per-output time coords for delta outputs in DetectorView Delta outputs like 'current', 'roi_spectra_current', and counts now have their own start_time and end_time coords representing the period since the last finalize, not the entire job duration. This fixes misleading time display in the dashboard for these outputs. Changes: - _add_time_coords() now skips DataArrays with existing time coords - DetectorView tracks _current_end_time alongside _current_start_time - All delta outputs get start_time/end_time coords set by the workflow Prompt: This branch add a time info display by adding and using start_time and end_time. This works. However, there are workflows such as the detector views that produce a cumulative and current output... but the start_time is set to the same for both, misleading the user as to what period was used to obtain the "current" (delta since last update) counts. Please think about how to solve this. 
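For reference, the precedence rule this commit introduces can be sketched in a few lines of scipp. This is a standalone illustration (the helper name add_job_times is made up here); the actual logic is the updated _add_time_coords in src/ess/livedata/core/job.py below, which skips DataArrays that already carry their own time coords:

    import scipp as sc

    def add_job_times(group: sc.DataGroup, start: int, end: int) -> sc.DataGroup:
        # Job-level times apply only to outputs that have not already
        # claimed a (shorter) time range of their own.
        start_coord = sc.scalar(start, unit='ns')
        end_coord = sc.scalar(end, unit='ns')
        out = {}
        for key, val in group.items():
            if isinstance(val, sc.DataArray) and not (
                'start_time' in val.coords or 'end_time' in val.coords
            ):
                val = val.assign_coords(start_time=start_coord, end_time=end_coord)
            out[key] = val
        return sc.DataGroup(out)

    current = sc.DataArray(sc.scalar(1.0)).assign_coords(
        start_time=sc.scalar(3000, unit='ns'), end_time=sc.scalar(4000, unit='ns')
    )
    cumulative = sc.DataArray(sc.scalar(2.0))
    group = sc.DataGroup({'current': current, 'cumulative': cumulative})
    result = add_job_times(group, start=1000, end=4000)
    assert result['current'].coords['start_time'].value == 3000     # workflow-set, kept
    assert result['cumulative'].coords['start_time'].value == 1000  # job-level, added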
Co-Authored-By: Claude Opus 4.5 --- src/ess/livedata/core/job.py | 17 ++- src/ess/livedata/handlers/detector_view.py | 41 +++++-- tests/core/job_test.py | 50 ++++++++ tests/handlers/detector_view_test.py | 129 +++++++++++++++++++++ 4 files changed, 224 insertions(+), 13 deletions(-) diff --git a/src/ess/livedata/core/job.py b/src/ess/livedata/core/job.py index 6054d32f2..583a538ef 100644 --- a/src/ess/livedata/core/job.py +++ b/src/ess/livedata/core/job.py @@ -107,18 +107,25 @@ def _add_time_coords( These coordinates provide temporal provenance for each output, enabling lag calculation in the dashboard (lag = current_time - end_time). + + DataArrays that already have start_time or end_time coordinates are skipped. + This allows workflows to set their own time coords for outputs that represent + different time ranges (e.g., "current" outputs that only cover the period + since the last finalize, not the entire job duration). """ if start_time is None or end_time is None: return data start_coord = sc.scalar(start_time, unit='ns') end_coord = sc.scalar(end_time, unit='ns') + + def maybe_add_coords(val: sc.DataArray) -> sc.DataArray: + if 'start_time' in val.coords or 'end_time' in val.coords: + return val + return val.assign_coords(start_time=start_coord, end_time=end_coord) + return sc.DataGroup( { - key: ( - val.assign_coords(start_time=start_coord, end_time=end_coord) - if isinstance(val, sc.DataArray) - else val - ) + key: (maybe_add_coords(val) if isinstance(val, sc.DataArray) else val) for key, val in data.items() } ) diff --git a/src/ess/livedata/handlers/detector_view.py b/src/ess/livedata/handlers/detector_view.py index 89633d53a..57514e3f4 100644 --- a/src/ess/livedata/handlers/detector_view.py +++ b/src/ess/livedata/handlers/detector_view.py @@ -66,6 +66,7 @@ def __init__( self._roi_mapper = get_roi_mapper() self._initial_readback_sent = False self._current_start_time: int | None = None + self._current_end_time: int | None = None # Ratemeter: track event counts per finalize period self._counts_total: int = 0 @@ -116,9 +117,10 @@ def accumulate( "DetectorViewProcessor expects exactly one detector data item." ) - # Track start time of first detector data since last finalize + # Track time range of detector data since last finalize if self._current_start_time is None: self._current_start_time = start_time + self._current_end_time = end_time raw = next(iter(detector_data.values())) filtered = self.apply_toa_range(raw) @@ -136,6 +138,13 @@ def finalize(self) -> dict[str, sc.DataArray]: "finalize called without any detector data accumulated via accumulate" ) + # Create time coords for delta outputs. The start_time and end_time coords + # represent the time range of the current (delta) output, which is the + # period since the last finalize. This differs from the job-level times + # which cover the entire job duration. + start_time_coord = sc.scalar(self._current_start_time, unit='ns') + end_time_coord = sc.scalar(self._current_end_time, unit='ns') + cumulative = self._view.cumulative.copy() # This is a hack to get the current counts. Should be updated once # ess.reduce.live.raw.RollingDetectorView has been modified to support this. 
@@ -144,10 +153,10 @@ def finalize(self) -> dict[str, sc.DataArray]: current = current - self._previous self._previous = cumulative - # Add time coord to current result - time_coord = sc.scalar(self._current_start_time, unit='ns') - current = current.assign_coords(time=time_coord) - self._current_start_time = None + # The 'time' coord is kept for backward compatibility + current = current.assign_coords( + time=start_time_coord, start_time=start_time_coord, end_time=end_time_coord + ) result = sc.DataGroup(cumulative=cumulative, current=current) view_result = dict(result * self._inv_weights if self._use_weights else result) @@ -171,7 +180,10 @@ def finalize(self) -> dict[str, sc.DataArray]: roi_result = { 'roi_spectra_current': roi_current.assign_coords( - roi=roi_coord, time=time_coord + roi=roi_coord, + time=start_time_coord, + start_time=start_time_coord, + end_time=end_time_coord, ), 'roi_spectra_cumulative': roi_cumulative.assign_coords(roi=roi_coord), } @@ -215,22 +227,35 @@ def finalize(self) -> dict[str, sc.DataArray]: counts_result = { 'counts_total': sc.DataArray( sc.scalar(self._counts_total, unit='counts'), - coords={'time': time_coord}, + coords={ + 'time': start_time_coord, + 'start_time': start_time_coord, + 'end_time': end_time_coord, + }, ), 'counts_in_toa_range': sc.DataArray( sc.scalar(self._counts_in_toa_range, unit='counts'), - coords={'time': time_coord}, + coords={ + 'time': start_time_coord, + 'start_time': start_time_coord, + 'end_time': end_time_coord, + }, ), } self._counts_total = 0 self._counts_in_toa_range = 0 + # Reset time tracking for next period + self._current_start_time = None + self._current_end_time = None + return {**view_result, **roi_result, **counts_result} def clear(self) -> None: self._view.clear_counts() self._previous = None self._current_start_time = None + self._current_end_time = None for roi_state in self._rois.values(): roi_state.clear() self._counts_total = 0 diff --git a/tests/core/job_test.py b/tests/core/job_test.py index ec06a9cb8..6a41b0efa 100644 --- a/tests/core/job_test.py +++ b/tests/core/job_test.py @@ -351,6 +351,56 @@ def test_get_adds_time_coords_to_data_arrays( assert output.coords["start_time"].unit == "ns" assert output.coords["end_time"].unit == "ns" + def test_get_preserves_existing_time_coords_on_data_arrays( + self, fake_processor, sample_workflow_id + ): + """Test that get() skips DataArrays that already have time coords. + + This allows workflows to set their own time coords for outputs that + represent different time ranges (e.g., delta outputs that only cover + the period since the last finalize, not the entire job duration). 
+ """ + job_id = JobId(source_name="test_source", job_number=1) + job = Job( + job_id=job_id, + workflow_id=sample_workflow_id, + processor=fake_processor, + source_names=["test_source"], + ) + + # Set up processor to return DataArrays: one with existing time coords, + # one without + workflow_start = sc.scalar(500, unit='ns') + workflow_end = sc.scalar(1500, unit='ns') + fake_processor.data = { + "delta_output": sc.DataArray( + data=sc.scalar(1.0), + coords={'start_time': workflow_start, 'end_time': workflow_end}, + ), + "cumulative_output": sc.DataArray(data=sc.scalar(2.0)), + } + + # Add data to set job times (different from workflow times) + data = JobData( + start_time=1000, + end_time=2000, + primary_data={"test_source": sc.scalar(42.0)}, + aux_data={}, + ) + job.add(data) + + result = job.get() + + # Delta output should preserve workflow-set times + delta = result.data["delta_output"] + assert delta.coords["start_time"].value == 500 + assert delta.coords["end_time"].value == 1500 + + # Cumulative output should get job-level times + cumulative = result.data["cumulative_output"] + assert cumulative.coords["start_time"].value == 1000 + assert cumulative.coords["end_time"].value == 2000 + def test_get_does_not_add_time_coords_when_no_data_added( self, fake_processor, sample_workflow_id ): diff --git a/tests/handlers/detector_view_test.py b/tests/handlers/detector_view_test.py index 4a5184ea8..63ad8ad92 100644 --- a/tests/handlers/detector_view_test.py +++ b/tests/handlers/detector_view_test.py @@ -251,6 +251,69 @@ def test_time_coord_tracks_first_accumulate( result2 = view.finalize() assert result2['current'].coords['time'].value == 5000 + def test_current_has_start_end_time_coords( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + ) -> None: + """Test that 'current' result has start_time and end_time coords. + + Delta outputs like 'current' need their own time bounds that represent + the period since the last finalize, not the entire job duration. + """ + params = DetectorViewParams() + view = DetectorView(params=params, detector_view=mock_rolling_view) + + # Accumulate with specific time range + view.accumulate( + {'detector': sample_detector_events}, start_time=1000, end_time=2000 + ) + result = view.finalize() + + # Verify start_time and end_time coords are present on current + assert 'start_time' in result['current'].coords + assert 'end_time' in result['current'].coords + assert result['current'].coords['start_time'].value == 1000 + assert result['current'].coords['end_time'].value == 2000 + assert result['current'].coords['start_time'].unit == 'ns' + assert result['current'].coords['end_time'].unit == 'ns' + + # cumulative should not have start_time or end_time coords + # (they will be added by Job.get() with job-level times) + assert 'start_time' not in result['cumulative'].coords + assert 'end_time' not in result['cumulative'].coords + + def test_delta_outputs_track_time_since_last_finalize( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + ) -> None: + """Test that delta outputs track time since last finalize, not job start. + + This is critical for showing correct time bounds when the dashboard + displays the period used to compute delta outputs like 'current'. 
+ """ + params = DetectorViewParams() + view = DetectorView(params=params, detector_view=mock_rolling_view) + + # First period: accumulate and finalize + view.accumulate( + {'detector': sample_detector_events}, start_time=1000, end_time=2000 + ) + result1 = view.finalize() + assert result1['current'].coords['start_time'].value == 1000 + assert result1['current'].coords['end_time'].value == 2000 + + # Second period: new time range + view.accumulate( + {'detector': sample_detector_events}, start_time=3000, end_time=4000 + ) + result2 = view.finalize() + + # Delta output should reflect second period's time range, NOT job start + assert result2['current'].coords['start_time'].value == 3000 + assert result2['current'].coords['end_time'].value == 4000 + def test_finalize_without_accumulate_raises( self, mock_rolling_view: RollingDetectorView ) -> None: @@ -460,6 +523,49 @@ def test_roi_current_has_time_coord( # ROI cumulative should not have time coord assert 'time' not in result['roi_spectra_cumulative'].coords + def test_roi_current_has_start_end_time_coords( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + standard_roi: RectangleROI, + standard_toa_edges: TOAEdges, + ) -> None: + """Test that ROI current results have start_time and end_time coords.""" + params = DetectorViewParams(toa_edges=standard_toa_edges) + view = DetectorView(params=params, detector_view=mock_rolling_view) + + # Configure ROI + view.accumulate( + roi_to_accumulate_data(standard_roi), start_time=1000, end_time=2000 + ) + + # Accumulate detector events + view.accumulate( + {'detector': sample_detector_events}, start_time=2500, end_time=3000 + ) + + result = view.finalize() + + # Verify start_time and end_time coords on ROI current + assert 'start_time' in result['roi_spectra_current'].coords + assert 'end_time' in result['roi_spectra_current'].coords + assert result['roi_spectra_current'].coords['start_time'].value == 2500 + assert result['roi_spectra_current'].coords['end_time'].value == 3000 + + # Verify they match detector view current time coords + assert ( + result['roi_spectra_current'].coords['start_time'] + == result['current'].coords['start_time'] + ) + assert ( + result['roi_spectra_current'].coords['end_time'] + == result['current'].coords['end_time'] + ) + + # ROI cumulative should not have start_time/end_time coords + assert 'start_time' not in result['roi_spectra_cumulative'].coords + assert 'end_time' not in result['roi_spectra_cumulative'].coords + def test_roi_cumulative_accumulation( self, mock_rolling_view: RollingDetectorView, @@ -1192,6 +1298,29 @@ def test_counts_output_in_finalize( assert result['counts_total'].unit == 'counts' assert result['counts_in_toa_range'].unit == 'counts' + def test_counts_have_start_end_time_coords( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + ) -> None: + """Test that counts outputs have start_time and end_time coords.""" + params = DetectorViewParams() + view = DetectorView(params=params, detector_view=mock_rolling_view) + + view.accumulate( + {'detector': sample_detector_events}, start_time=1000, end_time=2000 + ) + result = view.finalize() + + # Verify start_time and end_time coords on counts outputs + for key in ['counts_total', 'counts_in_toa_range']: + assert 'start_time' in result[key].coords, f"Missing start_time on {key}" + assert 'end_time' in result[key].coords, f"Missing end_time on {key}" + assert result[key].coords['start_time'].value == 1000 + assert 
result[key].coords['end_time'].value == 2000 + assert result[key].coords['start_time'].unit == 'ns' + assert result[key].coords['end_time'].unit == 'ns' + def test_counts_match_total_events_without_toa_filter( self, mock_rolling_view: RollingDetectorView, From 9dd02629ddb8e9068807de2efdcd5a605eed6bb3 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 13:38:58 +0000 Subject: [PATCH 4/8] Display timestamps with 0.1s precision in plot titles The time interval display in plot titles now shows tenths of seconds (e.g., "12:34:56.7" instead of "12:34:56") for better precision when monitoring fast-updating data. Prompt: Consider this branch, in plots.py we should display timestamps to 0.1s precision Co-Authored-By: Claude Opus 4.5 --- src/ess/livedata/dashboard/plots.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ess/livedata/dashboard/plots.py b/src/ess/livedata/dashboard/plots.py index 14146ba1f..27579175f 100644 --- a/src/ess/livedata/dashboard/plots.py +++ b/src/ess/livedata/dashboard/plots.py @@ -30,11 +30,11 @@ def _format_time_ns(ns: int) -> str: - """Format nanoseconds since epoch as HH:MM:SS in local time.""" + """Format nanoseconds since epoch as HH:MM:SS.s in local time (0.1s precision).""" from datetime import UTC, datetime dt = datetime.fromtimestamp(ns / 1e9, tz=UTC).astimezone() - return dt.strftime('%H:%M:%S') + return f"{dt.strftime('%H:%M:%S')}.{dt.microsecond // 100000}" def _compute_time_info(data: dict[str, sc.DataArray]) -> str | None: From db19856cdbbb547a10d06ca5488ca83a9cb2b297 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 14:19:29 +0000 Subject: [PATCH 5/8] Add per-output time coords for delta outputs in MonitorStreamProcessor Apply the same pattern as DetectorView: delta outputs (current, counts_total, counts_in_toa_range) now have start_time and end_time coords representing the period since the last finalize. Cumulative outputs do not get these coords, allowing Job.get() to add job-level times. 
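The bookkeeping both processors now share reduces to the following sketch (the class name DeltaWindow is illustrative; the real processors keep this state in _current_start_time/_current_end_time alongside their accumulated data):

    class DeltaWindow:
        """Track the time range covered since the last finalize."""

        def __init__(self) -> None:
            self.start: int | None = None
            self.end: int | None = None

        def accumulate(self, start_time: int, end_time: int) -> None:
            if self.start is None:  # first chunk since the last finalize
                self.start = start_time
            self.end = end_time  # always advance to the newest chunk

        def finalize(self) -> tuple[int | None, int | None]:
            bounds = (self.start, self.end)
            self.start = self.end = None  # reset for the next period
            return bounds

    w = DeltaWindow()
    w.accumulate(1000, 2000)
    w.accumulate(2000, 3000)
    assert w.finalize() == (1000, 3000)  # spans all chunks in the period
    w.accumulate(5000, 6000)
    assert w.finalize() == (5000, 6000)  # second period, not job start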
Prompt: implement for monitors and commit --- .../livedata/handlers/monitor_data_handler.py | 32 +++++++--- tests/handlers/monitor_data_handler_test.py | 63 ++++++++++++++++++- 2 files changed, 86 insertions(+), 9 deletions(-) diff --git a/src/ess/livedata/handlers/monitor_data_handler.py b/src/ess/livedata/handlers/monitor_data_handler.py index 62347d80e..2217d10ed 100644 --- a/src/ess/livedata/handlers/monitor_data_handler.py +++ b/src/ess/livedata/handlers/monitor_data_handler.py @@ -26,6 +26,7 @@ def __init__( self._cumulative: sc.DataArray | None = None self._current: sc.DataArray | None = None self._current_start_time: int | None = None + self._current_end_time: int | None = None # Ratemeter configuration - convert range to edges unit once dim = edges.dim if toa_range is not None: @@ -57,9 +58,10 @@ def accumulate( if len(data) != 1: raise ValueError("MonitorStreamProcessor expects exactly one data item.") - # Track start time of first data since last finalize + # Track time range of data since last finalize if self._current_start_time is None: self._current_start_time = start_time + self._current_end_time = end_time raw = next(iter(data.values())) # Note: In theory we should consider rebinning/histogramming only in finalize(), @@ -104,14 +106,23 @@ def finalize(self) -> dict[Hashable, sc.DataArray]: self._cumulative += current self._current = sc.zeros_like(current) - # Add time coord to current result - time_coord = sc.scalar(self._current_start_time, unit='ns') - current = current.assign_coords(time=time_coord) - self._current_start_time = None + # Create time coords for delta outputs. The start_time and end_time coords + # represent the time range of the current (delta) output, which is the + # period since the last finalize. This differs from the job-level times + # which cover the entire job duration. 
+ start_time_coord = sc.scalar(self._current_start_time, unit='ns') + end_time_coord = sc.scalar(self._current_end_time, unit='ns') + + # The 'time' coord is kept for backward compatibility + current = current.assign_coords( + time=start_time_coord, start_time=start_time_coord, end_time=end_time_coord + ) # Compute ratemeter counts (always output both) counts_total = current.sum() - counts_total.coords['time'] = time_coord + counts_total.coords['time'] = start_time_coord + counts_total.coords['start_time'] = start_time_coord + counts_total.coords['end_time'] = end_time_coord if self._toa_range is not None: low, high = self._toa_range @@ -120,9 +131,15 @@ def finalize(self) -> dict[Hashable, sc.DataArray]: else: # When TOA range not enabled, counts_in_toa_range equals total counts_in_toa_range = counts_total.copy() - counts_in_toa_range.coords['time'] = time_coord + counts_in_toa_range.coords['time'] = start_time_coord + counts_in_toa_range.coords['start_time'] = start_time_coord + counts_in_toa_range.coords['end_time'] = end_time_coord counts_in_toa_range.coords[self._edges.dim] = self._toa_range_edges + # Reset time tracking for next period + self._current_start_time = None + self._current_end_time = None + return { 'cumulative': self._cumulative, 'current': current, @@ -134,6 +151,7 @@ def clear(self) -> None: self._cumulative = None self._current = None self._current_start_time = None + self._current_end_time = None class MonitorHandlerFactory( diff --git a/tests/handlers/monitor_data_handler_test.py b/tests/handlers/monitor_data_handler_test.py index 237f7cd39..a38e34870 100644 --- a/tests/handlers/monitor_data_handler_test.py +++ b/tests/handlers/monitor_data_handler_test.py @@ -149,8 +149,11 @@ def test_finalize_first_time(self, processor): assert "cumulative" in result assert "current" in result - # Check cumulative data (excluding time coord which current has) - assert_identical(result["cumulative"], result["current"].drop_coords("time")) + # Check cumulative data (excluding time coords which current has) + current_without_time = result["current"].drop_coords( + ["time", "start_time", "end_time"] + ) + assert_identical(result["cumulative"], current_without_time) # Verify time coordinate is present assert "time" in result["current"].coords @@ -213,6 +216,62 @@ def test_time_coordinate_tracks_first_accumulate(self, processor): result2 = processor.finalize() assert result2["current"].coords["time"].value == 5000 + def test_current_has_start_end_time_coords(self, processor): + """Test that 'current' result has start_time and end_time coords. + + Delta outputs like 'current' need their own time bounds that represent + the period since the last finalize, not the entire job duration. 
+ """ + processor.accumulate({"det1": np.array([10e6])}, start_time=1000, end_time=2000) + result = processor.finalize() + + # Verify start_time and end_time coords are present on current + assert "start_time" in result["current"].coords + assert "end_time" in result["current"].coords + assert result["current"].coords["start_time"].value == 1000 + assert result["current"].coords["end_time"].value == 2000 + assert result["current"].coords["start_time"].unit == "ns" + assert result["current"].coords["end_time"].unit == "ns" + + # cumulative should not have start_time or end_time coords + # (they will be added by Job.get() with job-level times) + assert "start_time" not in result["cumulative"].coords + assert "end_time" not in result["cumulative"].coords + + def test_delta_outputs_track_time_since_last_finalize(self, processor): + """Test that delta outputs track time since last finalize, not job start. + + This is critical for showing correct time bounds when the dashboard + displays the period used to compute delta outputs like 'current'. + """ + # First period: accumulate and finalize + processor.accumulate({"det1": np.array([10e6])}, start_time=1000, end_time=2000) + result1 = processor.finalize() + assert result1["current"].coords["start_time"].value == 1000 + assert result1["current"].coords["end_time"].value == 2000 + + # Second period: new time range + processor.accumulate({"det1": np.array([20e6])}, start_time=3000, end_time=4000) + result2 = processor.finalize() + + # Delta output should reflect second period's time range, NOT job start + assert result2["current"].coords["start_time"].value == 3000 + assert result2["current"].coords["end_time"].value == 4000 + + def test_counts_have_start_end_time_coords(self, processor): + """Test that counts outputs have start_time and end_time coords.""" + processor.accumulate({"det1": np.array([10e6])}, start_time=1000, end_time=2000) + result = processor.finalize() + + # Verify start_time and end_time coords on counts outputs + for key in ["counts_total", "counts_in_toa_range"]: + assert "start_time" in result[key].coords, f"Missing start_time on {key}" + assert "end_time" in result[key].coords, f"Missing end_time on {key}" + assert result[key].coords["start_time"].value == 1000 + assert result[key].coords["end_time"].value == 2000 + assert result[key].coords["start_time"].unit == "ns" + assert result[key].coords["end_time"].unit == "ns" + def test_clear(self, processor): """Test clear method resets processor state.""" processor.accumulate( From 6d3964c10770ceeb1217ca4bc686841947c05c8f Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 14:21:05 +0000 Subject: [PATCH 6/8] Remove misleading comment about 'time' coord The comment suggested 'time' was only kept for backwards compatibility, but it's an important coord used for timeseries plotting. Also includes import ordering fixes from ruff. 
--- src/ess/livedata/handlers/detector_view.py | 1 - src/ess/livedata/handlers/monitor_data_handler.py | 1 - 2 files changed, 2 deletions(-) diff --git a/src/ess/livedata/handlers/detector_view.py b/src/ess/livedata/handlers/detector_view.py index 57514e3f4..039097708 100644 --- a/src/ess/livedata/handlers/detector_view.py +++ b/src/ess/livedata/handlers/detector_view.py @@ -153,7 +153,6 @@ def finalize(self) -> dict[str, sc.DataArray]: current = current - self._previous self._previous = cumulative - # The 'time' coord is kept for backward compatibility current = current.assign_coords( time=start_time_coord, start_time=start_time_coord, end_time=end_time_coord ) diff --git a/src/ess/livedata/handlers/monitor_data_handler.py b/src/ess/livedata/handlers/monitor_data_handler.py index 2217d10ed..344aa6b07 100644 --- a/src/ess/livedata/handlers/monitor_data_handler.py +++ b/src/ess/livedata/handlers/monitor_data_handler.py @@ -113,7 +113,6 @@ def finalize(self) -> dict[Hashable, sc.DataArray]: start_time_coord = sc.scalar(self._current_start_time, unit='ns') end_time_coord = sc.scalar(self._current_end_time, unit='ns') - # The 'time' coord is kept for backward compatibility current = current.assign_coords( time=start_time_coord, start_time=start_time_coord, end_time=end_time_coord ) From 552c81a40af9573257386d82e5adb0155a8831d0 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 14:58:25 +0000 Subject: [PATCH 7/8] Store start_time/end_time per-item in TemporalBuffer for accurate plot titles Previously, TemporalBuffer stored start_time/end_time only from the first item in a "reference", causing plot titles to show stale time information that didn't reflect the actual data range being displayed. Now TemporalBuffer accumulates start_time/end_time as 1-D coordinates (one value per time slice), similar to how it handles the time coordinate. Extractors compute min/max of these 1-D coords to get accurate scalar bounds for plot titles. Changes: - TemporalBuffer: Add dedicated buffers for start_time and end_time coords - FullHistoryExtractor: Compute min/max of 1-D time coords - WindowAggregatingExtractor: Capture time bounds from windowed data before aggregation removes the time dimension Prompt: Please consider changes in this branch. We need to understand how we handle time-interval display in the plot titles (added in plots.py) in the case of (a) timeseries plots (extracting full history) and (b) regular plots with a configured 'window'. In both cases we need to ensure that the time info is factual. After you understood the problem, I'd like to implement tests that check for the behavior we desire; if they fail we can think about how to solve this. 
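The reduction the new tests pin down can be stated compactly (illustrative data; in this branch it is implemented by _set_time_bounds_from_range in extractors.py below): per-slice 1-D bounds from the buffer collapse to scalar bounds for the plot title.

    import scipp as sc

    # As produced by TemporalBuffer: one start/end value per buffered slice.
    buffered = sc.DataArray(
        sc.array(dims=['time'], values=[1.0, 2.0, 3.0]),
        coords={
            'start_time': sc.array(dims=['time'], values=[1000, 2000, 3000], unit='ns'),
            'end_time': sc.array(dims=['time'], values=[1500, 2500, 3500], unit='ns'),
        },
    )
    # Collapse to scalars: earliest start, latest end.
    bounded = buffered.assign_coords(
        start_time=buffered.coords['start_time'].min(),
        end_time=buffered.coords['end_time'].max(),
    )
    assert bounded.coords['start_time'].value == 1000
    assert bounded.coords['end_time'].value == 3500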
Co-Authored-By: Claude Opus 4.5 --- src/ess/livedata/dashboard/extractors.py | 47 +++- .../livedata/dashboard/temporal_buffers.py | 68 ++++- tests/dashboard/extractor_time_coords_test.py | 213 ++++++++++++++ tests/dashboard/time_info_test.py | 262 ++++++++++++++++++ 4 files changed, 580 insertions(+), 10 deletions(-) create mode 100644 tests/dashboard/extractor_time_coords_test.py create mode 100644 tests/dashboard/time_info_test.py diff --git a/src/ess/livedata/dashboard/extractors.py b/src/ess/livedata/dashboard/extractors.py index a35eebab7..cbba6aa67 100644 --- a/src/ess/livedata/dashboard/extractors.py +++ b/src/ess/livedata/dashboard/extractors.py @@ -11,6 +11,24 @@ from .plot_params import WindowAggregation +def _set_time_bounds_from_range(data: sc.DataArray) -> sc.DataArray: + """ + Set scalar start_time/end_time coords from min/max of 1-D coords. + + When data comes from a TemporalBuffer, start_time and end_time are 1-D + coordinates (one value per time slice). This function computes the overall + time range and sets scalar coords for use in plot titles. + + If the coords are already scalar or don't exist, returns data unchanged. + """ + result = data + if 'start_time' in data.coords and data.coords['start_time'].ndim == 1: + result = result.assign_coords(start_time=data.coords['start_time'].min()) + if 'end_time' in data.coords and data.coords['end_time'].ndim == 1: + result = result.assign_coords(end_time=data.coords['end_time'].max()) + return result + + class UpdateExtractor(ABC): """Extracts a specific view of buffered data.""" @@ -63,7 +81,10 @@ def get_required_timespan(self) -> float: def extract(self, data: sc.DataArray) -> Any: """Extract the latest value from the data, unwrapped.""" - return data[self._concat_dim, -1] if self._concat_dim in data.dims else data + if self._concat_dim not in data.dims: + return data + # Extract last slice - this also gets the last value from any 1-D coords + return data[self._concat_dim, -1] def _ensure_datetime_coord(data: sc.DataArray, dim: str = 'time') -> sc.DataArray: @@ -102,7 +123,8 @@ def get_required_timespan(self) -> float: def extract(self, data: sc.DataArray) -> Any: """Extract all data from the buffer, converting time to datetime64.""" - return _ensure_datetime_coord(data, self._concat_dim) + result = _ensure_datetime_coord(data, self._concat_dim) + return _set_time_bounds_from_range(result) class WindowAggregatingExtractor(UpdateExtractor): @@ -173,6 +195,19 @@ def extract(self, data: sc.DataArray) -> Any: # the cutoff shift above should handle that well enough. 
windowed_data = data[self._concat_dim, cutoff_time:] + # Capture time bounds before aggregation (which removes the time dimension) + time_bounds = {} + if 'start_time' in windowed_data.coords: + start_coord = windowed_data.coords['start_time'] + time_bounds['start_time'] = ( + start_coord.min() if start_coord.ndim == 1 else start_coord + ) + if 'end_time' in windowed_data.coords: + end_coord = windowed_data.coords['end_time'] + time_bounds['end_time'] = ( + end_coord.max() if end_coord.ndim == 1 else end_coord + ) + # Resolve and cache aggregator function on first call if self._aggregator is None: if self._aggregation == WindowAggregation.auto: @@ -193,4 +228,10 @@ def extract(self, data: sc.DataArray) -> Any: if self._aggregator is None: raise ValueError(f"Unknown aggregation method: {self._aggregation}") - return self._aggregator(windowed_data, self._concat_dim) + result = self._aggregator(windowed_data, self._concat_dim) + + # Restore time bounds as scalar coords on the aggregated result + if time_bounds: + result = result.assign_coords(**time_bounds) + + return result diff --git a/src/ess/livedata/dashboard/temporal_buffers.py b/src/ess/livedata/dashboard/temporal_buffers.py index 3da5967c6..dd9012706 100644 --- a/src/ess/livedata/dashboard/temporal_buffers.py +++ b/src/ess/livedata/dashboard/temporal_buffers.py @@ -291,10 +291,15 @@ class TemporalBuffer(BufferProtocol[sc.DataArray]): Validates that non-time coords and masks remain constant across all added data. """ + # Coordinates that should be accumulated per-item rather than stored as scalars + _ACCUMULATED_COORDS = ('time', 'start_time', 'end_time') + def __init__(self) -> None: """Initialize empty temporal buffer.""" self._data_buffer: VariableBuffer | None = None self._time_buffer: VariableBuffer | None = None + self._start_time_buffer: VariableBuffer | None = None + self._end_time_buffer: VariableBuffer | None = None self._reference: sc.DataArray | None = None self._max_memory: int | None = None self._required_timespan: float = 0.0 @@ -332,6 +337,12 @@ def add(self, data: sc.DataArray) -> None: if not self._time_buffer.append(data.coords['time']): raise RuntimeError("Time buffer append failed unexpectedly") + # Append start_time and end_time if present + if self._start_time_buffer is not None and 'start_time' in data.coords: + self._start_time_buffer.append(data.coords['start_time']) + if self._end_time_buffer is not None and 'end_time' in data.coords: + self._end_time_buffer.append(data.coords['end_time']) + def get(self) -> sc.DataArray | None: """Return the complete buffer.""" if self._data_buffer is None: @@ -344,6 +355,12 @@ def get(self) -> sc.DataArray | None: coords = {'time': time_coord} coords.update(self._reference.coords) + # Add accumulated start_time and end_time as 1-D coords + if self._start_time_buffer is not None: + coords['start_time'] = self._start_time_buffer.get() + if self._end_time_buffer is not None: + coords['end_time'] = self._end_time_buffer.get() + masks = dict(self._reference.masks) return sc.DataArray(data=data_var, coords=coords, masks=masks) @@ -352,6 +369,8 @@ def clear(self) -> None: """Clear all buffered data.""" self._data_buffer = None self._time_buffer = None + self._start_time_buffer = None + self._end_time_buffer = None self._reference = None def set_required_timespan(self, seconds: float) -> None: @@ -368,11 +387,16 @@ def set_max_memory(self, max_bytes: int) -> None: def _initialize_buffers(self, data: sc.DataArray) -> None: """Initialize buffers with first data, storing reference 
metadata.""" - # Store reference as slice at time=0 without time coord + # Store reference as slice at time=0 without accumulated coords if 'time' in data.dims: - self._reference = data['time', 0].drop_coords('time') + ref = data['time', 0] else: - self._reference = data.drop_coords('time') + ref = data + # Drop all coords that will be accumulated (not stored in reference) + for coord in self._ACCUMULATED_COORDS: + if coord in ref.coords: + ref = ref.drop_coords(coord) + self._reference = ref # Calculate max_capacity from memory limit if 'time' in data.dims: @@ -393,6 +417,25 @@ def _initialize_buffers(self, data: sc.DataArray) -> None: data=data.coords['time'], max_capacity=max_capacity, concat_dim='time' ) + # Create buffers for start_time and end_time if present + if 'start_time' in data.coords: + self._start_time_buffer = VariableBuffer( + data=data.coords['start_time'], + max_capacity=max_capacity, + concat_dim='time', + ) + else: + self._start_time_buffer = None + + if 'end_time' in data.coords: + self._end_time_buffer = VariableBuffer( + data=data.coords['end_time'], + max_capacity=max_capacity, + concat_dim='time', + ) + else: + self._end_time_buffer = None + def _trim_to_timespan(self, new_data: sc.DataArray) -> None: """Trim buffer to keep only data within required timespan.""" if self._required_timespan < 0: @@ -401,8 +444,7 @@ def _trim_to_timespan(self, new_data: sc.DataArray) -> None: if self._required_timespan == 0.0: # Keep only the latest value - drop all existing data drop_count = self._data_buffer.size - self._data_buffer.drop(drop_count) - self._time_buffer.drop(drop_count) + self._drop_from_all_buffers(drop_count) return # Get latest time from new data @@ -425,9 +467,17 @@ def _trim_to_timespan(self, new_data: sc.DataArray) -> None: # Find first True index drop_count = int(keep_mask.values.argmax()) - # Trim both buffers by same amount to keep them in sync + # Trim all buffers by same amount to keep them in sync + self._drop_from_all_buffers(drop_count) + + def _drop_from_all_buffers(self, drop_count: int) -> None: + """Drop data from all buffers to keep them in sync.""" self._data_buffer.drop(drop_count) self._time_buffer.drop(drop_count) + if self._start_time_buffer is not None: + self._start_time_buffer.drop(drop_count) + if self._end_time_buffer is not None: + self._end_time_buffer.drop(drop_count) def _metadata_matches(self, data: sc.DataArray) -> bool: """Check if incoming data's metadata matches stored reference metadata.""" @@ -438,6 +488,10 @@ def _metadata_matches(self, data: sc.DataArray) -> bool: new = data # Create template with reference data but incoming metadata - template = new.assign(self._reference.data).drop_coords('time') + # Drop all accumulated coords for comparison + template = new.assign(self._reference.data) + for coord in self._ACCUMULATED_COORDS: + if coord in template.coords: + template = template.drop_coords(coord) return sc.identical(self._reference, template) diff --git a/tests/dashboard/extractor_time_coords_test.py b/tests/dashboard/extractor_time_coords_test.py new file mode 100644 index 000000000..e10700494 --- /dev/null +++ b/tests/dashboard/extractor_time_coords_test.py @@ -0,0 +1,213 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +""" +Tests for time coordinate handling in extractors. 
+ +Extractors should ensure that extracted data has correct start_time/end_time +coordinates reflecting the actual time range of the extracted data, not stale +coordinates from the buffer reference. +""" + +import time + +import scipp as sc + +from ess.livedata.dashboard.extractors import ( + FullHistoryExtractor, + LatestValueExtractor, + WindowAggregatingExtractor, +) + + +def _make_buffered_data( + num_points: int = 5, + time_span_seconds: float = 5.0, +) -> sc.DataArray: + """ + Create data simulating what comes from a TemporalBuffer. + + The data has: + - A 'time' dimension with actual timestamps (recent) + - 1-D start_time/end_time coords (one per time slice, as TemporalBuffer stores them) + """ + now_ns = time.time_ns() + + # Time values: spanning the last `time_span_seconds` + time_step_ns = int(time_span_seconds * 1e9 / num_points) + time_values = [ + now_ns - (num_points - 1 - i) * time_step_ns for i in range(num_points) + ] + + # Each frame has its own start_time/end_time (1-D coords) + # Assume each frame covers 1 second + frame_duration_ns = int(1e9) + start_time_values = time_values # start_time == time for each frame + end_time_values = [t + frame_duration_ns for t in time_values] + + return sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * num_points), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + # 1-D coords as TemporalBuffer produces + 'start_time': sc.array(dims=['time'], values=start_time_values, unit='ns'), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + +class TestFullHistoryExtractor: + """Tests for FullHistoryExtractor time coordinate handling.""" + + def test_sets_start_time_from_min_of_1d_coord(self): + """start_time should be min of the 1-D start_time coord.""" + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + # Result should have scalar start_time = min of input 1-D start_time + expected_start = data.coords['start_time'].values[0] # First value (min) + assert 'start_time' in result.coords + assert result.coords['start_time'].ndim == 0 # Scalar + assert result.coords['start_time'].value == expected_start + + def test_sets_end_time_from_max_of_1d_coord(self): + """end_time should be max of the 1-D end_time coord.""" + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + # Result should have scalar end_time = max of input 1-D end_time + expected_end = data.coords['end_time'].values[-1] # Last value (max) + assert 'end_time' in result.coords + assert result.coords['end_time'].ndim == 0 # Scalar + assert result.coords['end_time'].value == expected_end + + def test_end_time_reflects_actual_data_range(self): + """end_time should reflect the actual data, giving recent lag.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + # end_time should be recent (within last few seconds) + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0-1s" + + def test_handles_datetime64_time_coord(self): + """Should work with datetime64 time coordinates and 1-D time bounds.""" + now_ns = time.time_ns() + time_values = [now_ns - int(i * 1e9) for 
i in range(5, 0, -1)] + time_var = sc.epoch(unit='ns') + sc.array( + dims=['time'], values=time_values, unit='ns' + ) + + # Create with 1-D start_time/end_time coords + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': time_var, + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + ), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + assert 'start_time' in result.coords + assert 'end_time' in result.coords + assert result.coords['start_time'].ndim == 0 + assert result.coords['end_time'].ndim == 0 + + +class TestWindowAggregatingExtractor: + """Tests for WindowAggregatingExtractor time coordinate handling.""" + + def test_sets_time_coords_from_actual_window(self): + """Time coords should reflect the actual window that was aggregated.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=10, time_span_seconds=10.0) + # Window of 3 seconds should select roughly the last 3 data points + extractor = WindowAggregatingExtractor(window_duration_seconds=3.0) + + result = extractor.extract(data) + + assert 'start_time' in result.coords + assert 'end_time' in result.coords + assert result.coords['start_time'].ndim == 0 # Scalar + assert result.coords['end_time'].ndim == 0 # Scalar + + # end_time should be recent (within last second or so) + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0s" + + # start_time should be roughly 3 seconds before end_time + # Note: with 10 points over 10s (1s apart), a 3s window captures ~3 points, + # so the span from first to last end_time is ~2-3s + start_time_ns = result.coords['start_time'].value + window_duration = (end_time_ns - start_time_ns) / 1e9 + assert ( + 1.5 <= window_duration <= 5.0 + ), f"Window duration {window_duration}s should be ~2-3s" + + def test_end_time_reflects_actual_window(self): + """end_time should reflect the actual window, giving recent lag.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=10, time_span_seconds=10.0) + extractor = WindowAggregatingExtractor(window_duration_seconds=3.0) + + result = extractor.extract(data) + + # end_time should be recent + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0-1s" + + +class TestLatestValueExtractor: + """Tests for LatestValueExtractor time coordinate handling.""" + + def test_sets_time_coords_from_latest_slice(self): + """Time coords should be extracted from the latest slice's 1-D coords.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = LatestValueExtractor() + + result = extractor.extract(data) + + # Should have scalar coords from the last slice + assert 'start_time' in result.coords + assert 'end_time' in result.coords + assert result.coords['start_time'].ndim == 0 + assert result.coords['end_time'].ndim == 0 + + # end_time should be recent + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0s" + + def 
test_extracts_last_value_from_1d_coords(self): + """Should extract the last value from 1-D start_time/end_time coords.""" + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = LatestValueExtractor() + + result = extractor.extract(data) + + # Verify coords are from the last slice + expected_start = data.coords['start_time'].values[-1] + expected_end = data.coords['end_time'].values[-1] + assert result.coords['start_time'].value == expected_start + assert result.coords['end_time'].value == expected_end diff --git a/tests/dashboard/time_info_test.py b/tests/dashboard/time_info_test.py new file mode 100644 index 000000000..c5a878e7e --- /dev/null +++ b/tests/dashboard/time_info_test.py @@ -0,0 +1,262 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +""" +Tests for time-interval display in plot titles. + +These tests verify that the time information shown in plot titles is factual, +i.e., it reflects the actual time range of the data being displayed. + +The tests simulate the end-to-end flow: buffered data → extractor → plotter. +Extractors are responsible for setting correct start_time/end_time coords +based on the actual data range, not stale coords from buffer references. +""" + +import time +import uuid + +import holoviews as hv +import pytest +import scipp as sc + +from ess.livedata.config.workflow_spec import JobId, ResultKey, WorkflowId +from ess.livedata.dashboard import plots +from ess.livedata.dashboard.extractors import ( + FullHistoryExtractor, + WindowAggregatingExtractor, +) +from ess.livedata.dashboard.plot_params import PlotParams1d + +hv.extension('bokeh') + + +@pytest.fixture +def workflow_id(): + return WorkflowId( + instrument='test_instrument', + namespace='test_namespace', + name='test_workflow', + version=1, + ) + + +@pytest.fixture +def data_key(workflow_id): + job_id = JobId(source_name='test_source', job_number=uuid.uuid4()) + return ResultKey(workflow_id=workflow_id, job_id=job_id, output_name='test_result') + + +def _extract_title(result) -> str: + """Extract title from a HoloViews plot result.""" + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + return opts.get('title', '') + + +def _parse_time_range_from_title(title: str) -> tuple[str, str] | None: + """ + Parse start and end time from a title string. + + Expected format: "HH:MM:SS.s - HH:MM:SS.s (Lag: X.Xs)" + Returns (start_time_str, end_time_str) or None if not found. + """ + if ' - ' not in title or 'Lag:' not in title: + return None + time_part = title.split(' (Lag:')[0] + parts = time_part.split(' - ') + if len(parts) != 2: + return None + return parts[0].strip(), parts[1].strip() + + +def _extract_lag_seconds(title: str) -> float: + """Extract lag in seconds from title string.""" + lag_match = title.split('Lag:')[1].strip().rstrip('s)') + return float(lag_match) + + +class TestTimeseriesTimeInfo: + """ + Tests for time info in timeseries plots (full history extraction). + + These tests simulate the end-to-end flow through FullHistoryExtractor, + which computes scalar start_time/end_time from min/max of 1-D coords. 
+ """ + + def test_time_info_reflects_actual_time_dimension_range(self, data_key): + """Time info should reflect the actual time range of the data.""" + now_ns = time.time_ns() + + # Buffered data with 1-D start_time/end_time coords (as TemporalBuffer produces) + time_values = [now_ns - int(i * 1e9) for i in range(5, 0, -1)] + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + buffered_data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + ), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + # Simulate extraction (this is what happens in the real data flow) + extractor = FullHistoryExtractor() + extracted_data = extractor.extract(buffered_data) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: extracted_data}) + + title = _extract_title(result) + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + + # Lag should reflect actual data (last end_time ~0s ago) + assert lag_seconds < 5.0, f"Lag {lag_seconds}s should be ~0s" + + def test_time_info_with_datetime64_time_coord(self, data_key): + """Time info should work with datetime64 time coordinates.""" + now_ns = time.time_ns() + time_values = [now_ns - int(i * 1e9) for i in range(5, 0, -1)] + time_var = sc.epoch(unit='ns') + sc.array( + dims=['time'], values=time_values, unit='ns' + ) + + # Create with 1-D start_time/end_time coords + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + buffered_data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': time_var, + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + ), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + # Simulate extraction + extractor = FullHistoryExtractor() + extracted_data = extractor.extract(buffered_data) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: extracted_data}) + + title = _extract_title(result) + # Extractor computes min/max of start_time/end_time + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + assert lag_seconds < 5.0 + + +class TestWindowedTimeInfo: + """ + Tests for time info in windowed plots (aggregation over time window). + + These tests simulate the end-to-end flow through WindowAggregatingExtractor, + which computes min/max of start_time/end_time from the windowed 1-D coords. 
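+
+    Sketch, with the same local names as above (the 3 s window is an
+    arbitrary test choice):
+
+        extractor = WindowAggregatingExtractor(window_duration_seconds=3.0)
+        extracted_data = extractor.extract(buffered_data)  # last ~3 s only
+        title = _extract_title(plotter({data_key: extracted_data}))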
+ """ + + def test_aggregated_data_reflects_actual_window_time_range(self, data_key): + """Aggregated data should show the time range of the aggregation window.""" + now_ns = time.time_ns() + + # Buffered data with 1-D start_time/end_time coords (as TemporalBuffer produces) + time_values = [now_ns - int(i * 1e9) for i in range(10, 0, -1)] + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + buffered_data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0, 3.0]] * 10), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + ), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + # Simulate extraction with 3-second window + extractor = WindowAggregatingExtractor(window_duration_seconds=3.0) + extracted_data = extractor.extract(buffered_data) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: extracted_data}) + + title = _extract_title(result) + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + + # Lag should reflect actual window end (~0s) + assert lag_seconds < 5.0, f"Lag {lag_seconds}s should be ~0s" + + +class TestTimeInfoBaseline: + """Tests verifying time info works when coords are accurate.""" + + def test_correct_lag_with_accurate_coords(self, data_key): + """Time info should be correct when coords accurately reflect the data.""" + now_ns = time.time_ns() + start_time = now_ns - int(2e9) + end_time = now_ns - int(1e9) + + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(start_time, unit='ns'), + 'end_time': sc.scalar(end_time, unit='ns'), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + title = _extract_title(result) + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + assert 0.5 < lag_seconds < 3.0 + + +class TestTimeInfoEdgeCases: + """Tests for edge cases in time info handling.""" + + def test_no_time_info_without_time_coords_or_dimension(self, data_key): + """No time info should be shown when there are no time coordinates.""" + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0])}, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + title = _extract_title(result) + assert 'Lag:' not in title + + def test_time_dimension_without_coords_does_not_crash(self, data_key): + """Data with 'time' dimension but no scalar time coords should not crash.""" + now_ns = time.time_ns() + time_values = [now_ns - int(i * 1e9) for i in range(5, 0, -1)] + + data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) # Should not raise + assert isinstance(_extract_title(result), str) From c6825c16190d18c1b440ed482eea17383ef9bed1 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 8 Jan 2026 15:07:33 +0000 Subject: [PATCH 8/8] Refactor time bounds extraction to use shared helper in 
extractors FullHistoryExtractor and WindowAggregatingExtractor had duplicate logic for extracting scalar time bounds from 1-D coords. Replaced with a single _extract_time_bounds_as_scalars helper that both extractors now use. Prompt: Consider changes in this branch in extractor.py - why is the implementation different for the window extractor? Can we use the same helper somehow? Co-Authored-By: Claude Opus 4.5 --- src/ess/livedata/dashboard/extractors.py | 35 ++++++++---------------- 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/src/ess/livedata/dashboard/extractors.py b/src/ess/livedata/dashboard/extractors.py index cbba6aa67..58c501adc 100644 --- a/src/ess/livedata/dashboard/extractors.py +++ b/src/ess/livedata/dashboard/extractors.py @@ -11,22 +11,21 @@ from .plot_params import WindowAggregation -def _set_time_bounds_from_range(data: sc.DataArray) -> sc.DataArray: +def _extract_time_bounds_as_scalars(data: sc.DataArray) -> dict[str, sc.Variable]: """ - Set scalar start_time/end_time coords from min/max of 1-D coords. + Extract start_time/end_time as scalar values from 1-D coords. When data comes from a TemporalBuffer, start_time and end_time are 1-D - coordinates (one value per time slice). This function computes the overall - time range and sets scalar coords for use in plot titles. + coordinates (one value per time slice). This function extracts the overall + time range (min of start_time, max of end_time) for use in plot titles. - If the coords are already scalar or don't exist, returns data unchanged. + Returns an empty dict if coords don't exist or are already scalar. """ - result = data - if 'start_time' in data.coords and data.coords['start_time'].ndim == 1: - result = result.assign_coords(start_time=data.coords['start_time'].min()) - if 'end_time' in data.coords and data.coords['end_time'].ndim == 1: - result = result.assign_coords(end_time=data.coords['end_time'].max()) - return result + bounds: dict[str, sc.Variable] = {} + for name, func in [('start_time', 'min'), ('end_time', 'max')]: + if name in data.coords and data.coords[name].ndim == 1: + bounds[name] = getattr(data.coords[name], func)() + return bounds class UpdateExtractor(ABC): @@ -124,7 +123,7 @@ def get_required_timespan(self) -> float: def extract(self, data: sc.DataArray) -> Any: """Extract all data from the buffer, converting time to datetime64.""" result = _ensure_datetime_coord(data, self._concat_dim) - return _set_time_bounds_from_range(result) + return result.assign_coords(**_extract_time_bounds_as_scalars(result)) class WindowAggregatingExtractor(UpdateExtractor): @@ -196,17 +195,7 @@ def extract(self, data: sc.DataArray) -> Any: windowed_data = data[self._concat_dim, cutoff_time:] # Capture time bounds before aggregation (which removes the time dimension) - time_bounds = {} - if 'start_time' in windowed_data.coords: - start_coord = windowed_data.coords['start_time'] - time_bounds['start_time'] = ( - start_coord.min() if start_coord.ndim == 1 else start_coord - ) - if 'end_time' in windowed_data.coords: - end_coord = windowed_data.coords['end_time'] - time_bounds['end_time'] = ( - end_coord.max() if end_coord.ndim == 1 else end_coord - ) + time_bounds = _extract_time_bounds_as_scalars(windowed_data) # Resolve and cache aggregator function on first call if self._aggregator is None:
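
A minimal sketch of the shared helper's behavior (toy values; assumes scipp
imported as sc, and the helper exactly as defined in the hunk above):

    import scipp as sc

    da = sc.DataArray(
        data=sc.array(dims=['time'], values=[1.0, 2.0, 3.0]),
        coords={
            'start_time': sc.array(dims=['time'], values=[100, 200, 300], unit='ns'),
            'end_time': sc.array(dims=['time'], values=[150, 250, 350], unit='ns'),
        },
    )
    bounds = _extract_time_bounds_as_scalars(da)
    # min of start_time, max of end_time, each as a 0-D scipp Variable:
    # bounds == {'start_time': 100 ns, 'end_time': 350 ns}
    da = da.assign_coords(**bounds)  # mirrors FullHistoryExtractor.extract

For data without these coords (or with already-scalar coords), bounds is an
empty dict and assign_coords is a no-op, matching the docstring above.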