Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
d4cc10f
Upgrade DataFusion 48.0.0 (#61)
xudong963 Jun 26, 2025
25e5ccc
Upgrade to DF49 (#75)
xudong963 Aug 1, 2025
3026895
Upgrade DF to 49.0.2 (#86)
zhuqi-lucas Sep 3, 2025
d8364fb
make cost fn accept candidates (#83)
xudong963 Sep 13, 2025
540f29e
Fix empty unnest columns handling when pushdown_projection_inexact (#88)
zhuqi-lucas Sep 15, 2025
169eb66
upgrade to DF50 (#87)
xudong963 Sep 16, 2025
f485d46
Update internal DF 50
zhuqi-lucas Sep 17, 2025
3707a1e
fix CI
zhuqi-lucas Sep 17, 2025
bf2725b
fmt
zhuqi-lucas Sep 17, 2025
bc4cd51
Fix ci
zhuqi-lucas Sep 17, 2025
d7dd6b0
fix ci
zhuqi-lucas Sep 17, 2025
b426f5b
fix ci
zhuqi-lucas Sep 17, 2025
c34bb15
fix ci
zhuqi-lucas Sep 17, 2025
947bd8e
Merge pull request #13 from polygon-io/update-internal-df-50
zhuqi-lucas Sep 17, 2025
9c68378
upgrade df
zhuqi-lucas Sep 23, 2025
8d57dd4
Merge pull request #14 from polygon-io/upgrade-df-for-mv
xudong963 Sep 23, 2025
6164917
debug testing
zhuqi-lucas Sep 26, 2025
e6c4b35
update DF version
zhuqi-lucas Sep 27, 2025
b7f64b6
Support static partition columns for MV (#89)
suremarc Sep 25, 2025
1af1666
support limit pushdown for oneofexec
xudong963 Sep 29, 2025
946ead3
Merge pull request #16 from polygon-io/branch-50-enable-limit-pushdown
xudong963 Oct 10, 2025
eaf03ef
Update DF version
xudong963 Oct 14, 2025
849e951
Merge pull request #17 from polygon-io/update_version
xudong963 Oct 14, 2025
65dc5e6
update df version
xudong963 Oct 16, 2025
a5379e2
Update datafusion
zhuqi-lucas Oct 24, 2025
cdc6784
[X-1289] add benchmark for heavy operation for datafusion-materialize…
zhuqi-lucas Oct 28, 2025
ddf4c65
fmt
zhuqi-lucas Oct 28, 2025
24fb12e
fix ci
zhuqi-lucas Oct 28, 2025
330f367
Merge pull request #18 from polygon-io/X-1289
zhuqi-lucas Oct 28, 2025
5dac819
update datafusion
zhuqi-lucas Nov 4, 2025
2a4dc27
Update df version
zhuqi-lucas Nov 4, 2025
2d933c7
Fix Deps
wyatt-herkamp Nov 6, 2025
0552842
update DF version
xudong963 Nov 18, 2025
ea368ae
Upgrade DF 51 (#20)
jcsherin Nov 27, 2025
823f86c
chore: update DF version
jcsherin Dec 1, 2025
aeb7978
prevent rewriting strict inequality to closed interval for non-discre…
Friede80 Dec 12, 2025
b6a9e7f
Expose mv_plans for ViewMatcher (#22)
xudong963 Dec 19, 2025
3347228
Create a new OneOf node with the given branches (#23)
xudong963 Dec 30, 2025
560648f
update DF version
xudong963 Jan 15, 2026
4484dd3
update DF version
xudong963 Jan 16, 2026
5a14924
Update df
zhuqi-lucas Jan 22, 2026
81db5c2
Update DF version
zhuqi-lucas Jan 30, 2026
65457f2
Update DF version
zhuqi-lucas Feb 3, 2026
f60dd33
Optimize rewrite performance and SPJ new (#24)
zhuqi-lucas Feb 4, 2026
88b7fb2
update DF version
xudong963 Feb 13, 2026
6e1307c
Update DF version (#26)
trevor-wilson-polygonio Feb 20, 2026
6d0e950
[X-2048] Support view matcher with boolean binary operation (#27)
zhuqi-lucas Mar 3, 2026
8835c81
Fix/aggregate output ordering streaming
xudong963 Mar 5, 2026
51724c1
Strict unions option
matthewmturner Mar 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .claude/settings.local.json
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to put the file in the repo?

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(cargo clippy:*)"
]
}
}
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: crate-ci/typos@v1.13.10
with:
config: .typos.toml

check:
name: Check
Expand Down Expand Up @@ -132,7 +134,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: EmbarkStudios/cargo-deny-action@v1
- uses: EmbarkStudios/cargo-deny-action@v2
with:
command: check license

Expand Down
21 changes: 21 additions & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# typos configuration file
# Place this file in the project root (same level as Cargo.toml)
[files]
extend-exclude = ["Cargo.toml", "**/Cargo.toml"]
33 changes: 20 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ authors = ["Matthew Cramerus <matt@polygon.io>"]
license = "Apache-2.0"
description = "Materialized Views & Query Rewriting in DataFusion"
keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.80"
rust-version = "1.88.0"

[dependencies]
arrow = "55"
arrow-schema = "55"
async-trait = "0.1"
aquamarine = "0.6.0"
arrow = "57.0.0"
arrow-schema = "57.0.0"
async-trait = "0.1.89"
dashmap = "6"
datafusion = "47"
datafusion-common = "47"
datafusion-expr = "47"
datafusion-functions = "47"
datafusion-functions-aggregate = "47"
datafusion-optimizer = "47"
datafusion-physical-expr = "47"
datafusion-physical-plan = "47"
datafusion-sql = "47"
datafusion = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-common = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-functions = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-functions-aggregate = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-optimizer = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-physical-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-physical-plan = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
datafusion-sql = { git = "https://github.com/massive-com/arrow-datafusion", rev = "3f7b02d" }
futures = "0.3"
itertools = "0.14"
log = "0.4"
Expand All @@ -49,7 +50,13 @@ ordered-float = "5.0.0"

[dev-dependencies]
anyhow = "1.0.95"
criterion = "0.4"
env_logger = "0.11.6"
tempfile = "3.14.0"
tokio = "1.42.0"
url = "2.5.4"

[[bench]]
name = "materialized_views_benchmark"
harness = false
path = "benches/materialized_views_benchmark.rs"
182 changes: 182 additions & 0 deletions benches/materialized_views_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::sync::Arc;
use std::time::Duration;

use datafusion::datasource::provider_as_source;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use datafusion_common::Result as DfResult;
use datafusion_expr::LogicalPlan;
use datafusion_materialized_views::rewrite::normal_form::SpjNormalForm;
use datafusion_sql::TableReference;
use tokio::runtime::Builder;

// Utility: generate CREATE TABLE SQL with n columns named c0..c{n-1}
/// Build a `CREATE TABLE` statement for `table_name` with `ncols` integer
/// columns named `c0`, `c1`, ..., `c{ncols-1}`.
fn make_create_table_sql(table_name: &str, ncols: usize) -> String {
    let mut column_defs = Vec::with_capacity(ncols);
    for i in 0..ncols {
        column_defs.push(format!("c{i} INT"));
    }
    format!("CREATE TABLE {table_name} ({})", column_defs.join(", "))
}

// Utility: generate a base SELECT that projects all columns and has a couple filters
/// Build the base `SELECT` (the materialized-view definition): projects every
/// column and, once columns exist, applies a couple of simple filters
/// (`c0 >= 0`, plus `c0 + c1 >= 0` when a second column is available).
fn make_base_sql(table_name: &str, ncols: usize) -> String {
    let projection = (0..ncols)
        .map(|i| format!("c{i}"))
        .collect::<Vec<_>>()
        .join(", ");

    let mut predicates: Vec<String> = Vec::new();
    if ncols > 0 {
        predicates.push("c0 >= 0".to_string());
    }
    if ncols > 1 {
        predicates.push("c0 + c1 >= 0".to_string());
    }

    if predicates.is_empty() {
        format!("SELECT {projection} FROM {table_name}")
    } else {
        format!(
            "SELECT {projection} FROM {table_name} WHERE {}",
            predicates.join(" AND ")
        )
    }
}

// Utility: generate a query that is stricter and selects subset (so rewrite_from has a chance)
/// Build a query that is strictly more selective than the base SELECT and
/// projects only (roughly) the first half of the columns, so rewriting it
/// against the materialized view (`rewrite_from`) has a chance to succeed.
fn make_query_sql(table_name: &str, ncols: usize) -> String {
    // Project half the columns, but always at least one.
    let projected = if ncols / 2 == 0 { 1 } else { ncols / 2 };
    let projection = (0..projected)
        .map(|i| format!("c{i}"))
        .collect::<Vec<_>>()
        .join(", ");

    let mut predicates: Vec<String> = Vec::new();
    if ncols > 0 {
        predicates.push("c0 >= 10".to_string());
    }
    if ncols > 1 {
        predicates.push("c0 * c1 > 100".to_string());
    }
    if ncols > 10 {
        // Touch the last column so wide tables exercise deeper filter analysis.
        predicates.push(format!("c{} >= 0", ncols - 1));
    }

    if predicates.is_empty() {
        format!("SELECT {projection} FROM {table_name}")
    } else {
        format!(
            "SELECT {projection} FROM {table_name} WHERE {}",
            predicates.join(" AND ")
        )
    }
}

// Build fixture: create SessionContext, the table, then return LogicalPlans for base & query and table provider
/// Create an in-memory `SessionContext` with a table `t` of `ncols` INT
/// columns, then return the optimized logical plans for the base SELECT and
/// the candidate query, along with the table's provider.
fn build_fixture_for_cols(
    rt: &tokio::runtime::Runtime,
    ncols: usize,
) -> DfResult<(LogicalPlan, LogicalPlan, Arc<dyn TableProvider>)> {
    rt.block_on(async move {
        let ctx = SessionContext::new();
        let table_name = "t";

        // Register the table in the session catalog.
        ctx.sql(&make_create_table_sql(table_name, ncols))
            .await?
            .collect()
            .await?;

        // Optimize both plans so they are in the normalized shape the
        // rewriter expects.
        let base_plan = ctx
            .sql(&make_base_sql(table_name, ncols))
            .await?
            .into_optimized_plan()?;
        let query_plan = ctx
            .sql(&make_query_sql(table_name, ncols))
            .await?
            .into_optimized_plan()?;

        // Fetch the provider backing the table (Arc<dyn TableProvider>).
        let provider: Arc<dyn TableProvider> = ctx
            .table_provider(TableReference::bare(table_name))
            .await?;

        Ok((base_plan, query_plan, provider))
    })
}

// Criterion benchmark
/// Criterion entry point: measures `SpjNormalForm::new` on both the base
/// (view) plan and the query plan, and `rewrite_from` — the expensive
/// matching step — across increasing table widths.
fn criterion_benchmark(c: &mut Criterion) {
    // Table widths to exercise; growth is roughly geometric.
    let col_cases = vec![1usize, 10, 20, 40, 80, 160, 320];

    // A single-threaded runtime keeps fixture setup cheap and portable.
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("tokio runtime");

    let mut group = c.benchmark_group("view_matcher_spj");
    group.warm_up_time(Duration::from_secs(1));
    group.measurement_time(Duration::from_secs(5));
    group.sample_size(30);

    for &ncols in col_cases.iter() {
        let (base_plan, query_plan, provider) =
            build_fixture_for_cols(&rt, ncols).expect("fixture");

        group.throughput(Throughput::Elements(1));

        // Cost of normalizing the base (view) plan.
        group.bench_with_input(
            BenchmarkId::new("spj_normal_form_new", format!("cols={ncols}")),
            &base_plan,
            |b, plan| {
                b.iter(|| {
                    let _nf = SpjNormalForm::new(plan).unwrap();
                })
            },
        );

        // Cost of normalizing the query plan.
        group.bench_with_input(
            BenchmarkId::new("spj_normal_form_new_query", format!("cols={ncols}")),
            &query_plan,
            |b, plan| {
                b.iter(|| {
                    let _nf = SpjNormalForm::new(plan).unwrap();
                })
            },
        );

        // Normalize once up front so the rewrite benchmark measures only
        // `rewrite_from` itself, not normalization.
        let base_nf = SpjNormalForm::new(&base_plan).expect("base_nf");
        let query_nf = SpjNormalForm::new(&query_plan).expect("query_nf");
        let qualifier = TableReference::bare("mv");
        let source = provider_as_source(Arc::clone(&provider));

        group.bench_with_input(
            BenchmarkId::new("rewrite_from", format!("cols={ncols}")),
            &ncols,
            |b, &_n| {
                b.iter(|| {
                    let _res =
                        query_nf.rewrite_from(&base_nf, qualifier.clone(), Arc::clone(&source));
                })
            },
        );
    }

    group.finish();
}

// Wire the benchmark into Criterion's generated `main` entry point.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
4 changes: 3 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ allow = [
"BSD-3-Clause",
"CC0-1.0",
"Unicode-3.0",
"Zlib"
"Zlib",
"ISC",
"bzip2-1.0.6"
]
version = 2
20 changes: 20 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[toolchain]
channel = "1.91.0"
components = ["rust-analyzer", "rustfmt", "clippy"]
48 changes: 41 additions & 7 deletions src/materialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion::{
catalog::TableProvider,
datasource::listing::{ListingTable, ListingTableUrl},
};
use datafusion_common::DataFusionError;
use datafusion_expr::LogicalPlan;
use itertools::Itertools;

Expand Down Expand Up @@ -110,6 +111,14 @@ pub trait Materialized: ListingTableLike {
fn config(&self) -> MaterializedConfig {
MaterializedConfig::default()
}

/// Which partition columns are 'static'.
/// Static partition columns are those that are used in incremental view maintenance.
/// These should be a prefix of the full set of partition columns returned by [`ListingTableLike::partition_columns`].
/// The rest of the partition columns are 'dynamic' and their values will be generated at runtime during incremental refresh.
fn static_partition_columns(&self) -> Vec<String> {
    // Default: every partition column is static (no dynamic suffix).
    <Self as ListingTableLike>::partition_columns(self)
}
}

/// Register a [`Materialized`] implementation in this registry.
Expand All @@ -122,13 +131,38 @@ pub fn register_materialized<T: Materialized>() {
}

/// Attempt to cast the given TableProvider into a [`Materialized`].
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| {
TABLE_TYPE_REGISTRY
.cast_to_decorator(table)
.and_then(|decorator| cast_to_materialized(decorator.base()))
})
/// If the table's type has not been registered using [`register_materialized`], will return `Ok(None)`.
///
/// Does a runtime check on some invariants of `Materialized` and returns an error if they are violated.
/// In particular, checks that the static partition columns are a prefix of all partition columns.
///
/// # Errors
///
/// Returns [`DataFusionError::Plan`] if the table's static partition columns
/// are not a prefix of its full partition column list — including the case
/// where there are more static columns than partition columns.
pub fn cast_to_materialized(
    table: &dyn TableProvider,
) -> Result<Option<&dyn Materialized>, DataFusionError> {
    // Resolve the concrete `Materialized`, unwrapping decorators recursively.
    let materialized = match TABLE_TYPE_REGISTRY
        .cast_to_materialized(table)
        .map(Ok)
        .or_else(|| {
            TABLE_TYPE_REGISTRY
                .cast_to_decorator(table)
                .and_then(|decorator| cast_to_materialized(decorator.base()).transpose())
        })
        .transpose()?
    {
        None => return Ok(None),
        Some(m) => m,
    };

    let static_partition_cols = materialized.static_partition_columns();
    let all_partition_cols = materialized.partition_columns();

    // `starts_with` also covers the case where there are more static columns
    // than partition columns; the previous slice-index comparison
    // (`all[..static.len()]`) would panic on that input instead of reporting
    // a plan error.
    if !all_partition_cols.starts_with(&static_partition_cols) {
        return Err(DataFusionError::Plan(format!(
            "Materialized view's static partition columns ({static_partition_cols:?}) must be a prefix of all partition columns ({all_partition_cols:?})"
        )));
    }

    Ok(Some(materialized))
}

/// A `TableProvider` that decorates other `TableProvider`s.
Expand Down
Loading
Loading