diff --git a/src/datafusion_integration/context.rs b/src/datafusion_integration/context.rs
index 7e6a7ab..6253934 100644
--- a/src/datafusion_integration/context.rs
+++ b/src/datafusion_integration/context.rs
@@ -16,7 +16,7 @@
 use datafusion::prelude::SessionConfig;
 use crate::kubernetes::K8sClientPool;
 use crate::output::QueryResult;
-use super::preprocess::{preprocess_sql, validate_read_only};
+use super::preprocess::{preprocess_sql_with_registry, validate_read_only};
 use super::provider::K8sTableProvider;
 
 /// Table information with native types (for data layer)
@@ -95,8 +95,15 @@
 impl K8sSessionContext {
     /// Execute a SQL query and return the results as Arrow RecordBatches
     pub async fn execute_sql(&self, sql: &str) -> DFResult<Vec<RecordBatch>> {
-        // Preprocess first (compiles PRQL to SQL if detected, fixes arrow precedence)
-        let processed_sql = preprocess_sql(sql)
+        // Get registry for table-aware JSON column detection
+        let registry = self
+            .pool
+            .get_registry(None)
+            .await
+            .map_err(|e| datafusion::error::DataFusionError::Plan(e.to_string()))?;
+
+        // Preprocess with table-aware JSON columns (compiles PRQL, converts JSON paths)
+        let processed_sql = preprocess_sql_with_registry(sql, &registry)
             .map_err(|e| datafusion::error::DataFusionError::Plan(e.to_string()))?;
 
         // Validate the resulting SQL is read-only
diff --git a/src/datafusion_integration/preprocess.rs b/src/datafusion_integration/preprocess.rs
index 920fd49..45ade25 100644
--- a/src/datafusion_integration/preprocess.rs
+++ b/src/datafusion_integration/preprocess.rs
@@ -55,11 +55,14 @@
 //! ```
 
 use super::{json_path, prql};
+use crate::kubernetes::discovery::ResourceRegistry;
 use anyhow::Result;
 use datafusion::sql::sqlparser::ast::Statement;
 use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
 use datafusion::sql::sqlparser::parser::Parser;
+use datafusion::sql::sqlparser::tokenizer::{Token, Tokenizer};
 use regex::Regex;
+use std::collections::HashSet;
 use std::sync::LazyLock;
 
 /// Regex to match arrows followed by comparison operators (left side)
@@ -120,7 +123,112 @@ fn fix_arrow_precedence(sql: &str) -> String {
         .into_owned()
 }
 
-/// Preprocess a query for execution.
+/// Extract table names from a SQL query.
+///
+/// Uses DataFusion's tokenizer to find identifiers following FROM and JOIN keywords.
+/// This is a best-effort extraction - missing some tables is acceptable since we
+/// fall back to DEFAULT_JSON_OBJECT_COLUMNS for unrecognized columns.
+///
+/// # Examples
+///
+/// ```ignore
+/// extract_table_names("SELECT * FROM pods") // -> ["pods"]
+/// extract_table_names("SELECT * FROM pods p JOIN services s ON ...") // -> ["pods", "services"]
+/// extract_table_names("SELECT * FROM pods WHERE x IN (SELECT * FROM namespaces)") // -> ["pods", "namespaces"]
+/// ```
+fn extract_table_names(sql: &str) -> Vec<String> {
+    let dialect = PostgreSqlDialect {};
+    let tokens = match Tokenizer::new(&dialect, sql).tokenize() {
+        Ok(t) => t,
+        Err(_) => return vec![],
+    };
+
+    let mut table_names = Vec::new();
+    let mut i = 0;
+
+    while i < tokens.len() {
+        // Look for FROM or JOIN keywords
+        if let Token::Word(word) = &tokens[i] {
+            let keyword = word.value.to_uppercase();
+            if keyword == "FROM" || keyword == "JOIN" {
+                // Skip whitespace and find the next identifier
+                i += 1;
+                while i < tokens.len() && matches!(tokens[i], Token::Whitespace(_)) {
+                    i += 1;
+                }
+
+                // The next token should be a table name (identifier or word)
+                if let Some(Token::Word(table_word)) = tokens.get(i) {
+                    // Skip keywords that might follow FROM (like SELECT in subqueries)
+                    let upper = table_word.value.to_uppercase();
+                    if !matches!(
+                        upper.as_str(),
+                        "SELECT" | "WITH" | "LATERAL" | "UNNEST" | "("
+                    ) {
+                        table_names.push(table_word.value.to_lowercase());
+                    }
+                }
+            }
+        }
+        i += 1;
+    }
+
+    table_names
+}
+
+/// Build a set of JSON column names from the registry for the given tables.
+///
+/// Merges DEFAULT_JSON_OBJECT_COLUMNS with table-specific JSON columns.
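+///
+/// # Examples
+///
+/// A sketch of the merge behavior (illustrative, not compiled; assumes the
+/// registry has discovered a `roles` table whose `rules` column is JSON):
+///
+/// ```ignore
+/// let cols = build_json_columns_for_tables(&["roles".to_string()], &registry);
+/// assert!(cols.contains("labels")); // always present via defaults
+/// assert!(cols.contains("rules")); // contributed by the registry
+/// ```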
+fn build_json_columns_for_tables(
+    table_names: &[String],
+    registry: &ResourceRegistry,
+) -> HashSet<String> {
+    // Start with default columns (always available)
+    let mut columns = json_path::build_json_columns_set(&[]);
+
+    // Add table-specific JSON columns
+    columns.extend(registry.get_json_columns_for_tables(table_names));
+
+    columns
+}
+
+/// Preprocess a SQL query with table-aware JSON column detection.
+///
+/// This is the primary preprocessing function when a ResourceRegistry is available.
+/// It extracts table names from the query and looks up their JSON columns for
+/// accurate dot-notation conversion.
+///
+/// # Arguments
+///
+/// * `sql` - The SQL or PRQL query to preprocess
+/// * `registry` - The resource registry containing table schemas
+///
+/// # Returns
+///
+/// The preprocessed SQL ready for execution
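+///
+/// # Examples
+///
+/// A sketch of the intended flow (illustrative, not compiled):
+///
+/// ```ignore
+/// // Dot-notation on a JSON column known to the registry is converted to
+/// // JSON access, and arrow expressions get precedence parentheses.
+/// let sql = preprocess_sql_with_registry(
+///     "SELECT * FROM pods WHERE status.phase = 'Running'",
+///     &registry,
+/// )?;
+/// ```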
+pub fn preprocess_sql_with_registry(sql: &str, registry: &ResourceRegistry) -> Result<String> {
+    // Step 1: Compile PRQL to SQL if detected
+    let sql = if prql::is_prql(sql) {
+        let prql_preprocessed = prql::preprocess_prql_json_paths(sql);
+        prql::compile_prql(&prql_preprocessed)?
+    } else {
+        sql.to_string()
+    };
+
+    // Step 2: Extract table names from the SQL
+    let table_names = extract_table_names(&sql);
+
+    // Step 3: Build JSON columns set from registry for these tables
+    let json_columns = build_json_columns_for_tables(&table_names, registry);
+
+    // Step 4: Convert JSON path syntax with table-aware columns
+    let sql = json_path::preprocess_json_paths(&sql, Some(&json_columns));
+
+    // Step 5: Fix JSON arrow operator precedence
+    Ok(fix_arrow_precedence(&sql))
+}
+
+/// Preprocess a query for execution (without registry - uses defaults only).
 ///
 /// This function handles:
 /// 1. **PRQL detection and compilation**: Queries starting with `from`, `let`, or `prql`
@@ -132,6 +240,9 @@ fn fix_arrow_precedence(sql: &str) -> String {
 /// 3. **JSON arrow precedence fix**: Wraps arrow expressions in parentheses when used
 ///    with comparison operators to work around DataFusion parser precedence.
 ///
+/// Note: This function only recognizes DEFAULT_JSON_OBJECT_COLUMNS (spec, status, labels, etc.).
+/// For table-aware JSON column detection, use `preprocess_sql_with_registry` instead.
+///
 /// # Examples
 ///
 /// ```ignore
@@ -153,6 +264,7 @@ fn fix_arrow_precedence(sql: &str) -> String {
 /// preprocess_sql("SELECT * FROM pods WHERE labels->>'app' = 'nginx'")?;
 /// // Returns: SELECT * FROM pods WHERE (labels->>'app') = 'nginx'
 /// ```
+#[allow(dead_code)] // Used by tests and for backward compatibility
 pub fn preprocess_sql(sql: &str) -> Result<String> {
     // Step 1: Compile PRQL to SQL if detected
     let sql = if prql::is_prql(sql) {
@@ -709,4 +821,74 @@ mod tests {
         let result = preprocess_sql(sql).unwrap();
         assert_eq!(result, sql);
     }
+
+    // ==================== Table extraction tests ====================
+
+    #[test]
+    fn test_extract_table_names_simple() {
+        let tables = extract_table_names("SELECT * FROM pods");
+        assert_eq!(tables, vec!["pods"]);
+    }
+
+    #[test]
+    fn test_extract_table_names_with_alias() {
+        let tables = extract_table_names("SELECT * FROM pods p");
+        assert_eq!(tables, vec!["pods"]);
+    }
+
+    #[test]
+    fn test_extract_table_names_join() {
+        let tables = extract_table_names("SELECT * FROM pods p JOIN services s ON p.name = s.name");
+        assert_eq!(tables, vec!["pods", "services"]);
+    }
+
+    #[test]
+    fn test_extract_table_names_multiple_joins() {
+        let tables = extract_table_names(
+            "SELECT * FROM pods p \
+             JOIN services s ON p.name = s.name \
+             JOIN deployments d ON d.name = p.name",
+        );
+        assert_eq!(tables, vec!["pods", "services", "deployments"]);
+    }
+
+    #[test]
+    fn test_extract_table_names_subquery() {
+        let tables = extract_table_names(
+            "SELECT * FROM pods WHERE namespace IN (SELECT name FROM namespaces)",
+        );
+        assert_eq!(tables, vec!["pods", "namespaces"]);
+    }
+
+    #[test]
+    fn test_extract_table_names_case_insensitive() {
+        let tables = extract_table_names("SELECT * FROM Pods");
+        assert_eq!(tables, vec!["pods"]); // lowercased
+    }
+
+    #[test]
+    fn test_extract_table_names_left_join() {
+        let tables = extract_table_names("SELECT * FROM pods LEFT JOIN services ON true");
+        // LEFT is a keyword before JOIN, so we get both tables
+        assert!(tables.contains(&"pods".to_string()));
+        assert!(tables.contains(&"services".to_string()));
+    }
+
+    // ==================== Table-aware preprocessing tests ====================
+    // Note: These tests need a registry, which requires more setup.
+    // The integration is tested via execute_sql in context.rs.
+
+    #[test]
+    fn test_build_json_columns_includes_defaults() {
+        // Even with no registry tables found, defaults should be present
+        use crate::kubernetes::discovery::ResourceRegistry;
+        let registry = ResourceRegistry::new();
+        let columns = build_json_columns_for_tables(&[], &registry);
+
+        // Should include default columns
+        assert!(columns.contains("spec"));
+        assert!(columns.contains("status"));
+        assert!(columns.contains("labels"));
+        assert!(columns.contains("annotations"));
+    }
 }
diff --git a/src/kubernetes/discovery.rs b/src/kubernetes/discovery.rs
index bb080e2..628c13b 100644
--- a/src/kubernetes/discovery.rs
+++ b/src/kubernetes/discovery.rs
@@ -12,7 +12,7 @@ use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::{
 };
 use kube::api::ObjectList;
 use kube::discovery::{ApiCapabilities, ApiResource, Scope};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 /// Arrow data type for column schema
 #[derive(Debug, Clone, Copy, PartialEq)]
@@ -86,6 +86,18 @@ impl ColumnDef {
         }
     }
 
+    /// Create a JSON array/object column with a JSON path.
+    /// Always sets is_json_object=true, enabling dot-notation and bracket syntax.
+    /// Use this for columns that contain JSON arrays or objects (e.g., containers, rules, subjects).
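+    ///
+    /// # Examples
+    ///
+    /// A sketch (illustrative, not compiled; the path is assumed here):
+    ///
+    /// ```ignore
+    /// // Pod containers are a JSON array, so `containers[0].image` should resolve.
+    /// let col = ColumnDef::json("containers", "/spec/containers");
+    /// assert!(col.is_json_object);
+    /// ```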
+    fn json(name: &str, json_path: &str) -> Self {
+        Self {
+            name: name.into(),
+            data_type: ColumnDataType::Text,
+            json_path: Some(json_path.into()),
+            is_json_object: true,
+        }
+    }
+
     /// Create a text column without a JSON path (uses name as path)
     fn text_raw(name: &str) -> Self {
         Self {
@@ -237,6 +249,32 @@ impl ResourceRegistry {
         resources.sort_by(|a, b| a.table_name.cmp(&b.table_name));
         resources
     }
+
+    /// Get JSON column names for a specific table.
+    /// Returns column names where is_json_object is true (arrays and objects that support dot-notation).
+    /// Returns None if the table is not found.
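+    ///
+    /// # Examples
+    ///
+    /// A sketch of the lookup (illustrative, not compiled; assumes `roles`
+    /// has been discovered):
+    ///
+    /// ```ignore
+    /// let cols = registry.get_json_columns("roles").unwrap();
+    /// assert!(cols.contains("rules")); // is_json_object in the roles schema
+    /// ```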
+    pub fn get_json_columns(&self, table_name: &str) -> Option<HashSet<String>> {
+        let info = self.get(table_name)?;
+        let schema = generate_schema(info);
+        let json_cols: HashSet<String> = schema
+            .into_iter()
+            .filter(|col| col.is_json_object)
+            .map(|col| col.name)
+            .collect();
+        Some(json_cols)
+    }
+
+    /// Get JSON column names for multiple tables, merged into a single set.
+    /// Useful when a query involves multiple tables (JOINs).
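+    ///
+    /// # Examples
+    ///
+    /// A sketch for a join query (illustrative, not compiled):
+    ///
+    /// ```ignore
+    /// let tables = vec!["pods".to_string(), "roles".to_string()];
+    /// // Union of the is_json_object columns of both tables.
+    /// let cols = registry.get_json_columns_for_tables(&tables);
+    /// ```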
ColumnDef::json("source", "/source"), // EventSource object ]), // ==================== Metrics: special structure ==================== "podmetrics" => Some(vec![ ColumnDef::timestamp("timestamp", "/timestamp"), - ColumnDef::text("window", "/window"), - ColumnDef::text("containers", "/containers"), + ColumnDef::text("window", "/window"), // string (duration) + ColumnDef::json("containers", "/containers"), // array of ContainerMetrics ]), "nodemetrics" => Some(vec![ ColumnDef::timestamp("timestamp", "/timestamp"), - ColumnDef::text("window", "/window"), - ColumnDef::text("usage", "/usage"), + ColumnDef::text("window", "/window"), // string (duration) + ColumnDef::json("usage", "/usage"), // ResourceList (map) ]), // ==================== CustomResourceDefinitions: CRD metadata ==================== "customresourcedefinitions" => Some(vec![ - ColumnDef::text("group", "/spec/group"), - ColumnDef::text("scope", "/spec/scope"), - ColumnDef::text("resource_kind", "/spec/names/kind"), - ColumnDef::text("plural", "/spec/names/plural"), - ColumnDef::text("singular", "/spec/names/singular"), - ColumnDef::text("short_names", "/spec/names/shortNames"), - ColumnDef::text("categories", "/spec/names/categories"), - // Note: spec.versions and status.conditions are too large for table display - // Use kubectl get crd -o yaml for full schema details + ColumnDef::text("group", "/spec/group"), // string (scalar) + ColumnDef::text("scope", "/spec/scope"), // string (scalar) + ColumnDef::text("resource_kind", "/spec/names/kind"), // string (scalar) + ColumnDef::text("plural", "/spec/names/plural"), // string (scalar) + ColumnDef::text("singular", "/spec/names/singular"), // string (scalar) + ColumnDef::json("short_names", "/spec/names/shortNames"), // array of strings + ColumnDef::json("categories", "/spec/names/categories"), // array of strings + // Note: spec.versions and status.conditions are too large for table display + // Use kubectl get crd -o yaml for full schema details ]), // Unknown resource - return None to trigger CRD schema detection or fallback