Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ start-kafka:
-e KAFKA_CLIENT_PASSWORDS=adminpassword \
-e KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN \
-e KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN \
bitnami/kafka:latest
bitnamilegacy/kafka:4.0.0-debian-12-r10
printf '%s\n' \
'security.protocol=SASL_PLAINTEXT' \
'sasl.mechanism=PLAIN' \
Expand Down
49 changes: 0 additions & 49 deletions engine/core/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,55 +96,6 @@ func (l *LookupVertsLabelIndex) Process(ctx context.Context, man gdbi.Manager, i

////////////////////////////////////////////////////////////////////////////////

// LookupEdges starts query by looking up edges
type LookupEdges struct {
db gdbi.GraphInterface
ids []string
loadData bool
}

// Process runs LookupEdges
func (l *LookupEdges) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, out gdbi.OutPipe) context.Context {
go func() {
defer close(out)
for t := range in {
if t.IsSignal() {
out <- t
continue
}
if len(l.ids) == 0 {
for v := range l.db.GetEdgeList(ctx, l.loadData) {
out <- t.AddCurrent(&gdbi.DataElement{
ID: v.ID,
Label: v.Label,
From: v.From,
To: v.To,
Data: v.Data,
Loaded: v.Loaded,
})
}
} else {
for _, i := range l.ids {
v := l.db.GetEdge(i, l.loadData)
if v != nil {
out <- t.AddCurrent(&gdbi.DataElement{
ID: v.ID,
Label: v.Label,
From: v.From,
To: v.To,
Data: v.Data,
Loaded: v.Loaded,
})
}
}
}
}
}()
return ctx
}

////////////////////////////////////////////////////////////////////////////////

// Fields selects fields from current element
type Fields struct {
keys []string
Expand Down
76 changes: 1 addition & 75 deletions existing-sql/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,80 +214,6 @@ func (g *Graph) VertexLabelScan(ctx context.Context, label string) chan string {
return o
}

// GetEdgeList produces a channel of all edges in the graph
func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge {
o := make(chan *gdbi.Edge, 100)
go func() {
defer close(o)
for _, edgeSchema := range g.schema.Edges {
q := ""
switch edgeSchema.Table {
case "":
q = fmt.Sprintf("SELECT %s.%s, %s.%s FROM %s INNER JOIN %s ON %s.%s=%s.%s",
// SELECT
edgeSchema.From.DestTable, g.schema.GetVertexGid(edgeSchema.From.DestTable),
edgeSchema.To.DestTable, g.schema.GetVertexGid(edgeSchema.To.DestTable),
// FROM
edgeSchema.From.DestTable,
// INNER JOIN
edgeSchema.To.DestTable,
// ON
edgeSchema.From.DestTable, edgeSchema.From.DestField,
edgeSchema.To.DestTable, edgeSchema.To.DestField,
)
rows, err := g.db.QueryxContext(ctx, q)
if err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: QueryxContext")
return
}
defer rows.Close()
for rows.Next() {
var fromGid, toGid string
if err := rows.Scan(&fromGid, &toGid); err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: Scan")
return
}
geid := &generatedEdgeID{edgeSchema.Label, edgeSchema.From.DestTable, fromGid, edgeSchema.To.DestTable, toGid}
edge := geid.Edge()
o <- gdbi.NewElementFromEdge(edge)
}
if err := rows.Err(); err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: iterating")
return
}

default:
q = fmt.Sprintf("SELECT * FROM %s", edgeSchema.Table)
rows, err := g.db.QueryxContext(ctx, q)
if err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: QueryxContext")
return
}
types, err := columnTypeMap(rows)
if err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: columnTypeMap")
return
}

defer rows.Close()
for rows.Next() {
data := make(map[string]interface{})
if err := rows.MapScan(data); err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: MapScan")
return
}
o <- gdbi.NewElementFromEdge(rowDataToEdge(edgeSchema, data, types, load))
}
if err := rows.Err(); err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: iterating")
return
}
}
}
}()
return o
}

// GetVertexChannel is passed a channel of vertex ids and it produces a channel
// of vertices
func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementLookup, load bool) chan gdbi.ElementLookup {
Expand Down Expand Up @@ -331,7 +257,7 @@ func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementL
}
defer rows.Close()
for rows.Next() {
data := make(map[string]interface{})
data := make(map[string]any)
if err := rows.MapScan(data); err != nil {
log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: MapScan")
return
Expand Down
1 change: 0 additions & 1 deletion gdbi/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ type GraphInterface interface {
GetVertexIndexList() <-chan *gripql.IndexID

GetVertexList(ctx context.Context, load bool) <-chan *Vertex
GetEdgeList(ctx context.Context, load bool) <-chan *Edge

GetVertexChannel(ctx context.Context, req chan ElementLookup, load bool) chan ElementLookup
GetOutChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup
Expand Down
14 changes: 6 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
module github.com/bmeg/grip

go 1.24

toolchain go1.24.2
go 1.24.2

require (
github.com/IBM/sarama v1.45.1
github.com/Shopify/sarama v1.38.1
github.com/Workiva/go-datastructures v1.1.5
github.com/akrylysov/pogreb v0.10.2
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10
github.com/bmeg/benchtop v0.0.0-20250827195345-9810354883b9
github.com/bmeg/benchtop v0.0.0-20251027212658-046a256eb6fa
github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad
github.com/bmeg/jsonschema/v6 v6.0.4
github.com/bmeg/jsonschemagraph v0.0.4-0.20250828230703-257ca9afd85a
github.com/bmeg/jsonschemagraph v0.0.4-0.20251017205345-236d2de9887c
github.com/boltdb/bolt v1.3.1
github.com/bytedance/sonic v1.14.0
github.com/casbin/casbin/v2 v2.97.0
github.com/cockroachdb/pebble v1.1.2
github.com/cockroachdb/pebble v1.1.5
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/badger/v2 v2.2007.4
github.com/dop251/goja v0.0.0-20240707163329-b1681fb2a2f5
Expand Down Expand Up @@ -63,7 +61,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 // indirect
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e // indirect
github.com/DataDog/zstd v1.5.7 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/casbin/govaluate v1.2.0 // indirect
Expand Down Expand Up @@ -106,7 +104,7 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jessevdk/go-flags v1.6.1 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mo
github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 h1:kYRSnvJju5gYVyhkij+RTJ/VR6QIUaCfWeaFm2ycsjQ=
github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE=
github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/IBM/sarama v1.45.1 h1:nY30XqYpqyXOXSNoe2XCgjj9jklGM1Ye94ierUb1jQ0=
github.com/IBM/sarama v1.45.1/go.mod h1:qifDhA3VWSrQ1TjSMyxDl3nYL3oX2C83u+G6L79sq4w=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
Expand All @@ -35,14 +35,14 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmeg/benchtop v0.0.0-20250827195345-9810354883b9 h1:sIgPwNZKv3pSuIm/hngszbBrg3pKlZcyJ+HTzeFyjNA=
github.com/bmeg/benchtop v0.0.0-20250827195345-9810354883b9/go.mod h1:Jy39KqCHrPeU9J3SEAdVnZ5dxE6VZm8tX899z5n6ud8=
github.com/bmeg/benchtop v0.0.0-20251027212658-046a256eb6fa h1:8gqN6aRKHYkAQGXr8bdOquCl6gzn42jl31aUtznYJlY=
github.com/bmeg/benchtop v0.0.0-20251027212658-046a256eb6fa/go.mod h1:mKIXKgNg/q55XrsWKAeWBI9aeSV9yep6tdqaZYHkDcw=
github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad h1:ICgBexeLB7iv/IQz4rsP+MimOXFZUwWSPojEypuOaQ8=
github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad/go.mod h1:ft96Irkp72C7ZrUWRenG7LrF0NKMxXdRvsypo5Njhm4=
github.com/bmeg/jsonschema/v6 v6.0.4 h1:AXFAz7G05VZkKretSSU+uacMKF8+C16ONG6pzFzzA7E=
github.com/bmeg/jsonschema/v6 v6.0.4/go.mod h1:gTh32doM+BEZyi/TDPJEp8k3qXTckXY4ohptV2xExQY=
github.com/bmeg/jsonschemagraph v0.0.4-0.20250828230703-257ca9afd85a h1:O0JcMLcazrwVzf8iC/RogUen4CG5UVErrBU76UkxhYQ=
github.com/bmeg/jsonschemagraph v0.0.4-0.20250828230703-257ca9afd85a/go.mod h1:rlek2WcKAhnynqE7NJi8U+RDbUkRFr8Kqpb2SDmcW94=
github.com/bmeg/jsonschemagraph v0.0.4-0.20251017205345-236d2de9887c h1:J1EhcEmL1D/YHxoMIw4HeeVv+hRMUFezNlAAlFX/a8M=
github.com/bmeg/jsonschemagraph v0.0.4-0.20251017205345-236d2de9887c/go.mod h1:Ve7jAQhYAMkHUiko99+2CwqXI4Ur0ty/ai8Tfa2ONz4=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
Expand Down Expand Up @@ -75,8 +75,8 @@ github.com/cockroachdb/fifo v0.0.0-20240616162244-4768e80dfb9a h1:f52TdbU4D5nozM
github.com/cockroachdb/fifo v0.0.0-20240616162244-4768e80dfb9a/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/pebble v1.1.2 h1:CUh2IPtR4swHlEj48Rhfzw6l/d0qA31fItcIszQVIsA=
github.com/cockroachdb/pebble v1.1.2/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU=
github.com/cockroachdb/pebble v1.1.5 h1:5AAWCBWbat0uE0blr8qzufZP5tBjkRyy/jWe1QWLnvw=
github.com/cockroachdb/pebble v1.1.5/go.mod h1:17wO9el1YEigxkP/YtV8NtCivQDgoCyBg5c4VR/eOWo=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
Expand Down Expand Up @@ -247,8 +247,8 @@ github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6/go.mod h1:3VeW
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
Expand Down
2 changes: 1 addition & 1 deletion googleapis
Submodule googleapis updated 10310 files
71 changes: 35 additions & 36 deletions grids/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package grids

import (
bFilters "github.com/bmeg/benchtop/filters"
"github.com/bmeg/benchtop/jsontable"
"github.com/bmeg/benchtop/jsontable/table"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
"github.com/bytedance/sonic"
Expand All @@ -21,8 +21,8 @@ func (f *GripQLFilter) IsNoOp() bool {
return f.Expression == nil
}

func (f *GripQLFilter) Matches(row any) bool {
return MatchesHasExpression(row, f.Expression)
func (f *GripQLFilter) Matches(row []byte, tableName string) bool {
return MatchesHasExpression(row, f.Expression, tableName)
}

func (f *GripQLFilter) RequiredFields() []string {
Expand Down Expand Up @@ -61,26 +61,19 @@ func extractKeys(expr *gripql.HasExpression) []string {
}
return out
}
func MatchesHasExpression(val any, stmt *gripql.HasExpression) bool {

func MatchesHasExpression(row []byte, stmt *gripql.HasExpression, tableName string) bool {
switch stmt.Expression.(type) {
case *gripql.HasExpression_Condition:
cond := stmt.GetCondition()
var lookupVal any

// Handle lookup based on input type
switch v := val.(type) {
case map[string]any:
lookupVal = jsontable.PathLookup(v, cond.Key)
case []byte:
pathArr, err := jsontable.ConvertJSONPathToArray(cond.Key)
if err != nil {
log.Errorf("Error converting JSON path: %v", err)
return false
}
node, err := sonic.Get(v, pathArr...)
if cond.Key == "_label" {
lookupVal = tableName[2:]
} else if cond.Key == "_id" {
node, err := sonic.Get(row, []any{"1"}...)
if err != nil {
if err != ast.ErrNotExist {
log.Errorf("Sonic Fetch err for path: %s on doc %#v: %v", pathArr, string(v), err)
log.Errorf("Sonic Fetch err for path 1 on doc %#v: %v", string(row), err)
}
return false
}
Expand All @@ -89,9 +82,26 @@ func MatchesHasExpression(val any, stmt *gripql.HasExpression) bool {
log.Errorf("Error unmarshaling node: %v", err)
return false
}
default:
log.Errorf("Unsupported input type: %T", val)
return false
} else {
pathArr, err := table.ConvertJSONPathToArray(cond.Key)
if err != nil {
log.Errorf("Error converting JSON path: %v", err)
return false
}
node, err := sonic.Get(row, pathArr...)
if err != nil {
if err != ast.ErrNotExist {
log.Errorf("Sonic Fetch err for path: %s on doc %#v: %v", pathArr, string(row), err)
return false
}
lookupVal = nil
} else {
lookupVal, err = node.Interface()
if err != nil {
log.Errorf("Error unmarshaling node: %v", err)
return false
}
}
}

return bFilters.ApplyFilterCondition(
Expand All @@ -104,34 +114,23 @@ func MatchesHasExpression(val any, stmt *gripql.HasExpression) bool {
)

case *gripql.HasExpression_And:
and := stmt.GetAnd()
andRes := []bool{}
for _, e := range and.Expressions {
andRes = append(andRes, MatchesHasExpression(val, e))
}
for _, r := range andRes {
if !r {
for _, e := range stmt.GetAnd().Expressions {
if !MatchesHasExpression(row, e, tableName) {
return false
}
}
return true

case *gripql.HasExpression_Or:
or := stmt.GetOr()
orRes := []bool{}
for _, e := range or.Expressions {
orRes = append(orRes, MatchesHasExpression(val, e))
}
for _, r := range orRes {
if r {
for _, e := range stmt.GetOr().Expressions {
if MatchesHasExpression(row, e, tableName) {
return true
}
}
return false

case *gripql.HasExpression_Not:
e := stmt.GetNot()
return !MatchesHasExpression(val, e)
return !MatchesHasExpression(row, stmt.GetNot(), tableName)

default:
log.Errorf("unknown where expression type: %T", stmt.Expression)
Expand Down
Loading
Loading