Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/catalog/src/system_schema/information_schema/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::FlowId;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_info::FlowInfoValue;
Expand Down Expand Up @@ -54,6 +55,17 @@ use crate::system_schema::utils;

const INIT_CAPACITY: usize = 42;

/// Builds the option map shown to users for a flow, omitting the internal
/// `flow_type` entry (it is system-managed, not user-settable).
fn user_visible_flow_options(
    options: &std::collections::HashMap<String, String>,
) -> sql::statements::OptionMap {
    let visible = options.iter().filter_map(|(key, value)| {
        (key.as_str() != FlowType::FLOW_TYPE_KEY).then(|| (key.clone(), value.clone()))
    });
    sql::statements::OptionMap::from(visible)
}

// rows of information_schema.flows
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
Expand Down Expand Up @@ -165,6 +177,7 @@ impl InformationSchemaFlows {
expire_after: flow_info.expire_after(),
eval_interval: flow_info.eval_interval(),
comment,
flow_options: user_visible_flow_options(flow_info.options()),
query,
};

Expand Down
50 changes: 49 additions & 1 deletion src/operator/src/expr_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ pub fn to_create_flow_task_expr(
eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
flow_options: Default::default(),
flow_options: create_flow.flow_options.into_map(),
})
}

Expand All @@ -1065,6 +1065,8 @@ fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
use datatypes::value::Value;
use session::context::{QueryContext, QueryContextBuilder};
Expand Down Expand Up @@ -1327,6 +1329,52 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
to_dot_sep(expr.source_table_names[0].clone())
);
assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
assert!(expr.flow_options.is_empty());

let sql = r"
CREATE FLOW task_3
SINK TO schema_1.table_1
WITH (defer_on_missing_source = 'true', foo = 'bar')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
assert_eq!(
expr.flow_options,
HashMap::from([
("defer_on_missing_source".to_string(), "true".to_string()),
("foo".to_string(), "bar".to_string()),
])
);

let sql = r"
CREATE FLOW task_4
SINK TO schema_1.table_1
WITH (defer_on_missing_source = true)
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
assert_eq!(
expr.flow_options,
HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
);

let sql = r"
CREATE FLOW abc.`task_2`
Expand Down
125 changes: 119 additions & 6 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ struct DdlSubmitOptions {
timeout: Duration,
}

/// User-facing flow option key; presumably controls whether `CREATE FLOW`
/// tolerates a missing source table instead of failing immediately —
/// TODO(review): confirm against the flow DDL handling.
const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
/// The complete set of option keys users may pass in `CREATE FLOW ... WITH (...)`;
/// anything else (including the reserved internal `flow_type`) is rejected.
const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY];

fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
Expand Down Expand Up @@ -152,6 +155,55 @@ fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
Ok(DdlSubmitOptions { wait, timeout })
}

/// Renders the user-settable flow option keys as a comma-separated list,
/// for embedding in error messages.
fn supported_flow_options() -> String {
    let names: Vec<&str> = ALLOWED_FLOW_OPTIONS.into_iter().collect();
    names.join(", ")
}

/// Parses and canonicalizes a boolean-valued flow option.
///
/// Accepts `true`/`false` in any ASCII case, with surrounding whitespace
/// (e.g. `"TRUE"`, `" False "`), and normalizes the stored value to the
/// canonical `"true"` / `"false"` form.
///
/// # Errors
/// Returns an `InvalidSql` error naming the key, the rejected value, and the
/// accepted values when the value does not parse as a boolean.
fn normalize_flow_bool_option(key: &str, value: &str) -> Result<String> {
    value
        .trim()
        .to_ascii_lowercase()
        .parse::<bool>()
        // Rename to `parsed` so the closure does not shadow the `value` argument.
        .map(|parsed| parsed.to_string())
        .map_err(|_| {
            InvalidSqlSnafu {
                // Tell the user what would have been accepted, not just what failed.
                err_msg: format!(
                    "invalid flow option '{key}': '{value}', expected 'true' or 'false'"
                ),
            }
            .build()
        })
}

fn validate_and_normalize_flow_options(
options: HashMap<String, String>,
) -> Result<HashMap<String, String>> {
options
.into_iter()
.map(|(key, value)| {
if key == FlowType::FLOW_TYPE_KEY {
return InvalidSqlSnafu {
err_msg: format!("flow option '{key}' is reserved for internal use"),
}
.fail();
}

let normalized_value = match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?,
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"unknown flow option '{key}', supported options: {}",
supported_flow_options()
),
}
.fail();
}
};

Ok((key, normalized_value))
})
.collect()
}
Comment on lines +176 to +205
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation using map with early returns and the ? operator can be hard to follow. Refactoring this function to use a simple for loop would improve readability and make the control flow clearer.

fn validate_and_normalize_flow_options(
    options: HashMap<String, String>,
) -> Result<HashMap<String, String>> {
    let mut normalized_options = HashMap::with_capacity(options.len());
    for (key, value) in options {
        if key == FlowType::FLOW_TYPE_KEY {
            return InvalidSqlSnafu {
                err_msg: format!("flow option '{key}' is reserved for internal use"),
            }
            .fail();
        }

        let normalized_value = match key.as_str() {
            DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?,
            _ => {
                return InvalidSqlSnafu {
                    err_msg: format!(
                        "unknown flow option '{key}', supported options: {}",
                        supported_flow_options()
                    ),
                }
                .fail();
            }
        };

        normalized_options.insert(key, normalized_value);
    }
    Ok(normalized_options)
}


impl StatementExecutor {
pub fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
Expand Down Expand Up @@ -629,17 +681,16 @@ impl StatementExecutor {
expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let mut expr = expr;
expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?;

let flow_type = self
.determine_flow_type(&expr, query_context.clone())
.await?;
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);

let expr = {
let mut expr = expr;
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
expr
};
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());

let task = CreateFlowTask::try_from(PbCreateFlowTask {
create_flow: Some(expr),
Expand Down Expand Up @@ -2334,6 +2385,68 @@ mod test {
assert_eq!(Duration::from_secs(300), ddl_options.timeout);
}

#[test]
fn test_validate_and_normalize_flow_options_empty() {
    // No options at all is valid and yields an empty map.
    let normalized = validate_and_normalize_flow_options(HashMap::new()).unwrap();
    assert!(normalized.is_empty());
}

#[test]
fn test_validate_and_normalize_flow_options_valid() {
    // Boolean values are case-insensitive and normalized to lowercase.
    let options = HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]);
    let normalized = validate_and_normalize_flow_options(options).unwrap();
    let expected = HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string())]);
    assert_eq!(normalized, expected);
}

#[test]
fn test_validate_and_normalize_flow_options_unknown_option() {
    // Unknown keys are rejected with a message listing the supported options.
    let options = HashMap::from([("foo".to_string(), "bar".to_string())]);
    let err = validate_and_normalize_flow_options(options).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("unknown flow option 'foo', supported options: defer_on_missing_source"));
}

#[test]
fn test_validate_and_normalize_flow_options_reserved_option() {
    // `flow_type` is managed internally and may not be set by users.
    let options = HashMap::from([(
        FlowType::FLOW_TYPE_KEY.to_string(),
        FlowType::BATCHING.to_string(),
    )]);
    let err = validate_and_normalize_flow_options(options).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("flow option 'flow_type' is reserved for internal use"));
}

#[test]
fn test_validate_and_normalize_flow_options_invalid_bool() {
    // Non-boolean values for boolean options are rejected.
    let options = HashMap::from([(
        DEFER_ON_MISSING_SOURCE_KEY.to_string(),
        "not-a-bool".to_string(),
    )]);
    let err = validate_and_normalize_flow_options(options).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'"));
}

#[test]
fn test_name_is_match() {
assert!(!NAME_PATTERN_REG.is_match("/adaf"));
Expand Down
11 changes: 11 additions & 0 deletions src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::SchemaOptions;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::Output;
use common_query::prelude::greptime_timestamp;
Expand Down Expand Up @@ -78,6 +79,15 @@ const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";

/// Builds the option map shown to users for a flow, omitting the internal
/// `flow_type` entry (it is system-managed, not user-settable).
fn user_visible_flow_options(options: &HashMap<String, String>) -> OptionMap {
    let visible = options.iter().filter_map(|(key, value)| {
        (key.as_str() != FlowType::FLOW_TYPE_KEY).then(|| (key.clone(), value.clone()))
    });
    OptionMap::from(visible)
}
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
Expand Down Expand Up @@ -1056,6 +1066,7 @@ pub fn show_create_flow(
expire_after: flow_val.expire_after(),
eval_interval: flow_val.eval_interval(),
comment,
flow_options: user_visible_flow_options(flow_val.options()),
query,
};

Expand Down
Loading
Loading