Conversation
feat: generate test substrait plans & basic parse
…ructs implement helper functions for all operators & tests for those helper functions. Next steps are to implement the filter and project operators and test them end to end; after that it should be downhill, since the other operators follow a very similar pattern
… from gRPC, execute the plan (goroutines), and write out to S3
…ransform results to CSV and upload to S3
Pull request overview
This PR prepares the “pre-release” branch by switching the gRPC API to accept a base64-encoded custom JSON plan, implementing a Substrait-ish plan parser/executor in Go, and expanding test data + test coverage around plan consumption and S3 I/O.
Changes:
- Update the gRPC contract to send the logical plan as a base64 string and remove the `SourceType` message.
- Add/expand the Substrait plan parsing + execution pipeline (plan → operator tree → execute → CSV → upload).
- Add extensive unit/integration tests and sample plan JSON fixtures.
Reviewed changes
Copilot reviewed 43 out of 44 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| src/Contract/operation.proto | Switch request payload from bytes/source wrapper to a string logical_plan field. |
| src/Backend/test_data/substrait_plans/basic/basic_00_test.json | Adds basic plan fixture for plan consumption tests. |
| src/Backend/test_data/substrait_plans/basic/basic_01_source_filter.json | Adds basic plan fixture for plan consumption tests. |
| src/Backend/test_data/substrait_plans/basic/basic_02_project.json | Adds basic plan fixture for plan consumption tests. |
| src/Backend/test_data/substrait_plans/basic/basic_03_sort.json | Adds basic plan fixture for plan consumption tests. |
| src/Backend/test_data/substrait_plans/basic/basic_04_distinct.json | Adds basic plan fixture for plan consumption tests. |
| src/Backend/test_data/substrait_plans/basic/basic_05_limit.json | Adds basic plan fixture for plan consumption tests. |
| src/Backend/test_data/substrait_plans/basic/basic_06_aggr.json | Adds basic plan fixture for plan consumption tests. |
| src/Backend/test_data/substrait_plans/medium/mid_01_filter_project_sort.json | Adds medium complexity plan fixture. |
| src/Backend/test_data/substrait_plans/medium/mid_02_group_by_aggregate.json | Adds medium complexity plan fixture. |
| src/Backend/test_data/substrait_plans/medium/mid_03_join_filter.json | Adds medium complexity plan fixture. |
| src/Backend/test_data/substrait_plans/medium/mid_04_join_sort_limit.json | Adds medium complexity plan fixture. |
| src/Backend/test_data/s3_source/source.json | Updates test S3 source manifest contents/formatting. |
| src/Backend/opti-sql-go/substrait/substrait_test.go | Adds extensive plan/operator parsing unit tests and cleanup helpers. |
| src/Backend/opti-sql-go/substrait/substrait_integration_test.go | Adds integration tests that build/consume plans from inline objects and JSON files. |
| src/Backend/opti-sql-go/substrait/substrait.go | Implements JSON plan parsing into operator tree + execution helpers. |
| src/Backend/opti-sql-go/substrait/server.go | Updates gRPC server to base64-decode plan, execute, convert to CSV, upload results. |
| src/Backend/opti-sql-go/substrait/operation_grpc.pb.go | Regenerated gRPC stubs after proto changes. |
| src/Backend/opti-sql-go/substrait/operation.pb.go | Regenerated protobuf types after proto changes. |
| src/Backend/opti-sql-go/substrait/format.md | Adds documentation for the custom IR plan format. |
| src/Backend/opti-sql-go/substrait/expr.md | Adds documentation for expression JSON encoding. |
| src/Backend/opti-sql-go/operators/test/t1_test.go | Updates join import path casing. |
| src/Backend/opti-sql-go/operators/test/intergration_test.go | Updates join import path casing. |
| src/Backend/opti-sql-go/operators/record.go | Adds Operator.Name() and a RecordBatch.ToCSV() helper for result export. |
| src/Backend/opti-sql-go/operators/project/source_test.go | Updates local download API and adds an S3 upload test. |
| src/Backend/opti-sql-go/operators/project/s3.go | Changes local download naming; adds UploadResults helper. |
| src/Backend/opti-sql-go/operators/project/projectExec.go | Implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/project/parquet.go | Implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/project/custom.go | Implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/project/csv.go | Implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/filter/limit.go | Exposes limit remaining count; implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/filter/filter.go | Implements Operator.Name(); removes debug prints. |
| src/Backend/opti-sql-go/operators/aggr/sort.go | Implements Operator.Name() for sort operators. |
| src/Backend/opti-sql-go/operators/aggr/singleAggr.go | Adjusts aggregation validation and count handling; implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/aggr/having.go | Implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/aggr/groupBy.go | Adjusts group-by aggregation handling (esp. Count); implements Operator.Name(). |
| src/Backend/opti-sql-go/operators/Join/hashJoin_test.go | Updates Arrow memory import to v17. |
| src/Backend/opti-sql-go/operators/Join/hashJoin.go | Exposes join clause fields; implements Operator.Name(). |
| src/Backend/opti-sql-go/dockerfile | Adds container build recipe for the Go execution server. |
| src/Backend/opti-sql-go/config/config.go | Changes default server host/port to 0.0.0.0:7024. |
| src/Backend/opti-sql-go/Expr/expr_test.go | Fixes type name to exported SupportedFunctions. |
| src/Backend/opti-sql-go/Expr/expr.go | Exports operator/function enums; adds helper mapping functions. |
| README.md | Adds docker build/run and example gRPC payload documentation. |
| .gitignore | Unignores test_data/substrait_plans/** so fixtures are committed. |
```go
}

operatorNode := m[operator].(string)
body := m[operatorNode].(map[string]any)
```
buildTree type-asserts m[operatorNode] to map[string]any without checking the key exists or the value type. Invalid/malformed plans will panic instead of returning a parse error. Add a presence/type check (similar to resolveInput) and return a descriptive error when missing/mismatched.
Suggested change:

```diff
-body := m[operatorNode].(map[string]any)
+rawBody, ok := m[operatorNode]
+if !ok {
+	return nil, ErrInvalidSubstraitPlan(fmt.Errorf("missing body for operator %q", operatorNode))
+}
+body, ok := rawBody.(map[string]any)
+if !ok {
+	return nil, ErrInvalidSubstraitPlan(fmt.Errorf("body for operator %q must be an object", operatorNode))
+}
```
```go
v, _ := m["value"].(int)
value = int64(v)
```
LiteralResolve with lit_type: "int" reads value using m["value"].(int), but JSON unmarshalling produces float64 for numbers. This currently turns integer literals into 0. Parse numeric literals robustly (accept float64 and convert) so literals preserve their value.
Suggested change:

```diff
-v, _ := m["value"].(int)
-value = int64(v)
+rawVal, ok := m["value"]
+if !ok {
+	return nil, fmt.Errorf("malformed expression body: missing value for int literal")
+}
+switch v := rawVal.(type) {
+case int:
+	value = int64(v)
+case int8:
+	value = int64(v)
+case int16:
+	value = int64(v)
+case int32:
+	value = int64(v)
+case int64:
+	value = v
+case float64:
+	value = int64(v)
+default:
+	return nil, fmt.Errorf("malformed expression body: value for int literal has incompatible type %T", rawVal)
+}
```
```go
e, err := os.ReadDir(customIRPath)
if err != nil {
	t.Fatalf("failed to open dir with error: %v\n", e)
}
```
This t.Fatalf prints the directory entries slice (e) instead of the error (err). This makes failures misleading. Use the err value in the fatal message.
README.md (outdated)
```bash
docker buildx build \
  --platform linux/amd64 \
  -t rich239/execution-engine:0.9.2 \ ## bump major/minor
```
The docker build example includes an inline comment after a line-continuation backslash, which will break if copy/pasted (the comment becomes part of the command). Consider moving the comment to its own line or removing the trailing backslash before the comment.
Suggested change:

```diff
--t rich239/execution-engine:0.9.2 \ ## bump major/minor
+-t rich239/execution-engine:0.9.2 \
```
```go
grpcServer.GracefulStop()

fmt.Println("Server shutdown complete")
os.Exit(1)
```
unifiedShutdownHandler calls os.Exit(1) after a graceful shutdown. This forces the process to exit with a failure status code (and skips deferred cleanups in other goroutines). Prefer returning normally, or if an exit is required use a successful exit code (0) and keep shutdown responsibility in main/caller.
Suggested change:

```diff
-os.Exit(1)
```
```go
for _, f := range results.p.localFileNames {
	if _, err := os.Open(f); !strings.Contains(err.Error(), "no such file or directory") {
		t.Errorf("%s was found when it should have been cleaned up by consumeAll: %v", f, err)
	}
```
This cleanup verification can panic: if os.Open(f) returns nil error, calling err.Error() will dereference nil. Check err != nil before inspecting the message, and treat err == nil as the failure case (file still exists).
```go
// include random number for the sake of avoiding conflicts, should resolve this at the
// logical processing step but for now this works
fName := fmt.Sprintf("%s-%s-%d", strings.ReplaceAll(req.SqlStatement, " ", "-"), req.Id, rand.IntN(1000))
if err = project.UploadResults(fName, csv); err != nil {
```
The S3 object name is derived from sql_statement with only spaces replaced. SQL can contain characters that are invalid/problematic in object keys (slashes, quotes, punctuation) and may leak sensitive info into filenames. Consider generating a stable safe key (e.g., UUID or hash of query/id) and/or applying a strict allowlist + length cap.
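One way to get a stable, key-safe name, as the comment suggests, is to hash the query together with the request id. A minimal sketch (`resultObjectKey` and the key layout are illustrative, not the PR's code):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// resultObjectKey derives a deterministic, S3-safe object key from the
// request id and SQL text. Hashing both means neither raw SQL nor an
// unsanitized id ever appears in the key, and re-runs of the same
// (id, query) pair map to the same object.
func resultObjectKey(sqlStatement, requestID string) string {
	sum := sha256.Sum256([]byte(requestID + "\x00" + sqlStatement))
	return "results/" + hex.EncodeToString(sum[:16]) + ".csv"
}

func main() {
	fmt.Println(resultObjectKey(`SELECT * FROM "users"; -- comment`, "req-42"))
}
```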
```go
	return nil, fmt.Errorf("plan missing 'Emit' key")
}

emitObj := planMap["Emit"].(map[string]any)
```
ConsumeSubstraitPlan type-asserts planMap["Emit"] to map[string]any without an ok check, which can panic on malformed input. Use a checked assertion and return a structured error when the type is unexpected.
Suggested change:

```diff
-emitObj := planMap["Emit"].(map[string]any)
+emitVal := planMap["Emit"]
+emitObj, ok := emitVal.(map[string]any)
+if !ok {
+	return nil, fmt.Errorf("plan 'Emit' key has unexpected type %T, expected object", emitVal)
+}
```
```go
	return parseGroupBy(newOBJ, plan)
}

return nil, nil
```
resolveInput returns (nil, nil) when opName is not recognized. This can lead to nil dereferences later and makes errors hard to diagnose. Return a non-nil error for unsupported operator names so parsing fails fast.
Suggested change:

```diff
-return nil, nil
+return nil, fmt.Errorf("unsupported operator %q", opName)
```
```dockerfile
EXPOSE 7024
```
This Dockerfile runs execution-engine as the default Docker user, which is root in the golang:1.24 base image, so the gRPC service will have full root privileges inside the container. If an attacker ever gains code execution in the service (for example via a bug in query parsing or plan handling), running as root greatly increases the impact by allowing access to mounted host paths and making container-escape vulnerabilities far more damaging. Create a dedicated unprivileged user in this image and add a USER directive before the CMD so the process runs with minimal required privileges.
Suggested change:

```diff
+# Create an unprivileged user and give it ownership of the app directory
+RUN useradd -m -u 10001 appuser && chown -R appuser /app
 EXPOSE 7024
+# Run the execution engine as the unprivileged user
+USER appuser
```
58a3b9c to dfdca8a
Pull request overview
Copilot reviewed 43 out of 44 changed files in this pull request and generated 18 comments.
```json
"source-node": {
  "file-name": "country-full.csv",
  "local": false
}
```
The Source operator examples wrap the actual source fields inside a source-node object, but the parser expects Source to directly contain file-name and local. Update these examples to match the accepted JSON shape so they’re runnable.
Suggested change:

```diff
-"source-node": {
-  "file-name": "country-full.csv",
-  "local": false
-}
+"file-name": "country-full.csv",
+"local": false
```
```go
fmt.Printf("(IN) json object passed in for expression parsing: %v\n", m)
err := containsFields([]string{"expr_type"}, m)
```
parseExpression prints the full expression JSON to stdout on every call. This will spam logs and can leak user data in production. Replace with structured logging behind a debug level, or remove entirely.
```go
fieldTypes := []string{m["lit_type"].(string), "string"}
err = correctFieldTypes(neededFields, fieldTypes, m)
if err != nil {
```
In LiteralResolve, m["lit_type"].(string) is accessed before validating the field type. If a client sends a non-string lit_type, this will panic. Validate lit_type is a string (e.g., correctFieldTypes([]string{"lit_type"}, []string{"string"}, m)) before any type assertion.
```bash
docker buildx build \
  --platform linux/amd64 \
  -t rich239/execution-engine:0.9.5 \ ## bump major/minor
  -t rich239/execution-engine:latest \
  --push \
  .
```
The docker buildx build command is duplicated outside of a fenced code block at the end of the README, which makes the Markdown render oddly and looks accidental. Consider removing it or wrapping it in a proper code block.
```go
// Use reflection to check if it's any kind of slice/array
return reflect.TypeOf(value).Kind() == reflect.Slice
```
matchesExpectedType uses reflect.TypeOf(value).Kind() for arrays. If the JSON contains an explicit null for an array field, reflect.TypeOf(nil) returns nil and calling Kind() will panic. Add a value == nil guard before calling reflect.TypeOf.
Suggested change:

```diff
-// Use reflection to check if it's any kind of slice/array
-return reflect.TypeOf(value).Kind() == reflect.Slice
+if value == nil {
+	return false
+}
+// Use reflection to check if it's any kind of slice/array
+t := reflect.TypeOf(value)
+k := t.Kind()
+return k == reflect.Slice || k == reflect.Array
```
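The failure mode is easy to reproduce: `reflect.TypeOf(nil)` returns a nil `reflect.Type`, so calling `Kind()` on it panics. A guarded version of the check as a standalone sketch:

```go
package main

import (
	"fmt"
	"reflect"
)

// isSliceOrArray is the nil-safe variant: JSON null decodes to a nil any,
// and reflect.TypeOf(nil) returns nil, so guard before calling Kind().
func isSliceOrArray(value any) bool {
	if value == nil {
		return false
	}
	k := reflect.TypeOf(value).Kind()
	return k == reflect.Slice || k == reflect.Array
}

func main() {
	fmt.Println(isSliceOrArray([]int{1, 2}), isSliceOrArray(nil), isSliceOrArray("x"))
	// true false false
}
```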
```go
grpcServer.GracefulStop()

fmt.Println("Server shutdown complete")
os.Exit(1)
```
unifiedShutdownHandler calls os.Exit(1) after GracefulStop(). Exiting with status 1 signals failure and also bypasses any deferred cleanup in other goroutines. Prefer returning normally (or os.Exit(0) at most), and let the caller control process lifetime.
```go
	}
	results.Columns[i] = joinArr
}
results.RowCount += intermediate.RowCount
```
consumeAll concatenates Arrow arrays but never releases the old arrays / intermediate batch arrays after they’re no longer referenced. Since Arrow arrays are reference-counted, this can leak memory across large result sets. Release oldArr/newArr (and any intermediate batch columns) after concatenation, or refactor to stream instead of materializing everything.
Suggested change:

```diff
 	}
+	// replace the old result column with the concatenated array
 	results.Columns[i] = joinArr
+	// release the old result array, which is no longer referenced
+	if oldArr != nil {
+		oldArr.Release()
+	}
 }
 results.RowCount += intermediate.RowCount
+// release intermediate batch columns now that they have been merged
+for i := range intermediate.Columns {
+	if intermediate.Columns[i] != nil {
+		intermediate.Columns[i].Release()
+		intermediate.Columns[i] = nil
+	}
+}
```
```go
case "Round":
	return 4
}
return 1
```
FnToScalarFunction returns Upper (1) for unknown function names. That can silently turn invalid input into a different operation. Prefer returning a sentinel (e.g., SupportedFunctions(-1)) and/or a (SupportedFunctions, bool) result so callers can handle unknown names explicitly.
Suggested change:

```diff
-return 1
+return SupportedFunctions(-1)
```
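The comma-ok shape suggested here can be sketched with a plain map lookup (the names and constant values below are illustrative, not the package's real definitions):

```go
package main

import "fmt"

type SupportedFunctions int

// Illustrative enum values; the real package defines its own.
const (
	Upper SupportedFunctions = 1
	Lower SupportedFunctions = 2
	Round SupportedFunctions = 4
)

var fnByName = map[string]SupportedFunctions{
	"Upper": Upper,
	"Lower": Lower,
	"Round": Round,
}

// FnToScalarFunction resolves a function name, reporting ok=false for
// unknown names instead of silently defaulting to Upper.
func FnToScalarFunction(name string) (SupportedFunctions, bool) {
	fn, ok := fnByName[name]
	return fn, ok
}

func main() {
	fn, ok := FnToScalarFunction("Nope")
	fmt.Println(fn, ok) // 0 false
}
```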
```go
	jc.RightS = append(jc.RightS, rightExpr)
}
if len(jc.LeftS) < 1 {
	return jc, fmt.Errorf("join clause cannot be empyy")
```
Typo in error message: "empyy" → "empty".
Suggested change:

```diff
-return jc, fmt.Errorf("join clause cannot be empyy")
+return jc, fmt.Errorf("join clause cannot be empty")
```
```go
}
if fn == Expr.SupportedFunctions(-1) {
	return nil, fmt.Errorf("invalid scalr function provided %s", function)
```
Typo in error message: "scalr" → "scalar".
Suggested change:

```diff
-return nil, fmt.Errorf("invalid scalr function provided %s", function)
+return nil, fmt.Errorf("invalid scalar function provided %s", function)
```
Fix crash state
Feat: Implement garbage collection for s3
4e5292a to 1d5a987
No description provided.