Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs
}

type testDoc = struct {
id string
json string
message string
service string
Expand Down Expand Up @@ -280,19 +281,21 @@ func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time.
message := messages[rand.IntN(len(messages))]
level := rand.IntN(6)
timestamp := fromTime.Add(time.Duration(i) * time.Millisecond)
id := fmt.Sprintf("id-%d", i)
traceId := fmt.Sprintf("trace-%d", i%5000)
pod := fmt.Sprintf("pod-%d", i%50)
clientIp := fmt.Sprintf("192.168.%d.%d", rand.IntN(64), rand.IntN(256))
if i == numMessages-1 {
toTime = timestamp
}

json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"pod":%q,"client_ip":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), service, pod, clientIp, message, traceId, level)
json := fmt.Sprintf(`{"timestamp":%q,"id": %q, "service":%q,"pod":%q,"client_ip":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), id, service, pod, clientIp, message, traceId, level)

docs = append(docs, &testDoc{
json: json,
timestamp: timestamp,
id: id,
message: message,
service: service,
pod: pod,
Expand Down
28 changes: 28 additions & 0 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (s *FractionTestSuite) SetupTestCommon() {
seq.TokenizerTypePath: tokenizer.NewPathTokenizer(512, false, true),
}
s.mapping = seq.Mapping{
"id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"k8s_pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"k8s_namespace": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"k8s_container": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
Expand Down Expand Up @@ -1352,6 +1353,33 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets)
})

// Test large QPR with 25000 groups (all ids are unique)
s.Run("_exists_:service | group by id count()", func() {
countById := make(map[string]int)
for _, doc := range testDocs {
countById[doc.id]++
}

var expectedBuckets []seq.AggregationBucket
for id, cnt := range countById {
expectedBuckets = append(expectedBuckets, seq.AggregationBucket{
Name: id,
Value: float64(cnt),
NotExists: 0,
})
}

searchParams := s.query(
"_exists_:service",
withTo(toTime.Format(time.RFC3339Nano)),
withAggQuery(processor.AggQuery{
GroupBy: aggField("id"),
Func: seq.AggFuncCount,
}))

s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncCount}, expectedBuckets)
})

s.Run("NOT message:retry | group by service avg(level)", func() {
levelsByService := make(map[string][]int)
for _, doc := range testDocs {
Expand Down
15 changes: 10 additions & 5 deletions frac/sealed/token/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,18 @@ func (t Table) GetEntryByTID(tid uint32) *TableEntry {
if tid == 0 {
return nil
}
// todo: use bin search (we must have ordered slice here)
for _, data := range t {
for _, entry := range data.Entries {
if tid >= entry.StartTID && tid < entry.StartTID+entry.ValCount {
return entry
}
from := data.Entries[0].StartTID
to := data.Entries[len(data.Entries)-1].getLastTID()
if tid < from || tid > to {
continue
}

i := sort.Search(len(data.Entries), func(j int) bool {
return data.Entries[j].StartTID > tid
})

return data.Entries[i-1]
}

logger.Panic("can't find tid", zap.Uint32("tid", tid))
Expand Down
Loading