|
use std::time::{Duration, Instant, SystemTime};

use cpu_time::ProcessTime;
use service::vector::vector_db_client::VectorDbClient;
use service::vector::{InsertVectorRequest, SearchVectorRequest};
use tokio::io;
use tonic::Request;

use crate::index_utils::SiftDataset;
mod index_utils;
| 11 | + |
| 12 | +#[tokio::main] |
| 13 | +async fn main() -> io::Result<()> { |
| 14 | + let base_folder = "examples/data/siftsmall"; |
| 15 | + let start_cpu = ProcessTime::now(); |
| 16 | + let start_wall = SystemTime::now(); |
| 17 | + |
| 18 | + let (base, query, ground_truth) = tokio::try_join!( |
| 19 | + SiftDataset::from_fvecs(format!("{}/siftsmall_base.fvecs", base_folder)), |
| 20 | + SiftDataset::from_fvecs(format!("{}/siftsmall_query.fvecs", base_folder)), |
| 21 | + SiftDataset::from_ivecs(format!("{}/siftsmall_groundtruth.ivecs", base_folder)) |
| 22 | + )?; |
| 23 | + |
| 24 | + println!("Base dataset: {} vectors of dimension {}", base.vectors.len(), base.dimension); |
| 25 | + println!("Query dataset: {} vectors of dimension {}", query.vectors.len(), query.dimension); |
| 26 | + println!("Ground truth: {} queries", ground_truth.len()); |
| 27 | + |
| 28 | + let mut client = VectorDbClient::connect("http://0.0.0.0:50051") |
| 29 | + .await |
| 30 | + .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?; |
| 31 | + |
| 32 | + // Since we are connecting via gRPC, we don't check for local file existence here |
| 33 | + // as easily. For simplicity in this example, we'll try to insert if not found |
| 34 | + // or just always try to insert (server can handle duplicates if implemented). |
| 35 | + // However, the original code had: |
| 36 | + // let path = Path::new(index_name); |
| 37 | + // if !path.exists() { ... } |
| 38 | + |
| 39 | + // We'll skip the check for now as the server manages the index. |
| 40 | + // If we want to avoid re-inserting, we'd need a way to check if index is built on server. |
| 41 | + |
| 42 | + println!("Inserting vectors..."); |
| 43 | + for (index, vector) in base.vectors.iter().enumerate() { |
| 44 | + let request = Request::new(InsertVectorRequest { |
| 45 | + id: index.to_string(), |
| 46 | + vector: vector.to_f32_vec(), |
| 47 | + }); |
| 48 | + |
| 49 | + let _response = client.insert_vector(request).await.map_err(io::Error::other)?; |
| 50 | + |
| 51 | + if index % 1000 == 0 && index > 0 { |
| 52 | + println!("Inserted {} vectors", index); |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + let index_cpu_time = start_cpu.elapsed(); |
| 57 | + let index_wall_time = start_wall.elapsed().unwrap(); |
| 58 | + println!("Indexing time: CPU {:?}, Wall {:?}", index_cpu_time, index_wall_time); |
| 59 | + |
| 60 | + for k in [1, 10, 100] { |
| 61 | + let mut results = Vec::new(); |
| 62 | + for q in query.vectors.iter() { |
| 63 | + let request = |
| 64 | + Request::new(SearchVectorRequest { vector: q.to_f32_vec(), top_k: k as u32 }); |
| 65 | + let response = client.search_vector(request).await.map_err(io::Error::other)?; |
| 66 | + let search_res = response.into_inner(); |
| 67 | + |
| 68 | + // Map string IDs back to u32 |
| 69 | + let u32_ids: Vec<u32> = |
| 70 | + search_res.ids.iter().filter_map(|id: &String| id.parse::<u32>().ok()).collect(); |
| 71 | + results.push(u32_ids); |
| 72 | + } |
| 73 | + let recall = index_utils::compute_recall(&results, &ground_truth, k); |
| 74 | + println!("Recall for k={}: {}", k, recall); |
| 75 | + } |
| 76 | + |
| 77 | + let cpu_time: Duration = start_cpu.elapsed(); |
| 78 | + let wall_time = start_wall.elapsed().unwrap(); |
| 79 | + |
| 80 | + println!("Total CPU time: {:?}", cpu_time); |
| 81 | + println!("Total Wall time: {:?}", wall_time); |
| 82 | + |
| 83 | + Ok(()) |
| 84 | +} |
0 commit comments