Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ type IndexCopyable interface {
CopyTo(d index.Directory) error
}

// IndexFileCopyable is an index supporting the transfer of a single file between
// two indexes
type IndexFileCopyable interface {
// UpdateFileInBolt hands a key/value pair to the index so it can record it
// in its bolt-backed metadata. The scorch implementation currently uses
// this for the centroid index file only.
UpdateFileInBolt(key []byte, value []byte) error
// CopyFile copies the named file from this index into the destination
// directory d.
CopyFile(file string, d index.IndexDirectory) error
}

// FileSystemDirectory is the default implementation for the
// index.Directory interface.
type FileSystemDirectory string
Expand Down
5 changes: 5 additions & 0 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,11 @@ func copyToDirectory(srcPath string, d index.Directory) (int64, error) {
return 0, fmt.Errorf("GetWriter err: %v", err)
}

// skip
if dest == nil {
return 0, nil
}

sourceFileStat, err := os.Stat(srcPath)
if err != nil {
return 0, err
Expand Down
66 changes: 66 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package scorch
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -78,6 +79,8 @@ type Scorch struct {
persisterNotifier chan *epochWatcher
rootBolt *bolt.DB
asyncTasks sync.WaitGroup
// not a real searchable segment
centroidIndex *SegmentSnapshot

trainer trainer

Expand Down Expand Up @@ -112,6 +115,10 @@ type trainer interface {
loadTrainedData(*bolt.Bucket) error
// to fetch the internal data from the component
getInternal(key []byte) ([]byte, error)

// file transfer operations
copyFileLOCKED(file string, d index.IndexDirectory) error
updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error
}

type ScorchErrorType string
Expand Down Expand Up @@ -1041,6 +1048,65 @@ func (s *Scorch) CopyReader() index.CopyReader {
return rv
}

// UpdateFileInBolt records a key/value pair in the root bolt inside a single
// writable transaction. The actual bucket update is delegated to the trainer
// (currently only the centroid index file is handled there). On success the
// transaction is committed and the bolt file is synced; on any failure before
// or during the commit the transaction is rolled back.
func (s *Scorch) UpdateFileInBolt(key []byte, value []byte) error {
	writeTx, err := s.rootBolt.Begin(true)
	if err != nil {
		return err
	}
	// The closure reads the function-level err, so every step below must
	// assign to it (never shadow it) for the rollback to trigger.
	defer func() {
		if err == nil {
			return
		}
		_ = writeTx.Rollback()
	}()

	var bucket *bolt.Bucket
	if bucket, err = writeTx.CreateBucketIfNotExists(util.BoltSnapshotsBucket); err != nil {
		return err
	}

	// currently this is specific to centroid index file update
	if err = s.trainer.updateBolt(bucket, key, value); err != nil {
		return err
	}

	if err = writeTx.Commit(); err != nil {
		return err
	}

	return s.rootBolt.Sync()
}

// CopyFile copies a single file of this index into the "store" subdirectory of
// the destination directory d, and then lets the trainer register the file in
// the destination's bolt. A plain io.Copy() is not enough because the file
// needs to be tracked in the destination's bolt file as well.
//
// The root lock is held for the whole operation. If the directory returns a
// nil writer the file is treated as skipped (the same convention
// copyToDirectory follows): nothing is copied and nothing is registered.
func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error {
	s.rootLock.Lock()
	defer s.rootLock.Unlock()

	dest, err := d.GetWriter(filepath.Join("store", file))
	if err != nil {
		return err
	}
	// skip: the directory declined this file, so there is nothing to write
	// and nothing to track on the destination side.
	if dest == nil {
		return nil
	}

	source, err := os.Open(filepath.Join(s.path, file))
	if err != nil {
		// Do not leak the writer that was already handed out.
		_ = dest.Close()
		return err
	}
	defer source.Close()

	if _, err = io.Copy(dest, source); err != nil {
		// The copy error is the one worth reporting; the close is best effort.
		_ = dest.Close()
		return err
	}

	// Close before registering the file so that the destination sees the
	// complete contents, and so that a failed flush/close is reported instead
	// of being silently dropped.
	if err = dest.Close(); err != nil {
		return err
	}

	// this code is currently specific to copying trained data but is future
	// proofed for other files to be updated in the dest's bolt
	return s.trainer.copyFileLOCKED(file, d)
}

// external API to fire a scorch event (EventKindIndexStart) externally from bleve
func (s *Scorch) FireIndexEvent() {
s.fireEvent(EventKindIndexStart, 0)
Expand Down
8 changes: 8 additions & 0 deletions index/scorch/train_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@ func (t *noopTrainer) loadTrainedData(bucket *bolt.Bucket) error {
func (t *noopTrainer) getInternal(key []byte) ([]byte, error) {
return nil, nil
}

// copyFileLOCKED is the no-op trainer's file-transfer hook: it ignores both
// arguments and always returns nil.
func (t *noopTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error {
return nil
}

// updateBolt is the no-op trainer's bolt-update hook: it leaves the bucket
// untouched and always returns nil.
func (t *noopTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error {
return nil
}
44 changes: 44 additions & 0 deletions index/scorch/train_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package scorch

import (
"bytes"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -281,3 +282,46 @@ func (t *vectorTrainer) getCentroidIndex(field string) (*faiss.IndexImpl, error)
}
return coarseQuantizer, nil
}

// copyFileLOCKED registers a copied file with the destination index. Only
// files whose name ends in index.CentroidIndexFileName are handled: for those
// the destination's bolt is updated under util.BoltTrainerKey with the file
// name as value. Any other file is a no-op.
func (t *vectorTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error {
	if !strings.HasSuffix(file, index.CentroidIndexFileName) {
		return nil
	}

	// centroid index file - this is outside the snapshots domain so the bolt
	// update is different
	if err := d.UpdateFileInBolt(util.BoltTrainerKey, []byte(file)); err != nil {
		return fmt.Errorf("error updating dest index bolt: %w", err)
	}
	return nil
}

// updateBolt stores value under util.BoltPathKey in the trainer sub-bucket of
// snapshotsBucket and then reloads the centroid index from that bucket. It
// only acts when key equals util.BoltTrainerKey; every other key is ignored.
// An already-present path entry is reported as an error rather than
// overwritten.
func (t *vectorTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error {
	if !bytes.Equal(key, util.BoltTrainerKey) {
		return nil
	}

	bucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltTrainerKey)
	switch {
	case err != nil:
		return err
	case bucket == nil:
		return fmt.Errorf("trainer bucket not found")
	}

	// guard against duplicate updates
	if prev := bucket.Get(util.BoltPathKey); prev != nil {
		return fmt.Errorf("key already exists %v %v", t.parent.path, string(prev))
	}

	if err = bucket.Put(util.BoltPathKey, value); err != nil {
		return err
	}

	// update the centroid index pointer
	t.centroidIndex, err = t.parent.loadSegment(bucket)
	return err
}
1 change: 1 addition & 0 deletions index_alias_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (i *indexAliasImpl) IndexSynonym(id string, collection string, definition *
func (i *indexAliasImpl) Train(batch *Batch) error {
i.mutex.RLock()
defer i.mutex.RUnlock()

if !i.open {
return ErrorIndexClosed
}
Expand Down
32 changes: 32 additions & 0 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,3 +1593,35 @@ func (i *indexImpl) buildTopNCollector(ctx context.Context, req *SearchRequest,
}
return newCollector(), nil
}

// CopyFile copies a single file of the underlying index into the destination
// directory d by delegating to the index implementation. It returns
// ErrorIndexClosed when the index is not open, and an error when the
// underlying implementation does not implement index.IndexFileCopyable.
func (i *indexImpl) CopyFile(file string, d index.IndexDirectory) (err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	copyIndex, ok := i.i.(index.IndexFileCopyable)
	if !ok {
		// Message aligned with UpdateFileInBolt: the missing capability is
		// file copy, not the copy reader.
		return fmt.Errorf("index implementation does not support file copy")
	}

	return copyIndex.CopyFile(file, d)
}

// UpdateFileInBolt forwards a key/value pair to the underlying index so it can
// be recorded in that index's bolt. It returns ErrorIndexClosed when the index
// is not open, and an error when the underlying implementation does not
// implement index.IndexFileCopyable.
func (i *indexImpl) UpdateFileInBolt(key []byte, value []byte) error {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	if fileCopier, supported := i.i.(index.IndexFileCopyable); supported {
		return fileCopier.UpdateFileInBolt(key, value)
	}
	return fmt.Errorf("index implementation does not support file copy")
}
Loading