Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ jobs:
strategy:
fail-fast: false
matrix:
# TODO: add windows which does not support container
os: [ ubuntu-24.04 ]
runs-on: ${{ matrix.os }}
container:
Expand All @@ -77,6 +76,13 @@ jobs:
path: ./cov-reports
if-no-files-found: 'error'

rust-tests-windows:
runs-on: windows-2025
steps:
- uses: actions/checkout@v5
- name: Run unit tests on Windows with no coverage report
run: cargo test --no-fail-fast --all-targets --all-features --workspace

python-tests:
strategy:
fail-fast: false
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/file_group/file_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
use crate::error::CoreError;
use crate::file_group::base_file::BaseFile;
use crate::file_group::log_file::LogFile;
use crate::storage::util::join_path_segments;
use crate::storage::Storage;
use crate::Result;
use std::collections::BTreeSet;
use std::fmt::Display;
use std::path::PathBuf;

/// Within a [crate::file_group::FileGroup],
/// a [FileSlice] is a logical group of [BaseFile] and [LogFile]s.
Expand Down Expand Up @@ -78,10 +78,10 @@ impl FileSlice {
}

fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
path.to_str().map(|s| s.to_string()).ok_or_else(|| {
CoreError::FileGroup(format!("Failed to get relative path for file: {file_name}",))
})
Ok(join_path_segments(&[
self.partition_path.as_str(),
file_name,
])?)
}

/// Returns the relative path of the [BaseFile] in the [FileSlice].
Expand Down
29 changes: 20 additions & 9 deletions crates/core/src/file_group/log_file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,36 +289,47 @@ mod tests {
use crate::storage::util::parse_uri;
use apache_avro::schema::Schema as AvroSchema;
use std::fs::canonicalize;
use std::path::PathBuf;

fn get_valid_log_avro_data() -> (String, String) {
let dir = PathBuf::from("tests/data/log_files/valid_log_avro_data");
(
canonicalize(dir).unwrap().to_str().unwrap().to_string(),
canonicalize("tests/data/log_files/valid_log_avro_data")
.unwrap()
.to_str()
.unwrap()
.to_string(),
".ff32ab89-5ad0-4968-83b4-89a34c95d32f-0_20250316025816068.log.1_0-54-122".to_string(),
)
}

fn get_valid_log_parquet_data() -> (String, String) {
let dir = PathBuf::from("tests/data/log_files/valid_log_parquet_data");
(
canonicalize(dir).unwrap().to_str().unwrap().to_string(),
canonicalize("tests/data/log_files/valid_log_parquet_data")
.unwrap()
.to_str()
.unwrap()
.to_string(),
".ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387".to_string(),
)
}

fn get_valid_log_delete() -> (String, String) {
let dir = PathBuf::from("tests/data/log_files/valid_log_delete");
(
canonicalize(dir).unwrap().to_str().unwrap().to_string(),
canonicalize("tests/data/log_files/valid_log_delete")
.unwrap()
.to_str()
.unwrap()
.to_string(),
".6d3d1d6e-2298-4080-a0c1-494877d6f40a-0_20250618054711154.log.1_0-26-85".to_string(),
)
}

fn get_valid_log_rollback() -> (String, String) {
let dir = PathBuf::from("tests/data/log_files/valid_log_rollback");
(
canonicalize(dir).unwrap().to_str().unwrap().to_string(),
canonicalize("tests/data/log_files/valid_log_rollback")
.unwrap()
.to_str()
.unwrap()
.to_string(),
".0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1".to_string(),
)
}
Expand Down
47 changes: 15 additions & 32 deletions crates/core/src/file_group/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,51 +275,34 @@ mod tests {
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use std::fs::canonicalize;
use std::path::PathBuf;
use std::sync::Arc;
use url::Url;

fn get_non_existent_base_uri() -> String {
"file:///non-existent-path/table".to_string()
}

fn get_base_uri_with_valid_props() -> String {
let url = Url::from_file_path(
canonicalize(
PathBuf::from("tests")
.join("data")
.join("table_props_valid"),
)
.unwrap(),
)
.unwrap();
url.as_ref().to_string()
canonicalize("tests/data/table_props_valid")
.unwrap()
.to_str()
.unwrap()
.to_string()
}

fn get_base_uri_with_valid_props_minimum() -> String {
let url = Url::from_file_path(
canonicalize(
PathBuf::from("tests")
.join("data")
.join("table_props_valid_minimum"),
)
.unwrap(),
)
.unwrap();
url.as_ref().to_string()
canonicalize("tests/data/table_props_valid_minimum")
.unwrap()
.to_str()
.unwrap()
.to_string()
}

fn get_base_uri_with_invalid_props() -> String {
let url = Url::from_file_path(
canonicalize(
PathBuf::from("tests")
.join("data")
.join("table_props_invalid"),
)
.unwrap(),
)
.unwrap();
url.as_ref().to_string()
canonicalize("tests/data/table_props_invalid")
.unwrap()
.to_str()
.unwrap()
.to_string()
}

#[test]
Expand Down
16 changes: 3 additions & 13 deletions crates/core/src/schema/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ use crate::avro_to_arrow::to_arrow_schema;
use crate::config::table::HudiTableConfig;
use crate::error::{CoreError, Result};
use crate::schema::prepend_meta_fields;
use crate::storage::util::join_path_segments;
use crate::storage::Storage;
use crate::table::Table;
use apache_avro::schema::Schema as AvroSchema;
use arrow_schema::{Schema, SchemaRef};
use serde_json::{Map, Value};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

/// Resolves the [`arrow_schema::Schema`] for a given Hudi table.
Expand Down Expand Up @@ -154,17 +153,8 @@ async fn resolve_schema_from_base_file(
"Failed to resolve the latest schema: no file path found".to_string(),
)
})?;
let parquet_file_path_buf = PathBuf::from_str(partition_path)
.map_err(|e| {
CoreError::CommitMetadata(format!("Failed to resolve the latest schema: {}", e))
})?
.join(base_file);
let path = parquet_file_path_buf.to_str().ok_or_else(|| {
CoreError::CommitMetadata(
"Failed to resolve the latest schema: invalid file path".to_string(),
)
})?;
Ok(storage.get_parquet_file_schema(path).await?)
let parquet_file_path = join_path_segments(&[partition_path, base_file])?;
Ok(storage.get_parquet_file_schema(&parquet_file_path).await?)
}
None => Err(CoreError::CommitMetadata(
"Failed to resolve the latest schema: no file path found".to_string(),
Expand Down
10 changes: 6 additions & 4 deletions crates/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ impl Storage {
Ok(bytes)
}

pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result<Bytes> {
let obj_path = ObjPath::from_absolute_path(PathBuf::from(absolute_path))?;
pub async fn get_file_data_from_url_path(&self, path: impl AsRef<str>) -> Result<Bytes> {
let obj_path = ObjPath::from_url_path(path)?;
let result = self.object_store.get(&obj_path).await?;
let bytes = result.bytes().await?;
Ok(bytes)
Expand Down Expand Up @@ -283,10 +283,12 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Ve
next_subdir.push(curr);
}
next_subdir.push(child_dir);
let next_subdir = next_subdir
let next_subdir_str = next_subdir
.to_str()
.ok_or_else(|| InvalidPath(format!("Failed to convert path: {:?}", next_subdir)))?;
let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir)).await?;
// Normalize to forward slashes for consistency across platforms
let next_subdir_normalized = next_subdir_str.replace('\\', "/");
let curr_leaf_dir = get_leaf_dirs(storage, Some(&next_subdir_normalized)).await?;
leaf_dirs.extend(curr_leaf_dir);
}
}
Expand Down
Loading
Loading