Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 206 additions & 22 deletions crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
* under the License.
*/

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::config::HudiConfigs;
use crate::file_group::base_file::BaseFile;
use crate::file_group::{FileGroup, FileSlice};
use crate::storage::{get_leaf_dirs, Storage};
use async_recursion::async_recursion;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;

use crate::config::read::HudiReadConfig::ListingParallelism;
use crate::error::CoreError;
Expand All @@ -33,6 +34,8 @@
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};

/// Name of the marker file Hudi writes inside a partition directory.
/// Its presence is used to distinguish a valid partition directory from
/// other directories encountered while listing the table's file system.
pub const HOODIE_PARTITION_METAFILE_NAME: &str = ".hoodie_partition_metadata";

/// A view of the Hudi table's data files (files stored outside the `.hoodie/` directory) in the file system. It provides APIs to load and
/// access the file groups and file slices.
#[derive(Clone, Debug)]
Expand All @@ -57,6 +60,7 @@
})
}

/// Lists every partition path in the table without any pruning, by
/// delegating to [`Self::list_partition_paths`] with an empty
/// [`PartitionPruner`].
// NOTE(review): marked `dead_code` — apparently no longer on the main
// read path; confirm whether it should be kept as a convenience API or
// removed.
#[allow(dead_code)]
async fn list_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
    Self::list_partition_paths(storage, &PartitionPruner::empty()).await
}
Expand All @@ -73,34 +77,84 @@
.collect();
let mut partition_paths = Vec::new();
for dir in top_level_dirs {
partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
let leaf_paths = if partition_pruner.is_empty() {
// full dirs listing
get_leaf_dirs(storage, Some(&dir)).await?
} else {
// leveled pruning
Self::list_partition_paths_with_leveled_pruning(storage, dir, partition_pruner, 0)
.await?
};
partition_paths.extend(leaf_paths);

Check warning on line 88 in crates/core/src/table/fs_view.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/fs_view.rs#L88

Added line #L88 was not covered by tests
}
if partition_paths.is_empty() {
// TODO: reconsider is it reasonable to add empty partition path? For partitioned table, we should return empty vec rather than vec with empty string
partition_paths.push("".to_string())
}
Comment on lines 90 to 93
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for pointing this out. This should be improved; I've made changes for it in #251. In short, we should return "" for non-partitioned tables, which aligns with the convention in timeline commit metadata, where "" is also used as the key for partition write stats. For a partitioned table with no partitions written yet, we should return an empty list, as you suggested here.

if partition_pruner.is_empty() {
return Ok(partition_paths);
}

Ok(partition_paths
.into_iter()
.filter(|path_str| partition_pruner.should_include(path_str))
.collect())
Ok(partition_paths.into_iter().collect())
}

#[async_recursion]
async fn list_partition_paths_with_leveled_pruning(
storage: &Storage,
path: String,
partition_pruner: &PartitionPruner,
current_level: usize,
) -> Result<Vec<String>> {
Comment on lines +99 to +104
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be merged with the get_leaf_dirs()? so that we don't need to go 2 paths that one does multi-level pruning and the other does not. We can by default use multi-level pruning and early return when possible.

Check out this line https://github.com/apache/hudi-rs/pull/251/files#diff-a6c5a16b1d83ac9b5d186863a123fd1b273daf8710c12444c9f1b6f6fec96914R140

we probably need to move get_leaf_dirs() into FileLister such that it's the lister's responsibility to use pruner to check partition segments.

PartitionPruner owns the partition schema, so we should make use of it to understand which level is currently parsed and how many levels there should be.

To make code logic easy to understand, maybe iterative is better than recursive here - just loop through all the top level dirs and go down according to the schema.

// TODO: consider stop iterating when visit the `partition_metadata` file
let mut leaf_matched_dirs = Vec::new();
// 1. Check if the current level path can be pruned
if !partition_pruner.should_include_with_level(path.as_str(), current_level) {
// if current level path can be pruned, return empty list
return Ok(leaf_matched_dirs);
}
// 2. Iterate over all child directories and keep listing with pruning
let child_dirs = storage.list_dirs(Some(&path)).await?;
if child_dirs.is_empty() {
// if no child directories, return the current path
leaf_matched_dirs.push(path);
} else {
for child_dir in child_dirs {
let mut child_full_path = PathBuf::new();
child_full_path.push(&path);
child_full_path.push(&child_dir);
leaf_matched_dirs.extend(
Self::list_partition_paths_with_leveled_pruning(
storage,
child_full_path.to_str().unwrap().to_string(),
partition_pruner,
current_level + 1,
)
.await?,
);
}
}
Ok(leaf_matched_dirs)
}

async fn list_file_groups_for_partition(
storage: &Storage,
partition_path: &str,
) -> Result<Vec<FileGroup>> {
let file_metadata: Vec<FileMetadata> = storage
.list_files(Some(partition_path))
.await?
) -> Result<(
bool, /*if valid partition dir*/
Vec<FileGroup>, /*file groups*/
)> {
let files = storage.list_files(Some(partition_path)).await?;
if !files
.iter()
.any(|f| f.name.eq(HOODIE_PARTITION_METAFILE_NAME))
{
// not a partition directory
return Ok((false, Vec::new()));
}
let data_files_metadata: Vec<FileMetadata> = files
.into_iter()
.filter(|f| f.name.ends_with(".parquet"))
.collect();

let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
for metadata in file_metadata {
for metadata in data_files_metadata {
let base_file = BaseFile::try_from(metadata)?;
let fg_id = &base_file.file_group_id;
fg_id_to_base_files
Expand All @@ -117,16 +171,16 @@
}
file_groups.push(fg);
}
Ok(file_groups)
Ok((true, file_groups))
}

async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> Result<()> {
let all_partition_paths = Self::list_all_partition_paths(&self.storage).await?;
let need_partition_paths =
Self::list_partition_paths(&self.storage, partition_pruner).await?;

let partition_paths_to_list = all_partition_paths
let partition_paths_to_list = need_partition_paths
.into_iter()
.filter(|p| !self.partition_to_file_groups.contains_key(p))
.filter(|p| partition_pruner.should_include(p))
.collect::<HashSet<_>>();

let parallelism = self
Expand All @@ -140,8 +194,10 @@
Ok::<_, CoreError>((path, file_groups))
})
.buffer_unordered(parallelism)
.try_for_each(|(path, file_groups)| async move {
self.partition_to_file_groups.insert(path, file_groups);
.try_for_each(|(path, (valid_partition_path, file_groups))| async move {
if valid_partition_path {
self.partition_to_file_groups.insert(path, file_groups);
}
Ok(())
})
.await
Expand Down Expand Up @@ -194,6 +250,7 @@
use crate::table::partition::PartitionPruner;
use crate::table::Table;

use arrow_schema::Schema;
use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -334,4 +391,131 @@
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2);
}
}

#[tokio::test]
async fn fs_view_get_latest_file_slices_with_complex_partition_filters() {
    let base_url = TestTable::V6ComplexkeygenHivestyle.url();
    let hudi_table = Table::new(base_url.path()).await.unwrap();
    let partition_schema = hudi_table.get_partition_schema().await.unwrap();

    // Builds a pruner from (field, operator, value) triples using the
    // table's partition schema and configs.
    let make_pruner = |triples: &[(&str, &str, &str)]| -> PartitionPruner {
        let filters: Vec<Filter> = triples
            .iter()
            .map(|t| Filter::try_from(*t).unwrap())
            .collect();
        PartitionPruner::new(&filters, &partition_schema, hudi_table.hudi_configs.as_ref())
            .unwrap()
    };

    // Each case: (pruner, expected file group count, expected file slice
    // count, expected file group ids in sorted order).
    let cases = vec![
        (
            make_pruner(&[("byteField", "<", "20"), ("shortField", "=", "300")]),
            1,
            1,
            vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"],
        ),
        (
            make_pruner(&[("byteField", "<", "20"), ("shortField", "=", "100")]),
            0,
            0,
            vec![],
        ),
        (
            make_pruner(&[("byteField", "<=", "20")]),
            2,
            2,
            vec![
                "a22e8257-e249-45e9-ba46-115bc85adcba-0",
                "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
            ],
        ),
        (
            make_pruner(&[("shortField", ">", "100")]),
            1,
            1,
            vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"],
        ),
        (
            make_pruner(&[("shortField", "=", "100")]),
            2,
            2,
            vec![
                "4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0",
                "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
            ],
        ),
        (
            PartitionPruner::empty(),
            3,
            3,
            vec![
                "4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0",
                "a22e8257-e249-45e9-ba46-115bc85adcba-0",
                "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
            ],
        ),
    ];

    for (pruner, expected_fg_num, expected_slices_num, expected_fg_ids) in cases {
        verify_partition_pruning(
            base_url.clone(),
            pruner,
            expected_fg_num,
            expected_slices_num,
            expected_fg_ids,
        )
        .await;
    }
}

/// Loads a fresh file-system view for `base_url`, applies `partition_pruner`
/// while fetching the latest file slices, and asserts the resulting file
/// group count, slice count, and (sorted) file group ids.
async fn verify_partition_pruning(
    base_url: Url,
    partition_pruner: PartitionPruner,
    expected_fg_num: usize,
    expected_slices_num: usize,
    expected_fg_ids: Vec<&str>,
) {
    let view = create_test_fs_view(base_url).await;
    // Nothing should be cached before the first listing.
    assert_eq!(view.partition_to_file_groups.len(), 0);

    let no_excludes = HashSet::new();
    let slices = view
        .get_file_slices_as_of("20240418173235694", &partition_pruner, &no_excludes)
        .await
        .unwrap();

    assert_eq!(view.partition_to_file_groups.len(), expected_fg_num);
    assert_eq!(slices.len(), expected_slices_num);

    let mut actual_fg_ids = slices.iter().map(|s| s.file_group_id()).collect::<Vec<_>>();
    actual_fg_ids.sort();
    assert_eq!(actual_fg_ids, expected_fg_ids);
}
}
Loading
Loading