diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65dd13b..2ca77cb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ All notable changes to this project are documented here.
 
 ### Added
 - `BootstrapDistribution.summarize` has new parameter `precision: Optional[Union[int, Literal["auto"]]] = None` to control rounding behaviour.
+- `bootstrap` now supports parallelism via ThreadPoolExecutor and has new parameter `n_jobs: Optional[int] = None` to control this behaviour.
 
 ### Changed
 - `BootstrapSummary.round` parameter `precision` now defaults to `Literal["auto"]`.
diff --git a/README.md b/README.md
index 93c930a..8c8efa9 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,7 @@ Performs Bayesian bootstrapping on `data` using the given statistic.
 - `n_boot`: number of bootstrap samples
 - `seed`: random seed (optional)
 - `blocksize`: number of resamples processed per block
+- `n_jobs`: number of worker threads for parallel computation
 - `fn_kwargs`: optional dict of extra parameters for `statistic_fn`
 
 **Returns**
diff --git a/bbstat/bootstrap.py b/bbstat/bootstrap.py
index ab9d524..fd1c6be 100644
--- a/bbstat/bootstrap.py
+++ b/bbstat/bootstrap.py
@@ -30,23 +30,49 @@
 See the function-level docstring of `bootstrap` for full details.
 """
 
-from typing import Any, Callable, Dict, Optional, Union
+import os
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Any, Callable, Dict, Optional, Union, cast
 
 import numpy as np
 
 from .evaluate import BootstrapDistribution
 from .registry import get_statistic_fn
 from .resample import resample
+from .statistics import FArray
 
 __all__ = ["bootstrap"]
 
 
+def _bootstrap_worker(
+    data: Any,
+    statistic_fn: Callable,
+    n_data: int,
+    n_boot: int,
+    seed: Optional[int],
+    blocksize: Optional[int],
+    fn_kwargs: Optional[Dict[str, Any]],
+) -> FArray:
+    return np.array(
+        [
+            statistic_fn(data=data, weights=weights, **(fn_kwargs or {}))
+            for weights in resample(
+                n_boot=n_boot,
+                n_data=n_data,
+                seed=seed,
+                blocksize=blocksize,
+            )
+        ]
+    )
+
+
 def bootstrap(
     data: Any,
     statistic_fn: Union[str, Callable],
     n_boot: int = 1000,
     seed: Optional[int] = None,
     blocksize: Optional[int] = None,
+    n_jobs: Optional[int] = None,
     fn_kwargs: Optional[Dict[str, Any]] = None,
 ) -> BootstrapDistribution:
     """
@@ -67,6 +93,10 @@ def bootstrap(
         blocksize (int, optional): The block size for resampling.
             If provided, resampling weights are generated in blocks of this size.
            Defaults to `None`, meaning all resampling weights are generated at once.
+        n_jobs (int, optional): Number of worker threads for parallel computation.
+            If `None` (default), `0`, or `1`, the computation runs serially.
+            A positive integer requests that many threads.
+            A negative integer (e.g., `-1`) uses all available CPU cores.
         fn_kwargs (Dict[str, Any], optional): Additional keyword arguments to be passed
             to the `statistic_fn` for each resample. Default is `None`.
 
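As a quick reference for the `n_jobs` semantics documented in the docstring above, here is a small standalone sketch, not part of the patch, of the normalization that the implementation below applies; `normalize_n_jobs` is a hypothetical name used only for illustration.

``` python
# Hypothetical helper mirroring how the patch maps n_jobs values to a worker
# count: None/0/1 -> serial, positive -> that many threads,
# negative -> os.cpu_count() (falling back to 1 if it returns None).
import os
from typing import Optional


def normalize_n_jobs(n_jobs: Optional[int]) -> int:
    if n_jobs is not None and n_jobs <= -1:
        n_jobs = os.cpu_count()  # may return None on some platforms
    return n_jobs or 1  # 0 and None both fall back to serial execution


assert normalize_n_jobs(None) == 1
assert normalize_n_jobs(0) == 1
assert normalize_n_jobs(4) == 4
assert normalize_n_jobs(-1) == (os.cpu_count() or 1)
```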
@@ -109,15 +139,36 @@ def bootstrap(
 
     if isinstance(statistic_fn, str):
         statistic_fn = get_statistic_fn(statistic_fn)
-    estimates = np.array(
-        [
-            statistic_fn(data=data, weights=weights, **(fn_kwargs or {}))
-            for weights in resample(
-                n_boot=n_boot,
-                n_data=n_data,
-                seed=seed,
-                blocksize=blocksize,
-            )
-        ]
-    )
+    if n_jobs is not None and n_jobs <= -1:
+        n_jobs = os.cpu_count()  # may return None
+    n_jobs = cast(int, n_jobs or 1)  # map cases 0 and None to serial behavior
+    if n_jobs == 1:
+        estimates = _bootstrap_worker(
+            data=data,
+            statistic_fn=statistic_fn,
+            n_data=n_data,
+            n_boot=n_boot,
+            seed=seed,
+            blocksize=blocksize,
+            fn_kwargs=fn_kwargs,
+        )
+    else:
+        chunk_size = int(np.ceil(n_boot / n_jobs))
+        seeds = np.random.SeedSequence(seed).spawn(n_jobs)
+        with ThreadPoolExecutor(max_workers=n_jobs) as ex:
+            futures = [
+                ex.submit(
+                    _bootstrap_worker,
+                    data,
+                    statistic_fn,
+                    n_data,
+                    min(chunk_size, n_boot - i * chunk_size),
+                    int(child_seed.generate_state(1)[0]),
+                    blocksize,
+                    fn_kwargs,
+                )
+                for i, child_seed in enumerate(seeds)
+                if i * chunk_size < n_boot
+            ]
+            estimates = np.concatenate([f.result() for f in as_completed(futures)])
     return BootstrapDistribution(estimates=estimates)
diff --git a/docs/getting_started.md b/docs/getting_started.md
index bf3c385..58e03b5 100644
--- a/docs/getting_started.md
+++ b/docs/getting_started.md
@@ -148,6 +148,38 @@ print(summary)
 
 The same pattern applies if your statistic takes multiple arrays (e.g., (x, y)).
 The function receives the data and weights, computes its result, and returns a single float.
+
+## Parallel Usage
+
+The `bootstrap` function can run in parallel to speed up computation, especially for very large datasets or custom statistics that are expensive to compute. By default, the function runs serially (`n_jobs=None`), which is often fastest for small to medium-sized datasets because parallelisation has overhead.
+
+### Controlling Parallelism
+
+You can control the number of worker threads using the `n_jobs` parameter:
+
+``` python
+# Serial execution (default)
+dist = bootstrap(data, statistic_fn="mean", n_boot=1000)
+
+# Use 4 threads
+dist = bootstrap(data, statistic_fn="mean", n_boot=1000, n_jobs=4)
+
+# Use all available CPU cores
+dist = bootstrap(data, statistic_fn="mean", n_boot=1000, n_jobs=-1)
+```
+
+### Notes
+
+- If `n_jobs` is `None`, `0`, or `1`, the computation runs serially
+- If `n_jobs` is a positive integer, the computation uses that many threads
+- If `n_jobs` is a negative integer, the computation uses all available CPU cores
+
+### When Parallelisation Helps
+
+Parallel execution can greatly reduce runtime for large numbers of bootstrap samples or heavy custom statistics, especially when the statistic spends most of its time in NumPy operations or other code that releases the GIL; pure-Python statistics benefit less because the worker threads share the interpreter lock. For very small tasks, serial execution is often faster due to the overhead of starting threads.
+
+Use `n_jobs=-1` with caution in CI or shared environments: it will attempt to use all CPU cores, which may affect other tasks.
+
 ## Common questions and pitfalls
 
 - **Why are the credible intervals sometimes narrow?** Bayesian bootstrapping assumes that the observed data already represent the full population support. Uncertainty is only about how much weight each observation should get, not about unseen data. If the sample is small or has heavy tails, results can appear overconfident.
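To make the "When Parallelisation Helps" guidance concrete, here is a rough benchmark sketch, not part of the patch. It assumes `bootstrap` is importable from `bbstat.bootstrap` as in the module above; `heavy_weighted_mean` and the synthetic data are illustrative only, and the actual speed-up depends on how much of the statistic's time is spent in code that releases the GIL.

``` python
# Rough benchmark sketch: serial vs. threaded bootstrap with a NumPy-heavy
# statistic. The import path, heavy_weighted_mean, and the synthetic data
# are assumptions made for illustration.
import time

import numpy as np

from bbstat.bootstrap import bootstrap


def heavy_weighted_mean(data: np.ndarray, weights: np.ndarray) -> float:
    # Large NumPy reductions spend most of their time outside the GIL,
    # so worker threads can overlap this work.
    return float(np.average(data, weights=weights))


data = np.random.default_rng(0).normal(size=1_000_000)

t0 = time.perf_counter()
bootstrap(data=data, statistic_fn=heavy_weighted_mean, n_boot=200, seed=1)
t_serial = time.perf_counter() - t0

t0 = time.perf_counter()
bootstrap(data=data, statistic_fn=heavy_weighted_mean, n_boot=200, seed=1, n_jobs=-1)
t_threads = time.perf_counter() - t0

print(f"serial: {t_serial:.2f}s  n_jobs=-1: {t_threads:.2f}s")
```

On small inputs the serial run will typically win, which matches the note in the docs about parallelisation overhead.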
diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py
index b9fae75..39dada6 100644
--- a/tests/test_bootstrap.py
+++ b/tests/test_bootstrap.py
@@ -76,6 +76,69 @@ def test_bootstrap_random(data_random: FArray) -> None:
     np.testing.assert_allclose(summary.ci_high, 0.05, atol=0.07)
 
 
+@pytest.mark.parametrize(
+    "n_jobs",
+    [
+        pytest.param(2),
+        pytest.param(-1),
+    ],
+)
+def test_bootstrap_random_serial_matches_parallel(
+    data_random: FArray, n_jobs: int
+) -> None:
+    distribution_serial = bootstrap(
+        data=data_random,
+        statistic_fn=compute_weighted_aggregate,
+        n_boot=20,
+        seed=1,
+    )
+    distribution_n_jobs = bootstrap(
+        data=data_random,
+        statistic_fn=compute_weighted_aggregate,
+        n_boot=20,
+        seed=1,
+        n_jobs=n_jobs,
+    )
+    assert len(distribution_serial) == len(distribution_n_jobs)
+    summary_serial = distribution_serial.summarize(level=0.95, precision="auto")
+    summary_n_jobs = distribution_n_jobs.summarize(level=0.95, precision="auto")
+    np.testing.assert_allclose(summary_n_jobs.mean, summary_serial.mean, atol=0.02)
+    np.testing.assert_allclose(summary_n_jobs.ci_low, summary_serial.ci_low, atol=0.02)
+    np.testing.assert_allclose(
+        summary_n_jobs.ci_high, summary_serial.ci_high, atol=0.02
+    )
+
+
+@pytest.mark.parametrize(
+    "n_jobs",
+    [
+        pytest.param(2),
+        pytest.param(-1),
+    ],
+)
+def test_bootstrap_random_parallel_reproducible(
+    data_random: FArray, n_jobs: int
+) -> None:
+    distribution0 = bootstrap(
+        data=data_random,
+        statistic_fn=compute_weighted_aggregate,
+        n_boot=20,
+        seed=1,
+        n_jobs=n_jobs,
+    )
+    distribution1 = bootstrap(
+        data=data_random,
+        statistic_fn=compute_weighted_aggregate,
+        n_boot=20,
+        seed=1,
+        n_jobs=n_jobs,
+    )
+    np.testing.assert_allclose(
+        np.sort(distribution0.estimates),
+        np.sort(distribution1.estimates),
+    )
+
+
 @pytest.mark.parametrize(
     "name, fn_kwargs",
     [