11use std:: collections:: { BinaryHeap , HashMap } ;
22use std:: fmt:: Debug ;
3- use std:: fs:: { File , OpenOptions } ;
4- use std:: io:: Write ;
5- #[ cfg( unix) ]
6- use std:: os:: unix:: fs:: FileExt ;
7- use std:: sync:: Mutex ;
83
94use rkyv:: api:: high:: to_bytes_with_alloc;
105use rkyv:: ser:: allocator:: Arena ;
116use rkyv:: { Archive , Deserialize , Serialize , from_bytes, rancor} ;
127use system:: metric:: MetricType ;
138use system:: vector_data:: VectorData ;
149use system:: vector_point:: VectorPoint ;
10+ use tokio:: fs:: { File , OpenOptions } ;
11+ use tokio:: io:: { self , AsyncReadExt , AsyncSeekExt , AsyncWriteExt , SeekFrom } ;
12+ use tokio:: sync:: Mutex ;
1513use tracing:: debug;
1614
1715use crate :: in_mem_index:: InMemIndex ;
@@ -58,38 +56,36 @@ impl DiskIndexStorage {
5856 Ok ( DiskIndexStorage { files : vec ! [ ] } )
5957 }
6058
61- pub fn insert ( & mut self , mem_index : & InMemIndex ) -> Result < ( ) , std :: io :: Error > {
59+ pub async fn insert ( & mut self , mem_index : & InMemIndex ) -> io :: Result < ( ) > {
6260 let file_count = self . files . len ( ) ;
6361 let data_file = OpenOptions :: new ( )
6462 . read ( true )
6563 . write ( true )
6664 . create ( true )
67- . open ( format ! ( "disk_view_data_{}.bin" , file_count) ) ?;
65+ . open ( format ! ( "disk_view_data_{}.bin" , file_count) )
66+ . await ?;
6867 let graph_file = OpenOptions :: new ( )
6968 . read ( true )
7069 . write ( true )
7170 . create ( true )
72- . open ( format ! ( "disk_view_index_{}.bin" , file_count) ) ?;
71+ . open ( format ! ( "disk_view_index_{}.bin" , file_count) )
72+ . await ?;
7373 self . files . push ( IndexFiles {
7474 data_file : Mutex :: new ( data_file) ,
7575 graph_file : Mutex :: new ( graph_file) ,
7676 } ) ;
7777
78- self . write_graph ( mem_index) ?;
79- self . write_data ( mem_index) ?;
78+ self . write_graph ( mem_index) . await ?;
79+ self . write_data ( mem_index) . await ?;
8080 Ok ( ( ) )
8181 }
8282
83- fn write_graph ( & mut self , mem_index : & InMemIndex ) -> Result < ( ) , std :: io :: Error > {
83+ async fn write_graph ( & mut self , mem_index : & InMemIndex ) -> io :: Result < ( ) > {
8484 let last_file =
8585 self . files . last ( ) . ok_or_else ( || std:: io:: Error :: other ( "No files in the list" ) ) ?;
8686
87- let mut file_guard = last_file
88- . graph_file
89- . lock ( )
90- . map_err ( |e| std:: io:: Error :: other ( format ! ( "Failed to lock file Mutex: {}" , e) ) ) ?;
91-
92- let mut writer = std:: io:: BufWriter :: new ( & mut * file_guard) ;
87+ let mut file_guard = last_file. graph_file . lock ( ) . await ;
88+ let file: & mut File = & mut file_guard;
9389
9490 let binding =
9591 mem_index. points . iter ( ) . next ( ) . ok_or_else ( || {
@@ -114,11 +110,11 @@ impl DiskIndexStorage {
114110 let mut arena = Arena :: new ( ) ;
115111 let meta_bytes = to_bytes_with_alloc :: < _ , rancor:: Error > ( & disk_meta, arena. acquire ( ) )
116112 . map_err ( |e| std:: io:: Error :: other ( e. to_string ( ) ) ) ?;
117- writer . write_all ( & meta_bytes) ?;
113+ file . write_all ( & meta_bytes) . await ?;
118114
119115 let padding_len = BLOCK_SIZE . saturating_sub ( meta_bytes. len ( ) ) ;
120116 if padding_len > 0 {
121- writer . write_all ( & vec ! [ 0u8 ; padding_len] ) ?;
117+ file . write_all ( & vec ! [ 0u8 ; padding_len] ) . await ?;
122118 }
123119 let mut block_buffer = Vec :: with_capacity ( BLOCK_SIZE ) ;
124120 let mut nodes_in_current_block = 0 ;
@@ -151,7 +147,7 @@ impl DiskIndexStorage {
151147 block_buffer. resize ( BLOCK_SIZE , 0 ) ;
152148 }
153149
154- writer . write_all ( & block_buffer) ?;
150+ file . write_all ( & block_buffer) . await ?;
155151
156152 block_buffer. clear ( ) ;
157153 nodes_in_current_block = 0 ;
@@ -161,21 +157,14 @@ impl DiskIndexStorage {
161157 if block_buffer. len ( ) < BLOCK_SIZE {
162158 block_buffer. resize ( BLOCK_SIZE , 0 ) ;
163159 }
164- writer . write_all ( & block_buffer) ?;
160+ file . write_all ( & block_buffer) . await ?;
165161 }
166- writer. flush ( ) ?;
167162 Ok ( ( ) )
168163 }
169164
170- fn write_data ( & mut self , mem_index : & InMemIndex ) -> Result < ( ) , std:: io:: Error > {
171- let mut file_guard = self
172- . files
173- . last ( )
174- . unwrap ( )
175- . data_file
176- . lock ( )
177- . map_err ( |e| std:: io:: Error :: other ( format ! ( "Failed to lock file Mutex: {}" , e) ) ) ?;
178- let mut writer = std:: io:: BufWriter :: new ( & mut * file_guard) ;
165+ async fn write_data ( & mut self , mem_index : & InMemIndex ) -> io:: Result < ( ) > {
166+ let mut file_guard = self . files . last ( ) . unwrap ( ) . data_file . lock ( ) . await ;
167+ let file: & mut File = & mut file_guard;
179168 if mem_index. points . is_empty ( ) {
180169 return Err ( std:: io:: Error :: other ( "Empty index" ) ) ;
181170 }
@@ -210,7 +199,7 @@ impl DiskIndexStorage {
210199 block_buffer. resize ( BLOCK_SIZE , 0 ) ;
211200 }
212201
213- writer . write_all ( & block_buffer) ?;
202+ file . write_all ( & block_buffer) . await ?;
214203
215204 block_buffer. clear ( ) ;
216205 nodes_in_current_block = 0 ;
@@ -220,22 +209,18 @@ impl DiskIndexStorage {
220209 if block_buffer. len ( ) < BLOCK_SIZE {
221210 block_buffer. resize ( BLOCK_SIZE , 0 ) ;
222211 }
223- writer . write_all ( & block_buffer) ?;
212+ file . write_all ( & block_buffer) . await ?;
224213 }
225- writer. flush ( ) ?;
226214 Ok ( ( ) )
227215 }
228216
229- pub fn get_nodes (
217+ pub async fn get_nodes (
230218 & self , file : & Mutex < File > , block_offset : u64 , total_nodes : u64 ,
231219 ) -> Result < Vec < DiskNode > , std:: io:: Error > {
232220 let mut nodes = Vec :: with_capacity ( total_nodes as usize ) ;
233221 let mut block_offset = block_offset;
234222 while nodes. len ( ) < total_nodes as usize {
235- let file_guard = file
236- . lock ( )
237- . map_err ( |e| std:: io:: Error :: other ( format ! ( "Failed to lock file Mutex: {}" , e) ) ) ?;
238- let disk_bytes = Self :: read_bytes ( file_guard, block_offset) ?;
223+ let disk_bytes = Self :: read_bytes ( file, block_offset) . await ?;
239224 let mut offset = 0 ;
240225 while offset + 8 < BLOCK_SIZE && nodes. len ( ) < total_nodes as usize {
241226 if offset % ALIGNMENT != 0 {
@@ -262,18 +247,14 @@ impl DiskIndexStorage {
262247 Ok ( nodes)
263248 }
264249
265- pub fn get_points (
250+ pub async fn get_points (
266251 & self , file : & Mutex < File > , block_offset : u64 , nodes_per_block : u64 , point_size : u64 ,
267252 total_nodes : u64 ,
268- ) -> Result < Vec < VectorPoint > , std :: io :: Error > {
253+ ) -> io :: Result < Vec < VectorPoint > > {
269254 let mut block_offset = block_offset;
270255 let mut points = Vec :: with_capacity ( nodes_per_block as usize ) ;
271256 while points. len ( ) < total_nodes as usize {
272- let file_guard = file
273- . lock ( )
274- . map_err ( |e| std:: io:: Error :: other ( format ! ( "Failed to lock file Mutex: {}" , e) ) ) ?;
275-
276- let disk_bytes = Self :: read_bytes ( file_guard, block_offset) ?;
257+ let disk_bytes = Self :: read_bytes ( file, block_offset) . await ?;
277258 let mut offset = 0 ;
278259 while offset < BLOCK_SIZE && points. len ( ) < total_nodes as usize {
279260 if offset % ALIGNMENT != 0 {
@@ -292,12 +273,8 @@ impl DiskIndexStorage {
292273 }
293274
294275 #[ inline( always) ]
295- fn get_disk_meta ( & self , file : & Mutex < File > ) -> Result < DiskMeta , std:: io:: Error > {
296- let file_guard = file
297- . lock ( )
298- . map_err ( |e| std:: io:: Error :: other ( format ! ( "Failed to lock file Mutex: {}" , e) ) ) ?;
299-
300- let meta_bytes = Self :: read_bytes ( file_guard, 0 ) ?;
276+ async fn get_disk_meta ( & self , file : & Mutex < File > ) -> Result < DiskMeta , std:: io:: Error > {
277+ let meta_bytes = Self :: read_bytes ( file, 0 ) . await ?;
301278
302279 let size = std:: mem:: size_of :: < DiskMeta > ( ) ;
303280 let meta = from_bytes :: < DiskMeta , rancor:: Error > ( & meta_bytes[ 0 ..size] )
@@ -306,30 +283,33 @@ impl DiskIndexStorage {
306283 Ok ( meta)
307284 }
308285
309- pub fn search (
286+ pub async fn search (
310287 & self , query : & VectorData , l : usize , visited_map : & mut HashMap < u64 , VectorPoint > ,
311288 ) {
312289 for file in self . files . iter ( ) {
313- let disk_meta = self . get_disk_meta ( & file. graph_file ) ;
290+ let disk_meta = self . get_disk_meta ( & file. graph_file ) . await ;
314291 let Ok ( disk_meta) = disk_meta else {
315292 debug ! ( "Disk metadata not found" ) ;
316293 continue ;
317294 } ;
318295 debug ! ( "Disk Meta found: {:?}" , disk_meta) ;
319296
320- let Ok ( nodes) = self . get_nodes ( & file. graph_file , 1 , disk_meta. total_nodes ) else {
297+ let Ok ( nodes) = self . get_nodes ( & file. graph_file , 1 , disk_meta. total_nodes ) . await else {
321298 debug ! ( "Nodes not found" ) ;
322299 continue ;
323300 } ;
324301 debug ! ( "Nodes {:?}" , nodes. iter( ) . map( |n| n. id) . collect:: <Vec <_>>( ) ) ;
325302
326- let Ok ( points) = self . get_points (
327- & file. data_file ,
328- 0 ,
329- disk_meta. nodes_per_block ,
330- disk_meta. point_size * 2 ,
331- disk_meta. total_nodes ,
332- ) else {
303+ let Ok ( points) = self
304+ . get_points (
305+ & file. data_file ,
306+ 0 ,
307+ disk_meta. nodes_per_block ,
308+ disk_meta. point_size * 2 ,
309+ disk_meta. total_nodes ,
310+ )
311+ . await
312+ else {
333313 debug ! ( "Points not found in file" ) ;
334314 continue ;
335315 } ;
@@ -404,19 +384,16 @@ impl DiskIndexStorage {
404384 }
405385 }
406386
407- fn read_bytes (
408- file_guard : std :: sync :: MutexGuard < ' _ , File > , block_offset : u64 ,
409- ) -> Result < [ u8 ; BLOCK_SIZE ] , std :: io :: Error > {
387+ async fn read_bytes (
388+ file : & Mutex < File > , block_offset : u64 ,
389+ ) -> std :: io :: Result < [ u8 ; BLOCK_SIZE ] > {
410390 let mut bytes = [ 0u8 ; BLOCK_SIZE ] ;
411391 let offset = block_offset * BLOCK_SIZE as u64 ;
412- #[ cfg( unix) ]
413- file_guard. read_at ( & mut bytes, offset) ?;
414- #[ cfg( not( unix) ) ]
415- {
416- let mut file = & * file_guard;
417- file. seek ( SeekFrom :: Start ( offset) ) ?;
418- file. read_exact ( & mut bytes) ?;
419- }
392+
393+ let mut f = file. lock ( ) . await ;
394+ f. seek ( SeekFrom :: Start ( offset) ) . await ?;
395+ f. read_exact ( & mut bytes) . await ?;
396+
420397 Ok ( bytes)
421398 }
422399}
0 commit comments