From 27437e1bf057e67dbefad51e5ce42f6add225b26 Mon Sep 17 00:00:00 2001 From: Tiomat85 Date: Thu, 15 Jan 2026 16:03:57 +0000 Subject: [PATCH 1/3] Function_Warp improvements and unit tests This adds unit tests for function_warp testing various data types and shapes, and validates shape and quick sanity check of data. Additionally extended the function_warp functionality itself to be able to handle data of higher dimensionality than the provided warp map, with it broadcasting the map over the higher dimensions. So as an example if the data provided was of shape (8,4,32,32) and the warp map was (32,32) it broadcasts it applying the warp map to each data dimension correctly for each of the sequence/scan dimensions. --- nion/data/Core.py | 46 ++-- nion/data/test/Core_test.py | 428 ++++++++++++++++++++++++++++++++++++ 2 files changed, 456 insertions(+), 18 deletions(-) diff --git a/nion/data/Core.py b/nion/data/Core.py index 09d5963..9e855aa 100755 --- a/nion/data/Core.py +++ b/nion/data/Core.py @@ -1781,31 +1781,41 @@ def calculate_data() -> _ImageDataType: return DataAndMetadata.new_data_and_metadata(calculate_data(), intensity_calibration=data_and_metadata.intensity_calibration, dimensional_calibrations=resampled_dimensional_calibrations) -def function_warp(data_and_metadata_in: _DataAndMetadataLike, coordinates_in: typing.Sequence[_DataAndMetadataLike], order: int = 1) -> DataAndMetadata.DataAndMetadata: +def function_warp(data_and_metadata_in: _DataAndMetadataLike, coordinates_in: typing.Sequence[ + _DataAndMetadataLike], order: int = 1) -> DataAndMetadata.DataAndMetadata: data_and_metadata = DataAndMetadata.promote_ndarray(data_and_metadata_in) coordinates = [DataAndMetadata.promote_ndarray(c) for c in coordinates_in] - coords = numpy.moveaxis(numpy.dstack([coordinate.data for coordinate in coordinates]), -1, 0) + coords = numpy.stack([c.data.astype(float) for c in coordinates], axis=0) data = data_and_metadata._data_ex - if data_and_metadata.is_data_rgb: - rgb: numpy.typing.NDArray[numpy.uint8] = numpy.zeros(tuple(data_and_metadata.dimensional_shape) + (3,), numpy.uint8) - rgb[..., 0] = scipy.ndimage.map_coordinates(data[..., 0], coords, order=order) - rgb[..., 1] = scipy.ndimage.map_coordinates(data[..., 1], coords, order=order) - rgb[..., 2] = scipy.ndimage.map_coordinates(data[..., 2], coords, order=order) - return DataAndMetadata.new_data_and_metadata(rgb, - dimensional_calibrations=data_and_metadata.dimensional_calibrations, - intensity_calibration=data_and_metadata.intensity_calibration) - elif data_and_metadata.is_data_rgba: - rgba: numpy.typing.NDArray[numpy.uint8] = numpy.zeros(tuple(data_and_metadata.dimensional_shape) + (4,), numpy.uint8) - rgba[..., 0] = scipy.ndimage.map_coordinates(data[..., 0], coords, order=order) - rgba[..., 1] = scipy.ndimage.map_coordinates(data[..., 1], coords, order=order) - rgba[..., 2] = scipy.ndimage.map_coordinates(data[..., 2], coords, order=order) - rgba[..., 3] = scipy.ndimage.map_coordinates(data[..., 3], coords, order=order) - return DataAndMetadata.new_data_and_metadata(rgba, + num_frame_dims = coords.shape[0] + + if data_and_metadata.is_data_rgb or data_and_metadata.is_data_rgba: + # Last dimension is channels + leading_shape = data.shape[:-num_frame_dims - 1] + output_shape = leading_shape + coords.shape[1:] + channels = 3 if data_and_metadata.is_data_rgb else 4 + output = numpy.zeros(tuple(output_shape) + (channels,), numpy.uint8) + + for idx in numpy.ndindex(leading_shape): + for chan in range(channels): + output[idx + (..., chan)] = scipy.ndimage.map_coordinates( + data[idx + (..., chan)], + coords, + order=order) + + return DataAndMetadata.new_data_and_metadata(data=output, dimensional_calibrations=data_and_metadata.dimensional_calibrations, intensity_calibration=data_and_metadata.intensity_calibration) else: + leading_shape = data.shape[:-num_frame_dims] + output_shape = leading_shape + coords.shape[1:] + output = numpy.zeros(output_shape, dtype=data.dtype) + + for idx in numpy.ndindex(leading_shape): + output[idx] = scipy.ndimage.map_coordinates(data[idx], coords, order=order) + return DataAndMetadata.new_data_and_metadata( - scipy.ndimage.map_coordinates(data, coords, order=order), + data=output, dimensional_calibrations=data_and_metadata.dimensional_calibrations, intensity_calibration=data_and_metadata.intensity_calibration) diff --git a/nion/data/test/Core_test.py b/nion/data/test/Core_test.py index 6f1d8d4..375ef1c 100755 --- a/nion/data/test/Core_test.py +++ b/nion/data/test/Core_test.py @@ -1042,6 +1042,434 @@ def test_operations_using_copy_on_h5py_array(self) -> None: Core.function_rebin_2d(d, (2, 2)) Core.function_resample_2d(d, (3, 3)) + def test_element_data_returns_ndarray(self) -> None: + bio = io.BytesIO() + with h5py.File(bio, "w") as f: + dataset = f.create_dataset("data", data=numpy.ones((5, 6), dtype=numpy.float32)) + xdata = DataAndMetadata.new_data_and_metadata(data=dataset) + element, _ = Core.function_element_data_no_copy(xdata, 0, (0, 0)) + assert element + # test whether inline math works, implying it is a numpy array + elementp1 = element.data + 4 + # test directly its type + self.assertIsInstance(element.data, numpy.ndarray) + + def test_elliptical_mask_generation(self) -> None: + bounds = Geometry.FloatRect.make(((0.2, 0.2), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[200, 200], 0) # top left + self.assertEqual(mask.data[200, 299], 0) # bottom left + self.assertEqual(mask.data[299, 299], 0) # bottom right + self.assertEqual(mask.data[299, 200], 0) # top right + + self.assertEqual(mask.data[249, 200], 1) # center top + self.assertEqual(mask.data[249, 199], 0) # center top + self.assertEqual(mask.data[299, 249], 1) # center right + self.assertEqual(mask.data[300, 249], 0) # center right + self.assertEqual(mask.data[249, 299], 1) # center bottom + self.assertEqual(mask.data[249, 300], 0) # center bottom + self.assertEqual(mask.data[200, 249], 1) # center left + self.assertEqual(mask.data[199, 249], 0) # center left + + def test_elliptical_mask_generation_out_of_bounds_top_left(self) -> None: + bounds = Geometry.FloatRect.make(((-0.05, -0.05), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[49, 49], 0) # bottom right + + self.assertEqual(mask.data[49, 0], 1) # center right + self.assertEqual(mask.data[50, 0], 0) # center right + self.assertEqual(mask.data[0, 49], 1) # center bottom + self.assertEqual(mask.data[0, 50], 0) # center bottom + + def test_elliptical_mask_generation_out_of_bounds_center_top(self) -> None: + bounds = Geometry.FloatRect.make(((0.45, -0.05), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[450, 49], 0) # bottom left + self.assertEqual(mask.data[549, 49], 0) # bottom right + + self.assertEqual(mask.data[549, 0], 1) # center right + self.assertEqual(mask.data[550, 0], 0) # center right + self.assertEqual(mask.data[500, 49], 1) # center bottom + self.assertEqual(mask.data[500, 50], 0) # center bottom + self.assertEqual(mask.data[450, 0], 1) # center left + self.assertEqual(mask.data[449, 0], 0) # center left + + def test_elliptical_mask_generation_out_of_bounds_top_right(self) -> None: + bounds = Geometry.FloatRect.make(((0.95, -0.05), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[950, 49], 0) # bottom left + + self.assertEqual(mask.data[999, 49], 1) # center bottom + self.assertEqual(mask.data[999, 50], 0) # center bottom + self.assertEqual(mask.data[950, 0], 1) # center left + self.assertEqual(mask.data[949, 0], 0) # center left + + def test_elliptical_mask_generation_out_of_bounds_center_right(self) -> None: + bounds = Geometry.FloatRect.make(((0.95, 0.45), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[950, 450], 0) # top left + self.assertEqual(mask.data[950, 549], 0) # bottom left + + self.assertEqual(mask.data[999, 450], 1) # center top + self.assertEqual(mask.data[999, 449], 0) # center top + self.assertEqual(mask.data[999, 549], 1) # center bottom + self.assertEqual(mask.data[999, 550], 0) # center bottom + self.assertEqual(mask.data[950, 500], 1) # center left + self.assertEqual(mask.data[949, 550], 0) # center left + + def test_elliptical_mask_generation_out_of_bounds_bottom_right(self) -> None: + bounds = Geometry.FloatRect.make(((0.95, 0.95), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[950, 950], 0) # top left + + self.assertEqual(mask.data[999, 950], 1) # center top + self.assertEqual(mask.data[999, 949], 0) # center top + self.assertEqual(mask.data[950, 999], 1) # center left + self.assertEqual(mask.data[949, 999], 0) # center left + + def test_elliptical_mask_generation_out_of_bound_center_bottom(self) -> None: + bounds = Geometry.FloatRect.make(((0.45, 0.95), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[450, 950], 0) # top left + self.assertEqual(mask.data[549, 950], 0) # top right + + self.assertEqual(mask.data[500, 950], 1) # center top + self.assertEqual(mask.data[500, 949], 0) # center top + self.assertEqual(mask.data[549, 999], 1) # center right + self.assertEqual(mask.data[550, 999], 0) # center right + self.assertEqual(mask.data[450, 999], 1) # center left + self.assertEqual(mask.data[449, 999], 0) # center left + + def test_elliptical_mask_generation_out_of_bounds_bottom_left(self) -> None: + bounds = Geometry.FloatRect.make(((-0.05, 0.95), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[49, 950], 0) # top right + + self.assertEqual(mask.data[0, 950], 1) # center top + self.assertEqual(mask.data[0, 949], 0) # center top + self.assertEqual(mask.data[49, 999], 1) # center right + self.assertEqual(mask.data[50, 999], 0) # center right + + def test_elliptical_mask_generation_out_of_bounds_center_left(self) -> None: + bounds = Geometry.FloatRect.make(((-0.05, 0.45), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertEqual(mask.data[49, 549], 0) # bottom right + self.assertEqual(mask.data[49, 450], 0) # top right + + self.assertEqual(mask.data[0, 450], 1) # center top + self.assertEqual(mask.data[0, 449], 0) # center top + self.assertEqual(mask.data[49, 500], 1) # center right + self.assertEqual(mask.data[50, 500], 0) # center right + self.assertEqual(mask.data[0, 549], 1) # center bottom + self.assertEqual(mask.data[0, 550], 0) # center bottom + + def test_elliptical_mask_generation_out_of_bounds_completely(self) -> None: + bounds = Geometry.FloatRect.make(((1.1, 1.1), (0.1, 0.1))) + mask_xdata = Core.function_make_elliptical_mask((1000, 1000), bounds.center.as_tuple(), bounds.size.as_tuple(), 0) + mask = mask_xdata.data + self.assertTrue(numpy.all(mask == 0)) + + def test_rectangular_mask_generation(self) -> None: + bounds = Geometry.FloatRect.make(((0.2, 0.2), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size,0) + mask = mask_xdata.data + self.assertEqual(mask.data[200, 200], 1) # top left + self.assertEqual(mask.data[199, 199], 0) # top left + self.assertEqual(mask.data[200, 299], 1) # bottom left + self.assertEqual(mask.data[199, 300], 0) # bottom left + self.assertEqual(mask.data[299, 299], 1) # bottom right + self.assertEqual(mask.data[300, 300], 0) # bottom right + self.assertEqual(mask.data[299, 200], 1) # top right + self.assertEqual(mask.data[300, 199], 0) # top right + + self.assertEqual(mask.data[249, 200], 1) # center top + self.assertEqual(mask.data[249, 199], 0) # center top + self.assertEqual(mask.data[299, 249], 1) # center right + self.assertEqual(mask.data[300, 249], 0) # center right + self.assertEqual(mask.data[249, 299], 1) # center bottom + self.assertEqual(mask.data[249, 300], 0) # center bottom + self.assertEqual(mask.data[200, 249], 1) # center left + self.assertEqual(mask.data[199, 249], 0) # center left + + def test_rectangular_mask_generation_out_of_bounds_top_left(self) -> None: + bounds = Geometry.FloatRect.make(((-0.05, -0.05), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size, 0) + mask = mask_xdata.data + self.assertEqual(mask.data[49, 49], 1) # bottom right + self.assertEqual(mask.data[50, 50], 0) # bottom right + + self.assertEqual(mask.data[49, 0], 1) # center right + self.assertEqual(mask.data[50, 0], 0) # center right + self.assertEqual(mask.data[0, 49], 1) # center bottom + self.assertEqual(mask.data[0, 50], 0) # center bottom + + def test_rectangular_mask_generation_out_of_bounds_center_top(self) -> None: + bounds = Geometry.FloatRect.make(((0.45, -0.05), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size, 0) + mask = mask_xdata.data + self.assertEqual(mask.data[450, 49], 1) # bottom left + self.assertEqual(mask.data[449, 50], 0) # bottom left + self.assertEqual(mask.data[549, 49], 1) # bottom right + self.assertEqual(mask.data[550, 50], 0) # bottom right + + self.assertEqual(mask.data[549, 0], 1) # center right + self.assertEqual(mask.data[550, 0], 0) # center right + self.assertEqual(mask.data[500, 49], 1) # center bottom + self.assertEqual(mask.data[500, 50], 0) # center bottom + self.assertEqual(mask.data[450, 0], 1) # center left + self.assertEqual(mask.data[449, 0], 0) # center left + + def test_rectangular_mask_generation_out_of_bounds_top_right(self) -> None: + bounds = Geometry.FloatRect.make(((0.95, -0.05), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size,0) + mask = mask_xdata.data + self.assertEqual(mask.data[950, 49], 1) # bottom left + self.assertEqual(mask.data[949, 50], 0) # bottom left + + self.assertEqual(mask.data[999, 49], 1) # center bottom + self.assertEqual(mask.data[999, 50], 0) # center bottom + self.assertEqual(mask.data[950, 0], 1) # center left + self.assertEqual(mask.data[949, 0], 0) # center left + + def test_rectangular_mask_generation_out_of_bounds_center_right(self) -> None: + bounds = Geometry.FloatRect.make(((0.95, 0.45), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size, 0) + mask = mask_xdata.data + self.assertEqual(mask.data[950, 450], 1) # top left + self.assertEqual(mask.data[949, 449], 0) # top left + self.assertEqual(mask.data[950, 549], 1) # bottom left + self.assertEqual(mask.data[950, 550], 0) # bottom left + + self.assertEqual(mask.data[999, 450], 1) # center top + self.assertEqual(mask.data[999, 449], 0) # center top + self.assertEqual(mask.data[999, 549], 1) # center bottom + self.assertEqual(mask.data[999, 550], 0) # center bottom + self.assertEqual(mask.data[950, 500], 1) # center left + self.assertEqual(mask.data[949, 500], 0) # center left + + def test_rectangular_mask_generation_out_of_bounds_bottom_right(self) -> None: + bounds = Geometry.FloatRect.make(((0.95, 0.95), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size,0) + mask = mask_xdata.data + self.assertEqual(mask.data[950, 950], 1) # top left + self.assertEqual(mask.data[949, 949], 0) # top left + + self.assertEqual(mask.data[999, 950], 1) # center top + self.assertEqual(mask.data[999, 949], 0) # center top + self.assertEqual(mask.data[950, 999], 1) # center left + self.assertEqual(mask.data[949, 999], 0) # center left + + def test_rectangular_mask_generation_out_of_bound_center_bottom(self) -> None: + bounds = Geometry.FloatRect.make(((0.45, 0.95), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size,0) + mask = mask_xdata.data + self.assertEqual(mask.data[450, 950], 1) # top left + self.assertEqual(mask.data[449, 949], 0) # top left + self.assertEqual(mask.data[549, 950], 1) # top right + self.assertEqual(mask.data[550, 949], 0) # top right + + self.assertEqual(mask.data[500, 950], 1) # center top + self.assertEqual(mask.data[500, 949], 0) # center top + self.assertEqual(mask.data[549, 999], 1) # center right + self.assertEqual(mask.data[550, 999], 0) # center right + self.assertEqual(mask.data[450, 999], 1) # center left + self.assertEqual(mask.data[449, 999], 0) # center left + + def test_rectangular_mask_generation_out_of_bounds_bottom_left(self) -> None: + bounds = Geometry.FloatRect.make(((-0.05, 0.95), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size,0) + mask = mask_xdata.data + self.assertEqual(mask.data[49, 950], 1) # top right + self.assertEqual(mask.data[50, 949], 0) # top right + + self.assertEqual(mask.data[0, 950], 1) # center top + self.assertEqual(mask.data[0, 949], 0) # center top + self.assertEqual(mask.data[49, 999], 1) # center right + self.assertEqual(mask.data[50, 999], 0) # center right + + def test_rectangular_mask_generation_out_of_bounds_center_left(self) -> None: + bounds = Geometry.FloatRect.make(((-0.05, 0.45), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size,0) + mask = mask_xdata.data + self.assertEqual(mask.data[49, 549], 1) # bottom right + self.assertEqual(mask.data[50, 550], 0) # bottom right + self.assertEqual(mask.data[49, 450], 1) # top right + self.assertEqual(mask.data[50, 449], 0) # top right + + self.assertEqual(mask.data[0, 450], 1) # center top + self.assertEqual(mask.data[0, 449], 0) # center top + self.assertEqual(mask.data[49, 500], 1) # center right + self.assertEqual(mask.data[50, 500], 0) # center right + self.assertEqual(mask.data[0, 549], 1) # center bottom + self.assertEqual(mask.data[0, 550], 0) # center bottom + + def test_rectangular_mask_generation_out_of_bounds_completely(self) -> None: + bounds = Geometry.FloatRect.make(((1.1, 1.1), (0.1, 0.1))) + mask_xdata = Core.function_make_rectangular_mask((1000, 1000), bounds.center, bounds.size,0) + mask = mask_xdata.data + self.assertTrue(numpy.all(mask == 0)) + + def test_fft_zero_component_calibration(self) -> None: + dimensional_calibrations = (Calibration.Calibration(0, 1, "S"), Calibration.Calibration(0, 1, "S")) + xdata = DataAndMetadata.new_data_and_metadata(data=numpy.ones((16, 8)), dimensional_calibrations=dimensional_calibrations) + result = Core.function_fft(xdata) + self.assertAlmostEqual(0.0, result.dimensional_calibrations[0].convert_to_calibrated_value(8.5)) + self.assertAlmostEqual(0.0, result.dimensional_calibrations[1].convert_to_calibrated_value(4.5)) + xdata2 = DataAndMetadata.new_data_and_metadata(data=numpy.ones((15, 9)), dimensional_calibrations=dimensional_calibrations) + result2 = Core.function_fft(xdata2) + self.assertAlmostEqual(0.0, result2.dimensional_calibrations[0].convert_to_calibrated_value(7.5)) + self.assertAlmostEqual(0.0, result2.dimensional_calibrations[1].convert_to_calibrated_value(4.5)) + xdata3 = DataAndMetadata.new_data_and_metadata(data=numpy.ones((16,)), dimensional_calibrations=dimensional_calibrations[0:1]) + result3 = Core.function_fft(xdata3) + self.assertAlmostEqual(0.0, result3.dimensional_calibrations[0].convert_to_calibrated_value(8.5)) + xdata4 = DataAndMetadata.new_data_and_metadata(data=numpy.ones((15,)), dimensional_calibrations=dimensional_calibrations[0:1]) + result4 = Core.function_fft(xdata4) + self.assertAlmostEqual(0.0, result4.dimensional_calibrations[0].convert_to_calibrated_value(7.5)) + + ## WARP TESTS + # Helper func + def _create_warp_test_data(self, + input_shape: tuple[int,...], + output_shape: tuple[int, ...] | None = None, + identity: bool = False, + mode: str = "greyscale") -> tuple[DataAndMetadata.DataAndMetadata, list[numpy.ndarray]]: + # Determine data type and channels based on mode + if mode == "greyscale": + dtype = float + channels = None + elif mode == "rgb": + dtype = numpy.uint8 + channels = 3 + elif mode == "rgba": + dtype = numpy.uint8 + channels = 4 + else: + raise ValueError(f"Invalid mode: {mode}. Choose 'greyscale', 'rgb', or 'rgba'.") + + # Prepare input shape for data array + if channels is None: + full_shape = input_shape + else: + full_shape = input_shape + (channels,) + + # Input data: sequential numbers for easy validation + data = numpy.arange(numpy.prod(full_shape), dtype=dtype).reshape(full_shape) + src = DataAndMetadata.new_data_and_metadata(data=data) + + # Determine output grid shape + if output_shape is None: + H, W = input_shape[-2:] + else: + H, W = output_shape[-2:] + + # Create warp coordinates + if identity: + # Identity warp: map output coordinates to same as input indices + warp_y, warp_x = numpy.meshgrid( + numpy.arange(input_shape[-2]), + numpy.arange(input_shape[-1]), + indexing="ij" + ) + else: + # Resampling / scaling: map output grid into input index space + in_H, in_W = input_shape[-2:] + y = numpy.arange(0, in_H, in_H / H) + x = numpy.arange(0, in_W, in_W / W) + warp_y, warp_x = numpy.meshgrid(y, x, indexing="ij") + + return src, [warp_y, warp_x] + + def _validate_warp(self, src, dst, coords): + + # ---- shape validation ---- + n_dims = len(coords) # number of warped dimensions + output_shape = coords[0].shape # shape of warp grid + + expected_shape = dst.data_shape[:-n_dims] + output_shape + assert dst.data_shape == expected_shape, ( + f"Output shape mismatch: {dst.data_shape} != {expected_shape}" + ) + + # ---- extract warped subspace ---- + # Take the first element of all leading dimensions + warped = dst._data_ex + for _ in range(warped.ndim - n_dims): + warped = warped[0] + + # warped now has shape == output_shape + + # ---- monotonicity checks for each warped axis ---- + for axis in range(n_dims): + # Build a slice that varies only along this axis + slicer = [0] * n_dims + slicer[axis] = slice(None) + + axis_values = warped[tuple(slicer)] + + # Remove out-of-range zeros (leading or trailing) + nonzero = axis_values != 0 + if numpy.count_nonzero(nonzero) < 2: + # Not enough valid data to validate this axis + continue + + valid_values = axis_values[nonzero] + + diffs = numpy.diff(valid_values) + + assert numpy.all(diffs > 0), ( + f"Warped axis {axis} is not strictly increasing: {axis_values}" + ) + + def test_warp_identity(self) -> None: + src, coords = self._create_warp_test_data(input_shape=(4, 4), identity=True) + dst = Core.function_warp(src, coords) + self._validate_warp(src, dst, coords) + + def test_warp_sequence(self) -> None: + src, coords = self._create_warp_test_data(input_shape=(6, 4, 4), output_shape=(4, 4)) + dst = Core.function_warp(src, coords) + self._validate_warp(src, dst, coords) + + def test_warp_upscale(self) -> None: + # Input 4x4, warp to 8x8 + src, coords = self._create_warp_test_data(input_shape=(4, 4), output_shape=(8, 8)) + dst = Core.function_warp(src, coords) + self._validate_warp(src, dst, coords) + + def test_warp_sequence_upscale(self) -> None: + src, coords = self._create_warp_test_data(input_shape=(6, 4, 4), output_shape=(6, 8, 8)) + dst = Core.function_warp(src, coords) + self._validate_warp(src, dst, coords) + + def test_warp_rgb(self): + src, coords = self._create_warp_test_data(input_shape=(6, 4, 4), output_shape=(4, 4), mode="rgb") + dst = Core.function_warp(src, coords) + self._validate_warp(src, dst, coords) + + def test_warp_rgba(self): + src, coords = self._create_warp_test_data(input_shape=(6, 4, 4), output_shape=(4, 4), mode="rgba") + print(coords) + dst = Core.function_warp(src, coords) + self._validate_warp(src, dst, coords) + + # def validate_test_warp_upscale(self) -> None: + # # Test to validate the validate_warp detects non-increasing values. Do not include in automated tests. + # # Input 4x4, warp to 8x8 + # src, coords = self._create_warp_test_data(input_shape=(4, 4), output_shape=(8, 8)) + # dst = Core.function_warp(src, coords) + # dst.data[0, 2] = 0.1 + # self._validate_warp(src, dst, coords) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) unittest.main() From 3158092474de337dfb83d60cf920ea0c0ff40621 Mon Sep 17 00:00:00 2001 From: Tiomat85 Date: Thu, 15 Jan 2026 16:10:55 +0000 Subject: [PATCH 2/3] Fix handling of channeled data in validate_warp Added is_channel_data to specify whether data has the last dimension as channel (rgb/rgba) and so to correctly validate the warped shape. Missed return type, Adding typing to validate_warp --- nion/data/test/Core_test.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/nion/data/test/Core_test.py b/nion/data/test/Core_test.py index 08a1fc5..345aaae 100755 --- a/nion/data/test/Core_test.py +++ b/nion/data/test/Core_test.py @@ -1344,6 +1344,7 @@ def _create_warp_test_data(self, identity: bool = False, mode: str = "greyscale") -> tuple[DataAndMetadata.DataAndMetadata, list[numpy.ndarray]]: # Determine data type and channels based on mode + dtype: numpy.typing.DTypeLike if mode == "greyscale": dtype = float channels = None @@ -1389,13 +1390,17 @@ def _create_warp_test_data(self, return src, [warp_y, warp_x] - def _validate_warp(self, src, dst, coords): + def _validate_warp(self, src: DataAndMetadata.DataAndMetadata, dst: DataAndMetadata.DataAndMetadata, coords: list[numpy.ndarray], is_channel_data: bool = False) -> None: # ---- shape validation ---- n_dims = len(coords) # number of warped dimensions output_shape = coords[0].shape # shape of warp grid expected_shape = dst.data_shape[:-n_dims] + output_shape + + if is_channel_data: + expected_shape = dst.data_shape[:-n_dims-1] + output_shape + (dst.data_shape[-1],) + assert dst.data_shape == expected_shape, ( f"Output shape mismatch: {dst.data_shape} != {expected_shape}" ) @@ -1411,7 +1416,7 @@ def _validate_warp(self, src, dst, coords): # ---- monotonicity checks for each warped axis ---- for axis in range(n_dims): # Build a slice that varies only along this axis - slicer = [0] * n_dims + slicer: list[typing.Union[int, slice]] = [0] * n_dims slicer[axis] = slice(None) axis_values = warped[tuple(slicer)] @@ -1451,16 +1456,15 @@ def test_warp_sequence_upscale(self) -> None: dst = Core.function_warp(src, coords) self._validate_warp(src, dst, coords) - def test_warp_rgb(self): + def test_warp_rgb(self) -> None: src, coords = self._create_warp_test_data(input_shape=(6, 4, 4), output_shape=(4, 4), mode="rgb") dst = Core.function_warp(src, coords) - self._validate_warp(src, dst, coords) + self._validate_warp(src, dst, coords, is_channel_data=True) - def test_warp_rgba(self): + def test_warp_rgba(self) -> None: src, coords = self._create_warp_test_data(input_shape=(6, 4, 4), output_shape=(4, 4), mode="rgba") - print(coords) dst = Core.function_warp(src, coords) - self._validate_warp(src, dst, coords) + self._validate_warp(src, dst, coords, is_channel_data=True) # def validate_test_warp_upscale(self) -> None: # # Test to validate the validate_warp detects non-increasing values. Do not include in automated tests. From ab6695e6a874039e7ca1b1ce640314e0ca693b9f Mon Sep 17 00:00:00 2001 From: Tiomat85 Date: Tue, 20 Jan 2026 11:17:19 +0000 Subject: [PATCH 3/3] Review updates Removed test-test code. Adds function description comment. Utilise better properties to identify if data is channelled. --- nion/data/Core.py | 15 ++++++++++++--- nion/data/test/Core_test.py | 8 -------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/nion/data/Core.py b/nion/data/Core.py index 7784db7..015f046 100755 --- a/nion/data/Core.py +++ b/nion/data/Core.py @@ -1800,15 +1800,24 @@ def calculate_data() -> _ImageDataType: return DataAndMetadata.new_data_and_metadata(data=calculate_data(), intensity_calibration=data_and_metadata.intensity_calibration, dimensional_calibrations=resampled_dimensional_calibrations) -def function_warp(data_and_metadata_in: _DataAndMetadataLike, coordinates_in: typing.Sequence[ - _DataAndMetadataLike], order: int = 1) -> DataAndMetadata.DataAndMetadata: +def function_warp(data_and_metadata_in: _DataAndMetadataLike, coordinates_in: typing.Sequence[_DataAndMetadataLike], order: int = 1) -> DataAndMetadata.DataAndMetadata: + """ + Warp or unwarp input data using an N-dimensional warp map. + + The warp map is applied along N axes and broadcast over any additional + dimensions in the input, allowing a single warp map to be used for + higher-dimensional data (e.g., image sequences). For channelled data + such as RGB/RGBA, the warp is applied uniformly to all channels. + + scipy map_coordinates does not broadcast by default, so need to loop + """ data_and_metadata = DataAndMetadata.promote_ndarray(data_and_metadata_in) coordinates = [DataAndMetadata.promote_ndarray(c) for c in coordinates_in] coords = numpy.stack([c.data.astype(float) for c in coordinates], axis=0) data = data_and_metadata._data_ex num_frame_dims = coords.shape[0] - if data_and_metadata.is_data_rgb or data_and_metadata.is_data_rgba: + if data_and_metadata.is_data_rgb_type: # Last dimension is channels leading_shape = data.shape[:-num_frame_dims - 1] output_shape = leading_shape + coords.shape[1:] diff --git a/nion/data/test/Core_test.py b/nion/data/test/Core_test.py index 345aaae..3f9e236 100755 --- a/nion/data/test/Core_test.py +++ b/nion/data/test/Core_test.py @@ -1466,14 +1466,6 @@ def test_warp_rgba(self) -> None: dst = Core.function_warp(src, coords) self._validate_warp(src, dst, coords, is_channel_data=True) - # def validate_test_warp_upscale(self) -> None: - # # Test to validate the validate_warp detects non-increasing values. Do not include in automated tests. - # # Input 4x4, warp to 8x8 - # src, coords = self._create_warp_test_data(input_shape=(4, 4), output_shape=(8, 8)) - # dst = Core.function_warp(src, coords) - # dst.data[0, 2] = 0.1 - # self._validate_warp(src, dst, coords) - if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG)