diff --git a/Cargo.lock b/Cargo.lock index 69b6ab3..d2d7879 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -676,6 +676,7 @@ version = "0.1.0" dependencies = [ "bcrypt", "chrono", + "clap", "color-eyre", "dotenvy", "fs_extra", @@ -686,6 +687,7 @@ dependencies = [ "poem-openapi", "pretty_assertions", "rand 0.9.2", + "reqwest", "rust-embed", "scraper", "serde", @@ -917,9 +919,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasi 0.14.3+wasi-0.2.4", + "wasm-bindgen", ] [[package]] @@ -1107,6 +1111,24 @@ dependencies = [ "pin-utils", "smallvec", "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots 1.0.2", ] [[package]] @@ -1115,13 +1137,22 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" dependencies = [ + "base64", "bytes 1.10.1", + "futures-channel", "futures-core", + "futures-util", "http", "http-body", "hyper", + "ipnet", + "libc", + "percent-encoding", "pin-project-lite", + "socket2", "tokio", + "tower-service", + "tracing", ] [[package]] @@ -1297,6 +1328,22 @@ dependencies = [ "libc", ] +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + +[[package]] +name = "iri-string" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" 
+dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1425,6 +1472,12 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "mac" version = "0.1.1" @@ -2056,6 +2109,61 @@ dependencies = [ "serde", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes 1.10.1", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +dependencies = [ + "bytes 1.10.1", + "getrandom 0.3.3", + "lru-slab", + "rand 0.9.2", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.40" @@ -2168,6 +2276,44 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" +[[package]] +name = "reqwest" +version = "0.12.28" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes 1.10.1", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 1.0.2", +] + [[package]] name = "rfc7239" version = "0.1.3" @@ -2251,6 +2397,12 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustc-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" + [[package]] name = "rustix" version = "1.0.8" @@ -2284,6 +2436,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ + "web-time", "zeroize", ] @@ -2462,6 +2615,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -2987,6 +3150,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "slab", "socket2", "tokio-macros", @@ -3004,6 +3168,16 @@ 
dependencies = [ "syn 2.0.106", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.17" @@ -3046,6 +3220,51 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags", + "bytes 1.10.1", + "futures-util", + "http", + "http-body", + "iri-string", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.41" @@ -3114,6 +3333,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.18.0" @@ -3279,6 +3504,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -3326,6 +3560,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.100" @@ -3358,6 +3605,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web_atoms" version = "0.1.3" diff --git a/Cargo.toml b/Cargo.toml index b99ae90..4253c04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ license = "AGPL-3.0" [dependencies] bcrypt = "0.17.1" +clap = { version = "4.5.38", features = ["derive"] } chrono = "0.4.41" color-eyre = "0.6.5" dotenvy = "0.15.7" @@ -15,13 +16,14 @@ petgraph = "0.8.2" poem = { version = "3.1.12", features = ["embed"] } poem-openapi = { version = "5.1.16", features = ["chrono", "swagger-ui", "uuid"] } rand = "0.9.2" +reqwest = { version = "0.12.15", default-features = false, features = ["json", "rustls-tls"] } rust-embed = "8.7.2" serde = "1.0.219" serde_json = "1.0.143" sqlx = { version = "0.8.6", features = ["chrono", "postgres", "runtime-tokio", "tls-rustls", 
"uuid"] } strum = { version = "0.27.2", features = ["derive"] } thiserror = "2.0.16" -tokio = { version = "1.47.1", features = ["rt-multi-thread"] } +tokio = { version = "1.47.1", features = ["io-util", "macros", "process", "rt-multi-thread"] } tracing = "0.1.41" tracing-subscriber = "0.3.20" uuid = { version = "1.18.0", features = ["v4"] } diff --git a/build.rs b/build.rs index f5ea8da..fbc3533 100644 --- a/build.rs +++ b/build.rs @@ -53,6 +53,19 @@ fn main() { ) .expect("Failed to copy viz-global.js from node_modules"); + // Populate Assets directory with Nice-DAG core + fs::create_dir_all("assets/nice-dag").expect("Failed to create assets/nice-dag directory"); + fs::copy( + "node_modules/@ebay/nice-dag-core/lib/index.umd.cjs", + "assets/nice-dag/nice-dag-core.js", + ) + .expect("Failed to copy nice-dag-core.js from node_modules"); + + // Populate Assets directory with Fletcher JS helpers + fs::create_dir_all("assets/js").expect("Failed to create assets/js directory"); + fs::copy("scripts/plan-dag.js", "assets/js/plan-dag.js") + .expect("Failed to copy plan-dag.js into assets"); + // Populate Assets directory with Prism.js fs::create_dir_all("assets/prism").expect("Failed to create assets/prism directory"); fs::copy("node_modules/prismjs/prism.js", "assets/prism/prism.js") diff --git a/migrations/20260410170000_execution_log.down.sql b/migrations/20260410170000_execution_log.down.sql new file mode 100644 index 0000000..4dc0049 --- /dev/null +++ b/migrations/20260410170000_execution_log.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS execution_log_dataset_product_run_idx; +DROP TABLE IF EXISTS execution_log; diff --git a/migrations/20260410170000_execution_log.up.sql b/migrations/20260410170000_execution_log.up.sql new file mode 100644 index 0000000..674ff2d --- /dev/null +++ b/migrations/20260410170000_execution_log.up.sql @@ -0,0 +1,14 @@ +CREATE TABLE execution_log ( + log_id BIGSERIAL PRIMARY KEY, + dataset_id UUID NOT NULL, + data_product_id UUID NOT NULL, + 
run_id UUID NOT NULL, + stream TEXT NOT NULL, + message TEXT NOT NULL, + created_by TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + FOREIGN KEY(dataset_id, data_product_id) REFERENCES data_product(dataset_id, data_product_id) +); + +CREATE INDEX execution_log_dataset_product_run_idx + ON execution_log (dataset_id, data_product_id, run_id, log_id); diff --git a/migrations/20260410190000_plan_runs.down.sql b/migrations/20260410190000_plan_runs.down.sql new file mode 100644 index 0000000..4225f09 --- /dev/null +++ b/migrations/20260410190000_plan_runs.down.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS plan_run_dependency; +DROP TABLE IF EXISTS plan_run_data_product; +DROP INDEX IF EXISTS plan_run_dataset_created_idx; +DROP TABLE IF EXISTS plan_run; diff --git a/migrations/20260410190000_plan_runs.up.sql b/migrations/20260410190000_plan_runs.up.sql new file mode 100644 index 0000000..71523b4 --- /dev/null +++ b/migrations/20260410190000_plan_runs.up.sql @@ -0,0 +1,43 @@ +CREATE TABLE plan_run ( + plan_run_id UUID PRIMARY KEY, + dataset_id UUID NOT NULL, + status state NOT NULL, + created_by TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + modified_by TEXT NOT NULL, + modified_date TIMESTAMPTZ NOT NULL, + FOREIGN KEY(dataset_id) REFERENCES dataset(dataset_id) +); + +CREATE INDEX plan_run_dataset_created_idx + ON plan_run(dataset_id, created_at DESC); + +CREATE TABLE plan_run_data_product ( + plan_run_id UUID NOT NULL, + dataset_id UUID NOT NULL, + data_product_id UUID NOT NULL, + compute compute NOT NULL, + name TEXT NOT NULL, + version TEXT NOT NULL, + eager BOOL NOT NULL, + state state NOT NULL, + step_run_id UUID, + link TEXT, + passback JSONB, + modified_by TEXT NOT NULL, + modified_date TIMESTAMPTZ NOT NULL, + PRIMARY KEY(plan_run_id, data_product_id), + FOREIGN KEY(plan_run_id) REFERENCES plan_run(plan_run_id) ON DELETE CASCADE +); + +CREATE INDEX plan_run_data_product_step_run_idx + ON plan_run_data_product(step_run_id); + +CREATE TABLE plan_run_dependency ( + plan_run_id UUID NOT NULL, + dataset_id 
UUID NOT NULL, + parent_id UUID NOT NULL, + child_id UUID NOT NULL, + PRIMARY KEY(plan_run_id, parent_id, child_id), + FOREIGN KEY(plan_run_id) REFERENCES plan_run(plan_run_id) ON DELETE CASCADE +); diff --git a/package-lock.json b/package-lock.json index 3052528..a804680 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,6 +5,7 @@ "packages": { "": { "dependencies": { + "@ebay/nice-dag-core": "^1.0.41", "@tailwindcss/cli": "^4.1.11", "@viz-js/viz": "^3.16.0", "daisyui": "^5.0.50", @@ -31,6 +32,17 @@ "node": ">=6.0.0" } }, + "node_modules/@ebay/nice-dag-core": { + "version": "1.0.41", + "resolved": "https://registry.npmjs.org/@ebay/nice-dag-core/-/nice-dag-core-1.0.41.tgz", + "integrity": "sha512-l1cAgBuvI8nFPxssgjEkGq4l6ss4kysJcHcwN+uA4l/x95Mco2JQHt6oQ7hNd+5q1toVGrM5Ib32QfiPnEs04g==", + "license": "MIT", + "dependencies": { + "@types/dagre": "^0.7.46", + "dagre": "^0.8.5", + "html2canvas": "^1.4.1" + } + }, "node_modules/@isaacs/fs-minipass": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@isaacs/fs-minipass/-/fs-minipass-4.0.1.tgz", @@ -699,6 +711,12 @@ "node": ">=8" } }, + "node_modules/@types/dagre": { + "version": "0.7.54", + "resolved": "https://registry.npmjs.org/@types/dagre/-/dagre-0.7.54.tgz", + "integrity": "sha512-QjcRY+adGbYvBFS7cwv5txhVIwX1XXIUswWl+kSQTbI6NjgZydrZkEKX/etzVd7i+bCsCb40Z/xlBY5eoFuvWQ==", + "license": "MIT" + }, "node_modules/@types/debug": { "version": "4.1.12", "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", @@ -743,6 +761,15 @@ "dev": true, "license": "Python-2.0" }, + "node_modules/base64-arraybuffer": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-1.0.2.tgz", + "integrity": "sha512-I3yl4r9QB5ZRY3XuJVEPfc2XhZO6YweFPI+UovAzn+8/hb3oJ6lnysaFcjVpkCPfVWFUDvoZ8kmVDP7WyRtYtQ==", + "license": "MIT", + "engines": { + "node": ">= 0.6.0" + } + }, "node_modules/braces": { "version": "3.0.3", "resolved": 
"https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", @@ -807,6 +834,25 @@ "node": ">= 12" } }, + "node_modules/css-line-break": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/css-line-break/-/css-line-break-2.1.0.tgz", + "integrity": "sha512-FHcKFCZcAha3LwfVBhCQbW2nCNbkZXn7KVUJcsT5/P8YmfsVja0FMPJr0B903j/E69HUphKiV9iQArX8SDYA4w==", + "license": "MIT", + "dependencies": { + "utrie": "^1.0.2" + } + }, + "node_modules/dagre": { + "version": "0.8.5", + "resolved": "https://registry.npmjs.org/dagre/-/dagre-0.8.5.tgz", + "integrity": "sha512-/aTqmnRta7x7MCCpExk7HQL2O4owCT2h8NT//9I1OQ9vt29Pa0BzSAkR5lwFUcQ7491yVi/3CXU9jQ5o0Mn2Sw==", + "license": "MIT", + "dependencies": { + "graphlib": "^2.1.8", + "lodash": "^4.17.15" + } + }, "node_modules/daisyui": { "version": "5.0.50", "resolved": "https://registry.npmjs.org/daisyui/-/daisyui-5.0.50.tgz", @@ -989,6 +1035,28 @@ "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==", "license": "ISC" }, + "node_modules/graphlib": { + "version": "2.1.8", + "resolved": "https://registry.npmjs.org/graphlib/-/graphlib-2.1.8.tgz", + "integrity": "sha512-jcLLfkpoVGmH7/InMC/1hIvOPSUh38oJtGhvrOFGzioE1DZ+0YW16RgmOJhHiuWTvGiJQ9Z1Ik43JvkRPRvE+A==", + "license": "MIT", + "dependencies": { + "lodash": "^4.17.15" + } + }, + "node_modules/html2canvas": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/html2canvas/-/html2canvas-1.4.1.tgz", + "integrity": "sha512-fPU6BHNpsyIhr8yyMpTLLxAbkaK8ArIBcmZIRiBLiDhjeqvXolaEmDGmELFuX9I4xDcaKKcJl+TKZLqruBbmWA==", + "license": "MIT", + "dependencies": { + "css-line-break": "^2.1.0", + "text-segmentation": "^1.0.3" + }, + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/htmx.org": { "version": "2.0.6", "resolved": "https://registry.npmjs.org/htmx.org/-/htmx.org-2.0.6.tgz", @@ -1376,6 +1444,12 @@ "uc.micro": "^2.0.0" } }, + "node_modules/lodash": { + "version": "4.18.1", + "resolved": 
"https://registry.npmjs.org/lodash/-/lodash-4.18.1.tgz", + "integrity": "sha512-dMInicTPVE8d1e5otfwmmjlxkZoUpiVLwyeTdUsi/Caj/gfzzblBcCE5sRHV/AsjuCmxWrte2TNGSYuCeCq+0Q==", + "license": "MIT" + }, "node_modules/magic-string": { "version": "0.30.17", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.17.tgz", @@ -2283,6 +2357,15 @@ "node": ">=18" } }, + "node_modules/text-segmentation": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/text-segmentation/-/text-segmentation-1.0.3.tgz", + "integrity": "sha512-iOiPUo/BGnZ6+54OsWxZidGCsdU8YbE4PSpdPinp7DeMtUJNJBoJ/ouUSTJjHkh1KntHaltHl/gDs2FC4i5+Nw==", + "license": "MIT", + "dependencies": { + "utrie": "^1.0.2" + } + }, "node_modules/to-regex-range": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/to-regex-range/-/to-regex-range-5.0.1.tgz", @@ -2315,6 +2398,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/utrie": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/utrie/-/utrie-1.0.2.tgz", + "integrity": "sha512-1MLa5ouZiOmQzUbjbu9VmjLzn1QLXBhwpUa7kdLUQK+KQ5KA9I1vk5U4YHe/X2Ch7PYnJfWuWT+VbuxbGwljhw==", + "license": "MIT", + "dependencies": { + "base64-arraybuffer": "^1.0.2" + } + }, "node_modules/yallist": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/yallist/-/yallist-5.0.0.tgz", diff --git a/package.json b/package.json index 1b4441c..a526da4 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "markdownlint-cli2": "^0.18.1" }, "dependencies": { + "@ebay/nice-dag-core": "^1.0.41", "@tailwindcss/cli": "^4.1.11", "@viz-js/viz": "^3.16.0", "daisyui": "^5.0.50", diff --git a/scripts/plan-dag.js b/scripts/plan-dag.js new file mode 100644 index 0000000..752b1f6 --- /dev/null +++ b/scripts/plan-dag.js @@ -0,0 +1,456 @@ +"use strict"; + +(function () { + const instances = new Map(); + const MIN_SCALE = 0.5; + const MAX_SCALE = 1.6; + + function getNodeRecord(widgetId) { + return instances.get(widgetId) || null; + } 
+ + function destroyRecord(record) { + if (!record) return; + if (record.resizeObserver && typeof record.resizeObserver.disconnect === "function") { + record.resizeObserver.disconnect(); + } + if (record.dag && typeof record.dag.destory === "function") { + record.dag.destory(); + } + } + + function clampScale(scale) { + return Math.max(MIN_SCALE, Math.min(MAX_SCALE, scale)); + } + + function updateZoomLabel(record) { + if (!record || !record.widget) return; + const label = record.widget.querySelector("[data-role='zoom-label']"); + if (label) { + label.textContent = `${Math.round((record.scale || 1) * 100)}%`; + } + } + + function centerDag(record) { + if (!record || !record.dag || !record.canvas) return; + record.dag.center({ + width: Math.max(record.canvas.clientWidth || 0, 320), + height: Math.max(record.canvas.clientHeight || 0, 320), + }); + } + + function setDagScale(record, nextScale) { + if (!record || !record.dag) return; + record.scale = clampScale(nextScale); + record.dag.setScale(record.scale); + centerDag(record); + updateZoomLabel(record); + } + + function sizeCanvas(canvas, nodes) { + if (!canvas) return 320; + const nodeHeight = Math.max(320, (Array.isArray(nodes) ? 
nodes.length : 0) * 110); + const viewportHeight = Math.max( + 320, + window.innerHeight - canvas.getBoundingClientRect().top - 40, + ); + const targetHeight = Math.max(320, Math.min(nodeHeight, viewportHeight)); + canvas.style.height = `${targetHeight}px`; + return targetHeight; + } + + function fitDag(record) { + if (!record || !record.dag || !record.canvas) return; + const content = record.dag.getRootContentElement(); + if (!content) return; + const currentScale = record.scale || 1; + const measuredWidth = + Math.max(content.scrollWidth || 0, content.getBoundingClientRect().width || 0, 1); + const measuredHeight = + Math.max(content.scrollHeight || 0, content.getBoundingClientRect().height || 0, 1); + const naturalWidth = measuredWidth / currentScale; + const naturalHeight = measuredHeight / currentScale; + const availableWidth = Math.max(record.canvas.clientWidth - 32, 240); + const availableHeight = Math.max(record.canvas.clientHeight - 32, 240); + const nextScale = clampScale( + Math.min(availableWidth / naturalWidth, availableHeight / naturalHeight, 1), + ); + setDagScale(record, nextScale); + } + + function stateColors(state) { + switch (state) { + case "success": + return ["#22c55e", "rgba(34, 197, 94, 0.12)", "#166534", "#dcfce7"]; + case "running": + return ["#38bdf8", "rgba(56, 189, 248, 0.12)", "#075985", "#e0f2fe"]; + case "failed": + return ["#ef4444", "rgba(239, 68, 68, 0.12)", "#991b1b", "#fee2e2"]; + case "queued": + return ["#6b7280", "rgba(107, 114, 128, 0.12)", "#374151", "#f3f4f6"]; + case "defined": + return ["#d97706", "rgba(245, 158, 11, 0.12)", "#92400e", "#fef3c7"]; + case "waiting": + default: + return ["#9ca3af", "rgba(156, 163, 175, 0.12)", "#4b5563", "#f3f4f6"]; + } + } + + function applyNodeElementState(wrapper, node, selectedNodeId) { + const data = node.data || {}; + const [border, background, badgeColor, badgeBackground] = stateColors(data.state); + const isSelected = selectedNodeId && node.id === selectedNodeId; + 
wrapper.dataset.fletcherNodeId = node.id; + wrapper.style.border = `2px solid ${border}`; + wrapper.style.background = background; + wrapper.style.boxShadow = isSelected + ? "0 0 0 3px rgba(251, 191, 36, 0.45), 0 10px 24px rgba(15, 23, 42, 0.16)" + : "0 6px 18px rgba(15, 23, 42, 0.08)"; + const title = wrapper.querySelector("[data-role='title']"); + const version = wrapper.querySelector("[data-role='version']"); + const badge = wrapper.querySelector("[data-role='badge']"); + const compute = wrapper.querySelector("[data-role='compute']"); + const run = wrapper.querySelector("[data-role='run']"); + if (title) title.textContent = data.name || node.id; + if (version) version.textContent = data.version || ""; + if (compute) compute.textContent = `Compute: ${data.compute || "-"}`; + if (badge) { + badge.textContent = data.state || "unknown"; + badge.style.background = badgeBackground; + badge.style.color = badgeColor; + } + if (run) { + if (data.run_id) { + run.textContent = `Run: ${data.run_id.slice(0, 8)}…`; + run.style.display = "block"; + } else { + run.textContent = ""; + run.style.display = "none"; + } + } + } + + function createNodeElement(node, selectedNodeId, selectionInfo) { + const data = node.data || {}; + + const wrapper = document.createElement("button"); + wrapper.type = "button"; + wrapper.style.width = "220px"; + wrapper.style.minHeight = "88px"; + wrapper.style.boxSizing = "border-box"; + wrapper.style.borderRadius = "12px"; + wrapper.style.padding = "12px"; + wrapper.style.display = "flex"; + wrapper.style.flexDirection = "column"; + wrapper.style.gap = "8px"; + wrapper.style.fontFamily = + "ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif"; + wrapper.style.textAlign = "left"; + wrapper.style.cursor = selectionInfo ? 
"pointer" : "default"; + + const top = document.createElement("div"); + top.style.display = "flex"; + top.style.justifyContent = "space-between"; + top.style.gap = "8px"; + top.style.alignItems = "flex-start"; + + const titleBlock = document.createElement("div"); + + const title = document.createElement("div"); + title.dataset.role = "title"; + title.style.fontWeight = "700"; + title.style.color = "#111827"; + title.style.lineHeight = "1.2"; + title.textContent = data.name || node.id; + + const version = document.createElement("div"); + version.dataset.role = "version"; + version.style.fontSize = "12px"; + version.style.opacity = "0.7"; + version.textContent = data.version || ""; + + titleBlock.appendChild(title); + titleBlock.appendChild(version); + + const badge = document.createElement("span"); + badge.dataset.role = "badge"; + badge.textContent = data.state || "unknown"; + badge.style.alignSelf = "flex-start"; + badge.style.padding = "2px 8px"; + badge.style.borderRadius = "9999px"; + badge.style.fontSize = "12px"; + badge.style.fontWeight = "700"; + badge.style.textTransform = "uppercase"; + + top.appendChild(titleBlock); + top.appendChild(badge); + + const meta = document.createElement("div"); + meta.style.fontSize = "12px"; + meta.style.color = "#4b5563"; + meta.style.display = "flex"; + meta.style.flexDirection = "column"; + meta.style.gap = "4px"; + + const compute = document.createElement("div"); + compute.dataset.role = "compute"; + compute.textContent = `Compute: ${data.compute || "-"}`; + meta.appendChild(compute); + + const run = document.createElement("div"); + run.dataset.role = "run"; + meta.appendChild(run); + + if (selectionInfo) { + wrapper.addEventListener("click", function () { + window.dispatchEvent( + new CustomEvent("fletcher:dag-step-selected", { + detail: { + datasetId: selectionInfo.datasetId, + planRunId: selectionInfo.planRunId, + dataProductId: node.id, + }, + }), + ); + }); + } else { + wrapper.disabled = true; + } + + 
wrapper.appendChild(top); + wrapper.appendChild(meta); + applyNodeElementState(wrapper, node, selectedNodeId); + return wrapper; + } + + function updatePlanDagSelection(widgetId, selectedNodeId) { + const record = getNodeRecord(widgetId); + const widget = document.getElementById(widgetId); + if (!record || !widget) return; + record.selectedNodeId = selectedNodeId || ""; + record.nodes.forEach(function (node, nodeId) { + const element = widget.querySelector(`[data-fletcher-node-id="${nodeId}"]`); + if (element) applyNodeElementState(element, node, record.selectedNodeId); + }); + } + + function updatePlanDagWidgetNodes(widgetId, nodes) { + const record = getNodeRecord(widgetId); + const widget = document.getElementById(widgetId); + if (!record || !widget || !Array.isArray(nodes)) return; + record.nodes = new Map(nodes.map(function (node) { + return [node.id, node]; + })); + sizeCanvas(record.canvas, nodes); + nodes.forEach(function (node) { + const element = widget.querySelector(`[data-fletcher-node-id="${node.id}"]`); + if (element) applyNodeElementState(element, node, record.selectedNodeId); + }); + } + + function installPanBehavior(record) { + const canvas = record.canvas; + if (!canvas || canvas.dataset.fletcherPanBound === "true") return; + canvas.dataset.fletcherPanBound = "true"; + + let dragging = false; + let startX = 0; + let startY = 0; + let scrollLeft = 0; + let scrollTop = 0; + + canvas.addEventListener("mousedown", function (event) { + if (event.button !== 0) return; + if (event.target && event.target.closest("[data-fletcher-node-id]")) return; + dragging = true; + startX = event.clientX; + startY = event.clientY; + scrollLeft = canvas.scrollLeft; + scrollTop = canvas.scrollTop; + canvas.style.cursor = "grabbing"; + event.preventDefault(); + }); + + window.addEventListener("mousemove", function (event) { + if (!dragging) return; + canvas.scrollLeft = scrollLeft - (event.clientX - startX); + canvas.scrollTop = scrollTop - (event.clientY - startY); + }); 
+ + window.addEventListener("mouseup", function () { + dragging = false; + canvas.style.cursor = "grab"; + }); + } + + function installControlBehavior(record) { + const widget = record.widget; + if (!widget || widget.dataset.fletcherControlsBound === "true") return; + widget.dataset.fletcherControlsBound = "true"; + + widget.addEventListener("click", function (event) { + const action = event.target && event.target.dataset + ? event.target.dataset.planAction + : null; + if (!action) return; + if (action === "zoom-in") setDagScale(record, (record.scale || 1) + 0.1); + if (action === "zoom-out") setDagScale(record, (record.scale || 1) - 0.1); + if (action === "fit") fitDag(record); + }); + + record.canvas.addEventListener("wheel", function (event) { + if (!event.ctrlKey && !event.metaKey) return; + event.preventDefault(); + const delta = event.deltaY < 0 ? 0.1 : -0.1; + setDagScale(record, (record.scale || 1) + delta); + }, { passive: false }); + } + + function installResizeBehavior(record) { + if (!record || !record.widget || record.widget.dataset.fletcherResizeBound === "true") return; + record.widget.dataset.fletcherResizeBound = "true"; + + const resizeHandler = function () { + sizeCanvas(record.canvas, Array.from(record.nodes.values())); + fitDag(record); + }; + + if (window.ResizeObserver) { + record.resizeObserver = new ResizeObserver(resizeHandler); + record.resizeObserver.observe(record.widget); + } + window.addEventListener("resize", resizeHandler); + } + + function renderDagWidget(widget) { + if (!widget || !window.niceDag) return; + + const canvas = widget.querySelector(".fletcher-dag-canvas"); + const dataScript = widget.querySelector(".fletcher-dag-data"); + if (!canvas || !dataScript) return; + + const widgetId = widget.id || canvas.id || Math.random().toString(16).slice(2); + const existing = instances.get(widgetId); + if (existing && existing.widget === widget) { + return; + } + if (existing) { + destroyRecord(existing); + instances.delete(widgetId); 
+ } + + const nodes = JSON.parse(dataScript.textContent || "[]"); + const selectedNodeId = widget.dataset.selectedNodeId || ""; + const selectionInfo = + widget.dataset.datasetId && widget.dataset.planRunId + ? { + datasetId: widget.dataset.datasetId, + planRunId: widget.dataset.planRunId, + } + : null; + + canvas.innerHTML = ""; + canvas.style.position = "relative"; + canvas.style.width = "100%"; + canvas.style.cursor = "grab"; + sizeCanvas(canvas, nodes); + + if (!Array.isArray(nodes) || nodes.length === 0) { + const empty = document.createElement("div"); + empty.textContent = "No plan steps to display."; + empty.style.padding = "1rem"; + empty.style.borderRadius = "12px"; + empty.style.background = "rgba(148, 163, 184, 0.12)"; + empty.style.color = "#475569"; + canvas.appendChild(empty); + return; + } + + try { + const dag = window.niceDag.init({ + container: canvas, + mode: "DEFAULT", + omitJointBeforeEnd: true, + graphLabel: { + rankdir: "LR", + nodesep: 40, + ranksep: 70, + }, + rootViewPadding: { + top: 20, + right: 24, + bottom: 20, + left: 24, + }, + getNodeSize: function getNodeSize() { + return { width: 220, height: 88 }; + }, + getEdgeAttributes: function getEdgeAttributes() { + return { color: "#94a3b8" }; + }, + mapNodeToElement: function mapNodeToElement(node) { + return createNodeElement(node, selectedNodeId, selectionInfo); + }, + }); + + dag.withNodes(nodes).render(); + const record = { + widget: widget, + canvas: canvas, + dag: dag, + nodes: new Map(nodes.map(function (node) { + return [node.id, node]; + })), + selectedNodeId: selectedNodeId, + scale: 1, + }; + instances.set(widgetId, record); + installPanBehavior(record); + installControlBehavior(record); + installResizeBehavior(record); + requestAnimationFrame(function () { + fitDag(record); + }); + } catch (error) { + console.error("Failed to render Fletcher DAG widget", error); + const failure = document.createElement("div"); + failure.textContent = "Failed to render plan."; + 
failure.style.padding = "1rem"; + failure.style.borderRadius = "12px"; + failure.style.background = "rgba(239, 68, 68, 0.12)"; + failure.style.color = "#991b1b"; + canvas.appendChild(failure); + } + } + + function initPlanDagWidgets(root) { + const scope = root && root.querySelectorAll ? root : document; + const widgets = []; + if (scope.matches && scope.matches(".fletcher-dag-widget")) { + widgets.push(scope); + } + scope.querySelectorAll && + scope + .querySelectorAll(".fletcher-dag-widget") + .forEach(function (widget) { + widgets.push(widget); + }); + widgets.forEach(renderDagWidget); + } + + window.initPlanDagWidgets = initPlanDagWidgets; + window.updatePlanDagSelection = updatePlanDagSelection; + window.updatePlanDagWidgetNodes = updatePlanDagWidgetNodes; + + document.addEventListener("DOMContentLoaded", function () { + initPlanDagWidgets(document); + }); + + window.addEventListener("load", function () { + initPlanDagWidgets(document); + }); + + document.body.addEventListener("htmx:afterSwap", function (event) { + initPlanDagWidgets(event.target); + }); +})(); diff --git a/src/api.rs b/src/api.rs index 9c47bf2..b1c4181 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,11 +2,15 @@ use crate::{ Config, auth::{Authenticated, JwtAuth, RemoteLogin, Role, authenticate}, core::{ - clear_edit, data_product_read, disable_drop, plan_add, plan_pause_edit, plan_read, - plan_search_read, states_edit, + clear_edit_with_config, data_product_read, disable_drop, execution_log_add, + plan_add_with_config, plan_pause_edit_with_config, plan_read, plan_search_read, + states_edit_with_config, }, error::into_poem_error, - model::{DataProduct, DataProductId, DatasetId, Plan, PlanParam, SearchReturn, StateParam}, + model::{ + DataProduct, DataProductId, DatasetId, ExecutionLog, ExecutionLogParam, Plan, PlanParam, + SearchReturn, StateParam, + }, }; use poem::{error::InternalServerError, web::Data}; use poem_openapi::{ @@ -47,6 +51,7 @@ impl Api { async fn plan_post( &self, auth: 
JwtAuth, + Data(config): Data<&Config>, Data(pool): Data<&PgPool>, Json(plan): Json, ) -> poem::Result> { @@ -56,7 +61,7 @@ impl Api { let mut tx = pool.begin().await.map_err(InternalServerError)?; // Add the plan to the DB - let plan: Plan = plan_add(&mut tx, &plan, auth.service()) + let plan: Plan = plan_add_with_config(&mut tx, config, &plan, auth.service()) .await .map_err(into_poem_error)?; @@ -114,6 +119,7 @@ impl Api { async fn plan_pause_put( &self, auth: JwtAuth, + Data(config): Data<&Config>, Data(pool): Data<&PgPool>, Path(dataset_id): Path, ) -> poem::Result> { @@ -123,9 +129,10 @@ impl Api { let mut tx = pool.begin().await.map_err(InternalServerError)?; // Pause a Plan - let plan: Plan = plan_pause_edit(&mut tx, dataset_id, true, auth.service()) - .await - .map_err(into_poem_error)?; + let plan: Plan = + plan_pause_edit_with_config(&mut tx, config, dataset_id, true, auth.service()) + .await + .map_err(into_poem_error)?; // Commit transaction tx.commit().await.map_err(InternalServerError)?; @@ -138,6 +145,7 @@ impl Api { async fn plan_unpause_put( &self, auth: JwtAuth, + Data(config): Data<&Config>, Data(pool): Data<&PgPool>, Path(dataset_id): Path, ) -> poem::Result> { @@ -147,9 +155,10 @@ impl Api { let mut tx = pool.begin().await.map_err(InternalServerError)?; // Unpause a Plan - let plan: Plan = plan_pause_edit(&mut tx, dataset_id, false, auth.service()) - .await - .map_err(into_poem_error)?; + let plan: Plan = + plan_pause_edit_with_config(&mut tx, config, dataset_id, false, auth.service()) + .await + .map_err(into_poem_error)?; // Commit transaction tx.commit().await.map_err(InternalServerError)?; @@ -184,6 +193,7 @@ impl Api { async fn state_put( &self, auth: JwtAuth, + Data(config): Data<&Config>, Data(pool): Data<&PgPool>, Path(dataset_id): Path, Json(states): Json>, @@ -194,9 +204,10 @@ impl Api { let mut tx = pool.begin().await.map_err(InternalServerError)?; // Update data product states and return the updated plan - let plan: Plan = 
states_edit(&mut tx, dataset_id, &states, auth.service()) - .await - .map_err(into_poem_error)?; + let plan: Plan = + states_edit_with_config(&mut tx, config, dataset_id, &states, auth.service()) + .await + .map_err(into_poem_error)?; // Commit Transaction tx.commit().await.map_err(InternalServerError)?; @@ -209,6 +220,7 @@ impl Api { async fn clear_put( &self, auth: JwtAuth, + Data(config): Data<&Config>, Data(pool): Data<&PgPool>, Path(dataset_id): Path, Json(data_product_ids): Json>, @@ -219,9 +231,15 @@ impl Api { let mut tx = pool.begin().await.map_err(InternalServerError)?; // Clear Data Products and clear all downsteam data products. - let plan: Plan = clear_edit(&mut tx, dataset_id, &data_product_ids, auth.service()) - .await - .map_err(into_poem_error)?; + let plan: Plan = clear_edit_with_config( + &mut tx, + config, + dataset_id, + &data_product_ids, + auth.service(), + ) + .await + .map_err(into_poem_error)?; // Commit Transaction tx.commit().await.map_err(InternalServerError)?; @@ -253,6 +271,27 @@ impl Api { Ok(Json(plan)) } + + /// Append a worker log entry for a data product + #[oai(path = "/data_product/:dataset_id/:data_product_id/log", method = "post", tag = Tag::DataProduct)] + async fn data_product_log_post( + &self, + auth: JwtAuth, + Data(pool): Data<&PgPool>, + Path(dataset_id): Path, + Path(data_product_id): Path, + Json(log): Json, + ) -> poem::Result> { + auth.check_role(Role::Update).map_err(into_poem_error)?; + + let mut tx = pool.begin().await.map_err(InternalServerError)?; + let log = execution_log_add(&mut tx, dataset_id, data_product_id, &log, auth.service()) + .await + .map_err(into_poem_error)?; + tx.commit().await.map_err(InternalServerError)?; + + Ok(Json(log)) + } } #[cfg(test)] @@ -346,7 +385,7 @@ mod tests { .get("test") .assert_string("passthrough1"); dp1.get("state").assert_string("queued"); - dp1.get("run_id").assert_null(); + assert!(!dp1.get("run_id").string().is_empty()); dp1.get("link").assert_null(); 
dp1.get("passback").assert_null(); dp1.get("extra") @@ -667,7 +706,7 @@ mod tests { .get("test") .assert_string("passthrough1"); json_value.object().get("state").assert_string("queued"); - json_value.object().get("run_id").assert_null(); + assert!(!json_value.object().get("run_id").string().is_empty()); json_value.object().get("link").assert_null(); json_value.object().get("passback").assert_null(); json_value @@ -798,17 +837,9 @@ mod tests { .unwrap(); dp2.get("id").assert_string(&dp2_id.to_string()); dp2.get("state").assert_string("queued"); - dp2.get("run_id") - .assert_string("12345678-1234-1234-1234-123456789def"); + assert!(!dp2.get("run_id").string().is_empty()); dp2.get("link").assert_string("https://example.com/run-456"); - dp2.get("passback") - .object() - .get("status") - .assert_string("finished"); - dp2.get("passback") - .object() - .get("result") - .assert_string("failed"); + dp2.get("passback").assert_null(); } /// Test State Put - Dataset Not Found Case diff --git a/src/core.rs b/src/core.rs index ee35a85..0699551 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,17 +1,27 @@ use crate::{ + Config, dag::Dag, - db::search_plans_select, + db::{ + dataset_lock, execution_log_insert, execution_logs_by_run_data_product_select, + plan_run_active_by_dataset_select, plan_run_by_step_run_select, + plan_run_data_product_upsert, plan_run_data_products_by_run_select, + plan_run_dependencies_by_run_select, plan_run_dependency_insert, plan_run_insert, + plan_run_select, plan_run_state_counts_select, plan_run_status_update, + plan_runs_by_dataset_select, search_plans_select, + }, error::{Error, Result}, model::{ - DataProduct, DataProductId, DatasetId, Edge, Plan, PlanParam, SearchReturn, SearchRow, - State, StateParam, + DataProduct, DataProductId, DatasetId, Edge, ExecutionLog, ExecutionLogParam, Plan, + PlanParam, PlanRun, RunPlan, SearchReturn, SearchRow, State, StateParam, }, + worker::extract_worker_spec, }; use chrono::{DateTime, Utc}; use 
petgraph::graph::DiGraph; use sqlx::{Postgres, Transaction}; -use std::collections::HashSet; -use tracing::warn; +use std::{collections::HashSet, env::current_exe, process::Command}; +use tracing::{info, warn}; +use uuid::Uuid; /// How much do we want to paginate by const PAGE_SIZE: u32 = 50; @@ -81,10 +91,22 @@ fn validate_plan_param(param: &PlanParam, plan: &Option) -> Result<()> { } /// Add a Plan Dag to the DB +#[cfg_attr(not(test), allow(dead_code))] pub async fn plan_add( tx: &mut Transaction<'_, Postgres>, param: &PlanParam, username: &str, +) -> Result { + let config = crate::load_config()?; + plan_add_with_config(tx, &config, param, username).await +} + +/// Add a Plan Dag to the DB using an explicit runtime config +pub async fn plan_add_with_config( + tx: &mut Transaction<'_, Postgres>, + config: &Config, + param: &PlanParam, + username: &str, ) -> Result { // Pull any prior details let wip_plan = Plan::from_db(tx, param.dataset.id).await; @@ -105,7 +127,7 @@ pub async fn plan_add( let mut plan: Plan = param.upsert(tx, username, modified_date).await?; // Triger the next batch of data products - trigger_next_batch(tx, &mut plan, username, modified_date).await?; + trigger_next_batch_with_config(tx, config, &mut plan, username, modified_date).await?; Ok(plan) } @@ -116,11 +138,24 @@ pub async fn plan_read(tx: &mut Transaction<'_, Postgres>, id: DatasetId) -> Res } /// Set the pause state of a plan +#[cfg_attr(not(test), allow(dead_code))] pub async fn plan_pause_edit( tx: &mut Transaction<'_, Postgres>, id: DatasetId, paused: bool, username: &str, +) -> Result { + let config = crate::load_config()?; + plan_pause_edit_with_config(tx, &config, id, paused, username).await +} + +/// Set the pause state of a plan using an explicit runtime config +pub async fn plan_pause_edit_with_config( + tx: &mut Transaction<'_, Postgres>, + config: &Config, + id: DatasetId, + paused: bool, + username: &str, ) -> Result { // Pull the current plan from the DB let mut plan = 
Plan::from_db(tx, id).await?; @@ -133,7 +168,7 @@ pub async fn plan_pause_edit( // If unpaused, lets run the next batch if !paused { - trigger_next_batch(tx, &mut plan, username, modified_date).await?; + trigger_next_batch_with_config(tx, config, &mut plan, username, modified_date).await?; } Ok(plan) @@ -174,6 +209,25 @@ async fn state_update( .state_update(tx, dataset_id, state, username, modified_date) .await?; + let plan_run = if let Some(step_run_id) = state.run_id { + plan_run_by_step_run_select(tx, dataset_id, state.id, step_run_id).await? + } else { + plan_run_active_by_dataset_select(tx, dataset_id).await? + }; + + if let Some(run) = plan_run { + snapshot_plan_run_data_product( + tx, + dataset_id, + run.id, + data_product, + username, + modified_date, + ) + .await?; + refresh_plan_run_status(tx, dataset_id, run.id, username, modified_date).await?; + } + Ok(()) } @@ -214,6 +268,9 @@ async fn clear_downstream_nodes( let current_state: StateParam = data_product.into(); let new_state = StateParam { state: State::Waiting, + run_id: None, + link: None, + passback: None, ..current_state }; @@ -226,12 +283,178 @@ async fn clear_downstream_nodes( Ok(()) } +/// Append a worker log entry +pub async fn execution_log_add( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + data_product_id: DataProductId, + param: &ExecutionLogParam, + username: &str, +) -> Result { + execution_log_insert(tx, dataset_id, data_product_id, param, username, Utc::now()).await +} + +async fn snapshot_plan_run_data_product( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + data_product: &DataProduct, + username: &str, + modified_date: DateTime, +) -> Result<()> { + plan_run_data_product_upsert( + tx, + dataset_id, + plan_run_id, + data_product.id, + data_product.compute, + &data_product.name, + &data_product.version, + data_product.eager, + data_product.state, + data_product.run_id, + data_product.link.as_deref(), + 
data_product.passback.clone(), + username, + modified_date, + ) + .await?; + + Ok(()) +} + +async fn refresh_plan_run_status( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + username: &str, + modified_date: DateTime, +) -> Result { + let (failed_count, success_count, running_count, queued_count, waiting_count) = + plan_run_state_counts_select(tx, plan_run_id).await?; + let total_count = failed_count + success_count + running_count + queued_count + waiting_count; + + let status = if failed_count > 0 { + State::Failed + } else if total_count > 0 && success_count == total_count { + State::Success + } else if running_count > 0 { + State::Running + } else if queued_count > 0 || waiting_count > 0 { + State::Queued + } else { + State::Queued + }; + + plan_run_status_update(tx, dataset_id, plan_run_id, status, username, modified_date).await +} + +async fn ensure_active_plan_run( + tx: &mut Transaction<'_, Postgres>, + plan: &Plan, + username: &str, + modified_date: DateTime, +) -> Result { + if let Some(run) = plan_run_active_by_dataset_select(tx, plan.dataset.id).await? 
{ + return Ok(run); + } + + let plan_run_id = Uuid::new_v4(); + let run = plan_run_insert( + tx, + plan.dataset.id, + plan_run_id, + State::Queued, + username, + modified_date, + ) + .await?; + + for data_product in plan + .data_products + .iter() + .filter(|dp| dp.state != State::Disabled) + { + snapshot_plan_run_data_product( + tx, + plan.dataset.id, + plan_run_id, + data_product, + username, + modified_date, + ) + .await?; + } + + for dependency in &plan.dependencies { + let parent = plan.data_product(dependency.parent_id); + let child = plan.data_product(dependency.child_id); + if matches!(parent, Some(dp) if dp.state != State::Disabled) + && matches!(child, Some(dp) if dp.state != State::Disabled) + { + plan_run_dependency_insert( + tx, + plan.dataset.id, + plan_run_id, + dependency.parent_id, + dependency.child_id, + ) + .await?; + } + } + + Ok(run) +} + +pub async fn plan_runs_read( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, +) -> Result> { + plan_runs_by_dataset_select(tx, dataset_id).await +} + +pub async fn plan_run_read( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, +) -> Result { + let run = plan_run_select(tx, dataset_id, plan_run_id).await?; + let data_products = plan_run_data_products_by_run_select(tx, dataset_id, plan_run_id).await?; + let dependencies = plan_run_dependencies_by_run_select(tx, dataset_id, plan_run_id).await?; + Ok(RunPlan { + run, + data_products, + dependencies, + }) +} + +pub async fn plan_run_step_logs_read( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + data_product_id: DataProductId, +) -> Result> { + execution_logs_by_run_data_product_select(tx, dataset_id, plan_run_id, data_product_id).await +} + /// Triger the next batch of data products +#[cfg_attr(not(test), allow(dead_code))] async fn trigger_next_batch( tx: &mut Transaction<'_, Postgres>, plan: &mut Plan, username: &str, modified_date: DateTime, +) -> Result<()> { + let 
config = crate::load_config()?; + trigger_next_batch_with_config(tx, &config, plan, username, modified_date).await +} + +async fn trigger_next_batch_with_config( + tx: &mut Transaction<'_, Postgres>, + config: &Config, + plan: &mut Plan, + username: &str, + modified_date: DateTime, ) -> Result<()> { // Is the dataset paused? If so, no need to trigger the next data product. if plan.dataset.paused { @@ -254,8 +477,13 @@ async fn trigger_next_batch( }) .collect(); + if waiting_ids.is_empty() { + return Ok(()); + } + // Generate Dag representation of the plan (Disabled nodes are not part of the dag.) let dag: DiGraph = plan.to_dag()?; + let mut active_plan_run: Option = None; // Check each data product's parents to see if any of them are blocking for waiting_id in waiting_ids { @@ -271,17 +499,37 @@ async fn trigger_next_batch( // If the waiting node is ready to run, trigger it if ready { - warn!( - "This is where I would trigger the data product with the OaaS Wrapper... IF I HAD ONE!!!" - ); + let plan_run_id = if let Some(run) = &active_plan_run { + run.id + } else { + let run = ensure_active_plan_run(tx, plan, username, modified_date).await?; + let run_id = run.id; + active_plan_run = Some(run); + run_id + }; // Representation of the current data product let data_product: &mut DataProduct = plan .data_product_mut(waiting_id) .ok_or(Error::Unreachable)?; + let worker_spec = extract_worker_spec(&data_product.passthrough)?; + let run_id = Uuid::new_v4(); + // Params for the current and new state. Make our change and dump the current state to fill the rest. 
let new_state = StateParam { + run_id: Some(run_id), + link: worker_spec + .as_ref() + .map(|spec| spec.script.clone()) + .or_else(|| data_product.link.clone()), + passback: worker_spec.as_ref().map(|spec| { + serde_json::json!({ + "worker_mode": "local", + "runner": spec.runner, + "script": spec.script, + }) + }), state: State::Queued, ..data_product.into() }; @@ -290,6 +538,53 @@ async fn trigger_next_batch( data_product .state_update(tx, dataset_id, &new_state, username, modified_date) .await?; + snapshot_plan_run_data_product( + tx, + dataset_id, + plan_run_id, + data_product, + username, + modified_date, + ) + .await?; + refresh_plan_run_status(tx, dataset_id, plan_run_id, username, modified_date).await?; + + if let Some(spec) = worker_spec { + if let Err(err) = launch_local_worker(config, dataset_id, waiting_id, run_id, &spec) + { + let failed_state = StateParam { + id: waiting_id, + state: State::Failed, + run_id: Some(run_id), + link: Some(spec.script.clone()), + passback: Some(serde_json::json!({ + "worker_mode": "local", + "runner": spec.runner, + "script": spec.script, + "launch_error": err.to_string(), + })), + }; + + data_product + .state_update(tx, dataset_id, &failed_state, username, modified_date) + .await?; + snapshot_plan_run_data_product( + tx, + dataset_id, + plan_run_id, + data_product, + username, + modified_date, + ) + .await?; + refresh_plan_run_status(tx, dataset_id, plan_run_id, username, modified_date) + .await?; + } + } else { + warn!( + "Data product '{waiting_id}' reached queued state but has no local worker definition" + ); + } } } @@ -297,15 +592,30 @@ async fn trigger_next_batch( } /// Edit the state of a Data Product +#[cfg_attr(not(test), allow(dead_code))] pub async fn states_edit( tx: &mut Transaction<'_, Postgres>, id: DatasetId, states: &[StateParam], username: &str, +) -> Result { + let config = crate::load_config()?; + states_edit_with_config(tx, &config, id, states, username).await +} + +/// Edit the state of a Data Product 
using an explicit runtime config +pub async fn states_edit_with_config( + tx: &mut Transaction<'_, Postgres>, + config: &Config, + id: DatasetId, + states: &[StateParam], + username: &str, ) -> Result { // Timestamp of the transaction let modified_date: DateTime = Utc::now(); + dataset_lock(tx, id).await?; + // Check the new state to update to, and make sure is valid. for state in states { if matches!( @@ -329,21 +639,36 @@ pub async fn states_edit( clear_downstream_nodes(tx, &mut plan, &nodes, username, modified_date).await?; // Trigger the next batch of data products - trigger_next_batch(tx, &mut plan, username, modified_date).await?; + trigger_next_batch_with_config(tx, config, &mut plan, username, modified_date).await?; Ok(plan) } /// Clear Data Products so then can be rerun +#[cfg_attr(not(test), allow(dead_code))] pub async fn clear_edit( tx: &mut Transaction<'_, Postgres>, id: DatasetId, data_product_ids: &[DataProductId], username: &str, +) -> Result { + let config = crate::load_config()?; + clear_edit_with_config(tx, &config, id, data_product_ids, username).await +} + +/// Clear Data Products so then can be rerun using an explicit runtime config +pub async fn clear_edit_with_config( + tx: &mut Transaction<'_, Postgres>, + config: &Config, + id: DatasetId, + data_product_ids: &[DataProductId], + username: &str, ) -> Result { // Timestamp of the transaction let modified_date: DateTime = Utc::now(); + dataset_lock(tx, id).await?; + // Pull the Plan so we know what we are working with let mut plan = Plan::from_db(tx, id).await?; @@ -355,6 +680,9 @@ pub async fn clear_edit( // Build new cleared state let state = StateParam { state: State::Waiting, + run_id: None, + link: None, + passback: None, ..data_product.into() }; @@ -366,11 +694,72 @@ pub async fn clear_edit( clear_downstream_nodes(tx, &mut plan, data_product_ids, username, modified_date).await?; // Trigger the next batch of data products - trigger_next_batch(tx, &mut plan, username, 
modified_date).await?; + trigger_next_batch_with_config(tx, config, &mut plan, username, modified_date).await?; Ok(plan) } +fn launch_local_worker( + config: &Config, + dataset_id: DatasetId, + data_product_id: DataProductId, + run_id: Uuid, + spec: &crate::worker::LocalWorkerSpec, +) -> Result<()> { + let executable = current_exe()?; + let command_line = format!( + "{} worker --remote {} --dataset-id {} --data-product-id {} --run-id {} --runner {} --script {} --service {}", + executable.display(), + config.base_url, + dataset_id, + data_product_id, + run_id, + spec.runner.as_arg(), + spec.script, + config.worker_service, + ); + let mut command = Command::new(&executable); + command + .arg("worker") + .arg("--remote") + .arg(&config.base_url) + .arg("--dataset-id") + .arg(dataset_id.to_string()) + .arg("--data-product-id") + .arg(data_product_id.to_string()) + .arg("--run-id") + .arg(run_id.to_string()) + .arg("--runner") + .arg(spec.runner.as_arg()) + .arg("--script") + .arg(&spec.script) + .arg("--service") + .arg(&config.worker_service) + .env("FLETCHER_WORKER_KEY", &config.worker_key); + + info!( + dataset_id = %dataset_id, + data_product_id = %data_product_id, + run_id = %run_id, + runner = %spec.runner.as_arg(), + script = %spec.script, + remote = %config.base_url, + command_line = %command_line, + "launching fletcher worker" + ); + + let child = command.spawn()?; + info!( + dataset_id = %dataset_id, + data_product_id = %data_product_id, + run_id = %run_id, + pid = child.id(), + "fletcher worker launched" + ); + + Ok(()) +} + /// Mark a Data Product as Disabled pub async fn disable_drop( tx: &mut Transaction<'_, Postgres>, @@ -378,9 +767,12 @@ pub async fn disable_drop( data_product_ids: &[DataProductId], username: &str, ) -> Result { + let config = crate::load_config()?; // Timestamp of the transaction let modified_date: DateTime = Utc::now(); + dataset_lock(tx, id).await?; + // Pull the Plan so we know what we are working with let mut plan = 
Plan::from_db(tx, id).await?; @@ -399,7 +791,7 @@ pub async fn disable_drop( } // Triger the next batch of data products - trigger_next_batch(tx, &mut plan, username, modified_date).await?; + trigger_next_batch_with_config(tx, &config, &mut plan, username, modified_date).await?; Ok(plan) } @@ -1452,6 +1844,12 @@ mod tests { assert!(result.is_ok()); assert_eq!(plan.data_product(dp2_id).unwrap().state, State::Waiting); assert_eq!(plan.data_product(dp3_id).unwrap().state, State::Waiting); + assert_eq!(plan.data_product(dp2_id).unwrap().run_id, None); + assert_eq!(plan.data_product(dp2_id).unwrap().link, None); + assert_eq!(plan.data_product(dp2_id).unwrap().passback, None); + assert_eq!(plan.data_product(dp3_id).unwrap().run_id, None); + assert_eq!(plan.data_product(dp3_id).unwrap().link, None); + assert_eq!(plan.data_product(dp3_id).unwrap().passback, None); } /// Test clear_downstream_nodes skips nodes already in Waiting or Disabled state @@ -2046,6 +2444,12 @@ mod tests { updated_plan.data_product(dp3_id).unwrap().state, State::Waiting ); + assert_eq!(updated_plan.data_product(dp2_id).unwrap().run_id, None); + assert_eq!(updated_plan.data_product(dp2_id).unwrap().link, None); + assert_eq!(updated_plan.data_product(dp2_id).unwrap().passback, None); + assert_eq!(updated_plan.data_product(dp3_id).unwrap().run_id, None); + assert_eq!(updated_plan.data_product(dp3_id).unwrap().link, None); + assert_eq!(updated_plan.data_product(dp3_id).unwrap().passback, None); } /// Test clear_edit returns error for non-existent data product @@ -3203,6 +3607,123 @@ mod tests { ); } + /// Test state_update snapshots the run identified by the step run id, not just the latest active run. 
+ #[sqlx::test] + async fn test_state_update_targets_matching_plan_run(pool: PgPool) { + let mut tx = pool.begin().await.unwrap(); + let param = create_test_plan_param(); + let username = "test_user"; + let modified_date = Utc::now(); + + let mut plan = plan_add(&mut tx, ¶m, username).await.unwrap(); + let data_product_id = plan.data_products[0].id; + + let older_run_id = Uuid::new_v4(); + let newer_run_id = Uuid::new_v4(); + let older_created_at = modified_date; + let newer_created_at = modified_date + chrono::Duration::seconds(1); + let older_step_run_id = Uuid::new_v4(); + let newer_step_run_id = Uuid::new_v4(); + + plan_run_insert( + &mut tx, + plan.dataset.id, + older_run_id, + State::Queued, + username, + older_created_at, + ) + .await + .unwrap(); + plan_run_insert( + &mut tx, + plan.dataset.id, + newer_run_id, + State::Queued, + username, + newer_created_at, + ) + .await + .unwrap(); + + for data_product in &plan.data_products { + let mut older_snapshot = data_product.clone(); + if older_snapshot.id == data_product_id { + older_snapshot.run_id = Some(older_step_run_id); + } + snapshot_plan_run_data_product( + &mut tx, + plan.dataset.id, + older_run_id, + &older_snapshot, + username, + older_created_at, + ) + .await + .unwrap(); + + let mut newer_snapshot = data_product.clone(); + if newer_snapshot.id == data_product_id { + newer_snapshot.run_id = Some(newer_step_run_id); + } + snapshot_plan_run_data_product( + &mut tx, + plan.dataset.id, + newer_run_id, + &newer_snapshot, + username, + newer_created_at, + ) + .await + .unwrap(); + } + + let state_param = StateParam { + id: data_product_id, + state: State::Success, + run_id: Some(older_step_run_id), + link: Some("http://test-link.com".to_string()), + passback: None, + }; + + state_update(&mut tx, &mut plan, &state_param, username, modified_date) + .await + .unwrap(); + + let older_run = plan_run_read(&mut tx, plan.dataset.id, older_run_id) + .await + .unwrap(); + let newer_run = plan_run_read(&mut tx, 
plan.dataset.id, newer_run_id) + .await + .unwrap(); + + assert_eq!( + older_run + .data_products + .iter() + .find(|dp| dp.data_product_id == data_product_id) + .unwrap() + .state, + State::Success, + "The run matching the worker step run id should receive the success snapshot" + ); + let newer_snapshot = newer_run + .data_products + .iter() + .find(|dp| dp.data_product_id == data_product_id) + .unwrap(); + assert_eq!( + newer_snapshot.step_run_id, + Some(newer_step_run_id), + "A newer active run should keep its own step run id" + ); + assert_ne!( + newer_snapshot.state, + State::Success, + "A newer active run should not be updated by an older worker callback" + ); + } + /// Test state_update with missing data product #[sqlx::test] async fn test_state_update_missing_data_product(pool: PgPool) { diff --git a/src/db.rs b/src/db.rs index b902175..8a628b7 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,12 +2,14 @@ use crate::{ error::Result, model::{ Compute, DataProduct, DataProductId, DataProductParam, Dataset, DatasetId, DatasetParam, - Dependency, DependencyParam, SearchRow, State, StateParam, + Dependency, DependencyParam, ExecutionLog, ExecutionLogParam, PlanRun, PlanRunDataProduct, + PlanRunDependency, SearchRow, State, StateParam, }, }; use chrono::{DateTime, Utc}; use serde_json::Value; -use sqlx::{Postgres, Transaction, query_as}; +use sqlx::{Postgres, Transaction, query, query_as}; +use uuid::Uuid; /// Insert up Update a Dataset pub async fn dataset_upsert( @@ -79,6 +81,24 @@ pub async fn dataset_select( Ok(dataset) } +/// Lock a dataset row for the duration of the current transaction +pub async fn dataset_lock(tx: &mut Transaction<'_, Postgres>, dataset_id: DatasetId) -> Result<()> { + query!( + r#"SELECT + dataset_id + FROM + dataset + WHERE + dataset_id = $1 + FOR UPDATE"#, + dataset_id, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(()) +} + /// Pause / Un-pause a dataset pub async fn dataset_pause_update( tx: &mut Transaction<'_, Postgres>, @@ -319,6 
+339,577 @@ pub async fn data_products_by_dataset_select( Ok(data_products) } +/// Append a log entry for a running worker +pub async fn execution_log_insert( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + data_product_id: DataProductId, + param: &ExecutionLogParam, + username: &str, + created_at: DateTime, +) -> Result { + let log = query_as!( + ExecutionLog, + r#"INSERT INTO execution_log ( + dataset_id, + data_product_id, + run_id, + stream, + message, + created_by, + created_at + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7 + ) RETURNING + log_id AS id, + data_product_id, + run_id, + stream, + message, + created_by, + created_at"#, + dataset_id, + data_product_id, + param.run_id, + param.stream, + param.message, + username, + created_at, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(log) +} + +/// Retrieve logs for the latest run of each data product in a dataset +#[cfg_attr(not(test), allow(dead_code))] +pub async fn execution_logs_by_dataset_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, +) -> Result> { + let logs = query_as!( + ExecutionLog, + r#"SELECT + log.log_id AS id, + log.data_product_id, + log.run_id, + log.stream, + log.message, + log.created_by, + log.created_at + FROM + execution_log log + JOIN + data_product dp + ON + log.dataset_id = dp.dataset_id + AND log.data_product_id = dp.data_product_id + AND log.run_id = dp.run_id + WHERE + log.dataset_id = $1 + ORDER BY + log.created_at, + log.log_id"#, + dataset_id, + ) + .fetch_all(&mut **tx) + .await?; + + Ok(logs) +} + +/// Insert a new plan execution run +pub async fn plan_run_insert( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + status: State, + username: &str, + created_at: DateTime, +) -> Result { + let run = query_as!( + PlanRun, + r#"INSERT INTO plan_run ( + plan_run_id, + dataset_id, + status, + created_by, + created_at, + modified_by, + modified_date + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $4, + $5 
+ ) RETURNING + plan_run_id AS id, + dataset_id, + status AS "status: State", + created_by, + created_at, + modified_by, + modified_date"#, + plan_run_id, + dataset_id, + status as State, + username, + created_at, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(run) +} + +/// Select the most recent active plan run for a dataset +pub async fn plan_run_active_by_dataset_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, +) -> Result> { + let run = query_as!( + PlanRun, + r#"SELECT + plan_run_id AS id, + dataset_id, + status AS "status: State", + created_by, + created_at, + modified_by, + modified_date + FROM + plan_run + WHERE + dataset_id = $1 + AND status IN ('queued'::state, 'running'::state) + ORDER BY + created_at DESC, + plan_run_id DESC + LIMIT 1"#, + dataset_id, + ) + .fetch_optional(&mut **tx) + .await?; + + Ok(run) +} + +/// Select the plan run owning a specific worker step run +pub async fn plan_run_by_step_run_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + data_product_id: DataProductId, + step_run_id: Uuid, +) -> Result> { + let run = query_as!( + PlanRun, + r#"SELECT + run.plan_run_id AS id, + run.dataset_id, + run.status AS "status: State", + run.created_by, + run.created_at, + run.modified_by, + run.modified_date + FROM + plan_run run + JOIN + plan_run_data_product run_dp + ON + run.plan_run_id = run_dp.plan_run_id + AND run.dataset_id = run_dp.dataset_id + WHERE + run.dataset_id = $1 + AND run_dp.data_product_id = $2 + AND run_dp.step_run_id = $3 + ORDER BY + run.created_at DESC, + run.plan_run_id DESC + LIMIT 1"#, + dataset_id, + data_product_id, + step_run_id, + ) + .fetch_optional(&mut **tx) + .await?; + + Ok(run) +} + +/// Select all plan runs for a dataset +pub async fn plan_runs_by_dataset_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, +) -> Result> { + let runs = query_as!( + PlanRun, + r#"SELECT + plan_run_id AS id, + dataset_id, + status AS "status: State", + created_by, + 
created_at, + modified_by, + modified_date + FROM + plan_run + WHERE + dataset_id = $1 + ORDER BY + created_at DESC, + plan_run_id DESC"#, + dataset_id, + ) + .fetch_all(&mut **tx) + .await?; + + Ok(runs) +} + +/// Select a plan run by id +pub async fn plan_run_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, +) -> Result { + let run = query_as!( + PlanRun, + r#"SELECT + plan_run_id AS id, + dataset_id, + status AS "status: State", + created_by, + created_at, + modified_by, + modified_date + FROM + plan_run + WHERE + dataset_id = $1 + AND plan_run_id = $2"#, + dataset_id, + plan_run_id, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(run) +} + +/// Update a plan run's status +pub async fn plan_run_status_update( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + status: State, + username: &str, + modified_date: DateTime, +) -> Result { + let run = query_as!( + PlanRun, + r#"UPDATE + plan_run + SET + status = $3, + modified_by = $4, + modified_date = $5 + WHERE + dataset_id = $1 + AND plan_run_id = $2 + RETURNING + plan_run_id AS id, + dataset_id, + status AS "status: State", + created_by, + created_at, + modified_by, + modified_date"#, + dataset_id, + plan_run_id, + status as State, + username, + modified_date, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(run) +} + +/// Upsert a plan run's data product snapshot +#[allow(clippy::too_many_arguments)] +pub async fn plan_run_data_product_upsert( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + data_product_id: DataProductId, + compute: Compute, + name: &str, + version: &str, + eager: bool, + state: State, + step_run_id: Option, + link: Option<&str>, + passback: Option, + username: &str, + modified_date: DateTime, +) -> Result { + let row = query_as!( + PlanRunDataProduct, + r#"INSERT INTO plan_run_data_product ( + plan_run_id, + dataset_id, + data_product_id, + compute, + name, + version, + eager, + state, 
+ step_run_id, + link, + passback, + modified_by, + modified_date + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13 + ) ON CONFLICT (plan_run_id, data_product_id) DO + UPDATE SET + compute = $4, + name = $5, + version = $6, + eager = $7, + state = $8, + step_run_id = $9, + link = $10, + passback = $11, + modified_by = $12, + modified_date = $13 + RETURNING + data_product_id, + compute AS "compute: Compute", + name, + version, + eager, + state AS "state: State", + step_run_id, + link, + passback, + modified_by, + modified_date"#, + plan_run_id, + dataset_id, + data_product_id, + compute as Compute, + name, + version, + eager, + state as State, + step_run_id, + link, + passback, + username, + modified_date, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(row) +} + +/// Insert a plan run dependency snapshot +pub async fn plan_run_dependency_insert( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + parent_id: DataProductId, + child_id: DataProductId, +) -> Result { + let row = query_as!( + PlanRunDependency, + r#"INSERT INTO plan_run_dependency ( + plan_run_id, + dataset_id, + parent_id, + child_id + ) VALUES ( + $1, + $2, + $3, + $4 + ) ON CONFLICT (plan_run_id, parent_id, child_id) DO NOTHING + RETURNING + parent_id, + child_id"#, + plan_run_id, + dataset_id, + parent_id, + child_id, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(row) +} + +/// Select all run data product snapshots for a plan run +pub async fn plan_run_data_products_by_run_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, +) -> Result> { + let rows = query_as!( + PlanRunDataProduct, + r#"SELECT + data_product_id, + compute AS "compute: Compute", + name, + version, + eager, + state AS "state: State", + step_run_id, + link, + passback, + modified_by, + modified_date + FROM + plan_run_data_product + WHERE + dataset_id = $1 + AND plan_run_id = $2 + ORDER BY + modified_date, + 
data_product_id"#, + dataset_id, + plan_run_id, + ) + .fetch_all(&mut **tx) + .await?; + + Ok(rows) +} + +/// Select all run dependency snapshots for a plan run +pub async fn plan_run_dependencies_by_run_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, +) -> Result> { + let rows = query_as!( + PlanRunDependency, + r#"SELECT + parent_id, + child_id + FROM + plan_run_dependency + WHERE + dataset_id = $1 + AND plan_run_id = $2 + ORDER BY + parent_id, + child_id"#, + dataset_id, + plan_run_id, + ) + .fetch_all(&mut **tx) + .await?; + + Ok(rows) +} + +/// Select logs for a step within a historical plan run +pub async fn execution_logs_by_run_data_product_select( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + data_product_id: DataProductId, +) -> Result> { + let logs = query_as!( + ExecutionLog, + r#"SELECT + log.log_id AS id, + log.data_product_id, + log.run_id, + log.stream, + log.message, + log.created_by, + log.created_at + FROM + execution_log log + JOIN + plan_run_data_product run_dp + ON + log.run_id = run_dp.step_run_id + AND log.data_product_id = run_dp.data_product_id + WHERE + log.dataset_id = $1 + AND run_dp.plan_run_id = $2 + AND run_dp.dataset_id = $1 + AND run_dp.data_product_id = $3 + ORDER BY + log.created_at, + log.log_id"#, + dataset_id, + plan_run_id, + data_product_id, + ) + .fetch_all(&mut **tx) + .await?; + + Ok(logs) +} + +/// Count run step states for a historical plan run +pub async fn plan_run_state_counts_select( + tx: &mut Transaction<'_, Postgres>, + plan_run_id: Uuid, +) -> Result<(i64, i64, i64, i64, i64)> { + let row = query!( + r#"SELECT + COUNT(*) FILTER (WHERE state = 'failed'::state) AS "failed_count!", + COUNT(*) FILTER (WHERE state = 'success'::state) AS "success_count!", + COUNT(*) FILTER (WHERE state = 'running'::state) AS "running_count!", + COUNT(*) FILTER (WHERE state = 'queued'::state) AS "queued_count!", + COUNT(*) FILTER (WHERE state = 
'waiting'::state) AS "waiting_count!" + FROM + plan_run_data_product + WHERE + plan_run_id = $1"#, + plan_run_id, + ) + .fetch_one(&mut **tx) + .await?; + + Ok(( + row.failed_count, + row.success_count, + row.running_count, + row.queued_count, + row.waiting_count, + )) +} + /// Upsert a new Dependency between Data Products pub async fn dependency_upsert( tx: &mut Transaction<'_, Postgres>, diff --git a/src/error.rs b/src/error.rs index fb7c63b..5643160 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,7 +8,10 @@ use petgraph::graph::GraphError; use poem::error::{ BadRequest, Forbidden, InternalServerError, NotFound, Unauthorized, UnprocessableEntity, }; -use std::num::ParseIntError; +use reqwest::Error as ReqwestError; +use std::{io::Error as IoError, num::ParseIntError}; +use tokio::task::JoinError; +use uuid::Error as UuidError; /// Crate-wide result alias. pub type Result = std::result::Result; @@ -60,6 +63,14 @@ pub enum Error { #[error(transparent)] Jwt(#[from] JwtError), + /// Error from std::io + #[error(transparent)] + Io(#[from] IoError), + + /// Error from joining async tasks + #[error(transparent)] + Join(#[from] JoinError), + /// Data Product not found #[error("Data product not found for: '{0}'")] Missing(DataProductId), @@ -72,6 +83,10 @@ pub enum Error { #[error(transparent)] ParseInt(#[from] ParseIntError), + /// Error from reqwest + #[error(transparent)] + Reqwest(#[from] ReqwestError), + /// Missing the needed role access #[error("Service account '{0}' is missing the following role: '{1}'")] Role(String, Role), @@ -84,9 +99,17 @@ pub enum Error { #[error(transparent)] Sqlx(#[from] sqlx::Error), + /// Error from UUID parsing + #[error(transparent)] + Uuid(#[from] UuidError), + /// This error should never be reachable #[error("This error should not have been reachable")] Unreachable, + + /// Worker exited with non-zero status + #[error("Worker exited with non-zero exit code: {0}")] + WorkerExit(i32), } impl Error { @@ -103,10 +126,13 @@ impl Error { 
Error::Graph(err) => InternalServerError(err), Error::InvalidKey => Unauthorized(self), Error::InvalidService(_) => Unauthorized(self), + Error::Io(err) => InternalServerError(err), + Error::Join(err) => InternalServerError(err), Error::Jwt(err) => Forbidden(err), Error::Missing(_) => NotFound(self), Error::Pause(_, _) => BadRequest(self), Error::ParseInt(err) => InternalServerError(err), + Error::Reqwest(err) => InternalServerError(err), Error::Role(_, _) => Forbidden(self), Error::SerdeJson(err) => InternalServerError(err), Error::Sqlx(sqlx::Error::RowNotFound) => NotFound(sqlx::Error::RowNotFound), @@ -115,6 +141,8 @@ impl Error { } Error::Sqlx(err) => InternalServerError(err), Error::Unreachable => InternalServerError(self), + Error::Uuid(err) => BadRequest(err), + Error::WorkerExit(_) => InternalServerError(self), } } } diff --git a/src/main.rs b/src/main.rs index 17153f9..d176c74 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,13 +6,16 @@ mod db; mod error; mod model; mod ui; +mod worker; use crate::{ api::Api, auth::RemoteAuth, error::Result, ui::{not_found_404, user_service}, + worker::{Cli, CliCommand, run_worker}, }; +use clap::Parser; use jsonwebtoken::{DecodingKey, EncodingKey}; use poem::{ EndpointExt, Route, Server, endpoint::EmbeddedFilesEndpoint, listener::TcpListener, @@ -25,12 +28,14 @@ use sqlx::{PgPool, migrate, postgres::PgPoolOptions}; /// Struct we will put all our configs to run Fletcher into #[derive(Clone)] pub struct Config { - base_url: String, - database_url: String, - decoding_key: DecodingKey, - encoding_key: EncodingKey, - max_connections: u32, - remote_auths: Vec, + pub(crate) base_url: String, + pub(crate) database_url: String, + pub(crate) decoding_key: DecodingKey, + pub(crate) encoding_key: EncodingKey, + pub(crate) max_connections: u32, + pub(crate) remote_auths: Vec, + pub(crate) worker_service: String, + pub(crate) worker_key: String, } /// Static files hosted via webserver @@ -46,6 +51,17 @@ async fn main() -> 
color_eyre::Result<()> { // Enable Poem's logging tracing_subscriber::fmt::init(); + let cli = Cli::parse(); + + match cli.command { + Some(CliCommand::Worker(args)) => run_worker(args).await?, + None => run_server().await?, + } + + Ok(()) +} + +async fn run_server() -> color_eyre::Result<()> { // Read in configs let config: Config = load_config()?; @@ -109,6 +125,8 @@ pub fn load_config() -> Result { encoding_key: EncodingKey::from_secret(secret_key_bytes), max_connections, remote_auths: serde_json::from_str(&dotenvy::var("REMOTE_APIS")?)?, + worker_service: dotenvy::var("WORKER_SERVICE").unwrap_or("local".to_string()), + worker_key: dotenvy::var("WORKER_KEY").unwrap_or("abc123".to_string()), }) } @@ -128,7 +146,7 @@ mod tests { let config = result.unwrap(); assert_eq!( config.database_url, - "postgres://fletcher_user:password@localhost/fletcher_db" + "postgres://fletcher_user:password@127.0.0.1:5432/fletcher_db" ); assert_eq!(config.max_connections, 10); assert_eq!(config.remote_auths.len(), 2); @@ -214,6 +232,20 @@ mod tests { "viz/viz-global.js should be present in embedded assets", ); + // Check that nice-dag-core.js exists in the embedded assets + let nice_dag_file = Assets::get("nice-dag/nice-dag-core.js"); + assert!( + nice_dag_file.is_some(), + "nice-dag/nice-dag-core.js should be present in embedded assets", + ); + + // Check that plan-dag.js exists in the embedded assets + let plan_dag_file = Assets::get("js/plan-dag.js"); + assert!( + plan_dag_file.is_some(), + "js/plan-dag.js should be present in embedded assets", + ); + // Check that prism.js exists in the embedded assets let prism_js_file = Assets::get("prism/prism.js"); assert!( diff --git a/src/model.rs b/src/model.rs index 724a540..603863e 100644 --- a/src/model.rs +++ b/src/model.rs @@ -207,6 +207,75 @@ impl DataProduct { } } +/// Persisted worker log entry for a data product run +#[derive(Clone, Debug, Object, PartialEq)] +pub struct ExecutionLog { + pub id: i64, + pub data_product_id: 
DataProductId, + pub run_id: Uuid, + pub stream: String, + pub message: String, + pub created_by: String, + pub created_at: DateTime, +} + +/// Input for appending worker logs +#[derive(Clone, Object)] +pub struct ExecutionLogParam { + pub run_id: Uuid, + pub stream: String, + pub message: String, +} + +/// Historical execution run for a plan +#[derive(Clone, Debug, Object, PartialEq)] +pub struct PlanRun { + pub id: Uuid, + pub dataset_id: DatasetId, + pub status: State, + pub created_by: String, + pub created_at: DateTime, + pub modified_by: String, + pub modified_date: DateTime, +} + +impl PlanRun { + pub fn is_active(&self) -> bool { + matches!(self.status, State::Queued | State::Running) + } +} + +/// Snapshot of a data product inside a historical plan run +#[derive(Clone, Debug, Object, PartialEq)] +pub struct PlanRunDataProduct { + pub data_product_id: DataProductId, + pub compute: Compute, + pub name: String, + pub version: String, + pub eager: bool, + pub state: State, + pub step_run_id: Option, + pub link: Option, + pub passback: Option, + pub modified_by: String, + pub modified_date: DateTime, +} + +/// Snapshot of a dependency inside a historical plan run +#[derive(Clone, Debug, Object, PartialEq)] +pub struct PlanRunDependency { + pub parent_id: DataProductId, + pub child_id: DataProductId, +} + +/// Historical run view of a plan +#[derive(Clone, Debug, Object, PartialEq)] +pub struct RunPlan { + pub run: PlanRun, + pub data_products: Vec, + pub dependencies: Vec, +} + /// Dependency from one Data Product to another Data Product #[derive(Clone, Debug, Object, PartialEq)] pub struct Dependency { diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 92a11cb..adb9f6a 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -4,9 +4,12 @@ mod layout; mod page; mod snippet; -use crate::ui::page::{index_page, plan_page}; +use crate::ui::page::{ + index_page, plan_page, plan_run_graph_get, plan_run_logs_get, plan_run_nodes_get, + plan_run_page, trigger_plan_post, +}; pub 
use crate::ui::{component::plan_search_get, error::not_found_404}; -use poem::{Route, get}; +use poem::{Route, get, post}; /// Router for UI pub fn user_service() -> Route { @@ -14,4 +17,18 @@ pub fn user_service() -> Route { .at("/", get(index_page)) .at("/component/plan_search", get(plan_search_get)) .at("/plan/:dataset_id", get(plan_page)) + .at("/plan/:dataset_id/run/:plan_run_id", get(plan_run_page)) + .at( + "/plan/:dataset_id/run/:plan_run_id/nodes", + get(plan_run_nodes_get), + ) + .at( + "/plan/:dataset_id/run/:plan_run_id/graph", + get(plan_run_graph_get), + ) + .at( + "/plan/:dataset_id/run/:plan_run_id/logs", + get(plan_run_logs_get), + ) + .at("/plan/:dataset_id/trigger", post(trigger_plan_post)) } diff --git a/src/ui/page.rs b/src/ui/page.rs index 1c26e2b..2599877 100644 --- a/src/ui/page.rs +++ b/src/ui/page.rs @@ -1,23 +1,66 @@ use crate::{ - core::plan_read, - model::{DataProduct, DataProductId, DatasetId, Plan, State}, + Config, + core::{ + clear_edit_with_config, plan_read, plan_run_read, plan_run_step_logs_read, plan_runs_read, + }, + model::{DataProduct, DataProductId, DatasetId, ExecutionLog, Plan, PlanRun, RunPlan, State}, ui::{component::plan_search_component, layout::base_layout}, }; use maud::{Markup, PreEscaped, html}; -use petgraph::{ - dot::{Config, Dot, RankDir}, - graph::DiGraph, -}; use poem::{ Result, error::{InternalServerError, NotFoundError}, handler, http::StatusCode, - web::{Data, Path}, + web::{Data, Json, Path, Query, Redirect}, }; -use poem_openapi::types::ToJSON; -use serde_json::to_string_pretty; +use serde::{Deserialize, Serialize}; use sqlx::{PgPool, Postgres, Transaction}; +use std::collections::{HashMap, HashSet}; +use uuid::Uuid; + +struct PlanPageData { + plan: Plan, + runs: Vec, + dag_nodes_json: String, +} + +struct RunPageData { + plan: Plan, + runs: Vec, + run_plan: RunPlan, + dag_nodes_json: String, + selected_step_id: Option, + selected_step_logs: Vec, +} + +#[derive(Serialize)] +struct RunDagUpdatePayload { + 
nodes: Vec, + active: bool, +} + +#[derive(Default, Deserialize)] +struct RunStepQuery { + step: Option, +} + +#[derive(Serialize)] +struct NiceDagNode { + id: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + dependencies: Vec, + data: NiceDagNodeData, +} + +#[derive(Serialize)] +struct NiceDagNodeData { + name: String, + version: String, + state: String, + compute: String, + run_id: Option, +} /// Index Page #[handler] @@ -64,6 +107,7 @@ pub async fn index_page(Data(pool): Data<&PgPool>) -> Result { } /// Format nodes in the GraphViz format we want for the graph we will show in the UI +#[cfg_attr(not(test), allow(dead_code))] fn format_node(id: DataProductId, plan: &Plan) -> String { let dp: &DataProduct = match plan.data_product(id) { Some(dp) => dp, @@ -88,6 +132,7 @@ fn format_node(id: DataProductId, plan: &Plan) -> String { } /// Render a GraphViz DOT notation to HTML format +#[cfg_attr(not(test), allow(dead_code))] fn render_dot(dot: &str) -> Markup { // Escape backticks to prevent breaking the JavaScript template literal let escaped_dot = dot.replace('`', "\\`"); @@ -107,188 +152,629 @@ fn render_dot(dot: &str) -> Markup { } } -/// Plan Page -#[handler] -pub async fn plan_page( - Data(pool): Data<&PgPool>, - Path(dataset_id): Path, -) -> Result { - // Convert path dataset_id to UUID - let dataset_id = DatasetId::try_parse(&dataset_id).map_err(|_| NotFoundError)?; +async fn load_plan_page_data( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, +) -> Result { + let plan: Plan = plan_read(tx, dataset_id).await.map_err(|err| { + let error: poem::Error = err.into_poem_error(); + match error.status() { + StatusCode::NOT_FOUND => NotFoundError.into(), + _ => error, + } + })?; - // Start Transaction - let mut tx: Transaction<'_, Postgres> = pool.begin().await.map_err(InternalServerError)?; + let runs = plan_runs_read(tx, dataset_id) + .await + .map_err(InternalServerError)?; + let dag_nodes_json = build_plan_dag_nodes_json(&plan, 
Some("defined".to_string())) + .map_err(InternalServerError)?; + + Ok(PlanPageData { + plan, + runs, + dag_nodes_json, + }) +} - // Pull the plan from the DB - let plan: Plan = plan_read(&mut tx, dataset_id).await.map_err(|err| { +async fn load_run_page_data( + tx: &mut Transaction<'_, Postgres>, + dataset_id: DatasetId, + plan_run_id: Uuid, + selected_step_id: Option, +) -> Result { + let plan: Plan = plan_read(tx, dataset_id).await.map_err(|err| { let error: poem::Error = err.into_poem_error(); match error.status() { - // Map poem's "NotFound" states to "NotFoundError" so the 404 page comes up StatusCode::NOT_FOUND => NotFoundError.into(), _ => error, } })?; + let runs = plan_runs_read(tx, dataset_id) + .await + .map_err(InternalServerError)?; + let run_plan = plan_run_read(tx, dataset_id, plan_run_id) + .await + .map_err(|err| { + let error: poem::Error = err.into_poem_error(); + match error.status() { + StatusCode::NOT_FOUND => NotFoundError.into(), + _ => error, + } + })?; + let dag_nodes_json = build_run_dag_nodes_json(&run_plan).map_err(InternalServerError)?; + let selected_step_logs = if let Some(step_id) = selected_step_id { + plan_run_step_logs_read(tx, dataset_id, plan_run_id, step_id) + .await + .map_err(InternalServerError)? 
+ } else { + Vec::new() + }; - let dag: DiGraph = plan.to_dag().map_err(InternalServerError)?; - - // Render plan in GraphViz DOT format - let dag_viz: String = Dot::with_attr_getters( - &dag, - &[ - Config::EdgeNoLabel, - Config::NodeNoLabel, - Config::RankDir(RankDir::LR), - ], - &|_graph, _edge| String::new(), - &|_graph, (_node_idx, data_product_id)| format_node(*data_product_id, &plan), - ) - .to_string(); + Ok(RunPageData { + plan, + runs, + run_plan, + dag_nodes_json, + selected_step_id, + selected_step_logs, + }) +} - // Json version of the payload as a pretty string - let plan_json_pretty: String = match plan.to_json() { - Some(value) => to_string_pretty(&value).unwrap_or(String::new()), - None => String::new(), - }; +fn build_plan_dag_nodes_json( + plan: &Plan, + state_override: Option, +) -> serde_json::Result { + let active_ids: HashSet = plan + .data_products + .iter() + .filter(|dp| dp.state != State::Disabled) + .map(|dp| dp.id) + .collect(); + let dependency_map: HashMap> = plan + .dependencies + .iter() + .filter(|dep| active_ids.contains(&dep.parent_id) && active_ids.contains(&dep.child_id)) + .fold(HashMap::new(), |mut acc, dep| { + acc.entry(dep.child_id) + .or_default() + .push(dep.parent_id.to_string()); + acc + }); + let dag_nodes: Vec = plan + .data_products + .iter() + .filter(|dp| dp.state != State::Disabled) + .map(|dp| NiceDagNode { + id: dp.id.to_string(), + dependencies: dependency_map.get(&dp.id).cloned().unwrap_or_default(), + data: NiceDagNodeData { + name: dp.name.clone(), + version: dp.version.clone(), + state: state_override + .clone() + .unwrap_or_else(|| dp.state.to_string()), + compute: dp.compute.to_string(), + run_id: dp.run_id.map(|run_id| run_id.to_string()), + }, + }) + .collect(); + serde_json::to_string(&dag_nodes) +} - Ok(base_layout( - "Plan", - &Some(dataset_id), - html! 
{ - // Data Sets Stats - div class="stats shadow animate-fade" { - // Data Set - div class="stat" { - div class="stat-title" { "Dataset ID:" } - div class="stat-value" { (&plan.dataset.id) } - div class="stat-desc" { - @if plan.dataset.paused { - span class={ "badge badge-sm badge-warning" } { - "Paused" - } - } @else { - span class={ "badge badge-sm badge-info" } { - "Active" - } +fn build_run_dag_nodes(run_plan: &RunPlan) -> Vec { + let dependency_map: HashMap> = + run_plan + .dependencies + .iter() + .fold(HashMap::new(), |mut acc, dep| { + acc.entry(dep.child_id) + .or_default() + .push(dep.parent_id.to_string()); + acc + }); + let dag_nodes: Vec = run_plan + .data_products + .iter() + .map(|dp| NiceDagNode { + id: dp.data_product_id.to_string(), + dependencies: dependency_map + .get(&dp.data_product_id) + .cloned() + .unwrap_or_default(), + data: NiceDagNodeData { + name: dp.name.clone(), + version: dp.version.clone(), + state: dp.state.to_string(), + compute: dp.compute.to_string(), + run_id: dp.step_run_id.map(|run_id| run_id.to_string()), + }, + }) + .collect(); + dag_nodes +} + +fn build_run_dag_nodes_json(run_plan: &RunPlan) -> serde_json::Result { + let dag_nodes = build_run_dag_nodes(run_plan); + serde_json::to_string(&dag_nodes) +} + +fn render_plan_summary(plan: &Plan) -> Markup { + html! 
{ + div class="stats shadow animate-fade" { + div class="stat" { + div class="stat-title" { "Dataset ID:" } + div class="stat-value" { (plan.dataset.id) } + div class="stat-desc" { + @if plan.dataset.paused { + span class={ "badge badge-sm badge-warning" } { + "Paused" + } + } @else { + span class={ "badge badge-sm badge-info" } { + "Active" } } } - // Modified - div class="stat" { - div class="stat-title" { "Modified By:" } - div class="stat-value" { (&plan.dataset.modified_by) } - div class="stat-desc" { "Date: " (&plan.dataset.modified_date) } + } + // Modified + div class="stat" { + div class="stat-title" { "Modified By:" } + div class="stat-value" { (&plan.dataset.modified_by) } + div class="stat-desc" { "Date: " (&plan.dataset.modified_date) } + } + } + div class="ml-12 mt-6 animate-fade" { + form method="post" action={ "/plan/" (plan.dataset.id) "/trigger" } { + button + type="submit" + class="btn btn-warning" + disabled[plan.dataset.paused] { + "Trigger Run Again" } } + p class="mt-2 text-sm opacity-70" { "Open a run to inspect plan state and step logs." } + @if plan.dataset.paused { + p class="mt-2 text-sm opacity-70" { + "Unpause the dataset before triggering another run." + } + } + } + } +} - // GraphViz representation of the plan's state - div class="ml-12 animate-fade" { - h2 class="text-4xl mb-2" { - span class="bg-gradient-to-r from-orange-700 to-amber-600 bg-clip-text text-transparent" { - "Plan's Current " - } - span class="bg-gradient-to-r from-amber-600 to-amber-400 bg-clip-text text-transparent" { - "State:" - } +fn render_dag_widget( + widget_id: &str, + dag_nodes_json: &str, + selection_context: Option<(DatasetId, Uuid, Option)>, +) -> Markup { + let (selection_dataset_id, selection_plan_run_id, selection_step_id) = + if let Some((dataset_id, plan_run_id, step_id)) = selection_context { + ( + Some(dataset_id.to_string()), + Some(plan_run_id.to_string()), + step_id.map(|id| id.to_string()), + ) + } else { + (None, None, None) + }; + + html! 
{ + div + id=(widget_id) + class="fletcher-dag-widget" + data-dataset-id=[selection_dataset_id] + data-plan-run-id=[selection_plan_run_id] + data-selected-node-id=[selection_step_id] { + div class="mb-3 mr-4 flex items-center justify-end gap-2 text-sm" { + span class="opacity-70 mr-2" { "Plan view" } + button type="button" class="btn btn-sm btn-outline" data-plan-action="zoom-out" { "-" } + span class="min-w-14 text-center font-medium" data-role="zoom-label" { "100%" } + button type="button" class="btn btn-sm btn-outline" data-plan-action="zoom-in" { "+" } + button type="button" class="btn btn-sm btn-outline" data-plan-action="fit" { "Fit" } + } + div + class="fletcher-dag-canvas overflow-auto rounded-xl bg-base-100 p-4" + style="height: min(70vh, 720px);" {} + script type="application/json" class="fletcher-dag-data" { + (PreEscaped(dag_nodes_json.to_string())) + } + } + } +} + +fn render_plan_graph(dataset_id: DatasetId, dag_nodes_json: &str) -> Markup { + html! { + div class="ml-12 animate-fade" { + h2 class="text-4xl mb-2" { + span class="bg-gradient-to-r from-orange-700 to-amber-600 bg-clip-text text-transparent" { + "Plan " } - div class="ml-4" { - (render_dot(&dag_viz)) + span class="bg-gradient-to-r from-amber-600 to-amber-400 bg-clip-text text-transparent" { + "Structure:" } } + div class="ml-4" { + (render_dag_widget( + &format!("dag_widget_plan_{dataset_id}"), + dag_nodes_json, + None, + )) + } + } + } +} - // Data Products state details - h2 class="ml-12 mt-10 text-4xl mb-2 animate-fade" { +fn render_run_graph( + dataset_id: DatasetId, + plan_run_id: Uuid, + dag_nodes_json: &str, + selected_step_id: Option, +) -> Markup { + html! 
{ + div class="ml-12 animate-fade" { + h2 class="text-4xl mb-2" { span class="bg-gradient-to-r from-orange-700 to-amber-600 bg-clip-text text-transparent" { - "Data " + "Run " } span class="bg-gradient-to-r from-amber-600 to-amber-400 bg-clip-text text-transparent" { - "Products:" + "Plan:" } } - table class="table table-zebra table-sm animate-fade" { - thead { - tr { - th { "Data Product ID" } - th { "Compute" } - th { "Name" } - th { "Version" } - th { "Eager" } - th { "State" } - th { "Run ID" } - th { "Link" } - th { "Last Update" } - } + div class="ml-4" { + (render_dag_widget( + &format!("dag_widget_run_{plan_run_id}"), + dag_nodes_json, + Some((dataset_id, plan_run_id, selected_step_id)), + )) + } + } + } +} + +fn render_run_list( + dataset_id: DatasetId, + runs: &[PlanRun], + selected_run_id: Option, +) -> Markup { + html! { + div class="ml-12 mt-10 mr-12 animate-fade" { + h2 class="text-4xl mb-2" { + span class="bg-gradient-to-r from-orange-700 to-amber-600 bg-clip-text text-transparent" { + "Plan " + } + span class="bg-gradient-to-r from-amber-600 to-amber-400 bg-clip-text text-transparent" { + "Runs:" } - tbody { - @for dp in &plan.data_products { - // Badge style for the eager flag - @let eager_badge: &str = if dp.eager { - "badge-primary" + } + div class="max-h-[24rem] overflow-y-auto pr-2" { + @if runs.is_empty() { + p class="opacity-70" { "No runs recorded yet." 
} + } @else { + div class="space-y-2" { + @for run in runs { + @let row_class = if Some(run.id) == selected_run_id { + "flex items-center justify-between rounded-xl border border-warning bg-base-200 px-4 py-3" } else { - "badge-secondary" - }; - - // Badge style for the state flag - @let state_badge: &str = match dp.state { - State::Disabled => "badge-error", - State::Failed => "badge-error", - State::Queued => "badge-neutral", - State::Running => "badge-info", - State::Success => "badge-success", - State::Waiting => "badge-neutral", + "flex items-center justify-between rounded-xl border border-base-300 bg-base-100 px-4 py-3" }; - - tr - id={ "row_" (dp.id) } - class="hover:bg-base-300" { - td { (dp.id) } - td { (dp.compute) } - td { (dp.name) } - td { (dp.version) } - // Put badges into span since if the badge is at the td level, and two - // badge td meet, they merge into one for some reason - td { span class={ "badge " (eager_badge) } { - (dp.eager) - } } - td { span class={ "badge " (state_badge) } { - (dp.state) - } } - @if let Some(run_id) = &dp.run_id { - td { (run_id) } - } @else { - td {} - } - @if let Some(link) = &dp.link { - td { - a href=(link) { - (link) - } + div class=(row_class) { + div class="space-y-1" { + a class="font-semibold link link-hover" href={ "/plan/" (dataset_id) "/run/" (run.id) } { + (run.id) } - } @else { - td {} + div class="text-sm opacity-70" { + "Started " (run.created_at) " by " (&run.created_by) + } + } + span class={ "badge " (match run.status { + State::Success => "badge-success", + State::Failed => "badge-error", + State::Running => "badge-info", + State::Queued | State::Waiting => "badge-neutral", + State::Disabled => "badge-neutral", + }) } { + (run.status) } - td { (dp.modified_date) } } } + } } } - // Json Payload for the Plan - h2 class="ml-12 mt-10 text-4xl mb-2 animate-fade" { + } + } +} + +fn render_selected_step_logs( + run_plan: &RunPlan, + selected_step_id: Option, + logs: &[ExecutionLog], +) -> Markup { + html! 
{ + div class="ml-12 mt-10 mr-12 animate-fade" { + h2 class="text-4xl mb-2" { span class="bg-gradient-to-r from-orange-700 to-amber-600 bg-clip-text text-transparent" { - "Plan " + "Step " } span class="bg-gradient-to-r from-amber-600 to-amber-400 bg-clip-text text-transparent" { - "Details:" + "Logs:" } } - pre class="animate-fade" { - code class="language-json" { - (plan_json_pretty) + @if let Some(step_id) = selected_step_id { + @if let Some(step) = run_plan + .data_products + .iter() + .find(|dp| dp.data_product_id == step_id) { + div class="mt-3 rounded-xl border border-base-300 bg-base-100 p-4" { + div class="mb-3 text-sm opacity-80" { + (step.name) " (" (step.data_product_id) ")" + } + @if logs.is_empty() { + p class="opacity-70" { "No logs recorded for this step in the selected run." } + } @else { + pre class="text-sm whitespace-pre-wrap" { + @for log in logs { + "[" (log.created_at) "] " + (log.stream) ": " + (log.message) + "\n" + } + } + } + } + } @else { + p class="mt-3 opacity-70" { "Selected step was not found in this run." } } + } @else { + p class="mt-3 opacity-70" { "Click a plan step to view its logs." 
} + } + } + } +} + +fn selected_step_from_query(params: &RunStepQuery) -> Result> { + match params.step.as_deref() { + Some(step) => Ok(Some( + DataProductId::try_parse(step).map_err(|_| NotFoundError)?, + )), + None => Ok(None), + } +} + +fn run_page_script(dataset_id: DatasetId, plan_run_id: Uuid, poll_live: bool) -> Markup { + let logs_url = format!("/plan/{dataset_id}/run/{plan_run_id}/logs"); + let nodes_url = format!("/plan/{dataset_id}/run/{plan_run_id}/nodes"); + let widget_id = format!("dag_widget_run_{plan_run_id}"); + let script = PreEscaped(format!( + r##" +window.fletcherRunLogsUrl = {logs_url:?}; +window.fletcherRunNodesUrl = {nodes_url:?}; +window.fletcherRunWidgetId = {widget_id:?}; +if (!window.__fletcherRunStepHandlerInstalled) {{ + window.__fletcherRunStepHandlerInstalled = true; + window.addEventListener("fletcher:dag-step-selected", function (event) {{ + const detail = event.detail || {{}}; + if (!detail.dataProductId) return; + const url = new URL(window.location.href); + url.searchParams.set("step", detail.dataProductId); + window.history.replaceState({{}}, "", url.toString()); + if (window.updatePlanDagSelection) {{ + window.updatePlanDagSelection(window.fletcherRunWidgetId, detail.dataProductId); + }} + if (window.htmx) {{ + window.htmx.ajax("GET", `${{window.fletcherRunLogsUrl}}?step=${{encodeURIComponent(detail.dataProductId)}}`, {{ + target: "#run_logs", + swap: "innerHTML", + }}); + }} + }}); +}} +if ({poll_live}) {{ + if (window.__fletcherRunPollTimer) {{ + window.clearInterval(window.__fletcherRunPollTimer); + }} + window.__fletcherRunPollTimer = window.setInterval(function () {{ + fetch(window.fletcherRunNodesUrl, {{ headers: {{ "Accept": "application/json" }} }}) + .then(function (response) {{ return response.ok ? 
response.json() : null; }}) + .then(function (payload) {{ + if (!payload || !window.updatePlanDagWidgetNodes) return; + window.updatePlanDagWidgetNodes(window.fletcherRunWidgetId, payload.nodes || []); + if (!payload.active && window.__fletcherRunPollTimer) {{ + window.clearInterval(window.__fletcherRunPollTimer); + window.__fletcherRunPollTimer = null; + }} + }}) + .catch(function () {{}}); + }}, 2000); +}} +"##, + poll_live = if poll_live { "true" } else { "false" }, + )); + html! { script { (script) } } +} + +/// Trigger a plan rerun from the UI by clearing all non-disabled data products. +#[handler] +pub async fn trigger_plan_post( + Data(app_config): Data<&Config>, + Data(pool): Data<&PgPool>, + Path(dataset_id): Path, +) -> Result { + let dataset_id = DatasetId::try_parse(&dataset_id).map_err(|_| NotFoundError)?; + + let mut tx: Transaction<'_, Postgres> = pool.begin().await.map_err(InternalServerError)?; + let plan = plan_read(&mut tx, dataset_id).await.map_err(|err| { + let error: poem::Error = err.into_poem_error(); + match error.status() { + StatusCode::NOT_FOUND => NotFoundError.into(), + _ => error, + } + })?; + + let rerun_ids: Vec = plan + .data_products + .iter() + .filter_map(|dp| (dp.state != State::Disabled).then_some(dp.id)) + .collect(); + + if !rerun_ids.is_empty() { + clear_edit_with_config( + &mut tx, + app_config, + dataset_id, + &rerun_ids, + &app_config.worker_service, + ) + .await + .map_err(InternalServerError)?; + } + + let latest_run_id = plan_runs_read(&mut tx, dataset_id) + .await + .map_err(InternalServerError)? 
+ .into_iter() + .next() + .map(|run| run.id); + + tx.commit().await.map_err(InternalServerError)?; + Ok(Redirect::see_other( + latest_run_id + .map(|plan_run_id| format!("/plan/{dataset_id}/run/{plan_run_id}")) + .unwrap_or_else(|| format!("/plan/{dataset_id}")), + )) +} + +/// Plan Page +#[handler] +pub async fn plan_page( + Data(pool): Data<&PgPool>, + Path(dataset_id): Path, +) -> Result { + let dataset_id = DatasetId::try_parse(&dataset_id).map_err(|_| NotFoundError)?; + let mut tx: Transaction<'_, Postgres> = pool.begin().await.map_err(InternalServerError)?; + let plan_data = load_plan_page_data(&mut tx, dataset_id).await?; + + Ok(base_layout( + "Plan", + &Some(dataset_id), + html! { + script src="/assets/nice-dag/nice-dag-core.js" {} + script src="/assets/js/plan-dag.js" {} + (render_plan_summary(&plan_data.plan)) + (render_plan_graph(dataset_id, &plan_data.dag_nodes_json)) + (render_run_list(dataset_id, &plan_data.runs, None)) + }, + )) +} + +#[handler] +pub async fn plan_run_page( + Data(pool): Data<&PgPool>, + Path((dataset_id, plan_run_id)): Path<(String, String)>, + Query(params): Query, +) -> Result { + let dataset_id = DatasetId::try_parse(&dataset_id).map_err(|_| NotFoundError)?; + let plan_run_id = Uuid::try_parse(&plan_run_id).map_err(|_| NotFoundError)?; + let selected_step_id = selected_step_from_query(¶ms)?; + let mut tx: Transaction<'_, Postgres> = pool.begin().await.map_err(InternalServerError)?; + let run_data = load_run_page_data(&mut tx, dataset_id, plan_run_id, selected_step_id).await?; + let poll_live = run_data.run_plan.run.is_active(); + + Ok(base_layout( + "Run", + &Some(dataset_id), + html! 
{ + script src="/assets/nice-dag/nice-dag-core.js" {} + script src="/assets/js/plan-dag.js" {} + (run_page_script(dataset_id, plan_run_id, poll_live)) + (render_plan_summary(&run_data.plan)) + div id="run_graph" { + (render_run_graph( + dataset_id, + plan_run_id, + &run_data.dag_nodes_json, + run_data.selected_step_id, + )) + } + div id="run_logs" { + (render_selected_step_logs( + &run_data.run_plan, + run_data.selected_step_id, + &run_data.selected_step_logs, + )) } + (render_run_list(dataset_id, &run_data.runs, Some(plan_run_id))) }, )) } +#[handler] +pub async fn plan_run_nodes_get( + Data(pool): Data<&PgPool>, + Path((dataset_id, plan_run_id)): Path<(String, String)>, +) -> Result> { + let dataset_id = DatasetId::try_parse(&dataset_id).map_err(|_| NotFoundError)?; + let plan_run_id = Uuid::try_parse(&plan_run_id).map_err(|_| NotFoundError)?; + let mut tx: Transaction<'_, Postgres> = pool.begin().await.map_err(InternalServerError)?; + let run_data = plan_run_read(&mut tx, dataset_id, plan_run_id) + .await + .map_err(|err| { + let error: poem::Error = err.into_poem_error(); + match error.status() { + StatusCode::NOT_FOUND => NotFoundError.into(), + _ => error, + } + })?; + + Ok(Json(RunDagUpdatePayload { + nodes: build_run_dag_nodes(&run_data), + active: run_data.run.is_active(), + })) +} + +#[handler] +pub async fn plan_run_graph_get( + Data(pool): Data<&PgPool>, + Path((dataset_id, plan_run_id)): Path<(String, String)>, + Query(params): Query, +) -> Result { + let dataset_id = DatasetId::try_parse(&dataset_id).map_err(|_| NotFoundError)?; + let plan_run_id = Uuid::try_parse(&plan_run_id).map_err(|_| NotFoundError)?; + let selected_step_id = selected_step_from_query(¶ms)?; + let mut tx: Transaction<'_, Postgres> = pool.begin().await.map_err(InternalServerError)?; + let run_data = load_run_page_data(&mut tx, dataset_id, plan_run_id, selected_step_id).await?; + + Ok(html! 
{ + (render_run_graph( + dataset_id, + plan_run_id, + &run_data.dag_nodes_json, + run_data.selected_step_id, + )) + }) +} + +#[handler] +pub async fn plan_run_logs_get( + Data(pool): Data<&PgPool>, + Path((dataset_id, plan_run_id)): Path<(String, String)>, + Query(params): Query, +) -> Result { + let dataset_id = DatasetId::try_parse(&dataset_id).map_err(|_| NotFoundError)?; + let plan_run_id = Uuid::try_parse(&plan_run_id).map_err(|_| NotFoundError)?; + let selected_step_id = selected_step_from_query(¶ms)?; + let mut tx: Transaction<'_, Postgres> = pool.begin().await.map_err(InternalServerError)?; + let run_data = load_run_page_data(&mut tx, dataset_id, plan_run_id, selected_step_id).await?; + + Ok(html! { + (render_selected_step_logs( + &run_data.run_plan, + run_data.selected_step_id, + &run_data.selected_step_logs, + )) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -296,7 +782,7 @@ mod tests { Compute, DataProduct, DataProductParam, Dataset, DatasetParam, DependencyParam, PlanParam, State, }; - use crate::ui::user_service; + use crate::{load_config, ui::user_service}; use chrono::Utc; use poem::{ EndpointExt, @@ -1268,17 +1754,17 @@ mod tests { assert!(title.is_some(), "Should have title element"); assert_eq!(title.unwrap().inner_html(), "Fletcher"); - // Should contain plan heading - let h2_selector = Selector::parse("h2").unwrap(); - let h2 = document.select(&h2_selector).next(); - assert!(h2.is_some(), "Should have h2 element"); - - // Check for spans within h2 - let span_selector = Selector::parse("span").unwrap(); - let spans: Vec<_> = h2.unwrap().select(&span_selector).collect(); - assert_eq!(spans.len(), 2, "h2 should have exactly 2 span elements"); - assert_eq!(spans[0].inner_html(), "Plan's Current "); - assert_eq!(spans[1].inner_html(), "State:"); + assert!( + html_content.contains("Dataset ID:"), + "Should render the plan summary block" + ); + assert!( + html_content.contains("Plan ") + && html_content.contains("Structure:") + && 
html_content.contains("Plan ") + && html_content.contains("Runs:"), + "Should render the new plan structure and runs sections" + ); // Should have navigation with both Search and Plan links let nav_selector = Selector::parse("nav ul li").unwrap(); @@ -1303,48 +1789,268 @@ mod tests { Some(format!("/plan/{dataset_id}").as_str()) ); - // Should contain data products table - let table_selector = Selector::parse("table").unwrap(); - let table = document.select(&table_selector).next(); - assert!(table.is_some(), "Should have data products table"); + let dag_selector = Selector::parse(".fletcher-dag-widget").unwrap(); + let dag_widgets: Vec<_> = document.select(&dag_selector).collect(); + assert_eq!(dag_widgets.len(), 1, "Plan page should have one DAG widget"); + assert!( + html_content.contains("No runs recorded yet.") || html_content.contains("/run/"), + "Should show run history content" + ); + assert!( + !html_content.contains("language-json"), + "Plan JSON details block should be hidden" + ); + } - // Should have correct table headers - let th_selector = Selector::parse("th").unwrap(); - let headers: Vec<_> = document.select(&th_selector).collect(); - assert_eq!(headers.len(), 9, "Should have 9 table headers"); - assert_eq!(headers[0].inner_html(), "Data Product ID"); - assert_eq!(headers[1].inner_html(), "Compute"); - assert_eq!(headers[2].inner_html(), "Name"); - assert_eq!(headers[3].inner_html(), "Version"); - assert_eq!(headers[4].inner_html(), "Eager"); - assert_eq!(headers[5].inner_html(), "State"); - assert_eq!(headers[6].inner_html(), "Run ID"); - assert_eq!(headers[7].inner_html(), "Link"); - assert_eq!(headers[8].inner_html(), "Last Update"); - - // Should have data product rows - let tr_selector = Selector::parse("tbody tr").unwrap(); - let rows: Vec<_> = document.select(&tr_selector).collect(); - assert_eq!(rows.len(), 3, "Should have 3 data product rows"); + /// Test trigger button reruns a plan from the UI + #[sqlx::test] + async fn 
test_trigger_plan_post(pool: PgPool) { + let dataset_id = Uuid::new_v4(); + let dp1_id = Uuid::new_v4(); + let dp2_id = Uuid::new_v4(); + let config = load_config().unwrap(); + + let mut tx = pool.begin().await.unwrap(); + let plan_param = PlanParam { + dataset: DatasetParam { + id: dataset_id, + extra: Some(json!({"test":"trigger"})), + }, + data_products: vec![ + DataProductParam { + id: dp1_id, + compute: Compute::Cams, + name: "step-1".to_string(), + version: "1.0.0".to_string(), + eager: true, + passthrough: None, + extra: None, + }, + DataProductParam { + id: dp2_id, + compute: Compute::Dbxaas, + name: "step-2".to_string(), + version: "1.0.0".to_string(), + eager: true, + passthrough: None, + extra: None, + }, + ], + dependencies: vec![DependencyParam { + parent_id: dp1_id, + child_id: dp2_id, + extra: None, + }], + }; + plan_param + .upsert(&mut tx, "test_user", Utc::now()) + .await + .unwrap(); + tx.commit().await.unwrap(); + + let app = user_service().data(config).data(pool.clone()); + let cli = TestClient::new(app); + let response = cli.post(format!("/plan/{dataset_id}/trigger")).send().await; + response.assert_status(StatusCode::SEE_OTHER); + let location = response + .0 + .headers() + .get("location") + .unwrap() + .to_str() + .unwrap(); + assert!( + location.starts_with(&format!("/plan/{dataset_id}/run/")), + "Trigger should redirect to the latest run page" + ); + + let mut verify_tx = pool.begin().await.unwrap(); + let plan = plan_read(&mut verify_tx, dataset_id).await.unwrap(); + let step_1 = plan.data_product(dp1_id).unwrap(); + let step_2 = plan.data_product(dp2_id).unwrap(); + assert_eq!(step_1.state, State::Queued); + assert_eq!(step_2.state, State::Waiting); + assert!(step_1.run_id.is_some()); + } + + /// Test the plan page shows the structure DAG and run history links. 
+ #[sqlx::test] + async fn test_plan_page_shows_structure_and_runs(pool: PgPool) { + let mut tx = pool.begin().await.unwrap(); + let dataset_id = Uuid::new_v4(); + setup_test_plan_with_data_products(&mut tx, dataset_id, 1).await; + tx.commit().await.unwrap(); + + let app = user_service().data(pool.clone()); + let cli = TestClient::new(app); + + let response: TestResponse = cli.get(format!("/plan/{dataset_id}")).send().await; + response.assert_status_is_ok(); + + let html_content = response.0.into_body().into_string().await.unwrap(); + assert!( + html_content.contains("Plan ") && html_content.contains("Structure:"), + "Plan page should show the plan structure section" + ); + assert!( + html_content.contains("Plan ") && html_content.contains("Runs:"), + "Plan page should show the plan runs section" + ); + assert!( + html_content.contains("nice-dag-core.js"), + "Plan page should load the DAG widget assets" + ); + assert!( + html_content.contains("max-h-[24rem]") && html_content.contains("overflow-y-auto"), + "Plan page should make the run list scrollable" + ); + assert!( + html_content.contains("No runs recorded yet.") || html_content.contains("/run/"), + "Plan page should show run history content" + ); + } - // Verify first row structure - let first_row = &rows[0]; - let td_selector = Selector::parse("td").unwrap(); - let cells: Vec<_> = first_row.select(&td_selector).collect(); - assert_eq!(cells.len(), 9, "Each row should have 9 cells"); + /// Test the run page renders the selected run DAG and step log pane. 
+ #[sqlx::test] + async fn test_plan_run_page(pool: PgPool) { + let mut tx = pool.begin().await.unwrap(); + let dataset_id = Uuid::new_v4(); + let config = load_config().unwrap(); + let dp1_id = Uuid::new_v4(); + let dp2_id = Uuid::new_v4(); + let plan_param = PlanParam { + dataset: DatasetParam { + id: dataset_id, + extra: Some(json!({"test":"run-page"})), + }, + data_products: vec![ + DataProductParam { + id: dp1_id, + compute: Compute::Cams, + name: "step-1".to_string(), + version: "1.0.0".to_string(), + eager: true, + passthrough: None, + extra: None, + }, + DataProductParam { + id: dp2_id, + compute: Compute::Dbxaas, + name: "step-2".to_string(), + version: "1.0.0".to_string(), + eager: true, + passthrough: None, + extra: None, + }, + ], + dependencies: vec![DependencyParam { + parent_id: dp1_id, + child_id: dp2_id, + extra: None, + }], + }; + crate::core::plan_add_with_config(&mut tx, &config, &plan_param, "test_user") + .await + .unwrap(); + let runs = plan_runs_read(&mut tx, dataset_id).await.unwrap(); + tx.commit().await.unwrap(); + + let app = user_service().data(config).data(pool.clone()); + let cli = TestClient::new(app); + + let response: TestResponse = cli + .get(format!("/plan/{dataset_id}/run/{}", runs[0].id)) + .send() + .await; + response.assert_status_is_ok(); - // Check that cells contain expected content types + let html_content = response.0.into_body().into_string().await.unwrap(); + let run_graph_index = html_content.find("id=\"run_graph\"").unwrap(); + let run_list_index = html_content.find("Plan Runs:").unwrap(); + assert!( + html_content.contains("Run ") && html_content.contains("Plan:"), + "Run page should include the run plan section" + ); assert!( - !cells[0].inner_html().is_empty(), - "Data Product ID cell should not be empty" + html_content.contains("Step ") && html_content.contains("Logs:"), + "Run page should include the step logs section" ); assert!( - cells[2].inner_html().contains("test-product"), - "Name cell should contain 
test-product" + html_content.contains("Click a plan step to view its logs."), + "Run page should prompt for step selection" ); assert!( - cells[3].inner_html().contains("1."), - "Version cell should contain version number" + run_graph_index < run_list_index, + "Run page should keep the DAG above the run list" + ); + } + + /// Test the run graph fragment route renders the nice-dag widget payload. + #[sqlx::test] + async fn test_plan_run_graph_get(pool: PgPool) { + let mut tx = pool.begin().await.unwrap(); + let dataset_id = Uuid::new_v4(); + let config = load_config().unwrap(); + let dp1_id = Uuid::new_v4(); + let dp2_id = Uuid::new_v4(); + let plan_param = PlanParam { + dataset: DatasetParam { + id: dataset_id, + extra: None, + }, + data_products: vec![ + DataProductParam { + id: dp1_id, + compute: Compute::Cams, + name: "step-1".to_string(), + version: "1.0.0".to_string(), + eager: true, + passthrough: None, + extra: None, + }, + DataProductParam { + id: dp2_id, + compute: Compute::Dbxaas, + name: "step-2".to_string(), + version: "1.0.0".to_string(), + eager: true, + passthrough: None, + extra: None, + }, + ], + dependencies: vec![DependencyParam { + parent_id: dp1_id, + child_id: dp2_id, + extra: None, + }], + }; + crate::core::plan_add_with_config(&mut tx, &config, &plan_param, "test_user") + .await + .unwrap(); + let runs = plan_runs_read(&mut tx, dataset_id).await.unwrap(); + tx.commit().await.unwrap(); + + let app = user_service().data(config).data(pool.clone()); + let cli = TestClient::new(app); + + let response: TestResponse = cli + .get(format!("/plan/{dataset_id}/run/{}/graph", runs[0].id)) + .send() + .await; + response.assert_status_is_ok(); + + let html_content = response.0.into_body().into_string().await.unwrap(); + assert!( + html_content.contains("fletcher-dag-widget"), + "Graph fragment should include the DAG widget wrapper" + ); + assert!( + html_content.contains("fletcher-dag-data"), + "Graph fragment should include DAG JSON payload" + ); + 
assert!( + !html_content.contains("Fletcher"), + "Graph fragment should not render the full page layout" ); } @@ -1408,33 +2114,28 @@ mod tests { let h2 = document.select(&h2_selector).next(); assert!(h2.is_some(), "Should have h2 element even with empty plan"); - // Check for spans within h2 - let span_selector = Selector::parse("span").unwrap(); - let spans: Vec<_> = h2.unwrap().select(&span_selector).collect(); - assert_eq!(spans.len(), 2, "h2 should have exactly 2 span elements"); - assert_eq!(spans[0].inner_html(), "Plan's Current "); - assert_eq!(spans[1].inner_html(), "State:"); - - // Should still have the table structure - let table_selector = Selector::parse("table").unwrap(); - let table = document.select(&table_selector).next(); - assert!(table.is_some(), "Should have table even with empty plan"); + assert!( + html_content.contains("Plan ") + && html_content.contains("Structure:") + && html_content.contains("Plan ") + && html_content.contains("Runs:"), + "Should render the plan structure and runs sections" + ); - // Should have no data product rows - let tr_selector = Selector::parse("tbody tr").unwrap(); - let rows: Vec<_> = document.select(&tr_selector).collect(); - assert_eq!( - rows.len(), - 0, - "Should have 0 data product rows for empty plan" + assert!( + document + .root_element() + .html() + .contains("No runs recorded yet."), + "Should show an empty run history message" ); - // Should still have visualization div - let graph_div_selector = Selector::parse("div#graph").unwrap(); + // Should still have DAG widget container + let graph_div_selector = Selector::parse("div[id^='dag_widget_']").unwrap(); let graph_div = document.select(&graph_div_selector).next(); assert!( graph_div.is_some(), - "Should have visualization div even with empty plan" + "Should have DAG widget container even with empty plan" ); } @@ -1464,55 +2165,37 @@ mod tests { // Parse HTML and verify detailed structure let document = Html::parse_fragment(&html_content); - // Should 
contain div with id="graph" for GraphViz visualization - let graph_div_selector = Selector::parse("div#graph").unwrap(); + // Should contain DAG widget container for visualization + let graph_div_selector = Selector::parse("div[id^='dag_widget_']").unwrap(); let graph_div = document.select(&graph_div_selector).next(); assert!( graph_div.is_some(), - "Should have div with id='graph' for visualization" + "Should have a DAG widget container for visualization" ); - // Should contain script tags for viz-js + // Should contain script tags for the widget assets let script_selector = Selector::parse("script").unwrap(); let scripts: Vec<_> = document.select(&script_selector).collect(); assert!( scripts.len() >= 2, - "Should have at least 2 script tags for visualization" + "Should have at least 2 script tags for the DAG widget" ); - // Should have viz-js library script - let viz_script = scripts.iter().find(|script| { + // Should have nice-dag library script + let nice_dag_script = scripts.iter().find(|script| { script .attr("src") - .is_some_and(|src| src.contains("viz-global.js")) + .is_some_and(|src| src.contains("nice-dag-core.js")) }); - assert!(viz_script.is_some(), "Should have viz-js library script"); - - // Should contain table with tbody for data products - let tbody_selector = Selector::parse("table tbody").unwrap(); - let tbody = document.select(&tbody_selector).next(); - assert!(tbody.is_some(), "Should have tbody in data products table"); - - // Verify data product rows have correct structure - let tr_selector = Selector::parse("tbody tr").unwrap(); - let rows: Vec<_> = document.select(&tr_selector).collect(); + assert!( + nice_dag_script.is_some(), + "Should have nice-dag library script" + ); - for (i, row) in rows.iter().enumerate() { - // Each row should have id attribute - assert!( - row.attr("id").is_some(), - "Each row should have id attribute" - ); - assert!( - row.attr("id").unwrap().starts_with("row_"), - "Row id should start with 'row_'" - ); - - // 
Each row should have 9 cells - let td_selector = Selector::parse("td").unwrap(); - let cells: Vec<_> = row.select(&td_selector).collect(); - assert_eq!(cells.len(), 9, "Row {} should have 9 cells", i); - } + assert!( + html_content.contains("Plan ") && html_content.contains("Runs:"), + "Should contain the run history section" + ); } /// Test plan_page content validation for specific elements @@ -1548,32 +2231,24 @@ mod tests { "Should contain Plan title span" ); assert!( - html_content.contains("Plan's Current "), - "Should contain Plan's Current span" - ); - assert!( - html_content.contains("State:"), - "Should contain State span" + html_content.contains("Plan "), + "Should contain Plan span" ); assert!( - html_content.contains("Data "), - "Should contain Data span" + html_content.contains("Structure:"), + "Should contain Structure span" ); assert!( - html_content.contains("Products:"), - "Should contain Products span" + html_content.contains("Runs:"), + "Should contain Runs span" ); assert!( html_content.contains("Plan "), "Should contain Plan span" ); assert!( - html_content.contains("Details:"), - "Should contain Details span" - ); - assert!( - html_content.contains("Data Product ID"), - "Should contain table headers" + html_content.contains("No runs recorded yet.") || html_content.contains("/run/"), + "Should contain run history content" ); assert!( html_content.contains("test-product-0"), @@ -1584,32 +2259,31 @@ mod tests { "Should contain test data product version" ); assert!( - html_content.contains("Viz.instance()"), - "Should contain visualization JavaScript" + html_content.contains("fletcher-dag-widget"), + "Should contain DAG widget markup" ); assert!( - html_content.contains("viz-global.js"), - "Should reference viz-js library" + html_content.contains("nice-dag-core.js"), + "Should reference nice-dag library" ); assert!( - html_content.contains("id=\"graph\""), - "Should contain graph div id" + html_content.contains("id=\"dag_widget_"), + "Should 
contain DAG widget container id" ); - // Verify JSON rendering assert!( - html_content.contains("class=\"language-json\""), - "Should contain Prism.js JSON language class" + !html_content.contains("class=\"language-json\""), + "Plan details JSON block should be hidden" ); assert!( - html_content.contains("
"),
-            "Should contain properly formatted JSON code block with animation class"
+            html_content.contains("max-h-[24rem]") && html_content.contains("overflow-y-auto"),
+            "Should contain a scrollable run history list"
         );
     }
 
-    /// Test plan_page JSON rendering functionality
+    /// Test plan_page details are hidden.
     #[sqlx::test]
-    async fn test_plan_page_json_rendering(pool: PgPool) {
+    async fn test_plan_page_details_hidden(pool: PgPool) {
         let mut tx = pool.begin().await.unwrap();
 
         // Setup test data with specific values for JSON validation
@@ -1630,145 +2304,19 @@ mod tests {
         // Get response text (HTML)
         let html_content = response.0.into_body().into_string().await.unwrap();
 
-        // Parse HTML and verify JSON rendering structure
-        let document = Html::parse_fragment(&html_content);
-
-        // Should contain "Plan Details:" heading with spans
-        let h2_selector = Selector::parse("h2").unwrap();
-        let headings: Vec<_> = document.select(&h2_selector).collect();
-
-        // Find the heading that contains "Plan " and "Details:" in spans
-        let plan_details_heading = headings.iter().find(|h2| {
-            let span_selector = Selector::parse("span").unwrap();
-            let spans: Vec<_> = h2.select(&span_selector).collect();
-            spans.len() == 2
-                && spans[0].inner_html() == "Plan "
-                && spans[1].inner_html() == "Details:"
-        });
-        assert!(
-            plan_details_heading.is_some(),
-            "Should have 'Plan Details:' heading with proper span structure"
-        );
-
-        // Should contain pre > code structure with language-json class
-        let pre_code_selector = Selector::parse("pre code.language-json").unwrap();
-        let json_code_block = document.select(&pre_code_selector).next();
-        assert!(
-            json_code_block.is_some(),
-            "Should have pre > code.language-json structure"
-        );
-
-        // Get the JSON content from the code block
-        let json_content = json_code_block.unwrap().inner_html();
-        assert!(!json_content.is_empty(), "JSON content should not be empty");
-
-        // Verify JSON structure contains expected plan elements
         assert!(
-            json_content.contains("dataset"),
-            "JSON should contain dataset object"
+            !html_content.contains("Details:"),
+            "Plan details heading should be hidden"
         );
         assert!(
-            json_content.contains("data_products"),
-            "JSON should contain data_products array"
-        );
-        assert!(
-            json_content.contains("dependencies"),
-            "JSON should contain dependencies array"
-        );
-
-        // Verify dataset fields are present
-        assert!(
-            json_content.contains("\"id\""),
-            "JSON should contain dataset id field"
-        );
-        assert!(
-            json_content.contains("\"paused\""),
-            "JSON should contain paused field"
-        );
-        assert!(
-            json_content.contains("\"modified_by\""),
-            "JSON should contain modified_by field"
-        );
-        assert!(
-            json_content.contains("\"modified_date\""),
-            "JSON should contain modified_date field"
-        );
-
-        // Verify data product fields are present
-        assert!(
-            json_content.contains("test-product-0"),
-            "JSON should contain first data product name"
-        );
-        assert!(
-            json_content.contains("test-product-1"),
-            "JSON should contain second data product name"
-        );
-        assert!(
-            json_content.contains("\"compute\""),
-            "JSON should contain compute field"
-        );
-        assert!(
-            json_content.contains("\"version\""),
-            "JSON should contain version field"
-        );
-        assert!(
-            json_content.contains("\"eager\""),
-            "JSON should contain eager field"
-        );
-        assert!(
-            json_content.contains("\"state\""),
-            "JSON should contain state field"
-        );
-
-        // Verify JSON is properly formatted (should contain indentation/whitespace)
-        assert!(
-            json_content.contains("\n"),
-            "JSON should be pretty-printed with newlines"
-        );
-        assert!(json_content.contains("  "), "JSON should be indented");
-
-        // Verify the JSON content is valid by attempting to parse it
-        let parsed_json: Result<Value, _> = serde_json::from_str(&json_content);
-        assert!(parsed_json.is_ok(), "JSON content should be valid JSON");
-
-        // Verify specific JSON structure
-        let json_value = parsed_json.unwrap();
-        assert!(json_value.is_object(), "Root JSON should be an object");
-        assert!(
-            json_value.get("dataset").is_some(),
-            "Should have dataset field"
-        );
-        assert!(
-            json_value.get("data_products").is_some(),
-            "Should have data_products field"
-        );
-        assert!(
-            json_value.get("dependencies").is_some(),
-            "Should have dependencies field"
-        );
-
-        // Verify data_products is an array with correct count
-        let data_products = json_value.get("data_products").unwrap();
-        assert!(data_products.is_array(), "data_products should be an array");
-        assert_eq!(
-            data_products.as_array().unwrap().len(),
-            2,
-            "Should have 2 data products"
-        );
-
-        // Verify dependencies is an array
-        let dependencies = json_value.get("dependencies").unwrap();
-        assert!(dependencies.is_array(), "dependencies should be an array");
-        assert_eq!(
-            dependencies.as_array().unwrap().len(),
-            1,
-            "Should have 1 dependency"
+            !html_content.contains("language-json"),
+            "Plan details JSON block should be hidden"
         );
     }
 
-    /// Test plan_page JSON rendering with empty plan
+    /// Test hidden plan details with empty plan.
     #[sqlx::test]
-    async fn test_plan_page_json_rendering_empty_plan(pool: PgPool) {
+    async fn test_plan_page_details_hidden_empty_plan(pool: PgPool) {
         let mut tx = pool.begin().await.unwrap();
 
         // Setup test data with no data products
@@ -1789,56 +2337,13 @@ mod tests {
         // Get response text (HTML)
         let html_content = response.0.into_body().into_string().await.unwrap();
 
-        // Parse HTML and verify JSON rendering structure for empty plan
-        let document = Html::parse_fragment(&html_content);
-
-        // Should still contain JSON structure even with empty plan
-        let pre_code_selector = Selector::parse("pre code.language-json").unwrap();
-        let json_code_block = document.select(&pre_code_selector).next();
-        assert!(
-            json_code_block.is_some(),
-            "Should have JSON structure even with empty plan"
-        );
-
-        // Get the JSON content
-        let json_content = json_code_block.unwrap().inner_html();
         assert!(
-            !json_content.is_empty(),
-            "JSON content should not be empty even with empty plan"
-        );
-
-        // Verify JSON is valid and contains expected structure
-        let parsed_json: Result<Value, _> = serde_json::from_str(&json_content);
-        assert!(
-            parsed_json.is_ok(),
-            "JSON content should be valid JSON for empty plan"
-        );
-
-        let json_value = parsed_json.unwrap();
-        assert!(
-            json_value.get("dataset").is_some(),
-            "Should have dataset field"
+            !html_content.contains("language-json"),
+            "Hidden plan details should remain hidden for empty plans"
         );
         assert!(
-            json_value.get("data_products").is_some(),
-            "Should have data_products field"
-        );
-
-        // Verify empty arrays
-        let data_products = json_value.get("data_products").unwrap();
-        assert!(data_products.is_array(), "data_products should be an array");
-        assert_eq!(
-            data_products.as_array().unwrap().len(),
-            0,
-            "Should have 0 data products"
-        );
-
-        let dependencies = json_value.get("dependencies").unwrap();
-        assert!(dependencies.is_array(), "dependencies should be an array");
-        assert_eq!(
-            dependencies.as_array().unwrap().len(),
-            0,
-            "Should have 0 dependencies"
+            html_content.contains("No runs recorded yet."),
+            "Empty plans should still show empty run history"
         );
     }
 }
diff --git a/src/worker.rs b/src/worker.rs
new file mode 100644
index 0000000..d4b6be0
--- /dev/null
+++ b/src/worker.rs
@@ -0,0 +1,402 @@
+use crate::{
+    error::{Error, Result},
+    model::{DataProductId, DatasetId, State},
+};
+use clap::{Args, Parser, Subcommand, ValueEnum};
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use serde_json::{Value, json};
+use std::{
+    path::{Path, PathBuf},
+    process::Stdio,
+    time::Instant,
+};
+use tokio::{
+    io::{AsyncBufReadExt, BufReader},
+    process::Command as TokioCommand,
+};
+use tracing::info;
+use uuid::Uuid;
+
+#[derive(Parser)]
+pub struct Cli {
+    #[command(subcommand)]
+    pub command: Option<CliCommand>,
+}
+
+#[derive(Subcommand)]
+pub enum CliCommand {
+    Worker(WorkerArgs),
+}
+
+#[derive(Args, Clone, Debug)]
+pub struct WorkerArgs {
+    #[arg(long)]
+    pub remote: String,
+    #[arg(long)]
+    pub dataset_id: DatasetId,
+    #[arg(long)]
+    pub data_product_id: DataProductId,
+    #[arg(long)]
+    pub run_id: Uuid,
+    #[arg(long)]
+    pub script: PathBuf,
+    #[arg(long, value_enum)]
+    pub runner: WorkerRunner,
+    #[arg(long, default_value = "local")]
+    pub service: String,
+    #[arg(long)]
+    pub key: Option<String>,
+}
+
+#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize, ValueEnum)]
+#[serde(rename_all = "lowercase")]
+pub enum WorkerRunner {
+    Bash,
+    Python,
+}
+
+impl WorkerRunner {
+    pub const fn as_arg(self) -> &'static str {
+        match self {
+            WorkerRunner::Bash => "bash",
+            WorkerRunner::Python => "python",
+        }
+    }
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct LocalWorkerSpec {
+    pub runner: WorkerRunner,
+    pub script: String,
+}
+
+#[derive(Deserialize)]
+struct AuthResponse {
+    access_token: String,
+}
+
+#[derive(Serialize)]
+struct WorkerLogParam<'a> {
+    run_id: String,
+    stream: &'a str,
+    message: &'a str,
+}
+
+pub fn extract_worker_spec(passthrough: &Option<Value>) -> Result<Option<LocalWorkerSpec>> {
+    let Some(value) = passthrough.as_ref() else {
+        return Ok(None);
+    };
+
+    let Some(worker) = value.get("worker").cloned() else {
+        return Ok(None);
+    };
+
+    Ok(Some(serde_json::from_value(worker)?))
+}
+
+pub async fn run_worker(args: WorkerArgs) -> Result<()> {
+    let started_at = Instant::now();
+    let client = Client::builder().build()?;
+    let token = authenticate(&client, &args).await?;
+
+    update_state(
+        &client,
+        &args,
+        &token,
+        State::Running,
+        Some(json!({
+            "runner": args.runner,
+            "script": args.script.display().to_string(),
+            "phase": "started",
+        })),
+    )
+    .await?;
+
+    let pyproject_used = has_pyproject(&args.script);
+    let spawned_command = rendered_command(&args, pyproject_used);
+    info!(
+        dataset_id = %args.dataset_id,
+        data_product_id = %args.data_product_id,
+        run_id = %args.run_id,
+        runner = %args.runner.as_arg(),
+        script = %args.script.display(),
+        remote = %args.remote,
+        command_line = %spawned_command,
+        "executing worker command"
+    );
+    let mut command = build_command(&args);
+    command.stdout(Stdio::piped()).stderr(Stdio::piped());
+
+    let mut child = match command.spawn() {
+        Ok(child) => child,
+        Err(err) => {
+            update_state(
+                &client,
+                &args,
+                &token,
+                State::Failed,
+                Some(json!({
+                    "runner": args.runner,
+                    "script": args.script.display().to_string(),
+                    "pyproject": pyproject_used,
+                    "reason": err.to_string(),
+                })),
+            )
+            .await?;
+            return Err(err.into());
+        }
+    };
+    let stdout = child.stdout.take().ok_or(Error::Unreachable)?;
+    let stderr = child.stderr.take().ok_or(Error::Unreachable)?;
+
+    let stdout_task = tokio::spawn(stream_output(
+        client.clone(),
+        args.clone(),
+        token.clone(),
+        BufReader::new(stdout),
+        "stdout",
+    ));
+    let stderr_task = tokio::spawn(stream_output(
+        client.clone(),
+        args.clone(),
+        token.clone(),
+        BufReader::new(stderr),
+        "stderr",
+    ));
+
+    let status = child.wait().await?;
+    let stdout_result = stdout_task.await.map_err(Error::Join)?;
+    let stderr_result = stderr_task.await.map_err(Error::Join)?;
+
+    if let Err(err) = stdout_result.as_ref().or(stderr_result.as_ref()) {
+        update_state(
+            &client,
+            &args,
+            &token,
+            State::Failed,
+            Some(json!({
+                "runner": args.runner,
+                "script": args.script.display().to_string(),
+                "pyproject": pyproject_used,
+                "duration_ms": started_at.elapsed().as_millis(),
+                "reason": err.to_string(),
+            })),
+        )
+        .await?;
+    }
+
+    stdout_result?;
+    stderr_result?;
+
+    let exit_code = status.code().unwrap_or(-1);
+    let final_state = if status.success() {
+        State::Success
+    } else {
+        State::Failed
+    };
+
+    update_state(
+        &client,
+        &args,
+        &token,
+        final_state,
+        Some(json!({
+            "runner": args.runner,
+            "script": args.script.display().to_string(),
+            "pyproject": pyproject_used,
+            "duration_ms": started_at.elapsed().as_millis(),
+            "exit_code": exit_code,
+        })),
+    )
+    .await?;
+
+    if status.success() {
+        Ok(())
+    } else {
+        Err(Error::WorkerExit(exit_code))
+    }
+}
+
+fn build_command(args: &WorkerArgs) -> TokioCommand {
+    match args.runner {
+        WorkerRunner::Bash => {
+            let mut command = TokioCommand::new("bash");
+            command.arg(&args.script);
+            command
+        }
+        WorkerRunner::Python => {
+            if has_pyproject(&args.script) {
+                let script_dir = args
+                    .script
+                    .parent()
+                    .map(Path::to_path_buf)
+                    .unwrap_or_else(|| PathBuf::from("."));
+                let script_name = args
+                    .script
+                    .file_name()
+                    .map(|name| name.to_owned())
+                    .unwrap_or_else(|| args.script.as_os_str().to_owned());
+                let mut command = TokioCommand::new("uv");
+                command.current_dir(script_dir);
+                command.args(["run", "python"]);
+                command.arg(script_name);
+                command
+            } else {
+                let mut command = TokioCommand::new("python3");
+                command.arg(&args.script);
+                command
+            }
+        }
+    }
+}
+
+fn has_pyproject(script: &Path) -> bool {
+    script
+        .parent()
+        .map(|dir| dir.join("pyproject.toml").exists())
+        .unwrap_or(false)
+}
+
+fn rendered_command(args: &WorkerArgs, pyproject_used: bool) -> String {
+    match args.runner {
+        WorkerRunner::Bash => format!("bash {}", args.script.display()),
+        WorkerRunner::Python if pyproject_used => {
+            let script_dir = args
+                .script
+                .parent()
+                .map(Path::to_path_buf)
+                .unwrap_or_else(|| PathBuf::from("."));
+            let script_name = args
+                .script
+                .file_name()
+                .map(|name| name.to_string_lossy().into_owned())
+                .unwrap_or_else(|| args.script.display().to_string());
+            format!(
+                "cd {} && uv run python {}",
+                script_dir.display(),
+                script_name
+            )
+        }
+        WorkerRunner::Python => format!("python3 {}", args.script.display()),
+    }
+}
+
+async fn stream_output<R>(
+    client: Client,
+    args: WorkerArgs,
+    token: String,
+    reader: BufReader<R>,
+    stream: &'static str,
+) -> Result<()>
+where
+    R: tokio::io::AsyncRead + Unpin,
+{
+    let mut lines = reader.lines();
+    while let Some(line) = lines.next_line().await? {
+        post_log(&client, &args, &token, stream, &line).await?;
+    }
+    Ok(())
+}
+
+async fn authenticate(client: &Client, args: &WorkerArgs) -> Result<String> {
+    let key = args
+        .key
+        .clone()
+        .or_else(|| std::env::var("FLETCHER_WORKER_KEY").ok())
+        .ok_or(Error::Unreachable)?;
+
+    let response = client
+        .post(api_url(&args.remote, "authenticate"))
+        .json(&json!({
+            "service": args.service,
+            "key": key,
+        }))
+        .send()
+        .await?
+        .error_for_status()?
+        .json::<AuthResponse>()
+        .await?;
+
+    Ok(response.access_token)
+}
+
+async fn update_state(
+    client: &Client,
+    args: &WorkerArgs,
+    token: &str,
+    state: State,
+    passback: Option<Value>,
+) -> Result<()> {
+    client
+        .put(api_url(
+            &args.remote,
+            &format!("data_product/{}/update", args.dataset_id),
+        ))
+        .bearer_auth(token)
+        .json(&vec![json!({
+            "id": args.data_product_id.to_string(),
+            "state": state.to_string(),
+            "run_id": args.run_id.to_string(),
+            "link": args.script.display().to_string(),
+            "passback": passback,
+        })])
+        .send()
+        .await?
+        .error_for_status()?;
+
+    Ok(())
+}
+
+async fn post_log(
+    client: &Client,
+    args: &WorkerArgs,
+    token: &str,
+    stream: &str,
+    message: &str,
+) -> Result<()> {
+    client
+        .post(api_url(
+            &args.remote,
+            &format!(
+                "data_product/{}/{}/log",
+                args.dataset_id, args.data_product_id
+            ),
+        ))
+        .bearer_auth(token)
+        .json(&WorkerLogParam {
+            run_id: args.run_id.to_string(),
+            stream,
+            message,
+        })
+        .send()
+        .await?
+        .error_for_status()?;
+
+    Ok(())
+}
+
+fn api_url(remote: &str, path: &str) -> String {
+    format!("{}/api/{}", remote.trim_end_matches('/'), path)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+
+    #[test]
+    fn test_extract_worker_spec() {
+        let passthrough = Some(json!({
+            "path": "ignored",
+            "worker": {
+                "runner": "bash",
+                "script": "./scripts/job.sh"
+            }
+        }));
+
+        let spec = extract_worker_spec(&passthrough).unwrap().unwrap();
+        assert_eq!(spec.runner, WorkerRunner::Bash);
+        assert_eq!(spec.script, "./scripts/job.sh");
+    }
+}