From 27da57cc752e46eaa4c0943abbc66063b80eb190 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Mon, 16 Feb 2026 18:31:21 +0400 Subject: [PATCH 1/3] log search type when it's slow --- frac/processor/search_params.go | 10 ++++++++++ storeapi/grpc_search.go | 1 + 2 files changed, 11 insertions(+) diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index e383c9e2..2a2dcf62 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -39,3 +39,13 @@ func (p *SearchParams) HasAgg() bool { func (p *SearchParams) IsScanAllRequest() bool { return p.WithTotal || p.HasAgg() || p.HasHist() } + +func (p *SearchParams) Type() string { + if p.HasAgg() { + return "agg" + } else if p.HasHist() { + return "hist" + } + + return "reg" +} diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index 4acd3bb7..2fa87799 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -230,6 +230,7 @@ func (g *GrpcV1) doSearch( logger.Warn("slow search", zap.Int64("took_ms", took.Milliseconds()), zap.Object("req", (*searchRequestMarshaler)(req)), + zap.String("type", searchParams.Type()), zap.Uint64("found", qpr.Total), zap.String("from", seq.MillisToMID(uint64(req.From)).String()), zap.String("to", seq.MillisToMID(uint64(req.To)).String()), From 59f5235e965c768e5c8d4f74c8cc6fa76f6c60a3 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Thu, 5 Mar 2026 17:19:52 +0300 Subject: [PATCH 2/3] better slow search --- frac/processor/search_params.go | 19 +++++++++++ fracmanager/searcher.go | 57 ++++++++++++++++++++++++++++++++- seq/qpr.go | 27 ++++++++++++++++ seq/seq.go | 4 +++ storeapi/grpc_search.go | 16 --------- storeapi/grpc_v1.go | 1 + 6 files changed, 107 insertions(+), 17 deletions(-) diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index 2a2dcf62..3d5d0e93 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -1,6 +1,8 @@ package processor import ( + "go.uber.org/zap/zapcore" + "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" ) @@ -13,6 +15,23 @@ type AggQuery struct { Interval int64 } +func (q AggQuery) MarshalLogObject(enc zapcore.ObjectEncoder) error { + if q.Field != nil { + enc.AddString("field", q.Field.Field) + } + if q.GroupBy != nil { + enc.AddString("groupBy", q.GroupBy.Field) + } + enc.AddString("func", q.Func.String()) + if len(q.Quantiles) != 0 { + enc.AddInt("quantiles_count", len(q.Quantiles)) + } + if q.Interval != 0 { + enc.AddInt64("interval", q.Interval) + } + return nil +} + type SearchParams struct { AST *parser.ASTNode `json:"-"` diff --git a/fracmanager/searcher.go b/fracmanager/searcher.go index d69277fb..6cc688cb 100644 --- a/fracmanager/searcher.go +++ b/fracmanager/searcher.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/processor" @@ -17,9 +19,14 @@ import ( "github.com/ozontech/seq-db/util" ) +const ( + maxFracsSlowSearchLog = 10 +) + type SearcherCfg struct { MaxFractionHits int // the maximum number of fractions used in the search FractionsPerIteration int + SlowLogThreshold time.Duration } type Searcher struct { @@ -38,6 +45,7 @@ func NewSearcher(maxWorkersNum int, cfg SearcherCfg) *Searcher { } func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params processor.SearchParams, tr *querytracer.Tracer) (*seq.QPR, error) { + start := time.Now() remainingFracs, err := s.prepareFracs(fracs, params) if err != nil { return nil, err @@ -59,13 +67,33 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params var totalSearchTimeNanos int64 var totalWaitTimeNanos int64 + totalFracsFound := 0 + totalFracsSkipped := 0 + var fracsFound []string + var fracsSkipped []string for len(remainingFracs) > 0 && (scanAll || params.Limit > 0) { - subQPRs, searchTimeNanos, waitTimeNanos, err := s.searchDocsAsync(ctx, remainingFracs.Shift(fracsChunkSize), params) + chunk := remainingFracs.Shift(fracsChunkSize) + + subQPRs, searchTimeNanos, waitTimeNanos, err := s.searchDocsAsync(ctx, chunk, params) if err != nil { return nil, err } + for i, qpr := range subQPRs { + if !qpr.Empty() { + totalFracsFound++ + if len(fracsFound) < maxFracsSlowSearchLog { + fracsFound = append(fracsFound, chunk[i].Info().Name()) + } + } else { + totalFracsSkipped++ + if len(fracsSkipped) < maxFracsSlowSearchLog { + fracsSkipped = append(fracsSkipped, chunk[i].Info().Name()) + } + } + } + totalSearchTimeNanos += searchTimeNanos totalWaitTimeNanos += waitTimeNanos @@ -91,6 +119,33 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params } searchSubSearches.Observe(float64(subSearchesCnt)) + + took := time.Since(start) + if s.cfg.SlowLogThreshold != 0 && took >= s.cfg.SlowLogThreshold { + fields := []zap.Field{ + zap.Int64("took_ms", took.Milliseconds()), + zap.String("type", params.Type()), + zap.String("request", params.AST.SeqQLString()), + zap.Uint64("hist_interval", params.HistInterval), + zap.String("from", params.From.String()), + zap.String("to", params.To.String()), + zap.Uint64("range", seq.MIDToSeconds(params.To)-seq.MIDToSeconds(params.From)), + zap.String("offset_id", params.OffsetId.String()), + zap.Int("limit", params.Limit), + zap.Bool("with_total", params.WithTotal), + zap.Int("total_fracs_found", totalFracsFound), + zap.Strings("fracs_found", fracsFound), + zap.Int("total_fracs_skipped", totalFracsSkipped), + zap.Strings("fracs_skipped", fracsSkipped), + zap.Uint64("total", total.Total), + } + + for i, agg := range params.AggQ { + fields = append(fields, zap.Object(fmt.Sprintf("agg_%d", i), agg)) + } + + logger.Warn("slow search", fields...) + } return total, nil } diff --git a/seq/qpr.go b/seq/qpr.go index a7246233..30b78937 100644 --- a/seq/qpr.go +++ b/seq/qpr.go @@ -74,6 +74,10 @@ type QPR struct { Errors []ErrorSource } +func (q *QPR) Empty() bool { + return len(q.IDs) == 0 && len(q.Histogram) == 0 && len(q.Aggs) == 0 +} + func (q *QPR) Aggregate(args []AggregateArgs) []AggregationResult { allAggregations := make([]AggregationResult, len(q.Aggs)) for i, agg := range q.Aggs { @@ -107,6 +111,29 @@ const ( AggFuncUniqueCount ) +func (f AggFunc) String() string { + switch f { + case AggFuncCount: + return "count" + case AggFuncSum: + return "sum" + case AggFuncMin: + return "min" + case AggFuncMax: + return "max" + case AggFuncAvg: + return "avg" + case AggFuncQuantile: + return "quantile" + case AggFuncUnique: + return "unique" + case AggFuncUniqueCount: + return "unique_count" + default: + return "unknown" + } +} + type AggBin struct { MID MID Token string diff --git a/seq/seq.go b/seq/seq.go index 87f82087..6a5a0039 100644 --- a/seq/seq.go +++ b/seq/seq.go @@ -152,6 +152,10 @@ func MIDToMillis(t MID) uint64 { return uint64(t) / uint64(time.Millisecond) } +func MIDToSeconds(t MID) uint64 { + return uint64(t) / uint64(time.Second) +} + func MIDToCeilingMillis(t MID) uint64 { millis := uint64(t) / uint64(time.Millisecond) nanosPartOfMilli := uint64(t) % uint64(time.Millisecond) diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index 2fa87799..2057ea39 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -225,22 +225,6 @@ func (g *GrpcV1) doSearch( } } - took := time.Since(start) - if g.config.Search.LogThreshold != 0 && took >= g.config.Search.LogThreshold { - logger.Warn("slow search", - zap.Int64("took_ms", took.Milliseconds()), - zap.Object("req", (*searchRequestMarshaler)(req)), - zap.String("type", searchParams.Type()), - zap.Uint64("found", qpr.Total), - zap.String("from", seq.MillisToMID(uint64(req.From)).String()), - zap.String("to", seq.MillisToMID(uint64(req.To)).String()), - zap.Int64("offset", req.Offset), - zap.String("offset_id", req.OffsetId), - zap.Int64("size", req.Size), - zap.Bool("with_total", req.WithTotal), - ) - } - return buildSearchResponse(qpr), nil } diff --git a/storeapi/grpc_v1.go b/storeapi/grpc_v1.go index 2ed55ea5..a242b172 100644 --- a/storeapi/grpc_v1.go +++ b/storeapi/grpc_v1.go @@ -106,6 +106,7 @@ func NewGrpcV1(cfg APIConfig, fracManager *fracmanager.FracManager, mappingProvi searcher: fracmanager.NewSearcher(cfg.Search.WorkersCount, fracmanager.SearcherCfg{ MaxFractionHits: cfg.Search.MaxFractionHits, FractionsPerIteration: cfg.Search.FractionsPerIteration, + SlowLogThreshold: cfg.Search.LogThreshold, }), }, fetchData: fetchData{ From dd8c253dae88407fc83e53556c04bf3c100bb5b9 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Tue, 17 Mar 2026 10:41:47 +0400 Subject: [PATCH 3/3] implement MarshalLogObject for SearchParams --- frac/processor/search_params.go | 30 ++++++++++++++++++++++++++++++ fracmanager/searcher.go | 14 +------------- seq/qpr.go | 11 +++++++++++ 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index 3d5d0e93..de2594b8 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -1,6 +1,8 @@ package processor import ( + "fmt" + "go.uber.org/zap/zapcore" "github.com/ozontech/seq-db/parser" @@ -47,6 +49,34 @@ type SearchParams struct { Order seq.DocsOrder } +func (p SearchParams) MarshalLogObject(enc zapcore.ObjectEncoder) error { + if p.AST != nil { + enc.AddString("request", p.AST.SeqQLString()) + } + enc.AddString("type", p.Type()) + if p.HistInterval != 0 { + enc.AddUint64("hist_interval", p.HistInterval) + } + enc.AddString("from", p.From.String()) + enc.AddString("to", p.To.String()) + enc.AddUint64("range_seconds", seq.MIDToSeconds(p.To)-seq.MIDToSeconds(p.From)) + if uint64(p.OffsetId.MID) != 0 { + enc.AddString("offset_id", p.OffsetId.String()) + } + if p.Limit != 0 { + enc.AddInt("limit", p.Limit) + } + enc.AddBool("with_total", p.WithTotal) + enc.AddString("order", p.Order.String()) + for i, agg := range p.AggQ { + err := enc.AddObject(fmt.Sprintf("agg_%d", i), agg) + if err != nil { + return err + } + } + return nil +} + func (p *SearchParams) HasHist() bool { return p.HistInterval > 0 } diff --git a/fracmanager/searcher.go b/fracmanager/searcher.go index 6cc688cb..6a5b768b 100644 --- a/fracmanager/searcher.go +++ b/fracmanager/searcher.go @@ -124,15 +124,7 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params if s.cfg.SlowLogThreshold != 0 && took >= s.cfg.SlowLogThreshold { fields := []zap.Field{ zap.Int64("took_ms", took.Milliseconds()), - zap.String("type", params.Type()), - zap.String("request", params.AST.SeqQLString()), - zap.Uint64("hist_interval", params.HistInterval), - zap.String("from", params.From.String()), - zap.String("to", params.To.String()), - zap.Uint64("range", seq.MIDToSeconds(params.To)-seq.MIDToSeconds(params.From)), - zap.String("offset_id", params.OffsetId.String()), - zap.Int("limit", params.Limit), - zap.Bool("with_total", params.WithTotal), + zap.Object("params", params), zap.Int("total_fracs_found", totalFracsFound), zap.Strings("fracs_found", fracsFound), zap.Int("total_fracs_skipped", totalFracsSkipped), @@ -140,10 +132,6 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params zap.Uint64("total", total.Total), } - for i, agg := range params.AggQ { - fields = append(fields, zap.Object(fmt.Sprintf("agg_%d", i), agg)) - } - logger.Warn("slow search", fields...) } return total, nil diff --git a/seq/qpr.go b/seq/qpr.go index 30b78937..c0415b85 100644 --- a/seq/qpr.go +++ b/seq/qpr.go @@ -29,6 +29,17 @@ func (o DocsOrder) IsReverse() bool { return o == DocsOrderAsc } +func (f DocsOrder) String() string { + switch f { + case DocsOrderDesc: + return "desc" + case DocsOrderAsc: + return "asc" + default: + return "unknown" + } +} + type IDSource struct { ID ID Source uint64