WIP: attempt for lazy data from python #154
base: main
Conversation
Walkthrough
Adds a lazy-population wrapper for pg_catalog.pg_database with a registration API and insertion helper, exposes it from the crate root, integrates view registration and SQL rewrite/UDF changes, updates dependencies, and adds tests and tools for validating pg_catalog views.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Client
participant Session as SessionContext
participant LazyProv as LazyDatabaseProvider
participant Fetcher as User Callback
participant MemTable as MemTable (inner provider)
Client->>Session: Execute SQL scanning pg_catalog.pg_database
Session->>LazyProv: scan(request)
alt fetcher registered
LazyProv->>Fetcher: invoke()
Fetcher-->>LazyProv: Vec<LazyDatabaseRow>
Note right of LazyProv #E8F8F2: Build RecordBatch matching pg_database schema
LazyProv->>MemTable: delegate scan with constructed RecordBatch
else no fetcher
LazyProv->>MemTable: delegate scan to inner provider
end
MemTable-->>Session: RecordBatches
Session-->>Client: ResultSet
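To make the "Build RecordBatch matching pg_database schema" step above concrete, here is a minimal illustrative sketch; the two-column schema and the function name are simplifications for illustration only, not the crate's actual code, which covers the full pg_database column set.

// Illustrative only: turn fetched (datname, datdba) pairs into a RecordBatch and
// serve them from a MemTable, as the lazy wrapper does during scan.
use std::sync::Arc;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;

fn table_from_fetched(rows: &[(String, i32)]) -> datafusion::error::Result<MemTable> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("datname", DataType::Utf8, false),
        Field::new("datdba", DataType::Int32, false),
    ]));
    let datname = StringArray::from(rows.iter().map(|(n, _)| n.as_str()).collect::<Vec<_>>());
    let datdba = Int32Array::from(rows.iter().map(|(_, d)| *d).collect::<Vec<_>>());
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(datname), Arc::new(datdba)])?;
    // The wrapper answers the scan from this in-memory table instead of the inner provider.
    MemTable::try_new(schema, vec![vec![batch]])
}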
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Possibly related PRs
Suggested labels
Poem
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 5
🧹 Nitpick comments (2)
src/db_table.rs (1)
128-128: Consider moving imports to file level.
The use statements for Arrow array types are placed inside the method body. While this is valid Rust, it's more conventional to place imports at the file level for better visibility and to avoid repeating them if this pattern is used elsewhere.

Move the import to the top of the file:

+use arrow::array::{new_null_array, ArrayRef, BooleanArray, BooleanBuilder, Int32Array, Int32Builder, StringArray, StringBuilder};
+
 // Wrapper around MemTable that records query scans.

Then remove it from line 128:

-    use arrow::array::{new_null_array, ArrayRef, BooleanArray, BooleanBuilder, Int32Array, Int32Builder, StringArray, StringBuilder};
     let mut oid_b = Int32Builder::new();

src/lazy_pg_catalog_helpers.rs (1)
60-77: Consider caching to avoid repeated lazy population on every scan.
The scan method invokes the fetcher callback and attempts to insert rows on every scan of pg_database. This is inefficient if the table is scanned multiple times in a query plan or across multiple queries.

Consider adding a flag or cache to track whether lazy population has already occurred for the current session, or implement a one-time initialization pattern, as sketched below.
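One lightweight way to get the suggested one-time initialization, sketched with std::sync::OnceLock; the struct and field names are illustrative rather than the provider's current layout:

// Illustrative one-time population guard: the expensive batch is built at most
// once per provider instance and reused by every later scan.
use std::sync::OnceLock;
use arrow::record_batch::RecordBatch;

struct LazyScanCache {
    populated: OnceLock<RecordBatch>,
}

impl LazyScanCache {
    fn batch_for_scan(&self, build: impl FnOnce() -> RecordBatch) -> &RecordBatch {
        // get_or_init runs `build` only on the first scan; later scans reuse the result.
        self.populated.get_or_init(build)
    }
}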
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/db_table.rs (2 hunks)
- src/lazy_pg_catalog_helpers.rs (1 hunks)
- src/lib.rs (2 hunks)
- src/session.rs (1 hunks)
- tests/lazy_pg_catalog.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
tests/lazy_pg_catalog.rs (2)
src/session.rs (1)
get_base_session_context (784-868)
src/lazy_pg_catalog_helpers.rs (3)
register_user_database_with_callback (172-196), new (31-36), new (142-163)
src/lazy_pg_catalog_helpers.rs (2)
src/db_table.rs (11)
new (77-90), as_any (95-97), schema (46-46), schema (50-57), schema (99-101), table_type (103-105), supports_filters_pushdown (107-112), scan (114-227), state (207-207), state (236-236), insert_into (229-264)
src/session.rs (1)
as_any(89-91)
src/db_table.rs (1)
src/lazy_pg_catalog_helpers.rs (4)
current_database_rows (235-237), new (31-36), new (142-163), maybe_refresh_pg_database (211-219)
src/session.rs (2)
src/db_table.rs (1)
new (77-90)
src/lazy_pg_catalog_helpers.rs (3)
new (31-36), new (142-163), wrap_pg_database_provider_if_lazy (223-232)
🔇 Additional comments (3)
src/session.rs (1)
757-765: LGTM! Clean conditional wrapping logic.
The refactoring to compute the provider at runtime and conditionally wrap pg_database with the lazy provider is well-structured. The variable rename from wrapped to base clearly indicates the intermediate representation before potential lazy wrapping.
The wrap_pg_database_provider_if_lazy helper correctly returns the original provider if no fetcher is registered, ensuring backwards compatibility.

src/lazy_pg_catalog_helpers.rs (2)
92-119: Excellent documentation for the data structure.
The comprehensive table documenting all columns of LazyDatabaseRow is very helpful. It clearly explains each field's purpose, type, and semantics, making it easy for users to understand what data to provide when using the lazy population API.

250-260: Verify OID generation is thread-safe.
Using SELECT max(oid)+1 in lazy_pg_catalog_helpers.rs and pg_catalog_helpers.rs can race under concurrent inserts, causing duplicate OIDs. Ensure assignment is serialized or backed by a proper sequence or unique constraint; one possible shape is sketched below.
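For illustration, one possible shape for serialized assignment (the names and seeding strategy are assumptions, not code in this PR) is a process-wide atomic counter seeded once from max(oid):

// Illustrative sketch: install the seed exactly once, then hand out OIDs with
// fetch_add so concurrent inserts can never observe the same value.
use std::sync::atomic::{AtomicI64, Ordering};

static NEXT_OID: AtomicI64 = AtomicI64::new(0);

fn allocate_oid(seed_from_catalog: i64) -> i64 {
    // Only the first caller installs the seed; later callers just increment.
    let _ = NEXT_OID.compare_exchange(0, seed_from_catalog, Ordering::SeqCst, Ordering::SeqCst);
    NEXT_OID.fetch_add(1, Ordering::SeqCst)
}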
src/db_table.rs
Outdated
// If scanning pg_database and a lazy fetcher is registered, append
// rows for fetched databases with default/null values where needed.
if self.table_name == "pg_database" {
    if let Some(rows) = current_database_rows() {
        if !rows.is_empty() {
            // Build a batch in-memory for immediate visibility, and also
            // trigger SQL inserts for persistence.
            use arrow::array::{new_null_array, ArrayRef, BooleanArray, BooleanBuilder, Int32Array, Int32Builder, StringArray, StringBuilder};

            let mut oid_b = Int32Builder::new();
            let mut datname_b = StringBuilder::new();
            let mut datdba_b = Int32Builder::new();
            let mut encoding_b = Int32Builder::new();
            let mut datlocprovider_b = StringBuilder::new();
            let mut datistemplate_b = BooleanBuilder::new();
            let mut datallowconn_b = BooleanBuilder::new();
            let mut dathasloginevt_b = BooleanBuilder::new();
            let mut datconnlimit_b = Int32Builder::new();
            let mut datfrozenxid_b = StringBuilder::new();
            let mut datminmxid_b = StringBuilder::new();
            let mut dattablespace_b = Int32Builder::new();
            let mut datcollate_b = StringBuilder::new();
            let mut datctype_b = StringBuilder::new();
            let mut datlocale_b = StringBuilder::new();
            let mut daticurules_b = StringBuilder::new();
            let mut datcollversion_b = StringBuilder::new();

            for r in &rows {
                match r.oid { Some(v) => oid_b.append_value(v), None => oid_b.append_null() }
                datname_b.append_value(&r.datname);
                datdba_b.append_value(r.datdba);
                encoding_b.append_value(r.encoding.unwrap_or(6));
                if let Some(c) = r.datlocprovider { datlocprovider_b.append_value(&c.to_string()); } else { datlocprovider_b.append_null(); }
                datistemplate_b.append_value(r.datistemplate.unwrap_or(false));
                datallowconn_b.append_value(r.datallowconn.unwrap_or(true));
                dathasloginevt_b.append_value(r.dathasloginevt.unwrap_or(false));
                datconnlimit_b.append_value(r.datconnlimit.unwrap_or(-1));
                datfrozenxid_b.append_value(&r.datfrozenxid.clone().unwrap_or_else(|| "726".to_string()));
                datminmxid_b.append_value(&r.datminmxid.clone().unwrap_or_else(|| "1".to_string()));
                dattablespace_b.append_value(r.dattablespace.unwrap_or(1663));
                datcollate_b.append_value(&r.datcollate.clone().unwrap_or_else(|| "C".to_string()));
                datctype_b.append_value(&r.datctype.clone().unwrap_or_else(|| "C".to_string()));
                if let Some(v) = &r.datlocale { datlocale_b.append_value(v); } else { datlocale_b.append_null(); }
                if let Some(v) = &r.daticurules { daticurules_b.append_value(v); } else { daticurules_b.append_null(); }
                if let Some(v) = &r.datcollversion { datcollversion_b.append_value(v); } else { datcollversion_b.append_null(); }
            }

            // Build arrays in schema order; use NULL arrays for unknown/system columns.
            let mut arrays: Vec<ArrayRef> = Vec::new();
            for field in self.schema.fields() {
                let name = field.name().as_str();
                let arr: ArrayRef = match name {
                    "oid" => Arc::new(oid_b.finish()),
                    "datname" => Arc::new(datname_b.finish()),
                    "datdba" => Arc::new(datdba_b.finish()),
                    "encoding" => Arc::new(encoding_b.finish()),
                    "datlocprovider" => Arc::new(datlocprovider_b.finish()),
                    "datistemplate" => Arc::new(datistemplate_b.finish()),
                    "datallowconn" => Arc::new(datallowconn_b.finish()),
                    "dathasloginevt" => Arc::new(dathasloginevt_b.finish()),
                    "datconnlimit" => Arc::new(datconnlimit_b.finish()),
                    "datfrozenxid" => Arc::new(datfrozenxid_b.finish()),
                    "datminmxid" => Arc::new(datminmxid_b.finish()),
                    "dattablespace" => Arc::new(dattablespace_b.finish()),
                    "datcollate" => Arc::new(datcollate_b.finish()),
                    "datctype" => Arc::new(datctype_b.finish()),
                    "datlocale" => Arc::new(datlocale_b.finish()),
                    "daticurules" => Arc::new(daticurules_b.finish()),
                    "datcollversion" => Arc::new(datcollversion_b.finish()),
                    // datacl and any other fields: build NULL arrays sized to rows
                    _ => new_null_array(field.data_type(), rows.len()),
                };
                arrays.push(arr);
            }

            let new_batch = RecordBatch::try_new(self.schema.clone(), arrays).unwrap();
            // Merge with existing batches
            let mut guard = self.mem.batches[0].write().await;
            if !guard.is_empty() {
                let merged = concat_batches(&self.schema, &vec![guard[0].clone(), new_batch])?;
                guard.clear();
                guard.push(merged);
            } else {
                guard.push(new_batch);
            }

            if let Some(ctx) = state.as_any().downcast_ref::<SessionContext>() {
                // Also persist via SQL so subsequent queries see them.
                let _ = crate::lazy_pg_catalog_helpers::maybe_refresh_pg_database(ctx).await;
            }
        }
    }
}
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
Verify thread-safety and consider idempotency of lazy population.
The lazy population logic executes on every scan of pg_database when fetched rows exist. This raises several concerns:
- Concurrent scans: Multiple threads scanning pg_database simultaneously will each try to build and merge batches, then call maybe_refresh_pg_database. The write lock on self.mem.batches[0] protects the merge, but the overall flow is not atomic across both the in-memory merge and the SQL insert.
- Double population: The code builds an in-memory batch (lines 196-205) AND triggers SQL inserts via maybe_refresh_pg_database (line 209). While insert_database_row in the helpers checks for existence, this creates duplicate work on every scan until the SQL insert completes.
- Performance: Building Arrow arrays manually on every scan is expensive. Consider caching the built batch or using a flag to indicate when lazy population has already occurred.
Run the following script to check if there are any concurrency controls or flags to prevent repeated lazy population:
🏁 Script executed:
#!/bin/bash
# Search for any flags or mechanisms to prevent repeated lazy population
rg -nC3 "pg_database.*lazy|lazy.*pg_database" --type=rust

Length of output: 5278
Enforce idempotent, atomic lazy population for pg_database
- Introduce a one-time guard (e.g. OnceCell or atomic flag) to skip the in-memory batch build and SQL insert on subsequent scans.
- Wrap the batch merge and maybe_refresh_pg_database call in a transaction or mutex to avoid races across concurrent scans (see the sketch after this list).
- Cache the built RecordBatch instead of rebuilding Arrow arrays on every scan.
- Don't ignore errors from maybe_refresh_pg_database; log or propagate failures.
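A compact sketch of the guard-plus-critical-section idea from the list above, using a tokio Mutex so the merge and the SQL refresh cannot interleave and errors are propagated; the surrounding types are illustrative, not this PR's code:

// Illustrative sketch: one async critical section covers both the in-memory merge
// and the SQL refresh, runs them only once, and propagates errors instead of
// discarding them.
use datafusion::error::DataFusionError;
use std::future::Future;
use tokio::sync::Mutex;

struct PopulationGuard {
    done: Mutex<bool>,
}

impl PopulationGuard {
    async fn populate_once<F, Fut>(&self, populate: F) -> Result<(), DataFusionError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<(), DataFusionError>>,
    {
        let mut done = self.done.lock().await;
        if *done {
            return Ok(()); // already populated; skip rebuild and insert
        }
        populate().await?; // batch merge + maybe_refresh_pg_database; errors bubble up
        *done = true;
        Ok(())
    }
}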
🤖 Prompt for AI Agents
In src/db_table.rs around lines 121-213, the lazy population of pg_database is
not idempotent or atomic: add a one-time guard (e.g. OnceCell<RecordBatch> or an
AtomicBool/OnceCell) on the table struct to skip rebuilding arrays and
re-running inserts after the first successful population; protect the in-memory
merge and the call to maybe_refresh_pg_database with a shared async Mutex or a
transaction-like critical section to avoid concurrent races; store/cache the
built RecordBatch in that OnceCell and reuse it for subsequent scans instead of
rebuilding Arrow arrays; and do not ignore errors from maybe_refresh_pg_database
— propagate them or at minimum log them and return a failure so callers can
handle the error.
src/db_table.rs
Outdated
if let Some(ctx) = state.as_any().downcast_ref::<SessionContext>() {
    // Also persist via SQL so subsequent queries see them.
    let _ = crate::lazy_pg_catalog_helpers::maybe_refresh_pg_database(ctx).await;
}
Don't silently ignore errors from persistence.
Using let _ = ... silently discards errors from maybe_refresh_pg_database, which could hide important issues like SQL syntax errors, connection problems, or constraint violations.
Apply this diff to log or propagate the error:
- if let Some(ctx) = state.as_any().downcast_ref::<SessionContext>() {
- // Also persist via SQL so subsequent queries see them.
- let _ = crate::lazy_pg_catalog_helpers::maybe_refresh_pg_database(ctx).await;
- }
+ if let Some(ctx) = state.as_any().downcast_ref::<SessionContext>() {
+ // Also persist via SQL so subsequent queries see them.
+ if let Err(e) = crate::lazy_pg_catalog_helpers::maybe_refresh_pg_database(ctx).await {
+ log::warn!("Failed to refresh pg_database: {:?}", e);
+ }
+ }

🤖 Prompt for AI Agents
In src/db_table.rs around lines 207 to 210, the call to
maybe_refresh_pg_database is using `let _ = ...` which silently ignores any
error; change this to handle the Result by either propagating the error with `?`
(if the current function returns a compatible Result) or explicitly log the
error and its context (e.g., using tracing::error! or the existing logger) and
handle/fail accordingly; ensure you await the call into a named variable or use
`if let Err(e) = ... .await { /* log or return Err(e.into()) */ }` so failures
are not dropped.
src/lazy_pg_catalog_helpers.rs
Outdated
if let Some(f) = fetcher {
    for row in (f)() {
        let _ = insert_database_row(ctx, &row).await;
    }
Log errors from database row insertion.
The code silently ignores errors from insert_database_row via let _ = .... If insertion fails (e.g., due to constraint violations, SQL errors, or connection issues), users won't know that lazy population failed.
Apply this diff to log the errors:
let fetcher = { DB_FETCHER.lock().unwrap().clone() };
if let Some(f) = fetcher {
for row in (f)() {
- let _ = insert_database_row(ctx, &row).await;
+ if let Err(e) = insert_database_row(ctx, &row).await {
+ log::warn!("Failed to insert database row '{}': {:?}", row.datname, e);
+ }
}
}

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
  if let Some(f) = fetcher {
      for row in (f)() {
-         let _ = insert_database_row(ctx, &row).await;
+         if let Err(e) = insert_database_row(ctx, &row).await {
+             log::warn!("Failed to insert database row '{}': {:?}", row.datname, e);
+         }
      }
  }
🤖 Prompt for AI Agents
In src/lazy_pg_catalog_helpers.rs around lines 213 to 216, the Result returned
from insert_database_row is currently ignored; change it to capture the Result,
await it, and handle errors by logging them (including the error details and
relevant row/context) instead of discarding them, then continue the loop; use
the existing project logging/tracing facility to record failures so lazy
population errors are observable.
let sql = format!(
    "INSERT INTO pg_catalog.pg_database (
        oid, datname, datdba, encoding,
        datistemplate, datallowconn, datconnlimit,
        datfrozenxid, datminmxid, dattablespace,
        datcollate, datctype, datacl
    ) VALUES (
        {oid}, '{datname}', {datdba}, {encoding},
        {datistemplate}, {datallowconn}, {datconnlimit},
        '{datfrozenxid}', '{datminmxid}', {dattablespace},
        '{datcollate}', '{datctype}', {datacl}
    )",
    oid = oid_val,
    datname = datname,
    datdba = datdba,
    encoding = encoding,
    datistemplate = if datistemplate { "true" } else { "false" },
    datallowconn = if datallowconn { "true" } else { "false" },
    datconnlimit = datconnlimit,
    datfrozenxid = esc(&datfrozenxid),
    datminmxid = esc(&datminmxid),
    dattablespace = dattablespace,
    datcollate = esc(&datcollate),
    datctype = esc(&datctype),
    datacl = datacl_sql,
);
SQL injection risk: use parameterized queries.
The code manually constructs SQL strings with user-controlled values (datname, datcollate, datctype, etc.) using string escaping. While the esc function handles single quotes, manual SQL construction is error-prone and could miss edge cases.
Refactor to use DataFusion's parameterized query support:
let sql = "INSERT INTO pg_catalog.pg_database (
oid, datname, datdba, encoding,
datistemplate, datallowconn, datconnlimit,
datfrozenxid, datminmxid, dattablespace,
datcollate, datctype, datacl
) VALUES (
$oid, $datname, $datdba, $encoding,
$datistemplate, $datallowconn, $datconnlimit,
$datfrozenxid, $datminmxid, $dattablespace,
$datcollate, $datctype, $datacl
)";
ctx.sql(sql).await?
.with_param_values(vec![
("oid", ScalarValue::Int64(Some(oid_val))),
("datname", ScalarValue::Utf8(Some(datname))),
("datdba", ScalarValue::Int32(Some(datdba))),
// ... other parameters
])?
    .collect().await?;
Actionable comments posted: 4
♻️ Duplicate comments (2)
src/lazy_pg_catalog_helpers.rs (2)
246-252: Don't swallow registration errors; return on register failure.
Silently ignoring register_table errors can leave the lazy provider uninstalled with no signal. Log and return the error.
- let _ = schema.deregister_table("pg_database");
- let _ = schema.register_table("pg_database".to_string(), wrapped);
+ if let Err(e) = schema.deregister_table("pg_database") {
+     log::debug!("Failed to deregister pg_database (may be unsupported): {:?}", e);
+ }
+ if let Err(e) = schema.register_table("pg_database".to_string(), wrapped) {
+     log::error!("Failed to register lazy pg_database provider: {:?}", e);
+     return Err(e);
+ }
297-325: SQL injection risk in INSERT; switch to parameterized query.
Manual SQL construction with escaped strings is unsafe and brittle. Use DataFusion parameters for all values.

- let sql = format!(
-     "INSERT INTO pg_catalog.pg_database (
-         oid, datname, datdba, encoding,
-         datistemplate, datallowconn, datconnlimit,
-         datfrozenxid, datminmxid, dattablespace,
-         datcollate, datctype, datacl
-     ) VALUES (
-         {oid}, '{datname}', {datdba}, {encoding},
-         {datistemplate}, {datallowconn}, {datconnlimit},
-         '{datfrozenxid}', '{datminmxid}', {dattablespace},
-         '{datcollate}', '{datctype}', {datacl}
-     )",
-     oid = oid_val,
-     datname = datname,
-     datdba = datdba,
-     encoding = encoding,
-     datistemplate = if datistemplate { "true" } else { "false" },
-     datallowconn = if datallowconn { "true" } else { "false" },
-     datconnlimit = datconnlimit,
-     datfrozenxid = esc(&datfrozenxid),
-     datminmxid = esc(&datminmxid),
-     dattablespace = dattablespace,
-     datcollate = esc(&datcollate),
-     datctype = esc(&datctype),
-     datacl = datacl_sql,
- );
-
- ctx.sql(&sql).await?.collect().await?;
+ use datafusion::common::ScalarValue;
+ let sql = "INSERT INTO pg_catalog.pg_database (
+     oid, datname, datdba, encoding,
+     datistemplate, datallowconn, datconnlimit,
+     datfrozenxid, datminmxid, dattablespace,
+     datcollate, datctype, datacl
+ ) VALUES (
+     $oid, $datname, $datdba, $encoding,
+     $datistemplate, $datallowconn, $datconnlimit,
+     $datfrozenxid, $datminmxid, $dattablespace,
+     $datcollate, $datctype, $datacl
+ )";
+
+ let mut params = vec![
+     ("oid", ScalarValue::Int64(Some(oid_val))),
+     ("datname", ScalarValue::Utf8(Some(row.datname.clone()))),
+     ("datdba", ScalarValue::Int32(Some(datdba))),
+     ("encoding", ScalarValue::Int32(Some(encoding))),
+     ("datistemplate", ScalarValue::Boolean(Some(datistemplate))),
+     ("datallowconn", ScalarValue::Boolean(Some(datallowconn))),
+     ("datconnlimit", ScalarValue::Int32(Some(datconnlimit))),
+     ("datfrozenxid", ScalarValue::Utf8(Some(datfrozenxid))),
+     ("datminmxid", ScalarValue::Utf8(Some(datminmxid))),
+     ("dattablespace", ScalarValue::Int32(Some(dattablespace))),
+     ("datcollate", ScalarValue::Utf8(Some(datcollate))),
+     ("datctype", ScalarValue::Utf8(Some(datctype))),
+ ];
+
+ // TODO: if you need datacl, pass as List<Utf8> ScalarValue; for now use NULL
+ params.push(("datacl", ScalarValue::Null));
+
+ ctx.sql(sql).await?
+     .with_param_values(params)?
+     .collect().await?;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/db_table.rs (2 hunks)
- src/lazy_pg_catalog_helpers.rs (1 hunks)
- src/session.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/session.rs
- src/db_table.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/lazy_pg_catalog_helpers.rs (2)
src/db_table.rs (10)
new (76-89), as_any (94-96), schema (45-45), schema (49-56), schema (98-100), table_type (102-104), supports_filters_pushdown (106-111), scan (113-135), state (144-144), insert_into (137-172)
src/session.rs (5)
as_any (89-91), batch (155-170), batch (914-917), batch (922-925), batch (986-989)
🪛 GitHub Actions: CI
src/lazy_pg_catalog_helpers.rs
[error] 1-1: cargo fmt --check reported formatting changes. Run 'cargo fmt' to fix code style issues in this file.
🔇 Additional comments (2)
src/lazy_pg_catalog_helpers.rs (2)
1-1: CI formatting failure.
cargo fmt --check failed. Please run cargo fmt.

74-91: No Arrow type mismatches detected.
The oid column is mapped to Int32 and text-like columns to Utf8 by map_pg_type, matching the Int32Builder and StringBuilder usage in src/lazy_pg_catalog_helpers.rs.
/// A table provider wrapper that, on scan, invokes a user-supplied callback
/// to fetch database names and ensures they are registered in pg_database
/// before delegating to the underlying table.
///
/// This enables lazy population of `pg_catalog.pg_database`.
pub struct LazyDatabaseProvider {
Docs and behavior diverge.
Comment says “ensures [rows] are registered … before delegating to the underlying table,” but scan builds an in-memory MemTable and does not update inner or delegate to it. Clarify the doc or implement insertion + inner.scan as intended.
async fn scan(
    &self,
    state: &dyn datafusion::catalog::Session,
    projection: Option<&Vec<usize>>,
    _filters: &[Expr],
    _limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
Propagate filters and limit to avoid disabling pushdown.
scan ignores both filters and limit, which can degrade performance and change pushdown semantics. Pass them through to MemTable::scan.
async fn scan(
&self,
state: &dyn datafusion::catalog::Session,
projection: Option<&Vec<usize>>,
- _filters: &[Expr],
- _limit: Option<usize>,
+ filters: &[Expr],
+ limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
@@
- mem.scan(state, projection, &[], None).await
+ mem.scan(state, projection, filters, limit).await
  }

Also applies to: 139-142
🤖 Prompt for AI Agents
In src/lazy_pg_catalog_helpers.rs around lines 61-67 (and also apply the same
change at 139-142), the scan implementation currently ignores the incoming
filters and limit, disabling pushdown and hurting performance; update the
function to forward the projection, filters, and limit parameters into the
underlying MemTable::scan call (i.e., pass the same projection, the filters
slice/Vec as the scan's filters argument, and the limit Option) so pushed
predicates and limits are preserved and used by the execution plan.
let arr: ArrayRef = match name {
    "oid" => Arc::new(oid_b.finish()),
    "datname" => Arc::new(datname_b.finish()),
    "datdba" => Arc::new(datdba_b.finish()),
    "encoding" => Arc::new(encoding_b.finish()),
    "datlocprovider" => Arc::new(datlocprovider_b.finish()),
    "datistemplate" => Arc::new(datistemplate_b.finish()),
    "datallowconn" => Arc::new(datallowconn_b.finish()),
    "dathasloginevt" => Arc::new(dathasloginevt_b.finish()),
    "datconnlimit" => Arc::new(datconnlimit_b.finish()),
    "datfrozenxid" => Arc::new(datfrozenxid_b.finish()),
    "datminmxid" => Arc::new(datminmxid_b.finish()),
    "dattablespace" => Arc::new(dattablespace_b.finish()),
    "datcollate" => Arc::new(datcollate_b.finish()),
    "datctype" => Arc::new(datctype_b.finish()),
    "datlocale" => Arc::new(datlocale_b.finish()),
    "daticurules" => Arc::new(daticurules_b.finish()),
    "datcollversion" => Arc::new(datcollversion_b.finish()),
    _ => new_null_array(field.data_type(), rows.len()),
datacl column is ignored (always NULL).
rows.datacl is never written into the batch; match arm lacks "datacl". Either populate it (List) or document as intentionally NULL.
If you intend to populate (see the sketch after this list):
- Add a ListBuilder for datacl.
- In the loop, append list values for each row.
- Return Arc::new(datacl_b.finish()) in the match arm for "datacl".
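For reference, a self-contained sketch of the suggested List&lt;Utf8&gt; construction with Arrow's ListBuilder; the row shape is simplified here to Option&lt;Vec&lt;String&gt;&gt; for illustration:

// Illustrative datacl construction: each row becomes a list of ACL strings,
// or a NULL list when the row carries no ACLs.
use std::sync::Arc;
use arrow::array::{ArrayRef, ListBuilder, StringBuilder};

fn build_datacl(acls: &[Option<Vec<String>>]) -> ArrayRef {
    let mut datacl_b = ListBuilder::new(StringBuilder::new());
    for acl in acls {
        match acl {
            Some(items) => {
                for item in items {
                    datacl_b.values().append_value(item);
                }
                datacl_b.append(true); // close this row's list
            }
            None => datacl_b.append(false), // NULL entry for this row
        }
    }
    Arc::new(datacl_b.finish()) as ArrayRef
}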
🤖 Prompt for AI Agents
In src/lazy_pg_catalog_helpers.rs around lines 116 to 134, the match arm for
building column arrays omits "datacl" so the datacl column is always NULL; fix
by adding a ListBuilder<StringBuilder> (List<Utf8>) named datacl_b, populate it
inside the row iteration by starting a list, appending each ACL string element
(or leaving it empty/null for rows without ACLs), finishing the list for each
row, and finally add a match arm "datacl" => Arc::new(datacl_b.finish()) so
datacl is returned instead of new_null_array(...).
src/lazy_pg_catalog_helpers.rs
Outdated
let getiddf = ctx.sql("select max(oid)+1 from pg_catalog.pg_database").await?;
let batches = getiddf.collect().await?;
let array = batches[0]
    .column(0)
    .as_any()
    .downcast_ref::<arrow::array::Int64Array>()
    .unwrap();
array.value(0)
};
Handle empty-table case when generating OID.
max(oid) can be NULL on an empty table; value(0) on NULL is undefined. Use COALESCE to ensure a starting value.
- let getiddf = ctx.sql("select max(oid)+1 from pg_catalog.pg_database").await?;
+ let getiddf = ctx.sql("select COALESCE(max(oid), 0)+1 from pg_catalog.pg_database").await?;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- let getiddf = ctx.sql("select max(oid)+1 from pg_catalog.pg_database").await?;
+ let getiddf = ctx.sql("select COALESCE(max(oid), 0)+1 from pg_catalog.pg_database").await?;
  let batches = getiddf.collect().await?;
  let array = batches[0]
      .column(0)
      .as_any()
      .downcast_ref::<arrow::array::Int64Array>()
      .unwrap();
  array.value(0)
  };
🤖 Prompt for AI Agents
In src/lazy_pg_catalog_helpers.rs around lines 271 to 279, the code assumes
max(oid) returns a non-NULL value and directly calls value(0), which is
undefined for empty tables; change the SQL to use COALESCE(max(oid), 0)+1 (or
equivalent) so the query always returns a non-NULL starting OID, and ensure the
result is read as an Int64Array before calling value(0) (or alternatively check
for null on the returned array element and fallback to 1).
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/lazy_pg_catalog_helpers.rs (2)
266-276: Address the unresolved error handling issue from previous reviews.Lines 271-272 still use
let _ = ...to silently ignore errors from bothderegister_tableandregister_table, despite this being flagged in previous reviews. Whilederegisterfailures may be acceptable (the operation might not be supported), silently ignoringregisterfailures is problematic because it leaves the lazy provider uninstalled, causing the callback to never be invoked.Based on previous review feedback, apply this fix:
let wrapped: Arc<dyn TableProvider> = Arc::new(LazyDatabaseProvider::new(current.clone(), fetch_databases)); - let _ = schema.deregister_table("pg_database"); - let _ = schema.register_table("pg_database".to_string(), wrapped); + // Deregister may fail if not supported; that's acceptable + let _ = schema.deregister_table("pg_database"); + // Registration failure must be reported + if let Err(e) = schema.register_table("pg_database".to_string(), wrapped) { + log::error!("Failed to register lazy pg_database provider: {:?}", e); + return Err(e); + } }
341-366: SQL injection risk remains from previous review (though function is unused).The code still constructs SQL strings using
format!with user-controlled values (lines 341-366), despite this being flagged in previous reviews. While theescfunction handles single quotes, manual SQL construction is error-prone.Since
insert_database_rowappears to be dead code (see separate comment), the immediate risk is low. However, if this function is retained or revived, it should use DataFusion's parameterized query API:let sql = "INSERT INTO pg_catalog.pg_database ( oid, datname, datdba, encoding, datistemplate, datallowconn, datconnlimit, datfrozenxid, datminmxid, dattablespace, datcollate, datctype, datacl ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; ctx.sql(sql).await? .with_param_values(vec![ ScalarValue::Int64(Some(oid_val)), ScalarValue::Utf8(Some(datname.clone())), ScalarValue::Int32(Some(datdba)), // ... other parameters ])? .collect().await?;This eliminates injection risks and improves code clarity.
🧹 Nitpick comments (3)
tests/lazy_pg_catalog.rs (1)
46-49: Strengthen test assertions and verify returned content.
The test has two weaknesses:

- Line 49: Using >= 1 for callback invocation is imprecise. Consider assert_eq!(calls.load(Ordering::SeqCst), 1) unless multiple invocations are expected (if so, add a comment explaining why).
- Missing content verification: The test only checks the row count but doesn't verify the actual data. Consider adding assertions on the returned datname and datdba values to ensure the callback data was correctly materialized.

Apply this diff to strengthen the test:

  let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
  assert_eq!(total_rows, 2);
- assert!(calls.load(Ordering::SeqCst) >= 1);
+ assert_eq!(calls.load(Ordering::SeqCst), 1, "callback should be invoked exactly once per scan");
+
+ // Verify the returned database names
+ use arrow::array::StringArray;
+ let datname_col = batches[0].column(0).as_any().downcast_ref::<StringArray>().unwrap();
+ let names: Vec<&str> = (0..datname_col.len()).map(|i| datname_col.value(i)).collect();
+ assert_eq!(names, vec!["lazy_db1", "lazy_db2"]);

src/lazy_pg_catalog_helpers.rs (2)
61-72: Consider caching callback results to avoid expensive re-invocations.
The fetcher callback is invoked on every scan (line 72), which could be expensive if the callback performs I/O, database queries, or other costly operations. If the same query scans pg_database multiple times (e.g., in a self-join or subquery), the callback will be invoked multiple times.

Consider one of these approaches (a sketch of the TTL option follows this list):
- Cache with TTL: Store callback results with a time-to-live, refreshing only when stale.
- Once-per-session: Invoke the callback once per session and cache results.
- Explicit refresh: Add a method to force refresh when needed.
If frequent re-invocation is intentional for data freshness, add a comment explaining this design decision.
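A sketch of the TTL option; the type and its names are illustrative, since the crate currently has no such cache:

// Illustrative TTL cache around the fetcher: scans inside the window reuse the
// cached rows instead of re-invoking the callback.
use std::sync::Mutex;
use std::time::{Duration, Instant};

struct FetchCache<T> {
    ttl: Duration,
    slot: Mutex<Option<(Instant, Vec<T>)>>,
}

impl<T: Clone> FetchCache<T> {
    fn get_or_fetch(&self, fetch: impl FnOnce() -> Vec<T>) -> Vec<T> {
        let mut slot = self.slot.lock().unwrap();
        if let Some((fetched_at, rows)) = slot.as_ref() {
            if fetched_at.elapsed() < self.ttl {
                return rows.clone(); // still fresh; skip the callback
            }
        }
        let rows = fetch();
        *slot = Some((Instant::now(), rows.clone()));
        rows
    }
}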
61-67: Document that filter and limit pushdown are not yet supported.
The scan method ignores _filters and _limit parameters (lines 65-66), meaning DataFusion applies them after the scan rather than pushing them down to the callback. This is inefficient but acceptable for an initial implementation.

Add a comment documenting this limitation and the performance implications:

async fn scan(
    &self,
    state: &dyn datafusion::catalog::Session,
    projection: Option<&Vec<usize>>,
    _filters: &[Expr],    // TODO: Not yet pushed down to callback; applied by DataFusion post-scan
    _limit: Option<usize>, // TODO: Not yet pushed down to callback; applied by DataFusion post-scan
) -> DFResult<Arc<dyn ExecutionPlan>> {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/db_table.rs (2 hunks)
- src/lazy_pg_catalog_helpers.rs (1 hunks)
- src/lib.rs (2 hunks)
- src/session.rs (1 hunks)
- tests/lazy_pg_catalog.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/db_table.rs
🧰 Additional context used
🧬 Code graph analysis (2)
tests/lazy_pg_catalog.rs (2)
src/session.rs (1)
get_base_session_context (778-862)
src/lazy_pg_catalog_helpers.rs (3)
register_user_database_with_callback (257-277), new (32-37), new (227-248)
src/lazy_pg_catalog_helpers.rs (2)
src/db_table.rs (10)
new (76-89), as_any (94-96), schema (45-45), schema (49-56), schema (98-100), table_type (102-104), supports_filters_pushdown (106-111), scan (113-135), state (144-144), insert_into (137-172)
src/session.rs (5)
as_any (89-91), batch (155-170), batch (909-912), batch (917-920), batch (981-984)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (4)
src/session.rs (1)
757-759: LGTM! Trait object wrapping enables polymorphic providers.
The refactor from directly registering Arc&lt;ObservableMemTable&gt; to wrapping it as Arc&lt;dyn TableProvider&gt; is correct and necessary to support the new lazy catalog mechanism while maintaining backward compatibility with existing table providers.

src/lib.rs (1)
6-6: LGTM! Module exposure follows established crate patterns.
The new module declaration and wildcard re-export are consistent with the existing pg_catalog_helpers pattern, making the lazy catalog API conveniently accessible at the crate root.

Also applies to: 18-18
src/lazy_pg_catalog_helpers.rs (2)
177-249: LGTM! Excellent documentation and sensible API design.
The LazyDatabaseRow struct is well-documented with a comprehensive table of PostgreSQL column semantics. The builder pattern (requiring only datname and datdba in the constructor) with optional fields is user-friendly and aligns well with the default values applied during scans.

The TODO on line 208 about making datdba optional is noted and acceptable for future work.
280-370: Remove dead code: insert_database_row is never invoked.
The insert_database_row function is defined but never called in the current implementation. The scan method builds a RecordBatch directly from callback rows without inserting into the underlying table. This makes the entire function dead code.

If this function is intended for future use, consider:
- Removing it until needed, or
- Adding a comment explaining its planned usage, or
- Hooking it up if it should be called during scan or registration.
If it's genuinely unused, remove it to reduce maintenance burden:
-/// Insert a single database row into `pg_catalog.pg_database` if missing.
-async fn insert_database_row(ctx: &SessionContext, row: &LazyDatabaseRow) -> DFResult<()> {
-    // ... entire function body ...
-}

Likely an incorrect or invalid review comment.
async fn insert_into(
    &self,
    state: &dyn Session,
    input: Arc<dyn ExecutionPlan>,
    insert_op: InsertOp,
) -> DFResult<Arc<dyn ExecutionPlan>> {
    // Forward inserts to the underlying provider so that SQL-based updates work.
    self.inner.insert_into(state, input, insert_op).await
}
Clarify insert behavior or prevent inconsistent state.
The insert_into method forwards to the inner provider, but subsequent scans return fresh callback data rather than the inserted rows. This creates an inconsistency where:
- User executes INSERT INTO pg_database ...
- Insert succeeds (goes to inner table)
- User executes SELECT * FROM pg_database
- Result shows only callback data, not the inserted row
Consider one of these approaches:
- Prevent inserts: Return an error indicating the table is read-only:

  async fn insert_into(...) -> DFResult<Arc<dyn ExecutionPlan>> {
      Err(DataFusionError::Plan(
          "pg_database is populated by callback and does not support INSERT".to_string()
      ))
  }

- Merge inserted rows: Store inserted rows separately and merge them with callback results during scans (more complex but supports both patterns).

- Document the limitation: If the current behavior is intentional, add a comment explaining that inserts are separate from callback data.
🤖 Prompt for AI Agents
In src/lazy_pg_catalog_helpers.rs around lines 164 to 172, the insert_into
implementation forwards inserts to the inner provider but subsequent scans
return callback-generated rows causing inconsistent visibility; replace the
passthrough with a clear read-only error by returning a DataFusionError::Plan
indicating "pg_database is populated by callback and does not support INSERT"
(or, if you prefer to allow inserts, implement storing inserted rows and merging
them with callback results during scans), or at minimum add a top-of-function
comment documenting the limitation so callers understand inserts won’t appear in
subsequent callback-based scans.
Actionable comments posted: 0
♻️ Duplicate comments (5)
src/lazy_pg_catalog_helpers.rs (5)
271-272: Registration errors are still silently ignored.
Despite the past review comment being marked as addressed, the code still uses let _ = ... to ignore errors from both deregister_table and register_table. This means a failed registration will silently leave the lazy provider uninstalled, causing the callback to never be invoked.

Apply the previously suggested fix:

- let _ = schema.deregister_table("pg_database");
- let _ = schema.register_table("pg_database".to_string(), wrapped);
+ let _ = schema.deregister_table("pg_database");
+ if let Err(e) = schema.register_table("pg_database".to_string(), wrapped) {
+     log::error!("Failed to register lazy pg_database provider: {:?}", e);
+     return Err(e);
+ }
61-67: Filters and limit are ignored, disabling pushdown.
The scan implementation ignores both filters and limit parameters (prefixed with _), which prevents predicate and limit pushdown optimizations. This can significantly degrade query performance, especially on large datasets.

Pass filters and limit through to MemTable::scan:

  async fn scan(
      &self,
      state: &dyn datafusion::catalog::Session,
      projection: Option<&Vec<usize>>,
-     _filters: &[Expr],
-     _limit: Option<usize>,
+     filters: &[Expr],
+     limit: Option<usize>,
  ) -> DFResult<Arc<dyn ExecutionPlan>> {
@@
-     mem.scan(state, projection, &[], None).await
+     mem.scan(state, projection, filters, limit).await
  }
74-130: datacl column is never populated.
A builder for datacl is created but never used in the column array matching logic below (lines 136-154). This results in the datacl column always being NULL, which omits access privilege information from the lazy-populated database rows.

To fix this, add a ListBuilder&lt;StringBuilder&gt; and populate it in the row iteration loop:

  let mut datcollversion_b = StringBuilder::new();
+ let mut datacl_b = ListBuilder::new(StringBuilder::new());

  for r in &rows {
@@
      } else { datcollversion_b.append_null(); }
+     if let Some(items) = &r.datacl {
+         for item in items {
+             datacl_b.values().append_value(item);
+         }
+         datacl_b.append(true);
+     } else {
+         datacl_b.append(false);
+     }
  }

Then add the match arm at line 154:

  "datcollversion" => Arc::new(datcollversion_b.finish()),
+ "datacl" => Arc::new(datacl_b.finish()),
  _ => new_null_array(field.data_type(), rows.len()),
297-306: OID generation fails on empty table.
When pg_catalog.pg_database is empty, max(oid) returns NULL and calling value(0) on a NULL result is undefined behavior that may panic or return garbage.

Use COALESCE to ensure a starting value:

- let getiddf = ctx
-     .sql("select max(oid)+1 from pg_catalog.pg_database")
-     .await?;
+ let getiddf = ctx
+     .sql("select COALESCE(max(oid), 0)+1 from pg_catalog.pg_database")
+     .await?;
341-366: SQL injection risk via manual string construction.
The code manually constructs SQL strings with user-controlled values using a simple escaping function. While esc handles single quotes, this approach is error-prone and could miss edge cases or future vulnerabilities.

Refactor to use DataFusion's parameterized queries:

let sql = "INSERT INTO pg_catalog.pg_database (
    oid, datname, datdba, encoding,
    datistemplate, datallowconn, datconnlimit,
    datfrozenxid, datminmxid, dattablespace,
    datcollate, datctype, datacl
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
let datacl_scalar = if let Some(items) = row.datacl.clone() {
    ScalarValue::List(Arc::new(
        arrow::array::StringArray::from(items)
    ))
} else {
    ScalarValue::Null
};
ctx.sql(sql).await?
    .with_param_values(vec![
        ScalarValue::Int64(Some(oid_val)),
        ScalarValue::Utf8(Some(row.datname.clone())),
        ScalarValue::Int32(Some(datdba)),
        ScalarValue::Int32(Some(encoding)),
        ScalarValue::Boolean(Some(datistemplate)),
        ScalarValue::Boolean(Some(datallowconn)),
        ScalarValue::Int32(Some(datconnlimit)),
        ScalarValue::Utf8(Some(datfrozenxid)),
        ScalarValue::Utf8(Some(datminmxid)),
        ScalarValue::Int32(Some(dattablespace)),
        ScalarValue::Utf8(Some(datcollate)),
        ScalarValue::Utf8(Some(datctype)),
        datacl_scalar,
    ])?
    .collect().await?;
🧹 Nitpick comments (1)
src/session.rs (1)
757-759: Good refactor to enable provider polymorphism.
Changing from Arc&lt;ObservableMemTable&gt; to Arc&lt;dyn TableProvider&gt; enables the lazy provider pattern introduced in this PR, allowing pg_database and other tables to be replaced with custom implementations like LazyDatabaseProvider while maintaining existing behavior for non-replaced tables.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/db_table.rs (2 hunks)
- src/lazy_pg_catalog_helpers.rs (1 hunks)
- src/lib.rs (2 hunks)
- src/session.rs (1 hunks)
- tests/lazy_pg_catalog.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/lazy_pg_catalog.rs
🧰 Additional context used
🧬 Code graph analysis (2)
src/lazy_pg_catalog_helpers.rs (2)
src/db_table.rs (10)
new (76-89), as_any (94-96), schema (45-45), schema (49-56), schema (98-100), table_type (102-104), supports_filters_pushdown (106-111), scan (113-135), state (144-144), insert_into (137-172)
src/session.rs (5)
as_any (89-91), batch (155-170), batch (909-912), batch (917-920), batch (981-984)
src/session.rs (2)
src/db_table.rs (1)
new (76-89)
src/lazy_pg_catalog_helpers.rs (2)
new (32-37), new (227-248)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (2)
src/db_table.rs (1)
120-121: LGTM!
The comment accurately reflects the new architecture where pg_database provider replacement is handled externally via LazyDatabaseProvider registration, removing the need for special-casing in this file.

src/lib.rs (1)
6-6: LGTM!
The module addition and re-export follow the existing pattern established for pg_catalog_helpers, maintaining consistency in the crate's public API structure.

Also applies to: 18-18
…tring(...) before execution so DataFusion 50 no longer hits
the list→utf8 cast panic
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/clean_duplicate_columns.rs (1)
70-80: Function-call aliases recorded inconsistently vs tests; use '?column?'Current code stores the function name in alias_map for Expr::Function, but tests expect "?column?" (e.g., SUBSTR case). Align by recording "?column?" for functions.
Apply this diff:
- Expr::Function(f) => { + Expr::Function(_) => { let alias = format!("alias_{}", *counter); *counter += 1; - let name = f.clone().name.to_string(); - alias_map.insert(alias.clone(), name); + alias_map.insert(alias.clone(), "?column?".to_string()); new_proj.push(SelectItem::ExprWithAlias { expr: expr.clone(), alias: Ident::new(alias), }); }
🧹 Nitpick comments (6)
src/user_functions.rs (1)
740-768: Batch OID lookup in one query per invocation
Build a set of input OIDs, run one SELECT with IN (...), and fill results from a map. This removes N queries and avoids repeated block_in_place calls.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { let arrays = ColumnarValue::values_to_arrays(&args.args)?; let arr = &arrays[0]; let len = arr.len(); - let mut builder = StringBuilder::with_capacity(len, 16 * len.max(1)); - - for i in 0..len { - let scalar = ScalarValue::try_from_array(arr, i)?; - let oid = match scalar { - ScalarValue::Int32(v) => v.map(i64::from), - ScalarValue::Int64(v) => v, - ScalarValue::UInt32(v) => v.map(|val| val as i64), - ScalarValue::UInt64(v) => v.map(|val| val as i64), - ScalarValue::Null => None, - _ => { - return plan_err!("pg_get_userbyid expects an OID argument"); - } - }; - - if let Some(oid) = oid { - let name = self.lookup(oid)?; - builder.append_value(&name); - } else { - builder.append_null(); - } - } - - Ok(ColumnarValue::Array(Arc::new(builder.finish()) as ArrayRef)) + let mut builder = StringBuilder::with_capacity(len, 16 * len.max(1)); + + // 1) collect unique OIDs present in the input + let mut need = std::collections::BTreeSet::<i64>::new(); + let mut oids_vec = Vec::<Option<i64>>::with_capacity(len); + for i in 0..len { + let scalar = ScalarValue::try_from_array(arr, i)?; + let oid = match scalar { + ScalarValue::Int32(v) => v.map(i64::from), + ScalarValue::Int64(v) => v, + ScalarValue::UInt32(v) => v.map(|val| val as i64), + ScalarValue::UInt64(v) => v.map(|val| val as i64), + ScalarValue::Null => None, + _ => return plan_err!("pg_get_userbyid expects an OID argument"), + }; + if let Some(v) = oid { + need.insert(v); + } + oids_vec.push(oid); + } + + // 2) one query: SELECT oid, rolname FROM pg_catalog.pg_authid WHERE oid IN (...) + let mut map = std::collections::HashMap::<i64, String>::new(); + if !need.is_empty() { + let in_list = need.iter().map(|v| v.to_string()).collect::<Vec<_>>().join(", "); + let sql = format!( + "SELECT oid, rolname FROM pg_catalog.pg_authid WHERE oid IN ({})", + in_list + ); + let batches = block_in_place(|| { + let ctx = self.ctx.clone(); + block_on(async move { + let df = ctx.sql(&sql).await?; + df.collect().await + }) + })?; + for batch in batches { + let n = batch.num_rows(); + if n == 0 { + continue; + } + // OID column may be Int64 or Int32 depending on source + let name_arr = batch + .column(1) + .as_any() + .downcast_ref::<arrow::array::StringArray>() + .ok_or_else(|| DataFusionError::Execution("pg_authid.rolname must be text".into()))?; + if let Some(a64) = batch.column(0).as_any().downcast_ref::<arrow::array::Int64Array>() { + for i in 0..n { + if !a64.is_null(i) && !name_arr.is_null(i) { + map.insert(a64.value(i), name_arr.value(i).to_string()); + } + } + } else if let Some(a32) = batch.column(0).as_any().downcast_ref::<arrow::array::Int32Array>() { + for i in 0..n { + if !a32.is_null(i) && !name_arr.is_null(i) { + map.insert(a32.value(i) as i64, name_arr.value(i).to_string()); + } + } + } + } + } + + // 3) build output using the map, formatting unknowns as requested + for oid in oids_vec { + if let Some(v) = oid { + if let Some(name) = map.get(&v) { + builder.append_value(name); + } else { + builder.append_value(&format!("unknown (OID={v})")); + } + } else { + builder.append_null(); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()) as ArrayRef)) }src/replace.rs (1)
194-261: array_agg(... )::varchar rewrite: solid
Detects varchar casts and rewrites to pg_catalog.array_to_string while preserving the inner aggregate expression.
Optionally also handle:
- schema-qualified pg_catalog.varchar (DataType::Custom for “pg_catalog.varchar”)
- CHARACTER VARYING alias
to catch more client variants.tests/test_validate_pg_catalog_views.py (3)
5-5: Remove unnecessary part of thenoqadirective.The static analysis tool is correct that the
noqashould only suppressARG001(unused argument) for theserverfixture, notF401(unused import). Theserverimport is used as a pytest fixture parameter in the test functions below.Apply this diff:
-from tests.test_functional import CONN_STR, server # noqa: F401 +from tests.test_functional import CONN_STR, server
19-29: Add fixture suppression comment for the unused argument.The
serverfixture is needed to start the PostgreSQL server, even though it's not directly referenced in the function body. Add a# noqa: ARG001comment to document this intentional fixture usage pattern.Apply this diff:
-def test_run_view_query_success(server): # noqa: F811 +def test_run_view_query_success(server): # noqa: ARG001
32-44: Add fixture suppression comment for the unused argument.The
serverfixture is needed to start the PostgreSQL server, even though it's not directly referenced in the function body. Add a# noqa: ARG001comment to document this intentional fixture usage pattern.Apply this diff:
-def test_run_view_query_missing_function(server): # noqa: F811 +def test_run_view_query_missing_function(server): # noqa: ARG001validate_pg_catalog_views.py (1)
44-69: Consider using spread operator for list concatenation.The stack-based traversal logic is correct. However, for better readability and performance, consider using the spread operator for list concatenation at line 69.
Apply this diff:
for key, value in node.items(): - stack.append((prefix + [key], value)) + stack.append(([*prefix, key], value))Based on static analysis.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
- Cargo.toml (2 hunks)
- src/clean_duplicate_columns.rs (1 hunks)
- src/logical_plan_rules.rs (1 hunks)
- src/replace.rs (4 hunks)
- src/scalar_to_cte.rs (4 hunks)
- src/session.rs (4 hunks)
- src/user_functions.rs (14 hunks)
- tests/test_validate_pg_catalog_views.py (1 hunks)
- validate_pg_catalog_views.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/logical_plan_rules.rs
🧰 Additional context used
🧬 Code graph analysis (3)
src/session.rs (2)
src/replace.rs (1)
rewrite_array_agg_varchar_cast (197-261)
src/db_table.rs (1)
new(76-89)
src/user_functions.rs (3)
src/pg_catalog_helpers.rs (19)
array (354-354), array (356-356), array (383-383), array (385-385), batches (92-95), batches (568-571), batches (627-630), batches (633-636), batches (639-642), batches (645-648), batches (651-654), batches (657-660), batches (715-718), batches (721-724), batches (727-730), batches (733-736), batches (740-743), batches (746-749), batches (752-755)
src/session.rs (5)
batch (154-169), batch (909-912), batch (917-920), batch (981-984), as_any (88-90)
src/db_table.rs (5)
new (76-89), as_any (94-96), schema (45-45), schema (49-56), schema (98-100)
tests/test_validate_pg_catalog_views.py (1)
validate_pg_catalog_views.py (4)
ViewDefinition (16-25), collect_view_definitions (35-41), run_view_query (72-84), qualified_name (24-25)
🪛 Ruff (0.14.0)
tests/test_validate_pg_catalog_views.py
5-5: Unused noqa directive (non-enabled: F401)
Remove unused noqa directive
(RUF100)
19-19: Unused function argument: server
(ARG001)
32-32: Unused function argument: server
(ARG001)
validate_pg_catalog_views.py
69-69: Consider [*prefix, key] instead of concatenation
Replace with [*prefix, key]
(RUF005)
139-139: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (15)
src/scalar_to_cte.rs (1)
577-581: Pattern-match simplification: LGTM
Consolidating SelectItem patterns and scanning via a single branch is clear and keeps behavior unchanged.
src/replace.rs (1)
66-78: SET variable namespace prefixing: LGTM
Covers single, parenthesized, and multiple assignments. Safe no-op for other forms.
Cargo.toml (1)
11-14: Dependencies match DataFusion 50.2.0 requirements
arrow 56.0.0 and sqlparser 0.58.0 align with DataFusion 50.2.0's dependencies, no duplicates or mismatches.
src/session.rs (4)
17-24: LGTM! Import addition is consistent with usage.
The import of rewrite_array_agg_varchar_cast is properly placed and used in the rewrite pipeline at line 267.

267-267: LGTM! Rewrite function added to the pipeline.
The rewrite_array_agg_varchar_cast function is correctly integrated into the rewrite pipeline. The implementation in src/replace.rs (lines 196-260) transforms CAST(array_agg(...) AS VARCHAR) to pg_catalog.array_to_string(array_agg(...), ','), which is appropriate for PostgreSQL compatibility.

508-508: LGTM! Removed unnecessary mut qualifier.
The variable partial is not mutated after assignment, so removing the mut qualifier is correct.

757-759: LGTM! Provider typing change enables lazy catalog integration.
The change from Arc&lt;ObservableMemTable&gt; to Arc&lt;dyn TableProvider&gt; trait object is correct and aligns with the lazy population mechanism introduced in the PR. This enables runtime swapping of providers (e.g., LazyDatabaseProvider for pg_database).

tests/test_validate_pg_catalog_views.py (1)
13-16: LGTM! Test validates view discovery.
The test correctly verifies that collect_view_definitions can find pg_catalog.pg_tables from the YAML files.
10-12: LGTM! Default connection string is appropriate for dev/test environment.The connection string targets a local development PostgreSQL instance, which is suitable for validation scripts.
15-26: LGTM! Well-structured dataclass for view definitions.The frozen dataclass with a computed
qualified_nameproperty provides good encapsulation and immutability.
35-41: LGTM! View collection logic is clean and straightforward.The function correctly globs for YAML files, loads them, and aggregates view definitions.
72-84: LGTM! Comprehensive error handling for view query execution. The function correctly handles:
- Empty SQL validation
- Specific UndefinedFunction exception for missing functions
- General exception catch-all with appropriate noqa for broad exception
87-117: LGTM! Clear and informative report generation. The report provides good categorization of results (success, missing functions, other failures) with helpful details for debugging failures.
120-144: LGTM! Well-structured CLI with sensible defaults. The argparse-based CLI provides appropriate configuration options and follows good practices with a context manager for the database connection.
139-139: Exception message length is acceptable for a validation script. While the static analysis tool flags the long message, it's acceptable here since this is a top-level CLI script where a clear error message is more valuable than strict adherence to exception class conventions. However, if you prefer, you could define a custom exception class.
Based on a static analysis hint, but this is more of a style preference than a critical issue.
```rust
fn fetch_user_by_oid(ctx: Arc<SessionContext>, oid: i64) -> Result<String> {
    block_in_place(|| {
        let ctx = ctx.clone();
        block_on(async move {
            let query = format!(
                "SELECT rolname FROM pg_catalog.pg_authid WHERE oid = {} LIMIT 1",
                oid
            );
            let df = ctx.sql(&query).await?;
            let batches = df.collect().await?;
            for batch in batches {
                if batch.num_rows() == 0 {
                    continue;
                }
                let col = batch.column(0);
                let arr = col
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .ok_or_else(|| {
                        DataFusionError::Execution(
                            "pg_catalog.pg_authid.rolname must be text".to_string(),
                        )
                    })?;
                if !arr.is_null(0) {
                    return Ok(arr.value(0).to_string());
                }
            }
            Ok(format!("unknown (OID={oid})"))
        })
    })
}
```
Per-row ctx.sql in UDF risks poor performance and re‑entrancy
fetch_user_by_oid issues a query per input row via block_in_place. For larger batches this will be very slow and may stress the runtime. Batch OIDs and run a single lookup per invocation instead.
Apply this refactor in invoke_with_args (see next comment) and drop per-row calls. Keep fetch_user_by_oid as a fallback if you prefer incremental changes.
🤖 Prompt for AI Agents
In src/user_functions.rs around lines 645 to 675, fetch_user_by_oid performs a
blocking per-row ctx.sql call which will be slow and can stress the runtime;
instead, change the callsite (invoke_with_args) to collect all OIDs for the
current UDF invocation, deduplicate them, and perform a single asynchronous
lookup (one SQL query using WHERE oid IN (...)) that returns a HashMap<oid,
rolname>; then use that map to populate results for each input row, falling back
to fetch_user_by_oid only for any missing single OIDs or for incremental changes
while refactoring. Ensure the batched query is run outside per-row blocking
(await it once), handle empty/missing values by inserting "unknown (OID=...)"
entries, and preserve current error handling and types when converting the
batched results back to the UDF result stream.
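For illustration, here is a minimal sketch of the batched lookup suggested above. It is written as a plain async helper to sidestep the block_in_place/block_on wrapper; the name fetch_users_by_oids and the CAST to BIGINT are assumptions for the sketch, not existing crate APIs.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;

async fn fetch_users_by_oids(
    ctx: Arc<SessionContext>,
    oids: &[i64],
) -> Result<HashMap<i64, String>> {
    let mut out = HashMap::new();
    if oids.is_empty() {
        return Ok(out);
    }
    // Deduplicate and build a single IN (...) list instead of one query per row.
    let mut unique: Vec<i64> = oids.to_vec();
    unique.sort_unstable();
    unique.dedup();
    let in_list = unique
        .iter()
        .map(|o| o.to_string())
        .collect::<Vec<_>>()
        .join(", ");
    // CAST keeps the oid column at a known Arrow type for the downcast below.
    let query = format!(
        "SELECT CAST(oid AS BIGINT) AS oid, rolname \
         FROM pg_catalog.pg_authid WHERE oid IN ({in_list})"
    );
    let batches = ctx.sql(&query).await?.collect().await?;
    for batch in batches {
        let oid_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| {
                DataFusionError::Execution("expected BIGINT oid column".to_string())
            })?;
        let name_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "pg_catalog.pg_authid.rolname must be text".to_string(),
                )
            })?;
        for i in 0..batch.num_rows() {
            if !oid_col.is_null(i) && !name_col.is_null(i) {
                out.insert(oid_col.value(i), name_col.value(i).to_string());
            }
        }
    }
    Ok(out)
}
```

invoke_with_args would call this once per batch and fall back to "unknown (OID=...)" for any oid missing from the returned map.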
Actionable comments posted: 0
🧹 Nitpick comments (3)
task_move_views_out_from_stub_data.md (1)
1-100: Clarify the purpose and location of this file. This file appears to contain raw validation/test output documenting 67 failing pg_catalog views. Consider whether:
- This should be committed to version control or generated on-demand
- If it's documentation, it might belong in a docs/ directory
- If it's task tracking, it might be better suited as a GitHub issue
- The note on line 21 ("this can stay as is but need cleanup") suggests pending work
Is this file intended to be:
- A permanent reference document?
- Temporary output that should be excluded from the repository?
- Task tracking that should be converted to issues?
src/session.rs (2)
155-177: Consider a more flexible approach for the view-only table list. The hardcoded VIEW_ONLY_TABLES constant works for the current single entry but could become unwieldy as more view-only tables are added. Consider loading this from configuration or deriving it from the type field in the YAML schema.
Note: The AI summary indicates these types are exposed publicly, but they lack pub visibility modifiers. Verify whether they need to be public or if they should remain module-private.
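A hedged sketch of the "derive it from the YAML" alternative mentioned above, assuming each table entry carries an optional type field (the actual YAML schema may differ):

```rust
use serde::Deserialize;

// Assumed shape of a catalog YAML entry; only the fields relevant to the sketch.
#[derive(Deserialize)]
struct TableEntry {
    name: String,
    #[serde(default)]
    r#type: Option<String>,
}

// Collect the names of entries explicitly marked as views instead of hardcoding them.
fn view_only_names(entries: &[TableEntry]) -> Vec<&str> {
    entries
        .iter()
        .filter(|e| e.r#type.as_deref() == Some("view"))
        .map(|e| e.name.as_str())
        .collect()
}
```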
871-915: View creation logic is correct, but consider adding error context. The function properly handles:
- Schema switching optimization (lines 885-888)
- State restoration (lines 910-912)
- UDF lifecycle management consistent with execute_sql_inner
The temporary UDFs registered during rewrite_query (line 901) are correctly deregistered after view creation (lines 905-907) because they're only needed for SQL rewriting, not for the view's persistent definition. When the view is later queried, the SQL will be rewritten again with fresh temp UDFs.
Consider wrapping the CREATE VIEW execution (line 904) with additional error context:
```diff
- ctx.sql(&create_sql).await?.collect().await?;
+ ctx.sql(&create_sql).await?.collect().await
+     .map_err(|e| DataFusionError::Execution(
+         format!("Failed to create view {}: {}", qualified, e)
+     ))?;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/session.rs (20 hunks)
- task_move_views_out_from_stub_data.md (1 hunks)
- tests/public_api.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/public_api.rs (2)
src/session.rs (2)
get_base_session_context (929-1016), as_any (88-90)
src/db_table.rs (4)
schema (45-45), schema (49-56), schema (98-100), as_any (94-96)
src/session.rs (2)
src/replace.rs (1)
rewrite_array_agg_varchar_cast (197-261)
src/db_table.rs (5)
schema (45-45), schema (49-56), schema (98-100), new (76-89), state (144-144)
🪛 LanguageTool
task_move_views_out_from_stub_data.md
[style] ~74-~74: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...n_schema"), span: Span(Location(1,301)..Location(1,321)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~75-~75: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ..._catalog"), span: Span(Location(1,743)..Location(1,755)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~75-~75: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...n_schema"), span: Span(Location(1,763)..Location(1,783)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~82-~82: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...n_schema"), span: Span(Location(1,377)..Location(1,397)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~89-~89: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...n_schema"), span: Span(Location(1,256)..Location(1,276)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~90-~90: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ..._catalog"), span: Span(Location(1,182)..Location(1,194)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~90-~90: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...n_schema"), span: Span(Location(1,202)..Location(1,222)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~91-~91: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ..._catalog"), span: Span(Location(1,350)..Location(1,362)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~91-~91: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...n_schema"), span: Span(Location(1,370)..Location(1,390)) }), data_type: Text, format: No...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
🔇 Additional comments (12)
tests/public_api.rs (2)
3-5: LGTM! The import of ObservableMemTable is correctly added and properly used in the new test to verify view registration semantics.
54-90: LGTM! Well-structured test. This test effectively verifies that pg_views is registered as a proper view rather than an ObservableMemTable, which aligns with the PR's goal of implementing lazy data population. The test includes:
- A functional check that the view returns data
- A structural check using type introspection to verify registration semantics
- Clear assertion messages
src/session.rs (10)
17-23: LGTM! The import addition for rewrite_array_agg_varchar_cast is consistent with the existing import pattern and aligns with its usage at line 327.
148-149: LGTM! The addition of the optional view_sql field properly uses #[serde(default)] to handle its absence in existing YAML files.
586-783: LGTM! Refactoring improves code clarity. The transition from returning a tuple to returning the ParsedTable struct significantly improves code readability while preserving the existing table-building logic. The addition of view_sql and is_view fields properly supports the new view registration feature.
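For reference, a rough sketch of the struct shape implied by this comment; the field names come from the review text, while the types are assumptions and may differ from the actual definition in src/session.rs.

```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

// Named fields replace the old positional tuple returned by the parsers.
struct ParsedTable {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
    view_sql: Option<String>,
    is_view: bool,
}
```

Returning a named struct instead of a tuple keeps call sites self-documenting and lets new fields such as view_sql be added without touching every destructuring site.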
785-869: LGTM! View registration logic is sound. The refactored function properly separates view registration from table registration:
- View-only tables are collected for later registration via CREATE VIEW statements
- Warning messages at lines 836-840 and 850-853 appropriately flag metadata inconsistencies
- Fallback to table registration (line 855) provides robustness when view_sql is missing
949-950: LGTM!The two-phase registration approach (tables first at line 950, then views at line 1008) is correct because views may reference the registered tables.
1008-1008: LGTM!View creation is properly placed after all UDF registration, ensuring that views can reference the registered functions.
1050-1052: LGTM!Test correctly updated to access schema and batches through the new
ParsedTablestructure fields.
1131-1136: LGTM!Test correctly updated to access schema and batches through the new
ParsedTablestructure fields.
462-583: LGTM! Function signature updates are consistent.All parsing functions have been consistently updated to work with
ParsedTableinstead of tuples. Themerge_schema_mapsfunction signature (lines 573-576) correctly reflects the new type throughout the nested HashMap structure.
179-211: Based on my verification, DataFusion runtime configurations are set via SQL using the SET command, and once RuntimeEnv and RuntimeConfig were created, they cannot be changed through direct programmatic APIs after context creation.The implementation in
set_default_schemacorrectly uses the SQL SET command to configure options at runtime, which is the documented way to set session configurations. The use ofquote_literal()to escape the schema name follows proper SQL escaping rules for string literals.No issues found; code is correct.
The original review comment's suggestion for a "more robust" programmatic API does not exist in DataFusion for runtime config changes. The current implementation is the intended pattern.
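A small sketch of the pattern described above; the quote_literal helper is re-implemented inline for illustration, and datafusion.catalog.default_schema is a stock DataFusion config key.

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// Escape a string for use as a single-quoted SQL literal (illustrative helper).
fn quote_literal(s: &str) -> String {
    format!("'{}'", s.replace('\'', "''"))
}

async fn set_default_schema(ctx: &SessionContext, schema: &str) -> Result<()> {
    // Runtime session options are changed through SQL SET rather than a
    // programmatic API once the context exists.
    let sql = format!(
        "SET datafusion.catalog.default_schema = {}",
        quote_literal(schema)
    );
    ctx.sql(&sql).await?.collect().await?;
    Ok(())
}
```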
Summary by CodeRabbit
New Features
New Tools
Tests