diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a2d5602..6a487685 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ All notable changes to this project will be documented in this file. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. - Add Listener integration for Trino ([#753]). - Add support for Trino 476 ([#755]). +- Add internal headless service in addition to the metrics service ([#766]). ### Changed @@ -56,6 +57,7 @@ All notable changes to this project will be documented in this file. [#753]: https://github.com/stackabletech/trino-operator/pull/753 [#755]: https://github.com/stackabletech/trino-operator/pull/755 [#760]: https://github.com/stackabletech/trino-operator/pull/760 +[#766]: https://github.com/stackabletech/trino-operator/pull/766 ## [25.3.0] - 2025-03-21 diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index 2b610599..7b37f778 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -36,8 +36,7 @@ use stackable_operator::{ apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ ConfigMap, ConfigMapVolumeSource, ContainerPort, EnvVar, EnvVarSource, ExecAction, - HTTPGetAction, Probe, Secret, SecretKeySelector, Service, ServicePort, ServiceSpec, - Volume, + HTTPGetAction, Probe, Secret, SecretKeySelector, Volume, }, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, @@ -47,7 +46,7 @@ use stackable_operator::{ core::{DeserializeGuard, error_boundary}, runtime::{controller::Action, reflector::ObjectRef}, }, - kvp::{Annotation, Label, Labels, ObjectLabels}, + kvp::{Annotation, Labels, ObjectLabels}, logging::controller::ReconcilerError, memory::{BinaryMultiple, MemoryQuantity}, product_config_utils::{ @@ -88,16 +87,17 @@ use crate::{ authentication::resolve_authentication_classes, catalog, discovery::{TrinoDiscovery, TrinoDiscoveryProtocol, TrinoPodRef}, - 
rolegroup_metrics_service_name, v1alpha1, + rolegroup_headless_service_name, v1alpha1, }, listener::{ LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener, build_group_listener_pvc, - group_listener_name, + group_listener_name, secret_volume_listener_scope, }, operations::{ add_graceful_shutdown_config, graceful_shutdown_config_properties, pdb::add_pdbs, }, product_logging::{get_log_properties, get_vector_toml}, + service::{build_rolegroup_headless_service, build_rolegroup_metrics_service}, }; pub struct Ctx { @@ -357,6 +357,9 @@ pub enum Error { #[snafu(display("failed to configure listener"))] ListenerConfiguration { source: crate::listener::Error }, + + #[snafu(display("failed to configure service"))] + ServiceConfiguration { source: crate::service::Error }, } type Result = std::result::Result; @@ -482,8 +485,37 @@ pub async fn reconcile_trino( .merged_config(&trino_role, &role_group_ref, &catalog_definitions) .context(FailedToResolveConfigSnafu)?; - let rg_service = - build_rolegroup_service(trino, &resolved_product_image, &role_group_ref)?; + let role_group_service_recommended_labels = build_recommended_labels( + trino, + &resolved_product_image.app_version_label, + &role_group_ref.role, + &role_group_ref.role_group, + ); + + let role_group_service_selector = Labels::role_group_selector( + trino, + APP_NAME, + &role_group_ref.role, + &role_group_ref.role_group, + ) + .context(LabelBuildSnafu)?; + + let rg_headless_service = build_rolegroup_headless_service( + trino, + &role_group_ref, + role_group_service_recommended_labels.clone(), + role_group_service_selector.clone().into(), + ) + .context(ServiceConfigurationSnafu)?; + + let rg_metrics_service = build_rolegroup_metrics_service( + trino, + &role_group_ref, + role_group_service_recommended_labels, + role_group_service_selector.into(), + ) + .context(ServiceConfigurationSnafu)?; + let rg_configmap = build_rolegroup_config_map( trino, &resolved_product_image, @@ -515,7 +547,14 @@ pub async fn 
reconcile_trino( )?; cluster_resources - .add(client, rg_service) + .add(client, rg_headless_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: role_group_ref.clone(), + })?; + + cluster_resources + .add(client, rg_metrics_service) .await .with_context(|_| ApplyRoleGroupServiceSnafu { rolegroup: role_group_ref.clone(), @@ -834,7 +873,7 @@ fn build_rolegroup_catalog_config_map( /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// /// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the -/// corresponding [`Service`] (from [`build_rolegroup_service`]). +/// corresponding [`stackable_operator::k8s_openapi::api::core::v1::Service`] (from [`build_rolegroup_headless_service`]). #[allow(clippy::too_many_arguments)] fn build_rolegroup_statefulset( trino: &v1alpha1::TrinoCluster, @@ -930,6 +969,7 @@ fn build_rolegroup_statefulset( // add volume mounts depending on the client tls, internal tls, catalogs and authentication tls_volume_mounts( trino, + trino_role, &mut pod_builder, &mut cb_prepare, &mut cb_trino, @@ -1193,7 +1233,7 @@ fn build_rolegroup_statefulset( ), ..LabelSelector::default() }, - service_name: Some(rolegroup_metrics_service_name( + service_name: Some(rolegroup_headless_service_name( &role_group_ref.object_name(), )), template: pod_template, @@ -1204,53 +1244,6 @@ fn build_rolegroup_statefulset( }) } -/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup -/// -/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. 
-fn build_rolegroup_service( - trino: &v1alpha1::TrinoCluster, - resolved_product_image: &ResolvedProductImage, - role_group_ref: &RoleGroupRef, -) -> Result { - Ok(Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(trino) - .name(rolegroup_metrics_service_name( - &role_group_ref.object_name(), - )) - .ownerreference_from_resource(trino, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - trino, - &resolved_product_image.app_version_label, - &role_group_ref.role, - &role_group_ref.role_group, - )) - .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) - .build(), - spec: Some(ServiceSpec { - // Internal communication does not need to be exposed - type_: Some("ClusterIP".to_string()), - cluster_ip: Some("None".to_string()), - ports: Some(service_ports()), - selector: Some( - Labels::role_group_selector( - trino, - APP_NAME, - &role_group_ref.role, - &role_group_ref.role_group, - ) - .context(LabelBuildSnafu)? 
- .into(), - ), - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }), - status: None, - }) -} - pub fn error_policy( _obj: Arc>, error: &Error, @@ -1409,15 +1402,6 @@ fn get_random_base64() -> String { openssl::base64::encode_block(&buf) } -fn service_ports() -> Vec { - vec![ServicePort { - name: Some(METRICS_PORT_NAME.to_string()), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }] -} - fn container_ports(trino: &v1alpha1::TrinoCluster) -> Vec { let mut ports = vec![ContainerPort { name: Some(METRICS_PORT_NAME.to_string()), @@ -1530,14 +1514,22 @@ fn create_tls_volume( volume_name: &str, tls_secret_class: &str, requested_secret_lifetime: &Duration, + listener_scope: Option, ) -> Result { + let mut secret_volume_source_builder = SecretOperatorVolumeSourceBuilder::new(tls_secret_class); + + secret_volume_source_builder + .with_pod_scope() + .with_format(SecretFormat::TlsPkcs12) + .with_auto_tls_cert_lifetime(*requested_secret_lifetime); + + if let Some(listener_scope) = &listener_scope { + secret_volume_source_builder.with_listener_volume_scope(listener_scope); + } + Ok(VolumeBuilder::new(volume_name) .ephemeral( - SecretOperatorVolumeSourceBuilder::new(tls_secret_class) - .with_pod_scope() - .with_node_scope() - .with_format(SecretFormat::TlsPkcs12) - .with_auto_tls_cert_lifetime(*requested_secret_lifetime) + secret_volume_source_builder .build() .context(TlsCertSecretClassVolumeBuildSnafu)?, ) @@ -1546,6 +1538,7 @@ fn create_tls_volume( fn tls_volume_mounts( trino: &v1alpha1::TrinoCluster, + trino_role: &TrinoRole, pod_builder: &mut PodBuilder, cb_prepare: &mut ContainerBuilder, cb_trino: &mut ContainerBuilder, @@ -1564,6 +1557,8 @@ fn tls_volume_mounts( "server-tls-mount", server_tls, requested_secret_lifetime, + // add listener + secret_volume_listener_scope(trino_role), )?) 
.context(AddVolumeSnafu)?; } @@ -1600,6 +1595,7 @@ fn tls_volume_mounts( "internal-tls-mount", internal_tls, requested_secret_lifetime, + None, )?) .context(AddVolumeSnafu)?; diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index a68bfddc..862e7846 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -119,6 +119,7 @@ pub const MAX_TRINO_LOG_FILES_SIZE: MemoryQuantity = MemoryQuantity { }; pub const METRICS_SERVICE_SUFFIX: &str = "metrics"; +pub const HEADLESS_SERVICE_SUFFIX: &str = "headless"; pub const JVM_HEAP_FACTOR: f32 = 0.8; @@ -837,7 +838,7 @@ impl v1alpha1::TrinoCluster { let ns = ns.clone(); (0..rolegroup.replicas.unwrap_or(0)).map(move |i| TrinoPodRef { namespace: ns.clone(), - role_group_service_name: rolegroup_metrics_service_name( + role_group_service_name: rolegroup_headless_service_name( &role_group_ref.object_name(), ), pod_name: format!( @@ -945,6 +946,11 @@ pub fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> Strin format!("{role_group_ref_object_name}-{METRICS_SERVICE_SUFFIX}") } +/// Returns the headless rolegroup service name `<cluster-name>-<role-name>-<rolegroup-name>-<headless-service-suffix>`.
+pub fn rolegroup_headless_service_name(role_group_ref_object_name: &str) -> String { + format!("{role_group_ref_object_name}-{HEADLESS_SERVICE_SUFFIX}") +} + fn extract_role_from_coordinator_config( fragment: Role, ) -> Role { diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs index 443a7a45..a41c3c98 100644 --- a/rust/operator-binary/src/listener.rs +++ b/rust/operator-binary/src/listener.rs @@ -83,6 +83,14 @@ pub fn group_listener_name(trino: &v1alpha1::TrinoCluster, role: &TrinoRole) -> } } +/// The listener volume name depending on the role +pub fn secret_volume_listener_scope(role: &TrinoRole) -> Option { + match role { + TrinoRole::Coordinator => Some(LISTENER_VOLUME_NAME.to_string()), + TrinoRole::Worker => None, + } +} + /// We only use the http/https port here and intentionally omit the metrics one. fn listener_ports(trino: &v1alpha1::TrinoCluster) -> Vec { let name = trino.exposed_protocol().to_string(); diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 25a44ba4..43c3125c 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -8,6 +8,7 @@ mod crd; mod listener; mod operations; mod product_logging; +mod service; use std::sync::Arc; diff --git a/rust/operator-binary/src/service.rs b/rust/operator-binary/src/service.rs new file mode 100644 index 00000000..39595443 --- /dev/null +++ b/rust/operator-binary/src/service.rs @@ -0,0 +1,117 @@ +use std::collections::BTreeMap; + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::meta::ObjectMetaBuilder, + k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}, + kvp::{Label, ObjectLabels}, + role_utils::RoleGroupRef, +}; + +use crate::crd::{ + METRICS_PORT, METRICS_PORT_NAME, rolegroup_headless_service_name, + rolegroup_metrics_service_name, v1alpha1, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + 
ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build Metadata"))] + MetadataBuild { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build Labels"))] + LabelBuild { + source: stackable_operator::kvp::LabelError, + }, +} + +/// The rolegroup headless [`Service`] is a service that allows direct access to the instances of a certain rolegroup +/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. +pub fn build_rolegroup_headless_service( + trino: &v1alpha1::TrinoCluster, + role_group_ref: &RoleGroupRef, + object_labels: ObjectLabels, + selector: BTreeMap, +) -> Result { + Ok(Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(trino) + .name(rolegroup_headless_service_name( + &role_group_ref.object_name(), + )) + .ownerreference_from_resource(trino, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(object_labels) + .context(MetadataBuildSnafu)? + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports: Some(headless_service_ports(trino)), + selector: Some(selector), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }) +} + +/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label. +pub fn build_rolegroup_metrics_service( + trino: &v1alpha1::TrinoCluster, + role_group_ref: &RoleGroupRef, + object_labels: ObjectLabels, + selector: BTreeMap, +) -> Result { + Ok(Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(trino) + .name(rolegroup_metrics_service_name( + &role_group_ref.object_name(), + )) + .ownerreference_from_resource(trino, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? 
+ .with_recommended_labels(object_labels) + .context(MetadataBuildSnafu)? + .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports: Some(metrics_service_ports()), + selector: Some(selector), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }) +} + +fn headless_service_ports(trino: &v1alpha1::TrinoCluster) -> Vec { + let name = trino.exposed_protocol().to_string(); + let port = trino.exposed_port().into(); + + vec![ServicePort { + name: Some(name), + port, + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} + +fn metrics_service_ports() -> Vec { + vec![ServicePort { + name: Some(METRICS_PORT_NAME.to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} diff --git a/tests/templates/kuttl/authentication/20-assert.yaml b/tests/templates/kuttl/authentication/20-assert.yaml index 8f38517c..ea54a3ac 100644 --- a/tests/templates/kuttl/authentication/20-assert.yaml +++ b/tests/templates/kuttl/authentication/20-assert.yaml @@ -4,10 +4,10 @@ kind: TestAssert timeout: 300 commands: # file - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u test_user_1 -p test_user_1 -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u test_user_2_other -p test_user_2_other -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u test_user_1 -p test_user_1 -c trino-coordinator-default-headless.$NAMESPACE.svc.cluster.local -w 1 + - script: kubectl exec -n $NAMESPACE 
trino-test-helper-0 -- python /tmp/check-active-workers.py -u test_user_2_other -p test_user_2_other -c trino-coordinator-default-headless.$NAMESPACE.svc.cluster.local -w 1 # ldap - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u integrationtest -p integrationtest -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u integrationtest-other -p integrationtest-other -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u integrationtest -p integrationtest -c trino-coordinator-default-headless.$NAMESPACE.svc.cluster.local -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u integrationtest-other -p integrationtest-other -c trino-coordinator-default-headless.$NAMESPACE.svc.cluster.local -w 1 # oidc/oauth2 - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-oauth-login.py https://trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local:8443/ui/ + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-oauth-login.py https://trino-coordinator-default-headless.$NAMESPACE.svc.cluster.local:8443/ui/ diff --git a/tests/templates/kuttl/authentication/31-assert.yaml b/tests/templates/kuttl/authentication/31-assert.yaml index 5b2948fa..98b70e5b 100644 --- a/tests/templates/kuttl/authentication/31-assert.yaml +++ b/tests/templates/kuttl/authentication/31-assert.yaml @@ -5,4 +5,4 @@ timeout: 600 commands: # file # new user? 
- - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -c trino-coordinator-default-headless.$NAMESPACE.svc.cluster.local -w 1 diff --git a/tests/templates/kuttl/authentication/33-assert.yaml b/tests/templates/kuttl/authentication/33-assert.yaml index 5794c83e..f8c36c72 100644 --- a/tests/templates/kuttl/authentication/33-assert.yaml +++ b/tests/templates/kuttl/authentication/33-assert.yaml @@ -5,4 +5,4 @@ timeout: 600 commands: # We use the check-active-workers script for the login. Since we do want to wait until we cannot log in anymore # we flip the return value in the end. - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1; if [ $? -eq 0 ]; then exit 1; fi + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -c trino-coordinator-default-headless.$NAMESPACE.svc.cluster.local -w 1; if [ $? 
-eq 0 ]; then exit 1; fi diff --git a/tests/templates/kuttl/listener/10-assert.yaml b/tests/templates/kuttl/listener/10-assert.yaml index 24ae199a..1dea84a0 100644 --- a/tests/templates/kuttl/listener/10-assert.yaml +++ b/tests/templates/kuttl/listener/10-assert.yaml @@ -42,18 +42,32 @@ kind: Service metadata: name: test-trino-coordinator spec: - type: NodePort # external-unstable - by listener op + type: NodePort # by listener op +--- +apiVersion: v1 +kind: Service +metadata: + name: test-trino-coordinator-default-headless +spec: + type: ClusterIP # by trino op --- apiVersion: v1 kind: Service metadata: name: test-trino-coordinator-default-metrics spec: - type: ClusterIP # cluster-internal - by trino op + type: ClusterIP # by trino op +--- +apiVersion: v1 +kind: Service +metadata: + name: test-trino-worker-default-headless +spec: + type: ClusterIP # by trino op --- apiVersion: v1 kind: Service metadata: name: test-trino-worker-default-metrics spec: - type: ClusterIP # cluster-internal - by trino op + type: ClusterIP # by trino op diff --git a/tests/templates/kuttl/tls/check-tls.py b/tests/templates/kuttl/tls/check-tls.py index dc7e7515..96de7d9a 100755 --- a/tests/templates/kuttl/tls/check-tls.py +++ b/tests/templates/kuttl/tls/check-tls.py @@ -65,7 +65,7 @@ def read_json(config_path): "/tmp/test-config.json" ) # config file to indicate our test script if auth / tls is used or not coordinator_host = ( - "trino-coordinator-default-metrics." + namespace + ".svc.cluster.local" + f"trino-coordinator-default-headless.{namespace}.svc.cluster.local" ) trusted_ca = "/stackable/trusted/ca.crt" # will be mounted from secret op untrusted_ca = "/stackable/untrusted-cert.crt" # some random CA