From cec535eed578df090557eed3da2bf6531eb2dab8 Mon Sep 17 00:00:00 2001 From: moshaad7 Date: Mon, 9 Oct 2023 00:38:44 +0530 Subject: [PATCH 1/4] support hooks in analysis pipeline --- analysis/type.go | 90 +++++++++++++++++- document/document.go | 9 ++ document/field.go | 2 +- document/field_boolean.go | 4 +- document/field_composite.go | 3 +- document/field_datetime.go | 4 +- document/field_geopoint.go | 4 +- document/field_geoshape.go | 4 +- document/field_ip.go | 3 +- document/field_numeric.go | 3 +- document/field_text.go | 10 +- go.mod | 7 ++ index/scorch/scorch.go | 81 +++++++++++++--- mapping/index.go | 8 +- registry/analyzer.go | 20 +++- registry/hook_tokens_analyzer.go | 153 +++++++++++++++++++++++++++++++ registry/hook_vector_analyzer.go | 17 ++++ registry/registry.go | 1 + search/query/match.go | 7 +- search/query/match_phrase.go | 5 +- 20 files changed, 404 insertions(+), 31 deletions(-) create mode 100644 registry/hook_tokens_analyzer.go create mode 100644 registry/hook_vector_analyzer.go diff --git a/analysis/type.go b/analysis/type.go index e3a7f201b..cb2725dd6 100644 --- a/analysis/type.go +++ b/analysis/type.go @@ -73,7 +73,89 @@ type TokenFilter interface { } type Analyzer interface { - Analyze([]byte) TokenStream + Type() string + // return value of this method depends on the type of analyzer + Analyze([]byte) interface{} +} + +const ( + TokensAnalyzerType = "token" + HookTokensAnalyzerType = "hook_token" + VectorAnalyzerType = "vector" + HookVectorAnalyzerType = "hook_vector" +) + +// # Analyzer Analyze() return type +type TokensAnalyzer struct { + Tokens TokenStream +} +type HookTokensAnalyzer struct { + Tokens TokenStream + Err error +} +type VectorAnalyzer []float64 +type HookVectorAnalyzer struct { + Vector []float64 + Err error +} + +func AnalyzeForTokens(analyzer Analyzer, input []byte) (TokenStream, error) { + analyzerType := analyzer.Type() + if analyzerType != TokensAnalyzerType && + analyzerType != HookTokensAnalyzerType { + return nil, 
fmt.Errorf("cannot analyze text with analyzer of type: %s", + analyzerType) + } + + // analyze output + analyzedOp := analyzer.Analyze(input) + err := CheckAnalyzed(analyzedOp, analyzer) + if err != nil { + return nil, fmt.Errorf("incompatible analysis result for analyzer "+ + "of type: %s, err:%+v", analyzerType, err) + } + + if analyzerType == TokensAnalyzerType { + op := analyzedOp.(TokensAnalyzer) + return op.Tokens, nil + } + + // analyzerType == analysis.HookTokensAnalyzerType + + op := analyzedOp.(HookTokensAnalyzer) + if op.Err != nil { + return nil, fmt.Errorf("analyzer hook failed, err:%+v", op.Err) + } + + return op.Tokens, nil +} + +func CheckAnalyzed(value interface{}, analyzer Analyzer) error { + switch analyzer.Type() { + case TokensAnalyzerType: + _, ok := value.(TokensAnalyzer) + if !ok { + return fmt.Errorf("expected TokensAnalyzer, got %T", value) + } + case HookTokensAnalyzerType: + _, ok := value.(HookTokensAnalyzer) + if !ok { + return fmt.Errorf("expected HookTokensAnalyzer, got %T", value) + } + case VectorAnalyzerType: + _, ok := value.(VectorAnalyzer) + if !ok { + return fmt.Errorf("expected VectorAnalyzer, got %T", value) + } + case HookVectorAnalyzerType: + _, ok := value.(HookVectorAnalyzer) + if !ok { + return fmt.Errorf("expected HookVectorAnalyzer, got %T", value) + } + default: + return fmt.Errorf("unknown analyzer type %s", analyzer.Type()) + } + return nil } type DefaultAnalyzer struct { @@ -82,7 +164,7 @@ type DefaultAnalyzer struct { TokenFilters []TokenFilter } -func (a *DefaultAnalyzer) Analyze(input []byte) TokenStream { +func (a *DefaultAnalyzer) Analyze(input []byte) interface{} { if a.CharFilters != nil { for _, cf := range a.CharFilters { input = cf.Filter(input) @@ -97,6 +179,10 @@ func (a *DefaultAnalyzer) Analyze(input []byte) TokenStream { return tokens } +func (a *DefaultAnalyzer) Type() string { + return TokensAnalyzerType +} + var ErrInvalidDateTime = fmt.Errorf("unable to parse datetime with any of the layouts") 
var ErrInvalidTimestampString = fmt.Errorf("unable to parse timestamp string") diff --git a/document/document.go b/document/document.go index 54fd6d442..6960b424b 100644 --- a/document/document.go +++ b/document/document.go @@ -124,6 +124,15 @@ func (d *Document) VisitFields(visitor index.FieldVisitor) { } } +func (d *Document) VisitFieldsAdv(visitor index.FieldVisitorAdv) { + for _, f := range d.Fields { + stop := visitor(f) + if stop { + break + } + } +} + func (d *Document) VisitComposite(visitor index.CompositeFieldVisitor) { for _, f := range d.CompositeFields { visitor(f) diff --git a/document/field.go b/document/field.go index eb104e2df..0b590a91a 100644 --- a/document/field.go +++ b/document/field.go @@ -29,7 +29,7 @@ type Field interface { // "doc1", then "field" in "doc2". ArrayPositions() []uint64 Options() index.FieldIndexingOptions - Analyze() + Analyze() error Value() []byte // NumPlainTextBytes should return the number of plain text bytes diff --git a/document/field_boolean.go b/document/field_boolean.go index fdf3cc0e5..ab9bd3bc7 100644 --- a/document/field_boolean.go +++ b/document/field_boolean.go @@ -61,7 +61,7 @@ func (b *BooleanField) Options() index.FieldIndexingOptions { return b.options } -func (b *BooleanField) Analyze() { +func (b *BooleanField) Analyze() error { tokens := make(analysis.TokenStream, 0) tokens = append(tokens, &analysis.Token{ Start: 0, @@ -73,6 +73,8 @@ func (b *BooleanField) Analyze() { b.length = len(tokens) b.frequencies = analysis.TokenFrequency(tokens, b.arrayPositions, b.options) + + return nil } func (b *BooleanField) Value() []byte { diff --git a/document/field_composite.go b/document/field_composite.go index 8c47643f5..302e357ed 100644 --- a/document/field_composite.go +++ b/document/field_composite.go @@ -91,7 +91,8 @@ func (c *CompositeField) Options() index.FieldIndexingOptions { return c.options } -func (c *CompositeField) Analyze() { +func (c *CompositeField) Analyze() error { + return nil } func (c 
*CompositeField) Value() []byte { diff --git a/document/field_datetime.go b/document/field_datetime.go index efdd26b60..beb2aaa00 100644 --- a/document/field_datetime.go +++ b/document/field_datetime.go @@ -92,7 +92,7 @@ func (n *DateTimeField) splitValue() (numeric.PrefixCoded, string) { return numeric.PrefixCoded(parts[0]), string(parts[1]) } -func (n *DateTimeField) Analyze() { +func (n *DateTimeField) Analyze() error { valueWithoutLayout, _ := n.splitValue() tokens := make(analysis.TokenStream, 0) tokens = append(tokens, &analysis.Token{ @@ -126,6 +126,8 @@ func (n *DateTimeField) Analyze() { n.length = len(tokens) n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options) + + return nil } func (n *DateTimeField) Value() []byte { diff --git a/document/field_geopoint.go b/document/field_geopoint.go index 719d18c35..3620af502 100644 --- a/document/field_geopoint.go +++ b/document/field_geopoint.go @@ -76,7 +76,7 @@ func (n *GeoPointField) AnalyzedTokenFrequencies() index.TokenFrequencies { return n.frequencies } -func (n *GeoPointField) Analyze() { +func (n *GeoPointField) Analyze() error { tokens := make(analysis.TokenStream, 0, 8) tokens = append(tokens, &analysis.Token{ Start: 0, @@ -127,6 +127,8 @@ func (n *GeoPointField) Analyze() { n.length = len(tokens) n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options) + + return nil } func (n *GeoPointField) Value() []byte { diff --git a/document/field_geoshape.go b/document/field_geoshape.go index a20ff1837..d3a5eb847 100644 --- a/document/field_geoshape.go +++ b/document/field_geoshape.go @@ -77,7 +77,7 @@ func (n *GeoShapeField) AnalyzedTokenFrequencies() index.TokenFrequencies { return n.frequencies } -func (n *GeoShapeField) Analyze() { +func (n *GeoShapeField) Analyze() error { // compute the bytes representation for the coordinates tokens := make(analysis.TokenStream, 0) tokens = append(tokens, &analysis.Token{ @@ -104,6 +104,8 @@ func (n *GeoShapeField) Analyze() { 
n.length = len(tokens) n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options) + + return nil } func (n *GeoShapeField) Value() []byte { diff --git a/document/field_ip.go b/document/field_ip.go index 1e5be5006..aa9e5e883 100644 --- a/document/field_ip.go +++ b/document/field_ip.go @@ -74,7 +74,7 @@ func (n *IPField) AnalyzedTokenFrequencies() index.TokenFrequencies { return n.frequencies } -func (b *IPField) Analyze() { +func (b *IPField) Analyze() error { tokens := analysis.TokenStream{ &analysis.Token{ @@ -87,6 +87,7 @@ func (b *IPField) Analyze() { } b.length = 1 b.frequencies = analysis.TokenFrequency(tokens, b.arrayPositions, b.options) + return nil } func (b *IPField) Value() []byte { diff --git a/document/field_numeric.go b/document/field_numeric.go index a54b082b4..7a633b889 100644 --- a/document/field_numeric.go +++ b/document/field_numeric.go @@ -75,7 +75,7 @@ func (n *NumericField) AnalyzedTokenFrequencies() index.TokenFrequencies { return n.frequencies } -func (n *NumericField) Analyze() { +func (n *NumericField) Analyze() error { tokens := make(analysis.TokenStream, 0) tokens = append(tokens, &analysis.Token{ Start: 0, @@ -108,6 +108,7 @@ func (n *NumericField) Analyze() { n.length = len(tokens) n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options) + return nil } func (n *NumericField) Value() []byte { diff --git a/document/field_text.go b/document/field_text.go index fddc59d09..9817d2828 100644 --- a/document/field_text.go +++ b/document/field_text.go @@ -74,7 +74,7 @@ func (t *TextField) AnalyzedTokenFrequencies() index.TokenFrequencies { return t.frequencies } -func (t *TextField) Analyze() { +func (t *TextField) Analyze() error { var tokens analysis.TokenStream if t.analyzer != nil { bytesToAnalyze := t.Value() @@ -84,7 +84,11 @@ func (t *TextField) Analyze() { copy(bytesCopied, bytesToAnalyze) bytesToAnalyze = bytesCopied } - tokens = t.analyzer.Analyze(bytesToAnalyze) + var err error + tokens, err = 
analysis.AnalyzeForTokens(t.analyzer, bytesToAnalyze) + if err != nil { + return err + } } else { tokens = analysis.TokenStream{ &analysis.Token{ @@ -98,6 +102,8 @@ func (t *TextField) Analyze() { } t.length = len(tokens) // number of tokens in this doc field t.frequencies = analysis.TokenFrequency(tokens, t.arrayPositions, t.options) + + return nil } func (t *TextField) Analyzer() analysis.Analyzer { diff --git a/go.mod b/go.mod index 5a922d8d0..638a8fa03 100644 --- a/go.mod +++ b/go.mod @@ -41,3 +41,10 @@ require ( github.com/spf13/pflag v1.0.5 // indirect golang.org/x/sys v0.5.0 // indirect ) + +replace github.com/blevesearch/bleve_index_api => ../bleve_index_api +replace github.com/blevesearch/zapx/v11 => ../zapx11/zapx +replace github.com/blevesearch/zapx/v12 => ../zapx12/zapx +replace github.com/blevesearch/zapx/v13 => ../zapx13/zapx +replace github.com/blevesearch/zapx/v14 => ../zapx14/zapx +replace github.com/blevesearch/zapx/v15 => ../zapx15/zapx diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index f30d795e9..340680d66 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -370,6 +370,12 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { }() resultChan := make(chan index.Document, len(batch.IndexOps)) + // docIDs of failed doc + type failedDoc struct { + id string + err error + } + failedDocs := make(chan *failedDoc, len(batch.IndexOps)) var numUpdates uint64 var numDeletes uint64 @@ -391,32 +397,68 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { if numUpdates > 0 { go func() { + var wg sync.WaitGroup for k := range batch.IndexOps { doc := batch.IndexOps[k] if doc != nil { // put the work on the queue + wg.Add(1) s.analysisQueue.Queue(func() { - analyze(doc, s.setSpatialAnalyzerPlugin) + defer wg.Done() + errAnalyze := analyze(doc, s.setSpatialAnalyzerPlugin) + if errAnalyze != nil { + failedDocs <- &failedDoc{id: k, err: errAnalyze} + return + } resultChan <- doc }) } } + wg.Wait() + close(resultChan) + 
close(failedDocs) }() + } else { + close(resultChan) + close(failedDocs) } - // wait for analysis result + // # Setup routines to handle analysis results and failed docs + var wg sync.WaitGroup + wg.Add(2) + + // handle analysis result analysisResults := make([]index.Document, int(numUpdates)) - var itemsDeQueued uint64 + analysisResults = analysisResults[:0] var totalAnalysisSize int - for itemsDeQueued < numUpdates { - result := <-resultChan - resultSize := result.Size() - atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize)) - totalAnalysisSize += resultSize - analysisResults[itemsDeQueued] = result - itemsDeQueued++ - } - close(resultChan) + go func() { + defer wg.Done() + + for result := range resultChan { + resultSize := result.Size() + atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize)) + totalAnalysisSize += resultSize + analysisResults = append(analysisResults, result) + } + }() + + // handle failed docs + failedResults := make([]*failedDoc, 0, len(batch.IndexOps)) + failedResults = failedResults[:0] + go func() { + defer wg.Done() + for failedDoc := range failedDocs { + failedResults = append(failedResults, failedDoc) + } + }() + + wg.Wait() + + // todo: change the interface of bleve_index_api.Index to return failedDocs + for _, failedDoc := range failedResults { + fmt.Println("failed doc:", failedDoc.id, failedDoc.err) + } + defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize)) atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start))) @@ -658,14 +700,19 @@ func (s *Scorch) setSpatialAnalyzerPlugin(f index.Field) { } } -func analyze(d index.Document, fn customAnalyzerPluginInitFunc) { - d.VisitFields(func(field index.Field) { +func analyze(d index.Document, fn customAnalyzerPluginInitFunc) error { + var analyzeErr error + d.VisitFieldsAdv(func(field index.Field) bool { if field.Options().IsIndexed() { if fn != nil { fn(field) } - field.Analyze() + err := field.Analyze() + if err != nil { 
+ analyzeErr = err + return true // stop visiting further fields + } if d.HasComposite() && field.Name() != "_id" { // see if any of the composite fields need this @@ -674,7 +721,11 @@ func analyze(d index.Document, fn customAnalyzerPluginInitFunc) { }) } } + + return false }) + + return analyzeErr } func (s *Scorch) AddEligibleForRemoval(epoch uint64) { diff --git a/mapping/index.go b/mapping/index.go index 0de4147a4..0a49b7649 100644 --- a/mapping/index.go +++ b/mapping/index.go @@ -423,7 +423,13 @@ func (im *IndexMappingImpl) AnalyzeText(analyzerName string, text []byte) (analy if err != nil { return nil, err } - return analyzer.Analyze(text), nil + + tokens, err := analysis.AnalyzeForTokens(analyzer, text) + if err != nil { + return nil, err + } + + return tokens, nil } // FieldAnalyzer returns the name of the analyzer used on a field. diff --git a/registry/analyzer.go b/registry/analyzer.go index f4753bc1c..fd1d71525 100644 --- a/registry/analyzer.go +++ b/registry/analyzer.go @@ -41,7 +41,25 @@ func NewAnalyzerCache() *AnalyzerCache { } } -func AnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (interface{}, error) { +func AnalyzerBuild(name string, config map[string]interface{}, cache *Cache) ( + interface{}, error) { + analyzer, errStatic := StaticAnalyzerBuild(name, config, cache) + if errStatic == nil { + return analyzer, nil + } + + analyzer, errTokensAnalyzerHook := HookTokensAnalyzerBuild(name, config, + cache) + if errTokensAnalyzerHook == nil { + return analyzer, nil + } + + return nil, fmt.Errorf("error building analyzer: %v (static: %v, "+ + "tokens_analyzer_hook: %v)", errTokensAnalyzerHook, errStatic, + errTokensAnalyzerHook) +} + +func StaticAnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (interface{}, error) { cons, registered := analyzers[name] if !registered { return nil, fmt.Errorf("no analyzer with name or type '%s' registered", name) diff --git a/registry/hook_tokens_analyzer.go 
b/registry/hook_tokens_analyzer.go new file mode 100644 index 000000000..1015cf041 --- /dev/null +++ b/registry/hook_tokens_analyzer.go @@ -0,0 +1,153 @@ +// Copyright (c) 2023 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "fmt" + "sync" + + "github.com/blevesearch/bleve/v2/analysis" +) + +var ( + ErrHookNotRegistered = fmt.Errorf("hook not registered") + ErrHookAlreadyExists = fmt.Errorf("hook already exists") +) + +// Analysis hook signature +// +// Analysis hook will be invoked at indexing and search analysis time. +// Analysis hook can be specified as the analyzer in the index mapping. +// +// returns: +// `tokenStream`: result of analysis +// `err` (error): A way for embedder to tell bleve that hook blew up and +// bleve should skip indexing/searching the value. +// +// * This hook based mechanism is a way for the embedder to register its own +// analysis logic with bleve, without having to modify bleve's code. +// +// * Using this, Embedder can create complex analysis pipelines, +// which may involve custom/niche analysis components (char filters, tokenizers, +// token filters), network calls to other services, or may involve running +// user submitted code. +// +// Note: +// - Bleve won't handle errors/timeouts in the callback. +// - In case, embedder's analysis part of the callback errored (or timed out), +// callback can error and bleve will skip indexing/searching the value. 
+type TokensAnalyzerHook func([]byte) (analysis.TokenStream, error) + +// Concurrent safe registry of tokens analyzer hooks. +type TokensAnalyzerHooks struct { + m *sync.RWMutex // protect following fields + hooks map[string]TokensAnalyzerHook // Hook Identifier -> Hook +} + +func NewTokensAnalyzerHooks() *TokensAnalyzerHooks { + return &TokensAnalyzerHooks{ + m: &sync.RWMutex{}, + hooks: make(map[string]TokensAnalyzerHook), + } +} + +func RegisterTokensAnalyzerHook(name string, hook TokensAnalyzerHook) error { + // check if hook already exists + tahs.m.RLock() + _, exists := tahs.hooks[name] + if exists { + tahs.m.RUnlock() + return ErrHookAlreadyExists + } + tahs.m.RUnlock() + + // update the registry + tahs.m.Lock() + tahs.hooks[name] = hook + tahs.m.Unlock() + + return nil +} + +// todo: add comment (embedder must make sure that it is not deregistering an active hook) +func DeregisterTokensAnalyzerHook(name string) { + // Early exit if hook doesn't exist + tahs.m.RLock() + _, exists := tahs.hooks[name] + if !exists { + tahs.m.RUnlock() + return + } + tahs.m.RUnlock() + + // update the registry + tahs.m.Lock() + delete(tahs.hooks, name) + tahs.m.Unlock() +} + +func GetTokensAnalyzerHook(name string) (TokensAnalyzerHook, error) { + tahs.m.RLock() + hook, exists := tahs.hooks[name] + tahs.m.RUnlock() + if exists { + return hook, nil + } + + return nil, ErrHookNotRegistered +} + +// ----------------------------------------------------------------------------- +// Hook to Analyzer adapter + +func HookTokensAnalyzerBuild(name string, config map[string]interface{}, + cache *Cache) (interface{}, error) { + _, err := GetTokensAnalyzerHook(name) + if err != nil { + return nil, err + } + + return NewDefaultTokensAnalyzer(name), nil +} + +type DefaultTokensAnalyzer struct { + name string +} + +func NewDefaultTokensAnalyzer(name string) *DefaultTokensAnalyzer { + return &DefaultTokensAnalyzer{ + name: name, + } +} + +func (a *DefaultTokensAnalyzer) Type() string { + return 
analysis.HookTokensAnalyzerType +} + +func (a *DefaultTokensAnalyzer) Analyze(input []byte) interface{} { + hook, err := GetTokensAnalyzerHook(a.name) + if err != nil { + return analysis.HookTokensAnalyzer{ + Tokens: nil, + Err: err, + } + } + + tokens, err := hook(input) + return analysis.HookTokensAnalyzer{ + Tokens: tokens, + Err: err, + } +} diff --git a/registry/hook_vector_analyzer.go b/registry/hook_vector_analyzer.go new file mode 100644 index 000000000..f2bc1ac89 --- /dev/null +++ b/registry/hook_vector_analyzer.go @@ -0,0 +1,17 @@ +// Copyright (c) 2023 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package registry + +// todo: make a registry similar to AnalysisHooksRegistry \ No newline at end of file diff --git a/registry/registry.go b/registry/registry.go index 1954d0896..95ea9fe31 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -35,6 +35,7 @@ var tokenizers = make(TokenizerRegistry, 0) var tokenMaps = make(TokenMapRegistry, 0) var tokenFilters = make(TokenFilterRegistry, 0) var analyzers = make(AnalyzerRegistry, 0) +var tahs = NewTokensAnalyzerHooks() var dateTimeParsers = make(DateTimeParserRegistry, 0) type Cache struct { diff --git a/search/query/match.go b/search/query/match.go index 074d11d34..f320bfa96 100644 --- a/search/query/match.go +++ b/search/query/match.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + "github.com/blevesearch/bleve/v2/analysis" "github.com/blevesearch/bleve/v2/mapping" "github.com/blevesearch/bleve/v2/search" "github.com/blevesearch/bleve/v2/util" @@ -134,7 +135,11 @@ func (q *MatchQuery) Searcher(ctx context.Context, i index.IndexReader, m mappin return nil, fmt.Errorf("no analyzer named '%s' registered", q.Analyzer) } - tokens := analyzer.Analyze([]byte(q.Match)) + tokens, err := analysis.AnalyzeForTokens(analyzer, []byte(q.Match)) + if err != nil { + fmt.Printf("(*MatchQuery) Searcher: analysis err:%+v\n", err) + } + if len(tokens) > 0 { tqs := make([]Query, len(tokens)) diff --git a/search/query/match_phrase.go b/search/query/match_phrase.go index 63a16a534..45208868d 100644 --- a/search/query/match_phrase.go +++ b/search/query/match_phrase.go @@ -84,7 +84,10 @@ func (q *MatchPhraseQuery) Searcher(ctx context.Context, i index.IndexReader, m return nil, fmt.Errorf("no analyzer named '%s' registered", q.Analyzer) } - tokens := analyzer.Analyze([]byte(q.MatchPhrase)) + tokens, err := analysis.AnalyzeForTokens(analyzer, []byte(q.MatchPhrase)) + if err != nil { + fmt.Printf("(*MatchQuery) Searcher: analysis err:%+v\n", err) + } if len(tokens) > 0 { phrase := tokenStreamToPhrase(tokens) phraseQuery := 
NewMultiPhraseQuery(phrase, field) From e15d283aa79137fdd00a11fc7c9e824bb18e1071 Mon Sep 17 00:00:00 2001 From: moshaad7 Date: Tue, 10 Oct 2023 16:01:59 +0530 Subject: [PATCH 2/4] fix go.mod and go fmt --- go.mod | 7 ------- registry/hook_tokens_analyzer.go | 2 +- registry/hook_vector_analyzer.go | 2 +- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 638a8fa03..5a922d8d0 100644 --- a/go.mod +++ b/go.mod @@ -41,10 +41,3 @@ require ( github.com/spf13/pflag v1.0.5 // indirect golang.org/x/sys v0.5.0 // indirect ) - -replace github.com/blevesearch/bleve_index_api => ../bleve_index_api -replace github.com/blevesearch/zapx/v11 => ../zapx11/zapx -replace github.com/blevesearch/zapx/v12 => ../zapx12/zapx -replace github.com/blevesearch/zapx/v13 => ../zapx13/zapx -replace github.com/blevesearch/zapx/v14 => ../zapx14/zapx -replace github.com/blevesearch/zapx/v15 => ../zapx15/zapx diff --git a/registry/hook_tokens_analyzer.go b/registry/hook_tokens_analyzer.go index 1015cf041..e44106fcc 100644 --- a/registry/hook_tokens_analyzer.go +++ b/registry/hook_tokens_analyzer.go @@ -112,7 +112,7 @@ func GetTokensAnalyzerHook(name string) (TokensAnalyzerHook, error) { // ----------------------------------------------------------------------------- // Hook to Analyzer adapter -func HookTokensAnalyzerBuild(name string, config map[string]interface{}, +func HookTokensAnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (interface{}, error) { _, err := GetTokensAnalyzerHook(name) if err != nil { diff --git a/registry/hook_vector_analyzer.go b/registry/hook_vector_analyzer.go index f2bc1ac89..c0c306e0e 100644 --- a/registry/hook_vector_analyzer.go +++ b/registry/hook_vector_analyzer.go @@ -14,4 +14,4 @@ package registry -// todo: make a registry similar to AnalysisHooksRegistry \ No newline at end of file +// todo: make a registry similar to AnalysisHooksRegistry From 0dc93eeeb42e71867c5e58cdfe911534473df016 Mon Sep 17 00:00:00 2001 
From: moshaad7 Date: Wed, 11 Oct 2023 14:35:56 +0530 Subject: [PATCH 3/4] search will now return err if analysis fails --- search/query/match.go | 2 +- search/query/match_phrase.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/search/query/match.go b/search/query/match.go index f320bfa96..2129124f7 100644 --- a/search/query/match.go +++ b/search/query/match.go @@ -137,7 +137,7 @@ func (q *MatchQuery) Searcher(ctx context.Context, i index.IndexReader, m mappin tokens, err := analysis.AnalyzeForTokens(analyzer, []byte(q.Match)) if err != nil { - fmt.Printf("(*MatchQuery) Searcher: analysis err:%+v\n", err) + return nil, fmt.Errorf("error analyzing input: %v", err) } if len(tokens) > 0 { diff --git a/search/query/match_phrase.go b/search/query/match_phrase.go index 45208868d..813bf6bd8 100644 --- a/search/query/match_phrase.go +++ b/search/query/match_phrase.go @@ -86,7 +86,7 @@ func (q *MatchPhraseQuery) Searcher(ctx context.Context, i index.IndexReader, m tokens, err := analysis.AnalyzeForTokens(analyzer, []byte(q.MatchPhrase)) if err != nil { - fmt.Printf("(*MatchQuery) Searcher: analysis err:%+v\n", err) + return nil, fmt.Errorf("error analyzing input: %v", err) } if len(tokens) > 0 { phrase := tokenStreamToPhrase(tokens) From 1cc5a32c1b1aae61ff21be53c6334afe0cc5c6c3 Mon Sep 17 00:00:00 2001 From: moshaad7 Date: Thu, 12 Oct 2023 14:35:00 +0530 Subject: [PATCH 4/4] support indexing doc even if partially analyzed - While analyzing a doc, analysis of few fields can fail. - We want to index the part of doc for which analysis succeeded. 
--- document/document.go | 9 ---- index/scorch/scorch.go | 96 +++++++++++++++--------------------------- 2 files changed, 35 insertions(+), 70 deletions(-) diff --git a/document/document.go b/document/document.go index 6960b424b..54fd6d442 100644 --- a/document/document.go +++ b/document/document.go @@ -124,15 +124,6 @@ func (d *Document) VisitFields(visitor index.FieldVisitor) { } } -func (d *Document) VisitFieldsAdv(visitor index.FieldVisitorAdv) { - for _, f := range d.Fields { - stop := visitor(f) - if stop { - break - } - } -} - func (d *Document) VisitComposite(visitor index.CompositeFieldVisitor) { for _, f := range d.CompositeFields { visitor(f) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 340680d66..0e800b003 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -17,6 +17,7 @@ package scorch import ( "encoding/json" "fmt" + "log" "os" "sync" "sync/atomic" @@ -77,6 +78,13 @@ type Scorch struct { segPlugin SegmentPlugin spatialPlugin index.SpatialAnalyzerPlugin + + failedAnalysisMutex sync.RWMutex + // note: this can grow unboundedly. + // In future, we may want to limit the size of this map. + // (something like, only keep the last 1000 failed analysis) + // In addition to that, we can store total number of failed analysis so far. 
+ failedAnalysis map[string]map[string]error // docID -> fieldName -> error } // AsyncPanicError is passed to scorch asyncErrorHandler when panic occurs in scorch background process @@ -112,6 +120,8 @@ func NewScorch(storeName string, ineligibleForRemoval: map[string]bool{}, forceMergeRequestCh: make(chan *mergerCtrl, 1), segPlugin: defaultSegmentPlugin, + + failedAnalysis: make(map[string]map[string]error), } forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config) @@ -370,12 +380,6 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { }() resultChan := make(chan index.Document, len(batch.IndexOps)) - // docIDs of failed doc - type failedDoc struct { - id string - err error - } - failedDocs := make(chan *failedDoc, len(batch.IndexOps)) var numUpdates uint64 var numDeletes uint64 @@ -397,68 +401,41 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { if numUpdates > 0 { go func() { - var wg sync.WaitGroup for k := range batch.IndexOps { doc := batch.IndexOps[k] if doc != nil { // put the work on the queue - wg.Add(1) s.analysisQueue.Queue(func() { - defer wg.Done() - errAnalyze := analyze(doc, s.setSpatialAnalyzerPlugin) - if errAnalyze != nil { - failedDocs <- &failedDoc{id: k, err: errAnalyze} - return + fieldsError := analyze(doc, s.setSpatialAnalyzerPlugin) + if len(fieldsError) > 0 { + s.failedAnalysisMutex.Lock() + s.failedAnalysis[doc.ID()] = fieldsError + s.failedAnalysisMutex.Unlock() + + log.Printf("AnalysisReport: docID: %s, fieldsError: %v", + doc.ID(), fieldsError) } + resultChan <- doc }) } } - wg.Wait() - close(resultChan) - close(failedDocs) }() - } else { - close(resultChan) - close(failedDocs) } - // # Setup routines to handle analysis results and failed docs - var wg sync.WaitGroup - wg.Add(2) - - // handle analysis result + // wait for analysis result analysisResults := make([]index.Document, int(numUpdates)) - analysisResults = analysisResults[:0] + var itemsDeQueued uint64 var totalAnalysisSize int - 
go func() { - defer wg.Done() - - for result := range resultChan { - resultSize := result.Size() - atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize)) - totalAnalysisSize += resultSize - analysisResults = append(analysisResults, result) - } - }() - - // handle failed docs - failedResults := make([]*failedDoc, 0, len(batch.IndexOps)) - failedResults = failedResults[:0] - go func() { - defer wg.Done() - for failedDoc := range failedDocs { - failedResults = append(failedResults, failedDoc) - } - }() - - wg.Wait() - - // todo: change the interface of bleve_index_api.Index to return failedDocs - for _, failedDoc := range failedResults { - fmt.Println("failed doc:", failedDoc.id, failedDoc.err) - } - + for itemsDeQueued < numUpdates { + result := <-resultChan + resultSize := result.Size() + atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize)) + totalAnalysisSize += resultSize + analysisResults[itemsDeQueued] = result + itemsDeQueued++ + } + close(resultChan) defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize)) atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start))) @@ -700,9 +677,9 @@ func (s *Scorch) setSpatialAnalyzerPlugin(f index.Field) { } } -func analyze(d index.Document, fn customAnalyzerPluginInitFunc) error { - var analyzeErr error - d.VisitFieldsAdv(func(field index.Field) bool { +func analyze(d index.Document, fn customAnalyzerPluginInitFunc) map[string]error { + rv := make(map[string]error) + d.VisitFields(func(field index.Field) { if field.Options().IsIndexed() { if fn != nil { fn(field) @@ -710,8 +687,7 @@ func analyze(d index.Document, fn customAnalyzerPluginInitFunc) error { err := field.Analyze() if err != nil { - analyzeErr = err - return true // stop visiting further fields + rv[field.Name()] = err } if d.HasComposite() && field.Name() != "_id" { @@ -721,11 +697,9 @@ func analyze(d index.Document, fn customAnalyzerPluginInitFunc) error { }) } } - - return false }) - return 
analyzeErr + return rv } func (s *Scorch) AddEligibleForRemoval(epoch uint64) {