
20968: feat: add approx_top_k aggregate function#289

Open
martin-augment wants to merge 1 commit into main from pr-20968-2026-03-24-07-10-44

Conversation

@martin-augment
Owner

20968: To review by AI

Add a new approx_top_k(expression, k) aggregate function that returns
the approximate top-k most frequent values with their estimated counts,
using the Filtered Space-Saving algorithm.

The implementation uses a capacity multiplier of 3 (matching ClickHouse's
default) and includes an alpha map for improved accuracy by filtering
low-frequency noise before it enters the main summary.

Return type is List(Struct({value: T, count: UInt64})) ordered by count
descending, where T matches the input column type.
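For readers unfamiliar with the algorithm family, here is a minimal sketch of the plain Space-Saving update step (without the alpha-map filtering this PR adds); all names are illustrative, not the PR's actual types:

```rust
use std::collections::HashMap;

// Minimal Space-Saving sketch: track at most `capacity` items; when a
// new item arrives and the table is full, evict the minimum counter and
// let the newcomer inherit its count as an upper bound.
struct SpaceSaving {
    capacity: usize,
    // item -> (estimated count, estimation error)
    counts: HashMap<String, (u64, u64)>,
}

impl SpaceSaving {
    fn new(capacity: usize) -> Self {
        Self { capacity, counts: HashMap::new() }
    }

    fn insert(&mut self, item: &str) {
        if let Some(entry) = self.counts.get_mut(item) {
            entry.0 += 1; // already tracked: bump the count
        } else if self.counts.len() < self.capacity {
            self.counts.insert(item.to_string(), (1, 0)); // free slot
        } else {
            // Evict the minimum counter; the newcomer's count starts at
            // min + 1 and the evicted min is recorded as its error.
            let (min_item, min_count) = self
                .counts
                .iter()
                .min_by_key(|(_, (c, _))| *c)
                .map(|(k, (c, _))| (k.clone(), *c))
                .unwrap();
            self.counts.remove(&min_item);
            self.counts.insert(item.to_string(), (min_count + 1, min_count));
        }
    }

    // Top-k by estimated count descending; ties broken by item so the
    // output is deterministic.
    fn top_k(&self, k: usize) -> Vec<(String, u64)> {
        let mut v: Vec<_> = self
            .counts
            .iter()
            .map(|(i, (c, _))| (i.clone(), *c))
            .collect();
        v.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
        v.truncate(k);
        v
    }
}
```

The filtered variant in this PR additionally consults an alpha map of evicted-item history before admitting a newcomer, which reduces noise from low-frequency items.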

Closes apache#20967
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances DataFusion's analytical capabilities by introducing the approx_top_k aggregate function. This function provides an efficient way to identify the most frequent elements in a dataset, which is particularly useful for large-scale data analysis where exact calculations might be computationally expensive. The implementation leverages a robust streaming algorithm to deliver approximate results with controlled accuracy, making it a valuable addition for data exploration and heavy hitter detection.

Highlights

  • New Aggregate Function: Introduced approx_top_k, an approximate aggregate function to find the most frequent (top-k) values and their counts in a column.
  • Algorithm Implementation: The function is implemented using the Filtered Space-Saving algorithm, designed for distributed-friendly approximate aggregation.
  • Data Type Support: Supports various data types including strings, integers, and floats, handling float specificities like -0.0, +0.0, and NaN representations.
  • Distributed Processing: Includes serialization and deserialization logic for the internal SpaceSavingSummary state, enabling efficient merging of partial aggregations in a distributed environment.
  • Documentation and Testing: Comprehensive documentation has been added, along with unit tests, dataframe tests, logical plan roundtrip tests, and SQL logic tests to ensure correctness and proper behavior.

@coderabbitai

coderabbitai bot commented Mar 24, 2026

No actionable comments were generated in the recent review. 🎉


📥 Commits

Reviewing files that changed from the base of the PR and between 260ed13 and aa7bc47.

📒 Files selected for processing (7)
  • datafusion/core/tests/dataframe/dataframe_functions.rs
  • datafusion/functions-aggregate/src/approx_top_k.rs
  • datafusion/functions-aggregate/src/lib.rs
  • datafusion/proto/tests/cases/roundtrip_logical_plan.rs
  • datafusion/sqllogictest/test_files/approx_top_k.slt
  • docs/source/user-guide/expressions.md
  • docs/source/user-guide/sql/aggregate_functions.md
👮 Files not reviewed due to content moderation or server errors (7)
  • datafusion/functions-aggregate/src/approx_top_k.rs
  • datafusion/functions-aggregate/src/lib.rs
  • docs/source/user-guide/expressions.md
  • docs/source/user-guide/sql/aggregate_functions.md
  • datafusion/proto/tests/cases/roundtrip_logical_plan.rs
  • datafusion/core/tests/dataframe/dataframe_functions.rs
  • datafusion/sqllogictest/test_files/approx_top_k.slt

Walkthrough

This pull request introduces a new aggregate function approx_top_k to DataFusion. The function implements the Filtered Space-Saving algorithm to approximate the top-k most frequent values. It accepts a value column and an integer k in the range [1, 10000], returning a list of structs containing value-count pairs. The implementation includes the UDAF definition with state serialization, deserialization, and merging logic for distributed execution. Integration is completed through library registration, comprehensive test coverage including unit tests, dataframe tests, sqllogictest cases, and protobuf roundtrip validation, along with documentation updates to the user guide.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a new aggregate function approx_top_k using the Filtered Space-Saving algorithm. The implementation is comprehensive, including support for various data types, distributed execution via state serialization, and extensive testing through unit tests, integration tests, and SQL logic tests. The documentation has also been updated accordingly.

My review focuses on improving the implementation's clarity, correctness, and robustness. I've suggested refactoring the SpaceSavingSummary::new method for better readability, improving the accuracy of the memory size estimation in SpaceSavingSummary::size, and simplifying the ApproxTopKAccumulator by making input_data_type non-optional.

Overall, this is a solid contribution. Addressing the suggested points will further enhance the quality of this new feature.

Comment on lines +154 to +163
fn new(capacity: usize) -> Self {
Self {
counters: Vec::new(),
counter_map: HashMap::new(),
alpha_map: Vec::new(),
requested_capacity: 0,
target_capacity: 0,
}
.resized(capacity)
}

medium

The new function creates a temporary SpaceSavingSummary and then immediately calls resized. Since resized (lines 165-175) is not used anywhere else, its logic can be inlined into new for clarity and to avoid creating a temporary struct. This also allows removing the resized function.

    fn new(capacity: usize) -> Self {
        let alpha_map_size = Self::compute_alpha_map_size(capacity);
        let target_capacity = std::cmp::max(64, capacity * 2);
        let mut counters = Vec::new();
        counters.reserve(target_capacity);

        Self {
            counters,
            counter_map: HashMap::new(),
            alpha_map: vec![0u64; alpha_map_size],
            requested_capacity: capacity,
            target_capacity,
        }
    }

Owner Author

value:good-but-wont-fix; category:bug; feedback: The Gemini AI reviewer is correct! The resized() function is used just once, so it could be inlined directly into new() to simplify the logic. But keeping it as a separate function that returns a new/modified Self makes it more flexible for resizing SpaceSavingSummary instances at any later point.

Comment on lines +471 to +482
fn size(&self) -> usize {
size_of::<Self>()
+ self
.counters
.iter()
.map(|c| size_of::<Counter>() + c.item.len())
.sum::<usize>()
// HashMap overhead: each bucket has a key-value pair plus control byte metadata.
+ self.counter_map.capacity()
* (size_of::<(Vec<u8>, usize)>() + size_of::<u8>())
+ self.alpha_map.capacity() * size_of::<u64>()
}

medium

The size method likely underestimates the memory usage of the SpaceSavingSummary. Specifically:

  1. It doesn't account for the heap allocation of the Vec<u8> keys in counter_map. Since push_counter clones the item, its memory is allocated twice (once in counters and once in counter_map).
  2. It uses c.item.len() which might be less than c.item.capacity(), under-reporting allocated memory.
  3. It doesn't account for the capacity of the counters and alpha_map vectors themselves, only their contents.

An inaccurate size method can lead to poor memory management decisions by the query engine. A more accurate implementation is suggested.

    fn size(&self) -> usize {
        use std::mem::size_of;
        let item_bytes: usize = self.counters.iter().map(|c| c.item.capacity()).sum();

        size_of::<Self>()
            + (self.counters.capacity() * size_of::<Counter>())
            // HashMap overhead. The keys are cloned from `counters` so we account for their heap allocation separately.
            + (self.counter_map.capacity() * (size_of::<(Vec<u8>, usize)>() + size_of::<u8>()))
            + (self.alpha_map.capacity() * size_of::<u64>())
            + item_bytes * 2
    }

Owner Author

value:useful; category:bug; feedback: The Gemini AI reviewer is correct! To calculate the size of a collection one has to multiply its capacity by the size of the item type. Here each Counter contains an item - a Vec<u8> - and these items are duplicated in both the counters Vec and the counter_map HashMap, so they should be counted twice.
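As a tiny illustration of the capacity-vs-length distinction behind this point (a sketch; Vec::with_capacity only guarantees at least the requested capacity):

```rust
// A Vec's heap allocation is governed by capacity(), not len(), so
// len-based accounting under-reports reserved memory.
fn vec_heap_bytes<T>(v: &Vec<T>) -> usize {
    v.capacity() * std::mem::size_of::<T>()
}
```

For a Vec<u64> built with with_capacity(100) holding one element, len-based accounting reports 8 bytes while at least 800 bytes are actually reserved.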

summary: SpaceSavingSummary,
k: usize,
/// The data type of the input column. `None` until the first batch is seen.
input_data_type: Option<DataType>,

medium

The input_data_type is an Option<DataType>, but it seems it can always be initialized with a DataType in the accumulator factory method since the planner has access to the input expression's type.

By changing input_data_type to be a non-optional DataType, you can simplify the implementation significantly:

  • Remove the need for unwrap_or and checks for is_none.
  • Simplify state() serialization and merge_batch() logic.
  • Make the code more robust by ensuring the type is always present.

This would involve changing ApproxTopKAccumulator and its related methods to work with DataType directly instead of Option<DataType>. The comment on line 686 would also need to be updated.

Owner Author

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! Only the tests use None as an initial value. The method signature could be changed to not use Option and always pass the concrete DataType, avoiding the more complex logic needed to support both None and Some.

@claude

claude bot commented Mar 24, 2026

Code Review: approx_top_k Aggregate Function

Overall this is a solid implementation of the Filtered Space-Saving algorithm for approximate top-k aggregation. The code is well-structured and includes good test coverage. Below are my findings organized by severity.


Bugs / Correctness Issues

1. Integer overflow in from_bytes deserialization

In approx_top_k.rs, the bounds check for the alpha map can overflow:

if offset + alpha_map_size * 8 > bytes.len() {

If alpha_map_size is large enough (e.g., > usize::MAX / 8), the multiplication wraps in release builds, defeating the bounds check. An attacker who can supply crafted binary state could cause incorrect behavior. Use checked_mul / checked_add:

let alpha_bytes = alpha_map_size
    .checked_mul(8)
    .and_then(|n| offset.checked_add(n))
    .ok_or_else(|| DataFusionError::Execution("alpha map size overflow".to_string()))?;
if alpha_bytes > bytes.len() { ... }

Similarly, Vec::with_capacity(alpha_map_size) will try to allocate memory proportional to the untrusted value — the bounds check above needs to be correct first.

2. .unwrap() on try_into() slices in from_bytes

The deserialization code repeatedly does:

u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap())

While the length checks above each call should prevent a panic, .try_into().map_err(|_| DataFusionError::...)? would be safer and consistent with DataFusion's error handling style used elsewhere in the file.

3. counter_map is stale during the merge loop

In merge(), new items from other.counters are pushed to self.counters but self.counter_map is not updated until truncate_if_needed(true) at the end. This is only safe because items within a single SpaceSavingSummary are guaranteed to be unique. If that invariant is ever violated (e.g., by a malformed deserialized state), duplicate items could be double-counted. Worth adding a comment or a debug assertion.


Performance Concerns

4. HashMap<Vec<u8>, usize> keys cause a heap allocation per tracked item

For numeric types (Int8–UInt64, Float32/64), values are 1–8 bytes serialized to Vec<u8>, each requiring a separate heap allocation in the counter_map. For hot aggregate paths processing millions of rows, this adds significant allocator pressure.

Consider using a fixed-size key type (e.g., [u8; 16] padded) or, for the numeric path, maintaining a separate HashMap<u128, usize> — a common pattern in DataFusion's other aggregate functions.
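A sketch of what fixed-width keys for the numeric fast path could look like (function names are illustrative, not the PR's API):

```rust
// Pack numeric values into a fixed-width u128 key, avoiding a Vec<u8>
// heap allocation per tracked item.
fn i64_key(v: i64) -> u128 {
    // Reinterpret as unsigned first so negative values map to distinct,
    // well-defined keys.
    v as u64 as u128
}

fn f64_key(v: f64) -> u128 {
    // Key on the bit pattern so -0.0/+0.0 and NaN payloads stay
    // distinct, matching the PR's documented float handling.
    v.to_bits() as u128
}
```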

5. Misleading memory estimate in the CAPACITY_MULTIPLIER comment

/// Memory impact: ~100 bytes per counter, so top-100 uses ~30 KB per accumulator.

For k=100: requested_capacity = 300, but target_capacity = max(64, 600) = 600. Items accumulate to 600 before truncation, so the actual peak is closer to 60 KB, not 30 KB. The comment should document both requested_capacity and target_capacity.
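The capacity arithmetic behind this point, assuming CAPACITY_MULTIPLIER = 3 and the max(64, capacity * 2) target seen in new():

```rust
// requested = k * CAPACITY_MULTIPLIER; target = max(64, requested * 2).
// At ~100 bytes per counter, the peak before truncation is governed by
// the target, not the requested capacity.
fn capacities(k: usize) -> (usize, usize) {
    let requested = k * 3; // CAPACITY_MULTIPLIER
    let target = std::cmp::max(64, requested * 2);
    (requested, target)
}
```

So for k=100 the peak is ~600 counters × ~100 B ≈ 60 KB, double what the comment states.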


Design / Algorithmic Concerns

6. FNV-1a hash is vulnerable to hash-flooding

The custom FNV-1a implementation is used for both item identification in the counter_map (via HashMap, which uses a different hasher internally) and for the alpha map slot selection. The alpha map slot selection uses FNV-1a without any randomization, which means adversarially crafted inputs (all items hashing to the same alpha slot) would cause systematic underestimation of frequency. Consider seeding the hash with a random value at construction time or using ahash.
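One way to implement the seeding suggestion is to fold a per-accumulator random value into the FNV offset basis (a sketch; constants are the standard 64-bit FNV parameters):

```rust
// Seeded FNV-1a: XOR a per-accumulator random seed into the offset
// basis so adversarial inputs cannot pre-compute alpha-map collisions.
fn fnv1a_seeded(data: &[u8], seed: u64) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = FNV_OFFSET ^ seed;
    for &b in data {
        hash ^= b as u64;
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash
}
```

Note that for distributed merges the seed would have to be shared (or serialized with the state), otherwise partial accumulators would map items to different alpha slots.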

7. Silent accuracy degradation when alpha map sizes differ in merge

// If they differ due to a bug, we skip the merge
// which only degrades accuracy without affecting correctness.
if self.alpha_map.len() == other.alpha_map.len() {

Since the alpha map captures history of evicted items, skipping its merge means new items that were previously evicted from other will not get the alpha boost, leading to underestimation that's hard to diagnose. At minimum, log a warning (at debug level). In practice, this could happen in legitimate scenarios if the planner routes queries through code paths that reconstruct accumulators with different k values.

8. Missing type support

The function doesn't support Boolean, Decimal128, Date32, Date64, Timestamp, Time32/64, Utf8View, or BinaryView. These are common in analytics workloads. The restriction is reasonable as an initial implementation, but it should be clearly documented in both the SQL docs and the Rust doc comment.


Minor Issues

9. Custom data_type_to_string / string_to_data_type roundtrip

These manual string conversions could be replaced with Arrow's built-in DataType serialization, or at minimum should use DataType's Display impl (which is stable). Custom serialization strings are fragile and a maintenance burden.

10. Error bound not exposed in output

The algorithm tracks both count and error per item, but only count is returned. Exposing the error bound (e.g., as count_error) would let users make informed decisions about result accuracy, which is especially useful for approximate functions. The count - error lower bound guarantees correctness.

11. evaluate() after state() may see a truncated summary

state() calls summary.serialize() which calls truncate_if_needed(false), potentially modifying self.summary. If evaluate() is called after state(), the summary has been truncated and results may differ from pre-state() evaluation. The DataFusion planner doesn't do this today, but a comment noting the order-dependency would be helpful.

12. Snapshot test in dataframe_functions.rs is fragile for an approximate function

| [{value: 10, count: 2}, {value: 1, count: 1}] |

For a dataset small enough to fit entirely in requested_capacity, this will always be exact. But it's worth adding a comment noting this is a deterministic result because the dataset is small—readers should not expect SQL-level tests of approximate functions to always match byte-for-byte.


Test Coverage Gaps

  • No test for -0.0 vs +0.0 being tracked as distinct values (documented behavior)
  • No test for NaN values in float columns
  • No SQL test for float column types
  • No test for k > APPROX_TOP_K_MAX_K error path
  • No test for corrupted/truncated binary state in from_bytes
  • The distributed merge simulation (test_distributed_merge_simulation) doesn't test the alpha map interaction — it only tests exact-count accumulation

Summary

The overall structure and algorithm implementation are sound. The main items to address before merging:

  1. Fix the overflow in from_bytes (correctness/security)
  2. Replace .unwrap() in deserialization with proper ?-based error handling
  3. Fix or document the memory estimate in CAPACITY_MULTIPLIER
  4. Document missing types in user-facing docs

The performance concern with HashMap<Vec<u8>, usize> is worth tracking as a follow-up issue rather than blocking this PR.

@augmentcode

augmentcode bot commented Mar 24, 2026

🤖 Augment PR Summary

Summary: Adds a new approximate heavy-hitters aggregate, approx_top_k(expr, k), to return the most frequent values and their counts.

Changes:

  • Implemented approx_top_k as a new aggregate UDF using the Filtered Space-Saving algorithm with serializable state for distributed merges
  • Added an accumulator that supports common string and numeric Arrow types and returns a List<Struct{value,count}>
  • Registered the UDAF in the aggregate function module exports and default aggregate registry
  • Added coverage via DataFrame API tests, sqllogictest cases, and logical-plan protobuf roundtrip tests
  • Documented approx_top_k in the expressions and aggregate-function user guides

Technical Notes: k is required to be a literal integer (1..=10,000); the implementation tracks a larger internal capacity to improve accuracy and supports mergeable execution.



@augmentcode augmentcode bot left a comment


Review completed. 4 suggestions posted.


let k_expr = &args.exprs[1];
let k = k_expr
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Literal>()

k is currently required to be a Literal physical expression; if it’s wrapped in a cast or other foldable constant expression (e.g. CAST(2 AS BIGINT)), planning/execution may fail even though the value is still a literal. It may be worth extracting k via scalar evaluation (like other UDAFs here) rather than downcasting to Literal.

Severity: medium


self.find_counter(item).map(|c| (c.count, c.error))
}

/// Get the top-k items sorted by count descending.

The doc comment says results are “sorted by count descending”, but top_k orders by (count - error) first via cmp_by_rank, which can differ from the returned count values. That mismatch could lead to surprising ordering for users (and contradict the documented behavior).

Severity: medium


let return_size = std::cmp::min(sorted.len(), k);

if return_size < sorted.len() {
sorted.select_nth_unstable_by(return_size - 1, |a, b| a.cmp_by_rank(b));

Because cmp_by_rank returns Equal for ties, select_nth_unstable_by can choose an arbitrary subset when multiple items have equal rank, making output non-deterministic for tied frequencies. This can also make snapshot-style tests brittle when there are ties.

Severity: medium

Other Locations
  • datafusion/core/tests/dataframe/dataframe_functions.rs:428
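One way to address this is to extend the comparator into a total order by breaking rank ties on the item bytes (a sketch with illustrative field names, not the PR's exact struct):

```rust
use std::cmp::Ordering;

// Total-order variant of cmp_by_rank: higher guaranteed count
// (count - error) first, then higher raw count, then the item bytes so
// select_nth_unstable_by and the final sort become deterministic.
struct Counter {
    item: Vec<u8>,
    count: u64,
    error: u64,
}

impl Counter {
    fn cmp_by_rank(&self, other: &Self) -> Ordering {
        (other.count - other.error)
            .cmp(&(self.count - self.error))
            .then_with(|| other.count.cmp(&self.count))
            .then_with(|| self.item.cmp(&other.item))
    }
}
```

With a total order, tied elements always appear in the same order, which also makes the snapshot-style tests stable.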


u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap()) as usize;
offset += 8;

if offset + alpha_map_size * 8 > bytes.len() {

from_bytes uses arithmetic like offset + alpha_map_size * 8 without guarding against usize overflow; malformed/corrupt state could bypass the length check and panic during slicing. It may be worth adding overflow-safe bounds checks/sanity limits when parsing the serialized state.

Severity: low



@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.


}

bytes
}

Serialization drops counters without alpha map update

Medium Severity

serialize() selects the top requested_capacity counters and discards the rest, but unlike truncate_if_needed, it does not add the evicted counters' true counts to the alpha_map before writing. In a distributed merge, this silently loses frequency information for items tracked in the buffer zone (between requested_capacity and target_capacity), degrading the accuracy of the Filtered Space-Saving algorithm for subsequent merges.


");

Ok(())
}

Tests assert non-deterministic order of tied elements

Low Severity

The test_fn_approx_top_k snapshot expects value 1 as the second result, but values 1 and 100 are tied (both count=1, error=0). Similarly, the .slt WHERE test expects c over b when tied. Since top_k uses select_nth_unstable_by, which doesn't guarantee order for equal elements, these tests depend on unspecified implementation behavior and could break across Rust versions.


@martin-augment
Owner Author

Re: point 1 from the Claude review above (integer overflow in the from_bytes bounds check, fixable with checked_mul / checked_add):

value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct! The multiplication may lead to an overflow if the map is really big. Using a saturating multiplication would avoid this issue.
