diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c27ac6..914e2f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ All notable changes to this project will be documented in this file. ### Added - Support configuring the scaler reconcile interval ([#61]). +- Add simple web-based dashboard that shows the current state and query counts of all clusters. + This makes it easier to debug state transitions of clusters ([#62]). ### Changed @@ -18,6 +20,7 @@ All notable changes to this project will be documented in this file. [#57]: https://github.com/stackabletech/trino-lb/pull/57 [#61]: https://github.com/stackabletech/trino-lb/pull/61 +[#62]: https://github.com/stackabletech/trino-lb/pull/62 ## [0.3.2] - 2024-08-20 diff --git a/Cargo.lock b/Cargo.lock index ccdc55b..0203e4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -122,6 +122,50 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "askama" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b79091df18a97caea757e28cd2d5fda49c6cd4bd01ddffd7ff01ace0c0ad2c28" +dependencies = [ + "askama_derive", + "askama_escape", + "humansize", + "num-traits", + "percent-encoding", +] + +[[package]] +name = "askama_derive" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19fe8d6cb13c4714962c072ea496f3392015f0989b1a2847bb4b2d9effd71d83" +dependencies = [ + "askama_parser", + "basic-toml", + "mime", + "mime_guess", + "proc-macro2", + "quote", + "serde", + "syn", +] + +[[package]] +name = "askama_escape" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" + +[[package]] +name = "askama_parser" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"acb1161c6b64d1c3d83108213c2a2533a342ac225aabd0bda218278c2ddb00c0" +dependencies = [ + "nom", +] + [[package]] name = "async-broadcast" version = "0.7.2" @@ -379,6 +423,15 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "basic-toml" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "823388e228f614e9558c6804262db37960ec8821856535f5c3f59913140558f8" +dependencies = [ + "serde", +] + [[package]] name = "bigdecimal" version = "0.3.1" @@ -1322,6 +1375,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + [[package]] name = "humantime" version = "2.1.0" @@ -2001,6 +2063,22 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.3" @@ -2021,6 +2099,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -4178,6 +4266,7 @@ dependencies = [ name = "trino-lb" version = "0.3.2" dependencies = [ + "askama", "axum 0.8.1", "axum-server", "chrono", @@ -4294,6 +4383,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-bidi" version = "0.3.18" diff --git a/Cargo.toml b/Cargo.toml index 35eff2e..1cafc03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ edition = "2021" repository = "https://github.com/stackabletech/trino-lb" [workspace.dependencies] +askama = "0.12" axum = { version = "0.8", features = ["tracing"] } # If we use the feature "tls-rustls" it will pull in the "aws-lc-rs" crate, which as of 2024-08-16 I did not get to build in the "make run-dev" workflow :/ axum-server = { version = "0.7", features = ["tls-rustls-no-provider"] } diff --git a/trino-lb-core/src/config.rs b/trino-lb-core/src/config.rs index 767a9a5..b61cf11 100644 --- a/trino-lb-core/src/config.rs +++ b/trino-lb-core/src/config.rs @@ -137,7 +137,7 @@ impl Default for TrinoLbPortsConfig { #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields, rename_all = "camelCase")] pub enum PersistenceConfig { - InMemory, + InMemory {}, Redis(RedisConfig), Postgres(PostgresConfig), } diff --git a/trino-lb-core/src/trino_cluster.rs b/trino-lb-core/src/trino_cluster.rs index 7a5c1cd..8bf4cef 100644 --- a/trino-lb-core/src/trino_cluster.rs +++ b/trino-lb-core/src/trino_cluster.rs @@ -1,4 +1,4 @@ -use std::time::SystemTime; +use std::{fmt::Display, time::SystemTime}; use 
serde::{Deserialize, Serialize}; use strum::IntoStaticStr; @@ -23,6 +23,20 @@ pub enum ClusterState { Deactivated, } +impl Display for ClusterState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ClusterState::Unknown => f.write_str("Unknown"), + ClusterState::Stopped => f.write_str("Stopped"), + ClusterState::Starting => f.write_str("Starting"), + ClusterState::Ready => f.write_str("Ready"), + ClusterState::Draining { .. } => f.write_str("Draining"), + ClusterState::Terminating => f.write_str("Terminating"), + ClusterState::Deactivated => f.write_str("Deactivated"), + } + } +} + impl ClusterState { pub fn start(&self) -> Self { match self { diff --git a/trino-lb/Cargo.toml b/trino-lb/Cargo.toml index 2bfa981..b96a0c2 100644 --- a/trino-lb/Cargo.toml +++ b/trino-lb/Cargo.toml @@ -15,6 +15,7 @@ default-run = "trino-lb" trino-lb-core = { path = "../trino-lb-core" } trino-lb-persistence = { path = "../trino-lb-persistence" } +askama.workspace = true axum-server.workspace = true axum.workspace = true chrono.workspace = true @@ -22,6 +23,7 @@ clap.workspace = true enum_dispatch.workspace = true futures.workspace = true http.workspace = true +indoc.workspace = true k8s-openapi.workspace = true kube.workspace = true main_error.workspace = true diff --git a/trino-lb/src/cluster_group_manager.rs b/trino-lb/src/cluster_group_manager.rs index 75399a0..126fde9 100644 --- a/trino-lb/src/cluster_group_manager.rs +++ b/trino-lb/src/cluster_group_manager.rs @@ -13,7 +13,7 @@ use tracing::{debug, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use trino_lb_core::{ config::Config, sanitization::Sanitize, trino_api::TrinoQueryApiResponse, - trino_query::TrinoQuery, + trino_cluster::ClusterState, trino_query::TrinoQuery, }; use trino_lb_persistence::{Persistence, PersistenceImplementation}; use url::Url; @@ -72,13 +72,19 @@ pub struct ClusterGroupManager { http_client: Client, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, 
Hash, Eq, PartialEq)] pub struct TrinoCluster { pub name: String, pub max_running_queries: u64, pub endpoint: Url, } +#[derive(Clone, Debug)] +pub struct ClusterStats { + pub state: ClusterState, + pub query_counter: u64, +} + pub enum SendToTrinoResponse { HandedOver { trino_query_api_response: TrinoQueryApiResponse, @@ -251,13 +257,36 @@ impl ClusterGroupManager { Ok(()) } - /// Tries to find the best cluster from the specified `cluster_group`. If all clusters of the requested group have reached their - /// configured query limit, this function returns [`None`]. + /// Tries to find the best cluster from the specified `cluster_group`. If all clusters of the requested group have + /// reached their configured query limit, this function returns [`None`]. #[instrument(skip(self))] pub async fn try_find_best_cluster_for_group( &self, cluster_group: &str, ) -> Result, Error> { + let cluster_stats = self + .get_cluster_stats_for_cluster_group(cluster_group) + .await?; + + let cluster_with_min_queries = cluster_stats + .into_iter() + // Only send queries to clusters that are actually able to accept them + .filter(|(_, stats)| stats.state.ready_to_accept_queries()) + // Only send queries to clusters that are not already full + .filter(|(cluster, stats)| stats.query_counter < cluster.max_running_queries) + // Pick the emptiest cluster + .min_by_key(|(_, stats)| stats.query_counter) + .map(|(cluster, _)| cluster); + + Ok(cluster_with_min_queries) + } + + /// Collect statistics (such as state and query counter) for all Trino clusters in a given clusterGroup + #[instrument(skip(self))] + pub async fn get_cluster_stats_for_cluster_group( + &self, + cluster_group: &str, + ) -> Result, Error> { let clusters = self .groups .get(cluster_group) @@ -273,13 +302,6 @@ impl ClusterGroupManager { .await .context(ReadCurrentClusterStateForClusterGroupFromPersistenceSnafu { cluster_group })?; - let clusters = clusters - .iter() - .zip(cluster_states) - .filter(|(_, state)| 
state.ready_to_accept_queries()) - .map(|(c, _)| c) - .collect::>(); - let cluster_query_counters = try_join_all( clusters .iter() @@ -288,21 +310,42 @@ impl ClusterGroupManager { .await .context(GetQueryCounterForGroupSnafu { cluster_group })?; - let debug_output = clusters + let cluster_stats = clusters .iter() - .map(|c| &c.name) - .zip(cluster_query_counters.iter()) - .collect::>(); - debug!(query_counters = ?debug_output, "Clusters had the following query counters"); - - let cluster_with_min_queries = clusters - .into_iter() + .zip(cluster_states) .zip(cluster_query_counters) - .filter(|(cluster, counter)| *counter < cluster.max_running_queries) - .min_by_key(|(_, counter)| *counter) - .map(|(c, _)| c); + .map(|((trino_cluster, state), query_counter)| { + ( + trino_cluster, + ClusterStats { + state, + query_counter, + }, + ) + }) + .collect(); + + debug!(?cluster_stats, "Clusters had the following stats"); + + Ok(cluster_stats) + } - Ok(cluster_with_min_queries) + /// Get the stats for all clusters, regardless of their cluster group membership + pub async fn get_all_cluster_stats( + &self, + ) -> Result, Error> { + let cluster_stats = try_join_all( + self.groups + .keys() + .map(|cluster_group| self.get_cluster_stats_for_cluster_group(cluster_group)), + ) + .await?; + + let mut all_cluster_stats = HashMap::new(); + for cluster_stat in cluster_stats { + all_cluster_stats.extend(cluster_stat); + } + Ok(all_cluster_stats) } } diff --git a/trino-lb/src/http_server/mod.rs b/trino-lb/src/http_server/mod.rs index fcc6553..eeb94e8 100644 --- a/trino-lb/src/http_server/mod.rs +++ b/trino-lb/src/http_server/mod.rs @@ -94,6 +94,7 @@ pub async fn start_http_server( }); let app = Router::new() + .route("/", get(|| async { Redirect::permanent("/ui/index.html") })) .route("/v1/statement", post(v1::statement::post_statement)) .route( "/v1/statement/queued_in_trino_lb/{query_id}/{sequence_number}", @@ -119,6 +120,7 @@ pub async fn start_http_server( 
"/v1/statement/executing/{query_id}/{slug}/{token}", delete(v1::statement::delete_trino_executing_statement), ) + .route("/ui/index.html", get(ui::index::get_ui_index)) .route("/ui/query.html", get(ui::query::get_ui_query)) .with_state(app_state); diff --git a/trino-lb/src/http_server/ui/index.rs b/trino-lb/src/http_server/ui/index.rs new file mode 100644 index 0000000..23378cd --- /dev/null +++ b/trino-lb/src/http_server/ui/index.rs @@ -0,0 +1,70 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use askama::Template; +use axum::{ + extract::State, + response::{Html, IntoResponse, Response}, +}; +use http::StatusCode; +use opentelemetry::KeyValue; +use snafu::{ResultExt, Snafu}; +use tracing::{instrument, warn}; + +use crate::{ + cluster_group_manager::{self, ClusterStats}, + http_server::AppState, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("Failed to get all cluster states"))] + GetAllClusterStates { + source: cluster_group_manager::Error, + }, + + #[snafu(display("Failed to render template"))] + RenderTemplate { source: askama::Error }, +} + +impl IntoResponse for Error { + fn into_response(self) -> Response { + warn!(error = ?self, "Error while processing ui query request"); + let status_code = match self { + Error::GetAllClusterStates { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Error::RenderTemplate { .. 
} => StatusCode::INTERNAL_SERVER_ERROR, + }; + (status_code, format!("{self}")).into_response() + } +} + +#[derive(Template)] +#[template(path = "index.html")] +struct IndexTemplate<'a> { + cluster_stats: &'a BTreeMap<&'a String, ClusterStats>, +} + +/// Show the current state and query counter of every cluster +#[instrument(name = "GET /ui/index.html", skip(state))] +pub async fn get_ui_index(State(state): State>) -> Result, Error> { + state + .metrics + .http_counter + .add(1, &[KeyValue::new("resource", "get_ui_index")]); + + let cluster_stats = state + .cluster_group_manager + .get_all_cluster_stats() + .await + .context(GetAllClusterStatesSnafu)?; + + // Sort the clusters alphabetically + let cluster_stats: BTreeMap<_, _> = cluster_stats + .into_iter() + .map(|(cluster, stats)| (&cluster.name, stats)) + .collect(); + + let index = IndexTemplate { + cluster_stats: &cluster_stats, + }; + index.render().context(RenderTemplateSnafu).map(Html) +} diff --git a/trino-lb/src/http_server/ui/mod.rs b/trino-lb/src/http_server/ui/mod.rs index 67350db..4ae84d9 100644 --- a/trino-lb/src/http_server/ui/mod.rs +++ b/trino-lb/src/http_server/ui/mod.rs @@ -1 +1,2 @@ +pub mod index; pub mod query; diff --git a/trino-lb/templates/index.html b/trino-lb/templates/index.html new file mode 100644 index 0000000..327123f --- /dev/null +++ b/trino-lb/templates/index.html @@ -0,0 +1,16 @@ +

Cluster stats

+ + + + + + + + {% for entry in cluster_stats %} + + + + + + {% endfor %} +
ClusterStateQuery counter
{{entry.0}}{{entry.1.state}}{{entry.1.query_counter}}