Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ tokio = { version = "1.47.1", features = ["full"] }
tonic = { version = "0.14.2", features = ["transport"] }
tonic-prost = "0.14.2"
prost = "0.14"
futures = "0.3"
serde = { version = "1.0.210", features = ["derive", "rc"] }
serde_json = "1.0"
uuid = { version = "1.3", features = ["v4"] }
Expand Down
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ service = { path = "../nyas/service" }
tokio.workspace = true
tonic.workspace = true
cpu-time = "1.0.0"
futures.workspace = true

[[bin]]
name = "sift10k_index"
Expand Down
195 changes: 99 additions & 96 deletions examples/src/index_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::path::Path;

use diskann::index_view::IndexView;
use system::vector_data::VectorData;
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, BufReader};
use tokio::io;

/// Reads a positive `usize` from the environment variable `name`.
///
/// Returns `None` when the variable is unset, is not valid UTF-8,
/// fails to parse as `usize`, or parses to zero.
fn env_usize(name: &str) -> Option<usize> {
    let raw = std::env::var(name).ok()?;
    match raw.parse::<usize>() {
        Ok(v) if v > 0 => Some(v),
        _ => None,
    }
}

#[derive(Debug)]
pub struct SiftDataset {
Expand All @@ -14,119 +17,108 @@ pub struct SiftDataset {

impl SiftDataset {
pub async fn from_fvecs<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let file = File::open(path).await?;
let mut reader = BufReader::new(file);

let mut vectors = Vec::new();
let mut dimension = 0u32;
let mut buffer = [0u8; 4];
let bytes = tokio::fs::read(path).await?;
if bytes.len() < 4 {
return Ok(SiftDataset { dimension: 0, vectors: Vec::new() });
}

if reader.read_exact(&mut buffer).await.is_ok() {
dimension = u32::from_le_bytes(buffer);
let dimension = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
let record_size = 4usize + 4usize * dimension as usize;
if record_size == 0 || bytes.len() % record_size != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid fvecs layout or truncated file",
));
}

// Read first vector
let mut vec = vec![0f32; dimension as usize];
for item in vec.iter_mut().take(dimension as usize) {
reader.read_exact(&mut buffer).await?;
*item = f32::from_le_bytes(buffer);
let mut vectors = Vec::with_capacity(bytes.len() / record_size);
let mut offset = 0usize;
while offset + record_size <= bytes.len() {
let dim = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
if dim != dimension {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Inconsistent vector dimensions",
));
}
let vec_data = VectorData::from_f32(vec);
vectors.push(vec_data);
}

loop {
match reader.read_exact(&mut buffer).await {
Ok(_) => {
let dim = u32::from_le_bytes(buffer);
if dim != dimension {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Inconsistent vector dimensions",
));
}

let mut vec = vec![0f32; dimension as usize];
for item in vec.iter_mut().take(dimension as usize) {
reader.read_exact(&mut buffer).await?;
*item = f32::from_le_bytes(buffer);
}
let vec_data = VectorData::from_f32(vec);
vectors.push(vec_data);
}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e),
let mut vec = vec![0f32; dimension as usize];
let data_start = offset + 4;
for (i, item) in vec.iter_mut().enumerate() {
let s = data_start + i * 4;
*item = f32::from_le_bytes(bytes[s..s + 4].try_into().unwrap());
}
vectors.push(VectorData::from_f32(vec));
offset += record_size;
}

Ok(SiftDataset { dimension, vectors })
}

#[allow(dead_code)]
pub async fn from_bvecs<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let file = File::open(path).await?;
let mut reader = BufReader::new(file);

let mut vectors = Vec::new();
let mut dimension = 0u32;
let mut buffer = [0u8; 4];

if reader.read_exact(&mut buffer).await.is_ok() {
dimension = u32::from_le_bytes(buffer);
let bytes = tokio::fs::read(path).await?;
if bytes.len() < 4 {
return Ok(SiftDataset { dimension: 0, vectors: Vec::new() });
}

let mut vec = vec![0u8; dimension as usize];
reader.read_exact(&mut vec).await?;
let vec = vec.iter().map(|&x| x as f32).collect::<Vec<f32>>();
let vec_data = VectorData::from_f32(vec);
vectors.push(vec_data);
let dimension = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
let record_size = 4usize + dimension as usize;
if record_size == 0 || bytes.len() % record_size != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid bvecs layout or truncated file",
));
}

loop {
match reader.read_exact(&mut buffer).await {
Ok(_) => {
let dim = u32::from_le_bytes(buffer);
if dim != dimension {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Inconsistent vector dimensions",
));
}

let mut vec = vec![0u8; dimension as usize];
reader.read_exact(&mut vec).await?;
let vec = vec.iter().map(|&x| x as f32).collect::<Vec<f32>>();
let vec_data = VectorData::from_f32(vec);
vectors.push(vec_data);
}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e),
let mut vectors = Vec::with_capacity(bytes.len() / record_size);
let mut offset = 0usize;
while offset + record_size <= bytes.len() {
let dim = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap());
if dim != dimension {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Inconsistent vector dimensions",
));
}
let data_start = offset + 4;
let vec = bytes[data_start..data_start + dimension as usize]
.iter()
.map(|&x| x as f32)
.collect::<Vec<f32>>();
vectors.push(VectorData::from_f32(vec));
offset += record_size;
}

Ok(SiftDataset { dimension, vectors })
}

/// Loads an ivecs ground-truth file: each record is a little-endian `u32`
/// count `k` followed by `k` little-endian `u32` neighbor ids.
///
/// Returns one `Vec<u32>` per record; `InvalidData` when a record's
/// declared count runs past the end of the file. Records may have
/// differing `k` values, so no global layout check is possible here.
pub async fn from_ivecs<P: AsRef<Path>>(path: P) -> io::Result<Vec<Vec<u32>>> {
    let bytes = tokio::fs::read(path).await?;
    if bytes.is_empty() {
        return Ok(Vec::new());
    }

    let mut results = Vec::new();
    let mut offset = 0usize;
    while offset + 4 <= bytes.len() {
        let k = u32::from_le_bytes(bytes[offset..offset + 4].try_into().unwrap()) as usize;
        offset += 4;
        let record_bytes = k * 4;
        if offset + record_bytes > bytes.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid ivecs layout or truncated file",
            ));
        }

        let neighbors: Vec<u32> = bytes[offset..offset + record_bytes]
            .chunks_exact(4)
            .map(|c| u32::from_le_bytes(c.try_into().unwrap()))
            .collect();
        results.push(neighbors);
        offset += record_bytes;
    }

    Ok(results)
Expand All @@ -143,7 +135,6 @@ pub fn compute_recall(results: &[Vec<u32>], ground_truth: &[Vec<u32>], k: usize)
let truth_set: HashSet<u32> = truth.iter().take(k).copied().collect();

let matches = result.iter().take(k).filter(|&&idx| truth_set.contains(&idx)).count();
println!("Matches: {}", matches);
total_matches += matches;
}

Expand All @@ -154,11 +145,23 @@ pub fn compute_recall(results: &[Vec<u32>], ground_truth: &[Vec<u32>], k: usize)
/// Runs every query vector against `index_view` concurrently and prints
/// the recall@k against `ground_truth`.
///
/// Concurrency is taken from `NYAS_SEARCH_CONCURRENCY` when set to a
/// positive integer, otherwise `max(available_parallelism, 4)`.
pub async fn compute_recall_at_k(
    index_view: &IndexView, query: &SiftDataset, ground_truth: &[Vec<u32>], k: usize,
) {
    use futures::StreamExt;

    let parallelism = env_usize("NYAS_SEARCH_CONCURRENCY").unwrap_or_else(|| {
        std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4).max(4)
    });

    // `buffered` keeps up to `parallelism` searches in flight while yielding
    // results in input order, so no index-tagging or post-sort is needed
    // (unlike `buffer_unordered`).
    let results: Vec<Vec<u32>> = futures::stream::iter(query.vectors.iter())
        .map(|q| index_view.search(q, k, 128))
        .buffered(parallelism)
        .collect()
        .await;

    let recall = compute_recall(&results, ground_truth, k);
    println!("Recall for k: {}: {:?}", k, recall);
Expand Down
Loading