diff --git a/Cargo.lock b/Cargo.lock index 2afe4eee..50223958 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -882,7 +882,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe6d2e5af09e8c8ad56c969f2157a3d4238cebc7c55f0a517728c38f7b200f81" dependencies = [ - "unicode-width 0.1.14", + "unicode-width 0.2.1", ] [[package]] @@ -1680,7 +1680,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -2159,7 +2159,7 @@ dependencies = [ "log", "presser", "thiserror 1.0.69", - "windows 0.57.0", + "windows 0.58.0", ] [[package]] @@ -3548,34 +3548,88 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.28.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", "js-sys", "pin-project-lite", "thiserror 2.0.18", +] + +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2" +dependencies = [ + "opentelemetry", "tracing", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.14.1", + "thiserror 2.0.18", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.14.1", + "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.28.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fb3a2f78c2d55362cd6c313b8abedfbc0142ab3c2676822068fd2ab7d51f9b7" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" [[package]] name = "opentelemetry_sdk" -version = "0.28.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", "futures-util", "opentelemetry", + "percent-encoding", + "rand 0.9.2", "thiserror 2.0.18", ] @@ -4087,7 +4141,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap", @@ -4107,8 +4161,8 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ - "heck 0.4.1", - "itertools 0.11.0", + "heck 0.5.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -4143,7 +4197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn", @@ -4315,7 +4369,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -4671,7 +4725,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5418,7 +5472,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.1.3", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5902,9 +5956,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -5914,9 +5968,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -5925,14 +5979,28 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc" +dependencies = [ + "js-sys", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" @@ -5945,9 +6013,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ "matchers", "nu-ansi-term", @@ -6297,6 +6365,11 @@ dependencies = [ "http", "http-body-util", "humantime", + "opentelemetry", + "opentelemetry-appender-tracing", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "pbjson-build 0.8.0", "rcgen", "reqwest", @@ -6311,6 +6384,7 @@ dependencies = [ "tokio", "tonic-prost-build", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "url", "uuid", @@ -6342,8 +6416,7 @@ dependencies = [ "oci-client 0.15.0", "oci-wasm 0.3.0", "opentelemetry", - "opentelemetry-semantic-conventions", - "opentelemetry_sdk", + "opentelemetry-http", "pbjson 0.8.0", "pbjson-build 0.8.0", "pbjson-types 0.8.0", @@ -7390,7 +7463,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0978bf7171b3d90bac376700cb56d606feb40f251a475a5d6634613564460b22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.60.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8be7ff6c..93054504 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,11 @@ etcetera = { workspace = true } figment = { workspace = true, features = ["json", "env", "yaml", "toml"] } flate2 = { workspace = true, features = ["rust_backend"] } humantime = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } +opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace", "logs"] } +opentelemetry-appender-tracing = { workspace = true } reqwest = { workspace = true, features = ["json", "rustls-tls"] } rustls = { workspace = true } semver = { workspace = true } @@ -90,6 +95,7 @@ tempfile = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["attributes"] } tracing-subscriber = { workspace = true, features = ["env-filter", "ansi", "time", "json"] } +tracing-opentelemetry = { workspace = true } url = { workspace = true } uuid = { workspace = true } wasm-metadata = { workspace = true } @@ -131,12 +137,13 @@ kube-derive = { version = "1", default-features = false } notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] } oci-client = { version = "0.15.0", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots"]} oci-wasm = { version = "0.3.0", default-features = false, features = ["rustls-tls"] } -opentelemetry = { version = "0.28", default-features = false } -opentelemetry-appender-tracing = { version = "0.28", default-features = false } -opentelemetry-otlp = { version = "0.28", default-features = false } -opentelemetry_sdk = { version = "0.28", default-features = false } -opentelemetry-semantic-conventions = { version = "0.28", default-features = false } -opentelemetry-stdout = { version = "0.28.0", default-features = false } +opentelemetry = { version = "0.31", default-features = false } +opentelemetry-appender-tracing = { version = "0.31", default-features = false } +opentelemetry-otlp = { version = "0.31", default-features = false } +opentelemetry-http = { version = "0.31", default-features = false } +opentelemetry_sdk = { version = "0.31", default-features = false } +opentelemetry-semantic-conventions = { version = "0.31", default-features = false } +opentelemetry-stdout = { version = "0.31.0", default-features = false } pbjson = { version = "0.8.0", default-features = false } pbjson-types = { version = "0.8.0", default-features = false } pbjson-build = { version = "0.8.0", default-features = false } @@ -164,6 +171,7 @@ tonic = { version = "0.14", default-features = false } tonic-prost = { version = "0.14", default-features = false } tonic-prost-build = { version = "0.14", default-features = false } tracing = { version = "0.1.41", default-features = false, features = ["attributes"] } +tracing-opentelemetry = { version = "0.32", default-features = false } tracing-subscriber = { version = "0.3.19", default-features = false, features = ["env-filter", "ansi", "time", "json"] } url = { version = "2.5", default-features = false } uuid = { version = "1.17.0", default-features = false } diff --git a/crates/wash-runtime/Cargo.toml b/crates/wash-runtime/Cargo.toml index d6d7aa9b..b02d7c56 100644 --- a/crates/wash-runtime/Cargo.toml +++ b/crates/wash-runtime/Cargo.toml @@ -62,11 +62,8 @@ pbjson = { workspace = true, default-features = true } pbjson-types = { workspace = true, default-features = true } prost = { workspace = true, default-features = true } tonic-prost = { workspace = true, default-features = true } -opentelemetry = { workspace = true } -opentelemetry-semantic-conventions = { workspace = true, features = [ - "semconv_experimental", -] } -opentelemetry_sdk = { workspace = true } +opentelemetry = { workspace = true, features = ["trace", "metrics", "logs"] } +opentelemetry-http = { workspace = true } wasi-graphics-context-wasmtime = { git = "https://github.com/wasi-gfx/wasi-gfx-runtime.git", rev = "067d92d0eabd264e15486b9cecbb9f7020257b7b", optional = true } wasi-webgpu-wasmtime = { git = "https://github.com/wasi-gfx/wasi-gfx-runtime.git", rev = "067d92d0eabd264e15486b9cecbb9f7020257b7b", optional = true } diff --git a/crates/wash-runtime/src/engine/mod.rs b/crates/wash-runtime/src/engine/mod.rs index 4e7b177f..574af5b8 100644 --- a/crates/wash-runtime/src/engine/mod.rs +++ b/crates/wash-runtime/src/engine/mod.rs @@ -40,6 +40,7 @@ //! ``` use anyhow::{Context, bail}; +use tracing::instrument; use wasmtime::PoolingAllocationConfig; use wasmtime::component::{Component, Linker}; use wasmtime_wasi::sockets::loopback; @@ -202,6 +203,7 @@ impl Engine { )) } + #[instrument(name = "initialize_service", skip_all)] fn initialize_service( &self, workload_id: impl AsRef, @@ -268,6 +270,7 @@ impl Engine { /// Initialize a component that is a part of a workload, add wasi@0.2 interfaces (and /// wasi:http if the `http` feature is enabled) to the linker. + #[instrument(name = "initialize_workload_component", skip_all, fields(component.name = %component.name))] fn initialize_workload_component( &self, workload_id: impl AsRef, @@ -278,8 +281,11 @@ impl Engine { loopback: Arc>, ) -> anyhow::Result { // Create a wasmtime component from the bytes - let wasmtime_component = Component::new(&self.inner, component.bytes) - .context("failed to create component from bytes")?; + let wasmtime_component = { + let _span = tracing::span!(tracing::Level::INFO, "parse_component_bytes").entered(); + Component::new(&self.inner, component.bytes) + .context("failed to create component from bytes")? + }; // Create a linker for this component let mut linker: Linker = Linker::new(&self.inner); diff --git a/crates/wash-runtime/src/engine/workload.rs b/crates/wash-runtime/src/engine/workload.rs index e1ecedb8..fbe7d22a 100644 --- a/crates/wash-runtime/src/engine/workload.rs +++ b/crates/wash-runtime/src/engine/workload.rs @@ -10,7 +10,7 @@ use std::{ use anyhow::{Context as _, bail, ensure}; use tokio::{sync::RwLock, task::JoinHandle, time::timeout}; -use tracing::{debug, info, trace, warn}; +use tracing::{Instrument, debug, info, instrument, trace, warn}; use wasmtime::component::{ Component, Instance, InstancePre, Linker, ResourceAny, ResourceType, Val, types::ComponentItem, }; @@ -459,6 +459,7 @@ pub struct ResolvedWorkload { impl ResolvedWorkload { /// Executes the service, if present, and returns whether it was run. + #[instrument(name="execute_service", skip_all, fields(workload.id = self.id.as_ref(), workload.name = self.name.as_ref(), workload.namespace = self.namespace.as_ref()))] pub(crate) async fn execute_service(&mut self) -> anyhow::Result { let service = self .service @@ -522,6 +523,7 @@ impl ResolvedWorkload { &self.host_interfaces } + #[instrument(name="link_components", skip_all, fields(workload.id = self.id.as_ref(), workload.name = self.name.as_ref(), workload.namespace = self.namespace.as_ref()))] async fn link_components(&mut self) -> anyhow::Result<()> { // A map from component ID to its exported interfaces let mut interface_map: HashMap> = HashMap::new(); @@ -1110,6 +1112,7 @@ impl ResolvedWorkload { /// This should be called when stopping a workload to ensure proper cleanup /// of plugin resources. Errors from individual plugin unbind operations are /// logged but do not prevent the overall unbind from completing. + #[instrument(name="unbind_all_plugins", skip_all, fields(workload.id = self.id.as_ref(), workload.name = self.name.as_ref()))] pub async fn unbind_all_plugins(&self) -> anyhow::Result<()> { trace!( workload_id = self.id.as_ref(), @@ -1239,6 +1242,8 @@ impl UnresolvedWorkload { /// Bind this workload to the host plugins based on the requested /// interfaces. Returns a list of plugins and the component IDs they were bound to. + #[allow(clippy::type_complexity)] + #[instrument(skip_all)] pub async fn bind_plugins( &mut self, plugins: &HashMap<&'static str, Arc>, @@ -1333,9 +1338,16 @@ impl UnresolvedWorkload { "binding plugin to workload" ); + let bind_span = tracing::span!( + tracing::Level::INFO, + "plugin_on_workload_bind", + plugin_id = plugin_id, + ); + // Call on_workload_bind with the workload and all matched interfaces if let Err(e) = p .on_workload_bind(self, plugin_matched_interfaces.clone()) + .instrument(bind_span) .await { tracing::error!( @@ -1390,8 +1402,14 @@ impl UnresolvedWorkload { "binding plugin to workload item" ); + let item_bind_span = tracing::span!( + tracing::Level::INFO, + "plugin_on_workload_item_bind", + plugin_id = plugin_id, + ); if let Err(e) = p .on_workload_item_bind(&mut workload_item, matching_interfaces.clone()) + .instrument(item_bind_span) .await { tracing::error!( @@ -1488,6 +1506,7 @@ impl UnresolvedWorkload { /// - Plugin binding fails /// - Component linking fails /// - Plugin notification fails + #[instrument(name="resolve_workload", skip_all, fields(workload.id = self.id.as_ref(), workload.name = self.name.as_ref(), workload.namespace = self.namespace.as_ref()))] pub async fn resolve( mut self, plugins: Option<&HashMap<&'static str, Arc>>, diff --git a/crates/wash-runtime/src/host/http.rs b/crates/wash-runtime/src/host/http.rs index f2696231..7e7e6116 100644 --- a/crates/wash-runtime/src/host/http.rs +++ b/crates/wash-runtime/src/host/http.rs @@ -30,9 +30,10 @@ use crate::engine::workload::ResolvedWorkload; use crate::wit::WitInterface; use anyhow::{Context, ensure}; use hyper::server::conn::http1; +use opentelemetry::context::FutureExt; use tokio::net::TcpListener; use tokio::task::JoinHandle; -use tracing::{debug, error, info, warn}; +use tracing::{Instrument, debug, error, info, instrument, warn}; use wasmtime::Store; use wasmtime::component::InstancePre; use wasmtime_wasi_http::{ @@ -494,7 +495,11 @@ async fn run_http_server( let handles = handles_clone.clone(); let handler = handler_clone.clone(); async move { - handle_http_request(handler, req, handles).await + let extractor = opentelemetry_http::HeaderExtractor(req.headers()); + let remote_context = + opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)); + + handle_http_request(handler, req, handles).with_context(remote_context).await } }); @@ -547,6 +552,11 @@ fn error_response(status: u16) -> hyper::Response { } /// Handle individual HTTP requests by looking up workload and invoking component +#[instrument(skip_all, fields( + http.method = %req.method(), + http.uri = %req.uri(), + http.host = %req.headers().get(hyper::header::HOST).and_then(|h| h.to_str().ok()).unwrap_or("unknown"), +))] async fn handle_http_request( handler: Arc, req: hyper::Request, @@ -577,10 +587,20 @@ async fn handle_http_request( let response = match workload_handle { Some((handle, instance_pre, component_id)) => { - match invoke_component_handler(handle, instance_pre, &component_id, req).await { + let req_span = tracing::span!( + tracing::Level::INFO, + "invoke_component_handler", + workload.name = handle.name(), + workload.namespace = handle.namespace(), + workload.id = handle.id(), + ); + match invoke_component_handler(handle, instance_pre, &component_id, req) + .instrument(req_span) + .await + { Ok(resp) => resp, Err(e) => { - error!(err = ?e, host = %workload_id, "failed to invoke component"); + error!(err = ?e, "failed to invoke component"); // TODO: Add in the actual error message in the response body // .body(HyperOutgoingBody::new(e.to_string())) error_response(500) @@ -630,17 +650,20 @@ pub async fn handle_component_request( // Run the http request itself in a separate task so the task can // optionally continue to execute beyond after the initial // headers/response code are sent. - let task: JoinHandle> = tokio::task::spawn(async move { - // Run the http request itself by instantiating and calling the component - let proxy = pre.instantiate_async(&mut store).await?; + let task: JoinHandle> = tokio::task::spawn( + async move { + // Run the http request itself by instantiating and calling the component + let proxy = pre.instantiate_async(&mut store).await?; - proxy - .wasi_http_incoming_handler() - .call_handle(&mut store, req, out) - .await?; + proxy + .wasi_http_incoming_handler() + .call_handle(&mut store, req, out) + .await?; - Ok(()) - }); + Ok(()) + } + .in_current_span(), + ); match receiver.await { // If the client calls `response-outparam::set` then one of these @@ -674,13 +697,17 @@ async fn load_tls_config( ca_path: Option<&Path>, ) -> anyhow::Result { // Load certificate chain - let cert_data = tokio::fs::read(cert_path) - .await - .with_context(|| format!("Failed to read certificate file: {}", cert_path.display()))?; + let cert_data = tokio::fs::read(cert_path).await.context(format!( + "Failed to read certificate file: {}", + cert_path.display() + ))?; let mut cert_reader = std::io::Cursor::new(cert_data); let cert_chain: Vec> = certs(&mut cert_reader) .collect::, _>>() - .with_context(|| format!("Failed to parse certificate file: {}", cert_path.display()))?; + .context(format!( + "Failed to parse certificate file: {}", + cert_path.display() + ))?; ensure!( !cert_chain.is_empty(), @@ -689,29 +716,33 @@ async fn load_tls_config( ); // Load private key - let key_data = tokio::fs::read(key_path) - .await - .with_context(|| format!("Failed to read private key file: {}", key_path.display()))?; + let key_data = tokio::fs::read(key_path).await.context(format!( + "Failed to read private key file: {}", + key_path.display() + ))?; let mut key_reader = std::io::Cursor::new(key_data); let key = private_key(&mut key_reader) - .with_context(|| format!("Failed to parse private key file: {}", key_path.display()))? + .context(format!( + "Failed to parse private key file: {}", + key_path.display() + ))? .ok_or_else(|| anyhow::anyhow!("No private key found in file: {}", key_path.display()))?; // Create rustls server config let config = ServerConfig::builder() .with_no_client_auth() .with_single_cert(cert_chain, key) - .with_context(|| "Failed to create TLS configuration")?; + .context("Failed to create TLS configuration")?; // If CA is provided, configure client certificate verification if let Some(ca_path) = ca_path { let ca_data = tokio::fs::read(ca_path) .await - .with_context(|| format!("Failed to read CA file: {}", ca_path.display()))?; + .context(format!("Failed to read CA file: {}", ca_path.display()))?; let mut ca_reader = std::io::Cursor::new(ca_data); let ca_certs: Vec> = certs(&mut ca_reader) .collect::, _>>() - .with_context(|| format!("Failed to parse CA file: {}", ca_path.display()))?; + .context(format!("Failed to parse CA file: {}", ca_path.display()))?; ensure!( !ca_certs.is_empty(), diff --git a/crates/wash-runtime/src/host/mod.rs b/crates/wash-runtime/src/host/mod.rs index 864d2be7..751e0618 100644 --- a/crates/wash-runtime/src/host/mod.rs +++ b/crates/wash-runtime/src/host/mod.rs @@ -48,7 +48,7 @@ use std::time::Duration; use anyhow::{Context, bail}; use names::{Generator, Name}; use tokio::sync::RwLock; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, instrument, trace, warn}; use wasmtime::component::Component; use crate::engine::workload::ResolvedWorkload; @@ -531,7 +531,7 @@ impl Host { .await?; // If the service didn't run and we had one, warn - if resolved_workload.execute_service().await? != service_present { + if service_present && !resolved_workload.execute_service().await? { warn!( workload_id = request.workload_id, "service did not properly execute" @@ -605,6 +605,7 @@ impl HostApi for Host { } /// Start a workload + #[instrument(skip_all, fields(workload.id = request.workload_id, workload.name = request.workload.name, workload.namespace = request.workload.namespace))] async fn workload_start( &self, request: WorkloadStartRequest, @@ -648,6 +649,7 @@ impl HostApi for Host { }) } + #[instrument(skip_all, fields(workload.id = request.workload_id))] async fn workload_status( &self, request: WorkloadStatusRequest, @@ -673,6 +675,7 @@ impl HostApi for Host { } } + #[instrument(skip_all, fields(workload.id = request.workload_id))] async fn workload_stop( &self, request: WorkloadStopRequest, diff --git a/crates/wash-runtime/src/plugin/wasi_blobstore/filesystem.rs b/crates/wash-runtime/src/plugin/wasi_blobstore/filesystem.rs index 7157ecc4..87eebbf7 100644 --- a/crates/wash-runtime/src/plugin/wasi_blobstore/filesystem.rs +++ b/crates/wash-runtime/src/plugin/wasi_blobstore/filesystem.rs @@ -11,7 +11,7 @@ use crate::plugin::{HostPlugin, lock_root}; use crate::wit::{WitInterface, WitWorld}; use anyhow::Context; use tokio::sync::RwLock; -use tracing::debug; +use tracing::{debug, instrument}; use wasmtime::component::Resource; use wasmtime_wasi::p2::pipe::{AsyncReadStream, AsyncWriteStream}; use wasmtime_wasi::p2::{InputStream, OutputStream}; @@ -21,7 +21,7 @@ const PLUGIN_BLOBSTORE_ID: &str = "wasi-blobstore"; mod bindings { wasmtime::component::bindgen!({ world: "blobstore", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, with: { "wasi:io": ::wasmtime_wasi_io::bindings::wasi::io, "wasi:blobstore/container.container": crate::plugin::wasi_blobstore::filesystem::ContainerData, @@ -87,6 +87,7 @@ impl FilesystemBlobstore { // Implementation for the main blobstore interface impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { + #[instrument(skip(self))] async fn create_container( &mut self, name: ContainerName, @@ -111,6 +112,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self))] async fn get_container( &mut self, name: ContainerName, @@ -136,6 +138,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self))] async fn delete_container( &mut self, name: ContainerName, @@ -158,6 +161,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { Ok(Ok(())) } + #[instrument(skip(self))] async fn container_exists( &mut self, name: ContainerName, @@ -173,6 +177,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { Ok(Ok(path.exists())) } + #[instrument(skip(self))] async fn copy_object( &mut self, src: ObjectId, @@ -213,6 +218,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { } } + #[instrument(skip(self))] async fn move_object( &mut self, src: ObjectId, @@ -275,6 +281,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { })) } + #[instrument(skip(self, container))] async fn get_data( &mut self, container: Resource, @@ -297,6 +304,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self, container, data))] async fn write_data( &mut self, container: Resource, @@ -318,6 +326,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(())) } + #[instrument(skip(self, container))] async fn list_objects( &mut self, container: Resource, @@ -347,6 +356,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self, container))] async fn delete_object( &mut self, container: Resource, @@ -363,6 +373,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { } } + #[instrument(skip(self, container))] async fn delete_objects( &mut self, container: Resource, @@ -382,6 +393,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(())) } + #[instrument(skip(self, container))] async fn has_object( &mut self, container: Resource, @@ -417,6 +429,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { })) } + #[instrument(skip(self, container))] async fn clear( &mut self, container: Resource, @@ -529,6 +542,7 @@ impl<'a> bindings::wasi::blobstore::types::HostOutgoingValue for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip_all)] async fn finish( &mut self, outgoing_value: Resource, @@ -582,6 +596,7 @@ impl<'a> bindings::wasi::blobstore::types::HostOutgoingValue for ActiveCtx<'a> { } impl<'a> bindings::wasi::blobstore::types::HostIncomingValue for ActiveCtx<'a> { + #[instrument(skip_all)] async fn incoming_value_consume_sync( &mut self, incoming_value: Resource, @@ -625,6 +640,7 @@ impl<'a> bindings::wasi::blobstore::types::HostIncomingValue for ActiveCtx<'a> { Ok(Ok(buf)) } + #[instrument(skip_all)] async fn incoming_value_consume_async( &mut self, incoming_value: Resource, diff --git a/crates/wash-runtime/src/plugin/wasi_blobstore/in_memory.rs b/crates/wash-runtime/src/plugin/wasi_blobstore/in_memory.rs index 1452cba8..4d4e33f1 100644 --- a/crates/wash-runtime/src/plugin/wasi_blobstore/in_memory.rs +++ b/crates/wash-runtime/src/plugin/wasi_blobstore/in_memory.rs @@ -29,7 +29,7 @@ use crate::{ mod bindings { wasmtime::component::bindgen!({ world: "blobstore", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, with: { "wasi:io": ::wasmtime_wasi_io::bindings::wasi::io, "wasi:blobstore/container.container": String, diff --git a/crates/wash-runtime/src/plugin/wasi_blobstore/nats.rs b/crates/wash-runtime/src/plugin/wasi_blobstore/nats.rs index 2f19ffa7..6879b57a 100644 --- a/crates/wash-runtime/src/plugin/wasi_blobstore/nats.rs +++ b/crates/wash-runtime/src/plugin/wasi_blobstore/nats.rs @@ -11,6 +11,7 @@ use async_nats::jetstream::object_store::{self, List, Object, ObjectStore}; use futures::StreamExt; use tokio::io::AsyncReadExt; use tokio::sync::RwLock; +use tracing::instrument; use wasmtime::component::Resource; use wasmtime_wasi::p2::pipe::{AsyncReadStream, AsyncWriteStream}; use wasmtime_wasi::p2::{InputStream, OutputStream}; @@ -20,7 +21,7 @@ const PLUGIN_BLOBSTORE_ID: &str = "wasi-blobstore"; mod bindings { wasmtime::component::bindgen!({ world: "blobstore", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, with: { "wasi:io": ::wasmtime_wasi_io::bindings::wasi::io, "wasi:blobstore/container.container": crate::plugin::wasi_blobstore::nats::ContainerData, @@ -111,6 +112,7 @@ impl NatsBlobstore { // Implementation for the main blobstore interface impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { + #[instrument(skip(self))] async fn create_container( &mut self, name: ContainerName, @@ -149,6 +151,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self))] async fn get_container( &mut self, name: ContainerName, @@ -183,6 +186,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self))] async fn delete_container( &mut self, name: ContainerName, @@ -205,6 +209,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { Ok(Ok(())) } + #[instrument(skip(self))] async fn container_exists( &mut self, name: ContainerName, @@ -226,6 +231,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { } } + #[instrument(skip(self))] async fn copy_object( &mut self, src: ObjectId, @@ -280,6 +286,7 @@ impl<'a> bindings::wasi::blobstore::blobstore::Host for ActiveCtx<'a> { } } + #[instrument(skip(self))] async fn move_object( &mut self, src: ObjectId, @@ -339,6 +346,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { })) } + #[instrument(skip(self, container))] async fn get_data( &mut self, container: Resource, @@ -365,6 +373,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self, container, data))] async fn write_data( &mut self, container: Resource, @@ -395,6 +404,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(())) } + #[instrument(skip(self, container))] async fn list_objects( &mut self, container: Resource, @@ -418,6 +428,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self, container))] async fn delete_object( &mut self, container: Resource, @@ -444,6 +455,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { } } + #[instrument(skip(self, container, names))] async fn delete_objects( &mut self, container: Resource, @@ -473,6 +485,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { Ok(Ok(())) } + #[instrument(skip(self, container))] async fn has_object( &mut self, container: Resource, @@ -485,6 +498,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { } } + #[instrument(skip(self, container))] async fn object_info( &mut self, container: Resource, @@ -502,6 +516,7 @@ impl<'a> bindings::wasi::blobstore::container::HostContainer for ActiveCtx<'a> { } } + #[instrument(skip(self, container))] async fn clear( &mut self, container: Resource, @@ -628,6 +643,7 @@ impl<'a> bindings::wasi::blobstore::container::HostStreamObjectNames for ActiveC } impl<'a> bindings::wasi::blobstore::types::HostOutgoingValue for ActiveCtx<'a> { + #[instrument(skip(self))] async fn new_outgoing_value(&mut self) -> anyhow::Result> { let temp_file = tempfile::Builder::new() .tempfile() @@ -643,6 +659,7 @@ impl<'a> bindings::wasi::blobstore::types::HostOutgoingValue for ActiveCtx<'a> { Ok(resource) } + #[instrument(skip(self))] async fn outgoing_value_write_body( &mut self, outgoing_value: Resource, @@ -660,6 +677,7 @@ impl<'a> bindings::wasi::blobstore::types::HostOutgoingValue for ActiveCtx<'a> { Ok(Ok(resource)) } + #[instrument(skip(self))] async fn finish( &mut self, outgoing_value: Resource, @@ -706,6 +724,7 @@ impl<'a> bindings::wasi::blobstore::types::HostOutgoingValue for ActiveCtx<'a> { } impl<'a> bindings::wasi::blobstore::types::HostIncomingValue for ActiveCtx<'a> { + #[instrument(skip(self))] async fn incoming_value_consume_sync( &mut self, incoming_value: Resource, @@ -719,6 +738,7 @@ impl<'a> bindings::wasi::blobstore::types::HostIncomingValue for ActiveCtx<'a> { } } + #[instrument(skip(self))] async fn incoming_value_consume_async( &mut self, incoming_value: Resource, diff --git a/crates/wash-runtime/src/plugin/wasi_config/mod.rs b/crates/wash-runtime/src/plugin/wasi_config/mod.rs index b61f39b3..be0d8de8 100644 --- a/crates/wash-runtime/src/plugin/wasi_config/mod.rs +++ b/crates/wash-runtime/src/plugin/wasi_config/mod.rs @@ -22,6 +22,7 @@ use std::{ sync::Arc, }; use tokio::sync::RwLock; +use tracing::instrument; use crate::{ engine::{ @@ -35,7 +36,7 @@ use crate::{ mod bindings { wasmtime::component::bindgen!({ world: "config", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, }); } @@ -65,6 +66,7 @@ impl DynamicConfig { } impl<'a> bindings::wasi::config::store::Host for ActiveCtx<'a> { + #[instrument(skip(self))] async fn get( &mut self, key: String, @@ -79,6 +81,7 @@ impl<'a> bindings::wasi::config::store::Host for ActiveCtx<'a> { .map_or(Ok(Ok(None)), |v| Ok(Ok(Some(v)))) } + #[instrument(skip(self))] async fn get_all( &mut self, ) -> anyhow::Result, bindings::wasi::config::store::Error>> { diff --git a/crates/wash-runtime/src/plugin/wasi_keyvalue/filesystem.rs b/crates/wash-runtime/src/plugin/wasi_keyvalue/filesystem.rs index bc5b8d73..687f0168 100644 --- a/crates/wash-runtime/src/plugin/wasi_keyvalue/filesystem.rs +++ b/crates/wash-runtime/src/plugin/wasi_keyvalue/filesystem.rs @@ -14,6 +14,7 @@ use crate::engine::workload::WorkloadItem; use crate::plugin::{HostPlugin, lock_root}; use crate::wit::{WitInterface, WitWorld}; use futures::StreamExt; +use tracing::instrument; use wasmtime::component::Resource; const LIST_KEYS_BATCH_SIZE: usize = 1000; @@ -21,7 +22,7 @@ const LIST_KEYS_BATCH_SIZE: usize = 1000; mod bindings { wasmtime::component::bindgen!({ world: "keyvalue", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, with: { "wasi:keyvalue/store.bucket": crate::plugin::wasi_keyvalue::filesystem::BucketHandle, }, @@ -77,6 +78,7 @@ impl FilesystemKeyValue { // Implementation for the store interface impl<'a> bindings::wasi::keyvalue::store::Host for ActiveCtx<'a> { + #[instrument(skip(self))] async fn open( &mut self, identifier: String, @@ -109,6 +111,7 @@ impl<'a> bindings::wasi::keyvalue::store::Host for ActiveCtx<'a> { // Resource host trait implementations for bucket impl<'a> bindings::wasi::keyvalue::store::HostBucket for ActiveCtx<'a> { + #[instrument(skip(self, bucket))] async fn get( &mut self, bucket: Resource, @@ -137,6 +140,7 @@ impl<'a> bindings::wasi::keyvalue::store::HostBucket for ActiveCtx<'a> { Ok(Ok(entry)) } + #[instrument(skip(self, bucket, value))] async fn set( &mut self, bucket: Resource, @@ -165,6 +169,7 @@ impl<'a> bindings::wasi::keyvalue::store::HostBucket for ActiveCtx<'a> { } } + #[instrument(skip(self, bucket))] async fn delete( &mut self, bucket: Resource, @@ -191,6 +196,7 @@ impl<'a> bindings::wasi::keyvalue::store::HostBucket for ActiveCtx<'a> { } } + #[instrument(skip(self, bucket))] async fn exists( &mut self, bucket: Resource, @@ -217,6 +223,7 @@ impl<'a> bindings::wasi::keyvalue::store::HostBucket for ActiveCtx<'a> { Ok(Ok(path.exists())) } + #[instrument(skip(self, bucket))] async fn list_keys( &mut self, bucket: Resource, @@ -280,6 +287,7 @@ impl<'a> bindings::wasi::keyvalue::store::HostBucket for ActiveCtx<'a> { // Implementation for the atomics interface impl<'a> bindings::wasi::keyvalue::atomics::Host for ActiveCtx<'a> { + #[instrument(skip(self, bucket))] async fn increment( &mut self, bucket: Resource, @@ -321,6 +329,8 @@ impl<'a> bindings::wasi::keyvalue::atomics::Host for ActiveCtx<'a> { // Implementation for the batch interface impl<'a> bindings::wasi::keyvalue::batch::Host for ActiveCtx<'a> { + #[instrument(skip(self, bucket, keys))] + #[allow(clippy::type_complexity)] async fn get_many( &mut self, bucket: Resource, @@ -363,6 +373,7 @@ impl<'a> bindings::wasi::keyvalue::batch::Host for ActiveCtx<'a> { Ok(Ok(result)) } + #[instrument(skip(self, bucket, key_values))] async fn set_many( &mut self, bucket: Resource, @@ -404,6 +415,7 @@ impl<'a> bindings::wasi::keyvalue::batch::Host for ActiveCtx<'a> { Ok(Ok(())) } + #[instrument(skip(self, bucket, keys))] async fn delete_many( &mut self, bucket: Resource, diff --git a/crates/wash-runtime/src/plugin/wasi_keyvalue/in_memory.rs b/crates/wash-runtime/src/plugin/wasi_keyvalue/in_memory.rs index b2e33388..9b6e39a1 100644 --- a/crates/wash-runtime/src/plugin/wasi_keyvalue/in_memory.rs +++ b/crates/wash-runtime/src/plugin/wasi_keyvalue/in_memory.rs @@ -24,7 +24,7 @@ use crate::{ mod bindings { wasmtime::component::bindgen!({ world: "keyvalue", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, with: { "wasi:keyvalue/store.bucket": crate::plugin::wasi_keyvalue::in_memory::BucketHandle, }, diff --git a/crates/wash-runtime/src/plugin/wasi_keyvalue/nats.rs b/crates/wash-runtime/src/plugin/wasi_keyvalue/nats.rs index fe6ffd96..8fdb8dfb 100644 --- a/crates/wash-runtime/src/plugin/wasi_keyvalue/nats.rs +++ b/crates/wash-runtime/src/plugin/wasi_keyvalue/nats.rs @@ -22,7 +22,7 @@ const LIST_KEYS_BATCH_SIZE: usize = 1000; mod bindings { wasmtime::component::bindgen!({ world: "keyvalue", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, with: { "wasi:keyvalue/store.bucket": crate::plugin::wasi_keyvalue::nats::BucketHandle, }, diff --git a/crates/wash-runtime/src/plugin/wasi_logging/mod.rs b/crates/wash-runtime/src/plugin/wasi_logging/mod.rs index 5184ede6..315ffbbc 100644 --- a/crates/wash-runtime/src/plugin/wasi_logging/mod.rs +++ b/crates/wash-runtime/src/plugin/wasi_logging/mod.rs @@ -19,7 +19,7 @@ const PLUGIN_LOGGING_ID: &str = "wasi-logging"; mod bindings { crate::wasmtime::component::bindgen!({ world: "logging", - imports: { default: async | trappable }, + imports: { default: async | trappable | tracing }, }); } diff --git a/crates/wash-runtime/src/plugin/wasmcloud_messaging/nats.rs b/crates/wash-runtime/src/plugin/wasmcloud_messaging/nats.rs index 33488582..74961d7a 100644 --- a/crates/wash-runtime/src/plugin/wasmcloud_messaging/nats.rs +++ b/crates/wash-runtime/src/plugin/wasmcloud_messaging/nats.rs @@ -9,15 +9,15 @@ use anyhow::{Context, bail}; use async_nats::Subscriber; use futures::stream::StreamExt; use tokio::sync::RwLock; -use tracing::{debug, instrument, warn}; +use tracing::{Instrument, debug, instrument, warn}; const PLUGIN_MESSAGING_ID: &str = "wasmcloud-messaging"; mod bindings { crate::wasmtime::component::bindgen!({ world: "messaging", - imports: { default: async | trappable }, - exports: { default: async }, + imports: { default: async | trappable | tracing }, + exports: { default: async | tracing }, }); } @@ -47,7 +47,7 @@ impl NatsMessaging { } impl<'a> Host for ActiveCtx<'a> { - #[instrument(level = "debug", skip_all, fields(subject = %subject, timeout_ms))] + #[instrument(skip_all, fields(subject = %subject, timeout_ms))] async fn request( &mut self, subject: String, @@ -80,7 +80,7 @@ impl<'a> Host for ActiveCtx<'a> { })) } - #[instrument(level = "debug", skip_all, fields(subject = %msg.subject, reply_to = %msg.reply_to.as_deref().unwrap_or("")))] + #[instrument(skip_all, fields(subject = %msg.subject, reply_to = %msg.reply_to.as_deref().unwrap_or("")))] async fn publish(&mut self, msg: types::BrokerMessage) -> anyhow::Result> { let Some(plugin) = self.get_plugin::(PLUGIN_MESSAGING_ID) else { return Ok(Err("plugin not available".to_string())); @@ -222,6 +222,7 @@ impl HostPlugin for NatsMessaging { msg } }; + let mut store = match workload.new_store(&component_id).await { Err(e) => { warn!("failed to create store for component {component_id}: {e}"); @@ -242,9 +243,17 @@ impl HostPlugin for NatsMessaging { reply_to, body: msg.payload.into(), }; + + let span = tracing::span!( + tracing::Level::INFO, + "incoming_wasmcloud_message", + subject = %msg.subject, + reply_to = %msg.reply_to.as_deref().unwrap_or(""), + ); + match proxy .wasmcloud_messaging_handler() - .call_handle_message(store, &msg).await { + .call_handle_message(store, &msg).instrument(span).await { Ok(_) => { debug!("Message handled successfully"); } diff --git a/crates/wash-runtime/src/washlet/mod.rs b/crates/wash-runtime/src/washlet/mod.rs index a324f1d0..dff42d8d 100644 --- a/crates/wash-runtime/src/washlet/mod.rs +++ b/crates/wash-runtime/src/washlet/mod.rs @@ -7,12 +7,8 @@ use crate::oci::{self, OciConfig}; use crate::plugin::HostPlugin; use anyhow::Context as _; use futures::StreamExt as _; -use opentelemetry::KeyValue; -use opentelemetry_sdk::resource::{Resource, ResourceBuilder}; -use opentelemetry_semantic_conventions::resource; -use sysinfo::System; use tokio::sync::oneshot; -use tracing::{debug, info}; +use tracing::{debug, info, instrument}; pub const HOST_API_PREFIX: &str = "runtime.host"; pub const OPERATOR_API_PREFIX: &str = "runtime.operator"; @@ -207,6 +203,7 @@ pub struct NatsConnectionOptions { pub tls_key: Option, } +#[instrument(skip_all)] pub async fn connect_nats( addr: impl async_nats::ToServerAddrs, options: NatsConnectionOptions, @@ -256,6 +253,7 @@ fn from_api<'de, T: serde::Deserialize<'de>>(bytes: &'de [u8]) -> Result anyhow::Result { let hb = host.heartbeat().await?; Ok(hb.into()) } +#[instrument(skip_all, fields( + workload_id = %req.workload_id, + workload.name=?req.workload.as_ref().map(|w| &w.name).unwrap_or(&"".to_string()), + workload.namespace=?req.workload.as_ref().map(|w| &w.namespace).unwrap_or(&"".to_string())), + )] async fn workload_start( host: &impl HostApi, req: types::v2::WorkloadStartRequest, @@ -427,6 +431,7 @@ async fn workload_start( Ok(host.workload_start(request).await?.into()) } +#[instrument(skip_all, fields(workload_id = %req.workload_id))] async fn workload_stop( host: &impl HostApi, req: types::v2::WorkloadStopRequest, @@ -438,6 +443,7 @@ async fn workload_stop( host.workload_stop(req.into()).await.map(|resp| resp.into()) } +#[instrument(skip_all, fields(workload_id = %req.workload_id))] async fn workload_status( host: &impl HostApi, req: types::v2::WorkloadStatusRequest, @@ -451,52 +457,6 @@ async fn workload_status( .map(|resp| resp.into()) } -/// Creates a tracing span for a host invocation with relevant attributes. -/// Use when calling components from plugins (component exported interface) to -/// ensure consistent tracing. -pub fn host_invocation_span( - workload_namespace: impl AsRef, - workload_name: impl AsRef, - component_id: impl AsRef, - plugin_id: impl AsRef, - plugin_operation: impl AsRef, -) -> tracing::Span { - tracing::span!( - tracing::Level::INFO, - "HostInvocation", - component_id = component_id.as_ref(), - workload_namespace = workload_namespace.as_ref(), - workload_name = workload_name.as_ref(), - plugin_id = plugin_id.as_ref(), - plugin_operation = plugin_operation.as_ref(), - ) -} - -pub fn resource_builder() -> ResourceBuilder { - Resource::builder() - .with_attribute(KeyValue::new( - resource::SERVICE_NAME.to_string(), - "wash-host", - )) - .with_attribute(KeyValue::new( - resource::SERVICE_INSTANCE_ID.to_string(), - uuid::Uuid::new_v4().to_string(), - )) - .with_attribute(KeyValue::new( - resource::SERVICE_VERSION.to_string(), - env!("CARGO_PKG_VERSION"), - )) - .with_attributes(vec![ - KeyValue::new("host.version", env!("CARGO_PKG_VERSION")), - KeyValue::new("host.hostname", System::host_name().unwrap_or_default()), - KeyValue::new( - "host.kernel_version", - System::kernel_version().unwrap_or_default(), - ), - KeyValue::new("host.os_version", System::os_version().unwrap_or_default()), - ]) -} - impl From for crate::wit::WitInterface { fn from(wi: types::v2::WitInterface) -> Self { crate::wit::WitInterface { diff --git a/crates/wash/src/cli/component_build.rs b/crates/wash/src/cli/component_build.rs index a061385a..b4c129b4 100644 --- a/crates/wash/src/cli/component_build.rs +++ b/crates/wash/src/cli/component_build.rs @@ -108,7 +108,7 @@ pub async fn build_dev_component( /// This is the main public interface for building components that can be reused /// throughout the project. It handles project detection, tool validation, and /// the actual build process. -#[instrument(level = "debug", skip(ctx, config), name = "perform_component_build")] +#[instrument(skip(ctx, config), name = "perform_component_build")] pub async fn perform_component_build( ctx: &CliContext, config: &Config, diff --git a/crates/wash/src/cli/dev.rs b/crates/wash/src/cli/dev.rs index 29ebaf13..a43e9739 100644 --- a/crates/wash/src/cli/dev.rs +++ b/crates/wash/src/cli/dev.rs @@ -4,7 +4,7 @@ use anyhow::{Context as _, bail, ensure}; use bytes::Bytes; use clap::Args; use tokio::{select, sync::mpsc}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use wash_runtime::{ host::{Host, HostApi}, plugin::{self}, @@ -359,6 +359,7 @@ async fn create_workload(host: &Host, config: &Config, bytes: Bytes) -> anyhow:: } /// Reload the component in the host, stopping the previous workload if needed +#[instrument(name = "reload_component", skip_all, fields(workload_id = ?workload_id))] async fn reload_component( host: Arc, workload: &Workload, diff --git a/crates/wash/src/cli/mod.rs b/crates/wash/src/cli/mod.rs index cfd20ddd..f8607835 100644 --- a/crates/wash/src/cli/mod.rs +++ b/crates/wash/src/cli/mod.rs @@ -609,6 +609,7 @@ impl CliContext { /// Call hooks for the specified hook type with the provided runtime context. /// This will execute ALL plugins that support the given hook type. + #[instrument(skip_all, fields(hook_type = ?hook_type))] pub async fn call_hooks( &self, hook_type: HookType, diff --git a/src/main.rs b/src/main.rs index ca3d7596..4e95777e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,10 +3,17 @@ use std::{ path::PathBuf, }; +use anyhow::Context; use clap::{CommandFactory, FromArgMatches, Parser, Subcommand}; use clap_complete::generate; +use opentelemetry::{KeyValue, trace::TracerProvider}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_sdk::Resource; +use opentelemetry_semantic_conventions::resource; use tracing::{Level, error, info, instrument, trace, warn}; -use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{ + EnvFilter, Layer, Registry, filter::Directive, layer::SubscriberExt, util::SubscriberInitExt, +}; use wash::cli::{ CONFIG_DIR_NAME, CONFIG_FILE_NAME, CliCommand, CliCommandExt, CliContext, CommandOutput, @@ -44,8 +51,8 @@ struct Cli { #[clap( short = 'l', long = "log-level", - default_value = "info", - help = "Set the log level (trace, debug, info, warn, error)", + default_value_t = Level::INFO, + help = "Set the opentelemetry log level (trace, debug, info, warn, error)", global = true )] log_level: Level, @@ -199,8 +206,21 @@ async fn main() { // Auto-detect non-interactive mode if stdin is not a TTY or flag is set let non_interactive = non_interactive_flag || !std::io::stdin().is_terminal(); - // Initialize tracing as early as possible, with the specified log level - let (mut stdout, mut stderr) = initialize_tracing(global_args.log_level, global_args.verbose); + let (mut stdout, mut stderr) = (Box::new(std::io::stdout()), Box::new(std::io::stderr())); + + // Initialize observability as early as possible, with the specified log level + let observability_shutdown = + initialize_observability(global_args.log_level, !non_interactive, global_args.verbose) + .unwrap_or_else(|e| { + exit_with_output( + &mut stderr, + CommandOutput::error( + format!("failed to initialize observability: {e:?}"), + None, + ) + .with_output_kind(global_args.output), + ); + }); // Check if project path exists if !global_args.project_path.exists() { @@ -343,6 +363,8 @@ async fn main() { )) }; + observability_shutdown(); + exit_with_output( &mut stdout_buf, command_output @@ -381,56 +403,116 @@ where /// Initialize tracing with a custom format /// /// Returns a tuple of stdout and stderr writers for consistency with the previous API. -fn initialize_tracing( +fn initialize_observability( log_level: Level, + ansi_colors: bool, verbose: bool, -) -> (Box, Box) { - // Display logs in a compact, CLI-friendly format - if verbose { - // Enable dynamic filtering from `RUST_LOG`, fallback to "info" - let env_filter = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new(log_level.as_str())); - - let fmt_layer = tracing_subscriber::fmt::layer() - .with_writer(std::io::stderr) - .with_target(true) - .with_thread_ids(true) - .with_thread_names(true) - .with_level(true) - .with_file(true) - .with_line_number(true) - .with_ansi(true); // Color output for TTY - - // Register all layers with the subscriber - Registry::default().with(env_filter).with(fmt_layer).init(); - - (Box::new(std::io::stdout()), Box::new(std::io::stderr())) - } else { - // Enable dynamic filtering from `RUST_LOG`, fallback to "info", but always set wasm_pkg_client=error - #[allow(clippy::expect_used)] // Static directive strings are always valid - let env_filter = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new(log_level.as_str())) - // async_nats prints out on connect - .add_directive("async_nats=error".parse().expect("valid directive")) +) -> anyhow::Result> { + // STDERR logging layer + let mut fmt_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(log_level.as_str())); + if !verbose { + // async_nats prints out on connect + fmt_filter = fmt_filter + .add_directive(directive("async_nats=error")?) // wasm_pkg_client/core are a little verbose so we set them to error level in non-verbose mode - .add_directive("wasm_pkg_client=error".parse().expect("valid directive")) - .add_directive("wasm_pkg_core=error".parse().expect("valid directive")); - - let fmt_layer = tracing_subscriber::fmt::layer() - .with_writer(std::io::stderr) - .with_target(false) - .with_thread_ids(false) - .with_thread_names(false) - .with_level(true) - .with_file(false) - .with_line_number(false) - .with_ansi(true); - - // Register all layers with the subscriber - Registry::default().with(env_filter).with(fmt_layer).init(); - - (Box::new(std::io::stdout()), Box::new(std::io::stderr())) + .add_directive(directive("wasm_pkg_client=error")?) + .add_directive(directive("wasm_pkg_core=error")?); + } + + let fmt_layer = tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .with_level(true) + .with_target(verbose) + .with_thread_ids(verbose) + .with_thread_names(verbose) + .with_file(verbose) + .with_line_number(verbose) + .with_ansi(ansi_colors) + .with_filter(fmt_filter); + + let otel_enabled = std::env::vars().any(|(key, _)| key.starts_with("OTEL_")); + if !otel_enabled { + Registry::default().with(fmt_layer).init(); + + // No-op shutdown function + let shutdown_fn = || {}; + return Ok(Box::new(shutdown_fn)); } + + let resource = Resource::builder() + .with_attribute(KeyValue::new( + resource::SERVICE_NAME.to_string(), + env!("CARGO_PKG_NAME"), + )) + .with_attribute(KeyValue::new( + resource::SERVICE_INSTANCE_ID.to_string(), + uuid::Uuid::new_v4().to_string(), + )) + .with_attribute(KeyValue::new( + resource::SERVICE_VERSION.to_string(), + env!("CARGO_PKG_VERSION"), + )) + .build(); + + // OTel logging layer + let log_exporter = opentelemetry_otlp::LogExporter::builder() + .with_tonic() + .build()?; + let log_provider = opentelemetry_sdk::logs::LoggerProviderBuilder::default() + .with_batch_exporter(log_exporter) + .with_resource(resource.clone()) + .build(); + let filter_otel_logs = EnvFilter::new(log_level.as_str()); + + let otel_logs_layer = + OpenTelemetryTracingBridge::new(&log_provider).with_filter(filter_otel_logs); + + // OTel tracing layer + let tracer_exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build()?; + let tracer_provider = opentelemetry_sdk::trace::TracerProviderBuilder::default() + .with_batch_exporter(tracer_exporter) + .with_resource(resource.clone()) + .build(); + + let filter_otel_traces = EnvFilter::new(log_level.as_str()); + + let otel_tracer_layer = tracing_opentelemetry::layer() + .with_tracer(tracer_provider.tracer("runtime")) + .with_error_records_to_exceptions(true) + .with_error_fields_to_exceptions(true) + .with_error_events_to_status(true) + .with_error_events_to_exceptions(true) + .with_location(true) + .with_filter(filter_otel_traces); + + Registry::default() + .with(fmt_layer) + .with(otel_logs_layer) + .with(otel_tracer_layer) + .init(); + + // Return a shutdown function to flush providers on exit + let shutdown_fn = move || { + if let Err(e) = tracer_provider.shutdown() { + eprintln!("failed to shutdown tracer provider: {e}"); + } + if let Err(e) = log_provider.shutdown() { + eprintln!("failed to shutdown log provider: {e}"); + } + }; + + Ok(Box::new(shutdown_fn)) +} + +/// Helper function to reduce duplication and code size for parsing directives +fn directive(directive: impl AsRef) -> anyhow::Result { + directive + .as_ref() + .parse() + .with_context(|| format!("failed to parse filter: {}", directive.as_ref())) } /// Helper function to ensure that we're exiting the program consistently and with the correct output format.