Skip to content

Commit a9de3db

Browse files
Richard Baah
authored and committed
feature:ParquetSource operator is implemented
1 parent 7159b86 commit a9de3db

File tree

13 files changed

+1212
-56
lines changed

13 files changed

+1212
-56
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,7 @@ src/Backend/test_data/json
106106

107107
# Allow a specific CSV dataset that we want tracked despite the general csv ignores
108108
!src/Backend/test_data/csv/
109-
!src/Backend/test_data/csv/Mental_Health_and_Social_Media_Balance_Dataset.csv
109+
!src/Backend/test_data/csv/Mental_Health_and_Social_Media_Balance_Dataset.csv
110+
# allow parquet file
111+
!src/Backend/test_data/parquet/
112+
!src/Backend/test_data/parquet/capitals_clean.parquet

src/Backend/opti-sql-go/config/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ type batchConfig struct {
3232
EnableParallelRead bool `yaml:"enable_parallel_read"`
3333
MaxMemoryBeforeSpill uint64 `yaml:"max_memory_before_spill"`
3434
MaxFileSizeMB int `yaml:"max_file_size_mb"` // max size of a single file
35-
ShouldDowndload bool `yaml:"should_download"`
36-
MaxDownloadSizeMB int `yaml:"max_download_size_mb"` // max size to download from external sources like S3
35+
// TODO: add test for these two fields, just add to existing test
36+
ShouldDowndload bool `yaml:"should_download"`
37+
MaxDownloadSizeMB int `yaml:"max_download_size_mb"` // max size to download from external sources like S3
3738
}
3839
type queryConfig struct {
3940
// should results be cached, server side? if so how long

src/Backend/opti-sql-go/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
)
1212

1313
require (
14+
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
1415
github.com/andybalholm/brotli v1.1.0 // indirect
1516
github.com/apache/thrift v0.20.0 // indirect
1617
github.com/goccy/go-json v0.10.3 // indirect
@@ -21,6 +22,7 @@ require (
2122
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
2223
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
2324
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
25+
github.com/pierrec/lz4/v4 v4.1.21 // indirect
2426
github.com/zeebo/xxh3 v1.0.2 // indirect
2527
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
2628
golang.org/x/mod v0.18.0 // indirect

src/Backend/opti-sql-go/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
2+
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
13
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
24
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
35
github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE=
Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,24 @@
11
package filter
22

3-
// handle Bitwise operations here as well
3+
import (
4+
"github.com/apache/arrow/go/v17/arrow"
5+
"github.com/apache/arrow/go/v17/arrow/array"
6+
)
47

5-
// OR
6-
// AND
7-
// NOT
8+
// FilterExpr takes in a field and column and yeildss a function that takes in an index and returns a bool indicating whether the row at that index satisfies the filter condition.
9+
type FilterExpr func(filed arrow.Field, col arrow.Array) func(i int) bool
10+
11+
// example
12+
func ExampleFilterExpr(field arrow.Field, col arrow.Array) func(i int) bool {
13+
{
14+
if field.Name == "age" && col.DataType().ID() == arrow.INT32 {
15+
return func(i int) bool {
16+
val := col.(*array.Int32).Value(i)
17+
return val > 30
18+
}
19+
}
20+
return func(i int) bool {
21+
return true
22+
}
23+
}
24+
}

src/Backend/opti-sql-go/operators/project/source/csv.go

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"github.com/apache/arrow/go/v17/arrow/array"
1414
)
1515

16-
type ProjectCSVLeaf struct {
16+
// TODO: change the leaf stuff to be called scans instead
17+
18+
type CSVSource struct {
1719
r *csv.Reader
1820
schema *arrow.Schema // columns to project as well as types to cast to
1921
colPosition map[string]int
@@ -22,9 +24,9 @@ type ProjectCSVLeaf struct {
2224
}
2325

2426
// assume everything is on disk for now
25-
func NewProjectCSVLeaf(source io.Reader) (*ProjectCSVLeaf, error) {
27+
func NewProjectCSVLeaf(source io.Reader) (*CSVSource, error) {
2628
r := csv.NewReader(source)
27-
proj := &ProjectCSVLeaf{
29+
proj := &CSVSource{
2830
r: r,
2931
colPosition: make(map[string]int),
3032
}
@@ -34,31 +36,32 @@ func NewProjectCSVLeaf(source io.Reader) (*ProjectCSVLeaf, error) {
3436
return proj, err
3537
}
3638

37-
func (pcsv *ProjectCSVLeaf) Next(n uint64) (*operators.RecordBatch, error) {
38-
if pcsv.done {
39+
func (csvS *CSVSource) Next(n uint64) (*operators.RecordBatch, error) {
40+
if csvS.done {
3941
return nil, io.EOF
4042
}
4143

4244
// 1. Create builders
43-
builders := pcsv.initBuilders()
45+
builders := csvS.initBuilders()
4446

4547
rowsRead := uint64(0)
4648

4749
// Process stored first row (from parseHeader) ---
48-
if pcsv.firstDataRow != nil && rowsRead < n {
49-
if err := pcsv.processRow(pcsv.firstDataRow, builders); err != nil {
50+
if csvS.firstDataRow != nil && rowsRead < n {
51+
fmt.Printf("First row: %v\n", csvS.firstDataRow)
52+
if err := csvS.processRow(csvS.firstDataRow, builders); err != nil {
5053
return nil, err
5154
}
52-
pcsv.firstDataRow = nil // consume it once
55+
csvS.firstDataRow = nil // consume it once
5356
rowsRead++
5457
}
5558

5659
// Stream remaining rows from CSV reader ---
5760
for rowsRead < n {
58-
row, err := pcsv.r.Read()
61+
row, err := csvS.r.Read()
5962
if err == io.EOF {
6063
if rowsRead == 0 {
61-
pcsv.done = true
64+
csvS.done = true
6265
return nil, io.EOF
6366
}
6467
break
@@ -68,24 +71,24 @@ func (pcsv *ProjectCSVLeaf) Next(n uint64) (*operators.RecordBatch, error) {
6871
}
6972

7073
// append to builders
71-
if err := pcsv.processRow(row, builders); err != nil {
74+
if err := csvS.processRow(row, builders); err != nil {
7275
return nil, err
7376
}
7477

7578
rowsRead++
7679
}
7780

7881
// Freeze into Arrow arrays
79-
columns := pcsv.finalizeBuilders(builders)
82+
columns := csvS.finalizeBuilders(builders)
8083

8184
return &operators.RecordBatch{
82-
Schema: pcsv.schema,
85+
Schema: csvS.schema,
8386
Columns: columns,
8487
}, nil
8588
}
8689

87-
func (pcsv *ProjectCSVLeaf) initBuilders() []array.Builder {
88-
fields := pcsv.schema.Fields()
90+
func (csvS *CSVSource) initBuilders() []array.Builder {
91+
fields := csvS.schema.Fields()
8992
builders := make([]array.Builder, len(fields))
9093

9194
for i, f := range fields {
@@ -94,14 +97,14 @@ func (pcsv *ProjectCSVLeaf) initBuilders() []array.Builder {
9497

9598
return builders
9699
}
97-
func (pcsv *ProjectCSVLeaf) processRow(
100+
func (csvS *CSVSource) processRow(
98101
content []string,
99102
builders []array.Builder,
100103
) error {
101-
fields := pcsv.schema.Fields()
102-
104+
fields := csvS.schema.Fields()
105+
fmt.Printf("content : %v\n", content)
103106
for i, f := range fields {
104-
colIdx := pcsv.colPosition[f.Name]
107+
colIdx := csvS.colPosition[f.Name]
105108
cell := content[colIdx]
106109

107110
switch b := builders[i].(type) {
@@ -143,7 +146,7 @@ func (pcsv *ProjectCSVLeaf) processRow(
143146

144147
return nil
145148
}
146-
func (pcsv *ProjectCSVLeaf) finalizeBuilders(builders []array.Builder) []arrow.Array {
149+
func (csvS *CSVSource) finalizeBuilders(builders []array.Builder) []arrow.Array {
147150
columns := make([]arrow.Array, len(builders))
148151

149152
for i, b := range builders {
@@ -155,16 +158,16 @@ func (pcsv *ProjectCSVLeaf) finalizeBuilders(builders []array.Builder) []arrow.A
155158
}
156159

157160
// first call to csv.Reader
158-
func (pscv *ProjectCSVLeaf) parseHeader() (*arrow.Schema, error) {
159-
header, err := pscv.r.Read()
161+
func (csvS *CSVSource) parseHeader() (*arrow.Schema, error) {
162+
header, err := csvS.r.Read()
160163
if err != nil {
161164
return nil, err
162165
}
163-
firstDataRow, err := pscv.r.Read()
166+
firstDataRow, err := csvS.r.Read()
164167
if err != nil {
165168
return nil, err
166169
}
167-
pscv.firstDataRow = firstDataRow
170+
csvS.firstDataRow = firstDataRow
168171
newFields := make([]arrow.Field, 0, len(header))
169172
for i, colName := range header {
170173
sampleValue := firstDataRow[i]
@@ -173,7 +176,7 @@ func (pscv *ProjectCSVLeaf) parseHeader() (*arrow.Schema, error) {
173176
Type: parseDataType(sampleValue),
174177
Nullable: true,
175178
})
176-
pscv.colPosition[colName] = i
179+
csvS.colPosition[colName] = i
177180
}
178181
return arrow.NewSchema(newFields, nil), nil
179182
}

src/Backend/opti-sql-go/operators/project/source/csv_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/apache/arrow/go/v17/arrow"
1111
"github.com/apache/arrow/go/v17/arrow/array"
12+
"github.com/apache/arrow/go/v17/arrow/memory"
1213
)
1314

1415
const csvFilePath = "../../../../test_data/csv/Mental_Health_and_Social_Media_Balance_Dataset.csv"
@@ -943,6 +944,29 @@ func TestIntegrationWithRealFile(t *testing.T) {
943944
}
944945
})
945946
}
947+
func TestProccessFirstLine(t *testing.T) {
948+
v := getTestFile()
949+
p, err := NewProjectCSVLeaf(v)
950+
if err != nil {
951+
t.Errorf("Failed to create ProjectCSVLeaf: %v", err)
952+
}
953+
defer func() {
954+
if err := v.Close(); err != nil {
955+
t.Fatalf("failed to close: %v", err)
956+
}
957+
}()
958+
var builders []array.Builder
959+
for range len(p.schema.Fields()) {
960+
builder := array.NewBuilder(memory.DefaultAllocator, &arrow.Date64Type{})
961+
defer builder.Release()
962+
builders = append(builders, builder)
963+
}
964+
err = p.processRow([]string{"1", "alice", "95.5", "true"}, builders)
965+
if err == nil {
966+
t.Errorf("Expected error for empty row, got nil")
967+
}
968+
969+
}
946970

947971
/*
948972
func TestLargercsvFile(t *testing.T) {

src/Backend/opti-sql-go/operators/project/source/custom.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ var (
2222
}
2323
)
2424

25-
type InMemoryProjectExec struct {
25+
type InMemorySource struct {
2626
schema *arrow.Schema
2727
columns []arrow.Array
2828
pos uint64
2929
fieldToColIDx map[string]int
3030
}
3131

32-
func NewInMemoryProjectExec(names []string, columns []any) (*InMemoryProjectExec, error) {
32+
func NewInMemoryProjectExec(names []string, columns []any) (*InMemorySource, error) {
3333
if len(names) != len(columns) {
3434
return nil, operators.ErrInvalidSchema("number of column names and columns do not match")
3535
}
@@ -49,51 +49,50 @@ func NewInMemoryProjectExec(names []string, columns []any) (*InMemoryProjectExec
4949
arrays = append(arrays, arr)
5050
fieldToColIDx[field.Name] = i
5151
}
52-
return &InMemoryProjectExec{
52+
return &InMemorySource{
5353
schema: arrow.NewSchema(fields, nil),
5454
columns: arrays,
5555
fieldToColIDx: fieldToColIDx,
5656
}, nil
5757
}
58-
func (ime *InMemoryProjectExec) withFields(names ...string) error {
58+
// withFields narrows the source down to the named columns, rebuilding the
// schema, the column slice, and the name-to-index lookup accordingly.
func (ms *InMemorySource) withFields(names ...string) error {
	schema, columns, err := project.ProjectSchemaFilterDown(ms.schema, ms.columns, names...)
	if err != nil {
		return err
	}
	// Rebuild the field-name -> column-index map for the narrowed schema.
	index := make(map[string]int, len(schema.Fields()))
	for i, field := range schema.Fields() {
		index[field.Name] = i
	}
	ms.schema = schema
	ms.columns = columns
	ms.fieldToColIDx = index
	return nil
}
74-
func (ime *InMemoryProjectExec) Next(n uint64) (*operators.RecordBatch, error) {
75-
if ime.pos >= uint64(ime.columns[0].Len()) {
73+
func (ms *InMemorySource) Next(n uint64) (*operators.RecordBatch, error) {
74+
if ms.pos >= uint64(ms.columns[0].Len()) {
7675
return nil, io.EOF // EOF
7776
}
7877
var currRows uint64 = 0
79-
outPutCols := make([]arrow.Array, len(ime.schema.Fields()))
78+
outPutCols := make([]arrow.Array, len(ms.schema.Fields()))
8079

81-
for i, field := range ime.schema.Fields() {
82-
col := ime.columns[ime.fieldToColIDx[field.Name]]
80+
for i, field := range ms.schema.Fields() {
81+
col := ms.columns[ms.fieldToColIDx[field.Name]]
8382
colLen := uint64(col.Len())
84-
remaining := colLen - ime.pos
83+
remaining := colLen - ms.pos
8584
toRead := n
8685
if remaining < n {
8786
toRead = remaining
8887
}
89-
slice := array.NewSlice(col, int64(ime.pos), int64(ime.pos+toRead))
88+
slice := array.NewSlice(col, int64(ms.pos), int64(ms.pos+toRead))
9089
outPutCols[i] = slice
9190
currRows = toRead
9291
}
93-
ime.pos += currRows
92+
ms.pos += currRows
9493

9594
return &operators.RecordBatch{
96-
Schema: ime.schema,
95+
Schema: ms.schema,
9796
Columns: outPutCols,
9897
}, nil
9998
}

0 commit comments

Comments
 (0)