diff --git a/README.md b/README.md index 010939d..53c1531 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,7 @@ If you do not need the strict best model combination and want **more evaluation | `"auto"` (default) | General use | Automatically finds the best combination (wired to `arm_elimination` — strong best-arm identification with lower evaluation cost than `brute_force`) | | `"brute_force"` | Small search spaces | Evaluates all combinations | | `"random"` | Quick exploration | Samples a random fraction | +| `"streaming_random"` | Streaming / online updates | Samples once, then incrementally updates the sampled combos on new batches | | `"hill_climbing"` | Topology-aware search | Greedy search using model quality/speed rankings | | `"arm_elimination"` | Best-arm identification | Bandit; eliminates statistically dominated combinations | | `"epsilon_lucb"` | Extra cost savings when ε-optimal is enough | Bandit; stops when an epsilon-optimal best arm is identified | @@ -261,6 +262,23 @@ selector = ModelSelector( ) ``` +**Streaming incoming data** — use `StreamingBruteForceModelSelector` to update rankings incrementally as labeled batches arrive: + +```python +from agentopt import StreamingBruteForceModelSelector + +selector = StreamingBruteForceModelSelector( + agent=agent_maker, + models=models, + eval_fn=eval_fn, + dataset=warm_start_dataset, # required seed dataset +) + +for batch in stream_of_labeled_batches: + selector.update(batch) # batch: Sequence[(input_data, expected_answer)] + print("Current best:", selector.best_combo()) +``` + ## Documentation Full documentation at **[agentoptimizer.github.io/agentopt](https://agentoptimizer.github.io/agentopt/)** — including detailed guides on the [Results API](https://agentoptimizer.github.io/agentopt/api/results/), [response caching](https://agentoptimizer.github.io/agentopt/concepts/caching/), and [custom model pricing](https://agentoptimizer.github.io/agentopt/api/selectors/).
diff --git a/docs/api/selectors.md b/docs/api/selectors.md index 1c95f3b..0180357 100644 --- a/docs/api/selectors.md +++ b/docs/api/selectors.md @@ -41,6 +41,11 @@ Returns a [`SelectionResults`](results.md) object. members: false show_bases: false +::: agentopt.model_selection.streaming_random_search.StreamingRandomSearchModelSelector + options: + members: false + show_bases: false + ::: agentopt.model_selection.hill_climbing.HillClimbingModelSelector options: members: false diff --git a/docs/concepts/algorithms.md b/docs/concepts/algorithms.md index 0193e09..87502de 100644 --- a/docs/concepts/algorithms.md +++ b/docs/concepts/algorithms.md @@ -1,6 +1,6 @@ # Selection Algorithms -AgentOpt provides 8 selection algorithms. Choose based on your search space size and evaluation budget. +AgentOpt provides 9 selection algorithms. Choose based on your search space size and evaluation budget. ## At a Glance @@ -8,6 +8,7 @@ AgentOpt provides 8 selection algorithms. Choose based on your search space size |:----------|:---------|:------------|:---------| | [Brute Force](#brute-force) | Exhaustive | All | Small spaces (< 50 combos) | | [Random Search](#random-search) | Sampling | Configurable fraction | Quick baselines | +| [Streaming Random Search](#streaming-random-search) | Streaming sampling | Incremental | Online / incoming batches | | [Hill Climbing](#hill-climbing) | Greedy + restarts | Guided neighbors | Medium spaces | | [Arm Elimination](#arm-elimination) | Progressive pruning | Adaptive | Statistical early stopping | | [Epsilon LUCB](#epsilon-lucb) | ε-optimal LUCB | Adaptive | Cost savings when ε-optimal is enough | @@ -80,6 +81,38 @@ selector = RandomSearchModelSelector( --- +## Streaming Random Search + +Samples a random subset once, then incrementally updates those sampled combinations as new batches arrive. 
+ +```python +from agentopt import StreamingRandomSearchModelSelector + +selector = StreamingRandomSearchModelSelector( + agent=MyAgent, + models=models, + eval_fn=eval_fn, + dataset=warm_start_dataset, # seed batch + sample_fraction=0.25, + seed=42, +) + +selector.select_best() # evaluate warm start once +selector.update(stream_batch_1) # keep updating online +selector.update(stream_batch_2) +best_combo = selector.best_combo() +``` + +| Parameter | Default | Description | +|:----------|:--------|:------------| +| `sample_fraction` | `0.25` | Fraction of combinations sampled once and reused | +| `seed` | `None` | Random seed for reproducible sampling | + +!!! success "When to use" + Streaming / online settings where data arrives over time and you want incremental updates instead of full reselection each round. + +--- + ## Hill Climbing Greedy local search with random restarts. Defines "neighbors" using model quality and speed rankings, so each step is an informed single-model swap. diff --git a/src/agentopt/__init__.py b/src/agentopt/__init__.py index 4232539..e574d16 100644 --- a/src/agentopt/__init__.py +++ b/src/agentopt/__init__.py @@ -21,7 +21,9 @@ DatapointResult, ModelResult, RandomSearchModelSelector, + StreamingRandomSearchModelSelector, SelectionResults, + StreamingBruteForceModelSelector, ThresholdBanditSEModelSelector, ) @@ -35,6 +37,7 @@ "auto": ArmEliminationModelSelector, "brute_force": BruteForceModelSelector, "random": RandomSearchModelSelector, + "streaming_random": StreamingRandomSearchModelSelector, "hill_climbing": HillClimbingModelSelector, "arm_elimination": ArmEliminationModelSelector, "epsilon_lucb": EpsilonLUCBModelSelector, @@ -64,6 +67,7 @@ def ModelSelector( ``"brute_force"``). Other options: ``"brute_force"``, ``"random"``, ``"hill_climbing"``, ``"arm_elimination"``, ``"epsilon_lucb"``, ``"threshold"``, ``"lm_proposal"``, + ``"streaming_random"``, ``"bayesian"``. **kwargs: Additional arguments passed to the selector (e.g. 
``epsilon``, ``threshold``, ``sample_fraction``). @@ -95,10 +99,12 @@ def ModelSelector( # Selectors "BruteForceModelSelector", "RandomSearchModelSelector", + "StreamingRandomSearchModelSelector", "HillClimbingModelSelector", "ArmEliminationModelSelector", "EpsilonLUCBModelSelector", "ThresholdBanditSEModelSelector", + "StreamingBruteForceModelSelector", "LMProposalModelSelector", "BayesianOptimizationModelSelector", # Result types diff --git a/src/agentopt/model_selection/__init__.py b/src/agentopt/model_selection/__init__.py index 73ef163..3e4d249 100644 --- a/src/agentopt/model_selection/__init__.py +++ b/src/agentopt/model_selection/__init__.py @@ -7,6 +7,8 @@ from .hill_climbing import HillClimbingModelSelector from .lm_proposal import LMProposalModelSelector from .random_search import RandomSearchModelSelector +from .streaming_brute_force import StreamingBruteForceModelSelector +from .streaming_random_search import StreamingRandomSearchModelSelector from .threshold_successive_elimination import ThresholdBanditSEModelSelector # Bayesian is optional (requires torch/botorch) @@ -24,6 +26,8 @@ "EpsilonLUCBModelSelector", "ThresholdBanditSEModelSelector", "LMProposalModelSelector", + "StreamingBruteForceModelSelector", + "StreamingRandomSearchModelSelector", "BayesianOptimizationModelSelector", "DatapointResult", "ModelResult", diff --git a/src/agentopt/model_selection/streaming_brute_force.py b/src/agentopt/model_selection/streaming_brute_force.py new file mode 100644 index 0000000..ecdc468 --- /dev/null +++ b/src/agentopt/model_selection/streaming_brute_force.py @@ -0,0 +1,277 @@ +""" +Streaming brute-force model selection. + +Evaluates every combination on incoming data batches and keeps cumulative +metrics updated over time. 
+""" + +import asyncio +from typing import Any, Callable, Dict, List, Optional, Tuple + +from ..base_models import Dataset, EvalFn, ModelCandidate, validate_dataset +from .base import BaseModelSelector, ModelResult, SelectionResults + + +class StreamingBruteForceModelSelector(BaseModelSelector): + """ + Brute-force selector that supports streaming updates. + + Usage: + - call ``update(batch)`` as new labeled data arrives + - call ``results()`` / ``best_combo()`` at any time + - optional: still supports ``select_best()`` over the initial dataset + + Convergence policy (internal defaults, not user-facing): + - best combo must stay unchanged + - best accuracy improvement stays below 2% + - for 10 consecutive batch updates + """ + + _CONVERGENCE_DELTA = 0.02 + _CONVERGENCE_PATIENCE_BATCHES = 10 + + def __init__( + self, + agent: Any, + models: Dict[str, List[ModelCandidate]], + eval_fn: EvalFn, + dataset: Dataset, + model_prices: Optional[Dict[str, Dict[str, float]]] = None, + tracker=None, + ) -> None: + super().__init__( + agent=agent, + models=models, + eval_fn=eval_fn, + dataset=dataset, + model_prices=model_prices, + tracker=tracker, + ) + self._combos: List[Dict[str, ModelCandidate]] = self._all_combos() + self._combo_scores: Dict[int, List[float]] = { + i: [] for i in range(len(self._combos)) + } + self._combo_latencies: Dict[int, List[float]] = { + i: [] for i in range(len(self._combos)) + } + self._combo_dp_ids: Dict[int, List[str]] = { + i: [] for i in range(len(self._combos)) + } + self._seen_samples = 0 + self._seed_consumed = False + self._converged = False + self._stable_batches = 0 + self._best_combo_signature: Optional[Tuple[Tuple[str, str], ...]] = None + self._best_accuracy: Optional[float] = None + + def _run_selection( + self, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + # Keep select_best behavior: evaluate the provided dataset once. 
+ if not self._seed_consumed: + result = self.update(self.dataset, parallel=parallel, max_concurrent=max_concurrent) + self._seed_consumed = True + return result + return self.results() + + def update( + self, batch: Dataset, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + """Evaluate all combinations on a new incoming batch.""" + if self._converged: + print( + "\nStreaming selector converged; skipping new batch. " + "Current best combo is stable." + ) + return self.results() + + validate_dataset(batch) + + if parallel: + asyncio.run(self._update_async(batch, max_concurrent=max_concurrent)) + else: + self._update_sequential(batch) + self._seen_samples += len(batch) + + results = self.results() + self._update_convergence_state(results) + return results + + def update_one( + self, + input_data: Any, + expected_answer: Any, + parallel: bool = False, + max_concurrent: int = 20, + ) -> SelectionResults: + """Convenience helper for a single incoming datapoint.""" + return self.update( + [(input_data, expected_answer)], + parallel=parallel, + max_concurrent=max_concurrent, + ) + + def results(self) -> SelectionResults: + """Return cumulative results over all streamed batches so far.""" + all_results: List[ModelResult] = [] + for idx, combo in enumerate(self._all_combos): + combo_name = self._combo_name(combo) + scores = self._combo_scores[idx] + latencies = self._combo_latencies[idx] + dp_ids = self._combo_dp_ids[idx] + + if scores: + accuracy = sum(scores) / len(scores) + avg_latency = sum(latencies) / len(latencies) + else: + accuracy = 0.0 + avg_latency = 0.0 + + input_tokens, output_tokens = self._fetch_tokens(combo_name) + dp_results = ( + self._build_datapoint_results(scores, latencies, dp_ids) if dp_ids else [] + ) + + all_results.append( + self._make_result( + model_name=combo_name, + accuracy=accuracy, + latency_seconds=avg_latency, + input_tokens=input_tokens, + output_tokens=output_tokens, + attribute="combination", + is_best=False, 
+ datapoint_results=dp_results, + ) + ) + + best_info = self._find_best(all_results) + if best_info is not None: + best_name, _ = best_info + for result in all_results: + if result.model_name == best_name: + result.is_best = True + break + + return SelectionResults(results=all_results) + + def best_combo(self) -> Optional[Dict[str, str]]: + """Return current best combination as node->model dict.""" + return self.results().get_best_combo() + + def has_converged(self) -> bool: + """Whether streaming updates have converged under internal policy.""" + return self._converged + + def should_continue(self) -> bool: + """Whether caller should continue feeding new batches.""" + return not self._converged + + def convergence_state(self) -> Dict[str, Any]: + """Return convergence diagnostics for logging/monitoring.""" + return { + "converged": self._converged, + "stable_batches": self._stable_batches, + "required_stable_batches": self._CONVERGENCE_PATIENCE_BATCHES, + "delta": self._CONVERGENCE_DELTA, + "best_accuracy": self._best_accuracy, + "best_combo": dict(self._best_combo_signature) + if self._best_combo_signature is not None + else None, + } + + def _update_sequential(self, batch: Dataset) -> None: + dp_offset = self._seen_samples + total = len(self._all_combos) + print(f"\nUpdating stream (sequential): {total} combinations, batch={len(batch)}") + + for idx, combo in enumerate(self._all_combos, 1): + combo_name = self._combo_name(combo) + print(f" [{idx}/{total}] {combo_name}") + scores, latencies, dp_ids = self._evaluate_combo( + combo, batch, label=combo_name, dp_offset=dp_offset + ) + self._combo_scores[idx - 1].extend(scores) + self._combo_latencies[idx - 1].extend(latencies) + self._combo_dp_ids[idx - 1].extend(dp_ids) + + async def _update_async(self, batch: Dataset, max_concurrent: int) -> None: + dp_offset = self._seen_samples + batch_size = len(batch) + n_combo, dp_concurrent = self._compute_concurrency(max_concurrent, batch_size) + combo_sem = 
asyncio.Semaphore(n_combo) + total = len(self._all_combos) + print( + f"\nUpdating stream (async): {total} combinations, batch={batch_size}, " + f"max {max_concurrent} total concurrent" + ) + + async def _eval_combo( + idx: int, + combo: Dict[str, ModelCandidate], + ) -> Tuple[int, List[float], List[float], List[str]]: + async with combo_sem: + combo_name = self._combo_name(combo) + scores, latencies, dp_ids = await self._evaluate_combo_async( + combo, + batch, + label=combo_name, + max_concurrent=dp_concurrent, + dp_offset=dp_offset, + ) + return idx, scores, latencies, dp_ids + + results = await asyncio.gather( + *[_eval_combo(idx, combo) for idx, combo in enumerate(self._all_combos)], + return_exceptions=True, + ) + for i, res in enumerate(results): + if isinstance(res, Exception): + combo_name = self._combo_name(self._all_combos[i]) + print(f" [{combo_name}] failed: {res}") + continue + idx, scores, latencies, dp_ids = res + self._combo_scores[idx].extend(scores) + self._combo_latencies[idx].extend(latencies) + self._combo_dp_ids[idx].extend(dp_ids) + + @staticmethod + def _combo_signature(combo: Optional[Dict[str, str]]) -> Optional[Tuple[Tuple[str, str], ...]]: + if combo is None: + return None + return tuple(sorted((str(k), str(v)) for k, v in combo.items())) + + def _update_convergence_state(self, results: SelectionResults) -> None: + best = results.get_best() + if best is None: + return + + combo_sig = self._combo_signature(results.get_best_combo()) + if combo_sig is None: + return + + current_acc = best.accuracy + if self._best_combo_signature is None or self._best_accuracy is None: + self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + self._stable_batches = 0 + return + + combo_unchanged = combo_sig == self._best_combo_signature + improvement = current_acc - self._best_accuracy + + if combo_unchanged and improvement < self._CONVERGENCE_DELTA: + self._stable_batches += 1 + else: + self._stable_batches = 0 + + 
self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + + if self._stable_batches >= self._CONVERGENCE_PATIENCE_BATCHES: + self._converged = True + print( + "\nStreaming selector converged: best combo stable for " + f"{self._stable_batches} batches with < " + f"{self._CONVERGENCE_DELTA:.0%} accuracy improvement." + ) diff --git a/src/agentopt/model_selection/streaming_random_search.py b/src/agentopt/model_selection/streaming_random_search.py new file mode 100644 index 0000000..aa082d2 --- /dev/null +++ b/src/agentopt/model_selection/streaming_random_search.py @@ -0,0 +1,293 @@ +""" +Streaming random-search model selection. + +Evaluates a fixed random subset of combinations on incoming data batches and +keeps cumulative metrics updated over time. +""" + +import asyncio +import math +import random +from typing import Any, Dict, List, Optional, Tuple + +from ..base_models import Dataset, EvalFn, ModelCandidate, validate_dataset +from .base import BaseModelSelector, ModelResult, SelectionResults + + +class StreamingRandomSearchModelSelector(BaseModelSelector): + """ + Random-search selector that supports streaming updates. + + A subset of combinations is sampled once at initialization and reused for + all incoming batches. 
+ """ + + _CONVERGENCE_DELTA = 0.02 + _CONVERGENCE_PATIENCE_BATCHES = 10 + + def __init__( + self, + agent: Any = None, + models: Dict[str, List[ModelCandidate]] = None, + eval_fn: EvalFn = None, + dataset: Dataset = None, + sample_fraction: float = 0.25, + seed: Optional[int] = None, + model_prices: Optional[Dict[str, Dict[str, float]]] = None, + tracker=None, + ) -> None: + super().__init__( + agent=agent, + models=models, + eval_fn=eval_fn, + dataset=dataset, + model_prices=model_prices, + tracker=tracker, + ) + if not 0 < sample_fraction <= 1: + raise ValueError("sample_fraction must be in the range (0, 1].") + + self.sample_fraction = sample_fraction + self.seed = seed + + self._all_combos: List[Dict[str, ModelCandidate]] = self._all_combos() + total = len(self._all_combos) + sample_size = max(1, math.ceil(total * self.sample_fraction)) + sample_size = min(sample_size, total) + + if sample_size == total: + self._sampled_indices = list(range(total)) + else: + rng = random.Random(self.seed) + self._sampled_indices = sorted(rng.sample(range(total), sample_size)) + self._sampled_combos = [self._all_combos[i] for i in self._sampled_indices] + + self._combo_scores: Dict[int, List[float]] = { + i: [] for i in range(len(self._sampled_combos)) + } + self._combo_latencies: Dict[int, List[float]] = { + i: [] for i in range(len(self._sampled_combos)) + } + self._combo_dp_ids: Dict[int, List[str]] = { + i: [] for i in range(len(self._sampled_combos)) + } + self._seen_samples = 0 + self._seed_consumed = False + + self._converged = False + self._stable_batches = 0 + self._best_combo_signature: Optional[Tuple[Tuple[str, str], ...]] = None + self._best_accuracy: Optional[float] = None + + def _run_selection( + self, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + if not self._seed_consumed: + result = self.update(self.dataset, parallel=parallel, max_concurrent=max_concurrent) + self._seed_consumed = True + return result + return self.results() + + 
def update( + self, batch: Dataset, parallel: bool = False, max_concurrent: int = 20, + ) -> SelectionResults: + """Evaluate sampled combinations on a new incoming batch.""" + if self._converged: + print( + "\nStreaming random search converged; skipping new batch. " + "Current best combo is stable." + ) + return self.results() + + validate_dataset(batch) + if parallel: + asyncio.run(self._update_async(batch, max_concurrent=max_concurrent)) + else: + self._update_sequential(batch) + self._seen_samples += len(batch) + + results = self.results() + self._update_convergence_state(results) + return results + + def update_one( + self, + input_data: Any, + expected_answer: Any, + parallel: bool = False, + max_concurrent: int = 20, + ) -> SelectionResults: + return self.update( + [(input_data, expected_answer)], + parallel=parallel, + max_concurrent=max_concurrent, + ) + + def results(self) -> SelectionResults: + all_results: List[ModelResult] = [] + for idx, combo in enumerate(self._sampled_combos): + combo_name = self._combo_name(combo) + scores = self._combo_scores[idx] + latencies = self._combo_latencies[idx] + dp_ids = self._combo_dp_ids[idx] + + if scores: + accuracy = sum(scores) / len(scores) + avg_latency = sum(latencies) / len(latencies) + else: + accuracy, avg_latency = 0.0, 0.0 + + input_tokens, output_tokens = self._fetch_tokens(combo_name) + dp_results = ( + self._build_datapoint_results(scores, latencies, dp_ids) if dp_ids else [] + ) + all_results.append( + self._make_result( + model_name=combo_name, + accuracy=accuracy, + latency_seconds=avg_latency, + input_tokens=input_tokens, + output_tokens=output_tokens, + attribute="combination", + is_best=False, + datapoint_results=dp_results, + ) + ) + + best_info = self._find_best(all_results) + if best_info is not None: + best_name, _ = best_info + for result in all_results: + if result.model_name == best_name: + result.is_best = True + break + + return SelectionResults(results=all_results) + + def 
best_combo(self) -> Optional[Dict[str, str]]: + return self.results().get_best_combo() + + def has_converged(self) -> bool: + return self._converged + + def should_continue(self) -> bool: + return not self._converged + + def convergence_state(self) -> Dict[str, Any]: + return { + "converged": self._converged, + "stable_batches": self._stable_batches, + "required_stable_batches": self._CONVERGENCE_PATIENCE_BATCHES, + "delta": self._CONVERGENCE_DELTA, + "best_accuracy": self._best_accuracy, + "best_combo": dict(self._best_combo_signature) + if self._best_combo_signature is not None + else None, + "sampled_combinations": len(self._sampled_combos), + "sample_fraction": self.sample_fraction, + } + + def _update_sequential(self, batch: Dataset) -> None: + dp_offset = self._seen_samples + total = len(self._sampled_combos) + print( + f"\nUpdating stream random-search (sequential): " + f"{total}/{len(self._all_combos)} combinations, batch={len(batch)}" + ) + + for idx, combo in enumerate(self._sampled_combos, 1): + combo_name = self._combo_name(combo) + print(f" [{idx}/{total}] {combo_name}") + scores, latencies, dp_ids = self._evaluate_combo( + combo, batch, label=combo_name, dp_offset=dp_offset + ) + self._combo_scores[idx - 1].extend(scores) + self._combo_latencies[idx - 1].extend(latencies) + self._combo_dp_ids[idx - 1].extend(dp_ids) + + async def _update_async(self, batch: Dataset, max_concurrent: int) -> None: + dp_offset = self._seen_samples + batch_size = len(batch) + n_combo, dp_concurrent = self._compute_concurrency(max_concurrent, batch_size) + combo_sem = asyncio.Semaphore(n_combo) + total = len(self._sampled_combos) + print( + f"\nUpdating stream random-search (async): " + f"{total}/{len(self._all_combos)} combinations, batch={batch_size}, " + f"max {max_concurrent} total concurrent" + ) + + async def _eval_combo( + idx: int, + combo: Dict[str, ModelCandidate], + ) -> Tuple[int, List[float], List[float], List[str]]: + async with combo_sem: + combo_name = 
self._combo_name(combo) + scores, latencies, dp_ids = await self._evaluate_combo_async( + combo, + batch, + label=combo_name, + max_concurrent=dp_concurrent, + dp_offset=dp_offset, + ) + return idx, scores, latencies, dp_ids + + results = await asyncio.gather( + *[ + _eval_combo(idx, combo) + for idx, combo in enumerate(self._sampled_combos) + ], + return_exceptions=True, + ) + for i, res in enumerate(results): + if isinstance(res, Exception): + combo_name = self._combo_name(self._sampled_combos[i]) + print(f" [{combo_name}] failed: {res}") + continue + idx, scores, latencies, dp_ids = res + self._combo_scores[idx].extend(scores) + self._combo_latencies[idx].extend(latencies) + self._combo_dp_ids[idx].extend(dp_ids) + + @staticmethod + def _combo_signature( + combo: Optional[Dict[str, str]], + ) -> Optional[Tuple[Tuple[str, str], ...]]: + if combo is None: + return None + return tuple(sorted((str(k), str(v)) for k, v in combo.items())) + + def _update_convergence_state(self, results: SelectionResults) -> None: + best = results.get_best() + if best is None: + return + + combo_sig = self._combo_signature(results.get_best_combo()) + if combo_sig is None: + return + + current_acc = best.accuracy + if self._best_combo_signature is None or self._best_accuracy is None: + self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + self._stable_batches = 0 + return + + combo_unchanged = combo_sig == self._best_combo_signature + improvement = current_acc - self._best_accuracy + + if combo_unchanged and improvement < self._CONVERGENCE_DELTA: + self._stable_batches += 1 + else: + self._stable_batches = 0 + + self._best_combo_signature = combo_sig + self._best_accuracy = current_acc + + if self._stable_batches >= self._CONVERGENCE_PATIENCE_BATCHES: + self._converged = True + print( + "\nStreaming random search converged: best combo stable for " + f"{self._stable_batches} batches with < " + f"{self._CONVERGENCE_DELTA:.0%} accuracy improvement." 
+ ) diff --git a/tests/streaming_brute_force_fake_data.py b/tests/streaming_brute_force_fake_data.py new file mode 100644 index 0000000..4c98d3a --- /dev/null +++ b/tests/streaming_brute_force_fake_data.py @@ -0,0 +1,111 @@ +""" +Run StreamingBruteForceModelSelector on fake streaming data. + +This script does not make external API calls; it uses a deterministic fake +agent so you can validate streaming update behavior quickly. +""" + +from __future__ import annotations + +import argparse +from random import Random +from typing import Any, Dict, List, Sequence, Tuple + +from agentopt import StreamingBruteForceModelSelector + + +def build_fake_stream(total_samples: int, seed: int) -> List[Tuple[Dict[str, int], int]]: + """Create a simple synthetic stream: expected output is exactly x.""" + rng = Random(seed) + stream: List[Tuple[Dict[str, int], int]] = [] + for _ in range(total_samples): + x = rng.randint(0, 100) + stream.append(({"x": x}, x)) + return stream + + +def agent_fn(combo: Dict[str, Any]): + model_name = combo["agent"] + + def _run(input_data: Dict[str, int]) -> int: + x = input_data["x"] + # Deterministic fake model behavior: + # - "good": always correct + # - "noisy": occasionally wrong + # - "bad": always off by +1 + if model_name == "good": + return x + if model_name == "noisy": + return x if (x % 5 != 0) else (x + 1) + return x + 1 + + return _run + + +def eval_fn(expected: int, actual: int) -> float: + return 1.0 if expected == actual else 0.0 + + +def batches_of( + stream: Sequence[Tuple[Dict[str, int], int]], batch_size: int +) -> List[List[Tuple[Dict[str, int], int]]]: + return [list(stream[i : i + batch_size]) for i in range(0, len(stream), batch_size)] + + +def run(total: int, warm_start: int, batch_size: int, seed: int) -> None: + if warm_start < 1: + raise ValueError("warm_start must be >= 1") + if batch_size < 1: + raise ValueError("batch_size must be >= 1") + if total <= warm_start: + raise ValueError("total must be greater than warm_start") 
+ + stream = build_fake_stream(total, seed) + seed_dataset = stream[:warm_start] + incoming = stream[warm_start:] + + selector = StreamingBruteForceModelSelector( + agent_fn=agent_fn, + models={"agent": ["good", "noisy", "bad"]}, + eval_fn=eval_fn, + dataset=seed_dataset, + ) + + print("== Initial seed evaluation ==") + initial_results = selector.select_best() + print(initial_results) + print(f"Initial best: {initial_results.get_best_combo()}") + + print("\n== Streaming updates ==") + for i, batch in enumerate(batches_of(incoming, batch_size), 1): + results = selector.update(batch, parallel=True, max_concurrent=20) + best = results.get_best() + print( + f"Batch {i}: size={len(batch)} " + f"best={results.get_best_combo()} " + f"accuracy={best.accuracy:.3f} " + f"latency={best.latency_seconds:.4f}s" + ) + + print("\n== Final cumulative ranking ==") + print(selector.results()) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Test streaming model selection on fake data.") + parser.add_argument("--total", type=int, default=60, help="Total fake samples.") + parser.add_argument("--warm-start", type=int, default=12, help="Initial seed samples.") + parser.add_argument("--batch-size", type=int, default=8, help="Incoming batch size.") + parser.add_argument("--seed", type=int, default=7, help="Random seed.") + args = parser.parse_args() + + run( + total=args.total, + warm_start=args.warm_start, + batch_size=args.batch_size, + seed=args.seed, + ) + + +if __name__ == "__main__": + main() diff --git a/tests/test_streaming_brute_force.py b/tests/test_streaming_brute_force.py new file mode 100644 index 0000000..880e9cd --- /dev/null +++ b/tests/test_streaming_brute_force.py @@ -0,0 +1,55 @@ +from agentopt.model_selection.streaming_brute_force import ( + StreamingBruteForceModelSelector, +) + + +def _agent_fn(combo): + model_name = combo["agent"] + + def _run(input_data): + x = input_data["x"] + return x if model_name == "good" else x + 1 + + return _run + + 
def _eval_fn(expected, actual):
    """Exact-match score: 1.0 on equality, else 0.0."""
    return 1.0 if expected == actual else 0.0


def test_streaming_update_accumulates_and_tracks_best():
    # BUG FIX: the selector's constructor keyword is ``agent``, not
    # ``agent_fn`` — the old call raised TypeError before the assertions ran.
    selector = StreamingBruteForceModelSelector(
        agent=_agent_fn,
        models={"agent": ["good", "bad"]},
        eval_fn=_eval_fn,
        dataset=[({"x": 0}, 0)],
    )

    selector.update([({"x": 1}, 1)])
    selector.update([({"x": 2}, 2)])
    results = selector.results()

    by_name = {r.model_name: r for r in results}
    assert by_name["agent=good"].accuracy == 1.0
    assert by_name["agent=bad"].accuracy == 0.0
    assert by_name["agent=good"].is_best
    assert len(by_name["agent=good"].datapoint_results) == 2
    assert len(by_name["agent=bad"].datapoint_results) == 2


def test_select_best_consumes_seed_dataset_once():
    # BUG FIX: same ``agent_fn`` -> ``agent`` keyword correction as above.
    selector = StreamingBruteForceModelSelector(
        agent=_agent_fn,
        models={"agent": ["good", "bad"]},
        eval_fn=_eval_fn,
        dataset=[({"x": 3}, 3)],
    )

    first = selector.select_best()
    second = selector.select_best()

    first_by_name = {r.model_name: r for r in first}
    second_by_name = {r.model_name: r for r in second}

    # The seed dataset is evaluated exactly once; the second call serves
    # cumulative results without re-running it.
    assert len(first_by_name["agent=good"].datapoint_results) == 1
    assert len(second_by_name["agent=good"].datapoint_results) == 1


# --- tests/test_streaming_random_search.py ---------------------------------
"""Unit tests for StreamingRandomSearchModelSelector."""

from agentopt import ModelSelector, StreamingRandomSearchModelSelector


class DummyAgent:
    """Simple deterministic agent for selector tests."""

    def __init__(self, models):
        self.models = models

    def run(self, _input_data):
        return f"{self.models['planner']}|{self.models['solver']}"


def exact_match(expected, actual):
    return float(expected == actual)


def test_streaming_random_sampling_size():
    selector = StreamingRandomSearchModelSelector(
        agent=DummyAgent,
        models={
            "planner": ["p1", "p2", "p3"],
            "solver": ["s1", "s2"],
        },
        eval_fn=exact_match,
        dataset=[({"q": "seed"}, "p1|s1")],
        sample_fraction=0.5,
        seed=7,
    )
    # 3 * 2 = 6 combos; sample_fraction=0.5 => ceil(3) = 3 sampled combos.
    assert len(selector._sampled_combos) == 3
    assert len(selector._all_combos) == 6


def test_streaming_random_updates_best_combo_over_time():
    selector = StreamingRandomSearchModelSelector(
        agent=DummyAgent,
        models={"planner": ["p1", "p2"], "solver": ["s1", "s2"]},
        eval_fn=exact_match,
        dataset=[({"q": "seed"}, "p1|s1")],
        sample_fraction=1.0,
        seed=3,
    )

    initial = selector.select_best()
    assert initial.get_best_combo() == {"planner": "p1", "solver": "s1"}

    # Stream in data that favors a different combination.
    selector.update(
        [
            ({"q": "b1"}, "p2|s2"),
            ({"q": "b2"}, "p2|s2"),
            ({"q": "b3"}, "p2|s2"),
        ]
    )
    best_after = selector.results().get_best_combo()
    assert best_after == {"planner": "p2", "solver": "s2"}


def test_streaming_random_converges_after_stable_batches():
    selector = StreamingRandomSearchModelSelector(
        agent=DummyAgent,
        models={"planner": ["p1", "p2"], "solver": ["s1", "s2"]},
        eval_fn=exact_match,
        dataset=[({"q": "seed"}, "p2|s2")],
        sample_fraction=1.0,
        seed=11,
    )

    selector.select_best()
    for _ in range(10):
        selector.update_one({"q": "stream"}, "p2|s2")

    assert selector.has_converged() is True
    assert selector.should_continue() is False


def test_model_selector_factory_supports_streaming_random():
    selector = ModelSelector(
        agent=DummyAgent,
        models={"planner": ["p1"], "solver": ["s1"]},
        eval_fn=exact_match,
        dataset=[({"q": "seed"}, "p1|s1")],
        method="streaming_random",
    )
    assert isinstance(selector, StreamingRandomSearchModelSelector)