From 351e08fdc8b81afeac332e89bc621141d26cd383 Mon Sep 17 00:00:00 2001 From: Matthew Sessions Date: Tue, 6 Jan 2026 10:23:56 -0700 Subject: [PATCH] added state to emmit --- packages/core/package.json | 2 +- packages/core/src/events.ts | 6 + packages/jobs/package.json | 3 +- packages/jobs/src/services/execution.ts | 12 + .../jobs/test/handlers/continuous.test.ts | 163 ++++++-- packages/jobs/test/handlers/tracking.test.ts | 375 ++++++++++++++++++ packages/jobs/test/registry.test.ts | 13 +- packages/jobs/test/services/metadata.test.ts | 6 +- packages/jobs/tsconfig.test.json | 8 + 9 files changed, 537 insertions(+), 51 deletions(-) create mode 100644 packages/jobs/test/handlers/tracking.test.ts create mode 100644 packages/jobs/tsconfig.test.json diff --git a/packages/core/package.json b/packages/core/package.json index 83a1690..d889cf8 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@durable-effect/core", - "version": "0.0.1-next.14", + "version": "0.0.1-next.15", "type": "module", "main": "./dist/index.js", "types": "./dist/index.d.ts", diff --git a/packages/core/src/events.ts b/packages/core/src/events.ts index ba4de5e..dc01ac0 100644 --- a/packages/core/src/events.ts +++ b/packages/core/src/events.ts @@ -588,6 +588,8 @@ export const InternalJobExecutedEventSchema = Schema.Struct({ durationMs: Schema.Number, /** Current retry attempt (1 = first attempt) */ attempt: Schema.Number, + /** State snapshot before execution (for tracking/debugging) */ + preExecutionState: Schema.optional(Schema.Unknown), }); export type InternalJobExecutedEvent = Schema.Schema.Type; @@ -604,6 +606,8 @@ export const InternalJobFailedEventSchema = Schema.Struct({ attempt: Schema.Number, /** Whether a retry will be attempted */ willRetry: Schema.Boolean, + /** State snapshot before execution (for tracking/debugging) */ + preExecutionState: Schema.optional(Schema.Unknown), }); export type InternalJobFailedEvent = Schema.Schema.Type; @@ -728,6 +732,7 @@ export const JobExecutedEventSchema = Schema.Struct({ runCount: Schema.Number, durationMs: Schema.Number, attempt: Schema.Number, + preExecutionState: Schema.optional(Schema.Unknown), }); export type JobExecutedEvent = Schema.Schema.Type; @@ -738,6 +743,7 @@ export const JobFailedEventSchema = Schema.Struct({ runCount: Schema.Number, attempt: Schema.Number, willRetry: Schema.Boolean, + preExecutionState: Schema.optional(Schema.Unknown), }); export type JobFailedEvent = Schema.Schema.Type; diff --git a/packages/jobs/package.json b/packages/jobs/package.json index 67a5fc9..40cc43e 100644 --- a/packages/jobs/package.json +++ b/packages/jobs/package.json @@ -1,6 +1,6 @@ { "name": "@durable-effect/jobs", - "version": "0.0.1-next.6", + "version": "0.0.1-next.7", "type": "module", "main": "./dist/index.js", "types": "./dist/index.d.ts", @@ -17,6 +17,7 @@ "build": "tsc", "clean": "rm -rf dist", "typecheck": "tsc --noEmit", + "typecheck:test": "tsc --noEmit --project tsconfig.test.json", "test": "vitest run", "test:watch": "vitest" }, diff --git a/packages/jobs/src/services/execution.ts b/packages/jobs/src/services/execution.ts index 3ec3928..3fb97e4 100644 --- a/packages/jobs/src/services/execution.ts +++ b/packages/jobs/src/services/execution.ts @@ -45,6 +45,8 @@ export interface ExecuteOptions { readonly allowNullState?: boolean; /** User-provided ID for business logic correlation (included in events) */ readonly id?: string; + /** Include pre-execution state in tracking events (default: true) */ + readonly includeStateInEvents?: boolean; readonly run: (ctx: Ctx) => Effect.Effect; readonly createContext: (base: ExecutionContextBase) => Ctx; @@ -104,6 +106,7 @@ export const JobExecutionServiceLayer = Layer.effect( run, createContext, id, + includeStateInEvents = true, } = options; // Track execution start time for duration calculation @@ -137,6 +140,12 @@ export const JobExecutionServiceLayer = Layer.effect( }; } + // Capture pre-execution state snapshot for tracking events + // This is the state BEFORE user code runs + const preExecutionState = includeStateInEvents + ? loadedState + : undefined; + const stateHolder = { current: loadedState as S | null, dirty: false, @@ -220,6 +229,7 @@ export const JobExecutionServiceLayer = Layer.effect( runCount: options.runCount ?? 0, attempt: failedAttempt, willRetry: true, + ...(preExecutionState !== undefined && { preExecutionState }), } satisfies InternalJobFailedEvent); } @@ -281,6 +291,7 @@ export const JobExecutionServiceLayer = Layer.effect( runCount: options.runCount ?? 0, attempt, willRetry: false, + ...(preExecutionState !== undefined && { preExecutionState }), } satisfies InternalJobFailedEvent).pipe( Effect.zipRight(Effect.fail(wrapError(error))), ); @@ -308,6 +319,7 @@ export const JobExecutionServiceLayer = Layer.effect( runCount: options.runCount ?? 0, durationMs, attempt, + ...(preExecutionState !== undefined && { preExecutionState }), } satisfies InternalJobExecutedEvent); } diff --git a/packages/jobs/test/handlers/continuous.test.ts b/packages/jobs/test/handlers/continuous.test.ts index 5623b46..797c35b 100644 --- a/packages/jobs/test/handlers/continuous.test.ts +++ b/packages/jobs/test/handlers/continuous.test.ts @@ -108,8 +108,34 @@ const createTestRegistry = (): RuntimeJobRegistry => ({ } as Record, debounce: {} as Record, workerPool: {} as Record, + task: {} as Record, }); +// ============================================================================= +// Test Helpers +// ============================================================================= + +// Helper to run Effect with layer, bypassing strict R parameter checking +// This is needed because the test registry uses `as Record` which +// causes `any` to leak into Effect R parameters +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const runWithLayer = ( + effect: Effect.Effect, + layer: Layer.Layer +): Promise => + Effect.runPromise( + effect.pipe(Effect.provide(layer)) as Effect.Effect + ); + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const runExitWithLayer = ( + effect: Effect.Effect, + layer: Layer.Layer +) => + Effect.runPromiseExit( + effect.pipe(Effect.provide(layer)) as Effect.Effect + ); + // ============================================================================= // Test Setup // ============================================================================= @@ -148,7 +174,7 @@ const createTestLayer = (initialTime = 1000000) => { Layer.provideMerge(servicesLayer), Layer.provideMerge(retryLayer), Layer.provideMerge(executionLayer) - ); + ) as Layer.Layer; return { layer: handlerLayer, time, handles, coreLayer }; }; @@ -168,16 +194,18 @@ describe("ContinuousHandler", () => { it("creates a new instance and executes immediately by default", async () => { const { layer } = createTestLayer(1000000); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; return yield* handler.handle({ type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.start"); @@ -196,7 +224,7 @@ describe("ContinuousHandler", () => { it("returns existing instance if already started", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -205,6 +233,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -213,9 +242,11 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 100, lastRun: null }, }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.start"); @@ -229,16 +260,18 @@ describe("ContinuousHandler", () => { it("respects startImmediately: false", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; return yield* handler.handle({ type: "continuous", action: "start", name: "no-immediate", + id: "no-immediate-1", input: { count: 0, lastRun: null }, }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.start"); @@ -253,7 +286,7 @@ describe("ContinuousHandler", () => { it("terminates a running instance and deletes all storage", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -262,6 +295,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -270,6 +304,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "terminate", name: "counter", + id: "counter-1", reason: "user requested", }); @@ -278,10 +313,12 @@ describe("ContinuousHandler", () => { type: "continuous", action: "status", name: "counter", + id: "counter-1", }); return { terminateResult, statusResult }; - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result.terminateResult._type).toBe("continuous.terminate"); @@ -295,16 +332,18 @@ describe("ContinuousHandler", () => { it("returns not_found for non-existent instance", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; return yield* handler.handle({ type: "continuous", action: "terminate", name: "counter", + id: "counter-1", reason: "test", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.terminate"); @@ -315,7 +354,7 @@ describe("ContinuousHandler", () => { it("returns not_found when called twice (since first terminate deletes all storage)", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -324,6 +363,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -332,6 +372,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "terminate", name: "counter", + id: "counter-1", reason: "first terminate", }); @@ -340,9 +381,11 @@ describe("ContinuousHandler", () => { type: "continuous", action: "terminate", name: "counter", + id: "counter-1", reason: "second terminate", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.terminate"); @@ -355,7 +398,7 @@ describe("ContinuousHandler", () => { it("triggers immediate execution", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -364,6 +407,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -372,8 +416,10 @@ describe("ContinuousHandler", () => { type: "continuous", action: "trigger", name: "counter", + id: "counter-1", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.trigger"); @@ -387,15 +433,17 @@ describe("ContinuousHandler", () => { it("returns triggered: false for non-existent instance", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; return yield* handler.handle({ type: "continuous", action: "trigger", name: "counter", + id: "counter-1", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.trigger"); @@ -405,7 +453,7 @@ describe("ContinuousHandler", () => { it("returns triggered: false for terminated instance", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -413,6 +461,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -420,6 +469,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "terminate", name: "counter", + id: "counter-1", reason: "terminating", }); @@ -427,8 +477,10 @@ describe("ContinuousHandler", () => { type: "continuous", action: "trigger", name: "counter", + id: "counter-1", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.trigger"); @@ -440,7 +492,7 @@ describe("ContinuousHandler", () => { it("returns status for running instance", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -448,6 +500,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -455,8 +508,10 @@ describe("ContinuousHandler", () => { type: "continuous", action: "status", name: "counter", + id: "counter-1", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.status"); @@ -467,15 +522,17 @@ describe("ContinuousHandler", () => { it("returns not_found for non-existent instance", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; return yield* handler.handle({ type: "continuous", action: "status", name: "counter", + id: "counter-1", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.status"); @@ -487,7 +544,7 @@ describe("ContinuousHandler", () => { it("returns current state", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -495,6 +552,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 5, lastRun: null }, }); @@ -502,8 +560,10 @@ describe("ContinuousHandler", () => { type: "continuous", action: "getState", name: "counter", + id: "counter-1", }); - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.getState"); @@ -516,7 +576,7 @@ describe("ContinuousHandler", () => { it("executes on alarm and schedules next", async () => { const { layer, time, handles } = createTestLayer(1000000); - await Effect.runPromise( + await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -525,6 +585,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -534,7 +595,8 @@ describe("ContinuousHandler", () => { // Advance time and simulate alarm time.advance(Duration.toMillis("30 minutes")); yield* handler.handleAlarm(); - }).pipe(Effect.provide(layer)) + }), + layer ); // Verify execution happened @@ -549,7 +611,7 @@ describe("ContinuousHandler", () => { it("does nothing when terminated", async () => { const { layer, time } = createTestLayer(); - await Effect.runPromise( + await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -557,6 +619,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "counter", + id: "counter-1", input: { count: 0, lastRun: null }, }); @@ -564,6 +627,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "terminate", name: "counter", + id: "counter-1", reason: "terminating", }); @@ -571,7 +635,8 @@ describe("ContinuousHandler", () => { time.advance(Duration.toMillis("30 minutes")); yield* handler.handleAlarm(); - }).pipe(Effect.provide(layer)) + }), + layer ); // No execution should happen @@ -583,7 +648,7 @@ describe("ContinuousHandler", () => { it("fails with ExecutionError when execute fails without retry config", async () => { const { layer } = createTestLayer(); - const resultExit = await Effect.runPromiseExit( + const resultExit = await runExitWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -591,9 +656,11 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "failing", + id: "failing-1", input: { count: 0, lastRun: null }, }); - }).pipe(Effect.provide(layer)) + }), + layer ); // Without onError or retry, the job should fail with an error @@ -605,7 +672,7 @@ describe("ContinuousHandler", () => { it("terminates on first run when condition met (purges state by default)", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -614,6 +681,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "terminating", + id: "terminating-1", input: { maxRuns: 1, currentRun: 0 }, }); @@ -622,10 +690,12 @@ describe("ContinuousHandler", () => { type: "continuous", action: "status", name: "terminating", + id: "terminating-1", }); return { startResult, statusResult }; - }).pipe(Effect.provide(layer)) + }), + layer ); // Start should return terminated status @@ -645,7 +715,7 @@ describe("ContinuousHandler", () => { it("terminates during alarm and stops further alarms", async () => { const { layer, time, handles } = createTestLayer(1000000); - await Effect.runPromise( + await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -654,6 +724,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "terminating", + id: "terminating-1", input: { maxRuns: 2, currentRun: 0 }, }); @@ -670,11 +741,13 @@ describe("ContinuousHandler", () => { type: "continuous", action: "status", name: "terminating", + id: "terminating-1", }); // After purge, job is completely deleted expect((status as any).status).toBe("not_found"); - }).pipe(Effect.provide(layer)) + }), + layer ); // Execution happened once (during alarm) @@ -692,7 +765,7 @@ describe("ContinuousHandler", () => { it("trigger action returns terminated: true when terminate called", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -701,6 +774,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "terminating", + id: "terminating-1", input: { maxRuns: 2, currentRun: 0 }, }); @@ -709,10 +783,12 @@ describe("ContinuousHandler", () => { type: "continuous", action: "trigger", name: "terminating", + id: "terminating-1", }); return triggerResult; - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.trigger"); @@ -723,7 +799,7 @@ describe("ContinuousHandler", () => { it("trigger action returns triggered: false for terminated instance", async () => { const { layer } = createTestLayer(); - const result = await Effect.runPromise( + const result = await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -732,6 +808,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "terminating", + id: "terminating-1", input: { maxRuns: 1, currentRun: 0 }, }); @@ -740,10 +817,12 @@ describe("ContinuousHandler", () => { type: "continuous", action: "trigger", name: "terminating", + id: "terminating-1", }); return triggerResult; - }).pipe(Effect.provide(layer)) + }), + layer ); expect(result._type).toBe("continuous.trigger"); @@ -753,7 +832,7 @@ describe("ContinuousHandler", () => { it("handleAlarm does nothing for terminated instance", async () => { const { layer, time } = createTestLayer(); - await Effect.runPromise( + await runWithLayer( Effect.gen(function* () { const handler = yield* ContinuousHandler; @@ -762,6 +841,7 @@ describe("ContinuousHandler", () => { type: "continuous", action: "start", name: "terminating", + id: "terminating-1", input: { maxRuns: 1, currentRun: 0 }, }); @@ -772,7 +852,8 @@ describe("ContinuousHandler", () => { // Try to trigger alarm on terminated instance time.advance(Duration.toMillis("10 minutes")); yield* handler.handleAlarm(); - }).pipe(Effect.provide(layer)) + }), + layer ); // No execution should happen diff --git a/packages/jobs/test/handlers/tracking.test.ts b/packages/jobs/test/handlers/tracking.test.ts new file mode 100644 index 0000000..bf46bdc --- /dev/null +++ b/packages/jobs/test/handlers/tracking.test.ts @@ -0,0 +1,375 @@ +// packages/jobs/test/handlers/tracking.test.ts + +import { describe, it, expect, beforeEach } from "vitest"; +import { Effect, Layer, Schema } from "effect"; +import { + createTestRuntime, + createInMemoryTracker, + EventTracker, + type EventTrackerService, + type InternalJobExecutedEvent, + type InternalJobFailedEvent, + type InternalTrackingEvent, +} from "@durable-effect/core"; +import { + ContinuousHandler, + ContinuousHandlerLayer, +} from "../../src/handlers/continuous"; +import { MetadataServiceLayer } from "../../src/services/metadata"; +import { AlarmServiceLayer } from "../../src/services/alarm"; +import { RegistryServiceLayer } from "../../src/services/registry"; +import { JobExecutionServiceLayer } from "../../src/services/execution"; +import { CleanupServiceLayer } from "../../src/services/cleanup"; +import { RetryExecutorLayer } from "../../src/retry"; +import { Continuous } from "../../src/definitions/continuous"; +import type { RuntimeJobRegistry } from "../../src/registry/typed"; + +// ============================================================================= +// Test Fixtures +// ============================================================================= + +const CounterState = Schema.Struct({ + count: Schema.Number, + lastRun: Schema.NullOr(Schema.Number), +}); +type CounterState = typeof CounterState.Type; + +const executionLog: Array<{ + instanceId: string; + runCount: number; + state: CounterState; +}> = []; + +const counterPrimitive = Continuous.make({ + stateSchema: CounterState, + schedule: Continuous.every("30 minutes"), + execute: (ctx) => + Effect.gen(function* () { + const currentState = yield* ctx.state; + executionLog.push({ + instanceId: ctx.instanceId, + runCount: ctx.runCount, + state: currentState, + }); + yield* ctx.updateState((s) => ({ + count: s.count + 1, + lastRun: Date.now(), + })); + }), +}); + +const failingPrimitive = Continuous.make({ + stateSchema: CounterState, + schedule: Continuous.every("1 hour"), + execute: () => Effect.fail(new Error("Intentional test failure")), +}); + +// Create test registry +const createTestRegistry = (): RuntimeJobRegistry => ({ + continuous: { + counter: { ...counterPrimitive, name: "counter" }, + failing: { ...failingPrimitive, name: "failing" }, + } as Record, + debounce: {} as Record, + workerPool: {} as Record, + task: {} as Record, +}); + +// ============================================================================= +// Test Helpers +// ============================================================================= + +// Helper to run Effect with layer, bypassing strict R parameter checking +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const runWithLayer = ( + effect: Effect.Effect, + layer: Layer.Layer +): Promise => + Effect.runPromise( + effect.pipe(Effect.provide(layer)) as Effect.Effect + ); + +// ============================================================================= +// Test Setup with InMemoryTracker +// ============================================================================= + +const createTestLayerWithTracking = (initialTime = 1000000) => { + const { + layer: coreLayer, + time, + handles, + } = createTestRuntime("test-instance", initialTime); + const registry = createTestRegistry(); + + // Create in-memory tracker synchronously using Effect.runSync + const { service: trackerService, handle: trackerHandle } = Effect.runSync( + createInMemoryTracker() + ); + + // Cast to base EventTrackerService for Layer.succeed compatibility + const trackerLayer = Layer.succeed( + EventTracker, + trackerService as EventTrackerService + ); + + const servicesLayer = Layer.mergeAll( + MetadataServiceLayer, + AlarmServiceLayer + ).pipe(Layer.provideMerge(trackerLayer), Layer.provideMerge(coreLayer)); + + const retryLayer = RetryExecutorLayer.pipe(Layer.provideMerge(servicesLayer)); + + const cleanupLayer = CleanupServiceLayer.pipe( + Layer.provideMerge(servicesLayer) + ); + + const executionLayer = JobExecutionServiceLayer.pipe( + Layer.provideMerge(retryLayer), + Layer.provideMerge(cleanupLayer), + Layer.provideMerge(coreLayer) + ); + + const handlerLayer = ContinuousHandlerLayer.pipe( + Layer.provideMerge(RegistryServiceLayer(registry)), + Layer.provideMerge(servicesLayer), + Layer.provideMerge(retryLayer), + Layer.provideMerge(executionLayer) + ) as Layer.Layer; + + return { layer: handlerLayer, time, handles, coreLayer, trackerHandle }; +}; + +// ============================================================================= +// Tests +// ============================================================================= + +describe("Pre-Execution State Tracking", () => { + beforeEach(() => { + executionLog.length = 0; + }); + + describe("job.executed event", () => { + it("includes preExecutionState in job.executed event on success", async () => { + const { layer, trackerHandle } = createTestLayerWithTracking(1000000); + const initialState = { count: 5, lastRun: null }; + + await runWithLayer( + Effect.gen(function* () { + const handler = yield* ContinuousHandler; + yield* handler.handle({ + type: "continuous", + action: "start", + name: "counter", + id: "test-counter-1", + input: initialState, + }); + }), + layer + ); + + // Get all events from tracker + const events = await Effect.runPromise(trackerHandle.getEvents()); + + // Find job.executed event + const executedEvents = events.filter( + (e): e is InternalJobExecutedEvent => e.type === "job.executed" + ); + + expect(executedEvents).toHaveLength(1); + expect(executedEvents[0].preExecutionState).toEqual(initialState); + expect(executedEvents[0].runCount).toBe(1); + expect(executedEvents[0].attempt).toBe(1); + }); + + it("captures state before execute modifies it", async () => { + const { layer, trackerHandle } = createTestLayerWithTracking(1000000); + const initialState = { count: 10, lastRun: null }; + + await runWithLayer( + Effect.gen(function* () { + const handler = yield* ContinuousHandler; + yield* handler.handle({ + type: "continuous", + action: "start", + name: "counter", + id: "test-counter-2", + input: initialState, + }); + }), + layer + ); + + // Get events + const events = await Effect.runPromise(trackerHandle.getEvents()); + const executedEvent = events.find( + (e): e is InternalJobExecutedEvent => e.type === "job.executed" + ); + + // preExecutionState should be the ORIGINAL state (count: 10) + // NOT the modified state (count: 11) + expect(executedEvent?.preExecutionState).toEqual(initialState); + + // Verify the execution log shows state was modified during execution + expect(executionLog[0].state).toEqual(initialState); + }); + + it("includes preExecutionState with zero count when state has zero", async () => { + const { layer, trackerHandle } = createTestLayerWithTracking(1000000); + + await runWithLayer( + Effect.gen(function* () { + const handler = yield* ContinuousHandler; + yield* handler.handle({ + type: "continuous", + action: "start", + name: "counter", + id: "test-counter-3", + input: { count: 0, lastRun: null }, + }); + }), + layer + ); + + const events = await Effect.runPromise(trackerHandle.getEvents()); + const executedEvent = events.find( + (e): e is InternalJobExecutedEvent => e.type === "job.executed" + ); + + expect(executedEvent?.preExecutionState).toEqual({ + count: 0, + lastRun: null, + }); + }); + }); + + describe("job.failed event", () => { + it("includes preExecutionState in job.failed event on error", async () => { + const { layer, trackerHandle } = createTestLayerWithTracking(1000000); + const initialState = { count: 42, lastRun: 999 }; + + // Run failing job - expect it to throw + try { + await runWithLayer( + Effect.gen(function* () { + const handler = yield* ContinuousHandler; + yield* handler.handle({ + type: "continuous", + action: "start", + name: "failing", + id: "test-failing-1", + input: initialState, + }); + }), + layer + ); + } catch { + // Expected to fail + } + + // Get events from tracker + const events = await Effect.runPromise(trackerHandle.getEvents()); + + // Find job.failed event + const failedEvents = events.filter( + (e): e is InternalJobFailedEvent => e.type === "job.failed" + ); + + expect(failedEvents).toHaveLength(1); + expect(failedEvents[0].preExecutionState).toEqual(initialState); + expect(failedEvents[0].willRetry).toBe(false); + expect(failedEvents[0].error.message).toContain( + "Intentional test failure" + ); + }); + }); + + describe("multiple executions", () => { + it("captures correct preExecutionState for each execution", async () => { + const { layer, trackerHandle, time } = + createTestLayerWithTracking(1000000); + const initialState = { count: 0, lastRun: null }; + + // First execution via start + await runWithLayer( + Effect.gen(function* () { + const handler = yield* ContinuousHandler; + yield* handler.handle({ + type: "continuous", + action: "start", + name: "counter", + id: "test-counter-multi", + input: initialState, + }); + }), + layer + ); + + // Clear tracker for clean second test + await Effect.runPromise(trackerHandle.clear()); + + // Advance time and trigger alarm for second execution + time.set(1000000 + 30 * 60 * 1000 + 1000); // 30 minutes + buffer + + await runWithLayer( + Effect.gen(function* () { + const handler = yield* ContinuousHandler; + yield* handler.handleAlarm(); + }), + layer + ); + + // Get events from second execution + const events = await Effect.runPromise(trackerHandle.getEvents()); + const executedEvent = events.find( + (e): e is InternalJobExecutedEvent => e.type === "job.executed" + ); + + // Second execution should see count: 1 (after first execution modified it) + expect(executedEvent?.preExecutionState).toMatchObject({ + count: 1, + }); + }); + }); + + describe("event metadata", () => { + it("includes all required fields alongside preExecutionState", async () => { + const { layer, trackerHandle } = createTestLayerWithTracking(1000000); + + await runWithLayer( + Effect.gen(function* () { + const handler = yield* ContinuousHandler; + yield* handler.handle({ + type: "continuous", + action: "start", + name: "counter", + id: "test-counter-metadata", + input: { count: 100, lastRun: null }, + }); + }), + layer + ); + + const events = await Effect.runPromise(trackerHandle.getEvents()); + const executedEvent = events.find( + (e): e is InternalJobExecutedEvent => e.type === "job.executed" + ); + + // Verify all fields are present + expect(executedEvent).toMatchObject({ + type: "job.executed", + source: "job", + instanceId: "test-instance", + jobType: "continuous", + jobName: "counter", + runCount: 1, + attempt: 1, + preExecutionState: { count: 100, lastRun: null }, + }); + + // Event should have eventId and timestamp + expect(executedEvent?.eventId).toBeDefined(); + expect(executedEvent?.timestamp).toBeDefined(); + expect(typeof executedEvent?.durationMs).toBe("number"); + }); + }); +}); diff --git a/packages/jobs/test/registry.test.ts b/packages/jobs/test/registry.test.ts index d6fe08e..2004892 100644 --- a/packages/jobs/test/registry.test.ts +++ b/packages/jobs/test/registry.test.ts @@ -28,7 +28,6 @@ const TokenState = Schema.Struct({ const testContinuous: UnregisteredContinuousDefinition< typeof TokenState.Type, - never, never > = { _tag: "ContinuousDefinition", @@ -43,14 +42,18 @@ const WebhookEvent = Schema.Struct({ data: Schema.Unknown, }); +const DebounceState = Schema.Struct({ + events: Schema.Array(WebhookEvent), +}); + const testDebounce: UnregisteredDebounceDefinition< typeof WebhookEvent.Type, - { events: Array }, - never, + typeof DebounceState.Type, never > = { _tag: "DebounceDefinition", eventSchema: WebhookEvent, + stateSchema: DebounceState, flushAfter: "5 minutes", maxEvents: 100, execute: () => Effect.void, @@ -61,7 +64,7 @@ const EmailEvent = Schema.Struct({ template: Schema.String, }); -const testWorkerPool: UnregisteredWorkerPoolDefinition = { +const testWorkerPool: UnregisteredWorkerPoolDefinition = { _tag: "WorkerPoolDefinition", eventSchema: EmailEvent, concurrency: 5, @@ -261,7 +264,7 @@ describe("getAllJobNames", () => { }); it("handles multiple definitions per type", () => { - const secondContinuous: UnregisteredContinuousDefinition = { + const secondContinuous: UnregisteredContinuousDefinition = { _tag: "ContinuousDefinition", stateSchema: Schema.Unknown, schedule: { _tag: "Every", interval: "1 minute" }, diff --git a/packages/jobs/test/services/metadata.test.ts b/packages/jobs/test/services/metadata.test.ts index bc2f68c..c94400e 100644 --- a/packages/jobs/test/services/metadata.test.ts +++ b/packages/jobs/test/services/metadata.test.ts @@ -6,7 +6,7 @@ import { createTestRuntime } from "@durable-effect/core"; import { MetadataService, MetadataServiceLayer, - type PrimitiveMetadata, + type JobMetadata, } from "../../src/services/metadata"; describe("MetadataService", () => { @@ -46,7 +46,7 @@ describe("MetadataService", () => { status: "initializing", createdAt: 1000000, updatedAt: 1000000, - } satisfies PrimitiveMetadata); + } satisfies JobMetadata); }); it("updates status", async () => { @@ -71,7 +71,7 @@ describe("MetadataService", () => { status: "running", createdAt: 1000000, updatedAt: 1005000, - } satisfies PrimitiveMetadata); + } satisfies JobMetadata); }); it("does not update status if not initialized", async () => { diff --git a/packages/jobs/tsconfig.test.json b/packages/jobs/tsconfig.test.json new file mode 100644 index 0000000..711fe80 --- /dev/null +++ b/packages/jobs/tsconfig.test.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "noEmit": true, + "types": ["@cloudflare/workers-types"] + }, + "include": ["src", "test"] +}