From a92b46c3540c0aedb828580b2ef9c433b0bdaaaf Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Tue, 17 Jun 2025 12:12:16 +0530 Subject: [PATCH 1/9] use callbacks to collect and use train data while merging --- index/scorch/introducer.go | 4 ++++ index/scorch/merge.go | 19 +++++++++++++++++++ index/scorch/snapshot_index.go | 17 ++++++++++------- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index ef26532b0..7965cc5c3 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -360,6 +360,10 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { creator: "introduceMerge", } + if len(nextMerge.trainData) > 0 { + newSnapshot.trainData = append(root.trainData, nextMerge.trainData...) + } + var running, docsToPersistCount, memSegments, fileSegments uint64 var droppedSegmentFiles []string newSegmentDeleted := make([]*roaring.Bitmap, len(nextMerge.new)) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index bca9bbb81..31cac6e61 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -17,6 +17,7 @@ package scorch import ( "context" "fmt" + "math" "os" "strings" "sync" @@ -481,6 +482,7 @@ type mergedSegmentHistory struct { type segmentMerge struct { id []uint64 new []segment.Segment + trainData [][]float32 mergedSegHistory map[uint64]*mergedSegmentHistory notifyCh chan *mergeTaskIntroStatus mmaped uint32 @@ -527,6 +529,22 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, var em sync.Mutex var errs []error + var trainingSample [][]float32 + collectTrainData := func(segTrainData [][]float32) { + trainingSample = append(trainingSample, segTrainData...) + } + + numDocs, err := snapshot.DocCount() + if err != nil { + return nil, nil, err + } + trainingSampleSize := math.Ceil(4 * math.Sqrt(float64(numDocs)) * 39) + + // collect train data only if needed + if len(snapshot.trainData) < int(trainingSampleSize) { + s.segmentConfig["collectTrainDataCallback"] = collectTrainData + } + s.segmentConfig["trainData"] = snapshot.trainData // deploy the workers to merge and flush the batches of segments concurrently // and create a new file segment for i := 0; i < numFlushes; i++ { @@ -601,6 +619,7 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, mergedSegHistory: make(map[uint64]*mergedSegmentHistory, numSegments), notifyCh: make(chan *mergeTaskIntroStatus), newCount: newMergedCount, + trainData: trainingSample, } // create a history map which maps the old in-memory segments with the specific diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 688f9d903..f36625ae1 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -66,13 +66,16 @@ func init() { } type IndexSnapshot struct { - parent *Scorch - segment []*SegmentSnapshot - offsets []uint64 - internal map[string][]byte - epoch uint64 - size uint64 - creator string + parent *Scorch + + // POC: trainData is ephemeral and read-only just like []*SegmentSnapshot + trainData [][]float32 + segment []*SegmentSnapshot + offsets []uint64 + internal map[string][]byte + epoch uint64 + size uint64 + creator string m sync.Mutex // Protects the fields that follow. refs int64 From 7c0f5a08fb503a8c24d04e373dc63d0e85009dbe Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Tue, 8 Jul 2025 11:46:06 +0530 Subject: [PATCH 2/9] collect training sample on the file path as well --- index/scorch/merge.go | 17 ++++++++++++----- index/scorch/persister.go | 4 ++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 31cac6e61..254b60bfa 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -364,6 +364,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, var seg segment.Segment var filename string + var trainingSample []float32 if len(segmentsToMerge) > 0 { filename = zapFileName(newSegmentID) s.markIneligibleForRemoval(filename) @@ -418,6 +419,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, newCount: seg.Count(), notifyCh: make(chan *mergeTaskIntroStatus), mmaped: 1, + trainData: trainingSample, } s.fireEvent(EventKindMergeTaskIntroductionStart, 0) @@ -534,17 +536,22 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, trainingSample = append(trainingSample, segTrainData...) } - numDocs, err := snapshot.DocCount() - if err != nil { - return nil, nil, err - } + // numDocs, err := snapshot.DocCount() + // if err != nil { + // return nil, nil, err + // } + + // harcoding the total docs for now, need to get it from CB level + numDocs := 1000000 trainingSampleSize := math.Ceil(4 * math.Sqrt(float64(numDocs)) * 39) // collect train data only if needed if len(snapshot.trainData) < int(trainingSampleSize) { s.segmentConfig["collectTrainDataCallback"] = collectTrainData + } else { + s.segmentConfig["trainData"] = snapshot.trainData } - s.segmentConfig["trainData"] = snapshot.trainData + // deploy the workers to merge and flush the batches of segments concurrently // and create a new file segment for i := 0; i < numFlushes; i++ { diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 35136fcd3..5b656fa1f 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -1022,6 +1022,10 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { rv.MergeUpdateFieldsInfo(segmentSnapshot.updatedFields) } running += segmentSnapshot.segment.Count() + // persistedSegment, ok := segmentSnapshot.segment.(segment.PersistedSegment) + // if ok { + // fmt.Println("segment path", persistedSegment.Path()) + // } } } return rv, nil From 0d65735f905e4501542fa81979732d14a4e2ba00 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 21 Aug 2025 19:52:06 +0530 Subject: [PATCH 3/9] cleanup debug logs --- index/scorch/introducer.go | 10 +++++++++- index/scorch/merge.go | 4 ++-- index/scorch/persister.go | 4 ---- index/scorch/snapshot_index.go | 16 +++++++++------- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 7965cc5c3..6afda3347 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -129,6 +129,10 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { creator: "introduceSegment", } + if len(root.trainData) > 0 { + newSnapshot.trainData = root.trainData + } + // iterate through current segments var running uint64 var docsToPersistCount, memSegments, fileSegments uint64 @@ -284,6 +288,10 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) { creator: "introducePersist", } + if len(root.trainData) > 0 { + newIndexSnapshot.trainData = root.trainData + } + var docsToPersistCount, memSegments, fileSegments uint64 for i, segmentSnapshot := range root.segment { // see if this segment has been replaced @@ -361,7 +369,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { } if len(nextMerge.trainData) > 0 { - newSnapshot.trainData = append(root.trainData, nextMerge.trainData...) + newSnapshot.trainData = nextMerge.trainData } var running, docsToPersistCount, memSegments, fileSegments uint64 diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 254b60bfa..06dde26d5 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -543,10 +543,10 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, // harcoding the total docs for now, need to get it from CB level numDocs := 1000000 - trainingSampleSize := math.Ceil(4 * math.Sqrt(float64(numDocs)) * 39) + trainingSampleSize := math.Ceil(4 * math.Sqrt(float64(numDocs)) * 50) // collect train data only if needed - if len(snapshot.trainData) < int(trainingSampleSize) { + if len(snapshot.trainData)/768 < int(trainingSampleSize) { s.segmentConfig["collectTrainDataCallback"] = collectTrainData } else { s.segmentConfig["trainData"] = snapshot.trainData diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 5b656fa1f..35136fcd3 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -1022,10 +1022,6 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { rv.MergeUpdateFieldsInfo(segmentSnapshot.updatedFields) } running += segmentSnapshot.segment.Count() - // persistedSegment, ok := segmentSnapshot.segment.(segment.PersistedSegment) - // if ok { - // fmt.Println("segment path", persistedSegment.Path()) - // } } } return rv, nil diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index f36625ae1..0ad0cf63c 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -69,13 +69,15 @@ type IndexSnapshot struct { parent *Scorch // POC: trainData is ephemeral and read-only just like []*SegmentSnapshot - trainData [][]float32 - segment []*SegmentSnapshot - offsets []uint64 - internal map[string][]byte - epoch uint64 - size uint64 - creator string + trainData []float32 + // trainSegments []*SegmentSnapshot // either store []float32 or []faissIndexes aka centroid indexes + + segment []*SegmentSnapshot + offsets []uint64 + internal map[string][]byte + epoch uint64 + size uint64 + creator string m sync.Mutex // Protects the fields that follow. refs int64 From 9efce1b749d7acbd96c94959704913e769b8c118 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Wed, 26 Nov 2025 11:14:04 -0800 Subject: [PATCH 4/9] batch training support --- index/scorch/introducer.go | 12 ------------ index/scorch/merge.go | 25 ------------------------- index/scorch/persister.go | 5 +++++ index/scorch/scorch.go | 2 ++ index/scorch/snapshot_index.go | 4 ---- index_alias_impl.go | 4 ++++ index_impl.go | 3 +++ 7 files changed, 14 insertions(+), 41 deletions(-) diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 6afda3347..ef26532b0 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -129,10 +129,6 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { creator: "introduceSegment", } - if len(root.trainData) > 0 { - newSnapshot.trainData = root.trainData - } - // iterate through current segments var running uint64 var docsToPersistCount, memSegments, fileSegments uint64 @@ -288,10 +284,6 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) { creator: "introducePersist", } - if len(root.trainData) > 0 { - newIndexSnapshot.trainData = root.trainData - } - var docsToPersistCount, memSegments, fileSegments uint64 for i, segmentSnapshot := range root.segment { // see if this segment has been replaced @@ -368,10 +360,6 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { creator: "introduceMerge", } - if len(nextMerge.trainData) > 0 { - newSnapshot.trainData = nextMerge.trainData - } - var running, docsToPersistCount, memSegments, fileSegments uint64 var droppedSegmentFiles []string newSegmentDeleted := make([]*roaring.Bitmap, len(nextMerge.new)) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 06dde26d5..0e90ecda2 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -17,7 +17,6 @@ package scorch import ( "context" "fmt" - "math" "os" "strings" "sync" @@ -419,7 +418,6 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, newCount: seg.Count(), notifyCh: make(chan *mergeTaskIntroStatus), mmaped: 1, - trainData: trainingSample, } s.fireEvent(EventKindMergeTaskIntroductionStart, 0) @@ -484,7 +482,6 @@ type mergedSegmentHistory struct { type segmentMerge struct { id []uint64 new []segment.Segment - trainData [][]float32 mergedSegHistory map[uint64]*mergedSegmentHistory notifyCh chan *mergeTaskIntroStatus mmaped uint32 @@ -531,27 +528,6 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, var em sync.Mutex var errs []error - var trainingSample [][]float32 - collectTrainData := func(segTrainData [][]float32) { - trainingSample = append(trainingSample, segTrainData...) - } - - // numDocs, err := snapshot.DocCount() - // if err != nil { - // return nil, nil, err - // } - - // harcoding the total docs for now, need to get it from CB level - numDocs := 1000000 - trainingSampleSize := math.Ceil(4 * math.Sqrt(float64(numDocs)) * 50) - - // collect train data only if needed - if len(snapshot.trainData)/768 < int(trainingSampleSize) { - s.segmentConfig["collectTrainDataCallback"] = collectTrainData - } else { - s.segmentConfig["trainData"] = snapshot.trainData - } - // deploy the workers to merge and flush the batches of segments concurrently // and create a new file segment for i := 0; i < numFlushes; i++ { @@ -626,7 +602,6 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, mergedSegHistory: make(map[uint64]*mergedSegmentHistory, numSegments), notifyCh: make(chan *mergeTaskIntroStatus), newCount: newMergedCount, - trainData: trainingSample, } // create a history map which maps the old in-memory segments with the specific diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 35136fcd3..6f33ed41a 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -574,6 +574,11 @@ func copyToDirectory(srcPath string, d index.Directory) (int64, error) { return 0, fmt.Errorf("GetWriter err: %v", err) } + // skip + if dest == nil { + return 0, nil + } + sourceFileStat, err := os.Stat(srcPath) if err != nil { return 0, err diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 3566baf03..38556cd77 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -78,6 +78,8 @@ type Scorch struct { persisterNotifier chan *epochWatcher rootBolt *bolt.DB asyncTasks sync.WaitGroup + // not a real searchable segment, singleton + centroidIndex *SegmentSnapshot trainer trainer diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 0ad0cf63c..3585b31d8 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -68,10 +68,6 @@ func init() { type IndexSnapshot struct { parent *Scorch - // POC: trainData is ephemeral and read-only just like []*SegmentSnapshot - trainData []float32 - // trainSegments []*SegmentSnapshot // either store []float32 or []faissIndexes aka centroid indexes - segment []*SegmentSnapshot offsets []uint64 internal map[string][]byte diff --git a/index_alias_impl.go b/index_alias_impl.go index 2839752e2..2669dcec7 100644 --- a/index_alias_impl.go +++ b/index_alias_impl.go @@ -110,6 +110,10 @@ func (i *indexAliasImpl) Train(batch *Batch) error { return ErrorIndexClosed } + if !i.open { + return ErrorIndexClosed + } + err := i.isAliasToSingleIndex() if err != nil { return err diff --git a/index_impl.go b/index_impl.go index ffe1c21a8..2c1dfcef9 100644 --- a/index_impl.go +++ b/index_impl.go @@ -1432,6 +1432,7 @@ func (m *searchHitSorter) Less(i, j int) bool { return c < 0 } +// CopyTo (index.Directory, filter) func (i *indexImpl) CopyTo(d index.Directory) (err error) { i.mutex.RLock() defer i.mutex.RUnlock() @@ -1445,6 +1446,8 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) { return fmt.Errorf("index implementation does not support copy reader") } + // copyIndex.Copy() -> copies the centroid index + copyReader := copyIndex.CopyReader() if copyReader == nil { return fmt.Errorf("index's copyReader is nil") From 186f53e597a6e098b4ae1fe6f629a7c5fc768cca Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 11 Dec 2025 14:43:28 -0800 Subject: [PATCH 5/9] bug fix, debug logging --- index.go | 2 ++ index_impl.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/index.go b/index.go index c083787c4..c8cbcc9f9 100644 --- a/index.go +++ b/index.go @@ -51,10 +51,12 @@ func (b *Batch) Index(id string, data interface{}) error { eventIndex.FireIndexEvent() } doc := document.NewDocument(id) + // fmt.Printf("data is before mapping %#v\n", data) err := b.index.Mapping().MapDocument(doc, data) if err != nil { return err } + // fmt.Printf("data is after mapping %#v\n", doc) b.internal.Update(doc) b.lastDocSize = uint64(doc.Size() + diff --git a/index_impl.go b/index_impl.go index 2c1dfcef9..5e685f740 100644 --- a/index_impl.go +++ b/index_impl.go @@ -328,11 +328,13 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) { i.FireIndexEvent() + // fmt.Printf("data is %#v\n", data) doc := document.NewDocument(id) err = i.m.MapDocument(doc, data) if err != nil { return } + // fmt.Printf("data is after mapping %#v\n", doc) err = i.i.Update(doc) return } From fe8ebe0d586bc6765500946d6984f3f4b6293d74 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 29 Jan 2026 11:02:24 -0800 Subject: [PATCH 6/9] refactor file transfer --- index.go | 2 -- index/scorch/scorch.go | 2 +- index/scorch/snapshot_index.go | 3 +-- index_impl.go | 4 ---- 4 files changed, 2 insertions(+), 9 deletions(-) diff --git a/index.go b/index.go index c8cbcc9f9..c083787c4 100644 --- a/index.go +++ b/index.go @@ -51,12 +51,10 @@ func (b *Batch) Index(id string, data interface{}) error { eventIndex.FireIndexEvent() } doc := document.NewDocument(id) - // fmt.Printf("data is before mapping %#v\n", data) err := b.index.Mapping().MapDocument(doc, data) if err != nil { return err } - // fmt.Printf("data is after mapping %#v\n", doc) b.internal.Update(doc) b.lastDocSize = uint64(doc.Size() + diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 38556cd77..48bfd7ee7 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -78,7 +78,7 @@ type Scorch struct { persisterNotifier chan *epochWatcher rootBolt *bolt.DB asyncTasks sync.WaitGroup - // not a real searchable segment, singleton + // not a real searchable segment centroidIndex *SegmentSnapshot trainer trainer diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 3585b31d8..688f9d903 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -66,8 +66,7 @@ func init() { } type IndexSnapshot struct { - parent *Scorch - + parent *Scorch segment []*SegmentSnapshot offsets []uint64 internal map[string][]byte diff --git a/index_impl.go b/index_impl.go index 5e685f740..fbbb9bc19 100644 --- a/index_impl.go +++ b/index_impl.go @@ -328,13 +328,11 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) { i.FireIndexEvent() - // fmt.Printf("data is %#v\n", data) doc := document.NewDocument(id) err = i.m.MapDocument(doc, data) if err != nil { return } - // fmt.Printf("data is after mapping %#v\n", doc) err = i.i.Update(doc) return } @@ -1448,8 +1446,6 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) { return fmt.Errorf("index implementation does not support copy reader") } - // copyIndex.Copy() -> copies the centroid index - copyReader := copyIndex.CopyReader() if copyReader == nil { return fmt.Errorf("index's copyReader is nil") From 7048019ea01bed2ff0d12a12f9a3ee8257c464b3 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 29 Jan 2026 11:04:10 -0800 Subject: [PATCH 7/9] implement file transfer APIs --- index.go | 7 ++++ index/scorch/scorch.go | 85 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/index.go b/index.go index c083787c4..41b1559c0 100644 --- a/index.go +++ b/index.go @@ -353,6 +353,13 @@ type IndexCopyable interface { CopyTo(d index.Directory) error } +// IndexFileCopyable is an index supporting the transfer of a single file between +// two indexes +type IndexFileCopyable interface { + UpdateFileInBolt(key []byte, value []byte) error + CopyFile(file string, d index.IndexDirectory) error +} + // FileSystemDirectory is the default implementation for the // index.Directory interface. type FileSystemDirectory string diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 48bfd7ee7..05e818de3 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -15,10 +15,13 @@ package scorch import ( + "bytes" "encoding/json" "fmt" + "io" "os" "path/filepath" + "strings" "sync" "sync/atomic" "time" @@ -1043,6 +1046,88 @@ func (s *Scorch) CopyReader() index.CopyReader { return rv } +func (s *Scorch) UpdateFileInBolt(key []byte, value []byte) error { + tx, err := s.rootBolt.Begin(true) + if err != nil { + return err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket) + if err != nil { + return err + } + + // currently this is specific to centroid index file update + if bytes.Equal(key, util.BoltTrainerKey) { + // todo: guard against duplicate updates + trainerBucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltTrainerKey) + if err != nil { + return err + } + if trainerBucket == nil { + return fmt.Errorf("trainer bucket not found") + } + existingValue := trainerBucket.Get(util.BoltPathKey) + if existingValue != nil { + return fmt.Errorf("key already exists %v %v", s.path, string(existingValue)) + } + + err = trainerBucket.Put(util.BoltPathKey, value) + if err != nil { + return err + } + } + + err = tx.Commit() + if err != nil { + return err + } + + err = s.rootBolt.Sync() + if err != nil { + return err + } + + return nil +} + +// CopyFile copies a specific file to a destination directory which has an access to a bleve index +// doing a io.Copy() isn't enough because the file needs to be tracked in bolt file as well +func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error { + s.rootLock.Lock() + defer s.rootLock.Unlock() + + // this code is currently specific to centroid index file but is future proofed for other files + // to be updated in the dest's bolt + if strings.HasSuffix(file, index.CentroidIndexFileName) { + // centroid index file - this is outside the snapshots domain so the bolt update is different + err := d.UpdateFileInBolt(util.BoltTrainerKey, []byte(file)) + if err != nil { + return fmt.Errorf("error updating dest index bolt: %w", err) + } + } + + dest, err := d.GetWriter(filepath.Join("store", file)) + if err != nil { + return err + } + + source, err := os.Open(filepath.Join(s.path, file)) + if err != nil { + return err + } + + defer source.Close() + defer dest.Close() + _, err = io.Copy(dest, source) + return err +} + // external API to fire a scorch event (EventKindIndexStart) externally from bleve func (s *Scorch) FireIndexEvent() { s.fireEvent(EventKindIndexStart, 0) From 239d16b74cfc6d1178a95c9f4bdc5a226f808b1f Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 29 Jan 2026 15:18:19 -0800 Subject: [PATCH 8/9] move file transfer to train files --- index/scorch/scorch.go | 43 +++++++++--------------------------- index/scorch/train_noop.go | 8 +++++++ index/scorch/train_vector.go | 38 +++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 32 deletions(-) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 05e818de3..629200106 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -15,13 +15,11 @@ package scorch import ( - "bytes" "encoding/json" "fmt" "io" "os" "path/filepath" - "strings" "sync" "sync/atomic" "time" @@ -117,6 +115,10 @@ type trainer interface { loadTrainedData(*bolt.Bucket) error // to fetch the internal data from the component getInternal(key []byte) ([]byte, error) + + // file transfer operations + copyFileLOCKED(file string, d index.IndexDirectory) error + updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error } type ScorchErrorType string @@ -1063,37 +1065,17 @@ func (s *Scorch) UpdateFileInBolt(key []byte, value []byte) error { } // currently this is specific to centroid index file update - if bytes.Equal(key, util.BoltTrainerKey) { - // todo: guard against duplicate updates - trainerBucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltTrainerKey) - if err != nil { - return err - } - if trainerBucket == nil { - return fmt.Errorf("trainer bucket not found") - } - existingValue := trainerBucket.Get(util.BoltPathKey) - if existingValue != nil { - return fmt.Errorf("key already exists %v %v", s.path, string(existingValue)) - } - - err = trainerBucket.Put(util.BoltPathKey, value) - if err != nil { - return err - } - } - - err = tx.Commit() + err = s.trainer.updateBolt(snapshotsBucket, key, value) if err != nil { return err } - err = s.rootBolt.Sync() + err = tx.Commit() if err != nil { return err } - return nil + return s.rootBolt.Sync() } // CopyFile copies a specific file to a destination directory which has an access to a bleve index @@ -1102,14 +1084,11 @@ func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error { s.rootLock.Lock() defer s.rootLock.Unlock() - // this code is currently specific to centroid index file but is future proofed for other files + // this code is currently specific to copying trained data but is future proofed for other files // to be updated in the dest's bolt - if strings.HasSuffix(file, index.CentroidIndexFileName) { - // centroid index file - this is outside the snapshots domain so the bolt update is different - err := d.UpdateFileInBolt(util.BoltTrainerKey, []byte(file)) - if err != nil { - return fmt.Errorf("error updating dest index bolt: %w", err) - } + err := s.trainer.copyFileLOCKED(file, d) + if err != nil { + return err } dest, err := d.GetWriter(filepath.Join("store", file)) diff --git a/index/scorch/train_noop.go b/index/scorch/train_noop.go index 49b565e65..3ae5ad155 100644 --- a/index/scorch/train_noop.go +++ b/index/scorch/train_noop.go @@ -45,3 +45,11 @@ func (t *noopTrainer) loadTrainedData(bucket *bolt.Bucket) error { func (t *noopTrainer) getInternal(key []byte) ([]byte, error) { return nil, nil } + +func (t *noopTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error { + return nil +} + +func (t *noopTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error { + return nil +} diff --git a/index/scorch/train_vector.go b/index/scorch/train_vector.go index e764b68c1..00e014c93 100644 --- a/index/scorch/train_vector.go +++ b/index/scorch/train_vector.go @@ -18,6 +18,7 @@ package scorch import ( + "bytes" "fmt" "os" "path/filepath" @@ -281,3 +282,40 @@ func (t *vectorTrainer) getCentroidIndex(field string) (*faiss.IndexImpl, error) } return coarseQuantizer, nil } + +func (t *vectorTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error { + if strings.HasSuffix(file, index.CentroidIndexFileName) { + // centroid index file - this is outside the snapshots domain so the bolt update is different + err := d.UpdateFileInBolt(util.BoltTrainerKey, []byte(file)) + if err != nil { + return fmt.Errorf("error updating dest index bolt: %w", err) + } + } + + return nil +} + +func (t *vectorTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, value []byte) error { + if bytes.Equal(key, util.BoltTrainerKey) { + trainerBucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltTrainerKey) + if err != nil { + return err + } + if trainerBucket == nil { + return fmt.Errorf("trainer bucket not found") + } + + // guard against duplicate updates + existingValue := trainerBucket.Get(util.BoltPathKey) + if existingValue != nil { + return fmt.Errorf("key already exists %v %v", t.parent.path, string(existingValue)) + } + + err = trainerBucket.Put(util.BoltPathKey, value) + if err != nil { + return err + } + } + + return nil +} From be7a3f593ca80610341904bb9b54f349d9717ae3 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Wed, 4 Feb 2026 11:53:38 -0800 Subject: [PATCH 9/9] fix file transfer logic --- index/scorch/merge.go | 1 - index/scorch/scorch.go | 14 +++++++------- index/scorch/train_vector.go | 6 ++++++ index_alias_impl.go | 3 --- index_impl.go | 33 ++++++++++++++++++++++++++++++++- 5 files changed, 45 insertions(+), 12 deletions(-) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 0e90ecda2..bca9bbb81 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -363,7 +363,6 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, var seg segment.Segment var filename string - var trainingSample []float32 if len(segmentsToMerge) > 0 { filename = zapFileName(newSegmentID) s.markIneligibleForRemoval(filename) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 629200106..9c3a9ae1d 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -1084,13 +1084,6 @@ func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error { s.rootLock.Lock() defer s.rootLock.Unlock() - // this code is currently specific to copying trained data but is future proofed for other files - // to be updated in the dest's bolt - err := s.trainer.copyFileLOCKED(file, d) - if err != nil { - return err - } - dest, err := d.GetWriter(filepath.Join("store", file)) if err != nil { return err @@ -1104,6 +1097,13 @@ func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error { defer source.Close() defer dest.Close() _, err = io.Copy(dest, source) + if err != nil { + return err + } + + // this code is currently specific to copying trained data but is future proofed for other files + // to be updated in the dest's bolt + err = s.trainer.copyFileLOCKED(file, d) return err } diff --git a/index/scorch/train_vector.go b/index/scorch/train_vector.go index 00e014c93..f8f6ec08c 100644 --- a/index/scorch/train_vector.go +++ b/index/scorch/train_vector.go @@ -315,6 +315,12 @@ func (t *vectorTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, val if err != nil { return err } + + // update the centroid index pointer + t.centroidIndex, err = t.parent.loadSegment(trainerBucket) + if err != nil { + return err + } } return nil diff --git a/index_alias_impl.go b/index_alias_impl.go index 2669dcec7..ee7fbf2a6 100644 --- a/index_alias_impl.go +++ b/index_alias_impl.go @@ -106,9 +106,6 @@ func (i *indexAliasImpl) IndexSynonym(id string, collection string, definition * func (i *indexAliasImpl) Train(batch *Batch) error { i.mutex.RLock() defer i.mutex.RUnlock() - if !i.open { - return ErrorIndexClosed - } if !i.open { return ErrorIndexClosed diff --git a/index_impl.go b/index_impl.go index fbbb9bc19..47026ffa6 100644 --- a/index_impl.go +++ b/index_impl.go @@ -1432,7 +1432,6 @@ func (m *searchHitSorter) Less(i, j int) bool { return c < 0 } -// CopyTo (index.Directory, filter) func (i *indexImpl) CopyTo(d index.Directory) (err error) { i.mutex.RLock() defer i.mutex.RUnlock() @@ -1594,3 +1593,35 @@ func (i *indexImpl) buildTopNCollector(ctx context.Context, req *SearchRequest, } return newCollector(), nil } + +func (i *indexImpl) CopyFile(file string, d index.IndexDirectory) (err error) { + i.mutex.RLock() + defer i.mutex.RUnlock() + + if !i.open { + return ErrorIndexClosed + } + + copyIndex, ok := i.i.(index.IndexFileCopyable) + if !ok { + return fmt.Errorf("index implementation does not support copy reader") + } + + return copyIndex.CopyFile(file, d) +} + +func (i *indexImpl) UpdateFileInBolt(key []byte, value []byte) error { + i.mutex.RLock() + defer i.mutex.RUnlock() + + if !i.open { + return ErrorIndexClosed + } + + copyIndex, ok := i.i.(index.IndexFileCopyable) + if !ok { + return fmt.Errorf("index implementation does not support file copy") + } + + return copyIndex.UpdateFileInBolt(key, value) +}