Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion examples/effect-worker-v2/src/jobs/basic-debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ class Random extends Context.Tag("MyRandomService")<
{ readonly next: Effect.Effect<number> }
>() {}

import { Layer } from "effect";

const RandomLive = Layer.succeed(Random, {
next: Effect.sync(() => Math.random()),
});

// =============================================================================
// Debounce Job - Batches events and flushes after delay
// =============================================================================
Expand Down Expand Up @@ -69,5 +75,5 @@ export const debounceExample = Debounce.make({
yield* Effect.log(
`Debounce flushed! Events: ${eventCount}, Last action: ${state?.actionId}, Reason: ${ctx.flushReason}`,
);
}),
}).pipe(Effect.provide(RandomLive)),
});
2 changes: 1 addition & 1 deletion packages/jobs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@durable-effect/jobs",
"version": "0.0.1-next.5",
"version": "0.0.1-next.6",
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down
22 changes: 17 additions & 5 deletions packages/jobs/src/definitions/continuous.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import type { JobRetryConfig } from "../retry/types";
/**
* Input config for creating a continuous job definition.
*/
export interface ContinuousMakeConfig<S, E, R> {
export interface ContinuousMakeConfig<S, E> {
/**
* Schema for validating and serializing state.
* Accepts any Effect Schema (Struct, Class, etc.)
Expand Down Expand Up @@ -73,8 +73,20 @@ export interface ContinuousMakeConfig<S, E, R> {

/**
* The function to execute on schedule.
*
* Must return Effect<void, E, never> - all service requirements must be satisfied.
* If your effect requires services, provide them via .pipe(Effect.provide(layer)).
*
* @example
* ```ts
* execute: (ctx) =>
* Effect.gen(function* () {
* const random = yield* Random;
* // ...
* }).pipe(Effect.provide(RandomLive))
* ```
*/
execute(ctx: ContinuousContext<S>): Effect.Effect<void, E, R>;
execute(ctx: ContinuousContext<S>): Effect.Effect<void, E, never>;
}

/**
Expand Down Expand Up @@ -116,9 +128,9 @@ export const Continuous = {
* @param config - Configuration for the job
* @returns An UnregisteredContinuousDefinition that can be registered
*/
make: <S, E = never, R = never>(
config: ContinuousMakeConfig<S, E, R>
): UnregisteredContinuousDefinition<S, E, R> => ({
make: <S, E = never>(
config: ContinuousMakeConfig<S, E>
): UnregisteredContinuousDefinition<S, E> => ({
_tag: "ContinuousDefinition",
stateSchema: config.stateSchema,
schedule: config.schedule,
Expand Down
29 changes: 21 additions & 8 deletions packages/jobs/src/definitions/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import type { JobRetryConfig } from "../retry/types";

/**
* Configuration for creating a debounce job definition.
*
* Note: All handler functions must return Effect with R = never.
* If your effect requires services, provide them via .pipe(Effect.provide(layer)).
*/
export interface DebounceMakeConfig<I, S, E, R> {
export interface DebounceMakeConfig<I, S, E> {
/**
* Schema for validating incoming events.
*/
Expand Down Expand Up @@ -76,14 +79,24 @@ export interface DebounceMakeConfig<I, S, E, R> {

/**
* Reducer for each incoming event. Defaults to returning the latest event.
* Must return Effect<S, never, never> - all service requirements must be satisfied.
*/
onEvent?(ctx: DebounceEventContext<I, S>): Effect.Effect<S, never, R>;
onEvent?(ctx: DebounceEventContext<I, S>): Effect.Effect<S, never, never>;

/**
* Effect executed when the debounce flushes.
* Must return Effect<void, E, never> - all service requirements must be satisfied.
*
* @example
* ```ts
* execute: (ctx) =>
* Effect.gen(function* () {
* const random = yield* Random;
* // ...
* }).pipe(Effect.provide(RandomLive))
* ```
*/
execute(ctx: DebounceExecuteContext<S>): Effect.Effect<void, E, R>;

execute(ctx: DebounceExecuteContext<S>): Effect.Effect<void, E, never>;
}

/**
Expand All @@ -99,14 +112,14 @@ export interface DebounceMakeConfig<I, S, E, R> {
* ```
*/
export const Debounce = {
make: <I, S = I, E = never, R = never>(
config: DebounceMakeConfig<I, S, E, R>
): UnregisteredDebounceDefinition<I, S, E, R> => ({
make: <I, S = I, E = never>(
config: DebounceMakeConfig<I, S, E>
): UnregisteredDebounceDefinition<I, S, E> => ({
_tag: "DebounceDefinition",
eventSchema: config.eventSchema,
stateSchema: (config.stateSchema ?? config.eventSchema) as Schema.Schema<
S,
any,
unknown,
never
>,
flushAfter: config.flushAfter,
Expand Down
24 changes: 15 additions & 9 deletions packages/jobs/src/definitions/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import type {
* - Execute runs when alarm fires
* - User controls the full lifecycle via schedule/clear
*
* Note: All handler functions must return Effect with R = never.
* If your effect requires services, provide them via .pipe(Effect.provide(layer)).
*
* @typeParam S - State type (inferred from stateSchema)
* @typeParam E - Event type (inferred from eventSchema)
* @typeParam Err - Error type (inferred from handlers)
* @typeParam R - Effect requirements (inferred from handlers)
*/
export interface TaskMakeConfig<S, E, Err, R> {
export interface TaskMakeConfig<S, E, Err> {
/**
* Schema for validating and serializing state.
* State is persisted durably and survives restarts.
Expand All @@ -42,6 +44,7 @@ export interface TaskMakeConfig<S, E, Err, R> {

/**
* Handler called for each incoming event.
* Must return Effect<void, Err, never> - all service requirements must be satisfied.
*
* The event is passed as the first parameter (not on ctx) to make it
* clear that it's a direct value, not an Effect that needs yielding.
Expand Down Expand Up @@ -76,10 +79,11 @@ export interface TaskMakeConfig<S, E, Err, R> {
* })
* ```
*/
onEvent(event: E, ctx: TaskEventContext<S>): Effect.Effect<void, Err, R>;
onEvent(event: E, ctx: TaskEventContext<S>): Effect.Effect<void, Err, never>;

/**
* Handler called when the scheduled alarm fires.
* Must return Effect<void, Err, never> - all service requirements must be satisfied.
*
* Responsibilities:
* - Process the current state
Expand All @@ -100,11 +104,12 @@ export interface TaskMakeConfig<S, E, Err, R> {
* })
* ```
*/
execute(ctx: TaskExecuteContext<S>): Effect.Effect<void, Err, R>;
execute(ctx: TaskExecuteContext<S>): Effect.Effect<void, Err, never>;

/**
* Optional handler called when either `onEvent` or `execute` completes
* and no alarm is scheduled.
* Must return Effect<void, never, never> - all service requirements must be satisfied.
*
* Use this to:
* - Schedule delayed cleanup
Expand All @@ -119,10 +124,11 @@ export interface TaskMakeConfig<S, E, Err, R> {
* })
* ```
*/
readonly onIdle?: (ctx: TaskIdleContext<S>) => Effect.Effect<void, never, R>;
readonly onIdle?: (ctx: TaskIdleContext<S>) => Effect.Effect<void, never, never>;

/**
* Optional error handler for onEvent/execute failures.
* Must return Effect<void, never, never> - all service requirements must be satisfied.
*
* Use this to:
* - Log errors
Expand Down Expand Up @@ -151,7 +157,7 @@ export interface TaskMakeConfig<S, E, Err, R> {
readonly onError?: (
error: Err,
ctx: TaskErrorContext<S>
) => Effect.Effect<void, never, R>;
) => Effect.Effect<void, never, never>;

/**
* Control logging for this job.
Expand Down Expand Up @@ -269,9 +275,9 @@ export const Task = {
* @param config - Configuration for the task
* @returns An UnregisteredTaskDefinition that can be registered
*/
make: <S, E, Err = never, R = never>(
config: TaskMakeConfig<S, E, Err, R>
): UnregisteredTaskDefinition<S, E, Err, R> => ({
make: <S, E, Err = never>(
config: TaskMakeConfig<S, E, Err>
): UnregisteredTaskDefinition<S, E, Err> => ({
_tag: "TaskDefinition",
stateSchema: config.stateSchema,
eventSchema: config.eventSchema,
Expand Down
18 changes: 9 additions & 9 deletions packages/jobs/src/handlers/continuous/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export const ContinuousHandlerLayer = Layer.effect(

const getDefinition = (
name: string,
): Effect.Effect<StoredContinuousDefinition<any, any>, JobNotFoundError> => {
): Effect.Effect<StoredContinuousDefinition, JobNotFoundError> => {
const def = registryService.registry.continuous[name];
if (!def) {
return Effect.fail(new JobNotFoundError({ type: "continuous", name }));
Expand Down Expand Up @@ -78,7 +78,7 @@ export const ContinuousHandlerLayer = Layer.effect(
});

const scheduleNext = (
def: StoredContinuousDefinition<any, any>,
def: StoredContinuousDefinition,
): Effect.Effect<void, SchedulerError | ExecutionError> => {
const schedule = def.schedule;
switch (schedule._tag) {
Expand All @@ -94,7 +94,7 @@ export const ContinuousHandlerLayer = Layer.effect(
};

const runExecution = (
def: StoredContinuousDefinition<any, any>,
def: StoredContinuousDefinition,
runCount: number,
id?: string,
) =>
Expand All @@ -107,7 +107,7 @@ export const ContinuousHandlerLayer = Layer.effect(
id,
run: (ctx: ContinuousContext<unknown>) => def.execute(ctx),
createContext: (base) => {
const proxyHolder: StateHolder<any> = {
const proxyHolder: StateHolder<unknown> = {
get current() {
return base.getState();
},
Expand All @@ -132,9 +132,9 @@ export const ContinuousHandlerLayer = Layer.effect(
});

const handleStart = (
def: StoredContinuousDefinition<any, any>,
def: StoredContinuousDefinition,
request: ContinuousRequest,
): Effect.Effect<ContinuousResponse, HandlerError, any> =>
): Effect.Effect<ContinuousResponse, HandlerError> =>
Effect.gen(function* () {
const existing = yield* metadata.get();
if (existing) {
Expand Down Expand Up @@ -251,8 +251,8 @@ export const ContinuousHandlerLayer = Layer.effect(
});

const handleTrigger = (
def: StoredContinuousDefinition<any, any>,
): Effect.Effect<ContinuousResponse, HandlerError, any> =>
def: StoredContinuousDefinition,
): Effect.Effect<ContinuousResponse, HandlerError> =>
Effect.gen(function* () {
const existing = yield* metadata.get();
if (
Expand Down Expand Up @@ -321,7 +321,7 @@ export const ContinuousHandlerLayer = Layer.effect(
});

const handleGetState = (
def: StoredContinuousDefinition<any, any>,
def: StoredContinuousDefinition,
): Effect.Effect<ContinuousResponse, HandlerError> =>
Effect.gen(function* () {
const stateService = yield* createEntityStateService(
Expand Down
21 changes: 10 additions & 11 deletions packages/jobs/src/handlers/debounce/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export const DebounceHandlerLayer = Layer.effect(

const getDefinition = (
name: string,
): Effect.Effect<StoredDebounceDefinition<any, any, any>, JobNotFoundError> => {
): Effect.Effect<StoredDebounceDefinition, JobNotFoundError> => {
const def = registryService.registry.debounce[name];
if (!def) {
return Effect.fail(new JobNotFoundError({ type: "debounce", name }));
Expand Down Expand Up @@ -91,7 +91,7 @@ export const DebounceHandlerLayer = Layer.effect(
});

const runFlush = (
def: StoredDebounceDefinition<any, any, any>,
def: StoredDebounceDefinition,
flushReason: "maxEvents" | "flushAfter" | "manual",
id?: string,
) =>
Expand All @@ -102,7 +102,7 @@ export const DebounceHandlerLayer = Layer.effect(
retryConfig: def.retry,
runCount: 0, // Debounce doesn't track runCount persistently in same way
id,
run: (ctx: DebounceExecuteContext<any>) => def.execute(ctx),
run: (ctx: DebounceExecuteContext<unknown>) => def.execute(ctx),
createContext: (base) => {
return {
state: Effect.succeed(base.getState()), // Snapshotted state
Expand All @@ -116,14 +116,14 @@ export const DebounceHandlerLayer = Layer.effect(
flushReason,
attempt: base.attempt,
isRetry: base.isRetry,
} as DebounceExecuteContext<any>;
} as DebounceExecuteContext<unknown>;
},
});

const handleAdd = (
def: StoredDebounceDefinition<any, any, any>,
def: StoredDebounceDefinition,
request: DebounceRequest,
): Effect.Effect<DebounceResponse, HandlerError, any> =>
): Effect.Effect<DebounceResponse, HandlerError> =>
Effect.gen(function* () {
const meta = yield* metadata.get();
const created = !meta;
Expand Down Expand Up @@ -158,13 +158,12 @@ export const DebounceHandlerLayer = Layer.effect(
const stateForContext = currentState ?? (validatedEvent as unknown);

const onEvent = def.onEvent!;
// Cast is still needed unless we fix Definition generic constraints
const reducedState = yield* onEvent({
event: validatedEvent as unknown,
state: stateForContext,
eventCount: nextCount,
instanceId: runtime.instanceId,
} as any);
});

yield* stateService.set(reducedState);
yield* setEventCount(nextCount);
Expand Down Expand Up @@ -244,9 +243,9 @@ export const DebounceHandlerLayer = Layer.effect(
});

const handleFlush = (
def: StoredDebounceDefinition<any, any, any>,
def: StoredDebounceDefinition,
reason: "manual" | "flushAfter" | "maxEvents",
): Effect.Effect<DebounceResponse, HandlerError, any> =>
): Effect.Effect<DebounceResponse, HandlerError> =>
Effect.gen(function* () {
const meta = yield* metadata.get();
if (!meta) {
Expand Down Expand Up @@ -353,7 +352,7 @@ export const DebounceHandlerLayer = Layer.effect(
});

const handleGetState = (
def: StoredDebounceDefinition<any, any, any>,
def: StoredDebounceDefinition,
): Effect.Effect<DebounceResponse, HandlerError> =>
Effect.gen(function* () {
const stateService = yield* withStorage(
Expand Down
Loading