diff --git a/index.go b/index.go index c083787c4..41b1559c0 100644 --- a/index.go +++ b/index.go @@ -353,6 +353,13 @@ type IndexCopyable interface { CopyTo(d index.Directory) error } +// IndexFileCopyable is an index supporting the transfer of a single file between +// two indexes +type IndexFileCopyable interface { + UpdateFileInBolt(key []byte, value []byte) error + CopyFile(file string, d index.IndexDirectory) error +} + // FileSystemDirectory is the default implementation for the // index.Directory interface. type FileSystemDirectory string diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 35136fcd3..6f33ed41a 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -574,6 +574,11 @@ func copyToDirectory(srcPath string, d index.Directory) (int64, error) { return 0, fmt.Errorf("GetWriter err: %v", err) } + // skip + if dest == nil { + return 0, nil + } + sourceFileStat, err := os.Stat(srcPath) if err != nil { return 0, err diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 3566baf03..9c3a9ae1d 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -17,6 +17,7 @@ package scorch import ( "encoding/json" "fmt" + "io" "os" "path/filepath" "sync" @@ -78,6 +79,8 @@ type Scorch struct { persisterNotifier chan *epochWatcher rootBolt *bolt.DB asyncTasks sync.WaitGroup + // not a real searchable segment + centroidIndex *SegmentSnapshot trainer trainer @@ -112,6 +115,10 @@ type trainer interface { loadTrainedData(*bolt.Bucket) error // to fetch the internal data from the component getInternal(key []byte) ([]byte, error) + + // file transfer operations + copyFileLOCKED(file string, d index.IndexDirectory) error + updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error } type ScorchErrorType string @@ -1041,6 +1048,65 @@ func (s *Scorch) CopyReader() index.CopyReader { return rv } +func (s *Scorch) UpdateFileInBolt(key []byte, value []byte) error { + tx, err := s.rootBolt.Begin(true) + if err != nil { + return err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket) + if err != nil { + return err + } + + // currently this is specific to centroid index file update + err = s.trainer.updateBolt(snapshotsBucket, key, value) + if err != nil { + return err + } + + err = tx.Commit() + if err != nil { + return err + } + + return s.rootBolt.Sync() +} + +// CopyFile copies a specific file to a destination directory which has an access to a bleve index +// doing a io.Copy() isn't enough because the file needs to be tracked in bolt file as well +func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error { + s.rootLock.Lock() + defer s.rootLock.Unlock() + + dest, err := d.GetWriter(filepath.Join("store", file)) + if err != nil { + return err + } + + source, err := os.Open(filepath.Join(s.path, file)) + if err != nil { + return err + } + + defer source.Close() + defer dest.Close() + _, err = io.Copy(dest, source) + if err != nil { + return err + } + + // this code is currently specific to copying trained data but is future proofed for other files + // to be updated in the dest's bolt + err = s.trainer.copyFileLOCKED(file, d) + return err +} + // external API to fire a scorch event (EventKindIndexStart) externally from bleve func (s *Scorch) FireIndexEvent() { s.fireEvent(EventKindIndexStart, 0) diff --git a/index/scorch/train_noop.go b/index/scorch/train_noop.go index 49b565e65..3ae5ad155 100644 --- a/index/scorch/train_noop.go +++ b/index/scorch/train_noop.go @@ -45,3 +45,11 @@ func (t *noopTrainer) loadTrainedData(bucket *bolt.Bucket) error { func (t *noopTrainer) getInternal(key []byte) ([]byte, error) { return nil, nil } + +func (t *noopTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error { + return nil +} + +func (t *noopTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error { + return nil +} diff --git a/index/scorch/train_vector.go b/index/scorch/train_vector.go index e764b68c1..f8f6ec08c 100644 --- a/index/scorch/train_vector.go +++ b/index/scorch/train_vector.go @@ -18,6 +18,7 @@ package scorch import ( + "bytes" "fmt" "os" "path/filepath" @@ -281,3 +282,46 @@ func (t *vectorTrainer) getCentroidIndex(field string) (*faiss.IndexImpl, error) } return coarseQuantizer, nil } + +func (t *vectorTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error { + if strings.HasSuffix(file, index.CentroidIndexFileName) { + // centroid index file - this is outside the snapshots domain so the bolt update is different + err := d.UpdateFileInBolt(util.BoltTrainerKey, []byte(file)) + if err != nil { + return fmt.Errorf("error updating dest index bolt: %w", err) + } + } + + return nil +} + +func (t *vectorTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error { + if bytes.Equal(key, util.BoltTrainerKey) { + trainerBucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltTrainerKey) + if err != nil { + return err + } + if trainerBucket == nil { + return fmt.Errorf("trainer bucket not found") + } + + // guard against duplicate updates + existingValue := trainerBucket.Get(util.BoltPathKey) + if existingValue != nil { + return fmt.Errorf("key already exists %v %v", t.parent.path, string(existingValue)) + } + + err = trainerBucket.Put(util.BoltPathKey, value) + if err != nil { + return err + } + + // update the centroid index pointer + t.centroidIndex, err = t.parent.loadSegment(trainerBucket) + if err != nil { + return err + } + } + + return nil +} diff --git a/index_alias_impl.go b/index_alias_impl.go index 2839752e2..ee7fbf2a6 100644 --- a/index_alias_impl.go +++ b/index_alias_impl.go @@ -106,6 +106,7 @@ func (i *indexAliasImpl) IndexSynonym(id string, collection string, definition * func (i *indexAliasImpl) Train(batch *Batch) error { i.mutex.RLock() defer i.mutex.RUnlock() + if !i.open { return ErrorIndexClosed } diff --git a/index_impl.go b/index_impl.go index ffe1c21a8..47026ffa6 100644 --- a/index_impl.go +++ b/index_impl.go @@ -1593,3 +1593,35 @@ func (i *indexImpl) buildTopNCollector(ctx context.Context, req *SearchRequest, } return newCollector(), nil } + +func (i *indexImpl) CopyFile(file string, d index.IndexDirectory) (err error) { + i.mutex.RLock() + defer i.mutex.RUnlock() + + if !i.open { + return ErrorIndexClosed + } + + copyIndex, ok := i.i.(index.IndexFileCopyable) + if !ok { + return fmt.Errorf("index implementation does not support copy reader") + } + + return copyIndex.CopyFile(file, d) +} + +func (i *indexImpl) UpdateFileInBolt(key []byte, value []byte) error { + i.mutex.RLock() + defer i.mutex.RUnlock() + + if !i.open { + return ErrorIndexClosed + } + + copyIndex, ok := i.i.(index.IndexFileCopyable) + if !ok { + return fmt.Errorf("index implementation does not support file copy") + } + + return copyIndex.UpdateFileInBolt(key, value) +}