diff --git a/examples/effect-worker-v2/src/jobs/basic-debounce.ts b/examples/effect-worker-v2/src/jobs/basic-debounce.ts index 6345404..2d6c764 100644 --- a/examples/effect-worker-v2/src/jobs/basic-debounce.ts +++ b/examples/effect-worker-v2/src/jobs/basic-debounce.ts @@ -52,7 +52,8 @@ export const debounceExample = Debounce.make({ Effect.gen(function* () { const state = yield* ctx.state; const eventCount = yield* ctx.eventCount; - + yield* Effect.tryPromise(() => fetch("http://localhost:3000/api/health")); + // yield* Effect.fail("Debounce job failed"); yield* Effect.log( `Debounce flushed! Events: ${eventCount}, Last action: ${state?.actionId}, Reason: ${ctx.flushReason}`, ); diff --git a/examples/effect-worker-v2/src/jobs/basic-task.ts b/examples/effect-worker-v2/src/jobs/basic-task.ts index 498199d..e66cf63 100644 --- a/examples/effect-worker-v2/src/jobs/basic-task.ts +++ b/examples/effect-worker-v2/src/jobs/basic-task.ts @@ -27,7 +27,7 @@ export const basicTask = Task.make({ currentRun: 0, }); - yield* ctx.schedule(Date.now() + 5000); + yield* ctx.schedule(Date.now() + 200); } }), diff --git a/examples/effect-worker-v2/src/jobs/heartbeat.ts b/examples/effect-worker-v2/src/jobs/heartbeat.ts index c8dc147..0cdd6ee 100644 --- a/examples/effect-worker-v2/src/jobs/heartbeat.ts +++ b/examples/effect-worker-v2/src/jobs/heartbeat.ts @@ -1,5 +1,5 @@ import { Continuous } from "@durable-effect/jobs"; -import { Effect, Schema } from "effect"; +import { Duration, Effect, Schema } from "effect"; // ============================================================================= // Heartbeat Job - Simple Continuous Job Example @@ -41,11 +41,16 @@ export const heartbeat = Continuous.make({ stateSchema: HeartbeatState, // Run every 10 seconds - schedule: Continuous.every("4 minutes"), + schedule: Continuous.every("4 seconds"), // Start immediately when created (default: true) startImmediately: true, + retry: { + maxAttempts: 3, + delay: Duration.seconds(1), + }, + // The execute function runs on each scheduled tick execute: (ctx) => Effect.gen(function* () { @@ -55,6 +60,7 @@ export const heartbeat = Continuous.make({ yield* Effect.log( `Heartbeat #${ctx.runCount}: ${currentState.name} - count=${currentState.count}`, ); + // yield* Effect.fail("Heartbeat job failed"); // Update state (Effect-based) yield* ctx.updateState((s) => ({ @@ -64,7 +70,7 @@ export const heartbeat = Continuous.make({ })); // Example: auto-terminate after 10 heartbeats - if (currentState.count >= 9) { + if (currentState.count >= 2000) { yield* Effect.log( `Heartbeat ${currentState.name} reached max count, terminating`, ); diff --git a/examples/effect-worker-v2/src/jobs/index.ts b/examples/effect-worker-v2/src/jobs/index.ts index 64ef80f..1b9062b 100644 --- a/examples/effect-worker-v2/src/jobs/index.ts +++ b/examples/effect-worker-v2/src/jobs/index.ts @@ -30,15 +30,24 @@ import { debounceExample } from "./basic-debounce"; * }); * ``` */ -export const { Jobs, JobsClient, registry } = createDurableJobs({ - // Task jobs - basicTask, +export const { Jobs, JobsClient, registry } = createDurableJobs( + { + // Task jobs + basicTask2: basicTask, - // Continuous jobs - heartbeat, - // Debounce job - debounceExample, -}); + // Continuous jobs + heartbeat2: heartbeat, + // Debounce job + debounceExample2: debounceExample, + }, + { + tracker: { + endpoint: "http://localhost:3000/sync", + env: "dev", + serviceKey: "my-service-key", + }, + }, +); // ============================================================================= // Type Exports 
diff --git a/examples/effect-worker-v2/src/routes/jobs/continuous.ts b/examples/effect-worker-v2/src/routes/jobs/continuous.ts index fd81b8d..3e90902 100644 --- a/examples/effect-worker-v2/src/routes/jobs/continuous.ts +++ b/examples/effect-worker-v2/src/routes/jobs/continuous.ts @@ -40,7 +40,7 @@ export const continuousRoutes = HttpRouter.empty.pipe( const body = yield* HttpServerRequest.schemaBodyJson(StartRequest); const now = Date.now(); - const result = yield* client.continuous("heartbeat").start({ + const result = yield* client.continuous("heartbeat2").start({ id: body.id, input: { name: body.name, @@ -69,7 +69,7 @@ export const continuousRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(TerminateRequest); - const result = yield* client.continuous("heartbeat").terminate(body.id, { + const result = yield* client.continuous("heartbeat2").terminate(body.id, { reason: body.reason, }); @@ -91,7 +91,7 @@ export const continuousRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.continuous("heartbeat").trigger(body.id); + const result = yield* client.continuous("heartbeat2").trigger(body.id); return yield* HttpServerResponse.json({ success: true, @@ -112,7 +112,7 @@ export const continuousRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.continuous("heartbeat").status(body.id); + const result = yield* client.continuous("heartbeat2").status(body.id); return yield* HttpServerResponse.json({ success: true, @@ -129,7 +129,7 @@ export const continuousRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.continuous("heartbeat").getState(body.id); + const result = yield* client.continuous("heartbeat2").getState(body.id); return yield* HttpServerResponse.json({ success: true, diff --git a/examples/effect-worker-v2/src/routes/jobs/debounce.ts b/examples/effect-worker-v2/src/routes/jobs/debounce.ts index f5888c6..7f8cbb2 100644 --- a/examples/effect-worker-v2/src/routes/jobs/debounce.ts +++ b/examples/effect-worker-v2/src/routes/jobs/debounce.ts @@ -35,7 +35,7 @@ export const debounceRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(AddEventRequest); - const result = yield* client.debounce("debounceExample").add({ + const result = yield* client.debounce("debounceExample2").add({ id: body.id, event: { actionId: body.actionId, @@ -53,7 +53,7 @@ export const debounceRoutes = HttpRouter.empty.pipe( created: result.created, }, }); - }) + }), ), // POST /debounce/flush - Manually flush the buffer @@ -64,7 +64,7 @@ export const debounceRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.debounce("debounceExample").flush(body.id); + const result = yield* client.debounce("debounceExample2").flush(body.id); return yield* HttpServerResponse.json({ success: true, @@ -74,7 +74,7 @@ export const debounceRoutes = HttpRouter.empty.pipe( reason: result.reason, }, }); - }) + }), ), // POST /debounce/clear - Clear the buffer without flushing @@ -85,7 +85,7 @@ 
export const debounceRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.debounce("debounceExample").clear(body.id); + const result = yield* client.debounce("debounceExample2").clear(body.id); return yield* HttpServerResponse.json({ success: true, @@ -94,7 +94,7 @@ export const debounceRoutes = HttpRouter.empty.pipe( discardedEvents: result.discardedEvents, }, }); - }) + }), ), // POST /debounce/status - Get debounce status @@ -105,13 +105,13 @@ export const debounceRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.debounce("debounceExample").status(body.id); + const result = yield* client.debounce("debounceExample2").status(body.id); return yield* HttpServerResponse.json({ success: true, result, }); - }) + }), ), // POST /debounce/state - Get debounce state @@ -122,7 +122,9 @@ export const debounceRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.debounce("debounceExample").getState(body.id); + const result = yield* client + .debounce("debounceExample2") + .getState(body.id); return yield* HttpServerResponse.json({ success: true, @@ -130,6 +132,6 @@ export const debounceRoutes = HttpRouter.empty.pipe( state: result.state, }, }); - }) - ) + }), + ), ); diff --git a/examples/effect-worker-v2/src/routes/jobs/task.ts b/examples/effect-worker-v2/src/routes/jobs/task.ts index 796c301..3491910 100644 --- a/examples/effect-worker-v2/src/routes/jobs/task.ts +++ b/examples/effect-worker-v2/src/routes/jobs/task.ts @@ -34,7 +34,7 @@ export const taskRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(SendEventRequest); - const result = yield* client.task("basicTask").send({ + const result = yield* client.task("basicTask2").send({ id: body.id, event: { targetRuns: body.targetRuns }, }); @@ -46,7 +46,7 @@ export const taskRoutes = HttpRouter.empty.pipe( scheduledAt: result.scheduledAt, }, }); - }) + }), ), // POST /task/status - Get task status @@ -57,13 +57,13 @@ export const taskRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.task("basicTask").status(body.id); + const result = yield* client.task("basicTask2").status(body.id); return yield* HttpServerResponse.json({ success: true, result, }); - }) + }), ), // POST /task/state - Get task state @@ -74,7 +74,7 @@ export const taskRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.task("basicTask").getState(body.id); + const result = yield* client.task("basicTask2").getState(body.id); return yield* HttpServerResponse.json({ success: true, @@ -84,7 +84,7 @@ export const taskRoutes = HttpRouter.empty.pipe( scheduledAt: result.scheduledAt, }, }); - }) + }), ), // POST /task/terminate - Terminate task @@ -95,7 +95,7 @@ export const taskRoutes = HttpRouter.empty.pipe( const client = JobsClient.fromBinding(env.JOBS); const body = yield* HttpServerRequest.schemaBodyJson(IdRequest); - const result = yield* client.task("basicTask").terminate(body.id); + const 
result = yield* client.task("basicTask2").terminate(body.id); return yield* HttpServerResponse.json({ success: true, @@ -104,6 +104,6 @@ export const taskRoutes = HttpRouter.empty.pipe( terminated: result.terminated, }, }); - }) - ) + }), + ), ); diff --git a/packages/core/package.json b/packages/core/package.json index 3557deb..83a1690 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@durable-effect/core", - "version": "0.0.1-next.13", + "version": "0.0.1-next.14", "type": "module", "main": "./dist/index.js", "types": "./dist/index.d.ts", diff --git a/packages/core/src/events.ts b/packages/core/src/events.ts index 85187e2..ba4de5e 100644 --- a/packages/core/src/events.ts +++ b/packages/core/src/events.ts @@ -55,9 +55,6 @@ const InternalWorkflowBaseFields = { executionId: Schema.optional(Schema.String), }; -/** @deprecated Use InternalWorkflowBaseFields instead */ -const InternalBaseEventFields = InternalWorkflowBaseFields; - export const InternalBaseEventSchema = Schema.Struct(InternalWorkflowBaseFields); export type InternalBaseEvent = Schema.Schema.Type; @@ -78,8 +75,10 @@ const InternalJobBaseFields = { ...SharedBaseFields, /** Event source discriminator */ source: Schema.Literal("job"), - /** Durable Object instance ID */ + /** Durable Object instance ID (format: {type}:{name}:{id}) */ instanceId: Schema.String, + /** User-provided ID for business logic correlation */ + id: Schema.optional(Schema.String), /** Job type discriminator */ jobType: Schema.Literal("continuous", "debounce", "task", "workerPool"), /** Job definition name */ @@ -105,9 +104,6 @@ const WireWorkflowBaseFields = { serviceKey: Schema.String, }; -/** @deprecated Use WireWorkflowBaseFields instead */ -const WireBaseEventFields = WireWorkflowBaseFields; - export const BaseEventSchema = Schema.Struct(WireWorkflowBaseFields); export type BaseEvent = Schema.Schema.Type; @@ -130,7 +126,7 @@ const WireJobBaseFields = { * Emitted when a workflow starts execution. 
*/ export const InternalWorkflowStartedEventSchema = Schema.Struct({ - ...InternalBaseEventFields, + ...InternalWorkflowBaseFields, type: Schema.Literal("workflow.started"), input: Schema.Unknown, }); @@ -140,7 +136,7 @@ export type InternalWorkflowStartedEvent = Schema.Schema.Type; @@ -203,7 +199,7 @@ export type InternalWorkflowResumedEvent = Schema.Schema.Type; export const WorkflowQueuedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("workflow.queued"), input: Schema.Unknown, }); export type WorkflowQueuedEvent = Schema.Schema.Type; export const WorkflowCompletedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("workflow.completed"), completedSteps: Schema.Array(Schema.String), durationMs: Schema.Number, @@ -406,7 +402,7 @@ export const WorkflowCompletedEventSchema = Schema.Struct({ export type WorkflowCompletedEvent = Schema.Schema.Type; export const WorkflowFailedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("workflow.failed"), error: WorkflowErrorSchema, completedSteps: Schema.Array(Schema.String), @@ -414,7 +410,7 @@ export const WorkflowFailedEventSchema = Schema.Struct({ export type WorkflowFailedEvent = Schema.Schema.Type; export const WorkflowPausedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("workflow.paused"), reason: Schema.Literal("sleep", "retry"), resumeAt: Schema.optional(Schema.String), @@ -423,13 +419,13 @@ export const WorkflowPausedEventSchema = Schema.Struct({ export type WorkflowPausedEvent = Schema.Schema.Type; export const WorkflowResumedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("workflow.resumed"), }); export type WorkflowResumedEvent = Schema.Schema.Type; export const WorkflowCancelledEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("workflow.cancelled"), reason: Schema.optional(Schema.String), completedSteps: Schema.Array(Schema.String), @@ -437,7 +433,7 @@ export const WorkflowCancelledEventSchema = Schema.Struct({ export type WorkflowCancelledEvent = Schema.Schema.Type; export const StepStartedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("step.started"), stepName: Schema.String, attempt: Schema.Number, @@ -445,7 +441,7 @@ export const StepStartedEventSchema = Schema.Struct({ export type StepStartedEvent = Schema.Schema.Type; export const StepCompletedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("step.completed"), stepName: Schema.String, attempt: Schema.Number, @@ -455,7 +451,7 @@ export const StepCompletedEventSchema = Schema.Struct({ export type StepCompletedEvent = Schema.Schema.Type; export const StepFailedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("step.failed"), stepName: Schema.String, attempt: Schema.Number, @@ -465,7 +461,7 @@ export const StepFailedEventSchema = Schema.Struct({ export type StepFailedEvent = Schema.Schema.Type; export const RetryScheduledEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("retry.scheduled"), stepName: Schema.String, attempt: Schema.Number, @@ -475,7 +471,7 @@ export const RetryScheduledEventSchema = Schema.Struct({ export type 
RetryScheduledEvent = Schema.Schema.Type; export const RetryExhaustedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("retry.exhausted"), stepName: Schema.String, attempts: Schema.Number, @@ -483,7 +479,7 @@ export const RetryExhaustedEventSchema = Schema.Struct({ export type RetryExhaustedEvent = Schema.Schema.Type; export const SleepStartedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("sleep.started"), durationMs: Schema.Number, resumeAt: Schema.String, @@ -491,14 +487,14 @@ export const SleepStartedEventSchema = Schema.Struct({ export type SleepStartedEvent = Schema.Schema.Type; export const SleepCompletedEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("sleep.completed"), durationMs: Schema.Number, }); export type SleepCompletedEvent = Schema.Schema.Type; export const TimeoutSetEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("timeout.set"), stepName: Schema.String, deadline: Schema.String, @@ -507,7 +503,7 @@ export const TimeoutSetEventSchema = Schema.Struct({ export type TimeoutSetEvent = Schema.Schema.Type; export const TimeoutExceededEventSchema = Schema.Struct({ - ...WireBaseEventFields, + ...WireWorkflowBaseFields, type: Schema.Literal("timeout.exceeded"), stepName: Schema.String, timeoutMs: Schema.Number, @@ -865,11 +861,6 @@ export function createWorkflowBaseEvent( }; } -/** - * Create the base fields for an internal workflow event. - * @deprecated Use createWorkflowBaseEvent instead - */ -export const createBaseEvent = createWorkflowBaseEvent; /** * Create the base fields for an internal job event. @@ -878,11 +869,13 @@ export const createBaseEvent = createWorkflowBaseEvent; * @param instanceId - Durable Object instance ID * @param jobType - The job type (continuous, debounce, task, workerPool) * @param jobName - Job definition name + * @param id - Optional user-provided ID for business logic correlation */ export function createJobBaseEvent( instanceId: string, jobType: JobType, jobName: string, + id?: string, ): InternalJobBaseEvent { return { eventId: uuidv7(), @@ -891,6 +884,7 @@ export function createJobBaseEvent( instanceId, jobType, jobName, + ...(id !== undefined && { id }), }; } @@ -909,11 +903,6 @@ export function enrichWorkflowEvent( }; } -/** - * Enrich an internal workflow event with env and serviceKey for wire transmission. - * @deprecated Use enrichWorkflowEvent instead - */ -export const enrichEvent = enrichWorkflowEvent; /** * Enrich an internal job event with env and serviceKey for wire transmission. 
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 1253ba7..5c48c28 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -4,10 +4,8 @@ export { PauseSignal, StorageError, SchedulerError } from "./errors"; // Event Schemas export { // Helper functions - createBaseEvent, // @deprecated Use createWorkflowBaseEvent createWorkflowBaseEvent, createJobBaseEvent, - enrichEvent, // @deprecated Use enrichWorkflowEvent enrichWorkflowEvent, enrichJobEvent, diff --git a/packages/jobs/src/engine/engine.ts b/packages/jobs/src/engine/engine.ts index 24e32b3..0925f8d 100644 --- a/packages/jobs/src/engine/engine.ts +++ b/packages/jobs/src/engine/engine.ts @@ -60,11 +60,12 @@ export class DurableJobsEngine throw new Error("DurableJobsEngine requires __JOB_REGISTRY__ in env"); } - // Create the runtime with DO state and registry + // Create the runtime with DO state, registry, and optional tracker config // The runtime handles all Effect complexity this.#runtime = createJobsRuntime({ doState: state, registry: env.__JOB_REGISTRY__, + trackerConfig: env.__TRACKER_CONFIG__, }); } @@ -78,13 +79,8 @@ export class DurableJobsEngine * @returns The typed response */ async call(request: JobRequest): Promise { - // Delegate to runtime - const result = await this.#runtime.handle(request); - - // Fire-and-forget event flushing - don't block response - this.ctx.waitUntil(this.#runtime.flush()); - - return result; + // Delegate to runtime (flush happens inside runtime.handle) + return this.#runtime.handle(request); } /** @@ -94,10 +90,7 @@ export class DurableJobsEngine * then delegates to the appropriate handler. */ async alarm(): Promise { - // Delegate to runtime + // Delegate to runtime (flush happens inside runtime.handleAlarm) await this.#runtime.handleAlarm(); - - // Fire-and-forget event flushing - this.ctx.waitUntil(this.#runtime.flush()); } } diff --git a/packages/jobs/src/engine/types.ts b/packages/jobs/src/engine/types.ts index 2651bd0..c4acbbe 100644 --- a/packages/jobs/src/engine/types.ts +++ b/packages/jobs/src/engine/types.ts @@ -1,5 +1,6 @@ // packages/jobs/src/engine/types.ts +import type { HttpBatchTrackerConfig } from "@durable-effect/core"; import type { JobRequest, JobResponse } from "../runtime/types"; import type { RuntimeJobRegistry } from "../registry/typed"; @@ -17,6 +18,12 @@ export interface JobsEngineConfig { * The job registry containing all registered jobs. */ readonly __JOB_REGISTRY__: RuntimeJobRegistry; + + /** + * Optional tracker configuration. + * If provided, events will be sent to the configured endpoint. + */ + readonly __TRACKER_CONFIG__?: HttpBatchTrackerConfig; } // ============================================================================= diff --git a/packages/jobs/src/factory.ts b/packages/jobs/src/factory.ts index 85aeb8f..924f93b 100644 --- a/packages/jobs/src/factory.ts +++ b/packages/jobs/src/factory.ts @@ -2,6 +2,7 @@ import { Context } from "effect"; import { DurableObject } from "cloudflare:workers"; +import type { HttpBatchTrackerConfig } from "@durable-effect/core"; import { DurableJobsEngine, type JobsEngineConfig } from "./engine"; import { createTypedJobRegistry, @@ -16,6 +17,32 @@ import { type JobsClientFactory, } from "./client"; +// ============================================================================= +// Options +// ============================================================================= + +/** + * Options for creating durable jobs. 
+ */ +export interface CreateDurableJobsOptions { + /** + * Optional tracker configuration. + * If provided, job events will be sent to the configured endpoint. + * + * @example + * ```ts + * const { Jobs, JobsClient } = createDurableJobs(definitions, { + * tracker: { + * endpoint: "https://events.example.com/ingest", + * env: "production", + * serviceKey: "my-jobs-service", + * }, + * }); + * ``` + */ + readonly tracker?: HttpBatchTrackerConfig; +} + /** * Result of creating durable jobs. */ @@ -77,6 +104,7 @@ export type InferRegistryFromDefinitions< * - `registry`: The job registry (for advanced use cases) * * @param definitions - Object of job definitions (keys become job names) + * @param options - Optional configuration (tracker, etc.) * * @example * ```ts @@ -100,6 +128,12 @@ export type InferRegistryFromDefinitions< * // Create engine and client - keys become job names * const { Jobs, JobsClient } = createDurableJobs({ * tokenRefresher, + * }, { + * tracker: { + * endpoint: "https://events.example.com/ingest", + * env: "production", + * serviceKey: "my-jobs", + * }, * }); * * // Export for Cloudflare @@ -124,7 +158,8 @@ export type InferRegistryFromDefinitions< export function createDurableJobs< const T extends Record, >( - definitions: T + definitions: T, + options?: CreateDurableJobsOptions ): CreateDurableJobsResult { // Create typed registry from definitions (preserves literal keys) const registry = createTypedJobRegistry(definitions); @@ -135,10 +170,11 @@ export function createDurableJobs< // Create bound DO class with runtime registry injected class BoundJobsEngine extends DurableJobsEngine { constructor(state: DurableObjectState, env: unknown) { - // Inject runtime registry into environment + // Inject runtime registry and tracker config into environment const enrichedEnv: JobsEngineConfig = { ...(env as object), __JOB_REGISTRY__: runtimeRegistry, + __TRACKER_CONFIG__: options?.tracker, }; super(state, enrichedEnv); diff --git a/packages/jobs/src/handlers/continuous/handler.ts b/packages/jobs/src/handlers/continuous/handler.ts index 5d18cdf..3c7e24a 100644 --- a/packages/jobs/src/handlers/continuous/handler.ts +++ b/packages/jobs/src/handlers/continuous/handler.ts @@ -4,8 +4,12 @@ import { Context, Effect, Layer } from "effect"; import { RuntimeAdapter, StorageAdapter, + createJobBaseEvent, + emitEvent, type StorageError, type SchedulerError, + type InternalJobStartedEvent, + type InternalJobTerminatedEvent, } from "@durable-effect/core"; import { MetadataService, type JobStatus } from "../../services/metadata"; import { AlarmService } from "../../services/alarm"; @@ -96,7 +100,8 @@ export const ContinuousHandlerLayer = Layer.effect( const runExecution = ( def: ContinuousDefinition, - runCount: number + runCount: number, + id?: string ): Effect.Effect => execution.execute({ jobType: "continuous", @@ -104,6 +109,7 @@ export const ContinuousHandlerLayer = Layer.effect( schema: def.stateSchema, retryConfig: def.retry, runCount, + id, run: (ctx: ContinuousContext) => def.execute(ctx), createContext: (base) => { const proxyHolder: StateHolder = { @@ -144,8 +150,15 @@ export const ContinuousHandlerLayer = Layer.effect( }; } - yield* metadata.initialize("continuous", request.name); - + yield* metadata.initialize("continuous", request.name, request.id); + + // Emit job.started event + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "continuous", request.name, request.id), + type: "job.started" as const, + input: request.input, + } satisfies InternalJobStartedEvent); 
+ // Initial state set // TODO: ExecutionService handles loading, but here we need to set INITIAL state // We can use execution service to "seed" state? No, we should just use storage directly for init. @@ -164,7 +177,7 @@ export const ContinuousHandlerLayer = Layer.effect( if (def.startImmediately !== false) { const runCount = yield* incrementRunCount(); - const result = yield* runExecution(def, runCount); + const result = yield* runExecution(def, runCount, request.id); if (result.terminated) { // Terminated - CleanupService has already purged everything @@ -213,6 +226,17 @@ export const ContinuousHandlerLayer = Layer.effect( }; } + // Get run count before deletion for event + const runCount = yield* getRunCount(); + + // Emit job.terminated event + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "continuous", existing.name, existing.id), + type: "job.terminated" as const, + reason: request.reason, + runCount, + } satisfies InternalJobTerminatedEvent); + // Cancel alarm yield* alarm.cancel(); @@ -249,7 +273,7 @@ export const ContinuousHandlerLayer = Layer.effect( yield* retryExecutor.reset().pipe(Effect.ignore); const runCount = yield* incrementRunCount(); - const result = yield* runExecution(def, runCount); + const result = yield* runExecution(def, runCount, existing.id); if (result.terminated) { // Terminated - CleanupService has already purged everything @@ -380,7 +404,7 @@ export const ContinuousHandlerLayer = Layer.effect( runCount = yield* incrementRunCount(); } - const result = yield* runExecution(def, runCount); + const result = yield* runExecution(def, runCount, meta.id); if (result.retryScheduled) { return; diff --git a/packages/jobs/src/handlers/debounce/handler.ts b/packages/jobs/src/handlers/debounce/handler.ts index 1836049..be92802 100644 --- a/packages/jobs/src/handlers/debounce/handler.ts +++ b/packages/jobs/src/handlers/debounce/handler.ts @@ -4,8 +4,12 @@ import { Context, Effect, Layer, Schema } from "effect"; import { RuntimeAdapter, StorageAdapter, + createJobBaseEvent, + emitEvent, type StorageError, type SchedulerError, + type InternalDebounceStartedEvent, + type InternalDebounceFlushedEvent, } from "@durable-effect/core"; import { MetadataService } from "../../services/metadata"; import { AlarmService } from "../../services/alarm"; @@ -78,7 +82,8 @@ export const DebounceHandlerLayer = Layer.effect( const runFlush = ( def: DebounceDefinition, - flushReason: "maxEvents" | "flushAfter" | "manual" + flushReason: "maxEvents" | "flushAfter" | "manual", + id?: string ): Effect.Effect => execution.execute({ jobType: "debounce", @@ -86,6 +91,7 @@ export const DebounceHandlerLayer = Layer.effect( schema: def.stateSchema, retryConfig: def.retry, runCount: 0, // Debounce doesn't track runCount persistently in same way + id, run: (ctx: DebounceExecuteContext) => def.execute(ctx), createContext: (base) => { return { @@ -110,7 +116,7 @@ export const DebounceHandlerLayer = Layer.effect( const created = !meta; if (created) { - yield* metadata.initialize("debounce", request.name); + yield* metadata.initialize("debounce", request.name, request.id); yield* metadata.updateStatus("running"); yield* setStartedAt(); yield* setEventCount(0); @@ -149,15 +155,36 @@ export const DebounceHandlerLayer = Layer.effect( if (created) { yield* alarm.schedule(def.flushAfter); + + // Emit debounce.started event for first event + const scheduledAt = yield* alarm.getScheduled(); + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "debounce", request.name, request.id), + 
type: "debounce.started" as const, + flushAt: scheduledAt ? new Date(scheduledAt).toISOString() : new Date().toISOString(), + } satisfies InternalDebounceStartedEvent); } const willFlushAt = yield* alarm.getScheduled(); if (def.maxEvents !== undefined && nextCount >= def.maxEvents) { - // Immediate flush - const result = yield* runFlush(def, "maxEvents"); + // Get startedAt for duration calculation before flush + const startedAt = yield* getStartedAt(); + const durationMs = startedAt ? Date.now() - startedAt : 0; + + // Immediate flush - use request.id since we just initialized or it's the same + const result = yield* runFlush(def, "maxEvents", request.id); if (result.success) { + // Emit debounce.flushed event for maxEvents trigger + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "debounce", def.name, request.id), + type: "debounce.flushed" as const, + eventCount: nextCount, + reason: "maxEvents" as const, + durationMs, + } satisfies InternalDebounceFlushedEvent); + // Success - purge state after flush yield* purge(); } else if (result.terminated) { @@ -209,9 +236,22 @@ export const DebounceHandlerLayer = Layer.effect( }; } - const result = yield* runFlush(def, reason); + // Get startedAt for duration calculation + const startedAt = yield* getStartedAt(); + const durationMs = startedAt ? Date.now() - startedAt : 0; + + const result = yield* runFlush(def, reason, meta.id); if (result.success) { + // Emit debounce.flushed event + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "debounce", def.name, meta.id), + type: "debounce.flushed" as const, + eventCount, + reason: reason === "flushAfter" ? "timeout" : reason, + durationMs, + } satisfies InternalDebounceFlushedEvent); + // Success - purge state after flush yield* purge(); } else if (result.terminated) { diff --git a/packages/jobs/src/handlers/task/handler.ts b/packages/jobs/src/handlers/task/handler.ts index cac0104..c809eb1 100644 --- a/packages/jobs/src/handlers/task/handler.ts +++ b/packages/jobs/src/handlers/task/handler.ts @@ -4,8 +4,11 @@ import { Context, Effect, Layer, Schema } from "effect"; import { RuntimeAdapter, StorageAdapter, + createJobBaseEvent, + emitEvent, type StorageError, type SchedulerError, + type InternalTaskScheduledEvent, } from "@durable-effect/core"; import { MetadataService } from "../../services/metadata"; import { AlarmService } from "../../services/alarm"; @@ -117,6 +120,9 @@ export const TaskHandlerLayer = Layer.effect( const applyScheduleChanges = ( holder: TaskScheduleHolder, + jobName: string, + trigger: "event" | "execute" | "idle" | "error", + id?: string ): Effect.Effect => Effect.gen(function* () { if (!holder.dirty) return; @@ -127,6 +133,14 @@ export const TaskHandlerLayer = Layer.effect( } else if (holder.scheduledAt !== null) { yield* alarm.schedule(holder.scheduledAt); yield* setScheduledAt(holder.scheduledAt); + + // Emit task.scheduled event + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "task", jobName, id), + type: "task.scheduled" as const, + scheduledAt: new Date(holder.scheduledAt).toISOString(), + trigger, + } satisfies InternalTaskScheduledEvent); } }); @@ -134,14 +148,16 @@ export const TaskHandlerLayer = Layer.effect( def: StoredTaskDefinition, runCount: number, triggerType: "execute" | "onEvent", - event?: any - ) => + event?: any, + id?: string + ) => execution.execute({ jobType: "task", jobName: def.name, schema: def.stateSchema, retryConfig: undefined, runCount, + id, allowNullState: true, createContext: (base) => { const 
scheduleHolder: TaskScheduleHolder = { @@ -217,16 +233,19 @@ export const TaskHandlerLayer = Layer.effect( } // Apply schedule changes - yield* applyScheduleChanges(scheduleHolder); - + yield* applyScheduleChanges(scheduleHolder, def.name, triggerType === "onEvent" ? "event" : "execute", id); + // Maybe run onIdle if (def.onIdle) { // Check if scheduled - const scheduled = scheduleHolder.dirty - ? scheduleHolder.scheduledAt + const scheduled = scheduleHolder.dirty + ? scheduleHolder.scheduledAt : (yield* getScheduledAt()); - + if (scheduled === null) { + // Reset dirty flag before onIdle + scheduleHolder.dirty = false; + const idleCtx = createTaskIdleContext( proxyStateHolder, scheduleHolder, @@ -236,13 +255,13 @@ export const TaskHandlerLayer = Layer.effect( () => Effect.succeed(base.getState()), () => getScheduledAt().pipe(Effect.catchAll(() => Effect.succeed(null))) ); - + // onIdle doesn't have standard error handling in definition, it returns Effect // But let's wrap it just in case yield* def.onIdle(idleCtx); - + // Re-apply schedule changes - yield* applyScheduleChanges(scheduleHolder); + yield* applyScheduleChanges(scheduleHolder, def.name, "idle", id); } } }) @@ -258,7 +277,7 @@ export const TaskHandlerLayer = Layer.effect( const now = yield* runtime.now(); if (created) { - yield* metadata.initialize("task", request.name); + yield* metadata.initialize("task", request.name, request.id); yield* metadata.updateStatus("running"); yield* setCreatedAt(now); yield* storage.put(KEYS.TASK.EVENT_COUNT, 0); @@ -277,7 +296,7 @@ export const TaskHandlerLayer = Layer.effect( })) ); - yield* runExecution(def, 0, "onEvent", validatedEvent); + yield* runExecution(def, 0, "onEvent", validatedEvent, request.id); const scheduledAt = yield* getScheduledAt(); @@ -303,7 +322,7 @@ export const TaskHandlerLayer = Layer.effect( } yield* incrementExecuteCount(); - yield* runExecution(def, 0, "execute"); + yield* runExecution(def, 0, "execute", undefined, meta.id); return { _type: "task.trigger" as const, @@ -434,7 +453,7 @@ export const TaskHandlerLayer = Layer.effect( const def = yield* getDefinition(meta.name); yield* incrementExecuteCount(); - yield* runExecution(def, 0, "execute"); + yield* runExecution(def, 0, "execute", undefined, meta.id); }).pipe( Effect.catchTag("StorageError", (e) => Effect.fail( diff --git a/packages/jobs/src/index.ts b/packages/jobs/src/index.ts index 886ed3b..33da39e 100644 --- a/packages/jobs/src/index.ts +++ b/packages/jobs/src/index.ts @@ -25,6 +25,7 @@ export { createDurableJobs, type CreateDurableJobsResult, + type CreateDurableJobsOptions, type InferRegistryFromDefinitions, } from "./factory"; @@ -256,3 +257,24 @@ export { type DebounceResponse, JobHandlersLayer, } from "./handlers"; + +// ============================================================================= +// Tracker (Re-exported from @durable-effect/core) +// ============================================================================= + +export { + // Tracker config + type HttpBatchTrackerConfig, + // Job event types + type InternalJobEvent, + type InternalJobStartedEvent, + type InternalJobExecutedEvent, + type InternalJobFailedEvent, + type InternalJobRetryExhaustedEvent, + type InternalJobTerminatedEvent, + type InternalDebounceStartedEvent, + type InternalDebounceFlushedEvent, + type InternalTaskScheduledEvent, + // Wire event types (with env/serviceKey) + type JobEvent, +} from "@durable-effect/core"; diff --git a/packages/jobs/src/runtime/runtime.ts b/packages/jobs/src/runtime/runtime.ts index 
f3e17a7..163f0d9 100644 --- a/packages/jobs/src/runtime/runtime.ts +++ b/packages/jobs/src/runtime/runtime.ts @@ -1,7 +1,14 @@ // packages/jobs/src/runtime/runtime.ts import { Effect, Layer } from "effect"; -import { createDurableObjectRuntime, flushEvents, type RuntimeLayer } from "@durable-effect/core"; +import { + createDurableObjectRuntime, + flushEvents, + HttpBatchTrackerLayer, + NoopTrackerLayer, + type RuntimeLayer, + type HttpBatchTrackerConfig, +} from "@durable-effect/core"; import { RuntimeServicesLayer, RegistryServiceLayer, JobExecutionServiceLayer, CleanupServiceLayer } from "../services"; import { JobHandlersLayer, RetryExecutorLayer } from "../handlers"; import { Dispatcher, DispatcherLayer } from "./dispatcher"; @@ -26,6 +33,12 @@ export interface JobsRuntimeConfig { * Required for handlers to look up definitions. */ readonly registry: RuntimeJobRegistry; + + /** + * Optional tracker configuration. + * If provided, events will be sent to the configured endpoint. + */ + readonly trackerConfig?: HttpBatchTrackerConfig; } /** @@ -64,13 +77,22 @@ export interface JobsRuntime { */ function createDispatcherLayer( coreLayer: RuntimeLayer, - registry: RuntimeJobRegistry + registry: RuntimeJobRegistry, + trackerConfig?: HttpBatchTrackerConfig ): Layer.Layer { + // Tracker layer (if config provided, otherwise noop) + const trackerLayer = trackerConfig + ? HttpBatchTrackerLayer(trackerConfig) + : NoopTrackerLayer; + + // Base layer with core + tracker + const baseLayer = Layer.provideMerge(coreLayer, trackerLayer); + // Registry layer const registryLayer = RegistryServiceLayer(registry); // Runtime services layer (MetadataService, AlarmService, IdempotencyService) - const servicesLayer = RuntimeServicesLayer.pipe(Layer.provideMerge(coreLayer)); + const servicesLayer = RuntimeServicesLayer.pipe(Layer.provideMerge(baseLayer)); // Cleanup service layer (depends on AlarmService and StorageAdapter) const cleanupLayer = CleanupServiceLayer.pipe(Layer.provideMerge(servicesLayer)); @@ -82,7 +104,7 @@ function createDispatcherLayer( const executionLayer = JobExecutionServiceLayer.pipe( Layer.provideMerge(retryLayer), Layer.provideMerge(cleanupLayer), - Layer.provideMerge(coreLayer) + Layer.provideMerge(baseLayer) ); // Handlers layer (ContinuousHandler, DebounceHandler, etc.) 
@@ -115,7 +137,10 @@ function createRuntimeFromDispatcherLayer( runEffect( Effect.gen(function* () { const dispatcher = yield* Dispatcher; - return yield* dispatcher.handle(request); + const result = yield* dispatcher.handle(request); + // Flush events in the same execution context to preserve tracker instance + yield* flushEvents; + return result; }) ), @@ -124,10 +149,13 @@ function createRuntimeFromDispatcherLayer( Effect.gen(function* () { const dispatcher = yield* Dispatcher; yield* dispatcher.handleAlarm(); + // Flush events in the same execution context to preserve tracker instance + yield* flushEvents; }) ), - flush: () => runEffect(flushEvents), + // Kept for backwards compatibility but no longer needed + flush: () => Effect.runPromise(Effect.void), }; } @@ -175,7 +203,11 @@ export function createJobsRuntime( } const coreLayer = createDurableObjectRuntime(config.doState); - const dispatcherLayer = createDispatcherLayer(coreLayer, config.registry); + const dispatcherLayer = createDispatcherLayer( + coreLayer, + config.registry, + config.trackerConfig + ); return createRuntimeFromDispatcherLayer(dispatcherLayer); } @@ -201,9 +233,10 @@ export function createJobsRuntime( */ export function createJobsRuntimeFromLayer( coreLayer: RuntimeLayer, - registry: RuntimeJobRegistry + registry: RuntimeJobRegistry, + trackerConfig?: HttpBatchTrackerConfig ): JobsRuntime { - const dispatcherLayer = createDispatcherLayer(coreLayer, registry); + const dispatcherLayer = createDispatcherLayer(coreLayer, registry, trackerConfig); return createRuntimeFromDispatcherLayer(dispatcherLayer); } diff --git a/packages/jobs/src/services/execution.ts b/packages/jobs/src/services/execution.ts index 2b93203..9289069 100644 --- a/packages/jobs/src/services/execution.ts +++ b/packages/jobs/src/services/execution.ts @@ -1,7 +1,16 @@ // packages/jobs/src/services/execution.ts import { Context, Effect, Layer, type Schema } from "effect"; -import { RuntimeAdapter, StorageAdapter } from "@durable-effect/core"; +import { + RuntimeAdapter, + StorageAdapter, + createJobBaseEvent, + emitEvent, + type InternalJobExecutedEvent, + type InternalJobFailedEvent, + type InternalJobRetryExhaustedEvent, + type JobType, +} from "@durable-effect/core"; import { RetryExecutor, RetryExhaustedSignal, @@ -42,7 +51,9 @@ export interface OnRetryExhaustedContext { /** Terminate the job - cancel alarm, delete all storage */ readonly terminate: () => Effect.Effect; /** Reschedule execution - reset retry count, try again later */ - readonly reschedule: (delay: import("effect").Duration.DurationInput) => Effect.Effect; + readonly reschedule: ( + delay: import("effect").Duration.DurationInput, + ) => Effect.Effect; /** Internal: track if terminate was called */ readonly _terminated: boolean; /** Internal: track if reschedule was called */ @@ -56,6 +67,8 @@ export interface ExecuteOptions { readonly retryConfig?: JobRetryConfig; readonly runCount?: number; readonly allowNullState?: boolean; + /** User-provided ID for business logic correlation (included in events) */ + readonly id?: string; readonly run: (ctx: Ctx) => Effect.Effect; readonly createContext: (base: ExecutionContextBase) => Ctx; @@ -69,7 +82,7 @@ export interface ExecuteOptions { */ readonly onRetryExhausted?: ( error: E, - ctx: OnRetryExhaustedContext + ctx: OnRetryExhaustedContext, ) => Effect.Effect; } @@ -84,7 +97,7 @@ export interface ExecutionResult { export interface JobExecutionServiceI { readonly execute: ( - options: ExecuteOptions + options: ExecuteOptions, ) => 
Effect.Effect; } @@ -93,7 +106,7 @@ export interface JobExecutionServiceI { // ============================================================================= export class JobExecutionService extends Context.Tag( - "@durable-effect/jobs/JobExecutionService" + "@durable-effect/jobs/JobExecutionService", )() {} // ============================================================================= @@ -109,13 +122,13 @@ export const JobExecutionServiceLayer = Layer.effect( const cleanup = yield* CleanupService; const withStorage = ( - effect: Effect.Effect + effect: Effect.Effect, ): Effect.Effect> => - Effect.provideService(effect, StorageAdapter, storage) as any; + Effect.provideService(effect, StorageAdapter, storage); return { execute: ( - options: ExecuteOptions + options: ExecuteOptions, ): Effect.Effect => Effect.gen(function* () { const { @@ -126,10 +139,14 @@ export const JobExecutionServiceLayer = Layer.effect( run, createContext, onRetryExhausted, + id, } = options; + // Track execution start time for duration calculation + const startTime = Date.now(); + const stateService = yield* withStorage( - createEntityStateService(schema) + createEntityStateService(schema), ); const loadedState = yield* stateService.get().pipe( @@ -141,8 +158,8 @@ export const JobExecutionServiceLayer = Layer.effect( jobName, instanceId: runtime.instanceId, cause: e, - }) - ) + }), + ), ); // If no state and not allowed, treat as already terminated @@ -173,9 +190,9 @@ export const JobExecutionServiceLayer = Layer.effect( } }; - const attempt = yield* retryExecutor.getAttempt().pipe( - Effect.catchAll(() => Effect.succeed(1)) - ); + const attempt = yield* retryExecutor + .getAttempt() + .pipe(Effect.catchAll(() => Effect.succeed(1))); const isRetry = attempt > 1; const ctx = createContext({ @@ -192,11 +209,10 @@ export const JobExecutionServiceLayer = Layer.effect( // Build execution effect with optional retry const executionEffect = retryConfig - ? retryExecutor.executeWithRetry( - executeUserLogic, - retryConfig, - { jobType, jobName } - ) + ? retryExecutor.executeWithRetry(executeUserLogic, retryConfig, { + jobType, + jobName, + }) : executeUserLogic; // Result tracking @@ -223,7 +239,22 @@ export const JobExecutionServiceLayer = Layer.effect( // Handle retry scheduled signal if (error instanceof RetryScheduledSignal) { retryScheduled = true; - return Effect.void; + // Emit job.failed event with willRetry: true + return emitEvent({ + ...createJobBaseEvent( + runtime.instanceId, + jobType as JobType, + jobName, + id, + ), + type: "job.failed" as const, + error: { + message: `Retry scheduled for attempt ${error.attempt + 1}`, + }, + runCount: options.runCount ?? 
0, + attempt: error.attempt, + willRetry: true, + } satisfies InternalJobFailedEvent); } // Handle terminate signal (from ctx.terminate()) @@ -231,7 +262,7 @@ export const JobExecutionServiceLayer = Layer.effect( terminated = true; terminateReason = error.reason; return cleanup.terminate().pipe( - Effect.catchAll(() => Effect.void) // Ignore cleanup errors + Effect.catchAll(() => Effect.void), // Ignore cleanup errors ); } @@ -239,6 +270,19 @@ export const JobExecutionServiceLayer = Layer.effect( if (error instanceof RetryExhaustedSignal) { retryExhausted = true; + // Emit job.retryExhausted event then continue with handler logic + const retryExhaustedEvent = emitEvent({ + ...createJobBaseEvent( + runtime.instanceId, + jobType, + jobName, + id, + ), + type: "job.retryExhausted" as const, + attempts: error.attempts, + reason: "max_attempts" as const, + } satisfies InternalJobRetryExhaustedEvent); + if (onRetryExhausted) { // User has handler - create context and call it const exhaustedCtx: OnRetryExhaustedContext = { @@ -255,9 +299,9 @@ export const JobExecutionServiceLayer = Layer.effect( Effect.sync(() => { (exhaustedCtx as any)._terminated = true; terminated = true; - }) + }), ), - Effect.catchAll(() => Effect.void) + Effect.catchAll(() => Effect.void), ), reschedule: (delay) => Effect.gen(function* () { @@ -265,43 +309,102 @@ export const JobExecutionServiceLayer = Layer.effect( yield* alarm.schedule(delay); (exhaustedCtx as any)._rescheduled = true; rescheduled = true; - }).pipe(Effect.catchAll(() => Effect.void)) as Effect.Effect, + }).pipe( + Effect.catchAll(() => Effect.void), + ) as Effect.Effect, }; - return (onRetryExhausted(error.lastError as E, exhaustedCtx) as Effect.Effect).pipe( - Effect.tap(() => { - // If user didn't take action, leave state intact (paused) - terminated = exhaustedCtx._terminated; - rescheduled = exhaustedCtx._rescheduled; - }), - Effect.catchAll(() => Effect.void) + return retryExhaustedEvent.pipe( + Effect.zipRight( + ( + onRetryExhausted( + error.lastError as E, + exhaustedCtx, + ) as Effect.Effect + ).pipe( + Effect.tap(() => { + // If user didn't take action, leave state intact (paused) + terminated = exhaustedCtx._terminated; + rescheduled = exhaustedCtx._rescheduled; + }), + Effect.catchAll(() => Effect.void), + ), + ), ); } // No handler - default behavior: terminate (purge everything) terminated = true; terminateReason = `Retry exhausted after ${error.attempts} attempts`; - return cleanup.terminate().pipe( - Effect.catchAll(() => Effect.void) + return retryExhaustedEvent.pipe( + Effect.zipRight( + cleanup + .terminate() + .pipe(Effect.catchAll(() => Effect.void)), + ), ); } - // Unknown error - wrap and fail - return Effect.fail(wrapError(error)); - }) + // Unknown/unhandled error - emit job.failed event with willRetry: false + const errorMessage = + error instanceof Error ? error.message : String(error); + const errorStack = + error instanceof Error ? error.stack : undefined; + return emitEvent({ + ...createJobBaseEvent( + runtime.instanceId, + jobType as JobType, + jobName, + id, + ), + type: "job.failed" as const, + error: { + message: errorMessage, + stack: errorStack, + }, + runCount: options.runCount ?? 
0, + attempt, + willRetry: false, + } satisfies InternalJobFailedEvent).pipe( + Effect.zipRight(Effect.fail(wrapError(error))), + ); + }), ); // Determine success - if (!retryScheduled && !terminated && !rescheduled && !retryExhausted) { + if ( + !retryScheduled && + !terminated && + !rescheduled && + !retryExhausted + ) { success = true; + // Emit job.executed event on success + const durationMs = Date.now() - startTime; + yield* emitEvent({ + ...createJobBaseEvent( + runtime.instanceId, + jobType as JobType, + jobName, + id, + ), + type: "job.executed" as const, + runCount: options.runCount ?? 0, + durationMs, + attempt, + } satisfies InternalJobExecutedEvent); } // Save state if modified and not terminated - if (stateHolder.dirty && !terminated && !retryScheduled && stateHolder.current !== null) { - yield* stateService.set(stateHolder.current as S).pipe( - withStorage, - Effect.mapError(wrapError) - ); + if ( + stateHolder.dirty && + !terminated && + !retryScheduled && + stateHolder.current !== null + ) { + yield* stateService + .set(stateHolder.current as S) + .pipe(withStorage, Effect.mapError(wrapError)); } return { @@ -312,7 +415,7 @@ export const JobExecutionServiceLayer = Layer.effect( retryExhausted, terminateReason, }; - }) + }), }; - }) + }), ); diff --git a/packages/jobs/src/services/metadata.ts b/packages/jobs/src/services/metadata.ts index 1229ef9..6ab1ef2 100644 --- a/packages/jobs/src/services/metadata.ts +++ b/packages/jobs/src/services/metadata.ts @@ -35,6 +35,8 @@ export interface JobMetadata { readonly status: JobStatus; readonly createdAt: number; readonly updatedAt: number; + /** User-provided ID for business logic correlation */ + readonly id?: string; /** Reason for stopping/terminating (if applicable) */ readonly stopReason?: string; } @@ -55,10 +57,14 @@ export interface JobMetadata { export interface MetadataServiceI { /** * Initialize metadata for a new job instance. + * @param type - Job type + * @param name - Job definition name + * @param id - Optional user-provided ID for business logic correlation */ readonly initialize: ( type: JobType, - name: string + name: string, + id?: string ) => Effect.Effect; /** @@ -110,7 +116,7 @@ export const MetadataServiceLayer = Layer.effect( const runtime = yield* RuntimeAdapter; return { - initialize: (type: JobType, name: string) => + initialize: (type: JobType, name: string, id?: string) => Effect.gen(function* () { const now = yield* runtime.now(); const metadata: JobMetadata = { @@ -119,6 +125,7 @@ export const MetadataServiceLayer = Layer.effect( status: "initializing", createdAt: now, updatedAt: now, + ...(id !== undefined && { id }), }; yield* storage.put(KEYS.META, metadata); }), diff --git a/reports/060-event-flow-ui-design-guide.md b/reports/060-event-flow-ui-design-guide.md new file mode 100644 index 0000000..72cf9ce --- /dev/null +++ b/reports/060-event-flow-ui-design-guide.md @@ -0,0 +1,920 @@ +# Event Flow & UI Design Guide + +**For Product Engineers designing monitoring/observability UIs** + +This document describes all tracking events emitted by the durable-effect system, their data schemas, and recommended UI patterns for visualizing workflow and job execution. + +--- + +## Table of Contents + +1. [Overview](#overview) +2. [Event Architecture](#event-architecture) +3. [Common Event Fields](#common-event-fields) +4. [Workflow Events](#workflow-events) +5. [Job Events](#job-events) +6. [Event Flow Diagrams](#event-flow-diagrams) +7. [UI Design Recommendations](#ui-design-recommendations) +8. 
[Database Schema Recommendations](#database-schema-recommendations) + +--- + +## Overview + +The system emits two categories of tracking events: + +| Source | Package | Purpose | +|--------|---------|---------| +| `workflow` | `@durable-effect/workflow` | Multi-step, long-running processes with steps, retries, sleeps | +| `job` | `@durable-effect/jobs` | Recurring/scheduled tasks (continuous, debounce, task) | + +Events are batched and sent via HTTP POST to a configured tracking endpoint in the format: + +```json +{ + "events": [ + { /* event 1 */ }, + { /* event 2 */ }, + ... + ] +} +``` + +--- + +## Event Architecture + +### Discriminator Pattern + +All events use a discriminated union pattern: + +```typescript +// Route by source first +if (event.source === "workflow") { + // Then by type: "workflow.started" | "step.completed" | ... +} else if (event.source === "job") { + // Then by type: "job.started" | "debounce.flushed" | ... +} +``` + +### Event ID & Deduplication + +Every event has a globally unique `eventId` (UUIDv7). UUIDv7 is: +- Lexicographically sortable by creation time +- Contains embedded timestamp for efficient time-range queries +- Safe for use as database primary key + +--- + +## Common Event Fields + +### Workflow Events Base Fields + +All workflow events include: + +```typescript +{ + eventId: string; // UUIDv7 - unique event ID + timestamp: string; // ISO 8601 - when event occurred + source: "workflow"; // Discriminator + workflowId: string; // Durable Object ID (unique per instance) + workflowName: string; // Definition name (e.g., "orderProcessing") + executionId?: string; // Optional user-provided correlation ID + env: string; // Environment (e.g., "production") + serviceKey: string; // Service identifier +} +``` + +### Job Events Base Fields + +All job events include: + +```typescript +{ + eventId: string; // UUIDv7 - unique event ID + timestamp: string; // ISO 8601 - when event occurred + source: "job"; // Discriminator + instanceId: string; // Durable Object ID (unique per instance) + id?: string; // User-provided ID for business logic correlation + jobType: "continuous" | "debounce" | "task" | "workerPool"; + jobName: string; // Definition name (e.g., "tokenRefresher") + env: string; // Environment + serviceKey: string; // Service identifier +} +``` + +--- + +## Workflow Events + +### Lifecycle Events + +#### `workflow.started` +**When:** Workflow begins synchronous execution via `run()` + +```typescript +{ + type: "workflow.started", + input: unknown, // The input payload provided to the workflow + // + base fields +} +``` + +**UI:** Show workflow as "Running" with start time. Display input in expandable panel. + +--- + +#### `workflow.queued` +**When:** Workflow queued for async execution via `runAsync()` + +```typescript +{ + type: "workflow.queued", + input: unknown, + // + base fields +} +``` + +**UI:** Show workflow as "Queued" with scheduled badge. + +--- + +#### `workflow.resumed` +**When:** Workflow resumes after a pause (sleep completed or retry ready) + +```typescript +{ + type: "workflow.resumed", + // + base fields +} +``` + +**UI:** Update status from "Paused" to "Running". Add timeline entry. 
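+
+A concrete version of the routing pseudo-code from the Discriminator Pattern section above. This is a minimal consumer-side sketch: the two interfaces are simplified stand-ins for the base fields listed under Common Event Fields, not the wire event types exported by the packages.
+
+```typescript
+// Simplified stand-ins for the workflow/job base fields described above.
+interface WorkflowEventBase {
+  eventId: string;
+  timestamp: string;
+  source: "workflow";
+  workflowId: string;
+  workflowName: string;
+  type: string;
+}
+
+interface JobEventBase {
+  eventId: string;
+  timestamp: string;
+  source: "job";
+  instanceId: string;
+  jobType: "continuous" | "debounce" | "task" | "workerPool";
+  jobName: string;
+  type: string;
+}
+
+type TrackedEvent = WorkflowEventBase | JobEventBase;
+
+// `source` is the discriminator, so a switch narrows the union automatically.
+function routeEvent(event: TrackedEvent): void {
+  switch (event.source) {
+    case "workflow":
+      console.log(`workflow ${event.workflowName} (${event.workflowId}): ${event.type}`);
+      break;
+    case "job":
+      console.log(`${event.jobType} job ${event.jobName} (${event.instanceId}): ${event.type}`);
+      break;
+  }
+}
+```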
+ +--- + +#### `workflow.paused` +**When:** Workflow pauses for sleep or retry backoff + +```typescript +{ + type: "workflow.paused", + reason: "sleep" | "retry", + resumeAt?: string, // ISO timestamp when it will resume + stepName?: string, // Which step caused the pause (for retry) + // + base fields +} +``` + +**UI:** Show workflow as "Paused" with countdown to `resumeAt`. Highlight the step causing the pause. + +--- + +#### `workflow.completed` +**When:** Workflow finishes successfully + +```typescript +{ + type: "workflow.completed", + completedSteps: string[], // Names of all completed steps + durationMs: number, // Total execution time + // + base fields +} +``` + +**UI:** Show workflow as "Completed" (green). Display total duration and step count. + +--- + +#### `workflow.failed` +**When:** Workflow fails permanently (unrecoverable error or retries exhausted) + +```typescript +{ + type: "workflow.failed", + error: { + message: string, + stack?: string, + stepName?: string, // Which step failed + attempt?: number, // Which attempt + }, + completedSteps: string[], + // + base fields +} +``` + +**UI:** Show workflow as "Failed" (red). Highlight failed step. Show error message prominently. + +--- + +#### `workflow.cancelled` +**When:** Workflow manually cancelled + +```typescript +{ + type: "workflow.cancelled", + reason?: string, + completedSteps: string[], + // + base fields +} +``` + +**UI:** Show workflow as "Cancelled" (gray). Show reason if provided. + +--- + +### Step Events + +#### `step.started` +**When:** A step begins execution + +```typescript +{ + type: "step.started", + stepName: string, + attempt: number, // 1 = first attempt, 2 = first retry, etc. + // + base fields +} +``` + +**UI:** Show step as "Running". If `attempt > 1`, show retry badge. + +--- + +#### `step.completed` +**When:** A step completes successfully + +```typescript +{ + type: "step.completed", + stepName: string, + attempt: number, + durationMs: number, + cached: boolean, // True if result came from cache (replay) + // + base fields +} +``` + +**UI:** Show step as "Completed" (green). Show duration. If `cached`, show "Cached" badge (indicates replay). + +--- + +#### `step.failed` +**When:** A step fails + +```typescript +{ + type: "step.failed", + stepName: string, + attempt: number, + error: { + message: string, + stack?: string, + }, + willRetry: boolean, // True if retry will be attempted + // + base fields +} +``` + +**UI:** If `willRetry`, show step as "Retrying" (orange). Otherwise, show as "Failed" (red). + +--- + +### Retry Events + +#### `retry.scheduled` +**When:** A retry is scheduled for a failed step + +```typescript +{ + type: "retry.scheduled", + stepName: string, + attempt: number, // The attempt that failed + nextAttemptAt: string, // When retry will execute + delayMs: number, // Backoff delay + // + base fields +} +``` + +**UI:** Show countdown timer. Display retry strategy info (exponential backoff). + +--- + +#### `retry.exhausted` +**When:** All retry attempts exhausted for a step + +```typescript +{ + type: "retry.exhausted", + stepName: string, + attempts: number, // Total attempts made + // + base fields +} +``` + +**UI:** Show step as "Failed - Retries Exhausted" (red). Show total attempt count. + +--- + +### Sleep Events + +#### `sleep.started` +**When:** Workflow begins a sleep + +```typescript +{ + type: "sleep.started", + durationMs: number, + resumeAt: string, // When sleep ends + // + base fields +} +``` + +**UI:** Show countdown timer. Display as workflow "Sleeping". 
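+
+Taken together, the lifecycle, step, and retry events above are enough to drive the status column recommended for the list views later in this guide. A minimal sketch, assuming the input is a single workflow's events already sorted by `eventId` (UUIDv7 order is chronological):
+
+```typescript
+type WorkflowStatus =
+  | "Queued" | "Running" | "Paused" | "Completed" | "Failed" | "Cancelled";
+
+// Fold over one workflow's events (sorted by eventId) and keep the latest
+// lifecycle status. Step/retry/sleep/timeout events refine the timeline but
+// do not change the top-level status.
+function deriveWorkflowStatus(events: ReadonlyArray<{ type: string }>): WorkflowStatus {
+  let status: WorkflowStatus = "Queued";
+  for (const event of events) {
+    switch (event.type) {
+      case "workflow.queued":    status = "Queued";    break;
+      case "workflow.started":
+      case "workflow.resumed":   status = "Running";   break;
+      case "workflow.paused":    status = "Paused";    break;
+      case "workflow.completed": status = "Completed"; break;
+      case "workflow.failed":    status = "Failed";    break;
+      case "workflow.cancelled": status = "Cancelled"; break;
+    }
+  }
+  return status;
+}
+```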
+ +--- + +#### `sleep.completed` +**When:** Sleep ends and workflow continues + +```typescript +{ + type: "sleep.completed", + durationMs: number, + // + base fields +} +``` + +**UI:** Remove sleep indicator. Resume workflow timeline. + +--- + +### Timeout Events + +#### `timeout.set` +**When:** A step timeout is configured + +```typescript +{ + type: "timeout.set", + stepName: string, + deadline: string, // Absolute deadline timestamp + timeoutMs: number, // Original timeout duration + // + base fields +} +``` + +**UI:** Show timeout indicator on step with countdown. + +--- + +#### `timeout.exceeded` +**When:** A step exceeds its timeout + +```typescript +{ + type: "timeout.exceeded", + stepName: string, + timeoutMs: number, + // + base fields +} +``` + +**UI:** Show step as "Timed Out" (red). This leads to step.failed. + +--- + +## Job Events + +### General Lifecycle Events + +#### `job.started` +**When:** A new job instance is created + +```typescript +{ + type: "job.started", + input?: unknown, // Initial state/input + // + base fields (including jobType, jobName) +} +``` + +**UI:** Show job as "Started". Display job type badge (continuous/debounce/task). + +--- + +#### `job.executed` +**When:** A job execution completes successfully + +```typescript +{ + type: "job.executed", + runCount: number, // Total executions (1-indexed) + durationMs: number, // This execution's duration + attempt: number, // 1 = first attempt (no retries) + // + base fields +} +``` + +**UI:** Increment run counter. Show duration. For continuous jobs, this repeats indefinitely. + +--- + +#### `job.failed` +**When:** A job execution fails + +```typescript +{ + type: "job.failed", + error: { + message: string, + stack?: string, + }, + runCount: number, + attempt: number, + willRetry: boolean, + // + base fields +} +``` + +**UI:** If `willRetry`, show "Retrying". Otherwise, may lead to termination or exhaustion. + +--- + +#### `job.retryExhausted` +**When:** All retry attempts exhausted for a job + +```typescript +{ + type: "job.retryExhausted", + attempts: number, + reason: "max_attempts" | "max_duration", + // + base fields +} +``` + +**UI:** Show "Retries Exhausted" error state. Job may terminate or pause depending on handler. + +--- + +#### `job.terminated` +**When:** A job is terminated (manually or automatically) + +```typescript +{ + type: "job.terminated", + reason?: string, + runCount: number, // Total executions before termination + // + base fields +} +``` + +**UI:** Show job as "Terminated" (gray). Display final run count. + +--- + +### Debounce-Specific Events + +#### `debounce.started` +**When:** First event received for a debounce job + +```typescript +{ + type: "debounce.started", + flushAt: string, // When flush is scheduled + // + base fields (jobType = "debounce") +} +``` + +**UI:** Show "Collecting Events" with countdown to flush. + +--- + +#### `debounce.flushed` +**When:** Debounce job flushes (executes with collected events) + +```typescript +{ + type: "debounce.flushed", + eventCount: number, // Number of events in batch + reason: "timeout" | "maxEvents" | "manual", + durationMs: number, + // + base fields +} +``` + +**UI:** Show "Flushed" with event count and reason. Great for analytics charts. 
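+
+As a sketch of the analytics this event enables, the snippet below aggregates `debounce.flushed` payloads into the flush-reason breakdown and average batch size mentioned in the dashboard recommendations later in this document. The local interface only models the fields used here, and the helper name is illustrative:
+
+```typescript
+// Payload fields of debounce.flushed used for aggregation (base fields omitted).
+interface DebounceFlushedEvent {
+  type: "debounce.flushed";
+  eventCount: number;
+  reason: "timeout" | "maxEvents" | "manual";
+  durationMs: number;
+}
+
+// Reduce a list of flushed events into simple dashboard metrics.
+export function summarizeFlushes(events: DebounceFlushedEvent[]) {
+  const byReason: Record<string, number> = {};
+  let totalEvents = 0;
+
+  for (const event of events) {
+    byReason[event.reason] = (byReason[event.reason] ?? 0) + 1;
+    totalEvents += event.eventCount;
+  }
+
+  return {
+    flushCount: events.length,
+    averageBatchSize: events.length === 0 ? 0 : totalEvents / events.length,
+    flushReasonBreakdown: byReason, // e.g. { timeout: 12, maxEvents: 3 }
+  };
+}
+```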
+ +--- + +### Task-Specific Events + +#### `task.scheduled` +**When:** A task execution is scheduled + +```typescript +{ + type: "task.scheduled", + scheduledAt: string, // When execution will run + trigger: "event" | "execute" | "idle" | "error", + // + base fields (jobType = "task") +} +``` + +**UI:** Show "Scheduled" with countdown. Display trigger type. + +--- + +## Event Flow Diagrams + +### Successful Workflow Execution + +``` +workflow.started + │ + ├── step.started (step1, attempt=1) + │ └── step.completed (step1, cached=false) + │ + ├── step.started (step2, attempt=1) + │ └── step.completed (step2, cached=false) + │ + └── workflow.completed (durationMs=1234) +``` + +### Workflow with Retry + +``` +workflow.started + │ + ├── step.started (paymentStep, attempt=1) + │ ├── step.failed (willRetry=true) + │ └── retry.scheduled (nextAttemptAt=...) + │ + ├── workflow.paused (reason=retry) + │ + ├── workflow.resumed + │ + ├── step.started (paymentStep, attempt=2) + │ └── step.completed + │ + └── workflow.completed +``` + +### Workflow with Sleep + +``` +workflow.started + │ + ├── step.started (processOrder) + │ └── step.completed + │ + ├── sleep.started (durationMs=3600000, resumeAt=...) + │ + ├── workflow.paused (reason=sleep) + │ + ├── workflow.resumed + │ + ├── sleep.completed + │ + ├── step.started (sendConfirmation) + │ └── step.completed + │ + └── workflow.completed +``` + +### Continuous Job Lifecycle + +``` +job.started (jobType=continuous) + │ + ├── job.executed (runCount=1) + │ ... (waits for schedule) + ├── job.executed (runCount=2) + │ ... (waits for schedule) + ├── job.executed (runCount=3) + │ + └── job.terminated (runCount=3, reason="manual") +``` + +### Debounce Job Flow + +``` +debounce.started (flushAt=...) + │ + │ ... (events accumulate) + │ + ├── debounce.flushed (eventCount=5, reason=timeout) + │ + └── job.executed (runCount=1) +``` + +--- + +## UI Design Recommendations + +### 1. Workflow List View + +**Columns:** +- Workflow Name +- Instance ID (truncated) +- Status (Running/Paused/Completed/Failed/Cancelled) +- Progress (e.g., "3/5 steps") +- Duration +- Started At + +**Status Colors:** +- Running: Blue (animated) +- Paused: Orange +- Completed: Green +- Failed: Red +- Cancelled: Gray + +### 2. Workflow Detail View + +**Timeline/Gantt Chart:** +- Horizontal timeline showing step executions +- Color-coded by status +- Hover shows duration and attempt number +- Retry attempts shown as stacked bars + +**Step List:** +- Collapsible step details +- Show error messages inline +- Cache badge for replayed steps + +**Pause Indicators:** +- Visual break in timeline for sleep +- Countdown for scheduled resume + +### 3. Job Dashboard + +**Continuous Jobs:** +- Run counter (total executions) +- Last execution time +- Average duration chart +- Error rate metric + +**Debounce Jobs:** +- Events/flush histogram +- Average batch size +- Flush reason breakdown (timeout vs maxEvents) + +**Task Jobs:** +- Schedule visualization +- Next execution countdown +- Trigger type distribution + +### 4. Real-Time Updates + +Use WebSocket or SSE for live updates: +- Workflow status changes +- Step progress +- Countdown timers for pauses/sleeps + +### 5. 
Error Analysis + +**Error Aggregation:** +- Group by error message +- Show affected workflows/jobs +- Display stack traces in modal + +**Retry Analysis:** +- Retry success rate +- Average attempts before success +- Backoff delay distribution + +--- + +## Database Schema Recommendations + +The schema uses separate tables for workflow and job events, with JSONB payload columns for event-specific data. This design: +- Allows efficient queries within each event source +- Supports full event history with common indexed fields +- Stores event-specific data in flexible JSONB payload + +### Enums + +```sql +-- Workflow event types +CREATE TYPE de_workflow_event_type AS ENUM ( + 'workflow.queued', + 'workflow.started', + 'workflow.paused', + 'workflow.resumed', + 'workflow.completed', + 'workflow.failed', + 'workflow.cancelled', + 'step.started', + 'step.completed', + 'step.failed', + 'sleep.started', + 'sleep.completed', + 'timeout.set', + 'timeout.exceeded', + 'retry.scheduled', + 'retry.exhausted' +); + +-- Job types +CREATE TYPE de_job_type AS ENUM ( + 'continuous', + 'debounce', + 'task', + 'workerPool' +); + +-- Job event types +CREATE TYPE de_job_event_type AS ENUM ( + 'job.started', + 'job.executed', + 'job.failed', + 'job.retryExhausted', + 'job.terminated', + 'debounce.started', + 'debounce.flushed', + 'task.scheduled' +); +``` + +### Workflow Events Table + +```sql +CREATE TABLE de_workflow_events ( + id UUID PRIMARY KEY, -- UUIDv7 from eventId + event_type de_workflow_event_type NOT NULL, + workflow_id TEXT NOT NULL, + workflow_name TEXT NOT NULL, + + -- Environment & Service + env TEXT NOT NULL, + service_key TEXT NOT NULL, + + -- Event timestamp + event_timestamp TIMESTAMPTZ NOT NULL, + + -- Full event payload (event-specific data) + payload JSONB NOT NULL, + + -- Processing metadata + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + processed BOOLEAN NOT NULL DEFAULT FALSE, + processed_at TIMESTAMPTZ, + processing_error TEXT +); + +-- Indexes +CREATE INDEX de_workflow_events_workflow_type_id_idx ON de_workflow_events(workflow_id, event_type, id); +CREATE INDEX de_workflow_events_workflow_id_idx ON de_workflow_events(workflow_id, id); +CREATE INDEX de_workflow_events_env_service_idx ON de_workflow_events(env, service_key); +CREATE INDEX de_workflow_events_workflow_idx ON de_workflow_events(env, service_key, workflow_id, event_timestamp); +CREATE INDEX de_workflow_events_env_type_idx ON de_workflow_events(env, event_type); +CREATE INDEX de_workflow_events_unprocessed_idx ON de_workflow_events(received_at); +``` + +### Job Events Table + +```sql +CREATE TABLE de_job_events ( + id UUID PRIMARY KEY, -- UUIDv7 from eventId + event_type de_job_event_type NOT NULL, + instance_id TEXT NOT NULL, + user_provided_id TEXT, -- User-provided ID for business logic correlation + job_type de_job_type NOT NULL, + job_name TEXT NOT NULL, + + -- Environment & Service + env TEXT NOT NULL, + service_key TEXT NOT NULL, + + -- Event timestamp + event_timestamp TIMESTAMPTZ NOT NULL, + + -- Full event payload (event-specific data) + payload JSONB NOT NULL, + + -- Processing metadata + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + processed BOOLEAN NOT NULL DEFAULT FALSE, + processed_at TIMESTAMPTZ, + processing_error TEXT +); + +-- Indexes +CREATE INDEX de_job_events_instance_type_id_idx ON de_job_events(instance_id, event_type, id); +CREATE INDEX de_job_events_instance_id_idx ON de_job_events(instance_id, id); +CREATE INDEX de_job_events_env_service_idx ON de_job_events(env, service_key); +CREATE INDEX 
de_job_events_job_idx ON de_job_events(env, service_key, instance_id, event_timestamp); +CREATE INDEX de_job_events_env_type_idx ON de_job_events(env, event_type); +CREATE INDEX de_job_events_job_type_idx ON de_job_events(job_type); +CREATE INDEX de_job_events_unprocessed_idx ON de_job_events(received_at); +``` + +### Drizzle ORM Schema (TypeScript) + +The above SQL is generated from this Drizzle schema at `durable-effect-ui/worker/db/schema.ts`: + +```typescript +// Workflow events table +export const workflowEvents = pgTable("de_workflow_events", { + id: uuid("id").primaryKey(), + eventType: workflowEventTypeEnum("event_type").notNull(), + workflowId: text("workflow_id").notNull(), + workflowName: text("workflow_name").notNull(), + env: text("env").notNull(), + serviceKey: text("service_key").notNull(), + eventTimestamp: timestamp("event_timestamp", { withTimezone: true }).notNull(), + payload: jsonb("payload").notNull(), + receivedAt: timestamp("received_at", { withTimezone: true }).notNull().defaultNow(), + processed: boolean("processed").notNull().default(false), + processedAt: timestamp("processed_at", { withTimezone: true }), + processingError: text("processing_error"), +}); + +// Job events table +export const jobEvents = pgTable("de_job_events", { + id: uuid("id").primaryKey(), + eventType: jobEventTypeEnum("event_type").notNull(), + instanceId: text("instance_id").notNull(), + userProvidedId: text("user_provided_id"), // User-provided ID for business logic correlation + jobType: jobTypeEnum("job_type").notNull(), + jobName: text("job_name").notNull(), + env: text("env").notNull(), + serviceKey: text("service_key").notNull(), + eventTimestamp: timestamp("event_timestamp", { withTimezone: true }).notNull(), + payload: jsonb("payload").notNull(), + receivedAt: timestamp("received_at", { withTimezone: true }).notNull().defaultNow(), + processed: boolean("processed").notNull().default(false), + processedAt: timestamp("processed_at", { withTimezone: true }), + processingError: text("processing_error"), +}); +``` + +### Materialized State Tables + +For efficient querying of current workflow/job state, maintain materialized tables updated from events: + +**Workflows Table** (see `de_workflows` in schema.ts): +- Current workflow status, progress, error info +- Updated via event processing + +**Job Instances Table** (derived from job events): +- Current job status, run count, last execution +- Updated via event processing + +--- + +## TypeScript Schema Import + +For validation in your tracking service: + +```typescript +import { + // Combined schema (validates both workflow and job events) + TrackingEventSchema, + type TrackingEvent, + + // Individual schemas if you need to route separately + WorkflowEventSchema, + JobEventSchema, + + // Type helpers + type WorkflowEvent, + type JobEvent, +} from "@durable-effect/core"; + +// Validate incoming batch +const result = Schema.decodeUnknown( + Schema.Array(TrackingEventSchema) +)(body.events); +``` + +--- + +## Summary + +| Event Type | Source | When Emitted | Key Data | +|------------|--------|--------------|----------| +| `workflow.started` | workflow | Sync execution begins | input | +| `workflow.queued` | workflow | Async execution queued | input | +| `workflow.resumed` | workflow | After pause ends | - | +| `workflow.paused` | workflow | Sleep or retry wait | reason, resumeAt | +| `workflow.completed` | workflow | Success | completedSteps, durationMs | +| `workflow.failed` | workflow | Permanent failure | error, completedSteps | +| 
`workflow.cancelled` | workflow | Manual cancel | reason | +| `step.started` | workflow | Step begins | stepName, attempt | +| `step.completed` | workflow | Step succeeds | stepName, durationMs, cached | +| `step.failed` | workflow | Step fails | stepName, error, willRetry | +| `retry.scheduled` | workflow | Retry queued | stepName, nextAttemptAt, delayMs | +| `retry.exhausted` | workflow | No more retries | stepName, attempts | +| `sleep.started` | workflow | Sleep begins | durationMs, resumeAt | +| `sleep.completed` | workflow | Sleep ends | durationMs | +| `timeout.set` | workflow | Timeout configured | stepName, deadline | +| `timeout.exceeded` | workflow | Timeout fired | stepName, timeoutMs | +| `job.started` | job | Job instance created | input, jobType | +| `job.executed` | job | Execution succeeds | runCount, durationMs, attempt | +| `job.failed` | job | Execution fails | error, willRetry | +| `job.retryExhausted` | job | Retries exhausted | attempts, reason | +| `job.terminated` | job | Job ends | reason, runCount | +| `debounce.started` | job | First event received | flushAt | +| `debounce.flushed` | job | Batch executes | eventCount, reason, durationMs | +| `task.scheduled` | job | Execution scheduled | scheduledAt, trigger | diff --git a/reports/061-debounce-maxevents-flush-tracking-gap.md b/reports/061-debounce-maxevents-flush-tracking-gap.md new file mode 100644 index 0000000..3ea036f --- /dev/null +++ b/reports/061-debounce-maxevents-flush-tracking-gap.md @@ -0,0 +1,301 @@ +# Report 061: Debounce maxEvents Flush Tracking Gap + +## Problem Statement + +When a flood of events causes debounce to repeatedly hit `maxEvents`, the `debounce.flushed` event is **never emitted** for these maxEvents-triggered flushes. The tracker's `flushEvents` is called after each handle, but the tracking events themselves are incomplete - only `job.executed` is emitted, not `debounce.flushed`. + +This makes tracking/reporting incorrect because: +1. The UI/analytics cannot distinguish between timeout flushes and maxEvents flushes +2. The `debounce.flushed` event with `eventCount`, `reason`, and `durationMs` is never recorded for maxEvents triggers +3. The event flow diagram shows incomplete data for rapid debounce scenarios + +## Root Cause Analysis + +### Code Flow Comparison + +**When flush is triggered by `flushAfter` timeout (alarm) or manual flush:** + +``` +handleAlarm() / handle("flush") + → handleFlush(def, reason) + → runFlush(def, reason, meta.id) + → execution.execute() → emits job.executed ✓ + → emitEvent(debounce.flushed) ✓ + → purge() +``` + +**When flush is triggered by `maxEvents` in handleAdd:** + +``` +handle("add") + → handleAdd(def, request) + → runFlush(def, "maxEvents", request.id) // Direct call! + → execution.execute() → emits job.executed ✓ + → purge() + // NO debounce.flushed event emitted! ✗ +``` + +### The Gap + +In `handler.ts` lines 170-188, when maxEvents is reached: + +```typescript +if (def.maxEvents !== undefined && nextCount >= def.maxEvents) { + // Immediate flush - use request.id since we just initialized or it's the same + const result = yield* runFlush(def, "maxEvents", request.id); + + if (result.success) { + // Success - purge state after flush + yield* purge(); + } else if (result.terminated) { + // Terminated by CleanupService - already purged + } + + return { /* response */ }; +} +``` + +Notice that `handleAdd` calls `runFlush` directly instead of `handleFlush`. 
The `debounce.flushed` event is only emitted in `handleFlush` (lines 232-240): + +```typescript +if (result.success) { + // Emit debounce.flushed event + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "debounce", def.name, meta.id), + type: "debounce.flushed" as const, + eventCount, + reason: reason === "flushAfter" ? "timeout" : reason, + durationMs, + } satisfies InternalDebounceFlushedEvent); + + // Success - purge state after flush + yield* purge(); +} +``` + +## Event Flow Diagrams + +### Current (Broken) Flow with maxEvents + +``` +debounce.started (flushAt=...) + │ + │ ... (events 1-9 accumulate) + │ + └── job.executed (runCount=1) ← NO debounce.flushed event! + │ + └── (purge) + +debounce.started (flushAt=...) ← New cycle starts + │ + │ ... (events 11-19 accumulate) + │ + └── job.executed (runCount=1) ← NO debounce.flushed event! + │ + └── (purge) + +... continues for each batch of maxEvents ... +``` + +### Expected Flow with maxEvents + +``` +debounce.started (flushAt=...) + │ + │ ... (events 1-10 accumulate) + │ + ├── debounce.flushed (eventCount=10, reason=maxEvents, durationMs=123) + │ + └── job.executed (runCount=1) + │ + └── (purge) + +debounce.started (flushAt=...) ← New cycle starts + │ + │ ... (events 11-20 accumulate) + │ + ├── debounce.flushed (eventCount=10, reason=maxEvents, durationMs=456) + │ + └── job.executed (runCount=1) + │ + └── (purge) +``` + +## Impact + +1. **Analytics Gaps**: Cannot track maxEvents-triggered flushes in dashboards +2. **Incorrect Metrics**: Flush reason distribution is skewed (only shows timeout/manual) +3. **Missing Event Counts**: The `eventCount` for maxEvents batches is never recorded +4. **Duration Tracking**: `durationMs` for debounce collection period is not captured + +## Recommended Fix + +### Option A: Emit debounce.flushed in handleAdd (Minimal Change) + +Add the `debounce.flushed` event emission to `handleAdd` when maxEvents triggers: + +```typescript +// In handleAdd, after runFlush for maxEvents +if (def.maxEvents !== undefined && nextCount >= def.maxEvents) { + // Get startedAt for duration calculation + const startedAt = yield* getStartedAt(); + const durationMs = startedAt ? Date.now() - startedAt : 0; + + const result = yield* runFlush(def, "maxEvents", request.id); + + if (result.success) { + // Emit debounce.flushed event for maxEvents trigger + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "debounce", def.name, request.id), + type: "debounce.flushed" as const, + eventCount: nextCount, + reason: "maxEvents" as const, + durationMs, + } satisfies InternalDebounceFlushedEvent); + + // Success - purge state after flush + yield* purge(); + } + // ... +} +``` + +**Pros:** +- Minimal code change +- Direct fix for the issue +- No changes to existing handleFlush logic + +**Cons:** +- Duplicates event emission logic between handleAdd and handleFlush +- eventCount calculation slightly different (nextCount vs getEventCount) + +### Option B: Route maxEvents through handleFlush (DRY) + +Modify `handleAdd` to call `handleFlush` instead of `runFlush` directly: + +```typescript +if (def.maxEvents !== undefined && nextCount >= def.maxEvents) { + // Use handleFlush for consistency - it emits debounce.flushed + const result = yield* handleFlush(def, "maxEvents"); + + return { + _type: "debounce.add" as const, + instanceId: runtime.instanceId, + eventCount: nextCount, + willFlushAt: result.retryScheduled ? ((yield* alarm.getScheduled()) ?? 
null) : null, + created, + retryScheduled: result.retryScheduled, + }; +} +``` + +**Pros:** +- DRY - single point of event emission +- Consistent behavior between all flush triggers +- handleFlush already handles all edge cases + +**Cons:** +- handleFlush returns DebounceResponse type, need to transform +- handleFlush fetches eventCount/startedAt again (minor perf impact) +- handleFlush requires metadata to exist (should be fine here) + +### Option C: Extract shared flush logic (Most Robust) + +Create a shared internal function for the flush-and-emit logic: + +```typescript +const executeFlushWithTracking = ( + def: DebounceDefinition, + reason: "maxEvents" | "flushAfter" | "manual", + id?: string +): Effect.Effect => + Effect.gen(function* () { + const eventCount = yield* getEventCount(); + const startedAt = yield* getStartedAt(); + const durationMs = startedAt ? Date.now() - startedAt : 0; + + const result = yield* runFlush(def, reason, id); + + if (result.success) { + yield* emitEvent({ + ...createJobBaseEvent(runtime.instanceId, "debounce", def.name, id), + type: "debounce.flushed" as const, + eventCount, + reason: reason === "flushAfter" ? "timeout" : reason, + durationMs, + } satisfies InternalDebounceFlushedEvent); + } + + return result; + }); +``` + +Then use `executeFlushWithTracking` in both `handleAdd` (for maxEvents) and `handleFlush`. + +**Pros:** +- Single source of truth for flush tracking +- Easy to test +- Clear separation of concerns + +**Cons:** +- Requires refactoring handleFlush +- More code changes + +## Recommendation + +**Option A** is recommended for immediate fix as it has minimal blast radius and directly addresses the issue. The duplication is acceptable given the localized nature of the fix. + +**Option C** should be considered for a future cleanup pass to consolidate the flush logic. + +## Testing Verification + +Add a test case that verifies `debounce.flushed` is emitted for maxEvents: + +```typescript +it("emits debounce.flushed when maxEvents triggers flush", async () => { + const { layer: trackerLayer, handle } = await createTestTrackerLayer(); + + // Add maxEvents events + for (let i = 0; i < 10; i++) { + await runtime.handle({ type: "debounce", action: "add", name: "test", event: { id: i } }); + } + + const events = await handle.getEvents(); + + // Should have debounce.flushed with reason=maxEvents + expect(events).toContainEqual( + expect.objectContaining({ + type: "debounce.flushed", + reason: "maxEvents", + eventCount: 10, + }) + ); +}); +``` + +## Additional Considerations + +### Tracker Flush Timing + +The original concern about "flush is not being run until the flood of events finishes" may also be related to HTTP batching. The http-batch tracker sends events when: +1. Buffer reaches `batchSize` (default 100) +2. `flush()` is explicitly called + +Each `runtime.handle()` call does call `flushEvents` at the end. However, if the HTTP POST fails silently (errors are caught and ignored per line 150 in http-batch.ts), events could be lost during rapid floods. + +This is a separate concern from the missing `debounce.flushed` event and should be addressed if event loss is observed. + +### Event Ordering + +With the fix, events will be emitted in this order for maxEvents flush: +1. `job.executed` (from execution.execute) +2. `debounce.flushed` (new, after execution) + +This ordering matches the handleFlush behavior (execute first, then emit flush event). 
+ +## Files to Modify + +| File | Change | +|------|--------| +| `packages/jobs/src/handlers/debounce/handler.ts` | Add debounce.flushed emission in handleAdd for maxEvents | +| `packages/jobs/test/debounce.test.ts` | Add test for maxEvents tracking |