33 changes: 33 additions & 0 deletions src/ess/livedata/core/job.py
@@ -99,6 +99,38 @@ class JobState(str, Enum):
warning = "warning"


def _add_time_coords(
data: sc.DataGroup, start_time: int | None, end_time: int | None
) -> sc.DataGroup:
"""
Add start_time and end_time as 0-D coordinates to all DataArrays in a DataGroup.

These coordinates provide temporal provenance for each output, enabling lag
calculation in the dashboard (lag = current_time - end_time).

DataArrays that already have start_time or end_time coordinates are skipped.
This allows workflows to set their own time coords for outputs that represent
different time ranges (e.g., "current" outputs that only cover the period
since the last finalize, not the entire job duration).
"""
if start_time is None or end_time is None:
return data
start_coord = sc.scalar(start_time, unit='ns')
end_coord = sc.scalar(end_time, unit='ns')

def maybe_add_coords(val: sc.DataArray) -> sc.DataArray:
if 'start_time' in val.coords or 'end_time' in val.coords:
return val
return val.assign_coords(start_time=start_coord, end_time=end_coord)

return sc.DataGroup(
{
key: (maybe_add_coords(val) if isinstance(val, sc.DataArray) else val)
for key, val in data.items()
}
)


class Job:
def __init__(
self,
@@ -206,6 +238,7 @@ def get(self) -> JobResult:
data = sc.DataGroup(
{str(key): val for key, val in self._processor.finalize().items()}
)
data = _add_time_coords(data, self.start_time, self.end_time)
return JobResult(
job_id=self._job_id,
workflow_id=self._workflow_id,
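For illustration, a minimal sketch of the skip logic described in the docstring (the group keys and values are hypothetical, not actual workflow outputs): outputs that already carry their own time coords keep them, everything else picks up the job-level range.

import scipp as sc

# Hypothetical job outputs: 'cumulative' has no time coords, while 'current'
# sets its own narrower range and must not be overwritten.
group = sc.DataGroup({
    'cumulative': sc.DataArray(sc.scalar(1.0)),
    'current': sc.DataArray(sc.scalar(2.0)).assign_coords(
        start_time=sc.scalar(500, unit='ns'),
        end_time=sc.scalar(900, unit='ns'),
    ),
})
result = _add_time_coords(group, start_time=0, end_time=1000)
assert sc.identical(result['cumulative'].coords['end_time'], sc.scalar(1000, unit='ns'))
assert sc.identical(result['current'].coords['end_time'], sc.scalar(900, unit='ns'))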
36 changes: 33 additions & 3 deletions src/ess/livedata/dashboard/extractors.py
@@ -11,6 +11,23 @@
from .plot_params import WindowAggregation


def _extract_time_bounds_as_scalars(data: sc.DataArray) -> dict[str, sc.Variable]:
"""
Extract start_time/end_time as scalar values from 1-D coords.

When data comes from a TemporalBuffer, start_time and end_time are 1-D
coordinates (one value per time slice). This function extracts the overall
time range (min of start_time, max of end_time) for use in plot titles.

Returns an empty dict if coords don't exist or are already scalar.
"""
bounds: dict[str, sc.Variable] = {}
for name, func in [('start_time', 'min'), ('end_time', 'max')]:
if name in data.coords and data.coords[name].ndim == 1:
bounds[name] = getattr(data.coords[name], func)()
return bounds


class UpdateExtractor(ABC):
"""Extracts a specific view of buffered data."""

@@ -63,7 +80,10 @@ def get_required_timespan(self) -> float:

def extract(self, data: sc.DataArray) -> Any:
"""Extract the latest value from the data, unwrapped."""
return data[self._concat_dim, -1] if self._concat_dim in data.dims else data
if self._concat_dim not in data.dims:
return data
# Extract last slice - this also gets the last value from any 1-D coords
return data[self._concat_dim, -1]


def _ensure_datetime_coord(data: sc.DataArray, dim: str = 'time') -> sc.DataArray:
@@ -102,7 +122,8 @@ def get_required_timespan(self) -> float:

def extract(self, data: sc.DataArray) -> Any:
"""Extract all data from the buffer, converting time to datetime64."""
return _ensure_datetime_coord(data, self._concat_dim)
result = _ensure_datetime_coord(data, self._concat_dim)
return result.assign_coords(**_extract_time_bounds_as_scalars(result))


class WindowAggregatingExtractor(UpdateExtractor):
@@ -173,6 +194,9 @@ def extract(self, data: sc.DataArray) -> Any:
# the cutoff shift above should handle that well enough.
windowed_data = data[self._concat_dim, cutoff_time:]

# Capture time bounds before aggregation (which removes the time dimension)
time_bounds = _extract_time_bounds_as_scalars(windowed_data)

# Resolve and cache aggregator function on first call
if self._aggregator is None:
if self._aggregation == WindowAggregation.auto:
@@ -193,4 +217,10 @@ def extract(self, data: sc.DataArray) -> Any:
if self._aggregator is None:
raise ValueError(f"Unknown aggregation method: {self._aggregation}")

return self._aggregator(windowed_data, self._concat_dim)
result = self._aggregator(windowed_data, self._concat_dim)

# Restore time bounds as scalar coords on the aggregated result
if time_bounds:
result = result.assign_coords(**time_bounds)

return result
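To make the shared coord handling concrete, here is a small sketch of _extract_time_bounds_as_scalars on made-up buffer contents: three slices, each with per-slice time bounds as accumulated by TemporalBuffer.

import scipp as sc

# Hypothetical buffer contents: one value per time slice, with per-slice
# start_time/end_time stored as 1-D coords along 'time'.
data = sc.DataArray(
    sc.arange('time', 3.0),
    coords={
        'time': sc.arange('time', 3, unit='s'),
        'start_time': sc.array(dims=['time'], values=[0, 10, 20], unit='ns'),
        'end_time': sc.array(dims=['time'], values=[9, 19, 29], unit='ns'),
    },
)
bounds = _extract_time_bounds_as_scalars(data)
assert sc.identical(bounds['start_time'], sc.scalar(0, unit='ns'))   # min of starts
assert sc.identical(bounds['end_time'], sc.scalar(29, unit='ns'))    # max of ends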
67 changes: 63 additions & 4 deletions src/ess/livedata/dashboard/plots.py
@@ -2,6 +2,7 @@
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
"""This file contains utilities for creating plots in the dashboard."""

import time
from abc import ABC, abstractmethod
from typing import Any, cast

@@ -28,6 +29,56 @@
from .scipp_to_holoviews import to_holoviews


def _format_time_ns(ns: int) -> str:
"""Format nanoseconds since epoch as HH:MM:SS.s in local time (0.1s precision)."""
from datetime import UTC, datetime

dt = datetime.fromtimestamp(ns / 1e9, tz=UTC).astimezone()
return f"{dt.strftime('%H:%M:%S')}.{dt.microsecond // 100000}"


def _compute_time_info(data: dict[str, sc.DataArray]) -> str | None:
"""
Compute time interval and lag from start_time/end_time coordinates.

Returns a formatted string like "12:34:56.7 - 12:35:02.1 (Lag: 2.3s)" or None
if no timing coordinates are found. Uses the earliest start_time and
latest end_time across all DataArrays to show the full data range.
Lag is computed from the earliest end_time (oldest data) to show worst-case
staleness.
"""
now_ns = time.time_ns()
min_start: int | None = None
min_end: int | None = None
max_end: int | None = None

for da in data.values():
if 'start_time' in da.coords:
start_ns = da.coords['start_time'].value
if min_start is None or start_ns < min_start:
min_start = start_ns
if 'end_time' in da.coords:
end_ns = da.coords['end_time'].value
if min_end is None or end_ns < min_end:
min_end = end_ns
if max_end is None or end_ns > max_end:
max_end = end_ns

if min_end is None:
return None

# Use min_end for lag (oldest data = maximum lag)
lag_s = (now_ns - min_end) / 1e9

if min_start is not None and max_end is not None:
start_str = _format_time_ns(min_start)
end_str = _format_time_ns(max_end)
return f'{start_str} - {end_str} (Lag: {lag_s:.1f}s)'
else:
end_str = _format_time_ns(min_end)
return f'{end_str} (Lag: {lag_s:.1f}s)'


class Plotter(ABC):
"""
Base class for plots that support autoscaling.
@@ -245,10 +296,18 @@ def __call__(
plots = [self._apply_generic_options(p) for p in plots]

if self.layout_params.combine_mode == 'overlay':
return hv.Overlay(plots).opts(shared_axes=True)
if len(plots) == 1:
return plots[0]
return hv.Layout(plots).cols(self.layout_params.layout_columns)
result = hv.Overlay(plots).opts(shared_axes=True)
elif len(plots) == 1:
result = plots[0]
else:
result = hv.Layout(plots).cols(self.layout_params.layout_columns)

# Add time interval and lag indicator as plot title
time_info = _compute_time_info(data)
if time_info is not None:
result = result.opts(title=time_info, fontsize={'title': '10pt'})

return result

def _apply_generic_options(self, plot_element: hv.Element) -> hv.Element:
"""Apply generic options like aspect ratio to a plot element."""
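A hedged sketch of the resulting title text; the 'detector' key and the five-second interval are invented for the example.

import time
import scipp as sc

now_ns = time.time_ns()
# Hypothetical output covering the five seconds that ended two seconds ago.
da = sc.DataArray(
    sc.scalar(1.0),
    coords={
        'start_time': sc.scalar(now_ns - 7_000_000_000, unit='ns'),
        'end_time': sc.scalar(now_ns - 2_000_000_000, unit='ns'),
    },
)
print(_compute_time_info({'detector': da}))
# Prints something like "09:00:00.0 - 09:00:05.0 (Lag: 2.0s)"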
68 changes: 61 additions & 7 deletions src/ess/livedata/dashboard/temporal_buffers.py
@@ -291,10 +291,15 @@ class TemporalBuffer(BufferProtocol[sc.DataArray]):
Validates that non-time coords and masks remain constant across all added data.
"""

# Coordinates accumulated per time slice rather than stored once in the reference
_ACCUMULATED_COORDS = ('time', 'start_time', 'end_time')

def __init__(self) -> None:
"""Initialize empty temporal buffer."""
self._data_buffer: VariableBuffer | None = None
self._time_buffer: VariableBuffer | None = None
self._start_time_buffer: VariableBuffer | None = None
self._end_time_buffer: VariableBuffer | None = None
self._reference: sc.DataArray | None = None
self._max_memory: int | None = None
self._required_timespan: float = 0.0
@@ -332,6 +337,12 @@ def add(self, data: sc.DataArray) -> None:
if not self._time_buffer.append(data.coords['time']):
raise RuntimeError("Time buffer append failed unexpectedly")

# Append start_time and end_time if present
if self._start_time_buffer is not None and 'start_time' in data.coords:
self._start_time_buffer.append(data.coords['start_time'])
if self._end_time_buffer is not None and 'end_time' in data.coords:
self._end_time_buffer.append(data.coords['end_time'])

def get(self) -> sc.DataArray | None:
"""Return the complete buffer."""
if self._data_buffer is None:
Expand All @@ -344,6 +355,12 @@ def get(self) -> sc.DataArray | None:
coords = {'time': time_coord}
coords.update(self._reference.coords)

# Add accumulated start_time and end_time as 1-D coords
if self._start_time_buffer is not None:
coords['start_time'] = self._start_time_buffer.get()
if self._end_time_buffer is not None:
coords['end_time'] = self._end_time_buffer.get()

masks = dict(self._reference.masks)

return sc.DataArray(data=data_var, coords=coords, masks=masks)
@@ -352,6 +369,8 @@ def clear(self) -> None:
"""Clear all buffered data."""
self._data_buffer = None
self._time_buffer = None
self._start_time_buffer = None
self._end_time_buffer = None
self._reference = None

def set_required_timespan(self, seconds: float) -> None:
Expand All @@ -368,11 +387,16 @@ def set_max_memory(self, max_bytes: int) -> None:

def _initialize_buffers(self, data: sc.DataArray) -> None:
"""Initialize buffers with first data, storing reference metadata."""
# Store reference as slice at time=0 without time coord
# Store reference as slice at time=0 without accumulated coords
if 'time' in data.dims:
self._reference = data['time', 0].drop_coords('time')
ref = data['time', 0]
else:
self._reference = data.drop_coords('time')
ref = data
# Drop all coords that will be accumulated (not stored in reference)
for coord in self._ACCUMULATED_COORDS:
if coord in ref.coords:
ref = ref.drop_coords(coord)
self._reference = ref

# Calculate max_capacity from memory limit
if 'time' in data.dims:
@@ -393,6 +417,25 @@ def _initialize_buffers(self, data: sc.DataArray) -> None:
data=data.coords['time'], max_capacity=max_capacity, concat_dim='time'
)

# Create buffers for start_time and end_time if present
if 'start_time' in data.coords:
self._start_time_buffer = VariableBuffer(
data=data.coords['start_time'],
max_capacity=max_capacity,
concat_dim='time',
)
else:
self._start_time_buffer = None

if 'end_time' in data.coords:
self._end_time_buffer = VariableBuffer(
data=data.coords['end_time'],
max_capacity=max_capacity,
concat_dim='time',
)
else:
self._end_time_buffer = None

def _trim_to_timespan(self, new_data: sc.DataArray) -> None:
"""Trim buffer to keep only data within required timespan."""
if self._required_timespan < 0:
@@ -401,8 +444,7 @@ def _trim_to_timespan(self, new_data: sc.DataArray) -> None:
if self._required_timespan == 0.0:
# Keep only the latest value - drop all existing data
drop_count = self._data_buffer.size
self._data_buffer.drop(drop_count)
self._time_buffer.drop(drop_count)
self._drop_from_all_buffers(drop_count)
return

# Get latest time from new data
@@ -425,9 +467,17 @@ def _trim_to_timespan(self, new_data: sc.DataArray) -> None:
# Find first True index
drop_count = int(keep_mask.values.argmax())

# Trim both buffers by same amount to keep them in sync
# Trim all buffers by the same amount to keep them in sync
self._drop_from_all_buffers(drop_count)

def _drop_from_all_buffers(self, drop_count: int) -> None:
"""Drop data from all buffers to keep them in sync."""
self._data_buffer.drop(drop_count)
self._time_buffer.drop(drop_count)
if self._start_time_buffer is not None:
self._start_time_buffer.drop(drop_count)
if self._end_time_buffer is not None:
self._end_time_buffer.drop(drop_count)

def _metadata_matches(self, data: sc.DataArray) -> bool:
"""Check if incoming data's metadata matches stored reference metadata."""
@@ -438,6 +488,10 @@ def _metadata_matches(self, data: sc.DataArray) -> bool:
new = data

# Create template with reference data but incoming metadata
template = new.assign(self._reference.data).drop_coords('time')
# Drop all accumulated coords for comparison
template = new.assign(self._reference.data)
for coord in self._ACCUMULATED_COORDS:
if coord in template.coords:
template = template.drop_coords(coord)

return sc.identical(self._reference, template)
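Putting the pieces together, a usage sketch of the new accumulation path. This assumes VariableBuffer broadcasts 0-D appends along the concat dim, as the reference handling suggests; the memory limit and frame values are made up.

import scipp as sc

buffer = TemporalBuffer()
buffer.set_max_memory(100_000_000)  # made-up limit

# Hypothetical stream of frames as Job.get() would emit them: each carries
# 0-D time/start_time/end_time coords.
for i in range(3):
    frame = sc.DataArray(
        sc.scalar(float(i)),
        coords={
            'time': sc.scalar(float(i), unit='s'),
            'start_time': sc.scalar(i * 10, unit='ns'),
            'end_time': sc.scalar(i * 10 + 9, unit='ns'),
        },
    )
    buffer.add(frame)

result = buffer.get()
# start_time/end_time come back as 1-D coords along 'time', which is the
# shape _extract_time_bounds_as_scalars expects in the extractors.
print(result.coords['start_time'].values)  # [ 0 10 20]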