Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/polars-arrow/src/bitmap/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,14 @@ impl BitmapBuilder {
length: usize,
repeats: usize,
) {
debug_assert!(8 * slice.len() >= offset + length);
if repeats == 0 {
return;
}
if repeats == 1 {
return self.extend_from_slice_unchecked(slice, offset, length);
}
for bit_idx in offset..length {
for bit_idx in offset..(offset + length) {
let bit = (*slice.get_unchecked(bit_idx / 8) >> (bit_idx % 8)) & 1 != 0;
self.extend_constant(repeats, bit);
}
Expand Down
21 changes: 11 additions & 10 deletions crates/polars-expr/src/expressions/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use polars_core::utils::{_split_offsets, NoNull};
use polars_ops::prelude::ArgAgg;
#[cfg(feature = "propagate_nans")]
use polars_ops::prelude::nan_propagating_aggregate;
use polars_utils::itertools::Itertools;
use rayon::prelude::*;

use super::*;
Expand Down Expand Up @@ -712,21 +711,23 @@ impl PhysicalExpr for AggMinMaxByExpr {
unsafe { by_col.agg_arg_min(&by_groups) }
};
let idxs_in_groups: &IdxCa = idxs_in_groups.as_materialized_series().as_ref().as_ref();
let flat_gather_idxs = match input_groups.as_ref().as_ref() {
let gather_idxs: IdxCa = match input_groups.as_ref().as_ref() {
GroupsType::Idx(g) => idxs_in_groups
.into_no_null_iter()
.iter()
.enumerate()
.map(|(group_idx, idx_in_group)| g.all()[group_idx][idx_in_group as usize])
.collect_vec(),
.map(|(group_idx, idx_in_group)| {
idx_in_group.map(|i| g.all()[group_idx][i as usize])
})
.collect(),
GroupsType::Slice { groups, .. } => idxs_in_groups
.into_no_null_iter()
.iter()
.enumerate()
.map(|(group_idx, idx_in_group)| groups[group_idx][0] + idx_in_group)
.collect_vec(),
.map(|(group_idx, idx_in_group)| idx_in_group.map(|i| groups[group_idx][0] + i))
.collect(),
};

// SAFETY: All indices are within input_col's groups.
let gathered = unsafe { input_col.take_slice_unchecked(&flat_gather_idxs) };
// SAFETY: All non-null indices are within input_col's groups.
let gathered = unsafe { input_col.take_unchecked(&gather_idxs) };
let agg_state = AggregatedScalar(gathered.with_name(keep_name));
Ok(AggregationContext::from_agg_state(
agg_state,
Expand Down
50 changes: 50 additions & 0 deletions crates/polars-plan/src/plans/conversion/type_coercion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,56 @@ impl OptimizationRule for TypeCoercionRule {
options,
})
},
#[cfg(feature = "business")]
AExpr::Function {
function: IRFunctionExpr::Business(ref business_fn),
ref input,
options,
} => {
let holiday_arg_idx: usize = match business_fn {
IRBusinessFunction::AddBusinessDay { .. }
| IRBusinessFunction::BusinessDayCount { .. } => 2,
IRBusinessFunction::IsBusinessDay { .. } => 1,
};

let holiday_arg = unpack!(input.get(holiday_arg_idx));

// We implode, only for literal Series(dtype=Date), as this is considered a valid
// parameter on the Python API as an `Iterable[date]`.
let new_lv_ae: AExpr = match expr_arena.get(holiday_arg.node()) {
AExpr::Literal(LiteralValue::Series(s)) if s.dtype() == &DataType::Date => {
AExpr::Literal(LiteralValue::Series(SpecialEq::new(
s.implode().unwrap().into_series(),
)))
},
ae => {
let dtype = ae.to_dtype(&ToFieldContext::new(expr_arena, schema))?;

let is_list_of_date = match &dtype {
DataType::List(inner) => inner.as_ref() == &DataType::Date,
_ => false,
};

polars_ensure!(
is_list_of_date,
ComputeError:
"dtype of holidays list must be List(Date), got {dtype:?} instead"
);

return Ok(None);
},
};

let mut input = input.clone();
let function = IRFunctionExpr::Business(business_fn.clone());
input[holiday_arg_idx].set_node(expr_arena.add(new_lv_ae));

Some(AExpr::Function {
input,
function,
options,
})
},
#[cfg(feature = "list_gather")]
AExpr::Function {
function: ref function @ IRFunctionExpr::ListExpr(IRListFunction::Gather(_)),
Expand Down
6 changes: 2 additions & 4 deletions crates/polars-plan/src/plans/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ pub fn optimize(

if opt_flags.slice_pushdown() {
let mut slice_pushdown_opt = SlicePushDown::new();
let ir = ir_arena.take(root);
let ir = slice_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;
let ir = slice_pushdown_opt.optimize(root, ir_arena, expr_arena)?;

ir_arena.replace(root, ir);

Expand Down Expand Up @@ -246,8 +245,7 @@ pub fn optimize(

if repeat_slice_pd_after_filter_pd {
let mut slice_pushdown_opt = SlicePushDown::new();
let ir = ir_arena.take(root);
let ir = slice_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;
let ir = slice_pushdown_opt.optimize(root, ir_arena, expr_arena)?;

ir_arena.replace(root, ir);
}
Expand Down
55 changes: 28 additions & 27 deletions crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,9 @@ impl SlicePushDown {
let new_inputs = inputs
.into_iter()
.map(|node| {
let alp = lp_arena.take(node);
// No state, so we do not push down the slice here.
let state = None;
let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
let alp = self.pushdown(node, state, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
Expand All @@ -216,26 +215,37 @@ impl SlicePushDown {
let new_inputs = inputs
.into_iter()
.map(|node| {
let alp = lp_arena.take(node);
let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
let alp = self.pushdown(node, state, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<PolarsResult<UnitVec<_>>>()?;
Ok(lp.with_inputs(new_inputs))
}

/// This will take the `ir_node` from the `lp_arena`, replacing it with `IR::Invalid` (except if
/// `ir_node` is a `IR::Cache`).
#[recursive]
fn pushdown(
&mut self,
lp: IR,
ir_node: Node,
state: Option<State>,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
use IR::*;

match (lp, state) {
// Don't take this, the node can be referenced multiple times in the tree.
if let IR::Cache { .. } = lp_arena.get(ir_node) {
return self.no_pushdown_restart_opt(
lp_arena.get(ir_node).clone(),
state,
lp_arena,
expr_arena,
);
}

match (lp_arena.take(ir_node), state) {
#[cfg(feature = "python")]
(
PythonScan { mut options },
Expand Down Expand Up @@ -305,7 +315,8 @@ impl SlicePushDown {
predicate_file_skip_applied,
};

self.pushdown(lp, None, lp_arena, expr_arena)
lp_arena.replace(ir_node, lp);
self.pushdown(ir_node, None, lp_arena, expr_arena)
} else {
let lp = Scan {
sources,
Expand Down Expand Up @@ -385,8 +396,7 @@ impl SlicePushDown {
.map(|len| State { offset: 0, len });

for input in &mut inputs {
let input_lp = lp_arena.take(*input);
let input_lp = self.pushdown(input_lp, subplan_slice, lp_arena, expr_arena)?;
let input_lp = self.pushdown(*input, subplan_slice, lp_arena, expr_arena)?;
lp_arena.replace(*input, input_lp);
}
options.slice = opt_state.map(|x| (x.offset, x.len.try_into().unwrap()));
Expand Down Expand Up @@ -440,12 +450,10 @@ impl SlicePushDown {
}

// first restart optimization in both inputs and get the updated LP
let lp_left = lp_arena.take(input_left);
let lp_left = self.pushdown(lp_left, None, lp_arena, expr_arena)?;
let lp_left = self.pushdown(input_left, None, lp_arena, expr_arena)?;
let input_left = lp_arena.add(lp_left);

let lp_right = lp_arena.take(input_right);
let lp_right = self.pushdown(lp_right, None, lp_arena, expr_arena)?;
let lp_right = self.pushdown(input_right, None, lp_arena, expr_arena)?;
let input_right = lp_arena.add(lp_right);

// then assign the slice state to the join operation
Expand Down Expand Up @@ -476,8 +484,7 @@ impl SlicePushDown {
Some(state),
) => {
// first restart optimization in inputs and get the updated LP
let input_lp = lp_arena.take(input);
let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
let input_lp = self.pushdown(input, None, lp_arena, expr_arena)?;
let input = lp_arena.add(input_lp);

if let Some(existing_slice) = &mut Arc::make_mut(&mut options).slice {
Expand Down Expand Up @@ -528,8 +535,7 @@ impl SlicePushDown {
},
(Distinct { input, mut options }, Some(state)) => {
// first restart optimization in inputs and get the updated LP
let input_lp = lp_arena.take(input);
let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
let input_lp = self.pushdown(input, None, lp_arena, expr_arena)?;
let input = lp_arena.add(input_lp);

if let Some(existing_slice) = &mut options.slice {
Expand Down Expand Up @@ -594,8 +600,7 @@ impl SlicePushDown {
assert!(slice.is_none() || slice == new_slice);

// first restart optimization in inputs and get the updated LP
let input_lp = lp_arena.take(input);
let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
let input_lp = self.pushdown(input, None, lp_arena, expr_arena)?;
let input = lp_arena.add(input_lp);

Ok(Sort {
Expand All @@ -613,8 +618,6 @@ impl SlicePushDown {
},
Some(outer_slice),
) => {
let alp = lp_arena.take(input);

// If offset is negative the length can never be greater than it.
if offset < 0 {
#[allow(clippy::unnecessary_cast)] // Necessary when IdxSize = u64.
Expand All @@ -626,10 +629,10 @@ impl SlicePushDown {
if let Some(combined) =
combine_outer_inner_slice(outer_slice, State { offset, len })
{
self.pushdown(alp, Some(combined), lp_arena, expr_arena)
self.pushdown(input, Some(combined), lp_arena, expr_arena)
} else {
let lp =
self.pushdown(alp, Some(State { offset, len }), lp_arena, expr_arena)?;
self.pushdown(input, Some(State { offset, len }), lp_arena, expr_arena)?;
let input = lp_arena.add(lp);
self.slice_node_in_optimized_plan = true;
Ok(Slice {
Expand All @@ -647,8 +650,6 @@ impl SlicePushDown {
},
None,
) => {
let alp = lp_arena.take(input);

// If offset is negative the length can never be greater than it.
if offset < 0 {
#[allow(clippy::unnecessary_cast)] // Necessary when IdxSize = u64.
Expand All @@ -658,7 +659,7 @@ impl SlicePushDown {
}

let state = Some(State { offset, len });
self.pushdown(alp, state, lp_arena, expr_arena)
self.pushdown(input, state, lp_arena, expr_arena)
},
m @ (Filter { .. }, _)
| m @ (DataFrameScan { .. }, _)
Expand Down Expand Up @@ -809,7 +810,7 @@ impl SlicePushDown {

pub fn optimize(
&mut self,
logical_plan: IR,
logical_plan: Node,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/c_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod allocator;
// Since Python Polars cannot share its version into here and we need to be able to build this
// package correctly without `py-polars`, we need to mirror the version here.
// example: 1.35.0-beta.1
pub static PYPOLARS_VERSION: &str = "1.39.0";
pub static PYPOLARS_VERSION: &str = "1.39.1";

// We allow multiple features to be set simultaneously so checking with all-features
// is possible. In the case multiple are set or none at all, we set the repr to "unknown".
Expand Down
2 changes: 1 addition & 1 deletion docs/source/polars-cloud/run/distributed-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ result = (
This example demonstrates running query 3 of the PDS-H benchmark on scale factor 100 (approx. 100GB
of data) using Polars Cloud's distributed engine.

!!! note "Run the example yourself"
!!! example "Run the example yourself"

    Copy and paste the code into your environment and run it. The data is hosted in S3 buckets that use [AWS Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html), meaning you pay only the cost of the request and the data download from the bucket. The storage costs are covered.

Expand Down
6 changes: 3 additions & 3 deletions docs/source/polars-cloud/run/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ completion back to the scheduler and write shuffle output for downstream stages

The **stage graph** is produced by the distributed query planner from the optimized logical plan.
The planner walks the logical plan and identifies **stage boundaries**: points where a data shuffle
is required to optimize stages to maximize parallelism, minimize data shuffle, and keep peak memory
usage under control. Joins and group-bys are typical examples, a worker cannot produce its final
result without first receiving the relevant keys or partial aggregates from other workers.
is required. The planner optimizes stages to maximize parallelism, minimize data shuffle, and keep
peak memory usage under control. Joins and group-bys are typical examples; a worker cannot produce
its final result without first receiving the relevant keys or partial aggregates from other workers.

At each stage boundary, the planner inserts a shuffle and starts a new stage. The result is a
directed acyclic graph (DAG) in which each node is a stage and each edge is a shuffle. All workers
Expand Down
Loading
Loading