diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 4f10eefc..70af8f46 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -147,6 +147,15 @@ func TestConcurrentAppendAndQuery(t *testing.T) { readTest(t, sealed, numReaders, numQueries, docs, fromTime, toTime, mapping) } +const ( + scheduler = "scheduler" + database = "database" + bus = "bus" + proxy = "proxy" + gateway = "gateway" + kafka = "kafka" +) + func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs []*testDoc, fromTime, toTime time.Time, mapping seq.Mapping) { readersGroup, ctx := errgroup.WithContext(t.Context()) @@ -173,7 +182,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs case 1: query = "service:gateway" filter = func(doc *testDoc) bool { - return doc.service == "gateway" + return doc.service == gateway } case 2: query = "level:2" @@ -198,7 +207,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs case 6: query = "service:gateway AND level:3" filter = func(doc *testDoc) bool { - return doc.service == "gateway" && doc.level == 3 + return doc.service == gateway && doc.level == 3 } } @@ -265,7 +274,7 @@ type testDoc = struct { } func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time.Time, time.Time) { - services := []string{"gateway", "proxy", "scheduler", "database", "bus", "kafka"} + services := []string{gateway, proxy, scheduler, database, bus, kafka} messages := []string{ "request started", "request completed", "processing timed out", "processing data", "processing failed", "processing retry", diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8252867a..cc42bca0 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -740,14 +740,14 @@ func (s *FractionTestSuite) TestBasicAggregation() { "message:*", withAggQuery(processor.AggQuery{GroupBy: aggField("service")})), []map[string]uint64{ - 
{"gateway": 3, "proxy": 2, "scheduler": 1}, + {gateway: 3, proxy: 2, scheduler: 1}, }) assertAggSearch( s.query( "message:good", withAggQuery(processor.AggQuery{GroupBy: aggField("service")})), []map[string]uint64{ - {"gateway": 2, "proxy": 1}, + {gateway: 2, proxy: 1}, }) assertAggSearch( s.query( @@ -762,7 +762,7 @@ func (s *FractionTestSuite) TestBasicAggregation() { withAggQuery(processor.AggQuery{GroupBy: aggField("service")}), withAggQuery(processor.AggQuery{GroupBy: aggField("level")})), []map[string]uint64{ - {"gateway": 3, "proxy": 2, "scheduler": 1}, + {gateway: 3, proxy: 2, scheduler: 1}, {"1": 4, "2": 1, "3": 1}, }) } @@ -1222,17 +1222,31 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: toTime, }, + { + name: "NOT service:bus", + query: "NOT service:bus", + filter: func(doc *testDoc) bool { return doc.service != bus }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "NOT service:bus (time range)", + query: "NOT service:bus", + filter: func(doc *testDoc) bool { return doc.service != bus }, + fromTime: fromTime, + toTime: midTime, + }, { name: "service:proxy (time range)", query: "service:proxy", - filter: func(doc *testDoc) bool { return doc.service == "proxy" }, + filter: func(doc *testDoc) bool { return doc.service == proxy }, fromTime: fromTime, toTime: midTime, }, { name: "service:scheduler (time range + limit)", query: "service:scheduler", - filter: func(doc *testDoc) bool { return doc.service == "scheduler" }, + filter: func(doc *testDoc) bool { return doc.service == scheduler }, fromTime: fromTime, toTime: midTime, limit: 100, @@ -1272,6 +1286,13 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: midTime, }, + { + name: "NOT trace_id:trace-4999", + query: "NOT trace_id:trace-4999", + filter: func(doc *testDoc) bool { return doc.traceId != "trace-4999" }, + fromTime: fromTime, + toTime: toTime, + }, { name: "trace_id:trace-4999", query: "trace_id:trace-4999", @@ -1286,6 
+1307,97 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: midTime, }, + // AND operator queries + { + name: "message:request AND message:failed", + query: "message:request AND message:failed", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.message, "request") && strings.Contains(doc.message, "failed") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND message:processing AND message:retry AND level:5", + query: "service:gateway AND message:processing AND message:retry AND level:5", + filter: func(doc *testDoc) bool { + return doc.service == gateway && strings.Contains(doc.message, "processing") && + strings.Contains(doc.message, "retry") && doc.level == 5 + }, + fromTime: fromTime, + toTime: toTime, + }, + // OR operator queries + { + name: "trace_id OR", + query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000", + filter: func(doc *testDoc) bool { + return doc.traceId == "trace-1000" || + doc.traceId == "trace-1500" || + doc.traceId == "trace-2000" || + doc.traceId == "trace-2500" || + doc.traceId == "trace-3000" + }, + fromTime: fromTime, + toTime: toTime, + }, + + // mixed AND/OR/NOT + { + name: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*", + query: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.message, "request") && (doc.level == 1 || doc.level == 3 || doc.level == 5) && + strings.Contains(doc.traceId, "trace-2") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "complex AND+OR", + query: "(service:gateway OR service:proxy OR service:scheduler) AND " + + "(message:request OR message:failed) AND level:[1 to 3]", + filter: func(doc *testDoc) bool { + return (doc.service == gateway || doc.service == proxy || doc.service == "scheduler") && + (strings.Contains(doc.message, 
"request") || strings.Contains(doc.message, "failed")) && + (doc.level >= 1 && doc.level <= 3) + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND NOT (message:request OR message:timed OR level:[0 to 3])", + query: "service:gateway AND NOT (message:request OR message:timed OR level:[0 to 3])", + filter: func(doc *testDoc) bool { + return doc.service == gateway && + !(strings.Contains(doc.message, "request") || + strings.Contains(doc.message, "timed") || + (doc.level >= 0 && doc.level <= 3)) + }, + fromTime: fromTime, + toTime: midTime, + }, + { + name: "service:proxy AND NOT level:5 AND NOT pod:pod-2* AND NOT client_ip:ip_range(192.168.19.0,192.168.19.255)", + query: "service:proxy AND NOT level:5 AND NOT pod:pod-2* AND NOT client_ip:ip_range(192.168.19.0,192.168.19.255)", + filter: func(doc *testDoc) bool { + return doc.service == proxy && + doc.level != 5 && + !strings.Contains(doc.pod, "pod-2") && + !strings.Contains(doc.clientIp, "192.168.19") + }, + fromTime: fromTime, + toTime: midTime, + }, + + // other queries + { + name: "trace_id:trace-4*", + query: "trace_id:trace-4*", + filter: func(doc *testDoc) bool { return strings.Contains(doc.traceId, "trace-4") }, + fromTime: fromTime, + toTime: toTime, + }, } for _, tc := range searchTestCases { @@ -1320,37 +1432,85 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { } s.Run("service:kafka | group by pod unique_count(client_ip)", func() { - ips := make(map[string]map[string]struct{}) - for _, doc := range testDocs { - if doc.service != "kafka" { - continue + // Check both sort orders simply for aggTree to be iterated in a different order + orders := []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc} + + for _, ord := range orders { + ips := make(map[string]map[string]struct{}) + for _, doc := range testDocs { + if doc.service != kafka { + continue + } + if ips[doc.pod] == nil { + ips[doc.pod] = make(map[string]struct{}) + } + + ips[doc.pod][doc.clientIp] = struct{}{} } - if 
ips[doc.pod] == nil { - ips[doc.pod] = make(map[string]struct{}) + + var expectedBuckets []seq.AggregationBucket + for pod, podIps := range ips { + expectedBuckets = append(expectedBuckets, seq.AggregationBucket{ + Name: pod, + Value: float64(len(podIps)), + NotExists: 0, + }) } - ips[doc.pod][doc.clientIp] = struct{}{} + searchParams := s.query( + "service:kafka", + withTo(toTime.Format(time.RFC3339Nano)), + withAggQuery(processor.AggQuery{ + Field: aggField("client_ip"), + GroupBy: aggField("pod"), + Func: seq.AggFuncUniqueCount, + })) + searchParams.Order = ord + + s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets) } + }) - var expectedBuckets []seq.AggregationBucket - for pod, podIps := range ips { - expectedBuckets = append(expectedBuckets, seq.AggregationBucket{ - Name: pod, - Value: float64(len(podIps)), - NotExists: 0, - }) - } + s.Run("service:scheduler | group by pod avg(level)", func() { + // Check both sort orders simply for aggTree to be iterated in a different order + orders := []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc} - searchParams := s.query( - "service:kafka", - withTo(toTime.Format(time.RFC3339Nano)), - withAggQuery(processor.AggQuery{ - Field: aggField("client_ip"), - GroupBy: aggField("pod"), - Func: seq.AggFuncUniqueCount, - })) + for _, ord := range orders { + levelsByPod := make(map[string][]int) + for _, doc := range testDocs { + if doc.service != "scheduler" { + continue + } + + levelsByPod[doc.pod] = append(levelsByPod[doc.pod], doc.level) + } - s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets) + var expectedBuckets []seq.AggregationBucket + for pod, levels := range levelsByPod { + sum := 0 + for _, level := range levels { + sum += level + } + avg := float64(sum) / float64(len(levels)) + expectedBuckets = append(expectedBuckets, seq.AggregationBucket{ + Name: pod, + Value: avg, + NotExists: 0, + }) + } + + searchParams := s.query( 
+ "service:scheduler", + withTo(toTime.Format(time.RFC3339Nano)), + withAggQuery(processor.AggQuery{ + Field: aggField("level"), + GroupBy: aggField("pod"), + Func: seq.AggFuncAvg, + })) + searchParams.Order = ord + + s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncAvg}, expectedBuckets) + } }) // Test large QPR with 25000 groups (all ids are unique) @@ -1418,18 +1578,27 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { }) s.Run("service:database AND level:3 | hist 1s", func() { - histBuckets := make(map[string]uint64) - for _, doc := range testDocs { - if doc.service == "database" && doc.level == 3 { - bucketTime := doc.timestamp.Truncate(time.Second) - bucketKey := bucketTime.Format(time.RFC3339Nano) - histBuckets[bucketKey]++ + // Check both sort orders simply for lid tree to be iterated in a different order + orders := []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc} + + for _, ord := range orders { + histBuckets := make(map[string]uint64) + for _, doc := range testDocs { + if doc.service == "database" && doc.level == 3 { + bucketTime := doc.timestamp.Truncate(time.Second) + bucketKey := bucketTime.Format(time.RFC3339Nano) + histBuckets[bucketKey]++ + } } - } - s.AssertHist( - s.query("service:database AND level:3", withTo(toTime.Format(time.RFC3339Nano)), withHist(1000)), - histBuckets) + searchParams := s.query( + "service:database AND level:3", + withTo(toTime.Format(time.RFC3339Nano)), + withHist(1000)) + searchParams.Order = ord + + s.AssertHist(searchParams, histBuckets) + } }) s.Run("scroll with offset id", func() { diff --git a/frac/processor/aggregator.go b/frac/processor/aggregator.go index c9a35e72..dc3f99f3 100644 --- a/frac/processor/aggregator.go +++ b/frac/processor/aggregator.go @@ -70,7 +70,7 @@ func NewGroupAndFieldAggregator( } // Next iterates over groupBy and field iterators (actually trees) to count occurrence. 
-func (n *TwoSourceAggregator) Next(lid uint32) error { +func (n *TwoSourceAggregator) Next(lid node.LID) error { groupBySource, hasGroupBy, err := n.groupBy.ConsumeTokenSource(lid) if err != nil { return err @@ -100,7 +100,7 @@ func (n *TwoSourceAggregator) Next(lid uint32) error { // Both group and field exist, increment the count for the combined sources. source := AggBin[twoSources]{ - MID: n.extractMID(seq.LID(lid)), + MID: n.extractMID(seq.LID(lid.Unpack())), Source: twoSources{ GroupBySource: groupBySource, FieldSource: fieldSource, @@ -204,14 +204,14 @@ func NewSingleSourceCountAggregator( } // Next iterates over groupBy tree to count occurrence. -func (n *SingleSourceCountAggregator) Next(lid uint32) error { +func (n *SingleSourceCountAggregator) Next(lid node.LID) error { source, has, err := n.group.ConsumeTokenSource(lid) if err != nil { return err } if has { - mid := n.extractMID(seq.LID(lid)) + mid := n.extractMID(seq.LID(lid.Unpack())) n.countBySource[AggBin[uint32]{ MID: mid, @@ -273,7 +273,7 @@ func NewSingleSourceUniqueAggregator(iterator *SourcedNodeIterator) *SingleSourc } // Next iterates over groupBy tree to count occurrence. 
-func (n *SingleSourceUniqueAggregator) Next(lid uint32) error { +func (n *SingleSourceUniqueAggregator) Next(lid node.LID) error { source, has, err := n.group.ConsumeTokenSource(lid) if err != nil { return err @@ -325,13 +325,13 @@ func NewSingleSourceHistogramAggregator( } } -func (n *SingleSourceHistogramAggregator) Next(lid uint32) error { +func (n *SingleSourceHistogramAggregator) Next(lid node.LID) error { source, has, err := n.field.ConsumeTokenSource(lid) if err != nil { return err } - mid := n.extractMID(seq.LID(lid)) + mid := n.extractMID(seq.LID(lid.Unpack())) if _, ok := n.histogram[mid]; !ok { n.histogram[mid] = seq.NewSamplesContainers() } @@ -381,15 +381,12 @@ type SourcedNodeIterator struct { uniqSourcesLimit iteratorLimit countBySource map[uint32]int - lastID uint32 + lastID node.LID lastSource uint32 - has bool - - less node.LessFn } -func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, limit iteratorLimit, reverse bool) *SourcedNodeIterator { - lastID, lastSource, has := sourced.NextSourced() +func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, limit iteratorLimit) *SourcedNodeIterator { + lastID, lastSource := sourced.NextSourced() return &SourcedNodeIterator{ sourcedNode: sourced, ti: ti, @@ -399,17 +396,15 @@ func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, countBySource: make(map[uint32]int), lastID: lastID, lastSource: lastSource, - has: has, - less: node.GetLessFn(reverse), } } -func (s *SourcedNodeIterator) ConsumeTokenSource(lid uint32) (uint32, bool, error) { - for s.has && s.less(s.lastID, lid) { - s.lastID, s.lastSource, s.has = s.sourcedNode.NextSourced() +func (s *SourcedNodeIterator) ConsumeTokenSource(lid node.LID) (uint32, bool, error) { + for s.lastID.Less(lid) { + s.lastID, s.lastSource = s.sourcedNode.NextSourced() } - exists := s.has && s.lastID == lid + exists := !s.lastID.IsNull() && s.lastID == lid if !exists { return 0, false, nil } @@ 
-421,7 +416,7 @@ func (s *SourcedNodeIterator) ConsumeTokenSource(lid uint32) (uint32, bool, erro s.countBySource[s.lastSource]++ if len(s.countBySource) > s.uniqSourcesLimit.limit { - return lid, true, fmt.Errorf("%w: iterator limit is exceeded", s.uniqSourcesLimit.err) + return lid.Unpack(), true, fmt.Errorf("%w: iterator limit is exceeded", s.uniqSourcesLimit.err) } return s.lastSource, true, nil diff --git a/frac/processor/aggregator_test.go b/frac/processor/aggregator_test.go index 2c62011d..b0a173d3 100644 --- a/frac/processor/aggregator_test.go +++ b/frac/processor/aggregator_test.go @@ -28,11 +28,11 @@ func TestSingleSourceCountAggregator(t *testing.T) { {1, 2, 4, 5, 8, 11, 12}, } - source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false) - iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false) + source := node.BuildORTreeAgg(node.MakeStaticNodes(sources)) + iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}) agg := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0)) for _, id := range searchDocs { - if err := agg.Next(id); err != nil { + if err := agg.Next(node.NewDescLID(id)); err != nil { t.Fatal(err) } } @@ -56,15 +56,15 @@ func TestSingleSourceCountAggregatorWithInterval(t *testing.T) { {1, 2, 4, 5, 8, 11, 12}, } - source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false) - iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false) + source := node.BuildORTreeAgg(node.MakeStaticNodes(sources)) + iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}) agg := NewSingleSourceCountAggregator(iter, func(l seq.LID) seq.MID { return seq.MID(l) % 3 }) for _, id := range searchDocs { - if err := agg.Next(id); err != nil { + if err := agg.Next(node.NewDescLID(id)); err != nil { t.Fatal(err) } } 
@@ -99,13 +99,13 @@ func BenchmarkAggDeep(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) v, _ := Generate(r, s) src := node.NewSourcedNodeWrapper(node.NewStatic(v, false), 0) - iter := NewSourcedNodeIterator(src, nil, make([]uint32, 1), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false) + iter := NewSourcedNodeIterator(src, nil, make([]uint32, 1), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}) n := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0)) vals, _ := Generate(r, s) for b.Loop() { for _, v := range vals { - if err := n.Next(v); err != nil { + if err := n.Next(node.NewDescLID(v)); err != nil { b.Fatal(err) } } @@ -131,15 +131,15 @@ func BenchmarkAggWide(b *testing.B) { slices.Sort(wide[i]) } - source := node.BuildORTreeAgg(node.MakeStaticNodes(wide), false) + source := node.BuildORTreeAgg(node.MakeStaticNodes(wide)) - iter := NewSourcedNodeIterator(source, nil, make([]uint32, len(wide)), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false) + iter := NewSourcedNodeIterator(source, nil, make([]uint32, len(wide)), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}) n := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0)) vals, _ := Generate(r, s) for b.Loop() { for _, v := range vals { - if err := n.Next(v); err != nil { + if err := n.Next(node.NewDescLID(v)); err != nil { b.Fatal(err) } } @@ -157,7 +157,7 @@ func (m *MockTokenIndex) GetValByTID(tid uint32) []byte { } type IDSourcePair struct { - LID uint32 + LID node.LID Source uint32 } @@ -170,13 +170,13 @@ func (m *MockNode) String() string { return reflect.TypeOf(m).String() } -func (m *MockNode) NextSourced() (uint32, uint32, bool) { +func (m *MockNode) NextSourced() (node.LID, uint32) { if len(m.Pairs) == 0 { - return 0, 0, false + return node.NullLID(), 0 } first := m.Pairs[0] m.Pairs = m.Pairs[1:] - return first.LID, first.Source, true + return first.LID, first.Source } func 
TestTwoSourceAggregator(t *testing.T) { @@ -186,29 +186,29 @@ func TestTwoSourceAggregator(t *testing.T) { dp := &MockTokenIndex{} field := &MockNode{ Pairs: []IDSourcePair{ - {LID: 1, Source: 0}, - {LID: 2, Source: 1}, + {LID: node.NewDescLID(1), Source: 0}, + {LID: node.NewDescLID(2), Source: 1}, }, } groupBy := &MockNode{ Pairs: []IDSourcePair{ - {LID: 1, Source: 0}, - {LID: 2, Source: 1}, + {LID: node.NewDescLID(1), Source: 0}, + {LID: node.NewDescLID(2), Source: 1}, }, } fieldTIDs := []uint32{42, 73} groupByTIDs := []uint32{1, 2} - groupIterator := NewSourcedNodeIterator(groupBy, dp, groupByTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false) - fieldIterator := NewSourcedNodeIterator(field, dp, fieldTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false) + groupIterator := NewSourcedNodeIterator(groupBy, dp, groupByTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}) + fieldIterator := NewSourcedNodeIterator(field, dp, fieldTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}) limits := AggLimits{} aggregator := NewGroupAndFieldAggregator( fieldIterator, groupIterator, provideExtractTimeFunc(nil, nil, 0), true, false, limits, ) // Call Next for two data points. - r.NoError(aggregator.Next(1)) - r.NoError(aggregator.Next(2)) + r.NoError(aggregator.Next(node.NewDescLID(1))) + r.NoError(aggregator.Next(node.NewDescLID(2))) // Verify countBySource map. 
expectedCountBySource := map[twoSources]int64{ @@ -248,14 +248,14 @@ func TestSingleTreeCountAggregator(t *testing.T) { dp := &MockTokenIndex{} field := &MockNode{ Pairs: []IDSourcePair{ - {LID: 1, Source: 0}, + {LID: node.NewDescLID(1), Source: 0}, }, } - iter := NewSourcedNodeIterator(field, dp, []uint32{0}, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false) + iter := NewSourcedNodeIterator(field, dp, []uint32{0}, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}) aggregator := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0)) - r.NoError(aggregator.Next(1)) + r.NoError(aggregator.Next(node.NewDescLID(1))) result, err := aggregator.Aggregate() if err != nil { @@ -289,15 +289,15 @@ func TestAggregatorLimitExceeded(t *testing.T) { const limit = 1 for _, expectedErr := range []error{consts.ErrTooManyGroupTokens, consts.ErrTooManyFieldTokens} { - source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false) - iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: limit, err: expectedErr}, false) + source := node.BuildORTreeAgg(node.MakeStaticNodes(sources)) + iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: limit, err: expectedErr}) agg := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0)) var limitErr error var limitIteration int for i, id := range searchDocs { - if err := agg.Next(id); err != nil { + if err := agg.Next(node.NewDescLID(id)); err != nil { limitErr = err limitIteration = i break diff --git a/frac/processor/eval_test.go b/frac/processor/eval_test.go index 249f63c1..eaa433f3 100644 --- a/frac/processor/eval_test.go +++ b/frac/processor/eval_test.go @@ -50,10 +50,8 @@ func (p *staticProvider) newStatic(literal *parser.Literal) (node.Node, error) { } func readAllInto(n node.Node, ids []uint32) []uint32 { - id, has := n.Next() - for has { - ids = append(ids, id) - id, has = n.Next() + for cmp := n.Next(); !cmp.IsNull(); cmp = n.Next() { + ids = 
append(ids, cmp.Unpack()) } return ids } diff --git a/frac/processor/eval_tree.go b/frac/processor/eval_tree.go index 7ba058e9..0f260053 100644 --- a/frac/processor/eval_tree.go +++ b/frac/processor/eval_tree.go @@ -36,16 +36,25 @@ func buildEvalTree(root *parser.ASTNode, minVal, maxVal uint32, stats *searchSta switch token.Operator { case parser.LogicalAnd: stats.NodesTotal++ - return node.NewAnd(children[0], children[1], reverse), nil + return node.NewAnd(children[0], children[1]), nil case parser.LogicalOr: stats.NodesTotal++ - return node.NewOr(children[0], children[1], reverse), nil + return node.NewOr(children[0], children[1]), nil case parser.LogicalNAnd: stats.NodesTotal++ - return node.NewNAnd(children[0], children[1], reverse), nil + return node.NewNAnd(children[0], children[1]), nil case parser.LogicalNot: stats.NodesTotal++ - return node.NewNot(children[0], minVal, maxVal, reverse), nil + var minLID node.LID + var maxLID node.LID + if reverse { + minLID = node.NewAscLID(maxVal) + maxLID = node.NewAscLID(minVal) + } else { + minLID = node.NewDescLID(minVal) + maxLID = node.NewDescLID(maxVal) + } + return node.NewNot(children[0], minLID, maxLID), nil } } return nil, fmt.Errorf("unknown token type") @@ -76,12 +85,12 @@ func evalLeaf( stats.NodesTotal += len(lidsTids)*2 - 1 } - return node.BuildORTree(lidsTids, order.IsReverse()), nil + return node.BuildORTree(lidsTids), nil } type Aggregator interface { // Next iterates to count the next lid. - Next(lid uint32) error + Next(lid node.LID) error // Aggregate processes and returns the final aggregation result. 
Aggregate() (seq.AggregatableSamples, error) } @@ -208,6 +217,6 @@ func iteratorFromLiteral( stats.AggNodesTotal += len(lidsTids)*2 - 1 } - sourcedNode := node.BuildORTreeAgg(lidsTids, order.IsReverse()) - return NewSourcedNodeIterator(sourcedNode, ti, tids, iteratorLimit, order.IsReverse()), nil + sourcedNode := node.BuildORTreeAgg(lidsTids) + return NewSourcedNodeIterator(sourcedNode, ti, tids, iteratorLimit), nil } diff --git a/frac/processor/search.go b/frac/processor/search.go index e7d8885b..21fb4e8a 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -197,16 +197,17 @@ func iterateEvalTree( } timerEval.Start() - lid, has := evalTree.Next() + lid := evalTree.Next() timerEval.Stop() - if !has { + if lid.IsNull() { break } + rawLid := lid.Unpack() if needMore || hasHist { timerMID.Start() - mid := idsIndex.GetMID(seq.LID(lid)) + mid := idsIndex.GetMID(seq.LID(rawLid)) timerMID.Stop() if hasHist { @@ -223,7 +224,7 @@ func iterateEvalTree( if needMore { timerRID.Start() - rid := idsIndex.GetRID(seq.LID(lid)) + rid := idsIndex.GetRID(seq.LID(rawLid)) timerRID.Stop() id := seq.ID{MID: mid, RID: rid} diff --git a/frac/sealed/lids/iterator_asc.go b/frac/sealed/lids/iterator_asc.go index 11bb48d1..783a3d1f 100644 --- a/frac/sealed/lids/iterator_asc.go +++ b/frac/sealed/lids/iterator_asc.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap" "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/node" ) type IteratorAsc Cursor @@ -55,10 +56,10 @@ func (it *IteratorAsc) loadNextLIDsBlock() { it.blockIndex-- } -func (it *IteratorAsc) Next() (uint32, bool) { +func (it *IteratorAsc) Next() node.LID { for len(it.lids) == 0 { if !it.tryNextBlock { - return 0, false + return node.NullLID() } it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block @@ -69,5 +70,5 @@ func (it *IteratorAsc) Next() (uint32, bool) { i := len(it.lids) - 1 lid := it.lids[i] it.lids = it.lids[:i] - return lid, true + return node.NewAscLID(lid) } 
diff --git a/frac/sealed/lids/iterator_desc.go b/frac/sealed/lids/iterator_desc.go index f3fa741b..0485d41c 100644 --- a/frac/sealed/lids/iterator_desc.go +++ b/frac/sealed/lids/iterator_desc.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap" "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/node" ) type IteratorDesc Cursor @@ -55,10 +56,10 @@ func (it *IteratorDesc) loadNextLIDsBlock() { it.blockIndex++ } -func (it *IteratorDesc) Next() (uint32, bool) { +func (it *IteratorDesc) Next() node.LID { for len(it.lids) == 0 { if !it.tryNextBlock { - return 0, false + return node.NullLID() } it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block @@ -68,5 +69,5 @@ func (it *IteratorDesc) Next() (uint32, bool) { lid := it.lids[0] it.lids = it.lids[1:] - return lid, true + return node.NewDescLID(lid) } diff --git a/node/bench_test.go b/node/bench_test.go index 81872217..cf517686 100644 --- a/node/bench_test.go +++ b/node/bench_test.go @@ -33,7 +33,7 @@ func BenchmarkNot(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) v, last := Generate(r, s) res := make([]uint32, 0, last+1) - n := NewNot(NewStatic(v, false), 1, last, false) + n := NewNot(NewStatic(v, false), NewDescLID(1), NewDescLID(last)) for b.Loop() { res = readAllInto(n, res) @@ -50,7 +50,7 @@ func BenchmarkNotEmpty(b *testing.B) { for _, s := range sizes { b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { res := make([]uint32, 0, s*2) - n := NewNot(NewStatic(nil, false), 1, uint32(s), false) + n := NewNot(NewStatic(nil, false), NewDescLID(1), NewDescLID(uint32(s))) for b.Loop() { res = readAllInto(n, res) @@ -69,7 +69,7 @@ func BenchmarkOr(b *testing.B) { b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) res := make([]uint32, 0, s*2) - n := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) + n := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) for b.Loop() { res = readAllInto(n, res) @@ 
-87,7 +87,7 @@ func BenchmarkAnd(b *testing.B) { b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) res := make([]uint32, 0, s) - n := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) + n := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) for b.Loop() { res = readAllInto(n, res) @@ -105,7 +105,7 @@ func BenchmarkNAnd(b *testing.B) { b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) res := make([]uint32, 0, s) - n := NewNAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) + n := NewNAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) for b.Loop() { res = readAllInto(n, res) @@ -122,13 +122,13 @@ func BenchmarkAndTree(b *testing.B) { for _, s := range sizes { b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) - n1 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n2 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n3 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n4 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n12 := NewAnd(n1, n2, false) - n34 := NewAnd(n3, n4, false) - n := NewAnd(n12, n34, false) + n1 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n2 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n3 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n4 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n12 := NewAnd(n1, n2) + n34 := NewAnd(n3, n4) + n := NewAnd(n12, n34) res := make([]uint32, 0, s) for b.Loop() { @@ -146,13 +146,13 @@ func BenchmarkOrTree(b *testing.B) { for _, s := range sizes { b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) - n1 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n2 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n3 := 
NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n4 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n12 := NewOr(n1, n2, false) - n34 := NewOr(n3, n4, false) - n := NewOr(n12, n34, false) + n1 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n2 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n3 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n4 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n12 := NewOr(n1, n2) + n34 := NewOr(n3, n4) + n := NewOr(n12, n34) res := make([]uint32, 0, s*8) for b.Loop() { @@ -172,11 +172,11 @@ func BenchmarkComplex(b *testing.B) { b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { r := rand.New(rand.NewSource(benchRandSeed)) res := make([]uint32, 0, s*2) - n1 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n2 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n3 := NewNAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s), false) - n12 := NewOr(n1, n2, false) - n := NewAnd(n12, n3, false) + n1 := NewAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n2 := NewOr(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n3 := NewNAnd(newNodeStaticSize(r, s), newNodeStaticSize(r, s)) + n12 := NewOr(n1, n2) + n := NewAnd(n12, n3) for b.Loop() { res = readAllInto(n, res) diff --git a/node/builder.go b/node/builder.go index d0cd8906..80ab62d3 100644 --- a/node/builder.go +++ b/node/builder.go @@ -5,17 +5,17 @@ var ( emptyNodeSourced = NewSourcedNodeWrapper(emptyNode, 0) ) -func BuildORTree(nodes []Node, reverse bool) Node { +func BuildORTree(nodes []Node) Node { return TreeFold( - func(l, r Node) Node { return NewOr(l, r, reverse) }, + func(l, r Node) Node { return NewOr(l, r) }, emptyNode, nodes, ) } -func BuildORTreeAgg(nodes []Node, reverse bool) Sourced { +func BuildORTreeAgg(nodes []Node) Sourced { return TreeFold( - func(l, r Sourced) Sourced { return NewNodeOrAgg(l, r, reverse) }, + NewNodeOrAgg, 
emptyNodeSourced, WrapWithSource(nodes), ) diff --git a/node/less_fn.go b/node/less_fn.go deleted file mode 100644 index afe2f981..00000000 --- a/node/less_fn.go +++ /dev/null @@ -1,13 +0,0 @@ -package node - -type LessFn func(uint32, uint32) bool - -var lessAsc LessFn = func(u1, u2 uint32) bool { return u1 < u2 } -var lessDesc LessFn = func(u1, u2 uint32) bool { return u2 < u1 } - -func GetLessFn(reverse bool) LessFn { - if reverse { - return lessDesc - } - return lessAsc -} diff --git a/node/lid.go b/node/lid.go new file mode 100644 index 00000000..620d4c1d --- /dev/null +++ b/node/lid.go @@ -0,0 +1,74 @@ +package node + +import ( + "fmt" + "math" +) + +const ( + descMask = uint32(0) + ascMask = uint32(0xFFFFFFFF) +) + +// LID is an encoded representation of LID and reverse flag made specifically for fast compare operations. +// +// For reverse order LID is inverted as follows: "MaxUint32 - LID" formula using XOR mask. Terminal LID value is 0 instead +// of MaxUint32 in reverse order, but 0 is XORed to MaxUint32. Which means, null value will always have lid field set to +// 0xFFFFFFFF (math.MaxUint32) regardless of reverse (order) flag. +type LID struct { + lid uint32 // do not read this field, use Unpack instead + mask uint32 +} + +func NullLID() LID { + // order does not matter, as null values are never unpacked + return NewDescLID(math.MaxUint32) +} + +// NewDescLID returns LIDs for desc sort order +func NewDescLID(lid uint32) LID { + return LID{ + lid: lid, + mask: descMask, + } +} + +// NewAscLID returns LIDs for asc sort order +func NewAscLID(lid uint32) LID { + return LID{ + lid: lid ^ ascMask, + mask: ascMask, + } +} + +// Less compares two values. It also does an implicit null check, since we store math.MaxUint32 for null values. +// Which means if we call x.Less(y), then we know for sure that x is not null. Therefore, this Less call can work +// as both "null check + less" combo. 
+func (c LID) Less(other LID) bool { + return c.lid < other.lid +} + +func (c LID) LessOrEq(other LID) bool { + return c.lid <= other.lid +} + +func (c LID) Inc() LID { + c.lid++ + return c +} + +func (c LID) Eq(other LID) bool { + return c.lid == other.lid +} + +func (c LID) Unpack() uint32 { + return c.lid ^ c.mask +} + +func (c LID) IsNull() bool { + return c.lid == math.MaxUint32 +} + +func (c LID) String() string { + return fmt.Sprintf("%d, reverse=%t", c.Unpack(), c.mask == ascMask) +} diff --git a/node/lid_test.go b/node/lid_test.go new file mode 100644 index 00000000..44ad8004 --- /dev/null +++ b/node/lid_test.go @@ -0,0 +1,63 @@ +package node + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLID_Unpack_Desc(t *testing.T) { + x := NewDescLID(5) + assert.Equal(t, uint32(5), x.Unpack()) + + x = NewDescLID(math.MaxUint32) + assert.Equal(t, uint32(math.MaxUint32), x.Unpack()) + + x = NewDescLID(0) + assert.Equal(t, uint32(0), x.Unpack()) + + x = NullLID() + + assert.True(t, x.IsNull()) +} + +func TestLID_Unpack_Asc(t *testing.T) { + x := NewAscLID(5) + assert.Equal(t, uint32(5), x.Unpack()) + + x = NewAscLID(0) + assert.Equal(t, uint32(0), x.Unpack()) + + x = NewAscLID(math.MaxUint32) + assert.Equal(t, uint32(math.MaxUint32), x.Unpack()) + + x = NullLID() + + assert.True(t, x.IsNull()) +} + +func TestLID_Eq(t *testing.T) { + assert.Equal(t, NewDescLID(6), NewDescLID(6)) + assert.Equal(t, NewDescLID(math.MaxUint32), NewDescLID(math.MaxUint32)) + + assert.Equal(t, NewAscLID(6), NewAscLID(6)) + assert.Equal(t, NewAscLID(0), NewAscLID(0)) +} + +func TestLID_Less_Desc(t *testing.T) { + assert.False(t, NewDescLID(6).Less(NewDescLID(6))) + assert.True(t, NewDescLID(6).Less(NewDescLID(7))) + assert.True(t, NewDescLID(0).Less(NewDescLID(5))) + + assert.True(t, NewDescLID(56000).Less(NullLID())) +} + +func TestLID_Less_Asc(t *testing.T) { + // for asc sort order larger values go first (order is reversed), i.e. 
greater values are "less" than lower values + assert.False(t, NewAscLID(6).Less(NewAscLID(6))) + assert.True(t, NewAscLID(10).Less(NewAscLID(1))) + assert.True(t, NewAscLID(5).Less(NewAscLID(0))) + + assert.True(t, NewAscLID(56000).Less(NullLID())) +} diff --git a/node/node.go b/node/node.go index e6525b45..6f87e9c3 100644 --- a/node/node.go +++ b/node/node.go @@ -6,11 +6,11 @@ import ( type Node interface { fmt.Stringer // for testing - Next() (id uint32, has bool) + Next() LID } type Sourced interface { fmt.Stringer // for testing // aggregation need source - NextSourced() (id uint32, source uint32, has bool) + NextSourced() (id LID, source uint32) } diff --git a/node/node_and.go b/node/node_and.go index 58bc9391..51918f46 100644 --- a/node/node_and.go +++ b/node/node_and.go @@ -1,57 +1,50 @@ package node -import "fmt" +import ( + "fmt" +) type nodeAnd struct { - less LessFn - left Node right Node - leftID uint32 - hasLeft bool - rightID uint32 - hasRight bool + leftID LID + rightID LID } func (n *nodeAnd) String() string { return fmt.Sprintf("(%s AND %s)", n.left.String(), n.right.String()) } -func NewAnd(left, right Node, reverse bool) *nodeAnd { - node := &nodeAnd{ - less: GetLessFn(reverse), - - left: left, - right: right, - } +func NewAnd(left, right Node) *nodeAnd { + node := &nodeAnd{left: left, right: right} node.readLeft() node.readRight() return node } func (n *nodeAnd) readLeft() { - n.leftID, n.hasLeft = n.left.Next() + n.leftID = n.left.Next() } func (n *nodeAnd) readRight() { - n.rightID, n.hasRight = n.right.Next() + n.rightID = n.right.Next() } -func (n *nodeAnd) Next() (uint32, bool) { - for n.hasLeft && n.hasRight && n.leftID != n.rightID { - for n.hasLeft && n.hasRight && n.less(n.leftID, n.rightID) { +func (n *nodeAnd) Next() LID { + for !n.leftID.IsNull() && !n.rightID.IsNull() && n.leftID != n.rightID { + for !n.rightID.IsNull() && n.leftID.Less(n.rightID) { n.readLeft() } - for n.hasLeft && n.hasRight && n.less(n.rightID, n.leftID) { + for 
!n.leftID.IsNull() && n.rightID.Less(n.leftID) { n.readRight() } } - if !n.hasLeft || !n.hasRight { - return 0, false + if n.leftID.IsNull() || n.rightID.IsNull() { + return NullLID() } cur := n.leftID n.readLeft() n.readRight() - return cur, true + return cur } diff --git a/node/node_nand.go b/node/node_nand.go index 7f7b7a89..42c679ac 100644 --- a/node/node_nand.go +++ b/node/node_nand.go @@ -3,52 +3,43 @@ package node import "fmt" type nodeNAnd struct { - less LessFn + neg Node + reg Node - neg Node - negID uint32 - hasNeg bool - - reg Node - regID uint32 - hasReg bool + negID LID + regID LID } func (n *nodeNAnd) String() string { return fmt.Sprintf("(%s NAND %s)", n.neg.String(), n.reg.String()) } -func NewNAnd(negative, regular Node, reverse bool) *nodeNAnd { - node := &nodeNAnd{ - less: GetLessFn(reverse), - - neg: negative, - reg: regular, - } +func NewNAnd(negative, regular Node) *nodeNAnd { + node := &nodeNAnd{neg: negative, reg: regular} node.readNeg() node.readReg() return node } func (n *nodeNAnd) readNeg() { - n.negID, n.hasNeg = n.neg.Next() + n.negID = n.neg.Next() } func (n *nodeNAnd) readReg() { - n.regID, n.hasReg = n.reg.Next() + n.regID = n.reg.Next() } -func (n *nodeNAnd) Next() (uint32, bool) { - for n.hasReg { - for n.hasNeg && n.less(n.negID, n.regID) { +func (n *nodeNAnd) Next() LID { + for !n.regID.IsNull() { + for n.negID.Less(n.regID) { n.readNeg() } - if !n.hasNeg || n.negID != n.regID { // i.e. 
n.negID > regID + if n.negID.IsNull() || n.negID != n.regID { cur := n.regID n.readReg() - return cur, true + return cur } n.readReg() } - return 0, false + return NullLID() } diff --git a/node/node_not.go b/node/node_not.go index 098f7f1e..1aa9c098 100644 --- a/node/node_not.go +++ b/node/node_not.go @@ -10,8 +10,8 @@ func (n *nodeNot) String() string { return fmt.Sprintf("(NOT %s)", n.neg.String()) } -func NewNot(child Node, minVal, maxVal uint32, reverse bool) *nodeNot { - nodeRange := NewRange(minVal, maxVal, reverse) - nodeNAnd := NewNAnd(child, nodeRange, reverse) +func NewNot(child Node, minID, maxID LID) *nodeNot { + nodeRange := NewRange(minID, maxID) + nodeNAnd := NewNAnd(child, nodeRange) return &nodeNot{nodeNAnd: *(nodeNAnd)} } diff --git a/node/node_or.go b/node/node_or.go index 1cfec7ac..a31773a3 100644 --- a/node/node_or.go +++ b/node/node_or.go @@ -3,120 +3,95 @@ package node import "fmt" type nodeOr struct { - less LessFn - left Node right Node - leftID uint32 - hasLeft bool - rightID uint32 - hasRight bool + leftID LID + rightID LID } func (n *nodeOr) String() string { return fmt.Sprintf("(%s OR %s)", n.left.String(), n.right.String()) } -func NewOr(left, right Node, reverse bool) *nodeOr { - n := &nodeOr{ - less: GetLessFn(reverse), - - left: left, - right: right, - } +func NewOr(left, right Node) *nodeOr { + n := &nodeOr{left: left, right: right} n.readLeft() n.readRight() return n } func (n *nodeOr) readLeft() { - n.leftID, n.hasLeft = n.left.Next() + n.leftID = n.left.Next() } func (n *nodeOr) readRight() { - n.rightID, n.hasRight = n.right.Next() + n.rightID = n.right.Next() } -func (n *nodeOr) Next() (uint32, bool) { - if !n.hasLeft && !n.hasRight { - return 0, false +func (n *nodeOr) Next() LID { + if n.leftID.IsNull() && n.rightID.IsNull() { + return n.leftID } - if n.hasLeft && (!n.hasRight || n.less(n.leftID, n.rightID)) { + if n.leftID.Less(n.rightID) { cur := n.leftID n.readLeft() - return cur, true + return cur } - - if n.hasRight && 
(!n.hasLeft || n.less(n.rightID, n.leftID)) { + if n.rightID.Less(n.leftID) { cur := n.rightID n.readRight() - return cur, true + return cur } - cur := n.leftID n.readLeft() n.readRight() - - return cur, true + return cur } type nodeOrAgg struct { left Sourced right Sourced - leftID uint32 + leftID LID leftSource uint32 - hasLeft bool - rightID uint32 + rightID LID rightSource uint32 - hasRight bool - - less LessFn } func (n *nodeOrAgg) String() string { return fmt.Sprintf("(%s OR %s)", n.left.String(), n.right.String()) } -func NewNodeOrAgg(left, right Sourced, reverse bool) Sourced { - n := &nodeOrAgg{ - left: left, - right: right, - less: GetLessFn(reverse), - } +func NewNodeOrAgg(left, right Sourced) Sourced { + n := &nodeOrAgg{left: left, right: right} n.readLeft() n.readRight() return n } func (n *nodeOrAgg) readLeft() { - n.leftID, n.leftSource, n.hasLeft = n.left.NextSourced() + n.leftID, n.leftSource = n.left.NextSourced() } func (n *nodeOrAgg) readRight() { - n.rightID, n.rightSource, n.hasRight = n.right.NextSourced() + n.rightID, n.rightSource = n.right.NextSourced() } -func (n *nodeOrAgg) NextSourced() (uint32, uint32, bool) { - if !n.hasLeft && !n.hasRight { - return 0, 0, false +func (n *nodeOrAgg) NextSourced() (LID, uint32) { + if n.leftID.IsNull() && n.rightID.IsNull() { + return n.leftID, 0 } - - if n.hasLeft && (!n.hasRight || n.less(n.leftID, n.rightID)) { + if n.leftID.Less(n.rightID) { cur := n.leftID curSource := n.leftSource n.readLeft() - - return cur, curSource, true + return cur, curSource } - - // we don't need deduplication cur := n.rightID curSource := n.rightSource n.readRight() - - return cur, curSource, true + return cur, curSource } diff --git a/node/node_range.go b/node/node_range.go index ed5d0486..a6f75467 100644 --- a/node/node_range.go +++ b/node/node_range.go @@ -1,37 +1,26 @@ package node type nodeRange struct { - less LessFn - - maxVal uint32 - cur int - step int + maxID LID + curID LID } func (n *nodeRange) String() 
string { return "(RANGE)" } -func NewRange(minVal, maxVal uint32, reverse bool) *nodeRange { - step := 1 - if reverse { - step = -1 - minVal, maxVal = maxVal, minVal - } +func NewRange(minVal, maxVal LID) *nodeRange { return &nodeRange{ - less: GetLessFn(reverse), - - cur: int(minVal), - maxVal: maxVal, - step: step, + curID: minVal, + maxID: maxVal, } } -func (n *nodeRange) Next() (uint32, bool) { - if n.less(n.maxVal, uint32(n.cur)) { - return 0, false +func (n *nodeRange) Next() LID { + if n.maxID.Less(n.curID) { + return NullLID() } - cur := uint32(n.cur) - n.cur += n.step - return cur, true + result := n.curID + n.curID = n.curID.Inc() + return result } diff --git a/node/node_static.go b/node/node_static.go index d40135d2..d0b17eab 100644 --- a/node/node_static.go +++ b/node/node_static.go @@ -1,5 +1,7 @@ package node +import "math" + type staticCursor struct { ptr int data []uint32 @@ -31,22 +33,24 @@ func NewStatic(data []uint32, reverse bool) Node { }} } -func (n *staticAsc) Next() (uint32, bool) { +func (n *staticAsc) Next() LID { + // staticAsc is used in docs order desc, hence we return LID with desc order if n.ptr >= len(n.data) { - return 0, false + return NewDescLID(math.MaxUint32) } cur := n.data[n.ptr] n.ptr++ - return cur, true + return NewDescLID(cur) } -func (n *staticDesc) Next() (uint32, bool) { +func (n *staticDesc) Next() LID { + // staticDesc is used in docs order asc, hence we return LID with asc order if n.ptr < 0 { - return 0, false + return NewAscLID(0) } cur := n.data[n.ptr] n.ptr-- - return cur, true + return NewAscLID(cur) } // MakeStaticNodes is currently used only for tests diff --git a/node/node_test.go b/node/node_test.go index 6da2d8a1..b6730e21 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -8,10 +8,8 @@ import ( ) func readAllInto(node Node, ids []uint32) []uint32 { - id, has := node.Next() - for has { - ids = append(ids, id) - id, has = node.Next() + for cur := node.Next(); !cur.IsNull(); cur = node.Next() { + ids = 
append(ids, cur.Unpack()) } return ids } @@ -35,25 +33,85 @@ var ( func TestNodeAnd(t *testing.T) { expect := []uint32{5, 6, 13} - and := NewAnd(NewStatic(data[0], false), NewStatic(data[1], false), false) + and := NewAnd(NewStatic(data[0], false), NewStatic(data[1], false)) assert.Equal(t, expect, readAll(and)) + + // commutativity test + and2 := NewAnd(NewStatic(data[1], false), NewStatic(data[0], false)) + assert.Equal(t, expect, readAll(and2)) } func TestNodeOr(t *testing.T) { expect := []uint32{1, 2, 3, 5, 6, 7, 8, 9, 13, 14} - or := NewOr(NewStatic(data[0], false), NewStatic(data[1], false), false) + or := NewOr(NewStatic(data[0], false), NewStatic(data[1], false)) assert.Equal(t, expect, readAll(or)) + + // commutativity test + or2 := NewOr(NewStatic(data[1], false), NewStatic(data[0], false)) + assert.Equal(t, expect, readAll(or2)) } func TestNodeNAnd(t *testing.T) { expect := []uint32{2, 3, 14} - nand := NewNAnd(NewStatic(data[0], false), NewStatic(data[1], false), false) + nand := NewNAnd(NewStatic(data[0], false), NewStatic(data[1], false)) + assert.Equal(t, expect, readAll(nand)) +} + +func TestNodeAndReverse(t *testing.T) { + expect := []uint32{13, 6, 5} + and := NewAnd(NewStatic(data[0], true), NewStatic(data[1], true)) + assert.Equal(t, expect, readAll(and)) +} + +func TestNodeOrReverse(t *testing.T) { + expect := []uint32{14, 13, 9, 8, 7, 6, 5, 3, 2, 1} + or := NewOr(NewStatic(data[0], true), NewStatic(data[1], true)) + assert.Equal(t, expect, readAll(or)) + + // commutativity test + or2 := NewOr(NewStatic(data[1], true), NewStatic(data[0], true)) + assert.Equal(t, expect, readAll(or2)) +} + +func TestNodeNAndReverse(t *testing.T) { + expect := []uint32{14, 3, 2} + nand := NewNAnd(NewStatic(data[0], true), NewStatic(data[1], true)) assert.Equal(t, expect, readAll(nand)) } +func TestNodeNotReverse(t *testing.T) { + expect := []uint32{15, 12, 11, 10, 9, 8, 7, 4, 1} + not := NewNot(NewStatic(data[1], true), NewAscLID(15), NewAscLID(1)) + 
assert.Equal(t, expect, readAll(not)) +} + +func TestNodeRange(t *testing.T) { + expect := []uint32{3, 4, 5, 6, 7, 8, 9, 10} + not := NewRange(NewDescLID(3), NewDescLID(10)) + assert.Equal(t, expect, readAll(not)) +} + +func TestNodeRangeReverse(t *testing.T) { + expect := []uint32{10, 9, 8, 7, 6, 5, 4, 3} + not := NewRange(NewAscLID(10), NewAscLID(3)) + assert.Equal(t, expect, readAll(not)) +} + +func TestNodeNotPartialRange(t *testing.T) { + expect := []uint32{4, 7, 8, 9, 10} + not := NewNot(NewStatic(data[1], false), NewDescLID(3), NewDescLID(10)) + assert.Equal(t, expect, readAll(not)) +} + +func TestNodeNotPartialRangeReverse(t *testing.T) { + expect := []uint32{10, 9, 8, 7, 4} + not := NewNot(NewStatic(data[1], true), NewAscLID(10), NewAscLID(3)) + assert.Equal(t, expect, readAll(not)) +} + func TestNodeNot(t *testing.T) { expect := []uint32{1, 4, 7, 8, 9, 10, 11, 12, 15} - nand := NewNot(NewStatic(data[1], false), 1, 15, false) + nand := NewNot(NewStatic(data[1], false), NewDescLID(1), NewDescLID(15)) assert.Equal(t, expect, readAll(nand)) } @@ -61,7 +119,7 @@ func TestNodeNot(t *testing.T) { func TestNodeLazyAnd(t *testing.T) { left := []uint32{1, 2} right := []uint32{1, 2, 3, 4, 5, 6} - and := NewAnd(NewStatic(left, false), NewStatic(right, false), false) + and := NewAnd(NewStatic(left, false), NewStatic(right, false)) assert.Equal(t, []uint32{1, 2}, readAll(and)) assert.Equal(t, []uint32{4, 5, 6}, getRemainingSlice(t, and.right)) assert.Equal(t, []uint32(nil), readAll(and)) @@ -72,7 +130,7 @@ func TestNodeLazyAnd(t *testing.T) { func TestNodeLazyNAnd(t *testing.T) { left := []uint32{1, 2, 5, 6, 7, 8} right := []uint32{2, 4} - nand := NewNAnd(NewStatic(left, false), NewStatic(right, false), false) + nand := NewNAnd(NewStatic(left, false), NewStatic(right, false)) assert.Equal(t, []uint32{4}, readAll(nand)) assert.Equal(t, []uint32{6, 7, 8}, getRemainingSlice(t, nand.neg)) assert.Equal(t, []uint32(nil), readAll(nand)) @@ -92,44 +150,44 @@ func 
isEmptyNode(node any) bool { func TestNodeTreeBuilding(t *testing.T) { t.Run("size_0", func(t *testing.T) { dn := MakeStaticNodes(make([][]uint32, 0)) - assert.True(t, isEmptyNode(BuildORTree(dn, false)), "expected empty node") - assert.True(t, isEmptyNode(BuildORTreeAgg(dn, false)), "expected empty node") + assert.True(t, isEmptyNode(BuildORTree(dn)), "expected empty node") + assert.True(t, isEmptyNode(BuildORTreeAgg(dn)), "expected empty node") }) t.Run("size_1", func(t *testing.T) { dn := MakeStaticNodes(make([][]uint32, 1)) - assert.Equal(t, "STATIC", BuildORTree(dn, false).String()) - assert.Equal(t, "SOURCED", BuildORTreeAgg(dn, false).String()) + assert.Equal(t, "STATIC", BuildORTree(dn).String()) + assert.Equal(t, "SOURCED", BuildORTreeAgg(dn).String()) }) t.Run("size_2", func(t *testing.T) { dn := MakeStaticNodes(make([][]uint32, 2)) - assert.Equal(t, "(STATIC OR STATIC)", BuildORTree(dn, false).String()) - assert.Equal(t, "(SOURCED OR SOURCED)", BuildORTreeAgg(dn, false).String()) + assert.Equal(t, "(STATIC OR STATIC)", BuildORTree(dn).String()) + assert.Equal(t, "(SOURCED OR SOURCED)", BuildORTreeAgg(dn).String()) }) t.Run("size_3", func(t *testing.T) { dn := MakeStaticNodes(make([][]uint32, 3)) - assert.Equal(t, "(STATIC OR (STATIC OR STATIC))", BuildORTree(dn, false).String()) - assert.Equal(t, "(SOURCED OR (SOURCED OR SOURCED))", BuildORTreeAgg(dn, false).String()) + assert.Equal(t, "(STATIC OR (STATIC OR STATIC))", BuildORTree(dn).String()) + assert.Equal(t, "(SOURCED OR (SOURCED OR SOURCED))", BuildORTreeAgg(dn).String()) }) t.Run("size_4", func(t *testing.T) { dn := MakeStaticNodes(make([][]uint32, 4)) - assert.Equal(t, "((STATIC OR STATIC) OR (STATIC OR STATIC))", BuildORTree(dn, false).String()) - assert.Equal(t, "((SOURCED OR SOURCED) OR (SOURCED OR SOURCED))", BuildORTreeAgg(dn, false).String()) + assert.Equal(t, "((STATIC OR STATIC) OR (STATIC OR STATIC))", BuildORTree(dn).String()) + assert.Equal(t, "((SOURCED OR SOURCED) OR (SOURCED OR 
SOURCED))", BuildORTreeAgg(dn).String()) }) t.Run("size_5", func(t *testing.T) { dn := MakeStaticNodes(make([][]uint32, 5)) - assert.Equal(t, "((STATIC OR STATIC) OR (STATIC OR (STATIC OR STATIC)))", BuildORTree(dn, false).String()) - assert.Equal(t, "((SOURCED OR SOURCED) OR (SOURCED OR (SOURCED OR SOURCED)))", BuildORTreeAgg(dn, false).String()) + assert.Equal(t, "((STATIC OR STATIC) OR (STATIC OR (STATIC OR STATIC)))", BuildORTree(dn).String()) + assert.Equal(t, "((SOURCED OR SOURCED) OR (SOURCED OR (SOURCED OR SOURCED)))", BuildORTreeAgg(dn).String()) }) t.Run("size_6", func(t *testing.T) { - labels := BuildORTree(MakeStaticNodes(make([][]uint32, 6)), false).String() + labels := BuildORTree(MakeStaticNodes(make([][]uint32, 6))).String() assert.Equal(t, "((STATIC OR (STATIC OR STATIC)) OR (STATIC OR (STATIC OR STATIC)))", labels) }) t.Run("size_7", func(t *testing.T) { - labels := BuildORTree(MakeStaticNodes(make([][]uint32, 7)), false).String() + labels := BuildORTree(MakeStaticNodes(make([][]uint32, 7))).String() assert.Equal(t, "((STATIC OR (STATIC OR STATIC)) OR ((STATIC OR STATIC) OR (STATIC OR STATIC)))", labels) }) t.Run("size_8", func(t *testing.T) { - labels := BuildORTree(MakeStaticNodes(make([][]uint32, 8)), false).String() + labels := BuildORTree(MakeStaticNodes(make([][]uint32, 8))).String() assert.Equal(t, "(((STATIC OR STATIC) OR (STATIC OR STATIC)) OR ((STATIC OR STATIC) OR (STATIC OR STATIC)))", labels) }) } diff --git a/node/sourced_node_wrapper.go b/node/sourced_node_wrapper.go index 369e2eed..82b52449 100644 --- a/node/sourced_node_wrapper.go +++ b/node/sourced_node_wrapper.go @@ -9,9 +9,9 @@ func (*sourcedNodeWrapper) String() string { return "SOURCED" } -func (w *sourcedNodeWrapper) NextSourced() (uint32, uint32, bool) { - id, has := w.node.Next() - return id, w.source, has +func (w *sourcedNodeWrapper) NextSourced() (LID, uint32) { + cmp := w.node.Next() + return cmp, w.source } func NewSourcedNodeWrapper(d Node, source int) Sourced {