
refactor: unify Python SDK public API and align cross-SDK patterns#1310

Merged
ytallo merged 7 commits into main from refactor/python-sdk
Mar 14, 2026
Conversation

ytallo (Contributor) commented Mar 14, 2026

Summary

Refactors the Python SDK public API to match Node.js SDK conventions and adds missing features for cross-SDK parity.

Breaking Changes

  • connect() removed from public API — register_worker() now blocks until connected
  • III class removed from __init__.py exports — use register_worker() instead
  • Logger constructor: function_name parameter renamed to service_name
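The breaking changes above can be sketched with a small stand-in. This is illustrative only: the real iii SDK is not imported here, and the signatures are assumptions based on this PR's summary, not the shipped API.

```python
import threading

# Hypothetical stand-in for the new sync-first client: registration now blocks
# until connected, so there is no separate public connect() call.
class FakeClient:
    def __init__(self) -> None:
        self._connected = threading.Event()
        # Background connect; in the real SDK this would be a websocket handshake.
        threading.Thread(target=self._connected.set, daemon=True).start()

    def register_worker(self, address: str) -> str:
        # Block here until the background connection is ready.
        self._connected.wait(timeout=5)
        return f"worker registered at {address}"


print(FakeClient().register_worker("ws://localhost:8080"))
```

Call sites that previously did `connect()` followed by registration collapse into the single blocking `register_worker()` call, and `Logger(function_name=...)` call sites become `Logger(service_name=...)`.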

New Features

  • 6 types added to public exports: ReconnectionConfig, TelemetryOptions, OtelConfig, TriggerActionEnqueue, TriggerActionVoid, FunctionInfo
  • New worker metrics: cpu.user_micros, cpu.system_micros, memory.vms, memory.heap_used, memory.heap_total, memory.external, uptime_seconds
  • safe_stringify utility for complex object serialization
  • StreamContext type for stream operations
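A plausible shape for a `safe_stringify`-style helper is sketched below; the SDK's actual implementation may differ. This version never raises, which matches the review-suggested test expecting `"[unserializable]"` for objects whose `__str__` fails.

```python
import json
from typing import Any

# Illustrative implementation of the utility named above (not the SDK's code):
# serialize to JSON, fall back to str(), and never raise.
def safe_stringify(value: Any) -> str:
    try:
        return json.dumps(value, default=str)
    except Exception:
        try:
            return str(value)
        except Exception:
            return "[unserializable]"


print(safe_stringify({"ok": 1}))  # {"ok": 1}
```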

Other

  • Motia framework updated to match new SDK patterns
  • Engine: minor fixes to config, module, and stream modules
  • All 134 Python SDK tests pass

Test plan

  • cd sdk/packages/python/iii && python -m pytest tests/ -x — 134 tests pass
  • Verify motia framework tests still pass
  • Verify engine builds with cargo build

Summary by CodeRabbit

  • Refactor

    • Python SDK and Motia framework moved from async to synchronous APIs; handlers are now regular functions and registration/trigger calls expect descriptor objects.
    • Connection lifecycle improved with explicit connect/shutdown and background connection handling.
  • Documentation

    • SDK README and examples updated to reflect the new sync APIs and descriptor-based registration.
  • Tests

    • Test suite converted to synchronous flow and updated to the new API shapes.
  • Chores

    • CI/test coverage fail-under thresholds removed.

vercel bot (Contributor) commented Mar 14, 2026

The latest updates on your projects.

Project | Deployment | Updated (UTC)
iii-docs | Ready | Mar 14, 2026 6:07pm
iii-website | Ready | Mar 14, 2026 6:07pm
motia-docs | Ready | Mar 14, 2026 6:07pm


coderabbitai bot (Contributor) commented Mar 14, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Refactors the Python SDK and Motia runtime from async-first to sync-first: introduces a background event-loop + register_worker/IIIClient sync API, converts many async handlers/tests to synchronous, standardizes register_function/register_trigger to dict-based inputs, and adds minor protocol and CI workflow edits.
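The "background event-loop + sync wrapper" pattern the walkthrough describes can be sketched as follows. This is a minimal assumed illustration, not the SDK's actual code: a dedicated thread owns an asyncio loop, and sync methods submit coroutines to it with `asyncio.run_coroutine_threadsafe` and block on the result.

```python
import asyncio
import threading


class SyncBridge:
    """Sketch of a sync facade over a background event loop."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def trigger(self, payload: dict) -> dict:
        # Schedule the async call on the background loop; block until it resolves.
        fut = asyncio.run_coroutine_threadsafe(self._async_trigger(payload), self._loop)
        return fut.result(timeout=5)

    async def _async_trigger(self, payload: dict) -> dict:
        await asyncio.sleep(0)  # stands in for the websocket round trip
        return {"echo": payload}

    def shutdown(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=5)


bridge = SyncBridge()
print(bridge.trigger({"function_id": "demo"}))
bridge.shutdown()
```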

Changes

Cohort / File(s) Summary
Benchmarks & Engine
engine/benches/protocol_serde_bench.rs, .github/workflows/bench-release.yml
Added stacktrace: None to ErrorBody in invocation errors; changed bench selection flag to --bench '*'.
Core SDK (sync client)
sdk/packages/python/iii/src/iii/iii.py, sdk/packages/python/iii/src/iii/iii_constants.py, sdk/packages/python/iii/src/iii/iii_types.py, sdk/packages/python/iii/src/iii/types.py
Introduced register_worker/IIIClient, background thread + event loop, sync wrappers (trigger/list_), new input types (Register), and reworked public exports/types.
SDK Logging / Utils / Telemetry
sdk/packages/python/iii/src/iii/logger.py, sdk/packages/python/iii/src/iii/utils.py, sdk/packages/python/iii/src/iii/telemetry*.py, sdk/packages/python/iii/src/iii/telemetry_exporters.py
Logger constructor/fields changed (service_name/trace/span), added safe_stringify, telemetry/metrics wiring and exporter signature tweaks, expanded gauges/worker metrics.
Channels & Streams
sdk/packages/python/iii/src/iii/channels.py, sdk/packages/python/iii/src/iii/stream.py
Adjusted channel end/write sequencing; added StreamContext alias and tightened StreamSetResult.new_value typing.
III examples / helpers
sdk/packages/python/iii-example/src/*
Updated to IIIClient types, dict-based register_function/register_trigger calls, and Logger usage.
Motia runtime & API surface
frameworks/motia/motia-py/packages/motia/src/motia/iii.py, runtime.py, types.py, multi_trigger.py
Switched internal III → IIIClient, normalized handler results (await if coroutine), unified dict-based registration payloads, and updated FlowContext.match to handle sync/async handlers.
Motia sync conversions (state/streams/enqueue/cli)
.../state.py, .../streams.py, .../enqueue.py, .../stream_client.py, cli.py
Converted many async APIs to synchronous (trigger instead of await call), replaced asyncio loops with threading.Event + signal handling, and adjusted tracing metadata.
Playground steps & examples
frameworks/motia/.../playground/steps/**/*.py
Converted ~30 example handlers from async to sync and removed awaits for stream/state/enqueue calls.
Tests: Motia & SDK
frameworks/motia/.../tests/*, sdk/packages/python/iii/tests/*
Comprehensive async→sync migration across tests: fixtures, handlers, mocks (AsyncMock→MagicMock), registration/trigger shapes updated to dict payloads, new sync test modules added.
Docs & README
sdk/README.md, sdk/packages/python/iii/README.md
Updated examples/docs to reflect register_worker, dict-based register_function/register_trigger, sync trigger usage, and Logger changes.
CI / Build
Makefile, .github/workflows/ci.yml, sdk/packages/python/iii/pyproject.toml
Removed pytest coverage fail-under thresholds; relaxed coverage enforcement.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Caller (sync)
    participant III as IIIClient (sync wrapper)
    participant BG as Background Thread / EventLoop
    participant Engine as Engine (ws)

    Client->>III: register_worker(address)
    III->>BG: start background loop & _do_connect()
    BG->>Engine: websocket connect
    Engine-->>BG: connected
    Client->>III: _wait_until_connected()
    III-->>Client: ready

    Client->>III: register_function({"id":...}, handler)
    III->>BG: schedule async register
    BG->>Engine: send RegisterFunctionMessage

    Client->>III: trigger({"function_id":...,"payload":...})
    III->>BG: schedule _async_trigger
    BG->>Engine: send TriggerRequest
    Engine-->>BG: TriggerResponse
    BG-->>III: return result (sync)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • guibeira

Poem

🐰 "Threads hum softly where coroutines slept,
Dicts now carry what once was kept,
Registered workers in a tidy row,
Triggers fire fast, no await to slow.
Rabbits clap paws — sync flows onward we go!"


coderabbitai bot left a comment

Actionable comments posted: 20

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
engine/examples/python/echo_invoker.py (1)

15-59: ⚠️ Potential issue | 🔴 Critical

File is non-runnable: await statements in non-async function and missing main() definition.

The parse_args() function at line 15 is defined as a regular function (def, not async def), yet contains await statements at lines 37, 51, and 55. Line 59 calls asyncio.run(main()) but the main() function is never defined in the file.

Additionally, the file has missing imports (argparse used at line 16) and undefined symbols (bridge used at lines 23–37, 55; log used throughout; variable BRIDGE_URL at line 36 is undefined—should be DEFAULT_BRIDGE_URL).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/examples/python/echo_invoker.py` around lines 15 - 59, The file mixes
async awaits inside a synchronous parse_args() and calls an undefined main();
fix by making parse_args() only parse/return args (remove any awaits and trigger
registrations), then add an async def main() that: imports/creates or receives
the bridge and log instances used, registers triggers via
bridge.register_trigger(...), uses args.bridge_url or DEFAULT_BRIDGE_URL
(replace BRIDGE_URL), awaits bridge.connect() and bridge.disconnect(), and runs
the serve loop; ensure asyncio, argparse, and logging (or whatever module
provides log) are imported and any bridge symbol is properly instantiated or
imported before use; finally keep the existing asyncio.run(main()) call.
frameworks/motia/motia-py/playground/steps/multi_trigger/triple_trigger_step.py (1)

43-49: ⚠️ Potential issue | 🔴 Critical

Convert handler to async and await ctx.match(), or make ctx.match() synchronous.

FlowContext.match() is an async method (line 79 in types.py), but the handler function calls it without await at line 43. This returns a coroutine object instead of the actual result. Either make handler async and add await, or refactor ctx.match() to be synchronous.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@frameworks/motia/motia-py/playground/steps/multi_trigger/triple_trigger_step.py`
around lines 43 - 49, The handler currently calls the async method ctx.match()
without awaiting it; change the handler function (the function named handler in
triple_trigger_step.py) to be async and add await before ctx.match(...) so it
awaits the coroutine result (i.e., make the signature async def handler(...) and
use await ctx.match({...})); ensure any places invoking handler can handle an
async function (or adapt them to await it).
frameworks/motia/motia-py/playground/steps/conditions/mixed_triggers_step.py (1)

48-53: ⚠️ Potential issue | 🔴 Critical

Missing await for async ctx.match() call.

FlowContext.match() is still defined as async def match(...) in types.py (line 79). Calling it without await returns a coroutine object instead of the actual result. The handler needs to remain async, or match() needs to be converted to sync.

🐛 Proposed fix to await the async match call
-def handler(input_data: Any, ctx: FlowContext[Any]) -> Any:
+async def handler(input_data: Any, ctx: FlowContext[Any]) -> Any:
     """Dispatch to handler based on trigger type."""

-    def _event_handler(input: Any) -> None:
+    async def _event_handler(input: Any) -> None:
         logger.info("Processing order (event)", {"data": input, "topic": ctx.trigger.topic})

-    def _api_handler(request: ApiRequest[Any]) -> ApiResponse[Any]:
+    async def _api_handler(request: ApiRequest[Any]) -> ApiResponse[Any]:
         logger.info("Processing order (api)", {"path": ctx.trigger.path, "method": ctx.trigger.method})
         return ApiResponse(status=200, body={"message": "Order processed via API"})

-    return ctx.match(
+    return await ctx.match(
         {
             "queue": _event_handler,
             "http": _api_handler,
         },
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/playground/steps/conditions/mixed_triggers_step.py`
around lines 48 - 53, The call to ctx.match is invoking an async method
(FlowContext.match) but is missing await; update the handler containing the
return statement so it remains async and change the return to await
ctx.match({...}) so the coroutine is awaited and the actual result from the
match call is returned; ensure surrounding function signature stays async and
that _event_handler and _api_handler remain usable as match targets.
frameworks/motia/motia-py/packages/motia/src/motia/runtime.py (1)

557-569: ⚠️ Potential issue | 🟡 Minor

Document the breaking change to initialize() signature in migration notes.

The method signature changed from async def initialize(self) to def initialize(self) in the recent refactoring (commit 8eaaa06). This is a breaking change for any code calling await motia.initialize(). While the MIGRATION_GUIDE.md covers Python runtime changes, it does not explicitly document this signature change. Add a note in the Python migration section to alert developers about this async-to-sync transition.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/src/motia/runtime.py` around lines
557 - 569, Add a clear note to the Python migration section of
MIGRATION_GUIDE.md stating that Runtime.initialize changed from async to sync
(async def initialize(self) -> def initialize(self)), explaining that callers
must stop awaiting motia.initialize() and invoke it synchronously (or wrap in
asyncio.run if needed); reference the Runtime.initialize method in motia.runtime
and include the commit/PR id or short rationale for the breaking change so users
can update call sites.
🧹 Nitpick comments (19)
sdk/packages/python/iii/tests/test_utils.py (1)

35-50: Add a regression case for non-TypeError/ValueError __str__ failures.

Given this utility’s “safe” intent, add a RuntimeError-raising __str__ test so future changes don’t reintroduce crashy edge cases.

Suggested test addition
+def test_safe_stringify_runtime_error_returns_unserializable() -> None:
+    class BadRuntime:
+        def __str__(self) -> str:
+            raise RuntimeError("runtime failure")
+
+    result = safe_stringify(BadRuntime())
+    assert result == "[unserializable]"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_utils.py` around lines 35 - 50, Add a
regression test mirroring the existing tests for safe_stringify: create a new
test function (e.g., test_safe_stringify_runtime_error_returns_unserializable)
that defines a class whose __str__ raises RuntimeError and then calls
safe_stringify(instance) and asserts the result equals "[unserializable]";
locate this alongside the existing tests for BadRepr/BadValue in test_utils.py
to ensure safe_stringify handles non-TypeError/ValueError exceptions from
__str__ without raising.
sdk/packages/python/iii/tests/test_data_channels.py (1)

38-50: Consider using public API methods instead of internal _async_* methods.

The test directly calls _async_create_channel() and _async_trigger() which are internal methods (indicated by the underscore prefix). If these are meant to be tested, consider either:

  1. Using the public sync wrappers if available (create_channel(), trigger())
  2. Documenting why internal async methods are explicitly needed in this test context

This improves test maintainability since internal APIs may change without notice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_data_channels.py` around lines 38 - 50,
The test is calling internal async helpers _async_create_channel() and
_async_trigger(); replace these with the public API equivalents (e.g.,
create_channel() or create_channel_async wrapper and trigger() or trigger_async)
where available, or add a short note in the test explaining why the internals
are required if no public async wrapper exists. Update references to
iii_client._async_create_channel and iii_client._async_trigger to use
iii_client.create_channel / iii_client.trigger (or their async public wrappers)
so the test exercises the supported API surface, or add a comment documenting
the necessity of the internal calls if changing them is not feasible.
sdk/packages/python/iii/tests/test_sync_api.py (2)

16-31: Consider extracting FakeWebSocket to a shared test utilities module.

FakeWebSocket is duplicated between this file and test_iii_registration_dedup.py. Consider extracting it to a shared conftest.py or test utilities module to reduce duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_sync_api.py` around lines 16 - 31, Extract
the FakeWebSocket test helper into a shared test utilities module (e.g.,
tests/utils.py or conftest.py) and replace the duplicate class definitions in
both test_sync_api.py and test_iii_registration_dedup.py with an import;
specifically move the class FakeWebSocket (including its sent attribute, state
initialization, async send, async close, __aiter__ and __anext__
implementations) to the shared module, update both test files to import
FakeWebSocket from that module, and remove the local duplicate definitions so
tests use the single shared implementation.

89-98: Thread termination test may be racy.

The test checks thread.is_alive() immediately after shutdown(). Depending on thread scheduling, there could be a small window where the thread hasn't fully stopped yet. Consider adding a small tolerance or using thread.join(timeout=...) to ensure deterministic behavior.

🔧 More robust thread termination check
     thread = client._thread
     client.shutdown()
 
-    assert not thread.is_alive()
+    thread.join(timeout=1.0)  # Wait up to 1 second for thread to stop
+    assert not thread.is_alive(), "Background thread did not stop within timeout"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_sync_api.py` around lines 89 - 98, The
test test_background_thread_stops_on_shutdown is racy because it asserts
thread.is_alive() immediately after calling III.shutdown(); change it to wait
deterministically for the thread to stop by calling
client._thread.join(timeout=...) (or loop with small sleeps up to a short
deadline) after client.shutdown(), then assert not client._thread.is_alive() (or
that join returned/that the thread terminated within the timeout) so the test
tolerates scheduling delays and becomes deterministic.
sdk/packages/python/iii/tests/test_iii_registration_dedup.py (1)

81-92: Consider using slightly longer sleep times for CI stability.

The time.sleep(0.05) (50ms) waits might be too short on slow CI runners. Consider using a more robust synchronization mechanism or slightly longer timeouts (e.g., 0.1s) to reduce flakiness.

🔧 Optional: Increase sleep times for CI stability
-    time.sleep(0.05)
+    time.sleep(0.1)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_iii_registration_dedup.py` around lines 81
- 92, Replace the two short time.sleep(0.05) calls surrounding the register flow
with slightly longer waits to improve CI stability: change them to
time.sleep(0.1) (or a configurable constant) in the test that defines handler
and calls client.register_function and client.register_trigger before
client.shutdown; alternatively, if the client exposes a ready/connected event or
awaitable (e.g., a connect/ready future), use that instead of sleeping to wait
for auto-connect completion and for registration to propagate.
frameworks/motia/motia-py/packages/motia/src/motia/runtime.py (1)

585-591: Inconsistent return handling in join_handler.

The join_handler awaits the result if it's a coroutine but the return statement at line 588 is inside the if config and config.on_join: block. If the condition is false, the function returns None implicitly, which is fine. However, compare this to line 608-610 in leave_handler where the result is awaited but never returned. This inconsistency may be intentional (leave handlers might not need to return a value), but worth confirming.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/src/motia/runtime.py` around lines
585 - 591, The join_handler currently returns the awaited result of
config.on_join but leave_handler awaits config.on_leave without returning it,
causing inconsistent behavior; update leave_handler (the handler registered via
get_instance().register_function and get_instance().register_trigger for
stream:leave) to return the awaited result just like join_handler (i.e., capture
result = config.on_leave(subscription, context, auth_context), await if
coroutine, then return result) so both handlers have consistent return semantics
for config.on_* handlers.
sdk/packages/python/iii/src/iii/stream.py (1)

28-30: Consider stronger typing for StreamContext.

StreamContext = Any provides no type safety. If the context structure is known, consider using a TypedDict or a protocol. If it's intentionally untyped, this is acceptable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/src/iii/stream.py` around lines 28 - 30,
StreamContext is currently declared as Any which removes type safety; replace it
with a more specific type (e.g., a TypedDict or a Protocol) that matches the
actual context shape used by the stream code, or at minimum document why it must
remain Any. Update the StreamContext symbol to reference the new
TypedDict/Protocol name (or an appropriate Union of known context types) and
ensure any functions/methods that accept or return StreamContext (e.g., stream
handlers) use the refined type so type checkers can validate structure.
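One way to tighten the alias as the comment suggests is a `TypedDict`; the field names below are hypothetical stand-ins, since the real context shape would come from the stream protocol.

```python
from typing import TypedDict


# Hypothetical structured replacement for `StreamContext = Any`; the field
# names here are illustrative, not taken from the SDK.
class StreamContext(TypedDict):
    stream_name: str
    group_id: str


ctx: StreamContext = {"stream_name": "inbox", "group_id": "g1"}
print(ctx["stream_name"])  # inbox
```

With this in place, type checkers can flag a handler that reads a key the context does not define, which `Any` silently allows.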
frameworks/motia/motia-py/playground/steps/state_example/update_state_step.py (1)

26-27: Consider rejecting empty id values explicitly.

Line 26 currently accepts id=""/None values and proceeds to write state. A strict non-empty check makes the endpoint more defensive.

🔧 Optional hardening
     user_id = request.path_params.get("id")
+    if not user_id:
+        return {
+            "status": 400,
+            "body": {"error": "Path parameter 'id' must be non-empty"},
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@frameworks/motia/motia-py/playground/steps/state_example/update_state_step.py`
around lines 26 - 27, The handler in update_state_step.py retrieves user_id via
request.path_params.get("id") but doesn’t reject empty values; add a defensive
check after obtaining user_id (the variable user_id in update_state_step.py /
the update state handler) to ensure it is non-empty (not None and not an empty
string) and return a client error (e.g., raise HTTPException or return a 400
response with a clear message) when validation fails, before proceeding to write
state.
frameworks/motia/motia-py/packages/motia/src/motia/iii.py (1)

13-13: Prefer public SDK exports over internal module imports.

Line 13 imports InitOptions and TelemetryOptions from iii.iii_constants, which couples Motia to SDK internals. These classes are publicly exported from the iii package and should be imported from there instead.

♻️ Suggested change
-from iii.iii_constants import InitOptions, TelemetryOptions
+from iii import InitOptions, TelemetryOptions
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/src/motia/iii.py` at line 13, The
import in iii.py pulls InitOptions and TelemetryOptions from the internal module
iii.iii_constants; update the import to use the public exports from the iii
package instead by importing InitOptions and TelemetryOptions from the top-level
iii module (replace the iii.iii_constants import with the public iii export) so
the code depends on the public SDK API rather than internal modules.
frameworks/motia/motia-py/playground/steps/todo/update_todo_step.py (1)

37-42: Return updated_todo explicitly instead of the setter result.

Line 38 makes the 200 response depend on whatever todo_stream.set() returns, even though Line 37 already computed the payload this endpoint wants to expose. If set() returns None or metadata, clients will get the wrong body.

♻️ Suggested fix
-    todo = todo_stream.set("inbox", todo_id, updated_todo)
+    todo_stream.set("inbox", todo_id, updated_todo)

     logger.info("Todo updated successfully", {"todo_id": todo_id})

-    return ApiResponse(status=200, body=todo)
+    return ApiResponse(status=200, body=updated_todo)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/playground/steps/todo/update_todo_step.py` around
lines 37 - 42, The response body is using the return value of todo_stream.set
(assigned to variable todo) which may be None or metadata; instead return the
computed payload updated_todo. Update the ApiResponse construction in the
function that performs the update so it uses updated_todo as the body rather
than the todo variable (leave the call to todo_stream.set(...) for persistence
but don't rely on its return value for the response).
frameworks/motia/motia-py/packages/motia/tests/test_queue_integration.py (1)

24-29: Replace the fixed sleeps with deterministic waits.

The new 0.5 / 1.5 / 2.0 second delays make these integration tests both slow and timing-dependent. This file already imports flush_bridge_queue and wait_for_registration; using those helpers, or a bounded poll on received, will be much less flaky on busy CI.

Also applies to: 46-50, 80-84, 108-113, 148-154

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_queue_integration.py`
around lines 24 - 29, Replace fixed time.sleep calls with deterministic waits:
after flush_bridge_queue(bridge) and after bridge.trigger(...) use
wait_for_registration(bridge, topic) or poll the shared received list with a
bounded timeout/assertion instead of sleeping; update each occurrence around
bridge.trigger/enqueue (e.g., the blocks that currently sleep 0.5/1.5/2.0
seconds) to call wait_for_registration(bridge, topic) to wait for the consumer
to be ready and then use a small loop that checks whether received contains the
expected message with a timeout (raise/assert on timeout) so tests become fast
and deterministic.
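A generic bounded-poll helper of the kind these comments recommend as a replacement for fixed sleeps might look like this; the name, defaults, and usage are assumptions, not existing test utilities in this repo.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool],
               timeout: float = 2.0,
               interval: float = 0.01) -> bool:
    """Poll `condition` until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return condition()  # one final check at the deadline


# Example: wait for a message to land in a shared list instead of sleeping.
received: list[str] = []
received.append("msg")
print(wait_until(lambda: "msg" in received))  # True
```

Tests then fail fast with a clear timeout instead of passing or flaking based on how long an arbitrary `time.sleep` happened to be.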
frameworks/motia/motia-py/packages/motia/tests/test_cli.py (1)

170-172: Assert the new connect() / shutdown() lifecycle in these tests.

Both tests stub the lifecycle methods but never verify them, so the new cleanup path can regress without failing the suite.

🧪 Suggested assertion pattern
-    monkeypatch.setattr("motia.iii.get_instance", lambda: SimpleNamespace(connect=MagicMock(), shutdown=MagicMock()))
+    iii = SimpleNamespace(connect=MagicMock(), shutdown=MagicMock())
+    monkeypatch.setattr("motia.iii.get_instance", lambda: iii)
     monkeypatch.setattr(cli.threading, "Event", lambda: SimpleNamespace(wait=MagicMock(), set=MagicMock()))
     monkeypatch.setattr(sys, "argv", ["motia", "run"])

     cli.main()

+    iii.connect.assert_called_once()
+    iii.shutdown.assert_called_once()
     assert loaded_streams == ["streams/demo_stream.py"]
     assert loaded_steps == ["steps/demo_step.py"]

Apply the same pattern to the dev --watch test below.

Also applies to: 216-219

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_cli.py` around lines 170
- 172, The tests stub motia.iii.get_instance to return a SimpleNamespace with
connect and shutdown MagicMocks but never assert they were invoked; update both
the "run" test and the "dev --watch" test to assert the lifecycle by checking
that the returned instance's connect MagicMock was called (e.g., assert
instance.connect.called or assert_called_once()) and that shutdown was called
during cleanup (assert instance.shutdown.called or assert_called_once()); locate
the SimpleNamespace returned by the monkeypatched get_instance in the test and
use that object to perform the assertions after invoking the CLI via cli.main or
similar so both connect() and shutdown() are verified.
frameworks/motia/motia-py/packages/motia/tests/test_pubsub.py (1)

40-42: Reduce fixed sleeps to improve test stability.

Line [41], Line [92], and Line [109] use fixed delays that can still flake on slower CI runners. Prefer a bounded polling wait (or event/queue condition) for registration/readiness instead of hard-coded sleeps.

Also applies to: 91-93, 109-109

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_pubsub.py` around lines
40 - 42, The test uses fixed time.sleep calls after flush_bridge_queue(bridge)
(and similar sleeps around lines using the bridge/registration readiness) which
can flake on slow CI; replace these sleeps with a bounded polling loop that
checks the actual readiness condition (e.g., bridge queue is empty, a
registration flag is set, or the expected message appears) using
time.monotonic() with a short interval and a clear timeout; update occurrences
around flush_bridge_queue(bridge) and the other sleeps referenced (the test
functions interacting with bridge/registration) to poll the concrete condition
instead of sleeping to ensure deterministic waits and timely failures.
sdk/packages/python/iii/tests/test_context_propagation.py (1)

62-82: Guarantee client cleanup with finally.

client.shutdown() at Line [81] and Line [105] should be in finally blocks so failures in the test body don’t leak client/loop resources into later tests.

Also applies to: 92-105

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_context_propagation.py` around lines 62 -
82, Wrap usage of the III client in a try/finally so client.shutdown() is always
called: for the blocks that call client._run_on_loop(... _handle_invoke(...))
and the later block (lines 92-105), surround the test body that uses the III
instance with try: ... finally: client.shutdown(), ensuring client.shutdown() is
invoked even if assertions or mocked calls raise; locate the III(...)
instantiation and the existing client.shutdown() calls and replace them with
this try/finally pattern around the code that uses client and its methods
(_run_on_loop, _handle_invoke).
sdk/packages/python/iii/tests/test_state.py (1)

18-28: The _poll helper appears to be unused.

The _poll helper function is defined but not called anywhere in this test file. Consider removing it to reduce dead code.

Proposed removal
-async def _poll(check, *, retries: int = 100, delay: float = 0.1):
-    """Retry an assertion up to `retries` times with `delay` between attempts."""
-    for attempt in range(retries):
-        try:
-            check()
-            return
-        except (AssertionError, Exception):
-            if attempt == retries - 1:
-                raise
-            await asyncio.sleep(delay)
-
-
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_state.py` around lines 18 - 28, The helper
function `_poll` is not used anywhere in the test file; delete the entire
`_poll` async function definition to remove dead code, and after removing it,
also remove any now-unused imports (e.g., `asyncio`) that were only required for
`_poll`; ensure no other references to `_poll` remain in tests before
committing.
sdk/packages/python/iii/tests/test_register_function_args.py (2)

79-82: Consider increasing the sleep duration for reliability.

The 50ms sleep may be insufficient for reliable connection establishment, especially under CI load. This could lead to flaky tests.

Proposed fix
 def _make_client() -> III:
     client = III("ws://fake", InitOptions())
-    time.sleep(0.05)
+    time.sleep(0.1)
     return client
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_register_function_args.py` around lines 79
- 82, The short sleep in the test helper _make_client (calling III("ws://fake",
InitOptions()) then time.sleep(0.05)) can be flaky under CI; increase the sleep
duration to a more reliable value (e.g., 200–500ms) so the III client has time
to establish its connection before returning, or replace the fixed sleep with a
simple readiness loop that polls the client's ready/connected state instead of
sleeping 50ms.

103-104: Handlers defined as async def but PR migrates to sync-first.

Per the PR objectives, the Python SDK is moving to a sync-first execution model. For consistency with the new patterns demonstrated in other test files (e.g., test_queue_integration.py), consider using sync handlers (def) instead of async def.

Example fix for line 103
-    async def handler(data: Any) -> Any:
+    def handler(data: Any) -> Any:
         return data

Also applies to: 135-136, 164-165

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_register_function_args.py` around lines
103 - 104, Change the async handlers to sync handlers to match the sync-first
model: replace "async def handler(data: Any) -> Any:" with "def handler(data:
Any) -> Any:" in this test (and the other two occurrences noted), ensure the
function body remains the same and any test invocation uses the sync call
pattern used elsewhere (no await). Target the "handler" definitions in
test_register_function_args.py and update all three async handler declarations
to plain def.
sdk/packages/python/iii/src/iii/logger.py (1)

67-79: Consider adding validation for hex trace/span IDs.

If trace_id or span_id are provided but are not valid hexadecimal strings, int(..., 16) will raise a ValueError. This could cause unexpected failures. Consider catching the exception or validating the input.

Proposed defensive handling
             if self._trace_id is not None:
-                trace_id = int(self._trace_id, 16)
+                try:
+                    trace_id = int(self._trace_id, 16)
+                except ValueError:
+                    trace_id = 0
             elif span_ctx.is_valid:
                 trace_id = span_ctx.trace_id
             else:
                 trace_id = 0

             if self._span_id is not None:
-                span_id = int(self._span_id, 16)
+                try:
+                    span_id = int(self._span_id, 16)
+                except ValueError:
+                    span_id = 0
             elif span_ctx.is_valid:
                 span_id = span_ctx.span_id
             else:
                 span_id = 0
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/src/iii/logger.py` around lines 67 - 79, The code
converts self._trace_id/self._span_id with int(...,16) which will raise
ValueError for non-hex strings; update the block around self._trace_id /
self._span_id to validate or catch the conversion error: attempt to parse hex
(for self._trace_id and self._span_id) inside a try/except ValueError (or use a
regex/str.isalnum check), and on failure fall back to
span_ctx.trace_id/span_ctx.span_id when span_ctx.is_valid or to 0; also consider
emitting a warning/log when a provided ID is invalid so the issue is observable.
frameworks/motia/motia-py/packages/motia/tests/conftest.py (1)

16-17: Prefer register_worker() in fixture setup to avoid internal API coupling.

The fixture currently imports from iii.iii import III and calls .connect(). Since register_worker() is the public API entrypoint, use it instead to keep tests aligned with the supported SDK surface.

♻️ Suggested migration
-from iii.iii import III
+from iii import register_worker
@@
-    bridge_instance = III(TEST_ENGINE_URL)
-    bridge_instance.connect()
-    time.sleep(0.1)
+    bridge_instance = register_worker(TEST_ENGINE_URL)

Also applies to: lines 47-50

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py` around lines 16 -
17, The test fixture currently imports and instantiates the internal III class
(from iii.iii import III) and calls .connect(); replace that with the public API
by importing register_worker and using register_worker(...) to create and
connect the worker in the fixture (use the returned worker object in place of
III instances), and make the same change for the other occurrence referenced
(lines 47-50); update any attributes/method calls to match the public worker
object returned by register_worker instead of the internal III class.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@frameworks/motia/motia-py/packages/motia/src/motia/cli.py`:
- Around line 121-130: The signal handlers set in the lifecycle blocks (the
auto-generated index block around the iii.connect()/iii.shutdown() sequence and
the CLI commands run and dev) overwrite process-global handlers and are not
restored, breaking subsequent invocations; modify each block to capture the
previous handlers with signal.getsignal(signal.SIGINT) and
signal.getsignal(signal.SIGTERM) before calling signal.signal(...), and then
restore those saved handlers in the corresponding finally clause (after
stop.wait() and before iii.shutdown()) so main(), the run handler, and the dev
handler each reinstate the original SIGINT/SIGTERM handlers when they return.
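The save/restore step can be factored into one helper shared by all three blocks. A minimal sketch (the helper name is illustrative, not existing code):

```python
import signal


def install_stop_handlers(stop_event):
    """Point SIGINT/SIGTERM at `stop_event` and return a callable that
    restores whatever handlers were active before installation."""
    previous = {
        sig: signal.getsignal(sig) for sig in (signal.SIGINT, signal.SIGTERM)
    }
    for sig in previous:
        signal.signal(sig, lambda *_: stop_event.set())

    def restore():
        # Reinstate the saved handlers so repeated in-process invocations
        # (e.g. the test suite calling cli.main() twice) are unaffected.
        for sig, handler in previous.items():
            signal.signal(sig, handler)

    return restore
```

main(), run, and dev would each call the returned restore() in their finally clause, after stop.wait() and before iii.shutdown(). Note that signal.signal() must run on the main thread.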

In `@frameworks/motia/motia-py/packages/motia/src/motia/enqueue.py`:
- Around line 9-12: The test and any callers still using "await enqueue(...)"
must be updated because enqueue is now a synchronous function (def enqueue), so
remove the unnecessary await in tests like test_tracing_integration.py (around
the async handler at line ~141) and any other places that call await
enqueue(...); locate uses by searching for the symbol "enqueue" and replace
"await enqueue(...)" with a direct call "enqueue(...)" (adjust surrounding async
handler code if needed per MIGRATION_GUIDE.md).

In `@frameworks/motia/motia-py/packages/motia/src/motia/state.py`:
- Around line 16-33: StateManager methods (get, set, update, delete, list,
list_groups, clear) are now synchronous, so callers must stop awaiting them;
update all call sites (e.g., places using await state.get(...), await
state.set(...), etc.) to call the methods directly (state.get(...),
state.set(...)) and adjust surrounding code accordingly (remove unnecessary
async/await or convert caller to synchronous flow or explicitly run in an
executor if true async behavior is required). Ensure you reference the
StateManager instance used in the code and replace every await on these method
names to restore correct runtime behavior.

In `@frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py`:
- Around line 42-44: Replace blocking time.sleep() calls inside async test
functions with non-blocking await asyncio.sleep(0.3): locate each occurrence of
time.sleep(0.3) used after bridge/HTTP ops (e.g., the call following
flush_bridge_queue(bridge)) and change it to await asyncio.sleep(0.3); ensure
the test coroutine is declared async and add an import for asyncio at the top if
missing so tests remain non-blocking and won’t block the event loop.

In `@frameworks/motia/motia-py/packages/motia/tests/test_bridge.py`:
- Line 80: The trigger payload uses the wrong field name and units: update the
call to bridge.trigger so it sends "timeout_ms" instead of "timeout" and convert
seconds to milliseconds (e.g., change the {"function_id":
"nonexistent.function", "payload": {}, "timeout": 2.0} payload to use
"timeout_ms": 2000) so it matches the TriggerRequest timeout_ms field expected
by bridge.trigger.

In `@frameworks/motia/motia-py/packages/motia/tests/test_motia_framework.py`:
- Line 50: Replace blocking time.sleep calls inside async test functions with
non-blocking await asyncio.sleep: import asyncio if not already imported, then
change each time.sleep(0.5) (and other durations at the indicated sites) to
await asyncio.sleep(0.5) in the async test functions in test_motia_framework.py
so the event loop is not blocked (look for occurrences of time.sleep in the test
functions and update them accordingly).

In `@frameworks/motia/motia-py/playground/steps/api_steps/sse_step.py`:
- Line 53: The async handler function in sse_step.py is calling time.sleep(...)
which blocks the event loop; replace that blocking call in the handler with a
non-blocking await asyncio.sleep(...) and add the asyncio import if missing
(keep using random.random() for the delay calculation). Update the call inside
the handler (and remove or retain the time import as appropriate) so the sleep
becomes await asyncio.sleep(0.3 + random.random() * 0.7) to avoid blocking
concurrent SSE clients.

In `@frameworks/motia/motia-py/playground/steps/greetings/greet_step.py`:
- Around line 29-37: The handler currently calls the async method
greetings_stream.set(...) without awaiting it, causing an unawaited coroutine;
update the handler function to be asynchronous (change def handler(...) to async
def handler(...)) and await greetings_stream.set(GREETINGS_GROUP_ID, name,
greeting), or alternatively call a provided synchronous wrapper on
greetings_stream if one exists; ensure the function signature (handler) and
return type (ApiResponse[dict[str, Any]]) remain compatible with its caller and
that InternalStateManager's async set is awaited to avoid runtime errors.

In `@sdk/packages/python/iii/README.md`:
- Around line 18-35: The README sample still uses the removed III class and
connect() API; update it to the new register_worker() lifecycle by removing any
usage of III() and iii.connect(), importing the new register_worker entrypoint,
and registering your functions/triggers via the register_worker-compatible API
(replace calls to iii.register_function and iii.register_trigger with the
corresponding arguments or registration calls expected by register_worker).
Ensure all occurrences (including lines 43-45) are changed so the snippet
imports the new symbol instead of III and shows the worker registered/started
via register_worker rather than calling connect().

In `@sdk/packages/python/iii/src/iii/channels.py`:
- Around line 49-54: The helper coroutine _write_and_close currently awaits
self._writer.write(data) and then calls self._writer.close_async(), but if
write() raises an exception close_async() is skipped; modify _write_and_close to
ensure close_async() runs in a finally block (call await
self._writer.close_async() inside finally), catch/log any exceptions from
close_async() so they don't mask the original error, and keep
loop.create_task(_write_and_close()) as the caller; this guarantees the socket
is always closed even if self._writer.write(data) fails.

In `@sdk/packages/python/iii/src/iii/iii.py`:
- Around line 111-129: The registries (_functions, _services, _triggers,
_trigger_types) are mutated from caller threads (via register_function,
register_service, register_trigger, register_trigger_type) while the
reconnect/replay code on the event loop thread iterates them, causing race
conditions; fix by either funneling all registry mutations onto the loop thread
using the existing _schedule_on_loop/_run_on_loop helpers (invoke the register
logic inside a coroutine scheduled on self._loop) or by protecting the shared
dicts with a threading.RLock used both by the public register_* methods and by
the reconnect/replay iterator in _async_connect so every mutation and iteration
acquires the lock. Ensure the chosen approach updates all four registries and
the replay code paths (where list(...) snapshots are taken) to use the same
synchronization.
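A minimal sketch of the lock-based option, trimmed to two of the four registries (names and shape are illustrative, not the actual III internals):

```python
import threading


class RegistryMixin:
    """Guard shared registries with one RLock so caller threads and the
    event-loop replay path never interleave mutation with iteration."""

    def __init__(self):
        self._registry_lock = threading.RLock()
        self._functions: dict = {}
        self._triggers: dict = {}

    def register_function(self, function_id, handler):
        # Public registration path: mutate only under the lock.
        with self._registry_lock:
            self._functions[function_id] = handler

    def _snapshot_functions(self):
        # Replay path: take the list(...) snapshot under the same lock,
        # then iterate the snapshot outside it.
        with self._registry_lock:
            return list(self._functions.items())
```

Whichever variant is chosen, the key invariant is that every mutation and every snapshot acquire the same lock.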
- Around line 133-141: _wait_until_connected currently waits only once for 30s
and can return while still in "connecting"/"reconnecting", so change
_wait_until_connected to repeatedly wait until the state becomes "connected" or
"failed" (e.g. loop calling self._connected_event.wait(timeout=some_interval)
and re-check self._connection_state), and add an optional timeout parameter
(timeout: float | None) that accumulates elapsed time and, if exceeded, raises a
ConnectionError; preserve the existing behavior when timeout is None (block
indefinitely). Update register_worker to accept/forward a bounded startup
timeout to _wait_until_connected when callers want a bounded startup time.
Ensure you reference and use _connected_event, _connection_state,
_wait_until_connected, and register_worker in the changes.
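The wait loop with an optional bounded timeout can be sketched as a standalone function (the `get_state` callable stands in for reading `self._connection_state`):

```python
import threading
import time
from typing import Optional


def wait_until_connected(connected_event: threading.Event,
                         get_state,
                         timeout: Optional[float] = None,
                         interval: float = 0.5) -> None:
    """Block until `get_state()` reports "connected"; raise on "failed".

    With `timeout=None` this blocks indefinitely (the current behaviour);
    otherwise a ConnectionError is raised once the budget is spent.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = get_state()
        if state == "connected":
            return
        if state == "failed":
            raise ConnectionError("connection attempt failed")
        remaining = None if deadline is None else deadline - time.monotonic()
        if remaining is not None and remaining <= 0:
            raise ConnectionError(f"not connected after {timeout}s")
        # Wake early when the event fires, but never wait past the deadline.
        wait_for = interval if remaining is None else min(interval, remaining)
        connected_event.wait(timeout=wait_for)
```

register_worker would forward its optional startup timeout into this loop.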

In `@sdk/packages/python/iii/src/iii/otel_worker_gauges.py`:
- Around line 146-150: The code calls a non-existent
Meter.remove_batch_observable_callback and silently swallows the error, so
remove that call and implement proper callback cleanup: stop calling
_meter.remove_batch_observable_callback(_batch_callback_ref, _observables);
instead ensure you never double-register by checking/clearing
_batch_callback_ref when registering (e.g. only register if _batch_callback_ref
is None) and, depending on your OpenTelemetry version, deregister using the
correct API (look for the API that returns an unregister/handle when registering
and call its unregister/close method) or rely on meter lifecycle to clean up and
just set _batch_callback_ref and _observables to None; update logic around
_meter, _batch_callback_ref, and _observables to reflect this change.

In `@sdk/packages/python/iii/src/iii/telemetry_exporters.py`:
- Around line 102-105: The shutdown() method currently compares self._loop to
asyncio.get_running_loop(), which raises RuntimeError when no event loop is
running; modify shutdown() to guard that call by wrapping
asyncio.get_running_loop() in a try/except RuntimeError (or otherwise detect
absence of a running loop) and only await self._task if a running loop exists
and equals self._loop; specifically update the conditional that now reads "if
self._loop is not None and self._loop is asyncio.get_running_loop():" to handle
RuntimeError and skip the await when no running loop is present, referencing
shutdown(), self._loop, self._task, and asyncio.get_running_loop().

In `@sdk/packages/python/iii/src/iii/telemetry.py`:
- Around line 471-475: The noop span's get_span_context() currently returns None
which breaks callers that expect a SpanContext with an is_valid attribute;
change get_span_context() to return a SpanContext-like object (or a tiny
sentinel) with is_valid = False (and other minimal attributes if used elsewhere)
instead of None so callers that call .is_valid won't raise; update the class
that defines is_recording and get_span_context (the noop span implementation) to
return this invalid/noop context from get_span_context().

In `@sdk/packages/python/iii/src/iii/types.py`:
- Around line 95-99: Update the handler typing so sync functions are allowed:
widen the RemoteFunctionHandler alias (and any occurrence used by
register_function) from only returning an Awaitable to accept callables that
return either Any or Awaitable[Any] (e.g. Union[Callable[..., Any],
Callable[..., Awaitable[Any]]]); then update the register_function signature to
use that widened RemoteFunctionHandler and make the same change in the
implementation signature in iii.py so the public protocol and implementation
match (look for the register_function definition and the RemoteFunctionHandler
type alias).
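The widened alias pairs naturally with an invoke path that awaits only when needed. A sketch (the `invoke_handler` helper is illustrative, not existing SDK code):

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

# Accept both sync and async handlers.
RemoteFunctionHandler = Union[
    Callable[..., Any],
    Callable[..., Awaitable[Any]],
]


async def invoke_handler(handler: RemoteFunctionHandler, data: Any) -> Any:
    """Call the handler, awaiting the result only when it is awaitable."""
    result = handler(data)
    if inspect.isawaitable(result):
        result = await result
    return result
```

Both the protocol in types.py and the implementation signature in iii.py would reference the same widened alias.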

In `@sdk/packages/python/iii/src/iii/utils.py`:
- Around line 56-61: The safe_stringify function's json.dumps call can raise
exceptions from a value's __str__ (or other unexpected errors) but currently
only catches TypeError and ValueError; update safe_stringify to catch all
exceptions (e.g., except Exception) so any error from json.dumps or the
default=str fallback is swallowed and the function consistently returns the
fallback "[unserializable]". Keep the existing structure and docstring and only
change the exception clause in safe_stringify to ensure the "safe" contract is
honored.
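A sketch of the widened exception clause, mirroring the described shape of `safe_stringify` (the real implementation in `iii/utils.py` may differ in detail):

```python
import json


def safe_stringify(value, fallback: str = "[unserializable]") -> str:
    """Serialize `value` to JSON; never raise."""
    try:
        return json.dumps(value, default=str)
    except Exception:
        # default=str can itself raise if a value's __str__ misbehaves;
        # honor the "safe" contract by always returning the fallback.
        return fallback
```

With `except Exception`, a `__str__` that raises RuntimeError (or anything else) still yields the fallback instead of propagating.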

In `@sdk/packages/python/iii/tests/conftest.py`:
- Around line 17-21: The fixture currently creates client = III(ENGINE_WS_URL),
calls client._wait_until_connected() and time.sleep(...) then yield client and
finally client.shutdown(), but if an exception occurs before yield the shutdown
is skipped; wrap the setup and yield in a try/finally so shutdown always runs:
allocate the client as before, then enter try: perform _wait_until_connected()
and sleep, yield client; finally: if client is not None call client.shutdown();
reference symbols: III, client, _wait_until_connected, shutdown.
- Line 8: The test fixture currently imports and directly instantiates the
private III class and calls its internal _wait_until_connected() and a
hard-coded sleep; replace that with the public register_worker(address, options)
API: remove the from iii.iii import III import and any uses of
_wait_until_connected() and sleep, call register_worker(...) with the same
address and an options dict/kwargs (including a timeout if needed) so the
function blocks until the WebSocket is ready, and update any references to the
returned object to match what register_worker returns.

In `@sdk/README.md`:
- Line 97: Update the Python API table so all entries use the new API shape
consistently: replace legacy usages of init(...), register_function(id, ...),
and awaited trigger(...) with the new function names/signatures shown for JS
(e.g., register_trigger({"type": ..., "function_id": ..., "config": ...}) and
the suggested iii.register_trigger(type, fn_id, config) form), and ensure
registerTrigger/register_trigger rows match each other across columns; update
any examples and descriptions to reference register_trigger, function_id, and
the new config shape instead of the old init/register_function/trigger symbols.

---

Outside diff comments:
In `@engine/examples/python/echo_invoker.py`:
- Around line 15-59: The file mixes async awaits inside a synchronous
parse_args() and calls an undefined main(); fix by making parse_args() only
parse/return args (remove any awaits and trigger registrations), then add an
async def main() that: imports/creates or receives the bridge and log instances
used, registers triggers via bridge.register_trigger(...), uses args.bridge_url
or DEFAULT_BRIDGE_URL (replace BRIDGE_URL), awaits bridge.connect() and
bridge.disconnect(), and runs the serve loop; ensure asyncio, argparse, and
logging (or whatever module provides log) are imported and any bridge symbol is
properly instantiated or imported before use; finally keep the existing
asyncio.run(main()) call.

In `@frameworks/motia/motia-py/packages/motia/src/motia/runtime.py`:
- Around line 557-569: Add a clear note to the Python migration section of
MIGRATION_GUIDE.md stating that Runtime.initialize changed from async to sync
(async def initialize(self) -> def initialize(self)), explaining that callers
must stop awaiting motia.initialize() and invoke it synchronously (or wrap in
asyncio.run if needed); reference the Runtime.initialize method in motia.runtime
and include the commit/PR id or short rationale for the breaking change so users
can update call sites.

In
`@frameworks/motia/motia-py/playground/steps/conditions/mixed_triggers_step.py`:
- Around line 48-53: The call to ctx.match is invoking an async method
(FlowContext.match) but is missing await; update the handler containing the
return statement so it remains async and change the return to await
ctx.match({...}) so the coroutine is awaited and the actual result from the
match call is returned; ensure surrounding function signature stays async and
that _event_handler and _api_handler remain usable as match targets.

In
`@frameworks/motia/motia-py/playground/steps/multi_trigger/triple_trigger_step.py`:
- Around line 43-49: The handler currently calls the async method ctx.match()
without awaiting it; change the handler function (the function named handler in
triple_trigger_step.py) to be async and add await before ctx.match(...) so it
awaits the coroutine result (i.e., make the signature async def handler(...) and
use await ctx.match({...})); ensure any places invoking handler can handle an
async function (or adapt them to await it).

---

Nitpick comments:
In `@frameworks/motia/motia-py/packages/motia/src/motia/iii.py`:
- Line 13: The import in iii.py pulls InitOptions and TelemetryOptions from the
internal module iii.iii_constants; update the import to use the public exports
from the iii package instead by importing InitOptions and TelemetryOptions from
the top-level iii module (replace the iii.iii_constants import with the public
iii export) so the code depends on the public SDK API rather than internal
modules.

In `@frameworks/motia/motia-py/packages/motia/src/motia/runtime.py`:
- Around line 585-591: The join_handler currently returns the awaited result of
config.on_join but leave_handler awaits config.on_leave without returning it,
causing inconsistent behavior; update leave_handler (the handler registered via
get_instance().register_function and get_instance().register_trigger for
stream:leave) to return the awaited result just like join_handler (i.e., capture
result = config.on_leave(subscription, context, auth_context), await if
coroutine, then return result) so both handlers have consistent return semantics
for config.on_* handlers.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py`:
- Around line 16-17: The test fixture currently imports and instantiates the
internal III class (from iii.iii import III) and calls .connect(); replace that
with the public API by importing register_worker and using register_worker(...)
to create and connect the worker in the fixture (use the returned worker object
in place of III instances), and make the same change for the other occurrence
referenced (lines 47-50); update any attributes/method calls to match the public
worker object returned by register_worker instead of the internal III class.

In `@frameworks/motia/motia-py/packages/motia/tests/test_cli.py`:
- Around line 170-172: The tests stub motia.iii.get_instance to return a
SimpleNamespace with connect and shutdown MagicMocks but never assert they were
invoked; update both the "run" test and the "dev --watch" test to assert the
lifecycle by checking that the returned instance's connect MagicMock was called
(e.g., assert instance.connect.called or assert_called_once()) and that shutdown
was called during cleanup (assert instance.shutdown.called or
assert_called_once()); locate the SimpleNamespace returned by the monkeypatched
get_instance in the test and use that object to perform the assertions after
invoking the CLI via cli.main or similar so both connect() and shutdown() are
verified.

In `@frameworks/motia/motia-py/packages/motia/tests/test_pubsub.py`:
- Around line 40-42: The test uses fixed time.sleep calls after
flush_bridge_queue(bridge) (and similar sleeps around lines using the
bridge/registration readiness) which can flake on slow CI; replace these sleeps
with a bounded polling loop that checks the actual readiness condition (e.g.,
bridge queue is empty, a registration flag is set, or the expected message
appears) using time.monotonic() with a short interval and a clear timeout;
update occurrences around flush_bridge_queue(bridge) and the other sleeps
referenced (the test functions interacting with bridge/registration) to poll the
concrete condition instead of sleeping to ensure deterministic waits and timely
failures.

In `@frameworks/motia/motia-py/packages/motia/tests/test_queue_integration.py`:
- Around line 24-29: Replace fixed time.sleep calls with deterministic waits:
after flush_bridge_queue(bridge) and after bridge.trigger(...) use
wait_for_registration(bridge, topic) or poll the shared received list with a
bounded timeout/assertion instead of sleeping; update each occurrence around
bridge.trigger/enqueue (e.g., the blocks that currently sleep 0.5/1.5/2.0
seconds) to call wait_for_registration(bridge, topic) to wait for the consumer
to be ready and then use a small loop that checks whether received contains the
expected message with a timeout (raise/assert on timeout) so tests become fast
and deterministic.

In
`@frameworks/motia/motia-py/playground/steps/state_example/update_state_step.py`:
- Around line 26-27: The handler in update_state_step.py retrieves user_id via
request.path_params.get("id") but doesn’t reject empty values; add a defensive
check after obtaining user_id (the variable user_id in update_state_step.py /
the update state handler) to ensure it is non-empty (not None and not an empty
string) and return a client error (e.g., raise HTTPException or return a 400
response with a clear message) when validation fails, before proceeding to write
state.

In `@frameworks/motia/motia-py/playground/steps/todo/update_todo_step.py`:
- Around line 37-42: The response body is using the return value of
todo_stream.set (assigned to variable todo) which may be None or metadata;
instead return the computed payload updated_todo. Update the ApiResponse
construction in the function that performs the update so it uses updated_todo as
the body rather than the todo variable (leave the call to todo_stream.set(...)
for persistence but don't rely on its return value for the response).

In `@sdk/packages/python/iii/src/iii/logger.py`:
- Around line 67-79: The code converts self._trace_id/self._span_id with
int(...,16) which will raise ValueError for non-hex strings; update the block
around self._trace_id / self._span_id to validate or catch the conversion error:
attempt to parse hex (for self._trace_id and self._span_id) inside a try/except
ValueError (or use a regex/str.isalnum check), and on failure fall back to
span_ctx.trace_id/span_ctx.span_id when span_ctx.is_valid or to 0; also consider
emitting a warning/log when a provided ID is invalid so the issue is observable.

In `@sdk/packages/python/iii/src/iii/stream.py`:
- Around line 28-30: StreamContext is currently declared as Any which removes
type safety; replace it with a more specific type (e.g., a TypedDict or a
Protocol) that matches the actual context shape used by the stream code, or at
minimum document why it must remain Any. Update the StreamContext symbol to
reference the new TypedDict/Protocol name (or an appropriate Union of known
context types) and ensure any functions/methods that accept or return
StreamContext (e.g., stream handlers) use the refined type so type checkers can
validate structure.
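One possible refinement, sketched as a runtime-checkable Protocol. The attribute names below are illustrative assumptions; the real shape must be taken from what the stream code actually reads:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class StreamContext(Protocol):
    """Structural type for the context a stream handler receives."""

    stream_name: str
    group_id: str

    def emit(self, event: dict) -> None:
        ...
```

Any object carrying those members satisfies the protocol, so existing handlers need no inheritance changes while type checkers gain real structure to validate.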

In `@sdk/packages/python/iii/tests/test_context_propagation.py`:
- Around line 62-82: Wrap usage of the III client in a try/finally so
client.shutdown() is always called: for the blocks that call
client._run_on_loop(... _handle_invoke(...)) and the later block (lines 92-105),
surround the test body that uses the III instance with try: ... finally:
client.shutdown(), ensuring client.shutdown() is invoked even if assertions or
mocked calls raise; locate the III(...) instantiation and the existing
client.shutdown() calls and replace them with this try/finally pattern around
the code that uses client and its methods (_run_on_loop, _handle_invoke).

In `@sdk/packages/python/iii/tests/test_data_channels.py`:
- Around line 38-50: The test is calling internal async helpers
_async_create_channel() and _async_trigger(); replace these with the public API
equivalents (e.g., create_channel() or create_channel_async wrapper and
trigger() or trigger_async) where available, or add a short note in the test
explaining why the internals are required if no public async wrapper exists.
Update references to iii_client._async_create_channel and
iii_client._async_trigger to use iii_client.create_channel / iii_client.trigger
(or their async public wrappers) so the test exercises the supported API
surface, or add a comment documenting the necessity of the internal calls if
changing them is not feasible.

In `@sdk/packages/python/iii/tests/test_iii_registration_dedup.py`:
- Around line 81-92: Replace the two short time.sleep(0.05) calls surrounding
the register flow with slightly longer waits to improve CI stability: change
them to time.sleep(0.1) (or a configurable constant) in the test that defines
handler and calls client.register_function and client.register_trigger before
client.shutdown; alternatively, if the client exposes a ready/connected event or
awaitable (e.g., a connect/ready future), use that instead of sleeping to wait
for auto-connect completion and for registration to propagate.

In `@sdk/packages/python/iii/tests/test_register_function_args.py`:
- Around line 79-82: The short sleep in the test helper _make_client (calling
III("ws://fake", InitOptions()) then time.sleep(0.05)) can be flaky under CI;
increase the sleep duration to a more reliable value (e.g., 200–500ms) so the
III client has time to establish its connection before returning, or replace the
fixed sleep with a simple readiness loop that polls the client's ready/connected
state instead of sleeping 50ms.
- Around line 103-104: Change the async handlers to sync handlers to match the
sync-first model: replace "async def handler(data: Any) -> Any:" with "def
handler(data: Any) -> Any:" in this test (and the other two occurrences noted),
ensure the function body remains the same and any test invocation uses the sync
call pattern used elsewhere (no await). Target the "handler" definitions in
test_register_function_args.py and update all three async handler declarations
to plain def.

In `@sdk/packages/python/iii/tests/test_state.py`:
- Around line 18-28: The helper function `_poll` is not used anywhere in the
test file; delete the entire `_poll` async function definition to remove dead
code, and after removing it, also remove any now-unused imports (e.g.,
`asyncio`) that were only required for `_poll`; ensure no other references to
`_poll` remain in tests before committing.

In `@sdk/packages/python/iii/tests/test_sync_api.py`:
- Around line 16-31: Extract the FakeWebSocket test helper into a shared test
utilities module (e.g., tests/utils.py or conftest.py) and replace the duplicate
class definitions in both test_sync_api.py and test_iii_registration_dedup.py
with an import; specifically move the class FakeWebSocket (including its sent
attribute, state initialization, async send, async close, __aiter__ and
__anext__ implementations) to the shared module, update both test files to
import FakeWebSocket from that module, and remove the local duplicate
definitions so tests use the single shared implementation.
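A self-contained sketch of the shared helper, matching the members listed above (the real class in test_sync_api.py may differ in detail):

```python
import asyncio


class FakeWebSocket:
    """Shared test double: records sent frames and yields no incoming ones."""

    def __init__(self):
        self.sent: list = []
        self.state = "open"

    async def send(self, data):
        self.sent.append(data)

    async def close(self):
        self.state = "closed"

    def __aiter__(self):
        return self

    async def __anext__(self):
        # No incoming frames: end async iteration immediately.
        raise StopAsyncIteration
```

Placed in tests/utils.py (or conftest.py), both test modules import it instead of keeping duplicate definitions.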
- Around line 89-98: The test test_background_thread_stops_on_shutdown is racy
because it asserts thread.is_alive() immediately after calling III.shutdown();
change it to wait deterministically for the thread to stop by calling
client._thread.join(timeout=...) (or loop with small sleeps up to a short
deadline) after client.shutdown(), then assert not client._thread.is_alive() (or
that join returned/that the thread terminated within the timeout) so the test
tolerates scheduling delays and becomes deterministic.
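The deterministic wait can be wrapped in a tiny assertion helper:

```python
import threading


def assert_thread_stopped(thread: threading.Thread, timeout: float = 2.0) -> None:
    """Join with a bounded timeout, then assert the thread really stopped.

    Replaces the racy `assert not thread.is_alive()` issued immediately
    after shutdown(): join() absorbs scheduling delays up to the deadline.
    """
    thread.join(timeout=timeout)
    assert not thread.is_alive(), f"thread still alive after {timeout}s"
```

The test would call `client.shutdown()` and then `assert_thread_stopped(client._thread)`.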

In `@sdk/packages/python/iii/tests/test_utils.py`:
- Around line 35-50: Add a regression test mirroring the existing tests for
safe_stringify: create a new test function (e.g.,
test_safe_stringify_runtime_error_returns_unserializable) that defines a class
whose __str__ raises RuntimeError and then calls safe_stringify(instance) and
asserts the result equals "[unserializable]"; locate this alongside the existing
tests for BadRepr/BadValue in test_utils.py to ensure safe_stringify handles
non-TypeError/ValueError exceptions from __str__ without raising.

Comment on lines +121 to +130
" import signal",
" import threading",
" iii.connect()",
" stop = threading.Event()",
" signal.signal(signal.SIGINT, lambda *_: stop.set())",
" signal.signal(signal.SIGTERM, lambda *_: stop.set())",
" try:",
" stop.wait()",
" finally:",
" iii.shutdown()",

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```shell
cat -n frameworks/motia/motia-py/packages/motia/src/motia/cli.py | head -150
```

Repository: iii-hq/iii

Length of output: 6387


🏁 Script executed:

```shell
cat -n frameworks/motia/motia-py/packages/motia/src/motia/cli.py | sed -n '115,245p'
```

Repository: iii-hq/iii

Length of output: 5236


🏁 Script executed:

```shell
cat -n frameworks/motia/motia-py/packages/motia/src/motia/cli.py | sed -n '256,280p'
```

Repository: iii-hq/iii

Length of output: 1060


🏁 Script executed:

```shell
fd test_cli.py --type f
```

Repository: iii-hq/iii

Length of output: 112


🏁 Script executed:

```shell
cat frameworks/motia/motia-py/packages/motia/tests/test_cli.py
```

Repository: iii-hq/iii

Length of output: 9419


Restore signal handlers before returning from all three lifecycle blocks.

These handlers replace process-global state without reinstatement. After main() exits, subsequent in-process code inherits handlers that only flip a stale event, breaking normal Ctrl+C behavior. This affects the test suite, which invokes cli.main() repeatedly in the same process.

Fix by saving handlers with signal.getsignal() before replacement and restoring them in the finally block:

♻️ Suggested fix pattern
```diff
             stop_event = threading.Event()
+            previous_sigint = signal.getsignal(signal.SIGINT)
+            previous_sigterm = signal.getsignal(signal.SIGTERM)
             signal.signal(signal.SIGINT, lambda *_: stop_event.set())
             signal.signal(signal.SIGTERM, lambda *_: stop_event.set())
             try:
                 stop_event.wait()
             finally:
+                signal.signal(signal.SIGINT, previous_sigint)
+                signal.signal(signal.SIGTERM, previous_sigterm)
                 iii.shutdown()
```

Apply to all three locations: the auto-generated index (lines 121–130), the run command (lines 233–239), and the dev command (lines 270–276).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/src/motia/cli.py` around lines 121 -
130, The signal handlers set in the lifecycle blocks (the auto-generated index
block around the iii.connect()/iii.shutdown() sequence and the CLI commands run
and dev) overwrite process-global handlers and are not restored, breaking
subsequent invocations; modify each block to capture the previous handlers with
signal.getsignal(signal.SIGINT) and signal.getsignal(signal.SIGTERM) before
calling signal.signal(...), and then restore those saved handlers in the
corresponding finally clause (after stop.wait() and before iii.shutdown()) so
main(), the run handler, and the dev handler each reinstate the original
SIGINT/SIGTERM handlers when they return.
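The save-and-restore pattern can be packaged as a small context manager so each lifecycle block cleans up after itself. This is a sketch, not code from the PR: `stop_on_signals` is a hypothetical helper name, and the assertions only check that handler state round-trips.

```python
import signal
import threading
from contextlib import contextmanager


@contextmanager
def stop_on_signals():
    """Install SIGINT/SIGTERM handlers that set an Event, restoring
    the previous handlers on exit (hypothetical helper, not in the PR)."""
    stop = threading.Event()
    previous = {sig: signal.getsignal(sig) for sig in (signal.SIGINT, signal.SIGTERM)}
    for sig in previous:
        signal.signal(sig, lambda *_: stop.set())
    try:
        yield stop
    finally:
        # reinstall whatever was there before, even if the body raised
        for sig, handler in previous.items():
            signal.signal(sig, handler)


before = signal.getsignal(signal.SIGINT)
with stop_on_signals() as stop:
    assert signal.getsignal(signal.SIGINT) is not before  # our handler is active
assert signal.getsignal(signal.SIGINT) is before  # original handler restored
```

Because restoration lives in one `finally`, the three CLI call sites could share this helper instead of repeating the getsignal/signal dance.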

Comment on lines +42 to 44
```python
    flush_bridge_queue(bridge)
    time.sleep(0.3)
```


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

# Verify blocking sleeps inside async tests in this file.
python - <<'PY'
import ast, pathlib
path = pathlib.Path("frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py")
tree = ast.parse(path.read_text())
for fn in [n for n in ast.walk(tree) if isinstance(n, ast.AsyncFunctionDef)]:
    for node in ast.walk(fn):
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
            if isinstance(node.func.value, ast.Name) and node.func.value.id == "time" and node.func.attr == "sleep":
                print(f"{path}:{node.lineno}: async test '{fn.name}' uses time.sleep")
PY
```

Repository: iii-hq/iii

Length of output: 1278


🏁 Script executed:

```shell
#!/bin/bash
# Check file context and imports
head -20 frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py

echo "---"

# Check context around line 43
sed -n '40,46p' frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py

echo "---"

# Check if asyncio is already imported
grep -n "^import asyncio\|^from asyncio" frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py || echo "No asyncio import found"
```

Repository: iii-hq/iii

Length of output: 622


Replace blocking time.sleep() with await asyncio.sleep() in async tests.

Using time.sleep() in async test functions (lines 43, 76, 114, 147, 181, 214, 259, 329, 401) blocks the event loop and creates potential flakiness in async HTTP/bridge operations. Use await asyncio.sleep(0.3) instead.

♻️ Suggested change pattern
```diff
-import json
-import time
+import asyncio
+import json
+import time

-    time.sleep(0.3)
+    await asyncio.sleep(0.3)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py` around
lines 42 - 44, Replace blocking time.sleep() calls inside async test functions
with non-blocking await asyncio.sleep(0.3): locate each occurrence of
time.sleep(0.3) used after bridge/HTTP ops (e.g., the call following
flush_bridge_queue(bridge)) and change it to await asyncio.sleep(0.3); ensure
the test coroutine is declared async and add an import for asyncio at the top if
missing so tests remain non-blocking and won’t block the event loop.
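Why this matters can be shown with a self-contained demo (the `ticker`/`blocking_wait`/`cooperative_wait` names are illustrative, not from the test suite): a `time.sleep` inside a coroutine freezes every other task on the loop, while `await asyncio.sleep` lets them keep running.

```python
import asyncio
import time


async def ticker(counter: dict) -> None:
    # advances only while the event loop is free to schedule it
    for _ in range(20):
        counter["ticks"] += 1
        await asyncio.sleep(0.01)


async def blocking_wait(counter: dict) -> None:
    await asyncio.sleep(0)             # let ticker take its first turn
    time.sleep(0.2)                    # blocks the whole event loop
    counter["seen"] = counter["ticks"]


async def cooperative_wait(counter: dict) -> None:
    await asyncio.sleep(0.2)           # yields to the loop while waiting
    counter["seen"] = counter["ticks"]


def run(waiter) -> int:
    counter = {"ticks": 0, "seen": 0}

    async def main() -> None:
        await asyncio.gather(ticker(counter), waiter(counter))

    asyncio.run(main())
    return counter["seen"]


# time.sleep freezes the ticker for the whole wait; asyncio.sleep does not
assert run(blocking_wait) <= 2
assert run(cooperative_wait) >= 5
```

In an async test, the blocked loop means bridge/HTTP callbacks simply do not run during the sleep, which is exactly the flakiness the review flags.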

"""Test that invoking a non-existent function raises an error."""
with pytest.raises(Exception):
await bridge.trigger({"function_id": "nonexistent.function", "payload": {}, "timeout": 2.0})
bridge.trigger({"function_id": "nonexistent.function", "payload": {}, "timeout": 2.0})

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

python - <<'PY'
import ast, pathlib

types_path = pathlib.Path("sdk/packages/python/iii/src/iii/iii_types.py")
tree = ast.parse(types_path.read_text())
for node in tree.body:
    if isinstance(node, ast.ClassDef) and node.name == "TriggerRequest":
        fields = [s.target.id for s in node.body if isinstance(s, ast.AnnAssign) and isinstance(s.target, ast.Name)]
        print("TriggerRequest fields:", fields)
        break

test_path = pathlib.Path("frameworks/motia/motia-py/packages/motia/tests/test_bridge.py")
for i, line in enumerate(test_path.read_text().splitlines(), start=1):
    if '"timeout"' in line or "'timeout'" in line:
        print(f"{test_path}:{i}: {line.strip()}")
PY
```

Repository: iii-hq/iii

Length of output: 280


🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

# Search for other timeout usage in test files
echo "=== Searching for 'timeout' in Python test files ==="
rg -t py '"timeout"\s*:|'"'"'timeout'"'"'\s*:' frameworks/motia/motia-py/packages/motia/tests/ -n

echo ""
echo "=== Checking if 'timeout_ms' is used correctly elsewhere ==="
rg -t py '"timeout_ms"\s*:|'"'"'timeout_ms'"'"'\s*:' frameworks/motia/motia-py/packages/motia/ -n
```

Repository: iii-hq/iii

Length of output: 328


Use timeout_ms instead of timeout in trigger payload.

Line 80 uses "timeout": 2.0, which does not match the TriggerRequest field (timeout_ms). Update to "timeout_ms": 2000 (converting the value from seconds to milliseconds).

Suggested fix
```diff
-        bridge.trigger({"function_id": "nonexistent.function", "payload": {}, "timeout": 2.0})
+        bridge.trigger({"function_id": "nonexistent.function", "payload": {}, "timeout_ms": 2000})
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_bridge.py` at line 80,
The trigger payload uses the wrong field name and units: update the call to
bridge.trigger so it sends "timeout_ms" instead of "timeout" and convert seconds
to milliseconds (e.g., change the {"function_id": "nonexistent.function",
"payload": {}, "timeout": 2.0} payload to use "timeout_ms": 2000) so it matches
the TriggerRequest timeout_ms field expected by bridge.trigger.

Comment on lines +56 to +61
```python
def safe_stringify(value: Any) -> str:
    """Safely stringify a value, handling circular references and non-serializable types."""
    try:
        return json.dumps(value, default=str)
    except (TypeError, ValueError):
        return "[unserializable]"
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
python - <<'PY'
import json

class Boom:
    def __str__(self):
        raise RuntimeError("boom")

try:
    print(json.dumps({"obj": Boom()}, default=str))
except Exception as e:
    print(type(e).__name__)
PY
```

Repository: iii-hq/iii

Length of output: 66


Catch all exceptions to honor the "safe" contract.

The json.dumps(value, default=str) call can propagate any exception raised by a value's __str__() method. The current exception handler only catches TypeError and ValueError, leaving RuntimeError and other exceptions unhandled. This violates the function's documented guarantee to safely stringify values.

Proposed fix
```diff
 def safe_stringify(value: Any) -> str:
     """Safely stringify a value, handling circular references and non-serializable types."""
     try:
         return json.dumps(value, default=str)
-    except (TypeError, ValueError):
+    except Exception:
         return "[unserializable]"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/src/iii/utils.py` around lines 56 - 61, The
safe_stringify function's json.dumps call can raise exceptions from a value's
__str__ (or other unexpected errors) but currently only catches TypeError and
ValueError; update safe_stringify to catch all exceptions (e.g., except
Exception) so any error from json.dumps or the default=str fallback is swallowed
and the function consistently returns the fallback "[unserializable]". Keep the
existing structure and docstring and only change the exception clause in
safe_stringify to ensure the "safe" contract is honored.
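The broad-except version can be exercised end to end. This is a sketch of the proposed fix, not the SDK's shipped code; `Boom` is a throwaway class that reproduces the `__str__` failure mode from the verification script above.

```python
import json
from typing import Any


def safe_stringify(value: Any) -> str:
    """Sketch of the proposed fix: any exception raised by json.dumps or
    by the default=str fallback collapses to the sentinel string."""
    try:
        return json.dumps(value, default=str)
    except Exception:
        return "[unserializable]"


class Boom:
    def __str__(self) -> str:
        raise RuntimeError("boom")


circular: dict = {}
circular["self"] = circular  # json.dumps raises ValueError on this

assert safe_stringify({"ok": 1}) == '{"ok": 1}'
assert safe_stringify({"obj": Boom()}) == "[unserializable]"  # RuntimeError from __str__
assert safe_stringify(circular) == "[unserializable]"         # circular reference
```

The trade-off of `except Exception` is that genuine bugs in serialization are also swallowed, which is acceptable here because the function's contract is "never raise".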

Comment on lines 17 to +21
```diff
     client = III(ENGINE_WS_URL)
-    await client.connect()
-    await asyncio.sleep(0.3)
+    client._wait_until_connected()  # wait for auto-connect to complete
+    time.sleep(0.3)
     yield client
-    await client.shutdown()
+    client.shutdown()
```

⚠️ Potential issue | 🟠 Major

Ensure fixture teardown always runs on setup failure.

If setup fails before yield (e.g., at Line 18 or Line 19), client.shutdown() at Line 21 is skipped. Wrap setup/yield in try/finally so cleanup is guaranteed.

Proposed fix
```diff
 @pytest.fixture
 def iii_client():
     """Create and connect an III client, shut it down after the test."""
     client = III(ENGINE_WS_URL)
-    client._wait_until_connected()  # wait for auto-connect to complete
-    time.sleep(0.3)
-    yield client
-    client.shutdown()
+    try:
+        client._wait_until_connected()  # wait for auto-connect to complete
+        time.sleep(0.3)
+        yield client
+    finally:
+        client.shutdown()
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
-client = III(ENGINE_WS_URL)
-client._wait_until_connected()  # wait for auto-connect to complete
-time.sleep(0.3)
-yield client
-client.shutdown()
+try:
+    client = III(ENGINE_WS_URL)
+    client._wait_until_connected()  # wait for auto-connect to complete
+    time.sleep(0.3)
+    yield client
+finally:
+    client.shutdown()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/conftest.py` around lines 17 - 21, The fixture
currently creates client = III(ENGINE_WS_URL), calls
client._wait_until_connected() and time.sleep(...) then yield client and finally
client.shutdown(), but if an exception occurs before yield the shutdown is
skipped; wrap the setup and yield in a try/finally so shutdown always runs:
allocate the client as before, then enter try: perform _wait_until_connected()
and sleep, yield client; finally: if client is not None call client.shutdown();
reference symbols: III, client, _wait_until_connected, shutdown.
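The failure mode is easy to reproduce outside pytest, since generator fixtures unwind the same way `contextlib.contextmanager` does. Everything below is a toy model: `Client`, its failing `wait_until_connected`, and the two fixture shapes are illustrative, not the real conftest.

```python
from contextlib import contextmanager


class Client:
    """Toy client whose connect step always fails, mimicking setup
    raising before the fixture's yield."""
    instances: list["Client"] = []

    def __init__(self) -> None:
        self.shut_down = False
        Client.instances.append(self)

    def wait_until_connected(self) -> None:
        raise ConnectionError("engine unreachable")

    def shutdown(self) -> None:
        self.shut_down = True


@contextmanager
def fragile_fixture():
    client = Client()
    client.wait_until_connected()  # raises -> shutdown below never runs
    yield client
    client.shutdown()


@contextmanager
def robust_fixture():
    client = Client()
    try:
        client.wait_until_connected()
        yield client
    finally:
        client.shutdown()


for fixture in (fragile_fixture, robust_fixture):
    try:
        with fixture():
            pass
    except ConnectionError:
        pass

leaked, cleaned = Client.instances
assert leaked.shut_down is False   # fragile fixture skipped teardown
assert cleaned.shut_down is True   # try/finally guaranteed cleanup
```

In a real test session the leaked client would keep its WebSocket and background thread alive across the rest of the run.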


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
frameworks/motia/motia-py/packages/motia/src/motia/cli.py (1)

121-130: ⚠️ Potential issue | 🟠 Major

Restore previous SIGINT/SIGTERM handlers before exiting each lifecycle block.

Line 125/126, Line 234/235, and Line 271/272 overwrite process-global handlers and never restore them. This leaks handler state across repeated in-process invocations (notably tests).

♻️ Suggested fix pattern
```diff
- stop_event = threading.Event()
- signal.signal(signal.SIGINT, lambda *_: stop_event.set())
- signal.signal(signal.SIGTERM, lambda *_: stop_event.set())
+ stop_event = threading.Event()
+ previous_sigint = signal.getsignal(signal.SIGINT)
+ previous_sigterm = signal.getsignal(signal.SIGTERM)
+ signal.signal(signal.SIGINT, lambda *_: stop_event.set())
+ signal.signal(signal.SIGTERM, lambda *_: stop_event.set())
  try:
      stop_event.wait()
  finally:
+     signal.signal(signal.SIGINT, previous_sigint)
+     signal.signal(signal.SIGTERM, previous_sigterm)
      iii.shutdown()
```

Also applies to: 233-239, 270-276

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/src/motia/cli.py` around lines 121 -
130, The code installs new SIGINT/SIGTERM handlers with signal.signal(...)
around the lifecycle wait (see iii._wait_until_connected(), stop Event and
stop.wait()) but never restores the previous handlers, leaking global state; fix
by saving the original handlers with signal.getsignal(signal.SIGINT) and
signal.getsignal(signal.SIGTERM) before calling signal.signal, and in the
finally block (around iii.shutdown()) restore them using signal.signal(...,
previous_handler) so each lifecycle block cleans up its handlers; apply the same
pattern to the other lifecycle blocks that create stop Events and call
signal.signal.
🧹 Nitpick comments (1)
frameworks/motia/motia-py/packages/motia/tests/conftest.py (1)

47-53: Harden fixture teardown with try/finally.

If setup logic evolves and raises before yield, teardown can become fragile. Wrapping the yielded section in try/finally keeps cleanup deterministic.

Proposed refactor
```diff
 @pytest.fixture
-def bridge() -> Generator:
+def bridge() -> Generator[III, None, None]:
     bridge_instance = III(TEST_ENGINE_URL)
-    bridge_instance._wait_until_connected()
-    time.sleep(0.1)
-    yield bridge_instance
-    bridge_instance.shutdown()
+    try:
+        bridge_instance._wait_until_connected()
+        time.sleep(0.1)
+        yield bridge_instance
+    finally:
+        bridge_instance.shutdown()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py` around lines 47 -
53, The bridge fixture should ensure deterministic teardown by wrapping the
yield in a try/finally: create bridge_instance (using III(TEST_ENGINE_URL)),
call _wait_until_connected() and time.sleep(0.1) in setup, then do "try: yield
bridge_instance finally: bridge_instance.shutdown()"; also guard the finally so
shutdown is only called if bridge_instance was successfully created (e.g., check
locals() or use a conditional) to avoid NameError if setup raises before
assignment.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@frameworks/motia/motia-py/packages/motia/src/motia/cli.py`:
- Around line 123-130: The calls to iii._wait_until_connected() are currently
outside the finally that calls iii.shutdown(), so if _wait_until_connected()
raises or is interrupted cleanup is skipped; fix by moving/placing the
_wait_until_connected() (and the subsequent stop.wait() logic and signal
handlers) inside a try block and ensure iii.shutdown() is called in the
corresponding finally for each occurrence (the blocks around
iii._wait_until_connected(), stop.wait(), signal.signal(...) at the three
sites), i.e. wrap the connect/wait sequence in try: ... finally: iii.shutdown()
so shutdown always runs even on exceptions or interrupts.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py`:
- Around line 24-29: flush_bridge_queue currently schedules send coroutines via
bridge._schedule_on_loop for each item in bridge._queue but then uses a fixed
time.sleep which does not await send completion and can drop errors; change it
to collect the scheduled send futures/coroutines (from bridge._schedule_on_loop
or by creating tasks that call bridge._ws.send(json.dumps(...))) and then wait
for them to complete (e.g., await or use loop.run_until_complete/gather on those
futures) instead of time.sleep(0.05) so all sends finish and exceptions
propagate; update flush_bridge_queue to drain bridge._queue only after the
awaited send completions and rely on bridge._ws.send and
bridge._schedule_on_loop identifiers to locate the scheduling logic.
- Around line 31-43: wait_for_registration currently swallows all exceptions in
its polling loop, hiding the real failure; modify the except block to capture
the exception (e.g., except Exception as e: last_exc = e) and continue retrying,
and when timing out raise the TimeoutError including the original exception
information (either by using "raise TimeoutError(...) from last_exc" or
embedding str(last_exc) in the message); update the local variables in
wait_for_registration to initialize last_exc = None before the loop so the final
TimeoutError can reference it.

---

Duplicate comments:
In `@frameworks/motia/motia-py/packages/motia/src/motia/cli.py`:
- Around line 121-130: The code installs new SIGINT/SIGTERM handlers with
signal.signal(...) around the lifecycle wait (see iii._wait_until_connected(),
stop Event and stop.wait()) but never restores the previous handlers, leaking
global state; fix by saving the original handlers with
signal.getsignal(signal.SIGINT) and signal.getsignal(signal.SIGTERM) before
calling signal.signal, and in the finally block (around iii.shutdown()) restore
them using signal.signal(..., previous_handler) so each lifecycle block cleans
up its handlers; apply the same pattern to the other lifecycle blocks that
create stop Events and call signal.signal.

---

Nitpick comments:
In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py`:
- Around line 47-53: The bridge fixture should ensure deterministic teardown by
wrapping the yield in a try/finally: create bridge_instance (using
III(TEST_ENGINE_URL)), call _wait_until_connected() and time.sleep(0.1) in
setup, then do "try: yield bridge_instance finally: bridge_instance.shutdown()";
also guard the finally so shutdown is only called if bridge_instance was
successfully created (e.g., check locals() or use a conditional) to avoid
NameError if setup raises before assignment.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2c6f5734-7b7a-42d1-94af-4455b6afef68

📥 Commits

Reviewing files that changed from the base of the PR and between efbc1d7 and dcd2d05.

📒 Files selected for processing (2)
  • frameworks/motia/motia-py/packages/motia/src/motia/cli.py
  • frameworks/motia/motia-py/packages/motia/tests/conftest.py

Comment on lines +24 to 29
```diff
 def flush_bridge_queue(bridge) -> None:
     """Flush the bridge queue."""
     while bridge._queue and bridge._ws:
-        await bridge._ws.send(json.dumps(bridge._queue.pop(0)))
+        bridge._schedule_on_loop(bridge._ws.send(json.dumps(bridge._queue.pop(0))))
+    time.sleep(0.05)
```


⚠️ Potential issue | 🟠 Major

flush_bridge_queue does not actually wait for send completion.

Line 27 schedules sends, but Line 28 uses a fixed sleep instead of waiting on send completion. That makes flush timing-dependent and can silently lose send errors.

Proposed fix
```diff
 def flush_bridge_queue(bridge) -> None:
     """Flush the bridge queue."""
+    pending = []
     while bridge._queue and bridge._ws:
-        bridge._schedule_on_loop(bridge._ws.send(json.dumps(bridge._queue.pop(0))))
-    time.sleep(0.05)
+        payload = json.dumps(bridge._queue.pop(0))
+        fut = bridge._schedule_on_loop(bridge._ws.send(payload))
+        if fut is not None:
+            pending.append(fut)
+    for fut in pending:
+        fut.result(timeout=1.0)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py` around lines 24 -
29, flush_bridge_queue currently schedules send coroutines via
bridge._schedule_on_loop for each item in bridge._queue but then uses a fixed
time.sleep which does not await send completion and can drop errors; change it
to collect the scheduled send futures/coroutines (from bridge._schedule_on_loop
or by creating tasks that call bridge._ws.send(json.dumps(...))) and then wait
for them to complete (e.g., await or use loop.run_until_complete/gather on those
futures) instead of time.sleep(0.05) so all sends finish and exceptions
propagate; update flush_bridge_queue to drain bridge._queue only after the
awaited send completions and rely on bridge._ws.send and
bridge._schedule_on_loop identifiers to locate the scheduling logic.
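The future-based flush works because `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future` whose `result()` both blocks until the send finishes and re-raises any send error. A self-contained sketch (the `FakeBridge` class and its `_send` recorder are hypothetical stand-ins for the motia bridge internals):

```python
import asyncio
import json
import threading


class FakeBridge:
    """Hypothetical stand-in for the motia bridge: a queue of outgoing
    messages, an event loop on a background thread, a recording send()."""

    def __init__(self) -> None:
        self._queue = [{"n": i} for i in range(3)]
        self.sent: list[str] = []
        self._loop = asyncio.new_event_loop()
        threading.Thread(target=self._loop.run_forever, daemon=True).start()

    async def _send(self, payload: str) -> None:
        self.sent.append(payload)

    def _schedule_on_loop(self, coro):
        # returns a concurrent.futures.Future tied to the background loop
        return asyncio.run_coroutine_threadsafe(coro, self._loop)


def flush_bridge_queue(bridge: FakeBridge) -> None:
    """Drain the queue, then block until every scheduled send completed."""
    pending = []
    while bridge._queue:
        payload = json.dumps(bridge._queue.pop(0))
        pending.append(bridge._schedule_on_loop(bridge._send(payload)))
    for fut in pending:
        fut.result(timeout=1.0)  # propagates send errors, no fixed sleep


bridge = FakeBridge()
flush_bridge_queue(bridge)
assert bridge.sent == ['{"n": 0}', '{"n": 1}', '{"n": 2}']
bridge._loop.call_soon_threadsafe(bridge._loop.stop)
```

Because the futures are scheduled in order on a single loop, send order is preserved, and a failed send surfaces in the test instead of being lost behind `time.sleep(0.05)`.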

Comment on lines +31 to 43
```diff
 def wait_for_registration(bridge, function_id: str, timeout: float = 5.0) -> None:
     deadline = time.monotonic() + timeout
-    poll_interval = 0.1
     while time.monotonic() < deadline:
         try:
-            result = await bridge.trigger({"function_id": "engine::functions::list", "payload": {}})
+            result = bridge.trigger({"function_id": "engine::functions::list", "payload": {}})
             functions = result.get("functions", []) if isinstance(result, dict) else []
-            ids = [f.get("function_id") for f in functions if isinstance(f, dict) and f.get("function_id")]
+            ids = [f.get("function_id") for f in functions if isinstance(f, dict)]
             if function_id in ids:
                 return
         except Exception:
             pass
-        await asyncio.sleep(poll_interval)
+        time.sleep(0.1)
     raise TimeoutError(f"Function {function_id} was not registered within {timeout}s")
```

⚠️ Potential issue | 🟡 Minor

wait_for_registration masks the real failure cause.

Line 40 catches all exceptions and drops them, so a timeout at Line 43 hides what actually failed (transport error, RPC error, etc.).

Proposed fix
```diff
 def wait_for_registration(bridge, function_id: str, timeout: float = 5.0) -> None:
     deadline = time.monotonic() + timeout
+    last_error: Exception | None = None
     while time.monotonic() < deadline:
         try:
             result = bridge.trigger({"function_id": "engine::functions::list", "payload": {}})
             functions = result.get("functions", []) if isinstance(result, dict) else []
             ids = [f.get("function_id") for f in functions if isinstance(f, dict)]
             if function_id in ids:
                 return
-        except Exception:
-            pass
+        except Exception as exc:
+            last_error = exc
         time.sleep(0.1)
-    raise TimeoutError(f"Function {function_id} was not registered within {timeout}s")
+    msg = f"Function {function_id} was not registered within {timeout}s"
+    if last_error is not None:
+        raise TimeoutError(msg) from last_error
+    raise TimeoutError(msg)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py` around lines 31 -
43, wait_for_registration currently swallows all exceptions in its polling loop,
hiding the real failure; modify the except block to capture the exception (e.g.,
except Exception as e: last_exc = e) and continue retrying, and when timing out
raise the TimeoutError including the original exception information (either by
using "raise TimeoutError(...) from last_exc" or embedding str(last_exc) in the
message); update the local variables in wait_for_registration to initialize
last_exc = None before the loop so the final TimeoutError can reference it.
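The retry-with-chained-cause shape generalizes to any polling helper. A runnable sketch (the `poll_until` name and its parameters are illustrative, mirroring the suggested fix rather than quoting the conftest):

```python
import time


def poll_until(check, timeout: float = 2.0, interval: float = 0.01) -> None:
    """Retry check() until it returns truthy; on timeout, chain the last
    captured error so the real failure is not masked."""
    deadline = time.monotonic() + timeout
    last_exc = None
    while time.monotonic() < deadline:
        try:
            if check():
                return
            last_exc = None  # a clean "not yet" should not report a stale error
        except Exception as exc:
            last_exc = exc
        time.sleep(interval)
    message = f"condition not met within {timeout}s"
    if last_exc is not None:
        raise TimeoutError(message) from last_exc
    raise TimeoutError(message)


attempts = {"n": 0}


def flaky() -> bool:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("not yet")
    return True


poll_until(flaky)
assert attempts["n"] == 3


def always_fails() -> bool:
    raise ValueError("boom")


try:
    poll_until(always_fails, timeout=0.05)
    raise AssertionError("expected TimeoutError")
except TimeoutError as err:
    assert isinstance(err.__cause__, ValueError)  # original cause preserved
```

`raise ... from last_exc` keeps the transport or RPC error in the traceback, so a CI timeout tells you *why* registration never appeared.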


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
frameworks/motia/motia-py/packages/motia/tests/test_cli.py (1)

170-171: Optional: extract repeated SDK/event mock setup into a helper fixture.

The same setup is duplicated in two tests; extracting it will reduce drift when lifecycle methods change again.

♻️ Example refactor
```diff
+def _mock_iii_and_stop_event(monkeypatch: pytest.MonkeyPatch):
+    iii = SimpleNamespace(_wait_until_connected=MagicMock(), shutdown=MagicMock())
+    stop_event = SimpleNamespace(wait=MagicMock(), set=MagicMock())
+    monkeypatch.setattr("motia.iii.get_instance", lambda: iii)
+    monkeypatch.setattr(cli.threading, "Event", lambda: stop_event)
+    return iii, stop_event

-    monkeypatch.setattr("motia.iii.get_instance", lambda: SimpleNamespace(_wait_until_connected=MagicMock(), shutdown=MagicMock()))
-    monkeypatch.setattr(cli.threading, "Event", lambda: SimpleNamespace(wait=MagicMock(), set=MagicMock()))
+    _mock_iii_and_stop_event(monkeypatch)
```

Also applies to: 216-218

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_cli.py` around lines 170
- 171, Extract the duplicated SDK and Event monkeypatch setup in
tests/test_cli.py into a reusable pytest fixture (e.g., sdk_and_event_mock) that
performs the two monkeypatch.setattr calls currently using
motia.iii.get_instance and cli.threading.Event; then replace the duplicated
lines at the current locations (and lines 216-218) by adding that fixture as a
test parameter. Ensure the fixture creates the SimpleNamespace with
_wait_until_connected and shutdown MagicMocks and the Event with wait and set
MagicMocks so existing tests keep the same behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@frameworks/motia/motia-py/packages/motia/tests/test_cli.py`:
- Around line 170-171: Extract the duplicated SDK and Event monkeypatch setup in
tests/test_cli.py into a reusable pytest fixture (e.g., sdk_and_event_mock) that
performs the two monkeypatch.setattr calls currently using
motia.iii.get_instance and cli.threading.Event; then replace the duplicated
lines at the current locations (and lines 216-218) by adding that fixture as a
test parameter. Ensure the fixture creates the SimpleNamespace with
_wait_until_connected and shutdown MagicMocks and the Event with wait and set
MagicMocks so existing tests keep the same behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9e49d591-cc5d-41f6-a9ad-da83230ec857

📥 Commits

Reviewing files that changed from the base of the PR and between dcd2d05 and 5b3c0da.

📒 Files selected for processing (1)
  • frameworks/motia/motia-py/packages/motia/tests/test_cli.py

ytallo added 5 commits March 14, 2026 13:32
- Python SDK: register_worker() now auto-connects (connect() removed from public API)
- Replaced III class usage with register_worker() as single entry point
- Added missing type exports: ReconnectionConfig, TelemetryOptions, OtelConfig,
  TriggerActionEnqueue, TriggerActionVoid, FunctionInfo
- Logger: function_name → service_name constructor parameter
- New worker metrics: cpu.user_micros, cpu.system_micros, memory.vms,
  memory.heap_used, memory.heap_total, memory.external, uptime_seconds
- Added safe_stringify utility and StreamContext type
- Updated motia framework to match new SDK patterns
- All tests updated (134 pass)
The III.connect() method was renamed to _wait_until_connected() as part
of the SDK refactor. Update all motia framework usages (tests/conftest.py
and cli.py) to use the new internal method name.
The SimpleNamespace mocks in test_cli.py used the removed connect()
method. Update to _wait_until_connected() to match the SDK refactor.
…ration

- Updated Makefile and CI workflow to remove the --cov-fail-under=70 option from pytest commands.
- Adjusted pyproject.toml to eliminate the coverage fail-under threshold for pytest.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
engine/examples/python/echo_invoker.py (1)

15-21: ⚠️ Potential issue | 🔴 Critical

Critical: This example file is broken and will not run.

Multiple issues prevent execution:

  1. Missing import: argparse is used on line 16 but not imported.
  2. Undefined identifiers: bridge, log, and main are referenced throughout but never defined. Line 36 references BRIDGE_URL, but only DEFAULT_BRIDGE_URL is defined.
  3. Broken function structure: The parse_args() function (lines 15–50) improperly contains all connection logic, trigger registration, and the event loop. This code should be in a separate main() or async main() function. The function does not return the parsed arguments.
  4. Missing main function: Line 54 calls main(), which is never defined.

The file requires significant structural refactoring before it can run.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/examples/python/echo_invoker.py` around lines 15 - 21, The file is
structurally broken: move CLI parsing into a proper parse_args() that only
builds and returns argparse.Namespace (fix missing import by adding "import
argparse"), create a separate main() (or async main()) to perform connection
setup and event loop logic currently inside parse_args(), replace undefined
BRIDGE_URL uses with DEFAULT_BRIDGE_URL or args.bridge_url, and define the
missing globals/objects (instantiate a bridge client stored in bridge, and a
logger stored in log) before registering triggers; ensure the script calls
main() (await if async) from the if __name__ == "__main__" block so the module
runs.
♻️ Duplicate comments (11)
sdk/README.md (1)

95-101: ⚠️ Potential issue | 🟡 Minor

API table is still partially migrated for Python.

Only the trigger row was updated. Python rows for initialize, register function, and invoke still use legacy signatures, and the init() paragraph remains Python-inaccurate. Please align Line 95, Line 96, Line 98, and Line 101 with register_worker(...), dict-based register_function(...), and sync trigger(...).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/README.md` around lines 95 - 101, Update the Python API entries in the
table and the paragraph to use the new, accurate Python signatures: change
initialization to reference register_worker(...) as the Python way to
create/connect the SDK (instead of init()), update the Python register function
row to show the dict-based register_function({...}) signature, and change the
Python invoke rows to the synchronous trigger(...) form (and the fire-and-forget
variant using TriggerAction.Void() in the same style). Also update the paragraph
about init() to describe register_worker(...) creating an SDK instance and
auto-connecting with WebSocket/reconnect/OpenTelemetry behavior so the prose
matches the new Python API names (referencing register_worker,
register_function, and trigger).
frameworks/motia/motia-py/playground/steps/api_steps/sse_step.py (1)

53-53: ⚠️ Potential issue | 🟠 Major

Replace blocking sleep in async handler.

Line 53 uses time.sleep(...) inside async def handler, which blocks the event loop and can stall concurrent SSE streams.

💡 Proposed fix
+import asyncio
 import json
 import math
 import random
 import time
@@
-        time.sleep(0.3 + random.random() * 0.7)
+        await asyncio.sleep(0.3 + random.random() * 0.7)

Use this read-only check to verify there are no time.sleep calls inside async functions:

#!/bin/bash
python - <<'PY'
import ast
from pathlib import Path

for path in Path(".").rglob("*.py"):
    try:
        tree = ast.parse(path.read_text())
    except Exception:
        continue
    for fn in [n for n in ast.walk(tree) if isinstance(n, ast.AsyncFunctionDef)]:
        for call in [c for c in ast.walk(fn) if isinstance(c, ast.Call)]:
            f = call.func
            if isinstance(f, ast.Attribute) and isinstance(f.value, ast.Name):
                if f.value.id == "time" and f.attr == "sleep":
                    print(f"{path}: Line {call.lineno}: blocking time.sleep inside async function '{fn.name}'")
PY
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/playground/steps/api_steps/sse_step.py` at line 53,
The async function handler contains a blocking call time.sleep(...) which stalls
the event loop; change that call in async def handler to use await
asyncio.sleep(0.3 + random.random() * 0.7) and add an import asyncio if not
present, or remove the specific time import only if it's unused elsewhere;
ensure no other time.sleep usages remain inside async functions (keep
random.random as-is).
sdk/packages/python/iii/src/iii/utils.py (1)

56-61: ⚠️ Potential issue | 🟠 Major

Broaden exception handling in safe_stringify (Line 60).

json.dumps(..., default=str) can propagate exceptions beyond TypeError/ValueError (e.g., from __str__). Current handling can still crash callers.

Proposed fix
 def safe_stringify(value: Any) -> str:
     """Safely stringify a value, handling circular references and non-serializable types."""
     try:
         return json.dumps(value, default=str)
-    except (TypeError, ValueError):
+    except Exception:
         return "[unserializable]"
#!/bin/bash
python - <<'PY'
import json

class Boom:
    def __str__(self):
        raise RuntimeError("boom")

def current_safe_stringify(value):
    try:
        return json.dumps(value, default=str)
    except (TypeError, ValueError):
        return "[unserializable]"

def fixed_safe_stringify(value):
    try:
        return json.dumps(value, default=str)
    except Exception:
        return "[unserializable]"

for fn in (current_safe_stringify, fixed_safe_stringify):
    try:
        print(fn.__name__, "=>", fn({"obj": Boom()}))
    except Exception as e:
        print(fn.__name__, "raised", type(e).__name__)
PY

Expected: current_safe_stringify raises RuntimeError; fixed_safe_stringify returns "[unserializable]".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/src/iii/utils.py` around lines 56 - 61,
safe_stringify currently only catches TypeError and ValueError which misses
exceptions raised from __str__ or other unexpected errors during json.dumps;
update the exception handler in safe_stringify to catch a broader Exception
(e.g., replace "except (TypeError, ValueError):" with "except Exception:") so
any runtime error during serialization (including from __str__) returns
"[unserializable]" instead of propagating.
sdk/packages/python/iii/src/iii/telemetry.py (1)

471-475: ⚠️ Potential issue | 🟠 Major

Return a SpanContext-like sentinel from the noop span.

At Line 475, _NoopSpan.get_span_context() returns None, which can crash callers that check span.get_span_context().is_valid.

Proposed fix
         class _NoopSpan:
+            class _NoopSpanContext:
+                is_valid = False
+                trace_id = 0
+                span_id = 0
+
             def set_attribute(self, *a: Any, **kw: Any) -> None: ...
             def set_attributes(self, *a: Any, **kw: Any) -> None: ...
             def add_event(self, *a: Any, **kw: Any) -> None: ...
             def set_status(self, *a: Any, **kw: Any) -> None: ...
             def record_exception(self, *a: Any, **kw: Any) -> None: ...
             def end(self) -> None: ...
             def is_recording(self) -> bool:
                 return False

             def get_span_context(self) -> Any:
-                return None
+                return self._NoopSpanContext()
#!/bin/bash
# Verify code paths that dereference span context validity.
rg -nP --type=py '\bget_span_context\(\)\.is_valid\b|\bspan_ctx\.is_valid\b'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/src/iii/telemetry.py` around lines 471 - 475, The
noop span's get_span_context currently returns None which breaks callers
expecting a SpanContext-like object; update _NoopSpan.get_span_context to return
a sentinel SpanContext object (e.g., create an internal _NoopSpanContext with
is_valid property returning False and any minimal attributes callers expect,
such as trace_id/span_id or methods) instead of None, or return the library's
INVALID/empty span context if available; modify the _NoopSpan class to
instantiate/return this sentinel so callers can safely call
span.get_span_context().is_valid without crashing.
sdk/packages/python/iii/tests/test_sync_api.py (1)

66-75: ⚠️ Potential issue | 🟠 Major

Call register_worker() in the test that claims to cover it.

At Lines 66-73 this bypasses the API under test: III(...) plus _wait_until_connected() would still pass even if register_worker() stopped blocking until ready. Use the public helper here so the test actually guards the supported contract.

Proposed fix
 def test_register_worker_connects(monkeypatch: pytest.MonkeyPatch) -> None:
     """register_worker() should return an already-connected client."""
     _patch_ws(monkeypatch)
-    client = III("ws://fake", InitOptions())
-    time.sleep(0.05)
-
-    client._wait_until_connected()
+    from iii import register_worker
+
+    client = register_worker("ws://fake", InitOptions())
     assert client.get_connection_state() == "connected"
 
     client.shutdown()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_sync_api.py` around lines 66 - 75, The
test currently constructs III("ws://fake", InitOptions()) and uses
client._wait_until_connected() which bypasses the public API; replace that with
a call to the public helper register_worker() so the test actually exercises the
contract: call client.register_worker(...) (or the module-level register_worker
helper if that’s the supported entry) and wait/assert its returned/observed
connected state instead of calling the private _wait_until_connected(), then
assert client.get_connection_state() == "connected" and call client.shutdown();
update references in test_register_worker_connects to use register_worker() and
remove the direct _wait_until_connected() usage.
sdk/packages/python/iii/tests/test_context_propagation.py (1)

62-81: ⚠️ Potential issue | 🟡 Minor

Always shut the client down in a finally block.

In both tests, client.shutdown() only runs after the assertions. If _handle_invoke() or any assertion fails, the background thread and OTel state survive into later tests.

Also applies to: 92-105
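
A minimal shape for the pattern (FakeClient is a stand-in for the real III client; the test body here is a simulated failure, not the suite's actual assertions):

```python
class FakeClient:
    """Stand-in for the real client; records whether shutdown() ran."""

    def __init__(self) -> None:
        self.shutdown_called = False

    def shutdown(self) -> None:
        self.shutdown_called = True


def run_checks(client) -> None:
    try:
        # registration, _handle_invoke(), and assertions go here
        raise AssertionError("simulated test failure")
    finally:
        client.shutdown()  # runs even when the body raises


client = FakeClient()
try:
    run_checks(client)
except AssertionError:
    pass
assert client.shutdown_called  # cleanup happened despite the failure
```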

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_context_propagation.py` around lines 62 -
81, Wrap the client usage and assertions in a try/finally so client.shutdown()
always runs even if _handle_invoke or assertions fail: create the III client
(III(...)), register_function(...), call
client._run_on_loop(client._handle_invoke(...)) inside a try block and move
client.shutdown() into the finally block; do the same for the other test block
that calls client._handle_invoke (the duplicate around lines 92-105) to ensure
background thread and OTel state are always cleaned up.
sdk/packages/python/iii/tests/conftest.py (1)

4-4: ⚠️ Potential issue | 🟠 Major

Build the shared client fixture via register_worker(), and guarantee teardown.

This fixture still instantiates III directly, calls _wait_until_connected(), and relies on a fixed sleep. That bypasses the blocking register_worker() contract and still skips shutdown() if setup fails before yield.

Suggested fixture rewrite
 import os
-import time
 
 import pytest
 
-from iii.iii import III
+from iii import register_worker
@@
 `@pytest.fixture`
 def iii_client():
     """Create and connect an III client, shut it down after the test."""
-    client = III(ENGINE_WS_URL)
-    client._wait_until_connected()  # wait for auto-connect to complete
-    time.sleep(0.3)
-    yield client
-    client.shutdown()
+    client = register_worker(ENGINE_WS_URL)
+    try:
+        yield client
+    finally:
+        client.shutdown()

Also applies to: 8-8, 14-21

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/conftest.py` at line 4, Replace the direct
instantiation of III and the manual _wait_until_connected()/time.sleep usage in
the shared client fixture with a call to register_worker() to obtain the shared
client, and remove the fixed sleep; ensure the fixture yields the registered
client and uses a try/finally so shutdown() is always called in teardown even if
setup fails. Specifically, stop calling III(...) and _wait_until_connected() in
the fixture and instead call register_worker() to build the client, yield it,
and in the finally block call client.shutdown() (or the appropriate shutdown
method) to guarantee cleanup.
frameworks/motia/motia-py/packages/motia/tests/conftest.py (2)

31-43: ⚠️ Potential issue | 🟡 Minor

Preserve the last polling error when registration times out.

except Exception: pass hides transport and RPC failures, so the final timeout loses the real reason registration never appeared.

♻️ Suggested fix
 def wait_for_registration(bridge, function_id: str, timeout: float = 5.0) -> None:
     deadline = time.monotonic() + timeout
+    last_error: Exception | None = None
     while time.monotonic() < deadline:
         try:
             result = bridge.trigger({"function_id": "engine::functions::list", "payload": {}})
             functions = result.get("functions", []) if isinstance(result, dict) else []
             ids = [f.get("function_id") for f in functions if isinstance(f, dict)]
             if function_id in ids:
                 return
-        except Exception:
-            pass
+        except Exception as exc:
+            last_error = exc
         time.sleep(0.1)
-    raise TimeoutError(f"Function {function_id} was not registered within {timeout}s")
+    msg = f"Function {function_id} was not registered within {timeout}s"
+    if last_error is not None:
+        raise TimeoutError(msg) from last_error
+    raise TimeoutError(msg)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py` around lines 31 -
43, The polling function wait_for_registration currently swallows all exceptions
in its loop; update it to capture the last exception raised by bridge.trigger
(e.g., set last_exc = e in the except Exception as e block) and, when timing
out, raise the TimeoutError with the original exception as the cause (use "raise
TimeoutError(...) from last_exc" or include last_exc in the message) so
transport/RPC failures from bridge.trigger are preserved for debugging.

24-28: ⚠️ Potential issue | 🟠 Major

Wait for each scheduled send before dropping it from bridge._queue.

This helper currently removes messages and sleeps, but never waits for the scheduled websocket send to complete. A slow or failed send can race the tests and erase the original failure.

♻️ Suggested fix
 def flush_bridge_queue(bridge) -> None:
     """Flush the bridge queue."""
     while bridge._queue and bridge._ws:
-        bridge._schedule_on_loop(bridge._ws.send(json.dumps(bridge._queue.pop(0))))
-    time.sleep(0.05)
+        payload = json.dumps(bridge._queue[0])
+        fut = bridge._schedule_on_loop(bridge._ws.send(payload))
+        if fut is not None:
+            fut.result(timeout=1.0)
+        bridge._queue.pop(0)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py` around lines 24 -
28, flush_bridge_queue currently pops messages and schedules sends without
waiting for their completion; change it to schedule each send via
bridge._schedule_on_loop(bridge._ws.send(...)) but capture the returned
Future/Task and wait for it to complete before removing the message from
bridge._queue (or only pop after the awaited send succeeds). If
_schedule_on_loop does not return an awaitable, update it to return the scheduled
Task/Future so flush_bridge_queue can await it (using the bridge's event loop or
asyncio utilities) and handle/send errors/timeouts to avoid racing and losing
failures.
frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py (1)

42-43: ⚠️ Potential issue | 🟠 Major

Replace blocking sleeps in these async tests.

These time.sleep(0.3) calls block the event loop and can make the HTTP/bridge assertions flaky. Switch them to await asyncio.sleep(0.3) and add import asyncio at the top.

♻️ Suggested change pattern
-    time.sleep(0.3)
+    await asyncio.sleep(0.3)

Also applies to: 75-76, 113-114, 146-147, 180-181, 213-214, 258-259, 328-329, 400-401

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py` around
lines 42 - 43, The tests are using blocking time.sleep(...) which blocks the
event loop; add "import asyncio" at the top of the test module and replace every
time.sleep(0.3) (e.g., the occurrence after flush_bridge_queue(bridge) and the
other listed occurrences) with "await asyncio.sleep(0.3)" inside the async test
functions (ensure those tests are declared async def so awaits are valid).
sdk/packages/python/iii/README.md (1)

18-20: ⚠️ Potential issue | 🟠 Major

Update the README to the new worker lifecycle.

These snippets still tell users to import III and call iii.connect(), but this PR removes both from the public entrypoint. Copy-pasting this README will fail immediately after merge.

📝 Suggested doc update
-from iii import III
+from iii import register_worker

-iii = III("ws://localhost:49134")
+iii = register_worker("ws://localhost:49134")
@@
-iii.connect()
-
 result = iii.trigger({"function_id": "greet", "payload": {"name": "world"}})
 print(result)  # {"message": "Hello, world!"}
@@
-| Initialize               | `III(url, options?)`                              | Create an SDK instance                                 |
-| Connect                  | `iii.connect()`                                   | Connect to the engine                                  |
+| Initialize               | `register_worker(url, options?)`                  | Create and connect an SDK instance                     |
@@
-| `iii`           | Core SDK (`III`, types)           |
+| `iii`           | Core SDK (`register_worker`, `IIIClient`, types) |

Also applies to: 33-44, 80-80

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/README.md` around lines 18 - 20, Update the README
code examples to stop importing and calling III and iii.connect(); remove any
references to the III class and the iii.connect() call and replace the snippets
with the new worker lifecycle API introduced in this PR (use the new public
entrypoint exported by the package and the new start/creation methods for worker
lifecycle instead of III/iii.connect()). Ensure every example that previously
used the symbols "III" or "iii.connect()" (including the other occurrences
noted) is updated to the new public API and lifecycle calls so copy-pasted
examples work after the change.
🧹 Nitpick comments (13)
frameworks/motia/motia-py/playground/steps/multi_trigger/dual_trigger_step.py (1)

21-21: Nitpick: input shadows the built-in.

The parameter name input shadows Python's built-in input() function. Consider renaming to data or event_data for clarity.

Suggested rename
-    def _event_handler(input: Any) -> None:
-        logger.info("Dual trigger fired (queue)", {"data": input, "topic": ctx.trigger.topic})
+    def _event_handler(data: Any) -> None:
+        logger.info("Dual trigger fired (queue)", {"data": data, "topic": ctx.trigger.topic})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@frameworks/motia/motia-py/playground/steps/multi_trigger/dual_trigger_step.py`
at line 21, The parameter name in function _event_handler shadows the built-in
input(); rename the parameter (e.g., to event_data or data) in the def
_event_handler signature and update every usage inside _event_handler (and any
direct callers) to the new name, keeping the type hint (Any) intact so behavior
doesn't change.
sdk/packages/python/iii/tests/test_worker_metrics.py (1)

99-99: Avoid duplicating gauge-count magic numbers in assertions.

These expectations are valid now, but hard-coding 9 and 18 in multiple places will cause avoidable churn if gauges are added/removed again. Consider deriving counts once per test flow.

♻️ Proposed test-hardening diff
 def test_register_worker_gauges_observes_metrics_once() -> None:
     meter = Mock()
-    gauges = [object() for _ in range(9)]
+    expected_gauge_count = 9
+    gauges = [object() for _ in range(expected_gauge_count)]
     meter.create_observable_gauge.side_effect = gauges
@@
-        assert meter.create_observable_gauge.call_count == 9
-        assert result.observe.call_count == 9
+        assert meter.create_observable_gauge.call_count == expected_gauge_count
+        assert result.observe.call_count == expected_gauge_count
@@
 def test_stop_worker_gauges_resets_registration_state() -> None:
@@
     gauges_module.register_worker_gauges(meter, "worker-2")
+    first_registration_count = meter.create_observable_gauge.call_count
     gauges_module.stop_worker_gauges()
     gauges_module.register_worker_gauges(meter, "worker-3")
 
-    assert meter.create_observable_gauge.call_count == 18
+    assert meter.create_observable_gauge.call_count == first_registration_count * 2

Also applies to: 123-124, 143-143

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_worker_metrics.py` at line 99, The test
currently hard-codes the gauge counts (e.g., creating gauges = [object() for _
in range(9)] and asserting 9 and 18 later); replace those magic numbers by
deriving counts from the gauges list (use len(gauges) and compute any expected
totals like len(gauges) * 2) and update the assertions that reference 9 and 18
to use these derived values so future changes to the gauges array don’t require
multiple assertion edits (update the occurrences tied to the gauges variable and
its related assertions).
sdk/packages/python/iii/tests/test_streams.py (1)

34-50: Consider removing @pytest.mark.asyncio for pure sync tests.

This test is marked async but contains no await statements. While it runs correctly, consider converting to a sync test for clarity. The only tests that genuinely need async are those using await asyncio.sleep() (e.g., test_stream_custom_operations).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_streams.py` around lines 34 - 50, The test
function test_stream_set_new_item is decorated with `@pytest.mark.asyncio` but
contains no awaits; remove the asyncio marker and make it a normal synchronous
test (drop `@pytest.mark.asyncio` and keep def
test_stream_set_new_item(iii_client: III):), since iii_client.trigger is used
synchronously in this test and only tests that use await (e.g.,
test_stream_custom_operations) should remain async.
sdk/packages/python/iii/tests/test_http_external_functions_integration.py (2)

180-181: Potentially redundant sleep after _wait_until_connected().

If _wait_until_connected() already blocks until connected, the additional time.sleep(0.1) may be unnecessary. Consider removing it or documenting why an extra delay is needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_http_external_functions_integration.py`
around lines 180 - 181, The extra time.sleep(0.1) after calling
client._wait_until_connected() appears redundant; remove the time.sleep(0.1)
line from the test (or if there is a known race that requires the pause, replace
it with a brief comment explaining why and/or wait on a deterministic condition
instead), leaving only client._wait_until_connected() to ensure the test doesn't
rely on arbitrary sleeps.

165-169: Short fixed sleep may cause test flakiness.

time.sleep(0.05) is brief and may not be sufficient on slower CI runners or under load. Consider using a retry/polling pattern or increasing the delay for more reliable tests.
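
One possible polling helper (names are illustrative, not part of the SDK):

```python
import time
from typing import Callable


def wait_for(predicate: Callable[[], bool], timeout: float = 5.0,
             interval: float = 0.05) -> bool:
    """Poll `predicate` until it returns truthy or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False
```

A test would then assert `wait_for(lambda: client.get_connection_state() == "connected")` instead of sleeping a fixed interval.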

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_http_external_functions_integration.py`
around lines 165 - 169, The brief fixed sleep in _make_connected_client may
cause flakiness; replace time.sleep(0.05) with a retry/polling wait that checks
the client's connection state (e.g., a loop that calls a connection predicate on
the III instance such as a is_connected()/connected property or underlying
websocket state) until success or a reasonable timeout (e.g., several seconds),
sleeping a small interval between attempts and raising/asserting on timeout;
update _make_connected_client to use this polling pattern so tests wait
deterministically for III (created with III("ws://fake", InitOptions())) to be
fully connected after the FakeWs monkeypatch.
frameworks/motia/motia-py/packages/motia/src/motia/state.py (1)

122-136: Items without "id" key are silently skipped in clear().

The conditional if isinstance(item, dict) and "id" in item means items lacking an "id" key won't be deleted. If this is intentional (e.g., only specific item types should be cleared), consider adding a debug log for skipped items to aid troubleshooting.
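
A toy sketch of the logging variant (InMemoryState is a stand-in; the real method also wraps the loop in a tracing span, and returning the skipped count is only for illustration):

```python
import logging

log = logging.getLogger(__name__)


class InMemoryState:
    """Stand-in for the state module; method names mirror the review."""

    def __init__(self) -> None:
        self._items: dict[str, list[dict]] = {}

    def list(self, scope: str) -> list[dict]:
        return list(self._items.get(scope, []))

    def delete(self, scope: str, item_id: str) -> None:
        self._items[scope] = [i for i in self._items[scope]
                              if i.get("id") != item_id]

    def clear(self, scope: str) -> int:
        skipped = 0
        for item in self.list(scope):
            if isinstance(item, dict) and "id" in item:
                self.delete(scope, item["id"])
            else:
                # make silently-retained entries visible for troubleshooting
                log.debug("clear(%s): skipping item without 'id': %r",
                          scope, item)
                skipped += 1
        return skipped
```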

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/src/motia/state.py` around lines 122
- 136, The clear method currently skips items that are not dicts or lack an "id"
without any visibility; update the clear(self, scope: str) implementation to (a)
explicitly log or record skipped items so callers can troubleshoot which entries
were ignored (use the same tracing/span or a logger), and (b) keep the existing
behavior of calling self.delete(scope, item["id"]) for dicts with "id"; refer to
the clear, list, and delete methods and the
operation_span/set_span_ok/record_exception span usage so the log/trace is
emitted inside the same span context rather than silently skipping entries.
sdk/packages/python/iii/src/iii/logger.py (1)

67-79: Hex string parsing may raise ValueError on invalid input.

If trace_id or span_id contains a non-hexadecimal string, int(..., 16) will raise a ValueError. Consider validating or catching exceptions if callers might pass malformed IDs.

🛡️ Optional defensive handling
-            if self._trace_id is not None:
-                trace_id = int(self._trace_id, 16)
+            if self._trace_id is not None:
+                try:
+                    trace_id = int(self._trace_id, 16)
+                except ValueError:
+                    trace_id = 0
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/src/iii/logger.py` around lines 67 - 79,
self._trace_id and self._span_id are parsed as hex with int(..., 16) and can
raise ValueError for malformed input; wrap those conversions in try/except
(around the blocks referencing self._trace_id / self._span_id and span_ctx) to
catch ValueError, log or warn about invalid hex IDs, and fall back to using
span_ctx.trace_id/span_ctx.span_id or 0 as the existing else path does; ensure
you only catch ValueError (not broad exceptions) and keep span_ctx.is_valid
behavior intact.
frameworks/motia/motia-py/packages/motia/tests/test_setup_step_endpoint.py (1)

18-23: Make the fake enforce the migrated registration shape.

Allowing both str and {"id": ...} here means this suite will still pass if setup_step_endpoint() keeps using the deprecated function-registration form. Tightening the fake to dict-only inputs will make these tests protect the migration instead of masking it.

♻️ Suggested tightening for the test double
-    def register_function(self, func: dict[str, object] | str, handler: object) -> None:
-        func_id = func["id"] if isinstance(func, dict) else func
-        self.functions[func_id] = handler
+    def register_function(self, func: dict[str, object], handler: object) -> None:
+        func_id = func.get("id")
+        assert isinstance(func_id, str), "register_function expects {'id': <str>}"
+        self.functions[func_id] = handler
 
     def register_trigger(self, trigger: dict[str, object]) -> None:
+        assert isinstance(trigger.get("type"), str)
+        assert isinstance(trigger.get("function_id"), str)
+        assert isinstance(trigger.get("config"), dict)
         self.triggers.append(trigger)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_setup_step_endpoint.py`
around lines 18 - 23, The test double's register_function currently accepts both
str and dict which masks deprecated registration; change register_function to
require a dict only by updating the signature to accept dict[str, object]
(remove | str), remove the isinstance check, and always extract func_id =
func["id"]; if you want explicit safety, raise a TypeError or assert when the
incoming value is not a dict to fail tests that still use the old string form so
the suite protects the migrated setup_step_endpoint() behavior.
sdk/packages/python/iii/tests/test_init_api.py (1)

52-52: Replace fixed sleep with deterministic synchronization.

Line 52 can make this test flaky under slow CI scheduling. Prefer an event-based wait tied to fake_init_otel execution.

Proposed diff
-import time
+import threading
@@
 def test_connect_consumes_otel_from_init_options(monkeypatch) -> None:
@@
     captured = {"config": None}
+    otel_called = threading.Event()
@@
     def fake_init_otel(config=None, loop=None):
         captured["config"] = config
+        otel_called.set()
@@
-    time.sleep(0.05)
+    assert otel_called.wait(timeout=1.0), "init_otel was not invoked"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_init_api.py` at line 52, Replace the
brittle time.sleep(0.05) in test_init_api.py with an event-based wait tied to
fake_init_otel: modify fake_init_otel to signal a threading.Event (or
asyncio.Event) when it finishes, then in the test wait on that event with a
reasonable timeout (e.g., event.wait(timeout=...)) instead of sleeping;
reference the existing time.sleep call and the fake_init_otel helper to locate
where to set and wait on the event so the test deterministically synchronizes
with fake_init_otel's completion.
frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py (1)

101-103: Keep these waits non-blocking inside async tests.

time.sleep() at Lines 101-103, 115, 152-153, and 160 stalls the pytest.mark.asyncio loop. If the delay is still needed, switch back to await asyncio.sleep(...); better yet, poll for registration/span readiness instead of hard-coded sleeps.

Also applies to: 115-115, 152-153, 160-160

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py`
around lines 101 - 103, The test uses blocking time.sleep calls inside
pytest.mark.asyncio tests (see flush_bridge_queue usage in
test_tracing_integration.py), which stalls the event loop; replace each
time.sleep(...) with await asyncio.sleep(...) or, preferably, implement a short
polling loop that awaits and checks for the expected condition (e.g.,
registration/span readiness) before proceeding so the async loop stays
responsive; update occurrences around flush_bridge_queue and the other noted
lines (115, 152-153, 160) to use the non-blocking await-based wait or a
readiness poll.
sdk/packages/python/iii/tests/test_api_triggers.py (1)

39-39: Use non-blocking waits in these async integration tests.

time.sleep() at these registration boundaries blocks the event loop for every pytest.mark.asyncio case. If the delay is still required, use await asyncio.sleep(...) or replace it with a readiness poll so slow CI workers do not turn this into flaky timing.

Also applies to: 74-74, 112-112, 153-153, 185-185, 224-224, 275-275, 338-338, 413-413, 483-483
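
An async readiness-poll helper along those lines (illustrative only; the predicate would check the registration/event condition each test waits on):

```python
import asyncio
from typing import Callable


async def wait_for_condition(predicate: Callable[[], bool],
                             timeout: float = 5.0,
                             interval: float = 0.02) -> None:
    """Await until `predicate()` is truthy; raise TimeoutError on expiry."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not predicate():
        if loop.time() >= deadline:
            raise TimeoutError("condition not met within timeout")
        await asyncio.sleep(interval)  # yields instead of blocking the loop
```

A test would `await wait_for_condition(lambda: func_id in registered)` rather than sleeping a hard-coded 0.3 s.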

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_api_triggers.py` at line 39, Replace
blocking time.sleep calls in the async tests with non-blocking waits: import
asyncio in sdk/packages/python/iii/tests/test_api_triggers.py and change each
time.sleep(…) used inside pytest.mark.asyncio tests (occurrences around lines
39, 74, 112, 153, 185, 224, 275, 338, 413, 483) to await asyncio.sleep(…) or,
preferably, replace the fixed delays with a readiness-poll helper (e.g., an
async wait_for_condition coroutine) that loops with await
asyncio.sleep(small_interval) until the expected registration/event condition is
true; ensure all calling test functions are async so the await works.
sdk/packages/python/iii-example/src/hooks.py (1)

3-4: Use the public FunctionInfo export in this example.

This example now reaches into iii.iii_types, which teaches consumers to depend on an internal module even though FunctionInfo is part of the supported root API in this PR.

♻️ Suggested cleanup
-from iii import ApiRequest, ApiResponse, IIIClient, Logger
-from iii.iii_types import FunctionInfo
+from iii import ApiRequest, ApiResponse, FunctionInfo, IIIClient, Logger
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii-example/src/hooks.py` around lines 3 - 4, The example
imports FunctionInfo from the internal module iii.iii_types; update the import
to use the public export by importing FunctionInfo from the package root
alongside ApiRequest, ApiResponse, IIIClient, and Logger (i.e., replace the
iii.iii_types import with FunctionInfo from iii) so the example relies only on
the supported public API.
frameworks/motia/motia-py/packages/motia/tests/test_event_queue_compat.py (1)

12-19: Remove unused bridge.call mock.

Line 18 sets up bridge.call = MagicMock(), but the tests now use bridge.trigger instead. This mock is dead code after the async-to-sync migration.

Suggested fix
 `@pytest.fixture`
 def mock_bridge() -> MagicMock:
-    """Create a bridge mock with call support."""
+    """Create a bridge mock with trigger support."""
     bridge = MagicMock()
     bridge.register_function = MagicMock()
     bridge.register_trigger = MagicMock()
-    bridge.call = MagicMock()
+    bridge.trigger = MagicMock()
     return bridge
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_event_queue_compat.py`
around lines 12 - 19, In the mock_bridge fixture remove the unused bridge.call =
MagicMock() assignment because tests now use bridge.trigger; update the mock
setup in the mock_bridge function (the MagicMock instance named bridge and its
attributes register_function, register_trigger, and trigger) by deleting the
bridge.call line so only the relevant mocks remain.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@frameworks/motia/motia-py/packages/motia/src/motia/runtime.py`:
- Around line 57-68: The code uses inspect.iscoroutine() in multiple places
(e.g., inside make_next()/next_handler flow where composed_handler is invoked)
which misses Tasks/Futures/custom awaitables; add a shared helper async def
_await_if_needed(value: Any) -> Any that uses inspect.isawaitable() and await
when true, then replace each pattern like result = fn(...); if
inspect.iscoroutine(result): result = await result with result = await
_await_if_needed(fn(...)) — update all occurrences (including the
composed_handler/make_next call sites and the other listed locations) to call
_await_if_needed instead of inspect.iscoroutine().

In `@frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py`:
- Around line 177-180: The try/except around bridge.trigger({"function_id":
"engine.traces.list", ...}) is too broad and hides real failures; change it to
only catch the specific RPC-not-supported error returned by the engine (inspect
the exception type or message that indicates the RPC method is unsupported) and
call pytest.skip in that branch, but re-raise any other exceptions (connection
errors, timeouts, serialization errors) so actual failures surface; locate the
call to bridge.trigger and update the except block to check the exception (e.g.,
error type or "unsupported" message) before skipping.

In
`@frameworks/motia/motia-py/playground/steps/otel_example/create_order_step.py`:
- Around line 49-50: The current code calls order_stream.set(order_id,
new_order) before enqueue(...), causing a partial-write if enqueue fails; change
this to an atomic pattern: either 1) implement an outbox by persisting the event
alongside the order in the same transactional write (e.g., write order and an
outbox record together and have a background worker publish outbox entries), 2)
wrap the persistence + enqueue in a transaction/rollback so enqueue failure
prevents committing the order, or 3) introduce an idempotency guard by
generating order_id before handling and checking/locking for existing order_id
so retries won’t create duplicates; update the logic around order_stream.set,
enqueue, order_id and new_order accordingly.

In `@sdk/packages/python/iii/tests/test_queue_integration.py`:
- Around line 107-113: chain_a_handler is currently synchronous but calls the
blocking iii_client.trigger() inside a handler running on the event loop; change
the handler to async (async def chain_a_handler(input_data)) and replace the
sync call with an awaited async call to iii_client._async_trigger({...})
(preserving the same payload and TriggerAction.Enqueue(queue="test-chain")) and
then return input_data; ensure you await iii_client._async_trigger(...) so the
call runs non-blocking on the event loop.

In `@sdk/packages/python/iii/tests/test_streams.py`:
- Line 4: Remove the unused import statement "import builtins" from the test
file; locate the top of sdk/packages/python/iii/tests/test_streams.py where the
import appears and delete that line (ensure no other references to builtins
remain in functions or tests in this file such as any occurrences in test_*
functions).

---

Outside diff comments:
In `@engine/examples/python/echo_invoker.py`:
- Around line 15-21: The file is structurally broken: move CLI parsing into a
proper parse_args() that only builds and returns argparse.Namespace (fix missing
import by adding "import argparse"), create a separate main() (or async main())
to perform connection setup and event loop logic currently inside parse_args(),
replace undefined BRIDGE_URL uses with DEFAULT_BRIDGE_URL or args.bridge_url,
and define the missing globals/objects (instantiate a bridge client stored in
bridge, and a logger stored in log) before registering triggers; ensure the
script calls main() (await if async) from the if __name__ == "__main__" block so
the module runs.

---

Duplicate comments:
In `@frameworks/motia/motia-py/packages/motia/tests/conftest.py`:
- Around line 31-43: The polling function wait_for_registration currently
swallows all exceptions in its loop; update it to capture the last exception
raised by bridge.trigger (e.g., set last_exc = e in the except Exception as e
block) and, when timing out, raise the TimeoutError with the original exception
as the cause (use "raise TimeoutError(...) from last_exc" or include last_exc in
the message) so transport/RPC failures from bridge.trigger are preserved for
debugging.
- Around line 24-28: flush_bridge_queue currently pops messages and schedules
sends without waiting for their completion; change it to schedule each send via
bridge._schedule_on_loop(bridge._ws.send(...)) but capture the returned
Future/Task and wait for it to complete before removing the message from
bridge._queue (or only pop after the awaited send succeeds). If
_schedule_on_loop does not return an awaitable, update it to return the scheduled
Task/Future so flush_bridge_queue can await it (using the bridge's event loop or
asyncio utilities) and surface send errors/timeouts to avoid racing and losing
failures.
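The exception-preserving poll described in the first point could look like this; `bridge.trigger` and the empty payload are stand-ins for whatever the real fixture sends:

```python
import time


def wait_for_registration(bridge, function_id: str, timeout: float = 5.0):
    """Poll bridge.trigger until the function answers, keeping the last error."""
    deadline = time.monotonic() + timeout
    last_exc = None
    while time.monotonic() < deadline:
        try:
            return bridge.trigger({"function_id": function_id, "payload": {}})
        except Exception as exc:  # remember why the most recent attempt failed
            last_exc = exc
        time.sleep(0.02)
    # chain the original failure so transport/RPC errors stay visible
    raise TimeoutError(f"{function_id} never became ready") from last_exc
```

Raising `from last_exc` means a timeout in CI shows the underlying connection or RPC error in the traceback instead of a bare TimeoutError.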

In `@frameworks/motia/motia-py/packages/motia/tests/test_api_triggers.py`:
- Around line 42-43: The tests are using blocking time.sleep(...) which blocks
the event loop; add "import asyncio" at the top of the test module and replace
every time.sleep(0.3) (e.g., the occurrence after flush_bridge_queue(bridge) and
the other listed occurrences) with "await asyncio.sleep(0.3)" inside the async
test functions (ensure those tests are declared async def so awaits are valid).

In `@frameworks/motia/motia-py/playground/steps/api_steps/sse_step.py`:
- Line 53: The async function handler contains a blocking call time.sleep(...)
which stalls the event loop; change that call in async def handler to use await
asyncio.sleep(0.3 + random.random() * 0.7) and add an import asyncio if not
present, or remove the specific time import only if it's unused elsewhere;
ensure no other time.sleep usages remain inside async functions (keep
random.random as-is).

In `@sdk/packages/python/iii/README.md`:
- Around line 18-20: Update the README code examples to stop importing and
calling III and iii.connect(); remove any references to the III class and the
iii.connect() call and replace the snippets with the new worker lifecycle API
introduced in this PR (use the new public entrypoint exported by the package and
the new start/creation methods for worker lifecycle instead of
III/iii.connect()). Ensure every example that previously used the symbols "III"
or "iii.connect()" (including the other occurrences noted) is updated to the new
public API and lifecycle calls so copy-pasted examples work after the change.

In `@sdk/packages/python/iii/src/iii/telemetry.py`:
- Around line 471-475: The noop span's get_span_context currently returns None
which breaks callers expecting a SpanContext-like object; update
_NoopSpan.get_span_context to return a sentinel SpanContext object (e.g., create
an internal _NoopSpanContext with is_valid property returning False and any
minimal attributes callers expect, such as trace_id/span_id or methods) instead
of None, or return the library's INVALID/empty span context if available; modify
the _NoopSpan class to instantiate/return this sentinel so callers can safely
call span.get_span_context().is_valid without crashing.

In `@sdk/packages/python/iii/src/iii/utils.py`:
- Around line 56-61: safe_stringify currently only catches TypeError and
ValueError which misses exceptions raised from __str__ or other unexpected
errors during json.dumps; update the exception handler in safe_stringify to
catch a broader Exception (e.g., replace "except (TypeError, ValueError):" with
"except Exception:") so any runtime error during serialization (including from
__str__) returns "[unserializable]" instead of propagating.
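One way the broadened handler could look; the fallback string matches the one the comment quotes, while the intermediate `str()` fallback is an assumption about the utility's current shape:

```python
import json


def safe_stringify(value: object) -> str:
    """Serialize `value` to JSON, falling back to str(), never raising."""
    try:
        return json.dumps(value)
    except Exception:
        pass
    try:
        return str(value)
    except Exception:
        # Even __str__ can raise on hostile objects; swallow everything.
        return "[unserializable]"
```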

In `@sdk/packages/python/iii/tests/conftest.py`:
- Line 4: Replace the direct instantiation of III and the manual
_wait_until_connected()/time.sleep usage in the shared client fixture with a
call to register_worker() to obtain the shared client, and remove the fixed
sleep; ensure the fixture yields the registered client and uses a try/finally so
shutdown() is always called in teardown even if setup fails. Specifically, stop
calling III(...) and _wait_until_connected() in the fixture and instead call
register_worker() to build the client, yield it, and in the finally block call
client.shutdown() (or the appropriate shutdown method) to guarantee cleanup.

In `@sdk/packages/python/iii/tests/test_context_propagation.py`:
- Around line 62-81: Wrap the client usage and assertions in a try/finally so
client.shutdown() always runs even if _handle_invoke or assertions fail: create
the III client (III(...)), register_function(...), call
client._run_on_loop(client._handle_invoke(...)) inside a try block and move
client.shutdown() into the finally block; do the same for the other test block
that calls client._handle_invoke (the duplicate around lines 92-105) to ensure
background thread and OTel state are always cleaned up.

In `@sdk/packages/python/iii/tests/test_sync_api.py`:
- Around line 66-75: The test currently constructs III("ws://fake",
InitOptions()) and uses client._wait_until_connected() which bypasses the public
API; replace that with a call to the public helper register_worker() so the test
actually exercises the contract: call client.register_worker(...) (or the
module-level register_worker helper if that’s the supported entry) and
wait/assert its returned/observed connected state instead of calling the private
_wait_until_connected(), then assert client.get_connection_state() ==
"connected" and call client.shutdown(); update references in
test_register_worker_connects to use register_worker() and remove the direct
_wait_until_connected() usage.

In `@sdk/README.md`:
- Around line 95-101: Update the Python API entries in the table and the
paragraph to use the new, accurate Python signatures: change initialization to
reference register_worker(...) as the Python way to create/connect the SDK
(instead of init()), update the Python register function row to show the
dict-based register_function({...}) signature, and change the Python invoke rows
to the synchronous trigger(...) form (and the fire-and-forget variant using
TriggerAction.Void() in the same style). Also update the paragraph about init()
to describe register_worker(...) creating an SDK instance and auto-connecting
with WebSocket/reconnect/OpenTelemetry behavior so the prose matches the new
Python API names (referencing register_worker, register_function, and trigger).

---

Nitpick comments:
In `@frameworks/motia/motia-py/packages/motia/src/motia/state.py`:
- Around line 122-136: The clear method currently skips items that are not dicts
or lack an "id" without any visibility; update the clear(self, scope: str)
implementation to (a) explicitly log or record skipped items so callers can
troubleshoot which entries were ignored (use the same tracing/span or a logger),
and (b) keep the existing behavior of calling self.delete(scope, item["id"]) for
dicts with "id"; refer to the clear, list, and delete methods and the
operation_span/set_span_ok/record_exception span usage so the log/trace is
emitted inside the same span context rather than silently skipping entries.

In `@frameworks/motia/motia-py/packages/motia/tests/test_event_queue_compat.py`:
- Around line 12-19: In the mock_bridge fixture remove the unused bridge.call =
MagicMock() assignment because tests now use bridge.trigger; update the mock
setup in the mock_bridge function (the MagicMock instance named bridge and its
attributes register_function, register_trigger, and trigger) by deleting the
bridge.call line so only the relevant mocks remain.

In `@frameworks/motia/motia-py/packages/motia/tests/test_setup_step_endpoint.py`:
- Around line 18-23: The test double's register_function currently accepts both
str and dict which masks deprecated registration; change register_function to
require a dict only by updating the signature to accept dict[str, object]
(remove | str), remove the isinstance check, and always extract func_id =
func["id"]; if you want explicit safety, raise a TypeError or assert when the
incoming value is not a dict to fail tests that still use the old string form so
the suite protects the migrated setup_step_endpoint() behavior.
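The dict-only test double could be sketched like this; `FakeBridge` and its `registered` attribute are hypothetical names for whatever the test file actually uses:

```python
class FakeBridge:
    """Test double that only accepts the new dict-based descriptor."""

    def __init__(self) -> None:
        self.registered: dict[str, dict] = {}

    def register_function(self, func: dict) -> None:
        if not isinstance(func, dict):
            # fail loudly so tests still using the old string form break
            raise TypeError(
                f"register_function expects a dict descriptor, got {type(func).__name__}"
            )
        self.registered[func["id"]] = func
```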

In `@frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py`:
- Around line 101-103: The test uses blocking time.sleep calls inside
pytest.mark.asyncio tests (see flush_bridge_queue usage in
test_tracing_integration.py), which stalls the event loop; replace each
time.sleep(...) with await asyncio.sleep(...) or, preferably, implement a short
polling loop that awaits and checks for the expected condition (e.g.,
registration/span readiness) before proceeding so the async loop stays
responsive; update occurrences around flush_bridge_queue and the other noted
lines (115, 152-153, 160) to use the non-blocking await-based wait or a
readiness poll.

In
`@frameworks/motia/motia-py/playground/steps/multi_trigger/dual_trigger_step.py`:
- Line 21: The parameter name in function _event_handler shadows the built-in
input(); rename the parameter (e.g., to event_data or data) in the def
_event_handler signature and update every usage inside _event_handler (and any
direct callers) to the new name, keeping the type hint (Any) intact so behavior
doesn't change.

In `@sdk/packages/python/iii-example/src/hooks.py`:
- Around line 3-4: The example imports FunctionInfo from the internal module
iii.iii_types; update the import to use the public export by importing
FunctionInfo from the package root alongside ApiRequest, ApiResponse, IIIClient,
and Logger (i.e., replace the iii.iii_types import with FunctionInfo from iii)
so the example relies only on the supported public API.

In `@sdk/packages/python/iii/src/iii/logger.py`:
- Around line 67-79: self._trace_id and self._span_id are parsed as hex with
int(..., 16) and can raise ValueError for malformed input; wrap those
conversions in try/except (around the blocks referencing self._trace_id /
self._span_id and span_ctx) to catch ValueError, log or warn about invalid hex
IDs, and fall back to using span_ctx.trace_id/span_ctx.span_id or 0 as the
existing else path does; ensure you only catch ValueError (not broad exceptions)
and keep span_ctx.is_valid behavior intact.

In `@sdk/packages/python/iii/tests/test_api_triggers.py`:
- Line 39: Replace blocking time.sleep calls in the async tests with
non-blocking waits: import asyncio in
sdk/packages/python/iii/tests/test_api_triggers.py and change each time.sleep(…)
used inside pytest.mark.asyncio tests (occurrences around lines 39, 74, 112,
153, 185, 224, 275, 338, 413, 483) to await asyncio.sleep(…) or, preferably,
replace the fixed delays with a readiness-poll helper (e.g., an async
wait_for_condition coroutine) that loops with await
asyncio.sleep(small_interval) until the expected registration/event condition is
true; ensure all calling test functions are async so the await works.

In `@sdk/packages/python/iii/tests/test_http_external_functions_integration.py`:
- Around line 180-181: The extra time.sleep(0.1) after calling
client._wait_until_connected() appears redundant; remove the time.sleep(0.1)
line from the test (or if there is a known race that requires the pause, replace
it with a brief comment explaining why and/or wait on a deterministic condition
instead), leaving only client._wait_until_connected() to ensure the test doesn't
rely on arbitrary sleeps.
- Around line 165-169: The brief fixed sleep in _make_connected_client may cause
flakiness; replace time.sleep(0.05) with a retry/polling wait that checks the
client's connection state (e.g., a loop that calls a connection predicate on the
III instance such as a is_connected()/connected property or underlying websocket
state) until success or a reasonable timeout (e.g., several seconds), sleeping a
small interval between attempts and raising/asserting on timeout; update
_make_connected_client to use this polling pattern so tests wait
deterministically for III (created with III("ws://fake", InitOptions())) to be
fully connected after the FakeWs monkeypatch.

In `@sdk/packages/python/iii/tests/test_init_api.py`:
- Line 52: Replace the brittle time.sleep(0.05) in test_init_api.py with an
event-based wait tied to fake_init_otel: modify fake_init_otel to signal a
threading.Event (or asyncio.Event) when it finishes, then in the test wait on
that event with a reasonable timeout (e.g., event.wait(timeout=...)) instead of
sleeping; reference the existing time.sleep call and the fake_init_otel helper
to locate where to set and wait on the event so the test deterministically
synchronizes with fake_init_otel's completion.
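The event-based synchronization could look like this; `fake_init_otel` is the helper named in the comment, and its body here is only a placeholder for whatever the fake currently does:

```python
import threading

otel_ready = threading.Event()


def fake_init_otel(*args, **kwargs):
    # ... the existing fake's setup work would run here ...
    otel_ready.set()  # signal completion so the test can wait deterministically


def wait_for_fake_otel(timeout: float = 2.0) -> None:
    # replaces time.sleep(0.05): fails loudly if init never completes
    assert otel_ready.wait(timeout=timeout), "fake_init_otel never completed"
```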

In `@sdk/packages/python/iii/tests/test_streams.py`:
- Around line 34-50: The test function test_stream_set_new_item is decorated
with `@pytest.mark.asyncio` but contains no awaits; remove the asyncio marker and
make it a normal synchronous test (drop `@pytest.mark.asyncio` and keep def
test_stream_set_new_item(iii_client: III):), since iii_client.trigger is used
synchronously in this test and only tests that use await (e.g.,
test_stream_custom_operations) should remain async.

In `@sdk/packages/python/iii/tests/test_worker_metrics.py`:
- Line 99: The test currently hard-codes the gauge counts (e.g., creating gauges
= [object() for _ in range(9)] and asserting 9 and 18 later); replace those
magic numbers by deriving counts from the gauges list (use len(gauges) and
compute any expected totals like len(gauges) * 2) and update the assertions that
reference 9 and 18 to use these derived values so future changes to the gauges
array don’t require multiple assertion edits (update the occurrences tied to the
gauges variable and its related assertions).

Comment on lines +57 to +68
        result = m(req, ctx, p)
        if inspect.iscoroutine(result):
            return await result
        return result

    return next_handler

composed_handler = make_next()
-return await composed_handler()
+result = composed_handler()
+if inspect.iscoroutine(result):
+    return await result
+return result

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
python - <<'PY'
import asyncio
import inspect

async def coro():
    return 1

class CustomAwaitable:
    def __await__(self):
        return coro().__await__()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

task = loop.create_task(coro())
future = loop.create_future()

for name, value in [("task", task), ("future", future), ("custom", CustomAwaitable())]:
    print(name, "iscoroutine=", inspect.iscoroutine(value), "isawaitable=", inspect.isawaitable(value))

task.cancel()
loop.close()
PY

Repository: iii-hq/iii

Length of output: 369


🏁 Script executed:

# First, let's find and examine the runtime.py file
find . -name "runtime.py" -path "*/motia/*" | head -5

Repository: iii-hq/iii

Length of output: 117


🏁 Script executed:

# Check the specific file mentioned
head -100 "frameworks/motia/motia-py/packages/motia/src/motia/runtime.py"

Repository: iii-hq/iii

Length of output: 3051


🏁 Script executed:

# Search for inspect.iscoroutine usage in the file
rg "inspect\.iscoroutine" "frameworks/motia/motia-py/packages/motia/src/motia/runtime.py" -n

Repository: iii-hq/iii

Length of output: 699


🏁 Script executed:

# Check for import of inspect
rg "^import inspect|^from inspect" "frameworks/motia/motia-py/packages/motia/src/motia/runtime.py" -n

Repository: iii-hq/iii

Length of output: 70


🏁 Script executed:

# Verify the pattern around lines 57-68
sed -n '50,75p' "frameworks/motia/motia-py/packages/motia/src/motia/runtime.py"

Repository: iii-hq/iii

Length of output: 865


🏁 Script executed:

# Check a few more locations to confirm the pattern is identical
sed -n '310,325p' "frameworks/motia/motia-py/packages/motia/src/motia/runtime.py"

Repository: iii-hq/iii

Length of output: 910


🏁 Script executed:

# Check another location
sed -n '545,560p' "frameworks/motia/motia-py/packages/motia/src/motia/runtime.py"

Repository: iii-hq/iii

Length of output: 656


Replace inspect.iscoroutine() with a shared helper using inspect.isawaitable().

The current code uses inspect.iscoroutine() at 12 locations throughout this file, but this only catches coroutine objects. asyncio.Task, asyncio.Future, and custom __await__ implementations will bypass the awaiting logic and leak into runtime response handling. All instances (lines 58, 66, 316, 320, 387, 426, 463, 497, 551, 586, 609, 631) should use inspect.isawaitable() instead.

Extract a helper function:

Suggested normalization helper
async def _await_if_needed(value: Any) -> Any:
    if inspect.isawaitable(value):
        return await value
    return value

Then replace all instances like:

result = handler(...)
if inspect.iscoroutine(result):
    result = await result

With:

result = await _await_if_needed(handler(...))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/src/motia/runtime.py` around lines
57 - 68, The code uses inspect.iscoroutine() in multiple places (e.g., inside
make_next()/next_handler flow where composed_handler is invoked) which misses
Tasks/Futures/custom awaitables; add a shared helper async def
_await_if_needed(value: Any) -> Any that uses inspect.isawaitable() and await
when true, then replace each pattern like result = fn(...); if
inspect.iscoroutine(result): result = await result with result = await
_await_if_needed(fn(...)) — update all occurrences (including the
composed_handler/make_next call sites and the other listed locations) to call
_await_if_needed instead of inspect.iscoroutine().

Comment on lines 177 to 180
 try:
-    result = await bridge.trigger({"function_id": "engine.traces.list", "payload": {"limit": 10}})
+    result = bridge.trigger({"function_id": "engine.traces.list", "payload": {"limit": 10}})
 except Exception:
     pytest.skip("Engine does not support 'engine.traces.list' RPC; " "skipping traceparent verification")

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "test_tracing_integration.py" -type f

Repository: iii-hq/iii

Length of output: 130


🏁 Script executed:

head -200 ./frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py | tail -50

Repository: iii-hq/iii

Length of output: 1997


🏁 Script executed:

sed -n '170,190p' ./frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py

Repository: iii-hq/iii

Length of output: 758


🏁 Script executed:

grep -n "def bridge" ./frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

find ./frameworks/motia/motia-py -name "conftest.py" -type f

Repository: iii-hq/iii

Length of output: 114


🏁 Script executed:

grep -r "class.*bridge" ./frameworks/motia/motia-py/packages/motia --include="*.py" -i | head -20

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

cat ./frameworks/motia/motia-py/packages/motia/tests/conftest.py

Repository: iii-hq/iii

Length of output: 1808


🏁 Script executed:

grep -r "def trigger" ./frameworks/motia/motia-py --include="*.py" -B 2 -A 5

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

find ./frameworks -name "iii.py" -type f

Repository: iii-hq/iii

Length of output: 113


🏁 Script executed:

grep -r "class III" ./frameworks --include="*.py" -A 20

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

wc -l ./frameworks/motia/motia-py/packages/motia/src/motia/iii.py

Repository: iii-hq/iii

Length of output: 116


🏁 Script executed:

grep -n "def trigger" ./frameworks/motia/motia-py/packages/motia/src/motia/iii.py

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

cat ./frameworks/motia/motia-py/packages/motia/src/motia/iii.py

Repository: iii-hq/iii

Length of output: 2171


🏁 Script executed:

find ./frameworks -path "*iii/iii.py" -type f

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

find ./frameworks -type d -name "iii"

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

find . -name "iii.py" -type f | grep -v "__pycache__"

Repository: iii-hq/iii

Length of output: 154


🏁 Script executed:

rg "class III" --type py | head -20

Repository: iii-hq/iii

Length of output: 171


🏁 Script executed:

rg "def trigger" --type py -A 10 | head -50

Repository: iii-hq/iii

Length of output: 1894


🏁 Script executed:

wc -l ./sdk/packages/python/iii/src/iii/iii.py

Repository: iii-hq/iii

Length of output: 99


🏁 Script executed:

grep -n "def trigger" ./sdk/packages/python/iii/src/iii/iii.py

Repository: iii-hq/iii

Length of output: 131


🏁 Script executed:

sed -n '200,350p' ./sdk/packages/python/iii/src/iii/iii.py

Repository: iii-hq/iii

Length of output: 6114


🏁 Script executed:

sed -n '764,850p' ./sdk/packages/python/iii/src/iii/iii.py

Repository: iii-hq/iii

Length of output: 3444


🏁 Script executed:

grep -n "_handle_result" ./sdk/packages/python/iii/src/iii/iii.py -A 15

Repository: iii-hq/iii

Length of output: 1339


🏁 Script executed:

rg "RPC" --type py ./sdk/packages/python/iii/src/iii/

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

rg "not supported" --type py ./sdk/packages/python/iii/src/iii/

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

rg "not supported\|not.supported\|unsupported" --type py ./frameworks/motia/motia-py/packages/motia/tests/ -i

Repository: iii-hq/iii

Length of output: 36


🏁 Script executed:

rg "RPC" ./frameworks/motia/motia-py/packages/motia/ --type py -C 3

Repository: iii-hq/iii

Length of output: 1651


🏁 Script executed:

grep -r "function_id.*engine" ./frameworks/motia/motia-py/packages/motia/tests/ --include="*.py"

Repository: iii-hq/iii

Length of output: 383


Narrow the skip to the specific "RPC not supported" error case.

The broad except Exception at lines 177-180 catches all errors including connection failures, timeouts, and serialization issues. Only skip when the engine explicitly indicates the RPC method is unsupported; re-raise all other exceptions to surface real failures.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/packages/motia/tests/test_tracing_integration.py`
around lines 177 - 180, The try/except around bridge.trigger({"function_id":
"engine.traces.list", ...}) is too broad and hides real failures; change it to
only catch the specific RPC-not-supported error returned by the engine (inspect
the exception type or message that indicates the RPC method is unsupported) and
call pytest.skip in that branch, but re-raise any other exceptions (connection
errors, timeouts, serialization errors) so actual failures surface; locate the
call to bridge.trigger and update the except block to check the exception (e.g.,
error type or "unsupported" message) before skipping.
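The narrowed skip could be structured like this; the exact wording of the engine's unsupported-RPC error is an assumption, and `skip` would be `pytest.skip` in the real test:

```python
def list_traces(bridge, skip):
    """Trigger the traces RPC; invoke `skip` only for the unsupported-RPC case."""
    try:
        return bridge.trigger(
            {"function_id": "engine.traces.list", "payload": {"limit": 10}}
        )
    except Exception as exc:
        message = str(exc).lower()
        if "unsupported" in message or "not supported" in message:
            # only this branch skips; pass pytest.skip as `skip`
            return skip("Engine does not support 'engine.traces.list' RPC")
        raise  # connection errors, timeouts, serialization issues must surface
```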

Comment on lines +49 to +50
order_stream.set("pending", order_id, new_order)
enqueue({"topic": "order.created", "data": new_order})

⚠️ Potential issue | 🟠 Major

The synchronous enqueue makes this a partial-write path.

order_stream.set() commits the order before enqueue() runs. If enqueue() raises, the request fails even though the order is already persisted, and a client retry creates a second order because the ID is generated inside the handler. This needs rollback, an outbox, or an idempotency guard around the pair.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frameworks/motia/motia-py/playground/steps/otel_example/create_order_step.py`
around lines 49 - 50, The current code calls order_stream.set(order_id,
new_order) before enqueue(...), causing a partial-write if enqueue fails; change
this to an atomic pattern: either 1) implement an outbox by persisting the event
alongside the order in the same transactional write (e.g., write order and an
outbox record together and have a background worker publish outbox entries), 2)
wrap the persistence + enqueue in a transaction/rollback so enqueue failure
prevents committing the order, or 3) introduce an idempotency guard by
generating order_id before handling and checking/locking for existing order_id
so retries won’t create duplicates; update the logic around order_stream.set,
enqueue, order_id and new_order accordingly.
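The idempotency-guard variant (option 3, combined with an outbox record) could be sketched like this; the `outbox` collection, `idempotency_key` field, and the `get` method on the stream are illustrative, and the sketch assumes the stream write and outbox append can be made atomic by the underlying store:

```python
import uuid


def create_order(order_stream, outbox, payload):
    """Persist the order and its event record together; publish later from the outbox."""
    # Generate the ID up front so a retry can detect the existing order.
    order_id = payload.get("idempotency_key") or str(uuid.uuid4())
    if order_stream.get("pending", order_id) is not None:
        return order_id  # idempotent replay: order already created
    new_order = {"id": order_id, **payload}
    # Write the order and the outbox entry in the same step; a background
    # worker later drains the outbox and calls enqueue() for each entry,
    # so a failed publish never leaves a persisted order without its event.
    order_stream.set("pending", order_id, new_order)
    outbox.append({"topic": "order.created", "data": new_order})
    return order_id
```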

Comment on lines +107 to 113
    def chain_a_handler(input_data):
        iii_client.trigger({
            "function_id": "test.queue.py.chain-b",
            "payload": {**input_data, "chained": True},
            "action": TriggerAction.Enqueue(queue="test-chain"),
        })
        return input_data

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect whether sync trigger guards against same-thread calls into the client loop.
rg -n -C4 --type=py '\bdef trigger\b|\bdef _async_trigger\b|\bdef _run_on_loop\b|run_coroutine_threadsafe|current_thread\(|_thread\b' sdk/packages/python/iii/src/iii

Repository: iii-hq/iii

Length of output: 5854


Fix sync trigger() call within handler to use async method.

At Lines 107-112, chain_a_handler() is defined as sync but calls iii_client.trigger(), which is a blocking method. The SDK's _run_on_loop() guard explicitly prevents calling sync methods from the event loop thread (where handlers execute) and will raise RuntimeError: "Cannot call sync SDK methods from the event loop thread. Use async handler methods instead." Make the handler async and await _async_trigger() instead.

Fix
-    def chain_a_handler(input_data):
-        iii_client.trigger({
+    async def chain_a_handler(input_data):
+        await iii_client._async_trigger({
             "function_id": "test.queue.py.chain-b",
             "payload": {**input_data, "chained": True},
             "action": TriggerAction.Enqueue(queue="test-chain"),
         })
         return input_data
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def chain_a_handler(input_data):
    iii_client.trigger({
        "function_id": "test.queue.py.chain-b",
        "payload": {**input_data, "chained": True},
        "action": TriggerAction.Enqueue(queue="test-chain"),
    })
    return input_data
async def chain_a_handler(input_data):
    await iii_client._async_trigger({
        "function_id": "test.queue.py.chain-b",
        "payload": {**input_data, "chained": True},
        "action": TriggerAction.Enqueue(queue="test-chain"),
    })
    return input_data
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_queue_integration.py` around lines 107 -
113, chain_a_handler is currently synchronous but calls the blocking
iii_client.trigger() inside a handler running on the event loop; change the
handler to async (async def chain_a_handler(input_data)) and replace the sync
call with an awaited async call to iii_client._async_trigger({...}) (preserving
the same payload and TriggerAction.Enqueue(queue="test-chain")) and then return
input_data; ensure you await iii_client._async_trigger(...) so the call runs
non-blocking on the event loop.
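The guard the finding describes can be sketched as follows. This is an illustrative reconstruction, not the SDK's actual source; the class and method names are hypothetical:

```python
import asyncio
import threading

# Illustrative sketch of a _run_on_loop-style guard: a sync wrapper that
# refuses to run when invoked from the event loop's own thread, since
# blocking there would deadlock the loop.
class LoopBoundClient:
    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def _run_on_loop(self, coro):
        if threading.current_thread() is self._thread:
            coro.close()  # suppress the "coroutine was never awaited" warning
            raise RuntimeError(
                "Cannot call sync SDK methods from the event loop thread. "
                "Use async handler methods instead."
            )
        # Safe: we are on a different thread, so .result() can block while
        # the loop thread executes the coroutine.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def shutdown(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
```

Because handlers execute on the loop thread, a sync `trigger()` inside a handler hits the RuntimeError branch; the suggested fix routes through the async path instead.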

"""Integration tests for stream operations."""

import asyncio
import builtins

⚠️ Potential issue | 🟡 Minor

Remove unused import.

The builtins import is not used anywhere in this file.

🧹 Proposed fix
 import asyncio
-import builtins
 import time
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import builtins
import asyncio
import time
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/packages/python/iii/tests/test_streams.py` at line 4, Remove the unused
import statement "import builtins" from the test file; locate the top of
sdk/packages/python/iii/tests/test_streams.py where the import appears and
delete that line (ensure no other references to builtins remain in functions or
tests in this file such as any occurrences in test_* functions).

- Simplified the `register_worker` section by removing the connection details.
- Enhanced descriptions and examples for `create_channel`, `create_stream`, `get_connection_state`, `list_functions`, `list_triggers`, `list_workers`, and `on_functions_available`.
- Improved parameter descriptions for `register_function` to clarify input types and usage.