From 97f20fa905d80f77570b08a23c43bfde10c30c4a Mon Sep 17 00:00:00 2001 From: Benedikt Labrenz Date: Tue, 8 Apr 2025 14:46:50 +0200 Subject: [PATCH 1/6] inject vector aggregator address as env into config & add watch for referenced cms --- Cargo.lock | 8 +-- Cargo.toml | 2 +- rust/operator-binary/src/controller.rs | 71 +++++++++++---------- rust/operator-binary/src/main.rs | 24 +++++++ rust/operator-binary/src/product_logging.rs | 50 +-------------- 5 files changed, 67 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26e74c08..1829fa1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2699,8 +2699,8 @@ dependencies = [ [[package]] name = "stackable-operator" -version = "0.89.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#cd73728af410c52972b9a9a3ba1302bcdb574d04" +version = "0.90.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#ea063b4595caa20c82d37c595487c76476c9ab10" dependencies = [ "chrono", "clap", @@ -2735,7 +2735,7 @@ dependencies = [ [[package]] name = "stackable-operator-derive" version = "0.3.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#cd73728af410c52972b9a9a3ba1302bcdb574d04" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#ea063b4595caa20c82d37c595487c76476c9ab10" dependencies = [ "darling", "proc-macro2", @@ -2746,7 +2746,7 @@ dependencies = [ [[package]] name = "stackable-shared" version = "0.0.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#cd73728af410c52972b9a9a3ba1302bcdb574d04" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#ea063b4595caa20c82d37c595487c76476c9ab10" dependencies = [ "kube", "semver", diff --git a/Cargo.toml b/Cargo.toml index 1fec114b..7569a983 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/nifi-operator" [workspace.dependencies] product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" } -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" } stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" } stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" } diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index dd12d70d..e58c68de 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -88,7 +88,7 @@ use crate::{ v1alpha1, }, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, - product_logging::{extend_role_group_config_map, resolve_vector_aggregator_address}, + product_logging::extend_role_group_config_map, reporting_task::{self, build_maybe_reporting_task, build_reporting_task_service_name}, security::{ authentication::{ @@ -254,10 +254,8 @@ pub enum Error { #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: crate::crd::Error }, - #[snafu(display("failed to resolve the Vector aggregator address"))] - ResolveVectorAggregatorAddress { - source: crate::product_logging::Error, - }, + #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))] + VectorAggregatorConfigMapMissing, #[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))] InvalidLoggingConfig { @@ -515,10 +513,6 @@ pub async fn reconcile_nifi( .context(SecuritySnafu)?; } - let vector_aggregator_address = resolve_vector_aggregator_address(nifi, client) - .await - .context(ResolveVectorAggregatorAddressSnafu)?; - let (rbac_sa, rbac_rolebinding) = build_rbac_resources( nifi, APP_NAME, @@ -572,7 +566,6 @@ pub async fn reconcile_nifi( &rolegroup, rolegroup_config, &merged_config, - vector_aggregator_address.as_deref(), &proxy_hosts, ) .await?; @@ -747,7 +740,6 @@ async fn build_node_rolegroup_config_map( rolegroup: &RoleGroupRef, rolegroup_config: &HashMap>, merged_config: &NifiConfig, - vector_aggregator_address: Option<&str>, proxy_hosts: &str, ) -> Result { tracing::debug!("building rolegroup configmaps"); @@ -833,15 +825,11 @@ async fn build_node_rolegroup_config_map( })?, ); - extend_role_group_config_map( - rolegroup, - vector_aggregator_address, - &merged_config.logging, - &mut cm_builder, - ) - .context(InvalidLoggingConfigSnafu { - cm_name: rolegroup.object_name(), - })?; + extend_role_group_config_map(rolegroup, &merged_config.logging, &mut cm_builder).context( + InvalidLoggingConfigSnafu { + cm_name: rolegroup.object_name(), + }, + )?; cm_builder .build() @@ -1244,21 +1232,34 @@ async fn build_node_rolegroup_statefulset( } if merged_config.logging.enable_vector_agent { - pod_builder.add_container( - product_logging::framework::vector_container( - resolved_product_image, - "config", - "log", - merged_config.logging.containers.get(&Container::Vector), - ResourceRequirementsBuilder::new() - .with_cpu_request("250m") - .with_cpu_limit("500m") - .with_memory_request("128Mi") - .with_memory_limit("128Mi") - .build(), - ) - .context(ConfigureLoggingSnafu)?, - ); + match nifi + .spec + .cluster_config + .vector_aggregator_config_map_name + .to_owned() + { + Some(vector_aggregator_config_map_name) => { + pod_builder.add_container( + product_logging::framework::vector_container( + resolved_product_image, + "config", + "log", + merged_config.logging.containers.get(&Container::Vector), + ResourceRequirementsBuilder::new() + .with_cpu_request("250m") + .with_cpu_limit("500m") + .with_memory_request("128Mi") + .with_memory_limit("128Mi") + .build(), + &vector_aggregator_config_map_name, + ) + .context(ConfigureLoggingSnafu)?, + ); + } + None => { + VectorAggregatorConfigMapMissingSnafu.fail()?; + } + } } nifi_auth_config diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index bc7c8d77..26676360 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -11,6 +11,7 @@ use stackable_operator::{ core::v1::{ConfigMap, Service}, }, kube::{ + ResourceExt, core::DeserializeGuard, runtime::{ Controller, @@ -140,6 +141,7 @@ async fn main() -> anyhow::Result<()> { ); let nifi_store_1 = nifi_controller.store(); + let nifi_store_2 = nifi_controller.store(); nifi_controller .owns( @@ -165,6 +167,17 @@ async fn main() -> anyhow::Result<()> { .map(|nifi| ObjectRef::from_obj(&*nifi)) }, ) + .watches( + watch_namespace.get_api::>(&client), + watcher::Config::default(), + move |config_map| { + nifi_store_2 + .state() + .into_iter() + .filter(move |nifi| references_config_map(nifi, &config_map)) + .map(|nifi| ObjectRef::from_obj(&*nifi)) + }, + ) .run( controller::reconcile_nifi, controller::error_policy, @@ -196,3 +209,14 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +fn references_config_map( + nifi: &DeserializeGuard, + config_map: &DeserializeGuard, +) -> bool { + let Ok(nifi) = &nifi.0 else { + return false; + }; + + nifi.spec.cluster_config.zookeeper_config_map_name == config_map.name_any() +} diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index 0faa965f..9ae11b08 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -1,9 +1,6 @@ -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::Snafu; use stackable_operator::{ builder::configmap::ConfigMapBuilder, - client::Client, - k8s_openapi::api::core::v1::ConfigMap, - kube::ResourceExt, memory::BinaryMultiple, product_logging::{ self, @@ -39,7 +36,6 @@ type Result = std::result::Result; pub const LOGBACK_CONFIG_FILE: &str = "logback.xml"; pub const NIFI_LOG_FILE: &str = "nifi.log4j.xml"; -const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS"; const CONSOLE_CONVERSION_PATTERN: &str = "%date %level [%thread] %logger{40} %msg%n"; // This is required to remove double entries in the nifi.log4j.xml as well as nested // console output like: " ... ... @@ -58,47 +54,9 @@ const ADDITONAL_LOGBACK_CONFIG: &str = r#" Result> { - let vector_aggregator_address = if let Some(vector_aggregator_config_map_name) = &nifi - .spec - .cluster_config - .vector_aggregator_config_map_name - .as_ref() - { - let vector_aggregator_address = client - .get::( - vector_aggregator_config_map_name, - nifi.namespace() - .as_deref() - .context(ObjectHasNoNamespaceSnafu)?, - ) - .await - .context(ConfigMapNotFoundSnafu { - cm_name: vector_aggregator_config_map_name.to_string(), - })? - .data - .and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY)) - .context(MissingConfigMapEntrySnafu { - entry: VECTOR_AGGREGATOR_CM_ENTRY, - cm_name: vector_aggregator_config_map_name.to_string(), - })?; - Some(vector_aggregator_address) - } else { - None - }; - - Ok(vector_aggregator_address) -} - /// Extend the role group ConfigMap with logging and Vector configurations pub fn extend_role_group_config_map( rolegroup: &RoleGroupRef, - vector_aggregator_address: Option<&str>, logging: &Logging, cm_builder: &mut ConfigMapBuilder, ) -> Result<()> { @@ -137,11 +95,7 @@ pub fn extend_role_group_config_map( if logging.enable_vector_agent { cm_builder.add_data( product_logging::framework::VECTOR_CONFIG_FILE, - product_logging::framework::create_vector_config( - rolegroup, - vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?, - vector_log_config, - ), + product_logging::framework::create_vector_config(rolegroup, vector_log_config), ); } From 396eec69343e97c771a567d98f9a867faeb62c48 Mon Sep 17 00:00:00 2001 From: Benedikt Labrenz Date: Tue, 8 Apr 2025 14:50:28 +0200 Subject: [PATCH 2/6] add changelog entry --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94383a2b..02854750 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,15 @@ All notable changes to this project will be documented in this file. - BREAKING: The file log directory was set by `NIFI_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS` (or via `--rolling-logs `). - Replace stackable-operator `print_startup_string` with `tracing::info!` with fields. +- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead + of having the operator write it to the vector config ([#XXX]). + +### Fixed + +- Fix a bug where changes to ConfigMaps that are referenced in the NifiCluster spec didn't trigger a reconciliation ([#XXX]). [#767]: https://github.com/stackabletech/nifi-operator/pull/767 +[#XXX]: https://github.com/stackabletech/nifi-operator/pull/XXX ## [25.3.0] - 2025-03-21 From 2616f0feb2b3f5fea92ad3d237732613bed22c29 Mon Sep 17 00:00:00 2001 From: Benedikt Labrenz Date: Tue, 8 Apr 2025 14:56:07 +0200 Subject: [PATCH 3/6] add pr number to changelog --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02854750..4b9120df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,14 +11,14 @@ All notable changes to this project will be documented in this file. (or via `--rolling-logs `). - Replace stackable-operator `print_startup_string` with `tracing::info!` with fields. - BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead - of having the operator write it to the vector config ([#XXX]). + of having the operator write it to the vector config ([#772]). ### Fixed -- Fix a bug where changes to ConfigMaps that are referenced in the NifiCluster spec didn't trigger a reconciliation ([#XXX]). +- Fix a bug where changes to ConfigMaps that are referenced in the NifiCluster spec didn't trigger a reconciliation ([#772]). [#767]: https://github.com/stackabletech/nifi-operator/pull/767 -[#XXX]: https://github.com/stackabletech/nifi-operator/pull/XXX +[#772]: https://github.com/stackabletech/nifi-operator/pull/772 ## [25.3.0] - 2025-03-21 From 35c6090c1b0018372f3a424410940ddc737280b6 Mon Sep 17 00:00:00 2001 From: Benedikt Labrenz Date: Tue, 8 Apr 2025 21:28:54 +0200 Subject: [PATCH 4/6] stop ignoring unencountered rustsec advisories --- deny.toml | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/deny.toml b/deny.toml index 2c0138d0..1d140638 100644 --- a/deny.toml +++ b/deny.toml @@ -9,27 +9,6 @@ targets = [ [advisories] yanked = "deny" -ignore = [ - # https://rustsec.org/advisories/RUSTSEC-2023-0071 - # "rsa" crate: Marvin Attack: potential key recovery through timing sidechannel - # - # No patch is yet available, however work is underway to migrate to a fully constant-time implementation - # So we need to accept this, as of SDP 24.11 we are not using the rsa crate to create certificates used in production - # setups. - # - # TODO: Remove after https://github.com/RustCrypto/RSA/pull/394 is merged - "RUSTSEC-2023-0071", - - # https://rustsec.org/advisories/RUSTSEC-2024-0384 - # "instant" is unmaintained - # - # The upstream "kube" crate also silenced this in https://github.com/kube-rs/kube/commit/4f1e889f265da8f19f03f60683569cae1a154fda - # They/we are actively working on migrating kube from backoff to backon, which removes the transitive dependency on - # instant, in https://github.com/kube-rs/kube/pull/1652. - # - # TODO: Remove after https://github.com/kube-rs/kube/pull/1652 is merged - "RUSTSEC-2024-0384", -] [bans] multiple-versions = "allow" From 52744338248adf33b4e81cffc04235610334130e Mon Sep 17 00:00:00 2001 From: Benedikt Labrenz Date: Wed, 9 Apr 2025 17:30:45 +0200 Subject: [PATCH 5/6] rename stores --- rust/operator-binary/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 02784215..c441abbb 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -140,8 +140,8 @@ async fn main() -> anyhow::Result<()> { watcher::Config::default(), ); - let nifi_store_1 = nifi_controller.store(); - let nifi_store_2 = nifi_controller.store(); + let authentication_class_store = nifi_controller.store(); + let config_map_store = nifi_controller.store(); nifi_controller .owns( @@ -161,7 +161,7 @@ async fn main() -> anyhow::Result<()> { client.get_api::>(&()), watcher::Config::default(), move |_| { - nifi_store_1 + authentication_class_store .state() .into_iter() .map(|nifi| ObjectRef::from_obj(&*nifi)) @@ -171,7 +171,7 @@ async fn main() -> anyhow::Result<()> { watch_namespace.get_api::>(&client), watcher::Config::default(), move |config_map| { - nifi_store_2 + config_map_store .state() .into_iter() .filter(move |nifi| references_config_map(nifi, &config_map)) From fd5725781eee8ee8bb7900a22de76fdc39008f16 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Wed, 9 Apr 2025 18:31:14 +0200 Subject: [PATCH 6/6] chore: Use borrows --- rust/operator-binary/src/controller.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index e58c68de..76ba4521 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -1232,12 +1232,7 @@ async fn build_node_rolegroup_statefulset( } if merged_config.logging.enable_vector_agent { - match nifi - .spec - .cluster_config - .vector_aggregator_config_map_name - .to_owned() - { + match &nifi.spec.cluster_config.vector_aggregator_config_map_name { Some(vector_aggregator_config_map_name) => { pod_builder.add_container( product_logging::framework::vector_container( @@ -1251,7 +1246,7 @@ async fn build_node_rolegroup_statefulset( .with_memory_request("128Mi") .with_memory_limit("128Mi") .build(), - &vector_aggregator_config_map_name, + vector_aggregator_config_map_name, ) .context(ConfigureLoggingSnafu)?, );