Conversation
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
There was a problem hiding this comment.
Code Review
This pull request introduces support for custom options in CREATE FLOW statements using a WITH clause. The changes include updates to the SQL parser to handle the new syntax, the addition of validation and normalization logic for flow options (specifically supporting defer_on_missing_source), and ensuring these options are correctly displayed in information_schema.flows and SHOW CREATE FLOW results. Feedback was provided regarding the readability of the option validation logic, suggesting a refactor from a functional style to a more explicit loop.
| fn validate_and_normalize_flow_options( | ||
| options: HashMap<String, String>, | ||
| ) -> Result<HashMap<String, String>> { | ||
| options | ||
| .into_iter() | ||
| .map(|(key, value)| { | ||
| if key == FlowType::FLOW_TYPE_KEY { | ||
| return InvalidSqlSnafu { | ||
| err_msg: format!("flow option '{key}' is reserved for internal use"), | ||
| } | ||
| .fail(); | ||
| } | ||
|
|
||
| let normalized_value = match key.as_str() { | ||
| DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?, | ||
| _ => { | ||
| return InvalidSqlSnafu { | ||
| err_msg: format!( | ||
| "unknown flow option '{key}', supported options: {}", | ||
| supported_flow_options() | ||
| ), | ||
| } | ||
| .fail(); | ||
| } | ||
| }; | ||
|
|
||
| Ok((key, normalized_value)) | ||
| }) | ||
| .collect() | ||
| } |
There was a problem hiding this comment.
The current implementation using map with early returns and the ? operator can be hard to follow. Refactoring this function to use a simple for loop would improve readability and make the control flow clearer.
fn validate_and_normalize_flow_options(
options: HashMap<String, String>,
) -> Result<HashMap<String, String>> {
let mut normalized_options = HashMap::with_capacity(options.len());
for (key, value) in options {
if key == FlowType::FLOW_TYPE_KEY {
return InvalidSqlSnafu {
err_msg: format!("flow option '{key}' is reserved for internal use"),
}
.fail();
}
let normalized_value = match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?,
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"unknown flow option '{key}', supported options: {}",
supported_flow_options()
),
}
.fail();
}
};
normalized_options.insert(key, normalized_value);
}
Ok(normalized_options)
}
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
Summary
WITH (...)support toCREATE FLOWSHOW CREATE FLOWandinformation_schema.flows.flow_definitionWhy
This extracts the SQL parsing/validation surface from the larger pending-flow change so it can be reviewed independently.
Out of Scope
Tests
cargo test -p sql create_flowcargo test -p operator validate_and_normalize_flow_optionsPR Checklist
Please convert it to a draft if some of the following conditions are not met.