diff --git a/.claude/skills/scaffold-act-app/SKILL.md b/.claude/skills/scaffold-act-app/SKILL.md index 0da87778..f589072a 100644 --- a/.claude/skills/scaffold-act-app/SKILL.md +++ b/.claude/skills/scaffold-act-app/SKILL.md @@ -123,7 +123,7 @@ my-app/ │ │ │ ├── index.ts # Router composition + AppRouter type │ │ │ ├── trpc.ts # tRPC init + middleware (public/authed/admin) │ │ │ ├── context.ts # Request context + token verification -│ │ │ ├── helpers.ts # drainAll(), eventBus, serialization +│ │ │ ├── helpers.ts # serializeEvents() for SSE payloads │ │ │ ├── auth.ts # Token signing, password hashing │ │ │ ├── auth.routes.ts # Auth endpoints (login, signup, OAuth) │ │ │ ├── domain.routes.ts # Domain mutations + queries @@ -326,7 +326,7 @@ export const app = act() **`withActor()`**: Sets the actor type for the entire app. All `app.do()` calls will require `target.actor` to satisfy `T`. Define `AppActor` extending `Actor` in schemas.ts. -> **Note:** When using reactions with `drain()`, you must call `app.correlate()` before `app.drain()` to discover target streams. Use the `scheduleDrain()` pattern for non-blocking, debounced processing that signals the UI only after all reactions complete. See [act-api.md](act-api.md) §6. +> **Note:** When using reactions with `drain()`, you must call `app.correlate()` before `app.drain()` to discover target streams. Use `app.settle()` for non-blocking, debounced correlate→drain that emits a `"settled"` event when the system is consistent. See [act-api.md](act-api.md) §6. ### Step 7 — tRPC API (in `packages/app/src/api/`) @@ -337,16 +337,16 @@ Decompose the API into focused route modules. See [monorepo-template.md](monorep | `trpc.ts` | tRPC init + middleware | `publicProcedure`, `authedProcedure`, `adminProcedure` | | `context.ts` | Request context | Extract `AppActor` from Bearer token via `verifyToken()` | | `auth.ts` | Token + password crypto | HMAC-signed tokens, scrypt password hashing (zero deps) | -| `helpers.ts` | `scheduleDrain()` + `eventBus` | Non-blocking, debounced drain; signals SSE after reactions complete | +| `helpers.ts` | Event serialization | `serializeEvents()` for SSE payloads | | `auth.routes.ts` | Auth endpoints | login, signup, me, assignRole, listUsers | | `domain.routes.ts` | Domain mutations + queries | `app.do()` + `scheduleDrain()` per mutation; query projections | -| `events.routes.ts` | SSE subscription | `tracked()` yields with `eventBus` for live updates | +| `events.routes.ts` | SSE subscription | `tracked()` yields with `app.on("settled")` for live updates | | `index.ts` | Router composition | `t.mergeRouters()` + export `AppRouter` type | **Key rules:** -- Call `scheduleDrain()` after every `app.do()` in mutations — non-blocking, returns immediately +- Call `app.settle()` after every `app.do()` in mutations — non-blocking, returns immediately - Use `authedProcedure` / `adminProcedure` for authorization (middleware narrows `ctx.actor`) -- SSE uses `eventBus.emit("committed")` which fires only after `correlate()` + `drain()` complete +- SSE uses `app.on("settled", ...)` which fires only after `correlate()` + `drain()` complete ### Step 8 — React Client (in `packages/app/src/client/`) @@ -416,7 +416,7 @@ See [monorepo-template.md](monorepo-template.md) for complete `package.json` fil 9. **ESM only** — All packages use `"type": "module"` and `.js` import extensions. 10. **Single-key records** — `state({})`, `.on({})`, `.emits({})` take single-key records. Multi-key throws at runtime. 11. **API decomposition** — Split tRPC router into focused route files (`auth.routes.ts`, `domain.routes.ts`, `events.routes.ts`). Keep `trpc.ts` for init + middleware, `context.ts` for request context, `helpers.ts` for shared utilities. -12. **scheduleDrain() after mutations** — Call `scheduleDrain()` after every `app.do()` in API mutations. This is non-blocking (returns immediately), debounced (coalesces rapid commits), and signals the UI via eventBus only after all reactions and projections are fully processed. +12. **settle() after mutations** — Call `app.settle()` after every `app.do()` in API mutations. This is non-blocking (returns immediately), debounced (coalesces rapid commits), and emits a `"settled"` event only after all correlate/drain iterations and projections are fully processed. ## Error Handling @@ -441,7 +441,7 @@ For production deployment (PostgresStore, background processing, automated jobs) - [ ] Domain package has no infrastructure dependencies - [ ] All packages use `"type": "module"` and TypeScript strict mode - [ ] tRPC API decomposed into route files with typed middleware -- [ ] SSE subscription wired with eventBus for live events -- [ ] `scheduleDrain()` called after mutations (non-blocking, debounced, signals UI after reactions) +- [ ] SSE subscription wired with `app.on("settled")` for live events +- [ ] `app.settle()` called after mutations (non-blocking, debounced, emits "settled" after reactions) - [ ] Client uses `splitLink` for SSE subscriptions + HTTP for mutations/queries - [ ] Types compile with `npx tsc --noEmit` diff --git a/.claude/skills/scaffold-act-app/act-api.md b/.claude/skills/scaffold-act-app/act-api.md index 7a127761..d43e8379 100644 --- a/.claude/skills/scaffold-act-app/act-api.md +++ b/.claude/skills/scaffold-act-app/act-api.md @@ -121,7 +121,7 @@ Do NOT use `z.object({})` — use `ZodEmpty` for consistency and correct validat .to("fixed-stream-name") // static target ``` -## 6. Correlate Before Drain — scheduleDrain() Pattern +## 6. Correlate Before Drain — settle() Pattern `app.correlate()` scans events, resolves reaction targets, and **registers new streams** with the store via `store().lease()`. Without this step, `drain()` won't find streams to process. @@ -134,47 +134,36 @@ await app.drain(); await app.drain(); // returns empty results ``` -**scheduleDrain()** — the production pattern for API mutations. Non-blocking, debounced, signals UI only after reactions complete: +**`app.settle()`** — the production pattern for API mutations. Non-blocking, debounced, runs correlate→drain in a loop, emits `"settled"` when the system reaches a consistent state: ```typescript -import { EventEmitter } from "node:events"; - -// Event bus for SSE — only signals AFTER reactions complete -export const eventBus = new EventEmitter(); -eventBus.setMaxListeners(100); - -let drainTimer: ReturnType | null = null; -let draining = false; - -async function executeDrain() { - if (draining) return; // skip if already running - draining = true; - try { - for (let i = 0; i < 2; i++) { // 2 iterations for chained reactions - const { leased } = await app.correlate({ after: -1, limit: 100 }); - if (leased.length === 0) break; - await app.drain({ streamLimit: 10, eventLimit: 100 }); - } - } finally { - draining = false; - } - eventBus.emit("committed"); // signal UI AFTER reactions + projections done -} +// In API mutations — fire-and-forget +await app.do("CreateItem", target, input); +app.settle(); // non-blocking, debounced — UI notified via "settled" event + +// Subscribe to settled event for SSE notifications +app.on("settled", (drain) => { + // drain has { fetched, leased, acked, blocked } + // notify SSE clients that the system is consistent +}); +``` -export function scheduleDrain() { - if (drainTimer) clearTimeout(drainTimer); - drainTimer = setTimeout(() => { - drainTimer = null; - executeDrain().catch(console.error); - }, 10); // 10ms debounce coalesces rapid commits -} +**`settle()` options:** +```typescript +app.settle({ + debounceMs: 10, // debounce window (default: 10ms) + correlate: { after: -1, limit: 100 }, // correlate query (default) + maxPasses: 5, // max correlate→drain loops (default: 5) + streamLimit: 10, // passed to drain() + eventLimit: 100, // passed to drain() +}); ``` **Key design:** -- **Non-blocking**: `scheduleDrain()` returns immediately — mutations don't wait for drain -- **Debounced**: Multiple rapid `app.do()` calls coalesce into one drain cycle (10ms window) -- **Guarded**: `draining` flag prevents concurrent drain cycles -- **UI signal after completion**: `eventBus.emit("committed")` fires only after all correlate/drain iterations finish, so SSE clients see a consistent view +- **Non-blocking**: `settle()` returns immediately — mutations don't wait for drain +- **Debounced**: Multiple rapid `app.do()` calls coalesce into one settle cycle (10ms window) +- **Guarded**: Internal `_settling` flag prevents concurrent settle cycles +- **Lifecycle event**: `"settled"` fires only after all correlate/drain iterations finish, so SSE clients see a consistent view **In tests:** Call `correlate()` + `drain()` directly (synchronous, no debounce): ```typescript @@ -185,11 +174,11 @@ it("should process reactions", async () => { }); ``` -**In API mutations:** Call `scheduleDrain()` and return immediately: +**In API mutations:** Call `settle()` and return immediately: ```typescript CreateItem: authedProcedure.mutation(async ({ input, ctx }) => { await app.do("CreateItem", target, input); - scheduleDrain(); // fire-and-forget — UI notified via SSE + app.settle(); // fire-and-forget — UI notified via "settled" event return { success: true }; }); ``` diff --git a/.claude/skills/scaffold-act-app/monorepo-template.md b/.claude/skills/scaffold-act-app/monorepo-template.md index aeee9bf6..36c5011d 100644 --- a/.claude/skills/scaffold-act-app/monorepo-template.md +++ b/.claude/skills/scaffold-act-app/monorepo-template.md @@ -313,12 +313,9 @@ export function verifyPassword(password: string, stored: string): boolean { } ``` -### packages/app/src/api/helpers.ts (debounced drainAll + event bus) +### packages/app/src/api/helpers.ts (event serialization) ```typescript -import { app } from "@my-app/domain"; -import { EventEmitter } from "node:events"; - export type SerializedEvent = { id: number; name: string; @@ -340,41 +337,6 @@ export function serializeEvents(events: Array<{ id: number; name: unknown; data: meta: e.meta as SerializedEvent["meta"], })); } - -// Event bus for SSE subscriptions — only signals AFTER reactions complete -export const eventBus = new EventEmitter(); -eventBus.setMaxListeners(100); - -// Debounced, non-blocking drain — coalesces rapid commits into one drain cycle -let drainTimer: ReturnType | null = null; -let draining = false; - -async function executeDrain() { - if (draining) return; - draining = true; - try { - for (let i = 0; i < 2; i++) { - const { leased } = await app.correlate({ after: -1, limit: 100 }); - if (leased.length === 0) break; - await app.drain({ streamLimit: 10, eventLimit: 100 }); - } - } finally { - draining = false; - } - // Signal UI AFTER all reactions and projections are processed - eventBus.emit("committed"); -} - -/** - * Schedule a drain cycle (debounced, non-blocking). - * Call this after app.do() — it returns immediately and drains in the background. - * The eventBus "committed" event fires only after reactions complete, - * so SSE clients receive a consistent view of the world. - */ -export function scheduleDrain() { - if (drainTimer) clearTimeout(drainTimer); - drainTimer = setTimeout(() => { drainTimer = null; executeDrain().catch(console.error); }, 10); -} ``` ### packages/app/src/api/domain.routes.ts (domain commands + queries) @@ -382,7 +344,6 @@ export function scheduleDrain() { ```typescript import { app, getItems } from "@my-app/domain"; import { z } from "zod"; -import { scheduleDrain } from "./helpers.js"; import { t, authedProcedure, adminProcedure, publicProcedure } from "./trpc.js"; export const domainRouter = t.router({ @@ -391,7 +352,7 @@ export const domainRouter = t.router({ .mutation(async ({ input, ctx }) => { const target = { stream: crypto.randomUUID(), actor: ctx.actor }; await app.do("CreateItem", target, input); - scheduleDrain(); // non-blocking — UI notified via SSE after reactions complete + app.settle(); // non-blocking — UI notified via "settled" event after reactions complete return { success: true, id: target.stream }; }), @@ -404,7 +365,7 @@ export const domainRouter = t.router({ ```typescript import { app } from "@my-app/domain"; import { tracked } from "@trpc/server"; -import { eventBus, serializeEvents } from "./helpers.js"; +import { serializeEvents } from "./helpers.js"; import { t, publicProcedure } from "./trpc.js"; export const eventsRouter = t.router({ @@ -416,8 +377,8 @@ export const eventsRouter = t.router({ let lastId = existing.length > 0 ? existing[existing.length - 1].id : -1; let notify: (() => void) | null = null; - const onCommitted = () => { if (notify) { notify(); notify = null; } }; - eventBus.on("committed", onCommitted); + const onSettled = () => { if (notify) { notify(); notify = null; } }; + app.on("settled", onSettled); try { while (!signal?.aborted) { @@ -434,7 +395,7 @@ export const eventsRouter = t.router({ } } } finally { - eventBus.off("committed", onCommitted); + app.off("settled", onSettled); } }), }); @@ -447,7 +408,6 @@ import { app, getAllUsers, getUserByEmail, getUserByProviderId, systemActor } fr import { TRPCError } from "@trpc/server"; import { z } from "zod"; import { hashPassword, signToken, verifyPassword } from "./auth.js"; -import { scheduleDrain } from "./helpers.js"; import { t, publicProcedure, authedProcedure, adminProcedure } from "./trpc.js"; export const authRouter = t.router({ @@ -472,7 +432,7 @@ export const authRouter = t.router({ await app.do("RegisterUser", { stream: input.username, actor: { ...systemActor, name: "AuthSystem" } }, { email: input.username, name: input.name, provider: "local", providerId: input.username, passwordHash, }); - scheduleDrain(); + app.settle(); const user = getUserByEmail(input.username); if (!user) throw new TRPCError({ code: "INTERNAL_SERVER_ERROR", message: "Failed to register" }); const token = signToken({ email: user.email }); @@ -486,7 +446,7 @@ export const authRouter = t.router({ .mutation(async ({ input, ctx }) => { if (!getUserByEmail(input.email)) throw new TRPCError({ code: "NOT_FOUND", message: "User not found" }); await app.do("AssignRole", { stream: input.email, actor: ctx.actor }, { role: input.role }); - scheduleDrain(); + app.settle(); return { success: true }; }), @@ -572,7 +532,7 @@ const server = createHTTPServer({ const port = Number(process.env.PORT) || 4000; server.listen(port); -await app.drain(); +app.settle(); console.log(`Server listening on http://localhost:${port}`); ``` diff --git a/.claude/skills/scaffold-act-app/production.md b/.claude/skills/scaffold-act-app/production.md index 6172744e..39314a02 100644 --- a/.claude/skills/scaffold-act-app/production.md +++ b/.claude/skills/scaffold-act-app/production.md @@ -1,6 +1,6 @@ # Production Deployment -Covers production-specific concerns beyond what's in [monorepo-template.md](monorepo-template.md). The template already provides `scheduleDrain()`, `eventBus`, auth crypto, `createContext()`, and dev seed scripts. +Covers production-specific concerns beyond what's in [monorepo-template.md](monorepo-template.md). The template already provides `app.settle()`, auth crypto, `createContext()`, and dev seed scripts. ## Switch to PostgreSQL @@ -23,7 +23,7 @@ Install: `pnpm -F @my-app/app add @rotorsoft/act-pg` ## Background Correlation (Large-Scale) -For high-throughput deployments, use periodic background correlation instead of (or in addition to) the debounced `scheduleDrain()` from helpers.ts: +For high-throughput deployments, use periodic background correlation instead of (or in addition to) `app.settle()`: ```typescript // Periodic correlation resolution — discovers new reaction streams every 3s @@ -90,6 +90,9 @@ await app.drain({ // Observe all state changes app.on("committed", (snapshots) => { /* log, metrics */ }); +// React when system settles after settle() completes +app.on("settled", (drain) => { /* notify SSE clients, update caches */ }); + // Catch reaction failures app.on("blocked", (leases) => { /* alert on blocked streams */ }); diff --git a/AGENT.md b/AGENT.md index a20dbae1..400a937e 100644 --- a/AGENT.md +++ b/AGENT.md @@ -200,6 +200,9 @@ const snapshot = await app.load(Counter, "counter1"); // Process reactions (event-driven workflows) await app.drain({ streamLimit: 100, eventLimit: 1000 }); + +// Debounced correlate→drain for production (non-blocking, emits "settled" when done) +app.settle(); ``` ### Event Sourcing Model @@ -254,6 +257,7 @@ Dynamic stream discovery through correlation metadata: - Each action/event includes `correlation` (request ID) and `causation` (what triggered it) - Reactions can discover new streams to process by querying uncommitted events - `app.correlate()` - Manual correlation +- `app.settle()` - Debounced, non-blocking correlate→drain loop; emits `"settled"` when done - `app.start_correlations()` - Periodic background correlation ### Invariants diff --git a/CLAUDE.md b/CLAUDE.md index ed6d637e..ce08125e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -202,6 +202,9 @@ const snapshot = await app.load(Counter, "counter1"); // Process reactions (event-driven workflows) await app.drain({ streamLimit: 100, eventLimit: 1000 }); + +// Debounced correlate→drain for production (non-blocking, emits "settled" when done) +app.settle(); ``` ### Event Sourcing Model @@ -256,9 +259,10 @@ Dynamic stream discovery through correlation metadata: - Each action/event includes `correlation` (request ID) and `causation` (what triggered it) - Reactions can discover new streams to process by querying uncommitted events - `app.correlate()` - Manual correlation (must be called before `drain()` to discover target streams) +- `app.settle()` - Debounced, non-blocking correlate→drain loop; emits `"settled"` when done - `app.start_correlations()` - Periodic background correlation -**Important:** `correlate()` must be called before `drain()` to register reaction target streams with the store. Without correlation, `drain()` has no streams to process. In tests: `await app.correlate(); await app.drain();`. In production: use `app.on("committed", () => { app.correlate().then(() => app.drain()).catch(console.error); })` or `app.start_correlations()` for background discovery. +**Important:** `correlate()` must be called before `drain()` to register reaction target streams with the store. Without correlation, `drain()` has no streams to process. In tests: `await app.correlate(); await app.drain();`. In production: use `app.settle()` for debounced correlate→drain with a `"settled"` lifecycle event, or `app.start_correlations()` for background discovery. ### Invariants @@ -400,6 +404,7 @@ Both stream leasing and version-based optimistic concurrency must be implemented - Set `LOG_LEVEL=debug` or `LOG_LEVEL=trace` for verbose logging - Use `app.on("committed", ...)` to observe all state changes +- Use `app.on("settled", ...)` to react when `settle()` completes all correlate/drain passes - Use `app.on("blocked", ...)` to catch reaction processing failures - Query events directly: `await app.query_array({ stream: "mystream" })` diff --git a/libs/act/README.md b/libs/act/README.md index e3298249..8c843712 100644 --- a/libs/act/README.md +++ b/libs/act/README.md @@ -235,12 +235,23 @@ While events within a stream are processed in order, multiple streams can be pro The `drain` method processes pending reactions across all subscribed streams: ```typescript -// Process pending reactions +// Process pending reactions (synchronous, single cycle) await app.drain({ streamLimit: 100, eventLimit: 1000 }); + +// Debounced correlate→drain for production (non-blocking, emits "settled" when done) +app.settle(); + +// Subscribe to the "settled" lifecycle event +app.on("settled", (drain) => { + // drain has { fetched, leased, acked, blocked } + // notify SSE clients, update caches, etc. +}); ``` Drain cycles continue until all reactions have caught up to the latest events. Consumers only process new work — acknowledged events are skipped, and failed events are re-leased automatically. +The `settle()` method is the recommended production pattern — it debounces rapid commits (10ms default), runs correlate→drain in a loop until the system is consistent, and emits a `"settled"` event when done. + ### Real-Time Notifications When using the PostgreSQL backend, the store emits `NOTIFY` events on each commit, enabling consumers to react immediately via `LISTEN` rather than polling. This reduces latency and unnecessary database queries in production deployments. diff --git a/libs/act/src/act.ts b/libs/act/src/act.ts index 13257d96..bbe16d11 100644 --- a/libs/act/src/act.ts +++ b/libs/act/src/act.ts @@ -15,6 +15,7 @@ import type { Schema, SchemaRegister, Schemas, + SettleOptions, Snapshot, State, Target, @@ -58,7 +59,10 @@ export class Act< private _emitter = new EventEmitter(); private _drain_locked = false; private _drain_lag2lead_ratio = 0.5; - private _correlation_interval: NodeJS.Timeout | undefined = undefined; + private _correlation_timer: ReturnType | undefined = + undefined; + private _settle_timer: ReturnType | undefined = undefined; + private _settling = false; /** * Emit a lifecycle event (internal use, but can be used for custom listeners). @@ -70,6 +74,7 @@ export class Act< emit(event: "committed", args: Snapshot[]): boolean; emit(event: "acked", args: Lease[]): boolean; emit(event: "blocked", args: Array): boolean; + emit(event: "settled", args: Drain): boolean; emit(event: string, args: any): boolean { return this._emitter.emit(event, args); } @@ -90,6 +95,7 @@ export class Act< event: "blocked", listener: (args: Array) => void ): this; + on(event: "settled", listener: (args: Drain) => void): this; on(event: string, listener: (args: any) => void): this { this._emitter.on(event, listener); return this; @@ -111,6 +117,7 @@ export class Act< event: "blocked", listener: (args: Array) => void ): this; + off(event: "settled", listener: (args: Drain) => void): this; off(event: string, listener: (args: any) => void): this { this._emitter.off(event, listener); return this; @@ -129,6 +136,7 @@ export class Act< dispose(() => { this._emitter.removeAllListeners(); this.stop_correlations(); + this.stop_settling(); return Promise.resolve(); }); } @@ -467,7 +475,7 @@ export class Act< /** * Processes pending reactions by draining uncommitted events from the event store. * - * The drain process: + * Runs a single drain cycle: * 1. Polls the store for streams with uncommitted events * 2. Leases streams to prevent concurrent processing * 3. Fetches events for each leased stream @@ -477,7 +485,8 @@ export class Act< * Drain uses a dual-frontier strategy to balance processing of new streams (lagging) * vs active streams (leading). The ratio adapts based on event pressure. * - * Call this method periodically in a background loop, or after committing events. + * Call `correlate()` before `drain()` to discover target streams. For a higher-level + * API that handles debouncing, correlation, and signaling automatically, use {@link settle}. * * @param options - Drain configuration options * @param options.streamLimit - Maximum number of streams to process per cycle (default: 10) @@ -485,46 +494,20 @@ export class Act< * @param options.leaseMillis - Lease duration in milliseconds (default: 10000) * @returns Drain statistics with fetched, leased, acked, and blocked counts * - * @example Basic drain loop + * @example In tests and scripts * ```typescript - * // Process reactions after each action * await app.do("createUser", target, payload); + * await app.correlate(); * await app.drain(); * ``` * - * @example Background drain worker - * ```typescript - * setInterval(async () => { - * try { - * const result = await app.drain({ - * streamLimit: 20, - * eventLimit: 50 - * }); - * if (result.acked.length) { - * console.log(`Processed ${result.acked.length} streams`); - * } - * } catch (error) { - * console.error("Drain error:", error); - * } - * }, 5000); // Every 5 seconds - * ``` - * - * @example With lifecycle listeners + * @example In production, prefer settle() * ```typescript - * app.on("acked", (leases) => { - * console.log(`Acknowledged ${leases.length} streams`); - * }); - * - * app.on("blocked", (blocked) => { - * console.error(`Blocked ${blocked.length} streams due to errors`); - * blocked.forEach(({ stream, error }) => { - * console.error(`Stream ${stream}: ${error}`); - * }); - * }); - * - * await app.drain(); + * await app.do("CreateItem", target, input); + * app.settle(); // debounced correlate→drain, emits "settled" * ``` * + * @see {@link settle} for debounced correlate→drain with lifecycle events * @see {@link correlate} for dynamic stream discovery * @see {@link start_correlations} for automatic correlation */ @@ -794,11 +777,11 @@ export class Act< frequency = 10_000, callback?: (leased: Lease[]) => void ): boolean { - if (this._correlation_interval) return false; + if (this._correlation_timer) return false; const limit = query.limit || 100; let after = query.after || -1; - this._correlation_interval = setInterval( + this._correlation_timer = setInterval( () => this.correlate({ ...query, after, limit }) .then((result) => { @@ -829,9 +812,81 @@ export class Act< * @see {@link start_correlations} */ stop_correlations() { - if (this._correlation_interval) { - clearInterval(this._correlation_interval); - this._correlation_interval = undefined; + if (this._correlation_timer) { + clearInterval(this._correlation_timer); + this._correlation_timer = undefined; } } + + /** + * Cancels any pending or active settle cycle. + * + * @see {@link settle} + */ + stop_settling() { + if (this._settle_timer) { + clearTimeout(this._settle_timer); + this._settle_timer = undefined; + } + } + + /** + * Debounced, non-blocking correlate→drain cycle. + * + * Call this after `app.do()` to schedule a background drain. Multiple rapid + * calls within the debounce window are coalesced into a single cycle. Runs + * correlate→drain in a loop until the system reaches a consistent state, + * then emits the `"settled"` lifecycle event. + * + * @param options - Settle configuration options + * @param options.debounceMs - Debounce window in milliseconds (default: 10) + * @param options.correlate - Query filter for correlation scans (default: `{ after: -1, limit: 100 }`) + * @param options.maxPasses - Maximum correlate→drain loops (default: 5) + * @param options.streamLimit - Maximum streams per drain cycle (default: 10) + * @param options.eventLimit - Maximum events per stream (default: 10) + * @param options.leaseMillis - Lease duration in milliseconds (default: 10000) + * + * @example API mutations + * ```typescript + * await app.do("CreateItem", target, input); + * app.settle(); // non-blocking, returns immediately + * + * app.on("settled", (drain) => { + * // notify SSE clients, invalidate caches, etc. + * }); + * ``` + * + * @see {@link drain} for single synchronous drain cycles + * @see {@link correlate} for manual correlation + */ + settle(options: SettleOptions = {}): void { + const { + debounceMs = 10, + correlate: correlateQuery = { after: -1, limit: 100 }, + maxPasses = 5, + ...drainOptions + } = options; + + if (this._settle_timer) clearTimeout(this._settle_timer); + this._settle_timer = setTimeout(() => { + this._settle_timer = undefined; + if (this._settling) return; + this._settling = true; + + (async () => { + let lastDrain: Drain | undefined; + for (let i = 0; i < maxPasses; i++) { + const { leased } = await this.correlate(correlateQuery); + if (leased.length === 0 && i > 0) break; + lastDrain = await this.drain(drainOptions); + if (!lastDrain.acked.length && !lastDrain.blocked.length) break; + } + if (lastDrain) this.emit("settled", lastDrain); + })() + .catch((err) => logger.error(err)) + .finally(() => { + this._settling = false; + }); + }, debounceMs); + } } diff --git a/libs/act/src/types/reaction.ts b/libs/act/src/types/reaction.ts index b023ab4b..afc153c8 100644 --- a/libs/act/src/types/reaction.ts +++ b/libs/act/src/types/reaction.ts @@ -8,6 +8,7 @@ import type { Actor, Committed, Dispatcher, + Query, Schema, Schemas, Snapshot, @@ -258,3 +259,20 @@ export type Drain = { readonly acked: Lease[]; readonly blocked: Array; }; + +/** + * Options for the debounced settle cycle. + * + * Extends {@link DrainOptions} with parameters that control the debounce + * window, the correlation query, and the maximum number of correlate→drain + * passes. + * + * @property debounceMs - Debounce window in milliseconds (default: 10) + * @property correlate - Query filter for correlation scans (default: `{ after: -1, limit: 100 }`) + * @property maxPasses - Maximum correlate→drain loops (default: 5) + */ +export type SettleOptions = DrainOptions & { + readonly debounceMs?: number; + readonly correlate?: Query; + readonly maxPasses?: number; +}; diff --git a/libs/act/test/act.spec.ts b/libs/act/test/act.spec.ts index eb6aba86..5483c03f 100644 --- a/libs/act/test/act.spec.ts +++ b/libs/act/test/act.spec.ts @@ -73,27 +73,27 @@ describe("act", () => { await app.correlate(); // should drain the first two events... third event should throw and stop drain - let drained = await app.drain({ leaseMillis: 1 }); // 1ms leases to test blocking + let drained = await app.drain({ leaseMillis: 1 }); expect(drained.acked.length).toBe(1); expect(drained.acked[0].at).toBe(1); expect(onIncremented).toHaveBeenCalledTimes(2); expect(onDecremented).toHaveBeenCalledTimes(1); // first fully failed - drained = await app.drain({ leaseMillis: 1 }); // 1ms leases to test blocking + drained = await app.drain({ leaseMillis: 1 }); console.log("first try", drained); expect(drained.acked.length).toBe(0); expect(onDecremented).toHaveBeenCalledTimes(2); // second fully failed (first retry) - drained = await app.drain({ leaseMillis: 1 }); // 1ms leases to test blocking + drained = await app.drain({ leaseMillis: 1 }); console.log("second try", drained); expect(drained.acked.length).toBe(0); expect(drained.blocked.length).toBe(0); expect(onDecremented).toHaveBeenCalledTimes(3); // third fully failed (second retry) - should block - drained = await app.drain({ leaseMillis: 1 }); // 1ms leases to test blocking + drained = await app.drain({ leaseMillis: 1 }); console.log("third try", drained); expect(drained.acked.length).toBe(0); expect(drained.blocked.length).toBe(1); @@ -213,4 +213,148 @@ describe("act", () => { expect(drained.leased.length).toBe(0); mockedPoll.mockClear(); }); + + describe("settle", () => { + it("should debounce multiple rapid calls into a single settle cycle", async () => { + const settledListener = vi.fn(); + app.on("settled", settledListener); + + await app.do("increment", { stream: "debounce-test", actor }, {}); + // Rapid-fire settle calls — only the last timer fires + app.settle({ debounceMs: 20 }); + app.settle({ debounceMs: 20 }); + app.settle({ debounceMs: 20 }); + + // Wait for debounce + processing + await sleep(300); + expect(settledListener).toHaveBeenCalledTimes(1); + const drain = settledListener.mock.calls[0][0]; + expect(drain).toHaveProperty("fetched"); + expect(drain).toHaveProperty("leased"); + expect(drain).toHaveProperty("acked"); + expect(drain).toHaveProperty("blocked"); + + app.off("settled", settledListener); + }); + + it("should emit 'settled' event after reactions complete", async () => { + const settledListener = vi.fn(); + app.on("settled", settledListener); + + await app.do("increment", { stream: "settled-event", actor }, {}); + app.settle({ debounceMs: 5 }); + + await sleep(300); + expect(settledListener).toHaveBeenCalledTimes(1); + + app.off("settled", settledListener); + }); + + it("should be a no-op when already settling", async () => { + const settledListener = vi.fn(); + app.on("settled", settledListener); + + // Slow down correlate so the first settle is still running when the second fires + const originalCorrelate = app.correlate.bind(app); + const spy = vi.spyOn(app, "correlate").mockImplementation(async (q) => { + await sleep(100); + return originalCorrelate(q); + }); + + await app.do("increment", { stream: "settle-guard", actor }, {}); + // First call starts the cycle + app.settle({ debounceMs: 1 }); + await sleep(20); // Let the timer fire and settle begin + // Second call while settling — timer fires but guard returns + app.settle({ debounceMs: 1 }); + await sleep(500); + + // Only 1 settled emission — second was guarded + expect(settledListener).toHaveBeenCalledTimes(1); + + spy.mockRestore(); + app.off("settled", settledListener); + }); + + it("should respect custom options", async () => { + const settledListener = vi.fn(); + app.on("settled", settledListener); + + await app.do("increment", { stream: "settle-opts", actor }, {}); + app.settle({ + debounceMs: 5, + correlate: { after: -1, limit: 50 }, + maxPasses: 2, + streamLimit: 5, + eventLimit: 5, + }); + + await sleep(300); + expect(settledListener).toHaveBeenCalledTimes(1); + + app.off("settled", settledListener); + }); + + it("should stop_settling cancel a pending timer", async () => { + app.settle({ debounceMs: 500 }); + app.stop_settling(); // cancels before it fires + await sleep(600); + // no error, no settle cycle ran + }); + + it("should not emit settled when maxPasses is 0", async () => { + const settledListener = vi.fn(); + app.on("settled", settledListener); + + app.settle({ debounceMs: 1, maxPasses: 0 }); + await sleep(100); + expect(settledListener).not.toHaveBeenCalled(); + + app.off("settled", settledListener); + }); + + it("should break early when correlate returns no leases on second pass", async () => { + const settledListener = vi.fn(); + app.on("settled", settledListener); + + // Mock correlate: first call returns a lease so loop continues, + // second call returns empty → triggers i>0 break + let correlateCount = 0; + const correlateSpy = vi.spyOn(app, "correlate").mockImplementation(() => { + correlateCount++; + if (correlateCount === 1) + return Promise.resolve({ + leased: [{ stream: "x", at: 0, by: "test" }], + } as any); + return Promise.resolve({ leased: [] } as any); + }); + // Mock drain to return acked work so the loop doesn't break at line 882 + const drainSpy = vi.spyOn(app, "drain").mockResolvedValue({ + fetched: [], + leased: [], + acked: [{ stream: "x", at: 1, by: "test", retry: 0, lagging: false }], + blocked: [], + }); + + app.settle({ debounceMs: 1, maxPasses: 5 }); + await sleep(300); + expect(settledListener).toHaveBeenCalledTimes(1); + expect(correlateCount).toBe(2); // ran 2 passes, broke on second + + correlateSpy.mockRestore(); + drainSpy.mockRestore(); + app.off("settled", settledListener); + }); + + it("should handle errors in settle gracefully", async () => { + const spy = vi + .spyOn(app, "correlate") + .mockRejectedValue(new Error("settle-error")); + + app.settle({ debounceMs: 1 }); + await sleep(300); + // settle caught the error internally — no unhandled rejection + spy.mockRestore(); + }); + }); }); diff --git a/performance/act-performance/src/index.ts b/performance/act-performance/src/index.ts index 6d8f727a..948fb0c2 100644 --- a/performance/act-performance/src/index.ts +++ b/performance/act-performance/src/index.ts @@ -1,7 +1,6 @@ import { act, type Committed, - type DrainOptions, type Schemas, sleep, store, @@ -164,7 +163,7 @@ async function main( // 👉 Change drain options to evaluate performance at different load levels const drainFrequency = 500; -const drainOptions: DrainOptions = { +const drainOptions = { streamLimit: 15, eventLimit: 20, };