Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 79 additions & 74 deletions datafusion/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,23 @@ pub struct InformationSchemaProvider {

impl InformationSchemaProvider {
/// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
pub fn new(
catalog_list: Arc<dyn CatalogProviderList>,
catalog_name: Arc<str>,
) -> Self {
Self {
config: InformationSchemaConfig { catalog_list },
config: InformationSchemaConfig {
catalog_list,
catalog_name,
},
}
}
}

#[derive(Clone, Debug)]
struct InformationSchemaConfig {
catalog_list: Arc<dyn CatalogProviderList>,
catalog_name: Arc<str>,
}

impl InformationSchemaConfig {
Expand All @@ -92,53 +99,52 @@ impl InformationSchemaConfig {
&self,
builder: &mut InformationSchemaTablesBuilder,
) -> Result<(), DataFusionError> {
// create a mem table with the names of tables
let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else {
return Ok(());
};

for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
builder.add_table(
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
);
}
// create a mem table with the names of tables
for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
builder.add_table(
&self.catalog_name,
&schema_name,
&table_name,
table.table_type(),
);
}
}
}
}
}

// Add a final list for the information schema tables themselves
for table_name in INFORMATION_SCHEMA_TABLES {
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
table_name,
TableType::View,
);
}
// Add a final list for the information schema tables themselves
for table_name in INFORMATION_SCHEMA_TABLES {
builder.add_table(
&self.catalog_name,
INFORMATION_SCHEMA,
table_name,
TableType::View,
);
}

Ok(())
}

async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
if let Some(schema) = catalog.schema(&schema_name) {
let schema_owner = schema.owner_name();
builder.add_schemata(&catalog_name, &schema_name, schema_owner);
}
let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else {
return;
};

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
if let Some(schema) = catalog.schema(&schema_name) {
let schema_owner = schema.owner_name();
builder.add_schemata(&self.catalog_name, &schema_name, schema_owner);
}
}
}
Expand All @@ -148,28 +154,27 @@ impl InformationSchemaConfig {
&self,
builder: &mut InformationSchemaViewBuilder,
) -> Result<(), DataFusionError> {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
builder.add_view(
&catalog_name,
&schema_name,
&table_name,
table.get_table_definition(),
)
}
let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else {
return Ok(());
};

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
builder.add_view(
&self.catalog_name,
&schema_name,
&table_name,
table.get_table_definition(),
)
}
}
}
}
}

Ok(())
}

Expand All @@ -178,26 +183,26 @@ impl InformationSchemaConfig {
&self,
builder: &mut InformationSchemaColumnsBuilder,
) -> Result<(), DataFusionError> {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
for (field_position, field) in
table.schema().fields().iter().enumerate()
{
builder.add_column(
&catalog_name,
&schema_name,
&table_name,
field_position,
field,
)
}
let Some(catalog) = self.catalog_list.catalog(&self.catalog_name) else {
return Ok(());
};

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
for (field_position, field) in
table.schema().fields().iter().enumerate()
{
builder.add_column(
&self.catalog_name,
&schema_name,
&table_name,
field_position,
field,
)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,10 @@ impl SessionState {
let resolved_ref = self.resolve_table_ref(table_ref);
if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA
{
return Ok(Arc::new(InformationSchemaProvider::new(Arc::clone(
&self.catalog_list,
))));
return Ok(Arc::new(InformationSchemaProvider::new(
Arc::clone(&self.catalog_list),
resolved_ref.catalog,
)));
}

self.catalog_list
Expand Down
Loading