Skip to content

Commit ab9efdf

Browse files
committed
[WIP - push over me]
1 parent 669d576 commit ab9efdf

File tree

27 files changed

+730
-155
lines changed

27 files changed

+730
-155
lines changed

internal/execution/queries/nodes/access.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88

99
type AccessStrategy interface {
1010
Serialize(w serialization.IndentWriter)
11-
Filter() impls.Expression
12-
Ordering() impls.OrderExpression
1311
Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error)
1412
}
1513

internal/execution/queries/nodes/access/strategy_index.go

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package access
33
import (
44
"fmt"
55

6-
"github.com/efritz/gostgres/internal/execution/expressions"
76
"github.com/efritz/gostgres/internal/execution/queries/nodes"
87
"github.com/efritz/gostgres/internal/execution/serialization"
98
"github.com/efritz/gostgres/internal/shared/impls"
@@ -12,38 +11,29 @@ import (
1211
)
1312

1413
type indexAccessStrategy[O impls.ScanOptions] struct {
15-
table impls.Table
16-
index impls.Index[O]
17-
opts O
14+
table impls.Table
15+
index impls.Index[O]
16+
opts O
17+
condition impls.Expression
1818
}
1919

20-
func NewIndexAccessStrategy[O impls.ScanOptions](table impls.Table, index impls.Index[O], opts O) nodes.AccessStrategy {
20+
func NewIndexAccessStrategy[O impls.ScanOptions](table impls.Table, index impls.Index[O], opts O, condition impls.Expression) nodes.AccessStrategy {
2121
return &indexAccessStrategy[O]{
22-
table: table,
23-
index: index,
24-
opts: opts,
22+
table: table,
23+
index: index,
24+
opts: opts,
25+
condition: condition,
2526
}
2627
}
2728

2829
func (s *indexAccessStrategy[ScanOptions]) Serialize(w serialization.IndentWriter) {
2930
w.WritefLine(s.index.Description(s.opts))
3031

31-
if filter := s.Filter(); filter != nil {
32-
w.Indent().WritefLine("index cond: %s", filter)
32+
if s.condition != nil {
33+
w.Indent().WritefLine("index cond: %s", s.condition)
3334
}
3435
}
3536

36-
func (s *indexAccessStrategy[ScanOptions]) Filter() impls.Expression {
37-
filterExpression := s.index.Filter()
38-
condition := s.index.Condition(s.opts)
39-
40-
return expressions.UnionFilters(append(expressions.Conjunctions(filterExpression), expressions.Conjunctions(condition)...)...)
41-
}
42-
43-
func (s *indexAccessStrategy[ScanOptions]) Ordering() impls.OrderExpression {
44-
return s.index.Ordering(s.opts)
45-
}
46-
4737
func (s *indexAccessStrategy[ScanOptions]) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error) {
4838
ctx.Log("Building Index Access scanner Strategy")
4939

internal/execution/queries/nodes/access/strategy_table.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,6 @@ func (s *tableAccessStrategy) Serialize(w serialization.IndentWriter) {
2222
w.WritefLine("table scan of %s", s.table.Name())
2323
}
2424

25-
func (s *tableAccessStrategy) Filter() impls.Expression {
26-
return nil
27-
}
28-
29-
func (s *tableAccessStrategy) Ordering() impls.OrderExpression {
30-
return nil
31-
}
32-
3325
func (s *tableAccessStrategy) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error) {
3426
ctx.Log("Building Table Access Strategy scanner")
3527

internal/execution/queries/nodes/combination/append.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ func (n *appendNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error
3838
return nil, err
3939
}
4040

41-
var rightScanner scan.RowScanner
41+
rightScanner, err := n.right.Scanner(ctx)
42+
if err != nil {
43+
return nil, err
44+
}
4245

4346
return scan.RowScannerFunc(func() (rows.Row, error) {
4447
ctx.Log("Scanning Append")
@@ -57,20 +60,11 @@ func (n *appendNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error
5760
return row, nil
5861
}
5962

60-
if rightScanner == nil {
61-
rightScanner, err = n.right.Scanner(ctx)
62-
if err != nil {
63-
return rows.Row{}, err
64-
}
63+
row, err := rightScanner.Scan()
64+
if err != nil {
65+
return rows.Row{}, err
6566
}
6667

67-
for {
68-
row, err := rightScanner.Scan()
69-
if err != nil {
70-
return rows.Row{}, err
71-
}
72-
73-
return rows.NewRow(n.fields, row.Values)
74-
}
68+
return rows.NewRow(n.fields, row.Values)
7569
}), nil
7670
}

internal/execution/queries/nodes/combination/except.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (n *exceptNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error
6464
rc, ok := rowsToIgnore[key]
6565
if !ok {
6666
if n.distinct {
67-
// This is an "EXCEPT DISTINCT" queryh, and we need to track the set of rows
67+
// This is an "EXCEPT DISTINCT" query, and we need to track the set of rows
6868
// we emit so we don't emit a duplicate later. We can reuse the same list.
6969
// The value of the count here is not relevant, as we only adjust and check
7070
// this value for "EXCEPT ALL" queries.

internal/execution/queries/nodes/combination/union.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ func (n *unionNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error)
5050
return nil, err
5151
}
5252

53-
var rightScanner scan.RowScanner
53+
rightScanner, err := n.right.Scanner(ctx)
54+
if err != nil {
55+
return nil, err
56+
}
5457

5558
return scan.RowScannerFunc(func() (rows.Row, error) {
5659
ctx.Log("Scanning Union")
@@ -65,28 +68,23 @@ func (n *unionNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error)
6568

6669
return rows.Row{}, err
6770
}
68-
69-
if mark(row) {
70-
return row, nil
71+
if !mark(row) {
72+
continue
7173
}
72-
}
7374

74-
if rightScanner == nil {
75-
rightScanner, err = n.right.Scanner(ctx)
76-
if err != nil {
77-
return rows.Row{}, err
78-
}
75+
return row, nil
7976
}
8077

8178
for {
8279
row, err := rightScanner.Scan()
8380
if err != nil {
8481
return rows.Row{}, err
8582
}
86-
87-
if mark(row) {
88-
return rows.NewRow(n.fields, row.Values)
83+
if !mark(row) {
84+
continue
8985
}
86+
87+
return rows.NewRow(n.fields, row.Values)
9088
}
9189
}), nil
9290
}
Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package mutation
22

33
import (
4+
"github.com/efritz/gostgres/internal/execution/projection"
45
"github.com/efritz/gostgres/internal/execution/queries/nodes"
6+
projectionHelpers "github.com/efritz/gostgres/internal/execution/queries/nodes/projection"
57
"github.com/efritz/gostgres/internal/execution/serialization"
68
"github.com/efritz/gostgres/internal/shared/impls"
79
"github.com/efritz/gostgres/internal/shared/rows"
@@ -10,37 +12,66 @@ import (
1012

1113
type deleteNode struct {
1214
nodes.Node
13-
table impls.Table
14-
aliasName string
15+
table impls.Table
16+
aliasName string
17+
projection *projection.Projection
1518
}
1619

17-
func NewDelete(node nodes.Node, table impls.Table, aliasName string) nodes.Node {
20+
func NewDelete(node nodes.Node, table impls.Table, aliasName string, projection *projection.Projection) nodes.Node {
1821
return &deleteNode{
19-
Node: node,
20-
table: table,
21-
aliasName: aliasName,
22+
Node: node,
23+
table: table,
24+
aliasName: aliasName,
25+
projection: projection,
2226
}
2327
}
2428

2529
func (n *deleteNode) Serialize(w serialization.IndentWriter) {
2630
w.WritefLine("delete from %s", n.table.Name())
2731
n.Node.Serialize(w.Indent())
32+
33+
if n.projection != nil {
34+
w.WritefLine("returning %s", n.projection)
35+
n.Node.Serialize(w.Indent())
36+
}
2837
}
2938

3039
func (n *deleteNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error) {
3140
ctx.Log("Building Delete scanner")
3241

33-
scanner, err := n.Node.Scanner(ctx)
42+
deletedRows, err := n.deleteRows(ctx)
3443
if err != nil {
3544
return nil, err
3645
}
3746

3847
return scan.RowScannerFunc(func() (rows.Row, error) {
3948
ctx.Log("Scanning Delete")
4049

50+
if len(deletedRows) != 0 {
51+
return rows.Row{}, scan.ErrNoRows
52+
}
53+
54+
row := deletedRows[0]
55+
deletedRows = deletedRows[1:]
56+
return projectionHelpers.Project(ctx, row, n.projection)
57+
}), nil
58+
}
59+
60+
func (n *deleteNode) deleteRows(ctx impls.ExecutionContext) ([]rows.Row, error) {
61+
scanner, err := n.Node.Scanner(ctx)
62+
if err != nil {
63+
return nil, err
64+
}
65+
66+
var deletedRows []rows.Row
67+
for {
4168
row, err := scanner.Scan()
4269
if err != nil {
43-
return rows.Row{}, err
70+
if err == scan.ErrNoRows {
71+
break
72+
}
73+
74+
return nil, err
4475
}
4576

4677
relationName := n.table.Name()
@@ -50,17 +81,21 @@ func (n *deleteNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error
5081

5182
tidRow, err := row.IsolateTID(relationName)
5283
if err != nil {
53-
return rows.Row{}, err
84+
return nil, err
5485
}
5586

5687
deletedRow, ok, err := n.table.Delete(tidRow)
5788
if err != nil {
58-
return rows.Row{}, err
89+
return nil, err
5990
}
6091
if !ok {
61-
return rows.Row{}, scan.ErrNoRows
92+
continue
6293
}
6394

64-
return deletedRow, nil
65-
}), nil
95+
if n.projection != nil {
96+
deletedRows = append(deletedRows, deletedRow)
97+
}
98+
}
99+
100+
return deletedRows, nil
66101
}

internal/execution/queries/nodes/mutation/insert.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package mutation
33
import (
44
"fmt"
55

6+
"github.com/efritz/gostgres/internal/execution/projection"
67
"github.com/efritz/gostgres/internal/execution/queries/nodes"
8+
projectionHelpers "github.com/efritz/gostgres/internal/execution/queries/nodes/projection"
79
"github.com/efritz/gostgres/internal/execution/serialization"
810
"github.com/efritz/gostgres/internal/shared/fields"
911
"github.com/efritz/gostgres/internal/shared/impls"
@@ -15,24 +17,50 @@ type insertNode struct {
1517
nodes.Node
1618
table impls.Table
1719
columnNames []string
20+
projection *projection.Projection
1821
}
1922

20-
func NewInsert(node nodes.Node, table impls.Table, columnNames []string) nodes.Node {
23+
func NewInsert(node nodes.Node, table impls.Table, columnNames []string, projection *projection.Projection) nodes.Node {
2124
return &insertNode{
2225
Node: node,
2326
table: table,
2427
columnNames: columnNames,
28+
projection: projection,
2529
}
2630
}
2731

2832
func (n *insertNode) Serialize(w serialization.IndentWriter) {
2933
w.WritefLine("insert into %s", n.table.Name())
3034
n.Node.Serialize(w.Indent())
35+
36+
if n.projection != nil {
37+
w.WritefLine("returning %s", n.projection)
38+
n.Node.Serialize(w.Indent())
39+
}
3140
}
3241

3342
func (n *insertNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error) {
3443
ctx.Log("Building Insert scanner")
3544

45+
insertedRows, err := n.insertRows(ctx)
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
return scan.RowScannerFunc(func() (rows.Row, error) {
51+
ctx.Log("Scanning Insert")
52+
53+
if len(insertedRows) != 0 {
54+
return rows.Row{}, scan.ErrNoRows
55+
}
56+
57+
row := insertedRows[0]
58+
insertedRows = insertedRows[1:]
59+
return projectionHelpers.Project(ctx, row, n.projection)
60+
}), nil
61+
}
62+
63+
func (n *insertNode) insertRows(ctx impls.ExecutionContext) ([]rows.Row, error) {
3664
scanner, err := n.Node.Scanner(ctx)
3765
if err != nil {
3866
return nil, err
@@ -50,31 +78,38 @@ func (n *insertNode) Scanner(ctx impls.ExecutionContext) (scan.RowScanner, error
5078
fields = append(fields, field.Field)
5179
}
5280

53-
return scan.RowScannerFunc(func() (rows.Row, error) {
54-
ctx.Log("Scanning Insert")
55-
81+
var insertedRows []rows.Row
82+
for {
5683
row, err := scanner.Scan()
5784
if err != nil {
58-
return rows.Row{}, err
85+
if err == scan.ErrNoRows {
86+
break
87+
}
88+
89+
return nil, err
5990
}
6091

6192
values, err := n.prepareValuesForRow(ctx, row, nonInternalFields)
6293
if err != nil {
63-
return rows.Row{}, err
94+
return nil, err
6495
}
6596

6697
insertedRow, err := rows.NewRow(fields, values)
6798
if err != nil {
68-
return rows.Row{}, err
99+
return nil, err
69100
}
70101

71102
insertedRow, err = n.table.Insert(ctx, insertedRow)
72103
if err != nil {
73-
return rows.Row{}, err
104+
return nil, err
74105
}
75106

76-
return insertedRow, nil
77-
}), nil
107+
if n.projection != nil {
108+
insertedRows = append(insertedRows, insertedRow)
109+
}
110+
}
111+
112+
return insertedRows, nil
78113
}
79114

80115
func (n *insertNode) prepareValuesForRow(ctx impls.ExecutionContext, row rows.Row, fields []impls.TableField) ([]any, error) {

0 commit comments

Comments
 (0)