diff --git a/analysis/type.go b/analysis/type.go
index e3a7f201b..cb2725dd6 100644
--- a/analysis/type.go
+++ b/analysis/type.go
@@ -73,7 +73,89 @@ type TokenFilter interface {
 }
 
 type Analyzer interface {
-	Analyze([]byte) TokenStream
+	Type() string
+	// the concrete type of the return value depends on the analyzer's Type()
+	Analyze([]byte) interface{}
+}
+
+const (
+	TokensAnalyzerType     = "token"
+	HookTokensAnalyzerType = "hook_token"
+	VectorAnalyzerType     = "vector"
+	HookVectorAnalyzerType = "hook_vector"
+)
+
+// Concrete Analyze() return types, one per analyzer type above:
+type TokensAnalyzer struct {
+	Tokens TokenStream
+}
+type HookTokensAnalyzer struct {
+	Tokens TokenStream
+	Err    error
+}
+type VectorAnalyzer []float64
+type HookVectorAnalyzer struct {
+	Vector []float64
+	Err    error
+}
+
+func AnalyzeForTokens(analyzer Analyzer, input []byte) (TokenStream, error) {
+	analyzerType := analyzer.Type()
+	if analyzerType != TokensAnalyzerType &&
+		analyzerType != HookTokensAnalyzerType {
+		return nil, fmt.Errorf("cannot analyze text with analyzer of type: %s",
+			analyzerType)
+	}
+
+	// run the analysis, then validate the type of its output
+	analyzedOp := analyzer.Analyze(input)
+	err := CheckAnalyzed(analyzedOp, analyzer)
+	if err != nil {
+		return nil, fmt.Errorf("incompatible analysis result for analyzer "+
+			"of type: %s, err: %+v", analyzerType, err)
+	}
+
+	if analyzerType == TokensAnalyzerType {
+		op := analyzedOp.(TokensAnalyzer)
+		return op.Tokens, nil
+	}
+
+	// analyzerType == HookTokensAnalyzerType
+
+	op := analyzedOp.(HookTokensAnalyzer)
+	if op.Err != nil {
+		return nil, fmt.Errorf("analyzer hook failed, err: %+v", op.Err)
+	}
+
+	return op.Tokens, nil
+}
+
+func CheckAnalyzed(value interface{}, analyzer Analyzer) error {
+	switch analyzer.Type() {
+	case TokensAnalyzerType:
+		_, ok := value.(TokensAnalyzer)
+		if !ok {
+			return fmt.Errorf("expected TokensAnalyzer, got %T", value)
+		}
+	case HookTokensAnalyzerType:
+		_, ok := value.(HookTokensAnalyzer)
+		if !ok {
+			return fmt.Errorf("expected HookTokensAnalyzer, got %T", value)
+		}
+	case VectorAnalyzerType:
+		_, ok := value.(VectorAnalyzer)
+		if !ok {
+			return fmt.Errorf("expected VectorAnalyzer, got %T", value)
+		}
+	case HookVectorAnalyzerType:
+		_, ok := value.(HookVectorAnalyzer)
+		if !ok {
+			return fmt.Errorf("expected HookVectorAnalyzer, got %T", value)
+		}
+	default:
+		return fmt.Errorf("unknown analyzer type %s", analyzer.Type())
+	}
+	return nil
 }
 
 type DefaultAnalyzer struct {
@@ -82,7 +164,7 @@ type DefaultAnalyzer struct {
 	TokenFilters []TokenFilter
 }
 
-func (a *DefaultAnalyzer) Analyze(input []byte) TokenStream {
+func (a *DefaultAnalyzer) Analyze(input []byte) interface{} {
 	if a.CharFilters != nil {
 		for _, cf := range a.CharFilters {
 			input = cf.Filter(input)
@@ -97,6 +179,10 @@ func (a *DefaultAnalyzer) Analyze(input []byte) TokenStream {
 	return tokens
 }
 
+func (a *DefaultAnalyzer) Type() string {
+	return TokensAnalyzerType
+}
+
 var ErrInvalidDateTime = fmt.Errorf("unable to parse datetime with any of the layouts")
 var ErrInvalidTimestampString = fmt.Errorf("unable to parse timestamp string")
 
diff --git a/document/field.go b/document/field.go
index eb104e2df..0b590a91a 100644
--- a/document/field.go
+++ b/document/field.go
@@ -29,7 +29,7 @@ type Field interface {
 	// "doc1", then "field" in "doc2".
 	ArrayPositions() []uint64
 	Options() index.FieldIndexingOptions
-	Analyze()
+	Analyze() error
 	Value() []byte
 
 	// NumPlainTextBytes should return the number of plain text bytes
diff --git a/document/field_boolean.go b/document/field_boolean.go
index fdf3cc0e5..ab9bd3bc7 100644
--- a/document/field_boolean.go
+++ b/document/field_boolean.go
@@ -61,7 +61,7 @@ func (b *BooleanField) Options() index.FieldIndexingOptions {
 	return b.options
 }
 
-func (b *BooleanField) Analyze() {
+func (b *BooleanField) Analyze() error {
 	tokens := make(analysis.TokenStream, 0)
 	tokens = append(tokens, &analysis.Token{
 		Start: 0,
@@ -73,6 +73,8 @@ func (b *BooleanField) Analyze() {
 
 	b.length = len(tokens)
 	b.frequencies = analysis.TokenFrequency(tokens, b.arrayPositions, b.options)
+
+	return nil
 }
 
 func (b *BooleanField) Value() []byte {
diff --git a/document/field_composite.go b/document/field_composite.go
index 8c47643f5..302e357ed 100644
--- a/document/field_composite.go
+++ b/document/field_composite.go
@@ -91,7 +91,8 @@ func (c *CompositeField) Options() index.FieldIndexingOptions {
 	return c.options
 }
 
-func (c *CompositeField) Analyze() {
+func (c *CompositeField) Analyze() error {
+	return nil
 }
 
 func (c *CompositeField) Value() []byte {
diff --git a/document/field_datetime.go b/document/field_datetime.go
index efdd26b60..beb2aaa00 100644
--- a/document/field_datetime.go
+++ b/document/field_datetime.go
@@ -92,7 +92,7 @@ func (n *DateTimeField) splitValue() (numeric.PrefixCoded, string) {
 	return numeric.PrefixCoded(parts[0]), string(parts[1])
 }
 
-func (n *DateTimeField) Analyze() {
+func (n *DateTimeField) Analyze() error {
 	valueWithoutLayout, _ := n.splitValue()
 	tokens := make(analysis.TokenStream, 0)
 	tokens = append(tokens, &analysis.Token{
@@ -126,6 +126,8 @@ func (n *DateTimeField) Analyze() {
 
 	n.length = len(tokens)
 	n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)
+
+	return nil
 }
 
 func (n *DateTimeField) Value() []byte {
diff --git a/document/field_geopoint.go b/document/field_geopoint.go
index 719d18c35..3620af502 100644
--- a/document/field_geopoint.go
+++ b/document/field_geopoint.go
@@ -76,7 +76,7 @@ func (n *GeoPointField) AnalyzedTokenFrequencies() index.TokenFrequencies {
 	return n.frequencies
 }
 
-func (n *GeoPointField) Analyze() {
+func (n *GeoPointField) Analyze() error {
 	tokens := make(analysis.TokenStream, 0, 8)
 	tokens = append(tokens, &analysis.Token{
 		Start: 0,
@@ -127,6 +127,8 @@ func (n *GeoPointField) Analyze() {
 
 	n.length = len(tokens)
 	n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)
+
+	return nil
 }
 
 func (n *GeoPointField) Value() []byte {
diff --git a/document/field_geoshape.go b/document/field_geoshape.go
index a20ff1837..d3a5eb847 100644
--- a/document/field_geoshape.go
+++ b/document/field_geoshape.go
@@ -77,7 +77,7 @@ func (n *GeoShapeField) AnalyzedTokenFrequencies() index.TokenFrequencies {
 	return n.frequencies
 }
 
-func (n *GeoShapeField) Analyze() {
+func (n *GeoShapeField) Analyze() error {
 	// compute the bytes representation for the coordinates
 	tokens := make(analysis.TokenStream, 0)
 	tokens = append(tokens, &analysis.Token{
@@ -104,6 +104,8 @@ func (n *GeoShapeField) Analyze() {
 
 	n.length = len(tokens)
 	n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)
+
+	return nil
 }
 
 func (n *GeoShapeField) Value() []byte {
diff --git a/document/field_ip.go b/document/field_ip.go
index 1e5be5006..aa9e5e883 100644
--- a/document/field_ip.go
+++ b/document/field_ip.go
@@ -74,7 +74,7 @@ func (n *IPField) AnalyzedTokenFrequencies() index.TokenFrequencies {
 	return n.frequencies
 }
 
-func (b *IPField) Analyze() {
+func (b *IPField) Analyze() error {
 	tokens := analysis.TokenStream{
 		&analysis.Token{
@@ -87,6 +87,7 @@ func (b *IPField) Analyze() {
 	}
 	b.length = 1
 	b.frequencies = analysis.TokenFrequency(tokens, b.arrayPositions, b.options)
+	return nil
 }
 
 func (b *IPField) Value() []byte {
diff --git a/document/field_numeric.go b/document/field_numeric.go
index a54b082b4..7a633b889 100644
--- a/document/field_numeric.go
+++ b/document/field_numeric.go
@@ -75,7 +75,7 @@ func (n *NumericField) AnalyzedTokenFrequencies() index.TokenFrequencies {
 	return n.frequencies
 }
 
-func (n *NumericField) Analyze() {
+func (n *NumericField) Analyze() error {
 	tokens := make(analysis.TokenStream, 0)
 	tokens = append(tokens, &analysis.Token{
 		Start: 0,
@@ -108,6 +108,7 @@ func (n *NumericField) Analyze() {
 
 	n.length = len(tokens)
 	n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)
+	return nil
 }
 
 func (n *NumericField) Value() []byte {
diff --git a/document/field_text.go b/document/field_text.go
index fddc59d09..9817d2828 100644
--- a/document/field_text.go
+++ b/document/field_text.go
@@ -74,7 +74,7 @@ func (t *TextField) AnalyzedTokenFrequencies() index.TokenFrequencies {
 	return t.frequencies
 }
 
-func (t *TextField) Analyze() {
+func (t *TextField) Analyze() error {
 	var tokens analysis.TokenStream
 	if t.analyzer != nil {
 		bytesToAnalyze := t.Value()
@@ -84,7 +84,11 @@ func (t *TextField) Analyze() {
 			copy(bytesCopied, bytesToAnalyze)
 			bytesToAnalyze = bytesCopied
 		}
-		tokens = t.analyzer.Analyze(bytesToAnalyze)
+		var err error
+		tokens, err = analysis.AnalyzeForTokens(t.analyzer, bytesToAnalyze)
+		if err != nil {
+			return err
+		}
 	} else {
 		tokens = analysis.TokenStream{
 			&analysis.Token{
@@ -98,6 +102,8 @@ func (t *TextField) Analyze() {
 	}
 	t.length = len(tokens) // number of tokens in this doc field
 	t.frequencies = analysis.TokenFrequency(tokens, t.arrayPositions, t.options)
+
+	return nil
 }
 
 func (t *TextField) Analyzer() analysis.Analyzer {
diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go
index f30d795e9..0e800b003 100644
--- a/index/scorch/scorch.go
+++ b/index/scorch/scorch.go
@@ -17,6 +17,7 @@ package scorch
 import (
 	"encoding/json"
 	"fmt"
+	"log"
 	"os"
 	"sync"
 	"sync/atomic"
@@ -77,6 +78,13 @@ type Scorch struct {
 
 	segPlugin     SegmentPlugin
 	spatialPlugin index.SpatialAnalyzerPlugin
+
+	failedAnalysisMutex sync.RWMutex
+	// Note: this map can grow without bound. In the future we may want
+	// to cap its size (e.g., keep only the last 1000 failed analyses),
+	// and additionally track the total number of failed analyses so far.
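+	// An entry is overwritten if the same doc fails analysis again in a later batch.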
+	failedAnalysis map[string]map[string]error // docID -> fieldName -> error
 }
 
 // AsyncPanicError is passed to scorch asyncErrorHandler when panic occurs in scorch background process
@@ -112,6 +120,8 @@ func NewScorch(storeName string,
 		ineligibleForRemoval: map[string]bool{},
 		forceMergeRequestCh:  make(chan *mergerCtrl, 1),
 		segPlugin:            defaultSegmentPlugin,
+
+		failedAnalysis: make(map[string]map[string]error),
 	}
 
 	forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
@@ -396,7 +406,16 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
 		if doc != nil {
 			// put the work on the queue
 			s.analysisQueue.Queue(func() {
-				analyze(doc, s.setSpatialAnalyzerPlugin)
+				fieldsError := analyze(doc, s.setSpatialAnalyzerPlugin)
+				if len(fieldsError) > 0 {
+					s.failedAnalysisMutex.Lock()
+					s.failedAnalysis[doc.ID()] = fieldsError
+					s.failedAnalysisMutex.Unlock()
+
+					log.Printf("AnalysisReport: docID: %s, fieldsError: %v",
+						doc.ID(), fieldsError)
+				}
+
 				resultChan <- doc
 			})
 		}
@@ -658,14 +677,18 @@ func (s *Scorch) setSpatialAnalyzerPlugin(f index.Field) {
 	}
 }
 
-func analyze(d index.Document, fn customAnalyzerPluginInitFunc) {
+func analyze(d index.Document, fn customAnalyzerPluginInitFunc) map[string]error {
+	rv := make(map[string]error)
 	d.VisitFields(func(field index.Field) {
 		if field.Options().IsIndexed() {
 			if fn != nil {
 				fn(field)
 			}
 
-			field.Analyze()
+			err := field.Analyze()
+			if err != nil {
+				rv[field.Name()] = err
+			}
 
 			if d.HasComposite() && field.Name() != "_id" {
 				// see if any of the composite fields need this
@@ -675,6 +698,8 @@ func analyze(d index.Document, fn customAnalyzerPluginInitFunc) {
 			}
 		}
 	})
+
+	return rv
 }
 
 func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
diff --git a/mapping/index.go b/mapping/index.go
index 0de4147a4..0a49b7649 100644
--- a/mapping/index.go
+++ b/mapping/index.go
@@ -423,7 +423,13 @@ func (im *IndexMappingImpl) AnalyzeText(analyzerName string, text []byte) (analy
 	if err != nil {
 		return nil, err
 	}
-	return analyzer.Analyze(text), nil
+
+	tokens, err := analysis.AnalyzeForTokens(analyzer, text)
+	if err != nil {
+		return nil, err
+	}
+
+	return tokens, nil
 }
 
 // FieldAnalyzer returns the name of the analyzer used on a field.
diff --git a/registry/analyzer.go b/registry/analyzer.go
index f4753bc1c..fd1d71525 100644
--- a/registry/analyzer.go
+++ b/registry/analyzer.go
@@ -41,7 +41,25 @@ func NewAnalyzerCache() *AnalyzerCache {
 	}
 }
 
-func AnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (interface{}, error) {
+func AnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (
+	interface{}, error) {
+	analyzer, errStatic := StaticAnalyzerBuild(name, config, cache)
+	if errStatic == nil {
+		return analyzer, nil
+	}
+
+	analyzer, errTokensAnalyzerHook := HookTokensAnalyzerBuild(name, config,
+		cache)
+	if errTokensAnalyzerHook == nil {
+		return analyzer, nil
+	}
+
+	return nil, fmt.Errorf("error building analyzer '%s' (static: %v, "+
+		"tokens_analyzer_hook: %v)", name, errStatic,
+		errTokensAnalyzerHook)
+}
+
+func StaticAnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (interface{}, error) {
 	cons, registered := analyzers[name]
 	if !registered {
 		return nil, fmt.Errorf("no analyzer with name or type '%s' registered", name)
diff --git a/registry/hook_tokens_analyzer.go b/registry/hook_tokens_analyzer.go
new file mode 100644
index 000000000..e44106fcc
--- /dev/null
+++ b/registry/hook_tokens_analyzer.go
@@ -0,0 +1,153 @@
+// Copyright (c) 2023 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registry
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/blevesearch/bleve/v2/analysis"
+)
+
+var (
+	ErrHookNotRegistered = fmt.Errorf("hook not registered")
+	ErrHookAlreadyExists = fmt.Errorf("hook already exists")
+)
+
+// TokensAnalyzerHook is the analysis hook signature.
+//
+// An analysis hook is invoked at indexing and search analysis time, and
+// can be specified as the analyzer in the index mapping.
+//
+// Returns:
+//  - tokenStream: the result of analysis.
+//  - error: a way for the embedder to tell bleve that the hook failed and
+//    that bleve should skip indexing/searching the value.
+//
+// * This hook-based mechanism is a way for the embedder to register its own
+//   analysis logic with bleve, without having to modify bleve's code.
+//
+// * Using this, the embedder can create complex analysis pipelines, which
+//   may involve custom/niche analysis components (char filters, tokenizers,
+//   token filters), network calls to other services, or running
+//   user-submitted code.
+//
+// Note:
+//  - Bleve won't handle errors/timeouts inside the callback itself.
+//  - If the embedder's analysis errors (or times out), the callback should
+//    return an error, and bleve will skip indexing/searching the value.
+type TokensAnalyzerHook func([]byte) (analysis.TokenStream, error)
+
+// Concurrency-safe registry of tokens analyzer hooks.
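+// Hooks live in the package-level registry instance (tahs, in registry.go)
+// and are looked up by name at analysis time.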
+type TokensAnalyzerHooks struct {
+	m     *sync.RWMutex                 // protects the following fields
+	hooks map[string]TokensAnalyzerHook // hook identifier -> hook
+}
+
+func NewTokensAnalyzerHooks() *TokensAnalyzerHooks {
+	return &TokensAnalyzerHooks{
+		m:     &sync.RWMutex{},
+		hooks: make(map[string]TokensAnalyzerHook),
+	}
+}
+
+func RegisterTokensAnalyzerHook(name string, hook TokensAnalyzerHook) error {
+	// The existence check and the insert must happen under a single
+	// write lock; checking under a read lock and then re-locking to
+	// insert would let two concurrent registrations of the same name
+	// race past the check.
+	tahs.m.Lock()
+	defer tahs.m.Unlock()
+
+	if _, exists := tahs.hooks[name]; exists {
+		return ErrHookAlreadyExists
+	}
+
+	// update the registry
+	tahs.hooks[name] = hook
+
+	return nil
+}
+
+// DeregisterTokensAnalyzerHook removes the named hook from the registry.
+// The embedder must make sure it is not deregistering a hook that is
+// still in use (e.g., by an open index).
+func DeregisterTokensAnalyzerHook(name string) {
+	// Delete directly under the write lock; a prior existence check
+	// under a read lock would be racy, and deleting a missing key is
+	// a no-op anyway.
+	tahs.m.Lock()
+	delete(tahs.hooks, name)
+	tahs.m.Unlock()
+}
+
+func GetTokensAnalyzerHook(name string) (TokensAnalyzerHook, error) {
+	tahs.m.RLock()
+	hook, exists := tahs.hooks[name]
+	tahs.m.RUnlock()
+	if exists {
+		return hook, nil
+	}
+
+	return nil, ErrHookNotRegistered
+}
+
+// -----------------------------------------------------------------------------
+// Hook to Analyzer adapter
+
+func HookTokensAnalyzerBuild(name string, config map[string]interface{},
+	cache *Cache) (interface{}, error) {
+	_, err := GetTokensAnalyzerHook(name)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewDefaultTokensAnalyzer(name), nil
+}
+
+type DefaultTokensAnalyzer struct {
+	name string
+}
+
+func NewDefaultTokensAnalyzer(name string) *DefaultTokensAnalyzer {
+	return &DefaultTokensAnalyzer{
+		name: name,
+	}
+}
+
+func (a *DefaultTokensAnalyzer) Type() string {
+	return analysis.HookTokensAnalyzerType
+}
+
+func (a *DefaultTokensAnalyzer) Analyze(input []byte) interface{} {
+	hook, err := GetTokensAnalyzerHook(a.name)
+	if err != nil {
+		return analysis.HookTokensAnalyzer{
+			Tokens: nil,
+			Err:    err,
+		}
+	}
+
+	tokens, err := hook(input)
+	return analysis.HookTokensAnalyzer{
+		Tokens: tokens,
+		Err:    err,
+	}
+}
diff --git a/registry/hook_vector_analyzer.go b/registry/hook_vector_analyzer.go
new file mode 100644
index 000000000..c0c306e0e
--- /dev/null
+++ b/registry/hook_vector_analyzer.go
@@ -0,0 +1,17 @@
+// Copyright (c) 2023 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registry
+
+// todo: add a vector analyzer hook registry similar to TokensAnalyzerHooks
diff --git a/registry/registry.go b/registry/registry.go
index 1954d0896..95ea9fe31 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -35,6 +35,7 @@ var tokenizers = make(TokenizerRegistry, 0)
 var tokenMaps = make(TokenMapRegistry, 0)
 var tokenFilters = make(TokenFilterRegistry, 0)
 var analyzers = make(AnalyzerRegistry, 0)
+var tahs = NewTokensAnalyzerHooks()
 var dateTimeParsers = make(DateTimeParserRegistry, 0)
 
 type Cache struct {
diff --git a/search/query/match.go b/search/query/match.go
index 074d11d34..2129124f7 100644
--- a/search/query/match.go
+++ b/search/query/match.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/blevesearch/bleve/v2/analysis"
 	"github.com/blevesearch/bleve/v2/mapping"
 	"github.com/blevesearch/bleve/v2/search"
 	"github.com/blevesearch/bleve/v2/util"
@@ -134,7 +135,11 @@ func (q *MatchQuery) Searcher(ctx context.Context, i index.IndexReader, m mappin
 		return nil, fmt.Errorf("no analyzer named '%s' registered", q.Analyzer)
 	}
 
-	tokens := analyzer.Analyze([]byte(q.Match))
+	tokens, err := analysis.AnalyzeForTokens(analyzer, []byte(q.Match))
+	if err != nil {
+		return nil, fmt.Errorf("error analyzing input: %v", err)
+	}
+
 	if len(tokens) > 0 {
 		tqs := make([]Query, len(tokens))
diff --git a/search/query/match_phrase.go b/search/query/match_phrase.go
index 63a16a534..813bf6bd8 100644
--- a/search/query/match_phrase.go
+++ b/search/query/match_phrase.go
@@ -84,7 +84,10 @@ func (q *MatchPhraseQuery) Searcher(ctx context.Context, i index.IndexReader, m
 		return nil, fmt.Errorf("no analyzer named '%s' registered", q.Analyzer)
 	}
 
-	tokens := analyzer.Analyze([]byte(q.MatchPhrase))
+	tokens, err := analysis.AnalyzeForTokens(analyzer, []byte(q.MatchPhrase))
+	if err != nil {
+		return nil, fmt.Errorf("error analyzing input: %v", err)
+	}
 	if len(tokens) > 0 {
 		phrase := tokenStreamToPhrase(tokens)
 		phraseQuery := NewMultiPhraseQuery(phrase, field)
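
A minimal sketch of how an embedder might exercise the pieces above, assuming only the registry and analysis APIs introduced in this diff; the hook name "my-hook" and its single-token logic are illustrative, not part of the change:

package main

import (
	"fmt"
	"log"

	"github.com/blevesearch/bleve/v2/analysis"
	"github.com/blevesearch/bleve/v2/registry"
)

func main() {
	// Register a hook (hypothetical name/logic) that emits one token
	// spanning the entire input.
	err := registry.RegisterTokensAnalyzerHook("my-hook",
		func(input []byte) (analysis.TokenStream, error) {
			return analysis.TokenStream{
				&analysis.Token{
					Term:     input,
					Start:    0,
					End:      len(input),
					Position: 1,
				},
			}, nil
		})
	if err != nil {
		log.Fatal(err)
	}

	// The adapter exposes the hook as an analyzer of type "hook_token" ...
	analyzer := registry.NewDefaultTokensAnalyzer("my-hook")

	// ... which the typed entry point knows how to drive, surfacing any
	// error the hook returns.
	tokens, err := analysis.AnalyzeForTokens(analyzer, []byte("hello world"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("got %d token(s)\n", len(tokens))
}

In normal use the hook name would instead appear as the analyzer name in the index mapping, and AnalyzerBuild would produce the same adapter via its HookTokensAnalyzerBuild fallback.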