Add polyglot-coordination example with Python, Node.js, and Rust#4
Add polyglot-coordination example with Python, Node.js, and Rust#4
Conversation
This example demonstrates iii-engine coordinating services across three languages using different transport mechanisms: - Python analytics service via stdin/stdout IPC (NO HTTP endpoints) - Rust fake Stripe server via HTTP (Axum) - Node.js user service and orchestration workflow Key features: - Transport abstraction: all bridge.invokeFunction() calls look identical - SaaS user onboarding workflow coordinating all services - Input validation and sanitization for Stripe IDs - Max pending requests limit for Python IPC - Race condition fixes in Rust shared state - Graceful shutdown handlers for all workers - OpenTelemetry integration with trace propagation The example proves that iii abstracts away transport mechanisms entirely.
📝 WalkthroughWalkthroughAdds a new polyglot coordination example that wires Node.js workers, a Rust HTTP service, and a Python stdin/stdout analytics process via an iii-engine Bridge, including IPC utilities, types, worker bridges, a Rust service, a Python service, orchestration workflow, configs, and documentation. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Workflow as Onboarding Workflow
participant UserSvc as User Service (Node)
participant StripeSvc as Stripe Service (Rust)
participant Analytics as Analytics Service (Python)
Client->>Workflow: POST /onboard (email,name,plan)
Workflow->>UserSvc: users.create(...)
UserSvc-->>Workflow: User
Workflow->>StripeSvc: POST /v1/customers
StripeSvc-->>Workflow: StripeCustomer
Workflow->>StripeSvc: POST /v1/subscriptions
StripeSvc-->>Workflow: StripeSubscription
Workflow->>Analytics: analytics.score (via PythonIPC)
Analytics-->>Workflow: AnalyticsResult
Workflow->>UserSvc: users.update(...stripe/analytics...)
UserSvc-->>Workflow: Updated User
Workflow-->>Client: 201 OnboardingResult (user,customer,subscription,analytics)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@examples/polyglot-coordination/services/python-analytics/analytics.py`:
- Around line 127-130: The except block can raise NameError because request may
be undefined if json.loads(line) fails; update the error handling around
json.loads(line) (where request = json.loads(line) is invoked) to capture the
request id safely by precomputing a fallback req_id (e.g., default 0) before the
try or by using a local variable req_id set after successful parsing and
referenced in both except handlers; ensure the json.JSONDecodeError handler
still returns id 0 while the generic Exception handler uses the safe req_id
value instead of directly accessing request.get('id', 0).
In `@examples/polyglot-coordination/workers/data-requester.ts`:
- Around line 7-8: Remove the redundant await by deleting the extra await
python.waitReady() call that occurs immediately after await python.start(),
since PythonIPC.start() already awaits the initial ping and sets the ready
state; locate references to PythonIPC.start and the python.waitReady invocation
in this file and remove the latter. Also make the PYTHON_SCRIPT resolution more
robust by replacing the __dirname + relative path approach (the PYTHON_SCRIPT
and __dirname symbols) with a resolution that uses process.cwd() or a
configurable environment variable (e.g., PYTHON_ANALYTICS_PATH) so the script
path won’t break when compiled into a dist/ output layout.
In `@examples/polyglot-coordination/workflow/onboarding.ts`:
- Around line 115-120: The code passes full User objects from
invoke('users.list') into invoke('analytics.onboardingMetrics'), exposing PII;
instead, map the users array to a minimized payload containing only the fields
needed for metrics (e.g., id, plan, riskScore or whatever OnboardingMetrics
expects) and send that trimmed array to invoke('analytics.onboardingMetrics'),
updating any type annotations (OnboardingMetrics or the invoke generic)
accordingly so only the reduced user shape is transmitted from the onboarding
function.
- Around line 16-21: The invoke helper (async function invoke) is logging raw
input and result via ctx.logger.info which may contain PII; update invoke to
avoid logging full payloads by redacting sensitive fields (email, name, ssn,
etc.) or by extracting and logging only safe identifiers (e.g., userId, traceId)
before calling ctx.logger.info for both the request and response, and ensure any
helper used for redaction is applied to both the input and result (reference
invoke, ctx.logger.info, and bridge.invokeFunction).
🧹 Nitpick comments (8)
examples/polyglot-coordination/lib/python-ipc.ts (2)
80-83: Request ID wraparound could collide with pending requests.When
requestIdwraps around to 0, there's a risk of collision with an in-flight request that was assigned that ID earlier. Consider checking if the ID is already in thependingmap before using it.♻️ Proposed fix to avoid ID collision
const id = ++this.requestId if (this.requestId > Number.MAX_SAFE_INTEGER - 1) { this.requestId = 0 } + // In the extremely unlikely event of collision, reject early + if (this.pending.has(id)) { + throw new Error('Request ID collision - too many concurrent long-running requests') + } const request = JSON.stringify({ id, method, params })
118-127:stop()doesn't reject pending requests, leaving dangling promises.When
stop()is called, pending requests remain unresolved. Unlike thecloseevent handler which rejects all pending requests,stop()leaves callers waiting indefinitely.♻️ Proposed fix to reject pending requests on stop
stop(): void { + for (const [id, req] of this.pending) { + clearTimeout(req.timeout) + req.reject(new Error('Python IPC stopped')) + } + this.pending.clear() if (this.process) { this.process.kill() this.process = null } if (this.readline) { this.readline.close() this.readline = null } }examples/polyglot-coordination/README.md (1)
9-23: Add language specifiers to fenced code blocks.The architecture diagram and file structure blocks are flagged by markdownlint. Adding
textorplaintextas the language specifier improves consistency.📝 Proposed fix
-``` +```text iii-engine (ws://127.0.0.1:49134)-``` +```text polyglot-coordination/ ├── README.mdAlso applies to: 126-147
examples/polyglot-coordination/workers/user-service.ts (1)
15-17: ID generation may produce duplicates under high concurrency.
Date.now()has millisecond resolution, and concurrent requests within the same millisecond rely solely on the 6-character random suffix (~2 billion combinations). For a demo this is fine, but in production consider UUIDs.examples/polyglot-coordination/workers/stripe-bridge.ts (2)
32-49: Add request timeout to prevent indefinite hangs.The
stripeRequestfunction has no timeout. If the Rust server becomes unresponsive, callers will hang indefinitely. Theanalytics-client.tsin the codebase demonstrates the AbortController pattern for timeouts.♻️ Proposed fix to add timeout
+const REQUEST_TIMEOUT_MS = 30000 + async function stripeRequest<T>( method: string, path: string, body?: unknown ): Promise<T> { + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS) + - const response = await fetch(`${STRIPE_URL}${path}`, { - method, - headers: { 'Content-Type': 'application/json' }, - body: body ? JSON.stringify(body) : undefined, - }) + try { + const response = await fetch(`${STRIPE_URL}${path}`, { + method, + headers: { 'Content-Type': 'application/json' }, + body: body ? JSON.stringify(body) : undefined, + signal: controller.signal, + }) - if (!response.ok) { - const errorBody = await response.text().catch(() => '') - throw new Error(`Stripe API error: ${response.status} ${response.statusText}${errorBody ? ` - ${errorBody}` : ''}`) - } + if (!response.ok) { + const errorBody = await response.text().catch(() => '') + throw new Error(`Stripe API error: ${response.status} ${response.statusText}${errorBody ? ` - ${errorBody}` : ''}`) + } - return response.json() + return response.json() + } finally { + clearTimeout(timer) + } }
66-84: Inconsistent ID validation:createSubscriptionandchargedon't validate customer IDs.
getCustomervalidates the ID format withvalidateStripeId, butcreateSubscriptionandchargeacceptcustomerIdwithout validation. This inconsistency could allow malformed IDs to reach the Rust server.♻️ Proposed fix to add validation
bridge.registerFunction( { function_path: 'stripe.createSubscription' }, async (input: CreateSubscriptionInput): Promise<StripeSubscription> => { + validateStripeId(input.customerId, 'cus') return stripeRequest<StripeSubscription>('POST', '/v1/subscriptions', { customer: input.customerId, plan: input.plan, }) } ) bridge.registerFunction( { function_path: 'stripe.charge' }, async (input: ChargeInput): Promise<StripeCharge> => { + validateStripeId(input.customerId, 'cus') return stripeRequest<StripeCharge>('POST', '/v1/charges', { customer: input.customerId, amount: input.amount, currency: input.currency ?? 'usd', }) } )examples/polyglot-coordination/workers/data-requester.ts (1)
15-16: RedundantwaitReady()call afterstart().
python.start()already awaits the ping and setsready = truebefore resolving. The subsequentwaitReady()call is unnecessary.♻️ Proposed simplification
console.log('[Data Requester] Starting Python analytics subprocess...') await python.start() - await python.waitReady() console.log('[Data Requester] Python analytics ready (stdin/stdout IPC)')examples/polyglot-coordination/workflow/onboarding.ts (1)
42-103: Consider compensation logic for partial failures.The workflow creates resources across multiple services (user, Stripe customer, subscription). If a later step fails (e.g., step 5 user update), earlier resources (Stripe customer/subscription) remain orphaned with no rollback.
For an example, this may be intentional to keep the code simple. If production-readiness is desired, consider implementing the saga pattern with compensation handlers or at minimum documenting this limitation.
| except json.JSONDecodeError as e: | ||
| response = {'id': 0, 'error': {'message': f'Invalid JSON: {e}'}} | ||
| except Exception as e: | ||
| response = {'id': request.get('id', 0), 'error': {'message': str(e)}} |
There was a problem hiding this comment.
Potential NameError if JSON parsing fails before request is defined.
On line 130, if json.JSONDecodeError is raised, the code correctly handles it. However, if any other exception occurs before request = json.loads(line) succeeds (unlikely but possible with malformed input), request will be undefined, causing a NameError when accessing request.get('id', 0).
🐛 Proposed fix
try:
request = json.loads(line)
req_id = request.get('id', 0)
method = request.get('method', '')
params = request.get('params', {})
result = handle_request(method, params)
response = {'id': req_id, 'result': result}
except json.JSONDecodeError as e:
response = {'id': 0, 'error': {'message': f'Invalid JSON: {e}'}}
except Exception as e:
- response = {'id': request.get('id', 0), 'error': {'message': str(e)}}
+ response = {'id': req_id if 'req_id' in dir() else 0, 'error': {'message': str(e)}}Alternatively, extract req_id earlier:
+ req_id = 0
try:
request = json.loads(line)
req_id = request.get('id', 0)
...
except json.JSONDecodeError as e:
response = {'id': 0, 'error': {'message': f'Invalid JSON: {e}'}}
except Exception as e:
- response = {'id': request.get('id', 0), 'error': {'message': str(e)}}
+ response = {'id': req_id, 'error': {'message': str(e)}}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| except json.JSONDecodeError as e: | |
| response = {'id': 0, 'error': {'message': f'Invalid JSON: {e}'}} | |
| except Exception as e: | |
| response = {'id': request.get('id', 0), 'error': {'message': str(e)}} | |
| except json.JSONDecodeError as e: | |
| response = {'id': 0, 'error': {'message': f'Invalid JSON: {e}'}} | |
| except Exception as e: | |
| response = {'id': req_id if 'req_id' in dir() else 0, 'error': {'message': str(e)}} |
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 129-129: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In `@examples/polyglot-coordination/services/python-analytics/analytics.py` around
lines 127 - 130, The except block can raise NameError because request may be
undefined if json.loads(line) fails; update the error handling around
json.loads(line) (where request = json.loads(line) is invoked) to capture the
request id safely by precomputing a fallback req_id (e.g., default 0) before the
try or by using a local variable req_id set after successful parsing and
referenced in both except handlers; ensure the json.JSONDecodeError handler
still returns id 0 while the generic Exception handler uses the safe req_id
value instead of directly accessing request.get('id', 0).
| const __dirname = dirname(fileURLToPath(import.meta.url)) | ||
| const PYTHON_SCRIPT = join(__dirname, '../services/python-analytics/analytics.py') |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Check for tsconfig.json in examples/polyglot-coordination/
find examples/polyglot-coordination -name "tsconfig.json" -type f | head -20Repository: MotiaDev/iii-examples
Length of output: 109
🏁 Script executed:
# Check package.json in examples/polyglot-coordination/
find examples/polyglot-coordination -name "package.json" -type f | head -20Repository: MotiaDev/iii-examples
Length of output: 108
🏁 Script executed:
# Check the actual file structure to locate analytics.py
find examples/polyglot-coordination -name "analytics.py" -type fRepository: MotiaDev/iii-examples
Length of output: 134
🏁 Script executed:
# Examine the data-requester.ts file to understand the full context
cat examples/polyglot-coordination/workers/data-requester.tsRepository: MotiaDev/iii-examples
Length of output: 2158
🏁 Script executed:
# Check if there's a root tsconfig that might affect this
ls -la tsconfig*.json 2>/dev/null || echo "No root tsconfig found"Repository: MotiaDev/iii-examples
Length of output: 87
🏁 Script executed:
cat examples/polyglot-coordination/tsconfig.jsonRepository: MotiaDev/iii-examples
Length of output: 300
🏁 Script executed:
# Find and read the PythonIPC class to check if waitReady() is redundant
find examples/polyglot-coordination -name "python-ipc*" -o -name "*python-ipc*" | grep -E "\.(ts|js)$"Repository: MotiaDev/iii-examples
Length of output: 113
🏁 Script executed:
# Check if there's a lib directory with the python-ipc implementation
ls -la examples/polyglot-coordination/lib/ 2>/dev/null || echo "No lib directory"Repository: MotiaDev/iii-examples
Length of output: 356
🏁 Script executed:
cat examples/polyglot-coordination/lib/python-ipc.tsRepository: MotiaDev/iii-examples
Length of output: 3446
🏁 Script executed:
# Check package.json scripts to understand how this is built/run
cat examples/polyglot-coordination/package.jsonRepository: MotiaDev/iii-examples
Length of output: 1027
🏁 Script executed:
# Check the directory structure to see where Python script might be expected
tree examples/polyglot-coordination -L 3 2>/dev/null || find examples/polyglot-coordination -type f -name "*.py" -o -type d | head -30Repository: MotiaDev/iii-examples
Length of output: 501
Remove redundant waitReady() call.
The start() method in PythonIPC already awaits the initial ping and resolves the ready state before returning. Calling await python.waitReady() on line 16 after await python.start() is redundant and can be safely removed.
Additionally, note that path resolution using __dirname derived from import.meta.url works correctly in the current development setup (where bun runs TypeScript directly). However, if the project transitions to a compiled build process that outputs to the dist/ directory specified in tsconfig.json without copying the Python script alongside compiled workers, the relative path ../services/python-analytics/analytics.py would break at runtime. Consider using process.cwd() or an environment variable for more robust path resolution if deployment patterns change.
🤖 Prompt for AI Agents
In `@examples/polyglot-coordination/workers/data-requester.ts` around lines 7 - 8,
Remove the redundant await by deleting the extra await python.waitReady() call
that occurs immediately after await python.start(), since PythonIPC.start()
already awaits the initial ping and sets the ready state; locate references to
PythonIPC.start and the python.waitReady invocation in this file and remove the
latter. Also make the PYTHON_SCRIPT resolution more robust by replacing the
__dirname + relative path approach (the PYTHON_SCRIPT and __dirname symbols)
with a resolution that uses process.cwd() or a configurable environment variable
(e.g., PYTHON_ANALYTICS_PATH) so the script path won’t break when compiled into
a dist/ output layout.
| async function invoke<T>(ctx: Context, fn: string, input: unknown): Promise<T> { | ||
| ctx.logger.info(`Calling ${fn}`, { input }) | ||
| const result = (await bridge.invokeFunction(fn, input)) as T | ||
| ctx.logger.info(`${fn} returned`, { result }) | ||
| return result | ||
| } |
There was a problem hiding this comment.
PII logging risk: input/output may contain sensitive user data.
The invoke helper logs both input and result at info level. These payloads include user PII (email, name) throughout the workflow. This creates compliance risks (GDPR/CCPA) and could leak sensitive data to log aggregators or monitoring systems.
Consider redacting PII fields or logging only non-sensitive identifiers (e.g., userId, traceId).
🛡️ Proposed fix to redact sensitive fields
async function invoke<T>(ctx: Context, fn: string, input: unknown): Promise<T> {
- ctx.logger.info(`Calling ${fn}`, { input })
+ ctx.logger.info(`Calling ${fn}`)
+ ctx.logger.debug(`${fn} input`, { input }) // Move detailed payload to debug level
const result = (await bridge.invokeFunction(fn, input)) as T
- ctx.logger.info(`${fn} returned`, { result })
+ ctx.logger.info(`${fn} completed`)
+ ctx.logger.debug(`${fn} result`, { result }) // Move detailed payload to debug level
return result
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async function invoke<T>(ctx: Context, fn: string, input: unknown): Promise<T> { | |
| ctx.logger.info(`Calling ${fn}`, { input }) | |
| const result = (await bridge.invokeFunction(fn, input)) as T | |
| ctx.logger.info(`${fn} returned`, { result }) | |
| return result | |
| } | |
| async function invoke<T>(ctx: Context, fn: string, input: unknown): Promise<T> { | |
| ctx.logger.info(`Calling ${fn}`) | |
| ctx.logger.debug(`${fn} input`, { input }) // Move detailed payload to debug level | |
| const result = (await bridge.invokeFunction(fn, input)) as T | |
| ctx.logger.info(`${fn} completed`) | |
| ctx.logger.debug(`${fn} result`, { result }) // Move detailed payload to debug level | |
| return result | |
| } |
🤖 Prompt for AI Agents
In `@examples/polyglot-coordination/workflow/onboarding.ts` around lines 16 - 21,
The invoke helper (async function invoke) is logging raw input and result via
ctx.logger.info which may contain PII; update invoke to avoid logging full
payloads by redacting sensitive fields (email, name, ssn, etc.) or by extracting
and logging only safe identifiers (e.g., userId, traceId) before calling
ctx.logger.info for both the request and response, and ensure any helper used
for redaction is applied to both the input and result (reference invoke,
ctx.logger.info, and bridge.invokeFunction).
| try { | ||
| const users = await invoke<User[]>(ctx, 'users.list', {}) | ||
|
|
||
| const metrics = await invoke<OnboardingMetrics>(ctx, 'analytics.onboardingMetrics', { | ||
| users, | ||
| }) |
There was a problem hiding this comment.
Data minimization: full user objects sent to analytics.
Passing the entire users array (with PII like email, name) to the analytics service for metrics calculation may be unnecessary. For metrics aggregation, consider sending only the required fields (e.g., user IDs, plan types, risk scores) rather than complete user objects.
This reduces PII exposure across service boundaries and improves efficiency for large datasets.
🤖 Prompt for AI Agents
In `@examples/polyglot-coordination/workflow/onboarding.ts` around lines 115 -
120, The code passes full User objects from invoke('users.list') into
invoke('analytics.onboardingMetrics'), exposing PII; instead, map the users
array to a minimized payload containing only the fields needed for metrics
(e.g., id, plan, riskScore or whatever OnboardingMetrics expects) and send that
trimmed array to invoke('analytics.onboardingMetrics'), updating any type
annotations (OnboardingMetrics or the invoke generic) accordingly so only the
reduced user shape is transmitted from the onboarding function.
- Add input validation for user creation (email format, length limits) - Add Python script existence check before spawning - Add PYTHON_PATH environment variable support - Replace ASCII diagram with Mermaid flowchart - Add polyglot-coordination to main README examples table
Summary
This PR adds a new polyglot-coordination example demonstrating iii-engine seamlessly coordinating services across Python, Node.js, and Rust without requiring HTTP endpoints for all services.
Key Differentiator
The Python service communicates via stdin/stdout IPC (not HTTP), proving that iii abstracts away transport mechanisms entirely.
Architecture
Business Scenario: SaaS User Onboarding
Files Added
lib/- Shared bridge factory, Python IPC manager, TypeScript typesworkers/- stripe-bridge, data-requester, user-serviceworkflow/- onboarding orchestration with API triggersservices/python-analytics/- Python stdin/stdout JSON-RPC serviceservices/rust-stripe/- Rust Axum HTTP server mimicking Stripe APICode Quality
Test plan
iiicd examples/polyglot-coordination && pnpm stripepnpm devcurl -X POST http://localhost:3111/onboard -H "Content-Type: application/json" -d '{"email": "test@example.com", "name": "Test User", "plan": "pro"}'curl http://localhost:3111/onboard/metricsSummary by CodeRabbit
New Features
Documentation
Chores