Add IoT pipeline example with multi-language support #6

andersonleal wants to merge 1 commit into main from
Conversation
📝 Walkthrough

Adds a multi-language IoT pipeline example: a Rust sensor worker, a Python analytics worker, and a Node.js API gateway plus frontend, streams implementation, tests, docs, and orchestration scripts. Introduces new modules, configs, and cross-worker integrations (streams, function registrations, triggers, and peer discovery).

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant NodeGW as Node.js API Gateway
    participant Engine as III Engine
    participant RustW as Rust Sensor Worker
    participant PythonW as Python Analytics Worker
    Client->>NodeGW: POST /sensors/ingest
    NodeGW->>Engine: call sensors::data::ingest
    Engine->>RustW: invoke sensors::data::ingest
    RustW->>Engine: write streams / update stats
    RustW->>Engine: stream batch -> PythonW
    Engine->>PythonW: call analytics::batch::process
    PythonW->>PythonW: run anomaly detection / stats
    PythonW->>Engine: call api::alerts::notify
    Engine->>NodeGW: emit threshold trigger / events
    NodeGW-->>Client: 200 OK
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested reviewers
Actionable comments posted: 13
🧹 Nitpick comments (10)
examples/iot-pipeline/python-worker/pyproject.toml (2)
13-17: Consider pinning the pytest-asyncio version.

The asyncio_mode = "auto" setting requires pytest-asyncio >= 0.18.0. Without a version constraint, users with an older version installed system-wide may encounter configuration errors.

📦 Suggested version pin

```diff
 [project.optional-dependencies]
-dev = ["pytest", "pytest-asyncio"]
+dev = ["pytest>=7.0", "pytest-asyncio>=0.21"]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/pyproject.toml` around lines 13 - 17, Update the dev optional dependency list in pyproject.toml to pin pytest-asyncio to a compatible minimum (e.g., "pytest-asyncio>=0.18.0") so the existing pytest config option asyncio_mode = "auto" works reliably; modify the [project.optional-dependencies] dev array entry that currently lists "pytest-asyncio" to include the version constraint.
22-23: Generic script name may cause conflicts.

The entry point main is very generic and could shadow commands from other packages or confuse users. A more descriptive name would improve clarity.

✏️ Suggested rename

```diff
 [project.scripts]
-main = "src.main:main"
+iot-analytics-worker = "src.main:main"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/pyproject.toml` around lines 22 - 23, The script entry point currently uses a generic name "main" under project.scripts which can cause conflicts; rename it to a more descriptive identifier (e.g., "iot_worker" or "iot_pipeline_main") and update the entry to point at the same function (for example change main = "src.main:main" to iot_worker = "src.main:main"), then search the repo for any references to the old "main" script name (docs, CI, README, tests) and update them to the new script name so callers and documentation remain consistent.

examples/iot-pipeline/smoke.sh (1)
96-99: Fail fast on missing worker toolchains.

Only iii is checked up front. If cargo, uv, npx, or curl is missing, the background process dies and the script later times out in wait_for_workers, which hides the real problem.

Suggested refactor

```diff
-if ! command -v iii >/dev/null 2>&1; then
-  echo "ERROR: 'iii' command not found. Install the iii engine first."
-  exit 1
-fi
+for cmd in iii curl cargo uv npx; do
+  if ! command -v "$cmd" >/dev/null 2>&1; then
+    echo "ERROR: '$cmd' command not found."
+    exit 1
+  fi
+done
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/smoke.sh` around lines 96 - 99, The script only checks for the 'iii' tool and lets missing utilities (cargo, uv, npx, curl) cause background workers to die and later time out in wait_for_workers; add a preflight check that validates all required commands (at least 'iii', 'cargo', 'uv', 'npx', 'curl') before starting workers — implement this by defining a REQUIRED_CMDS list/array in smoke.sh and looping over it to command -v each item, echoing a clear error and exit 1 if any are missing so wait_for_workers never hides the real failure.

examples/iot-pipeline/node-worker/src/index.ts (1)
223-236: Document the threshold multiplier magic number.

The threshold calculation config.z_score_threshold * 20 uses a magic number without explanation. Consider extracting it into a named constant or adding a comment explaining the conversion from the z-score threshold to the trigger threshold value.

🔧 Suggested improvement

```diff
+// Multiplier to convert z-score threshold to raw value threshold
+// (assumes sensor values are approximately 20x the z-score scale)
+const ZSCORE_TO_VALUE_MULTIPLIER = 20
+
 // Register threshold triggers for each sensor (maps state thresholds to trigger registrations)
 for (const [sensorId, config] of Object.entries(DEFAULT_THRESHOLDS)) {
   iii.registerTrigger({
     type: 'threshold',
     function_id: 'api::alerts::notify',
-    config: { sensor_id: sensorId, threshold: config.z_score_threshold * 20 },
+    config: { sensor_id: sensorId, threshold: config.z_score_threshold * ZSCORE_TO_VALUE_MULTIPLIER },
   })
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/node-worker/src/index.ts` around lines 223 - 236, The multiplier "20" used when computing trigger thresholds (config.z_score_threshold * 20) is a magic number—extract it into a named constant (e.g., Z_SCORE_TO_TRIGGER_MULTIPLIER) or add an inline comment explaining the conversion from z-score to trigger value, then use that constant in the iii.registerTrigger call; update references around DEFAULT_THRESHOLDS, the loop calling iii.registerTrigger, and the config.z_score_threshold expression so the intent is documented and reusable.

examples/iot-pipeline/rust-worker/src/sensors/aggregate.rs (2)
69-85: Consider logging or returning more useful data from handle_stats.

The handle_stats handler is documented as an "external invocation endpoint" but only logs and returns a status. Since the actual aggregation happens in ingest_with_streams, consider documenting this clearly or returning actual stats if callers expect them.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/rust-worker/src/sensors/aggregate.rs` around lines 69 - 85, The handler handle_stats currently only returns a simple status and logs sensor_id; update it to provide useful data or clarify behavior: either (A) fetch and return real aggregation results by calling the aggregation helper used by ingest (e.g., invoke ingest_with_streams or a dedicated get_aggregate_stats(sensor_id) function) and include fields like last_updated, count, avg, min, max, and sensor_id, or (B) change the returned payload and eprintln() to clearly indicate this is a proxy/placeholder and document that actual aggregation occurs in ingest_with_streams (e.g., include "note": "aggregation performed in ingest_with_streams"). Ensure references to the function name handle_stats and ingest_with_streams are present so reviewers can locate the change.
54-67: Simplify the current_stats extraction.

Line 62 uses a convoluted pattern that checks for an object but then returns the original value regardless. This can be simplified.

♻️ Suggested simplification

```diff
 // Step 1: Read current stats by sending an empty update
 let read_result = streams.update(&key, vec![]).await?;
-let current_stats = read_result.new_value.as_object().and_then(|_| Some(&read_result.new_value));
+let current_stats = read_result.new_value.as_object().map(|_| &read_result.new_value);

 // Step 2: Compute ops with context and apply
 let ops = update_stats_ops(reading, current_stats);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/rust-worker/src/sensors/aggregate.rs` around lines 54 - 67, In update_stats replace the convoluted extraction of current_stats (currently using read_result.new_value.as_object().and_then(|_| Some(&read_result.new_value))) with a simpler mapping such as read_result.new_value.as_object().map(|_| &read_result.new_value) or an is_object() check (e.g. if read_result.new_value.is_object() { Some(&read_result.new_value) } else { None }); this keeps the same semantics while simplifying the code around read_result.new_value/current_stats in update_stats.

examples/iot-pipeline/python-worker/tests/test_stats.py (1)
38-77: Cover the all-sensors branch as well.

These tests only exercise compute({"sensor_id": ...}, rb), but examples/iot-pipeline/python-worker/src/analytics/stats.py also exposes a second branch that returns {"sensors": ..., "total_sensors": ...} when sensor_id is omitted. A small multi-sensor case here would keep that contract from regressing.

Possible test addition

```diff
+@pytest.mark.asyncio
+async def test_compute_stats_all_sensors_summary() -> None:
+    rb = TimeSeriesRingBuffer(max_size=100)
+    await _populate_buffer(rb, "temp-001", [10, 20])
+    await _populate_buffer(rb, "temp-002", [30])
+
+    result = await compute({}, rb)
+
+    assert result["total_sensors"] == 2
+    assert {item["sensor_id"] for item in result["sensors"]} == {"temp-001", "temp-002"}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/tests/test_stats.py` around lines 38 - 77, Add a test that exercises the compute() branch when no "sensor_id" is provided: create a TimeSeriesRingBuffer, populate it with values for multiple sensor IDs using the existing _populate_buffer helper, call compute({}, rb), and assert the returned dict contains a "sensors" entry with per-sensor stats and a "total_sensors" count matching the number of distinct sensors; reference the compute function and TimeSeriesRingBuffer to locate the code to test.

examples/iot-pipeline/python-worker/src/analytics/stats.py (1)
59-62: Avoid duplicating the "timeseries" stream name.

examples/iot-pipeline/python-worker/src/main.py registers the ring buffer under the same literal. If that name changes later, this branch silently returns an empty summary while the per-sensor path still works. Sharing a constant or config between registration and lookup would make that coupling explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/analytics/stats.py` around lines 59 - 62, The code hardcodes the stream name "timeseries" when calling ring_buffer.list_groups via StreamListGroupsInput(stream_name="timeseries"); extract that literal into a shared constant or config used by both registration (in main.py) and lookup (in analytics/stats.py) so the ring buffer registration and listing use the same source of truth (e.g., define TIMESERIES_STREAM_NAME in a shared module and reference it in the ring_buffer registration and in StreamListGroupsInput).

examples/iot-pipeline/python-worker/src/main.py (2)
103-110: Consider using asyncio.Event for peer readiness signaling.

The polling loop with sleep(0.5) works, but asyncio.Event provides cleaner signaling and eliminates unnecessary wake-ups. This is optional since the current approach is functional.

♻️ Optional: Event-based signaling

```diff
+peers_ready_event = asyncio.Event()
+
 def on_functions_available(functions: list) -> None:
     """Accumulate discovered peer function IDs and log new discoveries."""
-    global peers_ready
     for fn in functions:
         if fn.function_id in EXPECTED_PEERS and fn.function_id not in discovered_ids:
             discovered_ids.add(fn.function_id)
             print(f"[discovery] Discovered peer function: {fn.function_id}")
-    if not peers_ready and EXPECTED_PEERS and EXPECTED_PEERS.issubset(discovered_ids):
-        peers_ready = True
+    if EXPECTED_PEERS and EXPECTED_PEERS.issubset(discovered_ids):
+        peers_ready_event.set()
         print(f"[discovery] All peers ready: {discovered_ids}")
```

Then in _async_main:

```python
try:
    await asyncio.wait_for(peers_ready_event.wait(), timeout=30.0)
except asyncio.TimeoutError:
    print(f"[discovery] Timeout: not all peers found after 30s. Got: {discovered_ids}")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/main.py` around lines 103 - 110, The current peer-wait loop in poll_peers uses a sleep-based polling on the boolean peers_ready; replace this with an asyncio.Event to avoid busy-waiting: create an asyncio.Event (e.g., peers_ready_event), set it where peers_ready would be set, change poll_peers/_async_main to await peers_ready_event.wait() and keep the asyncio.wait_for timeout logic and the timeout print that references discovered_ids; update references to peers_ready to use peers_ready_event and remove the sleep loop.
113-114: No graceful shutdown mechanism.

The infinite keep-alive loop has no way to terminate cleanly on SIGTERM/SIGINT. For example code this is acceptable, but production deployments would benefit from signal handling.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/main.py` around lines 113 - 114, The infinite keep-alive loop ("while True: await asyncio.sleep(60)") has no shutdown hook; replace it with an asyncio-compatible stop mechanism (e.g., create an asyncio.Event named stop_event and await stop_event.wait() instead of the infinite sleep) and register signal handlers for SIGINT and SIGTERM that call stop_event.set() (use loop.add_signal_handler where available or signal.signal fallback). Ensure any cleanup and graceful shutdown logic runs after stop_event is set so the worker can terminate cleanly.
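A runnable sketch of that signal-handling pattern (Unix-only, since loop.add_signal_handler is not implemented on Windows; the self-sent SIGTERM stands in for an external supervisor and is not part of the PR):

```python
import asyncio
import os
import signal

async def run_worker() -> str:
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Unix-only: wire SIGINT/SIGTERM to set the stop event.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)

    # Simulate an external SIGTERM shortly after startup.
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)

    # Replaces `while True: await asyncio.sleep(60)`.
    await stop_event.wait()
    return "shut down cleanly"  # cleanup/flush logic would run here

print(asyncio.run(run_worker()))
```

Any teardown (closing streams, deregistering functions) goes after `stop_event.wait()` returns, so the worker exits deterministically instead of being killed mid-sleep.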
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@examples/iot-pipeline/FUNCTION_IDS.md`:
- Around line 3-5: The file documents a canonical function ID shape as
`namespace::group::action` but several entries (e.g.,
`api::http::get::system_workers`, `api::http::post::...`) use four segments,
causing inconsistency; update FUNCTION_IDS.md so the rule matches the entries by
either (A) changing the canonical pattern to allow an extra segment (e.g.,
`namespace::transport::group::action` or `namespace::group::action[:transport]`)
and give the HTTP example (`api::http::get::system_workers`) as the explicit
pattern, or (B) refactor the listed HTTP IDs to conform to
`namespace::group::action`; pick one approach and make the intro, examples, and
the entries at lines referenced (like the `api::http::get::system_workers`
entries) consistent with that chosen pattern.
- Line 14: Update the FUNCTION_IDS.md entry for sensors::aggregate::stats to
remove the misleading "Atomic aggregation" phrasing and instead indicate that
updates are non-atomic/read-then-write and yield approximate min/max under
concurrency; reference the implementation in
examples/iot-pipeline/rust-worker/src/sensors/aggregate.rs
(sensors::aggregate::stats) and describe it as "Approximate aggregation:
read-then-write update, increment counts, merge stats (non-atomic; min/max
approximate under concurrency)". Ensure the new description clearly warns about
non-atomic semantics and concurrent approximation.
- Around line 16-22: The registry is missing the function ID
analytics::batch::process; update FUNCTION_IDS.md under the Analytics Worker
table to add a row for analytics::batch::process with a short description (e.g.,
"Batch-process sensor readings / aggregate into ring buffer") and the
appropriate Phase (use 3 to match other analytics entries) so the documented
registry matches the actual invocation of analytics::batch::process.
In `@examples/iot-pipeline/node-worker/package.json`:
- Around line 4-6: The "dev" script in package.json currently invokes Bun
("dev": "bun src/index.ts") while the README only documents Node/npm/npx; update
the dev script to use the same runtime as "start" (e.g., set the "dev" script to
use tsx like the "start" script) so npm run dev works with the documented
prerequisites, or alternatively update README/prerequisites to explicitly list
Bun; modify the package.json "dev" script (and README if you choose the Bun
route) to keep runtime choice consistent with the "start" script.
In `@examples/iot-pipeline/node-worker/src/triggers/threshold.ts`:
- Around line 29-38: In checkReading (in threshold.ts) the payload sent via
iii.callVoid(function_id, ...) lacks the alert handler fields; update the
payload to include severity, message, reading, and z_score so it matches
api::alerts::notify contract: set reading to the sensor value, include a human
message (e.g. "sensor {sensorId} exceeded threshold"), add a severity string
(e.g. "warning" or derive from how far value exceeds config.threshold), and
provide z_score (compute if you have stats or set to null) along with the
existing trigger_id, sensor_id, value/threshold/exceeded_by when calling
function_id.
In `@examples/iot-pipeline/python-worker/src/analytics/batch.py`:
- Around line 38-41: The code currently rebuilds the payload passed to
anomaly.detect using only sensor_id and value, which drops timestamp and other
fields; change the call in batch.py to pass the original reading object (the
variable reading) as the first argument to anomaly.detect so the complete event
(including timestamp and any extra fields) gets persisted to the ring_buffer;
keep the existing ring_buffer argument and invocation of anomaly.detect
unchanged otherwise.
- Around line 55-76: The code currently treats every alert attempt as dispatched
because it appends to alert_results for both successes and failures; modify the
logic around the iii.call("api::alerts::notify", ...) and the try/except so that
alert_results only records successful notifications (e.g., append alert_result
only when the call returns a success status) or keep all results but change the
return value to compute alerts_dispatched = sum(1 for r in alert_results if
r.get("status") == "ok" or "error" not in r); update the final return dict (the
alerts_dispatched field) to use that success-counting logic instead of
len(alert_results) and keep processed/anomalies_found as-is.
In `@examples/iot-pipeline/python-worker/src/streams/ring_buffer.py`:
- Around line 79-82: The delete method currently removes the item from the group
map but leaves its id in the insertion-order deque (_order), causing stale ids
to accumulate; update the async delete(self, input: StreamDeleteInput)
implementation to also remove input.item_id from the _order deque (use
_order.remove(input.item_id) guarded by try/except or a conditional membership
check) so the insertion-order is consistent with _groups and capacity/eviction
logic; reference the delete method, the _groups dict, and the _order deque when
making the change.
In `@examples/iot-pipeline/README.md`:
- Around line 159-163: Update the README blurb to state that
examples/iot-pipeline/smoke.sh starts the engine and all three workers (it owns
process startup) and therefore should be run by itself to avoid
duplicate-process or port-binding errors; mention that if a user prefers manual
startup they should instead start the engine and workers individually (or
provide the commands) before running smoke.sh, and add a short warning note that
running manual startup and smoke.sh together will cause conflicts.
- Around line 167-189: The fenced code block in the examples/iot-pipeline
README.md lacks a language tag causing markdownlint MD040; update the opening
triple-backtick to include a language (e.g., change ``` to ```text) for the
directory-tree block so the linter accepts it and the rendered fenced block
remains unchanged; locate the tree block in the README.md and add the tag to the
opening fence.
In `@examples/iot-pipeline/rust-worker/src/sensors/data.rs`:
- Around line 153-159: The doc comment claims the reading is written "via
streams.merge" but the implementation uses UpdateOp::set for a full replace (see
UpdateOp::set and the code path that writes to readings::{sensor_id}::latest and
calls aggregate::update_stats); update the doc comment to accurately describe
the actual behavior (e.g., "writes reading to readings::{sensor_id}::latest via
a full replace using UpdateOp::set") or alternatively change the implementation
to perform a merge instead of using UpdateOp::set—reference UpdateOp::set,
readings::{sensor_id}::latest, and aggregate::update_stats to locate the
relevant code.
In `@examples/iot-pipeline/rust-worker/src/sensors/synthetic.rs`:
- Around line 150-158: The batch path is regenerating new random SensorReading
values (via generate_reading/generate_all_readings) after releasing the lock, so
the batch stream gets different data than what was actually ingested; instead
pass the exact SensorReading instances that were pushed into the primary stream
to super::batch::handle_batch_stream (reuse the variable/collection used during
ingestion rather than calling generate_all_readings again and avoid re-seeding
randomness), or capture and clone those same readings while still holding
last_values if needed so handle_batch_stream receives the identical readings
used by the main pipeline.
In `@examples/iot-pipeline/smoke.sh`:
- Around line 104-107: The readiness check waits for the `/system/workers`
endpoint which is only registered later by the Node worker, causing a timeout;
update the call to wait_for_ready to poll a built-in engine health endpoint or
the engine port (e.g., use "$ENGINE_HTTP/" or "$ENGINE_HTTP/health") instead of
"$ENGINE_HTTP/system/workers" and ensure this check happens after starting iii
(the background process captured by PIDS+=($!)) so that iii is started before
wait_for_ready is invoked.
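One of the prompts above concerns stale ids left in the ring buffer's insertion-order deque after a delete; the fix can be sketched with simplified types (this is not the PR's StreamDeleteInput API, just the dict-plus-deque shape the prompt describes):

```python
from collections import deque

class RingBuffer:
    """Simplified sketch: items keyed by id, eviction in insertion order."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self._items: dict[str, float] = {}
        self._order: deque[str] = deque()

    def insert(self, item_id: str, value: float) -> None:
        if len(self._items) >= self.max_size:
            oldest = self._order.popleft()
            self._items.pop(oldest, None)
        self._items[item_id] = value
        self._order.append(item_id)

    def delete(self, item_id: str) -> None:
        self._items.pop(item_id, None)
        # The fix: also drop the id from the order deque. Without this,
        # a later eviction pops the stale id and removes nothing, so the
        # buffer silently grows past max_size.
        try:
            self._order.remove(item_id)
        except ValueError:
            pass

rb = RingBuffer(max_size=2)
rb.insert("a", 1.0)
rb.insert("b", 2.0)
rb.delete("a")
rb.insert("c", 3.0)
rb.insert("d", 4.0)   # evicts "b" — the true oldest — not a stale "a"
print(sorted(rb._items))  # ['c', 'd']
```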
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3f5b9482-d2e6-45e9-b75b-3224476c200c
⛔ Files ignored due to path filters (15)
- examples/iot-pipeline/node-worker/package-lock.json is excluded by !**/package-lock.json
- examples/iot-pipeline/python-worker/src/__pycache__/__init__.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/src/__pycache__/main.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/src/analytics/__pycache__/__init__.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/src/analytics/__pycache__/anomaly.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/src/analytics/__pycache__/batch.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/src/analytics/__pycache__/stats.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/src/streams/__pycache__/__init__.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/src/streams/__pycache__/ring_buffer.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/tests/__pycache__/__init__.cpython-314.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/tests/__pycache__/test_anomaly.cpython-314-pytest-9.0.2.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/tests/__pycache__/test_ring_buffer.cpython-314-pytest-9.0.2.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/tests/__pycache__/test_stats.cpython-314-pytest-9.0.2.pyc is excluded by !**/*.pyc
- examples/iot-pipeline/python-worker/uv.lock is excluded by !**/*.lock
- examples/iot-pipeline/rust-worker/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (30)
- .gitignore
- examples/iot-pipeline/FUNCTION_IDS.md
- examples/iot-pipeline/README.md
- examples/iot-pipeline/node-worker/package.json
- examples/iot-pipeline/node-worker/src/iii.ts
- examples/iot-pipeline/node-worker/src/index.ts
- examples/iot-pipeline/node-worker/src/triggers/threshold.ts
- examples/iot-pipeline/node-worker/tsconfig.json
- examples/iot-pipeline/python-worker/pyproject.toml
- examples/iot-pipeline/python-worker/src/__init__.py
- examples/iot-pipeline/python-worker/src/analytics/__init__.py
- examples/iot-pipeline/python-worker/src/analytics/anomaly.py
- examples/iot-pipeline/python-worker/src/analytics/batch.py
- examples/iot-pipeline/python-worker/src/analytics/stats.py
- examples/iot-pipeline/python-worker/src/main.py
- examples/iot-pipeline/python-worker/src/streams/__init__.py
- examples/iot-pipeline/python-worker/src/streams/ring_buffer.py
- examples/iot-pipeline/python-worker/tests/__init__.py
- examples/iot-pipeline/python-worker/tests/test_anomaly.py
- examples/iot-pipeline/python-worker/tests/test_ring_buffer.py
- examples/iot-pipeline/python-worker/tests/test_stats.py
- examples/iot-pipeline/rust-worker/Cargo.toml
- examples/iot-pipeline/rust-worker/src/main.rs
- examples/iot-pipeline/rust-worker/src/sensors/aggregate.rs
- examples/iot-pipeline/rust-worker/src/sensors/batch.rs
- examples/iot-pipeline/rust-worker/src/sensors/data.rs
- examples/iot-pipeline/rust-worker/src/sensors/mod.rs
- examples/iot-pipeline/rust-worker/src/sensors/models.rs
- examples/iot-pipeline/rust-worker/src/sensors/synthetic.rs
- examples/iot-pipeline/smoke.sh
## Analytics Worker (Python) -- analytics::*

| Function ID | Description | Phase |
|-------------|-------------|-------|
| analytics::stats::compute | Compute min/max/avg/count per sensor from ring buffer | 3 |
| analytics::anomaly::detect | Z-score anomaly detection, returns is_anomaly boolean | 3 |
The registry is already missing a live function ID.
examples/iot-pipeline/rust-worker/src/sensors/batch.rs (lines 1-48) invokes analytics::batch::process, but this "single source of truth" does not list it. At minimum, add that ID here so the registry is not incomplete on day one.

Suggested doc fix

```diff
 | analytics::stats::compute | Compute min/max/avg/count per sensor from ring buffer | 3 |
 | analytics::anomaly::detect | Z-score anomaly detection, returns is_anomaly boolean | 3 |
+| analytics::batch::process | Consume streamed reading batches from Rust and return batch results | 3 |
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
## Analytics Worker (Python) -- analytics::*

| Function ID | Description | Phase |
|-------------|-------------|-------|
| analytics::stats::compute | Compute min/max/avg/count per sensor from ring buffer | 3 |
| analytics::anomaly::detect | Z-score anomaly detection, returns is_anomaly boolean | 3 |
| analytics::batch::process | Consume streamed reading batches from Rust and return batch results | 3 |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/FUNCTION_IDS.md` around lines 16 - 22, The registry is
missing the function ID analytics::batch::process; update FUNCTION_IDS.md under
the Analytics Worker table to add a row for analytics::batch::process with a
short description (e.g., "Batch-process sensor readings / aggregate into ring
buffer") and the appropriate Phase (use 3 to match other analytics entries) so
the documented registry matches the actual invocation of
analytics::batch::process.
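The registry only describes `analytics::anomaly::detect` as "z-score anomaly detection, returns is_anomaly boolean"; a minimal standalone sketch of that technique (the function name, return shape, and 3.0 cutoff are illustrative assumptions, not the Python worker's actual API):

```python
import statistics

Z_THRESHOLD = 3.0  # illustrative cutoff, not necessarily the worker's setting


def detect_anomaly(history: list[float], value: float) -> dict:
    """Flag `value` as anomalous when its z-score against `history` exceeds the cutoff."""
    if len(history) < 2:
        # Too few samples to estimate spread; never flag.
        return {"is_anomaly": False, "z_score": 0.0}
    mean = statistics.fmean(history)
    stdev = statistics.stdev(history)
    if stdev == 0:
        # Constant history: any deviation would divide by zero, so skip.
        return {"is_anomaly": False, "z_score": 0.0}
    z = (value - mean) / stdev
    return {"is_anomaly": abs(z) > Z_THRESHOLD, "z_score": z}
```

With a steady temperature history around 20, a reading of 35 produces a very large z-score and is flagged, while 20.2 is not.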
"scripts": {
  "dev": "bun src/index.ts",
  "start": "npx tsx src/index.ts"
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
python - <<'PY'
import json
from pathlib import Path
pkg = json.loads(Path("examples/iot-pipeline/node-worker/package.json").read_text())
print("scripts:", pkg.get("scripts", {}))
PY
rg -n "bun|Node.js|npm|npx" \
examples/iot-pipeline/README.md \
examples/iot-pipeline/node-worker/package.json

Repository: iii-hq/iii-examples
Length of output: 1510
npm run dev uses an undocumented runtime.
The example documents Node.js/npm/npx prerequisites, but the dev script depends on Bun. Following the README will fail at npm run dev. Either switch dev to tsx (consistent with start) or document Bun as a required prerequisite.
Suggested fix
"scripts": {
- "dev": "bun src/index.ts",
- "start": "npx tsx src/index.ts"
+ "dev": "tsx watch src/index.ts",
+ "start": "tsx src/index.ts"
},

📝 Committable suggestion
"scripts": {
  "dev": "tsx watch src/index.ts",
  "start": "tsx src/index.ts"
},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/node-worker/package.json` around lines 4 - 6, The "dev"
script in package.json currently invokes Bun ("dev": "bun src/index.ts") while
the README only documents Node/npm/npx; update the dev script to use the same
runtime as "start" (e.g., set the "dev" script to use tsx like the "start"
script) so npm run dev works with the documented prerequisites, or alternatively
update README/prerequisites to explicitly list Bun; modify the package.json
"dev" script (and README if you choose the Bun route) to keep runtime choice
consistent with the "start" script.
checkReading(sensorId: string, value: number, iii: { callVoid: (fn: string, data: unknown) => void }): void {
  for (const [triggerId, { function_id, config }] of this.triggers) {
    if (config.sensor_id === sensorId && value > config.threshold) {
      iii.callVoid(function_id, {
        trigger_id: triggerId,
        sensor_id: sensorId,
        value,
        threshold: config.threshold,
        exceeded_by: value - config.threshold,
      })
Threshold events do not match the alert handler contract.
These payloads go to api::alerts::notify, but examples/iot-pipeline/node-worker/src/index.ts, Lines 18-37 reads severity, message, reading, and z_score. The current call only sends value, threshold, and exceeded_by, so threshold alerts get stored with most of their useful fields missing.
Suggested fix
iii.callVoid(function_id, {
trigger_id: triggerId,
sensor_id: sensorId,
- value,
- threshold: config.threshold,
- exceeded_by: value - config.threshold,
+ severity: 'warning',
+ message: `Sensor ${sensorId} exceeded threshold ${config.threshold}`,
+ reading: {
+ sensor_id: sensorId,
+ value,
+ threshold: config.threshold,
+ exceeded_by: value - config.threshold,
+ },
})

📝 Committable suggestion
checkReading(sensorId: string, value: number, iii: { callVoid: (fn: string, data: unknown) => void }): void {
  for (const [triggerId, { function_id, config }] of this.triggers) {
    if (config.sensor_id === sensorId && value > config.threshold) {
      iii.callVoid(function_id, {
        trigger_id: triggerId,
        sensor_id: sensorId,
        severity: 'warning',
        message: `Sensor ${sensorId} exceeded threshold ${config.threshold}`,
        reading: {
          sensor_id: sensorId,
          value,
          threshold: config.threshold,
          exceeded_by: value - config.threshold,
        },
      })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/node-worker/src/triggers/threshold.ts` around lines 29
- 38, In checkReading (in threshold.ts) the payload sent via
iii.callVoid(function_id, ...) lacks the alert handler fields; update the
payload to include severity, message, reading, and z_score so it matches
api::alerts::notify contract: set reading to the sensor value, include a human
message (e.g. "sensor {sensorId} exceeded threshold"), add a severity string
(e.g. "warning" or derive from how far value exceeds config.threshold), and
provide z_score (compute if you have stats or set to null) along with the
existing trigger_id, sensor_id, value/threshold/exceeded_by when calling
function_id.
A smoke test script validates every endpoint after starting all workers:

```bash
./examples/iot-pipeline/smoke.sh
```
Document that smoke.sh owns process startup.
examples/iot-pipeline/smoke.sh already launches the engine and all three workers. Saying to run it after manual startup points readers at duplicate-process / port-binding failures instead of a clean smoke test.
Suggested doc fix
-A smoke test script validates every endpoint after starting all workers:
+A smoke test script starts the engine and all three workers, then validates every endpoint:🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/README.md` around lines 159 - 163, Update the README
blurb to state that examples/iot-pipeline/smoke.sh starts the engine and all
three workers (it owns process startup) and therefore should be run by itself to
avoid duplicate-process or port-binding errors; mention that if a user prefers
manual startup they should instead start the engine and workers individually (or
provide the commands) before running smoke.sh, and add a short warning note that
running manual startup and smoke.sh together will cause conflicts.
```
examples/iot-pipeline/
├── FUNCTION_IDS.md      # Function naming registry
├── README.md            # This file
├── smoke.sh             # End-to-end smoke test
├── rust-worker/         # Sensor ingestion (Rust)
│   ├── Cargo.toml
│   └── src/
│       ├── main.rs
│       └── sensors/     # models, data, aggregate, synthetic, batch
├── python-worker/       # Analytics (Python)
│   ├── pyproject.toml
│   └── src/
│       ├── main.py
│       ├── analytics/   # anomaly, stats, batch
│       └── streams/     # TimeSeriesRingBuffer
└── node-worker/         # API Gateway (Node.js)
    ├── package.json
    └── src/
        ├── index.ts
        ├── iii.ts
        └── triggers/    # threshold.ts
```
Add a language tag to this fenced block.
This is the markdownlint MD040 warning from static analysis; `text` or `plaintext` is enough.
Suggested doc fix
-```
+```text
examples/iot-pipeline/
├── FUNCTION_IDS.md # Function naming registry
├── README.md # This file
├── smoke.sh # End-to-end smoke test
@@
├── index.ts
├── iii.ts
└── triggers/      # threshold.ts

🧰 Tools
🪛 markdownlint-cli2 (0.21.0)
[warning] 167-167: Fenced code blocks should have a language specified
(MD040, fenced-code-language)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/README.md` around lines 167 - 189, The fenced code
block in the examples/iot-pipeline README.md lacks a language tag causing
markdownlint MD040; update the opening triple-backtick to include a language
(e.g., change the opening fence to ```text) for the directory-tree block so the linter accepts
it and the rendered fenced block remains unchanged; locate the tree block in the
README.md and add the tag to the opening fence.
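The `streams/` entry in the directory tree above references a `TimeSeriesRingBuffer`. As a hypothetical sketch of that idea under stated assumptions (the class name matches the tree, but the `push`/`values` interface shown here is invented for illustration, not the worker's real API):

```python
from collections import deque


class TimeSeriesRingBuffer:
    """Keep only the most recent `capacity` (timestamp, value) samples."""

    def __init__(self, capacity: int = 100):
        # A deque with maxlen silently evicts the oldest sample on overflow,
        # which is exactly the fixed-window behavior a ring buffer needs.
        self._buf: deque = deque(maxlen=capacity)

    def push(self, timestamp: float, value: float) -> None:
        self._buf.append((timestamp, value))

    def values(self) -> list:
        return [v for _, v in self._buf]

    def __len__(self) -> int:
        return len(self._buf)
```

Pushing five samples into a capacity-3 buffer leaves only the three newest, which is what keeps the per-sensor stats (min/max/avg/count) bounded.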
/// Ingest a sensor reading with stream writes and aggregation.
///
/// 1. Validates the raw JSON input
/// 2. Deserializes into SensorReading
/// 3. Writes reading to `readings::{sensor_id}::latest` via streams.merge
/// 4. Updates aggregated stats via `aggregate::update_stats`
/// 5. Returns the reading as JSON
Fix doc comment inconsistency.
The doc comment at line 157 says "writes reading to readings::{sensor_id}::latest via streams.merge" but the implementation at line 182 uses UpdateOp::set("", ...) for a full replace, not merge.
📝 Suggested fix
/// Ingest a sensor reading with stream writes and aggregation.
///
/// 1. Validates the raw JSON input
/// 2. Deserializes into SensorReading
-/// 3. Writes reading to `readings::{sensor_id}::latest` via streams.merge
+/// 3. Writes reading to `readings::{sensor_id}::latest` via streams.update (full replace)
/// 4. Updates aggregated stats via `aggregate::update_stats`
/// 5. Returns the reading as JSON

📝 Committable suggestion
/// Ingest a sensor reading with stream writes and aggregation.
///
/// 1. Validates the raw JSON input
/// 2. Deserializes into SensorReading
/// 3. Writes reading to `readings::{sensor_id}::latest` via streams.update (full replace)
/// 4. Updates aggregated stats via `aggregate::update_stats`
/// 5. Returns the reading as JSON
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/rust-worker/src/sensors/data.rs` around lines 153 -
159, The doc comment claims the reading is written "via streams.merge" but the
implementation uses UpdateOp::set for a full replace (see UpdateOp::set and the
code path that writes to readings::{sensor_id}::latest and calls
aggregate::update_stats); update the doc comment to accurately describe the
actual behavior (e.g., "writes reading to readings::{sensor_id}::latest via a
full replace using UpdateOp::set") or alternatively change the implementation to
perform a merge instead of using UpdateOp::set—reference UpdateOp::set,
readings::{sensor_id}::latest, and aggregate::update_stats to locate the
relevant code.
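The merge-versus-replace distinction behind this doc-comment fix can be illustrated generically; a sketch using plain dicts as a stand-in for stream state (the `UpdateOp::set("")` semantics are assumed from the review comment, not verified against the engine):

```python
def full_replace(state: dict, new: dict) -> dict:
    """UpdateOp::set("", ...) semantics: the new value wins wholesale."""
    return dict(new)


def merge(state: dict, new: dict) -> dict:
    """streams.merge semantics: existing keys survive unless overwritten."""
    out = dict(state)
    out.update(new)
    return out
```

The difference only shows when the new payload omits a key the old state had, e.g. a reading without a `unit` field: merge keeps it, full replace drops it.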
// Batch-stream the same readings via channel to Python (non-fatal secondary path)
let readings_for_batch: Vec<SensorReading> = {
    let current = last_values.lock().await.clone();
    generate_all_readings(&current)
};
match super::batch::handle_batch_stream(readings_for_batch, iii).await {
    Ok(result) => eprintln!("[synthetic] Batch stream result: {result}"),
    Err(e) => eprintln!("[synthetic] Batch stream failed (non-fatal): {e}"),
}
Batch streams different readings than what was ingested.
The code regenerates readings after dropping the lock (lines 151-154), which produces different readings than those just ingested (lines 131-139). Since generate_reading uses rand::thread_rng(), each call produces new random values.
This means the Python batch processor receives data that wasn't actually ingested into the streams, causing data inconsistency between the primary pipeline and the batch analytics path.
🐛 Proposed fix
+ // Clone readings before dropping lock for batch streaming
+ let readings_for_batch = readings.clone();
+
// Drop the lock before the batch call to avoid holding it across await
drop(new_last);
eprintln!(
"[synthetic] Generated {} readings for sensors: {:?}",
sensor_ids.len(),
sensor_ids
);
// Batch-stream the same readings via channel to Python (non-fatal secondary path)
- let readings_for_batch: Vec<SensorReading> = {
- let current = last_values.lock().await.clone();
- generate_all_readings(&current)
- };
match super::batch::handle_batch_stream(readings_for_batch, iii).await {

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/rust-worker/src/sensors/synthetic.rs` around lines 150
- 158, The batch path is regenerating new random SensorReading values (via
generate_reading/generate_all_readings) after releasing the lock, so the batch
stream gets different data than what was actually ingested; instead pass the
exact SensorReading instances that were pushed into the primary stream to
super::batch::handle_batch_stream (reuse the variable/collection used during
ingestion rather than calling generate_all_readings again and avoid re-seeding
randomness), or capture and clone those same readings while still holding
last_values if needed so handle_batch_stream receives the identical readings
used by the main pipeline.
echo "=== Starting iii engine ==="
iii &
PIDS+=($!)
wait_for_ready "$ENGINE_HTTP/system/workers"
The engine readiness gate waits on an endpoint that does not exist yet.
/system/workers is registered later by the Node worker in examples/iot-pipeline/node-worker/src/index.ts, Lines 115-143. Waiting on it before any worker is started turns a clean run into a timeout. Poll the engine port or a built-in engine health endpoint instead.
Suggested fix
+wait_for_port() {
+ local host="$1"
+ local port="$2"
+ local elapsed=0
+ echo "Waiting for ${host}:${port} to accept connections..."
+ while [ "$elapsed" -lt "$TIMEOUT" ]; do
+ if (echo >"/dev/tcp/$host/$port") >/dev/null 2>&1; then
+ echo " Ready after ${elapsed}s"
+ return 0
+ fi
+ sleep "$POLL_INTERVAL"
+ elapsed=$((elapsed + POLL_INTERVAL))
+ done
+ echo " TIMEOUT after ${TIMEOUT}s waiting for ${host}:${port}"
+ return 1
+}
+
echo "=== Starting iii engine ==="
iii &
PIDS+=($!)
-wait_for_ready "$ENGINE_HTTP/system/workers"
+wait_for_port "localhost" "3111"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/smoke.sh` around lines 104 - 107, The readiness check
waits for the `/system/workers` endpoint which is only registered later by the
Node worker, causing a timeout; update the call to wait_for_ready to poll a
built-in engine health endpoint or the engine port (e.g., use "$ENGINE_HTTP/" or
"$ENGINE_HTTP/health") instead of "$ENGINE_HTTP/system/workers" and ensure this
check happens after starting iii (the background process captured by PIDS+=($!))
so that iii is started before wait_for_ready is invoked.
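The port-gating idea from the suggested fix can also be sketched outside bash; a small Python equivalent of the proposed `wait_for_port` helper (the function name mirrors the shell helper, and the timeout values are placeholders):

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, poll: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means the listener is up, even before
            # any worker has registered routes behind it.
            with socket.create_connection((host, port), timeout=poll):
                return True
        except OSError:
            time.sleep(poll)
    return False
```

This checks only that the engine's socket accepts connections, which is available immediately at startup, unlike a route such as /system/workers that exists only after worker registration.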
…ime dashboard

Complete IoT pipeline demonstrating Rust, Python, and Node.js workers coordinated through the iii engine, with a React dashboard featuring:

- Rust worker: synthetic sensor data generation and batch aggregation
- Python worker: anomaly detection, statistics, and ring buffer streams
- Node.js worker: alert notifications, API gateway, and peer discovery
- React frontend: live architecture graph, sensor charts, API playground, WebSocket streaming via engine streams, and IDE-style resizable layout
140d8d5 to cf69ec8
Actionable comments posted: 13
Note
Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
♻️ Duplicate comments (2)
examples/iot-pipeline/smoke.sh (1)
104-107: ⚠️ Potential issue | 🔴 Critical

Engine readiness gate still waits on
`/system/workers` too early. Line [107] can block startup because this route depends on worker registration that happens later. Gate engine readiness on port/health instead.
Suggested patch
wait_for_ready() {
  local endpoint="$1"
  local elapsed=0
  echo "Waiting for $endpoint to be ready..."
@@
  return 1
}
+wait_for_port() {
+  local host="$1"
+  local port="$2"
+  local elapsed=0
+  echo "Waiting for ${host}:${port} to accept connections..."
+  while [ "$elapsed" -lt "$TIMEOUT" ]; do
+    if (echo >"/dev/tcp/$host/$port") >/dev/null 2>&1; then
+      echo " Ready after ${elapsed}s"
+      return 0
+    fi
+    sleep "$POLL_INTERVAL"
+    elapsed=$((elapsed + POLL_INTERVAL))
+  done
+  echo " TIMEOUT after ${TIMEOUT}s waiting for ${host}:${port}"
+  return 1
+}
+
@@
echo "=== Starting iii engine ==="
iii &
PIDS+=($!)
-wait_for_ready "$ENGINE_HTTP/system/workers"
+wait_for_port "localhost" "3111"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/smoke.sh` around lines 104 - 107, The readiness gate currently waits on the /system/workers route which can block startup; change the check in the startup script so the call to wait_for_ready uses the engine health endpoint or a plain port/health probe instead of "$ENGINE_HTTP/system/workers" (e.g., "$ENGINE_HTTP/health" or a TCP port check) after launching iii and appending its PID; update the argument passed to wait_for_ready (refer to wait_for_ready and ENGINE_HTTP in the snippet) so it no longer depends on worker registration.examples/iot-pipeline/node-worker/src/triggers/threshold.ts (1)
29-38: ⚠️ Potential issue | 🟠 Major

Threshold alerts still don't match the notify payload.
This call still only sends
`value`, `threshold`, and `exceeded_by`. If the alert notifier is still consuming `severity`, `message`, `reading`, and `z_score`, threshold alerts will arrive missing their core metadata.

Run this to confirm the current consumer contract before changing the payload:
#!/bin/bash
set -euo pipefail
threshold_file="$(fd '^threshold\.ts$' examples/iot-pipeline/node-worker/src | head -n1)"
index_file="$(fd '^index\.ts$' examples/iot-pipeline/node-worker/src | head -n1)"
test -n "$threshold_file"
test -n "$index_file"
printf '--- %s ---\n' "$threshold_file"
sed -n '1,120p' "$threshold_file"
printf '\n--- %s ---\n' "$index_file"
sed -n '1,160p' "$index_file"
printf '\n--- related fields ---\n'
rg -n -C2 'severity|message|reading|z_score|api::alerts::notify|callVoid\(' examples/iot-pipeline/node-worker/src

Expected result: if the consumer still reads `severity`, `message`, `reading`, or `z_score`, this payload is still incomplete.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/node-worker/src/triggers/threshold.ts` around lines 29 - 38, The payload sent in checkReading (method checkReading, loop over this.triggers, call iii.callVoid(function_id,...)) is missing consumer-required fields; update the object passed to iii.callVoid to include severity, message, reading, and z_score alongside the existing trigger_id, sensor_id, value, threshold, and exceeded_by. Use config values when present (e.g., config.severity, config.message) and set reading to the original value; if z_score cannot be computed from available config (mean/std), include it as null or compute (value - config.mean)/config.std when both exist so the consumer contract is satisfied.
🟡 Minor comments (11)
examples/iot-pipeline/python-worker/src/analytics/batch.py-29-31 (1)
29-31: ⚠️ Potential issue | 🟡 Minor

Add error handling for malformed JSON in the batch payload.
If
`raw` contains invalid JSON, `json.loads` will raise a `JSONDecodeError` that propagates uncaught. For batch processing resilience, consider wrapping this with error handling.

🔧 Suggested improvement
  raw: bytes = await reader.read_all()
- readings: list[dict[str, Any]] = json.loads(raw.decode("utf-8"))
+ try:
+     readings: list[dict[str, Any]] = json.loads(raw.decode("utf-8"))
+ except json.JSONDecodeError as e:
+     return {"processed": 0, "anomalies_found": 0, "alerts_dispatched": 0, "error": str(e)}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/analytics/batch.py` around lines 29 - 31, The JSON parsing of the batch payload using json.loads(raw.decode("utf-8")) can raise json.JSONDecodeError and currently will propagate uncaught; wrap the decode+json.loads call in a try/except that catches json.JSONDecodeError (and optionally UnicodeDecodeError), log the error with context (include the offending raw/prefix and the exception) and then handle the bad batch (e.g., skip processing this batch or raise a controlled exception) before continuing; change the code around reader.read_all(), raw, and readings to perform this guarded parse and ensure downstream code only runs when readings is a valid list[dict].examples/iot-pipeline/frontend/src/features/sensors/components/sensor-chart.tsx-68-79 (1)
68-79: ⚠️ Potential issue | 🟡 Minor

Guard against undefined sensor config in tooltip.
If
`entry.dataKey` doesn't exist in `SENSOR_MAP`, `cfg` will be `undefined`, causing a runtime error when accessing `cfg.label` and `cfg.unit` on Lines 74 and 76.

🛡️ Proposed fix
  {payload.filter((p) => p.value != null).map((entry) => {
    const cfg = SENSOR_MAP[entry.dataKey];
+   if (!cfg) return null;
    return (
      <div key={entry.dataKey} className="flex items-center justify-between gap-3 text-[11px]">

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/sensors/components/sensor-chart.tsx` around lines 68 - 79, The tooltip mapping can crash when SENSOR_MAP[entry.dataKey] is undefined; update the mapping in sensor-chart (where payload.map uses cfg = SENSOR_MAP[entry.dataKey]) to guard against missing configs by providing a safe fallback object (e.g., const cfg = SENSOR_MAP[entry.dataKey] || { label: entry.dataKey, unit: '' }) or by using optional chaining/defaults when reading cfg.label and cfg.unit so the tooltip never accesses properties on undefined; ensure keys still use entry.dataKey and the existing value.toFixed(1) logic remains unchanged.examples/iot-pipeline/frontend/src/features/layout/components/bottom-panel.tsx-54-58 (1)
54-58: ⚠️ Potential issue | 🟡 Minor

Event listeners may leak if component unmounts during drag.
The cleanup effect only resets
`isDragging.current` but doesn't remove the `mousemove` and `mouseup` listeners attached to `document`. If the component unmounts while the user is dragging, those listeners remain attached.

🔧 Proposed fix: Track and clean up listeners
+ const cleanupRef = useRef<(() => void) | null>(null);
+
  const handleMouseDown = useCallback(
    (e: React.MouseEvent) => {
      e.preventDefault();
      isDragging.current = true;
      const startY = e.clientY;
      const startHeight = bottomPanelHeight;
      const handleMouseMove = (moveEvent: MouseEvent) => {
        if (isDragging.current) {
          const delta = startY - moveEvent.clientY;
          setBottomPanelHeight(startHeight + delta);
        }
      };
      const handleMouseUp = () => {
        isDragging.current = false;
        document.removeEventListener("mousemove", handleMouseMove);
        document.removeEventListener("mouseup", handleMouseUp);
        document.body.style.cursor = "";
        document.body.style.userSelect = "";
+       cleanupRef.current = null;
      };
+     cleanupRef.current = () => {
+       document.removeEventListener("mousemove", handleMouseMove);
+       document.removeEventListener("mouseup", handleMouseUp);
+       document.body.style.cursor = "";
+       document.body.style.userSelect = "";
+     };
+
      document.body.style.cursor = "row-resize";
      document.body.style.userSelect = "none";
      document.addEventListener("mousemove", handleMouseMove);
      document.addEventListener("mouseup", handleMouseUp);
    },
    [bottomPanelHeight, setBottomPanelHeight],
  );
  useEffect(() => {
    return () => {
      isDragging.current = false;
+     cleanupRef.current?.();
    };
  }, []);

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/layout/components/bottom-panel.tsx` around lines 54 - 58, The effect cleanup only resets isDragging.current but doesn't remove document-level event listeners; update the component to track the mousemove and mouseup handler functions (e.g., store references like onDocumentMouseMove and onDocumentMouseUp or booleans like listenersAttached) when you call document.addEventListener, and in the useEffect cleanup return removeEventListener for both 'mousemove' and 'mouseup' (and set isDragging.current = false) so any listeners added during a drag are removed on unmount; reference the existing isDragging ref and the functions that attach document listeners in the drag start logic to ensure you remove the exact same handler references.examples/iot-pipeline/frontend/src/features/playground/lib/endpoints.ts-18-29 (1)
18-29: ⚠️ Potential issue | 🟡 Minor

Stale timestamp in `defaultBody`.
`new Date().toISOString()` is evaluated once at module load time, so the default body will always contain the same timestamp regardless of when the user opens the playground. Consider making `defaultBody` a function or regenerating it when the endpoint is selected.

🔧 Proposed fix: Use a factory function
 export interface EndpointConfig {
   id: string;
   method: "GET" | "POST";
   path: string;
   label: string;
   description: string;
-  defaultBody?: string;
+  defaultBody?: string | (() => string);
   pathParams?: { name: string; placeholder: string; default: string }[];
 }
 export const ENDPOINTS: EndpointConfig[] = [
   {
     id: "post-sensors-ingest",
     method: "POST",
     path: "/sensors/ingest",
     label: "POST /sensors/ingest",
     description: "Ingest a sensor reading",
-    defaultBody: JSON.stringify(
-      {
-        sensor_id: "temp-001",
-        value: 23.5,
-        timestamp: new Date().toISOString(),
-        sensor_type: "temperature",
-        unit: "celsius",
-        location: { lat: 40.7128, lon: -74.006 },
-      },
-      null,
-      2,
-    ),
+    defaultBody: () =>
+      JSON.stringify(
+        {
+          sensor_id: "temp-001",
+          value: 23.5,
+          timestamp: new Date().toISOString(),
+          sensor_type: "temperature",
+          unit: "celsius",
+          location: { lat: 40.7128, lon: -74.006 },
+        },
+        null,
+        2,
+      ),
   },
   // ...
 ];

Then in the consumer, call it if it's a function:
const body = typeof endpoint.defaultBody === "function" ? endpoint.defaultBody() : endpoint.defaultBody;

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/playground/lib/endpoints.ts` around lines 18 - 29, The defaultBody currently uses new Date().toISOString() at module load, producing a stale timestamp; change defaultBody in the endpoint definition to be a factory function (e.g., () => JSON.stringify({... , timestamp: new Date().toISOString(), ...}, null, 2)) so a fresh timestamp is created each time, and update the consumer to handle a function by calling it when present (use a typeof endpoint.defaultBody === "function" check and call endpoint.defaultBody(), otherwise use the value as-is) so the playground always shows a current timestamp.examples/iot-pipeline/node-worker/src/index.ts-25-32 (1)
25-32: ⚠️ Potential issue | 🟡 Minor

Reuse one generated alert ID for both persistence paths.
Line 25 can overwrite an earlier alert from the same sensor if two notifications land in the same millisecond. Generate one
`alertId` once and reuse it for both `state::set` and `stream::set` so the records stay correlated and collision-resistant.

🛠️ Suggested change
- await iii.call('state::set', { scope: 'alerts', key: sensor_id + '-' + Date.now(), value: alert })
+ const alertId = `${sensor_id ?? 'unknown'}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`
+
+ await iii.call('state::set', { scope: 'alerts', key: alertId, value: alert })
  await iii.call('stream::set', {
    stream_name: 'alerts',
    group_id: sensor_id,
-   item_id: `${sensor_id}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
+   item_id: alertId,
    data: { sensor_id, severity, message, z_score, timestamp: new Date().toISOString() },
  })

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/node-worker/src/index.ts` around lines 25 - 32, The current code calls iii.call('state::set', ...) and iii.call('stream::set', ...) with independently computed IDs (sensor_id + '-' + Date.now()) causing possible collisions; generate a single alertId (e.g., use a timestamp plus a random suffix) and a single timestamp value once, then reuse that alertId for the state key and the stream item_id (and reuse the timestamp variable for data.timestamp) so both persistence paths share the same ID and are collision-resistant; update the calls that reference the inline expressions for key and item_id to use the new alertId and timestamp variables instead.examples/iot-pipeline/frontend/src/features/realtime/components/alert-log.tsx-50-55 (1)
50-55: ⚠️ Potential issue | 🟡 Minor

Let the message cell shrink, and render all anomaly scores.
Line 50 will not ellipsize reliably in this flex row without
`min-w-0`, and Line 54 hides negative z-scores even though low-side anomalies are still relevant.

🛠️ Suggested change
- <span className="text-text-secondary truncate">{entry.message}</span>
+ <span className="min-w-0 flex-1 truncate text-text-secondary">{entry.message}</span>
  {entry.sensorId && (
    <span className="ml-auto text-text-muted whitespace-nowrap">{entry.sensorId}</span>
  )}
- {entry.zScore > 0 && (
+ {typeof entry.zScore === "number" && Number.isFinite(entry.zScore) && (
    <span className="text-text-muted whitespace-nowrap">z={entry.zScore.toFixed(1)}</span>
  )}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/realtime/components/alert-log.tsx` around lines 50 - 55, The message cell is not shrinking because the span containing entry.message needs the CSS utility min-w-0, and the z-score check currently hides negative anomalies; update the JSX in the component rendering entry.message and entry.zScore so the message span includes "min-w-0" (e.g., className="text-text-secondary truncate min-w-0") and change the z-score conditional from "entry.zScore > 0" to a presence check like "entry.zScore !== undefined && entry.zScore !== null" so all anomaly scores (including negative) are rendered as z={entry.zScore.toFixed(1)}.examples/iot-pipeline/frontend/src/features/graph/components/invocation-edge.tsx-4-12 (1)
4-12: ⚠️ Potential issue | 🟡 Minor

Forward `markerEnd` from `EdgeProps` to `BaseEdge`.

Custom edge components in `@xyflow/react` should accept and forward the `markerEnd` prop from `EdgeProps` to `BaseEdge`. Currently, the component hard-codes `MarkerType.ArrowClosed`, which bypasses the edge marker configuration defined on the graph. Add `markerEnd` to the destructured parameters and pass it through to `BaseEdge`.

Proposed fix

```diff
 export function InvocationEdge({
   id,
   sourceX,
   sourceY,
   targetX,
   targetY,
   sourcePosition,
   targetPosition,
+  markerEnd,
 }: EdgeProps) {
@@
   <BaseEdge
     path={edgePath}
-    markerEnd={MarkerType.ArrowClosed}
+    markerEnd={markerEnd}
     style={{
```

Also applies to: 39-42
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/graph/components/invocation-edge.tsx` around lines 4 - 12, The InvocationEdge component currently hard-codes the edge marker (MarkerType.ArrowClosed) instead of honoring the markerEnd from EdgeProps; update the function signature to destructure markerEnd from EdgeProps (alongside id, sourceX, sourceY, targetX, targetY, sourcePosition, targetPosition) and pass that markerEnd prop into the BaseEdge component instead of MarkerType.ArrowClosed; ensure any other similar custom edge occurrences in this file also forward markerEnd to BaseEdge.examples/iot-pipeline/frontend/src/features/realtime/stores/event.store.ts-53-57 (1)
53-57: ⚠️ Potential issue | 🟡 Minor

Hardcoded worker ID reduces flexibility.

The `anomaly_alert` handler always targets `"worker-python"`, which tightly couples the store to a specific topology. If the Python worker's ID changes or multiple workers can raise anomalies, this will break.

💡 Suggested improvement

Consider extracting the worker ID from the event payload if available:

```diff
 if (event.type === "anomaly_alert") {
   const nextNodes = new Map(state.flashingNodes);
-  nextNodes.set("worker-python", now + 2000);
+  const workerId = (event.payload.worker_id as string) ?? "worker-python";
+  nextNodes.set(workerId, now + 2000);
   return { flashingNodes: nextNodes };
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/realtime/stores/event.store.ts` around lines 53 - 57, The anomaly_alert branch hardcodes the node id "worker-python", making the store brittle; update the handler in the event processing (where event.type === "anomaly_alert") to read the target node id from the event payload (e.g., event.workerId or event.sourceId) and use that value when setting flashingNodes in the Map instead of the literal "worker-python", falling back to a sensible default if the payload lacks the id; keep the existing Map logic (nextNodes.set(..., now + 2000)) and ensure the symbol flashingNodes and the event.type check remain unchanged.examples/iot-pipeline/frontend/src/features/realtime/stores/event.store.ts-102-103 (1)
102-103: ⚠️ Potential issue | 🟡 Minor

Interval not cleaned up on module unload.

The `setInterval` runs indefinitely and is never cleared. In development with hot module replacement (HMR), this will stack intervals on each reload, causing memory leaks and redundant tick calls.

🔧 Suggested approach for HMR-safe cleanup

```diff
 // Periodic cleanup of expired animation entries
-setInterval(() => useEventStore.getState().tick(), 500);
+const tickInterval = setInterval(() => useEventStore.getState().tick(), 500);
+
+// Clean up on HMR (if supported by bundler)
+if (import.meta.hot) {
+  import.meta.hot.dispose(() => clearInterval(tickInterval));
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/realtime/stores/event.store.ts` around lines 102 - 103, The periodic setInterval call that invokes useEventStore.getState().tick() is never cleared, causing duplicate intervals during HMR; modify the code that creates the interval (the setInterval invoking useEventStore.getState().tick()) to capture the returned timer id, call clearInterval(timerId) when the module is disposed, and (optionally) clear any existing timer before creating a new one; use the module hot API (e.g., import.meta.hot?.accept()/dispose()) or equivalent to perform the cleanup so tick() isn't invoked multiple times after reloads.examples/iot-pipeline/frontend/src/features/realtime/types.ts-46-52 (1)
46-52: ⚠️ Potential issue | 🟡 Minor

Inconsistent `timestamp` type across payload interfaces.

`StreamAlertPayload.timestamp` is typed as `string`, while `StreamEventPayload.timestamp` (line 41) and `DashboardEvent.timestamp` (line 59) are typed as `number`. This inconsistency could cause runtime type mismatches when processing alerts.

🔧 Suggested fix for consistency

```diff
 export interface StreamAlertPayload {
   sensor_id: string
   severity: string
   message: string
   z_score: number
-  timestamp: string
+  timestamp: number
 }
```

If the backend actually sends a string timestamp for alerts, consider parsing it to a number at the boundary, or document the intentional difference.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/realtime/types.ts` around lines 46 - 52, StreamAlertPayload has timestamp: string while StreamEventPayload.timestamp and DashboardEvent.timestamp are number — make these consistent by changing StreamAlertPayload.timestamp to number (or, if the backend truly returns a string, keep the type but add parsing at the ingestion boundary). Update the StreamAlertPayload interface (symbol: StreamAlertPayload) to use number and ensure any code that constructs alerts parses string timestamps to numbers (or conversely, add a clear conversion in the alert ingestion/normalization code) so all consumers (e.g., code that reads StreamEventPayload and DashboardEvent) see a consistent numeric timestamp.examples/iot-pipeline/frontend/src/features/layout/components/sidebar-panel.tsx-173-179 (1)
173-179: ⚠️ Potential issue | 🟡 Minor

Keyboard interaction missing on resizable separator.

The separator has `tabIndex={0}` and `role="separator"`, making it focusable and accessible, but there's no `onKeyDown` handler to support keyboard-based resizing (e.g., using arrow keys). Screen reader users or keyboard-only users can focus this element but cannot interact with it.

♿ Proposed fix to add keyboard support

```diff
 <div
   role="separator"
   tabIndex={0}
   aria-orientation="vertical"
   onMouseDown={handleMouseDown}
+  onKeyDown={(e) => {
+    if (e.key === "ArrowLeft") setSidebarWidth(sidebarWidth - 10);
+    if (e.key === "ArrowRight") setSidebarWidth(sidebarWidth + 10);
+  }}
   className="absolute top-0 right-0 z-10 h-full w-1 cursor-col-resize transition-colors hover:bg-accent"
 />
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/layout/components/sidebar-panel.tsx` around lines 173 - 179, The separator div in sidebar-panel.tsx is focusable but lacks keyboard handlers; add an onKeyDown prop alongside onMouseDown to handle ArrowLeft/ArrowRight (and optionally ArrowUp/ArrowDown/Home/End) to adjust the panel width state used by the resizer, calling preventDefault and updating the same resize logic used by handleMouseDown/drag (or reuse a shared resize function), and ensure ARIA attributes (role="separator" and aria-orientation) expose current size via aria-valuenow/aria-valuetext if available; implement this in the component where handleMouseDown is defined (and any resize state updater function) so keyboard users can increment/decrement the width with consistent step values.
🧹 Nitpick comments (16)
examples/iot-pipeline/frontend/src/app/index.css (1)
33-49: Scrollbar styling is WebKit-only.

The `::-webkit-scrollbar` pseudo-elements only apply to WebKit/Blink browsers (Chrome, Safari, Edge). Firefox users will see the default browser scrollbar. If cross-browser consistency is desired, consider adding `scrollbar-width` and `scrollbar-color` properties for Firefox:

🎨 Optional: Add Firefox scrollbar support

```diff
 @layer base {
   body {
     background-color: var(--color-bg-primary);
     color: var(--color-text-primary);
     font-family: var(--font-mono);
     -webkit-font-smoothing: antialiased;
     -moz-osx-font-smoothing: grayscale;
+    scrollbar-width: thin;
+    scrollbar-color: var(--color-accent-dim) var(--color-bg-surface);
   }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/app/index.css` around lines 33 - 49, The current CSS only targets WebKit/Blink via ::-webkit-scrollbar/::-webkit-scrollbar-thumb rules, so add cross-browser rules for Firefox by including scrollbar-width and scrollbar-color declarations (e.g., on the root or the same selectors) to match var(--color-accent-dim) and var(--color-bg-surface) and set a thin width; keep the existing ::-webkit-* rules intact and ensure the variables used (var(--color-accent-dim), var(--color-accent), var(--color-bg-surface)) are applied consistently so Firefox users get equivalent thumb/track colors via scrollbar-color and a thin appearance via scrollbar-width.examples/iot-pipeline/frontend/vite.config.ts (1)
15-17: Consider deduplicating identical HTTP proxy targets.

Lines 15–17 repeat the same target; extracting a constant reduces drift risk in future edits.
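One way the deduplication could look, as a sketch; the route prefixes and backend address here are assumptions, not values confirmed by the PR:

```typescript
// Build the proxy map from one constant so every route shares the same target.
const BACKEND_URL = "http://localhost:8080"; // assumed backend address

const proxy = Object.fromEntries(
  ["/sensors", "/analytics", "/system"].map((route) => [
    route,
    { target: BACKEND_URL, changeOrigin: true },
  ]),
);

// In vite.config.ts this object would then be passed as
// defineConfig({ server: { proxy } }).
```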
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/vite.config.ts` around lines 15 - 17, The proxy entries in the Vite config repeat the same target URL; introduce a single constant (e.g., BACKEND_URL) near the top of the vite.config.ts and replace the repeated string values for the proxy mapping (the object that defines "/sensors", "/analytics", "/system") with that constant so all three routes reference the same target value and future updates only change the constant.examples/iot-pipeline/python-worker/src/analytics/anomaly.py (1)
99-108: Timestamp fallback uses `str(time.time())` which may cause ordering issues.

When `timestamp` is missing, the fallback `str(time.time())` produces a string like `"1741876543.123456"`. Since `item_id` is used in a deque for ordering and the ring buffer uses string keys, mixing user-provided timestamps (e.g., ISO format) with numeric strings could lead to inconsistent ordering or duplicate key collisions if two readings arrive within the same float precision window.

Consider using a more robust fallback:

🔧 Suggested improvement

```diff
-    timestamp = data.get("timestamp", str(time.time()))
+    import uuid
+    timestamp = data.get("timestamp") or f"{time.time():.6f}-{uuid.uuid4().hex[:8]}"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/analytics/anomaly.py` around lines 99 - 108, The fallback timestamp using str(time.time()) can create ordering/collision issues when mixed with other timestamp formats; change the fallback in the storage block that builds StreamSetInput (the timestamp variable used for item_id with ring_buffer.set) to a consistent, monotonic and unique format — e.g., use an integer millisecond epoch (int(time.time() * 1000)) or an ISO8601 UTC string with microsecond precision, convert it to a string only once for item_id, and if necessary append a short uniqueness token (e.g., sequence or uuid) to avoid duplicates for readings that arrive in the same millisecond.examples/iot-pipeline/python-worker/src/main.py (1)
77-90: Global mutable state for peer discovery works but could be encapsulated.

Using module-level globals (`discovered_ids`, `peers_ready`) with a `global` statement in the callback is functional but makes testing harder. For an example project this is acceptable, but consider encapsulating in a class for production use.

🤖 Prompt for AI Agents
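The class-based encapsulation suggested for main.py above might look like this sketch; the attribute and callback names mirror the comment, but the payload shape passed to `on_functions_available` is an assumption:

```python
# Sketch: hold peer-discovery state on an instance instead of module globals.
class PeerDiscovery:
    def __init__(self, expected_peers: set[str]) -> None:
        self.expected_peers = expected_peers
        self.discovered_ids: set[str] = set()
        self.peers_ready = False

    def on_functions_available(self, functions: list[dict]) -> None:
        # Record every advertised function id, then flip the ready flag
        # once all expected peers have been seen. No `global` needed.
        for fn in functions:
            self.discovered_ids.add(fn["function_id"])
        self.peers_ready = self.expected_peers.issubset(self.discovered_ids)
```

Tests can then instantiate a fresh `PeerDiscovery` per case instead of resetting module globals.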
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/main.py` around lines 77 - 90, Encapsulate the module-level mutable state by creating a PeerDiscovery class that holds discovered_ids and peers_ready as instance attributes and provides an on_functions_available(self, functions: list) method to replace the top-level on_functions_available; update any registration to pass the instance method instead of the standalone function. Specifically, move discovered_ids and peers_ready into PeerDiscovery, implement on_functions_available to check EXPECTED_PEERS (referenced in the method) and mutate self.discovered_ids / self.peers_ready, and remove the global statement and module globals so tests can instantiate PeerDiscovery for isolation.examples/iot-pipeline/python-worker/src/analytics/stats.py (1)
66-70: Consider batching executor calls for all-sensors aggregation.

Each sensor's stats computation spawns a separate `run_in_executor` call. For many sensors, this creates overhead from repeated thread pool scheduling. Consider computing all stats in a single executor call.

♻️ Optional batched approach

```diff
     sensors: list[dict[str, Any]] = []
+    all_values = [(gid, ring_buffer.get_values(gid)) for gid in groups]
+
+    def _compute_all() -> list[dict[str, Any]]:
+        return [
+            {**_compute_stats_sync(vals), "sensor_id": gid}
+            for gid, vals in all_values
+        ]
+
+    sensors = await loop.run_in_executor(None, _compute_all)
-    for gid in groups:
-        values = ring_buffer.get_values(gid)
-        sensor_stats = await loop.run_in_executor(None, _compute_stats_sync, values)
-        sensor_stats["sensor_id"] = gid
-        sensors.append(sensor_stats)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/analytics/stats.py` around lines 66 - 70, The current loop calls loop.run_in_executor for each gid which causes per-sensor executor scheduling overhead; instead gather all per-sensor value lists (using ring_buffer.get_values in the existing for gid in groups loop), then call loop.run_in_executor once with a new sync helper (e.g. _compute_stats_sync_all or a partial that maps _compute_stats_sync over the collected values) to compute stats for every sensor in a single background thread call, then iterate the returned aggregated results to set "sensor_id" and extend the sensors list; update references to loop.run_in_executor and _compute_stats_sync accordingly.examples/iot-pipeline/python-worker/src/streams/ring_buffer.py (1)
47-47: Consider renaming `input` parameter to avoid shadowing the builtin.

The `input` parameter shadows Python's built-in `input()` function. While this is unlikely to cause issues in this context (the builtin is rarely used in async code), renaming to `inp` or a more descriptive name like `get_input` would silence the linter and improve clarity.

This applies to all IStream methods (lines 47, 53, 79, 84, 92, 95).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/python-worker/src/streams/ring_buffer.py` at line 47, The parameter name "input" in async method get (and other IStream methods) shadows the built-in input(); rename the parameter to a non-conflicting name (e.g., inp or get_input) in the method signatures (e.g., async def get(self, inp: StreamGetInput) -> dict[str, Any] | None) and update all internal references and any matching method definitions in the IStream interface/implementations to use the new name so type hints and call sites remain consistent across get and the other IStream methods mentioned.examples/iot-pipeline/rust-worker/src/sensors/aggregate.rs (1)
60-66: Simplify the current_stats extraction.

Line 62 is unnecessarily convoluted. It checks if `new_value` is an object, then discards the object and returns a reference to `new_value`. This works because `update_stats_ops` handles missing keys with defaults, but the intent would be clearer with a simpler approach.

♻️ Suggested simplification

```diff
 // Step 1: Read current stats by sending an empty update
 let read_result = streams.update(&key, vec![]).await?;
-let current_stats = read_result.new_value.as_object().and_then(|_| Some(&read_result.new_value));
+let current_stats = read_result.new_value.as_object().map(|_| &read_result.new_value);
```

Or if the intent is to treat empty objects as "no prior stats":

```diff
 let read_result = streams.update(&key, vec![]).await?;
-let current_stats = read_result.new_value.as_object().and_then(|_| Some(&read_result.new_value));
+let current_stats = read_result
+    .new_value
+    .as_object()
+    .filter(|obj| !obj.is_empty())
+    .map(|_| &read_result.new_value);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/rust-worker/src/sensors/aggregate.rs` around lines 60 - 66, The current extraction of current_stats via read_result.new_value.as_object().and_then(|_| Some(&read_result.new_value)) is overcomplicated; simplify by directly using the new_value reference (e.g., set current_stats = Some(&read_result.new_value)) before passing to update_stats_ops, or if you intend to treat an empty object as "no prior stats", check if read_result.new_value.as_object().map(|o| o.is_empty()).unwrap_or(false) and set current_stats to None in that case so update_stats_ops receives either Some(&new_value) or None; update the code around read_result/current_stats and the streams.update call accordingly.examples/iot-pipeline/frontend/src/features/graph/components/language-icon.tsx (1)
81-90: Add a defensive fallback for unexpected `language` values.

At runtime, a malformed value can still reach this component and make it render nothing. Adding a default branch improves resilience (Line 82).

Suggested patch

```diff
 export function LanguageIcon({ language, className }: LanguageIconProps) {
   switch (language) {
     case "rust":
       return <RustIcon className={className} />;
     case "python":
       return <PythonIcon className={className} />;
     case "node":
       return <NodeIcon className={className} />;
+    default:
+      return <NodeIcon className={className} />;
   }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/graph/components/language-icon.tsx` around lines 81 - 90, LanguageIcon currently has no default branch so unexpected language values render nothing; update the LanguageIcon({ language, className }) switch to include a default case that returns a defensive fallback (e.g., an UnknownLanguageIcon or a simple <span> with the language label) and optionally logs a warning (console.warn or a logger) so malformed values are visible at runtime; ensure the fallback accepts className so styling remains consistent.examples/iot-pipeline/smoke.sh (2)
79-83: Preserve actual HTTP status codes in endpoint tests.

Using `-f` causes non-2xx responses to collapse into `000`, which removes useful failure diagnostics (Lines 79-83).

Suggested patch

```diff
-  http_code=$(curl -sf -o /dev/null -w "%{http_code}" \
+  http_code=$(curl -sS -o /dev/null --connect-timeout 2 --max-time 10 -w "%{http_code}" \
     -X POST -H "Content-Type: application/json" -d "$body" "$url" 2>/dev/null) || http_code="000"
 else
-  http_code=$(curl -sf -o /dev/null -w "%{http_code}" "$url" 2>/dev/null) || http_code="000"
+  http_code=$(curl -sS -o /dev/null --connect-timeout 2 --max-time 10 -w "%{http_code}" "$url" 2>/dev/null) || http_code="000"
 fi
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/smoke.sh` around lines 79 - 83, The current curl invocations that set http_code (the two commands constructing http_code in the script) use -f which makes curl exit non‑zero and the subsequent || http_code="000", losing real HTTP status codes; modify both curl calls to remove the -f flag (keep -s and -o /dev/null -w "%{http_code}" behavior) so that curl writes the actual HTTP status code into the http_code variable instead of collapsing failures to "000".
96-99: Add preflight checks for all required CLIs.

Currently only `iii` is validated. Fail fast for `curl`, `cargo`, `uv`, and `npx` too (Line 96).

Suggested patch

```diff
-if ! command -v iii >/dev/null 2>&1; then
-  echo "ERROR: 'iii' command not found. Install the iii engine first."
-  exit 1
-fi
+for cmd in iii curl cargo uv npx; do
+  if ! command -v "$cmd" >/dev/null 2>&1; then
+    echo "ERROR: '$cmd' command not found."
+    exit 1
+  fi
+done
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/smoke.sh` around lines 96 - 99, Add preflight CLI checks similar to the existing iii check: for each of curl, cargo, uv, and npx, verify availability using the same command -v ... >/dev/null 2>&1 pattern and exit with a clear error message if missing. Update the block that currently checks for "iii" so it includes separate checks for "curl", "cargo", "uv", and "npx" (same style as the iii check) and ensure each failure uses echo with the specific tool name and exit 1 to fail fast.examples/iot-pipeline/frontend/src/features/realtime/stores/alert.store.ts (1)
36-48: Consider extracting the ID generation helper.

Both `addAlert` and `addWorkerEvent` use the same ID generation pattern. A small helper would reduce duplication.

♻️ Optional: Extract ID helper

```diff
+const generateId = (prefix: string) =>
+  `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
+
 export const useAlertStore = create<AlertStore>()((set) => ({
   alerts: [],
   addAlert: (payload) =>
     set((state) => {
       const entry: AlertEntry = {
-        id: `alert-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
+        id: generateId("alert"),
         // ...
       };
       // ...
     }),
   addWorkerEvent: (payload) =>
     set((state) => {
       // ...
       const entry: AlertEntry = {
-        id: `worker-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
+        id: generateId("worker"),
         // ...
       };
       // ...
     }),
 }));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/realtime/stores/alert.store.ts` around lines 36 - 48, Both addAlert and addWorkerEvent duplicate the same ID pattern; extract that logic into a single helper (e.g., generateAlertId or makeAlertId) and call it from both functions. Move the new helper into the same module/scope as addAlert/addWorkerEvent, implement it to return the existing format `worker-${Date.now()}-${Math.random().toString(36).slice(2,8)}` (or `alert-` for generic alerts if preferred), then replace the inline ID expressions in addAlert and addWorkerEvent with calls to this helper to remove duplication.examples/iot-pipeline/frontend/src/features/layout/components/bottom-panel.tsx (1)
67-74: Separator lacks keyboard support.

The separator has `tabIndex={0}` making it focusable, but there's no `onKeyDown` handler. Keyboard users cannot resize the panel. Consider adding arrow key support or removing `tabIndex` if keyboard resize isn't intended.

⌨️ Optional: Add keyboard support

```diff
 <div
   role="separator"
   tabIndex={0}
   aria-orientation="horizontal"
+  aria-valuenow={bottomPanelHeight}
+  aria-valuemin={150}
+  aria-valuemax={500}
   onMouseDown={handleMouseDown}
+  onKeyDown={(e) => {
+    if (e.key === "ArrowUp") {
+      e.preventDefault();
+      setBottomPanelHeight(bottomPanelHeight + 10);
+    } else if (e.key === "ArrowDown") {
+      e.preventDefault();
+      setBottomPanelHeight(bottomPanelHeight - 10);
+    }
+  }}
   className="h-1 shrink-0 cursor-row-resize transition-colors hover:bg-accent"
 />
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/layout/components/bottom-panel.tsx` around lines 67 - 74, The separator div rendered in bottom-panel.tsx is focusable (tabIndex={0}) but lacks keyboard handlers; update it to either remove tabIndex if keyboard resizing isn't supported or add an onKeyDown that mirrors handleMouseDown's resize behavior: implement a handleKeyDown that listens for ArrowUp/ArrowDown (and optionally Home/End/PageUp/PageDown), prevents default, computes the new panel height (or calls the same resize helper used by handleMouseDown/resize logic) and updates state (e.g., setPanelHeight or invoke the existing resize function), and attach onKeyDown={handleKeyDown} to the separator so keyboard users can resize with arrow keys.examples/iot-pipeline/frontend/src/features/graph/components/architecture-graph.tsx (1)
3-3: Prefer an explicit event-type import here.

Line 38 relies on the ambient `React` namespace even though this file only imports `useCallback`. Importing `type MouseEvent` from `react` is more robust across TS configs and avoids depending on global namespace types.

♻️ Suggested change

```diff
-import { useCallback } from "react";
+import { type MouseEvent, useCallback } from "react";
 ...
-    (_: React.MouseEvent, node: Node) => {
+    (_: MouseEvent, node: Node) => {
       selectNode(node.id);
     },
```

Also applies to: 37-41
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/graph/components/architecture-graph.tsx` at line 3, The file currently imports only useCallback from React but relies on the ambient React namespace for event types; update the import to explicitly import the MouseEvent type (e.g. import { useCallback, type MouseEvent } from "react") and then annotate the handler signature(s) that use the React.MouseEvent (for example onNodeClick or any function at/around the current handler at line ~38) to use MouseEvent<HTMLDivElement> or the appropriate element generic; this ensures type imports are explicit and avoids depending on global React namespace types.examples/iot-pipeline/frontend/src/features/playground/components/api-playground.tsx (1)
6-9: Make the playground layout responsive.

A fixed `grid-cols-2` will squeeze both panes once the bottom panel is narrowed or viewed on a smaller screen. Consider collapsing to one column until a larger breakpoint.

♻️ Suggested change

```diff
-    <div className="grid h-full grid-cols-2 gap-2 p-2">
+    <div className="grid h-full grid-cols-1 gap-2 p-2 xl:grid-cols-2">
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/playground/components/api-playground.tsx` around lines 6 - 9, The two-pane playground currently uses a fixed grid (className "grid h-full grid-cols-2 gap-2 p-2") which squeezes panes on small screens; update the container to use responsive Tailwind classes (e.g., collapse to one column on small viewports and switch to two columns at a breakpoint) so RequestPanel and ResponsePanel stack vertically on narrow screens and sit side-by-side on larger screens; modify the container's className to something like "grid h-full grid-cols-1 md:grid-cols-2 gap-2 p-2" (or choose the breakpoint that fits your layout) and ensure any internal sizing or heights in RequestPanel/ResponsePanel still work when stacked.examples/iot-pipeline/frontend/src/features/graph/components/worker-node.tsx (1)
26-27: Consider optimizing store selector to reduce re-renders.

The component subscribes to the entire `flashingNodes` Map, which triggers a re-render whenever any node's flashing state changes. For a graph with many nodes, this could cause performance issues.

⚡ Proposed optimization using a targeted selector

```diff
-  const flashingNodes = useEventStore((s) => s.flashingNodes);
-  const isFlashing = flashingNodes.has(id);
+  const isFlashing = useEventStore((s) => s.flashingNodes.has(id));
```

Note: since `has()` returns a primitive boolean, Zustand's default strict-equality check works correctly here and no custom equality function is needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/graph/components/worker-node.tsx` around lines 26 - 27, The component currently subscribes to the entire flashingNodes Map which causes re-renders on any map change; change the selector passed to useEventStore to directly derive the boolean for this node (e.g., call useEventStore(s => s.flashingNodes.has(id))) and remove the separate flashingNodes variable so the component only subscribes to this node's flashing state (isFlashing) — since the selector returns a primitive boolean no special equality is required.examples/iot-pipeline/frontend/src/features/layout/components/sidebar-panel.tsx (1)
45-46: Potential duplicate function display across workers.

The filter `f.workerId === w.id || f.namespace === inferNamespace(w)` could match the same function under multiple workers if `workerId` differs but the inferred namespace matches. This might be intentional for fallback grouping, but could cause visual duplication in the sidebar.

Consider whether the fallback to namespace matching is necessary, or if `workerId` matching alone is sufficient:

```diff
 .filter((f) => f.workerId === w.id || f.namespace === inferNamespace(w))
+// Consider: .filter((f) => f.workerId === w.id)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/iot-pipeline/frontend/src/features/layout/components/sidebar-panel.tsx` around lines 45 - 46, The current filter on functions (functions.filter((f) => f.workerId === w.id || f.namespace === inferNamespace(w))) can yield duplicates across workers; change the logic to only include namespace matches when the function has no explicit workerId or to deduplicate after filtering. Specifically, update the filter to something like: keep functions where f.workerId === w.id OR (f.workerId is falsy AND f.namespace === inferNamespace(w)), or alternatively run the existing filter then remove duplicates by function id (e.g., using a Set keyed on f.id) so the same function isn't shown under multiple worker entries; use the same symbols (functions, f.workerId, w.id, f.namespace, inferNamespace) when locating code to modify.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: eac857bf-872f-44bd-9a2d-959021c41ff1
⛔ Files ignored due to path filters (19)
- `examples/iot-pipeline/frontend/pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
- `examples/iot-pipeline/frontend/public/favicon.svg` is excluded by `!**/*.svg`
- `examples/iot-pipeline/frontend/public/icons.svg` is excluded by `!**/*.svg`
- `examples/iot-pipeline/node-worker/package-lock.json` is excluded by `!**/package-lock.json`
- `examples/iot-pipeline/node-worker/pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
- `examples/iot-pipeline/python-worker/src/__pycache__/__init__.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/src/__pycache__/main.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/src/analytics/__pycache__/__init__.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/src/analytics/__pycache__/anomaly.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/src/analytics/__pycache__/batch.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/src/analytics/__pycache__/stats.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/src/streams/__pycache__/__init__.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/src/streams/__pycache__/ring_buffer.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/tests/__pycache__/__init__.cpython-314.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/tests/__pycache__/test_anomaly.cpython-314-pytest-9.0.2.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/tests/__pycache__/test_ring_buffer.cpython-314-pytest-9.0.2.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/tests/__pycache__/test_stats.cpython-314-pytest-9.0.2.pyc` is excluded by `!**/*.pyc`
- `examples/iot-pipeline/python-worker/uv.lock` is excluded by `!**/*.lock`
- `examples/iot-pipeline/rust-worker/Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (87)
- .gitignore
- examples/iot-pipeline/FUNCTION_IDS.md
- examples/iot-pipeline/README.md
- examples/iot-pipeline/frontend/.gitignore
- examples/iot-pipeline/frontend/biome.json
- examples/iot-pipeline/frontend/components.json
- examples/iot-pipeline/frontend/index.html
- examples/iot-pipeline/frontend/package.json
- examples/iot-pipeline/frontend/src/app/App.tsx
- examples/iot-pipeline/frontend/src/app/index.css
- examples/iot-pipeline/frontend/src/app/providers.tsx
- examples/iot-pipeline/frontend/src/features/graph/components/architecture-graph.tsx
- examples/iot-pipeline/frontend/src/features/graph/components/detail-panel.tsx
- examples/iot-pipeline/frontend/src/features/graph/components/function-node.tsx
- examples/iot-pipeline/frontend/src/features/graph/components/invocation-edge.tsx
- examples/iot-pipeline/frontend/src/features/graph/components/language-icon.tsx
- examples/iot-pipeline/frontend/src/features/graph/components/ownership-edge.tsx
- examples/iot-pipeline/frontend/src/features/graph/components/worker-node.tsx
- examples/iot-pipeline/frontend/src/features/graph/data/graph-topology.ts
- examples/iot-pipeline/frontend/src/features/graph/graph.store.ts
- examples/iot-pipeline/frontend/src/features/graph/hooks/use-graph-layout.ts
- examples/iot-pipeline/frontend/src/features/graph/hooks/use-live-graph.ts
- examples/iot-pipeline/frontend/src/features/layout/components/bottom-panel.tsx
- examples/iot-pipeline/frontend/src/features/layout/components/dashboard-layout.tsx
- examples/iot-pipeline/frontend/src/features/layout/components/graph-placeholder.tsx
- examples/iot-pipeline/frontend/src/features/layout/components/header-bar.tsx
- examples/iot-pipeline/frontend/src/features/layout/components/sidebar-panel.tsx
- examples/iot-pipeline/frontend/src/features/layout/stores/layout.store.ts
- examples/iot-pipeline/frontend/src/features/playground/components/api-playground.tsx
- examples/iot-pipeline/frontend/src/features/playground/components/endpoint-selector.tsx
- examples/iot-pipeline/frontend/src/features/playground/components/request-panel.tsx
- examples/iot-pipeline/frontend/src/features/playground/components/response-panel.tsx
- examples/iot-pipeline/frontend/src/features/playground/components/status-badge.tsx
- examples/iot-pipeline/frontend/src/features/playground/hooks/use-send-request.ts
- examples/iot-pipeline/frontend/src/features/playground/lib/endpoints.ts
- examples/iot-pipeline/frontend/src/features/playground/lib/theme.ts
- examples/iot-pipeline/frontend/src/features/playground/stores/playground.store.ts
- examples/iot-pipeline/frontend/src/features/realtime/components/alert-log.tsx
- examples/iot-pipeline/frontend/src/features/realtime/hooks/use-streams.ts
- examples/iot-pipeline/frontend/src/features/realtime/stores/alert.store.ts
- examples/iot-pipeline/frontend/src/features/realtime/stores/event.store.ts
- examples/iot-pipeline/frontend/src/features/realtime/types.ts
- examples/iot-pipeline/frontend/src/features/sensors/components/sensor-chart.tsx
- examples/iot-pipeline/frontend/src/features/sensors/components/sensor-skeleton.tsx
- examples/iot-pipeline/frontend/src/features/sensors/stores/sensor.store.ts
- examples/iot-pipeline/frontend/src/main.tsx
- examples/iot-pipeline/frontend/src/shared/api/client.ts
- examples/iot-pipeline/frontend/src/shared/api/queries.ts
- examples/iot-pipeline/frontend/src/shared/lib/utils.ts
- examples/iot-pipeline/frontend/src/shared/stores/connection.store.ts
- examples/iot-pipeline/frontend/src/shared/types/api.ts
- examples/iot-pipeline/frontend/src/shared/types/worker.ts
- examples/iot-pipeline/frontend/src/shared/ui/badge.tsx
- examples/iot-pipeline/frontend/src/shared/ui/button.tsx
- examples/iot-pipeline/frontend/src/shared/ui/tabs.tsx
- examples/iot-pipeline/frontend/src/vite-env.d.ts
- examples/iot-pipeline/frontend/tsconfig.app.json
- examples/iot-pipeline/frontend/tsconfig.json
- examples/iot-pipeline/frontend/tsconfig.node.json
- examples/iot-pipeline/frontend/vite.config.ts
- examples/iot-pipeline/node-worker/package.json
- examples/iot-pipeline/node-worker/src/iii.ts
- examples/iot-pipeline/node-worker/src/index.ts
- examples/iot-pipeline/node
-worker/src/triggers/threshold.tsexamples/iot-pipeline/node-worker/tsconfig.jsonexamples/iot-pipeline/python-worker/pyproject.tomlexamples/iot-pipeline/python-worker/src/__init__.pyexamples/iot-pipeline/python-worker/src/analytics/__init__.pyexamples/iot-pipeline/python-worker/src/analytics/anomaly.pyexamples/iot-pipeline/python-worker/src/analytics/batch.pyexamples/iot-pipeline/python-worker/src/analytics/stats.pyexamples/iot-pipeline/python-worker/src/main.pyexamples/iot-pipeline/python-worker/src/streams/__init__.pyexamples/iot-pipeline/python-worker/src/streams/ring_buffer.pyexamples/iot-pipeline/python-worker/tests/__init__.pyexamples/iot-pipeline/python-worker/tests/test_anomaly.pyexamples/iot-pipeline/python-worker/tests/test_ring_buffer.pyexamples/iot-pipeline/python-worker/tests/test_stats.pyexamples/iot-pipeline/rust-worker/Cargo.tomlexamples/iot-pipeline/rust-worker/src/main.rsexamples/iot-pipeline/rust-worker/src/sensors/aggregate.rsexamples/iot-pipeline/rust-worker/src/sensors/batch.rsexamples/iot-pipeline/rust-worker/src/sensors/data.rsexamples/iot-pipeline/rust-worker/src/sensors/mod.rsexamples/iot-pipeline/rust-worker/src/sensors/models.rsexamples/iot-pipeline/rust-worker/src/sensors/synthetic.rsexamples/iot-pipeline/smoke.sh
✅ Files skipped from review due to trivial changes (3)
- examples/iot-pipeline/frontend/src/vite-env.d.ts
- examples/iot-pipeline/frontend/.gitignore
- examples/iot-pipeline/frontend/components.json
🚧 Files skipped from review as they are similar to previous changes (9)
- examples/iot-pipeline/python-worker/tests/test_stats.py
- examples/iot-pipeline/node-worker/src/iii.ts
- examples/iot-pipeline/python-worker/tests/test_ring_buffer.py
- examples/iot-pipeline/rust-worker/src/sensors/batch.rs
- .gitignore
- examples/iot-pipeline/rust-worker/Cargo.toml
- examples/iot-pipeline/python-worker/pyproject.toml
- examples/iot-pipeline/FUNCTION_IDS.md
- examples/iot-pipeline/python-worker/src/streams/__init__.py
```diff
@@ -0,0 +1,27 @@
+{
+  "$schema": "https://biomejs.dev/schemas/1.9.0/schema.json",
```
🧩 Analysis chain
🌐 Web query:
What is the latest version of Biome linter?
💡 Result:
The latest Biome linter (Biome CLI) release is v2.4.6 (npm package @biomejs/biome@2.4.6), released March 5, 2026. [1]
Source:
[1] Biome GitHub releases page (biomejs/biome)
🏁 Script executed:
```shell
find . -name "biome.json" -type f
```

Repository: iii-hq/iii-examples
Length of output: 119
🌐 Web query:
Biome schema version 2.4.6 URL
💡 Result:
https://biomejs.dev/schemas/2.4.6/schema.json [1], [2], [3]
🏁 Script executed:
```shell
cat ./examples/iot-pipeline/frontend/biome.json
```

Repository: iii-hq/iii-examples
Length of output: 543
Update Biome schema to version 2.4.6.
The configuration references Biome schema version 1.9.0, but the latest version is 2.4.6 (released March 5, 2026). Update "$schema" to "https://biomejs.dev/schemas/2.4.6/schema.json" to align with the current Biome release.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/frontend/biome.json` at line 2, Update the "$schema"
value in the Biome configuration: replace the existing "$schema" string
("https://biomejs.dev/schemas/1.9.0/schema.json") with the new schema URL
"https://biomejs.dev/schemas/2.4.6/schema.json" so the Biome config references
version 2.4.6.
```tsx
import { ENDPOINTS } from "../lib/endpoints";
import { usePlaygroundStore } from "../stores/playground.store";

export function EndpointSelector() {
  const selectedEndpointId = usePlaygroundStore((s) => s.selectedEndpointId);
  const setSelectedEndpoint = usePlaygroundStore((s) => s.setSelectedEndpoint);

  const selected = ENDPOINTS.find((e) => e.id === selectedEndpointId);

  return (
    <div className="flex flex-col gap-1">
      <select
        value={selectedEndpointId}
        onChange={(e) => setSelectedEndpoint(e.target.value)}
        className="w-full rounded border border-border-default bg-bg-elevated px-2 py-1.5 text-xs font-mono text-text-primary outline-none focus:border-accent"
      >
        {ENDPOINTS.map((ep) => (
          <option key={ep.id} value={ep.id}>
            {ep.label}
          </option>
        ))}
      </select>
      {selected && <span className="text-[10px] text-text-muted">{selected.description}</span>}
```
Add a built-in label hook for this select.
As written, this component renders an unlabeled <select> unless every caller wraps it in a <label>, and the helper text is not associated with the control. Giving it its own id/label/aria-describedby makes it accessible by default.
Suggested fix
```diff
+import { useId } from "react";
 import { ENDPOINTS } from "../lib/endpoints";
 import { usePlaygroundStore } from "../stores/playground.store";

 export function EndpointSelector() {
+  const selectId = useId();
   const selectedEndpointId = usePlaygroundStore((s) => s.selectedEndpointId);
   const setSelectedEndpoint = usePlaygroundStore((s) => s.setSelectedEndpoint);

   const selected = ENDPOINTS.find((e) => e.id === selectedEndpointId);

   return (
     <div className="flex flex-col gap-1">
+      <label htmlFor={selectId} className="sr-only">
+        API endpoint
+      </label>
       <select
+        id={selectId}
+        aria-describedby={selected ? `${selectId}-description` : undefined}
         value={selectedEndpointId}
         onChange={(e) => setSelectedEndpoint(e.target.value)}
         className="w-full rounded border border-border-default bg-bg-elevated px-2 py-1.5 text-xs font-mono text-text-primary outline-none focus:border-accent"
       >
         {ENDPOINTS.map((ep) => (
@@
-      {selected && <span className="text-[10px] text-text-muted">{selected.description}</span>}
+      {selected && (
+        <span id={`${selectId}-description`} className="text-[10px] text-text-muted">
+          {selected.description}
+        </span>
+      )}
     </div>
   );
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@examples/iot-pipeline/frontend/src/features/playground/components/endpoint-selector.tsx`
around lines 1 - 23, The EndpointSelector currently renders an unlabeled select
and unassociated helper text; update the EndpointSelector component to generate
a stable id (e.g., via React's useId or a memoized unique string), add a <label>
element tied to that id for the select (use the existing ENDPOINTS label or a
fixed "Endpoint" label), set the select's id to that generated id, and set the
helper span's id and reference it from the select using aria-describedby so the
helper text is programmatically associated; keep using selectedEndpointId and
setSelectedEndpoint unchanged for value/onChange.
```tsx
{endpoint.pathParams.map((param) => (
  <div key={param.name} className="flex items-center gap-2">
    <span className="text-xs font-mono text-text-muted">:{param.name}</span>
    <input
      type="text"
      value={params[param.name] ?? param.default}
      onChange={(e) => setPathParamValue(endpoint.id, param.name, e.target.value)}
      placeholder={param.placeholder}
      className="flex-1 rounded border border-border-default bg-bg-elevated px-2 py-1 text-xs font-mono text-text-primary outline-none focus:border-accent"
    />
```
Programmatically label the path-param inputs.
The leading span is visual only, so assistive tech sees an unlabeled textbox. On endpoints with multiple params that makes the playground hard to operate.
Proposed fix
```diff
-            <div key={param.name} className="flex items-center gap-2">
+            <label key={param.name} className="flex items-center gap-2">
               <span className="text-xs font-mono text-text-muted">:{param.name}</span>
               <input
+                aria-label={param.name}
                 type="text"
                 value={params[param.name] ?? param.default}
                 onChange={(e) => setPathParamValue(endpoint.id, param.name, e.target.value)}
                 placeholder={param.placeholder}
                 className="flex-1 rounded border border-border-default bg-bg-elevated px-2 py-1 text-xs font-mono text-text-primary outline-none focus:border-accent"
               />
-            </div>
+            </label>
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@examples/iot-pipeline/frontend/src/features/playground/components/request-panel.tsx`
around lines 38 - 47, The span before each path-param input is visual-only,
leaving the input unlabeled for assistive tech; update the mapping over
endpoint.pathParams to provide an accessible label by either turning the visual
span into a <label> tied to the input via a unique id (e.g., id derived from
endpoint.id and param.name) or by adding an aria-label/aria-labelledby on the
input (e.g., aria-label={`Path parameter ${param.name}`}), and ensure the span
is marked aria-hidden if you keep it visual-only; update the input attributes
used with params and setPathParamValue accordingly so screen readers correctly
announce each field.
```tsx
if (!response) return null;

const formatted = response.data ? JSON.stringify(response.data, null, 2) : "";
```
Don't hide falsy JSON bodies.
Line 31 uses a truthiness check before JSON.stringify(), so valid payloads like 0, false, null, or "" render as blank output. Check only for undefined here.
Suggested fix
```diff
-  const formatted = response.data ? JSON.stringify(response.data, null, 2) : "";
+  const formatted =
+    response.data === undefined ? "" : JSON.stringify(response.data, null, 2);
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@examples/iot-pipeline/frontend/src/features/playground/components/response-panel.tsx`
at line 31, The current conditional for building formatted incorrectly treats
falsy but valid JSON bodies (0, false, null, "") as absent; update the check in
the response formatting logic so it only treats undefined as missing—replace the
truthiness guard around response.data used to create formatted (the const
formatted assignment) with an explicit undefined check (response.data !==
undefined) so valid falsy values are JSON.stringified and rendered.
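To make the failure mode concrete, here is a small standalone sketch. The `formatTruthy`/`formatStrict` helper names are illustrative, not part of the PR; the bodies mirror the flagged code and the suggested replacement.

```typescript
// Sketch: two ways to format a response body for display.
// formatTruthy mirrors the flagged truthiness guard;
// formatStrict is the suggested explicit undefined check.
function formatTruthy(data: unknown): string {
  return data ? JSON.stringify(data, null, 2) : "";
}

function formatStrict(data: unknown): string {
  return data === undefined ? "" : JSON.stringify(data, null, 2);
}

// Valid falsy JSON bodies disappear under the truthiness guard:
console.log(formatTruthy(0)); // ""
console.log(formatStrict(0)); // "0"
console.log(formatStrict(false)); // "false"
console.log(formatStrict(undefined)); // ""
```

Only `undefined` means "no body" here; `0`, `false`, `null`, and `""` are all legitimate JSON payloads that should render.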
```ts
function resolvePath(endpoint: EndpointConfig, pathParamValues: Record<string, string>): string {
  let resolvedPath = endpoint.path;
  for (const param of endpoint.pathParams ?? []) {
    resolvedPath = resolvedPath.replace(
      `:${param.name}`,
      pathParamValues[param.name] ?? param.default,
    );
  }
  return resolvedPath;
```
Validate and encode path params before substitution.
If a required param is missing, replace() inserts the literal "undefined". Raw values also let /, ?, and # change the request path. Fail fast on missing params and wrap each segment with encodeURIComponent.
Proposed fix
```diff
 function resolvePath(endpoint: EndpointConfig, pathParamValues: Record<string, string>): string {
   let resolvedPath = endpoint.path;
   for (const param of endpoint.pathParams ?? []) {
+    const value = pathParamValues[param.name] ?? param.default;
+    if (value == null) {
+      throw new Error(`Missing path param: ${param.name}`);
+    }
     resolvedPath = resolvedPath.replace(
       `:${param.name}`,
-      pathParamValues[param.name] ?? param.default,
+      encodeURIComponent(value),
     );
   }
   return resolvedPath;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@examples/iot-pipeline/frontend/src/features/playground/hooks/use-send-request.ts`
around lines 6 - 14, The resolvePath function currently substitutes path params
directly which can produce the literal "undefined" when required values are
missing and allows unsafe characters into the URL; update resolvePath to
validate each param in endpoint.pathParams against pathParamValues and
param.default, throwing an error when a required param is missing, and when
substituting wrap the value with encodeURIComponent (use the chosen value =
pathParamValues[param.name] ?? param.default, ensure it's not undefined/null,
then encodeURIComponent(value)) so segments are safely encoded before calling
replace.
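As a self-contained illustration of the validated, encoded substitution, the sketch below uses minimal stand-in types (the `PathParam`/`EndpointConfig` shapes are assumptions, not the playground's real definitions):

```typescript
// Minimal stand-ins for the playground types (assumed shapes).
type PathParam = { name: string; default?: string };
type EndpointConfig = { path: string; pathParams?: PathParam[] };

function resolvePath(endpoint: EndpointConfig, values: Record<string, string>): string {
  let resolved = endpoint.path;
  for (const param of endpoint.pathParams ?? []) {
    const value = values[param.name] ?? param.default;
    if (value == null) {
      // Fail fast instead of substituting the literal "undefined".
      throw new Error(`Missing path param: ${param.name}`);
    }
    // Encoding keeps "/", "?", and "#" from rewriting the request path.
    resolved = resolved.replace(`:${param.name}`, encodeURIComponent(value));
  }
  return resolved;
}

const ep: EndpointConfig = { path: "/sensors/:id", pathParams: [{ name: "id" }] };
console.log(resolvePath(ep, { id: "temp/1?x" })); // "/sensors/temp%2F1%3Fx"
```

Note how a value containing `/` or `?` stays a single path segment after encoding, and a missing required param throws rather than producing a broken URL.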
```ts
workers: [...byRuntime.values()].map((w) => {
  const lang = normalizeLanguage(w.runtime);
  return {
    id: w.id,
    name: WORKER_DISPLAY_NAMES[lang],
    language: lang,
    status: (w.status === "connected" ? "connected" : "disconnected") as "connected" | "disconnected",
    functionCount: w.function_count,
  };
```
Keep WorkerInfo.id aligned with the graph node ids.
The graph/selectors in this PR use stable ids like worker-rust, but this transform returns raw engine ids. Once /system/workers resolves, DetailPanel can no longer join the selected graph node back to live worker data and falls back to stale static status/function counts. Either emit the stable UI id here or carry both ids and make downstream code use the same key consistently.
One workable fix
```diff
 return {
-  id: w.id,
+  id: `worker-${lang}`,
   name: WORKER_DISPLAY_NAMES[lang],
   language: lang,
   status: (w.status === "connected" ? "connected" : "disconnected") as "connected" | "disconnected",
   functionCount: w.function_count,
 };
```

If another screen still needs the engine id, keep `w.id` in a separate field instead of overloading `id`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/frontend/src/shared/api/queries.ts` around lines 47 -
55, The transformed workers list in queries.ts returns raw engine ids (w.id)
which don't match the graph's stable UI ids (e.g., "worker-rust"), breaking
DetailPanel joins; in the workers mapping (the array built with
normalizeLanguage and WORKER_DISPLAY_NAMES) set WorkerInfo.id to the stable UI
id used by the graph (for example construct "worker-${lang}" or use the graph's
id generator) and preserve the engine id by adding a separate field (e.g.,
engineId: w.id) so downstream code can use the consistent UI key while other
screens can still access the original engine id; update any consumers to read
the stable id (id) for joining and engineId where needed.
```ts
functions: appFunctions.map((f) => {
  const ns = f.function_id.split("::")[0];
  return {
    id: f.function_id,
    name: f.function_id,
    namespace: ns,
    description: f.description ?? "",
    workerId: "",
  };
```
Don’t drop the worker/function relationship during normalization.
Returning workerId: "" breaks every downstream join. In this PR that means worker detail panels lose their child-function list and function ownership metadata as soon as /system/functions resolves. Derive the worker id from the namespace mapping here, or expose it from the raw API response.
Proposed fix
```diff
 return {
   id: f.function_id,
   name: f.function_id,
   namespace: ns,
   description: f.description ?? "",
-  workerId: "",
+  workerId:
+    ns === "sensors"
+      ? "worker-rust"
+      : ns === "analytics"
+        ? "worker-python"
+        : "worker-node",
 };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/frontend/src/shared/api/queries.ts` around lines 66 -
74, The normalization currently sets workerId to an empty string in the
functions mapping, breaking joins; update the mapping in the appFunctions
processing so workerId is derived from the namespace-to-worker mapping (or from
the raw response if available) instead of "", e.g., compute ns =
f.function_id.split("::")[0] then look up the corresponding worker id from the
existing namespace map and assign it to workerId on the returned object in the
functions array (ensure the mapping used by appFunctions is the same namespace
mapping used elsewhere so worker detail panels can join by workerId).
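A shared namespace-to-worker lookup keeps the workers and functions transforms on one join key. This is a sketch; the mapping reflects this example's three workers (sensors → Rust, analytics → Python, everything else → Node), not a general rule:

```typescript
// Sketch: one namespace -> stable UI worker id mapping, shared by the
// workers and functions transforms so both sides join on the same key.
const NAMESPACE_TO_WORKER: Record<string, string> = {
  sensors: "worker-rust",
  analytics: "worker-python",
  api: "worker-node",
};

function workerIdFor(functionId: string): string {
  const ns = functionId.split("::")[0];
  // Fall back to the gateway worker for unknown namespaces.
  return NAMESPACE_TO_WORKER[ns] ?? "worker-node";
}

console.log(workerIdFor("sensors::data::ingest")); // "worker-rust"
console.log(workerIdFor("analytics::batch::process")); // "worker-python"
```

Centralizing the table means a renamed namespace only needs updating in one place instead of in every transform.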
| "compilerOptions": { | ||
| "baseUrl": ".", | ||
| "paths": { | ||
| "@/*": ["./src/*"] | ||
| } | ||
| } |
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
🌐 Web query:
In TypeScript solution-style projects that use "references", do compilerOptions in the root tsconfig automatically apply to referenced tsconfig files, or only when referenced files use "extends" from a shared base config?
💡 Result:
In a solution-style TypeScript setup, the root tsconfig.json that only lists "references" does not “push down” its compilerOptions into the referenced projects. Each referenced project is compiled using its own tsconfig and compilerOptions. (typescriptlang.org)
To share compilerOptions across referenced projects, you typically put the common options in a base config and have each project tsconfig.json use "extends" to inherit them. (typescriptlang.org)
Citations:
- 1: https://www.typescriptlang.org/docs/handbook/project-references
- 2: https://www.typescriptlang.org/docs/handbook/project-references
🏁 Script executed:
```shell
#!/bin/bash
# Explore the project structure around the file in question
cd examples/iot-pipeline
# Check directory structure
echo "=== Directory structure ==="
ls -la
# Look for all tsconfig.json files
echo -e "\n=== All tsconfig.json files in iot-pipeline ==="
find . -name "tsconfig*.json" -type f
# Check if there's a root tsconfig with references
echo -e "\n=== Root tsconfig.json (if exists) ==="
if [ -f "tsconfig.json" ]; then
  cat -n tsconfig.json
else
  echo "No tsconfig.json in iot-pipeline root"
fi
# Check parent directory for root tsconfig
echo -e "\n=== Parent directory structure ==="
cd ..
ls -la | head -20
echo -e "\n=== Root tsconfig.json (if exists) ==="
if [ -f "tsconfig.json" ]; then
  cat -n tsconfig.json
else
  echo "No tsconfig.json in parent"
fi
```

Repository: iii-hq/iii-examples
Length of output: 1431
🏁 Script executed:
```shell
#!/bin/bash
# Examine the tsconfig files in detail
echo "=== frontend/tsconfig.json (the file under review) ==="
cat -n examples/iot-pipeline/frontend/tsconfig.json
echo -e "\n=== frontend/tsconfig.app.json ==="
cat -n examples/iot-pipeline/frontend/tsconfig.app.json
echo -e "\n=== frontend/tsconfig.node.json ==="
cat -n examples/iot-pipeline/frontend/tsconfig.node.json
echo -e "\n=== node-worker/tsconfig.json ==="
cat -n examples/iot-pipeline/node-worker/tsconfig.json
```

Repository: iii-hq/iii-examples
Length of output: 2717
Extract shared path alias into tsconfig.base.json and extend from both referenced configs.
In solution-style TypeScript projects with "references", the root compilerOptions do not automatically apply to referenced projects. Currently, baseUrl and the @/* path alias are defined in both tsconfig.json (root) and tsconfig.app.json, creating maintenance overhead and drift. Additionally, tsconfig.node.json lacks these definitions entirely, causing inconsistency.
Create a shared tsconfig.base.json with the path configuration and have tsconfig.app.json and tsconfig.node.json extend it:
{
"compilerOptions": {
"baseUrl": ".",
"paths": {
"@/*": ["./src/*"]
}
}
}Then in each referenced config, use "extends": "./tsconfig.base.json" at the top level.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/frontend/tsconfig.json` around lines 4 - 9, Move the
shared "baseUrl" and "@/*" path alias into a new tsconfig.base.json containing
the compilerOptions block shown in the comment, then update tsconfig.app.json
and tsconfig.node.json to add "extends": "./tsconfig.base.json" at the top and
remove duplicated baseUrl/paths from those files (leave other options intact);
keep the existing root tsconfig.json as-is or point it to extend the base if you
want a single source of truth.
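For instance, a referenced config would then begin with the `extends` line and drop its duplicated alias block. This is a sketch of the shape only; the project's existing compiler options in each file stay as they are:

```jsonc
// tsconfig.app.json (sketch — existing compilerOptions remain, minus baseUrl/paths)
{
  "extends": "./tsconfig.base.json",
  "include": ["src"]
}
```

With this layout, changing the `@/*` alias later is a one-file edit instead of keeping two configs in sync.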
```ts
plugins: [react(), tailwindcss()],
resolve: {
  alias: {
    "@": resolve(__dirname, "./src"),
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
pkg="$(fd -p 'package.json' examples/iot-pipeline/frontend | head -n1)"
echo "Detected manifest: $pkg"
jq -r '.type // "(not set)"' "$pkg"
echo "Checking __dirname usage in Vite config:"
rg -n '\b__dirname\b' examples/iot-pipeline/frontend/vite.config.ts
```

Repository: iii-hq/iii-examples
Length of output: 216
Replace __dirname with an ESM-safe path helper.
Line 10 uses __dirname, which is undefined in ES Module projects (this package uses "type": "module"). This will cause a ReferenceError and break Vite startup.
Proposed fix
```diff
-import { resolve } from "node:path";
+import { fileURLToPath, URL } from "node:url";
 import tailwindcss from "@tailwindcss/vite";
 import react from "@vitejs/plugin-react";
 import { defineConfig } from "vite";
@@
   alias: {
-    "@": resolve(__dirname, "./src"),
+    "@": fileURLToPath(new URL("./src", import.meta.url)),
   },
```

📝 Committable suggestion

```ts
import { fileURLToPath, URL } from "node:url";
import tailwindcss from "@tailwindcss/vite";
import react from "@vitejs/plugin-react";
import { defineConfig } from "vite";

export default defineConfig({
  plugins: [react(), tailwindcss()],
  resolve: {
    alias: {
      "@": fileURLToPath(new URL("./src", import.meta.url)),
    },
  },
  server: {
    proxy: {
      "/api": "http://localhost:3111",
      "/auth": "http://localhost:3111",
      "/data": "http://localhost:3111",
      "/streams": {
        target: "ws://localhost:3112",
        ws: true,
        rewriteWsOrigin: true,
        pathRewrite: { "^/streams": "" },
      },
    },
  },
});
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/frontend/vite.config.ts` at line 10, The alias uses
__dirname (undefined in ESM); replace it with an ESM-safe path computed from
import.meta.url. Import or use fileURLToPath and dirname to derive a directory
(e.g. dirname(fileURLToPath(import.meta.url))) and pass that into resolve
instead of __dirname so the alias entry (the resolve(...) call in
vite.config.ts) works in a "type: module" project.
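The underlying pattern is not Vite-specific. A minimal sketch of deriving a directory path in any ES module, where `__dirname` is undefined:

```typescript
// ESM replacement for __dirname: derive filesystem paths from import.meta.url.
import { fileURLToPath } from "node:url";
import { dirname, join } from "node:path";

const here = dirname(fileURLToPath(import.meta.url)); // directory of this module
const srcDir = join(here, "src"); // equivalent of resolve(__dirname, "./src")
console.log(srcDir.endsWith("src")); // true
```

`fileURLToPath` converts the module's `file://` URL into a platform-correct filesystem path, which is why it is preferred over manually stripping the URL scheme.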
```ts
iii.call('state::set', {
  scope: 'worker_metadata',
  key: 'iot-api-gateway',
  value: {
    name: 'iot-api-gateway',
    version: '0.1.0',
    started_at: new Date().toISOString(),
    peers_ready_at: new Date().toISOString(),
    peers: [...discoveredIds],
  },
}).catch(() => {}) // best-effort update
```
🧩 Analysis chain
🏁 Script executed:
```shell
cat -n examples/iot-pipeline/node-worker/src/index.ts | sed -n '210,290p'
```

Repository: iii-hq/iii-examples
Length of output: 3408
🏁 Script executed:
```shell
cd examples/iot-pipeline/node-worker && rg "state::set" -B 2 -A 8 --type ts
```

Repository: iii-hq/iii-examples
Length of output: 2267
🏁 Script executed:
```shell
cd examples/iot-pipeline && find . -name "*.ts" -o -name "*.js" -o -name "*.md" | head -20
```

Repository: iii-hq/iii-examples
Length of output: 995
🏁 Script executed:
```shell
cd examples/iot-pipeline && rg "state::set" --type ts --type md -B 3 -A 3 | head -80
```

Repository: iii-hq/iii-examples
Length of output: 1775
Consolidate the two worker_metadata writes to avoid data inconsistency.
Lines 219-229 and 264-279 both write worker_metadata/iot-api-gateway with different fields. The started_at field is recalculated separately in each call (lines 225 and 270), causing the final state to record the peer-ready timestamp instead of the actual worker startup time. Additionally, if state::set replaces the value, the second call loses the endpoints field; if it merges, the started_at value still gets overwritten.
🛠️ Suggested direction
```diff
+const startedAt = new Date().toISOString()
+const endpoints = [
+  'POST /sensors/ingest',
+  'GET /sensors/:id',
+  'GET /analytics/summary',
+  'GET /system/workers',
+  'GET /system/functions',
+] as const
+
+let metadataWrite = Promise.resolve()
+function persistWorkerMetadata(extra: Record<string, unknown> = {}) {
+  metadataWrite = metadataWrite.then(() =>
+    iii.call('state::set', {
+      scope: 'worker_metadata',
+      key: 'iot-api-gateway',
+      value: {
+        name: 'iot-api-gateway',
+        version: '0.1.0',
+        started_at: startedAt,
+        endpoints: [...endpoints],
+        ...extra,
+      },
+    }),
+  )
+  return metadataWrite
+}
+
 ...
-    iii.call('state::set', {
-      scope: 'worker_metadata',
-      key: 'iot-api-gateway',
-      value: {
-        name: 'iot-api-gateway',
-        version: '0.1.0',
-        started_at: new Date().toISOString(),
-        peers_ready_at: new Date().toISOString(),
-        peers: [...discoveredIds],
-      },
-    }).catch(() => {}) // best-effort update
+    persistWorkerMetadata({
+      peers_ready_at: new Date().toISOString(),
+      peers: [...discoveredIds],
+    }).catch(() => {}) // best-effort update
   }
 })
 ...
-iii.call('state::set', {
-  scope: 'worker_metadata',
-  key: 'iot-api-gateway',
-  value: {
-    name: 'iot-api-gateway',
-    version: '0.1.0',
-    started_at: new Date().toISOString(),
-    endpoints: [
-      'POST /sensors/ingest',
-      'GET /sensors/:id',
-      'GET /analytics/summary',
-      'GET /system/workers',
-      'GET /system/functions',
-    ],
-  },
-}).catch((err: Error) => logger.warn('Failed to store worker metadata', { error: err.message }))
+persistWorkerMetadata().catch((err: Error) =>
+  logger.warn('Failed to store worker metadata', { error: err.message }),
+)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/iot-pipeline/node-worker/src/index.ts` around lines 219 - 229, The
two separate iii.call('state::set') calls that write worker_metadata with key
'iot-api-gateway' are overwriting each other and recomputing started_at; compute
started_at once at process start and reuse that constant, then consolidate the
writes by either building a single metadata object containing name, version,
started_at, peers_ready_at, peers and endpoints (use the existing endpoints
value or merge fetched state) and call iii.call('state::set', { scope:
'worker_metadata', key: 'iot-api-gateway', value: metadata }). If the state API
supports partial updates, use a merge/update call instead of replacing;
otherwise read the current value with iii.call('state::get') and merge endpoints
into the single object before calling iii.call('state::set') so started_at is
never overwritten.
Summary

- `examples/iot-pipeline` workflow demonstrating multi-language workers in Node.js, Python, and Rust.
- `README.md`, `FUNCTION_IDS.md`, and a `smoke.sh` script for end-to-end validation.

Test plan

- `examples/iot-pipeline/smoke.sh` with valid environment variables.
- `examples/iot-pipeline/python-worker`.
examples/iot-pipeline/smoke.shwith valid environment variables.examples/iot-pipeline/python-worker.Summary by CodeRabbit