Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ func TestConcurrentAppendAndQuery(t *testing.T) {
readTest(t, sealed, numReaders, numQueries, docs, fromTime, toTime, mapping)
}

// Service-name constants shared by the test fixtures in this package.
// They replace repeated string literals ("gateway", "proxy", …) used in
// queries, filters, and generated test documents, so a rename only has
// to happen in one place.
const (
	scheduler = "scheduler"
	database  = "database"
	bus       = "bus"
	proxy     = "proxy"
	gateway   = "gateway"
	kafka     = "kafka"
)

func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs []*testDoc, fromTime, toTime time.Time, mapping seq.Mapping) {
readersGroup, ctx := errgroup.WithContext(t.Context())

Expand All @@ -173,7 +182,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs
case 1:
query = "service:gateway"
filter = func(doc *testDoc) bool {
return doc.service == "gateway"
return doc.service == gateway
}
case 2:
query = "level:2"
Expand All @@ -198,7 +207,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs
case 6:
query = "service:gateway AND level:3"
filter = func(doc *testDoc) bool {
return doc.service == "gateway" && doc.level == 3
return doc.service == gateway && doc.level == 3
}
}

Expand Down Expand Up @@ -265,7 +274,7 @@ type testDoc = struct {
}

func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time.Time, time.Time) {
services := []string{"gateway", "proxy", "scheduler", "database", "bus", "kafka"}
services := []string{gateway, proxy, scheduler, database, bus, kafka}
messages := []string{
"request started", "request completed", "processing timed out",
"processing data", "processing failed", "processing retry",
Expand Down
247 changes: 208 additions & 39 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,14 +740,14 @@ func (s *FractionTestSuite) TestBasicAggregation() {
"message:*",
withAggQuery(processor.AggQuery{GroupBy: aggField("service")})),
[]map[string]uint64{
{"gateway": 3, "proxy": 2, "scheduler": 1},
{gateway: 3, proxy: 2, scheduler: 1},
})
assertAggSearch(
s.query(
"message:good",
withAggQuery(processor.AggQuery{GroupBy: aggField("service")})),
[]map[string]uint64{
{"gateway": 2, "proxy": 1},
{gateway: 2, proxy: 1},
})
assertAggSearch(
s.query(
Expand All @@ -762,7 +762,7 @@ func (s *FractionTestSuite) TestBasicAggregation() {
withAggQuery(processor.AggQuery{GroupBy: aggField("service")}),
withAggQuery(processor.AggQuery{GroupBy: aggField("level")})),
[]map[string]uint64{
{"gateway": 3, "proxy": 2, "scheduler": 1},
{gateway: 3, proxy: 2, scheduler: 1},
{"1": 4, "2": 1, "3": 1},
})
}
Expand Down Expand Up @@ -1222,17 +1222,31 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
fromTime: fromTime,
toTime: toTime,
},
{
name: "NOT service:bus",
query: "NOT service:bus",
filter: func(doc *testDoc) bool { return doc.service != bus },
fromTime: fromTime,
toTime: toTime,
},
{
name: "NOT service:bus (time range)",
query: "NOT service:bus",
filter: func(doc *testDoc) bool { return doc.service != bus },
fromTime: fromTime,
toTime: midTime,
},
{
name: "service:proxy (time range)",
query: "service:proxy",
filter: func(doc *testDoc) bool { return doc.service == "proxy" },
filter: func(doc *testDoc) bool { return doc.service == proxy },
fromTime: fromTime,
toTime: midTime,
},
{
name: "service:scheduler (time range + limit)",
query: "service:scheduler",
filter: func(doc *testDoc) bool { return doc.service == "scheduler" },
filter: func(doc *testDoc) bool { return doc.service == scheduler },
fromTime: fromTime,
toTime: midTime,
limit: 100,
Expand Down Expand Up @@ -1272,6 +1286,13 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
fromTime: fromTime,
toTime: midTime,
},
{
name: "NOT trace_id:trace-4999",
query: "NOT trace_id:trace-4999",
filter: func(doc *testDoc) bool { return doc.traceId != "trace-4999" },
fromTime: fromTime,
toTime: toTime,
},
{
name: "trace_id:trace-4999",
query: "trace_id:trace-4999",
Expand All @@ -1286,6 +1307,97 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
fromTime: fromTime,
toTime: midTime,
},
// AND operator queries
{
name: "message:request AND message:failed",
query: "message:request AND message:failed",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.message, "request") && strings.Contains(doc.message, "failed")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND message:processing AND message:retry AND level:5",
query: "service:gateway AND message:processing AND message:retry AND level:5",
filter: func(doc *testDoc) bool {
return doc.service == gateway && strings.Contains(doc.message, "processing") &&
strings.Contains(doc.message, "retry") && doc.level == 5
},
fromTime: fromTime,
toTime: toTime,
},
// OR operator queries
{
name: "trace_id OR",
query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000",
filter: func(doc *testDoc) bool {
return doc.traceId == "trace-1000" ||
doc.traceId == "trace-1500" ||
doc.traceId == "trace-2000" ||
doc.traceId == "trace-2500" ||
doc.traceId == "trace-3000"
},
fromTime: fromTime,
toTime: toTime,
},

// mixed AND/OR/NOT
{
name: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*",
query: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.message, "request") && (doc.level == 1 || doc.level == 3 || doc.level == 5) &&
strings.Contains(doc.traceId, "trace-2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "complex AND+OR",
query: "(service:gateway OR service:proxy OR service:scheduler) AND " +
"(message:request OR message:failed) AND level:[1 to 3]",
filter: func(doc *testDoc) bool {
return (doc.service == gateway || doc.service == proxy || doc.service == "scheduler") &&
(strings.Contains(doc.message, "request") || strings.Contains(doc.message, "failed")) &&
(doc.level >= 1 && doc.level <= 3)
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND NOT (message:request OR message:timed OR level:[0 to 3])",
query: "service:gateway AND NOT (message:request OR message:timed OR level:[0 to 3])",
filter: func(doc *testDoc) bool {
return doc.service == gateway &&
!(strings.Contains(doc.message, "request") ||
strings.Contains(doc.message, "timed") ||
(doc.level >= 0 && doc.level <= 3))
},
fromTime: fromTime,
toTime: midTime,
},
{
name: "service:proxy AND NOT level:5 AND NOT pod:pod-2* AND NOT client_ip:ip_range(192.168.19.0,192.168.19.255)",
query: "service:proxy AND NOT level:5 AND NOT pod:pod-2* AND NOT client_ip:ip_range(192.168.19.0,192.168.19.255)",
filter: func(doc *testDoc) bool {
return doc.service == proxy &&
doc.level != 5 &&
!strings.Contains(doc.pod, "pod-2") &&
!strings.Contains(doc.clientIp, "192.168.19")
},
fromTime: fromTime,
toTime: midTime,
},

// other queries
{
name: "trace_id:trace-4*",
query: "trace_id:trace-4*",
filter: func(doc *testDoc) bool { return strings.Contains(doc.traceId, "trace-4") },
fromTime: fromTime,
toTime: toTime,
},
}

for _, tc := range searchTestCases {
Expand Down Expand Up @@ -1320,37 +1432,85 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
}

s.Run("service:kafka | group by pod unique_count(client_ip)", func() {
ips := make(map[string]map[string]struct{})
for _, doc := range testDocs {
if doc.service != "kafka" {
continue
// Check both sort orders simply for aggTree to be iterated in a different order
orders := []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc}

for _, ord := range orders {
ips := make(map[string]map[string]struct{})
for _, doc := range testDocs {
if doc.service != kafka {
continue
}
if ips[doc.pod] == nil {
ips[doc.pod] = make(map[string]struct{})
}

ips[doc.pod][doc.clientIp] = struct{}{}
}
if ips[doc.pod] == nil {
ips[doc.pod] = make(map[string]struct{})

var expectedBuckets []seq.AggregationBucket
for pod, podIps := range ips {
expectedBuckets = append(expectedBuckets, seq.AggregationBucket{
Name: pod,
Value: float64(len(podIps)),
NotExists: 0,
})
}

ips[doc.pod][doc.clientIp] = struct{}{}
searchParams := s.query(
"service:kafka",
withTo(toTime.Format(time.RFC3339Nano)),
withAggQuery(processor.AggQuery{
Field: aggField("client_ip"),
GroupBy: aggField("pod"),
Func: seq.AggFuncUniqueCount,
}))
searchParams.Order = ord

s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets)
}
})

var expectedBuckets []seq.AggregationBucket
for pod, podIps := range ips {
expectedBuckets = append(expectedBuckets, seq.AggregationBucket{
Name: pod,
Value: float64(len(podIps)),
NotExists: 0,
})
}
s.Run("service:scheduler | group by pod avg(level)", func() {
// Check both sort orders simply for aggTree to be iterated in a different order
orders := []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc}

searchParams := s.query(
"service:kafka",
withTo(toTime.Format(time.RFC3339Nano)),
withAggQuery(processor.AggQuery{
Field: aggField("client_ip"),
GroupBy: aggField("pod"),
Func: seq.AggFuncUniqueCount,
}))
for _, ord := range orders {
levelsByPod := make(map[string][]int)
for _, doc := range testDocs {
if doc.service != "scheduler" {
continue
}

levelsByPod[doc.pod] = append(levelsByPod[doc.pod], doc.level)
}

s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets)
var expectedBuckets []seq.AggregationBucket
for pod, levels := range levelsByPod {
sum := 0
for _, level := range levels {
sum += level
}
avg := float64(sum) / float64(len(levels))
expectedBuckets = append(expectedBuckets, seq.AggregationBucket{
Name: pod,
Value: avg,
NotExists: 0,
})
}

searchParams := s.query(
"service:scheduler",
withTo(toTime.Format(time.RFC3339Nano)),
withAggQuery(processor.AggQuery{
Field: aggField("level"),
GroupBy: aggField("pod"),
Func: seq.AggFuncAvg,
}))
searchParams.Order = ord

s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncAvg}, expectedBuckets)
}
})

// Test large QPR with 25000 groups (all ids are unique)
Expand Down Expand Up @@ -1418,18 +1578,27 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
})

s.Run("service:database AND level:3 | hist 1s", func() {
histBuckets := make(map[string]uint64)
for _, doc := range testDocs {
if doc.service == "database" && doc.level == 3 {
bucketTime := doc.timestamp.Truncate(time.Second)
bucketKey := bucketTime.Format(time.RFC3339Nano)
histBuckets[bucketKey]++
// Check both sort orders simply for lid tree to be iterated in a different order
orders := []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc}

for _, ord := range orders {
histBuckets := make(map[string]uint64)
for _, doc := range testDocs {
if doc.service == "database" && doc.level == 3 {
bucketTime := doc.timestamp.Truncate(time.Second)
bucketKey := bucketTime.Format(time.RFC3339Nano)
histBuckets[bucketKey]++
}
}
}

s.AssertHist(
s.query("service:database AND level:3", withTo(toTime.Format(time.RFC3339Nano)), withHist(1000)),
histBuckets)
searchParams := s.query(
"service:database AND level:3",
withTo(toTime.Format(time.RFC3339Nano)),
withHist(1000))
searchParams.Order = ord

s.AssertHist(searchParams, histBuckets)
}
})

s.Run("scroll with offset id", func() {
Expand Down
Loading
Loading