From b97f8e10610fea3d0461a8d40fe7209b7eae7e02 Mon Sep 17 00:00:00 2001 From: Armaan Agrawal Date: Sat, 21 Mar 2026 03:22:37 -0400 Subject: [PATCH 1/6] Add streaming brute-force model selector --- README.md | 18 ++ src/agentopt/__init__.py | 2 + src/agentopt/model_selection/__init__.py | 2 + .../model_selection/streaming_brute_force.py | 195 ++++++++++++++++++ 4 files changed, 217 insertions(+) create mode 100644 src/agentopt/model_selection/streaming_brute_force.py diff --git a/README.md b/README.md index cd93ff7..e9f0231 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,7 @@ AgentOpt provides advanced selection algorithms — you don't always need to eva | `ArmEliminationModelSelector` | Early pruning | Eliminates statistically dominated combinations | | `EpsilonLUCBModelSelector` | Best-arm identification | Stops when LUCB confidence gap is within user `epsilon` | | `ThresholdBanditSEModelSelector` | Thresholding objectives | Classifies combinations above/below user `threshold` | +| `StreamingBruteForceModelSelector` | Incoming data streams | Updates brute-force metrics incrementally as new batches arrive | | `LMProposalModelSelector` | LLM-guided search | Uses a proposer LLM to shortlist promising combinations | | `BayesianOptimizationModelSelector` | Expensive evaluations | GP-based optimization (requires `torch`, `botorch`) | @@ -208,6 +209,23 @@ selector = BruteForceModelSelector( ) ``` +### Streaming incoming data + +```python +from agentopt import StreamingBruteForceModelSelector + +selector = StreamingBruteForceModelSelector( + agent_fn=agent_maker, + models=models, + eval_fn=eval_fn, + dataset=warm_start_dataset, # required seed dataset +) + +for batch in stream_of_labeled_batches: + selector.update(batch) # batch: Sequence[(input_data, expected_answer)] + print("Current best:", selector.best_combo()) +``` + ## Development ```bash diff --git a/src/agentopt/__init__.py b/src/agentopt/__init__.py index fcf4ca1..73d7e3c 100644 --- 
a/src/agentopt/__init__.py +++ b/src/agentopt/__init__.py @@ -20,6 +20,7 @@ ModelResult, RandomSearchModelSelector, SelectionResults, + StreamingBruteForceModelSelector, ThresholdBanditSEModelSelector, ) @@ -45,6 +46,7 @@ "ArmEliminationModelSelector", "EpsilonLUCBModelSelector", "ThresholdBanditSEModelSelector", + "StreamingBruteForceModelSelector", "LMProposalModelSelector", "BayesianOptimizationModelSelector", # Result types diff --git a/src/agentopt/model_selection/__init__.py b/src/agentopt/model_selection/__init__.py index 4ed97dd..5976c8c 100644 --- a/src/agentopt/model_selection/__init__.py +++ b/src/agentopt/model_selection/__init__.py @@ -8,6 +8,7 @@ from .hill_climbing import HillClimbingModelSelector from .lm_proposal import LMProposalModelSelector from .random_search import RandomSearchModelSelector +from .streaming_brute_force import StreamingBruteForceModelSelector from .threshold_successive_elimination import ThresholdBanditSEModelSelector __all__ = [ @@ -19,6 +20,7 @@ "EpsilonLUCBModelSelector", "ThresholdBanditSEModelSelector", "LMProposalModelSelector", + "StreamingBruteForceModelSelector", "BayesianOptimizationModelSelector", "DatapointResult", "ModelResult", diff --git a/src/agentopt/model_selection/streaming_brute_force.py b/src/agentopt/model_selection/streaming_brute_force.py new file mode 100644 index 0000000..492c2d7 --- /dev/null +++ b/src/agentopt/model_selection/streaming_brute_force.py @@ -0,0 +1,195 @@ +""" +Streaming brute-force model selection. + +Evaluates every combination on incoming data batches and keeps cumulative +metrics updated over time. +""" + +import asyncio +from typing import Any, Callable, Dict, List, Optional, Tuple + +from ..base_models import Dataset, EvalFn, ModelCandidate, validate_dataset +from .base import BaseModelSelector, ModelResult, SelectionResults + + +class StreamingBruteForceModelSelector(BaseModelSelector): + """ + Brute-force selector that supports streaming updates. 
+ + Usage: + - call ``update(batch)`` as new labeled data arrives + - call ``results()`` / ``best_combo()`` at any time + - optional: still supports ``select_best()`` over the initial dataset + """ + + def __init__( + self, + agent_fn: Callable[[Dict[str, ModelCandidate]], Any], + models: Dict[str, List[ModelCandidate]], + eval_fn: EvalFn, + dataset: Dataset, + invoke_fn: Optional[Callable] = None, + model_prices: Optional[Dict[str, Dict[str, float]]] = None, + tracker=None, + ) -> None: + super().__init__( + agent_fn=agent_fn, + models=models, + eval_fn=eval_fn, + dataset=dataset, + invoke_fn=invoke_fn, + model_prices=model_prices, + tracker=tracker, + ) + self._all_combos: List[Dict[str, ModelCandidate]] = self._all_combos() + self._combo_scores: Dict[int, List[float]] = { + i: [] for i in range(len(self._all_combos)) + } + self._combo_latencies: Dict[int, List[float]] = { + i: [] for i in range(len(self._all_combos)) + } + self._combo_dp_ids: Dict[int, List[str]] = { + i: [] for i in range(len(self._all_combos)) + } + self._seen_samples = 0 + self._seed_consumed = False + + def _run_selection( + self, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + # Keep select_best behavior: evaluate the provided dataset once. 
+ if not self._seed_consumed: + result = self.update(self.dataset, parallel=parallel, max_concurrent=max_concurrent) + self._seed_consumed = True + return result + return self.results() + + def update( + self, batch: Dataset, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + """Evaluate all combinations on a new incoming batch.""" + validate_dataset(batch) + + if parallel: + asyncio.run(self._update_async(batch, max_concurrent=max_concurrent)) + else: + self._update_sequential(batch) + self._seen_samples += len(batch) + return self.results() + + def update_one( + self, + input_data: Any, + expected_answer: Any, + parallel: bool = False, + max_concurrent: int = 20, + ) -> SelectionResults: + """Convenience helper for a single incoming datapoint.""" + return self.update( + [(input_data, expected_answer)], + parallel=parallel, + max_concurrent=max_concurrent, + ) + + def results(self) -> SelectionResults: + """Return cumulative results over all streamed batches so far.""" + all_results: List[ModelResult] = [] + for idx, combo in enumerate(self._all_combos): + combo_name = self._combo_name(combo) + scores = self._combo_scores[idx] + latencies = self._combo_latencies[idx] + dp_ids = self._combo_dp_ids[idx] + + if scores: + accuracy = sum(scores) / len(scores) + avg_latency = sum(latencies) / len(latencies) + else: + accuracy = 0.0 + avg_latency = 0.0 + + input_tokens, output_tokens = self._fetch_tokens(combo_name) + dp_results = ( + self._build_datapoint_results(scores, latencies, dp_ids) if dp_ids else [] + ) + + all_results.append( + self._make_result( + model_name=combo_name, + accuracy=accuracy, + latency_seconds=avg_latency, + input_tokens=input_tokens, + output_tokens=output_tokens, + attribute="combination", + is_best=False, + datapoint_results=dp_results, + ) + ) + + best_info = self._find_best(all_results) + if best_info is not None: + best_name, _ = best_info + for result in all_results: + if result.model_name == best_name: + 
result.is_best = True + break + + return SelectionResults(results=all_results) + + def best_combo(self) -> Optional[Dict[str, str]]: + """Return current best combination as node->model dict.""" + return self.results().get_best_combo() + + def _update_sequential(self, batch: Dataset) -> None: + dp_offset = self._seen_samples + total = len(self._all_combos) + print(f"\nUpdating stream (sequential): {total} combinations, batch={len(batch)}") + + for idx, combo in enumerate(self._all_combos, 1): + combo_name = self._combo_name(combo) + print(f" [{idx}/{total}] {combo_name}") + scores, latencies, dp_ids = self._evaluate_combo( + combo, batch, label=combo_name, dp_offset=dp_offset + ) + self._combo_scores[idx - 1].extend(scores) + self._combo_latencies[idx - 1].extend(latencies) + self._combo_dp_ids[idx - 1].extend(dp_ids) + + async def _update_async(self, batch: Dataset, max_concurrent: int) -> None: + dp_offset = self._seen_samples + batch_size = len(batch) + n_combo, dp_concurrent = self._compute_concurrency(max_concurrent, batch_size) + combo_sem = asyncio.Semaphore(n_combo) + total = len(self._all_combos) + print( + f"\nUpdating stream (async): {total} combinations, batch={batch_size}, " + f"max {max_concurrent} total concurrent" + ) + + async def _eval_combo( + idx: int, + combo: Dict[str, ModelCandidate], + ) -> Tuple[int, List[float], List[float], List[str]]: + async with combo_sem: + combo_name = self._combo_name(combo) + scores, latencies, dp_ids = await self._evaluate_combo_async( + combo, + batch, + label=combo_name, + max_concurrent=dp_concurrent, + dp_offset=dp_offset, + ) + return idx, scores, latencies, dp_ids + + results = await asyncio.gather( + *[_eval_combo(idx, combo) for idx, combo in enumerate(self._all_combos)], + return_exceptions=True, + ) + for i, res in enumerate(results): + if isinstance(res, Exception): + combo_name = self._combo_name(self._all_combos[i]) + print(f" [{combo_name}] failed: {res}") + continue + idx, scores, latencies, dp_ids = 
res + self._combo_scores[idx].extend(scores) + self._combo_latencies[idx].extend(latencies) + self._combo_dp_ids[idx].extend(dp_ids) From 7df825500c0b64d1cd7697b7f61b1832f27562ba Mon Sep 17 00:00:00 2001 From: Armaan Agrawal Date: Sat, 21 Mar 2026 20:17:31 -0400 Subject: [PATCH 2/6] Add convergence stop policy for streaming selection --- README.md | 89 +++++++++++++++++++ .../model_selection/streaming_brute_force.py | 86 +++++++++++++++++- 2 files changed, 174 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e9f0231..a61c67f 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,95 @@ All selectors share the same interface: results = selector.select_best(parallel=True, max_concurrent=20) ``` +## Streaming Model Selection (Detailed) + +`StreamingBruteForceModelSelector` is the online version of brute force for incoming labeled data. + +### When to use it + +Use it when: +- new `(input, expected)` samples arrive continuously (or in periodic batches), +- you want to keep re-ranking model combinations over time, +- you want the current best combo at any point without re-running full offline experiments. + +### What it optimizes + +The current best is chosen with the same tie-break logic as other selectors: +1. highest accuracy, +2. then lowest latency, +3. then lowest cost (if pricing is available). 
+ +### Data contract + +Each batch must be: + +```python +Sequence[Tuple[input_data, expected_answer]] +``` + +Example: + +```python +batch = [ + ({"question": "What is 3+5?"}, "8"), + ({"question": "Capital of France?"}, "Paris"), +] +``` + +### API + +```python +from agentopt import StreamingBruteForceModelSelector + +selector = StreamingBruteForceModelSelector( + agent_fn=agent_maker, + models=models, + eval_fn=eval_fn, + dataset=warm_start_dataset, # seed batch used by select_best() +) + +# 1) Evaluate warm-start dataset once +selector.select_best(parallel=True, max_concurrent=20) + +# 2) Update incrementally as new data arrives +selector.update(batch, parallel=True, max_concurrent=20) + +# 3) Inspect current ranking / best combo +results = selector.results() +best_combo = selector.best_combo() +``` + +Methods: +- `select_best(...)`: evaluates the initial seed dataset once. +- `update(batch, ...)`: evaluates all combos on the new batch and appends metrics; once the selector has converged it skips the batch and returns the existing results (check `should_continue()`). +- `update_one(input_data, expected_answer, ...)`: single-sample convenience method. +- `results()`: cumulative leaderboard over all streamed batches seen so far. +- `best_combo()`: current best `{"node": "model"}` mapping. + +### Internal update loop + +For each incoming batch: +1. validate batch format, +2. evaluate every combo on that batch (sequential or async), +3. append new scores/latencies/datapoint ids to cumulative per-combo state, +4. recompute ranking and mark the current best combo. + +### Minimal online loop + +```python +for batch in stream_of_labeled_batches: + selector.update(batch, parallel=True, max_concurrent=20) + best = selector.best_combo() + print("best now:", best) +``` + +### Notes / caveats + +- This is still **brute force per batch**: every combo is evaluated on each update. +- It is simple and stable, but can be expensive with large combo spaces. 
+- For large spaces, start with smaller candidate sets or move to pruning selectors + (`ArmEliminationModelSelector`, `EpsilonLUCBModelSelector`, etc.) for offline filtering first. + ## Framework Compatibility AgentOpt works with any LLM framework that uses `httpx` under the hood — which is virtually all of them: diff --git a/src/agentopt/model_selection/streaming_brute_force.py b/src/agentopt/model_selection/streaming_brute_force.py index 492c2d7..27399c2 100644 --- a/src/agentopt/model_selection/streaming_brute_force.py +++ b/src/agentopt/model_selection/streaming_brute_force.py @@ -20,8 +20,16 @@ class StreamingBruteForceModelSelector(BaseModelSelector): - call ``update(batch)`` as new labeled data arrives - call ``results()`` / ``best_combo()`` at any time - optional: still supports ``select_best()`` over the initial dataset + + Convergence policy (internal defaults, not user-facing): + - best combo must stay unchanged + - best accuracy improvement stays below 2% + - for 10 consecutive batch updates """ + _CONVERGENCE_DELTA = 0.02 + _CONVERGENCE_PATIENCE_BATCHES = 10 + def __init__( self, agent_fn: Callable[[Dict[str, ModelCandidate]], Any], @@ -53,6 +61,10 @@ def __init__( } self._seen_samples = 0 self._seed_consumed = False + self._converged = False + self._stable_batches = 0 + self._best_combo_signature: Optional[Tuple[Tuple[str, str], ...]] = None + self._best_accuracy: Optional[float] = None def _run_selection( self, parallel: bool = False, max_concurrent: int = 20, @@ -68,6 +80,13 @@ def update( self, batch: Dataset, parallel: bool = False, max_concurrent: int = 20, ) -> SelectionResults: """Evaluate all combinations on a new incoming batch.""" + if self._converged: + print( + "\nStreaming selector converged; skipping new batch. " + "Current best combo is stable." 
+ ) + return self.results() + validate_dataset(batch) if parallel: @@ -75,7 +94,10 @@ def update( else: self._update_sequential(batch) self._seen_samples += len(batch) - return self.results() + + results = self.results() + self._update_convergence_state(results) + return results def update_one( self, @@ -139,6 +161,27 @@ def best_combo(self) -> Optional[Dict[str, str]]: """Return current best combination as node->model dict.""" return self.results().get_best_combo() + def has_converged(self) -> bool: + """Whether streaming updates have converged under internal policy.""" + return self._converged + + def should_continue(self) -> bool: + """Whether caller should continue feeding new batches.""" + return not self._converged + + def convergence_state(self) -> Dict[str, Any]: + """Return convergence diagnostics for logging/monitoring.""" + return { + "converged": self._converged, + "stable_batches": self._stable_batches, + "required_stable_batches": self._CONVERGENCE_PATIENCE_BATCHES, + "delta": self._CONVERGENCE_DELTA, + "best_accuracy": self._best_accuracy, + "best_combo": dict(self._best_combo_signature) + if self._best_combo_signature is not None + else None, + } + def _update_sequential(self, batch: Dataset) -> None: dp_offset = self._seen_samples total = len(self._all_combos) @@ -193,3 +236,44 @@ async def _eval_combo( self._combo_scores[idx].extend(scores) self._combo_latencies[idx].extend(latencies) self._combo_dp_ids[idx].extend(dp_ids) + + @staticmethod + def _combo_signature(combo: Optional[Dict[str, str]]) -> Optional[Tuple[Tuple[str, str], ...]]: + if combo is None: + return None + return tuple(sorted((str(k), str(v)) for k, v in combo.items())) + + def _update_convergence_state(self, results: SelectionResults) -> None: + best = results.get_best() + if best is None: + return + + combo_sig = self._combo_signature(results.get_best_combo()) + if combo_sig is None: + return + + current_acc = best.accuracy + if self._best_combo_signature is None or 
self._best_accuracy is None: + self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + self._stable_batches = 0 + return + + combo_unchanged = combo_sig == self._best_combo_signature + improvement = current_acc - self._best_accuracy + + if combo_unchanged and improvement < self._CONVERGENCE_DELTA: + self._stable_batches += 1 + else: + self._stable_batches = 0 + + self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + + if self._stable_batches >= self._CONVERGENCE_PATIENCE_BATCHES: + self._converged = True + print( + "\nStreaming selector converged: best combo stable for " + f"{self._stable_batches} batches with < " + f"{self._CONVERGENCE_DELTA:.0%} accuracy improvement." + ) From a194f861dd810b2cc7945234e33f4b6118fef480 Mon Sep 17 00:00:00 2001 From: Armaan Agrawal Date: Sat, 21 Mar 2026 20:19:51 -0400 Subject: [PATCH 3/6] Add fake-data streaming script and tests --- tests/streaming_brute_force_fake_data.py | 111 +++++++++++++++++++++++ tests/test_streaming_brute_force.py | 55 +++++++++++ 2 files changed, 166 insertions(+) create mode 100644 tests/streaming_brute_force_fake_data.py create mode 100644 tests/test_streaming_brute_force.py diff --git a/tests/streaming_brute_force_fake_data.py b/tests/streaming_brute_force_fake_data.py new file mode 100644 index 0000000..4c98d3a --- /dev/null +++ b/tests/streaming_brute_force_fake_data.py @@ -0,0 +1,111 @@ +""" +Run StreamingBruteForceModelSelector on fake streaming data. + +This script does not make external API calls; it uses a deterministic fake +agent so you can validate streaming update behavior quickly. 
+""" + +from __future__ import annotations + +import argparse +from random import Random +from typing import Any, Dict, List, Sequence, Tuple + +from agentopt import StreamingBruteForceModelSelector + + +def build_fake_stream(total_samples: int, seed: int) -> List[Tuple[Dict[str, int], int]]: + """Create a simple synthetic stream: expected output is exactly x.""" + rng = Random(seed) + stream: List[Tuple[Dict[str, int], int]] = [] + for _ in range(total_samples): + x = rng.randint(0, 100) + stream.append(({"x": x}, x)) + return stream + + +def agent_fn(combo: Dict[str, Any]): + model_name = combo["agent"] + + def _run(input_data: Dict[str, int]) -> int: + x = input_data["x"] + # Deterministic fake model behavior: + # - "good": always correct + # - "noisy": occasionally wrong + # - "bad": always off by +1 + if model_name == "good": + return x + if model_name == "noisy": + return x if (x % 5 != 0) else (x + 1) + return x + 1 + + return _run + + +def eval_fn(expected: int, actual: int) -> float: + return 1.0 if expected == actual else 0.0 + + +def batches_of( + stream: Sequence[Tuple[Dict[str, int], int]], batch_size: int +) -> List[List[Tuple[Dict[str, int], int]]]: + return [list(stream[i : i + batch_size]) for i in range(0, len(stream), batch_size)] + + +def run(total: int, warm_start: int, batch_size: int, seed: int) -> None: + if warm_start < 1: + raise ValueError("warm_start must be >= 1") + if batch_size < 1: + raise ValueError("batch_size must be >= 1") + if total <= warm_start: + raise ValueError("total must be greater than warm_start") + + stream = build_fake_stream(total, seed) + seed_dataset = stream[:warm_start] + incoming = stream[warm_start:] + + selector = StreamingBruteForceModelSelector( + agent_fn=agent_fn, + models={"agent": ["good", "noisy", "bad"]}, + eval_fn=eval_fn, + dataset=seed_dataset, + ) + + print("== Initial seed evaluation ==") + initial_results = selector.select_best() + print(initial_results) + print(f"Initial best: 
{initial_results.get_best_combo()}") + + print("\n== Streaming updates ==") + for i, batch in enumerate(batches_of(incoming, batch_size), 1): + results = selector.update(batch, parallel=True, max_concurrent=20) + best = results.get_best() + print( + f"Batch {i}: size={len(batch)} " + f"best={results.get_best_combo()} " + f"accuracy={best.accuracy:.3f} " + f"latency={best.latency_seconds:.4f}s" + ) + + print("\n== Final cumulative ranking ==") + print(selector.results()) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Test streaming model selection on fake data.") + parser.add_argument("--total", type=int, default=60, help="Total fake samples.") + parser.add_argument("--warm-start", type=int, default=12, help="Initial seed samples.") + parser.add_argument("--batch-size", type=int, default=8, help="Incoming batch size.") + parser.add_argument("--seed", type=int, default=7, help="Random seed.") + args = parser.parse_args() + + run( + total=args.total, + warm_start=args.warm_start, + batch_size=args.batch_size, + seed=args.seed, + ) + + +if __name__ == "__main__": + main() diff --git a/tests/test_streaming_brute_force.py b/tests/test_streaming_brute_force.py new file mode 100644 index 0000000..880e9cd --- /dev/null +++ b/tests/test_streaming_brute_force.py @@ -0,0 +1,55 @@ +from agentopt.model_selection.streaming_brute_force import ( + StreamingBruteForceModelSelector, +) + + +def _agent_fn(combo): + model_name = combo["agent"] + + def _run(input_data): + x = input_data["x"] + return x if model_name == "good" else x + 1 + + return _run + + +def _eval_fn(expected, actual): + return 1.0 if expected == actual else 0.0 + + +def test_streaming_update_accumulates_and_tracks_best(): + selector = StreamingBruteForceModelSelector( + agent_fn=_agent_fn, + models={"agent": ["good", "bad"]}, + eval_fn=_eval_fn, + dataset=[({"x": 0}, 0)], + ) + + selector.update([({"x": 1}, 1)]) + selector.update([({"x": 2}, 2)]) + results = selector.results() + + by_name = 
{r.model_name: r for r in results} + assert by_name["agent=good"].accuracy == 1.0 + assert by_name["agent=bad"].accuracy == 0.0 + assert by_name["agent=good"].is_best + assert len(by_name["agent=good"].datapoint_results) == 2 + assert len(by_name["agent=bad"].datapoint_results) == 2 + + +def test_select_best_consumes_seed_dataset_once(): + selector = StreamingBruteForceModelSelector( + agent_fn=_agent_fn, + models={"agent": ["good", "bad"]}, + eval_fn=_eval_fn, + dataset=[({"x": 3}, 3)], + ) + + first = selector.select_best() + second = selector.select_best() + + first_by_name = {r.model_name: r for r in first} + second_by_name = {r.model_name: r for r in second} + + assert len(first_by_name["agent=good"].datapoint_results) == 1 + assert len(second_by_name["agent=good"].datapoint_results) == 1 From 81be75da3ec754f0f200c06b228ea2bf1c19ddbe Mon Sep 17 00:00:00 2001 From: Armaan Agrawal Date: Tue, 24 Mar 2026 01:13:19 -0400 Subject: [PATCH 4/6] Add streaming random search model selector --- README.md | 1 + docs/api/selectors.md | 5 + docs/concepts/algorithms.md | 35 ++- src/agentopt/__init__.py | 4 + src/agentopt/model_selection/__init__.py | 2 + .../streaming_random_search.py | 293 ++++++++++++++++++ tests/test_streaming_random_search.py | 88 ++++++ 7 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 src/agentopt/model_selection/streaming_random_search.py create mode 100644 tests/test_streaming_random_search.py diff --git a/README.md b/README.md index 5f4bfb2..53c1531 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,7 @@ If you do not need the strict best model combination and want **more evaluation | `"auto"` (default) | General use | Automatically finds the best combination (wired to `arm_elimination` — strong best-arm identification with lower evaluation cost than `brute_force`) | | `"brute_force"` | Small search spaces | Evaluates all combinations | | `"random"` | Quick exploration | Samples a random fraction | +| `"streaming_random"` | 
Streaming / online updates | Samples once, then incrementally updates the sampled combos on new batches | | `"hill_climbing"` | Topology-aware search | Greedy search using model quality/speed rankings | | `"arm_elimination"` | Best-arm identification | Bandit; eliminates statistically dominated combinations | | `"epsilon_lucb"` | Extra cost savings when ε-optimal is enough | Bandit; stops when an epsilon-optimal best arm is identified | diff --git a/docs/api/selectors.md b/docs/api/selectors.md index 1c95f3b..0180357 100644 --- a/docs/api/selectors.md +++ b/docs/api/selectors.md @@ -41,6 +41,11 @@ Returns a [`SelectionResults`](results.md) object. members: false show_bases: false +::: agentopt.model_selection.streaming_random_search.StreamingRandomSearchModelSelector + options: + members: false + show_bases: false + ::: agentopt.model_selection.hill_climbing.HillClimbingModelSelector options: members: false diff --git a/docs/concepts/algorithms.md b/docs/concepts/algorithms.md index 0193e09..87502de 100644 --- a/docs/concepts/algorithms.md +++ b/docs/concepts/algorithms.md @@ -1,6 +1,6 @@ # Selection Algorithms -AgentOpt provides 8 selection algorithms. Choose based on your search space size and evaluation budget. +AgentOpt provides 9 selection algorithms. Choose based on your search space size and evaluation budget. ## At a Glance @@ -8,6 +8,7 @@ AgentOpt provides 8 selection algorithms. 
Choose based on your search space size |:----------|:---------|:------------|:---------| | [Brute Force](#brute-force) | Exhaustive | All | Small spaces (< 50 combos) | | [Random Search](#random-search) | Sampling | Configurable fraction | Quick baselines | +| [Streaming Random Search](#streaming-random-search) | Streaming sampling | Incremental | Online / incoming batches | | [Hill Climbing](#hill-climbing) | Greedy + restarts | Guided neighbors | Medium spaces | | [Arm Elimination](#arm-elimination) | Progressive pruning | Adaptive | Statistical early stopping | | [Epsilon LUCB](#epsilon-lucb) | ε-optimal LUCB | Adaptive | Cost savings when ε-optimal is enough | @@ -80,6 +81,38 @@ selector = RandomSearchModelSelector( --- +## Streaming Random Search + +Samples a random subset once, then incrementally updates those sampled combinations as new batches arrive. + +```python +from agentopt import StreamingRandomSearchModelSelector + +selector = StreamingRandomSearchModelSelector( + agent=MyAgent, + models=models, + eval_fn=eval_fn, + dataset=warm_start_dataset, # seed batch + sample_fraction=0.25, + seed=42, +) + +selector.select_best() # evaluate warm start once +selector.update(stream_batch_1) # keep updating online +selector.update(stream_batch_2) +best_combo = selector.best_combo() +``` + +| Parameter | Default | Description | +|:----------|:--------|:------------| +| `sample_fraction` | `0.25` | Fraction of combinations sampled once and reused | +| `seed` | `None` | Random seed for reproducible sampling | + +!!! success "When to use" + Streaming / online settings where data arrives over time and you want incremental updates instead of full reselection each round. + +--- + ## Hill Climbing Greedy local search with random restarts. Defines "neighbors" using model quality and speed rankings, so each step is an informed single-model swap. 
diff --git a/src/agentopt/__init__.py b/src/agentopt/__init__.py index c558032..e574d16 100644 --- a/src/agentopt/__init__.py +++ b/src/agentopt/__init__.py @@ -21,6 +21,7 @@ DatapointResult, ModelResult, RandomSearchModelSelector, + StreamingRandomSearchModelSelector, SelectionResults, StreamingBruteForceModelSelector, ThresholdBanditSEModelSelector, @@ -36,6 +37,7 @@ "auto": ArmEliminationModelSelector, "brute_force": BruteForceModelSelector, "random": RandomSearchModelSelector, + "streaming_random": StreamingRandomSearchModelSelector, "hill_climbing": HillClimbingModelSelector, "arm_elimination": ArmEliminationModelSelector, "epsilon_lucb": EpsilonLUCBModelSelector, @@ -65,6 +67,7 @@ def ModelSelector( ``"brute_force"``). Other options: ``"brute_force"``, ``"random"``, ``"hill_climbing"``, ``"arm_elimination"``, ``"epsilon_lucb"``, ``"threshold"``, ``"lm_proposal"``, + ``"streaming_random"``, ``"bayesian"``. **kwargs: Additional arguments passed to the selector (e.g. ``epsilon``, ``threshold``, ``sample_fraction``). 
@@ -96,6 +99,7 @@ def ModelSelector( # Selectors "BruteForceModelSelector", "RandomSearchModelSelector", + "StreamingRandomSearchModelSelector", "HillClimbingModelSelector", "ArmEliminationModelSelector", "EpsilonLUCBModelSelector", diff --git a/src/agentopt/model_selection/__init__.py b/src/agentopt/model_selection/__init__.py index eff9d43..3e4d249 100644 --- a/src/agentopt/model_selection/__init__.py +++ b/src/agentopt/model_selection/__init__.py @@ -8,6 +8,7 @@ from .lm_proposal import LMProposalModelSelector from .random_search import RandomSearchModelSelector from .streaming_brute_force import StreamingBruteForceModelSelector +from .streaming_random_search import StreamingRandomSearchModelSelector from .threshold_successive_elimination import ThresholdBanditSEModelSelector # Bayesian is optional (requires torch/botorch) @@ -26,6 +27,7 @@ "ThresholdBanditSEModelSelector", "LMProposalModelSelector", "StreamingBruteForceModelSelector", + "StreamingRandomSearchModelSelector", "BayesianOptimizationModelSelector", "DatapointResult", "ModelResult", diff --git a/src/agentopt/model_selection/streaming_random_search.py b/src/agentopt/model_selection/streaming_random_search.py new file mode 100644 index 0000000..aa082d2 --- /dev/null +++ b/src/agentopt/model_selection/streaming_random_search.py @@ -0,0 +1,293 @@ +""" +Streaming random-search model selection. + +Evaluates a fixed random subset of combinations on incoming data batches and +keeps cumulative metrics updated over time. +""" + +import asyncio +import math +import random +from typing import Any, Dict, List, Optional, Tuple + +from ..base_models import Dataset, EvalFn, ModelCandidate, validate_dataset +from .base import BaseModelSelector, ModelResult, SelectionResults + + +class StreamingRandomSearchModelSelector(BaseModelSelector): + """ + Random-search selector that supports streaming updates. + + A subset of combinations is sampled once at initialization and reused for + all incoming batches. 
+ """ + + _CONVERGENCE_DELTA = 0.02 + _CONVERGENCE_PATIENCE_BATCHES = 10 + + def __init__( + self, + agent: Any = None, + models: Dict[str, List[ModelCandidate]] = None, + eval_fn: EvalFn = None, + dataset: Dataset = None, + sample_fraction: float = 0.25, + seed: Optional[int] = None, + model_prices: Optional[Dict[str, Dict[str, float]]] = None, + tracker=None, + ) -> None: + super().__init__( + agent=agent, + models=models, + eval_fn=eval_fn, + dataset=dataset, + model_prices=model_prices, + tracker=tracker, + ) + if not 0 < sample_fraction <= 1: + raise ValueError("sample_fraction must be in the range (0, 1].") + + self.sample_fraction = sample_fraction + self.seed = seed + + self._all_combos: List[Dict[str, ModelCandidate]] = self._all_combos() + total = len(self._all_combos) + sample_size = max(1, math.ceil(total * self.sample_fraction)) + sample_size = min(sample_size, total) + + if sample_size == total: + self._sampled_indices = list(range(total)) + else: + rng = random.Random(self.seed) + self._sampled_indices = sorted(rng.sample(range(total), sample_size)) + self._sampled_combos = [self._all_combos[i] for i in self._sampled_indices] + + self._combo_scores: Dict[int, List[float]] = { + i: [] for i in range(len(self._sampled_combos)) + } + self._combo_latencies: Dict[int, List[float]] = { + i: [] for i in range(len(self._sampled_combos)) + } + self._combo_dp_ids: Dict[int, List[str]] = { + i: [] for i in range(len(self._sampled_combos)) + } + self._seen_samples = 0 + self._seed_consumed = False + + self._converged = False + self._stable_batches = 0 + self._best_combo_signature: Optional[Tuple[Tuple[str, str], ...]] = None + self._best_accuracy: Optional[float] = None + + def _run_selection( + self, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + if not self._seed_consumed: + result = self.update(self.dataset, parallel=parallel, max_concurrent=max_concurrent) + self._seed_consumed = True + return result + return self.results() + + 
def update( + self, batch: Dataset, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + """Evaluate sampled combinations on a new incoming batch.""" + if self._converged: + print( + "\nStreaming random search converged; skipping new batch. " + "Current best combo is stable." + ) + return self.results() + + validate_dataset(batch) + if parallel: + asyncio.run(self._update_async(batch, max_concurrent=max_concurrent)) + else: + self._update_sequential(batch) + self._seen_samples += len(batch) + + results = self.results() + self._update_convergence_state(results) + return results + + def update_one( + self, + input_data: Any, + expected_answer: Any, + parallel: bool = False, + max_concurrent: int = 20, + ) -> SelectionResults: + return self.update( + [(input_data, expected_answer)], + parallel=parallel, + max_concurrent=max_concurrent, + ) + + def results(self) -> SelectionResults: + all_results: List[ModelResult] = [] + for idx, combo in enumerate(self._sampled_combos): + combo_name = self._combo_name(combo) + scores = self._combo_scores[idx] + latencies = self._combo_latencies[idx] + dp_ids = self._combo_dp_ids[idx] + + if scores: + accuracy = sum(scores) / len(scores) + avg_latency = sum(latencies) / len(latencies) + else: + accuracy, avg_latency = 0.0, 0.0 + + input_tokens, output_tokens = self._fetch_tokens(combo_name) + dp_results = ( + self._build_datapoint_results(scores, latencies, dp_ids) if dp_ids else [] + ) + all_results.append( + self._make_result( + model_name=combo_name, + accuracy=accuracy, + latency_seconds=avg_latency, + input_tokens=input_tokens, + output_tokens=output_tokens, + attribute="combination", + is_best=False, + datapoint_results=dp_results, + ) + ) + + best_info = self._find_best(all_results) + if best_info is not None: + best_name, _ = best_info + for result in all_results: + if result.model_name == best_name: + result.is_best = True + break + + return SelectionResults(results=all_results) + + def 
best_combo(self) -> Optional[Dict[str, str]]: + return self.results().get_best_combo() + + def has_converged(self) -> bool: + return self._converged + + def should_continue(self) -> bool: + return not self._converged + + def convergence_state(self) -> Dict[str, Any]: + return { + "converged": self._converged, + "stable_batches": self._stable_batches, + "required_stable_batches": self._CONVERGENCE_PATIENCE_BATCHES, + "delta": self._CONVERGENCE_DELTA, + "best_accuracy": self._best_accuracy, + "best_combo": dict(self._best_combo_signature) + if self._best_combo_signature is not None + else None, + "sampled_combinations": len(self._sampled_combos), + "sample_fraction": self.sample_fraction, + } + + def _update_sequential(self, batch: Dataset) -> None: + dp_offset = self._seen_samples + total = len(self._sampled_combos) + print( + f"\nUpdating stream random-search (sequential): " + f"{total}/{len(self._all_combos)} combinations, batch={len(batch)}" + ) + + for idx, combo in enumerate(self._sampled_combos, 1): + combo_name = self._combo_name(combo) + print(f" [{idx}/{total}] {combo_name}") + scores, latencies, dp_ids = self._evaluate_combo( + combo, batch, label=combo_name, dp_offset=dp_offset + ) + self._combo_scores[idx - 1].extend(scores) + self._combo_latencies[idx - 1].extend(latencies) + self._combo_dp_ids[idx - 1].extend(dp_ids) + + async def _update_async(self, batch: Dataset, max_concurrent: int) -> None: + dp_offset = self._seen_samples + batch_size = len(batch) + n_combo, dp_concurrent = self._compute_concurrency(max_concurrent, batch_size) + combo_sem = asyncio.Semaphore(n_combo) + total = len(self._sampled_combos) + print( + f"\nUpdating stream random-search (async): " + f"{total}/{len(self._all_combos)} combinations, batch={batch_size}, " + f"max {max_concurrent} total concurrent" + ) + + async def _eval_combo( + idx: int, + combo: Dict[str, ModelCandidate], + ) -> Tuple[int, List[float], List[float], List[str]]: + async with combo_sem: + combo_name = 
self._combo_name(combo) + scores, latencies, dp_ids = await self._evaluate_combo_async( + combo, + batch, + label=combo_name, + max_concurrent=dp_concurrent, + dp_offset=dp_offset, + ) + return idx, scores, latencies, dp_ids + + results = await asyncio.gather( + *[ + _eval_combo(idx, combo) + for idx, combo in enumerate(self._sampled_combos) + ], + return_exceptions=True, + ) + for i, res in enumerate(results): + if isinstance(res, Exception): + combo_name = self._combo_name(self._sampled_combos[i]) + print(f" [{combo_name}] failed: {res}") + continue + idx, scores, latencies, dp_ids = res + self._combo_scores[idx].extend(scores) + self._combo_latencies[idx].extend(latencies) + self._combo_dp_ids[idx].extend(dp_ids) + + @staticmethod + def _combo_signature( + combo: Optional[Dict[str, str]], + ) -> Optional[Tuple[Tuple[str, str], ...]]: + if combo is None: + return None + return tuple(sorted((str(k), str(v)) for k, v in combo.items())) + + def _update_convergence_state(self, results: SelectionResults) -> None: + best = results.get_best() + if best is None: + return + + combo_sig = self._combo_signature(results.get_best_combo()) + if combo_sig is None: + return + + current_acc = best.accuracy + if self._best_combo_signature is None or self._best_accuracy is None: + self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + self._stable_batches = 0 + return + + combo_unchanged = combo_sig == self._best_combo_signature + improvement = current_acc - self._best_accuracy + + if combo_unchanged and improvement < self._CONVERGENCE_DELTA: + self._stable_batches += 1 + else: + self._stable_batches = 0 + + self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + + if self._stable_batches >= self._CONVERGENCE_PATIENCE_BATCHES: + self._converged = True + print( + "\nStreaming random search converged: best combo stable for " + f"{self._stable_batches} batches with < " + f"{self._CONVERGENCE_DELTA:.0%} accuracy improvement." 
+ ) diff --git a/tests/test_streaming_random_search.py b/tests/test_streaming_random_search.py new file mode 100644 index 0000000..1bd5e85 --- /dev/null +++ b/tests/test_streaming_random_search.py @@ -0,0 +1,88 @@ +"""Unit tests for StreamingRandomSearchModelSelector.""" + +from agentopt import ModelSelector, StreamingRandomSearchModelSelector + + +class DummyAgent: + """Simple deterministic agent for selector tests.""" + + def __init__(self, models): + self.models = models + + def run(self, _input_data): + return f"{self.models['planner']}|{self.models['solver']}" + + +def exact_match(expected, actual): + return float(expected == actual) + + +def test_streaming_random_sampling_size(): + selector = StreamingRandomSearchModelSelector( + agent=DummyAgent, + models={ + "planner": ["p1", "p2", "p3"], + "solver": ["s1", "s2"], + }, + eval_fn=exact_match, + dataset=[({"q": "seed"}, "p1|s1")], + sample_fraction=0.5, + seed=7, + ) + # 3 * 2 = 6 combos; sample_fraction=0.5 => ceil(3) = 3 sampled combos. + assert len(selector._sampled_combos) == 3 + assert len(selector._all_combos) == 6 + + +def test_streaming_random_updates_best_combo_over_time(): + selector = StreamingRandomSearchModelSelector( + agent=DummyAgent, + models={"planner": ["p1", "p2"], "solver": ["s1", "s2"]}, + eval_fn=exact_match, + dataset=[({"q": "seed"}, "p1|s1")], + sample_fraction=1.0, + seed=3, + ) + + initial = selector.select_best() + assert initial.get_best_combo() == {"planner": "p1", "solver": "s1"} + + # Stream in data that favors a different combination. 
+ selector.update( + [ + ({"q": "b1"}, "p2|s2"), + ({"q": "b2"}, "p2|s2"), + ({"q": "b3"}, "p2|s2"), + ] + ) + best_after = selector.results().get_best_combo() + assert best_after == {"planner": "p2", "solver": "s2"} + + +def test_streaming_random_converges_after_stable_batches(): + selector = StreamingRandomSearchModelSelector( + agent=DummyAgent, + models={"planner": ["p1", "p2"], "solver": ["s1", "s2"]}, + eval_fn=exact_match, + dataset=[({"q": "seed"}, "p2|s2")], + sample_fraction=1.0, + seed=11, + ) + + selector.select_best() + for _ in range(10): + selector.update_one({"q": "stream"}, "p2|s2") + + assert selector.has_converged() is True + assert selector.should_continue() is False + + +def test_model_selector_factory_supports_streaming_random(): + selector = ModelSelector( + agent=DummyAgent, + models={"planner": ["p1"], "solver": ["s1"]}, + eval_fn=exact_match, + dataset=[({"q": "seed"}, "p1|s1")], + method="streaming_random", + ) + assert isinstance(selector, StreamingRandomSearchModelSelector) From e32eafe4e328ed80dffc2621203bf980a2b2a8cd Mon Sep 17 00:00:00 2001 From: Armaan Agrawal Date: Wed, 25 Mar 2026 16:53:37 -0400 Subject: [PATCH 5/6] Update src/agentopt/model_selection/streaming_brute_force.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/agentopt/model_selection/streaming_brute_force.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/agentopt/model_selection/streaming_brute_force.py b/src/agentopt/model_selection/streaming_brute_force.py index 27399c2..0410479 100644 --- a/src/agentopt/model_selection/streaming_brute_force.py +++ b/src/agentopt/model_selection/streaming_brute_force.py @@ -32,20 +32,18 @@ class StreamingBruteForceModelSelector(BaseModelSelector): def __init__( self, - agent_fn: Callable[[Dict[str, ModelCandidate]], Any], + agent: Any, models: Dict[str, List[ModelCandidate]], eval_fn: EvalFn, dataset: Dataset, - invoke_fn: Optional[Callable] = None, model_prices: 
Optional[Dict[str, Dict[str, float]]] = None, tracker=None, ) -> None: super().__init__( - agent_fn=agent_fn, + agent=agent, models=models, eval_fn=eval_fn, dataset=dataset, - invoke_fn=invoke_fn, model_prices=model_prices, tracker=tracker, ) From ec7b01041d34ecd7311036b37fb4c462725eb85d Mon Sep 17 00:00:00 2001 From: Armaan Agrawal Date: Wed, 25 Mar 2026 16:54:05 -0400 Subject: [PATCH 6/6] Update src/agentopt/model_selection/streaming_brute_force.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/agentopt/model_selection/streaming_brute_force.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/agentopt/model_selection/streaming_brute_force.py b/src/agentopt/model_selection/streaming_brute_force.py index 0410479..ecdc468 100644 --- a/src/agentopt/model_selection/streaming_brute_force.py +++ b/src/agentopt/model_selection/streaming_brute_force.py @@ -47,15 +47,15 @@ def __init__( model_prices=model_prices, tracker=tracker, ) - self._all_combos: List[Dict[str, ModelCandidate]] = self._all_combos() + self._combos: List[Dict[str, ModelCandidate]] = self._all_combos() self._combo_scores: Dict[int, List[float]] = { - i: [] for i in range(len(self._all_combos)) + i: [] for i in range(len(self._combos)) } self._combo_latencies: Dict[int, List[float]] = { - i: [] for i in range(len(self._all_combos)) + i: [] for i in range(len(self._combos)) } self._combo_dp_ids: Dict[int, List[str]] = { - i: [] for i in range(len(self._all_combos)) + i: [] for i in range(len(self._combos)) } self._seen_samples = 0 self._seed_consumed = False