Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 137 additions & 2 deletions synth_ai/sdk/optimization/internal/learning/client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,37 @@
import json
import warnings
from collections.abc import Callable
from collections.abc import Callable, Mapping
from contextlib import suppress
from pathlib import Path
from typing import Any, TypedDict
from typing import Any, Optional, TypedDict

from synth_ai.core.errors import HTTPError
from synth_ai.core.levers import ScopeKey
from synth_ai.core.rust_core.http import RustCoreHttpClient, sleep
from synth_ai.sdk.optimization.models import LeverHandle, SensorFrame
from synth_ai.sdk.shared.models import UnsupportedModelError, normalize_model_identifier


def _normalize_scope_payload(scope: list[Any] | None) -> Optional[list[dict[str, Any]]]:
if not scope:
return None
normalized: list[dict[str, Any]] = []
for item in scope:
if isinstance(item, ScopeKey):
normalized.append(item.to_dict())
elif isinstance(item, dict):
normalized.append(item)
return normalized or None


def _payload_to_dict(value: Any) -> dict[str, Any]:
if isinstance(value, Mapping):
return dict(value)
if hasattr(value, "to_dict"):
return value.to_dict() # type: ignore[attr-defined]
raise ValueError("Payload must be a dict or provide to_dict()")


class LearningClient:
"""Client for learning/training jobs.

Expand Down Expand Up @@ -154,6 +177,118 @@ async def poll_until_terminal(
if max_seconds is not None and elapsed >= max_seconds:
raise TimeoutError(f"Polling timed out after {elapsed} seconds for job {job_id}")

async def create_or_update_lever(
self,
optimizer_id: str,
lever: dict[str, Any] | LeverHandle,
) -> LeverHandle:
"""Create or update a lever handle for an optimizer.

See: specifications/tanha/future/sensors_and_levers.txt
"""
payload = _payload_to_dict(lever)
url = f"/api/v1/optimizers/{optimizer_id}/levers"
async with RustCoreHttpClient(self._base_url, self._api_key, timeout=self._timeout) as http:
js = await http.post_json(url, json=payload)
if not isinstance(js, dict):
raise HTTPError(
status=500,
url=url,
message="invalid_lever_response",
body_snippet=str(js)[:200],
)
return LeverHandle.from_dict(js)

async def resolve_lever(
self,
optimizer_id: str,
lever_id: str,
*,
scope: list[Any] | None = None,
snapshot: bool = True,
) -> LeverHandle | None:
"""Resolve a lever snapshot for the optimizer's scope.

See: specifications/tanha/future/sensors_and_levers.txt
"""
params: dict[str, str] = {"lever_id": lever_id}
if snapshot:
params["snapshot"] = "true"
scope_payload = _normalize_scope_payload(scope)
if scope_payload:
params["scope"] = json.dumps(scope_payload)
url = f"/api/v1/optimizers/{optimizer_id}/levers"
async with RustCoreHttpClient(self._base_url, self._api_key, timeout=self._timeout) as http:
js = await http.get(url, params=params if params else None)
if isinstance(js, dict):
return LeverHandle.from_dict(js)
return None

async def emit_sensor_frame(
self,
optimizer_id: str,
frame: dict[str, Any] | SensorFrame,
) -> SensorFrame:
"""Emit a sensor frame payload for the optimizer.

See: specifications/tanha/future/sensors_and_levers.txt
"""
payload = _payload_to_dict(frame)
url = f"/api/v1/optimizers/{optimizer_id}/sensors"
async with RustCoreHttpClient(self._base_url, self._api_key, timeout=self._timeout) as http:
js = await http.post_json(url, json=payload)
if not isinstance(js, dict):
raise HTTPError(
status=500,
url=url,
message="invalid_sensor_frame_response",
body_snippet=str(js)[:200],
)
return SensorFrame.from_dict(js)

async def list_sensor_frames(
self,
optimizer_id: str,
*,
scope: list[Any] | None = None,
limit: int | None = None,
) -> list[SensorFrame]:
"""List sensor frames emitted for the optimizer scope.

See: specifications/tanha/future/sensors_and_levers.txt
"""
params: dict[str, str] = {}
scope_payload = _normalize_scope_payload(scope)
if scope_payload:
params["scope"] = json.dumps(scope_payload)
if limit is not None:
params["limit"] = str(limit)
url = f"/api/v1/optimizers/{optimizer_id}/sensors"
async with RustCoreHttpClient(self._base_url, self._api_key, timeout=self._timeout) as http:
js = await http.get(url, params=params if params else None)
payloads: list[Any] = []
if isinstance(js, list):
payloads = js
elif isinstance(js, dict):
candidates = (
js.get("items")
or js.get("sensor_frames")
or js.get("frames")
or js.get("data")
or []
)
Comment on lines +273 to +279
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 or-chain in list_sensor_frames treats empty list [] as falsy, falling through to wrong response key

In list_sensor_frames, the or-chain js.get("items") or js.get("sensor_frames") or js.get("frames") or js.get("data") or [] uses Python's or operator to select the first truthy value. An empty list [] is falsy in Python, so if the correct response key (e.g. "items") contains an empty list, it will be skipped in favor of a later key.

Root Cause and Impact

Consider an API response like {"items": [], "data": [{...}]}. The intended key is "items" (empty — no frames), but:

  • js.get("items")[] (falsy)
  • [] or js.get("sensor_frames")None (falsy)
  • None or js.get("frames")None (falsy)
  • None or js.get("data")[{...}] (truthy)
  • candidates = [{...}] (wrong — should be [])

This means the method would return phantom sensor frames from an unrelated key when the correct key has an empty list.

Impact: Callers could receive incorrect/unexpected sensor frames when the actual result set is empty but the response dict contains other list-valued keys.

Suggested change
candidates = (
js.get("items")
or js.get("sensor_frames")
or js.get("frames")
or js.get("data")
or []
)
candidates = None
for _key in ("items", "sensor_frames", "frames", "data"):
_val = js.get(_key)
if _val is not None:
candidates = _val
break
if candidates is None:
candidates = []
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

if isinstance(candidates, list):
payloads = candidates
frames: list[SensorFrame] = []
for entry in payloads:
if not isinstance(entry, dict):
continue
try:
frames.append(SensorFrame.from_dict(entry))
except ValueError:
continue
return frames

# --- Optional diagnostics ---
async def pricing_preflight(
self, *, job_type: str, gpu_type: str, estimated_seconds: float, container_count: int
Expand Down
122 changes: 119 additions & 3 deletions synth_ai/sdk/optimization/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from enum import Enum
from typing import Any, Dict, Iterable, Optional

from synth_ai.core.levers import MiproLeverSummary
from synth_ai.core.sensors import SensorFrameSummary
from synth_ai.core.levers import LeverKind, ScopeKey, MiproLeverSummary
from synth_ai.core.sensors import Sensor as CoreSensor, SensorFrameSummary


def _first_present(data: Dict[str, Any], keys: Iterable[str]) -> Optional[Any]:
Expand Down Expand Up @@ -106,7 +106,123 @@ def _extract_system_prompt(
if result:
return result

return None
@dataclass(slots=True)
class LeverHandle:
"""SDK representation of a lever handle resolved by optimizer APIs.

See: specifications/tanha/future/sensors_and_levers.txt
"""

lever_id: str
kind: LeverKind
version: int
scope: list[ScopeKey] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)

@classmethod
def from_dict(cls, raw: Dict[str, Any]) -> "LeverHandle":
if not isinstance(raw, dict):
raise ValueError("LeverHandle data must be an object")
lever_id = raw.get("lever_id") or raw.get("id") or ""
kind_raw = raw.get("kind")
try:
kind = LeverKind(str(kind_raw)) if kind_raw is not None else LeverKind.CUSTOM
except ValueError:
kind = LeverKind.CUSTOM
version_raw = raw.get("version") or raw.get("lever_version")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 or operator treats version 0 as falsy, skipping valid version values

In LeverHandle.from_dict, the expression raw.get("version") or raw.get("lever_version") uses Python's or operator to fall back between two keys. However, or treats 0 as falsy, so a legitimate version value of 0 from the "version" key would be skipped in favor of "lever_version".

Root Cause and Impact

When the API returns {"version": 0, "lever_version": 3}, the expression evaluates as:

  • raw.get("version")0 (falsy)
  • 0 or raw.get("lever_version")3
  • version_raw = 3 (wrong — should be 0)

The correct approach is to use explicit is not None checks:

version_raw = raw.get("version")
if version_raw is None:
    version_raw = raw.get("lever_version")

Impact: Any lever with version 0 will have its version incorrectly read from the lever_version field (if present), or silently default to 0 only by coincidence if lever_version is also absent.

Suggested change
version_raw = raw.get("version") or raw.get("lever_version")
version_raw = raw.get("version")
if version_raw is None:
version_raw = raw.get("lever_version")
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

version = 0
if version_raw is not None:
try:
version = int(version_raw)
except (TypeError, ValueError):
version = 0
scope_raw = raw.get("scope") or []
scope: list[ScopeKey] = []
if isinstance(scope_raw, list):
for item in scope_raw:
if isinstance(item, dict):
scope.append(ScopeKey.from_dict(item))
metadata = raw.get("metadata") if isinstance(raw.get("metadata"), dict) else {}
return cls(
lever_id=str(lever_id),
kind=kind,
version=version,
scope=scope,
metadata=metadata,
)

def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"lever_id": self.lever_id,
"kind": self.kind.value,
"version": self.version,
"scope": [item.to_dict() for item in self.scope],
}
if self.metadata:
payload["metadata"] = self.metadata
return payload


@dataclass(slots=True)
class SensorFrame:
"""SDK representation of a sensor frame emitted by optimizer endpoints.

See: specifications/tanha/future/sensors_and_levers.txt
"""

scope: list[ScopeKey] = field(default_factory=list)
sensors: list[CoreSensor] = field(default_factory=list)
lever_versions: Dict[str, int] = field(default_factory=dict)
trace_ids: list[str] = field(default_factory=list)
frame_id: Optional[str] = None
metadata: dict[str, Any] = field(default_factory=dict)
created_at: Optional[str] = None

@classmethod
def from_dict(cls, raw: Dict[str, Any]) -> "SensorFrame":
if not isinstance(raw, dict):
raise ValueError("SensorFrame data must be an object")
scope_raw = raw.get("scope") or []
scope: list[ScopeKey] = []
if isinstance(scope_raw, list):
for item in scope_raw:
if isinstance(item, dict):
scope.append(ScopeKey.from_dict(item))
sensors_raw = raw.get("sensors") or []
sensors: list[CoreSensor] = []
if isinstance(sensors_raw, list):
for item in sensors_raw:
if isinstance(item, dict):
sensors.append(CoreSensor.from_dict(item))
lever_versions = _parse_lever_versions(raw.get("lever_versions"))
trace_ids_raw = raw.get("trace_ids") or []
trace_ids = [str(x) for x in trace_ids_raw if isinstance(x, (str, int))] if isinstance(trace_ids_raw, list) else []
frame_id = raw.get("frame_id") if isinstance(raw.get("frame_id"), str) else None
metadata = raw.get("metadata") if isinstance(raw.get("metadata"), dict) else {}
created_at = raw.get("created_at") if isinstance(raw.get("created_at"), str) else None
return cls(
scope=scope,
sensors=sensors,
lever_versions=lever_versions,
trace_ids=trace_ids,
frame_id=frame_id,
metadata=metadata,
created_at=created_at,
)

def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"scope": [item.to_dict() for item in self.scope],
"sensors": [sensor.to_dict() for sensor in self.sensors],
"lever_versions": self.lever_versions,
"trace_ids": self.trace_ids,
"metadata": self.metadata,
}
if self.frame_id is not None:
payload["frame_id"] = self.frame_id
if self.created_at is not None:
payload["created_at"] = self.created_at
return payload


class PolicyJobStatus(str, Enum):
Expand Down
Loading
Loading