
Implement cost-based optimizer for INTERSECTS join algorithm selection — Closes #67 (#76)

Draft
conradbzura wants to merge 28 commits into main from
67-cost-based-intersects-optimizer

Conversation

@conradbzura
Collaborator

@conradbzura conradbzura commented Mar 27, 2026

Summary

Add a DataFusion logical optimizer rule that rewrites INTERSECTS joins into binned equi-joins with adaptive bin sizing. Instead of reverse-engineering overlap predicates from column-name heuristics after transpilation, introduce a "datafusion" dialect to the GIQL transpiler that emits giql_intersects() function calls, preserving INTERSECTS semantics through the SQL layer so the optimizer can match directly.

Bin size is chosen using two-tier statistics: Tier 1 reads actual start/end values from 1–3 representative Parquet row groups and computes p95 interval width (~0.2ms overhead, capped at 300K sampled rows). Tier 2 falls back to column-level min/max estimates for non-Parquet sources. DataFusion handles parallelism, hash join execution, and disk spilling natively.

Closes #67

Proposed changes

Transpiler dialect support (Python)

Add dialect parameter to transpile() with "datafusion" option. A new DataFusionGIQLGenerator subclass overrides _generate_column_join() for INTERSECTS to emit giql_intersects(start, end, start, end) instead of raw overlap predicates. The chrom equi-key is preserved as standard SQL for hash partitioning. Literal range queries and CONTAINS/WITHIN fall through to the base generator unchanged.
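The dialect routing described above can be sketched in a few lines of Python. This is a hypothetical stand-in (the function and parameter names below are illustrative, not the actual GIQL generator API), showing how a "datafusion" dialect would emit the function call while the default dialect expands to raw predicates:

```python
# Hypothetical sketch of the dialect dispatch; the real GIQL generator
# classes and method names may differ.
def generate_intersects_join(left, right, dialect="default"):
    """Render the join predicate for `left INTERSECTS right`.

    left/right are (table, start_col, end_col) triples.
    """
    lt, ls, le = left
    rt, rs, re_ = right
    if dialect == "datafusion":
        # Preserve INTERSECTS semantics as a function call the
        # logical optimizer rule can match on directly.
        return f"giql_intersects({lt}.{ls}, {lt}.{le}, {rt}.{rs}, {rt}.{re_})"
    # Default dialect: expand to raw half-open overlap predicates.
    return f"{lt}.{ls} < {rt}.{re_} AND {lt}.{le} > {rt}.{rs}"

print(generate_intersects_join(("a", "start", "end"), ("b", "start", "end"),
                               dialect="datafusion"))
# giql_intersects(a.start, a.end, b.start, b.end)
```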

Placeholder UDF (lib.rs)

Register a giql_intersects ScalarUDF on the DataFusion session state so the SQL parser accepts the function call. The UDF is never executed — the logical rule rewrites it away before execution. If the rule is missing, the UDF returns a clear error message.

Logical optimizer rule (logical_rule.rs)

The IntersectsLogicalRule detects giql_intersects() in join filters and rewrites to:

  1. Expand both sides with range(start/B, (end-1)/B+1) bin columns via UNNEST
  2. Equi-join on (chrom, bin_id) plus the original overlap filter (with giql_intersects() replaced by the real overlap predicates left.start < right.end AND left.end > right.start)
  3. Canonical-bin dedup filter (__giql_bins == CASE WHEN left_first_bin >= right_first_bin THEN left_first_bin ELSE right_first_bin END) eliminates multi-bin duplicates without DISTINCT
  4. Project away internal bin columns
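Steps 1-3 can be sketched as plain Python over interval lists. This is a minimal model of the rewrite, not the DataFusion plan itself; the bin size `B` is fixed here, whereas the real rule chooses it adaptively:

```python
from collections import defaultdict

B = 100  # bin size (chosen adaptively in the real rule)

def bins(start, end, b=B):
    # Step 1: every bin the half-open interval [start, end) touches.
    return range(start // b, (end - 1) // b + 1)

def binned_join(left, right):
    """Sketch of steps 1-3: expand, equi-join on bin, dedup on canonical bin."""
    index = defaultdict(list)
    for i, (chrom, s, e) in enumerate(right):
        for bin_id in bins(s, e):
            index[(chrom, bin_id)].append(i)
    out = []
    for j, (chrom, s, e) in enumerate(left):
        first_left = s // B
        for bin_id in bins(s, e):
            for i in index[(chrom, bin_id)]:
                rc, rs, re_ = right[i]
                # Step 2: real overlap predicate replaces giql_intersects().
                if not (s < re_ and e > rs):
                    continue
                # Step 3: keep only the canonical bin so pairs sharing
                # several bins are emitted exactly once.
                if bin_id == max(first_left, rs // B):
                    out.append((j, i))
    return out

left = [("chr1", 50, 450)]       # spans bins 0-4
right = [("chr1", 120, 380)]     # spans bins 1-3
print(binned_join(left, right))  # [(0, 0)] - one pair despite 3 shared bins
```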

Table aliases for the UNNEST SubqueryAlias are derived from the plan's schema qualifier rather than walking to the underlying TableScan, which correctly handles SQL aliases and self-joins.

Two-tier adaptive bin sizing (logical_rule.rs)

Tier 1 — Parquet sampling (~0.2ms): Read start/end columns from 1–3 representative row groups (first, middle, last) using the parquet crate's Arrow reader with projection pushdown. Compute actual interval widths (filtering non-positive values from malformed data) and return p95. Sampling is capped at 300K rows to bound memory for very large row groups. This handles all width distributions correctly, including "middle-wide" data where the widest interval has neither the smallest start nor the largest end. For remote sources (s3://, gs://), sampling fails gracefully and falls through to Tier 2.
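The Tier 1 estimate reduces to a p95 over sampled widths once the values are in hand. A minimal sketch, assuming the start/end columns have already been read from the representative row groups (the helper name is illustrative):

```python
def p95_width(starts, ends, cap=300_000):
    """Nearest-rank p95 of interval widths from sampled rows."""
    widths = sorted(
        e - s for s, e in zip(starts[:cap], ends[:cap]) if e - s > 0
    )  # drop non-positive widths from malformed rows
    if not widths:
        return None  # fall through to Tier 2
    return widths[min(len(widths) - 1, int(0.95 * len(widths)))]

print(p95_width([0] * 100, list(range(1, 101))))  # widths 1..100 -> 96
```

Because it looks at actual widths rather than column-level bounds, this estimate is unaffected by where the wide intervals sit in the coordinate space.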

Tier 2 — Column-level heuristic (fallback): For non-Parquet sources, estimate width from max(min(end)-min(start), max(end)-max(start)) — two independent estimates from column-level min/max stats, taking the max for robustness.

Both tiers clamp to [1,000, 1,000,000] and fall back to a default of 10,000 when no statistics are available.
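The Tier 2 estimate and the shared clamping logic are simple enough to state directly (hypothetical helper names; the real code lives in logical_rule.rs):

```python
def tier2_width(min_start, max_start, min_end, max_end):
    # Two independent width estimates from column-level min/max stats;
    # take the max for robustness.
    return max(min_end - min_start, max_end - max_start)

def choose_bin_size(width, lo=1_000, hi=1_000_000, default=10_000):
    """Clamp the estimated width to [1K, 1M]; default when no stats exist."""
    if width is None or width <= 0:
        return default
    return max(lo, min(hi, width))

# Columns span [100, 5_000_000) starts and [600, 5_050_000) ends:
print(choose_bin_size(tier2_width(100, 5_000_000, 600, 5_050_000)))  # 50000
```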

Build configuration

Add Rust gitignore patterns (target/, Cargo.lock) to both the root .gitignore and a crate-level .gitignore. The parquet crate is a runtime dependency for Tier 1 sampling.

Test cases

| # | Test Suite | Given | When | Then | Coverage Target |
| --- | --- | --- | --- | --- | --- |
| 1 | logical_rule_test | Default rule instance | Rule name queried | Returns intersects_logical_binned | Trait impl |
| 2 | logical_rule_test | Default rule instance | Apply order queried | Returns BottomUp | Trait impl |
| 3 | logical_rule_test | Default rule instance | supports_rewrite queried | Returns true | Trait impl |
| 4 | logical_rule_test | An EmptyRelation plan | Rule applied | Plan unchanged | Non-join skip |
| 5 | logical_rule_test | LEFT JOIN with giql_intersects | Rule applied | Plan not transformed | Join type filter |
| 6 | logical_rule_test | RIGHT JOIN with giql_intersects | Rule applied | Plan not transformed | Join type filter |
| 7 | logical_rule_test | FULL OUTER JOIN with giql_intersects | Rule applied | Plan not transformed | Join type filter |
| 8 | logical_rule_test | INNER JOIN with raw overlap predicates | Rule applied | Plan not transformed | Function detection |
| 9 | logical_rule_test | Three overlapping intervals across two tables | INTERSECTS join executed | Returns 3 pairs | Basic correctness |
| 10 | logical_rule_test | Adjacent intervals [100,200) and [200,300) | INTERSECTS join executed | Returns 0 pairs | Half-open semantics |
| 11 | logical_rule_test | Interval [100,500) containing [200,300) | INTERSECTS join executed | Returns 1 pair | Containment |
| 12 | logical_rule_test | Intervals on different chromosomes | INTERSECTS join executed | Returns 0 pairs | Chrom isolation |
| 13 | logical_rule_test | Wide intervals spanning multiple bins | Binned join executed | Each pair appears once | Canonical-bin dedup |
| 14 | logical_rule_test | One very wide interval vs multiple narrow | Binned join executed | Each pair appears once | Multi-bin dedup |
| 15 | logical_rule_test | 100 narrow intervals (width ~100bp) | INTERSECTS join executed | Returns 100 pairs | Small bin sizing |
| 16 | logical_rule_test | Wide intervals (width ~50,000bp) | INTERSECTS join executed | Returns 2 pairs | Large bin sizing |
| 17 | logical_rule_test | Intervals on chr1, chr2, chr3 vs chr1, chr2, chr4 | INTERSECTS join executed | Returns 2 pairs | Multi-chromosome |
| 18 | logical_rule_test | 3x3 many-to-many overlapping intervals | INTERSECTS join executed | Returns 8 pairs | Combinatorial |
| 19 | logical_rule_test | Non-empty left, empty right table | INTERSECTS join executed | Returns 0 rows | Empty table |
| 20 | logical_rule_test | Two single-row overlapping tables | INTERSECTS join executed | Returns 1 pair | Minimal input |
| 21 | logical_rule_test | Two single-row non-overlapping tables | INTERSECTS join executed | Returns 0 pairs | Minimal input |
| 22 | logical_rule_test | Two tables with identical intervals | INTERSECTS join executed | Returns 4 pairs (2x2) | Duplicate intervals |
| 23 | logical_rule_test | Intervals overlapping by exactly 1bp | INTERSECTS join executed | Returns 1 pair | Boundary |
| 24 | logical_rule_test | Tables with chromStart/chromEnd columns | giql_intersects with custom names | Returns 2 pairs | Custom column names |
| 25 | logical_rule_test | Known overlapping intervals | INTERSECTS join executed | Output values match expected | Value correctness |
| 26 | logical_rule_test | Tables named peaks/genes | INTERSECTS join executed | Returns 3 pairs | Non-standard aliases |
| 27 | logical_rule_test | Single table joined against itself | INTERSECTS self-join executed | Returns 5 pairs | Self-join aliases |
| 28 | logical_rule_test | Extra filter alongside giql_intersects | INTERSECTS join with b.start > 300 | Returns 1 pair | Compound predicates |
| 29 | logical_rule_test | Middle-wide distribution where widest interval has neither min(start) nor max(end) | INTERSECTS join executed | Returns 2 pairs with correct dedup | Parquet sampling robustness |
| 30 | lib::tests | Fresh SessionState | register_optimizer called | Rule and UDF both registered | Registration |
| 31 | TestTranspileDataFusionDialect | Column-to-column INTERSECTS join | Transpile with dialect="datafusion" | Output contains giql_intersects() | Dialect routing |
| 32 | TestTranspileDataFusionDialect | Literal range INTERSECTS | Transpile with dialect="datafusion" | Output uses standard SQL predicates | Literal fallback |
| 33 | TestTranspileDataFusionDialect | CONTAINS column join | Transpile with dialect="datafusion" | Output uses standard SQL predicates | Non-INTERSECTS fallback |
| 34 | TestTranspileDataFusionDialect | Column-to-column INTERSECTS join | Transpile with default dialect | Output uses standard SQL predicates | Default unchanged |
| 35 | TestTranspileDataFusionDialect | Any query | Transpile with dialect="postgres" | Raises ValueError | Invalid dialect |
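Several of the correctness rows above (tests 10, 11, and 23) hinge on the half-open overlap predicate, which a few lines of Python make concrete (hypothetical helper, not from the crate):

```python
def intersects(a_start, a_end, b_start, b_end):
    # Half-open [start, end): touching endpoints do not overlap.
    return a_start < b_end and a_end > b_start

assert not intersects(100, 200, 200, 300)  # adjacent (test 10)
assert intersects(100, 500, 200, 300)      # containment (test 11)
assert intersects(100, 201, 200, 300)      # 1bp overlap (test 23)
```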

Introduces the giql-datafusion Rust crate, a DataFusion
PhysicalOptimizerRule that uses Parquet metadata and lightweight
sampling to choose between sweep-line and binned equi-join algorithms
for interval overlap joins.

The optimizer collects statistics in two tiers:
- Tier 1 (free): row group column stats, sorting_columns, page index
  presence from the Parquet file footer
- Tier 2 (milliseconds): width distribution (median, p95, p99, CV)
  from sampling start/end columns of 1-3 representative row groups

Decision function short-circuits to sweep line when the width
distribution is heavy-tailed (p99/median > 10) or high-variance
(CV > 1.5), and falls back to a cost comparison for uniform
distributions where binning is viable.

Both algorithms are implemented as custom ExecutionPlan nodes:
- SweepLineJoinExec: sort + sweep with active set, O(n log n + k)
- BinnedJoinExec: bin expansion + hash map probe + dedup, O(n*r + k)
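The sweep-line idea can be sketched in Python, here for a single chromosome. This is a simplified model of SweepLineJoinExec, not the Rust implementation; it sorts both sides together and keeps an active set of the opposite side, expiring only intervals whose end is at or before the new start (half-open semantics):

```python
def sweep_line_join(left, right):
    """O(n log n + k) sweep over (start, end) intervals, one chromosome."""
    events = sorted(
        [(s, e, 0, i) for i, (s, e) in enumerate(left)] +
        [(s, e, 1, i) for i, (s, e) in enumerate(right)]
    )
    active = [[], []]  # active (end, idx) lists, one per side
    out = []
    for s, e, side, i in events:
        # Expire opposite-side intervals that cannot overlap anymore.
        active[1 - side] = [(ae, ai) for ae, ai in active[1 - side] if ae > s]
        # Every survivor starts before s and ends after s: a match.
        for ae, ai in active[1 - side]:
            out.append((ai, i) if side == 1 else (i, ai))
        active[side].append((e, i))
    return sorted(out)

print(sweep_line_join([(100, 200)], [(150, 250)]))  # [(0, 0)]
```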
DataFusion v47 returns StringViewArray (not StringArray) for string
columns read from Parquet. The sweep-line and binned join execution
plans were downcasting to StringArray only, causing runtime errors
when the optimizer replaced default joins. Now both string types
are handled via a fallback chain.
Unit tests cover stats types, Parquet sampling, cost model edge
cases, config defaults, pruning bounds, and optimizer registration.

Integration tests exercise the full pipeline through DataFusion:
create Parquet files, register with SessionContext, execute
INTERSECTS join SQL, and verify correctness for overlapping,
non-overlapping, cross-chromosome, adjacent, and containment
interval scenarios.

Total: 38 tests (31 unit + 6 integration + 1 doc-test).
…detection

Three correctness bugs fixed:

1. Sweep-line active set: the retain predicate was removing wide
   right intervals based on r.start >= l.end, but those intervals
   could still overlap a later, wider left interval. Now only truly
   expired intervals (r.end <= l.start) are removed, and the full
   overlap check is done inline when emitting matches.

2. Multi-partition collection: RepartitionExec uses shared channels
   that break under sequential partition reads. Replaced manual
   sequential collection with datafusion::physical_plan::collect
   which spawns all partitions concurrently.

3. Filter column resolution: DataFusion may order filter operands
   as (right.start < left.end) instead of (left.start < right.end).
   The pattern detector now resolves columns by name and join side
   rather than assuming positional semantics.

Also handles Int32 and StringViewArray column types that DataFusion
v47 produces from Parquet reads.
Instead of a single-threaded custom BinnedJoinExec, the optimizer
now rewrites the physical plan to compose DataFusion's own parallel
infrastructure:

  BinExpandExec(left)  ─┐
                        ├─ HashJoinExec(on=[chrom, __giql_bin])
  BinExpandExec(right) ─┘         │
                           FilterExec (canonical-bin dedup)
                                  │
                           ProjectionExec (strip extra cols)

BinExpandExec is a stateless per-partition node that replicates each
interval into rows for every genome bin it touches, adding __giql_bin
and __giql_first_bin columns. The canonical-bin filter ensures each
pair is emitted exactly once by keeping only the match from
max(left_first_bin, right_first_bin).

When no Parquet stats are available, the optimizer now defers to
DataFusion's built-in join rather than defaulting to sweep line.
CLI tool that runs INTERSECTS join queries through DataFusion with
the optimizer registered, outputting JSON timing results per rep.
Supports --no-optimizer flag for baseline comparison and --op
join|pairs for different query shapes.
Restructure the sweep-line algorithm for three key speedups:

1. Per-chromosome parallelism: intervals on different chromosomes
   cannot overlap, so each chromosome is swept independently via
   tokio::spawn. This scales with the number of chromosomes (~23
   for human genome).

2. Integer chromosome IDs: map chromosome strings to dense u32 IDs
   at collection time, eliminating String allocations and HashMap
   lookups during the hot sort/sweep loop.

3. Vectorized output: collect match indices as u32 arrays and use
   arrow::compute::take once per column instead of per-row slice +
   concat.

At 500K intervals/side, heavy-tail goes from 0.56s to 0.11s (5x)
and moderate from 0.19s to 0.10s (2x), both 100-120x faster than
naive.
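The per-chromosome parallelism (speedup 1 above) can be modeled in Python with a thread pool standing in for tokio::spawn, and a brute-force check standing in for the per-chromosome sweep. All names here are illustrative:

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def join_one_chrom(args):
    lefts, rights = args  # [(row_id, start, end)] per side, one chromosome
    # Brute-force stand-in for the per-chromosome sweep.
    return [(i, j) for i, ls, le in lefts
                   for j, rs, re_ in rights
                   if ls < re_ and le > rs]

def per_chromosome_join(left, right):
    """Intervals on different chromosomes never overlap, so each
    chromosome is joined independently on its own task."""
    groups = defaultdict(lambda: ([], []))
    for i, (c, s, e) in enumerate(left):
        groups[c][0].append((i, s, e))
    for j, (c, s, e) in enumerate(right):
        groups[c][1].append((j, s, e))
    with ThreadPoolExecutor() as pool:
        results = pool.map(join_one_chrom, groups.values())
    return sorted(p for chunk in results for p in chunk)

print(per_chromosome_join([("chr1", 0, 10), ("chr2", 0, 10)],
                          [("chr1", 5, 15), ("chr3", 0, 10)]))  # [(0, 0)]
```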
The sweep-line exec now declares required_input_ordering of
(chrom ASC, start ASC) on both children so DataFusion inserts
SortExec nodes automatically when inputs are unsorted.

The cost model picks the smaller side (by row count from Parquet
metadata) as the build side to materialize, reducing peak memory
for asymmetric joins. The SmallSide enum replaces the previous
skip_sort flag throughout the optimizer and cost model.
Replace the collect-both-sides approach with a proper poll_next
state machine that follows DataFusion's build/probe pattern:

  WaitBuildSide → FetchProbeBatch → process → FetchProbeBatch → ...

The build side (smaller, selected by cost model) is materialized
into a sorted, chromosome-indexed BuildSideData struct. The probe
side is streamed batch-by-batch via a SendableRecordBatchStream.
Each probe batch is swept against the build side per-chromosome,
producing output immediately via vectorized compute::take.

Multi-partition probe inputs are coalesced via CoalescePartitionsExec
to handle DataFusion's RepartitionExec transparently.

Chromosome matching between build and probe uses string names
rather than independently-assigned integer IDs, avoiding mismatches
when batches contain different chromosome subsets.
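The build/probe state machine can be sketched as a Python generator, with lists of tuples standing in for RecordBatches. This is a conceptual model only; the real exec implements poll_next over Arrow streams:

```python
def build_probe_stream(build_batches, probe_batches):
    """Materialize the build side once, then stream probe batches and
    emit matches immediately (WaitBuildSide -> FetchProbeBatch -> ...)."""
    # WaitBuildSide: collect and sort the (smaller) build side per chromosome.
    build = {}
    for batch in build_batches:
        for chrom, s, e in batch:
            build.setdefault(chrom, []).append((s, e))
    for ivs in build.values():
        ivs.sort()
    # FetchProbeBatch -> process -> FetchProbeBatch -> ...
    for batch in probe_batches:
        out = []
        for chrom, s, e in batch:
            # Chromosomes are matched by name, not by per-batch integer IDs.
            for bs, be in build.get(chrom, []):
                if bs < e and be > s:
                    out.append(((chrom, s, e), (chrom, bs, be)))
        yield out  # one output batch per probe batch
```

Streaming the probe side batch-by-batch is what keeps peak memory bounded by the build side plus one probe batch.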
Instead of a separate FilterExec + ProjectionExec after the
HashJoinExec, the canonical-bin dedup condition is now embedded
directly in the JoinFilter expression. The HashJoinExec's own
projection parameter strips the extra columns.

This reduces the binned plan from 4 nodes (BinExpandExec →
HashJoinExec → FilterExec → ProjectionExec) to 2 nodes
(BinExpandExec → HashJoinExec), eliminating two intermediate
RecordBatch materializations.

The remaining performance gap vs the pure SQL binning approach
is attributable to the DataFusion engine version (v47 vs v52),
not the plan structure.
Sets p99_median and CV thresholds to MAX so the cost model always
selects the binned strategy, useful for isolating binned-path
performance independently of distribution characteristics.
Closes the 1.6x engine performance gap between our Rust crate and
the Python DataFusion package (v52).

API migration:
- properties() now returns &Arc<PlanProperties>
- required_input_ordering() returns Vec<Option<OrderingRequirements>>
- HashJoinExec::try_new takes NullEquality enum + null_aware bool
- LexRequirement::new returns Option (non-empty invariant)
Runs the pure SQL binned join query through the same Rust DataFusion
engine, enabling apples-to-apples comparison of the SQL approach vs
the physical plan rewrite on the same engine version.
Both the physical plan rewrite (BinExpandExec) and SQL re-planning
(BinnedSqlExec) add more overhead than they save vs DataFusion's
built-in hash join on chrom + range filter. The optimizer now defers
to DataFusion for uniform-width data.

The sweep-line remains the primary optimization path, delivering
100x+ speedup for heavy-tailed distributions.
Adds IntersectsLogicalRule that rewrites interval overlap joins to
UNNEST-based binned equi-joins at the logical level, enabling
DataFusion's native UNNEST, hash join, and DISTINCT to run with full
parallelism.

The rule is disabled by default (enable_logical_rule config flag)
because it has a known schema bug with wide intervals spanning many
bins. The physical sweep-line rule remains the active optimizer.

When enabled and working correctly for uniform data, it matches the
performance of hand-written SQL binned joins (~0.012s at 500K) by
letting DataFusion handle the entire execution pipeline natively.

Also includes BinnedSqlExec (SQL re-planning exec) which was explored
as an alternative approach but found to have higher overhead than the
logical rewrite due to context creation and materialization costs.
Three fixes to the UNNEST-based binned join logical rule:

1. SubqueryAlias on expanded sides preserves table qualifiers
   so the join filter (a.start < b.end) resolves correctly
   after UNNEST transforms the schema.

2. DISTINCT before PROJECT ordering prevents DataFusion's
   projection-pushdown from folding column selection into the
   join, which caused column count mismatches at runtime.

3. Canonical-bin filter with CAST(start AS BIGINT) / bin_size
   eliminates multi-bin duplicates without DISTINCT on the
   full output, and handles Int32 start columns correctly.

The logical rule now matches hand-written SQL-binned performance
within 10% across all distribution profiles (0.016-0.024s at 500K)
and is enabled by default.
The logical rule now reads Parquet file footer metadata (row group
column statistics) to compute the bin size adaptively. The width
signal max(end) - max(start) approximates the typical interval
width and is used directly as the bin size, clamped to [1K, 1M].

When a ListingTable is detected, the rule downcasts through
DefaultTableSource to access table_paths(), reads the first
file's footer via collect_metadata(), and extracts per-row-group
min/max bounds for start and end columns.

Falls back to the 10K default only if no Parquet metadata is
accessible (e.g., in-memory Arrow tables).

Also fixes the bench binary to respect the config default for
enable_logical_rule instead of always overriding to false.
The canonical-bin filter already ensures each interval pair is
emitted exactly once, making DISTINCT a no-op that wastes cycles
hashing all output rows. Removing it closes the gap with hand-
written SQL from 1.2-1.6x to 1.1-1.4x.
Cover the logical rule's join detection, rewrite correctness,
adaptive bin sizing, canonical-bin dedup, edge cases, and full
pipeline integration through DataFusion.

Total: 65 tests (31 unit + 27 logical rule + 6 integration + 1 doc).
@conradbzura
Collaborator Author

Code Review

Blocking

  1. is_from_left uses alphabetical heuristic (logical_rule.rs lines 216-238) — Determines which join side a column belongs to by checking if the table alias starts with 'a' or 'l'. Will silently produce wrong results for FROM peaks JOIN genes or any non-trivial aliasing. Must check against join.left / join.right child schemas instead.

  2. 17+ eprintln! debug statements in production code: logical_rule.rs (12), optimizer.rs (4), pattern.rs (1). These emit noisy stderr output on every query. Replace with log::debug!/log::trace! or remove.

High

  1. Doc comment says enable_logical_rule defaults to false but it defaults to true (lib.rs line 55) — Fix the documentation.

  2. ~900 lines of dead code: BinExpandExec, BinnedJoinExec, BinnedSqlExec, pruning.rs, and most of cost.rs are never instantiated in any active code path. Remove or feature-gate.

  3. Unnecessary unsafe Pin::new_unchecked (sweep_line.rs line 312) — JoinHandle is Unpin, so Pin::new() suffices. The safety comment ("we only poll this once") is also incorrect since this is inside a loop.

Medium

  1. choose_bin_size heuristic (logical_rule.rs lines 400-438) — max(end) - max(start) approximates the width of the interval with the largest start position, not the typical width. Could produce poor bin sizes when the widest intervals don't have the largest start coordinates.

  2. Logical rule only handles strict </> (logical_rule.rs lines 150-151) — Users writing a.start <= b.end (closed intervals) won't trigger the rewrite. The physical rule's extract_comparison_columns does accept <=/>=, creating an inconsistency.

  3. No tests for non-a/l table aliases — Given the is_from_left fragility, this is the most important missing test.

Low

  1. Cargo.lock committed — Convention for library crates is to gitignore it.

  2. UInt64Array vs UInt32Array inconsistency between bin_expand.rs and sweep_line.rs.

…ug output

Blocking fixes from code review:

1. Replace the fragile is_from_left alphabetical heuristic with
   schema-based column resolution. The function now checks the
   column's qualified name against the join's left child schema
   via DFSchema::has_column(), correctly handling any table alias.

2. Replace all eprintln! debug statements with log::debug! across
   logical_rule.rs, optimizer.rs, and pattern.rs (17 instances).

Also fixes:
- Doc comment for enable_logical_rule now matches the true default
- Remove unnecessary unsafe Pin::new_unchecked in sweep_line.rs
- Add test for non-a/l table aliases (peaks/genes) to verify fix
@conradbzura conradbzura marked this pull request as ready for review March 27, 2026 19:17
@conradbzura conradbzura marked this pull request as draft March 27, 2026 19:18
The transpile() function now accepts dialect="datafusion" which emits
giql_intersects(start, end, start, end) function calls for
column-to-column INTERSECTS joins instead of expanding to raw overlap
predicates. This preserves INTERSECTS semantics through the SQL layer
so a DataFusion logical optimizer rule can match on the function call
directly, without heuristic column-name pattern detection.

Literal range queries and CONTAINS/WITHIN fall through to the base
generator unchanged.
The logical optimizer rule now matches on giql_intersects() function
calls emitted by the GIQL transpiler's datafusion dialect, instead of
reverse-engineering overlap predicates from column name heuristics.

A placeholder giql_intersects ScalarUDF is registered so DataFusion's
SQL parser accepts the function call. The logical rule rewrites it to
a binned equi-join with adaptive bin sizing, replacing the function
call with real overlap predicates before execution.

This eliminates the physical optimizer, cost model, sweep line
executor, sampling infrastructure, and heuristic pattern matching
(15 files, ~4,000 lines removed). The binned join approach with
adaptive bin sizing from table statistics is sufficient for all
interval width distributions.

BREAKING CHANGE: register_optimizer() no longer accepts
IntersectsOptimizerConfig. The IntersectsOptimizerConfig struct,
JoinStrategy enum, and physical optimizer rule are removed.
The previous width estimate max(end) - max(start) only measured the
width of the interval with the largest start coordinate, which could
be catastrophically wrong for bimodal data. Replace with two
independent width signals — min(end)-min(start) and
max(end)-max(start) — and take the max for robustness.

Also: use schema qualifier instead of walking to TableScan for
SubqueryAlias resolution (fixes self-join alias collisions), use
exact column name matching for bin column filtering, and guard
against negative width values before i64-to-usize cast.
Library crates should not commit Cargo.lock per Rust convention.
Also ignore the target/ build directory.
Add 5 Python tests for the datafusion dialect parameter (join emits
giql_intersects, literal range unchanged, contains unchanged, default
dialect unchanged, invalid dialect raises ValueError).

Add 2 Rust integration tests: self-join (same table on both sides)
and compound predicates (extra filter alongside giql_intersects).

Also wrap the datafusion generator import in transpile.py with a
try/except that produces a clear error message when the package is
not installed.
The column-level min/max heuristic for bin sizing fails when the
widest interval is in the middle of the coordinate space — neither
at min(start) nor max(end). Both estimates can simultaneously
underestimate, leading to massive bin replication.

Read actual start/end values from 1–3 representative Parquet row
groups and compute p95 interval width directly. This is used as
Tier 1 for bin sizing (sampled p95), falling back to the
column-level heuristic (Tier 2) for non-Parquet sources. Sampling
adds ~0.2ms to planning — under 1% of total query time.
Cap sampled rows to 300K and filter non-positive widths to handle
malformed intervals and bound memory for huge row groups. Add
assert!(!result.transformed) to join-type skip tests that previously
discarded the result. Remove misleading try/except ImportError from
transpile.py since the datafusion generator ships with giql. Add
Rust gitignore patterns to root .gitignore.
Replace binned equi-join with a cache-oblivious interval tree
(coitrees crate) as the default join strategy for INTERSECTS. The
COI tree stores each interval exactly once — no bin replication —
and queries in O(log N + k) regardless of width distribution.

Benchmarks at 100K intervals/side show the COI tree path is faster
than fixed-10K binning across all tested distributions, including
2-5x on uniform data and 37x on pathological middle-wide data.

The binned equi-join path is retained behind IntersectsConfig with
force_binned = true for future benchmarking. The adaptive bin sizing
(Parquet sampling + cost-optimal binary search) is preserved for
that path.

Architecture:
- COITreeJoinNode: UserDefinedLogicalNode emitted by the logical rule
- COITreeExec: build/probe ExecutionPlan using per-chromosome COITrees
- COITreePlanner: ExtensionPlanner converting logical to physical
- GiqlQueryPlanner: registers the extension planner on SessionState
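The store-once, query-per-interval idea behind the COI tree can be sketched with a much simpler structure. This stand-in bisects on sorted starts per chromosome and then scans candidates linearly, so it does not achieve the true O(log N + k) bound of coitrees, but it shows the key property: each interval is stored exactly once, with no bin replication:

```python
import bisect
from collections import defaultdict

class IntervalIndex:
    """Simplified stand-in for the per-chromosome COI trees: intervals are
    stored once, sorted by start, and queried via bisect + linear filter."""
    def __init__(self, intervals):
        self.by_chrom = defaultdict(list)
        for chrom, s, e in intervals:
            self.by_chrom[chrom].append((s, e))
        for ivs in self.by_chrom.values():
            ivs.sort()
        self.starts = {c: [s for s, _ in ivs]
                       for c, ivs in self.by_chrom.items()}

    def query(self, chrom, qs, qe):
        ivs = self.by_chrom.get(chrom, [])
        # Only intervals with start < qe can overlap half-open [qs, qe).
        hi = bisect.bisect_left(self.starts.get(chrom, []), qe)
        return [(s, e) for s, e in ivs[:hi] if e > qs]

idx = IntervalIndex([("chr1", 100, 500), ("chr1", 600, 700)])
print(idx.query("chr1", 200, 300))  # [(100, 500)]
```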
Linked issue: Implement cost-based optimizer for INTERSECTS join algorithm selection (#67)