diff --git a/src/commands/logs.rs b/src/commands/logs.rs index a2523d8..e7b7228 100644 --- a/src/commands/logs.rs +++ b/src/commands/logs.rs @@ -10,7 +10,7 @@ use datadog_api_client::datadogV2::api_logs_metrics::LogsMetricsAPI; #[cfg(not(target_arch = "wasm32"))] use datadog_api_client::datadogV2::model::{ LogsAggregateRequest, LogsAggregationFunction, LogsCompute, LogsListRequest, - LogsListRequestPage, LogsQueryFilter, LogsSort, + LogsListRequestPage, LogsQueryFilter, LogsSort, LogsStorageTier, }; #[cfg(not(target_arch = "wasm32"))] @@ -19,6 +19,24 @@ use crate::config::Config; use crate::formatter; use crate::util; +/// Parse a storage tier string into a `LogsStorageTier` enum value. +/// Returns `None` if the input is `None`; returns an error for unrecognised values. +#[cfg(not(target_arch = "wasm32"))] +fn parse_storage_tier(storage: Option<String>) -> Result<Option<LogsStorageTier>> { + match storage { + None => Ok(None), + Some(s) => match s.to_lowercase().as_str() { + "indexes" => Ok(Some(LogsStorageTier::INDEXES)), + "online-archives" | "online_archives" => Ok(Some(LogsStorageTier::ONLINE_ARCHIVES)), + "flex" => Ok(Some(LogsStorageTier::FLEX)), + other => anyhow::bail!( + "unknown storage tier {:?}; valid values are: indexes, online-archives, flex", + other + ), + }, + } +} + #[cfg(not(target_arch = "wasm32"))] pub async fn search( cfg: &Config, @@ -26,6 +44,7 @@ pub async fn search( from: String, to: String, limit: i32, + storage: Option<String>, ) -> Result<()> { let dd_cfg = client::make_dd_config(cfg); let api = match client::make_bearer_client(cfg) { @@ -36,13 +55,18 @@ pub async fn search( let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let storage_tier = parse_storage_tier(storage)?; + + let mut filter = LogsQueryFilter::new() + .query(query) + .from(from_ms.to_string()) + .to(to_ms.to_string()); + if let Some(tier) = storage_tier { + filter = filter.storage_tier(tier); + } + let body = LogsListRequest::new() - .filter( -
LogsQueryFilter::new() - .query(query) - .from(from_ms.to_string()) - .to(to_ms.to_string()), - ) + .filter(filter) .page(LogsListRequestPage::new().limit(limit)) .sort(LogsSort::TIMESTAMP_DESCENDING); @@ -83,15 +107,20 @@ pub async fn search( from: String, to: String, limit: i32, + storage: Option<String>, ) -> Result<()> { let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let mut filter = serde_json::json!({ + "query": query, + "from": from_ms.to_string(), + "to": to_ms.to_string() + }); + if let Some(tier) = storage { + filter["storage_tier"] = serde_json::Value::String(tier); + } let body = serde_json::json!({ - "filter": { - "query": query, - "from": from_ms.to_string(), - "to": to_ms.to_string() - }, + "filter": filter, "page": { "limit": limit }, "sort": "-timestamp" }); @@ -100,8 +129,15 @@ pub async fn search( } /// Alias for `search` with the same interface. -pub async fn list(cfg: &Config, query: String, from: String, to: String, limit: i32) -> Result<()> { - search(cfg, query, from, to, limit).await +pub async fn list( + cfg: &Config, + query: String, + from: String, + to: String, + limit: i32, + storage: Option<String>, +) -> Result<()> { + search(cfg, query, from, to, limit, storage).await } /// Alias for `search` with the same interface.
@@ -111,12 +147,19 @@ pub async fn query( from: String, to: String, limit: i32, + storage: Option<String>, ) -> Result<()> { - search(cfg, query, from, to, limit).await + search(cfg, query, from, to, limit, storage).await } #[cfg(not(target_arch = "wasm32"))] -pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> Result<()> { +pub async fn aggregate( + cfg: &Config, + query: String, + from: String, + to: String, + storage: Option<String>, +) -> Result<()> { let dd_cfg = client::make_dd_config(cfg); let api = match client::make_bearer_client(cfg) { Some(c) => LogsAPI::with_client_and_config(dd_cfg, c), @@ -126,13 +169,18 @@ pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let storage_tier = parse_storage_tier(storage)?; + + let mut filter = LogsQueryFilter::new() + .query(query) + .from(from_ms.to_string()) + .to(to_ms.to_string()); + if let Some(tier) = storage_tier { + filter = filter.storage_tier(tier); + } + let body = LogsAggregateRequest::new() - .filter( - LogsQueryFilter::new() - .query(query) - .from(from_ms.to_string()) - .to(to_ms.to_string()), - ) + .filter(filter) .compute(vec![LogsCompute::new(LogsAggregationFunction::COUNT)]); let resp = api @@ -145,15 +193,25 @@ pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> Result<()> { } #[cfg(target_arch = "wasm32")] -pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> Result<()> { +pub async fn aggregate( + cfg: &Config, + query: String, + from: String, + to: String, + storage: Option<String>, +) -> Result<()> { let from_ms = util::parse_time_to_unix_millis(&from)?; let to_ms = util::parse_time_to_unix_millis(&to)?; + let mut filter = serde_json::json!({ + "query": query, + "from": from_ms.to_string(), + "to": to_ms.to_string() + }); + if let Some(tier) = storage { + filter["storage_tier"] = serde_json::Value::String(tier);
+ } let body = serde_json::json!({ - "filter": { - "query": query, - "from": from_ms.to_string(), - "to": to_ms.to_string() - }, + "filter": filter, "compute": [{ "type": "count" }] }); let data = crate::api::post(cfg, "/api/v2/logs/analytics/aggregate", &body).await?; diff --git a/src/main.rs b/src/main.rs index 1a44776..631f561 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4958,9 +4958,9 @@ async fn main_inner() -> anyhow::Result<()> { limit, sort: _, index: _, - storage: _, + storage, } => { - commands::logs::search(&cfg, query, from, to, limit).await?; + commands::logs::search(&cfg, query, from, to, limit, storage).await?; } LogActions::List { query, @@ -4968,9 +4968,9 @@ async fn main_inner() -> anyhow::Result<()> { to, limit, sort: _, - storage: _, + storage, } => { - commands::logs::list(&cfg, query, from, to, limit).await?; + commands::logs::list(&cfg, query, from, to, limit, storage).await?; } LogActions::Query { query, @@ -4978,10 +4978,10 @@ async fn main_inner() -> anyhow::Result<()> { to, limit, sort: _, - storage: _, + storage, timezone: _, } => { - commands::logs::query(&cfg, query, from, to, limit).await?; + commands::logs::query(&cfg, query, from, to, limit, storage).await?; } LogActions::Aggregate { query, @@ -4990,9 +4990,10 @@ async fn main_inner() -> anyhow::Result<()> { compute: _, group_by: _, limit: _, - storage: _, + storage, } => { - commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to).await?; + commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to, storage) + .await?; } LogActions::Archives { action } => match action { LogArchiveActions::List => commands::logs::archives_list(&cfg).await?, diff --git a/src/test_commands.rs b/src/test_commands.rs index cea1fab..8bb63d6 100644 --- a/src/test_commands.rs +++ b/src/test_commands.rs @@ -363,9 +363,15 @@ async fn test_logs_search() { let cfg = test_config(&server.url()); let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await; - let 
result = - crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10) - .await; + let result = crate::commands::logs::search( + &cfg, + "status:error".into(), + "1h".into(), + "now".into(), + 10, + None, + ) + .await; assert!(result.is_ok(), "logs search failed: {:?}", result.err()); cleanup_env(); } @@ -389,9 +395,15 @@ async fn test_logs_search_with_oauth() { let _mock = mock_any(&mut server, "POST", r#"{"data": []}"#).await; - let result = - crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10) - .await; + let result = crate::commands::logs::search( + &cfg, + "status:error".into(), + "1h".into(), + "now".into(), + 10, + None, + ) + .await; assert!(result.is_ok(), "logs search should work with OAuth"); cleanup_env(); } @@ -404,11 +416,111 @@ async fn test_logs_aggregate() { let _mock = mock_any(&mut server, "POST", r#"{"data": {"buckets": []}}"#).await; let result = - crate::commands::logs::aggregate(&cfg, "*".into(), "1h".into(), "now".into()).await; + crate::commands::logs::aggregate(&cfg, "*".into(), "1h".into(), "now".into(), None).await; assert!(result.is_ok(), "logs aggregate failed: {:?}", result.err()); cleanup_env(); } +#[tokio::test] +async fn test_logs_search_with_flex_storage() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await; + + let result = crate::commands::logs::search( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + 10, + Some("flex".into()), + ) + .await; + assert!( + result.is_ok(), + "logs search with flex failed: {:?}", + result.err() + ); + cleanup_env(); +} + +#[tokio::test] +async fn test_logs_search_with_online_archives_storage() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + let _mock = mock_any(&mut server, "POST", 
r#"{"data": [], "meta": {"page": {}}}"#).await; + + let result = crate::commands::logs::search( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + 10, + Some("online-archives".into()), + ) + .await; + assert!( + result.is_ok(), + "logs search with online-archives failed: {:?}", + result.err() + ); + cleanup_env(); +} + +#[tokio::test] +async fn test_logs_search_with_invalid_storage_tier() { + let _lock = lock_env(); + let server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + + let result = crate::commands::logs::search( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + 10, + Some("invalid-tier".into()), + ) + .await; + assert!( + result.is_err(), + "logs search with invalid storage tier should fail" + ); + assert!( + result + .unwrap_err() + .to_string() + .contains("unknown storage tier"), + "error should mention unknown storage tier" + ); + cleanup_env(); +} + +#[tokio::test] +async fn test_logs_aggregate_with_flex_storage() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + let _mock = mock_any(&mut server, "POST", r#"{"data": {"buckets": []}}"#).await; + + let result = crate::commands::logs::aggregate( + &cfg, + "*".into(), + "1h".into(), + "now".into(), + Some("flex".into()), + ) + .await; + assert!( + result.is_ok(), + "logs aggregate with flex failed: {:?}", + result.err() + ); + cleanup_env(); +} + #[tokio::test] async fn test_logs_archives_list() { let _lock = lock_env();