Skip to content

Latest commit

 

History

History

README.md

Stem Logo

stem_builder

pub package Dart License Buy Me A Coffee

Build-time code generator for annotated Stem workflows and tasks.

Install

dart pub add stem_builder

Add the core runtime if you haven't already:

dart pub add stem

Usage

Annotate workflows and tasks:

import 'package:stem/stem.dart';

part 'workflows.stem.g.dart';

@WorkflowDefn(name: 'hello.flow')
class HelloFlow {
  @WorkflowStep()
  Future<void> greet(String email) async {
    // ...
  }
}

@WorkflowDefn(name: 'hello.script', kind: WorkflowKind.script)
class HelloScript {
  Future<void> run(String email) async {
    await sendEmail(email);
  }

  @WorkflowStep()
  Future<void> sendEmail(String email) async {
    // builder routes this through durable script.step(...)
  }
}

@TaskDefn(name: 'hello.task')
Future<void> helloTask(
  String email,
  {TaskExecutionContext? context}
) async {
  // ...
}

Script workflows can use a plain run(...) method with no extra annotation. When you need runtime metadata, add an optional named WorkflowScriptContext? context parameter. The direct annotated checkpoint call still stays the default path.

The intended usage is to call annotated checkpoint methods directly from run(...):

Future<Map<String, Object?>> run(String email) async {
  final user = await createUser(email);
  await sendWelcomeEmail(email);
  await sendOneWeekCheckInEmail(email);
  return {'userId': user['id'], 'status': 'done'};
}

stem_builder generates a proxy subclass that rewrites those calls into durable script.step(...) executions. The source method bodies stay readable, while the generated part handles the workflow runtime plumbing.

Conceptually:

  • Flow: declared steps are the execution plan
  • script workflows: run(...) is the execution plan, and declared checkpoints are metadata for manifests/tooling

Script workflows use one entry model:

  • start with a plain direct-call run(String email, ...)
  • add an optional named injected context when you need runtime metadata
    • Future<T> run(String email, {WorkflowScriptContext? context})
    • Future<T> checkpoint(String email, {WorkflowExecutionContext? context})
  • direct annotated checkpoint calls stay the default path

Supported context injection points:

  • flow steps: FlowContext or WorkflowExecutionContext
  • script runs: WorkflowScriptContext
  • script checkpoints: WorkflowScriptStepContext or WorkflowExecutionContext
  • tasks: TaskExecutionContext

Durable workflow execution contexts enqueue tasks directly:

  • WorkflowExecutionContext.enqueue(...)
  • typed task definitions can target those contexts via enqueue(...)

Child workflows should be started from durable boundaries:

  • ref.start(context, params: value) inside flow steps
  • ref.startAndWait(context, params: value) inside script checkpoints
  • pass ttl:, parentRunId:, or cancellationPolicy: directly to ref.start(...) / ref.startAndWait(...) for the normal override cases
  • build an explicit transport request with ref.buildStart(...) only for the rarer low-level cases where you need to pass a WorkflowStartCall around

Avoid starting child workflows directly from the raw WorkflowScriptContext body unless you are explicitly handling replay semantics yourself.

Serializable parameter rules are enforced by the generator:

  • supported:
    • String, bool, int, double, num, Object?, null
    • List<T> where T is serializable
    • Map<String, T> where T is serializable
  • supported DTOs:
    • Dart classes with toJson() plus a named fromJson(...) constructor taking Map<String, Object?>
  • unsupported directly:
    • optional/named business parameters on generated workflow/task entrypoints

Typed task results can use the same DTO convention.

Workflow inputs, checkpoint values, and final workflow results can use the same DTO convention. The generated PayloadCodec persists the JSON form while workflow code continues to work with typed objects.

The intended DX is:

  • define annotated workflows and tasks in one file
  • add part '<file>.stem.g.dart';
  • run build_runner
  • pass generated stemModule into StemWorkflowApp or StemClient
  • start workflows through generated workflow refs instead of raw workflow-name strings
  • enqueue annotated tasks through generated task definitions instead of raw task-name strings

You can customize generated workflow ref names via @WorkflowDefn:

@WorkflowDefn(
  name: 'billing.daily_sync',
  starterName: 'DailyBilling',
  nameField: 'dailyBilling',
  kind: WorkflowKind.script,
)
class BillingWorkflow {
  Future<void> run(String tenant) async {}
}

Run build_runner to generate *.stem.g.dart part files:

dart run build_runner build

The generated part exports a bundle plus typed refs/definitions so you can avoid raw workflow-name and task-name strings (for example StemWorkflowDefinitions.userSignup.start( workflowApp, params: 'user@example.com', ) or StemTaskDefinitions.builderExamplePing.enqueue(stem)).

Generated output includes:

  • stemModule
  • StemWorkflowDefinitions
  • StemTaskDefinitions
  • typed TaskDefinition objects whose advanced explicit transport path uses TaskCall, alongside direct enqueue(...) / enqueueAndWait(...)

Generated task definitions are producer-safe. Stem.enqueueCall(...) can use the definition metadata directly, so a producer can publish typed task calls without registering the worker handler locally first.

Wiring Into StemWorkflowApp

For the common case, pass the generated bundle directly to StemWorkflowApp:

final workflowApp = await StemWorkflowApp.fromUrl(
  'redis://localhost:6379',
  module: stemModule,
);

final result = await StemWorkflowDefinitions.userSignup.startAndWait(
  workflowApp,
  params: 'user@example.com',
);

When you use module: stemModule, the workflow app infers the worker subscription from the workflow queue plus the default queues declared on the bundled task handlers. Override workerConfig.subscription only when your routing sends work to additional queues.

If your application already owns a StemApp, reuse it:

final stemApp = await StemApp.fromUrl(
  'redis://localhost:6379',
  adapters: const [StemRedisAdapter()],
  module: stemModule,
  workerConfig: StemWorkerConfig(
    queue: 'workflow',
    subscription: RoutingSubscription(
      queues: ['workflow', 'default'],
    ),
  ),
);

final workflowApp = await stemApp.createWorkflowApp();

That shared-app path only works when the existing StemApp worker already subscribes to the workflow queue plus any task queues the workflows need. If you want subscription inference, prefer StemClient.createWorkflowApp().

For task-only services, use the same bundle directly with StemApp:

final taskApp = await StemApp.fromUrl(
  'redis://localhost:6379',
  adapters: const [StemRedisAdapter()],
  module: stemModule,
);

Plain StemApp bootstrap also infers task queue subscriptions from the bundled or explicitly supplied task handlers when workerConfig.subscription is omitted, and it lazy-starts on the first enqueue or wait call.

If you already centralize wiring in a StemClient, prefer the shared-client path:

final client = await StemClient.fromUrl(
  'redis://localhost:6379',
  adapters: const [StemRedisAdapter()],
  module: stemModule,
);

final workflowApp = await client.createWorkflowApp();

If you reuse an existing StemApp, its worker subscription stays in charge. Workflow-side queue inference only applies when StemWorkflowApp is creating the worker for you.

When you are intentionally using the low-level WorkflowRuntime, the generated workflow refs work there too:

final runtime = workflowApp.runtime;
final runId = await StemWorkflowDefinitions.userSignup.start(
  runtime,
  params: 'user@example.com',
);
await workflowApp.executeRun(runId);

Annotated tasks also get generated definitions:

final taskId = await StemTaskDefinitions.builderExampleTask.enqueue(
  workflowApp,
  const {'kind': 'welcome'},
);

Examples

See example/README.md for runnable examples, including:

  • Generated registration + execution with StemWorkflowApp
  • Runtime manifest + run detail views with WorkflowRuntime
  • Plain direct-call script checkpoints and context-aware script checkpoints
  • Typed @TaskDefn parameters with TaskExecutionContext