From 7d402560c5b1b7e5bb3774c44b04f50acae208ea Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sun, 1 Mar 2026 19:19:14 +0200 Subject: [PATCH 1/3] add support to raw queries --- miso-connectors/src/lib.rs | 9 +++ miso-kql/src/lexer.rs | 2 + miso-kql/src/parser.rs | 23 ++++++-- miso-kql/src/parser_tests.rs | 33 +++++++++++ miso-server/src/query_to_workflow.rs | 85 ++++++++++++++++++---------- miso-workflow-types/src/query.rs | 5 ++ 6 files changed, 124 insertions(+), 33 deletions(-) diff --git a/miso-connectors/src/lib.rs b/miso-connectors/src/lib.rs index 2787c32..a8f418a 100644 --- a/miso-connectors/src/lib.rs +++ b/miso-connectors/src/lib.rs @@ -317,5 +317,14 @@ pub trait Connector: Debug + Send + Sync { None } + fn raw_query( + &self, + _collection: &str, + _query: &str, + _handle: &dyn QueryHandle, + ) -> Option> { + None + } + async fn close(&self); } diff --git a/miso-kql/src/lexer.rs b/miso-kql/src/lexer.rs index 68c4b19..400197a 100644 --- a/miso-kql/src/lexer.rs +++ b/miso-kql/src/lexer.rs @@ -180,6 +180,8 @@ pub enum Token { Iff, #[token("extract")] Extract, + #[token("raw")] + Raw, #[token(",")] Comma, diff --git a/miso-kql/src/parser.rs b/miso-kql/src/parser.rs index f5117d5..15ddad6 100644 --- a/miso-kql/src/parser.rs +++ b/miso-kql/src/parser.rs @@ -219,6 +219,7 @@ where Token::Case => "case".to_string(), Token::Iff => "iff".to_string(), Token::Extract => "extract".to_string(), + Token::Raw => "raw".to_string(), } } @@ -1291,6 +1292,14 @@ impl KqlParser { .labelled("let") .boxed(); + let string_literal = select! { + Token::String(StringValue::Text(s)) => s, + }; + + let raw_suffix = just(Token::Dot) + .ignore_then(just(Token::Raw)) + .ignore_then(string_literal.delimited_by(just(Token::LParen), just(Token::RParen))); + let scan_step = ident .clone() .then( @@ -1302,13 +1311,19 @@ impl KqlParser { ))) .or_not(), ) - .map(|(connector, collection)| { - QueryStep::Scan(match collection { - Some(collection) => ScanKind::Collection { + .then(raw_suffix.or_not()) + .map(|((connector, collection), raw_query)| { + QueryStep::Scan(match (collection, raw_query) { + (Some(collection), Some(query)) => ScanKind::Raw { + connector, + collection, + query, + }, + (Some(collection), None) => ScanKind::Collection { connector, collection, }, - None => ScanKind::Var(connector), + (None, _) => ScanKind::Var(connector), }) }) .labelled("scan") diff --git a/miso-kql/src/parser_tests.rs b/miso-kql/src/parser_tests.rs index b716b6f..df837a0 100644 --- a/miso-kql/src/parser_tests.rs +++ b/miso-kql/src/parser_tests.rs @@ -1291,6 +1291,39 @@ fn test_parse_extract_with_field_arguments() { } } +#[test] +fn test_raw_scan() { + let query = "connector.table.raw(\"{\\\"query\\\": {\\\"match_all\\\": {}}}\")"; + let result = parse_unwrap!(query); + + assert_eq!(result.len(), 1); + match &result[0] { + QueryStep::Scan(ScanKind::Raw { + connector, + collection, + query, + }) => { + assert_eq!(connector, "connector"); + assert_eq!(collection, "table"); + assert_eq!(query, r#"{"query": {"match_all": {}}}"#); + } + _ => panic!("Expected Raw scan step"), + } +} + +#[test] +fn test_raw_as_field_name() { + let query = r#"connector.table | where raw == "foo""#; + let result = parse_unwrap!(query); + + assert_eq!(result.len(), 2); + assert!(matches!( + result[0], + QueryStep::Scan(ScanKind::Collection { .. }) + )); + assert!(!matches!(result[0], QueryStep::Scan(ScanKind::Raw { .. }))); +} + #[test] fn test_parse_extract_in_filter() { let query = r#"connector.table | where extract("(\\d+)", 1, message) == "123""#; diff --git a/miso-server/src/query_to_workflow.rs b/miso-server/src/query_to_workflow.rs index bc939ff..4c21da0 100644 --- a/miso-server/src/query_to_workflow.rs +++ b/miso-server/src/query_to_workflow.rs @@ -86,37 +86,29 @@ fn to_workflow_steps_inner( connector: connector_name, collection: collection_name, }) => { - let Some(connector_state) = connectors.get(&connector_name).cloned() else { - return Err(HttpError::from_string( - StatusCode::NOT_FOUND, - format!("connector '{connector_name}' not found"), - )); - }; - - info!(?collection_name, "Getting collection info"); - let Some(collection) = connector_state.connector.get_collection(&collection_name) - else { - info!(?collection_name, "Collection doesn't exist"); - return Err(HttpError::from_string( - StatusCode::NOT_FOUND, - format!("collection '{collection_name}' not found"), - )); - }; - - steps.push(WorkflowStep::Scan( - Scan::new( - connector_state, - connector_name, - collection_name, - collection.static_fields, - ) - .map_err(|e| { + steps.push(WorkflowStep::Scan(create_scan( + connectors, + &connector_name, + &collection_name, + )?)); + } + QueryStep::Scan(ScanKind::Raw { + connector: connector_name, + collection: collection_name, + query: raw_query, + }) => { + let mut scan = create_scan(connectors, &connector_name, &collection_name)?; + let raw_handle = scan + .connector + .raw_query(&collection_name, &raw_query, &*scan.handle) + .ok_or_else(|| { HttpError::from_string( - StatusCode::INTERNAL_SERVER_ERROR, - format!("failed to create connector from scan step: {e}"), + StatusCode::BAD_REQUEST, + format!("connector '{connector_name}' does not support raw queries"), ) - })?, - )); + })?; + scan.handle = raw_handle.into(); + steps.push(WorkflowStep::Scan(scan)); } _ if steps.is_empty() => { return Err(HttpError::from_string( @@ -205,6 +197,41 @@ fn to_workflow_steps_inner( Ok(steps) } +fn create_scan( + connectors: &ConnectorsMap, + connector_name: &str, + collection_name: &str, +) -> Result { + let Some(connector_state) = connectors.get(connector_name).cloned() else { + return Err(HttpError::from_string( + StatusCode::NOT_FOUND, + format!("connector '{connector_name}' not found"), + )); + }; + + info!(?collection_name, "Getting collection info"); + let Some(collection) = connector_state.connector.get_collection(collection_name) else { + info!(?collection_name, "Collection doesn't exist"); + return Err(HttpError::from_string( + StatusCode::NOT_FOUND, + format!("collection '{collection_name}' not found"), + )); + }; + + Scan::new( + connector_state, + connector_name.to_string(), + collection_name.to_string(), + collection.static_fields, + ) + .map_err(|e| { + HttpError::from_string( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to create connector from scan step: {e}"), + ) + }) +} + fn create_sink( connectors: &ConnectorsMap, connector_name: String, diff --git a/miso-workflow-types/src/query.rs b/miso-workflow-types/src/query.rs index 847f50d..fbac1f3 100644 --- a/miso-workflow-types/src/query.rs +++ b/miso-workflow-types/src/query.rs @@ -39,4 +39,9 @@ pub enum ScanKind { connector: String, collection: String, }, + Raw { + connector: String, + collection: String, + query: String, + }, } From a675e8a20e48d7d123004a9a3af1bece304bcc9c Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sun, 1 Mar 2026 19:19:21 +0200 Subject: [PATCH 2/3] connectors: quickwit: support raw query --- miso-connectors/src/quickwit.rs | 100 +++++++++++++++++++++----------- 1 file changed, 67 insertions(+), 33 deletions(-) diff --git a/miso-connectors/src/quickwit.rs b/miso-connectors/src/quickwit.rs index a5442e5..ec0ad8b 100644 --- a/miso-connectors/src/quickwit.rs +++ b/miso-connectors/src/quickwit.rs @@ -93,6 +93,7 @@ struct QuickwitHandle { limit: Option, count: bool, collections: Vec, + raw_query: Option, } #[typetag::serde] @@ -163,12 +164,22 @@ impl QuickwitHandle { handle.collections.push(collection.to_string()); handle } + + fn with_raw_query(&self, query: Value) -> QuickwitHandle { + let mut handle = self.clone(); + handle.raw_query = Some(query); + handle + } } impl fmt::Display for QuickwitHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut items = Vec::new(); + if let Some(raw) = &self.raw_query { + items.push(format!("raw_query={raw}")); + } + if self.count { items.push("count".to_string()); } @@ -1159,42 +1170,44 @@ impl Connector for QuickwitConnector { collections.dedup(); let collections = collections.join(","); - let mut query_map = Map::new(); - - if !handle.queries.is_empty() { - query_map.insert( - "query", - json!({ - "bool": { - "must": handle.queries.clone(), - } - }), - ); - } + let query = if let Some(raw) = &handle.raw_query { + Some(raw.clone()) + } else { + let mut query_map = Map::new(); - if let Some(sorts) = &handle.sorts { - query_map.insert("sort", sorts.clone()); - } + if !handle.queries.is_empty() { + query_map.insert( + "query", + json!({ + "bool": { + "must": handle.queries.clone(), + } + }), + ); + } - let is_aggregation_query = if let Some(aggs) = &handle.aggs { - query_map.insert("size", json!(0)); - for (key, value) in aggs.as_object().unwrap() { - query_map.insert(key, value.clone()); + if let Some(sorts) = &handle.sorts { + query_map.insert("sort", sorts.clone()); } - true - } else { - if let Some(limit) = limit { + + if let Some(aggs) = &handle.aggs { + query_map.insert("size", json!(0)); + for (key, value) in aggs.as_object().unwrap() { + query_map.insert(key, value.clone()); + } + } else if let Some(limit) = limit { query_map.insert("size", limit.into()); } - false - }; - let query = if !query_map.is_empty() { - Some(json!(query_map)) - } else { - None + if !query_map.is_empty() { + Some(json!(query_map)) + } else { + None + } }; + let is_aggregation_query = handle.aggs.is_some(); + info!( count = handle.count, scroll_size, @@ -1254,7 +1267,7 @@ impl Connector for QuickwitConnector { fn apply_filter(&self, ast: &Expr, handle: &dyn QueryHandle) -> Option> { let handle = downcast_unwrap!(handle, QuickwitHandle); - if handle.sorts.is_some() || !handle.group_by.is_empty() { + if handle.raw_query.is_some() || handle.sorts.is_some() || !handle.group_by.is_empty() { // Cannot filter over top-n / group by in Quickwit. return None; } @@ -1267,7 +1280,7 @@ impl Connector for QuickwitConnector { handle: &dyn QueryHandle, ) -> Option> { let handle = downcast_unwrap!(handle, QuickwitHandle); - if handle.count || !handle.group_by.is_empty() { + if handle.raw_query.is_some() || handle.count || !handle.group_by.is_empty() { return None; } @@ -1286,6 +1299,9 @@ impl Connector for QuickwitConnector { fn apply_limit(&self, mut max: u64, handle: &dyn QueryHandle) -> Option> { let handle = downcast_unwrap!(handle, QuickwitHandle); + if handle.raw_query.is_some() { + return None; + } if let Some(limit) = handle.limit && limit < max { @@ -1301,7 +1317,7 @@ impl Connector for QuickwitConnector { handle: &dyn QueryHandle, ) -> Option> { let handle = downcast_unwrap!(handle, QuickwitHandle); - if handle.sorts.is_some() { + if handle.raw_query.is_some() || handle.sorts.is_some() { // Cannot top-n over top-n in Quickwit. return None; } @@ -1337,7 +1353,7 @@ impl Connector for QuickwitConnector { fn apply_count(&self, handle: &dyn QueryHandle) -> Option> { let handle = downcast_unwrap!(handle, QuickwitHandle); - if !handle.group_by.is_empty() { + if handle.raw_query.is_some() || !handle.group_by.is_empty() { // Quickwit count query returns number of items instead of number of unique groups. // This is fine, as usually aggregation requests return few results, we can count // them ourselves. @@ -1352,7 +1368,11 @@ impl Connector for QuickwitConnector { handle: &dyn QueryHandle, ) -> Option> { let handle = downcast_unwrap!(handle, QuickwitHandle); - if handle.limit.is_some() || handle.sorts.is_some() || !handle.group_by.is_empty() { + if handle.raw_query.is_some() + || handle.limit.is_some() + || handle.sorts.is_some() + || !handle.group_by.is_empty() + { // Quickwit's query (like Elasticsearch's) is not pipelined, most similar to SQL. // When you request it to both sort (or limit) and aggregate, it will always first // aggregate and then sort (or limit), no way to control the order of these 2 AFAIK. @@ -1498,6 +1518,9 @@ impl Connector for QuickwitConnector { union_handle: &dyn QueryHandle, ) -> Option> { let handle = downcast_unwrap!(handle, QuickwitHandle); + if handle.raw_query.is_some() { + return None; + } let union_handle = union_handle.as_any().downcast_ref::()?; if handle != union_handle { @@ -1525,6 +1548,17 @@ impl Connector for QuickwitConnector { async fn close(&self) { self.interval_task.shutdown().await; } + + fn raw_query( + &self, + _collection: &str, + query: &str, + handle: &dyn QueryHandle, + ) -> Option> { + let handle = downcast_unwrap!(handle, QuickwitHandle); + let parsed: Value = serde_json::from_str(query).ok()?; + Some(Box::new(handle.with_raw_query(parsed))) + } } #[cfg(test)] From 665d16a42b178fee3484f966a968e14a128fcc5d Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sun, 1 Mar 2026 19:42:22 +0200 Subject: [PATCH 3/3] tests: connectors: quickwit: add raw query tests --- tests/quickwit.rs | 49 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/tests/quickwit.rs b/tests/quickwit.rs index 5545021..ce1cbb9 100644 --- a/tests/quickwit.rs +++ b/tests/quickwit.rs @@ -26,6 +26,7 @@ use tracing::info; use common::init_test_tracing; use common::predicate_pushdown::{run_tests, TestConnector, INDEXES, TESTS}; +use common::test_cases::{Expected, ExpectedResults, TestCase}; const QUICKWIT_REFRESH_INTERVAL: Duration = Duration::from_secs(1); @@ -343,5 +344,51 @@ async fn quickwit_predicate_pushdown() -> Result<()> { } }; let connectors = Arc::new(setup(url).await?); - run_tests(TestConnector::Quickwit, connectors, &[TESTS]).await + run_tests( + TestConnector::Quickwit, + connectors, + &[TESTS, QUICKWIT_RAW_QUERY_TESTS], + ) + .await } + +const QUICKWIT_RAW_QUERY_TESTS: &[TestCase] = &[ + TestCase { + query: r##"test.stack.raw("{\"query\":{\"match_all\":{}}}")"##, + expected: Expected::Default(r##"test.stack.raw("{\"query\":{\"match_all\":{}}}")"##), + results: ExpectedResults::Count(10), + name: "raw_match_all", + }, + TestCase { + query: r##"test.stack.raw("{\"query\":{\"term\":{\"user\":\"9\"}}}")"##, + expected: Expected::Default( + r##"test.stack.raw("{\"query\":{\"term\":{\"user\":\"9\"}}}")"##, + ), + results: ExpectedResults::Count(3), + name: "raw_term_filter", + }, + TestCase { + query: r##"test.stack.raw("{\"query\":{\"match_all\":{}}}") | where questionId > 10"##, + expected: Expected::Default( + r##"test.stack.raw("{\"query\":{\"match_all\":{}}}") | where questionId > 10"##, + ), + results: ExpectedResults::Count(6), + name: "raw_with_pipeline_filter_no_pushdown", + }, + TestCase { + query: r##"test.stack.raw("{\"query\":{\"match_all\":{}}}") | take 3"##, + expected: Expected::Default( + r##"test.stack.raw("{\"query\":{\"match_all\":{}}}") | take 3"##, + ), + results: ExpectedResults::Count(3), + name: "raw_with_pipeline_limit_no_pushdown", + }, + TestCase { + query: r##"test.stack.raw("{\"query\":{\"match_all\":{}}}") | count"##, + expected: Expected::Default( + r##"test.stack.raw("{\"query\":{\"match_all\":{}}}") | count"##, + ), + results: ExpectedResults::Logs(r#"[{"Count": 10}]"#), + name: "raw_with_pipeline_count_no_pushdown", + }, +];