diff --git a/lefthook.yml b/lefthook.yml index ec4abc75c..b1429ba9f 100644 --- a/lefthook.yml +++ b/lefthook.yml @@ -57,6 +57,16 @@ pre-commit: run: uv run nbstripout {files} stage_fixed: true + # Demos: Auto-generate .ipynb from .py files with cell markers + demos-notebook-gen: + tags: + - jupyter + - generate + glob: + - "demos/**/*.py" + run: uv run python scripts/py_to_ipynb.py --force {files} + stage_fixed: true + # TUI: TypeScript typecheck tui-typecheck: tags: diff --git a/scripts/py_to_ipynb.py b/scripts/py_to_ipynb.py new file mode 100644 index 000000000..eb689240a --- /dev/null +++ b/scripts/py_to_ipynb.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +"""Convert demo .py files with percent markers to .ipynb notebooks. + +Only processes .py files under the repo's demos/ directory that include +Jupytext-style cell markers (lines starting with "# %%"). + +Usage: + uv run python scripts/py_to_ipynb.py demos/gepa_banking77/run_banking77.py + uv run python scripts/py_to_ipynb.py demos/ # all demo notebooks + uv run python scripts/py_to_ipynb.py --force demos/ # overwrite existing +""" + +import argparse +import subprocess +from pathlib import Path + + +def _repo_root() -> Path: + return Path(__file__).resolve().parents[1] + + +def _demos_dir() -> Path: + return _repo_root() / "demos" + + +def _is_demo_py(path: Path) -> bool: + if path.suffix != ".py": + return False + try: + path.resolve().relative_to(_demos_dir().resolve()) + except ValueError: + return False + return True + + +def _has_ipynb_markers(py_path: Path) -> bool: + try: + for line in py_path.read_text(encoding="utf-8").splitlines(): + if line.lstrip().startswith("# %%"): + return True + except Exception: + return False + return False + + +def convert_file(py_path: Path, force: bool = False) -> bool: + """Convert a single .py file to .ipynb.""" + if not _is_demo_py(py_path): + return False + if not _has_ipynb_markers(py_path): + return False # Not a notebook-style .py file + + ipynb_path = py_path.with_suffix(".ipynb") + + if ipynb_path.exists() and not force: + print(f"Skipping {py_path} (.ipynb exists, use --force)") + return False + + # Delete existing file first (jupytext overwrite is unreliable) + if ipynb_path.exists(): + ipynb_path.unlink() + + result = subprocess.run( + ["jupytext", "--to", "notebook", str(py_path)], + capture_output=True, + text=True, + ) + + if result.returncode != 0: + print(f"Error: {py_path}: {result.stderr}") + return False + + print(f"{py_path} -> {ipynb_path}") + return True + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("paths", nargs="+") + parser.add_argument("--force", "-f", action="store_true") + args = parser.parse_args() + + converted = 0 + for path in map(Path, args.paths): + if path.is_dir(): + files = path.rglob("*.py") + else: + files = [path] + for f in files: + if convert_file(f, args.force): + converted += 1 + + print(f"Converted {converted} file(s)") + + +if __name__ == "__main__": + main() diff --git a/synth_ai/__init__.py b/synth_ai/__init__.py index c5d8e28a4..770e814ad 100644 --- a/synth_ai/__init__.py +++ b/synth_ai/__init__.py @@ -83,6 +83,7 @@ create_task_app, ) from synth_ai.sdk.optimization import GraphOptimizationJob, PolicyOptimizationJob + from synth_ai.sdk.utils import confidence_band, split_seed_slices, stratified_seed_sample # Legacy aliases PromptLearningJob = PolicyOptimizationJob @@ -118,6 +119,10 @@ "VerifierTracePayload", "ReviewPayload", "CriterionScorePayload", + # Utils + "confidence_band", + "split_seed_slices", + "stratified_seed_sample", ] # Lazy loading map: name -> (module, attribute) @@ -149,6 +154,10 @@ "VerifierTracePayload": ("synth_ai.sdk.graphs.verifier_schemas", "VerifierTracePayload"), "ReviewPayload": ("synth_ai.sdk.graphs.verifier_schemas", "ReviewPayload"), "CriterionScorePayload": ("synth_ai.sdk.graphs.verifier_schemas", "CriterionScorePayload"), + # Utils + "confidence_band": ("synth_ai.sdk.utils", "confidence_band"), + "split_seed_slices": ("synth_ai.sdk.utils", "split_seed_slices"), + "stratified_seed_sample": ("synth_ai.sdk.utils", "stratified_seed_sample"), } diff --git a/synth_ai/core/streaming/config.py b/synth_ai/core/streaming/config.py index 3bc10bf93..625110b6e 100644 --- a/synth_ai/core/streaming/config.py +++ b/synth_ai/core/streaming/config.py @@ -20,6 +20,7 @@ class StreamConfig: sample_rate: float = 1.0 max_events_per_poll: int | None = None deduplicate: bool = True + dedupe_events: bool = True @classmethod def default(cls) -> StreamConfig: diff --git a/synth_ai/core/streaming/handlers.py b/synth_ai/core/streaming/handlers.py index 5bda458a7..756be6db7 100644 --- a/synth_ai/core/streaming/handlers.py +++ b/synth_ai/core/streaming/handlers.py @@ -82,6 +82,10 @@ def should_handle(self, message: StreamMessage) -> bool: # pragma: no cover - t """Predicate allowing handlers to filter messages before processing.""" return True + def wants_event_backfill(self) -> bool: # pragma: no cover - optional + """Whether the streamer should backfill events via polling.""" + return False + def flush(self) -> None: # pragma: no cover - optional """Flush buffered output.""" return None diff --git a/synth_ai/core/streaming/streamer.py b/synth_ai/core/streaming/streamer.py index daa511819..f5fbf476e 100644 --- a/synth_ai/core/streaming/streamer.py +++ b/synth_ai/core/streaming/streamer.py @@ -17,7 +17,12 @@ from .handlers import StreamHandler from .types import StreamMessage, StreamType +logger = logging.getLogger(__name__) + TERMINAL_STATUSES = {"succeeded", "failed", "cancelled", "canceled", "completed"} +TERMINAL_STATUS_GRACE_SECONDS = 6.0 +TERMINAL_HANDLER_GRACE_SECONDS = 8.0 +STALE_STATUS_TIMEOUT_SECONDS = 150.0 # 2.5 minutes with no progress = stale # Terminal success events - canonical format only # Format: .... @@ -57,6 +62,36 @@ } +def is_terminal_success_event(event_type: str) -> bool: + """Check if event_type indicates terminal success. + + Uses both exact matching against TERMINAL_EVENT_SUCCESS and flexible + suffix matching for `job.completed` patterns. + """ + event_type = event_type.lower() + if event_type in TERMINAL_EVENT_SUCCESS: + return True + # Flexible matching: any event ending with job.completed + if event_type.endswith("job.completed") or event_type.endswith(".job.completed"): + return True + return False + + +def is_terminal_failure_event(event_type: str) -> bool: + """Check if event_type indicates terminal failure. + + Uses both exact matching against TERMINAL_EVENT_FAILURE and flexible + suffix matching for `job.failed` patterns. + """ + event_type = event_type.lower() + if event_type in TERMINAL_EVENT_FAILURE: + return True + # Flexible matching: any event ending with job.failed + if event_type.endswith("job.failed") or event_type.endswith(".job.failed"): + return True + return False + + def check_terminal_event_typed(event_data: dict[str, Any]) -> tuple[bool, str | None]: """Check if an event is terminal using the typed event system. @@ -161,17 +196,23 @@ def rl(cls, job_id: str) -> StreamEndpoints: def graph_evolve(cls, job_id: str) -> StreamEndpoints: """Endpoints for Graph Evolve workflow optimization jobs. - Prefer /api/graph_evolve/jobs/{job_id} with legacy /api/graphgen fallbacks. + Prefer /api/graph-evolve/jobs/{job_id} with legacy /api/graph_evolve and /api/graphgen fallbacks. """ - base = f"/graph_evolve/jobs/{job_id}" + base = f"/graph-evolve/jobs/{job_id}" return cls( status=base, events=f"{base}/events", metrics=f"{base}/metrics", timeline=None, - status_fallbacks=(f"/graphgen/jobs/{job_id}",), - event_fallbacks=(f"/graphgen/jobs/{job_id}/events",), - metric_fallbacks=(f"/graphgen/jobs/{job_id}/metrics",), + status_fallbacks=(f"/graph_evolve/jobs/{job_id}", f"/graphgen/jobs/{job_id}"), + event_fallbacks=( + f"/graph_evolve/jobs/{job_id}/events", + f"/graphgen/jobs/{job_id}/events", + ), + metric_fallbacks=( + f"/graph_evolve/jobs/{job_id}/metrics", + f"/graphgen/jobs/{job_id}/metrics", + ), ) @classmethod @@ -212,6 +253,7 @@ def __init__( http_timeout: float = 60.0, http_client: RustCoreHttpClient | None = None, sleep_fn=sleep, + debug: bool = False, ) -> None: self.base_url = base_url.rstrip("/") self.api_key = api_key @@ -224,6 +266,7 @@ def __init__( self.http_timeout = http_timeout self._http = http_client self._sleep = sleep_fn + self.debug = debug status_sources: list[str | None] = [self.endpoints.status] status_sources.extend(self.endpoints.status_fallbacks) @@ -248,6 +291,11 @@ def __init__( self._last_status_value: str | None = None self._terminal_seen = False self._terminal_event_status: str | None = None + self._consecutive_terminal_status_polls = 0 + self._terminal_status_seen_at: float | None = None + self._terminal_status_value: str | None = None + self._force_event_backfill = False + self._last_progress_time: float = time.time() # Track last meaningful progress if not self.handlers: from .handlers import CLIHandler @@ -284,22 +332,24 @@ async def sse_reader(): sse_done.set() async def status_poller(): - """Periodically poll status while SSE stream is active.""" + """Periodically poll status while SSE stream is active. + + Note: Status polling can terminate the stream if the status + endpoint reports a terminal state. + """ while not sse_done.is_set() and not self._terminal_seen: await asyncio.sleep(2.0) # Check every 2 seconds if self._terminal_seen or sse_done.is_set(): break - status = await self._refresh_status(http) + await self._refresh_status(http) metric_messages = await self._poll_metrics(http) timeline_messages = await self._poll_timeline(http) self._dispatch(metric_messages + timeline_messages) - - # Check for terminal status - if status and status.lower() in TERMINAL_STATUSES: - self._terminal_seen = True - break + if self._force_event_backfill: + event_messages = await self._poll_events(http) + self._dispatch(event_messages) # Start both tasks concurrently sse_task = asyncio.create_task(sse_reader()) @@ -315,19 +365,35 @@ async def status_poller(): # No event received, check if SSE is done or terminal if sse_done.is_set() or self._terminal_seen: break + # Check for stale status (no progress for too long) + if time.time() - self._last_progress_time > STALE_STATUS_TIMEOUT_SECONDS: + logger.warning( + "No progress for %.0f seconds - treating as stale/failed", + STALE_STATUS_TIMEOUT_SECONDS, + ) + self._terminal_seen = True + self._terminal_event_status = "failed" + break continue # Handle exception from SSE reader if isinstance(item, Exception): raise item + # Update progress time - we received an event + self._last_progress_time = time.time() + # Process event self._dispatch([item]) + self._update_backfill_from_handlers() # Poll metrics/timeline after each event metric_messages = await self._poll_metrics(http) timeline_messages = await self._poll_timeline(http) self._dispatch(metric_messages + timeline_messages) + if self._force_event_backfill: + event_messages = await self._poll_events(http) + self._dispatch(event_messages) # Check for terminal status if self._terminal_seen: @@ -340,58 +406,87 @@ async def status_poller(): await asyncio.gather(sse_task, status_task, return_exceptions=True) # If SSE ended before terminal status, fall back to polling if not self._terminal_seen: - print( - "[SDK] SSE stream ended before terminal status; continuing with status polling.", - flush=True, + logger.debug( + "SSE stream ended before terminal status; continuing with status polling." ) while not self._terminal_seen: if ( self.timeout_seconds is not None and time.time() - start_time > self.timeout_seconds ): - print( - "[SDK] Stream timeout reached while waiting for terminal status.", - flush=True, + logger.debug( + "Stream timeout reached while waiting for terminal event." ) break status = await self._refresh_status(http) if status: - print( - f"[SDK] Status polling: {status} (elapsed={time.time() - start_time:.1f}s)", - flush=True, + logger.debug( + "Status polling: %s (elapsed=%.1fs)", status, time.time() - start_time ) + # Poll events - terminal events (job.completed) will set _terminal_seen + event_messages = await self._poll_events(http) metric_messages = await self._poll_metrics(http) timeline_messages = await self._poll_timeline(http) - self._dispatch(metric_messages + timeline_messages) - if status and status.lower() in TERMINAL_STATUSES: + + # Update progress time if we received any messages + all_messages = event_messages + metric_messages + timeline_messages + if all_messages: + self._last_progress_time = time.time() + + self._dispatch(all_messages) + self._update_backfill_from_handlers() + + # Check for stale status (no progress for too long) + if time.time() - self._last_progress_time > STALE_STATUS_TIMEOUT_SECONDS: + logger.warning( + "No progress for %.0f seconds - treating as stale/failed", + STALE_STATUS_TIMEOUT_SECONDS, + ) self._terminal_seen = True + self._terminal_event_status = "failed" break + + # _terminal_seen is set by _poll_events/_dispatch when terminal event received await self._sleep(self.interval_seconds) else: # No SSE endpoint available - use polling for events - while True: - status = await self._refresh_status(http) - - # Check status FIRST before polling events/metrics - if status and status.lower() in TERMINAL_STATUSES: - self._terminal_seen = True - break - if self._terminal_seen: + # Terminal state is determined by terminal EVENTS, not status endpoint + while not self._terminal_seen: + if ( + self.timeout_seconds is not None + and time.time() - start_time > self.timeout_seconds + ): + logger.debug( + "Stream timeout reached while waiting for terminal event." + ) break + await self._refresh_status(http) + + # Poll events - terminal events (job.completed) will set _terminal_seen event_messages = await self._poll_events(http) metric_messages = await self._poll_metrics(http) timeline_messages = await self._poll_timeline(http) - self._dispatch(event_messages + metric_messages + timeline_messages) + # Update progress time if we received any messages + all_messages = event_messages + metric_messages + timeline_messages + if all_messages: + self._last_progress_time = time.time() - # Check again after polling (terminal events might have been received) - if self._terminal_seen: - break - if status and status.lower() in TERMINAL_STATUSES: + self._dispatch(all_messages) + self._update_backfill_from_handlers() + + # Check for stale status (no progress for too long) + if time.time() - self._last_progress_time > STALE_STATUS_TIMEOUT_SECONDS: + logger.warning( + "No progress for %.0f seconds - treating as stale/failed", + STALE_STATUS_TIMEOUT_SECONDS, + ) self._terminal_seen = True + self._terminal_event_status = "failed" break + # _terminal_seen is set by _poll_events/_dispatch when terminal event received await self._sleep(self.interval_seconds) for handler in self.handlers: @@ -399,6 +494,10 @@ async def status_poller(): handler.flush() final_status = self._terminal_event_status or self._last_status_value or "unknown" + if self.debug: + print(f"[STREAM DEBUG] Stream exiting: terminal_seen={self._terminal_seen}, " + f"terminal_event_status={self._terminal_event_status}, " + f"last_status_value={self._last_status_value}, final={final_status}") if self._last_status_payload: self._last_status_payload["status"] = final_status return self._last_status_payload @@ -428,32 +527,98 @@ async def _stream_events_sse(self, sse_url: str) -> AsyncIterator[StreamMessage] if last_seq > 0: headers["Last-Event-ID"] = str(last_seq) - print(f"[DEBUG] SSE stream connecting to {url}", file=sys.stderr) - event_count = 0 - start_time = time.time() - no_events_warning_printed = False - - async for event_data in stream_sse_events(url, headers=headers, timeout=None): - if not event_data: - continue - if event_count == 0 and not no_events_warning_printed: - elapsed = time.time() - start_time - if elapsed > 10: + # Create a separate session for SSE (long-lived connection) + timeout = aiohttp.ClientTimeout(total=None) # No timeout for SSE + async with ( + aiohttp.ClientSession(headers=headers, timeout=timeout) as session, + session.get(url) as resp, + ): + if resp.status != 200: + raise Exception(f"SSE endpoint returned {resp.status}: {await resp.text()}") + + print(f"[DEBUG] SSE stream connected to {url}, status={resp.status}", file=sys.stderr) + buffer = "" + event_count = 0 + last_event_time = time.time() + no_events_warning_printed = False + + # Read SSE stream in chunks and parse events + async for chunk in resp.content.iter_chunked(8192): + current_time = time.time() + + # Warn if no events received for 10 seconds (events should be streaming) + if ( + event_count == 1 + and current_time - last_event_time > 10 + and not no_events_warning_printed + ): print( "[DEBUG] WARNING: No events received via SSE for 10s after connection. Backend may not be publishing to Redis (check SSE_USE_REDIS env var).", file=sys.stderr, ) no_events_warning_printed = True - event_count += 1 - print( - f"[DEBUG] Parsed SSE event #{event_count}: type={event_data.get('type')}, seq={event_data.get('seq')}", - file=sys.stderr, - ) - event_type = str(event_data.get("type", "")).lower() - if event_type == "sse.stream.ended": - print("[DEBUG] SSE stream received sse.stream.ended event", file=sys.stderr) - return + buffer += chunk.decode("utf-8", errors="ignore") + + # SSE events are separated by double newlines + while "\n\n" in buffer: + event_block, buffer = buffer.split("\n\n", 1) + event_block = event_block.strip() + + if not event_block: + continue + + event_data = {} + + # Parse SSE event block line by line + for event_line in event_block.split("\n"): + event_line = event_line.strip() + if not event_line or event_line.startswith(":"): + continue # Skip comments/empty lines + if event_line.startswith("id:"): + pass # SSE event id (not currently used) + elif event_line.startswith("event:"): + pass # SSE event type (not currently used) + elif event_line.startswith("data:"): + data_str = event_line[5:].strip() + if data_str == "[DONE]": + print( + "[DEBUG] SSE stream received [DONE] - stream completed gracefully", + file=sys.stderr, + ) + # Stream ended gracefully - if we already saw terminal event, + # the job is complete. Otherwise, we'll fall back to polling. + return + try: + event_data = json.loads(data_str) + except json.JSONDecodeError as e: + print( + f"[DEBUG] Failed to parse SSE data: {e}, data={data_str[:200]}", + file=sys.stderr, + ) + continue + + # Debug: log what we parsed + if event_data: + event_count += 1 + last_event_time = time.time() + print( + f"[DEBUG] Parsed SSE event #{event_count}: type={event_data.get('type')}, seq={event_data.get('seq')}", + file=sys.stderr, + ) + + if event_data and "type" in event_data: + event_type = str(event_data.get("type", "")).lower() + + # Handle stream lifecycle events from backend + if event_type == "sse.stream.ended": + # Backend signaled stream end - this is a graceful termination + print( + "[DEBUG] SSE stream received sse.stream.ended event", + file=sys.stderr, + ) + # Don't yield this event, just return + return event_job_id = event_data.get("job_id") or self.job_id msg = StreamMessage.from_event(event_job_id, event_data) @@ -470,35 +635,39 @@ async def _stream_events_sse(self, sse_url: str) -> AsyncIterator[StreamMessage] except (TypeError, ValueError): pass - if event_type in TERMINAL_EVENT_SUCCESS: - self._terminal_seen = True - self._terminal_event_status = "succeeded" - print( - f"[DEBUG] Terminal success event detected: {event_type}", - file=sys.stderr, - ) - elif event_type in TERMINAL_EVENT_FAILURE: - self._terminal_seen = True - self._terminal_event_status = "failed" - print( - f"[DEBUG] Terminal failure event detected: {event_type}", - file=sys.stderr, - ) + # Check for terminal events + if event_type in TERMINAL_EVENT_SUCCESS: + self._terminal_seen = True + self._terminal_event_status = "succeeded" + print( + f"[DEBUG] Terminal success event detected: {event_type}", + file=sys.stderr, + ) + elif event_type in TERMINAL_EVENT_FAILURE: + self._terminal_seen = True + self._terminal_event_status = "failed" + print( + f"[DEBUG] Terminal failure event detected: {event_type}", + file=sys.stderr, + ) - event_status = str(event_data.get("data", {}).get("status", "")).lower() - if event_status in TERMINAL_STATUSES and not self._terminal_seen: - self._terminal_seen = True - self._terminal_event_status = ( - "succeeded" if event_status in ("succeeded", "completed") else "failed" - ) - print( - f"[DEBUG] Terminal status detected in event data: {event_status}", - file=sys.stderr, - ) + # Also check event data for status (synthetic events from status check) + event_status = str(event_data.get("data", {}).get("status", "")).lower() + if event_status in TERMINAL_STATUSES and not self._terminal_seen: + self._terminal_seen = True + self._terminal_event_status = ( + "succeeded" + if event_status in ("succeeded", "completed") + else "failed" + ) + print( + f"[DEBUG] Terminal status detected in event data: {event_status}", + file=sys.stderr, + ) yield msg - async def _refresh_status(self, http: RustCoreHttpClient) -> str: + async def _refresh_status(self, http: AsyncHttpClient) -> str: status_payload = await self._poll_status(http) if status_payload: self._last_status_payload = status_payload @@ -506,8 +675,23 @@ async def _refresh_status(self, http: RustCoreHttpClient) -> str: if status: self._last_status_value = status if status in TERMINAL_STATUSES: + # Treat status as authoritative for terminal state. self._terminal_seen = True - print(f"[SDK] Terminal status detected: {status}", flush=True) + if status in {"failed", "cancelled", "canceled"}: + self._terminal_event_status = "failed" + else: + self._terminal_event_status = "succeeded" + if self.debug: + print( + f"[STREAM DEBUG] STATUS TERMINAL: {status}" + ) + self._consecutive_terminal_status_polls = 0 + self._terminal_status_seen_at = None + self._terminal_status_value = None + else: + self._consecutive_terminal_status_polls = 0 + self._terminal_status_seen_at = None + self._terminal_status_value = None return status return self._last_status_value or "" @@ -529,6 +713,7 @@ async def _poll_status(self, http: RustCoreHttpClient) -> dict[str, Any] | None: if isinstance(data, dict): message = StreamMessage.from_status(self.job_id, data) self._dispatch([message]) + self._update_backfill_from_handlers() return data # If all paths failed, log the error for debugging @@ -554,42 +739,39 @@ async def _poll_events(self, http: RustCoreHttpClient) -> list[StreamMessage]: try: data = await http.get(path, params=params) # Debug: Always log what we got from API - print( - f"[DEBUG] Polling {path} with since_seq={since}, limit={limit}", - file=sys.stderr, - ) - print( - f"[DEBUG] Got response from {path}, type={type(data).__name__}, keys={list(data.keys()) if isinstance(data, dict) else 'not dict'}", - file=sys.stderr, + logger.debug("Polling %s with since_seq=%s, limit=%s", path, since, limit) + logger.debug( + "Got response from %s, type=%s, keys=%s", + path, + type(data).__name__, + list(data.keys()) if isinstance(data, dict) else "not dict", ) if isinstance(data, dict): # Check for next_seq to see if we should update our tracking if "next_seq" in data: - print( - f"[DEBUG] Response has next_seq={data.get('next_seq')}, current since={since}", - file=sys.stderr, + logger.debug( + "Response has next_seq=%s, current since=%s", + data.get("next_seq"), + since, ) # Show what keys are in the response for key in data: val = data[key] if isinstance(val, list): - print( - f"[DEBUG] Response[{key}] is list with {len(val)} items", - file=sys.stderr, - ) + logger.debug("Response[%s] is list with %d items", key, len(val)) if len(val) > 0: - print( - f"[DEBUG] First item in {key}: {list(val[0].keys()) if isinstance(val[0], dict) else type(val[0])}", - file=sys.stderr, + logger.debug( + "First item in %s: %s", + key, + list(val[0].keys()) if isinstance(val[0], dict) else type(val[0]), ) elif isinstance(val, dict): - print( - f"[DEBUG] Response[{key}] is dict with keys: {list(val.keys())[:5]}", - file=sys.stderr, + logger.debug( + "Response[%s] is dict with keys: %s", key, list(val.keys())[:5] ) except Exception as e: error_str = str(e) - print(f"[DEBUG] Error polling {path}: {e}", file=sys.stderr) + logger.debug("Error polling %s: %s", path, e) # Fail fast if we get 404 on GraphGen and fallback endpoints (indicates job ID mapping issue) if ( "404" in error_str @@ -609,10 +791,7 @@ async def _poll_events(self, http: RustCoreHttpClient) -> list[StreamMessage]: continue raw_events = _extract_list(data, "events") # Debug: Always log what we extracted - print( - f"[DEBUG] Extracted {len(raw_events)} events from {path} using _extract_list", - file=sys.stderr, - ) + logger.debug("Extracted %d events from %s using _extract_list", len(raw_events), path) # Update last_seq using next_seq if available if isinstance(data, dict) and "next_seq" in data: next_seq = data.get("next_seq") @@ -621,16 +800,13 @@ async def _poll_events(self, http: RustCoreHttpClient) -> list[StreamMessage]: next_seq_int = int(next_seq) if next_seq_int > since: self._last_seq_by_stream[path] = next_seq_int - print( - f"[DEBUG] Updated last_seq for {path} to {next_seq_int}", - file=sys.stderr, - ) + logger.debug("Updated last_seq for %s to %d", path, next_seq_int) except (TypeError, ValueError): pass if raw_events and len(raw_events) > 0: # Log first event type for debugging first_event_type = raw_events[0].get("type", "unknown") - print(f"[DEBUG] First event type: {first_event_type}", file=sys.stderr) + logger.debug("First event type: %s", first_event_type) for event in raw_events: seq_raw = event.get("seq") try: @@ -650,12 +826,16 @@ async def _poll_events(self, http: RustCoreHttpClient) -> list[StreamMessage]: event_job_id = event.get("job_id") or self.job_id event_message = StreamMessage.from_event(event_job_id, event) event_type = str(event.get("type") or "").lower() - if event_type in TERMINAL_EVENT_SUCCESS: + if is_terminal_success_event(event_type): self._terminal_seen = True self._terminal_event_status = "succeeded" - elif event_type in TERMINAL_EVENT_FAILURE: + if self.debug: + print(f"[STREAM DEBUG] POLL TERMINAL SUCCESS: {event_type}") + elif is_terminal_failure_event(event_type): self._terminal_seen = True self._terminal_event_status = "failed" + if self.debug: + print(f"[STREAM DEBUG] POLL TERMINAL FAILURE: {event_type}") messages.append(event_message) total += 1 if self.config.max_events_per_poll and total >= self.config.max_events_per_poll: @@ -711,30 +891,81 @@ async def _poll_timeline(self, http: RustCoreHttpClient) -> list[StreamMessage]: self._terminal_seen = True if phase in {"failed", "cancelled", "canceled"}: self._terminal_event_status = "failed" + if self.debug: + print(f"[STREAM DEBUG] TIMELINE TERMINAL FAILURE: phase={phase}") elif phase: self._terminal_event_status = "succeeded" + if self.debug: + print(f"[STREAM DEBUG] TIMELINE TERMINAL SUCCESS: phase={phase}") messages.append(StreamMessage.from_timeline(timeline_job_id, entry)) return messages def _dispatch(self, messages: Iterable[StreamMessage]) -> None: message_list = list(messages) for message in message_list: - if self.config.deduplicate and message.key in self._seen_messages: + if message.stream_type == StreamType.EVENTS and message.data: + event_type = str(message.data.get("type") or "").lower() + if ( + event_type.startswith("learning.policy.gepa.") + and message.data.get("run_id") is None + and any( + token in event_type + for token in ( + "candidate.evaluated", + "candidate_scored", + "proposal.scored", + "generation.started", + "generation.completed", + ) + ) + ): + if self.debug: + print(f"[STREAM DEBUG] filtered GEPA event without run_id: {event_type}") + continue + dedupe_keys = [message.key] + if message.stream_type == StreamType.EVENTS and message.data: + data = message.data.get("data") + if isinstance(data, dict) and data.get("source") == "status_check": + continue + if ( + self.config.deduplicate + and self.config.dedupe_events + and message.stream_type == StreamType.EVENTS + and message.data + ): + dedupe_keys.extend(_event_dedupe_keys(message.data)) + fingerprint = _event_dedupe_fingerprint(message.data) + if fingerprint: + fp_key = f"event:fp:{fingerprint}" + if message.seq is None: + dedupe_keys = [fp_key] + else: + dedupe_keys.append(fp_key) + if self.config.deduplicate and any(key in self._seen_messages for key in dedupe_keys): continue if self.config.sample_rate < 1.0 and random.random() > self.config.sample_rate: continue if self.config.deduplicate: - self._seen_messages.add(message.key) + self._seen_messages.update(dedupe_keys) + + # Debug: print all events + if self.debug and message.stream_type == StreamType.EVENTS and message.data: + event_type = str(message.data.get("type", "")) + print(f"[STREAM DEBUG] event: {event_type}") # Check for terminal events in dispatch (belt-and-suspenders) if message.stream_type == StreamType.EVENTS and message.data: event_type = str(message.data.get("type", "")).lower() - if event_type in TERMINAL_EVENT_SUCCESS: + if is_terminal_success_event(event_type): self._terminal_seen = True self._terminal_event_status = "succeeded" - elif event_type in TERMINAL_EVENT_FAILURE: + if self.debug: + print(f"[STREAM DEBUG] *** TERMINAL SUCCESS: {event_type} ***") + elif is_terminal_failure_event(event_type): self._terminal_seen = True self._terminal_event_status = "failed" + if self.debug: + print(f"[STREAM DEBUG] *** TERMINAL FAILURE: {event_type} ***") for handler in self.handlers: try: @@ -743,6 +974,27 @@ def _dispatch(self, messages: Iterable[StreamMessage]) -> None: except Exception: pass + def _update_backfill_from_handlers(self) -> None: + if self._force_event_backfill: + return + for handler in self.handlers: + wants = getattr(handler, "wants_event_backfill", None) + if callable(wants) and wants(): + self._force_event_backfill = True + break + if self._terminal_seen: + return + for handler in self.handlers: + hint = getattr(handler, "terminal_hint_ready", None) + if callable(hint) and hint(grace_seconds=TERMINAL_HANDLER_GRACE_SECONDS): + self._terminal_seen = True + self._terminal_event_status = "succeeded" + if self.debug: + print( + f"[STREAM DEBUG] TERMINAL HINT: handler signaled completion after {TERMINAL_HANDLER_GRACE_SECONDS:.0f}s grace" + ) + break + def _metric_cursor(point: dict[str, Any]) -> tuple[int | None, str]: raw_step = point.get("step") @@ -767,6 +1019,82 @@ def _metric_cursor(point: dict[str, Any]) -> tuple[int | None, str]: return step_value, fingerprint +def _event_dedupe_keys(event_data: dict[str, Any]) -> list[str]: + event_type = str(event_data.get("type") or "").lower() + data = event_data.get("data") or {} + if not isinstance(data, dict): + data = {} + + keys: list[str] = [] + run_id = event_data.get("run_id") or data.get("run_id") + + def _with_run(key: str) -> str: + return f"run:{run_id}:{key}" if run_id else key + + event_id = data.get("event_id") or event_data.get("event_id") + if event_id: + keys.append(_with_run(f"event_id:{event_id}")) + + if event_type.startswith("learning.policy.gepa."): + candidate_id = data.get("candidate_id") or data.get("version_id") + if not candidate_id and isinstance(data.get("program_candidate"), dict): + candidate_id = data.get("program_candidate", {}).get("candidate_id") + if candidate_id: + keys.append(_with_run(f"{event_type}:candidate:{candidate_id}")) + generation = data.get("generation") + if generation is not None and event_type.endswith((".generation.started", ".generation.completed")): + keys.append(_with_run(f"{event_type}:generation:{generation}")) + + if event_type.endswith((".job.completed", ".job.failed", ".job.cancelled", ".job.canceled")): + keys.append(_with_run(f"{event_type}:terminal")) + + return keys + + +def _event_dedupe_fingerprint(event_data: dict[str, Any]) -> str: + drop_keys = { + "id", + "job_id", + "run_id", + "seq", + "created_at", + "updated_at", + "timestamp", + "inserted_at", + "emitted_at", + } + drop_data_keys = { + "event_id", + "created_at", + "updated_at", + "timestamp", + "timestamp_ms", + "inserted_at", + "emitted_at", + "run_id", + "workflow_id", + "workflow_run_id", + "activity_id", + "attempt", + "task_queue", + } + + def scrub(value: Any) -> Any: + if isinstance(value, dict): + return {k: scrub(v) for k, v in value.items() if k not in drop_data_keys} + if isinstance(value, list): + return [scrub(item) for item in value] + return value + + cleaned = {key: value for key, value in event_data.items() if key not in drop_keys} + if "data" in cleaned: + cleaned["data"] = scrub(cleaned["data"]) + try: + return json.dumps(cleaned, sort_keys=True, default=str) + except Exception: + return repr(cleaned) + + def _extract_list(data: Any, field: str) -> list[dict[str, Any]]: results: list[dict[str, Any]] = [] seen_items: set[int] = set() diff --git a/synth_ai/core/tunnels/rust.py b/synth_ai/core/tunnels/rust.py index 73a2e1cf6..f50713dbb 100644 --- a/synth_ai/core/tunnels/rust.py +++ b/synth_ai/core/tunnels/rust.py @@ -82,7 +82,21 @@ async def wait_for_health_check( api_key: str | None = None, timeout: float = 30.0, ) -> None: - await asyncio.to_thread(synth_ai_py.wait_for_health_check, host, port, api_key, timeout) + # Use pure Python async to avoid GIL contention with uvicorn thread + import httpx + + url = f"http://{host}:{port}/health" + deadline = asyncio.get_event_loop().time() + timeout + async with httpx.AsyncClient() as client: + while asyncio.get_event_loop().time() < deadline: + try: + resp = await client.get(url, timeout=2.0) + if resp.status_code == 200: + return + except (httpx.RequestError, httpx.TimeoutException): + pass + await asyncio.sleep(0.5) + raise RuntimeError(f"health check failed: {url} not ready after {timeout}s") def is_port_available(port: int, host: str = "0.0.0.0") -> bool: diff --git a/synth_ai/core/tunnels/tunneled_api.py b/synth_ai/core/tunnels/tunneled_api.py index 0f29f79a0..474a2e2de 100644 --- a/synth_ai/core/tunnels/tunneled_api.py +++ b/synth_ai/core/tunnels/tunneled_api.py @@ -227,6 +227,10 @@ async def _create_via_rust( if backend_key is None: raise ValueError(f"Unsupported tunnel backend: {backend}") + # Quick tunnels have unreliable DNS verification - skip it + if backend == TunnelBackend.CloudflareQuickTunnel: + verify_dns = False + handle = await asyncio.to_thread( synth_ai_py.tunnel_open, backend_key, diff --git a/synth_ai/sdk/eval/__init__.py b/synth_ai/sdk/eval/__init__.py index df7492ed7..bc9953f92 100644 --- a/synth_ai/sdk/eval/__init__.py +++ b/synth_ai/sdk/eval/__init__.py @@ -29,5 +29,16 @@ """ from .job import EvalJob, EvalJobConfig, EvalResult, EvalStatus - -__all__ = ["EvalJob", "EvalJobConfig", "EvalResult", "EvalStatus"] +from .utils import ( + mean_reward_or_zero, + run_eval_slices, +) + +__all__ = [ + "EvalJob", + "EvalJobConfig", + "EvalResult", + "EvalStatus", + "mean_reward_or_zero", + "run_eval_slices", +] diff --git a/synth_ai/sdk/eval/job.py b/synth_ai/sdk/eval/job.py index 96266ac3e..9751313e2 100644 --- a/synth_ai/sdk/eval/job.py +++ b/synth_ai/sdk/eval/job.py @@ -131,6 +131,7 @@ def from_response(cls, job_id: str, data: Dict[str, Any]) -> "EvalResult": mean_reward = summary.get("mean_reward") or data.get("mean_reward") if mean_reward is None and isinstance(results_info, dict): mean_reward = results_info.get("mean_reward") + total_tokens = summary.get("total_tokens") or data.get("total_tokens") total_cost_usd = summary.get("total_cost_usd") or data.get("total_cost_usd") @@ -546,9 +547,11 @@ def poll_until_complete( self, *, timeout: float = 1200.0, - interval: float = 15.0, + interval: float = 1.0, progress: bool = False, + progress_label: Optional[str] = None, on_status: Optional[Callable[[Dict[str, Any]], None]] = None, + debug: bool = False, ) -> EvalResult: """Poll job until it reaches a terminal state, then return results. @@ -557,9 +560,10 @@ def poll_until_complete( Args: timeout: Maximum seconds to wait (default: 1200 = 20 minutes) - interval: Seconds between poll attempts (default: 15) + interval: Seconds between poll attempts (default: 1) progress: If True, print status updates during polling (useful for notebooks) on_status: Optional callback called on each status update (for custom progress handling) + debug: If True, print verbose debug information Returns: EvalResult with typed status, mean_reward, seed_results, etc. @@ -570,9 +574,9 @@ def poll_until_complete( Example: >>> result = job.poll_until_complete(progress=True) - [00:05] running | 3/10 completed - [00:10] running | 7/10 completed - [00:15] completed | mean_reward: 0.85 + 05s | [Eval] running | 3/10 completed + 10s | [Eval] running | 7/10 completed + 15s | [Eval] completed | mean_reward: 0.85 >>> result.succeeded True >>> result.mean_reward @@ -583,8 +587,26 @@ def poll_until_complete( job_id = self._job_id start_time = time.time() + printer = None + if (progress or progress_label) and on_status is None: + try: + from synth_ai.sdk.optimization.progress.handlers import EvalStatusPrinter + + label = progress_label or "Eval" + total_seeds = len(self.config.seeds) if self.config.seeds else None + printer = EvalStatusPrinter(label=label, total_seeds=total_seeds) + on_status = printer.handle_status + except Exception: + printer = None last_data: Dict[str, Any] = {} + if printer is not None: + with contextlib.suppress(Exception): + printer.log_start(total=len(self.config.seeds) if self.config.seeds else None) + + if interval > 0: + time.sleep(interval) + while True: elapsed = time.time() - start_time if elapsed >= timeout: @@ -611,7 +633,7 @@ def poll_until_complete( ) # Progress output - if progress: + if progress and printer is None: mins, secs = divmod(int(elapsed), 60) if status.is_terminal: # Get final results for mean_reward @@ -641,13 +663,102 @@ def poll_until_complete( # Check terminal state if status.is_terminal: + if debug: + print(f"[DEBUG] terminal status={status.value}", flush=True) # Fetch full results if completed if status == EvalStatus.COMPLETED: - try: - final_results = self.get_results() - return EvalResult.from_response(job_id, final_results) - except Exception: - pass + final_results = None + last_error: Exception | None = None + # Results may lag behind status; retry briefly before falling back. + result_deadline = time.time() + min(10.0, max(0.0, timeout - elapsed)) + while True: + try: + final_results = self.get_results() + if debug: + print(f"[DEBUG] get_results returned: results type={type(final_results.get('results', 'MISSING'))}, len={len(final_results.get('results', [])) if isinstance(final_results.get('results'), list) else 'not-a-list'}", flush=True) + break + except Exception as exc: + last_error = exc + if debug: + print(f"[DEBUG] get_results exception: {exc}", flush=True) + if time.time() >= result_deadline: + break + time.sleep(1.0) + + if final_results is not None: + # Check if get_results() returned empty data - if so, re-fetch status + # to get results from job metadata (populated from events on completion) + results_list = final_results.get("results", []) + has_results = isinstance(results_list, list) and len(results_list) > 0 + if not has_results: + # Re-fetch status to get updated metadata with results + try: + fresh_status = self.get_status() + status_results = fresh_status.get("results", {}) + if debug: + print(f"[DEBUG] get_results empty, fresh_status.results type={type(status_results).__name__}, keys={list(status_results.keys()) if isinstance(status_results, dict) else 'N/A'}, mean_reward={status_results.get('mean_reward') if isinstance(status_results, dict) else 'N/A'}", flush=True) + if isinstance(status_results, dict) and status_results.get("mean_reward") is not None: + final_results = fresh_status + except Exception as e: + if debug: + print(f"[DEBUG] fresh_status fetch failed: {e}", flush=True) + + if printer is not None: + mean_reward = ( + final_results.get("summary", {}).get("mean_reward") + or final_results.get("results", {}).get("mean_reward") + ) + results_info = final_results.get("results", {}) + completed = 0 + total = len(self.config.seeds) + if isinstance(results_info, dict): + completed = results_info.get("completed", completed) + total = results_info.get("total", total) + seed_results = results_info.get("seed_results") + if isinstance(seed_results, list): + completed = len(seed_results) + if results_info.get("total") is None: + total = len(seed_results) + seed_results = final_results.get("seed_results") + if isinstance(seed_results, list): + completed = len(seed_results) + if total is None: + total = len(seed_results) + if (completed == 0) and total: + completed = total + printer.log_terminal( + status="completed", + mean_reward=mean_reward, + ) + eval_result = EvalResult.from_response(job_id, final_results) + if debug: + print(f"[DEBUG] Returning EvalResult: mean_reward={eval_result.mean_reward}, status={eval_result.status.value}", flush=True) + return eval_result + + # Fallback: use status payload if results endpoint isn't ready. + if printer is not None: + results_info = status_data.get("results", {}) + mean_reward = None + completed = None + total = None + if isinstance(results_info, dict): + mean_reward = results_info.get("mean_reward") + completed = results_info.get("completed") + total = results_info.get("total") + seed_results = results_info.get("seed_results") + if isinstance(seed_results, list): + completed = len(seed_results) + if total is None: + total = len(seed_results) + printer.log_terminal( + status="completed", + mean_reward=mean_reward, + ) + if last_error is not None and progress: + print(f"[poll] results fetch failed: {last_error}") + return EvalResult.from_response(job_id, last_data) + if printer is not None: + printer.log_terminal(status=status.value, mean_reward=None) return EvalResult.from_response(job_id, last_data) except Exception as exc: @@ -717,7 +828,9 @@ def stream_until_complete( # Create streamer with eval endpoints # Note: base_url should NOT include /api prefix - JobStreamer adds it - base_url = self._base_url().replace("/api", "").rstrip("/") + base_url = self._base_url().rstrip("/") + if base_url.endswith("/api"): + base_url = base_url[: -len("/api")] streamer = JobStreamer( base_url=base_url, api_key=self.config.api_key, @@ -767,6 +880,7 @@ async def stream_sse_until_complete_async( timeout: float = 1200.0, on_event: Optional[Callable[[Dict[str, Any]], None]] = None, progress: bool = True, + debug: bool = False, ) -> EvalResult: """Stream job events via SSE until completion (async version). @@ -809,7 +923,7 @@ async def stream_sse_until_complete_async( "job.failed", } - if progress: + if debug: print(f"[DEBUG] SSE stream connecting to {sse_url}") try: @@ -870,6 +984,7 @@ def stream_sse_until_complete( timeout: float = 1200.0, on_event: Optional[Callable[[Dict[str, Any]], None]] = None, progress: bool = True, + debug: bool = False, ) -> EvalResult: """Stream job events via SSE until completion (sync wrapper). @@ -901,6 +1016,7 @@ def stream_sse_until_complete( timeout=timeout, on_event=on_event, progress=progress, + debug=debug, ), ) return future.result() @@ -910,6 +1026,7 @@ def stream_sse_until_complete( timeout=timeout, on_event=on_event, progress=progress, + debug=debug, ) ) diff --git a/synth_ai/sdk/eval/utils.py b/synth_ai/sdk/eval/utils.py new file mode 100644 index 000000000..5785ac4c3 --- /dev/null +++ b/synth_ai/sdk/eval/utils.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import hashlib +from typing import Iterable, Sequence + +from .job import EvalJob, EvalJobConfig, EvalResult + + +def mean_reward_or_zero(results: Iterable[EvalResult | None]) -> float: + values = [] + for result in results: + value = result.mean_reward if result and result.mean_reward is not None else 0.0 + values.append(value) + return sum(values) / max(len(values), 1) + + +def run_eval_slices( + label: str, + instruction: str, + seed_slices: Sequence[Sequence[int]], + *, + task_app_url: str, + base_config: dict, + policy_config: dict, + timeout: float = 120.0, + interval: float = 1.0, + parallel: bool = False, + max_workers: int | None = None, + progress: bool = True, + status_interval: float = 15.0, + debug: bool = False, +) -> list[EvalResult]: + from synth_ai.sdk.optimization.progress.handlers import EvalStatusPrinter + + def _instruction_signature(text: str) -> str: + digest = hashlib.sha256(text.encode("utf-8")).hexdigest() + return digest[:12] + + def _instruction_preview(text: str, limit: int = 140) -> str: + normalized = " ".join(str(text).split()) + if len(normalized) <= limit: + return normalized + return f"{normalized[:limit]}..." + + total_seeds = sum(len(s) for s in seed_slices) + printer: EvalStatusPrinter | None = None + if progress: + printer = EvalStatusPrinter(label=label, total_seeds=total_seeds, debug=debug) + printer.log_start(total=total_seeds) + if debug: + preview = _instruction_preview(instruction) + signature = _instruction_signature(instruction) + model = policy_config.get("model") + provider = policy_config.get("provider") + env_name = base_config.get("env_name") + printer.log_debug_config( + "Eval config: " + f"label={label} " + f"instruction_sig={signature} " + f"model={model} " + f"provider={provider} " + f"env={env_name} " + f"seeds={total_seeds} " + f"task_app_url={task_app_url} " + f"instruction_preview={preview}" + ) + + jobs: list[tuple[int, EvalJob]] = [] + for index, seeds in enumerate(seed_slices, start=1): + eval_job = EvalJob( + EvalJobConfig( + task_app_url=task_app_url, + policy_config={ + **policy_config, + "instruction": instruction, + }, + seeds=list(seeds), + **base_config, + ) + ) + eval_job.submit() + if debug and printer: + signature = _instruction_signature(instruction) + printer.log_debug_config( + "Eval job submitted: " + f"label={label} slice={index}/{len(seed_slices)} " + f"job_id={eval_job.job_id} instruction_sig={signature}" + ) + jobs.append((index, eval_job)) + + def poll(entry: tuple[int, EvalJob]) -> tuple[int, EvalResult]: + index, job = entry + result = job.poll_until_complete( + timeout=timeout, + interval=interval, + progress_label=None, # suppress per-slice progress + on_status=printer.handle_status if printer else None, + ) + return index, result + + if not parallel: + results: list[EvalResult] = [] + for entry in jobs: + _, result = poll(entry) + results.append(result) + else: + from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED + + results_by_index: dict[int, EvalResult] = {} + worker_count = max_workers or len(jobs) + with ThreadPoolExecutor(max_workers=worker_count) as executor: + futures = {executor.submit(poll, entry) for entry in jobs} + while futures: + done, futures = wait(futures, timeout=status_interval, return_when=FIRST_COMPLETED) + for future in done: + index, result = future.result() + results_by_index[index] = result + if printer and futures: + printer.tick(min_idle_seconds=status_interval) + results = [results_by_index[index] for index in sorted(results_by_index)] + + if printer: + mean_reward = mean_reward_or_zero(results) + printer.log_terminal(status="completed", mean_reward=mean_reward) + + return results diff --git a/synth_ai/sdk/localapi/_impl/server.py b/synth_ai/sdk/localapi/_impl/server.py index 88afaf912..4a129a486 100644 --- a/synth_ai/sdk/localapi/_impl/server.py +++ b/synth_ai/sdk/localapi/_impl/server.py @@ -445,19 +445,9 @@ async def root() -> Mapping[str, Any]: async def root_head() -> Mapping[str, Any]: return to_jsonable({"status": "ok"}) - @app.get("/health", dependencies=[Depends(auth_dependency)]) - async def health(request: Request) -> Mapping[str, Any]: - # If we got here, auth_dependency already verified the key exactly matches - expected = normalize_environment_api_key() - return to_jsonable( - { - "healthy": True, - "auth": { - "required": True, - "expected_prefix": (expected[:6] + "...") if expected else "", - }, - } - ) + @app.get("/health") + async def health() -> Mapping[str, Any]: + return to_jsonable({"status": "ok", "healthy": True}) @app.post("/done", dependencies=[Depends(auth_dependency)]) async def done() -> Mapping[str, Any]: diff --git a/synth_ai/sdk/optimization/internal/prompt_learning.py b/synth_ai/sdk/optimization/internal/prompt_learning.py index 905cd85d6..dfc3be012 100644 --- a/synth_ai/sdk/optimization/internal/prompt_learning.py +++ b/synth_ai/sdk/optimization/internal/prompt_learning.py @@ -609,8 +609,17 @@ async def stream_until_complete_async( interval: float = 15.0, handlers: Optional[Sequence[Any]] = None, on_event: Optional[Callable[[Dict[str, Any]], None]] = None, + debug: bool = False, ) -> PromptLearningResult: - """Stream job events until completion using SSE (async).""" + """Stream job events until completion using SSE (async). + + Args: + timeout: Maximum time to wait for job completion. + interval: Polling interval when SSE is unavailable. + handlers: Stream handlers for processing events. + on_event: Callback for final status event. + debug: If True, print all events and terminal detection info. + """ import contextlib if not self._job_id: @@ -634,6 +643,7 @@ async def stream_until_complete_async( handlers=handlers, interval=interval, timeout=timeout, + debug=debug, ) final_status = await streamer.stream_until_terminal() @@ -642,18 +652,28 @@ async def stream_until_complete_async( with contextlib.suppress(Exception): on_event(final_status) + # If handlers captured best prompt/score (e.g., GEPA), merge them in. + if isinstance(final_status, dict) and handlers: + for handler in handlers: + best_prompt = getattr(handler, "best_prompt", None) + best_score = getattr(handler, "best_score", None) + if best_prompt and not final_status.get("best_prompt"): + final_status["best_prompt"] = best_prompt + if best_score is not None and not final_status.get("best_score"): + final_status["best_score"] = best_score + # SSE final_status may not have full results - fetch them if job succeeded status_str = str(final_status.get("status", "")).lower() if status_str in ("succeeded", "completed", "success"): with contextlib.suppress(Exception): full_results = await self.get_results_async() - # Merge full results into final_status - final_status.update( - { - "best_prompt": full_results.get("best_prompt"), - "best_score": full_results.get("best_score"), - } - ) + # Merge full results into final_status without clobbering handler-captured values. + best_prompt = full_results.get("best_prompt") + if best_prompt and not final_status.get("best_prompt"): + final_status["best_prompt"] = best_prompt + best_score = full_results.get("best_score") + if best_score is not None and final_status.get("best_score") is None: + final_status["best_score"] = best_score return PromptLearningResult.from_response(self._job_id, final_status) @@ -664,6 +684,7 @@ def stream_until_complete( interval: float = 15.0, handlers: Optional[Sequence[Any]] = None, on_event: Optional[Callable[[Dict[str, Any]], None]] = None, + debug: bool = False, ) -> PromptLearningResult: """Stream job events until completion using SSE. @@ -677,6 +698,7 @@ def stream_until_complete( interval=interval, handlers=handlers, on_event=on_event, + debug=debug, ), label="stream_until_complete() (use stream_until_complete_async in async contexts)", ) diff --git a/synth_ai/sdk/optimization/internal/prompt_learning_streaming.py b/synth_ai/sdk/optimization/internal/prompt_learning_streaming.py index ca721e5d5..c854019be 100644 --- a/synth_ai/sdk/optimization/internal/prompt_learning_streaming.py +++ b/synth_ai/sdk/optimization/internal/prompt_learning_streaming.py @@ -21,6 +21,7 @@ def build_prompt_learning_streamer( handlers: Sequence[Any] | None, interval: float, timeout: float, + debug: bool = False, ) -> JobStreamer: config = StreamConfig( enabled_streams={StreamType.STATUS, StreamType.EVENTS, StreamType.METRICS}, @@ -40,6 +41,7 @@ def build_prompt_learning_streamer( handlers=list(handlers), interval_seconds=interval, timeout_seconds=timeout, + debug=debug, ) diff --git a/synth_ai/sdk/optimization/internal/validators.py b/synth_ai/sdk/optimization/internal/validators.py index 5b796bd02..c9318f907 100644 --- a/synth_ai/sdk/optimization/internal/validators.py +++ b/synth_ai/sdk/optimization/internal/validators.py @@ -19,6 +19,20 @@ raise RuntimeError("synth_ai_py is required for optimization.validators.") from exc +def _sanitize_for_rust(obj: Any) -> Any: + """Convert numpy types to native Python types for Rust serialization.""" + if isinstance(obj, dict): + return {k: _sanitize_for_rust(v) for k, v in obj.items()} + if isinstance(obj, list): + return [_sanitize_for_rust(v) for v in obj] + if isinstance(obj, tuple): + return tuple(_sanitize_for_rust(v) for v in obj) + # Handle numpy integer types + if hasattr(obj, "item") and hasattr(obj, "dtype"): + return obj.item() + return obj + + class ConfigValidationError(Exception): """Raised when a training config is invalid.""" @@ -43,7 +57,8 @@ def validate_prompt_learning_config(config_data: dict[str, Any], config_path: Pa except Exception: pass - errors = synth_ai_py.validate_prompt_learning_config_strict(config_data) + sanitized = _sanitize_for_rust(config_data) + errors = synth_ai_py.validate_prompt_learning_config_strict(sanitized) if errors: _raise_validation_errors(list(errors), config_path) diff --git a/synth_ai/sdk/optimization/progress/__init__.py b/synth_ai/sdk/optimization/progress/__init__.py index d25c64686..4329afa1a 100644 --- a/synth_ai/sdk/optimization/progress/__init__.py +++ b/synth_ai/sdk/optimization/progress/__init__.py @@ -14,6 +14,11 @@ save_summary_json, save_summary_txt, ) +from synth_ai.sdk.optimization.progress.handlers import ( # noqa: F401 + EvalStreamProgressHandler, + GEPAStreamProgressHandler, +) +from synth_ai.sdk.optimization.progress.time import ProgressClock, ProgressPrinter # noqa: F401 __all__ = [ "DisplayMode", @@ -26,4 +31,8 @@ "save_seed_analysis", "save_summary_json", "save_summary_txt", + "EvalStreamProgressHandler", + "GEPAStreamProgressHandler", + "ProgressClock", + "ProgressPrinter", ] diff --git a/synth_ai/sdk/optimization/progress/handlers.py b/synth_ai/sdk/optimization/progress/handlers.py new file mode 100644 index 000000000..06f1c8d99 --- /dev/null +++ b/synth_ai/sdk/optimization/progress/handlers.py @@ -0,0 +1,777 @@ +"""Stream handlers for GEPA optimization and eval progress.""" + + +import json +import textwrap +import time +from typing import Any, Callable + +from synth_ai.core.streaming.handlers import StreamHandler +from synth_ai.core.streaming.types import StreamType +from synth_ai.sdk.optimization.progress.time import ProgressClock, ProgressPrinter + + +def _coerce_int(value: Any) -> int | None: + if value is None or isinstance(value, bool): + return None + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return None + return None + + +def _coerce_float(value: Any) -> float | None: + if value is None or isinstance(value, bool): + return None + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str): + try: + return float(value) + except ValueError: + return None + return None + + +def _extract_reward(event_data: dict[str, Any]) -> float | None: + if not isinstance(event_data, dict): + return None + reward = _coerce_float(event_data.get("reward")) + score_obj = event_data.get("score") + if reward is None and isinstance(score_obj, dict): + reward = _coerce_float(score_obj.get("reward")) + if reward is None: + reward = _coerce_float(event_data.get("accuracy")) + return reward + + +class IdleStatusTicker: + """Log status when output has been idle for a configured interval.""" + + def __init__(self) -> None: + self._last_status_time: float | None = None + + def maybe_log( + self, + *, + status: str, + now: float, + last_output_time: float | None, + min_idle_seconds: float, + log_fn: Callable[[str, float], None], + ) -> None: + if last_output_time is None: + return + idle_seconds = now - last_output_time + if idle_seconds < min_idle_seconds: + return + if self._last_status_time is None or self._last_status_time < last_output_time: + next_due = last_output_time + min_idle_seconds + else: + next_due = self._last_status_time + min_idle_seconds + if now < next_due: + return + log_fn(f"Status: {status}", now) + self._last_status_time = now + + +class GEPAStreamProgressHandler(StreamHandler): + """GEPA progress handler with compact, human-friendly formatting.""" + + SKIP_EVENT_SUBSTRINGS = ("rollout.concurrency", "billing") + + def __init__( + self, + total_generations: int | None, + *, + initial_population_size: int | None = None, + children_per_generation: int | None = None, + job_id: str | None = None, + clock: ProgressClock | None = None, + debug: bool = False, + ) -> None: + self.total_generations = total_generations + self.initial_population_size = initial_population_size + self.children_per_generation = children_per_generation + self.job_id = job_id # Filter events to this job only + self.debug = debug + self._printer = ProgressPrinter(clock=clock) + self._phase: str | None = None + self._phase_generation: int | None = None + self._phase_start_time: float | None = None + self._phase_candidates = 0 + self._phase_best_score = 0.0 + self._overall_best = 0.0 + self._last_best_prompt_sig: str | None = None + self._last_best_candidate_sig: str | None = None + self._seen_rollout_concurrency = False + self._past_initial_phase = False + self._generation_offset: int | None = None + self._last_output_time: float | None = None + self._last_status_time: float | None = None + self._status_ticker = IdleStatusTicker() + self._final_generation_complete_seen = False + self._final_generation_complete_seen_at: float | None = None + self._seen_candidates_by_gen: dict[int | None, set[str]] = {} + self._seen_event_ids: set[str] = set() + self._current_generation: int | None = None + self._last_generation_completed: int | None = None + self._last_run_id: str | None = None + self.best_prompt: str | None = None + self.best_score: float | None = None + + def _log( + self, + message: str, + *, + now: float | None = None, + separator: str = "|", + update_idle_timer: bool = True, + ) -> None: + timestamp = self._printer.now() if now is None else now + self._printer.log(message, now=timestamp, separator=separator) + if update_idle_timer: + self._last_output_time = timestamp + + def _log_continuation(self, message: str, *, width: int = 100) -> None: + """Print continuation lines aligned with text after '| ' in clock prefix.""" + # Align with text after "| ": " 49s | " or " 1m 05s | " = 10 chars + margin = " " * 10 + wrapped = textwrap.fill( + message, + width=width, + initial_indent=margin, + subsequent_indent=margin, + break_on_hyphens=False, + ) + print(wrapped) + + def should_handle(self, message) -> bool: + # Filter to only handle events from the target job + if self.job_id is not None and hasattr(message, "job_id"): + return message.job_id == self.job_id + return True + + def _display_generation(self, backend_generation: Any) -> int | None: + generation = _coerce_int(backend_generation) + if generation is None: + return None + if generation == 0: + self._generation_offset = 1 + elif self._generation_offset is None: + if self.total_generations is not None and generation > self.total_generations: + self._generation_offset = 1 + else: + self._generation_offset = 0 + return generation + (self._generation_offset or 0) + + def _dedupe_event(self, event_type: str, event_data: dict[str, Any], run_id: str | None) -> bool: + if not event_type.startswith("learning.policy.gepa."): + return False + + # Run restarts: reset state when run_id changes + if run_id and self._last_run_id and run_id != self._last_run_id: + self._seen_candidates_by_gen.clear() + self._seen_event_ids.clear() + self._current_generation = None + self._last_generation_completed = None + if run_id: + self._last_run_id = run_id + + generation = _coerce_int(event_data.get("generation")) + + if "candidate" in event_type: + candidate_id = ( + event_data.get("candidate_id") + or event_data.get("version_id") + or event_data.get("id") + ) + if not candidate_id and isinstance(event_data.get("program_candidate"), dict): + candidate_id = event_data.get("program_candidate", {}).get("candidate_id") + if candidate_id: + seen = self._seen_candidates_by_gen.setdefault(generation, set()) + if str(candidate_id) in seen: + return True + seen.add(str(candidate_id)) + + # Drop out-of-order generation repeats (e.g., replays after completion) + if event_type.endswith("generation.started") and generation is not None: + if self._last_generation_completed is not None and generation <= self._last_generation_completed: + return True + if event_type.endswith("generation.completed") and generation is not None: + if self._last_generation_completed is not None and generation <= self._last_generation_completed: + return True + + return False + + def _phase_label(self) -> str: + if self._phase == "initial": + return "Initial population" + if self._phase_generation is None: + return "Generation" + if self.total_generations: + return f"Generation {self._phase_generation}/{self.total_generations}" + return f"Generation {self._phase_generation}" + + def _print_phase_complete(self, now: float) -> None: + if self._phase is None: + return + self._log( + f"{self._phase_label()} complete. Best reward: {self._phase_best_score:.2f}", + now=now, + separator="└─", + ) + + def _start_phase(self, phase: str, generation: int | None, now: float) -> None: + if self._phase == phase and self._phase_generation == generation: + return + if self._phase is not None: + self._print_phase_complete(now) + self._phase = phase + self._phase_generation = generation + self._phase_start_time = now + self._phase_candidates = 0 + self._phase_best_score = 0.0 + self._current_generation = generation + if phase == "initial": + self._log("Initial population started", now=now, separator="┌─", update_idle_timer=False) + else: + self._log(f"{self._phase_label()} started", now=now, separator="┌─", update_idle_timer=False) + + def _format_best_prompt(self, prompt: Any) -> str: + if prompt is None: + return "" + if isinstance(prompt, str): + # Handle JSON strings that might be wrapped + text = prompt.strip() + # Strip "json" prefix if present (e.g., "json { ... }") + if text.lower().startswith("json"): + text = text[4:].lstrip() + if text.startswith("{") and '"instruction"' in text: + try: + parsed = json.loads(text) + return str(parsed.get("instruction", text)).strip() + except Exception: + pass + return text + if isinstance(prompt, dict): + # Direct instruction field (most common) + instruction = prompt.get("instruction") + if instruction: + return str(instruction).strip() + # Nested data.instruction + data = prompt.get("data") if isinstance(prompt.get("data"), dict) else prompt + if isinstance(data, dict) and data.get("instruction"): + return str(data.get("instruction")).strip() + # Messages format + messages = data.get("messages") if isinstance(data, dict) else None + if isinstance(messages, list): + for msg in messages: + if isinstance(msg, dict) and msg.get("role") == "system": + text = msg.get("pattern") or msg.get("content") + if text: + return str(text).strip() + try: + return json.dumps(prompt, default=str) + except Exception: + return str(prompt) + + def _format_best_candidate(self, candidate: Any) -> str: + if candidate is None: + return "" + if isinstance(candidate, dict): + candidate_id = ( + candidate.get("candidate_id") + or candidate.get("version_id") + or candidate.get("key") + or candidate.get("id") + ) + reward = candidate.get("reward") + score_obj = candidate.get("score") if isinstance(candidate.get("score"), dict) else {} + if reward is None and score_obj: + reward = score_obj.get("reward") + prompt_text = candidate.get("prompt_text") or candidate.get("best_prompt_text") + parts = [] + if candidate_id: + parts.append(f"id={candidate_id}") + if reward is not None: + parts.append( + f"reward={reward:.3f}" + if isinstance(reward, (int, float)) + else f"reward={reward}" + ) + if prompt_text: + parts.append(f"prompt={str(prompt_text).strip()}") + return " | ".join(parts) if parts else json.dumps(candidate, default=str) + return str(candidate) + + def _extract_instruction_text(self, event_data: dict[str, Any]) -> str | None: + instruction = ( + event_data.get("instruction") + or event_data.get("best_prompt") + or event_data.get("prompt_text") + ) + if instruction is None: + return None + text = self._format_best_prompt(instruction) + if not text: + return None + return " ".join(str(text).split()) + + def _maybe_print_best(self, now: float, event_data: dict[str, Any]) -> None: + # Track best prompt internally without printing (shown via candidate lines) + best_prompt = event_data.get("best_prompt") + if best_prompt is not None: + sig = json.dumps(best_prompt, default=str, sort_keys=True) + if sig != self._last_best_prompt_sig: + self._last_best_prompt_sig = sig + text = self._format_best_prompt(best_prompt) + if text: + self.best_prompt = text + best_score = _coerce_float(event_data.get("best_score")) + if best_score is not None: + self.best_score = max(self.best_score or 0.0, best_score) + + # Track best candidate internally without printing (shown via candidate lines) + best_candidate = event_data.get("best_candidate") + if best_candidate is not None: + sig = json.dumps(best_candidate, default=str, sort_keys=True) + if sig != self._last_best_candidate_sig: + self._last_best_candidate_sig = sig + + def handle(self, message) -> None: + if not self.should_handle(message): + return + now = self._printer.now() + + data = message.data or {} + if message.stream_type == StreamType.STATUS: + status = str(data.get("status") or data.get("state") or "").lower() + if status: + self._status_ticker.maybe_log( + status=status, + now=now, + last_output_time=self._last_output_time, + min_idle_seconds=15.0, + log_fn=lambda msg, ts: self._log(msg, now=ts), + ) + return + event_type = str(data.get("type", "")) + + if "rollout.concurrency" in event_type and not self._seen_rollout_concurrency: + self._seen_rollout_concurrency = True + if self._phase is None: + self._start_phase("initial", None, now) + return + + if any(skip in event_type for skip in self.SKIP_EVENT_SUBSTRINGS): + return + + event_data = data.get("data", {}) if isinstance(data.get("data"), dict) else {} + run_id = data.get("run_id") + if self.debug: + try: + debug_payload = { + "stream_type": str(getattr(message, "stream_type", "")), + "job_id": getattr(message, "job_id", None), + "type": event_type, + "run_id": run_id, + "data": data, + } + self._log( + f"[DEBUG] GEPA message", + now=now, + update_idle_timer=False, + ) + self._log_continuation(json.dumps(debug_payload, default=str, sort_keys=True)) + except Exception as exc: + self._log( + f"[DEBUG] GEPA message (failed to serialize): {type(exc).__name__}: {exc}", + now=now, + update_idle_timer=False, + ) + if self._dedupe_event(event_type, event_data, run_id): + return + self._maybe_print_best(now, event_data) + + if "generation.started" in event_type or "generation.start" in event_type: + if run_id is None: + return + generation = self._display_generation(event_data.get("generation")) + if generation is not None: + self._past_initial_phase = True + self._start_phase("gen", generation, now) + return + + if "generation.completed" in event_type or "generation.complete" in event_type: + if run_id is None: + return + generation = self._display_generation(event_data.get("generation")) + if generation is not None: + self._past_initial_phase = True + self._start_phase("gen", generation, now) + candidates_eval = _coerce_int(event_data.get("candidates_evaluated")) + if candidates_eval is not None: + self._phase_candidates = candidates_eval + self._print_phase_complete(now) + self._phase = None + self._phase_generation = None + self._phase_start_time = None + self._last_generation_completed = generation + if self.total_generations and generation >= self.total_generations: + self._final_generation_complete_seen = True + self._final_generation_complete_seen_at = time.time() + return + + if "candidate.evaluated" in event_type or "candidate_scored" in event_type or "proposal.scored" in event_type: + if run_id is None: + return + generation = self._display_generation(event_data.get("generation")) + mutation_type = event_data.get("mutation_type") + if generation is None: + if self._phase is None: + self._start_phase("initial", None, now) + # If the backend doesn't attach generation info, infer when we've left initial population. + if ( + self._phase == "initial" + and self.initial_population_size + and self._phase_candidates >= self.initial_population_size + and self.children_per_generation + ) or (self._phase == "initial" and mutation_type not in (None, "initial")): + self._past_initial_phase = True + next_gen = (self._last_generation_completed or 0) + 1 + self._start_phase("gen", next_gen, now) + else: + self._past_initial_phase = True + self._start_phase("gen", generation, now) + + if self._phase is None: + self._start_phase("initial", None, now) + + self._phase_candidates += 1 + score = _extract_reward(event_data) or 0.0 + best_score = _coerce_float(event_data.get("best_score") or event_data.get("best_reward")) + prev_best = self._overall_best + if score > self._phase_best_score: + self._phase_best_score = score + is_new_best = False + if best_score is not None: + is_new_best = best_score > prev_best + self._overall_best = max(self._overall_best, best_score) + else: + is_new_best = score > prev_best + self._overall_best = max(self._overall_best, score) + self.best_score = max(self.best_score or 0.0, self._overall_best) + candidate_text = "" + if is_new_best: + candidate_text = self._extract_instruction_text(event_data) or "" + if candidate_text: + self.best_prompt = candidate_text + if best_score is not None: + self.best_score = max(self.best_score or 0.0, best_score) + else: + self.best_score = max(self.best_score or 0.0, score) + # Determine expected candidate count for this phase + if self._phase == "initial": + phase_total = self.initial_population_size + else: + phase_total = self.children_per_generation + if phase_total: + candidate_label = f"Candidate {self._phase_candidates}/{phase_total}" + else: + candidate_label = f"Candidate {self._phase_candidates}" + self._log(f"{candidate_label}: mean_reward={score:.2f}", now=now) + # Show instruction for new bests (candidate_text already extracted above) + if is_new_best and candidate_text: + self._log_continuation("New best prompt:") + self._log_continuation(candidate_text) + return + + if "job.completed" in event_type or event_type.endswith("job.completed"): + if self._phase is not None: + self._print_phase_complete(now) + self._log(f"Job completed | overall best={self._overall_best:.2f}", now=now) + return + + if "job.failed" in event_type or event_type.endswith("job.failed"): + self._log(f"Job failed: {event_data}", now=now) + return + + def flush(self) -> None: # pragma: no cover - no buffered output + return None + + def wants_event_backfill(self) -> bool: + return self._final_generation_complete_seen + + def terminal_hint_ready(self, *, grace_seconds: float = 8.0) -> bool: + if not self._final_generation_complete_seen_at: + return False + return (time.time() - self._final_generation_complete_seen_at) >= grace_seconds + + def status_tick(self, status: str) -> None: + now = self._printer.now() + self._log(f"Status: {status}", now=now) + self._last_status_time = now + + def status_tick_if_idle(self, status: str, *, min_idle_seconds: float) -> None: + now = self._printer.now() + self._status_ticker.maybe_log( + status=status, + now=now, + last_output_time=self._last_output_time, + min_idle_seconds=min_idle_seconds, + log_fn=lambda msg, ts: self._log(msg, now=ts), + ) + + +class EvalStreamProgressHandler(StreamHandler): + """Eval progress handler with the same time formatting as GEPA.""" + + def __init__( + self, + label: str, + total_seeds: int, + *, + job_id: str | None = None, + clock: ProgressClock | None = None, + log_every: int = 10, + debug: bool = False, + ) -> None: + self.label = label + self.total_seeds = total_seeds + self.job_id = job_id # Filter events to this job only + self.log_every = max(1, int(log_every)) + self.debug = debug + self._printer = ProgressPrinter(label=label, clock=clock) + self._completed_seeds = 0 + self._total_reward = 0.0 + self._rewards: list[float] = [] + self._started = False + self._completed = False + self._seen_seeds: set[int] = set() + self._last_output_time: float | None = None + self._status_ticker = IdleStatusTicker() + + def _log(self, message: str, *, now: float | None = None) -> None: + timestamp = self._printer.now() if now is None else now + self._printer.log(message, now=timestamp) + self._last_output_time = timestamp + + def should_handle(self, message) -> bool: + # Filter to only handle events from the target job + if self.job_id is not None and hasattr(message, "job_id"): + return message.job_id == self.job_id + return True + + def _record_seed(self, reward: float | None, seed: int | None, now: float) -> None: + if seed is not None: + if seed in self._seen_seeds: + return + self._seen_seeds.add(seed) + if reward is None: + return + self._completed_seeds += 1 + self._rewards.append(reward) + self._total_reward += reward + if ( + self._completed_seeds % self.log_every == 0 + or self._completed_seeds == self.total_seeds + ): + mean = self._total_reward / self._completed_seeds if self._completed_seeds else 0.0 + self._log( + f"Progress: {self._completed_seeds}/{self.total_seeds} | mean_reward={mean:.3f}", + now=now, + ) + + def record_rollout(self, seed: int, reward: float) -> None: + now = self._printer.now() + if not self._started: + self._started = True + self._log(f"Eval started: {self.total_seeds} seeds", now=now) + self._record_seed(reward, seed, now) + + def finish(self) -> None: + if self._completed: + return + self._completed = True + now = self._printer.now() + mean = self._total_reward / self._completed_seeds if self._completed_seeds else 0.0 + self._log(f"Eval completed: mean_reward={mean:.3f}", now=now) + + def handle(self, message) -> None: + if not self.should_handle(message): + return + if self.debug: + now = self._printer.now() + try: + debug_payload = { + "stream_type": str(getattr(message, "stream_type", "")), + "job_id": getattr(message, "job_id", None), + "type": str(getattr(message, "data", {}).get("type", "")), + "run_id": getattr(message, "data", {}).get("run_id"), + "data": getattr(message, "data", None), + } + self._log( + "[DEBUG] Eval message", + now=now, + ) + self._log_continuation(json.dumps(debug_payload, default=str, sort_keys=True)) + except Exception as exc: + self._log( + f"[DEBUG] Eval message (failed to serialize): {type(exc).__name__}: {exc}", + now=now, + ) + if message.stream_type == StreamType.STATUS: + status = str(message.data.get("status") or message.data.get("state") or "").lower() + if status: + now = self._printer.now() + self._status_ticker.maybe_log( + status=status, + now=now, + last_output_time=self._last_output_time, + min_idle_seconds=15.0, + log_fn=lambda msg, ts: self._log(msg, now=ts), + ) + return + now = self._printer.now() + data = message.data or {} + event_type = str(data.get("type", "")) + event_data = data.get("data", {}) if isinstance(data.get("data"), dict) else {} + + if event_type == "eval.policy.job.started": + if not self._started: + self._started = True + seed_count = event_data.get("seed_count", self.total_seeds) + self._log(f"Eval started: {seed_count} seeds", now=now) + return + + if event_type == "eval.policy.seed.completed": + seed = event_data.get("seed") + reward = _coerce_float(event_data.get("reward")) + self._record_seed(reward, seed, now) + return + + if event_type == "eval.policy.job.completed": + mean_reward = _coerce_float(event_data.get("mean_reward")) + if mean_reward is not None: + self._log(f"Eval completed: mean_reward={mean_reward:.3f}", now=now) + self._completed = True + return + + if event_type == "eval.policy.job.failed": + error = event_data.get("error", "unknown error") + self._log(f"Eval failed: {error}", now=now) + self._completed = True + return + + +class EvalStatusPrinter: + """Compact, de-duplicated polling output for eval jobs.""" + + def __init__( + self, + *, + label: str, + total_seeds: int | None = None, + debug: bool = False, + ) -> None: + self._label = label + self._total_seeds = total_seeds + self._debug = debug + self._printer = ProgressPrinter(label=label) + self._started = False + self._last_completed: int | None = None + self._last_total: int | None = None + self._last_output_time: float | None = None + self._status_ticker = IdleStatusTicker() + + def _log(self, message: str) -> None: + now = self._printer.now() + self._printer.log(message, now=now) + self._last_output_time = now + + def _extract_progress(self, status_data: dict[str, Any]) -> tuple[int, int, float | None]: + results = status_data.get("results", {}) + completed = results.get("completed", 0) if isinstance(results, dict) else 0 + total = ( + results.get("total", self._total_seeds) + if isinstance(results, dict) + else self._total_seeds + ) + if isinstance(results, dict): + seed_results = results.get("seed_results") + if isinstance(seed_results, list): + completed = len(seed_results) + total = total if total is not None else 0 + mean_reward = None + if isinstance(results, dict): + mean_reward = results.get("mean_reward") + if mean_reward is None: + summary = results.get("summary") + if isinstance(summary, dict): + mean_reward = summary.get("mean_reward") + return int(completed), int(total), mean_reward + + def log_start(self, *, total: int | None = None) -> None: + if self._started: + return + total = total if total is not None else self._total_seeds + total = total if total is not None else 0 + self._log(f"Eval started: {total} seeds") + self._started = True + + def log_debug_config(self, message: str) -> None: + if not self._debug: + return + self._log(f"[DEBUG] {message}") + + def handle_status(self, status_data: dict[str, Any]) -> None: + if self._debug: + try: + self._log(f"[DEBUG] Eval status: {json.dumps(status_data, default=str, sort_keys=True)}") + except Exception as exc: + self._log(f"[DEBUG] Eval status (failed to serialize): {type(exc).__name__}: {exc}") + status = str(status_data.get("status", "pending")).lower() + completed, total, _ = self._extract_progress(status_data) + if not self._started and status in {"pending", "running"}: + self.log_start(total=total) + self._last_completed = completed + self._last_total = total + if status in {"pending", "running"}: + now = time.time() + self._status_ticker.maybe_log( + status=status, + now=now, + last_output_time=self._last_output_time, + min_idle_seconds=15.0, + log_fn=lambda msg, _ts: self._log(msg), + ) + + def log_terminal( + self, + *, + status: str, + mean_reward: float | None = None, + ) -> None: + if status in {"completed", "succeeded", "success"}: + reward_str = f"{mean_reward:.3f}" if mean_reward is not None else "--" + self._log(f"Eval completed: mean_reward={reward_str}") + else: + self._log(f"Eval {status}") + + def tick(self, *, min_idle_seconds: float = 15.0) -> None: + """Print status if idle for min_idle_seconds.""" + now = time.time() + self._status_ticker.maybe_log( + status="running", + now=now, + last_output_time=self._last_output_time, + min_idle_seconds=min_idle_seconds, + log_fn=lambda msg, _ts: self._log(msg), + ) diff --git a/synth_ai/sdk/optimization/progress/time.py b/synth_ai/sdk/optimization/progress/time.py new file mode 100644 index 000000000..721f3df25 --- /dev/null +++ b/synth_ai/sdk/optimization/progress/time.py @@ -0,0 +1,58 @@ +"""Shared time formatting utilities for progress handlers.""" + + +import time +from dataclasses import dataclass + + +@dataclass +class ProgressClock: + """Track elapsed time and render a compact prefix.""" + + start_time: float | None = None + + def now(self) -> float: + current = time.time() + if self.start_time is None: + self.start_time = current + return current + + def elapsed(self, now: float | None = None) -> float: + if self.start_time is None: + return 0.0 + if now is None: + now = time.time() + return max(0.0, now - self.start_time) + + def prefix(self, now: float | None = None, *, separator: str = "|") -> str: + elapsed = int(self.elapsed(now)) + if elapsed >= 60: + mins = elapsed // 60 + secs = elapsed % 60 + return f"{mins:2d}m {secs:02d}s {separator}" + return f" {elapsed:02d}s {separator}" + + def reset(self) -> None: + self.start_time = None + + +class ProgressPrinter: + """Centralized logging format for progress handlers.""" + + def __init__(self, *, label: str | None = None, clock: ProgressClock | None = None) -> None: + self._label = label + self._clock = clock or ProgressClock() + + def now(self) -> float: + return self._clock.now() + + def elapsed(self, now: float | None = None) -> float: + return self._clock.elapsed(now) + + def log(self, message: str, *, now: float | None = None, separator: str = "|") -> None: + timestamp = self._clock.now() if now is None else now + prefix = self._clock.prefix(timestamp, separator=separator) + if self._label: + print(f"{prefix} [{self._label}] {message}") + else: + print(f"{prefix} {message}") diff --git a/synth_ai/sdk/utils/__init__.py b/synth_ai/sdk/utils/__init__.py new file mode 100644 index 000000000..9c7bd15cd --- /dev/null +++ b/synth_ai/sdk/utils/__init__.py @@ -0,0 +1,10 @@ +"""General-purpose SDK utilities.""" + +from .seeds import split_seed_slices, stratified_seed_sample +from .stats import confidence_band + +__all__ = [ + "confidence_band", + "split_seed_slices", + "stratified_seed_sample", +] diff --git a/synth_ai/sdk/utils/seeds.py b/synth_ai/sdk/utils/seeds.py new file mode 100644 index 000000000..3bbe2038d --- /dev/null +++ b/synth_ai/sdk/utils/seeds.py @@ -0,0 +1,53 @@ +import random +from typing import Sequence + + +def stratified_seed_sample(df, label_col: str, total: int, rng: random.Random | None = None) -> list[int]: + """Return up to `total` row indices, roughly balanced across label groups.""" + if total <= 0: + return [] + rng = rng or random.Random() + frame = df.reset_index(drop=True) + groups = frame.groupby(label_col).indices + labels = list(groups.keys()) + if not labels: + return [] + rng.shuffle(labels) + + base = total // len(labels) + remainder = total % len(labels) + seeds: list[int] = [] + for idx, label in enumerate(labels): + target = base + (1 if idx < remainder else 0) + indices = list(groups[label]) + if target >= len(indices): + seeds.extend(indices) + else: + seeds.extend(rng.sample(indices, target)) + + if len(seeds) < total: + all_indices = list(range(len(frame))) + remaining = list(set(all_indices) - set(seeds)) + needed = total - len(seeds) + if needed <= len(remaining): + seeds.extend(rng.sample(remaining, needed)) + else: + seeds.extend(remaining) + seeds.extend(rng.choices(all_indices, k=needed - len(remaining))) + + rng.shuffle(seeds) + return seeds[:total] + + +def split_seed_slices(seeds: Sequence[int], slices: int) -> list[list[int]]: + if slices <= 1: + return [list(seeds)] + base = len(seeds) // slices + remainder = len(seeds) % slices + sizes = [base + (1 if i < remainder else 0) for i in range(slices)] + seed_slices: list[list[int]] = [] + cursor = 0 + for size in sizes: + seed_slices.append(list(seeds[cursor:cursor + size])) + cursor += size + return seed_slices diff --git a/synth_ai/sdk/utils/stats.py b/synth_ai/sdk/utils/stats.py new file mode 100644 index 000000000..08f69f268 --- /dev/null +++ b/synth_ai/sdk/utils/stats.py @@ -0,0 +1,4 @@ +def confidence_band(mean_reward: float, n: int, z: float = 1.96) -> float: + if n <= 0: + return 0.0 + return z * ((mean_reward * (1.0 - mean_reward)) / n) ** 0.5 diff --git a/synth_ai_core/src/tunnels/cloudflared.rs b/synth_ai_core/src/tunnels/cloudflared.rs index 5303417ac..c645ee2ca 100644 --- a/synth_ai_core/src/tunnels/cloudflared.rs +++ b/synth_ai_core/src/tunnels/cloudflared.rs @@ -15,7 +15,7 @@ use tokio::task::JoinHandle; use crate::tunnels::errors::TunnelError; use crate::shared_client::DEFAULT_CONNECT_TIMEOUT_SECS; -static URL_RE: Lazy = Lazy::new(|| Regex::new(r"https://[a-z0-9-]+\\.trycloudflare\\.com").unwrap()); +static URL_RE: Lazy = Lazy::new(|| Regex::new(r"https://[a-z0-9-]+\.trycloudflare\.com").unwrap()); const CLOUDFLARED_RELEASES: &str = "https://updatecloudflared.com/launcher"; @@ -486,13 +486,37 @@ pub async fn verify_tunnel_dns_resolution( let deadline = Instant::now() + Duration::from_secs_f64(timeout_seconds); let mut last_err: Option = None; loop { - if Instant::now() > deadline { + let now = Instant::now(); + if now > deadline { return Err(TunnelError::dns(format!( "dns verification timeout: {} ({:?})", hostname, last_err ))); } - let ip = resolve_hostname_with_explicit_resolvers(hostname).await?; + // Don't fail immediately on DNS resolution errors - retry within timeout + let remaining = deadline.saturating_duration_since(now); + let resolve_timeout = std::cmp::min(Duration::from_secs(5), remaining); + let ip = match tokio::time::timeout( + resolve_timeout, + resolve_hostname_with_explicit_resolvers(hostname), + ) + .await + { + Ok(Ok(ip)) => ip, + Ok(Err(e)) => { + last_err = Some(e.to_string()); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + Err(_) => { + last_err = Some(format!( + "dns resolve timeout after {}s", + resolve_timeout.as_secs_f64() + )); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + }; let port = if parsed.scheme() == "http" { 80 } else { 443 }; let builder = reqwest::Client::builder() .timeout(Duration::from_secs(5)) diff --git a/synth_ai_core/src/tunnels/connector.rs b/synth_ai_core/src/tunnels/connector.rs index b4a14c98c..7f93617e3 100644 --- a/synth_ai_core/src/tunnels/connector.rs +++ b/synth_ai_core/src/tunnels/connector.rs @@ -154,11 +154,15 @@ impl TunnelConnector { } pub async fn stop(&mut self) -> Result<(), TunnelError> { - let mut proc_opt = self.prepare_stop(); - if let Some(proc) = &mut proc_opt { + self.cancel_idle_timer(); + if let Some(proc) = &mut self.process { let _ = proc.start_kill(); let _ = proc.wait().await; } + self.process = None; + self.state = ConnectorState::Stopped; + self.current_token = None; + self.active_leases.clear(); Ok(()) } @@ -190,28 +194,34 @@ impl TunnelConnector { self.idle_task = Some(tokio::spawn(async move { tokio::time::sleep(timeout).await; let connector = get_connector(); - let mut proc_opt = None; - { + + // Take the process out while holding the lock briefly + // (parking_lot::MutexGuard is not Send, so we can't hold it across await) + let process_to_wait = { let mut guard = connector.lock(); if guard.active_leases.is_empty() { - proc_opt = guard.prepare_stop(); + if let Some(proc) = &mut guard.process { + let _ = proc.start_kill(); + } + guard.process.take() + } else { + None } - } - if let Some(mut proc) = proc_opt { - let _ = proc.start_kill(); + }; + // Lock is dropped here + + // Wait for process outside the lock + if let Some(mut proc) = process_to_wait { let _ = proc.wait().await; + + // Re-acquire lock to update state + let mut guard = connector.lock(); + guard.state = ConnectorState::Stopped; + guard.current_token = None; + guard.active_leases.clear(); } })); } - - fn prepare_stop(&mut self) -> Option { - self.cancel_idle_timer(); - let proc = self.process.take(); - self.state = ConnectorState::Stopped; - self.current_token = None; - self.active_leases.clear(); - proc - } } fn push_log(logs: &Arc>>, line: &str) { diff --git a/synth_ai_py/src/lib.rs b/synth_ai_py/src/lib.rs index af726195a..4739da13d 100644 --- a/synth_ai_py/src/lib.rs +++ b/synth_ai_py/src/lib.rs @@ -33,6 +33,7 @@ use synth_ai_core::tunnels::types::{ TunnelHandle as RustTunnelHandle, }; use synth_ai_core::tunnels::errors::TunnelError; +use synth_ai_core::tunnels::manager as tunnel_manager; use synth_ai_core::urls::{ make_local_api_url as core_make_local_api_url, normalize_backend_base as core_normalize_backend_base, @@ -1264,6 +1265,16 @@ fn tunnel_open( } } +#[pyfunction] +fn tunnel_close(lease_id: String) -> PyResult<()> { + let result = RUNTIME.block_on(async move { + let manager = tunnel_manager::get_manager(None, None); + let mut guard = manager.lock(); + guard.close(&lease_id).await + }); + result.map_err(|e| PyValueError::new_err(e.to_string())) +} + // ============================================================================= // Jobs Module - JobLifecycle state machine // ============================================================================= @@ -5783,6 +5794,7 @@ fn synth_ai_py(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(verify_tunnel_dns_resolution, m)?)?; m.add_function(wrap_pyfunction!(wait_for_health_check, m)?)?; m.add_function(wrap_pyfunction!(tunnel_open, m)?)?; + m.add_function(wrap_pyfunction!(tunnel_close, m)?)?; m.add_function(wrap_pyfunction!(get_cloudflared_path, m)?)?; m.add_function(wrap_pyfunction!(ensure_cloudflared_installed, m)?)?; m.add_function(wrap_pyfunction!(require_cloudflared, m)?)?; diff --git a/uv.lock b/uv.lock index 5f57515f7..f6afe86dc 100644 --- a/uv.lock +++ b/uv.lock @@ -1270,7 +1270,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/cb/48e964c452ca2b92175a9b2dca037a553036cb053ba69e284650ce755f13/greenlet-3.3.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:e29f3018580e8412d6aaf5641bb7745d38c85228dacf51a73bd4e26ddf2a6a8e", size = 274908, upload-time = "2025-12-04T14:23:26.435Z" }, { url = "https://files.pythonhosted.org/packages/28/da/38d7bff4d0277b594ec557f479d65272a893f1f2a716cad91efeb8680953/greenlet-3.3.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a687205fb22794e838f947e2194c0566d3812966b41c78709554aa883183fb62", size = 577113, upload-time = "2025-12-04T14:50:05.493Z" }, { url = "https://files.pythonhosted.org/packages/3c/f2/89c5eb0faddc3ff014f1c04467d67dee0d1d334ab81fadbf3744847f8a8a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:4243050a88ba61842186cb9e63c7dfa677ec146160b0efd73b855a3d9c7fcf32", size = 590338, upload-time = "2025-12-04T14:57:41.136Z" }, - { url = "https://files.pythonhosted.org/packages/80/d7/db0a5085035d05134f8c089643da2b44cc9b80647c39e93129c5ef170d8f/greenlet-3.3.0-cp311-cp311-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:670d0f94cd302d81796e37299bcd04b95d62403883b24225c6b5271466612f45", size = 601098, upload-time = "2025-12-04T15:07:11.898Z" }, { url = "https://files.pythonhosted.org/packages/dc/a6/e959a127b630a58e23529972dbc868c107f9d583b5a9f878fb858c46bc1a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6cb3a8ec3db4a3b0eb8a3c25436c2d49e3505821802074969db017b87bc6a948", size = 590206, upload-time = "2025-12-04T14:26:01.254Z" }, { url = "https://files.pythonhosted.org/packages/48/60/29035719feb91798693023608447283b266b12efc576ed013dd9442364bb/greenlet-3.3.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:2de5a0b09eab81fc6a382791b995b1ccf2b172a9fec934747a7a23d2ff291794", size = 1550668, upload-time = "2025-12-04T15:04:22.439Z" }, { url = "https://files.pythonhosted.org/packages/0a/5f/783a23754b691bfa86bd72c3033aa107490deac9b2ef190837b860996c9f/greenlet-3.3.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:4449a736606bd30f27f8e1ff4678ee193bc47f6ca810d705981cfffd6ce0d8c5", size = 1615483, upload-time = "2025-12-04T14:27:28.083Z" }, @@ -1278,7 +1277,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/0a/a3871375c7b9727edaeeea994bfff7c63ff7804c9829c19309ba2e058807/greenlet-3.3.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:b01548f6e0b9e9784a2c99c5651e5dc89ffcbe870bc5fb2e5ef864e9cc6b5dcb", size = 276379, upload-time = "2025-12-04T14:23:30.498Z" }, { url = "https://files.pythonhosted.org/packages/43/ab/7ebfe34dce8b87be0d11dae91acbf76f7b8246bf9d6b319c741f99fa59c6/greenlet-3.3.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:349345b770dc88f81506c6861d22a6ccd422207829d2c854ae2af8025af303e3", size = 597294, upload-time = "2025-12-04T14:50:06.847Z" }, { url = "https://files.pythonhosted.org/packages/a4/39/f1c8da50024feecd0793dbd5e08f526809b8ab5609224a2da40aad3a7641/greenlet-3.3.0-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e8e18ed6995e9e2c0b4ed264d2cf89260ab3ac7e13555b8032b25a74c6d18655", size = 607742, upload-time = "2025-12-04T14:57:42.349Z" }, - { url = "https://files.pythonhosted.org/packages/77/cb/43692bcd5f7a0da6ec0ec6d58ee7cddb606d055ce94a62ac9b1aa481e969/greenlet-3.3.0-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c024b1e5696626890038e34f76140ed1daf858e37496d33f2af57f06189e70d7", size = 622297, upload-time = "2025-12-04T15:07:13.552Z" }, { url = "https://files.pythonhosted.org/packages/75/b0/6bde0b1011a60782108c01de5913c588cf51a839174538d266de15e4bf4d/greenlet-3.3.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:047ab3df20ede6a57c35c14bf5200fcf04039d50f908270d3f9a7a82064f543b", size = 609885, upload-time = "2025-12-04T14:26:02.368Z" }, { url = "https://files.pythonhosted.org/packages/49/0e/49b46ac39f931f59f987b7cd9f34bfec8ef81d2a1e6e00682f55be5de9f4/greenlet-3.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2d9ad37fc657b1102ec880e637cccf20191581f75c64087a549e66c57e1ceb53", size = 1567424, upload-time = "2025-12-04T15:04:23.757Z" }, { url = "https://files.pythonhosted.org/packages/05/f5/49a9ac2dff7f10091935def9165c90236d8f175afb27cbed38fb1d61ab6b/greenlet-3.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83cd0e36932e0e7f36a64b732a6f60c2fc2df28c351bae79fbaf4f8092fe7614", size = 1636017, upload-time = "2025-12-04T14:27:29.688Z" }, @@ -1286,7 +1284,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/2f/28592176381b9ab2cafa12829ba7b472d177f3acc35d8fbcf3673d966fff/greenlet-3.3.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:a1e41a81c7e2825822f4e068c48cb2196002362619e2d70b148f20a831c00739", size = 275140, upload-time = "2025-12-04T14:23:01.282Z" }, { url = "https://files.pythonhosted.org/packages/2c/80/fbe937bf81e9fca98c981fe499e59a3f45df2a04da0baa5c2be0dca0d329/greenlet-3.3.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9f515a47d02da4d30caaa85b69474cec77b7929b2e936ff7fb853d42f4bf8808", size = 599219, upload-time = "2025-12-04T14:50:08.309Z" }, { url = "https://files.pythonhosted.org/packages/c2/ff/7c985128f0514271b8268476af89aee6866df5eec04ac17dcfbc676213df/greenlet-3.3.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:7d2d9fd66bfadf230b385fdc90426fcd6eb64db54b40c495b72ac0feb5766c54", size = 610211, upload-time = "2025-12-04T14:57:43.968Z" }, - { url = "https://files.pythonhosted.org/packages/79/07/c47a82d881319ec18a4510bb30463ed6891f2ad2c1901ed5ec23d3de351f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:30a6e28487a790417d036088b3bcb3f3ac7d8babaa7d0139edbaddebf3af9492", size = 624311, upload-time = "2025-12-04T15:07:14.697Z" }, { url = "https://files.pythonhosted.org/packages/fd/8e/424b8c6e78bd9837d14ff7df01a9829fc883ba2ab4ea787d4f848435f23f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:087ea5e004437321508a8d6f20efc4cfec5e3c30118e1417ea96ed1d93950527", size = 612833, upload-time = "2025-12-04T14:26:03.669Z" }, { url = "https://files.pythonhosted.org/packages/b5/ba/56699ff9b7c76ca12f1cdc27a886d0f81f2189c3455ff9f65246780f713d/greenlet-3.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ab97cf74045343f6c60a39913fa59710e4bd26a536ce7ab2397adf8b27e67c39", size = 1567256, upload-time = "2025-12-04T15:04:25.276Z" }, { url = "https://files.pythonhosted.org/packages/1e/37/f31136132967982d698c71a281a8901daf1a8fbab935dce7c0cf15f942cc/greenlet-3.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5375d2e23184629112ca1ea89a53389dddbffcf417dad40125713d88eb5f96e8", size = 1636483, upload-time = "2025-12-04T14:27:30.804Z" }, @@ -1294,7 +1291,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d7/7c/f0a6d0ede2c7bf092d00bc83ad5bafb7e6ec9b4aab2fbdfa6f134dc73327/greenlet-3.3.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:60c2ef0f578afb3c8d92ea07ad327f9a062547137afe91f38408f08aacab667f", size = 275671, upload-time = "2025-12-04T14:23:05.267Z" }, { url = "https://files.pythonhosted.org/packages/44/06/dac639ae1a50f5969d82d2e3dd9767d30d6dbdbab0e1a54010c8fe90263c/greenlet-3.3.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0a5d554d0712ba1de0a6c94c640f7aeba3f85b3a6e1f2899c11c2c0428da9365", size = 646360, upload-time = "2025-12-04T14:50:10.026Z" }, { url = "https://files.pythonhosted.org/packages/e0/94/0fb76fe6c5369fba9bf98529ada6f4c3a1adf19e406a47332245ef0eb357/greenlet-3.3.0-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:3a898b1e9c5f7307ebbde4102908e6cbfcb9ea16284a3abe15cab996bee8b9b3", size = 658160, upload-time = "2025-12-04T14:57:45.41Z" }, - { url = "https://files.pythonhosted.org/packages/93/79/d2c70cae6e823fac36c3bbc9077962105052b7ef81db2f01ec3b9bf17e2b/greenlet-3.3.0-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:dcd2bdbd444ff340e8d6bdf54d2f206ccddbb3ccfdcd3c25bf4afaa7b8f0cf45", size = 671388, upload-time = "2025-12-04T15:07:15.789Z" }, { url = "https://files.pythonhosted.org/packages/b8/14/bab308fc2c1b5228c3224ec2bf928ce2e4d21d8046c161e44a2012b5203e/greenlet-3.3.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5773edda4dc00e173820722711d043799d3adb4f01731f40619e07ea2750b955", size = 660166, upload-time = "2025-12-04T14:26:05.099Z" }, { url = "https://files.pythonhosted.org/packages/4b/d2/91465d39164eaa0085177f61983d80ffe746c5a1860f009811d498e7259c/greenlet-3.3.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ac0549373982b36d5fd5d30beb8a7a33ee541ff98d2b502714a09f1169f31b55", size = 1615193, upload-time = "2025-12-04T15:04:27.041Z" }, { url = "https://files.pythonhosted.org/packages/42/1b/83d110a37044b92423084d52d5d5a3b3a73cafb51b547e6d7366ff62eff1/greenlet-3.3.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d198d2d977460358c3b3a4dc844f875d1adb33817f0613f663a656f463764ccc", size = 1683653, upload-time = "2025-12-04T14:27:32.366Z" }, @@ -1302,7 +1298,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a0/66/bd6317bc5932accf351fc19f177ffba53712a202f9df10587da8df257c7e/greenlet-3.3.0-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:d6ed6f85fae6cdfdb9ce04c9bf7a08d666cfcfb914e7d006f44f840b46741931", size = 282638, upload-time = "2025-12-04T14:25:20.941Z" }, { url = "https://files.pythonhosted.org/packages/30/cf/cc81cb030b40e738d6e69502ccbd0dd1bced0588e958f9e757945de24404/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d9125050fcf24554e69c4cacb086b87b3b55dc395a8b3ebe6487b045b2614388", size = 651145, upload-time = "2025-12-04T14:50:11.039Z" }, { url = "https://files.pythonhosted.org/packages/9c/ea/1020037b5ecfe95ca7df8d8549959baceb8186031da83d5ecceff8b08cd2/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:87e63ccfa13c0a0f6234ed0add552af24cc67dd886731f2261e46e241608bee3", size = 654236, upload-time = "2025-12-04T14:57:47.007Z" }, - { url = "https://files.pythonhosted.org/packages/69/cc/1e4bae2e45ca2fa55299f4e85854606a78ecc37fead20d69322f96000504/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:2662433acbca297c9153a4023fe2161c8dcfdcc91f10433171cf7e7d94ba2221", size = 662506, upload-time = "2025-12-04T15:07:16.906Z" }, { url = "https://files.pythonhosted.org/packages/57/b9/f8025d71a6085c441a7eaff0fd928bbb275a6633773667023d19179fe815/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3c6e9b9c1527a78520357de498b0e709fb9e2f49c3a513afd5a249007261911b", size = 653783, upload-time = "2025-12-04T14:26:06.225Z" }, { url = "https://files.pythonhosted.org/packages/f6/c7/876a8c7a7485d5d6b5c6821201d542ef28be645aa024cfe1145b35c120c1/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:286d093f95ec98fdd92fcb955003b8a3d054b4e2cab3e2707a5039e7b50520fd", size = 1614857, upload-time = "2025-12-04T15:04:28.484Z" }, { url = "https://files.pythonhosted.org/packages/4f/dc/041be1dff9f23dac5f48a43323cd0789cb798342011c19a248d9c9335536/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:6c10513330af5b8ae16f023e8ddbfb486ab355d04467c4679c5cfe4659975dd9", size = 1676034, upload-time = "2025-12-04T14:27:33.531Z" },