diff --git a/src/ess/livedata/core/job.py b/src/ess/livedata/core/job.py index cf116517d..583a538ef 100644 --- a/src/ess/livedata/core/job.py +++ b/src/ess/livedata/core/job.py @@ -99,6 +99,38 @@ class JobState(str, Enum): warning = "warning" +def _add_time_coords( + data: sc.DataGroup, start_time: int | None, end_time: int | None +) -> sc.DataGroup: + """ + Add start_time and end_time as 0-D coordinates to all DataArrays in a DataGroup. + + These coordinates provide temporal provenance for each output, enabling lag + calculation in the dashboard (lag = current_time - end_time). + + DataArrays that already have start_time or end_time coordinates are skipped. + This allows workflows to set their own time coords for outputs that represent + different time ranges (e.g., "current" outputs that only cover the period + since the last finalize, not the entire job duration). + """ + if start_time is None or end_time is None: + return data + start_coord = sc.scalar(start_time, unit='ns') + end_coord = sc.scalar(end_time, unit='ns') + + def maybe_add_coords(val: sc.DataArray) -> sc.DataArray: + if 'start_time' in val.coords or 'end_time' in val.coords: + return val + return val.assign_coords(start_time=start_coord, end_time=end_coord) + + return sc.DataGroup( + { + key: (maybe_add_coords(val) if isinstance(val, sc.DataArray) else val) + for key, val in data.items() + } + ) + + class Job: def __init__( self, @@ -206,6 +238,7 @@ def get(self) -> JobResult: data = sc.DataGroup( {str(key): val for key, val in self._processor.finalize().items()} ) + data = _add_time_coords(data, self.start_time, self.end_time) return JobResult( job_id=self._job_id, workflow_id=self._workflow_id, diff --git a/src/ess/livedata/dashboard/extractors.py b/src/ess/livedata/dashboard/extractors.py index a35eebab7..58c501adc 100644 --- a/src/ess/livedata/dashboard/extractors.py +++ b/src/ess/livedata/dashboard/extractors.py @@ -11,6 +11,23 @@ from .plot_params import WindowAggregation +def _extract_time_bounds_as_scalars(data: sc.DataArray) -> dict[str, sc.Variable]: + """ + Extract start_time/end_time as scalar values from 1-D coords. + + When data comes from a TemporalBuffer, start_time and end_time are 1-D + coordinates (one value per time slice). This function extracts the overall + time range (min of start_time, max of end_time) for use in plot titles. + + Returns an empty dict if coords don't exist or are already scalar. 
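+
+    For example (values purely illustrative): 1-D coords
+    start_time=[t0, t1, t2] and end_time=[e0, e1, e2] collapse to the scalars
+    start_time=t0 and end_time=e2, assuming slices were appended in time order.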
+ """ + bounds: dict[str, sc.Variable] = {} + for name, func in [('start_time', 'min'), ('end_time', 'max')]: + if name in data.coords and data.coords[name].ndim == 1: + bounds[name] = getattr(data.coords[name], func)() + return bounds + + class UpdateExtractor(ABC): """Extracts a specific view of buffered data.""" @@ -63,7 +80,10 @@ def get_required_timespan(self) -> float: def extract(self, data: sc.DataArray) -> Any: """Extract the latest value from the data, unwrapped.""" - return data[self._concat_dim, -1] if self._concat_dim in data.dims else data + if self._concat_dim not in data.dims: + return data + # Extract last slice - this also gets the last value from any 1-D coords + return data[self._concat_dim, -1] def _ensure_datetime_coord(data: sc.DataArray, dim: str = 'time') -> sc.DataArray: @@ -102,7 +122,8 @@ def get_required_timespan(self) -> float: def extract(self, data: sc.DataArray) -> Any: """Extract all data from the buffer, converting time to datetime64.""" - return _ensure_datetime_coord(data, self._concat_dim) + result = _ensure_datetime_coord(data, self._concat_dim) + return result.assign_coords(**_extract_time_bounds_as_scalars(result)) class WindowAggregatingExtractor(UpdateExtractor): @@ -173,6 +194,9 @@ def extract(self, data: sc.DataArray) -> Any: # the cutoff shift above should handle that well enough. windowed_data = data[self._concat_dim, cutoff_time:] + # Capture time bounds before aggregation (which removes the time dimension) + time_bounds = _extract_time_bounds_as_scalars(windowed_data) + # Resolve and cache aggregator function on first call if self._aggregator is None: if self._aggregation == WindowAggregation.auto: @@ -193,4 +217,10 @@ def extract(self, data: sc.DataArray) -> Any: if self._aggregator is None: raise ValueError(f"Unknown aggregation method: {self._aggregation}") - return self._aggregator(windowed_data, self._concat_dim) + result = self._aggregator(windowed_data, self._concat_dim) + + # Restore time bounds as scalar coords on the aggregated result + if time_bounds: + result = result.assign_coords(**time_bounds) + + return result diff --git a/src/ess/livedata/dashboard/plots.py b/src/ess/livedata/dashboard/plots.py index fbaac9856..27579175f 100644 --- a/src/ess/livedata/dashboard/plots.py +++ b/src/ess/livedata/dashboard/plots.py @@ -2,6 +2,7 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) """This file contains utilities for creating plots in the dashboard.""" +import time from abc import ABC, abstractmethod from typing import Any, cast @@ -28,6 +29,56 @@ from .scipp_to_holoviews import to_holoviews +def _format_time_ns(ns: int) -> str: + """Format nanoseconds since epoch as HH:MM:SS.s in local time (0.1s precision).""" + from datetime import UTC, datetime + + dt = datetime.fromtimestamp(ns / 1e9, tz=UTC).astimezone() + return f"{dt.strftime('%H:%M:%S')}.{dt.microsecond // 100000}" + + +def _compute_time_info(data: dict[str, sc.DataArray]) -> str | None: + """ + Compute time interval and lag from start_time/end_time coordinates. + + Returns a formatted string like "12:34:56 - 12:35:02 (Lag: 2.3s)" or None + if no timing coordinates are found. Uses the earliest start_time and + latest end_time across all DataArrays to show the full data range. + Lag is computed from the earliest end_time (oldest data) to show worst-case + staleness. 
+ """ + now_ns = time.time_ns() + min_start: int | None = None + min_end: int | None = None + max_end: int | None = None + + for da in data.values(): + if 'start_time' in da.coords: + start_ns = da.coords['start_time'].value + if min_start is None or start_ns < min_start: + min_start = start_ns + if 'end_time' in da.coords: + end_ns = da.coords['end_time'].value + if min_end is None or end_ns < min_end: + min_end = end_ns + if max_end is None or end_ns > max_end: + max_end = end_ns + + if min_end is None: + return None + + # Use min_end for lag (oldest data = maximum lag) + lag_s = (now_ns - min_end) / 1e9 + + if min_start is not None and max_end is not None: + start_str = _format_time_ns(min_start) + end_str = _format_time_ns(max_end) + return f'{start_str} - {end_str} (Lag: {lag_s:.1f}s)' + else: + end_str = _format_time_ns(min_end) + return f'{end_str} (Lag: {lag_s:.1f}s)' + + class Plotter(ABC): """ Base class for plots that support autoscaling. @@ -245,10 +296,18 @@ def __call__( plots = [self._apply_generic_options(p) for p in plots] if self.layout_params.combine_mode == 'overlay': - return hv.Overlay(plots).opts(shared_axes=True) - if len(plots) == 1: - return plots[0] - return hv.Layout(plots).cols(self.layout_params.layout_columns) + result = hv.Overlay(plots).opts(shared_axes=True) + elif len(plots) == 1: + result = plots[0] + else: + result = hv.Layout(plots).cols(self.layout_params.layout_columns) + + # Add time interval and lag indicator as plot title + time_info = _compute_time_info(data) + if time_info is not None: + result = result.opts(title=time_info, fontsize={'title': '10pt'}) + + return result def _apply_generic_options(self, plot_element: hv.Element) -> hv.Element: """Apply generic options like aspect ratio to a plot element.""" diff --git a/src/ess/livedata/dashboard/temporal_buffers.py b/src/ess/livedata/dashboard/temporal_buffers.py index 3da5967c6..dd9012706 100644 --- a/src/ess/livedata/dashboard/temporal_buffers.py +++ b/src/ess/livedata/dashboard/temporal_buffers.py @@ -291,10 +291,15 @@ class TemporalBuffer(BufferProtocol[sc.DataArray]): Validates that non-time coords and masks remain constant across all added data. 
""" + # Coordinates that should be accumulated per-item rather than stored as scalars + _ACCUMULATED_COORDS = ('time', 'start_time', 'end_time') + def __init__(self) -> None: """Initialize empty temporal buffer.""" self._data_buffer: VariableBuffer | None = None self._time_buffer: VariableBuffer | None = None + self._start_time_buffer: VariableBuffer | None = None + self._end_time_buffer: VariableBuffer | None = None self._reference: sc.DataArray | None = None self._max_memory: int | None = None self._required_timespan: float = 0.0 @@ -332,6 +337,12 @@ def add(self, data: sc.DataArray) -> None: if not self._time_buffer.append(data.coords['time']): raise RuntimeError("Time buffer append failed unexpectedly") + # Append start_time and end_time if present + if self._start_time_buffer is not None and 'start_time' in data.coords: + self._start_time_buffer.append(data.coords['start_time']) + if self._end_time_buffer is not None and 'end_time' in data.coords: + self._end_time_buffer.append(data.coords['end_time']) + def get(self) -> sc.DataArray | None: """Return the complete buffer.""" if self._data_buffer is None: @@ -344,6 +355,12 @@ def get(self) -> sc.DataArray | None: coords = {'time': time_coord} coords.update(self._reference.coords) + # Add accumulated start_time and end_time as 1-D coords + if self._start_time_buffer is not None: + coords['start_time'] = self._start_time_buffer.get() + if self._end_time_buffer is not None: + coords['end_time'] = self._end_time_buffer.get() + masks = dict(self._reference.masks) return sc.DataArray(data=data_var, coords=coords, masks=masks) @@ -352,6 +369,8 @@ def clear(self) -> None: """Clear all buffered data.""" self._data_buffer = None self._time_buffer = None + self._start_time_buffer = None + self._end_time_buffer = None self._reference = None def set_required_timespan(self, seconds: float) -> None: @@ -368,11 +387,16 @@ def set_max_memory(self, max_bytes: int) -> None: def _initialize_buffers(self, data: sc.DataArray) -> None: """Initialize buffers with first data, storing reference metadata.""" - # Store reference as slice at time=0 without time coord + # Store reference as slice at time=0 without accumulated coords if 'time' in data.dims: - self._reference = data['time', 0].drop_coords('time') + ref = data['time', 0] else: - self._reference = data.drop_coords('time') + ref = data + # Drop all coords that will be accumulated (not stored in reference) + for coord in self._ACCUMULATED_COORDS: + if coord in ref.coords: + ref = ref.drop_coords(coord) + self._reference = ref # Calculate max_capacity from memory limit if 'time' in data.dims: @@ -393,6 +417,25 @@ def _initialize_buffers(self, data: sc.DataArray) -> None: data=data.coords['time'], max_capacity=max_capacity, concat_dim='time' ) + # Create buffers for start_time and end_time if present + if 'start_time' in data.coords: + self._start_time_buffer = VariableBuffer( + data=data.coords['start_time'], + max_capacity=max_capacity, + concat_dim='time', + ) + else: + self._start_time_buffer = None + + if 'end_time' in data.coords: + self._end_time_buffer = VariableBuffer( + data=data.coords['end_time'], + max_capacity=max_capacity, + concat_dim='time', + ) + else: + self._end_time_buffer = None + def _trim_to_timespan(self, new_data: sc.DataArray) -> None: """Trim buffer to keep only data within required timespan.""" if self._required_timespan < 0: @@ -401,8 +444,7 @@ def _trim_to_timespan(self, new_data: sc.DataArray) -> None: if self._required_timespan == 0.0: # Keep only the latest value - drop 
all existing data drop_count = self._data_buffer.size - self._data_buffer.drop(drop_count) - self._time_buffer.drop(drop_count) + self._drop_from_all_buffers(drop_count) return # Get latest time from new data @@ -425,9 +467,17 @@ def _trim_to_timespan(self, new_data: sc.DataArray) -> None: # Find first True index drop_count = int(keep_mask.values.argmax()) - # Trim both buffers by same amount to keep them in sync + # Trim all buffers by same amount to keep them in sync + self._drop_from_all_buffers(drop_count) + + def _drop_from_all_buffers(self, drop_count: int) -> None: + """Drop data from all buffers to keep them in sync.""" self._data_buffer.drop(drop_count) self._time_buffer.drop(drop_count) + if self._start_time_buffer is not None: + self._start_time_buffer.drop(drop_count) + if self._end_time_buffer is not None: + self._end_time_buffer.drop(drop_count) def _metadata_matches(self, data: sc.DataArray) -> bool: """Check if incoming data's metadata matches stored reference metadata.""" @@ -438,6 +488,10 @@ def _metadata_matches(self, data: sc.DataArray) -> bool: new = data # Create template with reference data but incoming metadata - template = new.assign(self._reference.data).drop_coords('time') + # Drop all accumulated coords for comparison + template = new.assign(self._reference.data) + for coord in self._ACCUMULATED_COORDS: + if coord in template.coords: + template = template.drop_coords(coord) return sc.identical(self._reference, template) diff --git a/src/ess/livedata/handlers/detector_view.py b/src/ess/livedata/handlers/detector_view.py index 89633d53a..039097708 100644 --- a/src/ess/livedata/handlers/detector_view.py +++ b/src/ess/livedata/handlers/detector_view.py @@ -66,6 +66,7 @@ def __init__( self._roi_mapper = get_roi_mapper() self._initial_readback_sent = False self._current_start_time: int | None = None + self._current_end_time: int | None = None # Ratemeter: track event counts per finalize period self._counts_total: int = 0 @@ -116,9 +117,10 @@ def accumulate( "DetectorViewProcessor expects exactly one detector data item." ) - # Track start time of first detector data since last finalize + # Track time range of detector data since last finalize if self._current_start_time is None: self._current_start_time = start_time + self._current_end_time = end_time raw = next(iter(detector_data.values())) filtered = self.apply_toa_range(raw) @@ -136,6 +138,13 @@ def finalize(self) -> dict[str, sc.DataArray]: "finalize called without any detector data accumulated via accumulate" ) + # Create time coords for delta outputs. The start_time and end_time coords + # represent the time range of the current (delta) output, which is the + # period since the last finalize. This differs from the job-level times + # which cover the entire job duration. + start_time_coord = sc.scalar(self._current_start_time, unit='ns') + end_time_coord = sc.scalar(self._current_end_time, unit='ns') + cumulative = self._view.cumulative.copy() # This is a hack to get the current counts. Should be updated once # ess.reduce.live.raw.RollingDetectorView has been modified to support this. 
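Note: a minimal sketch (hypothetical values, plain scipp) of how the per-period coords set here coexist with the job-level coords that _add_time_coords in job.py applies afterwards — it skips DataArrays that already carry start_time or end_time, so delta outputs keep their own bounds:

    import scipp as sc

    # Delta output: per-period bounds assigned by the processor in finalize().
    current = sc.DataArray(sc.scalar(7.0, unit='counts')).assign_coords(
        start_time=sc.scalar(3000, unit='ns'), end_time=sc.scalar(4000, unit='ns')
    )
    # Cumulative output: no time coords yet; Job.get() stamps it with the
    # job-level range (e.g. start_time=1000 ns, end_time=4000 ns).
    cumulative = sc.DataArray(sc.scalar(42.0, unit='counts'))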
@@ -144,10 +153,9 @@ def finalize(self) -> dict[str, sc.DataArray]: current = current - self._previous self._previous = cumulative - # Add time coord to current result - time_coord = sc.scalar(self._current_start_time, unit='ns') - current = current.assign_coords(time=time_coord) - self._current_start_time = None + current = current.assign_coords( + time=start_time_coord, start_time=start_time_coord, end_time=end_time_coord + ) result = sc.DataGroup(cumulative=cumulative, current=current) view_result = dict(result * self._inv_weights if self._use_weights else result) @@ -171,7 +179,10 @@ def finalize(self) -> dict[str, sc.DataArray]: roi_result = { 'roi_spectra_current': roi_current.assign_coords( - roi=roi_coord, time=time_coord + roi=roi_coord, + time=start_time_coord, + start_time=start_time_coord, + end_time=end_time_coord, ), 'roi_spectra_cumulative': roi_cumulative.assign_coords(roi=roi_coord), } @@ -215,22 +226,35 @@ def finalize(self) -> dict[str, sc.DataArray]: counts_result = { 'counts_total': sc.DataArray( sc.scalar(self._counts_total, unit='counts'), - coords={'time': time_coord}, + coords={ + 'time': start_time_coord, + 'start_time': start_time_coord, + 'end_time': end_time_coord, + }, ), 'counts_in_toa_range': sc.DataArray( sc.scalar(self._counts_in_toa_range, unit='counts'), - coords={'time': time_coord}, + coords={ + 'time': start_time_coord, + 'start_time': start_time_coord, + 'end_time': end_time_coord, + }, ), } self._counts_total = 0 self._counts_in_toa_range = 0 + # Reset time tracking for next period + self._current_start_time = None + self._current_end_time = None + return {**view_result, **roi_result, **counts_result} def clear(self) -> None: self._view.clear_counts() self._previous = None self._current_start_time = None + self._current_end_time = None for roi_state in self._rois.values(): roi_state.clear() self._counts_total = 0 diff --git a/src/ess/livedata/handlers/monitor_data_handler.py b/src/ess/livedata/handlers/monitor_data_handler.py index 62347d80e..344aa6b07 100644 --- a/src/ess/livedata/handlers/monitor_data_handler.py +++ b/src/ess/livedata/handlers/monitor_data_handler.py @@ -26,6 +26,7 @@ def __init__( self._cumulative: sc.DataArray | None = None self._current: sc.DataArray | None = None self._current_start_time: int | None = None + self._current_end_time: int | None = None # Ratemeter configuration - convert range to edges unit once dim = edges.dim if toa_range is not None: @@ -57,9 +58,10 @@ def accumulate( if len(data) != 1: raise ValueError("MonitorStreamProcessor expects exactly one data item.") - # Track start time of first data since last finalize + # Track time range of data since last finalize if self._current_start_time is None: self._current_start_time = start_time + self._current_end_time = end_time raw = next(iter(data.values())) # Note: In theory we should consider rebinning/histogramming only in finalize(), @@ -104,14 +106,22 @@ def finalize(self) -> dict[Hashable, sc.DataArray]: self._cumulative += current self._current = sc.zeros_like(current) - # Add time coord to current result - time_coord = sc.scalar(self._current_start_time, unit='ns') - current = current.assign_coords(time=time_coord) - self._current_start_time = None + # Create time coords for delta outputs. The start_time and end_time coords + # represent the time range of the current (delta) output, which is the + # period since the last finalize. This differs from the job-level times + # which cover the entire job duration. 
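+        # Example (hypothetical numbers): if finalize runs once per second on
+        # a job started at t=0 ns, the second 'current' output carries
+        # start_time=1e9 ns and end_time=2e9 ns, while Job.get() later stamps
+        # 'cumulative' with the job-level range 0 ns to 2e9 ns.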
+ start_time_coord = sc.scalar(self._current_start_time, unit='ns') + end_time_coord = sc.scalar(self._current_end_time, unit='ns') + + current = current.assign_coords( + time=start_time_coord, start_time=start_time_coord, end_time=end_time_coord + ) # Compute ratemeter counts (always output both) counts_total = current.sum() - counts_total.coords['time'] = time_coord + counts_total.coords['time'] = start_time_coord + counts_total.coords['start_time'] = start_time_coord + counts_total.coords['end_time'] = end_time_coord if self._toa_range is not None: low, high = self._toa_range @@ -120,9 +130,15 @@ def finalize(self) -> dict[Hashable, sc.DataArray]: else: # When TOA range not enabled, counts_in_toa_range equals total counts_in_toa_range = counts_total.copy() - counts_in_toa_range.coords['time'] = time_coord + counts_in_toa_range.coords['time'] = start_time_coord + counts_in_toa_range.coords['start_time'] = start_time_coord + counts_in_toa_range.coords['end_time'] = end_time_coord counts_in_toa_range.coords[self._edges.dim] = self._toa_range_edges + # Reset time tracking for next period + self._current_start_time = None + self._current_end_time = None + return { 'cumulative': self._cumulative, 'current': current, @@ -134,6 +150,7 @@ def clear(self) -> None: self._cumulative = None self._current = None self._current_start_time = None + self._current_end_time = None class MonitorHandlerFactory( diff --git a/tests/core/job_test.py b/tests/core/job_test.py index d5056f7fa..6a41b0efa 100644 --- a/tests/core/job_test.py +++ b/tests/core/job_test.py @@ -315,6 +315,116 @@ def test_get_returns_job_result(self, sample_job, sample_job_data): assert isinstance(result.data, sc.DataGroup) assert result.error_message is None + def test_get_adds_time_coords_to_data_arrays( + self, fake_processor, sample_workflow_id + ): + """Test that get() adds start_time and end_time coords to DataArrays.""" + job_id = JobId(source_name="test_source", job_number=1) + job = Job( + job_id=job_id, + workflow_id=sample_workflow_id, + processor=fake_processor, + source_names=["test_source"], + ) + + # Set up processor to return a DataArray + fake_processor.data = { + "output": sc.DataArray(data=sc.array(dims=["x"], values=[1, 2, 3])) + } + + # Add data to set job times + data = JobData( + start_time=1000, + end_time=2000, + primary_data={"test_source": sc.scalar(42.0)}, + aux_data={}, + ) + job.add(data) + + result = job.get() + output = result.data["output"] + + assert "start_time" in output.coords + assert "end_time" in output.coords + assert output.coords["start_time"].value == 1000 + assert output.coords["end_time"].value == 2000 + assert output.coords["start_time"].unit == "ns" + assert output.coords["end_time"].unit == "ns" + + def test_get_preserves_existing_time_coords_on_data_arrays( + self, fake_processor, sample_workflow_id + ): + """Test that get() skips DataArrays that already have time coords. + + This allows workflows to set their own time coords for outputs that + represent different time ranges (e.g., delta outputs that only cover + the period since the last finalize, not the entire job duration). 
+ """ + job_id = JobId(source_name="test_source", job_number=1) + job = Job( + job_id=job_id, + workflow_id=sample_workflow_id, + processor=fake_processor, + source_names=["test_source"], + ) + + # Set up processor to return DataArrays: one with existing time coords, + # one without + workflow_start = sc.scalar(500, unit='ns') + workflow_end = sc.scalar(1500, unit='ns') + fake_processor.data = { + "delta_output": sc.DataArray( + data=sc.scalar(1.0), + coords={'start_time': workflow_start, 'end_time': workflow_end}, + ), + "cumulative_output": sc.DataArray(data=sc.scalar(2.0)), + } + + # Add data to set job times (different from workflow times) + data = JobData( + start_time=1000, + end_time=2000, + primary_data={"test_source": sc.scalar(42.0)}, + aux_data={}, + ) + job.add(data) + + result = job.get() + + # Delta output should preserve workflow-set times + delta = result.data["delta_output"] + assert delta.coords["start_time"].value == 500 + assert delta.coords["end_time"].value == 1500 + + # Cumulative output should get job-level times + cumulative = result.data["cumulative_output"] + assert cumulative.coords["start_time"].value == 1000 + assert cumulative.coords["end_time"].value == 2000 + + def test_get_does_not_add_time_coords_when_no_data_added( + self, fake_processor, sample_workflow_id + ): + """Test that get() does not add time coords when job has no timing info.""" + job_id = JobId(source_name="test_source", job_number=1) + job = Job( + job_id=job_id, + workflow_id=sample_workflow_id, + processor=fake_processor, + source_names=["test_source"], + ) + + # Set up processor to return a DataArray + fake_processor.data = { + "output": sc.DataArray(data=sc.array(dims=["x"], values=[1, 2, 3])) + } + + # Don't add any data, so start_time and end_time remain None + result = job.get() + output = result.data["output"] + + assert "start_time" not in output.coords + assert "end_time" not in output.coords + def test_get_calls_processor_finalize(self, sample_job, fake_processor): """Test that get() calls processor.finalize().""" sample_job.get() diff --git a/tests/dashboard/extractor_time_coords_test.py b/tests/dashboard/extractor_time_coords_test.py new file mode 100644 index 000000000..e10700494 --- /dev/null +++ b/tests/dashboard/extractor_time_coords_test.py @@ -0,0 +1,213 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +""" +Tests for time coordinate handling in extractors. + +Extractors should ensure that extracted data has correct start_time/end_time +coordinates reflecting the actual time range of the extracted data, not stale +coordinates from the buffer reference. +""" + +import time + +import scipp as sc + +from ess.livedata.dashboard.extractors import ( + FullHistoryExtractor, + LatestValueExtractor, + WindowAggregatingExtractor, +) + + +def _make_buffered_data( + num_points: int = 5, + time_span_seconds: float = 5.0, +) -> sc.DataArray: + """ + Create data simulating what comes from a TemporalBuffer. 
+ + The data has: + - A 'time' dimension with actual timestamps (recent) + - 1-D start_time/end_time coords (one per time slice, as TemporalBuffer stores them) + """ + now_ns = time.time_ns() + + # Time values: spanning the last `time_span_seconds` + time_step_ns = int(time_span_seconds * 1e9 / num_points) + time_values = [ + now_ns - (num_points - 1 - i) * time_step_ns for i in range(num_points) + ] + + # Each frame has its own start_time/end_time (1-D coords) + # Assume each frame covers 1 second + frame_duration_ns = int(1e9) + start_time_values = time_values # start_time == time for each frame + end_time_values = [t + frame_duration_ns for t in time_values] + + return sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * num_points), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + # 1-D coords as TemporalBuffer produces + 'start_time': sc.array(dims=['time'], values=start_time_values, unit='ns'), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + +class TestFullHistoryExtractor: + """Tests for FullHistoryExtractor time coordinate handling.""" + + def test_sets_start_time_from_min_of_1d_coord(self): + """start_time should be min of the 1-D start_time coord.""" + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + # Result should have scalar start_time = min of input 1-D start_time + expected_start = data.coords['start_time'].values[0] # First value (min) + assert 'start_time' in result.coords + assert result.coords['start_time'].ndim == 0 # Scalar + assert result.coords['start_time'].value == expected_start + + def test_sets_end_time_from_max_of_1d_coord(self): + """end_time should be max of the 1-D end_time coord.""" + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + # Result should have scalar end_time = max of input 1-D end_time + expected_end = data.coords['end_time'].values[-1] # Last value (max) + assert 'end_time' in result.coords + assert result.coords['end_time'].ndim == 0 # Scalar + assert result.coords['end_time'].value == expected_end + + def test_end_time_reflects_actual_data_range(self): + """end_time should reflect the actual data, giving recent lag.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + # end_time should be recent (within last few seconds) + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0-1s" + + def test_handles_datetime64_time_coord(self): + """Should work with datetime64 time coordinates and 1-D time bounds.""" + now_ns = time.time_ns() + time_values = [now_ns - int(i * 1e9) for i in range(5, 0, -1)] + time_var = sc.epoch(unit='ns') + sc.array( + dims=['time'], values=time_values, unit='ns' + ) + + # Create with 1-D start_time/end_time coords + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': time_var, + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + 
), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + extractor = FullHistoryExtractor() + + result = extractor.extract(data) + + assert 'start_time' in result.coords + assert 'end_time' in result.coords + assert result.coords['start_time'].ndim == 0 + assert result.coords['end_time'].ndim == 0 + + +class TestWindowAggregatingExtractor: + """Tests for WindowAggregatingExtractor time coordinate handling.""" + + def test_sets_time_coords_from_actual_window(self): + """Time coords should reflect the actual window that was aggregated.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=10, time_span_seconds=10.0) + # Window of 3 seconds should select roughly the last 3 data points + extractor = WindowAggregatingExtractor(window_duration_seconds=3.0) + + result = extractor.extract(data) + + assert 'start_time' in result.coords + assert 'end_time' in result.coords + assert result.coords['start_time'].ndim == 0 # Scalar + assert result.coords['end_time'].ndim == 0 # Scalar + + # end_time should be recent (within last second or so) + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0s" + + # start_time should be roughly 3 seconds before end_time + # Note: with 10 points over 10s (1s apart), a 3s window captures ~3 points, + # so the span from first to last end_time is ~2-3s + start_time_ns = result.coords['start_time'].value + window_duration = (end_time_ns - start_time_ns) / 1e9 + assert ( + 1.5 <= window_duration <= 5.0 + ), f"Window duration {window_duration}s should be ~2-3s" + + def test_end_time_reflects_actual_window(self): + """end_time should reflect the actual window, giving recent lag.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=10, time_span_seconds=10.0) + extractor = WindowAggregatingExtractor(window_duration_seconds=3.0) + + result = extractor.extract(data) + + # end_time should be recent + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0-1s" + + +class TestLatestValueExtractor: + """Tests for LatestValueExtractor time coordinate handling.""" + + def test_sets_time_coords_from_latest_slice(self): + """Time coords should be extracted from the latest slice's 1-D coords.""" + now_ns = time.time_ns() + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = LatestValueExtractor() + + result = extractor.extract(data) + + # Should have scalar coords from the last slice + assert 'start_time' in result.coords + assert 'end_time' in result.coords + assert result.coords['start_time'].ndim == 0 + assert result.coords['end_time'].ndim == 0 + + # end_time should be recent + end_time_ns = result.coords['end_time'].value + lag_seconds = (now_ns - end_time_ns) / 1e9 + assert lag_seconds < 5.0, f"end_time lag {lag_seconds}s should be ~0s" + + def test_extracts_last_value_from_1d_coords(self): + """Should extract the last value from 1-D start_time/end_time coords.""" + data = _make_buffered_data(num_points=5, time_span_seconds=5.0) + extractor = LatestValueExtractor() + + result = extractor.extract(data) + + # Verify coords are from the last slice + expected_start = data.coords['start_time'].values[-1] + expected_end = data.coords['end_time'].values[-1] + assert result.coords['start_time'].value == expected_start + assert result.coords['end_time'].value == expected_end diff --git 
a/tests/dashboard/plots_test.py b/tests/dashboard/plots_test.py index 94d1f816d..8f5f68b08 100644 --- a/tests/dashboard/plots_test.py +++ b/tests/dashboard/plots_test.py @@ -1343,3 +1343,118 @@ def test_free_aspect_applies_responsive_only( opts = hv.Store.lookup_options('bokeh', curve, 'plot').kwargs assert opts.get('responsive') is True assert 'aspect' not in opts or opts.get('aspect') is None + + +class TestLagIndicator: + """Tests for lag indicator functionality in plotters.""" + + @pytest.fixture + def data_key(self): + """Create a test ResultKey.""" + workflow_id = WorkflowId( + instrument='test_instrument', + namespace='test_namespace', + name='test_workflow', + version=1, + ) + job_id = JobId(source_name='test_source', job_number=uuid.uuid4()) + return ResultKey( + workflow_id=workflow_id, job_id=job_id, output_name='test_result' + ) + + def test_time_info_shown_when_coords_present(self, data_key): + """Test that time interval and lag are shown in title.""" + import time + + now_ns = time.time_ns() + # Create data with start_time 2 seconds ago and end_time 1 second ago + start_time_ns = now_ns - int(2e9) + end_time_ns = now_ns - int(1e9) + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(start_time_ns, unit='ns'), + 'end_time': sc.scalar(end_time_ns, unit='ns'), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + # Check that title contains time range and lag + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + assert 'title' in opts + title = opts['title'] + # Should contain time range separator and lag + assert ' - ' in title # hyphen between times + assert 'Lag:' in title + # Lag should be approximately 1 second + assert '1.' in title or '2.' 
in title + + def test_no_lag_title_when_end_time_absent(self, data_key): + """Test that no lag title is added when end_time coord is absent.""" + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0])}, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + # Check that no title is set (or title doesn't contain Lag) + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + title = opts.get('title', '') + assert 'Lag:' not in title + + def test_lag_uses_maximum_across_multiple_sources(self): + """Test that lag shows the maximum (oldest data) when multiple sources.""" + import time + + workflow_id = WorkflowId( + instrument='test_instrument', + namespace='test_namespace', + name='test_workflow', + version=1, + ) + + now_ns = time.time_ns() + # Source 1: data from 2s to 1s ago + data_key1 = ResultKey( + workflow_id=workflow_id, + job_id=JobId(source_name='source1', job_number=uuid.uuid4()), + output_name='result', + ) + data1 = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(now_ns - int(2e9), unit='ns'), + 'end_time': sc.scalar(now_ns - int(1e9), unit='ns'), + }, + ) + + # Source 2: data from 6s to 5s ago (older, should determine the lag) + data_key2 = ResultKey( + workflow_id=workflow_id, + job_id=JobId(source_name='source2', job_number=uuid.uuid4()), + output_name='result', + ) + data2 = sc.DataArray( + data=sc.array(dims=['x'], values=[4.0, 5.0, 6.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(now_ns - int(6e9), unit='ns'), + 'end_time': sc.scalar(now_ns - int(5e9), unit='ns'), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key1: data1, data_key2: data2}) + + # Check that lag is approximately 5 seconds (the older data) + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + assert 'title' in opts + assert 'Lag:' in opts['title'] + # Should show ~5 seconds, not ~1 second (using oldest end_time) + assert '5.' in opts['title'] or '6.' in opts['title'] diff --git a/tests/dashboard/time_info_test.py b/tests/dashboard/time_info_test.py new file mode 100644 index 000000000..c5a878e7e --- /dev/null +++ b/tests/dashboard/time_info_test.py @@ -0,0 +1,262 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +""" +Tests for time-interval display in plot titles. + +These tests verify that the time information shown in plot titles is factual, +i.e., it reflects the actual time range of the data being displayed. + +The tests simulate the end-to-end flow: buffered data → extractor → plotter. +Extractors are responsible for setting correct start_time/end_time coords +based on the actual data range, not stale coords from buffer references. 
+""" + +import time +import uuid + +import holoviews as hv +import pytest +import scipp as sc + +from ess.livedata.config.workflow_spec import JobId, ResultKey, WorkflowId +from ess.livedata.dashboard import plots +from ess.livedata.dashboard.extractors import ( + FullHistoryExtractor, + WindowAggregatingExtractor, +) +from ess.livedata.dashboard.plot_params import PlotParams1d + +hv.extension('bokeh') + + +@pytest.fixture +def workflow_id(): + return WorkflowId( + instrument='test_instrument', + namespace='test_namespace', + name='test_workflow', + version=1, + ) + + +@pytest.fixture +def data_key(workflow_id): + job_id = JobId(source_name='test_source', job_number=uuid.uuid4()) + return ResultKey(workflow_id=workflow_id, job_id=job_id, output_name='test_result') + + +def _extract_title(result) -> str: + """Extract title from a HoloViews plot result.""" + opts = hv.Store.lookup_options('bokeh', result, 'plot').kwargs + return opts.get('title', '') + + +def _parse_time_range_from_title(title: str) -> tuple[str, str] | None: + """ + Parse start and end time from a title string. + + Expected format: "HH:MM:SS.s - HH:MM:SS.s (Lag: X.Xs)" + Returns (start_time_str, end_time_str) or None if not found. + """ + if ' - ' not in title or 'Lag:' not in title: + return None + time_part = title.split(' (Lag:')[0] + parts = time_part.split(' - ') + if len(parts) != 2: + return None + return parts[0].strip(), parts[1].strip() + + +def _extract_lag_seconds(title: str) -> float: + """Extract lag in seconds from title string.""" + lag_match = title.split('Lag:')[1].strip().rstrip('s)') + return float(lag_match) + + +class TestTimeseriesTimeInfo: + """ + Tests for time info in timeseries plots (full history extraction). + + These tests simulate the end-to-end flow through FullHistoryExtractor, + which computes scalar start_time/end_time from min/max of 1-D coords. 
+ """ + + def test_time_info_reflects_actual_time_dimension_range(self, data_key): + """Time info should reflect the actual time range of the data.""" + now_ns = time.time_ns() + + # Buffered data with 1-D start_time/end_time coords (as TemporalBuffer produces) + time_values = [now_ns - int(i * 1e9) for i in range(5, 0, -1)] + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + buffered_data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + ), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + # Simulate extraction (this is what happens in the real data flow) + extractor = FullHistoryExtractor() + extracted_data = extractor.extract(buffered_data) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: extracted_data}) + + title = _extract_title(result) + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + + # Lag should reflect actual data (last end_time ~0s ago) + assert lag_seconds < 5.0, f"Lag {lag_seconds}s should be ~0s" + + def test_time_info_with_datetime64_time_coord(self, data_key): + """Time info should work with datetime64 time coordinates.""" + now_ns = time.time_ns() + time_values = [now_ns - int(i * 1e9) for i in range(5, 0, -1)] + time_var = sc.epoch(unit='ns') + sc.array( + dims=['time'], values=time_values, unit='ns' + ) + + # Create with 1-D start_time/end_time coords + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + buffered_data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': time_var, + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + ), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + # Simulate extraction + extractor = FullHistoryExtractor() + extracted_data = extractor.extract(buffered_data) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: extracted_data}) + + title = _extract_title(result) + # Extractor computes min/max of start_time/end_time + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + assert lag_seconds < 5.0 + + +class TestWindowedTimeInfo: + """ + Tests for time info in windowed plots (aggregation over time window). + + These tests simulate the end-to-end flow through WindowAggregatingExtractor, + which computes min/max of start_time/end_time from the windowed 1-D coords. 
+ """ + + def test_aggregated_data_reflects_actual_window_time_range(self, data_key): + """Aggregated data should show the time range of the aggregation window.""" + now_ns = time.time_ns() + + # Buffered data with 1-D start_time/end_time coords (as TemporalBuffer produces) + time_values = [now_ns - int(i * 1e9) for i in range(10, 0, -1)] + frame_duration_ns = int(1e9) + start_time_values = time_values + end_time_values = [t + frame_duration_ns for t in time_values] + + buffered_data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0, 3.0]] * 10), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.array( + dims=['time'], values=start_time_values, unit='ns' + ), + 'end_time': sc.array(dims=['time'], values=end_time_values, unit='ns'), + }, + ) + + # Simulate extraction with 3-second window + extractor = WindowAggregatingExtractor(window_duration_seconds=3.0) + extracted_data = extractor.extract(buffered_data) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: extracted_data}) + + title = _extract_title(result) + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + + # Lag should reflect actual window end (~0s) + assert lag_seconds < 5.0, f"Lag {lag_seconds}s should be ~0s" + + +class TestTimeInfoBaseline: + """Tests verifying time info works when coords are accurate.""" + + def test_correct_lag_with_accurate_coords(self, data_key): + """Time info should be correct when coords accurately reflect the data.""" + now_ns = time.time_ns() + start_time = now_ns - int(2e9) + end_time = now_ns - int(1e9) + + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0]), + 'start_time': sc.scalar(start_time, unit='ns'), + 'end_time': sc.scalar(end_time, unit='ns'), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + title = _extract_title(result) + assert 'Lag:' in title + lag_seconds = _extract_lag_seconds(title) + assert 0.5 < lag_seconds < 3.0 + + +class TestTimeInfoEdgeCases: + """Tests for edge cases in time info handling.""" + + def test_no_time_info_without_time_coords_or_dimension(self, data_key): + """No time info should be shown when there are no time coordinates.""" + data = sc.DataArray( + data=sc.array(dims=['x'], values=[1.0, 2.0, 3.0]), + coords={'x': sc.array(dims=['x'], values=[0.0, 1.0, 2.0])}, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) + + title = _extract_title(result) + assert 'Lag:' not in title + + def test_time_dimension_without_coords_does_not_crash(self, data_key): + """Data with 'time' dimension but no scalar time coords should not crash.""" + now_ns = time.time_ns() + time_values = [now_ns - int(i * 1e9) for i in range(5, 0, -1)] + + data = sc.DataArray( + data=sc.array(dims=['time', 'x'], values=[[1.0, 2.0]] * 5), + coords={ + 'time': sc.array(dims=['time'], values=time_values, unit='ns'), + 'x': sc.array(dims=['x'], values=[0.0, 1.0]), + }, + ) + + plotter = plots.LinePlotter.from_params(PlotParams1d()) + result = plotter({data_key: data}) # Should not raise + assert isinstance(_extract_title(result), str) diff --git a/tests/handlers/detector_view_test.py b/tests/handlers/detector_view_test.py index 4a5184ea8..63ad8ad92 100644 --- a/tests/handlers/detector_view_test.py +++ b/tests/handlers/detector_view_test.py @@ 
-251,6 +251,69 @@ def test_time_coord_tracks_first_accumulate( result2 = view.finalize() assert result2['current'].coords['time'].value == 5000 + def test_current_has_start_end_time_coords( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + ) -> None: + """Test that 'current' result has start_time and end_time coords. + + Delta outputs like 'current' need their own time bounds that represent + the period since the last finalize, not the entire job duration. + """ + params = DetectorViewParams() + view = DetectorView(params=params, detector_view=mock_rolling_view) + + # Accumulate with specific time range + view.accumulate( + {'detector': sample_detector_events}, start_time=1000, end_time=2000 + ) + result = view.finalize() + + # Verify start_time and end_time coords are present on current + assert 'start_time' in result['current'].coords + assert 'end_time' in result['current'].coords + assert result['current'].coords['start_time'].value == 1000 + assert result['current'].coords['end_time'].value == 2000 + assert result['current'].coords['start_time'].unit == 'ns' + assert result['current'].coords['end_time'].unit == 'ns' + + # cumulative should not have start_time or end_time coords + # (they will be added by Job.get() with job-level times) + assert 'start_time' not in result['cumulative'].coords + assert 'end_time' not in result['cumulative'].coords + + def test_delta_outputs_track_time_since_last_finalize( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + ) -> None: + """Test that delta outputs track time since last finalize, not job start. + + This is critical for showing correct time bounds when the dashboard + displays the period used to compute delta outputs like 'current'. 
+ """ + params = DetectorViewParams() + view = DetectorView(params=params, detector_view=mock_rolling_view) + + # First period: accumulate and finalize + view.accumulate( + {'detector': sample_detector_events}, start_time=1000, end_time=2000 + ) + result1 = view.finalize() + assert result1['current'].coords['start_time'].value == 1000 + assert result1['current'].coords['end_time'].value == 2000 + + # Second period: new time range + view.accumulate( + {'detector': sample_detector_events}, start_time=3000, end_time=4000 + ) + result2 = view.finalize() + + # Delta output should reflect second period's time range, NOT job start + assert result2['current'].coords['start_time'].value == 3000 + assert result2['current'].coords['end_time'].value == 4000 + def test_finalize_without_accumulate_raises( self, mock_rolling_view: RollingDetectorView ) -> None: @@ -460,6 +523,49 @@ def test_roi_current_has_time_coord( # ROI cumulative should not have time coord assert 'time' not in result['roi_spectra_cumulative'].coords + def test_roi_current_has_start_end_time_coords( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + standard_roi: RectangleROI, + standard_toa_edges: TOAEdges, + ) -> None: + """Test that ROI current results have start_time and end_time coords.""" + params = DetectorViewParams(toa_edges=standard_toa_edges) + view = DetectorView(params=params, detector_view=mock_rolling_view) + + # Configure ROI + view.accumulate( + roi_to_accumulate_data(standard_roi), start_time=1000, end_time=2000 + ) + + # Accumulate detector events + view.accumulate( + {'detector': sample_detector_events}, start_time=2500, end_time=3000 + ) + + result = view.finalize() + + # Verify start_time and end_time coords on ROI current + assert 'start_time' in result['roi_spectra_current'].coords + assert 'end_time' in result['roi_spectra_current'].coords + assert result['roi_spectra_current'].coords['start_time'].value == 2500 + assert result['roi_spectra_current'].coords['end_time'].value == 3000 + + # Verify they match detector view current time coords + assert ( + result['roi_spectra_current'].coords['start_time'] + == result['current'].coords['start_time'] + ) + assert ( + result['roi_spectra_current'].coords['end_time'] + == result['current'].coords['end_time'] + ) + + # ROI cumulative should not have start_time/end_time coords + assert 'start_time' not in result['roi_spectra_cumulative'].coords + assert 'end_time' not in result['roi_spectra_cumulative'].coords + def test_roi_cumulative_accumulation( self, mock_rolling_view: RollingDetectorView, @@ -1192,6 +1298,29 @@ def test_counts_output_in_finalize( assert result['counts_total'].unit == 'counts' assert result['counts_in_toa_range'].unit == 'counts' + def test_counts_have_start_end_time_coords( + self, + mock_rolling_view: RollingDetectorView, + sample_detector_events: sc.DataArray, + ) -> None: + """Test that counts outputs have start_time and end_time coords.""" + params = DetectorViewParams() + view = DetectorView(params=params, detector_view=mock_rolling_view) + + view.accumulate( + {'detector': sample_detector_events}, start_time=1000, end_time=2000 + ) + result = view.finalize() + + # Verify start_time and end_time coords on counts outputs + for key in ['counts_total', 'counts_in_toa_range']: + assert 'start_time' in result[key].coords, f"Missing start_time on {key}" + assert 'end_time' in result[key].coords, f"Missing end_time on {key}" + assert result[key].coords['start_time'].value == 1000 + assert 
result[key].coords['end_time'].value == 2000 + assert result[key].coords['start_time'].unit == 'ns' + assert result[key].coords['end_time'].unit == 'ns' + def test_counts_match_total_events_without_toa_filter( self, mock_rolling_view: RollingDetectorView, diff --git a/tests/handlers/monitor_data_handler_test.py b/tests/handlers/monitor_data_handler_test.py index 237f7cd39..a38e34870 100644 --- a/tests/handlers/monitor_data_handler_test.py +++ b/tests/handlers/monitor_data_handler_test.py @@ -149,8 +149,11 @@ def test_finalize_first_time(self, processor): assert "cumulative" in result assert "current" in result - # Check cumulative data (excluding time coord which current has) - assert_identical(result["cumulative"], result["current"].drop_coords("time")) + # Check cumulative data (excluding time coords which current has) + current_without_time = result["current"].drop_coords( + ["time", "start_time", "end_time"] + ) + assert_identical(result["cumulative"], current_without_time) # Verify time coordinate is present assert "time" in result["current"].coords @@ -213,6 +216,62 @@ def test_time_coordinate_tracks_first_accumulate(self, processor): result2 = processor.finalize() assert result2["current"].coords["time"].value == 5000 + def test_current_has_start_end_time_coords(self, processor): + """Test that 'current' result has start_time and end_time coords. + + Delta outputs like 'current' need their own time bounds that represent + the period since the last finalize, not the entire job duration. + """ + processor.accumulate({"det1": np.array([10e6])}, start_time=1000, end_time=2000) + result = processor.finalize() + + # Verify start_time and end_time coords are present on current + assert "start_time" in result["current"].coords + assert "end_time" in result["current"].coords + assert result["current"].coords["start_time"].value == 1000 + assert result["current"].coords["end_time"].value == 2000 + assert result["current"].coords["start_time"].unit == "ns" + assert result["current"].coords["end_time"].unit == "ns" + + # cumulative should not have start_time or end_time coords + # (they will be added by Job.get() with job-level times) + assert "start_time" not in result["cumulative"].coords + assert "end_time" not in result["cumulative"].coords + + def test_delta_outputs_track_time_since_last_finalize(self, processor): + """Test that delta outputs track time since last finalize, not job start. + + This is critical for showing correct time bounds when the dashboard + displays the period used to compute delta outputs like 'current'. 
+ """ + # First period: accumulate and finalize + processor.accumulate({"det1": np.array([10e6])}, start_time=1000, end_time=2000) + result1 = processor.finalize() + assert result1["current"].coords["start_time"].value == 1000 + assert result1["current"].coords["end_time"].value == 2000 + + # Second period: new time range + processor.accumulate({"det1": np.array([20e6])}, start_time=3000, end_time=4000) + result2 = processor.finalize() + + # Delta output should reflect second period's time range, NOT job start + assert result2["current"].coords["start_time"].value == 3000 + assert result2["current"].coords["end_time"].value == 4000 + + def test_counts_have_start_end_time_coords(self, processor): + """Test that counts outputs have start_time and end_time coords.""" + processor.accumulate({"det1": np.array([10e6])}, start_time=1000, end_time=2000) + result = processor.finalize() + + # Verify start_time and end_time coords on counts outputs + for key in ["counts_total", "counts_in_toa_range"]: + assert "start_time" in result[key].coords, f"Missing start_time on {key}" + assert "end_time" in result[key].coords, f"Missing end_time on {key}" + assert result[key].coords["start_time"].value == 1000 + assert result[key].coords["end_time"].value == 2000 + assert result[key].coords["start_time"].unit == "ns" + assert result[key].coords["end_time"].unit == "ns" + def test_clear(self, processor): """Test clear method resets processor state.""" processor.accumulate(