Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/polaris/polaris/polaris/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .modules.registry import Registry
from .modules.runner import Runner
from .runtime import run
from .runtime import initialize, is_initialized, run

__all__ = ["run", "Registry", "Runner"]
__all__ = ["initialize", "is_initialized", "run", "Registry", "Runner"]
163 changes: 102 additions & 61 deletions packages/polaris/polaris/polaris/core/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
"""HTTP client with automatic retry and environment detection.

Provides adaptive HTTP client that works in both browser (Pyodide) and
server environments with built-in retry logic for transient failures.
"""

import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, TypeVar

from .exceptions import HttpError

Expand All @@ -11,13 +18,81 @@
MAX_RETRIES = 3
INITIAL_BACKOFF = 1.0 # seconds

T = TypeVar("T")


async def _retry_request(
request_fn: Callable[[], Awaitable[tuple[int, str, T | None]]],
url: str,
method: str,
) -> T:
"""Execute HTTP request with retry logic for transient failures.

Args:
request_fn: Async function that returns (status_code, response_text, parsed_data).
parsed_data is None if request failed.
url: Request URL (for error context)
method: HTTP method (for error context)

Returns:
Parsed response data on success

Raises:
HttpError: On non-retryable error or after all retries exhausted
"""
last_error: HttpError | None = None

for attempt in range(MAX_RETRIES):
status, text, data = await request_fn()

# Success
if data is not None:
return data

# Non-retryable error
if status not in RETRY_STATUS_CODES:
raise HttpError(
f"HTTP {status}: {text}",
status_code=status,
details={"url": url, "method": method},
)

# Retryable error - store and possibly retry
last_error = HttpError(
f"HTTP {status}: {text}",
status_code=status,
details={"url": url, "method": method},
)

if attempt < MAX_RETRIES - 1:
backoff = INITIAL_BACKOFF * (2**attempt)
logger.warning(
f"HTTP {status}, retrying in {backoff}s (attempt {attempt + 1}/{MAX_RETRIES})"
)
await asyncio.sleep(backoff)

# All retries exhausted
if last_error is None:
# This should never happen since MAX_RETRIES >= 1
raise HttpError(
"Request failed with no error captured",
status_code=0,
details={"url": url, "method": method},
)
raise last_error


class HttpClient:
async def request(self, method, url, headers=None, body=None):
"""Base HTTP client interface."""

async def request(
self, method: str, url: str, headers: dict[str, str] | None = None, body: Any = None
) -> Any:
raise NotImplementedError


def is_pyodide():
def is_pyodide() -> bool:
"""Check if running in Pyodide (browser) environment."""
try:
import pyodide_js # type: ignore[import-not-found] # noqa: F401

Expand All @@ -26,8 +101,8 @@ def is_pyodide():
return False


# parse response without relying on content type
async def parse_response(response):
async def _parse_response(response: Any) -> Any:
"""Parse response without relying on content type."""
text = await response.text()
try:
return json.loads(text)
Expand All @@ -41,53 +116,36 @@ async def parse_response(response):


class BrowserHttpClient(HttpClient):
def __init__(self):
"""HTTP client for browser/Pyodide environment using fetch API."""

def __init__(self) -> None:
from js import fetch # type: ignore[import-not-found]
from pyodide.ffi import to_js # type: ignore[import-not-found]

self._fetch = fetch
self._to_js = to_js

async def request(self, method, url, headers=None, body=None):
async def request(
self, method: str, url: str, headers: dict[str, str] | None = None, body: Any = None
) -> Any:
headers = headers or {}
options = {
options: dict[str, Any] = {
"method": method.upper(),
"headers": headers,
}
if body is not None:
options["body"] = json.dumps(body)
headers.setdefault("Content-Type", "application/json")

last_error = None
for attempt in range(MAX_RETRIES):
async def do_request() -> tuple[int, str, Any | None]:
response = await self._fetch(url, self._to_js(options))
if response.ok:
return await parse_response(response)

status = response.status
data = await _parse_response(response)
return response.status, "", data
text = await response.text()
return response.status, text, None

if status not in RETRY_STATUS_CODES:
# Don't retry client errors (except 429)
raise HttpError(
f"HTTP {status}: {text}",
status_code=status,
details={"url": url, "method": method},
)

last_error = HttpError(
f"HTTP {status}: {text}",
status_code=status,
details={"url": url, "method": method},
)

if attempt < MAX_RETRIES - 1:
backoff = INITIAL_BACKOFF * (2**attempt)
logger.warning(f"HTTP {status}, retrying in {backoff}s " f"(attempt {attempt + 1}/{MAX_RETRIES})")
await asyncio.sleep(backoff)

assert last_error is not None # Always set after at least one iteration
raise last_error
return await _retry_request(do_request, url, method)


# ----------------------------
Expand All @@ -96,20 +154,23 @@ async def request(self, method, url, headers=None, body=None):


class ServerHttpClient(HttpClient):
def __init__(self):
"""HTTP client for server environment using aiohttp."""

def __init__(self) -> None:
import aiohttp

self._aiohttp = aiohttp

async def request(self, method, url, headers=None, body=None):
async def request(
self, method: str, url: str, headers: dict[str, str] | None = None, body: Any = None
) -> Any:
data = None
if body is not None:
data = json.dumps(body)
headers = headers or {}
headers.setdefault("Content-Type", "application/json")

last_error = None
for attempt in range(MAX_RETRIES):
async def do_request() -> tuple[int, str, Any | None]:
async with self._aiohttp.ClientSession() as session:
async with session.request(
method=method.upper(),
Expand All @@ -118,32 +179,12 @@ async def request(self, method, url, headers=None, body=None):
data=data,
) as response:
if response.status < 400:
return await parse_response(response)

status = response.status
parsed = await _parse_response(response)
return response.status, "", parsed
text = await response.text()
return response.status, text, None

if status not in RETRY_STATUS_CODES:
# Don't retry client errors (except 429)
raise HttpError(
f"HTTP {status}: {text}",
status_code=status,
details={"url": url, "method": method},
)

last_error = HttpError(
f"HTTP {status}: {text}",
status_code=status,
details={"url": url, "method": method},
)

if attempt < MAX_RETRIES - 1:
backoff = INITIAL_BACKOFF * (2**attempt)
logger.warning(f"HTTP {status}, retrying in {backoff}s " f"(attempt {attempt + 1}/{MAX_RETRIES})")
await asyncio.sleep(backoff)

assert last_error is not None # Always set after at least one iteration
raise last_error
return await _retry_request(do_request, url, method)


# ----------------------------
Expand Down
10 changes: 9 additions & 1 deletion packages/polaris/polaris/polaris/core/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@ def _refill(self) -> None:

@property
def available_tokens(self) -> float:
"""Return the current number of available tokens (approximate)."""
"""Return approximate token count for monitoring/logging.

Note: This returns the cached token count without refilling or locking.
The value may be stale by up to (1/rate) seconds. This is intentional
to avoid blocking on a property access.

For accurate token acquisition, always use `acquire()` which properly
handles locking and refilling.
"""
return self.tokens

@classmethod
Expand Down
5 changes: 4 additions & 1 deletion packages/polaris/polaris/polaris/core/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,8 @@ async def retry_async(
on_retry(e, attempt + 1, backoff)
await asyncio.sleep(backoff)

assert last_error is not None
# last_error is guaranteed to be set after at least one iteration
# since max_retries >= 1 and we only reach here if all attempts failed
if last_error is None:
raise RuntimeError("Retry loop completed without capturing an error")
raise last_error
6 changes: 6 additions & 0 deletions packages/polaris/polaris/polaris/modules/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class NodeType(str, Enum):
CONTROL = "control"
EXECUTOR = "executor"
LOOP = "loop"
MATERIALIZER = "materializer"
PLANNER = "planner"
REASONING = "reasoning"
TERMINAL = "terminal"
Expand Down Expand Up @@ -41,6 +42,11 @@ class ErrorCode(str, Enum):
UNKNOWN_NODE_TYPE = "unknown_node_type"
LOOP_INVALID_OVER = "loop_invalid_over"
LOOP_ITERATION_FAILED = "loop_iteration_failed"
MATERIALIZER_FAILED = "materializer_failed"
MATERIALIZER_NOT_FOUND = "materializer_not_found"
MATERIALIZER_INVALID_ARGS = "materializer_invalid_args"
PLANNER_INVALID_JSON = "planner_invalid_json"
PLANNER_SCHEMA_VALIDATION_FAILED = "planner_schema_validation_failed"
TRAVERSE_INVALID_CONFIG = "traverse_invalid_config"
TRAVERSE_FETCH_FAILED = "traverse_fetch_failed"

Expand Down
3 changes: 3 additions & 0 deletions packages/polaris/polaris/polaris/modules/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .control import ControlHandler
from .executor import ExecutorHandler
from .loop import LoopHandler
from .materializer import MaterializerHandler
from .planner import PlannerHandler
from .reasoning import ReasoningHandler
from .terminal import TerminalHandler
Expand All @@ -18,6 +19,7 @@
NodeType.CONTROL: ControlHandler,
NodeType.EXECUTOR: ExecutorHandler,
NodeType.LOOP: LoopHandler,
NodeType.MATERIALIZER: MaterializerHandler,
NodeType.PLANNER: PlannerHandler,
NodeType.REASONING: ReasoningHandler,
NodeType.TERMINAL: TerminalHandler,
Expand Down Expand Up @@ -57,6 +59,7 @@ def get_handler(node_type: str) -> NodeHandler | None:
"ControlHandler",
"ExecutorHandler",
"LoopHandler",
"MaterializerHandler",
"PlannerHandler",
"ReasoningHandler",
"TerminalHandler",
Expand Down
Loading