diff --git a/Cargo.lock b/Cargo.lock index d9bd3d8a..fb3a987d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -318,6 +318,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chacha20" version = "0.10.0" @@ -990,8 +996,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1001,9 +1009,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", ] [[package]] @@ -1211,6 +1221,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots", ] [[package]] @@ -1568,7 +1579,6 @@ dependencies = [ [[package]] name = "k8s-version" version = "0.1.3" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "darling", "regex", @@ -1750,6 +1760,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "matchers" version = "0.2.0" @@ -2263,6 +2279,61 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror 2.0.18", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand 0.9.2", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.18", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.44" @@ -2432,16 +2503,21 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-rustls", "hyper-util", "js-sys", "log", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-rustls", "tower", "tower-http", "tower-service", @@ -2449,6 +2525,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", ] [[package]] @@ -2525,6 +2602,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2567,6 +2650,7 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ + "web-time", "zeroize", ] @@ -2938,7 +3022,6 @@ checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" [[package]] name = "stackable-certs" version = "0.4.0" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "const-oid", "ecdsa", @@ -2973,6 +3056,7 @@ dependencies = [ "pin-project", "product-config", "rand 0.10.0", + "reqwest", "rstest", "semver", "serde", @@ -2989,7 +3073,6 @@ dependencies = [ [[package]] name = "stackable-operator" version = "0.106.2" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "clap", "const_format", @@ -3028,7 +3111,6 @@ dependencies = [ [[package]] name = "stackable-operator-derive" version = "0.3.1" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "darling", "proc-macro2", @@ -3039,7 +3121,6 @@ dependencies = [ [[package]] name = "stackable-shared" version = "0.1.0" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "jiff", "k8s-openapi", @@ -3056,7 +3137,6 @@ dependencies = [ [[package]] name = "stackable-telemetry" version = "0.6.1" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "axum", "clap", @@ -3080,7 +3160,6 @@ dependencies = [ [[package]] name = "stackable-versioned" version = "0.8.3" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "schemars", "serde", @@ -3093,7 +3172,6 @@ dependencies = [ [[package]] name = "stackable-versioned-macros" version = 
"0.8.3" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "convert_case", "convert_case_extras", @@ -3111,7 +3189,6 @@ dependencies = [ [[package]] name = "stackable-webhook" version = "0.9.0" -source = "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#1918877b52970998cdd717c57ab014970727e1cb" dependencies = [ "arc-swap", "async-trait", @@ -3304,6 +3381,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tls_codec" version = "0.4.2" @@ -3848,6 +3940,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "windows-core" version = "0.62.2" diff --git a/Cargo.nix b/Cargo.nix index be86eacf..28dd4403 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -1009,6 +1009,16 @@ rec { "rustc-dep-of-std" = [ "core" ]; }; }; + "cfg_aliases" = rec { + crateName = "cfg_aliases"; + version = "0.2.1"; + edition = "2018"; + sha256 = "092pxdc1dbgjb6qvh83gk56rkic2n2ybm4yvy76cgynmzi3zwfk1"; + authors = [ + "Zicklag " + ]; + + }; "chacha20" = rec { crateName = "chacha20"; version = "0.10.0"; @@ -3064,6 +3074,12 @@ rec { name = "cfg-if"; packageId = "cfg-if"; } + { + name = "js-sys"; + packageId = "js-sys"; + optional = true; + target = { target, features }: ((("wasm32" == target."arch" or null) 
|| ("wasm64" == target."arch" or null)) && ("unknown" == target."os" or null)); + } { name = "libc"; packageId = "libc"; @@ -3076,6 +3092,13 @@ rec { usesDefaultFeatures = false; target = { target, features }: ("wasi" == target."os" or null); } + { + name = "wasm-bindgen"; + packageId = "wasm-bindgen"; + optional = true; + usesDefaultFeatures = false; + target = { target, features }: ((("wasm32" == target."arch" or null) || ("wasm64" == target."arch" or null)) && ("unknown" == target."os" or null)); + } ]; features = { "compiler_builtins" = [ "dep:compiler_builtins" ]; @@ -3085,7 +3108,7 @@ rec { "rustc-dep-of-std" = [ "compiler_builtins" "core" "libc/rustc-dep-of-std" "wasi/rustc-dep-of-std" ]; "wasm-bindgen" = [ "dep:wasm-bindgen" ]; }; - resolvedDefaultFeatures = [ "std" ]; + resolvedDefaultFeatures = [ "js" "js-sys" "std" "wasm-bindgen" ]; }; "getrandom 0.3.4" = rec { crateName = "getrandom"; @@ -3100,6 +3123,13 @@ rec { name = "cfg-if"; packageId = "cfg-if"; } + { + name = "js-sys"; + packageId = "js-sys"; + optional = true; + usesDefaultFeatures = false; + target = { target, features }: (("wasm32" == target."arch" or null) && (("unknown" == target."os" or null) || ("none" == target."os" or null)) && (builtins.elem "atomics" targetFeatures)); + } { name = "libc"; packageId = "libc"; @@ -3160,11 +3190,18 @@ rec { usesDefaultFeatures = false; target = { target, features }: (("wasm32" == target."arch" or null) && ("wasi" == target."os" or null) && ("p2" == target."env" or null)); } + { + name = "wasm-bindgen"; + packageId = "wasm-bindgen"; + optional = true; + usesDefaultFeatures = false; + target = { target, features }: (("wasm32" == target."arch" or null) && (("unknown" == target."os" or null) || ("none" == target."os" or null))); + } ]; features = { "wasm_js" = [ "dep:wasm-bindgen" "dep:js-sys" ]; }; - resolvedDefaultFeatures = [ "std" ]; + resolvedDefaultFeatures = [ "std" "wasm_js" ]; }; "getrandom 0.4.1" = rec { crateName = "getrandom"; @@ -3860,6 +3897,11 
@@ rec { name = "tower-service"; packageId = "tower-service"; } + { + name = "webpki-roots"; + packageId = "webpki-roots"; + optional = true; + } ]; devDependencies = [ { @@ -3896,7 +3938,7 @@ rec { "webpki-roots" = [ "dep:webpki-roots" ]; "webpki-tokio" = [ "webpki-roots" ]; }; - resolvedDefaultFeatures = [ "http1" "log" "logging" "native-tokio" "ring" "rustls-native-certs" "tls12" ]; + resolvedDefaultFeatures = [ "http1" "log" "logging" "native-tokio" "ring" "rustls-native-certs" "tls12" "webpki-roots" "webpki-tokio" ]; }; "hyper-timeout" = rec { crateName = "hyper-timeout"; @@ -5049,12 +5091,7 @@ rec { crateName = "k8s-version"; version = "0.1.3"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/k8s-version; }; libName = "k8s_version"; authors = [ "Stackable GmbH " @@ -5872,6 +5909,17 @@ rec { }; resolvedDefaultFeatures = [ "std" ]; }; + "lru-slab" = rec { + crateName = "lru-slab"; + version = "0.1.2"; + edition = "2021"; + sha256 = "0m2139k466qj3bnpk66bwivgcx3z88qkxvlzk70vd65jq373jaqi"; + libName = "lru_slab"; + authors = [ + "Benjamin Saunders " + ]; + + }; "matchers" = rec { crateName = "matchers"; version = "0.2.0"; @@ -7516,6 +7564,253 @@ rec { ]; }; + "quinn" = rec { + crateName = "quinn"; + version = "0.11.9"; + edition = "2021"; + sha256 = "086gzj666dr3slmlynkvxlndy28hahgl361d6bf93hk3i6ahmqmr"; + dependencies = [ + { + name = "bytes"; + packageId = "bytes"; + } + { + name = "pin-project-lite"; + packageId = "pin-project-lite"; + } + { + name = "quinn-proto"; + packageId = "quinn-proto"; + rename = "proto"; + usesDefaultFeatures = false; + } + { + name = "quinn-udp"; + packageId = "quinn-udp"; + rename = "udp"; + usesDefaultFeatures = false; + features = [ "tracing" ]; + } + 
{ + name = "rustc-hash"; + packageId = "rustc-hash"; + } + { + name = "rustls"; + packageId = "rustls"; + optional = true; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "socket2"; + packageId = "socket2"; + target = { target, features }: (!((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null))); + } + { + name = "thiserror"; + packageId = "thiserror 2.0.18"; + } + { + name = "tokio"; + packageId = "tokio"; + features = [ "sync" ]; + } + { + name = "tracing"; + packageId = "tracing"; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "web-time"; + packageId = "web-time"; + target = { target, features }: ((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null)); + } + ]; + buildDependencies = [ + { + name = "cfg_aliases"; + packageId = "cfg_aliases"; + } + ]; + devDependencies = [ + { + name = "tokio"; + packageId = "tokio"; + features = [ "sync" "rt" "rt-multi-thread" "time" "macros" ]; + } + ]; + features = { + "async-io" = [ "dep:async-io" ]; + "async-std" = [ "dep:async-std" ]; + "aws-lc-rs" = [ "proto/aws-lc-rs" ]; + "aws-lc-rs-fips" = [ "proto/aws-lc-rs-fips" ]; + "bloom" = [ "proto/bloom" ]; + "default" = [ "log" "platform-verifier" "runtime-tokio" "rustls-ring" "bloom" ]; + "futures-io" = [ "dep:futures-io" ]; + "log" = [ "tracing/log" "proto/log" "udp/log" ]; + "platform-verifier" = [ "proto/platform-verifier" ]; + "qlog" = [ "proto/qlog" ]; + "ring" = [ "proto/ring" ]; + "runtime-async-std" = [ "async-io" "async-std" ]; + "runtime-smol" = [ "async-io" "smol" ]; + "runtime-tokio" = [ "tokio/time" "tokio/rt" "tokio/net" ]; + "rustls" = [ "rustls-ring" ]; + "rustls-aws-lc-rs" = [ "dep:rustls" "aws-lc-rs" "proto/rustls-aws-lc-rs" "proto/aws-lc-rs" ]; + "rustls-aws-lc-rs-fips" = [ "dep:rustls" "aws-lc-rs-fips" "proto/rustls-aws-lc-rs-fips" "proto/aws-lc-rs-fips" ]; + "rustls-log" = [ "rustls?/logging" ]; + "rustls-ring" = [ "dep:rustls" "ring" "proto/rustls-ring" 
"proto/ring" ]; + "smol" = [ "dep:smol" ]; + }; + resolvedDefaultFeatures = [ "ring" "runtime-tokio" "rustls" "rustls-ring" ]; + }; + "quinn-proto" = rec { + crateName = "quinn-proto"; + version = "0.11.13"; + edition = "2021"; + sha256 = "0cca3mgja9p4w66f6sl1kfhj8rdf4mwsg1jxzssh9g63n14np47i"; + libName = "quinn_proto"; + dependencies = [ + { + name = "bytes"; + packageId = "bytes"; + } + { + name = "getrandom"; + packageId = "getrandom 0.3.4"; + usesDefaultFeatures = false; + target = { target, features }: ((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null)); + features = [ "wasm_js" ]; + } + { + name = "lru-slab"; + packageId = "lru-slab"; + } + { + name = "rand"; + packageId = "rand 0.9.2"; + } + { + name = "ring"; + packageId = "ring"; + optional = true; + } + { + name = "ring"; + packageId = "ring"; + target = { target, features }: ((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null)); + features = [ "wasm32_unknown_unknown_js" ]; + } + { + name = "rustc-hash"; + packageId = "rustc-hash"; + } + { + name = "rustls"; + packageId = "rustls"; + optional = true; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "rustls-pki-types"; + packageId = "rustls-pki-types"; + target = { target, features }: ((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null)); + features = [ "web" ]; + } + { + name = "slab"; + packageId = "slab"; + } + { + name = "thiserror"; + packageId = "thiserror 2.0.18"; + } + { + name = "tinyvec"; + packageId = "tinyvec"; + features = [ "alloc" "alloc" ]; + } + { + name = "tracing"; + packageId = "tracing"; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "web-time"; + packageId = "web-time"; + target = { target, features }: ((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null)); + } + ]; + features = { + "arbitrary" = [ "dep:arbitrary" ]; + "aws-lc-rs" = [ "dep:aws-lc-rs" "aws-lc-rs?/aws-lc-sys" 
"aws-lc-rs?/prebuilt-nasm" ]; + "aws-lc-rs-fips" = [ "aws-lc-rs" "aws-lc-rs?/fips" ]; + "bloom" = [ "dep:fastbloom" ]; + "default" = [ "rustls-ring" "log" "bloom" ]; + "log" = [ "tracing/log" ]; + "platform-verifier" = [ "dep:rustls-platform-verifier" ]; + "qlog" = [ "dep:qlog" ]; + "ring" = [ "dep:ring" ]; + "rustls" = [ "rustls-ring" ]; + "rustls-aws-lc-rs" = [ "dep:rustls" "rustls?/aws-lc-rs" "aws-lc-rs" ]; + "rustls-aws-lc-rs-fips" = [ "rustls-aws-lc-rs" "aws-lc-rs-fips" ]; + "rustls-log" = [ "rustls?/logging" ]; + "rustls-ring" = [ "dep:rustls" "rustls?/ring" "ring" ]; + }; + resolvedDefaultFeatures = [ "ring" "rustls-ring" ]; + }; + "quinn-udp" = rec { + crateName = "quinn-udp"; + version = "0.5.14"; + edition = "2021"; + sha256 = "1gacawr17a2zkyri0r3m0lc9spzmxbq1by3ilyb8v2mdvjhcdpmd"; + libName = "quinn_udp"; + dependencies = [ + { + name = "libc"; + packageId = "libc"; + } + { + name = "once_cell"; + packageId = "once_cell"; + target = { target, features }: (target."windows" or false); + } + { + name = "socket2"; + packageId = "socket2"; + target = { target, features }: (!((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null))); + } + { + name = "tracing"; + packageId = "tracing"; + optional = true; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "windows-sys"; + packageId = "windows-sys 0.52.0"; + target = { target, features }: (target."windows" or false); + features = [ "Win32_Foundation" "Win32_System_IO" "Win32_Networking_WinSock" ]; + } + ]; + buildDependencies = [ + { + name = "cfg_aliases"; + packageId = "cfg_aliases"; + } + ]; + features = { + "default" = [ "tracing" "log" ]; + "direct-log" = [ "dep:log" ]; + "log" = [ "tracing/log" ]; + "tracing" = [ "dep:tracing" ]; + }; + resolvedDefaultFeatures = [ "tracing" ]; + }; "quote" = rec { crateName = "quote"; version = "1.0.44"; @@ -8051,6 +8346,14 @@ rec { target = { target, features }: (!("wasm32" == target."arch" or null)); features = [ "http1" "client" 
]; } + { + name = "hyper-rustls"; + packageId = "hyper-rustls"; + optional = true; + usesDefaultFeatures = false; + target = { target, features }: (!("wasm32" == target."arch" or null)); + features = [ "http1" "tls12" ]; + } { name = "hyper-util"; packageId = "hyper-util"; @@ -8077,6 +8380,29 @@ rec { packageId = "pin-project-lite"; target = { target, features }: (!("wasm32" == target."arch" or null)); } + { + name = "quinn"; + packageId = "quinn"; + optional = true; + usesDefaultFeatures = false; + target = { target, features }: (!("wasm32" == target."arch" or null)); + features = [ "rustls" "runtime-tokio" ]; + } + { + name = "rustls"; + packageId = "rustls"; + optional = true; + usesDefaultFeatures = false; + target = { target, features }: (!("wasm32" == target."arch" or null)); + features = [ "std" "tls12" ]; + } + { + name = "rustls-pki-types"; + packageId = "rustls-pki-types"; + optional = true; + target = { target, features }: (!("wasm32" == target."arch" or null)); + features = [ "std" ]; + } { name = "serde"; packageId = "serde"; @@ -8107,6 +8433,14 @@ rec { target = { target, features }: (!("wasm32" == target."arch" or null)); features = [ "net" "time" ]; } + { + name = "tokio-rustls"; + packageId = "tokio-rustls"; + optional = true; + usesDefaultFeatures = false; + target = { target, features }: (!("wasm32" == target."arch" or null)); + features = [ "tls12" ]; + } { name = "tower"; packageId = "tower"; @@ -8146,6 +8480,12 @@ rec { target = { target, features }: ("wasm32" == target."arch" or null); features = [ "AbortController" "AbortSignal" "Headers" "Request" "RequestInit" "RequestMode" "Response" "Window" "FormData" "Blob" "BlobPropertyBag" "ServiceWorkerGlobalScope" "RequestCredentials" "File" "ReadableStream" "RequestCache" ]; } + { + name = "webpki-roots"; + packageId = "webpki-roots"; + optional = true; + target = { target, features }: (!("wasm32" == target."arch" or null)); + } ]; devDependencies = [ { @@ -8228,7 +8568,7 @@ rec { "system-proxy" = 
[ "hyper-util/client-proxy-system" ]; "zstd" = [ "tower-http/decompression-zstd" ]; }; - resolvedDefaultFeatures = [ "blocking" ]; + resolvedDefaultFeatures = [ "__rustls" "__rustls-ring" "__tls" "blocking" "json" "rustls-tls" "rustls-tls-webpki-roots" "rustls-tls-webpki-roots-no-provider" ]; }; "rfc6979" = rec { crateName = "rfc6979"; @@ -8312,7 +8652,7 @@ rec { "std" = [ "alloc" ]; "wasm32_unknown_unknown_js" = [ "getrandom/js" ]; }; - resolvedDefaultFeatures = [ "alloc" "default" "dev_urandom_fallback" ]; + resolvedDefaultFeatures = [ "alloc" "default" "dev_urandom_fallback" "wasm32_unknown_unknown_js" ]; }; "rsa" = rec { crateName = "rsa"; @@ -8520,6 +8860,21 @@ rec { }; resolvedDefaultFeatures = [ "async-timeout" "crate-name" ]; }; + "rustc-hash" = rec { + crateName = "rustc-hash"; + version = "2.1.1"; + edition = "2021"; + sha256 = "03gz5lvd9ghcwsal022cgkq67dmimcgdjghfb5yb5d352ga06xrm"; + libName = "rustc_hash"; + authors = [ + "The Rust Project Developers" + ]; + features = { + "default" = [ "std" ]; + "rand" = [ "dep:rand" "std" ]; + }; + resolvedDefaultFeatures = [ "default" "std" ]; + }; "rustc_version" = rec { crateName = "rustc_version"; version = "0.4.1"; @@ -8640,6 +8995,12 @@ rec { sha256 = "1p9zsgslvwzzkzhm6bqicffqndr4jpx67992b0vl0pi21a5hy15y"; libName = "rustls_pki_types"; dependencies = [ + { + name = "web-time"; + packageId = "web-time"; + optional = true; + target = { target, features }: ((builtins.elem "wasm" target."family") && ("unknown" == target."os" or null)); + } { name = "zeroize"; packageId = "zeroize"; @@ -8653,7 +9014,7 @@ rec { "web" = [ "web-time" ]; "web-time" = [ "dep:web-time" ]; }; - resolvedDefaultFeatures = [ "alloc" "default" "std" ]; + resolvedDefaultFeatures = [ "alloc" "default" "std" "web" "web-time" ]; }; "rustls-webpki" = rec { crateName = "rustls-webpki"; @@ -9759,12 +10120,7 @@ rec { crateName = "stackable-certs"; version = "0.4.0"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = 
"https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/stackable-certs; }; libName = "stackable_certs"; authors = [ "Stackable GmbH " @@ -9911,6 +10267,12 @@ rec { name = "rand"; packageId = "rand 0.10.0"; } + { + name = "reqwest"; + packageId = "reqwest"; + usesDefaultFeatures = false; + features = [ "rustls-tls" "json" ]; + } { name = "semver"; packageId = "semver"; @@ -9975,12 +10337,7 @@ rec { crateName = "stackable-operator"; version = "0.106.2"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/stackable-operator; }; libName = "stackable_operator"; authors = [ "Stackable GmbH " @@ -10148,12 +10505,7 @@ rec { crateName = "stackable-operator-derive"; version = "0.3.1"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/stackable-operator-derive; }; procMacro = true; libName = "stackable_operator_derive"; authors = [ @@ -10183,12 +10535,7 @@ rec { crateName = "stackable-shared"; version = "0.1.0"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = 
../operator-rs/crates/stackable-shared; }; libName = "stackable_shared"; authors = [ "Stackable GmbH " @@ -10264,12 +10611,7 @@ rec { crateName = "stackable-telemetry"; version = "0.6.1"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/stackable-telemetry; }; libName = "stackable_telemetry"; authors = [ "Stackable GmbH " @@ -10374,12 +10716,7 @@ rec { crateName = "stackable-versioned"; version = "0.8.3"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/stackable-versioned; }; libName = "stackable_versioned"; authors = [ "Stackable GmbH " @@ -10418,12 +10755,7 @@ rec { crateName = "stackable-versioned-macros"; version = "0.8.3"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/stackable-versioned-macros; }; procMacro = true; libName = "stackable_versioned_macros"; authors = [ @@ -10486,12 +10818,7 @@ rec { crateName = "stackable-webhook"; version = "0.9.0"; edition = "2024"; - workspace_member = null; - src = pkgs.fetchgit { - url = "https://github.com/stackabletech/operator-rs.git"; - rev = "1918877b52970998cdd717c57ab014970727e1cb"; - sha256 = "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn"; - }; + src = 
lib.cleanSourceWith { filter = sourceFilter; src = ../operator-rs/crates/stackable-webhook; }; libName = "stackable_webhook"; authors = [ "Stackable GmbH " @@ -11056,6 +11383,45 @@ rec { }; resolvedDefaultFeatures = [ "zerovec" ]; }; + "tinyvec" = rec { + crateName = "tinyvec"; + version = "1.10.0"; + edition = "2018"; + sha256 = "1yhk0qdqyiaa4v2j9h8pzax5gxgwpz4da0lcphfil6g6pk1zv9dz"; + authors = [ + "Lokathor " + ]; + dependencies = [ + { + name = "tinyvec_macros"; + packageId = "tinyvec_macros"; + optional = true; + } + ]; + features = { + "alloc" = [ "tinyvec_macros" ]; + "arbitrary" = [ "dep:arbitrary" ]; + "borsh" = [ "dep:borsh" ]; + "generic-array" = [ "dep:generic-array" ]; + "latest_stable_rust" = [ "rustc_1_61" ]; + "real_blackbox" = [ "criterion/real_blackbox" ]; + "rustc_1_61" = [ "rustc_1_57" ]; + "serde" = [ "dep:serde" ]; + "std" = [ "alloc" ]; + "tinyvec_macros" = [ "dep:tinyvec_macros" ]; + }; + resolvedDefaultFeatures = [ "alloc" "default" "tinyvec_macros" ]; + }; + "tinyvec_macros" = rec { + crateName = "tinyvec_macros"; + version = "0.1.1"; + edition = "2018"; + sha256 = "081gag86208sc3y6sdkshgw3vysm5d34p431dzw0bshz66ncng0z"; + authors = [ + "Soveu " + ]; + + }; "tls_codec" = rec { crateName = "tls_codec"; version = "0.4.2"; @@ -13410,6 +13776,22 @@ rec { "serde" = [ "dep:serde" ]; }; }; + "webpki-roots" = rec { + crateName = "webpki-roots"; + version = "1.0.6"; + edition = "2021"; + sha256 = "1v8brkarm4spqkjs6y5b67xixnz4zlg33d1wwxigz4rr0qyazkr2"; + libName = "webpki_roots"; + dependencies = [ + { + name = "rustls-pki-types"; + packageId = "rustls-pki-types"; + rename = "pki-types"; + usesDefaultFeatures = false; + } + ]; + + }; "windows-core" = rec { crateName = "windows-core"; version = "0.62.2"; @@ -13793,7 +14175,7 @@ rec { "Win32_Web" = [ "Win32" ]; "Win32_Web_InternetExplorer" = [ "Win32_Web" ]; }; - resolvedDefaultFeatures = [ "Win32" "Win32_Foundation" "Win32_System" "Win32_System_Threading" "default" ]; + resolvedDefaultFeatures = [ 
"Win32" "Win32_Foundation" "Win32_Networking" "Win32_Networking_WinSock" "Win32_System" "Win32_System_IO" "Win32_System_Threading" "default" ]; }; "windows-sys 0.60.2" = rec { crateName = "windows-sys"; diff --git a/Cargo.toml b/Cargo.toml index 7311157e..a58debf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ futures = { version = "0.3", features = ["compat"] } indoc = "2.0" pin-project = "1.1" rand = "0.10" +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] } rstest = "0.26" semver = "1.0" serde = { version = "1.0", features = ["derive"] } @@ -34,6 +35,5 @@ tracing = "0.1" url = { version = "2.5.7" } xml-rs = "1.0" -# [patch."https://github.com/stackabletech/operator-rs.git"] -# stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" } -# stackable-operator = { path = "../operator-rs/crates/stackable-operator" } +[patch."https://github.com/stackabletech/operator-rs.git"] +stackable-operator = { path = "../operator-rs/crates/stackable-operator" } diff --git a/crate-hashes.json b/crate-hashes.json index f2456142..223d5c49 100644 --- a/crate-hashes.json +++ b/crate-hashes.json @@ -4,14 +4,5 @@ "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#kube-derive@3.0.1": "1h1d3rjnfmjk2kyd4iafr84a4kyjkzzns5f36w6w60y8ylr8fqmf", "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#kube-runtime@3.0.1": "1h1d3rjnfmjk2kyd4iafr84a4kyjkzzns5f36w6w60y8ylr8fqmf", "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#kube@3.0.1": "1h1d3rjnfmjk2kyd4iafr84a4kyjkzzns5f36w6w60y8ylr8fqmf", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#k8s-version@0.1.3": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-certs@0.4.0": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - 
"git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-operator-derive@0.3.1": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-operator@0.106.2": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-shared@0.1.0": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-telemetry@0.6.1": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-versioned-macros@0.8.3": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-versioned@0.8.3": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", - "git+https://github.com/stackabletech/operator-rs.git?branch=kube-pre-3.1.0#stackable-webhook@0.9.0": "15pn9nc1prv0fqp19iajy6rbifgzhhpbyk4yvx489yx5gx9n87kn", "git+https://github.com/stackabletech/product-config.git?tag=0.8.0#product-config@0.8.0": "1dz70kapm2wdqcr7ndyjji0lhsl98bsq95gnb2lw487wf6yr7987" } \ No newline at end of file diff --git a/deploy/helm/nifi-operator/templates/roles.yaml b/deploy/helm/nifi-operator/templates/roles.yaml index 814e82f5..f3de6d5c 100644 --- a/deploy/helm/nifi-operator/templates/roles.yaml +++ b/deploy/helm/nifi-operator/templates/roles.yaml @@ -129,6 +129,37 @@ rules: - {{ include "operator.name" . 
}}clusters/status verbs: - patch + - apiGroups: + - autoscaling.stackable.tech + resources: + - stackablescalers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - autoscaling.stackable.tech + resources: + - stackablescalers/status + verbs: + - get + - patch + - apiGroups: + - autoscaling + resources: + - horizontalpodautoscalers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - authentication.stackable.tech resources: diff --git a/docs/modules/nifi/pages/usage_guide/operations/autoscaling.adoc b/docs/modules/nifi/pages/usage_guide/operations/autoscaling.adoc new file mode 100644 index 00000000..c56403bf --- /dev/null +++ b/docs/modules/nifi/pages/usage_guide/operations/autoscaling.adoc @@ -0,0 +1,278 @@ += Auto-scaling +:description: Configure horizontal auto-scaling for Apache NiFi clusters using a StackableScaler and a Kubernetes HorizontalPodAutoscaler. + +The Stackable Operator for Apache NiFi supports horizontal auto-scaling of NiFi node groups. +Rather than setting a fixed replica count in the `NifiCluster` resource, you can delegate replica management to a Kubernetes https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/[HorizontalPodAutoscaler] (HPA) or something like https://keda.sh/[KEDA] that adjusts the node count based on metrics such as CPU utilization. +Anything that can target a `scale` subresource for scaling operations should work just fine. +For the rest of this article we will focus on the HPA as the default Kubernetes scaling mechanism though. + +Auto-scaling is built on two resources that work together: + +* A *StackableScaler* -- a Stackable custom resource that acts as the scaling target for the HPA and coordinates the scaling lifecycle with the operator. +* A *HorizontalPodAutoscaler* -- the standard Kubernetes resource that monitors metrics and writes the desired replica count to the StackableScaler. 
+ +== How it works + +When auto-scaling is enabled for a role group, the following interaction takes place: + +. The HPA monitors metrics (CPU, memory, or custom metrics) for the NiFi pods. +. When the HPA decides to scale, it writes a new `spec.replicas` value to the StackableScaler via its `/scale` subresource. +. The NiFi operator detects the change and drives a multi-stage state machine: ++ +[source,text] +---- +Idle -> PreScaling -> Scaling -> PostScaling -> Idle +---- ++ +* *PreScaling* -- for scale-down only: the operator contacts the NiFi REST API to gracefully decommission nodes (see <>). +* *Scaling* -- the operator updates the StatefulSet replica count and waits for all pods to become ready. +* *PostScaling* -- reserved for future use (currently a no-op for NiFi). +. The operator updates the StackableScaler status and the `NifiCluster` status conditions throughout the process. + +The operator ensures that the HPA cannot change `spec.replicas` while a scaling operation is in progress. +An admission webhook rejects such changes, causing the HPA to record `AbleToScale: False` and back off until the current operation completes. + +== Configuration + +Auto-scaling requires three things: + +. Setting `replicas: 0` on the role group to signal externally-managed replicas. +. Creating a `StackableScaler` that references the cluster, role, and role group. +. Creating a `HorizontalPodAutoscaler` that targets the `StackableScaler`. + +=== Step 1: Set replicas to 0 + +In the `NifiCluster` resource, set `replicas: 0` on the role group that should be auto-scaled. +This is the convention that tells the operator to read the replica count from a StackableScaler instead of the NifiCluster spec. 
+ +[source,yaml] +---- +apiVersion: nifi.stackable.tech/v1alpha1 +kind: NifiCluster +metadata: + name: my-nifi +spec: + image: + productVersion: 2.7.2 + clusterConfig: + authentication: + - authenticationClass: nifi-admin-user + sensitiveProperties: + keySecret: nifi-sensitive-property-key + autoGenerate: true + nodes: + roleGroups: + default: + replicas: 0 # <1> +---- +<1> Setting `replicas: 0` activates externally-managed scaling for this role group. + +=== Step 2: Create a StackableScaler + +The `StackableScaler` resource connects the HPA to the operator. +It must be in the same namespace as the `NifiCluster` and reference the correct cluster, role, and role group. + +[source,yaml] +---- +apiVersion: autoscaling.stackable.tech/v1alpha1 +kind: StackableScaler +metadata: + name: my-nifi-nodes-default # <1> + labels: + stackable.tech/cluster-kind: NifiCluster # <2> +spec: + replicas: 3 # <3> + clusterRef: + kind: NifiCluster + name: my-nifi + role: nodes + roleGroup: default +---- +<1> Choose any name; a convention is `--`. +<2> The `stackable.tech/cluster-kind` label. See <> below. +<3> The initial replica count. The HPA will overwrite this value as it scales. + +[#cluster-kind-label] +==== The `stackable.tech/cluster-kind` label + +Every `StackableScaler` must carry the label `stackable.tech/cluster-kind` with a value matching the `spec.clusterRef.kind` field (e.g. `NifiCluster`). +This label enables the operator to efficiently discover all StackableScalers for a given cluster type using label selectors rather than listing every scaler in the namespace. + +A *mutating admission webhook* in the Stackable commons-operator automatically injects this label on every `CREATE` and `UPDATE` of a StackableScaler, so you do not strictly need to set it yourself. 
+However, including it explicitly in your manifests is harmless and has the advantage that the label is visible in version control and in `kubectl get` output even if the webhook has not yet processed the resource (for example, when reviewing manifests before applying them). + +NOTE: If the `spec.clusterRef.kind` value is not a valid Kubernetes label value, the admission webhook will reject the StackableScaler. + +=== Step 3: Create a HorizontalPodAutoscaler + +The HPA targets the `StackableScaler` using the `autoscaling.stackable.tech/v1alpha1` API. + +[source,yaml] +---- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: my-nifi-hpa +spec: + scaleTargetRef: + apiVersion: autoscaling.stackable.tech/v1alpha1 + kind: StackableScaler + name: my-nifi-nodes-default + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 +---- + +The HPA reads the current replica count from `status.replicas` on the StackableScaler and writes the desired count to `spec.replicas`. +Any standard HPA metric type (Resource, Pods, Object, External) can be used. + +== Checking scaling status + +=== StackableScaler status + +The primary place to check the status of a scaling operation is the StackableScaler resource: + +[source,bash] +---- +kubectl get stackablescaler my-nifi-nodes-default -o yaml +---- + +The `.status.currentState` field shows the current stage of the scaling state machine: + +[cols="1,3"] +|=== +| Stage | Description + +| `idle` +| No scaling operation in progress. + +| `preScaling` +| Running pre-scaling hooks (node decommissioning for scale-down). + +| `scaling` +| The StatefulSet replica count has been updated; waiting for all pods to become ready. + +| `postScaling` +| Running post-scaling hooks (currently a no-op for NiFi). + +| `failed` +| A scaling hook returned an error. The scaler stays in this state until <>. 
+|=== + +Additional status fields: + +* `status.replicas` -- the current replica count as seen by the HPA. +* `status.desiredReplicas` -- the target replica count during an in-progress scaling operation. +* `status.previousReplicas` -- the replica count before the current operation started. + +=== NifiCluster conditions + +The NiFi operator propagates scaling status into the `NifiCluster` resource's `status.conditions`. +When scaling is in progress, a `Progressing` condition is set. +When a scaling operation fails, a `Failed` condition includes the error message. + +[source,bash] +---- +kubectl get nificluster my-nifi -o jsonpath='{.status.conditions}' | jq . +---- + +=== HPA status + +The HPA's own status shows whether it can reach the scaling target and what it is currently computing: + +[source,bash] +---- +kubectl describe hpa my-nifi-hpa +---- + +If the admission webhook is rejecting HPA writes during an active scaling operation, the HPA events show `AbleToScale: False`. +This is expected behavior -- the HPA will retry once the current operation completes. + +[#scale-down-behavior] +== Scale-down behavior + +Scale-up is straightforward: the operator increases the StatefulSet replica count and NiFi nodes join the cluster automatically. + +Scale-down requires careful orchestration to avoid data loss. +During the *PreScaling* stage, the NiFi operator contacts the NiFi REST API to gracefully decommission the highest-ordinal nodes before the StatefulSet is shrunk. + +The decommission sequence depends on the NiFi version: + +=== NiFi 2.x (recommended) + +[source,text] +---- +CONNECTED -> DISCONNECTING -> DISCONNECTED -> OFFLOADING -> OFFLOADED -> DELETE +---- + +. The node is *disconnected* from the cluster, stopping new work from being assigned. +. The node's FlowFile content is *offloaded* (migrated) to remaining cluster nodes. +. The node is *deleted* from NiFi's cluster registry. 
+ +=== NiFi 1.x + +[source,text] +---- +CONNECTED -> OFFLOADING -> OFFLOADED -> DISCONNECTING -> DISCONNECTED -> DELETE +---- + +NiFi 1.x requires offloading before disconnecting (the reverse of 2.x). + +In both cases, the operator polls the NiFi REST API and only advances once all target nodes have reached the expected status in each phase. +The StatefulSet replica count is not reduced until all target nodes have been fully removed from the NiFi cluster. + +=== Authentication requirements + +Auto-scaling with graceful decommissioning requires *SingleUser* authentication. +The operator reads the admin credentials from the Kubernetes Secret to authenticate against the NiFi REST API during scale-down. + +LDAP and OIDC authentication are not currently supported for auto-scaled role groups. +Attempting to configure a StackableScaler for a role group using LDAP or OIDC authentication will result in a reconciliation error. + +[#recovery-from-failure] +== Recovery from failure + +If a scaling hook fails (for example, the NiFi REST API is unreachable during decommissioning), the StackableScaler enters the `failed` state. +In this state: + +* The state machine stops -- no automatic retries. +* The HPA cannot write new replica counts (the admission webhook rejects changes). +* The `failed` status includes the error reason and which stage failed. + +To inspect the failure: + +[source,bash] +---- +kubectl get stackablescaler my-nifi-nodes-default -o jsonpath='{.status.currentState}' | jq . +---- + +Once you have resolved the underlying issue, reset the scaler by applying the retry annotation: + +[source,bash] +---- +kubectl annotate stackablescaler my-nifi-nodes-default \ + autoscaling.stackable.tech/retry=true +---- + +The operator will: + +. Strip the annotation. +. Reset the stage to `idle`. +. Requeue the resource so the next reconcile starts a fresh scaling attempt. + +CAUTION: Before retrying, ensure the underlying issue is resolved. 
+For example, if NiFi nodes are in an inconsistent state after a partial decommission, you may need to manually reconnect them using the NiFi UI or REST API before resetting the scaler. + +== Limitations + +* Only *one role group* (`nodes`) exists for NiFi, so each NiFi cluster can have at most one StackableScaler. +* Only *SingleUser* authentication is supported for auto-scaled role groups. +* The admission webhook that guards `spec.replicas` during scaling has `failurePolicy: Fail`. + If the commons-operator is unavailable (during rolling updates or crashes), the HPA cannot adjust replicas until the webhook becomes available again. +* NiFi 1.x version upgrades use a full-stop-then-restart strategy. Auto-scaling is paused during version upgrades. diff --git a/docs/modules/nifi/partials/nav.adoc b/docs/modules/nifi/partials/nav.adoc index a3114057..4ebd5404 100644 --- a/docs/modules/nifi/partials/nav.adoc +++ b/docs/modules/nifi/partials/nav.adoc @@ -21,6 +21,7 @@ *** xref:nifi:usage_guide/operations/cluster-operations.adoc[] *** xref:nifi:usage_guide/operations/pod-placement.adoc[] *** xref:nifi:usage_guide/operations/pod-disruptions.adoc[] +*** xref:nifi:usage_guide/operations/autoscaling.adoc[] *** xref:nifi:usage_guide/operations/graceful-shutdown.adoc[] * xref:nifi:troubleshooting/index.adoc[] * xref:nifi:reference/index.adoc[] diff --git a/extra/crds.yaml b/extra/crds.yaml index c1e8105f..ad99ed93 100644 --- a/extra/crds.yaml +++ b/extra/crds.yaml @@ -48,6 +48,16 @@ spec: description: This field contains OIDC-specific configuration. It is only required in case OIDC is used. nullable: true properties: + clientAuthenticationMethod: + default: client_secret_basic + description: 'The client authentication method used when communicating with the token endpoint. Defaults to `client_secret_basic`. 
The required contents of `clientCredentialsSecret` depend on the chosen method: secret-based methods (`client_secret_basic`, `client_secret_post`, `client_secret_jwt`) expect a client secret, while `private_key_jwt` expects a private key.' + enum: + - client_secret_basic + - client_secret_post + - client_secret_jwt + - private_key_jwt + - none + type: string clientCredentialsSecret: description: |- A reference to the OIDC client credentials secret. The secret contains diff --git a/nix/sources.json b/nix/sources.json index 10cce14f..0873614d 100644 --- a/nix/sources.json +++ b/nix/sources.json @@ -29,10 +29,10 @@ "homepage": "", "owner": "NixOS", "repo": "nixpkgs", - "rev": "26eaeac4e409d7b5a6bf6f90a2a2dc223c78d915", - "sha256": "1knl8dcr5ip70a2vbky3q844212crwrvybyw2nhfmgm1mvqry963", + "rev": "917fec990948658ef1ccd07cef2a1ef060786846", + "sha256": "1x3hmj6vbza01cl5yf9d0plnmipw3ap6y0k5rl9bl11fw7gydvva", "type": "tarball", - "url": "https://github.com/NixOS/nixpkgs/archive/26eaeac4e409d7b5a6bf6f90a2a2dc223c78d915.tar.gz", + "url": "https://github.com/NixOS/nixpkgs/archive/917fec990948658ef1ccd07cef2a1ef060786846.tar.gz", "url_template": "https://github.com///archive/.tar.gz" } } diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index dd6a415c..65e368e0 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -19,6 +19,7 @@ fnv.workspace = true futures.workspace = true indoc.workspace = true pin-project.workspace = true +reqwest.workspace = true rand.workspace = true semver.workspace = true serde.workspace = true diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index e8b6402a..d11142fa 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -31,7 +31,15 @@ use stackable_operator::{ rbac::build_rbac_resources, }, constants::RESTART_CONTROLLER_ENABLED_LABEL, - crd::{authentication::oidc::v1alpha1::AuthenticationProvider, 
git_sync}, + crd::{ + authentication::oidc::v1alpha1::AuthenticationProvider, + git_sync, + scaler::{ + BuildScalerError, InitializeStatusError, ReplicasConfig, ScalingCondition, + build_hpa_from_user_spec, build_scaler, initialize_scaler_status, reconcile_scaler, + scale_target_ref, v1alpha1::StackableScaler, + }, + }, k8s_openapi::{ DeepMerge, api::{ @@ -49,7 +57,7 @@ use stackable_operator::{ core::{DeserializeGuard, error_boundary}, runtime::controller::Action, }, - kvp::{Labels, ObjectLabels}, + kvp::{LabelSelectorExt, Labels, ObjectLabels}, logging::controller::ReconcilerError, memory::{BinaryMultiple, MemoryQuantity}, product_config_utils::env_vars_from_rolegroup_config, @@ -74,6 +82,8 @@ use stackable_operator::{ use strum::{EnumDiscriminants, IntoStaticStr}; use tracing::Instrument; +use stackable_operator::crd::scaler::ReconcilerError as ScalerReconcilerError; + use crate::{ OPERATOR_NAME, config::{ @@ -96,6 +106,7 @@ use crate::{ operations::{ graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs, + scaling::NifiScalingHooks, upgrade::{self, ClusterVersionUpdateState}, }, product_logging::extend_role_group_config_map, @@ -351,6 +362,86 @@ pub enum Error { ResolveProductImage { source: product_image_selection::Error, }, + + /// Failed to build a [`StackableScaler`] object for a role group. + #[snafu(display("failed to build StackableScaler for rolegroup {rolegroup}"))] + BuildScaler { + source: BuildScalerError, + rolegroup: RoleGroupRef, + }, + + /// Failed to initialize [`StackableScaler`] status for a freshly created scaler. + #[snafu(display("failed to initialize StackableScaler status for rolegroup {rolegroup}"))] + InitializeScalerStatus { + source: InitializeStatusError, + rolegroup: RoleGroupRef, + }, + + /// The [`ReplicasConfig::Auto`] variant is not yet supported. 
+ #[snafu(display("Auto scaling is not yet implemented for rolegroup {rolegroup}"))] + AutoScalingNotYetImplemented { + rolegroup: RoleGroupRef, + }, + + /// Failed to build a `HorizontalPodAutoscaler` for a role group. + #[snafu(display("failed to build HorizontalPodAutoscaler for rolegroup {rolegroup}"))] + BuildHpa { + source: BuildScalerError, + rolegroup: RoleGroupRef, + }, + + /// Failed to read an existing [`StackableScaler`] for a role group. + #[snafu(display("failed to read existing StackableScaler for rolegroup {rolegroup}"))] + GetExistingScaler { + source: stackable_operator::client::Error, + rolegroup: RoleGroupRef, + }, + + /// Failed to apply a [`StackableScaler`] for a role group. + #[snafu(display("failed to apply StackableScaler for rolegroup {rolegroup}"))] + ApplyScaler { + source: stackable_operator::cluster_resources::Error, + rolegroup: RoleGroupRef, + }, + + /// Failed to apply a `HorizontalPodAutoscaler` for a role group. + #[snafu(display("failed to apply HorizontalPodAutoscaler for rolegroup {rolegroup}"))] + ApplyHpa { + source: stackable_operator::cluster_resources::Error, + rolegroup: RoleGroupRef, + }, + + /// The [`StackableScaler`] reconciliation (state machine + hooks) returned an error. + /// + /// The scaler may have transitioned to `Failed` state; check the scaler's status + /// for details. + #[snafu(display("StackableScaler reconciliation failed for rolegroup {rolegroup}"))] + ScalerReconcile { + source: ScalerReconcilerError, + rolegroup: RoleGroupRef, + }, + + /// Failed to convert the role group's label selector into a query string + /// for the [`StackableScaler`] status `.selector` field. + #[snafu(display("failed to build label selector string for rolegroup {rolegroup}"))] + BuildSelectorString { + source: stackable_operator::kvp::SelectorError, + rolegroup: RoleGroupRef, + }, + + /// A [`StackableScaler`] is configured for a role group whose authentication method + /// does not support scaling hooks. 
+ /// + /// Scaling hooks require SingleUser authentication to call the NiFi REST API for + /// node offload and disconnect. Other authentication methods are not yet supported. + #[snafu(display( + "StackableScaler for rolegroup {rolegroup} requires SingleUser authentication, \ + but cluster uses {auth_method}" + ))] + UnsupportedScalerAuthentication { + rolegroup: RoleGroupRef, + auth_method: String, + }, } type Result = std::result::Result; @@ -480,11 +571,12 @@ pub async fn reconcile_nifi( .context(ApplyRoleBindingSnafu)?; let mut ss_cond_builder = StatefulSetConditionBuilder::default(); + let mut scaler_action: Option = None; let nifi_role = NifiRole::Node; for (rolegroup_name, rolegroup_config) in nifi_node_config.iter() { let rg_span = tracing::info_span!("rolegroup_span", rolegroup = rolegroup_name.as_str()); - async { + let rg_action: Option = async { let rolegroup = nifi.node_rolegroup_ref(rolegroup_name); tracing::debug!("Processing rolegroup {}", rolegroup); @@ -546,12 +638,210 @@ pub async fn reconcile_nifi( .await?; let role_group = role.role_groups.get(&rolegroup.role_group); - let replicas = - if cluster_version_update_state == ClusterVersionUpdateState::UpdateRequested { - Some(0) - } else { - role_group.and_then(|rg| rg.replicas).map(i32::from) - }; + let replicas_config = role_group + .and_then(|rg| rg.replicas.clone()) + .unwrap_or_default(); + + tracing::debug!( + rolegroup = %rolegroup, + replicas_config = ?replicas_config, + "Processing role group scaling configuration" + ); + + let namespace = nifi + .metadata + .namespace + .as_deref() + .context(ObjectHasNoNamespaceSnafu)?; + + // Build the label selector for the scaler and HPA. + let selector_string = { + use stackable_operator::k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; + LabelSelector { + match_expressions: None, + match_labels: Some( + Labels::role_group_selector( + nifi, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? 
+ .into(), + ), + } + .to_query_string() + .context(BuildSelectorStringSnafu { + rolegroup: rolegroup.clone(), + })? + }; + + // Build an owner reference pointing to the NifiCluster. + let owner_ref = nifi + .controller_owner_ref(&()) + .context(ObjectHasNoNamespaceSnafu)?; + + // For scaler-managed variants (HPA, ExternallyScaled), read the existing + // StackableScaler's spec.replicas so we don't overwrite an externally-set value. + // On first creation (no existing scaler), default to 1. + let existing_scaler_replicas = if matches!( + replicas_config, + ReplicasConfig::Hpa(_) | ReplicasConfig::ExternallyScaled + ) { + let scaler_name = format!( + "{}-{}-{}-scaler", + nifi.name_any(), + rolegroup.role, + rolegroup.role_group, + ); + client + .get_opt::(&scaler_name, namespace) + .await + .context(GetExistingScalerSnafu { + rolegroup: rolegroup.clone(), + })? + .map(|s| s.spec.replicas) + .unwrap_or(1) + } else { + 1 + }; + + // Determine effective replicas and manage scaler/HPA based on ReplicasConfig variant. + let (replicas, scaler_to_reconcile) = match &replicas_config { + ReplicasConfig::Fixed(n) => { + // Static replica count — no scaler, no HPA. + let replicas = if cluster_version_update_state == ClusterVersionUpdateState::UpdateRequested { + Some(0) + } else { + Some(i32::from(*n)) + }; + (replicas, None) + } + ReplicasConfig::Hpa(hpa_config) => { + // User-provided HPA spec — create scaler and HPA. + let scaler = build_scaler( + &nifi.name_any(), + APP_NAME, + namespace, + &rolegroup.role, + &rolegroup.role_group, + existing_scaler_replicas, + &owner_ref, + OPERATOR_NAME, + NIFI_CONTROLLER_NAME, + ) + .context(BuildScalerSnafu { + rolegroup: rolegroup.clone(), + })?; + + let applied_scaler = cluster_resources + .add(client, scaler) + .await + .with_context(|_| ApplyScalerSnafu { + rolegroup: rolegroup.clone(), + })?; + + // Initialize status on freshly created scalers to prevent scale-to-zero. 
+ if applied_scaler.status.is_none() { + initialize_scaler_status( + client, + &applied_scaler, + 1, + &selector_string, + ) + .await + .context(InitializeScalerStatusSnafu { + rolegroup: rolegroup.clone(), + })?; + } + + // Build and apply the HPA. + let scaler_name = applied_scaler.metadata.name.clone().unwrap_or_default(); + let target_ref = scale_target_ref( + &scaler_name, + "autoscaling.stackable.tech", + "v1alpha1", + ); + let hpa = build_hpa_from_user_spec( + hpa_config.as_ref(), + &target_ref, + &nifi.name_any(), + APP_NAME, + namespace, + &rolegroup.role, + &rolegroup.role_group, + &owner_ref, + OPERATOR_NAME, + NIFI_CONTROLLER_NAME, + ) + .context(BuildHpaSnafu { + rolegroup: rolegroup.clone(), + })?; + cluster_resources + .add(client, hpa) + .await + .with_context(|_| ApplyHpaSnafu { + rolegroup: rolegroup.clone(), + })?; + + let replicas = if cluster_version_update_state == ClusterVersionUpdateState::UpdateRequested { + Some(0) + } else { + applied_scaler.status.as_ref().map(|st| st.replicas) + }; + (replicas, Some(applied_scaler)) + } + ReplicasConfig::Auto(_) => { + return AutoScalingNotYetImplementedSnafu { + rolegroup: rolegroup.clone(), + } + .fail(); + } + ReplicasConfig::ExternallyScaled => { + // External scaler (KEDA, etc.) — create scaler but no HPA. 
+ let scaler = build_scaler( + &nifi.name_any(), + APP_NAME, + namespace, + &rolegroup.role, + &rolegroup.role_group, + existing_scaler_replicas, + &owner_ref, + OPERATOR_NAME, + NIFI_CONTROLLER_NAME, + ) + .context(BuildScalerSnafu { + rolegroup: rolegroup.clone(), + })?; + + let applied_scaler = cluster_resources + .add(client, scaler) + .await + .with_context(|_| ApplyScalerSnafu { + rolegroup: rolegroup.clone(), + })?; + + if applied_scaler.status.is_none() { + initialize_scaler_status( + client, + &applied_scaler, + 1, + &selector_string, + ) + .await + .context(InitializeScalerStatusSnafu { + rolegroup: rolegroup.clone(), + })?; + } + + let replicas = if cluster_version_update_state == ClusterVersionUpdateState::UpdateRequested { + Some(0) + } else { + applied_scaler.status.as_ref().map(|st| st.replicas) + }; + (replicas, Some(applied_scaler)) + } + }; let rg_statefulset = build_node_rolegroup_statefulset( nifi, @@ -603,19 +893,108 @@ pub async fn reconcile_nifi( // Note: The StatefulSet needs to be applied after all ConfigMaps and Secrets it mounts // to prevent unnecessary Pod restarts. // See https://github.com/stackabletech/commons-operator/issues/111 for details. - ss_cond_builder.add( - cluster_resources - .add(client, rg_statefulset) - .await - .with_context(|_| ApplyRoleGroupStatefulSetSnafu { - rolegroup: rolegroup.clone(), - })?, - ); + let applied_sts = cluster_resources + .add(client, rg_statefulset) + .await + .with_context(|_| ApplyRoleGroupStatefulSetSnafu { + rolegroup: rolegroup.clone(), + })?; + + // Run the scaler state machine if a scaler exists for this role group. 
+ let scaler_requeue = if let Some(ref s) = scaler_to_reconcile { + let sts_ready = applied_sts + .status + .as_ref() + .and_then(|st| st.ready_replicas) + .unwrap_or(0); + let sts_desired = applied_sts.spec.as_ref().and_then(|sp| sp.replicas).unwrap_or(0); + let scaler_target = s + .status + .as_ref() + .and_then(|st| st.desired_replicas); + let statefulset_stable = + sts_ready == sts_desired && (sts_desired > 0 || scaler_target == Some(0)); + + let credentials_secret_name = match &authentication_config { + NifiAuthenticationConfig::SingleUser { provider } => { + provider.user_credentials_secret.name.clone() + } + NifiAuthenticationConfig::Ldap { .. } => { + return UnsupportedScalerAuthenticationSnafu { + rolegroup: rolegroup.clone(), + auth_method: "LDAP", + } + .fail(); + } + NifiAuthenticationConfig::Oidc { .. } => { + return UnsupportedScalerAuthenticationSnafu { + rolegroup: rolegroup.clone(), + auth_method: "OIDC", + } + .fail(); + } + }; + + let scaling_result = reconcile_scaler( + s, + &NifiScalingHooks { + namespace: namespace.to_owned(), + credentials_secret_name, + statefulset_name: rolegroup.object_name(), + headless_service_name: rolegroup + .rolegroup_headless_service_name(), + cluster_domain: client + .kubernetes_cluster_info + .cluster_domain + .to_string(), + product_version: resolved_product_image + .product_version + .to_string(), + }, + client, + statefulset_stable, + &selector_string, + &rolegroup.role_group, + ) + .await + .context(ScalerReconcileSnafu { + rolegroup: rolegroup.clone(), + })?; - Ok(()) + match &scaling_result.scaling_condition { + ScalingCondition::Failed { reason, .. 
} => { + tracing::warn!( + rolegroup = %rolegroup, + reason = %reason, + "StackableScaler entered Failed state" + ); + } + ScalingCondition::Progressing { stage } => { + tracing::info!( + rolegroup = %rolegroup, + stage = %stage, + "StackableScaler scaling in progress" + ); + } + ScalingCondition::Healthy => {} + } + + Some(scaling_result.action) + } else { + None + }; + + ss_cond_builder.add(applied_sts); + + Ok(scaler_requeue) } .instrument(rg_span) - .await? + .await?; + if let Some(action) = rg_action { + if scaler_action.is_none() { + scaler_action = Some(action); + } + } } let role_config = nifi.role_config(&nifi_role); @@ -709,7 +1088,7 @@ pub async fn reconcile_nifi( .await .context(StatusUpdateSnafu)?; - Ok(Action::await_change()) + Ok(scaler_action.unwrap_or_else(Action::await_change)) } /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator @@ -892,7 +1271,13 @@ async fn build_node_rolegroup_statefulset( env_vars.push(EnvVar { name: "STACKLET_NAME".to_string(), - value: Some(nifi.name_unchecked().to_string()), + value: Some( + nifi.metadata + .name + .as_deref() + .context(ObjectHasNoNameSnafu)? 
+ .to_string(), + ), ..Default::default() }); diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 46c79f62..f3ba17a9 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -7,6 +7,7 @@ use anyhow::anyhow; use clap::Parser; use crd::v1alpha1::NifiClusteringBackend; use futures::{FutureExt, StreamExt, TryFutureExt}; +use stackable_operator::crd::scaler::v1alpha1::StackableScaler; use stackable_operator::{ YamlSchema, cli::{Command, RunArguments}, @@ -14,6 +15,7 @@ use stackable_operator::{ eos::EndOfSupportChecker, k8s_openapi::api::{ apps::v1::StatefulSet, + autoscaling::v2::HorizontalPodAutoscaler, core::v1::{ConfigMap, Service}, }, kube::{ @@ -175,6 +177,14 @@ async fn main() -> anyhow::Result<()> { .map(|nifi| ObjectRef::from_obj(&*nifi)) }, ) + .owns( + watch_namespace.get_api::(&client), + watcher::Config::default(), + ) + .owns( + watch_namespace.get_api::(&client), + watcher::Config::default(), + ) .graceful_shutdown_on(sigterm_watcher.handle()) .run( controller::reconcile_nifi, diff --git a/rust/operator-binary/src/operations/credentials.rs b/rust/operator-binary/src/operations/credentials.rs new file mode 100644 index 00000000..5fdfb67b --- /dev/null +++ b/rust/operator-binary/src/operations/credentials.rs @@ -0,0 +1,93 @@ +//! Credential resolution for NiFi REST API authentication. +//! +//! Reads the admin username/password from the Kubernetes Secret referenced +//! by the StaticProvider AuthenticationClass. + +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_operator::{client::Client, k8s_openapi::api::core::v1::Secret}; +use tracing::debug; + +use crate::security::authentication::STACKABLE_ADMIN_USERNAME; + +/// Errors from NiFi credential resolution. +#[derive(Debug, Snafu)] +pub enum Error { + /// Failed to fetch the credentials Secret from the Kubernetes API. 
+ #[snafu(display( + "failed to get credentials Secret '{secret_name}' in namespace '{namespace}'" + ))] + GetSecret { + source: stackable_operator::client::Error, + secret_name: String, + namespace: String, + }, + + /// The credentials Secret does not contain the expected key. + #[snafu(display("credentials Secret '{secret_name}' is missing the '{key}' key"))] + MissingSecretKey { secret_name: String, key: String }, + + /// The password value in the Secret is not valid UTF-8. + #[snafu(display("credentials Secret '{secret_name}' key '{key}' contains invalid UTF-8"))] + InvalidUtf8 { + source: std::string::FromUtf8Error, + secret_name: String, + key: String, + }, +} + +/// Resolved admin credentials for NiFi REST API authentication. +pub struct NifiCredentials { + /// The admin username (always [`STACKABLE_ADMIN_USERNAME`]). + pub username: String, + /// The plaintext admin password, with leading/trailing whitespace trimmed. + pub password: String, +} + +/// Read the admin password from the Kubernetes Secret referenced by the +/// StaticProvider AuthenticationClass. +/// +/// The Secret format is: key = username, value = plaintext password. +/// +/// # Parameters +/// +/// - `client`: Kubernetes client with permissions to read Secrets. +/// - `secret_name`: Name of the Kubernetes Secret containing the credentials. +/// - `namespace`: Namespace of the Secret (same as the NiFi cluster). 
+pub async fn resolve_single_user_credentials(
+    client: &Client,
+    secret_name: &str,
+    namespace: &str,
+) -> Result<NifiCredentials, Error> {
+    let secret: Secret =
+        client
+            .get::<Secret>(secret_name, namespace)
+            .await
+            .context(GetSecretSnafu {
+                secret_name: secret_name.to_string(),
+                namespace: namespace.to_string(),
+            })?;
needed for scaling operations (node offload, disconnect, delete). + +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; +use tracing::{debug, warn}; + +/// Errors from NiFi REST API operations. +#[derive(Debug, Snafu)] +pub enum Error { + /// Failed to construct the reqwest HTTP client. + #[snafu(display("failed to build HTTP client"))] + BuildHttpClient { source: reqwest::Error }, + + /// The authentication HTTP request to NiFi failed at the transport level. + #[snafu(display("failed to authenticate with NiFi at {url}"))] + Authenticate { source: reqwest::Error, url: String }, + + /// NiFi returned a non-success HTTP status during authentication. + #[snafu(display("NiFi authentication returned non-success status {status} at {url}: {body}"))] + AuthenticateStatus { + status: u16, + url: String, + body: String, + }, + + /// The cluster nodes query HTTP request failed at the transport level. + #[snafu(display("failed to query NiFi cluster nodes at {url}"))] + GetClusterNodes { source: reqwest::Error, url: String }, + + /// NiFi returned a non-success HTTP status for the cluster nodes query. + #[snafu(display("NiFi cluster node query returned non-success status {status}: {body}"))] + GetClusterNodesStatus { status: u16, body: String }, + + /// A NiFi node reported a status string not recognized by [`NifiNodeStatus`]. + #[snafu(display("NiFi node with address '{address}' has unexpected status '{raw_status}'"))] + UnexpectedNodeStatus { address: String, raw_status: String }, + + /// The node status update HTTP request failed at the transport level. + #[snafu(display("failed to update NiFi node {node_id} status to {target_status}"))] + UpdateNodeStatus { + source: reqwest::Error, + node_id: String, + target_status: String, + }, + + /// NiFi returned a non-success HTTP status for the node status update. 
+    #[snafu(display(
+        "NiFi node {node_id} status update to {target_status} returned non-success status {status}: {body}"
+    ))]
+    UpdateNodeStatusHttp {
+        status: u16,
+        node_id: String,
+        target_status: String,
+        body: String,
+    },
+
+    /// The node deletion HTTP request failed at the transport level.
+    #[snafu(display("failed to delete NiFi node {node_id}"))]
+    DeleteNode {
+        source: reqwest::Error,
+        node_id: String,
+    },
+
+    /// NiFi returned a non-success HTTP status for the node deletion.
+    #[snafu(display("NiFi node {node_id} deletion returned non-success status {status}: {body}"))]
+    DeleteNodeStatus {
+        status: u16,
+        node_id: String,
+        body: String,
+    },
+}
+
+/// NiFi node status values used in the REST API.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum NifiNodeStatus {
+    /// Node is connected and participating in the cluster.
+    Connected,
+    /// Node is in the process of joining the cluster.
+    Connecting,
+    /// Node is in the process of leaving the cluster.
+    Disconnecting,
+    /// Node has left the cluster but still exists in the cluster registry.
+    Disconnected,
+    /// Node is migrating its data (flowfiles) to other nodes.
+    Offloading,
+    /// Node has completed data migration and holds no flowfiles.
+    Offloaded,
+}
+
+impl NifiNodeStatus {
+    /// Parse a NiFi REST API status string (e.g. `"CONNECTED"`) into the typed enum.
+    ///
+    /// Returns `None` for unrecognized status strings.
+    pub fn from_api_str(s: &str) -> Option<Self> {
+        match s {
+            "CONNECTED" => Some(Self::Connected),
+            "CONNECTING" => Some(Self::Connecting),
+            "DISCONNECTING" => Some(Self::Disconnecting),
+            "DISCONNECTED" => Some(Self::Disconnected),
+            "OFFLOADING" => Some(Self::Offloading),
+            "OFFLOADED" => Some(Self::Offloaded),
+            _ => None,
+        }
+    }
+
+    /// Return the uppercase NiFi REST API representation (e.g. `"CONNECTED"`).
+    pub fn as_api_str(&self) -> &'static str {
+        match self {
+            Self::Connected => "CONNECTED",
+            Self::Connecting => "CONNECTING",
+            Self::Disconnecting => "DISCONNECTING",
+            Self::Disconnected => "DISCONNECTED",
+            Self::Offloading => "OFFLOADING",
+            Self::Offloaded => "OFFLOADED",
+        }
+    }
+}
+
+/// A NiFi cluster node as returned by the cluster API.
+#[derive(Debug, Clone)]
+pub struct NifiNode {
+    /// Opaque node identifier assigned by NiFi (UUID format).
+    pub node_id: String,
+    /// The node's address as known to the NiFi cluster, typically a pod FQDN.
+    pub address: String,
+    /// The node's current cluster membership status.
+    pub status: NifiNodeStatus,
+}
+
+// ── NiFi REST API JSON structures ───────────────────────────────────────
+
+/// Top-level JSON response from `/controller/cluster`.
+#[derive(Debug, Deserialize)]
+struct ClusterResponse {
+    cluster: ClusterBody,
+}
+
+/// The `cluster` object within a [`ClusterResponse`].
+#[derive(Debug, Deserialize)]
+struct ClusterBody {
+    nodes: Vec<NodeResponse>,
+}
+
+/// A single node entry in the NiFi cluster API response.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct NodeResponse {
+    node_id: String,
+    address: String,
+    status: String,
+}
+
+/// Request body for updating a node's cluster status.
+#[derive(Debug, Serialize)]
+struct UpdateNodeStatusRequest {
+    node: UpdateNodeBody,
+}
+
+/// The `node` object within an [`UpdateNodeStatusRequest`].
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct UpdateNodeBody {
+    node_id: String,
+    status: String,
+}
+
+/// Extract the HTTP status code and response body text from an error response.
+///
+/// Consumes the response. If the body cannot be read, a fallback string is returned.
+async fn read_error_body(resp: reqwest::Response) -> (u16, String) {
+    let status = resp.status().as_u16();
+    let body = resp
+        .text()
+        .await
+        .unwrap_or_else(|e| format!("<failed to read body: {e}>"));
+    (status, body)
+}
+
+/// Client for NiFi REST API cluster management operations.
+pub struct NifiApiClient {
+    /// Shared HTTP client with connection pooling. TLS verification is disabled.
+    http: reqwest::Client,
+    /// NiFi REST API base URL (e.g. `https://pod-0.svc:8443/nifi-api`).
+    base_url: String,
+    /// Bearer token obtained during [`connect`](Self::connect).
+    bearer_token: String,
+}
+
+impl NifiApiClient {
+    /// Build an HTTP client (skipping TLS verification) and authenticate
+    /// using SingleUser credentials.
+    ///
+    /// `base_url` should be e.g. `https://<pod-fqdn>:8443/nifi-api`.
+    ///
+    /// # Parameters
+    ///
+    /// - `base_url`: NiFi REST API base URL, e.g. `"https://pod-0.svc:8443/nifi-api"`.
+    /// - `username`: SingleUser authentication username (plaintext).
+    /// - `password`: SingleUser authentication password (plaintext).
+    ///
+    /// # Security
+    ///
+    /// TLS certificate verification is intentionally disabled because NiFi pods use
+    /// self-signed certificates generated by the Stackable secret operator.
+    pub async fn connect(base_url: String, username: &str, password: &str) -> Result<Self, Error> {
+        // TODO(#1): This bypasses all TLS verification, including the mTLS enforced by the
+        // rest of the NiFi operator via keystores/truststores. Should use the CA cert from
+        // the Stackable secret operator to verify the server certificate instead.
+ let http = reqwest::Client::builder() + .danger_accept_invalid_certs(true) + .build() + .context(BuildHttpClientSnafu)?; + + let token_url = format!("{base_url}/access/token"); + debug!(url = %token_url, "Authenticating with NiFi"); + let resp = http + .post(&token_url) + .form(&[("username", username), ("password", password)]) + .send() + .await + .context(AuthenticateSnafu { + url: token_url.clone(), + })?; + + if !resp.status().is_success() { + let (status, body) = read_error_body(resp).await; + warn!(status, body = %body, url = %token_url, "NiFi authentication failed"); + return AuthenticateStatusSnafu { + status, + url: token_url, + body, + } + .fail(); + } + + let bearer_token = resp + .text() + .await + .context(AuthenticateSnafu { url: token_url })?; + debug!("NiFi authentication successful"); + + Ok(Self { + http, + base_url, + bearer_token, + }) + } + + /// Retrieve all nodes in the NiFi cluster. + /// + /// Returns all nodes known to the NiFi cluster. Nodes with unrecognized status + /// strings cause an [`Error::UnexpectedNodeStatus`] error. 
+    pub async fn get_cluster_nodes(&self) -> Result<Vec<NifiNode>, Error> {
+        let url = format!("{}/controller/cluster", self.base_url);
+        debug!(url = %url, "Querying NiFi cluster nodes");
+        let resp = self
+            .http
+            .get(&url)
+            .bearer_auth(&self.bearer_token)
+            .send()
+            .await
+            .context(GetClusterNodesSnafu { url: url.clone() })?;
+
+        if !resp.status().is_success() {
+            let (status, body) = read_error_body(resp).await;
+            warn!(status, body = %body, "NiFi cluster node query failed");
+            return GetClusterNodesStatusSnafu { status, body }.fail();
+        }
+
+        let body: ClusterResponse = resp.json().await.context(GetClusterNodesSnafu { url })?;
+
+        let nodes = body
+            .cluster
+            .nodes
+            .into_iter()
+            .map(|n| {
+                let status = NifiNodeStatus::from_api_str(&n.status).ok_or_else(|| {
+                    Error::UnexpectedNodeStatus {
+                        address: n.address.clone(),
+                        raw_status: n.status,
+                    }
+                })?;
+                Ok(NifiNode {
+                    node_id: n.node_id,
+                    address: n.address,
+                    status,
+                })
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        debug!(
+            node_count = nodes.len(),
+            nodes = ?nodes.iter().map(|n| format!("{}({})={}", n.address, n.node_id, n.status.as_api_str())).collect::<Vec<_>>(),
+            "NiFi cluster nodes retrieved"
+        );
+
+        Ok(nodes)
+    }
+
+    /// Update the status of a node (e.g. OFFLOADING, DISCONNECTING).
+    ///
+    /// # Parameters
+    ///
+    /// - `node_id`: The NiFi node identifier (UUID) from [`NifiNode::node_id`].
+    /// - `status`: The target status to request (e.g. [`NifiNodeStatus::Offloading`]).
+    ///   NiFi may reject invalid transitions with a non-success HTTP status.
+ pub async fn set_node_status( + &self, + node_id: &str, + status: NifiNodeStatus, + ) -> Result<(), Error> { + let url = format!("{}/controller/cluster/nodes/{node_id}", self.base_url); + let body = UpdateNodeStatusRequest { + node: UpdateNodeBody { + node_id: node_id.to_string(), + status: status.as_api_str().to_string(), + }, + }; + debug!( + url = %url, + node_id = %node_id, + target_status = %status.as_api_str(), + request_body = ?body, + "Updating NiFi node status" + ); + let resp = self + .http + .put(&url) + .bearer_auth(&self.bearer_token) + .json(&body) + .send() + .await + .context(UpdateNodeStatusSnafu { + node_id: node_id.to_string(), + target_status: status.as_api_str().to_string(), + })?; + + if !resp.status().is_success() { + let (http_status, resp_body) = read_error_body(resp).await; + warn!( + http_status, + node_id = %node_id, + target_status = %status.as_api_str(), + response_body = %resp_body, + "NiFi node status update failed" + ); + return UpdateNodeStatusHttpSnafu { + status: http_status, + node_id: node_id.to_string(), + target_status: status.as_api_str().to_string(), + body: resp_body, + } + .fail(); + } + + debug!(node_id = %node_id, target_status = %status.as_api_str(), "NiFi node status updated successfully"); + Ok(()) + } + + /// Delete a node from the NiFi cluster. + /// + /// # Parameters + /// + /// - `node_id`: The NiFi node identifier (UUID) to remove from the cluster. 
+ pub async fn delete_node(&self, node_id: &str) -> Result<(), Error> { + let url = format!("{}/controller/cluster/nodes/{node_id}", self.base_url); + debug!(url = %url, node_id = %node_id, "Deleting NiFi node from cluster"); + let resp = self + .http + .delete(&url) + .bearer_auth(&self.bearer_token) + .send() + .await + .context(DeleteNodeSnafu { + node_id: node_id.to_string(), + })?; + + if !resp.status().is_success() { + let (http_status, resp_body) = read_error_body(resp).await; + warn!( + http_status, + node_id = %node_id, + response_body = %resp_body, + "NiFi node deletion failed" + ); + return DeleteNodeStatusSnafu { + status: http_status, + node_id: node_id.to_string(), + body: resp_body, + } + .fail(); + } + + debug!(node_id = %node_id, "NiFi node deleted successfully"); + Ok(()) + } +} diff --git a/rust/operator-binary/src/operations/scaling.rs b/rust/operator-binary/src/operations/scaling.rs new file mode 100644 index 00000000..3c150e6f --- /dev/null +++ b/rust/operator-binary/src/operations/scaling.rs @@ -0,0 +1,476 @@ +//! NiFi scaling hooks — drives node offload/disconnect/delete via the NiFi REST API. +//! +//! The scale-down sequence differs between NiFi versions: +//! - NiFi 1.x: CONNECTED → OFFLOADING → OFFLOADED → DISCONNECTING → DISCONNECTED → DELETE +//! - NiFi 2.x: CONNECTED → DISCONNECTING → DISCONNECTED → OFFLOADING → OFFLOADED → DELETE + +use snafu::{ResultExt, Snafu}; +use stackable_operator::crd::scaler::{HookOutcome, ScalingContext, ScalingHooks}; +use tracing::{info, warn}; + +use super::credentials::{self, NifiCredentials}; +use super::nifi_api::{self, NifiApiClient, NifiNode, NifiNodeStatus}; +use crate::crd::HTTPS_PORT; + +/// Errors from NiFi scaling hook operations. +#[derive(Debug, Snafu)] +pub enum Error { + /// Failed to read NiFi admin credentials from the Kubernetes Secret. 
+ #[snafu(display("failed to resolve NiFi admin credentials"))] + ResolveCredentials { source: credentials::Error }, + + /// Failed to connect and authenticate to the NiFi REST API. + #[snafu(display("failed to connect to NiFi REST API"))] + NifiApiConnect { source: nifi_api::Error }, + + /// Failed to query the NiFi cluster node list. + #[snafu(display("failed to query NiFi cluster nodes"))] + GetClusterNodes { source: nifi_api::Error }, + + /// Failed to initiate offload for a NiFi node. + #[snafu(display("failed to offload NiFi node {node_id}"))] + OffloadNode { + source: nifi_api::Error, + node_id: String, + }, + + /// Failed to initiate disconnect for a NiFi node. + #[snafu(display("failed to disconnect NiFi node {node_id}"))] + DisconnectNode { + source: nifi_api::Error, + node_id: String, + }, + + /// Failed to delete a NiFi node from the cluster. + #[snafu(display("failed to delete NiFi node {node_id}"))] + DeleteNode { + source: nifi_api::Error, + node_id: String, + }, + + /// A node was in an unexpected status for the current scale-down phase. + #[snafu(display( + "NiFi node '{address}' is in unexpected status {status:?} during scale-down phase" + ))] + UnexpectedPhaseStatus { + address: String, + status: NifiNodeStatus, + }, +} + +/// Implements pre/post-scale hooks for NiFi clusters. +/// +/// On scale-down, `pre_scale` drives the NiFi REST API to offload, disconnect, +/// and delete the highest-ordinal nodes before the StatefulSet replica count +/// is reduced. +pub struct NifiScalingHooks { + /// Namespace of the cluster. + pub namespace: String, + /// Name of the Kubernetes Secret containing SingleUser credentials. + pub credentials_secret_name: String, + /// The StatefulSet name for the role group (e.g., "simple-nifi-node-default"). + pub statefulset_name: String, + /// The headless service name for the role group + /// (e.g., "simple-nifi-node-default-headless"). 
+    pub headless_service_name: String,
+    /// Kubernetes cluster domain (e.g., "cluster.local").
+    pub cluster_domain: String,
+    /// NiFi product version (e.g., "1.28.0" or "2.6.0").
+    /// Determines the scale-down sequence since NiFi 2.x requires
+    /// disconnect-before-offload while 1.x requires offload-before-disconnect.
+    pub product_version: String,
+}
+
+impl NifiScalingHooks {
+    /// Build the FQDN for a NiFi pod by ordinal.
+    ///
+    /// Format: `{sts_name}-{ordinal}.{headless_svc}.{namespace}.svc.{cluster_domain}`
+    ///
+    /// # Parameters
+    ///
+    /// - `ordinal`: The StatefulSet pod ordinal (0-based).
+    fn pod_fqdn(&self, ordinal: i32) -> String {
+        format!(
+            "{sts_name}-{ordinal}.{headless_svc}.{namespace}.svc.{cluster_domain}",
+            sts_name = self.statefulset_name,
+            headless_svc = self.headless_service_name,
+            namespace = self.namespace,
+            cluster_domain = self.cluster_domain,
+        )
+    }
+
+    /// Build the NiFi REST API base URL for a given pod ordinal.
+    ///
+    /// # Parameters
+    ///
+    /// - `ordinal`: The StatefulSet pod ordinal whose API endpoint to target.
+    fn api_base_url(&self, ordinal: i32) -> String {
+        format!("https://{}:{}/nifi-api", self.pod_fqdn(ordinal), HTTPS_PORT)
+    }
+
+    /// Whether this is a NiFi 2.x (or later) cluster.
+    ///
+    /// Parses the major version from `product_version` using semver. Falls back to
+    /// `false` (NiFi 1.x behavior) if the version string cannot be parsed.
+    fn is_nifi_2(&self) -> bool {
+        self.product_version
+            .split('.')
+            .next()
+            .and_then(|major| major.parse::<u32>().ok())
+            .is_some_and(|major| major >= 2)
+    }
+
+    /// Drive the scale-down: offload, disconnect, and delete nodes with
+    /// ordinals >= desired_replicas.
+    ///
+    /// Uses a phased approach that differs by NiFi version:
+    ///
+    /// NiFi 1.x: CONNECTED → OFFLOADING → OFFLOADED → DISCONNECTING → DISCONNECTED → DELETE
+    /// NiFi 2.x: CONNECTED → DISCONNECTING → DISCONNECTED → OFFLOADING → OFFLOADED → DELETE
+    ///
+    /// # Parameters
+    ///
+    /// - `ctx`: Scaling context with client, namespace, and replica counts.
+    ///
+    /// # Returns
+    ///
+    /// - [`HookOutcome::Done`] when all target nodes have been removed from the cluster.
+    /// - [`HookOutcome::InProgress`] when nodes are still transitioning.
+    async fn drive_scale_down(&self, ctx: &ScalingContext<'_>) -> Result<HookOutcome, Error> {
+        // 1. Resolve credentials from K8s Secret
+        let NifiCredentials { username, password } = credentials::resolve_single_user_credentials(
+            ctx.client,
+            &self.credentials_secret_name,
+            ctx.namespace,
+        )
+        .await
+        .context(ResolveCredentialsSnafu)?;
+
+        // 2. Connect to NiFi REST API via pod-0 (always safe; won't be removed)
+        let api = NifiApiClient::connect(self.api_base_url(0), &username, &password)
+            .await
+            .context(NifiApiConnectSnafu)?;
+
+        // 3. Query cluster nodes
+        let nodes = api
+            .get_cluster_nodes()
+            .await
+            .context(GetClusterNodesSnafu)?;
+
+        // 4. Identify target nodes by matching pod FQDNs to NiFi node addresses
+        let mut targets = Vec::new();
+        for ordinal in ctx.removed_ordinals() {
+            let pod_fqdn = self.pod_fqdn(ordinal);
+            if let Some(node) = nodes.iter().find(|n| n.address == pod_fqdn) {
+                targets.push(node.clone());
+            } else {
+                let cluster_addresses: Vec<&str> =
+                    nodes.iter().map(|n| n.address.as_str()).collect();
+                warn!(
+                    ordinal,
+                    expected_fqdn = %pod_fqdn,
+                    ?cluster_addresses,
+                    "Target node FQDN not found in NiFi cluster — assuming already removed. \
+                     If this is unexpected, check that the headless service name and cluster \
+                     domain match the NiFi node addresses."
+                );
+            }
+        }
+
+        if targets.is_empty() {
+            return Ok(HookOutcome::Done);
+        }
+
+        // 5. Run version-specific phased scale-down
+        if self.is_nifi_2() {
+            self.drive_scale_down_v2(&api, &targets).await
+        } else {
+            self.drive_scale_down_v1(&api, &targets).await
+        }
+    }
+
+    /// NiFi 1.x scale-down sequence.
+    ///
+    /// # Parameters
+    ///
+    /// - `api`: Authenticated NiFi API client.
+    /// - `targets`: Nodes to decommission (ordinals >= desired replicas).
+    async fn drive_scale_down_v1(
+        &self,
+        api: &NifiApiClient,
+        targets: &[NifiNode],
+    ) -> Result<HookOutcome, Error> {
+        // Phase 1: Offloading — trigger for CONNECTED nodes, wait for all
+        let mut any_in_progress = false;
+        for node in targets {
+            match node.status {
+                NifiNodeStatus::Connected => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        "NiFi 1.x: Initiating offload for connected node"
+                    );
+                    api.set_node_status(&node.node_id, NifiNodeStatus::Offloading)
+                        .await
+                        .context(OffloadNodeSnafu {
+                            node_id: node.node_id.clone(),
+                        })?;
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Connecting | NifiNodeStatus::Offloading => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        status = node.status.as_api_str(),
+                        "NiFi 1.x: Node still in progress, waiting"
+                    );
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Offloaded
+                | NifiNodeStatus::Disconnecting
+                | NifiNodeStatus::Disconnected => {
+                    // Already past offload phase
+                }
+            }
+        }
+
+        if any_in_progress {
+            return Ok(HookOutcome::InProgress);
+        }
+
+        // Phase 2: Disconnecting — trigger for OFFLOADED nodes, wait for all
+        let mut any_in_progress = false;
+        for node in targets {
+            match node.status {
+                NifiNodeStatus::Offloaded => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        "NiFi 1.x: Disconnecting offloaded node"
+                    );
+                    api.set_node_status(&node.node_id, NifiNodeStatus::Disconnecting)
+                        .await
+                        .context(DisconnectNodeSnafu {
+                            node_id: node.node_id.clone(),
+                        })?;
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Disconnecting => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        "NiFi 1.x: Node still disconnecting"
+                    );
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Disconnected => {
+                    // Ready for deletion
+                }
+                other => {
+                    return UnexpectedPhaseStatusSnafu {
+                        address: node.address.clone(),
+                        status: other,
+                    }
+                    .fail();
+                }
+            }
+        }
+
+        if any_in_progress {
+            return Ok(HookOutcome::InProgress);
+        }
+
+        // Phase 3: Delete all DISCONNECTED nodes
+        self.delete_nodes_with_status(api, targets, NifiNodeStatus::Disconnected)
+            .await
+    }
+
+    /// NiFi 2.x scale-down sequence.
+    ///
+    /// # Parameters
+    ///
+    /// - `api`: Authenticated NiFi API client.
+    /// - `targets`: Nodes to decommission (ordinals >= desired replicas).
+    async fn drive_scale_down_v2(
+        &self,
+        api: &NifiApiClient,
+        targets: &[NifiNode],
+    ) -> Result<HookOutcome, Error> {
+        // Phase 1: Disconnecting — trigger for CONNECTED nodes, wait for all
+        let mut any_in_progress = false;
+        for node in targets {
+            match node.status {
+                NifiNodeStatus::Connected => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        "NiFi 2.x: Disconnecting connected node"
+                    );
+                    api.set_node_status(&node.node_id, NifiNodeStatus::Disconnecting)
+                        .await
+                        .context(DisconnectNodeSnafu {
+                            node_id: node.node_id.clone(),
+                        })?;
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Connecting | NifiNodeStatus::Disconnecting => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        status = node.status.as_api_str(),
+                        "NiFi 2.x: Node still in progress, waiting"
+                    );
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Disconnected
+                | NifiNodeStatus::Offloading
+                | NifiNodeStatus::Offloaded => {
+                    // Already past disconnect phase
+                }
+            }
+        }
+
+        if any_in_progress {
+            return Ok(HookOutcome::InProgress);
+        }
+
+        // Phase 2: Offloading — trigger for DISCONNECTED nodes, wait for all
+        let mut any_in_progress = false;
+        for node in targets {
+            match node.status {
+                NifiNodeStatus::Disconnected => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        "NiFi 2.x: Offloading disconnected node"
+                    );
+                    api.set_node_status(&node.node_id, NifiNodeStatus::Offloading)
+                        .await
+                        .context(OffloadNodeSnafu {
+                            node_id: node.node_id.clone(),
+                        })?;
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Offloading => {
+                    info!(
+                        node_id = %node.node_id,
+                        address = %node.address,
+                        "NiFi 2.x: Node still offloading"
+                    );
+                    any_in_progress = true;
+                }
+                NifiNodeStatus::Offloaded => {
+                    // Ready for deletion
+                }
+                other => {
+                    return UnexpectedPhaseStatusSnafu {
+                        address: node.address.clone(),
+                        status: other,
+                    }
+                    .fail();
+                }
+            }
+        }
+
+        if any_in_progress {
+            return Ok(HookOutcome::InProgress);
+        }
+
+        // Phase 3: Delete all OFFLOADED nodes
+        self.delete_nodes_with_status(api, targets, NifiNodeStatus::Offloaded)
+            .await
+    }
+
+    /// Delete all nodes in `targets` that match the given `ready_status`.
+    ///
+    /// Nodes not matching `ready_status` are silently skipped -- they are assumed
+    /// to have already been deleted or to be in an earlier phase.
+    ///
+    /// # Parameters
+    ///
+    /// - `api`: Authenticated NiFi API client.
+    /// - `targets`: The full set of target nodes for this scale-down operation.
+    /// - `ready_status`: Only nodes with this status will be deleted.
+    async fn delete_nodes_with_status(
+        &self,
+        api: &NifiApiClient,
+        targets: &[NifiNode],
+        ready_status: NifiNodeStatus,
+    ) -> Result<HookOutcome, Error> {
+        for node in targets {
+            if node.status == ready_status {
+                info!(
+                    node_id = %node.node_id,
+                    address = %node.address,
+                    "Deleting NiFi node from cluster"
+                );
+                api.delete_node(&node.node_id)
+                    .await
+                    .context(DeleteNodeSnafu {
+                        node_id: node.node_id.clone(),
+                    })?;
+            }
+        }
+        Ok(HookOutcome::Done)
+    }
+}
+
+impl ScalingHooks for NifiScalingHooks {
+    type Error = Error;
+
+    async fn pre_scale(&self, ctx: &ScalingContext<'_>) -> Result<HookOutcome, Self::Error> {
+        if !ctx.is_scale_down() {
+            return Ok(HookOutcome::Done);
+        }
+        self.drive_scale_down(ctx).await
+    }
+
+    // post_scale: use trait default (returns Done immediately).
+    // on_failure: use trait default (no-op).
+} + +#[cfg(test)] +mod tests { + use super::*; + + fn hooks(version: &str) -> NifiScalingHooks { + NifiScalingHooks { + namespace: "default".to_string(), + credentials_secret_name: "nifi-users".to_string(), + statefulset_name: "test-cluster-node-default".to_string(), + headless_service_name: "test-cluster-node-default-headless".to_string(), + cluster_domain: "cluster.local".to_string(), + product_version: version.to_string(), + } + } + + #[test] + fn pod_fqdn_is_correct() { + let h = hooks("2.6.0"); + assert_eq!( + h.pod_fqdn(2), + "test-cluster-node-default-2.test-cluster-node-default-headless.default.svc.cluster.local" + ); + } + + #[test] + fn api_base_url_is_correct() { + let h = hooks("2.6.0"); + assert_eq!( + h.api_base_url(0), + "https://test-cluster-node-default-0.test-cluster-node-default-headless.default.svc.cluster.local:8443/nifi-api" + ); + } + + #[test] + fn is_nifi_2_detects_version() { + assert!(hooks("2.6.0").is_nifi_2()); + assert!(hooks("2.0.0").is_nifi_2()); + assert!(hooks("3.0.0").is_nifi_2()); + assert!(!hooks("1.28.0").is_nifi_2()); + assert!(!hooks("1.14.0").is_nifi_2()); + assert!(!hooks("0.9.0").is_nifi_2()); + // Unparseable version falls back to NiFi 1.x behavior + assert!(!hooks("invalid").is_nifi_2()); + } +} diff --git a/rust/operator-binary/src/reporting_task/mod.rs b/rust/operator-binary/src/reporting_task/mod.rs index a1fa4153..30262428 100644 --- a/rust/operator-binary/src/reporting_task/mod.rs +++ b/rust/operator-binary/src/reporting_task/mod.rs @@ -168,9 +168,10 @@ pub fn build_reporting_task_fqdn_service_name( )) } -/// Return the name of the first pod belonging to the first role group that contains more than 0 replicas. -/// If no replicas are set in any rolegroup (e.g. HPA, see ) -/// return the first rolegroup just in case. +/// Return the name of the first pod belonging to the first role group that has an +/// explicit [`ReplicasConfig`] set. 
+/// +/// If no role group has replicas configured, falls back to the first role group. /// This is required to only select a single node in the Reporting Task Service. fn get_reporting_task_service_selector_pod(nifi: &v1alpha1::NifiCluster) -> Result { let cluster_name = nifi.name_any(); @@ -191,11 +192,12 @@ fn get_reporting_task_service_selector_pod(nifi: &v1alpha1::NifiCluster) -> Resu selector_role_group = Some(role_group_name); } - if let Some(replicas) = role_group.replicas { - if replicas > 0 { - selector_role_group = Some(role_group_name); - break; - } + if role_group.replicas.is_some() { + // Any explicit ReplicasConfig implies running pods (Fixed(0) is + // rejected by validation, and all other variants manage replicas + // dynamically). + selector_role_group = Some(role_group_name); + break; } } diff --git a/rust/operator-binary/src/security/oidc.rs b/rust/operator-binary/src/security/oidc.rs index 0d93d942..c7d28d2c 100644 --- a/rust/operator-binary/src/security/oidc.rs +++ b/rust/operator-binary/src/security/oidc.rs @@ -186,6 +186,7 @@ mod tests { ); let oidc = oidc::v1alpha1::ClientAuthenticationOptions { client_credentials_secret_ref: "nifi-keycloak-client".to_owned(), + client_authentication_method: Default::default(), extra_scopes: vec![], product_specific_fields: (), };