Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 86 additions & 28 deletions src/commands/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use datadog_api_client::datadogV2::api_logs_metrics::LogsMetricsAPI;
#[cfg(not(target_arch = "wasm32"))]
use datadog_api_client::datadogV2::model::{
LogsAggregateRequest, LogsAggregationFunction, LogsCompute, LogsListRequest,
LogsListRequestPage, LogsQueryFilter, LogsSort,
LogsListRequestPage, LogsQueryFilter, LogsSort, LogsStorageTier,
};

#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -19,13 +19,32 @@ use crate::config::Config;
use crate::formatter;
use crate::util;

/// Convert an optional, user-supplied storage tier name into the
/// corresponding `LogsStorageTier` enum value.
///
/// `None` passes straight through as `Ok(None)`. Matching is
/// case-insensitive and accepts both `online-archives` and
/// `online_archives`; any other value produces an error listing the
/// accepted tiers.
#[cfg(not(target_arch = "wasm32"))]
fn parse_storage_tier(storage: Option<String>) -> Result<Option<LogsStorageTier>> {
    // No tier requested: nothing to parse, nothing to validate.
    let Some(raw) = storage else {
        return Ok(None);
    };
    let tier = match raw.to_lowercase().as_str() {
        "indexes" => LogsStorageTier::INDEXES,
        "online-archives" | "online_archives" => LogsStorageTier::ONLINE_ARCHIVES,
        "flex" => LogsStorageTier::FLEX,
        other => anyhow::bail!(
            "unknown storage tier {:?}; valid values are: indexes, online-archives, flex",
            other
        ),
    };
    Ok(Some(tier))
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn search(
cfg: &Config,
query: String,
from: String,
to: String,
limit: i32,
storage: Option<String>,
) -> Result<()> {
let dd_cfg = client::make_dd_config(cfg);
let api = match client::make_bearer_client(cfg) {
Expand All @@ -36,13 +55,18 @@ pub async fn search(
let from_ms = util::parse_time_to_unix_millis(&from)?;
let to_ms = util::parse_time_to_unix_millis(&to)?;

let storage_tier = parse_storage_tier(storage)?;

let mut filter = LogsQueryFilter::new()
.query(query)
.from(from_ms.to_string())
.to(to_ms.to_string());
if let Some(tier) = storage_tier {
filter = filter.storage_tier(tier);
}

let body = LogsListRequest::new()
.filter(
LogsQueryFilter::new()
.query(query)
.from(from_ms.to_string())
.to(to_ms.to_string()),
)
.filter(filter)
.page(LogsListRequestPage::new().limit(limit))
.sort(LogsSort::TIMESTAMP_DESCENDING);

Expand Down Expand Up @@ -83,15 +107,20 @@ pub async fn search(
from: String,
to: String,
limit: i32,
storage: Option<String>,
) -> Result<()> {
let from_ms = util::parse_time_to_unix_millis(&from)?;
let to_ms = util::parse_time_to_unix_millis(&to)?;
let mut filter = serde_json::json!({
"query": query,
"from": from_ms.to_string(),
"to": to_ms.to_string()
});
if let Some(tier) = storage {
filter["storage_tier"] = serde_json::Value::String(tier);
}
let body = serde_json::json!({
"filter": {
"query": query,
"from": from_ms.to_string(),
"to": to_ms.to_string()
},
"filter": filter,
"page": { "limit": limit },
"sort": "-timestamp"
});
Expand All @@ -100,8 +129,15 @@ pub async fn search(
}

/// Alias for `search` with the same interface.
pub async fn list(cfg: &Config, query: String, from: String, to: String, limit: i32) -> Result<()> {
search(cfg, query, from, to, limit).await
pub async fn list(
    cfg: &Config,
    query: String,
    from: String,
    to: String,
    limit: i32,
    storage: Option<String>,
) -> Result<()> {
    // `list` is a pure alias: forward every argument unchanged to `search`.
    search(cfg, query, from, to, limit, storage).await
}

/// Alias for `search` with the same interface.
Expand All @@ -111,12 +147,19 @@ pub async fn query(
from: String,
to: String,
limit: i32,
storage: Option<String>,
) -> Result<()> {
search(cfg, query, from, to, limit).await
search(cfg, query, from, to, limit, storage).await
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> Result<()> {
pub async fn aggregate(
cfg: &Config,
query: String,
from: String,
to: String,
storage: Option<String>,
) -> Result<()> {
let dd_cfg = client::make_dd_config(cfg);
let api = match client::make_bearer_client(cfg) {
Some(c) => LogsAPI::with_client_and_config(dd_cfg, c),
Expand All @@ -126,13 +169,18 @@ pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) ->
let from_ms = util::parse_time_to_unix_millis(&from)?;
let to_ms = util::parse_time_to_unix_millis(&to)?;

let storage_tier = parse_storage_tier(storage)?;

let mut filter = LogsQueryFilter::new()
.query(query)
.from(from_ms.to_string())
.to(to_ms.to_string());
if let Some(tier) = storage_tier {
filter = filter.storage_tier(tier);
}

let body = LogsAggregateRequest::new()
.filter(
LogsQueryFilter::new()
.query(query)
.from(from_ms.to_string())
.to(to_ms.to_string()),
)
.filter(filter)
.compute(vec![LogsCompute::new(LogsAggregationFunction::COUNT)]);

let resp = api
Expand All @@ -145,15 +193,25 @@ pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) ->
}

#[cfg(target_arch = "wasm32")]
pub async fn aggregate(cfg: &Config, query: String, from: String, to: String) -> Result<()> {
pub async fn aggregate(
cfg: &Config,
query: String,
from: String,
to: String,
storage: Option<String>,
) -> Result<()> {
let from_ms = util::parse_time_to_unix_millis(&from)?;
let to_ms = util::parse_time_to_unix_millis(&to)?;
let mut filter = serde_json::json!({
"query": query,
"from": from_ms.to_string(),
"to": to_ms.to_string()
});
if let Some(tier) = storage {
filter["storage_tier"] = serde_json::Value::String(tier);
}
let body = serde_json::json!({
"filter": {
"query": query,
"from": from_ms.to_string(),
"to": to_ms.to_string()
},
"filter": filter,
"compute": [{ "type": "count" }]
});
let data = crate::api::post(cfg, "/api/v2/logs/analytics/aggregate", &body).await?;
Expand Down
17 changes: 9 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4958,30 +4958,30 @@ async fn main_inner() -> anyhow::Result<()> {
limit,
sort: _,
index: _,
storage: _,
storage,
} => {
commands::logs::search(&cfg, query, from, to, limit).await?;
commands::logs::search(&cfg, query, from, to, limit, storage).await?;
}
LogActions::List {
query,
from,
to,
limit,
sort: _,
storage: _,
storage,
} => {
commands::logs::list(&cfg, query, from, to, limit).await?;
commands::logs::list(&cfg, query, from, to, limit, storage).await?;
}
LogActions::Query {
query,
from,
to,
limit,
sort: _,
storage: _,
storage,
timezone: _,
} => {
commands::logs::query(&cfg, query, from, to, limit).await?;
commands::logs::query(&cfg, query, from, to, limit, storage).await?;
}
LogActions::Aggregate {
query,
Expand All @@ -4990,9 +4990,10 @@ async fn main_inner() -> anyhow::Result<()> {
compute: _,
group_by: _,
limit: _,
storage: _,
storage,
} => {
commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to).await?;
commands::logs::aggregate(&cfg, query.unwrap_or_default(), from, to, storage)
.await?;
}
LogActions::Archives { action } => match action {
LogArchiveActions::List => commands::logs::archives_list(&cfg).await?,
Expand Down
126 changes: 119 additions & 7 deletions src/test_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,15 @@ async fn test_logs_search() {
let cfg = test_config(&server.url());
let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await;

let result =
crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10)
.await;
let result = crate::commands::logs::search(
&cfg,
"status:error".into(),
"1h".into(),
"now".into(),
10,
None,
)
.await;
assert!(result.is_ok(), "logs search failed: {:?}", result.err());
cleanup_env();
}
Expand All @@ -389,9 +395,15 @@ async fn test_logs_search_with_oauth() {

let _mock = mock_any(&mut server, "POST", r#"{"data": []}"#).await;

let result =
crate::commands::logs::search(&cfg, "status:error".into(), "1h".into(), "now".into(), 10)
.await;
let result = crate::commands::logs::search(
&cfg,
"status:error".into(),
"1h".into(),
"now".into(),
10,
None,
)
.await;
assert!(result.is_ok(), "logs search should work with OAuth");
cleanup_env();
}
Expand All @@ -404,11 +416,111 @@ async fn test_logs_aggregate() {
let _mock = mock_any(&mut server, "POST", r#"{"data": {"buckets": []}}"#).await;

let result =
crate::commands::logs::aggregate(&cfg, "*".into(), "1h".into(), "now".into()).await;
crate::commands::logs::aggregate(&cfg, "*".into(), "1h".into(), "now".into(), None).await;
assert!(result.is_ok(), "logs aggregate failed: {:?}", result.err());
cleanup_env();
}

#[tokio::test]
async fn test_logs_search_with_flex_storage() {
    // Serialize env mutation across tests and stand up a mock API server.
    let _lock = lock_env();
    let mut server = mockito::Server::new_async().await;
    let cfg = test_config(&server.url());
    // Any POST gets an empty, well-formed search response.
    let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await;

    // The "flex" tier must be accepted and the request must succeed.
    let outcome = crate::commands::logs::search(
        &cfg,
        "*".into(),
        "1h".into(),
        "now".into(),
        10,
        Some("flex".into()),
    )
    .await;
    assert!(
        outcome.is_ok(),
        "logs search with flex failed: {:?}",
        outcome.err()
    );
    cleanup_env();
}

#[tokio::test]
async fn test_logs_search_with_online_archives_storage() {
    // Serialize env mutation across tests and stand up a mock API server.
    let _lock = lock_env();
    let mut server = mockito::Server::new_async().await;
    let cfg = test_config(&server.url());
    // Any POST gets an empty, well-formed search response.
    let _mock = mock_any(&mut server, "POST", r#"{"data": [], "meta": {"page": {}}}"#).await;

    // The hyphenated "online-archives" spelling must be accepted.
    let outcome = crate::commands::logs::search(
        &cfg,
        "*".into(),
        "1h".into(),
        "now".into(),
        10,
        Some("online-archives".into()),
    )
    .await;
    assert!(
        outcome.is_ok(),
        "logs search with online-archives failed: {:?}",
        outcome.err()
    );
    cleanup_env();
}

#[tokio::test]
async fn test_logs_search_with_invalid_storage_tier() {
    // Serialize env mutation; no mock is registered because validation
    // should fail before any HTTP request is issued.
    let _lock = lock_env();
    let server = mockito::Server::new_async().await;
    let cfg = test_config(&server.url());

    let outcome = crate::commands::logs::search(
        &cfg,
        "*".into(),
        "1h".into(),
        "now".into(),
        10,
        Some("invalid-tier".into()),
    )
    .await;
    // An unrecognised tier must be rejected with a descriptive error.
    assert!(
        outcome.is_err(),
        "logs search with invalid storage tier should fail"
    );
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("unknown storage tier"),
        "error should mention unknown storage tier"
    );
    cleanup_env();
}

#[tokio::test]
async fn test_logs_aggregate_with_flex_storage() {
    // Serialize env mutation across tests and stand up a mock API server.
    let _lock = lock_env();
    let mut server = mockito::Server::new_async().await;
    let cfg = test_config(&server.url());
    // Any POST gets an empty, well-formed aggregate response.
    let _mock = mock_any(&mut server, "POST", r#"{"data": {"buckets": []}}"#).await;

    // Aggregation must also honour the "flex" storage tier.
    let outcome = crate::commands::logs::aggregate(
        &cfg,
        "*".into(),
        "1h".into(),
        "now".into(),
        Some("flex".into()),
    )
    .await;
    assert!(
        outcome.is_ok(),
        "logs aggregate with flex failed: {:?}",
        outcome.err()
    );
    cleanup_env();
}

#[tokio::test]
async fn test_logs_archives_list() {
let _lock = lock_env();
Expand Down