diff --git a/src/pysatl_core/distributions/fitters.py b/src/pysatl_core/distributions/fitters.py index 6bebe83..ea76525 100644 --- a/src/pysatl_core/distributions/fitters.py +++ b/src/pysatl_core/distributions/fitters.py @@ -1,6 +1,6 @@ from __future__ import annotations -__author__ = "Leonid Elkin, Mikhail Mikhailov" +__author__ = "Leonid Elkin, Mikhail Mikhailov, Irina Sergeeva" __copyright__ = "Copyright (c) 2025 PySATL project" __license__ = "SPDX-License-Identifier: MIT" @@ -21,7 +21,7 @@ ExplicitTableDiscreteSupport, IntegerLatticeDiscreteSupport, ) -from pysatl_core.types import CharacteristicName +from pysatl_core.types import CharacteristicName, NumericArray if TYPE_CHECKING: from typing import Any @@ -30,21 +30,19 @@ from pysatl_core.types import GenericCharacteristicName, ScalarFunc -def _resolve(distribution: Distribution, name: GenericCharacteristicName) -> ScalarFunc: +def _resolve(distribution: Distribution, name: GenericCharacteristicName): """ - Resolve a scalar characteristic from the distribution. - Parameters ---------- distribution : Distribution Source distribution that provides the computation strategy. - name : str - Characteristic name to resolve (e.g., ``"cdf"``). + name : GenericCharacteristicName + Characteristic name to resolve (e.g., ``"pdf"``, ``"cdf"``). Returns ------- - Callable[[float], float] - Scalar callable for the requested characteristic. + Callable[[NumericArray, ...], NumericArray] + Array-semantic callable for the requested characteristic. Raises ------ @@ -55,322 +53,452 @@ def _resolve(distribution: Distribution, name: GenericCharacteristicName) -> Sca fn = distribution.query_method(name) except AttributeError as e: raise RuntimeError( - "Distribution must provide computation_strategy.querry_method(name, distribution)." + "Distribution must provide computation_strategy.query_method(name, distribution)." 
) from e - def _wrap(x: float, **kwargs: Any) -> float: - return float(fn(x, **kwargs)) + _probe = np.array([0.0]) + try: + result = fn(_probe) + if np.ndim(result) == 0: + raise TypeError + except (TypeError, ValueError): ## TODO: delete wrapper + _fn_scalar = fn + def fn(x: NumericArray, **kwargs: Any) -> NumericArray: + return np.vectorize(lambda xi: _fn_scalar(float(xi), **kwargs))(x) - return _wrap + return fn -def _ppf_brentq_from_cdf( - cdf: ScalarFunc, - *, - most_left: bool = False, - x0: float = 0.0, - init_step: float = 1.0, - expand_factor: float = 2.0, - max_expand: int = 60, - x_tol: float = 1e-12, - y_tol: float = 0.0, - max_iter: int = 200, -) -> ScalarFunc: +def _collect_support(support: DiscreteSupport) -> np.ndarray: """ - Build a scalar ``ppf`` from a scalar ``cdf`` using bracket expansion - and a bisection-like search. + Materialise a discrete support into a sorted float64 array. + + For built-in support types the array is constructed directly with NumPy + (no Python-level iteration). For user-defined supports a one-time Python + iteration is unavoidable, but it occurs only at fit-time — never at + call-time. Parameters ---------- - cdf : Callable[[float], float] - Monotone Characteristic.CDF in ``[-inf, +inf] -> [0, 1]``. - most_left : bool, default False - If ``True``, return the leftmost quantile for flat Characteristic.CDF plateaus. - x0 : float, default 0.0 - Initial bracket center. - init_step : float, default 1.0 - Initial half-width for the bracket. - expand_factor : float, default 2.0 - Multiplicative factor for exponential bracket growth. - max_expand : int, default 60 - Maximum expansions while searching for a valid bracket. - x_tol : float, default 1e-12 - Absolute tolerance in ``x`` for stopping criterion. - y_tol : float, default 0.0 - Optional tolerance in Characteristic.CDF values to stop early when the bracket is flat. - max_iter : int, default 200 - Maximum iterations for the bisection-like refinement. 
+ support : DiscreteSupport + A discrete support object. Recognised types are + ``ExplicitTableDiscreteSupport``, ``IntegerLatticeDiscreteSupport`` + (finite-bounds only), and any user-defined support that is iterable + or exposes ``values()`` / ``to_list()`` / ``first()`` + ``next()``. Returns ------- - Callable[[float], float] - Scalar ``ppf`` such that ``cdf(ppf(q)) ≈ q``. + np.ndarray + Sorted 1-D float64 array of all support points. - Notes - ----- - This helper clamps the extreme tail queries: ``q <= 0`` maps to ``-inf``, - ``q >= 1`` maps to ``+inf``. + Raises + ------ + RuntimeError + If the support is a left-unbounded or fully-unbounded + ``IntegerLatticeDiscreteSupport``, or if the support cannot be + iterated by any recognised strategy. """ + # --- Built-in: explicit table ------------------------------------------- + if isinstance(support, ExplicitTableDiscreteSupport): + return np.asarray(support.points, dtype=float) - def _expand_bracket(q: float) -> tuple[float, float, float, float]: - if q <= 0.0: - return float("-inf"), float("-inf"), 0.0, 0.0 - if q >= 1.0: - return float("inf"), float("inf"), 1.0, 1.0 - - step = init_step - L = x0 - step - R = x0 + step - FL = float(cdf(L)) - FR = float(cdf(R)) - - def left_ok(FL: float, FR: float) -> bool: - return (q > FL) and (q <= FR) + # --- Built-in: integer lattice ------------------------------------------ + if isinstance(support, IntegerLatticeDiscreteSupport): + if support.min_k is not None and support.max_k is not None: + first = support.first() + if first is None: + return np.empty(0, dtype=float) + return np.arange(first, support.max_k + 1, support.modulus, dtype=float) - def right_ok(FL: float, FR: float) -> bool: - return (q >= FL) and (q < FR) + # Right-bounded only: we DON'T materialise the infinite left tail. + # Callers that need tail summation handle this case separately. 
+ if support.min_k is None and support.max_k is not None: + raise RuntimeError( + "Left-unbounded IntegerLatticeDiscreteSupport cannot be fully " + "materialised. Use _build_tail_table for pmf->cdf tail summation." + ) - def ok(FL: float, FR: float) -> bool: - return left_ok(FL, FR) if most_left else right_ok(FL, FR) + raise RuntimeError( + "Cannot materialise an unbounded IntegerLatticeDiscreteSupport. " + "Provide at least one bound." + ) - for _ in range(max_expand): - if ok(FL, FR): - return L, R, FL, FR - grow_left = not ((q > FL) if most_left else (q >= FL)) - grow_right = not ((q <= FR) if most_left else (q < FR)) + xs: list[float] = [] - if grow_left: - step *= expand_factor - L -= step - FL = float(cdf(L)) - if grow_right: - step *= expand_factor - R += step - FR = float(cdf(R)) + # 1. Direct iteration + try: + xs = [float(v) for v in support] + if xs: + return np.array(sorted(xs), dtype=float) + except Exception: + pass - if y_tol > 0.0: - if most_left and (q - y_tol >= FL) and (q - y_tol <= FR): - return L, R, FL, FR - if not most_left and (q + y_tol >= FL) and (q + y_tol <= FR): - return L, R, FL, FR + # 2. values() / to_list() + for attr in ("values", "to_list"): + if hasattr(support, attr): + try: + xs = [float(v) for v in getattr(support, attr)()] + if xs: + return np.array(sorted(xs), dtype=float) + except Exception: + pass - return L, R, FL, FR + # 3. 
Cursor API: first() / next(x) + if hasattr(support, "first") and hasattr(support, "next"): + try: + cur = support.first() + seen: set[Any] = set() + while cur is not None and cur not in seen: + seen.add(cur) + xs.append(float(cur)) + cur = support.next(cur) + if xs: + return np.array(sorted(xs), dtype=float) + except Exception: + pass - def _ppf(q: float, **kwargs: Any) -> float: - if q <= 0.0: - return float("-inf") - if q >= 1.0: - return float("inf") + raise RuntimeError("Discrete support must be iterable or expose first()/next().") - L, R, FL, FR = _expand_bracket(q) +def _build_tail_table( + support: IntegerLatticeDiscreteSupport, + pmf_func, + **options: Any, +) -> tuple[np.ndarray, np.ndarray]: + """ + Build a tail-probability table for a right-bounded, left-unbounded integer lattice. - if not (isfinite(L) and isfinite(R)): - return L if q <= 0.0 else R + Evaluates ``pmf`` on the finite grid ``[residue, residue + modulus, ..., max_k]`` + in a single vectorised call, then computes the reverse cumulative sum to obtain + the survival function at each grid point. - it = 0 - while it < max_iter and x_tol * (1.0 + max(abs(L), abs(R))) < (R - L): - M = 0.5 * (L + R) - FM = float(cdf(M)) + Parameters + ---------- + support : IntegerLatticeDiscreteSupport + Must satisfy ``support.max_k is not None``. + pmf_func : Callable[[NumericArray, ...], NumericArray] + Array-semantic PMF callable. + **options : Any + Keyword arguments forwarded to ``pmf_func``. - if most_left: - if q <= FM: - R, FR = M, FM - else: - L, FL = M, FM - else: - if q < FM: - R, FR = M, FM - else: - L, FL = M, FM + Returns + ------- + xs : np.ndarray + Lattice points from ``residue`` up to ``max_k``, shape ``(m,)``. + tail_from : np.ndarray + Tail probability array of shape ``(m + 1,)``. + ``tail_from[i] = P(X >= xs[i])``, with ``tail_from[0] = 1.0`` + and ``tail_from[m] = 0.0``. Normalised so that ``tail_from[0] == 1.0`` + to correct for any truncation of the finite grid. 
+ """ + max_k = support.max_k + residue = support.residue + modulus = support.modulus - if y_tol > 0.0 and abs(FR - FL) <= y_tol: - break + xs = np.arange(residue, max_k + 1, modulus, dtype=float) + if xs.size == 0: + return np.empty(0, dtype=float), np.array([1.0, 0.0]) - it += 1 + pmf_vals = np.asarray(pmf_func(xs, **options), dtype=float) + pmf_vals = np.clip(pmf_vals, 0.0, None) - return R if most_left else L + tail_cumsum = np.concatenate(([0.0], np.cumsum(pmf_vals[::-1])))[::-1] + total = float(tail_cumsum[0]) + if total > 0.0: + tail_cumsum = tail_cumsum / total - return _ppf + return xs, np.clip(tail_cumsum, 0.0, 1.0) -def _num_derivative(f: ScalarFunc, x: float, h: float = 1e-5) -> float: +def _estimate_support_bounds( + cdf_func, + *, + eps: float = 1e-6, + x0: float = 0.0, + max_steps: int = 100, +) -> tuple[float, float]: """ - 5-point central numerical derivative used for ``cdf -> pdf``. + Estimate the effective support bounds ``[x_lowest, x_highest]`` from a CDF. + + Expands exponentially left and right from ``x0`` until + ``cdf(x_lowest) <= eps`` and ``cdf(x_highest) >= 1 - eps``. Parameters ---------- - f : Callable[[float], float] - Scalar function. - x : float - Evaluation point. - h : float, default 1e-5 - Step for the stencil. + cdf_func : Callable[[NumericArray], NumericArray] + Array-semantic CDF callable. + eps : float, default 1e-6 + Tail probability threshold. + x0 : float, default 0.0 + Starting point for the search. + max_steps : int, default 100 + Maximum expansion steps in each direction. Returns ------- - float - Approximated derivative ``f'(x)``. + x_lowest : float + Left bound where ``cdf(x_lowest) <= eps``. + x_highest : float + Right bound where ``cdf(x_highest) >= 1 - eps``. 
""" - if not isfinite(x): - return float("nan") - f1 = float(f(x + h)) - f_1 = float(f(x - h)) - f2 = float(f(x + 2 * h)) - f_2 = float(f(x - 2 * h)) - return float((-f2 + 8 * f1 - 8 * f_1 + f_2) / (12.0 * h)) + def _eval(x: float) -> float: + return float(np.asarray(cdf_func(np.array([x])), dtype=float).flat[0]) + + x_lowest = x0 + step = 1.0 + for _ in range(max_steps): + if _eval(x_lowest) <= eps: + break + x_lowest -= step + step *= 2.0 + + x_highest = x0 + step = 1.0 + for _ in range(max_steps): + if _eval(x_highest) >= 1.0 - eps: + break + x_highest += step + step *= 2.0 + + return x_lowest, x_highest + + +# --- Continuous fitters (1C) -------------------------------------- def fit_pdf_to_cdf_1C( distribution: Distribution, /, **kwargs: Any -) -> FittedComputationMethod[float, float]: +) -> FittedComputationMethod[NumericArray, NumericArray]: """ - Fit ``cdf`` from an analytical or resolvable ``pdf`` via numerical integration. - Parameters ---------- distribution : Distribution + Must expose a ``pdf`` via the computation strategy. Array semantics + are not required of the source PDF: ``quad`` calls it with scalar inputs. + limit : int, keyword, default 200 + Maximum number of ``quad`` subdivisions per integral. Returns ------- - FittedComputationMethod[float, float] - Fitted ``pdf -> cdf`` conversion. + FittedComputationMethod[NumericArray, NumericArray] + Fitted ``pdf → cdf`` conversion. Accepts array inputs; each element + is integrated independently. 
""" pdf_func = _resolve(distribution, CharacteristicName.PDF) - - def _cdf(x: float, **options: Any) -> float: - val, _ = _sp_integrate.quad( - lambda t: float(pdf_func(t, **options)), float("-inf"), x, limit=200 - ) - return float(np.clip(val, 0.0, 1.0)) - - cdf_func = cast(Callable[[float, KwArg(Any)], float], _cdf) - return FittedComputationMethod[float, float]( - target=CharacteristicName.CDF, sources=[CharacteristicName.PDF], func=cdf_func + limit = int(kwargs.get("limit", 200)) + + def _cdf(x: NumericArray, **options: Any) -> NumericArray: + x_array = np.atleast_1d(np.asarray(x, dtype=float)) + def _single(xi: float) -> float: + val, _ = _sp_integrate.quad( + lambda t: float(pdf_func(np.array([t]), **options).flat[0]), + float("-inf"), xi, + limit=limit, + ) + return float(np.clip(val, 0.0, 1.0)) + result = np.frompyfunc(_single, 1, 1)(x_array).astype(float) + return cast(NumericArray, result[0] if result.shape == (1,) else result) + + cdf_func = cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _cdf) + return FittedComputationMethod[NumericArray, NumericArray]( + target=CharacteristicName.CDF, + sources=[CharacteristicName.PDF], + func=cdf_func ) def fit_cdf_to_pdf_1C( distribution: Distribution, /, **kwargs: Any -) -> FittedComputationMethod[float, float]: +) -> FittedComputationMethod[NumericArray, NumericArray]: """ - Fit ``pdf`` as a clipped numerical derivative of ``cdf``. - Parameters ---------- distribution : Distribution + Must expose an array-semantic ``cdf``. + h : float, keyword, default 1e-5 + Finite-difference step size. Smaller values improve accuracy for + smooth CDFs but increase sensitivity to floating-point noise. Returns ------- - FittedComputationMethod[float, float] - Fitted ``cdf -> pdf`` conversion. + FittedComputationMethod[NumericArray, NumericArray] + Fitted ``cdf → pdf`` conversion with full array semantics. 
""" cdf_func = _resolve(distribution, CharacteristicName.CDF) - - def _pdf(x: float, **options: Any) -> float: - def wrapped_cdf(t: float) -> float: - return cdf_func(t, **options) - - d = _num_derivative(wrapped_cdf, x, h=1e-5) - return float(max(d, 0.0)) - - pdf_func = cast(Callable[[float, KwArg(Any)], float], _pdf) - return FittedComputationMethod[float, float]( - target=CharacteristicName.PDF, sources=[CharacteristicName.CDF], func=pdf_func + h = float(kwargs.get("h", 1e-5)) + def _pdf(x: NumericArray, **options: Any) -> NumericArray: + x_array = np.atleast_1d(np.asarray(x, dtype=float)) + cdf_ph1 = np.asarray(cdf_func(x_array + h, **options), dtype=float) + cdf_mh1 = np.asarray(cdf_func(x_array - h, **options), dtype=float) + cdf_ph2 = np.asarray(cdf_func(x_array + 2 * h, **options), dtype=float) + cdf_mh2 = np.asarray(cdf_func(x_array - 2 * h, **options), dtype=float) + + derivative = (-cdf_ph2 + 8.0 * cdf_ph1 - 8.0 * cdf_mh1 + cdf_mh2) / (12.0 * h) + result = np.clip(derivative, 0.0, None) + return cast(NumericArray, result[0] if result.shape == (1,) else result) + + pdf_func = cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _pdf) + return FittedComputationMethod[NumericArray, NumericArray]( + target=CharacteristicName.PDF, + sources=[CharacteristicName.CDF], + func=pdf_func ) def fit_cdf_to_ppf_1C( - distribution: Distribution, /, **options: Any -) -> FittedComputationMethod[float, float]: + distribution: Distribution, /, **kwargs: Any, +) -> FittedComputationMethod[NumericArray, NumericArray]: """ - Fit ``ppf`` from a resolvable ``cdf`` using a robust bracketing procedure. - Parameters ---------- distribution : Distribution + Must expose an array-semantic ``cdf``. + n_grid : int, keyword, default 1024 + Reserved for future PDF-based bound estimation (currently unused). + max_iter : int, keyword, default 60 + Maximum bisection iterations. + x_tol : float, keyword, default 1e-10 + Early-stop tolerance on the bracket width. 
+ eps : float, keyword, default 1e-6 + Tail probability threshold for support bound estimation. + x0 : float, keyword, default 0.0 + Starting point for the bound search. Returns ------- - FittedComputationMethod[float, float] - Fitted ``cdf -> ppf`` conversion. + FittedComputationMethod[NumericArray, NumericArray] + Fitted ``cdf → ppf`` conversion with full array semantics. """ - cdf_func = _resolve(distribution, CharacteristicName.CDF) - - def cdf_with_options(x: float) -> float: - return cdf_func(x, **options) - - ppf_func = _ppf_brentq_from_cdf(cdf_with_options, **options) + n_grid = int(kwargs.get("n_grid", 1024)) + max_iter = int(kwargs.get("max_iter", 60)) + x_tol = float(kwargs.get("x_tol", 1e-10)) + eps = float(kwargs.get("eps", 1e-6)) + x0 = float(kwargs.get("x0", 0.0)) - def _ppf(q: float, **kwargs: Any) -> float: - return ppf_func(q) - - ppf_cast = cast(Callable[[float, KwArg(Any)], float], _ppf) - return FittedComputationMethod[float, float]( - target=CharacteristicName.PPF, sources=[CharacteristicName.CDF], func=ppf_cast + cdf_func = _resolve(distribution, CharacteristicName.CDF) + x_lowest, x_highest = _estimate_support_bounds(cdf_func, eps=eps, x0=x0) + + def _ppf(q: NumericArray, **options: Any) -> NumericArray: + q_array = np.atleast_1d(np.asarray(q, dtype=float)) + result = np.full_like(q_array, np.nan) + result[q_array <= 0.0] = -np.inf + result[q_array >= 1.0] = np.inf + interior = (q_array > 0.0) & (q_array < 1.0) + if np.any(interior): + q_in = q_array[interior] + lowest = np.full_like(q_in, x_lowest) + highest = np.full_like(q_in, x_highest) + + for _ in range(max_iter): + if (highest - lowest).max() < x_tol: + break + mid = 0.5 * (lowest + highest) + cdf_mid = np.asarray(cdf_func(mid, **options), dtype=float) + cond = cdf_mid < q_in + lowest = np.where(cond, mid, lowest) + highest = np.where(cond, highest, mid) + + result[interior] = 0.5 * (lowest + highest) + + return cast(NumericArray, result[0] if result.shape == (1,) else result) + + 
ppf_cast = cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _ppf) + return FittedComputationMethod[NumericArray, NumericArray]( + target=CharacteristicName.PPF, + sources=[CharacteristicName.CDF], + func=ppf_cast, ) def fit_ppf_to_cdf_1C( - distribution: Distribution, /, **_: Any -) -> FittedComputationMethod[float, float]: + distribution: Distribution, /, **kwargs: Any, +) -> FittedComputationMethod[NumericArray, NumericArray]: """ - Fit ``cdf`` by numerically inverting a resolvable ``ppf`` with a root solver. - Parameters ---------- distribution : Distribution + Must expose an array-semantic ``ppf`` via the computation strategy. + q_lowest : float, keyword, default 1e-12 + Left bracket for the root search. + q_highest : float, keyword, default 1 - 1e-12 + Right bracket for the root search. + max_iter : int, keyword, default 256 + Maximum iterations for ``brentq`` per query point. Returns ------- - FittedComputationMethod[float, float] - Fitted ``ppf -> cdf`` conversion. + FittedComputationMethod[NumericArray, NumericArray] + Fitted ``ppf → cdf`` conversion. Accepts array inputs; each element + is inverted independently via ``brentq``. 
+ """ + q_lowest = float(kwargs.get("q_lowest", 1e-12)) + q_highest = float(kwargs.get("q_highest", 1.0 - 1e-12)) + max_iter = float(kwargs.get("max_iter", 256)) ppf_func = _resolve(distribution, CharacteristicName.PPF) - def _cdf(x: float, **options: Any) -> float: - if not isfinite(x): - return 0.0 if x == float("-inf") else 1.0 - - def f(q: float) -> float: - return float(ppf_func(q, **options) - x) - - lo, hi = 1e-12, 1.0 - 1e-12 - flo, fhi = f(lo), f(hi) - if flo > 0.0: - return 0.0 - if fhi < 0.0: - return 1.0 - q = float(_sp_optimize.brentq(f, lo, hi, maxiter=256)) # type: ignore[arg-type] - return float(np.clip(q, 0.0, 1.0)) - - cdf_func = cast(Callable[[float, KwArg(Any)], float], _cdf) - return FittedComputationMethod[float, float]( - target=CharacteristicName.CDF, sources=[CharacteristicName.PPF], func=cdf_func - ) + x_lowest, x_highest = -np.inf, np.inf + def _cdf(x: NumericArray, **options: Any) -> NumericArray: + x_array = np.atleast_1d(np.asarray(x, dtype=float)) + + result = np.empty_like(x_array) + result[x_array <= x_lowest] = 0.0 + result[x_array >= x_highest] = 1.0 + interior = (x_array > x_lowest) & (x_array < x_highest) + if np.any(interior): + x_in = x_array[interior] -# --- Discrete fitters: pmf <-> cdf (1D) -------------------------------------- + def _single(xi: float) -> float: + def f(q: float) -> float: + return float(np.asarray(ppf_func(np.array([q]), **options)).flat[0]) - xi + try: + return float(_sp_optimize.brentq(f, q_lowest, q_highest, maxiter=max_iter)) + except ValueError: + return float("nan") + result[interior] = np.clip(np.frompyfunc(_single, 1, 1)(x_in).astype(float), 0.0, 1.0) + + return cast(NumericArray, result[0] if result.shape == (1,) else result) + + cdf_func = cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _cdf) + return FittedComputationMethod[NumericArray, NumericArray]( + target=CharacteristicName.CDF, + sources=[CharacteristicName.PPF], + func=cdf_func + ) + + +# --- Discrete fitters (1D) 
-------------------------------------- def fit_pmf_to_cdf_1D( - distribution: Distribution, /, **_: Any -) -> FittedComputationMethod[float, float]: + distribution: Distribution, /, **kwargs: Any, +) -> FittedComputationMethod[NumericArray, NumericArray]: """ - Build Characteristic.CDF from Characteristic.PMF on a discrete support by partial summation. - - The behaviour depends on the kind of discrete support: + Parameters + ---------- + distribution : Distribution + Must expose a discrete support on ``.support`` and an array-semantic + ``pmf`` via the computation strategy. + **kwargs : Any + Forwarded to ``pmf`` at fit-time (e.g. distribution parameters). - * For table-like supports and left-bounded integer lattices, the Characteristic.CDF is - constructed as a prefix sum over all support points ``k <= x``. - * For right-bounded integer lattices (support extends to ``-inf``), the Characteristic.CDF - is computed via a *tail* sum: + Returns + ------- + FittedComputationMethod[NumericArray, NumericArray] + Fitted ``pmf → cdf`` conversion with full array semantics. - Characteristic.CDF(x) = 1 - sum_{k > x} pmf(k), + Raises + ------ + RuntimeError + If the distribution does not expose a discrete support, if the + support is empty, or if the support is a two-sided infinite integer + lattice. - which only involves finitely many points. - * Two-sided infinite integer lattices are not supported by this fitter — - a numerically truncated algorithm would require additional configuration - and is left for future work. + Notes + ----- + For right-bounded, left-unbounded ``IntegerLatticeDiscreteSupport`` the + CDF is computed as ``1 - P(X > x)`` using tail summation. + For all other finite supports the CDF is the prefix sum of PMF values. 
""" support = distribution.support if support is None or not isinstance(support, DiscreteSupport): @@ -378,331 +506,263 @@ def fit_pmf_to_cdf_1D( pmf_func = _resolve(distribution, CharacteristicName.PMF) - # Special case: right-bounded integer lattice - if isinstance(support, IntegerLatticeDiscreteSupport): - # Two-sided infinite lattice: exact pmf->cdf is not feasible without - # additional truncation policy. - if not support.is_left_bounded and not support.is_right_bounded: - raise RuntimeError( - "pmf->cdf for a two-sided infinite integer lattice is not supported " - "by the generic fitter. Provide an analytical Characteristic.CDF or a " - "custom fitter." - ) - - # Right-bounded, left-unbounded: use tail summation. - if not support.is_left_bounded and support.max_k is not None: - max_k = support.max_k + # --- Right-bounded, left-unbounded lattice: tail summation -------------- + if ( + isinstance(support, IntegerLatticeDiscreteSupport) + and not support.is_left_bounded + and support.is_right_bounded + ): + xs, tail_from = _build_tail_table(support, pmf_func, **kwargs) + max_k = float(support.max_k) - def _cdf(x: float, **kwargs: Any) -> float: - # Everything to the right of the upper bound has Characteristic.CDF == 1. 
- if x >= max_k: - return 1.0 + def _cdf_tail(x: NumericArray, **options: Any) -> NumericArray: + x_array = np.atleast_1d(np.asarray(x, dtype=float)) - import math + idx = np.searchsorted(xs, x_array, side="right") + result = np.clip(1.0 - tail_from[idx], 0.0, 1.0) + result[x_array >= max_k] = 1.0 - # First integer strictly greater than x - threshold = int(math.floor(float(x))) - k = threshold + 1 - if k > max_k: - return 1.0 + return cast(NumericArray, result[0] if result.shape == (1,) else result) - # Align to the lattice: smallest k >= candidate with k ≡ residue (mod modulus) - offset = (k - support.residue) % support.modulus - if offset != 0: - k += support.modulus - offset - if k > max_k: - return 1.0 + return FittedComputationMethod[NumericArray, NumericArray]( + target=CharacteristicName.CDF, + sources=[CharacteristicName.PMF], + func=cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _cdf_tail), + ) - tail = 0.0 - cur = k - while cur <= max_k: - tail += float(pmf_func(float(cur), **kwargs)) - cur += support.modulus + # --- Two-sided infinite lattice: not supported -------------------------- + if ( + isinstance(support, IntegerLatticeDiscreteSupport) + and not support.is_left_bounded + and not support.is_right_bounded + ): + raise RuntimeError( + "pmf->cdf for a two-sided infinite integer lattice is not supported " + "by the generic fitter. Provide an analytical CDF or a custom fitter." 
+ ) - return float(np.clip(1.0 - tail, 0.0, 1.0)) + # --- General case: finite support --------------------------------------- + xs = _collect_support(support) + if xs.size == 0: + raise RuntimeError("Discrete support is empty.") - _cdf_func = cast(Callable[[float, KwArg(Any)], float], _cdf) + pmf_vals = np.clip(np.asarray(pmf_func(xs, **kwargs), dtype=float), 0.0, None) + cdf_vals = np.clip(np.maximum.accumulate(np.cumsum(pmf_vals)), 0.0, 1.0) - return FittedComputationMethod[float, float]( - target=CharacteristicName.CDF, sources=[CharacteristicName.PMF], func=_cdf_func - ) + def _cdf(x: NumericArray, **options: Any) -> NumericArray: + x_array = np.atleast_1d(np.asarray(x, dtype=float)) - def _cdf_prefix(x: float, **kwargs: Any) -> float: - s = 0.0 - for k in support.iter_leq(x): - s += float(pmf_func(float(k), **kwargs)) - return float(np.clip(s, 0.0, 1.0)) + idx = np.searchsorted(xs, x_array, side="right") - 1 + result = np.where(idx < 0, 0.0, cdf_vals[np.clip(idx, 0, cdf_vals.size - 1)]) - _cdf_func = cast(Callable[[float, KwArg(Any)], float], _cdf_prefix) + return cast(NumericArray, result[0] if result.shape == (1,) else result) - return FittedComputationMethod[float, float]( - target=CharacteristicName.CDF, sources=[CharacteristicName.PMF], func=_cdf_func + return FittedComputationMethod[NumericArray, NumericArray]( + target=CharacteristicName.CDF, + sources=[CharacteristicName.PMF], + func=cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _cdf), ) def fit_cdf_to_pmf_1D( - distribution: Distribution, /, **_: Any -) -> FittedComputationMethod[float, float]: + distribution: Distribution, /, **kwargs: Any, +) -> FittedComputationMethod[NumericArray, NumericArray]: """ - Extract Characteristic.PMF from Characteristic.CDF on a discrete support as jump sizes. 
- Parameters ---------- distribution : Distribution - Distribution exposing a discrete support on ``.support`` and a scalar + Must expose a discrete support on ``.support`` and an array-semantic ``cdf`` via the computation strategy. + **kwargs : Any + Forwarded to ``cdf`` at fit-time. Returns ------- - FittedComputationMethod[float, float] - Fitted ``cdf -> pmf`` conversion. + FittedComputationMethod[NumericArray, NumericArray] + Fitted ``cdf → pmf`` conversion with full array semantics. Raises ------ RuntimeError - If the distribution does not expose a discrete support. + If the distribution does not expose a discrete support or if the + support is empty. Notes ----- - ``pmf(x) = cdf(x) - cdf(prev(x))``, where ``prev(x)`` is the predecessor on - the support (with ``cdf(prev) := 0`` if no predecessor exists). + Points that do not belong to the discrete support always return ``0.0``, + consistent with PMF semantics. """ support = distribution.support if support is None or not isinstance(support, DiscreteSupport): raise RuntimeError("Discrete support is required for cdf->pmf.") - cdf_func = _resolve(distribution, CharacteristicName.CDF) - def _pmf(x: float, **kwargs: Any) -> float: - p = support.prev(x) - left = 0.0 if p is None else float(cdf_func(float(p), **kwargs)) - right = float(cdf_func(x)) - mass = max(right - left, 0.0) - return float(np.clip(mass, 0.0, 1.0)) + cdf_func = _resolve(distribution, CharacteristicName.CDF) - _pmf_func = cast(Callable[[float, KwArg(Any)], float], _pmf) + xs = _collect_support(support) + if xs.size == 0: + raise RuntimeError("Discrete support is empty.") - return FittedComputationMethod[float, float]( - target=CharacteristicName.PMF, sources=[CharacteristicName.CDF], func=_pmf_func - ) + cdf_vals = np.asarray(cdf_func(xs, **kwargs), dtype=float) + cdf_vals = np.clip(np.maximum.accumulate(cdf_vals), 0.0, 1.0) + pmf_vals = np.empty_like(cdf_vals) + pmf_vals[0] = cdf_vals[0] + pmf_vals[1:] = np.diff(cdf_vals) + pmf_vals = 
def fit_cdf_to_ppf_1D(
    distribution: Distribution, /, **kwargs: Any,
) -> FittedComputationMethod[NumericArray, NumericArray]:
    """
    Fit a discrete step-quantile ``ppf`` from a resolvable ``cdf``.

    For ``q`` in the open interval (0, 1), the fitted ``ppf`` returns the
    leftmost support point ``x`` such that ``cdf(x) >= q``.

    Parameters
    ----------
    distribution : Distribution
        Must expose a discrete support on ``.support`` and an array-semantic
        ``cdf`` via the computation strategy.
    **kwargs : Any
        Forwarded to ``cdf`` at fit-time.

    Returns
    -------
    FittedComputationMethod[NumericArray, NumericArray]
        Fitted ``cdf → ppf`` conversion with full array semantics.

    Raises
    ------
    RuntimeError
        If the distribution does not expose a discrete support or if the
        support is empty.
    """
    support = distribution.support
    if support is None or not isinstance(support, DiscreteSupport):
        raise RuntimeError("Discrete support is required for cdf->ppf.")

    cdf_func = _resolve(distribution, CharacteristicName.CDF)

    xs = _collect_support(support)
    if xs.size == 0:
        raise RuntimeError("Discrete support is empty.")

    # Evaluate the CDF once over the whole support at fit-time; enforce
    # monotonicity and the [0, 1] range to guard against FP noise in user CDFs.
    cdf_vals = np.asarray(cdf_func(xs, **kwargs), dtype=float)
    cdf_vals = np.clip(np.maximum.accumulate(cdf_vals), 0.0, 1.0)

    x_first = float(xs[0])
    x_last = float(xs[-1])

    def _ppf(q: NumericArray, **options: Any) -> NumericArray:
        # Vectorised step-quantile lookup: non-finite q propagates as NaN,
        # q <= 0 / q >= 1 clamp to the support endpoints, interior q maps to
        # the leftmost support point whose CDF value is >= q.
        q_array = np.atleast_1d(np.asarray(q, dtype=float))

        result = np.empty_like(q_array)

        nan_mask = ~np.isfinite(q_array)
        low_mask = (~nan_mask) & (q_array <= 0.0)
        high_mask = (~nan_mask) & (q_array >= 1.0)
        interior = ~nan_mask & ~low_mask & ~high_mask

        result[nan_mask] = np.nan
        result[low_mask] = x_first
        result[high_mask] = x_last

        if np.any(interior):
            q_in = q_array[interior]
            # side="left" yields the first index with cdf_vals[idx] >= q.
            idx = np.searchsorted(cdf_vals, q_in, side="left")
            idx = np.clip(idx, 0, xs.size - 1)
            result[interior] = xs[idx]

        # Length-1 results collapse to a scalar — this mirrors the module's
        # scalar-in/scalar-out convention for array-semantic characteristics.
        return cast(NumericArray, result[0] if result.shape == (1,) else result)

    ppf_cast = cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _ppf)
    return FittedComputationMethod[NumericArray, NumericArray](
        target=CharacteristicName.PPF,
        sources=[CharacteristicName.CDF],
        func=ppf_cast,
    )
+ """ - support = distribution.support - if support is None or not isinstance(support, DiscreteSupport): - raise RuntimeError("Discrete support is required for cdf->ppf.") + n_q_grid = int(kwargs.get("n_q_grid", 4096)) - cdf_func = _resolve(distribution, CharacteristicName.CDF) + ppf_func = _resolve(distribution, CharacteristicName.PPF) - xs = _collect_support_values(support) # sorted float array - if xs.size == 0: - raise RuntimeError("Discrete support is empty.") + eps = 1.0 / (n_q_grid + 1) + q_grid = np.linspace(eps, 1.0 - eps, n_q_grid) - # Pre-compute Characteristic.CDF on support and enforce monotonicity (safety against FP noise) - cdf_vals = np.asarray([float(cdf_func(float(x))) for x in xs], dtype=float) - cdf_vals = np.clip(np.maximum.accumulate(cdf_vals), 0.0, 1.0) + x_grid = np.asarray(ppf_func(q_grid, **kwargs), dtype=float) - def _ppf(q: float, **kwargs: Any) -> float: - if not isfinite(q): - return float("nan") - q = float(q) - if q <= 0.0: - return float(xs[0]) - if q >= 1.0: - return float(xs[-1]) - idx = int(np.searchsorted(cdf_vals, q, side="left")) - if idx >= xs.size: - idx = xs.size - 1 - return float(xs[idx]) - - _ppf_func = cast(Callable[[float, KwArg(Any)], float], _ppf) - - return FittedComputationMethod[float, float]( - target=CharacteristicName.PPF, sources=[CharacteristicName.CDF], func=_ppf_func - ) + change = np.concatenate(([True], x_grid[1:] != x_grid[:-1])) + xs_table = x_grid[change] + cdf_table = q_grid[change] + next_change_idx = np.where(change)[0] + right_idx = np.concatenate((next_change_idx[1:] - 1, [n_q_grid - 1])) + cdf_table = q_grid[right_idx] + cdf_table = np.clip(np.maximum.accumulate(cdf_table), 0.0, 1.0) -def fit_ppf_to_cdf_1D( - distribution: Distribution, /, **options: Any -) -> FittedComputationMethod[float, float]: - """ - Fit **discrete** Characteristic.CDF using only a resolvable Characteristic.PPF - via bisection on ``q``. 
+ x_min = float(xs_table[0]) + x_max = float(xs_table[-1]) - Semantics - --------- - ``Characteristic.CDF(x) = sup { q ∈ [0,1] : Characteristic.PPF(q) ≤ x }`` + def _cdf(x: NumericArray, **options: Any) -> NumericArray: + x_array = np.atleast_1d(np.asarray(x, dtype=float)) - We implement this as a monotone predicate on ``q``: - ``f(q) := (Characteristic.PPF(q) ≤ x)``, and find the largest ``q`` with ``f(q) = True``. + result = np.empty_like(x_array) - Parameters - ---------- - distribution : Distribution - **options : Any - Optional tuning: - - q_tol : float, default 1e-12 - - max_iter : int, default 100 + left_mask = x_array < x_min + right_mask = x_array >= x_max + interior = ~left_mask & ~right_mask - Returns - ------- - FittedComputationMethod[float, float] - Fitted ``ppf -> cdf`` conversion for discrete 1D distributions. - """ - ppf_func = _resolve(distribution, CharacteristicName.PPF) - q_tol: float = float(options.get("q_tol", 1e-12)) - max_iter: int = int(options.get("max_iter", 100)) + result[left_mask] = 0.0 + result[right_mask] = 1.0 - # Quick edge probes (robust to weird Characteristic.PPF endpoints) - try: - p0 = float(ppf_func(0.0)) - except Exception: - p0 = float("-inf") - try: - p1 = float(ppf_func(1.0 - 1e-15)) - except Exception: - p1 = float("inf") - - def _cdf(x: float, **kwargs: Any) -> float: - if not isfinite(x): - return float("nan") - # Hard clamps from endpoint probes - if x < p0: - return 0.0 - if x >= p1: - return 1.0 - - lo, hi = 0.0, 1.0 - it = 0 - while hi - lo > q_tol and it < max_iter: - it += 1 - mid = 0.5 * (lo + hi) - try: - y = float(ppf_func(mid, **kwargs)) - except Exception: - # If Characteristic.PPF fails at mid, shrink conservatively towards lo - hi = mid - continue - if y <= x: - lo = mid # still True region - else: - hi = mid # crossed threshold - return float(np.clip(lo, 0.0, 1.0)) - - _cdf_func = cast(Callable[[float, KwArg(Any)], float], _cdf) - - return FittedComputationMethod[float, float]( - 
target=CharacteristicName.CDF, sources=[CharacteristicName.PPF], func=_cdf_func + if np.any(interior): + xi = x_array[interior] + idx = np.searchsorted(xs_table, xi, side="right") - 1 + idx = np.clip(idx, 0, cdf_table.size - 1) + result[interior] = cdf_table[idx] + + return cast(NumericArray, result[0] if result.shape == (1,) else result) + + cdf_cast = cast(Callable[[NumericArray, KwArg(Any)], NumericArray], _cdf) + return FittedComputationMethod[NumericArray, NumericArray]( + target=CharacteristicName.CDF, + sources=[CharacteristicName.PPF], + func=cdf_cast, )