Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
d4cc10f
Upgrade DataFusion 48.0.0 (#61)
xudong963 Jun 26, 2025
25e5ccc
Upgrade to DF49 (#75)
xudong963 Aug 1, 2025
3026895
Upgrade DF to 49.0.2 (#86)
zhuqi-lucas Sep 3, 2025
d8364fb
make cost fn accept candidates (#83)
xudong963 Sep 13, 2025
540f29e
Fix empty unnest columns handling when pushdown_projection_inexact (#88)
zhuqi-lucas Sep 15, 2025
169eb66
upgrade to DF50 (#87)
xudong963 Sep 16, 2025
f485d46
Update internal DF 50
zhuqi-lucas Sep 17, 2025
3707a1e
fix CI
zhuqi-lucas Sep 17, 2025
bf2725b
fmt
zhuqi-lucas Sep 17, 2025
bc4cd51
Fix ci
zhuqi-lucas Sep 17, 2025
d7dd6b0
fix ci
zhuqi-lucas Sep 17, 2025
b426f5b
fix ci
zhuqi-lucas Sep 17, 2025
c34bb15
fix ci
zhuqi-lucas Sep 17, 2025
947bd8e
Merge pull request #13 from polygon-io/update-internal-df-50
zhuqi-lucas Sep 17, 2025
9c68378
upgrade df
zhuqi-lucas Sep 23, 2025
8d57dd4
Merge pull request #14 from polygon-io/upgrade-df-for-mv
xudong963 Sep 23, 2025
6164917
debug testing
zhuqi-lucas Sep 26, 2025
e6c4b35
update DF version
zhuqi-lucas Sep 27, 2025
b7f64b6
Support static partition columns for MV (#89)
suremarc Sep 25, 2025
1af1666
support limit pushdown for oneofexec
xudong963 Sep 29, 2025
946ead3
Merge pull request #16 from polygon-io/branch-50-enable-limit-pushdown
xudong963 Oct 10, 2025
eaf03ef
Update DF version
xudong963 Oct 14, 2025
849e951
Merge pull request #17 from polygon-io/update_version
xudong963 Oct 14, 2025
65dc5e6
update df version
xudong963 Oct 16, 2025
a5379e2
Update datafusion
zhuqi-lucas Oct 24, 2025
cdc6784
[X-1289] add benchmark for heavy operation for datafusion-materialize…
zhuqi-lucas Oct 28, 2025
ddf4c65
fmt
zhuqi-lucas Oct 28, 2025
24fb12e
fix ci
zhuqi-lucas Oct 28, 2025
330f367
Merge pull request #18 from polygon-io/X-1289
zhuqi-lucas Oct 28, 2025
5dac819
update datafusion
zhuqi-lucas Nov 4, 2025
2a4dc27
Update df version
zhuqi-lucas Nov 4, 2025
2d933c7
Fix Deps
wyatt-herkamp Nov 6, 2025
0552842
update DF version
xudong963 Nov 18, 2025
ea368ae
Upgrade DF 51 (#20)
jcsherin Nov 27, 2025
823f86c
chore: update DF version
jcsherin Dec 1, 2025
aeb7978
prevent rewriting strict inequality to closed interval for non-discre…
Friede80 Dec 12, 2025
b6a9e7f
Expose mv_plans for ViewMatcher (#22)
xudong963 Dec 19, 2025
3347228
Create a new OneOf node with the given branches (#23)
xudong963 Dec 30, 2025
560648f
update DF version
xudong963 Jan 15, 2026
4484dd3
update DF version
xudong963 Jan 16, 2026
5a14924
Update df
zhuqi-lucas Jan 22, 2026
81db5c2
Update DF version
zhuqi-lucas Jan 30, 2026
65457f2
Update DF version
zhuqi-lucas Feb 3, 2026
f60dd33
Optimize rewrite performance and SPJ new (#24)
zhuqi-lucas Feb 4, 2026
88b7fb2
update DF version
xudong963 Feb 13, 2026
47de189
Query rewrite targets
matthewmturner Feb 17, 2026
30e70f0
Refactor to a function for getting candidates
matthewmturner Feb 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: crate-ci/typos@v1.13.10
with:
config: .typos.toml

check:
name: Check
Expand Down Expand Up @@ -132,7 +134,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: EmbarkStudios/cargo-deny-action@v1
- uses: EmbarkStudios/cargo-deny-action@v2
with:
command: check license

Expand Down
21 changes: 21 additions & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# typos configuration file
# Place this file in the project root (same level as Cargo.toml)
[files]
extend-exclude = ["Cargo.toml", "**/Cargo.toml"]
33 changes: 20 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ authors = ["Matthew Cramerus <matt@polygon.io>"]
license = "Apache-2.0"
description = "Materialized Views & Query Rewriting in DataFusion"
keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.80"
rust-version = "1.88.0"

[dependencies]
arrow = "55"
arrow-schema = "55"
async-trait = "0.1"
aquamarine = "0.6.0"
arrow = "57.0.0"
arrow-schema = "57.0.0"
async-trait = "0.1.89"
dashmap = "6"
datafusion = "47"
datafusion-common = "47"
datafusion-expr = "47"
datafusion-functions = "47"
datafusion-functions-aggregate = "47"
datafusion-optimizer = "47"
datafusion-physical-expr = "47"
datafusion-physical-plan = "47"
datafusion-sql = "47"
datafusion = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-common = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-functions = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-functions-aggregate = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-optimizer = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-physical-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-physical-plan = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
datafusion-sql = { git = "https://github.com/massive-com/arrow-datafusion", rev = "da5d59f" }
futures = "0.3"
itertools = "0.14"
log = "0.4"
Expand All @@ -49,7 +50,13 @@ ordered-float = "5.0.0"

[dev-dependencies]
anyhow = "1.0.95"
criterion = "0.4"
env_logger = "0.11.6"
tempfile = "3.14.0"
tokio = "1.42.0"
url = "2.5.4"

[[bench]]
name = "materialized_views_benchmark"
harness = false
path = "benches/materialized_views_benchmark.rs"
182 changes: 182 additions & 0 deletions benches/materialized_views_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::sync::Arc;
use std::time::Duration;

use datafusion::datasource::provider_as_source;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use datafusion_common::Result as DfResult;
use datafusion_expr::LogicalPlan;
use datafusion_materialized_views::rewrite::normal_form::SpjNormalForm;
use datafusion_sql::TableReference;
use tokio::runtime::Builder;

// Utility: generate CREATE TABLE SQL with n columns named c0..c{n-1}
/// Build a `CREATE TABLE` statement for `table_name` with `ncols` INT columns
/// named `c0..c{ncols-1}` (an empty column list when `ncols == 0`).
fn make_create_table_sql(table_name: &str, ncols: usize) -> String {
    let mut column_defs = Vec::with_capacity(ncols);
    for idx in 0..ncols {
        column_defs.push(format!("c{idx} INT"));
    }
    format!("CREATE TABLE {table_name} ({})", column_defs.join(", "))
}

// Utility: generate a base SELECT that projects all columns and has a couple filters
/// Build the base (materialized-view-like) SELECT: it projects every column
/// and applies one or two loose filters depending on how many columns exist.
fn make_base_sql(table_name: &str, ncols: usize) -> String {
    let projection: Vec<String> = (0..ncols).map(|i| format!("c{i}")).collect();

    // Loose predicates; only added when the referenced columns exist.
    let mut predicates: Vec<&str> = Vec::new();
    if ncols > 0 {
        predicates.push("c0 >= 0");
    }
    if ncols > 1 {
        predicates.push("c0 + c1 >= 0");
    }

    let where_part = if predicates.is_empty() {
        String::new()
    } else {
        format!(" WHERE {}", predicates.join(" AND "))
    };

    format!(
        "SELECT {} FROM {}{}",
        projection.join(", "),
        table_name,
        where_part
    )
}

// Utility: generate a query that is stricter and selects subset (so rewrite_from has a chance)
/// Build the test query: it projects only the first half of the columns
/// (at least one) and applies strictly tighter filters than the base query,
/// so `rewrite_from` has a chance of matching.
fn make_query_sql(table_name: &str, ncols: usize) -> String {
    // Project roughly half the columns, but never fewer than one
    // (so even ncols == 0 yields a syntactically valid projection of c0).
    let projected = (ncols / 2).max(1);
    let cols = (0..projected)
        .map(|i| format!("c{i}"))
        .collect::<Vec<_>>()
        .join(", ");

    // Tighter predicates than the base query; the last one references the
    // final column only when the table is reasonably wide.
    let mut predicates = Vec::new();
    if ncols > 0 {
        predicates.push("c0 >= 10".to_string());
    }
    if ncols > 1 {
        predicates.push("c0 * c1 > 100".to_string());
    }
    if ncols > 10 {
        predicates.push(format!("c{} >= 0", ncols - 1));
    }

    if predicates.is_empty() {
        format!("SELECT {cols} FROM {table_name}")
    } else {
        format!(
            "SELECT {cols} FROM {table_name} WHERE {}",
            predicates.join(" AND ")
        )
    }
}

/// Build a benchmark fixture for an `ncols`-column table `t`: create a fresh
/// `SessionContext`, register the table via DDL, and return the optimized
/// logical plans for the base (MV-like) query and the narrower test query,
/// plus the table's provider.
///
/// Blocks on the supplied tokio runtime because the DataFusion SQL/catalog
/// APIs are async while criterion's measurement closures are synchronous.
fn build_fixture_for_cols(
    rt: &tokio::runtime::Runtime,
    ncols: usize,
) -> DfResult<(LogicalPlan, LogicalPlan, Arc<dyn TableProvider>)> {
    rt.block_on(async move {
        let ctx = SessionContext::new();

        // Register the table in the session catalog by executing the DDL;
        // `collect` drives the statement to completion.
        let table_name = "t";
        let create_sql = make_create_table_sql(table_name, ncols);
        ctx.sql(&create_sql).await?.collect().await?; // create table in catalog

        // Plan both queries and run the optimizer so the plans are in a
        // normalized shape before they are handed to SpjNormalForm.
        let base_sql = make_base_sql(table_name, ncols);
        let query_sql = make_query_sql(table_name, ncols);

        let base_df = ctx.sql(&base_sql).await?;
        let base_plan = base_df.into_optimized_plan()?;

        let query_df = ctx.sql(&query_sql).await?;
        let query_plan = query_df.into_optimized_plan()?;

        // Fetch the registered provider (`Arc<dyn TableProvider>`); the caller
        // converts it into a logical table source via `provider_as_source`.
        let table_ref = TableReference::bare(table_name);
        let provider: Arc<dyn TableProvider> = ctx.table_provider(table_ref.clone()).await?;

        Ok((base_plan, query_plan, provider))
    })
}

/// Criterion entry point: benchmarks `SpjNormalForm::new` (on both the base
/// and query plans) and `SpjNormalForm::rewrite_from` across a range of
/// column counts.
fn criterion_benchmark(c: &mut Criterion) {
    // Column counts to exercise; roughly doubling to expose scaling behavior.
    let col_cases = vec![1usize, 10, 20, 40, 80, 160, 320];

    // A current-thread runtime is sufficient: it is only used for the async
    // fixture setup, never inside the measured closures.
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("tokio runtime");

    let mut group = c.benchmark_group("view_matcher_spj");
    group.warm_up_time(Duration::from_secs(1));
    group.measurement_time(Duration::from_secs(5));
    group.sample_size(30);

    for &ncols in &col_cases {
        // Build the plans/provider once per column count, outside measurement.
        let (base_plan, query_plan, provider) =
            build_fixture_for_cols(&rt, ncols).expect("fixture");

        // Measure `SpjNormalForm::new` for the base plan and the query plan
        // separately, since the two plans have different shapes.
        let id_base = BenchmarkId::new("spj_normal_form_new", format!("cols={}", ncols));
        group.throughput(Throughput::Elements(1));
        group.bench_with_input(id_base, &base_plan, |b, plan| {
            b.iter(|| {
                let _nf = SpjNormalForm::new(plan).unwrap();
            });
        });

        let id_query_nf = BenchmarkId::new("spj_normal_form_new_query", format!("cols={}", ncols));
        group.bench_with_input(id_query_nf, &query_plan, |b, plan| {
            b.iter(|| {
                let _nf = SpjNormalForm::new(plan).unwrap();
            });
        });

        // Precompute normal forms once so the benchmark below isolates the
        // cost of `rewrite_from` itself.
        let base_nf = SpjNormalForm::new(&base_plan).expect("base_nf");
        let query_nf = SpjNormalForm::new(&query_plan).expect("query_nf");

        // Qualifier for `rewrite_from`, plus a table source built from the provider.
        let qualifier = TableReference::bare("mv");
        let source = provider_as_source(Arc::clone(&provider));

        // Benchmark `rewrite_from` (the heavy rewrite/containment check).
        let id_rewrite = BenchmarkId::new("rewrite_from", format!("cols={}", ncols));
        group.bench_with_input(id_rewrite, &ncols, |b, &_n| {
            b.iter(|| {
                let _res = query_nf.rewrite_from(&base_nf, qualifier.clone(), Arc::clone(&source));
            });
        });
    }

    group.finish();
}

// Register the benchmark group and generate the harness entry point
// (Cargo.toml declares `harness = false` for this bench target).
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
4 changes: 3 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ allow = [
"BSD-3-Clause",
"CC0-1.0",
"Unicode-3.0",
"Zlib"
"Zlib",
"ISC",
"bzip2-1.0.6"
]
version = 2
20 changes: 20 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[toolchain]
channel = "1.91.0"
components = ["rust-analyzer", "rustfmt", "clippy"]
63 changes: 62 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,77 @@ pub mod materialized;
pub mod rewrite;

/// Configuration options for materialized view related features.
///
/// # Materialized View Configuration
///
/// Query rewriting uses two configuration options that work together:
///
/// 1. **`use_in_query_rewrite` (on candidate MVs)**: Controls whether an MV is globally available
/// for query rewriting. MVs with `use_in_query_rewrite = false` are excluded from the
/// candidate pool entirely.
///
/// 2. **`rewrite_targets` (on queried tables)**: When querying a table, this field filters which
/// MVs from the available pool should be considered as rewrite candidates for that specific table.
///
/// The interaction works as follows:
/// - First, `use_in_query_rewrite` determines the global pool of available MVs
/// - Then, `rewrite_targets` on the queried table filters that pool for that specific query
/// - An MV must have `use_in_query_rewrite = true` **and** be in the `rewrite_targets` list
/// (or the list must be None) to be considered
///
/// # Example
///
/// ```ignore
/// // MV1: available for query rewriting
/// let mv1_config = MaterializedConfig {
/// use_in_query_rewrite: true, // MV1 is in the global pool
/// rewrite_targets: None,
/// };
///
/// // MV2: not available for query rewriting
/// let mv2_config = MaterializedConfig {
/// use_in_query_rewrite: false, // MV2 is excluded from the pool
/// rewrite_targets: None,
/// };
///
/// // Base table: only considers MV1 for rewrites
/// let base_config = MaterializedConfig {
/// use_in_query_rewrite: true,
/// rewrite_targets: Some(vec!["mv1".to_string()]), // Only MV1 is considered
/// };
/// // When querying base_table:
/// // - MV1 will be considered (in pool + in targets list)
/// // - MV2 will NOT be considered (not in pool, even if added to targets list)
/// ```
#[derive(Debug, Clone)]
pub struct MaterializedConfig {
/// Whether or not query rewriting should exploit this materialized view.
/// Whether or not this materialized view is available for query rewriting.
///
/// If `false`, this MV will not be loaded into the query rewrite engine and cannot be used
/// as a rewrite candidate, regardless of any `rewrite_targets` settings on other tables.
pub use_in_query_rewrite: bool,

/// Optional candidate materialized views for query rewriting.
///
/// When this table is queried, only the MVs listed here will be considered as rewrite candidates.
/// These should be full table names (e.g., `atlas.us_stocks_sip.trades_by_ticker`).
///
/// - If `None` (default): all eligible MVs in the catalog (where `use_in_query_rewrite = true`)
/// are considered as rewrite candidates
/// - If `Some(vec![])`: no MVs are considered (effectively disables query rewriting for this table)
/// - If `Some(vec!["mv1", "mv2"])`: only mv1 and mv2 (if they have `use_in_query_rewrite = true`)
/// are considered as rewrite candidates
///
/// Note: This field is typically set on the **queried table** (which may itself be an MV).
/// It acts as a whitelist that further filters the pool of available MVs for queries against this table.
pub rewrite_targets: Option<Vec<String>>,
}

impl Default for MaterializedConfig {
fn default() -> Self {
Self {
use_in_query_rewrite: true,
rewrite_targets: None,
}
}
}
Loading
Loading