From 5acdc2a454322194d9ffb0c42bccec13d0988a3d Mon Sep 17 00:00:00 2001 From: "Sharlon N. Regales" Date: Thu, 29 Jan 2026 17:48:43 +0100 Subject: [PATCH 1/5] Add filter transaction types --- packages/scratch-core/src/container_models/base.py | 7 ++++++- packages/scratch-core/src/utils/constants.py | 7 +++++++ 2 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 packages/scratch-core/src/utils/constants.py diff --git a/packages/scratch-core/src/container_models/base.py b/packages/scratch-core/src/container_models/base.py index df2bb659..6cd99536 100644 --- a/packages/scratch-core/src/container_models/base.py +++ b/packages/scratch-core/src/container_models/base.py @@ -1,6 +1,6 @@ from collections.abc import Sequence from functools import partial -from typing import Annotated, TypeAlias +from typing import Annotated, NamedTuple, TypeAlias from numpy import array, bool_, floating, number, uint8 from numpy.typing import DTypeLike, NDArray @@ -99,3 +99,8 @@ class ConfigBaseModel(BaseModel): arbitrary_types_allowed=True, regex_engine="rust-regex", ) + + +class Point[T](NamedTuple): + x: T + y: T diff --git a/packages/scratch-core/src/utils/constants.py b/packages/scratch-core/src/utils/constants.py new file mode 100644 index 00000000..3844c980 --- /dev/null +++ b/packages/scratch-core/src/utils/constants.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class RegressionOrder(Enum): + GAUSSIAN_WEIGHTED_AVERAGE = 0 + LOCAL_PLANAR = 1 + LOCAL_QUADRATIC = 2 From 550a439c939fd86009436906ebb23995053bd528 Mon Sep 17 00:00:00 2001 From: "Sharlon N. Regales" Date: Thu, 29 Jan 2026 17:49:49 +0100 Subject: [PATCH 2/5] Move internal lower level helper functions to computations module --- .../scratch-core/src/renders/computations.py | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 packages/scratch-core/src/renders/computations.py diff --git a/packages/scratch-core/src/renders/computations.py b/packages/scratch-core/src/renders/computations.py new file mode 100644 index 00000000..18ba43a3 --- /dev/null +++ b/packages/scratch-core/src/renders/computations.py @@ -0,0 +1,72 @@ +from itertools import product + +from numpy.typing import NDArray +from conversion.filter import _convolve_2d_separable + +from container_models.base import Point +import numpy as np + + +def generate_polynomial_exponents(order: int) -> list[Point[int]]: + """ + Generate polynomial exponent pairs for 2D polynomial terms up to a given order. + + :param order: Maximum total degree (py + px) for the polynomial terms. + :returns: List of (power_y, power_x) tuples representing polynomial terms. + """ + return [ + Point(x, y) for y, x in product(range(order + 1), repeat=2) if y + x <= order + ] + + +# TODO: make it generic +# def build_lhs_matrix[T: NDArray(value: T) -> T:... +# def build_lhs_matrix[T: NDArray[np.floating]](value: T) -> T:... +def build_lhs_matrix( + weights: NDArray[np.floating], + kernel_x: NDArray[np.floating], + kernel_y: NDArray[np.floating], + x_coords: NDArray[np.floating], + y_coords: NDArray[np.floating], + exponents: list[Point[int]], +) -> NDArray[np.floating]: + """ + Construct the LHS matrix 'A' efficiently. + + Optimization: Many terms in the matrix are identical (e.g., x*y appears multiple times). + We compute unique moment sums first, then map them to the matrix structure. + """ + n_params = len(exponents) + + # Calculate sum of powers for every cell in the matrix (A_pq = term_p * term_q) + matrix_power_sums = np.array( + [ + (exponents[p][0] + exponents[q][0], exponents[p][1] + exponents[q][1]) + for q, p in product(range(n_params), repeat=2) + ] + ) + + # Find unique combinations of powers to avoid redundant convolutions + unique_powers, inverse_indices = np.unique( + matrix_power_sums, axis=0, return_inverse=True + ) + + # Compute convolutions for unique powers + unique_moments = np.array( + [ + _convolve_2d_separable( + weights, (x_coords**point.x) * kernel_x, (y_coords**point.y) * kernel_y + ) + for point in unique_powers + ] + ) + + # Reconstruct the full (n_params * n_params) matrix for every pixel + # Current shape: (unique_terms, H, W) -> Map to -> (n_params^2, H, W) + full_moments = unique_moments[inverse_indices] + + # Reshape to (H, W, n_params, n_params) + # We move axes so H, W are first, creating a matrix for every pixel + return np.moveaxis( + full_moments.reshape(n_params, n_params, *weights.shape), [0, 1], [-2, -1] + ) From 2029e838feb906f43766f2bb860aead4a499dd0c Mon Sep 17 00:00:00 2001 From: "Sharlon N. Regales" Date: Mon, 26 Jan 2026 12:00:07 +0100 Subject: [PATCH 3/5] Refactor gaussian filters --- packages/scratch-core/src/renders/filters.py | 132 +++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 packages/scratch-core/src/renders/filters.py diff --git a/packages/scratch-core/src/renders/filters.py b/packages/scratch-core/src/renders/filters.py new file mode 100644 index 00000000..f577b7a3 --- /dev/null +++ b/packages/scratch-core/src/renders/filters.py @@ -0,0 +1,132 @@ +import numpy as np +from container_models.base import Point +from conversion.filter import ( + _convolve_2d_separable, + _solve_pixelwise_regression, +) +from numpy.typing import NDArray +from conversion.filter import ( + _apply_order0_filter, + _create_normalized_separable_kernels, +) + +from renders.computations import generate_polynomial_exponents, build_lhs_matrix +from utils.constants import RegressionOrder + +# Constants based on ISO 16610 surface texture standards +# Standard Gaussian alpha for 50% transmission +ALPHA_GAUSSIAN = np.sqrt(np.log(2) / np.pi) +# Adjusted alpha often used for higher-order regression filters to maintain properties +# alpha = Sqrt((-1 - LambertW(-1, -1 / (2 * exp(1)))) / Pi) +ALPHA_REGRESSION = 0.7309134280946760 + + +def calculate_polynomial_filter( + data: NDArray[np.floating], + kernel_x: NDArray[np.floating], + kernel_y: NDArray[np.floating], + exponents: list[Point[int]], +) -> NDArray[np.floating]: + """ + Apply Order-1 or Order-2 Local Polynomial Regression. + + This function performs a Weighted Least Squares (WLS) fit of a polynomial surface within a local window + defined by the kernels. For each pixel, it solves the linear system A * c = b, where 'c' contains the + coefficients of the polynomial. The smoothed value is the first coefficient (c0). + + The kernels (kernel_x, kernel_y) serve as spatial weight functions. They determine the importance of + neighboring pixels in the regression. A non-uniform kernel (e.g., Gaussian) ensures that points closer + to the target pixel have a higher influence on the fit than points at the window's edge, providing better + localization and noise suppression. + + :param data: The 2D input array to be filtered. Can contain NaNs, which are treated as zero-weight during + the regression. + :param kernel_x: 1D array representing the horizontal weight distribution. + :param kernel_y: 1D array representing the vertical weight distribution. + :param exponents: List of (power_y, power_x) tuples defining the polynomial terms. + :returns: The filtered (smoothed) version of the input data. + """ + # 1. Setup Coordinate Systems (Normalized to [-1, 1] for stability) + ny, nx = len(kernel_y), len(kernel_x) + radius_y, radius_x = (ny - 1) // 2, (nx - 1) // 2 + + y_coords = np.arange(-radius_y, radius_y + 1) / (radius_y if ny > 1 else 1.0) + x_coords = np.arange(-radius_x, radius_x + 1) / (radius_x if nx > 1 else 1.0) + + # 2. Construct the Linear System Components (A matrix and b vector) + nan_mask = np.isnan(data) + weights = np.where(nan_mask, 0.0, 1.0) + weighted_data = np.where(nan_mask, 0.0, data * weights) + + # Calculate RHS vector 'b' (Data Moments) + # b_k = Convolution(weighted_data, x^px * y^py * Kernel) + rhs_moments = np.array( + [ + _convolve_2d_separable( + weighted_data, (x_coords**px) * kernel_x, (y_coords**py) * kernel_y + ) + for py, px in exponents + ] + ) + + # Calculate LHS Matrix 'A' (Weight Moments) + # A_jk = Convolution(weights, x^(px_j + px_k) * y^(py_j + py_k) * Kernel) + lhs_matrix = build_lhs_matrix( + weights, kernel_x, kernel_y, x_coords, y_coords, exponents + ) + + # 3. Solve the System (A * c = b) per pixel + return _solve_pixelwise_regression(lhs_matrix, rhs_moments, data) + + +def apply_gaussian_regression_filter( + data: NDArray[np.floating], + cutoff_pixels: NDArray[np.floating], + regression_order: RegressionOrder, +) -> NDArray[np.floating]: + """ + Apply a 2D Savitzky-Golay filter with Gaussian weighting via local polynomial regression (ISO 16610-21). + + This implementation generalizes standard Gaussian filtering to handle missing data (NaNs) using local + regression techniques. It supports 0th order (Gaussian Kernel weighted average), 1st order (planar fit), + and 2nd order (quadratic fit) regression. + + Explanation of Regression Orders: + - **Order 0**: Equivalent to the Nadaraya-Watson estimator. It calculates a weighted average where weights + are determined by the Gaussian kernel and the validity (non-NaN status) of neighboring pixels. + - **Order 1 & 2**: Local Weighted Least Squares (LOESS). It fits a polynomial surface (plane or quadratic) to + the local neighborhood weighted by the Gaussian kernel. This acts as a robust 2D Savitzky-Golay filter. + + Mathematical basis: + - Approximate a signal s(x, y) from noisy data f(x, y) = s(x, y) + e(x, y) using weighted local regression. + - The approximation b(x, y) is calculated as the fitted value at point (x, y) using a weighted least squares + approach. Weights are non-zero within the neighborhood [x - rx, x + rx] and [y - ry, y + ry], following a + Gaussian distribution with standard deviations proportional to rx and ry. + - Optimization: + For **Order 0**, the operation is mathematically equivalent to a normalized convolution. This implementation + uses FFT-based convolution for performance gains compared to pixel-wise regression. + + :param data: 2D input array containing float data. May contain NaNs. + :param cutoff_pixels: The filter cutoff wavelength in pixels as array [cutoff_y, cutoff_x]. + :param regression_order: RegressionOrder enum specifying the polynomial fit order: + GAUSSIAN_WEIGHTED_AVERAGE (0) = Gaussian weighted average. + LOCAL_PLANAR (1) = Local planar fit (corrects for tilt). + LOCAL_QUADRATIC (2) = Local quadratic fit (corrects for quadratic curvature). + :returns: The filtered 2D array of the same shape as input. + """ + alpha = ( + ALPHA_REGRESSION + if regression_order == RegressionOrder.LOCAL_QUADRATIC + else ALPHA_GAUSSIAN + ) + kernel_x, kernel_y = _create_normalized_separable_kernels(alpha, cutoff_pixels) + + if regression_order == RegressionOrder.GAUSSIAN_WEIGHTED_AVERAGE: + return _apply_order0_filter(data, kernel_x, kernel_y) + + return calculate_polynomial_filter( + data, + kernel_x, + kernel_y, + exponents=generate_polynomial_exponents(regression_order.value), + ) From 2e38a6b6083355a05ab6065518b87c2417d924b8 Mon Sep 17 00:00:00 2001 From: Raytesnel Date: Wed, 4 Feb 2026 15:32:17 +0100 Subject: [PATCH 4/5] revert --- .../scratch-core/src/container_models/base.py | 7 +- .../scratch-core/src/renders/computations.py | 72 ---------- packages/scratch-core/src/renders/filters.py | 132 ------------------ packages/scratch-core/src/utils/constants.py | 7 - 4 files changed, 1 insertion(+), 217 deletions(-) delete mode 100644 packages/scratch-core/src/renders/computations.py delete mode 100644 packages/scratch-core/src/renders/filters.py delete mode 100644 packages/scratch-core/src/utils/constants.py diff --git a/packages/scratch-core/src/container_models/base.py b/packages/scratch-core/src/container_models/base.py index 6cd99536..df2bb659 100644 --- a/packages/scratch-core/src/container_models/base.py +++ b/packages/scratch-core/src/container_models/base.py @@ -1,6 +1,6 @@ from collections.abc import Sequence from functools import partial -from typing import Annotated, NamedTuple, TypeAlias +from typing import Annotated, TypeAlias from numpy import array, bool_, floating, number, uint8 from numpy.typing import DTypeLike, NDArray @@ -99,8 +99,3 @@ class ConfigBaseModel(BaseModel): arbitrary_types_allowed=True, regex_engine="rust-regex", ) - - -class Point[T](NamedTuple): - x: T - y: T diff --git a/packages/scratch-core/src/renders/computations.py b/packages/scratch-core/src/renders/computations.py deleted file mode 100644 index 18ba43a3..00000000 --- a/packages/scratch-core/src/renders/computations.py +++ /dev/null @@ -1,72 +0,0 @@ -from itertools import product - -from numpy.typing import NDArray -from conversion.filter import _convolve_2d_separable - -from container_models.base import Point -import numpy as np - - -def generate_polynomial_exponents(order: int) -> list[Point[int]]: - """ - Generate polynomial exponent pairs for 2D polynomial terms up to a given order. - - :param order: Maximum total degree (py + px) for the polynomial terms. - :returns: List of (power_y, power_x) tuples representing polynomial terms. - """ - return [ - Point(x, y) for y, x in product(range(order + 1), repeat=2) if y + x <= order - ] - - -# TODO: make it generic -# def build_lhs_matrix[T: NDArray(value: T) -> T:... -# def build_lhs_matrix[T: NDArray[np.floating]](value: T) -> T:... -def build_lhs_matrix( - weights: NDArray[np.floating], - kernel_x: NDArray[np.floating], - kernel_y: NDArray[np.floating], - x_coords: NDArray[np.floating], - y_coords: NDArray[np.floating], - exponents: list[Point[int]], -) -> NDArray[np.floating]: - """ - Construct the LHS matrix 'A' efficiently. - - Optimization: Many terms in the matrix are identical (e.g., x*y appears multiple times). - We compute unique moment sums first, then map them to the matrix structure. - """ - n_params = len(exponents) - - # Calculate sum of powers for every cell in the matrix (A_pq = term_p * term_q) - matrix_power_sums = np.array( - [ - (exponents[p][0] + exponents[q][0], exponents[p][1] + exponents[q][1]) - for q, p in product(range(n_params), repeat=2) - ] - ) - - # Find unique combinations of powers to avoid redundant convolutions - unique_powers, inverse_indices = np.unique( - matrix_power_sums, axis=0, return_inverse=True - ) - - # Compute convolutions for unique powers - unique_moments = np.array( - [ - _convolve_2d_separable( - weights, (x_coords**point.x) * kernel_x, (y_coords**point.y) * kernel_y - ) - for point in unique_powers - ] - ) - - # Reconstruct the full (n_params * n_params) matrix for every pixel - # Current shape: (unique_terms, H, W) -> Map to -> (n_params^2, H, W) - full_moments = unique_moments[inverse_indices] - - # Reshape to (H, W, n_params, n_params) - # We move axes so H, W are first, creating a matrix for every pixel - return np.moveaxis( - full_moments.reshape(n_params, n_params, *weights.shape), [0, 1], [-2, -1] - ) diff --git a/packages/scratch-core/src/renders/filters.py b/packages/scratch-core/src/renders/filters.py deleted file mode 100644 index f577b7a3..00000000 --- a/packages/scratch-core/src/renders/filters.py +++ /dev/null @@ -1,132 +0,0 @@ -import numpy as np -from container_models.base import Point -from conversion.filter import ( - _convolve_2d_separable, - _solve_pixelwise_regression, -) -from numpy.typing import NDArray -from conversion.filter import ( - _apply_order0_filter, - _create_normalized_separable_kernels, -) - -from renders.computations import generate_polynomial_exponents, build_lhs_matrix -from utils.constants import RegressionOrder - -# Constants based on ISO 16610 surface texture standards -# Standard Gaussian alpha for 50% transmission -ALPHA_GAUSSIAN = np.sqrt(np.log(2) / np.pi) -# Adjusted alpha often used for higher-order regression filters to maintain properties -# alpha = Sqrt((-1 - LambertW(-1, -1 / (2 * exp(1)))) / Pi) -ALPHA_REGRESSION = 0.7309134280946760 - - -def calculate_polynomial_filter( - data: NDArray[np.floating], - kernel_x: NDArray[np.floating], - kernel_y: NDArray[np.floating], - exponents: list[Point[int]], -) -> NDArray[np.floating]: - """ - Apply Order-1 or Order-2 Local Polynomial Regression. - - This function performs a Weighted Least Squares (WLS) fit of a polynomial surface within a local window - defined by the kernels. For each pixel, it solves the linear system A * c = b, where 'c' contains the - coefficients of the polynomial. The smoothed value is the first coefficient (c0). - - The kernels (kernel_x, kernel_y) serve as spatial weight functions. They determine the importance of - neighboring pixels in the regression. A non-uniform kernel (e.g., Gaussian) ensures that points closer - to the target pixel have a higher influence on the fit than points at the window's edge, providing better - localization and noise suppression. - - :param data: The 2D input array to be filtered. Can contain NaNs, which are treated as zero-weight during - the regression. - :param kernel_x: 1D array representing the horizontal weight distribution. - :param kernel_y: 1D array representing the vertical weight distribution. - :param exponents: List of (power_y, power_x) tuples defining the polynomial terms. - :returns: The filtered (smoothed) version of the input data. - """ - # 1. Setup Coordinate Systems (Normalized to [-1, 1] for stability) - ny, nx = len(kernel_y), len(kernel_x) - radius_y, radius_x = (ny - 1) // 2, (nx - 1) // 2 - - y_coords = np.arange(-radius_y, radius_y + 1) / (radius_y if ny > 1 else 1.0) - x_coords = np.arange(-radius_x, radius_x + 1) / (radius_x if nx > 1 else 1.0) - - # 2. Construct the Linear System Components (A matrix and b vector) - nan_mask = np.isnan(data) - weights = np.where(nan_mask, 0.0, 1.0) - weighted_data = np.where(nan_mask, 0.0, data * weights) - - # Calculate RHS vector 'b' (Data Moments) - # b_k = Convolution(weighted_data, x^px * y^py * Kernel) - rhs_moments = np.array( - [ - _convolve_2d_separable( - weighted_data, (x_coords**px) * kernel_x, (y_coords**py) * kernel_y - ) - for py, px in exponents - ] - ) - - # Calculate LHS Matrix 'A' (Weight Moments) - # A_jk = Convolution(weights, x^(px_j + px_k) * y^(py_j + py_k) * Kernel) - lhs_matrix = build_lhs_matrix( - weights, kernel_x, kernel_y, x_coords, y_coords, exponents - ) - - # 3. Solve the System (A * c = b) per pixel - return _solve_pixelwise_regression(lhs_matrix, rhs_moments, data) - - -def apply_gaussian_regression_filter( - data: NDArray[np.floating], - cutoff_pixels: NDArray[np.floating], - regression_order: RegressionOrder, -) -> NDArray[np.floating]: - """ - Apply a 2D Savitzky-Golay filter with Gaussian weighting via local polynomial regression (ISO 16610-21). - - This implementation generalizes standard Gaussian filtering to handle missing data (NaNs) using local - regression techniques. It supports 0th order (Gaussian Kernel weighted average), 1st order (planar fit), - and 2nd order (quadratic fit) regression. - - Explanation of Regression Orders: - - **Order 0**: Equivalent to the Nadaraya-Watson estimator. It calculates a weighted average where weights - are determined by the Gaussian kernel and the validity (non-NaN status) of neighboring pixels. - - **Order 1 & 2**: Local Weighted Least Squares (LOESS). It fits a polynomial surface (plane or quadratic) to - the local neighborhood weighted by the Gaussian kernel. This acts as a robust 2D Savitzky-Golay filter. - - Mathematical basis: - - Approximate a signal s(x, y) from noisy data f(x, y) = s(x, y) + e(x, y) using weighted local regression. - - The approximation b(x, y) is calculated as the fitted value at point (x, y) using a weighted least squares - approach. Weights are non-zero within the neighborhood [x - rx, x + rx] and [y - ry, y + ry], following a - Gaussian distribution with standard deviations proportional to rx and ry. - - Optimization: - For **Order 0**, the operation is mathematically equivalent to a normalized convolution. This implementation - uses FFT-based convolution for performance gains compared to pixel-wise regression. - - :param data: 2D input array containing float data. May contain NaNs. - :param cutoff_pixels: The filter cutoff wavelength in pixels as array [cutoff_y, cutoff_x]. - :param regression_order: RegressionOrder enum specifying the polynomial fit order: - GAUSSIAN_WEIGHTED_AVERAGE (0) = Gaussian weighted average. - LOCAL_PLANAR (1) = Local planar fit (corrects for tilt). - LOCAL_QUADRATIC (2) = Local quadratic fit (corrects for quadratic curvature). - :returns: The filtered 2D array of the same shape as input. - """ - alpha = ( - ALPHA_REGRESSION - if regression_order == RegressionOrder.LOCAL_QUADRATIC - else ALPHA_GAUSSIAN - ) - kernel_x, kernel_y = _create_normalized_separable_kernels(alpha, cutoff_pixels) - - if regression_order == RegressionOrder.GAUSSIAN_WEIGHTED_AVERAGE: - return _apply_order0_filter(data, kernel_x, kernel_y) - - return calculate_polynomial_filter( - data, - kernel_x, - kernel_y, - exponents=generate_polynomial_exponents(regression_order.value), - ) diff --git a/packages/scratch-core/src/utils/constants.py b/packages/scratch-core/src/utils/constants.py deleted file mode 100644 index 3844c980..00000000 --- a/packages/scratch-core/src/utils/constants.py +++ /dev/null @@ -1,7 +0,0 @@ -from enum import Enum - - -class RegressionOrder(Enum): - GAUSSIAN_WEIGHTED_AVERAGE = 0 - LOCAL_PLANAR = 1 - LOCAL_QUADRATIC = 2 From b52847baf4ef63be242e5f48a25346600f8aa46d Mon Sep 17 00:00:00 2001 From: Raytesnel Date: Wed, 4 Feb 2026 16:39:33 +0100 Subject: [PATCH 5/5] refactor to a ImageMutation --- .../scratch-core/src/mutations/__init__.py | 3 + packages/scratch-core/src/mutations/filter.py | 147 +++++++++++++++++- packages/scratch-core/src/utils/constants.py | 7 + 3 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 packages/scratch-core/src/utils/constants.py diff --git a/packages/scratch-core/src/mutations/__init__.py b/packages/scratch-core/src/mutations/__init__.py index e69de29b..1837a3ba 100644 --- a/packages/scratch-core/src/mutations/__init__.py +++ b/packages/scratch-core/src/mutations/__init__.py @@ -0,0 +1,3 @@ +from .filter import GausianRegressionFilter, LevelMap + +__all__ = ["GausianRegressionFilter", "LevelMap"] diff --git a/packages/scratch-core/src/mutations/filter.py b/packages/scratch-core/src/mutations/filter.py index c4e92235..06ea8733 100644 --- a/packages/scratch-core/src/mutations/filter.py +++ b/packages/scratch-core/src/mutations/filter.py @@ -1,5 +1,7 @@ +from collections import namedtuple +from itertools import product from typing import NamedTuple -from container_models.base import FloatArray1D, FloatArray2D +from container_models.base import DepthData, FloatArray1D, FloatArray2D from container_models.scan_image import ScanImage from conversion.leveling.data_types import SurfaceTerms from conversion.leveling.solver.design import build_design_matrix @@ -7,6 +9,14 @@ from conversion.leveling.solver.transforms import normalize_coordinates from mutations.base import ImageMutation import numpy as np +from utils.constants import RegressionOrder +from conversion.filter.regression import ( + _build_lhs_matrix, + _solve_pixelwise_regression, + apply_order0_filter, + convolve_2d_separable, + create_normalized_separable_kernels, +) class PointCloud(NamedTuple): @@ -118,3 +128,138 @@ def apply_on_image(self, scan_image: ScanImage) -> ScanImage: scan_image.data = leveled_map_2d return scan_image + + +class GausianRegressionFilter(ImageMutation): + # Constants based on ISO 16610 surface texture standards + # Standard Gaussian alpha for 50% transmission + ALPHA_GAUSSIAN = np.sqrt(np.log(2) / np.pi) + # Adjusted alpha often used for higher-order regression filters to maintain properties + # alpha = Sqrt((-1 - LambertW(-1, -1 / (2 * exp(1)))) / Pi) + ALPHA_REGRESSION = 0.7309134280946760 + _Exponent = namedtuple("Exponent", ["y", "x"]) + + def __init__( + self, cutoff_pixels: FloatArray1D, regression_order: RegressionOrder + ) -> None: + self.cutoff_pixels = cutoff_pixels + self.regression_order = regression_order + + def calculate_polynomial_filter( + self, + data: DepthData, + kernel_x: FloatArray1D, + kernel_y: FloatArray1D, + exponents: list[_Exponent], + ) -> DepthData: + """ + Apply Order-1 or Order-2 Local Polynomial Regression. + This function performs a Weighted Least Squares (WLS) fit of a polynomial surface within a local window + defined by the kernels. For each pixel, it solves the linear system A * c = b, where 'c' contains the + coefficients of the polynomial. The smoothed value is the first coefficient (c0). + The kernels (kernel_x, kernel_y) serve as spatial weight functions. They determine the importance of + neighboring pixels in the regression. A non-uniform kernel (e.g., Gaussian) ensures that points closer + to the target pixel have a higher influence on the fit than points at the window's edge, providing better + localization and noise suppression. + :param data: The 2D input array to be filtered. Can contain NaNs, which are treated as zero-weight during + the regression. + :param kernel_x: 1D array representing the horizontal weight distribution. + :param kernel_y: 1D array representing the vertical weight distribution. + :param exponents: List of (power_y, power_x) tuples defining the polynomial terms. + :returns: The filtered (smoothed) version of the input data. + """ + # 1. Setup Coordinate Systems (Normalized to [-1, 1] for stability) + ny, nx = len(kernel_y), len(kernel_x) + radius_y, radius_x = (ny - 1) // 2, (nx - 1) // 2 + + y_coords = np.arange(-radius_y, radius_y + 1) / (radius_y if ny > 1 else 1.0) + x_coords = np.arange(-radius_x, radius_x + 1) / (radius_x if nx > 1 else 1.0) + + # 2. Construct the Linear System Components (A matrix and b vector) + nan_mask = np.isnan(data) + weights = np.where(nan_mask, 0.0, 1.0) + weighted_data = np.where(nan_mask, 0.0, data * weights) + + # Calculate RHS vector 'b' (Data Moments) + # b_k = Convolution(weighted_data, x^px * y^py * Kernel) + rhs_moments = np.array( + [ + convolve_2d_separable( + weighted_data, (x_coords**px) * kernel_x, (y_coords**py) * kernel_y + ) + for py, px in exponents + ] + ) + + # Calculate LHS Matrix 'A' (Weight Moments) + # A_jk = Convolution(weights, x^(px_j + px_k) * y^(py_j + py_k) * Kernel) + lhs_matrix = _build_lhs_matrix( + weights, + kernel_x, + kernel_y, + x_coords, + y_coords, + exponents, # type: ignore + ) + + # 3. Solve the System (A * c = b) per pixel + return _solve_pixelwise_regression(lhs_matrix, rhs_moments, data) + + def generate_polynomial_exponents(self, order: int) -> list[_Exponent]: + """ + Generate polynomial exponent pairs for 2D polynomial terms up to a given order. + :param order: Maximum total degree (py + px) for the polynomial terms. + :returns: List of (power_y, power_x) tuples representing polynomial terms. + """ + return [ + self._Exponent(x, y) + for y, x in product(range(order + 1), repeat=2) + if y + x <= order + ] + + def apply_on_image(self, scan_image: ScanImage) -> ScanImage: + """ + Apply a 2D Savitzky-Golay filter with Gaussian weighting via local polynomial regression (ISO 16610-21). + This implementation generalizes standard Gaussian filtering to handle missing data (NaNs) using local + regression techniques. It supports 0th order (Gaussian Kernel weighted average), 1st order (planar fit), + and 2nd order (quadratic fit) regression. + Explanation of Regression Orders: + - **Order 0**: Equivalent to the Nadaraya-Watson estimator. It calculates a weighted average where weights + are determined by the Gaussian kernel and the validity (non-NaN status) of neighboring pixels. + - **Order 1 & 2**: Local Weighted Least Squares (LOESS). It fits a polynomial surface (plane or quadratic) to + the local neighborhood weighted by the Gaussian kernel. This acts as a robust 2D Savitzky-Golay filter. + Mathematical basis: + - Approximate a signal s(x, y) from noisy data f(x, y) = s(x, y) + e(x, y) using weighted local regression. + - The approximation b(x, y) is calculated as the fitted value at point (x, y) using a weighted least squares + approach. Weights are non-zero within the neighborhood [x - rx, x + rx] and [y - ry, y + ry], following a + Gaussian distribution with standard deviations proportional to rx and ry. + - Optimization: + For **Order 0**, the operation is mathematically equivalent to a normalized convolution. This implementation + uses FFT-based convolution for performance gains compared to pixel-wise regression. + :param data: 2D input array containing float data. May contain NaNs. + :param cutoff_pixels: The filter cutoff wavelength in pixels as array [cutoff_y, cutoff_x]. + :param regression_order: RegressionOrder enum specifying the polynomial fit order: + GAUSSIAN_WEIGHTED_AVERAGE (0) = Gaussian weighted average. + LOCAL_PLANAR (1) = Local planar fit (corrects for tilt). + LOCAL_QUADRATIC (2) = Local quadratic fit (corrects for quadratic curvature). + :returns: The filtered 2D array of the same shape as input. + """ + alpha = ( + self.ALPHA_REGRESSION + if self.regression_order == RegressionOrder.LOCAL_QUADRATIC + else self.ALPHA_GAUSSIAN + ) + kernel_x, kernel_y = create_normalized_separable_kernels( + alpha, self.cutoff_pixels + ) + + if self.regression_order == RegressionOrder.GAUSSIAN_WEIGHTED_AVERAGE: + scan_image.data = apply_order0_filter(scan_image.data, kernel_x, kernel_y) + return scan_image + scan_image.data = self.calculate_polynomial_filter( + scan_image.data, + kernel_x, + kernel_y, + exponents=self.generate_polynomial_exponents(self.regression_order.value), + ) + return scan_image diff --git a/packages/scratch-core/src/utils/constants.py b/packages/scratch-core/src/utils/constants.py new file mode 100644 index 00000000..1ded9267 --- /dev/null +++ b/packages/scratch-core/src/utils/constants.py @@ -0,0 +1,7 @@ +from enum import Enum, auto + + +class RegressionOrder(Enum): + GAUSSIAN_WEIGHTED_AVERAGE = auto() + LOCAL_PLANAR = auto() + LOCAL_QUADRATIC = auto()