Instrumentation rompy model forcing #156

@benjaminleighton

Description

Rompy Forcing Instrumentation Layer / Virtual Sensors

1. Motivation and Goal

The primary goal is to provide users with a standardized, model-agnostic way to inspect, validate, and analyze forcing data as it will be seen by the model (after source access, filtering, interpolation, and processing) before committing to potentially lengthy model runs. This addresses several needs:

  • Debugging: Easily identify issues with forcing data sources, units, coordinate systems, or interpolation methods across any supported model.
  • Validation: Check if processed forcing data at specific locations falls within expected ranges or meets criteria, using a consistent methodology.
  • Understanding Forcing: Visualize timeseries or spatial patterns of the input data via a common interface, facilitating comparison between different model setups or forcing sources.
  • Lazy Data Inspection: Provide a uniform mechanism to trigger processing and retrieval of data from lazy sources for specific points/times.
  • Statistical Analysis: Compute summary statistics using standardized tools operating on a common data format.
  • Interoperability: Enable the connection of generic visualization and analysis frameworks to Rompy's forcing data pipeline.

2. Proposed Approach: Refactoring for a Generic Core

To achieve model agnosticism without duplicating logic, the approach focuses on refactoring the existing data retrieval and processing into a generic core mechanism leveraged by both model setup and the instrumentation layer.

  1. Internal Data Retrieval Method (_retrieve_data):

    • A new internal method (e.g., _retrieve_data) will be implemented within the relevant base classes (DataGrid, DataBoundary, BoundaryWaveStation, etc.) and potentially overridden by model-specific implementations only where absolutely necessary for model-specific processing, not data access.
    • This method encapsulates the core tasks: accessing the Source, applying time filters (based on period + buffer), applying spatial selection/interpolation (based on requested locations and grid context), and performing necessary unit conversions or basic processing common to the data type (e.g., calculating speed/direction from u/v wind).
    • Crucially, _retrieve_data operates on generic inputs:
      • locations: Primarily defined using coordinate arrays (e.g., {'x': np.array([...]), 'y': np.array([...]), 'crs': ...}). Model-specific locators (like boundary side names) should be translated into coordinates by the caller (either the get method or a sensor) before passing them to _retrieve_data.
      • period: A standard TimeRange object.
      • grid: A BaseGrid object providing spatial context (CRS, potentially extents for efficient source filtering).
    • Standardized Output: It always returns a standardized, lazy-loaded xarray.Dataset. This dataset serves as the generic interface point.
  2. Refocused get() Method:

    • The existing public get(destdir, grid, period) method remains the entry point for generating model-specific files.
    • It determines the model-required locations (e.g., using grid.boundary_points()).
    • It calls self._retrieve_data(locations=model_locations, period=period, grid=grid).
    • It takes the returned xr.Dataset and performs the final, model-specific formatting and file writing.
  3. Instrumentation Layer (Sensors/Analysis Methods):

    • This layer provides the user-facing tools for inspection. It could be implemented as methods added to forcing objects or, preferably, as separate Sensor classes (e.g., PointSensor, BoundarySensor).
    • Configuration: Configuring a sensor will likely involve specifying:
      • The target forcing component within the ModelRun.config (e.g., model.config.forcing.boundary) to provide context (source, processing rules). This link is necessary to ensure the sensor inspects data consistent with the model setup.
      • The desired sensor locations (ideally generic coordinates).
      • The variables to inspect.
    • Operation: The sensor will:
      • Translate any model-specific location identifiers (if used) into coordinate arrays.
      • Call the associated forcing component's _retrieve_data(locations=sensor_locations, period=model_run.period, grid=model_run.grid).
      • Operate exclusively on the returned xarray.Dataset for plotting, statistics, validation etc.
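
To make the sensor concept concrete, here is a minimal sketch of a possible PointSensor. The class name, constructor arguments, and plotting helper are illustrative assumptions, not an existing API; the only contract relied on is that the wrapped forcing component exposes _retrieve_data and returns an xarray.Dataset.

# Hypothetical sketch -- names and helpers are illustrative only
import numpy as np
import xarray as xr

class PointSensor:
    """Inspect processed forcing data at fixed points, reusing the model's config."""

    def __init__(self, forcing, x, y, variables=None):
        self.forcing = forcing  # e.g. model_run.config.forcing.boundary
        self.locations = {"x": np.asarray(x), "y": np.asarray(y)}
        self.variables = variables

    def dataset(self, model_run) -> xr.Dataset:
        # Delegate to the generic core: same source and processing rules
        # as the model setup, so what we inspect is what the model gets
        ds = self.forcing._retrieve_data(
            locations=self.locations,
            period=model_run.period,
            grid=model_run.grid,
        )
        return ds[self.variables] if self.variables else ds

    def plot_timeseries(self, model_run, variable):
        # Triggers lazy computation only for the requested points/period
        self.dataset(model_run)[variable].plot.line(x="time")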

3. The Generic Interface: _retrieve_data and xarray.Dataset

The combination of the standardized _retrieve_data method signature and its xarray.Dataset return type forms the generic interface:

  • Input: Any sensor or analysis tool interacts with forcing data by requesting specific locations (coordinates), a period, and providing grid context via the _retrieve_data method of a forcing component instance.
  • Output: The forcing component processes the request using its configured source and processing rules, returning a predictable xarray.Dataset (a minimal example is sketched after this list). This dataset should ideally:
    • Use consistent dimension names (e.g., time, site or station for multiple locations).
    • Use consistent coordinate names (e.g., lon, lat, potentially depth).
    • Use CF standard names for variables where applicable.
    • Carry necessary metadata (units, descriptions).
    • Be Dask-backed for lazy evaluation.
  • Connecting External Tools: Any external visualization or analysis framework designed to work with xarray.Dataset can readily consume the output of _retrieve_data (via a sensor or direct call), without needing specific knowledge of the underlying model (SCHISM, SWAN, XBeach) or the original data source format.
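
As a concrete illustration of these conventions, a conforming output might look like the following sketch (variable names and values are purely illustrative):

import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr

times = pd.date_range("2023-01-01", periods=48, freq="h")
ds = xr.Dataset(
    data_vars={
        "sea_surface_height": (
            ("time", "site"),
            da.zeros((len(times), 3), chunks=(24, 3)),  # Dask-backed => lazy
            {"units": "m", "standard_name": "sea_surface_height_above_geoid"},
        ),
    },
    coords={
        "time": times,
        "site": np.arange(3),
        "lon": ("site", np.array([115.1, 115.2, 115.3])),
        "lat": ("site", np.array([-32.1, -32.0, -31.9])),
    },
)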

4. Key Design Principles (Reiteration)

  • xarray.Dataset Standard: The cornerstone of the generic interface.
  • DRY: Core data retrieval/processing logic is centralized in _retrieve_data.
  • Common Grid Interface: BaseGrid subclasses provide necessary spatial context (CRS, boundaries) consistently.
  • Separation of Concerns:
    • Source: Raw data access.
    • _retrieve_data: Generic data processing -> xr.Dataset.
    • get(): Model-specific formatting/writing from xr.Dataset.
    • Sensors/Analysis: User interaction, calls _retrieve_data, works on xr.Dataset.

5. Impacted Methods (Summary)

The primary change involves refactoring the get() methods of the data handling components to extract core logic into _retrieve_data. Internal helper methods (_sel_boundary, _sel_locations, _calculate_stats, etc.) will be generalized to support this core method and operate on coordinate locations; a sketch of one such generalized helper follows.
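
A minimal sketch of a generalized _calculate_stats, assuming only the standardized dataset with a time dimension (the reduction choices here are illustrative):

import xarray as xr

def _calculate_stats(ds: xr.Dataset) -> xr.Dataset:
    # Reduce over time, keeping 'site' so per-location summaries can be
    # compared or validated against expected ranges; works for any forcing
    # component that returns the standardized dataset
    return xr.concat(
        [ds.min("time"), ds.mean("time"), ds.max("time")],
        dim=xr.DataArray(["min", "mean", "max"], dims="stat", name="stat"),
    )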

6. Benefits of Enhanced Genericity

  • True Model Agnosticism (for Analysis): Tools built on top of the sensor layer / _retrieve_data interface don't need model-specific code.
  • Easier Integration: Simplifies connecting external plotting libraries, statistical packages, or validation frameworks.
  • Improved User Experience: Users learn one way to inspect forcing data across different models within Rompy.
  • Future-Proofing: Easier to add support for new models or forcing types by adhering to the _retrieve_data interface.

7. Conclusion

By carefully refactoring the data handling to centralize core logic in a _retrieve_data method that operates on generic coordinate locations and returns a standardized xarray.Dataset, Rompy can provide a powerful and truly model-agnostic instrumentation layer. While configuring the specific sensor instance might require referencing the model setup (to inherit the correct source and processing context), the subsequent analysis and visualization operate on a generic, well-defined data interface, fulfilling the goal of decoupling analysis tools from model internals.


Example 1: rompy.core.boundary.DataBoundary.get

  • Current Role: Filters time, selects boundary points (_sel_boundary), gets the dataset, writes a generic NetCDF file.
  • Key Internal Call: self._sel_boundary(grid) which determines boundary points and uses self.ds.sel/interp.

Before Refactor (Simplified):

# In class DataBoundary
def get(self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None) -> Path:
    # 1. Time Filtering (potentially modifies self.filter)
    if self.crop_data and time is not None:
        self._filter_time(time)
    # 2. Spatial Selection (Calls ds.sel/interp internally)
    ds_boundary = self._sel_boundary(grid) # Returns xr.Dataset
    # 3. Write Model-Specific File (Generic NetCDF here)
    outfile = Path(destdir) / f"{self.id}.nc"
    ds_boundary.to_netcdf(outfile)
    return outfile

After Refactor:

# In class DataBoundary
def _retrieve_data(self, locations: Dict[str, np.ndarray], period: TimeRange, grid: BaseGrid) -> xr.Dataset:
    # 1. Open Source
    ds_source = self.source.open(variables=self.variables, filters=self.filter, coords=self.coords)
    # 2. Apply time filter (using period + buffer) -> returns ds
    ds_time_filtered = self._apply_time_filter(ds_source, period)
    # 3. Apply spatial selection/interpolation (using 'locations' dict) -> returns ds
    #    This replaces the logic previously in _sel_boundary
    ds_processed = self._apply_spatial_selection(ds_time_filtered, locations, grid)
    # 4. Final standardization (if needed)
    return ds_processed

def get(self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None) -> Path:
    # 1. Determine locations needed for the *model file*
    xbnd, ybnd = self._boundary_points(grid=grid) # Uses grid boundary logic
    model_locations = {self.coords.x: xbnd, self.coords.y: ybnd}

    # 2. Call internal method to get processed data
    ds_boundary = self._retrieve_data(locations=model_locations, period=time, grid=grid)

    # 3. Write Model-Specific File (Generic NetCDF here)
    outfile = Path(destdir) / f"{self.id}.nc"
    ds_boundary.to_netcdf(outfile)
    return outfile

Key Change: Data fetching, time filtering, and spatial selection (_sel_boundary's core logic, now generalized as _apply_spatial_selection) move to _retrieve_data. get now orchestrates: determine model locations -> call _retrieve_data -> write file.
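
For concreteness, one plausible shape for the two helpers named above (a sketch only; the buffer attribute and TimeRange start/end fields are assumptions). The spatial helper relies on xarray's vectorized indexing: indexers that share a common dimension ('site') select point-by-point rather than forming an outer product.

# In a base class such as DataBoundary -- hypothetical implementations
import xarray as xr

def _apply_time_filter(self, ds: xr.Dataset, period) -> xr.Dataset:
    if period is None:
        return ds
    # Assumes TimeRange exposes start/end and self.buffer is a timedelta
    return ds.sel(time=slice(period.start - self.buffer, period.end + self.buffer))

def _apply_spatial_selection(self, ds: xr.Dataset, locations, grid) -> xr.Dataset:
    # DataArray indexers sharing the 'site' dim trigger pointwise interpolation
    x = xr.DataArray(locations[self.coords.x], dims="site")
    y = xr.DataArray(locations[self.coords.y], dims="site")
    return ds.interp({self.coords.x: x, self.coords.y: y}, method="linear")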


Example 2: rompy.swan.data.SwanDataGrid.get

  • Current Role: Filters time/grid, gets the dataset, calls ds.swan.to_bottom_grid or ds.swan.to_inpgrid (which currently write the SWAN ASCII files), and returns command strings.
  • Key Internal Call: self.ds.swan.to_...

Before Refactor (Simplified):

# In class SwanDataGrid
def get(self, destdir: str | Path, grid: Optional[SwanGrid] = None, time: Optional[TimeRange] = None) -> str:
    # 1. Time/Grid Filtering (potentially modifies self.filter)
    if self.crop_data:
        self._filter_grid(grid)
        self._filter_time(time)
    # 2. Get Dataset
    ds = self.ds
    # 3. Write Model-Specific File and get commands (via accessor)
    output_file = os.path.join(destdir, f"{self.var.value}.grd")
    if self.var.value == "bottom":
        # This accessor method currently WRITES the file
        inpgrid_cmd, readgrid_cmd = ds.swan.to_bottom_grid(output_file, ...)
    else:
        # This accessor method currently WRITES the file
        inpgrid_cmd, readgrid_cmd = ds.swan.to_inpgrid(output_file, ...)
    return f"{inpgrid_cmd}\n{readgrid_cmd}\n"

After Refactor:

# In class SwanDataGrid
def _retrieve_data(self, locations: Dict[str, np.ndarray], period: TimeRange, grid: BaseGrid) -> xr.Dataset:
    # ... (Standard logic: open source, filter time, select/interp spatial) ...
    # Assumes locations here correspond to the SWAN computational grid nodes needed
    ds_source = self.source.open(...)
    ds_time_filtered = self._apply_time_filter(ds_source, period)
    ds_processed = self._apply_spatial_selection(ds_time_filtered, locations, grid)
    return ds_processed

# Refactored Accessor Method (Conceptual)
# In class Swan_accessor
def format_swan_grid_data(self, z_variable: str, ...) -> Tuple[np.ndarray, SwanGrid]:
    # Operates on the dataset prepared by _retrieve_data (the accessor's
    # wrapped dataset, self._obj)
    # Formats the z_variable data array as needed (e.g., transpose, fillna)
    # Creates the SwanGrid object representing this data's layout
    # Returns the formatted numpy array and the SwanGrid
    data_array = self._obj[z_variable].fillna(FILL_VALUE).values  # Example
    # ... potentially more formatting ...
    swan_grid = SwanGrid(...)  # Defined from the dataset coords
    return data_array, swan_grid

# Refactored get method
def get(self, destdir: str | Path, grid: Optional[SwanGrid] = None, time: Optional[TimeRange] = None) -> str:
    # 1. Determine locations needed for the model file (all nodes of the SwanGrid)
    #    Need to ensure correct CRS handling if source is different
    model_locations = {self.coords.x: grid.x.flatten(), self.coords.y: grid.y.flatten()}

    # 2. Call internal method to get processed data at grid nodes
    ds_processed = self._retrieve_data(locations=model_locations, period=time, grid=grid)

    # 3. Format/Write Model-Specific File
    output_file = os.path.join(destdir, f"{self.var.value}.grd")
    if self.var.value == "bottom":
        # Assumes a refactored accessor or a helper function
        # that takes the dataset and returns formatted numpy array + SwanGrid object
        data_array, swan_grid_repr = ds_processed.swan.format_swan_grid_data(self.z1, ...)
        # Write the formatted numpy array
        with open(output_file, "w") as stream:
            np.savetxt(stream, data_array, fmt=...) # Example write logic
        # Generate commands using the SwanGrid object representation
        inpgrid_cmd = f"INPGRID BOTTOM {swan_grid_repr.inpgrid}"
        readgrid_cmd = f"READINP BOTTOM {self.fac} '{Path(output_file).name}' 3 FREE" # Simplified
    else:
        # Similar logic for vector inputs using z1, z2
        data_array1, data_array2, swan_grid_repr = ds_processed.swan.format_swan_vector_data(self.z1, self.z2, ...)
        # Write data_array1, data_array2 sequentially
        with open(output_file, "w") as stream:
            np.savetxt(stream, data_array1, fmt=...)  # Example write logic
            np.savetxt(stream, data_array2, fmt=...)
        # Generate commands
        inpgrid_cmd = f"INPGRID {self.var.name} {swan_grid_repr.inpgrid} NONSTATION ..." # Simplified
        readgrid_cmd = f"READINP {self.var.name} {self.fac} '{Path(output_file).name}' ..." # Simplified

    return f"{inpgrid_cmd}\n{readgrid_cmd}\n"

Key Change: The responsibility of writing the SWAN ASCII file moves out of the xarray accessor (ds.swan.to_...) and into the get method itself (or a dedicated helper function called by get). The get method first retrieves the necessary data for all required grid points using _retrieve_data, then formats and writes it.
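
For reference, the accessor referred to as ds.swan uses xarray's standard accessor pattern; a minimal sketch of how the refactored, I/O-free method could be registered (class and method bodies illustrative):

import xarray as xr

@xr.register_dataset_accessor("swan")
class Swan_accessor:
    def __init__(self, xarray_obj: xr.Dataset):
        self._obj = xarray_obj  # the dataset the accessor is attached to

    def format_swan_grid_data(self, z_variable: str, fill_value: float = -99.0):
        # Pure formatting, no file I/O, in line with the refactor above
        return self._obj[z_variable].fillna(fill_value).values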


Example 3: rompy.swan.boundary.BoundspecSegmentXY.get

  • Current Role: Filters time/grid, selects boundary points (_sel_boundary), calculates segment means, loops through segments, writes multiple TPAR/SPEC2D files, returns list of filenames and command strings.
  • Key Internal Call: self._sel_boundary(grid) and self.tpar/ds.spec.to_swan() for writing.

Before Refactor (Simplified):

# In class BoundspecSegmentXY
def get(self, destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None) -> Tuple[list, str]:
    # 1. Time/Grid Filtering
    # ...
    # 2. Spatial Selection (returns ds with 'site' dimension along boundary)
    ds_boundary = self._sel_boundary(grid)
    # 3. Loop, Average, Write Files, Build Commands
    cmds = []
    filenames = []
    for ind in range(ds_boundary.site.size - 1):
        ds_seg = ds_boundary.isel(site=slice(ind, ind + 2))
        # Average spectral data across the two points for the segment
        self._ds = ds_seg.mean("site") # Temporary storage for processing
        filename = Path(destdir) / f"{self.id}_{self.file_type}_{ind:03d}.bnd"
        if self.file_type == "tpar":
            write_tpar(self.tpar, filename) # self.tpar uses self._ds
        elif self.file_type == "spec2d":
            self._ds.spec.to_swan(filename) # Uses self._ds
        # Build command for this segment
        location_cmd = SEGMENT(points=XY(x=ds_seg.lon.values, y=ds_seg.lat.values)).render()
        file_cmd = CONSTANTFILE(fname=filename.name, seq=1).render()
        cmds.append(f"BOUNDSPEC {location_cmd}{file_cmd}")
        filenames.append(filename)
    return filenames, "\n".join(cmds)

After Refactor:

# In class BoundspecSegmentXY
def _retrieve_data(self, locations: Dict[str, np.ndarray], period: TimeRange, grid: BaseGrid) -> xr.Dataset:
    # ... (Standard logic: open source, filter time, select/interp spatial) ...
    # Returns dataset indexed by 'site' corresponding to the requested boundary points
    ds_source = self.source.open(...)
    ds_time_filtered = self._apply_time_filter(ds_source, period)
    # _apply_spatial_selection needs to handle multiple points correctly
    ds_processed = self._apply_spatial_selection(ds_time_filtered, locations, grid)
    return ds_processed

def get(self, destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None) -> Tuple[list, str]:
    # 1. Determine locations needed (all points along the defined boundary)
    xbnd, ybnd = self._boundary_points(grid=grid) # Gets all points along the segment(s)
    model_locations = {self.coords.x: xbnd, self.coords.y: ybnd}

    # 2. Call internal method to get processed data along the boundary
    ds_boundary = self._retrieve_data(locations=model_locations, period=time, grid=grid)

    # 3. Loop, Average, Write Files, Build Commands (Operates on ds_boundary)
    cmds = []
    filenames = []
    for ind in range(ds_boundary.site.size - 1):
        ds_seg_boundary_points = ds_boundary.isel(site=slice(ind, ind + 2))
        # Average spectral data across the two points for the segment
        # Note: Averaging spectra might need careful implementation depending on desired outcome
        ds_seg_averaged = ds_seg_boundary_points.mean("site") # Example simple averaging

        filename = Path(destdir) / f"{self.id}_{self.file_type}_{ind:03d}.bnd"
        if self.file_type == "tpar":
            # Calculate tpar dataframe directly from ds_seg_averaged
            tpar_df = ds_seg_averaged.spec.stats(...).to_pandas()
            write_tpar(tpar_df, filename)
        elif self.file_type == "spec2d":
            ds_seg_averaged.spec.to_swan(filename)

        # Build command for this segment using boundary point coordinates
        location_cmd = SEGMENT(points=XY(x=ds_seg_boundary_points.lon.values, y=ds_seg_boundary_points.lat.values)).render()
        file_cmd = CONSTANTFILE(fname=filename.name, seq=1).render()
        cmds.append(f"BOUNDSPEC {location_cmd}{file_cmd}")
        filenames.append(filename)

    return filenames, "\n".join(cmds)

Key Change: Data retrieval for all boundary points happens first via _retrieve_data. The get method then consumes the resulting ds_boundary dataset, performing the segment looping, averaging (operating on the dataset), file writing, and command string generation.
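
On the averaging caveat flagged in the code comment: taking the mean of the two boundary spectra averages energy densities, from which integrated parameters are then derived; averaging the derived parameters themselves is generally not equivalent. A sketch of the distinction, assuming wavespectra's spec accessor is available on the dataset:

# Energy-density averaging, then derive parameters (usually what is wanted)
ds_seg_averaged = ds_seg_boundary_points.mean("site")
hs_from_mean_spectrum = ds_seg_averaged.spec.hs()

# Averaging derived parameters directly -- generally NOT the same result
hs_mean_of_params = ds_seg_boundary_points.spec.hs().mean("site")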


Example 4: rompy_xbeach.data.XBeachBathy.get

  • Current Role: Filters time, reprojects, interpolates (interpolator.get), applies extensions (extension.get, expand_lateral), writes multiple text files (np.savetxt).
  • Key Internal Call: self.interpolator.get, self.extension.get, self.expand_lateral, np.savetxt.

Before Refactor (Simplified):

# In class XBeachBathy
def get(self, destdir: str | Path, grid: RegularGrid, ...) -> tuple[Path, Path, Path, RegularGrid]:
    # 1. Time Filter (if needed)
    # ...
    # 2. Reproject source dataset if CRS differs
    dset = self.ds.rio.reproject(grid.crs)
    # 3. Interpolate to model grid nodes
    data = self.interpolator.get(x=dset.x, y=dset.y, data=dset[self.variables[0]], xi=grid.x, yi=grid.y)
    current_grid = grid # Keep track of grid matching 'data'
    # 4. Apply Seaward Extension
    data, current_grid = self.extension.get(data, current_grid, self.posdwn)
    # 5. Apply Lateral Extension
    data, current_grid = self.expand_lateral(data, current_grid)
    # 6. Write Files
    xfile = Path(destdir) / "xdata.txt"
    yfile = Path(destdir) / "ydata.txt"
    depfile = Path(destdir) / "bathy.txt"
    np.savetxt(xfile, current_grid.x)
    np.savetxt(yfile, current_grid.y)
    np.savetxt(depfile, data)
    return xfile, yfile, depfile, current_grid # Return potentially modified grid

After Refactor:

# In class XBeachBathy (inherits _retrieve_data from a base like DataGrid)
def _retrieve_data(self, locations: Dict[str, np.ndarray], period: Optional[TimeRange], grid: BaseGrid) -> xr.Dataset:
    # 1. Open Source
    ds_source = self.source.open(...)
    # 2. Apply time filter (if relevant for bathy source)
    ds_time_filtered = self._apply_time_filter(ds_source, period)
    # 3. Reproject source dataset if CRS differs from grid.crs
    #    (Could happen here or in _apply_spatial_selection)
    if grid.crs is not None and grid.crs.to_epsg() != self.crs.to_epsg():
         ds_source_reproj = ds_time_filtered.rio.reproject(grid.crs).rename(...)
    else:
         ds_source_reproj = ds_time_filtered
    # 4. Apply spatial selection/interpolation (using 'locations' dict and interpolator)
    #    This replaces the direct call to self.interpolator.get
    ds_processed = self._apply_spatial_selection(ds_source_reproj, locations, grid)
    # 5. Return dataset indexed appropriately (e.g., ny, nx or site)
    return ds_processed

def get(self, destdir: str | Path, grid: RegularGrid, ...) -> tuple[Path, Path, Path, RegularGrid]:
    # 1. Determine locations needed (all nodes of the initial grid)
    #    Need grid coords BEFORE potential extension
    initial_grid_nodes = {self.coords.x: grid.x.flatten(), self.coords.y: grid.y.flatten()}

    # 2. Call internal method to get interpolated data at initial grid nodes
    ds_interpolated = self._retrieve_data(locations=initial_grid_nodes, period=None, grid=grid)

    # 3. Reshape data back to 2D grid matching 'grid'
    #    (Assume _retrieve_data returns flat 'site' dim, needs reshaping)
    data_2d = ds_interpolated[self.variables[0]].values.reshape(grid.ny, grid.nx) # Example reshape
    current_grid = grid # Grid matching data_2d

    # 4. Apply Seaward Extension (operates on numpy array + grid object)
    data_2d, current_grid = self.extension.get(data_2d, current_grid, self.posdwn)

    # 5. Apply Lateral Extension (operates on numpy array + grid object)
    data_2d, current_grid = self.expand_lateral(data_2d, current_grid)

    # 6. Write Files using final data and potentially modified grid
    xfile = Path(destdir) / "xdata.txt"
    yfile = Path(destdir) / "ydata.txt"
    depfile = Path(destdir) / "bathy.txt"
    np.savetxt(xfile, current_grid.x)
    np.savetxt(yfile, current_grid.y)
    np.savetxt(depfile, data_2d)

    return xfile, yfile, depfile, current_grid # Return potentially modified grid

Key Change: Data retrieval and interpolation (_retrieve_data) yields an intermediate xr.Dataset (or just the core data needed). The get method then applies the geometry-modifying steps (extensions) to this intermediate data and the associated grid object before writing the final files. This separates spatial data retrieval from spatial data manipulation (extensions).
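
A side benefit of this split is that the instrumentation layer can probe the processed bathymetry directly, without touching the file-writing path; a hypothetical usage (object names illustrative):

import numpy as np

# 'bathy' is an XBeachBathy instance from the model config; probe two points
probe = {"x": np.array([115.61, 115.62]), "y": np.array([-32.55, -32.54])}
ds_probe = bathy._retrieve_data(locations=probe, period=None, grid=model_run.grid)
print(ds_probe)  # standardized xr.Dataset with a 'site' dimension of length 2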


These examples illustrate the pattern: get orchestrates by defining model needs and calling _retrieve_data, then performs final model-specific formatting/writing. _retrieve_data becomes the generic engine for accessing, filtering, and processing data into a common xr.Dataset format based on requested locations.
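
Pulled together, a hypothetical end-to-end inspection (variable and object names illustrative) might read:

import numpy as np

boundary = model_run.config.forcing.boundary  # any component exposing _retrieve_data
locations = {"x": np.array([115.5]), "y": np.array([-32.0])}

ds = boundary._retrieve_data(
    locations=locations, period=model_run.period, grid=model_run.grid
)
ds["sea_surface_height"].isel(site=0).plot()  # quick look before a long run
print(float(ds["sea_surface_height"].max()))  # forces lazy compute for one stat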
