feat: resource system, native async, KEDA integration (#27)
Merged
pratyush618 merged 16 commits into master (Mar 13, 2026)
Conversation
Describes the architecture for argument interception, worker-scoped dependency injection, and transparent resource proxies. Used as the implementation blueprint for the resource system feature.
Three-layer resource system for worker dependency injection:

Layer 1 — Argument Interception: classify and transform task arguments before serialization using PASS/CONVERT/REDIRECT/PROXY/REJECT strategies. Built-in converters for UUID, datetime, Decimal, Path, Enum, dataclasses, and Pydantic models. Strict/lenient/off modes with depth-limited recursive walking.

Layer 2 — Worker Resource Runtime: declare resources with `@queue.worker_resource()`, inject them into tasks with `@queue.task(inject=["db"])`. Topological init order, reverse teardown, and health checking with automatic recreation. `test_mode()` supports mock resource injection.

Layer 3 — Resource Proxies: transparent deconstruct/reconstruct for non-serializable objects (files, loggers, HTTP clients), with identity deduplication and LIFO cleanup.

Also includes observability integration: resource status API, Prometheus metrics, CLI subcommand, dashboard endpoint, and health check integration.

New packages: `interception/` (9 files), `resources/` (5), `proxies/` (8)
Tests: 46 + 20 + 29 + 16 = 111 new tests
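The Layer 1 strategy classification can be sketched in a few lines. This is an illustrative reimplementation, not taskito's actual code: the `Strategy` enum and `classify()` helper below are hypothetical, and the real interception layer adds REDIRECT/PROXY strategies, a pluggable registry, and depth-limited recursive walking.

```python
import uuid
import datetime
from enum import Enum

# Hypothetical names for illustration; the real interception layer
# supports more strategies (REDIRECT, PROXY) and a converter registry.
class Strategy(Enum):
    PASS = "pass"        # already JSON-serializable, leave untouched
    CONVERT = "convert"  # rewrite into a tagged serializable form
    REJECT = "reject"    # refuse in strict mode (cannot round-trip)

def classify(value):
    """Pick an interception strategy for a single task argument."""
    if isinstance(value, (str, int, float, bool, type(None))):
        return Strategy.PASS, value
    if isinstance(value, uuid.UUID):
        return Strategy.CONVERT, {"__type__": "uuid", "value": str(value)}
    if isinstance(value, datetime.datetime):
        return Strategy.CONVERT, {"__type__": "datetime", "value": value.isoformat()}
    if callable(value) and getattr(value, "__name__", "") == "<lambda>":
        # strict mode: lambdas cannot be serialized safely
        return Strategy.REJECT, None
    return Strategy.PASS, value
```

The tagged `{"__type__": ...}` form lets the worker side reverse each CONVERT deterministically before invoking the task.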
Add `resources`, `resource_health`, and `threads` columns to the workers table across all three backends (SQLite, Postgres, Redis). Workers now advertise their registered resources and health status via heartbeats.

- Move heartbeat from a Rust thread to a Python daemon thread for access to resource runtime health state
- Generate worker ID in Python, pass to Rust via a new `worker_id` parameter
- Heartbeat sends a `resource_health` JSON snapshot every 5 seconds
- `list_workers` returns the new fields in all backends
- Updated PyO3 bindings and type stubs

Tests: 9 new worker resource tests
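A heartbeat payload shaped like the one described above might look as follows. The field names (`resources`, `resource_health`, `threads`) come from the PR; the `build_heartbeat()` helper itself and the payload layout are assumptions for illustration.

```python
import json
import threading
import time

# Hypothetical helper: assemble the per-worker heartbeat row described
# in the PR. runtime_health maps resource name -> health status string.
def build_heartbeat(worker_id, runtime_health):
    return {
        "worker_id": worker_id,
        "resources": sorted(runtime_health),            # advertised resource names
        "resource_health": json.dumps(runtime_health),  # JSON snapshot per resource
        "threads": threading.active_count(),
        "ts": time.time(),
    }
```

Running the heartbeat in a Python daemon thread (rather than Rust) is what gives it access to the resource runtime's health state.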
Interception layer (Phase A):
- NamedTuple/OrderedDict auto-detection and conversion
- Lambda and tempfile rejection in strict mode
- Missing REDIRECT/REJECT/PROXY entries in the built-in registry
- Strategy count and max depth tracking in WalkResult

Proxy security (Phase B):
- HMAC-SHA256 recipe signing and verification
- Schema validation with FieldSpec constraints
- Path allowlist for the file handler with traversal prevention
- NoProxy wrapper to bypass interception
- Reconstruction timeout via ThreadPoolExecutor

Cloud handlers (Phase C):
- boto3 Client/Resource handler (credentials excluded from recipe)
- GCS Client/Bucket/Blob handler (ambient ADC credentials)

Resource scopes (Phase D):
- WORKER/TASK/THREAD/REQUEST scope support
- ResourcePool with semaphore-based capacity management
- ThreadLocalStore for thread-scoped resources
- FrozenResource proxy preventing mutation

Additional modules (Phases E-I):
- `Inject["name"]` annotation syntax via metaclass
- TOML config loading for resource definitions
- Scope-aware ResourceRuntime with hot-reload support
- InterceptionMetrics and ProxyMetrics with thread-safe tracking
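The Phase B recipe-signing step can be sketched with the standard library alone. This is a minimal, hedged version: the canonical encoding, key management, and recipe schema here are assumptions, not taskito's actual implementation.

```python
import hashlib
import hmac
import json

# Minimal sketch of HMAC-SHA256 recipe signing (Phase B). A proxy
# "recipe" is the serializable description used to reconstruct a
# non-serializable object on the worker side.
def sign_recipe(recipe: dict, key: bytes) -> str:
    # Canonical JSON so the same recipe always produces the same digest.
    payload = json.dumps(recipe, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def verify_recipe(recipe: dict, signature: str, key: bytes) -> bool:
    # Constant-time comparison prevents timing side channels.
    return hmac.compare_digest(sign_recipe(recipe, key), signature)
```

Verification on the worker side rejects any recipe tampered with in transit, which is why it pairs naturally with the path allowlist and schema validation listed above.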
App integration:
- `task()` decorator detects `Inject["name"]` annotations via `get_type_hints()`
- `_wrap_task()` performs scope-aware injection with release callbacks
- `run_worker()` initializes and tears down resources; SIGHUP triggers hot reload
- New methods: `load_resources()`, `proxy_stats()`, `interception_stats()`, `register_type()`, and `worker_resource()` with pool/scope params
- `enqueue()` skips interception in test mode

Testing:
- MockResource class with `return_value`, `wraps`, and call tracking
- TestMode gains a `resources` param and sets the `_test_mode_active` flag
- Proxy passthrough in test mode

Observability:
- Prometheus gauges for proxy reconstruction, interception strategy, and pool size/active/idle/timeouts
- Dashboard `/api/proxy-stats` and `/api/interception-stats` endpoints
- CLI `reload` subcommand (sends SIGHUP)
- Exports: `Inject`, `NoProxy`, `MockResource`
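The `Inject["name"]` detection path can be approximated with `typing.Annotated` metadata. The PR says taskito implements the marker via a metaclass; the `Annotated`-based `Inject` and the `find_injections()` helper below are a hypothetical stand-in that exercises the same `get_type_hints()` detection the decorator relies on.

```python
from typing import Annotated, get_args, get_origin, get_type_hints

# Hypothetical marker: Inject["db"] produces an Annotated hint carrying
# the resource name, which the task() decorator can discover later.
class _InjectMarker:
    def __init__(self, name: str):
        self.name = name

class Inject:
    def __class_getitem__(cls, name: str):
        return Annotated[object, _InjectMarker(name)]

def find_injections(fn):
    """Map parameter name -> resource name for Inject-annotated params."""
    out = {}
    for param, hint in get_type_hints(fn, include_extras=True).items():
        if get_origin(hint) is Annotated:
            for meta in get_args(hint)[1:]:
                if isinstance(meta, _InjectMarker):
                    out[param] = meta.name
    return out
```

At call time, a wrapper in the spirit of `_wrap_task()` would acquire each mapped resource from the runtime, pass it as the parameter, and fire a release callback when the task finishes.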
Covers all 13 phases (A-M) of the resource system:
- NamedTuple/OrderedDict conversion, lambda/tempfile rejection
- HMAC signing, schema validation, NoProxy wrapper
- Cloud handler detection (boto3/GCS type checks)
- Resource scopes (pool, thread-local, frozen)
- Inject annotation detection and merging
- TOML config loading
- Proxy and interception metrics
- MockResource test doubles
- Test-mode proxy passthrough
- End-to-end injection in test mode
Add `aws = ["boto3>=1.20"]` and `gcs = ["google-cloud-storage>=2.0"]` optional dependency groups for cloud proxy handlers.
Library consumers should resolve their own dependency versions.
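In `pyproject.toml` terms, the groups described above would sit under `[project.optional-dependencies]`; the version bounds are taken verbatim from the PR, the surrounding table layout is the standard form:

```toml
# Optional extras for the cloud proxy handlers; install with
# `pip install taskito[aws]` or `pip install taskito[gcs]`.
[project.optional-dependencies]
aws = ["boto3>=1.20"]
gcs = ["google-cloud-storage>=2.0"]
```

Loose lower bounds (rather than pins) match the stated policy of letting library consumers resolve their own dependency versions.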
…mplates

- Extract `build_scaler_response()` from dashboard.py with fixed worker utilization (now uses live worker count), liveWorkers/totalCapacity/targetQueueDepth fields, and per-queue metric namespacing.
- Add standalone `taskito scaler` CLI subcommand on port 9091 with /api/scaler, /metrics, and /health endpoints — a lightweight server for a KEDA external scaler or HTTP trigger adapter.
- Add a `queue` label to the `taskito_worker_utilization` Prometheus gauge for per-queue KEDA Prometheus scaler support.
- Add deploy/keda/ with ScaledObject (HTTP), ScaledObject (Prometheus), and ScaledJob YAML templates.
- 17 new tests covering the scaler endpoint contract.
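A response matching the field names above could be assembled like this. The PR does not show `build_scaler_response()`'s actual signature, so the inputs, the `desiredReplicas` field, and the per-queue shape here are assumptions for illustration.

```python
# Hedged sketch of the /api/scaler response contract. Inputs:
#   queue_depths: mapping of queue name -> pending job count
#   live_workers / per_worker_capacity: current worker fleet state
def build_scaler_response(queue_depths, live_workers, per_worker_capacity,
                          target_queue_depth=10):
    total_capacity = live_workers * per_worker_capacity
    pending = sum(queue_depths.values())
    return {
        "liveWorkers": live_workers,
        "totalCapacity": total_capacity,
        "targetQueueDepth": target_queue_depth,
        # ceil(pending / target) without importing math
        "desiredReplicas": -(-pending // target_queue_depth),
        # per-queue namespacing so KEDA can scale queues independently
        "queues": {name: {"depth": depth} for name, depth in queue_depths.items()},
    }
```

A KEDA HTTP-trigger style adapter polls this endpoint and scales toward `desiredReplicas`, while the Prometheus scaler path uses the `queue`-labeled `taskito_worker_utilization` gauge instead.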
Break the monolithic 1,226-line dashboard.html into 8 focused files:
- dashboard.html (40 lines) — HTML shell with placeholders
- dashboard.css (368 lines) — all styles
- js/utils.js — fmtTime, escHtml, escAttr
- js/components.js — statsHTML, jobTableHTML, badgeHTML, progressHTML
- js/views.js — all page renderers (overview, jobs, metrics, etc.)
- js/chart.js — throughput canvas chart + DAG SVG rendering
- js/actions.js — cancelJob, replayJob, retryDead, pagination
- js/app.js — state, API helpers, router, refresh, boot

`_load_spa_html()` composes them into a single HTML response at startup (cached), maintaining the zero-dependency serving approach.
New `taskito-async` crate with dual-dispatch worker pool: async tasks run natively on a dedicated Python event loop, sync tasks use spawn_blocking. Feature-gated via `native-async` cargo feature. Extract all async code into `py_src/taskito/async_support/` package: AsyncQueueMixin, AsyncDistributedLock, AsyncJobResultMixin, and centralized `run_maybe_async()` helper. Remove dead `loop` parameters and `run_coroutine_threadsafe` branches from all resource files.
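The `run_maybe_async()` helper centralizes the sync/async split on the Python side. The version below is a simplified sketch: it uses `asyncio.run()` for clarity, whereas the actual worker runs coroutines on a dedicated, long-lived event loop (and dispatches sync tasks to `spawn_blocking` on the Rust side).

```python
import asyncio
import inspect

# Simplified dual-dispatch helper: await coroutine functions, call
# plain functions directly. The real worker reuses one dedicated loop
# instead of creating a fresh one per task as asyncio.run() does.
def run_maybe_async(fn, *args, **kwargs):
    if inspect.iscoroutinefunction(fn):
        return asyncio.run(fn(*args, **kwargs))
    return fn(*args, **kwargs)
```

Routing on `iscoroutinefunction` is what lets one task API serve both kinds of handlers, and removing the old `loop` parameters and `run_coroutine_threadsafe` branches follows from having a single dispatch point.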
Python 3.12 + PyO3 can SIGABRT during interpreter cleanup after all tests pass. Capture results via JUnit XML and treat exit code 134 as success when zero test failures are recorded.
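The guard described above can be expressed as a small post-test check. The `ci_outcome()` helper is hypothetical; the rule it encodes comes straight from the PR: exit code 134 (SIGABRT) is accepted only when the JUnit report records zero failures and errors.

```python
import xml.etree.ElementTree as ET

# Sketch of the CI teardown guard: trust the JUnit XML over the exit
# code when Python 3.12 + PyO3 aborts during interpreter cleanup.
def ci_outcome(exit_code: int, junit_xml: str) -> bool:
    root = ET.fromstring(junit_xml)
    # Element.iter() includes the root itself, so this handles both a
    # bare <testsuite> root and a <testsuites> wrapper.
    bad = sum(
        int(suite.get("failures", 0)) + int(suite.get("errors", 0))
        for suite in root.iter("testsuite")
    )
    if exit_code == 134:  # 128 + SIGABRT(6): late abort after tests finished
        return bad == 0
    return exit_code == 0 and bad == 0
```

Any other nonzero exit code still fails the job, so the carve-out stays narrow.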
Python 3.9 reached EOL. Native async support uses asyncio primitives that require 3.10+. Updated pyproject.toml, CI matrix, and publish workflow.
Summary
`taskito-async` Rust crate with dual-dispatch worker pool — async tasks run natively on a dedicated Python event loop, sync tasks use `spawn_blocking`. Full async/sync separation with all async code in the `async_support/` package.