From a8a2e3b7c269fd0b2557a1c00827bf3bb966d374 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 14:49:05 +0300 Subject: [PATCH 1/3] Build information schema per catalog --- datafusion/catalog/src/information_schema.rs | 149 +++++++++--------- .../core/src/execution/session_state.rs | 7 +- 2 files changed, 78 insertions(+), 78 deletions(-) diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 7948c0299d393..1b5bac2e301f9 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -20,7 +20,7 @@ //! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema use crate::streaming::StreamingTable; -use crate::{CatalogProviderList, SchemaProvider, TableProvider}; +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; use arrow::array::builder::{BooleanBuilder, UInt8Builder}; use arrow::{ array::{StringBuilder, UInt64Builder}, @@ -74,9 +74,9 @@ pub struct InformationSchemaProvider { impl InformationSchemaProvider { /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list` - pub fn new(catalog_list: Arc) -> Self { + pub fn new(catalog_list: Arc, catalog_name: Arc) -> Self { Self { - config: InformationSchemaConfig { catalog_list }, + config: InformationSchemaConfig { catalog_list, catalog_name }, } } } @@ -84,6 +84,7 @@ impl InformationSchemaProvider { #[derive(Clone, Debug)] struct InformationSchemaConfig { catalog_list: Arc, + catalog_name: Arc } impl InformationSchemaConfig { @@ -92,53 +93,52 @@ impl InformationSchemaConfig { &self, builder: &mut InformationSchemaTablesBuilder, ) -> Result<(), DataFusionError> { + let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { + return Ok(()); + }; + // create a mem table with the names of tables - - for catalog_name in self.catalog_list.catalog_names() { - let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); - - for schema_name in catalog.schema_names() { - if schema_name != INFORMATION_SCHEMA { - // schema name may not exist in the catalog, so we need to check - if let Some(schema) = catalog.schema(&schema_name) { - for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? { - builder.add_table( - &catalog_name, - &schema_name, - &table_name, - table.table_type(), - ); - } + for schema_name in catalog.schema_names() { + if schema_name != INFORMATION_SCHEMA { + // schema name may not exist in the catalog, so we need to check + if let Some(schema) = catalog.schema(&schema_name) { + for table_name in schema.table_names() { + if let Some(table) = schema.table(&table_name).await? { + builder.add_table( + &self.catalog_name, + &schema_name, + &table_name, + table.table_type(), + ); } } } } + } - // Add a final list for the information schema tables themselves - for table_name in INFORMATION_SCHEMA_TABLES { - builder.add_table( - &catalog_name, - INFORMATION_SCHEMA, - table_name, - TableType::View, - ); - } + // Add a final list for the information schema tables themselves + for table_name in INFORMATION_SCHEMA_TABLES { + builder.add_table( + &self.catalog_name, + INFORMATION_SCHEMA, + table_name, + TableType::View, + ); } Ok(()) } async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) { - for catalog_name in self.catalog_list.catalog_names() { - let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); - - for schema_name in catalog.schema_names() { - if schema_name != INFORMATION_SCHEMA { - if let Some(schema) = catalog.schema(&schema_name) { - let schema_owner = schema.owner_name(); - builder.add_schemata(&catalog_name, &schema_name, schema_owner); - } + let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { + return + }; + + for schema_name in catalog.schema_names() { + if schema_name != INFORMATION_SCHEMA { + if let Some(schema) = catalog.schema(&schema_name) { + let schema_owner = schema.owner_name(); + builder.add_schemata(&self.catalog_name, &schema_name, schema_owner); } } } @@ -148,28 +148,27 @@ impl InformationSchemaConfig { &self, builder: &mut InformationSchemaViewBuilder, ) -> Result<(), DataFusionError> { - for catalog_name in self.catalog_list.catalog_names() { - let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); - - for schema_name in catalog.schema_names() { - if schema_name != INFORMATION_SCHEMA { - // schema name may not exist in the catalog, so we need to check - if let Some(schema) = catalog.schema(&schema_name) { - for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? { - builder.add_view( - &catalog_name, - &schema_name, - &table_name, - table.get_table_definition(), - ) - } + let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { + return Ok(()); + }; + + for schema_name in catalog.schema_names() { + if schema_name != INFORMATION_SCHEMA { + // schema name may not exist in the catalog, so we need to check + if let Some(schema) = catalog.schema(&schema_name) { + for table_name in schema.table_names() { + if let Some(table) = schema.table(&table_name).await? { + builder.add_view( + &self.catalog_name, + &schema_name, + &table_name, + table.get_table_definition(), + ) } } } } } - Ok(()) } @@ -178,26 +177,26 @@ impl InformationSchemaConfig { &self, builder: &mut InformationSchemaColumnsBuilder, ) -> Result<(), DataFusionError> { - for catalog_name in self.catalog_list.catalog_names() { - let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); - - for schema_name in catalog.schema_names() { - if schema_name != INFORMATION_SCHEMA { - // schema name may not exist in the catalog, so we need to check - if let Some(schema) = catalog.schema(&schema_name) { - for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? { - for (field_position, field) in - table.schema().fields().iter().enumerate() - { - builder.add_column( - &catalog_name, - &schema_name, - &table_name, - field_position, - field, - ) - } + let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { + return Ok(()); + }; + + for schema_name in catalog.schema_names() { + if schema_name != INFORMATION_SCHEMA { + // schema name may not exist in the catalog, so we need to check + if let Some(schema) = catalog.schema(&schema_name) { + for table_name in schema.table_names() { + if let Some(table) = schema.table(&table_name).await? { + for (field_position, field) in + table.schema().fields().iter().enumerate() + { + builder.add_column( + &self.catalog_name, + &schema_name, + &table_name, + field_position, + field, + ) } } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 74708b7fa7bef..c31e2a7e1dc8c 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -306,9 +306,10 @@ impl SessionState { let resolved_ref = self.resolve_table_ref(table_ref); if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA { - return Ok(Arc::new(InformationSchemaProvider::new(Arc::clone( - &self.catalog_list, - )))); + return Ok(Arc::new(InformationSchemaProvider::new( + Arc::clone(&self.catalog_list), + resolved_ref.catalog, + ))); } self.catalog_list From 9497b453360f4a54171bc6931ec8b5f8008579c2 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 14:54:23 +0300 Subject: [PATCH 2/3] Fmt --- datafusion/catalog/src/information_schema.rs | 22 +++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 1b5bac2e301f9..cd461f350ecd5 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -74,9 +74,15 @@ pub struct InformationSchemaProvider { impl InformationSchemaProvider { /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list` - pub fn new(catalog_list: Arc, catalog_name: Arc) -> Self { + pub fn new( + catalog_list: Arc, + catalog_name: Arc, + ) -> Self { Self { - config: InformationSchemaConfig { catalog_list, catalog_name }, + config: InformationSchemaConfig { + catalog_list, + catalog_name, + }, } } } @@ -84,7 +90,7 @@ impl InformationSchemaProvider { #[derive(Clone, Debug)] struct InformationSchemaConfig { catalog_list: Arc, - catalog_name: Arc + catalog_name: Arc, } impl InformationSchemaConfig { @@ -96,7 +102,7 @@ impl InformationSchemaConfig { let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { return Ok(()); }; - + // create a mem table with the names of tables for schema_name in catalog.schema_names() { if schema_name != INFORMATION_SCHEMA { @@ -131,9 +137,9 @@ impl InformationSchemaConfig { async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) { let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { - return + return; }; - + for schema_name in catalog.schema_names() { if schema_name != INFORMATION_SCHEMA { if let Some(schema) = catalog.schema(&schema_name) { @@ -151,7 +157,7 @@ impl InformationSchemaConfig { let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { return Ok(()); }; - + for schema_name in catalog.schema_names() { if schema_name != INFORMATION_SCHEMA { // schema name may not exist in the catalog, so we need to check @@ -180,7 +186,7 @@ impl InformationSchemaConfig { let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else { return Ok(()); }; - + for schema_name in catalog.schema_names() { if schema_name != INFORMATION_SCHEMA { // schema name may not exist in the catalog, so we need to check From a8f1dac6e5d2bba565621652b51543c4c766a984 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 14:55:10 +0300 Subject: [PATCH 3/3] Fmt --- datafusion/catalog/src/information_schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index cd461f350ecd5..990de89bb5221 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -20,7 +20,7 @@ //! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema use crate::streaming::StreamingTable; -use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; +use crate::{CatalogProviderList, SchemaProvider, TableProvider}; use arrow::array::builder::{BooleanBuilder, UInt8Builder}; use arrow::{ array::{StringBuilder, UInt64Builder},