Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ All notable changes to this project will be documented in this file.
### Added

- Add rolling upgrade support for upgrades between NiFi 2 versions ([#771]).
- Added Listener support for NiFi ([#784]).
- BREAKING: Added Listener support for NiFi ([#784], [#819]).
- Adds new telemetry CLI arguments and environment variables ([#782]).
- Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
Expand Down Expand Up @@ -69,6 +69,7 @@ All notable changes to this project will be documented in this file.
[#801]: https://github.com/stackabletech/nifi-operator/pull/801
[#808]: https://github.com/stackabletech/nifi-operator/pull/808
[#817]: https://github.com/stackabletech/nifi-operator/pull/817
[#819]: https://github.com/stackabletech/nifi-operator/pull/819

## [25.3.0] - 2025-03-21

Expand Down
33 changes: 17 additions & 16 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ use crate::{
tls::{KEYSTORE_NIFI_CONTAINER_MOUNT, KEYSTORE_VOLUME_NAME, TRUSTSTORE_VOLUME_NAME},
},
service::{
build_rolegroup_headless_service, build_rolegroup_metrics_service,
build_rolegroup_headless_service, build_rolegroup_metrics_service, metrics_service_port,
rolegroup_headless_service_name,
},
};
Expand Down Expand Up @@ -562,22 +562,23 @@ pub async fn reconcile_nifi(
)
.await?;

if resolved_product_image.product_version.starts_with("1.") {
let rg_metrics_service = build_rolegroup_metrics_service(
nifi,
&rolegroup,
role_group_service_recommended_labels,
role_group_service_selector.into(),
)
.context(ServiceConfigurationSnafu)?;
let rg_metrics_service = build_rolegroup_metrics_service(
nifi,
&rolegroup,
role_group_service_recommended_labels,
role_group_service_selector.into(),
vec![metrics_service_port(
&resolved_product_image.product_version,
)],
)
.context(ServiceConfigurationSnafu)?;

cluster_resources
.add(client, rg_metrics_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
})?;
}
cluster_resources
.add(client, rg_metrics_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
})?;

cluster_resources
.add(client, rg_headless_service)
Expand Down
29 changes: 21 additions & 8 deletions rust/operator-binary/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub fn build_rolegroup_metrics_service(
role_group_ref: &RoleGroupRef<v1alpha1::NifiCluster>,
object_labels: ObjectLabels<v1alpha1::NifiCluster>,
selector: BTreeMap<String, String>,
ports: Vec<ServicePort>,
) -> Result<Service, Error> {
Ok(Service {
metadata: ObjectMetaBuilder::new()
Expand All @@ -86,7 +87,7 @@ pub fn build_rolegroup_metrics_service(
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
ports: Some(metrics_service_ports()),
ports: Some(ports),
selector: Some(selector),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
Expand All @@ -104,13 +105,25 @@ fn headless_service_ports() -> Vec<ServicePort> {
}]
}

fn metrics_service_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
/// Returns the metrics port based on the NiFi version
/// V1: Uses extra port via JMX exporter
/// V2: Uses NiFi HTTP(S) port for metrics
pub fn metrics_service_port(product_version: &str) -> ServicePort {
if product_version.starts_with("1.") {
ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}
} else {
ServicePort {
name: Some(HTTPS_PORT_NAME.into()),
port: HTTPS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}
}
}

/// Returns the metrics rolegroup service name `<cluster>-<role>-<rolegroup>-<METRICS_SERVICE_SUFFIX>`.
Expand Down
15 changes: 13 additions & 2 deletions tests/templates/kuttl/external-access/30-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ status:
readyReplicas: 2
replicas: 2
---
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
Expand All @@ -25,11 +24,23 @@ status:
currentHealthy: 2
disruptionsAllowed: 1
---
---
apiVersion: v1
kind: Service
metadata:
name: test-nifi-node
spec:
type: NodePort # external-unstable
---
apiVersion: v1
kind: Service
metadata:
name: test-nifi-node-default-headless
spec:
type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
name: test-nifi-node-default-metrics
spec:
type: ClusterIP