diff --git a/examples/effect-worker-v2/src/jobs/basic-debounce.ts b/examples/effect-worker-v2/src/jobs/basic-debounce.ts index b5d9022..a26a480 100644 --- a/examples/effect-worker-v2/src/jobs/basic-debounce.ts +++ b/examples/effect-worker-v2/src/jobs/basic-debounce.ts @@ -6,6 +6,12 @@ class Random extends Context.Tag("MyRandomService")< { readonly next: Effect.Effect } >() {} +import { Layer } from "effect"; + +const RandomLive = Layer.succeed(Random, { + next: Effect.sync(() => Math.random()), +}); + // ============================================================================= // Debounce Job - Batches events and flushes after delay // ============================================================================= @@ -69,5 +75,5 @@ export const debounceExample = Debounce.make({ yield* Effect.log( `Debounce flushed! Events: ${eventCount}, Last action: ${state?.actionId}, Reason: ${ctx.flushReason}`, ); - }), + }).pipe(Effect.provide(RandomLive)), }); diff --git a/packages/jobs/package.json b/packages/jobs/package.json index 2ba2a3b..67a5fc9 100644 --- a/packages/jobs/package.json +++ b/packages/jobs/package.json @@ -1,6 +1,6 @@ { "name": "@durable-effect/jobs", - "version": "0.0.1-next.5", + "version": "0.0.1-next.6", "type": "module", "main": "./dist/index.js", "types": "./dist/index.d.ts", diff --git a/packages/jobs/src/definitions/continuous.ts b/packages/jobs/src/definitions/continuous.ts index 3480b43..95c3274 100644 --- a/packages/jobs/src/definitions/continuous.ts +++ b/packages/jobs/src/definitions/continuous.ts @@ -16,7 +16,7 @@ import type { JobRetryConfig } from "../retry/types"; /** * Input config for creating a continuous job definition. */ -export interface ContinuousMakeConfig { +export interface ContinuousMakeConfig { /** * Schema for validating and serializing state. * Accepts any Effect Schema (Struct, Class, etc.) @@ -73,8 +73,20 @@ export interface ContinuousMakeConfig { /** * The function to execute on schedule. + * + * Must return Effect - all service requirements must be satisfied. + * If your effect requires services, provide them via .pipe(Effect.provide(layer)). + * + * @example + * ```ts + * execute: (ctx) => + * Effect.gen(function* () { + * const random = yield* Random; + * // ... + * }).pipe(Effect.provide(RandomLive)) + * ``` */ - execute(ctx: ContinuousContext): Effect.Effect; + execute(ctx: ContinuousContext): Effect.Effect; } /** @@ -116,9 +128,9 @@ export const Continuous = { * @param config - Configuration for the job * @returns An UnregisteredContinuousDefinition that can be registered */ - make: ( - config: ContinuousMakeConfig - ): UnregisteredContinuousDefinition => ({ + make: ( + config: ContinuousMakeConfig + ): UnregisteredContinuousDefinition => ({ _tag: "ContinuousDefinition", stateSchema: config.stateSchema, schedule: config.schedule, diff --git a/packages/jobs/src/definitions/debounce.ts b/packages/jobs/src/definitions/debounce.ts index 34fd06b..f64cd10 100644 --- a/packages/jobs/src/definitions/debounce.ts +++ b/packages/jobs/src/definitions/debounce.ts @@ -15,8 +15,11 @@ import type { JobRetryConfig } from "../retry/types"; /** * Configuration for creating a debounce job definition. + * + * Note: All handler functions must return Effect with R = never. + * If your effect requires services, provide them via .pipe(Effect.provide(layer)). */ -export interface DebounceMakeConfig { +export interface DebounceMakeConfig { /** * Schema for validating incoming events. */ @@ -76,14 +79,24 @@ export interface DebounceMakeConfig { /** * Reducer for each incoming event. Defaults to returning the latest event. + * Must return Effect - all service requirements must be satisfied. */ - onEvent?(ctx: DebounceEventContext): Effect.Effect; + onEvent?(ctx: DebounceEventContext): Effect.Effect; /** * Effect executed when the debounce flushes. + * Must return Effect - all service requirements must be satisfied. + * + * @example + * ```ts + * execute: (ctx) => + * Effect.gen(function* () { + * const random = yield* Random; + * // ... + * }).pipe(Effect.provide(RandomLive)) + * ``` */ - execute(ctx: DebounceExecuteContext): Effect.Effect; - + execute(ctx: DebounceExecuteContext): Effect.Effect; } /** @@ -99,14 +112,14 @@ export interface DebounceMakeConfig { * ``` */ export const Debounce = { - make: ( - config: DebounceMakeConfig - ): UnregisteredDebounceDefinition => ({ + make: ( + config: DebounceMakeConfig + ): UnregisteredDebounceDefinition => ({ _tag: "DebounceDefinition", eventSchema: config.eventSchema, stateSchema: (config.stateSchema ?? config.eventSchema) as Schema.Schema< S, - any, + unknown, never >, flushAfter: config.flushAfter, diff --git a/packages/jobs/src/definitions/task.ts b/packages/jobs/src/definitions/task.ts index 492557c..5d690c5 100644 --- a/packages/jobs/src/definitions/task.ts +++ b/packages/jobs/src/definitions/task.ts @@ -22,12 +22,14 @@ import type { * - Execute runs when alarm fires * - User controls the full lifecycle via schedule/clear * + * Note: All handler functions must return Effect with R = never. + * If your effect requires services, provide them via .pipe(Effect.provide(layer)). + * * @typeParam S - State type (inferred from stateSchema) * @typeParam E - Event type (inferred from eventSchema) * @typeParam Err - Error type (inferred from handlers) - * @typeParam R - Effect requirements (inferred from handlers) */ -export interface TaskMakeConfig { +export interface TaskMakeConfig { /** * Schema for validating and serializing state. * State is persisted durably and survives restarts. @@ -42,6 +44,7 @@ export interface TaskMakeConfig { /** * Handler called for each incoming event. + * Must return Effect - all service requirements must be satisfied. * * The event is passed as the first parameter (not on ctx) to make it * clear that it's a direct value, not an Effect that needs yielding. @@ -76,10 +79,11 @@ export interface TaskMakeConfig { * }) * ``` */ - onEvent(event: E, ctx: TaskEventContext): Effect.Effect; + onEvent(event: E, ctx: TaskEventContext): Effect.Effect; /** * Handler called when the scheduled alarm fires. + * Must return Effect - all service requirements must be satisfied. * * Responsibilities: * - Process the current state @@ -100,11 +104,12 @@ export interface TaskMakeConfig { * }) * ``` */ - execute(ctx: TaskExecuteContext): Effect.Effect; + execute(ctx: TaskExecuteContext): Effect.Effect; /** * Optional handler called when either `onEvent` or `execute` completes * and no alarm is scheduled. + * Must return Effect - all service requirements must be satisfied. * * Use this to: * - Schedule delayed cleanup @@ -119,10 +124,11 @@ export interface TaskMakeConfig { * }) * ``` */ - readonly onIdle?: (ctx: TaskIdleContext) => Effect.Effect; + readonly onIdle?: (ctx: TaskIdleContext) => Effect.Effect; /** * Optional error handler for onEvent/execute failures. + * Must return Effect - all service requirements must be satisfied. * * Use this to: * - Log errors @@ -151,7 +157,7 @@ export interface TaskMakeConfig { readonly onError?: ( error: Err, ctx: TaskErrorContext - ) => Effect.Effect; + ) => Effect.Effect; /** * Control logging for this job. @@ -269,9 +275,9 @@ export const Task = { * @param config - Configuration for the task * @returns An UnregisteredTaskDefinition that can be registered */ - make: ( - config: TaskMakeConfig - ): UnregisteredTaskDefinition => ({ + make: ( + config: TaskMakeConfig + ): UnregisteredTaskDefinition => ({ _tag: "TaskDefinition", stateSchema: config.stateSchema, eventSchema: config.eventSchema, diff --git a/packages/jobs/src/handlers/continuous/handler.ts b/packages/jobs/src/handlers/continuous/handler.ts index 509360c..dfdcdf5 100644 --- a/packages/jobs/src/handlers/continuous/handler.ts +++ b/packages/jobs/src/handlers/continuous/handler.ts @@ -50,7 +50,7 @@ export const ContinuousHandlerLayer = Layer.effect( const getDefinition = ( name: string, - ): Effect.Effect, JobNotFoundError> => { + ): Effect.Effect => { const def = registryService.registry.continuous[name]; if (!def) { return Effect.fail(new JobNotFoundError({ type: "continuous", name })); @@ -78,7 +78,7 @@ export const ContinuousHandlerLayer = Layer.effect( }); const scheduleNext = ( - def: StoredContinuousDefinition, + def: StoredContinuousDefinition, ): Effect.Effect => { const schedule = def.schedule; switch (schedule._tag) { @@ -94,7 +94,7 @@ export const ContinuousHandlerLayer = Layer.effect( }; const runExecution = ( - def: StoredContinuousDefinition, + def: StoredContinuousDefinition, runCount: number, id?: string, ) => @@ -107,7 +107,7 @@ export const ContinuousHandlerLayer = Layer.effect( id, run: (ctx: ContinuousContext) => def.execute(ctx), createContext: (base) => { - const proxyHolder: StateHolder = { + const proxyHolder: StateHolder = { get current() { return base.getState(); }, @@ -132,9 +132,9 @@ export const ContinuousHandlerLayer = Layer.effect( }); const handleStart = ( - def: StoredContinuousDefinition, + def: StoredContinuousDefinition, request: ContinuousRequest, - ): Effect.Effect => + ): Effect.Effect => Effect.gen(function* () { const existing = yield* metadata.get(); if (existing) { @@ -251,8 +251,8 @@ export const ContinuousHandlerLayer = Layer.effect( }); const handleTrigger = ( - def: StoredContinuousDefinition, - ): Effect.Effect => + def: StoredContinuousDefinition, + ): Effect.Effect => Effect.gen(function* () { const existing = yield* metadata.get(); if ( @@ -321,7 +321,7 @@ export const ContinuousHandlerLayer = Layer.effect( }); const handleGetState = ( - def: StoredContinuousDefinition, + def: StoredContinuousDefinition, ): Effect.Effect => Effect.gen(function* () { const stateService = yield* createEntityStateService( diff --git a/packages/jobs/src/handlers/debounce/handler.ts b/packages/jobs/src/handlers/debounce/handler.ts index 3809d9f..c602220 100644 --- a/packages/jobs/src/handlers/debounce/handler.ts +++ b/packages/jobs/src/handlers/debounce/handler.ts @@ -59,7 +59,7 @@ export const DebounceHandlerLayer = Layer.effect( const getDefinition = ( name: string, - ): Effect.Effect, JobNotFoundError> => { + ): Effect.Effect => { const def = registryService.registry.debounce[name]; if (!def) { return Effect.fail(new JobNotFoundError({ type: "debounce", name })); @@ -91,7 +91,7 @@ export const DebounceHandlerLayer = Layer.effect( }); const runFlush = ( - def: StoredDebounceDefinition, + def: StoredDebounceDefinition, flushReason: "maxEvents" | "flushAfter" | "manual", id?: string, ) => @@ -102,7 +102,7 @@ export const DebounceHandlerLayer = Layer.effect( retryConfig: def.retry, runCount: 0, // Debounce doesn't track runCount persistently in same way id, - run: (ctx: DebounceExecuteContext) => def.execute(ctx), + run: (ctx: DebounceExecuteContext) => def.execute(ctx), createContext: (base) => { return { state: Effect.succeed(base.getState()), // Snapshotted state @@ -116,14 +116,14 @@ export const DebounceHandlerLayer = Layer.effect( flushReason, attempt: base.attempt, isRetry: base.isRetry, - } as DebounceExecuteContext; + } as DebounceExecuteContext; }, }); const handleAdd = ( - def: StoredDebounceDefinition, + def: StoredDebounceDefinition, request: DebounceRequest, - ): Effect.Effect => + ): Effect.Effect => Effect.gen(function* () { const meta = yield* metadata.get(); const created = !meta; @@ -158,13 +158,12 @@ export const DebounceHandlerLayer = Layer.effect( const stateForContext = currentState ?? (validatedEvent as unknown); const onEvent = def.onEvent!; - // Cast is still needed unless we fix Definition generic constraints const reducedState = yield* onEvent({ event: validatedEvent as unknown, state: stateForContext, eventCount: nextCount, instanceId: runtime.instanceId, - } as any); + }); yield* stateService.set(reducedState); yield* setEventCount(nextCount); @@ -244,9 +243,9 @@ export const DebounceHandlerLayer = Layer.effect( }); const handleFlush = ( - def: StoredDebounceDefinition, + def: StoredDebounceDefinition, reason: "manual" | "flushAfter" | "maxEvents", - ): Effect.Effect => + ): Effect.Effect => Effect.gen(function* () { const meta = yield* metadata.get(); if (!meta) { @@ -353,7 +352,7 @@ export const DebounceHandlerLayer = Layer.effect( }); const handleGetState = ( - def: StoredDebounceDefinition, + def: StoredDebounceDefinition, ): Effect.Effect => Effect.gen(function* () { const stateService = yield* withStorage( diff --git a/packages/jobs/src/handlers/task/handler.ts b/packages/jobs/src/handlers/task/handler.ts index 5add0ad..24d61b1 100644 --- a/packages/jobs/src/handlers/task/handler.ts +++ b/packages/jobs/src/handlers/task/handler.ts @@ -59,7 +59,7 @@ export const TaskHandlerLayer = Layer.effect( const getDefinition = ( name: string, - ): Effect.Effect, JobNotFoundError> => { + ): Effect.Effect => { const def = registryService.registry.task[name]; if (!def) { return Effect.fail(new JobNotFoundError({ type: "task", name })); @@ -146,10 +146,10 @@ export const TaskHandlerLayer = Layer.effect( }); const runExecution = ( - def: StoredTaskDefinition, + def: StoredTaskDefinition, runCount: number, triggerType: "execute" | "onEvent", - event?: any, + event?: unknown, id?: string, ) => execution.execute({ @@ -168,7 +168,7 @@ export const TaskHandlerLayer = Layer.effect( }; // Proxy state holder for the legacy context factories - const proxyStateHolder: TaskStateHolder = { + const proxyStateHolder: TaskStateHolder = { get current() { return base.getState(); }, @@ -191,7 +191,7 @@ export const TaskHandlerLayer = Layer.effect( // Define error handler for this scope const runWithErrorHandling = ( - effect: Effect.Effect, + effect: Effect.Effect, errorSource: "onEvent" | "execute", ) => effect.pipe( @@ -320,9 +320,9 @@ export const TaskHandlerLayer = Layer.effect( }); const handleSend = ( - def: StoredTaskDefinition, + def: StoredTaskDefinition, request: TaskRequest, - ): Effect.Effect => + ): Effect.Effect => Effect.gen(function* () { const meta = yield* metadata.get(); const created = !meta; @@ -364,8 +364,8 @@ export const TaskHandlerLayer = Layer.effect( }); const handleTrigger = ( - def: StoredTaskDefinition, - ): Effect.Effect => + def: StoredTaskDefinition, + ): Effect.Effect => Effect.gen(function* () { const meta = yield* metadata.get(); if (!meta) { @@ -436,7 +436,7 @@ export const TaskHandlerLayer = Layer.effect( }); const handleGetState = ( - def: StoredTaskDefinition, + def: StoredTaskDefinition, ): Effect.Effect => Effect.gen(function* () { const meta = yield* metadata.get(); diff --git a/packages/jobs/src/registry/registry.ts b/packages/jobs/src/registry/registry.ts index c4284f7..f3b8e2d 100644 --- a/packages/jobs/src/registry/registry.ts +++ b/packages/jobs/src/registry/registry.ts @@ -39,9 +39,9 @@ export function createJobRegistry< T extends Record, >(definitions: T): JobRegistry { const registry: JobRegistry = { - continuous: new Map>(), - debounce: new Map>(), - workerPool: new Map>(), + continuous: new Map>(), + debounce: new Map>(), + workerPool: new Map>(), }; // Type casts are needed because Object.entries loses the discriminated union @@ -53,19 +53,19 @@ export function createJobRegistry< case "ContinuousDefinition": registry.continuous.set( name, - withName as ContinuousDefinition, + withName as ContinuousDefinition, ); break; case "DebounceDefinition": registry.debounce.set( name, - withName as DebounceDefinition, + withName as DebounceDefinition, ); break; case "WorkerPoolDefinition": registry.workerPool.set( name, - withName as WorkerPoolDefinition, + withName as WorkerPoolDefinition, ); break; } @@ -146,17 +146,17 @@ export function toRuntimeRegistry< return { continuous: registry.continuous as Record< string, - StoredContinuousDefinition + StoredContinuousDefinition >, debounce: registry.debounce as Record< string, - StoredDebounceDefinition + StoredDebounceDefinition >, workerPool: registry.workerPool as Record< string, - StoredWorkerPoolDefinition + StoredWorkerPoolDefinition >, - task: registry.task as Record>, + task: registry.task as Record, }; } @@ -171,7 +171,7 @@ export function toRuntimeRegistry< export function getContinuousDefinition( registry: JobRegistry, name: string, -): ContinuousDefinition | undefined { +): ContinuousDefinition | undefined { return registry.continuous.get(name); } @@ -182,7 +182,7 @@ export function getContinuousDefinition( export function getDebounceDefinition( registry: JobRegistry, name: string, -): DebounceDefinition | undefined { +): DebounceDefinition | undefined { return registry.debounce.get(name); } @@ -193,7 +193,7 @@ export function getDebounceDefinition( export function getWorkerPoolDefinition( registry: JobRegistry, name: string, -): WorkerPoolDefinition | undefined { +): WorkerPoolDefinition | undefined { return registry.workerPool.get(name); } diff --git a/packages/jobs/src/registry/typed.ts b/packages/jobs/src/registry/typed.ts index 9331249..8ad8902 100644 --- a/packages/jobs/src/registry/typed.ts +++ b/packages/jobs/src/registry/typed.ts @@ -30,32 +30,30 @@ import type { /** * Extract keys from T that are continuous definitions. - * - * Note: Uses `unknown` for error type to match AnyUnregisteredDefinition. */ export type ContinuousKeysOf> = { - [K in keyof T]: T[K] extends UnregisteredContinuousDefinition ? K : never; + [K in keyof T]: T[K] extends UnregisteredContinuousDefinition ? K : never; }[keyof T] & string; /** * Extract keys from T that are debounce definitions. */ export type DebounceKeysOf> = { - [K in keyof T]: T[K] extends UnregisteredDebounceDefinition ? K : never; + [K in keyof T]: T[K] extends UnregisteredDebounceDefinition ? K : never; }[keyof T] & string; /** * Extract keys from T that are workerPool definitions. */ export type WorkerPoolKeysOf> = { - [K in keyof T]: T[K] extends UnregisteredWorkerPoolDefinition ? K : never; + [K in keyof T]: T[K] extends UnregisteredWorkerPoolDefinition ? K : never; }[keyof T] & string; /** * Extract keys from T that are task definitions. */ export type TaskKeysOf> = { - [K in keyof T]: T[K] extends UnregisteredTaskDefinition ? K : never; + [K in keyof T]: T[K] extends UnregisteredTaskDefinition ? K : never; }[keyof T] & string; // ============================================================================= @@ -68,7 +66,7 @@ export type TaskKeysOf> = { export type ContinuousStateOf< T extends Record, K extends ContinuousKeysOf, -> = T[K] extends UnregisteredContinuousDefinition ? S : never; +> = T[K] extends UnregisteredContinuousDefinition ? S : never; /** * Extract the error type from a continuous definition. @@ -76,7 +74,7 @@ export type ContinuousStateOf< export type ContinuousErrorOf< T extends Record, K extends ContinuousKeysOf, -> = T[K] extends UnregisteredContinuousDefinition ? E : never; +> = T[K] extends UnregisteredContinuousDefinition ? E : never; /** * Extract the event type from a debounce definition. @@ -84,7 +82,7 @@ export type ContinuousErrorOf< export type DebounceEventOf< T extends Record, K extends DebounceKeysOf, -> = T[K] extends UnregisteredDebounceDefinition ? I : never; +> = T[K] extends UnregisteredDebounceDefinition ? I : never; /** * Extract the state type from a debounce definition. @@ -92,7 +90,7 @@ export type DebounceEventOf< export type DebounceStateOf< T extends Record, K extends DebounceKeysOf, -> = T[K] extends UnregisteredDebounceDefinition ? S : never; +> = T[K] extends UnregisteredDebounceDefinition ? S : never; /** * Extract the event type from a workerPool definition. @@ -100,7 +98,7 @@ export type DebounceStateOf< export type WorkerPoolEventOf< T extends Record, K extends WorkerPoolKeysOf, -> = T[K] extends UnregisteredWorkerPoolDefinition ? E : never; +> = T[K] extends UnregisteredWorkerPoolDefinition ? E : never; /** * Extract the state type from a task definition. @@ -108,7 +106,7 @@ export type WorkerPoolEventOf< export type TaskStateOf< T extends Record, K extends TaskKeysOf, -> = T[K] extends UnregisteredTaskDefinition ? S : never; +> = T[K] extends UnregisteredTaskDefinition ? S : never; /** * Extract the event type from a task definition. @@ -116,7 +114,7 @@ export type TaskStateOf< export type TaskEventOf< T extends Record, K extends TaskKeysOf, -> = T[K] extends UnregisteredTaskDefinition ? E : never; +> = T[K] extends UnregisteredTaskDefinition ? E : never; // ============================================================================= // Registered Definition Types (with name added) @@ -124,35 +122,33 @@ export type TaskEventOf< /** * Add the name property to a definition, making it a registered definition. - * - * Note: Uses `unknown` for error type in extends clause to match AnyUnregisteredDefinition. */ type RegisterContinuous< - D extends UnregisteredContinuousDefinition, + D extends UnregisteredContinuousDefinition, N extends string, -> = D extends UnregisteredContinuousDefinition - ? ContinuousDefinition & { readonly name: N } +> = D extends UnregisteredContinuousDefinition + ? ContinuousDefinition & { readonly name: N } : never; type RegisterDebounce< - D extends UnregisteredDebounceDefinition, + D extends UnregisteredDebounceDefinition, N extends string, -> = D extends UnregisteredDebounceDefinition - ? DebounceDefinition & { readonly name: N } +> = D extends UnregisteredDebounceDefinition + ? DebounceDefinition & { readonly name: N } : never; type RegisterWorkerPool< - D extends UnregisteredWorkerPoolDefinition, + D extends UnregisteredWorkerPoolDefinition, N extends string, -> = D extends UnregisteredWorkerPoolDefinition - ? WorkerPoolDefinition & { readonly name: N } +> = D extends UnregisteredWorkerPoolDefinition + ? WorkerPoolDefinition & { readonly name: N } : never; type RegisterTask< - D extends UnregisteredTaskDefinition, + D extends UnregisteredTaskDefinition, N extends string, -> = D extends UnregisteredTaskDefinition - ? TaskDefinition & { readonly name: N } +> = D extends UnregisteredTaskDefinition + ? TaskDefinition & { readonly name: N } : never; // ============================================================================= @@ -171,7 +167,7 @@ export interface TypedJobRegistry]: RegisterContinuous< - Extract>, + Extract>, K >; }; @@ -181,7 +177,7 @@ export interface TypedJobRegistry]: RegisterDebounce< - Extract>, + Extract>, K >; }; @@ -191,7 +187,7 @@ export interface TypedJobRegistry]: RegisterWorkerPool< - Extract>, + Extract>, K >; }; @@ -201,7 +197,7 @@ export interface TypedJobRegistry]: RegisterTask< - Extract>, + Extract>, K >; }; @@ -221,13 +217,13 @@ export interface TypedJobRegistry>; - readonly debounce: Record>; - readonly workerPool: Record>; - readonly task: Record>; + readonly continuous: Record; + readonly debounce: Record; + readonly workerPool: Record; + readonly task: Record; } // ============================================================================= diff --git a/packages/jobs/src/registry/types.ts b/packages/jobs/src/registry/types.ts index aa7fbc9..ca55f6f 100644 --- a/packages/jobs/src/registry/types.ts +++ b/packages/jobs/src/registry/types.ts @@ -37,12 +37,11 @@ export type ContinuousSchedule = * Unregistered continuous job definition. * Created by Continuous.make() - does not have a name yet. * Name is assigned when registered via createDurableJobs(). + * + * Note: The execute function must return Effect. + * If your effect requires services, provide them via .pipe(Effect.provide(layer)). */ -export interface UnregisteredContinuousDefinition< - S = unknown, - E = unknown, - R = never, -> { +export interface UnregisteredContinuousDefinition { readonly _tag: "ContinuousDefinition"; /** Schema for state - encoded type can be anything (typically same as S for simple schemas) */ readonly stateSchema: Schema.Schema; @@ -66,19 +65,35 @@ export interface UnregisteredContinuousDefinition< * @default false (LogLevel.Error - failures only) */ readonly logging?: LoggingOption; - /** Function to execute on schedule */ - execute(ctx: ContinuousContext): Effect.Effect; + /** + * Function to execute on schedule. + * + * Must return Effect - all service requirements must be satisfied. + * Use .pipe(Effect.provide(layer)) to provide custom services. + * + * @example + * ```ts + * execute: (ctx) => + * Effect.gen(function* () { + * const random = yield* Random; + * // ... + * }).pipe(Effect.provide(RandomLive)) + * ``` + */ + execute(ctx: ContinuousContext): Effect.Effect; } /** * Unregistered debounce job definition. * Created by Debounce.make() - does not have a name yet. + * + * Note: All handler functions must return Effect with R = never. + * If your effect requires services, provide them via .pipe(Effect.provide(layer)). */ export interface UnregisteredDebounceDefinition< I = unknown, S = unknown, E = unknown, - R = never, > { readonly _tag: "DebounceDefinition"; readonly eventSchema: Schema.Schema; @@ -104,8 +119,16 @@ export interface UnregisteredDebounceDefinition< * @default false (LogLevel.Error - failures only) */ readonly logging?: LoggingOption; - execute(ctx: DebounceExecuteContext): Effect.Effect; - onEvent?(ctx: DebounceEventContext): Effect.Effect; + /** + * Execute handler called on flush. + * Must return Effect - all service requirements must be satisfied. + */ + execute(ctx: DebounceExecuteContext): Effect.Effect; + /** + * Optional event handler to transform state on each event. + * Must return Effect - all service requirements must be satisfied. + */ + onEvent?(ctx: DebounceEventContext): Effect.Effect; } /** @@ -121,19 +144,18 @@ export interface WorkerPoolRetryConfig { /** * Unregistered workerPool job definition. * Created by WorkerPool.make() - does not have a name yet. + * + * Note: All handler functions must return Effect with R = never. + * If your effect requires services, provide them via .pipe(Effect.provide(layer)). */ -export interface UnregisteredWorkerPoolDefinition< - E = unknown, - Err = unknown, - R = never, -> { +export interface UnregisteredWorkerPoolDefinition { readonly _tag: "WorkerPoolDefinition"; readonly eventSchema: Schema.Schema; readonly concurrency: number; readonly retry?: WorkerPoolRetryConfig; - execute(ctx: WorkerPoolExecuteContext): Effect.Effect; - onDeadLetter?(event: E, error: Err, ctx: WorkerPoolDeadLetterContext): Effect.Effect; - onEmpty?(ctx: WorkerPoolEmptyContext): Effect.Effect; + execute(ctx: WorkerPoolExecuteContext): Effect.Effect; + onDeadLetter?(event: E, error: Err, ctx: WorkerPoolDeadLetterContext): Effect.Effect; + onEmpty?(ctx: WorkerPoolEmptyContext): Effect.Effect; } /** @@ -149,13 +171,14 @@ export interface UnregisteredWorkerPoolDefinition< * - S: State schema type (decoded) * - E: Event schema type (decoded) * - Err: Error type from handlers - * - R: Effect requirements (context) + * + * Note: All handler functions must return Effect with R = never. + * If your effect requires services, provide them via .pipe(Effect.provide(layer)). */ export interface UnregisteredTaskDefinition< S = unknown, E = unknown, Err = unknown, - R = never, > { readonly _tag: "TaskDefinition"; /** Schema for validating and serializing state */ @@ -166,29 +189,33 @@ export interface UnregisteredTaskDefinition< /** * Handler called for each incoming event. * Updates state and optionally schedules execution. + * Must return Effect - all service requirements must be satisfied. * * @param event - The incoming event (already validated against eventSchema) * @param ctx - Context for state access, scheduling, and metadata */ - onEvent(event: E, ctx: TaskEventContext): Effect.Effect; + onEvent(event: E, ctx: TaskEventContext): Effect.Effect; /** * Handler called when alarm fires. * Processes state and optionally schedules next execution. + * Must return Effect - all service requirements must be satisfied. */ - execute(ctx: TaskExecuteContext): Effect.Effect; + execute(ctx: TaskExecuteContext): Effect.Effect; /** * Optional handler called when either `onEvent` or `execute` completes * and no alarm is scheduled. + * Must return Effect - all service requirements must be satisfied. */ - onIdle?(ctx: TaskIdleContext): Effect.Effect; + onIdle?(ctx: TaskIdleContext): Effect.Effect; /** * Optional error handler for onEvent/execute failures. * If not provided, errors are logged and task continues. + * Must return Effect - all service requirements must be satisfied. */ - onError?(error: Err, ctx: TaskErrorContext): Effect.Effect; + onError?(error: Err, ctx: TaskErrorContext): Effect.Effect; /** * Configure logging level for this job. @@ -201,14 +228,14 @@ export interface UnregisteredTaskDefinition< /** * Union of all unregistered job definition types. * - * Note: Error types use `unknown` to accept definitions with any error type. - * The stored types (below) handle the runtime representation with unknown errors. + * Note: Uses `unknown` for all type parameters to accept any definition. + * Specific types are preserved through TypedJobRegistry's __definitions property. */ export type AnyUnregisteredDefinition = - | UnregisteredContinuousDefinition - | UnregisteredDebounceDefinition - | UnregisteredWorkerPoolDefinition - | UnregisteredTaskDefinition; + | UnregisteredContinuousDefinition + | UnregisteredDebounceDefinition + | UnregisteredWorkerPoolDefinition + | UnregisteredTaskDefinition; // ============================================================================= // Stored Definition Types (error type widened to unknown for registry storage) @@ -228,7 +255,7 @@ export interface StoredJobRetryConfig { /** * Stored continuous job definition (error type widened to unknown). */ -export interface StoredContinuousDefinition { +export interface StoredContinuousDefinition { readonly _tag: "ContinuousDefinition"; readonly name: string; readonly stateSchema: Schema.Schema; @@ -236,13 +263,13 @@ export interface StoredContinuousDefinition { readonly startImmediately?: boolean; readonly retry?: StoredJobRetryConfig; readonly logging?: LoggingOption; - execute(ctx: ContinuousContext): Effect.Effect; + execute(ctx: ContinuousContext): Effect.Effect; } /** * Stored debounce job definition (error type widened to unknown). */ -export interface StoredDebounceDefinition { +export interface StoredDebounceDefinition { readonly _tag: "DebounceDefinition"; readonly name: string; readonly eventSchema: Schema.Schema; @@ -251,37 +278,37 @@ export interface StoredDebounceDefinition { readonly maxEvents?: number; readonly retry?: StoredJobRetryConfig; readonly logging?: LoggingOption; - execute(ctx: DebounceExecuteContext): Effect.Effect; - onEvent?(ctx: DebounceEventContext): Effect.Effect; + execute(ctx: DebounceExecuteContext): Effect.Effect; + onEvent?(ctx: DebounceEventContext): Effect.Effect; } /** * Stored workerPool job definition (error type widened to unknown). */ -export interface StoredWorkerPoolDefinition { +export interface StoredWorkerPoolDefinition { readonly _tag: "WorkerPoolDefinition"; readonly name: string; readonly eventSchema: Schema.Schema; readonly concurrency: number; readonly retry?: WorkerPoolRetryConfig; - execute(ctx: WorkerPoolExecuteContext): Effect.Effect; - onDeadLetter?(event: E, error: unknown, ctx: WorkerPoolDeadLetterContext): Effect.Effect; - onEmpty?(ctx: WorkerPoolEmptyContext): Effect.Effect; + execute(ctx: WorkerPoolExecuteContext): Effect.Effect; + onDeadLetter?(event: E, error: unknown, ctx: WorkerPoolDeadLetterContext): Effect.Effect; + onEmpty?(ctx: WorkerPoolEmptyContext): Effect.Effect; } /** * Stored task job definition (error type widened to unknown). */ -export interface StoredTaskDefinition { +export interface StoredTaskDefinition { readonly _tag: "TaskDefinition"; readonly name: string; readonly stateSchema: Schema.Schema; readonly eventSchema: Schema.Schema; readonly logging?: LoggingOption; - onEvent(event: E, ctx: TaskEventContext): Effect.Effect; - execute(ctx: TaskExecuteContext): Effect.Effect; - onIdle?(ctx: TaskIdleContext): Effect.Effect; - onError?(error: unknown, ctx: TaskErrorContext): Effect.Effect; + onEvent(event: E, ctx: TaskEventContext): Effect.Effect; + execute(ctx: TaskExecuteContext): Effect.Effect; + onIdle?(ctx: TaskIdleContext): Effect.Effect; + onError?(error: unknown, ctx: TaskErrorContext): Effect.Effect; } // ============================================================================= @@ -294,8 +321,7 @@ export interface StoredTaskDefinition { export interface ContinuousDefinition< S = unknown, E = unknown, - R = never, -> extends UnregisteredContinuousDefinition { +> extends UnregisteredContinuousDefinition { readonly name: string; } @@ -306,8 +332,7 @@ export interface DebounceDefinition< I = unknown, S = unknown, E = unknown, - R = never, -> extends UnregisteredDebounceDefinition { +> extends UnregisteredDebounceDefinition { readonly name: string; } @@ -317,8 +342,7 @@ export interface DebounceDefinition< export interface WorkerPoolDefinition< E = unknown, Err = unknown, - R = never, -> extends UnregisteredWorkerPoolDefinition { +> extends UnregisteredWorkerPoolDefinition { readonly name: string; } @@ -329,8 +353,7 @@ export interface TaskDefinition< S = unknown, E = unknown, Err = unknown, - R = never, -> extends UnregisteredTaskDefinition { +> extends UnregisteredTaskDefinition { readonly name: string; } @@ -338,10 +361,10 @@ export interface TaskDefinition< * Union of all registered job definition types. */ export type AnyJobDefinition = - | ContinuousDefinition - | DebounceDefinition - | WorkerPoolDefinition - | TaskDefinition; + | ContinuousDefinition + | DebounceDefinition + | WorkerPoolDefinition + | TaskDefinition; // ============================================================================= // Context Types (provided to user functions) @@ -642,9 +665,9 @@ export interface TaskErrorContext { * Organized by job type for efficient lookup. */ export interface JobRegistry { - readonly continuous: Map>; - readonly debounce: Map>; - readonly workerPool: Map>; + readonly continuous: Map>; + readonly debounce: Map>; + readonly workerPool: Map>; } /** @@ -652,24 +675,24 @@ export interface JobRegistry { */ export type InferRegistry> = { continuous: { - [K in keyof T as T[K] extends ContinuousDefinition + [K in keyof T as T[K] extends ContinuousDefinition ? K - : never]: T[K] extends ContinuousDefinition - ? ContinuousDefinition + : never]: T[K] extends ContinuousDefinition + ? ContinuousDefinition : never; }; debounce: { - [K in keyof T as T[K] extends DebounceDefinition + [K in keyof T as T[K] extends DebounceDefinition ? K - : never]: T[K] extends DebounceDefinition - ? DebounceDefinition + : never]: T[K] extends DebounceDefinition + ? DebounceDefinition : never; }; workerPool: { - [K in keyof T as T[K] extends WorkerPoolDefinition + [K in keyof T as T[K] extends WorkerPoolDefinition ? K - : never]: T[K] extends WorkerPoolDefinition - ? WorkerPoolDefinition + : never]: T[K] extends WorkerPoolDefinition + ? WorkerPoolDefinition : never; }; }; diff --git a/reports/069-schema-invariance-type-error.md b/reports/069-schema-invariance-type-error.md new file mode 100644 index 0000000..7860d67 --- /dev/null +++ b/reports/069-schema-invariance-type-error.md @@ -0,0 +1,116 @@ +# Report 069: Schema Invariance Type Error + +## Summary + +After the R = never type system changes (reports 067-068), the jobs package fails to compile when used in applications. The root cause is Effect's Schema type using **invariant** variance for its encoded type parameter. + +## Error Examples + +``` +Type 'Struct<{ actionId: typeof String$; timestamp: typeof Number$; ... }>' +is not assignable to type 'Schema<{ readonly actionId: string; ... }, unknown, never>'. + The types of '[TypeId]._I' are incompatible between these types. + Type 'Invariant<{ readonly actionId: string; ... }>' is not assignable to type 'Invariant'. +``` + +## Root Cause + +Effect's Schema interface is defined as: +```typescript +export interface Schema +``` + +The `in out` modifier on both `A` and `I` makes them **invariant**: +- `A` (decoded type) - invariant +- `I` (encoded type) - invariant +- `R` (requirements) - covariant (out only) + +In the recent type changes, the definition interfaces were updated to: +```typescript +export interface UnregisteredContinuousDefinition { + readonly stateSchema: Schema.Schema; // <-- Problem here + // ... +} +``` + +When a user passes `Schema.Struct({...})`, the schema's encoded type is the struct's actual type, NOT `unknown`. For example: +- `Schema.Struct({ name: Schema.String })` has type `Schema<{ name: string }, { name: string }, never>` +- But the definition expects `Schema<{ name: string }, unknown, never>` + +Since Schema's `I` parameter is invariant, `{ name: string }` is NOT assignable to `unknown` (and vice versa). + +## Affected Files + +- `packages/jobs/src/registry/types.ts` - All definition interfaces with `stateSchema`, `eventSchema` +- `packages/jobs/src/definitions/*.ts` - Factory config interfaces + +## Affected Types + +1. `UnregisteredContinuousDefinition` - `stateSchema: Schema.Schema` +2. `UnregisteredDebounceDefinition` - `eventSchema`, `stateSchema` +3. `UnregisteredWorkerPoolDefinition` - `eventSchema` +4. `UnregisteredTaskDefinition` - `stateSchema`, `eventSchema` +5. Stored definition types +6. Factory config interfaces + +## Secondary Issue: AnyUnregisteredDefinition + +The `AnyUnregisteredDefinition` type also fails: +```typescript +export type AnyUnregisteredDefinition = + | UnregisteredContinuousDefinition + | UnregisteredDebounceDefinition + | ... +``` + +When registering jobs: +```typescript +const jobs = { heartbeat, debounceExample }; // Type error! +``` + +The specific definition types are not assignable to `AnyUnregisteredDefinition` because the schema's invariant type parameter prevents widening. + +## Solution + +Change the encoded type parameter from `unknown` to `any`: + +```typescript +// Before (broken) +readonly stateSchema: Schema.Schema; + +// After (working) +readonly stateSchema: Schema.Schema; +``` + +Using `any` for the encoded type works because: +1. `any` is bivariant - assignable to and from any type +2. We don't actually care about the encoded type at runtime +3. The decoded type `S` is still preserved for type safety + +## Files to Update + +1. `packages/jobs/src/registry/types.ts`: + - Update all schema type parameters from `unknown` to `any` + - Approximately 8 occurrences + +2. `packages/jobs/src/definitions/continuous.ts` +3. `packages/jobs/src/definitions/debounce.ts` +4. `packages/jobs/src/definitions/task.ts` + - Update `stateSchema` and `eventSchema` type parameters + +## Verification + +After fixing, the example app should build without errors: +```bash +cd examples/effect-worker-v2 && pnpm build +``` + +All existing tests should continue to pass. + +## Key Insight + +Effect's Schema type is more strict than Effect's Effect type: +- `Effect` has R as covariant, allowing `never` to work +- `Schema` has I as invariant, requiring exact type matches + +This explains why the R = never changes worked for Effect but broke Schema types. diff --git a/reports/070-effect-provide-service-misuse.md b/reports/070-effect-provide-service-misuse.md new file mode 100644 index 0000000..3a123bb --- /dev/null +++ b/reports/070-effect-provide-service-misuse.md @@ -0,0 +1,156 @@ +# Report 070: Effect.provideService Misuse in Job Handler + +## Summary + +The example in `basic-debounce.ts` incorrectly uses `Effect.provideService` to create what it assumes is a layer, but `Effect.provideService` is a **partial application function**, not a layer constructor. This causes the job handler to fail with `R = Random` instead of `R = never`. + +## Error Messages + +``` +src/jobs/basic-debounce.ts(62,5): error TS2322: Type 'Effect' +is not assignable to type 'Effect'. + Type 'Random' is not assignable to type 'never'. + +src/jobs/basic-debounce.ts(76,28): error TS2769: No overload matches this call. + The last overload gave the following error. + Argument of type '(self: Effect) => Effect>' + is not assignable to parameter of type 'ManagedRuntime'. +``` + +## Root Cause + +In `basic-debounce.ts`: + +```typescript +// Line 9-11: INCORRECT usage +const RandomLive = Effect.provideService(Random, { + next: Effect.sync(() => Math.random()), +}); +``` + +`Effect.provideService(tag, service)` returns a **function** of type: +```typescript +(self: Effect) => Effect> +``` + +It does NOT return a Layer or Context. It's a curried/partial application meant to be used directly in a pipe: +```typescript +effect.pipe(Effect.provideService(Random, impl)) +``` + +However, `Effect.provide()` expects one of: +- `Layer` +- `Context` +- `Runtime` +- `ManagedRuntime` + +When `RandomLive` (a function) is passed to `Effect.provide()`, TypeScript cannot find a matching overload, and the service requirement `Random` is never satisfied. + +## The Broken Pattern + +```typescript +// This creates a function, not a layer +const RandomLive = Effect.provideService(Random, impl); + +// This fails - RandomLive is not a Layer/Context/Runtime +effect.pipe(Effect.provide(RandomLive)) +``` + +## Correct Patterns + +### Option 1: Use Layer.succeed + +```typescript +import { Layer } from "effect"; + +const RandomLive = Layer.succeed(Random, { + next: Effect.sync(() => Math.random()), +}); + +// Now Effect.provide works correctly +effect.pipe(Effect.provide(RandomLive)) +``` + +### Option 2: Use Effect.provideService directly in pipe + +```typescript +// Don't store it - use directly +effect.pipe( + Effect.provideService(Random, { + next: Effect.sync(() => Math.random()), + }) +) +``` + +### Option 3: Use Layer.effect for effectful construction + +```typescript +const RandomLive = Layer.effect( + Random, + Effect.succeed({ + next: Effect.sync(() => Math.random()), + }) +); +``` + +## Documentation Issue + +The docstring in `packages/jobs/src/definitions/debounce.ts` (lines 92-96) shows: + +```typescript +* @example +* ```ts +* execute: (ctx) => +* Effect.gen(function* () { +* const random = yield* Random; +* // ... +* }).pipe(Effect.provide(RandomLive)) +* ``` +``` + +This example assumes `RandomLive` is a Layer, but doesn't show how to create it correctly. The example in `basic-debounce.ts` attempted to follow this pattern but used the wrong API. + +## Affected Files + +- `examples/effect-worker-v2/src/jobs/basic-debounce.ts` - Broken example +- `packages/jobs/src/definitions/debounce.ts` - Documentation could be clearer +- `packages/jobs/src/definitions/continuous.ts` - Same documentation pattern +- `packages/jobs/src/definitions/task.ts` - Same documentation pattern + +## Fix Required + +Change `basic-debounce.ts` from: + +```typescript +const RandomLive = Effect.provideService(Random, { + next: Effect.sync(() => Math.random()), +}); +``` + +To: + +```typescript +import { Layer } from "effect"; + +const RandomLive = Layer.succeed(Random, { + next: Effect.sync(() => Math.random()), +}); +``` + +## Key Insight + +Effect-TS has two similar-sounding but different APIs: + +| API | Returns | Usage | +|-----|---------|-------| +| `Effect.provideService(tag, impl)` | `(effect) => effect` (function) | Use directly in `.pipe()` | +| `Layer.succeed(tag, impl)` | `Layer` | Pass to `Effect.provide()` | + +The naming similarity can cause confusion. `Effect.provideService` is essentially a convenience wrapper for `Effect.provide(Context.make(tag, impl))` but as a pipeable function. + +## Verification + +After fixing, the example should build: +```bash +cd examples/effect-worker-v2 && pnpm build +```