Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ fn create_physical_plan_impl(
#[allow(unused_variables)]
Scan {
sources,
original_sources: _,
file_info,
hive_parts,
output_schema,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-mem-engine/src/scan_predicate/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ where
{
let IR::Scan {
sources,
original_sources: _,
file_info:
FileInfo {
schema: _,
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-plan/src/dsl/file_scan/python_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub static DATASET_PROVIDER_VTABLE: OnceLock<PythonDatasetProviderVTable> = Once
pub struct PythonDatasetProviderVTable {
pub name: fn(dataset_object: &PythonObject) -> PlSmallStr,

pub uri: fn(dataset_object: &PythonObject) -> Option<PlSmallStr>,

pub schema: fn(dataset_object: &PythonObject) -> PolarsResult<SchemaRef>,

#[expect(clippy::type_complexity)]
Expand Down Expand Up @@ -51,6 +53,10 @@ impl PythonDatasetProvider {
(dataset_provider_vtable().unwrap().name)(&self.dataset_object)
}

/// User-provided table URI of the underlying dataset, if the registered
/// provider exposes one (e.g. for lineage purposes).
pub fn uri(&self) -> Option<PlSmallStr> {
    let vtable = dataset_provider_vtable().unwrap();
    (vtable.uri)(&self.dataset_object)
}

/// Resolves the dataset's schema by delegating to the registered vtable.
pub fn schema(&self) -> PolarsResult<SchemaRef> {
    let vtable = dataset_provider_vtable().unwrap();
    (vtable.schema)(&self.dataset_object)
}
Expand Down
30 changes: 20 additions & 10 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir/scans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,44 @@ pub(super) async fn dsl_to_ir(
}
}

let sources_before_expansion = &sources;
let mut original_sources = sources;

let sources = match &*scan_type {
#[cfg(feature = "parquet")]
FileScanDsl::Parquet { .. } => {
sources
original_sources
.expand_paths_with_hive_update(unified_scan_args)
.await?
},
#[cfg(feature = "ipc")]
FileScanDsl::Ipc { .. } => {
sources
original_sources
.expand_paths_with_hive_update(unified_scan_args)
.await?
},
#[cfg(feature = "csv")]
FileScanDsl::Csv { .. } => sources.expand_paths(unified_scan_args).await?,
FileScanDsl::Csv { .. } => original_sources.expand_paths(unified_scan_args).await?,
#[cfg(feature = "json")]
FileScanDsl::NDJson { .. } => sources.expand_paths(unified_scan_args).await?,
FileScanDsl::NDJson { .. } => original_sources.expand_paths(unified_scan_args).await?,
#[cfg(feature = "python")]
FileScanDsl::PythonDataset { .. } => {
FileScanDsl::PythonDataset { dataset_object } => {
// Retrieve the user-provided table_uri, e.g. for lineage purposes.
let uri = dataset_object
.uri()
.unwrap_or_else(|| dataset_object.name());
original_sources =
ScanSources::Paths(Buffer::from_iter([PlRefPath::new(uri.as_str())]));

// There are a lot of places that short-circuit if the paths is empty,
// so we just give a dummy path here.
ScanSources::Paths(Buffer::from_iter([PlRefPath::new("PL_PY_DSET")]))
},
#[cfg(feature = "scan_lines")]
FileScanDsl::Lines { .. } => sources.expand_paths(unified_scan_args).await?,
FileScanDsl::ExpandedPaths { .. } => sources.expand_paths(unified_scan_args).await?,
FileScanDsl::Anonymous { .. } => sources.clone(),
FileScanDsl::Lines { .. } => original_sources.expand_paths(unified_scan_args).await?,
FileScanDsl::ExpandedPaths { .. } => {
original_sources.expand_paths(unified_scan_args).await?
},
FileScanDsl::Anonymous { .. } => original_sources.clone(),
};

// For cloud we must deduplicate files. Serialization/deserialization leads to Arc's losing there
Expand All @@ -84,7 +93,7 @@ pub(super) async fn dsl_to_ir(
.get_or_insert(
&scan_type,
&sources,
sources_before_expansion,
&original_sources,
unified_scan_args,
verbose,
)
Expand Down Expand Up @@ -174,6 +183,7 @@ pub(super) async fn dsl_to_ir(

IR::Scan {
sources,
original_sources,
file_info,
hive_parts,
predicate: None,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/ir/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ impl<'a> IRDotDisplay<'a> {
},
Scan {
sources,
original_sources: _,
file_info,
hive_parts: _,
predicate,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ pub fn write_ir_non_recursive(
},
IR::Scan {
sources,
original_sources: _,
file_info,
predicate,
predicate_file_skip_applied: _,
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub enum IR {
},
Scan {
sources: ScanSources,
/// Holds the original sources, i.e., prior to any expansion.
original_sources: ScanSources,
file_info: FileInfo,
hive_parts: Option<HivePartitionsDf>,
predicate: Option<ExprIR>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub(super) fn expand_datasets(

let IR::Scan {
sources,
original_sources: _,
scan_type,
unified_scan_args,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ impl PredicatePushDown {
},
Scan {
sources,
original_sources,
file_info,
hive_parts: scan_hive_parts,
ref predicate,
Expand Down Expand Up @@ -383,6 +384,7 @@ impl PredicatePushDown {
let lp = if do_optimization {
Scan {
sources,
original_sources,
file_info,
hive_parts,
predicate,
Expand All @@ -394,6 +396,7 @@ impl PredicatePushDown {
} else {
let lp = Scan {
sources,
original_sources,
file_info,
hive_parts,
predicate: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ impl ProjectionPushDown {
},
Scan {
sources,
original_sources,
mut file_info,
hive_parts,
scan_type,
Expand Down Expand Up @@ -617,6 +618,7 @@ impl ProjectionPushDown {

Ok(Scan {
sources,
original_sources,
file_info,
hive_parts,
output_schema,
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ impl SlicePushDown {
(
Scan {
sources,
original_sources,
file_info,
hive_parts,
output_schema,
Expand Down Expand Up @@ -306,6 +307,7 @@ impl SlicePushDown {

let lp = Scan {
sources,
original_sources,
file_info,
hive_parts,
output_schema,
Expand All @@ -320,6 +322,7 @@ impl SlicePushDown {
} else {
let lp = Scan {
sources,
original_sources,
file_info,
hive_parts,
output_schema,
Expand All @@ -337,6 +340,7 @@ impl SlicePushDown {

let lp = Scan {
sources,
original_sources,
file_info,
hive_parts,
output_schema,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/visitor/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl Hash for IRHashWrap<'_> {
},
IR::Scan {
sources,
original_sources: _,
file_info: _,
hive_parts: _,
predicate,
Expand Down
13 changes: 13 additions & 0 deletions crates/polars-python/src/dataset/dataset_provider_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ pub fn name(dataset_object: &PythonObject) -> PlSmallStr {
.unwrap()
}

/// Reads the optional `table_uri_` attribute from the Python dataset object.
///
/// Returns `None` when the attribute is absent, is `None` on the Python side,
/// or cannot be extracted as a string.
pub fn uri(dataset_object: &PythonObject) -> Option<PlSmallStr> {
    Python::attach(|py| {
        let attr = dataset_object.getattr(py, intern!(py, "table_uri_")).ok()?;
        let s = attr.extract::<Option<PyBackedStr>>(py).ok().flatten()?;
        Some(PlSmallStr::from_str(&s))
    })
}

pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {
Python::attach(|py| {
let pyarrow_schema_cls = py
Expand Down
1 change: 1 addition & 0 deletions crates/polars-python/src/lazyframe/visitor/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
)),
IR::Scan {
sources,
original_sources: _,
file_info: _,
hive_parts: _,
predicate,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-python/src/on_startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ pub unsafe fn register_startup_deps(catch_keyboard_interrupt: bool) {

polars_plan::dsl::DATASET_PROVIDER_VTABLE.get_or_init(|| PythonDatasetProviderVTable {
name: dataset_provider_funcs::name,
uri: dataset_provider_funcs::uri,
schema: dataset_provider_funcs::schema,
to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
});
Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ pub fn lower_ir(
v @ IR::Scan { .. } => {
let IR::Scan {
sources: scan_sources,
original_sources: _,
file_info,
mut hive_parts,
output_schema: _,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/utils/late_materialized_df.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl LateMaterializedDataFrame {
});
IR::Scan {
sources: ScanSources::Paths(Default::default()),
original_sources: ScanSources::Paths(Default::default()),
file_info: FileInfo::new(schema, None, (None, usize::MAX)),
hive_parts: None,
predicate: None,
Expand Down
14 changes: 14 additions & 0 deletions py-polars/src/polars/io/delta/_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import warnings
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -161,3 +162,16 @@ def _extract_table_statistics_from_delta_add_actions(
out[f"{col_name}_max"] = col_max

return pl.DataFrame(out, height=add_actions_df.height)


def _to_table_uri(source: str | Path | deltalake.DeltaTable) -> str:
    """Normalize any accepted Delta source into a table URI ending with '/'.

    A ``DeltaTable`` contributes its own ``table_uri``; a string/path that
    already carries a scheme (``://``) is taken verbatim; anything else is
    treated as a local path and converted to an absolute ``file://`` URI.
    """
    if isinstance(source, deltalake.DeltaTable):
        uri = source.table_uri
    else:
        raw = str(source)
        if "://" in raw:
            uri = raw
        else:
            # Local path: expand '~' and normalize to an absolute file:// URI.
            uri = Path(os.path.expanduser(raw)).resolve().as_uri()  # noqa: PTH111
    return uri if uri.endswith("/") else f"{uri}/"
3 changes: 2 additions & 1 deletion py-polars/src/polars/io/delta/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from polars._utils.wrap import wrap_ldf
from polars.io.cloud._utils import NoPickleOption
from polars.io.delta._dataset import DeltaDataset
from polars.io.delta._utils import _to_table_uri

if TYPE_CHECKING:
from datetime import datetime
Expand Down Expand Up @@ -322,7 +323,7 @@ def scan_delta(

dataset = DeltaDataset(
table_=NoPickleOption(table),
table_uri_=str(source) if table is None else None,
table_uri_=_to_table_uri(source),
version=version,
storage_options=storage_options,
credential_provider_builder=credential_provider_builder,
Expand Down
Loading