Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions .claude/skills/scaffold-act-app/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ my-app/
│ │ │ ├── index.ts # Router composition + AppRouter type
│ │ │ ├── trpc.ts # tRPC init + middleware (public/authed/admin)
│ │ │ ├── context.ts # Request context + token verification
│ │ │ ├── helpers.ts # drainAll(), eventBus, serialization
│ │ │ ├── helpers.ts # serializeEvents() for SSE payloads
│ │ │ ├── auth.ts # Token signing, password hashing
│ │ │ ├── auth.routes.ts # Auth endpoints (login, signup, OAuth)
│ │ │ ├── domain.routes.ts # Domain mutations + queries
Expand Down Expand Up @@ -326,7 +326,7 @@ export const app = act()

**`withActor<T>()`**: Sets the actor type for the entire app. All `app.do()` calls will require `target.actor` to satisfy `T`. Define `AppActor` extending `Actor` in schemas.ts.

> **Note:** When using reactions with `drain()`, you must call `app.correlate()` before `app.drain()` to discover target streams. Use the `scheduleDrain()` pattern for non-blocking, debounced processing that signals the UI only after all reactions complete. See [act-api.md](act-api.md) §6.
> **Note:** When using reactions with `drain()`, you must call `app.correlate()` before `app.drain()` to discover target streams. Use `app.settle()` for non-blocking, debounced correlate→drain that emits a `"settled"` event when the system is consistent. See [act-api.md](act-api.md) §6.

### Step 7 — tRPC API (in `packages/app/src/api/`)

Expand All @@ -337,16 +337,16 @@ Decompose the API into focused route modules. See [monorepo-template.md](monorep
| `trpc.ts` | tRPC init + middleware | `publicProcedure`, `authedProcedure`, `adminProcedure` |
| `context.ts` | Request context | Extract `AppActor` from Bearer token via `verifyToken()` |
| `auth.ts` | Token + password crypto | HMAC-signed tokens, scrypt password hashing (zero deps) |
| `helpers.ts` | `scheduleDrain()` + `eventBus` | Non-blocking, debounced drain; signals SSE after reactions complete |
| `helpers.ts` | Event serialization | `serializeEvents()` for SSE payloads |
| `auth.routes.ts` | Auth endpoints | login, signup, me, assignRole, listUsers |
| `domain.routes.ts` | Domain mutations + queries | `app.do()` + `scheduleDrain()` per mutation; query projections |
| `events.routes.ts` | SSE subscription | `tracked()` yields with `eventBus` for live updates |
| `events.routes.ts` | SSE subscription | `tracked()` yields with `app.on("settled")` for live updates |
| `index.ts` | Router composition | `t.mergeRouters()` + export `AppRouter` type |

**Key rules:**
- Call `scheduleDrain()` after every `app.do()` in mutations — non-blocking, returns immediately
- Call `app.settle()` after every `app.do()` in mutations — non-blocking, returns immediately
- Use `authedProcedure` / `adminProcedure` for authorization (middleware narrows `ctx.actor`)
- SSE uses `eventBus.emit("committed")` which fires only after `correlate()` + `drain()` complete
- SSE uses `app.on("settled", ...)` which fires only after `correlate()` + `drain()` complete

### Step 8 — React Client (in `packages/app/src/client/`)

Expand Down Expand Up @@ -416,7 +416,7 @@ See [monorepo-template.md](monorepo-template.md) for complete `package.json` fil
9. **ESM only** — All packages use `"type": "module"` and `.js` import extensions.
10. **Single-key records** — `state({})`, `.on({})`, `.emits({})` take single-key records. Multi-key throws at runtime.
11. **API decomposition** — Split tRPC router into focused route files (`auth.routes.ts`, `domain.routes.ts`, `events.routes.ts`). Keep `trpc.ts` for init + middleware, `context.ts` for request context, `helpers.ts` for shared utilities.
12. **scheduleDrain() after mutations** — Call `scheduleDrain()` after every `app.do()` in API mutations. This is non-blocking (returns immediately), debounced (coalesces rapid commits), and signals the UI via eventBus only after all reactions and projections are fully processed.
12. **settle() after mutations** — Call `app.settle()` after every `app.do()` in API mutations. This is non-blocking (returns immediately), debounced (coalesces rapid commits), and emits a `"settled"` event only after all correlate/drain iterations and projections are fully processed.

## Error Handling

Expand All @@ -441,7 +441,7 @@ For production deployment (PostgresStore, background processing, automated jobs)
- [ ] Domain package has no infrastructure dependencies
- [ ] All packages use `"type": "module"` and TypeScript strict mode
- [ ] tRPC API decomposed into route files with typed middleware
- [ ] SSE subscription wired with eventBus for live events
- [ ] `scheduleDrain()` called after mutations (non-blocking, debounced, signals UI after reactions)
- [ ] SSE subscription wired with `app.on("settled")` for live events
- [ ] `app.settle()` called after mutations (non-blocking, debounced, emits "settled" after reactions)
- [ ] Client uses `splitLink` for SSE subscriptions + HTTP for mutations/queries
- [ ] Types compile with `npx tsc --noEmit`
65 changes: 27 additions & 38 deletions .claude/skills/scaffold-act-app/act-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Do NOT use `z.object({})` — use `ZodEmpty` for consistency and correct validat
.to("fixed-stream-name") // static target
```

## 6. Correlate Before Drain — scheduleDrain() Pattern
## 6. Correlate Before Drain — settle() Pattern

`app.correlate()` scans events, resolves reaction targets, and **registers new streams** with the store via `store().lease()`. Without this step, `drain()` won't find streams to process.

Expand All @@ -134,47 +134,36 @@ await app.drain();
await app.drain(); // returns empty results
```

**scheduleDrain()** — the production pattern for API mutations. Non-blocking, debounced, signals UI only after reactions complete:
**`app.settle()`** — the production pattern for API mutations. Non-blocking, debounced, runs correlate→drain in a loop, emits `"settled"` when the system reaches a consistent state:

```typescript
import { EventEmitter } from "node:events";

// Event bus for SSE — only signals AFTER reactions complete
export const eventBus = new EventEmitter();
eventBus.setMaxListeners(100);

let drainTimer: ReturnType<typeof setTimeout> | null = null;
let draining = false;

async function executeDrain() {
if (draining) return; // skip if already running
draining = true;
try {
for (let i = 0; i < 2; i++) { // 2 iterations for chained reactions
const { leased } = await app.correlate({ after: -1, limit: 100 });
if (leased.length === 0) break;
await app.drain({ streamLimit: 10, eventLimit: 100 });
}
} finally {
draining = false;
}
eventBus.emit("committed"); // signal UI AFTER reactions + projections done
}
// In API mutations — fire-and-forget
await app.do("CreateItem", target, input);
app.settle(); // non-blocking, debounced — UI notified via "settled" event

// Subscribe to settled event for SSE notifications
app.on("settled", (drain) => {
// drain has { fetched, leased, acked, blocked }
// notify SSE clients that the system is consistent
});
```

export function scheduleDrain() {
if (drainTimer) clearTimeout(drainTimer);
drainTimer = setTimeout(() => {
drainTimer = null;
executeDrain().catch(console.error);
}, 10); // 10ms debounce coalesces rapid commits
}
**`settle()` options:**
```typescript
app.settle({
debounceMs: 10, // debounce window (default: 10ms)
correlate: { after: -1, limit: 100 }, // correlate query (default)
maxPasses: 5, // max correlate→drain loops (default: 5)
streamLimit: 10, // passed to drain()
eventLimit: 100, // passed to drain()
});
```

**Key design:**
- **Non-blocking**: `scheduleDrain()` returns immediately — mutations don't wait for drain
- **Debounced**: Multiple rapid `app.do()` calls coalesce into one drain cycle (10ms window)
- **Guarded**: `draining` flag prevents concurrent drain cycles
- **UI signal after completion**: `eventBus.emit("committed")` fires only after all correlate/drain iterations finish, so SSE clients see a consistent view
- **Non-blocking**: `settle()` returns immediately — mutations don't wait for drain
- **Debounced**: Multiple rapid `app.do()` calls coalesce into one settle cycle (10ms window)
- **Guarded**: Internal `_settling` flag prevents concurrent settle cycles
- **Lifecycle event**: `"settled"` fires only after all correlate/drain iterations finish, so SSE clients see a consistent view

**In tests:** Call `correlate()` + `drain()` directly (synchronous, no debounce):
```typescript
Expand All @@ -185,11 +174,11 @@ it("should process reactions", async () => {
});
```

**In API mutations:** Call `scheduleDrain()` and return immediately:
**In API mutations:** Call `settle()` and return immediately:
```typescript
CreateItem: authedProcedure.mutation(async ({ input, ctx }) => {
await app.do("CreateItem", target, input);
scheduleDrain(); // fire-and-forget — UI notified via SSE
app.settle(); // fire-and-forget — UI notified via "settled" event
return { success: true };
});
```
Expand Down
58 changes: 9 additions & 49 deletions .claude/skills/scaffold-act-app/monorepo-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,9 @@ export function verifyPassword(password: string, stored: string): boolean {
}
```

### packages/app/src/api/helpers.ts (debounced drainAll + event bus)
### packages/app/src/api/helpers.ts (event serialization)

```typescript
import { app } from "@my-app/domain";
import { EventEmitter } from "node:events";

export type SerializedEvent = {
id: number;
name: string;
Expand All @@ -340,49 +337,13 @@ export function serializeEvents(events: Array<{ id: number; name: unknown; data:
meta: e.meta as SerializedEvent["meta"],
}));
}

// Event bus for SSE subscriptions — only signals AFTER reactions complete
export const eventBus = new EventEmitter();
eventBus.setMaxListeners(100);

// Debounced, non-blocking drain — coalesces rapid commits into one drain cycle
let drainTimer: ReturnType<typeof setTimeout> | null = null;
let draining = false;

async function executeDrain() {
if (draining) return;
draining = true;
try {
for (let i = 0; i < 2; i++) {
const { leased } = await app.correlate({ after: -1, limit: 100 });
if (leased.length === 0) break;
await app.drain({ streamLimit: 10, eventLimit: 100 });
}
} finally {
draining = false;
}
// Signal UI AFTER all reactions and projections are processed
eventBus.emit("committed");
}

/**
* Schedule a drain cycle (debounced, non-blocking).
* Call this after app.do() — it returns immediately and drains in the background.
* The eventBus "committed" event fires only after reactions complete,
* so SSE clients receive a consistent view of the world.
*/
export function scheduleDrain() {
if (drainTimer) clearTimeout(drainTimer);
drainTimer = setTimeout(() => { drainTimer = null; executeDrain().catch(console.error); }, 10);
}
```

### packages/app/src/api/domain.routes.ts (domain commands + queries)

```typescript
import { app, getItems } from "@my-app/domain";
import { z } from "zod";
import { scheduleDrain } from "./helpers.js";
import { t, authedProcedure, adminProcedure, publicProcedure } from "./trpc.js";

export const domainRouter = t.router({
Expand All @@ -391,7 +352,7 @@ export const domainRouter = t.router({
.mutation(async ({ input, ctx }) => {
const target = { stream: crypto.randomUUID(), actor: ctx.actor };
await app.do("CreateItem", target, input);
scheduleDrain(); // non-blocking — UI notified via SSE after reactions complete
app.settle(); // non-blocking — UI notified via "settled" event after reactions complete
return { success: true, id: target.stream };
}),

Expand All @@ -404,7 +365,7 @@ export const domainRouter = t.router({
```typescript
import { app } from "@my-app/domain";
import { tracked } from "@trpc/server";
import { eventBus, serializeEvents } from "./helpers.js";
import { serializeEvents } from "./helpers.js";
import { t, publicProcedure } from "./trpc.js";

export const eventsRouter = t.router({
Expand All @@ -416,8 +377,8 @@ export const eventsRouter = t.router({

let lastId = existing.length > 0 ? existing[existing.length - 1].id : -1;
let notify: (() => void) | null = null;
const onCommitted = () => { if (notify) { notify(); notify = null; } };
eventBus.on("committed", onCommitted);
const onSettled = () => { if (notify) { notify(); notify = null; } };
app.on("settled", onSettled);

try {
while (!signal?.aborted) {
Expand All @@ -434,7 +395,7 @@ export const eventsRouter = t.router({
}
}
} finally {
eventBus.off("committed", onCommitted);
app.off("settled", onSettled);
}
}),
});
Expand All @@ -447,7 +408,6 @@ import { app, getAllUsers, getUserByEmail, getUserByProviderId, systemActor } fr
import { TRPCError } from "@trpc/server";
import { z } from "zod";
import { hashPassword, signToken, verifyPassword } from "./auth.js";
import { scheduleDrain } from "./helpers.js";
import { t, publicProcedure, authedProcedure, adminProcedure } from "./trpc.js";

export const authRouter = t.router({
Expand All @@ -472,7 +432,7 @@ export const authRouter = t.router({
await app.do("RegisterUser", { stream: input.username, actor: { ...systemActor, name: "AuthSystem" } }, {
email: input.username, name: input.name, provider: "local", providerId: input.username, passwordHash,
});
scheduleDrain();
app.settle();
const user = getUserByEmail(input.username);
if (!user) throw new TRPCError({ code: "INTERNAL_SERVER_ERROR", message: "Failed to register" });
const token = signToken({ email: user.email });
Expand All @@ -486,7 +446,7 @@ export const authRouter = t.router({
.mutation(async ({ input, ctx }) => {
if (!getUserByEmail(input.email)) throw new TRPCError({ code: "NOT_FOUND", message: "User not found" });
await app.do("AssignRole", { stream: input.email, actor: ctx.actor }, { role: input.role });
scheduleDrain();
app.settle();
return { success: true };
}),

Expand Down Expand Up @@ -572,7 +532,7 @@ const server = createHTTPServer({
const port = Number(process.env.PORT) || 4000;
server.listen(port);

await app.drain();
app.settle();
console.log(`Server listening on http://localhost:${port}`);
```

Expand Down
7 changes: 5 additions & 2 deletions .claude/skills/scaffold-act-app/production.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Production Deployment

Covers production-specific concerns beyond what's in [monorepo-template.md](monorepo-template.md). The template already provides `scheduleDrain()`, `eventBus`, auth crypto, `createContext()`, and dev seed scripts.
Covers production-specific concerns beyond what's in [monorepo-template.md](monorepo-template.md). The template already provides `app.settle()`, auth crypto, `createContext()`, and dev seed scripts.

## Switch to PostgreSQL

Expand All @@ -23,7 +23,7 @@ Install: `pnpm -F @my-app/app add @rotorsoft/act-pg`

## Background Correlation (Large-Scale)

For high-throughput deployments, use periodic background correlation instead of (or in addition to) the debounced `scheduleDrain()` from helpers.ts:
For high-throughput deployments, use periodic background correlation instead of (or in addition to) `app.settle()`:

```typescript
// Periodic correlation resolution — discovers new reaction streams every 3s
Expand Down Expand Up @@ -90,6 +90,9 @@ await app.drain({
// Observe all state changes
app.on("committed", (snapshots) => { /* log, metrics */ });

// React when system settles after settle() completes
app.on("settled", (drain) => { /* notify SSE clients, update caches */ });

// Catch reaction failures
app.on("blocked", (leases) => { /* alert on blocked streams */ });

Expand Down
4 changes: 4 additions & 0 deletions AGENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ const snapshot = await app.load(Counter, "counter1");

// Process reactions (event-driven workflows)
await app.drain({ streamLimit: 100, eventLimit: 1000 });

// Debounced correlate→drain for production (non-blocking, emits "settled" when done)
app.settle();
```

### Event Sourcing Model
Expand Down Expand Up @@ -254,6 +257,7 @@ Dynamic stream discovery through correlation metadata:
- Each action/event includes `correlation` (request ID) and `causation` (what triggered it)
- Reactions can discover new streams to process by querying uncommitted events
- `app.correlate()` - Manual correlation
- `app.settle()` - Debounced, non-blocking correlate→drain loop; emits `"settled"` when done
- `app.start_correlations()` - Periodic background correlation

### Invariants
Expand Down
7 changes: 6 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ const snapshot = await app.load(Counter, "counter1");

// Process reactions (event-driven workflows)
await app.drain({ streamLimit: 100, eventLimit: 1000 });

// Debounced correlate→drain for production (non-blocking, emits "settled" when done)
app.settle();
```

### Event Sourcing Model
Expand Down Expand Up @@ -256,9 +259,10 @@ Dynamic stream discovery through correlation metadata:
- Each action/event includes `correlation` (request ID) and `causation` (what triggered it)
- Reactions can discover new streams to process by querying uncommitted events
- `app.correlate()` - Manual correlation (must be called before `drain()` to discover target streams)
- `app.settle()` - Debounced, non-blocking correlate→drain loop; emits `"settled"` when done
- `app.start_correlations()` - Periodic background correlation

**Important:** `correlate()` must be called before `drain()` to register reaction target streams with the store. Without correlation, `drain()` has no streams to process. In tests: `await app.correlate(); await app.drain();`. In production: use `app.on("committed", () => { app.correlate().then(() => app.drain()).catch(console.error); })` or `app.start_correlations()` for background discovery.
**Important:** `correlate()` must be called before `drain()` to register reaction target streams with the store. Without correlation, `drain()` has no streams to process. In tests: `await app.correlate(); await app.drain();`. In production: use `app.settle()` for debounced correlate→drain with a `"settled"` lifecycle event, or `app.start_correlations()` for background discovery.

### Invariants

Expand Down Expand Up @@ -400,6 +404,7 @@ Both stream leasing and version-based optimistic concurrency must be implemented

- Set `LOG_LEVEL=debug` or `LOG_LEVEL=trace` for verbose logging
- Use `app.on("committed", ...)` to observe all state changes
- Use `app.on("settled", ...)` to react when `settle()` completes all correlate/drain passes
- Use `app.on("blocked", ...)` to catch reaction processing failures
- Query events directly: `await app.query_array({ stream: "mystream" })`

Expand Down
13 changes: 12 additions & 1 deletion libs/act/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,23 @@ While events within a stream are processed in order, multiple streams can be pro
The `drain` method processes pending reactions across all subscribed streams:

```typescript
// Process pending reactions
// Process pending reactions (synchronous, single cycle)
await app.drain({ streamLimit: 100, eventLimit: 1000 });

// Debounced correlate→drain for production (non-blocking, emits "settled" when done)
app.settle();

// Subscribe to the "settled" lifecycle event
app.on("settled", (drain) => {
// drain has { fetched, leased, acked, blocked }
// notify SSE clients, update caches, etc.
});
```

Drain cycles continue until all reactions have caught up to the latest events. Consumers only process new work — acknowledged events are skipped, and failed events are re-leased automatically.

The `settle()` method is the recommended production pattern — it debounces rapid commits (10ms default), runs correlate→drain in a loop until the system is consistent, and emits a `"settled"` event when done.

### Real-Time Notifications

When using the PostgreSQL backend, the store emits `NOTIFY` events on each commit, enabling consumers to react immediately via `LISTEN` rather than polling. This reduces latency and unnecessary database queries in production deployments.
Expand Down
Loading