Conversation
…producing result are next steps
feat:Implement single column aggregations, Implement group by statments, implement having statments
Fix/memory leaks
There was a problem hiding this comment.
Pull request overview
This PR implements a comprehensive sort operator along with aggregation functionality (single aggregates, group by, and having clauses) for the opti-sql-go query execution engine. The implementation uses Apache Arrow for columnar data processing and introduces several new operators that follow the Volcano/Iterator execution model.
Key Changes:
- Implementation of in-memory sort and top-k sort operators with support for multi-column sorting
- Global aggregation operators (MIN, MAX, SUM, COUNT, AVG) with proper null handling
- GROUP BY operator with hash-based grouping and multiple aggregation support
- HAVING clause operator for post-aggregation filtering
- Utility functions for pretty-printing record batches and releasing Arrow arrays
- Refactoring of existing operators to use consistent naming conventions (
inputinstead ofchild)
Reviewed changes
Copilot reviewed 22 out of 23 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
operators/aggr/sort.go |
Implements SortExec and TopKSortExec operators with comparison logic for all Arrow types |
operators/aggr/sort_test.go |
Comprehensive tests for sort functionality including multi-column sorting and type coverage |
operators/aggr/singleAggr.go |
Global aggregation operator with accumulator pattern for MIN, MAX, SUM, COUNT, AVG |
operators/aggr/singleAggr_test.go |
Test coverage for single aggregation functions including null handling |
operators/aggr/groupBy.go |
GROUP BY operator using hash-based grouping with dynamic array building |
operators/aggr/groupBy_test.go |
Tests for group by functionality with multiple grouping columns and aggregates |
operators/aggr/having.go |
HAVING clause operator for filtering grouped results |
operators/aggr/having_test.go |
Test cases for HAVING clause filtering on aggregated data |
operators/record.go |
Adds ReleaseArrays helper, PrettyPrint method, and value formatting utilities |
operators/project/projectExec.go |
Refactors to use input field name and ReleaseArrays helper |
operators/project/custom.go |
Adds NewInMemoryProjectExecFromArrays constructor for test data setup |
operators/project/parquet.go |
Removes TODO comments and cleans up field alignment |
operators/filter/filter.go |
Refactors to use consistent naming and exports ApplyBooleanMask |
Expr/expr.go |
Removes debug print statements and comments |
go.mod |
Updates Go version and adds go-jose dependency |
go.sum |
Adds checksums for new go-jose dependency |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.
| if err != nil { | ||
| return nil, err | ||
| } | ||
| boolArr, ok := booleanMask.(*array.Boolean) // impossible for this to not be a boolean array,assuming validPredicates works as it should |
There was a problem hiding this comment.
The comment "impossible for this to not be a boolean array,assuming validPredicates works as it should" contains a typo. Add a space after the comma: "impossible for this to not be a boolean array, assuming validPredicates works as it should".
| boolArr, ok := booleanMask.(*array.Boolean) // impossible for this to not be a boolean array,assuming validPredicates works as it should | |
| boolArr, ok := booleanMask.(*array.Boolean) // impossible for this to not be a boolean array, assuming validPredicates works as it should |
| if t.done { | ||
| return nil, io.EOF | ||
| } | ||
| return nil, nil |
There was a problem hiding this comment.
The TopKSortExec.Next() method always returns nil, nil instead of implementing the actual top-k sort functionality. This creates a non-functional operator that will cause silent failures. The method should either implement the top-k sort logic or return an error indicating it's not yet implemented.
| return nil, nil | |
| return nil, errors.New("TopKSortExec.Next not implemented") |
| func TestOne(t *testing.T) { | ||
| v := compute.GetExecCtx(context.Background()) | ||
| names := v.Registry.GetFunctionNames() | ||
| for i, name := range names { | ||
| fmt.Printf("%d: %v\n", i, name) | ||
| } | ||
| /* | ||
| mem := memory.NewGoAllocator() | ||
| floatB := array.NewFloat64Builder(mem) | ||
| floatB.AppendValues([]float64{10.5, 20.3, 30.1, 40.7, 50.2}, []bool{true, true, true, true, true}) | ||
| pos := array.NewInt32Builder(mem) | ||
| pos.AppendValues([]int32{1, 3, 4}, []bool{true, true, true}) | ||
|
|
||
| dat, err := compute.Take(context.TODO(), *compute.DefaultTakeOptions(), compute.NewDatum(floatB.NewArray()), compute.NewDatum(pos.NewArray())) | ||
| if err != nil { | ||
| t.Fatalf("Take failed: %v", err) | ||
| } | ||
| array, ok := dat.(*compute.ArrayDatum) | ||
| if !ok { | ||
| t.Logf("expected an array to be returned but got something else %T\n", dat) | ||
| } | ||
| t.Logf("data: %v\n", array.MakeArray()) | ||
| */ | ||
| } |
There was a problem hiding this comment.
The TestOne function appears to be a debugging/exploratory test that prints all compute function names. This should either be removed or converted to a meaningful test with assertions. Test functions should verify specific behavior, not just print output.
| func TestOne(t *testing.T) { | |
| v := compute.GetExecCtx(context.Background()) | |
| names := v.Registry.GetFunctionNames() | |
| for i, name := range names { | |
| fmt.Printf("%d: %v\n", i, name) | |
| } | |
| /* | |
| mem := memory.NewGoAllocator() | |
| floatB := array.NewFloat64Builder(mem) | |
| floatB.AppendValues([]float64{10.5, 20.3, 30.1, 40.7, 50.2}, []bool{true, true, true, true, true}) | |
| pos := array.NewInt32Builder(mem) | |
| pos.AppendValues([]int32{1, 3, 4}, []bool{true, true, true}) | |
| dat, err := compute.Take(context.TODO(), *compute.DefaultTakeOptions(), compute.NewDatum(floatB.NewArray()), compute.NewDatum(pos.NewArray())) | |
| if err != nil { | |
| t.Fatalf("Take failed: %v", err) | |
| } | |
| array, ok := dat.(*compute.ArrayDatum) | |
| if !ok { | |
| t.Logf("expected an array to be returned but got something else %T\n", dat) | |
| } | |
| t.Logf("data: %v\n", array.MakeArray()) | |
| */ | |
| } |
| booleanMask, err := Expr.EvalExpression(h.havingExpr, childBatch) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| boolArr, ok := booleanMask.(*array.Boolean) // impossible for this to not be a boolean array,assuming validPredicates works as it should | ||
| if !ok { | ||
| return nil, errors.New("having predicate did not evaluate to boolean array") | ||
| } | ||
| filteredCol := make([]arrow.Array, len(childBatch.Columns)) | ||
| for i, col := range childBatch.Columns { | ||
| filteredCol[i], err = filter.ApplyBooleanMask(col, boolArr) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
| // release old columns | ||
| operators.ReleaseArrays(childBatch.Columns) |
There was a problem hiding this comment.
The booleanMask array should be released after use to prevent memory leaks. Add booleanMask.Release() after line 62 or use defer booleanMask.Release() after line 48.
| } | ||
|
|
||
| func NewSortExec(child operators.Operator, sortKeys []SortKey) (*SortExec, error) { | ||
| fmt.Printf("sorts Keys %v\n", sortKeys) |
There was a problem hiding this comment.
Debug print statement should be removed from production code. Use a proper logging framework or remove this line entirely.
| fmt.Printf("sorts Keys %v\n", sortKeys) |
| } | ||
|
|
||
| }) | ||
| t.Run("tok k sort exec init", func(t *testing.T) { |
There was a problem hiding this comment.
This typo in the test function name "tok k" should be "top k" to properly describe the test's purpose.
| t.Run("tok k sort exec init", func(t *testing.T) { | |
| t.Run("top k sort exec init", func(t *testing.T) { |
| } | ||
|
|
||
| func NewTopKSortExec(child operators.Operator, sortKeys []SortKey, k uint16) (*TopKSortExec, error) { | ||
| fmt.Printf("sort keys %v\n", sortKeys) |
There was a problem hiding this comment.
Debug print statement should be removed from production code. Use a proper logging framework or remove this line entirely.
| fmt.Printf("sort keys %v\n", sortKeys) |
| type SortKey struct { | ||
| Expr Expr.Expression | ||
| Ascending bool // by default false -- DESC (highest values first -> smaller values) | ||
| NullFirst bool // by default false -- nulls last |
There was a problem hiding this comment.
The NullFirst field in the SortKey struct is defined but never used in the sorting logic. The compareArrowValues function always treats nulls as the lowest value (line 260-267), ignoring the NullFirst option. Either implement proper null handling based on this field or remove it if it's not needed.
| done bool // know when to return io.EOF | ||
| } | ||
|
|
||
| func NewGlobalAggrExec(child operators.Operator, aggExprs []AggregateFunctions) (*AggrExec, error) { |
There was a problem hiding this comment.
Inconsistent naming: the operator struct is named AggrExec but the constructor is NewGlobalAggrExec. For consistency with other operators in the codebase, either rename the struct to GlobalAggrExec or rename the constructor to NewAggrExec.
| func NewGlobalAggrExec(child operators.Operator, aggExprs []AggregateFunctions) (*AggrExec, error) { | |
| func NewAggrExec(child operators.Operator, aggExprs []AggregateFunctions) (*AggrExec, error) { |
| "github.com/apache/arrow/go/v17/arrow/array" | ||
| ) | ||
|
|
||
| // carbon copy of filter.go with minor changes to fit having semantics |
There was a problem hiding this comment.
[nitpick] The comment "carbon copy of filter.go with minor changes to fit having semantics" suggests code duplication. Consider refactoring to share common logic between HavingExec and FilterExec, possibly by extracting the filtering logic into a shared helper function or creating a generic filter operator that both can use.
No description provided.