From bc4f17ff0374fa4f0545532ebf2f2471a76e0db7 Mon Sep 17 00:00:00 2001 From: Matthew Moon Date: Fri, 27 Feb 2026 20:49:43 +0000 Subject: [PATCH 1/2] fix(logs): wire --storage flag through to Datadog API The --storage flag was accepted by the CLI but silently discarded in all four log command dispatch arms (search, list, query, aggregate). The value was matched as storage: _ in main.rs and never forwarded to the command functions, so requests always used the default index tier regardless of what the user specified. - Added LogsStorageTier import and parse_storage_tier() helper in src/commands/logs.rs with error on unrecognised values - Wired storage: Option param through search(), list(), query(), and aggregate() functions (native + WASM variants) - Applied parsed tier to LogsQueryFilter via .storage_tier() builder - Fixed dispatch in src/main.rs: changed storage: _ to storage in all four LogActions match arms - Fixed three existing tests and added four new unit tests covering flex storage, online-archives storage, and invalid tier errors Co-Authored-By: Claude Sonnet 4.6 --- src/commands/logs.rs | 95 +++++++++++++++++++++++++++++++------------- src/main.rs | 16 ++++---- src/test_commands.rs | 88 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 160 insertions(+), 39 deletions(-) diff --git a/src/commands/logs.rs b/src/commands/logs.rs index 6be859d..cae8304 100644 --- a/src/commands/logs.rs +++ b/src/commands/logs.rs @@ -10,7 +10,7 @@ use datadog_api_client::datadogV2::api_logs_metrics::LogsMetricsAPI; #[cfg(not(target_arch = "wasm32"))] use datadog_api_client::datadogV2::model::{ LogsAggregateRequest, LogsAggregationFunction, LogsCompute, LogsListRequest, - LogsListRequestPage, LogsQueryFilter, LogsSort, + LogsListRequestPage, LogsQueryFilter, LogsSort, LogsStorageTier, }; #[cfg(not(target_arch = "wasm32"))] @@ -19,6 +19,24 @@ use crate::config::Config; use crate::formatter; use crate::util; +/// Parse a storage tier string into a `LogsStorageTier` enum value. +/// Returns `None` if the input is `None`; returns an error for unrecognised values. +#[cfg(not(target_arch = "wasm32"))] +fn parse_storage_tier(storage: Option) -> Result> { + match storage { + None => Ok(None), + Some(s) => match s.to_lowercase().as_str() { + "indexes" => Ok(Some(LogsStorageTier::INDEXES)), + "online-archives" | "online_archives" => Ok(Some(LogsStorageTier::ONLINE_ARCHIVES)), + "flex" => Ok(Some(LogsStorageTier::FLEX)), + other => anyhow::bail!( + "unknown storage tier {:?}; valid values are: indexes, online-archives, flex", + other + ), + }, + } +} + #[cfg(not(target_arch = "wasm32"))] pub async fn search( cfg: &Config, @@ -26,6 +44,7 @@ pub async fn search( from: String, to: String, limit: i32, + storage: Option, ) -> Result<()> { // Logs search API doesn't support OAuth/bearer - force API keys if !cfg.has_api_keys() { @@ -42,13 +61,18 @@ pub async fn search( let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let storage_tier = parse_storage_tier(storage)?; + + let mut filter = LogsQueryFilter::new() + .query(query) + .from(from_ms.to_string()) + .to(to_ms.to_string()); + if let Some(tier) = storage_tier { + filter = filter.storage_tier(tier); + } + let body = LogsListRequest::new() - .filter( - LogsQueryFilter::new() - .query(query) - .from(from_ms.to_string()) - .to(to_ms.to_string()), - ) + .filter(filter) .page(LogsListRequestPage::new().limit(limit)) .sort(LogsSort::TIMESTAMP_DESCENDING); @@ -89,15 +113,20 @@ pub async fn search( from: String, to: String, limit: i32, + storage: Option, ) -> Result<()> { let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let mut filter = serde_json::json!({ + "query": query, + "from": from_ms.to_string(), + "to": to_ms.to_string() + }); + if let Some(tier) = storage { + filter["storage_tier"] = serde_json::Value::String(tier); + } let body = serde_json::json!({ - "filter": { - "query": query, - "from": from_ms.to_string(), - "to": to_ms.to_string() - }, + "filter": filter, "page": { "limit": limit }, "sort": "-timestamp" }); @@ -106,8 +135,8 @@ pub async fn search( } /// Alias for `search` with the same interface. -pub async fn list(cfg: &Config, query: String, from: String, to: String, limit: i32) -> Result<()> { - search(cfg, query, from, to, limit).await +pub async fn list(cfg: &Config, query: String, from: String, to: String, limit: i32, storage: Option) -> Result<()> { + search(cfg, query, from, to, limit, storage).await } /// Alias for `search` with the same interface. @@ -117,12 +146,13 @@ pub async fn query( from: String, to: String, limit: i32, + storage: Option, ) -> Result<()> { - search(cfg, query, from, to, limit).await + search(cfg, query, from, to, limit, storage).await } #[cfg(not(target_arch = "wasm32"))] -pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> Result<()> { +pub async fn aggregate(cfg: &Config, query: String, from: String, to: String, storage: Option) -> Result<()> { if !cfg.has_api_keys() { bail!( "logs aggregate requires API key authentication (DD_API_KEY + DD_APP_KEY).\n\ @@ -136,13 +166,18 @@ pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let storage_tier = parse_storage_tier(storage)?; + + let mut filter = LogsQueryFilter::new() + .query(query) + .from(from_ms.to_string()) + .to(to_ms.to_string()); + if let Some(tier) = storage_tier { + filter = filter.storage_tier(tier); + } + let body = LogsAggregateRequest::new() - .filter( - LogsQueryFilter::new() - .query(query) - .from(from_ms.to_string()) - .to(to_ms.to_string()), - ) + .filter(filter) .compute(vec![LogsCompute::new(LogsAggregationFunction::COUNT)]); let resp = api @@ -155,15 +190,19 @@ pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> } #[cfg(target_arch = "wasm32")] -pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> Result<()> { +pub async fn aggregate(cfg: &Config, query: String, from: String, to: String, storage: Option) -> Result<()> { let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let mut filter = serde_json::json!({ + "query": query, + "from": from_ms.to_string(), + "to": to_ms.to_string() + }); + if let Some(tier) = storage { + filter["storage_tier"] = serde_json::Value::String(tier); + } let body = serde_json::json!({ - "filter": { - "query": query, - "from": from_ms.to_string(), - "to": to_ms.to_string() - }, + "filter": filter, "compute": [{ "type": "count" }] }); let data = crate::api::post(cfg, "/api/v2/logs/analytics/aggregate", &body).await?; diff --git a/src/main.rs b/src/main.rs index 88caf63..e6e69a7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4904,9 +4904,9 @@ async fn main_inner() -> anyhow::Result<()> { limit, sort: _, index: _, - storage: _, + storage, } => { - commands::logs::search(&cfg, query, from, to, limit).await?; + commands::logs::search(&cfg, query, from, to, limit, storage).await?; } LogActions::List { query, @@ -4914,9 +4914,9 @@ async fn main_inner() -> anyhow::Result<()> { to, limit, sort: _, - storage: _, + storage, } => { - commands::logs::list(&cfg, query, from, to, limit).await?; + commands::logs::list(&cfg, query, from, to, limit, storage).await?; } LogActions::Query { query, @@ -4924,10 +4924,10 @@ async fn main_inner() -> anyhow::Result<()> { to, limit, sort: _, - storage: _, + storage, timezone: _, } => { - commands::logs::query(&cfg, query, from, to, limit).await?; + commands::logs::query(&cfg, query, from, to, limit, storage).await?; } LogActions::Aggregate { query, @@ -4936,9 +4936,9 @@ async fn main_inner() -> anyhow::Result<()> { compute: _, group_by: _, limit: _, - storage: _, + storage, } => { - commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to).await?; + commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to, storage).await?; } LogActions::Archives { action } => match action { LogArchiveActions::List => commands::logs::archives_list(&cfg).await?, diff --git a/src/test_commands.rs b/src/test_commands.rs index 2232dee..5083e22 100644 --- a/src/test_commands.rs +++ b/src/test_commands.rs @@ -363,7 +363,7 @@ async fn test_logs_search() { let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await; let result = - crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10) + crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10, None) .await; assert!(result.is_ok(), "logs search failed: {:?}", result.err()); cleanup_env(); @@ -386,7 +386,7 @@ async fn test_logs_search_requires_api_keys() { }; let result = - crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10) + crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10, None) .await; assert!(result.is_err(), "logs search should require API keys"); assert!( @@ -407,11 +407,93 @@ async fn test_logs_aggregate() { let _mock = mock_any(&mut server, "POST", r#"{"data": {"buckets": []}}"#).await; let result = - crate::commands::logs::aggregate(&cfg, "*".into(), "1h".into(), "now".into()).await; + crate::commands::logs::aggregate(&cfg, "*".into(), "1h".into(), "now".into(), None).await; assert!(result.is_ok(), "logs aggregate failed: {:?}", result.err()); cleanup_env(); } +#[tokio::test] +async fn test_logs_search_with_flex_storage() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await; + + let result = crate::commands::logs::search( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + 10, + Some("flex".into()), + ) + .await; + assert!(result.is_ok(), "logs search with flex failed: {:?}", result.err()); + cleanup_env(); +} + +#[tokio::test] +async fn test_logs_search_with_online_archives_storage() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await; + + let result = crate::commands::logs::search( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + 10, + Some("online-archives".into()), + ) + .await; + assert!(result.is_ok(), "logs search with online-archives failed: {:?}", result.err()); + cleanup_env(); +} + +#[tokio::test] +async fn test_logs_search_with_invalid_storage_tier() { + let _lock = lock_env(); + let server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + + let result = crate::commands::logs::search( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + 10, + Some("invalid-tier".into()), + ) + .await; + assert!(result.is_err(), "logs search with invalid storage tier should fail"); + assert!( + result.unwrap_err().to_string().contains("unknown storage tier"), + "error should mention unknown storage tier" + ); + cleanup_env(); +} + +#[tokio::test] +async fn test_logs_aggregate_with_flex_storage() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + let _mock = mock_any(&mut server, "POST", r#"{"data": {"buckets": []}}"#).await; + + let result = crate::commands::logs::aggregate( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + Some("flex".into()), + ) + .await; + assert!(result.is_ok(), "logs aggregate with flex failed: {:?}", result.err()); + cleanup_env(); +} + #[tokio::test] async fn test_logs_archives_list() { let _lock = lock_env(); From 606edd37195b57e1be351d125310cd5bd9d24686 Mon Sep 17 00:00:00 2001 From: Cody Lee Date: Sat, 28 Feb 2026 18:20:10 -0600 Subject: [PATCH 2/2] style(logs): apply cargo fmt to storage flag changes Auto-format long function signatures and call sites that exceeded the line-length limit introduced by the --storage parameter wiring. Co-Authored-By: Matthew Moon Co-Authored-By: Claude Sonnet 4.6 --- src/commands/logs.rs | 25 ++++++++++++++++++--- src/main.rs | 3 ++- src/test_commands.rs | 52 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 65 insertions(+), 15 deletions(-) diff --git a/src/commands/logs.rs b/src/commands/logs.rs index cae8304..4cefc41 100644 --- a/src/commands/logs.rs +++ b/src/commands/logs.rs @@ -135,7 +135,14 @@ pub async fn search( } /// Alias for `search` with the same interface. -pub async fn list(cfg: &Config, query: String, from: String, to: String, limit: i32, storage: Option) -> Result<()> { +pub async fn list( + cfg: &Config, + query: String, + from: String, + to: String, + limit: i32, + storage: Option, +) -> Result<()> { search(cfg, query, from, to, limit, storage).await } @@ -152,7 +159,13 @@ pub async fn query( } #[cfg(not(target_arch = "wasm32"))] -pub async fn aggregate(cfg: &Config, query: String, from: String, to: String, storage: Option) -> Result<()> { +pub async fn aggregate( + cfg: &Config, + query: String, + from: String, + to: String, + storage: Option, +) -> Result<()> { if !cfg.has_api_keys() { bail!( "logs aggregate requires API key authentication (DD_API_KEY + DD_APP_KEY).\n\ @@ -190,7 +203,13 @@ pub async fn aggregate(cfg: &Config, query: String, from: String, to: String, st } #[cfg(target_arch = "wasm32")] -pub async fn aggregate(cfg: &Config, query: String, from: String, to: String, storage: Option) -> Result<()> { +pub async fn aggregate( + cfg: &Config, + query: String, + from: String, + to: String, + storage: Option, +) -> Result<()> { let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; let mut filter = serde_json::json!({ diff --git a/src/main.rs b/src/main.rs index e6e69a7..1e9cfa9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4938,7 +4938,8 @@ async fn main_inner() -> anyhow::Result<()> { limit: _, storage, } => { - commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to, storage).await?; + commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to, storage) + .await?; } LogActions::Archives { action } => match action { LogArchiveActions::List => commands::logs::archives_list(&cfg).await?, diff --git a/src/test_commands.rs b/src/test_commands.rs index 5083e22..4a18b20 100644 --- a/src/test_commands.rs +++ b/src/test_commands.rs @@ -362,9 +362,15 @@ async fn test_logs_search() { let cfg = test_config(&server.url()); let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await; - let result = - crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10, None) - .await; + let result = crate::commands::logs::search( + &cfg, + "status:error".into(), + "1h".into(), + "now".into(), + 10, + None, + ) + .await; assert!(result.is_ok(), "logs search failed: {:?}", result.err()); cleanup_env(); } @@ -385,9 +391,15 @@ async fn test_logs_search_requires_api_keys() { agent_mode: false, }; - let result = - crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10, None) - .await; + let result = crate::commands::logs::search( + &cfg, + "status:error".into(), + "1h".into(), + "now".into(), + 10, + None, + ) + .await; assert!(result.is_err(), "logs search should require API keys"); assert!( result @@ -428,7 +440,11 @@ async fn test_logs_search_with_flex_storage() { Some("flex".into()), ) .await; - assert!(result.is_ok(), "logs search with flex failed: {:?}", result.err()); + assert!( + result.is_ok(), + "logs search with flex failed: {:?}", + result.err() + ); cleanup_env(); } @@ -448,7 +464,11 @@ async fn test_logs_search_with_online_archives_storage() { Some("online-archives".into()), ) .await; - assert!(result.is_ok(), "logs search with online-archives failed: {:?}", result.err()); + assert!( + result.is_ok(), + "logs search with online-archives failed: {:?}", + result.err() + ); cleanup_env(); } @@ -467,9 +487,15 @@ async fn test_logs_search_with_invalid_storage_tier() { Some("invalid-tier".into()), ) .await; - assert!(result.is_err(), "logs search with invalid storage tier should fail"); assert!( - result.unwrap_err().to_string().contains("unknown storage tier"), + result.is_err(), + "logs search with invalid storage tier should fail" + ); + assert!( + result + .unwrap_err() + .to_string() + .contains("unknown storage tier"), "error should mention unknown storage tier" ); cleanup_env(); @@ -490,7 +516,11 @@ async fn test_logs_aggregate_with_flex_storage() { Some("flex".into()), ) .await; - assert!(result.is_ok(), "logs aggregate with flex failed: {:?}", result.err()); + assert!( + result.is_ok(), + "logs aggregate with flex failed: {:?}", + result.err() + ); cleanup_env(); }