diff --git a/.gitignore b/.gitignore index e9a6ba35..c6470752 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ packages/cloud/ /test_screenshots/ third_party/ packages/stem/workflow.sqlite* +# Local serinus module (gitignored per user) +packages/stem_serinus/ diff --git a/packages/dashboard/lib/src/server.dart b/packages/dashboard/lib/src/server.dart index 3953f933..7a76eba8 100644 --- a/packages/dashboard/lib/src/server.dart +++ b/packages/dashboard/lib/src/server.dart @@ -1031,34 +1031,36 @@ List _applyTaskViewFilters( final namespaceFilter = options.namespaceFilter?.toLowerCase(); final taskFilter = options.taskFilter?.toLowerCase(); final runFilter = options.runId?.toLowerCase(); - return tasks.where((entry) { - if (options.hasFilter) { - final queue = entry.queue.toLowerCase(); - if (!(queueFilter != null && queue.contains(queueFilter))) { - return false; - } - } - if (options.hasNamespaceFilter && - entry.namespace.toLowerCase() != namespaceFilter) { - return false; - } - if (options.hasTaskFilter) { - final name = entry.taskName.toLowerCase(); - if (!(taskFilter != null && name.contains(taskFilter))) { - return false; - } - } - if (options.hasRunIdFilter) { - final runId = entry.runId?.toLowerCase() ?? ''; - if (!(runFilter != null && runId.contains(runFilter))) { - return false; - } - } - if (options.hasStateFilter && entry.state != options.stateFilter) { - return false; - } - return true; - }).toList(growable: false); + return tasks + .where((entry) { + if (options.hasFilter) { + final queue = entry.queue.toLowerCase(); + if (!(queueFilter != null && queue.contains(queueFilter))) { + return false; + } + } + if (options.hasNamespaceFilter && + entry.namespace.toLowerCase() != namespaceFilter) { + return false; + } + if (options.hasTaskFilter) { + final name = entry.taskName.toLowerCase(); + if (!(taskFilter != null && name.contains(taskFilter))) { + return false; + } + } + if (options.hasRunIdFilter) { + final runId = entry.runId?.toLowerCase() ?? ''; + if (!(runFilter != null && runId.contains(runFilter))) { + return false; + } + } + if (options.hasStateFilter && entry.state != options.stateFilter) { + return false; + } + return true; + }) + .toList(growable: false); } String _resolveDefaultNamespace( diff --git a/packages/dashboard/lib/src/services/stem_service.dart b/packages/dashboard/lib/src/services/stem_service.dart index 08d584cf..c40e01f0 100644 --- a/packages/dashboard/lib/src/services/stem_service.dart +++ b/packages/dashboard/lib/src/services/stem_service.dart @@ -38,8 +38,9 @@ abstract class DashboardDataSource { Future fetchWorkflowRun(String runId); /// Fetches persisted workflow checkpoints, if a workflow store is available. - Future> - fetchWorkflowCheckpoints(String runId); + Future> fetchWorkflowCheckpoints( + String runId, + ); /// Enqueues a task request through the backing broker. Future enqueueTask(EnqueueRequest request); diff --git a/packages/dashboard/lib/src/ui/failures.dart b/packages/dashboard/lib/src/ui/failures.dart index fc282f93..90ecc833 100644 --- a/packages/dashboard/lib/src/ui/failures.dart +++ b/packages/dashboard/lib/src/ui/failures.dart @@ -108,8 +108,7 @@ ${renderFailuresAlert(options)} options: DashboardTaskTableOptions( showState: false, emptyMessage: 'No individual failures to inspect.', - actionsBuilder: (task) => - buildTaskReplayAction(task, redirectPath: redirectPath), + actionsBuilder: (task) => buildTaskReplayAction(task, redirectPath: redirectPath), ), )} diff --git a/packages/dashboard/lib/src/ui/jobs.dart b/packages/dashboard/lib/src/ui/jobs.dart index 164e0704..3565be2c 100644 --- a/packages/dashboard/lib/src/ui/jobs.dart +++ b/packages/dashboard/lib/src/ui/jobs.dart @@ -13,17 +13,19 @@ String buildJobsContent({ final jobs = buildJobSummaries(taskStatuses, limit: 500); final taskFilter = options.task?.toLowerCase(); final queueFilter = options.queue?.toLowerCase(); - final filtered = jobs.where((entry) { - final matchesTask = - taskFilter == null || - taskFilter.isEmpty || - entry.taskName.toLowerCase().contains(taskFilter); - final matchesQueue = - queueFilter == null || - queueFilter.isEmpty || - entry.sampleQueue.toLowerCase().contains(queueFilter); - return matchesTask && matchesQueue; - }).toList(growable: false); + final filtered = jobs + .where((entry) { + final matchesTask = + taskFilter == null || + taskFilter.isEmpty || + entry.taskName.toLowerCase().contains(taskFilter); + final matchesQueue = + queueFilter == null || + queueFilter.isEmpty || + entry.sampleQueue.toLowerCase().contains(queueFilter); + return matchesTask && matchesQueue; + }) + .toList(growable: false); final total = filtered.fold(0, (sum, entry) => sum + entry.total); final running = filtered.fold(0, (sum, entry) => sum + entry.running); diff --git a/packages/dashboard/lib/src/ui/overview.dart b/packages/dashboard/lib/src/ui/overview.dart index 86cf62a2..c105ba6b 100644 --- a/packages/dashboard/lib/src/ui/overview.dart +++ b/packages/dashboard/lib/src/ui/overview.dart @@ -68,11 +68,9 @@ OverviewSections buildOverviewSections( List queues, List workers, DashboardThroughput? throughput, - List taskStatuses, - { - String defaultNamespace = 'stem', - } -) { + List taskStatuses, { + String defaultNamespace = 'stem', +}) { final totalPending = queues.fold( 0, (total, summary) => total + summary.pending, @@ -331,14 +329,14 @@ OverviewSections buildOverviewSections( '''
${buildTaskStatusTable( - taskStatuses.take(8).toList(growable: false), - options: const DashboardTaskTableOptions( - showAttempt: false, - showError: false, - showActions: false, - emptyMessage: 'No persisted task statuses yet.', - ), - )} + taskStatuses.take(8).toList(growable: false), + options: const DashboardTaskTableOptions( + showAttempt: false, + showError: false, + showActions: false, + emptyMessage: 'No persisted task statuses yet.', + ), + )}
'''; diff --git a/packages/dashboard/lib/src/ui/tasks.dart b/packages/dashboard/lib/src/ui/tasks.dart index 466abe70..bdfb31fe 100644 --- a/packages/dashboard/lib/src/ui/tasks.dart +++ b/packages/dashboard/lib/src/ui/tasks.dart @@ -171,8 +171,7 @@ ${renderTasksAlert(options)} options: DashboardTaskTableOptions( emptyMessage: 'No task statuses match the current filters.', expandableRows: true, - actionsBuilder: (task) => - buildTaskLifecycleActions(task, redirectPath: taskActionRedirect), + actionsBuilder: (task) => buildTaskLifecycleActions(task, redirectPath: taskActionRedirect), ), )} ${buildTaskPaginationBar(options, taskStatuses.length)} diff --git a/packages/dashboard/pubspec.yaml b/packages/dashboard/pubspec.yaml index 03421099..67552a92 100644 --- a/packages/dashboard/pubspec.yaml +++ b/packages/dashboard/pubspec.yaml @@ -12,7 +12,7 @@ dependencies: ormed: ^0.2.0 routed: ^0.3.2 routed_hotwire: ^0.1.2 - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" stem_cli: ^0.1.0 stem_postgres: ^0.1.0 stem_redis: ^0.1.0 diff --git a/packages/stem/CHANGELOG.md b/packages/stem/CHANGELOG.md index cf282845..9f8db792 100644 --- a/packages/stem/CHANGELOG.md +++ b/packages/stem/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.2.1-wip + +- Guarded worker and example process signal registration so Windows only + installs supported shutdown watchers, avoiding unsupported `SIGTERM` / + `SIGQUIT` subscriptions during graceful shutdown setup. + ## 0.2.0 - Added `StemClient.fromStack(...)` and `StemStack.createClient(...)` so diff --git a/packages/stem/example/autoscaling_demo/bin/worker.dart b/packages/stem/example/autoscaling_demo/bin/worker.dart index defe5750..7410f19f 100644 --- a/packages/stem/example/autoscaling_demo/bin/worker.dart +++ b/packages/stem/example/autoscaling_demo/bin/worker.dart @@ -67,7 +67,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/daemonized_worker/bin/worker.dart b/packages/stem/example/daemonized_worker/bin/worker.dart index bed0dfac..8ab573cb 100644 --- a/packages/stem/example/daemonized_worker/bin/worker.dart +++ b/packages/stem/example/daemonized_worker/bin/worker.dart @@ -18,8 +18,10 @@ Future main(List args) async { } } - ProcessSignal.sigterm.watch().listen((_) => complete()); ProcessSignal.sigint.watch().listen((_) => complete()); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen((_) => complete()); + } // #endregion daemonized-worker-signal-handlers // Simulate background work. diff --git a/packages/stem/example/dlq_sandbox/bin/worker.dart b/packages/stem/example/dlq_sandbox/bin/worker.dart index 6378a5e4..667f1cfa 100644 --- a/packages/stem/example/dlq_sandbox/bin/worker.dart +++ b/packages/stem/example/dlq_sandbox/bin/worker.dart @@ -36,7 +36,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await worker.start(); } diff --git a/packages/stem/example/docs_snippets/lib/daemonization.dart b/packages/stem/example/docs_snippets/lib/daemonization.dart index 57ec61de..b190040c 100644 --- a/packages/stem/example/docs_snippets/lib/daemonization.dart +++ b/packages/stem/example/docs_snippets/lib/daemonization.dart @@ -20,8 +20,10 @@ Future main(List args) async { } } - ProcessSignal.sigterm.watch().listen((_) => shutdown()); ProcessSignal.sigint.watch().listen((_) => shutdown()); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen((_) => shutdown()); + } // #endregion daemonization-signal-handlers // #region daemonization-loop diff --git a/packages/stem/example/ecommerce/bin/server.dart b/packages/stem/example/ecommerce/bin/server.dart index 45fa5153..f7daa26c 100644 --- a/packages/stem/example/ecommerce/bin/server.dart +++ b/packages/stem/example/ecommerce/bin/server.dart @@ -39,5 +39,7 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } } diff --git a/packages/stem/example/ecommerce/lib/src/database/orm_registry.g.dart b/packages/stem/example/ecommerce/lib/src/database/orm_registry.g.dart index 2296b040..0103746b 100644 --- a/packages/stem/example/ecommerce/lib/src/database/orm_registry.g.dart +++ b/packages/stem/example/ecommerce/lib/src/database/orm_registry.g.dart @@ -21,8 +21,7 @@ ModelRegistry buildOrmRegistry() => ModelRegistry() ..registerTypeAlias(_$ormModelDefinitions[1]) ..registerTypeAlias(_$ormModelDefinitions[2]) ..registerTypeAlias(_$ormModelDefinitions[3]) - ..registerTypeAlias(_$ormModelDefinitions[4]) - ; + ..registerTypeAlias(_$ormModelDefinitions[4]); List> get generatedOrmModelDefinitions => List.unmodifiable(_$ormModelDefinitions); @@ -41,8 +40,7 @@ extension GeneratedOrmModels on ModelRegistry { /// Registers factory definitions for all models that have factory support. /// Call this before using [Model.factory()] to ensure definitions are available. -void registerOrmFactories() { -} +void registerOrmFactories() {} /// Combined setup: registers both model registry and factories. /// Returns a ModelRegistry with all generated models registered. @@ -62,7 +60,14 @@ void registerModelScopes({ScopeRegistry? scopeRegistry}) { } /// Bootstraps generated ORM pieces: registry, factories, event handlers, and scopes. -ModelRegistry bootstrapOrm({ModelRegistry? registry, EventBus? bus, ScopeRegistry? scopes, bool registerFactories = true, bool registerEventHandlers = true, bool registerScopes = true}) { +ModelRegistry bootstrapOrm({ + ModelRegistry? registry, + EventBus? bus, + ScopeRegistry? scopes, + bool registerFactories = true, + bool registerEventHandlers = true, + bool registerScopes = true, +}) { final reg = registry ?? buildOrmRegistry(); if (registry != null) { reg.registerGeneratedModels(); diff --git a/packages/stem/example/email_service/bin/enqueuer.dart b/packages/stem/example/email_service/bin/enqueuer.dart index 5fa20e83..dfaebc2c 100644 --- a/packages/stem/example/email_service/bin/enqueuer.dart +++ b/packages/stem/example/email_service/bin/enqueuer.dart @@ -25,7 +25,7 @@ Future main(List args) async { name: 'email.send', entrypoint: _placeholderEntrypoint, options: const TaskOptions(queue: 'emails', maxRetries: 3), - ), + ), ]; final client = await StemClient.fromUrl( @@ -82,7 +82,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } } FutureOr _placeholderEntrypoint( diff --git a/packages/stem/example/email_service/bin/worker.dart b/packages/stem/example/email_service/bin/worker.dart index 7772ff72..b2bf2464 100644 --- a/packages/stem/example/email_service/bin/worker.dart +++ b/packages/stem/example/email_service/bin/worker.dart @@ -71,7 +71,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } } Future sendEmail( diff --git a/packages/stem/example/encrypted_payload/worker/bin/worker.dart b/packages/stem/example/encrypted_payload/worker/bin/worker.dart index fc91f5ec..8c25c1b2 100644 --- a/packages/stem/example/encrypted_payload/worker/bin/worker.dart +++ b/packages/stem/example/encrypted_payload/worker/bin/worker.dart @@ -58,7 +58,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/image_processor/bin/api.dart b/packages/stem/example/image_processor/bin/api.dart index ab0afb08..e2fdcd60 100644 --- a/packages/stem/example/image_processor/bin/api.dart +++ b/packages/stem/example/image_processor/bin/api.dart @@ -73,7 +73,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } } FutureOr _placeholderEntrypoint( diff --git a/packages/stem/example/image_processor/bin/worker.dart b/packages/stem/example/image_processor/bin/worker.dart index 67bb5fa9..486acb7e 100644 --- a/packages/stem/example/image_processor/bin/worker.dart +++ b/packages/stem/example/image_processor/bin/worker.dart @@ -56,7 +56,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } } Future generateThumbnail( diff --git a/packages/stem/example/microservice/beat/bin/beat.dart b/packages/stem/example/microservice/beat/bin/beat.dart index 8a8195a0..f70776ac 100644 --- a/packages/stem/example/microservice/beat/bin/beat.dart +++ b/packages/stem/example/microservice/beat/bin/beat.dart @@ -59,7 +59,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/microservice/enqueuer/bin/main.dart b/packages/stem/example/microservice/enqueuer/bin/main.dart index 5f3984df..93d769fb 100644 --- a/packages/stem/example/microservice/enqueuer/bin/main.dart +++ b/packages/stem/example/microservice/enqueuer/bin/main.dart @@ -260,7 +260,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } } FutureOr _placeholderEntrypoint( diff --git a/packages/stem/example/mixed_cluster/postgres_worker/bin/worker.dart b/packages/stem/example/mixed_cluster/postgres_worker/bin/worker.dart index 789e4d9e..4702fb23 100644 --- a/packages/stem/example/mixed_cluster/postgres_worker/bin/worker.dart +++ b/packages/stem/example/mixed_cluster/postgres_worker/bin/worker.dart @@ -57,7 +57,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/mixed_cluster/redis_worker/bin/worker.dart b/packages/stem/example/mixed_cluster/redis_worker/bin/worker.dart index 33078c2d..4840a624 100644 --- a/packages/stem/example/mixed_cluster/redis_worker/bin/worker.dart +++ b/packages/stem/example/mixed_cluster/redis_worker/bin/worker.dart @@ -54,7 +54,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/monolith_service/bin/service.dart b/packages/stem/example/monolith_service/bin/service.dart index 9d9698d4..2a4b4f77 100644 --- a/packages/stem/example/monolith_service/bin/service.dart +++ b/packages/stem/example/monolith_service/bin/service.dart @@ -148,7 +148,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(handleShutdown); - ProcessSignal.sigterm.watch().listen(handleShutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(handleShutdown); + } } FutureOr _greetingEntrypoint( diff --git a/packages/stem/example/ops_health_suite/bin/worker.dart b/packages/stem/example/ops_health_suite/bin/worker.dart index bde7ed77..34063d63 100644 --- a/packages/stem/example/ops_health_suite/bin/worker.dart +++ b/packages/stem/example/ops_health_suite/bin/worker.dart @@ -38,7 +38,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/postgres_tls/bin/worker.dart b/packages/stem/example/postgres_tls/bin/worker.dart index f59803fd..9ad3ca69 100644 --- a/packages/stem/example/postgres_tls/bin/worker.dart +++ b/packages/stem/example/postgres_tls/bin/worker.dart @@ -62,7 +62,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/postgres_worker/worker/bin/worker.dart b/packages/stem/example/postgres_worker/worker/bin/worker.dart index 4d341fd9..1c50b793 100644 --- a/packages/stem/example/postgres_worker/worker/bin/worker.dart +++ b/packages/stem/example/postgres_worker/worker/bin/worker.dart @@ -63,7 +63,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/progress_heartbeat/bin/worker.dart b/packages/stem/example/progress_heartbeat/bin/worker.dart index 1a7ade3f..eab0d2fe 100644 --- a/packages/stem/example/progress_heartbeat/bin/worker.dart +++ b/packages/stem/example/progress_heartbeat/bin/worker.dart @@ -42,7 +42,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await worker.start(); } diff --git a/packages/stem/example/rate_limit_delay/bin/worker.dart b/packages/stem/example/rate_limit_delay/bin/worker.dart index 90b0e3ae..823b37f9 100644 --- a/packages/stem/example/rate_limit_delay/bin/worker.dart +++ b/packages/stem/example/rate_limit_delay/bin/worker.dart @@ -47,7 +47,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await worker.start(); } diff --git a/packages/stem/example/redis_postgres_worker/worker/bin/worker.dart b/packages/stem/example/redis_postgres_worker/worker/bin/worker.dart index 90258f29..4244e57a 100644 --- a/packages/stem/example/redis_postgres_worker/worker/bin/worker.dart +++ b/packages/stem/example/redis_postgres_worker/worker/bin/worker.dart @@ -59,7 +59,9 @@ Future main(List args) async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/routing_parity/bin/worker.dart b/packages/stem/example/routing_parity/bin/worker.dart index 817a20d9..c01b3f71 100644 --- a/packages/stem/example/routing_parity/bin/worker.dart +++ b/packages/stem/example/routing_parity/bin/worker.dart @@ -46,5 +46,7 @@ Future main() async { } ProcessSignal.sigint.watch().listen((_) => shutdown()); - ProcessSignal.sigterm.watch().listen((_) => shutdown()); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen((_) => shutdown()); + } } diff --git a/packages/stem/example/scheduler_observability/bin/beat.dart b/packages/stem/example/scheduler_observability/bin/beat.dart index e804f6e3..393fec24 100644 --- a/packages/stem/example/scheduler_observability/bin/beat.dart +++ b/packages/stem/example/scheduler_observability/bin/beat.dart @@ -56,7 +56,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/scheduler_observability/bin/worker.dart b/packages/stem/example/scheduler_observability/bin/worker.dart index 99bc708a..d017dbb2 100644 --- a/packages/stem/example/scheduler_observability/bin/worker.dart +++ b/packages/stem/example/scheduler_observability/bin/worker.dart @@ -37,7 +37,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/signals_demo/bin/producer.dart b/packages/stem/example/signals_demo/bin/producer.dart index e3c108a8..de65e04c 100644 --- a/packages/stem/example/signals_demo/bin/producer.dart +++ b/packages/stem/example/signals_demo/bin/producer.dart @@ -36,7 +36,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(scheduleShutdown); - ProcessSignal.sigterm.watch().listen(scheduleShutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(scheduleShutdown); + } await Completer().future; } diff --git a/packages/stem/example/signals_demo/bin/worker.dart b/packages/stem/example/signals_demo/bin/worker.dart index 58a3b3db..338efd49 100644 --- a/packages/stem/example/signals_demo/bin/worker.dart +++ b/packages/stem/example/signals_demo/bin/worker.dart @@ -44,7 +44,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(scheduleShutdown); - ProcessSignal.sigterm.watch().listen(scheduleShutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(scheduleShutdown); + } // Keep the worker running indefinitely. await Completer().future; diff --git a/packages/stem/example/signing_key_rotation/bin/worker.dart b/packages/stem/example/signing_key_rotation/bin/worker.dart index 83d4fd83..f95809d3 100644 --- a/packages/stem/example/signing_key_rotation/bin/worker.dart +++ b/packages/stem/example/signing_key_rotation/bin/worker.dart @@ -43,7 +43,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await Completer().future; } diff --git a/packages/stem/example/task_context_mixed/bin/worker.dart b/packages/stem/example/task_context_mixed/bin/worker.dart index ade492a3..0fda8b3f 100644 --- a/packages/stem/example/task_context_mixed/bin/worker.dart +++ b/packages/stem/example/task_context_mixed/bin/worker.dart @@ -24,10 +24,12 @@ Future main() async { stdout.writeln('Shutdown requested.'); unawaited(_shutdown(worker, broker, backend)); }); - ProcessSignal.sigterm.watch().listen((_) { - stdout.writeln('Shutdown requested.'); - unawaited(_shutdown(worker, broker, backend)); - }); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen((_) { + stdout.writeln('Shutdown requested.'); + unawaited(_shutdown(worker, broker, backend)); + }); + } final paths = resolveDatabasePaths(); stdout.writeln( diff --git a/packages/stem/example/worker_control_lab/bin/worker.dart b/packages/stem/example/worker_control_lab/bin/worker.dart index ab8505e6..517c8310 100644 --- a/packages/stem/example/worker_control_lab/bin/worker.dart +++ b/packages/stem/example/worker_control_lab/bin/worker.dart @@ -47,7 +47,9 @@ Future main() async { } ProcessSignal.sigint.watch().listen(shutdown); - ProcessSignal.sigterm.watch().listen(shutdown); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen(shutdown); + } await worker.start(); } diff --git a/packages/stem/lib/src/bootstrap/workflow_app.dart b/packages/stem/lib/src/bootstrap/workflow_app.dart index abbe7344..f0ef4966 100644 --- a/packages/stem/lib/src/bootstrap/workflow_app.dart +++ b/packages/stem/lib/src/bootstrap/workflow_app.dart @@ -558,10 +558,7 @@ class StemWorkflowApp T Function(Map payload, int version)? decodeVersionedJson, }) async { assert( - [decode, decodeJson, decodeVersionedJson] - .whereType() - .length <= - 1, + [decode, decodeJson, decodeVersionedJson].whereType().length <= 1, 'Specify at most one of decode, decodeJson, or decodeVersionedJson.', ); final startedAt = stemNow(); @@ -1072,11 +1069,9 @@ extension StemAppWorkflowExtension on StemApp { void _validateReusableStemApp( StemApp app, - StemWorkerConfig workerConfig, - { - required bool allowWorkerAutoStart, - } -) { + StemWorkerConfig workerConfig, { + required bool allowWorkerAutoStart, +}) { if (app.allowWorkerAutoStart != allowWorkerAutoStart) { throw StateError( 'StemWorkflowApp.create(stemApp: ...) requires the reused StemApp ' diff --git a/packages/stem/lib/src/core/contracts.dart b/packages/stem/lib/src/core/contracts.dart index 0ea0ef94..f50c69ba 100644 --- a/packages/stem/lib/src/core/contracts.dart +++ b/packages/stem/lib/src/core/contracts.dart @@ -2804,8 +2804,7 @@ class TaskDefinition { decodeResultJson == null || decodeResultVersionedJson == null, 'Specify either decodeResultJson or decodeResultVersionedJson, not both.', ); - final resultCodec = - decodeResultVersionedJson != null + final resultCodec = decodeResultVersionedJson != null ? PayloadCodec.versionedJson( version: defaultDecodeVersion ?? 1, decode: decodeResultVersionedJson, @@ -2847,8 +2846,7 @@ class TaskDefinition { decodeResultJson == null || decodeResultVersionedJson == null, 'Specify either decodeResultJson or decodeResultVersionedJson, not both.', ); - final resultCodec = - decodeResultVersionedJson != null + final resultCodec = decodeResultVersionedJson != null ? PayloadCodec.versionedJson( version: version, decode: decodeResultVersionedJson, @@ -2946,8 +2944,7 @@ class TaskDefinition { defaultDecodeVersion: defaultDecodeVersion, typeName: argsTypeName ?? '$TArgs', ); - final resultCodec = - decodeResultVersionedJson != null + final resultCodec = decodeResultVersionedJson != null ? PayloadCodec.versionedJson( version: version, decode: decodeResultVersionedJson, @@ -3268,7 +3265,6 @@ class NoArgsTaskDefinition { /// Decodes a persisted payload into a typed result. TResult? decode(Object? payload) => asDefinition.decode(payload); - } /// Represents a pending enqueue operation built from a [TaskDefinition]. @@ -3312,7 +3308,6 @@ class TaskCall { /// Resolve final options combining call overrides with defaults. TaskOptions resolveOptions() => options ?? definition.defaultOptions; - } /// Convenience helpers for building typed enqueue requests directly from a task @@ -3374,7 +3369,6 @@ extension TaskEnqueuerBuilderExtension on TaskEnqueuer { enqueueOptions: enqueueOptions, ); } - } /// Retry strategy used to compute the next backoff delay. diff --git a/packages/stem/lib/src/core/payload_codec.dart b/packages/stem/lib/src/core/payload_codec.dart index 2a3b91b9..38d8b7f3 100644 --- a/packages/stem/lib/src/core/payload_codec.dart +++ b/packages/stem/lib/src/core/payload_codec.dart @@ -40,7 +40,6 @@ class PayloadVersionRegistry { } } - /// Encodes and decodes a strongly-typed payload value. /// /// This author-facing codec layer is used by generated workflow/task helpers to diff --git a/packages/stem/lib/src/core/payload_map.dart b/packages/stem/lib/src/core/payload_map.dart index 6513c259..f856f149 100644 --- a/packages/stem/lib/src/core/payload_map.dart +++ b/packages/stem/lib/src/core/payload_map.dart @@ -86,10 +86,11 @@ extension PayloadMapX on Map { throw StateError("Missing required payload key '$key'."); } return valueJson( - key, - decode: decode, - typeName: typeName, - ) as T; + key, + decode: decode, + typeName: typeName, + ) + as T; } /// Decodes the value for [key] as a version-aware typed DTO, or [fallback] @@ -125,12 +126,13 @@ extension PayloadMapX on Map { throw StateError("Missing required payload key '$key'."); } return valueVersionedJson( - key, - defaultVersion: defaultVersion, - decode: decode, - defaultDecodeVersion: defaultDecodeVersion, - typeName: typeName, - ) as T; + key, + defaultVersion: defaultVersion, + decode: decode, + defaultDecodeVersion: defaultDecodeVersion, + typeName: typeName, + ) + as T; } /// Returns the decoded list value for [key], or `null` when it is absent. diff --git a/packages/stem/lib/src/core/stem.dart b/packages/stem/lib/src/core/stem.dart index 91fcd010..370da441 100644 --- a/packages/stem/lib/src/core/stem.dart +++ b/packages/stem/lib/src/core/stem.dart @@ -541,10 +541,7 @@ class Stem implements TaskResultCaller { T Function(Map payload, int version)? decodeVersionedJson, }) async { assert( - [decode, decodeJson, decodeVersionedJson] - .whereType() - .length <= - 1, + [decode, decodeJson, decodeVersionedJson].whereType().length <= 1, 'Specify at most one of decode, decodeJson, or decodeVersionedJson.', ); final resultBackend = backend; @@ -1184,7 +1181,6 @@ TResult _decodeTaskDefinitionResult( return value as TResult; } - /// Convenience helpers for waiting on typed task definitions. extension TaskDefinitionExtension on TaskDefinition { diff --git a/packages/stem/lib/src/worker/worker.dart b/packages/stem/lib/src/worker/worker.dart index 9565fa91..5354e64c 100644 --- a/packages/stem/lib/src/worker/worker.dart +++ b/packages/stem/lib/src/worker/worker.dart @@ -3211,24 +3211,29 @@ class Worker { /// Installs process signal handlers for graceful shutdown. void _installSignalHandlers() { if (!lifecycleConfig.installSignalHandlers) return; - _sigtermSub ??= _safeWatch(ProcessSignal.sigterm, () { - if (_running) { - unawaited(shutdown(mode: WorkerShutdownMode.warm)); - } - }); _sigintSub ??= _safeWatch(ProcessSignal.sigint, () { if (_running) { unawaited(shutdown(mode: WorkerShutdownMode.soft)); } }); - _sigquitSub ??= _safeWatch(ProcessSignal.sigquit, () { - if (_running) { - unawaited(shutdown()); - } - }); + + // Windows does not support the full Unix signal set. Only install the + // additional handlers when the platform can actually deliver them. + if (!Platform.isWindows) { + _sigtermSub ??= _safeWatch(ProcessSignal.sigterm, () { + if (_running) { + unawaited(shutdown(mode: WorkerShutdownMode.warm)); + } + }); + _sigquitSub ??= _safeWatch(ProcessSignal.sigquit, () { + if (_running) { + unawaited(shutdown()); + } + }); + } } - /// Wraps [ProcessSignal.watch] to guard against unsupported platforms. + /// Wraps [ProcessSignal.watch] to guard against unsupported runtimes. StreamSubscription? _safeWatch( ProcessSignal signal, void Function() handler, diff --git a/packages/stem/lib/src/worker/worker_config.dart b/packages/stem/lib/src/worker/worker_config.dart index 8b34ff62..7aef4408 100644 --- a/packages/stem/lib/src/worker/worker_config.dart +++ b/packages/stem/lib/src/worker/worker_config.dart @@ -160,7 +160,7 @@ class WorkerAutoscaleConfig { /// Lifecycle guard configuration for worker isolates and shutdown semantics. /// /// Controls how the worker handles: -/// - **Process Signals**: SIGTERM, SIGINT, SIGQUIT handling +/// - **Process Signals**: Platform-appropriate shutdown signal handling /// - **Shutdown Behavior**: Grace periods and forced termination /// - **Isolate Recycling**: When to recycle isolates based on usage /// @@ -171,9 +171,12 @@ class WorkerAutoscaleConfig { /// /// | Signal | Default Behavior | /// |--------|-----------------| -/// | SIGTERM | Soft shutdown | /// | SIGINT | Soft shutdown | -/// | SIGQUIT | Hard shutdown | +/// | SIGTERM | Warm shutdown on platforms that support it | +/// | SIGQUIT | Hard shutdown on platforms that support it | +/// +/// Windows only installs the SIGINT handler because SIGTERM/SIGQUIT are not +/// consistently available there. /// /// ## Isolate Recycling /// @@ -208,7 +211,7 @@ class WorkerLifecycleConfig { this.maxMemoryPerIsolateBytes, }); - /// Whether to install default signal handlers (SIGTERM/SIGINT/SIGQUIT). + /// Whether to install default process signal handlers for the host platform. final bool installSignalHandlers; /// Grace period before escalating a soft shutdown to hard termination. diff --git a/packages/stem/lib/src/workflow/core/run_state.dart b/packages/stem/lib/src/workflow/core/run_state.dart index 777157d1..c1088b93 100644 --- a/packages/stem/lib/src/workflow/core/run_state.dart +++ b/packages/stem/lib/src/workflow/core/run_state.dart @@ -112,8 +112,7 @@ class RunState { WorkflowRunRuntimeMetadata.fromParams(params); /// Parent workflow run identifier, if this run was started as a child. - String? get parentRunId => - params[workflowParentRunIdParamKey]?.toString(); + String? get parentRunId => params[workflowParentRunIdParamKey]?.toString(); /// Timestamp when the workflow run was created. final DateTime createdAt; diff --git a/packages/stem/lib/src/workflow/core/workflow_event_ref.dart b/packages/stem/lib/src/workflow/core/workflow_event_ref.dart index 00a41f06..2dd90df6 100644 --- a/packages/stem/lib/src/workflow/core/workflow_event_ref.dart +++ b/packages/stem/lib/src/workflow/core/workflow_event_ref.dart @@ -143,7 +143,6 @@ class WorkflowEventRef { /// Optional codec for encoding and decoding event payloads. final PayloadCodec? codec; - } /// Convenience helpers for dispatching typed workflow events. diff --git a/packages/stem/lib/src/workflow/core/workflow_ref.dart b/packages/stem/lib/src/workflow/core/workflow_ref.dart index 8097d022..812a7515 100644 --- a/packages/stem/lib/src/workflow/core/workflow_ref.dart +++ b/packages/stem/lib/src/workflow/core/workflow_ref.dart @@ -45,8 +45,7 @@ class WorkflowRef { decodeResultJson == null || decodeResultVersionedJson == null, 'Specify either decodeResultJson or decodeResultVersionedJson, not both.', ); - final resultCodec = - decodeResultVersionedJson != null + final resultCodec = decodeResultVersionedJson != null ? PayloadCodec.versionedJson( version: defaultDecodeVersion ?? 1, decode: decodeResultVersionedJson, @@ -84,8 +83,7 @@ class WorkflowRef { decodeResultJson == null || decodeResultVersionedJson == null, 'Specify either decodeResultJson or decodeResultVersionedJson, not both.', ); - final resultCodec = - decodeResultVersionedJson != null + final resultCodec = decodeResultVersionedJson != null ? PayloadCodec.versionedJson( version: version, decode: decodeResultVersionedJson, @@ -165,8 +163,7 @@ class WorkflowRef { defaultDecodeVersion: defaultDecodeVersion, typeName: paramsTypeName ?? '$TParams', ); - final resultCodec = - decodeResultVersionedJson != null + final resultCodec = decodeResultVersionedJson != null ? PayloadCodec.versionedJson( version: version, decode: decodeResultVersionedJson, @@ -490,7 +487,6 @@ class WorkflowStartCall { /// Encodes typed parameters into the workflow parameter map. Map encodeParams() => definition.encodeParams(params); - } /// Convenience helpers for waiting on typed workflow refs using a generic diff --git a/packages/stem/lib/src/workflow/runtime/workflow_runtime.dart b/packages/stem/lib/src/workflow/runtime/workflow_runtime.dart index 94f318cb..4e5015ed 100644 --- a/packages/stem/lib/src/workflow/runtime/workflow_runtime.dart +++ b/packages/stem/lib/src/workflow/runtime/workflow_runtime.dart @@ -324,10 +324,7 @@ class WorkflowRuntime implements WorkflowCaller, WorkflowEventEmitter { T Function(Map payload, int version)? decodeVersionedJson, }) async { assert( - [decode, decodeJson, decodeVersionedJson] - .whereType() - .length <= - 1, + [decode, decodeJson, decodeVersionedJson].whereType().length <= 1, 'Specify at most one of decode, decodeJson, or decodeVersionedJson.', ); final startedAt = _clock.now(); diff --git a/packages/stem/pubspec.yaml b/packages/stem/pubspec.yaml index cd3ff58b..5194ca24 100644 --- a/packages/stem/pubspec.yaml +++ b/packages/stem/pubspec.yaml @@ -1,6 +1,6 @@ name: stem description: "Stem is a Dart-native background job platform with Redis Streams, retries, scheduling, observability, and security tooling." -version: 0.2.0 +version: 0.2.1-wip repository: https://github.com/kingwill101/stem resolution: workspace environment: diff --git a/packages/stem/test/bootstrap/shortcut_allow_auto_start_test.dart b/packages/stem/test/bootstrap/shortcut_allow_auto_start_test.dart index ce95793f..b282b752 100644 --- a/packages/stem/test/bootstrap/shortcut_allow_auto_start_test.dart +++ b/packages/stem/test/bootstrap/shortcut_allow_auto_start_test.dart @@ -40,47 +40,50 @@ void main() { } }); - test('StemWorkflowApp can create runs without starting the worker', () async { - final flow = Flow( - name: 'shortcut.workflow', - build: (builder) { - builder.step('done', (context) async => 'workflow-done'); - }, - ); + test( + 'StemWorkflowApp can create runs without starting the worker', + () async { + final flow = Flow( + name: 'shortcut.workflow', + build: (builder) { + builder.step('done', (context) async => 'workflow-done'); + }, + ); - final app = await StemWorkflowApp.inMemory( - flows: [flow], - allowWorkerAutoStart: false, - ); + final app = await StemWorkflowApp.inMemory( + flows: [flow], + allowWorkerAutoStart: false, + ); - try { - final runId = await flow.start(app); - expect(app.isRuntimeStarted, isTrue); - expect(app.isWorkerStarted, isFalse); + try { + final runId = await flow.start(app); + expect(app.isRuntimeStarted, isTrue); + expect(app.isWorkerStarted, isFalse); - final pending = await app.waitForCompletion( - runId, - timeout: const Duration(milliseconds: 10), - ); - expect(pending, isNotNull); - expect(pending!.timedOut, isTrue); - expect(pending.status, WorkflowStatus.running); + final pending = await app.waitForCompletion( + runId, + timeout: const Duration(milliseconds: 10), + ); + expect(pending, isNotNull); + expect(pending!.timedOut, isTrue); + expect(pending.status, WorkflowStatus.running); - await app.startWorker(); - expect(app.isRuntimeStarted, isTrue); - expect(app.isWorkerStarted, isTrue); - expect(app.isStarted, isTrue); + await app.startWorker(); + expect(app.isRuntimeStarted, isTrue); + expect(app.isWorkerStarted, isTrue); + expect(app.isStarted, isTrue); - final completed = await flow.waitFor( - app, - runId, - timeout: const Duration(seconds: 1), - ); - expect(completed?.isCompleted, isTrue); - expect(completed?.value, 'workflow-done'); - } finally { - await app.shutdown(); - } - }); + final completed = await flow.waitFor( + app, + runId, + timeout: const Duration(seconds: 1), + ); + expect(completed?.isCompleted, isTrue); + expect(completed?.value, 'workflow-done'); + } finally { + await app.shutdown(); + } + }, + ); }); } diff --git a/packages/stem/test/bootstrap/stem_app_test.dart b/packages/stem/test/bootstrap/stem_app_test.dart index 74c1ddd6..7bda08b8 100644 --- a/packages/stem/test/bootstrap/stem_app_test.dart +++ b/packages/stem/test/bootstrap/stem_app_test.dart @@ -549,43 +549,44 @@ void main() { test( 'startWorkflowValue encodes typed params through the supplied codec', () async { - final flow = Flow( - name: 'workflow.codec.start', - build: (builder) { - builder.step( - 'payload', - (ctx) async => - '${ctx.requiredParam('foo')}:' - '${ctx.requiredParam('kind')}', - ); - }, - ); - - final workflowApp = await StemWorkflowApp.inMemory(flows: [flow]); - try { - final runId = await workflowApp.startWorkflowValue( - 'workflow.codec.start', - const _DemoPayload('bar'), - codec: const PayloadCodec<_DemoPayload>.map( - encode: _encodeDemoPayloadMap, - decode: _DemoPayload.fromJson, - typeName: '_DemoPayload', - ), - ); - final runState = await workflowApp.getRun(runId); - final run = await workflowApp.waitForCompletion( - runId, - timeout: const Duration(seconds: 2), + final flow = Flow( + name: 'workflow.codec.start', + build: (builder) { + builder.step( + 'payload', + (ctx) async => + '${ctx.requiredParam('foo')}:' + '${ctx.requiredParam('kind')}', + ); + }, ); - expect(runId, isNotEmpty); - expect(runState?.params, containsPair('foo', 'bar')); - expect(runState?.params, containsPair('kind', 'custom')); - expect(run?.requiredValue(), 'bar:custom'); - } finally { - await workflowApp.shutdown(); - } - }); + final workflowApp = await StemWorkflowApp.inMemory(flows: [flow]); + try { + final runId = await workflowApp.startWorkflowValue( + 'workflow.codec.start', + const _DemoPayload('bar'), + codec: const PayloadCodec<_DemoPayload>.map( + encode: _encodeDemoPayloadMap, + decode: _DemoPayload.fromJson, + typeName: '_DemoPayload', + ), + ); + final runState = await workflowApp.getRun(runId); + final run = await workflowApp.waitForCompletion( + runId, + timeout: const Duration(seconds: 2), + ); + + expect(runId, isNotEmpty); + expect(runState?.params, containsPair('foo', 'bar')); + expect(runState?.params, containsPair('kind', 'custom')); + expect(run?.requiredValue(), 'bar:custom'); + } finally { + await workflowApp.shutdown(); + } + }, + ); test( 'startWorkflowVersionedJson encodes DTO params with a persisted ' diff --git a/packages/stem/test/bootstrap/workflow_module_bootstrap_test.dart b/packages/stem/test/bootstrap/workflow_module_bootstrap_test.dart index e4ffdc2d..9c1bdb56 100644 --- a/packages/stem/test/bootstrap/workflow_module_bootstrap_test.dart +++ b/packages/stem/test/bootstrap/workflow_module_bootstrap_test.dart @@ -161,23 +161,23 @@ void main() { test( 'explicit workflow subscription overrides inferred module queues', () async { - final helperTask = FunctionTaskHandler( - name: 'workflow.module.explicit-subscription', - entrypoint: (context, args) async => 'ignored', - runInIsolate: false, - ); - final workflowApp = await StemWorkflowApp.inMemory( - module: StemModule(tasks: [helperTask]), - workerConfig: StemWorkerConfig( - queue: 'workflow', - subscription: RoutingSubscription.singleQueue('workflow'), - ), - ); - try { - expect(workflowApp.app.worker.subscription.queues, ['workflow']); - } finally { - await workflowApp.shutdown(); - } + final helperTask = FunctionTaskHandler( + name: 'workflow.module.explicit-subscription', + entrypoint: (context, args) async => 'ignored', + runInIsolate: false, + ); + final workflowApp = await StemWorkflowApp.inMemory( + module: StemModule(tasks: [helperTask]), + workerConfig: StemWorkerConfig( + queue: 'workflow', + subscription: RoutingSubscription.singleQueue('workflow'), + ), + ); + try { + expect(workflowApp.app.worker.subscription.queues, ['workflow']); + } finally { + await workflowApp.shutdown(); + } }, ); diff --git a/packages/stem/test/support/fixtures/daemon_stub.dart b/packages/stem/test/support/fixtures/daemon_stub.dart index bcc9cfbb..12a87f7d 100644 --- a/packages/stem/test/support/fixtures/daemon_stub.dart +++ b/packages/stem/test/support/fixtures/daemon_stub.dart @@ -19,16 +19,18 @@ Future main(List args) async { completer.complete(); } }); - ProcessSignal.sigterm.watch().listen((_) { - if (!completer.isCompleted) { - completer.complete(); - } - }); ProcessSignal.sigint.watch().listen((_) { if (!completer.isCompleted) { completer.complete(); } }); + if (!Platform.isWindows) { + ProcessSignal.sigterm.watch().listen((_) { + if (!completer.isCompleted) { + completer.complete(); + } + }); + } await completer.future; fallback.cancel(); diff --git a/packages/stem/test/unit/core/contracts_test.dart b/packages/stem/test/unit/core/contracts_test.dart index 402c3c8b..765f4b78 100644 --- a/packages/stem/test/unit/core/contracts_test.dart +++ b/packages/stem/test/unit/core/contracts_test.dart @@ -342,8 +342,11 @@ void main() { decode: _GroupReceipt.fromJson, ), { - 'task-1': isA<_GroupReceipt>() - .having((value) => value.id, 'id', 'receipt-1'), + 'task-1': isA<_GroupReceipt>().having( + (value) => value.id, + 'id', + 'receipt-1', + ), }, ); expect( @@ -352,8 +355,11 @@ void main() { decode: _GroupReceipt.fromVersionedJson, ), { - 'task-1': isA<_GroupReceipt>() - .having((value) => value.id, 'id', 'receipt-1'), + 'task-1': isA<_GroupReceipt>().having( + (value) => value.id, + 'id', + 'receipt-1', + ), }, ); }); @@ -582,16 +588,22 @@ void main() { ); expect( entry.metaJson<_ScheduleMeta>(decode: _ScheduleMeta.fromJson), - isA<_ScheduleMeta>() - .having((value) => value.source, 'source', 'scheduler'), + isA<_ScheduleMeta>().having( + (value) => value.source, + 'source', + 'scheduler', + ), ); expect( entry.metaVersionedJson<_ScheduleMeta>( version: 2, decode: _ScheduleMeta.fromVersionedJson, ), - isA<_ScheduleMeta>() - .having((value) => value.source, 'source', 'scheduler'), + isA<_ScheduleMeta>().having( + (value) => value.source, + 'source', + 'scheduler', + ), ); }); }); diff --git a/packages/stem/test/unit/core/queue_events_test.dart b/packages/stem/test/unit/core/queue_events_test.dart index 9fd8c7ba..f6b57c6f 100644 --- a/packages/stem/test/unit/core/queue_events_test.dart +++ b/packages/stem/test/unit/core/queue_events_test.dart @@ -172,47 +172,47 @@ void main() { test( 'emitValue publishes typed payloads through the supplied codec', () async { - final listener = QueueEvents( - broker: broker, - queue: 'orders', - consumerName: 'orders-listener-codec', - ); - await listener.start(); - addTearDown(listener.close); - - final received = listener - .on('order.codec') - .first - .timeout(const Duration(seconds: 5)); + final listener = QueueEvents( + broker: broker, + queue: 'orders', + consumerName: 'orders-listener-codec', + ); + await listener.start(); + addTearDown(listener.close); - final eventId = await producer.emitValue( - 'orders', - 'order.codec', - const _QueueEventPayload(orderId: 'o-2b', status: 'codec'), - codec: const PayloadCodec<_QueueEventPayload>.map( - encode: _encodeQueueEventPayloadMap, - decode: _QueueEventPayload.fromJson, - typeName: '_QueueEventPayload', - ), - ); + final received = listener + .on('order.codec') + .first + .timeout(const Duration(seconds: 5)); - final event = await received; - expect(event.id, eventId); - expect(event.requiredPayloadValue('orderId'), 'o-2b'); - expect(event.requiredPayloadValue('status'), 'codec'); - expect(event.requiredPayloadValue('kind'), 'custom'); - expect( - event.payloadAs<_QueueEventPayload>( + final eventId = await producer.emitValue( + 'orders', + 'order.codec', + const _QueueEventPayload(orderId: 'o-2b', status: 'codec'), codec: const PayloadCodec<_QueueEventPayload>.map( encode: _encodeQueueEventPayloadMap, decode: _QueueEventPayload.fromJson, typeName: '_QueueEventPayload', ), - ), - isA<_QueueEventPayload>() - .having((value) => value.orderId, 'orderId', 'o-2b') - .having((value) => value.status, 'status', 'codec'), - ); + ); + + final event = await received; + expect(event.id, eventId); + expect(event.requiredPayloadValue('orderId'), 'o-2b'); + expect(event.requiredPayloadValue('status'), 'codec'); + expect(event.requiredPayloadValue('kind'), 'custom'); + expect( + event.payloadAs<_QueueEventPayload>( + codec: const PayloadCodec<_QueueEventPayload>.map( + encode: _encodeQueueEventPayloadMap, + decode: _QueueEventPayload.fromJson, + typeName: '_QueueEventPayload', + ), + ), + isA<_QueueEventPayload>() + .having((value) => value.orderId, 'orderId', 'o-2b') + .having((value) => value.status, 'status', 'codec'), + ); }, ); diff --git a/packages/stem/test/unit/core/stem_core_test.dart b/packages/stem/test/unit/core/stem_core_test.dart index a8cd359c..7fe54382 100644 --- a/packages/stem/test/unit/core/stem_core_test.dart +++ b/packages/stem/test/unit/core/stem_core_test.dart @@ -409,11 +409,11 @@ void main() { final stem = Stem(broker: broker, backend: backend); final definition = TaskDefinition<_CodecTaskArgs, _CodecReceipt>.versionedMap( - name: 'sample.versioned_map.result', - version: 2, - encodeArgs: (args) => {'legacy_value': args.value}, - decodeResultVersionedJson: _CodecReceipt.fromVersionedJson, - ); + name: 'sample.versioned_map.result', + version: 2, + encodeArgs: (args) => {'legacy_value': args.value}, + decodeResultVersionedJson: _CodecReceipt.fromVersionedJson, + ); final id = await stem.enqueueCall( definition.buildCall(const _CodecTaskArgs('encoded')), @@ -838,29 +838,30 @@ void main() { test( 'supports json argful task definitions with versioned results', () async { - final backend = InMemoryResultBackend(); - final stem = Stem(broker: _RecordingBroker(), backend: backend); - final definition = TaskDefinition<_CodecTaskArgs, _CodecReceipt>.json( - name: 'args.json.versioned.wait', - decodeResultVersionedJson: _CodecReceipt.fromVersionedJson, - defaultDecodeVersion: 2, - ); + final backend = InMemoryResultBackend(); + final stem = Stem(broker: _RecordingBroker(), backend: backend); + final definition = TaskDefinition<_CodecTaskArgs, _CodecReceipt>.json( + name: 'args.json.versioned.wait', + decodeResultVersionedJson: _CodecReceipt.fromVersionedJson, + defaultDecodeVersion: 2, + ); - await backend.set( - 'task-args-json-versioned-wait', - TaskState.succeeded, - payload: {'id': 'done', PayloadCodec.versionKey: 2}, - meta: {stemResultEncoderMetaKey: _codecReceiptEncoder.id}, - ); + await backend.set( + 'task-args-json-versioned-wait', + TaskState.succeeded, + payload: {'id': 'done', PayloadCodec.versionKey: 2}, + meta: {stemResultEncoderMetaKey: _codecReceiptEncoder.id}, + ); - final result = await definition.waitFor( - stem, - 'task-args-json-versioned-wait', - ); + final result = await definition.waitFor( + stem, + 'task-args-json-versioned-wait', + ); - expect(result?.value?.id, 'done-v2'); - expect(result?.rawPayload, isA>()); - }); + expect(result?.value?.id, 'done-v2'); + expect(result?.rawPayload, isA>()); + }, + ); test('enqueueAndWait supports no-arg task definitions', () async { final broker = _RecordingBroker(); diff --git a/packages/stem/test/unit/core/task_context_enqueue_test.dart b/packages/stem/test/unit/core/task_context_enqueue_test.dart index e93346ea..30898bf5 100644 --- a/packages/stem/test/unit/core/task_context_enqueue_test.dart +++ b/packages/stem/test/unit/core/task_context_enqueue_test.dart @@ -139,32 +139,32 @@ void main() { test( 'enqueueValue encodes typed payloads through the supplied codec', () async { - final enqueuer = _RecordingEnqueuer(); - final context = TaskContext( - id: 'parent-3b', - attempt: 1, - headers: const {'x-trace-id': 'trace-2'}, - meta: const {'tenant': 'acme'}, - heartbeat: () {}, - extendLease: (_) async {}, - progress: (_, {data}) async {}, - enqueuer: enqueuer, - ); + final enqueuer = _RecordingEnqueuer(); + final context = TaskContext( + id: 'parent-3b', + attempt: 1, + headers: const {'x-trace-id': 'trace-2'}, + meta: const {'tenant': 'acme'}, + heartbeat: () {}, + extendLease: (_) async {}, + progress: (_, {data}) async {}, + enqueuer: enqueuer, + ); - await context.enqueueValue( - 'tasks.child', - const _InvitePayload(email: 'ops@example.com'), - codec: const PayloadCodec<_InvitePayload>.json( - decode: _InvitePayload.fromJson, - typeName: '_InvitePayload', - ), - ); + await context.enqueueValue( + 'tasks.child', + const _InvitePayload(email: 'ops@example.com'), + codec: const PayloadCodec<_InvitePayload>.json( + decode: _InvitePayload.fromJson, + typeName: '_InvitePayload', + ), + ); - final record = enqueuer.last!; - expect(record.args, equals({'email': 'ops@example.com'})); - expect(record.meta[_parentTaskIdKey], equals('parent-3b')); - expect(record.meta[_parentAttemptKey], equals(1)); - expect(record.headers['x-trace-id'], equals('trace-2')); + final record = enqueuer.last!; + expect(record.args, equals({'email': 'ops@example.com'})); + expect(record.meta[_parentTaskIdKey], equals('parent-3b')); + expect(record.meta[_parentAttemptKey], equals(1)); + expect(record.headers['x-trace-id'], equals('trace-2')); }, ); diff --git a/packages/stem/test/unit/core/task_enqueue_builder_test.dart b/packages/stem/test/unit/core/task_enqueue_builder_test.dart index b79d6a79..34a2c06b 100644 --- a/packages/stem/test/unit/core/task_enqueue_builder_test.dart +++ b/packages/stem/test/unit/core/task_enqueue_builder_test.dart @@ -163,18 +163,18 @@ void main() { test( 'NoArgsTaskDefinition.asDefinition.buildCall builds an empty call', () { - final definition = TaskDefinition.noArgs(name: 'demo.no_args'); + final definition = TaskDefinition.noArgs(name: 'demo.no_args'); - final call = definition.asDefinition.buildCall( - (), - headers: const {'h': 'v'}, - meta: const {'m': 1}, - ); + final call = definition.asDefinition.buildCall( + (), + headers: const {'h': 'v'}, + meta: const {'m': 1}, + ); - expect(call.name, 'demo.no_args'); - expect(call.encodeArgs(), isEmpty); - expect(call.headers, containsPair('h', 'v')); - expect(call.meta, containsPair('m', 1)); + expect(call.name, 'demo.no_args'); + expect(call.encodeArgs(), isEmpty); + expect(call.headers, containsPair('h', 'v')); + expect(call.meta, containsPair('m', 1)); }, ); @@ -194,21 +194,21 @@ void main() { test( 'NoArgsTaskDefinition.enqueue uses the TaskEnqueuer surface', () async { - final definition = TaskDefinition.noArgs(name: 'demo.no_args'); - final enqueuer = _RecordingTaskEnqueuer(); + final definition = TaskDefinition.noArgs(name: 'demo.no_args'); + final enqueuer = _RecordingTaskEnqueuer(); - final taskId = await definition.enqueue( - enqueuer, - headers: const {'h': 'v'}, - meta: const {'m': 1}, - ); + final taskId = await definition.enqueue( + enqueuer, + headers: const {'h': 'v'}, + meta: const {'m': 1}, + ); - expect(taskId, 'task-1'); - expect(enqueuer.lastCall, isNotNull); - expect(enqueuer.lastCall!.name, 'demo.no_args'); - expect(enqueuer.lastCall!.encodeArgs(), isEmpty); - expect(enqueuer.lastCall!.headers, containsPair('h', 'v')); - expect(enqueuer.lastCall!.meta, containsPair('m', 1)); + expect(taskId, 'task-1'); + expect(enqueuer.lastCall, isNotNull); + expect(enqueuer.lastCall!.name, 'demo.no_args'); + expect(enqueuer.lastCall!.encodeArgs(), isEmpty); + expect(enqueuer.lastCall!.headers, containsPair('h', 'v')); + expect(enqueuer.lastCall!.meta, containsPair('m', 1)); }, ); }); diff --git a/packages/stem/test/unit/core/task_invocation_test.dart b/packages/stem/test/unit/core/task_invocation_test.dart index 735ba22a..5505a70e 100644 --- a/packages/stem/test/unit/core/task_invocation_test.dart +++ b/packages/stem/test/unit/core/task_invocation_test.dart @@ -277,10 +277,13 @@ void main() { version: 2, ); - expect(progressSignal?.data, equals(const { - PayloadCodec.versionKey: 2, - 'stage': 'warming', - })); + expect( + progressSignal?.data, + equals(const { + PayloadCodec.versionKey: 2, + 'stage': 'warming', + }), + ); }, ); @@ -530,38 +533,48 @@ void main() { ); expect( - enqueue.argsVersionedJson<_ProgressUpdate>( - version: 2, - decode: _ProgressUpdate.fromVersionedJson, - ).stage, + enqueue + .argsVersionedJson<_ProgressUpdate>( + version: 2, + decode: _ProgressUpdate.fromVersionedJson, + ) + .stage, 'warming', ); expect( - enqueue.metaVersionedJson<_QueueLabel>( - version: 2, - decode: _QueueLabel.fromVersionedJson, - ).label, + enqueue + .metaVersionedJson<_QueueLabel>( + version: 2, + decode: _QueueLabel.fromVersionedJson, + ) + .label, 'queued', ); expect( - start.paramsVersionedJson<_WorkflowStartPayload>( - version: 2, - decode: _WorkflowStartPayload.fromVersionedJson, - ).value, + start + .paramsVersionedJson<_WorkflowStartPayload>( + version: 2, + decode: _WorkflowStartPayload.fromVersionedJson, + ) + .value, 'child', ); expect( - wait.resultVersionedJson<_WorkflowResultPayload>( - version: 2, - decode: _WorkflowResultPayload.fromVersionedJson, - )?.value, + wait + .resultVersionedJson<_WorkflowResultPayload>( + version: 2, + decode: _WorkflowResultPayload.fromVersionedJson, + ) + ?.value, 'done', ); expect( - emit.payloadVersionedJson<_WorkflowEventPayload>( - version: 2, - decode: _WorkflowEventPayload.fromVersionedJson, - ).value, + emit + .payloadVersionedJson<_WorkflowEventPayload>( + version: 2, + decode: _WorkflowEventPayload.fromVersionedJson, + ) + .value, 'event', ); }); diff --git a/packages/stem/test/unit/core/task_registry_test.dart b/packages/stem/test/unit/core/task_registry_test.dart index bd63b56c..572ae603 100644 --- a/packages/stem/test/unit/core/task_registry_test.dart +++ b/packages/stem/test/unit/core/task_registry_test.dart @@ -191,7 +191,6 @@ void main() { final handler = _TestHandler('meta', description: 'Example task'); expect(handler.metadata.description, 'Example task'); }); - }); group('TaskDefinition', () { diff --git a/packages/stem/test/unit/core/task_result_test.dart b/packages/stem/test/unit/core/task_result_test.dart index d128d14d..7b886e75 100644 --- a/packages/stem/test/unit/core/task_result_test.dart +++ b/packages/stem/test/unit/core/task_result_test.dart @@ -105,13 +105,13 @@ void main() { expect( result.requiredValue, throwsA( - isA().having( - (error) => error.message, - 'message', - contains('task-1'), - ), + isA().having( + (error) => error.message, + 'message', + contains('task-1'), ), - ); + ), + ); expect(result.valueOr(7), 7); }); } diff --git a/packages/stem/test/unit/workflow/flow_context_test.dart b/packages/stem/test/unit/workflow/flow_context_test.dart index af2379a8..d35a6b38 100644 --- a/packages/stem/test/unit/workflow/flow_context_test.dart +++ b/packages/stem/test/unit/workflow/flow_context_test.dart @@ -121,7 +121,9 @@ void main() { }, ); - test('startWith throws when workflow caller support is unavailable', () { + test( + 'startWith throws when workflow caller support is unavailable', + () { final context = FlowContext( workflow: 'demo', runId: 'run-4', diff --git a/packages/stem/test/unit/workflow/workflow_metadata_views_test.dart b/packages/stem/test/unit/workflow/workflow_metadata_views_test.dart index 5a4f9f23..a76a7fdb 100644 --- a/packages/stem/test/unit/workflow/workflow_metadata_views_test.dart +++ b/packages/stem/test/unit/workflow/workflow_metadata_views_test.dart @@ -200,31 +200,43 @@ void main() { state.lastErrorJson<_WorkflowErrorPayload>( decode: _WorkflowErrorPayload.fromJson, ), - isA<_WorkflowErrorPayload>() - .having((value) => value.message, 'message', 'boom'), + isA<_WorkflowErrorPayload>().having( + (value) => value.message, + 'message', + 'boom', + ), ); expect( state.lastErrorVersionedJson<_WorkflowErrorPayload>( version: 2, decode: _WorkflowErrorPayload.fromVersionedJson, ), - isA<_WorkflowErrorPayload>() - .having((value) => value.message, 'message', 'boom'), + isA<_WorkflowErrorPayload>().having( + (value) => value.message, + 'message', + 'boom', + ), ); expect( state.cancellationDataJson<_CancellationPayload>( decode: _CancellationPayload.fromJson, ), - isA<_CancellationPayload>() - .having((value) => value.reason, 'reason', 'manual'), + isA<_CancellationPayload>().having( + (value) => value.reason, + 'reason', + 'manual', + ), ); expect( state.cancellationDataVersionedJson<_CancellationPayload>( version: 2, decode: _CancellationPayload.fromVersionedJson, ), - isA<_CancellationPayload>() - .having((value) => value.reason, 'reason', 'manual'), + isA<_CancellationPayload>().having( + (value) => value.reason, + 'reason', + 'manual', + ), ); }); }); @@ -506,16 +518,22 @@ void main() { view.lastErrorJson<_WorkflowErrorPayload>( decode: _WorkflowErrorPayload.fromJson, ), - isA<_WorkflowErrorPayload>() - .having((value) => value.message, 'message', 'boom'), + isA<_WorkflowErrorPayload>().having( + (value) => value.message, + 'message', + 'boom', + ), ); expect( view.lastErrorVersionedJson<_WorkflowErrorPayload>( version: 2, decode: _WorkflowErrorPayload.fromVersionedJson, ), - isA<_WorkflowErrorPayload>() - .having((value) => value.message, 'message', 'boom'), + isA<_WorkflowErrorPayload>().having( + (value) => value.message, + 'message', + 'boom', + ), ); expect( view.runtimeJson<_RuntimePayload>(decode: _RuntimePayload.fromJson), diff --git a/packages/stem/test/unit/workflow/workflow_result_test.dart b/packages/stem/test/unit/workflow/workflow_result_test.dart index 24ada74c..62fdcf23 100644 --- a/packages/stem/test/unit/workflow/workflow_result_test.dart +++ b/packages/stem/test/unit/workflow/workflow_result_test.dart @@ -76,16 +76,14 @@ void main() { result.payloadJson<_WorkflowReceipt>( decode: _WorkflowReceipt.fromJson, ), - isA<_WorkflowReceipt>() - .having((value) => value.id, 'id', 'receipt-1'), + isA<_WorkflowReceipt>().having((value) => value.id, 'id', 'receipt-1'), ); expect( result.payloadVersionedJson<_WorkflowReceipt>( version: 2, decode: _WorkflowReceipt.fromVersionedJson, ), - isA<_WorkflowReceipt>() - .having((value) => value.id, 'id', 'receipt-1'), + isA<_WorkflowReceipt>().having((value) => value.id, 'id', 'receipt-1'), ); }); @@ -108,13 +106,13 @@ void main() { expect( result.requiredValue, throwsA( - isA().having( - (error) => error.message, - 'message', - contains('run-1'), - ), + isA().having( + (error) => error.message, + 'message', + contains('run-1'), ), - ); + ), + ); expect(result.valueOr(7), 7); }); } diff --git a/packages/stem/test/unit/workflow/workflow_resume_test.dart b/packages/stem/test/unit/workflow/workflow_resume_test.dart index bab01090..567b047a 100644 --- a/packages/stem/test/unit/workflow/workflow_resume_test.dart +++ b/packages/stem/test/unit/workflow/workflow_resume_test.dart @@ -488,8 +488,8 @@ void main() { stepIndex: 0, ); - final firstResult = - firstContext.waitForEventValueVersionedJson<_ResumePayload>( + final firstResult = firstContext + .waitForEventValueVersionedJson<_ResumePayload>( 'demo.event', defaultVersion: 2, decode: _ResumePayload.fromVersionedJson, @@ -514,8 +514,8 @@ void main() { }, ); - final resumed = - resumedContext.waitForEventValueVersionedJson<_ResumePayload>( + final resumed = resumedContext + .waitForEventValueVersionedJson<_ResumePayload>( 'demo.event', defaultVersion: 2, decode: _ResumePayload.fromVersionedJson, diff --git a/packages/stem/test/workflow/workflow_runtime_test.dart b/packages/stem/test/workflow/workflow_runtime_test.dart index 9add7da0..562a53cc 100644 --- a/packages/stem/test/workflow/workflow_runtime_test.dart +++ b/packages/stem/test/workflow/workflow_runtime_test.dart @@ -818,153 +818,153 @@ void main() { test( 'emitEvent resumes flows with versioned-json workflow event refs', () async { - final event = WorkflowEventRef<_UserUpdatedEvent>.versionedJson( - topic: 'user.updated.versioned.ref', - version: 2, - decode: _UserUpdatedEvent.fromVersionedJson, - typeName: '_UserUpdatedEvent', - ); - _UserUpdatedEvent? observedPayload; + final event = WorkflowEventRef<_UserUpdatedEvent>.versionedJson( + topic: 'user.updated.versioned.ref', + version: 2, + decode: _UserUpdatedEvent.fromVersionedJson, + typeName: '_UserUpdatedEvent', + ); + _UserUpdatedEvent? observedPayload; - runtime.registerWorkflow( - Flow( - name: 'event.versioned.ref.workflow', - build: (flow) { - flow.step( - 'wait', - (context) async { - final resume = event.waitValue(context); - if (resume == null) { - return null; - } - observedPayload = resume; - return resume.id; - }, - ); - }, - ).definition, - ); + runtime.registerWorkflow( + Flow( + name: 'event.versioned.ref.workflow', + build: (flow) { + flow.step( + 'wait', + (context) async { + final resume = event.waitValue(context); + if (resume == null) { + return null; + } + observedPayload = resume; + return resume.id; + }, + ); + }, + ).definition, + ); - final runId = await runtime.startWorkflow('event.versioned.ref.workflow'); - await runtime.executeRun(runId); + final runId = await runtime.startWorkflow('event.versioned.ref.workflow'); + await runtime.executeRun(runId); - final suspended = await store.get(runId); - expect(suspended?.status, WorkflowStatus.suspended); - expect(suspended?.waitTopic, event.topic); + final suspended = await store.get(runId); + expect(suspended?.status, WorkflowStatus.suspended); + expect(suspended?.waitTopic, event.topic); - await event.emit( - runtime, - const _UserUpdatedEvent(id: 'user-versioned-ref-2'), - ); - await runtime.executeRun(runId); + await event.emit( + runtime, + const _UserUpdatedEvent(id: 'user-versioned-ref-2'), + ); + await runtime.executeRun(runId); - final completed = await store.get(runId); - expect(completed?.status, WorkflowStatus.completed); - expect(observedPayload?.id, 'user-versioned-ref-2'); - expect(completed?.result, 'user-versioned-ref-2'); + final completed = await store.get(runId); + expect(completed?.status, WorkflowStatus.completed); + expect(observedPayload?.id, 'user-versioned-ref-2'); + expect(completed?.result, 'user-versioned-ref-2'); }, ); test( 'emitEvent resumes flows with registry-backed workflow event refs', () async { - final event = WorkflowEventRef<_UserUpdatedEvent>.versionedJsonRegistry( - topic: 'user.updated.registry.ref', - version: 2, - registry: _userUpdatedEventRegistry, - typeName: '_UserUpdatedEvent', - ); - _UserUpdatedEvent? observedPayload; + final event = WorkflowEventRef<_UserUpdatedEvent>.versionedJsonRegistry( + topic: 'user.updated.registry.ref', + version: 2, + registry: _userUpdatedEventRegistry, + typeName: '_UserUpdatedEvent', + ); + _UserUpdatedEvent? observedPayload; - runtime.registerWorkflow( - Flow( - name: 'event.registry.ref.workflow', - build: (flow) { - flow.step( - 'wait', - (context) async { - final resume = event.waitValue(context); - if (resume == null) { - return null; - } - observedPayload = resume; - return resume.id; - }, - ); - }, - ).definition, - ); + runtime.registerWorkflow( + Flow( + name: 'event.registry.ref.workflow', + build: (flow) { + flow.step( + 'wait', + (context) async { + final resume = event.waitValue(context); + if (resume == null) { + return null; + } + observedPayload = resume; + return resume.id; + }, + ); + }, + ).definition, + ); - final runId = await runtime.startWorkflow('event.registry.ref.workflow'); - await runtime.executeRun(runId); + final runId = await runtime.startWorkflow('event.registry.ref.workflow'); + await runtime.executeRun(runId); - final suspended = await store.get(runId); - expect(suspended?.status, WorkflowStatus.suspended); - expect(suspended?.waitTopic, event.topic); + final suspended = await store.get(runId); + expect(suspended?.status, WorkflowStatus.suspended); + expect(suspended?.waitTopic, event.topic); - await event.emit( - runtime, - const _UserUpdatedEvent(id: 'user-registry-ref-2'), - ); - await runtime.executeRun(runId); + await event.emit( + runtime, + const _UserUpdatedEvent(id: 'user-registry-ref-2'), + ); + await runtime.executeRun(runId); - final completed = await store.get(runId); - expect(completed?.status, WorkflowStatus.completed); - expect(observedPayload?.id, 'user-registry-ref-2'); - expect(completed?.result, 'user-registry-ref-2'); + final completed = await store.get(runId); + expect(completed?.status, WorkflowStatus.completed); + expect(observedPayload?.id, 'user-registry-ref-2'); + expect(completed?.result, 'user-registry-ref-2'); }, ); test( 'emitEvent resumes flows with versioned-map workflow event refs', () async { - final event = WorkflowEventRef<_UserUpdatedEvent>.versionedMap( - topic: 'user.updated.versioned.map.ref', - encode: (value) => {'user_id': value.id}, - version: 3, - decode: _UserUpdatedEvent.fromVersionedMap, - typeName: '_UserUpdatedEvent', - ); - _UserUpdatedEvent? observedPayload; + final event = WorkflowEventRef<_UserUpdatedEvent>.versionedMap( + topic: 'user.updated.versioned.map.ref', + encode: (value) => {'user_id': value.id}, + version: 3, + decode: _UserUpdatedEvent.fromVersionedMap, + typeName: '_UserUpdatedEvent', + ); + _UserUpdatedEvent? observedPayload; - runtime.registerWorkflow( - Flow( - name: 'event.versioned.map.ref.workflow', - build: (flow) { - flow.step( - 'wait', - (context) async { - final resume = event.waitValue(context); - if (resume == null) { - return null; - } - observedPayload = resume; - return resume.id; - }, - ); - }, - ).definition, - ); + runtime.registerWorkflow( + Flow( + name: 'event.versioned.map.ref.workflow', + build: (flow) { + flow.step( + 'wait', + (context) async { + final resume = event.waitValue(context); + if (resume == null) { + return null; + } + observedPayload = resume; + return resume.id; + }, + ); + }, + ).definition, + ); - final runId = await runtime.startWorkflow( - 'event.versioned.map.ref.workflow', - ); - await runtime.executeRun(runId); + final runId = await runtime.startWorkflow( + 'event.versioned.map.ref.workflow', + ); + await runtime.executeRun(runId); - final suspended = await store.get(runId); - expect(suspended?.status, WorkflowStatus.suspended); - expect(suspended?.waitTopic, event.topic); + final suspended = await store.get(runId); + expect(suspended?.status, WorkflowStatus.suspended); + expect(suspended?.waitTopic, event.topic); - await event.emit( - runtime, - const _UserUpdatedEvent(id: 'user-versioned-map-ref'), - ); - await runtime.executeRun(runId); + await event.emit( + runtime, + const _UserUpdatedEvent(id: 'user-versioned-map-ref'), + ); + await runtime.executeRun(runId); - final completed = await store.get(runId); - expect(completed?.status, WorkflowStatus.completed); - expect(observedPayload?.id, 'user-versioned-map-ref-v3'); - expect(completed?.result, 'user-versioned-map-ref-v3'); + final completed = await store.get(runId); + expect(completed?.status, WorkflowStatus.completed); + expect(observedPayload?.id, 'user-versioned-map-ref-v3'); + expect(completed?.result, 'user-versioned-map-ref-v3'); }, ); diff --git a/packages/stem_adapter_tests/pubspec.yaml b/packages/stem_adapter_tests/pubspec.yaml index 01f091c6..ce5b15e7 100644 --- a/packages/stem_adapter_tests/pubspec.yaml +++ b/packages/stem_adapter_tests/pubspec.yaml @@ -7,7 +7,7 @@ environment: sdk: ">=3.9.2 <4.0.0" dependencies: - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" test: ^1.29.0 dev_dependencies: diff --git a/packages/stem_builder/example/bin/main.dart b/packages/stem_builder/example/bin/main.dart index 9d4481af..f523c634 100644 --- a/packages/stem_builder/example/bin/main.dart +++ b/packages/stem_builder/example/bin/main.dart @@ -11,9 +11,9 @@ Future main() async { print('\nGenerated workflow manifest:'); print( - const JsonEncoder.withIndent( - ' ', - ).convert(stemModule.workflowManifest.map((entry) => entry.toJson()).toList()), + const JsonEncoder.withIndent(' ').convert( + stemModule.workflowManifest.map((entry) => entry.toJson()).toList(), + ), ); final app = await StemWorkflowApp.inMemory(module: stemModule); @@ -43,10 +43,7 @@ Future main() async { final taskApp = await StemApp.inMemory(module: stemModule); try { final taskResult = await StemTaskDefinitions.builderExamplePing - .enqueueAndWait( - taskApp, - timeout: const Duration(seconds: 2), - ); + .enqueueAndWait(taskApp, timeout: const Duration(seconds: 2)); print('\nNo-arg task result: ${taskResult?.value}'); } finally { await taskApp.shutdown(); diff --git a/packages/stem_builder/example/bin/runtime_metadata_views.dart b/packages/stem_builder/example/bin/runtime_metadata_views.dart index 1c81bc83..f8a71cde 100644 --- a/packages/stem_builder/example/bin/runtime_metadata_views.dart +++ b/packages/stem_builder/example/bin/runtime_metadata_views.dart @@ -10,9 +10,9 @@ Future main() async { try { print('--- Generated manifest (builder output) ---'); print( - const JsonEncoder.withIndent( - ' ', - ).convert(stemModule.workflowManifest.map((entry) => entry.toJson()).toList()), + const JsonEncoder.withIndent(' ').convert( + stemModule.workflowManifest.map((entry) => entry.toJson()).toList(), + ), ); print('\n--- Runtime manifest (registered definitions) ---'); @@ -31,11 +31,10 @@ Future main() async { ); await app.executeRun(flowRunId); - final scriptRunId = await StemWorkflowDefinitions.userSignup - .start( - runtime, - params: 'dev@stem.dev', - ); + final scriptRunId = await StemWorkflowDefinitions.userSignup.start( + runtime, + params: 'dev@stem.dev', + ); await app.executeRun(scriptRunId); final runViews = await app.listRunViews(limit: 10); diff --git a/packages/stem_builder/pubspec.yaml b/packages/stem_builder/pubspec.yaml index 43b5b059..54c99d56 100644 --- a/packages/stem_builder/pubspec.yaml +++ b/packages/stem_builder/pubspec.yaml @@ -13,7 +13,7 @@ dependencies: dart_style: ^3.1.4 glob: ^2.1.3 source_gen: ^4.1.2 - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" dev_dependencies: build_runner: ^2.10.5 diff --git a/packages/stem_cli/pubspec.yaml b/packages/stem_cli/pubspec.yaml index ede5f621..e59df733 100644 --- a/packages/stem_cli/pubspec.yaml +++ b/packages/stem_cli/pubspec.yaml @@ -8,7 +8,7 @@ environment: dependencies: artisanal: ^0.2.0 - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" stem_redis: ^0.1.0 stem_postgres: ^0.1.0 stem_sqlite: ^0.1.0 diff --git a/packages/stem_cli/test/unit/cli/cli_worker_stats_test.dart b/packages/stem_cli/test/unit/cli/cli_worker_stats_test.dart index 5c759544..e0c16460 100644 --- a/packages/stem_cli/test/unit/cli/cli_worker_stats_test.dart +++ b/packages/stem_cli/test/unit/cli/cli_worker_stats_test.dart @@ -510,4 +510,3 @@ Future _assertTaskRemainsQueued( await Future.delayed(const Duration(milliseconds: 20)); } } - diff --git a/packages/stem_memory/pubspec.yaml b/packages/stem_memory/pubspec.yaml index cdf0dfb2..edea5267 100644 --- a/packages/stem_memory/pubspec.yaml +++ b/packages/stem_memory/pubspec.yaml @@ -8,7 +8,7 @@ environment: dependencies: collection: ^1.19.1 - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" uuid: ^4.5.2 dev_dependencies: diff --git a/packages/stem_postgres/lib/src/workflow/postgres_workflow_store.dart b/packages/stem_postgres/lib/src/workflow/postgres_workflow_store.dart index bf9c5bf6..c6f867af 100644 --- a/packages/stem_postgres/lib/src/workflow/postgres_workflow_store.dart +++ b/packages/stem_postgres/lib/src/workflow/postgres_workflow_store.dart @@ -106,8 +106,7 @@ class PostgresWorkflowStore implements WorkflowStore { Duration? ttl, WorkflowCancellationPolicy? cancellationPolicy, }) async { - final id = - (runId != null && runId.trim().isNotEmpty) + final id = (runId != null && runId.trim().isNotEmpty) ? runId.trim() : _uuid.v7(); final now = _clock.now().toUtc(); diff --git a/packages/stem_postgres/lib/src/workflow/postgres_workflow_store_new.dart b/packages/stem_postgres/lib/src/workflow/postgres_workflow_store_new.dart index 262c467b..ec883b7a 100644 --- a/packages/stem_postgres/lib/src/workflow/postgres_workflow_store_new.dart +++ b/packages/stem_postgres/lib/src/workflow/postgres_workflow_store_new.dart @@ -81,8 +81,7 @@ class PostgresWorkflowStore implements WorkflowStore { Duration? ttl, WorkflowCancellationPolicy? cancellationPolicy, }) async { - final id = - (runId != null && runId.trim().isNotEmpty) + final id = (runId != null && runId.trim().isNotEmpty) ? runId.trim() : _uuid.v7(); final now = _clock.now().toUtc(); diff --git a/packages/stem_postgres/pubspec.yaml b/packages/stem_postgres/pubspec.yaml index 658c02ad..7bdcdeb6 100644 --- a/packages/stem_postgres/pubspec.yaml +++ b/packages/stem_postgres/pubspec.yaml @@ -14,7 +14,7 @@ dependencies: ormed_postgres: ^0.2.0 path: ^1.9.1 postgres: ^3.5.9 - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" uuid: ^4.5.2 dev_dependencies: diff --git a/packages/stem_redis/lib/src/workflow/redis_workflow_store.dart b/packages/stem_redis/lib/src/workflow/redis_workflow_store.dart index 7069ded3..064862e5 100644 --- a/packages/stem_redis/lib/src/workflow/redis_workflow_store.dart +++ b/packages/stem_redis/lib/src/workflow/redis_workflow_store.dart @@ -357,8 +357,7 @@ return 1 }) async { final now = _clock.now(); final nowIso = now.toIso8601String(); - final id = - (runId != null && runId.trim().isNotEmpty) + final id = (runId != null && runId.trim().isNotEmpty) ? runId.trim() : 'wf-${now.microsecondsSinceEpoch}-${_idCounter++}'; final result = await _send([ diff --git a/packages/stem_redis/pubspec.yaml b/packages/stem_redis/pubspec.yaml index fde9cf43..3cdf8170 100644 --- a/packages/stem_redis/pubspec.yaml +++ b/packages/stem_redis/pubspec.yaml @@ -10,7 +10,7 @@ dependencies: async: ^2.13.0 collection: ^1.19.1 redis: ^4.0.0 - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" uuid: ^4.5.2 dev_dependencies: diff --git a/packages/stem_sqlite/lib/src/workflow/sqlite_workflow_store.dart b/packages/stem_sqlite/lib/src/workflow/sqlite_workflow_store.dart index 02ed45d4..6449b91d 100644 --- a/packages/stem_sqlite/lib/src/workflow/sqlite_workflow_store.dart +++ b/packages/stem_sqlite/lib/src/workflow/sqlite_workflow_store.dart @@ -90,8 +90,7 @@ class SqliteWorkflowStore implements WorkflowStore { WorkflowCancellationPolicy? cancellationPolicy, }) async { final now = _clock.now().toUtc(); - final id = - (runId != null && runId.trim().isNotEmpty) + final id = (runId != null && runId.trim().isNotEmpty) ? runId.trim() : 'wf-${now.microsecondsSinceEpoch}-${_idCounter++}'; final policyJson = cancellationPolicy == null || cancellationPolicy.isEmpty diff --git a/packages/stem_sqlite/pubspec.yaml b/packages/stem_sqlite/pubspec.yaml index 774daa4a..cd70a607 100644 --- a/packages/stem_sqlite/pubspec.yaml +++ b/packages/stem_sqlite/pubspec.yaml @@ -14,7 +14,7 @@ dependencies: ormed: ^0.2.0 ormed_sqlite: ^0.2.0 path: ^1.9.1 - stem: ^0.2.0 + stem: ">=0.2.0-dev <0.3.0" uuid: ^4.5.2 dev_dependencies: